mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-04-25 07:37:19 -04:00
HLRC: Add support for reindex rethrottling (#33832)
This change adds support for rethrottling reindex requests to the RestHighLevelClient.
This commit is contained in:
parent
b33c18d316
commit
77145bb477
9 changed files with 350 additions and 17 deletions
|
@ -530,6 +530,17 @@ final class RequestConverters {
|
||||||
return request;
|
return request;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static Request rethrottle(RethrottleRequest rethrottleRequest) throws IOException {
|
||||||
|
String endpoint = new EndpointBuilder().addPathPart("_reindex").addPathPart(rethrottleRequest.getTaskId().toString())
|
||||||
|
.addPathPart("_rethrottle").build();
|
||||||
|
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
|
||||||
|
Params params = new Params(request)
|
||||||
|
.withRequestsPerSecond(rethrottleRequest.getRequestsPerSecond());
|
||||||
|
// we set "group_by" to "none" because this is the response format we can parse back
|
||||||
|
params.putParam("group_by", "none");
|
||||||
|
return request;
|
||||||
|
}
|
||||||
|
|
||||||
static Request putScript(PutStoredScriptRequest putStoredScriptRequest) throws IOException {
|
static Request putScript(PutStoredScriptRequest putStoredScriptRequest) throws IOException {
|
||||||
String endpoint = new EndpointBuilder().addPathPartAsIs("_scripts").addPathPart(putStoredScriptRequest.id()).build();
|
String endpoint = new EndpointBuilder().addPathPartAsIs("_scripts").addPathPart(putStoredScriptRequest.id()).build();
|
||||||
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
|
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
|
||||||
|
@ -719,11 +730,11 @@ final class RequestConverters {
|
||||||
|
|
||||||
Params withRequestsPerSecond(float requestsPerSecond) {
|
Params withRequestsPerSecond(float requestsPerSecond) {
|
||||||
// the default in AbstractBulkByScrollRequest is Float.POSITIVE_INFINITY,
|
// the default in AbstractBulkByScrollRequest is Float.POSITIVE_INFINITY,
|
||||||
// but we don't want to add that to the URL parameters, instead we leave it out
|
// but we don't want to add that to the URL parameters, instead we use -1
|
||||||
if (Float.isFinite(requestsPerSecond)) {
|
if (Float.isFinite(requestsPerSecond)) {
|
||||||
return putParam("requests_per_second", Float.toString(requestsPerSecond));
|
return putParam(RethrottleRequest.REQUEST_PER_SECOND_PARAMETER, Float.toString(requestsPerSecond));
|
||||||
} else {
|
} else {
|
||||||
return putParam("requests_per_second", "-1");
|
return putParam(RethrottleRequest.REQUEST_PER_SECOND_PARAMETER, "-1");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -25,6 +25,7 @@ import org.elasticsearch.ElasticsearchStatusException;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.ActionRequest;
|
import org.elasticsearch.action.ActionRequest;
|
||||||
import org.elasticsearch.action.ActionRequestValidationException;
|
import org.elasticsearch.action.ActionRequestValidationException;
|
||||||
|
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
||||||
import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptRequest;
|
import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptRequest;
|
||||||
import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptRequest;
|
import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptRequest;
|
||||||
import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptResponse;
|
import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptResponse;
|
||||||
|
@ -474,13 +475,14 @@ public class RestHighLevelClient implements Closeable {
|
||||||
* Asynchronously executes an update by query request.
|
* Asynchronously executes an update by query request.
|
||||||
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">
|
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">
|
||||||
* Update By Query API on elastic.co</a>
|
* Update By Query API on elastic.co</a>
|
||||||
|
* @param updateByQueryRequest the request
|
||||||
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
|
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
|
||||||
* @param listener the listener to be notified upon request completion
|
* @param listener the listener to be notified upon request completion
|
||||||
*/
|
*/
|
||||||
public final void updateByQueryAsync(UpdateByQueryRequest reindexRequest, RequestOptions options,
|
public final void updateByQueryAsync(UpdateByQueryRequest updateByQueryRequest, RequestOptions options,
|
||||||
ActionListener<BulkByScrollResponse> listener) {
|
ActionListener<BulkByScrollResponse> listener) {
|
||||||
performRequestAsyncAndParseEntity(
|
performRequestAsyncAndParseEntity(
|
||||||
reindexRequest, RequestConverters::updateByQuery, options, BulkByScrollResponse::fromXContent, listener, emptySet()
|
updateByQueryRequest, RequestConverters::updateByQuery, options, BulkByScrollResponse::fromXContent, listener, emptySet()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -503,16 +505,45 @@ public class RestHighLevelClient implements Closeable {
|
||||||
* Asynchronously executes a delete by query request.
|
* Asynchronously executes a delete by query request.
|
||||||
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html">
|
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html">
|
||||||
* Delete By Query API on elastic.co</a>
|
* Delete By Query API on elastic.co</a>
|
||||||
|
* @param deleteByQueryRequest the request
|
||||||
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
|
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
|
||||||
* @param listener the listener to be notified upon request completion
|
* @param listener the listener to be notified upon request completion
|
||||||
*/
|
*/
|
||||||
public final void deleteByQueryAsync(DeleteByQueryRequest reindexRequest, RequestOptions options,
|
public final void deleteByQueryAsync(DeleteByQueryRequest deleteByQueryRequest, RequestOptions options,
|
||||||
ActionListener<BulkByScrollResponse> listener) {
|
ActionListener<BulkByScrollResponse> listener) {
|
||||||
performRequestAsyncAndParseEntity(
|
performRequestAsyncAndParseEntity(
|
||||||
reindexRequest, RequestConverters::deleteByQuery, options, BulkByScrollResponse::fromXContent, listener, emptySet()
|
deleteByQueryRequest, RequestConverters::deleteByQuery, options, BulkByScrollResponse::fromXContent, listener, emptySet()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Executes a reindex rethrottling request.
|
||||||
|
* See the <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html#docs-reindex-rethrottle">
|
||||||
|
* Reindex rethrottling API on elastic.co</a>
|
||||||
|
* @param rethrottleRequest the request
|
||||||
|
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
|
||||||
|
* @return the response
|
||||||
|
* @throws IOException in case there is a problem sending the request or parsing back the response
|
||||||
|
*/
|
||||||
|
public final ListTasksResponse reindexRethrottle(RethrottleRequest rethrottleRequest, RequestOptions options) throws IOException {
|
||||||
|
return performRequestAndParseEntity(rethrottleRequest, RequestConverters::rethrottle, options, ListTasksResponse::fromXContent,
|
||||||
|
emptySet());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Executes a reindex rethrottling request.
|
||||||
|
* See the <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html#docs-reindex-rethrottle">
|
||||||
|
* Reindex rethrottling API on elastic.co</a>
|
||||||
|
* @param rethrottleRequest the request
|
||||||
|
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
|
||||||
|
* @param listener the listener to be notified upon request completion
|
||||||
|
*/
|
||||||
|
public final void reindexRethrottleAsync(RethrottleRequest rethrottleRequest, RequestOptions options,
|
||||||
|
ActionListener<ListTasksResponse> listener) {
|
||||||
|
performRequestAsyncAndParseEntity(rethrottleRequest, RequestConverters::rethrottle, options, ListTasksResponse::fromXContent,
|
||||||
|
listener, emptySet());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Pings the remote Elasticsearch cluster and returns true if the ping succeeded, false otherwise
|
* Pings the remote Elasticsearch cluster and returns true if the ping succeeded, false otherwise
|
||||||
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
|
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
|
||||||
|
|
|
@ -0,0 +1,77 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.client;
|
||||||
|
|
||||||
|
import org.elasticsearch.tasks.TaskId;
|
||||||
|
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A request changing throttling of a task.
|
||||||
|
*/
|
||||||
|
public class RethrottleRequest implements Validatable {
|
||||||
|
|
||||||
|
static final String REQUEST_PER_SECOND_PARAMETER = "requests_per_second";
|
||||||
|
|
||||||
|
private final TaskId taskId;
|
||||||
|
private final float requestsPerSecond;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new {@link RethrottleRequest} which disables any throttling for the given taskId.
|
||||||
|
* @param taskId the task for which throttling will be disabled
|
||||||
|
*/
|
||||||
|
public RethrottleRequest(TaskId taskId) {
|
||||||
|
this.taskId = taskId;
|
||||||
|
this.requestsPerSecond = Float.POSITIVE_INFINITY;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new {@link RethrottleRequest} which changes the throttling for the given taskId.
|
||||||
|
* @param taskId the task that throttling changes will be applied to
|
||||||
|
* @param requestsPerSecond the number of requests per second that the task should perform. This needs to be a positive value.
|
||||||
|
*/
|
||||||
|
public RethrottleRequest(TaskId taskId, float requestsPerSecond) {
|
||||||
|
Objects.requireNonNull(taskId, "taskId cannot be null");
|
||||||
|
if (requestsPerSecond <= 0) {
|
||||||
|
throw new IllegalArgumentException("requestsPerSecond needs to be positive value but was [" + requestsPerSecond+"]");
|
||||||
|
}
|
||||||
|
this.taskId = taskId;
|
||||||
|
this.requestsPerSecond = requestsPerSecond;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the task Id
|
||||||
|
*/
|
||||||
|
public TaskId getTaskId() {
|
||||||
|
return taskId;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the requests per seconds value
|
||||||
|
*/
|
||||||
|
public float getRequestsPerSecond() {
|
||||||
|
return requestsPerSecond;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "RethrottleRequest: taskID = " + taskId +"; reqestsPerSecond = " + requestsPerSecond;
|
||||||
|
}
|
||||||
|
}
|
|
@ -21,8 +21,12 @@ package org.elasticsearch.client;
|
||||||
|
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.ElasticsearchStatusException;
|
import org.elasticsearch.ElasticsearchStatusException;
|
||||||
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.DocWriteRequest;
|
import org.elasticsearch.action.DocWriteRequest;
|
||||||
import org.elasticsearch.action.DocWriteResponse;
|
import org.elasticsearch.action.DocWriteResponse;
|
||||||
|
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
|
||||||
|
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
||||||
|
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
|
||||||
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
|
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
|
||||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||||
import org.elasticsearch.action.bulk.BulkProcessor;
|
import org.elasticsearch.action.bulk.BulkProcessor;
|
||||||
|
@ -52,12 +56,15 @@ import org.elasticsearch.index.get.GetResult;
|
||||||
import org.elasticsearch.index.query.IdsQueryBuilder;
|
import org.elasticsearch.index.query.IdsQueryBuilder;
|
||||||
import org.elasticsearch.index.reindex.BulkByScrollResponse;
|
import org.elasticsearch.index.reindex.BulkByScrollResponse;
|
||||||
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
|
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
|
||||||
|
import org.elasticsearch.index.reindex.ReindexAction;
|
||||||
import org.elasticsearch.index.reindex.ReindexRequest;
|
import org.elasticsearch.index.reindex.ReindexRequest;
|
||||||
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
|
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
|
||||||
import org.elasticsearch.rest.RestStatus;
|
import org.elasticsearch.rest.RestStatus;
|
||||||
import org.elasticsearch.script.Script;
|
import org.elasticsearch.script.Script;
|
||||||
import org.elasticsearch.script.ScriptType;
|
import org.elasticsearch.script.ScriptType;
|
||||||
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
|
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
|
||||||
|
import org.elasticsearch.tasks.RawTaskStatus;
|
||||||
|
import org.elasticsearch.tasks.TaskId;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
import org.joda.time.DateTimeZone;
|
import org.joda.time.DateTimeZone;
|
||||||
import org.joda.time.format.DateTimeFormat;
|
import org.joda.time.format.DateTimeFormat;
|
||||||
|
@ -65,9 +72,15 @@ import org.joda.time.format.DateTimeFormat;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import static java.util.Collections.singletonMap;
|
import static java.util.Collections.singletonMap;
|
||||||
|
import static org.hamcrest.Matchers.empty;
|
||||||
|
import static org.hamcrest.Matchers.hasSize;
|
||||||
|
import static org.hamcrest.Matchers.instanceOf;
|
||||||
|
import static org.hamcrest.Matchers.lessThan;
|
||||||
|
|
||||||
public class CrudIT extends ESRestHighLevelClientTestCase {
|
public class CrudIT extends ESRestHighLevelClientTestCase {
|
||||||
|
|
||||||
|
@ -631,7 +644,7 @@ public class CrudIT extends ESRestHighLevelClientTestCase {
|
||||||
validateBulkResponses(nbItems, errors, bulkResponse, bulkRequest);
|
validateBulkResponses(nbItems, errors, bulkResponse, bulkRequest);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testReindex() throws IOException {
|
public void testReindex() throws Exception {
|
||||||
final String sourceIndex = "source1";
|
final String sourceIndex = "source1";
|
||||||
final String destinationIndex = "dest";
|
final String destinationIndex = "dest";
|
||||||
{
|
{
|
||||||
|
@ -642,15 +655,14 @@ public class CrudIT extends ESRestHighLevelClientTestCase {
|
||||||
.build();
|
.build();
|
||||||
createIndex(sourceIndex, settings);
|
createIndex(sourceIndex, settings);
|
||||||
createIndex(destinationIndex, settings);
|
createIndex(destinationIndex, settings);
|
||||||
|
BulkRequest bulkRequest = new BulkRequest()
|
||||||
|
.add(new IndexRequest(sourceIndex, "type", "1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
|
||||||
|
.add(new IndexRequest(sourceIndex, "type", "2").source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON))
|
||||||
|
.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
|
||||||
assertEquals(
|
assertEquals(
|
||||||
RestStatus.OK,
|
RestStatus.OK,
|
||||||
highLevelClient().bulk(
|
highLevelClient().bulk(
|
||||||
new BulkRequest()
|
bulkRequest,
|
||||||
.add(new IndexRequest(sourceIndex, "type", "1")
|
|
||||||
.source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
|
|
||||||
.add(new IndexRequest(sourceIndex, "type", "2")
|
|
||||||
.source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON))
|
|
||||||
.setRefreshPolicy(RefreshPolicy.IMMEDIATE),
|
|
||||||
RequestOptions.DEFAULT
|
RequestOptions.DEFAULT
|
||||||
).status()
|
).status()
|
||||||
);
|
);
|
||||||
|
@ -692,6 +704,72 @@ public class CrudIT extends ESRestHighLevelClientTestCase {
|
||||||
assertEquals(0, bulkResponse.getBulkFailures().size());
|
assertEquals(0, bulkResponse.getBulkFailures().size());
|
||||||
assertEquals(0, bulkResponse.getSearchFailures().size());
|
assertEquals(0, bulkResponse.getSearchFailures().size());
|
||||||
}
|
}
|
||||||
|
{
|
||||||
|
// test reindex rethrottling
|
||||||
|
ReindexRequest reindexRequest = new ReindexRequest();
|
||||||
|
reindexRequest.setSourceIndices(sourceIndex);
|
||||||
|
reindexRequest.setDestIndex(destinationIndex);
|
||||||
|
|
||||||
|
// this following settings are supposed to halt reindexing after first document
|
||||||
|
reindexRequest.setSourceBatchSize(1);
|
||||||
|
reindexRequest.setRequestsPerSecond(0.00001f);
|
||||||
|
final CountDownLatch reindexTaskFinished = new CountDownLatch(1);
|
||||||
|
highLevelClient().reindexAsync(reindexRequest, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onResponse(BulkByScrollResponse response) {
|
||||||
|
reindexTaskFinished.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Exception e) {
|
||||||
|
fail(e.toString());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
TaskGroup taskGroupToRethrottle = findTaskToRethrottle();
|
||||||
|
assertThat(taskGroupToRethrottle.getChildTasks(), empty());
|
||||||
|
TaskId taskIdToRethrottle = taskGroupToRethrottle.getTaskInfo().getTaskId();
|
||||||
|
|
||||||
|
float requestsPerSecond = 1000f;
|
||||||
|
ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
|
||||||
|
highLevelClient()::reindexRethrottle, highLevelClient()::reindexRethrottleAsync);
|
||||||
|
assertThat(response.getTasks(), hasSize(1));
|
||||||
|
assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId());
|
||||||
|
assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class));
|
||||||
|
assertEquals(Float.toString(requestsPerSecond),
|
||||||
|
((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString());
|
||||||
|
reindexTaskFinished.await(2, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
// any rethrottling after the reindex is done performed with the same taskId should result in a failure
|
||||||
|
response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
|
||||||
|
highLevelClient()::reindexRethrottle, highLevelClient()::reindexRethrottleAsync);
|
||||||
|
assertTrue(response.getTasks().isEmpty());
|
||||||
|
assertFalse(response.getNodeFailures().isEmpty());
|
||||||
|
assertEquals(1, response.getNodeFailures().size());
|
||||||
|
assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]",
|
||||||
|
response.getNodeFailures().get(0).getCause().getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private TaskGroup findTaskToRethrottle() throws IOException {
|
||||||
|
long start = System.nanoTime();
|
||||||
|
ListTasksRequest request = new ListTasksRequest();
|
||||||
|
request.setActions(ReindexAction.NAME);
|
||||||
|
request.setDetailed(true);
|
||||||
|
do {
|
||||||
|
ListTasksResponse list = highLevelClient().tasks().list(request, RequestOptions.DEFAULT);
|
||||||
|
list.rethrowFailures("Finding tasks to rethrottle");
|
||||||
|
assertThat("tasks are left over from the last execution of this test",
|
||||||
|
list.getTaskGroups(), hasSize(lessThan(2)));
|
||||||
|
if (0 == list.getTaskGroups().size()) {
|
||||||
|
// The parent task hasn't started yet
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
return list.getTaskGroups().get(0);
|
||||||
|
} while (System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10));
|
||||||
|
throw new AssertionError("Couldn't find tasks to rethrottle. Here are the running tasks " +
|
||||||
|
highLevelClient().tasks().list(request, RequestOptions.DEFAULT));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testUpdateByQuery() throws IOException {
|
public void testUpdateByQuery() throws IOException {
|
||||||
|
|
|
@ -95,6 +95,7 @@ import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
|
||||||
import org.elasticsearch.search.rescore.QueryRescorerBuilder;
|
import org.elasticsearch.search.rescore.QueryRescorerBuilder;
|
||||||
import org.elasticsearch.search.suggest.SuggestBuilder;
|
import org.elasticsearch.search.suggest.SuggestBuilder;
|
||||||
import org.elasticsearch.search.suggest.completion.CompletionSuggestionBuilder;
|
import org.elasticsearch.search.suggest.completion.CompletionSuggestionBuilder;
|
||||||
|
import org.elasticsearch.tasks.TaskId;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.test.RandomObjects;
|
import org.elasticsearch.test.RandomObjects;
|
||||||
|
|
||||||
|
@ -319,10 +320,10 @@ public class RequestConvertersTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
float requestsPerSecond = (float) randomDoubleBetween(0.0, 10.0, false);
|
float requestsPerSecond = (float) randomDoubleBetween(0.0, 10.0, false);
|
||||||
expectedParams.put("requests_per_second", Float.toString(requestsPerSecond));
|
expectedParams.put(RethrottleRequest.REQUEST_PER_SECOND_PARAMETER, Float.toString(requestsPerSecond));
|
||||||
reindexRequest.setRequestsPerSecond(requestsPerSecond);
|
reindexRequest.setRequestsPerSecond(requestsPerSecond);
|
||||||
} else {
|
} else {
|
||||||
expectedParams.put("requests_per_second", "-1");
|
expectedParams.put(RethrottleRequest.REQUEST_PER_SECOND_PARAMETER, "-1");
|
||||||
}
|
}
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
reindexRequest.setDestRouting("=cat");
|
reindexRequest.setDestRouting("=cat");
|
||||||
|
@ -465,6 +466,34 @@ public class RequestConvertersTests extends ESTestCase {
|
||||||
assertToXContentBody(deleteByQueryRequest, request.getEntity());
|
assertToXContentBody(deleteByQueryRequest, request.getEntity());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testRethrottle() throws IOException {
|
||||||
|
TaskId taskId = new TaskId(randomAlphaOfLength(10), randomIntBetween(1, 100));
|
||||||
|
RethrottleRequest rethrottleRequest;
|
||||||
|
Float requestsPerSecond;
|
||||||
|
Map<String, String> expectedParams = new HashMap<>();
|
||||||
|
if (frequently()) {
|
||||||
|
requestsPerSecond = (float) randomDoubleBetween(0.0, 100.0, true);
|
||||||
|
rethrottleRequest = new RethrottleRequest(taskId, requestsPerSecond);
|
||||||
|
expectedParams.put(RethrottleRequest.REQUEST_PER_SECOND_PARAMETER, Float.toString(requestsPerSecond));
|
||||||
|
} else {
|
||||||
|
rethrottleRequest = new RethrottleRequest(taskId);
|
||||||
|
expectedParams.put(RethrottleRequest.REQUEST_PER_SECOND_PARAMETER, "-1");
|
||||||
|
}
|
||||||
|
expectedParams.put("group_by", "none");
|
||||||
|
Request request = RequestConverters.rethrottle(rethrottleRequest);
|
||||||
|
assertEquals("/_reindex/" + taskId + "/_rethrottle", request.getEndpoint());
|
||||||
|
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
|
||||||
|
assertEquals(expectedParams, request.getParameters());
|
||||||
|
assertNull(request.getEntity());
|
||||||
|
|
||||||
|
// test illegal RethrottleRequest values
|
||||||
|
Exception e = expectThrows(NullPointerException.class, () -> new RethrottleRequest(null, 1.0f));
|
||||||
|
assertEquals("taskId cannot be null", e.getMessage());
|
||||||
|
|
||||||
|
e = expectThrows(IllegalArgumentException.class, () -> new RethrottleRequest(new TaskId("taskId", 1), -5.0f));
|
||||||
|
assertEquals("requestsPerSecond needs to be positive value but was [-5.0]", e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
public void testIndex() throws IOException {
|
public void testIndex() throws IOException {
|
||||||
String index = randomAlphaOfLengthBetween(3, 10);
|
String index = randomAlphaOfLengthBetween(3, 10);
|
||||||
String type = randomAlphaOfLengthBetween(3, 10);
|
String type = randomAlphaOfLengthBetween(3, 10);
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
package org.elasticsearch.client;
|
package org.elasticsearch.client;
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.JsonParseException;
|
import com.fasterxml.jackson.core.JsonParseException;
|
||||||
|
|
||||||
import org.apache.http.HttpEntity;
|
import org.apache.http.HttpEntity;
|
||||||
import org.apache.http.HttpHost;
|
import org.apache.http.HttpHost;
|
||||||
import org.apache.http.HttpResponse;
|
import org.apache.http.HttpResponse;
|
||||||
|
@ -658,7 +659,6 @@ public class RestHighLevelClientTests extends ESTestCase {
|
||||||
"indices.get_upgrade",
|
"indices.get_upgrade",
|
||||||
"indices.put_alias",
|
"indices.put_alias",
|
||||||
"mtermvectors",
|
"mtermvectors",
|
||||||
"reindex_rethrottle",
|
|
||||||
"render_search_template",
|
"render_search_template",
|
||||||
"scripts_painless_execute",
|
"scripts_painless_execute",
|
||||||
"tasks.get",
|
"tasks.get",
|
||||||
|
|
|
@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.DocWriteRequest;
|
import org.elasticsearch.action.DocWriteRequest;
|
||||||
import org.elasticsearch.action.DocWriteResponse;
|
import org.elasticsearch.action.DocWriteResponse;
|
||||||
import org.elasticsearch.action.LatchedActionListener;
|
import org.elasticsearch.action.LatchedActionListener;
|
||||||
|
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
||||||
import org.elasticsearch.action.bulk.BackoffPolicy;
|
import org.elasticsearch.action.bulk.BackoffPolicy;
|
||||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||||
import org.elasticsearch.action.bulk.BulkProcessor;
|
import org.elasticsearch.action.bulk.BulkProcessor;
|
||||||
|
@ -50,6 +51,7 @@ import org.elasticsearch.client.Request;
|
||||||
import org.elasticsearch.client.RequestOptions;
|
import org.elasticsearch.client.RequestOptions;
|
||||||
import org.elasticsearch.client.Response;
|
import org.elasticsearch.client.Response;
|
||||||
import org.elasticsearch.client.RestHighLevelClient;
|
import org.elasticsearch.client.RestHighLevelClient;
|
||||||
|
import org.elasticsearch.client.RethrottleRequest;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
@ -75,6 +77,7 @@ import org.elasticsearch.script.Script;
|
||||||
import org.elasticsearch.script.ScriptType;
|
import org.elasticsearch.script.ScriptType;
|
||||||
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
|
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
|
||||||
import org.elasticsearch.search.sort.SortOrder;
|
import org.elasticsearch.search.sort.SortOrder;
|
||||||
|
import org.elasticsearch.tasks.TaskId;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
|
@ -902,6 +905,48 @@ public class CRUDDocumentationIT extends ESRestHighLevelClientTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testReindexRethrottle() throws Exception {
|
||||||
|
RestHighLevelClient client = highLevelClient();
|
||||||
|
TaskId taskId = new TaskId("oTUltX4IQMOUUVeiohTt8A:124");
|
||||||
|
{
|
||||||
|
// tag::rethrottle-disable-request
|
||||||
|
RethrottleRequest rethrottleRequest = new RethrottleRequest(taskId); // <1>
|
||||||
|
client.reindexRethrottle(rethrottleRequest, RequestOptions.DEFAULT);
|
||||||
|
// end::rethrottle-disable-request
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
// tag::rethrottle-request
|
||||||
|
RethrottleRequest rethrottleRequest = new RethrottleRequest(taskId, 100.0f); // <1>
|
||||||
|
client.reindexRethrottle(rethrottleRequest, RequestOptions.DEFAULT);
|
||||||
|
// end::rethrottle-request
|
||||||
|
}
|
||||||
|
|
||||||
|
// tag::rethrottle-request-async
|
||||||
|
ActionListener<ListTasksResponse> listener = new ActionListener<ListTasksResponse>() {
|
||||||
|
@Override
|
||||||
|
public void onResponse(ListTasksResponse response) {
|
||||||
|
// <1>
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Exception e) {
|
||||||
|
// <2>
|
||||||
|
}
|
||||||
|
};
|
||||||
|
// end::rethrottle-request-async
|
||||||
|
|
||||||
|
// Replace the empty listener by a blocking listener in test
|
||||||
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
listener = new LatchedActionListener<>(listener, latch);
|
||||||
|
|
||||||
|
RethrottleRequest rethrottleRequest = new RethrottleRequest(taskId);
|
||||||
|
// tag::rethrottle-execute-async
|
||||||
|
client.reindexRethrottleAsync(rethrottleRequest, RequestOptions.DEFAULT, listener); // <1>
|
||||||
|
// end::rethrottle-execute-async
|
||||||
|
assertTrue(latch.await(30L, TimeUnit.SECONDS));
|
||||||
|
}
|
||||||
|
|
||||||
public void testUpdateByQuery() throws Exception {
|
public void testUpdateByQuery() throws Exception {
|
||||||
RestHighLevelClient client = highLevelClient();
|
RestHighLevelClient client = highLevelClient();
|
||||||
{
|
{
|
||||||
|
|
|
@ -0,0 +1,60 @@
|
||||||
|
[[java-rest-high-document-reindex-rethrottle]]
|
||||||
|
=== Reindex Rethrottle API
|
||||||
|
|
||||||
|
[[java-rest-high-document-reindex-rethrottle-request]]
|
||||||
|
==== Reindex Rethrolle Request
|
||||||
|
|
||||||
|
A `RethrottleRequest` can be used to change existing throttling on a runnind
|
||||||
|
reindex task or disable it entirely. It requires the task Id of the reindex
|
||||||
|
task to change.
|
||||||
|
|
||||||
|
In its simplest form, you can use it to disable throttling of a running
|
||||||
|
reindex task using the following:
|
||||||
|
|
||||||
|
["source","java",subs="attributes,callouts,macros"]
|
||||||
|
--------------------------------------------------
|
||||||
|
include-tagged::{doc-tests}/CRUDDocumentationIT.java[rethrottle-disable-request]
|
||||||
|
--------------------------------------------------
|
||||||
|
<1> Create a `RethrottleRequest` that disables throttling for a specific task id
|
||||||
|
|
||||||
|
By providing a `requestsPerSecond` argument, the request will change the
|
||||||
|
existing task throttling to the specified value:
|
||||||
|
|
||||||
|
["source","java",subs="attributes,callouts,macros"]
|
||||||
|
--------------------------------------------------
|
||||||
|
include-tagged::{doc-tests}/CRUDDocumentationIT.java[rethrottle-request]
|
||||||
|
--------------------------------------------------
|
||||||
|
<1> Request to change the throttling of a task to 100 requests per second
|
||||||
|
|
||||||
|
[[java-rest-high-document-reindex-rethrottle-async]]
|
||||||
|
==== Asynchronous Execution
|
||||||
|
|
||||||
|
The asynchronous execution of a rethrottle request requires both the `RethrottleRequest`
|
||||||
|
instance and an `ActionListener` instance to be passed to the asynchronous
|
||||||
|
method:
|
||||||
|
|
||||||
|
["source","java",subs="attributes,callouts,macros"]
|
||||||
|
--------------------------------------------------
|
||||||
|
include-tagged::{doc-tests}/CRUDDocumentationIT.java[rethrottle-execute-async]
|
||||||
|
--------------------------------------------------
|
||||||
|
<1> The RethrottleRequest to execute and the ActionListener to use when the
|
||||||
|
execution completes
|
||||||
|
|
||||||
|
The asynchronous method does not block and returns immediately.
|
||||||
|
Once it is completed the `ActionListener` is called back using the `onResponse` method
|
||||||
|
if the execution successfully completed or using the `onFailure` method if
|
||||||
|
it failed. A typical listener looks like this:
|
||||||
|
|
||||||
|
["source","java",subs="attributes,callouts,macros"]
|
||||||
|
--------------------------------------------------
|
||||||
|
include-tagged::{doc-tests}/CRUDDocumentationIT.java[rethrottle-request-async]
|
||||||
|
--------------------------------------------------
|
||||||
|
<1> Code executed when the request is successfully completed
|
||||||
|
<2> Code executed when the request fails with an exception
|
||||||
|
|
||||||
|
[[java-rest-high-document-reindex-retrottle-response]]
|
||||||
|
==== Rethrottle Response
|
||||||
|
|
||||||
|
Rethrottling returns the task that has been rethrottled in the form of a
|
||||||
|
`ListTasksResponse`. The structure of this response object is described in detail
|
||||||
|
in <<java-rest-high-cluster-list-tasks-response,this section>>.
|
|
@ -18,6 +18,7 @@ Multi-document APIs::
|
||||||
* <<java-rest-high-document-reindex>>
|
* <<java-rest-high-document-reindex>>
|
||||||
* <<java-rest-high-document-update-by-query>>
|
* <<java-rest-high-document-update-by-query>>
|
||||||
* <<java-rest-high-document-delete-by-query>>
|
* <<java-rest-high-document-delete-by-query>>
|
||||||
|
* <<java-rest-high-document-reindex-rethrottle>>
|
||||||
|
|
||||||
include::document/index.asciidoc[]
|
include::document/index.asciidoc[]
|
||||||
include::document/get.asciidoc[]
|
include::document/get.asciidoc[]
|
||||||
|
@ -29,6 +30,7 @@ include::document/multi-get.asciidoc[]
|
||||||
include::document/reindex.asciidoc[]
|
include::document/reindex.asciidoc[]
|
||||||
include::document/update-by-query.asciidoc[]
|
include::document/update-by-query.asciidoc[]
|
||||||
include::document/delete-by-query.asciidoc[]
|
include::document/delete-by-query.asciidoc[]
|
||||||
|
include::document/reindex-rethrottle.asciidoc[]
|
||||||
|
|
||||||
== Search APIs
|
== Search APIs
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue