Ingest all documents in a single call to Engine.index

This amortizes the locking in org.apache.lucene.index.DocumentsWriter.
Felix Barnsteiner 2025-04-16 13:24:51 +02:00
parent a50efd9125
commit 8fd1afc245

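Why a single call is cheaper, sketched at the Lucene level (a minimal, self-contained illustration, not the Elasticsearch code changed below; the class name BatchIndexingSketch and the "id" field are made up): IndexWriter.addDocuments hands the whole batch to DocumentsWriter in one call, so the per-call synchronization is paid once per batch instead of once per document.

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.ByteBuffersDirectory;

import java.util.ArrayList;
import java.util.List;

public class BatchIndexingSketch {
    public static void main(String[] args) throws Exception {
        try (var dir = new ByteBuffersDirectory();
             var writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()))) {
            List<Document> docs = new ArrayList<>();
            for (int i = 0; i < 1000; i++) {
                Document doc = new Document();
                doc.add(new StringField("id", "doc-" + i, Field.Store.NO));
                docs.add(doc);
            }
            // One addDocuments() call pushes the whole batch through
            // DocumentsWriter, instead of paying its per-call overhead
            // 1000 times via individual addDocument() calls.
            writer.addDocuments(docs);
            writer.commit();
        }
    }
}

The hunks below apply the same idea one level up: getIndexRequest now accepts a List<LuceneDocument>, and the per-document engine.index loop collapses into a single call.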

@@ -68,6 +68,7 @@ import java.util.Map;
 import java.util.OptionalInt;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 public class MetricsTransportAction extends HandledTransportAction<
     MetricsTransportAction.MetricsRequest,
@@ -136,11 +137,9 @@ public class MetricsTransportAction extends HandledTransportAction<
         // to ThreadPool.Names.WRITE but we perform all writes on the same thread.
         // We expect concurrency to come from a client submitting concurrent requests.
         List<String> errors = new ArrayList<>();
-        for (var luceneDocument : dataPointDocuments) {
-            Engine.IndexResult indexed = engine.index(getIndexRequest(luceneDocument, null));
-            if (indexed.getFailure() != null) {
-                errors.add(indexed.getFailure().getMessage());
-            }
+        Engine.IndexResult indexed = engine.index(getIndexRequest(dataPointDocuments, null));
+        if (indexed.getFailure() != null) {
+            errors.add(indexed.getFailure().getMessage());
         }
         if (request.normalized) {
@@ -153,7 +152,7 @@ public class MetricsTransportAction extends HandledTransportAction<
                 tsidCache.clear();
             }
             tsidCache.add(ts.tsid());
-            Engine.IndexResult result = engine.index(getIndexRequest(ts.toLuceneDocument(), ts.tsid()));
+            Engine.IndexResult result = engine.index(getIndexRequest(Collections.singletonList(ts.toLuceneDocument()), ts.tsid()));
             if (result.getFailure() != null) {
                 errors.add(result.getFailure().getMessage());
             } else {
@@ -186,7 +185,7 @@ public class MetricsTransportAction extends HandledTransportAction<
         }
     }
 
-    private static Engine.Index getIndexRequest(LuceneDocument luceneDocument, @Nullable String id) {
+    private static Engine.Index getIndexRequest(List<LuceneDocument> luceneDocuments, @Nullable String id) {
         if (id == null) {
             id = UUIDs.base64TimeBasedKOrderedUUIDWithHash(OptionalInt.empty());
         }
@@ -197,7 +196,7 @@ public class MetricsTransportAction extends HandledTransportAction<
             SeqNoFieldMapper.SequenceIDFields.emptySeqID(),
             id,
             null,
-            Collections.singletonList(luceneDocument),
+            luceneDocuments,
             null,
             XContentType.JSON,
             null,
@@ -359,9 +358,17 @@ public class MetricsTransportAction extends HandledTransportAction<
                 SortedNumericDocValuesField.indexedField(fieldName, NumericUtils.doubleToSortableLong(value.getDoubleValue()))
             );
             case ARRAY_VALUE -> {
-                for (AnyValue arrayValue : value.getArrayValue().getValuesList()) {
-                    addField(fields, fieldName, arrayValue);
-                }
+                String csv = value.getArrayValue().getValuesList().stream().map(v -> {
+                    if (v.hasStringValue()) {
+                        return v.getStringValue();
+                    } else {
+                        return v.toString();
+                    }
+                }).collect(Collectors.joining(", "));
+                fields.add(SortedSetDocValuesField.indexedField(fieldName, new BytesRef(csv)));
+                // for (AnyValue arrayValue : value.getArrayValue().getValuesList()) {
+                //     addField(fields, fieldName, arrayValue);
+                // }
             }
             default -> throw new IllegalArgumentException("Unsupported attribute type: " + value.getValueCase());
         }
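For illustration, here is a standalone sketch of what the new ARRAY_VALUE branch computes, using the OTLP protobuf classes (io.opentelemetry.proto.common.v1.AnyValue and ArrayValue) the switch above already consumes; the class name ArrayValueCsvSketch and the sample values are made up:

import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.ArrayValue;

import java.util.stream.Collectors;

public class ArrayValueCsvSketch {
    public static void main(String[] args) {
        AnyValue attribute = AnyValue.newBuilder()
            .setArrayValue(ArrayValue.newBuilder()
                .addValues(AnyValue.newBuilder().setStringValue("a"))
                .addValues(AnyValue.newBuilder().setIntValue(42)))
            .build();

        // Same join as in the diff: string elements verbatim, everything else
        // through AnyValue.toString(), i.e. protobuf's text debug format.
        String csv = attribute.getArrayValue().getValuesList().stream()
            .map(v -> v.hasStringValue() ? v.getStringValue() : v.toString())
            .collect(Collectors.joining(", "));

        System.out.println(csv);
    }
}

The trade-off the commented-out loop records: joining with ", " flattens the array into a single sorted-set term, which is lossy when an element itself contains a comma, whereas the old per-element addField call indexed each array value separately.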