HLRC: Add delete by query API (#32782)

Adds the delete-by-query API to the High Level REST Client.
Sohaib Iftikhar 2018-09-04 14:56:26 +02:00 committed by Nik Everett
parent 1457b07a06
commit 761e8c461f
17 changed files with 552 additions and 65 deletions
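For orientation before the file-by-file changes, here is a minimal usage sketch of the new API. It is an editorial illustration, not code from this commit: it assumes an already-configured `RestHighLevelClient` named `client` and a hypothetical `posts` index.

import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;

DeleteByQueryRequest request = new DeleteByQueryRequest("posts");      // hypothetical target index
request.setQuery(new TermQueryBuilder("user", "kimchy"));              // delete only matching documents
request.setConflicts("proceed");                                       // count version conflicts instead of aborting
request.setRefresh(true);                                              // make the deletions visible to search

BulkByScrollResponse response = client.deleteByQuery(request, RequestOptions.DEFAULT);
long deleted = response.getDeleted();                                  // number of documents removed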


@ -108,6 +108,7 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.rankeval.RankEvalRequest;
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.protocol.xpack.XPackInfoRequest;
@ -866,6 +867,32 @@ final class RequestConverters {
return request;
}
static Request deleteByQuery(DeleteByQueryRequest deleteByQueryRequest) throws IOException {
String endpoint =
endpoint(deleteByQueryRequest.indices(), deleteByQueryRequest.getDocTypes(), "_delete_by_query");
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
Params params = new Params(request)
.withRouting(deleteByQueryRequest.getRouting())
.withRefresh(deleteByQueryRequest.isRefresh())
.withTimeout(deleteByQueryRequest.getTimeout())
.withWaitForActiveShards(deleteByQueryRequest.getWaitForActiveShards())
.withIndicesOptions(deleteByQueryRequest.indicesOptions());
if (deleteByQueryRequest.isAbortOnVersionConflict() == false) {
params.putParam("conflicts", "proceed");
}
if (deleteByQueryRequest.getBatchSize() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE) {
params.putParam("scroll_size", Integer.toString(deleteByQueryRequest.getBatchSize()));
}
if (deleteByQueryRequest.getScrollTime() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_TIMEOUT) {
params.putParam("scroll", deleteByQueryRequest.getScrollTime());
}
if (deleteByQueryRequest.getSize() > 0) {
params.putParam("size", Integer.toString(deleteByQueryRequest.getSize()));
}
request.setEntity(createEntity(deleteByQueryRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}
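// Editorial illustration (not part of the commit): for a hypothetical request such as
//     DeleteByQueryRequest dbq = new DeleteByQueryRequest("posts").setDocTypes("doc");
//     dbq.setAbortOnVersionConflict(false);
// the converter above is expected to build roughly
//     POST /posts/doc/_delete_by_query?conflicts=proceed&timeout=1m
// with the search source (query, batch size, etc.) serialized as the JSON request body.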
static Request rollover(RolloverRequest rolloverRequest) throws IOException {
String endpoint = new EndpointBuilder().addPathPart(rolloverRequest.getAlias()).addPathPartAsIs("_rollover")
.addPathPart(rolloverRequest.getNewIndexName()).build();


@ -65,6 +65,7 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.rankeval.RankEvalRequest;
import org.elasticsearch.index.rankeval.RankEvalResponse;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.plugins.spi.NamedXContentProvider;
@ -454,6 +455,35 @@ public class RestHighLevelClient implements Closeable {
);
}
/**
* Executes a delete by query request.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html">
* Delete By Query API on elastic.co</a>
* @param deleteByQueryRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public final BulkByScrollResponse deleteByQuery(DeleteByQueryRequest deleteByQueryRequest, RequestOptions options) throws IOException {
return performRequestAndParseEntity(
deleteByQueryRequest, RequestConverters::deleteByQuery, options, BulkByScrollResponse::fromXContent, emptySet()
);
}
/**
* Asynchronously executes a delete by query request.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html">
* Delete By Query API on elastic.co</a>
* @param deleteByQueryRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
*/
public final void deleteByQueryAsync(DeleteByQueryRequest deleteByQueryRequest, RequestOptions options,
ActionListener<BulkByScrollResponse> listener) {
performRequestAsyncAndParseEntity(
deleteByQueryRequest, RequestConverters::deleteByQuery, options, BulkByScrollResponse::fromXContent, listener, emptySet()
);
}
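// Editorial sketch (not part of the commit): asynchronous usage of the method above, assuming a
// configured RestHighLevelClient named "client" and a log4j Logger named "logger".
ActionListener<BulkByScrollResponse> listener = ActionListener.wrap(
    response -> logger.info("deleted [{}] docs", response.getDeleted()), // called on success
    e -> logger.error("delete by query failed", e));                     // called on failure
client.deleteByQueryAsync(new DeleteByQueryRequest("posts"), RequestOptions.DEFAULT, listener);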
/**
* Pings the remote Elasticsearch cluster and returns true if the ping succeeded, false otherwise
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized


@ -36,6 +36,7 @@ import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
@ -50,6 +51,7 @@ import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.rest.RestStatus;
@ -758,6 +760,52 @@ public class CrudIT extends ESRestHighLevelClientTestCase {
}
}
public void testDeleteByQuery() throws IOException {
final String sourceIndex = "source1";
{
// Prepare
Settings settings = Settings.builder()
.put("number_of_shards", 1)
.put("number_of_replicas", 0)
.build();
createIndex(sourceIndex, settings);
assertEquals(
RestStatus.OK,
highLevelClient().bulk(
new BulkRequest()
.add(new IndexRequest(sourceIndex, "type", "1")
.source(Collections.singletonMap("foo", 1), XContentType.JSON))
.add(new IndexRequest(sourceIndex, "type", "2")
.source(Collections.singletonMap("foo", 2), XContentType.JSON))
.setRefreshPolicy(RefreshPolicy.IMMEDIATE),
RequestOptions.DEFAULT
).status()
);
}
{
// test1: delete one doc
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
deleteByQueryRequest.indices(sourceIndex);
deleteByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1").types("type"));
deleteByQueryRequest.setRefresh(true);
BulkByScrollResponse bulkResponse =
execute(deleteByQueryRequest, highLevelClient()::deleteByQuery, highLevelClient()::deleteByQueryAsync);
assertEquals(1, bulkResponse.getTotal());
assertEquals(1, bulkResponse.getDeleted());
assertEquals(0, bulkResponse.getNoops());
assertEquals(0, bulkResponse.getVersionConflicts());
assertEquals(1, bulkResponse.getBatches());
assertTrue(bulkResponse.getTook().getMillis() > 0);
assertEquals(0, bulkResponse.getBulkFailures().size());
assertEquals(0, bulkResponse.getSearchFailures().size());
assertEquals(
1,
highLevelClient().search(new SearchRequest(sourceIndex), RequestOptions.DEFAULT).getHits().totalHits
);
}
}
public void testBulkProcessorIntegration() throws IOException {
int nbItems = randomIntBetween(10, 100);
boolean[] errors = new boolean[nbItems];


@ -127,6 +127,7 @@ import org.elasticsearch.index.rankeval.RankEvalRequest;
import org.elasticsearch.index.rankeval.RankEvalSpec;
import org.elasticsearch.index.rankeval.RatedRequest;
import org.elasticsearch.index.rankeval.RestRankEvalAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.RemoteInfo;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
@ -526,6 +527,53 @@ public class RequestConvertersTests extends ESTestCase {
assertToXContentBody(updateByQueryRequest, request.getEntity());
}
public void testDeleteByQuery() throws IOException {
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
deleteByQueryRequest.indices(randomIndicesNames(1, 5));
Map<String, String> expectedParams = new HashMap<>();
if (randomBoolean()) {
deleteByQueryRequest.setDocTypes(generateRandomStringArray(5, 5, false, false));
}
if (randomBoolean()) {
int batchSize = randomInt(100);
deleteByQueryRequest.setBatchSize(batchSize);
expectedParams.put("scroll_size", Integer.toString(batchSize));
}
if (randomBoolean()) {
deleteByQueryRequest.setRouting("=cat");
expectedParams.put("routing", "=cat");
}
if (randomBoolean()) {
int size = randomIntBetween(100, 1000);
deleteByQueryRequest.setSize(size);
expectedParams.put("size", Integer.toString(size));
}
if (randomBoolean()) {
deleteByQueryRequest.setAbortOnVersionConflict(false);
expectedParams.put("conflicts", "proceed");
}
if (randomBoolean()) {
String ts = randomTimeValue();
deleteByQueryRequest.setScroll(TimeValue.parseTimeValue(ts, "scroll"));
expectedParams.put("scroll", ts);
}
if (randomBoolean()) {
deleteByQueryRequest.setQuery(new TermQueryBuilder("foo", "fooval"));
}
setRandomIndicesOptions(deleteByQueryRequest::setIndicesOptions, deleteByQueryRequest::indicesOptions, expectedParams);
setRandomTimeout(deleteByQueryRequest::setTimeout, ReplicationRequest.DEFAULT_TIMEOUT, expectedParams);
Request request = RequestConverters.deleteByQuery(deleteByQueryRequest);
StringJoiner joiner = new StringJoiner("/", "/", "");
joiner.add(String.join(",", deleteByQueryRequest.indices()));
if (deleteByQueryRequest.getDocTypes().length > 0)
joiner.add(String.join(",", deleteByQueryRequest.getDocTypes()));
joiner.add("_delete_by_query");
assertEquals(joiner.toString(), request.getEndpoint());
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
assertEquals(expectedParams, request.getParameters());
assertToXContentBody(deleteByQueryRequest, request.getEntity());
}
public void testPutMapping() throws IOException {
PutMappingRequest putMappingRequest = new PutMappingRequest();


@ -649,7 +649,6 @@ public class RestHighLevelClientTests extends ESTestCase {
"cluster.remote_info",
"count",
"create",
"delete_by_query",
"exists_source",
"get_source",
"indices.delete_alias",


@ -65,6 +65,7 @@ import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.RemoteInfo;
import org.elasticsearch.index.reindex.ScrollableHitSource;
@ -1020,6 +1021,113 @@ public class CRUDDocumentationIT extends ESRestHighLevelClientTestCase {
}
}
public void testDeleteByQuery() throws Exception {
RestHighLevelClient client = highLevelClient();
{
String mapping =
"\"doc\": {\n" +
" \"properties\": {\n" +
" \"user\": {\n" +
" \"type\": \"text\"\n" +
" },\n" +
" \"field1\": {\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"field2\": {\n" +
" \"type\": \"integer\"\n" +
" }\n" +
" }\n" +
" }";
createIndex("source1", Settings.EMPTY, mapping);
createIndex("source2", Settings.EMPTY, mapping);
}
{
// tag::delete-by-query-request
DeleteByQueryRequest request = new DeleteByQueryRequest("source1", "source2"); // <1>
// end::delete-by-query-request
// tag::delete-by-query-request-conflicts
request.setConflicts("proceed"); // <1>
// end::delete-by-query-request-conflicts
// tag::delete-by-query-request-typeOrQuery
request.setDocTypes("doc"); // <1>
request.setQuery(new TermQueryBuilder("user", "kimchy")); // <2>
// end::delete-by-query-request-typeOrQuery
// tag::delete-by-query-request-size
request.setSize(10); // <1>
// end::delete-by-query-request-size
// tag::delete-by-query-request-scrollSize
request.setBatchSize(100); // <1>
// end::delete-by-query-request-scrollSize
// tag::delete-by-query-request-timeout
request.setTimeout(TimeValue.timeValueMinutes(2)); // <1>
// end::delete-by-query-request-timeout
// tag::delete-by-query-request-refresh
request.setRefresh(true); // <1>
// end::delete-by-query-request-refresh
// tag::delete-by-query-request-slices
request.setSlices(2); // <1>
// end::delete-by-query-request-slices
// tag::delete-by-query-request-scroll
request.setScroll(TimeValue.timeValueMinutes(10)); // <1>
// end::delete-by-query-request-scroll
// tag::delete-by-query-request-routing
request.setRouting("=cat"); // <1>
// end::delete-by-query-request-routing
// tag::delete-by-query-request-indicesOptions
request.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); // <1>
// end::delete-by-query-request-indicesOptions
// tag::delete-by-query-execute
BulkByScrollResponse bulkResponse = client.deleteByQuery(request, RequestOptions.DEFAULT);
// end::delete-by-query-execute
assertSame(0, bulkResponse.getSearchFailures().size());
assertSame(0, bulkResponse.getBulkFailures().size());
// tag::delete-by-query-response
TimeValue timeTaken = bulkResponse.getTook(); // <1>
boolean timedOut = bulkResponse.isTimedOut(); // <2>
long totalDocs = bulkResponse.getTotal(); // <3>
long deletedDocs = bulkResponse.getDeleted(); // <4>
long batches = bulkResponse.getBatches(); // <5>
long noops = bulkResponse.getNoops(); // <6>
long versionConflicts = bulkResponse.getVersionConflicts(); // <7>
long bulkRetries = bulkResponse.getBulkRetries(); // <8>
long searchRetries = bulkResponse.getSearchRetries(); // <9>
TimeValue throttledMillis = bulkResponse.getStatus().getThrottled(); // <10>
TimeValue throttledUntilMillis = bulkResponse.getStatus().getThrottledUntil(); // <11>
List<ScrollableHitSource.SearchFailure> searchFailures = bulkResponse.getSearchFailures(); // <12>
List<BulkItemResponse.Failure> bulkFailures = bulkResponse.getBulkFailures(); // <13>
// end::delete-by-query-response
}
{
DeleteByQueryRequest request = new DeleteByQueryRequest();
request.indices("source1");
// tag::delete-by-query-execute-listener
ActionListener<BulkByScrollResponse> listener = new ActionListener<BulkByScrollResponse>() {
@Override
public void onResponse(BulkByScrollResponse bulkResponse) {
// <1>
}
@Override
public void onFailure(Exception e) {
// <2>
}
};
// end::delete-by-query-execute-listener
// Replace the empty listener by a blocking listener in test
final CountDownLatch latch = new CountDownLatch(1);
listener = new LatchedActionListener<>(listener, latch);
// tag::delete-by-query-execute-async
client.deleteByQueryAsync(request, RequestOptions.DEFAULT, listener); // <1>
// end::delete-by-query-execute-async
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}
public void testGet() throws Exception {
RestHighLevelClient client = highLevelClient();
{


@ -0,0 +1,163 @@
[[java-rest-high-document-delete-by-query]]
=== Delete By Query API
[[java-rest-high-document-delete-by-query-request]]
==== Delete By Query Request
A `DeleteByQueryRequest` can be used to delete documents from an index. It requires an existing index (or a set of indices)
on which deletion is to be performed.
The simplest form of a `DeleteByQueryRequest` looks like:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-request]
--------------------------------------------------
<1> Creates the `DeleteByQueryRequest` on a set of indices.
By default version conflicts abort the `DeleteByQueryRequest` process, but you can count them instead by setting
`conflicts` to `proceed` in the request body
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-request-conflicts]
--------------------------------------------------
<1> Set `conflicts` to `proceed` so version conflicts are counted instead of aborting the request
You can restrict the documents to be deleted by specifying a document type and/or a query.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-request-typeOrQuery]
--------------------------------------------------
<1> Only delete documents of type `doc`
<2> Only delete documents which have the field `user` set to `kimchy`
It's also possible to limit the number of processed documents by setting `size`.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-request-size]
--------------------------------------------------
<1> Only delete up to 10 documents
By default `DeleteByQueryRequest` uses batches of 1000. You can change the batch size with `setBatchSize`.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-request-scrollSize]
--------------------------------------------------
<1> Use batches of 100 documents
`DeleteByQueryRequest` can also be parallelized automatically using `sliced-scroll`, slicing on `_uid`.
Use `setSlices` to specify the number of slices to use.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-request-slices]
--------------------------------------------------
<1> Set the number of slices to use
`DeleteByQueryRequest` uses the `scroll` parameter to control how long it keeps the "search context" alive.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-request-scroll]
--------------------------------------------------
<1> Set the scroll time
If you provide routing then the routing is copied to the scroll query, limiting the process to the shards that match
that routing value.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-request-routing]
--------------------------------------------------
<1> Set the routing value
==== Optional arguments
In addition to the options above, the following arguments can also optionally be provided:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-request-timeout]
--------------------------------------------------
<1> Timeout to wait for the delete by query request to be performed as a `TimeValue`
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-request-refresh]
--------------------------------------------------
<1> Refresh index after calling delete by query
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-request-indicesOptions]
--------------------------------------------------
<1> Set indices options
[[java-rest-high-document-delete-by-query-sync]]
==== Synchronous Execution
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-execute]
--------------------------------------------------
[[java-rest-high-document-delete-by-query-async]]
==== Asynchronous Execution
The asynchronous execution of a delete by query request requires both the `DeleteByQueryRequest`
instance and an `ActionListener` instance to be passed to the asynchronous
method:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-execute-async]
--------------------------------------------------
<1> The `DeleteByQueryRequest` to execute and the `ActionListener` to use when
the execution completes
The asynchronous method does not block and returns immediately. Once it is
completed the `ActionListener` is called back using the `onResponse` method
if the execution successfully completed or using the `onFailure` method if
it failed.
A typical listener for `BulkByScrollResponse` looks like:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-execute-listener]
--------------------------------------------------
<1> Called when the execution is successfully completed. The response is
provided as an argument and contains details about the executed operations,
including any search or bulk failures. Note that some operations might have
failed while others completed successfully.
<2> Called when the whole `DeleteByQueryRequest` fails. In this case the raised
exception is provided as an argument and no operation has been executed.
[[java-rest-high-document-delete-by-query-execute-listener-response]]
==== Delete By Query Response
The returned `BulkByScrollResponse` contains information about the executed operations and
can be inspected as follows:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-response]
--------------------------------------------------
<1> Get total time taken
<2> Check if the request timed out
<3> Get total number of docs processed
<4> Number of docs that were deleted
<5> Number of batches that were executed
<6> Number of skipped docs
<7> Number of version conflicts
<8> Number of times request had to retry bulk index operations
<9> Number of times request had to retry search operations
<10> The total time this request has throttled itself, not including the current throttle time if it is currently sleeping
<11> Remaining delay of any current throttle sleep or 0 if not sleeping
<12> Failures during search phase
<13> Failures during bulk index operation


@ -17,6 +17,7 @@ Multi-document APIs::
* <<java-rest-high-document-multi-get>>
* <<java-rest-high-document-reindex>>
* <<java-rest-high-document-update-by-query>>
* <<java-rest-high-document-delete-by-query>>
include::document/index.asciidoc[]
include::document/get.asciidoc[]
@ -27,6 +28,7 @@ include::document/bulk.asciidoc[]
include::document/multi-get.asciidoc[]
include::document/reindex.asciidoc[]
include::document/update-by-query.asciidoc[]
include::document/delete-by-query.asciidoc[]
== Search APIs


@ -19,7 +19,6 @@
package org.elasticsearch.index.reindex;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestController;
@ -56,7 +55,7 @@ public class RestDeleteByQueryAction extends AbstractBulkByQueryRestHandler<Dele
* it to set its own defaults which differ from SearchRequest's
* defaults. Then the parseInternalRequest can override them.
*/
DeleteByQueryRequest internal = new DeleteByQueryRequest(new SearchRequest());
DeleteByQueryRequest internal = new DeleteByQueryRequest();
Map<String, Consumer<Object>> consumers = new HashMap<>();
consumers.put("conflicts", o -> internal.setConflicts((String) o));


@ -20,7 +20,6 @@
package org.elasticsearch.index.reindex;
import org.elasticsearch.Version;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@ -103,7 +102,7 @@ public class RoundTripTests extends ESTestCase {
}
public void testDeleteByQueryRequest() throws IOException {
DeleteByQueryRequest delete = new DeleteByQueryRequest(new SearchRequest());
DeleteByQueryRequest delete = new DeleteByQueryRequest();
randomRequest(delete);
DeleteByQueryRequest tripped = new DeleteByQueryRequest(toInputByteStream(delete));
assertRequestEquals(delete, tripped);


@ -24,6 +24,9 @@ import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.tasks.TaskId;
import java.io.IOException;
@ -47,12 +50,18 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
* <li>its results won't be visible until the index is refreshed.</li>
* </ul>
*/
public class DeleteByQueryRequest extends AbstractBulkByScrollRequest<DeleteByQueryRequest> implements IndicesRequest.Replaceable {
public class DeleteByQueryRequest extends AbstractBulkByScrollRequest<DeleteByQueryRequest>
implements IndicesRequest.Replaceable, ToXContentObject {
public DeleteByQueryRequest() {
this(new SearchRequest());
}
public DeleteByQueryRequest(SearchRequest search) {
public DeleteByQueryRequest(String... indices) {
this(new SearchRequest(indices));
}
DeleteByQueryRequest(SearchRequest search) {
this(search, true);
}
@ -68,6 +77,78 @@ public class DeleteByQueryRequest extends AbstractBulkByScrollRequest<DeleteByQu
}
}
/**
* Set the query for selective delete
*/
public DeleteByQueryRequest setQuery(QueryBuilder query) {
if (query != null) {
getSearchRequest().source().query(query);
}
return this;
}
/**
* Set the document types for the delete
*/
public DeleteByQueryRequest setDocTypes(String... types) {
if (types != null) {
getSearchRequest().types(types);
}
return this;
}
/**
* Set routing limiting the process to the shards that match that routing value
*/
public DeleteByQueryRequest setRouting(String routing) {
if (routing != null) {
getSearchRequest().routing(routing);
}
return this;
}
/**
* Set the scroll size to control the number of documents processed per batch
*/
public DeleteByQueryRequest setBatchSize(int size) {
getSearchRequest().source().size(size);
return this;
}
/**
* Set the IndicesOptions for controlling unavailable indices
*/
public DeleteByQueryRequest setIndicesOptions(IndicesOptions indicesOptions) {
getSearchRequest().indicesOptions(indicesOptions);
return this;
}
/**
* Gets the batch size for this request
*/
public int getBatchSize() {
return getSearchRequest().source().size();
}
/**
* Gets the routing value used for this request
*/
public String getRouting() {
return getSearchRequest().routing();
}
/**
* Gets the document types on which this request would be executed. Returns an empty array if all
* types are to be processed.
*/
public String[] getDocTypes() {
if (getSearchRequest().types() != null) {
return getSearchRequest().types();
} else {
return new String[0];
}
}
@Override
protected DeleteByQueryRequest self() {
return this;
@ -132,4 +213,11 @@ public class DeleteByQueryRequest extends AbstractBulkByScrollRequest<DeleteByQu
return this;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
getSearchRequest().source().innerToXContent(builder, params);
builder.endObject();
return builder;
}
}
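A brief, hedged illustration of what the new `toXContent` implementation produces; the index name and query are made up, and `Strings.toString` is used only to render the body:

DeleteByQueryRequest dbq = new DeleteByQueryRequest("posts");
dbq.setQuery(new TermQueryBuilder("user", "kimchy"));
String body = org.elasticsearch.common.Strings.toString(dbq);
// roughly: {"size":1000,"query":{"term":{"user":{"value":"kimchy","boost":1.0}}}}
// (the default scroll batch size and exact query serialization may differ)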


@ -11,7 +11,6 @@ import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
@ -22,7 +21,6 @@ import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelState;
import org.elasticsearch.xpack.core.ml.job.results.Result;
@ -129,8 +127,8 @@ public class JobDataDeleter {
QueryBuilder query = QueryBuilders.boolQuery()
.filter(QueryBuilders.existsQuery(Result.RESULT_TYPE.getPreferredName()))
.filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).gte(cutoffEpochMs));
deleteByQueryHolder.searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
deleteByQueryHolder.searchRequest.source(new SearchSourceBuilder().query(query));
deleteByQueryHolder.dbqRequest.setIndicesOptions(IndicesOptions.lenientExpandOpen());
deleteByQueryHolder.dbqRequest.setQuery(query);
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryHolder.dbqRequest,
ActionListener.wrap(r -> listener.onResponse(true), listener::onFailure));
}
@ -142,9 +140,9 @@ public class JobDataDeleter {
DeleteByQueryHolder deleteByQueryHolder = new DeleteByQueryHolder(AnomalyDetectorsIndex.jobResultsAliasedName(jobId));
deleteByQueryHolder.dbqRequest.setRefresh(false);
deleteByQueryHolder.searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
deleteByQueryHolder.dbqRequest.setIndicesOptions(IndicesOptions.lenientExpandOpen());
QueryBuilder qb = QueryBuilders.termQuery(Result.IS_INTERIM.getPreferredName(), true);
deleteByQueryHolder.searchRequest.source(new SearchSourceBuilder().query(new ConstantScoreQueryBuilder(qb)));
deleteByQueryHolder.dbqRequest.setQuery(new ConstantScoreQueryBuilder(qb));
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN)) {
client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryHolder.dbqRequest).get();
@ -156,13 +154,11 @@ public class JobDataDeleter {
// Wrapper to ensure safety
private static class DeleteByQueryHolder {
private final SearchRequest searchRequest;
private final DeleteByQueryRequest dbqRequest;
private DeleteByQueryHolder(String index) {
// The search request has to be constructed and passed to the DeleteByQueryRequest before more details are set to it
searchRequest = new SearchRequest(index);
dbqRequest = new DeleteByQueryRequest(searchRequest);
dbqRequest = new DeleteByQueryRequest();
dbqRequest.indices(index);
dbqRequest.setSlices(5);
dbqRequest.setAbortOnVersionConflict(false);
}


@ -13,7 +13,6 @@ import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
@ -28,7 +27,6 @@ import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction;
@ -95,12 +93,11 @@ public class JobStorageDeletionTask extends Task {
ActionListener<Boolean> deleteCategorizerStateHandler = ActionListener.wrap(
response -> {
logger.info("Running DBQ on [" + indexName + "," + indexPattern + "] for job [" + jobId + "]");
SearchRequest searchRequest = new SearchRequest(indexName, indexPattern);
DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest);
DeleteByQueryRequest request = new DeleteByQueryRequest(indexName, indexPattern);
ConstantScoreQueryBuilder query =
new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId));
searchRequest.source(new SearchSourceBuilder().query(query));
searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()));
request.setQuery(query);
request.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()));
request.setSlices(5);
request.setAbortOnVersionConflict(false);
request.setRefresh(true);
@ -125,14 +122,13 @@ public class JobStorageDeletionTask extends Task {
private void deleteQuantiles(String jobId, Client client, ActionListener<Boolean> finishedHandler) {
// The quantiles type and doc ID changed in v5.5 so delete both the old and new format
SearchRequest searchRequest = new SearchRequest(AnomalyDetectorsIndex.jobStateIndexName());
DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest);
DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexName());
// Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
IdsQueryBuilder query = new IdsQueryBuilder().addIds(Quantiles.documentId(jobId),
// TODO: remove in 7.0
Quantiles.v54DocumentId(jobId));
searchRequest.source(new SearchSourceBuilder().query(query));
searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()));
request.setQuery(query);
request.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()));
request.setAbortOnVersionConflict(false);
request.setRefresh(true);
@ -162,14 +158,13 @@ public class JobStorageDeletionTask extends Task {
private void deleteCategorizerState(String jobId, Client client, int docNum, ActionListener<Boolean> finishedHandler) {
// The categorizer state type and doc ID changed in v5.5 so delete both the old and new format
SearchRequest searchRequest = new SearchRequest(AnomalyDetectorsIndex.jobStateIndexName());
DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest);
DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexName());
// Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
IdsQueryBuilder query = new IdsQueryBuilder().addIds(CategorizerState.documentId(jobId, docNum),
// TODO: remove in 7.0
CategorizerState.v54DocumentId(jobId, docNum));
searchRequest.source(new SearchSourceBuilder().query(query));
searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()));
request.setQuery(query);
request.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()));
request.setAbortOnVersionConflict(false);
request.setRefresh(true);


@ -7,7 +7,6 @@ package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
@ -18,7 +17,6 @@ import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
@ -76,15 +74,12 @@ public class TransportDeleteCalendarAction extends HandledTransportAction<Delete
}
private DeleteByQueryRequest buildDeleteByQuery(String calendarId) {
SearchRequest searchRequest = new SearchRequest(MlMetaIndex.INDEX_NAME);
// The DBQ request constructor wipes the search request source
// so it has to be set after
DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest);
DeleteByQueryRequest request = new DeleteByQueryRequest(MlMetaIndex.INDEX_NAME);
request.setSlices(5);
request.setRefresh(true);
QueryBuilder query = QueryBuilders.termsQuery(Calendar.ID.getPreferredName(), calendarId);
searchRequest.source(new SearchSourceBuilder().query(query));
request.setQuery(query);
return request;
}
}


@ -141,13 +141,10 @@ public class ExpiredForecastsRemover implements MlDataRemover {
}
private DeleteByQueryRequest buildDeleteByQuery(List<ForecastRequestStats> forecastsToDelete) {
SearchRequest searchRequest = new SearchRequest();
// We need to create the DeleteByQueryRequest before we modify the SearchRequest
// because the constructor of the former wipes the latter
DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest);
DeleteByQueryRequest request = new DeleteByQueryRequest();
request.setSlices(5);
searchRequest.indices(RESULTS_INDEX_PATTERN);
request.indices(RESULTS_INDEX_PATTERN);
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().minimumShouldMatch(1);
boolQuery.must(QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(),
ForecastRequestStats.RESULT_TYPE_VALUE, Forecast.RESULT_TYPE_VALUE));
@ -157,7 +154,7 @@ public class ExpiredForecastsRemover implements MlDataRemover {
.must(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), forecastToDelete.getForecastId())));
}
QueryBuilder query = QueryBuilders.boolQuery().filter(boolQuery);
searchRequest.source(new SearchSourceBuilder().query(query));
request.setQuery(query);
return request;
}
}


@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ml.job.retention;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.Loggers;
@ -17,7 +16,6 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
@ -87,19 +85,16 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
}
private DeleteByQueryRequest createDBQRequest(Job job, long cutoffEpochMs) {
SearchRequest searchRequest = new SearchRequest();
// We need to create the DeleteByQueryRequest before we modify the SearchRequest
// because the constructor of the former wipes the latter
DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest);
DeleteByQueryRequest request = new DeleteByQueryRequest();
request.setSlices(5);
searchRequest.indices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId()));
request.indices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId()));
QueryBuilder excludeFilter = QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(),
ModelSizeStats.RESULT_TYPE_VALUE, ForecastRequestStats.RESULT_TYPE_VALUE, Forecast.RESULT_TYPE_VALUE);
QueryBuilder query = createQuery(job.getId(), cutoffEpochMs)
.filter(QueryBuilders.existsQuery(Result.RESULT_TYPE.getPreferredName()))
.mustNot(excludeFilter);
searchRequest.source(new SearchSourceBuilder().query(query));
request.setQuery(query);
return request;
}


@ -9,7 +9,6 @@ import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
@ -50,15 +49,14 @@ final class ExpiredTokenRemover extends AbstractRunnable {
@Override
public void doRun() {
SearchRequest searchRequest = new SearchRequest(SecurityIndexManager.SECURITY_INDEX_NAME);
DeleteByQueryRequest expiredDbq = new DeleteByQueryRequest(searchRequest);
DeleteByQueryRequest expiredDbq = new DeleteByQueryRequest(SecurityIndexManager.SECURITY_INDEX_NAME);
if (timeout != TimeValue.MINUS_ONE) {
expiredDbq.setTimeout(timeout);
searchRequest.source().timeout(timeout);
expiredDbq.getSearchRequest().source().timeout(timeout);
}
final Instant now = Instant.now();
searchRequest.source()
.query(QueryBuilders.boolQuery()
expiredDbq
.setQuery(QueryBuilders.boolQuery()
.filter(QueryBuilders.termsQuery("doc_type", TokenService.INVALIDATED_TOKEN_DOC_TYPE, "token"))
.filter(QueryBuilders.boolQuery()
.should(QueryBuilders.rangeQuery("expiration_time").lte(now.toEpochMilli()))