From ce3a778fa140c9931dc79686b1a5d616f267e32d Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 13 Mar 2025 07:46:08 +0100 Subject: [PATCH] Improve downsample performance by buffering docids and do bulk processing. (#124477) --- docs/changelog/124477.yaml | 5 + .../AbstractDownsampleFieldProducer.java | 3 +- .../downsample/DimensionFieldProducer.java | 38 ++-- .../downsample/DownsampleShardIndexer.java | 210 ++++++++++++------ .../xpack/downsample/LabelFieldProducer.java | 35 +-- .../xpack/downsample/MetricFieldProducer.java | 24 +- .../downsample/LabelFieldProducerTests.java | 5 +- 7 files changed, 207 insertions(+), 113 deletions(-) create mode 100644 docs/changelog/124477.yaml diff --git a/docs/changelog/124477.yaml b/docs/changelog/124477.yaml new file mode 100644 index 000000000000..d37a3f27b4dd --- /dev/null +++ b/docs/changelog/124477.yaml @@ -0,0 +1,5 @@ +pr: 124477 +summary: Improve downsample performance by buffering docids and do bulk processing +area: Downsampling +type: enhancement +issues: [] diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/AbstractDownsampleFieldProducer.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/AbstractDownsampleFieldProducer.java index 43c68e81e869..518c04413872 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/AbstractDownsampleFieldProducer.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/AbstractDownsampleFieldProducer.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.downsample; +import org.apache.lucene.internal.hppc.IntArrayList; import org.elasticsearch.index.fielddata.FormattedDocValues; import java.io.IOException; @@ -43,5 +44,5 @@ abstract class AbstractDownsampleFieldProducer implements DownsampleFieldSeriali return isEmpty; } - public abstract void collect(FormattedDocValues docValues, int docId) throws IOException; + public abstract void collect(FormattedDocValues docValues, IntArrayList docIdBuffer) throws IOException; } diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DimensionFieldProducer.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DimensionFieldProducer.java index 43fee5b2505b..ba7f9eea8eee 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DimensionFieldProducer.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DimensionFieldProducer.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.downsample; +import org.apache.lucene.internal.hppc.IntArrayList; import org.elasticsearch.index.fielddata.FormattedDocValues; import org.elasticsearch.xcontent.XContentBuilder; @@ -55,13 +56,16 @@ public class DimensionFieldProducer extends AbstractDownsampleFieldProducer { * This is an expensive check, that slows down downsampling significantly. * Given that index is sorted by tsid as primary key, this shouldn't really happen. */ - boolean validate(FormattedDocValues docValues, int docId) throws IOException { - if (docValues.advanceExact(docId)) { - int docValueCount = docValues.docValueCount(); - for (int i = 0; i < docValueCount; i++) { - var value = docValues.nextValue(); - if (value.equals(this.value) == false) { - assert false : "Dimension value changed without tsid change [" + value + "] != [" + this.value + "]"; + boolean validate(FormattedDocValues docValues, IntArrayList buffer) throws IOException { + for (int i = 0; i < buffer.size(); i++) { + int docId = buffer.get(i); + if (docValues.advanceExact(docId)) { + int docValueCount = docValues.docValueCount(); + for (int j = 0; j < docValueCount; j++) { + var value = docValues.nextValue(); + if (value.equals(this.value) == false) { + assert false : "Dimension value changed without tsid change [" + value + "] != [" + this.value + "]"; + } } } } @@ -81,19 +85,25 @@ public class DimensionFieldProducer extends AbstractDownsampleFieldProducer { } @Override - public void collect(FormattedDocValues docValues, int docId) throws IOException { + public void collect(FormattedDocValues docValues, IntArrayList docIdBuffer) throws IOException { if (dimension.isEmpty == false) { - assert dimension.validate(docValues, docId); + assert dimension.validate(docValues, docIdBuffer); return; } - if (docValues.advanceExact(docId) == false) { + for (int i = 0; i < docIdBuffer.size(); i++) { + int docId = docIdBuffer.get(i); + if (docValues.advanceExact(docId) == false) { + continue; + } + int docValueCount = docValues.docValueCount(); + for (int j = 0; j < docValueCount; j++) { + this.dimension.collectOnce(docValues.nextValue()); + } + // Only need to record one dimension value from one document, within in the same tsid-and-time-interval bucket values are the + // same. return; } - int docValueCount = docValues.docValueCount(); - for (int i = 0; i < docValueCount; i++) { - this.dimension.collectOnce(docValues.nextValue()); - } } @Override diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java index bb3230ef0652..629cce9a2f9a 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.document.SortedSetDocValuesField; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.internal.hppc.IntArrayList; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.MatchNoDocsQuery; import org.apache.lucene.search.Query; @@ -80,6 +81,7 @@ import static org.elasticsearch.core.Strings.format; class DownsampleShardIndexer { private static final Logger logger = LogManager.getLogger(DownsampleShardIndexer.class); + private static final int DOCID_BUFFER_SIZE = 8096; public static final int DOWNSAMPLE_BULK_ACTIONS = 10000; public static final ByteSizeValue DOWNSAMPLE_BULK_SIZE = ByteSizeValue.of(1, ByteSizeUnit.MB); public static final ByteSizeValue DOWNSAMPLE_MAX_BYTES_IN_FLIGHT = ByteSizeValue.of(50, ByteSizeUnit.MB); @@ -338,6 +340,7 @@ class DownsampleShardIndexer { private class TimeSeriesBucketCollector extends BucketCollector { private final BulkProcessor2 bulkProcessor; private final DownsampleBucketBuilder downsampleBucketBuilder; + private final List leafBucketCollectors = new ArrayList<>(); private long docsProcessed; private long bucketsCreated; long lastTimestamp = Long.MAX_VALUE; @@ -365,83 +368,138 @@ class DownsampleShardIndexer { formattedDocValues[i] = fieldValueFetchers.get(i).getLeaf(ctx); } - return new LeafBucketCollector() { - @Override - public void collect(int docId, long owningBucketOrd) throws IOException { - task.addNumReceived(1); - final BytesRef tsidHash = aggCtx.getTsidHash(); - assert tsidHash != null : "Document without [" + TimeSeriesIdFieldMapper.NAME + "] field was found."; - final int tsidHashOrd = aggCtx.getTsidHashOrd(); - final long timestamp = timestampField.resolution().roundDownToMillis(aggCtx.getTimestamp()); + var leafBucketCollector = new LeafDownsampleCollector(aggCtx, docCountProvider, fieldProducers, formattedDocValues); + leafBucketCollectors.add(leafBucketCollector); + return leafBucketCollector; + } - boolean tsidChanged = tsidHashOrd != downsampleBucketBuilder.tsidOrd(); - if (tsidChanged || timestamp < lastHistoTimestamp) { - lastHistoTimestamp = Math.max( - rounding.round(timestamp), - searchExecutionContext.getIndexSettings().getTimestampBounds().startTime() - ); - } - task.setLastSourceTimestamp(timestamp); - task.setLastTargetTimestamp(lastHistoTimestamp); + void bulkCollection() throws IOException { + // The leaf bucket collectors with newer timestamp go first, to correctly capture the last value for counters and labels. + leafBucketCollectors.sort((o1, o2) -> -Long.compare(o1.firstTimeStampForBulkCollection, o2.firstTimeStampForBulkCollection)); + for (LeafDownsampleCollector leafBucketCollector : leafBucketCollectors) { + leafBucketCollector.leafBulkCollection(); + } + } - if (logger.isTraceEnabled()) { - logger.trace( - "Doc: [{}] - _tsid: [{}], @timestamp: [{}] -> downsample bucket ts: [{}]", - docId, - DocValueFormat.TIME_SERIES_ID.format(tsidHash), - timestampFormat.format(timestamp), - timestampFormat.format(lastHistoTimestamp) - ); - } + class LeafDownsampleCollector extends LeafBucketCollector { - /* - * Sanity checks to ensure that we receive documents in the correct order - * - _tsid must be sorted in ascending order - * - @timestamp must be sorted in descending order within the same _tsid - */ - BytesRef lastTsid = downsampleBucketBuilder.tsid(); - assert lastTsid == null || lastTsid.compareTo(tsidHash) <= 0 - : "_tsid is not sorted in ascending order: [" - + DocValueFormat.TIME_SERIES_ID.format(lastTsid) - + "] -> [" - + DocValueFormat.TIME_SERIES_ID.format(tsidHash) - + "]"; - assert tsidHash.equals(lastTsid) == false || lastTimestamp >= timestamp - : "@timestamp is not sorted in descending order: [" - + timestampFormat.format(lastTimestamp) - + "] -> [" - + timestampFormat.format(timestamp) - + "]"; - lastTimestamp = timestamp; + final AggregationExecutionContext aggCtx; + final DocCountProvider docCountProvider; + final FormattedDocValues[] formattedDocValues; + final AbstractDownsampleFieldProducer[] fieldProducers; - if (tsidChanged || downsampleBucketBuilder.timestamp() != lastHistoTimestamp) { - // Flush downsample doc if not empty - if (downsampleBucketBuilder.isEmpty() == false) { - XContentBuilder doc = downsampleBucketBuilder.buildDownsampleDocument(); - indexBucket(doc); - } + // Capture the first timestamp in order to determine which leaf collector's leafBulkCollection() is invoked first. + long firstTimeStampForBulkCollection; + final IntArrayList docIdBuffer = new IntArrayList(DOCID_BUFFER_SIZE); + final long timestampBoundStartTime = searchExecutionContext.getIndexSettings().getTimestampBounds().startTime(); - // Create new downsample bucket - if (tsidChanged) { - downsampleBucketBuilder.resetTsid(tsidHash, tsidHashOrd, lastHistoTimestamp); - } else { - downsampleBucketBuilder.resetTimestamp(lastHistoTimestamp); - } - bucketsCreated++; - } + LeafDownsampleCollector( + AggregationExecutionContext aggCtx, + DocCountProvider docCountProvider, + AbstractDownsampleFieldProducer[] fieldProducers, + FormattedDocValues[] formattedDocValues + ) { + this.aggCtx = aggCtx; + this.docCountProvider = docCountProvider; + this.fieldProducers = fieldProducers; + this.formattedDocValues = formattedDocValues; + } - final int docCount = docCountProvider.getDocCount(docId); - downsampleBucketBuilder.collectDocCount(docCount); - // Iterate over all field values and collect the doc_values for this docId - for (int i = 0; i < fieldProducers.length; i++) { - AbstractDownsampleFieldProducer fieldProducer = fieldProducers[i]; - FormattedDocValues docValues = formattedDocValues[i]; - fieldProducer.collect(docValues, docId); - } - docsProcessed++; - task.setDocsProcessed(docsProcessed); + @Override + public void collect(int docId, long owningBucketOrd) throws IOException { + task.addNumReceived(1); + final BytesRef tsidHash = aggCtx.getTsidHash(); + assert tsidHash != null : "Document without [" + TimeSeriesIdFieldMapper.NAME + "] field was found."; + final int tsidHashOrd = aggCtx.getTsidHashOrd(); + final long timestamp = timestampField.resolution().roundDownToMillis(aggCtx.getTimestamp()); + + boolean tsidChanged = tsidHashOrd != downsampleBucketBuilder.tsidOrd(); + if (tsidChanged || timestamp < lastHistoTimestamp) { + lastHistoTimestamp = Math.max(rounding.round(timestamp), timestampBoundStartTime); } - }; + task.setLastSourceTimestamp(timestamp); + task.setLastTargetTimestamp(lastHistoTimestamp); + + if (logger.isTraceEnabled()) { + logger.trace( + "Doc: [{}] - _tsid: [{}], @timestamp: [{}] -> downsample bucket ts: [{}]", + docId, + DocValueFormat.TIME_SERIES_ID.format(tsidHash), + timestampFormat.format(timestamp), + timestampFormat.format(lastHistoTimestamp) + ); + } + + /* + * Sanity checks to ensure that we receive documents in the correct order + * - _tsid must be sorted in ascending order + * - @timestamp must be sorted in descending order within the same _tsid + */ + BytesRef lastTsid = downsampleBucketBuilder.tsid(); + assert lastTsid == null || lastTsid.compareTo(tsidHash) <= 0 + : "_tsid is not sorted in ascending order: [" + + DocValueFormat.TIME_SERIES_ID.format(lastTsid) + + "] -> [" + + DocValueFormat.TIME_SERIES_ID.format(tsidHash) + + "]"; + assert tsidHash.equals(lastTsid) == false || lastTimestamp >= timestamp + : "@timestamp is not sorted in descending order: [" + + timestampFormat.format(lastTimestamp) + + "] -> [" + + timestampFormat.format(timestamp) + + "]"; + lastTimestamp = timestamp; + + if (tsidChanged || downsampleBucketBuilder.timestamp() != lastHistoTimestamp) { + bulkCollection(); + // Flush downsample doc if not empty + if (downsampleBucketBuilder.isEmpty() == false) { + XContentBuilder doc = downsampleBucketBuilder.buildDownsampleDocument(); + indexBucket(doc); + } + + // Create new downsample bucket + if (tsidChanged) { + downsampleBucketBuilder.resetTsid(tsidHash, tsidHashOrd, lastHistoTimestamp); + } else { + downsampleBucketBuilder.resetTimestamp(lastHistoTimestamp); + } + bucketsCreated++; + } + + if (docIdBuffer.isEmpty()) { + firstTimeStampForBulkCollection = aggCtx.getTimestamp(); + } + // buffer.add() always delegates to system.arraycopy() and checks buffer size for resizing purposes: + docIdBuffer.buffer[docIdBuffer.elementsCount++] = docId; + if (docIdBuffer.size() == DOCID_BUFFER_SIZE) { + bulkCollection(); + } + } + + void leafBulkCollection() throws IOException { + if (docIdBuffer.isEmpty()) { + return; + } + + if (logger.isDebugEnabled()) { + logger.debug("buffered {} docids", docIdBuffer.size()); + } + + downsampleBucketBuilder.collectDocCount(docIdBuffer, docCountProvider); + // Iterate over all field values and collect the doc_values for this docId + for (int i = 0; i < fieldProducers.length; i++) { + AbstractDownsampleFieldProducer fieldProducer = fieldProducers[i]; + FormattedDocValues docValues = formattedDocValues[i]; + fieldProducer.collect(docValues, docIdBuffer); + } + + docsProcessed += docIdBuffer.size(); + task.setDocsProcessed(docsProcessed); + + // buffer.clean() also overwrites all slots with zeros + docIdBuffer.elementsCount = 0; + } } private void indexBucket(XContentBuilder doc) { @@ -464,6 +522,7 @@ class DownsampleShardIndexer { @Override public void postCollection() throws IOException { // Flush downsample doc if not empty + bulkCollection(); if (downsampleBucketBuilder.isEmpty() == false) { XContentBuilder doc = downsampleBucketBuilder.buildDownsampleDocument(); indexBucket(doc); @@ -545,8 +604,15 @@ class DownsampleShardIndexer { } } - public void collectDocCount(int docCount) { - this.docCount += docCount; + public void collectDocCount(IntArrayList buffer, DocCountProvider docCountProvider) throws IOException { + if (docCountProvider.alwaysOne()) { + this.docCount += buffer.size(); + } else { + for (int i = 0; i < buffer.size(); i++) { + int docId = buffer.get(i); + this.docCount += docCountProvider.getDocCount(docId); + } + } } public XContentBuilder buildDownsampleDocument() throws IOException { diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/LabelFieldProducer.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/LabelFieldProducer.java index 8a90411bc1c5..7e2c5061d0b5 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/LabelFieldProducer.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/LabelFieldProducer.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.downsample; +import org.apache.lucene.internal.hppc.IntArrayList; import org.apache.lucene.util.BytesRef; import org.elasticsearch.index.fielddata.FormattedDocValues; import org.elasticsearch.index.fielddata.HistogramValue; @@ -114,25 +115,31 @@ abstract class LabelFieldProducer extends AbstractDownsampleFieldProducer { } @Override - public void collect(FormattedDocValues docValues, int docId) throws IOException { + public void collect(FormattedDocValues docValues, IntArrayList docIdBuffer) throws IOException { if (isEmpty() == false) { return; } - if (docValues.advanceExact(docId) == false) { - return; - } - int docValuesCount = docValues.docValueCount(); - assert docValuesCount > 0; - isEmpty = false; - if (docValuesCount == 1) { - label.collect(docValues.nextValue()); - } else { - Object[] values = new Object[docValuesCount]; - for (int i = 0; i < docValuesCount; i++) { - values[i] = docValues.nextValue(); + for (int i = 0; i < docIdBuffer.size(); i++) { + int docId = docIdBuffer.get(i); + if (docValues.advanceExact(docId) == false) { + continue; } - label.collect(values); + int docValuesCount = docValues.docValueCount(); + assert docValuesCount > 0; + isEmpty = false; + if (docValuesCount == 1) { + label.collect(docValues.nextValue()); + } else { + var values = new Object[docValuesCount]; + for (int j = 0; j < docValuesCount; j++) { + values[j] = docValues.nextValue(); + } + label.collect(values); + } + // Only need to record one label value from one document, within in the same tsid-and-time-interval we only keep the first + // with downsampling. + return; } } diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/MetricFieldProducer.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/MetricFieldProducer.java index 1305ea8ab38d..0c6fc6a60e98 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/MetricFieldProducer.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/MetricFieldProducer.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.downsample; +import org.apache.lucene.internal.hppc.IntArrayList; import org.elasticsearch.index.fielddata.FormattedDocValues; import org.elasticsearch.search.aggregations.metrics.CompensatedSum; import org.elasticsearch.xcontent.XContentBuilder; @@ -53,14 +54,17 @@ abstract sealed class MetricFieldProducer extends AbstractDownsampleFieldProduce } @Override - public void collect(FormattedDocValues docValues, int docId) throws IOException { - if (docValues.advanceExact(docId) == false) { - return; - } - int docValuesCount = docValues.docValueCount(); - for (int i = 0; i < docValuesCount; i++) { - Number num = (Number) docValues.nextValue(); - collect(num); + public void collect(FormattedDocValues docValues, IntArrayList docIdBuffer) throws IOException { + for (int i = 0; i < docIdBuffer.size(); i++) { + int docId = docIdBuffer.get(i); + if (docValues.advanceExact(docId) == false) { + continue; + } + int docValuesCount = docValues.docValueCount(); + for (int j = 0; j < docValuesCount; j++) { + Number num = (Number) docValues.nextValue(); + collect(num); + } } } @@ -236,13 +240,13 @@ abstract sealed class MetricFieldProducer extends AbstractDownsampleFieldProduce } @Override - public void collect(FormattedDocValues docValues, int docId) throws IOException { + public void collect(FormattedDocValues docValues, IntArrayList docIdBuffer) throws IOException { // Counter producers only collect the last_value. Since documents are // collected by descending timestamp order, the producer should only // process the first value for every tsid. So, it will only collect the // field if no value has been set before. if (isEmpty()) { - super.collect(docValues, docId); + super.collect(docValues, docIdBuffer); } } diff --git a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/LabelFieldProducerTests.java b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/LabelFieldProducerTests.java index 844eb1b8e27d..b3b18cdfd17b 100644 --- a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/LabelFieldProducerTests.java +++ b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/LabelFieldProducerTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.downsample; +import org.apache.lucene.internal.hppc.IntArrayList; import org.elasticsearch.common.Strings; import org.elasticsearch.index.fielddata.FormattedDocValues; import org.elasticsearch.search.aggregations.AggregatorTestCase; @@ -93,7 +94,7 @@ public class LabelFieldProducerTests extends AggregatorTestCase { return "aaaa"; } }; - producer.collect(docValues, 1); + producer.collect(docValues, IntArrayList.from(1)); // producer.collect("dummy", "aaaa"); assertFalse(producer.isEmpty()); assertEquals("aaaa", producer.label().get()); @@ -129,7 +130,7 @@ public class LabelFieldProducerTests extends AggregatorTestCase { } }; - producer.collect(docValues, 1); + producer.collect(docValues, IntArrayList.from(1)); assertFalse(producer.isEmpty()); assertEquals("a\0value_a", (((Object[]) producer.label().get())[0]).toString()); assertEquals("b\0value_b", (((Object[]) producer.label().get())[1]).toString());