diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java index 5e74262fa20e..41caf2625a23 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java @@ -108,6 +108,7 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.rankeval.RankEvalRequest; import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.protocol.xpack.XPackInfoRequest; @@ -866,6 +867,32 @@ final class RequestConverters { return request; } + static Request deleteByQuery(DeleteByQueryRequest deleteByQueryRequest) throws IOException { + String endpoint = + endpoint(deleteByQueryRequest.indices(), deleteByQueryRequest.getDocTypes(), "_delete_by_query"); + Request request = new Request(HttpPost.METHOD_NAME, endpoint); + Params params = new Params(request) + .withRouting(deleteByQueryRequest.getRouting()) + .withRefresh(deleteByQueryRequest.isRefresh()) + .withTimeout(deleteByQueryRequest.getTimeout()) + .withWaitForActiveShards(deleteByQueryRequest.getWaitForActiveShards()) + .withIndicesOptions(deleteByQueryRequest.indicesOptions()); + if (deleteByQueryRequest.isAbortOnVersionConflict() == false) { + params.putParam("conflicts", "proceed"); + } + if (deleteByQueryRequest.getBatchSize() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE) { + params.putParam("scroll_size", Integer.toString(deleteByQueryRequest.getBatchSize())); + } + if (deleteByQueryRequest.getScrollTime() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_TIMEOUT) { + params.putParam("scroll", deleteByQueryRequest.getScrollTime()); + } + if (deleteByQueryRequest.getSize() > 0) { + params.putParam("size", Integer.toString(deleteByQueryRequest.getSize())); + } + request.setEntity(createEntity(deleteByQueryRequest, REQUEST_BODY_CONTENT_TYPE)); + return request; + } + static Request rollover(RolloverRequest rolloverRequest) throws IOException { String endpoint = new EndpointBuilder().addPathPart(rolloverRequest.getAlias()).addPathPartAsIs("_rollover") .addPathPart(rolloverRequest.getNewIndexName()).build(); @@ -1174,10 +1201,10 @@ final class RequestConverters { static Request xPackGraphExplore(GraphExploreRequest exploreRequest) throws IOException { String endpoint = endpoint(exploreRequest.indices(), exploreRequest.types(), "_xpack/graph/_explore"); Request request = new Request(HttpGet.METHOD_NAME, endpoint); - request.setEntity(createEntity(exploreRequest, REQUEST_BODY_CONTENT_TYPE)); + request.setEntity(createEntity(exploreRequest, REQUEST_BODY_CONTENT_TYPE)); return request; - } - + } + static Request xPackWatcherPutWatch(PutWatchRequest putWatchRequest) { String endpoint = new EndpointBuilder() .addPathPartAsIs("_xpack") diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java index 6e3c5a6fb831..f11d7cc15732 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java @@ -65,6 +65,7 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.rankeval.RankEvalRequest; import org.elasticsearch.index.rankeval.RankEvalResponse; import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.plugins.spi.NamedXContentProvider; @@ -328,7 +329,7 @@ public class RestHighLevelClient implements Closeable { * Watcher APIs on elastic.co for more information. */ public WatcherClient watcher() { return watcherClient; } - + /** * Provides methods for accessing the Elastic Licensed Graph explore API that * is shipped with the default distribution of Elasticsearch. All of @@ -337,7 +338,7 @@ public class RestHighLevelClient implements Closeable { * See the * Graph API on elastic.co for more information. */ - public GraphClient graph() { return graphClient; } + public GraphClient graph() { return graphClient; } /** * Provides methods for accessing the Elastic Licensed Licensing APIs that @@ -454,6 +455,35 @@ public class RestHighLevelClient implements Closeable { ); } + /** + * Executes a delete by query request. + * See + * Delete By Query API on elastic.co + * @param deleteByQueryRequest the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return the response + * @throws IOException in case there is a problem sending the request or parsing back the response + */ + public final BulkByScrollResponse deleteByQuery(DeleteByQueryRequest deleteByQueryRequest, RequestOptions options) throws IOException { + return performRequestAndParseEntity( + deleteByQueryRequest, RequestConverters::deleteByQuery, options, BulkByScrollResponse::fromXContent, emptySet() + ); + } + + /** + * Asynchronously executes a delete by query request. + * See + * Delete By Query API on elastic.co + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @param listener the listener to be notified upon request completion + */ + public final void deleteByQueryAsync(DeleteByQueryRequest reindexRequest, RequestOptions options, + ActionListener listener) { + performRequestAsyncAndParseEntity( + reindexRequest, RequestConverters::deleteByQuery, options, BulkByScrollResponse::fromXContent, listener, emptySet() + ); + } + /** * Pings the remote Elasticsearch cluster and returns true if the ping succeeded, false otherwise * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java index e02d9f451ebe..feb57bed9c46 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java @@ -36,6 +36,7 @@ import org.elasticsearch.action.get.MultiGetRequest; import org.elasticsearch.action.get.MultiGetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; @@ -50,6 +51,7 @@ import org.elasticsearch.index.VersionType; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.query.IdsQueryBuilder; import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.rest.RestStatus; @@ -758,6 +760,52 @@ public class CrudIT extends ESRestHighLevelClientTestCase { } } + public void testDeleteByQuery() throws IOException { + final String sourceIndex = "source1"; + { + // Prepare + Settings settings = Settings.builder() + .put("number_of_shards", 1) + .put("number_of_replicas", 0) + .build(); + createIndex(sourceIndex, settings); + assertEquals( + RestStatus.OK, + highLevelClient().bulk( + new BulkRequest() + .add(new IndexRequest(sourceIndex, "type", "1") + .source(Collections.singletonMap("foo", 1), XContentType.JSON)) + .add(new IndexRequest(sourceIndex, "type", "2") + .source(Collections.singletonMap("foo", 2), XContentType.JSON)) + .setRefreshPolicy(RefreshPolicy.IMMEDIATE), + RequestOptions.DEFAULT + ).status() + ); + } + { + // test1: delete one doc + DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(); + deleteByQueryRequest.indices(sourceIndex); + deleteByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1").types("type")); + deleteByQueryRequest.setRefresh(true); + BulkByScrollResponse bulkResponse = + execute(deleteByQueryRequest, highLevelClient()::deleteByQuery, highLevelClient()::deleteByQueryAsync); + assertEquals(1, bulkResponse.getTotal()); + assertEquals(1, bulkResponse.getDeleted()); + assertEquals(0, bulkResponse.getNoops()); + assertEquals(0, bulkResponse.getVersionConflicts()); + assertEquals(1, bulkResponse.getBatches()); + assertTrue(bulkResponse.getTook().getMillis() > 0); + assertEquals(1, bulkResponse.getBatches()); + assertEquals(0, bulkResponse.getBulkFailures().size()); + assertEquals(0, bulkResponse.getSearchFailures().size()); + assertEquals( + 1, + highLevelClient().search(new SearchRequest(sourceIndex), RequestOptions.DEFAULT).getHits().totalHits + ); + } + } + public void testBulkProcessorIntegration() throws IOException { int nbItems = randomIntBetween(10, 100); boolean[] errors = new boolean[nbItems]; diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java index 92930d14cf4a..a0597d3b7947 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java @@ -127,6 +127,7 @@ import org.elasticsearch.index.rankeval.RankEvalRequest; import org.elasticsearch.index.rankeval.RankEvalSpec; import org.elasticsearch.index.rankeval.RatedRequest; import org.elasticsearch.index.rankeval.RestRankEvalAction; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.index.reindex.RemoteInfo; import org.elasticsearch.index.reindex.UpdateByQueryRequest; @@ -526,6 +527,53 @@ public class RequestConvertersTests extends ESTestCase { assertToXContentBody(updateByQueryRequest, request.getEntity()); } + public void testDeleteByQuery() throws IOException { + DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(); + deleteByQueryRequest.indices(randomIndicesNames(1, 5)); + Map expectedParams = new HashMap<>(); + if (randomBoolean()) { + deleteByQueryRequest.setDocTypes(generateRandomStringArray(5, 5, false, false)); + } + if (randomBoolean()) { + int batchSize = randomInt(100); + deleteByQueryRequest.setBatchSize(batchSize); + expectedParams.put("scroll_size", Integer.toString(batchSize)); + } + if (randomBoolean()) { + deleteByQueryRequest.setRouting("=cat"); + expectedParams.put("routing", "=cat"); + } + if (randomBoolean()) { + int size = randomIntBetween(100, 1000); + deleteByQueryRequest.setSize(size); + expectedParams.put("size", Integer.toString(size)); + } + if (randomBoolean()) { + deleteByQueryRequest.setAbortOnVersionConflict(false); + expectedParams.put("conflicts", "proceed"); + } + if (randomBoolean()) { + String ts = randomTimeValue(); + deleteByQueryRequest.setScroll(TimeValue.parseTimeValue(ts, "scroll")); + expectedParams.put("scroll", ts); + } + if (randomBoolean()) { + deleteByQueryRequest.setQuery(new TermQueryBuilder("foo", "fooval")); + } + setRandomIndicesOptions(deleteByQueryRequest::setIndicesOptions, deleteByQueryRequest::indicesOptions, expectedParams); + setRandomTimeout(deleteByQueryRequest::setTimeout, ReplicationRequest.DEFAULT_TIMEOUT, expectedParams); + Request request = RequestConverters.deleteByQuery(deleteByQueryRequest); + StringJoiner joiner = new StringJoiner("/", "/", ""); + joiner.add(String.join(",", deleteByQueryRequest.indices())); + if (deleteByQueryRequest.getDocTypes().length > 0) + joiner.add(String.join(",", deleteByQueryRequest.getDocTypes())); + joiner.add("_delete_by_query"); + assertEquals(joiner.toString(), request.getEndpoint()); + assertEquals(HttpPost.METHOD_NAME, request.getMethod()); + assertEquals(expectedParams, request.getParameters()); + assertToXContentBody(deleteByQueryRequest, request.getEntity()); + } + public void testPutMapping() throws IOException { PutMappingRequest putMappingRequest = new PutMappingRequest(); @@ -2720,7 +2768,7 @@ public class RequestConvertersTests extends ESTestCase { request.getEntity().writeTo(bos); assertThat(bos.toString("UTF-8"), is(body)); } - + public void testGraphExplore() throws Exception { Map expectedParams = new HashMap<>(); @@ -2748,7 +2796,7 @@ public class RequestConvertersTests extends ESTestCase { assertEquals(expectedParams, request.getParameters()); assertThat(request.getEntity().getContentType().getValue(), is(XContentType.JSON.mediaTypeWithoutParameters())); assertToXContentBody(graphExploreRequest, request.getEntity()); - } + } public void testXPackDeleteWatch() { DeleteWatchRequest deleteWatchRequest = new DeleteWatchRequest(); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java index 7df4e0725768..dcc6a51dabac 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java @@ -649,7 +649,6 @@ public class RestHighLevelClientTests extends ESTestCase { "cluster.remote_info", "count", "create", - "delete_by_query", "exists_source", "get_source", "indices.delete_alias", diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java index ac9d42c65ca5..142eacd820ff 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java @@ -65,6 +65,7 @@ import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.index.reindex.RemoteInfo; import org.elasticsearch.index.reindex.ScrollableHitSource; @@ -1020,6 +1021,113 @@ public class CRUDDocumentationIT extends ESRestHighLevelClientTestCase { } } + public void testDeleteByQuery() throws Exception { + RestHighLevelClient client = highLevelClient(); + { + String mapping = + "\"doc\": {\n" + + " \"properties\": {\n" + + " \"user\": {\n" + + " \"type\": \"text\"\n" + + " },\n" + + " \"field1\": {\n" + + " \"type\": \"integer\"\n" + + " },\n" + + " \"field2\": {\n" + + " \"type\": \"integer\"\n" + + " }\n" + + " }\n" + + " }"; + createIndex("source1", Settings.EMPTY, mapping); + createIndex("source2", Settings.EMPTY, mapping); + } + { + // tag::delete-by-query-request + DeleteByQueryRequest request = new DeleteByQueryRequest("source1", "source2"); // <1> + // end::delete-by-query-request + // tag::delete-by-query-request-conflicts + request.setConflicts("proceed"); // <1> + // end::delete-by-query-request-conflicts + // tag::delete-by-query-request-typeOrQuery + request.setDocTypes("doc"); // <1> + request.setQuery(new TermQueryBuilder("user", "kimchy")); // <2> + // end::delete-by-query-request-typeOrQuery + // tag::delete-by-query-request-size + request.setSize(10); // <1> + // end::delete-by-query-request-size + // tag::delete-by-query-request-scrollSize + request.setBatchSize(100); // <1> + // end::delete-by-query-request-scrollSize + // tag::delete-by-query-request-timeout + request.setTimeout(TimeValue.timeValueMinutes(2)); // <1> + // end::delete-by-query-request-timeout + // tag::delete-by-query-request-refresh + request.setRefresh(true); // <1> + // end::delete-by-query-request-refresh + // tag::delete-by-query-request-slices + request.setSlices(2); // <1> + // end::delete-by-query-request-slices + // tag::delete-by-query-request-scroll + request.setScroll(TimeValue.timeValueMinutes(10)); // <1> + // end::delete-by-query-request-scroll + // tag::delete-by-query-request-routing + request.setRouting("=cat"); // <1> + // end::delete-by-query-request-routing + // tag::delete-by-query-request-indicesOptions + request.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); // <1> + // end::delete-by-query-request-indicesOptions + + // tag::delete-by-query-execute + BulkByScrollResponse bulkResponse = client.deleteByQuery(request, RequestOptions.DEFAULT); + // end::delete-by-query-execute + assertSame(0, bulkResponse.getSearchFailures().size()); + assertSame(0, bulkResponse.getBulkFailures().size()); + // tag::delete-by-query-response + TimeValue timeTaken = bulkResponse.getTook(); // <1> + boolean timedOut = bulkResponse.isTimedOut(); // <2> + long totalDocs = bulkResponse.getTotal(); // <3> + long deletedDocs = bulkResponse.getDeleted(); // <4> + long batches = bulkResponse.getBatches(); // <5> + long noops = bulkResponse.getNoops(); // <6> + long versionConflicts = bulkResponse.getVersionConflicts(); // <7> + long bulkRetries = bulkResponse.getBulkRetries(); // <8> + long searchRetries = bulkResponse.getSearchRetries(); // <9> + TimeValue throttledMillis = bulkResponse.getStatus().getThrottled(); // <10> + TimeValue throttledUntilMillis = bulkResponse.getStatus().getThrottledUntil(); // <11> + List searchFailures = bulkResponse.getSearchFailures(); // <12> + List bulkFailures = bulkResponse.getBulkFailures(); // <13> + // end::delete-by-query-response + } + { + DeleteByQueryRequest request = new DeleteByQueryRequest(); + request.indices("source1"); + + // tag::delete-by-query-execute-listener + ActionListener listener = new ActionListener() { + @Override + public void onResponse(BulkByScrollResponse bulkResponse) { + // <1> + } + + @Override + public void onFailure(Exception e) { + // <2> + } + }; + // end::delete-by-query-execute-listener + + // Replace the empty listener by a blocking listener in test + final CountDownLatch latch = new CountDownLatch(1); + listener = new LatchedActionListener<>(listener, latch); + + // tag::delete-by-query-execute-async + client.deleteByQueryAsync(request, RequestOptions.DEFAULT, listener); // <1> + // end::delete-by-query-execute-async + + assertTrue(latch.await(30L, TimeUnit.SECONDS)); + } + } + public void testGet() throws Exception { RestHighLevelClient client = highLevelClient(); { diff --git a/docs/java-rest/high-level/document/delete-by-query.asciidoc b/docs/java-rest/high-level/document/delete-by-query.asciidoc new file mode 100644 index 000000000000..5ec246a9121e --- /dev/null +++ b/docs/java-rest/high-level/document/delete-by-query.asciidoc @@ -0,0 +1,163 @@ +[[java-rest-high-document-delete-by-query]] +=== Delete By Query API + +[[java-rest-high-document-delete-by-query-request]] +==== Delete By Query Request + +A `DeleteByQueryRequest` can be used to delete documents from an index. It requires an existing index (or a set of indices) +on which deletion is to be performed. + +The simplest form of a `DeleteByQueryRequest` looks like: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-request] +-------------------------------------------------- +<1> Creates the `DeleteByQueryRequest` on a set of indices. + +By default version conflicts abort the `DeleteByQueryRequest` process but you can just count them by settings it to +`proceed` in the request body + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-request-conflicts] +-------------------------------------------------- +<1> Set `proceed` on version conflict + +You can limit the documents by adding a type to the source or by adding a query. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-request-typeOrQuery] +-------------------------------------------------- +<1> Only copy `doc` type +<2> Only copy documents which have field `user` set to `kimchy` + +It’s also possible to limit the number of processed documents by setting size. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-request-size] +-------------------------------------------------- +<1> Only copy 10 documents + +By default `DeleteByQueryRequest` uses batches of 1000. You can change the batch size with `setBatchSize`. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-request-scrollSize] +-------------------------------------------------- +<1> Use batches of 100 documents + +`DeleteByQueryRequest` also helps in automatically parallelizing using `sliced-scroll` to +slice on `_uid`. Use `setSlices` to specify the number of slices to use. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-request-slices] +-------------------------------------------------- +<1> set number of slices to use + +`DeleteByQueryRequest` uses the `scroll` parameter to control how long it keeps the "search context" alive. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-request-scroll] +-------------------------------------------------- +<1> set scroll time + +If you provide routing then the routing is copied to the scroll query, limiting the process to the shards that match +that routing value. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-request-routing] +-------------------------------------------------- +<1> set routing + + +==== Optional arguments +In addition to the options above the following arguments can optionally be also provided: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-request-timeout] +-------------------------------------------------- +<1> Timeout to wait for the delete by query request to be performed as a `TimeValue` + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-request-refresh] +-------------------------------------------------- +<1> Refresh index after calling delete by query + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-request-indicesOptions] +-------------------------------------------------- +<1> Set indices options + + +[[java-rest-high-document-delete-by-query-sync]] +==== Synchronous Execution + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-execute] +-------------------------------------------------- + +[[java-rest-high-document-delete-by-query-async]] +==== Asynchronous Execution + +The asynchronous execution of an delete by query request requires both the `DeleteByQueryRequest` +instance and an `ActionListener` instance to be passed to the asynchronous +method: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-execute-async] +-------------------------------------------------- +<1> The `DeleteByQueryRequest` to execute and the `ActionListener` to use when +the execution completes + +The asynchronous method does not block and returns immediately. Once it is +completed the `ActionListener` is called back using the `onResponse` method +if the execution successfully completed or using the `onFailure` method if +it failed. + +A typical listener for `BulkByScrollResponse` looks like: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-execute-listener] +-------------------------------------------------- +<1> Called when the execution is successfully completed. The response is +provided as an argument and contains a list of individual results for each +operation that was executed. Note that one or more operations might have +failed while the others have been successfully executed. +<2> Called when the whole `DeleteByQueryRequest` fails. In this case the raised +exception is provided as an argument and no operation has been executed. + +[[java-rest-high-document-delete-by-query-execute-listener-response]] +==== Delete By Query Response + +The returned `BulkByScrollResponse` contains information about the executed operations and + allows to iterate over each result as follows: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-response] +-------------------------------------------------- +<1> Get total time taken +<2> Check if the request timed out +<3> Get total number of docs processed +<4> Number of docs that were deleted +<5> Number of batches that were executed +<6> Number of skipped docs +<7> Number of version conflicts +<8> Number of times request had to retry bulk index operations +<9> Number of times request had to retry search operations +<10> The total time this request has throttled itself not including the current throttle time if it is currently sleeping +<11> Remaining delay of any current throttle sleep or 0 if not sleeping +<12> Failures during search phase +<13> Failures during bulk index operation diff --git a/docs/java-rest/high-level/supported-apis.asciidoc b/docs/java-rest/high-level/supported-apis.asciidoc index b39c83b69131..481a2470aa2d 100644 --- a/docs/java-rest/high-level/supported-apis.asciidoc +++ b/docs/java-rest/high-level/supported-apis.asciidoc @@ -17,6 +17,7 @@ Multi-document APIs:: * <> * <> * <> +* <> include::document/index.asciidoc[] include::document/get.asciidoc[] @@ -27,6 +28,7 @@ include::document/bulk.asciidoc[] include::document/multi-get.asciidoc[] include::document/reindex.asciidoc[] include::document/update-by-query.asciidoc[] +include::document/delete-by-query.asciidoc[] == Search APIs diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestDeleteByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestDeleteByQueryAction.java index e0ebaa85193d..be232ca7c402 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestDeleteByQueryAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestDeleteByQueryAction.java @@ -19,7 +19,6 @@ package org.elasticsearch.index.reindex; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.rest.RestController; @@ -56,7 +55,7 @@ public class RestDeleteByQueryAction extends AbstractBulkByQueryRestHandler> consumers = new HashMap<>(); consumers.put("conflicts", o -> internal.setConflicts((String) o)); diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java index 30621ab607bf..574a41181e56 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java @@ -20,7 +20,6 @@ package org.elasticsearch.index.reindex; import org.elasticsearch.Version; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; @@ -103,7 +102,7 @@ public class RoundTripTests extends ESTestCase { } public void testDeleteByQueryRequest() throws IOException { - DeleteByQueryRequest delete = new DeleteByQueryRequest(new SearchRequest()); + DeleteByQueryRequest delete = new DeleteByQueryRequest(); randomRequest(delete); DeleteByQueryRequest tripped = new DeleteByQueryRequest(toInputByteStream(delete)); assertRequestEquals(delete, tripped); diff --git a/server/src/main/java/org/elasticsearch/index/reindex/DeleteByQueryRequest.java b/server/src/main/java/org/elasticsearch/index/reindex/DeleteByQueryRequest.java index f848e8722c71..2713e5e2661d 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/DeleteByQueryRequest.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/DeleteByQueryRequest.java @@ -24,6 +24,9 @@ import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.tasks.TaskId; import java.io.IOException; @@ -47,12 +50,18 @@ import static org.elasticsearch.action.ValidateActions.addValidationError; *
  • it's results won't be visible until the index is refreshed.
  • * */ -public class DeleteByQueryRequest extends AbstractBulkByScrollRequest implements IndicesRequest.Replaceable { +public class DeleteByQueryRequest extends AbstractBulkByScrollRequest + implements IndicesRequest.Replaceable, ToXContentObject { public DeleteByQueryRequest() { + this(new SearchRequest()); } - public DeleteByQueryRequest(SearchRequest search) { + public DeleteByQueryRequest(String... indices) { + this(new SearchRequest(indices)); + } + + DeleteByQueryRequest(SearchRequest search) { this(search, true); } @@ -68,6 +77,78 @@ public class DeleteByQueryRequest extends AbstractBulkByScrollRequest listener.onResponse(true), listener::onFailure)); } @@ -142,9 +140,9 @@ public class JobDataDeleter { DeleteByQueryHolder deleteByQueryHolder = new DeleteByQueryHolder(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)); deleteByQueryHolder.dbqRequest.setRefresh(false); - deleteByQueryHolder.searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); + deleteByQueryHolder.dbqRequest.setIndicesOptions(IndicesOptions.lenientExpandOpen()); QueryBuilder qb = QueryBuilders.termQuery(Result.IS_INTERIM.getPreferredName(), true); - deleteByQueryHolder.searchRequest.source(new SearchSourceBuilder().query(new ConstantScoreQueryBuilder(qb))); + deleteByQueryHolder.dbqRequest.setQuery(new ConstantScoreQueryBuilder(qb)); try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN)) { client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryHolder.dbqRequest).get(); @@ -156,13 +154,11 @@ public class JobDataDeleter { // Wrapper to ensure safety private static class DeleteByQueryHolder { - private final SearchRequest searchRequest; private final DeleteByQueryRequest dbqRequest; private DeleteByQueryHolder(String index) { - // The search request has to be constructed and passed to the DeleteByQueryRequest before more details are set to it - searchRequest = new SearchRequest(index); - dbqRequest = new DeleteByQueryRequest(searchRequest); + dbqRequest = new DeleteByQueryRequest(); + dbqRequest.indices(index); dbqRequest.setSlices(5); dbqRequest.setAbortOnVersionConflict(false); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/JobStorageDeletionTask.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/JobStorageDeletionTask.java index 19cb42a220ed..61ed8ed4e118 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/JobStorageDeletionTask.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/JobStorageDeletionTask.java @@ -13,7 +13,6 @@ import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; @@ -28,7 +27,6 @@ import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; -import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction; @@ -95,12 +93,11 @@ public class JobStorageDeletionTask extends Task { ActionListener deleteCategorizerStateHandler = ActionListener.wrap( response -> { logger.info("Running DBQ on [" + indexName + "," + indexPattern + "] for job [" + jobId + "]"); - SearchRequest searchRequest = new SearchRequest(indexName, indexPattern); - DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest); + DeleteByQueryRequest request = new DeleteByQueryRequest(indexName, indexPattern); ConstantScoreQueryBuilder query = new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId)); - searchRequest.source(new SearchSourceBuilder().query(query)); - searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen())); + request.setQuery(query); + request.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen())); request.setSlices(5); request.setAbortOnVersionConflict(false); request.setRefresh(true); @@ -125,14 +122,13 @@ public class JobStorageDeletionTask extends Task { private void deleteQuantiles(String jobId, Client client, ActionListener finishedHandler) { // The quantiles type and doc ID changed in v5.5 so delete both the old and new format - SearchRequest searchRequest = new SearchRequest(AnomalyDetectorsIndex.jobStateIndexName()); - DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest); + DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexName()); // Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace IdsQueryBuilder query = new IdsQueryBuilder().addIds(Quantiles.documentId(jobId), // TODO: remove in 7.0 Quantiles.v54DocumentId(jobId)); - searchRequest.source(new SearchSourceBuilder().query(query)); - searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen())); + request.setQuery(query); + request.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen())); request.setAbortOnVersionConflict(false); request.setRefresh(true); @@ -162,14 +158,13 @@ public class JobStorageDeletionTask extends Task { private void deleteCategorizerState(String jobId, Client client, int docNum, ActionListener finishedHandler) { // The categorizer state type and doc ID changed in v5.5 so delete both the old and new format - SearchRequest searchRequest = new SearchRequest(AnomalyDetectorsIndex.jobStateIndexName()); - DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest); + DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexName()); // Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace IdsQueryBuilder query = new IdsQueryBuilder().addIds(CategorizerState.documentId(jobId, docNum), // TODO: remove in 7.0 CategorizerState.v54DocumentId(jobId, docNum)); - searchRequest.source(new SearchSourceBuilder().query(query)); - searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen())); + request.setQuery(query); + request.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen())); request.setAbortOnVersionConflict(false); request.setRefresh(true); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteCalendarAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteCalendarAction.java index 2facfd4678e1..4c923f2f77ce 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteCalendarAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteCalendarAction.java @@ -7,7 +7,6 @@ package org.elasticsearch.xpack.ml.action; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -18,7 +17,6 @@ import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; -import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.tasks.Task; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.MlMetaIndex; @@ -76,15 +74,12 @@ public class TransportDeleteCalendarAction extends HandledTransportAction forecastsToDelete) { - SearchRequest searchRequest = new SearchRequest(); - // We need to create the DeleteByQueryRequest before we modify the SearchRequest - // because the constructor of the former wipes the latter - DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest); + DeleteByQueryRequest request = new DeleteByQueryRequest(); request.setSlices(5); - searchRequest.indices(RESULTS_INDEX_PATTERN); + request.indices(RESULTS_INDEX_PATTERN); BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().minimumShouldMatch(1); boolQuery.must(QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(), ForecastRequestStats.RESULT_TYPE_VALUE, Forecast.RESULT_TYPE_VALUE)); @@ -157,7 +154,7 @@ public class ExpiredForecastsRemover implements MlDataRemover { .must(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), forecastToDelete.getForecastId()))); } QueryBuilder query = QueryBuilders.boolQuery().filter(boolQuery); - searchRequest.source(new SearchSourceBuilder().query(query)); + request.setQuery(query); return request; } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java index f59fdddedecd..c882c9011688 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ml.job.retention; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.logging.Loggers; @@ -17,7 +16,6 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; -import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; @@ -87,19 +85,16 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover { } private DeleteByQueryRequest createDBQRequest(Job job, long cutoffEpochMs) { - SearchRequest searchRequest = new SearchRequest(); - // We need to create the DeleteByQueryRequest before we modify the SearchRequest - // because the constructor of the former wipes the latter - DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest); + DeleteByQueryRequest request = new DeleteByQueryRequest(); request.setSlices(5); - searchRequest.indices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId())); + request.indices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId())); QueryBuilder excludeFilter = QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(), ModelSizeStats.RESULT_TYPE_VALUE, ForecastRequestStats.RESULT_TYPE_VALUE, Forecast.RESULT_TYPE_VALUE); QueryBuilder query = createQuery(job.getId(), cutoffEpochMs) .filter(QueryBuilders.existsQuery(Result.RESULT_TYPE.getPreferredName())) .mustNot(excludeFilter); - searchRequest.source(new SearchSourceBuilder().query(query)); + request.setQuery(query); return request; } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ExpiredTokenRemover.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ExpiredTokenRemover.java index b8ae5c944419..78670dd99f6c 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ExpiredTokenRemover.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ExpiredTokenRemover.java @@ -9,7 +9,6 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; @@ -50,15 +49,14 @@ final class ExpiredTokenRemover extends AbstractRunnable { @Override public void doRun() { - SearchRequest searchRequest = new SearchRequest(SecurityIndexManager.SECURITY_INDEX_NAME); - DeleteByQueryRequest expiredDbq = new DeleteByQueryRequest(searchRequest); + DeleteByQueryRequest expiredDbq = new DeleteByQueryRequest(SecurityIndexManager.SECURITY_INDEX_NAME); if (timeout != TimeValue.MINUS_ONE) { expiredDbq.setTimeout(timeout); - searchRequest.source().timeout(timeout); + expiredDbq.getSearchRequest().source().timeout(timeout); } final Instant now = Instant.now(); - searchRequest.source() - .query(QueryBuilders.boolQuery() + expiredDbq + .setQuery(QueryBuilders.boolQuery() .filter(QueryBuilders.termsQuery("doc_type", TokenService.INVALIDATED_TOKEN_DOC_TYPE, "token")) .filter(QueryBuilders.boolQuery() .should(QueryBuilders.rangeQuery("expiration_time").lte(now.toEpochMilli()))