From 22eb035a27ff01eacdf51b5ef92c7e030cbef124 Mon Sep 17 00:00:00 2001 From: Ignacio Vera Date: Thu, 19 Jun 2025 13:13:00 +0200 Subject: [PATCH] Clone IndexInput when creating MemorySegmentPostingsVisitor (#129690) --- .../vectors/DefaultIVFVectorsReader.java | 2 +- .../codec/vectors/IVFVectorsFormatTests.java | 46 +++++++++++++++++++ 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsReader.java b/server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsReader.java index 2a2bef3dfcf1..5c86f602a654 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsReader.java +++ b/server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsReader.java @@ -122,7 +122,7 @@ public class DefaultIVFVectorsReader extends IVFVectorsReader implements OffHeap PostingVisitor getPostingVisitor(FieldInfo fieldInfo, IndexInput indexInput, float[] target, IntPredicate needsScoring) throws IOException { FieldEntry entry = fields.get(fieldInfo.number); - return new MemorySegmentPostingsVisitor(target, indexInput, entry, fieldInfo, needsScoring); + return new MemorySegmentPostingsVisitor(target, indexInput.clone(), entry, fieldInfo, needsScoring); } // TODO can we do this in off-heap blocks? diff --git a/server/src/test/java/org/elasticsearch/index/codec/vectors/IVFVectorsFormatTests.java b/server/src/test/java/org/elasticsearch/index/codec/vectors/IVFVectorsFormatTests.java index 177a3d00c3dc..8499aa9a1732 100644 --- a/server/src/test/java/org/elasticsearch/index/codec/vectors/IVFVectorsFormatTests.java +++ b/server/src/test/java/org/elasticsearch/index/codec/vectors/IVFVectorsFormatTests.java @@ -34,6 +34,7 @@ import org.junit.Before; import java.io.IOException; import java.util.List; import java.util.Locale; +import java.util.concurrent.atomic.AtomicBoolean; import static java.lang.String.format; import static org.elasticsearch.index.codec.vectors.IVFVectorsFormat.MAX_VECTORS_PER_CLUSTER; @@ -128,4 +129,49 @@ public class IVFVectorsFormatTests extends BaseKnnVectorsFormatTestCase { } } } + + // this is a modified version of lucene's TestSearchWithThreads test case + public void testWithThreads() throws Exception { + final int numThreads = random().nextInt(2, 5); + final int numSearches = atLeast(100); + final int numDocs = atLeast(1000); + final int dimensions = random().nextInt(12, 500); + try (Directory dir = newDirectory(); IndexWriter w = new IndexWriter(dir, newIndexWriterConfig())) { + for (int docCount = 0; docCount < numDocs; docCount++) { + final Document doc = new Document(); + doc.add(new KnnFloatVectorField("f", randomVector(dimensions), VectorSimilarityFunction.EUCLIDEAN)); + w.addDocument(doc); + } + w.forceMerge(1); + try (IndexReader reader = DirectoryReader.open(w)) { + final AtomicBoolean failed = new AtomicBoolean(); + Thread[] threads = new Thread[numThreads]; + for (int threadID = 0; threadID < numThreads; threadID++) { + threads[threadID] = new Thread(() -> { + try { + long totSearch = 0; + for (; totSearch < numSearches && failed.get() == false; totSearch++) { + float[] vector = randomVector(dimensions); + LeafReader leafReader = getOnlyLeafReader(reader); + leafReader.searchNearestVectors("f", vector, 10, leafReader.getLiveDocs(), Integer.MAX_VALUE); + } + assertTrue(totSearch > 0); + } catch (Exception exc) { + failed.set(true); + throw new RuntimeException(exc); + } + }); + threads[threadID].setDaemon(true); + } + + for (Thread t : threads) { + t.start(); + } + + for (Thread t : threads) { + t.join(); + } + } + } + } }