diff --git a/libs/entitlement/src/main/java/org/elasticsearch/entitlement/initialization/EntitlementInitialization.java b/libs/entitlement/src/main/java/org/elasticsearch/entitlement/initialization/EntitlementInitialization.java index b1f26d5d59ab..1968dbdd4f20 100644 --- a/libs/entitlement/src/main/java/org/elasticsearch/entitlement/initialization/EntitlementInitialization.java +++ b/libs/entitlement/src/main/java/org/elasticsearch/entitlement/initialization/EntitlementInitialization.java @@ -279,7 +279,10 @@ public class EntitlementInitialization { new FilesEntitlement(List.of(FileData.ofBaseDirPath(CONFIG, READ), FileData.ofBaseDirPath(DATA, READ_WRITE))) ) ), - new Scope("org.apache.lucene.misc", List.of(new FilesEntitlement(List.of(FileData.ofBaseDirPath(DATA, READ_WRITE))))), + new Scope( + "org.apache.lucene.misc", + List.of(new FilesEntitlement(List.of(FileData.ofBaseDirPath(DATA, READ_WRITE))), new ReadStoreAttributesEntitlement()) + ), new Scope( "org.apache.logging.log4j.core", List.of(new ManageThreadsEntitlement(), new FilesEntitlement(List.of(FileData.ofBaseDirPath(LOGS, READ_WRITE)))) diff --git a/server/src/main/java/org/elasticsearch/index/codec/vectors/es818/DirectIOIndexInputSupplier.java b/server/src/main/java/org/elasticsearch/index/codec/vectors/es818/DirectIOIndexInputSupplier.java new file mode 100644 index 000000000000..0640a5dacce6 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/codec/vectors/es818/DirectIOIndexInputSupplier.java @@ -0,0 +1,23 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.codec.vectors.es818; + +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; + +import java.io.IOException; + +/** + * A hook for {@link DirectIOLucene99FlatVectorsReader} to specify the input should be opened using DirectIO. + * Remove when IOContext allows more extensible payloads to be specified. + */ +public interface DirectIOIndexInputSupplier { + IndexInput openInputDirect(String name, IOContext context) throws IOException; +} diff --git a/server/src/main/java/org/elasticsearch/index/codec/vectors/es818/DirectIOLucene99FlatVectorsFormat.java b/server/src/main/java/org/elasticsearch/index/codec/vectors/es818/DirectIOLucene99FlatVectorsFormat.java new file mode 100644 index 000000000000..532525817f63 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/codec/vectors/es818/DirectIOLucene99FlatVectorsFormat.java @@ -0,0 +1,72 @@ +/* + * @notice + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Modifications copyright (C) 2024 Elasticsearch B.V. + */ +package org.elasticsearch.index.codec.vectors.es818; + +import org.apache.lucene.codecs.hnsw.FlatVectorsFormat; +import org.apache.lucene.codecs.hnsw.FlatVectorsReader; +import org.apache.lucene.codecs.hnsw.FlatVectorsScorer; +import org.apache.lucene.codecs.hnsw.FlatVectorsWriter; +import org.apache.lucene.codecs.lucene99.Lucene99FlatVectorsWriter; +import org.apache.lucene.index.SegmentReadState; +import org.apache.lucene.index.SegmentWriteState; + +import java.io.IOException; + +/** + * Copied from Lucene99FlatVectorsFormat in Lucene 10.1 + * + * This is copied to change the implementation of {@link #fieldsReader} only. + * The codec format itself is not changed, so we keep the original {@link #NAME} + */ +public class DirectIOLucene99FlatVectorsFormat extends FlatVectorsFormat { + + static final String NAME = "Lucene99FlatVectorsFormat"; + static final String META_CODEC_NAME = "Lucene99FlatVectorsFormatMeta"; + static final String VECTOR_DATA_CODEC_NAME = "Lucene99FlatVectorsFormatData"; + static final String META_EXTENSION = "vemf"; + static final String VECTOR_DATA_EXTENSION = "vec"; + + public static final int VERSION_START = 0; + public static final int VERSION_CURRENT = VERSION_START; + + static final int DIRECT_MONOTONIC_BLOCK_SHIFT = 16; + private final FlatVectorsScorer vectorsScorer; + + /** Constructs a format */ + public DirectIOLucene99FlatVectorsFormat(FlatVectorsScorer vectorsScorer) { + super(NAME); + this.vectorsScorer = vectorsScorer; + } + + @Override + public FlatVectorsWriter fieldsWriter(SegmentWriteState state) throws IOException { + return new Lucene99FlatVectorsWriter(state, vectorsScorer); + } + + @Override + public FlatVectorsReader fieldsReader(SegmentReadState state) throws IOException { + return new DirectIOLucene99FlatVectorsReader(state, vectorsScorer); + } + + @Override + public String toString() { + return "ES818FlatVectorsFormat(" + "vectorsScorer=" + vectorsScorer + ')'; + } +} diff --git a/server/src/main/java/org/elasticsearch/index/codec/vectors/es818/DirectIOLucene99FlatVectorsReader.java b/server/src/main/java/org/elasticsearch/index/codec/vectors/es818/DirectIOLucene99FlatVectorsReader.java new file mode 100644 index 000000000000..5f7dbb31a1ad --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/codec/vectors/es818/DirectIOLucene99FlatVectorsReader.java @@ -0,0 +1,344 @@ +/* + * @notice + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Modifications copyright (C) 2025 Elasticsearch B.V. + */ +package org.elasticsearch.index.codec.vectors.es818; + +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.codecs.hnsw.FlatVectorsReader; +import org.apache.lucene.codecs.hnsw.FlatVectorsScorer; +import org.apache.lucene.codecs.lucene95.OffHeapByteVectorValues; +import org.apache.lucene.codecs.lucene95.OffHeapFloatVectorValues; +import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration; +import org.apache.lucene.index.ByteVectorValues; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FieldInfos; +import org.apache.lucene.index.FloatVectorValues; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.SegmentReadState; +import org.apache.lucene.index.VectorEncoding; +import org.apache.lucene.index.VectorSimilarityFunction; +import org.apache.lucene.internal.hppc.IntObjectHashMap; +import org.apache.lucene.store.ChecksumIndexInput; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.ReadAdvice; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.RamUsageEstimator; +import org.apache.lucene.util.SuppressForbidden; +import org.apache.lucene.util.hnsw.RandomVectorScorer; + +import java.io.IOException; +import java.io.UncheckedIOException; + +import static org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsReader.readSimilarityFunction; +import static org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsReader.readVectorEncoding; + +/** Copied from Lucene99FlatVectorsReader in Lucene 10.2, then modified to support DirectIOIndexInputSupplier */ +@SuppressForbidden(reason = "Copied from lucene") +public class DirectIOLucene99FlatVectorsReader extends FlatVectorsReader { + + private static final boolean USE_DIRECT_IO = Boolean.parseBoolean(System.getProperty("vector.rescoring.directio", "true")); + + private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(DirectIOLucene99FlatVectorsReader.class); + + private final IntObjectHashMap fields = new IntObjectHashMap<>(); + private final IndexInput vectorData; + private final FieldInfos fieldInfos; + + public DirectIOLucene99FlatVectorsReader(SegmentReadState state, FlatVectorsScorer scorer) throws IOException { + super(scorer); + int versionMeta = readMetadata(state); + this.fieldInfos = state.fieldInfos; + boolean success = false; + try { + vectorData = openDataInput( + state, + versionMeta, + DirectIOLucene99FlatVectorsFormat.VECTOR_DATA_EXTENSION, + DirectIOLucene99FlatVectorsFormat.VECTOR_DATA_CODEC_NAME, + // Flat formats are used to randomly access vectors from their node ID that is stored + // in the HNSW graph. + state.context.withReadAdvice(ReadAdvice.RANDOM) + ); + success = true; + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(this); + } + } + } + + private int readMetadata(SegmentReadState state) throws IOException { + String metaFileName = IndexFileNames.segmentFileName( + state.segmentInfo.name, + state.segmentSuffix, + DirectIOLucene99FlatVectorsFormat.META_EXTENSION + ); + int versionMeta = -1; + try (ChecksumIndexInput meta = state.directory.openChecksumInput(metaFileName)) { + Throwable priorE = null; + try { + versionMeta = CodecUtil.checkIndexHeader( + meta, + DirectIOLucene99FlatVectorsFormat.META_CODEC_NAME, + DirectIOLucene99FlatVectorsFormat.VERSION_START, + DirectIOLucene99FlatVectorsFormat.VERSION_CURRENT, + state.segmentInfo.getId(), + state.segmentSuffix + ); + readFields(meta, state.fieldInfos); + } catch (Throwable exception) { + priorE = exception; + } finally { + CodecUtil.checkFooter(meta, priorE); + } + } + return versionMeta; + } + + private static IndexInput openDataInput( + SegmentReadState state, + int versionMeta, + String fileExtension, + String codecName, + IOContext context + ) throws IOException { + String fileName = IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, fileExtension); + // use direct IO for accessing raw vector data for searches + IndexInput in = USE_DIRECT_IO + && context.context() == IOContext.Context.DEFAULT + && state.directory instanceof DirectIOIndexInputSupplier did + ? did.openInputDirect(fileName, context) + : state.directory.openInput(fileName, context); + boolean success = false; + try { + int versionVectorData = CodecUtil.checkIndexHeader( + in, + codecName, + DirectIOLucene99FlatVectorsFormat.VERSION_START, + DirectIOLucene99FlatVectorsFormat.VERSION_CURRENT, + state.segmentInfo.getId(), + state.segmentSuffix + ); + if (versionMeta != versionVectorData) { + throw new CorruptIndexException( + "Format versions mismatch: meta=" + versionMeta + ", " + codecName + "=" + versionVectorData, + in + ); + } + CodecUtil.retrieveChecksum(in); + success = true; + return in; + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(in); + } + } + } + + private void readFields(ChecksumIndexInput meta, FieldInfos infos) throws IOException { + for (int fieldNumber = meta.readInt(); fieldNumber != -1; fieldNumber = meta.readInt()) { + FieldInfo info = infos.fieldInfo(fieldNumber); + if (info == null) { + throw new CorruptIndexException("Invalid field number: " + fieldNumber, meta); + } + FieldEntry fieldEntry = FieldEntry.create(meta, info); + fields.put(info.number, fieldEntry); + } + } + + @Override + public long ramBytesUsed() { + return SHALLOW_SIZE + fields.ramBytesUsed(); + } + + @Override + public void checkIntegrity() throws IOException { + CodecUtil.checksumEntireFile(vectorData); + } + + @Override + public FlatVectorsReader getMergeInstance() { + try { + // Update the read advice since vectors are guaranteed to be accessed sequentially for merge + this.vectorData.updateReadAdvice(ReadAdvice.SEQUENTIAL); + return this; + } catch (IOException exception) { + throw new UncheckedIOException(exception); + } + } + + private FieldEntry getFieldEntry(String field, VectorEncoding expectedEncoding) { + final FieldInfo info = fieldInfos.fieldInfo(field); + final FieldEntry fieldEntry; + if (info == null || (fieldEntry = fields.get(info.number)) == null) { + throw new IllegalArgumentException("field=\"" + field + "\" not found"); + } + if (fieldEntry.vectorEncoding != expectedEncoding) { + throw new IllegalArgumentException( + "field=\"" + field + "\" is encoded as: " + fieldEntry.vectorEncoding + " expected: " + expectedEncoding + ); + } + return fieldEntry; + } + + @Override + public FloatVectorValues getFloatVectorValues(String field) throws IOException { + final FieldEntry fieldEntry = getFieldEntry(field, VectorEncoding.FLOAT32); + return OffHeapFloatVectorValues.load( + fieldEntry.similarityFunction, + vectorScorer, + fieldEntry.ordToDoc, + fieldEntry.vectorEncoding, + fieldEntry.dimension, + fieldEntry.vectorDataOffset, + fieldEntry.vectorDataLength, + vectorData + ); + } + + @Override + public ByteVectorValues getByteVectorValues(String field) throws IOException { + final FieldEntry fieldEntry = getFieldEntry(field, VectorEncoding.BYTE); + return OffHeapByteVectorValues.load( + fieldEntry.similarityFunction, + vectorScorer, + fieldEntry.ordToDoc, + fieldEntry.vectorEncoding, + fieldEntry.dimension, + fieldEntry.vectorDataOffset, + fieldEntry.vectorDataLength, + vectorData + ); + } + + @Override + public RandomVectorScorer getRandomVectorScorer(String field, float[] target) throws IOException { + final FieldEntry fieldEntry = getFieldEntry(field, VectorEncoding.FLOAT32); + return vectorScorer.getRandomVectorScorer( + fieldEntry.similarityFunction, + OffHeapFloatVectorValues.load( + fieldEntry.similarityFunction, + vectorScorer, + fieldEntry.ordToDoc, + fieldEntry.vectorEncoding, + fieldEntry.dimension, + fieldEntry.vectorDataOffset, + fieldEntry.vectorDataLength, + vectorData + ), + target + ); + } + + @Override + public RandomVectorScorer getRandomVectorScorer(String field, byte[] target) throws IOException { + final FieldEntry fieldEntry = getFieldEntry(field, VectorEncoding.BYTE); + return vectorScorer.getRandomVectorScorer( + fieldEntry.similarityFunction, + OffHeapByteVectorValues.load( + fieldEntry.similarityFunction, + vectorScorer, + fieldEntry.ordToDoc, + fieldEntry.vectorEncoding, + fieldEntry.dimension, + fieldEntry.vectorDataOffset, + fieldEntry.vectorDataLength, + vectorData + ), + target + ); + } + + @Override + public void finishMerge() throws IOException { + // This makes sure that the access pattern hint is reverted back since HNSW implementation + // needs it + this.vectorData.updateReadAdvice(ReadAdvice.RANDOM); + } + + @Override + public void close() throws IOException { + IOUtils.close(vectorData); + } + + private record FieldEntry( + VectorSimilarityFunction similarityFunction, + VectorEncoding vectorEncoding, + long vectorDataOffset, + long vectorDataLength, + int dimension, + int size, + OrdToDocDISIReaderConfiguration ordToDoc, + FieldInfo info + ) { + + FieldEntry { + if (similarityFunction != info.getVectorSimilarityFunction()) { + throw new IllegalStateException( + "Inconsistent vector similarity function for field=\"" + + info.name + + "\"; " + + similarityFunction + + " != " + + info.getVectorSimilarityFunction() + ); + } + int infoVectorDimension = info.getVectorDimension(); + if (infoVectorDimension != dimension) { + throw new IllegalStateException( + "Inconsistent vector dimension for field=\"" + info.name + "\"; " + infoVectorDimension + " != " + dimension + ); + } + + int byteSize = switch (info.getVectorEncoding()) { + case BYTE -> Byte.BYTES; + case FLOAT32 -> Float.BYTES; + }; + long vectorBytes = Math.multiplyExact((long) infoVectorDimension, byteSize); + long numBytes = Math.multiplyExact(vectorBytes, size); + if (numBytes != vectorDataLength) { + throw new IllegalStateException( + "Vector data length " + + vectorDataLength + + " not matching size=" + + size + + " * dim=" + + dimension + + " * byteSize=" + + byteSize + + " = " + + numBytes + ); + } + } + + static FieldEntry create(IndexInput input, FieldInfo info) throws IOException { + final VectorEncoding vectorEncoding = readVectorEncoding(input); + final VectorSimilarityFunction similarityFunction = readSimilarityFunction(input); + final var vectorDataOffset = input.readVLong(); + final var vectorDataLength = input.readVLong(); + final var dimension = input.readVInt(); + final var size = input.readInt(); + final var ordToDoc = OrdToDocDISIReaderConfiguration.fromStoredMeta(input, size); + return new FieldEntry(similarityFunction, vectorEncoding, vectorDataOffset, vectorDataLength, dimension, size, ordToDoc, info); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/index/codec/vectors/es818/ES818BinaryQuantizedVectorsFormat.java b/server/src/main/java/org/elasticsearch/index/codec/vectors/es818/ES818BinaryQuantizedVectorsFormat.java index 893640dfb484..c3e12a765eb5 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/vectors/es818/ES818BinaryQuantizedVectorsFormat.java +++ b/server/src/main/java/org/elasticsearch/index/codec/vectors/es818/ES818BinaryQuantizedVectorsFormat.java @@ -23,7 +23,6 @@ import org.apache.lucene.codecs.hnsw.FlatVectorScorerUtil; import org.apache.lucene.codecs.hnsw.FlatVectorsFormat; import org.apache.lucene.codecs.hnsw.FlatVectorsReader; import org.apache.lucene.codecs.hnsw.FlatVectorsWriter; -import org.apache.lucene.codecs.lucene99.Lucene99FlatVectorsFormat; import org.apache.lucene.index.SegmentReadState; import org.apache.lucene.index.SegmentWriteState; import org.elasticsearch.index.codec.vectors.OptimizedScalarQuantizer; @@ -98,7 +97,7 @@ public class ES818BinaryQuantizedVectorsFormat extends FlatVectorsFormat { static final String VECTOR_DATA_EXTENSION = "veb"; static final int DIRECT_MONOTONIC_BLOCK_SHIFT = 16; - private static final FlatVectorsFormat rawVectorFormat = new Lucene99FlatVectorsFormat( + private static final DirectIOLucene99FlatVectorsFormat rawVectorFormat = new DirectIOLucene99FlatVectorsFormat( FlatVectorScorerUtil.getLucene99FlatVectorsScorer() ); diff --git a/server/src/main/java/org/elasticsearch/index/codec/vectors/reflect/OffHeapByteSizeUtils.java b/server/src/main/java/org/elasticsearch/index/codec/vectors/reflect/OffHeapByteSizeUtils.java index 81cea18a3d56..dcd9fc3b1273 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/vectors/reflect/OffHeapByteSizeUtils.java +++ b/server/src/main/java/org/elasticsearch/index/codec/vectors/reflect/OffHeapByteSizeUtils.java @@ -19,6 +19,7 @@ import org.apache.lucene.codecs.lucene99.Lucene99FlatVectorsReader; import org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsReader; import org.apache.lucene.codecs.lucene99.Lucene99ScalarQuantizedVectorsReader; import org.apache.lucene.index.FieldInfo; +import org.elasticsearch.index.codec.vectors.es818.DirectIOLucene99FlatVectorsReader; import java.util.Map; import java.util.stream.Collectors; @@ -51,6 +52,9 @@ public class OffHeapByteSizeUtils { case Lucene99FlatVectorsReader flatVectorsReader -> { return OffHeapReflectionUtils.getOffHeapByteSizeF99FLT(flatVectorsReader, fieldInfo); } + case DirectIOLucene99FlatVectorsReader flatVectorsReader -> { + return OffHeapReflectionUtils.getOffHeapByteSizeF99FLT(flatVectorsReader, fieldInfo); + } case Lucene95HnswVectorsReader lucene95HnswVectorsReader -> { return OffHeapReflectionUtils.getOffHeapByteSizeL95HNSW(lucene95HnswVectorsReader, fieldInfo); } diff --git a/server/src/main/java/org/elasticsearch/index/codec/vectors/reflect/OffHeapReflectionUtils.java b/server/src/main/java/org/elasticsearch/index/codec/vectors/reflect/OffHeapReflectionUtils.java index 8ac80c8ba691..599a20550838 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/vectors/reflect/OffHeapReflectionUtils.java +++ b/server/src/main/java/org/elasticsearch/index/codec/vectors/reflect/OffHeapReflectionUtils.java @@ -21,6 +21,7 @@ import org.apache.lucene.codecs.lucene99.Lucene99ScalarQuantizedVectorsReader; import org.apache.lucene.index.FieldInfo; import org.apache.lucene.index.VectorEncoding; import org.elasticsearch.core.SuppressForbidden; +import org.elasticsearch.index.codec.vectors.es818.DirectIOLucene99FlatVectorsReader; import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandles; @@ -48,12 +49,15 @@ public class OffHeapReflectionUtils { private static final VarHandle RAW_VECTORS_READER_HNDL_SQ; private static final MethodHandle GET_FIELD_ENTRY_HANDLE_L99FLT; private static final MethodHandle VECTOR_DATA_LENGTH_HANDLE_L99FLT; + private static final MethodHandle GET_FIELD_ENTRY_HANDLE_DIOL99FLT; + private static final MethodHandle VECTOR_DATA_LENGTH_HANDLE_DIOL99FLT; private static final MethodHandle GET_FIELD_ENTRY_HANDLE_L99HNSW; private static final MethodHandle GET_VECTOR_INDEX_LENGTH_HANDLE_L99HNSW; private static final VarHandle FLAT_VECTORS_READER_HNDL_L99HNSW; static final Class L99_SQ_VR_CLS = Lucene99ScalarQuantizedVectorsReader.class; static final Class L99_FLT_VR_CLS = Lucene99FlatVectorsReader.class; + static final Class DIOL99_FLT_VR_CLS = DirectIOLucene99FlatVectorsReader.class; static final Class L99_HNSW_VR_CLS = Lucene99HnswVectorsReader.class; // old codecs @@ -98,6 +102,12 @@ public class OffHeapReflectionUtils { mt = methodType(cls, String.class, VectorEncoding.class); GET_FIELD_ENTRY_HANDLE_L99FLT = lookup.findVirtual(L99_FLT_VR_CLS, "getFieldEntry", mt); VECTOR_DATA_LENGTH_HANDLE_L99FLT = lookup.findVirtual(cls, "vectorDataLength", methodType(long.class)); + // DirectIOLucene99FlatVectorsReader + cls = Class.forName("org.elasticsearch.index.codec.vectors.es818.DirectIOLucene99FlatVectorsReader$FieldEntry"); + lookup = privilegedPrivateLookupIn(DIOL99_FLT_VR_CLS, MethodHandles.lookup()); + mt = methodType(cls, String.class, VectorEncoding.class); + GET_FIELD_ENTRY_HANDLE_DIOL99FLT = lookup.findVirtual(DIOL99_FLT_VR_CLS, "getFieldEntry", mt); + VECTOR_DATA_LENGTH_HANDLE_DIOL99FLT = lookup.findVirtual(cls, "vectorDataLength", methodType(long.class)); // Lucene99HnswVectorsReader cls = Class.forName("org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsReader$FieldEntry"); lookup = privilegedPrivateLookupIn(L99_HNSW_VR_CLS, MethodHandles.lookup()); @@ -174,6 +184,18 @@ public class OffHeapReflectionUtils { throw new AssertionError("should not reach here"); } + @SuppressForbidden(reason = "static type is not accessible") + static Map getOffHeapByteSizeF99FLT(DirectIOLucene99FlatVectorsReader reader, FieldInfo fieldInfo) { + try { + var entry = GET_FIELD_ENTRY_HANDLE_DIOL99FLT.invoke(reader, fieldInfo.name, fieldInfo.getVectorEncoding()); + long len = (long) VECTOR_DATA_LENGTH_HANDLE_DIOL99FLT.invoke(entry); + return Map.of(FLAT_VECTOR_DATA_EXTENSION, len); + } catch (Throwable t) { + handleThrowable(t); + } + throw new AssertionError("should not reach here"); + } + @SuppressForbidden(reason = "static type is not accessible") static Map getOffHeapByteSizeL99HNSW(Lucene99HnswVectorsReader reader, FieldInfo fieldInfo) { try { diff --git a/server/src/main/java/org/elasticsearch/index/store/FsDirectoryFactory.java b/server/src/main/java/org/elasticsearch/index/store/FsDirectoryFactory.java index bc94db13074d..76dfee2f7f2d 100644 --- a/server/src/main/java/org/elasticsearch/index/store/FsDirectoryFactory.java +++ b/server/src/main/java/org/elasticsearch/index/store/FsDirectoryFactory.java @@ -9,6 +9,7 @@ package org.elasticsearch.index.store; +import org.apache.lucene.misc.store.DirectIODirectory; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.FileSwitchDirectory; @@ -25,18 +26,24 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.core.IOUtils; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.codec.vectors.es818.DirectIOIndexInputSupplier; import org.elasticsearch.index.shard.ShardPath; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; import org.elasticsearch.plugins.IndexStorePlugin; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.HashSet; +import java.util.OptionalLong; import java.util.Set; import java.util.function.BiPredicate; public class FsDirectoryFactory implements IndexStorePlugin.DirectoryFactory { + private static final Logger Log = LogManager.getLogger(FsDirectoryFactory.class); + public static final Setting INDEX_LOCK_FACTOR_SETTING = new Setting<>("index.store.fs.fs_lock", "native", (s) -> { return switch (s) { case "native" -> NativeFSLockFactory.INSTANCE; @@ -109,12 +116,29 @@ public class FsDirectoryFactory implements IndexStorePlugin.DirectoryFactory { return unwrap instanceof HybridDirectory; } - static final class HybridDirectory extends NIOFSDirectory { + static final class HybridDirectory extends NIOFSDirectory implements DirectIOIndexInputSupplier { private final MMapDirectory delegate; + private final DirectIODirectory directIODelegate; HybridDirectory(LockFactory lockFactory, MMapDirectory delegate) throws IOException { super(delegate.getDirectory(), lockFactory); this.delegate = delegate; + + DirectIODirectory directIO; + try { + // use 8kB buffer (two pages) to guarantee it can load all of an un-page-aligned 1024-dim float vector + directIO = new DirectIODirectory(delegate, 8192, DirectIODirectory.DEFAULT_MIN_BYTES_DIRECT) { + @Override + protected boolean useDirectIO(String name, IOContext context, OptionalLong fileLength) { + return true; + } + }; + } catch (Exception e) { + // directio not supported + Log.warn("Could not initialize DirectIO access", e); + directIO = null; + } + this.directIODelegate = directIO; } @Override @@ -135,6 +159,18 @@ public class FsDirectoryFactory implements IndexStorePlugin.DirectoryFactory { } } + @Override + public IndexInput openInputDirect(String name, IOContext context) throws IOException { + if (directIODelegate == null) { + return openInput(name, context); + } + // we need to do these checks on the outer directory since the inner doesn't know about pending deletes + ensureOpen(); + ensureCanRead(name); + + return directIODelegate.openInput(name, context); + } + @Override public void close() throws IOException { IOUtils.close(super::close, delegate); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/DiskUsageIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/DiskUsageIntegTestCase.java index c3384ede3a1a..e35128e14ce3 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/DiskUsageIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/DiskUsageIntegTestCase.java @@ -218,7 +218,12 @@ public class DiskUsageIntegTestCase extends ESIntegTestCase { } TestFileStore getTestFileStore(Path path) { - final TestFileStore fileStore = trackedPaths.get(path); + final TestFileStore fileStore = trackedPaths.entrySet() + .stream() + .filter(e -> path.startsWith(e.getKey())) + .map(Map.Entry::getValue) + .findAny() + .orElse(null); if (fileStore != null) { return fileStore; }