diff --git a/core/src/main/java/org/elasticsearch/common/bytes/ReleasablePagedBytesReference.java b/core/src/main/java/org/elasticsearch/common/bytes/ReleasablePagedBytesReference.java
index ac90e546f7eb..2700ea4dc135 100644
--- a/core/src/main/java/org/elasticsearch/common/bytes/ReleasablePagedBytesReference.java
+++ b/core/src/main/java/org/elasticsearch/common/bytes/ReleasablePagedBytesReference.java
@@ -30,17 +30,13 @@ import org.elasticsearch.common.util.ByteArray;
*/
public final class ReleasablePagedBytesReference extends PagedBytesReference implements Releasable {
- private final Releasable releasable;
-
- public ReleasablePagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int length,
- Releasable releasable) {
+ public ReleasablePagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int length) {
super(bigarrays, byteArray, length);
- this.releasable = releasable;
}
@Override
public void close() {
- Releasables.close(releasable);
+ Releasables.close(byteArray);
}
}
diff --git a/core/src/main/java/org/elasticsearch/common/compress/Compressor.java b/core/src/main/java/org/elasticsearch/common/compress/Compressor.java
index b39e7f6e142f..05706debd371 100644
--- a/core/src/main/java/org/elasticsearch/common/compress/Compressor.java
+++ b/core/src/main/java/org/elasticsearch/common/compress/Compressor.java
@@ -20,7 +20,6 @@
package org.elasticsearch.common.compress;
import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -32,9 +31,5 @@ public interface Compressor {
StreamInput streamInput(StreamInput in) throws IOException;
- /**
- * Creates a new stream output that compresses the contents and writes to the provided stream
- * output. Closing the returned {@link StreamOutput} will close the provided stream output.
- */
StreamOutput streamOutput(StreamOutput out) throws IOException;
}
diff --git a/core/src/main/java/org/elasticsearch/common/compress/DeflateCompressor.java b/core/src/main/java/org/elasticsearch/common/compress/DeflateCompressor.java
index 794a8db4960c..42e2efa358cf 100644
--- a/core/src/main/java/org/elasticsearch/common/compress/DeflateCompressor.java
+++ b/core/src/main/java/org/elasticsearch/common/compress/DeflateCompressor.java
@@ -20,6 +20,7 @@
package org.elasticsearch.common.compress;
import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -46,7 +47,7 @@ public class DeflateCompressor implements Compressor {
// It needs to be different from other compressors and to not be specific
// enough so that no stream starting with these bytes could be detected as
// a XContent
- private static final byte[] HEADER = new byte[]{'D', 'F', 'L', '\0'};
+ private static final byte[] HEADER = new byte[] { 'D', 'F', 'L', '\0' };
// 3 is a good trade-off between speed and compression ratio
private static final int LEVEL = 3;
// We use buffering on the input and output of in/def-laters in order to
@@ -87,7 +88,6 @@ public class DeflateCompressor implements Compressor {
decompressedIn = new BufferedInputStream(decompressedIn, BUFFER_SIZE);
return new InputStreamStreamInput(decompressedIn) {
final AtomicBoolean closed = new AtomicBoolean(false);
-
public void close() throws IOException {
try {
super.close();
@@ -107,11 +107,10 @@ public class DeflateCompressor implements Compressor {
final boolean nowrap = true;
final Deflater deflater = new Deflater(LEVEL, nowrap);
final boolean syncFlush = true;
- DeflaterOutputStream deflaterOutputStream = new DeflaterOutputStream(out, deflater, BUFFER_SIZE, syncFlush);
- OutputStream compressedOut = new BufferedOutputStream(deflaterOutputStream, BUFFER_SIZE);
+ OutputStream compressedOut = new DeflaterOutputStream(out, deflater, BUFFER_SIZE, syncFlush);
+ compressedOut = new BufferedOutputStream(compressedOut, BUFFER_SIZE);
return new OutputStreamStreamOutput(compressedOut) {
final AtomicBoolean closed = new AtomicBoolean(false);
-
public void close() throws IOException {
try {
super.close();
diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/BytesStream.java b/core/src/main/java/org/elasticsearch/common/io/BytesStream.java
similarity index 85%
rename from core/src/main/java/org/elasticsearch/common/io/stream/BytesStream.java
rename to core/src/main/java/org/elasticsearch/common/io/BytesStream.java
index c20dcf62c9bb..903c1dcb7996 100644
--- a/core/src/main/java/org/elasticsearch/common/io/stream/BytesStream.java
+++ b/core/src/main/java/org/elasticsearch/common/io/BytesStream.java
@@ -17,11 +17,11 @@
* under the License.
*/
-package org.elasticsearch.common.io.stream;
+package org.elasticsearch.common.io;
import org.elasticsearch.common.bytes.BytesReference;
-public abstract class BytesStream extends StreamOutput {
+public interface BytesStream {
- public abstract BytesReference bytes();
-}
+ BytesReference bytes();
+}
\ No newline at end of file
diff --git a/core/src/main/java/org/elasticsearch/common/io/ReleasableBytesStream.java b/core/src/main/java/org/elasticsearch/common/io/ReleasableBytesStream.java
new file mode 100644
index 000000000000..e31f206bcad9
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/common/io/ReleasableBytesStream.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.io;
+
+import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
+
+/**
+ * A bytes stream that requires its bytes to be released once no longer used.
+ */
+public interface ReleasableBytesStream extends BytesStream {
+
+ @Override
+ ReleasablePagedBytesReference bytes();
+
+}
diff --git a/core/src/main/java/org/elasticsearch/common/io/Streams.java b/core/src/main/java/org/elasticsearch/common/io/Streams.java
index f24b703251b7..f922fde3e753 100644
--- a/core/src/main/java/org/elasticsearch/common/io/Streams.java
+++ b/core/src/main/java/org/elasticsearch/common/io/Streams.java
@@ -20,9 +20,6 @@
package org.elasticsearch.common.io;
import org.apache.lucene.util.IOUtils;
-import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.io.stream.BytesStream;
-import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.Callback;
import java.io.BufferedReader;
@@ -239,56 +236,4 @@ public abstract class Streams {
}
}
}
-
- /**
- * Wraps the given {@link BytesStream} in a {@link StreamOutput} that simply flushes when
- * close is called.
- */
- public static BytesStream flushOnCloseStream(BytesStream os) {
- return new FlushOnCloseOutputStream(os);
- }
-
- /**
- * A wrapper around a {@link BytesStream} that makes the close operation a flush. This is
- * needed as sometimes a stream will be closed but the bytes that the stream holds still need
- * to be used and the stream cannot be closed until the bytes have been consumed.
- */
- private static class FlushOnCloseOutputStream extends BytesStream {
-
- private final BytesStream delegate;
-
- private FlushOnCloseOutputStream(BytesStream bytesStreamOutput) {
- this.delegate = bytesStreamOutput;
- }
-
- @Override
- public void writeByte(byte b) throws IOException {
- delegate.writeByte(b);
- }
-
- @Override
- public void writeBytes(byte[] b, int offset, int length) throws IOException {
- delegate.writeBytes(b, offset, length);
- }
-
- @Override
- public void flush() throws IOException {
- delegate.flush();
- }
-
- @Override
- public void close() throws IOException {
- flush();
- }
-
- @Override
- public void reset() throws IOException {
- delegate.reset();
- }
-
- @Override
- public BytesReference bytes() {
- return delegate.bytes();
- }
- }
}
diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java b/core/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java
index ab9a1896ef7c..e65e8efb27b0 100644
--- a/core/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java
+++ b/core/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java
@@ -21,6 +21,7 @@ package org.elasticsearch.common.io.stream;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.PagedBytesReference;
+import org.elasticsearch.common.io.BytesStream;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;
@@ -30,7 +31,7 @@ import java.io.IOException;
* A @link {@link StreamOutput} that uses {@link BigArrays} to acquire pages of
* bytes, which avoids frequent reallocation & copying of the internal data.
*/
-public class BytesStreamOutput extends BytesStream {
+public class BytesStreamOutput extends StreamOutput implements BytesStream {
protected final BigArrays bigArrays;
@@ -49,7 +50,7 @@ public class BytesStreamOutput extends BytesStream {
/**
* Create a non recycling {@link BytesStreamOutput} with enough initial pages acquired
* to satisfy the capacity given by expected size.
- *
+ *
* @param expectedSize the expected maximum size of the stream in bytes.
*/
public BytesStreamOutput(int expectedSize) {
@@ -128,7 +129,7 @@ public class BytesStreamOutput extends BytesStream {
/**
* Returns the current size of the buffer.
- *
+ *
* @return the value of the count
field, which is the number of valid
* bytes in this output stream.
* @see java.io.ByteArrayOutputStream#count
@@ -150,7 +151,7 @@ public class BytesStreamOutput extends BytesStream {
return bytes.ramBytesUsed();
}
- void ensureCapacity(long offset) {
+ private void ensureCapacity(long offset) {
if (offset > Integer.MAX_VALUE) {
throw new IllegalArgumentException(getClass().getSimpleName() + " cannot hold more than 2GB of data");
}
diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java b/core/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java
index 347897c2ecba..674ff18f0fc1 100644
--- a/core/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java
+++ b/core/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java
@@ -20,56 +20,29 @@
package org.elasticsearch.common.io.stream;
import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
-import org.elasticsearch.common.lease.Releasable;
-import org.elasticsearch.common.lease.Releasables;
+import org.elasticsearch.common.io.ReleasableBytesStream;
import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.common.util.ByteArray;
/**
* An bytes stream output that allows providing a {@link BigArrays} instance
* expecting it to require releasing its content ({@link #bytes()}) once done.
*
- * Please note, closing this stream will release the bytes that are in use by any
- * {@link ReleasablePagedBytesReference} returned from {@link #bytes()}, so this
- * stream should only be closed after the bytes have been output or copied
- * elsewhere.
+ * Please note, its is the responsibility of the caller to make sure the bytes
+ * reference do not "escape" and are released only once.
*/
-public class ReleasableBytesStreamOutput extends BytesStreamOutput
- implements Releasable {
-
- private Releasable releasable;
+public class ReleasableBytesStreamOutput extends BytesStreamOutput implements ReleasableBytesStream {
public ReleasableBytesStreamOutput(BigArrays bigarrays) {
- this(BigArrays.PAGE_SIZE_IN_BYTES, bigarrays);
+ super(BigArrays.PAGE_SIZE_IN_BYTES, bigarrays);
}
public ReleasableBytesStreamOutput(int expectedSize, BigArrays bigArrays) {
super(expectedSize, bigArrays);
- this.releasable = Releasables.releaseOnce(this.bytes);
}
- /**
- * Returns a {@link Releasable} implementation of a
- * {@link org.elasticsearch.common.bytes.BytesReference} that represents the current state of
- * the bytes in the stream.
- */
@Override
public ReleasablePagedBytesReference bytes() {
- return new ReleasablePagedBytesReference(bigArrays, bytes, count, releasable);
+ return new ReleasablePagedBytesReference(bigArrays, bytes, count);
}
- @Override
- public void close() {
- Releasables.close(releasable);
- }
-
- @Override
- void ensureCapacity(long offset) {
- final ByteArray prevBytes = this.bytes;
- super.ensureCapacity(offset);
- if (prevBytes != this.bytes) {
- // re-create the releasable with the new reference
- releasable = Releasables.releaseOnce(this.bytes);
- }
- }
}
diff --git a/core/src/main/java/org/elasticsearch/common/xcontent/XContentBuilder.java b/core/src/main/java/org/elasticsearch/common/xcontent/XContentBuilder.java
index f0427ce24666..189e9d3c8d5d 100644
--- a/core/src/main/java/org/elasticsearch/common/xcontent/XContentBuilder.java
+++ b/core/src/main/java/org/elasticsearch/common/xcontent/XContentBuilder.java
@@ -22,7 +22,7 @@ package org.elasticsearch.common.xcontent;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.geo.GeoPoint;
-import org.elasticsearch.common.io.stream.BytesStream;
+import org.elasticsearch.common.io.BytesStream;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.text.Text;
@@ -53,7 +53,7 @@ import java.util.concurrent.TimeUnit;
/**
* A utility to build XContent (ie json).
*/
-public final class XContentBuilder implements Releasable, Flushable {
+public final class XContentBuilder implements BytesStream, Releasable, Flushable {
/**
* Create a new {@link XContentBuilder} using the given {@link XContent} content.
@@ -1041,6 +1041,7 @@ public final class XContentBuilder implements Releasable, Flushable {
return this.generator;
}
+ @Override
public BytesReference bytes() {
close();
return ((BytesStream) bos).bytes();
diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
index 4bad4593bc14..d9a8cc408f82 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -439,7 +439,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
}
throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e);
} finally {
- Releasables.close(out);
+ Releasables.close(out.bytes());
}
}
@@ -1332,7 +1332,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
bytes.writeTo(outStream);
}
} finally {
- Releasables.close(out);
+ Releasables.close(out.bytes());
}
}
diff --git a/core/src/main/java/org/elasticsearch/rest/AbstractRestChannel.java b/core/src/main/java/org/elasticsearch/rest/AbstractRestChannel.java
index 4db9aec6e93e..bdc78c82dd50 100644
--- a/core/src/main/java/org/elasticsearch/rest/AbstractRestChannel.java
+++ b/core/src/main/java/org/elasticsearch/rest/AbstractRestChannel.java
@@ -20,14 +20,12 @@ package org.elasticsearch.rest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
-import java.io.OutputStream;
import java.util.Collections;
import java.util.Set;
import java.util.function.Predicate;
@@ -99,9 +97,7 @@ public abstract class AbstractRestChannel implements RestChannel {
excludes = filters.stream().filter(EXCLUDE_FILTER).map(f -> f.substring(1)).collect(toSet());
}
- OutputStream unclosableOutputStream = Streams.flushOnCloseStream(bytesOutput());
- XContentBuilder builder =
- new XContentBuilder(XContentFactory.xContent(responseContentType), unclosableOutputStream, includes, excludes);
+ XContentBuilder builder = new XContentBuilder(XContentFactory.xContent(responseContentType), bytesOutput(), includes, excludes);
if (pretty) {
builder.prettyPrint().lfAtEnd();
}
@@ -111,9 +107,8 @@ public abstract class AbstractRestChannel implements RestChannel {
}
/**
- * A channel level bytes output that can be reused. The bytes output is lazily instantiated
- * by a call to {@link #newBytesOutput()}. Once the stream is created, it gets reset on each
- * call to this method.
+ * A channel level bytes output that can be reused. It gets reset on each call to this
+ * method.
*/
@Override
public final BytesStreamOutput bytesOutput() {
@@ -125,14 +120,6 @@ public abstract class AbstractRestChannel implements RestChannel {
return bytesOut;
}
- /**
- * An accessor to the raw value of the channel bytes output. This method will not instantiate
- * a new stream if one does not exist and this method will not reset the stream.
- */
- protected final BytesStreamOutput bytesOutputOrNull() {
- return bytesOut;
- }
-
protected BytesStreamOutput newBytesOutput() {
return new BytesStreamOutput();
}
diff --git a/core/src/main/java/org/elasticsearch/rest/BytesRestResponse.java b/core/src/main/java/org/elasticsearch/rest/BytesRestResponse.java
index c16f862fb4af..72ee7efc4890 100644
--- a/core/src/main/java/org/elasticsearch/rest/BytesRestResponse.java
+++ b/core/src/main/java/org/elasticsearch/rest/BytesRestResponse.java
@@ -30,7 +30,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
@@ -147,8 +147,8 @@ public class BytesRestResponse extends RestResponse {
return builder;
}
- static BytesRestResponse createSimpleErrorResponse(RestChannel channel, RestStatus status, String errorMessage) throws IOException {
- return new BytesRestResponse(status, channel.newErrorBuilder().startObject()
+ static BytesRestResponse createSimpleErrorResponse(RestStatus status, String errorMessage) throws IOException {
+ return new BytesRestResponse(status, JsonXContent.contentBuilder().startObject()
.field("error", errorMessage)
.field("status", status.getStatus())
.endObject());
diff --git a/core/src/main/java/org/elasticsearch/rest/RestController.java b/core/src/main/java/org/elasticsearch/rest/RestController.java
index a3d8a4b7db5f..ea603cf949f2 100644
--- a/core/src/main/java/org/elasticsearch/rest/RestController.java
+++ b/core/src/main/java/org/elasticsearch/rest/RestController.java
@@ -178,9 +178,8 @@ public class RestController extends AbstractComponent implements HttpServerTrans
sendContentTypeErrorMessage(request, responseChannel);
} else if (contentLength > 0 && handler != null && handler.supportsContentStream() &&
request.getXContentType() != XContentType.JSON && request.getXContentType() != XContentType.SMILE) {
- responseChannel.sendResponse(BytesRestResponse.createSimpleErrorResponse(responseChannel,
- RestStatus.NOT_ACCEPTABLE, "Content-Type [" + request.getXContentType() +
- "] does not support stream parsing. Use JSON or SMILE instead"));
+ responseChannel.sendResponse(BytesRestResponse.createSimpleErrorResponse(RestStatus.NOT_ACCEPTABLE, "Content-Type [" +
+ request.getXContentType() + "] does not support stream parsing. Use JSON or SMILE instead"));
} else {
if (canTripCircuitBreaker(request)) {
inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "