[ML] [Transforms] prefer secondary auth headers for transforms (#86757)

When creating and updating transforms, it is possible for clients to provide secondary headers.

When PUT, _preview, _update is called with secondary authorization headers, those are then used or stored with the transform.

closes: https://github.com/elastic/elasticsearch/issues/86731
This commit is contained in:
Benjamin Trent 2022-05-16 10:13:21 -04:00 committed by GitHub
parent 132633e998
commit b90b3450a2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 379 additions and 194 deletions

View file

@ -0,0 +1,5 @@
pr: 86757
summary: "Prefer secondary auth headers for transforms"
area: Transform
type: enhancement
issues: []

View file

@ -27,10 +27,15 @@ Previews a {transform}.
Requires the following privileges:
* cluster: `manage_transform` (the `transform_admin` built-in role grants this
* cluster: `manage_transform` (the `transform_admin` built-in role grants this
privilege)
* source indices: `read`, `view_index_metadata`.
+
--
NOTE: If you provide
<<http-clients-secondary-authorization,secondary authorization headers>>, those
credentials are used.
--
[[preview-transform-desc]]
== {api-description-title}

View file

@ -24,6 +24,12 @@ Requires the following privileges:
* source indices: `read`, `view_index_metadata`
* destination index: `read`, `create_index`, `index`. If a `retention_policy` is configured, the `delete` privilege is
also required.
+
--
NOTE: If you provide
<<http-clients-secondary-authorization,secondary authorization headers>>, those
credentials are used.
--
[[put-transform-desc]]
== {api-description-title}

View file

@ -44,7 +44,9 @@ each checkpoint.
* When {es} {security-features} are enabled, your {transform} remembers which
roles the user who updated it had at the time of update and runs with those
privileges.
privileges. If you provide
<<http-clients-secondary-authorization,secondary authorization headers>>, those
credentials are used instead.
* You must use {kib} or this API to update a {transform}. Do not update a
{transform} directly via `.transform-internal*` indices using the {es} index API.
If {es} {security-features} are enabled, do not give users any privileges on

View file

@ -42,12 +42,14 @@ import static org.hamcrest.Matchers.nullValue;
public class TransformPivotRestIT extends TransformRestTestCase {
private static final String TEST_USER_NAME_NO_ACCESS = "no_authorization";
private static final String TEST_USER_NAME = "transform_admin_plus_data";
private static final String DATA_ACCESS_ROLE = "test_data_access";
private static final String BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS = basicAuthHeaderValue(
TEST_USER_NAME,
TEST_PASSWORD_SECURE_STRING
);
private static final String BASIC_AUTH_VALUE_NO_ACCESS = basicAuthHeaderValue(TEST_USER_NAME_NO_ACCESS, TEST_PASSWORD_SECURE_STRING);
private static boolean indicesCreated = false;
@ -96,6 +98,34 @@ public class TransformPivotRestIT extends TransformRestTestCase {
assertOneCount(transformIndex + "/_search?q=reviewer:user_26", "hits.hits._source.affiliate_missing", 0);
}
public void testSimplePivotWithSecondaryHeaders() throws Exception {
setupUser(TEST_USER_NAME_NO_ACCESS, List.of("transform_admin"));
String transformId = "simple-pivot";
String transformIndex = "pivot_reviews";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformIndex);
createPivotReviewsTransform(
transformId,
transformIndex,
null,
null,
BASIC_AUTH_VALUE_NO_ACCESS,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS,
REVIEWS_INDEX_NAME
);
startAndWaitForTransform(
transformId,
transformIndex,
BASIC_AUTH_VALUE_NO_ACCESS,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS,
new String[0]
);
// we expect 27 documents as there shall be 27 user_id's
// Just need to validate that things ran with secondary headers
Map<String, Object> indexStats = getAsMap(transformIndex + "/_stats");
assertEquals(27, XContentMapValues.extractValue("_all.total.docs.count", indexStats));
}
public void testSimpleDataStreamPivot() throws Exception {
String indexName = "reviews_data_stream";
createReviewsIndex(indexName, 1000, 27, "date", true, -1, null);

View file

@ -43,6 +43,7 @@ import static org.hamcrest.Matchers.is;
public abstract class TransformRestTestCase extends ESRestTestCase {
protected static final String TEST_PASSWORD = "x-pack-test-password";
private static final String SECONDARY_AUTH_KEY = "es-secondary-authorization";
protected static final SecureString TEST_PASSWORD_SECURE_STRING = new SecureString(TEST_PASSWORD.toCharArray());
private static final String BASIC_AUTH_VALUE_SUPER_USER = basicAuthHeaderValue("x_pack_rest_user", TEST_PASSWORD_SECURE_STRING);
@ -267,7 +268,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
}
}""".formatted(transformIndex, REVIEWS_INDEX_NAME);
createReviewsTransform(transformId, authHeader, config);
createReviewsTransform(transformId, authHeader, null, config);
}
protected void createPivotReviewsTransform(
@ -277,6 +278,18 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
String pipeline,
String authHeader,
String sourceIndex
) throws IOException {
createPivotReviewsTransform(transformId, transformIndex, query, pipeline, authHeader, null, sourceIndex);
}
protected void createPivotReviewsTransform(
String transformId,
String transformIndex,
String query,
String pipeline,
String authHeader,
String secondaryAuthHeader,
String sourceIndex
) throws IOException {
String config = "{";
@ -326,7 +339,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
"frequency": "1s"
}""";
createReviewsTransform(transformId, authHeader, config);
createReviewsTransform(transformId, authHeader, secondaryAuthHeader, config);
}
protected void createLatestReviewsTransform(String transformId, String transformIndex) throws IOException {
@ -347,11 +360,17 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
"frequency": "1s"
}""".formatted(transformIndex, REVIEWS_INDEX_NAME);
createReviewsTransform(transformId, null, config);
createReviewsTransform(transformId, null, null, config);
}
private void createReviewsTransform(String transformId, String authHeader, String config) throws IOException {
final Request createTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, authHeader);
private void createReviewsTransform(String transformId, String authHeader, String secondaryAuthHeader, String config)
throws IOException {
final Request createTransformRequest = createRequestWithSecondaryAuth(
"PUT",
getTransformEndpoint() + transformId,
authHeader,
secondaryAuthHeader
);
createTransformRequest.setJsonEntity(config);
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
@ -360,7 +379,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
protected void createPivotReviewsTransform(String transformId, String transformIndex, String query, String pipeline, String authHeader)
throws IOException {
createPivotReviewsTransform(transformId, transformIndex, query, pipeline, authHeader, REVIEWS_INDEX_NAME);
createPivotReviewsTransform(transformId, transformIndex, query, pipeline, authHeader, null, REVIEWS_INDEX_NAME);
}
protected void startTransform(String transformId) throws IOException {
@ -369,7 +388,18 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
protected void startTransform(String transformId, String authHeader, String... warnings) throws IOException {
// start the transform
final Request startTransformRequest = createRequestWithAuth("POST", getTransformEndpoint() + transformId + "/_start", authHeader);
startTransform(transformId, authHeader, null, warnings);
}
protected void startTransform(String transformId, String authHeader, String secondaryAuthHeader, String... warnings)
throws IOException {
// start the transform
final Request startTransformRequest = createRequestWithSecondaryAuth(
"POST",
getTransformEndpoint() + transformId + "/_start",
authHeader,
secondaryAuthHeader
);
if (warnings.length > 0) {
startTransformRequest.setOptions(expectWarnings(warnings));
}
@ -404,8 +434,18 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
protected void startAndWaitForTransform(String transformId, String transformIndex, String authHeader, String... warnings)
throws Exception {
startAndWaitForTransform(transformId, transformIndex, authHeader, null, warnings);
}
protected void startAndWaitForTransform(
String transformId,
String transformIndex,
String authHeader,
String secondaryAuthHeader,
String... warnings
) throws Exception {
// start the transform
startTransform(transformId, authHeader, warnings);
startTransform(transformId, authHeader, secondaryAuthHeader, warnings);
assertTrue(indexExists(transformIndex));
// wait until the transform has been created and all data is available
waitForTransformCheckpoint(transformId);
@ -435,18 +475,29 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
assertThat(resetTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
}
protected Request createRequestWithAuth(final String method, final String endpoint, final String authHeader) {
protected Request createRequestWithSecondaryAuth(
final String method,
final String endpoint,
final String authHeader,
final String secondaryAuthHeader
) {
final Request request = new Request(method, endpoint);
RequestOptions.Builder options = request.getOptions().toBuilder();
if (authHeader != null) {
RequestOptions.Builder options = request.getOptions().toBuilder();
options.addHeader("Authorization", authHeader);
request.setOptions(options);
}
if (secondaryAuthHeader != null) {
options.addHeader(SECONDARY_AUTH_KEY, secondaryAuthHeader);
}
request.setOptions(options);
return request;
}
protected Request createRequestWithAuth(final String method, final String endpoint, final String authHeader) {
return createRequestWithSecondaryAuth(method, endpoint, authHeader, null);
}
void waitForTransformStopped(String transformId) throws Exception {
assertBusy(() -> { assertEquals("stopped", getTransformState(transformId)); }, 15, TimeUnit.SECONDS);
}

View file

@ -38,15 +38,18 @@ public class TransformUpdateIT extends TransformRestTestCase {
TEST_ADMIN_USER_NAME_2,
TEST_PASSWORD_SECURE_STRING
);
private static final String TEST_ADMIN_USER_NAME_NO_DATA = "transform_admin_no_data";
private static final String BASIC_AUTH_VALUE_TRANSFORM_ADMIN_NO_DATA = basicAuthHeaderValue(
TEST_ADMIN_USER_NAME_NO_DATA,
TEST_PASSWORD_SECURE_STRING
);
private static final String DATA_ACCESS_ROLE = "test_data_access";
private static final String DATA_ACCESS_ROLE_2 = "test_data_access_2";
private static boolean indicesCreated = false;
// preserve indices in order to reuse source indices in several test cases
@Override
protected boolean preserveIndicesUponCompletion() {
return true;
return false;
}
@Override
@ -70,14 +73,8 @@ public class TransformUpdateIT extends TransformRestTestCase {
setupUser(TEST_USER_NAME, Arrays.asList("transform_user", DATA_ACCESS_ROLE));
setupUser(TEST_ADMIN_USER_NAME_1, Arrays.asList("transform_admin", DATA_ACCESS_ROLE));
setupUser(TEST_ADMIN_USER_NAME_2, Arrays.asList("transform_admin", DATA_ACCESS_ROLE_2));
// it's not possible to run it as @BeforeClass as clients aren't initialized then, so we need this little hack
if (indicesCreated) {
return;
}
setupUser(TEST_ADMIN_USER_NAME_NO_DATA, List.of("transform_admin"));
createReviewsIndex();
indicesCreated = true;
}
@SuppressWarnings("unchecked")
@ -149,8 +146,15 @@ public class TransformUpdateIT extends TransformRestTestCase {
assertThat(XContentMapValues.extractValue("settings.max_page_search_size", transform), equalTo(555));
}
@SuppressWarnings("unchecked")
public void testUpdateTransferRights() throws Exception {
updateTransferRightsTester(false);
}
public void testUpdateTransferRightsSecondaryAuthHeaders() throws Exception {
updateTransferRightsTester(true);
}
private void updateTransferRightsTester(boolean useSecondaryAuthHeaders) throws Exception {
String transformId = "transform1";
// Note: Due to a bug the transform does not fail to start after deleting the user and role, therefore invalidating
// the credentials stored with the config. As a workaround we use a 2nd transform that uses the same config
@ -160,17 +164,23 @@ public class TransformUpdateIT extends TransformRestTestCase {
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest);
setupDataAccessRole(DATA_ACCESS_ROLE_2, REVIEWS_INDEX_NAME, transformDest);
final Request createTransformRequest = createRequestWithAuth(
"PUT",
getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_2
);
final Request createTransformRequest = useSecondaryAuthHeaders
? createRequestWithSecondaryAuth(
"PUT",
getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_NO_DATA,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_2
)
: createRequestWithAuth("PUT", getTransformEndpoint() + transformId, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_2);
final Request createTransformRequest_2 = createRequestWithAuth(
"PUT",
getTransformEndpoint() + transformIdCloned,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_2
);
final Request createTransformRequest_2 = useSecondaryAuthHeaders
? createRequestWithSecondaryAuth(
"PUT",
getTransformEndpoint() + transformIdCloned,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_NO_DATA,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_2
)
: createRequestWithAuth("PUT", getTransformEndpoint() + transformIdCloned, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_2);
String config = """
{
@ -229,24 +239,37 @@ public class TransformUpdateIT extends TransformRestTestCase {
assertEquals(1, XContentMapValues.extractValue("count", transforms));
// start using admin 1, but as the header is still admin 2
// This fails as the stored header is still admin 2
try {
startAndWaitForTransform(transformId, transformDest, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1);
if (useSecondaryAuthHeaders) {
startAndWaitForTransform(
transformId,
transformDest,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_NO_DATA,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1,
new String[0]
);
} else {
startAndWaitForTransform(transformId, transformDest, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1);
}
fail("request should have failed");
} catch (ResponseException e) {
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(500));
}
assertBusy(() -> {
Map<?, ?> transformStatsAsMap = getTransformStateAndStats(transformId);
assertThat(XContentMapValues.extractValue("stats.documents_indexed", transformStatsAsMap), equalTo(0));
}, 3, TimeUnit.SECONDS);
// update the transform with an empty body, the credentials (headers) should change
final Request updateRequest = createRequestWithAuth(
"POST",
getTransformEndpoint() + transformIdCloned + "/_update",
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1
);
final Request updateRequest = useSecondaryAuthHeaders
? createRequestWithSecondaryAuth(
"POST",
getTransformEndpoint() + transformIdCloned + "/_update",
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_NO_DATA,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1
)
: createRequestWithAuth("POST", getTransformEndpoint() + transformIdCloned + "/_update", BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1);
updateRequest.setJsonEntity("{}");
assertOK(client().performRequest(updateRequest));
@ -256,8 +279,17 @@ public class TransformUpdateIT extends TransformRestTestCase {
assertEquals(1, XContentMapValues.extractValue("count", transforms));
// start with updated configuration should succeed
startAndWaitForTransform(transformIdCloned, transformDest, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1);
if (useSecondaryAuthHeaders) {
startAndWaitForTransform(
transformIdCloned,
transformDest,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_NO_DATA,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1,
new String[0]
);
} else {
startAndWaitForTransform(transformIdCloned, transformDest, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1);
}
assertBusy(() -> {
Map<?, ?> transformStatsAsMap = getTransformStateAndStats(transformIdCloned);
assertThat(XContentMapValues.extractValue("stats.documents_indexed", transformStatsAsMap), equalTo(27));

View file

@ -24,9 +24,11 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;
import static org.elasticsearch.xpack.transform.utils.SecondaryAuthorizationUtils.useSecondaryAuthIfAvailable;
/**
* {@link TransformPrivilegeChecker} is responsible for checking whether the user has the right privileges in order to work with transform.
@ -43,21 +45,23 @@ final class TransformPrivilegeChecker {
boolean checkDestIndexPrivileges,
ActionListener<Void> listener
) {
final String username = securityContext.getUser().principal();
useSecondaryAuthIfAvailable(securityContext, () -> {
final String username = securityContext.getUser().principal();
ActionListener<HasPrivilegesResponse> hasPrivilegesResponseListener = ActionListener.wrap(
response -> handlePrivilegesResponse(operationName, username, config.getId(), response, listener),
listener::onFailure
);
ActionListener<HasPrivilegesResponse> hasPrivilegesResponseListener = ActionListener.wrap(
response -> handlePrivilegesResponse(operationName, username, config.getId(), response, listener),
listener::onFailure
);
HasPrivilegesRequest hasPrivilegesRequest = buildPrivilegesRequest(
config,
indexNameExpressionResolver,
clusterState,
username,
checkDestIndexPrivileges
);
client.execute(HasPrivilegesAction.INSTANCE, hasPrivilegesRequest, hasPrivilegesResponseListener);
HasPrivilegesRequest hasPrivilegesRequest = buildPrivilegesRequest(
config,
indexNameExpressionResolver,
clusterState,
username,
checkDestIndexPrivileges
);
client.execute(HasPrivilegesAction.INSTANCE, hasPrivilegesRequest, hasPrivilegesResponseListener);
});
}
private static HasPrivilegesRequest buildPrivilegesRequest(
@ -128,7 +132,7 @@ final class TransformPrivilegeChecker {
.entrySet()
.stream()
.filter(e -> Boolean.TRUE.equals(e.getValue()) == false)
.map(e -> e.getKey())
.map(Map.Entry::getKey)
.collect(joining(", ", indexPrivileges.getResource() + ":[", "]"))
)
.collect(toList());

View file

@ -29,7 +29,6 @@ import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.license.License;
import org.elasticsearch.license.RemoteClusterLicenseChecker;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -64,11 +63,11 @@ import java.util.stream.Collectors;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.core.transform.action.PreviewTransformAction.DUMMY_DEST_INDEX_FOR_PREVIEW;
import static org.elasticsearch.xpack.transform.utils.SecondaryAuthorizationUtils.useSecondaryAuthIfAvailable;
public class TransportPreviewTransformAction extends HandledTransportAction<Request, Response> {
private static final int NUMBER_OF_PREVIEW_BUCKETS = 100;
private final XPackLicenseState licenseState;
private final SecurityContext securityContext;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final Client client;
@ -80,7 +79,6 @@ public class TransportPreviewTransformAction extends HandledTransportAction<Requ
@Inject
public TransportPreviewTransformAction(
XPackLicenseState licenseState,
TransportService transportService,
ActionFilters actionFilters,
Client client,
@ -91,7 +89,6 @@ public class TransportPreviewTransformAction extends HandledTransportAction<Requ
IngestService ingestService
) {
super(PreviewTransformAction.NAME, transportService, actionFilters, Request::new);
this.licenseState = licenseState;
this.securityContext = XPackSettings.SECURITY_ENABLED.get(settings)
? new SecurityContext(settings, threadPool.getThreadContext())
: null;
@ -140,35 +137,40 @@ public class TransportPreviewTransformAction extends HandledTransportAction<Requ
final Function function = FunctionFactory.create(config);
// <4> Validate transform query
ActionListener<Boolean> validateConfigListener = ActionListener.wrap(validateConfigResponse -> {
getPreview(
config.getId(), // note: @link{PreviewTransformAction} sets an id, so this is never null
function,
config.getSource(),
config.getDestination().getPipeline(),
config.getDestination().getIndex(),
config.getSyncConfig(),
listener
);
}, listener::onFailure);
ActionListener<Boolean> validateConfigListener = ActionListener.wrap(
validateConfigResponse -> useSecondaryAuthIfAvailable(
securityContext,
() -> getPreview(
config.getId(), // note: @link{PreviewTransformAction} sets an id, so this is never null
function,
config.getSource(),
config.getDestination().getPipeline(),
config.getDestination().getIndex(),
config.getSyncConfig(),
listener
)
),
listener::onFailure
);
// <3> Validate transform function config
ActionListener<Boolean> validateSourceDestListener = ActionListener.wrap(
validateSourceDestResponse -> { function.validateConfig(validateConfigListener); },
validateSourceDestResponse -> function.validateConfig(validateConfigListener),
listener::onFailure
);
// <2> Validate source and destination indices
ActionListener<Void> checkPrivilegesListener = ActionListener.wrap(aVoid -> {
sourceDestValidator.validate(
ActionListener<Void> checkPrivilegesListener = ActionListener.wrap(
aVoid -> sourceDestValidator.validate(
clusterState,
config.getSource().getIndex(),
config.getDestination().getIndex(),
config.getDestination().getPipeline(),
SourceDestValidations.getValidationsForPreview(config.getAdditionalSourceDestValidations()),
validateSourceDestListener
);
}, listener::onFailure);
),
listener::onFailure
);
// <1> Early check to verify that the user can create the destination index and can read from the source
if (XPackSettings.SECURITY_ENABLED.get(nodeSettings)) {
@ -228,7 +230,7 @@ public class TransportPreviewTransformAction extends HandledTransportAction<Requ
);
List<String> warnings = TransformConfigLinter.getWarnings(function, source, syncConfig);
warnings.forEach(warning -> HeaderWarning.addWarning(warning));
warnings.forEach(HeaderWarning::addWarning);
listener.onResponse(new Response(docs, generatedDestIndexSettings));
}, listener::onFailure);
@ -240,7 +242,7 @@ public class TransportPreviewTransformAction extends HandledTransportAction<Requ
Clock.systemUTC()
);
List<String> warnings = TransformConfigLinter.getWarnings(function, source, syncConfig);
warnings.forEach(warning -> HeaderWarning.addWarning(warning));
warnings.forEach(HeaderWarning::addWarning);
listener.onResponse(new Response(docs, generatedDestIndexSettings));
} else {
List<Map<String, Object>> results = docs.stream().map(doc -> {

View file

@ -24,7 +24,6 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
@ -48,6 +47,8 @@ import java.time.Instant;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.xpack.transform.utils.SecondaryAuthorizationUtils.useSecondaryAuthIfAvailable;
public class TransportPutTransformAction extends AcknowledgedTransportMasterNodeAction<Request> {
private static final Logger logger = LogManager.getLogger(TransportPutTransformAction.class);
@ -67,8 +68,7 @@ public class TransportPutTransformAction extends AcknowledgedTransportMasterNode
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService,
TransformServices transformServices,
Client client,
IngestService ingestService
Client client
) {
super(
PutTransformAction.NAME,
@ -92,54 +92,61 @@ public class TransportPutTransformAction extends AcknowledgedTransportMasterNode
@Override
protected void masterOperation(Task task, Request request, ClusterState clusterState, ActionListener<AcknowledgedResponse> listener) {
XPackPlugin.checkReadyForXPackCustomMetadata(clusterState);
// set headers to run transform as calling user
Map<String, String> filteredHeaders = ClientHelper.getPersistableSafeSecurityHeaders(
threadPool.getThreadContext(),
clusterService.state()
);
TransformConfig config = request.getConfig().setHeaders(filteredHeaders).setCreateTime(Instant.now()).setVersion(Version.CURRENT);
String transformId = config.getId();
// quick check whether a transform has already been created under that name
if (PersistentTasksCustomMetadata.getTaskWithId(clusterState, transformId) != null) {
listener.onFailure(
new ResourceAlreadyExistsException(TransformMessages.getMessage(TransformMessages.REST_PUT_TRANSFORM_EXISTS, transformId))
useSecondaryAuthIfAvailable(securityContext, () -> {
// set headers to run transform as calling user
Map<String, String> filteredHeaders = ClientHelper.getPersistableSafeSecurityHeaders(
threadPool.getThreadContext(),
clusterService.state()
);
return;
}
// <3> Create the transform
ActionListener<ValidateTransformAction.Response> validateTransformListener = ActionListener.wrap(
validationResponse -> { putTransform(request, listener); },
listener::onFailure
);
TransformConfig config = request.getConfig()
.setHeaders(filteredHeaders)
.setCreateTime(Instant.now())
.setVersion(Version.CURRENT);
// <2> Validate source and destination indices
ActionListener<Void> checkPrivilegesListener = ActionListener.wrap(aVoid -> {
client.execute(
ValidateTransformAction.INSTANCE,
new ValidateTransformAction.Request(config, request.isDeferValidation(), request.timeout()),
validateTransformListener
String transformId = config.getId();
// quick check whether a transform has already been created under that name
if (PersistentTasksCustomMetadata.getTaskWithId(clusterState, transformId) != null) {
listener.onFailure(
new ResourceAlreadyExistsException(
TransformMessages.getMessage(TransformMessages.REST_PUT_TRANSFORM_EXISTS, transformId)
)
);
return;
}
// <3> Create the transform
ActionListener<ValidateTransformAction.Response> validateTransformListener = ActionListener.wrap(
validationResponse -> putTransform(request, listener),
listener::onFailure
);
}, listener::onFailure);
// <1> Early check to verify that the user can create the destination index and can read from the source
if (XPackSettings.SECURITY_ENABLED.get(settings) && request.isDeferValidation() == false) {
TransformPrivilegeChecker.checkPrivileges(
"create",
securityContext,
indexNameExpressionResolver,
clusterState,
client,
config,
true,
checkPrivilegesListener
// <2> Validate source and destination indices
ActionListener<Void> checkPrivilegesListener = ActionListener.wrap(
aVoid -> client.execute(
ValidateTransformAction.INSTANCE,
new ValidateTransformAction.Request(config, request.isDeferValidation(), request.timeout()),
validateTransformListener
),
listener::onFailure
);
} else { // No security enabled, just move on
checkPrivilegesListener.onResponse(null);
}
// <1> Early check to verify that the user can create the destination index and can read from the source
if (XPackSettings.SECURITY_ENABLED.get(settings) && request.isDeferValidation() == false) {
TransformPrivilegeChecker.checkPrivileges(
"create",
securityContext,
indexNameExpressionResolver,
clusterState,
client,
config,
true,
checkPrivilegesListener
);
} else { // No security enabled, just move on
checkPrivilegesListener.onResponse(null);
}
});
}
@Override

View file

@ -25,7 +25,6 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
@ -51,6 +50,8 @@ import org.elasticsearch.xpack.transform.transforms.TransformTask;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.xpack.transform.utils.SecondaryAuthorizationUtils.useSecondaryAuthIfAvailable;
public class TransportUpdateTransformAction extends TransportTasksAction<TransformTask, Request, Response, Response> {
private static final Logger logger = LogManager.getLogger(TransportUpdateTransformAction.class);
@ -71,8 +72,7 @@ public class TransportUpdateTransformAction extends TransportTasksAction<Transfo
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService,
TransformServices transformServices,
Client client,
IngestService ingestService
Client client
) {
super(
UpdateTransformAction.NAME,
@ -117,66 +117,71 @@ public class TransportUpdateTransformAction extends TransportTasksAction<Transfo
}
return;
}
// set headers to run transform as calling user
Map<String, String> filteredHeaders = ClientHelper.getPersistableSafeSecurityHeaders(
threadPool.getThreadContext(),
clusterService.state()
);
TransformConfigUpdate update = request.getUpdate();
update.setHeaders(filteredHeaders);
// GET transform and attempt to update
// We don't want the update to complete if the config changed between GET and INDEX
transformConfigManager.getTransformConfigurationForUpdate(request.getId(), ActionListener.wrap(configAndVersion -> {
TransformUpdater.updateTransform(
securityContext,
indexNameExpressionResolver,
clusterState,
settings,
client,
transformConfigManager,
configAndVersion.v1(),
update,
configAndVersion.v2(),
request.isDeferValidation(),
false, // dryRun
true, // checkAccess
request.getTimeout(),
ActionListener.wrap(updateResponse -> {
TransformConfig updatedConfig = updateResponse.getConfig();
auditor.info(updatedConfig.getId(), "Updated transform.");
logger.debug("[{}] Updated transform [{}]", updatedConfig.getId(), updateResponse.getStatus());
checkTransformConfigAndLogWarnings(updatedConfig);
if (update.changesSettings(configAndVersion.v1())) {
PersistentTasksCustomMetadata.PersistentTask<?> transformTask = TransformTask.getTransformTask(
request.getId(),
clusterState
);
// to send a request to apply new settings at runtime, several requirements must be met:
// - transform must be running, meaning a task exists
// - transform is not failed (stopped transforms do not have a task)
// - the node where transform is executed on is at least 7.8.0 in order to understand the request
if (transformTask != null
&& transformTask.isAssigned()
&& transformTask.getState() instanceof TransformState
&& ((TransformState) transformTask.getState()).getTaskState() != TransformTaskState.FAILED
&& clusterState.nodes().get(transformTask.getExecutorNode()).getVersion().onOrAfter(Version.V_7_8_0)) {
request.setNodes(transformTask.getExecutorNode());
request.setConfig(updatedConfig);
super.doExecute(task, request, listener);
return;
}
}
listener.onResponse(new Response(updatedConfig));
}, listener::onFailure)
useSecondaryAuthIfAvailable(securityContext, () -> {
// set headers to run transform as calling user
Map<String, String> filteredHeaders = ClientHelper.getPersistableSafeSecurityHeaders(
threadPool.getThreadContext(),
clusterService.state()
);
}, listener::onFailure));
TransformConfigUpdate update = request.getUpdate();
update.setHeaders(filteredHeaders);
// GET transform and attempt to update
// We don't want the update to complete if the config changed between GET and INDEX
transformConfigManager.getTransformConfigurationForUpdate(
request.getId(),
ActionListener.wrap(
configAndVersion -> TransformUpdater.updateTransform(
securityContext,
indexNameExpressionResolver,
clusterState,
settings,
client,
transformConfigManager,
configAndVersion.v1(),
update,
configAndVersion.v2(),
request.isDeferValidation(),
false, // dryRun
true, // checkAccess
request.getTimeout(),
ActionListener.wrap(updateResponse -> {
TransformConfig updatedConfig = updateResponse.getConfig();
auditor.info(updatedConfig.getId(), "Updated transform.");
logger.debug("[{}] Updated transform [{}]", updatedConfig.getId(), updateResponse.getStatus());
checkTransformConfigAndLogWarnings(updatedConfig);
if (update.changesSettings(configAndVersion.v1())) {
PersistentTasksCustomMetadata.PersistentTask<?> transformTask = TransformTask.getTransformTask(
request.getId(),
clusterState
);
// to send a request to apply new settings at runtime, several requirements must be met:
// - transform must be running, meaning a task exists
// - transform is not failed (stopped transforms do not have a task)
// - the node where transform is executed on is at least 7.8.0 in order to understand the request
if (transformTask != null
&& transformTask.isAssigned()
&& transformTask.getState() instanceof TransformState
&& ((TransformState) transformTask.getState()).getTaskState() != TransformTaskState.FAILED
&& clusterState.nodes().get(transformTask.getExecutorNode()).getVersion().onOrAfter(Version.V_7_8_0)) {
request.setNodes(transformTask.getExecutorNode());
request.setConfig(updatedConfig);
super.doExecute(task, request, listener);
return;
}
}
listener.onResponse(new Response(updatedConfig));
}, listener::onFailure)
),
listener::onFailure
)
);
});
}
private void checkTransformConfigAndLogWarnings(TransformConfig config) {

View file

@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.transform.utils;
import org.elasticsearch.xpack.core.security.SecurityContext;
import org.elasticsearch.xpack.core.security.authc.support.SecondaryAuthentication;
public final class SecondaryAuthorizationUtils {
private SecondaryAuthorizationUtils() {}
/**
* This executes the supplied runnable inside the secondary auth context if it exists;
*/
public static void useSecondaryAuthIfAvailable(SecurityContext securityContext, Runnable runnable) {
if (securityContext == null) {
runnable.run();
return;
}
SecondaryAuthentication secondaryAuth = securityContext.getSecondaryAuthentication();
if (secondaryAuth != null) {
runnable = secondaryAuth.wrap(runnable);
}
runnable.run();
}
}

View file

@ -22,6 +22,8 @@ import org.elasticsearch.core.TimeValue;
import org.elasticsearch.indices.TestIndexNameExpressionResolver;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.client.NoOpClient;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.security.SecurityContext;
import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesRequest;
import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesResponse;
@ -75,12 +77,8 @@ public class TransformPrivilegeCheckerTests extends ESTestCase {
.setSource(new SourceConfig(SOURCE_INDEX_NAME))
.setDest(new DestConfig(DEST_INDEX_NAME, null))
.build();
private final SecurityContext securityContext = new SecurityContext(Settings.EMPTY, null) {
public User getUser() {
return new User(USER_NAME);
}
};
private ThreadPool threadPool;
private SecurityContext securityContext;
private final IndexNameExpressionResolver indexNameExpressionResolver = TestIndexNameExpressionResolver.newInstance();
private MyMockClient client;
@ -90,11 +88,18 @@ public class TransformPrivilegeCheckerTests extends ESTestCase {
client.close();
}
client = new MyMockClient(getTestName());
threadPool = new TestThreadPool("transform_privilege_checker_tests");
securityContext = new SecurityContext(Settings.EMPTY, threadPool.getThreadContext()) {
public User getUser() {
return new User(USER_NAME);
}
};
}
@After
public void tearDownClient() {
client.close();
threadPool.shutdown();
}
public void testCheckPrivileges_NoCheckDestIndexPrivileges() {