From cc29af3b14c10b17a94f1db35e41f36d7162c36f Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 19 Feb 2025 11:50:09 +0100 Subject: [PATCH 1/5] Speedup a few more aggregations using Kahan summation (#122920) Follow-up to #120241 and linked issues, using the logic that doesn't require any mutable object indirection led to significant speedups for sum+avg and should be of similar if not more help in these as well. --- .../aggregations/metrics/AvgAggregator.java | 2 +- .../metrics/ExtendedStatsAggregator.java | 33 ++++++----------- .../metrics/GeoCentroidAggregator.java | 13 ++----- .../aggregations/metrics/StatsAggregator.java | 35 +++++++++++-------- .../aggregations/metrics/SumAggregator.java | 8 ++--- 5 files changed, 38 insertions(+), 53 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AvgAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AvgAggregator.java index e5a9886b4f45..f593f0d3923c 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AvgAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AvgAggregator.java @@ -58,7 +58,7 @@ class AvgAggregator extends SumAggregator { public void collect(int doc, long bucket) throws IOException { if (values.advanceExact(doc)) { maybeGrow(bucket); - computeSum(bucket, values, sums, compensations); + computeSum(bucket, values.doubleValue(), sums, compensations); counts.increment(bucket, 1L); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/ExtendedStatsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/ExtendedStatsAggregator.java index 073ab8e0a3d5..5328dad7eedd 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/ExtendedStatsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/ExtendedStatsAggregator.java @@ -105,7 +105,6 @@ class ExtendedStatsAggregator extends NumericMetricsAggregator.MultiDoubleValue @Override protected LeafBucketCollector getLeafCollector(NumericDoubleValues values, final LeafBucketCollector sub) { - final CompensatedSum compensatedSum = new CompensatedSum(0, 0); return new LeafBucketCollectorBase(sub, values) { @Override @@ -114,20 +113,9 @@ class ExtendedStatsAggregator extends NumericMetricsAggregator.MultiDoubleValue maybeGrow(bucket); final double value = values.doubleValue(); counts.increment(bucket, 1L); - // Compute the sum and sum of squires for double values with Kahan summation algorithm - // which is more accurate than naive summation. 
- compensatedSum.reset(sums.get(bucket), compensations.get(bucket)); - compensatedSum.add(value); - sums.set(bucket, compensatedSum.value()); - compensations.set(bucket, compensatedSum.delta()); - - compensatedSum.reset(sumOfSqrs.get(bucket), compensationOfSqrs.get(bucket)); - compensatedSum.add(value * value); - sumOfSqrs.set(bucket, compensatedSum.value()); - compensationOfSqrs.set(bucket, compensatedSum.delta()); - - mins.set(bucket, Math.min(mins.get(bucket), value)); - maxes.set(bucket, Math.max(maxes.get(bucket), value)); + SumAggregator.computeSum(bucket, value, sums, compensations); + SumAggregator.computeSum(bucket, value * value, sumOfSqrs, compensationOfSqrs); + StatsAggregator.updateMinsAndMaxes(bucket, value, mins, maxes); } } @@ -138,13 +126,14 @@ class ExtendedStatsAggregator extends NumericMetricsAggregator.MultiDoubleValue if (bucket >= counts.size()) { final long from = counts.size(); final long overSize = BigArrays.overSize(bucket + 1); - counts = bigArrays().resize(counts, overSize); - sums = bigArrays().resize(sums, overSize); - compensations = bigArrays().resize(compensations, overSize); - mins = bigArrays().resize(mins, overSize); - maxes = bigArrays().resize(maxes, overSize); - sumOfSqrs = bigArrays().resize(sumOfSqrs, overSize); - compensationOfSqrs = bigArrays().resize(compensationOfSqrs, overSize); + var bigArrays = bigArrays(); + counts = bigArrays.resize(counts, overSize); + sums = bigArrays.resize(sums, overSize); + compensations = bigArrays.resize(compensations, overSize); + mins = bigArrays.resize(mins, overSize); + maxes = bigArrays.resize(maxes, overSize); + sumOfSqrs = bigArrays.resize(sumOfSqrs, overSize); + compensationOfSqrs = bigArrays.resize(compensationOfSqrs, overSize); mins.fill(from, overSize, Double.POSITIVE_INFINITY); maxes.fill(from, overSize, Double.NEGATIVE_INFINITY); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/GeoCentroidAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/GeoCentroidAggregator.java index 874d3bd221e1..9eaa48a43298 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/GeoCentroidAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/GeoCentroidAggregator.java @@ -93,7 +93,6 @@ final class GeoCentroidAggregator extends MetricsAggregator { } private LeafBucketCollector getLeafCollector(GeoPointValues values, LeafBucketCollector sub) { - final CompensatedSum compensatedSum = new CompensatedSum(0, 0); return new LeafBucketCollectorBase(sub, values) { @Override public void collect(int doc, long bucket) throws IOException { @@ -104,16 +103,8 @@ final class GeoCentroidAggregator extends MetricsAggregator { // Compute the sum of double values with Kahan summation algorithm which is more // accurate than naive summation. 
final GeoPoint value = values.pointValue(); - // latitude - compensatedSum.reset(latSum.get(bucket), latCompensations.get(bucket)); - compensatedSum.add(value.getLat()); - latSum.set(bucket, compensatedSum.value()); - latCompensations.set(bucket, compensatedSum.delta()); - // longitude - compensatedSum.reset(lonSum.get(bucket), lonCompensations.get(bucket)); - compensatedSum.add(value.getLon()); - lonSum.set(bucket, compensatedSum.value()); - lonCompensations.set(bucket, compensatedSum.delta()); + SumAggregator.computeSum(bucket, value.getLat(), latSum, latCompensations); + SumAggregator.computeSum(bucket, value.getLon(), lonSum, lonCompensations); } } }; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/StatsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/StatsAggregator.java index 850fe4f7177d..a01d9cf61f8b 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/StatsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/StatsAggregator.java @@ -81,36 +81,43 @@ class StatsAggregator extends NumericMetricsAggregator.MultiDoubleValue { @Override public LeafBucketCollector getLeafCollector(NumericDoubleValues values, LeafBucketCollector sub) { - final CompensatedSum kahanSummation = new CompensatedSum(0, 0); return new LeafBucketCollectorBase(sub, values) { @Override public void collect(int doc, long bucket) throws IOException { if (values.advanceExact(doc)) { maybeGrow(bucket); counts.increment(bucket, 1L); - // Compute the sum of double values with Kahan summation algorithm which is more - // accurate than naive summation. - kahanSummation.reset(sums.get(bucket), compensations.get(bucket)); double value = values.doubleValue(); - kahanSummation.add(value); - sums.set(bucket, kahanSummation.value()); - compensations.set(bucket, kahanSummation.delta()); - mins.set(bucket, Math.min(mins.get(bucket), value)); - maxes.set(bucket, Math.max(maxes.get(bucket), value)); + SumAggregator.computeSum(bucket, value, sums, compensations); + updateMinsAndMaxes(bucket, value, mins, maxes); } } }; } + static void updateMinsAndMaxes(long bucket, double value, DoubleArray mins, DoubleArray maxes) { + double min = mins.get(bucket); + double updated = Math.min(value, min); + if (updated != min) { + mins.set(bucket, updated); + } + double max = maxes.get(bucket); + updated = Math.max(value, max); + if (updated != max) { + maxes.set(bucket, updated); + } + } + private void maybeGrow(long bucket) { if (bucket >= counts.size()) { final long from = counts.size(); final long overSize = BigArrays.overSize(bucket + 1); - counts = bigArrays().resize(counts, overSize); - sums = bigArrays().resize(sums, overSize); - compensations = bigArrays().resize(compensations, overSize); - mins = bigArrays().resize(mins, overSize); - maxes = bigArrays().resize(maxes, overSize); + var bigArrays = bigArrays(); + counts = bigArrays.resize(counts, overSize); + sums = bigArrays.resize(sums, overSize); + compensations = bigArrays.resize(compensations, overSize); + mins = bigArrays.resize(mins, overSize); + maxes = bigArrays.resize(maxes, overSize); mins.fill(from, overSize, Double.POSITIVE_INFINITY); maxes.fill(from, overSize, Double.NEGATIVE_INFINITY); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/SumAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/SumAggregator.java index 237ba6dfe406..2d9b51b8f2d4 100644 --- 
a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/SumAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/SumAggregator.java @@ -97,16 +97,15 @@ public class SumAggregator extends NumericMetricsAggregator.SingleDoubleValue { public void collect(int doc, long bucket) throws IOException { if (values.advanceExact(doc)) { maybeGrow(bucket); - computeSum(bucket, values, sums, compensations); + computeSum(bucket, values.doubleValue(), sums, compensations); } } }; } - static void computeSum(long bucket, NumericDoubleValues values, DoubleArray sums, DoubleArray compensations) throws IOException { + static void computeSum(long bucket, double added, DoubleArray sums, DoubleArray compensations) { // Compute the sum of double values with Kahan summation algorithm which is more // accurate than naive summation. - double added = values.doubleValue(); double value = addIfNonOrInf(added, sums.get(bucket)); if (Double.isFinite(value)) { double delta = compensations.get(bucket); @@ -122,8 +121,7 @@ public class SumAggregator extends NumericMetricsAggregator.SingleDoubleValue { protected final void maybeGrow(long bucket) { if (bucket >= sums.size()) { - var bigArrays = bigArrays(); - doGrow(bucket, bigArrays); + doGrow(bucket, bigArrays()); } } From f220abaf0858dca1c732598f923e589599cd5ad6 Mon Sep 17 00:00:00 2001 From: Dimitris Rempapis Date: Wed, 19 Feb 2025 13:55:30 +0200 Subject: [PATCH 2/5] StressSearchServiceReaperIT_unmute_test (#122793) unmute test --- muted-tests.yml | 3 --- 1 file changed, 3 deletions(-) diff --git a/muted-tests.yml b/muted-tests.yml index db632eb78f8e..500c89d814b2 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -61,9 +61,6 @@ tests: - class: org.elasticsearch.xpack.test.rest.XPackRestIT method: test {p0=transform/transforms_start_stop/Verify start transform reuses destination index} issue: https://github.com/elastic/elasticsearch/issues/115808 -- class: org.elasticsearch.search.StressSearchServiceReaperIT - method: testStressReaper - issue: https://github.com/elastic/elasticsearch/issues/115816 - class: org.elasticsearch.xpack.application.connector.ConnectorIndexServiceTests issue: https://github.com/elastic/elasticsearch/issues/116087 - class: org.elasticsearch.xpack.test.rest.XPackRestIT From 8e34393227dbe83a2879c58469e02f911db12bc1 Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Wed, 19 Feb 2025 13:46:25 +0100 Subject: [PATCH 3/5] Fail primary term and generation listeners on a closed shard (#122713) If a shard has been closed, we should quickly bail out and fail all waiting primary term and generation listeners. Otherwise, the engine implementation may try to successfully to complete the provided listeners and perform operations on an already closed shard and cause some unexpected errors. 
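As a rough, hypothetical caller-side sketch of the new behaviour (only waitForPrimaryTermAndGeneration, ActionListener.wrap, ExceptionsHelper.unwrap and IndexShardClosedException come from the change and existing Elasticsearch code; the success/failure hooks, logger and variable names are invented for illustration), callers waiting on a term and generation should now be prepared for the listener to fail with IndexShardClosedException instead of hanging or touching a released engine:

    // Hypothetical caller of the method changed below.
    indexShard.waitForPrimaryTermAndGeneration(primaryTerm, segmentGeneration, ActionListener.wrap(
        response -> onGenerationVisible(),                    // hypothetical success hook
        e -> {
            if (ExceptionsHelper.unwrap(e, IndexShardClosedException.class) != null) {
                // Expected when the shard closes before reaching the requested generation.
                logger.debug("shard closed while waiting for term/generation", e);
            } else {
                onWaitFailed(e);                              // hypothetical failure hook
            }
        }
    ));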
--- .../elasticsearch/index/shard/IndexShard.java | 15 +++++++++------ .../index/shard/IndexShardTests.java | 16 ++++++++++++++++ 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index d56d7471d498..7fecc53826ff 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -4491,14 +4491,17 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } /** - * Registers a listener for an event when the shard advances to the provided primary term and segment generation + * Registers a listener for an event when the shard advances to the provided primary term and segment generation. + * Completes the listener with a {@link IndexShardClosedException} if the shard is closed. */ public void waitForPrimaryTermAndGeneration(long primaryTerm, long segmentGeneration, ActionListener listener) { - waitForEngineOrClosedShard( - listener.delegateFailureAndWrap( - (l, ignored) -> getEngine().addPrimaryTermAndGenerationListener(primaryTerm, segmentGeneration, l) - ) - ); + waitForEngineOrClosedShard(listener.delegateFailureAndWrap((l, ignored) -> { + if (state == IndexShardState.CLOSED) { + l.onFailure(new IndexShardClosedException(shardId)); + } else { + getEngine().addPrimaryTermAndGenerationListener(primaryTerm, segmentGeneration, l); + } + })); } /** diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index c07b396626c4..975565b73a0d 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -27,6 +27,7 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.Constants; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; @@ -3334,6 +3335,21 @@ public class IndexShardTests extends IndexShardTestCase { assertThat("listener should have been called", called.get(), equalTo(true)); } + public void testWaitForPrimaryTermAndGenerationFailsForClosedShard() throws IOException { + Settings settings = indexSettings(IndexVersion.current(), 1, 1).build(); + IndexMetadata metadata = IndexMetadata.builder("test").putMapping(""" + { "properties": { "foo": { "type": "text"}}}""").settings(settings).primaryTerm(0, 1).build(); + IndexShard initializingShard = newShard(new ShardId(metadata.getIndex(), 0), true, "n1", metadata, null); + + var future = new PlainActionFuture(); + initializingShard.waitForPrimaryTermAndGeneration(0L, 0L, future); + + assertFalse("waitForPrimaryTermAndGeneration should be waiting", future.isDone()); + closeShards(initializingShard); + // Should bail out earlier without calling the engine + assertNotNull(ExceptionsHelper.unwrap(expectThrows(Exception.class, future::get), IndexShardClosedException.class)); + } + public void testRecoverFromLocalShard() throws IOException { Settings settings = indexSettings(IndexVersion.current(), 1, 1).build(); IndexMetadata metadata = IndexMetadata.builder("source") From 
04422cdc7fab2137fa8685f8aa221c09ae69e859 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine <58790826+elasticsearchmachine@users.noreply.github.com> Date: Thu, 20 Feb 2025 00:37:05 +1100 Subject: [PATCH 4/5] Mute org.elasticsearch.xpack.search.AsyncSearchSecurityIT org.elasticsearch.xpack.search.AsyncSearchSecurityIT #122940 --- muted-tests.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/muted-tests.yml b/muted-tests.yml index 500c89d814b2..8d451e2726b6 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -326,6 +326,8 @@ tests: - class: org.elasticsearch.xpack.esql.heap_attack.HeapAttackIT method: testEnrichExplosionManyMatches issue: https://github.com/elastic/elasticsearch/issues/122913 +- class: org.elasticsearch.xpack.search.AsyncSearchSecurityIT + issue: https://github.com/elastic/elasticsearch/issues/122940 # Examples: # From ad220c1abb65854b072a3dd9bf59bf43e036190c Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Wed, 19 Feb 2025 16:12:50 +0200 Subject: [PATCH 5/5] Pass checkpoint to commit extra user data function (#122930) Relates ES-10852 --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 6 ++++-- .../org/elasticsearch/index/engine/InternalEngineTests.java | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index f463dce2ec70..94e8989bc5a1 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2925,7 +2925,7 @@ public class InternalEngine extends Engine { * {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time * of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene). */ - final Map extraCommitUserData = getCommitExtraUserData(); + final Map extraCommitUserData = getCommitExtraUserData(localCheckpoint); final Map commitData = Maps.newMapWithExpectedSize(8 + extraCommitUserData.size()); commitData.putAll(extraCommitUserData); commitData.put(Translog.TRANSLOG_UUID_KEY, translog.getTranslogUUID()); @@ -2973,8 +2973,10 @@ public class InternalEngine extends Engine { /** * Allows InternalEngine extenders to return custom key-value pairs which will be included in the Lucene commit user-data. Custom user * data keys can be overwritten by if their keys conflict keys used by InternalEngine. + * + * @param localCheckpoint the local checkpoint of the commit */ - protected Map getCommitExtraUserData() { + protected Map getCommitExtraUserData(final long localCheckpoint) { return Collections.emptyMap(); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 26de6a789778..e154a3d62b4e 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -7327,7 +7327,7 @@ public class InternalEngineTests extends EngineTestCase { engine.close(); engine = new InternalEngine(engine.config()) { @Override - protected Map getCommitExtraUserData() { + protected Map getCommitExtraUserData(final long localCheckpoint) { return Map.of("userkey", "userdata", ES_VERSION, IndexVersions.ZERO.toString()); } };
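As a usage illustration (not part of the patch): with this change an InternalEngine extender can key its extra commit user data off the checkpoint of the commit being prepared. The subclass name and user-data key below are hypothetical, and the generic type parameters are assumed (the diff text above shows them stripped); only the override signature mirrors the change:

    import java.util.Map;

    import org.elasticsearch.index.engine.EngineConfig;
    import org.elasticsearch.index.engine.InternalEngine;

    // Hypothetical InternalEngine extender that records the local checkpoint of each
    // commit in the Lucene commit user data. Keys that clash with InternalEngine's own
    // commit keys would be overwritten by the engine, per the Javadoc above.
    class CheckpointRecordingEngine extends InternalEngine {
        CheckpointRecordingEngine(EngineConfig config) {
            super(config);
        }

        @Override
        protected Map<String, String> getCommitExtraUserData(final long localCheckpoint) {
            return Map.of("my_plugin_local_checkpoint", Long.toString(localCheckpoint));
        }
    }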