Revert "Restrict Connector APIs to manage/monitor_connector privileges (#119389)" (#119833)

This reverts commit c88eef308c.
This commit is contained in:
Jedr Blaszyk 2025-01-09 11:02:51 +01:00 committed by GitHub
parent 996a4f8e7d
commit e0cefb8ff0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
37 changed files with 230 additions and 230 deletions

View file

@ -1,5 +0,0 @@
pr: 119389
summary: Restrict Connector APIs to manage/monitor_connector privileges
area: Extract&Transform
type: feature
issues: []

View file

@ -4,12 +4,13 @@ admin:
- manage_behavioral_analytics - manage_behavioral_analytics
- manage - manage
- monitor - monitor
- manage_connector
indices: indices:
- names: [ - names: [
# indices and search applications # indices and search applications
"test-*", "test-*",
"another-test-search-application", "another-test-search-application",
".elastic-connectors-v1",
".elastic-connectors-sync-jobs-v1"
] ]
privileges: [ "manage", "write", "read" ] privileges: [ "manage", "write", "read" ]
@ -19,7 +20,6 @@ user:
- manage_api_key - manage_api_key
- read_connector_secrets - read_connector_secrets
- write_connector_secrets - write_connector_secrets
- monitor_connector
indices: indices:
- names: [ - names: [
"test-index1", "test-index1",
@ -27,7 +27,9 @@ user:
"test-search-application-1", "test-search-application-1",
"test-search-application-with-aggs", "test-search-application-with-aggs",
"test-search-application-with-list", "test-search-application-with-list",
"test-search-application-with-list-invalid" "test-search-application-with-list-invalid",
".elastic-connectors-v1",
".elastic-connectors-sync-jobs-v1"
] ]
privileges: [ "read" ] privileges: [ "read" ]

View file

@ -24,7 +24,6 @@ import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.OriginSettingClient;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.BoolQueryBuilder;
@ -78,15 +77,13 @@ import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.application.connector.ConnectorFiltering.fromXContentBytesConnectorFiltering; import static org.elasticsearch.xpack.application.connector.ConnectorFiltering.fromXContentBytesConnectorFiltering;
import static org.elasticsearch.xpack.application.connector.ConnectorFiltering.sortFilteringRulesByOrder; import static org.elasticsearch.xpack.application.connector.ConnectorFiltering.sortFilteringRulesByOrder;
import static org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry.MANAGED_CONNECTOR_INDEX_PREFIX; import static org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry.MANAGED_CONNECTOR_INDEX_PREFIX;
import static org.elasticsearch.xpack.core.ClientHelper.CONNECTORS_ORIGIN;
/** /**
* A service that manages persistent {@link Connector} configurations. * A service that manages persistent {@link Connector} configurations.
*/ */
public class ConnectorIndexService { public class ConnectorIndexService {
// The client to interact with the system index (internal user). private final Client client;
private final Client clientWithOrigin;
public static final String CONNECTOR_INDEX_NAME = ConnectorTemplateRegistry.CONNECTOR_INDEX_NAME_PATTERN; public static final String CONNECTOR_INDEX_NAME = ConnectorTemplateRegistry.CONNECTOR_INDEX_NAME_PATTERN;
@ -94,7 +91,7 @@ public class ConnectorIndexService {
* @param client A client for executing actions on the connector index * @param client A client for executing actions on the connector index
*/ */
public ConnectorIndexService(Client client) { public ConnectorIndexService(Client client) {
this.clientWithOrigin = new OriginSettingClient(client, CONNECTORS_ORIGIN); this.client = client;
} }
/** /**
@ -140,7 +137,7 @@ public class ConnectorIndexService {
indexRequest = indexRequest.id(connectorId); indexRequest = indexRequest.id(connectorId);
} }
clientWithOrigin.index( client.index(
indexRequest, indexRequest,
listener.delegateFailureAndWrap( listener.delegateFailureAndWrap(
(ll, indexResponse) -> ll.onResponse( (ll, indexResponse) -> ll.onResponse(
@ -204,7 +201,7 @@ public class ConnectorIndexService {
try { try {
final GetRequest getRequest = new GetRequest(CONNECTOR_INDEX_NAME).id(connectorId).realtime(true); final GetRequest getRequest = new GetRequest(CONNECTOR_INDEX_NAME).id(connectorId).realtime(true);
clientWithOrigin.get(getRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, getResponse) -> { client.get(getRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, getResponse) -> {
if (getResponse.isExists() == false) { if (getResponse.isExists() == false) {
l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId))); l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return; return;
@ -238,23 +235,17 @@ public class ConnectorIndexService {
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
try { try {
clientWithOrigin.delete( client.delete(deleteRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, deleteResponse) -> {
deleteRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, deleteResponse) -> {
if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId))); l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return; return;
} }
if (shouldDeleteSyncJobs) { if (shouldDeleteSyncJobs) {
new ConnectorSyncJobIndexService(clientWithOrigin).deleteAllSyncJobsByConnectorId( new ConnectorSyncJobIndexService(client).deleteAllSyncJobsByConnectorId(connectorId, l.map(r -> deleteResponse));
connectorId,
l.map(r -> deleteResponse)
);
} else { } else {
l.onResponse(deleteResponse); l.onResponse(deleteResponse);
} }
}) }));
);
} catch (Exception e) { } catch (Exception e) {
listener.onFailure(e); listener.onFailure(e);
} }
@ -288,7 +279,7 @@ public class ConnectorIndexService {
.fetchSource(true) .fetchSource(true)
.sort(Connector.INDEX_NAME_FIELD.getPreferredName(), SortOrder.ASC); .sort(Connector.INDEX_NAME_FIELD.getPreferredName(), SortOrder.ASC);
final SearchRequest req = new SearchRequest(CONNECTOR_INDEX_NAME).source(source); final SearchRequest req = new SearchRequest(CONNECTOR_INDEX_NAME).source(source);
clientWithOrigin.search(req, new ActionListener<>() { client.search(req, new ActionListener<>() {
@Override @Override
public void onResponse(SearchResponse searchResponse) { public void onResponse(SearchResponse searchResponse) {
try { try {
@ -463,7 +454,7 @@ public class ConnectorIndexService {
return; return;
} }
clientWithOrigin.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, l, (ll, updateResponse) -> { client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, l, (ll, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
ll.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId))); ll.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return; return;
@ -500,16 +491,13 @@ public class ConnectorIndexService {
} }
}) })
); );
clientWithOrigin.update( client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId))); l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return; return;
} }
l.onResponse(updateResponse); l.onResponse(updateResponse);
}) }));
);
} catch (Exception e) { } catch (Exception e) {
listener.onFailure(e); listener.onFailure(e);
} }
@ -531,16 +519,13 @@ public class ConnectorIndexService {
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(request.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS)) .source(request.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS))
); );
clientWithOrigin.update( client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId))); l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return; return;
} }
l.onResponse(updateResponse); l.onResponse(updateResponse);
}) }));
);
} catch (Exception e) { } catch (Exception e) {
listener.onFailure(e); listener.onFailure(e);
} }
@ -561,16 +546,13 @@ public class ConnectorIndexService {
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(Map.of(Connector.FILTERING_FIELD.getPreferredName(), filtering)) .source(Map.of(Connector.FILTERING_FIELD.getPreferredName(), filtering))
); );
clientWithOrigin.update( client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId))); l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return; return;
} }
l.onResponse(updateResponse); l.onResponse(updateResponse);
}) }));
);
} catch (Exception e) { } catch (Exception e) {
listener.onFailure(e); listener.onFailure(e);
} }
@ -591,16 +573,13 @@ public class ConnectorIndexService {
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(Map.of(Connector.FEATURES_FIELD.getPreferredName(), features)) .source(Map.of(Connector.FEATURES_FIELD.getPreferredName(), features))
); );
clientWithOrigin.update( client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId))); l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return; return;
} }
l.onResponse(updateResponse); l.onResponse(updateResponse);
}) }));
);
} catch (Exception e) { } catch (Exception e) {
listener.onFailure(e); listener.onFailure(e);
} }
@ -656,16 +635,13 @@ public class ConnectorIndexService {
.source(Map.of(Connector.FILTERING_FIELD.getPreferredName(), List.of(connectorFilteringWithUpdatedDraft))) .source(Map.of(Connector.FILTERING_FIELD.getPreferredName(), List.of(connectorFilteringWithUpdatedDraft)))
); );
clientWithOrigin.update( client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (ll, updateResponse) -> {
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (ll, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
ll.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId))); ll.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return; return;
} }
ll.onResponse(updateResponse); ll.onResponse(updateResponse);
}) }));
);
})); }));
} catch (Exception e) { } catch (Exception e) {
@ -707,7 +683,7 @@ public class ConnectorIndexService {
.source(Map.of(Connector.FILTERING_FIELD.getPreferredName(), List.of(activatedConnectorFiltering))) .source(Map.of(Connector.FILTERING_FIELD.getPreferredName(), List.of(activatedConnectorFiltering)))
); );
clientWithOrigin.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, l, (ll, updateResponse) -> { client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, l, (ll, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
ll.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId))); ll.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return; return;
@ -764,7 +740,7 @@ public class ConnectorIndexService {
.source(Map.of(Connector.FILTERING_FIELD.getPreferredName(), List.of(activatedConnectorFiltering))) .source(Map.of(Connector.FILTERING_FIELD.getPreferredName(), List.of(activatedConnectorFiltering)))
); );
clientWithOrigin.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, l, (ll, updateResponse) -> { client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, l, (ll, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
ll.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId))); ll.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return; return;
@ -792,16 +768,13 @@ public class ConnectorIndexService {
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(Map.of(Connector.LAST_SEEN_FIELD.getPreferredName(), Instant.now())) .source(Map.of(Connector.LAST_SEEN_FIELD.getPreferredName(), Instant.now()))
); );
clientWithOrigin.update( client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId))); l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return; return;
} }
l.onResponse(updateResponse); l.onResponse(updateResponse);
}) }));
);
} catch (Exception e) { } catch (Exception e) {
listener.onFailure(e); listener.onFailure(e);
} }
@ -822,16 +795,13 @@ public class ConnectorIndexService {
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(request.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS)) .source(request.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS))
); );
clientWithOrigin.update( client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId))); l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return; return;
} }
l.onResponse(updateResponse); l.onResponse(updateResponse);
}) }));
);
} catch (Exception e) { } catch (Exception e) {
listener.onFailure(e); listener.onFailure(e);
} }
@ -895,16 +865,13 @@ public class ConnectorIndexService {
) )
) )
); );
clientWithOrigin.update( client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (ll, updateResponse) -> {
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (ll, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
ll.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId))); ll.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return; return;
} }
ll.onResponse(updateResponse); ll.onResponse(updateResponse);
}) }));
);
})); }));
} catch (Exception e) { } catch (Exception e) {
listener.onFailure(e); listener.onFailure(e);
@ -927,16 +894,13 @@ public class ConnectorIndexService {
.source(Map.of(Connector.PIPELINE_FIELD.getPreferredName(), request.getPipeline())) .source(Map.of(Connector.PIPELINE_FIELD.getPreferredName(), request.getPipeline()))
.source(request.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS)) .source(request.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS))
); );
clientWithOrigin.update( client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId))); l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return; return;
} }
l.onResponse(updateResponse); l.onResponse(updateResponse);
}) }));
);
} catch (Exception e) { } catch (Exception e) {
listener.onFailure(e); listener.onFailure(e);
} }
@ -995,7 +959,7 @@ public class ConnectorIndexService {
} }
}) })
); );
clientWithOrigin.update( client.update(
updateRequest, updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (lll, updateResponse) -> { new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (lll, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
@ -1028,16 +992,13 @@ public class ConnectorIndexService {
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(Map.of(Connector.SCHEDULING_FIELD.getPreferredName(), request.getScheduling())) .source(Map.of(Connector.SCHEDULING_FIELD.getPreferredName(), request.getScheduling()))
); );
clientWithOrigin.update( client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId))); l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return; return;
} }
l.onResponse(updateResponse); l.onResponse(updateResponse);
}) }));
);
} catch (Exception e) { } catch (Exception e) {
listener.onFailure(e); listener.onFailure(e);
} }
@ -1073,7 +1034,7 @@ public class ConnectorIndexService {
) )
); );
clientWithOrigin.update( client.update(
updateRequest, updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (updateListener, updateResponse) -> { new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (updateListener, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
@ -1116,7 +1077,7 @@ public class ConnectorIndexService {
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(Map.of(Connector.STATUS_FIELD.getPreferredName(), request.getStatus())) .source(Map.of(Connector.STATUS_FIELD.getPreferredName(), request.getStatus()))
); );
clientWithOrigin.update( client.update(
updateRequest, updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (updateListener, updateResponse) -> { new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (updateListener, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
@ -1144,16 +1105,13 @@ public class ConnectorIndexService {
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(request.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS)) .source(request.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS))
); );
clientWithOrigin.update( client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId))); l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return; return;
} }
l.onResponse(updateResponse); l.onResponse(updateResponse);
}) }));
);
} catch (Exception e) { } catch (Exception e) {
listener.onFailure(e); listener.onFailure(e);
} }
@ -1223,7 +1181,7 @@ public class ConnectorIndexService {
final SearchSourceBuilder searchSource = new SearchSourceBuilder().query(boolFilterQueryBuilder); final SearchSourceBuilder searchSource = new SearchSourceBuilder().query(boolFilterQueryBuilder);
final SearchRequest searchRequest = new SearchRequest(CONNECTOR_INDEX_NAME).source(searchSource); final SearchRequest searchRequest = new SearchRequest(CONNECTOR_INDEX_NAME).source(searchSource);
clientWithOrigin.search(searchRequest, new ActionListener<>() { client.search(searchRequest, new ActionListener<>() {
@Override @Override
public void onResponse(SearchResponse searchResponse) { public void onResponse(SearchResponse searchResponse) {
boolean indexNameIsInUse = searchResponse.getHits().getTotalHits().value() > 0L; boolean indexNameIsInUse = searchResponse.getHits().getTotalHits().value() > 0L;

View file

@ -9,9 +9,12 @@ package org.elasticsearch.xpack.application.connector.action;
import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService; import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.indices.InvalidIndexNameException; import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry;
import java.io.IOException; import java.io.IOException;
@ -19,9 +22,10 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry.MANAGED_CONNECTOR_INDEX_PREFIX; import static org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry.MANAGED_CONNECTOR_INDEX_PREFIX;
/** /**
* Abstract base class for action requests targeting the connectors index. * Abstract base class for action requests targeting the connectors index. Implements {@link org.elasticsearch.action.IndicesRequest}
* to ensure index-level privilege support. This class defines the connectors index as the target for all derived action requests.
*/ */
public abstract class ConnectorActionRequest extends ActionRequest { public abstract class ConnectorActionRequest extends ActionRequest implements IndicesRequest {
public ConnectorActionRequest() { public ConnectorActionRequest() {
super(); super();
@ -74,4 +78,14 @@ public abstract class ConnectorActionRequest extends ActionRequest {
} }
return validationException; return validationException;
} }
@Override
public String[] indices() {
return new String[] { ConnectorTemplateRegistry.CONNECTOR_INDEX_NAME_PATTERN };
}
@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.lenientExpandHidden();
}
} }

View file

@ -18,6 +18,7 @@ import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry;
import java.io.IOException; import java.io.IOException;
import java.util.Objects; import java.util.Objects;
@ -27,7 +28,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg
public class DeleteConnectorAction { public class DeleteConnectorAction {
public static final String NAME = "cluster:admin/xpack/connector/delete"; public static final String NAME = "indices:data/write/xpack/connector/delete";
public static final ActionType<AcknowledgedResponse> INSTANCE = new ActionType<>(NAME); public static final ActionType<AcknowledgedResponse> INSTANCE = new ActionType<>(NAME);
private DeleteConnectorAction() {/* no instances */} private DeleteConnectorAction() {/* no instances */}
@ -70,6 +71,14 @@ public class DeleteConnectorAction {
return deleteSyncJobs; return deleteSyncJobs;
} }
@Override
public String[] indices() {
// When deleting a connector, corresponding sync jobs can also be deleted
return new String[] {
ConnectorTemplateRegistry.CONNECTOR_SYNC_JOBS_INDEX_NAME_PATTERN,
ConnectorTemplateRegistry.CONNECTOR_INDEX_NAME_PATTERN };
}
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);

View file

@ -28,7 +28,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg
public class GetConnectorAction { public class GetConnectorAction {
public static final String NAME = "cluster:admin/xpack/connector/get"; public static final String NAME = "indices:data/read/xpack/connector/get";
public static final ActionType<GetConnectorAction.Response> INSTANCE = new ActionType<>(NAME); public static final ActionType<GetConnectorAction.Response> INSTANCE = new ActionType<>(NAME);
private GetConnectorAction() {/* no instances */} private GetConnectorAction() {/* no instances */}

View file

@ -34,7 +34,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstr
public class ListConnectorAction { public class ListConnectorAction {
public static final String NAME = "cluster:admin/xpack/connector/list"; public static final String NAME = "indices:data/read/xpack/connector/list";
public static final ActionType<ListConnectorAction.Response> INSTANCE = new ActionType<>(NAME); public static final ActionType<ListConnectorAction.Response> INSTANCE = new ActionType<>(NAME);
private ListConnectorAction() {/* no instances */} private ListConnectorAction() {/* no instances */}

View file

@ -25,7 +25,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstr
public class PostConnectorAction { public class PostConnectorAction {
public static final String NAME = "cluster:admin/xpack/connector/post"; public static final String NAME = "indices:data/write/xpack/connector/post";
public static final ActionType<ConnectorCreateActionResponse> INSTANCE = new ActionType<>(NAME); public static final ActionType<ConnectorCreateActionResponse> INSTANCE = new ActionType<>(NAME);
private PostConnectorAction() {/* no instances */} private PostConnectorAction() {/* no instances */}

View file

@ -9,6 +9,7 @@ package org.elasticsearch.xpack.application.connector.action;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType; import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
@ -26,12 +27,12 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstr
public class PutConnectorAction { public class PutConnectorAction {
public static final String NAME = "cluster:admin/xpack/connector/put"; public static final String NAME = "indices:data/write/xpack/connector/put";
public static final ActionType<ConnectorCreateActionResponse> INSTANCE = new ActionType<>(NAME); public static final ActionType<ConnectorCreateActionResponse> INSTANCE = new ActionType<>(NAME);
private PutConnectorAction() {/* no instances */} private PutConnectorAction() {/* no instances */}
public static class Request extends ConnectorActionRequest implements ToXContentObject { public static class Request extends ConnectorActionRequest implements IndicesRequest, ToXContentObject {
@Nullable @Nullable
private final String connectorId; private final String connectorId;

View file

@ -22,7 +22,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
public class UpdateConnectorActiveFilteringAction { public class UpdateConnectorActiveFilteringAction {
public static final String NAME = "cluster:admin/xpack/connector/update_filtering/activate"; public static final String NAME = "indices:data/write/xpack/connector/update_filtering/activate";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME); public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private UpdateConnectorActiveFilteringAction() {/* no instances */} private UpdateConnectorActiveFilteringAction() {/* no instances */}

View file

@ -27,7 +27,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstr
public class UpdateConnectorApiKeyIdAction { public class UpdateConnectorApiKeyIdAction {
public static final String NAME = "cluster:admin/xpack/connector/update_api_key_id"; public static final String NAME = "indices:data/write/xpack/connector/update_api_key_id";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME); public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private UpdateConnectorApiKeyIdAction() {/* no instances */} private UpdateConnectorApiKeyIdAction() {/* no instances */}

View file

@ -32,7 +32,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstr
public class UpdateConnectorConfigurationAction { public class UpdateConnectorConfigurationAction {
public static final String NAME = "cluster:admin/xpack/connector/update_configuration"; public static final String NAME = "indices:data/write/xpack/connector/update_configuration";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME); public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private UpdateConnectorConfigurationAction() {/* no instances */} private UpdateConnectorConfigurationAction() {/* no instances */}

View file

@ -27,7 +27,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg
public class UpdateConnectorErrorAction { public class UpdateConnectorErrorAction {
public static final String NAME = "cluster:admin/xpack/connector/update_error"; public static final String NAME = "indices:data/write/xpack/connector/update_error";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME); public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private UpdateConnectorErrorAction() {/* no instances */} private UpdateConnectorErrorAction() {/* no instances */}

View file

@ -27,7 +27,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstr
public class UpdateConnectorFeaturesAction { public class UpdateConnectorFeaturesAction {
public static final String NAME = "cluster:admin/xpack/connector/update_features"; public static final String NAME = "indices:data/write/xpack/connector/update_features";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME); public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private UpdateConnectorFeaturesAction() {/* no instances */} private UpdateConnectorFeaturesAction() {/* no instances */}

View file

@ -32,7 +32,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstr
public class UpdateConnectorFilteringAction { public class UpdateConnectorFilteringAction {
public static final String NAME = "cluster:admin/xpack/connector/update_filtering"; public static final String NAME = "indices:data/write/xpack/connector/update_filtering";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME); public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private UpdateConnectorFilteringAction() {/* no instances */} private UpdateConnectorFilteringAction() {/* no instances */}

View file

@ -27,7 +27,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg
public class UpdateConnectorFilteringValidationAction { public class UpdateConnectorFilteringValidationAction {
public static final String NAME = "cluster:admin/xpack/connector/update_filtering/draft_validation"; public static final String NAME = "indices:data/write/xpack/connector/update_filtering/draft_validation";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME); public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private UpdateConnectorFilteringValidationAction() {/* no instances */} private UpdateConnectorFilteringValidationAction() {/* no instances */}

View file

@ -27,7 +27,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg
public class UpdateConnectorIndexNameAction { public class UpdateConnectorIndexNameAction {
public static final String NAME = "cluster:admin/xpack/connector/update_index_name"; public static final String NAME = "indices:data/write/xpack/connector/update_index_name";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME); public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private UpdateConnectorIndexNameAction() {/* no instances */} private UpdateConnectorIndexNameAction() {/* no instances */}

View file

@ -23,7 +23,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
public class UpdateConnectorLastSeenAction { public class UpdateConnectorLastSeenAction {
public static final String NAME = "cluster:admin/xpack/connector/update_last_seen"; public static final String NAME = "indices:data/write/xpack/connector/update_last_seen";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME); public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private UpdateConnectorLastSeenAction() {/* no instances */} private UpdateConnectorLastSeenAction() {/* no instances */}

View file

@ -32,7 +32,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstr
public class UpdateConnectorLastSyncStatsAction { public class UpdateConnectorLastSyncStatsAction {
public static final String NAME = "cluster:admin/xpack/connector/update_last_sync_stats"; public static final String NAME = "indices:data/write/xpack/connector/update_last_sync_stats";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME); public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private UpdateConnectorLastSyncStatsAction() {/* no instances */} private UpdateConnectorLastSyncStatsAction() {/* no instances */}

View file

@ -27,7 +27,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstr
public class UpdateConnectorNameAction { public class UpdateConnectorNameAction {
public static final String NAME = "cluster:admin/xpack/connector/update_name"; public static final String NAME = "indices:data/write/xpack/connector/update_name";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME); public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private UpdateConnectorNameAction() {/* no instances */} private UpdateConnectorNameAction() {/* no instances */}

View file

@ -26,7 +26,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg
public class UpdateConnectorNativeAction { public class UpdateConnectorNativeAction {
public static final String NAME = "cluster:admin/xpack/connector/update_native"; public static final String NAME = "indices:data/write/xpack/connector/update_native";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME); public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private UpdateConnectorNativeAction() {/* no instances */} private UpdateConnectorNativeAction() {/* no instances */}

View file

@ -27,7 +27,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg
public class UpdateConnectorPipelineAction { public class UpdateConnectorPipelineAction {
public static final String NAME = "cluster:admin/xpack/connector/update_pipeline"; public static final String NAME = "indices:data/write/xpack/connector/update_pipeline";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME); public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private UpdateConnectorPipelineAction() {/* no instances */} private UpdateConnectorPipelineAction() {/* no instances */}

View file

@ -32,7 +32,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg
public class UpdateConnectorSchedulingAction { public class UpdateConnectorSchedulingAction {
public static final String NAME = "cluster:admin/xpack/connector/update_scheduling"; public static final String NAME = "indices:data/write/xpack/connector/update_scheduling";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME); public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private UpdateConnectorSchedulingAction() {/* no instances */} private UpdateConnectorSchedulingAction() {/* no instances */}

View file

@ -26,7 +26,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg
public class UpdateConnectorServiceTypeAction { public class UpdateConnectorServiceTypeAction {
public static final String NAME = "cluster:admin/xpack/connector/update_service_type"; public static final String NAME = "indices:data/write/xpack/connector/update_service_type";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME); public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private UpdateConnectorServiceTypeAction() {/* no instances */} private UpdateConnectorServiceTypeAction() {/* no instances */}

View file

@ -28,7 +28,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstr
public class UpdateConnectorStatusAction { public class UpdateConnectorStatusAction {
public static final String NAME = "cluster:admin/xpack/connector/update_status"; public static final String NAME = "indices:data/write/xpack/connector/update_status";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME); public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
public UpdateConnectorStatusAction() {/* no instances */} public UpdateConnectorStatusAction() {/* no instances */}

View file

@ -28,7 +28,6 @@ import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.OriginSettingClient;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.index.engine.DocumentMissingException;
@ -69,7 +68,6 @@ import java.util.stream.Stream;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.application.connector.ConnectorIndexService.CONNECTOR_INDEX_NAME; import static org.elasticsearch.xpack.application.connector.ConnectorIndexService.CONNECTOR_INDEX_NAME;
import static org.elasticsearch.xpack.core.ClientHelper.CONNECTORS_ORIGIN;
/** /**
* A service that manages persistent {@link ConnectorSyncJob} configurations. * A service that manages persistent {@link ConnectorSyncJob} configurations.
@ -78,8 +76,7 @@ public class ConnectorSyncJobIndexService {
private static final Long ZERO = 0L; private static final Long ZERO = 0L;
// The client to interact with the system index (internal user). private final Client client;
private final Client clientWithOrigin;
public static final String CONNECTOR_SYNC_JOB_INDEX_NAME = ConnectorTemplateRegistry.CONNECTOR_SYNC_JOBS_INDEX_NAME_PATTERN; public static final String CONNECTOR_SYNC_JOB_INDEX_NAME = ConnectorTemplateRegistry.CONNECTOR_SYNC_JOBS_INDEX_NAME_PATTERN;
@ -87,7 +84,7 @@ public class ConnectorSyncJobIndexService {
* @param client A client for executing actions on the connectors sync jobs index. * @param client A client for executing actions on the connectors sync jobs index.
*/ */
public ConnectorSyncJobIndexService(Client client) { public ConnectorSyncJobIndexService(Client client) {
this.clientWithOrigin = new OriginSettingClient(client, CONNECTORS_ORIGIN); this.client = client;
} }
/** /**
@ -152,7 +149,7 @@ public class ConnectorSyncJobIndexService {
indexRequest.source(syncJob.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS)); indexRequest.source(syncJob.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS));
clientWithOrigin.index( client.index(
indexRequest, indexRequest,
l.delegateFailureAndWrap( l.delegateFailureAndWrap(
(ll, indexResponse) -> ll.onResponse(new PostConnectorSyncJobAction.Response(indexResponse.getId())) (ll, indexResponse) -> ll.onResponse(new PostConnectorSyncJobAction.Response(indexResponse.getId()))
@ -178,7 +175,7 @@ public class ConnectorSyncJobIndexService {
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
try { try {
clientWithOrigin.delete( client.delete(
deleteRequest, deleteRequest,
new DelegatingIndexNotFoundOrDocumentMissingActionListener<>(connectorSyncJobId, listener, (l, deleteResponse) -> { new DelegatingIndexNotFoundOrDocumentMissingActionListener<>(connectorSyncJobId, listener, (l, deleteResponse) -> {
if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
@ -208,7 +205,7 @@ public class ConnectorSyncJobIndexService {
).doc(Map.of(ConnectorSyncJob.LAST_SEEN_FIELD.getPreferredName(), newLastSeen)); ).doc(Map.of(ConnectorSyncJob.LAST_SEEN_FIELD.getPreferredName(), newLastSeen));
try { try {
clientWithOrigin.update( client.update(
updateRequest, updateRequest,
new DelegatingIndexNotFoundOrDocumentMissingActionListener<>(connectorSyncJobId, listener, (l, updateResponse) -> { new DelegatingIndexNotFoundOrDocumentMissingActionListener<>(connectorSyncJobId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { if (updateResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
@ -233,7 +230,7 @@ public class ConnectorSyncJobIndexService {
final GetRequest getRequest = new GetRequest(CONNECTOR_SYNC_JOB_INDEX_NAME).id(connectorSyncJobId).realtime(true); final GetRequest getRequest = new GetRequest(CONNECTOR_SYNC_JOB_INDEX_NAME).id(connectorSyncJobId).realtime(true);
try { try {
clientWithOrigin.get( client.get(
getRequest, getRequest,
new DelegatingIndexNotFoundOrDocumentMissingActionListener<>(connectorSyncJobId, listener, (l, getResponse) -> { new DelegatingIndexNotFoundOrDocumentMissingActionListener<>(connectorSyncJobId, listener, (l, getResponse) -> {
if (getResponse.isExists() == false) { if (getResponse.isExists() == false) {
@ -309,7 +306,7 @@ public class ConnectorSyncJobIndexService {
WriteRequest.RefreshPolicy.IMMEDIATE WriteRequest.RefreshPolicy.IMMEDIATE
).doc(syncJobFieldsToUpdate); ).doc(syncJobFieldsToUpdate);
clientWithOrigin.update( client.update(
updateRequest, updateRequest,
new DelegatingIndexNotFoundOrDocumentMissingActionListener<>( new DelegatingIndexNotFoundOrDocumentMissingActionListener<>(
connectorSyncJobId, connectorSyncJobId,
@ -358,7 +355,7 @@ public class ConnectorSyncJobIndexService {
final SearchRequest searchRequest = new SearchRequest(CONNECTOR_SYNC_JOB_INDEX_NAME).source(searchSource); final SearchRequest searchRequest = new SearchRequest(CONNECTOR_SYNC_JOB_INDEX_NAME).source(searchSource);
clientWithOrigin.search(searchRequest, new ActionListener<>() { client.search(searchRequest, new ActionListener<>() {
@Override @Override
public void onResponse(SearchResponse searchResponse) { public void onResponse(SearchResponse searchResponse) {
try { try {
@ -477,7 +474,7 @@ public class ConnectorSyncJobIndexService {
).doc(fieldsToUpdate); ).doc(fieldsToUpdate);
try { try {
clientWithOrigin.update( client.update(
updateRequest, updateRequest,
new DelegatingIndexNotFoundOrDocumentMissingActionListener<>(syncJobId, listener, (l, updateResponse) -> { new DelegatingIndexNotFoundOrDocumentMissingActionListener<>(syncJobId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { if (updateResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
@ -504,7 +501,7 @@ public class ConnectorSyncJobIndexService {
final GetRequest request = new GetRequest(CONNECTOR_INDEX_NAME, connectorId); final GetRequest request = new GetRequest(CONNECTOR_INDEX_NAME, connectorId);
clientWithOrigin.get(request, new ActionListener<>() { client.get(request, new ActionListener<>() {
@Override @Override
public void onResponse(GetResponse response) { public void onResponse(GetResponse response) {
final boolean connectorDoesNotExist = response.isExists() == false; final boolean connectorDoesNotExist = response.isExists() == false;
@ -597,7 +594,7 @@ public class ConnectorSyncJobIndexService {
) )
); );
clientWithOrigin.update( client.update(
updateRequest, updateRequest,
new DelegatingIndexNotFoundOrDocumentMissingActionListener<>( new DelegatingIndexNotFoundOrDocumentMissingActionListener<>(
connectorSyncJobId, connectorSyncJobId,
@ -632,7 +629,7 @@ public class ConnectorSyncJobIndexService {
) )
).setRefresh(true).setIndicesOptions(IndicesOptions.fromOptions(true, true, false, false)); ).setRefresh(true).setIndicesOptions(IndicesOptions.fromOptions(true, true, false, false));
clientWithOrigin.execute(DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener.delegateFailureAndWrap((l, r) -> { client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener.delegateFailureAndWrap((l, r) -> {
final List<BulkItemResponse.Failure> bulkDeleteFailures = r.getBulkFailures(); final List<BulkItemResponse.Failure> bulkDeleteFailures = r.getBulkFailures();
if (bulkDeleteFailures.isEmpty() == false) { if (bulkDeleteFailures.isEmpty() == false) {
l.onFailure( l.onFailure(
@ -684,7 +681,7 @@ public class ConnectorSyncJobIndexService {
WriteRequest.RefreshPolicy.IMMEDIATE WriteRequest.RefreshPolicy.IMMEDIATE
).doc(document); ).doc(document);
clientWithOrigin.update( client.update(
updateRequest, updateRequest,
new DelegatingIndexNotFoundOrDocumentMissingActionListener<>( new DelegatingIndexNotFoundOrDocumentMissingActionListener<>(
connectorSyncJobId, connectorSyncJobId,

View file

@ -28,7 +28,7 @@ import static org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyn
public class CancelConnectorSyncJobAction { public class CancelConnectorSyncJobAction {
public static final String NAME = "cluster:admin/xpack/connector/sync_job/cancel"; public static final String NAME = "indices:data/write/xpack/connector/sync_job/cancel";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<ConnectorUpdateActionResponse>(NAME); public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<ConnectorUpdateActionResponse>(NAME);
private CancelConnectorSyncJobAction() {/* no instances */} private CancelConnectorSyncJobAction() {/* no instances */}

View file

@ -28,7 +28,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg
public class CheckInConnectorSyncJobAction { public class CheckInConnectorSyncJobAction {
public static final String NAME = "cluster:admin/xpack/connector/sync_job/check_in"; public static final String NAME = "indices:data/write/xpack/connector/sync_job/check_in";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME); public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private CheckInConnectorSyncJobAction() {/* no instances */} private CheckInConnectorSyncJobAction() {/* no instances */}

View file

@ -31,7 +31,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstr
public class ClaimConnectorSyncJobAction { public class ClaimConnectorSyncJobAction {
public static final ParseField CONNECTOR_SYNC_JOB_ID_FIELD = new ParseField("connector_sync_job_id"); public static final ParseField CONNECTOR_SYNC_JOB_ID_FIELD = new ParseField("connector_sync_job_id");
public static final String NAME = "cluster:admin/xpack/connector/sync_job/claim"; public static final String NAME = "indices:data/write/xpack/connector/sync_job/claim";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME); public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private ClaimConnectorSyncJobAction() {/* no instances */} private ClaimConnectorSyncJobAction() {/* no instances */}

View file

@ -8,14 +8,19 @@
package org.elasticsearch.xpack.application.connector.syncjob.action; package org.elasticsearch.xpack.application.connector.syncjob.action;
import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry;
import java.io.IOException; import java.io.IOException;
/** /**
* Abstract base class for action requests targeting the connector sync job index. * Abstract base class for action requests targeting the connector sync job index.
* Implements {@link org.elasticsearch.action.IndicesRequest} to ensure index-level privilege support.
* This class defines the connectors sync job index as the target for all derived action requests.
*/ */
public abstract class ConnectorSyncJobActionRequest extends ActionRequest { public abstract class ConnectorSyncJobActionRequest extends ActionRequest implements IndicesRequest {
public ConnectorSyncJobActionRequest() { public ConnectorSyncJobActionRequest() {
super(); super();
@ -24,4 +29,14 @@ public abstract class ConnectorSyncJobActionRequest extends ActionRequest {
public ConnectorSyncJobActionRequest(StreamInput in) throws IOException { public ConnectorSyncJobActionRequest(StreamInput in) throws IOException {
super(in); super(in);
} }
@Override
public String[] indices() {
return new String[] { ConnectorTemplateRegistry.CONNECTOR_SYNC_JOBS_INDEX_NAME_PATTERN };
}
@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.lenientExpandHidden();
}
} }

View file

@ -28,7 +28,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg
public class DeleteConnectorSyncJobAction { public class DeleteConnectorSyncJobAction {
public static final String NAME = "cluster:admin/xpack/connector/sync_job/delete"; public static final String NAME = "indices:data/write/xpack/connector/sync_job/delete";
public static final ActionType<AcknowledgedResponse> INSTANCE = new ActionType<>(NAME); public static final ActionType<AcknowledgedResponse> INSTANCE = new ActionType<>(NAME);
private DeleteConnectorSyncJobAction() {/* no instances */} private DeleteConnectorSyncJobAction() {/* no instances */}

View file

@ -29,7 +29,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg
public class GetConnectorSyncJobAction { public class GetConnectorSyncJobAction {
public static final String NAME = "cluster:admin/xpack/connector/sync_job/get"; public static final String NAME = "indices:data/read/xpack/connector/sync_job/get";
public static final ActionType<GetConnectorSyncJobAction.Response> INSTANCE = new ActionType<>(NAME); public static final ActionType<GetConnectorSyncJobAction.Response> INSTANCE = new ActionType<>(NAME);
private GetConnectorSyncJobAction() {/* no instances */} private GetConnectorSyncJobAction() {/* no instances */}

View file

@ -33,7 +33,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstr
public class ListConnectorSyncJobsAction { public class ListConnectorSyncJobsAction {
public static final String NAME = "cluster:admin/xpack/connector/sync_job/list"; public static final String NAME = "indices:data/read/xpack/connector/sync_job/list";
public static final ActionType<ListConnectorSyncJobsAction.Response> INSTANCE = new ActionType<>(NAME); public static final ActionType<ListConnectorSyncJobsAction.Response> INSTANCE = new ActionType<>(NAME);
private ListConnectorSyncJobsAction() {/* no instances */} private ListConnectorSyncJobsAction() {/* no instances */}

View file

@ -18,6 +18,7 @@ import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xpack.application.connector.Connector; import org.elasticsearch.xpack.application.connector.Connector;
import org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry;
import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJob; import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJob;
import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJobTriggerMethod; import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJobTriggerMethod;
import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJobType; import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJobType;
@ -31,7 +32,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstr
public class PostConnectorSyncJobAction { public class PostConnectorSyncJobAction {
public static final String NAME = "cluster:admin/xpack/connector/sync_job/post"; public static final String NAME = "indices:data/write/xpack/connector/sync_job/post";
public static final ActionType<PostConnectorSyncJobAction.Response> INSTANCE = new ActionType<>(NAME); public static final ActionType<PostConnectorSyncJobAction.Response> INSTANCE = new ActionType<>(NAME);
private PostConnectorSyncJobAction() {/* no instances */} private PostConnectorSyncJobAction() {/* no instances */}
@ -139,6 +140,14 @@ public class PostConnectorSyncJobAction {
public int hashCode() { public int hashCode() {
return Objects.hash(id, jobType, triggerMethod); return Objects.hash(id, jobType, triggerMethod);
} }
@Override
public String[] indices() {
// Creating a new sync job requires reading from connector index
return new String[] {
ConnectorTemplateRegistry.CONNECTOR_SYNC_JOBS_INDEX_NAME_PATTERN,
ConnectorTemplateRegistry.CONNECTOR_INDEX_NAME_PATTERN };
}
} }
public static class Response extends ActionResponse implements ToXContentObject { public static class Response extends ActionResponse implements ToXContentObject {

View file

@ -28,7 +28,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg
public class UpdateConnectorSyncJobErrorAction { public class UpdateConnectorSyncJobErrorAction {
public static final String NAME = "cluster:admin/xpack/connector/sync_job/update_error"; public static final String NAME = "indices:data/write/xpack/connector/sync_job/update_error";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME); public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private UpdateConnectorSyncJobErrorAction() {/* no instances */} private UpdateConnectorSyncJobErrorAction() {/* no instances */}

View file

@ -35,7 +35,7 @@ import static org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyn
public class UpdateConnectorSyncJobIngestionStatsAction { public class UpdateConnectorSyncJobIngestionStatsAction {
public static final String NAME = "cluster:admin/xpack/connector/sync_job/update_stats"; public static final String NAME = "indices:data/write/xpack/connector/sync_job/update_stats";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME); public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private UpdateConnectorSyncJobIngestionStatsAction() {/* no instances */} private UpdateConnectorSyncJobIngestionStatsAction() {/* no instances */}

View file

@ -134,40 +134,40 @@ public class Constants {
"cluster:admin/xpack/ccr/auto_follow_pattern/put", "cluster:admin/xpack/ccr/auto_follow_pattern/put",
"cluster:admin/xpack/ccr/pause_follow", "cluster:admin/xpack/ccr/pause_follow",
"cluster:admin/xpack/ccr/resume_follow", "cluster:admin/xpack/ccr/resume_follow",
"cluster:admin/xpack/connector/delete", "indices:data/write/xpack/connector/delete",
"cluster:admin/xpack/connector/get", "indices:data/read/xpack/connector/get",
"cluster:admin/xpack/connector/list", "indices:data/read/xpack/connector/list",
"cluster:admin/xpack/connector/post", "indices:data/write/xpack/connector/post",
"cluster:admin/xpack/connector/put", "indices:data/write/xpack/connector/put",
"cluster:admin/xpack/connector/update_api_key_id", "indices:data/write/xpack/connector/update_api_key_id",
"cluster:admin/xpack/connector/update_configuration", "indices:data/write/xpack/connector/update_configuration",
"cluster:admin/xpack/connector/update_error", "indices:data/write/xpack/connector/update_error",
"cluster:admin/xpack/connector/update_features", "indices:data/write/xpack/connector/update_features",
"cluster:admin/xpack/connector/update_filtering", "indices:data/write/xpack/connector/update_filtering",
"cluster:admin/xpack/connector/update_filtering/activate", "indices:data/write/xpack/connector/update_filtering/activate",
"cluster:admin/xpack/connector/update_filtering/draft_validation", "indices:data/write/xpack/connector/update_filtering/draft_validation",
"cluster:admin/xpack/connector/update_index_name", "indices:data/write/xpack/connector/update_index_name",
"cluster:admin/xpack/connector/update_last_seen", "indices:data/write/xpack/connector/update_last_seen",
"cluster:admin/xpack/connector/update_last_sync_stats", "indices:data/write/xpack/connector/update_last_sync_stats",
"cluster:admin/xpack/connector/update_name", "indices:data/write/xpack/connector/update_name",
"cluster:admin/xpack/connector/update_native", "indices:data/write/xpack/connector/update_native",
"cluster:admin/xpack/connector/update_pipeline", "indices:data/write/xpack/connector/update_pipeline",
"cluster:admin/xpack/connector/update_scheduling", "indices:data/write/xpack/connector/update_scheduling",
"cluster:admin/xpack/connector/update_service_type", "indices:data/write/xpack/connector/update_service_type",
"cluster:admin/xpack/connector/update_status", "indices:data/write/xpack/connector/update_status",
"cluster:admin/xpack/connector/secret/delete", "cluster:admin/xpack/connector/secret/delete",
"cluster:admin/xpack/connector/secret/get", "cluster:admin/xpack/connector/secret/get",
"cluster:admin/xpack/connector/secret/post", "cluster:admin/xpack/connector/secret/post",
"cluster:admin/xpack/connector/secret/put", "cluster:admin/xpack/connector/secret/put",
"cluster:admin/xpack/connector/sync_job/cancel", "indices:data/write/xpack/connector/sync_job/cancel",
"cluster:admin/xpack/connector/sync_job/check_in", "indices:data/write/xpack/connector/sync_job/check_in",
"cluster:admin/xpack/connector/sync_job/claim", "indices:data/write/xpack/connector/sync_job/claim",
"cluster:admin/xpack/connector/sync_job/delete", "indices:data/write/xpack/connector/sync_job/delete",
"cluster:admin/xpack/connector/sync_job/get", "indices:data/read/xpack/connector/sync_job/get",
"cluster:admin/xpack/connector/sync_job/list", "indices:data/read/xpack/connector/sync_job/list",
"cluster:admin/xpack/connector/sync_job/post", "indices:data/write/xpack/connector/sync_job/post",
"cluster:admin/xpack/connector/sync_job/update_error", "indices:data/write/xpack/connector/sync_job/update_error",
"cluster:admin/xpack/connector/sync_job/update_stats", "indices:data/write/xpack/connector/sync_job/update_stats",
"cluster:admin/xpack/deprecation/info", "cluster:admin/xpack/deprecation/info",
"cluster:admin/xpack/deprecation/nodes/info", "cluster:admin/xpack/deprecation/nodes/info",
"cluster:admin/xpack/enrich/delete", "cluster:admin/xpack/enrich/delete",