diff --git a/docs/reference/modules/discovery/fault-detection.asciidoc b/docs/reference/modules/discovery/fault-detection.asciidoc index 2bbfc9f7f21f..7b368065afe1 100644 --- a/docs/reference/modules/discovery/fault-detection.asciidoc +++ b/docs/reference/modules/discovery/fault-detection.asciidoc @@ -201,7 +201,25 @@ logger.org.elasticsearch.cluster.coordination.LagDetector: DEBUG When this logger is enabled, {es} will attempt to run the <> API on the faulty node and report the results in -the logs on the elected master. +the logs on the elected master. The results are compressed, encoded, and split +into chunks to avoid truncation: + +[source,text] +---- +[DEBUG][o.e.c.c.LagDetector ] [master] hot threads from node [{node}{g3cCUaMDQJmQ2ZLtjr-3dg}{10.0.0.1:9300}] lagging at version [183619] despite commit of cluster state version [183620] [part 1]: H4sIAAAAAAAA/x... +[DEBUG][o.e.c.c.LagDetector ] [master] hot threads from node [{node}{g3cCUaMDQJmQ2ZLtjr-3dg}{10.0.0.1:9300}] lagging at version [183619] despite commit of cluster state version [183620] [part 2]: p7x3w1hmOQVtuV... +[DEBUG][o.e.c.c.LagDetector ] [master] hot threads from node [{node}{g3cCUaMDQJmQ2ZLtjr-3dg}{10.0.0.1:9300}] lagging at version [183619] despite commit of cluster state version [183620] [part 3]: v7uTboMGDbyOy+... +[DEBUG][o.e.c.c.LagDetector ] [master] hot threads from node [{node}{g3cCUaMDQJmQ2ZLtjr-3dg}{10.0.0.1:9300}] lagging at version [183619] despite commit of cluster state version [183620] [part 4]: 4tse0RnPnLeDNN... +[DEBUG][o.e.c.c.LagDetector ] [master] hot threads from node [{node}{g3cCUaMDQJmQ2ZLtjr-3dg}{10.0.0.1:9300}] lagging at version [183619] despite commit of cluster state version [183620] (gzip compressed, base64-encoded, and split into 4 parts on preceding log lines) +---- + +To reconstruct the output, base64-decode the data and decompress it using +`gzip`. For instance, on Unix-like systems: + +[source,sh] +---- +cat lagdetector.log | sed -e 's/.*://' | base64 --decode | gzip --decompress +---- ===== Diagnosing `follower check retry count exceeded` nodes diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/LagDetector.java b/server/src/main/java/org/elasticsearch/cluster/coordination/LagDetector.java index 6748fa9980d1..5e21f7c69341 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/LagDetector.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/LagDetector.java @@ -7,6 +7,7 @@ */ package org.elasticsearch.cluster.coordination; +import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; @@ -15,8 +16,13 @@ import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsReq import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.ReferenceDocs; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.logging.ChunkedLoggingStream; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.PrioritizedThrottledTaskRunner; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; @@ -25,6 +31,8 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.TransportService; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -196,11 +204,13 @@ public class LagDetector { private final TransportService transportService; private final Client client; private final LagListener delegate; + private final PrioritizedThrottledTaskRunner loggingTaskRunner; HotThreadsLoggingLagListener(TransportService transportService, Client client, LagListener delegate) { this.transportService = transportService; this.client = client; this.delegate = delegate; + this.loggingTaskRunner = new PrioritizedThrottledTaskRunner<>("hot_threads", 1, transportService.getThreadPool().generic()); } @Override @@ -224,12 +234,13 @@ public class LagDetector { return; } - logger.debug( - "hot threads from node [{}] lagging at version [{}] despite commit of cluster state version [{}]:\n{}", - discoveryNode.descriptionWithoutAttributes(), - appliedVersion, - expectedVersion, - nodesHotThreadsResponse.getNodes().get(0).getHotThreads() + loggingTaskRunner.enqueueTask( + new HotThreadsLoggingTask( + discoveryNode, + appliedVersion, + expectedVersion, + nodesHotThreadsResponse.getNodes().get(0).getHotThreads() + ) ); } @@ -281,4 +292,42 @@ public class LagDetector { } } + static class HotThreadsLoggingTask extends AbstractRunnable implements Comparable { + + private final String nodeHotThreads; + private final String prefix; + + HotThreadsLoggingTask(DiscoveryNode discoveryNode, long appliedVersion, long expectedVersion, String nodeHotThreads) { + this.nodeHotThreads = nodeHotThreads; + this.prefix = Strings.format( + "hot threads from node [%s] lagging at version [%d] despite commit of cluster state version [%d]", + discoveryNode.descriptionWithoutAttributes(), + appliedVersion, + expectedVersion + ); + } + + @Override + public void onFailure(Exception e) { + logger.error(Strings.format("unexpected exception reporting %s", prefix), e); + } + + @Override + protected void doRun() throws Exception { + try ( + var writer = new OutputStreamWriter( + ChunkedLoggingStream.create(logger, Level.DEBUG, prefix, ReferenceDocs.LAGGING_NODE_TROUBLESHOOTING), + StandardCharsets.UTF_8 + ) + ) { + writer.write(nodeHotThreads); + } + } + + @Override + public int compareTo(HotThreadsLoggingTask o) { + return 0; + } + } + } diff --git a/server/src/main/java/org/elasticsearch/common/ReferenceDocs.java b/server/src/main/java/org/elasticsearch/common/ReferenceDocs.java index 1f7622a4e59c..7a1b9d341cb0 100644 --- a/server/src/main/java/org/elasticsearch/common/ReferenceDocs.java +++ b/server/src/main/java/org/elasticsearch/common/ReferenceDocs.java @@ -21,7 +21,8 @@ import java.util.List; public enum ReferenceDocs { INITIAL_MASTER_NODES("important-settings.html#initial_master_nodes"), DISCOVERY_TROUBLESHOOTING("discovery-troubleshooting.html"), - UNSTABLE_CLUSTER_TROUBLESHOOTING("cluster-fault-detection.html#cluster-fault-detection-troubleshooting"); + UNSTABLE_CLUSTER_TROUBLESHOOTING("cluster-fault-detection.html#cluster-fault-detection-troubleshooting"), + LAGGING_NODE_TROUBLESHOOTING("cluster-fault-detection.html#_diagnosing_lagging_nodes"); private final String relativePath; diff --git a/server/src/main/java/org/elasticsearch/common/logging/ChunkedLoggingStream.java b/server/src/main/java/org/elasticsearch/common/logging/ChunkedLoggingStream.java new file mode 100644 index 000000000000..941a6d1f9eb3 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/logging/ChunkedLoggingStream.java @@ -0,0 +1,148 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.common.logging; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.ReferenceDocs; +import org.elasticsearch.common.unit.ByteSizeUnit; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.Objects; +import java.util.zip.GZIPOutputStream; + +/** + * An {@link OutputStream} which Gzip-compresses the written data, Base64-encodes it, and writes it in fixed-size chunks to a logger. This + * is useful for debugging information that may be too large for a single log message and/or which may include data which cannot be + * recorded faithfully in plain-text (e.g. binary data or data with significant whitespace). + */ +public class ChunkedLoggingStream extends OutputStream { + + static final int CHUNK_SIZE = ByteSizeUnit.KB.toIntBytes(2); + + /** + * Create an {@link OutputStream} which Gzip-compresses the written data, Base64-encodes it, and writes it in fixed-size (2kiB) chunks + * to the given logger. If the data fits into a single chunk then the output looks like this: + * + *
+     * $PREFIX (gzip compressed and base64-encoded; for details see ...): H4sIAAAAA...
+     * 
+ * + * If there are multiple chunks then they are written like this: + * + *
+     * $PREFIX [part 1]: H4sIAAAAA...
+     * $PREFIX [part 2]: r38c4MBHO...
+     * $PREFIX [part 3]: ECyRFONaL...
+     * $PREFIX [part 4]: kTgm+Qswm...
+     * $PREFIX (gzip compressed, base64-encoded, and split into 4 parts on preceding log lines; for details see ...)
+     * 
+ * + * @param logger The logger to receive the chunks of data. + * @param level The log level to use for the logging. + * @param prefix A prefix for each chunk, which should be reasonably unique to allow for reconstruction of the original message + * even if multiple such streams are used concurrently. + * @param referenceDocs A link to the relevant reference docs to help users interpret the output. Relevant reference docs are required + * because the output is rather human-unfriendly and we need somewhere to describe how to decode it. + */ + public static OutputStream create(Logger logger, Level level, String prefix, ReferenceDocs referenceDocs) throws IOException { + return new GZIPOutputStream(Base64.getEncoder().wrap(new ChunkedLoggingStream(logger, level, prefix, referenceDocs))); + } + + private final Logger logger; + private final Level level; + private final String prefix; + private final ReferenceDocs referenceDocs; + + private int chunk; + private int offset; + private boolean closed; + private final byte[] buffer = new byte[CHUNK_SIZE]; + + ChunkedLoggingStream(Logger logger, Level level, String prefix, ReferenceDocs referenceDocs) { + this.logger = Objects.requireNonNull(logger); + this.level = Objects.requireNonNull(level); + this.prefix = Objects.requireNonNull(prefix); + this.referenceDocs = Objects.requireNonNull(referenceDocs); + } + + private void flushBuffer() { + assert closed || offset == CHUNK_SIZE : offset; + assert offset >= 0 && offset <= CHUNK_SIZE : offset; + chunk += 1; + + final var chunkString = new String(buffer, 0, offset, StandardCharsets.ISO_8859_1); + offset = 0; + + if (closed && chunk == 1) { + logger.log(level, "{} (gzip compressed and base64-encoded; for details see {}): {}", prefix, referenceDocs, chunkString); + } else { + logger.log(level, "{} [part {}]: {}", prefix, chunk, chunkString); + } + } + + @Override + public void write(int b) throws IOException { + assert closed == false; + if (offset == CHUNK_SIZE) { + flushBuffer(); + } + buffer[offset] = (byte) b; + assert assertSafeByte(buffer[offset]); + offset += 1; + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + assert closed == false; + assert assertSafeBytes(b, off, len); + while (len > 0) { + if (offset == CHUNK_SIZE) { + flushBuffer(); + } + var copyLen = Math.min(len, CHUNK_SIZE - offset); + System.arraycopy(b, off, buffer, offset, copyLen); + offset += copyLen; + off += copyLen; + len -= copyLen; + } + } + + @Override + public void close() throws IOException { + if (closed == false) { + closed = true; + flushBuffer(); + if (chunk > 1) { + logger.log( + level, + "{} (gzip compressed, base64-encoded, and split into {} parts on preceding log lines; for details see {})", + prefix, + chunk, + referenceDocs + ); + } + } + } + + private static boolean assertSafeBytes(byte[] b, int off, int len) { + for (int i = off; i < off + len; i++) { + assertSafeByte(b[i]); + } + return true; + } + + private static boolean assertSafeByte(byte b) { + assert 0x20 <= b && b < 0x7f; + return true; + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 3f3060d3eb0d..08bc74d1caed 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -9,7 +9,6 @@ package org.elasticsearch.cluster.coordination; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.core.LogEvent; -import org.apache.lucene.util.Constants; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.cluster.AbstractNamedDiffable; @@ -2186,20 +2185,16 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase { ) ); - if (Constants.WINDOWS == false) { - // log messages containing control characters are hidden from the log assertions framework, and this includes the - // `\r` that Windows uses in its line endings, so we only see this message on systems with `\n` line endings: - mockLogAppender.addExpectation( - new MockLogAppender.SeenEventExpectation( - "hot threads from lagging node", - LagDetector.class.getCanonicalName(), - Level.DEBUG, - "hot threads from node [" - + brokenNode.getLocalNode().descriptionWithoutAttributes() - + "] lagging at version [*] despite commit of cluster state version [*]:\nHot threads at*" - ) - ); - } + mockLogAppender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "hot threads from lagging node", + LagDetector.class.getCanonicalName(), + Level.DEBUG, + "hot threads from node [" + + brokenNode.getLocalNode().descriptionWithoutAttributes() + + "] lagging at version [*] despite commit of cluster state version [*]*" + ) + ); // drop the publication messages to one node, but then restore connectivity so it remains in the cluster and does not fail // health checks diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/LagDetectorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/LagDetectorTests.java index 052da3dd2cfe..22ea808e00a6 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/LagDetectorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/LagDetectorTests.java @@ -7,11 +7,17 @@ */ package org.elasticsearch.cluster.coordination; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.ReferenceDocs; +import org.elasticsearch.common.logging.ChunkedLoggingStreamTests; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; import org.elasticsearch.core.TimeValue; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.junit.Before; import java.util.Arrays; @@ -243,4 +249,23 @@ public class LagDetectorTests extends ESTestCase { deterministicTaskQueue.runAllTasksInTimeOrder(); assertThat(failedNodes, empty()); // nodes added after a lag detector was started are also ignored } + + @TestLogging(reason = "testing LagDetector logging", value = "org.elasticsearch.cluster.coordination.LagDetector:DEBUG") + public void testHotThreadsChunkedLoggingEncoding() { + final var node = new DiscoveryNode("test", buildNewFakeTransportAddress(), Version.CURRENT); + final var expectedBody = randomUnicodeOfLengthBetween(1, 20000); + assertEquals( + expectedBody, + ChunkedLoggingStreamTests.getDecodedLoggedBody( + LogManager.getLogger(LagDetector.class), + Level.DEBUG, + "hot threads from node [" + + node.descriptionWithoutAttributes() + + "] lagging at version [1] despite commit of cluster state version [2]", + ReferenceDocs.LAGGING_NODE_TROUBLESHOOTING, + new LagDetector.HotThreadsLoggingTask(node, 1, 2, expectedBody)::run + ).utf8ToString() + ); + } + } diff --git a/server/src/test/java/org/elasticsearch/common/logging/ChunkedLoggingStreamTests.java b/server/src/test/java/org/elasticsearch/common/logging/ChunkedLoggingStreamTests.java new file mode 100644 index 000000000000..a0534ea52dfc --- /dev/null +++ b/server/src/test/java/org/elasticsearch/common/logging/ChunkedLoggingStreamTests.java @@ -0,0 +1,196 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.common.logging; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.appender.AbstractAppender; +import org.apache.logging.log4j.core.config.Property; +import org.elasticsearch.common.ReferenceDocs; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.core.CheckedRunnable; +import org.elasticsearch.core.Streams; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.junit.annotations.TestLogging; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Base64; +import java.util.stream.IntStream; +import java.util.zip.GZIPInputStream; + +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class ChunkedLoggingStreamTests extends ESTestCase { + + public static final Logger logger = LogManager.getLogger(ChunkedLoggingStreamTests.class); + + @TestLogging(reason = "testing logging", value = "org.elasticsearch.common.logging.ChunkedLoggingStreamTests:DEBUG") + public void testLogMessageChunking() { + // bugs are most likely near chunk boundaries, so test sizes that are within +/- 3 bytes of 0, 1, and 2 chunks: + IntStream.rangeClosed(-3, 3) + .flatMap(i -> IntStream.iterate(i, j -> j + ChunkedLoggingStream.CHUNK_SIZE).limit(3)) + .filter(i -> i >= 0) + .sorted() + .forEach(ChunkedLoggingStreamTests::runChunkingTest); + } + + private static void runChunkingTest(int size) { + final var bytes = new byte[size]; + Arrays.fill(bytes, (byte) '.'); + final var expectedBody = new String(bytes, StandardCharsets.ISO_8859_1); + final var prefix = randomAlphaOfLength(10); + final var level = randomFrom(Level.DEBUG, Level.INFO, Level.WARN, Level.ERROR); + final var referenceDocs = randomFrom(ReferenceDocs.values()); + assertEquals(expectedBody, getLoggedBody(logger, level, prefix, referenceDocs, () -> { + try (var stream = new ChunkedLoggingStream(logger, level, prefix, referenceDocs)) { + writeRandomly(stream, bytes); + } + })); + } + + @TestLogging(reason = "testing logging", value = "org.elasticsearch.common.logging.ChunkedLoggingStreamTests:DEBUG") + public void testEncodingRoundTrip() { + final var bytes = randomByteArrayOfLength(between(0, 10000)); + final var level = randomFrom(Level.DEBUG, Level.INFO, Level.WARN, Level.ERROR); + final var referenceDocs = randomFrom(ReferenceDocs.values()); + assertEquals(new BytesArray(bytes), getDecodedLoggedBody(logger, level, "prefix", referenceDocs, () -> { + try (var stream = ChunkedLoggingStream.create(logger, level, "prefix", referenceDocs)) { + writeRandomly(stream, bytes); + } + })); + } + + private static String getLoggedBody( + Logger captureLogger, + final Level level, + String prefix, + final ReferenceDocs referenceDocs, + CheckedRunnable runnable + ) { + class ChunkReadingAppender extends AbstractAppender { + final StringBuilder encodedResponseBuilder = new StringBuilder(); + int chunks; + boolean seenTotal; + + ChunkReadingAppender() { + super("mock", null, null, false, Property.EMPTY_ARRAY); + } + + @Override + public void append(LogEvent event) { + if (event.getLevel() != level) { + return; + } + if (event.getLoggerName().equals(captureLogger.getName()) == false) { + return; + } + assertFalse(seenTotal); + final var message = event.getMessage().getFormattedMessage(); + final var onePartPrefix = prefix + " (gzip compressed and base64-encoded; for details see " + referenceDocs + "): "; + final var partPrefix = prefix + " [part " + (chunks + 1) + "]: "; + if (message.startsWith(partPrefix)) { + chunks += 1; + final var chunk = message.substring(partPrefix.length()); + assertThat(chunk.length(), lessThanOrEqualTo(ChunkedLoggingStream.CHUNK_SIZE)); + encodedResponseBuilder.append(chunk); + } else if (message.startsWith(onePartPrefix)) { + assertEquals(0, chunks); + chunks += 1; + final var chunk = message.substring(onePartPrefix.length()); + assertThat(chunk.length(), lessThanOrEqualTo(ChunkedLoggingStream.CHUNK_SIZE)); + encodedResponseBuilder.append(chunk); + seenTotal = true; + } else { + assertEquals( + prefix + + " (gzip compressed, base64-encoded, and split into " + + chunks + + " parts on preceding log lines; for details see " + + referenceDocs + + ")", + message + ); + assertThat(chunks, greaterThan(1)); + seenTotal = true; + } + } + } + + final var appender = new ChunkReadingAppender(); + try { + appender.start(); + Loggers.addAppender(captureLogger, appender); + runnable.run(); + } catch (Exception e) { + throw new AssertionError("unexpected", e); + } finally { + Loggers.removeAppender(captureLogger, appender); + appender.stop(); + } + + assertThat(appender.chunks, greaterThan(0)); + assertTrue(appender.seenTotal); + + return appender.encodedResponseBuilder.toString(); + } + + /** + * Test utility function which captures the logged output from a {@link ChunkedLoggingStream}, combines the chunks, Base64-decodes it + * and Gzip-decompresses it to retrieve the original data. + * + * @param captureLogger The logger whose output should be captured. + * @param level The log level for the data. + * @param prefix The prefix used by the logging stream. + * @param referenceDocs A link to the reference docs about the output. + * @param runnable The action which emits the logs. + * @return A {@link BytesReference} containing the captured data. + */ + public static BytesReference getDecodedLoggedBody( + Logger captureLogger, + Level level, + String prefix, + ReferenceDocs referenceDocs, + CheckedRunnable runnable + ) { + final var loggedBody = getLoggedBody(captureLogger, level, prefix, referenceDocs, runnable); + + try ( + var bytesStreamOutput = new BytesStreamOutput(); + var byteArrayInputStream = new ByteArrayInputStream(Base64.getDecoder().decode(loggedBody)); + var gzipInputStream = new GZIPInputStream(byteArrayInputStream) + ) { + Streams.copy(gzipInputStream, bytesStreamOutput); + return bytesStreamOutput.bytes(); + } catch (Exception e) { + throw new AssertionError("unexpected", e); + } + } + + private static void writeRandomly(OutputStream stream, byte[] bytes) throws IOException { + for (var pos = 0; pos < bytes.length;) { + if (randomBoolean()) { + stream.write(bytes[pos++]); + } else { + var len = between(1, bytes.length - pos); + stream.write(bytes, pos, len); + pos += len; + } + } + } + +}