diff --git a/muted-tests.yml b/muted-tests.yml index db632eb78f8e..8d451e2726b6 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -61,9 +61,6 @@ tests: - class: org.elasticsearch.xpack.test.rest.XPackRestIT method: test {p0=transform/transforms_start_stop/Verify start transform reuses destination index} issue: https://github.com/elastic/elasticsearch/issues/115808 -- class: org.elasticsearch.search.StressSearchServiceReaperIT - method: testStressReaper - issue: https://github.com/elastic/elasticsearch/issues/115816 - class: org.elasticsearch.xpack.application.connector.ConnectorIndexServiceTests issue: https://github.com/elastic/elasticsearch/issues/116087 - class: org.elasticsearch.xpack.test.rest.XPackRestIT @@ -329,6 +326,8 @@ tests: - class: org.elasticsearch.xpack.esql.heap_attack.HeapAttackIT method: testEnrichExplosionManyMatches issue: https://github.com/elastic/elasticsearch/issues/122913 +- class: org.elasticsearch.xpack.search.AsyncSearchSecurityIT + issue: https://github.com/elastic/elasticsearch/issues/122940 # Examples: # diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index f463dce2ec70..94e8989bc5a1 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2925,7 +2925,7 @@ public class InternalEngine extends Engine { * {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time * of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene). */ - final Map extraCommitUserData = getCommitExtraUserData(); + final Map extraCommitUserData = getCommitExtraUserData(localCheckpoint); final Map commitData = Maps.newMapWithExpectedSize(8 + extraCommitUserData.size()); commitData.putAll(extraCommitUserData); commitData.put(Translog.TRANSLOG_UUID_KEY, translog.getTranslogUUID()); @@ -2973,8 +2973,10 @@ public class InternalEngine extends Engine { /** * Allows InternalEngine extenders to return custom key-value pairs which will be included in the Lucene commit user-data. Custom user * data keys can be overwritten by if their keys conflict keys used by InternalEngine. + * + * @param localCheckpoint the local checkpoint of the commit */ - protected Map getCommitExtraUserData() { + protected Map getCommitExtraUserData(final long localCheckpoint) { return Collections.emptyMap(); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index d56d7471d498..7fecc53826ff 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -4491,14 +4491,17 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } /** - * Registers a listener for an event when the shard advances to the provided primary term and segment generation + * Registers a listener for an event when the shard advances to the provided primary term and segment generation. + * Completes the listener with a {@link IndexShardClosedException} if the shard is closed. */ public void waitForPrimaryTermAndGeneration(long primaryTerm, long segmentGeneration, ActionListener listener) { - waitForEngineOrClosedShard( - listener.delegateFailureAndWrap( - (l, ignored) -> getEngine().addPrimaryTermAndGenerationListener(primaryTerm, segmentGeneration, l) - ) - ); + waitForEngineOrClosedShard(listener.delegateFailureAndWrap((l, ignored) -> { + if (state == IndexShardState.CLOSED) { + l.onFailure(new IndexShardClosedException(shardId)); + } else { + getEngine().addPrimaryTermAndGenerationListener(primaryTerm, segmentGeneration, l); + } + })); } /** diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AvgAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AvgAggregator.java index e5a9886b4f45..f593f0d3923c 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AvgAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AvgAggregator.java @@ -58,7 +58,7 @@ class AvgAggregator extends SumAggregator { public void collect(int doc, long bucket) throws IOException { if (values.advanceExact(doc)) { maybeGrow(bucket); - computeSum(bucket, values, sums, compensations); + computeSum(bucket, values.doubleValue(), sums, compensations); counts.increment(bucket, 1L); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/ExtendedStatsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/ExtendedStatsAggregator.java index 073ab8e0a3d5..5328dad7eedd 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/ExtendedStatsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/ExtendedStatsAggregator.java @@ -105,7 +105,6 @@ class ExtendedStatsAggregator extends NumericMetricsAggregator.MultiDoubleValue @Override protected LeafBucketCollector getLeafCollector(NumericDoubleValues values, final LeafBucketCollector sub) { - final CompensatedSum compensatedSum = new CompensatedSum(0, 0); return new LeafBucketCollectorBase(sub, values) { @Override @@ -114,20 +113,9 @@ class ExtendedStatsAggregator extends NumericMetricsAggregator.MultiDoubleValue maybeGrow(bucket); final double value = values.doubleValue(); counts.increment(bucket, 1L); - // Compute the sum and sum of squires for double values with Kahan summation algorithm - // which is more accurate than naive summation. - compensatedSum.reset(sums.get(bucket), compensations.get(bucket)); - compensatedSum.add(value); - sums.set(bucket, compensatedSum.value()); - compensations.set(bucket, compensatedSum.delta()); - - compensatedSum.reset(sumOfSqrs.get(bucket), compensationOfSqrs.get(bucket)); - compensatedSum.add(value * value); - sumOfSqrs.set(bucket, compensatedSum.value()); - compensationOfSqrs.set(bucket, compensatedSum.delta()); - - mins.set(bucket, Math.min(mins.get(bucket), value)); - maxes.set(bucket, Math.max(maxes.get(bucket), value)); + SumAggregator.computeSum(bucket, value, sums, compensations); + SumAggregator.computeSum(bucket, value * value, sumOfSqrs, compensationOfSqrs); + StatsAggregator.updateMinsAndMaxes(bucket, value, mins, maxes); } } @@ -138,13 +126,14 @@ class ExtendedStatsAggregator extends NumericMetricsAggregator.MultiDoubleValue if (bucket >= counts.size()) { final long from = counts.size(); final long overSize = BigArrays.overSize(bucket + 1); - counts = bigArrays().resize(counts, overSize); - sums = bigArrays().resize(sums, overSize); - compensations = bigArrays().resize(compensations, overSize); - mins = bigArrays().resize(mins, overSize); - maxes = bigArrays().resize(maxes, overSize); - sumOfSqrs = bigArrays().resize(sumOfSqrs, overSize); - compensationOfSqrs = bigArrays().resize(compensationOfSqrs, overSize); + var bigArrays = bigArrays(); + counts = bigArrays.resize(counts, overSize); + sums = bigArrays.resize(sums, overSize); + compensations = bigArrays.resize(compensations, overSize); + mins = bigArrays.resize(mins, overSize); + maxes = bigArrays.resize(maxes, overSize); + sumOfSqrs = bigArrays.resize(sumOfSqrs, overSize); + compensationOfSqrs = bigArrays.resize(compensationOfSqrs, overSize); mins.fill(from, overSize, Double.POSITIVE_INFINITY); maxes.fill(from, overSize, Double.NEGATIVE_INFINITY); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/GeoCentroidAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/GeoCentroidAggregator.java index 874d3bd221e1..9eaa48a43298 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/GeoCentroidAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/GeoCentroidAggregator.java @@ -93,7 +93,6 @@ final class GeoCentroidAggregator extends MetricsAggregator { } private LeafBucketCollector getLeafCollector(GeoPointValues values, LeafBucketCollector sub) { - final CompensatedSum compensatedSum = new CompensatedSum(0, 0); return new LeafBucketCollectorBase(sub, values) { @Override public void collect(int doc, long bucket) throws IOException { @@ -104,16 +103,8 @@ final class GeoCentroidAggregator extends MetricsAggregator { // Compute the sum of double values with Kahan summation algorithm which is more // accurate than naive summation. final GeoPoint value = values.pointValue(); - // latitude - compensatedSum.reset(latSum.get(bucket), latCompensations.get(bucket)); - compensatedSum.add(value.getLat()); - latSum.set(bucket, compensatedSum.value()); - latCompensations.set(bucket, compensatedSum.delta()); - // longitude - compensatedSum.reset(lonSum.get(bucket), lonCompensations.get(bucket)); - compensatedSum.add(value.getLon()); - lonSum.set(bucket, compensatedSum.value()); - lonCompensations.set(bucket, compensatedSum.delta()); + SumAggregator.computeSum(bucket, value.getLat(), latSum, latCompensations); + SumAggregator.computeSum(bucket, value.getLon(), lonSum, lonCompensations); } } }; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/StatsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/StatsAggregator.java index 850fe4f7177d..a01d9cf61f8b 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/StatsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/StatsAggregator.java @@ -81,36 +81,43 @@ class StatsAggregator extends NumericMetricsAggregator.MultiDoubleValue { @Override public LeafBucketCollector getLeafCollector(NumericDoubleValues values, LeafBucketCollector sub) { - final CompensatedSum kahanSummation = new CompensatedSum(0, 0); return new LeafBucketCollectorBase(sub, values) { @Override public void collect(int doc, long bucket) throws IOException { if (values.advanceExact(doc)) { maybeGrow(bucket); counts.increment(bucket, 1L); - // Compute the sum of double values with Kahan summation algorithm which is more - // accurate than naive summation. - kahanSummation.reset(sums.get(bucket), compensations.get(bucket)); double value = values.doubleValue(); - kahanSummation.add(value); - sums.set(bucket, kahanSummation.value()); - compensations.set(bucket, kahanSummation.delta()); - mins.set(bucket, Math.min(mins.get(bucket), value)); - maxes.set(bucket, Math.max(maxes.get(bucket), value)); + SumAggregator.computeSum(bucket, value, sums, compensations); + updateMinsAndMaxes(bucket, value, mins, maxes); } } }; } + static void updateMinsAndMaxes(long bucket, double value, DoubleArray mins, DoubleArray maxes) { + double min = mins.get(bucket); + double updated = Math.min(value, min); + if (updated != min) { + mins.set(bucket, updated); + } + double max = maxes.get(bucket); + updated = Math.max(value, max); + if (updated != max) { + maxes.set(bucket, updated); + } + } + private void maybeGrow(long bucket) { if (bucket >= counts.size()) { final long from = counts.size(); final long overSize = BigArrays.overSize(bucket + 1); - counts = bigArrays().resize(counts, overSize); - sums = bigArrays().resize(sums, overSize); - compensations = bigArrays().resize(compensations, overSize); - mins = bigArrays().resize(mins, overSize); - maxes = bigArrays().resize(maxes, overSize); + var bigArrays = bigArrays(); + counts = bigArrays.resize(counts, overSize); + sums = bigArrays.resize(sums, overSize); + compensations = bigArrays.resize(compensations, overSize); + mins = bigArrays.resize(mins, overSize); + maxes = bigArrays.resize(maxes, overSize); mins.fill(from, overSize, Double.POSITIVE_INFINITY); maxes.fill(from, overSize, Double.NEGATIVE_INFINITY); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/SumAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/SumAggregator.java index 237ba6dfe406..2d9b51b8f2d4 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/SumAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/SumAggregator.java @@ -97,16 +97,15 @@ public class SumAggregator extends NumericMetricsAggregator.SingleDoubleValue { public void collect(int doc, long bucket) throws IOException { if (values.advanceExact(doc)) { maybeGrow(bucket); - computeSum(bucket, values, sums, compensations); + computeSum(bucket, values.doubleValue(), sums, compensations); } } }; } - static void computeSum(long bucket, NumericDoubleValues values, DoubleArray sums, DoubleArray compensations) throws IOException { + static void computeSum(long bucket, double added, DoubleArray sums, DoubleArray compensations) { // Compute the sum of double values with Kahan summation algorithm which is more // accurate than naive summation. - double added = values.doubleValue(); double value = addIfNonOrInf(added, sums.get(bucket)); if (Double.isFinite(value)) { double delta = compensations.get(bucket); @@ -122,8 +121,7 @@ public class SumAggregator extends NumericMetricsAggregator.SingleDoubleValue { protected final void maybeGrow(long bucket) { if (bucket >= sums.size()) { - var bigArrays = bigArrays(); - doGrow(bucket, bigArrays); + doGrow(bucket, bigArrays()); } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 26de6a789778..e154a3d62b4e 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -7327,7 +7327,7 @@ public class InternalEngineTests extends EngineTestCase { engine.close(); engine = new InternalEngine(engine.config()) { @Override - protected Map getCommitExtraUserData() { + protected Map getCommitExtraUserData(final long localCheckpoint) { return Map.of("userkey", "userdata", ES_VERSION, IndexVersions.ZERO.toString()); } }; diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index c07b396626c4..975565b73a0d 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -27,6 +27,7 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.Constants; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; @@ -3334,6 +3335,21 @@ public class IndexShardTests extends IndexShardTestCase { assertThat("listener should have been called", called.get(), equalTo(true)); } + public void testWaitForPrimaryTermAndGenerationFailsForClosedShard() throws IOException { + Settings settings = indexSettings(IndexVersion.current(), 1, 1).build(); + IndexMetadata metadata = IndexMetadata.builder("test").putMapping(""" + { "properties": { "foo": { "type": "text"}}}""").settings(settings).primaryTerm(0, 1).build(); + IndexShard initializingShard = newShard(new ShardId(metadata.getIndex(), 0), true, "n1", metadata, null); + + var future = new PlainActionFuture(); + initializingShard.waitForPrimaryTermAndGeneration(0L, 0L, future); + + assertFalse("waitForPrimaryTermAndGeneration should be waiting", future.isDone()); + closeShards(initializingShard); + // Should bail out earlier without calling the engine + assertNotNull(ExceptionsHelper.unwrap(expectThrows(Exception.class, future::get), IndexShardClosedException.class)); + } + public void testRecoverFromLocalShard() throws IOException { Settings settings = indexSettings(IndexVersion.current(), 1, 1).build(); IndexMetadata metadata = IndexMetadata.builder("source")