Make downsampling project-aware (#124000)

Allows downsampling to work on multiple projects. The downsample transport action now resolves the target project through ProjectResolver and uses project-scoped index blocks, routing tables, metadata builders, and persistent-task waits, while the shard persistent task executor looks up shard routing through the project-aware global routing table. A sketch of the recurring pattern follows the change summary below.
Niels Bauman 2025-03-06 12:55:04 +01:00 committed by GitHub
parent f360d6b781
commit 2a7eb6e117
9 changed files with 123 additions and 29 deletions
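The change follows one pattern throughout: resolve the request's project first, then perform every index, block, and routing-table lookup against that project instead of the implicit default. The sketch below illustrates that pattern using the project-aware calls that appear in this diff (ProjectResolver.getProjectMetadata, plus the ProjectId overloads of blocks().indexBlocked(...) and routingTable(...)); the wrapper class, method name, and exception messages are illustrative assumptions, not code from the commit.

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.index.IndexNotFoundException;

// Illustrative helper, not part of the commit: shows how a master-node action
// resolves the request's project before touching indices, blocks, or routing.
final class ProjectAwareDownsampleChecks {

    private final ProjectResolver projectResolver;

    ProjectAwareDownsampleChecks(ProjectResolver projectResolver) {
        this.projectResolver = projectResolver;
    }

    IndexMetadata resolveReadOnlySourceIndex(ClusterState state, String sourceIndexName) {
        // Resolve the project for the current request instead of assuming the default project.
        final ProjectMetadata project = projectResolver.getProjectMetadata(state);

        // Index lookups are scoped to the resolved project.
        final IndexMetadata sourceIndex = project.index(sourceIndexName);
        if (sourceIndex == null) {
            throw new IndexNotFoundException(sourceIndexName);
        }

        // Block checks and routing-table access take the project id explicitly.
        if (state.blocks().indexBlocked(project.id(), ClusterBlockLevel.WRITE, sourceIndexName) == false) {
            throw new ElasticsearchException("source index [" + sourceIndexName + "] must be read-only before downsampling");
        }
        final RoutingTable projectRoutingTable = state.routingTable(project.id()); // used for index-name validation in the real action
        assert projectRoutingTable != null;
        return sourceIndex;
    }
}

Call sites that previously reached for state.metadata().getProject() with no argument now either go through this resolver path or receive an explicit ProjectId, as the hunks below show.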


@@ -34,3 +34,15 @@ if (buildParams.inFipsJvm){
// This test cluster is using a BASIC license and FIPS 140 mode is not supported in BASIC
tasks.named("yamlRestTest").configure{enabled = false }
}
configurations {
basicRestSpecs {
attributes {
attribute(ArtifactTypeDefinition.ARTIFACT_TYPE_ATTRIBUTE, ArtifactTypeDefinition.DIRECTORY_TYPE)
}
}
}
artifacts {
basicRestSpecs(new File(projectDir, "src/yamlRestTest/resources/rest-api-spec/test"))
}


@@ -116,7 +116,7 @@ public class DownsampleShardPersistentTaskExecutor extends PersistentTasksExecut
public void validate(DownsampleShardTaskParams params, ClusterState clusterState) {
// This is just a pre-check; it does not prevent the task from being aborted if the source index disappears
// after the persistent task was initially created.
var indexShardRouting = clusterState.routingTable().shardRoutingTable(params.shardId().getIndexName(), params.shardId().id());
var indexShardRouting = findShardRoutingTable(params.shardId(), clusterState);
if (indexShardRouting == null) {
throw new ShardNotFoundException(params.shardId());
}
@@ -178,11 +178,8 @@ public class DownsampleShardPersistentTaskExecutor extends PersistentTasksExecut
}
private static IndexShardRoutingTable findShardRoutingTable(ShardId shardId, ClusterState clusterState) {
var indexRoutingTable = clusterState.routingTable().index(shardId.getIndexName());
if (indexRoutingTable != null) {
return indexRoutingTable.shard(shardId.getId());
}
return null;
var indexRoutingTable = clusterState.globalRoutingTable().indexRouting(clusterState.metadata(), shardId.getIndex());
return indexRoutingTable.map(routingTable -> routingTable.shard(shardId.getId())).orElse(null);
}
static void realNodeOperation(
@@ -327,6 +324,7 @@ public class DownsampleShardPersistentTaskExecutor extends PersistentTasksExecut
realNodeOperation(client, indicesService, downsampleMetrics, request.task, request.params, request.lastDownsampleTsid);
listener.onResponse(ActionResponse.Empty.INSTANCE);
}
}
}
}
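Because the extracted diff above lost its +/- markers, the old and new bodies of findShardRoutingTable are interleaved. Consolidated, the method after this change reads as below; this is a faithful restatement of the new hunk, wrapped in a placeholder class only so the snippet is self-contained. The project-aware global routing table resolves the owning project from the cluster metadata, which is presumably why the persistent task params do not need to carry a ProjectId.

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.index.shard.ShardId;

final class ShardLookupSketch { // placeholder holder class, not in the commit
    static IndexShardRoutingTable findShardRoutingTable(ShardId shardId, ClusterState clusterState) {
        // Look up the index through the project-aware global routing table; an empty Optional
        // means the index (or its project) is gone, and the caller raises ShardNotFoundException.
        var indexRoutingTable = clusterState.globalRoutingTable().indexRouting(clusterState.metadata(), shardId.getIndex());
        return indexRoutingTable.map(routingTable -> routingTable.shard(shardId.getId())).orElse(null);
    }
}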


@@ -34,9 +34,10 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexMetadata.DownsampleTaskStatus;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.routing.allocation.DataTier;
import org.elasticsearch.cluster.routing.allocation.allocator.AllocationActionListener;
import org.elasticsearch.cluster.service.ClusterService;
@@ -118,6 +119,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
private final ThreadContext threadContext;
private final PersistentTasksService persistentTasksService;
private final DownsampleMetrics downsampleMetrics;
private final ProjectResolver projectResolver;
private static final Set<String> FORBIDDEN_SETTINGS = Set.of(
IndexSettings.DEFAULT_PIPELINE.getKey(),
@@ -154,6 +156,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
ThreadPool threadPool,
MetadataCreateIndexService metadataCreateIndexService,
ActionFilters actionFilters,
ProjectResolver projectResolver,
IndexScopedSettings indexScopedSettings,
PersistentTasksService persistentTasksService,
DownsampleMetrics downsampleMetrics
@@ -170,6 +173,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
this.client = new OriginSettingClient(client, ClientHelper.ROLLUP_ORIGIN);
this.indicesService = indicesService;
this.metadataCreateIndexService = metadataCreateIndexService;
this.projectResolver = projectResolver;
this.indexScopedSettings = indexScopedSettings;
this.threadContext = threadPool.getThreadContext();
this.taskQueue = clusterService.createTaskQueue("downsample", Priority.URGENT, STATE_UPDATE_TASK_EXECUTOR);
@@ -223,7 +227,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
}
}
}
final ProjectMetadata projectMetadata = state.metadata().getProject();
final ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(state);
// Assert source index exists
IndexMetadata sourceIndexMetadata = projectMetadata.index(sourceIndexName);
if (sourceIndexMetadata == null) {
@@ -250,7 +254,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
}
// Assert source index is read-only
if (state.blocks().indexBlocked(ClusterBlockLevel.WRITE, sourceIndexName) == false) {
if (state.blocks().indexBlocked(projectMetadata.id(), ClusterBlockLevel.WRITE, sourceIndexName) == false) {
recordInvalidConfigurationMetrics(startTime);
listener.onFailure(
new ElasticsearchException(
@@ -268,7 +272,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
return;
}
try {
MetadataCreateIndexService.validateIndexName(downsampleIndexName, projectMetadata, state.routingTable());
MetadataCreateIndexService.validateIndexName(downsampleIndexName, projectMetadata, state.routingTable(projectMetadata.id()));
} catch (ResourceAlreadyExistsException e) {
// ignore index already exists
}
@@ -356,6 +360,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
// 3. Create downsample index
createDownsampleIndex(
projectMetadata.id(),
downsampleIndexName,
minNumReplicas,
sourceIndexMetadata,
@@ -364,6 +369,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
ActionListener.wrap(createIndexResp -> {
if (createIndexResp.isAcknowledged()) {
performShardDownsampling(
projectMetadata.id(),
request,
delegate,
minNumReplicas,
@@ -386,13 +392,14 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
parentTask,
request.getWaitTimeout(),
startTime,
clusterService.state().metadata().getProject(),
clusterService.state().metadata().getProject(projectMetadata.id()),
listener
)) {
logger.info("Downsample tasks are not created, because a previous execution already completed downsampling");
return;
}
performShardDownsampling(
projectMetadata.id(),
request,
delegate,
minNumReplicas,
@@ -449,6 +456,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
.refresh(
refreshRequest,
new RefreshDownsampleIndexActionListener(
projectMetadata.id(),
listener,
parentTask,
targetIndexMetadata.getIndex().getName(),
@@ -463,6 +471,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
// 3. Downsample index created or already exists (in case of retry). Run the downsample indexer persistent task on each shard.
private void performShardDownsampling(
final ProjectId projectId,
DownsampleAction.Request request,
ActionListener<AcknowledgedResponse> listener,
int minNumReplicas,
@@ -525,6 +534,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
if (countDown.decrementAndGet() == 0) {
logger.info("All downsampling tasks completed [" + numberOfShards + "]");
updateTargetIndexSettingStep(
projectId,
request,
listener,
minNumReplicas,
@@ -552,6 +562,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
TimeValue.THIRTY_SECONDS /* TODO should this be configurable? longer by default? infinite? */,
ActionListener.wrap(
startedTask -> persistentTasksService.waitForPersistentTaskCondition(
projectId,
startedTask.getId(),
predicate,
request.getWaitTimeout(),
@@ -561,6 +572,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
if (e instanceof ResourceAlreadyExistsException) {
logger.info("Task [" + persistentTaskId + "] already exists. Waiting.");
persistentTasksService.waitForPersistentTaskCondition(
projectId,
persistentTaskId,
predicate,
request.getWaitTimeout(),
@@ -577,6 +589,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
// 4. Make downsample index read-only and set the correct number of replicas
private void updateTargetIndexSettingStep(
ProjectId projectId,
final DownsampleAction.Request request,
final ActionListener<AcknowledgedResponse> listener,
int minNumReplicas,
@@ -607,6 +620,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
.updateSettings(
updateSettingsReq,
new UpdateDownsampleIndexSettingsActionListener(
projectId,
listener,
parentTask,
downsampleIndexName,
@@ -901,6 +915,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
}
private void createDownsampleIndex(
ProjectId projectId,
String downsampleIndexName,
int minNumReplicas,
IndexMetadata sourceIndexMetadata,
@@ -941,6 +956,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
CreateIndexClusterStateUpdateRequest createIndexClusterStateUpdateRequest = new CreateIndexClusterStateUpdateRequest(
"downsample",
projectId,
downsampleIndexName,
downsampleIndexName
).settings(builder.build()).mappings(mapping).waitForActiveShards(ActiveShardCount.ONE);
@@ -983,6 +999,8 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
* Refreshes the downsample target index
*/
class UpdateDownsampleIndexSettingsActionListener implements ActionListener<AcknowledgedResponse> {
final ProjectId projectId;
final ActionListener<AcknowledgedResponse> listener;
final TaskId parentTask;
final String downsampleIndexName;
@@ -990,12 +1008,14 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
final long startTime;
UpdateDownsampleIndexSettingsActionListener(
ProjectId projectId,
final ActionListener<AcknowledgedResponse> listener,
final TaskId parentTask,
final String downsampleIndexName,
final TimeValue timeout,
final long startTime
) {
this.projectId = projectId;
this.listener = listener;
this.parentTask = parentTask;
this.downsampleIndexName = downsampleIndexName;
@@ -1009,7 +1029,10 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
request.setParentTask(parentTask);
client.admin()
.indices()
.refresh(request, new RefreshDownsampleIndexActionListener(listener, parentTask, downsampleIndexName, timeout, startTime));
.refresh(
request,
new RefreshDownsampleIndexActionListener(projectId, listener, parentTask, downsampleIndexName, timeout, startTime)
);
}
@Override
@@ -1025,6 +1048,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
*/
class RefreshDownsampleIndexActionListener implements ActionListener<BroadcastResponse> {
private final ProjectId projectId;
private final ActionListener<AcknowledgedResponse> actionListener;
private final TaskId parentTask;
private final String downsampleIndexName;
@@ -1032,12 +1056,14 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
private final long startTime;
RefreshDownsampleIndexActionListener(
ProjectId projectId,
final ActionListener<AcknowledgedResponse> actionListener,
TaskId parentTask,
final String downsampleIndexName,
final TimeValue timeout,
final long startTime
) {
this.projectId = projectId;
this.actionListener = actionListener;
this.parentTask = parentTask;
this.downsampleIndexName = downsampleIndexName;
@@ -1059,22 +1085,21 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
@Override
public ClusterState execute(ClusterState currentState) {
final Metadata metadata = currentState.metadata();
final IndexMetadata downsampleIndex = metadata.getProject()
.index(metadata.getProject().index(downsampleIndexName).getIndex());
final ProjectMetadata project = currentState.metadata().getProject(projectId);
final IndexMetadata downsampleIndex = project.index(downsampleIndexName);
if (IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(downsampleIndex.getSettings()) == DownsampleTaskStatus.SUCCESS) {
return currentState;
}
final Metadata.Builder metadataBuilder = Metadata.builder(metadata);
metadataBuilder.updateSettings(
final ProjectMetadata.Builder projectBuilder = ProjectMetadata.builder(project);
projectBuilder.updateSettings(
Settings.builder()
.put(downsampleIndex.getSettings())
.put(IndexMetadata.INDEX_DOWNSAMPLE_STATUS.getKey(), DownsampleTaskStatus.SUCCESS)
.build(),
downsampleIndexName
);
return ClusterState.builder(currentState).metadata(metadataBuilder.build()).build();
return ClusterState.builder(currentState).putProjectMetadata(projectBuilder).build();
}
},
timeout

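The other recurring change in TransportDownsampleAction is the cluster-state update that records a successful downsample: it now edits the index settings through a project-scoped ProjectMetadata.Builder and attaches the result with putProjectMetadata, rather than rebuilding the top-level Metadata. Consolidated from the last hunk above (only the holder class and method name are illustrative), the post-change logic reads:

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexMetadata.DownsampleTaskStatus;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.common.settings.Settings;

final class MarkDownsampleSuccessSketch { // placeholder holder class, not in the commit
    static ClusterState markSuccess(ClusterState currentState, ProjectId projectId, String downsampleIndexName) {
        final ProjectMetadata project = currentState.metadata().getProject(projectId);
        final IndexMetadata downsampleIndex = project.index(downsampleIndexName);
        if (IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(downsampleIndex.getSettings()) == DownsampleTaskStatus.SUCCESS) {
            return currentState; // a previous run already recorded success
        }
        // Update the setting through the project-scoped builder instead of Metadata.builder(...).
        final ProjectMetadata.Builder projectBuilder = ProjectMetadata.builder(project);
        projectBuilder.updateSettings(
            Settings.builder()
                .put(downsampleIndex.getSettings())
                .put(IndexMetadata.INDEX_DOWNSAMPLE_STATUS.getKey(), DownsampleTaskStatus.SUCCESS)
                .build(),
            downsampleIndexName
        );
        return ClusterState.builder(currentState).putProjectMetadata(projectBuilder).build();
    }
}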

@@ -32,6 +32,7 @@ import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.compress.CompressedXContent;
@@ -1194,7 +1195,7 @@ public class DownsampleActionSingleNodeTests extends ESSingleNodeTestCase {
.get()
.getState()
.getMetadata()
.getProject()
.getProject(Metadata.DEFAULT_PROJECT_ID)
.index(sourceIndex);
final IndicesService indicesService = getInstanceFromNode(IndicesService.class);
final MapperService mapperService = indicesService.createIndexMapperServiceForValidation(indexMetadata);


@@ -11,6 +11,7 @@ import org.elasticsearch.action.downsample.DownsampleConfig;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
@@ -41,30 +42,38 @@ import static org.mockito.Mockito.mock;
public class DownsampleShardPersistentTaskExecutorTests extends ESTestCase {
private ProjectId projectId;
private ClusterState initialClusterState;
private DownsampleShardPersistentTaskExecutor executor;
@Before
public void setup() {
projectId = randomProjectIdOrDefault();
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
Instant start = now.minus(2, ChronoUnit.HOURS);
Instant end = now.plus(40, ChronoUnit.MINUTES);
initialClusterState = DataStreamTestHelper.getClusterStateWithDataStream("metrics-app1", List.of(new Tuple<>(start, end)));
initialClusterState = DataStreamTestHelper.getClusterStateWithDataStream(
projectId,
"metrics-app1",
List.of(new Tuple<>(start, end))
);
executor = new DownsampleShardPersistentTaskExecutor(mock(Client.class), DownsampleShardTask.TASK_NAME, mock(Executor.class));
}
public void testGetAssignment() {
var backingIndex = initialClusterState.metadata().getProject().dataStreams().get("metrics-app1").getWriteIndex();
var backingIndex = initialClusterState.metadata().getProject(projectId).dataStreams().get("metrics-app1").getWriteIndex();
var node = newNode();
var shardId = new ShardId(backingIndex, 0);
var clusterState = ClusterState.builder(initialClusterState)
.nodes(new DiscoveryNodes.Builder().add(node).build())
.routingTable(
.putRoutingTable(
projectId,
RoutingTable.builder()
.add(
IndexRoutingTable.builder(backingIndex)
.addShard(shardRoutingBuilder(shardId, node.getId(), true, STARTED).withRecoverySource(null).build())
)
.build()
)
.build();
@@ -83,17 +92,19 @@ public class DownsampleShardPersistentTaskExecutorTests extends ESTestCase {
}
public void testGetAssignmentMissingIndex() {
var backingIndex = initialClusterState.metadata().getProject().dataStreams().get("metrics-app1").getWriteIndex();
var backingIndex = initialClusterState.metadata().getProject(projectId).dataStreams().get("metrics-app1").getWriteIndex();
var node = newNode();
var shardId = new ShardId(backingIndex, 0);
var clusterState = ClusterState.builder(initialClusterState)
.nodes(new DiscoveryNodes.Builder().add(node).build())
.routingTable(
.putRoutingTable(
projectId,
RoutingTable.builder()
.add(
IndexRoutingTable.builder(backingIndex)
.addShard(shardRoutingBuilder(shardId, node.getId(), true, STARTED).withRecoverySource(null).build())
)
.build()
)
.build();