Properly handle empty incremental bulk requests (#112974)

This commit ensures we properly throw exceptions when an empty bulk
request is received with the incremental handling enabled.
This commit is contained in:
Tim Brooks 2024-09-16 17:45:35 -06:00
parent dce8a0bfd3
commit 92daeeba11
3 changed files with 75 additions and 40 deletions

View file

@ -26,12 +26,10 @@ public class RequestsWithoutContentIT extends ESRestTestCase {
assertResponseException(responseException, "request body is required"); assertResponseException(responseException, "request body is required");
} }
@AwaitsFix(bugUrl = "need to decide how to handle this scenario")
public void testBulkMissingBody() throws IOException { public void testBulkMissingBody() throws IOException {
ResponseException responseException = expectThrows( Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk");
ResponseException.class, request.setJsonEntity("");
() -> client().performRequest(new Request(randomBoolean() ? "POST" : "PUT", "/_bulk")) ResponseException responseException = expectThrows(ResponseException.class, () -> client().performRequest(request));
);
assertResponseException(responseException, "request body is required"); assertResponseException(responseException, "request body is required");
} }

View file

@ -23,11 +23,33 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import static org.elasticsearch.rest.RestStatus.OK; import static org.elasticsearch.rest.RestStatus.OK;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0) @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0)
public class IncrementalBulkRestIT extends HttpSmokeTestCase { public class IncrementalBulkRestIT extends HttpSmokeTestCase {
public void testBulkMissingBody() throws IOException {
Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk");
request.setJsonEntity("");
ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(request));
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
assertThat(responseException.getMessage(), containsString("request body is required"));
}
public void testBulkRequestBodyImproperlyTerminated() throws IOException {
Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk");
// missing final line of the bulk body. cannot process
request.setJsonEntity(
"{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n"
+ "{\"field\":1}\n"
+ "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}"
);
ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(request));
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
assertThat(responseException.getMessage(), containsString("could not parse bulk request body"));
}
public void testIncrementalBulk() throws IOException { public void testIncrementalBulk() throws IOException {
Request createRequest = new Request("PUT", "/index_name"); Request createRequest = new Request("PUT", "/index_name");
createRequest.setJsonEntity(""" createRequest.setJsonEntity("""

View file

@ -9,6 +9,7 @@
package org.elasticsearch.rest.action.document; package org.elasticsearch.rest.action.document;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestParser; import org.elasticsearch.action.bulk.BulkRequestParser;
@ -150,6 +151,7 @@ public class RestBulkAction extends BaseRestHandler {
private volatile RestChannel restChannel; private volatile RestChannel restChannel;
private boolean shortCircuited; private boolean shortCircuited;
private int bytesParsed = 0;
private final ArrayDeque<ReleasableBytesReference> unParsedChunks = new ArrayDeque<>(4); private final ArrayDeque<ReleasableBytesReference> unParsedChunks = new ArrayDeque<>(4);
private final ArrayList<DocWriteRequest<?>> items = new ArrayList<>(4); private final ArrayList<DocWriteRequest<?>> items = new ArrayList<>(4);
@ -186,6 +188,10 @@ public class RestBulkAction extends BaseRestHandler {
final BytesReference data; final BytesReference data;
int bytesConsumed; int bytesConsumed;
if (chunk.length() == 0) {
chunk.close();
bytesConsumed = 0;
} else {
try { try {
unParsedChunks.add(chunk); unParsedChunks.add(chunk);
@ -214,20 +220,29 @@ public class RestBulkAction extends BaseRestHandler {
isLast == false, isLast == false,
stringDeduplicator stringDeduplicator
); );
bytesParsed += bytesConsumed;
} catch (Exception e) { } catch (Exception e) {
shortCircuit(); shortCircuit();
new RestToXContentListener<>(channel).onFailure(e); new RestToXContentListener<>(channel).onFailure(
new ElasticsearchParseException("could not parse bulk request body", e)
);
return; return;
} }
}
final ArrayList<Releasable> releasables = accountParsing(bytesConsumed); final ArrayList<Releasable> releasables = accountParsing(bytesConsumed);
if (isLast) { if (isLast) {
assert unParsedChunks.isEmpty(); assert unParsedChunks.isEmpty();
if (bytesParsed == 0) {
shortCircuit();
new RestToXContentListener<>(channel).onFailure(new ElasticsearchParseException("request body is required"));
} else {
assert channel != null; assert channel != null;
ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items); ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items);
items.clear(); items.clear();
handler.lastItems(toPass, () -> Releasables.close(releasables), new RestRefCountedChunkedToXContentListener<>(channel)); handler.lastItems(toPass, () -> Releasables.close(releasables), new RestRefCountedChunkedToXContentListener<>(channel));
}
} else if (items.isEmpty() == false) { } else if (items.isEmpty() == false) {
ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items); ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items);
items.clear(); items.clear();