[Transform] Transform _schedule_now API (#92948)

This commit is contained in:
Przemysław Witek 2023-02-02 19:03:16 +01:00 committed by GitHub
parent a71210c111
commit f60401a61c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 1011 additions and 5 deletions

View file

@ -0,0 +1,6 @@
pr: 92948
summary: Transform _schedule_now API
area: Transform
type: feature
issues:
- 44722

View file

@ -18,6 +18,7 @@ _transform/
* <<reset-transform,Reset {transforms}>>
* <<start-transform,Start {transforms}>>
* <<stop-transform,Stop {transforms}>>
* <<schedule-now-transform,Schedule Now {transforms}>>
* <<update-transform,Update {transforms}>>
For the full list, see <<transform-apis>>.

View file

@ -15,6 +15,8 @@ include::reset-transform.asciidoc[leveloffset=+2]
include::start-transform.asciidoc[leveloffset=+2]
//STOP
include::stop-transform.asciidoc[leveloffset=+2]
//SCHEDULE_NOW
include::schedule-now-transform.asciidoc[leveloffset=+2]
//UPDATE-UPGRADE
include::update-transform.asciidoc[leveloffset=+2]
include::upgrade-transforms.asciidoc[leveloffset=+2]

View file

@ -0,0 +1,63 @@
[role="xpack"]
[testenv="basic"]
[[schedule-now-transform]]
= Schedule Now {transform} API
[subs="attributes"]
++++
<titleabbrev>Shedule Now {transform}</titleabbrev>
++++
Schedules now a {transform}.
[[schedule-now-transform-request]]
== {api-request-title}
`POST _transform/<transform_id>/_schedule_now`
[[schedule-now-transform-prereqs]]
== {api-prereq-title}
* Requires the `manage_transform` cluster privilege. This privilege is included
in the `transform_admin` built-in role.
[schedule-now-transform-desc]]
== {api-description-title}
If you _schedule_now a {transform}, it will process the new data instantly,
without waiting for the configured `frequency` interval.
After _schedule_now API is called, the transform will be processed again at
`now + frequency` unless _schedule_now API is called again in the meantime.
[[schedule-now-transform-path-parms]]
== {api-path-parms-title}
`<transform_id>`::
(Required, string)
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-id]
[[schedule-now-transform-query-parms]]
== {api-query-parms-title}
`timeout`::
(Optional, time)
Period to wait for a response. If no response is received before the timeout
expires, the request fails and returns an error. Defaults to `30s`.
[[schedule-now-transform-examples]]
== {api-examples-title}
[source,console]
--------------------------------------------------
POST _transform/ecommerce_transform/_schedule_now
--------------------------------------------------
// TEST[skip:setup kibana sample data]
When the {transform} is scheduled now, you receive the following results:
[source,console-result]
----
{
"acknowledged" : true
}
----

View file

@ -10,5 +10,6 @@
* <<reset-transform>>
* <<start-transform>>
* <<stop-transform>>
* <<schedule-now-transform>>
* <<update-transform>>
* <<upgrade-transforms>>

View file

@ -0,0 +1,38 @@
{
"transform.schedule_now_transform":{
"documentation":{
"url":"https://www.elastic.co/guide/en/elasticsearch/reference/current/schedule-now-transform.html",
"description":"Schedules now a transform."
},
"stability":"stable",
"visibility":"public",
"headers":{
"accept":[ "application/json"],
"content_type":["application/json"]
},
"url":{
"paths":[
{
"path":"/_transform/{transform_id}/_schedule_now",
"methods":[
"POST"
],
"parts":{
"transform_id":{
"type":"string",
"required":true,
"description":"The id of the transform."
}
}
}
]
},
"params":{
"timeout":{
"type":"time",
"required":false,
"description":"Controls the time to wait for the scheduling to take place"
}
}
}
}

View file

@ -0,0 +1,156 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.core.transform.action;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
public class ScheduleNowTransformAction extends ActionType<ScheduleNowTransformAction.Response> {
public static final ScheduleNowTransformAction INSTANCE = new ScheduleNowTransformAction();
public static final String NAME = "cluster:admin/transform/schedule_now";
private ScheduleNowTransformAction() {
super(NAME, ScheduleNowTransformAction.Response::new);
}
public static class Request extends BaseTasksRequest<Request> {
private final String id;
public Request(String id, TimeValue timeout) {
this.id = ExceptionsHelper.requireNonNull(id, TransformField.ID.getPreferredName());
this.setTimeout(ExceptionsHelper.requireNonNull(timeout, TransformField.TIMEOUT.getPreferredName()));
}
public Request(StreamInput in) throws IOException {
super(in);
this.id = in.readString();
}
public static Request fromXContent(final String id, final TimeValue timeout) {
return new Request(id, timeout);
}
@Override
public ActionRequestValidationException validate() {
if (Metadata.ALL.equals(id)) {
ActionRequestValidationException e = new ActionRequestValidationException();
e.addValidationError("_schedule_now API does not support _all wildcard");
return e;
}
return null;
}
public String getId() {
return id;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
}
@Override
public int hashCode() {
// the base class does not implement hashCode, therefore we need to hash timeout ourselves
return Objects.hash(getTimeout(), id);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Request other = (Request) obj;
// the base class does not implement equals, therefore we need to check timeout ourselves
return this.id.equals(other.id) && getTimeout().equals(other.getTimeout());
}
}
public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
public static final Response TRUE = new Response(true);
private final boolean acknowledged;
public Response(StreamInput in) throws IOException {
super(in);
acknowledged = in.readBoolean();
}
public Response(boolean acknowledged) {
super(Collections.emptyList(), Collections.emptyList());
this.acknowledged = acknowledged;
}
public Response(
List<TaskOperationFailure> taskFailures,
List<? extends ElasticsearchException> nodeFailures,
boolean acknowledged
) {
super(taskFailures, nodeFailures);
this.acknowledged = acknowledged;
}
public boolean isAcknowledged() {
return acknowledged;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(acknowledged);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
toXContentCommon(builder, params);
builder.field("acknowledged", acknowledged);
builder.endObject();
return builder;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ScheduleNowTransformAction.Response response = (ScheduleNowTransformAction.Response) o;
return acknowledged == response.acknowledged;
}
@Override
public int hashCode() {
return Objects.hash(acknowledged);
}
}
}

View file

@ -0,0 +1,58 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.core.transform.action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.transform.action.ScheduleNowTransformAction.Request;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
public class ScheduleNowTransformActionRequestTests extends AbstractWireSerializingTestCase<Request> {
@Override
protected Request createTestInstance() {
return new Request(randomAlphaOfLengthBetween(1, 20), TimeValue.parseTimeValue(randomTimeValue(), "timeout"));
}
@Override
protected Writeable.Reader<Request> instanceReader() {
return Request::new;
}
@Override
protected Request mutateInstance(Request instance) {
String id = instance.getId();
TimeValue timeout = instance.getTimeout();
switch (between(0, 1)) {
case 0 -> id += randomAlphaOfLengthBetween(1, 5);
case 1 -> timeout = new TimeValue(timeout.duration() + randomLongBetween(1, 5), timeout.timeUnit());
default -> throw new AssertionError("Illegal randomization branch");
}
return new ScheduleNowTransformAction.Request(id, timeout);
}
public void testValidationSuccess() {
Request request = new Request("id", TimeValue.ZERO);
assertThat(request.validate(), is(nullValue()));
}
public void testValidationFailure() {
Request request = new Request("_all", TimeValue.ZERO);
ActionRequestValidationException e = request.validate();
assertThat(e, is(notNullValue()));
assertThat(e.validationErrors(), contains("_schedule_now API does not support _all wildcard"));
}
}

View file

@ -0,0 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.core.transform.action;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.xpack.core.transform.action.ScheduleNowTransformAction.Response;
public class ScheduleNowTransformActionResponseTests extends AbstractWireSerializingTransformTestCase<Response> {
@Override
protected Response createTestInstance() {
return new Response(randomBoolean());
}
@Override
protected Reader<Response> instanceReader() {
return Response::new;
}
@Override
protected Response mutateInstance(Response instance) {
boolean acknowledged = instance.isAcknowledged();
return new Response(acknowledged == false);
}
public void testResponseTrue() {
assertTrue(Response.TRUE.isAcknowledged());
}
}

View file

@ -88,6 +88,7 @@ public class Constants {
"cluster:admin/transform/reset",
"cluster:admin/transform/start",
"cluster:admin/transform/stop",
"cluster:admin/transform/schedule_now",
"cluster:admin/transform/update",
"cluster:admin/transform/upgrade",
"cluster:admin/transform/validate",

View file

@ -137,6 +137,39 @@ teardown:
transform.start_transform:
transform_id: "airline-transform-start-stop"
---
"Test schedule_now on a stopped transform":
- do:
transform.schedule_now_transform:
transform_id: "airline-transform-start-stop"
- match: { acknowledged: true }
---
"Test schedule_now on an already started transform":
- do:
transform.start_transform:
transform_id: "airline-transform-start-stop"
- match: { acknowledged: true }
- do:
transform.schedule_now_transform:
transform_id: "airline-transform-start-stop"
- match: { acknowledged: true }
---
"Test schedule_now all transforms":
- do:
catch: /_schedule_now API does not support _all wildcard/
transform.schedule_now_transform:
transform_id: "_all"
---
"Test schedule_now missing transform":
- do:
catch: missing
transform.schedule_now_transform:
transform_id: "missing"
---
"Verify start transform creates destination index with appropriate mapping":
- do:

View file

@ -495,6 +495,16 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
assertThat(resetTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
}
protected void scheduleNowTransform(String transformId) throws IOException {
final Request scheduleNowTransformRequest = createRequestWithAuth(
"POST",
getTransformEndpoint() + transformId + "/_schedule_now",
null
);
Map<String, Object> scheduleNowTransformResponse = entityAsMap(client().performRequest(scheduleNowTransformRequest));
assertThat(scheduleNowTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
}
protected Request createRequestWithSecondaryAuth(
final String method,
final String endpoint,

View file

@ -0,0 +1,202 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.transform.integration;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.core.Strings;
import org.junit.Before;
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
public class TransformScheduleNowIT extends TransformRestTestCase {
private static final String TEST_USER_NAME = "transform_user";
private static final String TEST_ADMIN_USER_NAME_1 = "transform_admin_1";
private static final String BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1 = basicAuthHeaderValue(
TEST_ADMIN_USER_NAME_1,
TEST_PASSWORD_SECURE_STRING
);
private static final String DATA_ACCESS_ROLE = "test_data_access";
private static boolean indicesCreated = false;
// preserve indices in order to reuse source indices in several test cases
@Override
protected boolean preserveIndicesUponCompletion() {
return true;
}
@Override
protected boolean enableWarningsCheck() {
return false;
}
@Override
protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOException {
RestClientBuilder builder = RestClient.builder(hosts);
configureClient(builder, settings);
builder.setStrictDeprecationMode(false);
return builder.build();
}
@Before
public void createIndexes() throws IOException {
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME);
setupUser(TEST_USER_NAME, Arrays.asList("transform_user", DATA_ACCESS_ROLE));
setupUser(TEST_ADMIN_USER_NAME_1, Arrays.asList("transform_admin", DATA_ACCESS_ROLE));
// it's not possible to run it as @BeforeClass as clients aren't initialized then, so we need this little hack
if (indicesCreated) {
return;
}
createReviewsIndex();
indicesCreated = true;
}
public void testScheduleNow() throws Exception {
String sourceIndex = REVIEWS_INDEX_NAME;
String transformId = "old_transform";
String destIndex = transformId + "_idx";
setupDataAccessRole(DATA_ACCESS_ROLE, sourceIndex, destIndex);
final Request createTransformRequest = createRequestWithAuth(
"PUT",
getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1
);
String config = Strings.format("""
{
"dest": {
"index": "%s"
},
"source": {
"index": "%s"
},
"pivot": {
"group_by": {
"reviewer": {
"terms": {
"field": "user_id"
}
}
},
"aggregations": {
"avg_rating": {
"avg": {
"field": "stars"
}
}
}
},
"sync": {
"time": {
"field": "timestamp",
"delay": "1s"
}
},
"frequency": "1h"
}""", destIndex, sourceIndex);
createTransformRequest.setJsonEntity(config);
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
// Verify that _schedule_now is a no-op on a new transform
scheduleNowTransform(transformId);
// Start the transform, notice that frequency is set pretty high
startAndWaitForContinuousTransform(transformId, destIndex, null, null, 1L);
String newUser = "user_666";
verifyNumberOfSourceDocs(sourceIndex, newUser, 0);
verifyDestDoc(destIndex, newUser, 0, null);
// Ingest a new document to *source* index
indexSourceDoc(sourceIndex, newUser, 7);
// Wait a little bit to accommodate differences between "now" timestamp in the test and the current time in the server
Thread.sleep(5_000);
// Verify the new data is in the source index but not yet in the destination index
verifyNumberOfSourceDocs(sourceIndex, newUser, 1);
verifyDestDoc(destIndex, newUser, 0, null);
// Schedule now the transform to force processing the new data despite 1h-long interval
scheduleNowTransform(transformId);
waitForTransformCheckpoint(transformId, 2L);
// Verify that the new data is available in the destination index after _schedule_now
verifyNumberOfSourceDocs(sourceIndex, newUser, 1);
verifyDestDoc(destIndex, newUser, 1, 7.0);
// Ingest a new document to *source* index
indexSourceDoc(sourceIndex, newUser, 9);
// Wait a little bit to accommodate differences between "now" timestamp in the test and the current time in the server
Thread.sleep(5_000);
// Verify the new data is in the source index but not yet in the destination index
verifyNumberOfSourceDocs(sourceIndex, newUser, 2);
verifyDestDoc(destIndex, newUser, 1, 7.0);
// Try scheduling now all the transforms at once using _all wildcard, it is *not* supported
ResponseException e = expectThrows(ResponseException.class, () -> scheduleNowTransform("_all"));
assertThat(e.getMessage(), containsString("_schedule_now API does not support _all wildcard"));
// Schedule now the transform to force processing the new data despite 1h-long interval
scheduleNowTransform(transformId);
waitForTransformCheckpoint(transformId, 3L);
// Verify that the new data is available in the destination index after _schedule_now
verifyNumberOfSourceDocs(sourceIndex, newUser, 2);
verifyDestDoc(destIndex, newUser, 1, 8.0); // 8.0 = (7.0 + 9.0) / 2
// Verify that _schedule_now works on a stopped transform
stopTransform(transformId, false);
scheduleNowTransform(transformId);
}
private void indexSourceDoc(String sourceIndex, String user, int stars) throws IOException {
String doc = Strings.format("""
{"user_id":"%s","stars":%s,"timestamp":%s}
""", user, stars, Instant.now().toEpochMilli());
Request indexRequest = new Request("POST", sourceIndex + "/_doc");
indexRequest.addParameter("refresh", "true");
indexRequest.setJsonEntity(doc);
Map<String, Object> indexResponse = entityAsMap(client().performRequest(indexRequest));
assertThat(indexResponse.get("result"), equalTo("created"));
}
private void verifyNumberOfSourceDocs(String sourceIndex, String user, int expectedDocCount) throws IOException {
Map<String, Object> searchResult = getAsMap(sourceIndex + "/_search?q=user_id:" + user);
assertEquals(expectedDocCount, XContentMapValues.extractValue("hits.total.value", searchResult));
}
private void verifyDestDoc(String destIndex, String user, int expectedDocCount, Double expectedAvgRating) throws IOException {
Map<String, Object> searchResult = getAsMap(destIndex + "/_search?q=reviewer:" + user);
assertEquals(expectedDocCount, XContentMapValues.extractValue("hits.total.value", searchResult));
if (expectedAvgRating != null) {
assertEquals(List.of(expectedAvgRating), XContentMapValues.extractValue("hits.hits._source.avg_rating", searchResult));
}
}
}

View file

@ -68,6 +68,7 @@ import org.elasticsearch.xpack.core.transform.action.GetTransformStatsAction;
import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction;
import org.elasticsearch.xpack.core.transform.action.PutTransformAction;
import org.elasticsearch.xpack.core.transform.action.ResetTransformAction;
import org.elasticsearch.xpack.core.transform.action.ScheduleNowTransformAction;
import org.elasticsearch.xpack.core.transform.action.SetResetModeAction;
import org.elasticsearch.xpack.core.transform.action.StartTransformAction;
import org.elasticsearch.xpack.core.transform.action.StopTransformAction;
@ -83,6 +84,7 @@ import org.elasticsearch.xpack.transform.action.TransportGetTransformStatsAction
import org.elasticsearch.xpack.transform.action.TransportPreviewTransformAction;
import org.elasticsearch.xpack.transform.action.TransportPutTransformAction;
import org.elasticsearch.xpack.transform.action.TransportResetTransformAction;
import org.elasticsearch.xpack.transform.action.TransportScheduleNowTransformAction;
import org.elasticsearch.xpack.transform.action.TransportSetTransformResetModeAction;
import org.elasticsearch.xpack.transform.action.TransportStartTransformAction;
import org.elasticsearch.xpack.transform.action.TransportStopTransformAction;
@ -101,6 +103,7 @@ import org.elasticsearch.xpack.transform.rest.action.RestGetTransformStatsAction
import org.elasticsearch.xpack.transform.rest.action.RestPreviewTransformAction;
import org.elasticsearch.xpack.transform.rest.action.RestPutTransformAction;
import org.elasticsearch.xpack.transform.rest.action.RestResetTransformAction;
import org.elasticsearch.xpack.transform.rest.action.RestScheduleNowTransformAction;
import org.elasticsearch.xpack.transform.rest.action.RestStartTransformAction;
import org.elasticsearch.xpack.transform.rest.action.RestStopTransformAction;
import org.elasticsearch.xpack.transform.rest.action.RestUpdateTransformAction;
@ -190,7 +193,8 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
new RestUpdateTransformAction(),
new RestCatTransformAction(),
new RestUpgradeTransformsAction(),
new RestResetTransformAction()
new RestResetTransformAction(),
new RestScheduleNowTransformAction()
);
}
@ -209,6 +213,7 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
new ActionHandler<>(SetResetModeAction.INSTANCE, TransportSetTransformResetModeAction.class),
new ActionHandler<>(UpgradeTransformsAction.INSTANCE, TransportUpgradeTransformsAction.class),
new ActionHandler<>(ResetTransformAction.INSTANCE, TransportResetTransformAction.class),
new ActionHandler<>(ScheduleNowTransformAction.INSTANCE, TransportScheduleNowTransformAction.class),
// internal, no rest endpoint
new ActionHandler<>(ValidateTransformAction.INSTANCE, TransportValidateTransformAction.class),

View file

@ -0,0 +1,179 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.transform.action;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.security.SecurityContext;
import org.elasticsearch.xpack.core.transform.action.ScheduleNowTransformAction;
import org.elasticsearch.xpack.core.transform.action.ScheduleNowTransformAction.Request;
import org.elasticsearch.xpack.core.transform.action.ScheduleNowTransformAction.Response;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
import org.elasticsearch.xpack.transform.TransformServices;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.transforms.TransformTask;
import org.elasticsearch.xpack.transform.transforms.scheduling.TransformScheduler;
import java.util.List;
import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.xpack.transform.utils.SecondaryAuthorizationUtils.useSecondaryAuthIfAvailable;
public class TransportScheduleNowTransformAction extends TransportTasksAction<TransformTask, Request, Response, Response> {
private static final Logger logger = LogManager.getLogger(TransportScheduleNowTransformAction.class);
private final TransformConfigManager transformConfigManager;
private final TransformScheduler transformScheduler;
private final SecurityContext securityContext;
@Inject
public TransportScheduleNowTransformAction(
Settings settings,
TransportService transportService,
ThreadPool threadPool,
ActionFilters actionFilters,
ClusterService clusterService,
TransformServices transformServices
) {
super(
ScheduleNowTransformAction.NAME,
clusterService,
transportService,
actionFilters,
Request::new,
Response::new,
Response::new,
ThreadPool.Names.SAME
);
this.transformConfigManager = transformServices.getConfigManager();
this.transformScheduler = transformServices.getScheduler();
this.securityContext = XPackSettings.SECURITY_ENABLED.get(settings)
? new SecurityContext(settings, threadPool.getThreadContext())
: null;
}
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
final ClusterState clusterState = clusterService.state();
XPackPlugin.checkReadyForXPackCustomMetadata(clusterState);
useSecondaryAuthIfAvailable(securityContext, () -> {
ActionListener<TransformConfig> getTransformListener = ActionListener.wrap(unusedConfig -> {
PersistentTasksCustomMetadata.PersistentTask<?> transformTask = TransformTask.getTransformTask(
request.getId(),
clusterState
);
// to send a request to schedule now the transform at runtime, several requirements must be met:
// - transform must be running, meaning a task exists
// - transform is not failed (stopped transforms do not have a task)
if (transformTask != null
&& transformTask.isAssigned()
&& transformTask.getState() instanceof TransformState
&& ((TransformState) transformTask.getState()).getTaskState() != TransformTaskState.FAILED) {
ActionListener<Response> taskScheduleNowListener = ActionListener.wrap(listener::onResponse, e -> {
// benign: A transform might have been stopped meanwhile, this is not a problem
if (e instanceof TransformTaskDisappearedDuringScheduleNowException) {
logger.debug(
() -> format("[%s] transform task disappeared during schedule_now, ignoring.", request.getId()),
e
);
listener.onResponse(Response.TRUE);
return;
}
if (e instanceof TransformTaskScheduleNowException) {
logger.warn(() -> format("[%s] failed to schedule now the running transform.", request.getId()), e);
listener.onResponse(Response.TRUE);
return;
}
listener.onFailure(e);
});
request.setNodes(transformTask.getExecutorNode());
super.doExecute(task, request, taskScheduleNowListener);
} else {
listener.onResponse(Response.TRUE);
}
}, listener::onFailure);
// <1> Get the config to verify it exists and is valid
transformConfigManager.getTransformConfiguration(request.getId(), getTransformListener);
});
}
@Override
protected void taskOperation(Task actionTask, Request request, TransformTask transformTask, ActionListener<Response> listener) {
transformScheduler.scheduleNow(request.getId());
listener.onResponse(Response.TRUE);
}
@Override
protected Response newResponse(
Request request,
List<Response> tasks,
List<TaskOperationFailure> taskOperationFailures,
List<FailedNodeException> failedNodeExceptions
) {
if (tasks.isEmpty()) {
if (taskOperationFailures.isEmpty() == false) {
throw new TransformTaskScheduleNowException(
"Failed to schedule now the running transform due to task operation failure.",
taskOperationFailures.get(0).getCause()
);
} else if (failedNodeExceptions.isEmpty() == false) {
FailedNodeException failedNodeException = failedNodeExceptions.get(0);
Throwable failedNodeExceptionCause = ExceptionsHelper.unwrapCause(failedNodeException.getCause());
if (failedNodeExceptionCause instanceof ActionNotFoundTransportException) {
throw (ActionNotFoundTransportException) failedNodeExceptionCause;
}
throw new TransformTaskScheduleNowException(
"Failed to schedule now the running transform due to failed node exception.",
failedNodeExceptions.get(0)
);
} else {
throw new TransformTaskDisappearedDuringScheduleNowException(
"Could not schedule now the running transform as it has been stopped."
);
}
}
return tasks.get(0);
}
private static class TransformTaskScheduleNowException extends ElasticsearchException {
TransformTaskScheduleNowException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}
}
private static class TransformTaskDisappearedDuringScheduleNowException extends ElasticsearchException {
TransformTaskDisappearedDuringScheduleNowException(String msg) {
super(msg);
}
}
}

View file

@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.transform.rest.action;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.action.ScheduleNowTransformAction;
import java.io.IOException;
import java.util.List;
import static org.elasticsearch.rest.RestRequest.Method.POST;
public class RestScheduleNowTransformAction extends BaseRestHandler {
@Override
public List<Route> routes() {
return List.of(new Route(POST, TransformField.REST_BASE_PATH_TRANSFORMS_BY_ID + "_schedule_now"));
}
@Override
public String getName() {
return "transform_schedule_now_transform_action";
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
String id = restRequest.param(TransformField.ID.getPreferredName());
TimeValue timeout = restRequest.paramAsTime(TransformField.TIMEOUT.getPreferredName(), AcknowledgedRequest.DEFAULT_ACK_TIMEOUT);
ScheduleNowTransformAction.Request request = ScheduleNowTransformAction.Request.fromXContent(id, timeout);
return channel -> client.execute(ScheduleNowTransformAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}

View file

@ -115,9 +115,12 @@ public final class TransformScheduler {
}
if (isTraceEnabled) {
Instant processingFinished = clock.instant();
logger.trace(
format("Processing scheduled tasks finished, took %dms", Duration.between(processingStarted, processingFinished).toMillis())
);
long tookMs = Duration.between(processingStarted, processingFinished).toMillis();
if (taskWasProcessed) {
logger.trace(format("Processing one scheduled task finished, took %dms", tookMs));
} else {
logger.trace(format("Looking for scheduled tasks to process finished, took %dms", tookMs));
}
}
if (taskWasProcessed == false) {
return;
@ -228,6 +231,30 @@ public final class TransformScheduler {
);
}
/**
* Updates the transform task's next_scheduled_time so that it is set to now.
* Doing so may result in the task being processed earlier that it would normally (i.e.: according to its frequency) be.
*
* @param transformId id of the transform to schedule now
*/
public void scheduleNow(String transformId) {
logger.trace(() -> format("[%s] schedule_now transform", transformId));
long currentTimeMillis = clock.millis();
// Update the task's next_scheduled_time
scheduledTasks.update(
transformId,
task -> new TransformScheduledTask(
task.getTransformId(),
task.getFrequency(),
task.getLastTriggeredTimeMillis(),
task.getFailureCount(),
currentTimeMillis, // we schedule this task at current clock time so that it is processed ASAP
task.getListener()
)
);
processScheduledTasks();
}
/**
* De-registers the given transform by removing it from the queue.
*

View file

@ -0,0 +1,69 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.transform.rest.action;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.rest.FakeRestRequest;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xpack.core.transform.action.ScheduleNowTransformAction;
import org.junit.Before;
import java.util.Map;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
public class RestScheduleNowTransformActionTests extends ESTestCase {
private static final String ID = "id";
private static final String TIMEOUT = "timeout";
private RestChannel channel;
private NodeClient client;
@Before
public void initializeMocks() {
channel = mock(RestChannel.class);
client = mock(NodeClient.class);
}
public void testHandleRequest() throws Exception {
RestScheduleNowTransformAction handler = new RestScheduleNowTransformAction();
FakeRestRequest request = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY).withParams(Map.of(ID, "my-id")).build();
handler.handleRequest(request, channel, client);
ScheduleNowTransformAction.Request expectedActionRequest = new ScheduleNowTransformAction.Request(
"my-id",
TimeValue.timeValueSeconds(30)
);
verify(client).execute(eq(ScheduleNowTransformAction.INSTANCE), eq(expectedActionRequest), any());
verifyNoMoreInteractions(client);
}
public void testHandleRequestWithTimeout() throws Exception {
RestScheduleNowTransformAction handler = new RestScheduleNowTransformAction();
FakeRestRequest request = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY).withParams(Map.of(ID, "my-id", TIMEOUT, "45s"))
.build();
handler.handleRequest(request, channel, client);
ScheduleNowTransformAction.Request expectedActionRequest = new ScheduleNowTransformAction.Request(
"my-id",
TimeValue.timeValueSeconds(45)
);
verify(client).execute(eq(ScheduleNowTransformAction.INSTANCE), eq(expectedActionRequest), any());
verifyNoMoreInteractions(client);
}
}

View file

@ -29,12 +29,15 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
@ -171,6 +174,55 @@ public class TransformSchedulerTests extends ESTestCase {
transformScheduler.stop();
}
public void testScheduleNow() {
String transformId = "test-schedule-now-with-fake-clock";
TimeValue frequency = TimeValue.timeValueHours(1);
TransformTaskParams transformTaskParams = new TransformTaskParams(transformId, Version.CURRENT, frequency, false);
FakeClock clock = new FakeClock(Instant.ofEpochMilli(0));
CopyOnWriteArrayList<TransformScheduler.Event> events = new CopyOnWriteArrayList<>();
TransformScheduler.Listener listener = events::add;
TransformScheduler transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS);
transformScheduler.registerTransform(transformTaskParams, listener);
assertThat(
transformScheduler.getTransformScheduledTasks(),
contains(new TransformScheduledTask(transformId, frequency, 0L, 0, 60 * 60 * 1000, listener))
);
assertThat(events, hasSize(1));
// Advance time by 30 minutes (half of the configured transform frequency).
clock.advanceTimeBy(Duration.ofMillis(frequency.millis() / 2));
assertThat(
transformScheduler.getTransformScheduledTasks(),
contains(new TransformScheduledTask(transformId, frequency, 0L, 0, 60 * 60 * 1000, listener))
);
assertThat(events, hasSize(1));
// Schedule the transform now even though it is half-way through between checkpoints.
transformScheduler.scheduleNow(transformId);
assertThat(
transformScheduler.getTransformScheduledTasks(),
contains(new TransformScheduledTask(transformId, frequency, 30 * 60 * 1000L, 0, 90 * 60 * 1000, listener))
);
assertThat(events, hasSize(2));
clock.advanceTimeBy(Duration.ofMinutes(1));
transformScheduler.scheduleNow(transformId);
assertThat(
transformScheduler.getTransformScheduledTasks(),
contains(new TransformScheduledTask(transformId, frequency, 31 * 60 * 1000L, 0, 91 * 60 * 1000, listener))
);
assertThat(events, hasSize(3));
assertThat(events.get(0), is(equalTo(new TransformScheduler.Event(transformId, 0, 0))));
assertThat(events.get(1), is(equalTo(new TransformScheduler.Event(transformId, 30 * 60 * 1000, 30 * 60 * 1000))));
assertThat(events.get(2), is(equalTo(new TransformScheduler.Event(transformId, 31 * 60 * 1000, 31 * 60 * 1000))));
transformScheduler.deregisterTransform(transformId);
assertThat(transformScheduler.getTransformScheduledTasks(), is(empty()));
transformScheduler.stop();
}
public void testConcurrentProcessing() throws Exception {
String transformId = "test-with-fake-clock-concurrent";
int frequencySeconds = 5;
@ -252,7 +304,7 @@ public class TransformSchedulerTests extends ESTestCase {
);
}
public void testWithSystemClock() throws Exception {
public void testSchedulingWithSystemClock() throws Exception {
String transformId = "test-with-system-clock";
TimeValue frequency = TimeValue.timeValueSeconds(1);
TransformTaskParams transformTaskParams = new TransformTaskParams(transformId, Version.CURRENT, frequency, false);
@ -277,6 +329,30 @@ public class TransformSchedulerTests extends ESTestCase {
transformScheduler.stop();
}
public void testScheduleNowWithSystemClock() throws Exception {
String transformId = "test-schedule-now-with-system-clock";
TimeValue frequency = TimeValue.timeValueHours(1); // Very long pause between checkpoints
TransformTaskParams transformTaskParams = new TransformTaskParams(transformId, Version.CURRENT, frequency, false);
Clock clock = Clock.systemUTC();
CopyOnWriteArrayList<TransformScheduler.Event> events = new CopyOnWriteArrayList<>();
TransformScheduler transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS);
transformScheduler.start();
transformScheduler.registerTransform(transformTaskParams, events::add);
assertThat(events, hasSize(1));
Thread.sleep(5 * 1000L);
transformScheduler.scheduleNow(transformId);
assertThat(events, hasSize(2));
assertThat(events.get(0).transformId(), is(equalTo(transformId)));
assertThat(events.get(1).transformId(), is(equalTo(transformId)));
assertThat(events.get(1).scheduledTime() - events.get(0).triggeredTime(), is(allOf(greaterThan(4 * 1000L), lessThan(6 * 1000L))));
transformScheduler.deregisterTransform(transformId);
transformScheduler.stop();
}
public void testScheduledTransformTaskEqualsAndHashCode() {
Supplier<TransformScheduler.Listener> listenerSupplier = () -> new TransformScheduler.Listener() {
@Override