mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-04-24 23:27:25 -04:00
* Fix shadowed vars pt4 (#80842) Part of #19752. Fix more instances where local variable names were shadowing field names. * Fix shadowed vars pt5 (#80855) Part of #19752. Fix more instances where local variable names were shadowing field names. * Formatting Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
parent
a2c1481ab8
commit
0a7392a1e8
43 changed files with 184 additions and 211 deletions
|
@ -365,7 +365,8 @@ public class HiddenFieldCheck extends AbstractCheck {
|
||||||
// we should not capitalize the first character if the second
|
// we should not capitalize the first character if the second
|
||||||
// one is a capital one, since according to JavaBeans spec
|
// one is a capital one, since according to JavaBeans spec
|
||||||
// setXYzz() is a setter for XYzz property, not for xYzz one.
|
// setXYzz() is a setter for XYzz property, not for xYzz one.
|
||||||
if (name.length() == 1 || Character.isUpperCase(name.charAt(1)) == false) {
|
// @pugnascotia: unless the first char is 'x'.
|
||||||
|
if (name.length() == 1 || (Character.isUpperCase(name.charAt(1)) == false || name.charAt(0) == 'x')) {
|
||||||
setterName = name.substring(0, 1).toUpperCase(Locale.ENGLISH) + name.substring(1);
|
setterName = name.substring(0, 1).toUpperCase(Locale.ENGLISH) + name.substring(1);
|
||||||
}
|
}
|
||||||
return setterName;
|
return setterName;
|
||||||
|
|
|
@ -132,9 +132,9 @@ public class ContextApiSpecGenerator {
|
||||||
return new FileInputStream(classPath.toFile());
|
return new FileInputStream(classPath.toFile());
|
||||||
} else {
|
} else {
|
||||||
String packageName = className.substring(0, className.lastIndexOf("."));
|
String packageName = className.substring(0, className.lastIndexOf("."));
|
||||||
Path root = pkgRoots.get(packageName);
|
Path packageRoot = pkgRoots.get(packageName);
|
||||||
if (root != null) {
|
if (packageRoot != null) {
|
||||||
Path classPath = root.resolve(className.substring(className.lastIndexOf(".") + 1) + ".java");
|
Path classPath = packageRoot.resolve(className.substring(className.lastIndexOf(".") + 1) + ".java");
|
||||||
return new FileInputStream(classPath.toFile());
|
return new FileInputStream(classPath.toFile());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -209,16 +209,16 @@ public class ContextGeneratorCommon {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T> Set<T> getCommon(List<PainlessContextInfo> contexts, Function<PainlessContextInfo, List<T>> getter) {
|
private <T> Set<T> getCommon(List<PainlessContextInfo> painlessContexts, Function<PainlessContextInfo, List<T>> getter) {
|
||||||
Map<T, Integer> infoCounts = new HashMap<>();
|
Map<T, Integer> infoCounts = new HashMap<>();
|
||||||
for (PainlessContextInfo contextInfo : contexts) {
|
for (PainlessContextInfo contextInfo : painlessContexts) {
|
||||||
for (T info : getter.apply(contextInfo)) {
|
for (T info : getter.apply(contextInfo)) {
|
||||||
infoCounts.merge(info, 1, Integer::sum);
|
infoCounts.merge(info, 1, Integer::sum);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return infoCounts.entrySet()
|
return infoCounts.entrySet()
|
||||||
.stream()
|
.stream()
|
||||||
.filter(e -> e.getValue() == contexts.size())
|
.filter(e -> e.getValue() == painlessContexts.size())
|
||||||
.map(Map.Entry::getKey)
|
.map(Map.Entry::getKey)
|
||||||
.collect(Collectors.toSet());
|
.collect(Collectors.toSet());
|
||||||
}
|
}
|
||||||
|
|
|
@ -167,12 +167,12 @@ final class Compiler {
|
||||||
Compiler(Class<?> scriptClass, Class<?> factoryClass, Class<?> statefulFactoryClass, PainlessLookup painlessLookup) {
|
Compiler(Class<?> scriptClass, Class<?> factoryClass, Class<?> statefulFactoryClass, PainlessLookup painlessLookup) {
|
||||||
this.scriptClass = scriptClass;
|
this.scriptClass = scriptClass;
|
||||||
this.painlessLookup = painlessLookup;
|
this.painlessLookup = painlessLookup;
|
||||||
Map<String, Class<?>> additionalClasses = new HashMap<>();
|
Map<String, Class<?>> additionalClassMap = new HashMap<>();
|
||||||
additionalClasses.put(scriptClass.getName(), scriptClass);
|
additionalClassMap.put(scriptClass.getName(), scriptClass);
|
||||||
addFactoryMethod(additionalClasses, factoryClass, "newInstance");
|
addFactoryMethod(additionalClassMap, factoryClass, "newInstance");
|
||||||
addFactoryMethod(additionalClasses, statefulFactoryClass, "newFactory");
|
addFactoryMethod(additionalClassMap, statefulFactoryClass, "newFactory");
|
||||||
addFactoryMethod(additionalClasses, statefulFactoryClass, "newInstance");
|
addFactoryMethod(additionalClassMap, statefulFactoryClass, "newInstance");
|
||||||
this.additionalClasses = Collections.unmodifiableMap(additionalClasses);
|
this.additionalClasses = Collections.unmodifiableMap(additionalClassMap);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void addFactoryMethod(Map<String, Class<?>> additionalClasses, Class<?> factoryClass, String methodName) {
|
private static void addFactoryMethod(Map<String, Class<?>> additionalClasses, Class<?> factoryClass, String methodName) {
|
||||||
|
|
|
@ -171,14 +171,14 @@ public final class CompilerSettings {
|
||||||
* annotation.
|
* annotation.
|
||||||
*/
|
*/
|
||||||
public Map<String, Object> asMap() {
|
public Map<String, Object> asMap() {
|
||||||
int regexLimitFactor = this.regexLimitFactor;
|
int regexLimitFactorToApply = this.regexLimitFactor;
|
||||||
if (regexesEnabled == RegexEnabled.TRUE) {
|
if (regexesEnabled == RegexEnabled.TRUE) {
|
||||||
regexLimitFactor = Augmentation.UNLIMITED_PATTERN_FACTOR;
|
regexLimitFactorToApply = Augmentation.UNLIMITED_PATTERN_FACTOR;
|
||||||
} else if (regexesEnabled == RegexEnabled.FALSE) {
|
} else if (regexesEnabled == RegexEnabled.FALSE) {
|
||||||
regexLimitFactor = Augmentation.DISABLED_PATTERN_FACTOR;
|
regexLimitFactorToApply = Augmentation.DISABLED_PATTERN_FACTOR;
|
||||||
}
|
}
|
||||||
Map<String, Object> map = new HashMap<>();
|
Map<String, Object> map = new HashMap<>();
|
||||||
map.put("regex_limit_factor", regexLimitFactor);
|
map.put("regex_limit_factor", regexLimitFactorToApply);
|
||||||
|
|
||||||
// for testing only
|
// for testing only
|
||||||
map.put("testInject0", testInject0);
|
map.put("testInject0", testInject0);
|
||||||
|
|
|
@ -155,14 +155,14 @@ public final class DefBootstrap {
|
||||||
/**
|
/**
|
||||||
* Does a slow lookup against the whitelist.
|
* Does a slow lookup against the whitelist.
|
||||||
*/
|
*/
|
||||||
private MethodHandle lookup(int flavor, String name, Class<?> receiver) throws Throwable {
|
private MethodHandle lookup(int flavorValue, String nameValue, Class<?> receiver) throws Throwable {
|
||||||
switch (flavor) {
|
switch (flavorValue) {
|
||||||
case METHOD_CALL:
|
case METHOD_CALL:
|
||||||
return Def.lookupMethod(painlessLookup, functions, constants, methodHandlesLookup, type(), receiver, name, args);
|
return Def.lookupMethod(painlessLookup, functions, constants, methodHandlesLookup, type(), receiver, nameValue, args);
|
||||||
case LOAD:
|
case LOAD:
|
||||||
return Def.lookupGetter(painlessLookup, receiver, name);
|
return Def.lookupGetter(painlessLookup, receiver, nameValue);
|
||||||
case STORE:
|
case STORE:
|
||||||
return Def.lookupSetter(painlessLookup, receiver, name);
|
return Def.lookupSetter(painlessLookup, receiver, nameValue);
|
||||||
case ARRAY_LOAD:
|
case ARRAY_LOAD:
|
||||||
return Def.lookupArrayLoad(receiver);
|
return Def.lookupArrayLoad(receiver);
|
||||||
case ARRAY_STORE:
|
case ARRAY_STORE:
|
||||||
|
@ -170,7 +170,15 @@ public final class DefBootstrap {
|
||||||
case ITERATOR:
|
case ITERATOR:
|
||||||
return Def.lookupIterator(receiver);
|
return Def.lookupIterator(receiver);
|
||||||
case REFERENCE:
|
case REFERENCE:
|
||||||
return Def.lookupReference(painlessLookup, functions, constants, methodHandlesLookup, (String) args[0], receiver, name);
|
return Def.lookupReference(
|
||||||
|
painlessLookup,
|
||||||
|
functions,
|
||||||
|
constants,
|
||||||
|
methodHandlesLookup,
|
||||||
|
(String) args[0],
|
||||||
|
receiver,
|
||||||
|
nameValue
|
||||||
|
);
|
||||||
case INDEX_NORMALIZE:
|
case INDEX_NORMALIZE:
|
||||||
return Def.lookupIndexNormalize(receiver);
|
return Def.lookupIndexNormalize(receiver);
|
||||||
default:
|
default:
|
||||||
|
|
|
@ -81,21 +81,21 @@ public final class PainlessScriptEngine implements ScriptEngine {
|
||||||
defaultCompilerSettings.setRegexesEnabled(CompilerSettings.REGEX_ENABLED.get(settings));
|
defaultCompilerSettings.setRegexesEnabled(CompilerSettings.REGEX_ENABLED.get(settings));
|
||||||
defaultCompilerSettings.setRegexLimitFactor(CompilerSettings.REGEX_LIMIT_FACTOR.get(settings));
|
defaultCompilerSettings.setRegexLimitFactor(CompilerSettings.REGEX_LIMIT_FACTOR.get(settings));
|
||||||
|
|
||||||
Map<ScriptContext<?>, Compiler> contextsToCompilers = new HashMap<>();
|
Map<ScriptContext<?>, Compiler> mutableContextsToCompilers = new HashMap<>();
|
||||||
Map<ScriptContext<?>, PainlessLookup> contextsToLookups = new HashMap<>();
|
Map<ScriptContext<?>, PainlessLookup> mutableContextsToLookups = new HashMap<>();
|
||||||
|
|
||||||
for (Map.Entry<ScriptContext<?>, List<Whitelist>> entry : contexts.entrySet()) {
|
for (Map.Entry<ScriptContext<?>, List<Whitelist>> entry : contexts.entrySet()) {
|
||||||
ScriptContext<?> context = entry.getKey();
|
ScriptContext<?> context = entry.getKey();
|
||||||
PainlessLookup lookup = PainlessLookupBuilder.buildFromWhitelists(entry.getValue());
|
PainlessLookup lookup = PainlessLookupBuilder.buildFromWhitelists(entry.getValue());
|
||||||
contextsToCompilers.put(
|
mutableContextsToCompilers.put(
|
||||||
context,
|
context,
|
||||||
new Compiler(context.instanceClazz, context.factoryClazz, context.statefulFactoryClazz, lookup)
|
new Compiler(context.instanceClazz, context.factoryClazz, context.statefulFactoryClazz, lookup)
|
||||||
);
|
);
|
||||||
contextsToLookups.put(context, lookup);
|
mutableContextsToLookups.put(context, lookup);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.contextsToCompilers = Collections.unmodifiableMap(contextsToCompilers);
|
this.contextsToCompilers = Collections.unmodifiableMap(mutableContextsToCompilers);
|
||||||
this.contextsToLookups = Collections.unmodifiableMap(contextsToLookups);
|
this.contextsToLookups = Collections.unmodifiableMap(mutableContextsToLookups);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Map<ScriptContext<?>, PainlessLookup> getContextsToLookups() {
|
public Map<ScriptContext<?>, PainlessLookup> getContextsToLookups() {
|
||||||
|
|
|
@ -39,6 +39,7 @@ public class ScriptClassInfo {
|
||||||
public final List<FunctionTable.LocalFunction> converters;
|
public final List<FunctionTable.LocalFunction> converters;
|
||||||
public final FunctionTable.LocalFunction defConverter;
|
public final FunctionTable.LocalFunction defConverter;
|
||||||
|
|
||||||
|
@SuppressWarnings("HiddenField")
|
||||||
public ScriptClassInfo(PainlessLookup painlessLookup, Class<?> baseClass) {
|
public ScriptClassInfo(PainlessLookup painlessLookup, Class<?> baseClass) {
|
||||||
this.baseClass = baseClass;
|
this.baseClass = baseClass;
|
||||||
|
|
||||||
|
|
|
@ -196,9 +196,9 @@ public class PainlessExecuteAction extends ActionType<PainlessExecuteAction.Resp
|
||||||
ContextSetup(StreamInput in) throws IOException {
|
ContextSetup(StreamInput in) throws IOException {
|
||||||
index = in.readOptionalString();
|
index = in.readOptionalString();
|
||||||
document = in.readOptionalBytesReference();
|
document = in.readOptionalBytesReference();
|
||||||
String xContentType = in.readOptionalString();
|
String optionalXContentType = in.readOptionalString();
|
||||||
if (xContentType != null) {
|
if (optionalXContentType != null) {
|
||||||
this.xContentType = XContentType.fromMediaType(xContentType);
|
this.xContentType = XContentType.fromMediaType(optionalXContentType);
|
||||||
}
|
}
|
||||||
query = in.readOptionalNamedWriteable(QueryBuilder.class);
|
query = in.readOptionalNamedWriteable(QueryBuilder.class);
|
||||||
}
|
}
|
||||||
|
|
|
@ -179,8 +179,8 @@ public final class Walker extends PainlessParserBaseVisitor<ANode> {
|
||||||
return identifier++;
|
return identifier++;
|
||||||
}
|
}
|
||||||
|
|
||||||
private SourceContext buildAntlrTree(String source) {
|
private SourceContext buildAntlrTree(String sourceString) {
|
||||||
ANTLRInputStream stream = new ANTLRInputStream(source);
|
ANTLRInputStream stream = new ANTLRInputStream(sourceString);
|
||||||
PainlessLexer lexer = new EnhancedPainlessLexer(stream, sourceName);
|
PainlessLexer lexer = new EnhancedPainlessLexer(stream, sourceName);
|
||||||
PainlessParser parser = new PainlessParser(new CommonTokenStream(lexer));
|
PainlessParser parser = new PainlessParser(new CommonTokenStream(lexer));
|
||||||
ParserErrorStrategy strategy = new ParserErrorStrategy(sourceName);
|
ParserErrorStrategy strategy = new ParserErrorStrategy(sourceName);
|
||||||
|
|
|
@ -18,7 +18,7 @@ public class ForLoopNode extends ConditionNode {
|
||||||
private IRNode initializerNode;
|
private IRNode initializerNode;
|
||||||
private ExpressionNode afterthoughtNode;
|
private ExpressionNode afterthoughtNode;
|
||||||
|
|
||||||
public void setInitialzerNode(IRNode initializerNode) {
|
public void setInitializerNode(IRNode initializerNode) {
|
||||||
this.initializerNode = initializerNode;
|
this.initializerNode = initializerNode;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -116,7 +116,7 @@ public class DefaultConstantFoldingOptimizationPhase extends IRTreeBaseVisitor<C
|
||||||
@Override
|
@Override
|
||||||
public void visitForLoop(ForLoopNode irForLoopNode, Consumer<ExpressionNode> scope) {
|
public void visitForLoop(ForLoopNode irForLoopNode, Consumer<ExpressionNode> scope) {
|
||||||
if (irForLoopNode.getInitializerNode() != null) {
|
if (irForLoopNode.getInitializerNode() != null) {
|
||||||
irForLoopNode.getInitializerNode().visit(this, irForLoopNode::setInitialzerNode);
|
irForLoopNode.getInitializerNode().visit(this, irForLoopNode::setInitializerNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (irForLoopNode.getConditionNode() != null) {
|
if (irForLoopNode.getConditionNode() != null) {
|
||||||
|
|
|
@ -714,7 +714,7 @@ public class DefaultUserTreeToIRTreePhase implements UserTreeVisitor<ScriptScope
|
||||||
@Override
|
@Override
|
||||||
public void visitFor(SFor userForNode, ScriptScope scriptScope) {
|
public void visitFor(SFor userForNode, ScriptScope scriptScope) {
|
||||||
ForLoopNode irForLoopNode = new ForLoopNode(userForNode.getLocation());
|
ForLoopNode irForLoopNode = new ForLoopNode(userForNode.getLocation());
|
||||||
irForLoopNode.setInitialzerNode(visit(userForNode.getInitializerNode(), scriptScope));
|
irForLoopNode.setInitializerNode(visit(userForNode.getInitializerNode(), scriptScope));
|
||||||
irForLoopNode.setConditionNode(injectCast(userForNode.getConditionNode(), scriptScope));
|
irForLoopNode.setConditionNode(injectCast(userForNode.getConditionNode(), scriptScope));
|
||||||
irForLoopNode.setAfterthoughtNode((ExpressionNode) visit(userForNode.getAfterthoughtNode(), scriptScope));
|
irForLoopNode.setAfterthoughtNode((ExpressionNode) visit(userForNode.getAfterthoughtNode(), scriptScope));
|
||||||
irForLoopNode.setBlockNode((BlockNode) visit(userForNode.getBlockNode(), scriptScope));
|
irForLoopNode.setBlockNode((BlockNode) visit(userForNode.getBlockNode(), scriptScope));
|
||||||
|
|
|
@ -143,21 +143,23 @@ public class WriteScope {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Creates a class scope with the script scope as a parent. */
|
/** Creates a class scope with the script scope as a parent. */
|
||||||
public WriteScope newClassScope(ClassWriter classWriter) {
|
public WriteScope newClassScope(ClassWriter writer) {
|
||||||
return new WriteScope(this, classWriter);
|
return new WriteScope(this, writer);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Creates a method scope with the class scope as a parent and parameters from the method signature. */
|
/** Creates a method scope with the class scope as a parent and parameters from the method signature. */
|
||||||
public WriteScope newMethodScope(MethodWriter methodWriter) {
|
public WriteScope newMethodScope(MethodWriter writer) {
|
||||||
return new WriteScope(this, methodWriter);
|
return new WriteScope(this, writer);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Creates a loop scope with labels for where continue and break instructions should jump to. */
|
/** Creates a loop scope with labels for where continue and break instructions should jump to. */
|
||||||
|
@SuppressWarnings("HiddenField")
|
||||||
public WriteScope newLoopScope(Label continueLabel, Label breakLabel) {
|
public WriteScope newLoopScope(Label continueLabel, Label breakLabel) {
|
||||||
return new WriteScope(this, continueLabel, breakLabel);
|
return new WriteScope(this, continueLabel, breakLabel);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Creates a try scope with labels for where and exception should jump to. */
|
/** Creates a try scope with labels for where and exception should jump to. */
|
||||||
|
@SuppressWarnings("HiddenField")
|
||||||
public WriteScope newTryScope(Label tryBeginLabel, Label tryEndLabel, Label catchesEndLabel) {
|
public WriteScope newTryScope(Label tryBeginLabel, Label tryEndLabel, Label catchesEndLabel) {
|
||||||
return new WriteScope(this, tryBeginLabel, tryEndLabel, catchesEndLabel);
|
return new WriteScope(this, tryBeginLabel, tryEndLabel, catchesEndLabel);
|
||||||
}
|
}
|
||||||
|
|
|
@ -82,8 +82,8 @@ public class BindingsTests extends ScriptTestCase {
|
||||||
this.value = value;
|
this.value = value;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setInstanceBindingValue(int value) {
|
public void setInstanceBindingValue(int instanceBindingValue) {
|
||||||
this.value = value;
|
this.value = instanceBindingValue;
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getInstanceBindingValue() {
|
public int getInstanceBindingValue() {
|
||||||
|
|
|
@ -105,8 +105,8 @@ public class FeatureTestObject {
|
||||||
return this.x * fn.apply(arg) * (inject1 + inject2 + inject3);
|
return this.x * fn.apply(arg) * (inject1 + inject2 + inject3);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Double mixedAdd(int i, Byte b, char c, Float f) {
|
public Double mixedAdd(int someInt, Byte b, char c, Float f) {
|
||||||
return (double) (i + b + c + f);
|
return (double) (someInt + b + c + f);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** method taking two functions! */
|
/** method taking two functions! */
|
||||||
|
|
|
@ -310,13 +310,13 @@ public class RegexLimitTests extends ScriptTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testSnippetRegex() {
|
public void testSnippetRegex() {
|
||||||
String charSequence = String.join("", Collections.nCopies(100, "abcdef123456"));
|
String longCharSequence = String.join("", Collections.nCopies(100, "abcdef123456"));
|
||||||
String script = "if ('" + charSequence + "' ==~ " + pattern + ") { return 100; } return 200";
|
String script = "if ('" + longCharSequence + "' ==~ " + pattern + ") { return 100; } return 200";
|
||||||
|
|
||||||
setRegexLimitFactor(1);
|
setRegexLimitFactor(1);
|
||||||
CircuitBreakingException cbe = expectScriptThrows(CircuitBreakingException.class, () -> exec(script));
|
CircuitBreakingException cbe = expectScriptThrows(CircuitBreakingException.class, () -> exec(script));
|
||||||
assertTrue(cbe.getMessage().contains(regexCircuitMessage));
|
assertTrue(cbe.getMessage().contains(regexCircuitMessage));
|
||||||
assertTrue(cbe.getMessage().contains(charSequence.subSequence(0, 61) + "..."));
|
assertTrue(cbe.getMessage().contains(longCharSequence.subSequence(0, 61) + "..."));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setRegexLimitFactor(int factor) {
|
private void setRegexLimitFactor(int factor) {
|
||||||
|
|
|
@ -105,11 +105,11 @@ public class KuromojiTokenizerFactory extends AbstractTokenizerFactory {
|
||||||
@Override
|
@Override
|
||||||
public Tokenizer create() {
|
public Tokenizer create() {
|
||||||
JapaneseTokenizer t = new JapaneseTokenizer(userDictionary, discardPunctuation, discardCompoundToken, mode);
|
JapaneseTokenizer t = new JapaneseTokenizer(userDictionary, discardPunctuation, discardCompoundToken, mode);
|
||||||
int nBestCost = this.nBestCost;
|
int nBestCostValue = this.nBestCost;
|
||||||
if (nBestExamples != null) {
|
if (nBestExamples != null) {
|
||||||
nBestCost = Math.max(nBestCost, t.calcNBestCost(nBestExamples));
|
nBestCostValue = Math.max(nBestCostValue, t.calcNBestCost(nBestExamples));
|
||||||
}
|
}
|
||||||
t.setNBestCost(nBestCost);
|
t.setNBestCost(nBestCostValue);
|
||||||
return t;
|
return t;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -147,22 +147,22 @@ public abstract class AbstractAzureComputeServiceTestCase extends ESIntegTestCas
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
protected AzureSeedHostsProvider createSeedHostsProvider(
|
protected AzureSeedHostsProvider createSeedHostsProvider(
|
||||||
final Settings settings,
|
final Settings settingsToUse,
|
||||||
final AzureComputeService azureComputeService,
|
final AzureComputeService azureComputeService,
|
||||||
final TransportService transportService,
|
final TransportService transportService,
|
||||||
final NetworkService networkService
|
final NetworkService networkService
|
||||||
) {
|
) {
|
||||||
return new AzureSeedHostsProvider(settings, azureComputeService, transportService, networkService) {
|
return new AzureSeedHostsProvider(settingsToUse, azureComputeService, transportService, networkService) {
|
||||||
@Override
|
@Override
|
||||||
protected String resolveInstanceAddress(final HostType hostType, final RoleInstance instance) {
|
protected String resolveInstanceAddress(final HostType hostTypeValue, final RoleInstance instance) {
|
||||||
if (hostType == HostType.PRIVATE_IP) {
|
if (hostTypeValue == HostType.PRIVATE_IP) {
|
||||||
DiscoveryNode discoveryNode = nodes.get(instance.getInstanceName());
|
DiscoveryNode discoveryNode = nodes.get(instance.getInstanceName());
|
||||||
if (discoveryNode != null) {
|
if (discoveryNode != null) {
|
||||||
// Format the InetSocketAddress to a format that contains the port number
|
// Format the InetSocketAddress to a format that contains the port number
|
||||||
return NetworkAddress.format(discoveryNode.getAddress().address());
|
return NetworkAddress.format(discoveryNode.getAddress().address());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return super.resolveInstanceAddress(hostType, instance);
|
return super.resolveInstanceAddress(hostTypeValue, instance);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
|
@ -220,15 +220,15 @@ public class AzureSeedHostsProvider implements SeedHostsProvider {
|
||||||
return dynamicHosts;
|
return dynamicHosts;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected String resolveInstanceAddress(final HostType hostType, final RoleInstance instance) {
|
protected String resolveInstanceAddress(final HostType hostTypeValue, final RoleInstance instance) {
|
||||||
if (hostType == HostType.PRIVATE_IP) {
|
if (hostTypeValue == HostType.PRIVATE_IP) {
|
||||||
final InetAddress privateIp = instance.getIPAddress();
|
final InetAddress privateIp = instance.getIPAddress();
|
||||||
if (privateIp != null) {
|
if (privateIp != null) {
|
||||||
return InetAddresses.toUriString(privateIp);
|
return InetAddresses.toUriString(privateIp);
|
||||||
} else {
|
} else {
|
||||||
logger.trace("no private ip provided. ignoring [{}]...", instance.getInstanceName());
|
logger.trace("no private ip provided. ignoring [{}]...", instance.getInstanceName());
|
||||||
}
|
}
|
||||||
} else if (hostType == HostType.PUBLIC_IP) {
|
} else if (hostTypeValue == HostType.PUBLIC_IP) {
|
||||||
for (InstanceEndpoint endpoint : instance.getInstanceEndpoints()) {
|
for (InstanceEndpoint endpoint : instance.getInstanceEndpoints()) {
|
||||||
if (publicEndpointName.equals(endpoint.getName())) {
|
if (publicEndpointName.equals(endpoint.getName())) {
|
||||||
return NetworkAddress.format(new InetSocketAddress(endpoint.getVirtualIPAddress(), endpoint.getPort()));
|
return NetworkAddress.format(new InetSocketAddress(endpoint.getVirtualIPAddress(), endpoint.getPort()));
|
||||||
|
|
|
@ -57,12 +57,12 @@ public class AzureDiscoveryPlugin extends Plugin implements DiscoveryPlugin {
|
||||||
|
|
||||||
// Used for testing
|
// Used for testing
|
||||||
protected AzureSeedHostsProvider createSeedHostsProvider(
|
protected AzureSeedHostsProvider createSeedHostsProvider(
|
||||||
final Settings settings,
|
final Settings settingsToUse,
|
||||||
final AzureComputeService azureComputeService,
|
final AzureComputeService azureComputeService,
|
||||||
final TransportService transportService,
|
final TransportService transportService,
|
||||||
final NetworkService networkService
|
final NetworkService networkService
|
||||||
) {
|
) {
|
||||||
return new AzureSeedHostsProvider(settings, azureComputeService, transportService, networkService);
|
return new AzureSeedHostsProvider(settingsToUse, azureComputeService, transportService, networkService);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -96,7 +96,7 @@ class AwsEc2SeedHostsProvider implements SeedHostsProvider {
|
||||||
|
|
||||||
protected List<TransportAddress> fetchDynamicNodes() {
|
protected List<TransportAddress> fetchDynamicNodes() {
|
||||||
|
|
||||||
final List<TransportAddress> dynamicHosts = new ArrayList<>();
|
final List<TransportAddress> dynamicHostAddresses = new ArrayList<>();
|
||||||
|
|
||||||
final DescribeInstancesResult descInstances;
|
final DescribeInstancesResult descInstances;
|
||||||
try (AmazonEc2Reference clientReference = awsEc2Service.client()) {
|
try (AmazonEc2Reference clientReference = awsEc2Service.client()) {
|
||||||
|
@ -109,7 +109,7 @@ class AwsEc2SeedHostsProvider implements SeedHostsProvider {
|
||||||
} catch (final AmazonClientException e) {
|
} catch (final AmazonClientException e) {
|
||||||
logger.info("Exception while retrieving instance list from AWS API: {}", e.getMessage());
|
logger.info("Exception while retrieving instance list from AWS API: {}", e.getMessage());
|
||||||
logger.debug("Full exception:", e);
|
logger.debug("Full exception:", e);
|
||||||
return dynamicHosts;
|
return dynamicHostAddresses;
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.trace("finding seed nodes...");
|
logger.trace("finding seed nodes...");
|
||||||
|
@ -164,8 +164,8 @@ class AwsEc2SeedHostsProvider implements SeedHostsProvider {
|
||||||
// Reading the node host from its metadata
|
// Reading the node host from its metadata
|
||||||
final String tagName = hostType.substring(TAG_PREFIX.length());
|
final String tagName = hostType.substring(TAG_PREFIX.length());
|
||||||
logger.debug("reading hostname from [{}] instance tag", tagName);
|
logger.debug("reading hostname from [{}] instance tag", tagName);
|
||||||
final List<Tag> tags = instance.getTags();
|
final List<Tag> tagList = instance.getTags();
|
||||||
for (final Tag tag : tags) {
|
for (final Tag tag : tagList) {
|
||||||
if (tag.getKey().equals(tagName)) {
|
if (tag.getKey().equals(tagName)) {
|
||||||
address = tag.getValue();
|
address = tag.getValue();
|
||||||
logger.debug("using [{}] as the instance address", address);
|
logger.debug("using [{}] as the instance address", address);
|
||||||
|
@ -179,7 +179,7 @@ class AwsEc2SeedHostsProvider implements SeedHostsProvider {
|
||||||
final TransportAddress[] addresses = transportService.addressesFromString(address);
|
final TransportAddress[] addresses = transportService.addressesFromString(address);
|
||||||
for (int i = 0; i < addresses.length; i++) {
|
for (int i = 0; i < addresses.length; i++) {
|
||||||
logger.trace("adding {}, address {}, transport_address {}", instance.getInstanceId(), address, addresses[i]);
|
logger.trace("adding {}, address {}, transport_address {}", instance.getInstanceId(), address, addresses[i]);
|
||||||
dynamicHosts.add(addresses[i]);
|
dynamicHostAddresses.add(addresses[i]);
|
||||||
}
|
}
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
final String finalAddress = address;
|
final String finalAddress = address;
|
||||||
|
@ -198,9 +198,9 @@ class AwsEc2SeedHostsProvider implements SeedHostsProvider {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.debug("using dynamic transport addresses {}", dynamicHosts);
|
logger.debug("using dynamic transport addresses {}", dynamicHostAddresses);
|
||||||
|
|
||||||
return dynamicHosts;
|
return dynamicHostAddresses;
|
||||||
}
|
}
|
||||||
|
|
||||||
private DescribeInstancesRequest buildDescribeInstancesRequest() {
|
private DescribeInstancesRequest buildDescribeInstancesRequest() {
|
||||||
|
|
|
@ -28,14 +28,14 @@ import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
class AwsEc2ServiceImpl implements AwsEc2Service {
|
class AwsEc2ServiceImpl implements AwsEc2Service {
|
||||||
|
|
||||||
private static final Logger logger = LogManager.getLogger(AwsEc2ServiceImpl.class);
|
private static final Logger LOGGER = LogManager.getLogger(AwsEc2ServiceImpl.class);
|
||||||
|
|
||||||
private final AtomicReference<LazyInitializable<AmazonEc2Reference, ElasticsearchException>> lazyClientReference =
|
private final AtomicReference<LazyInitializable<AmazonEc2Reference, ElasticsearchException>> lazyClientReference =
|
||||||
new AtomicReference<>();
|
new AtomicReference<>();
|
||||||
|
|
||||||
private AmazonEC2 buildClient(Ec2ClientSettings clientSettings) {
|
private AmazonEC2 buildClient(Ec2ClientSettings clientSettings) {
|
||||||
final AWSCredentialsProvider credentials = buildCredentials(logger, clientSettings);
|
final AWSCredentialsProvider credentials = buildCredentials(LOGGER, clientSettings);
|
||||||
final ClientConfiguration configuration = buildConfiguration(logger, clientSettings);
|
final ClientConfiguration configuration = buildConfiguration(LOGGER, clientSettings);
|
||||||
return buildClient(credentials, configuration, clientSettings.endpoint);
|
return buildClient(credentials, configuration, clientSettings.endpoint);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -45,7 +45,7 @@ class AwsEc2ServiceImpl implements AwsEc2Service {
|
||||||
.withCredentials(credentials)
|
.withCredentials(credentials)
|
||||||
.withClientConfiguration(configuration);
|
.withClientConfiguration(configuration);
|
||||||
if (Strings.hasText(endpoint)) {
|
if (Strings.hasText(endpoint)) {
|
||||||
logger.debug("using explicit ec2 endpoint [{}]", endpoint);
|
LOGGER.debug("using explicit ec2 endpoint [{}]", endpoint);
|
||||||
builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, null));
|
builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, null));
|
||||||
}
|
}
|
||||||
return SocketAccess.doPrivileged(builder::build);
|
return SocketAccess.doPrivileged(builder::build);
|
||||||
|
|
|
@ -43,7 +43,7 @@ import java.util.function.Supplier;
|
||||||
|
|
||||||
public class Ec2DiscoveryPlugin extends Plugin implements DiscoveryPlugin, ReloadablePlugin {
|
public class Ec2DiscoveryPlugin extends Plugin implements DiscoveryPlugin, ReloadablePlugin {
|
||||||
|
|
||||||
private static Logger logger = LogManager.getLogger(Ec2DiscoveryPlugin.class);
|
private static final Logger logger = LogManager.getLogger(Ec2DiscoveryPlugin.class);
|
||||||
public static final String EC2 = "ec2";
|
public static final String EC2 = "ec2";
|
||||||
|
|
||||||
static {
|
static {
|
||||||
|
@ -80,7 +80,7 @@ public class Ec2DiscoveryPlugin extends Plugin implements DiscoveryPlugin, Reloa
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public NetworkService.CustomNameResolver getCustomNameResolver(Settings settings) {
|
public NetworkService.CustomNameResolver getCustomNameResolver(Settings _settings) {
|
||||||
logger.debug("Register _ec2_, _ec2:xxx_ network names");
|
logger.debug("Register _ec2_, _ec2:xxx_ network names");
|
||||||
return new Ec2NameResolver();
|
return new Ec2NameResolver();
|
||||||
}
|
}
|
||||||
|
@ -171,9 +171,9 @@ public class Ec2DiscoveryPlugin extends Plugin implements DiscoveryPlugin, Reloa
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void reload(Settings settings) {
|
public void reload(Settings settingsToLoad) {
|
||||||
// secure settings should be readable
|
// secure settings should be readable
|
||||||
final Ec2ClientSettings clientSettings = Ec2ClientSettings.getClientSettings(settings);
|
final Ec2ClientSettings clientSettings = Ec2ClientSettings.getClientSettings(settingsToLoad);
|
||||||
ec2Service.refreshAndClearCache(clientSettings);
|
ec2Service.refreshAndClearCache(clientSettings);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -83,14 +83,14 @@ public class GceDiscoveryPlugin extends Plugin implements DiscoveryPlugin, Close
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public NetworkService.CustomNameResolver getCustomNameResolver(Settings settings) {
|
public NetworkService.CustomNameResolver getCustomNameResolver(Settings settingsToUse) {
|
||||||
logger.debug("Register _gce_, _gce:xxx network names");
|
logger.debug("Register _gce_, _gce:xxx network names");
|
||||||
return new GceNameResolver(new GceMetadataService(settings));
|
return new GceNameResolver(new GceMetadataService(settingsToUse));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<Setting<?>> getSettings() {
|
public List<Setting<?>> getSettings() {
|
||||||
List<Setting<?>> settings = new ArrayList<>(
|
List<Setting<?>> settingList = new ArrayList<>(
|
||||||
Arrays.asList(
|
Arrays.asList(
|
||||||
// Register GCE settings
|
// Register GCE settings
|
||||||
GceInstancesService.PROJECT_SETTING,
|
GceInstancesService.PROJECT_SETTING,
|
||||||
|
@ -103,10 +103,10 @@ public class GceDiscoveryPlugin extends Plugin implements DiscoveryPlugin, Close
|
||||||
);
|
);
|
||||||
|
|
||||||
if (ALLOW_REROUTE_GCE_SETTINGS) {
|
if (ALLOW_REROUTE_GCE_SETTINGS) {
|
||||||
settings.add(GceMetadataService.GCE_HOST);
|
settingList.add(GceMetadataService.GCE_HOST);
|
||||||
settings.add(GceInstancesServiceImpl.GCE_ROOT_URL);
|
settingList.add(GceInstancesServiceImpl.GCE_ROOT_URL);
|
||||||
}
|
}
|
||||||
return Collections.unmodifiableList(settings);
|
return Collections.unmodifiableList(settingList);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -87,14 +87,14 @@ public final class AttachmentProcessor extends AbstractProcessor {
|
||||||
throw new IllegalArgumentException("field [" + field + "] is null, cannot parse.");
|
throw new IllegalArgumentException("field [" + field + "] is null, cannot parse.");
|
||||||
}
|
}
|
||||||
|
|
||||||
Integer indexedChars = this.indexedChars;
|
Integer indexedCharsValue = this.indexedChars;
|
||||||
|
|
||||||
if (indexedCharsField != null) {
|
if (indexedCharsField != null) {
|
||||||
// If the user provided the number of characters to be extracted as part of the document, we use it
|
// If the user provided the number of characters to be extracted as part of the document, we use it
|
||||||
indexedChars = ingestDocument.getFieldValue(indexedCharsField, Integer.class, true);
|
indexedCharsValue = ingestDocument.getFieldValue(indexedCharsField, Integer.class, true);
|
||||||
if (indexedChars == null) {
|
if (indexedCharsValue == null) {
|
||||||
// If the field does not exist we fall back to the global limit
|
// If the field does not exist we fall back to the global limit
|
||||||
indexedChars = this.indexedChars;
|
indexedCharsValue = this.indexedChars;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -104,7 +104,7 @@ public final class AttachmentProcessor extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
String parsedContent = "";
|
String parsedContent = "";
|
||||||
try {
|
try {
|
||||||
parsedContent = TikaImpl.parse(input, metadata, indexedChars);
|
parsedContent = TikaImpl.parse(input, metadata, indexedCharsValue);
|
||||||
} catch (ZeroByteFileException e) {
|
} catch (ZeroByteFileException e) {
|
||||||
// tika 1.17 throws an exception when the InputStream has 0 bytes.
|
// tika 1.17 throws an exception when the InputStream has 0 bytes.
|
||||||
// previously, it did not mind. This is here to preserve that behavior.
|
// previously, it did not mind. This is here to preserve that behavior.
|
||||||
|
|
|
@ -39,7 +39,7 @@ import static org.hamcrest.Matchers.nullValue;
|
||||||
|
|
||||||
public class AttachmentProcessorTests extends ESTestCase {
|
public class AttachmentProcessorTests extends ESTestCase {
|
||||||
|
|
||||||
private AttachmentProcessor processor;
|
private Processor processor;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void createStandardProcessor() {
|
public void createStandardProcessor() {
|
||||||
|
@ -263,17 +263,7 @@ public class AttachmentProcessorTests extends ESTestCase {
|
||||||
Collections.singletonMap("source_field", null)
|
Collections.singletonMap("source_field", null)
|
||||||
);
|
);
|
||||||
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
|
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
|
||||||
Processor processor = new AttachmentProcessor(
|
processor = new AttachmentProcessor(randomAlphaOfLength(10), null, "source_field", "randomTarget", null, 10, true, null, null);
|
||||||
randomAlphaOfLength(10),
|
|
||||||
null,
|
|
||||||
"source_field",
|
|
||||||
"randomTarget",
|
|
||||||
null,
|
|
||||||
10,
|
|
||||||
true,
|
|
||||||
null,
|
|
||||||
null
|
|
||||||
);
|
|
||||||
processor.execute(ingestDocument);
|
processor.execute(ingestDocument);
|
||||||
assertIngestDocument(originalIngestDocument, ingestDocument);
|
assertIngestDocument(originalIngestDocument, ingestDocument);
|
||||||
}
|
}
|
||||||
|
@ -281,17 +271,7 @@ public class AttachmentProcessorTests extends ESTestCase {
|
||||||
public void testNonExistentWithIgnoreMissing() throws Exception {
|
public void testNonExistentWithIgnoreMissing() throws Exception {
|
||||||
IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
|
IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
|
||||||
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
|
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
|
||||||
Processor processor = new AttachmentProcessor(
|
processor = new AttachmentProcessor(randomAlphaOfLength(10), null, "source_field", "randomTarget", null, 10, true, null, null);
|
||||||
randomAlphaOfLength(10),
|
|
||||||
null,
|
|
||||||
"source_field",
|
|
||||||
"randomTarget",
|
|
||||||
null,
|
|
||||||
10,
|
|
||||||
true,
|
|
||||||
null,
|
|
||||||
null
|
|
||||||
);
|
|
||||||
processor.execute(ingestDocument);
|
processor.execute(ingestDocument);
|
||||||
assertIngestDocument(originalIngestDocument, ingestDocument);
|
assertIngestDocument(originalIngestDocument, ingestDocument);
|
||||||
}
|
}
|
||||||
|
@ -302,17 +282,7 @@ public class AttachmentProcessorTests extends ESTestCase {
|
||||||
Collections.singletonMap("source_field", null)
|
Collections.singletonMap("source_field", null)
|
||||||
);
|
);
|
||||||
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
|
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
|
||||||
Processor processor = new AttachmentProcessor(
|
processor = new AttachmentProcessor(randomAlphaOfLength(10), null, "source_field", "randomTarget", null, 10, false, null, null);
|
||||||
randomAlphaOfLength(10),
|
|
||||||
null,
|
|
||||||
"source_field",
|
|
||||||
"randomTarget",
|
|
||||||
null,
|
|
||||||
10,
|
|
||||||
false,
|
|
||||||
null,
|
|
||||||
null
|
|
||||||
);
|
|
||||||
Exception exception = expectThrows(Exception.class, () -> processor.execute(ingestDocument));
|
Exception exception = expectThrows(Exception.class, () -> processor.execute(ingestDocument));
|
||||||
assertThat(exception.getMessage(), equalTo("field [source_field] is null, cannot parse."));
|
assertThat(exception.getMessage(), equalTo("field [source_field] is null, cannot parse."));
|
||||||
}
|
}
|
||||||
|
@ -320,33 +290,23 @@ public class AttachmentProcessorTests extends ESTestCase {
|
||||||
public void testNonExistentWithoutIgnoreMissing() throws Exception {
|
public void testNonExistentWithoutIgnoreMissing() throws Exception {
|
||||||
IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
|
IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
|
||||||
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
|
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
|
||||||
Processor processor = new AttachmentProcessor(
|
processor = new AttachmentProcessor(randomAlphaOfLength(10), null, "source_field", "randomTarget", null, 10, false, null, null);
|
||||||
randomAlphaOfLength(10),
|
|
||||||
null,
|
|
||||||
"source_field",
|
|
||||||
"randomTarget",
|
|
||||||
null,
|
|
||||||
10,
|
|
||||||
false,
|
|
||||||
null,
|
|
||||||
null
|
|
||||||
);
|
|
||||||
Exception exception = expectThrows(Exception.class, () -> processor.execute(ingestDocument));
|
Exception exception = expectThrows(Exception.class, () -> processor.execute(ingestDocument));
|
||||||
assertThat(exception.getMessage(), equalTo("field [source_field] not present as part of path [source_field]"));
|
assertThat(exception.getMessage(), equalTo("field [source_field] not present as part of path [source_field]"));
|
||||||
}
|
}
|
||||||
|
|
||||||
private Map<String, Object> parseDocument(String file, AttachmentProcessor processor) throws Exception {
|
private Map<String, Object> parseDocument(String file, Processor attachmentProcessor) throws Exception {
|
||||||
return parseDocument(file, processor, new HashMap<>());
|
return parseDocument(file, attachmentProcessor, new HashMap<>());
|
||||||
}
|
}
|
||||||
|
|
||||||
private Map<String, Object> parseDocument(String file, AttachmentProcessor processor, Map<String, Object> optionalFields)
|
private Map<String, Object> parseDocument(String file, Processor attachmentProcessor, Map<String, Object> optionalFields)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
return parseDocument(file, processor, optionalFields, false);
|
return parseDocument(file, attachmentProcessor, optionalFields, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Map<String, Object> parseDocument(
|
private Map<String, Object> parseDocument(
|
||||||
String file,
|
String file,
|
||||||
AttachmentProcessor processor,
|
Processor attachmentProcessor,
|
||||||
Map<String, Object> optionalFields,
|
Map<String, Object> optionalFields,
|
||||||
boolean includeResourceName
|
boolean includeResourceName
|
||||||
) throws Exception {
|
) throws Exception {
|
||||||
|
@ -358,7 +318,7 @@ public class AttachmentProcessorTests extends ESTestCase {
|
||||||
document.putAll(optionalFields);
|
document.putAll(optionalFields);
|
||||||
|
|
||||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
|
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
|
||||||
processor.execute(ingestDocument);
|
attachmentProcessor.execute(ingestDocument);
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
Map<String, Object> attachmentData = (Map<String, Object>) ingestDocument.getSourceAndMetadata().get("target_field");
|
Map<String, Object> attachmentData = (Map<String, Object>) ingestDocument.getSourceAndMetadata().get("target_field");
|
||||||
|
|
|
@ -160,8 +160,8 @@ public class AnnotatedPassageFormatter extends PassageFormatter {
|
||||||
int pos;
|
int pos;
|
||||||
int j = 0;
|
int j = 0;
|
||||||
for (Passage passage : passages) {
|
for (Passage passage : passages) {
|
||||||
AnnotationToken[] annotations = getIntersectingAnnotations(passage.getStartOffset(), passage.getEndOffset());
|
AnnotationToken[] annotationTokens = getIntersectingAnnotations(passage.getStartOffset(), passage.getEndOffset());
|
||||||
MarkupPassage mergedMarkup = mergeAnnotations(annotations, passage);
|
MarkupPassage mergedMarkup = mergeAnnotations(annotationTokens, passage);
|
||||||
|
|
||||||
StringBuilder sb = new StringBuilder();
|
StringBuilder sb = new StringBuilder();
|
||||||
pos = passage.getStartOffset();
|
pos = passage.getStartOffset();
|
||||||
|
|
|
@ -393,11 +393,11 @@ public class AnnotatedTextFieldMapper extends FieldMapper {
|
||||||
super(in);
|
super(in);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setAnnotations(AnnotatedText annotatedText) {
|
public void setAnnotations(AnnotatedText text) {
|
||||||
this.annotatedText = annotatedText;
|
this.annotatedText = text;
|
||||||
currentAnnotationIndex = 0;
|
currentAnnotationIndex = 0;
|
||||||
if (annotatedText != null && annotatedText.numAnnotations() > 0) {
|
if (text != null && text.numAnnotations() > 0) {
|
||||||
nextAnnotationForInjection = annotatedText.getAnnotation(0);
|
nextAnnotationForInjection = text.getAnnotation(0);
|
||||||
} else {
|
} else {
|
||||||
nextAnnotationForInjection = null;
|
nextAnnotationForInjection = null;
|
||||||
}
|
}
|
||||||
|
|
|
@ -114,8 +114,8 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
AzureStorageService createAzureStorageService(Settings settings, AzureClientProvider azureClientProvider) {
|
AzureStorageService createAzureStorageService(Settings settingsToUse, AzureClientProvider azureClientProvider) {
|
||||||
return new AzureStorageService(settings, azureClientProvider) {
|
return new AzureStorageService(settingsToUse, azureClientProvider) {
|
||||||
@Override
|
@Override
|
||||||
RequestRetryOptions getRetryOptions(LocationMode locationMode, AzureStorageSettings azureStorageSettings) {
|
RequestRetryOptions getRetryOptions(LocationMode locationMode, AzureStorageSettings azureStorageSettings) {
|
||||||
return new RequestRetryOptions(
|
return new RequestRetryOptions(
|
||||||
|
|
|
@ -700,9 +700,9 @@ public class AzureBlobStore implements BlobStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
private ByteBuf copyBuffer(ByteBuffer buffer) {
|
private ByteBuf copyBuffer(ByteBuffer buffer) {
|
||||||
ByteBuf byteBuf = allocator.heapBuffer(buffer.remaining(), buffer.remaining());
|
ByteBuf byteBuffer = allocator.heapBuffer(buffer.remaining(), buffer.remaining());
|
||||||
byteBuf.writeBytes(buffer);
|
byteBuffer.writeBytes(buffer);
|
||||||
return byteBuf;
|
return byteBuffer;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -100,8 +100,8 @@ public class AzureRepositoryPlugin extends Plugin implements RepositoryPlugin, R
|
||||||
return Collections.singletonList(azureClientProvider);
|
return Collections.singletonList(azureClientProvider);
|
||||||
}
|
}
|
||||||
|
|
||||||
AzureStorageService createAzureStorageService(Settings settings, AzureClientProvider azureClientProvider) {
|
AzureStorageService createAzureStorageService(Settings settingsToUse, AzureClientProvider azureClientProvider) {
|
||||||
return new AzureStorageService(settings, azureClientProvider);
|
return new AzureStorageService(settingsToUse, azureClientProvider);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -120,8 +120,8 @@ public class AzureRepositoryPlugin extends Plugin implements RepositoryPlugin, R
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
|
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settingsToUse) {
|
||||||
return Arrays.asList(executorBuilder(), nettyEventLoopExecutorBuilder(settings));
|
return Arrays.asList(executorBuilder(), nettyEventLoopExecutorBuilder(settingsToUse));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ExecutorBuilder<?> executorBuilder() {
|
public static ExecutorBuilder<?> executorBuilder() {
|
||||||
|
@ -134,9 +134,9 @@ public class AzureRepositoryPlugin extends Plugin implements RepositoryPlugin, R
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void reload(Settings settings) {
|
public void reload(Settings settingsToLoad) {
|
||||||
// secure settings should be readable
|
// secure settings should be readable
|
||||||
final Map<String, AzureStorageSettings> clientsSettings = AzureStorageSettings.load(settings);
|
final Map<String, AzureStorageSettings> clientsSettings = AzureStorageSettings.load(settingsToLoad);
|
||||||
AzureStorageService storageService = azureStoreService.get();
|
AzureStorageService storageService = azureStoreService.get();
|
||||||
assert storageService != null;
|
assert storageService != null;
|
||||||
storageService.refreshSettings(clientsSettings);
|
storageService.refreshSettings(clientsSettings);
|
||||||
|
|
|
@ -237,10 +237,10 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
|
||||||
return new GoogleCloudStorageService() {
|
return new GoogleCloudStorageService() {
|
||||||
@Override
|
@Override
|
||||||
StorageOptions createStorageOptions(
|
StorageOptions createStorageOptions(
|
||||||
final GoogleCloudStorageClientSettings clientSettings,
|
final GoogleCloudStorageClientSettings gcsClientSettings,
|
||||||
final HttpTransportOptions httpTransportOptions
|
final HttpTransportOptions httpTransportOptions
|
||||||
) {
|
) {
|
||||||
StorageOptions options = super.createStorageOptions(clientSettings, httpTransportOptions);
|
StorageOptions options = super.createStorageOptions(gcsClientSettings, httpTransportOptions);
|
||||||
return options.toBuilder()
|
return options.toBuilder()
|
||||||
.setHost(options.getHost())
|
.setHost(options.getHost())
|
||||||
.setCredentials(options.getCredentials())
|
.setCredentials(options.getCredentials())
|
||||||
|
|
|
@ -121,12 +121,12 @@ public class GoogleCloudStorageService {
|
||||||
/**
|
/**
|
||||||
* Creates a client that can be used to manage Google Cloud Storage objects. The client is thread-safe.
|
* Creates a client that can be used to manage Google Cloud Storage objects. The client is thread-safe.
|
||||||
*
|
*
|
||||||
* @param clientSettings client settings to use, including secure settings
|
* @param gcsClientSettings client settings to use, including secure settings
|
||||||
* @param stats the stats collector to use by the underlying SDK
|
* @param stats the stats collector to use by the underlying SDK
|
||||||
* @return a new client storage instance that can be used to manage objects
|
* @return a new client storage instance that can be used to manage objects
|
||||||
* (blobs)
|
* (blobs)
|
||||||
*/
|
*/
|
||||||
private Storage createClient(GoogleCloudStorageClientSettings clientSettings, GoogleCloudStorageOperationsStats stats)
|
private Storage createClient(GoogleCloudStorageClientSettings gcsClientSettings, GoogleCloudStorageOperationsStats stats)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
final HttpTransport httpTransport = SocketAccess.doPrivilegedIOException(() -> {
|
final HttpTransport httpTransport = SocketAccess.doPrivilegedIOException(() -> {
|
||||||
final NetHttpTransport.Builder builder = new NetHttpTransport.Builder();
|
final NetHttpTransport.Builder builder = new NetHttpTransport.Builder();
|
||||||
|
@ -146,8 +146,8 @@ public class GoogleCloudStorageService {
|
||||||
|
|
||||||
final HttpTransportOptions httpTransportOptions = new HttpTransportOptions(
|
final HttpTransportOptions httpTransportOptions = new HttpTransportOptions(
|
||||||
HttpTransportOptions.newBuilder()
|
HttpTransportOptions.newBuilder()
|
||||||
.setConnectTimeout(toTimeout(clientSettings.getConnectTimeout()))
|
.setConnectTimeout(toTimeout(gcsClientSettings.getConnectTimeout()))
|
||||||
.setReadTimeout(toTimeout(clientSettings.getReadTimeout()))
|
.setReadTimeout(toTimeout(gcsClientSettings.getReadTimeout()))
|
||||||
.setHttpTransportFactory(() -> httpTransport)
|
.setHttpTransportFactory(() -> httpTransport)
|
||||||
) {
|
) {
|
||||||
|
|
||||||
|
@ -163,28 +163,28 @@ public class GoogleCloudStorageService {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
final StorageOptions storageOptions = createStorageOptions(clientSettings, httpTransportOptions);
|
final StorageOptions storageOptions = createStorageOptions(gcsClientSettings, httpTransportOptions);
|
||||||
return storageOptions.getService();
|
return storageOptions.getService();
|
||||||
}
|
}
|
||||||
|
|
||||||
StorageOptions createStorageOptions(
|
StorageOptions createStorageOptions(
|
||||||
final GoogleCloudStorageClientSettings clientSettings,
|
final GoogleCloudStorageClientSettings gcsClientSettings,
|
||||||
final HttpTransportOptions httpTransportOptions
|
final HttpTransportOptions httpTransportOptions
|
||||||
) {
|
) {
|
||||||
final StorageOptions.Builder storageOptionsBuilder = StorageOptions.newBuilder()
|
final StorageOptions.Builder storageOptionsBuilder = StorageOptions.newBuilder()
|
||||||
.setTransportOptions(httpTransportOptions)
|
.setTransportOptions(httpTransportOptions)
|
||||||
.setHeaderProvider(() -> {
|
.setHeaderProvider(() -> {
|
||||||
final MapBuilder<String, String> mapBuilder = MapBuilder.newMapBuilder();
|
final MapBuilder<String, String> mapBuilder = MapBuilder.newMapBuilder();
|
||||||
if (Strings.hasLength(clientSettings.getApplicationName())) {
|
if (Strings.hasLength(gcsClientSettings.getApplicationName())) {
|
||||||
mapBuilder.put("user-agent", clientSettings.getApplicationName());
|
mapBuilder.put("user-agent", gcsClientSettings.getApplicationName());
|
||||||
}
|
}
|
||||||
return mapBuilder.immutableMap();
|
return mapBuilder.immutableMap();
|
||||||
});
|
});
|
||||||
if (Strings.hasLength(clientSettings.getHost())) {
|
if (Strings.hasLength(gcsClientSettings.getHost())) {
|
||||||
storageOptionsBuilder.setHost(clientSettings.getHost());
|
storageOptionsBuilder.setHost(gcsClientSettings.getHost());
|
||||||
}
|
}
|
||||||
if (Strings.hasLength(clientSettings.getProjectId())) {
|
if (Strings.hasLength(gcsClientSettings.getProjectId())) {
|
||||||
storageOptionsBuilder.setProjectId(clientSettings.getProjectId());
|
storageOptionsBuilder.setProjectId(gcsClientSettings.getProjectId());
|
||||||
} else {
|
} else {
|
||||||
String defaultProjectId = null;
|
String defaultProjectId = null;
|
||||||
try {
|
try {
|
||||||
|
@ -210,16 +210,16 @@ public class GoogleCloudStorageService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (clientSettings.getCredential() == null) {
|
if (gcsClientSettings.getCredential() == null) {
|
||||||
try {
|
try {
|
||||||
storageOptionsBuilder.setCredentials(GoogleCredentials.getApplicationDefault());
|
storageOptionsBuilder.setCredentials(GoogleCredentials.getApplicationDefault());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.warn("failed to load Application Default Credentials", e);
|
logger.warn("failed to load Application Default Credentials", e);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
ServiceAccountCredentials serviceAccountCredentials = clientSettings.getCredential();
|
ServiceAccountCredentials serviceAccountCredentials = gcsClientSettings.getCredential();
|
||||||
// override token server URI
|
// override token server URI
|
||||||
final URI tokenServerUri = clientSettings.getTokenUri();
|
final URI tokenServerUri = gcsClientSettings.getTokenUri();
|
||||||
if (Strings.hasLength(tokenServerUri.toString())) {
|
if (Strings.hasLength(tokenServerUri.toString())) {
|
||||||
// Rebuild the service account credentials in order to use a custom Token url.
|
// Rebuild the service account credentials in order to use a custom Token url.
|
||||||
// This is mostly used for testing purpose.
|
// This is mostly used for testing purpose.
|
||||||
|
|
|
@ -140,10 +140,10 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
|
||||||
final GoogleCloudStorageService service = new GoogleCloudStorageService() {
|
final GoogleCloudStorageService service = new GoogleCloudStorageService() {
|
||||||
@Override
|
@Override
|
||||||
StorageOptions createStorageOptions(
|
StorageOptions createStorageOptions(
|
||||||
final GoogleCloudStorageClientSettings clientSettings,
|
final GoogleCloudStorageClientSettings gcsClientSettings,
|
||||||
final HttpTransportOptions httpTransportOptions
|
final HttpTransportOptions httpTransportOptions
|
||||||
) {
|
) {
|
||||||
StorageOptions options = super.createStorageOptions(clientSettings, httpTransportOptions);
|
StorageOptions options = super.createStorageOptions(gcsClientSettings, httpTransportOptions);
|
||||||
RetrySettings.Builder retrySettingsBuilder = RetrySettings.newBuilder()
|
RetrySettings.Builder retrySettingsBuilder = RetrySettings.newBuilder()
|
||||||
.setTotalTimeout(options.getRetrySettings().getTotalTimeout())
|
.setTotalTimeout(options.getRetrySettings().getTotalTimeout())
|
||||||
.setInitialRetryDelay(Duration.ofMillis(10L))
|
.setInitialRetryDelay(Duration.ofMillis(10L))
|
||||||
|
|
|
@ -235,7 +235,7 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
|
||||||
FileStatus[] files;
|
FileStatus[] files;
|
||||||
try {
|
try {
|
||||||
files = store.execute(
|
files = store.execute(
|
||||||
fileContext -> fileContext.util().listStatus(path, path -> prefix == null || path.getName().startsWith(prefix))
|
fileContext -> fileContext.util().listStatus(path, eachPath -> prefix == null || eachPath.getName().startsWith(prefix))
|
||||||
);
|
);
|
||||||
} catch (FileNotFoundException e) {
|
} catch (FileNotFoundException e) {
|
||||||
files = new FileStatus[0];
|
files = new FileStatus[0];
|
||||||
|
|
|
@ -48,6 +48,7 @@ final class HdfsBlobStore implements BlobStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("HiddenField")
|
||||||
private void mkdirs(Path path) throws IOException {
|
private void mkdirs(Path path) throws IOException {
|
||||||
execute((Operation<Void>) fileContext -> {
|
execute((Operation<Void>) fileContext -> {
|
||||||
fileContext.mkdir(path, null, true);
|
fileContext.mkdir(path, null, true);
|
||||||
|
|
|
@ -106,7 +106,7 @@ public final class HdfsRepository extends BlobStoreRepository {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private HdfsBlobStore createBlobstore(URI uri, String path, Settings repositorySettings) {
|
private HdfsBlobStore createBlobstore(URI blobstoreUri, String path, Settings repositorySettings) {
|
||||||
Configuration hadoopConfiguration = new Configuration(repositorySettings.getAsBoolean("load_defaults", true));
|
Configuration hadoopConfiguration = new Configuration(repositorySettings.getAsBoolean("load_defaults", true));
|
||||||
hadoopConfiguration.setClassLoader(HdfsRepository.class.getClassLoader());
|
hadoopConfiguration.setClassLoader(HdfsRepository.class.getClassLoader());
|
||||||
hadoopConfiguration.reloadConfiguration();
|
hadoopConfiguration.reloadConfiguration();
|
||||||
|
@ -126,7 +126,7 @@ public final class HdfsRepository extends BlobStoreRepository {
|
||||||
// Sense if HA is enabled
|
// Sense if HA is enabled
|
||||||
// HA requires elevated permissions during regular usage in the event that a failover operation
|
// HA requires elevated permissions during regular usage in the event that a failover operation
|
||||||
// occurs and a new connection is required.
|
// occurs and a new connection is required.
|
||||||
String host = uri.getHost();
|
String host = blobstoreUri.getHost();
|
||||||
String configKey = HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "." + host;
|
String configKey = HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "." + host;
|
||||||
Class<?> ret = hadoopConfiguration.getClass(configKey, null, FailoverProxyProvider.class);
|
Class<?> ret = hadoopConfiguration.getClass(configKey, null, FailoverProxyProvider.class);
|
||||||
boolean haEnabled = ret != null;
|
boolean haEnabled = ret != null;
|
||||||
|
@ -135,7 +135,7 @@ public final class HdfsRepository extends BlobStoreRepository {
|
||||||
// This will correctly configure the filecontext to have our UGI as its internal user.
|
// This will correctly configure the filecontext to have our UGI as its internal user.
|
||||||
FileContext fileContext = ugi.doAs((PrivilegedAction<FileContext>) () -> {
|
FileContext fileContext = ugi.doAs((PrivilegedAction<FileContext>) () -> {
|
||||||
try {
|
try {
|
||||||
AbstractFileSystem fs = AbstractFileSystem.get(uri, hadoopConfiguration);
|
AbstractFileSystem fs = AbstractFileSystem.get(blobstoreUri, hadoopConfiguration);
|
||||||
return FileContext.getFileContext(fs, hadoopConfiguration);
|
return FileContext.getFileContext(fs, hadoopConfiguration);
|
||||||
} catch (UnsupportedFileSystemException e) {
|
} catch (UnsupportedFileSystemException e) {
|
||||||
throw new UncheckedIOException(e);
|
throw new UncheckedIOException(e);
|
||||||
|
@ -152,7 +152,7 @@ public final class HdfsRepository extends BlobStoreRepository {
|
||||||
try {
|
try {
|
||||||
return new HdfsBlobStore(fileContext, path, bufferSize, isReadOnly(), haEnabled);
|
return new HdfsBlobStore(fileContext, path, bufferSize, isReadOnly(), haEnabled);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new UncheckedIOException(String.format(Locale.ROOT, "Cannot create HDFS repository for uri [%s]", uri), e);
|
throw new UncheckedIOException(String.format(Locale.ROOT, "Cannot create HDFS repository for uri [%s]", blobstoreUri), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -100,9 +100,9 @@ class HdfsSecurityContext {
|
||||||
this.restrictedExecutionPermissions = renderPermissions(ugi);
|
this.restrictedExecutionPermissions = renderPermissions(ugi);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Permission[] renderPermissions(UserGroupInformation ugi) {
|
private Permission[] renderPermissions(UserGroupInformation userGroupInformation) {
|
||||||
Permission[] permissions;
|
Permission[] permissions;
|
||||||
if (ugi.isFromKeytab()) {
|
if (userGroupInformation.isFromKeytab()) {
|
||||||
// KERBEROS
|
// KERBEROS
|
||||||
// Leave room to append one extra permission based on the logged in user's info.
|
// Leave room to append one extra permission based on the logged in user's info.
|
||||||
int permlen = KERBEROS_AUTH_PERMISSIONS.length + 1;
|
int permlen = KERBEROS_AUTH_PERMISSIONS.length + 1;
|
||||||
|
@ -112,7 +112,7 @@ class HdfsSecurityContext {
|
||||||
|
|
||||||
// Append a kerberos.ServicePermission to only allow initiating kerberos connections
|
// Append a kerberos.ServicePermission to only allow initiating kerberos connections
|
||||||
// as the logged in user.
|
// as the logged in user.
|
||||||
permissions[permissions.length - 1] = new ServicePermission(ugi.getUserName(), "initiate");
|
permissions[permissions.length - 1] = new ServicePermission(userGroupInformation.getUserName(), "initiate");
|
||||||
} else {
|
} else {
|
||||||
// SIMPLE
|
// SIMPLE
|
||||||
permissions = Arrays.copyOf(SIMPLE_AUTH_PERMISSIONS, SIMPLE_AUTH_PERMISSIONS.length);
|
permissions = Arrays.copyOf(SIMPLE_AUTH_PERMISSIONS, SIMPLE_AUTH_PERMISSIONS.length);
|
||||||
|
|
|
@ -451,9 +451,9 @@ class S3BlobContainer extends AbstractBlobContainer {
|
||||||
return results;
|
return results;
|
||||||
}
|
}
|
||||||
|
|
||||||
private ListObjectsRequest listObjectsRequest(String keyPath) {
|
private ListObjectsRequest listObjectsRequest(String pathPrefix) {
|
||||||
return new ListObjectsRequest().withBucketName(blobStore.bucket())
|
return new ListObjectsRequest().withBucketName(blobStore.bucket())
|
||||||
.withPrefix(keyPath)
|
.withPrefix(pathPrefix)
|
||||||
.withDelimiter("/")
|
.withDelimiter("/")
|
||||||
.withRequestMetricCollector(blobStore.listMetricCollector);
|
.withRequestMetricCollector(blobStore.listMetricCollector);
|
||||||
}
|
}
|
||||||
|
@ -465,28 +465,28 @@ class S3BlobContainer extends AbstractBlobContainer {
|
||||||
/**
|
/**
|
||||||
* Uploads a blob using a single upload request
|
* Uploads a blob using a single upload request
|
||||||
*/
|
*/
|
||||||
void executeSingleUpload(final S3BlobStore blobStore, final String blobName, final InputStream input, final long blobSize)
|
void executeSingleUpload(final S3BlobStore s3BlobStore, final String blobName, final InputStream input, final long blobSize)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
||||||
// Extra safety checks
|
// Extra safety checks
|
||||||
if (blobSize > MAX_FILE_SIZE.getBytes()) {
|
if (blobSize > MAX_FILE_SIZE.getBytes()) {
|
||||||
throw new IllegalArgumentException("Upload request size [" + blobSize + "] can't be larger than " + MAX_FILE_SIZE);
|
throw new IllegalArgumentException("Upload request size [" + blobSize + "] can't be larger than " + MAX_FILE_SIZE);
|
||||||
}
|
}
|
||||||
if (blobSize > blobStore.bufferSizeInBytes()) {
|
if (blobSize > s3BlobStore.bufferSizeInBytes()) {
|
||||||
throw new IllegalArgumentException("Upload request size [" + blobSize + "] can't be larger than buffer size");
|
throw new IllegalArgumentException("Upload request size [" + blobSize + "] can't be larger than buffer size");
|
||||||
}
|
}
|
||||||
|
|
||||||
final ObjectMetadata md = new ObjectMetadata();
|
final ObjectMetadata md = new ObjectMetadata();
|
||||||
md.setContentLength(blobSize);
|
md.setContentLength(blobSize);
|
||||||
if (blobStore.serverSideEncryption()) {
|
if (s3BlobStore.serverSideEncryption()) {
|
||||||
md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
|
md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
|
||||||
}
|
}
|
||||||
final PutObjectRequest putRequest = new PutObjectRequest(blobStore.bucket(), blobName, input, md);
|
final PutObjectRequest putRequest = new PutObjectRequest(s3BlobStore.bucket(), blobName, input, md);
|
||||||
putRequest.setStorageClass(blobStore.getStorageClass());
|
putRequest.setStorageClass(s3BlobStore.getStorageClass());
|
||||||
putRequest.setCannedAcl(blobStore.getCannedACL());
|
putRequest.setCannedAcl(s3BlobStore.getCannedACL());
|
||||||
putRequest.setRequestMetricCollector(blobStore.putMetricCollector);
|
putRequest.setRequestMetricCollector(s3BlobStore.putMetricCollector);
|
||||||
|
|
||||||
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
|
try (AmazonS3Reference clientReference = s3BlobStore.clientReference()) {
|
||||||
SocketAccess.doPrivilegedVoid(() -> { clientReference.client().putObject(putRequest); });
|
SocketAccess.doPrivilegedVoid(() -> { clientReference.client().putObject(putRequest); });
|
||||||
} catch (final AmazonClientException e) {
|
} catch (final AmazonClientException e) {
|
||||||
throw new IOException("Unable to upload object [" + blobName + "] using a single upload", e);
|
throw new IOException("Unable to upload object [" + blobName + "] using a single upload", e);
|
||||||
|
@ -496,11 +496,11 @@ class S3BlobContainer extends AbstractBlobContainer {
|
||||||
/**
|
/**
|
||||||
* Uploads a blob using multipart upload requests.
|
* Uploads a blob using multipart upload requests.
|
||||||
*/
|
*/
|
||||||
void executeMultipartUpload(final S3BlobStore blobStore, final String blobName, final InputStream input, final long blobSize)
|
void executeMultipartUpload(final S3BlobStore s3BlobStore, final String blobName, final InputStream input, final long blobSize)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
||||||
ensureMultiPartUploadSize(blobSize);
|
ensureMultiPartUploadSize(blobSize);
|
||||||
final long partSize = blobStore.bufferSizeInBytes();
|
final long partSize = s3BlobStore.bufferSizeInBytes();
|
||||||
final Tuple<Long, Long> multiparts = numberOfMultiparts(blobSize, partSize);
|
final Tuple<Long, Long> multiparts = numberOfMultiparts(blobSize, partSize);
|
||||||
|
|
||||||
if (multiparts.v1() > Integer.MAX_VALUE) {
|
if (multiparts.v1() > Integer.MAX_VALUE) {
|
||||||
|
@ -512,9 +512,9 @@ class S3BlobContainer extends AbstractBlobContainer {
|
||||||
assert blobSize == (((nbParts - 1) * partSize) + lastPartSize) : "blobSize does not match multipart sizes";
|
assert blobSize == (((nbParts - 1) * partSize) + lastPartSize) : "blobSize does not match multipart sizes";
|
||||||
|
|
||||||
final SetOnce<String> uploadId = new SetOnce<>();
|
final SetOnce<String> uploadId = new SetOnce<>();
|
||||||
final String bucketName = blobStore.bucket();
|
final String bucketName = s3BlobStore.bucket();
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
|
try (AmazonS3Reference clientReference = s3BlobStore.clientReference()) {
|
||||||
|
|
||||||
uploadId.set(
|
uploadId.set(
|
||||||
SocketAccess.doPrivileged(
|
SocketAccess.doPrivileged(
|
||||||
|
@ -556,7 +556,7 @@ class S3BlobContainer extends AbstractBlobContainer {
|
||||||
uploadId.get(),
|
uploadId.get(),
|
||||||
parts
|
parts
|
||||||
);
|
);
|
||||||
complRequest.setRequestMetricCollector(blobStore.multiPartUploadMetricCollector);
|
complRequest.setRequestMetricCollector(s3BlobStore.multiPartUploadMetricCollector);
|
||||||
SocketAccess.doPrivilegedVoid(() -> clientReference.client().completeMultipartUpload(complRequest));
|
SocketAccess.doPrivilegedVoid(() -> clientReference.client().completeMultipartUpload(complRequest));
|
||||||
success = true;
|
success = true;
|
||||||
|
|
||||||
|
|
|
@ -32,7 +32,7 @@ import java.util.Map;
|
||||||
import static java.util.Collections.emptyMap;
|
import static java.util.Collections.emptyMap;
|
||||||
|
|
||||||
class S3Service implements Closeable {
|
class S3Service implements Closeable {
|
||||||
private static final Logger logger = LogManager.getLogger(S3Service.class);
|
private static final Logger LOGGER = LogManager.getLogger(S3Service.class);
|
||||||
|
|
||||||
private volatile Map<S3ClientSettings, AmazonS3Reference> clientsCache = emptyMap();
|
private volatile Map<S3ClientSettings, AmazonS3Reference> clientsCache = emptyMap();
|
||||||
|
|
||||||
|
@ -127,7 +127,7 @@ class S3Service implements Closeable {
|
||||||
// proxy for testing
|
// proxy for testing
|
||||||
AmazonS3 buildClient(final S3ClientSettings clientSettings) {
|
AmazonS3 buildClient(final S3ClientSettings clientSettings) {
|
||||||
final AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard();
|
final AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard();
|
||||||
builder.withCredentials(buildCredentials(logger, clientSettings));
|
builder.withCredentials(buildCredentials(LOGGER, clientSettings));
|
||||||
builder.withClientConfiguration(buildConfiguration(clientSettings));
|
builder.withClientConfiguration(buildConfiguration(clientSettings));
|
||||||
|
|
||||||
String endpoint = Strings.hasLength(clientSettings.endpoint) ? clientSettings.endpoint : Constants.S3_HOSTNAME;
|
String endpoint = Strings.hasLength(clientSettings.endpoint) ? clientSettings.endpoint : Constants.S3_HOSTNAME;
|
||||||
|
@ -137,7 +137,7 @@ class S3Service implements Closeable {
|
||||||
endpoint = clientSettings.protocol.toString() + "://" + endpoint;
|
endpoint = clientSettings.protocol.toString() + "://" + endpoint;
|
||||||
}
|
}
|
||||||
final String region = Strings.hasLength(clientSettings.region) ? clientSettings.region : null;
|
final String region = Strings.hasLength(clientSettings.region) ? clientSettings.region : null;
|
||||||
logger.debug("using endpoint [{}] and region [{}]", endpoint, region);
|
LOGGER.debug("using endpoint [{}] and region [{}]", endpoint, region);
|
||||||
|
|
||||||
// If the endpoint configuration isn't set on the builder then the default behaviour is to try
|
// If the endpoint configuration isn't set on the builder then the default behaviour is to try
|
||||||
// and work out what region we are in and use an appropriate endpoint - see AwsClientBuilder#setRegion.
|
// and work out what region we are in and use an appropriate endpoint - see AwsClientBuilder#setRegion.
|
||||||
|
|
|
@ -217,8 +217,8 @@ public class NioHttpRequest implements HttpRequest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public NioHttpResponse createResponse(RestStatus status, BytesReference content) {
|
public NioHttpResponse createResponse(RestStatus status, BytesReference contentRef) {
|
||||||
return new NioHttpResponse(request.headers(), request.protocolVersion(), status, content);
|
return new NioHttpResponse(request.headers(), request.protocolVersion(), status, contentRef);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -196,10 +196,10 @@ public class HttpReadWriteHandlerTests extends ESTestCase {
|
||||||
HttpHandlingSettings httpHandlingSettings = HttpHandlingSettings.fromSettings(settings);
|
HttpHandlingSettings httpHandlingSettings = HttpHandlingSettings.fromSettings(settings);
|
||||||
|
|
||||||
CorsHandler corsHandler = CorsHandler.disabled();
|
CorsHandler corsHandler = CorsHandler.disabled();
|
||||||
TaskScheduler taskScheduler = new TaskScheduler();
|
TaskScheduler realScheduler = new TaskScheduler();
|
||||||
|
|
||||||
Iterator<Integer> timeValues = Arrays.asList(0, 2, 4, 6, 8).iterator();
|
Iterator<Integer> timeValues = Arrays.asList(0, 2, 4, 6, 8).iterator();
|
||||||
handler = new HttpReadWriteHandler(channel, transport, httpHandlingSettings, taskScheduler, timeValues::next);
|
handler = new HttpReadWriteHandler(channel, transport, httpHandlingSettings, realScheduler, timeValues::next);
|
||||||
handler.channelActive();
|
handler.channelActive();
|
||||||
|
|
||||||
prepareHandlerForResponse(handler);
|
prepareHandlerForResponse(handler);
|
||||||
|
@ -207,31 +207,31 @@ public class HttpReadWriteHandlerTests extends ESTestCase {
|
||||||
HttpWriteOperation writeOperation0 = new HttpWriteOperation(context, emptyGetResponse(0), mock(BiConsumer.class));
|
HttpWriteOperation writeOperation0 = new HttpWriteOperation(context, emptyGetResponse(0), mock(BiConsumer.class));
|
||||||
((ChannelPromise) handler.writeToBytes(writeOperation0).get(0).getListener()).setSuccess();
|
((ChannelPromise) handler.writeToBytes(writeOperation0).get(0).getListener()).setSuccess();
|
||||||
|
|
||||||
taskScheduler.pollTask(timeValue.getNanos() + 1).run();
|
realScheduler.pollTask(timeValue.getNanos() + 1).run();
|
||||||
// There was a read. Do not close.
|
// There was a read. Do not close.
|
||||||
verify(transport, times(0)).onException(eq(channel), any(HttpReadTimeoutException.class));
|
verify(transport, times(0)).onException(eq(channel), any(HttpReadTimeoutException.class));
|
||||||
|
|
||||||
prepareHandlerForResponse(handler);
|
prepareHandlerForResponse(handler);
|
||||||
prepareHandlerForResponse(handler);
|
prepareHandlerForResponse(handler);
|
||||||
|
|
||||||
taskScheduler.pollTask(timeValue.getNanos() + 3).run();
|
realScheduler.pollTask(timeValue.getNanos() + 3).run();
|
||||||
// There was a read. Do not close.
|
// There was a read. Do not close.
|
||||||
verify(transport, times(0)).onException(eq(channel), any(HttpReadTimeoutException.class));
|
verify(transport, times(0)).onException(eq(channel), any(HttpReadTimeoutException.class));
|
||||||
|
|
||||||
HttpWriteOperation writeOperation1 = new HttpWriteOperation(context, emptyGetResponse(1), mock(BiConsumer.class));
|
HttpWriteOperation writeOperation1 = new HttpWriteOperation(context, emptyGetResponse(1), mock(BiConsumer.class));
|
||||||
((ChannelPromise) handler.writeToBytes(writeOperation1).get(0).getListener()).setSuccess();
|
((ChannelPromise) handler.writeToBytes(writeOperation1).get(0).getListener()).setSuccess();
|
||||||
|
|
||||||
taskScheduler.pollTask(timeValue.getNanos() + 5).run();
|
realScheduler.pollTask(timeValue.getNanos() + 5).run();
|
||||||
// There has not been a read, however there is still an inflight request. Do not close.
|
// There has not been a read, however there is still an inflight request. Do not close.
|
||||||
verify(transport, times(0)).onException(eq(channel), any(HttpReadTimeoutException.class));
|
verify(transport, times(0)).onException(eq(channel), any(HttpReadTimeoutException.class));
|
||||||
|
|
||||||
HttpWriteOperation writeOperation2 = new HttpWriteOperation(context, emptyGetResponse(2), mock(BiConsumer.class));
|
HttpWriteOperation writeOperation2 = new HttpWriteOperation(context, emptyGetResponse(2), mock(BiConsumer.class));
|
||||||
((ChannelPromise) handler.writeToBytes(writeOperation2).get(0).getListener()).setSuccess();
|
((ChannelPromise) handler.writeToBytes(writeOperation2).get(0).getListener()).setSuccess();
|
||||||
|
|
||||||
taskScheduler.pollTask(timeValue.getNanos() + 7).run();
|
realScheduler.pollTask(timeValue.getNanos() + 7).run();
|
||||||
// No reads and no inflight requests, close
|
// No reads and no inflight requests, close
|
||||||
verify(transport, times(1)).onException(eq(channel), any(HttpReadTimeoutException.class));
|
verify(transport, times(1)).onException(eq(channel), any(HttpReadTimeoutException.class));
|
||||||
assertNull(taskScheduler.pollTask(timeValue.getNanos() + 9));
|
assertNull(realScheduler.pollTask(timeValue.getNanos() + 9));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static HttpPipelinedResponse emptyGetResponse(int sequence) {
|
private static HttpPipelinedResponse emptyGetResponse(int sequence) {
|
||||||
|
@ -242,7 +242,7 @@ public class HttpReadWriteHandlerTests extends ESTestCase {
|
||||||
return httpResponse;
|
return httpResponse;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void prepareHandlerForResponse(HttpReadWriteHandler handler) throws IOException {
|
private void prepareHandlerForResponse(HttpReadWriteHandler readWriteHandler) throws IOException {
|
||||||
HttpMethod method = randomBoolean() ? HttpMethod.GET : HttpMethod.HEAD;
|
HttpMethod method = randomBoolean() ? HttpMethod.GET : HttpMethod.HEAD;
|
||||||
HttpVersion version = randomBoolean() ? HttpVersion.HTTP_1_0 : HttpVersion.HTTP_1_1;
|
HttpVersion version = randomBoolean() ? HttpVersion.HTTP_1_0 : HttpVersion.HTTP_1_1;
|
||||||
String uri = "http://localhost:9090/" + randomAlphaOfLength(8);
|
String uri = "http://localhost:9090/" + randomAlphaOfLength(8);
|
||||||
|
@ -250,7 +250,7 @@ public class HttpReadWriteHandlerTests extends ESTestCase {
|
||||||
io.netty.handler.codec.http.HttpRequest request = new DefaultFullHttpRequest(version, method, uri);
|
io.netty.handler.codec.http.HttpRequest request = new DefaultFullHttpRequest(version, method, uri);
|
||||||
ByteBuf buf = requestEncoder.encode(request);
|
ByteBuf buf = requestEncoder.encode(request);
|
||||||
try {
|
try {
|
||||||
handler.consumeReads(toChannelBuffer(buf));
|
readWriteHandler.consumeReads(toChannelBuffer(buf));
|
||||||
} finally {
|
} finally {
|
||||||
buf.release();
|
buf.release();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue