Revert "Closing a ReleasableBytesStreamOutput closes the underlying BigArray (#23572)"

This reverts commit 6bfecdf921.
This commit is contained in:
Jason Tedor 2017-04-04 20:33:51 -04:00
parent d31d2caf09
commit afd45c1432
17 changed files with 88 additions and 265 deletions

View file

@ -30,17 +30,13 @@ import org.elasticsearch.common.util.ByteArray;
*/ */
public final class ReleasablePagedBytesReference extends PagedBytesReference implements Releasable { public final class ReleasablePagedBytesReference extends PagedBytesReference implements Releasable {
private final Releasable releasable; public ReleasablePagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int length) {
public ReleasablePagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int length,
Releasable releasable) {
super(bigarrays, byteArray, length); super(bigarrays, byteArray, length);
this.releasable = releasable;
} }
@Override @Override
public void close() { public void close() {
Releasables.close(releasable); Releasables.close(byteArray);
} }
} }

View file

@ -20,7 +20,6 @@
package org.elasticsearch.common.compress; package org.elasticsearch.common.compress;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
@ -32,9 +31,5 @@ public interface Compressor {
StreamInput streamInput(StreamInput in) throws IOException; StreamInput streamInput(StreamInput in) throws IOException;
/**
* Creates a new stream output that compresses the contents and writes to the provided stream
* output. Closing the returned {@link StreamOutput} will close the provided stream output.
*/
StreamOutput streamOutput(StreamOutput out) throws IOException; StreamOutput streamOutput(StreamOutput out) throws IOException;
} }

View file

@ -20,6 +20,7 @@
package org.elasticsearch.common.compress; package org.elasticsearch.common.compress;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.io.stream.InputStreamStreamInput; import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
@ -87,7 +88,6 @@ public class DeflateCompressor implements Compressor {
decompressedIn = new BufferedInputStream(decompressedIn, BUFFER_SIZE); decompressedIn = new BufferedInputStream(decompressedIn, BUFFER_SIZE);
return new InputStreamStreamInput(decompressedIn) { return new InputStreamStreamInput(decompressedIn) {
final AtomicBoolean closed = new AtomicBoolean(false); final AtomicBoolean closed = new AtomicBoolean(false);
public void close() throws IOException { public void close() throws IOException {
try { try {
super.close(); super.close();
@ -107,11 +107,10 @@ public class DeflateCompressor implements Compressor {
final boolean nowrap = true; final boolean nowrap = true;
final Deflater deflater = new Deflater(LEVEL, nowrap); final Deflater deflater = new Deflater(LEVEL, nowrap);
final boolean syncFlush = true; final boolean syncFlush = true;
DeflaterOutputStream deflaterOutputStream = new DeflaterOutputStream(out, deflater, BUFFER_SIZE, syncFlush); OutputStream compressedOut = new DeflaterOutputStream(out, deflater, BUFFER_SIZE, syncFlush);
OutputStream compressedOut = new BufferedOutputStream(deflaterOutputStream, BUFFER_SIZE); compressedOut = new BufferedOutputStream(compressedOut, BUFFER_SIZE);
return new OutputStreamStreamOutput(compressedOut) { return new OutputStreamStreamOutput(compressedOut) {
final AtomicBoolean closed = new AtomicBoolean(false); final AtomicBoolean closed = new AtomicBoolean(false);
public void close() throws IOException { public void close() throws IOException {
try { try {
super.close(); super.close();

View file

@ -17,11 +17,11 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.common.io.stream; package org.elasticsearch.common.io;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
public abstract class BytesStream extends StreamOutput { public interface BytesStream {
public abstract BytesReference bytes(); BytesReference bytes();
} }

View file

@ -0,0 +1,32 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io;
import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
/**
* A bytes stream that requires its bytes to be released once no longer used.
*/
public interface ReleasableBytesStream extends BytesStream {
@Override
ReleasablePagedBytesReference bytes();
}

View file

@ -20,9 +20,6 @@
package org.elasticsearch.common.io; package org.elasticsearch.common.io;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStream;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.Callback; import org.elasticsearch.common.util.Callback;
import java.io.BufferedReader; import java.io.BufferedReader;
@ -239,56 +236,4 @@ public abstract class Streams {
} }
} }
} }
/**
* Wraps the given {@link BytesStream} in a {@link StreamOutput} that simply flushes when
* close is called.
*/
public static BytesStream flushOnCloseStream(BytesStream os) {
return new FlushOnCloseOutputStream(os);
}
/**
* A wrapper around a {@link BytesStream} that makes the close operation a flush. This is
* needed as sometimes a stream will be closed but the bytes that the stream holds still need
* to be used and the stream cannot be closed until the bytes have been consumed.
*/
private static class FlushOnCloseOutputStream extends BytesStream {
private final BytesStream delegate;
private FlushOnCloseOutputStream(BytesStream bytesStreamOutput) {
this.delegate = bytesStreamOutput;
}
@Override
public void writeByte(byte b) throws IOException {
delegate.writeByte(b);
}
@Override
public void writeBytes(byte[] b, int offset, int length) throws IOException {
delegate.writeBytes(b, offset, length);
}
@Override
public void flush() throws IOException {
delegate.flush();
}
@Override
public void close() throws IOException {
flush();
}
@Override
public void reset() throws IOException {
delegate.reset();
}
@Override
public BytesReference bytes() {
return delegate.bytes();
}
}
} }

View file

@ -21,6 +21,7 @@ package org.elasticsearch.common.io.stream;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.PagedBytesReference; import org.elasticsearch.common.bytes.PagedBytesReference;
import org.elasticsearch.common.io.BytesStream;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray; import org.elasticsearch.common.util.ByteArray;
@ -30,7 +31,7 @@ import java.io.IOException;
* A @link {@link StreamOutput} that uses {@link BigArrays} to acquire pages of * A @link {@link StreamOutput} that uses {@link BigArrays} to acquire pages of
* bytes, which avoids frequent reallocation & copying of the internal data. * bytes, which avoids frequent reallocation & copying of the internal data.
*/ */
public class BytesStreamOutput extends BytesStream { public class BytesStreamOutput extends StreamOutput implements BytesStream {
protected final BigArrays bigArrays; protected final BigArrays bigArrays;
@ -150,7 +151,7 @@ public class BytesStreamOutput extends BytesStream {
return bytes.ramBytesUsed(); return bytes.ramBytesUsed();
} }
void ensureCapacity(long offset) { private void ensureCapacity(long offset) {
if (offset > Integer.MAX_VALUE) { if (offset > Integer.MAX_VALUE) {
throw new IllegalArgumentException(getClass().getSimpleName() + " cannot hold more than 2GB of data"); throw new IllegalArgumentException(getClass().getSimpleName() + " cannot hold more than 2GB of data");
} }

View file

@ -20,56 +20,29 @@
package org.elasticsearch.common.io.stream; package org.elasticsearch.common.io.stream;
import org.elasticsearch.common.bytes.ReleasablePagedBytesReference; import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.io.ReleasableBytesStream;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;
/** /**
* An bytes stream output that allows providing a {@link BigArrays} instance * An bytes stream output that allows providing a {@link BigArrays} instance
* expecting it to require releasing its content ({@link #bytes()}) once done. * expecting it to require releasing its content ({@link #bytes()}) once done.
* <p> * <p>
* Please note, closing this stream will release the bytes that are in use by any * Please note, its is the responsibility of the caller to make sure the bytes
* {@link ReleasablePagedBytesReference} returned from {@link #bytes()}, so this * reference do not "escape" and are released only once.
* stream should only be closed after the bytes have been output or copied
* elsewhere.
*/ */
public class ReleasableBytesStreamOutput extends BytesStreamOutput public class ReleasableBytesStreamOutput extends BytesStreamOutput implements ReleasableBytesStream {
implements Releasable {
private Releasable releasable;
public ReleasableBytesStreamOutput(BigArrays bigarrays) { public ReleasableBytesStreamOutput(BigArrays bigarrays) {
this(BigArrays.PAGE_SIZE_IN_BYTES, bigarrays); super(BigArrays.PAGE_SIZE_IN_BYTES, bigarrays);
} }
public ReleasableBytesStreamOutput(int expectedSize, BigArrays bigArrays) { public ReleasableBytesStreamOutput(int expectedSize, BigArrays bigArrays) {
super(expectedSize, bigArrays); super(expectedSize, bigArrays);
this.releasable = Releasables.releaseOnce(this.bytes);
} }
/**
* Returns a {@link Releasable} implementation of a
* {@link org.elasticsearch.common.bytes.BytesReference} that represents the current state of
* the bytes in the stream.
*/
@Override @Override
public ReleasablePagedBytesReference bytes() { public ReleasablePagedBytesReference bytes() {
return new ReleasablePagedBytesReference(bigArrays, bytes, count, releasable); return new ReleasablePagedBytesReference(bigArrays, bytes, count);
} }
@Override
public void close() {
Releasables.close(releasable);
}
@Override
void ensureCapacity(long offset) {
final ByteArray prevBytes = this.bytes;
super.ensureCapacity(offset);
if (prevBytes != this.bytes) {
// re-create the releasable with the new reference
releasable = Releasables.releaseOnce(this.bytes);
}
}
} }

View file

@ -22,7 +22,7 @@ package org.elasticsearch.common.xcontent;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.geo.GeoPoint; import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.io.stream.BytesStream; import org.elasticsearch.common.io.BytesStream;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.text.Text; import org.elasticsearch.common.text.Text;
@ -53,7 +53,7 @@ import java.util.concurrent.TimeUnit;
/** /**
* A utility to build XContent (ie json). * A utility to build XContent (ie json).
*/ */
public final class XContentBuilder implements Releasable, Flushable { public final class XContentBuilder implements BytesStream, Releasable, Flushable {
/** /**
* Create a new {@link XContentBuilder} using the given {@link XContent} content. * Create a new {@link XContentBuilder} using the given {@link XContent} content.
@ -1041,6 +1041,7 @@ public final class XContentBuilder implements Releasable, Flushable {
return this.generator; return this.generator;
} }
@Override
public BytesReference bytes() { public BytesReference bytes() {
close(); close();
return ((BytesStream) bos).bytes(); return ((BytesStream) bos).bytes();

View file

@ -439,7 +439,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
} }
throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e); throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e);
} finally { } finally {
Releasables.close(out); Releasables.close(out.bytes());
} }
} }
@ -1332,7 +1332,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
bytes.writeTo(outStream); bytes.writeTo(outStream);
} }
} finally { } finally {
Releasables.close(out); Releasables.close(out.bytes());
} }
} }

View file

@ -20,14 +20,12 @@ package org.elasticsearch.rest;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections; import java.util.Collections;
import java.util.Set; import java.util.Set;
import java.util.function.Predicate; import java.util.function.Predicate;
@ -99,9 +97,7 @@ public abstract class AbstractRestChannel implements RestChannel {
excludes = filters.stream().filter(EXCLUDE_FILTER).map(f -> f.substring(1)).collect(toSet()); excludes = filters.stream().filter(EXCLUDE_FILTER).map(f -> f.substring(1)).collect(toSet());
} }
OutputStream unclosableOutputStream = Streams.flushOnCloseStream(bytesOutput()); XContentBuilder builder = new XContentBuilder(XContentFactory.xContent(responseContentType), bytesOutput(), includes, excludes);
XContentBuilder builder =
new XContentBuilder(XContentFactory.xContent(responseContentType), unclosableOutputStream, includes, excludes);
if (pretty) { if (pretty) {
builder.prettyPrint().lfAtEnd(); builder.prettyPrint().lfAtEnd();
} }
@ -111,9 +107,8 @@ public abstract class AbstractRestChannel implements RestChannel {
} }
/** /**
* A channel level bytes output that can be reused. The bytes output is lazily instantiated * A channel level bytes output that can be reused. It gets reset on each call to this
* by a call to {@link #newBytesOutput()}. Once the stream is created, it gets reset on each * method.
* call to this method.
*/ */
@Override @Override
public final BytesStreamOutput bytesOutput() { public final BytesStreamOutput bytesOutput() {
@ -125,14 +120,6 @@ public abstract class AbstractRestChannel implements RestChannel {
return bytesOut; return bytesOut;
} }
/**
* An accessor to the raw value of the channel bytes output. This method will not instantiate
* a new stream if one does not exist and this method will not reset the stream.
*/
protected final BytesStreamOutput bytesOutputOrNull() {
return bytesOut;
}
protected BytesStreamOutput newBytesOutput() { protected BytesStreamOutput newBytesOutput() {
return new BytesStreamOutput(); return new BytesStreamOutput();
} }

View file

@ -30,7 +30,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException; import java.io.IOException;
@ -147,8 +147,8 @@ public class BytesRestResponse extends RestResponse {
return builder; return builder;
} }
static BytesRestResponse createSimpleErrorResponse(RestChannel channel, RestStatus status, String errorMessage) throws IOException { static BytesRestResponse createSimpleErrorResponse(RestStatus status, String errorMessage) throws IOException {
return new BytesRestResponse(status, channel.newErrorBuilder().startObject() return new BytesRestResponse(status, JsonXContent.contentBuilder().startObject()
.field("error", errorMessage) .field("error", errorMessage)
.field("status", status.getStatus()) .field("status", status.getStatus())
.endObject()); .endObject());

View file

@ -178,9 +178,8 @@ public class RestController extends AbstractComponent implements HttpServerTrans
sendContentTypeErrorMessage(request, responseChannel); sendContentTypeErrorMessage(request, responseChannel);
} else if (contentLength > 0 && handler != null && handler.supportsContentStream() && } else if (contentLength > 0 && handler != null && handler.supportsContentStream() &&
request.getXContentType() != XContentType.JSON && request.getXContentType() != XContentType.SMILE) { request.getXContentType() != XContentType.JSON && request.getXContentType() != XContentType.SMILE) {
responseChannel.sendResponse(BytesRestResponse.createSimpleErrorResponse(responseChannel, responseChannel.sendResponse(BytesRestResponse.createSimpleErrorResponse(RestStatus.NOT_ACCEPTABLE, "Content-Type [" +
RestStatus.NOT_ACCEPTABLE, "Content-Type [" + request.getXContentType() + request.getXContentType() + "] does not support stream parsing. Use JSON or SMILE instead"));
"] does not support stream parsing. Use JSON or SMILE instead"));
} else { } else {
if (canTripCircuitBreaker(request)) { if (canTripCircuitBreaker(request)) {
inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>"); inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");
@ -230,8 +229,7 @@ public class RestController extends AbstractComponent implements HttpServerTrans
void dispatchRequest(final RestRequest request, final RestChannel channel, final NodeClient client, ThreadContext threadContext, void dispatchRequest(final RestRequest request, final RestChannel channel, final NodeClient client, ThreadContext threadContext,
final RestHandler handler) throws Exception { final RestHandler handler) throws Exception {
if (checkRequestParameters(request, channel) == false) { if (checkRequestParameters(request, channel) == false) {
channel channel.sendResponse(BytesRestResponse.createSimpleErrorResponse(BAD_REQUEST, "error traces in responses are disabled."));
.sendResponse(BytesRestResponse.createSimpleErrorResponse(channel,BAD_REQUEST, "error traces in responses are disabled."));
} else { } else {
for (String key : headersToCopy) { for (String key : headersToCopy) {
String httpHeader = request.header(key); String httpHeader = request.header(key);
@ -285,7 +283,7 @@ public class RestController extends AbstractComponent implements HttpServerTrans
Strings.collectionToCommaDelimitedString(restRequest.getAllHeaderValues("Content-Type")) + "] is not supported"; Strings.collectionToCommaDelimitedString(restRequest.getAllHeaderValues("Content-Type")) + "] is not supported";
} }
channel.sendResponse(BytesRestResponse.createSimpleErrorResponse(channel, NOT_ACCEPTABLE, errorMessage)); channel.sendResponse(BytesRestResponse.createSimpleErrorResponse(NOT_ACCEPTABLE, errorMessage));
} }
/** /**

View file

@ -41,7 +41,7 @@ import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.compress.Compressor; import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.compress.NotCompressedException; import org.elasticsearch.common.compress.NotCompressedException;
import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.ReleasableBytesStream;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@ -1025,8 +1025,10 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
} }
status = TransportStatus.setRequest(status); status = TransportStatus.setRequest(status);
ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays); ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);
boolean addedReleaseListener = false; // we wrap this in a release once since if the onRequestSent callback throws an exception
StreamOutput stream = Streams.flushOnCloseStream(bStream); // we might release things twice and this should be prevented
final Releasable toRelease = Releasables.releaseOnce(() -> Releasables.close(bStream.bytes()));
StreamOutput stream = bStream;
try { try {
// only compress if asked, and, the request is not bytes, since then only // only compress if asked, and, the request is not bytes, since then only
// the header part is compressed, and the "body" can't be extracted as compressed // the header part is compressed, and the "body" can't be extracted as compressed
@ -1045,17 +1047,12 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
stream.writeString(action); stream.writeString(action);
BytesReference message = buildMessage(requestId, status, node.getVersion(), request, stream, bStream); BytesReference message = buildMessage(requestId, status, node.getVersion(), request, stream, bStream);
final TransportRequestOptions finalOptions = options; final TransportRequestOptions finalOptions = options;
final StreamOutput finalStream = stream;
// this might be called in a different thread // this might be called in a different thread
SendListener onRequestSent = new SendListener( SendListener onRequestSent = new SendListener(toRelease,
() -> IOUtils.closeWhileHandlingException(finalStream, bStream),
() -> transportServiceAdapter.onRequestSent(node, requestId, action, request, finalOptions)); () -> transportServiceAdapter.onRequestSent(node, requestId, action, request, finalOptions));
internalSendMessage(targetChannel, message, onRequestSent); internalSendMessage(targetChannel, message, onRequestSent);
addedReleaseListener = true;
} finally { } finally {
if (!addedReleaseListener) { IOUtils.close(stream);
IOUtils.close(stream, bStream);
}
} }
} }
@ -1117,8 +1114,10 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
} }
status = TransportStatus.setResponse(status); // TODO share some code with sendRequest status = TransportStatus.setResponse(status); // TODO share some code with sendRequest
ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays); ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);
boolean addedReleaseListener = false; // we wrap this in a release once since if the onRequestSent callback throws an exception
StreamOutput stream = Streams.flushOnCloseStream(bStream); // we might release things twice and this should be prevented
final Releasable toRelease = Releasables.releaseOnce(() -> Releasables.close(bStream.bytes()));
StreamOutput stream = bStream;
try { try {
if (options.compress()) { if (options.compress()) {
status = TransportStatus.setCompress(status); status = TransportStatus.setCompress(status);
@ -1129,16 +1128,12 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
BytesReference reference = buildMessage(requestId, status, nodeVersion, response, stream, bStream); BytesReference reference = buildMessage(requestId, status, nodeVersion, response, stream, bStream);
final TransportResponseOptions finalOptions = options; final TransportResponseOptions finalOptions = options;
final StreamOutput finalStream = stream;
// this might be called in a different thread // this might be called in a different thread
SendListener listener = new SendListener(() -> IOUtils.closeWhileHandlingException(finalStream, bStream), SendListener listener = new SendListener(toRelease,
() -> transportServiceAdapter.onResponseSent(requestId, action, response, finalOptions)); () -> transportServiceAdapter.onResponseSent(requestId, action, response, finalOptions));
internalSendMessage(channel, reference, listener); internalSendMessage(channel, reference, listener);
addedReleaseListener = true;
} finally { } finally {
if (!addedReleaseListener) { IOUtils.close(stream);
IOUtils.close(stream, bStream);
}
} }
} }
@ -1166,7 +1161,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
* Serializes the given message into a bytes representation * Serializes the given message into a bytes representation
*/ */
private BytesReference buildMessage(long requestId, byte status, Version nodeVersion, TransportMessage message, StreamOutput stream, private BytesReference buildMessage(long requestId, byte status, Version nodeVersion, TransportMessage message, StreamOutput stream,
ReleasableBytesStreamOutput writtenBytes) throws IOException { ReleasableBytesStream writtenBytes) throws IOException {
final BytesReference zeroCopyBuffer; final BytesReference zeroCopyBuffer;
if (message instanceof BytesTransportRequest) { // what a shitty optimization - we should use a direct send method instead if (message instanceof BytesTransportRequest) { // what a shitty optimization - we should use a direct send method instead
BytesTransportRequest bRequest = (BytesTransportRequest) message; BytesTransportRequest bRequest = (BytesTransportRequest) message;

View file

@ -1,51 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io.stream;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
public class ReleasableBytesStreamOutputTests extends ESTestCase {
public void testRelease() throws Exception {
MockBigArrays mockBigArrays =
new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
try (ReleasableBytesStreamOutput output =
getRandomReleasableBytesStreamOutput(mockBigArrays)) {
output.writeBoolean(randomBoolean());
}
MockBigArrays.ensureAllArraysAreReleased();
}
private ReleasableBytesStreamOutput getRandomReleasableBytesStreamOutput(
MockBigArrays mockBigArrays) throws IOException {
ReleasableBytesStreamOutput output = new ReleasableBytesStreamOutput(mockBigArrays);
if (randomBoolean()) {
for (int i = 0; i < scaledRandomIntBetween(1, 32); i++) {
output.write(randomByte());
}
}
return output;
}
}

View file

@ -85,7 +85,7 @@ final class Netty4HttpChannel extends AbstractRestChannel {
} }
@Override @Override
protected BytesStreamOutput newBytesOutput() { public BytesStreamOutput newBytesOutput() {
return new ReleasableBytesStreamOutput(transport.bigArrays); return new ReleasableBytesStreamOutput(transport.bigArrays);
} }
@ -114,8 +114,7 @@ final class Netty4HttpChannel extends AbstractRestChannel {
addCustomHeaders(resp, threadContext.getResponseHeaders()); addCustomHeaders(resp, threadContext.getResponseHeaders());
BytesReference content = response.content(); BytesReference content = response.content();
boolean releaseContent = content instanceof Releasable; boolean release = content instanceof Releasable;
boolean releaseBytesStreamOutput = bytesOutputOrNull() instanceof ReleasableBytesStreamOutput;
try { try {
// If our response doesn't specify a content-type header, set one // If our response doesn't specify a content-type header, set one
setHeaderField(resp, HttpHeaderNames.CONTENT_TYPE.toString(), response.contentType(), false); setHeaderField(resp, HttpHeaderNames.CONTENT_TYPE.toString(), response.contentType(), false);
@ -126,14 +125,10 @@ final class Netty4HttpChannel extends AbstractRestChannel {
final ChannelPromise promise = channel.newPromise(); final ChannelPromise promise = channel.newPromise();
if (releaseContent) { if (release) {
promise.addListener(f -> ((Releasable)content).close()); promise.addListener(f -> ((Releasable)content).close());
} }
if (releaseBytesStreamOutput) {
promise.addListener(f -> bytesOutputOrNull().close());
}
if (isCloseConnection()) { if (isCloseConnection()) {
promise.addListener(ChannelFutureListener.CLOSE); promise.addListener(ChannelFutureListener.CLOSE);
} }
@ -145,15 +140,11 @@ final class Netty4HttpChannel extends AbstractRestChannel {
msg = resp; msg = resp;
} }
channel.writeAndFlush(msg, promise); channel.writeAndFlush(msg, promise);
releaseContent = false; release = false;
releaseBytesStreamOutput = false;
} finally { } finally {
if (releaseContent) { if (release) {
((Releasable) content).close(); ((Releasable) content).close();
} }
if (releaseBytesStreamOutput) {
bytesOutputOrNull().close();
}
if (pipelinedRequest != null) { if (pipelinedRequest != null) {
pipelinedRequest.release(); pipelinedRequest.release();
} }

View file

@ -43,24 +43,18 @@ import io.netty.util.Attribute;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasablePagedBytesReference; import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray; import org.elasticsearch.common.util.ByteArray;
import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.http.HttpTransportSettings; import org.elasticsearch.http.HttpTransportSettings;
import org.elasticsearch.http.NullDispatcher; import org.elasticsearch.http.NullDispatcher;
import org.elasticsearch.http.netty4.cors.Netty4CorsHandler; import org.elasticsearch.http.netty4.cors.Netty4CorsHandler;
import org.elasticsearch.http.netty4.pipelining.HttpPipelinedRequest; import org.elasticsearch.http.netty4.pipelining.HttpPipelinedRequest;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
@ -70,7 +64,6 @@ import org.elasticsearch.transport.netty4.Netty4Utils;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import java.io.IOException;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
@ -85,7 +78,6 @@ import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ENABLED;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
@ -251,37 +243,6 @@ public class Netty4HttpChannelTests extends ESTestCase {
} }
} }
public void testReleaseOnSendToChannelAfterException() throws IOException {
final Settings settings = Settings.builder().build();
final NamedXContentRegistry registry = xContentRegistry();
try (Netty4HttpServerTransport httpServerTransport =
new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, registry, new NullDispatcher())) {
final FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
final EmbeddedChannel embeddedChannel = new EmbeddedChannel();
final Netty4HttpRequest request = new Netty4HttpRequest(registry, httpRequest, embeddedChannel);
final HttpPipelinedRequest pipelinedRequest = randomBoolean() ? new HttpPipelinedRequest(request.request(), 1) : null;
final Netty4HttpChannel channel =
new Netty4HttpChannel(httpServerTransport, request, pipelinedRequest, randomBoolean(), threadPool.getThreadContext());
final BytesRestResponse response = new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR,
JsonXContent.contentBuilder().startObject().endObject());
assertThat(response.content(), not(instanceOf(Releasable.class)));
// ensure we have reserved bytes
if (randomBoolean()) {
BytesStreamOutput out = channel.bytesOutput();
assertThat(out, instanceOf(ReleasableBytesStreamOutput.class));
} else {
try (XContentBuilder builder = channel.newBuilder()) {
// do something builder
builder.startObject().endObject();
}
}
channel.sendResponse(response);
// ESTestCase#after will invoke ensureAllArraysAreReleased which will fail if the response content was not released
}
}
public void testConnectionClose() throws Exception { public void testConnectionClose() throws Exception {
final Settings settings = Settings.builder().build(); final Settings settings = Settings.builder().build();
try (Netty4HttpServerTransport httpServerTransport = try (Netty4HttpServerTransport httpServerTransport =
@ -588,7 +549,7 @@ public class Netty4HttpChannelTests extends ESTestCase {
} }
final ByteArray bigArray = bigArrays.newByteArray(bytes.length); final ByteArray bigArray = bigArrays.newByteArray(bytes.length);
bigArray.set(0, bytes, 0, bytes.length); bigArray.set(0, bytes, 0, bytes.length);
reference = new ReleasablePagedBytesReference(bigArrays, bigArray, bytes.length, Releasables.releaseOnce(bigArray)); reference = new ReleasablePagedBytesReference(bigArrays, bigArray, bytes.length);
} }
@Override @Override