[Transform] Allow passing additional settings during transform internal index creation. (#97312)

This commit is contained in:
Przemysław Witek 2023-07-03 13:27:28 +02:00 committed by GitHub
parent 13416f7367
commit 7f9b9a5677
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 48 additions and 17 deletions

View file

@ -752,7 +752,7 @@ public class TransformConfigManagerTests extends TransformSingleNodeTestCase {
for (String index : indexes) {
IndexMetadata.Builder builder = new IndexMetadata.Builder(index).settings(
Settings.builder()
.put(TransformInternalIndex.settings())
.put(TransformInternalIndex.settings(Settings.EMPTY))
.put(IndexMetadata.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT)
.build()
).numberOfReplicas(0).numberOfShards(1).putMapping(Strings.toString(TransformInternalIndex.mappings()));

View file

@ -117,7 +117,6 @@ import java.io.UncheckedIOException;
import java.time.Clock;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
@ -278,13 +277,14 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
// the transform services should have been created
assert transformServices.get() != null;
return Collections.singletonList(
return List.of(
new TransformPersistentTasksExecutor(
client,
transformServices.get(),
threadPool,
clusterService,
settingsModule.getSettings(),
getTransformInternalIndexAdditionalSettings(),
expressionResolver
)
);
@ -327,7 +327,7 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
try {
return Collections.singletonList(TransformInternalIndex.getSystemIndexDescriptor());
return List.of(TransformInternalIndex.getSystemIndexDescriptor(getTransformInternalIndexAdditionalSettings()));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
@ -443,4 +443,8 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
public boolean includeNodeInfo() {
return true;
}
public Settings getTransformInternalIndexAdditionalSettings() {
return Settings.EMPTY;
}
}

View file

@ -88,13 +88,13 @@ public final class TransformInternalIndex {
public static final String BOOLEAN = "boolean";
public static final String FLATTENED = "flattened";
public static SystemIndexDescriptor getSystemIndexDescriptor() throws IOException {
public static SystemIndexDescriptor getSystemIndexDescriptor(Settings transformInternalIndexAdditionalSettings) throws IOException {
return SystemIndexDescriptor.builder()
.setIndexPattern(TransformInternalIndexConstants.INDEX_NAME_PATTERN)
.setPrimaryIndex(TransformInternalIndexConstants.LATEST_INDEX_NAME)
.setDescription("Contains Transform configuration data")
.setMappings(mappings())
.setSettings(settings())
.setSettings(settings(transformInternalIndexAdditionalSettings))
.setVersionMetaKey("version")
.setOrigin(TRANSFORM_ORIGIN)
.build();
@ -149,11 +149,13 @@ public final class TransformInternalIndex {
return builder;
}
public static Settings settings() {
public static Settings settings(Settings additionalSettings) {
assert additionalSettings != null;
return Settings.builder()
// the configurations are expected to be small
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-1")
.put(additionalSettings)
.build();
}
@ -389,7 +391,12 @@ public final class TransformInternalIndex {
* Without this check the data nodes will create an internal index with dynamic
* mappings when indexing a document, but we want our own well defined mappings.
*/
public static void createLatestVersionedIndexIfRequired(ClusterService clusterService, Client client, ActionListener<Void> listener) {
public static void createLatestVersionedIndexIfRequired(
ClusterService clusterService,
Client client,
Settings transformInternalIndexAdditionalSettings,
ActionListener<Void> listener
) {
ClusterState state = clusterService.state();
// The check for existence is against local cluster state, so very cheap
if (hasLatestVersionedIndex(state)) {
@ -405,7 +412,7 @@ public final class TransformInternalIndex {
// Creating the index involves communication with the master node, so it's more expensive but much rarer
try {
CreateIndexRequest request = new CreateIndexRequest(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME).settings(
settings()
settings(transformInternalIndexAdditionalSettings)
)
.mapping(mappings())
.origin(TRANSFORM_ORIGIN)

View file

@ -74,6 +74,7 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<Tr
private final ClusterService clusterService;
private final IndexNameExpressionResolver resolver;
private final TransformAuditor auditor;
private final Settings transformInternalIndexAdditionalSettings;
private volatile int numFailureRetries;
public TransformPersistentTasksExecutor(
@ -82,6 +83,7 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<Tr
ThreadPool threadPool,
ClusterService clusterService,
Settings settings,
Settings transformInternalIndexAdditionalSettings,
IndexNameExpressionResolver resolver
) {
super(TransformField.TASK_NAME, ThreadPool.Names.GENERIC);
@ -93,6 +95,7 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<Tr
this.auditor = transformServices.getAuditor();
this.numFailureRetries = Transform.NUM_FAILURE_RETRIES_SETTING.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(Transform.NUM_FAILURE_RETRIES_SETTING, this::setNumFailureRetries);
this.transformInternalIndexAdditionalSettings = transformInternalIndexAdditionalSettings;
}
@Override
@ -339,7 +342,12 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<Tr
);
// <1> Check the latest internal index (IMPORTANT: according to _this_ node, which might be newer than master) is installed
TransformInternalIndex.createLatestVersionedIndexIfRequired(clusterService, buildTask.getParentTaskClient(), templateCheckListener);
TransformInternalIndex.createLatestVersionedIndexIfRequired(
clusterService,
buildTask.getParentTaskClient(),
transformInternalIndexAdditionalSettings,
templateCheckListener
);
}
private static IndexerState currentIndexerState(TransformState previousState) {

View file

@ -31,6 +31,7 @@ import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
@ -43,6 +44,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
@ -64,7 +67,7 @@ public class TransformInternalIndexTests extends ESTestCase {
try {
IndexMetadata.Builder builder = new IndexMetadata.Builder(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME).settings(
Settings.builder()
.put(TransformInternalIndex.settings())
.put(TransformInternalIndex.settings(Settings.EMPTY))
.put(IndexMetadata.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT)
.build()
).numberOfReplicas(0).numberOfShards(1).putMapping(Strings.toString(TransformInternalIndex.mappings()));
@ -122,7 +125,7 @@ public class TransformInternalIndexTests extends ESTestCase {
AtomicBoolean gotResponse = new AtomicBoolean(false);
ActionListener<Void> testListener = ActionTestUtils.assertNoFailureListener(aVoid -> gotResponse.set(true));
TransformInternalIndex.createLatestVersionedIndexIfRequired(clusterService, client, testListener);
TransformInternalIndex.createLatestVersionedIndexIfRequired(clusterService, client, Settings.EMPTY, testListener);
assertTrue(gotResponse.get());
verifyNoMoreInteractions(client);
@ -153,7 +156,7 @@ public class TransformInternalIndexTests extends ESTestCase {
AtomicBoolean gotResponse = new AtomicBoolean(false);
ActionListener<Void> testListener = ActionTestUtils.assertNoFailureListener(aVoid -> gotResponse.set(true));
TransformInternalIndex.createLatestVersionedIndexIfRequired(clusterService, client, testListener);
TransformInternalIndex.createLatestVersionedIndexIfRequired(clusterService, client, Settings.EMPTY, testListener);
assertTrue(gotResponse.get());
verify(client, times(1)).threadPool();
@ -190,7 +193,7 @@ public class TransformInternalIndexTests extends ESTestCase {
AtomicBoolean gotResponse = new AtomicBoolean(false);
ActionListener<Void> testListener = ActionTestUtils.assertNoFailureListener(aVoid -> gotResponse.set(true));
TransformInternalIndex.createLatestVersionedIndexIfRequired(clusterService, client, testListener);
TransformInternalIndex.createLatestVersionedIndexIfRequired(clusterService, client, Settings.EMPTY, testListener);
assertTrue(gotResponse.get());
verify(client, times(1)).threadPool();
@ -228,7 +231,7 @@ public class TransformInternalIndexTests extends ESTestCase {
AtomicBoolean gotResponse = new AtomicBoolean(false);
ActionListener<Void> testListener = ActionTestUtils.assertNoFailureListener(aVoid -> gotResponse.set(true));
TransformInternalIndex.createLatestVersionedIndexIfRequired(clusterService, client, testListener);
TransformInternalIndex.createLatestVersionedIndexIfRequired(clusterService, client, Settings.EMPTY, testListener);
assertTrue(gotResponse.get());
verify(client, times(1)).threadPool();
@ -275,7 +278,7 @@ public class TransformInternalIndexTests extends ESTestCase {
AtomicBoolean gotResponse = new AtomicBoolean(false);
ActionListener<Void> testListener = ActionTestUtils.assertNoFailureListener(aVoid -> gotResponse.set(true));
TransformInternalIndex.createLatestVersionedIndexIfRequired(clusterService, client, testListener);
TransformInternalIndex.createLatestVersionedIndexIfRequired(clusterService, client, Settings.EMPTY, testListener);
assertTrue(gotResponse.get());
verify(client, times(2)).threadPool();
@ -314,7 +317,7 @@ public class TransformInternalIndexTests extends ESTestCase {
AtomicBoolean gotResponse = new AtomicBoolean(false);
ActionListener<Void> testListener = ActionTestUtils.assertNoFailureListener(aVoid -> gotResponse.set(true));
TransformInternalIndex.createLatestVersionedIndexIfRequired(clusterService, client, testListener);
TransformInternalIndex.createLatestVersionedIndexIfRequired(clusterService, client, Settings.EMPTY, testListener);
assertTrue(gotResponse.get());
verify(client, times(1)).threadPool();
@ -325,4 +328,12 @@ public class TransformInternalIndexTests extends ESTestCase {
verify(indicesClient, times(1)).create(any(), any());
verifyNoMoreInteractions(indicesClient);
}
public void testSettings() {
Settings settings = TransformInternalIndex.settings(Settings.EMPTY);
assertThat(settings.get(IndexSettings.INDEX_FAST_REFRESH_SETTING.getKey()), is(nullValue()));
settings = TransformInternalIndex.settings(Settings.builder().put(IndexSettings.INDEX_FAST_REFRESH_SETTING.getKey(), true).build());
assertThat(settings.getAsBoolean(IndexSettings.INDEX_FAST_REFRESH_SETTING.getKey(), false), is(true));
}
}

View file

@ -429,6 +429,7 @@ public class TransformPersistentTasksExecutorTests extends ESTestCase {
threadPool,
clusterService,
Settings.EMPTY,
Settings.EMPTY,
TestIndexNameExpressionResolver.newInstance()
);
}