Provide document size reporter with MapperService (#109794)

Instead of indexMode a mapper service is necessary to reliably determine if an index is a timeseries datastream
This commit is contained in:
Przemyslaw Gomulka 2024-06-18 11:40:56 +02:00 committed by GitHub
parent a3f4d51f7c
commit b80b739993
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 45 additions and 20 deletions

View file

@ -0,0 +1,5 @@
pr: 109794
summary: Provide document size reporter with `MapperService`
area: Infra/Metrics
type: bug
issues: []

View file

@ -12,7 +12,7 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.ingest.common.IngestCommonPlugin;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
@ -102,7 +102,7 @@ public class DocumentSizeObserverWithPipelinesIT extends ESIntegTestCase {
@Override
public DocumentSizeReporter newDocumentSizeReporter(
String indexName,
IndexMode indexMode,
MapperService mapperService,
DocumentSizeAccumulator documentSizeAccumulator
) {
return DocumentSizeReporter.EMPTY_INSTANCE;

View file

@ -81,7 +81,8 @@ public class IndexingMemoryControllerIT extends ESSingleNodeTestCase {
config.getLeafSorter(),
config.getRelativeTimeInNanosSupplier(),
config.getIndexCommitListener(),
config.isPromotableToPrimary()
config.isPromotableToPrimary(),
config.getMapperService()
);
}

View file

@ -9,10 +9,10 @@
package org.elasticsearch.plugins.internal;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.IngestPlugin;
@ -104,7 +104,7 @@ public class DocumentSizeObserverIT extends ESIntegTestCase {
DocumentSizeReporter documentParsingReporter = documentParsingProvider.newDocumentSizeReporter(
shardId.getIndexName(),
IndexMode.STANDARD,
config().getMapperService(),
DocumentSizeAccumulator.EMPTY_INSTANCE
);
documentParsingReporter.onIndexingCompleted(index.parsedDoc());
@ -136,7 +136,7 @@ public class DocumentSizeObserverIT extends ESIntegTestCase {
@Override
public DocumentSizeReporter newDocumentSizeReporter(
String indexName,
IndexMode indexMode,
MapperService mapperService,
DocumentSizeAccumulator documentSizeAccumulator
) {
return new TestDocumentSizeReporter(indexName);

View file

@ -24,6 +24,7 @@ import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
@ -51,6 +52,7 @@ public final class EngineConfig {
private volatile boolean enableGcDeletes = true;
private final TimeValue flushMergesAfter;
private final String codecName;
private final MapperService mapperService;
private final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier;
private final ThreadPool threadPool;
private final Engine.Warmer warmer;
@ -163,7 +165,8 @@ public final class EngineConfig {
Comparator<LeafReader> leafSorter,
LongSupplier relativeTimeInNanosSupplier,
Engine.IndexCommitListener indexCommitListener,
boolean promotableToPrimary
boolean promotableToPrimary,
MapperService mapperService
) {
this.shardId = shardId;
this.indexSettings = indexSettings;
@ -176,6 +179,7 @@ public final class EngineConfig {
this.codecService = codecService;
this.eventListener = eventListener;
codecName = indexSettings.getValue(INDEX_CODEC_SETTING);
this.mapperService = mapperService;
// We need to make the indexing buffer for this shard at least as large
// as the amount of memory that is available for all engines on the
// local node so that decisions to flush segments to disk are made by
@ -436,4 +440,8 @@ public final class EngineConfig {
public boolean getUseCompoundFile() {
return useCompoundFile;
}
public MapperService getMapperService() {
return mapperService;
}
}

View file

@ -3491,7 +3491,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
isTimeBasedIndex ? TIMESERIES_LEAF_READERS_SORTER : null,
relativeTimeInNanosSupplier,
indexCommitListener,
routingEntry().isPromotableToPrimary()
routingEntry().isPromotableToPrimary(),
mapperService()
);
}

View file

@ -8,7 +8,7 @@
package org.elasticsearch.plugins.internal;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.mapper.MapperService;
/**
* An interface to provide instances of document parsing observer and reporter
@ -36,7 +36,7 @@ public interface DocumentParsingProvider {
*/
default DocumentSizeReporter newDocumentSizeReporter(
String indexName,
IndexMode indexMode,
MapperService mapperService,
DocumentSizeAccumulator documentSizeAccumulator
) {
return DocumentSizeReporter.EMPTY_INSTANCE;

View file

@ -3639,7 +3639,8 @@ public class InternalEngineTests extends EngineTestCase {
null,
config.getRelativeTimeInNanosSupplier(),
null,
true
true,
config.getMapperService()
);
expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig));
@ -7320,7 +7321,8 @@ public class InternalEngineTests extends EngineTestCase {
config.getLeafSorter(),
config.getRelativeTimeInNanosSupplier(),
config.getIndexCommitListener(),
config.isPromotableToPrimary()
config.isPromotableToPrimary(),
config.getMapperService()
);
try (InternalEngine engine = createEngine(configWithWarmer)) {
assertThat(warmedUpReaders, empty());

View file

@ -4820,7 +4820,8 @@ public class IndexShardTests extends IndexShardTestCase {
config.getLeafSorter(),
config.getRelativeTimeInNanosSupplier(),
config.getIndexCommitListener(),
config.isPromotableToPrimary()
config.isPromotableToPrimary(),
config.getMapperService()
);
return new InternalEngine(configWithWarmer);
});

View file

@ -156,7 +156,8 @@ public class RefreshListenersTests extends ESTestCase {
null,
System::nanoTime,
null,
true
true,
null
);
engine = new InternalEngine(config);
EngineTestCase.recoverFromTranslog(engine, (e, s) -> 0, Long.MAX_VALUE);

View file

@ -268,7 +268,8 @@ public abstract class EngineTestCase extends ESTestCase {
config.getLeafSorter(),
config.getRelativeTimeInNanosSupplier(),
config.getIndexCommitListener(),
config.isPromotableToPrimary()
config.isPromotableToPrimary(),
config.getMapperService()
);
}
@ -299,7 +300,8 @@ public abstract class EngineTestCase extends ESTestCase {
config.getLeafSorter(),
config.getRelativeTimeInNanosSupplier(),
config.getIndexCommitListener(),
config.isPromotableToPrimary()
config.isPromotableToPrimary(),
config.getMapperService()
);
}
@ -330,7 +332,8 @@ public abstract class EngineTestCase extends ESTestCase {
config.getLeafSorter(),
config.getRelativeTimeInNanosSupplier(),
config.getIndexCommitListener(),
config.isPromotableToPrimary()
config.isPromotableToPrimary(),
config.getMapperService()
);
}
@ -854,7 +857,8 @@ public abstract class EngineTestCase extends ESTestCase {
null,
this::relativeTimeInNanos,
indexCommitListener,
true
true,
null
);
}
@ -893,7 +897,8 @@ public abstract class EngineTestCase extends ESTestCase {
config.getLeafSorter(),
config.getRelativeTimeInNanosSupplier(),
config.getIndexCommitListener(),
config.isPromotableToPrimary()
config.isPromotableToPrimary(),
config.getMapperService()
);
}

View file

@ -252,7 +252,8 @@ public class FollowingEngineTests extends ESTestCase {
null,
System::nanoTime,
null,
true
true,
null
);
}