mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-04-24 23:27:25 -04:00
[ML] [Transforms] fix transform _start permissions to use stored headers in the config (#86802)
It was previously required that the _start API caller required the same roles as the create API caller. This does not make sense as when the transform is actually running (after _start) we rely solely on the roles of the caller who created the transform. Consequently, this commit does the permission validations and various checks with the roles of user who created the transform, not the one calling _start
This commit is contained in:
parent
480abeaa9d
commit
88a5da9560
7 changed files with 49 additions and 37 deletions
6
docs/changelog/86802.yaml
Normal file
6
docs/changelog/86802.yaml
Normal file
|
@ -0,0 +1,6 @@
|
||||||
|
pr: 86802
|
||||||
|
summary: "Fix transform `_start` permissions to use stored headers in\
|
||||||
|
\ the config"
|
||||||
|
area: Transform
|
||||||
|
type: bug
|
||||||
|
issues: []
|
|
@ -21,9 +21,6 @@ Requires the following privileges:
|
||||||
|
|
||||||
* cluster: `manage_transform` (the `transform_admin` built-in role grants this
|
* cluster: `manage_transform` (the `transform_admin` built-in role grants this
|
||||||
privilege)
|
privilege)
|
||||||
* source indices: `read`, `view_index_metadata`.
|
|
||||||
* destination index: `read`, `create_index`, `index`. If a `retention_policy` is configured, the `delete` privilege is
|
|
||||||
also required.
|
|
||||||
|
|
||||||
[[start-transform-desc]]
|
[[start-transform-desc]]
|
||||||
== {api-description-title}
|
== {api-description-title}
|
||||||
|
|
|
@ -229,8 +229,12 @@ public class TransformUpdateIT extends TransformRestTestCase {
|
||||||
assertEquals(1, XContentMapValues.extractValue("count", transforms));
|
assertEquals(1, XContentMapValues.extractValue("count", transforms));
|
||||||
|
|
||||||
// start using admin 1, but as the header is still admin 2
|
// start using admin 1, but as the header is still admin 2
|
||||||
// BUG: this should fail, because the transform can not access the source index any longer
|
try {
|
||||||
startAndWaitForTransform(transformId, transformDest, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1);
|
startAndWaitForTransform(transformId, transformDest, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1);
|
||||||
|
fail("request should have failed");
|
||||||
|
} catch (ResponseException e) {
|
||||||
|
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(500));
|
||||||
|
}
|
||||||
|
|
||||||
assertBusy(() -> {
|
assertBusy(() -> {
|
||||||
Map<?, ?> transformStatsAsMap = getTransformStateAndStats(transformId);
|
Map<?, ?> transformStatsAsMap = getTransformStateAndStats(transformId);
|
||||||
|
|
|
@ -20,6 +20,7 @@ import org.elasticsearch.core.Nullable;
|
||||||
import org.elasticsearch.core.TimeValue;
|
import org.elasticsearch.core.TimeValue;
|
||||||
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
||||||
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
|
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
|
||||||
|
import org.elasticsearch.xpack.core.ClientHelper;
|
||||||
import org.elasticsearch.xpack.core.XPackSettings;
|
import org.elasticsearch.xpack.core.XPackSettings;
|
||||||
import org.elasticsearch.xpack.core.security.SecurityContext;
|
import org.elasticsearch.xpack.core.security.SecurityContext;
|
||||||
import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction;
|
import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction;
|
||||||
|
@ -175,7 +176,7 @@ public class TransformUpdater {
|
||||||
|
|
||||||
// <2> Validate source and destination indices
|
// <2> Validate source and destination indices
|
||||||
ActionListener<Void> checkPrivilegesListener = ActionListener.wrap(
|
ActionListener<Void> checkPrivilegesListener = ActionListener.wrap(
|
||||||
aVoid -> { validateTransform(updatedConfig, client, deferValidation, timeout, validateTransformListener); },
|
aVoid -> validateTransform(updatedConfig, client, deferValidation, timeout, validateTransformListener),
|
||||||
listener::onFailure
|
listener::onFailure
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -203,7 +204,10 @@ public class TransformUpdater {
|
||||||
TimeValue timeout,
|
TimeValue timeout,
|
||||||
ActionListener<Map<String, String>> listener
|
ActionListener<Map<String, String>> listener
|
||||||
) {
|
) {
|
||||||
client.execute(
|
ClientHelper.executeWithHeadersAsync(
|
||||||
|
config.getHeaders(),
|
||||||
|
ClientHelper.TRANSFORM_ORIGIN,
|
||||||
|
client,
|
||||||
ValidateTransformAction.INSTANCE,
|
ValidateTransformAction.INSTANCE,
|
||||||
new ValidateTransformAction.Request(config, deferValidation, timeout),
|
new ValidateTransformAction.Request(config, deferValidation, timeout),
|
||||||
ActionListener.wrap(response -> listener.onResponse(response.getDestIndexMappings()), listener::onFailure)
|
ActionListener.wrap(response -> listener.onResponse(response.getDestIndexMappings()), listener::onFailure)
|
||||||
|
@ -234,7 +238,7 @@ public class TransformUpdater {
|
||||||
transformConfigManager.putOrUpdateTransformStoredDoc(
|
transformConfigManager.putOrUpdateTransformStoredDoc(
|
||||||
currentState.v1(),
|
currentState.v1(),
|
||||||
null, // set seqNoPrimaryTermAndIndex to `null` to force optype `create`, gh#80073
|
null, // set seqNoPrimaryTermAndIndex to `null` to force optype `create`, gh#80073
|
||||||
ActionListener.wrap(r -> { listener.onResponse(lastCheckpoint); }, e -> {
|
ActionListener.wrap(r -> listener.onResponse(lastCheckpoint), e -> {
|
||||||
if (org.elasticsearch.ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException) {
|
if (org.elasticsearch.ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException) {
|
||||||
// if a version conflict occurs a new state has been written between us reading and writing.
|
// if a version conflict occurs a new state has been written between us reading and writing.
|
||||||
// this is a benign case, as it means the transform is running and the latest state has been written by it
|
// this is a benign case, as it means the transform is running and the latest state has been written by it
|
||||||
|
@ -277,15 +281,17 @@ public class TransformUpdater {
|
||||||
ActionListener<Void> listener
|
ActionListener<Void> listener
|
||||||
) {
|
) {
|
||||||
// <3> Return to the listener
|
// <3> Return to the listener
|
||||||
ActionListener<Boolean> putTransformConfigurationListener = ActionListener.wrap(putTransformConfigurationResult -> {
|
ActionListener<Boolean> putTransformConfigurationListener = ActionListener.wrap(
|
||||||
transformConfigManager.deleteOldTransformConfigurations(config.getId(), ActionListener.wrap(r -> {
|
putTransformConfigurationResult -> transformConfigManager.deleteOldTransformConfigurations(
|
||||||
logger.trace("[{}] successfully deleted old transform configurations", config.getId());
|
config.getId(),
|
||||||
listener.onResponse(null);
|
ActionListener.wrap(r -> {
|
||||||
}, e -> {
|
logger.trace("[{}] successfully deleted old transform configurations", config.getId());
|
||||||
logger.warn(LoggerMessageFormat.format("[{}] failed deleting old transform configurations.", config.getId()), e);
|
listener.onResponse(null);
|
||||||
listener.onResponse(null);
|
}, e -> {
|
||||||
}));
|
logger.warn(LoggerMessageFormat.format("[{}] failed deleting old transform configurations.", config.getId()), e);
|
||||||
},
|
listener.onResponse(null);
|
||||||
|
})
|
||||||
|
),
|
||||||
// If we failed to INDEX AND we created the destination index, the destination index will still be around
|
// If we failed to INDEX AND we created the destination index, the destination index will still be around
|
||||||
// This is a similar behavior to _start
|
// This is a similar behavior to _start
|
||||||
listener::onFailure
|
listener::onFailure
|
||||||
|
|
|
@ -24,9 +24,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.ValidationException;
|
import org.elasticsearch.common.ValidationException;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
|
||||||
import org.elasticsearch.core.TimeValue;
|
import org.elasticsearch.core.TimeValue;
|
||||||
import org.elasticsearch.ingest.IngestService;
|
|
||||||
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
|
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
|
||||||
import org.elasticsearch.persistent.PersistentTasksService;
|
import org.elasticsearch.persistent.PersistentTasksService;
|
||||||
import org.elasticsearch.rest.RestStatus;
|
import org.elasticsearch.rest.RestStatus;
|
||||||
|
@ -64,7 +62,6 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
|
||||||
private final PersistentTasksService persistentTasksService;
|
private final PersistentTasksService persistentTasksService;
|
||||||
private final Client client;
|
private final Client client;
|
||||||
private final TransformAuditor auditor;
|
private final TransformAuditor auditor;
|
||||||
private final IngestService ingestService;
|
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public TransportStartTransformAction(
|
public TransportStartTransformAction(
|
||||||
|
@ -75,9 +72,7 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
|
||||||
IndexNameExpressionResolver indexNameExpressionResolver,
|
IndexNameExpressionResolver indexNameExpressionResolver,
|
||||||
TransformServices transformServices,
|
TransformServices transformServices,
|
||||||
PersistentTasksService persistentTasksService,
|
PersistentTasksService persistentTasksService,
|
||||||
Client client,
|
Client client
|
||||||
Settings settings,
|
|
||||||
IngestService ingestService
|
|
||||||
) {
|
) {
|
||||||
this(
|
this(
|
||||||
StartTransformAction.NAME,
|
StartTransformAction.NAME,
|
||||||
|
@ -88,9 +83,7 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
|
||||||
indexNameExpressionResolver,
|
indexNameExpressionResolver,
|
||||||
transformServices,
|
transformServices,
|
||||||
persistentTasksService,
|
persistentTasksService,
|
||||||
client,
|
client
|
||||||
settings,
|
|
||||||
ingestService
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -103,9 +96,7 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
|
||||||
IndexNameExpressionResolver indexNameExpressionResolver,
|
IndexNameExpressionResolver indexNameExpressionResolver,
|
||||||
TransformServices transformServices,
|
TransformServices transformServices,
|
||||||
PersistentTasksService persistentTasksService,
|
PersistentTasksService persistentTasksService,
|
||||||
Client client,
|
Client client
|
||||||
Settings settings,
|
|
||||||
IngestService ingestService
|
|
||||||
) {
|
) {
|
||||||
super(
|
super(
|
||||||
name,
|
name,
|
||||||
|
@ -122,7 +113,6 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
|
||||||
this.persistentTasksService = persistentTasksService;
|
this.persistentTasksService = persistentTasksService;
|
||||||
this.client = client;
|
this.client = client;
|
||||||
this.auditor = transformServices.getAuditor();
|
this.auditor = transformServices.getAuditor();
|
||||||
this.ingestService = ingestService;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -131,7 +121,7 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
|
||||||
StartTransformAction.Request request,
|
StartTransformAction.Request request,
|
||||||
ClusterState state,
|
ClusterState state,
|
||||||
ActionListener<StartTransformAction.Response> listener
|
ActionListener<StartTransformAction.Response> listener
|
||||||
) throws Exception {
|
) {
|
||||||
TransformNodes.warnIfNoTransformNodes(state);
|
TransformNodes.warnIfNoTransformNodes(state);
|
||||||
|
|
||||||
final AtomicReference<TransformTaskParams> transformTaskParamsHolder = new AtomicReference<>();
|
final AtomicReference<TransformTaskParams> transformTaskParamsHolder = new AtomicReference<>();
|
||||||
|
@ -250,7 +240,10 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
transformConfigHolder.set(config);
|
transformConfigHolder.set(config);
|
||||||
client.execute(
|
ClientHelper.executeWithHeadersAsync(
|
||||||
|
config.getHeaders(),
|
||||||
|
ClientHelper.TRANSFORM_ORIGIN,
|
||||||
|
client,
|
||||||
ValidateTransformAction.INSTANCE,
|
ValidateTransformAction.INSTANCE,
|
||||||
new ValidateTransformAction.Request(config, false, request.timeout()),
|
new ValidateTransformAction.Request(config, false, request.timeout()),
|
||||||
validationListener
|
validationListener
|
||||||
|
@ -281,7 +274,7 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
|
||||||
}
|
}
|
||||||
|
|
||||||
private void cancelTransformTask(String taskId, String transformId, Exception exception, Consumer<Exception> onFailure) {
|
private void cancelTransformTask(String taskId, String transformId, Exception exception, Consumer<Exception> onFailure) {
|
||||||
persistentTasksService.sendRemoveRequest(taskId, new ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>>() {
|
persistentTasksService.sendRemoveRequest(taskId, new ActionListener<>() {
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(PersistentTasksCustomMetadata.PersistentTask<?> task) {
|
public void onResponse(PersistentTasksCustomMetadata.PersistentTask<?> task) {
|
||||||
// We succeeded in canceling the persistent task, but the
|
// We succeeded in canceling the persistent task, but the
|
||||||
|
@ -346,7 +339,7 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
|
||||||
* Important: the methods of this class must NOT throw exceptions. If they did then the callers
|
* Important: the methods of this class must NOT throw exceptions. If they did then the callers
|
||||||
* of endpoints waiting for a condition tested by this predicate would never get a response.
|
* of endpoints waiting for a condition tested by this predicate would never get a response.
|
||||||
*/
|
*/
|
||||||
private class TransformPredicate implements Predicate<PersistentTasksCustomMetadata.PersistentTask<?>> {
|
private static class TransformPredicate implements Predicate<PersistentTasksCustomMetadata.PersistentTask<?>> {
|
||||||
|
|
||||||
private volatile Exception exception;
|
private volatile Exception exception;
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||||
import org.elasticsearch.cluster.metadata.MappingMetadata;
|
import org.elasticsearch.cluster.metadata.MappingMetadata;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.index.IndexNotFoundException;
|
import org.elasticsearch.index.IndexNotFoundException;
|
||||||
|
import org.elasticsearch.xpack.core.ClientHelper;
|
||||||
import org.elasticsearch.xpack.core.transform.TransformField;
|
import org.elasticsearch.xpack.core.transform.TransformField;
|
||||||
import org.elasticsearch.xpack.core.transform.TransformMessages;
|
import org.elasticsearch.xpack.core.transform.TransformMessages;
|
||||||
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
|
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
|
||||||
|
@ -103,7 +104,10 @@ public final class TransformIndex {
|
||||||
request.alias(alias);
|
request.alias(alias);
|
||||||
}
|
}
|
||||||
|
|
||||||
client.execute(
|
ClientHelper.executeWithHeadersAsync(
|
||||||
|
transformConfig.getHeaders(),
|
||||||
|
TRANSFORM_ORIGIN,
|
||||||
|
client,
|
||||||
CreateIndexAction.INSTANCE,
|
CreateIndexAction.INSTANCE,
|
||||||
request,
|
request,
|
||||||
ActionListener.wrap(createIndexResponse -> { listener.onResponse(true); }, e -> {
|
ActionListener.wrap(createIndexResponse -> { listener.onResponse(true); }, e -> {
|
||||||
|
|
|
@ -50,6 +50,7 @@ import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
import static org.mockito.internal.verification.VerificationModeFactory.atLeastOnce;
|
||||||
|
|
||||||
public class TransformIndexTests extends ESTestCase {
|
public class TransformIndexTests extends ESTestCase {
|
||||||
|
|
||||||
|
@ -59,7 +60,7 @@ public class TransformIndexTests extends ESTestCase {
|
||||||
private static final String CREATED_BY = "transform";
|
private static final String CREATED_BY = "transform";
|
||||||
|
|
||||||
private Client client;
|
private Client client;
|
||||||
private Clock clock = Clock.fixed(Instant.ofEpochMilli(CURRENT_TIME_MILLIS), ZoneId.systemDefault());
|
private final Clock clock = Clock.fixed(Instant.ofEpochMilli(CURRENT_TIME_MILLIS), ZoneId.systemDefault());
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUpMocks() {
|
public void setUpMocks() {
|
||||||
|
@ -141,11 +142,12 @@ public class TransformIndexTests extends ESTestCase {
|
||||||
client,
|
client,
|
||||||
TransformConfigTests.randomTransformConfig(TRANSFORM_ID),
|
TransformConfigTests.randomTransformConfig(TRANSFORM_ID),
|
||||||
TransformIndex.createTransformDestIndexSettings(new HashMap<>(), TRANSFORM_ID, clock),
|
TransformIndex.createTransformDestIndexSettings(new HashMap<>(), TRANSFORM_ID, clock),
|
||||||
ActionListener.wrap(value -> assertTrue(value), e -> fail(e.getMessage()))
|
ActionListener.wrap(Assert::assertTrue, e -> fail(e.getMessage()))
|
||||||
);
|
);
|
||||||
|
|
||||||
ArgumentCaptor<CreateIndexRequest> createIndexRequestCaptor = ArgumentCaptor.forClass(CreateIndexRequest.class);
|
ArgumentCaptor<CreateIndexRequest> createIndexRequestCaptor = ArgumentCaptor.forClass(CreateIndexRequest.class);
|
||||||
verify(client).execute(eq(CreateIndexAction.INSTANCE), createIndexRequestCaptor.capture(), any());
|
verify(client).execute(eq(CreateIndexAction.INSTANCE), createIndexRequestCaptor.capture(), any());
|
||||||
|
verify(client, atLeastOnce()).threadPool();
|
||||||
verifyNoMoreInteractions(client);
|
verifyNoMoreInteractions(client);
|
||||||
|
|
||||||
CreateIndexRequest createIndexRequest = createIndexRequestCaptor.getValue();
|
CreateIndexRequest createIndexRequest = createIndexRequestCaptor.getValue();
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue