Copy Lucene99FlatVectorsReader allowing direct IO to be specified directly (#125921)

We want to use DirectIO to access raw vector data randomly, so that it doesn't load everything into the page cache.
Simon Cooper 2025-04-24 11:00:30 +01:00 committed by GitHub
parent f599fe32de
commit c5ada66410
9 changed files with 513 additions and 5 deletions
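The pattern in a nutshell, as a minimal sketch (DirectIOUtil and openMaybeDirect are illustrative names, not part of this commit; the real logic lives in DirectIOLucene99FlatVectorsReader.openDataInput below): the Directory advertises DirectIO support via a marker interface, and the reader opts in per file.

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.index.codec.vectors.es818.DirectIOIndexInputSupplier;
import java.io.IOException;

final class DirectIOUtil {
    // Hypothetical helper: open a file with DirectIO when the directory supports it,
    // otherwise fall back to the regular page-cache-backed input.
    static IndexInput openMaybeDirect(Directory dir, String name, IOContext context) throws IOException {
        if (dir instanceof DirectIOIndexInputSupplier did) {
            return did.openInputDirect(name, context); // reads bypass the OS page cache
        }
        return dir.openInput(name, context); // regular buffered/mmapped access
    }
}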

EntitlementInitialization.java

@@ -279,7 +279,10 @@ public class EntitlementInitialization {
new FilesEntitlement(List.of(FileData.ofBaseDirPath(CONFIG, READ), FileData.ofBaseDirPath(DATA, READ_WRITE)))
)
),
new Scope("org.apache.lucene.misc", List.of(new FilesEntitlement(List.of(FileData.ofBaseDirPath(DATA, READ_WRITE))))),
new Scope(
"org.apache.lucene.misc",
List.of(new FilesEntitlement(List.of(FileData.ofBaseDirPath(DATA, READ_WRITE))), new ReadStoreAttributesEntitlement())
),
new Scope(
"org.apache.logging.log4j.core",
List.of(new ManageThreadsEntitlement(), new FilesEntitlement(List.of(FileData.ofBaseDirPath(LOGS, READ_WRITE))))

DirectIOIndexInputSupplier.java

@@ -0,0 +1,23 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.index.codec.vectors.es818;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import java.io.IOException;
/**
* A hook for {@link DirectIOLucene99FlatVectorsReader} to specify that the input should be opened using DirectIO.
* Remove when IOContext allows more extensible payloads to be specified.
*/
public interface DirectIOIndexInputSupplier {
IndexInput openInputDirect(String name, IOContext context) throws IOException;
}
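A minimal sketch of a directory implementing this hook, simplified from the HybridDirectory change later in this commit (the class name here is illustrative, not part of the commit):

import org.apache.lucene.misc.store.DirectIODirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.NIOFSDirectory;
import java.io.IOException;
import java.nio.file.Path;

// Sketch only: a directory advertising DirectIO support via the marker interface.
final class DirectIOAwareDirectory extends NIOFSDirectory implements DirectIOIndexInputSupplier {
    private final DirectIODirectory directIO;

    DirectIOAwareDirectory(Path path, LockFactory lockFactory, DirectIODirectory directIO) throws IOException {
        super(path, lockFactory);
        this.directIO = directIO;
    }

    @Override
    public IndexInput openInputDirect(String name, IOContext context) throws IOException {
        ensureOpen(); // deletion tracking lives on this outer directory
        return directIO.openInput(name, context); // reads bypass the OS page cache
    }
}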

DirectIOLucene99FlatVectorsFormat.java

@@ -0,0 +1,72 @@
/*
* @notice
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Modifications copyright (C) 2024 Elasticsearch B.V.
*/
package org.elasticsearch.index.codec.vectors.es818;
import org.apache.lucene.codecs.hnsw.FlatVectorsFormat;
import org.apache.lucene.codecs.hnsw.FlatVectorsReader;
import org.apache.lucene.codecs.hnsw.FlatVectorsScorer;
import org.apache.lucene.codecs.hnsw.FlatVectorsWriter;
import org.apache.lucene.codecs.lucene99.Lucene99FlatVectorsWriter;
import org.apache.lucene.index.SegmentReadState;
import org.apache.lucene.index.SegmentWriteState;
import java.io.IOException;
/**
* Copied from Lucene99FlatVectorsFormat in Lucene 10.1
*
* This is copied to change the implementation of {@link #fieldsReader} only.
* The codec format itself is not changed, so we keep the original {@link #NAME}.
*/
public class DirectIOLucene99FlatVectorsFormat extends FlatVectorsFormat {
static final String NAME = "Lucene99FlatVectorsFormat";
static final String META_CODEC_NAME = "Lucene99FlatVectorsFormatMeta";
static final String VECTOR_DATA_CODEC_NAME = "Lucene99FlatVectorsFormatData";
static final String META_EXTENSION = "vemf";
static final String VECTOR_DATA_EXTENSION = "vec";
public static final int VERSION_START = 0;
public static final int VERSION_CURRENT = VERSION_START;
static final int DIRECT_MONOTONIC_BLOCK_SHIFT = 16;
private final FlatVectorsScorer vectorsScorer;
/** Constructs a format */
public DirectIOLucene99FlatVectorsFormat(FlatVectorsScorer vectorsScorer) {
super(NAME);
this.vectorsScorer = vectorsScorer;
}
@Override
public FlatVectorsWriter fieldsWriter(SegmentWriteState state) throws IOException {
return new Lucene99FlatVectorsWriter(state, vectorsScorer);
}
@Override
public FlatVectorsReader fieldsReader(SegmentReadState state) throws IOException {
return new DirectIOLucene99FlatVectorsReader(state, vectorsScorer);
}
@Override
public String toString() {
return "ES818FlatVectorsFormat(" + "vectorsScorer=" + vectorsScorer + ')';
}
}

DirectIOLucene99FlatVectorsReader.java

@@ -0,0 +1,344 @@
/*
* @notice
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Modifications copyright (C) 2025 Elasticsearch B.V.
*/
package org.elasticsearch.index.codec.vectors.es818;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.hnsw.FlatVectorsReader;
import org.apache.lucene.codecs.hnsw.FlatVectorsScorer;
import org.apache.lucene.codecs.lucene95.OffHeapByteVectorValues;
import org.apache.lucene.codecs.lucene95.OffHeapFloatVectorValues;
import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration;
import org.apache.lucene.index.ByteVectorValues;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.FloatVectorValues;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.SegmentReadState;
import org.apache.lucene.index.VectorEncoding;
import org.apache.lucene.index.VectorSimilarityFunction;
import org.apache.lucene.internal.hppc.IntObjectHashMap;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.ReadAdvice;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.SuppressForbidden;
import org.apache.lucene.util.hnsw.RandomVectorScorer;
import java.io.IOException;
import java.io.UncheckedIOException;
import static org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsReader.readSimilarityFunction;
import static org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsReader.readVectorEncoding;
/** Copied from Lucene99FlatVectorsReader in Lucene 10.2, then modified to support DirectIOIndexInputSupplier */
@SuppressForbidden(reason = "Copied from lucene")
public class DirectIOLucene99FlatVectorsReader extends FlatVectorsReader {
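// Escape hatch: -Dvector.rescoring.directio=false disables direct IO reads and falls
// back to the regular Directory.openInput path.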
private static final boolean USE_DIRECT_IO = Boolean.parseBoolean(System.getProperty("vector.rescoring.directio", "true"));
private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(DirectIOLucene99FlatVectorsReader.class);
private final IntObjectHashMap<FieldEntry> fields = new IntObjectHashMap<>();
private final IndexInput vectorData;
private final FieldInfos fieldInfos;
public DirectIOLucene99FlatVectorsReader(SegmentReadState state, FlatVectorsScorer scorer) throws IOException {
super(scorer);
int versionMeta = readMetadata(state);
this.fieldInfos = state.fieldInfos;
boolean success = false;
try {
vectorData = openDataInput(
state,
versionMeta,
DirectIOLucene99FlatVectorsFormat.VECTOR_DATA_EXTENSION,
DirectIOLucene99FlatVectorsFormat.VECTOR_DATA_CODEC_NAME,
// Flat formats are used to randomly access vectors from their node ID that is stored
// in the HNSW graph.
state.context.withReadAdvice(ReadAdvice.RANDOM)
);
success = true;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(this);
}
}
}
private int readMetadata(SegmentReadState state) throws IOException {
String metaFileName = IndexFileNames.segmentFileName(
state.segmentInfo.name,
state.segmentSuffix,
DirectIOLucene99FlatVectorsFormat.META_EXTENSION
);
int versionMeta = -1;
try (ChecksumIndexInput meta = state.directory.openChecksumInput(metaFileName)) {
Throwable priorE = null;
try {
versionMeta = CodecUtil.checkIndexHeader(
meta,
DirectIOLucene99FlatVectorsFormat.META_CODEC_NAME,
DirectIOLucene99FlatVectorsFormat.VERSION_START,
DirectIOLucene99FlatVectorsFormat.VERSION_CURRENT,
state.segmentInfo.getId(),
state.segmentSuffix
);
readFields(meta, state.fieldInfos);
} catch (Throwable exception) {
priorE = exception;
} finally {
CodecUtil.checkFooter(meta, priorE);
}
}
return versionMeta;
}
private static IndexInput openDataInput(
SegmentReadState state,
int versionMeta,
String fileExtension,
String codecName,
IOContext context
) throws IOException {
String fileName = IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, fileExtension);
// use direct IO for accessing raw vector data for searches; this is gated on the DEFAULT
// context so that other access patterns (e.g. merges) keep the regular page-cache-backed input
IndexInput in = USE_DIRECT_IO
&& context.context() == IOContext.Context.DEFAULT
&& state.directory instanceof DirectIOIndexInputSupplier did
? did.openInputDirect(fileName, context)
: state.directory.openInput(fileName, context);
boolean success = false;
try {
int versionVectorData = CodecUtil.checkIndexHeader(
in,
codecName,
DirectIOLucene99FlatVectorsFormat.VERSION_START,
DirectIOLucene99FlatVectorsFormat.VERSION_CURRENT,
state.segmentInfo.getId(),
state.segmentSuffix
);
if (versionMeta != versionVectorData) {
throw new CorruptIndexException(
"Format versions mismatch: meta=" + versionMeta + ", " + codecName + "=" + versionVectorData,
in
);
}
CodecUtil.retrieveChecksum(in);
success = true;
return in;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(in);
}
}
}
private void readFields(ChecksumIndexInput meta, FieldInfos infos) throws IOException {
for (int fieldNumber = meta.readInt(); fieldNumber != -1; fieldNumber = meta.readInt()) {
FieldInfo info = infos.fieldInfo(fieldNumber);
if (info == null) {
throw new CorruptIndexException("Invalid field number: " + fieldNumber, meta);
}
FieldEntry fieldEntry = FieldEntry.create(meta, info);
fields.put(info.number, fieldEntry);
}
}
@Override
public long ramBytesUsed() {
return SHALLOW_SIZE + fields.ramBytesUsed();
}
@Override
public void checkIntegrity() throws IOException {
CodecUtil.checksumEntireFile(vectorData);
}
@Override
public FlatVectorsReader getMergeInstance() {
try {
// Update the read advice since vectors are guaranteed to be accessed sequentially for merge
this.vectorData.updateReadAdvice(ReadAdvice.SEQUENTIAL);
return this;
} catch (IOException exception) {
throw new UncheckedIOException(exception);
}
}
private FieldEntry getFieldEntry(String field, VectorEncoding expectedEncoding) {
final FieldInfo info = fieldInfos.fieldInfo(field);
final FieldEntry fieldEntry;
if (info == null || (fieldEntry = fields.get(info.number)) == null) {
throw new IllegalArgumentException("field=\"" + field + "\" not found");
}
if (fieldEntry.vectorEncoding != expectedEncoding) {
throw new IllegalArgumentException(
"field=\"" + field + "\" is encoded as: " + fieldEntry.vectorEncoding + " expected: " + expectedEncoding
);
}
return fieldEntry;
}
@Override
public FloatVectorValues getFloatVectorValues(String field) throws IOException {
final FieldEntry fieldEntry = getFieldEntry(field, VectorEncoding.FLOAT32);
return OffHeapFloatVectorValues.load(
fieldEntry.similarityFunction,
vectorScorer,
fieldEntry.ordToDoc,
fieldEntry.vectorEncoding,
fieldEntry.dimension,
fieldEntry.vectorDataOffset,
fieldEntry.vectorDataLength,
vectorData
);
}
@Override
public ByteVectorValues getByteVectorValues(String field) throws IOException {
final FieldEntry fieldEntry = getFieldEntry(field, VectorEncoding.BYTE);
return OffHeapByteVectorValues.load(
fieldEntry.similarityFunction,
vectorScorer,
fieldEntry.ordToDoc,
fieldEntry.vectorEncoding,
fieldEntry.dimension,
fieldEntry.vectorDataOffset,
fieldEntry.vectorDataLength,
vectorData
);
}
@Override
public RandomVectorScorer getRandomVectorScorer(String field, float[] target) throws IOException {
final FieldEntry fieldEntry = getFieldEntry(field, VectorEncoding.FLOAT32);
return vectorScorer.getRandomVectorScorer(
fieldEntry.similarityFunction,
OffHeapFloatVectorValues.load(
fieldEntry.similarityFunction,
vectorScorer,
fieldEntry.ordToDoc,
fieldEntry.vectorEncoding,
fieldEntry.dimension,
fieldEntry.vectorDataOffset,
fieldEntry.vectorDataLength,
vectorData
),
target
);
}
@Override
public RandomVectorScorer getRandomVectorScorer(String field, byte[] target) throws IOException {
final FieldEntry fieldEntry = getFieldEntry(field, VectorEncoding.BYTE);
return vectorScorer.getRandomVectorScorer(
fieldEntry.similarityFunction,
OffHeapByteVectorValues.load(
fieldEntry.similarityFunction,
vectorScorer,
fieldEntry.ordToDoc,
fieldEntry.vectorEncoding,
fieldEntry.dimension,
fieldEntry.vectorDataOffset,
fieldEntry.vectorDataLength,
vectorData
),
target
);
}
@Override
public void finishMerge() throws IOException {
// This makes sure that the access pattern hint is reverted back to RANDOM, since the
// HNSW implementation needs random access
this.vectorData.updateReadAdvice(ReadAdvice.RANDOM);
}
@Override
public void close() throws IOException {
IOUtils.close(vectorData);
}
private record FieldEntry(
VectorSimilarityFunction similarityFunction,
VectorEncoding vectorEncoding,
long vectorDataOffset,
long vectorDataLength,
int dimension,
int size,
OrdToDocDISIReaderConfiguration ordToDoc,
FieldInfo info
) {
FieldEntry {
if (similarityFunction != info.getVectorSimilarityFunction()) {
throw new IllegalStateException(
"Inconsistent vector similarity function for field=\""
+ info.name
+ "\"; "
+ similarityFunction
+ " != "
+ info.getVectorSimilarityFunction()
);
}
int infoVectorDimension = info.getVectorDimension();
if (infoVectorDimension != dimension) {
throw new IllegalStateException(
"Inconsistent vector dimension for field=\"" + info.name + "\"; " + infoVectorDimension + " != " + dimension
);
}
int byteSize = switch (info.getVectorEncoding()) {
case BYTE -> Byte.BYTES;
case FLOAT32 -> Float.BYTES;
};
long vectorBytes = Math.multiplyExact((long) infoVectorDimension, byteSize);
long numBytes = Math.multiplyExact(vectorBytes, size);
if (numBytes != vectorDataLength) {
throw new IllegalStateException(
"Vector data length "
+ vectorDataLength
+ " not matching size="
+ size
+ " * dim="
+ dimension
+ " * byteSize="
+ byteSize
+ " = "
+ numBytes
);
}
}
static FieldEntry create(IndexInput input, FieldInfo info) throws IOException {
final VectorEncoding vectorEncoding = readVectorEncoding(input);
final VectorSimilarityFunction similarityFunction = readSimilarityFunction(input);
final var vectorDataOffset = input.readVLong();
final var vectorDataLength = input.readVLong();
final var dimension = input.readVInt();
final var size = input.readInt();
final var ordToDoc = OrdToDocDISIReaderConfiguration.fromStoredMeta(input, size);
return new FieldEntry(similarityFunction, vectorEncoding, vectorDataOffset, vectorDataLength, dimension, size, ordToDoc, info);
}
}
}

ES818BinaryQuantizedVectorsFormat.java

@@ -23,7 +23,6 @@ import org.apache.lucene.codecs.hnsw.FlatVectorScorerUtil;
import org.apache.lucene.codecs.hnsw.FlatVectorsFormat;
import org.apache.lucene.codecs.hnsw.FlatVectorsReader;
import org.apache.lucene.codecs.hnsw.FlatVectorsWriter;
- import org.apache.lucene.codecs.lucene99.Lucene99FlatVectorsFormat;
import org.apache.lucene.index.SegmentReadState;
import org.apache.lucene.index.SegmentWriteState;
import org.elasticsearch.index.codec.vectors.OptimizedScalarQuantizer;
@@ -98,7 +97,7 @@ public class ES818BinaryQuantizedVectorsFormat extends FlatVectorsFormat {
static final String VECTOR_DATA_EXTENSION = "veb";
static final int DIRECT_MONOTONIC_BLOCK_SHIFT = 16;
- private static final FlatVectorsFormat rawVectorFormat = new Lucene99FlatVectorsFormat(
+ private static final DirectIOLucene99FlatVectorsFormat rawVectorFormat = new DirectIOLucene99FlatVectorsFormat(
FlatVectorScorerUtil.getLucene99FlatVectorsScorer()
);

OffHeapByteSizeUtils.java

@@ -19,6 +19,7 @@ import org.apache.lucene.codecs.lucene99.Lucene99FlatVectorsReader;
import org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsReader;
import org.apache.lucene.codecs.lucene99.Lucene99ScalarQuantizedVectorsReader;
import org.apache.lucene.index.FieldInfo;
import org.elasticsearch.index.codec.vectors.es818.DirectIOLucene99FlatVectorsReader;
import java.util.Map;
import java.util.stream.Collectors;
@@ -51,6 +52,9 @@ public class OffHeapByteSizeUtils {
case Lucene99FlatVectorsReader flatVectorsReader -> {
return OffHeapReflectionUtils.getOffHeapByteSizeF99FLT(flatVectorsReader, fieldInfo);
}
case DirectIOLucene99FlatVectorsReader flatVectorsReader -> {
return OffHeapReflectionUtils.getOffHeapByteSizeF99FLT(flatVectorsReader, fieldInfo);
}
case Lucene95HnswVectorsReader lucene95HnswVectorsReader -> {
return OffHeapReflectionUtils.getOffHeapByteSizeL95HNSW(lucene95HnswVectorsReader, fieldInfo);
}

OffHeapReflectionUtils.java

@@ -21,6 +21,7 @@ import org.apache.lucene.codecs.lucene99.Lucene99ScalarQuantizedVectorsReader;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.VectorEncoding;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.index.codec.vectors.es818.DirectIOLucene99FlatVectorsReader;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
@@ -48,12 +49,15 @@ public class OffHeapReflectionUtils {
private static final VarHandle RAW_VECTORS_READER_HNDL_SQ;
private static final MethodHandle GET_FIELD_ENTRY_HANDLE_L99FLT;
private static final MethodHandle VECTOR_DATA_LENGTH_HANDLE_L99FLT;
private static final MethodHandle GET_FIELD_ENTRY_HANDLE_DIOL99FLT;
private static final MethodHandle VECTOR_DATA_LENGTH_HANDLE_DIOL99FLT;
private static final MethodHandle GET_FIELD_ENTRY_HANDLE_L99HNSW;
private static final MethodHandle GET_VECTOR_INDEX_LENGTH_HANDLE_L99HNSW;
private static final VarHandle FLAT_VECTORS_READER_HNDL_L99HNSW;
static final Class<?> L99_SQ_VR_CLS = Lucene99ScalarQuantizedVectorsReader.class;
static final Class<?> L99_FLT_VR_CLS = Lucene99FlatVectorsReader.class;
static final Class<?> DIOL99_FLT_VR_CLS = DirectIOLucene99FlatVectorsReader.class;
static final Class<?> L99_HNSW_VR_CLS = Lucene99HnswVectorsReader.class;
// old codecs
@@ -98,6 +102,12 @@ public class OffHeapReflectionUtils {
mt = methodType(cls, String.class, VectorEncoding.class);
GET_FIELD_ENTRY_HANDLE_L99FLT = lookup.findVirtual(L99_FLT_VR_CLS, "getFieldEntry", mt);
VECTOR_DATA_LENGTH_HANDLE_L99FLT = lookup.findVirtual(cls, "vectorDataLength", methodType(long.class));
// DirectIOLucene99FlatVectorsReader
cls = Class.forName("org.elasticsearch.index.codec.vectors.es818.DirectIOLucene99FlatVectorsReader$FieldEntry");
lookup = privilegedPrivateLookupIn(DIOL99_FLT_VR_CLS, MethodHandles.lookup());
mt = methodType(cls, String.class, VectorEncoding.class);
GET_FIELD_ENTRY_HANDLE_DIOL99FLT = lookup.findVirtual(DIOL99_FLT_VR_CLS, "getFieldEntry", mt);
VECTOR_DATA_LENGTH_HANDLE_DIOL99FLT = lookup.findVirtual(cls, "vectorDataLength", methodType(long.class));
// Lucene99HnswVectorsReader
cls = Class.forName("org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsReader$FieldEntry");
lookup = privilegedPrivateLookupIn(L99_HNSW_VR_CLS, MethodHandles.lookup());
@@ -174,6 +184,18 @@ public class OffHeapReflectionUtils {
throw new AssertionError("should not reach here");
}
@SuppressForbidden(reason = "static type is not accessible")
static Map<String, Long> getOffHeapByteSizeF99FLT(DirectIOLucene99FlatVectorsReader reader, FieldInfo fieldInfo) {
try {
var entry = GET_FIELD_ENTRY_HANDLE_DIOL99FLT.invoke(reader, fieldInfo.name, fieldInfo.getVectorEncoding());
long len = (long) VECTOR_DATA_LENGTH_HANDLE_DIOL99FLT.invoke(entry);
return Map.of(FLAT_VECTOR_DATA_EXTENSION, len);
} catch (Throwable t) {
handleThrowable(t);
}
throw new AssertionError("should not reach here");
}
@SuppressForbidden(reason = "static type is not accessible")
static Map<String, Long> getOffHeapByteSizeL99HNSW(Lucene99HnswVectorsReader reader, FieldInfo fieldInfo) {
try {

FsDirectoryFactory.java

@@ -9,6 +9,7 @@
package org.elasticsearch.index.store;
import org.apache.lucene.misc.store.DirectIODirectory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.FileSwitchDirectory;
@@ -25,18 +26,24 @@ import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.codec.vectors.es818.DirectIOIndexInputSupplier;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.plugins.IndexStorePlugin;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.OptionalLong;
import java.util.Set;
import java.util.function.BiPredicate;
public class FsDirectoryFactory implements IndexStorePlugin.DirectoryFactory {
private static final Logger Log = LogManager.getLogger(FsDirectoryFactory.class);
public static final Setting<LockFactory> INDEX_LOCK_FACTOR_SETTING = new Setting<>("index.store.fs.fs_lock", "native", (s) -> {
return switch (s) {
case "native" -> NativeFSLockFactory.INSTANCE;
@@ -109,12 +116,29 @@ public class FsDirectoryFactory implements IndexStorePlugin.DirectoryFactory {
return unwrap instanceof HybridDirectory;
}
- static final class HybridDirectory extends NIOFSDirectory {
+ static final class HybridDirectory extends NIOFSDirectory implements DirectIOIndexInputSupplier {
private final MMapDirectory delegate;
private final DirectIODirectory directIODelegate;
HybridDirectory(LockFactory lockFactory, MMapDirectory delegate) throws IOException {
super(delegate.getDirectory(), lockFactory);
this.delegate = delegate;
DirectIODirectory directIO;
try {
// use an 8kB buffer (two 4kB pages): a 1024-dim float vector is 1024 * 4 = 4096 bytes,
// so an un-page-aligned vector can straddle two pages
directIO = new DirectIODirectory(delegate, 8192, DirectIODirectory.DEFAULT_MIN_BYTES_DIRECT) {
@Override
protected boolean useDirectIO(String name, IOContext context, OptionalLong fileLength) {
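// always report direct IO as usable; whether a given file is actually opened
// with it is decided by the caller in openInputDirect below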
return true;
}
};
} catch (Exception e) {
// DirectIO not supported on this platform or filesystem
Log.warn("Could not initialize DirectIO access", e);
directIO = null;
}
this.directIODelegate = directIO;
}
@Override
@@ -135,6 +159,18 @@ public class FsDirectoryFactory implements IndexStorePlugin.DirectoryFactory {
}
}
@Override
public IndexInput openInputDirect(String name, IOContext context) throws IOException {
if (directIODelegate == null) {
return openInput(name, context);
}
// we need to do these checks on the outer directory since the inner doesn't know about pending deletes
ensureOpen();
ensureCanRead(name);
return directIODelegate.openInput(name, context);
}
@Override
public void close() throws IOException {
IOUtils.close(super::close, delegate);

DiskUsageIntegTestCase.java

@@ -218,7 +218,12 @@ public class DiskUsageIntegTestCase extends ESIntegTestCase {
}
TestFileStore getTestFileStore(Path path) {
- final TestFileStore fileStore = trackedPaths.get(path);
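+ // look for the tracked path containing the given path; an exact lookup fails
+ // when the path is a sub-path of a tracked directory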
+ final TestFileStore fileStore = trackedPaths.entrySet()
+     .stream()
+     .filter(e -> path.startsWith(e.getKey()))
+     .map(Map.Entry::getValue)
+     .findAny()
+     .orElse(null);
if (fileStore != null) {
return fileStore;
}