diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index cfb2698365d7..b001d188585a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -76,6 +76,7 @@ import org.elasticsearch.health.node.selection.HealthNodeTaskExecutor; import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.ingest.IngestMetadata; import org.elasticsearch.injection.guice.AbstractModule; +import org.elasticsearch.persistent.ClusterPersistentTasksCustomMetadata; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.persistent.PersistentTasksNodeService; import org.elasticsearch.plugins.ClusterPlugin; @@ -226,12 +227,20 @@ public class ClusterModule extends AbstractModule { registerProjectCustom(entries, IngestMetadata.TYPE, IngestMetadata::new, IngestMetadata::readDiffFrom); registerProjectCustom(entries, ScriptMetadata.TYPE, ScriptMetadata::new, ScriptMetadata::readDiffFrom); registerProjectCustom(entries, IndexGraveyard.TYPE, IndexGraveyard::new, IndexGraveyard::readDiffFrom); + // Project scoped persistent tasks registerProjectCustom( entries, PersistentTasksCustomMetadata.TYPE, PersistentTasksCustomMetadata::new, PersistentTasksCustomMetadata::readDiffFrom ); + // Cluster scoped persistent tasks + registerMetadataCustom( + entries, + ClusterPersistentTasksCustomMetadata.TYPE, + ClusterPersistentTasksCustomMetadata::new, + ClusterPersistentTasksCustomMetadata::readDiffFrom + ); registerProjectCustom( entries, ComponentTemplateMetadata.TYPE, @@ -283,6 +292,7 @@ public class ClusterModule extends AbstractModule { entries.add( new NamedXContentRegistry.Entry(Metadata.ProjectCustom.class, new ParseField(IndexGraveyard.TYPE), IndexGraveyard::fromXContent) ); + // Project scoped persistent tasks entries.add( new NamedXContentRegistry.Entry( Metadata.ProjectCustom.class, @@ -290,6 +300,14 @@ public class ClusterModule extends AbstractModule { PersistentTasksCustomMetadata::fromXContent ) ); + // Cluster scoped persistent tasks + entries.add( + new NamedXContentRegistry.Entry( + Metadata.ClusterCustom.class, + new ParseField(ClusterPersistentTasksCustomMetadata.TYPE), + ClusterPersistentTasksCustomMetadata::fromXContent + ) + ); entries.add( new NamedXContentRegistry.Entry( Metadata.ProjectCustom.class, diff --git a/server/src/main/java/org/elasticsearch/persistent/ClusterPersistentTasksCustomMetadata.java b/server/src/main/java/org/elasticsearch/persistent/ClusterPersistentTasksCustomMetadata.java new file mode 100644 index 000000000000..e343bcf27a3e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/persistent/ClusterPersistentTasksCustomMetadata.java @@ -0,0 +1,174 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.persistent; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.TransportVersions; +import org.elasticsearch.cluster.AbstractNamedDiffable; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.NamedDiff; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.xcontent.ObjectParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Collections; +import java.util.EnumSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; + +import static org.elasticsearch.cluster.metadata.Metadata.ALL_CONTEXTS; +import static org.elasticsearch.persistent.PersistentTasks.Parsers.PERSISTENT_TASK_PARSER; + +/** + * A cluster state record that contains a list of all running persistent tasks for the cluster itself + */ +public final class ClusterPersistentTasksCustomMetadata extends AbstractNamedDiffable + implements + Metadata.ClusterCustom, + PersistentTasks { + + public static final String TYPE = "cluster_persistent_tasks"; + + static final ObjectParser PERSISTENT_TASKS_PARSER = new ObjectParser<>(TYPE, Builder::new); + + static { + // Tasks parser initialization + PERSISTENT_TASKS_PARSER.declareLong(Builder::setLastAllocationId, new ParseField("last_allocation_id")); + PERSISTENT_TASKS_PARSER.declareObjectArray(Builder::setTasks, PERSISTENT_TASK_PARSER, new ParseField("tasks")); + } + + @Deprecated(forRemoval = true) + public static ClusterPersistentTasksCustomMetadata getPersistentTasksCustomMetadata(ClusterState clusterState) { + return get(clusterState.metadata()); + } + + public static ClusterPersistentTasksCustomMetadata get(Metadata metadata) { + return metadata.custom(TYPE); + } + + // TODO: Implement custom Diff for tasks + private final Map> tasks; + private final long lastAllocationId; + + public ClusterPersistentTasksCustomMetadata(long lastAllocationId, Map> tasks) { + this.lastAllocationId = lastAllocationId; + this.tasks = tasks; + } + + public ClusterPersistentTasksCustomMetadata(StreamInput in) throws IOException { + lastAllocationId = in.readLong(); + tasks = in.readMap(PersistentTasksCustomMetadata.PersistentTask::new); + } + + public static ClusterPersistentTasksCustomMetadata fromXContent(XContentParser parser) { + return PERSISTENT_TASKS_PARSER.apply(parser, null).build(); + } + + @SuppressWarnings("unchecked") + public static PersistentTasksCustomMetadata.PersistentTask getTaskWithId( + ClusterState clusterState, + String taskId + ) { + ClusterPersistentTasksCustomMetadata tasks = get(clusterState.metadata()); + if (tasks != null) { + return (PersistentTasksCustomMetadata.PersistentTask) tasks.getTask(taskId); + } + return null; + } + + @Override + public long getLastAllocationId() { + return lastAllocationId; + } + + @Override + public Map> taskMap() { + return this.tasks; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ClusterPersistentTasksCustomMetadata that = (ClusterPersistentTasksCustomMetadata) o; + return lastAllocationId == that.lastAllocationId && Objects.equals(tasks, that.tasks); + } + + @Override + public int hashCode() { + return Objects.hash(tasks, lastAllocationId); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + @Override + public TransportVersion getMinimalSupportedVersion() { + return TransportVersions.MINIMUM_COMPATIBLE; + } + + @Override + public EnumSet context() { + return ALL_CONTEXTS; + } + + @Override + public String getWriteableName() { + return TYPE; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + doWriteTo(out); + } + + public static NamedDiff readDiffFrom(StreamInput in) throws IOException { + return readDiffFrom(Metadata.ClusterCustom.class, TYPE, in); + } + + @Override + public Iterator toXContentChunked(ToXContent.Params ignored) { + return doToXContentChunked(); + } + + public static Builder builder() { + return new Builder(); + } + + public static Builder builder(ClusterPersistentTasksCustomMetadata tasks) { + return new Builder(tasks); + } + + public static class Builder extends PersistentTasks.Builder { + + protected Builder() { + super(); + } + + protected Builder(PersistentTasks tasksInProgress) { + super(tasksInProgress); + } + + @Override + public ClusterPersistentTasksCustomMetadata build() { + return new ClusterPersistentTasksCustomMetadata(getLastAllocationId(), Collections.unmodifiableMap(getCurrentTasks())); + } + } + +} diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasks.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasks.java new file mode 100644 index 000000000000..0eaed2db9fec --- /dev/null +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasks.java @@ -0,0 +1,375 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.persistent; + +import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.common.collect.Iterators; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.VersionedNamedWriteable; +import org.elasticsearch.common.xcontent.ChunkedToXContentHelper; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Assignment; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask; +import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ObjectParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg; + +/** + * The common data structure for persistent tasks, both cluster-scoped and project-scoped. + * It is meant to be used in places where both type of tasks are processed similarly to achieve better code reuse. + */ +public interface PersistentTasks { + + String API_CONTEXT = Metadata.XContentContext.API.toString(); + Assignment LOST_NODE_ASSIGNMENT = new Assignment(null, "awaiting reassignment after node loss"); + Assignment INITIAL_ASSIGNMENT = new Assignment(null, "waiting for initial assignment"); + + /** + * @return The last allocation id for the tasks. + */ + long getLastAllocationId(); + + /** + * @return The map of actual tasks keyed by task ID. + */ + Map> taskMap(); + + /** + * @return A collection of all tasks + */ + default Collection> tasks() { + return taskMap().values(); + } + + /** + * @param id The task ID + * @return The task with the specified task ID + */ + default PersistentTask getTask(String id) { + return taskMap().get(id); + } + + /** + * @param taskName The task name, see also {@link PersistentTasksExecutor#getTaskName()} + * @param predicate The filter for the matching tasks + * @return A collection of tasks matching the specified taskName and predicate + */ + default Collection> findTasks(String taskName, Predicate> predicate) { + return tasks().stream().filter(p -> taskName.equals(p.getTaskName())).filter(predicate).toList(); + } + + /** + * @param nodeId The node ID where the task runs + * @param taskName The task name, see also {@link PersistentTasksExecutor#getTaskName()} + * @return The number of tasks of the taskName currently running on the specified nodeId + */ + default long getNumberOfTasksOnNode(String nodeId, String taskName) { + return tasks().stream() + .filter(task -> taskName.equals(task.getTaskName()) && nodeId.equals(task.getAssignment().getExecutorNode())) + .count(); + } + + default void doWriteTo(StreamOutput out) throws IOException { + out.writeLong(getLastAllocationId()); + Map> filteredTasks = tasks().stream() + .filter(t -> VersionedNamedWriteable.shouldSerialize(out, t.getParams())) + .collect(Collectors.toMap(PersistentTask::getId, Function.identity())); + out.writeMap(filteredTasks, StreamOutput::writeWriteable); + } + + default Iterator doToXContentChunked() { + return Iterators.concat( + Iterators.single((builder, params) -> builder.field("last_allocation_id", getLastAllocationId())), + ChunkedToXContentHelper.array("tasks", tasks().iterator()) + ); + } + + class Parsers { + public static final ConstructingObjectParser ASSIGNMENT_PARSER = new ConstructingObjectParser<>( + "assignment", + objects -> new Assignment((String) objects[0], (String) objects[1]) + ); + static final ObjectParser, Void> PERSISTENT_TASK_PARSER = new ObjectParser<>( + "tasks", + TaskBuilder::new + ); + private static final ObjectParser.NamedObjectParser, Void> TASK_DESCRIPTION_PARSER; + + static { + // Task description parser initialization + ObjectParser, String> parser = new ObjectParser<>("named"); + parser.declareObject( + TaskDescriptionBuilder::setParams, + (p, c) -> p.namedObject(PersistentTaskParams.class, c, null), + new ParseField("params") + ); + parser.declareObject( + TaskDescriptionBuilder::setState, + (p, c) -> p.namedObject(PersistentTaskState.class, c, null), + new ParseField("state", "status") + ); + TASK_DESCRIPTION_PARSER = (XContentParser p, Void c, String name) -> parser.parse(p, new TaskDescriptionBuilder<>(name), name); + + // Assignment parser + ASSIGNMENT_PARSER.declareStringOrNull(constructorArg(), new ParseField("executor_node")); + ASSIGNMENT_PARSER.declareStringOrNull(constructorArg(), new ParseField("explanation")); + + // Task parser initialization + PERSISTENT_TASK_PARSER.declareString(TaskBuilder::setId, new ParseField("id")); + PERSISTENT_TASK_PARSER.declareString(TaskBuilder::setTaskName, new ParseField("name")); + PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setAllocationId, new ParseField("allocation_id")); + + PERSISTENT_TASK_PARSER.declareNamedObjects( + (TaskBuilder taskBuilder, List> objects) -> { + if (objects.size() != 1) { + throw new IllegalArgumentException("only one task description per task is allowed"); + } + TaskDescriptionBuilder builder = objects.get(0); + taskBuilder.setTaskName(builder.taskName); + taskBuilder.setParams(builder.params); + taskBuilder.setState(builder.state); + }, + TASK_DESCRIPTION_PARSER, + new ParseField("task") + ); + PERSISTENT_TASK_PARSER.declareObject(TaskBuilder::setAssignment, ASSIGNMENT_PARSER, new ParseField("assignment")); + PERSISTENT_TASK_PARSER.declareLong( + TaskBuilder::setAllocationIdOnLastStatusUpdate, + new ParseField("allocation_id_on_last_status_update") + ); + } + } + + /** + * Private builder used in XContent parser to build task-specific portion (params and state) + */ + class TaskDescriptionBuilder { + + private final String taskName; + private Params params; + private PersistentTaskState state; + + private TaskDescriptionBuilder(String taskName) { + this.taskName = taskName; + } + + private TaskDescriptionBuilder setParams(Params params) { + this.params = params; + return this; + } + + private TaskDescriptionBuilder setState(PersistentTaskState state) { + this.state = state; + return this; + } + } + + class TaskBuilder { + private String id; + private long allocationId; + private String taskName; + private Params params; + private PersistentTaskState state; + private Assignment assignment = INITIAL_ASSIGNMENT; + private Long allocationIdOnLastStatusUpdate; + + public TaskBuilder setId(String id) { + this.id = id; + return this; + } + + public TaskBuilder setAllocationId(long allocationId) { + this.allocationId = allocationId; + return this; + } + + public TaskBuilder setTaskName(String taskName) { + this.taskName = taskName; + return this; + } + + public TaskBuilder setParams(Params params) { + this.params = params; + return this; + } + + public TaskBuilder setState(PersistentTaskState state) { + this.state = state; + return this; + } + + public TaskBuilder setAssignment(Assignment assignment) { + this.assignment = assignment; + return this; + } + + public TaskBuilder setAllocationIdOnLastStatusUpdate(Long allocationIdOnLastStatusUpdate) { + this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate; + return this; + } + + public PersistentTask build() { + return new PersistentTask<>(id, allocationId, taskName, params, state, assignment, allocationIdOnLastStatusUpdate); + } + } + + abstract class Builder> { + private final Map> tasks = new HashMap<>(); + private long lastAllocationId; + private boolean changed; + + protected Builder() {} + + protected Builder(PersistentTasks tasksInProgress) { + if (tasksInProgress != null) { + tasks.putAll(tasksInProgress.taskMap()); + lastAllocationId = tasksInProgress.getLastAllocationId(); + } else { + lastAllocationId = 0; + } + } + + public long getLastAllocationId() { + return lastAllocationId; + } + + @SuppressWarnings("unchecked") + protected T setLastAllocationId(long currentId) { + this.lastAllocationId = currentId; + return (T) this; + } + + @SuppressWarnings("unchecked") + protected T setTasks(List> tasks) { + for (TaskBuilder builder : tasks) { + PersistentTask task = builder.build(); + this.tasks.put(task.getId(), task); + } + return (T) this; + } + + private long getNextAllocationId() { + lastAllocationId++; + return lastAllocationId; + } + + /** + * Adds a new task to the builder + *

+ * After the task is added its id can be found by calling {{@link #getLastAllocationId()}} method. + */ + @SuppressWarnings("unchecked") + public T addTask(String taskId, String taskName, Params params, Assignment assignment) { + changed = true; + PersistentTask previousTask = tasks.put( + taskId, + new PersistentTask<>(taskId, taskName, params, getNextAllocationId(), assignment) + ); + if (previousTask != null) { + throw new ResourceAlreadyExistsException("Trying to override task with id {" + taskId + "}"); + } + return (T) this; + } + + /** + * Reassigns the task to another node + */ + @SuppressWarnings("unchecked") + public T reassignTask(String taskId, Assignment assignment) { + PersistentTask taskInProgress = tasks.get(taskId); + if (taskInProgress != null) { + changed = true; + tasks.put(taskId, new PersistentTask<>(taskInProgress, getNextAllocationId(), assignment)); + } else { + throw new ResourceNotFoundException("cannot reassign task with id {" + taskId + "}, the task no longer exists"); + } + return (T) this; + } + + /** + * Updates the task state + */ + @SuppressWarnings("unchecked") + public T updateTaskState(final String taskId, final PersistentTaskState taskState) { + PersistentTask taskInProgress = tasks.get(taskId); + if (taskInProgress != null) { + changed = true; + tasks.put(taskId, new PersistentTask<>(taskInProgress, taskState)); + } else { + throw new ResourceNotFoundException("cannot update task with id {" + taskId + "}, the task no longer exists"); + } + return (T) this; + } + + /** + * Removes the task + */ + @SuppressWarnings("unchecked") + public T removeTask(String taskId) { + if (tasks.remove(taskId) != null) { + changed = true; + } else { + throw new ResourceNotFoundException("cannot remove task with id {" + taskId + "}, the task no longer exists"); + } + return (T) this; + } + + /** + * Checks if the task is currently present in the list + */ + public boolean hasTask(String taskId) { + return tasks.containsKey(taskId); + } + + /** + * Checks if the task is currently present in the list and has the right allocation id + */ + public boolean hasTask(String taskId, long allocationId) { + PersistentTask taskInProgress = tasks.get(taskId); + if (taskInProgress != null) { + return taskInProgress.getAllocationId() == allocationId; + } + return false; + } + + Map> getCurrentTasks() { + return tasks; + } + + Set getCurrentTaskIds() { + return tasks.keySet(); + } + + /** + * Returns true if any the task list was changed since the builder was created + */ + public boolean isChanged() { + return changed; + } + + public abstract PersistentTasks build(); + } +} diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetadata.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetadata.java index 93c03c940d4d..8a0677a4fc42 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetadata.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetadata.java @@ -8,8 +8,6 @@ */ package org.elasticsearch.persistent; -import org.elasticsearch.ResourceAlreadyExistsException; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; import org.elasticsearch.cluster.AbstractNamedDiffable; @@ -19,16 +17,12 @@ import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.VersionedNamedWriteable; import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.xcontent.ChunkedToXContentHelper; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.Nullable; -import org.elasticsearch.xcontent.ConstructingObjectParser; import org.elasticsearch.xcontent.ObjectParser; -import org.elasticsearch.xcontent.ObjectParser.NamedObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.ToXContentObject; @@ -36,30 +30,25 @@ import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentParser; import java.io.IOException; -import java.util.Collection; import java.util.Collections; import java.util.EnumSet; -import java.util.HashMap; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Set; -import java.util.function.Function; -import java.util.function.Predicate; -import java.util.stream.Collectors; import static org.elasticsearch.cluster.metadata.Metadata.ALL_CONTEXTS; -import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.persistent.PersistentTasks.Parsers.PERSISTENT_TASK_PARSER; /** - * A cluster state record that contains a list of all running persistent tasks + * A cluster state record that contains a list of all running persistent tasks from a project */ -public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable implements Metadata.ProjectCustom { +@FixForMultiProject(description = "Consider renaming it to ProjectPersistentTasksCustomMetadata") +public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable + implements + Metadata.ProjectCustom, + PersistentTasks { public static final String TYPE = "persistent_tasks"; - private static final String API_CONTEXT = Metadata.XContentContext.API.toString(); - static final Assignment LOST_NODE_ASSIGNMENT = new Assignment(null, "awaiting reassignment after node loss"); // TODO: Implement custom Diff for tasks private final Map> tasks; @@ -72,64 +61,10 @@ public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable PERSISTENT_TASKS_PARSER = new ObjectParser<>(TYPE, Builder::new); - private static final ObjectParser, Void> PERSISTENT_TASK_PARSER = new ObjectParser<>( - "tasks", - TaskBuilder::new - ); - - public static final ConstructingObjectParser ASSIGNMENT_PARSER = new ConstructingObjectParser<>( - "assignment", - objects -> new Assignment((String) objects[0], (String) objects[1]) - ); - - private static final NamedObjectParser, Void> TASK_DESCRIPTION_PARSER; - static { // Tasks parser initialization PERSISTENT_TASKS_PARSER.declareLong(Builder::setLastAllocationId, new ParseField("last_allocation_id")); PERSISTENT_TASKS_PARSER.declareObjectArray(Builder::setTasks, PERSISTENT_TASK_PARSER, new ParseField("tasks")); - - // Task description parser initialization - ObjectParser, String> parser = new ObjectParser<>("named"); - parser.declareObject( - TaskDescriptionBuilder::setParams, - (p, c) -> p.namedObject(PersistentTaskParams.class, c, null), - new ParseField("params") - ); - parser.declareObject( - TaskDescriptionBuilder::setState, - (p, c) -> p.namedObject(PersistentTaskState.class, c, null), - new ParseField("state", "status") - ); - TASK_DESCRIPTION_PARSER = (XContentParser p, Void c, String name) -> parser.parse(p, new TaskDescriptionBuilder<>(name), name); - - // Assignment parser - ASSIGNMENT_PARSER.declareStringOrNull(constructorArg(), new ParseField("executor_node")); - ASSIGNMENT_PARSER.declareStringOrNull(constructorArg(), new ParseField("explanation")); - - // Task parser initialization - PERSISTENT_TASK_PARSER.declareString(TaskBuilder::setId, new ParseField("id")); - PERSISTENT_TASK_PARSER.declareString(TaskBuilder::setTaskName, new ParseField("name")); - PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setAllocationId, new ParseField("allocation_id")); - - PERSISTENT_TASK_PARSER.declareNamedObjects( - (TaskBuilder taskBuilder, List> objects) -> { - if (objects.size() != 1) { - throw new IllegalArgumentException("only one task description per task is allowed"); - } - TaskDescriptionBuilder builder = objects.get(0); - taskBuilder.setTaskName(builder.taskName); - taskBuilder.setParams(builder.params); - taskBuilder.setState(builder.state); - }, - TASK_DESCRIPTION_PARSER, - new ParseField("task") - ); - PERSISTENT_TASK_PARSER.declareObject(TaskBuilder::setAssignment, ASSIGNMENT_PARSER, new ParseField("assignment")); - PERSISTENT_TASK_PARSER.declareLong( - TaskBuilder::setAllocationIdOnLastStatusUpdate, - new ParseField("allocation_id_on_last_status_update") - ); } @Deprecated(forRemoval = true) @@ -138,49 +73,19 @@ public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable { - - private final String taskName; - private Params params; - private PersistentTaskState state; - - private TaskDescriptionBuilder(String taskName) { - this.taskName = taskName; - } - - private TaskDescriptionBuilder setParams(Params params) { - this.params = params; - return this; - } - - private TaskDescriptionBuilder setState(PersistentTaskState state) { - this.state = state; - return this; - } - } - - public Collection> tasks() { - return this.tasks.values(); + @Override + public long getLastAllocationId() { + return lastAllocationId; } + @Override public Map> taskMap() { return this.tasks; } - public PersistentTask getTask(String id) { - return this.tasks.get(id); - } - - public Collection> findTasks(String taskName, Predicate> predicate) { - return this.tasks().stream().filter(p -> taskName.equals(p.getTaskName())).filter(predicate).toList(); - } - @Override public boolean equals(Object o) { if (this == o) return true; @@ -199,13 +104,6 @@ public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable taskName.equals(task.taskName) && nodeId.equals(task.assignment.executorNode)) - .count(); - } - @Override public TransportVersion getMinimalSupportedVersion() { return TransportVersions.MINIMUM_COMPATIBLE; @@ -222,7 +120,7 @@ public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable PersistentTask getTaskWithId(ClusterState clusterState, String taskId) { - PersistentTasksCustomMetadata tasks = clusterState.metadata().getProject().custom(PersistentTasksCustomMetadata.TYPE); + PersistentTasksCustomMetadata tasks = get(clusterState.metadata().getProject()); if (tasks != null) { return (PersistentTask) tasks.getTask(taskId); } @@ -233,7 +131,7 @@ public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable task : tasks.tasks()) { if (task.getAssignment().getExecutorNode() != null && projectState.cluster().nodes().nodeExists(task.getAssignment().getExecutorNode()) == false) { @@ -282,6 +180,7 @@ public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable implements Writeable, ToXContentObject { private final String id; @@ -352,7 +250,7 @@ public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable { - private String id; - private long allocationId; - private String taskName; - private Params params; - private PersistentTaskState state; - private Assignment assignment = INITIAL_ASSIGNMENT; - private Long allocationIdOnLastStatusUpdate; - - public TaskBuilder setId(String id) { - this.id = id; - return this; - } - - public TaskBuilder setAllocationId(long allocationId) { - this.allocationId = allocationId; - return this; - } - - public TaskBuilder setTaskName(String taskName) { - this.taskName = taskName; - return this; - } - - public TaskBuilder setParams(Params params) { - this.params = params; - return this; - } - - public TaskBuilder setState(PersistentTaskState state) { - this.state = state; - return this; - } - - public TaskBuilder setAssignment(Assignment assignment) { - this.assignment = assignment; - return this; - } - - public TaskBuilder setAllocationIdOnLastStatusUpdate(Long allocationIdOnLastStatusUpdate) { - this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate; - return this; - } - - public PersistentTask build() { - return new PersistentTask<>(id, allocationId, taskName, params, state, assignment, allocationIdOnLastStatusUpdate); - } - } - @Override public String getWriteableName() { return TYPE; @@ -566,12 +415,7 @@ public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable> filteredTasks = tasks.values() - .stream() - .filter(t -> VersionedNamedWriteable.shouldSerialize(out, t.getParams())) - .collect(Collectors.toMap(PersistentTask::getId, Function.identity())); - out.writeMap(filteredTasks, StreamOutput::writeWriteable); + doWriteTo(out); } public static NamedDiff readDiffFrom(StreamInput in) throws IOException { @@ -580,10 +424,7 @@ public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable toXContentChunked(ToXContent.Params ignored) { - return Iterators.concat( - Iterators.single((builder, params) -> builder.field("last_allocation_id", lastAllocationId)), - ChunkedToXContentHelper.array("tasks", tasks.values().iterator()) - ); + return doToXContentChunked(); } public static Builder builder() { @@ -594,132 +435,19 @@ public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable> tasks = new HashMap<>(); - private long lastAllocationId; - private boolean changed; + public static class Builder extends PersistentTasks.Builder { - private Builder() {} - - private Builder(PersistentTasksCustomMetadata tasksInProgress) { - if (tasksInProgress != null) { - tasks.putAll(tasksInProgress.tasks); - lastAllocationId = tasksInProgress.lastAllocationId; - } else { - lastAllocationId = 0; - } + protected Builder() { + super(); } - public long getLastAllocationId() { - return lastAllocationId; - } - - private Builder setLastAllocationId(long currentId) { - this.lastAllocationId = currentId; - return this; - } - - private Builder setTasks(List> tasks) { - for (TaskBuilder builder : tasks) { - PersistentTask task = builder.build(); - this.tasks.put(task.getId(), task); - } - return this; - } - - private long getNextAllocationId() { - lastAllocationId++; - return lastAllocationId; - } - - /** - * Adds a new task to the builder - *

- * After the task is added its id can be found by calling {{@link #getLastAllocationId()}} method. - */ - public Builder addTask(String taskId, String taskName, Params params, Assignment assignment) { - changed = true; - PersistentTask previousTask = tasks.put( - taskId, - new PersistentTask<>(taskId, taskName, params, getNextAllocationId(), assignment) - ); - if (previousTask != null) { - throw new ResourceAlreadyExistsException("Trying to override task with id {" + taskId + "}"); - } - return this; - } - - /** - * Reassigns the task to another node - */ - public Builder reassignTask(String taskId, Assignment assignment) { - PersistentTask taskInProgress = tasks.get(taskId); - if (taskInProgress != null) { - changed = true; - tasks.put(taskId, new PersistentTask<>(taskInProgress, getNextAllocationId(), assignment)); - } else { - throw new ResourceNotFoundException("cannot reassign task with id {" + taskId + "}, the task no longer exists"); - } - return this; - } - - /** - * Updates the task state - */ - public Builder updateTaskState(final String taskId, final PersistentTaskState taskState) { - PersistentTask taskInProgress = tasks.get(taskId); - if (taskInProgress != null) { - changed = true; - tasks.put(taskId, new PersistentTask<>(taskInProgress, taskState)); - } else { - throw new ResourceNotFoundException("cannot update task with id {" + taskId + "}, the task no longer exists"); - } - return this; - } - - /** - * Removes the task - */ - public Builder removeTask(String taskId) { - if (tasks.remove(taskId) != null) { - changed = true; - } else { - throw new ResourceNotFoundException("cannot remove task with id {" + taskId + "}, the task no longer exists"); - } - return this; - } - - /** - * Checks if the task is currently present in the list - */ - public boolean hasTask(String taskId) { - return tasks.containsKey(taskId); - } - - /** - * Checks if the task is currently present in the list and has the right allocation id - */ - public boolean hasTask(String taskId, long allocationId) { - PersistentTask taskInProgress = tasks.get(taskId); - if (taskInProgress != null) { - return taskInProgress.getAllocationId() == allocationId; - } - return false; - } - - Set getCurrentTaskIds() { - return tasks.keySet(); - } - - /** - * Returns true if any the task list was changed since the builder was created - */ - public boolean isChanged() { - return changed; + protected Builder(PersistentTasks tasksInProgress) { + super(tasksInProgress); } + @Override public PersistentTasksCustomMetadata build() { - return new PersistentTasksCustomMetadata(lastAllocationId, Collections.unmodifiableMap(tasks)); + return new PersistentTasksCustomMetadata(getLastAllocationId(), Collections.unmodifiableMap(getCurrentTasks())); } } } diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java index 00d8847c346e..5c2ff6bf09a4 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java @@ -39,6 +39,21 @@ public abstract class PersistentTasksExecutor CLUSTER_SCOPED_TASKS = ConcurrentCollections.newConcurrentSet(); + private final Map> taskExecutors; public PersistentTasksExecutorRegistry(Collection> taskExecutors) { @@ -36,6 +40,9 @@ public class PersistentTasksExecutorRegistry { assert false : message; throw new IllegalStateException(message); } + if (executor.scope() == PersistentTasksExecutor.Scope.CLUSTER) { + CLUSTER_SCOPED_TASKS.add(executor.getTaskName()); + } } this.taskExecutors = Collections.unmodifiableMap(map); } @@ -48,4 +55,8 @@ public class PersistentTasksExecutorRegistry { } return executor; } + + public static boolean isClusterScopedTask(String taskName) { + return CLUSTER_SCOPED_TASKS.contains(taskName); + } } diff --git a/server/src/test/java/org/elasticsearch/persistent/BasePersistentTasksCustomMetadataTests.java b/server/src/test/java/org/elasticsearch/persistent/BasePersistentTasksCustomMetadataTests.java new file mode 100644 index 000000000000..4818716b4062 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/persistent/BasePersistentTasksCustomMetadataTests.java @@ -0,0 +1,274 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.persistent; + +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.TransportVersion; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.test.ChunkedToXContentDiffableSerializationTestCase; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentFactory; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentType; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Optional; + +import static org.elasticsearch.cluster.metadata.Metadata.CONTEXT_MODE_GATEWAY; +import static org.elasticsearch.cluster.metadata.Metadata.CONTEXT_MODE_SNAPSHOT; +import static org.elasticsearch.persistent.PersistentTasksExecutor.NO_NODE_FOUND; +import static org.elasticsearch.test.TransportVersionUtils.getFirstVersion; +import static org.elasticsearch.test.TransportVersionUtils.getNextVersion; +import static org.elasticsearch.test.TransportVersionUtils.getPreviousVersion; +import static org.elasticsearch.test.TransportVersionUtils.randomVersionBetween; +import static org.hamcrest.Matchers.contains; + +public abstract class BasePersistentTasksCustomMetadataTests> extends + ChunkedToXContentDiffableSerializationTestCase { + + @SuppressWarnings("unchecked") + @Override + protected T createTestInstance() { + int numberOfTasks = randomInt(10); + PersistentTasks.Builder tasks = builder(); + for (int i = 0; i < numberOfTasks; i++) { + String taskId = UUIDs.base64UUID(); + tasks.addTask( + taskId, + TestPersistentTasksPlugin.TestPersistentTasksExecutor.NAME, + new TestPersistentTasksPlugin.TestParams(randomAlphaOfLength(10)), + randomAssignment() + ); + if (randomBoolean()) { + // From time to time update status + tasks.updateTaskState(taskId, new TestPersistentTasksPlugin.State(randomAlphaOfLength(10))); + } + } + return (T) tasks.build(); + } + + @Override + protected T mutateInstance(T instance) throws IOException { + return null;// TODO implement https://github.com/elastic/elasticsearch/issues/25929 + } + + @SuppressWarnings("unchecked") + @Override + protected T makeTestChanges(T testInstance) { + final PersistentTasks.Builder builder = builder(testInstance); + switch (randomInt(3)) { + case 0: + addRandomTask(builder); + break; + case 1: + if (builder.getCurrentTaskIds().isEmpty()) { + addRandomTask(builder); + } else { + builder.reassignTask(pickRandomTask(builder), randomAssignment()); + } + break; + case 2: + if (builder.getCurrentTaskIds().isEmpty()) { + addRandomTask(builder); + } else { + builder.updateTaskState( + pickRandomTask(builder), + randomBoolean() ? new TestPersistentTasksPlugin.State(randomAlphaOfLength(10)) : null + ); + } + break; + case 3: + if (builder.getCurrentTaskIds().isEmpty()) { + addRandomTask(builder); + } else { + builder.removeTask(pickRandomTask(builder)); + } + break; + } + return (T) builder.build(); + } + + @SuppressWarnings("unchecked") + public void testSerializationContext() throws Exception { + T testInstance = createTestInstance(); + for (int i = 0; i < randomInt(10); i++) { + testInstance = makeTestChanges(testInstance); + } + + ToXContent.MapParams params = new ToXContent.MapParams( + Collections.singletonMap(Metadata.CONTEXT_MODE_PARAM, randomFrom(CONTEXT_MODE_SNAPSHOT, CONTEXT_MODE_GATEWAY)) + ); + + XContentType xContentType = randomFrom(XContentType.values()); + BytesReference shuffled = toShuffledXContent(asXContent(testInstance), xContentType, params, false); + + T newInstance; + try (XContentParser parser = createParser(XContentFactory.xContent(xContentType), shuffled)) { + newInstance = doParseInstance(parser); + } + assertNotSame(newInstance, testInstance); + + assertEquals(asPersistentTasks(testInstance).tasks().size(), asPersistentTasks(newInstance).tasks().size()); + for (PersistentTasksCustomMetadata.PersistentTask testTask : asPersistentTasks(testInstance).tasks()) { + PersistentTasksCustomMetadata.PersistentTask newTask = + (PersistentTasksCustomMetadata.PersistentTask) asPersistentTasks(newInstance).getTask( + testTask.getId() + ); + assertNotNull(newTask); + + // Things that should be serialized + assertEquals(testTask.getTaskName(), newTask.getTaskName()); + assertEquals(testTask.getId(), newTask.getId()); + assertEquals(testTask.getState(), newTask.getState()); + assertEquals(testTask.getParams(), newTask.getParams()); + + // Things that shouldn't be serialized + assertEquals(0, newTask.getAllocationId()); + assertNull(newTask.getExecutorNode()); + } + } + + @SuppressWarnings("unchecked") + public void testBuilder() { + T persistentTasks = null; + String lastKnownTask = ""; + for (int i = 0; i < randomIntBetween(10, 100); i++) { + final PersistentTasks.Builder builder; + if (randomBoolean()) { + builder = builder(); + } else { + builder = builder(persistentTasks); + } + boolean changed = false; + for (int j = 0; j < randomIntBetween(1, 10); j++) { + switch (randomInt(3)) { + case 0: + lastKnownTask = addRandomTask(builder); + changed = true; + break; + case 1: + if (builder.hasTask(lastKnownTask)) { + changed = true; + builder.reassignTask(lastKnownTask, randomAssignment()); + } else { + String fLastKnownTask = lastKnownTask; + expectThrows(ResourceNotFoundException.class, () -> builder.reassignTask(fLastKnownTask, randomAssignment())); + } + break; + case 2: + if (builder.hasTask(lastKnownTask)) { + changed = true; + builder.updateTaskState( + lastKnownTask, + randomBoolean() ? new TestPersistentTasksPlugin.State(randomAlphaOfLength(10)) : null + ); + } else { + String fLastKnownTask = lastKnownTask; + expectThrows(ResourceNotFoundException.class, () -> builder.updateTaskState(fLastKnownTask, null)); + } + break; + case 3: + if (builder.hasTask(lastKnownTask)) { + changed = true; + builder.removeTask(lastKnownTask); + } else { + String fLastKnownTask = lastKnownTask; + expectThrows(ResourceNotFoundException.class, () -> builder.removeTask(fLastKnownTask)); + } + break; + } + } + assertEquals(changed, builder.isChanged()); + persistentTasks = (T) builder.build(); + } + } + + @SuppressWarnings("unchecked") + public void testMinVersionSerialization() throws IOException { + PersistentTasks.Builder tasks = builder(); + + TransportVersion minVersion = getFirstVersion(); + TransportVersion streamVersion = randomVersionBetween(random(), minVersion, getPreviousVersion(TransportVersion.current())); + tasks.addTask( + "test_compatible_version", + TestPersistentTasksPlugin.TestPersistentTasksExecutor.NAME, + new TestPersistentTasksPlugin.TestParams( + null, + randomVersionBetween(random(), minVersion, streamVersion), + randomBoolean() ? Optional.empty() : Optional.of("test") + ), + randomAssignment() + ); + tasks.addTask( + "test_incompatible_version", + TestPersistentTasksPlugin.TestPersistentTasksExecutor.NAME, + new TestPersistentTasksPlugin.TestParams( + null, + randomVersionBetween(random(), getNextVersion(streamVersion), TransportVersion.current()), + randomBoolean() ? Optional.empty() : Optional.of("test") + ), + randomAssignment() + ); + final BytesStreamOutput out = new BytesStreamOutput(); + + out.setTransportVersion(streamVersion); + ((T) tasks.build()).writeTo(out); + + final StreamInput input = out.bytes().streamInput(); + input.setTransportVersion(streamVersion); + PersistentTasksCustomMetadata read = new PersistentTasksCustomMetadata( + new NamedWriteableAwareStreamInput(input, getNamedWriteableRegistry()) + ); + + assertThat(read.taskMap().keySet(), contains("test_compatible_version")); + } + + private PersistentTasks asPersistentTasks(T testInstance) { + return (PersistentTasks) testInstance; + } + + protected String addRandomTask(PersistentTasks.Builder builder) { + String taskId = UUIDs.base64UUID(); + builder.addTask( + taskId, + TestPersistentTasksPlugin.TestPersistentTasksExecutor.NAME, + new TestPersistentTasksPlugin.TestParams(randomAlphaOfLength(10)), + randomAssignment() + ); + return taskId; + } + + protected String pickRandomTask(PersistentTasks.Builder testInstance) { + return randomFrom(new ArrayList<>(testInstance.getCurrentTaskIds())); + } + + protected PersistentTasksCustomMetadata.Assignment randomAssignment() { + if (randomBoolean()) { + if (randomBoolean()) { + return NO_NODE_FOUND; + } else { + return new PersistentTasksCustomMetadata.Assignment(null, randomAlphaOfLength(10)); + } + } + return new PersistentTasksCustomMetadata.Assignment(randomAlphaOfLength(10), randomAlphaOfLength(10)); + } + + protected abstract PersistentTasks.Builder builder(); + + protected abstract PersistentTasks.Builder builder(T testInstance); + +} diff --git a/server/src/test/java/org/elasticsearch/persistent/ClusterPersistentTasksCustomMetadataTests.java b/server/src/test/java/org/elasticsearch/persistent/ClusterPersistentTasksCustomMetadataTests.java new file mode 100644 index 000000000000..92364b4e38a3 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/persistent/ClusterPersistentTasksCustomMetadataTests.java @@ -0,0 +1,82 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ +package org.elasticsearch.persistent; + +import org.elasticsearch.cluster.Diff; +import org.elasticsearch.cluster.NamedDiff; +import org.elasticsearch.cluster.metadata.Metadata.ClusterCustom; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.persistent.TestPersistentTasksPlugin.State; +import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams; +import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; +import org.elasticsearch.xcontent.NamedXContentRegistry; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.XContentParser; + +import java.util.Arrays; + +public class ClusterPersistentTasksCustomMetadataTests extends BasePersistentTasksCustomMetadataTests { + + @Override + protected Writeable.Reader instanceReader() { + return ClusterPersistentTasksCustomMetadata::new; + } + + @Override + protected NamedWriteableRegistry getNamedWriteableRegistry() { + return new NamedWriteableRegistry( + Arrays.asList( + new Entry(ClusterCustom.class, ClusterPersistentTasksCustomMetadata.TYPE, ClusterPersistentTasksCustomMetadata::new), + new Entry(NamedDiff.class, ClusterPersistentTasksCustomMetadata.TYPE, ClusterPersistentTasksCustomMetadata::readDiffFrom), + new Entry(PersistentTaskParams.class, TestPersistentTasksExecutor.NAME, TestParams::new), + new Entry(PersistentTaskState.class, TestPersistentTasksExecutor.NAME, State::new) + ) + ); + } + + @Override + protected Writeable.Reader> diffReader() { + return ClusterPersistentTasksCustomMetadata::readDiffFrom; + } + + @Override + protected ClusterPersistentTasksCustomMetadata doParseInstance(XContentParser parser) { + return ClusterPersistentTasksCustomMetadata.fromXContent(parser); + } + + @Override + protected NamedXContentRegistry xContentRegistry() { + return new NamedXContentRegistry( + Arrays.asList( + new NamedXContentRegistry.Entry( + PersistentTaskParams.class, + new ParseField(TestPersistentTasksExecutor.NAME), + TestParams::fromXContent + ), + new NamedXContentRegistry.Entry( + PersistentTaskState.class, + new ParseField(TestPersistentTasksExecutor.NAME), + State::fromXContent + ) + ) + ); + } + + @Override + protected PersistentTasks.Builder builder() { + return ClusterPersistentTasksCustomMetadata.builder(); + } + + @Override + protected PersistentTasks.Builder builder(ClusterCustom testInstance) { + return ClusterPersistentTasksCustomMetadata.builder((ClusterPersistentTasksCustomMetadata) testInstance); + } +} diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetadataTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetadataTests.java index 7c9bb119b4ee..4b86ace2f6a1 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetadataTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetadataTests.java @@ -8,7 +8,6 @@ */ package org.elasticsearch.persistent; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.TransportVersion; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -20,68 +19,24 @@ import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.UUIDs; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Assignment; -import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Builder; -import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask; import org.elasticsearch.persistent.TestPersistentTasksPlugin.State; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; -import org.elasticsearch.test.ChunkedToXContentDiffableSerializationTestCase; import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xcontent.ParseField; -import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; -import org.elasticsearch.xcontent.XContentFactory; import org.elasticsearch.xcontent.XContentParser; -import org.elasticsearch.xcontent.XContentType; -import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; -import java.util.Optional; -import static org.elasticsearch.cluster.metadata.Metadata.CONTEXT_MODE_GATEWAY; -import static org.elasticsearch.cluster.metadata.Metadata.CONTEXT_MODE_SNAPSHOT; -import static org.elasticsearch.persistent.PersistentTasksExecutor.NO_NODE_FOUND; -import static org.elasticsearch.test.TransportVersionUtils.getFirstVersion; -import static org.elasticsearch.test.TransportVersionUtils.getNextVersion; -import static org.elasticsearch.test.TransportVersionUtils.getPreviousVersion; -import static org.elasticsearch.test.TransportVersionUtils.randomVersionBetween; -import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.sameInstance; -public class PersistentTasksCustomMetadataTests extends ChunkedToXContentDiffableSerializationTestCase { - - @Override - protected PersistentTasksCustomMetadata createTestInstance() { - int numberOfTasks = randomInt(10); - PersistentTasksCustomMetadata.Builder tasks = PersistentTasksCustomMetadata.builder(); - for (int i = 0; i < numberOfTasks; i++) { - String taskId = UUIDs.base64UUID(); - tasks.addTask(taskId, TestPersistentTasksExecutor.NAME, new TestParams(randomAlphaOfLength(10)), randomAssignment()); - if (randomBoolean()) { - // From time to time update status - tasks.updateTaskState(taskId, new State(randomAlphaOfLength(10))); - } - } - return tasks.build(); - } - - @Override - protected ProjectCustom mutateInstance(ProjectCustom instance) { - return null;// TODO implement https://github.com/elastic/elasticsearch/issues/25929 - } +public class PersistentTasksCustomMetadataTests extends BasePersistentTasksCustomMetadataTests { @Override protected Writeable.Reader instanceReader() { @@ -100,38 +55,6 @@ public class PersistentTasksCustomMetadataTests extends ChunkedToXContentDiffabl ); } - @Override - protected ProjectCustom makeTestChanges(ProjectCustom testInstance) { - Builder builder = PersistentTasksCustomMetadata.builder((PersistentTasksCustomMetadata) testInstance); - switch (randomInt(3)) { - case 0: - addRandomTask(builder); - break; - case 1: - if (builder.getCurrentTaskIds().isEmpty()) { - addRandomTask(builder); - } else { - builder.reassignTask(pickRandomTask(builder), randomAssignment()); - } - break; - case 2: - if (builder.getCurrentTaskIds().isEmpty()) { - addRandomTask(builder); - } else { - builder.updateTaskState(pickRandomTask(builder), randomBoolean() ? new State(randomAlphaOfLength(10)) : null); - } - break; - case 3: - if (builder.getCurrentTaskIds().isEmpty()) { - addRandomTask(builder); - } else { - builder.removeTask(pickRandomTask(builder)); - } - break; - } - return builder.build(); - } - @Override protected Writeable.Reader> diffReader() { return PersistentTasksCustomMetadata::readDiffFrom; @@ -142,16 +65,6 @@ public class PersistentTasksCustomMetadataTests extends ChunkedToXContentDiffabl return PersistentTasksCustomMetadata.fromXContent(parser); } - private String addRandomTask(Builder builder) { - String taskId = UUIDs.base64UUID(); - builder.addTask(taskId, TestPersistentTasksExecutor.NAME, new TestParams(randomAlphaOfLength(10)), randomAssignment()); - return taskId; - } - - private String pickRandomTask(PersistentTasksCustomMetadata.Builder testInstance) { - return randomFrom(new ArrayList<>(testInstance.getCurrentTaskIds())); - } - @Override protected NamedXContentRegistry xContentRegistry() { return new NamedXContentRegistry( @@ -170,131 +83,14 @@ public class PersistentTasksCustomMetadataTests extends ChunkedToXContentDiffabl ); } - @SuppressWarnings("unchecked") - public void testSerializationContext() throws Exception { - PersistentTasksCustomMetadata testInstance = createTestInstance(); - for (int i = 0; i < randomInt(10); i++) { - testInstance = (PersistentTasksCustomMetadata) makeTestChanges(testInstance); - } - - ToXContent.MapParams params = new ToXContent.MapParams( - Collections.singletonMap(Metadata.CONTEXT_MODE_PARAM, randomFrom(CONTEXT_MODE_SNAPSHOT, CONTEXT_MODE_GATEWAY)) - ); - - XContentType xContentType = randomFrom(XContentType.values()); - BytesReference shuffled = toShuffledXContent(asXContent(testInstance), xContentType, params, false); - - PersistentTasksCustomMetadata newInstance; - try (XContentParser parser = createParser(XContentFactory.xContent(xContentType), shuffled)) { - newInstance = doParseInstance(parser); - } - assertNotSame(newInstance, testInstance); - - assertEquals(testInstance.tasks().size(), newInstance.tasks().size()); - for (PersistentTask testTask : testInstance.tasks()) { - PersistentTask newTask = (PersistentTask) newInstance.getTask(testTask.getId()); - assertNotNull(newTask); - - // Things that should be serialized - assertEquals(testTask.getTaskName(), newTask.getTaskName()); - assertEquals(testTask.getId(), newTask.getId()); - assertEquals(testTask.getState(), newTask.getState()); - assertEquals(testTask.getParams(), newTask.getParams()); - - // Things that shouldn't be serialized - assertEquals(0, newTask.getAllocationId()); - assertNull(newTask.getExecutorNode()); - } + @Override + protected PersistentTasks.Builder builder() { + return PersistentTasksCustomMetadata.builder(); } - public void testBuilder() { - PersistentTasksCustomMetadata persistentTasks = null; - String lastKnownTask = ""; - for (int i = 0; i < randomIntBetween(10, 100); i++) { - final Builder builder; - if (randomBoolean()) { - builder = PersistentTasksCustomMetadata.builder(); - } else { - builder = PersistentTasksCustomMetadata.builder(persistentTasks); - } - boolean changed = false; - for (int j = 0; j < randomIntBetween(1, 10); j++) { - switch (randomInt(3)) { - case 0: - lastKnownTask = addRandomTask(builder); - changed = true; - break; - case 1: - if (builder.hasTask(lastKnownTask)) { - changed = true; - builder.reassignTask(lastKnownTask, randomAssignment()); - } else { - String fLastKnownTask = lastKnownTask; - expectThrows(ResourceNotFoundException.class, () -> builder.reassignTask(fLastKnownTask, randomAssignment())); - } - break; - case 2: - if (builder.hasTask(lastKnownTask)) { - changed = true; - builder.updateTaskState(lastKnownTask, randomBoolean() ? new State(randomAlphaOfLength(10)) : null); - } else { - String fLastKnownTask = lastKnownTask; - expectThrows(ResourceNotFoundException.class, () -> builder.updateTaskState(fLastKnownTask, null)); - } - break; - case 3: - if (builder.hasTask(lastKnownTask)) { - changed = true; - builder.removeTask(lastKnownTask); - } else { - String fLastKnownTask = lastKnownTask; - expectThrows(ResourceNotFoundException.class, () -> builder.removeTask(fLastKnownTask)); - } - break; - } - } - assertEquals(changed, builder.isChanged()); - persistentTasks = builder.build(); - } - } - - public void testMinVersionSerialization() throws IOException { - PersistentTasksCustomMetadata.Builder tasks = PersistentTasksCustomMetadata.builder(); - - TransportVersion minVersion = getFirstVersion(); - TransportVersion streamVersion = randomVersionBetween(random(), minVersion, getPreviousVersion(TransportVersion.current())); - tasks.addTask( - "test_compatible_version", - TestPersistentTasksExecutor.NAME, - new TestParams( - null, - randomVersionBetween(random(), minVersion, streamVersion), - randomBoolean() ? Optional.empty() : Optional.of("test") - ), - randomAssignment() - ); - tasks.addTask( - "test_incompatible_version", - TestPersistentTasksExecutor.NAME, - new TestParams( - null, - randomVersionBetween(random(), getNextVersion(streamVersion), TransportVersion.current()), - randomBoolean() ? Optional.empty() : Optional.of("test") - ), - randomAssignment() - ); - final BytesStreamOutput out = new BytesStreamOutput(); - - out.setTransportVersion(streamVersion); - tasks.build().writeTo(out); - - final StreamInput input = out.bytes().streamInput(); - input.setTransportVersion(streamVersion); - PersistentTasksCustomMetadata read = new PersistentTasksCustomMetadata( - new NamedWriteableAwareStreamInput(input, getNamedWriteableRegistry()) - ); - - assertThat(read.taskMap().keySet(), contains("test_compatible_version")); + @Override + protected PersistentTasks.Builder builder(ProjectCustom testInstance) { + return PersistentTasksCustomMetadata.builder((PersistentTasksCustomMetadata) testInstance); } public void testDisassociateDeadNodes_givenNoPersistentTasks() { @@ -379,7 +175,7 @@ public class PersistentTasksCustomMetadataTests extends ChunkedToXContentDiffabl assertEquals(originalTasks.getTask("assigned-task"), returnedTasks.getTask("assigned-task")); assertNotEquals(originalTasks.getTask("task-on-deceased-node"), returnedTasks.getTask("task-on-deceased-node")); - assertEquals(PersistentTasksCustomMetadata.LOST_NODE_ASSIGNMENT, returnedTasks.getTask("task-on-deceased-node").getAssignment()); + assertEquals(PersistentTasks.LOST_NODE_ASSIGNMENT, returnedTasks.getTask("task-on-deceased-node").getAssignment()); } private PersistentTaskParams emptyTaskParams(String taskName) { @@ -406,15 +202,4 @@ public class PersistentTasksCustomMetadataTests extends ChunkedToXContentDiffabl } }; } - - private Assignment randomAssignment() { - if (randomBoolean()) { - if (randomBoolean()) { - return NO_NODE_FOUND; - } else { - return new Assignment(null, randomAlphaOfLength(10)); - } - } - return new Assignment(randomAlphaOfLength(10), randomAlphaOfLength(10)); - } } diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorResponseTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorResponseTests.java index aae6e7650070..0315ab2f720f 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorResponseTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorResponseTests.java @@ -28,7 +28,7 @@ public class PersistentTasksExecutorResponseTests extends AbstractWireSerializin TestPersistentTasksExecutor.NAME, new TestPersistentTasksPlugin.TestParams("test"), randomLong(), - PersistentTasksCustomMetadata.INITIAL_ASSIGNMENT + PersistentTasks.INITIAL_ASSIGNMENT ) ); } else {