Merge revision ad220c1abb into multi-project

Tim Vernum 2025-02-20 11:37:54 +11:00
commit 00159c5030
10 changed files with 70 additions and 65 deletions

muted-tests.yml

@@ -61,9 +61,6 @@ tests:
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
method: test {p0=transform/transforms_start_stop/Verify start transform reuses destination index}
issue: https://github.com/elastic/elasticsearch/issues/115808
- class: org.elasticsearch.search.StressSearchServiceReaperIT
method: testStressReaper
issue: https://github.com/elastic/elasticsearch/issues/115816
- class: org.elasticsearch.xpack.application.connector.ConnectorIndexServiceTests
issue: https://github.com/elastic/elasticsearch/issues/116087
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
@@ -329,6 +326,8 @@ tests:
- class: org.elasticsearch.xpack.esql.heap_attack.HeapAttackIT
method: testEnrichExplosionManyMatches
issue: https://github.com/elastic/elasticsearch/issues/122913
- class: org.elasticsearch.xpack.search.AsyncSearchSecurityIT
issue: https://github.com/elastic/elasticsearch/issues/122940
# Examples:
#

InternalEngine.java

@@ -2925,7 +2925,7 @@ public class InternalEngine extends Engine {
* {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time
* of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene).
*/
final Map<String, String> extraCommitUserData = getCommitExtraUserData();
final Map<String, String> extraCommitUserData = getCommitExtraUserData(localCheckpoint);
final Map<String, String> commitData = Maps.newMapWithExpectedSize(8 + extraCommitUserData.size());
commitData.putAll(extraCommitUserData);
commitData.put(Translog.TRANSLOG_UUID_KEY, translog.getTranslogUUID());
@@ -2973,8 +2973,10 @@ public class InternalEngine extends Engine {
/**
* Allows InternalEngine extenders to return custom key-value pairs which will be included in the Lucene commit user-data. Custom user
* data keys can be overwritten if they conflict with keys used by InternalEngine.
*
* @param localCheckpoint the local checkpoint of the commit
*/
protected Map<String, String> getCommitExtraUserData() {
protected Map<String, String> getCommitExtraUserData(final long localCheckpoint) {
return Collections.emptyMap();
}
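The new localCheckpoint parameter spares extenders from having to track the checkpoint themselves. A minimal sketch of an override that uses it (the subclass name and the commit-data key are hypothetical, chosen here only for illustration):

// Hypothetical InternalEngine extender; assumes java.util.Map and the
// org.elasticsearch.index.engine types are imported.
public class CheckpointStampingEngine extends InternalEngine {

    public CheckpointStampingEngine(EngineConfig engineConfig) {
        super(engineConfig);
    }

    @Override
    protected Map<String, String> getCommitExtraUserData(final long localCheckpoint) {
        // The engine now hands in the local checkpoint of the commit, so it can
        // be stamped straight into the Lucene commit user-data.
        return Map.of("my_engine_local_checkpoint", Long.toString(localCheckpoint));
    }
}

As the javadoc above notes, a key that collides with one of InternalEngine's own commit-data keys will be overwritten.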

IndexShard.java

@@ -4491,14 +4491,17 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
/**
* Registers a listener for an event when the shard advances to the provided primary term and segment generation
* Registers a listener for an event when the shard advances to the provided primary term and segment generation.
* Completes the listener with an {@link IndexShardClosedException} if the shard is closed.
*/
public void waitForPrimaryTermAndGeneration(long primaryTerm, long segmentGeneration, ActionListener<Long> listener) {
waitForEngineOrClosedShard(
listener.delegateFailureAndWrap(
(l, ignored) -> getEngine().addPrimaryTermAndGenerationListener(primaryTerm, segmentGeneration, l)
)
);
waitForEngineOrClosedShard(listener.delegateFailureAndWrap((l, ignored) -> {
if (state == IndexShardState.CLOSED) {
l.onFailure(new IndexShardClosedException(shardId));
} else {
getEngine().addPrimaryTermAndGenerationListener(primaryTerm, segmentGeneration, l);
}
}));
}
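For callers, the practical effect is that the wait can no longer hang forever on a shard that is closed before an engine is ever created. A hedged sketch of the calling pattern (shard, term, generation, and logger are assumed from surrounding context):

// Illustrative usage; ActionListener.wrap pairs a success handler with a failure handler.
shard.waitForPrimaryTermAndGeneration(term, generation, ActionListener.wrap(
    gen -> logger.debug("primary term and segment generation [{}] now visible", gen),
    e -> {
        // With this change, an already-closed shard fails the listener promptly
        // with IndexShardClosedException instead of never completing it.
    }
));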
/**

AvgAggregator.java

@@ -58,7 +58,7 @@ class AvgAggregator extends SumAggregator {
public void collect(int doc, long bucket) throws IOException {
if (values.advanceExact(doc)) {
maybeGrow(bucket);
computeSum(bucket, values, sums, compensations);
computeSum(bucket, values.doubleValue(), sums, compensations);
counts.increment(bucket, 1L);
}
}

ExtendedStatsAggregator.java

@@ -105,7 +105,6 @@ class ExtendedStatsAggregator extends NumericMetricsAggregator.MultiDoubleValue
@Override
protected LeafBucketCollector getLeafCollector(NumericDoubleValues values, final LeafBucketCollector sub) {
final CompensatedSum compensatedSum = new CompensatedSum(0, 0);
return new LeafBucketCollectorBase(sub, values) {
@Override
@@ -114,20 +113,9 @@ class ExtendedStatsAggregator extends NumericMetricsAggregator.MultiDoubleValue
maybeGrow(bucket);
final double value = values.doubleValue();
counts.increment(bucket, 1L);
// Compute the sum and sum of squares for double values with Kahan summation algorithm
// which is more accurate than naive summation.
compensatedSum.reset(sums.get(bucket), compensations.get(bucket));
compensatedSum.add(value);
sums.set(bucket, compensatedSum.value());
compensations.set(bucket, compensatedSum.delta());
compensatedSum.reset(sumOfSqrs.get(bucket), compensationOfSqrs.get(bucket));
compensatedSum.add(value * value);
sumOfSqrs.set(bucket, compensatedSum.value());
compensationOfSqrs.set(bucket, compensatedSum.delta());
mins.set(bucket, Math.min(mins.get(bucket), value));
maxes.set(bucket, Math.max(maxes.get(bucket), value));
SumAggregator.computeSum(bucket, value, sums, compensations);
SumAggregator.computeSum(bucket, value * value, sumOfSqrs, compensationOfSqrs);
StatsAggregator.updateMinsAndMaxes(bucket, value, mins, maxes);
}
}
@@ -138,13 +126,14 @@ class ExtendedStatsAggregator extends NumericMetricsAggregator.MultiDoubleValue
if (bucket >= counts.size()) {
final long from = counts.size();
final long overSize = BigArrays.overSize(bucket + 1);
counts = bigArrays().resize(counts, overSize);
sums = bigArrays().resize(sums, overSize);
compensations = bigArrays().resize(compensations, overSize);
mins = bigArrays().resize(mins, overSize);
maxes = bigArrays().resize(maxes, overSize);
sumOfSqrs = bigArrays().resize(sumOfSqrs, overSize);
compensationOfSqrs = bigArrays().resize(compensationOfSqrs, overSize);
var bigArrays = bigArrays();
counts = bigArrays.resize(counts, overSize);
sums = bigArrays.resize(sums, overSize);
compensations = bigArrays.resize(compensations, overSize);
mins = bigArrays.resize(mins, overSize);
maxes = bigArrays.resize(maxes, overSize);
sumOfSqrs = bigArrays.resize(sumOfSqrs, overSize);
compensationOfSqrs = bigArrays.resize(compensationOfSqrs, overSize);
mins.fill(from, overSize, Double.POSITIVE_INFINITY);
maxes.fill(from, overSize, Double.NEGATIVE_INFINITY);
}
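For context on why the aggregator tracks both a compensated sum and a compensated sum of squares per bucket: together with counts, those two quantities determine the variance that extended_stats reports. A simplified sketch of the identity (population form; the real aggregator also guards edge cases this ignores):

// variance = E[x^2] - (E[x])^2, derived from the per-bucket running sums.
static double variance(double sum, double sumOfSqrs, long count) {
    final double mean = sum / count;
    return (sumOfSqrs / count) - mean * mean;
}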

GeoCentroidAggregator.java

@@ -93,7 +93,6 @@ final class GeoCentroidAggregator extends MetricsAggregator {
}
private LeafBucketCollector getLeafCollector(GeoPointValues values, LeafBucketCollector sub) {
final CompensatedSum compensatedSum = new CompensatedSum(0, 0);
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long bucket) throws IOException {
@@ -104,16 +103,8 @@ final class GeoCentroidAggregator extends MetricsAggregator {
// Compute the sum of double values with Kahan summation algorithm which is more
// accurate than naive summation.
final GeoPoint value = values.pointValue();
// latitude
compensatedSum.reset(latSum.get(bucket), latCompensations.get(bucket));
compensatedSum.add(value.getLat());
latSum.set(bucket, compensatedSum.value());
latCompensations.set(bucket, compensatedSum.delta());
// longitude
compensatedSum.reset(lonSum.get(bucket), lonCompensations.get(bucket));
compensatedSum.add(value.getLon());
lonSum.set(bucket, compensatedSum.value());
lonCompensations.set(bucket, compensatedSum.delta());
SumAggregator.computeSum(bucket, value.getLat(), latSum, latCompensations);
SumAggregator.computeSum(bucket, value.getLon(), lonSum, lonCompensations);
}
}
};
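The compensated latitude and longitude sums ultimately reduce to an arithmetic mean per coordinate. A sketch of that final step (illustrative only, not the aggregator's actual reduce code):

// centroid = mean latitude and mean longitude over the collected points.
static GeoPoint centroid(double latSum, double lonSum, long count) {
    return new GeoPoint(latSum / count, lonSum / count);
}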

StatsAggregator.java

@@ -81,36 +81,43 @@ class StatsAggregator extends NumericMetricsAggregator.MultiDoubleValue {
@Override
public LeafBucketCollector getLeafCollector(NumericDoubleValues values, LeafBucketCollector sub) {
final CompensatedSum kahanSummation = new CompensatedSum(0, 0);
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long bucket) throws IOException {
if (values.advanceExact(doc)) {
maybeGrow(bucket);
counts.increment(bucket, 1L);
// Compute the sum of double values with Kahan summation algorithm which is more
// accurate than naive summation.
kahanSummation.reset(sums.get(bucket), compensations.get(bucket));
double value = values.doubleValue();
kahanSummation.add(value);
sums.set(bucket, kahanSummation.value());
compensations.set(bucket, kahanSummation.delta());
mins.set(bucket, Math.min(mins.get(bucket), value));
maxes.set(bucket, Math.max(maxes.get(bucket), value));
SumAggregator.computeSum(bucket, value, sums, compensations);
updateMinsAndMaxes(bucket, value, mins, maxes);
}
}
};
}
static void updateMinsAndMaxes(long bucket, double value, DoubleArray mins, DoubleArray maxes) {
double min = mins.get(bucket);
double updated = Math.min(value, min);
if (updated != min) {
mins.set(bucket, updated);
}
double max = maxes.get(bucket);
updated = Math.max(value, max);
if (updated != max) {
maxes.set(bucket, updated);
}
}
private void maybeGrow(long bucket) {
if (bucket >= counts.size()) {
final long from = counts.size();
final long overSize = BigArrays.overSize(bucket + 1);
counts = bigArrays().resize(counts, overSize);
sums = bigArrays().resize(sums, overSize);
compensations = bigArrays().resize(compensations, overSize);
mins = bigArrays().resize(mins, overSize);
maxes = bigArrays().resize(maxes, overSize);
var bigArrays = bigArrays();
counts = bigArrays.resize(counts, overSize);
sums = bigArrays.resize(sums, overSize);
compensations = bigArrays.resize(compensations, overSize);
mins = bigArrays.resize(mins, overSize);
maxes = bigArrays.resize(maxes, overSize);
mins.fill(from, overSize, Double.POSITIVE_INFINITY);
maxes.fill(from, overSize, Double.NEGATIVE_INFINITY);
}
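A note on the new updateMinsAndMaxes helper: unlike the unconditional mins.set and maxes.set calls it replaces, it writes back only when the running min or max actually changes, skipping a DoubleArray write in the common case, and it gives ExtendedStatsAggregator a single shared implementation to call.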

SumAggregator.java

@@ -97,16 +97,15 @@ public class SumAggregator extends NumericMetricsAggregator.SingleDoubleValue {
public void collect(int doc, long bucket) throws IOException {
if (values.advanceExact(doc)) {
maybeGrow(bucket);
computeSum(bucket, values, sums, compensations);
computeSum(bucket, values.doubleValue(), sums, compensations);
}
}
};
}
static void computeSum(long bucket, NumericDoubleValues values, DoubleArray sums, DoubleArray compensations) throws IOException {
static void computeSum(long bucket, double added, DoubleArray sums, DoubleArray compensations) {
// Compute the sum of double values with Kahan summation algorithm which is more
// accurate than naive summation.
double added = values.doubleValue();
double value = addIfNonOrInf(added, sums.get(bucket));
if (Double.isFinite(value)) {
double delta = compensations.get(bucket);
@@ -122,8 +121,7 @@ public class SumAggregator extends NumericMetricsAggregator.SingleDoubleValue {
protected final void maybeGrow(long bucket) {
if (bucket >= sums.size()) {
var bigArrays = bigArrays();
doGrow(bucket, bigArrays);
doGrow(bucket, bigArrays());
}
}
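Since computeSum now receives the already-extracted double (which lets ExtendedStatsAggregator reuse it for value * value), it no longer needs to declare IOException. For reference, a self-contained sketch of the compensated (Kahan) summation step it performs, simplified to omit the infinity handling the real method does via addIfNonOrInf:

// One Kahan step: compensation carries the low-order bits lost by earlier
// additions and is folded back in before the next add.
static double[] kahanAdd(double added, double sum, double compensation) {
    final double corrected = added - compensation;
    final double newSum = sum + corrected;
    final double newCompensation = (newSum - sum) - corrected;
    return new double[] { newSum, newCompensation };
}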

InternalEngineTests.java

@@ -7327,7 +7327,7 @@ public class InternalEngineTests extends EngineTestCase {
engine.close();
engine = new InternalEngine(engine.config()) {
@Override
protected Map<String, String> getCommitExtraUserData() {
protected Map<String, String> getCommitExtraUserData(final long localCheckpoint) {
return Map.of("userkey", "userdata", ES_VERSION, IndexVersions.ZERO.toString());
}
};

IndexShardTests.java

@@ -27,6 +27,7 @@ import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.Constants;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
@@ -3334,6 +3335,21 @@ public class IndexShardTests extends IndexShardTestCase {
assertThat("listener should have been called", called.get(), equalTo(true));
}
public void testWaitForPrimaryTermAndGenerationFailsForClosedShard() throws IOException {
Settings settings = indexSettings(IndexVersion.current(), 1, 1).build();
IndexMetadata metadata = IndexMetadata.builder("test").putMapping("""
{ "properties": { "foo": { "type": "text"}}}""").settings(settings).primaryTerm(0, 1).build();
IndexShard initializingShard = newShard(new ShardId(metadata.getIndex(), 0), true, "n1", metadata, null);
var future = new PlainActionFuture<Long>();
initializingShard.waitForPrimaryTermAndGeneration(0L, 0L, future);
assertFalse("waitForPrimaryTermAndGeneration should be waiting", future.isDone());
closeShards(initializingShard);
// Should bail out earlier without calling the engine
assertNotNull(ExceptionsHelper.unwrap(expectThrows(Exception.class, future::get), IndexShardClosedException.class));
}
public void testRecoverFromLocalShard() throws IOException {
Settings settings = indexSettings(IndexVersion.current(), 1, 1).build();
IndexMetadata metadata = IndexMetadata.builder("source")