Clone IndexInput when creating MemorySegmentPostingsVisitor (#129690)

This commit is contained in:
Ignacio Vera 2025-06-19 13:13:00 +02:00 committed by GitHub
parent 0e538bdd61
commit 22eb035a27
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 47 additions and 1 deletions

View file

@ -122,7 +122,7 @@ public class DefaultIVFVectorsReader extends IVFVectorsReader implements OffHeap
PostingVisitor getPostingVisitor(FieldInfo fieldInfo, IndexInput indexInput, float[] target, IntPredicate needsScoring) PostingVisitor getPostingVisitor(FieldInfo fieldInfo, IndexInput indexInput, float[] target, IntPredicate needsScoring)
throws IOException { throws IOException {
FieldEntry entry = fields.get(fieldInfo.number); FieldEntry entry = fields.get(fieldInfo.number);
return new MemorySegmentPostingsVisitor(target, indexInput, entry, fieldInfo, needsScoring); return new MemorySegmentPostingsVisitor(target, indexInput.clone(), entry, fieldInfo, needsScoring);
} }
// TODO can we do this in off-heap blocks? // TODO can we do this in off-heap blocks?

View file

@ -34,6 +34,7 @@ import org.junit.Before;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.concurrent.atomic.AtomicBoolean;
import static java.lang.String.format; import static java.lang.String.format;
import static org.elasticsearch.index.codec.vectors.IVFVectorsFormat.MAX_VECTORS_PER_CLUSTER; import static org.elasticsearch.index.codec.vectors.IVFVectorsFormat.MAX_VECTORS_PER_CLUSTER;
@ -128,4 +129,49 @@ public class IVFVectorsFormatTests extends BaseKnnVectorsFormatTestCase {
} }
} }
} }
// this is a modified version of lucene's TestSearchWithThreads test case
/**
 * Runs nearest-vector searches against one single-segment index from several threads at once.
 *
 * <p>Random vectors are indexed into field {@code "f"}, the index is force-merged to one segment,
 * and every thread then issues {@code searchNearestVectors} calls through the same reader. A
 * thread that does not finish cleanly raises the shared {@code failed} flag, which stops the
 * remaining threads early and is asserted on once all threads have been joined.
 *
 * @throws Exception if indexing or opening the reader fails, or the joining thread is interrupted
 */
public void testWithThreads() throws Exception {
    final int numThreads = random().nextInt(2, 5);
    final int numSearches = atLeast(100);
    final int numDocs = atLeast(1000);
    final int dimensions = random().nextInt(12, 500);
    try (Directory dir = newDirectory(); IndexWriter w = new IndexWriter(dir, newIndexWriterConfig())) {
        for (int docCount = 0; docCount < numDocs; docCount++) {
            final Document doc = new Document();
            doc.add(new KnnFloatVectorField("f", randomVector(dimensions), VectorSimilarityFunction.EUCLIDEAN));
            w.addDocument(doc);
        }
        w.forceMerge(1);
        try (IndexReader reader = DirectoryReader.open(w)) {
            final AtomicBoolean failed = new AtomicBoolean();
            Thread[] threads = new Thread[numThreads];
            for (int threadID = 0; threadID < numThreads; threadID++) {
                threads[threadID] = new Thread(() -> {
                    boolean completed = false;
                    try {
                        long totSearch = 0;
                        for (; totSearch < numSearches && failed.get() == false; totSearch++) {
                            float[] vector = randomVector(dimensions);
                            LeafReader leafReader = getOnlyLeafReader(reader);
                            leafReader.searchNearestVectors("f", vector, 10, leafReader.getLiveDocs(), Integer.MAX_VALUE);
                        }
                        assertTrue(totSearch > 0);
                        completed = true;
                    } catch (Exception exc) {
                        // Runnable cannot throw checked exceptions, so wrap and rethrow
                        throw new RuntimeException(exc);
                    } finally {
                        if (completed == false) {
                            // raised for every abnormal exit, including Errors such as
                            // AssertionError that the catch block above never sees
                            failed.set(true);
                        }
                    }
                });
                threads[threadID].setDaemon(true);
            }
            for (Thread t : threads) {
                t.start();
            }
            for (Thread t : threads) {
                t.join();
            }
            // a worker's exception dies with its thread; fail explicitly on the test thread
            assertTrue("at least one search thread did not complete successfully", failed.get() == false);
        }
    }
}
} }