Forbid changing thread pool types

This commit forbids the changing of thread pool types for any thread
pool. The motivation here is that these are expert settings with
little practical advantage.

Closes #14294, relates #2509, relates #2858, relates #5152
This commit is contained in:
Jason Tedor 2015-10-28 10:16:54 -04:00
parent c44fe5c907
commit e3b8dc7121
11 changed files with 687 additions and 430 deletions

View file

@ -177,7 +177,7 @@ public class ClusterModule extends AbstractModule {
registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT, Validator.TIME_NON_NEGATIVE); registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT, Validator.TIME_NON_NEGATIVE);
registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT, Validator.TIME_NON_NEGATIVE); registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT, Validator.TIME_NON_NEGATIVE);
registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_MAX_SIZE_PER_SEC, Validator.BYTES_SIZE); registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_MAX_SIZE_PER_SEC, Validator.BYTES_SIZE);
registerClusterDynamicSetting(ThreadPool.THREADPOOL_GROUP + "*", Validator.EMPTY); registerClusterDynamicSetting(ThreadPool.THREADPOOL_GROUP + "*", ThreadPool.THREAD_POOL_TYPE_SETTINGS_VALIDATOR);
registerClusterDynamicSetting(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES, Validator.INTEGER); registerClusterDynamicSetting(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES, Validator.INTEGER);
registerClusterDynamicSetting(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES, Validator.INTEGER); registerClusterDynamicSetting(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES, Validator.INTEGER);
registerClusterDynamicSetting(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, Validator.EMPTY); registerClusterDynamicSetting(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, Validator.EMPTY);

View file

@ -288,7 +288,7 @@ public class RestThreadPoolAction extends AbstractCatAction {
} }
} }
table.addCell(poolInfo == null ? null : poolInfo.getType()); table.addCell(poolInfo == null ? null : poolInfo.getThreadPoolType());
table.addCell(poolStats == null ? null : poolStats.getActive()); table.addCell(poolStats == null ? null : poolStats.getActive());
table.addCell(poolStats == null ? null : poolStats.getThreads()); table.addCell(poolStats == null ? null : poolStats.getThreads());
table.addCell(poolStats == null ? null : poolStats.getQueue()); table.addCell(poolStats == null ? null : poolStats.getQueue());

View file

@ -20,6 +20,8 @@
package org.elasticsearch.threadpool; package org.elasticsearch.threadpool;
import org.apache.lucene.util.Counter; import org.apache.lucene.util.Counter;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.settings.Validator;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
@ -39,22 +41,11 @@ import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.node.settings.NodeSettingsService;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.*;
import java.util.HashMap; import java.util.concurrent.*;
import java.util.List; import java.util.function.Function;
import java.util.Map; import java.util.regex.Matcher;
import java.util.Objects; import java.util.regex.Pattern;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static java.util.Collections.unmodifiableMap; import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.common.settings.Settings.settingsBuilder;
@ -86,6 +77,101 @@ public class ThreadPool extends AbstractComponent {
public static final String FETCH_SHARD_STORE = "fetch_shard_store"; public static final String FETCH_SHARD_STORE = "fetch_shard_store";
} }
public enum ThreadPoolType {
CACHED("cached"),
DIRECT("direct"),
FIXED("fixed"),
SCALING("scaling");
private final String type;
public String getType() {
return type;
}
ThreadPoolType(String type) {
this.type = type;
}
private final static Map<String, ThreadPoolType> TYPE_MAP;
static {
Map<String, ThreadPoolType> typeMap = new HashMap<>();
for (ThreadPoolType threadPoolType : ThreadPoolType.values()) {
typeMap.put(threadPoolType.getType(), threadPoolType);
}
TYPE_MAP = Collections.unmodifiableMap(typeMap);
}
public static ThreadPoolType fromType(String type) {
ThreadPoolType threadPoolType = TYPE_MAP.get(type);
if (threadPoolType == null) {
throw new IllegalArgumentException("no ThreadPoolType for " + type);
}
return threadPoolType;
}
}
public static Map<String, ThreadPoolType> THREAD_POOL_TYPES;
static {
HashMap<String, ThreadPoolType> map = new HashMap<>();
map.put(Names.SAME, ThreadPoolType.DIRECT);
map.put(Names.GENERIC, ThreadPoolType.CACHED);
map.put(Names.LISTENER, ThreadPoolType.FIXED);
map.put(Names.GET, ThreadPoolType.FIXED);
map.put(Names.INDEX, ThreadPoolType.FIXED);
map.put(Names.BULK, ThreadPoolType.FIXED);
map.put(Names.SEARCH, ThreadPoolType.FIXED);
map.put(Names.SUGGEST, ThreadPoolType.FIXED);
map.put(Names.PERCOLATE, ThreadPoolType.FIXED);
map.put(Names.MANAGEMENT, ThreadPoolType.SCALING);
map.put(Names.FLUSH, ThreadPoolType.SCALING);
map.put(Names.REFRESH, ThreadPoolType.SCALING);
map.put(Names.WARMER, ThreadPoolType.SCALING);
map.put(Names.SNAPSHOT, ThreadPoolType.SCALING);
map.put(Names.FORCE_MERGE, ThreadPoolType.FIXED);
map.put(Names.FETCH_SHARD_STARTED, ThreadPoolType.SCALING);
map.put(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING);
THREAD_POOL_TYPES = Collections.unmodifiableMap(map);
}
private static void add(Map<String, Settings> executorSettings, ExecutorSettingsBuilder builder) {
Settings settings = builder.build();
String name = settings.get("name");
executorSettings.put(name, settings);
}
private static class ExecutorSettingsBuilder {
Map<String, String> settings = new HashMap<>();
public ExecutorSettingsBuilder(String name) {
settings.put("name", name);
settings.put("type", THREAD_POOL_TYPES.get(name).getType());
}
public ExecutorSettingsBuilder size(int availableProcessors) {
return add("size", Integer.toString(availableProcessors));
}
public ExecutorSettingsBuilder queueSize(int queueSize) {
return add("queue_size", Integer.toString(queueSize));
}
public ExecutorSettingsBuilder keepAlive(String keepAlive) {
return add("keep_alive", keepAlive);
}
private ExecutorSettingsBuilder add(String key, String value) {
settings.put(key, value);
return this;
}
public Settings build() {
return settingsBuilder().put(settings).build();
}
}
public static final String THREADPOOL_GROUP = "threadpool."; public static final String THREADPOOL_GROUP = "threadpool.";
private volatile Map<String, ExecutorHolder> executors; private volatile Map<String, ExecutorHolder> executors;
@ -102,7 +188,6 @@ public class ThreadPool extends AbstractComponent {
static final Executor DIRECT_EXECUTOR = command -> command.run(); static final Executor DIRECT_EXECUTOR = command -> command.run();
public ThreadPool(String name) { public ThreadPool(String name) {
this(Settings.builder().put("name", name).build()); this(Settings.builder().put("name", name).build());
} }
@ -112,42 +197,31 @@ public class ThreadPool extends AbstractComponent {
assert settings.get("name") != null : "ThreadPool's settings should contain a name"; assert settings.get("name") != null : "ThreadPool's settings should contain a name";
Map<String, Settings> groupSettings = settings.getGroups(THREADPOOL_GROUP); Map<String, Settings> groupSettings = getThreadPoolSettingsGroup(settings);
int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
int halfProcMaxAt5 = Math.min(((availableProcessors + 1) / 2), 5); int halfProcMaxAt5 = Math.min(((availableProcessors + 1) / 2), 5);
int halfProcMaxAt10 = Math.min(((availableProcessors + 1) / 2), 10); int halfProcMaxAt10 = Math.min(((availableProcessors + 1) / 2), 10);
Map<String, Settings> defaultExecutorTypeSettings = new HashMap<>(); Map<String, Settings> defaultExecutorTypeSettings = new HashMap<>();
defaultExecutorTypeSettings.put(Names.GENERIC, settingsBuilder().put("type", "cached").put("keep_alive", "30s").build()); add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.GENERIC).keepAlive("30s"));
defaultExecutorTypeSettings.put(Names.INDEX, add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.INDEX).size(availableProcessors).queueSize(200));
settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 200).build()); add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.BULK).size(availableProcessors).queueSize(50));
defaultExecutorTypeSettings.put(Names.BULK, add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.GET).size(availableProcessors).queueSize(1000));
settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 50).build()); add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.SEARCH).size(((availableProcessors * 3) / 2) + 1).queueSize(1000));
defaultExecutorTypeSettings.put(Names.GET, add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.SUGGEST).size(availableProcessors).queueSize(1000));
settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 1000).build()); add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.PERCOLATE).size(availableProcessors).queueSize(1000));
defaultExecutorTypeSettings.put(Names.SEARCH, add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.MANAGEMENT).size(5).keepAlive("5m"));
settingsBuilder().put("type", "fixed").put("size", ((availableProcessors * 3) / 2) + 1).put("queue_size", 1000).build());
defaultExecutorTypeSettings.put(Names.SUGGEST,
settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 1000).build());
defaultExecutorTypeSettings.put(Names.PERCOLATE,
settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 1000).build());
defaultExecutorTypeSettings .put(Names.MANAGEMENT, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", 5).build());
// no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded // no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded
// the assumption here is that the listeners should be very lightweight on the listeners side // the assumption here is that the listeners should be very lightweight on the listeners side
defaultExecutorTypeSettings.put(Names.LISTENER, settingsBuilder().put("type", "fixed").put("size", halfProcMaxAt10).build()); add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.LISTENER).size(halfProcMaxAt10));
defaultExecutorTypeSettings.put(Names.FLUSH, add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FLUSH).size(halfProcMaxAt5).keepAlive("5m"));
settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build()); add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.REFRESH).size(halfProcMaxAt10).keepAlive("5m"));
defaultExecutorTypeSettings.put(Names.REFRESH, add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.WARMER).size(halfProcMaxAt5).keepAlive("5m"));
settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt10).build()); add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.SNAPSHOT).size(halfProcMaxAt5).keepAlive("5m"));
defaultExecutorTypeSettings.put(Names.WARMER, add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FORCE_MERGE).size(1));
settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build()); add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FETCH_SHARD_STARTED).size(availableProcessors * 2).keepAlive("5m"));
defaultExecutorTypeSettings.put(Names.SNAPSHOT, add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FETCH_SHARD_STORE).size(availableProcessors * 2).keepAlive("5m"));
settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build());
defaultExecutorTypeSettings.put(Names.FORCE_MERGE, settingsBuilder().put("type", "fixed").put("size", 1).build());
defaultExecutorTypeSettings.put(Names.FETCH_SHARD_STARTED,
settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", availableProcessors * 2).build());
defaultExecutorTypeSettings.put(Names.FETCH_SHARD_STORE,
settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", availableProcessors * 2).build());
this.defaultExecutorTypeSettings = unmodifiableMap(defaultExecutorTypeSettings); this.defaultExecutorTypeSettings = unmodifiableMap(defaultExecutorTypeSettings);
Map<String, ExecutorHolder> executors = new HashMap<>(); Map<String, ExecutorHolder> executors = new HashMap<>();
@ -163,8 +237,8 @@ public class ThreadPool extends AbstractComponent {
executors.put(entry.getKey(), build(entry.getKey(), entry.getValue(), Settings.EMPTY)); executors.put(entry.getKey(), build(entry.getKey(), entry.getValue(), Settings.EMPTY));
} }
executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, "same"))); executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, ThreadPoolType.DIRECT)));
if (!executors.get(Names.GENERIC).info.getType().equals("cached")) { if (!executors.get(Names.GENERIC).info.getThreadPoolType().equals(ThreadPoolType.CACHED)) {
throw new IllegalArgumentException("generic thread pool must be of type cached"); throw new IllegalArgumentException("generic thread pool must be of type cached");
} }
this.executors = unmodifiableMap(executors); this.executors = unmodifiableMap(executors);
@ -178,6 +252,12 @@ public class ThreadPool extends AbstractComponent {
this.estimatedTimeThread.start(); this.estimatedTimeThread.start();
} }
private Map<String, Settings> getThreadPoolSettingsGroup(Settings settings) {
Map<String, Settings> groupSettings = settings.getGroups(THREADPOOL_GROUP);
validate(groupSettings);
return groupSettings;
}
public void setNodeSettingsService(NodeSettingsService nodeSettingsService) { public void setNodeSettingsService(NodeSettingsService nodeSettingsService) {
if(settingsListenerIsSet) { if(settingsListenerIsSet) {
throw new IllegalStateException("the node settings listener was set more then once"); throw new IllegalStateException("the node settings listener was set more then once");
@ -326,24 +406,28 @@ public class ThreadPool extends AbstractComponent {
settings = Settings.Builder.EMPTY_SETTINGS; settings = Settings.Builder.EMPTY_SETTINGS;
} }
Info previousInfo = previousExecutorHolder != null ? previousExecutorHolder.info : null; Info previousInfo = previousExecutorHolder != null ? previousExecutorHolder.info : null;
String type = settings.get("type", previousInfo != null ? previousInfo.getType() : defaultSettings.get("type")); String type = settings.get("type", previousInfo != null ? previousInfo.getThreadPoolType().getType() : defaultSettings.get("type"));
ThreadPoolType threadPoolType = ThreadPoolType.fromType(type);
ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(this.settings, name); ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(this.settings, name);
if ("same".equals(type)) { if (ThreadPoolType.DIRECT == threadPoolType) {
if (previousExecutorHolder != null) { if (previousExecutorHolder != null) {
logger.debug("updating thread_pool [{}], type [{}]", name, type); logger.debug("updating thread_pool [{}], type [{}]", name, type);
} else { } else {
logger.debug("creating thread_pool [{}], type [{}]", name, type); logger.debug("creating thread_pool [{}], type [{}]", name, type);
} }
return new ExecutorHolder(DIRECT_EXECUTOR, new Info(name, type)); return new ExecutorHolder(DIRECT_EXECUTOR, new Info(name, threadPoolType));
} else if ("cached".equals(type)) { } else if (ThreadPoolType.CACHED == threadPoolType) {
if (!Names.GENERIC.equals(name)) {
throw new IllegalArgumentException("thread pool type cached is reserved only for the generic thread pool and can not be applied to [" + name + "]");
}
TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5)); TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5));
if (previousExecutorHolder != null) { if (previousExecutorHolder != null) {
if ("cached".equals(previousInfo.getType())) { if (ThreadPoolType.CACHED == previousInfo.getThreadPoolType()) {
TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive()); TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive());
if (!previousInfo.getKeepAlive().equals(updatedKeepAlive)) { if (!previousInfo.getKeepAlive().equals(updatedKeepAlive)) {
logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, updatedKeepAlive); logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, updatedKeepAlive);
((EsThreadPoolExecutor) previousExecutorHolder.executor()).setKeepAliveTime(updatedKeepAlive.millis(), TimeUnit.MILLISECONDS); ((EsThreadPoolExecutor) previousExecutorHolder.executor()).setKeepAliveTime(updatedKeepAlive.millis(), TimeUnit.MILLISECONDS);
return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, type, -1, -1, updatedKeepAlive, null)); return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, threadPoolType, -1, -1, updatedKeepAlive, null));
} }
return previousExecutorHolder; return previousExecutorHolder;
} }
@ -358,13 +442,13 @@ public class ThreadPool extends AbstractComponent {
logger.debug("creating thread_pool [{}], type [{}], keep_alive [{}]", name, type, keepAlive); logger.debug("creating thread_pool [{}], type [{}], keep_alive [{}]", name, type, keepAlive);
} }
Executor executor = EsExecutors.newCached(name, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory); Executor executor = EsExecutors.newCached(name, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory);
return new ExecutorHolder(executor, new Info(name, type, -1, -1, keepAlive, null)); return new ExecutorHolder(executor, new Info(name, threadPoolType, -1, -1, keepAlive, null));
} else if ("fixed".equals(type)) { } else if (ThreadPoolType.FIXED == threadPoolType) {
int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings)); int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings));
SizeValue defaultQueueSize = getAsSizeOrUnbounded(defaultSettings, "queue", getAsSizeOrUnbounded(defaultSettings, "queue_size", null)); SizeValue defaultQueueSize = getAsSizeOrUnbounded(defaultSettings, "queue", getAsSizeOrUnbounded(defaultSettings, "queue_size", null));
if (previousExecutorHolder != null) { if (previousExecutorHolder != null) {
if ("fixed".equals(previousInfo.getType())) { if (ThreadPoolType.FIXED == previousInfo.getThreadPoolType()) {
SizeValue updatedQueueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", previousInfo.getQueueSize()))); SizeValue updatedQueueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", previousInfo.getQueueSize())));
if (Objects.equals(previousInfo.getQueueSize(), updatedQueueSize)) { if (Objects.equals(previousInfo.getQueueSize(), updatedQueueSize)) {
int updatedSize = settings.getAsInt("size", previousInfo.getMax()); int updatedSize = settings.getAsInt("size", previousInfo.getMax());
@ -378,7 +462,7 @@ public class ThreadPool extends AbstractComponent {
((EsThreadPoolExecutor) previousExecutorHolder.executor()).setCorePoolSize(updatedSize); ((EsThreadPoolExecutor) previousExecutorHolder.executor()).setCorePoolSize(updatedSize);
((EsThreadPoolExecutor) previousExecutorHolder.executor()).setMaximumPoolSize(updatedSize); ((EsThreadPoolExecutor) previousExecutorHolder.executor()).setMaximumPoolSize(updatedSize);
} }
return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, type, updatedSize, updatedSize, null, updatedQueueSize)); return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, threadPoolType, updatedSize, updatedSize, null, updatedQueueSize));
} }
return previousExecutorHolder; return previousExecutorHolder;
} }
@ -393,13 +477,13 @@ public class ThreadPool extends AbstractComponent {
SizeValue queueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", defaultQueueSize))); SizeValue queueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", defaultQueueSize)));
logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}]", name, type, size, queueSize); logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}]", name, type, size, queueSize);
Executor executor = EsExecutors.newFixed(name, size, queueSize == null ? -1 : (int) queueSize.singles(), threadFactory); Executor executor = EsExecutors.newFixed(name, size, queueSize == null ? -1 : (int) queueSize.singles(), threadFactory);
return new ExecutorHolder(executor, new Info(name, type, size, size, null, queueSize)); return new ExecutorHolder(executor, new Info(name, threadPoolType, size, size, null, queueSize));
} else if ("scaling".equals(type)) { } else if (ThreadPoolType.SCALING == threadPoolType) {
TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5)); TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5));
int defaultMin = defaultSettings.getAsInt("min", 1); int defaultMin = defaultSettings.getAsInt("min", 1);
int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings)); int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings));
if (previousExecutorHolder != null) { if (previousExecutorHolder != null) {
if ("scaling".equals(previousInfo.getType())) { if (ThreadPoolType.SCALING == previousInfo.getThreadPoolType()) {
TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive()); TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive());
int updatedMin = settings.getAsInt("min", previousInfo.getMin()); int updatedMin = settings.getAsInt("min", previousInfo.getMin());
int updatedSize = settings.getAsInt("max", settings.getAsInt("size", previousInfo.getMax())); int updatedSize = settings.getAsInt("max", settings.getAsInt("size", previousInfo.getMax()));
@ -414,7 +498,7 @@ public class ThreadPool extends AbstractComponent {
if (previousInfo.getMax() != updatedSize) { if (previousInfo.getMax() != updatedSize) {
((EsThreadPoolExecutor) previousExecutorHolder.executor()).setMaximumPoolSize(updatedSize); ((EsThreadPoolExecutor) previousExecutorHolder.executor()).setMaximumPoolSize(updatedSize);
} }
return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, type, updatedMin, updatedSize, updatedKeepAlive, null)); return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, threadPoolType, updatedMin, updatedSize, updatedKeepAlive, null));
} }
return previousExecutorHolder; return previousExecutorHolder;
} }
@ -437,13 +521,13 @@ public class ThreadPool extends AbstractComponent {
logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive); logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive);
} }
Executor executor = EsExecutors.newScaling(name, min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory); Executor executor = EsExecutors.newScaling(name, min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory);
return new ExecutorHolder(executor, new Info(name, type, min, size, keepAlive, null)); return new ExecutorHolder(executor, new Info(name, threadPoolType, min, size, keepAlive, null));
} }
throw new IllegalArgumentException("No type found [" + type + "], for [" + name + "]"); throw new IllegalArgumentException("No type found [" + type + "], for [" + name + "]");
} }
public void updateSettings(Settings settings) { public void updateSettings(Settings settings) {
Map<String, Settings> groupSettings = settings.getGroups("threadpool"); Map<String, Settings> groupSettings = getThreadPoolSettingsGroup(settings);
if (groupSettings.isEmpty()) { if (groupSettings.isEmpty()) {
return; return;
} }
@ -490,6 +574,20 @@ public class ThreadPool extends AbstractComponent {
} }
} }
private void validate(Map<String, Settings> groupSettings) {
for (String key : groupSettings.keySet()) {
if (!THREAD_POOL_TYPES.containsKey(key)) {
continue;
}
String type = groupSettings.get(key).get("type");
ThreadPoolType correctThreadPoolType = THREAD_POOL_TYPES.get(key);
// TODO: the type equality check can be removed after #3760/#6732 are addressed
if (type != null && !correctThreadPoolType.getType().equals(type)) {
throw new IllegalArgumentException("setting " + THREADPOOL_GROUP + key + ".type to " + type + " is not permitted; must be " + correctThreadPoolType.getType());
}
}
}
/** /**
* A thread pool size can also be unbounded and is represented by -1, which is not supported by SizeValue (which only supports positive numbers) * A thread pool size can also be unbounded and is represented by -1, which is not supported by SizeValue (which only supports positive numbers)
*/ */
@ -643,7 +741,7 @@ public class ThreadPool extends AbstractComponent {
public static class Info implements Streamable, ToXContent { public static class Info implements Streamable, ToXContent {
private String name; private String name;
private String type; private ThreadPoolType type;
private int min; private int min;
private int max; private int max;
private TimeValue keepAlive; private TimeValue keepAlive;
@ -653,15 +751,15 @@ public class ThreadPool extends AbstractComponent {
} }
public Info(String name, String type) { public Info(String name, ThreadPoolType type) {
this(name, type, -1); this(name, type, -1);
} }
public Info(String name, String type, int size) { public Info(String name, ThreadPoolType type, int size) {
this(name, type, size, size, null, null); this(name, type, size, size, null, null);
} }
public Info(String name, String type, int min, int max, @Nullable TimeValue keepAlive, @Nullable SizeValue queueSize) { public Info(String name, ThreadPoolType type, int min, int max, @Nullable TimeValue keepAlive, @Nullable SizeValue queueSize) {
this.name = name; this.name = name;
this.type = type; this.type = type;
this.min = min; this.min = min;
@ -674,7 +772,7 @@ public class ThreadPool extends AbstractComponent {
return this.name; return this.name;
} }
public String getType() { public ThreadPoolType getThreadPoolType() {
return this.type; return this.type;
} }
@ -699,7 +797,7 @@ public class ThreadPool extends AbstractComponent {
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
name = in.readString(); name = in.readString();
type = in.readString(); type = ThreadPoolType.fromType(in.readString());
min = in.readInt(); min = in.readInt();
max = in.readInt(); max = in.readInt();
if (in.readBoolean()) { if (in.readBoolean()) {
@ -716,7 +814,7 @@ public class ThreadPool extends AbstractComponent {
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
out.writeString(name); out.writeString(name);
out.writeString(type); out.writeString(type.getType());
out.writeInt(min); out.writeInt(min);
out.writeInt(max); out.writeInt(max);
if (keepAlive == null) { if (keepAlive == null) {
@ -739,7 +837,7 @@ public class ThreadPool extends AbstractComponent {
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(name, XContentBuilder.FieldCaseConversion.NONE); builder.startObject(name, XContentBuilder.FieldCaseConversion.NONE);
builder.field(Fields.TYPE, type); builder.field(Fields.TYPE, type.getType());
if (min != -1) { if (min != -1) {
builder.field(Fields.MIN, min); builder.field(Fields.MIN, min);
} }
@ -814,4 +912,37 @@ public class ThreadPool extends AbstractComponent {
return false; return false;
} }
public static ThreadPoolTypeSettingsValidator THREAD_POOL_TYPE_SETTINGS_VALIDATOR = new ThreadPoolTypeSettingsValidator();
private static class ThreadPoolTypeSettingsValidator implements Validator {
@Override
public String validate(String setting, String value, ClusterState clusterState) {
// TODO: the type equality validation can be removed after #3760/#6732 are addressed
Matcher matcher = Pattern.compile("threadpool\\.(.*)\\.type").matcher(setting);
if (!matcher.matches()) {
return null;
} else {
String threadPool = matcher.group(1);
ThreadPool.ThreadPoolType defaultThreadPoolType = ThreadPool.THREAD_POOL_TYPES.get(threadPool);
ThreadPool.ThreadPoolType threadPoolType;
try {
threadPoolType = ThreadPool.ThreadPoolType.fromType(value);
} catch (IllegalArgumentException e) {
return e.getMessage();
}
if (defaultThreadPoolType.equals(threadPoolType)) {
return null;
} else {
return String.format(
Locale.ROOT,
"thread pool type for [%s] can only be updated to [%s] but was [%s]",
threadPool,
defaultThreadPoolType.getType(),
threadPoolType.getType()
);
}
}
}
}
} }

View file

@ -37,7 +37,6 @@ public class SearchWithRejectionsIT extends ESIntegTestCase {
@Override @Override
public Settings nodeSettings(int nodeOrdinal) { public Settings nodeSettings(int nodeOrdinal) {
return settingsBuilder().put(super.nodeSettings(nodeOrdinal)) return settingsBuilder().put(super.nodeSettings(nodeOrdinal))
.put("threadpool.search.type", "fixed")
.put("threadpool.search.size", 1) .put("threadpool.search.size", 1)
.put("threadpool.search.queue_size", 1) .put("threadpool.search.queue_size", 1)
.build(); .build();

View file

@ -46,20 +46,13 @@ import java.lang.management.ThreadMXBean;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.*;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.sameInstance;
/** /**
*/ */
@ -67,7 +60,7 @@ import static org.hamcrest.Matchers.sameInstance;
public class SimpleThreadPoolIT extends ESIntegTestCase { public class SimpleThreadPoolIT extends ESIntegTestCase {
@Override @Override
protected Settings nodeSettings(int nodeOrdinal) { protected Settings nodeSettings(int nodeOrdinal) {
return Settings.settingsBuilder().put(super.nodeSettings(nodeOrdinal)).put("threadpool.search.type", "cached").build(); return Settings.settingsBuilder().build();
} }
public void testThreadNames() throws Exception { public void testThreadNames() throws Exception {
@ -130,26 +123,23 @@ public class SimpleThreadPoolIT extends ESIntegTestCase {
internalCluster().startNodesAsync(2).get(); internalCluster().startNodesAsync(2).get();
ThreadPool threadPool = internalCluster().getDataNodeInstance(ThreadPool.class); ThreadPool threadPool = internalCluster().getDataNodeInstance(ThreadPool.class);
// Check that settings are changed // Check that settings are changed
assertThat(((ThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(5L)); assertThat(((ThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getQueue().remainingCapacity(), equalTo(1000));
client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.keep_alive", "10m").build()).execute().actionGet(); client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.queue_size", 2000).build()).execute().actionGet();
assertThat(((ThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L)); assertThat(((ThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getQueue().remainingCapacity(), equalTo(2000));
// Make sure that threads continue executing when executor is replaced // Make sure that threads continue executing when executor is replaced
final CyclicBarrier barrier = new CyclicBarrier(2); final CyclicBarrier barrier = new CyclicBarrier(2);
Executor oldExecutor = threadPool.executor(Names.SEARCH); Executor oldExecutor = threadPool.executor(Names.SEARCH);
threadPool.executor(Names.SEARCH).execute(new Runnable() { threadPool.executor(Names.SEARCH).execute(() -> {
@Override try {
public void run() { barrier.await();
try { } catch (InterruptedException ex) {
barrier.await(); Thread.currentThread().interrupt();
} catch (InterruptedException ex) { } catch (BrokenBarrierException ex) {
Thread.currentThread().interrupt(); //
} catch (BrokenBarrierException ex) { }
// });
} client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.queue_size", 1000).build()).execute().actionGet();
}
});
client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.type", "fixed").build()).execute().actionGet();
assertThat(threadPool.executor(Names.SEARCH), not(sameInstance(oldExecutor))); assertThat(threadPool.executor(Names.SEARCH), not(sameInstance(oldExecutor)));
assertThat(((ThreadPoolExecutor) oldExecutor).isShutdown(), equalTo(true)); assertThat(((ThreadPoolExecutor) oldExecutor).isShutdown(), equalTo(true));
assertThat(((ThreadPoolExecutor) oldExecutor).isTerminating(), equalTo(true)); assertThat(((ThreadPoolExecutor) oldExecutor).isTerminating(), equalTo(true));
@ -157,24 +147,19 @@ public class SimpleThreadPoolIT extends ESIntegTestCase {
barrier.await(10, TimeUnit.SECONDS); barrier.await(10, TimeUnit.SECONDS);
// Make sure that new thread executor is functional // Make sure that new thread executor is functional
threadPool.executor(Names.SEARCH).execute(new Runnable() { threadPool.executor(Names.SEARCH).execute(() -> {
@Override try {
public void run() { barrier.await();
try { } catch (InterruptedException ex) {
barrier.await(); Thread.currentThread().interrupt();
} catch (InterruptedException ex) { } catch (BrokenBarrierException ex) {
Thread.currentThread().interrupt(); //
} catch (BrokenBarrierException ex) { }
//
} }
} );
}); client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.queue_size", 500)).execute().actionGet();
client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.type", "fixed").build()).execute().actionGet();
barrier.await(10, TimeUnit.SECONDS); barrier.await(10, TimeUnit.SECONDS);
// This was here: Thread.sleep(200);
// Why? What was it for?
// Check that node info is correct // Check that node info is correct
NodesInfoResponse nodesInfoResponse = client().admin().cluster().prepareNodesInfo().all().execute().actionGet(); NodesInfoResponse nodesInfoResponse = client().admin().cluster().prepareNodesInfo().all().execute().actionGet();
for (int i = 0; i < 2; i++) { for (int i = 0; i < 2; i++) {
@ -182,7 +167,7 @@ public class SimpleThreadPoolIT extends ESIntegTestCase {
boolean found = false; boolean found = false;
for (ThreadPool.Info info : nodeInfo.getThreadPool()) { for (ThreadPool.Info info : nodeInfo.getThreadPool()) {
if (info.getName().equals(Names.SEARCH)) { if (info.getName().equals(Names.SEARCH)) {
assertThat(info.getType(), equalTo("fixed")); assertEquals(info.getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED);
found = true; found = true;
break; break;
} }

View file

@ -18,6 +18,7 @@
*/ */
package org.elasticsearch.threadpool; package org.elasticsearch.threadpool;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
@ -30,7 +31,9 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.io.IOException;
import java.util.Map; import java.util.Map;
import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.common.settings.Settings.settingsBuilder;
@ -44,9 +47,16 @@ import static org.hamcrest.Matchers.nullValue;
*/ */
public class ThreadPoolSerializationTests extends ESTestCase { public class ThreadPoolSerializationTests extends ESTestCase {
BytesStreamOutput output = new BytesStreamOutput(); BytesStreamOutput output = new BytesStreamOutput();
private ThreadPool.ThreadPoolType threadPoolType;
@Before
public void setUp() throws Exception {
super.setUp();
threadPoolType = randomFrom(ThreadPool.ThreadPoolType.values());
}
public void testThatQueueSizeSerializationWorks() throws Exception { public void testThatQueueSizeSerializationWorks() throws Exception {
ThreadPool.Info info = new ThreadPool.Info("foo", "search", 1, 10, TimeValue.timeValueMillis(3000), SizeValue.parseSizeValue("10k")); ThreadPool.Info info = new ThreadPool.Info("foo", threadPoolType, 1, 10, TimeValue.timeValueMillis(3000), SizeValue.parseSizeValue("10k"));
output.setVersion(Version.CURRENT); output.setVersion(Version.CURRENT);
info.writeTo(output); info.writeTo(output);
@ -58,7 +68,7 @@ public class ThreadPoolSerializationTests extends ESTestCase {
} }
public void testThatNegativeQueueSizesCanBeSerialized() throws Exception { public void testThatNegativeQueueSizesCanBeSerialized() throws Exception {
ThreadPool.Info info = new ThreadPool.Info("foo", "search", 1, 10, TimeValue.timeValueMillis(3000), null); ThreadPool.Info info = new ThreadPool.Info("foo", threadPoolType, 1, 10, TimeValue.timeValueMillis(3000), null);
output.setVersion(Version.CURRENT); output.setVersion(Version.CURRENT);
info.writeTo(output); info.writeTo(output);
@ -70,7 +80,7 @@ public class ThreadPoolSerializationTests extends ESTestCase {
} }
public void testThatToXContentWritesOutUnboundedCorrectly() throws Exception { public void testThatToXContentWritesOutUnboundedCorrectly() throws Exception {
ThreadPool.Info info = new ThreadPool.Info("foo", "search", 1, 10, TimeValue.timeValueMillis(3000), null); ThreadPool.Info info = new ThreadPool.Info("foo", threadPoolType, 1, 10, TimeValue.timeValueMillis(3000), null);
XContentBuilder builder = jsonBuilder(); XContentBuilder builder = jsonBuilder();
builder.startObject(); builder.startObject();
info.toXContent(builder, ToXContent.EMPTY_PARAMS); info.toXContent(builder, ToXContent.EMPTY_PARAMS);
@ -95,7 +105,7 @@ public class ThreadPoolSerializationTests extends ESTestCase {
} }
public void testThatToXContentWritesInteger() throws Exception { public void testThatToXContentWritesInteger() throws Exception {
ThreadPool.Info info = new ThreadPool.Info("foo", "search", 1, 10, TimeValue.timeValueMillis(3000), SizeValue.parseSizeValue("1k")); ThreadPool.Info info = new ThreadPool.Info("foo", threadPoolType, 1, 10, TimeValue.timeValueMillis(3000), SizeValue.parseSizeValue("1k"));
XContentBuilder builder = jsonBuilder(); XContentBuilder builder = jsonBuilder();
builder.startObject(); builder.startObject();
info.toXContent(builder, ToXContent.EMPTY_PARAMS); info.toXContent(builder, ToXContent.EMPTY_PARAMS);
@ -111,4 +121,16 @@ public class ThreadPoolSerializationTests extends ESTestCase {
assertThat(map, hasKey("queue_size")); assertThat(map, hasKey("queue_size"));
assertThat(map.get("queue_size").toString(), is("1000")); assertThat(map.get("queue_size").toString(), is("1000"));
} }
public void testThatThreadPoolTypeIsSerializedCorrectly() throws IOException {
ThreadPool.Info info = new ThreadPool.Info("foo", threadPoolType);
output.setVersion(Version.CURRENT);
info.writeTo(output);
StreamInput input = StreamInput.wrap(output.bytes());
ThreadPool.Info newInfo = new ThreadPool.Info();
newInfo.readFrom(input);
assertThat(newInfo.getThreadPoolType(), is(threadPoolType));
}
} }

View file

@ -0,0 +1,54 @@
package org.elasticsearch.threadpool;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.settings.Validator;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.util.*;
import static org.junit.Assert.*;
public class ThreadPoolTypeSettingsValidatorTests extends ESTestCase {
private Validator validator;
@Before
public void setUp() throws Exception {
super.setUp();
validator = ThreadPool.THREAD_POOL_TYPE_SETTINGS_VALIDATOR;
}
public void testValidThreadPoolTypeSettings() {
for (Map.Entry<String, ThreadPool.ThreadPoolType> entry : ThreadPool.THREAD_POOL_TYPES.entrySet()) {
assertNull(validateSetting(validator, entry.getKey(), entry.getValue().getType()));
}
}
public void testInvalidThreadPoolTypeSettings() {
for (Map.Entry<String, ThreadPool.ThreadPoolType> entry : ThreadPool.THREAD_POOL_TYPES.entrySet()) {
Set<ThreadPool.ThreadPoolType> set = new HashSet<>();
set.addAll(Arrays.asList(ThreadPool.ThreadPoolType.values()));
set.remove(entry.getValue());
ThreadPool.ThreadPoolType invalidThreadPoolType = randomFrom(set.toArray(new ThreadPool.ThreadPoolType[set.size()]));
String expectedMessage = String.format(
Locale.ROOT,
"thread pool type for [%s] can only be updated to [%s] but was [%s]",
entry.getKey(),
entry.getValue().getType(),
invalidThreadPoolType.getType());
String message = validateSetting(validator, entry.getKey(), invalidThreadPoolType.getType());
assertNotNull(message);
assertEquals(expectedMessage, message);
}
}
public void testNonThreadPoolTypeSetting() {
String setting = ThreadPool.THREADPOOL_GROUP + randomAsciiOfLength(10) + "foo";
String value = randomAsciiOfLength(10);
assertNull(validator.validate(setting, value, ClusterState.PROTO));
}
private String validateSetting(Validator validator, String threadPoolName, String value) {
return validator.validate(ThreadPool.THREADPOOL_GROUP + threadPoolName + ".type", value, ClusterState.PROTO);
}
}

View file

@ -25,22 +25,330 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.threadpool.ThreadPool.Names;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
/** /**
*/ */
public class UpdateThreadPoolSettingsTests extends ESTestCase { public class UpdateThreadPoolSettingsTests extends ESTestCase {
public void testCorrectThreadPoolTypePermittedInSettings() throws InterruptedException {
String threadPoolName = randomThreadPoolName();
ThreadPool.ThreadPoolType correctThreadPoolType = ThreadPool.THREAD_POOL_TYPES.get(threadPoolName);
ThreadPool threadPool = null;
try {
threadPool = new ThreadPool(settingsBuilder()
.put("name", "testCorrectThreadPoolTypePermittedInSettings")
.put("threadpool." + threadPoolName + ".type", correctThreadPoolType.getType())
.build());
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), correctThreadPoolType);
} finally {
terminateThreadPoolIfNeeded(threadPool);
}
}
public void testThreadPoolCanNotOverrideThreadPoolType() throws InterruptedException {
String threadPoolName = randomThreadPoolName();
ThreadPool.ThreadPoolType incorrectThreadPoolType = randomIncorrectThreadPoolType(threadPoolName);
ThreadPool.ThreadPoolType correctThreadPoolType = ThreadPool.THREAD_POOL_TYPES.get(threadPoolName);
ThreadPool threadPool = null;
try {
threadPool = new ThreadPool(
settingsBuilder()
.put("name", "testThreadPoolCanNotOverrideThreadPoolType")
.put("threadpool." + threadPoolName + ".type", incorrectThreadPoolType.getType())
.build());
terminate(threadPool);
fail("expected IllegalArgumentException");
} catch (IllegalArgumentException e) {
assertThat(
e.getMessage(),
is("setting threadpool." + threadPoolName + ".type to " + incorrectThreadPoolType.getType() + " is not permitted; must be " + correctThreadPoolType.getType()));
} finally {
terminateThreadPoolIfNeeded(threadPool);
}
}
public void testUpdateSettingsCanNotChangeThreadPoolType() throws InterruptedException {
String threadPoolName = randomThreadPoolName();
ThreadPool.ThreadPoolType invalidThreadPoolType = randomIncorrectThreadPoolType(threadPoolName);
ThreadPool.ThreadPoolType validThreadPoolType = ThreadPool.THREAD_POOL_TYPES.get(threadPoolName);
ThreadPool threadPool = null;
try {
threadPool = new ThreadPool(settingsBuilder().put("name", "testUpdateSettingsCanNotChangeThreadPoolType").build());
threadPool.updateSettings(
settingsBuilder()
.put("threadpool." + threadPoolName + ".type", invalidThreadPoolType.getType())
.build()
);
fail("expected IllegalArgumentException");
} catch (IllegalArgumentException e) {
assertThat(
e.getMessage(),
is("setting threadpool." + threadPoolName + ".type to " + invalidThreadPoolType.getType() + " is not permitted; must be " + validThreadPoolType.getType()));
} finally {
terminateThreadPoolIfNeeded(threadPool);
}
}
public void testCachedExecutorType() throws InterruptedException {
String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.CACHED);
ThreadPool threadPool = null;
try {
threadPool = new ThreadPool(
Settings.settingsBuilder()
.put("name", "testCachedExecutorType").build());
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.CACHED);
assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class));
threadPool.updateSettings(settingsBuilder()
.put("threadpool." + threadPoolName + ".keep_alive", "10m")
.build());
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.CACHED);
assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(0));
// Make sure keep alive value changed
assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(10L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L));
// Make sure keep alive value reused
assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(10L));
assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class));
// Change keep alive
Executor oldExecutor = threadPool.executor(threadPoolName);
threadPool.updateSettings(settingsBuilder().put("threadpool." + threadPoolName + ".keep_alive", "1m").build());
// Make sure keep alive value changed
assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(1L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L));
// Make sure executor didn't change
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.CACHED);
assertThat(threadPool.executor(threadPoolName), sameInstance(oldExecutor));
// Set the same keep alive
threadPool.updateSettings(settingsBuilder().put("threadpool." + threadPoolName + ".keep_alive", "1m").build());
// Make sure keep alive value didn't change
assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(1L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L));
// Make sure executor didn't change
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.CACHED);
assertThat(threadPool.executor(threadPoolName), sameInstance(oldExecutor));
} finally {
terminateThreadPoolIfNeeded(threadPool);
}
}
public void testFixedExecutorType() throws InterruptedException {
String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.FIXED);
ThreadPool threadPool = null;
try {
threadPool = new ThreadPool(settingsBuilder()
.put("name", "testCachedExecutorType").build());
assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class));
threadPool.updateSettings(settingsBuilder()
.put("threadpool." + threadPoolName + ".size", "15")
.build());
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED);
assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(15));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(15));
assertThat(info(threadPool, threadPoolName).getMin(), equalTo(15));
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(15));
// keep alive does not apply to fixed thread pools
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(0L));
// Put old type back
threadPool.updateSettings(Settings.EMPTY);
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED);
// Make sure keep alive value is not used
assertThat(info(threadPool, threadPoolName).getKeepAlive(), nullValue());
// Make sure keep pool size value were reused
assertThat(info(threadPool, threadPoolName).getMin(), equalTo(15));
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(15));
assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(15));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(15));
// Change size
Executor oldExecutor = threadPool.executor(threadPoolName);
threadPool.updateSettings(settingsBuilder().put("threadpool." + threadPoolName + ".size", "10").build());
// Make sure size values changed
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(10));
assertThat(info(threadPool, threadPoolName).getMin(), equalTo(10));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(10));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(10));
// Make sure executor didn't change
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED);
assertThat(threadPool.executor(threadPoolName), sameInstance(oldExecutor));
// Change queue capacity
threadPool.updateSettings(settingsBuilder()
.put("threadpool." + threadPoolName + ".queue", "500")
.build());
} finally {
terminateThreadPoolIfNeeded(threadPool);
}
}
public void testScalingExecutorType() throws InterruptedException {
String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.SCALING);
ThreadPool threadPool = null;
try {
threadPool = new ThreadPool(settingsBuilder()
.put("threadpool." + threadPoolName + ".size", 10)
.put("name", "testCachedExecutorType").build());
assertThat(info(threadPool, threadPoolName).getMin(), equalTo(1));
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(10));
assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(5L));
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.SCALING);
assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class));
// Change settings that doesn't require pool replacement
Executor oldExecutor = threadPool.executor(threadPoolName);
threadPool.updateSettings(settingsBuilder()
.put("threadpool." + threadPoolName + ".keep_alive", "10m")
.put("threadpool." + threadPoolName + ".min", "2")
.put("threadpool." + threadPoolName + ".size", "15")
.build());
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.SCALING);
assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(2));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(15));
assertThat(info(threadPool, threadPoolName).getMin(), equalTo(2));
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(15));
// Make sure keep alive value changed
assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(10L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L));
assertThat(threadPool.executor(threadPoolName), sameInstance(oldExecutor));
} finally {
terminateThreadPoolIfNeeded(threadPool);
}
}
public void testShutdownNowInterrupts() throws Exception {
String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.FIXED);
ThreadPool threadPool = null;
try {
threadPool = new ThreadPool(Settings.settingsBuilder()
.put("threadpool." + threadPoolName + ".queue_size", 1000)
.put("name", "testCachedExecutorType").build());
assertEquals(info(threadPool, threadPoolName).getQueueSize().getSingles(), 1000L);
final CountDownLatch latch = new CountDownLatch(1);
ThreadPoolExecutor oldExecutor = (ThreadPoolExecutor) threadPool.executor(threadPoolName);
threadPool.executor(threadPoolName).execute(() -> {
try {
new CountDownLatch(1).await();
} catch (InterruptedException ex) {
latch.countDown();
Thread.currentThread().interrupt();
}
}
);
threadPool.updateSettings(settingsBuilder().put("threadpool." + threadPoolName + ".queue_size", 2000).build());
assertThat(threadPool.executor(threadPoolName), not(sameInstance(oldExecutor)));
assertThat(oldExecutor.isShutdown(), equalTo(true));
assertThat(oldExecutor.isTerminating(), equalTo(true));
assertThat(oldExecutor.isTerminated(), equalTo(false));
threadPool.shutdownNow(); // should interrupt the thread
latch.await(3, TimeUnit.SECONDS); // If this throws then ThreadPool#shutdownNow didn't interrupt
} finally {
terminateThreadPoolIfNeeded(threadPool);
}
}
public void testCustomThreadPool() throws Exception {
ThreadPool threadPool = null;
try {
threadPool = new ThreadPool(Settings.settingsBuilder()
.put("threadpool.my_pool1.type", "scaling")
.put("threadpool.my_pool2.type", "fixed")
.put("threadpool.my_pool2.size", "1")
.put("threadpool.my_pool2.queue_size", "1")
.put("name", "testCustomThreadPool").build());
ThreadPoolInfo groups = threadPool.info();
boolean foundPool1 = false;
boolean foundPool2 = false;
outer:
for (ThreadPool.Info info : groups) {
if ("my_pool1".equals(info.getName())) {
foundPool1 = true;
assertEquals(info.getThreadPoolType(), ThreadPool.ThreadPoolType.SCALING);
} else if ("my_pool2".equals(info.getName())) {
foundPool2 = true;
assertEquals(info.getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED);
assertThat(info.getMin(), equalTo(1));
assertThat(info.getMax(), equalTo(1));
assertThat(info.getQueueSize().singles(), equalTo(1l));
} else {
for (Field field : Names.class.getFields()) {
if (info.getName().equalsIgnoreCase(field.getName())) {
// This is ok it is a default thread pool
continue outer;
}
}
fail("Unexpected pool name: " + info.getName());
}
}
assertThat(foundPool1, is(true));
assertThat(foundPool2, is(true));
// Updating my_pool2
Settings settings = Settings.builder()
.put("threadpool.my_pool2.size", "10")
.build();
threadPool.updateSettings(settings);
groups = threadPool.info();
foundPool1 = false;
foundPool2 = false;
outer:
for (ThreadPool.Info info : groups) {
if ("my_pool1".equals(info.getName())) {
foundPool1 = true;
assertEquals(info.getThreadPoolType(), ThreadPool.ThreadPoolType.SCALING);
} else if ("my_pool2".equals(info.getName())) {
foundPool2 = true;
assertThat(info.getMax(), equalTo(10));
assertThat(info.getMin(), equalTo(10));
assertThat(info.getQueueSize().singles(), equalTo(1l));
assertEquals(info.getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED);
} else {
for (Field field : Names.class.getFields()) {
if (info.getName().equalsIgnoreCase(field.getName())) {
// This is ok it is a default thread pool
continue outer;
}
}
fail("Unexpected pool name: " + info.getName());
}
}
assertThat(foundPool1, is(true));
assertThat(foundPool2, is(true));
} finally {
terminateThreadPoolIfNeeded(threadPool);
}
}
private void terminateThreadPoolIfNeeded(ThreadPool threadPool) throws InterruptedException {
if (threadPool != null) {
terminate(threadPool);
}
}
private ThreadPool.Info info(ThreadPool threadPool, String name) { private ThreadPool.Info info(ThreadPool threadPool, String name) {
for (ThreadPool.Info info : threadPool.info()) { for (ThreadPool.Info info : threadPool.info()) {
if (info.getName().equals(name)) { if (info.getName().equals(name)) {
@ -50,247 +358,20 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase {
return null; return null;
} }
public void testCachedExecutorType() throws InterruptedException { private String randomThreadPoolName() {
ThreadPool threadPool = new ThreadPool( Set<String> threadPoolNames = ThreadPool.THREAD_POOL_TYPES.keySet();
Settings.settingsBuilder() return randomFrom(threadPoolNames.toArray(new String[threadPoolNames.size()]));
.put("threadpool.search.type", "cached")
.put("name","testCachedExecutorType").build());
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached"));
assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(5L));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
// Replace with different type
threadPool.updateSettings(settingsBuilder().put("threadpool.search.type", "same").build());
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("same"));
assertThat(threadPool.executor(Names.SEARCH), is(ThreadPool.DIRECT_EXECUTOR));
// Replace with different type again
threadPool.updateSettings(settingsBuilder()
.put("threadpool.search.type", "scaling")
.put("threadpool.search.keep_alive", "10m")
.build());
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("scaling"));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(1));
// Make sure keep alive value changed
assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L));
// Put old type back
threadPool.updateSettings(settingsBuilder().put("threadpool.search.type", "cached").build());
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached"));
// Make sure keep alive value reused
assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
// Change keep alive
Executor oldExecutor = threadPool.executor(Names.SEARCH);
threadPool.updateSettings(settingsBuilder().put("threadpool.search.keep_alive", "1m").build());
// Make sure keep alive value changed
assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(1L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L));
// Make sure executor didn't change
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached"));
assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor));
// Set the same keep alive
threadPool.updateSettings(settingsBuilder().put("threadpool.search.keep_alive", "1m").build());
// Make sure keep alive value didn't change
assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(1L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L));
// Make sure executor didn't change
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached"));
assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor));
terminate(threadPool);
} }
public void testFixedExecutorType() throws InterruptedException { private ThreadPool.ThreadPoolType randomIncorrectThreadPoolType(String threadPoolName) {
ThreadPool threadPool = new ThreadPool(settingsBuilder() Set<ThreadPool.ThreadPoolType> set = new HashSet<>();
.put("threadpool.search.type", "fixed") set.addAll(Arrays.asList(ThreadPool.ThreadPoolType.values()));
.put("name","testCachedExecutorType").build()); set.remove(ThreadPool.THREAD_POOL_TYPES.get(threadPoolName));
ThreadPool.ThreadPoolType invalidThreadPoolType = randomFrom(set.toArray(new ThreadPool.ThreadPoolType[set.size()]));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); return invalidThreadPoolType;
// Replace with different type
threadPool.updateSettings(settingsBuilder()
.put("threadpool.search.type", "scaling")
.put("threadpool.search.keep_alive", "10m")
.put("threadpool.search.min", "2")
.put("threadpool.search.size", "15")
.build());
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("scaling"));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(2));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(15));
assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(2));
assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(15));
// Make sure keep alive value changed
assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L));
// Put old type back
threadPool.updateSettings(settingsBuilder()
.put("threadpool.search.type", "fixed")
.build());
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("fixed"));
// Make sure keep alive value is not used
assertThat(info(threadPool, Names.SEARCH).getKeepAlive(), nullValue());
// Make sure keep pool size value were reused
assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(15));
assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(15));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(15));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(15));
// Change size
Executor oldExecutor = threadPool.executor(Names.SEARCH);
threadPool.updateSettings(settingsBuilder().put("threadpool.search.size", "10").build());
// Make sure size values changed
assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(10));
assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(10));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(10));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(10));
// Make sure executor didn't change
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("fixed"));
assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor));
// Change queue capacity
threadPool.updateSettings(settingsBuilder()
.put("threadpool.search.queue", "500")
.build());
terminate(threadPool);
} }
public void testScalingExecutorType() throws InterruptedException { private String randomThreadPool(ThreadPool.ThreadPoolType type) {
ThreadPool threadPool = new ThreadPool(settingsBuilder() return randomFrom(ThreadPool.THREAD_POOL_TYPES.entrySet().stream().filter(t -> t.getValue().equals(type)).map(t -> t.getKey()).collect(Collectors.toList()));
.put("threadpool.search.type", "scaling")
.put("threadpool.search.size", 10)
.put("name","testCachedExecutorType").build());
assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(1));
assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(10));
assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(5L));
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("scaling"));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
// Change settings that doesn't require pool replacement
Executor oldExecutor = threadPool.executor(Names.SEARCH);
threadPool.updateSettings(settingsBuilder()
.put("threadpool.search.type", "scaling")
.put("threadpool.search.keep_alive", "10m")
.put("threadpool.search.min", "2")
.put("threadpool.search.size", "15")
.build());
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("scaling"));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(2));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(15));
assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(2));
assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(15));
// Make sure keep alive value changed
assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L));
assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor));
terminate(threadPool);
} }
public void testShutdownNowInterrupts() throws Exception {
ThreadPool threadPool = new ThreadPool(Settings.settingsBuilder()
.put("threadpool.search.type", "cached")
.put("name","testCachedExecutorType").build());
final CountDownLatch latch = new CountDownLatch(1);
ThreadPoolExecutor oldExecutor = (ThreadPoolExecutor) threadPool.executor(Names.SEARCH);
threadPool.executor(Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
try {
new CountDownLatch(1).await();
} catch (InterruptedException ex) {
latch.countDown();
Thread.currentThread().interrupt();
}
}
});
threadPool.updateSettings(settingsBuilder().put("threadpool.search.type", "fixed").build());
assertThat(threadPool.executor(Names.SEARCH), not(sameInstance(oldExecutor)));
assertThat(oldExecutor.isShutdown(), equalTo(true));
assertThat(oldExecutor.isTerminating(), equalTo(true));
assertThat(oldExecutor.isTerminated(), equalTo(false));
threadPool.shutdownNow(); // should interrupt the thread
latch.await(3, TimeUnit.SECONDS); // If this throws then shotdownNow didn't interrupt
terminate(threadPool);
}
public void testCustomThreadPool() throws Exception {
ThreadPool threadPool = new ThreadPool(Settings.settingsBuilder()
.put("threadpool.my_pool1.type", "cached")
.put("threadpool.my_pool2.type", "fixed")
.put("threadpool.my_pool2.size", "1")
.put("threadpool.my_pool2.queue_size", "1")
.put("name", "testCustomThreadPool").build());
ThreadPoolInfo groups = threadPool.info();
boolean foundPool1 = false;
boolean foundPool2 = false;
outer: for (ThreadPool.Info info : groups) {
if ("my_pool1".equals(info.getName())) {
foundPool1 = true;
assertThat(info.getType(), equalTo("cached"));
} else if ("my_pool2".equals(info.getName())) {
foundPool2 = true;
assertThat(info.getType(), equalTo("fixed"));
assertThat(info.getMin(), equalTo(1));
assertThat(info.getMax(), equalTo(1));
assertThat(info.getQueueSize().singles(), equalTo(1l));
} else {
for (Field field : Names.class.getFields()) {
if (info.getName().equalsIgnoreCase(field.getName())) {
// This is ok it is a default thread pool
continue outer;
}
}
fail("Unexpected pool name: " + info.getName());
}
}
assertThat(foundPool1, is(true));
assertThat(foundPool2, is(true));
// Updating my_pool2
Settings settings = Settings.builder()
.put("threadpool.my_pool2.size", "10")
.build();
threadPool.updateSettings(settings);
groups = threadPool.info();
foundPool1 = false;
foundPool2 = false;
outer: for (ThreadPool.Info info : groups) {
if ("my_pool1".equals(info.getName())) {
foundPool1 = true;
assertThat(info.getType(), equalTo("cached"));
} else if ("my_pool2".equals(info.getName())) {
foundPool2 = true;
assertThat(info.getMax(), equalTo(10));
assertThat(info.getMin(), equalTo(10));
assertThat(info.getQueueSize().singles(), equalTo(1l));
assertThat(info.getType(), equalTo("fixed"));
} else {
for (Field field : Names.class.getFields()) {
if (info.getName().equalsIgnoreCase(field.getName())) {
// This is ok it is a default thread pool
continue outer;
}
}
fail("Unexpected pool name: " + info.getName());
}
}
assertThat(foundPool1, is(true));
assertThat(foundPool2, is(true));
terminate(threadPool);
}
} }

View file

@ -389,4 +389,12 @@ request cache and the field data cache.
This setting would arbitrarily pick the first interface not marked as loopback. Instead, specify by address This setting would arbitrarily pick the first interface not marked as loopback. Instead, specify by address
scope (e.g. `_local_,_site_` for all loopback and private network addresses) or by explicit interface names, scope (e.g. `_local_,_site_` for all loopback and private network addresses) or by explicit interface names,
hostnames, or addresses. hostnames, or addresses.
=== Forbid changing of thread pool types
Previously, <<modules-threadpool,thread pool types>> could be dynamically adjusted. The thread pool type effectively
controls the backing queue for the thread pool and modifying this is an expert setting with minimal practical benefits
and high risk of being misused. The ability to change the thread pool type for any thread pool has been removed; do note
that it is still possible to adjust relevant thread pool parameters for each of the thread pools (e.g., depending on
the thread pool type, `keep_alive`, `queue_size`, etc.).

View file

@ -9,87 +9,92 @@ of discarded.
There are several thread pools, but the important ones include: There are several thread pools, but the important ones include:
`generic`::
For generic operations (e.g., background node discovery).
Thread pool type is `cached`.
`index`:: `index`::
For index/delete operations. Defaults to `fixed` For index/delete operations. Thread pool type is `fixed`
with a size of `# of available processors`, with a size of `# of available processors`,
queue_size of `200`. queue_size of `200`.
`search`:: `search`::
For count/search operations. Defaults to `fixed` For count/search operations. Thread pool type is `fixed`
with a size of `int((# of available_processors * 3) / 2) + 1`, with a size of `int((# of available_processors * 3) / 2) + 1`,
queue_size of `1000`. queue_size of `1000`.
`suggest`:: `suggest`::
For suggest operations. Defaults to `fixed` For suggest operations. Thread pool type is `fixed`
with a size of `# of available processors`, with a size of `# of available processors`,
queue_size of `1000`. queue_size of `1000`.
`get`:: `get`::
For get operations. Defaults to `fixed` For get operations. Thread pool type is `fixed`
with a size of `# of available processors`, with a size of `# of available processors`,
queue_size of `1000`. queue_size of `1000`.
`bulk`:: `bulk`::
For bulk operations. Defaults to `fixed` For bulk operations. Thread pool type is `fixed`
with a size of `# of available processors`, with a size of `# of available processors`,
queue_size of `50`. queue_size of `50`.
`percolate`:: `percolate`::
For percolate operations. Defaults to `fixed` For percolate operations. Thread pool type is `fixed`
with a size of `# of available processors`, with a size of `# of available processors`,
queue_size of `1000`. queue_size of `1000`.
`snapshot`:: `snapshot`::
For snapshot/restore operations. Defaults to `scaling` with a For snapshot/restore operations. Thread pool type is `scaling` with a
keep-alive of `5m` and a size of `min(5, (# of available processors)/2)`, max at 5. keep-alive of `5m` and a size of `min(5, (# of available processors)/2)`.
`warmer`:: `warmer`::
For segment warm-up operations. Defaults to `scaling` with a For segment warm-up operations. Thread pool type is `scaling` with a
keep-alive of `5m` and a size of `min(5, (# of available processors)/2)`, max at 5. keep-alive of `5m` and a size of `min(5, (# of available processors)/2)`.
`refresh`:: `refresh`::
For refresh operations. Defaults to `scaling` with a For refresh operations. Thread pool type is `scaling` with a
keep-alive of `5m` and a size of `min(10, (# of available processors)/2)`, max at 10. keep-alive of `5m` and a size of `min(10, (# of available processors)/2)`.
`listener`:: `listener`::
Mainly for java client executing of action when listener threaded is set to true. Mainly for java client executing of action when listener threaded is set to true.
Default size of `(# of available processors)/2`, max at 10. Thread pool type is `scaling` with a default size of `min(10, (# of available processors)/2)`.
Changing a specific thread pool can be done by setting its type and Changing a specific thread pool can be done by setting its type-specific parameters; for example, changing the `index`
specific type parameters, for example, changing the `index` thread pool thread pool to have more threads:
to have more threads:
[source,js] [source,js]
-------------------------------------------------- --------------------------------------------------
threadpool: threadpool:
index: index:
type: fixed
size: 30 size: 30
-------------------------------------------------- --------------------------------------------------
NOTE: you can update threadpool settings live using NOTE: you can update thread pool settings dynamically using <<cluster-update-settings>>.
<<cluster-update-settings>>.
[float] [float]
[[types]] [[types]]
=== Thread pool types === Thread pool types
The following are the types of thread pools that can be used and their The following are the types of thread pools and their respective parameters:
respective parameters:
[float] [float]
==== `cache` ==== `cached`
The `cache` thread pool is an unbounded thread pool that will spawn a The `cached` thread pool is an unbounded thread pool that will spawn a
thread if there are pending requests. Here is an example of how to set thread if there are pending requests. This thread pool is used to
it: prevent requests submitted to this pool from blocking or being
rejected. Unused threads in this thread pool will be terminated after
a keep alive expires (defaults to five minutes). The `cached` thread
pool is reserved for the <<modules-threadpool,`generic`>> thread pool.
The `keep_alive` parameter determines how long a thread should be kept
around in the thread pool without doing any work.
[source,js] [source,js]
-------------------------------------------------- --------------------------------------------------
threadpool: threadpool:
index: generic:
type: cached keep_alive: 2m
-------------------------------------------------- --------------------------------------------------
[float] [float]
@ -111,7 +116,6 @@ full, it will abort the request.
-------------------------------------------------- --------------------------------------------------
threadpool: threadpool:
index: index:
type: fixed
size: 30 size: 30
queue_size: 1000 queue_size: 1000
-------------------------------------------------- --------------------------------------------------
@ -130,7 +134,6 @@ around in the thread pool without it doing any work.
-------------------------------------------------- --------------------------------------------------
threadpool: threadpool:
warmer: warmer:
type: scaling
size: 8 size: 8
keep_alive: 2m keep_alive: 2m
-------------------------------------------------- --------------------------------------------------

View file

@ -64,10 +64,10 @@ import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.MockEngineFactoryPlugin;
import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.MockEngineFactoryPlugin;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStoreConfig; import org.elasticsearch.index.store.IndexStoreConfig;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
@ -88,7 +88,6 @@ import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.elasticsearch.test.store.MockFSIndexStore; import org.elasticsearch.test.store.MockFSIndexStore;
import org.elasticsearch.test.transport.AssertingLocalTransport; import org.elasticsearch.test.transport.AssertingLocalTransport;
import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.netty.NettyTransport; import org.elasticsearch.transport.netty.NettyTransport;
@ -98,20 +97,11 @@ import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.ArrayList; import java.util.*;
import java.util.Arrays; import java.util.concurrent.ExecutionException;
import java.util.Collection; import java.util.concurrent.ExecutorService;
import java.util.Collections; import java.util.concurrent.Future;
import java.util.HashMap; import java.util.concurrent.TimeUnit;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate; import java.util.function.Predicate;
@ -119,15 +109,11 @@ import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import static junit.framework.Assert.fail; import static junit.framework.Assert.fail;
import static org.apache.lucene.util.LuceneTestCase.TEST_NIGHTLY; import static org.apache.lucene.util.LuceneTestCase.*;
import static org.apache.lucene.util.LuceneTestCase.rarely;
import static org.apache.lucene.util.LuceneTestCase.usually;
import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.test.ESTestCase.assertBusy; import static org.elasticsearch.test.ESTestCase.assertBusy;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
/** /**
@ -404,18 +390,6 @@ public final class InternalTestCluster extends TestCluster {
if (random.nextBoolean()) { // sometimes set a if (random.nextBoolean()) { // sometimes set a
builder.put(SearchService.DEFAULT_KEEPALIVE_KEY, TimeValue.timeValueSeconds(100 + random.nextInt(5 * 60))); builder.put(SearchService.DEFAULT_KEEPALIVE_KEY, TimeValue.timeValueSeconds(100 + random.nextInt(5 * 60)));
} }
if (random.nextBoolean()) {
// change threadpool types to make sure we don't have components that rely on the type of thread pools
for (String name : Arrays.asList(ThreadPool.Names.BULK, ThreadPool.Names.FLUSH, ThreadPool.Names.GET,
ThreadPool.Names.INDEX, ThreadPool.Names.MANAGEMENT, ThreadPool.Names.FORCE_MERGE,
ThreadPool.Names.PERCOLATE, ThreadPool.Names.REFRESH, ThreadPool.Names.SEARCH, ThreadPool.Names.SNAPSHOT,
ThreadPool.Names.SUGGEST, ThreadPool.Names.WARMER)) {
if (random.nextBoolean()) {
final String type = RandomPicks.randomFrom(random, Arrays.asList("fixed", "cached", "scaling"));
builder.put(ThreadPool.THREADPOOL_GROUP + name + ".type", type);
}
}
}
if (random.nextInt(10) == 0) { if (random.nextInt(10) == 0) {
// node gets an extra cpu this time // node gets an extra cpu this time