mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-06-28 17:34:17 -04:00
Make ILMHistoryStore
semi-project-aware
This commit is contained in:
parent
b20e40f816
commit
fe507f4171
6 changed files with 72 additions and 44 deletions
|
@ -221,9 +221,9 @@ public class ExecuteStepsUpdateTask extends IndexLifecycleClusterStateUpdateTask
|
||||||
|
|
||||||
LifecycleExecutionState exState = indexMetadata.getLifecycleExecutionState();
|
LifecycleExecutionState exState = indexMetadata.getLifecycleExecutionState();
|
||||||
if (ErrorStep.NAME.equals(exState.step()) && this.failure != null) {
|
if (ErrorStep.NAME.equals(exState.step()) && this.failure != null) {
|
||||||
lifecycleRunner.registerFailedOperation(indexMetadata, failure);
|
lifecycleRunner.registerFailedOperation(projectId, indexMetadata, failure);
|
||||||
} else {
|
} else {
|
||||||
lifecycleRunner.registerSuccessfulOperation(indexMetadata);
|
lifecycleRunner.registerSuccessfulOperation(projectId, indexMetadata);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nextStepKey != null && nextStepKey != TerminalPolicyStep.KEY) {
|
if (nextStepKey != null && nextStepKey != TerminalPolicyStep.KEY) {
|
||||||
|
|
|
@ -153,7 +153,8 @@ public class IndexLifecycle extends Plugin implements ActionPlugin, HealthPlugin
|
||||||
new ILMHistoryStore(
|
new ILMHistoryStore(
|
||||||
new OriginSettingClient(services.client(), INDEX_LIFECYCLE_ORIGIN),
|
new OriginSettingClient(services.client(), INDEX_LIFECYCLE_ORIGIN),
|
||||||
services.clusterService(),
|
services.clusterService(),
|
||||||
services.threadPool()
|
services.threadPool(),
|
||||||
|
services.projectResolver()
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -371,7 +371,7 @@ class IndexLifecycleRunner {
|
||||||
// Delete needs special handling, because after this step we
|
// Delete needs special handling, because after this step we
|
||||||
// will no longer have access to any information about the
|
// will no longer have access to any information about the
|
||||||
// index since it will be... deleted.
|
// index since it will be... deleted.
|
||||||
registerDeleteOperation(indexMetadata);
|
registerDeleteOperation(projectId, indexMetadata);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -479,7 +479,7 @@ class IndexLifecycleRunner {
|
||||||
),
|
),
|
||||||
new MoveToNextStepUpdateTask(projectId, index, policy, currentStepKey, newStepKey, nowSupplier, stepRegistry, state -> {
|
new MoveToNextStepUpdateTask(projectId, index, policy, currentStepKey, newStepKey, nowSupplier, stepRegistry, state -> {
|
||||||
IndexMetadata indexMetadata = state.metadata().index(index);
|
IndexMetadata indexMetadata = state.metadata().index(index);
|
||||||
registerSuccessfulOperation(indexMetadata);
|
registerSuccessfulOperation(projectId, indexMetadata);
|
||||||
if (newStepKey != null && newStepKey != TerminalPolicyStep.KEY && indexMetadata != null) {
|
if (newStepKey != null && newStepKey != TerminalPolicyStep.KEY && indexMetadata != null) {
|
||||||
maybeRunAsyncAction(state, indexMetadata, policy, newStepKey);
|
maybeRunAsyncAction(state, indexMetadata, policy, newStepKey);
|
||||||
}
|
}
|
||||||
|
@ -499,7 +499,7 @@ class IndexLifecycleRunner {
|
||||||
Strings.format("ilm-move-to-error-step {policy [%s], index [%s], currentStep [%s]}", policy, index.getName(), currentStepKey),
|
Strings.format("ilm-move-to-error-step {policy [%s], index [%s], currentStep [%s]}", policy, index.getName(), currentStepKey),
|
||||||
new MoveToErrorStepUpdateTask(projectId, index, policy, currentStepKey, e, nowSupplier, stepRegistry::getStep, state -> {
|
new MoveToErrorStepUpdateTask(projectId, index, policy, currentStepKey, e, nowSupplier, stepRegistry::getStep, state -> {
|
||||||
IndexMetadata indexMetadata = state.metadata().index(index);
|
IndexMetadata indexMetadata = state.metadata().index(index);
|
||||||
registerFailedOperation(indexMetadata, e);
|
registerFailedOperation(projectId, indexMetadata, e);
|
||||||
})
|
})
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -556,13 +556,14 @@ class IndexLifecycleRunner {
|
||||||
* For the given index metadata, register (index a document) that the index has transitioned
|
* For the given index metadata, register (index a document) that the index has transitioned
|
||||||
* successfully into this new state using the {@link ILMHistoryStore}
|
* successfully into this new state using the {@link ILMHistoryStore}
|
||||||
*/
|
*/
|
||||||
void registerSuccessfulOperation(IndexMetadata indexMetadata) {
|
void registerSuccessfulOperation(ProjectId projectId, IndexMetadata indexMetadata) {
|
||||||
if (indexMetadata == null) {
|
if (indexMetadata == null) {
|
||||||
// This index may have been deleted and has no metadata, so ignore it
|
// This index may have been deleted and has no metadata, so ignore it
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
Long origination = calculateOriginationMillis(indexMetadata);
|
Long origination = calculateOriginationMillis(indexMetadata);
|
||||||
ilmHistoryStore.putAsync(
|
ilmHistoryStore.putAsync(
|
||||||
|
projectId,
|
||||||
ILMHistoryItem.success(
|
ILMHistoryItem.success(
|
||||||
indexMetadata.getIndex().getName(),
|
indexMetadata.getIndex().getName(),
|
||||||
indexMetadata.getLifecyclePolicyName(),
|
indexMetadata.getLifecyclePolicyName(),
|
||||||
|
@ -577,12 +578,13 @@ class IndexLifecycleRunner {
|
||||||
* For the given index metadata, register (index a document) that the index
|
* For the given index metadata, register (index a document) that the index
|
||||||
* has been deleted by ILM using the {@link ILMHistoryStore}
|
* has been deleted by ILM using the {@link ILMHistoryStore}
|
||||||
*/
|
*/
|
||||||
void registerDeleteOperation(IndexMetadata metadataBeforeDeletion) {
|
void registerDeleteOperation(ProjectId projectId, IndexMetadata metadataBeforeDeletion) {
|
||||||
if (metadataBeforeDeletion == null) {
|
if (metadataBeforeDeletion == null) {
|
||||||
throw new IllegalStateException("cannot register deletion of an index that did not previously exist");
|
throw new IllegalStateException("cannot register deletion of an index that did not previously exist");
|
||||||
}
|
}
|
||||||
Long origination = calculateOriginationMillis(metadataBeforeDeletion);
|
Long origination = calculateOriginationMillis(metadataBeforeDeletion);
|
||||||
ilmHistoryStore.putAsync(
|
ilmHistoryStore.putAsync(
|
||||||
|
projectId,
|
||||||
ILMHistoryItem.success(
|
ILMHistoryItem.success(
|
||||||
metadataBeforeDeletion.getIndex().getName(),
|
metadataBeforeDeletion.getIndex().getName(),
|
||||||
metadataBeforeDeletion.getLifecyclePolicyName(),
|
metadataBeforeDeletion.getLifecyclePolicyName(),
|
||||||
|
@ -600,13 +602,14 @@ class IndexLifecycleRunner {
|
||||||
* For the given index metadata, register (index a document) that the index has transitioned
|
* For the given index metadata, register (index a document) that the index has transitioned
|
||||||
* into the ERROR state using the {@link ILMHistoryStore}
|
* into the ERROR state using the {@link ILMHistoryStore}
|
||||||
*/
|
*/
|
||||||
void registerFailedOperation(IndexMetadata indexMetadata, Exception failure) {
|
void registerFailedOperation(ProjectId projectId, IndexMetadata indexMetadata, Exception failure) {
|
||||||
if (indexMetadata == null) {
|
if (indexMetadata == null) {
|
||||||
// This index may have been deleted and has no metadata, so ignore it
|
// This index may have been deleted and has no metadata, so ignore it
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
Long origination = calculateOriginationMillis(indexMetadata);
|
Long origination = calculateOriginationMillis(indexMetadata);
|
||||||
ilmHistoryStore.putAsync(
|
ilmHistoryStore.putAsync(
|
||||||
|
projectId,
|
||||||
ILMHistoryItem.failure(
|
ILMHistoryItem.failure(
|
||||||
indexMetadata.getIndex().getName(),
|
indexMetadata.getIndex().getName(),
|
||||||
indexMetadata.getLifecyclePolicyName(),
|
indexMetadata.getLifecyclePolicyName(),
|
||||||
|
|
|
@ -19,6 +19,8 @@ import org.elasticsearch.action.bulk.BulkResponse;
|
||||||
import org.elasticsearch.action.index.IndexRequest;
|
import org.elasticsearch.action.index.IndexRequest;
|
||||||
import org.elasticsearch.client.internal.Client;
|
import org.elasticsearch.client.internal.Client;
|
||||||
import org.elasticsearch.client.internal.OriginSettingClient;
|
import org.elasticsearch.client.internal.OriginSettingClient;
|
||||||
|
import org.elasticsearch.cluster.metadata.ProjectId;
|
||||||
|
import org.elasticsearch.cluster.project.ProjectResolver;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.settings.Setting;
|
import org.elasticsearch.common.settings.Setting;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
|
@ -66,27 +68,25 @@ public class ILMHistoryStore implements Closeable {
|
||||||
);
|
);
|
||||||
|
|
||||||
private volatile boolean ilmHistoryEnabled = true;
|
private volatile boolean ilmHistoryEnabled = true;
|
||||||
|
private final ProjectResolver projectResolver;
|
||||||
private final BulkProcessor2 processor;
|
private final BulkProcessor2 processor;
|
||||||
|
|
||||||
public ILMHistoryStore(Client client, ClusterService clusterService, ThreadPool threadPool) {
|
public ILMHistoryStore(Client client, ClusterService clusterService, ThreadPool threadPool, ProjectResolver projectResolver) {
|
||||||
this(client, clusterService, threadPool, ActionListener.noop(), TimeValue.timeValueSeconds(5));
|
this(client, clusterService, threadPool, projectResolver, ActionListener.noop(), TimeValue.timeValueSeconds(5));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* For unit testing, allows a more frequent flushInterval
|
* For unit testing, allows a more frequent flushInterval
|
||||||
* @param client
|
|
||||||
* @param clusterService
|
|
||||||
* @param threadPool
|
|
||||||
* @param listener
|
|
||||||
* @param flushInterval
|
|
||||||
*/
|
*/
|
||||||
ILMHistoryStore(
|
ILMHistoryStore(
|
||||||
Client client,
|
Client client,
|
||||||
ClusterService clusterService,
|
ClusterService clusterService,
|
||||||
ThreadPool threadPool,
|
ThreadPool threadPool,
|
||||||
|
ProjectResolver projectResolver,
|
||||||
ActionListener<BulkResponse> listener,
|
ActionListener<BulkResponse> listener,
|
||||||
TimeValue flushInterval
|
TimeValue flushInterval
|
||||||
) {
|
) {
|
||||||
|
this.projectResolver = projectResolver;
|
||||||
this.setIlmHistoryEnabled(LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.get(clusterService.getSettings()));
|
this.setIlmHistoryEnabled(LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.get(clusterService.getSettings()));
|
||||||
clusterService.getClusterSettings().addSettingsUpdateConsumer(LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING, this::setIlmHistoryEnabled);
|
clusterService.getClusterSettings().addSettingsUpdateConsumer(LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING, this::setIlmHistoryEnabled);
|
||||||
|
|
||||||
|
@ -95,7 +95,8 @@ public class ILMHistoryStore implements Closeable {
|
||||||
new BulkProcessor2.Listener() {
|
new BulkProcessor2.Listener() {
|
||||||
@Override
|
@Override
|
||||||
public void beforeBulk(long executionId, BulkRequest request) {
|
public void beforeBulk(long executionId, BulkRequest request) {
|
||||||
if (clusterService.state().getMetadata().getProject().templatesV2().containsKey(ILM_TEMPLATE_NAME) == false) {
|
final var project = projectResolver.getProjectMetadata(clusterService.state());
|
||||||
|
if (project.templatesV2().containsKey(ILM_TEMPLATE_NAME) == false) {
|
||||||
ElasticsearchException e = new ElasticsearchException("no ILM history template");
|
ElasticsearchException e = new ElasticsearchException("no ILM history template");
|
||||||
logger.warn(
|
logger.warn(
|
||||||
() -> format(
|
() -> format(
|
||||||
|
@ -169,7 +170,7 @@ public class ILMHistoryStore implements Closeable {
|
||||||
/**
|
/**
|
||||||
* Attempts to asynchronously index an ILM history entry
|
* Attempts to asynchronously index an ILM history entry
|
||||||
*/
|
*/
|
||||||
public void putAsync(ILMHistoryItem item) {
|
public void putAsync(ProjectId projectId, ILMHistoryItem item) {
|
||||||
if (ilmHistoryEnabled == false) {
|
if (ilmHistoryEnabled == false) {
|
||||||
logger.trace(
|
logger.trace(
|
||||||
"not recording ILM history item because [{}] is [false]: [{}]",
|
"not recording ILM history item because [{}] is [false]: [{}]",
|
||||||
|
@ -182,7 +183,7 @@ public class ILMHistoryStore implements Closeable {
|
||||||
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
|
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
|
||||||
item.toXContent(builder, ToXContent.EMPTY_PARAMS);
|
item.toXContent(builder, ToXContent.EMPTY_PARAMS);
|
||||||
IndexRequest request = new IndexRequest(ILM_HISTORY_DATA_STREAM).source(builder).opType(DocWriteRequest.OpType.CREATE);
|
IndexRequest request = new IndexRequest(ILM_HISTORY_DATA_STREAM).source(builder).opType(DocWriteRequest.OpType.CREATE);
|
||||||
processor.add(request);
|
projectResolver.executeOnProject(projectId, () -> processor.add(request));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error(() -> format("failed to send ILM history item to index [%s]: [%s]", ILM_HISTORY_DATA_STREAM, item), e);
|
logger.error(() -> format("failed to send ILM history item to index [%s]: [%s]", ILM_HISTORY_DATA_STREAM, item), e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,9 +16,11 @@ import org.elasticsearch.cluster.ClusterStateObserver;
|
||||||
import org.elasticsearch.cluster.ProjectState;
|
import org.elasticsearch.cluster.ProjectState;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||||
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
|
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
|
||||||
|
import org.elasticsearch.cluster.metadata.ProjectId;
|
||||||
import org.elasticsearch.cluster.metadata.ProjectMetadata;
|
import org.elasticsearch.cluster.metadata.ProjectMetadata;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||||
|
import org.elasticsearch.cluster.project.TestProjectResolvers;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
|
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
|
||||||
import org.elasticsearch.common.Priority;
|
import org.elasticsearch.common.Priority;
|
||||||
|
@ -1266,7 +1268,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||||
private final List<ILMHistoryItem> items = new CopyOnWriteArrayList<>();
|
private final List<ILMHistoryItem> items = new CopyOnWriteArrayList<>();
|
||||||
|
|
||||||
NoOpHistoryStore(Client noopClient, ClusterService clusterService) {
|
NoOpHistoryStore(Client noopClient, ClusterService clusterService) {
|
||||||
super(noopClient, clusterService, clusterService.threadPool());
|
super(noopClient, clusterService, clusterService.threadPool(), TestProjectResolvers.alwaysThrow());
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<ILMHistoryItem> getItems() {
|
public List<ILMHistoryItem> getItems() {
|
||||||
|
@ -1274,7 +1276,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void putAsync(ILMHistoryItem item) {
|
public void putAsync(ProjectId projectId, ILMHistoryItem item) {
|
||||||
logger.info("--> adding ILM history item: [{}]", item);
|
logger.info("--> adding ILM history item: [{}]", item);
|
||||||
items.add(item);
|
items.add(item);
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,9 @@ import org.elasticsearch.action.index.IndexResponse;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
|
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
|
||||||
import org.elasticsearch.cluster.metadata.Metadata;
|
import org.elasticsearch.cluster.metadata.Metadata;
|
||||||
|
import org.elasticsearch.cluster.metadata.ProjectId;
|
||||||
|
import org.elasticsearch.cluster.metadata.ProjectMetadata;
|
||||||
|
import org.elasticsearch.cluster.project.TestProjectResolvers;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.TriFunction;
|
import org.elasticsearch.common.TriFunction;
|
||||||
import org.elasticsearch.common.settings.ClusterSettings;
|
import org.elasticsearch.common.settings.ClusterSettings;
|
||||||
|
@ -65,6 +68,7 @@ public class ILMHistoryStoreTests extends ESTestCase {
|
||||||
private VerifyingClient client;
|
private VerifyingClient client;
|
||||||
private ClusterService clusterService;
|
private ClusterService clusterService;
|
||||||
private ILMHistoryStore historyStore;
|
private ILMHistoryStore historyStore;
|
||||||
|
private ProjectId projectId;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() {
|
public void setup() {
|
||||||
|
@ -83,13 +87,21 @@ public class ILMHistoryStoreTests extends ESTestCase {
|
||||||
NamedXContentRegistry.EMPTY
|
NamedXContentRegistry.EMPTY
|
||||||
);
|
);
|
||||||
ClusterState state = clusterService.state();
|
ClusterState state = clusterService.state();
|
||||||
|
projectId = randomProjectIdOrDefault();
|
||||||
ClusterServiceUtils.setState(
|
ClusterServiceUtils.setState(
|
||||||
clusterService,
|
clusterService,
|
||||||
ClusterState.builder(state)
|
ClusterState.builder(state)
|
||||||
.metadata(Metadata.builder(state.metadata()).indexTemplates(registry.getComposableTemplateConfigs()))
|
.putProjectMetadata(ProjectMetadata.builder(projectId).indexTemplates(registry.getComposableTemplateConfigs()))
|
||||||
.build()
|
.build()
|
||||||
);
|
);
|
||||||
historyStore = new ILMHistoryStore(client, clusterService, threadPool, ActionListener.noop(), TimeValue.timeValueMillis(500));
|
historyStore = new ILMHistoryStore(
|
||||||
|
client,
|
||||||
|
clusterService,
|
||||||
|
threadPool,
|
||||||
|
TestProjectResolvers.usingRequestHeader(threadPool.getThreadContext()),
|
||||||
|
ActionListener.noop(),
|
||||||
|
TimeValue.timeValueMillis(500)
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
@ -115,7 +127,7 @@ public class ILMHistoryStoreTests extends ESTestCase {
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
return null;
|
return null;
|
||||||
});
|
});
|
||||||
historyStore.putAsync(record);
|
historyStore.putAsync(projectId, record);
|
||||||
assertFalse(latch.await(2, TimeUnit.SECONDS));
|
assertFalse(latch.await(2, TimeUnit.SECONDS));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -156,7 +168,7 @@ public class ILMHistoryStoreTests extends ESTestCase {
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
historyStore.putAsync(record);
|
historyStore.putAsync(projectId, record);
|
||||||
assertBusy(() -> assertThat(calledTimes.get(), equalTo(1)));
|
assertBusy(() -> assertThat(calledTimes.get(), equalTo(1)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -207,7 +219,7 @@ public class ILMHistoryStoreTests extends ESTestCase {
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
historyStore.putAsync(record);
|
historyStore.putAsync(projectId, record);
|
||||||
assertBusy(() -> assertThat(calledTimes.get(), equalTo(1)));
|
assertBusy(() -> assertThat(calledTimes.get(), equalTo(1)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -255,7 +267,13 @@ public class ILMHistoryStoreTests extends ESTestCase {
|
||||||
);
|
);
|
||||||
return bulkItemResponse;
|
return bulkItemResponse;
|
||||||
});
|
});
|
||||||
try (ILMHistoryStore localHistoryStore = new ILMHistoryStore(client, clusterService, threadPool, new ActionListener<>() {
|
try (
|
||||||
|
ILMHistoryStore localHistoryStore = new ILMHistoryStore(
|
||||||
|
client,
|
||||||
|
clusterService,
|
||||||
|
threadPool,
|
||||||
|
TestProjectResolvers.usingRequestHeader(threadPool.getThreadContext()),
|
||||||
|
new ActionListener<>() {
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(BulkResponse response) {
|
public void onResponse(BulkResponse response) {
|
||||||
int itemsInResponse = response.getItems().length;
|
int itemsInResponse = response.getItems().length;
|
||||||
|
@ -271,7 +289,10 @@ public class ILMHistoryStoreTests extends ESTestCase {
|
||||||
logger.error(e);
|
logger.error(e);
|
||||||
fail(e.getMessage());
|
fail(e.getMessage());
|
||||||
}
|
}
|
||||||
}, TimeValue.timeValueMillis(randomIntBetween(50, 1000)))) {
|
},
|
||||||
|
TimeValue.timeValueMillis(randomIntBetween(50, 1000))
|
||||||
|
)
|
||||||
|
) {
|
||||||
for (int i = 0; i < numberOfDocs; i++) {
|
for (int i = 0; i < numberOfDocs; i++) {
|
||||||
ILMHistoryItem record1 = ILMHistoryItem.success(
|
ILMHistoryItem record1 = ILMHistoryItem.success(
|
||||||
"index",
|
"index",
|
||||||
|
@ -280,7 +301,7 @@ public class ILMHistoryStoreTests extends ESTestCase {
|
||||||
10L,
|
10L,
|
||||||
LifecycleExecutionState.builder().setPhase("phase").build()
|
LifecycleExecutionState.builder().setPhase("phase").build()
|
||||||
);
|
);
|
||||||
localHistoryStore.putAsync(record1);
|
localHistoryStore.putAsync(projectId, record1);
|
||||||
}
|
}
|
||||||
latch.await(5, TimeUnit.SECONDS);
|
latch.await(5, TimeUnit.SECONDS);
|
||||||
assertThat(actions.get(), equalTo(numberOfDocs));
|
assertThat(actions.get(), equalTo(numberOfDocs));
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue