Properly handle empty incremental bulk requests (#112974)

This commit ensures we properly throw exceptions when an empty bulk
request is received with the incremental handling enabled.
This commit is contained in:
Tim Brooks 2024-09-16 17:45:35 -06:00
parent dce8a0bfd3
commit 92daeeba11
3 changed files with 75 additions and 40 deletions

View file

@ -26,12 +26,10 @@ public class RequestsWithoutContentIT extends ESRestTestCase {
assertResponseException(responseException, "request body is required");
}
@AwaitsFix(bugUrl = "need to decide how to handle this scenario")
public void testBulkMissingBody() throws IOException {
ResponseException responseException = expectThrows(
ResponseException.class,
() -> client().performRequest(new Request(randomBoolean() ? "POST" : "PUT", "/_bulk"))
);
Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk");
request.setJsonEntity("");
ResponseException responseException = expectThrows(ResponseException.class, () -> client().performRequest(request));
assertResponseException(responseException, "request body is required");
}

View file

@ -23,11 +23,33 @@ import java.util.List;
import java.util.Map;
import static org.elasticsearch.rest.RestStatus.OK;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0)
public class IncrementalBulkRestIT extends HttpSmokeTestCase {
/**
 * Sends a bulk request (POST or PUT, chosen at random) whose JSON entity is the empty string and
 * verifies that the call fails with HTTP 400 and a message containing "request body is required".
 */
public void testBulkMissingBody() throws IOException {
    final String httpMethod = randomBoolean() ? "POST" : "PUT";
    final Request emptyBulk = new Request(httpMethod, "/_bulk");
    emptyBulk.setJsonEntity("");

    final ResponseException failure = expectThrows(ResponseException.class, () -> getRestClient().performRequest(emptyBulk));

    // The failure must be reported as a client error that names the missing body.
    final int status = failure.getResponse().getStatusLine().getStatusCode();
    assertEquals(400, status);
    assertThat(failure.getMessage(), containsString("request body is required"));
}
/**
 * Sends a bulk request (POST or PUT, chosen at random) whose body stops partway through the second
 * action line and verifies that the call fails with HTTP 400 and a message containing
 * "could not parse bulk request body".
 */
public void testBulkRequestBodyImproperlyTerminated() throws IOException {
    // missing final line of the bulk body. cannot process
    final String truncatedBody = "{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n"
        + "{\"field\":1}\n"
        + "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}";

    final String httpMethod = randomBoolean() ? "POST" : "PUT";
    final Request truncatedBulk = new Request(httpMethod, "/_bulk");
    truncatedBulk.setJsonEntity(truncatedBody);

    final ResponseException failure = expectThrows(ResponseException.class, () -> getRestClient().performRequest(truncatedBulk));

    final int status = failure.getResponse().getStatusLine().getStatusCode();
    assertEquals(400, status);
    assertThat(failure.getMessage(), containsString("could not parse bulk request body"));
}
public void testIncrementalBulk() throws IOException {
Request createRequest = new Request("PUT", "/index_name");
createRequest.setJsonEntity("""

View file

@ -9,6 +9,7 @@
package org.elasticsearch.rest.action.document;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestParser;
@ -150,6 +151,7 @@ public class RestBulkAction extends BaseRestHandler {
private volatile RestChannel restChannel;
private boolean shortCircuited;
private int bytesParsed = 0;
private final ArrayDeque<ReleasableBytesReference> unParsedChunks = new ArrayDeque<>(4);
private final ArrayList<DocWriteRequest<?>> items = new ArrayList<>(4);
@ -186,48 +188,61 @@ public class RestBulkAction extends BaseRestHandler {
final BytesReference data;
int bytesConsumed;
try {
unParsedChunks.add(chunk);
if (chunk.length() == 0) {
chunk.close();
bytesConsumed = 0;
} else {
try {
unParsedChunks.add(chunk);
if (unParsedChunks.size() > 1) {
data = CompositeBytesReference.of(unParsedChunks.toArray(new ReleasableBytesReference[0]));
} else {
data = chunk;
if (unParsedChunks.size() > 1) {
data = CompositeBytesReference.of(unParsedChunks.toArray(new ReleasableBytesReference[0]));
} else {
data = chunk;
}
// TODO: Check that the behavior here vs. globalRouting, globalPipeline, globalRequireAlias, globalRequireDataStream in
// BulkRequest#add is fine
bytesConsumed = parser.incrementalParse(
data,
defaultIndex,
defaultRouting,
defaultFetchSourceContext,
defaultPipeline,
defaultRequireAlias,
defaultRequireDataStream,
defaultListExecutedPipelines,
allowExplicitIndex,
request.getXContentType(),
(request, type) -> items.add(request),
items::add,
items::add,
isLast == false,
stringDeduplicator
);
bytesParsed += bytesConsumed;
} catch (Exception e) {
shortCircuit();
new RestToXContentListener<>(channel).onFailure(
new ElasticsearchParseException("could not parse bulk request body", e)
);
return;
}
// TODO: Check that the behavior here vs. globalRouting, globalPipeline, globalRequireAlias, globalRequireDataStream in
// BulkRequest#add is fine
bytesConsumed = parser.incrementalParse(
data,
defaultIndex,
defaultRouting,
defaultFetchSourceContext,
defaultPipeline,
defaultRequireAlias,
defaultRequireDataStream,
defaultListExecutedPipelines,
allowExplicitIndex,
request.getXContentType(),
(request, type) -> items.add(request),
items::add,
items::add,
isLast == false,
stringDeduplicator
);
} catch (Exception e) {
shortCircuit();
new RestToXContentListener<>(channel).onFailure(e);
return;
}
final ArrayList<Releasable> releasables = accountParsing(bytesConsumed);
if (isLast) {
assert unParsedChunks.isEmpty();
assert channel != null;
ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items);
items.clear();
handler.lastItems(toPass, () -> Releasables.close(releasables), new RestRefCountedChunkedToXContentListener<>(channel));
if (bytesParsed == 0) {
shortCircuit();
new RestToXContentListener<>(channel).onFailure(new ElasticsearchParseException("request body is required"));
} else {
assert channel != null;
ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items);
items.clear();
handler.lastItems(toPass, () -> Releasables.close(releasables), new RestRefCountedChunkedToXContentListener<>(channel));
}
} else if (items.isEmpty() == false) {
ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items);
items.clear();