diff --git a/core/src/main/java/org/elasticsearch/common/bytes/ReleasablePagedBytesReference.java b/core/src/main/java/org/elasticsearch/common/bytes/ReleasablePagedBytesReference.java index 2700ea4dc135..ac90e546f7eb 100644 --- a/core/src/main/java/org/elasticsearch/common/bytes/ReleasablePagedBytesReference.java +++ b/core/src/main/java/org/elasticsearch/common/bytes/ReleasablePagedBytesReference.java @@ -30,13 +30,17 @@ import org.elasticsearch.common.util.ByteArray; */ public final class ReleasablePagedBytesReference extends PagedBytesReference implements Releasable { - public ReleasablePagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int length) { + private final Releasable releasable; + + public ReleasablePagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int length, + Releasable releasable) { super(bigarrays, byteArray, length); + this.releasable = releasable; } @Override public void close() { - Releasables.close(byteArray); + Releasables.close(releasable); } } diff --git a/core/src/main/java/org/elasticsearch/common/compress/Compressor.java b/core/src/main/java/org/elasticsearch/common/compress/Compressor.java index 05706debd371..b39e7f6e142f 100644 --- a/core/src/main/java/org/elasticsearch/common/compress/Compressor.java +++ b/core/src/main/java/org/elasticsearch/common/compress/Compressor.java @@ -20,6 +20,7 @@ package org.elasticsearch.common.compress; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -31,5 +32,9 @@ public interface Compressor { StreamInput streamInput(StreamInput in) throws IOException; + /** + * Creates a new stream output that compresses the contents and writes to the provided stream + * output. Closing the returned {@link StreamOutput} will close the provided stream output. 
+ */ StreamOutput streamOutput(StreamOutput out) throws IOException; } diff --git a/core/src/main/java/org/elasticsearch/common/compress/DeflateCompressor.java b/core/src/main/java/org/elasticsearch/common/compress/DeflateCompressor.java index 42e2efa358cf..794a8db4960c 100644 --- a/core/src/main/java/org/elasticsearch/common/compress/DeflateCompressor.java +++ b/core/src/main/java/org/elasticsearch/common/compress/DeflateCompressor.java @@ -20,7 +20,6 @@ package org.elasticsearch.common.compress; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.compress.Compressor; import org.elasticsearch.common.io.stream.InputStreamStreamInput; import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; @@ -47,7 +46,7 @@ public class DeflateCompressor implements Compressor { // It needs to be different from other compressors and to not be specific // enough so that no stream starting with these bytes could be detected as // a XContent - private static final byte[] HEADER = new byte[] { 'D', 'F', 'L', '\0' }; + private static final byte[] HEADER = new byte[]{'D', 'F', 'L', '\0'}; // 3 is a good trade-off between speed and compression ratio private static final int LEVEL = 3; // We use buffering on the input and output of in/def-laters in order to @@ -88,6 +87,7 @@ public class DeflateCompressor implements Compressor { decompressedIn = new BufferedInputStream(decompressedIn, BUFFER_SIZE); return new InputStreamStreamInput(decompressedIn) { final AtomicBoolean closed = new AtomicBoolean(false); + public void close() throws IOException { try { super.close(); @@ -107,10 +107,11 @@ public class DeflateCompressor implements Compressor { final boolean nowrap = true; final Deflater deflater = new Deflater(LEVEL, nowrap); final boolean syncFlush = true; - OutputStream compressedOut = new DeflaterOutputStream(out, deflater, BUFFER_SIZE, syncFlush); - compressedOut = new BufferedOutputStream(compressedOut, BUFFER_SIZE); + DeflaterOutputStream deflaterOutputStream = new DeflaterOutputStream(out, deflater, BUFFER_SIZE, syncFlush); + OutputStream compressedOut = new BufferedOutputStream(deflaterOutputStream, BUFFER_SIZE); return new OutputStreamStreamOutput(compressedOut) { final AtomicBoolean closed = new AtomicBoolean(false); + public void close() throws IOException { try { super.close(); diff --git a/core/src/main/java/org/elasticsearch/common/io/ReleasableBytesStream.java b/core/src/main/java/org/elasticsearch/common/io/ReleasableBytesStream.java deleted file mode 100644 index e31f206bcad9..000000000000 --- a/core/src/main/java/org/elasticsearch/common/io/ReleasableBytesStream.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.elasticsearch.common.io; - -import org.elasticsearch.common.bytes.ReleasablePagedBytesReference; - -/** - * A bytes stream that requires its bytes to be released once no longer used. - */ -public interface ReleasableBytesStream extends BytesStream { - - @Override - ReleasablePagedBytesReference bytes(); - -} diff --git a/core/src/main/java/org/elasticsearch/common/io/Streams.java b/core/src/main/java/org/elasticsearch/common/io/Streams.java index f922fde3e753..f24b703251b7 100644 --- a/core/src/main/java/org/elasticsearch/common/io/Streams.java +++ b/core/src/main/java/org/elasticsearch/common/io/Streams.java @@ -20,6 +20,9 @@ package org.elasticsearch.common.io; import org.apache.lucene.util.IOUtils; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.BytesStream; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.Callback; import java.io.BufferedReader; @@ -236,4 +239,56 @@ public abstract class Streams { } } } + + /** + * Wraps the given {@link BytesStream} in a {@link StreamOutput} that simply flushes when + * close is called. + */ + public static BytesStream flushOnCloseStream(BytesStream os) { + return new FlushOnCloseOutputStream(os); + } + + /** + * A wrapper around a {@link BytesStream} that makes the close operation a flush. This is + * needed as sometimes a stream will be closed but the bytes that the stream holds still need + * to be used and the stream cannot be closed until the bytes have been consumed. + */ + private static class FlushOnCloseOutputStream extends BytesStream { + + private final BytesStream delegate; + + private FlushOnCloseOutputStream(BytesStream bytesStreamOutput) { + this.delegate = bytesStreamOutput; + } + + @Override + public void writeByte(byte b) throws IOException { + delegate.writeByte(b); + } + + @Override + public void writeBytes(byte[] b, int offset, int length) throws IOException { + delegate.writeBytes(b, offset, length); + } + + @Override + public void flush() throws IOException { + delegate.flush(); + } + + @Override + public void close() throws IOException { + flush(); + } + + @Override + public void reset() throws IOException { + delegate.reset(); + } + + @Override + public BytesReference bytes() { + return delegate.bytes(); + } + } } diff --git a/core/src/main/java/org/elasticsearch/common/io/BytesStream.java b/core/src/main/java/org/elasticsearch/common/io/stream/BytesStream.java similarity index 85% rename from core/src/main/java/org/elasticsearch/common/io/BytesStream.java rename to core/src/main/java/org/elasticsearch/common/io/stream/BytesStream.java index 903c1dcb7996..c20dcf62c9bb 100644 --- a/core/src/main/java/org/elasticsearch/common/io/BytesStream.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/BytesStream.java @@ -17,11 +17,11 @@ * under the License. 
*/ -package org.elasticsearch.common.io; +package org.elasticsearch.common.io.stream; import org.elasticsearch.common.bytes.BytesReference; -public interface BytesStream { +public abstract class BytesStream extends StreamOutput { - BytesReference bytes(); -} \ No newline at end of file + public abstract BytesReference bytes(); +} diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java b/core/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java index e65e8efb27b0..ab9a1896ef7c 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java @@ -21,7 +21,6 @@ package org.elasticsearch.common.io.stream; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.PagedBytesReference; -import org.elasticsearch.common.io.BytesStream; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.ByteArray; @@ -31,7 +30,7 @@ import java.io.IOException; * A @link {@link StreamOutput} that uses {@link BigArrays} to acquire pages of * bytes, which avoids frequent reallocation & copying of the internal data. */ -public class BytesStreamOutput extends StreamOutput implements BytesStream { +public class BytesStreamOutput extends BytesStream { protected final BigArrays bigArrays; @@ -50,7 +49,7 @@ public class BytesStreamOutput extends StreamOutput implements BytesStream { /** * Create a non recycling {@link BytesStreamOutput} with enough initial pages acquired * to satisfy the capacity given by expected size. - * + * * @param expectedSize the expected maximum size of the stream in bytes. */ public BytesStreamOutput(int expectedSize) { @@ -129,7 +128,7 @@ public class BytesStreamOutput extends StreamOutput implements BytesStream { /** * Returns the current size of the buffer. - * + * * @return the value of the count field, which is the number of valid * bytes in this output stream. * @see java.io.ByteArrayOutputStream#count @@ -151,7 +150,7 @@ public class BytesStreamOutput extends StreamOutput implements BytesStream { return bytes.ramBytesUsed(); } - private void ensureCapacity(long offset) { + void ensureCapacity(long offset) { if (offset > Integer.MAX_VALUE) { throw new IllegalArgumentException(getClass().getSimpleName() + " cannot hold more than 2GB of data"); } diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java b/core/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java index 674ff18f0fc1..347897c2ecba 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java @@ -20,29 +20,56 @@ package org.elasticsearch.common.io.stream; import org.elasticsearch.common.bytes.ReleasablePagedBytesReference; -import org.elasticsearch.common.io.ReleasableBytesStream; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.ByteArray; /** * An bytes stream output that allows providing a {@link BigArrays} instance * expecting it to require releasing its content ({@link #bytes()}) once done. *
<p>
- * Please note, its is the responsibility of the caller to make sure the bytes - * reference do not "escape" and are released only once. + * Please note, closing this stream will release the bytes that are in use by any + * {@link ReleasablePagedBytesReference} returned from {@link #bytes()}, so this + * stream should only be closed after the bytes have been output or copied + * elsewhere. */ -public class ReleasableBytesStreamOutput extends BytesStreamOutput implements ReleasableBytesStream { +public class ReleasableBytesStreamOutput extends BytesStreamOutput + implements Releasable { + + private Releasable releasable; public ReleasableBytesStreamOutput(BigArrays bigarrays) { - super(BigArrays.PAGE_SIZE_IN_BYTES, bigarrays); + this(BigArrays.PAGE_SIZE_IN_BYTES, bigarrays); } public ReleasableBytesStreamOutput(int expectedSize, BigArrays bigArrays) { super(expectedSize, bigArrays); + this.releasable = Releasables.releaseOnce(this.bytes); + } + + /** + * Returns a {@link Releasable} implementation of a + * {@link org.elasticsearch.common.bytes.BytesReference} that represents the current state of + * the bytes in the stream. + */ + @Override + public ReleasablePagedBytesReference bytes() { + return new ReleasablePagedBytesReference(bigArrays, bytes, count, releasable); } @Override - public ReleasablePagedBytesReference bytes() { - return new ReleasablePagedBytesReference(bigArrays, bytes, count); + public void close() { + Releasables.close(releasable); } + @Override + void ensureCapacity(long offset) { + final ByteArray prevBytes = this.bytes; + super.ensureCapacity(offset); + if (prevBytes != this.bytes) { + // re-create the releasable with the new reference + releasable = Releasables.releaseOnce(this.bytes); + } + } } diff --git a/core/src/main/java/org/elasticsearch/common/xcontent/XContentBuilder.java b/core/src/main/java/org/elasticsearch/common/xcontent/XContentBuilder.java index 189e9d3c8d5d..f0427ce24666 100644 --- a/core/src/main/java/org/elasticsearch/common/xcontent/XContentBuilder.java +++ b/core/src/main/java/org/elasticsearch/common/xcontent/XContentBuilder.java @@ -22,7 +22,7 @@ package org.elasticsearch.common.xcontent; import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.geo.GeoPoint; -import org.elasticsearch.common.io.BytesStream; +import org.elasticsearch.common.io.stream.BytesStream; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.text.Text; @@ -53,7 +53,7 @@ import java.util.concurrent.TimeUnit; /** * A utility to build XContent (ie json). */ -public final class XContentBuilder implements BytesStream, Releasable, Flushable { +public final class XContentBuilder implements Releasable, Flushable { /** * Create a new {@link XContentBuilder} using the given {@link XContent} content. 
@@ -1041,7 +1041,6 @@ public final class XContentBuilder implements BytesStream, Releasable, Flushable return this.generator; } - @Override public BytesReference bytes() { close(); return ((BytesStream) bos).bytes(); diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index d9a8cc408f82..4bad4593bc14 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -439,7 +439,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC } throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e); } finally { - Releasables.close(out.bytes()); + Releasables.close(out); } } @@ -1332,7 +1332,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC bytes.writeTo(outStream); } } finally { - Releasables.close(out.bytes()); + Releasables.close(out); } } diff --git a/core/src/main/java/org/elasticsearch/rest/AbstractRestChannel.java b/core/src/main/java/org/elasticsearch/rest/AbstractRestChannel.java index bdc78c82dd50..4db9aec6e93e 100644 --- a/core/src/main/java/org/elasticsearch/rest/AbstractRestChannel.java +++ b/core/src/main/java/org/elasticsearch/rest/AbstractRestChannel.java @@ -20,12 +20,14 @@ package org.elasticsearch.rest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import java.io.IOException; +import java.io.OutputStream; import java.util.Collections; import java.util.Set; import java.util.function.Predicate; @@ -97,7 +99,9 @@ public abstract class AbstractRestChannel implements RestChannel { excludes = filters.stream().filter(EXCLUDE_FILTER).map(f -> f.substring(1)).collect(toSet()); } - XContentBuilder builder = new XContentBuilder(XContentFactory.xContent(responseContentType), bytesOutput(), includes, excludes); + OutputStream unclosableOutputStream = Streams.flushOnCloseStream(bytesOutput()); + XContentBuilder builder = + new XContentBuilder(XContentFactory.xContent(responseContentType), unclosableOutputStream, includes, excludes); if (pretty) { builder.prettyPrint().lfAtEnd(); } @@ -107,8 +111,9 @@ public abstract class AbstractRestChannel implements RestChannel { } /** - * A channel level bytes output that can be reused. It gets reset on each call to this - * method. + * A channel level bytes output that can be reused. The bytes output is lazily instantiated + * by a call to {@link #newBytesOutput()}. Once the stream is created, it gets reset on each + * call to this method. */ @Override public final BytesStreamOutput bytesOutput() { @@ -120,6 +125,14 @@ public abstract class AbstractRestChannel implements RestChannel { return bytesOut; } + /** + * An accessor to the raw value of the channel bytes output. This method will not instantiate + * a new stream if one does not exist and this method will not reset the stream. 
+ */ + protected final BytesStreamOutput bytesOutputOrNull() { + return bytesOut; + } + protected BytesStreamOutput newBytesOutput() { return new BytesStreamOutput(); } diff --git a/core/src/main/java/org/elasticsearch/rest/BytesRestResponse.java b/core/src/main/java/org/elasticsearch/rest/BytesRestResponse.java index 72ee7efc4890..c16f862fb4af 100644 --- a/core/src/main/java/org/elasticsearch/rest/BytesRestResponse.java +++ b/core/src/main/java/org/elasticsearch/rest/BytesRestResponse.java @@ -30,7 +30,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; @@ -147,8 +147,8 @@ public class BytesRestResponse extends RestResponse { return builder; } - static BytesRestResponse createSimpleErrorResponse(RestStatus status, String errorMessage) throws IOException { - return new BytesRestResponse(status, JsonXContent.contentBuilder().startObject() + static BytesRestResponse createSimpleErrorResponse(RestChannel channel, RestStatus status, String errorMessage) throws IOException { + return new BytesRestResponse(status, channel.newErrorBuilder().startObject() .field("error", errorMessage) .field("status", status.getStatus()) .endObject()); diff --git a/core/src/main/java/org/elasticsearch/rest/RestController.java b/core/src/main/java/org/elasticsearch/rest/RestController.java index ea603cf949f2..a3d8a4b7db5f 100644 --- a/core/src/main/java/org/elasticsearch/rest/RestController.java +++ b/core/src/main/java/org/elasticsearch/rest/RestController.java @@ -178,8 +178,9 @@ public class RestController extends AbstractComponent implements HttpServerTrans sendContentTypeErrorMessage(request, responseChannel); } else if (contentLength > 0 && handler != null && handler.supportsContentStream() && request.getXContentType() != XContentType.JSON && request.getXContentType() != XContentType.SMILE) { - responseChannel.sendResponse(BytesRestResponse.createSimpleErrorResponse(RestStatus.NOT_ACCEPTABLE, "Content-Type [" + - request.getXContentType() + "] does not support stream parsing. Use JSON or SMILE instead")); + responseChannel.sendResponse(BytesRestResponse.createSimpleErrorResponse(responseChannel, + RestStatus.NOT_ACCEPTABLE, "Content-Type [" + request.getXContentType() + + "] does not support stream parsing. 
Use JSON or SMILE instead")); } else { if (canTripCircuitBreaker(request)) { inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, ""); @@ -229,7 +230,8 @@ public class RestController extends AbstractComponent implements HttpServerTrans void dispatchRequest(final RestRequest request, final RestChannel channel, final NodeClient client, ThreadContext threadContext, final RestHandler handler) throws Exception { if (checkRequestParameters(request, channel) == false) { - channel.sendResponse(BytesRestResponse.createSimpleErrorResponse(BAD_REQUEST, "error traces in responses are disabled.")); + channel + .sendResponse(BytesRestResponse.createSimpleErrorResponse(channel,BAD_REQUEST, "error traces in responses are disabled.")); } else { for (String key : headersToCopy) { String httpHeader = request.header(key); @@ -283,7 +285,7 @@ public class RestController extends AbstractComponent implements HttpServerTrans Strings.collectionToCommaDelimitedString(restRequest.getAllHeaderValues("Content-Type")) + "] is not supported"; } - channel.sendResponse(BytesRestResponse.createSimpleErrorResponse(NOT_ACCEPTABLE, errorMessage)); + channel.sendResponse(BytesRestResponse.createSimpleErrorResponse(channel, NOT_ACCEPTABLE, errorMessage)); } /** diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index dd75ae295562..cbdc0dfa1785 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -41,7 +41,7 @@ import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.compress.Compressor; import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.compress.NotCompressedException; -import org.elasticsearch.common.io.ReleasableBytesStream; +import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -1025,10 +1025,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i } status = TransportStatus.setRequest(status); ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays); - // we wrap this in a release once since if the onRequestSent callback throws an exception - // we might release things twice and this should be prevented - final Releasable toRelease = Releasables.releaseOnce(() -> Releasables.close(bStream.bytes())); - StreamOutput stream = bStream; + boolean addedReleaseListener = false; + StreamOutput stream = Streams.flushOnCloseStream(bStream); try { // only compress if asked, and, the request is not bytes, since then only // the header part is compressed, and the "body" can't be extracted as compressed @@ -1047,12 +1045,17 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i stream.writeString(action); BytesReference message = buildMessage(requestId, status, node.getVersion(), request, stream, bStream); final TransportRequestOptions finalOptions = options; + final StreamOutput finalStream = stream; // this might be called in a different thread - SendListener onRequestSent = new SendListener(toRelease, + SendListener onRequestSent = new SendListener( + () -> IOUtils.closeWhileHandlingException(finalStream, bStream), () -> transportServiceAdapter.onRequestSent(node, requestId, action, 
request, finalOptions)); internalSendMessage(targetChannel, message, onRequestSent); + addedReleaseListener = true; } finally { - IOUtils.close(stream); + if (!addedReleaseListener) { + IOUtils.close(stream, bStream); + } } } @@ -1114,10 +1117,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i } status = TransportStatus.setResponse(status); // TODO share some code with sendRequest ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays); - // we wrap this in a release once since if the onRequestSent callback throws an exception - // we might release things twice and this should be prevented - final Releasable toRelease = Releasables.releaseOnce(() -> Releasables.close(bStream.bytes())); - StreamOutput stream = bStream; + boolean addedReleaseListener = false; + StreamOutput stream = Streams.flushOnCloseStream(bStream); try { if (options.compress()) { status = TransportStatus.setCompress(status); @@ -1128,12 +1129,16 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i BytesReference reference = buildMessage(requestId, status, nodeVersion, response, stream, bStream); final TransportResponseOptions finalOptions = options; + final StreamOutput finalStream = stream; // this might be called in a different thread - SendListener listener = new SendListener(toRelease, + SendListener listener = new SendListener(() -> IOUtils.closeWhileHandlingException(finalStream, bStream), () -> transportServiceAdapter.onResponseSent(requestId, action, response, finalOptions)); internalSendMessage(channel, reference, listener); + addedReleaseListener = true; } finally { - IOUtils.close(stream); + if (!addedReleaseListener) { + IOUtils.close(stream, bStream); + } } } @@ -1161,7 +1166,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i * Serializes the given message into a bytes representation */ private BytesReference buildMessage(long requestId, byte status, Version nodeVersion, TransportMessage message, StreamOutput stream, - ReleasableBytesStream writtenBytes) throws IOException { + ReleasableBytesStreamOutput writtenBytes) throws IOException { final BytesReference zeroCopyBuffer; if (message instanceof BytesTransportRequest) { // what a shitty optimization - we should use a direct send method instead BytesTransportRequest bRequest = (BytesTransportRequest) message; diff --git a/core/src/test/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutputTests.java b/core/src/test/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutputTests.java new file mode 100644 index 000000000000..557721a0241a --- /dev/null +++ b/core/src/test/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutputTests.java @@ -0,0 +1,51 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.io.stream; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +public class ReleasableBytesStreamOutputTests extends ESTestCase { + + public void testRelease() throws Exception { + MockBigArrays mockBigArrays = + new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService()); + try (ReleasableBytesStreamOutput output = + getRandomReleasableBytesStreamOutput(mockBigArrays)) { + output.writeBoolean(randomBoolean()); + } + MockBigArrays.ensureAllArraysAreReleased(); + } + + private ReleasableBytesStreamOutput getRandomReleasableBytesStreamOutput( + MockBigArrays mockBigArrays) throws IOException { + ReleasableBytesStreamOutput output = new ReleasableBytesStreamOutput(mockBigArrays); + if (randomBoolean()) { + for (int i = 0; i < scaledRandomIntBetween(1, 32); i++) { + output.write(randomByte()); + } + } + return output; + } +} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java index a4259b41fd82..07e91ec50e44 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java @@ -85,7 +85,7 @@ final class Netty4HttpChannel extends AbstractRestChannel { } @Override - public BytesStreamOutput newBytesOutput() { + protected BytesStreamOutput newBytesOutput() { return new ReleasableBytesStreamOutput(transport.bigArrays); } @@ -114,7 +114,8 @@ final class Netty4HttpChannel extends AbstractRestChannel { addCustomHeaders(resp, threadContext.getResponseHeaders()); BytesReference content = response.content(); - boolean release = content instanceof Releasable; + boolean releaseContent = content instanceof Releasable; + boolean releaseBytesStreamOutput = bytesOutputOrNull() instanceof ReleasableBytesStreamOutput; try { // If our response doesn't specify a content-type header, set one setHeaderField(resp, HttpHeaderNames.CONTENT_TYPE.toString(), response.contentType(), false); @@ -125,10 +126,14 @@ final class Netty4HttpChannel extends AbstractRestChannel { final ChannelPromise promise = channel.newPromise(); - if (release) { + if (releaseContent) { promise.addListener(f -> ((Releasable)content).close()); } + if (releaseBytesStreamOutput) { + promise.addListener(f -> bytesOutputOrNull().close()); + } + if (isCloseConnection()) { promise.addListener(ChannelFutureListener.CLOSE); } @@ -140,11 +145,15 @@ final class Netty4HttpChannel extends AbstractRestChannel { msg = resp; } channel.writeAndFlush(msg, promise); - release = false; + releaseContent = false; + releaseBytesStreamOutput = false; } finally { - if (release) { + if (releaseContent) { ((Releasable) content).close(); } + if (releaseBytesStreamOutput) { + bytesOutputOrNull().close(); + } if (pipelinedRequest != null) { pipelinedRequest.release(); } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpChannelTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpChannelTests.java index c075afd463f4..7d8101df10ea 100644 --- 
a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpChannelTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpChannelTests.java @@ -43,18 +43,24 @@ import io.netty.util.Attribute; import io.netty.util.AttributeKey; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.ReleasablePagedBytesReference; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.ByteArray; import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.http.HttpTransportSettings; import org.elasticsearch.http.NullDispatcher; import org.elasticsearch.http.netty4.cors.Netty4CorsHandler; import org.elasticsearch.http.netty4.pipelining.HttpPipelinedRequest; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; @@ -64,6 +70,7 @@ import org.elasticsearch.transport.netty4.Netty4Utils; import org.junit.After; import org.junit.Before; +import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.SocketAddress; import java.nio.charset.StandardCharsets; @@ -78,6 +85,7 @@ import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ENABLED; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -243,6 +251,37 @@ public class Netty4HttpChannelTests extends ESTestCase { } } + public void testReleaseOnSendToChannelAfterException() throws IOException { + final Settings settings = Settings.builder().build(); + final NamedXContentRegistry registry = xContentRegistry(); + try (Netty4HttpServerTransport httpServerTransport = + new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, registry, new NullDispatcher())) { + final FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); + final EmbeddedChannel embeddedChannel = new EmbeddedChannel(); + final Netty4HttpRequest request = new Netty4HttpRequest(registry, httpRequest, embeddedChannel); + final HttpPipelinedRequest pipelinedRequest = randomBoolean() ? 
new HttpPipelinedRequest(request.request(), 1) : null; + final Netty4HttpChannel channel = + new Netty4HttpChannel(httpServerTransport, request, pipelinedRequest, randomBoolean(), threadPool.getThreadContext()); + final BytesRestResponse response = new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, + JsonXContent.contentBuilder().startObject().endObject()); + assertThat(response.content(), not(instanceOf(Releasable.class))); + + // ensure we have reserved bytes + if (randomBoolean()) { + BytesStreamOutput out = channel.bytesOutput(); + assertThat(out, instanceOf(ReleasableBytesStreamOutput.class)); + } else { + try (XContentBuilder builder = channel.newBuilder()) { + // do something builder + builder.startObject().endObject(); + } + } + + channel.sendResponse(response); + // ESTestCase#after will invoke ensureAllArraysAreReleased which will fail if the response content was not released + } + } + public void testConnectionClose() throws Exception { final Settings settings = Settings.builder().build(); try (Netty4HttpServerTransport httpServerTransport = @@ -549,7 +588,7 @@ public class Netty4HttpChannelTests extends ESTestCase { } final ByteArray bigArray = bigArrays.newByteArray(bytes.length); bigArray.set(0, bytes, 0, bytes.length); - reference = new ReleasablePagedBytesReference(bigArrays, bigArray, bytes.length); + reference = new ReleasablePagedBytesReference(bigArrays, bigArray, bytes.length, Releasables.releaseOnce(bigArray)); } @Override