Add separate ClusterPersistentTasksCustomMetadata (MP-1945)

This PR adds ClusterPersistentTasksCustomMetadata by splitting from the
existing PersistentTasksCustomMetadata. The latter will be used for
project scoped persistent tasks. It also updates tasks executors to
declare its scope.

The new task type is not used anywhere yet and has no wire BWC handling.
Both will be addressed in follow-ups.

Split from: MP-1938
This commit is contained in:
Yang Wang 2025-02-07 11:08:34 +11:00 committed by GitHub
parent 5afe9e5ba0
commit 7ed771e3f9
10 changed files with 986 additions and 524 deletions

View file

@ -76,6 +76,7 @@ import org.elasticsearch.health.node.selection.HealthNodeTaskExecutor;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.ingest.IngestMetadata;
import org.elasticsearch.injection.guice.AbstractModule;
import org.elasticsearch.persistent.ClusterPersistentTasksCustomMetadata;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.persistent.PersistentTasksNodeService;
import org.elasticsearch.plugins.ClusterPlugin;
@ -226,12 +227,20 @@ public class ClusterModule extends AbstractModule {
registerProjectCustom(entries, IngestMetadata.TYPE, IngestMetadata::new, IngestMetadata::readDiffFrom);
registerProjectCustom(entries, ScriptMetadata.TYPE, ScriptMetadata::new, ScriptMetadata::readDiffFrom);
registerProjectCustom(entries, IndexGraveyard.TYPE, IndexGraveyard::new, IndexGraveyard::readDiffFrom);
// Project scoped persistent tasks
registerProjectCustom(
entries,
PersistentTasksCustomMetadata.TYPE,
PersistentTasksCustomMetadata::new,
PersistentTasksCustomMetadata::readDiffFrom
);
// Cluster scoped persistent tasks
registerMetadataCustom(
entries,
ClusterPersistentTasksCustomMetadata.TYPE,
ClusterPersistentTasksCustomMetadata::new,
ClusterPersistentTasksCustomMetadata::readDiffFrom
);
registerProjectCustom(
entries,
ComponentTemplateMetadata.TYPE,
@ -283,6 +292,7 @@ public class ClusterModule extends AbstractModule {
entries.add(
new NamedXContentRegistry.Entry(Metadata.ProjectCustom.class, new ParseField(IndexGraveyard.TYPE), IndexGraveyard::fromXContent)
);
// Project scoped persistent tasks
entries.add(
new NamedXContentRegistry.Entry(
Metadata.ProjectCustom.class,
@ -290,6 +300,14 @@ public class ClusterModule extends AbstractModule {
PersistentTasksCustomMetadata::fromXContent
)
);
// Cluster scoped persistent tasks
entries.add(
new NamedXContentRegistry.Entry(
Metadata.ClusterCustom.class,
new ParseField(ClusterPersistentTasksCustomMetadata.TYPE),
ClusterPersistentTasksCustomMetadata::fromXContent
)
);
entries.add(
new NamedXContentRegistry.Entry(
Metadata.ProjectCustom.class,

View file

@ -0,0 +1,174 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.persistent;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.cluster.AbstractNamedDiffable;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentParser;
import java.io.IOException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import static org.elasticsearch.cluster.metadata.Metadata.ALL_CONTEXTS;
import static org.elasticsearch.persistent.PersistentTasks.Parsers.PERSISTENT_TASK_PARSER;
/**
* A cluster state record that contains a list of all running persistent tasks for the cluster itself
*/
public final class ClusterPersistentTasksCustomMetadata extends AbstractNamedDiffable<Metadata.ClusterCustom>
implements
Metadata.ClusterCustom,
PersistentTasks {
public static final String TYPE = "cluster_persistent_tasks";
static final ObjectParser<Builder, Void> PERSISTENT_TASKS_PARSER = new ObjectParser<>(TYPE, Builder::new);
static {
// Tasks parser initialization
PERSISTENT_TASKS_PARSER.declareLong(Builder::setLastAllocationId, new ParseField("last_allocation_id"));
PERSISTENT_TASKS_PARSER.declareObjectArray(Builder::setTasks, PERSISTENT_TASK_PARSER, new ParseField("tasks"));
}
@Deprecated(forRemoval = true)
public static ClusterPersistentTasksCustomMetadata getPersistentTasksCustomMetadata(ClusterState clusterState) {
return get(clusterState.metadata());
}
public static ClusterPersistentTasksCustomMetadata get(Metadata metadata) {
return metadata.custom(TYPE);
}
// TODO: Implement custom Diff for tasks
private final Map<String, PersistentTasksCustomMetadata.PersistentTask<?>> tasks;
private final long lastAllocationId;
public ClusterPersistentTasksCustomMetadata(long lastAllocationId, Map<String, PersistentTasksCustomMetadata.PersistentTask<?>> tasks) {
this.lastAllocationId = lastAllocationId;
this.tasks = tasks;
}
public ClusterPersistentTasksCustomMetadata(StreamInput in) throws IOException {
lastAllocationId = in.readLong();
tasks = in.readMap(PersistentTasksCustomMetadata.PersistentTask::new);
}
public static ClusterPersistentTasksCustomMetadata fromXContent(XContentParser parser) {
return PERSISTENT_TASKS_PARSER.apply(parser, null).build();
}
@SuppressWarnings("unchecked")
public static <Params extends PersistentTaskParams> PersistentTasksCustomMetadata.PersistentTask<Params> getTaskWithId(
ClusterState clusterState,
String taskId
) {
ClusterPersistentTasksCustomMetadata tasks = get(clusterState.metadata());
if (tasks != null) {
return (PersistentTasksCustomMetadata.PersistentTask<Params>) tasks.getTask(taskId);
}
return null;
}
@Override
public long getLastAllocationId() {
return lastAllocationId;
}
@Override
public Map<String, PersistentTasksCustomMetadata.PersistentTask<?>> taskMap() {
return this.tasks;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ClusterPersistentTasksCustomMetadata that = (ClusterPersistentTasksCustomMetadata) o;
return lastAllocationId == that.lastAllocationId && Objects.equals(tasks, that.tasks);
}
@Override
public int hashCode() {
return Objects.hash(tasks, lastAllocationId);
}
@Override
public String toString() {
return Strings.toString(this);
}
@Override
public TransportVersion getMinimalSupportedVersion() {
return TransportVersions.MINIMUM_COMPATIBLE;
}
@Override
public EnumSet<Metadata.XContentContext> context() {
return ALL_CONTEXTS;
}
@Override
public String getWriteableName() {
return TYPE;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
doWriteTo(out);
}
public static NamedDiff<Metadata.ClusterCustom> readDiffFrom(StreamInput in) throws IOException {
return readDiffFrom(Metadata.ClusterCustom.class, TYPE, in);
}
@Override
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params ignored) {
return doToXContentChunked();
}
public static Builder builder() {
return new Builder();
}
public static Builder builder(ClusterPersistentTasksCustomMetadata tasks) {
return new Builder(tasks);
}
public static class Builder extends PersistentTasks.Builder<Builder> {
protected Builder() {
super();
}
protected Builder(PersistentTasks tasksInProgress) {
super(tasksInProgress);
}
@Override
public ClusterPersistentTasksCustomMetadata build() {
return new ClusterPersistentTasksCustomMetadata(getLastAllocationId(), Collections.unmodifiableMap(getCurrentTasks()));
}
}
}

View file

@ -0,0 +1,375 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.persistent;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.VersionedNamedWriteable;
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Assignment;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentParser;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
/**
* The common data structure for persistent tasks, both cluster-scoped and project-scoped.
* It is meant to be used in places where both type of tasks are processed similarly to achieve better code reuse.
*/
public interface PersistentTasks {
String API_CONTEXT = Metadata.XContentContext.API.toString();
Assignment LOST_NODE_ASSIGNMENT = new Assignment(null, "awaiting reassignment after node loss");
Assignment INITIAL_ASSIGNMENT = new Assignment(null, "waiting for initial assignment");
/**
* @return The last allocation id for the tasks.
*/
long getLastAllocationId();
/**
* @return The map of actual tasks keyed by task ID.
*/
Map<String, PersistentTask<?>> taskMap();
/**
* @return A collection of all tasks
*/
default Collection<PersistentTask<?>> tasks() {
return taskMap().values();
}
/**
* @param id The task ID
* @return The task with the specified task ID
*/
default PersistentTask<?> getTask(String id) {
return taskMap().get(id);
}
/**
* @param taskName The task name, see also {@link PersistentTasksExecutor#getTaskName()}
* @param predicate The filter for the matching tasks
* @return A collection of tasks matching the specified taskName and predicate
*/
default Collection<PersistentTask<?>> findTasks(String taskName, Predicate<PersistentTask<?>> predicate) {
return tasks().stream().filter(p -> taskName.equals(p.getTaskName())).filter(predicate).toList();
}
/**
* @param nodeId The node ID where the task runs
* @param taskName The task name, see also {@link PersistentTasksExecutor#getTaskName()}
* @return The number of tasks of the taskName currently running on the specified nodeId
*/
default long getNumberOfTasksOnNode(String nodeId, String taskName) {
return tasks().stream()
.filter(task -> taskName.equals(task.getTaskName()) && nodeId.equals(task.getAssignment().getExecutorNode()))
.count();
}
default void doWriteTo(StreamOutput out) throws IOException {
out.writeLong(getLastAllocationId());
Map<String, PersistentTask<?>> filteredTasks = tasks().stream()
.filter(t -> VersionedNamedWriteable.shouldSerialize(out, t.getParams()))
.collect(Collectors.toMap(PersistentTask::getId, Function.identity()));
out.writeMap(filteredTasks, StreamOutput::writeWriteable);
}
default Iterator<? extends ToXContent> doToXContentChunked() {
return Iterators.concat(
Iterators.single((builder, params) -> builder.field("last_allocation_id", getLastAllocationId())),
ChunkedToXContentHelper.array("tasks", tasks().iterator())
);
}
class Parsers {
public static final ConstructingObjectParser<Assignment, Void> ASSIGNMENT_PARSER = new ConstructingObjectParser<>(
"assignment",
objects -> new Assignment((String) objects[0], (String) objects[1])
);
static final ObjectParser<TaskBuilder<PersistentTaskParams>, Void> PERSISTENT_TASK_PARSER = new ObjectParser<>(
"tasks",
TaskBuilder::new
);
private static final ObjectParser.NamedObjectParser<TaskDescriptionBuilder<PersistentTaskParams>, Void> TASK_DESCRIPTION_PARSER;
static {
// Task description parser initialization
ObjectParser<TaskDescriptionBuilder<PersistentTaskParams>, String> parser = new ObjectParser<>("named");
parser.declareObject(
TaskDescriptionBuilder::setParams,
(p, c) -> p.namedObject(PersistentTaskParams.class, c, null),
new ParseField("params")
);
parser.declareObject(
TaskDescriptionBuilder::setState,
(p, c) -> p.namedObject(PersistentTaskState.class, c, null),
new ParseField("state", "status")
);
TASK_DESCRIPTION_PARSER = (XContentParser p, Void c, String name) -> parser.parse(p, new TaskDescriptionBuilder<>(name), name);
// Assignment parser
ASSIGNMENT_PARSER.declareStringOrNull(constructorArg(), new ParseField("executor_node"));
ASSIGNMENT_PARSER.declareStringOrNull(constructorArg(), new ParseField("explanation"));
// Task parser initialization
PERSISTENT_TASK_PARSER.declareString(TaskBuilder::setId, new ParseField("id"));
PERSISTENT_TASK_PARSER.declareString(TaskBuilder::setTaskName, new ParseField("name"));
PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setAllocationId, new ParseField("allocation_id"));
PERSISTENT_TASK_PARSER.declareNamedObjects(
(TaskBuilder<PersistentTaskParams> taskBuilder, List<TaskDescriptionBuilder<PersistentTaskParams>> objects) -> {
if (objects.size() != 1) {
throw new IllegalArgumentException("only one task description per task is allowed");
}
TaskDescriptionBuilder<PersistentTaskParams> builder = objects.get(0);
taskBuilder.setTaskName(builder.taskName);
taskBuilder.setParams(builder.params);
taskBuilder.setState(builder.state);
},
TASK_DESCRIPTION_PARSER,
new ParseField("task")
);
PERSISTENT_TASK_PARSER.declareObject(TaskBuilder::setAssignment, ASSIGNMENT_PARSER, new ParseField("assignment"));
PERSISTENT_TASK_PARSER.declareLong(
TaskBuilder::setAllocationIdOnLastStatusUpdate,
new ParseField("allocation_id_on_last_status_update")
);
}
}
/**
* Private builder used in XContent parser to build task-specific portion (params and state)
*/
class TaskDescriptionBuilder<Params extends PersistentTaskParams> {
private final String taskName;
private Params params;
private PersistentTaskState state;
private TaskDescriptionBuilder(String taskName) {
this.taskName = taskName;
}
private TaskDescriptionBuilder<Params> setParams(Params params) {
this.params = params;
return this;
}
private TaskDescriptionBuilder<Params> setState(PersistentTaskState state) {
this.state = state;
return this;
}
}
class TaskBuilder<Params extends PersistentTaskParams> {
private String id;
private long allocationId;
private String taskName;
private Params params;
private PersistentTaskState state;
private Assignment assignment = INITIAL_ASSIGNMENT;
private Long allocationIdOnLastStatusUpdate;
public TaskBuilder<Params> setId(String id) {
this.id = id;
return this;
}
public TaskBuilder<Params> setAllocationId(long allocationId) {
this.allocationId = allocationId;
return this;
}
public TaskBuilder<Params> setTaskName(String taskName) {
this.taskName = taskName;
return this;
}
public TaskBuilder<Params> setParams(Params params) {
this.params = params;
return this;
}
public TaskBuilder<Params> setState(PersistentTaskState state) {
this.state = state;
return this;
}
public TaskBuilder<Params> setAssignment(Assignment assignment) {
this.assignment = assignment;
return this;
}
public TaskBuilder<Params> setAllocationIdOnLastStatusUpdate(Long allocationIdOnLastStatusUpdate) {
this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate;
return this;
}
public PersistentTask<Params> build() {
return new PersistentTask<>(id, allocationId, taskName, params, state, assignment, allocationIdOnLastStatusUpdate);
}
}
abstract class Builder<T extends Builder<T>> {
private final Map<String, PersistentTask<?>> tasks = new HashMap<>();
private long lastAllocationId;
private boolean changed;
protected Builder() {}
protected Builder(PersistentTasks tasksInProgress) {
if (tasksInProgress != null) {
tasks.putAll(tasksInProgress.taskMap());
lastAllocationId = tasksInProgress.getLastAllocationId();
} else {
lastAllocationId = 0;
}
}
public long getLastAllocationId() {
return lastAllocationId;
}
@SuppressWarnings("unchecked")
protected T setLastAllocationId(long currentId) {
this.lastAllocationId = currentId;
return (T) this;
}
@SuppressWarnings("unchecked")
protected <Params extends PersistentTaskParams> T setTasks(List<TaskBuilder<Params>> tasks) {
for (TaskBuilder<Params> builder : tasks) {
PersistentTask<?> task = builder.build();
this.tasks.put(task.getId(), task);
}
return (T) this;
}
private long getNextAllocationId() {
lastAllocationId++;
return lastAllocationId;
}
/**
* Adds a new task to the builder
* <p>
* After the task is added its id can be found by calling {{@link #getLastAllocationId()}} method.
*/
@SuppressWarnings("unchecked")
public <Params extends PersistentTaskParams> T addTask(String taskId, String taskName, Params params, Assignment assignment) {
changed = true;
PersistentTask<?> previousTask = tasks.put(
taskId,
new PersistentTask<>(taskId, taskName, params, getNextAllocationId(), assignment)
);
if (previousTask != null) {
throw new ResourceAlreadyExistsException("Trying to override task with id {" + taskId + "}");
}
return (T) this;
}
/**
* Reassigns the task to another node
*/
@SuppressWarnings("unchecked")
public T reassignTask(String taskId, Assignment assignment) {
PersistentTask<?> taskInProgress = tasks.get(taskId);
if (taskInProgress != null) {
changed = true;
tasks.put(taskId, new PersistentTask<>(taskInProgress, getNextAllocationId(), assignment));
} else {
throw new ResourceNotFoundException("cannot reassign task with id {" + taskId + "}, the task no longer exists");
}
return (T) this;
}
/**
* Updates the task state
*/
@SuppressWarnings("unchecked")
public T updateTaskState(final String taskId, final PersistentTaskState taskState) {
PersistentTask<?> taskInProgress = tasks.get(taskId);
if (taskInProgress != null) {
changed = true;
tasks.put(taskId, new PersistentTask<>(taskInProgress, taskState));
} else {
throw new ResourceNotFoundException("cannot update task with id {" + taskId + "}, the task no longer exists");
}
return (T) this;
}
/**
* Removes the task
*/
@SuppressWarnings("unchecked")
public T removeTask(String taskId) {
if (tasks.remove(taskId) != null) {
changed = true;
} else {
throw new ResourceNotFoundException("cannot remove task with id {" + taskId + "}, the task no longer exists");
}
return (T) this;
}
/**
* Checks if the task is currently present in the list
*/
public boolean hasTask(String taskId) {
return tasks.containsKey(taskId);
}
/**
* Checks if the task is currently present in the list and has the right allocation id
*/
public boolean hasTask(String taskId, long allocationId) {
PersistentTask<?> taskInProgress = tasks.get(taskId);
if (taskInProgress != null) {
return taskInProgress.getAllocationId() == allocationId;
}
return false;
}
Map<String, PersistentTask<?>> getCurrentTasks() {
return tasks;
}
Set<String> getCurrentTaskIds() {
return tasks.keySet();
}
/**
* Returns true if any the task list was changed since the builder was created
*/
public boolean isChanged() {
return changed;
}
public abstract PersistentTasks build();
}
}

View file

@ -8,8 +8,6 @@
*/
package org.elasticsearch.persistent;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.cluster.AbstractNamedDiffable;
@ -19,16 +17,12 @@ import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.VersionedNamedWriteable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.ObjectParser.NamedObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentObject;
@ -36,30 +30,25 @@ import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.elasticsearch.cluster.metadata.Metadata.ALL_CONTEXTS;
import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.persistent.PersistentTasks.Parsers.PERSISTENT_TASK_PARSER;
/**
* A cluster state record that contains a list of all running persistent tasks
* A cluster state record that contains a list of all running persistent tasks from a project
*/
public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable<Metadata.ProjectCustom> implements Metadata.ProjectCustom {
@FixForMultiProject(description = "Consider renaming it to ProjectPersistentTasksCustomMetadata")
public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable<Metadata.ProjectCustom>
implements
Metadata.ProjectCustom,
PersistentTasks {
public static final String TYPE = "persistent_tasks";
private static final String API_CONTEXT = Metadata.XContentContext.API.toString();
static final Assignment LOST_NODE_ASSIGNMENT = new Assignment(null, "awaiting reassignment after node loss");
// TODO: Implement custom Diff for tasks
private final Map<String, PersistentTask<?>> tasks;
@ -72,64 +61,10 @@ public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable<M
private static final ObjectParser<Builder, Void> PERSISTENT_TASKS_PARSER = new ObjectParser<>(TYPE, Builder::new);
private static final ObjectParser<TaskBuilder<PersistentTaskParams>, Void> PERSISTENT_TASK_PARSER = new ObjectParser<>(
"tasks",
TaskBuilder::new
);
public static final ConstructingObjectParser<Assignment, Void> ASSIGNMENT_PARSER = new ConstructingObjectParser<>(
"assignment",
objects -> new Assignment((String) objects[0], (String) objects[1])
);
private static final NamedObjectParser<TaskDescriptionBuilder<PersistentTaskParams>, Void> TASK_DESCRIPTION_PARSER;
static {
// Tasks parser initialization
PERSISTENT_TASKS_PARSER.declareLong(Builder::setLastAllocationId, new ParseField("last_allocation_id"));
PERSISTENT_TASKS_PARSER.declareObjectArray(Builder::setTasks, PERSISTENT_TASK_PARSER, new ParseField("tasks"));
// Task description parser initialization
ObjectParser<TaskDescriptionBuilder<PersistentTaskParams>, String> parser = new ObjectParser<>("named");
parser.declareObject(
TaskDescriptionBuilder::setParams,
(p, c) -> p.namedObject(PersistentTaskParams.class, c, null),
new ParseField("params")
);
parser.declareObject(
TaskDescriptionBuilder::setState,
(p, c) -> p.namedObject(PersistentTaskState.class, c, null),
new ParseField("state", "status")
);
TASK_DESCRIPTION_PARSER = (XContentParser p, Void c, String name) -> parser.parse(p, new TaskDescriptionBuilder<>(name), name);
// Assignment parser
ASSIGNMENT_PARSER.declareStringOrNull(constructorArg(), new ParseField("executor_node"));
ASSIGNMENT_PARSER.declareStringOrNull(constructorArg(), new ParseField("explanation"));
// Task parser initialization
PERSISTENT_TASK_PARSER.declareString(TaskBuilder::setId, new ParseField("id"));
PERSISTENT_TASK_PARSER.declareString(TaskBuilder::setTaskName, new ParseField("name"));
PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setAllocationId, new ParseField("allocation_id"));
PERSISTENT_TASK_PARSER.declareNamedObjects(
(TaskBuilder<PersistentTaskParams> taskBuilder, List<TaskDescriptionBuilder<PersistentTaskParams>> objects) -> {
if (objects.size() != 1) {
throw new IllegalArgumentException("only one task description per task is allowed");
}
TaskDescriptionBuilder<PersistentTaskParams> builder = objects.get(0);
taskBuilder.setTaskName(builder.taskName);
taskBuilder.setParams(builder.params);
taskBuilder.setState(builder.state);
},
TASK_DESCRIPTION_PARSER,
new ParseField("task")
);
PERSISTENT_TASK_PARSER.declareObject(TaskBuilder::setAssignment, ASSIGNMENT_PARSER, new ParseField("assignment"));
PERSISTENT_TASK_PARSER.declareLong(
TaskBuilder::setAllocationIdOnLastStatusUpdate,
new ParseField("allocation_id_on_last_status_update")
);
}
@Deprecated(forRemoval = true)
@ -138,49 +73,19 @@ public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable<M
}
public static PersistentTasksCustomMetadata get(ProjectMetadata projectMetadata) {
return projectMetadata.custom(PersistentTasksCustomMetadata.TYPE);
return projectMetadata.custom(TYPE);
}
/**
* Private builder used in XContent parser to build task-specific portion (params and state)
*/
private static class TaskDescriptionBuilder<Params extends PersistentTaskParams> {
private final String taskName;
private Params params;
private PersistentTaskState state;
private TaskDescriptionBuilder(String taskName) {
this.taskName = taskName;
}
private TaskDescriptionBuilder<Params> setParams(Params params) {
this.params = params;
return this;
}
private TaskDescriptionBuilder<Params> setState(PersistentTaskState state) {
this.state = state;
return this;
}
}
public Collection<PersistentTask<?>> tasks() {
return this.tasks.values();
@Override
public long getLastAllocationId() {
return lastAllocationId;
}
@Override
public Map<String, PersistentTask<?>> taskMap() {
return this.tasks;
}
public PersistentTask<?> getTask(String id) {
return this.tasks.get(id);
}
public Collection<PersistentTask<?>> findTasks(String taskName, Predicate<PersistentTask<?>> predicate) {
return this.tasks().stream().filter(p -> taskName.equals(p.getTaskName())).filter(predicate).toList();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
@ -199,13 +104,6 @@ public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable<M
return Strings.toString(this);
}
public long getNumberOfTasksOnNode(String nodeId, String taskName) {
return tasks.values()
.stream()
.filter(task -> taskName.equals(task.taskName) && nodeId.equals(task.assignment.executorNode))
.count();
}
@Override
public TransportVersion getMinimalSupportedVersion() {
return TransportVersions.MINIMUM_COMPATIBLE;
@ -222,7 +120,7 @@ public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable<M
@SuppressWarnings("unchecked")
public static <Params extends PersistentTaskParams> PersistentTask<Params> getTaskWithId(ClusterState clusterState, String taskId) {
PersistentTasksCustomMetadata tasks = clusterState.metadata().getProject().custom(PersistentTasksCustomMetadata.TYPE);
PersistentTasksCustomMetadata tasks = get(clusterState.metadata().getProject());
if (tasks != null) {
return (PersistentTask<Params>) tasks.getTask(taskId);
}
@ -233,7 +131,7 @@ public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable<M
* Unassign any persistent tasks executing on nodes that are no longer in
* the cluster. If the task's assigment has a non-null executor node and that
* node is no longer in the cluster then the assignment is set to
* {@link #LOST_NODE_ASSIGNMENT}
* {@link PersistentTasks#LOST_NODE_ASSIGNMENT}
*
* @param clusterState The clusterstate
* @return If no changes the argument {@code clusterState} is returned else
@ -265,7 +163,7 @@ public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable<M
return projectState.metadata();
}
PersistentTasksCustomMetadata.Builder taskBuilder = PersistentTasksCustomMetadata.builder(tasks);
var taskBuilder = PersistentTasksCustomMetadata.builder(tasks);
for (PersistentTask<?> task : tasks.tasks()) {
if (task.getAssignment().getExecutorNode() != null
&& projectState.cluster().nodes().nodeExists(task.getAssignment().getExecutorNode()) == false) {
@ -282,6 +180,7 @@ public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable<M
return metadataBuilder.build();
}
@FixForMultiProject(description = "Consider moving it to PersistentTasks")
public static class Assignment {
@Nullable
private final String executorNode;
@ -325,11 +224,10 @@ public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable<M
}
}
public static final Assignment INITIAL_ASSIGNMENT = new Assignment(null, "waiting for initial assignment");
/**
* A record that represents a single running persistent task
*/
@FixForMultiProject(description = "Consider moving it to PersistentTasks")
public static class PersistentTask<P extends PersistentTaskParams> implements Writeable, ToXContentObject {
private final String id;
@ -352,7 +250,7 @@ public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable<M
this(task.id, task.allocationId, task.taskName, task.params, state, task.assignment, task.allocationId);
}
private PersistentTask(
PersistentTask(
final String id,
final long allocationId,
final String name,
@ -505,55 +403,6 @@ public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable<M
}
}
private static class TaskBuilder<Params extends PersistentTaskParams> {
private String id;
private long allocationId;
private String taskName;
private Params params;
private PersistentTaskState state;
private Assignment assignment = INITIAL_ASSIGNMENT;
private Long allocationIdOnLastStatusUpdate;
public TaskBuilder<Params> setId(String id) {
this.id = id;
return this;
}
public TaskBuilder<Params> setAllocationId(long allocationId) {
this.allocationId = allocationId;
return this;
}
public TaskBuilder<Params> setTaskName(String taskName) {
this.taskName = taskName;
return this;
}
public TaskBuilder<Params> setParams(Params params) {
this.params = params;
return this;
}
public TaskBuilder<Params> setState(PersistentTaskState state) {
this.state = state;
return this;
}
public TaskBuilder<Params> setAssignment(Assignment assignment) {
this.assignment = assignment;
return this;
}
public TaskBuilder<Params> setAllocationIdOnLastStatusUpdate(Long allocationIdOnLastStatusUpdate) {
this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate;
return this;
}
public PersistentTask<Params> build() {
return new PersistentTask<>(id, allocationId, taskName, params, state, assignment, allocationIdOnLastStatusUpdate);
}
}
@Override
public String getWriteableName() {
return TYPE;
@ -566,12 +415,7 @@ public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable<M
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(lastAllocationId);
Map<String, PersistentTask<?>> filteredTasks = tasks.values()
.stream()
.filter(t -> VersionedNamedWriteable.shouldSerialize(out, t.getParams()))
.collect(Collectors.toMap(PersistentTask::getId, Function.identity()));
out.writeMap(filteredTasks, StreamOutput::writeWriteable);
doWriteTo(out);
}
public static NamedDiff<Metadata.ProjectCustom> readDiffFrom(StreamInput in) throws IOException {
@ -580,10 +424,7 @@ public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable<M
@Override
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params ignored) {
return Iterators.concat(
Iterators.single((builder, params) -> builder.field("last_allocation_id", lastAllocationId)),
ChunkedToXContentHelper.array("tasks", tasks.values().iterator())
);
return doToXContentChunked();
}
public static Builder builder() {
@ -594,132 +435,19 @@ public final class PersistentTasksCustomMetadata extends AbstractNamedDiffable<M
return new Builder(tasks);
}
public static class Builder {
private final Map<String, PersistentTask<?>> tasks = new HashMap<>();
private long lastAllocationId;
private boolean changed;
public static class Builder extends PersistentTasks.Builder<Builder> {
private Builder() {}
private Builder(PersistentTasksCustomMetadata tasksInProgress) {
if (tasksInProgress != null) {
tasks.putAll(tasksInProgress.tasks);
lastAllocationId = tasksInProgress.lastAllocationId;
} else {
lastAllocationId = 0;
}
protected Builder() {
super();
}
public long getLastAllocationId() {
return lastAllocationId;
}
private Builder setLastAllocationId(long currentId) {
this.lastAllocationId = currentId;
return this;
}
private <Params extends PersistentTaskParams> Builder setTasks(List<TaskBuilder<Params>> tasks) {
for (TaskBuilder<Params> builder : tasks) {
PersistentTask<?> task = builder.build();
this.tasks.put(task.getId(), task);
}
return this;
}
private long getNextAllocationId() {
lastAllocationId++;
return lastAllocationId;
}
/**
* Adds a new task to the builder
* <p>
* After the task is added its id can be found by calling {{@link #getLastAllocationId()}} method.
*/
public <Params extends PersistentTaskParams> Builder addTask(String taskId, String taskName, Params params, Assignment assignment) {
changed = true;
PersistentTask<?> previousTask = tasks.put(
taskId,
new PersistentTask<>(taskId, taskName, params, getNextAllocationId(), assignment)
);
if (previousTask != null) {
throw new ResourceAlreadyExistsException("Trying to override task with id {" + taskId + "}");
}
return this;
}
/**
* Reassigns the task to another node
*/
public Builder reassignTask(String taskId, Assignment assignment) {
PersistentTask<?> taskInProgress = tasks.get(taskId);
if (taskInProgress != null) {
changed = true;
tasks.put(taskId, new PersistentTask<>(taskInProgress, getNextAllocationId(), assignment));
} else {
throw new ResourceNotFoundException("cannot reassign task with id {" + taskId + "}, the task no longer exists");
}
return this;
}
/**
* Updates the task state
*/
public Builder updateTaskState(final String taskId, final PersistentTaskState taskState) {
PersistentTask<?> taskInProgress = tasks.get(taskId);
if (taskInProgress != null) {
changed = true;
tasks.put(taskId, new PersistentTask<>(taskInProgress, taskState));
} else {
throw new ResourceNotFoundException("cannot update task with id {" + taskId + "}, the task no longer exists");
}
return this;
}
/**
* Removes the task
*/
public Builder removeTask(String taskId) {
if (tasks.remove(taskId) != null) {
changed = true;
} else {
throw new ResourceNotFoundException("cannot remove task with id {" + taskId + "}, the task no longer exists");
}
return this;
}
/**
* Checks if the task is currently present in the list
*/
public boolean hasTask(String taskId) {
return tasks.containsKey(taskId);
}
/**
* Checks if the task is currently present in the list and has the right allocation id
*/
public boolean hasTask(String taskId, long allocationId) {
PersistentTask<?> taskInProgress = tasks.get(taskId);
if (taskInProgress != null) {
return taskInProgress.getAllocationId() == allocationId;
}
return false;
}
Set<String> getCurrentTaskIds() {
return tasks.keySet();
}
/**
* Returns true if any the task list was changed since the builder was created
*/
public boolean isChanged() {
return changed;
protected Builder(PersistentTasks tasksInProgress) {
super(tasksInProgress);
}
@Override
public PersistentTasksCustomMetadata build() {
return new PersistentTasksCustomMetadata(lastAllocationId, Collections.unmodifiableMap(tasks));
return new PersistentTasksCustomMetadata(getLastAllocationId(), Collections.unmodifiableMap(getCurrentTasks()));
}
}
}

View file

@ -39,6 +39,21 @@ public abstract class PersistentTasksExecutor<Params extends PersistentTaskParam
return taskName;
}
public enum Scope {
/**
* The persistent task runs separately for each project
*/
PROJECT,
/**
* The persistent task runs for the cluster itself with no project context
*/
CLUSTER
}
public Scope scope() {
return Scope.PROJECT;
}
public static final Assignment NO_NODE_FOUND = new Assignment(null, "no appropriate nodes found for the assignment");
/**

View file

@ -8,18 +8,22 @@
*/
package org.elasticsearch.persistent;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.Strings;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
* Components that registers all persistent task executors
*/
public class PersistentTasksExecutorRegistry {
private static final Set<String> CLUSTER_SCOPED_TASKS = ConcurrentCollections.newConcurrentSet();
private final Map<String, PersistentTasksExecutor<?>> taskExecutors;
public PersistentTasksExecutorRegistry(Collection<PersistentTasksExecutor<?>> taskExecutors) {
@ -36,6 +40,9 @@ public class PersistentTasksExecutorRegistry {
assert false : message;
throw new IllegalStateException(message);
}
if (executor.scope() == PersistentTasksExecutor.Scope.CLUSTER) {
CLUSTER_SCOPED_TASKS.add(executor.getTaskName());
}
}
this.taskExecutors = Collections.unmodifiableMap(map);
}
@ -48,4 +55,8 @@ public class PersistentTasksExecutorRegistry {
}
return executor;
}
public static boolean isClusterScopedTask(String taskName) {
return CLUSTER_SCOPED_TASKS.contains(taskName);
}
}

View file

@ -0,0 +1,274 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.persistent;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.test.ChunkedToXContentDiffableSerializationTestCase;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Optional;
import static org.elasticsearch.cluster.metadata.Metadata.CONTEXT_MODE_GATEWAY;
import static org.elasticsearch.cluster.metadata.Metadata.CONTEXT_MODE_SNAPSHOT;
import static org.elasticsearch.persistent.PersistentTasksExecutor.NO_NODE_FOUND;
import static org.elasticsearch.test.TransportVersionUtils.getFirstVersion;
import static org.elasticsearch.test.TransportVersionUtils.getNextVersion;
import static org.elasticsearch.test.TransportVersionUtils.getPreviousVersion;
import static org.elasticsearch.test.TransportVersionUtils.randomVersionBetween;
import static org.hamcrest.Matchers.contains;
public abstract class BasePersistentTasksCustomMetadataTests<T extends Metadata.MetadataCustom<T>> extends
ChunkedToXContentDiffableSerializationTestCase<T> {
@SuppressWarnings("unchecked")
@Override
protected T createTestInstance() {
int numberOfTasks = randomInt(10);
PersistentTasks.Builder<?> tasks = builder();
for (int i = 0; i < numberOfTasks; i++) {
String taskId = UUIDs.base64UUID();
tasks.addTask(
taskId,
TestPersistentTasksPlugin.TestPersistentTasksExecutor.NAME,
new TestPersistentTasksPlugin.TestParams(randomAlphaOfLength(10)),
randomAssignment()
);
if (randomBoolean()) {
// From time to time update status
tasks.updateTaskState(taskId, new TestPersistentTasksPlugin.State(randomAlphaOfLength(10)));
}
}
return (T) tasks.build();
}
@Override
protected T mutateInstance(T instance) throws IOException {
return null;// TODO implement https://github.com/elastic/elasticsearch/issues/25929
}
@SuppressWarnings("unchecked")
@Override
protected T makeTestChanges(T testInstance) {
final PersistentTasks.Builder<?> builder = builder(testInstance);
switch (randomInt(3)) {
case 0:
addRandomTask(builder);
break;
case 1:
if (builder.getCurrentTaskIds().isEmpty()) {
addRandomTask(builder);
} else {
builder.reassignTask(pickRandomTask(builder), randomAssignment());
}
break;
case 2:
if (builder.getCurrentTaskIds().isEmpty()) {
addRandomTask(builder);
} else {
builder.updateTaskState(
pickRandomTask(builder),
randomBoolean() ? new TestPersistentTasksPlugin.State(randomAlphaOfLength(10)) : null
);
}
break;
case 3:
if (builder.getCurrentTaskIds().isEmpty()) {
addRandomTask(builder);
} else {
builder.removeTask(pickRandomTask(builder));
}
break;
}
return (T) builder.build();
}
@SuppressWarnings("unchecked")
public void testSerializationContext() throws Exception {
T testInstance = createTestInstance();
for (int i = 0; i < randomInt(10); i++) {
testInstance = makeTestChanges(testInstance);
}
ToXContent.MapParams params = new ToXContent.MapParams(
Collections.singletonMap(Metadata.CONTEXT_MODE_PARAM, randomFrom(CONTEXT_MODE_SNAPSHOT, CONTEXT_MODE_GATEWAY))
);
XContentType xContentType = randomFrom(XContentType.values());
BytesReference shuffled = toShuffledXContent(asXContent(testInstance), xContentType, params, false);
T newInstance;
try (XContentParser parser = createParser(XContentFactory.xContent(xContentType), shuffled)) {
newInstance = doParseInstance(parser);
}
assertNotSame(newInstance, testInstance);
assertEquals(asPersistentTasks(testInstance).tasks().size(), asPersistentTasks(newInstance).tasks().size());
for (PersistentTasksCustomMetadata.PersistentTask<?> testTask : asPersistentTasks(testInstance).tasks()) {
PersistentTasksCustomMetadata.PersistentTask<TestPersistentTasksPlugin.TestParams> newTask =
(PersistentTasksCustomMetadata.PersistentTask<TestPersistentTasksPlugin.TestParams>) asPersistentTasks(newInstance).getTask(
testTask.getId()
);
assertNotNull(newTask);
// Things that should be serialized
assertEquals(testTask.getTaskName(), newTask.getTaskName());
assertEquals(testTask.getId(), newTask.getId());
assertEquals(testTask.getState(), newTask.getState());
assertEquals(testTask.getParams(), newTask.getParams());
// Things that shouldn't be serialized
assertEquals(0, newTask.getAllocationId());
assertNull(newTask.getExecutorNode());
}
}
@SuppressWarnings("unchecked")
public void testBuilder() {
T persistentTasks = null;
String lastKnownTask = "";
for (int i = 0; i < randomIntBetween(10, 100); i++) {
final PersistentTasks.Builder<?> builder;
if (randomBoolean()) {
builder = builder();
} else {
builder = builder(persistentTasks);
}
boolean changed = false;
for (int j = 0; j < randomIntBetween(1, 10); j++) {
switch (randomInt(3)) {
case 0:
lastKnownTask = addRandomTask(builder);
changed = true;
break;
case 1:
if (builder.hasTask(lastKnownTask)) {
changed = true;
builder.reassignTask(lastKnownTask, randomAssignment());
} else {
String fLastKnownTask = lastKnownTask;
expectThrows(ResourceNotFoundException.class, () -> builder.reassignTask(fLastKnownTask, randomAssignment()));
}
break;
case 2:
if (builder.hasTask(lastKnownTask)) {
changed = true;
builder.updateTaskState(
lastKnownTask,
randomBoolean() ? new TestPersistentTasksPlugin.State(randomAlphaOfLength(10)) : null
);
} else {
String fLastKnownTask = lastKnownTask;
expectThrows(ResourceNotFoundException.class, () -> builder.updateTaskState(fLastKnownTask, null));
}
break;
case 3:
if (builder.hasTask(lastKnownTask)) {
changed = true;
builder.removeTask(lastKnownTask);
} else {
String fLastKnownTask = lastKnownTask;
expectThrows(ResourceNotFoundException.class, () -> builder.removeTask(fLastKnownTask));
}
break;
}
}
assertEquals(changed, builder.isChanged());
persistentTasks = (T) builder.build();
}
}
@SuppressWarnings("unchecked")
public void testMinVersionSerialization() throws IOException {
PersistentTasks.Builder<?> tasks = builder();
TransportVersion minVersion = getFirstVersion();
TransportVersion streamVersion = randomVersionBetween(random(), minVersion, getPreviousVersion(TransportVersion.current()));
tasks.addTask(
"test_compatible_version",
TestPersistentTasksPlugin.TestPersistentTasksExecutor.NAME,
new TestPersistentTasksPlugin.TestParams(
null,
randomVersionBetween(random(), minVersion, streamVersion),
randomBoolean() ? Optional.empty() : Optional.of("test")
),
randomAssignment()
);
tasks.addTask(
"test_incompatible_version",
TestPersistentTasksPlugin.TestPersistentTasksExecutor.NAME,
new TestPersistentTasksPlugin.TestParams(
null,
randomVersionBetween(random(), getNextVersion(streamVersion), TransportVersion.current()),
randomBoolean() ? Optional.empty() : Optional.of("test")
),
randomAssignment()
);
final BytesStreamOutput out = new BytesStreamOutput();
out.setTransportVersion(streamVersion);
((T) tasks.build()).writeTo(out);
final StreamInput input = out.bytes().streamInput();
input.setTransportVersion(streamVersion);
PersistentTasksCustomMetadata read = new PersistentTasksCustomMetadata(
new NamedWriteableAwareStreamInput(input, getNamedWriteableRegistry())
);
assertThat(read.taskMap().keySet(), contains("test_compatible_version"));
}
private PersistentTasks asPersistentTasks(T testInstance) {
return (PersistentTasks) testInstance;
}
protected String addRandomTask(PersistentTasks.Builder<?> builder) {
String taskId = UUIDs.base64UUID();
builder.addTask(
taskId,
TestPersistentTasksPlugin.TestPersistentTasksExecutor.NAME,
new TestPersistentTasksPlugin.TestParams(randomAlphaOfLength(10)),
randomAssignment()
);
return taskId;
}
protected String pickRandomTask(PersistentTasks.Builder<?> testInstance) {
return randomFrom(new ArrayList<>(testInstance.getCurrentTaskIds()));
}
protected PersistentTasksCustomMetadata.Assignment randomAssignment() {
if (randomBoolean()) {
if (randomBoolean()) {
return NO_NODE_FOUND;
} else {
return new PersistentTasksCustomMetadata.Assignment(null, randomAlphaOfLength(10));
}
}
return new PersistentTasksCustomMetadata.Assignment(randomAlphaOfLength(10), randomAlphaOfLength(10));
}
protected abstract PersistentTasks.Builder<?> builder();
protected abstract PersistentTasks.Builder<?> builder(T testInstance);
}

View file

@ -0,0 +1,82 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.persistent;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.metadata.Metadata.ClusterCustom;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.State;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.XContentParser;
import java.util.Arrays;
public class ClusterPersistentTasksCustomMetadataTests extends BasePersistentTasksCustomMetadataTests<ClusterCustom> {
@Override
protected Writeable.Reader<ClusterCustom> instanceReader() {
return ClusterPersistentTasksCustomMetadata::new;
}
@Override
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(
Arrays.asList(
new Entry(ClusterCustom.class, ClusterPersistentTasksCustomMetadata.TYPE, ClusterPersistentTasksCustomMetadata::new),
new Entry(NamedDiff.class, ClusterPersistentTasksCustomMetadata.TYPE, ClusterPersistentTasksCustomMetadata::readDiffFrom),
new Entry(PersistentTaskParams.class, TestPersistentTasksExecutor.NAME, TestParams::new),
new Entry(PersistentTaskState.class, TestPersistentTasksExecutor.NAME, State::new)
)
);
}
@Override
protected Writeable.Reader<Diff<ClusterCustom>> diffReader() {
return ClusterPersistentTasksCustomMetadata::readDiffFrom;
}
@Override
protected ClusterPersistentTasksCustomMetadata doParseInstance(XContentParser parser) {
return ClusterPersistentTasksCustomMetadata.fromXContent(parser);
}
@Override
protected NamedXContentRegistry xContentRegistry() {
return new NamedXContentRegistry(
Arrays.asList(
new NamedXContentRegistry.Entry(
PersistentTaskParams.class,
new ParseField(TestPersistentTasksExecutor.NAME),
TestParams::fromXContent
),
new NamedXContentRegistry.Entry(
PersistentTaskState.class,
new ParseField(TestPersistentTasksExecutor.NAME),
State::fromXContent
)
)
);
}
@Override
protected PersistentTasks.Builder<?> builder() {
return ClusterPersistentTasksCustomMetadata.builder();
}
@Override
protected PersistentTasks.Builder<?> builder(ClusterCustom testInstance) {
return ClusterPersistentTasksCustomMetadata.builder((ClusterPersistentTasksCustomMetadata) testInstance);
}
}

View file

@ -8,7 +8,6 @@
*/
package org.elasticsearch.persistent;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
@ -20,68 +19,24 @@ import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Assignment;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Builder;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.State;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
import org.elasticsearch.test.ChunkedToXContentDiffableSerializationTestCase;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import static org.elasticsearch.cluster.metadata.Metadata.CONTEXT_MODE_GATEWAY;
import static org.elasticsearch.cluster.metadata.Metadata.CONTEXT_MODE_SNAPSHOT;
import static org.elasticsearch.persistent.PersistentTasksExecutor.NO_NODE_FOUND;
import static org.elasticsearch.test.TransportVersionUtils.getFirstVersion;
import static org.elasticsearch.test.TransportVersionUtils.getNextVersion;
import static org.elasticsearch.test.TransportVersionUtils.getPreviousVersion;
import static org.elasticsearch.test.TransportVersionUtils.randomVersionBetween;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.sameInstance;
public class PersistentTasksCustomMetadataTests extends ChunkedToXContentDiffableSerializationTestCase<ProjectCustom> {
@Override
protected PersistentTasksCustomMetadata createTestInstance() {
int numberOfTasks = randomInt(10);
PersistentTasksCustomMetadata.Builder tasks = PersistentTasksCustomMetadata.builder();
for (int i = 0; i < numberOfTasks; i++) {
String taskId = UUIDs.base64UUID();
tasks.addTask(taskId, TestPersistentTasksExecutor.NAME, new TestParams(randomAlphaOfLength(10)), randomAssignment());
if (randomBoolean()) {
// From time to time update status
tasks.updateTaskState(taskId, new State(randomAlphaOfLength(10)));
}
}
return tasks.build();
}
@Override
protected ProjectCustom mutateInstance(ProjectCustom instance) {
return null;// TODO implement https://github.com/elastic/elasticsearch/issues/25929
}
public class PersistentTasksCustomMetadataTests extends BasePersistentTasksCustomMetadataTests<ProjectCustom> {
@Override
protected Writeable.Reader<ProjectCustom> instanceReader() {
@ -100,38 +55,6 @@ public class PersistentTasksCustomMetadataTests extends ChunkedToXContentDiffabl
);
}
@Override
protected ProjectCustom makeTestChanges(ProjectCustom testInstance) {
Builder builder = PersistentTasksCustomMetadata.builder((PersistentTasksCustomMetadata) testInstance);
switch (randomInt(3)) {
case 0:
addRandomTask(builder);
break;
case 1:
if (builder.getCurrentTaskIds().isEmpty()) {
addRandomTask(builder);
} else {
builder.reassignTask(pickRandomTask(builder), randomAssignment());
}
break;
case 2:
if (builder.getCurrentTaskIds().isEmpty()) {
addRandomTask(builder);
} else {
builder.updateTaskState(pickRandomTask(builder), randomBoolean() ? new State(randomAlphaOfLength(10)) : null);
}
break;
case 3:
if (builder.getCurrentTaskIds().isEmpty()) {
addRandomTask(builder);
} else {
builder.removeTask(pickRandomTask(builder));
}
break;
}
return builder.build();
}
@Override
protected Writeable.Reader<Diff<ProjectCustom>> diffReader() {
return PersistentTasksCustomMetadata::readDiffFrom;
@ -142,16 +65,6 @@ public class PersistentTasksCustomMetadataTests extends ChunkedToXContentDiffabl
return PersistentTasksCustomMetadata.fromXContent(parser);
}
private String addRandomTask(Builder builder) {
String taskId = UUIDs.base64UUID();
builder.addTask(taskId, TestPersistentTasksExecutor.NAME, new TestParams(randomAlphaOfLength(10)), randomAssignment());
return taskId;
}
private String pickRandomTask(PersistentTasksCustomMetadata.Builder testInstance) {
return randomFrom(new ArrayList<>(testInstance.getCurrentTaskIds()));
}
@Override
protected NamedXContentRegistry xContentRegistry() {
return new NamedXContentRegistry(
@ -170,131 +83,14 @@ public class PersistentTasksCustomMetadataTests extends ChunkedToXContentDiffabl
);
}
@SuppressWarnings("unchecked")
public void testSerializationContext() throws Exception {
PersistentTasksCustomMetadata testInstance = createTestInstance();
for (int i = 0; i < randomInt(10); i++) {
testInstance = (PersistentTasksCustomMetadata) makeTestChanges(testInstance);
@Override
protected PersistentTasks.Builder<?> builder() {
return PersistentTasksCustomMetadata.builder();
}
ToXContent.MapParams params = new ToXContent.MapParams(
Collections.singletonMap(Metadata.CONTEXT_MODE_PARAM, randomFrom(CONTEXT_MODE_SNAPSHOT, CONTEXT_MODE_GATEWAY))
);
XContentType xContentType = randomFrom(XContentType.values());
BytesReference shuffled = toShuffledXContent(asXContent(testInstance), xContentType, params, false);
PersistentTasksCustomMetadata newInstance;
try (XContentParser parser = createParser(XContentFactory.xContent(xContentType), shuffled)) {
newInstance = doParseInstance(parser);
}
assertNotSame(newInstance, testInstance);
assertEquals(testInstance.tasks().size(), newInstance.tasks().size());
for (PersistentTask<?> testTask : testInstance.tasks()) {
PersistentTask<TestParams> newTask = (PersistentTask<TestParams>) newInstance.getTask(testTask.getId());
assertNotNull(newTask);
// Things that should be serialized
assertEquals(testTask.getTaskName(), newTask.getTaskName());
assertEquals(testTask.getId(), newTask.getId());
assertEquals(testTask.getState(), newTask.getState());
assertEquals(testTask.getParams(), newTask.getParams());
// Things that shouldn't be serialized
assertEquals(0, newTask.getAllocationId());
assertNull(newTask.getExecutorNode());
}
}
public void testBuilder() {
PersistentTasksCustomMetadata persistentTasks = null;
String lastKnownTask = "";
for (int i = 0; i < randomIntBetween(10, 100); i++) {
final Builder builder;
if (randomBoolean()) {
builder = PersistentTasksCustomMetadata.builder();
} else {
builder = PersistentTasksCustomMetadata.builder(persistentTasks);
}
boolean changed = false;
for (int j = 0; j < randomIntBetween(1, 10); j++) {
switch (randomInt(3)) {
case 0:
lastKnownTask = addRandomTask(builder);
changed = true;
break;
case 1:
if (builder.hasTask(lastKnownTask)) {
changed = true;
builder.reassignTask(lastKnownTask, randomAssignment());
} else {
String fLastKnownTask = lastKnownTask;
expectThrows(ResourceNotFoundException.class, () -> builder.reassignTask(fLastKnownTask, randomAssignment()));
}
break;
case 2:
if (builder.hasTask(lastKnownTask)) {
changed = true;
builder.updateTaskState(lastKnownTask, randomBoolean() ? new State(randomAlphaOfLength(10)) : null);
} else {
String fLastKnownTask = lastKnownTask;
expectThrows(ResourceNotFoundException.class, () -> builder.updateTaskState(fLastKnownTask, null));
}
break;
case 3:
if (builder.hasTask(lastKnownTask)) {
changed = true;
builder.removeTask(lastKnownTask);
} else {
String fLastKnownTask = lastKnownTask;
expectThrows(ResourceNotFoundException.class, () -> builder.removeTask(fLastKnownTask));
}
break;
}
}
assertEquals(changed, builder.isChanged());
persistentTasks = builder.build();
}
}
public void testMinVersionSerialization() throws IOException {
PersistentTasksCustomMetadata.Builder tasks = PersistentTasksCustomMetadata.builder();
TransportVersion minVersion = getFirstVersion();
TransportVersion streamVersion = randomVersionBetween(random(), minVersion, getPreviousVersion(TransportVersion.current()));
tasks.addTask(
"test_compatible_version",
TestPersistentTasksExecutor.NAME,
new TestParams(
null,
randomVersionBetween(random(), minVersion, streamVersion),
randomBoolean() ? Optional.empty() : Optional.of("test")
),
randomAssignment()
);
tasks.addTask(
"test_incompatible_version",
TestPersistentTasksExecutor.NAME,
new TestParams(
null,
randomVersionBetween(random(), getNextVersion(streamVersion), TransportVersion.current()),
randomBoolean() ? Optional.empty() : Optional.of("test")
),
randomAssignment()
);
final BytesStreamOutput out = new BytesStreamOutput();
out.setTransportVersion(streamVersion);
tasks.build().writeTo(out);
final StreamInput input = out.bytes().streamInput();
input.setTransportVersion(streamVersion);
PersistentTasksCustomMetadata read = new PersistentTasksCustomMetadata(
new NamedWriteableAwareStreamInput(input, getNamedWriteableRegistry())
);
assertThat(read.taskMap().keySet(), contains("test_compatible_version"));
@Override
protected PersistentTasks.Builder<?> builder(ProjectCustom testInstance) {
return PersistentTasksCustomMetadata.builder((PersistentTasksCustomMetadata) testInstance);
}
public void testDisassociateDeadNodes_givenNoPersistentTasks() {
@ -379,7 +175,7 @@ public class PersistentTasksCustomMetadataTests extends ChunkedToXContentDiffabl
assertEquals(originalTasks.getTask("assigned-task"), returnedTasks.getTask("assigned-task"));
assertNotEquals(originalTasks.getTask("task-on-deceased-node"), returnedTasks.getTask("task-on-deceased-node"));
assertEquals(PersistentTasksCustomMetadata.LOST_NODE_ASSIGNMENT, returnedTasks.getTask("task-on-deceased-node").getAssignment());
assertEquals(PersistentTasks.LOST_NODE_ASSIGNMENT, returnedTasks.getTask("task-on-deceased-node").getAssignment());
}
private PersistentTaskParams emptyTaskParams(String taskName) {
@ -406,15 +202,4 @@ public class PersistentTasksCustomMetadataTests extends ChunkedToXContentDiffabl
}
};
}
private Assignment randomAssignment() {
if (randomBoolean()) {
if (randomBoolean()) {
return NO_NODE_FOUND;
} else {
return new Assignment(null, randomAlphaOfLength(10));
}
}
return new Assignment(randomAlphaOfLength(10), randomAlphaOfLength(10));
}
}

View file

@ -28,7 +28,7 @@ public class PersistentTasksExecutorResponseTests extends AbstractWireSerializin
TestPersistentTasksExecutor.NAME,
new TestPersistentTasksPlugin.TestParams("test"),
randomLong(),
PersistentTasksCustomMetadata.INITIAL_ASSIGNMENT
PersistentTasks.INITIAL_ASSIGNMENT
)
);
} else {