From afd45c14327cd0f8d155e5ac9740f48e8e39b09c Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 4 Apr 2017 20:33:51 -0400 Subject: [PATCH] Revert "Closing a ReleasableBytesStreamOutput closes the underlying BigArray (#23572)" This reverts commit 6bfecdf921a1941b48273d76551872df4062cfae. --- .../bytes/ReleasablePagedBytesReference.java | 8 +-- .../common/compress/Compressor.java | 5 -- .../common/compress/DeflateCompressor.java | 9 ++- .../common/io/{stream => }/BytesStream.java | 8 +-- .../common/io/ReleasableBytesStream.java | 32 +++++++++++ .../org/elasticsearch/common/io/Streams.java | 55 ------------------- .../common/io/stream/BytesStreamOutput.java | 9 +-- .../stream/ReleasableBytesStreamOutput.java | 39 ++----------- .../common/xcontent/XContentBuilder.java | 5 +- .../index/translog/Translog.java | 4 +- .../rest/AbstractRestChannel.java | 19 +------ .../elasticsearch/rest/BytesRestResponse.java | 6 +- .../elasticsearch/rest/RestController.java | 10 ++-- .../elasticsearch/transport/TcpTransport.java | 33 +++++------ .../ReleasableBytesStreamOutputTests.java | 51 ----------------- .../http/netty4/Netty4HttpChannel.java | 19 ++----- .../http/netty4/Netty4HttpChannelTests.java | 41 +------------- 17 files changed, 88 insertions(+), 265 deletions(-) rename core/src/main/java/org/elasticsearch/common/io/{stream => }/BytesStream.java (85%) create mode 100644 core/src/main/java/org/elasticsearch/common/io/ReleasableBytesStream.java delete mode 100644 core/src/test/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutputTests.java diff --git a/core/src/main/java/org/elasticsearch/common/bytes/ReleasablePagedBytesReference.java b/core/src/main/java/org/elasticsearch/common/bytes/ReleasablePagedBytesReference.java index ac90e546f7eb..2700ea4dc135 100644 --- a/core/src/main/java/org/elasticsearch/common/bytes/ReleasablePagedBytesReference.java +++ b/core/src/main/java/org/elasticsearch/common/bytes/ReleasablePagedBytesReference.java @@ -30,17 +30,13 @@ import org.elasticsearch.common.util.ByteArray; */ public final class ReleasablePagedBytesReference extends PagedBytesReference implements Releasable { - private final Releasable releasable; - - public ReleasablePagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int length, - Releasable releasable) { + public ReleasablePagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int length) { super(bigarrays, byteArray, length); - this.releasable = releasable; } @Override public void close() { - Releasables.close(releasable); + Releasables.close(byteArray); } } diff --git a/core/src/main/java/org/elasticsearch/common/compress/Compressor.java b/core/src/main/java/org/elasticsearch/common/compress/Compressor.java index b39e7f6e142f..05706debd371 100644 --- a/core/src/main/java/org/elasticsearch/common/compress/Compressor.java +++ b/core/src/main/java/org/elasticsearch/common/compress/Compressor.java @@ -20,7 +20,6 @@ package org.elasticsearch.common.compress; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -32,9 +31,5 @@ public interface Compressor { StreamInput streamInput(StreamInput in) throws IOException; - /** - * Creates a new stream output that compresses the contents and writes to the provided stream - * output. Closing the returned {@link StreamOutput} will close the provided stream output. - */ StreamOutput streamOutput(StreamOutput out) throws IOException; } diff --git a/core/src/main/java/org/elasticsearch/common/compress/DeflateCompressor.java b/core/src/main/java/org/elasticsearch/common/compress/DeflateCompressor.java index 794a8db4960c..42e2efa358cf 100644 --- a/core/src/main/java/org/elasticsearch/common/compress/DeflateCompressor.java +++ b/core/src/main/java/org/elasticsearch/common/compress/DeflateCompressor.java @@ -20,6 +20,7 @@ package org.elasticsearch.common.compress; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.compress.Compressor; import org.elasticsearch.common.io.stream.InputStreamStreamInput; import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; @@ -46,7 +47,7 @@ public class DeflateCompressor implements Compressor { // It needs to be different from other compressors and to not be specific // enough so that no stream starting with these bytes could be detected as // a XContent - private static final byte[] HEADER = new byte[]{'D', 'F', 'L', '\0'}; + private static final byte[] HEADER = new byte[] { 'D', 'F', 'L', '\0' }; // 3 is a good trade-off between speed and compression ratio private static final int LEVEL = 3; // We use buffering on the input and output of in/def-laters in order to @@ -87,7 +88,6 @@ public class DeflateCompressor implements Compressor { decompressedIn = new BufferedInputStream(decompressedIn, BUFFER_SIZE); return new InputStreamStreamInput(decompressedIn) { final AtomicBoolean closed = new AtomicBoolean(false); - public void close() throws IOException { try { super.close(); @@ -107,11 +107,10 @@ public class DeflateCompressor implements Compressor { final boolean nowrap = true; final Deflater deflater = new Deflater(LEVEL, nowrap); final boolean syncFlush = true; - DeflaterOutputStream deflaterOutputStream = new DeflaterOutputStream(out, deflater, BUFFER_SIZE, syncFlush); - OutputStream compressedOut = new BufferedOutputStream(deflaterOutputStream, BUFFER_SIZE); + OutputStream compressedOut = new DeflaterOutputStream(out, deflater, BUFFER_SIZE, syncFlush); + compressedOut = new BufferedOutputStream(compressedOut, BUFFER_SIZE); return new OutputStreamStreamOutput(compressedOut) { final AtomicBoolean closed = new AtomicBoolean(false); - public void close() throws IOException { try { super.close(); diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/BytesStream.java b/core/src/main/java/org/elasticsearch/common/io/BytesStream.java similarity index 85% rename from core/src/main/java/org/elasticsearch/common/io/stream/BytesStream.java rename to core/src/main/java/org/elasticsearch/common/io/BytesStream.java index c20dcf62c9bb..903c1dcb7996 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/BytesStream.java +++ b/core/src/main/java/org/elasticsearch/common/io/BytesStream.java @@ -17,11 +17,11 @@ * under the License. */ -package org.elasticsearch.common.io.stream; +package org.elasticsearch.common.io; import org.elasticsearch.common.bytes.BytesReference; -public abstract class BytesStream extends StreamOutput { +public interface BytesStream { - public abstract BytesReference bytes(); -} + BytesReference bytes(); +} \ No newline at end of file diff --git a/core/src/main/java/org/elasticsearch/common/io/ReleasableBytesStream.java b/core/src/main/java/org/elasticsearch/common/io/ReleasableBytesStream.java new file mode 100644 index 000000000000..e31f206bcad9 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/common/io/ReleasableBytesStream.java @@ -0,0 +1,32 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.io; + +import org.elasticsearch.common.bytes.ReleasablePagedBytesReference; + +/** + * A bytes stream that requires its bytes to be released once no longer used. + */ +public interface ReleasableBytesStream extends BytesStream { + + @Override + ReleasablePagedBytesReference bytes(); + +} diff --git a/core/src/main/java/org/elasticsearch/common/io/Streams.java b/core/src/main/java/org/elasticsearch/common/io/Streams.java index f24b703251b7..f922fde3e753 100644 --- a/core/src/main/java/org/elasticsearch/common/io/Streams.java +++ b/core/src/main/java/org/elasticsearch/common/io/Streams.java @@ -20,9 +20,6 @@ package org.elasticsearch.common.io; import org.apache.lucene.util.IOUtils; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.io.stream.BytesStream; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.Callback; import java.io.BufferedReader; @@ -239,56 +236,4 @@ public abstract class Streams { } } } - - /** - * Wraps the given {@link BytesStream} in a {@link StreamOutput} that simply flushes when - * close is called. - */ - public static BytesStream flushOnCloseStream(BytesStream os) { - return new FlushOnCloseOutputStream(os); - } - - /** - * A wrapper around a {@link BytesStream} that makes the close operation a flush. This is - * needed as sometimes a stream will be closed but the bytes that the stream holds still need - * to be used and the stream cannot be closed until the bytes have been consumed. - */ - private static class FlushOnCloseOutputStream extends BytesStream { - - private final BytesStream delegate; - - private FlushOnCloseOutputStream(BytesStream bytesStreamOutput) { - this.delegate = bytesStreamOutput; - } - - @Override - public void writeByte(byte b) throws IOException { - delegate.writeByte(b); - } - - @Override - public void writeBytes(byte[] b, int offset, int length) throws IOException { - delegate.writeBytes(b, offset, length); - } - - @Override - public void flush() throws IOException { - delegate.flush(); - } - - @Override - public void close() throws IOException { - flush(); - } - - @Override - public void reset() throws IOException { - delegate.reset(); - } - - @Override - public BytesReference bytes() { - return delegate.bytes(); - } - } } diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java b/core/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java index ab9a1896ef7c..e65e8efb27b0 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java @@ -21,6 +21,7 @@ package org.elasticsearch.common.io.stream; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.PagedBytesReference; +import org.elasticsearch.common.io.BytesStream; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.ByteArray; @@ -30,7 +31,7 @@ import java.io.IOException; * A @link {@link StreamOutput} that uses {@link BigArrays} to acquire pages of * bytes, which avoids frequent reallocation & copying of the internal data. */ -public class BytesStreamOutput extends BytesStream { +public class BytesStreamOutput extends StreamOutput implements BytesStream { protected final BigArrays bigArrays; @@ -49,7 +50,7 @@ public class BytesStreamOutput extends BytesStream { /** * Create a non recycling {@link BytesStreamOutput} with enough initial pages acquired * to satisfy the capacity given by expected size. - * + * * @param expectedSize the expected maximum size of the stream in bytes. */ public BytesStreamOutput(int expectedSize) { @@ -128,7 +129,7 @@ public class BytesStreamOutput extends BytesStream { /** * Returns the current size of the buffer. - * + * * @return the value of the count field, which is the number of valid * bytes in this output stream. * @see java.io.ByteArrayOutputStream#count @@ -150,7 +151,7 @@ public class BytesStreamOutput extends BytesStream { return bytes.ramBytesUsed(); } - void ensureCapacity(long offset) { + private void ensureCapacity(long offset) { if (offset > Integer.MAX_VALUE) { throw new IllegalArgumentException(getClass().getSimpleName() + " cannot hold more than 2GB of data"); } diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java b/core/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java index 347897c2ecba..674ff18f0fc1 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java @@ -20,56 +20,29 @@ package org.elasticsearch.common.io.stream; import org.elasticsearch.common.bytes.ReleasablePagedBytesReference; -import org.elasticsearch.common.lease.Releasable; -import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.io.ReleasableBytesStream; import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.common.util.ByteArray; /** * An bytes stream output that allows providing a {@link BigArrays} instance * expecting it to require releasing its content ({@link #bytes()}) once done. *

- * Please note, closing this stream will release the bytes that are in use by any - * {@link ReleasablePagedBytesReference} returned from {@link #bytes()}, so this - * stream should only be closed after the bytes have been output or copied - * elsewhere. + * Please note, its is the responsibility of the caller to make sure the bytes + * reference do not "escape" and are released only once. */ -public class ReleasableBytesStreamOutput extends BytesStreamOutput - implements Releasable { - - private Releasable releasable; +public class ReleasableBytesStreamOutput extends BytesStreamOutput implements ReleasableBytesStream { public ReleasableBytesStreamOutput(BigArrays bigarrays) { - this(BigArrays.PAGE_SIZE_IN_BYTES, bigarrays); + super(BigArrays.PAGE_SIZE_IN_BYTES, bigarrays); } public ReleasableBytesStreamOutput(int expectedSize, BigArrays bigArrays) { super(expectedSize, bigArrays); - this.releasable = Releasables.releaseOnce(this.bytes); } - /** - * Returns a {@link Releasable} implementation of a - * {@link org.elasticsearch.common.bytes.BytesReference} that represents the current state of - * the bytes in the stream. - */ @Override public ReleasablePagedBytesReference bytes() { - return new ReleasablePagedBytesReference(bigArrays, bytes, count, releasable); + return new ReleasablePagedBytesReference(bigArrays, bytes, count); } - @Override - public void close() { - Releasables.close(releasable); - } - - @Override - void ensureCapacity(long offset) { - final ByteArray prevBytes = this.bytes; - super.ensureCapacity(offset); - if (prevBytes != this.bytes) { - // re-create the releasable with the new reference - releasable = Releasables.releaseOnce(this.bytes); - } - } } diff --git a/core/src/main/java/org/elasticsearch/common/xcontent/XContentBuilder.java b/core/src/main/java/org/elasticsearch/common/xcontent/XContentBuilder.java index f0427ce24666..189e9d3c8d5d 100644 --- a/core/src/main/java/org/elasticsearch/common/xcontent/XContentBuilder.java +++ b/core/src/main/java/org/elasticsearch/common/xcontent/XContentBuilder.java @@ -22,7 +22,7 @@ package org.elasticsearch.common.xcontent; import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.geo.GeoPoint; -import org.elasticsearch.common.io.stream.BytesStream; +import org.elasticsearch.common.io.BytesStream; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.text.Text; @@ -53,7 +53,7 @@ import java.util.concurrent.TimeUnit; /** * A utility to build XContent (ie json). */ -public final class XContentBuilder implements Releasable, Flushable { +public final class XContentBuilder implements BytesStream, Releasable, Flushable { /** * Create a new {@link XContentBuilder} using the given {@link XContent} content. @@ -1041,6 +1041,7 @@ public final class XContentBuilder implements Releasable, Flushable { return this.generator; } + @Override public BytesReference bytes() { close(); return ((BytesStream) bos).bytes(); diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 4bad4593bc14..d9a8cc408f82 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -439,7 +439,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC } throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e); } finally { - Releasables.close(out); + Releasables.close(out.bytes()); } } @@ -1332,7 +1332,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC bytes.writeTo(outStream); } } finally { - Releasables.close(out); + Releasables.close(out.bytes()); } } diff --git a/core/src/main/java/org/elasticsearch/rest/AbstractRestChannel.java b/core/src/main/java/org/elasticsearch/rest/AbstractRestChannel.java index 4db9aec6e93e..bdc78c82dd50 100644 --- a/core/src/main/java/org/elasticsearch/rest/AbstractRestChannel.java +++ b/core/src/main/java/org/elasticsearch/rest/AbstractRestChannel.java @@ -20,14 +20,12 @@ package org.elasticsearch.rest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import java.io.IOException; -import java.io.OutputStream; import java.util.Collections; import java.util.Set; import java.util.function.Predicate; @@ -99,9 +97,7 @@ public abstract class AbstractRestChannel implements RestChannel { excludes = filters.stream().filter(EXCLUDE_FILTER).map(f -> f.substring(1)).collect(toSet()); } - OutputStream unclosableOutputStream = Streams.flushOnCloseStream(bytesOutput()); - XContentBuilder builder = - new XContentBuilder(XContentFactory.xContent(responseContentType), unclosableOutputStream, includes, excludes); + XContentBuilder builder = new XContentBuilder(XContentFactory.xContent(responseContentType), bytesOutput(), includes, excludes); if (pretty) { builder.prettyPrint().lfAtEnd(); } @@ -111,9 +107,8 @@ public abstract class AbstractRestChannel implements RestChannel { } /** - * A channel level bytes output that can be reused. The bytes output is lazily instantiated - * by a call to {@link #newBytesOutput()}. Once the stream is created, it gets reset on each - * call to this method. + * A channel level bytes output that can be reused. It gets reset on each call to this + * method. */ @Override public final BytesStreamOutput bytesOutput() { @@ -125,14 +120,6 @@ public abstract class AbstractRestChannel implements RestChannel { return bytesOut; } - /** - * An accessor to the raw value of the channel bytes output. This method will not instantiate - * a new stream if one does not exist and this method will not reset the stream. - */ - protected final BytesStreamOutput bytesOutputOrNull() { - return bytesOut; - } - protected BytesStreamOutput newBytesOutput() { return new BytesStreamOutput(); } diff --git a/core/src/main/java/org/elasticsearch/rest/BytesRestResponse.java b/core/src/main/java/org/elasticsearch/rest/BytesRestResponse.java index c16f862fb4af..72ee7efc4890 100644 --- a/core/src/main/java/org/elasticsearch/rest/BytesRestResponse.java +++ b/core/src/main/java/org/elasticsearch/rest/BytesRestResponse.java @@ -30,7 +30,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; @@ -147,8 +147,8 @@ public class BytesRestResponse extends RestResponse { return builder; } - static BytesRestResponse createSimpleErrorResponse(RestChannel channel, RestStatus status, String errorMessage) throws IOException { - return new BytesRestResponse(status, channel.newErrorBuilder().startObject() + static BytesRestResponse createSimpleErrorResponse(RestStatus status, String errorMessage) throws IOException { + return new BytesRestResponse(status, JsonXContent.contentBuilder().startObject() .field("error", errorMessage) .field("status", status.getStatus()) .endObject()); diff --git a/core/src/main/java/org/elasticsearch/rest/RestController.java b/core/src/main/java/org/elasticsearch/rest/RestController.java index a3d8a4b7db5f..ea603cf949f2 100644 --- a/core/src/main/java/org/elasticsearch/rest/RestController.java +++ b/core/src/main/java/org/elasticsearch/rest/RestController.java @@ -178,9 +178,8 @@ public class RestController extends AbstractComponent implements HttpServerTrans sendContentTypeErrorMessage(request, responseChannel); } else if (contentLength > 0 && handler != null && handler.supportsContentStream() && request.getXContentType() != XContentType.JSON && request.getXContentType() != XContentType.SMILE) { - responseChannel.sendResponse(BytesRestResponse.createSimpleErrorResponse(responseChannel, - RestStatus.NOT_ACCEPTABLE, "Content-Type [" + request.getXContentType() + - "] does not support stream parsing. Use JSON or SMILE instead")); + responseChannel.sendResponse(BytesRestResponse.createSimpleErrorResponse(RestStatus.NOT_ACCEPTABLE, "Content-Type [" + + request.getXContentType() + "] does not support stream parsing. Use JSON or SMILE instead")); } else { if (canTripCircuitBreaker(request)) { inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, ""); @@ -230,8 +229,7 @@ public class RestController extends AbstractComponent implements HttpServerTrans void dispatchRequest(final RestRequest request, final RestChannel channel, final NodeClient client, ThreadContext threadContext, final RestHandler handler) throws Exception { if (checkRequestParameters(request, channel) == false) { - channel - .sendResponse(BytesRestResponse.createSimpleErrorResponse(channel,BAD_REQUEST, "error traces in responses are disabled.")); + channel.sendResponse(BytesRestResponse.createSimpleErrorResponse(BAD_REQUEST, "error traces in responses are disabled.")); } else { for (String key : headersToCopy) { String httpHeader = request.header(key); @@ -285,7 +283,7 @@ public class RestController extends AbstractComponent implements HttpServerTrans Strings.collectionToCommaDelimitedString(restRequest.getAllHeaderValues("Content-Type")) + "] is not supported"; } - channel.sendResponse(BytesRestResponse.createSimpleErrorResponse(channel, NOT_ACCEPTABLE, errorMessage)); + channel.sendResponse(BytesRestResponse.createSimpleErrorResponse(NOT_ACCEPTABLE, errorMessage)); } /** diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index cbdc0dfa1785..dd75ae295562 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -41,7 +41,7 @@ import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.compress.Compressor; import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.compress.NotCompressedException; -import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.io.ReleasableBytesStream; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -1025,8 +1025,10 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i } status = TransportStatus.setRequest(status); ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays); - boolean addedReleaseListener = false; - StreamOutput stream = Streams.flushOnCloseStream(bStream); + // we wrap this in a release once since if the onRequestSent callback throws an exception + // we might release things twice and this should be prevented + final Releasable toRelease = Releasables.releaseOnce(() -> Releasables.close(bStream.bytes())); + StreamOutput stream = bStream; try { // only compress if asked, and, the request is not bytes, since then only // the header part is compressed, and the "body" can't be extracted as compressed @@ -1045,17 +1047,12 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i stream.writeString(action); BytesReference message = buildMessage(requestId, status, node.getVersion(), request, stream, bStream); final TransportRequestOptions finalOptions = options; - final StreamOutput finalStream = stream; // this might be called in a different thread - SendListener onRequestSent = new SendListener( - () -> IOUtils.closeWhileHandlingException(finalStream, bStream), + SendListener onRequestSent = new SendListener(toRelease, () -> transportServiceAdapter.onRequestSent(node, requestId, action, request, finalOptions)); internalSendMessage(targetChannel, message, onRequestSent); - addedReleaseListener = true; } finally { - if (!addedReleaseListener) { - IOUtils.close(stream, bStream); - } + IOUtils.close(stream); } } @@ -1117,8 +1114,10 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i } status = TransportStatus.setResponse(status); // TODO share some code with sendRequest ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays); - boolean addedReleaseListener = false; - StreamOutput stream = Streams.flushOnCloseStream(bStream); + // we wrap this in a release once since if the onRequestSent callback throws an exception + // we might release things twice and this should be prevented + final Releasable toRelease = Releasables.releaseOnce(() -> Releasables.close(bStream.bytes())); + StreamOutput stream = bStream; try { if (options.compress()) { status = TransportStatus.setCompress(status); @@ -1129,16 +1128,12 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i BytesReference reference = buildMessage(requestId, status, nodeVersion, response, stream, bStream); final TransportResponseOptions finalOptions = options; - final StreamOutput finalStream = stream; // this might be called in a different thread - SendListener listener = new SendListener(() -> IOUtils.closeWhileHandlingException(finalStream, bStream), + SendListener listener = new SendListener(toRelease, () -> transportServiceAdapter.onResponseSent(requestId, action, response, finalOptions)); internalSendMessage(channel, reference, listener); - addedReleaseListener = true; } finally { - if (!addedReleaseListener) { - IOUtils.close(stream, bStream); - } + IOUtils.close(stream); } } @@ -1166,7 +1161,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i * Serializes the given message into a bytes representation */ private BytesReference buildMessage(long requestId, byte status, Version nodeVersion, TransportMessage message, StreamOutput stream, - ReleasableBytesStreamOutput writtenBytes) throws IOException { + ReleasableBytesStream writtenBytes) throws IOException { final BytesReference zeroCopyBuffer; if (message instanceof BytesTransportRequest) { // what a shitty optimization - we should use a direct send method instead BytesTransportRequest bRequest = (BytesTransportRequest) message; diff --git a/core/src/test/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutputTests.java b/core/src/test/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutputTests.java deleted file mode 100644 index 557721a0241a..000000000000 --- a/core/src/test/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutputTests.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.io.stream; - -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.MockBigArrays; -import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; -import org.elasticsearch.test.ESTestCase; - -import java.io.IOException; - -public class ReleasableBytesStreamOutputTests extends ESTestCase { - - public void testRelease() throws Exception { - MockBigArrays mockBigArrays = - new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService()); - try (ReleasableBytesStreamOutput output = - getRandomReleasableBytesStreamOutput(mockBigArrays)) { - output.writeBoolean(randomBoolean()); - } - MockBigArrays.ensureAllArraysAreReleased(); - } - - private ReleasableBytesStreamOutput getRandomReleasableBytesStreamOutput( - MockBigArrays mockBigArrays) throws IOException { - ReleasableBytesStreamOutput output = new ReleasableBytesStreamOutput(mockBigArrays); - if (randomBoolean()) { - for (int i = 0; i < scaledRandomIntBetween(1, 32); i++) { - output.write(randomByte()); - } - } - return output; - } -} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java index 07e91ec50e44..a4259b41fd82 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java @@ -85,7 +85,7 @@ final class Netty4HttpChannel extends AbstractRestChannel { } @Override - protected BytesStreamOutput newBytesOutput() { + public BytesStreamOutput newBytesOutput() { return new ReleasableBytesStreamOutput(transport.bigArrays); } @@ -114,8 +114,7 @@ final class Netty4HttpChannel extends AbstractRestChannel { addCustomHeaders(resp, threadContext.getResponseHeaders()); BytesReference content = response.content(); - boolean releaseContent = content instanceof Releasable; - boolean releaseBytesStreamOutput = bytesOutputOrNull() instanceof ReleasableBytesStreamOutput; + boolean release = content instanceof Releasable; try { // If our response doesn't specify a content-type header, set one setHeaderField(resp, HttpHeaderNames.CONTENT_TYPE.toString(), response.contentType(), false); @@ -126,14 +125,10 @@ final class Netty4HttpChannel extends AbstractRestChannel { final ChannelPromise promise = channel.newPromise(); - if (releaseContent) { + if (release) { promise.addListener(f -> ((Releasable)content).close()); } - if (releaseBytesStreamOutput) { - promise.addListener(f -> bytesOutputOrNull().close()); - } - if (isCloseConnection()) { promise.addListener(ChannelFutureListener.CLOSE); } @@ -145,15 +140,11 @@ final class Netty4HttpChannel extends AbstractRestChannel { msg = resp; } channel.writeAndFlush(msg, promise); - releaseContent = false; - releaseBytesStreamOutput = false; + release = false; } finally { - if (releaseContent) { + if (release) { ((Releasable) content).close(); } - if (releaseBytesStreamOutput) { - bytesOutputOrNull().close(); - } if (pipelinedRequest != null) { pipelinedRequest.release(); } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpChannelTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpChannelTests.java index 7d8101df10ea..c075afd463f4 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpChannelTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpChannelTests.java @@ -43,24 +43,18 @@ import io.netty.util.Attribute; import io.netty.util.AttributeKey; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.ReleasablePagedBytesReference; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.lease.Releasable; -import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.ByteArray; import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.xcontent.NamedXContentRegistry; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.http.HttpTransportSettings; import org.elasticsearch.http.NullDispatcher; import org.elasticsearch.http.netty4.cors.Netty4CorsHandler; import org.elasticsearch.http.netty4.pipelining.HttpPipelinedRequest; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; -import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; @@ -70,7 +64,6 @@ import org.elasticsearch.transport.netty4.Netty4Utils; import org.junit.After; import org.junit.Before; -import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.SocketAddress; import java.nio.charset.StandardCharsets; @@ -85,7 +78,6 @@ import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ENABLED; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -251,37 +243,6 @@ public class Netty4HttpChannelTests extends ESTestCase { } } - public void testReleaseOnSendToChannelAfterException() throws IOException { - final Settings settings = Settings.builder().build(); - final NamedXContentRegistry registry = xContentRegistry(); - try (Netty4HttpServerTransport httpServerTransport = - new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, registry, new NullDispatcher())) { - final FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); - final EmbeddedChannel embeddedChannel = new EmbeddedChannel(); - final Netty4HttpRequest request = new Netty4HttpRequest(registry, httpRequest, embeddedChannel); - final HttpPipelinedRequest pipelinedRequest = randomBoolean() ? new HttpPipelinedRequest(request.request(), 1) : null; - final Netty4HttpChannel channel = - new Netty4HttpChannel(httpServerTransport, request, pipelinedRequest, randomBoolean(), threadPool.getThreadContext()); - final BytesRestResponse response = new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, - JsonXContent.contentBuilder().startObject().endObject()); - assertThat(response.content(), not(instanceOf(Releasable.class))); - - // ensure we have reserved bytes - if (randomBoolean()) { - BytesStreamOutput out = channel.bytesOutput(); - assertThat(out, instanceOf(ReleasableBytesStreamOutput.class)); - } else { - try (XContentBuilder builder = channel.newBuilder()) { - // do something builder - builder.startObject().endObject(); - } - } - - channel.sendResponse(response); - // ESTestCase#after will invoke ensureAllArraysAreReleased which will fail if the response content was not released - } - } - public void testConnectionClose() throws Exception { final Settings settings = Settings.builder().build(); try (Netty4HttpServerTransport httpServerTransport = @@ -588,7 +549,7 @@ public class Netty4HttpChannelTests extends ESTestCase { } final ByteArray bigArray = bigArrays.newByteArray(bytes.length); bigArray.set(0, bytes, 0, bytes.length); - reference = new ReleasablePagedBytesReference(bigArrays, bigArray, bytes.length, Releasables.releaseOnce(bigArray)); + reference = new ReleasablePagedBytesReference(bigArrays, bigArray, bytes.length); } @Override