From f60401a61cc12839b8b8657460fb177e1be7aafa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Witek?= Date: Thu, 2 Feb 2023 19:03:16 +0100 Subject: [PATCH] [Transform] Transform `_schedule_now` API (#92948) --- docs/changelog/92948.yaml | 6 + .../reference/transform/api-quickref.asciidoc | 1 + docs/reference/transform/apis/index.asciidoc | 2 + .../apis/schedule-now-transform.asciidoc | 63 ++++++ .../transform/apis/transform-apis.asciidoc | 1 + .../api/transform.schedule_now_transform.json | 38 ++++ .../action/ScheduleNowTransformAction.java | 156 ++++++++++++++ ...cheduleNowTransformActionRequestTests.java | 58 +++++ ...heduleNowTransformActionResponseTests.java | 34 +++ .../xpack/security/operator/Constants.java | 1 + .../test/transform/transforms_start_stop.yml | 33 +++ .../integration/TransformRestTestCase.java | 10 + .../integration/TransformScheduleNowIT.java | 202 ++++++++++++++++++ .../xpack/transform/Transform.java | 7 +- .../TransportScheduleNowTransformAction.java | 179 ++++++++++++++++ .../RestScheduleNowTransformAction.java | 45 ++++ .../scheduling/TransformScheduler.java | 33 ++- .../RestScheduleNowTransformActionTests.java | 69 ++++++ .../scheduling/TransformSchedulerTests.java | 78 ++++++- 19 files changed, 1011 insertions(+), 5 deletions(-) create mode 100644 docs/changelog/92948.yaml create mode 100644 docs/reference/transform/apis/schedule-now-transform.asciidoc create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/transform.schedule_now_transform.json create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/ScheduleNowTransformAction.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/ScheduleNowTransformActionRequestTests.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/ScheduleNowTransformActionResponseTests.java create mode 100644 x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformScheduleNowIT.java create mode 100644 x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportScheduleNowTransformAction.java create mode 100644 x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestScheduleNowTransformAction.java create mode 100644 x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/rest/action/RestScheduleNowTransformActionTests.java diff --git a/docs/changelog/92948.yaml b/docs/changelog/92948.yaml new file mode 100644 index 000000000000..a335e60035ce --- /dev/null +++ b/docs/changelog/92948.yaml @@ -0,0 +1,6 @@ +pr: 92948 +summary: Transform _schedule_now API +area: Transform +type: feature +issues: + - 44722 diff --git a/docs/reference/transform/api-quickref.asciidoc b/docs/reference/transform/api-quickref.asciidoc index 2f46b11abcf2..e6a71f7e2b48 100644 --- a/docs/reference/transform/api-quickref.asciidoc +++ b/docs/reference/transform/api-quickref.asciidoc @@ -18,6 +18,7 @@ _transform/ * <> * <> * <> +* <> * <> For the full list, see <>. diff --git a/docs/reference/transform/apis/index.asciidoc b/docs/reference/transform/apis/index.asciidoc index e52daf0ad599..bd3a7588db44 100644 --- a/docs/reference/transform/apis/index.asciidoc +++ b/docs/reference/transform/apis/index.asciidoc @@ -15,6 +15,8 @@ include::reset-transform.asciidoc[leveloffset=+2] include::start-transform.asciidoc[leveloffset=+2] //STOP include::stop-transform.asciidoc[leveloffset=+2] +//SCHEDULE_NOW +include::schedule-now-transform.asciidoc[leveloffset=+2] //UPDATE-UPGRADE include::update-transform.asciidoc[leveloffset=+2] include::upgrade-transforms.asciidoc[leveloffset=+2] diff --git a/docs/reference/transform/apis/schedule-now-transform.asciidoc b/docs/reference/transform/apis/schedule-now-transform.asciidoc new file mode 100644 index 000000000000..9bc42b8b7d30 --- /dev/null +++ b/docs/reference/transform/apis/schedule-now-transform.asciidoc @@ -0,0 +1,63 @@ +[role="xpack"] +[testenv="basic"] +[[schedule-now-transform]] += Schedule Now {transform} API + +[subs="attributes"] +++++ +Shedule Now {transform} +++++ + +Schedules now a {transform}. + +[[schedule-now-transform-request]] +== {api-request-title} + +`POST _transform//_schedule_now` + +[[schedule-now-transform-prereqs]] +== {api-prereq-title} + +* Requires the `manage_transform` cluster privilege. This privilege is included +in the `transform_admin` built-in role. + +[schedule-now-transform-desc]] +== {api-description-title} + +If you _schedule_now a {transform}, it will process the new data instantly, +without waiting for the configured `frequency` interval. +After _schedule_now API is called, the transform will be processed again at +`now + frequency` unless _schedule_now API is called again in the meantime. + +[[schedule-now-transform-path-parms]] +== {api-path-parms-title} + +``:: +(Required, string) +include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-id] + +[[schedule-now-transform-query-parms]] +== {api-query-parms-title} + +`timeout`:: +(Optional, time) +Period to wait for a response. If no response is received before the timeout +expires, the request fails and returns an error. Defaults to `30s`. + +[[schedule-now-transform-examples]] +== {api-examples-title} + +[source,console] +-------------------------------------------------- +POST _transform/ecommerce_transform/_schedule_now +-------------------------------------------------- +// TEST[skip:setup kibana sample data] + +When the {transform} is scheduled now, you receive the following results: + +[source,console-result] +---- +{ + "acknowledged" : true +} +---- diff --git a/docs/reference/transform/apis/transform-apis.asciidoc b/docs/reference/transform/apis/transform-apis.asciidoc index f98e192e495d..2912ffb1b17d 100644 --- a/docs/reference/transform/apis/transform-apis.asciidoc +++ b/docs/reference/transform/apis/transform-apis.asciidoc @@ -10,5 +10,6 @@ * <> * <> * <> +* <> * <> * <> diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/transform.schedule_now_transform.json b/rest-api-spec/src/main/resources/rest-api-spec/api/transform.schedule_now_transform.json new file mode 100644 index 000000000000..81ba9e071cfd --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/transform.schedule_now_transform.json @@ -0,0 +1,38 @@ +{ + "transform.schedule_now_transform":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/current/schedule-now-transform.html", + "description":"Schedules now a transform." + }, + "stability":"stable", + "visibility":"public", + "headers":{ + "accept":[ "application/json"], + "content_type":["application/json"] + }, + "url":{ + "paths":[ + { + "path":"/_transform/{transform_id}/_schedule_now", + "methods":[ + "POST" + ], + "parts":{ + "transform_id":{ + "type":"string", + "required":true, + "description":"The id of the transform." + } + } + } + ] + }, + "params":{ + "timeout":{ + "type":"time", + "required":false, + "description":"Controls the time to wait for the scheduling to take place" + } + } + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/ScheduleNowTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/ScheduleNowTransformAction.java new file mode 100644 index 000000000000..96d3f498e8de --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/ScheduleNowTransformAction.java @@ -0,0 +1,156 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.transform.action; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.TaskOperationFailure; +import org.elasticsearch.action.support.tasks.BaseTasksRequest; +import org.elasticsearch.action.support.tasks.BaseTasksResponse; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xpack.core.transform.TransformField; +import org.elasticsearch.xpack.core.transform.utils.ExceptionsHelper; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +public class ScheduleNowTransformAction extends ActionType { + + public static final ScheduleNowTransformAction INSTANCE = new ScheduleNowTransformAction(); + public static final String NAME = "cluster:admin/transform/schedule_now"; + + private ScheduleNowTransformAction() { + super(NAME, ScheduleNowTransformAction.Response::new); + } + + public static class Request extends BaseTasksRequest { + + private final String id; + + public Request(String id, TimeValue timeout) { + this.id = ExceptionsHelper.requireNonNull(id, TransformField.ID.getPreferredName()); + this.setTimeout(ExceptionsHelper.requireNonNull(timeout, TransformField.TIMEOUT.getPreferredName())); + } + + public Request(StreamInput in) throws IOException { + super(in); + this.id = in.readString(); + } + + public static Request fromXContent(final String id, final TimeValue timeout) { + return new Request(id, timeout); + } + + @Override + public ActionRequestValidationException validate() { + if (Metadata.ALL.equals(id)) { + ActionRequestValidationException e = new ActionRequestValidationException(); + e.addValidationError("_schedule_now API does not support _all wildcard"); + return e; + } + return null; + } + + public String getId() { + return id; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(id); + } + + @Override + public int hashCode() { + // the base class does not implement hashCode, therefore we need to hash timeout ourselves + return Objects.hash(getTimeout(), id); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + Request other = (Request) obj; + + // the base class does not implement equals, therefore we need to check timeout ourselves + return this.id.equals(other.id) && getTimeout().equals(other.getTimeout()); + } + } + + public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject { + + public static final Response TRUE = new Response(true); + + private final boolean acknowledged; + + public Response(StreamInput in) throws IOException { + super(in); + acknowledged = in.readBoolean(); + } + + public Response(boolean acknowledged) { + super(Collections.emptyList(), Collections.emptyList()); + this.acknowledged = acknowledged; + } + + public Response( + List taskFailures, + List nodeFailures, + boolean acknowledged + ) { + super(taskFailures, nodeFailures); + this.acknowledged = acknowledged; + } + + public boolean isAcknowledged() { + return acknowledged; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(acknowledged); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + toXContentCommon(builder, params); + builder.field("acknowledged", acknowledged); + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ScheduleNowTransformAction.Response response = (ScheduleNowTransformAction.Response) o; + return acknowledged == response.acknowledged; + } + + @Override + public int hashCode() { + return Objects.hash(acknowledged); + } + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/ScheduleNowTransformActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/ScheduleNowTransformActionRequestTests.java new file mode 100644 index 000000000000..e98e14e341cf --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/ScheduleNowTransformActionRequestTests.java @@ -0,0 +1,58 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.transform.action; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xpack.core.transform.action.ScheduleNowTransformAction.Request; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class ScheduleNowTransformActionRequestTests extends AbstractWireSerializingTestCase { + + @Override + protected Request createTestInstance() { + return new Request(randomAlphaOfLengthBetween(1, 20), TimeValue.parseTimeValue(randomTimeValue(), "timeout")); + } + + @Override + protected Writeable.Reader instanceReader() { + return Request::new; + } + + @Override + protected Request mutateInstance(Request instance) { + String id = instance.getId(); + TimeValue timeout = instance.getTimeout(); + + switch (between(0, 1)) { + case 0 -> id += randomAlphaOfLengthBetween(1, 5); + case 1 -> timeout = new TimeValue(timeout.duration() + randomLongBetween(1, 5), timeout.timeUnit()); + default -> throw new AssertionError("Illegal randomization branch"); + } + + return new ScheduleNowTransformAction.Request(id, timeout); + } + + public void testValidationSuccess() { + Request request = new Request("id", TimeValue.ZERO); + assertThat(request.validate(), is(nullValue())); + } + + public void testValidationFailure() { + Request request = new Request("_all", TimeValue.ZERO); + ActionRequestValidationException e = request.validate(); + assertThat(e, is(notNullValue())); + assertThat(e.validationErrors(), contains("_schedule_now API does not support _all wildcard")); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/ScheduleNowTransformActionResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/ScheduleNowTransformActionResponseTests.java new file mode 100644 index 000000000000..07d36604c4a8 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/ScheduleNowTransformActionResponseTests.java @@ -0,0 +1,34 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.transform.action; + +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.xpack.core.transform.action.ScheduleNowTransformAction.Response; + +public class ScheduleNowTransformActionResponseTests extends AbstractWireSerializingTransformTestCase { + + @Override + protected Response createTestInstance() { + return new Response(randomBoolean()); + } + + @Override + protected Reader instanceReader() { + return Response::new; + } + + @Override + protected Response mutateInstance(Response instance) { + boolean acknowledged = instance.isAcknowledged(); + return new Response(acknowledged == false); + } + + public void testResponseTrue() { + assertTrue(Response.TRUE.isAcknowledged()); + } +} diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index edb6d07c6326..68e89451c506 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -88,6 +88,7 @@ public class Constants { "cluster:admin/transform/reset", "cluster:admin/transform/start", "cluster:admin/transform/stop", + "cluster:admin/transform/schedule_now", "cluster:admin/transform/update", "cluster:admin/transform/upgrade", "cluster:admin/transform/validate", diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/transform/transforms_start_stop.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/transform/transforms_start_stop.yml index 8efeca6a47dc..aebc28ce0bfe 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/transform/transforms_start_stop.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/transform/transforms_start_stop.yml @@ -137,6 +137,39 @@ teardown: transform.start_transform: transform_id: "airline-transform-start-stop" +--- +"Test schedule_now on a stopped transform": + - do: + transform.schedule_now_transform: + transform_id: "airline-transform-start-stop" + - match: { acknowledged: true } + +--- +"Test schedule_now on an already started transform": + - do: + transform.start_transform: + transform_id: "airline-transform-start-stop" + - match: { acknowledged: true } + + - do: + transform.schedule_now_transform: + transform_id: "airline-transform-start-stop" + - match: { acknowledged: true } + +--- +"Test schedule_now all transforms": + - do: + catch: /_schedule_now API does not support _all wildcard/ + transform.schedule_now_transform: + transform_id: "_all" + +--- +"Test schedule_now missing transform": + - do: + catch: missing + transform.schedule_now_transform: + transform_id: "missing" + --- "Verify start transform creates destination index with appropriate mapping": - do: diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java index 0bf5ec54b9df..5e741bd662e8 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java @@ -495,6 +495,16 @@ public abstract class TransformRestTestCase extends ESRestTestCase { assertThat(resetTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); } + protected void scheduleNowTransform(String transformId) throws IOException { + final Request scheduleNowTransformRequest = createRequestWithAuth( + "POST", + getTransformEndpoint() + transformId + "/_schedule_now", + null + ); + Map scheduleNowTransformResponse = entityAsMap(client().performRequest(scheduleNowTransformRequest)); + assertThat(scheduleNowTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); + } + protected Request createRequestWithSecondaryAuth( final String method, final String endpoint, diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformScheduleNowIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformScheduleNowIT.java new file mode 100644 index 000000000000..2a5236445fa1 --- /dev/null +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformScheduleNowIT.java @@ -0,0 +1,202 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.integration; + +import org.apache.http.HttpHost; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.core.Strings; +import org.junit.Before; + +import java.io.IOException; +import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +public class TransformScheduleNowIT extends TransformRestTestCase { + + private static final String TEST_USER_NAME = "transform_user"; + private static final String TEST_ADMIN_USER_NAME_1 = "transform_admin_1"; + private static final String BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1 = basicAuthHeaderValue( + TEST_ADMIN_USER_NAME_1, + TEST_PASSWORD_SECURE_STRING + ); + private static final String DATA_ACCESS_ROLE = "test_data_access"; + + private static boolean indicesCreated = false; + + // preserve indices in order to reuse source indices in several test cases + @Override + protected boolean preserveIndicesUponCompletion() { + return true; + } + + @Override + protected boolean enableWarningsCheck() { + return false; + } + + @Override + protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOException { + RestClientBuilder builder = RestClient.builder(hosts); + configureClient(builder, settings); + builder.setStrictDeprecationMode(false); + return builder.build(); + } + + @Before + public void createIndexes() throws IOException { + setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME); + setupUser(TEST_USER_NAME, Arrays.asList("transform_user", DATA_ACCESS_ROLE)); + setupUser(TEST_ADMIN_USER_NAME_1, Arrays.asList("transform_admin", DATA_ACCESS_ROLE)); + + // it's not possible to run it as @BeforeClass as clients aren't initialized then, so we need this little hack + if (indicesCreated) { + return; + } + + createReviewsIndex(); + indicesCreated = true; + } + + public void testScheduleNow() throws Exception { + String sourceIndex = REVIEWS_INDEX_NAME; + String transformId = "old_transform"; + String destIndex = transformId + "_idx"; + setupDataAccessRole(DATA_ACCESS_ROLE, sourceIndex, destIndex); + + final Request createTransformRequest = createRequestWithAuth( + "PUT", + getTransformEndpoint() + transformId, + BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1 + ); + String config = Strings.format(""" + { + "dest": { + "index": "%s" + }, + "source": { + "index": "%s" + }, + "pivot": { + "group_by": { + "reviewer": { + "terms": { + "field": "user_id" + } + } + }, + "aggregations": { + "avg_rating": { + "avg": { + "field": "stars" + } + } + } + }, + "sync": { + "time": { + "field": "timestamp", + "delay": "1s" + } + }, + "frequency": "1h" + }""", destIndex, sourceIndex); + createTransformRequest.setJsonEntity(config); + Map createTransformResponse = entityAsMap(client().performRequest(createTransformRequest)); + assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); + + // Verify that _schedule_now is a no-op on a new transform + scheduleNowTransform(transformId); + + // Start the transform, notice that frequency is set pretty high + startAndWaitForContinuousTransform(transformId, destIndex, null, null, 1L); + + String newUser = "user_666"; + verifyNumberOfSourceDocs(sourceIndex, newUser, 0); + verifyDestDoc(destIndex, newUser, 0, null); + + // Ingest a new document to *source* index + indexSourceDoc(sourceIndex, newUser, 7); + + // Wait a little bit to accommodate differences between "now" timestamp in the test and the current time in the server + Thread.sleep(5_000); + + // Verify the new data is in the source index but not yet in the destination index + verifyNumberOfSourceDocs(sourceIndex, newUser, 1); + verifyDestDoc(destIndex, newUser, 0, null); + + // Schedule now the transform to force processing the new data despite 1h-long interval + scheduleNowTransform(transformId); + waitForTransformCheckpoint(transformId, 2L); + + // Verify that the new data is available in the destination index after _schedule_now + verifyNumberOfSourceDocs(sourceIndex, newUser, 1); + verifyDestDoc(destIndex, newUser, 1, 7.0); + + // Ingest a new document to *source* index + indexSourceDoc(sourceIndex, newUser, 9); + + // Wait a little bit to accommodate differences between "now" timestamp in the test and the current time in the server + Thread.sleep(5_000); + + // Verify the new data is in the source index but not yet in the destination index + verifyNumberOfSourceDocs(sourceIndex, newUser, 2); + verifyDestDoc(destIndex, newUser, 1, 7.0); + + // Try scheduling now all the transforms at once using _all wildcard, it is *not* supported + ResponseException e = expectThrows(ResponseException.class, () -> scheduleNowTransform("_all")); + assertThat(e.getMessage(), containsString("_schedule_now API does not support _all wildcard")); + + // Schedule now the transform to force processing the new data despite 1h-long interval + scheduleNowTransform(transformId); + waitForTransformCheckpoint(transformId, 3L); + + // Verify that the new data is available in the destination index after _schedule_now + verifyNumberOfSourceDocs(sourceIndex, newUser, 2); + verifyDestDoc(destIndex, newUser, 1, 8.0); // 8.0 = (7.0 + 9.0) / 2 + + // Verify that _schedule_now works on a stopped transform + stopTransform(transformId, false); + scheduleNowTransform(transformId); + } + + private void indexSourceDoc(String sourceIndex, String user, int stars) throws IOException { + String doc = Strings.format(""" + {"user_id":"%s","stars":%s,"timestamp":%s} + """, user, stars, Instant.now().toEpochMilli()); + + Request indexRequest = new Request("POST", sourceIndex + "/_doc"); + indexRequest.addParameter("refresh", "true"); + indexRequest.setJsonEntity(doc); + + Map indexResponse = entityAsMap(client().performRequest(indexRequest)); + assertThat(indexResponse.get("result"), equalTo("created")); + } + + private void verifyNumberOfSourceDocs(String sourceIndex, String user, int expectedDocCount) throws IOException { + Map searchResult = getAsMap(sourceIndex + "/_search?q=user_id:" + user); + assertEquals(expectedDocCount, XContentMapValues.extractValue("hits.total.value", searchResult)); + } + + private void verifyDestDoc(String destIndex, String user, int expectedDocCount, Double expectedAvgRating) throws IOException { + Map searchResult = getAsMap(destIndex + "/_search?q=reviewer:" + user); + assertEquals(expectedDocCount, XContentMapValues.extractValue("hits.total.value", searchResult)); + if (expectedAvgRating != null) { + assertEquals(List.of(expectedAvgRating), XContentMapValues.extractValue("hits.hits._source.avg_rating", searchResult)); + } + } +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java index c4a89b594e88..d5ac73afde1a 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java @@ -68,6 +68,7 @@ import org.elasticsearch.xpack.core.transform.action.GetTransformStatsAction; import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction; import org.elasticsearch.xpack.core.transform.action.PutTransformAction; import org.elasticsearch.xpack.core.transform.action.ResetTransformAction; +import org.elasticsearch.xpack.core.transform.action.ScheduleNowTransformAction; import org.elasticsearch.xpack.core.transform.action.SetResetModeAction; import org.elasticsearch.xpack.core.transform.action.StartTransformAction; import org.elasticsearch.xpack.core.transform.action.StopTransformAction; @@ -83,6 +84,7 @@ import org.elasticsearch.xpack.transform.action.TransportGetTransformStatsAction import org.elasticsearch.xpack.transform.action.TransportPreviewTransformAction; import org.elasticsearch.xpack.transform.action.TransportPutTransformAction; import org.elasticsearch.xpack.transform.action.TransportResetTransformAction; +import org.elasticsearch.xpack.transform.action.TransportScheduleNowTransformAction; import org.elasticsearch.xpack.transform.action.TransportSetTransformResetModeAction; import org.elasticsearch.xpack.transform.action.TransportStartTransformAction; import org.elasticsearch.xpack.transform.action.TransportStopTransformAction; @@ -101,6 +103,7 @@ import org.elasticsearch.xpack.transform.rest.action.RestGetTransformStatsAction import org.elasticsearch.xpack.transform.rest.action.RestPreviewTransformAction; import org.elasticsearch.xpack.transform.rest.action.RestPutTransformAction; import org.elasticsearch.xpack.transform.rest.action.RestResetTransformAction; +import org.elasticsearch.xpack.transform.rest.action.RestScheduleNowTransformAction; import org.elasticsearch.xpack.transform.rest.action.RestStartTransformAction; import org.elasticsearch.xpack.transform.rest.action.RestStopTransformAction; import org.elasticsearch.xpack.transform.rest.action.RestUpdateTransformAction; @@ -190,7 +193,8 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa new RestUpdateTransformAction(), new RestCatTransformAction(), new RestUpgradeTransformsAction(), - new RestResetTransformAction() + new RestResetTransformAction(), + new RestScheduleNowTransformAction() ); } @@ -209,6 +213,7 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa new ActionHandler<>(SetResetModeAction.INSTANCE, TransportSetTransformResetModeAction.class), new ActionHandler<>(UpgradeTransformsAction.INSTANCE, TransportUpgradeTransformsAction.class), new ActionHandler<>(ResetTransformAction.INSTANCE, TransportResetTransformAction.class), + new ActionHandler<>(ScheduleNowTransformAction.INSTANCE, TransportScheduleNowTransformAction.class), // internal, no rest endpoint new ActionHandler<>(ValidateTransformAction.INSTANCE, TransportValidateTransformAction.class), diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportScheduleNowTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportScheduleNowTransformAction.java new file mode 100644 index 000000000000..98918bb7dcc8 --- /dev/null +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportScheduleNowTransformAction.java @@ -0,0 +1,179 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.TaskOperationFailure; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.tasks.TransportTasksAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.ActionNotFoundTransportException; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.XPackPlugin; +import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.security.SecurityContext; +import org.elasticsearch.xpack.core.transform.action.ScheduleNowTransformAction; +import org.elasticsearch.xpack.core.transform.action.ScheduleNowTransformAction.Request; +import org.elasticsearch.xpack.core.transform.action.ScheduleNowTransformAction.Response; +import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; +import org.elasticsearch.xpack.core.transform.transforms.TransformState; +import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; +import org.elasticsearch.xpack.transform.TransformServices; +import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; +import org.elasticsearch.xpack.transform.transforms.TransformTask; +import org.elasticsearch.xpack.transform.transforms.scheduling.TransformScheduler; + +import java.util.List; + +import static org.elasticsearch.core.Strings.format; +import static org.elasticsearch.xpack.transform.utils.SecondaryAuthorizationUtils.useSecondaryAuthIfAvailable; + +public class TransportScheduleNowTransformAction extends TransportTasksAction { + + private static final Logger logger = LogManager.getLogger(TransportScheduleNowTransformAction.class); + private final TransformConfigManager transformConfigManager; + private final TransformScheduler transformScheduler; + private final SecurityContext securityContext; + + @Inject + public TransportScheduleNowTransformAction( + Settings settings, + TransportService transportService, + ThreadPool threadPool, + ActionFilters actionFilters, + ClusterService clusterService, + TransformServices transformServices + ) { + super( + ScheduleNowTransformAction.NAME, + clusterService, + transportService, + actionFilters, + Request::new, + Response::new, + Response::new, + ThreadPool.Names.SAME + ); + + this.transformConfigManager = transformServices.getConfigManager(); + this.transformScheduler = transformServices.getScheduler(); + this.securityContext = XPackSettings.SECURITY_ENABLED.get(settings) + ? new SecurityContext(settings, threadPool.getThreadContext()) + : null; + } + + @Override + protected void doExecute(Task task, Request request, ActionListener listener) { + final ClusterState clusterState = clusterService.state(); + XPackPlugin.checkReadyForXPackCustomMetadata(clusterState); + + useSecondaryAuthIfAvailable(securityContext, () -> { + ActionListener getTransformListener = ActionListener.wrap(unusedConfig -> { + PersistentTasksCustomMetadata.PersistentTask transformTask = TransformTask.getTransformTask( + request.getId(), + clusterState + ); + + // to send a request to schedule now the transform at runtime, several requirements must be met: + // - transform must be running, meaning a task exists + // - transform is not failed (stopped transforms do not have a task) + if (transformTask != null + && transformTask.isAssigned() + && transformTask.getState() instanceof TransformState + && ((TransformState) transformTask.getState()).getTaskState() != TransformTaskState.FAILED) { + + ActionListener taskScheduleNowListener = ActionListener.wrap(listener::onResponse, e -> { + // benign: A transform might have been stopped meanwhile, this is not a problem + if (e instanceof TransformTaskDisappearedDuringScheduleNowException) { + logger.debug( + () -> format("[%s] transform task disappeared during schedule_now, ignoring.", request.getId()), + e + ); + listener.onResponse(Response.TRUE); + return; + } + if (e instanceof TransformTaskScheduleNowException) { + logger.warn(() -> format("[%s] failed to schedule now the running transform.", request.getId()), e); + listener.onResponse(Response.TRUE); + return; + } + listener.onFailure(e); + }); + request.setNodes(transformTask.getExecutorNode()); + super.doExecute(task, request, taskScheduleNowListener); + } else { + listener.onResponse(Response.TRUE); + } + }, listener::onFailure); + + // <1> Get the config to verify it exists and is valid + transformConfigManager.getTransformConfiguration(request.getId(), getTransformListener); + }); + } + + @Override + protected void taskOperation(Task actionTask, Request request, TransformTask transformTask, ActionListener listener) { + transformScheduler.scheduleNow(request.getId()); + listener.onResponse(Response.TRUE); + } + + @Override + protected Response newResponse( + Request request, + List tasks, + List taskOperationFailures, + List failedNodeExceptions + ) { + if (tasks.isEmpty()) { + if (taskOperationFailures.isEmpty() == false) { + throw new TransformTaskScheduleNowException( + "Failed to schedule now the running transform due to task operation failure.", + taskOperationFailures.get(0).getCause() + ); + } else if (failedNodeExceptions.isEmpty() == false) { + FailedNodeException failedNodeException = failedNodeExceptions.get(0); + Throwable failedNodeExceptionCause = ExceptionsHelper.unwrapCause(failedNodeException.getCause()); + if (failedNodeExceptionCause instanceof ActionNotFoundTransportException) { + throw (ActionNotFoundTransportException) failedNodeExceptionCause; + } + throw new TransformTaskScheduleNowException( + "Failed to schedule now the running transform due to failed node exception.", + failedNodeExceptions.get(0) + ); + } else { + throw new TransformTaskDisappearedDuringScheduleNowException( + "Could not schedule now the running transform as it has been stopped." + ); + } + } + return tasks.get(0); + } + + private static class TransformTaskScheduleNowException extends ElasticsearchException { + TransformTaskScheduleNowException(String msg, Throwable cause, Object... args) { + super(msg, cause, args); + } + } + + private static class TransformTaskDisappearedDuringScheduleNowException extends ElasticsearchException { + TransformTaskDisappearedDuringScheduleNowException(String msg) { + super(msg); + } + } +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestScheduleNowTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestScheduleNowTransformAction.java new file mode 100644 index 000000000000..1ae65747de80 --- /dev/null +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestScheduleNowTransformAction.java @@ -0,0 +1,45 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.rest.action; + +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.core.transform.TransformField; +import org.elasticsearch.xpack.core.transform.action.ScheduleNowTransformAction; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.rest.RestRequest.Method.POST; + +public class RestScheduleNowTransformAction extends BaseRestHandler { + + @Override + public List routes() { + return List.of(new Route(POST, TransformField.REST_BASE_PATH_TRANSFORMS_BY_ID + "_schedule_now")); + } + + @Override + public String getName() { + return "transform_schedule_now_transform_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + String id = restRequest.param(TransformField.ID.getPreferredName()); + TimeValue timeout = restRequest.paramAsTime(TransformField.TIMEOUT.getPreferredName(), AcknowledgedRequest.DEFAULT_ACK_TIMEOUT); + + ScheduleNowTransformAction.Request request = ScheduleNowTransformAction.Request.fromXContent(id, timeout); + + return channel -> client.execute(ScheduleNowTransformAction.INSTANCE, request, new RestToXContentListener<>(channel)); + } +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduler.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduler.java index 59b77af08aa7..aa7a5f9873dc 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduler.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduler.java @@ -115,9 +115,12 @@ public final class TransformScheduler { } if (isTraceEnabled) { Instant processingFinished = clock.instant(); - logger.trace( - format("Processing scheduled tasks finished, took %dms", Duration.between(processingStarted, processingFinished).toMillis()) - ); + long tookMs = Duration.between(processingStarted, processingFinished).toMillis(); + if (taskWasProcessed) { + logger.trace(format("Processing one scheduled task finished, took %dms", tookMs)); + } else { + logger.trace(format("Looking for scheduled tasks to process finished, took %dms", tookMs)); + } } if (taskWasProcessed == false) { return; @@ -228,6 +231,30 @@ public final class TransformScheduler { ); } + /** + * Updates the transform task's next_scheduled_time so that it is set to now. + * Doing so may result in the task being processed earlier that it would normally (i.e.: according to its frequency) be. + * + * @param transformId id of the transform to schedule now + */ + public void scheduleNow(String transformId) { + logger.trace(() -> format("[%s] schedule_now transform", transformId)); + long currentTimeMillis = clock.millis(); + // Update the task's next_scheduled_time + scheduledTasks.update( + transformId, + task -> new TransformScheduledTask( + task.getTransformId(), + task.getFrequency(), + task.getLastTriggeredTimeMillis(), + task.getFailureCount(), + currentTimeMillis, // we schedule this task at current clock time so that it is processed ASAP + task.getListener() + ) + ); + processScheduledTasks(); + } + /** * De-registers the given transform by removing it from the queue. * diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/rest/action/RestScheduleNowTransformActionTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/rest/action/RestScheduleNowTransformActionTests.java new file mode 100644 index 000000000000..9aa792e0bf2e --- /dev/null +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/rest/action/RestScheduleNowTransformActionTests.java @@ -0,0 +1,69 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.rest.action; + +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.rest.FakeRestRequest; +import org.elasticsearch.xcontent.NamedXContentRegistry; +import org.elasticsearch.xpack.core.transform.action.ScheduleNowTransformAction; +import org.junit.Before; + +import java.util.Map; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +public class RestScheduleNowTransformActionTests extends ESTestCase { + + private static final String ID = "id"; + private static final String TIMEOUT = "timeout"; + + private RestChannel channel; + private NodeClient client; + + @Before + public void initializeMocks() { + channel = mock(RestChannel.class); + client = mock(NodeClient.class); + } + + public void testHandleRequest() throws Exception { + RestScheduleNowTransformAction handler = new RestScheduleNowTransformAction(); + FakeRestRequest request = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY).withParams(Map.of(ID, "my-id")).build(); + + handler.handleRequest(request, channel, client); + + ScheduleNowTransformAction.Request expectedActionRequest = new ScheduleNowTransformAction.Request( + "my-id", + TimeValue.timeValueSeconds(30) + ); + verify(client).execute(eq(ScheduleNowTransformAction.INSTANCE), eq(expectedActionRequest), any()); + verifyNoMoreInteractions(client); + } + + public void testHandleRequestWithTimeout() throws Exception { + RestScheduleNowTransformAction handler = new RestScheduleNowTransformAction(); + FakeRestRequest request = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY).withParams(Map.of(ID, "my-id", TIMEOUT, "45s")) + .build(); + + handler.handleRequest(request, channel, client); + + ScheduleNowTransformAction.Request expectedActionRequest = new ScheduleNowTransformAction.Request( + "my-id", + TimeValue.timeValueSeconds(45) + ); + verify(client).execute(eq(ScheduleNowTransformAction.INSTANCE), eq(expectedActionRequest), any()); + verifyNoMoreInteractions(client); + } +} diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformSchedulerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformSchedulerTests.java index 96f497f04dc2..4fa5b5eac001 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformSchedulerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformSchedulerTests.java @@ -29,12 +29,15 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; @@ -171,6 +174,55 @@ public class TransformSchedulerTests extends ESTestCase { transformScheduler.stop(); } + public void testScheduleNow() { + String transformId = "test-schedule-now-with-fake-clock"; + TimeValue frequency = TimeValue.timeValueHours(1); + TransformTaskParams transformTaskParams = new TransformTaskParams(transformId, Version.CURRENT, frequency, false); + FakeClock clock = new FakeClock(Instant.ofEpochMilli(0)); + CopyOnWriteArrayList events = new CopyOnWriteArrayList<>(); + TransformScheduler.Listener listener = events::add; + + TransformScheduler transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS); + transformScheduler.registerTransform(transformTaskParams, listener); + assertThat( + transformScheduler.getTransformScheduledTasks(), + contains(new TransformScheduledTask(transformId, frequency, 0L, 0, 60 * 60 * 1000, listener)) + ); + assertThat(events, hasSize(1)); + + // Advance time by 30 minutes (half of the configured transform frequency). + clock.advanceTimeBy(Duration.ofMillis(frequency.millis() / 2)); + assertThat( + transformScheduler.getTransformScheduledTasks(), + contains(new TransformScheduledTask(transformId, frequency, 0L, 0, 60 * 60 * 1000, listener)) + ); + assertThat(events, hasSize(1)); + + // Schedule the transform now even though it is half-way through between checkpoints. + transformScheduler.scheduleNow(transformId); + assertThat( + transformScheduler.getTransformScheduledTasks(), + contains(new TransformScheduledTask(transformId, frequency, 30 * 60 * 1000L, 0, 90 * 60 * 1000, listener)) + ); + assertThat(events, hasSize(2)); + + clock.advanceTimeBy(Duration.ofMinutes(1)); + transformScheduler.scheduleNow(transformId); + assertThat( + transformScheduler.getTransformScheduledTasks(), + contains(new TransformScheduledTask(transformId, frequency, 31 * 60 * 1000L, 0, 91 * 60 * 1000, listener)) + ); + assertThat(events, hasSize(3)); + assertThat(events.get(0), is(equalTo(new TransformScheduler.Event(transformId, 0, 0)))); + assertThat(events.get(1), is(equalTo(new TransformScheduler.Event(transformId, 30 * 60 * 1000, 30 * 60 * 1000)))); + assertThat(events.get(2), is(equalTo(new TransformScheduler.Event(transformId, 31 * 60 * 1000, 31 * 60 * 1000)))); + + transformScheduler.deregisterTransform(transformId); + assertThat(transformScheduler.getTransformScheduledTasks(), is(empty())); + + transformScheduler.stop(); + } + public void testConcurrentProcessing() throws Exception { String transformId = "test-with-fake-clock-concurrent"; int frequencySeconds = 5; @@ -252,7 +304,7 @@ public class TransformSchedulerTests extends ESTestCase { ); } - public void testWithSystemClock() throws Exception { + public void testSchedulingWithSystemClock() throws Exception { String transformId = "test-with-system-clock"; TimeValue frequency = TimeValue.timeValueSeconds(1); TransformTaskParams transformTaskParams = new TransformTaskParams(transformId, Version.CURRENT, frequency, false); @@ -277,6 +329,30 @@ public class TransformSchedulerTests extends ESTestCase { transformScheduler.stop(); } + public void testScheduleNowWithSystemClock() throws Exception { + String transformId = "test-schedule-now-with-system-clock"; + TimeValue frequency = TimeValue.timeValueHours(1); // Very long pause between checkpoints + TransformTaskParams transformTaskParams = new TransformTaskParams(transformId, Version.CURRENT, frequency, false); + Clock clock = Clock.systemUTC(); + CopyOnWriteArrayList events = new CopyOnWriteArrayList<>(); + + TransformScheduler transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS); + transformScheduler.start(); + transformScheduler.registerTransform(transformTaskParams, events::add); + assertThat(events, hasSize(1)); + + Thread.sleep(5 * 1000L); + transformScheduler.scheduleNow(transformId); + + assertThat(events, hasSize(2)); + assertThat(events.get(0).transformId(), is(equalTo(transformId))); + assertThat(events.get(1).transformId(), is(equalTo(transformId))); + assertThat(events.get(1).scheduledTime() - events.get(0).triggeredTime(), is(allOf(greaterThan(4 * 1000L), lessThan(6 * 1000L)))); + + transformScheduler.deregisterTransform(transformId); + transformScheduler.stop(); + } + public void testScheduledTransformTaskEqualsAndHashCode() { Supplier listenerSupplier = () -> new TransformScheduler.Listener() { @Override