Add the ability to reload search analyzers during shard recovery (#97421)

This change adds the ability for reloadable search analyzers to adapt their loading based on
the index creation context. It is useful for reloadable search analyzers that need to load
expensive resources from indices or disk. In such cases they can defer loading the
resource until shard recovery and avoid blocking a master thread or a create-index thread.
---------

Co-authored-by: Mayya Sharipova <mayya.sharipova@elastic.co>
This commit is contained in:
Jim Ferenczi 2023-07-07 17:19:47 +01:00 committed by GitHub
parent c508f6172e
commit bccf4eeed2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
36 changed files with 225 additions and 95 deletions

View file

@@ -154,7 +154,6 @@ public class CommonAnalysisPlugin extends Plugin implements AnalysisPlugin, Scri
private final SetOnce<ScriptService> scriptServiceHolder = new SetOnce<>();
private final SetOnce<SynonymsManagementAPIService> synonymsManagementServiceHolder = new SetOnce<>();
private final SetOnce<ThreadPool> threadPoolHolder = new SetOnce<>();
@Override
public Collection<Object> createComponents(
@@ -175,7 +174,6 @@ public class CommonAnalysisPlugin extends Plugin implements AnalysisPlugin, Scri
) {
this.scriptServiceHolder.set(scriptService);
this.synonymsManagementServiceHolder.set(new SynonymsManagementAPIService(client));
this.threadPoolHolder.set(threadPool);
return Collections.emptyList();
}
@@ -341,22 +339,11 @@ public class CommonAnalysisPlugin extends Plugin implements AnalysisPlugin, Scri
filters.put("stemmer", StemmerTokenFilterFactory::new);
filters.put(
"synonym",
requiresAnalysisSettings(
(i, e, n, s) -> new SynonymTokenFilterFactory(i, e, n, s, synonymsManagementServiceHolder.get(), threadPoolHolder.get())
)
requiresAnalysisSettings((i, e, n, s) -> new SynonymTokenFilterFactory(i, e, n, s, synonymsManagementServiceHolder.get()))
);
filters.put(
"synonym_graph",
requiresAnalysisSettings(
(i, e, n, s) -> new SynonymGraphTokenFilterFactory(
i,
e,
n,
s,
synonymsManagementServiceHolder.get(),
threadPoolHolder.get()
)
)
requiresAnalysisSettings((i, e, n, s) -> new SynonymGraphTokenFilterFactory(i, e, n, s, synonymsManagementServiceHolder.get()))
);
filters.put("trim", TrimTokenFilterFactory::new);
filters.put("truncate", requiresAnalysisSettings(TruncateTokenFilterFactory::new));

View file

@@ -16,6 +16,7 @@ import org.apache.lucene.analysis.tokenattributes.PositionIncrementAttribute;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.IndexService.IndexCreationContext;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.AbstractTokenFilterFactory;
import org.elasticsearch.index.analysis.AnalysisMode;
@@ -51,6 +52,7 @@ public class MultiplexerTokenFilterFactory extends AbstractTokenFilterFactory {
@Override
public TokenFilterFactory getChainAwareTokenFilterFactory(
IndexCreationContext context,
TokenizerFactory tokenizer,
List<CharFilterFactory> charFilters,
List<TokenFilterFactory> previousTokenFilters,
@@ -66,7 +68,7 @@ public class MultiplexerTokenFilterFactory extends AbstractTokenFilterFactory {
String[] parts = Strings.tokenizeToStringArray(filter, ",");
if (parts.length == 1) {
TokenFilterFactory factory = resolveFilterFactory(allFilters, parts[0]);
factory = factory.getChainAwareTokenFilterFactory(tokenizer, charFilters, previousTokenFilters, allFilters);
factory = factory.getChainAwareTokenFilterFactory(context, tokenizer, charFilters, previousTokenFilters, allFilters);
filters.add(factory);
mode = mode.merge(factory.getAnalysisMode());
} else {
@@ -74,7 +76,7 @@ public class MultiplexerTokenFilterFactory extends AbstractTokenFilterFactory {
List<TokenFilterFactory> chain = new ArrayList<>();
for (String subfilter : parts) {
TokenFilterFactory factory = resolveFilterFactory(allFilters, subfilter);
factory = factory.getChainAwareTokenFilterFactory(tokenizer, charFilters, existingChain, allFilters);
factory = factory.getChainAwareTokenFilterFactory(context, tokenizer, charFilters, existingChain, allFilters);
chain.add(factory);
existingChain.add(factory);
mode = mode.merge(factory.getAnalysisMode());

View file

@@ -11,6 +11,7 @@ package org.elasticsearch.analysis.common;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.miscellaneous.ConditionalTokenFilter;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService.IndexCreationContext;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.AbstractTokenFilterFactory;
import org.elasticsearch.index.analysis.CharFilterFactory;
@@ -57,6 +58,7 @@ public class ScriptedConditionTokenFilterFactory extends AbstractTokenFilterFact
@Override
public TokenFilterFactory getChainAwareTokenFilterFactory(
IndexCreationContext context,
TokenizerFactory tokenizer,
List<CharFilterFactory> charFilters,
List<TokenFilterFactory> previousTokenFilters,
@@ -71,7 +73,7 @@ public class ScriptedConditionTokenFilterFactory extends AbstractTokenFilterFact
"ScriptedConditionTokenFilter [" + name() + "] refers to undefined token filter [" + filter + "]"
);
}
tff = tff.getChainAwareTokenFilterFactory(tokenizer, charFilters, existingChain, allFilters);
tff = tff.getChainAwareTokenFilterFactory(context, tokenizer, charFilters, existingChain, allFilters);
filters.add(tff);
existingChain.add(tff);
}

View file

@@ -14,13 +14,13 @@ import org.apache.lucene.analysis.synonym.SynonymGraphFilter;
import org.apache.lucene.analysis.synonym.SynonymMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.IndexService.IndexCreationContext;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.AnalysisMode;
import org.elasticsearch.index.analysis.CharFilterFactory;
import org.elasticsearch.index.analysis.TokenFilterFactory;
import org.elasticsearch.index.analysis.TokenizerFactory;
import org.elasticsearch.synonyms.SynonymsManagementAPIService;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.List;
import java.util.function.Function;
@@ -32,10 +32,9 @@ public class SynonymGraphTokenFilterFactory extends SynonymTokenFilterFactory {
Environment env,
String name,
Settings settings,
SynonymsManagementAPIService synonymsManagementAPIService,
ThreadPool threadPool
SynonymsManagementAPIService synonymsManagementAPIService
) {
super(indexSettings, env, name, settings, synonymsManagementAPIService, threadPool);
super(indexSettings, env, name, settings, synonymsManagementAPIService);
}
@Override
@@ -45,13 +44,14 @@ public class SynonymGraphTokenFilterFactory extends SynonymTokenFilterFactory {
@Override
public TokenFilterFactory getChainAwareTokenFilterFactory(
IndexCreationContext context,
TokenizerFactory tokenizer,
List<CharFilterFactory> charFilters,
List<TokenFilterFactory> previousTokenFilters,
Function<String, TokenFilterFactory> allFilters
) {
final Analyzer analyzer = buildSynonymAnalyzer(tokenizer, charFilters, previousTokenFilters);
ReaderWithOrigin rulesFromSettings = getRulesFromSettings(environment);
ReaderWithOrigin rulesFromSettings = getRulesFromSettings(environment, context);
final SynonymMap synonyms = buildSynonyms(analyzer, rulesFromSettings);
final String name = name();
return new TokenFilterFactory() {

View file

@@ -12,11 +12,11 @@ import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.synonym.SynonymFilter;
import org.apache.lucene.analysis.synonym.SynonymMap;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.IndexService.IndexCreationContext;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.AbstractTokenFilterFactory;
import org.elasticsearch.index.analysis.Analysis;
@@ -27,7 +27,6 @@ import org.elasticsearch.index.analysis.TokenFilterFactory;
import org.elasticsearch.index.analysis.TokenizerFactory;
import org.elasticsearch.synonyms.SynonymsAPI;
import org.elasticsearch.synonyms.SynonymsManagementAPIService;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Reader;
import java.io.StringReader;
@@ -45,15 +44,13 @@ public class SynonymTokenFilterFactory extends AbstractTokenFilterFactory {
protected final Environment environment;
protected final AnalysisMode analysisMode;
private final SynonymsManagementAPIService synonymsManagementAPIService;
private final ThreadPool threadPool;
SynonymTokenFilterFactory(
IndexSettings indexSettings,
Environment env,
String name,
Settings settings,
SynonymsManagementAPIService synonymsManagementAPIService,
ThreadPool threadPool
SynonymsManagementAPIService synonymsManagementAPIService
) {
super(name, settings);
this.settings = settings;
@@ -73,7 +70,6 @@ public class SynonymTokenFilterFactory extends AbstractTokenFilterFactory {
this.analysisMode = updateable ? AnalysisMode.SEARCH_TIME : AnalysisMode.ALL;
this.environment = env;
this.synonymsManagementAPIService = synonymsManagementAPIService;
this.threadPool = threadPool;
}
@Override
@@ -88,13 +84,14 @@ public class SynonymTokenFilterFactory extends AbstractTokenFilterFactory {
@Override
public TokenFilterFactory getChainAwareTokenFilterFactory(
IndexCreationContext context,
TokenizerFactory tokenizer,
List<CharFilterFactory> charFilters,
List<TokenFilterFactory> previousTokenFilters,
Function<String, TokenFilterFactory> allFilters
) {
final Analyzer analyzer = buildSynonymAnalyzer(tokenizer, charFilters, previousTokenFilters);
ReaderWithOrigin rulesFromSettings = getRulesFromSettings(environment);
ReaderWithOrigin rulesFromSettings = getRulesFromSettings(environment, context);
final SynonymMap synonyms = buildSynonyms(analyzer, rulesFromSettings);
final String name = name();
return new TokenFilterFactory() {
@@ -156,7 +153,7 @@ public class SynonymTokenFilterFactory extends AbstractTokenFilterFactory {
}
}
protected ReaderWithOrigin getRulesFromSettings(Environment env) {
protected ReaderWithOrigin getRulesFromSettings(Environment env, IndexCreationContext context) {
if (settings.getAsList("synonyms", null) != null) {
List<String> rulesList = Analysis.getWordList(env, settings, "synonyms");
StringBuilder sb = new StringBuilder();
@@ -171,15 +168,17 @@
);
}
String synonymsSet = settings.get("synonyms_set", null);
// provide fake synonyms on master thread, as on Master an analyzer is built for validation only
if (MasterService.isMasterUpdateThread()) {
// provide fake synonyms on index creation and index metadata checks to ensure that we
// don't block a master thread
if (context != IndexCreationContext.RELOAD_ANALYZERS) {
return new ReaderWithOrigin(
new StringReader("fake rule => fake"),
"fake [" + synonymsSet + "] synonyms_set in .synonyms index"
"fake [" + synonymsSet + "] synonyms_set in .synonyms index",
synonymsSet
);
}
return new ReaderWithOrigin(
Analysis.getReaderFromIndex(synonymsSet, threadPool, synonymsManagementAPIService),
Analysis.getReaderFromIndex(synonymsSet, synonymsManagementAPIService),
"[" + synonymsSet + "] synonyms_set in .synonyms index",
synonymsSet
);

View file

@@ -16,6 +16,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.index.IndexService.IndexCreationContext;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.IndexAnalyzers;
import org.elasticsearch.index.analysis.MyFilterTokenFilterFactory;
@@ -64,7 +65,7 @@ public class CompoundAnalysisTests extends ESTestCase {
private List<String> analyze(Settings settings, String analyzerName, String text) throws IOException {
IndexSettings idxSettings = IndexSettingsModule.newIndexSettings("test", settings);
AnalysisModule analysisModule = createAnalysisModule(settings);
IndexAnalyzers indexAnalyzers = analysisModule.getAnalysisRegistry().build(idxSettings);
IndexAnalyzers indexAnalyzers = analysisModule.getAnalysisRegistry().build(IndexCreationContext.CREATE_INDEX, idxSettings);
Analyzer analyzer = indexAnalyzers.get(analyzerName).analyzer();
TokenStream stream = analyzer.tokenStream("", text);

View file

@@ -15,6 +15,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService.IndexCreationContext;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.IndexAnalyzers;
import org.elasticsearch.index.analysis.NamedAnalyzer;
@@ -41,7 +42,7 @@ public class EdgeNGramTokenizerTests extends ESTokenStreamTestCase {
TestEnvironment.newEnvironment(settings),
Collections.singletonList(new CommonAnalysisPlugin()),
new StablePluginsRegistry()
).getAnalysisRegistry().build(idxSettings);
).getAnalysisRegistry().build(IndexCreationContext.CREATE_INDEX, idxSettings);
}
public void testPreConfiguredTokenizer() throws IOException {

View file

@@ -13,6 +13,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.index.IndexService.IndexCreationContext;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.IndexAnalyzers;
import org.elasticsearch.index.analysis.NamedAnalyzer;
@@ -44,7 +45,7 @@ public class MultiplexerTokenFilterTests extends ESTokenStreamTestCase {
TestEnvironment.newEnvironment(settings),
Collections.singletonList(new CommonAnalysisPlugin()),
new StablePluginsRegistry()
).getAnalysisRegistry().build(idxSettings);
).getAnalysisRegistry().build(IndexCreationContext.CREATE_INDEX, idxSettings);
try (NamedAnalyzer analyzer = indexAnalyzers.get("myAnalyzer")) {
assertNotNull(analyzer);
@@ -79,7 +80,7 @@ public class MultiplexerTokenFilterTests extends ESTokenStreamTestCase {
TestEnvironment.newEnvironment(settings),
Collections.singletonList(new CommonAnalysisPlugin()),
new StablePluginsRegistry()
).getAnalysisRegistry().build(idxSettings);
).getAnalysisRegistry().build(IndexCreationContext.CREATE_INDEX, idxSettings);
try (NamedAnalyzer analyzer = indexAnalyzers.get("myAnalyzer")) {
assertNotNull(analyzer);

View file

@@ -19,6 +19,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.index.IndexService.IndexCreationContext;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.IndexAnalyzers;
import org.elasticsearch.index.analysis.NamedAnalyzer;
@@ -74,7 +75,7 @@ public class PredicateTokenScriptFilterTests extends ESTokenStreamTestCase {
new StablePluginsRegistry()
);
IndexAnalyzers analyzers = module.getAnalysisRegistry().build(idxSettings);
IndexAnalyzers analyzers = module.getAnalysisRegistry().build(IndexCreationContext.CREATE_INDEX, idxSettings);
try (NamedAnalyzer analyzer = analyzers.get("myAnalyzer")) {
assertNotNull(analyzer);

View file

@@ -19,6 +19,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.index.IndexService.IndexCreationContext;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.IndexAnalyzers;
import org.elasticsearch.index.analysis.NamedAnalyzer;
@@ -74,7 +75,7 @@ public class ScriptedConditionTokenFilterTests extends ESTokenStreamTestCase {
new StablePluginsRegistry()
);
IndexAnalyzers analyzers = module.getAnalysisRegistry().build(idxSettings);
IndexAnalyzers analyzers = module.getAnalysisRegistry().build(IndexCreationContext.CREATE_INDEX, idxSettings);
try (NamedAnalyzer analyzer = analyzers.get("myAnalyzer")) {
assertNotNull(analyzer);

View file

@@ -268,7 +268,7 @@ public class SynonymsAnalysisTests extends ESTestCase {
for (String factory : bypassingFactories) {
TokenFilterFactory tff = plugin.getTokenFilters().get(factory).get(idxSettings, null, factory, settings);
TokenizerFactory tok = new KeywordTokenizerFactory(idxSettings, null, "keyword", settings);
SynonymTokenFilterFactory stff = new SynonymTokenFilterFactory(idxSettings, null, "synonym", settings, null, null);
SynonymTokenFilterFactory stff = new SynonymTokenFilterFactory(idxSettings, null, "synonym", settings, null);
Analyzer analyzer = SynonymTokenFilterFactory.buildSynonymAnalyzer(
tok,
Collections.emptyList(),
@@ -338,7 +338,7 @@ public class SynonymsAnalysisTests extends ESTestCase {
for (String factory : disallowedFactories) {
TokenFilterFactory tff = plugin.getTokenFilters().get(factory).get(idxSettings, null, factory, settings);
TokenizerFactory tok = new KeywordTokenizerFactory(idxSettings, null, "keyword", settings);
SynonymTokenFilterFactory stff = new SynonymTokenFilterFactory(idxSettings, null, "synonym", settings, null, null);
SynonymTokenFilterFactory stff = new SynonymTokenFilterFactory(idxSettings, null, "synonym", settings, null);
IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,

View file

@@ -14,6 +14,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.index.IndexService.IndexCreationContext;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.AnalysisTestsHelper;
import org.elasticsearch.index.analysis.IndexAnalyzers;
@@ -196,7 +197,7 @@ public class WordDelimiterGraphTokenFilterFactoryTests extends BaseWordDelimiter
TestEnvironment.newEnvironment(settings),
Collections.singletonList(new CommonAnalysisPlugin()),
new StablePluginsRegistry()
).getAnalysisRegistry().build(idxSettings)
).getAnalysisRegistry().build(IndexCreationContext.CREATE_INDEX, idxSettings)
) {
NamedAnalyzer analyzer = indexAnalyzers.get("my_analyzer");
@@ -221,7 +222,7 @@ public class WordDelimiterGraphTokenFilterFactoryTests extends BaseWordDelimiter
TestEnvironment.newEnvironment(settings),
Collections.singletonList(new CommonAnalysisPlugin()),
new StablePluginsRegistry()
).getAnalysisRegistry().build(idxSettings)
).getAnalysisRegistry().build(IndexCreationContext.CREATE_INDEX, idxSettings)
) {
NamedAnalyzer analyzer = indexAnalyzers.get("my_analyzer");

View file

@@ -141,6 +141,25 @@ setup:
query: bye
- match: { hits.total.value: 1 }
- do:
indices.close:
index: my_index
- do:
indices.open:
index: my_index
# Confirm that the index analyzers are reloaded
- do:
search:
index: my_index
body:
query:
match:
my_field:
query: hola
- match: { hits.total.value: 1 }
---
"Delete the synonyms set and confirm failed reload analyzers details":
- do: