[Transform] Check alias during update (#124825)

When the Transform System Index has been reindexed and aliased, we
should check the Transform Update index against the alias when updating
the Transform Config.
This commit is contained in:
Pat Whelan 2025-05-20 09:16:52 -04:00 committed by GitHub
parent be396aef85
commit c0f5e00378
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 98 additions and 6 deletions

View file

@ -0,0 +1,5 @@
pr: 124825
summary: Check alias during update
area: Transform
type: bug
issues: []

View file

@ -16,6 +16,8 @@ import org.elasticsearch.common.ValidationException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentType;
@ -25,12 +27,16 @@ import org.elasticsearch.xpack.core.transform.TransformConfigVersion;
import org.elasticsearch.xpack.core.transform.TransformDeprecations;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.action.GetTransformAction;
import org.elasticsearch.xpack.core.transform.action.PutTransformAction;
import org.elasticsearch.xpack.core.transform.action.StartTransformAction;
import org.elasticsearch.xpack.core.transform.action.StopTransformAction;
import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction;
import org.elasticsearch.xpack.core.transform.transforms.DestConfig;
import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdate;
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfigTests;
import org.elasticsearch.xpack.core.transform.utils.TransformConfigVersionUtils;
import org.elasticsearch.xpack.transform.TransformSingleNodeTestCase;
import org.elasticsearch.xpack.transform.persistence.TransformInternalIndex;
@ -253,4 +259,65 @@ public class TransformOldTransformsIT extends TransformSingleNodeTestCase {
assertMaxPageSearchSizeInSettings(transformId, expectedMaxPageSearchSize);
}
public void testMigratedTransformIndex() {
// create transform
var sourceIndex = "source-index";
createSourceIndex(sourceIndex);
var transformId = "transform-migrated-system-index";
var sourceConfig = new SourceConfig(sourceIndex);
var destConfig = new DestConfig("some-dest-index", null, null);
var config = new TransformConfig(
transformId,
sourceConfig,
destConfig,
null,
null,
null,
PivotConfigTests.randomPivotConfig(),
null,
null,
null,
null,
null,
null,
null
);
var putTransform = new PutTransformAction.Request(config, true, TimeValue.THIRTY_SECONDS);
assertTrue(client().execute(PutTransformAction.INSTANCE, putTransform).actionGet().isAcknowledged());
// simulate migration by reindexing and aliasing
var newSystemIndex = TransformInternalIndexConstants.LATEST_INDEX_NAME + "-reindexed";
var reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(TransformInternalIndexConstants.LATEST_INDEX_NAME);
reindexRequest.setDestIndex(newSystemIndex);
reindexRequest.setRefresh(true);
client().execute(ReindexAction.INSTANCE, reindexRequest).actionGet();
var aliasesRequest = admin().indices().prepareAliases(TimeValue.THIRTY_SECONDS, TimeValue.THIRTY_SECONDS);
aliasesRequest.removeIndex(TransformInternalIndexConstants.LATEST_INDEX_NAME);
aliasesRequest.addAlias(newSystemIndex, TransformInternalIndexConstants.LATEST_INDEX_NAME);
aliasesRequest.execute().actionGet();
// update should succeed
var updateConfig = new TransformConfigUpdate(
sourceConfig,
new DestConfig("some-new-dest-index", null, null),
null,
null,
null,
null,
null,
null
);
var updateRequest = new UpdateTransformAction.Request(updateConfig, transformId, true, TimeValue.THIRTY_SECONDS);
client().execute(UpdateTransformAction.INSTANCE, updateRequest).actionGet();
// verify update succeeded
var getTransformRequest = new GetTransformAction.Request(transformId);
var getTransformResponse = client().execute(GetTransformAction.INSTANCE, getTransformRequest).actionGet();
var transformConfig = getTransformResponse.getTransformConfigurations().get(0);
assertThat(transformConfig.getDestination().getIndex(), equalTo("some-new-dest-index"));
}
}

View file

@ -78,7 +78,8 @@ public class TransformConfigManagerTests extends TransformSingleNodeTestCase {
@Before
public void createComponents() {
clusterService = mock(ClusterService.class);
clusterService = mock();
when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
transformConfigManager = new IndexBasedTransformConfigManager(
clusterService,
TestIndexNameExpressionResolver.newInstance(),

View file

@ -252,7 +252,7 @@ public class TransformUpdater {
long lastCheckpoint = currentState.v1().getTransformState().getCheckpoint();
// if: the state is stored on the latest index, it does not need an update
if (currentState.v2().getIndex().equals(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME)) {
if (transformConfigManager.isLatestTransformIndex(currentState.v2().getIndex())) {
listener.onResponse(lastCheckpoint);
return;
}
@ -283,8 +283,7 @@ public class TransformUpdater {
ActionListener<Boolean> listener
) {
transformConfigManager.getTransformCheckpointForUpdate(transformId, lastCheckpoint, ActionListener.wrap(checkpointAndVersion -> {
if (checkpointAndVersion == null
|| checkpointAndVersion.v2().getIndex().equals(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME)) {
if (checkpointAndVersion == null || transformConfigManager.isLatestTransformIndex(checkpointAndVersion.v2().getIndex())) {
listener.onResponse(true);
return;
}

View file

@ -170,7 +170,7 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
listener.onFailure(conflictStatusException("Cannot update Transform while the Transform feature is upgrading."));
return;
}
if (seqNoPrimaryTermAndIndex.getIndex().equals(TransformInternalIndexConstants.LATEST_INDEX_NAME)) {
if (isLatestTransformIndex(seqNoPrimaryTermAndIndex.getIndex())) {
// update the config in the same, current index using optimistic concurrency control
putTransformConfiguration(transformConfig, DocWriteRequest.OpType.INDEX, seqNoPrimaryTermAndIndex, listener);
} else {
@ -180,6 +180,21 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
}
}
@Override
public boolean isLatestTransformIndex(String indexName) {
if (TransformInternalIndexConstants.LATEST_INDEX_NAME.equals(indexName)) {
return true;
}
// in some cases, the System Index gets reindexed and LATEST_INDEX_NAME is now an alias pointing to that reindexed index
// this mostly likely happens after the SystemIndexMigrator ran
// we need to check if the LATEST_INDEX_NAME is now an alias and points to the indexName
var metadata = clusterService.state().projectState().metadata();
var indicesForAlias = metadata.aliasedIndices(TransformInternalIndexConstants.LATEST_INDEX_NAME);
var index = metadata.index(indexName);
return index != null && indicesForAlias.contains(index.getIndex());
}
@Override
public void deleteOldTransformConfigurations(String transformId, ActionListener<Boolean> listener) {
if (isUpgrading()) {
@ -697,7 +712,7 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
// could have been called, see gh#80073
indexRequest.opType(DocWriteRequest.OpType.INDEX);
// if on the latest index use optimistic concurrency control in addition
if (seqNoPrimaryTermAndIndex.getIndex().equals(TransformInternalIndexConstants.LATEST_INDEX_NAME)) {
if (isLatestTransformIndex(seqNoPrimaryTermAndIndex.getIndex())) {
indexRequest.setIfSeqNo(seqNoPrimaryTermAndIndex.getSeqNo())
.setIfPrimaryTerm(seqNoPrimaryTermAndIndex.getPrimaryTerm());
}

View file

@ -15,6 +15,7 @@ import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformStoredDoc;
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
import java.util.Collection;
import java.util.Collections;
@ -206,4 +207,8 @@ public interface TransformConfigManager {
void getTransformStoredDocs(Collection<String> transformIds, TimeValue timeout, ActionListener<List<TransformStoredDoc>> listener);
void refresh(ActionListener<Boolean> listener);
default boolean isLatestTransformIndex(String indexName) {
return TransformInternalIndexConstants.LATEST_INDEX_NAME.equals(indexName);
}
}