From 7ed771e3f942b3ce77d4a30cc3aa7f9c59a27f88 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Fri, 7 Feb 2025 11:08:34 +1100 Subject: [PATCH] Add separate ClusterPersistentTasksCustomMetadata (MP-1945) This PR adds ClusterPersistentTasksCustomMetadata by splitting from the existing PersistentTasksCustomMetadata. The latter will be used for project scoped persistent tasks. It also updates tasks executors to declare its scope. The new task type is not used anywhere yet and has no wire BWC handling. Both will be addressed in follow-ups. Split from: MP-1938 --- .../elasticsearch/cluster/ClusterModule.java | 18 + .../ClusterPersistentTasksCustomMetadata.java | 174 ++++++++ .../persistent/PersistentTasks.java | 375 ++++++++++++++++++ .../PersistentTasksCustomMetadata.java | 328 ++------------- .../persistent/PersistentTasksExecutor.java | 15 + .../PersistentTasksExecutorRegistry.java | 11 + ...asePersistentTasksCustomMetadataTests.java | 274 +++++++++++++ ...terPersistentTasksCustomMetadataTests.java | 82 ++++ .../PersistentTasksCustomMetadataTests.java | 231 +---------- .../PersistentTasksExecutorResponseTests.java | 2 +- 10 files changed, 986 insertions(+), 524 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/persistent/ClusterPersistentTasksCustomMetadata.java create mode 100644 server/src/main/java/org/elasticsearch/persistent/PersistentTasks.java create mode 100644 server/src/test/java/org/elasticsearch/persistent/BasePersistentTasksCustomMetadataTests.java create mode 100644 server/src/test/java/org/elasticsearch/persistent/ClusterPersistentTasksCustomMetadataTests.java diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index cfb2698365d7..b001d188585a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -76,6 +76,7 @@ import org.elasticsearch.health.node.selection.HealthNodeTaskExecutor; import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.ingest.IngestMetadata; import org.elasticsearch.injection.guice.AbstractModule; +import org.elasticsearch.persistent.ClusterPersistentTasksCustomMetadata; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.persistent.PersistentTasksNodeService; import org.elasticsearch.plugins.ClusterPlugin; @@ -226,12 +227,20 @@ public class ClusterModule extends AbstractModule { registerProjectCustom(entries, IngestMetadata.TYPE, IngestMetadata::new, IngestMetadata::readDiffFrom); registerProjectCustom(entries, ScriptMetadata.TYPE, ScriptMetadata::new, ScriptMetadata::readDiffFrom); registerProjectCustom(entries, IndexGraveyard.TYPE, IndexGraveyard::new, IndexGraveyard::readDiffFrom); + // Project scoped persistent tasks registerProjectCustom( entries, PersistentTasksCustomMetadata.TYPE, PersistentTasksCustomMetadata::new, PersistentTasksCustomMetadata::readDiffFrom ); + // Cluster scoped persistent tasks + registerMetadataCustom( + entries, + ClusterPersistentTasksCustomMetadata.TYPE, + ClusterPersistentTasksCustomMetadata::new, + ClusterPersistentTasksCustomMetadata::readDiffFrom + ); registerProjectCustom( entries, ComponentTemplateMetadata.TYPE, @@ -283,6 +292,7 @@ public class ClusterModule extends AbstractModule { entries.add( new NamedXContentRegistry.Entry(Metadata.ProjectCustom.class, new ParseField(IndexGraveyard.TYPE), IndexGraveyard::fromXContent) ); + // Project scoped persistent tasks entries.add( new NamedXContentRegistry.Entry( Metadata.ProjectCustom.class, @@ -290,6 +300,14 @@ public class ClusterModule extends AbstractModule { PersistentTasksCustomMetadata::fromXContent ) ); + // Cluster scoped persistent tasks + entries.add( + new NamedXContentRegistry.Entry( + Metadata.ClusterCustom.class, + new ParseField(ClusterPersistentTasksCustomMetadata.TYPE), + ClusterPersistentTasksCustomMetadata::fromXContent + ) + ); entries.add( new NamedXContentRegistry.Entry( Metadata.ProjectCustom.class, diff --git a/server/src/main/java/org/elasticsearch/persistent/ClusterPersistentTasksCustomMetadata.java b/server/src/main/java/org/elasticsearch/persistent/ClusterPersistentTasksCustomMetadata.java new file mode 100644 index 000000000000..e343bcf27a3e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/persistent/ClusterPersistentTasksCustomMetadata.java @@ -0,0 +1,174 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.persistent; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.TransportVersions; +import org.elasticsearch.cluster.AbstractNamedDiffable; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.NamedDiff; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.xcontent.ObjectParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Collections; +import java.util.EnumSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; + +import static org.elasticsearch.cluster.metadata.Metadata.ALL_CONTEXTS; +import static org.elasticsearch.persistent.PersistentTasks.Parsers.PERSISTENT_TASK_PARSER; + +/** + * A cluster state record that contains a list of all running persistent tasks for the cluster itself + */ +public final class ClusterPersistentTasksCustomMetadata extends AbstractNamedDiffable + implements + Metadata.ClusterCustom, + PersistentTasks { + + public static final String TYPE = "cluster_persistent_tasks"; + + static final ObjectParser PERSISTENT_TASKS_PARSER = new ObjectParser<>(TYPE, Builder::new); + + static { + // Tasks parser initialization + PERSISTENT_TASKS_PARSER.declareLong(Builder::setLastAllocationId, new ParseField("last_allocation_id")); + PERSISTENT_TASKS_PARSER.declareObjectArray(Builder::setTasks, PERSISTENT_TASK_PARSER, new ParseField("tasks")); + } + + @Deprecated(forRemoval = true) + public static ClusterPersistentTasksCustomMetadata getPersistentTasksCustomMetadata(ClusterState clusterState) { + return get(clusterState.metadata()); + } + + public static ClusterPersistentTasksCustomMetadata get(Metadata metadata) { + return metadata.custom(TYPE); + } + + // TODO: Implement custom Diff for tasks + private final Map> tasks; + private final long lastAllocationId; + + public ClusterPersistentTasksCustomMetadata(long lastAllocationId, Map> tasks) { + this.lastAllocationId = lastAllocationId; + this.tasks = tasks; + } + + public ClusterPersistentTasksCustomMetadata(StreamInput in) throws IOException { + lastAllocationId = in.readLong(); + tasks = in.readMap(PersistentTasksCustomMetadata.PersistentTask::new); + } + + public static ClusterPersistentTasksCustomMetadata fromXContent(XContentParser parser) { + return PERSISTENT_TASKS_PARSER.apply(parser, null).build(); + } + + @SuppressWarnings("unchecked") + public static PersistentTasksCustomMetadata.PersistentTask getTaskWithId( + ClusterState clusterState, + String taskId + ) { + ClusterPersistentTasksCustomMetadata tasks = get(clusterState.metadata()); + if (tasks != null) { + return (PersistentTasksCustomMetadata.PersistentTask) tasks.getTask(taskId); + } + return null; + } + + @Override + public long getLastAllocationId() { + return lastAllocationId; + } + + @Override + public Map> taskMap() { + return this.tasks; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ClusterPersistentTasksCustomMetadata that = (ClusterPersistentTasksCustomMetadata) o; + return lastAllocationId == that.lastAllocationId && Objects.equals(tasks, that.tasks); + } + + @Override + public int hashCode() { + return Objects.hash(tasks, lastAllocationId); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + @Override + public TransportVersion getMinimalSupportedVersion() { + return TransportVersions.MINIMUM_COMPATIBLE; + } + + @Override + public EnumSet context() { + return ALL_CONTEXTS; + } + + @Override + public String getWriteableName() { + return TYPE; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + doWriteTo(out); + } + + public static NamedDiff readDiffFrom(StreamInput in) throws IOException { + return readDiffFrom(Metadata.ClusterCustom.class, TYPE, in); + } + + @Override + public Iterator toXContentChunked(ToXContent.Params ignored) { + return doToXContentChunked(); + } + + public static Builder builder() { + return new Builder(); + } + + public static Builder builder(ClusterPersistentTasksCustomMetadata tasks) { + return new Builder(tasks); + } + + public static class Builder extends PersistentTasks.Builder { + + protected Builder() { + super(); + } + + protected Builder(PersistentTasks tasksInProgress) { + super(tasksInProgress); + } + + @Override + public ClusterPersistentTasksCustomMetadata build() { + return new ClusterPersistentTasksCustomMetadata(getLastAllocationId(), Collections.unmodifiableMap(getCurrentTasks())); + } + } + +} diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasks.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasks.java new file mode 100644 index 000000000000..0eaed2db9fec --- /dev/null +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasks.java @@ -0,0 +1,375 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.persistent; + +import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.common.collect.Iterators; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.VersionedNamedWriteable; +import org.elasticsearch.common.xcontent.ChunkedToXContentHelper; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Assignment; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask; +import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ObjectParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg; + +/** + * The common data structure for persistent tasks, both cluster-scoped and project-scoped. + * It is meant to be used in places where both type of tasks are processed similarly to achieve better code reuse. + */ +public interface PersistentTasks { + + String API_CONTEXT = Metadata.XContentContext.API.toString(); + Assignment LOST_NODE_ASSIGNMENT = new Assignment(null, "awaiting reassignment after node loss"); + Assignment INITIAL_ASSIGNMENT = new Assignment(null, "waiting for initial assignment"); + + /** + * @return The last allocation id for the tasks. + */ + long getLastAllocationId(); + + /** + * @return The map of actual tasks keyed by task ID. + */ + Map> taskMap(); + + /** + * @return A collection of all tasks + */ + default Collection> tasks() { + return taskMap().values(); + } + + /** + * @param id The task ID + * @return The task with the specified task ID + */ + default PersistentTask getTask(String id) { + return taskMap().get(id); + } + + /** + * @param taskName The task name, see also {@link PersistentTasksExecutor#getTaskName()} + * @param predicate The filter for the matching tasks + * @return A collection of tasks matching the specified taskName and predicate + */ + default Collection> findTasks(String taskName, Predicate> predicate) { + return tasks().stream().filter(p -> taskName.equals(p.getTaskName())).filter(predicate).toList(); + } + + /** + * @param nodeId The node ID where the task runs + * @param taskName The task name, see also {@link PersistentTasksExecutor#getTaskName()} + * @return The number of tasks of the taskName currently running on the specified nodeId + */ + default long getNumberOfTasksOnNode(String nodeId, String taskName) { + return tasks().stream() + .filter(task -> taskName.equals(task.getTaskName()) && nodeId.equals(task.getAssignment().getExecutorNode())) + .count(); + } + + default void doWriteTo(StreamOutput out) throws IOException { + out.writeLong(getLastAllocationId()); + Map> filteredTasks = tasks().stream() + .filter(t -> VersionedNamedWriteable.shouldSerialize(out, t.getParams())) + .collect(Collectors.toMap(PersistentTask::getId, Function.identity())); + out.writeMap(filteredTasks, StreamOutput::writeWriteable); + } + + default Iterator doToXContentChunked() { + return Iterators.concat( + Iterators.single((builder, params) -> builder.field("last_allocation_id", getLastAllocationId())), + ChunkedToXContentHelper.array("tasks", tasks().iterator()) + ); + } + + class Parsers { + public static final ConstructingObjectParser ASSIGNMENT_PARSER = new ConstructingObjectParser<>( + "assignment", + objects -> new Assignment((String) objects[0], (String) objects[1]) + ); + static final ObjectParser, Void> PERSISTENT_TASK_PARSER = new ObjectParser<>( + "tasks", + TaskBuilder::new + ); + private static final ObjectParser.NamedObjectParser, Void> TASK_DESCRIPTION_PARSER; + + static { + // Task description parser initialization + ObjectParser, String> parser = new ObjectParser<>("named"); + parser.declareObject( + TaskDescriptionBuilder::setParams, + (p, c) -> p.namedObject(PersistentTaskParams.class, c, null), + new ParseField("params") + ); + parser.declareObject( + TaskDescriptionBuilder::setState, + (p, c) -> p.namedObject(PersistentTaskState.class, c, null), + new ParseField("state", "status") + ); + TASK_DESCRIPTION_PARSER = (XContentParser p, Void c, String name) -> parser.parse(p, new TaskDescriptionBuilder<>(name), name); + + // Assignment parser + ASSIGNMENT_PARSER.declareStringOrNull(constructorArg(), new ParseField("executor_node")); + ASSIGNMENT_PARSER.declareStringOrNull(constructorArg(), new ParseField("explanation")); + + // Task parser initialization + PERSISTENT_TASK_PARSER.declareString(TaskBuilder::setId, new ParseField("id")); + PERSISTENT_TASK_PARSER.declareString(TaskBuilder::setTaskName, new ParseField("name")); + PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setAllocationId, new ParseField("allocation_id")); + + PERSISTENT_TASK_PARSER.declareNamedObjects( + (TaskBuilder taskBuilder, List> objects) -> { + if (objects.size() != 1) { + throw new IllegalArgumentException("only one task description per task is allowed"); + } + TaskDescriptionBuilder builder = objects.get(0); + taskBuilder.setTaskName(builder.taskName); + taskBuilder.setParams(builder.params); + taskBuilder.setState(builder.state); + }, + TASK_DESCRIPTION_PARSER, + new ParseField("task") + ); + PERSISTENT_TASK_PARSER.declareObject(TaskBuilder::setAssignment, ASSIGNMENT_PARSER, new ParseField("assignment")); + PERSISTENT_TASK_PARSER.declareLong( + TaskBuilder::setAllocationIdOnLastStatusUpdate, + new ParseField("allocation_id_on_last_status_update") + ); + } + } + + /** + * Private builder used in XContent parser to build task-specific portion (params and state) + */ + class TaskDescriptionBuilder { + + private final String taskName; + private Params params; + private PersistentTaskState state; + + private TaskDescriptionBuilder(String taskName) { + this.taskName = taskName; + } + + private TaskDescriptionBuilder setParams(Params params) { + this.params = params; + return this; + } + + private TaskDescriptionBuilder setState(PersistentTaskState state) { + this.state = state; + return this; + } + } + + class TaskBuilder { + private String id; + private long allocationId; + private String taskName; + private Params params; + private PersistentTaskState state; + private Assignment assignment = INITIAL_ASSIGNMENT; + private Long allocationIdOnLastStatusUpdate; + + public TaskBuilder setId(String id) { + this.id = id; + return this; + } + + public TaskBuilder setAllocationId(long allocationId) { + this.allocationId = allocationId; + return this; + } + + public TaskBuilder setTaskName(String taskName) { + this.taskName = taskName; + return this; + } + + public TaskBuilder setParams(Params params) { + this.params = params; + return this; + } + + public TaskBuilder setState(PersistentTaskState state) { + this.state = state; + return this; + } + + public TaskBuilder setAssignment(Assignment assignment) { + this.assignment = assignment; + return this; + } + + public TaskBuilder setAllocationIdOnLastStatusUpdate(Long allocationIdOnLastStatusUpdate) { + this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate; + return this; + } + + public PersistentTask build() { + return new PersistentTask<>(id, allocationId, taskName, params, state, assignment, allocationIdOnLastStatusUpdate); + } + } + + abstract class Builder> { + private final Map> tasks = new HashMap<>(); + private long lastAllocationId; + private boolean changed; + + protected Builder() {} + + protected Builder(PersistentTasks tasksInProgress) { + if (tasksInProgress != null) { + tasks.putAll(tasksInProgress.taskMap()); + lastAllocationId = tasksInProgress.getLastAllocationId(); + } else { + lastAllocationId = 0; + } + } + + public long getLastAllocationId() { + return lastAllocationId; + } + + @SuppressWarnings("unchecked") + protected T setLastAllocationId(long currentId) { + this.lastAllocationId = currentId; + return (T) this; + } + + @SuppressWarnings("unchecked") + protected T setTasks(List> tasks) { + for (TaskBuilder builder : tasks) { + PersistentTask task = builder.build(); + this.tasks.put(task.getId(), task); + } + return (T) this; + } + + private long getNextAllocationId() { + lastAllocationId++; + return lastAllocationId; + } + + /** + * Adds a new task to the builder + *

+ * After the task is added its id can be found by calling {{@link #getLastAllocationId()}} method. + */ + @SuppressWarnings("unchecked") + public T addTask(String taskId, String taskName, Params params, Assignment assignment) { + changed = true; + PersistentTask previousTask = tasks.put( + taskId, + new PersistentTask<>(taskId, taskName, params, getNextAllocationId(), assignment) + ); + if (previousTask != null) { + throw new ResourceAlreadyExistsException("Trying to override task with id {" + taskId + "}"); + } + return (T) this; + } + + /** + * Reassigns the task to another node + */ + @SuppressWarnings("unchecked") + public T reassignTask(String taskId, Assignment assignment) { + PersistentTask taskInProgress = tasks.get(taskId); + if (taskInProgress != null) { + changed = true; + tasks.put(taskId, new PersistentTask<>(taskInProgress, getNextAllocationId(), assignment)); + } else { + throw new ResourceNotFoundException("cannot reassign task with id {" + taskId + "}, the task no longer exists"); + } + return (T) this; + } + + /** + * Updates the task state + */ + @SuppressWarnings("unchecked") + public T updateTaskState(final String taskId, final PersistentTaskState taskState) { + PersistentTask taskInProgress = tasks.get(taskId); + if (taskInProgress != null) { + changed = true; + tasks.put(taskId, new PersistentTask<>(taskInProgress, taskState)); + } else { + throw new ResourceNotFoundException("cannot update task with id {" + taskId + "}, the task no longer exists"); + } + return (T) this; + } + + /** + * Removes the task + */ + @SuppressWarnings("unchecked") + public T removeTask(String taskId) { + if (tasks.remove(taskId) != null) { + changed = true; + } else { + throw new ResourceNotFoundException("cannot remove task with id {" + taskId + "}, the task no longer exists"); + } + return (T) this; + } + + /** + * Checks if the task is currently present in the list + */ + public boolean hasTask(String taskId) { + return tasks.containsKey(taskId); + } + + /** + * Checks if the task is currently present in the list and has the right allocation id + */ + public boolean hasTask(String taskId, long allocationId) { + PersistentTask taskInProgress = tasks.get(taskId); + if (taskInProgress != null) { + return taskInProgress.getAllocationId() == allocationId; + } + return false; + } + + Map> getCurrentTasks() { + return tasks; + } + + Set getCurrentTaskIds() { + return tasks.keySet(); + } + + /** + * Returns true if any the task list was changed since the builder was created + */ + public boolean isChanged() { + return changed; + } + + public abstract PersistentTasks build(); + } +} diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetadata.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetadata.java index 93c03c940d4d..8a0677a4fc42 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetadata.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetadata.java @@ -8,8 +8,6 @@ */ package org.elasticsearch.persistent; -import org.elasticsearch.ResourceAlreadyExistsException; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; import org.elasticsearch.cluster.AbstractNamedDiffable; @@ -19,16 +17,12 @@ import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.VersionedNamedWriteable; import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.xcontent.ChunkedToXContentHelper; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.Nullable; -import org.elasticsearch.xcontent.ConstructingObjectParser; import org.elasticsearch.xcontent.ObjectParser; -import org.elasticsearch.xcontent.ObjectParser.NamedObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.ToXContentObject; @@ -36,30 +30,25 @@ import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentParser; import java.io.IOException; -import java.util.Collection; import java.util.Collections; import java.util.EnumSet; -import java.util.HashMap; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Set; -import java.util.function.Function; -import java.util.function.Predicate; -import java.util.stream.Collectors; import static org.elasticsearch.cluster.metadata.Metadata.ALL_CONTEXTS; -import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.persistent.PersistentTasks.Parsers.PERSISTENT_TASK_PARSER; /** - * A cluster state record that contains a list of all running persistent tasks + * A cluster state record that contains a list of all running persistent tasks from a project */ -public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable implements Metadata.ProjectCustom { +@FixForMultiProject(description = "Consider renaming it to ProjectPersistentTasksCustomMetadata") +public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable + implements + Metadata.ProjectCustom, + PersistentTasks { public static final String TYPE = "persistent_tasks"; - private static final String API_CONTEXT = Metadata.XContentContext.API.toString(); - static final Assignment LOST_NODE_ASSIGNMENT = new Assignment(null, "awaiting reassignment after node loss"); // TODO: Implement custom Diff for tasks private final Map> tasks; @@ -72,64 +61,10 @@ public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable PERSISTENT_TASKS_PARSER = new ObjectParser<>(TYPE, Builder::new); - private static final ObjectParser, Void> PERSISTENT_TASK_PARSER = new ObjectParser<>( - "tasks", - TaskBuilder::new - ); - - public static final ConstructingObjectParser ASSIGNMENT_PARSER = new ConstructingObjectParser<>( - "assignment", - objects -> new Assignment((String) objects[0], (String) objects[1]) - ); - - private static final NamedObjectParser, Void> TASK_DESCRIPTION_PARSER; - static { // Tasks parser initialization PERSISTENT_TASKS_PARSER.declareLong(Builder::setLastAllocationId, new ParseField("last_allocation_id")); PERSISTENT_TASKS_PARSER.declareObjectArray(Builder::setTasks, PERSISTENT_TASK_PARSER, new ParseField("tasks")); - - // Task description parser initialization - ObjectParser, String> parser = new ObjectParser<>("named"); - parser.declareObject( - TaskDescriptionBuilder::setParams, - (p, c) -> p.namedObject(PersistentTaskParams.class, c, null), - new ParseField("params") - ); - parser.declareObject( - TaskDescriptionBuilder::setState, - (p, c) -> p.namedObject(PersistentTaskState.class, c, null), - new ParseField("state", "status") - ); - TASK_DESCRIPTION_PARSER = (XContentParser p, Void c, String name) -> parser.parse(p, new TaskDescriptionBuilder<>(name), name); - - // Assignment parser - ASSIGNMENT_PARSER.declareStringOrNull(constructorArg(), new ParseField("executor_node")); - ASSIGNMENT_PARSER.declareStringOrNull(constructorArg(), new ParseField("explanation")); - - // Task parser initialization - PERSISTENT_TASK_PARSER.declareString(TaskBuilder::setId, new ParseField("id")); - PERSISTENT_TASK_PARSER.declareString(TaskBuilder::setTaskName, new ParseField("name")); - PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setAllocationId, new ParseField("allocation_id")); - - PERSISTENT_TASK_PARSER.declareNamedObjects( - (TaskBuilder taskBuilder, List> objects) -> { - if (objects.size() != 1) { - throw new IllegalArgumentException("only one task description per task is allowed"); - } - TaskDescriptionBuilder builder = objects.get(0); - taskBuilder.setTaskName(builder.taskName); - taskBuilder.setParams(builder.params); - taskBuilder.setState(builder.state); - }, - TASK_DESCRIPTION_PARSER, - new ParseField("task") - ); - PERSISTENT_TASK_PARSER.declareObject(TaskBuilder::setAssignment, ASSIGNMENT_PARSER, new ParseField("assignment")); - PERSISTENT_TASK_PARSER.declareLong( - TaskBuilder::setAllocationIdOnLastStatusUpdate, - new ParseField("allocation_id_on_last_status_update") - ); } @Deprecated(forRemoval = true) @@ -138,49 +73,19 @@ public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable { - - private final String taskName; - private Params params; - private PersistentTaskState state; - - private TaskDescriptionBuilder(String taskName) { - this.taskName = taskName; - } - - private TaskDescriptionBuilder setParams(Params params) { - this.params = params; - return this; - } - - private TaskDescriptionBuilder setState(PersistentTaskState state) { - this.state = state; - return this; - } - } - - public Collection> tasks() { - return this.tasks.values(); + @Override + public long getLastAllocationId() { + return lastAllocationId; } + @Override public Map> taskMap() { return this.tasks; } - public PersistentTask getTask(String id) { - return this.tasks.get(id); - } - - public Collection> findTasks(String taskName, Predicate> predicate) { - return this.tasks().stream().filter(p -> taskName.equals(p.getTaskName())).filter(predicate).toList(); - } - @Override public boolean equals(Object o) { if (this == o) return true; @@ -199,13 +104,6 @@ public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable taskName.equals(task.taskName) && nodeId.equals(task.assignment.executorNode)) - .count(); - } - @Override public TransportVersion getMinimalSupportedVersion() { return TransportVersions.MINIMUM_COMPATIBLE; @@ -222,7 +120,7 @@ public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable PersistentTask getTaskWithId(ClusterState clusterState, String taskId) { - PersistentTasksCustomMetadata tasks = clusterState.metadata().getProject().custom(PersistentTasksCustomMetadata.TYPE); + PersistentTasksCustomMetadata tasks = get(clusterState.metadata().getProject()); if (tasks != null) { return (PersistentTask) tasks.getTask(taskId); } @@ -233,7 +131,7 @@ public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable task : tasks.tasks()) { if (task.getAssignment().getExecutorNode() != null && projectState.cluster().nodes().nodeExists(task.getAssignment().getExecutorNode()) == false) { @@ -282,6 +180,7 @@ public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable implements Writeable, ToXContentObject { private final String id; @@ -352,7 +250,7 @@ public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable { - private String id; - private long allocationId; - private String taskName; - private Params params; - private PersistentTaskState state; - private Assignment assignment = INITIAL_ASSIGNMENT; - private Long allocationIdOnLastStatusUpdate; - - public TaskBuilder setId(String id) { - this.id = id; - return this; - } - - public TaskBuilder setAllocationId(long allocationId) { - this.allocationId = allocationId; - return this; - } - - public TaskBuilder setTaskName(String taskName) { - this.taskName = taskName; - return this; - } - - public TaskBuilder setParams(Params params) { - this.params = params; - return this; - } - - public TaskBuilder setState(PersistentTaskState state) { - this.state = state; - return this; - } - - public TaskBuilder setAssignment(Assignment assignment) { - this.assignment = assignment; - return this; - } - - public TaskBuilder setAllocationIdOnLastStatusUpdate(Long allocationIdOnLastStatusUpdate) { - this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate; - return this; - } - - public PersistentTask build() { - return new PersistentTask<>(id, allocationId, taskName, params, state, assignment, allocationIdOnLastStatusUpdate); - } - } - @Override public String getWriteableName() { return TYPE; @@ -566,12 +415,7 @@ public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable> filteredTasks = tasks.values() - .stream() - .filter(t -> VersionedNamedWriteable.shouldSerialize(out, t.getParams())) - .collect(Collectors.toMap(PersistentTask::getId, Function.identity())); - out.writeMap(filteredTasks, StreamOutput::writeWriteable); + doWriteTo(out); } public static NamedDiff readDiffFrom(StreamInput in) throws IOException { @@ -580,10 +424,7 @@ public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable toXContentChunked(ToXContent.Params ignored) { - return Iterators.concat( - Iterators.single((builder, params) -> builder.field("last_allocation_id", lastAllocationId)), - ChunkedToXContentHelper.array("tasks", tasks.values().iterator()) - ); + return doToXContentChunked(); } public static Builder builder() { @@ -594,132 +435,19 @@ public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable> tasks = new HashMap<>(); - private long lastAllocationId; - private boolean changed; + public static class Builder extends PersistentTasks.Builder { - private Builder() {} - - private Builder(PersistentTasksCustomMetadata tasksInProgress) { - if (tasksInProgress != null) { - tasks.putAll(tasksInProgress.tasks); - lastAllocationId = tasksInProgress.lastAllocationId; - } else { - lastAllocationId = 0; - } + protected Builder() { + super(); } - public long getLastAllocationId() { - return lastAllocationId; - } - - private Builder setLastAllocationId(long currentId) { - this.lastAllocationId = currentId; - return this; - } - - private Builder setTasks(List> tasks) { - for (TaskBuilder builder : tasks) { - PersistentTask task = builder.build(); - this.tasks.put(task.getId(), task); - } - return this; - } - - private long getNextAllocationId() { - lastAllocationId++; - return lastAllocationId; - } - - /** - * Adds a new task to the builder - *

- * After the task is added its id can be found by calling {{@link #getLastAllocationId()}} method. - */ - public Builder addTask(String taskId, String taskName, Params params, Assignment assignment) { - changed = true; - PersistentTask previousTask = tasks.put( - taskId, - new PersistentTask<>(taskId, taskName, params, getNextAllocationId(), assignment) - ); - if (previousTask != null) { - throw new ResourceAlreadyExistsException("Trying to override task with id {" + taskId + "}"); - } - return this; - } - - /** - * Reassigns the task to another node - */ - public Builder reassignTask(String taskId, Assignment assignment) { - PersistentTask taskInProgress = tasks.get(taskId); - if (taskInProgress != null) { - changed = true; - tasks.put(taskId, new PersistentTask<>(taskInProgress, getNextAllocationId(), assignment)); - } else { - throw new ResourceNotFoundException("cannot reassign task with id {" + taskId + "}, the task no longer exists"); - } - return this; - } - - /** - * Updates the task state - */ - public Builder updateTaskState(final String taskId, final PersistentTaskState taskState) { - PersistentTask taskInProgress = tasks.get(taskId); - if (taskInProgress != null) { - changed = true; - tasks.put(taskId, new PersistentTask<>(taskInProgress, taskState)); - } else { - throw new ResourceNotFoundException("cannot update task with id {" + taskId + "}, the task no longer exists"); - } - return this; - } - - /** - * Removes the task - */ - public Builder removeTask(String taskId) { - if (tasks.remove(taskId) != null) { - changed = true; - } else { - throw new ResourceNotFoundException("cannot remove task with id {" + taskId + "}, the task no longer exists"); - } - return this; - } - - /** - * Checks if the task is currently present in the list - */ - public boolean hasTask(String taskId) { - return tasks.containsKey(taskId); - } - - /** - * Checks if the task is currently present in the list and has the right allocation id - */ - public boolean hasTask(String taskId, long allocationId) { - PersistentTask taskInProgress = tasks.get(taskId); - if (taskInProgress != null) { - return taskInProgress.getAllocationId() == allocationId; - } - return false; - } - - Set getCurrentTaskIds() { - return tasks.keySet(); - } - - /** - * Returns true if any the task list was changed since the builder was created - */ - public boolean isChanged() { - return changed; + protected Builder(PersistentTasks tasksInProgress) { + super(tasksInProgress); } + @Override public PersistentTasksCustomMetadata build() { - return new PersistentTasksCustomMetadata(lastAllocationId, Collections.unmodifiableMap(tasks)); + return new PersistentTasksCustomMetadata(getLastAllocationId(), Collections.unmodifiableMap(getCurrentTasks())); } } } diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java index 00d8847c346e..5c2ff6bf09a4 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java @@ -39,6 +39,21 @@ public abstract class PersistentTasksExecutor CLUSTER_SCOPED_TASKS = ConcurrentCollections.newConcurrentSet(); + private final Map> taskExecutors; public PersistentTasksExecutorRegistry(Collection> taskExecutors) { @@ -36,6 +40,9 @@ public class PersistentTasksExecutorRegistry { assert false : message; throw new IllegalStateException(message); } + if (executor.scope() == PersistentTasksExecutor.Scope.CLUSTER) { + CLUSTER_SCOPED_TASKS.add(executor.getTaskName()); + } } this.taskExecutors = Collections.unmodifiableMap(map); } @@ -48,4 +55,8 @@ public class PersistentTasksExecutorRegistry { } return executor; } + + public static boolean isClusterScopedTask(String taskName) { + return CLUSTER_SCOPED_TASKS.contains(taskName); + } } diff --git a/server/src/test/java/org/elasticsearch/persistent/BasePersistentTasksCustomMetadataTests.java b/server/src/test/java/org/elasticsearch/persistent/BasePersistentTasksCustomMetadataTests.java new file mode 100644 index 000000000000..4818716b4062 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/persistent/BasePersistentTasksCustomMetadataTests.java @@ -0,0 +1,274 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.persistent; + +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.TransportVersion; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.test.ChunkedToXContentDiffableSerializationTestCase; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentFactory; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentType; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Optional; + +import static org.elasticsearch.cluster.metadata.Metadata.CONTEXT_MODE_GATEWAY; +import static org.elasticsearch.cluster.metadata.Metadata.CONTEXT_MODE_SNAPSHOT; +import static org.elasticsearch.persistent.PersistentTasksExecutor.NO_NODE_FOUND; +import static org.elasticsearch.test.TransportVersionUtils.getFirstVersion; +import static org.elasticsearch.test.TransportVersionUtils.getNextVersion; +import static org.elasticsearch.test.TransportVersionUtils.getPreviousVersion; +import static org.elasticsearch.test.TransportVersionUtils.randomVersionBetween; +import static org.hamcrest.Matchers.contains; + +public abstract class BasePersistentTasksCustomMetadataTests> extends + ChunkedToXContentDiffableSerializationTestCase { + + @SuppressWarnings("unchecked") + @Override + protected T createTestInstance() { + int numberOfTasks = randomInt(10); + PersistentTasks.Builder tasks = builder(); + for (int i = 0; i < numberOfTasks; i++) { + String taskId = UUIDs.base64UUID(); + tasks.addTask( + taskId, + TestPersistentTasksPlugin.TestPersistentTasksExecutor.NAME, + new TestPersistentTasksPlugin.TestParams(randomAlphaOfLength(10)), + randomAssignment() + ); + if (randomBoolean()) { + // From time to time update status + tasks.updateTaskState(taskId, new TestPersistentTasksPlugin.State(randomAlphaOfLength(10))); + } + } + return (T) tasks.build(); + } + + @Override + protected T mutateInstance(T instance) throws IOException { + return null;// TODO implement https://github.com/elastic/elasticsearch/issues/25929 + } + + @SuppressWarnings("unchecked") + @Override + protected T makeTestChanges(T testInstance) { + final PersistentTasks.Builder builder = builder(testInstance); + switch (randomInt(3)) { + case 0: + addRandomTask(builder); + break; + case 1: + if (builder.getCurrentTaskIds().isEmpty()) { + addRandomTask(builder); + } else { + builder.reassignTask(pickRandomTask(builder), randomAssignment()); + } + break; + case 2: + if (builder.getCurrentTaskIds().isEmpty()) { + addRandomTask(builder); + } else { + builder.updateTaskState( + pickRandomTask(builder), + randomBoolean() ? new TestPersistentTasksPlugin.State(randomAlphaOfLength(10)) : null + ); + } + break; + case 3: + if (builder.getCurrentTaskIds().isEmpty()) { + addRandomTask(builder); + } else { + builder.removeTask(pickRandomTask(builder)); + } + break; + } + return (T) builder.build(); + } + + @SuppressWarnings("unchecked") + public void testSerializationContext() throws Exception { + T testInstance = createTestInstance(); + for (int i = 0; i < randomInt(10); i++) { + testInstance = makeTestChanges(testInstance); + } + + ToXContent.MapParams params = new ToXContent.MapParams( + Collections.singletonMap(Metadata.CONTEXT_MODE_PARAM, randomFrom(CONTEXT_MODE_SNAPSHOT, CONTEXT_MODE_GATEWAY)) + ); + + XContentType xContentType = randomFrom(XContentType.values()); + BytesReference shuffled = toShuffledXContent(asXContent(testInstance), xContentType, params, false); + + T newInstance; + try (XContentParser parser = createParser(XContentFactory.xContent(xContentType), shuffled)) { + newInstance = doParseInstance(parser); + } + assertNotSame(newInstance, testInstance); + + assertEquals(asPersistentTasks(testInstance).tasks().size(), asPersistentTasks(newInstance).tasks().size()); + for (PersistentTasksCustomMetadata.PersistentTask testTask : asPersistentTasks(testInstance).tasks()) { + PersistentTasksCustomMetadata.PersistentTask newTask = + (PersistentTasksCustomMetadata.PersistentTask) asPersistentTasks(newInstance).getTask( + testTask.getId() + ); + assertNotNull(newTask); + + // Things that should be serialized + assertEquals(testTask.getTaskName(), newTask.getTaskName()); + assertEquals(testTask.getId(), newTask.getId()); + assertEquals(testTask.getState(), newTask.getState()); + assertEquals(testTask.getParams(), newTask.getParams()); + + // Things that shouldn't be serialized + assertEquals(0, newTask.getAllocationId()); + assertNull(newTask.getExecutorNode()); + } + } + + @SuppressWarnings("unchecked") + public void testBuilder() { + T persistentTasks = null; + String lastKnownTask = ""; + for (int i = 0; i < randomIntBetween(10, 100); i++) { + final PersistentTasks.Builder builder; + if (randomBoolean()) { + builder = builder(); + } else { + builder = builder(persistentTasks); + } + boolean changed = false; + for (int j = 0; j < randomIntBetween(1, 10); j++) { + switch (randomInt(3)) { + case 0: + lastKnownTask = addRandomTask(builder); + changed = true; + break; + case 1: + if (builder.hasTask(lastKnownTask)) { + changed = true; + builder.reassignTask(lastKnownTask, randomAssignment()); + } else { + String fLastKnownTask = lastKnownTask; + expectThrows(ResourceNotFoundException.class, () -> builder.reassignTask(fLastKnownTask, randomAssignment())); + } + break; + case 2: + if (builder.hasTask(lastKnownTask)) { + changed = true; + builder.updateTaskState( + lastKnownTask, + randomBoolean() ? new TestPersistentTasksPlugin.State(randomAlphaOfLength(10)) : null + ); + } else { + String fLastKnownTask = lastKnownTask; + expectThrows(ResourceNotFoundException.class, () -> builder.updateTaskState(fLastKnownTask, null)); + } + break; + case 3: + if (builder.hasTask(lastKnownTask)) { + changed = true; + builder.removeTask(lastKnownTask); + } else { + String fLastKnownTask = lastKnownTask; + expectThrows(ResourceNotFoundException.class, () -> builder.removeTask(fLastKnownTask)); + } + break; + } + } + assertEquals(changed, builder.isChanged()); + persistentTasks = (T) builder.build(); + } + } + + @SuppressWarnings("unchecked") + public void testMinVersionSerialization() throws IOException { + PersistentTasks.Builder tasks = builder(); + + TransportVersion minVersion = getFirstVersion(); + TransportVersion streamVersion = randomVersionBetween(random(), minVersion, getPreviousVersion(TransportVersion.current())); + tasks.addTask( + "test_compatible_version", + TestPersistentTasksPlugin.TestPersistentTasksExecutor.NAME, + new TestPersistentTasksPlugin.TestParams( + null, + randomVersionBetween(random(), minVersion, streamVersion), + randomBoolean() ? Optional.empty() : Optional.of("test") + ), + randomAssignment() + ); + tasks.addTask( + "test_incompatible_version", + TestPersistentTasksPlugin.TestPersistentTasksExecutor.NAME, + new TestPersistentTasksPlugin.TestParams( + null, + randomVersionBetween(random(), getNextVersion(streamVersion), TransportVersion.current()), + randomBoolean() ? Optional.empty() : Optional.of("test") + ), + randomAssignment() + ); + final BytesStreamOutput out = new BytesStreamOutput(); + + out.setTransportVersion(streamVersion); + ((T) tasks.build()).writeTo(out); + + final StreamInput input = out.bytes().streamInput(); + input.setTransportVersion(streamVersion); + PersistentTasksCustomMetadata read = new PersistentTasksCustomMetadata( + new NamedWriteableAwareStreamInput(input, getNamedWriteableRegistry()) + ); + + assertThat(read.taskMap().keySet(), contains("test_compatible_version")); + } + + private PersistentTasks asPersistentTasks(T testInstance) { + return (PersistentTasks) testInstance; + } + + protected String addRandomTask(PersistentTasks.Builder builder) { + String taskId = UUIDs.base64UUID(); + builder.addTask( + taskId, + TestPersistentTasksPlugin.TestPersistentTasksExecutor.NAME, + new TestPersistentTasksPlugin.TestParams(randomAlphaOfLength(10)), + randomAssignment() + ); + return taskId; + } + + protected String pickRandomTask(PersistentTasks.Builder testInstance) { + return randomFrom(new ArrayList<>(testInstance.getCurrentTaskIds())); + } + + protected PersistentTasksCustomMetadata.Assignment randomAssignment() { + if (randomBoolean()) { + if (randomBoolean()) { + return NO_NODE_FOUND; + } else { + return new PersistentTasksCustomMetadata.Assignment(null, randomAlphaOfLength(10)); + } + } + return new PersistentTasksCustomMetadata.Assignment(randomAlphaOfLength(10), randomAlphaOfLength(10)); + } + + protected abstract PersistentTasks.Builder builder(); + + protected abstract PersistentTasks.Builder builder(T testInstance); + +} diff --git a/server/src/test/java/org/elasticsearch/persistent/ClusterPersistentTasksCustomMetadataTests.java b/server/src/test/java/org/elasticsearch/persistent/ClusterPersistentTasksCustomMetadataTests.java new file mode 100644 index 000000000000..92364b4e38a3 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/persistent/ClusterPersistentTasksCustomMetadataTests.java @@ -0,0 +1,82 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ +package org.elasticsearch.persistent; + +import org.elasticsearch.cluster.Diff; +import org.elasticsearch.cluster.NamedDiff; +import org.elasticsearch.cluster.metadata.Metadata.ClusterCustom; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.persistent.TestPersistentTasksPlugin.State; +import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams; +import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; +import org.elasticsearch.xcontent.NamedXContentRegistry; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.XContentParser; + +import java.util.Arrays; + +public class ClusterPersistentTasksCustomMetadataTests extends BasePersistentTasksCustomMetadataTests { + + @Override + protected Writeable.Reader instanceReader() { + return ClusterPersistentTasksCustomMetadata::new; + } + + @Override + protected NamedWriteableRegistry getNamedWriteableRegistry() { + return new NamedWriteableRegistry( + Arrays.asList( + new Entry(ClusterCustom.class, ClusterPersistentTasksCustomMetadata.TYPE, ClusterPersistentTasksCustomMetadata::new), + new Entry(NamedDiff.class, ClusterPersistentTasksCustomMetadata.TYPE, ClusterPersistentTasksCustomMetadata::readDiffFrom), + new Entry(PersistentTaskParams.class, TestPersistentTasksExecutor.NAME, TestParams::new), + new Entry(PersistentTaskState.class, TestPersistentTasksExecutor.NAME, State::new) + ) + ); + } + + @Override + protected Writeable.Reader> diffReader() { + return ClusterPersistentTasksCustomMetadata::readDiffFrom; + } + + @Override + protected ClusterPersistentTasksCustomMetadata doParseInstance(XContentParser parser) { + return ClusterPersistentTasksCustomMetadata.fromXContent(parser); + } + + @Override + protected NamedXContentRegistry xContentRegistry() { + return new NamedXContentRegistry( + Arrays.asList( + new NamedXContentRegistry.Entry( + PersistentTaskParams.class, + new ParseField(TestPersistentTasksExecutor.NAME), + TestParams::fromXContent + ), + new NamedXContentRegistry.Entry( + PersistentTaskState.class, + new ParseField(TestPersistentTasksExecutor.NAME), + State::fromXContent + ) + ) + ); + } + + @Override + protected PersistentTasks.Builder builder() { + return ClusterPersistentTasksCustomMetadata.builder(); + } + + @Override + protected PersistentTasks.Builder builder(ClusterCustom testInstance) { + return ClusterPersistentTasksCustomMetadata.builder((ClusterPersistentTasksCustomMetadata) testInstance); + } +} diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetadataTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetadataTests.java index 7c9bb119b4ee..4b86ace2f6a1 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetadataTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetadataTests.java @@ -8,7 +8,6 @@ */ package org.elasticsearch.persistent; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.TransportVersion; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -20,68 +19,24 @@ import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.UUIDs; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Assignment; -import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Builder; -import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask; import org.elasticsearch.persistent.TestPersistentTasksPlugin.State; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; -import org.elasticsearch.test.ChunkedToXContentDiffableSerializationTestCase; import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xcontent.ParseField; -import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; -import org.elasticsearch.xcontent.XContentFactory; import org.elasticsearch.xcontent.XContentParser; -import org.elasticsearch.xcontent.XContentType; -import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; -import java.util.Optional; -import static org.elasticsearch.cluster.metadata.Metadata.CONTEXT_MODE_GATEWAY; -import static org.elasticsearch.cluster.metadata.Metadata.CONTEXT_MODE_SNAPSHOT; -import static org.elasticsearch.persistent.PersistentTasksExecutor.NO_NODE_FOUND; -import static org.elasticsearch.test.TransportVersionUtils.getFirstVersion; -import static org.elasticsearch.test.TransportVersionUtils.getNextVersion; -import static org.elasticsearch.test.TransportVersionUtils.getPreviousVersion; -import static org.elasticsearch.test.TransportVersionUtils.randomVersionBetween; -import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.sameInstance; -public class PersistentTasksCustomMetadataTests extends ChunkedToXContentDiffableSerializationTestCase { - - @Override - protected PersistentTasksCustomMetadata createTestInstance() { - int numberOfTasks = randomInt(10); - PersistentTasksCustomMetadata.Builder tasks = PersistentTasksCustomMetadata.builder(); - for (int i = 0; i < numberOfTasks; i++) { - String taskId = UUIDs.base64UUID(); - tasks.addTask(taskId, TestPersistentTasksExecutor.NAME, new TestParams(randomAlphaOfLength(10)), randomAssignment()); - if (randomBoolean()) { - // From time to time update status - tasks.updateTaskState(taskId, new State(randomAlphaOfLength(10))); - } - } - return tasks.build(); - } - - @Override - protected ProjectCustom mutateInstance(ProjectCustom instance) { - return null;// TODO implement https://github.com/elastic/elasticsearch/issues/25929 - } +public class PersistentTasksCustomMetadataTests extends BasePersistentTasksCustomMetadataTests { @Override protected Writeable.Reader instanceReader() { @@ -100,38 +55,6 @@ public class PersistentTasksCustomMetadataTests extends ChunkedToXContentDiffabl ); } - @Override - protected ProjectCustom makeTestChanges(ProjectCustom testInstance) { - Builder builder = PersistentTasksCustomMetadata.builder((PersistentTasksCustomMetadata) testInstance); - switch (randomInt(3)) { - case 0: - addRandomTask(builder); - break; - case 1: - if (builder.getCurrentTaskIds().isEmpty()) { - addRandomTask(builder); - } else { - builder.reassignTask(pickRandomTask(builder), randomAssignment()); - } - break; - case 2: - if (builder.getCurrentTaskIds().isEmpty()) { - addRandomTask(builder); - } else { - builder.updateTaskState(pickRandomTask(builder), randomBoolean() ? new State(randomAlphaOfLength(10)) : null); - } - break; - case 3: - if (builder.getCurrentTaskIds().isEmpty()) { - addRandomTask(builder); - } else { - builder.removeTask(pickRandomTask(builder)); - } - break; - } - return builder.build(); - } - @Override protected Writeable.Reader> diffReader() { return PersistentTasksCustomMetadata::readDiffFrom; @@ -142,16 +65,6 @@ public class PersistentTasksCustomMetadataTests extends ChunkedToXContentDiffabl return PersistentTasksCustomMetadata.fromXContent(parser); } - private String addRandomTask(Builder builder) { - String taskId = UUIDs.base64UUID(); - builder.addTask(taskId, TestPersistentTasksExecutor.NAME, new TestParams(randomAlphaOfLength(10)), randomAssignment()); - return taskId; - } - - private String pickRandomTask(PersistentTasksCustomMetadata.Builder testInstance) { - return randomFrom(new ArrayList<>(testInstance.getCurrentTaskIds())); - } - @Override protected NamedXContentRegistry xContentRegistry() { return new NamedXContentRegistry( @@ -170,131 +83,14 @@ public class PersistentTasksCustomMetadataTests extends ChunkedToXContentDiffabl ); } - @SuppressWarnings("unchecked") - public void testSerializationContext() throws Exception { - PersistentTasksCustomMetadata testInstance = createTestInstance(); - for (int i = 0; i < randomInt(10); i++) { - testInstance = (PersistentTasksCustomMetadata) makeTestChanges(testInstance); - } - - ToXContent.MapParams params = new ToXContent.MapParams( - Collections.singletonMap(Metadata.CONTEXT_MODE_PARAM, randomFrom(CONTEXT_MODE_SNAPSHOT, CONTEXT_MODE_GATEWAY)) - ); - - XContentType xContentType = randomFrom(XContentType.values()); - BytesReference shuffled = toShuffledXContent(asXContent(testInstance), xContentType, params, false); - - PersistentTasksCustomMetadata newInstance; - try (XContentParser parser = createParser(XContentFactory.xContent(xContentType), shuffled)) { - newInstance = doParseInstance(parser); - } - assertNotSame(newInstance, testInstance); - - assertEquals(testInstance.tasks().size(), newInstance.tasks().size()); - for (PersistentTask testTask : testInstance.tasks()) { - PersistentTask newTask = (PersistentTask) newInstance.getTask(testTask.getId()); - assertNotNull(newTask); - - // Things that should be serialized - assertEquals(testTask.getTaskName(), newTask.getTaskName()); - assertEquals(testTask.getId(), newTask.getId()); - assertEquals(testTask.getState(), newTask.getState()); - assertEquals(testTask.getParams(), newTask.getParams()); - - // Things that shouldn't be serialized - assertEquals(0, newTask.getAllocationId()); - assertNull(newTask.getExecutorNode()); - } + @Override + protected PersistentTasks.Builder builder() { + return PersistentTasksCustomMetadata.builder(); } - public void testBuilder() { - PersistentTasksCustomMetadata persistentTasks = null; - String lastKnownTask = ""; - for (int i = 0; i < randomIntBetween(10, 100); i++) { - final Builder builder; - if (randomBoolean()) { - builder = PersistentTasksCustomMetadata.builder(); - } else { - builder = PersistentTasksCustomMetadata.builder(persistentTasks); - } - boolean changed = false; - for (int j = 0; j < randomIntBetween(1, 10); j++) { - switch (randomInt(3)) { - case 0: - lastKnownTask = addRandomTask(builder); - changed = true; - break; - case 1: - if (builder.hasTask(lastKnownTask)) { - changed = true; - builder.reassignTask(lastKnownTask, randomAssignment()); - } else { - String fLastKnownTask = lastKnownTask; - expectThrows(ResourceNotFoundException.class, () -> builder.reassignTask(fLastKnownTask, randomAssignment())); - } - break; - case 2: - if (builder.hasTask(lastKnownTask)) { - changed = true; - builder.updateTaskState(lastKnownTask, randomBoolean() ? new State(randomAlphaOfLength(10)) : null); - } else { - String fLastKnownTask = lastKnownTask; - expectThrows(ResourceNotFoundException.class, () -> builder.updateTaskState(fLastKnownTask, null)); - } - break; - case 3: - if (builder.hasTask(lastKnownTask)) { - changed = true; - builder.removeTask(lastKnownTask); - } else { - String fLastKnownTask = lastKnownTask; - expectThrows(ResourceNotFoundException.class, () -> builder.removeTask(fLastKnownTask)); - } - break; - } - } - assertEquals(changed, builder.isChanged()); - persistentTasks = builder.build(); - } - } - - public void testMinVersionSerialization() throws IOException { - PersistentTasksCustomMetadata.Builder tasks = PersistentTasksCustomMetadata.builder(); - - TransportVersion minVersion = getFirstVersion(); - TransportVersion streamVersion = randomVersionBetween(random(), minVersion, getPreviousVersion(TransportVersion.current())); - tasks.addTask( - "test_compatible_version", - TestPersistentTasksExecutor.NAME, - new TestParams( - null, - randomVersionBetween(random(), minVersion, streamVersion), - randomBoolean() ? Optional.empty() : Optional.of("test") - ), - randomAssignment() - ); - tasks.addTask( - "test_incompatible_version", - TestPersistentTasksExecutor.NAME, - new TestParams( - null, - randomVersionBetween(random(), getNextVersion(streamVersion), TransportVersion.current()), - randomBoolean() ? Optional.empty() : Optional.of("test") - ), - randomAssignment() - ); - final BytesStreamOutput out = new BytesStreamOutput(); - - out.setTransportVersion(streamVersion); - tasks.build().writeTo(out); - - final StreamInput input = out.bytes().streamInput(); - input.setTransportVersion(streamVersion); - PersistentTasksCustomMetadata read = new PersistentTasksCustomMetadata( - new NamedWriteableAwareStreamInput(input, getNamedWriteableRegistry()) - ); - - assertThat(read.taskMap().keySet(), contains("test_compatible_version")); + @Override + protected PersistentTasks.Builder builder(ProjectCustom testInstance) { + return PersistentTasksCustomMetadata.builder((PersistentTasksCustomMetadata) testInstance); } public void testDisassociateDeadNodes_givenNoPersistentTasks() { @@ -379,7 +175,7 @@ public class PersistentTasksCustomMetadataTests extends ChunkedToXContentDiffabl assertEquals(originalTasks.getTask("assigned-task"), returnedTasks.getTask("assigned-task")); assertNotEquals(originalTasks.getTask("task-on-deceased-node"), returnedTasks.getTask("task-on-deceased-node")); - assertEquals(PersistentTasksCustomMetadata.LOST_NODE_ASSIGNMENT, returnedTasks.getTask("task-on-deceased-node").getAssignment()); + assertEquals(PersistentTasks.LOST_NODE_ASSIGNMENT, returnedTasks.getTask("task-on-deceased-node").getAssignment()); } private PersistentTaskParams emptyTaskParams(String taskName) { @@ -406,15 +202,4 @@ public class PersistentTasksCustomMetadataTests extends ChunkedToXContentDiffabl } }; } - - private Assignment randomAssignment() { - if (randomBoolean()) { - if (randomBoolean()) { - return NO_NODE_FOUND; - } else { - return new Assignment(null, randomAlphaOfLength(10)); - } - } - return new Assignment(randomAlphaOfLength(10), randomAlphaOfLength(10)); - } } diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorResponseTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorResponseTests.java index aae6e7650070..0315ab2f720f 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorResponseTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorResponseTests.java @@ -28,7 +28,7 @@ public class PersistentTasksExecutorResponseTests extends AbstractWireSerializin TestPersistentTasksExecutor.NAME, new TestPersistentTasksPlugin.TestParams("test"), randomLong(), - PersistentTasksCustomMetadata.INITIAL_ASSIGNMENT + PersistentTasks.INITIAL_ASSIGNMENT ) ); } else {