Merge pull request ESQL-1188 from elastic/main

🤖 ESQL: Merge upstream
This commit is contained in:
elasticsearchmachine 2023-05-26 13:18:02 -04:00 committed by GitHub
commit 0ebbf0f60e
63 changed files with 2441 additions and 698 deletions

View file

@ -0,0 +1,5 @@
pr: 95512
summary: Adding `manage_dlm` index privilege and expanding `view_index_metadata` for access to data lifecycle APIs
area: DLM
type: enhancement
issues: []

View file

@ -0,0 +1,6 @@
pr: 96328
summary: Add `ingest` information to the cluster info endpoint
area: Stats
type: enhancement
issues:
- 95392

View file

@ -33,9 +33,14 @@ You can use the Cluster Info API to retrieve information of a cluster.
A comma-separated list of the following options:
+
--
`_all`::
All the information available. Cannot be mixed with other targets.
`http`::
HTTP connection information.
`ingest`::
Ingest information.
--
[role="child_attributes"]
@ -126,6 +131,114 @@ Cumulative size in bytes of all requests from this client.
======
[[cluster-info-api-response-body-ingest]]
`ingest`::
(object)
Contains ingest information for the cluster.
+
.Properties of `ingest`
[%collapsible%open]
======
`total`::
(object)
Contains information about ingest operations for the cluster.
+
.Properties of `total`
[%collapsible%open]
=======
`count`::
(integer)
Total number of documents ingested across the cluster.
`time`::
(<<time-units,time value>>)
Total time spent preprocessing ingest documents across the cluster.
`time_in_millis`::
(integer)
Total time, in milliseconds, spent preprocessing ingest documents across the cluster.
`current`::
(integer)
Total number of documents currently being ingested.
`failed`::
(integer)
Total number of failed ingest operations across the cluster.
=======
`pipelines`::
(object)
Contains information about ingest pipelines for the cluster.
+
.Properties of `pipelines`
[%collapsible%open]
=======
`<pipeline_id>`::
(object)
Contains information about the ingest pipeline.
+
.Properties of `<pipeline_id>`
[%collapsible%open]
========
`count`::
(integer)
Number of documents preprocessed by the ingest pipeline.
`time`::
(<<time-units,time value>>)
Total time spent preprocessing documents in the ingest pipeline.
`time_in_millis`::
(integer)
Total time, in milliseconds, spent preprocessing documents in the ingest
pipeline.
`failed`::
(integer)
Total number of failed operations for the ingest pipeline.
`processors`::
(array of objects)
Contains information about the ingest processors of the ingest pipeline.
+
.Properties of `processors`
[%collapsible%open]
=========
`<processor>`::
(object)
Contains information about the ingest processor.
+
.Properties of `<processor>`
[%collapsible%open]
==========
`count`::
(integer)
Number of documents transformed by the processor.
`time`::
(<<time-units,time value>>)
Time spent by the processor transforming documents.
`time_in_millis`::
(integer)
Time, in milliseconds, spent by the processor transforming documents.
`current`::
(integer)
Number of documents currently being transformed by the processor.
`failed`::
(integer)
Number of failed operations for the processor.
==========
=========
========
=======
======
[[cluster-info-api-example]]
==== {api-examples-title}
@ -133,10 +246,13 @@ Cumulative size in bytes of all requests from this client.
----
# returns all stats info of the cluster
GET /_info/_all
----

[source,console]
----
# returns the http info of the cluster
GET /_info/http

# returns the ingest info of the cluster
GET /_info/ingest

# returns the http and ingest info of the cluster
GET /_info/http,ingest
----
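The same calls can be made programmatically. Below is a minimal sketch using the low-level Java `RestClient`; the host, client setup, and the `ClusterInfoIngestExample` wrapper are assumptions for illustration, not part of this change.

import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class ClusterInfoIngestExample {
    public static void main(String[] args) throws Exception {
        // Assumes a local cluster on port 9200; adjust host and security settings for your environment.
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200)).build()) {
            // Targets mirror the console examples above: _all, http, ingest, or a comma-separated list.
            Response response = client.performRequest(new Request("GET", "/_info/ingest"));
            // The body carries the `ingest` object documented above: total counts plus
            // per-pipeline and per-processor statistics.
            System.out.println(EntityUtils.toString(response.getEntity()));
        }
    }
}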

View file

@ -8,6 +8,12 @@ experimental::[]
Deletes the lifecycle from a set of data streams.
[[delete-lifecycle-api-prereqs]]
==== {api-prereq-title}
* If the {es} {security-features} are enabled, you must have the `manage_dlm` index privilege or higher to
use this API. For more information, see <<security-privileges>>.
[[dlm-delete-lifecycle-request]]
==== {api-request-title}

View file

@ -8,6 +8,14 @@ experimental::[]
Retrieves the current data lifecycle status for one or more data stream backing indices.
[[explain-lifecycle-api-prereqs]]
==== {api-prereq-title}
* If the {es} {security-features} are enabled, you must have at least the `manage_dlm` index privilege or the
`view_index_metadata` index privilege to use this API. For more information, see <<security-privileges>>.
[[dlm-explain-lifecycle-request]]
==== {api-request-title}

View file

@ -8,6 +8,13 @@ experimental::[]
Gets the lifecycle of a set of data streams.
[[get-lifecycle-api-prereqs]]
==== {api-prereq-title}
* If the {es} {security-features} are enabled, you must have at least one of the `manage`
<<privileges-list-indices,index privilege>>, the `manage_dlm` index privilege, or the
`view_index_metadata` privilege to use this API. For more information, see <<security-privileges>>.
[[dlm-get-lifecycle-request]]
==== {api-request-title}

View file

@ -8,6 +8,12 @@ experimental::[]
Configures the data lifecycle for the targeted data streams.
[[put-lifecycle-api-prereqs]]
==== {api-prereq-title}
If the {es} {security-features} are enabled, you must have the `manage_dlm` index privilege or higher to use this API.
For more information, see <<security-privileges>>.
[[dlm-put-lifecycle-request]]
==== {api-request-title}

View file

@ -440,6 +440,9 @@ GET /_xpack/usage
"enabled": true, "enabled": true,
"search_applications" : { "search_applications" : {
"count": 0 "count": 0
},
"analytics_collections": {
"count": 0
} }
} }
} }

View file

@ -31,7 +31,6 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.Task;
@ -141,10 +140,6 @@ public class DataStreamsStatsTransportAction extends TransportBroadcastByNodeAct
ActionListener.completeWith(listener, () -> {
IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex());
IndexShard indexShard = indexService.getShard(shardRouting.shardId().id());
// if we don't have the routing entry yet, we need it stats wise, we treat it as if the shard is not ready yet
if (indexShard.routingEntry() == null) {
throw new ShardNotFoundException(indexShard.shardId());
}
StoreStats storeStats = indexShard.storeStats();
IndexAbstraction indexAbstraction = clusterService.state().getMetadata().getIndicesLookup().get(shardRouting.getIndexName());
assert indexAbstraction != null;

View file

View file

@ -0,0 +1,19 @@
import org.elasticsearch.gradle.Version
apply plugin: 'elasticsearch.legacy-java-rest-test'
apply plugin: 'elasticsearch.authenticated-testclusters'
dependencies {
javaRestTestImplementation project(":client:rest-high-level")
}
testClusters.configureEach {
testDistribution = 'DEFAULT'
setting 'xpack.watcher.enabled', 'false'
setting 'xpack.ml.enabled', 'false'
setting 'xpack.license.self_generated.type', 'trial'
rolesFile file('roles.yml')
user username: "test_dlm", password: "x-pack-test-password", role: "manage_dlm"
user username: "test_non_privileged", password: "x-pack-test-password", role: "not_privileged"
requiresFeature 'es.dlm_feature_flag_enabled', Version.fromString("8.9.0")
}

View file

@ -0,0 +1,18 @@
manage_dlm:
cluster:
- monitor
indices:
- names: [ 'dlm-*' ]
privileges:
- read
- write
- manage_dlm
not_privileged:
cluster:
- monitor
indices:
- names: [ 'dlm-*' ]
privileges:
- read
- write
- view_index_metadata

View file

@ -0,0 +1,165 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.dlm;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.rest.ObjectPath;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
public class PermissionsIT extends ESRestTestCase {
@Override
protected Settings restClientSettings() {
// Note: This user is defined in build.gradle, and assigned the role "manage_dlm". That role is defined in roles.yml.
String token = basicAuthHeaderValue("test_dlm", new SecureString("x-pack-test-password".toCharArray()));
return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build();
}
@Override
protected Settings restAdminSettings() {
String token = basicAuthHeaderValue("test_admin", new SecureString("x-pack-test-password".toCharArray()));
return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build();
}
private Settings restUnprivilegedClientSettings() {
// Note: This user is defined in build.gradle, and assigned the role "not_privileged". That role is defined in roles.yml.
String token = basicAuthHeaderValue("test_non_privileged", new SecureString("x-pack-test-password".toCharArray()));
return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build();
}
@SuppressWarnings("unchecked")
public void testManageDLM() throws Exception {
{
/*
* This test checks that a user with the "manage_dlm" index privilege on "dlm-*" data streams can delete and put a lifecycle
* on the "dlm-test" data stream, while a user with who does not have that privilege (but does have all of the other same
* "dlm-*" privileges) cannot delete or put a lifecycle on that datastream.
*/
String dataStreamName = "dlm-test"; // Needs to match the pattern of the names in roles.yml
createDataStreamAsAdmin(dataStreamName);
Response getDataStreamResponse = adminClient().performRequest(new Request("GET", "/_data_stream/" + dataStreamName));
final List<Map<String, Object>> nodes = ObjectPath.createFromResponse(getDataStreamResponse).evaluate("data_streams");
String index = (String) ((List<Map<String, Object>>) nodes.get(0).get("indices")).get(0).get("index_name");
Request explainLifecycleRequest = new Request("GET", "/" + randomFrom("_all", "*", index) + "/_lifecycle/explain");
Request getLifecycleRequest = new Request("GET", "_data_stream/" + randomFrom("_all", "*", dataStreamName) + "/_lifecycle");
Request deleteLifecycleRequest = new Request(
"DELETE",
"_data_stream/" + randomFrom("_all", "*", dataStreamName) + "/_lifecycle"
);
Request putLifecycleRequest = new Request("PUT", "_data_stream/" + randomFrom("_all", "*", dataStreamName) + "/_lifecycle");
putLifecycleRequest.setJsonEntity("{}");
makeRequest(client(), explainLifecycleRequest, true);
makeRequest(client(), getLifecycleRequest, true);
makeRequest(client(), deleteLifecycleRequest, true);
makeRequest(client(), putLifecycleRequest, true);
try (
RestClient nonDlmManagerClient = buildClient(restUnprivilegedClientSettings(), getClusterHosts().toArray(new HttpHost[0]))
) {
makeRequest(nonDlmManagerClient, explainLifecycleRequest, true);
makeRequest(nonDlmManagerClient, getLifecycleRequest, true);
makeRequest(nonDlmManagerClient, deleteLifecycleRequest, false);
makeRequest(nonDlmManagerClient, putLifecycleRequest, false);
}
}
{
// Now test that the user who has the manage_dlm privilege on dlm-* data streams cannot manage other data streams:
String otherDataStreamName = "other-dlm-test";
createDataStreamAsAdmin(otherDataStreamName);
Response getOtherDataStreamResponse = adminClient().performRequest(new Request("GET", "/_data_stream/" + otherDataStreamName));
final List<Map<String, Object>> otherNodes = ObjectPath.createFromResponse(getOtherDataStreamResponse).evaluate("data_streams");
String otherIndex = (String) ((List<Map<String, Object>>) otherNodes.get(0).get("indices")).get(0).get("index_name");
Request putOtherLifecycleRequest = new Request("PUT", "_data_stream/" + otherDataStreamName + "/_lifecycle");
putOtherLifecycleRequest.setJsonEntity("{}");
makeRequest(client(), new Request("GET", "/" + otherIndex + "/_lifecycle/explain"), false);
makeRequest(client(), new Request("GET", "_data_stream/" + otherDataStreamName + "/_lifecycle"), false);
makeRequest(client(), new Request("DELETE", "_data_stream/" + otherDataStreamName + "/_lifecycle"), false);
makeRequest(client(), putOtherLifecycleRequest, false);
}
}
/*
* This makes the given request with the given client. It asserts a 200 response if expectSuccess is true, and asserts an exception
* with a 403 response if expectSuccess is false.
*/
private void makeRequest(RestClient client, Request request, boolean expectSuccess) throws IOException {
if (expectSuccess) {
Response response = client.performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), equalTo(RestStatus.OK.getStatus()));
} else {
ResponseException exception = expectThrows(ResponseException.class, () -> client.performRequest(request));
assertThat(exception.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.FORBIDDEN.getStatus()));
}
}
private void createDataStreamAsAdmin(String name) throws IOException {
String mappingsTemplateName = name + "_mappings";
Request mappingsRequest = new Request("PUT", "/_component_template/" + mappingsTemplateName);
mappingsRequest.setJsonEntity("""
{
"template": {
"mappings": {
"properties": {
"@timestamp": {
"type": "date",
"format": "date_optional_time||epoch_millis"
},
"message": {
"type": "wildcard"
}
}
}
}
}""");
assertOK(adminClient().performRequest(mappingsRequest));
String settingsTemplateName = name + "_settings";
Request settingsRequest = new Request("PUT", "/_component_template/" + settingsTemplateName);
settingsRequest.setJsonEntity("""
{
"template": {
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0
}
}
}""");
assertOK(adminClient().performRequest(settingsRequest));
Request indexTemplateRequest = new Request("PUT", "/_index_template/" + name + "_template");
indexTemplateRequest.setJsonEntity(Strings.format("""
{
"index_patterns": ["%s*"],
"data_stream": { },
"composed_of": [ "%s", "%s" ]
}""", name, mappingsTemplateName, settingsTemplateName));
assertOK(adminClient().performRequest(indexTemplateRequest));
Request request = new Request("PUT", "/_data_stream/" + name);
assertOK(adminClient().performRequest(request));
}
}

View file

@ -605,8 +605,8 @@ public class DataLifecycleService implements ClusterStateListener, Closeable, Sc
scheduler.get().add(scheduledJob);
}
// package visibility for testing
DataLifecycleErrorStore getErrorStore() {
// public visibility for testing
public DataLifecycleErrorStore getErrorStore() {
return errorStore;
}

View file

@ -0,0 +1,76 @@
---
setup:
- skip:
version: " - 8.8.99"
reason: "/_info/ingest only available from v8.9"
---
teardown:
- do:
ingest.delete_pipeline:
id: "ingest_info_pipeline"
ignore: 404
- do:
indices.delete:
index: "ingest_info_index"
ignore_unavailable: true
---
"Cluster ingest information":
- do:
ingest.put_pipeline:
id: "ingest_info_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"set" : {
"field": "pipeline",
"value": "pipeline"
}
}
]
}
- do:
bulk:
refresh: true
index: ingest_info_index
body:
- '{"create": {"pipeline" : "ingest_info_pipeline"}}'
- '{"some-field": "some-value"}'
- '{"create": {"pipeline" : "ingest_info_pipeline"}}'
- '{"some-field": "another-value"}'
- do:
cluster.info:
target: [ ingest ]
- is_true: cluster_name
# Summary ingest section
- is_true: ingest.total
- gte: { ingest.total.count: 2 }
- gte: { ingest.total.time_in_millis: 0 }
# next 2 conditions _should_ be 0, but because these yaml tests are sharing the same test cluster, other tests could
# pollute the information.
- gte: { ingest.total.current: 0 }
- gte: { ingest.total.failed: 0 }
# Pipelines section
- is_true: ingest.pipelines.ingest_info_pipeline
- gte: { ingest.pipelines.ingest_info_pipeline.count: 2 }
- gte: { ingest.pipelines.ingest_info_pipeline.time_in_millis: 0 }
- match: { ingest.pipelines.ingest_info_pipeline.current: 0 }
- match: { ingest.pipelines.ingest_info_pipeline.failed: 0 }
# Processors section
- is_true: ingest.pipelines.ingest_info_pipeline.processors.0.set
- match: { ingest.pipelines.ingest_info_pipeline.processors.0.set.type: "set" }
- is_true: ingest.pipelines.ingest_info_pipeline.processors.0.set.stats
- gte: { ingest.pipelines.ingest_info_pipeline.processors.0.set.stats.count: 2 }
- gte: { ingest.pipelines.ingest_info_pipeline.processors.0.set.stats.time_in_millis: 0 }
- match: { ingest.pipelines.ingest_info_pipeline.processors.0.set.stats.current: 0 }
- match: { ingest.pipelines.ingest_info_pipeline.processors.0.set.stats.failed: 0 }
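For a Java flavour of the same checks, here is a hypothetical REST-test sketch that reads the new ingest section with the `ObjectPath` helper used elsewhere in this change; the class and method names are invented, and it assumes the `ingest_info_pipeline` created by the YAML test above already exists.

import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.rest.ObjectPath;

// Hypothetical Java counterpart of the YAML assertions above.
public class ClusterInfoIngestIT extends ESRestTestCase {

    public void testIngestInfo() throws Exception {
        Response response = client().performRequest(new Request("GET", "/_info/ingest"));
        ObjectPath objectPath = ObjectPath.createFromResponse(response);

        // Summary section: the cluster-wide count is at least as large as any single pipeline's count.
        int totalCount = objectPath.evaluate("ingest.total.count");
        int pipelineCount = objectPath.evaluate("ingest.pipelines.ingest_info_pipeline.count");
        assertTrue(totalCount >= pipelineCount);

        // Per-processor stats are keyed by processor name, as in the YAML test.
        int processorCount = objectPath.evaluate("ingest.pipelines.ingest_info_pipeline.processors.0.set.stats.count");
        assertTrue(processorCount >= 0);
    }
}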

View file

@ -132,7 +132,7 @@ public class GoogleCloudStorageClientSettingsTests extends ESTestCase {
public void testLoadsProxySettings() throws Exception {
final String clientName = randomAlphaOfLength(5);
final ServiceAccountCredentials credential = randomCredential(clientName).v1();
var proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(InetAddress.getLoopbackAddress(), randomIntBetween(1024, 65536)));
var proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(InetAddress.getLoopbackAddress(), randomIntBetween(49152, 65535)));
final GoogleCloudStorageClientSettings googleCloudStorageClientSettings = new GoogleCloudStorageClientSettings(
credential,
ENDPOINT_SETTING.getDefault(Settings.EMPTY),

View file

@ -0,0 +1,38 @@
---
setup:
- skip:
version: " - 8.8.99"
reason: "/_info/_all only available from v8.9"
---
"Cluster Info _all":
- do:
cluster.info:
target: [ _all ]
# this test only checks that the targets exist; their structure is covered by dedicated tests
- is_true: cluster_name
- is_true: http
- is_true: ingest
---
"Cluster Info fails when mixing _all with other targets":
- do:
catch: bad_request
cluster.info:
target: [ _all, ingest ]
- match: { status: 400 }
- match: { error.type: illegal_argument_exception }
- match: { error.reason: "request [/_info/_all,ingest] contains _all and individual target [_all,ingest]" }
---
"Cluster Info fails with an invalid target":
- do:
catch: bad_request
cluster.info:
target: [ ingest, invalid_target ]
- match: { status: 400 }
- match: { error.type: illegal_argument_exception }
- match: { error.reason: "request [/_info/ingest,invalid_target] contains unrecognized target: [invalid_target]" }

View file

@ -1,5 +1,5 @@
---
"HTTP Stats":
"Cluster HTTP Info":
- skip:
version: " - 8.8.99"
reason: "/_info/http only available from v8.9"

View file

@ -26,7 +26,6 @@ import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.seqno.RetentionLeaseStats;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
@ -110,11 +109,6 @@ public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction<
assert task instanceof CancellableTask;
IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex());
IndexShard indexShard = indexService.getShard(shardRouting.shardId().id());
// if we don't have the routing entry yet, we need it stats wise, we treat it as if the shard is not ready yet
if (indexShard.routingEntry() == null) {
throw new ShardNotFoundException(indexShard.shardId());
}
CommonStats commonStats = CommonStats.getShardLevelStats(indicesService.getIndicesQueryCache(), indexShard, request.flags());
CommitStats commitStats;
SeqNoStats seqNoStats;

View file

@ -0,0 +1,159 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.action.support;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import java.util.Iterator;
/**
* Allows an action to fan-out to several sub-actions and accumulate their results, but which reacts to a cancellation by releasing all
* references to itself, and hence the partially-accumulated results, allowing them to be garbage-collected. This is a useful protection for
* cases where the results may consume a lot of heap (e.g. stats) but the final response may be delayed by a single slow node for long
* enough that the client gives up.
* <p>
* Note that it's easy to accidentally capture another reference to this class when implementing it, and this will prevent the early release
* of any accumulated results. Beware of lambdas and method references. You must test your implementation carefully (using e.g.
* {@code ReachabilityChecker}) to make sure it doesn't do this.
*/
public abstract class CancellableFanOut<Item, ItemResponse, FinalResponse> {
private static final Logger logger = LogManager.getLogger(CancellableFanOut.class);
/**
* Run the fan-out action.
*
* @param task The task to watch for cancellations. If {@code null} or not a {@link CancellableTask} then the fan-out still
* works, just without any cancellation handling.
* @param itemsIterator The items over which to fan out. Iterated on the calling thread.
* @param listener A listener for the final response, which is completed after all the fanned-out actions have completed. It is not
* completed promptly on cancellation. Completed on the thread that handles the final per-item response (or
* the calling thread if there are no items).
*/
public final void run(@Nullable Task task, Iterator<Item> itemsIterator, ActionListener<FinalResponse> listener) {
final var cancellableTask = task instanceof CancellableTask ct ? ct : null;
// Captures the final result as soon as it's known (either on completion or on cancellation) without necessarily completing the
// outer listener, because we do not want to complete the outer listener until all sub-tasks are complete
final var resultListener = new SubscribableListener<FinalResponse>();
// Completes resultListener (either on completion or on cancellation). Captures a reference to 'this', but within a 'RunOnce' so it
// is released promptly when executed.
final var resultListenerCompleter = new RunOnce(() -> {
if (cancellableTask != null && cancellableTask.notifyIfCancelled(resultListener)) {
return;
}
// It's important that we complete resultListener before returning, because otherwise there's a risk that a cancellation arrives
// later which might unexpectedly complete the final listener on a transport thread.
ActionListener.completeWith(resultListener, this::onCompletion);
});
// Collects the per-item listeners up so they can all be completed exceptionally on cancellation. Never completed successfully.
final var itemCancellationListener = new SubscribableListener<ItemResponse>();
if (cancellableTask != null) {
cancellableTask.addListener(() -> {
assert cancellableTask.isCancelled();
resultListenerCompleter.run();
cancellableTask.notifyIfCancelled(itemCancellationListener);
});
}
try (var refs = new RefCountingRunnable(() -> {
// When all sub-tasks are complete, pass the result from resultListener to the outer listener.
resultListenerCompleter.run();
// resultListener is always complete by this point, so the outer listener is completed on this thread
resultListener.addListener(listener);
})) {
while (itemsIterator.hasNext()) {
final var item = itemsIterator.next();
// Captures a reference to 'this', but within a 'notifyOnce' so it is released promptly when completed.
final ActionListener<ItemResponse> itemResponseListener = ActionListener.notifyOnce(new ActionListener<>() {
@Override
public void onResponse(ItemResponse itemResponse) {
onItemResponse(item, itemResponse);
}
@Override
public void onFailure(Exception e) {
if (cancellableTask != null && cancellableTask.isCancelled()) {
// Completed on cancellation so it is released promptly, but there's no need to handle the exception.
return;
}
onItemFailure(item, e);
}
@Override
public String toString() {
return "[" + CancellableFanOut.this + "][" + item + "]";
}
});
if (cancellableTask != null) {
if (cancellableTask.isCancelled()) {
return;
}
// Register this item's listener for prompt cancellation notification.
itemCancellationListener.addListener(itemResponseListener);
}
// Process the item, capturing a ref to make sure the outer listener is completed after this item is processed.
sendItemRequest(item, ActionListener.releaseAfter(itemResponseListener, refs.acquire()));
}
} catch (Exception e) {
// NB the listener may have been completed already (by exiting this try block) so this exception may not be sent to the caller,
// but we cannot do anything else with it; an exception here is a bug anyway.
logger.error("unexpected failure in [" + this + "]", e);
assert false : e;
throw e;
}
}
/**
* Run the action (typically by sending a transport request) for an individual item. Called in sequence on the thread that invoked
* {@link #run}. May not be called for every item if the task is cancelled during the iteration.
* <p>
* Note that it's easy to accidentally capture another reference to this class when implementing this method, and that will prevent the
* early release of any accumulated results. Beware of lambdas, and test carefully.
*/
protected abstract void sendItemRequest(Item item, ActionListener<ItemResponse> listener);
/**
* Handle a successful response for an item. May be called concurrently for multiple items. Not called if the task is cancelled.
* <p>
* Note that it's easy to accidentally capture another reference to this class when implementing this method, and that will prevent the
* early release of any accumulated results. Beware of lambdas, and test carefully.
*/
protected abstract void onItemResponse(Item item, ItemResponse itemResponse);
/**
* Handle a failure for an item. May be called concurrently for multiple items. Not called if the task is cancelled.
* <p>
* Note that it's easy to accidentally capture another reference to this class when implementing this method, and that will prevent the
* early release of any accumulated results. Beware of lambdas, and test carefully.
*/
protected abstract void onItemFailure(Item item, Exception e);
/**
* Called when responses for all items have been processed, on the thread that processed the last per-item response. Not called if the
* task is cancelled.
* <p>
* Note that it's easy to accidentally capture another reference to this class when implementing this method, and that will prevent the
* early release of any accumulated results. Beware of lambdas, and test carefully.
*/
protected abstract FinalResponse onCompletion() throws Exception;
}
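To make the intended usage of this helper concrete, here is a hypothetical subclass; the item and response types, the greeting logic, and the `GreetingFanOut` name are invented for illustration and are not part of this change.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.CancellableFanOut;
import org.elasticsearch.tasks.Task;

// Hypothetical fan-out over node names that collects one greeting per node.
final class GreetingFanOut extends CancellableFanOut<String, String, List<String>> {

    private final List<String> greetings = Collections.synchronizedList(new ArrayList<>());

    @Override
    protected void sendItemRequest(String nodeName, ActionListener<String> listener) {
        // Real implementations would send a transport request here; this sketch completes the listener inline.
        listener.onResponse("hello " + nodeName);
    }

    @Override
    protected void onItemResponse(String nodeName, String greeting) {
        greetings.add(greeting); // may run concurrently for different items, hence the synchronized list
    }

    @Override
    protected void onItemFailure(String nodeName, Exception e) {
        greetings.add("failed: " + nodeName);
    }

    @Override
    protected List<String> onCompletion() {
        // Called once every item has been processed (and never if the task was cancelled).
        return List.copyOf(greetings);
    }

    static void runExample(Task task, ActionListener<List<String>> listener) {
        new GreetingFanOut().run(task, List.of("node-1", "node-2").iterator(), listener);
    }
}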

View file

@ -18,11 +18,11 @@ import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.CancellableFanOut;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.action.support.broadcast.BaseBroadcastResponse;
import org.elasticsearch.action.support.broadcast.BroadcastRequest;
@ -37,9 +37,6 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportChannel;
@ -280,48 +277,36 @@ public abstract class TransportBroadcastByNodeAction<
ResponseFactory<Response, ShardOperationResult> responseFactory, ResponseFactory<Response, ShardOperationResult> responseFactory,
ActionListener<Response> listener ActionListener<Response> listener
) { ) {
final var mutex = new Object(); new CancellableFanOut<Map.Entry<String, List<ShardRouting>>, NodeResponse, Response>() {
final var shardResponses = new ArrayList<ShardOperationResult>(availableShardCount); final ArrayList<ShardOperationResult> shardResponses = new ArrayList<>(availableShardCount);
final var exceptions = new ArrayList<DefaultShardOperationFailedException>(0); final ArrayList<DefaultShardOperationFailedException> exceptions = new ArrayList<>(0);
final var totalShards = new AtomicInteger(unavailableShardCount); final AtomicInteger totalShards = new AtomicInteger(unavailableShardCount);
final var successfulShards = new AtomicInteger(0); final AtomicInteger successfulShards = new AtomicInteger(0);
final TransportRequestOptions transportRequestOptions = TransportRequestOptions.timeout(request.timeout());
final var resultListener = new ListenableFuture<Response>(); @Override
final var resultListenerCompleter = new RunOnce(() -> { protected void sendItemRequest(Map.Entry<String, List<ShardRouting>> entry, ActionListener<NodeResponse> listener) {
if (task instanceof CancellableTask cancellableTask) {
if (cancellableTask.notifyIfCancelled(resultListener)) {
return;
}
}
// ref releases all happen-before here so no need to be synchronized
resultListener.onResponse(
responseFactory.newResponse(totalShards.get(), successfulShards.get(), exceptions.size(), shardResponses, exceptions)
);
});
final var nodeFailureListeners = new ListenableFuture<NodeResponse>();
if (task instanceof CancellableTask cancellableTask) {
cancellableTask.addListener(() -> {
assert cancellableTask.isCancelled();
resultListenerCompleter.run();
cancellableTask.notifyIfCancelled(nodeFailureListeners);
});
}
final var transportRequestOptions = TransportRequestOptions.timeout(request.timeout());
try (var refs = new RefCountingRunnable(() -> {
resultListener.addListener(listener);
resultListenerCompleter.run();
})) {
for (final var entry : shardsByNodeId.entrySet()) {
final var node = nodes.get(entry.getKey()); final var node = nodes.get(entry.getKey());
final var shards = entry.getValue(); final var shards = entry.getValue();
final ActionListener<NodeResponse> nodeResponseListener = ActionListener.notifyOnce(new ActionListener<NodeResponse>() { final var nodeRequest = new NodeRequest(request, shards, node.getId());
if (task != null) {
nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId());
}
transportService.sendRequest(
node,
transportNodeBroadcastAction,
nodeRequest,
transportRequestOptions,
new ActionListenerResponseHandler<>(listener, nodeResponseReader)
);
}
@Override @Override
public void onResponse(NodeResponse nodeResponse) { protected void onItemResponse(Map.Entry<String, List<ShardRouting>> entry, NodeResponse nodeResponse) {
synchronized (mutex) { final var node = nodes.get(entry.getKey());
synchronized (this) {
shardResponses.addAll(nodeResponse.getResults()); shardResponses.addAll(nodeResponse.getResults());
} }
totalShards.addAndGet(nodeResponse.getTotalShards()); totalShards.addAndGet(nodeResponse.getTotalShards());
@ -331,7 +316,7 @@ public abstract class TransportBroadcastByNodeAction<
if (TransportActions.isShardNotAvailableException(exception)) { if (TransportActions.isShardNotAvailableException(exception)) {
assert node.getVersion().before(Version.V_8_7_0) : node; // we stopped sending these ignored exceptions assert node.getVersion().before(Version.V_8_7_0) : node; // we stopped sending these ignored exceptions
} else { } else {
synchronized (mutex) { synchronized (this) {
exceptions.add( exceptions.add(
new DefaultShardOperationFailedException( new DefaultShardOperationFailedException(
exception.getShardId().getIndexName(), exception.getShardId().getIndexName(),
@ -345,19 +330,15 @@ public abstract class TransportBroadcastByNodeAction<
} }
@Override @Override
public void onFailure(Exception e) { protected void onItemFailure(Map.Entry<String, List<ShardRouting>> entry, Exception e) {
if (task instanceof CancellableTask cancellableTask && cancellableTask.isCancelled()) { final var node = nodes.get(entry.getKey());
return; final var shards = entry.getValue();
}
logger.debug(() -> format("failed to execute [%s] on node [%s]", actionName, node), e); logger.debug(() -> format("failed to execute [%s] on node [%s]", actionName, node), e);
final var failedNodeException = new FailedNodeException(node.getId(), "Failed node [" + node.getId() + "]", e); final var failedNodeException = new FailedNodeException(node.getId(), "Failed node [" + node.getId() + "]", e);
synchronized (mutex) { synchronized (this) {
for (ShardRouting shard : shards) { for (ShardRouting shard : shards) {
exceptions.add( exceptions.add(new DefaultShardOperationFailedException(shard.getIndexName(), shard.getId(), failedNodeException));
new DefaultShardOperationFailedException(shard.getIndexName(), shard.getId(), failedNodeException)
);
} }
} }
@ -365,34 +346,27 @@ public abstract class TransportBroadcastByNodeAction<
} }
@Override @Override
public String toString() { protected Response onCompletion() {
return "[" + actionName + "][" + node.descriptionWithoutAttributes() + "]"; // ref releases all happen-before here so no need to be synchronized
} return responseFactory.newResponse(
}); totalShards.get(),
successfulShards.get(),
if (task instanceof CancellableTask) { exceptions.size(),
nodeFailureListeners.addListener(nodeResponseListener); shardResponses,
} exceptions
final var nodeRequest = new NodeRequest(request, shards, node.getId());
if (task != null) {
nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId());
}
transportService.sendRequest(
node,
transportNodeBroadcastAction,
nodeRequest,
transportRequestOptions,
new ActionListenerResponseHandler<>(
ActionListener.releaseAfter(nodeResponseListener, refs.acquire()),
NodeResponse::new
)
); );
} }
@Override
public String toString() {
return actionName;
} }
}.run(task, shardsByNodeId.entrySet().iterator(), listener);
} }
// not an inline method reference to avoid capturing CancellableFanOut.this.
private final Writeable.Reader<NodeResponse> nodeResponseReader = NodeResponse::new;
class BroadcastByNodeTransportRequestHandler implements TransportRequestHandler<NodeRequest> { class BroadcastByNodeTransportRequestHandler implements TransportRequestHandler<NodeRequest> {
@Override @Override
public void messageReceived(final NodeRequest request, TransportChannel channel, Task task) throws Exception { public void messageReceived(final NodeRequest request, TransportChannel channel, Task task) throws Exception {
@ -415,52 +389,26 @@ public abstract class TransportBroadcastByNodeAction<
) { ) {
logger.trace("[{}] executing operation on [{}] shards", actionName, shards.size()); logger.trace("[{}] executing operation on [{}] shards", actionName, shards.size());
final var results = new ArrayList<ShardOperationResult>(shards.size()); new CancellableFanOut<ShardRouting, ShardOperationResult, NodeResponse>() {
final var exceptions = new ArrayList<BroadcastShardOperationFailedException>(0);
final var resultListener = new ListenableFuture<NodeResponse>(); final ArrayList<ShardOperationResult> results = new ArrayList<>(shards.size());
final var resultListenerCompleter = new RunOnce(() -> { final ArrayList<BroadcastShardOperationFailedException> exceptions = new ArrayList<>(0);
if (task instanceof CancellableTask cancellableTask) {
if (cancellableTask.notifyIfCancelled(resultListener)) {
return;
}
}
// ref releases all happen-before here so no need to be synchronized
resultListener.onResponse(new NodeResponse(nodeId, shards.size(), results, exceptions));
});
final var shardFailureListeners = new ListenableFuture<ShardOperationResult>();
if (task instanceof CancellableTask cancellableTask) {
cancellableTask.addListener(() -> {
assert cancellableTask.isCancelled();
resultListenerCompleter.run();
cancellableTask.notifyIfCancelled(shardFailureListeners);
});
}
try (var refs = new RefCountingRunnable(() -> {
resultListener.addListener(listener);
resultListenerCompleter.run();
})) {
for (final var shardRouting : shards) {
if (task instanceof CancellableTask cancellableTask && cancellableTask.isCancelled()) {
return;
}
final ActionListener<ShardOperationResult> shardListener = ActionListener.notifyOnce(new ActionListener<>() {
@Override @Override
public void onResponse(ShardOperationResult shardOperationResult) { protected void sendItemRequest(ShardRouting shardRouting, ActionListener<ShardOperationResult> listener) {
logger.trace(() -> format("[%s] completed operation for shard [%s]", actionName, shardRouting.shortSummary())); logger.trace(() -> format("[%s] executing operation for shard [%s]", actionName, shardRouting.shortSummary()));
ActionRunnable.wrap(listener, l -> shardOperation(request, shardRouting, task, l)).run();
}
@Override
protected void onItemResponse(ShardRouting shardRouting, ShardOperationResult shardOperationResult) {
synchronized (results) { synchronized (results) {
results.add(shardOperationResult); results.add(shardOperationResult);
} }
} }
@Override @Override
public void onFailure(Exception e) { protected void onItemFailure(ShardRouting shardRouting, Exception e) {
if (task instanceof CancellableTask cancellableTask && cancellableTask.isCancelled()) {
return;
}
logger.log( logger.log(
TransportActions.isShardNotAvailableException(e) ? Level.TRACE : Level.DEBUG, TransportActions.isShardNotAvailableException(e) ? Level.TRACE : Level.DEBUG,
() -> format("[%s] failed to execute operation for shard [%s]", actionName, shardRouting.shortSummary()), () -> format("[%s] failed to execute operation for shard [%s]", actionName, shardRouting.shortSummary()),
@ -469,33 +417,23 @@ public abstract class TransportBroadcastByNodeAction<
if (TransportActions.isShardNotAvailableException(e) == false) { if (TransportActions.isShardNotAvailableException(e) == false) {
synchronized (exceptions) { synchronized (exceptions) {
exceptions.add( exceptions.add(
new BroadcastShardOperationFailedException( new BroadcastShardOperationFailedException(shardRouting.shardId(), "operation " + actionName + " failed", e)
shardRouting.shardId(),
"operation " + actionName + " failed",
e
)
); );
} }
} }
} }
@Override
protected NodeResponse onCompletion() {
// ref releases all happen-before here so no need to be synchronized
return new NodeResponse(nodeId, shards.size(), results, exceptions);
}
@Override @Override
public String toString() { public String toString() {
return "[" + actionName + "][" + shardRouting + "]"; return actionName;
}
});
if (task instanceof CancellableTask) {
shardFailureListeners.addListener(shardListener);
}
logger.trace(() -> format("[%s] executing operation for shard [%s]", actionName, shardRouting.shortSummary()));
ActionRunnable.wrap(
ActionListener.releaseAfter(shardListener, refs.acquire()),
l -> shardOperation(request, shardRouting, task, l)
).run();
}
} }
}.run(task, shards.iterator(), listener);
} }
class NodeRequest extends TransportRequest implements IndicesRequest { class NodeRequest extends TransportRequest implements IndicesRequest {

View file

@ -15,16 +15,15 @@ import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.CancellableFanOut;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportChannel;
@ -131,86 +130,66 @@ public abstract class TransportNodesAction<
assert request.concreteNodes() != null; assert request.concreteNodes() != null;
} }
final var responses = new ArrayList<NodeResponse>(request.concreteNodes().length); new CancellableFanOut<DiscoveryNode, NodeResponse, CheckedConsumer<ActionListener<NodesResponse>, Exception>>() {
final var exceptions = new ArrayList<FailedNodeException>(0);
final var resultListener = new ListenableFuture<NodesResponse>(); final ArrayList<NodeResponse> responses = new ArrayList<>(request.concreteNodes().length);
final var resultListenerCompleter = new RunOnce(() -> { final ArrayList<FailedNodeException> exceptions = new ArrayList<>(0);
if (task instanceof CancellableTask cancellableTask) {
if (cancellableTask.notifyIfCancelled(resultListener)) {
return;
}
}
// ref releases all happen-before here so no need to be synchronized
threadPool.executor(finalExecutor)
.execute(ActionRunnable.wrap(resultListener, l -> newResponseAsync(task, request, responses, exceptions, l)));
});
final var nodeCancellationListener = new ListenableFuture<NodeResponse>(); // collects node listeners & completes them if cancelled final TransportRequestOptions transportRequestOptions = TransportRequestOptions.timeout(request.timeout());
if (task instanceof CancellableTask cancellableTask) {
cancellableTask.addListener(() -> {
assert cancellableTask.isCancelled();
resultListenerCompleter.run();
cancellableTask.notifyIfCancelled(nodeCancellationListener);
});
}
final var transportRequestOptions = TransportRequestOptions.timeout(request.timeout());
try (var refs = new RefCountingRunnable(() -> {
resultListener.addListener(listener);
resultListenerCompleter.run();
})) {
for (final var node : request.concreteNodes()) {
final ActionListener<NodeResponse> nodeResponseListener = ActionListener.notifyOnce(new ActionListener<>() {
@Override
public void onResponse(NodeResponse nodeResponse) {
synchronized (responses) {
responses.add(nodeResponse);
}
}
@Override @Override
public void onFailure(Exception e) { protected void sendItemRequest(DiscoveryNode discoveryNode, ActionListener<NodeResponse> listener) {
if (task instanceof CancellableTask cancellableTask && cancellableTask.isCancelled()) {
return;
}
logger.debug(() -> format("failed to execute [%s] on node [%s]", actionName, node), e);
synchronized (exceptions) {
exceptions.add(new FailedNodeException(node.getId(), "Failed node [" + node.getId() + "]", e));
}
}
@Override
public String toString() {
return "[" + actionName + "][" + node.descriptionWithoutAttributes() + "]";
}
});
if (task instanceof CancellableTask) {
nodeCancellationListener.addListener(nodeResponseListener);
}
final var nodeRequest = newNodeRequest(request); final var nodeRequest = newNodeRequest(request);
if (task != null) { if (task != null) {
nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId()); nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId());
} }
transportService.sendRequest( transportService.sendRequest(
node, discoveryNode,
transportNodeAction, transportNodeAction,
nodeRequest, nodeRequest,
transportRequestOptions, transportRequestOptions,
new ActionListenerResponseHandler<>( new ActionListenerResponseHandler<>(listener, nodeResponseReader(discoveryNode))
ActionListener.releaseAfter(nodeResponseListener, refs.acquire()),
in -> newNodeResponse(in, node)
)
); );
} }
@Override
protected void onItemResponse(DiscoveryNode discoveryNode, NodeResponse nodeResponse) {
synchronized (responses) {
responses.add(nodeResponse);
} }
} }
@Override
protected void onItemFailure(DiscoveryNode discoveryNode, Exception e) {
logger.debug(() -> format("failed to execute [%s] on node [%s]", actionName, discoveryNode), e);
synchronized (exceptions) {
exceptions.add(new FailedNodeException(discoveryNode.getId(), "Failed node [" + discoveryNode.getId() + "]", e));
}
}
@Override
protected CheckedConsumer<ActionListener<NodesResponse>, Exception> onCompletion() {
// ref releases all happen-before here so no need to be synchronized
return l -> newResponseAsync(task, request, responses, exceptions, l);
}
@Override
public String toString() {
return actionName;
}
}.run(
task,
Iterators.forArray(request.concreteNodes()),
listener.delegateFailure((l, r) -> threadPool.executor(finalExecutor).execute(ActionRunnable.wrap(l, r)))
);
}
private Writeable.Reader<NodeResponse> nodeResponseReader(DiscoveryNode discoveryNode) {
// not an inline lambda to avoid capturing CancellableFanOut.this.
return in -> TransportNodesAction.this.newNodeResponse(in, discoveryNode);
}
/** /**
* Create a new {@link NodesResponse} (multi-node response). * Create a new {@link NodesResponse} (multi-node response).
* *

View file

@ -22,9 +22,13 @@ import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.gateway.PriorityComparator;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
@ -44,23 +48,58 @@ public class DesiredBalanceReconciler {
private static final Logger logger = LogManager.getLogger(DesiredBalanceReconciler.class); private static final Logger logger = LogManager.getLogger(DesiredBalanceReconciler.class);
private final DesiredBalance desiredBalance; public static final Setting<TimeValue> UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING = Setting.timeSetting(
private final RoutingAllocation allocation; // name chosen to align with code in BalancedShardsAllocator but TODO rename "cluster.routing.allocation.desired_balance.undesired_allocations.log_interval",
private final RoutingNodes routingNodes; TimeValue.timeValueHours(1),
private final NodeAllocationOrdering allocationOrdering; TimeValue.ZERO,
private final NodeAllocationOrdering moveOrdering; Setting.Property.Dynamic,
Setting.Property.NodeScope
);
DesiredBalanceReconciler( public static final Setting<Double> UNDESIRED_ALLOCATIONS_LOG_THRESHOLD_SETTING = Setting.doubleSetting(
DesiredBalance desiredBalance, "cluster.routing.allocation.desired_balance.undesired_allocations.threshold",
RoutingAllocation routingAllocation, 0.1,
NodeAllocationOrdering allocationOrdering, 0,
NodeAllocationOrdering moveOrdering Setting.Property.Dynamic,
) { Setting.Property.NodeScope
);
private final FrequencyCappedAction undesiredAllocationLogInterval;
private double undesiredAllocationsLogThreshold;
private final NodeAllocationOrdering allocationOrdering = new NodeAllocationOrdering();
private final NodeAllocationOrdering moveOrdering = new NodeAllocationOrdering();
public DesiredBalanceReconciler(ClusterSettings clusterSettings, ThreadPool threadPool) {
this.undesiredAllocationLogInterval = new FrequencyCappedAction(threadPool);
clusterSettings.initializeAndWatch(UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING, this.undesiredAllocationLogInterval::setMinInterval);
clusterSettings.initializeAndWatch(
UNDESIRED_ALLOCATIONS_LOG_THRESHOLD_SETTING,
value -> this.undesiredAllocationsLogThreshold = value
);
}
public void reconcile(DesiredBalance desiredBalance, RoutingAllocation allocation) {
var nodeIds = allocation.routingNodes().getAllNodeIds();
allocationOrdering.retainNodes(nodeIds);
moveOrdering.retainNodes(nodeIds);
new Reconciliation(desiredBalance, allocation).run();
}
public void clear() {
allocationOrdering.clear();
moveOrdering.clear();
}
private class Reconciliation {
private final DesiredBalance desiredBalance;
private final RoutingAllocation allocation;
private final RoutingNodes routingNodes;
Reconciliation(DesiredBalance desiredBalance, RoutingAllocation allocation) {
this.desiredBalance = desiredBalance; this.desiredBalance = desiredBalance;
this.allocation = routingAllocation; this.allocation = allocation;
this.routingNodes = routingAllocation.routingNodes(); this.routingNodes = allocation.routingNodes();
this.allocationOrdering = allocationOrdering;
this.moveOrdering = moveOrdering;
} }
void run() { void run() {
@ -356,11 +395,17 @@ public class DesiredBalanceReconciler {
return; return;
} }
// Iterate over all started shards and try to move any which are on undesired nodes. In the presence of throttling shard movements, long allAllocations = 0;
// the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are offloading the shards. long undesiredAllocations = 0;
// Iterate over all started shards and try to move any which are on undesired nodes. In the presence of throttling shard
// movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are offloading the
// shards.
for (final var iterator = OrderedShardsIterator.create(routingNodes, moveOrdering); iterator.hasNext();) { for (final var iterator = OrderedShardsIterator.create(routingNodes, moveOrdering); iterator.hasNext();) {
final var shardRouting = iterator.next(); final var shardRouting = iterator.next();
allAllocations++;
if (shardRouting.started() == false) { if (shardRouting.started() == false) {
// can only rebalance started shards // can only rebalance started shards
continue; continue;
@ -377,6 +422,8 @@ public class DesiredBalanceReconciler {
continue; continue;
} }
undesiredAllocations++;
if (allocation.deciders().canRebalance(shardRouting, allocation).type() != Decision.Type.YES) { if (allocation.deciders().canRebalance(shardRouting, allocation).type() != Decision.Type.YES) {
// rebalancing disabled for this shard // rebalancing disabled for this shard
continue; continue;
@ -406,6 +453,22 @@ public class DesiredBalanceReconciler {
moveOrdering.recordAllocation(shardRouting.currentNodeId()); moveOrdering.recordAllocation(shardRouting.currentNodeId());
} }
} }
maybeLogUndesiredAllocationsWarning(allAllocations, undesiredAllocations);
}
private void maybeLogUndesiredAllocationsWarning(long allAllocations, long undesiredAllocations) {
if (allAllocations > 0 && undesiredAllocations > undesiredAllocationsLogThreshold * allAllocations) {
undesiredAllocationLogInterval.maybeExecute(
() -> logger.warn(
"[{}%] of assigned shards ({}/{}) are not on their desired nodes, which exceeds the warn threshold of [{}%]",
100.0 * undesiredAllocations / allAllocations,
undesiredAllocations,
allAllocations,
100.0 * undesiredAllocationsLogThreshold
)
);
}
} }
private DiscoveryNode findRelocationTarget(final ShardRouting shardRouting, Set<String> desiredNodeIds) { private DiscoveryNode findRelocationTarget(final ShardRouting shardRouting, Set<String> desiredNodeIds) {
@ -413,6 +476,7 @@ public class DesiredBalanceReconciler {
if (moveDecision != null) {
return moveDecision;
}
final var shardsOnReplacedNode = allocation.metadata().nodeShutdowns().contains(shardRouting.currentNodeId(), REPLACE);
if (shardsOnReplacedNode) {
return findRelocationTarget(shardRouting, desiredNodeIds, this::decideCanForceAllocateForVacate);
@ -454,3 +518,4 @@ public class DesiredBalanceReconciler {
return allocation.deciders().canForceAllocateDuringReplace(shardRouting, target, allocation);
}
}
}
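For context, the new warning is gated twice: the fraction of undesired allocations must exceed the configured threshold, and the FrequencyCappedAction introduced below then rate-limits how often the message is written. A minimal standalone sketch of the threshold check, using made-up numbers (the 10% default matches the warn threshold quoted in the tests further down):

public class UndesiredAllocationsThresholdExample {
    public static void main(String[] args) {
        // Made-up figures for illustration only.
        long allAllocations = 200;
        long undesiredAllocations = 30;
        double undesiredAllocationsLogThreshold = 0.1; // 10% default
        // Mirrors the condition in maybeLogUndesiredAllocationsWarning: 30 > 0.1 * 200, so a warning is due.
        boolean shouldWarn = allAllocations > 0 && undesiredAllocations > undesiredAllocationsLogThreshold * allAllocations;
        System.out.println("shouldWarn = " + shouldWarn + ", undesired = " + (100.0 * undesiredAllocations / allAllocations) + "%");
    }
}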

View file

@ -50,13 +50,12 @@ public class DesiredBalanceShardsAllocator implements ShardsAllocator {
private final ThreadPool threadPool;
private final DesiredBalanceReconcilerAction reconciler;
private final DesiredBalanceComputer desiredBalanceComputer;
private final DesiredBalanceReconciler desiredBalanceReconciler;
private final ContinuousComputation<DesiredBalanceInput> desiredBalanceComputation;
private final PendingListenersQueue queue;
private final AtomicLong indexGenerator = new AtomicLong(-1);
private final ConcurrentLinkedQueue<List<MoveAllocationCommand>> pendingDesiredBalanceMoves = new ConcurrentLinkedQueue<>();
private final MasterServiceTaskQueue<ReconcileDesiredBalanceTask> masterServiceTaskQueue;
private volatile DesiredBalance currentDesiredBalance = DesiredBalance.INITIAL;
private volatile boolean resetCurrentDesiredBalance = false;
@ -100,6 +99,7 @@ public class DesiredBalanceShardsAllocator implements ShardsAllocator {
this.threadPool = threadPool;
this.reconciler = reconciler;
this.desiredBalanceComputer = desiredBalanceComputer;
this.desiredBalanceReconciler = new DesiredBalanceReconciler(clusterService.getClusterSettings(), threadPool);
this.desiredBalanceComputation = new ContinuousComputation<>(threadPool) {
@Override
@ -228,13 +228,7 @@ public class DesiredBalanceShardsAllocator implements ShardsAllocator {
} else {
logger.debug("Reconciling desired balance for [{}]", desiredBalance.lastConvergedIndex());
}
recordTime(cumulativeReconciliationTime, () -> desiredBalanceReconciler.reconcile(desiredBalance, allocation));
if (logger.isTraceEnabled()) {
logger.trace("Reconciled desired balance: {}", desiredBalance);
} else {
@ -287,7 +281,7 @@ public class DesiredBalanceShardsAllocator implements ShardsAllocator {
currentDesiredBalance = DesiredBalance.INITIAL;
queue.completeAllAsNotMaster();
pendingDesiredBalanceMoves.clear();
desiredBalanceReconciler.clear();
}
}

View file

@ -0,0 +1,46 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.cluster.routing.allocation.allocator;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.function.LongSupplier;
/**
* Execute an action at most once per time interval
*/
public class FrequencyCappedAction {
private final LongSupplier currentTimeMillisSupplier;
private TimeValue minInterval;
private long next = -1;
public FrequencyCappedAction(ThreadPool threadPool) {
this(threadPool::relativeTimeInMillis);
}
public FrequencyCappedAction(LongSupplier currentTimeMillisSupplier) {
this.currentTimeMillisSupplier = currentTimeMillisSupplier;
this.minInterval = TimeValue.MAX_VALUE;
}
public void setMinInterval(TimeValue minInterval) {
this.minInterval = minInterval;
}
public void maybeExecute(Runnable runnable) {
var current = currentTimeMillisSupplier.getAsLong();
if (current >= next) {
next = current + minInterval.millis();
runnable.run();
}
}
}
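A minimal usage sketch of the class above, assuming a caller that wants to emit a log line at most once per minute; System::currentTimeMillis stands in for the thread pool's relative clock used in production code:

import org.elasticsearch.cluster.routing.allocation.allocator.FrequencyCappedAction;
import org.elasticsearch.core.TimeValue;

public class FrequencyCappedActionUsageExample {
    public static void main(String[] args) {
        FrequencyCappedAction warner = new FrequencyCappedAction(System::currentTimeMillis);
        warner.setMinInterval(TimeValue.timeValueMinutes(1));
        // The first call runs immediately because nothing has executed yet.
        warner.maybeExecute(() -> System.out.println("warning emitted"));
        // A second call within the same minute is skipped; the runnable is not invoked.
        warner.maybeExecute(() -> System.out.println("this line is not printed"));
    }
}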

View file

@ -43,6 +43,7 @@ import org.elasticsearch.cluster.routing.allocation.DataTier;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceComputer;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceReconciler;
import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider;
@ -213,6 +214,8 @@ public final class ClusterSettings extends AbstractScopedSettings {
BalancedShardsAllocator.DISK_USAGE_BALANCE_FACTOR_SETTING,
BalancedShardsAllocator.THRESHOLD_SETTING,
DesiredBalanceComputer.PROGRESS_LOG_INTERVAL_SETTING,
DesiredBalanceReconciler.UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING,
DesiredBalanceReconciler.UNDESIRED_ALLOCATIONS_LOG_THRESHOLD_SETTING,
BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING,
BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING,
BreakerSettings.CIRCUIT_BREAKER_TYPE,

View file

@ -23,6 +23,7 @@ import org.elasticsearch.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@ -34,6 +35,19 @@ public record IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Ma
Writeable,
ChunkedToXContent {
private static final Comparator<PipelineStat> PIPELINE_STAT_COMPARATOR = (p1, p2) -> {
final Stats p2Stats = p2.stats;
final Stats p1Stats = p1.stats;
final int ingestTimeCompare = Long.compare(p2Stats.ingestTimeInMillis, p1Stats.ingestTimeInMillis);
if (ingestTimeCompare == 0) {
return Long.compare(p2Stats.ingestCount, p1Stats.ingestCount);
} else {
return ingestTimeCompare;
}
};
public static final IngestStats IDENTITY = new IngestStats(Stats.IDENTITY, List.of(), Map.of());
/**
* @param totalStats - The total stats for Ingest. This is logically the sum of all pipeline stats,
* and pipeline stats are logically the sum of the processor stats.
@ -41,16 +55,7 @@ public record IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Ma
* @param processorStats - The per-processor stats for a given pipeline. A map keyed by the pipeline identifier.
*/
public IngestStats {
pipelineStats = pipelineStats.stream().sorted(PIPELINE_STAT_COMPARATOR).toList();
}
/**
@ -153,11 +158,30 @@ public record IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Ma
);
}
public static IngestStats merge(IngestStats first, IngestStats second) {
return new IngestStats(
Stats.merge(first.totalStats, second.totalStats),
PipelineStat.merge(first.pipelineStats, second.pipelineStats),
merge(first.processorStats, second.processorStats)
);
}
static Map<String, List<ProcessorStat>> merge(Map<String, List<ProcessorStat>> first, Map<String, List<ProcessorStat>> second) {
var totalsPerPipelineProcessor = new HashMap<String, List<ProcessorStat>>();
first.forEach((pipelineId, stats) -> totalsPerPipelineProcessor.merge(pipelineId, stats, ProcessorStat::merge));
second.forEach((pipelineId, stats) -> totalsPerPipelineProcessor.merge(pipelineId, stats, ProcessorStat::merge));
return totalsPerPipelineProcessor;
}
public record Stats(long ingestCount, long ingestTimeInMillis, long ingestCurrent, long ingestFailedCount)
implements
Writeable,
ToXContentFragment {
public static final Stats IDENTITY = new Stats(0, 0, 0, 0);
/**
* Read from a stream.
*/
@ -181,6 +205,15 @@ public record IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Ma
builder.field("failed", ingestFailedCount);
return builder;
}
static Stats merge(Stats first, Stats second) {
return new Stats(
first.ingestCount + second.ingestCount,
first.ingestTimeInMillis + second.ingestTimeInMillis,
first.ingestCurrent + second.ingestCurrent,
first.ingestFailedCount + second.ingestFailedCount
);
}
}
/**
@ -216,10 +249,34 @@ public record IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Ma
/**
* Container for pipeline stats.
*/
public record PipelineStat(String pipelineId, Stats stats) {
static List<PipelineStat> merge(List<PipelineStat> first, List<PipelineStat> second) {
var totalsPerPipeline = new HashMap<String, Stats>();
first.forEach(ps -> totalsPerPipeline.merge(ps.pipelineId, ps.stats, Stats::merge));
second.forEach(ps -> totalsPerPipeline.merge(ps.pipelineId, ps.stats, Stats::merge));
return totalsPerPipeline.entrySet()
.stream()
.map(v -> new PipelineStat(v.getKey(), v.getValue()))
.sorted(PIPELINE_STAT_COMPARATOR)
.toList();
}
}
/**
* Container for processor stats.
*/
public record ProcessorStat(String name, String type, Stats stats) {
// The list of ProcessorStats *always* has an entry for every processor (whether or not the processor was executed), so
// it's safe to zip both lists using a common index iterator.
private static List<ProcessorStat> merge(List<ProcessorStat> first, List<ProcessorStat> second) {
var merged = new ArrayList<ProcessorStat>();
for (var i = 0; i < first.size(); i++) {
merged.add(new ProcessorStat(first.get(i).name, first.get(i).type, Stats.merge(first.get(i).stats, second.get(i).stats)));
}
return merged;
}
}
}
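As a rough illustration of the new merge support, combining two per-node ingest totals into a single cluster-wide figure; the numbers and the empty pipeline and processor collections are placeholders:

import java.util.List;
import java.util.Map;
import org.elasticsearch.ingest.IngestStats;

public class IngestStatsMergeExample {
    public static void main(String[] args) {
        // Stats fields are (count, timeInMillis, current, failed).
        IngestStats nodeA = new IngestStats(new IngestStats.Stats(10, 100, 1, 0), List.of(), Map.of());
        IngestStats nodeB = new IngestStats(new IngestStats.Stats(5, 50, 0, 2), List.of(), Map.of());
        IngestStats total = IngestStats.merge(nodeA, nodeB);
        // Field-wise sums: 15 documents, 150 ms, 1 in flight, 2 failures.
        System.out.println(total.totalStats());
    }
}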

View file

@ -8,7 +8,6 @@
package org.elasticsearch.rest.action.info;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
@ -18,6 +17,7 @@ import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
import org.elasticsearch.http.HttpStats;
import org.elasticsearch.ingest.IngestStats;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.ChunkedRestResponseBody;
import org.elasticsearch.rest.RestRequest;
@ -36,13 +36,21 @@ import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest.Metric.HTTP;
import static org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest.Metric.INGEST;
import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS;
public class RestClusterInfoAction extends BaseRestHandler {
static final Map<String, Function<NodesStatsResponse, ChunkedToXContent>> RESPONSE_MAPPER = Map.of(
HTTP.metricName(),
nodesStatsResponse -> nodesStatsResponse.getNodes().stream().map(NodeStats::getHttp).reduce(HttpStats.IDENTITY, HttpStats::merge),
//
INGEST.metricName(),
nodesStatsResponse -> nodesStatsResponse.getNodes()
.stream()
.map(NodeStats::getIngestStats)
.reduce(IngestStats.IDENTITY, IngestStats::merge)
);
static final Set<String> AVAILABLE_TARGETS = RESPONSE_MAPPER.keySet();

View file

@ -0,0 +1,133 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.action.support;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelHelper;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.ReachabilityChecker;
import org.hamcrest.Matchers;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
public class CancellableFanOutTests extends ESTestCase {
public void testFanOutWithoutCancellation() {
final var task = randomFrom(
new Task(1, "test", "test", "", TaskId.EMPTY_TASK_ID, Map.of()),
new CancellableTask(1, "test", "test", "", TaskId.EMPTY_TASK_ID, Map.of()),
null
);
final var future = new PlainActionFuture<String>();
final var itemListeners = new HashMap<String, ActionListener<String>>();
final var finalFailure = randomBoolean();
new CancellableFanOut<String, String, String>() {
int counter;
@Override
protected void sendItemRequest(String item, ActionListener<String> listener) {
itemListeners.put(item, listener);
}
@Override
protected void onItemResponse(String item, String itemResponse) {
assertThat(item, Matchers.oneOf("a", "c"));
assertEquals(item + "-response", itemResponse);
counter += 1;
}
@Override
protected void onItemFailure(String item, Exception e) {
assertEquals("b", item);
counter += 1;
}
@Override
protected String onCompletion() {
assertEquals(3, counter);
if (finalFailure) {
throw new ElasticsearchException("failed");
} else {
return "completed";
}
}
}.run(task, List.of("a", "b", "c").iterator(), future);
itemListeners.remove("a").onResponse("a-response");
assertFalse(future.isDone());
itemListeners.remove("b").onFailure(new ElasticsearchException("b-response"));
assertFalse(future.isDone());
itemListeners.remove("c").onResponse("c-response");
assertTrue(future.isDone());
if (finalFailure) {
assertEquals("failed", expectThrows(ElasticsearchException.class, future::actionGet).getMessage());
} else {
assertEquals("completed", future.actionGet());
}
}
public void testReleaseOnCancellation() {
final var task = new CancellableTask(1, "test", "test", "", TaskId.EMPTY_TASK_ID, Map.of());
final var future = new PlainActionFuture<String>();
final var itemListeners = new HashMap<String, ActionListener<String>>();
final var handledItemResponse = new AtomicBoolean();
final var reachabilityChecker = new ReachabilityChecker();
reachabilityChecker.register(new CancellableFanOut<String, String, String>() {
@Override
protected void sendItemRequest(String item, ActionListener<String> listener) {
itemListeners.put(item, listener);
}
@Override
protected void onItemResponse(String item, String itemResponse) {
assertEquals("a", item);
assertEquals("a-response", itemResponse);
assertTrue(handledItemResponse.compareAndSet(false, true));
}
@Override
protected void onItemFailure(String item, Exception e) {
fail(item);
}
@Override
protected String onCompletion() {
throw new AssertionError("onCompletion");
}
}).run(task, List.of("a", "b", "c").iterator(), future);
itemListeners.remove("a").onResponse("a-response");
assertTrue(handledItemResponse.get());
reachabilityChecker.checkReachable();
TaskCancelHelper.cancel(task, "test");
reachabilityChecker.ensureUnreachable(); // even though we're still holding on to some item listeners.
assertFalse(future.isDone());
itemListeners.remove("b").onResponse("b-response");
assertFalse(future.isDone());
itemListeners.remove("c").onFailure(new ElasticsearchException("c-response"));
assertTrue(itemListeners.isEmpty());
assertTrue(future.isDone());
expectThrows(TaskCancelledException.class, future::actionGet);
}
}

View file

@ -9,8 +9,6 @@
package org.elasticsearch.cluster.routing.allocation.allocator;
import org.apache.logging.log4j.Level;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterInfo.NodeAndShard;
@ -41,7 +39,6 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.Maps;
@ -70,6 +67,7 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
import static org.elasticsearch.common.settings.ClusterSettings.createBuiltInClusterSettings;
import static org.elasticsearch.test.MockLogAppender.assertThatLogger;
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;
@ -942,14 +940,7 @@ public class DesiredBalanceComputerTests extends ESTestCase {
}
});
assertThatLogger(() -> {
var iteration = new AtomicInteger(0);
desiredBalanceComputer.compute(
DesiredBalance.INITIAL,
@ -957,12 +948,7 @@ public class DesiredBalanceComputerTests extends ESTestCase {
queue(),
input -> iteration.incrementAndGet() < iterations
);
}, DesiredBalanceComputer.class, expectation);
}
private static Map.Entry<String, Long> indexSize(ClusterState clusterState, String name, long size, boolean primary) {

View file

@ -8,6 +8,7 @@
package org.elasticsearch.cluster.routing.allocation.allocator;
import org.apache.logging.log4j.Level;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterInfo;
@ -62,6 +63,8 @@ import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
import org.elasticsearch.snapshots.SnapshotsInfoService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.BeforeClass;
import java.util.Comparator;
@ -87,11 +90,14 @@ import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAll
import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING;
import static org.elasticsearch.common.settings.ClusterSettings.createBuiltInClusterSettings;
import static org.elasticsearch.test.MockLogAppender.assertThatLogger;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.oneOf;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class DesiredBalanceReconcilerTests extends ESAllocationTestCase {
@ -1083,8 +1089,7 @@ public class DesiredBalanceReconcilerTests extends ESAllocationTestCase {
new ConcurrentRebalanceAllocationDecider(clusterSettings),
new ThrottlingAllocationDecider(clusterSettings) };
var reconciler = new DesiredBalanceReconciler(clusterSettings, mock(ThreadPool.class));
var totalOutgoingMoves = new HashMap<String, AtomicInteger>();
for (int i = 0; i < numberOfNodes; i++) {
@ -1097,7 +1102,7 @@ public class DesiredBalanceReconcilerTests extends ESAllocationTestCase {
while (true) {
var allocation = createRoutingAllocationFrom(clusterState, deciders);
reconciler.reconcile(balance, allocation);
var initializing = shardsWithState(allocation.routingNodes(), ShardRoutingState.INITIALIZING);
if (initializing.isEmpty()) {
@ -1124,8 +1129,52 @@ public class DesiredBalanceReconcilerTests extends ESAllocationTestCase {
}
}
public void testShouldLogOnTooManyUndesiredAllocations() {
var indexMetadata = IndexMetadata.builder("index-1").settings(indexSettings(Version.CURRENT, 1, 0)).build();
final var index = indexMetadata.getIndex();
final var shardId = new ShardId(index, 0);
final var clusterState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(DiscoveryNodes.builder().add(newNode("data-node-1")).add(newNode("data-node-2")))
.metadata(Metadata.builder().put(indexMetadata, true))
.routingTable(
RoutingTable.builder()
.add(IndexRoutingTable.builder(index).addShard(newShardRouting(shardId, "data-node-2", true, STARTED)))
)
.build();
final var balance = new DesiredBalance(1, Map.of(shardId, new ShardAssignment(Set.of("data-node-1"), 1, 0, 0)));
var threadPool = mock(ThreadPool.class);
when(threadPool.relativeTimeInMillis()).thenReturn(1L).thenReturn(2L);
var reconciler = new DesiredBalanceReconciler(createBuiltInClusterSettings(), threadPool);
assertThatLogger(
() -> reconciler.reconcile(balance, createRoutingAllocationFrom(clusterState)),
DesiredBalanceReconciler.class,
new MockLogAppender.SeenEventExpectation(
"Should log first too many shards on undesired locations",
DesiredBalanceReconciler.class.getCanonicalName(),
Level.WARN,
"[100.0%] of assigned shards (1/1) are not on their desired nodes, which exceeds the warn threshold of [10.0%]"
)
);
assertThatLogger(
() -> reconciler.reconcile(balance, createRoutingAllocationFrom(clusterState)),
DesiredBalanceReconciler.class,
new MockLogAppender.UnseenEventExpectation(
"Should not log immediate second too many shards on undesired locations",
DesiredBalanceReconciler.class.getCanonicalName(),
Level.WARN,
"[100.0%] of assigned shards (1/1) are not on their desired nodes, which exceeds the warn threshold of [10.0%]"
)
);
}
private static void reconcile(RoutingAllocation routingAllocation, DesiredBalance desiredBalance) {
new DesiredBalanceReconciler(createBuiltInClusterSettings(), mock(ThreadPool.class)).reconcile(desiredBalance, routingAllocation);
}
private static boolean isReconciled(RoutingNode node, DesiredBalance balance) {

View file

@ -332,11 +332,12 @@ public class DesiredBalanceShardsAllocatorTests extends ESAllocationTestCase {
var gatewayAllocator = createGatewayAllocator();
var shardsAllocator = createShardsAllocator();
var clusterSettings = createBuiltInClusterSettings();
var desiredBalanceShardsAllocator = new DesiredBalanceShardsAllocator(
shardsAllocator,
threadPool,
clusterService,
new DesiredBalanceComputer(clusterSettings, threadPool, shardsAllocator) {
@Override
public DesiredBalance compute(
DesiredBalance previousDesiredBalance,
@ -433,11 +434,12 @@ public class DesiredBalanceShardsAllocatorTests extends ESAllocationTestCase {
var gatewayAllocator = createGatewayAllocator();
var shardsAllocator = createShardsAllocator();
var clusterSettings = createBuiltInClusterSettings();
var desiredBalanceShardsAllocator = new DesiredBalanceShardsAllocator(
shardsAllocator,
threadPool,
clusterService,
new DesiredBalanceComputer(clusterSettings, threadPool, shardsAllocator) {
@Override
public DesiredBalance compute(
DesiredBalance previousDesiredBalance,
@ -520,10 +522,10 @@ public class DesiredBalanceShardsAllocatorTests extends ESAllocationTestCase {
var threadPool = new TestThreadPool(getTestName());
var clusterService = ClusterServiceUtils.createClusterService(clusterState, threadPool);
var delegateAllocator = createShardsAllocator();
var clusterSettings = createBuiltInClusterSettings();
var desiredBalanceComputer = new DesiredBalanceComputer(clusterSettings, threadPool, delegateAllocator) {
final AtomicReference<DesiredBalance> lastComputationInput = new AtomicReference<>();

View file

@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.cluster.routing.allocation.allocator;
import org.elasticsearch.test.ESTestCase;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.core.TimeValue.timeValueMillis;
import static org.hamcrest.Matchers.equalTo;
public class FrequencyCappedActionTests extends ESTestCase {
public void testFrequencyCapExecution() {
var executions = new AtomicLong(0);
var currentTime = new AtomicLong();
var action = new FrequencyCappedAction(currentTime::get);
var minInterval = timeValueMillis(randomNonNegativeInt());
action.setMinInterval(minInterval);
// initial execution should happen
action.maybeExecute(executions::incrementAndGet);
assertThat(executions.get(), equalTo(1L));
// should not execute again too soon
currentTime.set(randomLongBetween(0, minInterval.millis() - 1));
action.maybeExecute(executions::incrementAndGet);
assertThat(executions.get(), equalTo(1L));
// should execute again once the min interval has elapsed
currentTime.set(randomLongBetween(minInterval.millis(), Long.MAX_VALUE));
action.maybeExecute(executions::incrementAndGet);
assertThat(executions.get(), equalTo(2L));
}
}

View file

@ -13,10 +13,13 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import static org.hamcrest.Matchers.containsInAnyOrder;
public class IngestStatsTests extends ESTestCase {
public void testSerialization() throws IOException {
@ -28,6 +31,110 @@ public class IngestStatsTests extends ESTestCase {
assertIngestStats(ingestStats, serializedStats);
}
public void testStatsMerge() {
var first = randomStats();
var second = randomStats();
assertEquals(
new IngestStats.Stats(
first.ingestCount() + second.ingestCount(),
first.ingestTimeInMillis() + second.ingestTimeInMillis(),
first.ingestCurrent() + second.ingestCurrent(),
first.ingestFailedCount() + second.ingestFailedCount()
),
IngestStats.Stats.merge(first, second)
);
}
public void testPipelineStatsMerge() {
var first = List.of(
randomPipelineStat("pipeline-1"),
randomPipelineStat("pipeline-1"),
randomPipelineStat("pipeline-2"),
randomPipelineStat("pipeline-3"),
randomPipelineStat("pipeline-5")
);
var second = List.of(
randomPipelineStat("pipeline-2"),
randomPipelineStat("pipeline-1"),
randomPipelineStat("pipeline-4"),
randomPipelineStat("pipeline-3")
);
assertThat(
IngestStats.PipelineStat.merge(first, second),
containsInAnyOrder(
new IngestStats.PipelineStat("pipeline-1", merge(first.get(0).stats(), first.get(1).stats(), second.get(1).stats())),
new IngestStats.PipelineStat("pipeline-2", merge(first.get(2).stats(), second.get(0).stats())),
new IngestStats.PipelineStat("pipeline-3", merge(first.get(3).stats(), second.get(3).stats())),
new IngestStats.PipelineStat("pipeline-4", second.get(2).stats()),
new IngestStats.PipelineStat("pipeline-5", first.get(4).stats())
)
);
}
public void testProcessorStatsMerge() {
{
var first = Map.of("pipeline-1", randomPipelineProcessorStats());
assertEquals(IngestStats.merge(Map.of(), first), first);
assertEquals(IngestStats.merge(first, Map.of()), first);
}
{
var first = Map.of(
"pipeline-1",
randomPipelineProcessorStats(),
"pipeline-2",
randomPipelineProcessorStats(),
"pipeline-3",
randomPipelineProcessorStats()
);
var second = Map.of(
"pipeline-2",
randomPipelineProcessorStats(),
"pipeline-3",
randomPipelineProcessorStats(),
"pipeline-1",
randomPipelineProcessorStats()
);
assertEquals(
IngestStats.merge(first, second),
Map.of(
"pipeline-1",
expectedPipelineProcessorStats(first.get("pipeline-1"), second.get("pipeline-1")),
"pipeline-2",
expectedPipelineProcessorStats(first.get("pipeline-2"), second.get("pipeline-2")),
"pipeline-3",
expectedPipelineProcessorStats(first.get("pipeline-3"), second.get("pipeline-3"))
)
);
}
}
private static List<IngestStats.ProcessorStat> expectedPipelineProcessorStats(
List<IngestStats.ProcessorStat> first,
List<IngestStats.ProcessorStat> second
) {
return List.of(
new IngestStats.ProcessorStat("proc-1", "type-1", merge(first.get(0).stats(), second.get(0).stats())),
new IngestStats.ProcessorStat("proc-1", "type-2", merge(first.get(1).stats(), second.get(1).stats())),
new IngestStats.ProcessorStat("proc-2", "type-1", merge(first.get(2).stats(), second.get(2).stats())),
new IngestStats.ProcessorStat("proc-3", "type-4", merge(first.get(3).stats(), second.get(3).stats()))
);
}
private static List<IngestStats.ProcessorStat> randomPipelineProcessorStats() {
return List.of(
randomProcessorStat("proc-1", "type-1"),
randomProcessorStat("proc-1", "type-2"),
randomProcessorStat("proc-2", "type-1"),
randomProcessorStat("proc-3", "type-4")
);
}
private static IngestStats.Stats merge(IngestStats.Stats... stats) {
return Arrays.stream(stats).reduce(IngestStats.Stats.IDENTITY, IngestStats.Stats::merge);
}
private static List<IngestStats.PipelineStat> createPipelineStats() {
IngestStats.PipelineStat pipeline1Stats = new IngestStats.PipelineStat("pipeline1", new IngestStats.Stats(3, 3, 3, 3));
IngestStats.PipelineStat pipeline2Stats = new IngestStats.PipelineStat("pipeline2", new IngestStats.Stats(47, 97, 197, 297));
@ -98,4 +205,16 @@ public class IngestStatsTests extends ESTestCase {
.map(IngestStats.PipelineStat::stats)
.orElse(null);
}
private static IngestStats.ProcessorStat randomProcessorStat(String name, String type) {
return new IngestStats.ProcessorStat(name, type, randomStats());
}
private static IngestStats.PipelineStat randomPipelineStat(String id) {
return new IngestStats.PipelineStat(id, randomStats());
}
private static IngestStats.Stats randomStats() {
return new IngestStats.Stats(randomLong(), randomLong(), randomLong(), randomLong());
}
}

View file

@ -33,7 +33,6 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
@ -42,6 +41,7 @@ import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
import org.elasticsearch.snapshots.SnapshotsInfoService;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.gateway.TestGatewayAllocator;
@ -59,7 +59,6 @@ import static org.elasticsearch.cluster.ClusterModule.DESIRED_BALANCE_ALLOCATOR;
import static org.elasticsearch.cluster.ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.common.settings.ClusterSettings.createBuiltInClusterSettings;
public abstract class ESAllocationTestCase extends ESTestCase {
@ -154,11 +153,13 @@ public abstract class ESAllocationTestCase extends ESTestCase {
private static DesiredBalanceShardsAllocator createDesiredBalanceShardsAllocator(Settings settings) {
var queue = new DeterministicTaskQueue();
var clusterSettings = createBuiltInClusterSettings(settings);
var clusterService = ClusterServiceUtils.createClusterService(queue.getThreadPool(), clusterSettings);
return new DesiredBalanceShardsAllocator(
clusterSettings,
new BalancedShardsAllocator(settings),
queue.getThreadPool(),
clusterService,
null
) {
private RoutingAllocation lastAllocation;

View file

@ -261,4 +261,16 @@ public class MockLogAppender extends AbstractAppender {
}
};
}
/**
* Executes an action and verifies expectations against the provided logger
*/
public static void assertThatLogger(Runnable action, Class<?> loggerOwner, MockLogAppender.AbstractEventExpectation expectation) {
MockLogAppender mockAppender = new MockLogAppender();
try (var ignored = mockAppender.capturing(loggerOwner)) {
mockAppender.addExpectation(expectation);
action.run();
mockAppender.assertAllExpectationsMatched();
}
}
}
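A minimal sketch of how a test might use the new helper; NoisyComponent is a hypothetical class invented for this example, not part of the change:

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.test.MockLogAppender;

// Hypothetical component that emits a single warning when invoked.
class NoisyComponent {
    private static final Logger logger = LogManager.getLogger(NoisyComponent.class);

    void doNoisyThing() {
        logger.warn("something noisy happened");
    }
}

public class AssertThatLoggerExample {
    public static void main(String[] args) {
        MockLogAppender.assertThatLogger(
            () -> new NoisyComponent().doNoisyThing(),
            NoisyComponent.class,
            new MockLogAppender.SeenEventExpectation(
                "warning is emitted",
                NoisyComponent.class.getCanonicalName(),
                Level.WARN,
                "something noisy happened"
            )
        );
    }
}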

View file

@ -303,6 +303,12 @@ All {Ilm} operations relating to managing the execution of policies of an index
or data stream. This includes operations such as retrying policies and removing
a policy from an index or data stream.
ifeval::["{release-state}"!="released"]
`manage_dlm`::
All {Dlm} operations relating to reading and managing the lifecycle of a data stream.
This includes operations such as adding and removing a lifecycle from a data stream.
endif::[]
`manage_leader_index`::
All actions that are required to manage the lifecycle of a leader index, which
includes <<ccr-post-forget-follower,forgetting a follower>>. This

View file

@ -38,7 +38,7 @@ public class PutRoleRequestBuilder extends ActionRequestBuilder<PutRoleRequest,
public PutRoleRequestBuilder source(String name, BytesReference source, XContentType xContentType) throws IOException {
// we pass false as last parameter because we want to reject the request if field permissions
// are given in 2.x syntax
RoleDescriptor descriptor = RoleDescriptor.parse(name, source, false, xContentType, false);
assert name.equals(descriptor.getName());
request.name(name);
request.cluster(descriptor.getClusterPrivileges());

View file

@ -426,6 +426,16 @@ public class RoleDescriptor implements ToXContentObject, Writeable {
public static RoleDescriptor parse(String name, BytesReference source, boolean allow2xFormat, XContentType xContentType)
throws IOException {
return parse(name, source, allow2xFormat, xContentType, true);
}
public static RoleDescriptor parse(
String name,
BytesReference source,
boolean allow2xFormat,
XContentType xContentType,
boolean allowRestriction
) throws IOException {
assert name != null;
// EMPTY is safe here because we never use namedObject
try (
@ -433,16 +443,26 @@ public class RoleDescriptor implements ToXContentObject, Writeable {
XContentParser parser = xContentType.xContent()
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)
) {
return parse(name, parser, allow2xFormat, allowRestriction);
}
}
public static RoleDescriptor parse(String name, XContentParser parser, boolean allow2xFormat) throws IOException {
return parse(name, parser, allow2xFormat, TcpTransport.isUntrustedRemoteClusterEnabled(), true);
}
public static RoleDescriptor parse(String name, XContentParser parser, boolean allow2xFormat, boolean allowRestriction)
throws IOException {
return parse(name, parser, allow2xFormat, TcpTransport.isUntrustedRemoteClusterEnabled(), allowRestriction);
}
static RoleDescriptor parse(
String name,
XContentParser parser,
boolean allow2xFormat,
boolean untrustedRemoteClusterEnabled,
boolean allowRestriction
) throws IOException {
// validate name
Validation.Error validationError = Validation.Roles.validateRoleName(name, true);
if (validationError != null) {
@ -503,7 +523,7 @@ public class RoleDescriptor implements ToXContentObject, Writeable {
} else if (untrustedRemoteClusterEnabled
&& Fields.REMOTE_INDICES.match(currentFieldName, parser.getDeprecationHandler())) {
remoteIndicesPrivileges = parseRemoteIndices(name, parser);
} else if (allowRestriction && Fields.RESTRICTION.match(currentFieldName, parser.getDeprecationHandler())) {
restriction = Restriction.parse(name, parser);
} else if (Fields.TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
// don't need it

View file

@ -30,6 +30,7 @@ import org.elasticsearch.action.datastreams.GetDataStreamAction;
import org.elasticsearch.action.datastreams.PromoteDataStreamAction;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction;
import org.elasticsearch.action.search.SearchShardsAction;
import org.elasticsearch.cluster.metadata.DataLifecycle;
import org.elasticsearch.common.Strings;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
import org.elasticsearch.transport.TcpTransport;
@ -118,6 +119,8 @@ public final class IndexPrivilege extends Privilege {
ValidateQueryAction.NAME + "*",
GetSettingsAction.NAME,
ExplainLifecycleAction.NAME,
"indices:admin/dlm/get",
"indices:admin/dlm/explain",
GetDataStreamAction.NAME,
ResolveIndexAction.NAME,
FieldCapabilitiesAction.NAME + "*",
@ -133,6 +136,7 @@ public final class IndexPrivilege extends Privilege {
);
private static final Automaton MANAGE_LEADER_INDEX_AUTOMATON = patterns(ForgetFollowerAction.NAME + "*");
private static final Automaton MANAGE_ILM_AUTOMATON = patterns("indices:admin/ilm/*");
private static final Automaton MANAGE_DLM_AUTOMATON = patterns("indices:admin/dlm/*");
private static final Automaton MAINTENANCE_AUTOMATON = patterns(
"indices:admin/refresh*",
"indices:admin/flush*",
@ -173,6 +177,7 @@ public final class IndexPrivilege extends Privilege {
public static final IndexPrivilege MANAGE_FOLLOW_INDEX = new IndexPrivilege("manage_follow_index", MANAGE_FOLLOW_INDEX_AUTOMATON);
public static final IndexPrivilege MANAGE_LEADER_INDEX = new IndexPrivilege("manage_leader_index", MANAGE_LEADER_INDEX_AUTOMATON);
public static final IndexPrivilege MANAGE_ILM = new IndexPrivilege("manage_ilm", MANAGE_ILM_AUTOMATON);
public static final IndexPrivilege MANAGE_DLM = new IndexPrivilege("manage_dlm", MANAGE_DLM_AUTOMATON);
public static final IndexPrivilege MAINTENANCE = new IndexPrivilege("maintenance", MAINTENANCE_AUTOMATON);
public static final IndexPrivilege AUTO_CONFIGURE = new IndexPrivilege("auto_configure", AUTO_CONFIGURE_AUTOMATON);
public static final IndexPrivilege CROSS_CLUSTER_REPLICATION = new IndexPrivilege(
@ -204,6 +209,7 @@ public final class IndexPrivilege extends Privilege {
entry("manage_follow_index", MANAGE_FOLLOW_INDEX),
entry("manage_leader_index", MANAGE_LEADER_INDEX),
entry("manage_ilm", MANAGE_ILM),
DataLifecycle.isEnabled() ? entry("manage_dlm", MANAGE_DLM) : null,
entry("maintenance", MAINTENANCE),
entry("auto_configure", AUTO_CONFIGURE),
TcpTransport.isUntrustedRemoteClusterEnabled() ? entry("cross_cluster_replication", CROSS_CLUSTER_REPLICATION) : null,

View file

@ -7,7 +7,10 @@
package org.elasticsearch.xpack.core.security.user;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
import org.elasticsearch.xpack.core.security.support.MetadataUtils;
@ -122,6 +125,48 @@ public class InternalUsers {
)
);
/**
* Internal user that manages DLM. Has all indices permissions to perform DLM runtime tasks.
*/
public static final InternalUser DLM_USER = new InternalUser(
UsernamesField.DLM_NAME,
new RoleDescriptor(
UsernamesField.DLM_ROLE,
new String[] {},
new RoleDescriptor.IndicesPrivileges[] {
RoleDescriptor.IndicesPrivileges.builder()
.indices("*")
.privileges(
"delete_index",
RolloverAction.NAME,
ForceMergeAction.NAME + "*",
// indices stats is used by rollover, so we need to grant it here
IndicesStatsAction.NAME + "*"
)
.allowRestrictedIndices(false)
.build(),
RoleDescriptor.IndicesPrivileges.builder()
.indices(
// System data stream for result history of fleet actions (see Fleet#fleetActionsResultsDescriptor)
".fleet-actions-results"
)
.privileges(
"delete_index",
RolloverAction.NAME,
ForceMergeAction.NAME + "*",
// indices stats is used by rollover, so we need to grant it here
IndicesStatsAction.NAME + "*"
)
.allowRestrictedIndices(true)
.build() },
null,
null,
new String[] {},
MetadataUtils.DEFAULT_RESERVED_METADATA,
Map.of()
)
);
public static final SystemUser SYSTEM_USER = SystemUser.INSTANCE;
public static final InternalUser CROSS_CLUSTER_ACCESS_USER = CrossClusterAccessUser.INSTANCE;
@ -135,7 +180,8 @@ public class InternalUsers {
SECURITY_PROFILE_USER,
ASYNC_SEARCH_USER,
CROSS_CLUSTER_ACCESS_USER,
STORAGE_USER,
DLM_USER
).collect(Collectors.toUnmodifiableMap(InternalUser::principal, Function.identity()));
} }

View file

@ -16,6 +16,8 @@ public final class UsernamesField {
public static final String SYSTEM_ROLE = "_system";
public static final String XPACK_SECURITY_NAME = "_xpack_security";
public static final String XPACK_SECURITY_ROLE = "_xpack_security";
public static final String DLM_NAME = "_dlm";
public static final String DLM_ROLE = "_dlm";
public static final String CROSS_CLUSTER_ACCESS_NAME = "_cross_cluster_access";
public static final String CROSS_CLUSTER_ACCESS_ROLE = "_cross_cluster_access";
public static final String SECURITY_PROFILE_NAME = "_security_profile";

View file

@ -28,7 +28,7 @@
"type": "date", "type": "date",
"format": "epoch_second" "format": "epoch_second"
}, },
"Symbolization.lastprocessed": { "Symbolization.next_time": {
"type": "date", "type": "date",
"format": "epoch_second", "format": "epoch_second",
"index": false "index": false

View file

@ -9,6 +9,13 @@
} }
}, },
"mappings": { "mappings": {
/*
For the inline chain, the profiling-stackframes index needs '_source' to be enabled.
doc_values for the fields below are also disabled so that the values are not stored twice.
Using synthetic source would reduce storage size by ~50%, but it requires "concatenation"
of arrays and adds latency when _source is reconstructed at query time.
That added latency is why we don't use synthetic source right now.
*/
"_source": { "_source": {
"enabled": true "enabled": true
}, },
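For illustration, a minimal mapping with the shape that comment describes, with `_source` kept enabled and `doc_values` switched off on fields whose values would otherwise be stored twice. The field names are hypothetical and this is not the actual profiling-stackframes template:

// Illustrative only; field names are hypothetical, not the real profiling-stackframes mapping.
final class StackframesMappingSketch {
    static final String MAPPING = """
        {
          "_source": { "enabled": true },
          "properties": {
            "Stackframe.function.name": { "type": "keyword", "index": false, "doc_values": false },
            "Stackframe.line.number":   { "type": "integer", "index": false, "doc_values": false }
          }
        }
        """;
}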

View file

@ -240,7 +240,8 @@ public class AuthenticationTestHelper {
UsernamesField.ASYNC_SEARCH_ROLE, UsernamesField.ASYNC_SEARCH_ROLE,
UsernamesField.XPACK_SECURITY_ROLE, UsernamesField.XPACK_SECURITY_ROLE,
UsernamesField.SECURITY_PROFILE_ROLE, UsernamesField.SECURITY_PROFILE_ROLE,
UsernamesField.CROSS_CLUSTER_ACCESS_ROLE UsernamesField.CROSS_CLUSTER_ACCESS_ROLE,
UsernamesField.DLM_ROLE
); );
} }

View file

@ -594,6 +594,51 @@ public class RoleDescriptorTests extends ESTestCase {
} }
} }
public void testParseRoleWithRestrictionFailsWhenAllowRestrictionIsFalse() {
final String json = """
{
"restriction": {
"workflows": ["search_application"]
}
}""";
final ElasticsearchParseException e = expectThrows(
ElasticsearchParseException.class,
() -> RoleDescriptor.parse(
"test_role_with_restriction",
XContentHelper.createParser(XContentParserConfiguration.EMPTY, new BytesArray(json), XContentType.JSON),
randomBoolean(),
randomBoolean(),
false
)
);
assertThat(
e,
TestMatchers.throwableWithMessage(
containsString("failed to parse role [test_role_with_restriction]. unexpected field [restriction]")
)
);
}
public void testParseRoleWithRestrictionWhenAllowRestrictionIsTrue() throws IOException {
final String json = """
{
"restriction": {
"workflows": ["search_application"]
}
}""";
RoleDescriptor role = RoleDescriptor.parse(
"test_role_with_restriction",
XContentHelper.createParser(XContentParserConfiguration.EMPTY, new BytesArray(json), XContentType.JSON),
randomBoolean(),
randomBoolean(),
true
);
assertThat(role.getName(), equalTo("test_role_with_restriction"));
assertThat(role.hasRestriction(), equalTo(true));
assertThat(role.hasWorkflowsRestriction(), equalTo(true));
assertThat(role.getRestriction().getWorkflows(), arrayContaining("search_application"));
}
public void testParseEmptyQuery() throws Exception { public void testParseEmptyQuery() throws Exception {
String json = """ String json = """
{ {
@ -773,6 +818,7 @@ public class RoleDescriptorTests extends ESTestCase {
"test", "test",
XContentHelper.createParser(XContentParserConfiguration.EMPTY, new BytesArray(json), XContentType.JSON), XContentHelper.createParser(XContentParserConfiguration.EMPTY, new BytesArray(json), XContentType.JSON),
false, false,
false,
false false
) )
); );
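The two restriction tests above exercise the new boolean flag on `RoleDescriptor.parse`: when it is false, a role body containing `restriction` is rejected with an "unexpected field" error, and when it is true the workflows restriction is parsed. A hedged, self-contained sketch of that gating pattern (not the real parser, just the control flow it implies):

import java.util.Map;
import java.util.Set;

// Sketch only: an "allowRestriction" flag decides whether the "restriction" field is legal
// in a role definition; otherwise it is reported as an unexpected field.
final class RoleFieldGateSketch {
    private static final Set<String> ALWAYS_ALLOWED = Set.of("cluster", "indices", "applications", "run_as", "metadata");

    static void validate(String roleName, Map<String, Object> roleJson, boolean allowRestriction) {
        for (String field : roleJson.keySet()) {
            boolean permitted = ALWAYS_ALLOWED.contains(field) || (allowRestriction && "restriction".equals(field));
            if (permitted == false) {
                throw new IllegalArgumentException("failed to parse role [" + roleName + "]. unexpected field [" + field + "]");
            }
        }
    }
}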

View file

@ -15,6 +15,7 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsAction; import org.elasticsearch.action.admin.cluster.stats.ClusterStatsAction;
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesAction; import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesAction;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateAction; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateAction;
import org.elasticsearch.cluster.metadata.DataLifecycle;
import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TcpTransport;
@ -465,6 +466,35 @@ public class PrivilegeTests extends ESTestCase {
} }
} }
public void testDlmPrivileges() {
assumeTrue("feature flag required", DataLifecycle.isEnabled());
{
Predicate<String> predicate = IndexPrivilege.MANAGE_DLM.predicate();
// check indices actions
assertThat(predicate.test("indices:admin/dlm/explain"), is(true));
assertThat(predicate.test("indices:admin/dlm/get"), is(true));
assertThat(predicate.test("indices:admin/dlm/delete"), is(true));
assertThat(predicate.test("indices:admin/dlm/put"), is(true));
assertThat(predicate.test("indices:admin/dlm/brand_new_api"), is(true));
assertThat(predicate.test("indices:admin/dlm/brand_new_api"), is(true));
// check non-dlm action
assertThat(predicate.test("indices:admin/whatever"), is(false));
}
{
Predicate<String> predicate = IndexPrivilege.VIEW_METADATA.predicate();
// check indices actions
assertThat(predicate.test("indices:admin/dlm/explain"), is(true));
assertThat(predicate.test("indices:admin/dlm/get"), is(true));
assertThat(predicate.test("indices:admin/dlm/delete"), is(false));
assertThat(predicate.test("indices:admin/dlm/put"), is(false));
assertThat(predicate.test("indices:admin/dlm/brand_new_api"), is(false));
assertThat(predicate.test("indices:admin/dlm/brand_new_api"), is(false));
// check non-dlm action
assertThat(predicate.test("indices:admin/whatever"), is(false));
}
}
public void testIngestPipelinePrivileges() { public void testIngestPipelinePrivileges() {
{ {
verifyClusterActionAllowed( verifyClusterActionAllowed(

View file

@ -9,11 +9,13 @@
package org.elasticsearch.xpack.core.security.test; package org.elasticsearch.xpack.core.security.test;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.indices.ExecutorNames; import org.elasticsearch.indices.ExecutorNames;
import org.elasticsearch.indices.SystemDataStreamDescriptor;
import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.indices.SystemIndices.Feature; import org.elasticsearch.indices.SystemIndices.Feature;
@ -27,6 +29,7 @@ import java.io.UncheckedIOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME; import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
@ -100,6 +103,25 @@ public class TestRestrictedIndices {
new SystemIndexDescriptor(".fleet-policies-leader*", "fleet policies leader"), new SystemIndexDescriptor(".fleet-policies-leader*", "fleet policies leader"),
new SystemIndexDescriptor(".fleet-servers*", "fleet servers"), new SystemIndexDescriptor(".fleet-servers*", "fleet servers"),
new SystemIndexDescriptor(".fleet-artifacts*", "fleet artifacts") new SystemIndexDescriptor(".fleet-artifacts*", "fleet artifacts")
),
List.of(
new SystemDataStreamDescriptor(
".fleet-actions-results",
"fleet actions results",
SystemDataStreamDescriptor.Type.EXTERNAL,
new ComposableIndexTemplate(
List.of(".fleet-actions-results"),
null,
null,
null,
null,
null,
new ComposableIndexTemplate.DataStreamTemplate()
),
Map.of(),
List.of("fleet", "kibana"),
null
)
) )
) )
); );

View file

@ -17,12 +17,16 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptAction; import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexAction; import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction; import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction; import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.TransportUnpromotableShardRefreshAction; import org.elasticsearch.action.admin.indices.refresh.TransportUnpromotableShardRefreshAction;
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.admin.indices.template.put.PutComponentTemplateAction; import org.elasticsearch.action.admin.indices.template.put.PutComponentTemplateAction;
import org.elasticsearch.action.bulk.BulkAction; import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.get.GetAction; import org.elasticsearch.action.get.GetAction;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -33,11 +37,13 @@ import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.core.security.authc.Authentication; import org.elasticsearch.xpack.core.security.authc.Authentication;
import org.elasticsearch.xpack.core.security.authc.AuthenticationTestHelper; import org.elasticsearch.xpack.core.security.authc.AuthenticationTestHelper;
import org.elasticsearch.xpack.core.security.authz.permission.ApplicationPermission; import org.elasticsearch.xpack.core.security.authz.permission.ApplicationPermission;
import org.elasticsearch.xpack.core.security.authz.permission.ClusterPermission;
import org.elasticsearch.xpack.core.security.authz.permission.FieldPermissionsCache; import org.elasticsearch.xpack.core.security.authz.permission.FieldPermissionsCache;
import org.elasticsearch.xpack.core.security.authz.permission.RemoteIndicesPermission; import org.elasticsearch.xpack.core.security.authz.permission.RemoteIndicesPermission;
import org.elasticsearch.xpack.core.security.authz.permission.Role; import org.elasticsearch.xpack.core.security.authz.permission.Role;
import org.elasticsearch.xpack.core.security.authz.permission.RunAsPermission; import org.elasticsearch.xpack.core.security.authz.permission.RunAsPermission;
import org.elasticsearch.xpack.core.security.authz.permission.SimpleRole; import org.elasticsearch.xpack.core.security.authz.permission.SimpleRole;
import org.elasticsearch.xpack.core.security.support.MetadataUtils;
import org.elasticsearch.xpack.core.security.test.TestRestrictedIndices; import org.elasticsearch.xpack.core.security.test.TestRestrictedIndices;
import java.util.List; import java.util.List;
@ -46,6 +52,8 @@ import static org.elasticsearch.xpack.core.security.test.TestRestrictedIndices.I
import static org.elasticsearch.xpack.core.security.test.TestRestrictedIndices.INTERNAL_SECURITY_TOKENS_INDEX_7; import static org.elasticsearch.xpack.core.security.test.TestRestrictedIndices.INTERNAL_SECURITY_TOKENS_INDEX_7;
import static org.elasticsearch.xpack.core.security.test.TestRestrictedIndices.SECURITY_MAIN_ALIAS; import static org.elasticsearch.xpack.core.security.test.TestRestrictedIndices.SECURITY_MAIN_ALIAS;
import static org.elasticsearch.xpack.core.security.test.TestRestrictedIndices.SECURITY_TOKENS_ALIAS; import static org.elasticsearch.xpack.core.security.test.TestRestrictedIndices.SECURITY_TOKENS_ALIAS;
import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -210,6 +218,54 @@ public class InternalUsersTests extends ESTestCase {
checkIndexAccess(role, randomFrom(sampleDeniedActions), INTERNAL_SECURITY_MAIN_INDEX_7, false); checkIndexAccess(role, randomFrom(sampleDeniedActions), INTERNAL_SECURITY_MAIN_INDEX_7, false);
} }
public void testDlmUser() {
assertThat(InternalUsers.getUser("_dlm"), is(InternalUsers.DLM_USER));
assertThat(
InternalUsers.DLM_USER.getLocalClusterRoleDescriptor().get().getMetadata(),
equalTo(MetadataUtils.DEFAULT_RESERVED_METADATA)
);
final SimpleRole role = getLocalClusterRole(InternalUsers.DLM_USER);
assertThat(role.cluster(), is(ClusterPermission.NONE));
assertThat(role.runAs(), is(RunAsPermission.NONE));
assertThat(role.application(), is(ApplicationPermission.NONE));
assertThat(role.remoteIndices(), is(RemoteIndicesPermission.NONE));
final String allowedSystemDataStream = ".fleet-actions-results";
for (var group : role.indices().groups()) {
if (group.allowRestrictedIndices()) {
assertThat(group.indices(), arrayContaining(allowedSystemDataStream));
}
}
final List<String> sampleIndexActions = List.of(
RolloverAction.NAME,
DeleteIndexAction.NAME,
ForceMergeAction.NAME,
IndicesStatsAction.NAME
);
final String dataStream = randomAlphaOfLengthBetween(3, 12);
checkIndexAccess(role, randomFrom(sampleIndexActions), dataStream, true);
// Also check backing index access
checkIndexAccess(
role,
randomFrom(sampleIndexActions),
DataStream.BACKING_INDEX_PREFIX + dataStream + randomAlphaOfLengthBetween(4, 8),
true
);
checkIndexAccess(role, randomFrom(sampleIndexActions), allowedSystemDataStream, true);
checkIndexAccess(
role,
randomFrom(sampleIndexActions),
DataStream.BACKING_INDEX_PREFIX + allowedSystemDataStream + randomAlphaOfLengthBetween(4, 8),
true
);
checkIndexAccess(role, randomFrom(sampleIndexActions), randomFrom(TestRestrictedIndices.SAMPLE_RESTRICTED_NAMES), false);
}
public void testRegularUser() { public void testRegularUser() {
var username = randomAlphaOfLengthBetween(4, 12); var username = randomAlphaOfLengthBetween(4, 12);
expectThrows(IllegalStateException.class, () -> InternalUsers.getUser(username)); expectThrows(IllegalStateException.class, () -> InternalUsers.getUser(username));

View file

@ -41,6 +41,7 @@ public class MultiNodesStatsTests extends MonitoringIntegTestCase {
wipeMonitoringIndices(); wipeMonitoringIndices();
} }
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/96374")
public void testMultipleNodes() throws Exception { public void testMultipleNodes() throws Exception {
int nodes = 0; int nodes = 0;

View file

@ -26,6 +26,7 @@ dependencies {
testImplementation project(path: ':modules:analysis-common') testImplementation project(path: ':modules:analysis-common')
testImplementation project(path: ':modules:reindex') testImplementation project(path: ':modules:reindex')
testImplementation project(':modules:data-streams') testImplementation project(':modules:data-streams')
testImplementation project(':modules:dlm')
testImplementation project(':modules:rest-root') testImplementation project(':modules:rest-root')
testImplementation project(":client:rest-high-level") testImplementation project(":client:rest-high-level")

View file

@ -0,0 +1,78 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.security.role;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.xpack.security.SecurityOnTrialLicenseRestTestCase;
import java.io.IOException;
import static org.hamcrest.Matchers.containsString;
public class RoleWithWorkflowsRestrictionRestIT extends SecurityOnTrialLicenseRestTestCase {
public void testCreateRoleWithWorkflowsRestrictionFail() {
Request createRoleRequest = new Request(HttpPut.METHOD_NAME, "/_security/role/role_with_restriction");
createRoleRequest.setJsonEntity("""
{
"cluster": ["all"],
"indices": [
{
"names": ["index-a"],
"privileges": ["all"]
}
],
"restriction":{
"workflows": ["foo", "bar"]
}
}""");
ResponseException e = expectThrows(ResponseException.class, () -> adminClient().performRequest(createRoleRequest));
assertEquals(400, e.getResponse().getStatusLine().getStatusCode());
assertThat(e.getMessage(), containsString("failed to parse role [role_with_restriction]. unexpected field [restriction]"));
}
public void testUpdateRoleWithWorkflowsRestrictionFail() throws IOException {
Request createRoleRequest = new Request(HttpPut.METHOD_NAME, "/_security/role/my_role");
createRoleRequest.setJsonEntity("""
{
"cluster": ["all"],
"indices": [
{
"names": ["index-a"],
"privileges": ["all"]
}
]
}""");
Response createRoleResponse = adminClient().performRequest(createRoleRequest);
assertOK(createRoleResponse);
Request updateRoleRequest = new Request(HttpPost.METHOD_NAME, "/_security/role/my_role");
updateRoleRequest.setJsonEntity("""
{
"cluster": ["all"],
"indices": [
{
"names": ["index-*"],
"privileges": ["all"]
}
],
"restriction":{
"workflows": ["foo", "bar"]
}
}""");
ResponseException e = expectThrows(ResponseException.class, () -> adminClient().performRequest(updateRoleRequest));
assertEquals(400, e.getResponse().getStatusLine().getStatusCode());
assertThat(e.getMessage(), containsString("failed to parse role [my_role]. unexpected field [restriction]"));
}
}

View file

@ -0,0 +1,279 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.integration;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.datastreams.CreateDataStreamAction;
import org.elasticsearch.action.datastreams.GetDataStreamAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataLifecycle;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.datastreams.DataStreamsPlugin;
import org.elasticsearch.dlm.DataLifecycleErrorStore;
import org.elasticsearch.dlm.DataLifecyclePlugin;
import org.elasticsearch.dlm.DataLifecycleService;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.indices.ExecutorNames;
import org.elasticsearch.indices.SystemDataStreamDescriptor;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SystemIndexPlugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.SecurityIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.security.LocalStateSecurity;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;
import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD;
import static org.elasticsearch.xpack.security.support.SecuritySystemIndices.SECURITY_MAIN_ALIAS;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;
/**
* This test suite ensures that DLM runtime tasks work correctly with security enabled, i.e., that the internal user for DLM has all
* requisite privileges to orchestrate DLM.
*/
public class DataLifecycleServiceRuntimeSecurityIT extends SecurityIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(DataLifecyclePlugin.class, LocalStateSecurity.class, DataStreamsPlugin.class, SystemDataStreamTestPlugin.class);
}
@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings));
settings.put(DataLifecycleService.DLM_POLL_INTERVAL, "1s");
settings.put(DataLifecycle.CLUSTER_DLM_DEFAULT_ROLLOVER_SETTING.getKey(), "min_docs=1,max_docs=1");
return settings.build();
}
public void testRolloverLifecycleAndForceMergeAuthorized() throws Exception {
String dataStreamName = randomDataStreamName();
// empty lifecycle contains the default rollover
prepareDataStreamAndIndex(dataStreamName, new DataLifecycle());
assertBusy(() -> {
assertNoAuthzErrors();
List<Index> backingIndices = getDataStreamBackingIndices(dataStreamName);
assertThat(backingIndices.size(), equalTo(2));
String backingIndex = backingIndices.get(0).getName();
assertThat(backingIndex, backingIndexEqualTo(dataStreamName, 1));
String writeIndex = backingIndices.get(1).getName();
assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
});
// Index another doc to force another rollover and trigger an attempted force-merge. The force-merge may be a noop under
// the hood, but for authz purposes this doesn't matter; all that matters is that the force-merge API was called
indexDoc(dataStreamName);
assertBusy(() -> {
assertNoAuthzErrors();
List<Index> backingIndices = getDataStreamBackingIndices(dataStreamName);
assertThat(backingIndices.size(), equalTo(3));
});
}
public void testRolloverAndRetentionAuthorized() throws Exception {
String dataStreamName = randomDataStreamName();
prepareDataStreamAndIndex(dataStreamName, new DataLifecycle(TimeValue.timeValueMillis(0)));
assertBusy(() -> {
assertNoAuthzErrors();
List<Index> backingIndices = getDataStreamBackingIndices(dataStreamName);
assertThat(backingIndices.size(), equalTo(1));
// we expect the data stream to have only one backing index, the write one, with generation 2
// as generation 1 would've been deleted by DLM given the lifecycle configuration
String writeIndex = backingIndices.get(0).getName();
assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
});
}
public void testUnauthorized() throws Exception {
// this is an example of a system index that DLM does not have access to. DLM will therefore fail at runtime with an
// authz exception
prepareDataStreamAndIndex(SECURITY_MAIN_ALIAS, new DataLifecycle());
assertBusy(() -> {
Map<String, String> indicesAndErrors = collectErrorsFromStoreAsMap();
assertThat(indicesAndErrors, is(not(anEmptyMap())));
assertThat(
indicesAndErrors.values(),
hasItem(allOf(containsString("security_exception"), containsString("unauthorized for user [_dlm]")))
);
});
}
public void testRolloverAndRetentionWithSystemDataStreamAuthorized() throws Exception {
String dataStreamName = SystemDataStreamTestPlugin.SYSTEM_DATA_STREAM_NAME;
indexDoc(dataStreamName);
assertBusy(() -> {
assertNoAuthzErrors();
List<Index> backingIndices = getDataStreamBackingIndices(dataStreamName);
assertThat(backingIndices.size(), equalTo(1));
// we expect the data stream to have only one backing index, the write one, with generation 2
// as generation 1 would've been deleted by DLM given the lifecycle configuration
String writeIndex = backingIndices.get(0).getName();
assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
});
}
private static String randomDataStreamName() {
// lower-case since this is required for a valid data stream name
return randomAlphaOfLengthBetween(5, 10).toLowerCase(Locale.ROOT);
}
private Map<String, String> collectErrorsFromStoreAsMap() {
Iterable<DataLifecycleService> lifecycleServices = internalCluster().getInstances(DataLifecycleService.class);
Map<String, String> indicesAndErrors = new HashMap<>();
for (DataLifecycleService lifecycleService : lifecycleServices) {
DataLifecycleErrorStore errorStore = lifecycleService.getErrorStore();
List<String> allIndices = errorStore.getAllIndices();
for (var index : allIndices) {
indicesAndErrors.put(index, errorStore.getError(index));
}
}
return indicesAndErrors;
}
private void prepareDataStreamAndIndex(String dataStreamName, DataLifecycle lifecycle) throws IOException, InterruptedException,
ExecutionException {
putComposableIndexTemplate("id1", null, List.of(dataStreamName + "*"), null, null, lifecycle);
CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();
indexDoc(dataStreamName);
}
private List<Index> getDataStreamBackingIndices(String dataStreamName) {
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(new String[] { dataStreamName });
GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
.actionGet();
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
return getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
}
private void assertNoAuthzErrors() {
var indicesAndErrors = collectErrorsFromStoreAsMap();
for (var entry : indicesAndErrors.entrySet()) {
assertThat(
"unexpected authz error for index [" + entry.getKey() + "] with error message [" + entry.getValue() + "]",
entry.getValue(),
not(anyOf(containsString("security_exception"), containsString("unauthorized for user [_dlm]")))
);
}
}
private static void putComposableIndexTemplate(
String id,
@Nullable String mappings,
List<String> patterns,
@Nullable Settings settings,
@Nullable Map<String, Object> metadata,
@Nullable DataLifecycle lifecycle
) throws IOException {
PutComposableIndexTemplateAction.Request request = new PutComposableIndexTemplateAction.Request(id);
request.indexTemplate(
new ComposableIndexTemplate(
patterns,
new Template(settings, mappings == null ? null : CompressedXContent.fromJSON(mappings), null, lifecycle),
null,
null,
null,
metadata,
new ComposableIndexTemplate.DataStreamTemplate(),
null
)
);
client().execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet();
}
private static void indexDoc(String dataStream) {
BulkRequest bulkRequest = new BulkRequest();
String value = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis());
bulkRequest.add(
new IndexRequest(dataStream).opType(DocWriteRequest.OpType.CREATE)
.source(String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, value), XContentType.JSON)
);
BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
assertThat(bulkResponse.getItems().length, equalTo(1));
String backingIndexPrefix = DataStream.BACKING_INDEX_PREFIX + dataStream;
for (BulkItemResponse itemResponse : bulkResponse) {
assertThat(itemResponse.getFailureMessage(), nullValue());
assertThat(itemResponse.status(), equalTo(RestStatus.CREATED));
assertThat(itemResponse.getIndex(), startsWith(backingIndexPrefix));
}
client().admin().indices().refresh(new RefreshRequest(dataStream)).actionGet();
}
public static class SystemDataStreamTestPlugin extends Plugin implements SystemIndexPlugin {
static final String SYSTEM_DATA_STREAM_NAME = ".fleet-actions-results";
@Override
public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
return List.of(
new SystemDataStreamDescriptor(
SYSTEM_DATA_STREAM_NAME,
"a system data stream for testing",
SystemDataStreamDescriptor.Type.EXTERNAL,
new ComposableIndexTemplate(
List.of(SYSTEM_DATA_STREAM_NAME),
new Template(Settings.EMPTY, null, null, new DataLifecycle(0)),
null,
null,
null,
null,
new ComposableIndexTemplate.DataStreamTemplate()
),
Map.of(),
Collections.singletonList("test"),
new ExecutorNames(ThreadPool.Names.SYSTEM_CRITICAL_READ, ThreadPool.Names.SYSTEM_READ, ThreadPool.Names.SYSTEM_WRITE)
)
);
}
@Override
public String getFeatureName() {
return SystemDataStreamTestPlugin.class.getSimpleName();
}
@Override
public String getFeatureDescription() {
return "A plugin for testing DLM runtime actions on system data streams";
}
}
}

View file

@ -127,6 +127,9 @@ public final class AuthorizationUtils {
case POST_WRITE_REFRESH_ORIGIN: case POST_WRITE_REFRESH_ORIGIN:
securityContext.executeAsInternalUser(InternalUsers.STORAGE_USER, version, consumer); securityContext.executeAsInternalUser(InternalUsers.STORAGE_USER, version, consumer);
break; break;
case DLM_ORIGIN:
securityContext.executeAsInternalUser(InternalUsers.DLM_USER, version, consumer);
break;
case WATCHER_ORIGIN: case WATCHER_ORIGIN:
case ML_ORIGIN: case ML_ORIGIN:
case MONITORING_ORIGIN: case MONITORING_ORIGIN:
@ -135,7 +138,6 @@ public final class AuthorizationUtils {
case PERSISTENT_TASK_ORIGIN: case PERSISTENT_TASK_ORIGIN:
case ROLLUP_ORIGIN: case ROLLUP_ORIGIN:
case INDEX_LIFECYCLE_ORIGIN: case INDEX_LIFECYCLE_ORIGIN:
case DLM_ORIGIN:
case ENRICH_ORIGIN: case ENRICH_ORIGIN:
case IDP_ORIGIN: case IDP_ORIGIN:
case INGEST_ORIGIN: case INGEST_ORIGIN:
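The hunk above moves `DLM_ORIGIN` out of the fall-through group (which runs as the generic X-Pack internal user) and gives it its own branch that executes as the new `_dlm` internal user. A hedged sketch of that origin-to-user dispatch, with illustrative origin strings and user names rather than the real constants:

// Sketch only: route a thread-context origin to the internal user the child request should run as.
final class OriginDispatchSketch {
    enum InternalUser { XPACK, STORAGE, DLM, ASYNC_SEARCH }

    static InternalUser userFor(String origin) {
        return switch (origin) {
            case "post_write_refresh" -> InternalUser.STORAGE;
            case "dlm" -> InternalUser.DLM;                // new: DLM runs as its own internal user
            case "async_search" -> InternalUser.ASYNC_SEARCH;
            default -> InternalUser.XPACK;                 // watcher, ml, monitoring, ilm, enrich, ... fall through
        };
    }
}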

View file

@ -309,7 +309,7 @@ public class FileRolesStore implements BiConsumer<Set<String>, ActionListener<Ro
if (token == XContentParser.Token.START_OBJECT) { if (token == XContentParser.Token.START_OBJECT) {
// we pass true as last parameter because we do not want to reject files if field permissions // we pass true as last parameter because we do not want to reject files if field permissions
// are given in 2.x syntax // are given in 2.x syntax
RoleDescriptor descriptor = RoleDescriptor.parse(roleName, parser, true); RoleDescriptor descriptor = RoleDescriptor.parse(roleName, parser, true, false);
return checkDescriptor(descriptor, path, logger, settings, xContentRegistry); return checkDescriptor(descriptor, path, logger, settings, xContentRegistry);
} else { } else {
logger.error("invalid role definition [{}] in roles file [{}]. skipping role...", roleName, path.toAbsolutePath()); logger.error("invalid role definition [{}] in roles file [{}]. skipping role...", roleName, path.toAbsolutePath());

View file

@ -242,6 +242,7 @@ public class NativeRolesStore implements BiConsumer<Set<String>, ActionListener<
void innerPutRole(final PutRoleRequest request, final RoleDescriptor role, final ActionListener<Boolean> listener) { void innerPutRole(final PutRoleRequest request, final RoleDescriptor role, final ActionListener<Boolean> listener) {
final String roleName = role.getName(); final String roleName = role.getName();
assert NativeRealmValidationUtil.validateRoleName(roleName, false) == null : "Role name was invalid or reserved: " + roleName; assert NativeRealmValidationUtil.validateRoleName(roleName, false) == null : "Role name was invalid or reserved: " + roleName;
assert false == role.hasRestriction() : "restriction is not supported for native roles";
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> { securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> {
final XContentBuilder xContentBuilder; final XContentBuilder xContentBuilder;
@ -454,9 +455,9 @@ public class NativeRolesStore implements BiConsumer<Set<String>, ActionListener<
assert id.startsWith(ROLE_TYPE) : "[" + id + "] does not have role prefix"; assert id.startsWith(ROLE_TYPE) : "[" + id + "] does not have role prefix";
final String name = id.substring(ROLE_TYPE.length() + 1); final String name = id.substring(ROLE_TYPE.length() + 1);
try { try {
// we pass true as last parameter because we do not want to reject permissions if the field permissions // we pass true as allow2xFormat parameter because we do not want to reject permissions if the field permissions
// are given in 2.x syntax // are given in 2.x syntax
RoleDescriptor roleDescriptor = RoleDescriptor.parse(name, sourceBytes, true, XContentType.JSON); RoleDescriptor roleDescriptor = RoleDescriptor.parse(name, sourceBytes, true, XContentType.JSON, false);
final boolean dlsEnabled = Arrays.stream(roleDescriptor.getIndicesPrivileges()) final boolean dlsEnabled = Arrays.stream(roleDescriptor.getIndicesPrivileges())
.anyMatch(IndicesPrivileges::isUsingDocumentLevelSecurity); .anyMatch(IndicesPrivileges::isUsingDocumentLevelSecurity);
final boolean flsEnabled = Arrays.stream(roleDescriptor.getIndicesPrivileges()) final boolean flsEnabled = Arrays.stream(roleDescriptor.getIndicesPrivileges())
@ -488,7 +489,7 @@ public class NativeRolesStore implements BiConsumer<Set<String>, ActionListener<
return roleDescriptor; return roleDescriptor;
} }
} catch (Exception e) { } catch (Exception e) {
logger.error(() -> "error in the format of data for role [" + name + "]", e); logger.error("error in the format of data for role [" + name + "]", e);
return null; return null;
} }
} }

View file

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.security.authz;
import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.metadata.DataLifecycle;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.persistent.PersistentTasksService;
@ -118,6 +119,10 @@ public class AuthorizationUtilsTests extends ESTestCase {
); );
} }
public void testSwitchWithDlmOrigin() throws Exception {
assertSwitchBasedOnOriginAndExecute(DataLifecycle.DLM_ORIGIN, InternalUsers.DLM_USER, randomTransportVersion());
}
public void testSwitchAndExecuteXpackUser() throws Exception { public void testSwitchAndExecuteXpackUser() throws Exception {
for (String origin : Arrays.asList( for (String origin : Arrays.asList(
ClientHelper.ML_ORIGIN, ClientHelper.ML_ORIGIN,

View file

@ -617,7 +617,7 @@ public class FileRolesStoreTests extends ESTestCase {
assertThat(role, notNullValue()); assertThat(role, notNullValue());
assertThat(role.names(), equalTo(new String[] { "valid_role" })); assertThat(role.names(), equalTo(new String[] { "valid_role" }));
assertThat(entries, hasSize(6)); assertThat(entries, hasSize(7));
assertThat( assertThat(
entries.get(0), entries.get(0),
startsWith("invalid role definition [fóóbár] in roles file [" + path.toAbsolutePath() + "]. invalid role name") startsWith("invalid role definition [fóóbár] in roles file [" + path.toAbsolutePath() + "]. invalid role name")
@ -627,6 +627,7 @@ public class FileRolesStoreTests extends ESTestCase {
assertThat(entries.get(3), startsWith("failed to parse role [role3]")); assertThat(entries.get(3), startsWith("failed to parse role [role3]"));
assertThat(entries.get(4), startsWith("failed to parse role [role4]")); assertThat(entries.get(4), startsWith("failed to parse role [role4]"));
assertThat(entries.get(5), startsWith("failed to parse indices privileges for role [role5]")); assertThat(entries.get(5), startsWith("failed to parse indices privileges for role [role5]"));
assertThat(entries.get(6), startsWith("failed to parse role [role6]. unexpected field [restriction]"));
} }
public void testThatRoleNamesDoesNotResolvePermissions() throws Exception { public void testThatRoleNamesDoesNotResolvePermissions() throws Exception {
@ -635,8 +636,8 @@ public class FileRolesStoreTests extends ESTestCase {
List<String> events = CapturingLogger.output(logger.getName(), Level.ERROR); List<String> events = CapturingLogger.output(logger.getName(), Level.ERROR);
events.clear(); events.clear();
Set<String> roleNames = FileRolesStore.parseFileForRoleNames(path, logger); Set<String> roleNames = FileRolesStore.parseFileForRoleNames(path, logger);
assertThat(roleNames.size(), is(6)); assertThat(roleNames.size(), is(7));
assertThat(roleNames, containsInAnyOrder("valid_role", "role1", "role2", "role3", "role4", "role5")); assertThat(roleNames, containsInAnyOrder("valid_role", "role1", "role2", "role3", "role4", "role5", "role6"));
assertThat(events, hasSize(1)); assertThat(events, hasSize(1));
assertThat( assertThat(

View file

@ -6,6 +6,8 @@
*/ */
package org.elasticsearch.xpack.security.authz.store; package org.elasticsearch.xpack.security.authz.store;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersion;
import org.elasticsearch.Version; import org.elasticsearch.Version;
@ -47,12 +49,14 @@ import org.elasticsearch.xpack.core.security.action.role.PutRoleRequest;
import org.elasticsearch.xpack.core.security.authz.RoleDescriptor; import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
import org.elasticsearch.xpack.core.security.authz.RoleDescriptor.IndicesPrivileges; import org.elasticsearch.xpack.core.security.authz.RoleDescriptor.IndicesPrivileges;
import org.elasticsearch.xpack.core.security.authz.RoleDescriptorTests; import org.elasticsearch.xpack.core.security.authz.RoleDescriptorTests;
import org.elasticsearch.xpack.core.security.authz.RoleRestrictionTests;
import org.elasticsearch.xpack.core.security.authz.privilege.ClusterPrivilegeResolver; import org.elasticsearch.xpack.core.security.authz.privilege.ClusterPrivilegeResolver;
import org.elasticsearch.xpack.security.support.SecurityIndexManager; import org.elasticsearch.xpack.security.support.SecurityIndexManager;
import org.elasticsearch.xpack.security.support.SecuritySystemIndices; import org.elasticsearch.xpack.security.support.SecuritySystemIndices;
import org.elasticsearch.xpack.security.test.SecurityTestUtils; import org.elasticsearch.xpack.security.test.SecurityTestUtils;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito; import org.mockito.Mockito;
import java.io.IOException; import java.io.IOException;
@ -70,7 +74,9 @@ import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
public class NativeRolesStoreTests extends ESTestCase { public class NativeRolesStoreTests extends ESTestCase {
@ -239,6 +245,61 @@ public class NativeRolesStoreTests extends ESTestCase {
assertThat(role, equalTo(noFlsDlsRole)); assertThat(role, equalTo(noFlsDlsRole));
} }
public void testTransformingRoleWithRestrictionFails() throws IOException {
MockLicenseState licenseState = mock(MockLicenseState.class);
when(licenseState.isAllowed(DOCUMENT_LEVEL_SECURITY_FEATURE)).thenReturn(false);
RoleDescriptor roleWithRestriction = new RoleDescriptor(
"role_with_restriction",
randomSubsetOf(ClusterPrivilegeResolver.names()).toArray(String[]::new),
new IndicesPrivileges[] {
IndicesPrivileges.builder()
.privileges("READ")
.indices(generateRandomStringArray(5, randomIntBetween(3, 9), false, false))
.grantedFields("*")
.deniedFields(generateRandomStringArray(5, randomIntBetween(3, 9), false, false))
.query(
randomBoolean()
? "{ \"term\": { \""
+ randomAlphaOfLengthBetween(3, 24)
+ "\" : \""
+ randomAlphaOfLengthBetween(3, 24)
+ "\" }"
: "{ \"match_all\": {} }"
)
.build() },
RoleDescriptorTests.randomApplicationPrivileges(),
RoleDescriptorTests.randomClusterPrivileges(),
generateRandomStringArray(5, randomIntBetween(2, 8), true, true),
RoleDescriptorTests.randomRoleDescriptorMetadata(ESTestCase.randomBoolean()),
null,
TcpTransport.isUntrustedRemoteClusterEnabled() ? RoleDescriptorTests.randomRemoteIndicesPrivileges(1, 2) : null,
RoleRestrictionTests.randomWorkflowsRestriction(1, 2)
);
XContentBuilder builder = roleWithRestriction.toXContent(
XContentBuilder.builder(XContentType.JSON.xContent()),
ToXContent.EMPTY_PARAMS
);
Logger mockedLogger = Mockito.mock(Logger.class);
BytesReference bytes = BytesReference.bytes(builder);
RoleDescriptor transformedRole = NativeRolesStore.transformRole(
RoleDescriptor.ROLE_TYPE + "-role_with_restriction",
bytes,
mockedLogger,
licenseState
);
assertThat(transformedRole, nullValue());
ArgumentCaptor<ElasticsearchParseException> exceptionCaptor = ArgumentCaptor.forClass(ElasticsearchParseException.class);
ArgumentCaptor<String> messageCaptor = ArgumentCaptor.forClass(String.class);
verify(mockedLogger).error(messageCaptor.capture(), exceptionCaptor.capture());
assertThat(messageCaptor.getValue(), containsString("error in the format of data for role [role_with_restriction]"));
assertThat(
exceptionCaptor.getValue().getMessage(),
containsString("failed to parse role [role_with_restriction]. unexpected field [restriction]")
);
}
public void testPutOfRoleWithFlsDlsUnlicensed() throws IOException { public void testPutOfRoleWithFlsDlsUnlicensed() throws IOException {
final Client client = mock(Client.class); final Client client = mock(Client.class);
final ClusterService clusterService = mockClusterServiceWithMinNodeVersion(TransportVersion.CURRENT); final ClusterService clusterService = mockClusterServiceWithMinNodeVersion(TransportVersion.CURRENT);

View file

@ -79,6 +79,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer; import java.util.function.Consumer;
import static org.elasticsearch.cluster.metadata.DataLifecycle.DLM_ORIGIN;
import static org.elasticsearch.test.ActionListenerUtils.anyActionListener; import static org.elasticsearch.test.ActionListenerUtils.anyActionListener;
import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.SECURITY_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.SECURITY_ORIGIN;
@ -429,7 +430,9 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
TRANSFORM_ORIGIN, TRANSFORM_ORIGIN,
InternalUsers.XPACK_USER, InternalUsers.XPACK_USER,
ASYNC_SEARCH_ORIGIN, ASYNC_SEARCH_ORIGIN,
InternalUsers.ASYNC_SEARCH_USER InternalUsers.ASYNC_SEARCH_USER,
DLM_ORIGIN,
InternalUsers.DLM_USER
); );
final String origin = randomFrom(originToUserMap.keySet()); final String origin = randomFrom(originToUserMap.keySet());

View file

@ -45,3 +45,16 @@ role5:
- names: - names:
- 'idx1' - 'idx1'
privileges: [] privileges: []
# role includes unsupported workflows restriction
role6:
cluster:
- ALL
indices:
- names: idx
privileges:
- ALL
restriction:
workflows:
- workflow1
- workflow2

View file

@ -16,4 +16,4 @@ setup:
# I would much prefer we could just check that specific entries are in the array, but we don't have # I would much prefer we could just check that specific entries are in the array, but we don't have
# an assertion for that # an assertion for that
- length: { "cluster" : 48 } - length: { "cluster" : 48 }
- length: { "index" : 21 } - length: { "index" : 22 }

View file

@ -348,7 +348,7 @@ public class ApiKeyBackwardsCompatibilityIT extends AbstractUpgradeTestCase {
} }
private static RoleDescriptor randomRoleDescriptor(boolean includeRemoteIndices) { private static RoleDescriptor randomRoleDescriptor(boolean includeRemoteIndices) {
final Set<String> excludedPrivileges = Set.of("cross_cluster_replication", "cross_cluster_replication_internal"); final Set<String> excludedPrivileges = Set.of("cross_cluster_replication", "cross_cluster_replication_internal", "manage_dlm");
return new RoleDescriptor( return new RoleDescriptor(
randomAlphaOfLengthBetween(3, 90), randomAlphaOfLengthBetween(3, 90),
randomSubsetOf(Set.of("all", "monitor", "none")).toArray(String[]::new), randomSubsetOf(Set.of("all", "monitor", "none")).toArray(String[]::new),