Mirror of https://github.com/elastic/elasticsearch.git (synced 2025-04-25 07:37:19 -04:00)
Checkstyle shadows vars pt10 (#81361)
Part of #19752. Fix more instances where local variable names were shadowing field names.
parent bdc1b73006
commit 3e1fb1f930
59 changed files with 310 additions and 277 deletions
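To illustrate the kind of change this commit makes, here is a minimal, hypothetical sketch (class and member names are invented, not taken from the diff below): a parameter or local variable that reuses a field name trips Checkstyle's HiddenFieldCheck, and the fix is either to rename the local, or, where a rename would not help readability, to suppress the check; both approaches appear in the hunks that follow.

    // Minimal sketch of the shadowing pattern and its two fixes (hypothetical names).
    class JobAuditor {
        private final String jobId;

        JobAuditor(String id) {
            this.jobId = id;
        }

        // Fix 1: rename the parameter so it no longer hides the "jobId" field.
        void audit(String jobIdToAudit) {
            System.out.println("auditing " + jobIdToAudit + " for job " + jobId);
        }

        // Fix 2 (used in several hunks below): keep the name and suppress the check.
        @SuppressWarnings("HiddenField")
        void reindex(String jobId) {
            System.out.println("reindexing " + jobId);
        }
    }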
@@ -107,6 +107,13 @@
 
 <!-- Checks that a local variable or a parameter does not shadow a field that is defined in the same class. -->
 <!-- Use a forked version that understands setters that don't start with "set". -->
+<!-- Notes on `ignoredMethodNames`:
+
+  * `createParser` creates objects so should be considered a sort-of constructor
+  * `createComponents` by its nature ends up referring to fields
+    a lot, and there's no benefit to flagging shadowed variables in
+    those methods.
+-->
 <!-- Disabled until existing violations are fixed -->
 <!--
 <module name="org.elasticsearch.gradle.internal.checkstyle.HiddenFieldCheck">
@@ -117,7 +124,7 @@
 <property name="minLineCount" value="5" />
 <property name="ignoreFormat" value="^(?:threadPool)$"/>
 <property name="ignoreAbstractMethods" value="true"/>
-<property name="ignoreMethodNames" value="^(?:createParser)$"/>
+<property name="ignoreMethodNames" value="^(?:createParser|createComponents)$"/>
 <property name="setterCanReturnItsClass" value="true"/>
 <message key="hidden.field" value="''{0}'' hides a field." />
 </module>
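The two checkstyle.xml hunks above configure the forked HiddenFieldCheck. Assembled from the properties shown (and still wrapped in the disabling comment at this point in the series), the module reads roughly as follows:

    <module name="org.elasticsearch.gradle.internal.checkstyle.HiddenFieldCheck">
        <property name="minLineCount" value="5" />
        <property name="ignoreFormat" value="^(?:threadPool)$"/>
        <property name="ignoreAbstractMethods" value="true"/>
        <property name="ignoreMethodNames" value="^(?:createParser|createComponents)$"/>
        <property name="setterCanReturnItsClass" value="true"/>
        <message key="hidden.field" value="''{0}'' hides a field." />
    </module>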
@ -1114,7 +1114,7 @@ public class MachineLearning extends Plugin
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<RestHandler> getRestHandlers(
|
public List<RestHandler> getRestHandlers(
|
||||||
Settings settings,
|
Settings unused,
|
||||||
RestController restController,
|
RestController restController,
|
||||||
ClusterSettings clusterSettings,
|
ClusterSettings clusterSettings,
|
||||||
IndexScopedSettings indexScopedSettings,
|
IndexScopedSettings indexScopedSettings,
|
||||||
|
@ -1306,7 +1306,7 @@ public class MachineLearning extends Plugin
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
|
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings unused) {
|
||||||
if (false == enabled) {
|
if (false == enabled) {
|
||||||
return emptyList();
|
return emptyList();
|
||||||
}
|
}
|
||||||
|
@ -1496,7 +1496,7 @@ public class MachineLearning extends Plugin
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
|
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings unused) {
|
||||||
return List.of(
|
return List.of(
|
||||||
SystemIndexDescriptor.builder()
|
SystemIndexDescriptor.builder()
|
||||||
.setIndexPattern(MlMetaIndex.indexName() + "*")
|
.setIndexPattern(MlMetaIndex.indexName() + "*")
|
||||||
|
@ -1799,7 +1799,7 @@ public class MachineLearning extends Plugin
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public BreakerSettings getCircuitBreaker(Settings settings) {
|
public BreakerSettings getCircuitBreaker(Settings settingsToUse) {
|
||||||
return BreakerSettings.updateFromSettings(
|
return BreakerSettings.updateFromSettings(
|
||||||
new BreakerSettings(
|
new BreakerSettings(
|
||||||
TRAINED_MODEL_CIRCUIT_BREAKER_NAME,
|
TRAINED_MODEL_CIRCUIT_BREAKER_NAME,
|
||||||
|
@ -1808,7 +1808,7 @@ public class MachineLearning extends Plugin
|
||||||
CircuitBreaker.Type.MEMORY,
|
CircuitBreaker.Type.MEMORY,
|
||||||
CircuitBreaker.Durability.TRANSIENT
|
CircuitBreaker.Durability.TRANSIENT
|
||||||
),
|
),
|
||||||
settings
|
settingsToUse
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@@ -179,7 +179,7 @@ public class MlInitializationService implements ClusterStateListener {
 
         // The atomic flag shortcircuits the check after no legacy templates have been found to exist.
         if (this.isMaster && checkForLegacyMlTemplates.get()) {
-            if (deleteOneMlLegacyTemplateIfNecessary(client, event.state()) == false) {
+            if (deleteOneMlLegacyTemplateIfNecessary(event.state()) == false) {
                 checkForLegacyMlTemplates.set(false);
             }
         }
@@ -189,7 +189,7 @@ public class MlInitializationService implements ClusterStateListener {
      * @return <code>true</code> if further calls to this method are worthwhile.
      *         <code>false</code> if this method never needs to be called again.
      */
-    private boolean deleteOneMlLegacyTemplateIfNecessary(Client client, ClusterState state) {
+    private boolean deleteOneMlLegacyTemplateIfNecessary(ClusterState state) {
 
         String templateToDelete = nextTemplateToDelete(state.getMetadata().getTemplates());
         if (templateToDelete != null) {
@@ -132,10 +132,10 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<
         TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
 
         BooleanSupplier isTimedOutSupplier = () -> Instant.now(clock).isAfter(timeoutTime);
-        AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService);
+        AnomalyDetectionAuditor anomalyDetectionAuditor = new AnomalyDetectionAuditor(client, clusterService);
 
         if (Strings.isNullOrEmpty(request.getJobId()) || Strings.isAllOrWildcard(request.getJobId())) {
-            List<MlDataRemover> dataRemovers = createDataRemovers(client, taskId, auditor);
+            List<MlDataRemover> dataRemovers = createDataRemovers(client, taskId, anomalyDetectionAuditor);
             threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)
                 .execute(() -> deleteExpiredData(request, dataRemovers, listener, isTimedOutSupplier));
         } else {
@@ -144,7 +144,7 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<
                 List<Job> jobs = jobBuilders.stream().map(Job.Builder::build).collect(Collectors.toList());
                 String[] jobIds = jobs.stream().map(Job::getId).toArray(String[]::new);
                 request.setExpandedJobIds(jobIds);
-                List<MlDataRemover> dataRemovers = createDataRemovers(jobs, taskId, auditor);
+                List<MlDataRemover> dataRemovers = createDataRemovers(jobs, taskId, anomalyDetectionAuditor);
                 deleteExpiredData(request, dataRemovers, listener, isTimedOutSupplier);
             });
         }, listener::onFailure));
@@ -232,40 +232,44 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<
         }
     }
 
-    private List<MlDataRemover> createDataRemovers(OriginSettingClient client, TaskId parentTaskId, AnomalyDetectionAuditor auditor) {
+    private List<MlDataRemover> createDataRemovers(
+        OriginSettingClient originClient,
+        TaskId parentTaskId,
+        AnomalyDetectionAuditor anomalyDetectionAuditor
+    ) {
         return Arrays.asList(
             new ExpiredResultsRemover(
-                client,
-                new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(client)),
+                originClient,
+                new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(originClient)),
                 parentTaskId,
-                auditor,
+                anomalyDetectionAuditor,
                 threadPool
             ),
-            new ExpiredForecastsRemover(client, threadPool, parentTaskId),
+            new ExpiredForecastsRemover(originClient, threadPool, parentTaskId),
             new ExpiredModelSnapshotsRemover(
-                client,
-                new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(client)),
+                originClient,
+                new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(originClient)),
                 threadPool,
                 parentTaskId,
                 jobResultsProvider,
-                auditor
+                anomalyDetectionAuditor
            ),
-            new UnusedStateRemover(client, parentTaskId),
-            new EmptyStateIndexRemover(client, parentTaskId),
-            new UnusedStatsRemover(client, parentTaskId),
+            new UnusedStateRemover(originClient, parentTaskId),
+            new EmptyStateIndexRemover(originClient, parentTaskId),
+            new UnusedStatsRemover(originClient, parentTaskId),
             new ExpiredAnnotationsRemover(
-                client,
-                new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(client)),
+                originClient,
+                new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(originClient)),
                 parentTaskId,
-                auditor,
+                anomalyDetectionAuditor,
                 threadPool
             )
         );
     }
 
-    private List<MlDataRemover> createDataRemovers(List<Job> jobs, TaskId parentTaskId, AnomalyDetectionAuditor auditor) {
+    private List<MlDataRemover> createDataRemovers(List<Job> jobs, TaskId parentTaskId, AnomalyDetectionAuditor anomalyDetectionAuditor) {
         return Arrays.asList(
-            new ExpiredResultsRemover(client, new VolatileCursorIterator<>(jobs), parentTaskId, auditor, threadPool),
+            new ExpiredResultsRemover(client, new VolatileCursorIterator<>(jobs), parentTaskId, anomalyDetectionAuditor, threadPool),
             new ExpiredForecastsRemover(client, threadPool, parentTaskId),
             new ExpiredModelSnapshotsRemover(
                 client,
@@ -273,12 +277,12 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<
                 threadPool,
                 parentTaskId,
                 jobResultsProvider,
-                auditor
+                anomalyDetectionAuditor
             ),
             new UnusedStateRemover(client, parentTaskId),
             new EmptyStateIndexRemover(client, parentTaskId),
             new UnusedStatsRemover(client, parentTaskId),
-            new ExpiredAnnotationsRemover(client, new VolatileCursorIterator<>(jobs), parentTaskId, auditor, threadPool)
+            new ExpiredAnnotationsRemover(client, new VolatileCursorIterator<>(jobs), parentTaskId, anomalyDetectionAuditor, threadPool)
         );
     }
 
@@ -699,7 +699,7 @@ public class TransportStartDataFrameAnalyticsAction extends TransportMasterNodeA
         public PersistentTasksCustomMetadata.Assignment getAssignment(
             TaskParams params,
             Collection<DiscoveryNode> candidateNodes,
-            ClusterState clusterState
+            @SuppressWarnings("HiddenField") ClusterState clusterState
         ) {
             boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed();
             Optional<PersistentTasksCustomMetadata.Assignment> optionalAssignment = getPotentialAssignment(
@@ -97,15 +97,15 @@ public class CategorizeTextAggregatorFactory extends AggregatorFactory {
                     + "]"
             );
         }
-        TermsAggregator.BucketCountThresholds bucketCountThresholds = new TermsAggregator.BucketCountThresholds(this.bucketCountThresholds);
-        if (bucketCountThresholds.getShardSize() == CategorizeTextAggregationBuilder.DEFAULT_BUCKET_COUNT_THRESHOLDS.getShardSize()) {
+        TermsAggregator.BucketCountThresholds thresholds = new TermsAggregator.BucketCountThresholds(this.bucketCountThresholds);
+        if (thresholds.getShardSize() == CategorizeTextAggregationBuilder.DEFAULT_BUCKET_COUNT_THRESHOLDS.getShardSize()) {
             // The user has not made a shardSize selection. Use default
             // heuristic to avoid any wrong-ranking caused by distributed
             // counting
             // TODO significant text does a 2x here, should we as well?
-            bucketCountThresholds.setShardSize(BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize()));
+            thresholds.setShardSize(BucketUtils.suggestShardSideQueueSize(thresholds.getRequiredSize()));
         }
-        bucketCountThresholds.ensureValidity();
+        thresholds.ensureValidity();
 
         return new CategorizeTextAggregator(
             name,
@@ -114,7 +114,7 @@ public class CategorizeTextAggregatorFactory extends AggregatorFactory {
             parent,
             indexedFieldName,
             fieldType,
-            bucketCountThresholds,
+            thresholds,
             maxUniqueTokens,
             maxMatchTokens,
             similarityThreshold,
@@ -53,14 +53,14 @@ public class InternalCategorizationAggregation extends InternalMultiBucketAggreg
             return docCount;
         }
 
-        public Bucket reduce(BucketKey key, ReduceContext reduceContext) {
+        public Bucket reduce(BucketKey bucketKey, ReduceContext reduceContext) {
             List<InternalAggregations> innerAggs = new ArrayList<>(toReduce.size());
-            long docCount = 0;
+            long totalDocCount = 0;
             for (Bucket bucket : toReduce) {
                 innerAggs.add(bucket.aggregations);
-                docCount += bucket.docCount;
+                totalDocCount += bucket.docCount;
             }
-            return new Bucket(key, docCount, InternalAggregations.reduce(innerAggs, reduceContext));
+            return new Bucket(bucketKey, totalDocCount, InternalAggregations.reduce(innerAggs, reduceContext));
         }
 
         public DelayedCategorizationBucket add(Bucket bucket) {
@@ -316,7 +316,7 @@ public class InternalCategorizationAggregation extends InternalMultiBucketAggreg
     }
 
     @Override
-    public InternalCategorizationAggregation create(List<Bucket> buckets) {
+    public InternalCategorizationAggregation create(List<Bucket> bucketList) {
         return new InternalCategorizationAggregation(
             name,
             requiredSize,
@@ -325,7 +325,7 @@ public class InternalCategorizationAggregation extends InternalMultiBucketAggreg
             maxMatchTokens,
             similarityThreshold,
             super.metadata,
-            buckets
+            bucketList
         );
     }
 
@@ -264,10 +264,10 @@ public class InferencePipelineAggregationBuilder extends AbstractPipelineAggrega
 
         SetOnce<LocalModel> loadedModel = new SetOnce<>();
         BiConsumer<Client, ActionListener<?>> modelLoadAction = (client, listener) -> modelLoadingService.get()
-            .getModelForSearch(modelId, listener.delegateFailure((delegate, model) -> {
-                loadedModel.set(model);
+            .getModelForSearch(modelId, listener.delegateFailure((delegate, localModel) -> {
+                loadedModel.set(localModel);
 
-                boolean isLicensed = model.getLicenseLevel() == License.OperationMode.BASIC
+                boolean isLicensed = localModel.getLicenseLevel() == License.OperationMode.BASIC
                     || MachineLearningField.ML_API_FEATURE.check(licenseState);
                 if (isLicensed) {
                     delegate.onResponse(null);
@@ -248,7 +248,7 @@ public class BucketCountKSTestAggregator extends SiblingPipelineAggregator {
             );
         }
         final MlAggsHelper.DoubleBucketValues bucketsValue = maybeBucketsValue.get();
-        double[] fractions = this.fractions == null
+        double[] fractionsArr = this.fractions == null
             ? DoubleStream.concat(
                 DoubleStream.of(0.0),
                 Stream.generate(() -> 1.0 / (bucketsValue.getDocCounts().length - 1))
@@ -258,6 +258,6 @@ public class BucketCountKSTestAggregator extends SiblingPipelineAggregator {
             // We prepend zero to the fractions as we prepend 0 to the doc counts and we want them to be the same length when
             // we create the monotonically increasing values for distribution comparison.
             : DoubleStream.concat(DoubleStream.of(0.0), Arrays.stream(this.fractions)).toArray();
-        return new InternalKSTestAggregation(name(), metadata(), ksTest(fractions, bucketsValue, alternatives, samplingMethod));
+        return new InternalKSTestAggregation(name(), metadata(), ksTest(fractionsArr, bucketsValue, alternatives, samplingMethod));
     }
 }
@@ -604,15 +604,15 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService, L
         if (nodeLoads.size() > 1) {
             long totalAssignedJobs = nodeLoads.stream().mapToLong(NodeLoad::getNumAssignedJobs).sum();
             // one volatile read
-            long maxOpenJobs = this.maxOpenJobs;
-            if (totalAssignedJobs > maxOpenJobs) {
+            long maxOpenJobsCopy = this.maxOpenJobs;
+            if (totalAssignedJobs > maxOpenJobsCopy) {
                 String msg = String.format(
                     Locale.ROOT,
                     "not scaling down as the total number of jobs [%d] exceeds the setting [%s (%d)]. "
                         + " To allow a scale down [%s] must be increased.",
                     totalAssignedJobs,
                     MAX_OPEN_JOBS_PER_NODE.getKey(),
-                    maxOpenJobs,
+                    maxOpenJobsCopy,
                     MAX_OPEN_JOBS_PER_NODE.getKey()
                 );
                 logger.info(
@@ -211,6 +211,7 @@ class ScrollDataExtractor implements DataExtractor {
             searchHasShardFailure = true;
         }
 
+    @SuppressWarnings("HiddenField")
     protected SearchResponse executeSearchScrollRequest(String scrollId) {
         return ClientHelper.executeWithHeaders(
             context.headers,
@@ -77,7 +77,7 @@ public class ScrollDataExtractorFactory implements DataExtractorFactory {
         ActionListener<DataExtractorFactory> listener
     ) {
 
-        // Step 2. Contruct the factory and notify listener
+        // Step 2. Construct the factory and notify listener
         ActionListener<FieldCapabilitiesResponse> fieldCapabilitiesHandler = ActionListener.wrap(fieldCapabilitiesResponse -> {
             if (fieldCapabilitiesResponse.getIndices().length == 0) {
                 listener.onFailure(
@@ -89,10 +89,8 @@ public class ScrollDataExtractorFactory implements DataExtractorFactory {
                 );
                 return;
             }
-            TimeBasedExtractedFields extractedFields = TimeBasedExtractedFields.build(job, datafeed, fieldCapabilitiesResponse);
-            listener.onResponse(
-                new ScrollDataExtractorFactory(client, datafeed, job, extractedFields, xContentRegistry, timingStatsReporter)
-            );
+            TimeBasedExtractedFields fields = TimeBasedExtractedFields.build(job, datafeed, fieldCapabilitiesResponse);
+            listener.onResponse(new ScrollDataExtractorFactory(client, datafeed, job, fields, xContentRegistry, timingStatsReporter));
         }, e -> {
             Throwable cause = ExceptionsHelper.unwrapCause(e);
             if (cause instanceof IndexNotFoundException) {
@@ -153,7 +153,7 @@ public class DataFrameAnalyticsManager {
     }
 
     private void createStatsIndexAndUpdateMappingsIfNecessary(
-        Client client,
+        Client clientToUse,
         ClusterState clusterState,
         TimeValue masterNodeTimeout,
         ActionListener<Boolean> listener
@@ -162,7 +162,7 @@ public class DataFrameAnalyticsManager {
             aBoolean -> ElasticsearchMappings.addDocMappingIfMissing(
                 MlStatsIndex.writeAlias(),
                 MlStatsIndex::wrappedMapping,
-                client,
+                clientToUse,
                 clusterState,
                 masterNodeTimeout,
                 listener
@@ -170,7 +170,13 @@ public class DataFrameAnalyticsManager {
             listener::onFailure
         );
 
-        MlStatsIndex.createStatsIndexAndAliasIfNecessary(client, clusterState, expressionResolver, masterNodeTimeout, createIndexListener);
+        MlStatsIndex.createStatsIndexAndAliasIfNecessary(
+            clientToUse,
+            clusterState,
+            expressionResolver,
+            masterNodeTimeout,
+            createIndexListener
+        );
     }
 
     private void determineProgressAndResume(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config) {
@@ -213,7 +213,7 @@ public class DataFrameAnalyticsTask extends LicensedAllocatedPersistentTask impl
     }
 
     // Visible for testing
-    void persistProgress(Client client, String jobId, Runnable runnable) {
+    void persistProgress(Client clientToUse, String jobId, Runnable runnable) {
         LOGGER.debug("[{}] Persisting progress", jobId);
 
         SetOnce<StoredProgress> storedProgress = new SetOnce<>();
@@ -266,7 +266,7 @@ public class DataFrameAnalyticsTask extends LicensedAllocatedPersistentTask impl
                 storedProgress.get().toXContent(jsonBuilder, Payload.XContent.EMPTY_PARAMS);
                 indexRequest.source(jsonBuilder);
             }
-            executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, indexProgressDocListener);
+            executeAsyncWithOrigin(clientToUse, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, indexProgressDocListener);
         }, e -> {
             LOGGER.error(
                 new ParameterizedMessage(
@@ -283,7 +283,7 @@ public class DataFrameAnalyticsTask extends LicensedAllocatedPersistentTask impl
             SearchRequest searchRequest = new SearchRequest(AnomalyDetectorsIndex.jobStateIndexPattern()).source(
                 new SearchSourceBuilder().size(1).query(new IdsQueryBuilder().addIds(progressDocId))
             );
-            executeAsyncWithOrigin(client, ML_ORIGIN, SearchAction.INSTANCE, searchRequest, searchFormerProgressDocListener);
+            executeAsyncWithOrigin(clientToUse, ML_ORIGIN, SearchAction.INSTANCE, searchRequest, searchFormerProgressDocListener);
         }, e -> {
             LOGGER.error(
                 new ParameterizedMessage(
@@ -82,8 +82,8 @@ public class ExtractedFieldsDetector {
         this.topNestedFieldPrefixes = findTopNestedFieldPrefixes(fieldCapabilitiesResponse);
     }
 
-    private List<String> findTopNestedFieldPrefixes(FieldCapabilitiesResponse fieldCapabilitiesResponse) {
-        List<String> sortedNestedFieldPrefixes = fieldCapabilitiesResponse.get()
+    private List<String> findTopNestedFieldPrefixes(FieldCapabilitiesResponse response) {
+        List<String> sortedNestedFieldPrefixes = response.get()
             .keySet()
             .stream()
             .filter(field -> isNested(getMappingTypes(field)))
@@ -463,9 +463,9 @@ public class AnalyticsProcessManager {
             return true;
         }
 
-        private AnalyticsProcessConfig createProcessConfig(DataFrameDataExtractor dataExtractor, ExtractedFields extractedFields) {
-            DataFrameDataExtractor.DataSummary dataSummary = dataExtractor.collectDataSummary();
-            Set<String> categoricalFields = dataExtractor.getCategoricalFields(config.getAnalysis());
+        private AnalyticsProcessConfig createProcessConfig(DataFrameDataExtractor extractor, ExtractedFields extractedFields) {
+            DataFrameDataExtractor.DataSummary dataSummary = extractor.collectDataSummary();
+            Set<String> categoricalFields = extractor.getCategoricalFields(config.getAnalysis());
             int threads = Math.min(config.getMaxNumThreads(), numAllocatedProcessors);
             return new AnalyticsProcessConfig(
                 config.getId(),
@@ -206,7 +206,7 @@ public class TrainedModelAllocationNodeService implements ClusterStateListener {
 
     public void stopDeploymentAndNotify(TrainedModelDeploymentTask task, String reason) {
         ActionListener<Void> notifyDeploymentOfStopped = ActionListener.wrap(
-            stopped -> updateStoredState(
+            _void -> updateStoredState(
                 task.getModelId(),
                 new RoutingStateAndReason(RoutingState.STOPPED, reason),
                 ActionListener.wrap(s -> {}, failure -> {})
@@ -378,35 +378,31 @@ public class DeploymentManager {
 
         private void processResult(
             PyTorchResult pyTorchResult,
-            ProcessContext processContext,
+            ProcessContext context,
             TokenizationResult tokenization,
             NlpTask.ResultProcessor inferenceResultsProcessor,
-            ActionListener<InferenceResults> listener
+            ActionListener<InferenceResults> resultsListener
         ) {
             if (pyTorchResult.isError()) {
-                listener.onFailure(new ElasticsearchStatusException(pyTorchResult.getError(), RestStatus.INTERNAL_SERVER_ERROR));
+                resultsListener.onFailure(new ElasticsearchStatusException(pyTorchResult.getError(), RestStatus.INTERNAL_SERVER_ERROR));
                 return;
             }
 
-            logger.debug(
-                () -> new ParameterizedMessage("[{}] retrieved result for request [{}]", processContext.task.getModelId(), requestId)
-            );
+            logger.debug(() -> new ParameterizedMessage("[{}] retrieved result for request [{}]", context.task.getModelId(), requestId));
             if (notified.get()) {
                 // The request has timed out. No need to spend cycles processing the result.
                 logger.debug(
                     () -> new ParameterizedMessage(
                         "[{}] skipping result processing for request [{}] as the request has timed out",
-                        processContext.task.getModelId(),
+                        context.task.getModelId(),
                         requestId
                     )
                 );
                 return;
             }
             InferenceResults results = inferenceResultsProcessor.processResult(tokenization, pyTorchResult);
-            logger.debug(
-                () -> new ParameterizedMessage("[{}] processed result for request [{}]", processContext.task.getModelId(), requestId)
-            );
-            listener.onResponse(results);
+            logger.debug(() -> new ParameterizedMessage("[{}] processed result for request [{}]", context.task.getModelId(), requestId));
+            resultsListener.onResponse(results);
         }
     }
 
@@ -35,8 +35,8 @@ public class TrainedModelDeploymentTask extends CancellableTask implements Start
     private final TaskParams params;
     private final TrainedModelAllocationNodeService trainedModelAllocationNodeService;
    private volatile boolean stopped;
-    private final SetOnce<String> stoppedReason = new SetOnce<>();
-    private final SetOnce<InferenceConfig> inferenceConfig = new SetOnce<>();
+    private final SetOnce<String> stoppedReasonHolder = new SetOnce<>();
+    private final SetOnce<InferenceConfig> inferenceConfigHolder = new SetOnce<>();
     private final XPackLicenseState licenseState;
     private final LicensedFeature.Persistent licensedFeature;
 
@@ -62,7 +62,7 @@ public class TrainedModelDeploymentTask extends CancellableTask implements Start
     }
 
     void init(InferenceConfig inferenceConfig) {
-        this.inferenceConfig.set(inferenceConfig);
+        this.inferenceConfigHolder.set(inferenceConfig);
         licensedFeature.startTracking(licenseState, "model-" + params.getModelId());
     }
 
@@ -82,14 +82,14 @@ public class TrainedModelDeploymentTask extends CancellableTask implements Start
         logger.debug("[{}] Stopping due to reason [{}]", getModelId(), reason);
         licensedFeature.stopTracking(licenseState, "model-" + params.getModelId());
         stopped = true;
-        stoppedReason.trySet(reason);
+        stoppedReasonHolder.trySet(reason);
         trainedModelAllocationNodeService.stopDeploymentAndNotify(this, reason);
     }
 
     public void stopWithoutNotification(String reason) {
         licensedFeature.stopTracking(licenseState, "model-" + params.getModelId());
         logger.debug("[{}] Stopping due to reason [{}]", getModelId(), reason);
-        stoppedReason.trySet(reason);
+        stoppedReasonHolder.trySet(reason);
         stopped = true;
     }
 
@@ -98,7 +98,7 @@ public class TrainedModelDeploymentTask extends CancellableTask implements Start
     }
 
     public Optional<String> stoppedReason() {
-        return Optional.ofNullable(stoppedReason.get());
+        return Optional.ofNullable(stoppedReasonHolder.get());
     }
 
     @Override
@@ -108,24 +108,24 @@ public class TrainedModelDeploymentTask extends CancellableTask implements Start
     }
 
     public void infer(Map<String, Object> doc, InferenceConfigUpdate update, TimeValue timeout, ActionListener<InferenceResults> listener) {
-        if (inferenceConfig.get() == null) {
+        if (inferenceConfigHolder.get() == null) {
             listener.onFailure(
                 ExceptionsHelper.badRequestException("[{}] inference not possible against uninitialized model", params.getModelId())
             );
             return;
         }
-        if (update.isSupported(inferenceConfig.get()) == false) {
+        if (update.isSupported(inferenceConfigHolder.get()) == false) {
             listener.onFailure(
                 ExceptionsHelper.badRequestException(
                     "[{}] inference not possible. Task is configured with [{}] but received update of type [{}]",
                     params.getModelId(),
-                    inferenceConfig.get().getName(),
+                    inferenceConfigHolder.get().getName(),
                     update.getName()
                 )
             );
             return;
         }
-        trainedModelAllocationNodeService.infer(this, update.apply(inferenceConfig.get()), doc, timeout, listener);
+        trainedModelAllocationNodeService.infer(this, update.apply(inferenceConfigHolder.get()), doc, timeout, listener);
     }
 
     public Optional<ModelStats> modelStats() {
@@ -68,35 +68,35 @@ public class ZeroShotClassificationProcessor implements NlpTask.Processor {
 
     @Override
     public NlpTask.RequestBuilder getRequestBuilder(NlpConfig nlpConfig) {
-        final String[] labels;
+        final String[] labelsValue;
         if (nlpConfig instanceof ZeroShotClassificationConfig) {
             ZeroShotClassificationConfig zeroShotConfig = (ZeroShotClassificationConfig) nlpConfig;
-            labels = zeroShotConfig.getLabels().toArray(new String[0]);
+            labelsValue = zeroShotConfig.getLabels().toArray(new String[0]);
         } else {
-            labels = this.labels;
+            labelsValue = this.labels;
         }
-        if (labels == null || labels.length == 0) {
+        if (labelsValue == null || labelsValue.length == 0) {
             throw ExceptionsHelper.badRequestException("zero_shot_classification requires non-empty [labels]");
         }
-        return new RequestBuilder(tokenizer, labels, hypothesisTemplate);
+        return new RequestBuilder(tokenizer, labelsValue, hypothesisTemplate);
     }
 
     @Override
     public NlpTask.ResultProcessor getResultProcessor(NlpConfig nlpConfig) {
-        final String[] labels;
-        final boolean isMultiLabel;
-        final String resultsField;
+        final String[] labelsValue;
+        final boolean isMultiLabelValue;
+        final String resultsFieldValue;
         if (nlpConfig instanceof ZeroShotClassificationConfig) {
             ZeroShotClassificationConfig zeroShotConfig = (ZeroShotClassificationConfig) nlpConfig;
-            labels = zeroShotConfig.getLabels().toArray(new String[0]);
-            isMultiLabel = zeroShotConfig.isMultiLabel();
-            resultsField = zeroShotConfig.getResultsField();
+            labelsValue = zeroShotConfig.getLabels().toArray(new String[0]);
+            isMultiLabelValue = zeroShotConfig.isMultiLabel();
+            resultsFieldValue = zeroShotConfig.getResultsField();
         } else {
-            labels = this.labels;
-            isMultiLabel = this.isMultiLabel;
-            resultsField = this.resultsField;
+            labelsValue = this.labels;
+            isMultiLabelValue = this.isMultiLabel;
+            resultsFieldValue = this.resultsField;
         }
-        return new ResultProcessor(entailmentPos, contraPos, labels, isMultiLabel, resultsField);
+        return new ResultProcessor(entailmentPos, contraPos, labelsValue, isMultiLabelValue, resultsFieldValue);
     }
 
     static class RequestBuilder implements NlpTask.RequestBuilder {
@@ -70,7 +70,7 @@ public class PyTorchResultProcessor {
             logger.error(new ParameterizedMessage("[{}] Error processing results", deploymentId), e);
         }
         pendingResults.forEach(
-            (id, pendingResults) -> pendingResults.listener.onResponse(
+            (id, pendingResult) -> pendingResult.listener.onResponse(
                 new PyTorchResult(
                     id,
                     null,
@@ -84,7 +84,7 @@ public class PyTorchResultProcessor {
             pendingResults.clear();
         } finally {
             pendingResults.forEach(
-                (id, pendingResults) -> pendingResults.listener.onResponse(
+                (id, pendingResult) -> pendingResult.listener.onResponse(
                     new PyTorchResult(id, null, null, "inference canceled as process is stopping")
                 )
             );
@@ -370,7 +370,7 @@ public class JobManager {
 
     public void deleteJob(
         DeleteJobAction.Request request,
-        Client client,
+        Client clientToUse,
         ClusterState state,
         ActionListener<AcknowledgedResponse> listener
     ) {
@@ -404,7 +404,7 @@ public class JobManager {
         );
 
         // Step 1. Delete the physical storage
-        new JobDataDeleter(client, jobId).deleteJobDocuments(
+        new JobDataDeleter(clientToUse, jobId).deleteJobDocuments(
             jobConfigProvider,
             indexNameExpressionResolver,
             state,
@@ -85,9 +85,9 @@ public class UpdateJobProcessNotifier {
     private void stop() {
         orderedJobUpdates.clear();
 
-        ThreadPool.Cancellable cancellable = this.cancellable;
-        if (cancellable != null) {
-            cancellable.cancel();
+        ThreadPool.Cancellable cancellableCopy = this.cancellable;
+        if (cancellableCopy != null) {
+            cancellableCopy.cancel();
         }
     }
 
@@ -421,7 +421,11 @@ public class JobDataDeleter {
         deleteModelState(jobId, deleteStateHandler);
     }
 
-    private void deleteResultsByQuery(String jobId, String[] indices, ActionListener<BulkByScrollResponse> listener) {
+    private void deleteResultsByQuery(
+        @SuppressWarnings("HiddenField") String jobId,
+        String[] indices,
+        ActionListener<BulkByScrollResponse> listener
+    ) {
         assert indices.length > 0;
 
         ActionListener<RefreshResponse> refreshListener = ActionListener.wrap(refreshResponse -> {
@@ -442,7 +446,7 @@ public class JobDataDeleter {
         executeAsyncWithOrigin(client, ML_ORIGIN, RefreshAction.INSTANCE, refreshRequest, refreshListener);
     }
 
-    private void deleteAliases(String jobId, ActionListener<AcknowledgedResponse> finishedHandler) {
+    private void deleteAliases(@SuppressWarnings("HiddenField") String jobId, ActionListener<AcknowledgedResponse> finishedHandler) {
         final String readAliasName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
         final String writeAliasName = AnomalyDetectorsIndex.resultsWriteAlias(jobId);
 
@@ -494,7 +498,7 @@ public class JobDataDeleter {
         );
     }
 
-    private void deleteQuantiles(String jobId, ActionListener<Boolean> finishedHandler) {
+    private void deleteQuantiles(@SuppressWarnings("HiddenField") String jobId, ActionListener<Boolean> finishedHandler) {
         // Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
         IdsQueryBuilder query = new IdsQueryBuilder().addIds(Quantiles.documentId(jobId));
         DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern()).setQuery(query)
@@ -511,7 +515,7 @@ public class JobDataDeleter {
         );
     }
 
-    private void deleteModelState(String jobId, ActionListener<BulkByScrollResponse> listener) {
+    private void deleteModelState(@SuppressWarnings("HiddenField") String jobId, ActionListener<BulkByScrollResponse> listener) {
         GetModelSnapshotsAction.Request request = new GetModelSnapshotsAction.Request(jobId, null);
         request.setPageParams(new PageParams(0, MAX_SNAPSHOTS_TO_DELETE));
         executeAsyncWithOrigin(client, ML_ORIGIN, GetModelSnapshotsAction.INSTANCE, request, ActionListener.wrap(response -> {
@@ -520,7 +524,11 @@ public class JobDataDeleter {
         }, listener::onFailure));
     }
 
-    private void deleteCategorizerState(String jobId, int docNum, ActionListener<Boolean> finishedHandler) {
+    private void deleteCategorizerState(
+        @SuppressWarnings("HiddenField") String jobId,
+        int docNum,
+        ActionListener<Boolean> finishedHandler
+    ) {
         // Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
         IdsQueryBuilder query = new IdsQueryBuilder().addIds(CategorizerState.documentId(jobId, docNum));
         DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern()).setQuery(query)
@@ -132,7 +132,10 @@ public class JobResultsPersister {
             return this;
         }
 
-        private void persistBucketInfluencersStandalone(String jobId, List<BucketInfluencer> bucketInfluencers) {
+        private void persistBucketInfluencersStandalone(
+            @SuppressWarnings("HiddenField") String jobId,
+            List<BucketInfluencer> bucketInfluencers
+        ) {
             if (bucketInfluencers != null && bucketInfluencers.isEmpty() == false) {
                 for (BucketInfluencer bucketInfluencer : bucketInfluencers) {
                     String id = bucketInfluencer.getId();
@@ -508,7 +511,7 @@ public class JobResultsPersister {
         }
 
         BulkResponse persist(Supplier<Boolean> shouldRetry, boolean requireAlias) {
-            logCall(indexName);
+            logCall();
             try {
                 return resultsPersisterService.indexWithRetry(
                     jobId,
@@ -533,7 +536,7 @@ public class JobResultsPersister {
         }
 
         void persist(ActionListener<IndexResponse> listener, boolean requireAlias) {
-            logCall(indexName);
+            logCall();
 
             try (XContentBuilder content = toXContentBuilder(object, params)) {
                 IndexRequest indexRequest = new IndexRequest(indexName).id(id)
@@ -549,7 +552,7 @@ public class JobResultsPersister {
             }
         }
 
-        private void logCall(String indexName) {
+        private void logCall() {
             if (logger.isTraceEnabled()) {
                 if (id != null) {
                     logger.trace("[{}] ES API CALL: to index {} with ID [{}]", jobId, indexName, id);
@@ -758,7 +758,7 @@ public class JobResultsProvider {
         BucketsQueryBuilder query,
         Consumer<QueryPage<Bucket>> handler,
         Consumer<Exception> errorHandler,
-        Client client
+        @SuppressWarnings("HiddenField") Client client
     ) throws ResourceNotFoundException {
 
         String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
@@ -137,8 +137,8 @@ public final class RecordsQueryBuilder {
             .sort(sb)
             .fetchSource(true);
 
-        for (String sortField : SECONDARY_SORT) {
-            searchSourceBuilder.sort(sortField, sortDescending ? SortOrder.DESC : SortOrder.ASC);
+        for (String eachSortField : SECONDARY_SORT) {
+            searchSourceBuilder.sort(eachSortField, sortDescending ? SortOrder.DESC : SortOrder.ASC);
         }
 
         return searchSourceBuilder;
@@ -69,9 +69,9 @@ final class ProcessContext {
             lock.unlock();
         }
 
-    void setRunning(AutodetectCommunicator autodetectCommunicator) {
+    void setRunning(AutodetectCommunicator communicator) {
         assert lock.isHeldByCurrentThread();
-        state.setRunning(this, autodetectCommunicator);
+        state.setRunning(this, communicator);
     }
 
     boolean setDying() {
@@ -414,37 +414,41 @@ public class RollupActionSingleNodeTests extends ESSingleNodeTestCase {
     }
 
     @SuppressWarnings("unchecked")
-    private void assertRollupIndex(RollupActionConfig config, String sourceIndex, String rollupIndex) {
+    private void assertRollupIndex(RollupActionConfig config, String sourceIndex, String rollupIndexName) {
         final CompositeAggregationBuilder aggregation = buildCompositeAggs("resp", config);
         long numBuckets = 0;
         InternalComposite origResp = client().prepareSearch(sourceIndex).addAggregation(aggregation).get().getAggregations().get("resp");
-        InternalComposite rollupResp = client().prepareSearch(rollupIndex).addAggregation(aggregation).get().getAggregations().get("resp");
+        InternalComposite rollupResp = client().prepareSearch(rollupIndexName)
+            .addAggregation(aggregation)
+            .get()
+            .getAggregations()
+            .get("resp");
         while (origResp.afterKey() != null) {
             numBuckets += origResp.getBuckets().size();
             assertThat(origResp, equalTo(rollupResp));
             aggregation.aggregateAfter(origResp.afterKey());
             origResp = client().prepareSearch(sourceIndex).addAggregation(aggregation).get().getAggregations().get("resp");
-            rollupResp = client().prepareSearch(rollupIndex).addAggregation(aggregation).get().getAggregations().get("resp");
+            rollupResp = client().prepareSearch(rollupIndexName).addAggregation(aggregation).get().getAggregations().get("resp");
         }
         assertThat(origResp, equalTo(rollupResp));
 
-        SearchResponse resp = client().prepareSearch(rollupIndex).setTrackTotalHits(true).get();
+        SearchResponse resp = client().prepareSearch(rollupIndexName).setTrackTotalHits(true).get();
         assertThat(resp.getHits().getTotalHits().value, equalTo(numBuckets));
 
-        GetIndexResponse indexSettingsResp = client().admin().indices().prepareGetIndex().addIndices(sourceIndex, rollupIndex).get();
+        GetIndexResponse indexSettingsResp = client().admin().indices().prepareGetIndex().addIndices(sourceIndex, rollupIndexName).get();
         // Assert rollup metadata are set in index settings
         assertEquals(
             indexSettingsResp.getSetting(sourceIndex, "index.uuid"),
-            indexSettingsResp.getSetting(rollupIndex, "index.rollup.source.uuid")
+            indexSettingsResp.getSetting(rollupIndexName, "index.rollup.source.uuid")
         );
         assertEquals(
             indexSettingsResp.getSetting(sourceIndex, "index.provided_name"),
-            indexSettingsResp.getSetting(rollupIndex, "index.rollup.source.name")
+            indexSettingsResp.getSetting(rollupIndexName, "index.rollup.source.name")
         );
 
         // Assert field mappings
         Map<String, Map<String, Object>> mappings = (Map<String, Map<String, Object>>) indexSettingsResp.getMappings()
-            .get(rollupIndex)
+            .get(rollupIndexName)
             .getSourceAsMap()
             .get("properties");
 
@@ -485,7 +489,7 @@ public class RollupActionSingleNodeTests extends ESSingleNodeTestCase {
         // Assert that temporary index was removed
        expectThrows(
            IndexNotFoundException.class,
-            () -> client().admin().indices().prepareGetIndex().addIndices(".rolluptmp-" + rollupIndex).get()
+            () -> client().admin().indices().prepareGetIndex().addIndices(".rolluptmp-" + rollupIndexName).get()
         );
     }
 
@ -399,7 +399,7 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
}

@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings unused) {
return List.of(
SystemIndexDescriptor.builder()
.setIndexPattern(SNAPSHOT_BLOB_CACHE_INDEX_PATTERN)
@ -506,7 +506,7 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
}

public List<RestHandler> getRestHandlers(
Settings settings,
Settings unused,
RestController restController,
ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings,
@ -533,11 +533,11 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
}

@Override
public Collection<AllocationDecider> createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) {
public Collection<AllocationDecider> createAllocationDeciders(Settings settingsToUse, ClusterSettings clusterSettings) {
return List.of(
new SearchableSnapshotAllocationDecider(() -> SEARCHABLE_SNAPSHOT_FEATURE.checkWithoutTracking(getLicenseState())),
new SearchableSnapshotRepositoryExistsAllocationDecider(),
new SearchableSnapshotEnableAllocationDecider(settings, clusterSettings),
new SearchableSnapshotEnableAllocationDecider(settingsToUse, clusterSettings),
new HasFrozenCacheAllocationDecider(frozenCacheInfoService),
new DedicatedFrozenNodeAllocationDecider()
);
@ -94,7 +94,7 @@ public class SearchableSnapshotsStatsResponse extends BroadcastResponse {
if ("indices".equalsIgnoreCase(level) || "shards".equalsIgnoreCase(level)) {
builder.startObject("indices");
final List<Index> indices = getStats().stream()
.filter(stats -> stats.getStats().isEmpty() == false)
.filter(shardStats -> shardStats.getStats().isEmpty() == false)
.map(SearchableSnapshotShardStats::getShardRouting)
.map(ShardRouting::index)
.sorted(Comparator.comparing(Index::getName))
@ -62,8 +62,8 @@ public class FailShardsOnInvalidLicenseClusterListener implements LicenseStateLi

@Override
public synchronized void licenseStateChanged() {
final boolean allowed = SEARCHABLE_SNAPSHOT_FEATURE.checkWithoutTracking(xPackLicenseState);
final boolean isAllowed = SEARCHABLE_SNAPSHOT_FEATURE.checkWithoutTracking(xPackLicenseState);
if (allowed && this.allowed == false) {
if (isAllowed && this.allowed == false) {
rerouteService.reroute("reroute after license activation", Priority.NORMAL, new ActionListener<ClusterState>() {
@Override
public void onResponse(ClusterState clusterState) {

@ -76,7 +76,7 @@ public class FailShardsOnInvalidLicenseClusterListener implements LicenseStateLi
}
});
}
this.allowed = allowed;
this.allowed = isAllowed;
failActiveShardsIfNecessary();
}
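The hunks above follow the pattern used throughout this commit: a local variable that carries the same name as a field is renamed (here the local "allowed" becomes "isAllowed") so that nothing hides the field. The sketch below is illustrative only and not part of the diff; the class and method names are hypothetical.

class LicenseGate {
    private boolean allowed;

    // Before the rename, a local named "allowed" shadowed the field, so
    // "this.allowed = allowed" was easy to misread; a distinct local name removes the ambiguity.
    synchronized void licenseChanged(boolean licensed) {
        final boolean isAllowed = licensed;
        if (isAllowed && this.allowed == false) {
            // react to the transition, e.g. schedule a reroute
        }
        this.allowed = isAllowed;
    }
}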
@ -33,15 +33,15 @@ public class SearchableSnapshotIndexFoldersDeletionListener implements IndexStor

private static final Logger logger = LogManager.getLogger(SearchableSnapshotIndexEventListener.class);

private final Supplier<CacheService> cacheService;
private final Supplier<CacheService> cacheServiceSupplier;
private final Supplier<FrozenCacheService> frozenCacheService;
private final Supplier<FrozenCacheService> frozenCacheServiceSupplier;

public SearchableSnapshotIndexFoldersDeletionListener(
Supplier<CacheService> cacheService,
Supplier<CacheService> cacheServiceSupplier,
Supplier<FrozenCacheService> frozenCacheService
Supplier<FrozenCacheService> frozenCacheServiceSupplier
) {
this.cacheService = Objects.requireNonNull(cacheService);
this.cacheServiceSupplier = Objects.requireNonNull(cacheServiceSupplier);
this.frozenCacheService = Objects.requireNonNull(frozenCacheService);
this.frozenCacheServiceSupplier = Objects.requireNonNull(frozenCacheServiceSupplier);
}

@Override

@ -61,7 +61,7 @@ public class SearchableSnapshotIndexFoldersDeletionListener implements IndexStor
}

private void markShardAsEvictedInCache(ShardId shardId, IndexSettings indexSettings) {
final CacheService cacheService = this.cacheService.get();
final CacheService cacheService = this.cacheServiceSupplier.get();
assert cacheService != null : "cache service not initialized";

logger.debug("{} marking shard as evicted in searchable snapshots cache (reason: cache files deleted from disk)", shardId);

@ -71,7 +71,7 @@ public class SearchableSnapshotIndexFoldersDeletionListener implements IndexStor
shardId
);

final FrozenCacheService frozenCacheService = this.frozenCacheService.get();
final FrozenCacheService frozenCacheService = this.frozenCacheServiceSupplier.get();
assert frozenCacheService != null : "frozen cache service not initialized";
frozenCacheService.markShardAsEvictedInCache(
SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings()),
@ -65,16 +65,16 @@ public class SearchableSnapshotEnableAllocationDecider extends AllocationDecider
public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) {
final IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index());
if (SearchableSnapshotsSettings.isSearchableSnapshotStore(indexMetadata.getSettings())) {
EnableAllocationDecider.Allocation enableAllocation = this.enableAllocation;
EnableAllocationDecider.Allocation enableAllocationCopy = this.enableAllocation;
boolean allocateOnRollingRestart = this.allocateOnRollingRestart;
boolean allocateOnRollingRestartCopy = this.allocateOnRollingRestart;
if (enableAllocation == EnableAllocationDecider.Allocation.PRIMARIES) {
if (enableAllocationCopy == EnableAllocationDecider.Allocation.PRIMARIES) {
if (allocateOnRollingRestart == false) {
if (allocateOnRollingRestartCopy == false) {
return allocation.decision(
Decision.NO,
NAME,
"no allocations of searchable snapshots allowed during rolling restart due to [%s=%s] and [%s=false]",
EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(),
enableAllocation,
enableAllocationCopy,
SEARCHABLE_SNAPSHOTS_ALLOCATE_ON_ROLLING_RESTART.getKey()
);
} else {
@ -523,8 +523,8 @@ public class BlobStoreCacheMaintenanceService implements ClusterStateListener {
assert knownSnapshots != null;
final Set<String> knownRepositories = existingRepositories;
assert knownRepositories != null;
final Instant expirationTime = this.expirationTime;
final Instant expirationTimeCopy = this.expirationTime;
assert expirationTime != null;
assert expirationTimeCopy != null;

Object[] lastSortValues = null;
for (SearchHit searchHit : searchHits) {

@ -551,7 +551,7 @@ public class BlobStoreCacheMaintenanceService implements ClusterStateListener {
}
if (delete) {
final Instant creationTime = getCreationTime(searchHit);
if (creationTime.isAfter(expirationTime)) {
if (creationTime.isAfter(expirationTimeCopy)) {
logger.trace(
"blob store cache entry with id [{}] was created recently, skipping deletion",
searchHit.getId()
@ -65,8 +65,8 @@ public final class ByteRange implements Comparable<ByteRange> {
return start >= range.start() && end <= range.end();
}

public boolean contains(long start, long end) {
public boolean contains(long rangeStart, long rangeEnd) {
return start() <= start && end <= end();
return start() <= rangeStart && rangeEnd <= end();
}

public ByteRange shift(long delta) {
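In the ByteRange hunk above, the parameters of contains() used to shadow the start and end fields, so the method body mixed accessors and parameters of the same name. A minimal stand-alone sketch of the renamed shape (a stand-in, not the actual Elasticsearch class) looks like this:

final class Range {
    private final long start;
    private final long end;

    Range(long start, long end) {
        this.start = start;
        this.end = end;
    }

    long start() { return start; }
    long end() { return end; }

    // The parameters no longer hide the fields, so it is clear that the check
    // compares the queried bounds against this range's own bounds.
    boolean contains(long rangeStart, long rangeEnd) {
        return start() <= rangeStart && rangeEnd <= end();
    }
}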
@ -180,8 +180,8 @@ public class CacheFile {
return tracker.getInitialLength();
}

public void acquire(final EvictionListener listener) throws IOException {
public void acquire(final EvictionListener evictionListener) throws IOException {
assert listener != null;
assert evictionListener != null;

ensureOpen();
boolean success = false;

@ -194,8 +194,8 @@ public class CacheFile {
channelRef = new FileChannelReference(fileExists ? OPEN_OPTIONS : CREATE_OPTIONS);
fileExists = true;
}
final boolean added = listeners.add(listener);
final boolean added = listeners.add(evictionListener);
assert added : "listener already exists " + listener;
assert added : "listener already exists " + evictionListener;
}
success = true;
} finally {

@ -210,14 +210,14 @@ public class CacheFile {
assert invariant();
}

public void release(final EvictionListener listener) {
public void release(final EvictionListener evictionListener) {
assert listener != null;
assert evictionListener != null;

boolean success = false;
try {
synchronized (listeners) {
final boolean removed = listeners.remove(Objects.requireNonNull(listener));
final boolean removed = listeners.remove(Objects.requireNonNull(evictionListener));
assert removed : "listener does not exist " + listener;
assert removed : "listener does not exist " + evictionListener;
if (removed == false) {
throw new IllegalStateException("Cannot remove an unknown listener");
}

@ -251,15 +251,15 @@ public class CacheFile {

private boolean assertRefCounted(boolean isReleased) {
final boolean isEvicted = evicted.get();
final boolean fileExists = Files.exists(file);
final boolean fileDoesExist = Files.exists(file);
assert isReleased == false || (isEvicted && fileExists == false)
assert isReleased == false || (isEvicted && fileDoesExist == false)
: "fully released cache file should be deleted from disk but got ["
+ "released="
+ isReleased
+ ", evicted="
+ isEvicted
+ ", file exists="
+ fileExists
+ fileDoesExist
+ ']';
return true;
}

@ -274,7 +274,7 @@ public class CacheFile {
evictionListeners = new HashSet<>(listeners);
}
decrementRefCount();
evictionListeners.forEach(listener -> listener.onEviction(this));
evictionListeners.forEach(eachListener -> eachListener.onEviction(this));
}
assert invariant();
}
@ -65,30 +65,30 @@ class ProgressListenableActionFuture extends AdapterActionFuture<Long, Long> {
* (inclusive) to {@code progress} (exclusive) is available. Calling this method potentially triggers the execution of one or
* more listeners that are waiting for the progress to reach a value lower than the one just updated.
*
* @param progress the new progress value
* @param progressValue the new progress value
*/
public void onProgress(final long progress) {
public void onProgress(final long progressValue) {
ensureNotCompleted();

if (progress <= start) {
if (progressValue <= start) {
assert false : progress + " <= " + start;
assert false : progressValue + " <= " + start;
throw new IllegalArgumentException("Cannot update progress with a value less than [start=" + start + ']');
}
if (end < progress) {
if (end < progressValue) {
assert false : end + " < " + progress;
assert false : end + " < " + progressValue;
throw new IllegalArgumentException("Cannot update progress with a value greater than [end=" + end + ']');
}

List<ActionListener<Long>> listenersToExecute = null;
synchronized (this) {
assert this.progress < progress : this.progress + " < " + progress;
assert this.progress < progressValue : this.progress + " < " + progressValue;
this.progress = progress;
this.progress = progressValue;

final List<Tuple<Long, ActionListener<Long>>> listeners = this.listeners;
final List<Tuple<Long, ActionListener<Long>>> listenersCopy = this.listeners;
if (listeners != null) {
if (listenersCopy != null) {
List<Tuple<Long, ActionListener<Long>>> listenersToKeep = null;
for (Tuple<Long, ActionListener<Long>> listener : listeners) {
for (Tuple<Long, ActionListener<Long>> listener : listenersCopy) {
if (progress < listener.v1()) {
if (progressValue < listener.v1()) {
if (listenersToKeep == null) {
listenersToKeep = new ArrayList<>();
}

@ -104,7 +104,7 @@ class ProgressListenableActionFuture extends AdapterActionFuture<Long, Long> {
}
}
if (listenersToExecute != null) {
listenersToExecute.forEach(listener -> executeListener(listener, () -> progress));
listenersToExecute.forEach(listener -> executeListener(listener, () -> progressValue));
}
assert invariant();
}

@ -152,22 +152,22 @@ class ProgressListenableActionFuture extends AdapterActionFuture<Long, Long> {
*/
public void addListener(ActionListener<Long> listener, long value) {
boolean executeImmediate = false;
final long progress;
final long progressValue;
synchronized (this) {
progress = this.progress;
progressValue = this.progress;
if (completed || value <= progress) {
if (completed || value <= progressValue) {
executeImmediate = true;
} else {
List<Tuple<Long, ActionListener<Long>>> listeners = this.listeners;
List<Tuple<Long, ActionListener<Long>>> listenersCopy = this.listeners;
if (listeners == null) {
if (listenersCopy == null) {
listeners = new ArrayList<>();
listenersCopy = new ArrayList<>();
}
listeners.add(Tuple.tuple(value, listener));
listenersCopy.add(Tuple.tuple(value, listener));
this.listeners = listeners;
this.listeners = listenersCopy;
}
}
if (executeImmediate) {
executeListener(listener, completed ? () -> actionGet(0L) : () -> progress);
executeListener(listener, completed ? () -> actionGet(0L) : () -> progressValue);
}
assert invariant();
}
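The ProgressListenableActionFuture hunks above show the other recurring idiom in this commit: the field is read once into a local with a distinct name (listenersCopy, progressValue) instead of a local that hides it. The following sketch is hypothetical and greatly simplified relative to the real class; it only illustrates the naming pattern.

import java.util.ArrayList;
import java.util.List;

class ProgressTracker {
    private long progress;
    private List<Runnable> listeners;

    // Read the field into a differently named local so nothing is shadowed
    // and the value is only read once inside the synchronized block.
    synchronized void onProgress(long progressValue) {
        progress = progressValue;
        List<Runnable> listenersCopy = this.listeners;
        if (listenersCopy != null) {
            for (Runnable listener : listenersCopy) {
                listener.run();
            }
        }
    }

    synchronized void addListener(Runnable listener) {
        List<Runnable> listenersCopy = this.listeners;
        if (listenersCopy == null) {
            listenersCopy = new ArrayList<>();
        }
        listenersCopy.add(listener);
        this.listeners = listenersCopy;
    }
}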
@ -372,13 +372,13 @@ public class FrozenCacheService implements Releasable {
}

public CacheFileRegion get(CacheKey cacheKey, long fileLength, int region) {
final long regionSize = getRegionSize(fileLength, region);
final long effectiveRegionSize = getRegionSize(fileLength, region);
try (Releasable ignore = keyedLock.acquire(cacheKey)) {
final RegionKey regionKey = new RegionKey(cacheKey, region);
final long now = currentTimeSupplier.getAsLong();
final Entry<CacheFileRegion> entry = keyMapping.computeIfAbsent(
regionKey,
key -> new Entry<>(new CacheFileRegion(regionKey, regionSize), now)
key -> new Entry<>(new CacheFileRegion(regionKey, effectiveRegionSize), now)
);
if (entry.chunk.sharedBytesPos == -1) {
// new item

@ -885,15 +885,15 @@ public class FrozenCacheService implements Releasable {
final StepListener<Integer> lis = fileRegion.populateAndRead(
subRangeToWrite,
subRangeToRead,
(channel, channelPos, relativePos, length) -> {
(channel, channelPos, relativePos, len) -> {
assert regionOwners[fileRegion.sharedBytesPos].get() == fileRegion;
assert channelPos >= fileRegion.physicalStartOffset() && channelPos + length <= fileRegion.physicalEndOffset();
assert channelPos >= fileRegion.physicalStartOffset() && channelPos + len <= fileRegion.physicalEndOffset();
return reader.onRangeAvailable(channel, channelPos, relativePos - readOffset, length);
return reader.onRangeAvailable(channel, channelPos, relativePos - readOffset, len);
},
(channel, channelPos, relativePos, length, progressUpdater) -> {
(channel, channelPos, relativePos, len, progressUpdater) -> {
assert regionOwners[fileRegion.sharedBytesPos].get() == fileRegion;
assert channelPos >= fileRegion.physicalStartOffset() && channelPos + length <= fileRegion.physicalEndOffset();
assert channelPos >= fileRegion.physicalStartOffset() && channelPos + len <= fileRegion.physicalEndOffset();
writer.fillCacheRange(channel, channelPos, relativePos - writeOffset, length, progressUpdater);
writer.fillCacheRange(channel, channelPos, relativePos - writeOffset, len, progressUpdater);
},
executor
);
@ -203,18 +203,19 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
}

/**
* Loads the snapshot if and only if it the snapshot is not loaded yet.
* Loads the snapshot if and only if the snapshot is not loaded yet.
*
* @return true if the snapshot was loaded by executing this method, false otherwise
*/
public boolean loadSnapshot(RecoveryState recoveryState, ActionListener<Void> preWarmListener) {
public boolean loadSnapshot(RecoveryState snapshotRecoveryState, ActionListener<Void> preWarmListener) {
assert recoveryState != null;
assert snapshotRecoveryState != null;
assert recoveryState instanceof SearchableSnapshotRecoveryState;
assert snapshotRecoveryState instanceof SearchableSnapshotRecoveryState;
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT
assert snapshotRecoveryState.getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT
|| recoveryState.getRecoverySource().getType() == RecoverySource.Type.PEER : recoveryState.getRecoverySource().getType();
|| snapshotRecoveryState.getRecoverySource().getType() == RecoverySource.Type.PEER
: snapshotRecoveryState.getRecoverySource().getType();
assert assertCurrentThreadMayLoadSnapshot();
// noinspection ConstantConditions in case assertions are disabled
if (recoveryState instanceof SearchableSnapshotRecoveryState == false) {
if (snapshotRecoveryState instanceof SearchableSnapshotRecoveryState == false) {
throw new IllegalArgumentException("A SearchableSnapshotRecoveryState instance was expected");
}
boolean alreadyLoaded = this.loaded;

@ -227,7 +228,7 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
this.loaded = true;
cleanExistingRegularShardFiles();
waitForPendingEvictions();
this.recoveryState = (SearchableSnapshotRecoveryState) recoveryState;
this.recoveryState = (SearchableSnapshotRecoveryState) snapshotRecoveryState;
prewarmCache(preWarmListener);
}
}
@ -155,8 +155,8 @@ public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInpu
return success;
}

protected ByteRange rangeToReadFromBlobCache(long position, int length) {
protected ByteRange rangeToReadFromBlobCache(long position, int readLength) {
final long end = position + length;
final long end = position + readLength;
if (headerBlobCacheByteRange.contains(position, end)) {
return headerBlobCacheByteRange;
} else if (footerBlobCacheByteRange.contains(position, end)) {

@ -170,23 +170,23 @@ public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInpu
* spans multiple blobs then this stream will request them in turn.
*
* @param position The start of the range of bytes to read, relative to the start of the corresponding Lucene file.
* @param length The number of bytes to read
* @param readLength The number of bytes to read
*/
protected InputStream openInputStreamFromBlobStore(final long position, final long length) throws IOException {
protected InputStream openInputStreamFromBlobStore(final long position, final long readLength) throws IOException {
assert assertCurrentThreadMayAccessBlobStore();
if (fileInfo.numberOfParts() == 1L) {
assert position + length <= fileInfo.partBytes(0)
assert position + readLength <= fileInfo.partBytes(0)
: "cannot read [" + position + "-" + (position + length) + "] from [" + fileInfo + "]";
: "cannot read [" + position + "-" + (position + readLength) + "] from [" + fileInfo + "]";
stats.addBlobStoreBytesRequested(length);
stats.addBlobStoreBytesRequested(readLength);
return blobContainer.readBlob(fileInfo.partName(0), position, length);
return blobContainer.readBlob(fileInfo.partName(0), position, readLength);
} else {
final int startPart = getPartNumberForPosition(position);
final int endPart = getPartNumberForPosition(position + length - 1);
final int endPart = getPartNumberForPosition(position + readLength - 1);

for (int currentPart = startPart; currentPart <= endPart; currentPart++) {
final long startInPart = (currentPart == startPart) ? getRelativePositionInPart(position) : 0L;
final long endInPart = (currentPart == endPart)
? getRelativePositionInPart(position + length - 1) + 1
? getRelativePositionInPart(position + readLength - 1) + 1
: getLengthOfPart(currentPart);
stats.addBlobStoreBytesRequested(endInPart - startInPart);
}

@ -197,7 +197,7 @@ public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInpu
final int currentPart = startPart + slice;
final long startInPart = (currentPart == startPart) ? getRelativePositionInPart(position) : 0L;
final long endInPart = (currentPart == endPart)
? getRelativePositionInPart(position + length - 1) + 1
? getRelativePositionInPart(position + readLength - 1) + 1
: getLengthOfPart(currentPart);
return blobContainer.readBlob(fileInfo.partName(currentPart), startInPart, endInPart - startInPart);
}
@ -25,7 +25,7 @@ import static org.hamcrest.Matchers.is;

public class IndexPrivilegeIntegTests extends AbstractPrivilegeTestCase {

private String jsonDoc = "{ \"name\" : \"elasticsearch\", \"body\": \"foo bar\" }";
private final String jsonDoc = "{ \"name\" : \"elasticsearch\", \"body\": \"foo bar\" }";

private static final String ROLES = "all_cluster_role:\n"
+ " cluster: [ all ]\n"

@ -739,9 +739,9 @@ public class IndexPrivilegeIntegTests extends AbstractPrivilegeTestCase {
break;

case "delete":
String jsonDoc = "{ \"name\" : \"docToDelete\"}";
String jsonDocToDelete = "{ \"name\" : \"docToDelete\"}";
assertAccessIsAllowed("admin", "PUT", "/" + index + "/_doc/docToDelete", jsonDoc);
assertAccessIsAllowed("admin", "PUT", "/" + index + "/_doc/docToDelete", jsonDocToDelete);
assertAccessIsAllowed("admin", "PUT", "/" + index + "/_doc/docToDelete2", jsonDoc);
assertAccessIsAllowed("admin", "PUT", "/" + index + "/_doc/docToDelete2", jsonDocToDelete);
if (userIsAllowed) {
assertAccessIsAllowed(user, "DELETE", "/" + index + "/_doc/docToDelete");
} else {
@ -96,7 +96,7 @@ public class SecurityClearScrollTests extends SecurityIntegTestCase {
ClearScrollResponse clearScrollResponse = client().filterWithHeader(headers).prepareClearScroll().addScrollId("_all").get();
assertThat(clearScrollResponse.isSucceeded(), is(true));

assertThatScrollIdsDoNotExist(scrollIds);
assertThatScrollIdsDoNotExist();
}

public void testThatClearingAllScrollIdsRequirePermissions() throws Exception {

@ -116,10 +116,10 @@ public class SecurityClearScrollTests extends SecurityIntegTestCase {
assertThat(clearByIdScrollResponse.isSucceeded(), is(true));

// test with each id, that they do not exist
assertThatScrollIdsDoNotExist(scrollIds);
assertThatScrollIdsDoNotExist();
}

private void assertThatScrollIdsDoNotExist(List<String> scrollIds) {
private void assertThatScrollIdsDoNotExist() {
for (String scrollId : scrollIds) {
SearchPhaseExecutionException expectedException = expectThrows(
SearchPhaseExecutionException.class,
@ -60,6 +60,7 @@ public class CartesianPoint implements ToXContentFragment {
}
}

@SuppressWarnings("HiddenField")
public CartesianPoint resetFromCoordinates(String value, final boolean ignoreZValue) {
String[] vals = value.split(",");
if (vals.length > 3 || vals.length < 2) {
@ -276,21 +276,22 @@ public class CentroidCalculator {
* @param x the x-coordinate of the point
* @param y the y-coordinate of the point
* @param weight the associated weight of the coordinate
* @param shapeType the associated shape type of the coordinate
*/
private void addCoordinate(double x, double y, double weight, DimensionalShapeType dimensionalShapeType) {
private void addCoordinate(double x, double y, double weight, DimensionalShapeType shapeType) {
// x and y can be infinite due to really small areas and rounding problems
if (Double.isFinite(x) && Double.isFinite(y)) {
if (this.dimensionalShapeType == dimensionalShapeType) {
if (this.dimensionalShapeType == shapeType) {
compSumX.add(x * weight);
compSumY.add(y * weight);
compSumWeight.add(weight);
this.dimensionalShapeType = dimensionalShapeType;
this.dimensionalShapeType = shapeType;
} else if (dimensionalShapeType.compareTo(this.dimensionalShapeType) > 0) {
} else if (shapeType.compareTo(this.dimensionalShapeType) > 0) {
// reset counters
compSumX.reset(x * weight, 0);
compSumY.reset(y * weight, 0);
compSumWeight.reset(weight, 0);
this.dimensionalShapeType = dimensionalShapeType;
this.dimensionalShapeType = shapeType;
}
}
}
@ -49,6 +49,7 @@ class Extent {
this.posRight = posRight;
}

@SuppressWarnings("HiddenField")
public void reset(int top, int bottom, int negLeft, int negRight, int posLeft, int posRight) {
this.top = top;
this.bottom = bottom;
@ -99,11 +99,11 @@ public class GeometryDocValueReader {
* Visit the triangle tree with the provided visitor
*/
public void visit(TriangleTreeReader.Visitor visitor) throws IOException {
Extent extent = getExtent();
Extent geometryExtent = getExtent();
int thisMaxX = extent.maxX();
int thisMaxX = geometryExtent.maxX();
int thisMinX = extent.minX();
int thisMinX = geometryExtent.minX();
int thisMaxY = extent.maxY();
int thisMaxY = geometryExtent.maxY();
int thisMinY = extent.minY();
int thisMinY = geometryExtent.minY();
if (visitor.push(thisMinX, thisMinY, thisMaxX, thisMaxY)) {
TriangleTreeReader.visit(input, visitor, thisMaxX, thisMaxY);
}
@ -62,9 +62,9 @@ class Tile2DVisitor implements TriangleTreeReader.Visitor {
boolean ab = (metadata & 1 << 4) == 1 << 4;
boolean bc = (metadata & 1 << 5) == 1 << 5;
boolean ca = (metadata & 1 << 6) == 1 << 6;
GeoRelation relation = relateTriangle(aX, aY, ab, bX, bY, bc, cX, cY, ca);
GeoRelation geoRelation = relateTriangle(aX, aY, ab, bX, bY, bc, cX, cY, ca);
if (relation != GeoRelation.QUERY_DISJOINT) {
if (geoRelation != GeoRelation.QUERY_DISJOINT) {
this.relation = relation;
this.relation = geoRelation;
}
}

@ -89,6 +89,7 @@ class Tile2DVisitor implements TriangleTreeReader.Visitor {
}

@Override
@SuppressWarnings("HiddenField")
public boolean push(int minX, int minY, int maxX, int maxY) {
// exclude north and east boundary intersections with tiles from intersection consideration
// for consistent tiling definition of shapes on the boundaries of tiles
@ -49,9 +49,9 @@ class LatLonShapeDocValuesQuery extends Query {
}

@Override
public String toString(String field) {
public String toString(String otherField) {
StringBuilder sb = new StringBuilder();
if (this.field.equals(field) == false) {
if (this.field.equals(otherField) == false) {
sb.append(this.field);
sb.append(':');
sb.append(relation);
@ -94,17 +94,17 @@ public class InternalGeoLine extends InternalAggregation implements GeoShapeMetr
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
int mergedSize = 0;
boolean complete = true;
boolean reducedComplete = true;
boolean includeSorts = true;
boolean reducedIncludeSorts = true;
List<InternalGeoLine> internalGeoLines = new ArrayList<>(aggregations.size());
for (InternalAggregation aggregation : aggregations) {
InternalGeoLine geoLine = (InternalGeoLine) aggregation;
internalGeoLines.add(geoLine);
mergedSize += geoLine.line.length;
complete &= geoLine.complete;
reducedComplete &= geoLine.complete;
includeSorts &= geoLine.includeSorts;
reducedIncludeSorts &= geoLine.includeSorts;
}
complete &= mergedSize <= size;
reducedComplete &= mergedSize <= size;
int finalSize = Math.min(mergedSize, size);

MergedGeoLines mergedGeoLines = new MergedGeoLines(internalGeoLines, finalSize, sortOrder);

@ -118,8 +118,8 @@ public class InternalGeoLine extends InternalAggregation implements GeoShapeMetr
mergedGeoLines.getFinalPoints(),
mergedGeoLines.getFinalSortValues(),
getMetadata(),
complete,
reducedComplete,
includeSorts,
reducedIncludeSorts,
sortOrder,
size
);
@ -72,6 +72,7 @@ public class BoundedGeoTileGridTiler extends AbstractGeoTileGridTiler {
}

@Override
@SuppressWarnings("HiddenField")
protected int setValuesForFullyContainedTile(int xTile, int yTile, int zTile, GeoShapeCellValues values, int valuesIndex) {
// For every level we go down, we half each dimension. The total number of splits is equal to 1 << (levelEnd - levelStart)
final int splits = 1 << precision - zTile;

@ -101,9 +102,9 @@ public class BoundedGeoTileGridTiler extends AbstractGeoTileGridTiler {
}
}
} else {
final int minX = Math.max(this.minX, xTile * splits);
final int _minX = Math.max(this.minX, xTile * splits);
final int maxX = Math.min(this.maxX, xTile * splits + splits);
final int _maxX = Math.min(this.maxX, xTile * splits + splits);
for (int i = minX; i < maxX; i++) {
for (int i = _minX; i < _maxX; i++) {
for (int j = minY; j < maxY; j++) {
assert validTile(i, j, precision);
values.add(valuesIndex++, GeoTileUtils.longEncodeTiles(precision, i, j));
@ -27,13 +27,13 @@ public class DelimitedTextStructureFinderFactory implements TextStructureFinderF
this.trimFields = trimFields;
}

DelimitedTextStructureFinderFactory makeSimilar(Character quote, Boolean trimFields) {
DelimitedTextStructureFinderFactory makeSimilar(Character quote, Boolean shouldTrimFields) {

return new DelimitedTextStructureFinderFactory(
(char) csvPreference.getDelimiterChar(),
(quote == null) ? csvPreference.getQuoteChar() : quote,
minFieldsPerRow,
(trimFields == null) ? this.trimFields : trimFields
(shouldTrimFields == null) ? this.trimFields : shouldTrimFields
);
}
@ -95,12 +95,12 @@ public class TransformCheckpointServiceNodeTests extends TransformSingleNodeTest
void setShardStats(ShardStats[] shardStats) {
this.shardStats = shardStats;

Set<String> indices = new HashSet<>();
Set<String> indicesSet = new HashSet<>();
for (ShardStats s : shardStats) {
indices.add(s.getShardRouting().getIndexName());
indicesSet.add(s.getShardRouting().getIndexName());
}

this.indices = indices.toArray(new String[0]);
this.indices = indicesSet.toArray(new String[0]);
}

@SuppressWarnings("unchecked")
@ -154,7 +154,7 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa

@Override
public List<RestHandler> getRestHandlers(
final Settings settings,
final Settings unused,
final RestController restController,
final ClusterSettings clusterSettings,
final IndexScopedSettings indexScopedSettings,

@ -202,9 +202,9 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
}

@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settingsToUse) {
FixedExecutorBuilder indexing = new FixedExecutorBuilder(
settings,
settingsToUse,
TASK_THREAD_POOL_NAME,
4,
4,
@ -72,7 +72,7 @@ class ClientTransformIndexer extends TransformIndexer {
private final Client client;
private final AtomicBoolean oldStatsCleanedUp = new AtomicBoolean(false);

private final AtomicReference<SeqNoPrimaryTermAndIndex> seqNoPrimaryTermAndIndex;
private final AtomicReference<SeqNoPrimaryTermAndIndex> seqNoPrimaryTermAndIndexHolder;
private final ConcurrentHashMap<String, PointInTimeBuilder> namedPits = new ConcurrentHashMap<>();
private volatile long pitCheckpoint;
private volatile boolean disablePit = false;

@ -107,7 +107,7 @@ class ClientTransformIndexer extends TransformIndexer {
context
);
this.client = ExceptionsHelper.requireNonNull(client, "client");
this.seqNoPrimaryTermAndIndex = new AtomicReference<>(seqNoPrimaryTermAndIndex);
this.seqNoPrimaryTermAndIndexHolder = new AtomicReference<>(seqNoPrimaryTermAndIndex);

// TODO: move into context constructor
context.setShouldStopAtCheckpoint(shouldStopAtCheckpoint);

@ -329,7 +329,7 @@ class ClientTransformIndexer extends TransformIndexer {
newValue
)
);
boolean updated = seqNoPrimaryTermAndIndex.compareAndSet(expectedValue, newValue);
boolean updated = seqNoPrimaryTermAndIndexHolder.compareAndSet(expectedValue, newValue);
// This should never happen. We ONLY ever update this value if at initialization or we just finished updating the document
// famous last words...
if (updated == false) {

@ -337,7 +337,7 @@ class ClientTransformIndexer extends TransformIndexer {
"[{}] Unexpected change to internal state detected, expected [{}], got [{}]",
transformConfig.getId(),
expectedValue,
seqNoPrimaryTermAndIndex.get()
seqNoPrimaryTermAndIndexHolder.get()
);
assert updated : "[" + getJobId() + "] unexpected change to seqNoPrimaryTermAndIndex.";
}

@ -345,7 +345,7 @@ class ClientTransformIndexer extends TransformIndexer {

@Nullable
SeqNoPrimaryTermAndIndex getSeqNoPrimaryTermAndIndex() {
return seqNoPrimaryTermAndIndex.get();
return seqNoPrimaryTermAndIndexHolder.get();
}

@Override
@ -321,8 +321,8 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
}
}, listener::onFailure);

ActionListener<Map<String, String>> fieldMappingsListener = ActionListener.wrap(fieldMappings -> {
ActionListener<Map<String, String>> fieldMappingsListener = ActionListener.wrap(mappings -> {
this.fieldMappings = fieldMappings;
this.fieldMappings = mappings;
configurationReadyListener.onResponse(null);
}, listener::onFailure);
@ -181,8 +181,8 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
listener.onResponse(infoBuilder.build());
}, listener::onFailure);

ClientTransformIndexer indexer = getIndexer();
ClientTransformIndexer transformIndexer = getIndexer();
if (indexer == null) {
if (transformIndexer == null) {
transformsCheckpointService.getCheckpointingInfo(
parentTaskClient,
transform.getId(),

@ -193,12 +193,12 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
);
return;
}
indexer.getCheckpointProvider()
transformIndexer.getCheckpointProvider()
.getCheckpointingInfo(
indexer.getLastCheckpoint(),
transformIndexer.getLastCheckpoint(),
indexer.getNextCheckpoint(),
transformIndexer.getNextCheckpoint(),
indexer.getPosition(),
transformIndexer.getPosition(),
indexer.getProgress(),
transformIndexer.getProgress(),
checkPointInfoListener
);
}
@ -614,7 +614,7 @@ public class TransformIndexerStateTests extends ESTestCase {
AtomicReference<IndexerState> state,
Consumer<String> failureConsumer,
ThreadPool threadPool,
TransformAuditor auditor,
TransformAuditor transformAuditor,
TransformIndexerPosition initialPosition,
TransformIndexerStats jobStats,
TransformContext context

@ -624,7 +624,7 @@ public class TransformIndexerStateTests extends ESTestCase {
TransformServices transformServices = new TransformServices(
transformConfigManager,
mock(TransformCheckpointService.class),
auditor,
transformAuditor,
mock(SchedulerEngine.class)
);

@ -436,7 +436,7 @@ public class TransformIndexerTests extends ESTestCase {
AtomicReference<IndexerState> state,
Consumer<String> failureConsumer,
ThreadPool threadPool,
TransformAuditor auditor,
TransformAuditor transformAuditor,
TransformIndexerStats jobStats,
TransformContext context
) {

@ -445,7 +445,7 @@ public class TransformIndexerTests extends ESTestCase {
TransformServices transformServices = new TransformServices(
transformConfigManager,
mock(TransformCheckpointService.class),
auditor,
transformAuditor,
mock(SchedulerEngine.class)
);