mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-04-20 21:27:35 -04:00
Properly handle empty incremental bulk requests (#112974)
This commit ensures we properly throw exceptions when an empty bulk request is received with the incremental handling enabled.
This commit is contained in:
parent
dce8a0bfd3
commit
92daeeba11
3 changed files with 75 additions and 40 deletions
|
@ -26,12 +26,10 @@ public class RequestsWithoutContentIT extends ESRestTestCase {
|
|||
assertResponseException(responseException, "request body is required");
|
||||
}
|
||||
|
||||
@AwaitsFix(bugUrl = "need to decide how to handle this scenario")
|
||||
public void testBulkMissingBody() throws IOException {
|
||||
ResponseException responseException = expectThrows(
|
||||
ResponseException.class,
|
||||
() -> client().performRequest(new Request(randomBoolean() ? "POST" : "PUT", "/_bulk"))
|
||||
);
|
||||
Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk");
|
||||
request.setJsonEntity("");
|
||||
ResponseException responseException = expectThrows(ResponseException.class, () -> client().performRequest(request));
|
||||
assertResponseException(responseException, "request body is required");
|
||||
}
|
||||
|
||||
|
|
|
@ -23,11 +23,33 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.rest.RestStatus.OK;
|
||||
import static org.hamcrest.CoreMatchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0)
|
||||
public class IncrementalBulkRestIT extends HttpSmokeTestCase {
|
||||
|
||||
public void testBulkMissingBody() throws IOException {
|
||||
Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk");
|
||||
request.setJsonEntity("");
|
||||
ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(request));
|
||||
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
|
||||
assertThat(responseException.getMessage(), containsString("request body is required"));
|
||||
}
|
||||
|
||||
public void testBulkRequestBodyImproperlyTerminated() throws IOException {
|
||||
Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk");
|
||||
// missing final line of the bulk body. cannot process
|
||||
request.setJsonEntity(
|
||||
"{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n"
|
||||
+ "{\"field\":1}\n"
|
||||
+ "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}"
|
||||
);
|
||||
ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(request));
|
||||
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
|
||||
assertThat(responseException.getMessage(), containsString("could not parse bulk request body"));
|
||||
}
|
||||
|
||||
public void testIncrementalBulk() throws IOException {
|
||||
Request createRequest = new Request("PUT", "/index_name");
|
||||
createRequest.setJsonEntity("""
|
||||
|
|
|
@ -9,6 +9,7 @@
|
|||
|
||||
package org.elasticsearch.rest.action.document;
|
||||
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.action.DocWriteRequest;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkRequestParser;
|
||||
|
@ -150,6 +151,7 @@ public class RestBulkAction extends BaseRestHandler {
|
|||
|
||||
private volatile RestChannel restChannel;
|
||||
private boolean shortCircuited;
|
||||
private int bytesParsed = 0;
|
||||
private final ArrayDeque<ReleasableBytesReference> unParsedChunks = new ArrayDeque<>(4);
|
||||
private final ArrayList<DocWriteRequest<?>> items = new ArrayList<>(4);
|
||||
|
||||
|
@ -186,48 +188,61 @@ public class RestBulkAction extends BaseRestHandler {
|
|||
|
||||
final BytesReference data;
|
||||
int bytesConsumed;
|
||||
try {
|
||||
unParsedChunks.add(chunk);
|
||||
if (chunk.length() == 0) {
|
||||
chunk.close();
|
||||
bytesConsumed = 0;
|
||||
} else {
|
||||
try {
|
||||
unParsedChunks.add(chunk);
|
||||
|
||||
if (unParsedChunks.size() > 1) {
|
||||
data = CompositeBytesReference.of(unParsedChunks.toArray(new ReleasableBytesReference[0]));
|
||||
} else {
|
||||
data = chunk;
|
||||
if (unParsedChunks.size() > 1) {
|
||||
data = CompositeBytesReference.of(unParsedChunks.toArray(new ReleasableBytesReference[0]));
|
||||
} else {
|
||||
data = chunk;
|
||||
}
|
||||
|
||||
// TODO: Check that the behavior here vs. globalRouting, globalPipeline, globalRequireAlias, globalRequireDatsStream in
|
||||
// BulkRequest#add is fine
|
||||
bytesConsumed = parser.incrementalParse(
|
||||
data,
|
||||
defaultIndex,
|
||||
defaultRouting,
|
||||
defaultFetchSourceContext,
|
||||
defaultPipeline,
|
||||
defaultRequireAlias,
|
||||
defaultRequireDataStream,
|
||||
defaultListExecutedPipelines,
|
||||
allowExplicitIndex,
|
||||
request.getXContentType(),
|
||||
(request, type) -> items.add(request),
|
||||
items::add,
|
||||
items::add,
|
||||
isLast == false,
|
||||
stringDeduplicator
|
||||
);
|
||||
bytesParsed += bytesConsumed;
|
||||
|
||||
} catch (Exception e) {
|
||||
shortCircuit();
|
||||
new RestToXContentListener<>(channel).onFailure(
|
||||
new ElasticsearchParseException("could not parse bulk request body", e)
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO: Check that the behavior here vs. globalRouting, globalPipeline, globalRequireAlias, globalRequireDatsStream in
|
||||
// BulkRequest#add is fine
|
||||
bytesConsumed = parser.incrementalParse(
|
||||
data,
|
||||
defaultIndex,
|
||||
defaultRouting,
|
||||
defaultFetchSourceContext,
|
||||
defaultPipeline,
|
||||
defaultRequireAlias,
|
||||
defaultRequireDataStream,
|
||||
defaultListExecutedPipelines,
|
||||
allowExplicitIndex,
|
||||
request.getXContentType(),
|
||||
(request, type) -> items.add(request),
|
||||
items::add,
|
||||
items::add,
|
||||
isLast == false,
|
||||
stringDeduplicator
|
||||
);
|
||||
|
||||
} catch (Exception e) {
|
||||
shortCircuit();
|
||||
new RestToXContentListener<>(channel).onFailure(e);
|
||||
return;
|
||||
}
|
||||
|
||||
final ArrayList<Releasable> releasables = accountParsing(bytesConsumed);
|
||||
if (isLast) {
|
||||
assert unParsedChunks.isEmpty();
|
||||
assert channel != null;
|
||||
ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items);
|
||||
items.clear();
|
||||
handler.lastItems(toPass, () -> Releasables.close(releasables), new RestRefCountedChunkedToXContentListener<>(channel));
|
||||
if (bytesParsed == 0) {
|
||||
shortCircuit();
|
||||
new RestToXContentListener<>(channel).onFailure(new ElasticsearchParseException("request body is required"));
|
||||
} else {
|
||||
assert channel != null;
|
||||
ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items);
|
||||
items.clear();
|
||||
handler.lastItems(toPass, () -> Releasables.close(releasables), new RestRefCountedChunkedToXContentListener<>(channel));
|
||||
}
|
||||
} else if (items.isEmpty() == false) {
|
||||
ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items);
|
||||
items.clear();
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue