Merge remote-tracking branch 'upstream-main/main' into merge-main-9-1-25

This commit is contained in:
Simon Cooper 2025-01-09 16:09:21 +00:00
commit 2e04ed09a9
239 changed files with 3069 additions and 1469 deletions

View file

@ -4,12 +4,13 @@ admin:
- manage_behavioral_analytics
- manage
- monitor
- manage_connector
indices:
- names: [
# indices and search applications
"test-*",
"another-test-search-application",
".elastic-connectors-v1",
".elastic-connectors-sync-jobs-v1"
]
privileges: [ "manage", "write", "read" ]
@ -19,7 +20,6 @@ user:
- manage_api_key
- read_connector_secrets
- write_connector_secrets
- monitor_connector
indices:
- names: [
"test-index1",
@ -27,7 +27,9 @@ user:
"test-search-application-1",
"test-search-application-with-aggs",
"test-search-application-with-list",
"test-search-application-with-list-invalid"
"test-search-application-with-list-invalid",
".elastic-connectors-v1",
".elastic-connectors-sync-jobs-v1"
]
privileges: [ "read" ]

View file

@ -218,6 +218,30 @@ setup:
body:
index_name: search-test
---
'Create Connector - Index name used by deleted connector':
- do:
connector.put:
connector_id: test-connector-1
body:
index_name: search-test
- match: { result: 'created' }
- do:
connector.delete:
connector_id: test-connector-1
- match: { acknowledged: true }
- do:
connector.put:
connector_id: test-connector-2
body:
index_name: search-test
- match: { result: 'created' }
---
'Create Connector - Without index attached':
- do:

View file

@ -71,7 +71,7 @@ setup:
- match: { results.1.language: "nl" }
---
"List Connector- with size":
"List Connector - with size":
- do:
connector.list:
size: 2
@ -106,7 +106,6 @@ setup:
- match: { count: 0 }
---
"List Connector - filter by index names":
- do:
@ -279,4 +278,145 @@ setup:
connector.list: { }
---
"List Connectors - Soft deleted connectors / no deleted":
- requires:
cluster_features: ["connector_soft_deletes"]
reason: Soft deletes were introduced in 9.0 release
- do:
connector.list:
include_deleted: true
- match: { count: 3 }
---
"List Connectors - Single soft deleted connector":
- requires:
cluster_features: ["connector_soft_deletes"]
reason: Soft deletes were introduced in 9.0 release
- do:
connector.delete:
connector_id: connector-a
- do:
connector.list: {}
- match: { count: 2 }
- do:
connector.list:
include_deleted: true
- match: { count: 3 }
---
"List Connectors - Soft deleted connectors":
- requires:
cluster_features: ["connector_soft_deletes"]
reason: Soft deletes were introduced in 9.0 release
- do:
connector.delete:
connector_id: connector-a
- do:
connector.delete:
connector_id: connector-b
- do:
connector.delete:
connector_id: connector-c
- do:
connector.list:
include_deleted: true
- match: { count: 3 }
# Alphabetical order by index_name for results
- match: { results.0.id: "connector-b" }
- match: { results.0.index_name: "content-search-2-test" }
- match: { results.0.language: "en" }
- match: { results.0.deleted: true }
- match: { results.1.id: "connector-a" }
- match: { results.1.index_name: "search-1-test" }
- match: { results.1.language: "pl" }
- match: { results.1.deleted: true }
- match: { results.2.id: "connector-c" }
- match: { results.2.index_name: "search-3-test" }
- match: { results.2.language: "nl" }
- match: { results.2.deleted: true }
---
"List Connectors - Soft deleted with from":
- requires:
cluster_features: ["connector_soft_deletes"]
reason: Soft deletes were introduced in 9.0 release
- do:
connector.delete:
connector_id: connector-a
- do:
connector.delete:
connector_id: connector-b
- do:
connector.delete:
connector_id: connector-c
- do:
connector.list:
from: 1
include_deleted: true
- match: { count: 3 }
# Alphabetical order by index_name for results
- match: { results.0.id: "connector-a" }
- match: { results.0.index_name: "search-1-test" }
- match: { results.0.language: "pl" }
- match: { results.0.deleted: true }
- match: { results.1.id: "connector-c" }
- match: { results.1.index_name: "search-3-test" }
- match: { results.1.language: "nl" }
- match: { results.0.deleted: true }
---
"List Connector - Soft deleted with size":
- requires:
cluster_features: ["connector_soft_deletes"]
reason: Soft deletes were introduced in 9.0 release
- do:
connector.delete:
connector_id: connector-a
- do:
connector.delete:
connector_id: connector-b
- do:
connector.delete:
connector_id: connector-c
- do:
connector.list:
size: 2
include_deleted: true
- match: { count: 3 }
# Alphabetical order by index_name for results
- match: { results.0.id: "connector-b" }
- match: { results.0.index_name: "content-search-2-test" }
- match: { results.0.language: "en" }
- match: { results.0.deleted: true }
- match: { results.1.id: "connector-a" }
- match: { results.1.index_name: "search-1-test" }
- match: { results.1.language: "pl" }
- match: { results.1.deleted: true }

View file

@ -108,6 +108,33 @@ setup:
connector_id: test-nonexistent-connector
---
"Delete Connector - Supports soft deletes":
- requires:
cluster_features: ["connector_soft_deletes"]
reason: Soft deletes were introduced in 9.0 release
- do:
connector.delete:
connector_id: test-connector-to-delete
- match: { acknowledged: true }
- do:
catch: "missing"
connector.get:
connector_id: test-connector-to-delete
- do:
connector.get:
connector_id: test-connector-to-delete
include_deleted: true
- match: { id: test-connector-to-delete }
- match: { index_name: search-1-test }
- match: { service_type: super-connector }
- match: { name: my-connector }
---
"Delete connector fails for unprivileged user":
- skip:

View file

@ -9,9 +9,6 @@ package org.elasticsearch.xpack.application.connector;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.xcontent.ConstructingObjectParser;
@ -63,9 +60,10 @@ import static org.elasticsearch.xpack.application.connector.ConnectorTemplateReg
* <li>A {@link ConnectorStatus} indicating the current status of the connector.</li>
* <li>A sync cursor, used for incremental syncs.</li>
* <li>A boolean flag 'syncNow', which, when set, triggers an immediate synchronization operation.</li>
* <li>A boolean flag 'isDeleted', when set indicates that connector has been soft-deleted. </li>
* </ul>
*/
public class Connector implements NamedWriteable, ToXContentObject {
public class Connector implements ToXContentObject {
public static final String NAME = Connector.class.getName().toUpperCase(Locale.ROOT);
@ -106,6 +104,7 @@ public class Connector implements NamedWriteable, ToXContentObject {
@Nullable
private final Object syncCursor;
private final boolean syncNow;
private final boolean isDeleted;
/**
* Constructor for Connector.
@ -132,6 +131,7 @@ public class Connector implements NamedWriteable, ToXContentObject {
* @param status Current status of the connector.
* @param syncCursor Position or state indicating the current point of synchronization.
* @param syncNow Flag indicating whether an immediate synchronization is requested.
* @param isDeleted Flag indicating whether connector has been soft-deleted.
*/
private Connector(
String connectorId,
@ -155,7 +155,8 @@ public class Connector implements NamedWriteable, ToXContentObject {
String serviceType,
ConnectorStatus status,
Object syncCursor,
boolean syncNow
boolean syncNow,
Boolean isDeleted
) {
this.connectorId = connectorId;
this.apiKeyId = apiKeyId;
@ -179,31 +180,7 @@ public class Connector implements NamedWriteable, ToXContentObject {
this.status = status;
this.syncCursor = syncCursor;
this.syncNow = syncNow;
}
public Connector(StreamInput in) throws IOException {
this.connectorId = in.readOptionalString();
this.apiKeyId = in.readOptionalString();
this.apiKeySecretId = in.readOptionalString();
this.configuration = in.readMap(ConnectorConfiguration::new);
this.customScheduling = in.readMap(ConnectorCustomSchedule::new);
this.description = in.readOptionalString();
this.error = in.readOptionalString();
this.features = in.readOptionalWriteable(ConnectorFeatures::new);
this.filtering = in.readOptionalCollectionAsList(ConnectorFiltering::new);
this.syncJobFiltering = in.readOptionalWriteable(FilteringRules::new);
this.indexName = in.readOptionalString();
this.isNative = in.readBoolean();
this.language = in.readOptionalString();
this.lastSeen = in.readOptionalInstant();
this.syncInfo = in.readOptionalWriteable(ConnectorSyncInfo::new);
this.name = in.readOptionalString();
this.pipeline = in.readOptionalWriteable(ConnectorIngestPipeline::new);
this.scheduling = in.readOptionalWriteable(ConnectorScheduling::new);
this.serviceType = in.readOptionalString();
this.status = in.readEnum(ConnectorStatus.class);
this.syncCursor = in.readGenericValue();
this.syncNow = in.readBoolean();
this.isDeleted = isDeleted;
}
public static final ParseField ID_FIELD = new ParseField("id");
@ -226,6 +203,7 @@ public class Connector implements NamedWriteable, ToXContentObject {
public static final ParseField STATUS_FIELD = new ParseField("status");
public static final ParseField SYNC_CURSOR_FIELD = new ParseField("sync_cursor");
static final ParseField SYNC_NOW_FIELD = new ParseField("sync_now");
public static final ParseField IS_DELETED_FIELD = new ParseField("deleted");
@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<Connector, String> PARSER = new ConstructingObjectParser<>(
@ -265,7 +243,8 @@ public class Connector implements NamedWriteable, ToXContentObject {
.setServiceType((String) args[i++])
.setStatus((ConnectorStatus) args[i++])
.setSyncCursor(args[i++])
.setSyncNow((Boolean) args[i])
.setSyncNow((Boolean) args[i++])
.setIsDeleted((Boolean) args[i])
.build();
}
);
@ -357,6 +336,7 @@ public class Connector implements NamedWriteable, ToXContentObject {
);
PARSER.declareObjectOrNull(optionalConstructorArg(), (p, c) -> p.map(), null, SYNC_CURSOR_FIELD);
PARSER.declareBoolean(optionalConstructorArg(), SYNC_NOW_FIELD);
PARSER.declareBoolean(optionalConstructorArg(), IS_DELETED_FIELD);
}
public static Connector fromXContentBytes(BytesReference source, String docId, XContentType xContentType) {
@ -433,6 +413,7 @@ public class Connector implements NamedWriteable, ToXContentObject {
builder.field(STATUS_FIELD.getPreferredName(), status.toString());
}
builder.field(SYNC_NOW_FIELD.getPreferredName(), syncNow);
builder.field(IS_DELETED_FIELD.getPreferredName(), isDeleted);
}
@Override
@ -445,32 +426,6 @@ public class Connector implements NamedWriteable, ToXContentObject {
return builder;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(connectorId);
out.writeOptionalString(apiKeyId);
out.writeOptionalString(apiKeySecretId);
out.writeMap(configuration, StreamOutput::writeWriteable);
out.writeMap(customScheduling, StreamOutput::writeWriteable);
out.writeOptionalString(description);
out.writeOptionalString(error);
out.writeOptionalWriteable(features);
out.writeOptionalCollection(filtering);
out.writeOptionalWriteable(syncJobFiltering);
out.writeOptionalString(indexName);
out.writeBoolean(isNative);
out.writeOptionalString(language);
out.writeOptionalInstant(lastSeen);
out.writeOptionalWriteable(syncInfo);
out.writeOptionalString(name);
out.writeOptionalWriteable(pipeline);
out.writeOptionalWriteable(scheduling);
out.writeOptionalString(serviceType);
out.writeEnum(status);
out.writeGenericValue(syncCursor);
out.writeBoolean(syncNow);
}
public String getConnectorId() {
return connectorId;
}
@ -559,6 +514,10 @@ public class Connector implements NamedWriteable, ToXContentObject {
return syncNow;
}
public boolean isDeleted() {
return isDeleted;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
@ -566,6 +525,7 @@ public class Connector implements NamedWriteable, ToXContentObject {
Connector connector = (Connector) o;
return isNative == connector.isNative
&& syncNow == connector.syncNow
&& isDeleted == connector.isDeleted
&& Objects.equals(connectorId, connector.connectorId)
&& Objects.equals(apiKeyId, connector.apiKeyId)
&& Objects.equals(apiKeySecretId, connector.apiKeySecretId)
@ -612,15 +572,11 @@ public class Connector implements NamedWriteable, ToXContentObject {
serviceType,
status,
syncCursor,
syncNow
syncNow,
isDeleted
);
}
@Override
public String getWriteableName() {
return NAME;
}
public static class Builder {
private String connectorId;
@ -645,6 +601,7 @@ public class Connector implements NamedWriteable, ToXContentObject {
private ConnectorStatus status = ConnectorStatus.CREATED;
private Object syncCursor;
private boolean syncNow;
private boolean isDeleted;
public Builder setConnectorId(String connectorId) {
this.connectorId = connectorId;
@ -756,6 +713,11 @@ public class Connector implements NamedWriteable, ToXContentObject {
return this;
}
public Builder setIsDeleted(Boolean isDeleted) {
this.isDeleted = Objects.requireNonNullElse(isDeleted, false);
return this;
}
public Connector build() {
return new Connector(
connectorId,
@ -779,7 +741,8 @@ public class Connector implements NamedWriteable, ToXContentObject {
serviceType,
status,
syncCursor,
syncNow
syncNow,
isDeleted
);
}
}

View file

@ -13,9 +13,6 @@ import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DelegatingActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
@ -24,7 +21,6 @@ import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.OriginSettingClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.BoolQueryBuilder;
@ -78,15 +74,13 @@ import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.application.connector.ConnectorFiltering.fromXContentBytesConnectorFiltering;
import static org.elasticsearch.xpack.application.connector.ConnectorFiltering.sortFilteringRulesByOrder;
import static org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry.MANAGED_CONNECTOR_INDEX_PREFIX;
import static org.elasticsearch.xpack.core.ClientHelper.CONNECTORS_ORIGIN;
/**
* A service that manages persistent {@link Connector} configurations.
*/
public class ConnectorIndexService {
// The client to interact with the system index (internal user).
private final Client clientWithOrigin;
private final Client client;
public static final String CONNECTOR_INDEX_NAME = ConnectorTemplateRegistry.CONNECTOR_INDEX_NAME_PATTERN;
@ -94,7 +88,7 @@ public class ConnectorIndexService {
* @param client A client for executing actions on the connector index
*/
public ConnectorIndexService(Client client) {
this.clientWithOrigin = new OriginSettingClient(client, CONNECTORS_ORIGIN);
this.client = client;
}
/**
@ -140,7 +134,7 @@ public class ConnectorIndexService {
indexRequest = indexRequest.id(connectorId);
}
clientWithOrigin.index(
client.index(
indexRequest,
listener.delegateFailureAndWrap(
(ll, indexResponse) -> ll.onResponse(
@ -197,14 +191,17 @@ public class ConnectorIndexService {
/**
* Gets the {@link Connector} from the underlying index.
*
* @param connectorId The id of the connector object.
* @param listener The action listener to invoke on response/failure.
* @param connectorId The id of the connector object.
* @param includeDeleted If false, returns only the non-deleted connector with the matching ID;
* if true, returns the connector with the matching ID.
* @param listener The action listener to invoke on response/failure.
*/
public void getConnector(String connectorId, ActionListener<ConnectorSearchResult> listener) {
public void getConnector(String connectorId, boolean includeDeleted, ActionListener<ConnectorSearchResult> listener) {
try {
final GetRequest getRequest = new GetRequest(CONNECTOR_INDEX_NAME).id(connectorId).realtime(true);
clientWithOrigin.get(getRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, getResponse) -> {
client.get(getRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, getResponse) -> {
if (getResponse.isExists() == false) {
l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return;
@ -215,6 +212,12 @@ public class ConnectorIndexService {
.setResultMap(getResponse.getSourceAsMap())
.build();
boolean connectorIsSoftDeleted = includeDeleted == false && isConnectorDeleted(connector);
if (connectorIsSoftDeleted) {
l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return;
}
l.onResponse(connector);
} catch (Exception e) {
listener.onFailure(e);
@ -226,39 +229,40 @@ public class ConnectorIndexService {
}
/**
* Deletes the {@link Connector} and the related instances of {@link ConnectorSyncJob} in the underlying index.
* Soft deletes the {@link Connector} and optionally removes the related instances of {@link ConnectorSyncJob} in the underlying index.
*
* @param connectorId The id of the {@link Connector}.
* @param shouldDeleteSyncJobs The flag indicating if {@link ConnectorSyncJob} should also be deleted.
* @param listener The action listener to invoke on response/failure.
*/
public void deleteConnector(String connectorId, boolean shouldDeleteSyncJobs, ActionListener<DeleteResponse> listener) {
final DeleteRequest deleteRequest = new DeleteRequest(CONNECTOR_INDEX_NAME).id(connectorId)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
public void deleteConnector(String connectorId, boolean shouldDeleteSyncJobs, ActionListener<UpdateResponse> listener) {
try {
clientWithOrigin.delete(
deleteRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, deleteResponse) -> {
if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
// ensure that if connector is soft-deleted, deleting it again results in 404
getConnector(connectorId, false, listener.delegateFailure((l, connector) -> {
final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).setRefreshPolicy(
WriteRequest.RefreshPolicy.IMMEDIATE
)
.doc(
new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX)
.id(connectorId)
.source(Map.of(Connector.IS_DELETED_FIELD.getPreferredName(), true))
);
client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, l, (ll, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
ll.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return;
}
if (shouldDeleteSyncJobs) {
new ConnectorSyncJobIndexService(clientWithOrigin).deleteAllSyncJobsByConnectorId(
connectorId,
l.map(r -> deleteResponse)
);
new ConnectorSyncJobIndexService(client).deleteAllSyncJobsByConnectorId(connectorId, ll.map(r -> updateResponse));
} else {
l.onResponse(deleteResponse);
ll.onResponse(updateResponse);
}
})
);
}));
}));
} catch (Exception e) {
listener.onFailure(e);
}
}
/**
@ -270,6 +274,7 @@ public class ConnectorIndexService {
* @param connectorNames Filter connectors by connector names, if provided.
* @param serviceTypes Filter connectors by service types, if provided.
* @param searchQuery Apply a wildcard search on index name, connector name, and description, if provided.
* @param includeDeleted If false, filters to include only non-deleted connectors; if true, no filter is applied.
* @param listener Invoked with search results or upon failure.
*/
public void listConnectors(
@ -279,16 +284,17 @@ public class ConnectorIndexService {
List<String> connectorNames,
List<String> serviceTypes,
String searchQuery,
boolean includeDeleted,
ActionListener<ConnectorIndexService.ConnectorResult> listener
) {
try {
final SearchSourceBuilder source = new SearchSourceBuilder().from(from)
.size(size)
.query(buildListQuery(indexNames, connectorNames, serviceTypes, searchQuery))
.query(buildListQuery(indexNames, connectorNames, serviceTypes, searchQuery, includeDeleted))
.fetchSource(true)
.sort(Connector.INDEX_NAME_FIELD.getPreferredName(), SortOrder.ASC);
final SearchRequest req = new SearchRequest(CONNECTOR_INDEX_NAME).source(source);
clientWithOrigin.search(req, new ActionListener<>() {
client.search(req, new ActionListener<>() {
@Override
public void onResponse(SearchResponse searchResponse) {
try {
@ -320,13 +326,15 @@ public class ConnectorIndexService {
* @param connectorNames List of connector names for filtering, or null/empty to skip.
* @param serviceTypes List of connector service types for filtering, or null/empty to skip.
* @param searchQuery Search query for wildcard filtering on index name, connector name, and description, or null/empty to skip.
* @param includeDeleted If false, filters to include only non-deleted connectors; if true, no filter is applied.
* @return A {@link QueryBuilder} customized based on provided filters.
*/
private QueryBuilder buildListQuery(
List<String> indexNames,
List<String> connectorNames,
List<String> serviceTypes,
String searchQuery
String searchQuery,
boolean includeDeleted
) {
boolean filterByIndexNames = indexNames != null && indexNames.isEmpty() == false;
boolean filterByConnectorNames = indexNames != null && connectorNames.isEmpty() == false;
@ -358,7 +366,12 @@ public class ConnectorIndexService {
);
}
}
return usesFilter ? boolFilterQueryBuilder : new MatchAllQueryBuilder();
if (includeDeleted == false) {
boolFilterQueryBuilder.mustNot(new TermQueryBuilder(Connector.IS_DELETED_FIELD.getPreferredName(), true));
}
return boolFilterQueryBuilder;
}
/**
@ -377,7 +390,7 @@ public class ConnectorIndexService {
Map<String, Object> configurationValues = request.getConfigurationValues();
String connectorId = request.getConnectorId();
getConnector(connectorId, listener.delegateFailure((l, connector) -> {
getConnector(connectorId, false, listener.delegateFailure((l, connector) -> {
UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).setRefreshPolicy(
WriteRequest.RefreshPolicy.IMMEDIATE
@ -463,7 +476,7 @@ public class ConnectorIndexService {
return;
}
clientWithOrigin.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, l, (ll, updateResponse) -> {
client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, l, (ll, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
ll.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return;
@ -500,16 +513,13 @@ public class ConnectorIndexService {
}
})
);
clientWithOrigin.update(
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return;
}
l.onResponse(updateResponse);
})
);
client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return;
}
l.onResponse(updateResponse);
}));
} catch (Exception e) {
listener.onFailure(e);
}
@ -531,16 +541,13 @@ public class ConnectorIndexService {
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(request.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS))
);
clientWithOrigin.update(
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return;
}
l.onResponse(updateResponse);
})
);
client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return;
}
l.onResponse(updateResponse);
}));
} catch (Exception e) {
listener.onFailure(e);
}
@ -561,16 +568,13 @@ public class ConnectorIndexService {
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(Map.of(Connector.FILTERING_FIELD.getPreferredName(), filtering))
);
clientWithOrigin.update(
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return;
}
l.onResponse(updateResponse);
})
);
client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return;
}
l.onResponse(updateResponse);
}));
} catch (Exception e) {
listener.onFailure(e);
}
@ -591,16 +595,13 @@ public class ConnectorIndexService {
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(Map.of(Connector.FEATURES_FIELD.getPreferredName(), features))
);
clientWithOrigin.update(
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return;
}
l.onResponse(updateResponse);
})
);
client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return;
}
l.onResponse(updateResponse);
}));
} catch (Exception e) {
listener.onFailure(e);
}
@ -621,7 +622,7 @@ public class ConnectorIndexService {
ActionListener<UpdateResponse> listener
) {
try {
getConnector(connectorId, listener.delegateFailure((l, connector) -> {
getConnector(connectorId, false, listener.delegateFailure((l, connector) -> {
List<ConnectorFiltering> connectorFilteringList = fromXContentBytesConnectorFiltering(
connector.getSourceRef(),
XContentType.JSON
@ -656,16 +657,13 @@ public class ConnectorIndexService {
.source(Map.of(Connector.FILTERING_FIELD.getPreferredName(), List.of(connectorFilteringWithUpdatedDraft)))
);
clientWithOrigin.update(
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (ll, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
ll.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return;
}
ll.onResponse(updateResponse);
})
);
client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (ll, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
ll.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return;
}
ll.onResponse(updateResponse);
}));
}));
} catch (Exception e) {
@ -684,7 +682,7 @@ public class ConnectorIndexService {
FilteringValidationInfo validation,
ActionListener<UpdateResponse> listener
) {
getConnector(connectorId, listener.delegateFailure((l, connector) -> {
getConnector(connectorId, false, listener.delegateFailure((l, connector) -> {
try {
List<ConnectorFiltering> connectorFilteringList = fromXContentBytesConnectorFiltering(
connector.getSourceRef(),
@ -707,7 +705,7 @@ public class ConnectorIndexService {
.source(Map.of(Connector.FILTERING_FIELD.getPreferredName(), List.of(activatedConnectorFiltering)))
);
clientWithOrigin.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, l, (ll, updateResponse) -> {
client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, l, (ll, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
ll.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return;
@ -728,7 +726,7 @@ public class ConnectorIndexService {
* @param listener Listener to respond to a successful response or an error.
*/
public void activateConnectorDraftFiltering(String connectorId, ActionListener<UpdateResponse> listener) {
getConnector(connectorId, listener.delegateFailure((l, connector) -> {
getConnector(connectorId, false, listener.delegateFailure((l, connector) -> {
try {
List<ConnectorFiltering> connectorFilteringList = fromXContentBytesConnectorFiltering(
connector.getSourceRef(),
@ -764,7 +762,7 @@ public class ConnectorIndexService {
.source(Map.of(Connector.FILTERING_FIELD.getPreferredName(), List.of(activatedConnectorFiltering)))
);
clientWithOrigin.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, l, (ll, updateResponse) -> {
client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, l, (ll, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
ll.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return;
@ -792,16 +790,13 @@ public class ConnectorIndexService {
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(Map.of(Connector.LAST_SEEN_FIELD.getPreferredName(), Instant.now()))
);
clientWithOrigin.update(
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return;
}
l.onResponse(updateResponse);
})
);
client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return;
}
l.onResponse(updateResponse);
}));
} catch (Exception e) {
listener.onFailure(e);
}
@ -822,16 +817,13 @@ public class ConnectorIndexService {
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(request.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS))
);
clientWithOrigin.update(
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return;
}
l.onResponse(updateResponse);
})
);
client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return;
}
l.onResponse(updateResponse);
}));
} catch (Exception e) {
listener.onFailure(e);
}
@ -849,7 +841,7 @@ public class ConnectorIndexService {
String connectorId = request.getConnectorId();
boolean isNative = request.isNative();
getConnector(connectorId, listener.delegateFailure((l, connector) -> {
getConnector(connectorId, false, listener.delegateFailure((l, connector) -> {
String indexName = getConnectorIndexNameFromSearchResult(connector);
@ -895,16 +887,13 @@ public class ConnectorIndexService {
)
)
);
clientWithOrigin.update(
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (ll, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
ll.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return;
}
ll.onResponse(updateResponse);
})
);
client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (ll, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
ll.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return;
}
ll.onResponse(updateResponse);
}));
}));
} catch (Exception e) {
listener.onFailure(e);
@ -927,16 +916,13 @@ public class ConnectorIndexService {
.source(Map.of(Connector.PIPELINE_FIELD.getPreferredName(), request.getPipeline()))
.source(request.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS))
);
clientWithOrigin.update(
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return;
}
l.onResponse(updateResponse);
})
);
client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return;
}
l.onResponse(updateResponse);
}));
} catch (Exception e) {
listener.onFailure(e);
}
@ -966,7 +952,7 @@ public class ConnectorIndexService {
return;
}
getConnector(connectorId, l.delegateFailure((ll, connector) -> {
getConnector(connectorId, false, l.delegateFailure((ll, connector) -> {
Boolean isNativeConnector = getConnectorIsNativeFlagFromSearchResult(connector);
Boolean doesNotHaveContentPrefix = indexName != null && isValidManagedConnectorIndexName(indexName) == false;
@ -995,7 +981,7 @@ public class ConnectorIndexService {
}
})
);
clientWithOrigin.update(
client.update(
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (lll, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
@ -1028,16 +1014,13 @@ public class ConnectorIndexService {
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(Map.of(Connector.SCHEDULING_FIELD.getPreferredName(), request.getScheduling()))
);
clientWithOrigin.update(
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return;
}
l.onResponse(updateResponse);
})
);
client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return;
}
l.onResponse(updateResponse);
}));
} catch (Exception e) {
listener.onFailure(e);
}
@ -1052,7 +1035,7 @@ public class ConnectorIndexService {
public void updateConnectorServiceType(UpdateConnectorServiceTypeAction.Request request, ActionListener<UpdateResponse> listener) {
try {
String connectorId = request.getConnectorId();
getConnector(connectorId, listener.delegateFailure((l, connector) -> {
getConnector(connectorId, false, listener.delegateFailure((l, connector) -> {
ConnectorStatus prevStatus = getConnectorStatusFromSearchResult(connector);
ConnectorStatus newStatus = prevStatus == ConnectorStatus.CREATED
@ -1073,7 +1056,7 @@ public class ConnectorIndexService {
)
);
clientWithOrigin.update(
client.update(
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (updateListener, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
@ -1099,7 +1082,7 @@ public class ConnectorIndexService {
try {
String connectorId = request.getConnectorId();
ConnectorStatus newStatus = request.getStatus();
getConnector(connectorId, listener.delegateFailure((l, connector) -> {
getConnector(connectorId, false, listener.delegateFailure((l, connector) -> {
ConnectorStatus prevStatus = getConnectorStatusFromSearchResult(connector);
@ -1116,7 +1099,7 @@ public class ConnectorIndexService {
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(Map.of(Connector.STATUS_FIELD.getPreferredName(), request.getStatus()))
);
clientWithOrigin.update(
client.update(
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (updateListener, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
@ -1144,16 +1127,13 @@ public class ConnectorIndexService {
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(request.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS))
);
clientWithOrigin.update(
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return;
}
l.onResponse(updateResponse);
})
);
client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return;
}
l.onResponse(updateResponse);
}));
} catch (Exception e) {
listener.onFailure(e);
}
@ -1180,6 +1160,11 @@ public class ConnectorIndexService {
return (Map<String, Object>) searchResult.getResultMap().get(Connector.CONFIGURATION_FIELD.getPreferredName());
}
private boolean isConnectorDeleted(ConnectorSearchResult searchResult) {
Boolean isDeletedFlag = (Boolean) searchResult.getResultMap().get(Connector.IS_DELETED_FIELD.getPreferredName());
return Boolean.TRUE.equals(isDeletedFlag);
}
private static ConnectorIndexService.ConnectorResult mapSearchResponseToConnectorList(SearchResponse response) {
final List<ConnectorSearchResult> connectorResults = Arrays.stream(response.getHits().getHits())
.map(ConnectorIndexService::hitToConnector)
@ -1199,7 +1184,7 @@ public class ConnectorIndexService {
/**
* This method determines if any documents in the connector index have the same index name as the one specified,
* excluding the document with the given _id if it is provided.
* excluding the docs marked as deleted (soft-deleted) and document with the given _id if it is provided.
*
* @param indexName The name of the index to check for existence in the connector index.
* @param connectorId The ID of the {@link Connector} to exclude from the search. Can be null if no document should be excluded.
@ -1215,6 +1200,9 @@ public class ConnectorIndexService {
boolFilterQueryBuilder.must().add(new TermQueryBuilder(Connector.INDEX_NAME_FIELD.getPreferredName(), indexName));
// exclude soft-deleted connectors
boolFilterQueryBuilder.mustNot(new TermQueryBuilder(Connector.IS_DELETED_FIELD.getPreferredName(), true));
// If we know the connector _id, exclude this from search query
if (connectorId != null) {
boolFilterQueryBuilder.mustNot(new IdsQueryBuilder().addIds(connectorId));
@ -1223,7 +1211,7 @@ public class ConnectorIndexService {
final SearchSourceBuilder searchSource = new SearchSourceBuilder().query(boolFilterQueryBuilder);
final SearchRequest searchRequest = new SearchRequest(CONNECTOR_INDEX_NAME).source(searchSource);
clientWithOrigin.search(searchRequest, new ActionListener<>() {
client.search(searchRequest, new ActionListener<>() {
@Override
public void onResponse(SearchResponse searchResponse) {
boolean indexNameIsInUse = searchResponse.getHits().getTotalHits().value() > 0L;

View file

@ -9,9 +9,12 @@ package org.elasticsearch.xpack.application.connector.action;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry;
import java.io.IOException;
@ -19,9 +22,10 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry.MANAGED_CONNECTOR_INDEX_PREFIX;
/**
* Abstract base class for action requests targeting the connectors index.
* Abstract base class for action requests targeting the connectors index. Implements {@link org.elasticsearch.action.IndicesRequest}
* to ensure index-level privilege support. This class defines the connectors index as the target for all derived action requests.
*/
public abstract class ConnectorActionRequest extends ActionRequest {
public abstract class ConnectorActionRequest extends ActionRequest implements IndicesRequest {
public ConnectorActionRequest() {
super();
@ -74,4 +78,14 @@ public abstract class ConnectorActionRequest extends ActionRequest {
}
return validationException;
}
@Override
public String[] indices() {
return new String[] { ConnectorTemplateRegistry.CONNECTOR_INDEX_NAME_PATTERN };
}
@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.lenientExpandHidden();
}
}

View file

@ -18,6 +18,7 @@ import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry;
import java.io.IOException;
import java.util.Objects;
@ -27,7 +28,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg
public class DeleteConnectorAction {
public static final String NAME = "cluster:admin/xpack/connector/delete";
public static final String NAME = "indices:data/write/xpack/connector/delete";
public static final ActionType<AcknowledgedResponse> INSTANCE = new ActionType<>(NAME);
private DeleteConnectorAction() {/* no instances */}
@ -70,6 +71,14 @@ public class DeleteConnectorAction {
return deleteSyncJobs;
}
@Override
public String[] indices() {
// When deleting a connector, corresponding sync jobs can also be deleted
return new String[] {
ConnectorTemplateRegistry.CONNECTOR_SYNC_JOBS_INDEX_NAME_PATTERN,
ConnectorTemplateRegistry.CONNECTOR_INDEX_NAME_PATTERN };
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View file

@ -10,6 +10,7 @@ package org.elasticsearch.xpack.application.connector.action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -25,10 +26,11 @@ import java.util.Objects;
import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;
public class GetConnectorAction {
public static final String NAME = "cluster:admin/xpack/connector/get";
public static final String NAME = "indices:data/read/xpack/connector/get";
public static final ActionType<GetConnectorAction.Response> INSTANCE = new ActionType<>(NAME);
private GetConnectorAction() {/* no instances */}
@ -36,22 +38,25 @@ public class GetConnectorAction {
public static class Request extends ConnectorActionRequest implements ToXContentObject {
private final String connectorId;
private final Boolean includeDeleted;
private static final ParseField CONNECTOR_ID_FIELD = new ParseField("connector_id");
public Request(String connectorId) {
this.connectorId = connectorId;
}
private static final ParseField INCLUDE_DELETED_FIELD = new ParseField("include_deleted");
public Request(StreamInput in) throws IOException {
super(in);
this.connectorId = in.readString();
public Request(String connectorId, Boolean includeDeleted) {
this.connectorId = connectorId;
this.includeDeleted = includeDeleted;
}
public String getConnectorId() {
return connectorId;
}
public Boolean getIncludeDeleted() {
return includeDeleted;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
@ -65,8 +70,7 @@ public class GetConnectorAction {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(connectorId);
TransportAction.localOnly();
}
@Override
@ -74,12 +78,12 @@ public class GetConnectorAction {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return Objects.equals(connectorId, request.connectorId);
return Objects.equals(connectorId, request.connectorId) && Objects.equals(includeDeleted, request.includeDeleted);
}
@Override
public int hashCode() {
return Objects.hash(connectorId);
return Objects.hash(connectorId, includeDeleted);
}
@Override
@ -87,6 +91,7 @@ public class GetConnectorAction {
builder.startObject();
{
builder.field(CONNECTOR_ID_FIELD.getPreferredName(), connectorId);
builder.field(INCLUDE_DELETED_FIELD.getPreferredName(), includeDeleted);
}
builder.endObject();
return builder;
@ -95,11 +100,12 @@ public class GetConnectorAction {
private static final ConstructingObjectParser<Request, Void> PARSER = new ConstructingObjectParser<>(
"get_connector_request",
false,
(p) -> new Request((String) p[0])
(p) -> new Request((String) p[0], (Boolean) p[1])
);
static {
PARSER.declareString(constructorArg(), CONNECTOR_ID_FIELD);
PARSER.declareBoolean(optionalConstructorArg(), INCLUDE_DELETED_FIELD);
}
public static Request parse(XContentParser parser) {

View file

@ -10,6 +10,7 @@ package org.elasticsearch.xpack.application.connector.action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -34,7 +35,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstr
public class ListConnectorAction {
public static final String NAME = "cluster:admin/xpack/connector/list";
public static final String NAME = "indices:data/read/xpack/connector/list";
public static final ActionType<ListConnectorAction.Response> INSTANCE = new ActionType<>(NAME);
private ListConnectorAction() {/* no instances */}
@ -46,33 +47,28 @@ public class ListConnectorAction {
private final List<String> connectorNames;
private final List<String> connectorServiceTypes;
private final String connectorSearchQuery;
private final Boolean includeDeleted;
private static final ParseField PAGE_PARAMS_FIELD = new ParseField("pageParams");
private static final ParseField INDEX_NAMES_FIELD = new ParseField("index_names");
private static final ParseField NAMES_FIELD = new ParseField("names");
private static final ParseField SEARCH_QUERY_FIELD = new ParseField("query");
public Request(StreamInput in) throws IOException {
super(in);
this.pageParams = new PageParams(in);
this.indexNames = in.readOptionalStringCollectionAsList();
this.connectorNames = in.readOptionalStringCollectionAsList();
this.connectorServiceTypes = in.readOptionalStringCollectionAsList();
this.connectorSearchQuery = in.readOptionalString();
}
private static final ParseField INCLUDE_DELETED_FIELD = new ParseField("include_deleted");
public Request(
PageParams pageParams,
List<String> indexNames,
List<String> connectorNames,
List<String> serviceTypes,
String connectorSearchQuery
String connectorSearchQuery,
Boolean includeDeleted
) {
this.pageParams = pageParams;
this.indexNames = indexNames;
this.connectorNames = connectorNames;
this.connectorServiceTypes = serviceTypes;
this.connectorSearchQuery = connectorSearchQuery;
this.includeDeleted = includeDeleted;
}
public PageParams getPageParams() {
@ -95,6 +91,10 @@ public class ListConnectorAction {
return connectorSearchQuery;
}
public Boolean getIncludeDeleted() {
return includeDeleted;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
@ -114,12 +114,7 @@ public class ListConnectorAction {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
pageParams.writeTo(out);
out.writeOptionalStringCollection(indexNames);
out.writeOptionalStringCollection(connectorNames);
out.writeOptionalStringCollection(connectorServiceTypes);
out.writeOptionalString(connectorSearchQuery);
TransportAction.localOnly();
}
@Override
@ -131,7 +126,8 @@ public class ListConnectorAction {
&& Objects.equals(indexNames, request.indexNames)
&& Objects.equals(connectorNames, request.connectorNames)
&& Objects.equals(connectorServiceTypes, request.connectorServiceTypes)
&& Objects.equals(connectorSearchQuery, request.connectorSearchQuery);
&& Objects.equals(connectorSearchQuery, request.connectorSearchQuery)
&& Objects.equals(includeDeleted, request.includeDeleted);
}
@Override
@ -147,7 +143,8 @@ public class ListConnectorAction {
(List<String>) p[1],
(List<String>) p[2],
(List<String>) p[3],
(String) p[4]
(String) p[4],
(Boolean) p[5]
)
);
@ -157,6 +154,7 @@ public class ListConnectorAction {
PARSER.declareStringArray(optionalConstructorArg(), NAMES_FIELD);
PARSER.declareStringArray(optionalConstructorArg(), Connector.SERVICE_TYPE_FIELD);
PARSER.declareString(optionalConstructorArg(), SEARCH_QUERY_FIELD);
PARSER.declareBoolean(optionalConstructorArg(), INCLUDE_DELETED_FIELD);
}
public static ListConnectorAction.Request parse(XContentParser parser) {
@ -172,6 +170,7 @@ public class ListConnectorAction {
builder.field(NAMES_FIELD.getPreferredName(), connectorNames);
builder.field(Connector.SERVICE_TYPE_FIELD.getPreferredName(), connectorServiceTypes);
builder.field(SEARCH_QUERY_FIELD.getPreferredName(), connectorSearchQuery);
builder.field(INCLUDE_DELETED_FIELD.getPreferredName(), includeDeleted);
}
builder.endObject();
return builder;

View file

@ -25,7 +25,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstr
public class PostConnectorAction {
public static final String NAME = "cluster:admin/xpack/connector/post";
public static final String NAME = "indices:data/write/xpack/connector/post";
public static final ActionType<ConnectorCreateActionResponse> INSTANCE = new ActionType<>(NAME);
private PostConnectorAction() {/* no instances */}

View file

@ -9,6 +9,7 @@ package org.elasticsearch.xpack.application.connector.action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -26,12 +27,12 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstr
public class PutConnectorAction {
public static final String NAME = "cluster:admin/xpack/connector/put";
public static final String NAME = "indices:data/write/xpack/connector/put";
public static final ActionType<ConnectorCreateActionResponse> INSTANCE = new ActionType<>(NAME);
private PutConnectorAction() {/* no instances */}
public static class Request extends ConnectorActionRequest implements ToXContentObject {
public static class Request extends ConnectorActionRequest implements IndicesRequest, ToXContentObject {
@Nullable
private final String connectorId;

View file

@ -36,7 +36,8 @@ public class RestGetConnectorAction extends BaseRestHandler {
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
GetConnectorAction.Request request = new GetConnectorAction.Request(restRequest.param(CONNECTOR_ID_PARAM));
Boolean includeDeleted = restRequest.paramAsBoolean("include_deleted", false);
GetConnectorAction.Request request = new GetConnectorAction.Request(restRequest.param(CONNECTOR_ID_PARAM), includeDeleted);
return channel -> client.execute(GetConnectorAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}

View file

@ -43,13 +43,15 @@ public class RestListConnectorAction extends BaseRestHandler {
List<String> connectorNames = List.of(restRequest.paramAsStringArray("connector_name", new String[0]));
List<String> serviceTypes = List.of(restRequest.paramAsStringArray("service_type", new String[0]));
String searchQuery = restRequest.param("query");
Boolean includeDeleted = restRequest.paramAsBoolean("include_deleted", false);
ListConnectorAction.Request request = new ListConnectorAction.Request(
new PageParams(from, size),
indexNames,
connectorNames,
serviceTypes,
searchQuery
searchQuery,
includeDeleted
);
return channel -> client.execute(ListConnectorAction.INSTANCE, request, new RestToXContentListener<>(channel));

View file

@ -9,7 +9,7 @@ package org.elasticsearch.xpack.application.connector.action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.injection.guice.Inject;
@ -17,23 +17,21 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.application.connector.ConnectorIndexService;
public class TransportGetConnectorAction extends HandledTransportAction<GetConnectorAction.Request, GetConnectorAction.Response> {
public class TransportGetConnectorAction extends TransportAction<GetConnectorAction.Request, GetConnectorAction.Response> {
protected final ConnectorIndexService connectorIndexService;
@Inject
public TransportGetConnectorAction(TransportService transportService, ActionFilters actionFilters, Client client) {
super(
GetConnectorAction.NAME,
transportService,
actionFilters,
GetConnectorAction.Request::new,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
super(GetConnectorAction.NAME, actionFilters, transportService.getTaskManager(), EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.connectorIndexService = new ConnectorIndexService(client);
}
@Override
protected void doExecute(Task task, GetConnectorAction.Request request, ActionListener<GetConnectorAction.Response> listener) {
connectorIndexService.getConnector(request.getConnectorId(), listener.map(GetConnectorAction.Response::new));
connectorIndexService.getConnector(
request.getConnectorId(),
request.getIncludeDeleted(),
listener.map(GetConnectorAction.Response::new)
);
}
}

View file

@ -9,7 +9,7 @@ package org.elasticsearch.xpack.application.connector.action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.injection.guice.Inject;
@ -18,18 +18,12 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.application.connector.ConnectorIndexService;
import org.elasticsearch.xpack.core.action.util.PageParams;
public class TransportListConnectorAction extends HandledTransportAction<ListConnectorAction.Request, ListConnectorAction.Response> {
public class TransportListConnectorAction extends TransportAction<ListConnectorAction.Request, ListConnectorAction.Response> {
protected final ConnectorIndexService connectorIndexService;
@Inject
public TransportListConnectorAction(TransportService transportService, ActionFilters actionFilters, Client client) {
super(
ListConnectorAction.NAME,
transportService,
actionFilters,
ListConnectorAction.Request::new,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
super(ListConnectorAction.NAME, actionFilters, transportService.getTaskManager(), EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.connectorIndexService = new ConnectorIndexService(client);
}
@ -44,6 +38,8 @@ public class TransportListConnectorAction extends HandledTransportAction<ListCon
request.getConnectorNames(),
request.getConnectorServiceTypes(),
request.getConnectorSearchQuery(),
request.getIncludeDeleted(),
listener.map(r -> new ListConnectorAction.Response(r.connectors(), r.totalResults()))
);
}

View file

@ -22,7 +22,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
public class UpdateConnectorActiveFilteringAction {
public static final String NAME = "cluster:admin/xpack/connector/update_filtering/activate";
public static final String NAME = "indices:data/write/xpack/connector/update_filtering/activate";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private UpdateConnectorActiveFilteringAction() {/* no instances */}

View file

@ -27,7 +27,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstr
public class UpdateConnectorApiKeyIdAction {
public static final String NAME = "cluster:admin/xpack/connector/update_api_key_id";
public static final String NAME = "indices:data/write/xpack/connector/update_api_key_id";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private UpdateConnectorApiKeyIdAction() {/* no instances */}

View file

@ -32,7 +32,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstr
public class UpdateConnectorConfigurationAction {
public static final String NAME = "cluster:admin/xpack/connector/update_configuration";
public static final String NAME = "indices:data/write/xpack/connector/update_configuration";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private UpdateConnectorConfigurationAction() {/* no instances */}

View file

@ -27,7 +27,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg
public class UpdateConnectorErrorAction {
public static final String NAME = "cluster:admin/xpack/connector/update_error";
public static final String NAME = "indices:data/write/xpack/connector/update_error";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private UpdateConnectorErrorAction() {/* no instances */}

View file

@ -27,7 +27,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstr
public class UpdateConnectorFeaturesAction {
public static final String NAME = "cluster:admin/xpack/connector/update_features";
public static final String NAME = "indices:data/write/xpack/connector/update_features";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private UpdateConnectorFeaturesAction() {/* no instances */}

View file

@ -32,7 +32,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstr
public class UpdateConnectorFilteringAction {
public static final String NAME = "cluster:admin/xpack/connector/update_filtering";
public static final String NAME = "indices:data/write/xpack/connector/update_filtering";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private UpdateConnectorFilteringAction() {/* no instances */}

View file

@ -27,7 +27,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg
public class UpdateConnectorFilteringValidationAction {
public static final String NAME = "cluster:admin/xpack/connector/update_filtering/draft_validation";
public static final String NAME = "indices:data/write/xpack/connector/update_filtering/draft_validation";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private UpdateConnectorFilteringValidationAction() {/* no instances */}

View file

@ -27,7 +27,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg
public class UpdateConnectorIndexNameAction {
public static final String NAME = "cluster:admin/xpack/connector/update_index_name";
public static final String NAME = "indices:data/write/xpack/connector/update_index_name";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private UpdateConnectorIndexNameAction() {/* no instances */}

View file

@ -23,7 +23,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
public class UpdateConnectorLastSeenAction {
public static final String NAME = "cluster:admin/xpack/connector/update_last_seen";
public static final String NAME = "indices:data/write/xpack/connector/update_last_seen";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private UpdateConnectorLastSeenAction() {/* no instances */}

View file

@ -32,7 +32,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstr
public class UpdateConnectorLastSyncStatsAction {
public static final String NAME = "cluster:admin/xpack/connector/update_last_sync_stats";
public static final String NAME = "indices:data/write/xpack/connector/update_last_sync_stats";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private UpdateConnectorLastSyncStatsAction() {/* no instances */}

View file

@ -27,7 +27,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstr
public class UpdateConnectorNameAction {
public static final String NAME = "cluster:admin/xpack/connector/update_name";
public static final String NAME = "indices:data/write/xpack/connector/update_name";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private UpdateConnectorNameAction() {/* no instances */}

View file

@ -26,7 +26,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg
public class UpdateConnectorNativeAction {
public static final String NAME = "cluster:admin/xpack/connector/update_native";
public static final String NAME = "indices:data/write/xpack/connector/update_native";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private UpdateConnectorNativeAction() {/* no instances */}

View file

@ -27,7 +27,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg
public class UpdateConnectorPipelineAction {
public static final String NAME = "cluster:admin/xpack/connector/update_pipeline";
public static final String NAME = "indices:data/write/xpack/connector/update_pipeline";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private UpdateConnectorPipelineAction() {/* no instances */}

View file

@ -32,7 +32,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg
public class UpdateConnectorSchedulingAction {
public static final String NAME = "cluster:admin/xpack/connector/update_scheduling";
public static final String NAME = "indices:data/write/xpack/connector/update_scheduling";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private UpdateConnectorSchedulingAction() {/* no instances */}

View file

@ -26,7 +26,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg
public class UpdateConnectorServiceTypeAction {
public static final String NAME = "cluster:admin/xpack/connector/update_service_type";
public static final String NAME = "indices:data/write/xpack/connector/update_service_type";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private UpdateConnectorServiceTypeAction() {/* no instances */}

View file

@ -28,7 +28,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstr
public class UpdateConnectorStatusAction {
public static final String NAME = "cluster:admin/xpack/connector/update_status";
public static final String NAME = "indices:data/write/xpack/connector/update_status";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
public UpdateConnectorStatusAction() {/* no instances */}

View file

@ -10,9 +10,6 @@ package org.elasticsearch.xpack.application.connector.syncjob;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.xcontent.ConstructingObjectParser;
@ -65,7 +62,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstr
* <li>The hostname of the worker to run the sync job.</li>
* </ul>
*/
public class ConnectorSyncJob implements Writeable, ToXContentObject {
public class ConnectorSyncJob implements ToXContentObject {
static final ParseField CANCELATION_REQUESTED_AT_FIELD = new ParseField("cancelation_requested_at");
@ -213,27 +210,6 @@ public class ConnectorSyncJob implements Writeable, ToXContentObject {
this.workerHostname = workerHostname;
}
public ConnectorSyncJob(StreamInput in) throws IOException {
this.cancelationRequestedAt = in.readOptionalInstant();
this.canceledAt = in.readOptionalInstant();
this.completedAt = in.readOptionalInstant();
this.connector = in.readNamedWriteable(Connector.class);
this.createdAt = in.readInstant();
this.deletedDocumentCount = in.readLong();
this.error = in.readOptionalString();
this.id = in.readString();
this.indexedDocumentCount = in.readLong();
this.indexedDocumentVolume = in.readLong();
this.jobType = in.readEnum(ConnectorSyncJobType.class);
this.lastSeen = in.readOptionalInstant();
this.metadata = in.readMap(StreamInput::readString, StreamInput::readGenericValue);
this.startedAt = in.readOptionalInstant();
this.status = in.readEnum(ConnectorSyncStatus.class);
this.totalDocumentCount = in.readOptionalLong();
this.triggerMethod = in.readEnum(ConnectorSyncJobTriggerMethod.class);
this.workerHostname = in.readOptionalString();
}
@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<ConnectorSyncJob, String> PARSER = new ConstructingObjectParser<>(
"connector_sync_job",
@ -548,28 +524,6 @@ public class ConnectorSyncJob implements Writeable, ToXContentObject {
return builder;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalInstant(cancelationRequestedAt);
out.writeOptionalInstant(canceledAt);
out.writeOptionalInstant(completedAt);
out.writeNamedWriteable(connector);
out.writeInstant(createdAt);
out.writeLong(deletedDocumentCount);
out.writeOptionalString(error);
out.writeString(id);
out.writeLong(indexedDocumentCount);
out.writeLong(indexedDocumentVolume);
out.writeEnum(jobType);
out.writeOptionalInstant(lastSeen);
out.writeMap(metadata, StreamOutput::writeString, StreamOutput::writeGenericValue);
out.writeOptionalInstant(startedAt);
out.writeEnum(status);
out.writeOptionalLong(totalDocumentCount);
out.writeEnum(triggerMethod);
out.writeOptionalString(workerHostname);
}
public boolean equals(Object other) {
if (this == other) return true;
if (other == null || getClass() != other.getClass()) return false;

View file

@ -28,7 +28,6 @@ import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.OriginSettingClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.DocumentMissingException;
@ -69,7 +68,6 @@ import java.util.stream.Stream;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.application.connector.ConnectorIndexService.CONNECTOR_INDEX_NAME;
import static org.elasticsearch.xpack.core.ClientHelper.CONNECTORS_ORIGIN;
/**
* A service that manages persistent {@link ConnectorSyncJob} configurations.
@ -78,8 +76,7 @@ public class ConnectorSyncJobIndexService {
private static final Long ZERO = 0L;
// The client to interact with the system index (internal user).
private final Client clientWithOrigin;
private final Client client;
public static final String CONNECTOR_SYNC_JOB_INDEX_NAME = ConnectorTemplateRegistry.CONNECTOR_SYNC_JOBS_INDEX_NAME_PATTERN;
@ -87,7 +84,7 @@ public class ConnectorSyncJobIndexService {
* @param client A client for executing actions on the connectors sync jobs index.
*/
public ConnectorSyncJobIndexService(Client client) {
this.clientWithOrigin = new OriginSettingClient(client, CONNECTORS_ORIGIN);
this.client = client;
}
/**
@ -152,7 +149,7 @@ public class ConnectorSyncJobIndexService {
indexRequest.source(syncJob.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS));
clientWithOrigin.index(
client.index(
indexRequest,
l.delegateFailureAndWrap(
(ll, indexResponse) -> ll.onResponse(new PostConnectorSyncJobAction.Response(indexResponse.getId()))
@ -178,7 +175,7 @@ public class ConnectorSyncJobIndexService {
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
try {
clientWithOrigin.delete(
client.delete(
deleteRequest,
new DelegatingIndexNotFoundOrDocumentMissingActionListener<>(connectorSyncJobId, listener, (l, deleteResponse) -> {
if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
@ -208,7 +205,7 @@ public class ConnectorSyncJobIndexService {
).doc(Map.of(ConnectorSyncJob.LAST_SEEN_FIELD.getPreferredName(), newLastSeen));
try {
clientWithOrigin.update(
client.update(
updateRequest,
new DelegatingIndexNotFoundOrDocumentMissingActionListener<>(connectorSyncJobId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
@ -233,7 +230,7 @@ public class ConnectorSyncJobIndexService {
final GetRequest getRequest = new GetRequest(CONNECTOR_SYNC_JOB_INDEX_NAME).id(connectorSyncJobId).realtime(true);
try {
clientWithOrigin.get(
client.get(
getRequest,
new DelegatingIndexNotFoundOrDocumentMissingActionListener<>(connectorSyncJobId, listener, (l, getResponse) -> {
if (getResponse.isExists() == false) {
@ -309,7 +306,7 @@ public class ConnectorSyncJobIndexService {
WriteRequest.RefreshPolicy.IMMEDIATE
).doc(syncJobFieldsToUpdate);
clientWithOrigin.update(
client.update(
updateRequest,
new DelegatingIndexNotFoundOrDocumentMissingActionListener<>(
connectorSyncJobId,
@ -358,7 +355,7 @@ public class ConnectorSyncJobIndexService {
final SearchRequest searchRequest = new SearchRequest(CONNECTOR_SYNC_JOB_INDEX_NAME).source(searchSource);
clientWithOrigin.search(searchRequest, new ActionListener<>() {
client.search(searchRequest, new ActionListener<>() {
@Override
public void onResponse(SearchResponse searchResponse) {
try {
@ -477,7 +474,7 @@ public class ConnectorSyncJobIndexService {
).doc(fieldsToUpdate);
try {
clientWithOrigin.update(
client.update(
updateRequest,
new DelegatingIndexNotFoundOrDocumentMissingActionListener<>(syncJobId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
@ -504,7 +501,7 @@ public class ConnectorSyncJobIndexService {
final GetRequest request = new GetRequest(CONNECTOR_INDEX_NAME, connectorId);
clientWithOrigin.get(request, new ActionListener<>() {
client.get(request, new ActionListener<>() {
@Override
public void onResponse(GetResponse response) {
final boolean connectorDoesNotExist = response.isExists() == false;
@ -597,7 +594,7 @@ public class ConnectorSyncJobIndexService {
)
);
clientWithOrigin.update(
client.update(
updateRequest,
new DelegatingIndexNotFoundOrDocumentMissingActionListener<>(
connectorSyncJobId,
@ -632,7 +629,7 @@ public class ConnectorSyncJobIndexService {
)
).setRefresh(true).setIndicesOptions(IndicesOptions.fromOptions(true, true, false, false));
clientWithOrigin.execute(DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener.delegateFailureAndWrap((l, r) -> {
client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener.delegateFailureAndWrap((l, r) -> {
final List<BulkItemResponse.Failure> bulkDeleteFailures = r.getBulkFailures();
if (bulkDeleteFailures.isEmpty() == false) {
l.onFailure(
@ -684,7 +681,7 @@ public class ConnectorSyncJobIndexService {
WriteRequest.RefreshPolicy.IMMEDIATE
).doc(document);
clientWithOrigin.update(
client.update(
updateRequest,
new DelegatingIndexNotFoundOrDocumentMissingActionListener<>(
connectorSyncJobId,

View file

@ -28,7 +28,7 @@ import static org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyn
public class CancelConnectorSyncJobAction {
public static final String NAME = "cluster:admin/xpack/connector/sync_job/cancel";
public static final String NAME = "indices:data/write/xpack/connector/sync_job/cancel";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<ConnectorUpdateActionResponse>(NAME);
private CancelConnectorSyncJobAction() {/* no instances */}

View file

@ -28,7 +28,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg
public class CheckInConnectorSyncJobAction {
public static final String NAME = "cluster:admin/xpack/connector/sync_job/check_in";
public static final String NAME = "indices:data/write/xpack/connector/sync_job/check_in";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private CheckInConnectorSyncJobAction() {/* no instances */}

View file

@ -31,7 +31,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstr
public class ClaimConnectorSyncJobAction {
public static final ParseField CONNECTOR_SYNC_JOB_ID_FIELD = new ParseField("connector_sync_job_id");
public static final String NAME = "cluster:admin/xpack/connector/sync_job/claim";
public static final String NAME = "indices:data/write/xpack/connector/sync_job/claim";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private ClaimConnectorSyncJobAction() {/* no instances */}

View file

@ -8,14 +8,19 @@
package org.elasticsearch.xpack.application.connector.syncjob.action;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry;
import java.io.IOException;
/**
* Abstract base class for action requests targeting the connector sync job index.
* Implements {@link org.elasticsearch.action.IndicesRequest} to ensure index-level privilege support.
* This class defines the connectors sync job index as the target for all derived action requests.
*/
public abstract class ConnectorSyncJobActionRequest extends ActionRequest {
public abstract class ConnectorSyncJobActionRequest extends ActionRequest implements IndicesRequest {
public ConnectorSyncJobActionRequest() {
super();
@ -24,4 +29,14 @@ public abstract class ConnectorSyncJobActionRequest extends ActionRequest {
public ConnectorSyncJobActionRequest(StreamInput in) throws IOException {
super(in);
}
@Override
public String[] indices() {
return new String[] { ConnectorTemplateRegistry.CONNECTOR_SYNC_JOBS_INDEX_NAME_PATTERN };
}
@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.lenientExpandHidden();
}
}

View file

@ -28,7 +28,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg
public class DeleteConnectorSyncJobAction {
public static final String NAME = "cluster:admin/xpack/connector/sync_job/delete";
public static final String NAME = "indices:data/write/xpack/connector/sync_job/delete";
public static final ActionType<AcknowledgedResponse> INSTANCE = new ActionType<>(NAME);
private DeleteConnectorSyncJobAction() {/* no instances */}

View file

@ -29,7 +29,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg
public class GetConnectorSyncJobAction {
public static final String NAME = "cluster:admin/xpack/connector/sync_job/get";
public static final String NAME = "indices:data/read/xpack/connector/sync_job/get";
public static final ActionType<GetConnectorSyncJobAction.Response> INSTANCE = new ActionType<>(NAME);
private GetConnectorSyncJobAction() {/* no instances */}

View file

@ -33,7 +33,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstr
public class ListConnectorSyncJobsAction {
public static final String NAME = "cluster:admin/xpack/connector/sync_job/list";
public static final String NAME = "indices:data/read/xpack/connector/sync_job/list";
public static final ActionType<ListConnectorSyncJobsAction.Response> INSTANCE = new ActionType<>(NAME);
private ListConnectorSyncJobsAction() {/* no instances */}

View file

@ -18,6 +18,7 @@ import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xpack.application.connector.Connector;
import org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry;
import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJob;
import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJobTriggerMethod;
import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJobType;
@ -31,7 +32,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstr
public class PostConnectorSyncJobAction {
public static final String NAME = "cluster:admin/xpack/connector/sync_job/post";
public static final String NAME = "indices:data/write/xpack/connector/sync_job/post";
public static final ActionType<PostConnectorSyncJobAction.Response> INSTANCE = new ActionType<>(NAME);
private PostConnectorSyncJobAction() {/* no instances */}
@ -139,6 +140,14 @@ public class PostConnectorSyncJobAction {
public int hashCode() {
return Objects.hash(id, jobType, triggerMethod);
}
@Override
public String[] indices() {
// Creating a new sync job requires reading from connector index
return new String[] {
ConnectorTemplateRegistry.CONNECTOR_SYNC_JOBS_INDEX_NAME_PATTERN,
ConnectorTemplateRegistry.CONNECTOR_INDEX_NAME_PATTERN };
}
}
public static class Response extends ActionResponse implements ToXContentObject {

View file

@ -28,7 +28,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg
public class UpdateConnectorSyncJobErrorAction {
public static final String NAME = "cluster:admin/xpack/connector/sync_job/update_error";
public static final String NAME = "indices:data/write/xpack/connector/sync_job/update_error";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private UpdateConnectorSyncJobErrorAction() {/* no instances */}

View file

@ -35,7 +35,7 @@ import static org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyn
public class UpdateConnectorSyncJobIngestionStatsAction {
public static final String NAME = "cluster:admin/xpack/connector/sync_job/update_stats";
public static final String NAME = "indices:data/write/xpack/connector/sync_job/update_stats";
public static final ActionType<ConnectorUpdateActionResponse> INSTANCE = new ActionType<>(NAME);
private UpdateConnectorSyncJobIngestionStatsAction() {/* no instances */}

View file

@ -11,7 +11,6 @@ import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Tuple;
@ -112,13 +111,66 @@ public class ConnectorIndexServiceTests extends ESSingleNodeTestCase {
}
String connectorIdToDelete = connectorIds.get(0);
DeleteResponse resp = awaitDeleteConnector(connectorIdToDelete, false);
UpdateResponse resp = awaitDeleteConnector(connectorIdToDelete, false);
assertThat(resp.status(), equalTo(RestStatus.OK));
expectThrows(ResourceNotFoundException.class, () -> awaitGetConnector(connectorIdToDelete));
expectThrows(ResourceNotFoundException.class, () -> awaitDeleteConnector(connectorIdToDelete, false));
}
public void testDeleteConnector_expectSoftDeletion() throws Exception {
int numConnectors = 5;
List<String> connectorIds = new ArrayList<>();
List<Connector> connectors = new ArrayList<>();
for (int i = 0; i < numConnectors; i++) {
Connector connector = ConnectorTestUtils.getRandomConnector();
ConnectorCreateActionResponse resp = awaitCreateConnector(null, connector);
connectorIds.add(resp.getId());
connectors.add(connector);
}
String connectorIdToDelete = connectorIds.get(0);
UpdateResponse resp = awaitDeleteConnector(connectorIdToDelete, false);
assertThat(resp.status(), equalTo(RestStatus.OK));
expectThrows(ResourceNotFoundException.class, () -> awaitGetConnector(connectorIdToDelete));
expectThrows(ResourceNotFoundException.class, () -> awaitDeleteConnector(connectorIdToDelete, false));
Connector softDeletedConnector = awaitGetSoftDeletedConnector(connectorIdToDelete);
assertThat(softDeletedConnector.getConnectorId(), equalTo(connectorIdToDelete));
assertThat(softDeletedConnector.getServiceType(), equalTo(connectors.get(0).getServiceType()));
}
public void testDeleteConnector_expectSoftDeletionMultipleConnectors() throws Exception {
int numConnectors = 5;
List<String> connectorIds = new ArrayList<>();
for (int i = 0; i < numConnectors; i++) {
Connector connector = ConnectorTestUtils.getRandomConnector();
ConnectorCreateActionResponse resp = awaitCreateConnector(null, connector);
connectorIds.add(resp.getId());
}
// Delete all of them
for (int i = 0; i < numConnectors; i++) {
String connectorIdToDelete = connectorIds.get(i);
UpdateResponse resp = awaitDeleteConnector(connectorIdToDelete, false);
assertThat(resp.status(), equalTo(RestStatus.OK));
}
// Connectors were deleted from main index
for (int i = 0; i < numConnectors; i++) {
String connectorId = connectorIds.get(i);
expectThrows(ResourceNotFoundException.class, () -> awaitGetConnector(connectorId));
}
// Soft deleted connectors available in system index
for (int i = 0; i < numConnectors; i++) {
String connectorId = connectorIds.get(i);
Connector softDeletedConnector = awaitGetSoftDeletedConnector(connectorId);
assertThat(softDeletedConnector.getConnectorId(), equalTo(connectorId));
}
}
public void testUpdateConnectorConfiguration_FullConfiguration() throws Exception {
Connector connector = ConnectorTestUtils.getRandomConnector();
String connectorId = randomUUID();
@ -897,13 +949,13 @@ public class ConnectorIndexServiceTests extends ESSingleNodeTestCase {
assertThat(updateApiKeyIdRequest.getApiKeySecretId(), equalTo(indexedConnector.getApiKeySecretId()));
}
private DeleteResponse awaitDeleteConnector(String connectorId, boolean deleteConnectorSyncJobs) throws Exception {
private UpdateResponse awaitDeleteConnector(String connectorId, boolean deleteConnectorSyncJobs) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<DeleteResponse> resp = new AtomicReference<>(null);
final AtomicReference<UpdateResponse> resp = new AtomicReference<>(null);
final AtomicReference<Exception> exc = new AtomicReference<>(null);
connectorIndexService.deleteConnector(connectorId, deleteConnectorSyncJobs, new ActionListener<>() {
@Override
public void onResponse(DeleteResponse deleteResponse) {
public void onResponse(UpdateResponse deleteResponse) {
resp.set(deleteResponse);
latch.countDown();
}
@ -956,11 +1008,19 @@ public class ConnectorIndexServiceTests extends ESSingleNodeTestCase {
return resp.get();
}
private Connector awaitGetSoftDeletedConnector(String connectorId) throws Exception {
return awaitGetConnector(connectorId, true);
}
private Connector awaitGetConnector(String connectorId) throws Exception {
return awaitGetConnector(connectorId, false);
}
private Connector awaitGetConnector(String connectorId, Boolean isDeleted) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Connector> resp = new AtomicReference<>(null);
final AtomicReference<Exception> exc = new AtomicReference<>(null);
connectorIndexService.getConnector(connectorId, new ActionListener<>() {
connectorIndexService.getConnector(connectorId, isDeleted, new ActionListener<>() {
@Override
public void onResponse(ConnectorSearchResult connectorResult) {
// Serialize the sourceRef to Connector class for unit tests
@ -994,11 +1054,23 @@ public class ConnectorIndexServiceTests extends ESSingleNodeTestCase {
List<String> names,
List<String> serviceTypes,
String searchQuery
) throws Exception {
return awaitListConnector(from, size, indexNames, names, serviceTypes, searchQuery, false);
}
private ConnectorIndexService.ConnectorResult awaitListConnector(
int from,
int size,
List<String> indexNames,
List<String> names,
List<String> serviceTypes,
String searchQuery,
Boolean isDeleted
) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<ConnectorIndexService.ConnectorResult> resp = new AtomicReference<>(null);
final AtomicReference<Exception> exc = new AtomicReference<>(null);
connectorIndexService.listConnectors(from, size, indexNames, names, serviceTypes, searchQuery, new ActionListener<>() {
connectorIndexService.listConnectors(from, size, indexNames, names, serviceTypes, searchQuery, isDeleted, new ActionListener<>() {
@Override
public void onResponse(ConnectorIndexService.ConnectorResult result) {
resp.set(result);

View file

@ -9,43 +9,20 @@ package org.elasticsearch.xpack.application.connector;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentType;
import org.junit.Before;
import java.io.IOException;
import java.util.List;
import static java.util.Collections.emptyList;
import static org.elasticsearch.common.xcontent.XContentHelper.toXContent;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent;
import static org.hamcrest.CoreMatchers.equalTo;
public class ConnectorTests extends ESTestCase {
private NamedWriteableRegistry namedWriteableRegistry;
@Before
public void registerNamedObjects() {
SearchModule searchModule = new SearchModule(Settings.EMPTY, emptyList());
List<NamedWriteableRegistry.Entry> namedWriteables = searchModule.getNamedWriteables();
namedWriteableRegistry = new NamedWriteableRegistry(namedWriteables);
}
public final void testRandomSerialization() throws IOException {
for (int runs = 0; runs < 10; runs++) {
Connector testInstance = ConnectorTestUtils.getRandomConnector();
assertTransportSerialization(testInstance);
}
}
public void testToXContent() throws IOException {
String connectorId = "test-connector";
String content = XContentHelper.stripWhitespace("""
@ -383,14 +360,4 @@ public class ConnectorTests extends ESTestCase {
assertToXContentEquivalent(originalBytes, toXContent(parsed, XContentType.JSON, humanReadable), XContentType.JSON);
assertThat(parsed.getApiKeySecretId(), equalTo(null));
}
private void assertTransportSerialization(Connector testInstance) throws IOException {
Connector deserializedInstance = copyInstance(testInstance);
assertNotSame(testInstance, deserializedInstance);
assertThat(testInstance, equalTo(deserializedInstance));
}
private Connector copyInstance(Connector instance) throws IOException {
return copyWriteable(instance, namedWriteableRegistry, Connector::new);
}
}

View file

@ -1,43 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.application.connector.action;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractBWCSerializationTestCase;
import org.elasticsearch.xcontent.XContentParser;
import java.io.IOException;
public class GetConnectorActionRequestBWCSerializingTests extends AbstractBWCSerializationTestCase<GetConnectorAction.Request> {
@Override
protected Writeable.Reader<GetConnectorAction.Request> instanceReader() {
return GetConnectorAction.Request::new;
}
@Override
protected GetConnectorAction.Request createTestInstance() {
return new GetConnectorAction.Request(randomAlphaOfLengthBetween(1, 10));
}
@Override
protected GetConnectorAction.Request mutateInstance(GetConnectorAction.Request instance) throws IOException {
return randomValueOtherThan(instance, this::createTestInstance);
}
@Override
protected GetConnectorAction.Request doParseInstance(XContentParser parser) throws IOException {
return GetConnectorAction.Request.parse(parser);
}
@Override
protected GetConnectorAction.Request mutateInstanceForVersion(GetConnectorAction.Request instance, TransportVersion version) {
return new GetConnectorAction.Request(instance.getConnectorId());
}
}

View file

@ -1,58 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.application.connector.action;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractBWCSerializationTestCase;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xpack.application.EnterpriseSearchModuleTestUtils;
import org.elasticsearch.xpack.core.action.util.PageParams;
import java.io.IOException;
import java.util.List;
public class ListConnectorActionRequestBWCSerializingTests extends AbstractBWCSerializationTestCase<ListConnectorAction.Request> {
@Override
protected Writeable.Reader<ListConnectorAction.Request> instanceReader() {
return ListConnectorAction.Request::new;
}
@Override
protected ListConnectorAction.Request createTestInstance() {
PageParams pageParams = EnterpriseSearchModuleTestUtils.randomPageParams();
return new ListConnectorAction.Request(
pageParams,
List.of(generateRandomStringArray(10, 10, false)),
List.of(generateRandomStringArray(10, 10, false)),
List.of(generateRandomStringArray(10, 10, false)),
randomAlphaOfLengthBetween(3, 10)
);
}
@Override
protected ListConnectorAction.Request mutateInstance(ListConnectorAction.Request instance) throws IOException {
return randomValueOtherThan(instance, this::createTestInstance);
}
@Override
protected ListConnectorAction.Request doParseInstance(XContentParser parser) throws IOException {
return ListConnectorAction.Request.parse(parser);
}
@Override
protected ListConnectorAction.Request mutateInstanceForVersion(ListConnectorAction.Request instance, TransportVersion version) {
return new ListConnectorAction.Request(
instance.getPageParams(),
instance.getIndexNames(),
instance.getConnectorNames(),
instance.getConnectorServiceTypes(),
instance.getConnectorSearchQuery()
);
}
}

View file

@ -8,22 +8,14 @@
package org.elasticsearch.xpack.application.connector.secrets.action;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.xpack.application.connector.Connector;
import org.elasticsearch.xpack.application.connector.secrets.ConnectorSecretsTestUtils;
import org.elasticsearch.xpack.core.ml.AbstractBWCWireSerializationTestCase;
import java.io.IOException;
import java.util.List;
public class DeleteConnectorSecretResponseBWCSerializingTests extends AbstractBWCWireSerializationTestCase<DeleteConnectorSecretResponse> {
@Override
public NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(List.of(new NamedWriteableRegistry.Entry(Connector.class, Connector.NAME, Connector::new)));
}
@Override
protected Writeable.Reader<DeleteConnectorSecretResponse> instanceReader() {
return DeleteConnectorSecretResponse::new;

View file

@ -8,22 +8,14 @@
package org.elasticsearch.xpack.application.connector.secrets.action;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.xpack.application.connector.Connector;
import org.elasticsearch.xpack.application.connector.secrets.ConnectorSecretsTestUtils;
import org.elasticsearch.xpack.core.ml.AbstractBWCWireSerializationTestCase;
import java.io.IOException;
import java.util.List;
public class GetConnectorSecretResponseBWCSerializingTests extends AbstractBWCWireSerializationTestCase<GetConnectorSecretResponse> {
@Override
public NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(List.of(new NamedWriteableRegistry.Entry(Connector.class, Connector.NAME, Connector::new)));
}
@Override
protected Writeable.Reader<GetConnectorSecretResponse> instanceReader() {
return GetConnectorSecretResponse::new;

View file

@ -8,17 +8,14 @@
package org.elasticsearch.xpack.application.connector.syncjob;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.application.connector.Connector;
import org.elasticsearch.xpack.application.connector.ConnectorSyncStatus;
import org.junit.Before;
import java.io.IOException;
import java.time.Instant;
import java.util.List;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
@ -26,22 +23,6 @@ import static org.hamcrest.Matchers.notNullValue;
public class ConnectorSyncJobTests extends ESTestCase {
private NamedWriteableRegistry namedWriteableRegistry;
@Before
public void registerNamedObjects() {
namedWriteableRegistry = new NamedWriteableRegistry(
List.of(new NamedWriteableRegistry.Entry(Connector.class, Connector.NAME, Connector::new))
);
}
public final void testRandomSerialization() throws IOException {
for (int run = 0; run < 10; run++) {
ConnectorSyncJob syncJob = ConnectorSyncJobTestUtils.getRandomConnectorSyncJob();
assertTransportSerialization(syncJob);
}
}
public void testFromXContent_WithAllFields_AllSet() throws IOException {
String content = XContentHelper.stripWhitespace("""
{
@ -332,14 +313,4 @@ public class ConnectorSyncJobTests extends ESTestCase {
ConnectorSyncJob.syncJobConnectorFromXContentBytes(new BytesArray(content), null, XContentType.JSON);
}
private void assertTransportSerialization(ConnectorSyncJob testInstance) throws IOException {
ConnectorSyncJob deserializedInstance = copyInstance(testInstance);
assertNotSame(testInstance, deserializedInstance);
assertThat(testInstance, equalTo(deserializedInstance));
}
private ConnectorSyncJob copyInstance(ConnectorSyncJob instance) throws IOException {
return copyWriteable(instance, namedWriteableRegistry, ConnectorSyncJob::new);
}
}

View file

@ -8,23 +8,15 @@
package org.elasticsearch.xpack.application.connector.syncjob.action;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.xpack.application.connector.Connector;
import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJobTestUtils;
import org.elasticsearch.xpack.core.ml.AbstractBWCWireSerializationTestCase;
import java.io.IOException;
import java.util.List;
public class GetConnectorSyncJobActionResponseBWCSerializingTests extends AbstractBWCWireSerializationTestCase<
GetConnectorSyncJobAction.Response> {
@Override
public NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(List.of(new NamedWriteableRegistry.Entry(Connector.class, Connector.NAME, Connector::new)));
}
@Override
protected Writeable.Reader<GetConnectorSyncJobAction.Response> instanceReader() {
return GetConnectorSyncJobAction.Response::new;

View file

@ -8,23 +8,15 @@
package org.elasticsearch.xpack.application.connector.syncjob.action;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.xpack.application.connector.Connector;
import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJobTestUtils;
import org.elasticsearch.xpack.core.ml.AbstractBWCWireSerializationTestCase;
import java.io.IOException;
import java.util.List;
public class ListConnectorSyncJobsActionResponseBWCSerializingTests extends AbstractBWCWireSerializationTestCase<
ListConnectorSyncJobsAction.Response> {
@Override
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(List.of(new NamedWriteableRegistry.Entry(Connector.class, Connector.NAME, Connector::new)));
}
@Override
protected Writeable.Reader<ListConnectorSyncJobsAction.Response> instanceReader() {
return ListConnectorSyncJobsAction.Response::new;