mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-06-28 09:28:55 -04:00
ThreadPool and ThreadContext are not closeable (#43249)
This commit changes the ThreadContext to just use a regular ThreadLocal over the lucene CloseableThreadLocal. The CloseableThreadLocal solves issues with ThreadLocals that are no longer needed during runtime but in the case of the ThreadContext, we need it for the runtime of the node and it is typically not closed until the node closes, so we miss out on the benefits that this class provides. Additionally by removing the close logic, we simplify code in other places that deal with exceptions and tracking to see if it happens when the node is closing. Closes #42577
This commit is contained in:
parent
e0aa9107e3
commit
0260c6f55c
21 changed files with 641 additions and 839 deletions
|
@ -387,80 +387,77 @@ public class JsonLoggerTests extends ESTestCase {
|
||||||
public void testDuplicateLogMessages() throws IOException {
|
public void testDuplicateLogMessages() throws IOException {
|
||||||
final DeprecationLogger deprecationLogger = new DeprecationLogger(LogManager.getLogger("test"));
|
final DeprecationLogger deprecationLogger = new DeprecationLogger(LogManager.getLogger("test"));
|
||||||
|
|
||||||
|
|
||||||
// For the same key and X-Opaque-ID deprecation should be once
|
// For the same key and X-Opaque-ID deprecation should be once
|
||||||
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
|
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||||
try{
|
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
|
||||||
threadContext.putHeader(Task.X_OPAQUE_ID, "ID1");
|
threadContext.putHeader(Task.X_OPAQUE_ID, "ID1");
|
||||||
DeprecationLogger.setThreadContext(threadContext);
|
DeprecationLogger.setThreadContext(threadContext);
|
||||||
deprecationLogger.deprecatedAndMaybeLog("key", "message1");
|
deprecationLogger.deprecatedAndMaybeLog("key", "message1");
|
||||||
deprecationLogger.deprecatedAndMaybeLog("key", "message2");
|
deprecationLogger.deprecatedAndMaybeLog("key", "message2");
|
||||||
assertWarnings("message1", "message2");
|
assertWarnings("message1", "message2");
|
||||||
|
|
||||||
final Path path = PathUtils.get(System.getProperty("es.logs.base_path"),
|
final Path path = PathUtils.get(System.getProperty("es.logs.base_path"),
|
||||||
System.getProperty("es.logs.cluster_name") + "_deprecated.json");
|
System.getProperty("es.logs.cluster_name") + "_deprecated.json");
|
||||||
try (Stream<Map<String, String>> stream = JsonLogsStream.mapStreamFrom(path)) {
|
try (Stream<Map<String, String>> stream = JsonLogsStream.mapStreamFrom(path)) {
|
||||||
List<Map<String, String>> jsonLogs = stream
|
List<Map<String, String>> jsonLogs = stream
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
assertThat(jsonLogs, contains(
|
assertThat(jsonLogs, contains(
|
||||||
allOf(
|
allOf(
|
||||||
hasEntry("type", "deprecation"),
|
hasEntry("type", "deprecation"),
|
||||||
hasEntry("level", "WARN"),
|
hasEntry("level", "WARN"),
|
||||||
hasEntry("component", "d.test"),
|
hasEntry("component", "d.test"),
|
||||||
hasEntry("cluster.name", "elasticsearch"),
|
hasEntry("cluster.name", "elasticsearch"),
|
||||||
hasEntry("node.name", "sample-name"),
|
hasEntry("node.name", "sample-name"),
|
||||||
hasEntry("message", "message1"),
|
hasEntry("message", "message1"),
|
||||||
hasEntry("x-opaque-id", "ID1"))
|
hasEntry("x-opaque-id", "ID1"))
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
}
|
|
||||||
}finally{
|
|
||||||
DeprecationLogger.removeThreadContext(threadContext);
|
|
||||||
}
|
}
|
||||||
|
} finally {
|
||||||
|
DeprecationLogger.removeThreadContext(threadContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// For the same key and different X-Opaque-ID should be multiple times per key/x-opaque-id
|
// For the same key and different X-Opaque-ID should be multiple times per key/x-opaque-id
|
||||||
//continuing with message1-ID1 in logs already, adding a new deprecation log line with message2-ID2
|
//continuing with message1-ID1 in logs already, adding a new deprecation log line with message2-ID2
|
||||||
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
|
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
|
||||||
try{
|
threadContext.putHeader(Task.X_OPAQUE_ID, "ID2");
|
||||||
threadContext.putHeader(Task.X_OPAQUE_ID, "ID2");
|
DeprecationLogger.setThreadContext(threadContext);
|
||||||
DeprecationLogger.setThreadContext(threadContext);
|
deprecationLogger.deprecatedAndMaybeLog("key", "message1");
|
||||||
deprecationLogger.deprecatedAndMaybeLog("key", "message1");
|
deprecationLogger.deprecatedAndMaybeLog("key", "message2");
|
||||||
deprecationLogger.deprecatedAndMaybeLog("key", "message2");
|
assertWarnings("message1", "message2");
|
||||||
assertWarnings("message1", "message2");
|
|
||||||
|
|
||||||
final Path path = PathUtils.get(System.getProperty("es.logs.base_path"),
|
final Path path = PathUtils.get(System.getProperty("es.logs.base_path"),
|
||||||
System.getProperty("es.logs.cluster_name") + "_deprecated.json");
|
System.getProperty("es.logs.cluster_name") + "_deprecated.json");
|
||||||
try (Stream<Map<String, String>> stream = JsonLogsStream.mapStreamFrom(path)) {
|
try (Stream<Map<String, String>> stream = JsonLogsStream.mapStreamFrom(path)) {
|
||||||
List<Map<String, String>> jsonLogs = stream
|
List<Map<String, String>> jsonLogs = stream
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
assertThat(jsonLogs, contains(
|
assertThat(jsonLogs, contains(
|
||||||
allOf(
|
allOf(
|
||||||
hasEntry("type", "deprecation"),
|
hasEntry("type", "deprecation"),
|
||||||
hasEntry("level", "WARN"),
|
hasEntry("level", "WARN"),
|
||||||
hasEntry("component", "d.test"),
|
hasEntry("component", "d.test"),
|
||||||
hasEntry("cluster.name", "elasticsearch"),
|
hasEntry("cluster.name", "elasticsearch"),
|
||||||
hasEntry("node.name", "sample-name"),
|
hasEntry("node.name", "sample-name"),
|
||||||
hasEntry("message", "message1"),
|
hasEntry("message", "message1"),
|
||||||
hasEntry("x-opaque-id", "ID1")
|
hasEntry("x-opaque-id", "ID1")
|
||||||
),
|
),
|
||||||
allOf(
|
allOf(
|
||||||
hasEntry("type", "deprecation"),
|
hasEntry("type", "deprecation"),
|
||||||
hasEntry("level", "WARN"),
|
hasEntry("level", "WARN"),
|
||||||
hasEntry("component", "d.test"),
|
hasEntry("component", "d.test"),
|
||||||
hasEntry("cluster.name", "elasticsearch"),
|
hasEntry("cluster.name", "elasticsearch"),
|
||||||
hasEntry("node.name", "sample-name"),
|
hasEntry("node.name", "sample-name"),
|
||||||
hasEntry("message", "message1"),
|
hasEntry("message", "message1"),
|
||||||
hasEntry("x-opaque-id", "ID2")
|
hasEntry("x-opaque-id", "ID2")
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
}
|
|
||||||
}finally{
|
|
||||||
DeprecationLogger.removeThreadContext(threadContext);
|
|
||||||
}
|
}
|
||||||
|
} finally {
|
||||||
|
DeprecationLogger.removeThreadContext(threadContext);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -262,7 +262,6 @@ public class DeprecationLogger {
|
||||||
|
|
||||||
public String getXOpaqueId(Set<ThreadContext> threadContexts) {
|
public String getXOpaqueId(Set<ThreadContext> threadContexts) {
|
||||||
return threadContexts.stream()
|
return threadContexts.stream()
|
||||||
.filter(t -> t.isClosed() == false)
|
|
||||||
.filter(t -> t.getHeader(Task.X_OPAQUE_ID) != null)
|
.filter(t -> t.getHeader(Task.X_OPAQUE_ID) != null)
|
||||||
.findFirst()
|
.findFirst()
|
||||||
.map(t -> t.getHeader(Task.X_OPAQUE_ID))
|
.map(t -> t.getHeader(Task.X_OPAQUE_ID))
|
||||||
|
|
|
@ -106,17 +106,8 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean assertDefaultContext(Runnable r) {
|
private boolean assertDefaultContext(Runnable r) {
|
||||||
try {
|
assert contextHolder.isDefaultContext() : "the thread context is not the default context and the thread [" +
|
||||||
assert contextHolder.isDefaultContext() : "the thread context is not the default context and the thread [" +
|
Thread.currentThread().getName() + "] is being returned to the pool after executing [" + r + "]";
|
||||||
Thread.currentThread().getName() + "] is being returned to the pool after executing [" + r + "]";
|
|
||||||
} catch (IllegalStateException ex) {
|
|
||||||
// sometimes we execute on a closed context and isDefaultContext doen't bypass the ensureOpen checks
|
|
||||||
// this must not trigger an exception here since we only assert if the default is restored and
|
|
||||||
// we don't really care if we are closed
|
|
||||||
if (contextHolder.isClosed() == false) {
|
|
||||||
throw ex;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,9 +20,9 @@ package org.elasticsearch.common.util.concurrent;
|
||||||
|
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.apache.lucene.util.CloseableThreadLocal;
|
|
||||||
import org.elasticsearch.action.support.ContextPreservingActionListener;
|
import org.elasticsearch.action.support.ContextPreservingActionListener;
|
||||||
import org.elasticsearch.client.OriginSettingClient;
|
import org.elasticsearch.client.OriginSettingClient;
|
||||||
|
import org.elasticsearch.common.collect.Tuple;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.io.stream.Writeable;
|
import org.elasticsearch.common.io.stream.Writeable;
|
||||||
|
@ -32,7 +32,6 @@ import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.http.HttpTransportSettings;
|
import org.elasticsearch.http.HttpTransportSettings;
|
||||||
import org.elasticsearch.tasks.Task;
|
import org.elasticsearch.tasks.Task;
|
||||||
|
|
||||||
import java.io.Closeable;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -42,7 +41,6 @@ import java.util.LinkedHashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
import java.util.function.BiConsumer;
|
import java.util.function.BiConsumer;
|
||||||
import java.util.function.BinaryOperator;
|
import java.util.function.BinaryOperator;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
@ -82,7 +80,7 @@ import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARN
|
||||||
* </pre>
|
* </pre>
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public final class ThreadContext implements Closeable, Writeable {
|
public final class ThreadContext implements Writeable {
|
||||||
|
|
||||||
public static final String PREFIX = "request.headers";
|
public static final String PREFIX = "request.headers";
|
||||||
public static final Setting<Settings> DEFAULT_HEADERS_SETTING = Setting.groupSetting(PREFIX + ".", Property.NodeScope);
|
public static final Setting<Settings> DEFAULT_HEADERS_SETTING = Setting.groupSetting(PREFIX + ".", Property.NodeScope);
|
||||||
|
@ -95,7 +93,7 @@ public final class ThreadContext implements Closeable, Writeable {
|
||||||
private static final Logger logger = LogManager.getLogger(ThreadContext.class);
|
private static final Logger logger = LogManager.getLogger(ThreadContext.class);
|
||||||
private static final ThreadContextStruct DEFAULT_CONTEXT = new ThreadContextStruct();
|
private static final ThreadContextStruct DEFAULT_CONTEXT = new ThreadContextStruct();
|
||||||
private final Map<String, String> defaultHeader;
|
private final Map<String, String> defaultHeader;
|
||||||
private final ContextThreadLocal threadLocal;
|
private final ThreadLocal<ThreadContextStruct> threadLocal;
|
||||||
private final int maxWarningHeaderCount;
|
private final int maxWarningHeaderCount;
|
||||||
private final long maxWarningHeaderSize;
|
private final long maxWarningHeaderSize;
|
||||||
|
|
||||||
|
@ -104,26 +102,12 @@ public final class ThreadContext implements Closeable, Writeable {
|
||||||
* @param settings the settings to read the default request headers from
|
* @param settings the settings to read the default request headers from
|
||||||
*/
|
*/
|
||||||
public ThreadContext(Settings settings) {
|
public ThreadContext(Settings settings) {
|
||||||
Settings headers = DEFAULT_HEADERS_SETTING.get(settings);
|
this.defaultHeader = buildDefaultHeaders(settings);
|
||||||
if (headers == null) {
|
this.threadLocal = ThreadLocal.withInitial(() -> DEFAULT_CONTEXT);
|
||||||
this.defaultHeader = Collections.emptyMap();
|
|
||||||
} else {
|
|
||||||
Map<String, String> defaultHeader = new HashMap<>();
|
|
||||||
for (String key : headers.names()) {
|
|
||||||
defaultHeader.put(key, headers.get(key));
|
|
||||||
}
|
|
||||||
this.defaultHeader = Collections.unmodifiableMap(defaultHeader);
|
|
||||||
}
|
|
||||||
threadLocal = new ContextThreadLocal();
|
|
||||||
this.maxWarningHeaderCount = SETTING_HTTP_MAX_WARNING_HEADER_COUNT.get(settings);
|
this.maxWarningHeaderCount = SETTING_HTTP_MAX_WARNING_HEADER_COUNT.get(settings);
|
||||||
this.maxWarningHeaderSize = SETTING_HTTP_MAX_WARNING_HEADER_SIZE.get(settings).getBytes();
|
this.maxWarningHeaderSize = SETTING_HTTP_MAX_WARNING_HEADER_SIZE.get(settings).getBytes();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() {
|
|
||||||
threadLocal.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Removes the current context and resets a default context. The removed context can be
|
* Removes the current context and resets a default context. The removed context can be
|
||||||
* restored by closing the returned {@link StoredContext}.
|
* restored by closing the returned {@link StoredContext}.
|
||||||
|
@ -140,19 +124,13 @@ public final class ThreadContext implements Closeable, Writeable {
|
||||||
DEFAULT_CONTEXT.putHeaders(Map.of(Task.X_OPAQUE_ID, context.requestHeaders.get(Task.X_OPAQUE_ID)));
|
DEFAULT_CONTEXT.putHeaders(Map.of(Task.X_OPAQUE_ID, context.requestHeaders.get(Task.X_OPAQUE_ID)));
|
||||||
threadLocal.set(threadContextStruct);
|
threadLocal.set(threadContextStruct);
|
||||||
} else {
|
} else {
|
||||||
threadLocal.set(null);
|
threadLocal.set(DEFAULT_CONTEXT);
|
||||||
}
|
}
|
||||||
return () -> {
|
return () -> {
|
||||||
// If the node and thus the threadLocal get closed while this task
|
// If the node and thus the threadLocal get closed while this task
|
||||||
// is still executing, we don't want this runnable to fail with an
|
// is still executing, we don't want this runnable to fail with an
|
||||||
// uncaught exception
|
// uncaught exception
|
||||||
try {
|
threadLocal.set(context);
|
||||||
threadLocal.set(context);
|
|
||||||
} catch (IllegalStateException e) {
|
|
||||||
if (isClosed() == false) {
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -255,7 +233,20 @@ public final class ThreadContext implements Closeable, Writeable {
|
||||||
* Reads the headers from the stream into the current context
|
* Reads the headers from the stream into the current context
|
||||||
*/
|
*/
|
||||||
public void readHeaders(StreamInput in) throws IOException {
|
public void readHeaders(StreamInput in) throws IOException {
|
||||||
final Map<String, String> requestHeaders = in.readMap(StreamInput::readString, StreamInput::readString);
|
final Tuple<Map<String, String>, Map<String, Set<String>>> streamTuple = readHeadersFromStream(in);
|
||||||
|
final Map<String, String> requestHeaders = streamTuple.v1();
|
||||||
|
final Map<String, Set<String>> responseHeaders = streamTuple.v2();
|
||||||
|
final ThreadContextStruct struct;
|
||||||
|
if (requestHeaders.isEmpty() && responseHeaders.isEmpty()) {
|
||||||
|
struct = ThreadContextStruct.EMPTY;
|
||||||
|
} else {
|
||||||
|
struct = new ThreadContextStruct(requestHeaders, responseHeaders, Collections.emptyMap(), false);
|
||||||
|
}
|
||||||
|
threadLocal.set(struct);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Tuple<Map<String, String>, Map<String, Set<String>>> readHeadersFromStream(StreamInput in) throws IOException {
|
||||||
|
final Map<String, String> requestHeaders = in.readMap(StreamInput::readString, StreamInput::readString);
|
||||||
final Map<String, Set<String>> responseHeaders = in.readMap(StreamInput::readString, input -> {
|
final Map<String, Set<String>> responseHeaders = in.readMap(StreamInput::readString, input -> {
|
||||||
final int size = input.readVInt();
|
final int size = input.readVInt();
|
||||||
if (size == 0) {
|
if (size == 0) {
|
||||||
|
@ -273,13 +264,7 @@ public final class ThreadContext implements Closeable, Writeable {
|
||||||
return values;
|
return values;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
final ThreadContextStruct struct;
|
return new Tuple<>(requestHeaders, responseHeaders);
|
||||||
if (requestHeaders.isEmpty() && responseHeaders.isEmpty()) {
|
|
||||||
struct = ThreadContextStruct.EMPTY;
|
|
||||||
} else {
|
|
||||||
struct = new ThreadContextStruct(requestHeaders, responseHeaders, Collections.emptyMap(), false);
|
|
||||||
}
|
|
||||||
threadLocal.set(struct);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -373,17 +358,7 @@ public final class ThreadContext implements Closeable, Writeable {
|
||||||
* @param uniqueValue the function that produces de-duplication values
|
* @param uniqueValue the function that produces de-duplication values
|
||||||
*/
|
*/
|
||||||
public void addResponseHeader(final String key, final String value, final Function<String, String> uniqueValue) {
|
public void addResponseHeader(final String key, final String value, final Function<String, String> uniqueValue) {
|
||||||
/*
|
threadLocal.set(threadLocal.get().putResponse(key, value, uniqueValue, maxWarningHeaderCount, maxWarningHeaderSize));
|
||||||
* Updating the thread local is expensive due to a shared reference that we synchronize on, so we should only do it if the thread
|
|
||||||
* context struct changed. It will not change if we de-duplicate this value to an existing one, or if we don't add a new one because
|
|
||||||
* we have reached capacity.
|
|
||||||
*/
|
|
||||||
final ThreadContextStruct current = threadLocal.get();
|
|
||||||
final ThreadContextStruct maybeNext =
|
|
||||||
current.putResponse(key, value, uniqueValue, maxWarningHeaderCount, maxWarningHeaderSize);
|
|
||||||
if (current != maybeNext) {
|
|
||||||
threadLocal.set(maybeNext);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -435,13 +410,6 @@ public final class ThreadContext implements Closeable, Writeable {
|
||||||
return threadLocal.get().isSystemContext;
|
return threadLocal.get().isSystemContext;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns <code>true</code> if the context is closed, otherwise <code>true</code>
|
|
||||||
*/
|
|
||||||
public boolean isClosed() {
|
|
||||||
return threadLocal.closed.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
@FunctionalInterface
|
@FunctionalInterface
|
||||||
public interface StoredContext extends AutoCloseable {
|
public interface StoredContext extends AutoCloseable {
|
||||||
@Override
|
@Override
|
||||||
|
@ -452,6 +420,19 @@ public final class ThreadContext implements Closeable, Writeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static Map<String, String> buildDefaultHeaders(Settings settings) {
|
||||||
|
Settings headers = DEFAULT_HEADERS_SETTING.get(settings);
|
||||||
|
if (headers == null) {
|
||||||
|
return Collections.emptyMap();
|
||||||
|
} else {
|
||||||
|
Map<String, String> defaultHeader = new HashMap<>();
|
||||||
|
for (String key : headers.names()) {
|
||||||
|
defaultHeader.put(key, headers.get(key));
|
||||||
|
}
|
||||||
|
return Collections.unmodifiableMap(defaultHeader);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static final class ThreadContextStruct {
|
private static final class ThreadContextStruct {
|
||||||
|
|
||||||
private static final ThreadContextStruct EMPTY =
|
private static final ThreadContextStruct EMPTY =
|
||||||
|
@ -629,55 +610,6 @@ public final class ThreadContext implements Closeable, Writeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class ContextThreadLocal extends CloseableThreadLocal<ThreadContextStruct> {
|
|
||||||
private final AtomicBoolean closed = new AtomicBoolean(false);
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void set(ThreadContextStruct object) {
|
|
||||||
try {
|
|
||||||
if (object == DEFAULT_CONTEXT) {
|
|
||||||
super.set(null);
|
|
||||||
} else {
|
|
||||||
super.set(object);
|
|
||||||
}
|
|
||||||
} catch (NullPointerException ex) {
|
|
||||||
/* This is odd but CloseableThreadLocal throws a NPE if it was closed but still accessed.
|
|
||||||
to get a real exception we call ensureOpen() to tell the user we are already closed.*/
|
|
||||||
ensureOpen();
|
|
||||||
throw ex;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ThreadContextStruct get() {
|
|
||||||
try {
|
|
||||||
ThreadContextStruct threadContextStruct = super.get();
|
|
||||||
if (threadContextStruct != null) {
|
|
||||||
return threadContextStruct;
|
|
||||||
}
|
|
||||||
return DEFAULT_CONTEXT;
|
|
||||||
} catch (NullPointerException ex) {
|
|
||||||
/* This is odd but CloseableThreadLocal throws a NPE if it was closed but still accessed.
|
|
||||||
to get a real exception we call ensureOpen() to tell the user we are already closed.*/
|
|
||||||
ensureOpen();
|
|
||||||
throw ex;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void ensureOpen() {
|
|
||||||
if (closed.get()) {
|
|
||||||
throw new IllegalStateException("threadcontext is already closed");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() {
|
|
||||||
if (closed.compareAndSet(false, true)) {
|
|
||||||
super.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Wraps a Runnable to preserve the thread context.
|
* Wraps a Runnable to preserve the thread context.
|
||||||
*/
|
*/
|
||||||
|
@ -692,19 +624,9 @@ public final class ThreadContext implements Closeable, Writeable {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
boolean whileRunning = false;
|
|
||||||
try (ThreadContext.StoredContext ignore = stashContext()){
|
try (ThreadContext.StoredContext ignore = stashContext()){
|
||||||
ctx.restore();
|
ctx.restore();
|
||||||
whileRunning = true;
|
|
||||||
in.run();
|
in.run();
|
||||||
whileRunning = false;
|
|
||||||
} catch (IllegalStateException ex) {
|
|
||||||
if (whileRunning || threadLocal.closed.get() == false) {
|
|
||||||
throw ex;
|
|
||||||
}
|
|
||||||
// if we hit an ISE here we have been shutting down
|
|
||||||
// this comes from the threadcontext and barfs if
|
|
||||||
// our threadpool has been shutting down
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -761,21 +683,9 @@ public final class ThreadContext implements Closeable, Writeable {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doRun() throws Exception {
|
protected void doRun() throws Exception {
|
||||||
boolean whileRunning = false;
|
|
||||||
threadsOriginalContext = stashContext();
|
threadsOriginalContext = stashContext();
|
||||||
try {
|
creatorsContext.restore();
|
||||||
creatorsContext.restore();
|
in.doRun();
|
||||||
whileRunning = true;
|
|
||||||
in.doRun();
|
|
||||||
whileRunning = false;
|
|
||||||
} catch (IllegalStateException ex) {
|
|
||||||
if (whileRunning || threadLocal.closed.get() == false) {
|
|
||||||
throw ex;
|
|
||||||
}
|
|
||||||
// if we hit an ISE here we have been shutting down
|
|
||||||
// this comes from the threadcontext and barfs if
|
|
||||||
// our threadpool has been shutting down
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -39,7 +39,6 @@ import org.elasticsearch.common.xcontent.ToXContentFragment;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.node.Node;
|
import org.elasticsearch.node.Node;
|
||||||
|
|
||||||
import java.io.Closeable;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
@ -62,7 +61,7 @@ import java.util.stream.Collectors;
|
||||||
import static java.util.Collections.unmodifiableMap;
|
import static java.util.Collections.unmodifiableMap;
|
||||||
import static java.util.Map.entry;
|
import static java.util.Map.entry;
|
||||||
|
|
||||||
public class ThreadPool implements Scheduler, Closeable {
|
public class ThreadPool implements Scheduler {
|
||||||
|
|
||||||
private static final Logger logger = LogManager.getLogger(ThreadPool.class);
|
private static final Logger logger = LogManager.getLogger(ThreadPool.class);
|
||||||
|
|
||||||
|
@ -707,15 +706,13 @@ public class ThreadPool implements Scheduler, Closeable {
|
||||||
public static boolean terminate(ThreadPool pool, long timeout, TimeUnit timeUnit) {
|
public static boolean terminate(ThreadPool pool, long timeout, TimeUnit timeUnit) {
|
||||||
if (pool != null) {
|
if (pool != null) {
|
||||||
// Leverage try-with-resources to close the threadpool
|
// Leverage try-with-resources to close the threadpool
|
||||||
try (ThreadPool c = pool) {
|
pool.shutdown();
|
||||||
pool.shutdown();
|
if (awaitTermination(pool, timeout, timeUnit)) {
|
||||||
if (awaitTermination(pool, timeout, timeUnit)) {
|
return true;
|
||||||
return true;
|
|
||||||
}
|
|
||||||
// last resort
|
|
||||||
pool.shutdownNow();
|
|
||||||
return awaitTermination(pool, timeout, timeUnit);
|
|
||||||
}
|
}
|
||||||
|
// last resort
|
||||||
|
pool.shutdownNow();
|
||||||
|
return awaitTermination(pool, timeout, timeUnit);
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -734,11 +731,6 @@ public class ThreadPool implements Scheduler, Closeable {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() {
|
|
||||||
threadContext.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
public ThreadContext getThreadContext() {
|
public ThreadContext getThreadContext() {
|
||||||
return threadContext;
|
return threadContext;
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,7 +25,6 @@ import org.elasticsearch.common.bytes.BytesReference;
|
||||||
import org.elasticsearch.common.compress.Compressor;
|
import org.elasticsearch.common.compress.Compressor;
|
||||||
import org.elasticsearch.common.compress.NotCompressedException;
|
import org.elasticsearch.common.compress.NotCompressedException;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
|
||||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||||
import org.elasticsearch.core.internal.io.IOUtils;
|
import org.elasticsearch.core.internal.io.IOUtils;
|
||||||
|
|
||||||
|
@ -94,9 +93,8 @@ public final class TransportLogger {
|
||||||
streamInput = compressor.streamInput(streamInput);
|
streamInput = compressor.streamInput(streamInput);
|
||||||
}
|
}
|
||||||
|
|
||||||
try (ThreadContext context = new ThreadContext(Settings.EMPTY)) {
|
// read and discard headers
|
||||||
context.readHeaders(streamInput);
|
ThreadContext.readHeadersFromStream(streamInput);
|
||||||
}
|
|
||||||
if (streamInput.getVersion().before(Version.V_8_0_0)) {
|
if (streamInput.getVersion().before(Version.V_8_0_0)) {
|
||||||
// discard the features
|
// discard the features
|
||||||
streamInput.readStringArray();
|
streamInput.readStringArray();
|
||||||
|
|
|
@ -28,129 +28,126 @@ import java.io.IOException;
|
||||||
public class ContextPreservingActionListenerTests extends ESTestCase {
|
public class ContextPreservingActionListenerTests extends ESTestCase {
|
||||||
|
|
||||||
public void testOriginalContextIsPreservedAfterOnResponse() throws IOException {
|
public void testOriginalContextIsPreservedAfterOnResponse() throws IOException {
|
||||||
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
|
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||||
final boolean nonEmptyContext = randomBoolean();
|
final boolean nonEmptyContext = randomBoolean();
|
||||||
if (nonEmptyContext) {
|
if (nonEmptyContext) {
|
||||||
threadContext.putHeader("not empty", "value");
|
threadContext.putHeader("not empty", "value");
|
||||||
}
|
|
||||||
final ContextPreservingActionListener<Void> actionListener;
|
|
||||||
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
|
|
||||||
threadContext.putHeader("foo", "bar");
|
|
||||||
final ActionListener<Void> delegate = new ActionListener<Void>() {
|
|
||||||
@Override
|
|
||||||
public void onResponse(Void aVoid) {
|
|
||||||
assertEquals("bar", threadContext.getHeader("foo"));
|
|
||||||
assertNull(threadContext.getHeader("not empty"));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onFailure(Exception e) {
|
|
||||||
throw new RuntimeException("onFailure shouldn't be called", e);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
if (randomBoolean()) {
|
|
||||||
actionListener = new ContextPreservingActionListener<>(threadContext.newRestorableContext(true), delegate);
|
|
||||||
} else {
|
|
||||||
actionListener = ContextPreservingActionListener.wrapPreservingContext(delegate, threadContext);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
assertNull(threadContext.getHeader("foo"));
|
|
||||||
assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty"));
|
|
||||||
|
|
||||||
actionListener.onResponse(null);
|
|
||||||
|
|
||||||
assertNull(threadContext.getHeader("foo"));
|
|
||||||
assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty"));
|
|
||||||
}
|
}
|
||||||
|
final ContextPreservingActionListener<Void> actionListener;
|
||||||
|
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
|
||||||
|
threadContext.putHeader("foo", "bar");
|
||||||
|
final ActionListener<Void> delegate = new ActionListener<Void>() {
|
||||||
|
@Override
|
||||||
|
public void onResponse(Void aVoid) {
|
||||||
|
assertEquals("bar", threadContext.getHeader("foo"));
|
||||||
|
assertNull(threadContext.getHeader("not empty"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Exception e) {
|
||||||
|
throw new RuntimeException("onFailure shouldn't be called", e);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if (randomBoolean()) {
|
||||||
|
actionListener = new ContextPreservingActionListener<>(threadContext.newRestorableContext(true), delegate);
|
||||||
|
} else {
|
||||||
|
actionListener = ContextPreservingActionListener.wrapPreservingContext(delegate, threadContext);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assertNull(threadContext.getHeader("foo"));
|
||||||
|
assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty"));
|
||||||
|
|
||||||
|
actionListener.onResponse(null);
|
||||||
|
|
||||||
|
assertNull(threadContext.getHeader("foo"));
|
||||||
|
assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty"));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testOriginalContextIsPreservedAfterOnFailure() throws Exception {
|
public void testOriginalContextIsPreservedAfterOnFailure() throws Exception {
|
||||||
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
|
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||||
final boolean nonEmptyContext = randomBoolean();
|
final boolean nonEmptyContext = randomBoolean();
|
||||||
if (nonEmptyContext) {
|
if (nonEmptyContext) {
|
||||||
threadContext.putHeader("not empty", "value");
|
threadContext.putHeader("not empty", "value");
|
||||||
}
|
}
|
||||||
final ContextPreservingActionListener<Void> actionListener;
|
final ContextPreservingActionListener<Void> actionListener;
|
||||||
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
|
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
|
||||||
threadContext.putHeader("foo", "bar");
|
threadContext.putHeader("foo", "bar");
|
||||||
final ActionListener<Void> delegate = new ActionListener<Void>() {
|
final ActionListener<Void> delegate = new ActionListener<Void>() {
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(Void aVoid) {
|
public void onResponse(Void aVoid) {
|
||||||
throw new RuntimeException("onResponse shouldn't be called");
|
throw new RuntimeException("onResponse shouldn't be called");
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onFailure(Exception e) {
|
|
||||||
assertEquals("bar", threadContext.getHeader("foo"));
|
|
||||||
assertNull(threadContext.getHeader("not empty"));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if (randomBoolean()) {
|
|
||||||
actionListener = new ContextPreservingActionListener<>(threadContext.newRestorableContext(true), delegate);
|
|
||||||
} else {
|
|
||||||
actionListener = ContextPreservingActionListener.wrapPreservingContext(delegate, threadContext);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Exception e) {
|
||||||
|
assertEquals("bar", threadContext.getHeader("foo"));
|
||||||
|
assertNull(threadContext.getHeader("not empty"));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if (randomBoolean()) {
|
||||||
|
actionListener = new ContextPreservingActionListener<>(threadContext.newRestorableContext(true), delegate);
|
||||||
|
} else {
|
||||||
|
actionListener = ContextPreservingActionListener.wrapPreservingContext(delegate, threadContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
assertNull(threadContext.getHeader("foo"));
|
|
||||||
assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty"));
|
|
||||||
|
|
||||||
actionListener.onFailure(null);
|
|
||||||
|
|
||||||
assertNull(threadContext.getHeader("foo"));
|
|
||||||
assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty"));
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
assertNull(threadContext.getHeader("foo"));
|
||||||
|
assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty"));
|
||||||
|
|
||||||
|
actionListener.onFailure(null);
|
||||||
|
|
||||||
|
assertNull(threadContext.getHeader("foo"));
|
||||||
|
assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty"));
|
||||||
|
}
|
||||||
|
|
||||||
public void testOriginalContextIsWhenListenerThrows() throws Exception {
|
public void testOriginalContextIsWhenListenerThrows() throws Exception {
|
||||||
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
|
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||||
final boolean nonEmptyContext = randomBoolean();
|
final boolean nonEmptyContext = randomBoolean();
|
||||||
if (nonEmptyContext) {
|
if (nonEmptyContext) {
|
||||||
threadContext.putHeader("not empty", "value");
|
threadContext.putHeader("not empty", "value");
|
||||||
}
|
|
||||||
final ContextPreservingActionListener<Void> actionListener;
|
|
||||||
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
|
|
||||||
threadContext.putHeader("foo", "bar");
|
|
||||||
final ActionListener<Void> delegate = new ActionListener<Void>() {
|
|
||||||
@Override
|
|
||||||
public void onResponse(Void aVoid) {
|
|
||||||
assertEquals("bar", threadContext.getHeader("foo"));
|
|
||||||
assertNull(threadContext.getHeader("not empty"));
|
|
||||||
throw new RuntimeException("onResponse called");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onFailure(Exception e) {
|
|
||||||
assertEquals("bar", threadContext.getHeader("foo"));
|
|
||||||
assertNull(threadContext.getHeader("not empty"));
|
|
||||||
throw new RuntimeException("onFailure called");
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if (randomBoolean()) {
|
|
||||||
actionListener = new ContextPreservingActionListener<>(threadContext.newRestorableContext(true), delegate);
|
|
||||||
} else {
|
|
||||||
actionListener = ContextPreservingActionListener.wrapPreservingContext(delegate, threadContext);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
assertNull(threadContext.getHeader("foo"));
|
|
||||||
assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty"));
|
|
||||||
|
|
||||||
RuntimeException e = expectThrows(RuntimeException.class, () -> actionListener.onResponse(null));
|
|
||||||
assertEquals("onResponse called", e.getMessage());
|
|
||||||
|
|
||||||
assertNull(threadContext.getHeader("foo"));
|
|
||||||
assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty"));
|
|
||||||
|
|
||||||
e = expectThrows(RuntimeException.class, () -> actionListener.onFailure(null));
|
|
||||||
assertEquals("onFailure called", e.getMessage());
|
|
||||||
|
|
||||||
assertNull(threadContext.getHeader("foo"));
|
|
||||||
assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty"));
|
|
||||||
}
|
}
|
||||||
|
final ContextPreservingActionListener<Void> actionListener;
|
||||||
|
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
|
||||||
|
threadContext.putHeader("foo", "bar");
|
||||||
|
final ActionListener<Void> delegate = new ActionListener<Void>() {
|
||||||
|
@Override
|
||||||
|
public void onResponse(Void aVoid) {
|
||||||
|
assertEquals("bar", threadContext.getHeader("foo"));
|
||||||
|
assertNull(threadContext.getHeader("not empty"));
|
||||||
|
throw new RuntimeException("onResponse called");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Exception e) {
|
||||||
|
assertEquals("bar", threadContext.getHeader("foo"));
|
||||||
|
assertNull(threadContext.getHeader("not empty"));
|
||||||
|
throw new RuntimeException("onFailure called");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if (randomBoolean()) {
|
||||||
|
actionListener = new ContextPreservingActionListener<>(threadContext.newRestorableContext(true), delegate);
|
||||||
|
} else {
|
||||||
|
actionListener = ContextPreservingActionListener.wrapPreservingContext(delegate, threadContext);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assertNull(threadContext.getHeader("foo"));
|
||||||
|
assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty"));
|
||||||
|
|
||||||
|
RuntimeException e = expectThrows(RuntimeException.class, () -> actionListener.onResponse(null));
|
||||||
|
assertEquals("onResponse called", e.getMessage());
|
||||||
|
|
||||||
|
assertNull(threadContext.getHeader("foo"));
|
||||||
|
assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty"));
|
||||||
|
|
||||||
|
e = expectThrows(RuntimeException.class, () -> actionListener.onFailure(null));
|
||||||
|
assertEquals("onFailure called", e.getMessage());
|
||||||
|
|
||||||
|
assertNull(threadContext.getHeader("foo"));
|
||||||
|
assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -42,7 +42,6 @@ import java.security.Permissions;
|
||||||
import java.security.PrivilegedAction;
|
import java.security.PrivilegedAction;
|
||||||
import java.security.ProtectionDomain;
|
import java.security.ProtectionDomain;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -78,136 +77,118 @@ public class DeprecationLoggerTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testAddsHeaderWithThreadContext() throws IOException {
|
public void testAddsHeaderWithThreadContext() throws IOException {
|
||||||
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
|
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||||
final Set<ThreadContext> threadContexts = Collections.singleton(threadContext);
|
final Set<ThreadContext> threadContexts = Collections.singleton(threadContext);
|
||||||
|
|
||||||
final String param = randomAlphaOfLengthBetween(1, 5);
|
final String param = randomAlphaOfLengthBetween(1, 5);
|
||||||
logger.deprecated(threadContexts, "A simple message [{}]", param);
|
logger.deprecated(threadContexts, "A simple message [{}]", param);
|
||||||
|
|
||||||
final Map<String, List<String>> responseHeaders = threadContext.getResponseHeaders();
|
final Map<String, List<String>> responseHeaders = threadContext.getResponseHeaders();
|
||||||
|
|
||||||
assertThat(responseHeaders.size(), equalTo(1));
|
assertThat(responseHeaders.size(), equalTo(1));
|
||||||
final List<String> responses = responseHeaders.get("Warning");
|
final List<String> responses = responseHeaders.get("Warning");
|
||||||
assertThat(responses, hasSize(1));
|
assertThat(responses, hasSize(1));
|
||||||
assertThat(responses.get(0), warningValueMatcher);
|
assertThat(responses.get(0), warningValueMatcher);
|
||||||
assertThat(responses.get(0), containsString("\"A simple message [" + param + "]\""));
|
assertThat(responses.get(0), containsString("\"A simple message [" + param + "]\""));
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testContainingNewline() throws IOException {
|
public void testContainingNewline() throws IOException {
|
||||||
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
|
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||||
final Set<ThreadContext> threadContexts = Collections.singleton(threadContext);
|
final Set<ThreadContext> threadContexts = Collections.singleton(threadContext);
|
||||||
|
|
||||||
logger.deprecated(threadContexts, "this message contains a newline\n");
|
logger.deprecated(threadContexts, "this message contains a newline\n");
|
||||||
|
|
||||||
final Map<String, List<String>> responseHeaders = threadContext.getResponseHeaders();
|
final Map<String, List<String>> responseHeaders = threadContext.getResponseHeaders();
|
||||||
|
|
||||||
assertThat(responseHeaders.size(), equalTo(1));
|
assertThat(responseHeaders.size(), equalTo(1));
|
||||||
final List<String> responses = responseHeaders.get("Warning");
|
final List<String> responses = responseHeaders.get("Warning");
|
||||||
assertThat(responses, hasSize(1));
|
assertThat(responses, hasSize(1));
|
||||||
assertThat(responses.get(0), warningValueMatcher);
|
assertThat(responses.get(0), warningValueMatcher);
|
||||||
assertThat(responses.get(0), containsString("\"this message contains a newline%0A\""));
|
assertThat(responses.get(0), containsString("\"this message contains a newline%0A\""));
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testSurrogatePair() throws IOException {
|
public void testSurrogatePair() throws IOException {
|
||||||
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
|
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||||
final Set<ThreadContext> threadContexts = Collections.singleton(threadContext);
|
final Set<ThreadContext> threadContexts = Collections.singleton(threadContext);
|
||||||
|
|
||||||
logger.deprecated(threadContexts, "this message contains a surrogate pair 😱");
|
logger.deprecated(threadContexts, "this message contains a surrogate pair 😱");
|
||||||
|
|
||||||
final Map<String, List<String>> responseHeaders = threadContext.getResponseHeaders();
|
final Map<String, List<String>> responseHeaders = threadContext.getResponseHeaders();
|
||||||
|
|
||||||
assertThat(responseHeaders.size(), equalTo(1));
|
assertThat(responseHeaders.size(), equalTo(1));
|
||||||
final List<String> responses = responseHeaders.get("Warning");
|
final List<String> responses = responseHeaders.get("Warning");
|
||||||
assertThat(responses, hasSize(1));
|
assertThat(responses, hasSize(1));
|
||||||
assertThat(responses.get(0), warningValueMatcher);
|
assertThat(responses.get(0), warningValueMatcher);
|
||||||
|
|
||||||
// convert UTF-16 to UTF-8 by hand to show the hard-coded constant below is correct
|
// convert UTF-16 to UTF-8 by hand to show the hard-coded constant below is correct
|
||||||
assertThat("😱", equalTo("\uD83D\uDE31"));
|
assertThat("😱", equalTo("\uD83D\uDE31"));
|
||||||
final int code = 0x10000 + ((0xD83D & 0x3FF) << 10) + (0xDE31 & 0x3FF);
|
final int code = 0x10000 + ((0xD83D & 0x3FF) << 10) + (0xDE31 & 0x3FF);
|
||||||
@SuppressWarnings("PointlessBitwiseExpression")
|
@SuppressWarnings("PointlessBitwiseExpression")
|
||||||
final int[] points = new int[] {
|
final int[] points = new int[] {
|
||||||
(code >> 18) & 0x07 | 0xF0,
|
(code >> 18) & 0x07 | 0xF0,
|
||||||
(code >> 12) & 0x3F | 0x80,
|
(code >> 12) & 0x3F | 0x80,
|
||||||
(code >> 6) & 0x3F | 0x80,
|
(code >> 6) & 0x3F | 0x80,
|
||||||
(code >> 0) & 0x3F | 0x80};
|
(code >> 0) & 0x3F | 0x80};
|
||||||
final StringBuilder sb = new StringBuilder();
|
final StringBuilder sb = new StringBuilder();
|
||||||
// noinspection ForLoopReplaceableByForEach
|
// noinspection ForLoopReplaceableByForEach
|
||||||
for (int i = 0; i < points.length; i++) {
|
for (int i = 0; i < points.length; i++) {
|
||||||
sb.append("%").append(Integer.toString(points[i], 16).toUpperCase(Locale.ROOT));
|
sb.append("%").append(Integer.toString(points[i], 16).toUpperCase(Locale.ROOT));
|
||||||
}
|
|
||||||
assertThat(sb.toString(), equalTo("%F0%9F%98%B1"));
|
|
||||||
assertThat(responses.get(0), containsString("\"this message contains a surrogate pair %F0%9F%98%B1\""));
|
|
||||||
}
|
}
|
||||||
|
assertThat(sb.toString(), equalTo("%F0%9F%98%B1"));
|
||||||
|
assertThat(responses.get(0), containsString("\"this message contains a surrogate pair %F0%9F%98%B1\""));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testAddsCombinedHeaderWithThreadContext() throws IOException {
|
public void testAddsCombinedHeaderWithThreadContext() throws IOException {
|
||||||
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
|
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||||
final Set<ThreadContext> threadContexts = Collections.singleton(threadContext);
|
final Set<ThreadContext> threadContexts = Collections.singleton(threadContext);
|
||||||
|
|
||||||
final String param = randomAlphaOfLengthBetween(1, 5);
|
final String param = randomAlphaOfLengthBetween(1, 5);
|
||||||
logger.deprecated(threadContexts, "A simple message [{}]", param);
|
logger.deprecated(threadContexts, "A simple message [{}]", param);
|
||||||
final String second = randomAlphaOfLengthBetween(1, 10);
|
final String second = randomAlphaOfLengthBetween(1, 10);
|
||||||
logger.deprecated(threadContexts, second);
|
logger.deprecated(threadContexts, second);
|
||||||
|
|
||||||
final Map<String, List<String>> responseHeaders = threadContext.getResponseHeaders();
|
final Map<String, List<String>> responseHeaders = threadContext.getResponseHeaders();
|
||||||
|
|
||||||
assertEquals(1, responseHeaders.size());
|
assertEquals(1, responseHeaders.size());
|
||||||
|
|
||||||
final List<String> responses = responseHeaders.get("Warning");
|
final List<String> responses = responseHeaders.get("Warning");
|
||||||
|
|
||||||
assertEquals(2, responses.size());
|
assertEquals(2, responses.size());
|
||||||
assertThat(responses.get(0), warningValueMatcher);
|
assertThat(responses.get(0), warningValueMatcher);
|
||||||
assertThat(responses.get(0), containsString("\"A simple message [" + param + "]\""));
|
assertThat(responses.get(0), containsString("\"A simple message [" + param + "]\""));
|
||||||
assertThat(responses.get(1), warningValueMatcher);
|
assertThat(responses.get(1), warningValueMatcher);
|
||||||
assertThat(responses.get(1), containsString("\"" + second + "\""));
|
assertThat(responses.get(1), containsString("\"" + second + "\""));
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testCanRemoveThreadContext() throws IOException {
|
public void testCanRemoveThreadContext() throws IOException {
|
||||||
final String expected = "testCanRemoveThreadContext";
|
final String expected = "testCanRemoveThreadContext";
|
||||||
final String unexpected = "testCannotRemoveThreadContext";
|
final String unexpected = "testCannotRemoveThreadContext";
|
||||||
|
|
||||||
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
|
|
||||||
DeprecationLogger.setThreadContext(threadContext);
|
|
||||||
logger.deprecated(expected);
|
|
||||||
|
|
||||||
{
|
|
||||||
final Map<String, List<String>> responseHeaders = threadContext.getResponseHeaders();
|
|
||||||
final List<String> responses = responseHeaders.get("Warning");
|
|
||||||
|
|
||||||
assertThat(responses, hasSize(1));
|
|
||||||
assertThat(responses.get(0), warningValueMatcher);
|
|
||||||
assertThat(responses.get(0), containsString(expected));
|
|
||||||
}
|
|
||||||
|
|
||||||
DeprecationLogger.removeThreadContext(threadContext);
|
|
||||||
logger.deprecated(unexpected);
|
|
||||||
|
|
||||||
{
|
|
||||||
final Map<String, List<String>> responseHeaders = threadContext.getResponseHeaders();
|
|
||||||
final List<String> responses = responseHeaders.get("Warning");
|
|
||||||
|
|
||||||
assertThat(responses, hasSize(1));
|
|
||||||
assertThat(responses.get(0), warningValueMatcher);
|
|
||||||
assertThat(responses.get(0), containsString(expected));
|
|
||||||
assertThat(responses.get(0), not(containsString(unexpected)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testIgnoresClosedThreadContext() throws IOException {
|
|
||||||
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||||
Set<ThreadContext> threadContexts = new HashSet<>(1);
|
DeprecationLogger.setThreadContext(threadContext);
|
||||||
|
logger.deprecated(expected);
|
||||||
|
|
||||||
threadContexts.add(threadContext);
|
{
|
||||||
|
final Map<String, List<String>> responseHeaders = threadContext.getResponseHeaders();
|
||||||
|
final List<String> responses = responseHeaders.get("Warning");
|
||||||
|
|
||||||
threadContext.close();
|
assertThat(responses, hasSize(1));
|
||||||
|
assertThat(responses.get(0), warningValueMatcher);
|
||||||
|
assertThat(responses.get(0), containsString(expected));
|
||||||
|
}
|
||||||
|
|
||||||
logger.deprecated(threadContexts, "Ignored logger message");
|
DeprecationLogger.removeThreadContext(threadContext);
|
||||||
|
logger.deprecated(unexpected);
|
||||||
|
|
||||||
assertTrue(threadContexts.contains(threadContext));
|
{
|
||||||
|
final Map<String, List<String>> responseHeaders = threadContext.getResponseHeaders();
|
||||||
|
final List<String> responses = responseHeaders.get("Warning");
|
||||||
|
|
||||||
|
assertThat(responses, hasSize(1));
|
||||||
|
assertThat(responses.get(0), warningValueMatcher);
|
||||||
|
assertThat(responses.get(0), containsString(expected));
|
||||||
|
assertThat(responses.get(0), not(containsString(unexpected)));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testSafeWithoutThreadContext() {
|
public void testSafeWithoutThreadContext() {
|
||||||
|
@ -219,22 +200,20 @@ public class DeprecationLoggerTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testFailsWhenDoubleSettingSameThreadContext() throws IOException {
|
public void testFailsWhenDoubleSettingSameThreadContext() throws IOException {
|
||||||
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
|
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||||
DeprecationLogger.setThreadContext(threadContext);
|
DeprecationLogger.setThreadContext(threadContext);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
expectThrows(IllegalStateException.class, () -> DeprecationLogger.setThreadContext(threadContext));
|
expectThrows(IllegalStateException.class, () -> DeprecationLogger.setThreadContext(threadContext));
|
||||||
} finally {
|
} finally {
|
||||||
// cleanup after ourselves
|
// cleanup after ourselves
|
||||||
DeprecationLogger.removeThreadContext(threadContext);
|
DeprecationLogger.removeThreadContext(threadContext);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testFailsWhenRemovingUnknownThreadContext() throws IOException {
|
public void testFailsWhenRemovingUnknownThreadContext() throws IOException {
|
||||||
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
|
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||||
expectThrows(IllegalStateException.class, () -> DeprecationLogger.removeThreadContext(threadContext));
|
expectThrows(IllegalStateException.class, () -> DeprecationLogger.removeThreadContext(threadContext));
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testWarningValueFromWarningHeader() throws InterruptedException {
|
public void testWarningValueFromWarningHeader() throws InterruptedException {
|
||||||
|
@ -274,21 +253,20 @@ public class DeprecationLoggerTests extends ESTestCase {
|
||||||
Settings settings = Settings.builder()
|
Settings settings = Settings.builder()
|
||||||
.put("http.max_warning_header_count", maxWarningHeaderCount)
|
.put("http.max_warning_header_count", maxWarningHeaderCount)
|
||||||
.build();
|
.build();
|
||||||
try (ThreadContext threadContext = new ThreadContext(settings)) {
|
ThreadContext threadContext = new ThreadContext(settings);
|
||||||
final Set<ThreadContext> threadContexts = Collections.singleton(threadContext);
|
final Set<ThreadContext> threadContexts = Collections.singleton(threadContext);
|
||||||
// try to log three warning messages
|
// try to log three warning messages
|
||||||
logger.deprecated(threadContexts, "A simple message 1");
|
logger.deprecated(threadContexts, "A simple message 1");
|
||||||
logger.deprecated(threadContexts, "A simple message 2");
|
logger.deprecated(threadContexts, "A simple message 2");
|
||||||
logger.deprecated(threadContexts, "A simple message 3");
|
logger.deprecated(threadContexts, "A simple message 3");
|
||||||
final Map<String, List<String>> responseHeaders = threadContext.getResponseHeaders();
|
final Map<String, List<String>> responseHeaders = threadContext.getResponseHeaders();
|
||||||
final List<String> responses = responseHeaders.get("Warning");
|
final List<String> responses = responseHeaders.get("Warning");
|
||||||
|
|
||||||
assertEquals(maxWarningHeaderCount, responses.size());
|
assertEquals(maxWarningHeaderCount, responses.size());
|
||||||
assertThat(responses.get(0), warningValueMatcher);
|
assertThat(responses.get(0), warningValueMatcher);
|
||||||
assertThat(responses.get(0), containsString("\"A simple message 1"));
|
assertThat(responses.get(0), containsString("\"A simple message 1"));
|
||||||
assertThat(responses.get(1), warningValueMatcher);
|
assertThat(responses.get(1), warningValueMatcher);
|
||||||
assertThat(responses.get(1), containsString("\"A simple message 2"));
|
assertThat(responses.get(1), containsString("\"A simple message 2"));
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testWarningHeaderSizeSetting() throws IOException{
|
public void testWarningHeaderSizeSetting() throws IOException{
|
||||||
|
@ -302,23 +280,22 @@ public class DeprecationLoggerTests extends ESTestCase {
|
||||||
String message2 = new String(arr, StandardCharsets.UTF_8) + "2";
|
String message2 = new String(arr, StandardCharsets.UTF_8) + "2";
|
||||||
String message3 = new String(arr, StandardCharsets.UTF_8) + "3";
|
String message3 = new String(arr, StandardCharsets.UTF_8) + "3";
|
||||||
|
|
||||||
try (ThreadContext threadContext = new ThreadContext(settings)) {
|
ThreadContext threadContext = new ThreadContext(settings);
|
||||||
final Set<ThreadContext> threadContexts = Collections.singleton(threadContext);
|
final Set<ThreadContext> threadContexts = Collections.singleton(threadContext);
|
||||||
// try to log three warning messages
|
// try to log three warning messages
|
||||||
logger.deprecated(threadContexts, message1);
|
logger.deprecated(threadContexts, message1);
|
||||||
logger.deprecated(threadContexts, message2);
|
logger.deprecated(threadContexts, message2);
|
||||||
logger.deprecated(threadContexts, message3);
|
logger.deprecated(threadContexts, message3);
|
||||||
final Map<String, List<String>> responseHeaders = threadContext.getResponseHeaders();
|
final Map<String, List<String>> responseHeaders = threadContext.getResponseHeaders();
|
||||||
final List<String> responses = responseHeaders.get("Warning");
|
final List<String> responses = responseHeaders.get("Warning");
|
||||||
|
|
||||||
long warningHeadersSize = 0L;
|
long warningHeadersSize = 0L;
|
||||||
for (String response : responses){
|
for (String response : responses){
|
||||||
warningHeadersSize += "Warning".getBytes(StandardCharsets.UTF_8).length +
|
warningHeadersSize += "Warning".getBytes(StandardCharsets.UTF_8).length +
|
||||||
response.getBytes(StandardCharsets.UTF_8).length;
|
response.getBytes(StandardCharsets.UTF_8).length;
|
||||||
}
|
|
||||||
// assert that the size of all warning headers is less or equal to 1Kb
|
|
||||||
assertTrue(warningHeadersSize <= 1024);
|
|
||||||
}
|
}
|
||||||
|
// assert that the size of all warning headers is less or equal to 1Kb
|
||||||
|
assertTrue(warningHeadersSize <= 1024);
|
||||||
}
|
}
|
||||||
@SuppressLoggerChecks(reason = "Safe as this is using mockito")
|
@SuppressLoggerChecks(reason = "Safe as this is using mockito")
|
||||||
public void testLogPermissions() {
|
public void testLogPermissions() {
|
||||||
|
|
|
@ -21,7 +21,6 @@ package org.elasticsearch.common.util.concurrent;
|
||||||
import org.elasticsearch.common.collect.Tuple;
|
import org.elasticsearch.common.collect.Tuple;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -47,11 +46,6 @@ public class AsyncIOProcessorTests extends ESTestCase {
|
||||||
threadContext = new ThreadContext(Settings.EMPTY);
|
threadContext = new ThreadContext(Settings.EMPTY);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
|
||||||
public void tearDownThreadContext() {
|
|
||||||
threadContext.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testPut() throws InterruptedException {
|
public void testPut() throws InterruptedException {
|
||||||
boolean blockInternal = randomBoolean();
|
boolean blockInternal = randomBoolean();
|
||||||
AtomicInteger received = new AtomicInteger(0);
|
AtomicInteger received = new AtomicInteger(0);
|
||||||
|
|
|
@ -64,7 +64,6 @@ public class QueueResizingEsThreadPoolExecutorTests extends ESTestCase {
|
||||||
});
|
});
|
||||||
executor.shutdown();
|
executor.shutdown();
|
||||||
executor.awaitTermination(10, TimeUnit.SECONDS);
|
executor.awaitTermination(10, TimeUnit.SECONDS);
|
||||||
context.close();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testAutoQueueSizingUp() throws Exception {
|
public void testAutoQueueSizingUp() throws Exception {
|
||||||
|
@ -93,7 +92,6 @@ public class QueueResizingEsThreadPoolExecutorTests extends ESTestCase {
|
||||||
});
|
});
|
||||||
executor.shutdown();
|
executor.shutdown();
|
||||||
executor.awaitTermination(10, TimeUnit.SECONDS);
|
executor.awaitTermination(10, TimeUnit.SECONDS);
|
||||||
context.close();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testAutoQueueSizingDown() throws Exception {
|
public void testAutoQueueSizingDown() throws Exception {
|
||||||
|
@ -121,7 +119,6 @@ public class QueueResizingEsThreadPoolExecutorTests extends ESTestCase {
|
||||||
});
|
});
|
||||||
executor.shutdown();
|
executor.shutdown();
|
||||||
executor.awaitTermination(10, TimeUnit.SECONDS);
|
executor.awaitTermination(10, TimeUnit.SECONDS);
|
||||||
context.close();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testAutoQueueSizingWithMin() throws Exception {
|
public void testAutoQueueSizingWithMin() throws Exception {
|
||||||
|
@ -151,7 +148,6 @@ public class QueueResizingEsThreadPoolExecutorTests extends ESTestCase {
|
||||||
});
|
});
|
||||||
executor.shutdown();
|
executor.shutdown();
|
||||||
executor.awaitTermination(10, TimeUnit.SECONDS);
|
executor.awaitTermination(10, TimeUnit.SECONDS);
|
||||||
context.close();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testAutoQueueSizingWithMax() throws Exception {
|
public void testAutoQueueSizingWithMax() throws Exception {
|
||||||
|
@ -181,7 +177,6 @@ public class QueueResizingEsThreadPoolExecutorTests extends ESTestCase {
|
||||||
});
|
});
|
||||||
executor.shutdown();
|
executor.shutdown();
|
||||||
executor.awaitTermination(10, TimeUnit.SECONDS);
|
executor.awaitTermination(10, TimeUnit.SECONDS);
|
||||||
context.close();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testExecutionEWMACalculation() throws Exception {
|
public void testExecutionEWMACalculation() throws Exception {
|
||||||
|
@ -222,7 +217,6 @@ public class QueueResizingEsThreadPoolExecutorTests extends ESTestCase {
|
||||||
|
|
||||||
executor.shutdown();
|
executor.shutdown();
|
||||||
executor.awaitTermination(10, TimeUnit.SECONDS);
|
executor.awaitTermination(10, TimeUnit.SECONDS);
|
||||||
context.close();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Use a runnable wrapper that simulates a task with unknown failures. */
|
/** Use a runnable wrapper that simulates a task with unknown failures. */
|
||||||
|
@ -244,7 +238,6 @@ public class QueueResizingEsThreadPoolExecutorTests extends ESTestCase {
|
||||||
executeTask(executor, 1);
|
executeTask(executor, 1);
|
||||||
executor.shutdown();
|
executor.shutdown();
|
||||||
executor.awaitTermination(10, TimeUnit.SECONDS);
|
executor.awaitTermination(10, TimeUnit.SECONDS);
|
||||||
context.close();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private Function<Runnable, WrappedRunnable> fastWrapper() {
|
private Function<Runnable, WrappedRunnable> fastWrapper() {
|
||||||
|
|
|
@ -244,35 +244,6 @@ public class ThreadContextTests extends ESTestCase {
|
||||||
assertEquals("bar", threadContext.getHeader("foo"));
|
assertEquals("bar", threadContext.getHeader("foo"));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testAccessClosed() throws IOException {
|
|
||||||
Settings build = Settings.builder().put("request.headers.default", "1").build();
|
|
||||||
ThreadContext threadContext = new ThreadContext(build);
|
|
||||||
threadContext.putHeader("foo", "bar");
|
|
||||||
threadContext.putTransient("ctx.foo", 1);
|
|
||||||
|
|
||||||
threadContext.close();
|
|
||||||
try {
|
|
||||||
threadContext.getHeader("foo");
|
|
||||||
fail();
|
|
||||||
} catch (IllegalStateException ise) {
|
|
||||||
assertEquals("threadcontext is already closed", ise.getMessage());
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
threadContext.putTransient("foo", new Object());
|
|
||||||
fail();
|
|
||||||
} catch (IllegalStateException ise) {
|
|
||||||
assertEquals("threadcontext is already closed", ise.getMessage());
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
threadContext.putHeader("boom", "boom");
|
|
||||||
fail();
|
|
||||||
} catch (IllegalStateException ise) {
|
|
||||||
assertEquals("threadcontext is already closed", ise.getMessage());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testSerialize() throws IOException {
|
public void testSerialize() throws IOException {
|
||||||
Settings build = Settings.builder().put("request.headers.default", "1").build();
|
Settings build = Settings.builder().put("request.headers.default", "1").build();
|
||||||
ThreadContext threadContext = new ThreadContext(build);
|
ThreadContext threadContext = new ThreadContext(build);
|
||||||
|
@ -397,244 +368,238 @@ public class ThreadContextTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testPreserveContext() throws IOException {
|
public void testPreserveContext() throws IOException {
|
||||||
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
|
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||||
Runnable withContext;
|
Runnable withContext;
|
||||||
|
|
||||||
// Create a runnable that should run with some header
|
// Create a runnable that should run with some header
|
||||||
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
|
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
|
||||||
threadContext.putHeader("foo", "bar");
|
threadContext.putHeader("foo", "bar");
|
||||||
withContext = threadContext.preserveContext(sometimesAbstractRunnable(() -> {
|
withContext = threadContext.preserveContext(sometimesAbstractRunnable(() -> {
|
||||||
assertEquals("bar", threadContext.getHeader("foo"));
|
assertEquals("bar", threadContext.getHeader("foo"));
|
||||||
}));
|
}));
|
||||||
}
|
|
||||||
|
|
||||||
// We don't see the header outside of the runnable
|
|
||||||
assertNull(threadContext.getHeader("foo"));
|
|
||||||
|
|
||||||
// But we do inside of it
|
|
||||||
withContext.run();
|
|
||||||
|
|
||||||
// but not after
|
|
||||||
assertNull(threadContext.getHeader("foo"));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We don't see the header outside of the runnable
|
||||||
|
assertNull(threadContext.getHeader("foo"));
|
||||||
|
|
||||||
|
// But we do inside of it
|
||||||
|
withContext.run();
|
||||||
|
|
||||||
|
// but not after
|
||||||
|
assertNull(threadContext.getHeader("foo"));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testPreserveContextKeepsOriginalContextWhenCalledTwice() throws IOException {
|
public void testPreserveContextKeepsOriginalContextWhenCalledTwice() throws IOException {
|
||||||
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
|
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||||
Runnable originalWithContext;
|
Runnable originalWithContext;
|
||||||
Runnable withContext;
|
Runnable withContext;
|
||||||
|
|
||||||
// Create a runnable that should run with some header
|
// Create a runnable that should run with some header
|
||||||
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
|
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
|
||||||
threadContext.putHeader("foo", "bar");
|
threadContext.putHeader("foo", "bar");
|
||||||
withContext = threadContext.preserveContext(sometimesAbstractRunnable(() -> {
|
withContext = threadContext.preserveContext(sometimesAbstractRunnable(() -> {
|
||||||
assertEquals("bar", threadContext.getHeader("foo"));
|
assertEquals("bar", threadContext.getHeader("foo"));
|
||||||
}));
|
}));
|
||||||
}
|
|
||||||
|
|
||||||
// Now attempt to rewrap it
|
|
||||||
originalWithContext = withContext;
|
|
||||||
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
|
|
||||||
threadContext.putHeader("foo", "zot");
|
|
||||||
withContext = threadContext.preserveContext(withContext);
|
|
||||||
}
|
|
||||||
|
|
||||||
// We get the original context inside the runnable
|
|
||||||
withContext.run();
|
|
||||||
|
|
||||||
// In fact the second wrapping didn't even change it
|
|
||||||
assertThat(withContext, sameInstance(originalWithContext));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Now attempt to rewrap it
|
||||||
|
originalWithContext = withContext;
|
||||||
|
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
|
||||||
|
threadContext.putHeader("foo", "zot");
|
||||||
|
withContext = threadContext.preserveContext(withContext);
|
||||||
|
}
|
||||||
|
|
||||||
|
// We get the original context inside the runnable
|
||||||
|
withContext.run();
|
||||||
|
|
||||||
|
// In fact the second wrapping didn't even change it
|
||||||
|
assertThat(withContext, sameInstance(originalWithContext));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testPreservesThreadsOriginalContextOnRunException() throws IOException {
|
public void testPreservesThreadsOriginalContextOnRunException() throws IOException {
|
||||||
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
|
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||||
Runnable withContext;
|
Runnable withContext;
|
||||||
|
|
||||||
// create a abstract runnable, add headers and transient objects and verify in the methods
|
// create a abstract runnable, add headers and transient objects and verify in the methods
|
||||||
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
|
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
|
||||||
threadContext.putHeader("foo", "bar");
|
threadContext.putHeader("foo", "bar");
|
||||||
boolean systemContext = randomBoolean();
|
boolean systemContext = randomBoolean();
|
||||||
if (systemContext) {
|
if (systemContext) {
|
||||||
threadContext.markAsSystemContext();
|
threadContext.markAsSystemContext();
|
||||||
}
|
|
||||||
threadContext.putTransient("foo", "bar_transient");
|
|
||||||
withContext = threadContext.preserveContext(new AbstractRunnable() {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onAfter() {
|
|
||||||
assertEquals(systemContext, threadContext.isSystemContext());
|
|
||||||
assertEquals("bar", threadContext.getHeader("foo"));
|
|
||||||
assertEquals("bar_transient", threadContext.getTransient("foo"));
|
|
||||||
assertNotNull(threadContext.getTransient("failure"));
|
|
||||||
assertEquals("exception from doRun", ((RuntimeException) threadContext.getTransient("failure")).getMessage());
|
|
||||||
assertFalse(threadContext.isDefaultContext());
|
|
||||||
threadContext.putTransient("after", "after");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onFailure(Exception e) {
|
|
||||||
assertEquals(systemContext, threadContext.isSystemContext());
|
|
||||||
assertEquals("exception from doRun", e.getMessage());
|
|
||||||
assertEquals("bar", threadContext.getHeader("foo"));
|
|
||||||
assertEquals("bar_transient", threadContext.getTransient("foo"));
|
|
||||||
assertFalse(threadContext.isDefaultContext());
|
|
||||||
threadContext.putTransient("failure", e);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void doRun() throws Exception {
|
|
||||||
assertEquals(systemContext, threadContext.isSystemContext());
|
|
||||||
assertEquals("bar", threadContext.getHeader("foo"));
|
|
||||||
assertEquals("bar_transient", threadContext.getTransient("foo"));
|
|
||||||
assertFalse(threadContext.isDefaultContext());
|
|
||||||
throw new RuntimeException("exception from doRun");
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
threadContext.putTransient("foo", "bar_transient");
|
||||||
|
withContext = threadContext.preserveContext(new AbstractRunnable() {
|
||||||
|
|
||||||
// We don't see the header outside of the runnable
|
@Override
|
||||||
assertNull(threadContext.getHeader("foo"));
|
public void onAfter() {
|
||||||
assertNull(threadContext.getTransient("foo"));
|
assertEquals(systemContext, threadContext.isSystemContext());
|
||||||
assertNull(threadContext.getTransient("failure"));
|
assertEquals("bar", threadContext.getHeader("foo"));
|
||||||
assertNull(threadContext.getTransient("after"));
|
assertEquals("bar_transient", threadContext.getTransient("foo"));
|
||||||
assertTrue(threadContext.isDefaultContext());
|
assertNotNull(threadContext.getTransient("failure"));
|
||||||
|
assertEquals("exception from doRun", ((RuntimeException) threadContext.getTransient("failure")).getMessage());
|
||||||
|
assertFalse(threadContext.isDefaultContext());
|
||||||
|
threadContext.putTransient("after", "after");
|
||||||
|
}
|
||||||
|
|
||||||
// But we do inside of it
|
@Override
|
||||||
withContext.run();
|
public void onFailure(Exception e) {
|
||||||
|
assertEquals(systemContext, threadContext.isSystemContext());
|
||||||
// verify not seen after
|
assertEquals("exception from doRun", e.getMessage());
|
||||||
assertNull(threadContext.getHeader("foo"));
|
|
||||||
assertNull(threadContext.getTransient("foo"));
|
|
||||||
assertNull(threadContext.getTransient("failure"));
|
|
||||||
assertNull(threadContext.getTransient("after"));
|
|
||||||
assertTrue(threadContext.isDefaultContext());
|
|
||||||
|
|
||||||
// repeat with regular runnable
|
|
||||||
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
|
|
||||||
threadContext.putHeader("foo", "bar");
|
|
||||||
threadContext.putTransient("foo", "bar_transient");
|
|
||||||
withContext = threadContext.preserveContext(() -> {
|
|
||||||
assertEquals("bar", threadContext.getHeader("foo"));
|
assertEquals("bar", threadContext.getHeader("foo"));
|
||||||
assertEquals("bar_transient", threadContext.getTransient("foo"));
|
assertEquals("bar_transient", threadContext.getTransient("foo"));
|
||||||
assertFalse(threadContext.isDefaultContext());
|
assertFalse(threadContext.isDefaultContext());
|
||||||
threadContext.putTransient("run", true);
|
threadContext.putTransient("failure", e);
|
||||||
throw new RuntimeException("exception from run");
|
}
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
assertNull(threadContext.getHeader("foo"));
|
@Override
|
||||||
assertNull(threadContext.getTransient("foo"));
|
protected void doRun() throws Exception {
|
||||||
assertNull(threadContext.getTransient("run"));
|
assertEquals(systemContext, threadContext.isSystemContext());
|
||||||
assertTrue(threadContext.isDefaultContext());
|
assertEquals("bar", threadContext.getHeader("foo"));
|
||||||
|
assertEquals("bar_transient", threadContext.getTransient("foo"));
|
||||||
final Runnable runnable = withContext;
|
assertFalse(threadContext.isDefaultContext());
|
||||||
RuntimeException e = expectThrows(RuntimeException.class, runnable::run);
|
throw new RuntimeException("exception from doRun");
|
||||||
assertEquals("exception from run", e.getMessage());
|
}
|
||||||
assertNull(threadContext.getHeader("foo"));
|
});
|
||||||
assertNull(threadContext.getTransient("foo"));
|
|
||||||
assertNull(threadContext.getTransient("run"));
|
|
||||||
assertTrue(threadContext.isDefaultContext());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We don't see the header outside of the runnable
|
||||||
|
assertNull(threadContext.getHeader("foo"));
|
||||||
|
assertNull(threadContext.getTransient("foo"));
|
||||||
|
assertNull(threadContext.getTransient("failure"));
|
||||||
|
assertNull(threadContext.getTransient("after"));
|
||||||
|
assertTrue(threadContext.isDefaultContext());
|
||||||
|
|
||||||
|
// But we do inside of it
|
||||||
|
withContext.run();
|
||||||
|
|
||||||
|
// verify not seen after
|
||||||
|
assertNull(threadContext.getHeader("foo"));
|
||||||
|
assertNull(threadContext.getTransient("foo"));
|
||||||
|
assertNull(threadContext.getTransient("failure"));
|
||||||
|
assertNull(threadContext.getTransient("after"));
|
||||||
|
assertTrue(threadContext.isDefaultContext());
|
||||||
|
|
||||||
|
// repeat with regular runnable
|
||||||
|
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
|
||||||
|
threadContext.putHeader("foo", "bar");
|
||||||
|
threadContext.putTransient("foo", "bar_transient");
|
||||||
|
withContext = threadContext.preserveContext(() -> {
|
||||||
|
assertEquals("bar", threadContext.getHeader("foo"));
|
||||||
|
assertEquals("bar_transient", threadContext.getTransient("foo"));
|
||||||
|
assertFalse(threadContext.isDefaultContext());
|
||||||
|
threadContext.putTransient("run", true);
|
||||||
|
throw new RuntimeException("exception from run");
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
assertNull(threadContext.getHeader("foo"));
|
||||||
|
assertNull(threadContext.getTransient("foo"));
|
||||||
|
assertNull(threadContext.getTransient("run"));
|
||||||
|
assertTrue(threadContext.isDefaultContext());
|
||||||
|
|
||||||
|
final Runnable runnable = withContext;
|
||||||
|
RuntimeException e = expectThrows(RuntimeException.class, runnable::run);
|
||||||
|
assertEquals("exception from run", e.getMessage());
|
||||||
|
assertNull(threadContext.getHeader("foo"));
|
||||||
|
assertNull(threadContext.getTransient("foo"));
|
||||||
|
assertNull(threadContext.getTransient("run"));
|
||||||
|
assertTrue(threadContext.isDefaultContext());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testPreservesThreadsOriginalContextOnFailureException() throws IOException {
|
public void testPreservesThreadsOriginalContextOnFailureException() throws IOException {
|
||||||
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
|
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||||
Runnable withContext;
|
Runnable withContext;
|
||||||
|
|
||||||
// a runnable that throws from onFailure
|
// a runnable that throws from onFailure
|
||||||
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
|
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
|
||||||
threadContext.putHeader("foo", "bar");
|
threadContext.putHeader("foo", "bar");
|
||||||
threadContext.putTransient("foo", "bar_transient");
|
threadContext.putTransient("foo", "bar_transient");
|
||||||
withContext = threadContext.preserveContext(new AbstractRunnable() {
|
withContext = threadContext.preserveContext(new AbstractRunnable() {
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Exception e) {
|
public void onFailure(Exception e) {
|
||||||
throw new RuntimeException("from onFailure", e);
|
throw new RuntimeException("from onFailure", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doRun() throws Exception {
|
protected void doRun() throws Exception {
|
||||||
assertEquals("bar", threadContext.getHeader("foo"));
|
assertEquals("bar", threadContext.getHeader("foo"));
|
||||||
assertEquals("bar_transient", threadContext.getTransient("foo"));
|
assertEquals("bar_transient", threadContext.getTransient("foo"));
|
||||||
assertFalse(threadContext.isDefaultContext());
|
assertFalse(threadContext.isDefaultContext());
|
||||||
throw new RuntimeException("from doRun");
|
throw new RuntimeException("from doRun");
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
|
||||||
|
|
||||||
// We don't see the header outside of the runnable
|
|
||||||
assertNull(threadContext.getHeader("foo"));
|
|
||||||
assertNull(threadContext.getTransient("foo"));
|
|
||||||
assertTrue(threadContext.isDefaultContext());
|
|
||||||
|
|
||||||
// But we do inside of it
|
|
||||||
RuntimeException e = expectThrows(RuntimeException.class, withContext::run);
|
|
||||||
assertEquals("from onFailure", e.getMessage());
|
|
||||||
assertEquals("from doRun", e.getCause().getMessage());
|
|
||||||
|
|
||||||
// but not after
|
|
||||||
assertNull(threadContext.getHeader("foo"));
|
|
||||||
assertNull(threadContext.getTransient("foo"));
|
|
||||||
assertTrue(threadContext.isDefaultContext());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We don't see the header outside of the runnable
|
||||||
|
assertNull(threadContext.getHeader("foo"));
|
||||||
|
assertNull(threadContext.getTransient("foo"));
|
||||||
|
assertTrue(threadContext.isDefaultContext());
|
||||||
|
|
||||||
|
// But we do inside of it
|
||||||
|
RuntimeException e = expectThrows(RuntimeException.class, withContext::run);
|
||||||
|
assertEquals("from onFailure", e.getMessage());
|
||||||
|
assertEquals("from doRun", e.getCause().getMessage());
|
||||||
|
|
||||||
|
// but not after
|
||||||
|
assertNull(threadContext.getHeader("foo"));
|
||||||
|
assertNull(threadContext.getTransient("foo"));
|
||||||
|
assertTrue(threadContext.isDefaultContext());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testPreservesThreadsOriginalContextOnAfterException() throws IOException {
|
public void testPreservesThreadsOriginalContextOnAfterException() throws IOException {
|
||||||
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
|
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||||
Runnable withContext;
|
Runnable withContext;
|
||||||
|
|
||||||
// a runnable that throws from onAfter
|
// a runnable that throws from onAfter
|
||||||
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
|
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
|
||||||
threadContext.putHeader("foo", "bar");
|
threadContext.putHeader("foo", "bar");
|
||||||
threadContext.putTransient("foo", "bar_transient");
|
threadContext.putTransient("foo", "bar_transient");
|
||||||
withContext = threadContext.preserveContext(new AbstractRunnable() {
|
withContext = threadContext.preserveContext(new AbstractRunnable() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onAfter() {
|
public void onAfter() {
|
||||||
throw new RuntimeException("from onAfter");
|
throw new RuntimeException("from onAfter");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Exception e) {
|
public void onFailure(Exception e) {
|
||||||
throw new RuntimeException("from onFailure", e);
|
throw new RuntimeException("from onFailure", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doRun() throws Exception {
|
protected void doRun() throws Exception {
|
||||||
assertEquals("bar", threadContext.getHeader("foo"));
|
assertEquals("bar", threadContext.getHeader("foo"));
|
||||||
assertEquals("bar_transient", threadContext.getTransient("foo"));
|
assertEquals("bar_transient", threadContext.getTransient("foo"));
|
||||||
assertFalse(threadContext.isDefaultContext());
|
assertFalse(threadContext.isDefaultContext());
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
|
||||||
|
|
||||||
// We don't see the header outside of the runnable
|
|
||||||
assertNull(threadContext.getHeader("foo"));
|
|
||||||
assertNull(threadContext.getTransient("foo"));
|
|
||||||
assertTrue(threadContext.isDefaultContext());
|
|
||||||
|
|
||||||
// But we do inside of it
|
|
||||||
RuntimeException e = expectThrows(RuntimeException.class, withContext::run);
|
|
||||||
assertEquals("from onAfter", e.getMessage());
|
|
||||||
assertNull(e.getCause());
|
|
||||||
|
|
||||||
// but not after
|
|
||||||
assertNull(threadContext.getHeader("foo"));
|
|
||||||
assertNull(threadContext.getTransient("foo"));
|
|
||||||
assertTrue(threadContext.isDefaultContext());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We don't see the header outside of the runnable
|
||||||
|
assertNull(threadContext.getHeader("foo"));
|
||||||
|
assertNull(threadContext.getTransient("foo"));
|
||||||
|
assertTrue(threadContext.isDefaultContext());
|
||||||
|
|
||||||
|
// But we do inside of it
|
||||||
|
RuntimeException e = expectThrows(RuntimeException.class, withContext::run);
|
||||||
|
assertEquals("from onAfter", e.getMessage());
|
||||||
|
assertNull(e.getCause());
|
||||||
|
|
||||||
|
// but not after
|
||||||
|
assertNull(threadContext.getHeader("foo"));
|
||||||
|
assertNull(threadContext.getTransient("foo"));
|
||||||
|
assertTrue(threadContext.isDefaultContext());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testMarkAsSystemContext() throws IOException {
|
public void testMarkAsSystemContext() throws IOException {
|
||||||
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
|
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||||
assertFalse(threadContext.isSystemContext());
|
assertFalse(threadContext.isSystemContext());
|
||||||
try (ThreadContext.StoredContext context = threadContext.stashContext()) {
|
try (ThreadContext.StoredContext context = threadContext.stashContext()) {
|
||||||
assertFalse(threadContext.isSystemContext());
|
|
||||||
threadContext.markAsSystemContext();
|
|
||||||
assertTrue(threadContext.isSystemContext());
|
|
||||||
}
|
|
||||||
assertFalse(threadContext.isSystemContext());
|
assertFalse(threadContext.isSystemContext());
|
||||||
|
threadContext.markAsSystemContext();
|
||||||
|
assertTrue(threadContext.isSystemContext());
|
||||||
}
|
}
|
||||||
|
assertFalse(threadContext.isSystemContext());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testPutHeaders() {
|
public void testPutHeaders() {
|
||||||
|
|
|
@ -166,7 +166,6 @@ public class NodeTests extends ESTestCase {
|
||||||
assertTrue(node.awaitClose(1, TimeUnit.DAYS));
|
assertTrue(node.awaitClose(1, TimeUnit.DAYS));
|
||||||
}
|
}
|
||||||
|
|
||||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/42577")
|
|
||||||
public void testCloseRaceWithTaskExecution() throws Exception {
|
public void testCloseRaceWithTaskExecution() throws Exception {
|
||||||
Node node = new MockNode(baseSettings().build(), basePlugins());
|
Node node = new MockNode(baseSettings().build(), basePlugins());
|
||||||
node.start();
|
node.start();
|
||||||
|
|
|
@ -62,8 +62,7 @@ public class ThreadPoolTests extends ESTestCase {
|
||||||
// the delta can be large, we just care it is the same order of magnitude
|
// the delta can be large, we just care it is the same order of magnitude
|
||||||
assertTrue("thread pool cached absolute time " + gotTime + " is too far from real current time " + currentTime, delta < 10000);
|
assertTrue("thread pool cached absolute time " + gotTime + " is too far from real current time " + currentTime, delta < 10000);
|
||||||
} finally {
|
} finally {
|
||||||
threadPool.shutdown();
|
terminate(threadPool);
|
||||||
threadPool.close();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -90,9 +90,8 @@ public class TransportLoggerTests extends ESTestCase {
|
||||||
private BytesReference buildRequest() throws IOException {
|
private BytesReference buildRequest() throws IOException {
|
||||||
try (BytesStreamOutput messageOutput = new BytesStreamOutput()) {
|
try (BytesStreamOutput messageOutput = new BytesStreamOutput()) {
|
||||||
messageOutput.setVersion(Version.CURRENT);
|
messageOutput.setVersion(Version.CURRENT);
|
||||||
try (ThreadContext context = new ThreadContext(Settings.EMPTY)) {
|
ThreadContext context = new ThreadContext(Settings.EMPTY);
|
||||||
context.writeTo(messageOutput);
|
context.writeTo(messageOutput);
|
||||||
}
|
|
||||||
messageOutput.writeString(ClusterStatsAction.NAME);
|
messageOutput.writeString(ClusterStatsAction.NAME);
|
||||||
new ClusterStatsRequest().writeTo(messageOutput);
|
new ClusterStatsRequest().writeTo(messageOutput);
|
||||||
BytesReference messageBody = messageOutput.bytes();
|
BytesReference messageBody = messageOutput.bytes();
|
||||||
|
|
|
@ -370,7 +370,8 @@ public abstract class ESTestCase extends LuceneTestCase {
|
||||||
// initialized
|
// initialized
|
||||||
if (threadContext != null) {
|
if (threadContext != null) {
|
||||||
ensureNoWarnings();
|
ensureNoWarnings();
|
||||||
assert threadContext == null;
|
DeprecationLogger.removeThreadContext(threadContext);
|
||||||
|
threadContext = null;
|
||||||
}
|
}
|
||||||
ensureAllSearchContextsReleased();
|
ensureAllSearchContextsReleased();
|
||||||
ensureCheckIndexPassed();
|
ensureCheckIndexPassed();
|
||||||
|
@ -393,7 +394,7 @@ public abstract class ESTestCase extends LuceneTestCase {
|
||||||
final List<String> warnings = threadContext.getResponseHeaders().get("Warning");
|
final List<String> warnings = threadContext.getResponseHeaders().get("Warning");
|
||||||
assertNull("unexpected warning headers", warnings);
|
assertNull("unexpected warning headers", warnings);
|
||||||
} finally {
|
} finally {
|
||||||
resetDeprecationLogger(false);
|
resetDeprecationLogger();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -434,26 +435,16 @@ public abstract class ESTestCase extends LuceneTestCase {
|
||||||
+ Arrays.asList(expectedWarnings) + "\nActual: " + actualWarnings,
|
+ Arrays.asList(expectedWarnings) + "\nActual: " + actualWarnings,
|
||||||
expectedWarnings.length, actualWarnings.size());
|
expectedWarnings.length, actualWarnings.size());
|
||||||
} finally {
|
} finally {
|
||||||
resetDeprecationLogger(true);
|
resetDeprecationLogger();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reset the deprecation logger by removing the current thread context, and setting a new thread context if {@code setNewThreadContext}
|
* Reset the deprecation logger by clearing the current thread context.
|
||||||
* is set to {@code true} and otherwise clearing the current thread context.
|
|
||||||
*
|
|
||||||
* @param setNewThreadContext whether or not to attach a new thread context to the deprecation logger
|
|
||||||
*/
|
*/
|
||||||
private void resetDeprecationLogger(final boolean setNewThreadContext) {
|
private void resetDeprecationLogger() {
|
||||||
// "clear" current warning headers by setting a new ThreadContext
|
// "clear" context by stashing current values and dropping the returned StoredContext
|
||||||
DeprecationLogger.removeThreadContext(this.threadContext);
|
threadContext.stashContext();
|
||||||
this.threadContext.close();
|
|
||||||
if (setNewThreadContext) {
|
|
||||||
this.threadContext = new ThreadContext(Settings.EMPTY);
|
|
||||||
DeprecationLogger.setThreadContext(this.threadContext);
|
|
||||||
} else {
|
|
||||||
this.threadContext = null;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final List<StatusData> statusData = new ArrayList<>();
|
private static final List<StatusData> statusData = new ArrayList<>();
|
||||||
|
|
|
@ -881,14 +881,13 @@ public abstract class ESRestTestCase extends ESTestCase {
|
||||||
throw new RuntimeException("Error setting up ssl", e);
|
throw new RuntimeException("Error setting up ssl", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
try (ThreadContext threadContext = new ThreadContext(settings)) {
|
Map<String, String> headers = ThreadContext.buildDefaultHeaders(settings);
|
||||||
Header[] defaultHeaders = new Header[threadContext.getHeaders().size()];
|
Header[] defaultHeaders = new Header[headers.size()];
|
||||||
int i = 0;
|
int i = 0;
|
||||||
for (Map.Entry<String, String> entry : threadContext.getHeaders().entrySet()) {
|
for (Map.Entry<String, String> entry : headers.entrySet()) {
|
||||||
defaultHeaders[i++] = new BasicHeader(entry.getKey(), entry.getValue());
|
defaultHeaders[i++] = new BasicHeader(entry.getKey(), entry.getValue());
|
||||||
}
|
|
||||||
builder.setDefaultHeaders(defaultHeaders);
|
|
||||||
}
|
}
|
||||||
|
builder.setDefaultHeaders(defaultHeaders);
|
||||||
final String socketTimeoutString = Objects.requireNonNullElse(settings.get(CLIENT_SOCKET_TIMEOUT), "60s");
|
final String socketTimeoutString = Objects.requireNonNullElse(settings.get(CLIENT_SOCKET_TIMEOUT), "60s");
|
||||||
final TimeValue socketTimeout = TimeValue.parseTimeValue(socketTimeoutString, CLIENT_SOCKET_TIMEOUT);
|
final TimeValue socketTimeout = TimeValue.parseTimeValue(socketTimeoutString, CLIENT_SOCKET_TIMEOUT);
|
||||||
builder.setRequestConfigCallback(conf -> conf.setSocketTimeout(Math.toIntExact(socketTimeout.getMillis())));
|
builder.setRequestConfigCallback(conf -> conf.setSocketTimeout(Math.toIntExact(socketTimeout.getMillis())));
|
||||||
|
|
|
@ -32,6 +32,7 @@ import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
@ -83,7 +84,7 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
|
||||||
assertThat(e.getMessage(), containsString("no such repository [repo]"));
|
assertThat(e.getMessage(), containsString("no such repository [repo]"));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testNothingScheduledWhenNotRunning() {
|
public void testNothingScheduledWhenNotRunning() throws InterruptedException {
|
||||||
ClockMock clock = new ClockMock();
|
ClockMock clock = new ClockMock();
|
||||||
SnapshotLifecyclePolicyMetadata initialPolicy = SnapshotLifecyclePolicyMetadata.builder()
|
SnapshotLifecyclePolicyMetadata initialPolicy = SnapshotLifecyclePolicyMetadata.builder()
|
||||||
.setPolicy(createPolicy("initial", "*/1 * * * * ?"))
|
.setPolicy(createPolicy("initial", "*/1 * * * * ?"))
|
||||||
|
@ -94,8 +95,8 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
|
||||||
ClusterState initialState = createState(new SnapshotLifecycleMetadata(
|
ClusterState initialState = createState(new SnapshotLifecycleMetadata(
|
||||||
Collections.singletonMap(initialPolicy.getPolicy().getId(), initialPolicy),
|
Collections.singletonMap(initialPolicy.getPolicy().getId(), initialPolicy),
|
||||||
OperationMode.RUNNING, new SnapshotLifecycleStats()));
|
OperationMode.RUNNING, new SnapshotLifecycleStats()));
|
||||||
try (ThreadPool threadPool = new TestThreadPool("test");
|
ThreadPool threadPool = new TestThreadPool("test");
|
||||||
ClusterService clusterService = ClusterServiceUtils.createClusterService(initialState, threadPool);
|
try (ClusterService clusterService = ClusterServiceUtils.createClusterService(initialState, threadPool);
|
||||||
SnapshotLifecycleService sls = new SnapshotLifecycleService(Settings.EMPTY,
|
SnapshotLifecycleService sls = new SnapshotLifecycleService(Settings.EMPTY,
|
||||||
() -> new FakeSnapshotTask(e -> logger.info("triggered")), clusterService, clock)) {
|
() -> new FakeSnapshotTask(e -> logger.info("triggered")), clusterService, clock)) {
|
||||||
|
|
||||||
|
@ -140,8 +141,9 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
|
||||||
sls.onMaster();
|
sls.onMaster();
|
||||||
sls.clusterChanged(new ClusterChangedEvent("1", state, emptyState));
|
sls.clusterChanged(new ClusterChangedEvent("1", state, emptyState));
|
||||||
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
|
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
|
||||||
|
} finally {
|
||||||
threadPool.shutdownNow();
|
threadPool.shutdownNow();
|
||||||
|
threadPool.awaitTermination(10, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -154,8 +156,8 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
|
||||||
ClockMock clock = new ClockMock();
|
ClockMock clock = new ClockMock();
|
||||||
final AtomicInteger triggerCount = new AtomicInteger(0);
|
final AtomicInteger triggerCount = new AtomicInteger(0);
|
||||||
final AtomicReference<Consumer<SchedulerEngine.Event>> trigger = new AtomicReference<>(e -> triggerCount.incrementAndGet());
|
final AtomicReference<Consumer<SchedulerEngine.Event>> trigger = new AtomicReference<>(e -> triggerCount.incrementAndGet());
|
||||||
try (ThreadPool threadPool = new TestThreadPool("test");
|
ThreadPool threadPool = new TestThreadPool("test");
|
||||||
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
|
try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
|
||||||
SnapshotLifecycleService sls = new SnapshotLifecycleService(Settings.EMPTY,
|
SnapshotLifecycleService sls = new SnapshotLifecycleService(Settings.EMPTY,
|
||||||
() -> new FakeSnapshotTask(e -> trigger.get().accept(e)), clusterService, clock)) {
|
() -> new FakeSnapshotTask(e -> trigger.get().accept(e)), clusterService, clock)) {
|
||||||
|
|
||||||
|
@ -250,8 +252,9 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
|
||||||
// Signify becoming non-master, the jobs should all be cancelled
|
// Signify becoming non-master, the jobs should all be cancelled
|
||||||
sls.offMaster();
|
sls.offMaster();
|
||||||
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
|
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
|
||||||
|
} finally {
|
||||||
threadPool.shutdownNow();
|
threadPool.shutdownNow();
|
||||||
|
threadPool.awaitTermination(10, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -262,8 +265,8 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
|
||||||
ClockMock clock = new ClockMock();
|
ClockMock clock = new ClockMock();
|
||||||
final AtomicInteger triggerCount = new AtomicInteger(0);
|
final AtomicInteger triggerCount = new AtomicInteger(0);
|
||||||
final AtomicReference<Consumer<SchedulerEngine.Event>> trigger = new AtomicReference<>(e -> triggerCount.incrementAndGet());
|
final AtomicReference<Consumer<SchedulerEngine.Event>> trigger = new AtomicReference<>(e -> triggerCount.incrementAndGet());
|
||||||
try (ThreadPool threadPool = new TestThreadPool("test");
|
ThreadPool threadPool = new TestThreadPool("test");
|
||||||
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
|
try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
|
||||||
SnapshotLifecycleService sls = new SnapshotLifecycleService(Settings.EMPTY,
|
SnapshotLifecycleService sls = new SnapshotLifecycleService(Settings.EMPTY,
|
||||||
() -> new FakeSnapshotTask(e -> trigger.get().accept(e)), clusterService, clock)) {
|
() -> new FakeSnapshotTask(e -> trigger.get().accept(e)), clusterService, clock)) {
|
||||||
sls.onMaster();
|
sls.onMaster();
|
||||||
|
@ -304,8 +307,9 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
|
||||||
|
|
||||||
sls.offMaster();
|
sls.offMaster();
|
||||||
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
|
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
|
||||||
|
} finally {
|
||||||
threadPool.shutdownNow();
|
threadPool.shutdownNow();
|
||||||
|
threadPool.awaitTermination(10, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,6 +26,7 @@ import org.elasticsearch.xpack.core.watcher.watch.ClockMock;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
|
@ -42,13 +43,13 @@ public class SnapshotRetentionServiceTests extends ESTestCase {
|
||||||
clusterSettings = new ClusterSettings(Settings.EMPTY, internalSettings);
|
clusterSettings = new ClusterSettings(Settings.EMPTY, internalSettings);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testJobsAreScheduled() {
|
public void testJobsAreScheduled() throws InterruptedException {
|
||||||
final DiscoveryNode discoveryNode = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(),
|
final DiscoveryNode discoveryNode = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(),
|
||||||
Collections.emptyMap(), DiscoveryNodeRole.BUILT_IN_ROLES, Version.CURRENT);
|
Collections.emptyMap(), DiscoveryNodeRole.BUILT_IN_ROLES, Version.CURRENT);
|
||||||
ClockMock clock = new ClockMock();
|
ClockMock clock = new ClockMock();
|
||||||
|
|
||||||
try (ThreadPool threadPool = new TestThreadPool("test");
|
ThreadPool threadPool = new TestThreadPool("test");
|
||||||
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool, discoveryNode, clusterSettings);
|
try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool, discoveryNode, clusterSettings);
|
||||||
SnapshotRetentionService service = new SnapshotRetentionService(Settings.EMPTY,
|
SnapshotRetentionService service = new SnapshotRetentionService(Settings.EMPTY,
|
||||||
FakeRetentionTask::new, clusterService, clock)) {
|
FakeRetentionTask::new, clusterService, clock)) {
|
||||||
assertThat(service.getScheduler().jobCount(), equalTo(0));
|
assertThat(service.getScheduler().jobCount(), equalTo(0));
|
||||||
|
@ -75,14 +76,14 @@ public class SnapshotRetentionServiceTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testManualTriggering() {
|
public void testManualTriggering() throws InterruptedException {
|
||||||
final DiscoveryNode discoveryNode = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(),
|
final DiscoveryNode discoveryNode = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(),
|
||||||
Collections.emptyMap(), DiscoveryNodeRole.BUILT_IN_ROLES, Version.CURRENT);
|
Collections.emptyMap(), DiscoveryNodeRole.BUILT_IN_ROLES, Version.CURRENT);
|
||||||
ClockMock clock = new ClockMock();
|
ClockMock clock = new ClockMock();
|
||||||
AtomicInteger invoked = new AtomicInteger(0);
|
AtomicInteger invoked = new AtomicInteger(0);
|
||||||
|
|
||||||
try (ThreadPool threadPool = new TestThreadPool("test");
|
ThreadPool threadPool = new TestThreadPool("test");
|
||||||
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool, discoveryNode, clusterSettings);
|
try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool, discoveryNode, clusterSettings);
|
||||||
SnapshotRetentionService service = new SnapshotRetentionService(Settings.EMPTY,
|
SnapshotRetentionService service = new SnapshotRetentionService(Settings.EMPTY,
|
||||||
() -> new FakeRetentionTask(event -> {
|
() -> new FakeRetentionTask(event -> {
|
||||||
assertThat(event.getJobName(), equalTo(SnapshotRetentionService.SLM_RETENTION_MANUAL_JOB_ID));
|
assertThat(event.getJobName(), equalTo(SnapshotRetentionService.SLM_RETENTION_MANUAL_JOB_ID));
|
||||||
|
@ -100,8 +101,9 @@ public class SnapshotRetentionServiceTests extends ESTestCase {
|
||||||
service.onMaster();
|
service.onMaster();
|
||||||
service.triggerRetention();
|
service.triggerRetention();
|
||||||
assertThat(invoked.get(), equalTo(2));
|
assertThat(invoked.get(), equalTo(2));
|
||||||
|
} finally {
|
||||||
threadPool.shutdownNow();
|
threadPool.shutdownNow();
|
||||||
|
threadPool.awaitTermination(10, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -154,8 +154,8 @@ public class SnapshotRetentionTaskTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void retentionTaskTest(final boolean deletionSuccess) throws Exception {
|
private void retentionTaskTest(final boolean deletionSuccess) throws Exception {
|
||||||
try (ThreadPool threadPool = new TestThreadPool("slm-test");
|
ThreadPool threadPool = new TestThreadPool("slm-test");
|
||||||
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
|
try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
|
||||||
Client noOpClient = new NoOpClient("slm-test")) {
|
Client noOpClient = new NoOpClient("slm-test")) {
|
||||||
|
|
||||||
final String policyId = "policy";
|
final String policyId = "policy";
|
||||||
|
@ -222,7 +222,7 @@ public class SnapshotRetentionTaskTests extends ESTestCase {
|
||||||
boolean historySuccess = historyLatch.await(10, TimeUnit.SECONDS);
|
boolean historySuccess = historyLatch.await(10, TimeUnit.SECONDS);
|
||||||
assertThat("expected history entries for 1 snapshot deletions", historySuccess, equalTo(true));
|
assertThat("expected history entries for 1 snapshot deletions", historySuccess, equalTo(true));
|
||||||
assertThat(deletedSnapshotsInHistory, contains(eligibleSnapshot.snapshotId().getName()));
|
assertThat(deletedSnapshotsInHistory, contains(eligibleSnapshot.snapshotId().getName()));
|
||||||
|
} finally {
|
||||||
threadPool.shutdownNow();
|
threadPool.shutdownNow();
|
||||||
threadPool.awaitTermination(10, TimeUnit.SECONDS);
|
threadPool.awaitTermination(10, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
|
@ -237,8 +237,8 @@ public class SnapshotRetentionTaskTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void timeBoundedDeletion(final boolean deletionSuccess) throws Exception {
|
private void timeBoundedDeletion(final boolean deletionSuccess) throws Exception {
|
||||||
try (ThreadPool threadPool = new TestThreadPool("slm-test");
|
ThreadPool threadPool = new TestThreadPool("slm-test");
|
||||||
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
|
try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
|
||||||
Client noOpClient = new NoOpClient("slm-test")) {
|
Client noOpClient = new NoOpClient("slm-test")) {
|
||||||
|
|
||||||
final String policyId = "policy";
|
final String policyId = "policy";
|
||||||
|
@ -321,7 +321,7 @@ public class SnapshotRetentionTaskTests extends ESTestCase {
|
||||||
boolean historySuccess = historyLatch.await(10, TimeUnit.SECONDS);
|
boolean historySuccess = historyLatch.await(10, TimeUnit.SECONDS);
|
||||||
assertThat("expected history entries for 2 snapshot deletions", historySuccess, equalTo(true));
|
assertThat("expected history entries for 2 snapshot deletions", historySuccess, equalTo(true));
|
||||||
assertThat(deletedSnapshotsInHistory, containsInAnyOrder(snap1.snapshotId().getName(), snap2.snapshotId().getName()));
|
assertThat(deletedSnapshotsInHistory, containsInAnyOrder(snap1.snapshotId().getName(), snap2.snapshotId().getName()));
|
||||||
|
} finally {
|
||||||
threadPool.shutdownNow();
|
threadPool.shutdownNow();
|
||||||
threadPool.awaitTermination(10, TimeUnit.SECONDS);
|
threadPool.awaitTermination(10, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
|
@ -374,8 +374,8 @@ public class SnapshotRetentionTaskTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void doTestSkipDuringMode(OperationMode mode) throws Exception {
|
private void doTestSkipDuringMode(OperationMode mode) throws Exception {
|
||||||
try (ThreadPool threadPool = new TestThreadPool("slm-test");
|
ThreadPool threadPool = new TestThreadPool("slm-test");
|
||||||
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
|
try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
|
||||||
Client noOpClient = new NoOpClient("slm-test")) {
|
Client noOpClient = new NoOpClient("slm-test")) {
|
||||||
final String policyId = "policy";
|
final String policyId = "policy";
|
||||||
final String repoId = "repo";
|
final String repoId = "repo";
|
||||||
|
@ -398,7 +398,7 @@ public class SnapshotRetentionTaskTests extends ESTestCase {
|
||||||
|
|
||||||
long time = System.currentTimeMillis();
|
long time = System.currentTimeMillis();
|
||||||
task.triggered(new SchedulerEngine.Event(SnapshotRetentionService.SLM_RETENTION_JOB_ID, time, time));
|
task.triggered(new SchedulerEngine.Event(SnapshotRetentionService.SLM_RETENTION_JOB_ID, time, time));
|
||||||
|
} finally {
|
||||||
threadPool.shutdownNow();
|
threadPool.shutdownNow();
|
||||||
threadPool.awaitTermination(10, TimeUnit.SECONDS);
|
threadPool.awaitTermination(10, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
|
@ -413,8 +413,8 @@ public class SnapshotRetentionTaskTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void doTestRunManuallyDuringMode(OperationMode mode) throws Exception {
|
private void doTestRunManuallyDuringMode(OperationMode mode) throws Exception {
|
||||||
try (ThreadPool threadPool = new TestThreadPool("slm-test");
|
ThreadPool threadPool = new TestThreadPool("slm-test");
|
||||||
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
|
try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
|
||||||
Client noOpClient = new NoOpClient("slm-test")) {
|
Client noOpClient = new NoOpClient("slm-test")) {
|
||||||
final String policyId = "policy";
|
final String policyId = "policy";
|
||||||
final String repoId = "repo";
|
final String repoId = "repo";
|
||||||
|
@ -426,20 +426,22 @@ public class SnapshotRetentionTaskTests extends ESTestCase {
|
||||||
|
|
||||||
AtomicBoolean retentionWasRun = new AtomicBoolean(false);
|
AtomicBoolean retentionWasRun = new AtomicBoolean(false);
|
||||||
MockSnapshotRetentionTask task = new MockSnapshotRetentionTask(noOpClient, clusterService,
|
MockSnapshotRetentionTask task = new MockSnapshotRetentionTask(noOpClient, clusterService,
|
||||||
new SnapshotLifecycleTaskTests.VerifyingHistoryStore(noOpClient, ZoneOffset.UTC, (historyItem) -> { }),
|
new SnapshotLifecycleTaskTests.VerifyingHistoryStore(noOpClient, ZoneOffset.UTC, (historyItem) -> {
|
||||||
|
}),
|
||||||
threadPool,
|
threadPool,
|
||||||
() -> {
|
() -> {
|
||||||
retentionWasRun.set(true);
|
retentionWasRun.set(true);
|
||||||
return Collections.emptyMap();
|
return Collections.emptyMap();
|
||||||
},
|
},
|
||||||
(deletionPolicyId, repo, snapId, slmStats, listener) -> { },
|
(deletionPolicyId, repo, snapId, slmStats, listener) -> {
|
||||||
|
},
|
||||||
System::nanoTime);
|
System::nanoTime);
|
||||||
|
|
||||||
long time = System.currentTimeMillis();
|
long time = System.currentTimeMillis();
|
||||||
task.triggered(new SchedulerEngine.Event(SnapshotRetentionService.SLM_RETENTION_MANUAL_JOB_ID, time, time));
|
task.triggered(new SchedulerEngine.Event(SnapshotRetentionService.SLM_RETENTION_MANUAL_JOB_ID, time, time));
|
||||||
|
|
||||||
assertTrue("retention should be run manually even if SLM is disabled", retentionWasRun.get());
|
assertTrue("retention should be run manually even if SLM is disabled", retentionWasRun.get());
|
||||||
|
} finally {
|
||||||
threadPool.shutdownNow();
|
threadPool.shutdownNow();
|
||||||
threadPool.awaitTermination(10, TimeUnit.SECONDS);
|
threadPool.awaitTermination(10, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,9 +49,8 @@ import org.elasticsearch.xpack.security.audit.AuditTrailService;
|
||||||
import org.elasticsearch.xpack.security.audit.logfile.LoggingAuditTrail;
|
import org.elasticsearch.xpack.security.audit.logfile.LoggingAuditTrail;
|
||||||
import org.elasticsearch.xpack.security.authc.Realms;
|
import org.elasticsearch.xpack.security.authc.Realms;
|
||||||
import org.hamcrest.Matchers;
|
import org.hamcrest.Matchers;
|
||||||
import org.junit.Before;
|
import org.junit.After;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -143,13 +142,9 @@ public class SecurityTests extends ESTestCase {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Before
|
@After
|
||||||
public void cleanup() throws IOException {
|
public void cleanup() {
|
||||||
if (threadContext != null) {
|
threadContext = null;
|
||||||
threadContext.stashContext();
|
|
||||||
threadContext.close();
|
|
||||||
threadContext = null;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testCustomRealmExtension() throws Exception {
|
public void testCustomRealmExtension() throws Exception {
|
||||||
|
|
|
@ -399,51 +399,50 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testContextRestoreResponseHandlerRestoreOriginalContext() throws Exception {
|
public void testContextRestoreResponseHandlerRestoreOriginalContext() throws Exception {
|
||||||
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
|
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||||
threadContext.putTransient("foo", "bar");
|
threadContext.putTransient("foo", "bar");
|
||||||
threadContext.putHeader("key", "value");
|
threadContext.putHeader("key", "value");
|
||||||
TransportResponseHandler<Empty> handler;
|
TransportResponseHandler<Empty> handler;
|
||||||
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
|
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
|
||||||
threadContext.putTransient("foo", "different_bar");
|
threadContext.putTransient("foo", "different_bar");
|
||||||
threadContext.putHeader("key", "value2");
|
threadContext.putHeader("key", "value2");
|
||||||
handler = new TransportService.ContextRestoreResponseHandler<>(threadContext.newRestorableContext(true),
|
handler = new TransportService.ContextRestoreResponseHandler<>(threadContext.newRestorableContext(true),
|
||||||
new TransportResponseHandler<Empty>() {
|
new TransportResponseHandler<Empty>() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Empty read(StreamInput in) {
|
public Empty read(StreamInput in) {
|
||||||
return Empty.INSTANCE;
|
return Empty.INSTANCE;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handleResponse(Empty response) {
|
public void handleResponse(Empty response) {
|
||||||
assertEquals("different_bar", threadContext.getTransient("foo"));
|
assertEquals("different_bar", threadContext.getTransient("foo"));
|
||||||
assertEquals("value2", threadContext.getHeader("key"));
|
assertEquals("value2", threadContext.getHeader("key"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handleException(TransportException exp) {
|
public void handleException(TransportException exp) {
|
||||||
assertEquals("different_bar", threadContext.getTransient("foo"));
|
assertEquals("different_bar", threadContext.getTransient("foo"));
|
||||||
assertEquals("value2", threadContext.getHeader("key"));
|
assertEquals("value2", threadContext.getHeader("key"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String executor() {
|
public String executor() {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
|
||||||
|
|
||||||
assertEquals("bar", threadContext.getTransient("foo"));
|
|
||||||
assertEquals("value", threadContext.getHeader("key"));
|
|
||||||
handler.handleResponse(null);
|
|
||||||
|
|
||||||
assertEquals("bar", threadContext.getTransient("foo"));
|
|
||||||
assertEquals("value", threadContext.getHeader("key"));
|
|
||||||
handler.handleException(null);
|
|
||||||
|
|
||||||
assertEquals("bar", threadContext.getTransient("foo"));
|
|
||||||
assertEquals("value", threadContext.getHeader("key"));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
assertEquals("bar", threadContext.getTransient("foo"));
|
||||||
|
assertEquals("value", threadContext.getHeader("key"));
|
||||||
|
handler.handleResponse(null);
|
||||||
|
|
||||||
|
assertEquals("bar", threadContext.getTransient("foo"));
|
||||||
|
assertEquals("value", threadContext.getHeader("key"));
|
||||||
|
handler.handleException(null);
|
||||||
|
|
||||||
|
assertEquals("bar", threadContext.getTransient("foo"));
|
||||||
|
assertEquals("value", threadContext.getHeader("key"));
|
||||||
}
|
}
|
||||||
|
|
||||||
private String[] randomRoles() {
|
private String[] randomRoles() {
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue