[Search] Add system index descriptors to Connector indices (#118991)

Update the .elastic-connectors and .elastic-connectors-sync-jobs indices into system indices
This commit is contained in:
Navarone Feekery 2025-01-24 13:10:38 +01:00 committed by GitHub
parent dc195f4db7
commit 385e1fdf21
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 230 additions and 375 deletions

View file

@ -196,6 +196,9 @@ public final class ClientHelper {
public static final String APM_ORIGIN = "apm"; public static final String APM_ORIGIN = "apm";
public static final String OTEL_ORIGIN = "otel"; public static final String OTEL_ORIGIN = "otel";
public static final String REINDEX_DATA_STREAM_ORIGIN = "reindex_data_stream"; public static final String REINDEX_DATA_STREAM_ORIGIN = "reindex_data_stream";
// TODO consolidate the Kibana origin with the one defined in org/elasticsearch/kibana/KibanaPlugin.java
public static final String KIBANA_ORIGIN = "kibana";
public static final String CLOUD_ORIGIN = "cloud";
private ClientHelper() {} private ClientHelper() {}

View file

@ -1,12 +1,16 @@
{ {
"template": { "settings": {
"aliases": { "index": {
".elastic-connectors-sync-jobs": {} "number_of_shards": "1",
"auto_expand_replicas": "0-1"
}
}, },
"mappings": { "mappings": {
"dynamic": "false", "_doc": {
"dynamic": "strict",
"_meta": { "_meta": {
"version": ${xpack.application.connector.template.version} "version": "${elastic-connectors-sync-jobs.version}",
"managed_index_mappings_version": ${elastic-connectors-sync-jobs.managed.index.version}
}, },
"properties": { "properties": {
"cancelation_requested_at": { "cancelation_requested_at": {
@ -21,9 +25,11 @@
"connector": { "connector": {
"properties": { "properties": {
"configuration": { "configuration": {
"dynamic": "false",
"type": "object" "type": "object"
}, },
"filtering": { "filtering": {
"dynamic": "false",
"properties": { "properties": {
"advanced_snippet": { "advanced_snippet": {
"properties": { "properties": {
@ -91,6 +97,7 @@
"type": "keyword" "type": "keyword"
}, },
"pipeline": { "pipeline": {
"dynamic": "false",
"properties": { "properties": {
"extract_binary_content": { "extract_binary_content": {
"type": "boolean" "type": "boolean"
@ -110,6 +117,7 @@
"type": "keyword" "type": "keyword"
}, },
"sync_cursor": { "sync_cursor": {
"dynamic": "false",
"type": "object" "type": "object"
} }
} }
@ -136,6 +144,7 @@
"type": "date" "type": "date"
}, },
"metadata": { "metadata": {
"dynamic": "false",
"type": "object" "type": "object"
}, },
"started_at": { "started_at": {
@ -155,10 +164,5 @@
} }
} }
} }
}, }
"_meta": {
"description": "Built-in mappings applied by default to elastic-connectors indices",
"managed": true
},
"version": ${xpack.application.connector.template.version}
} }

View file

@ -1,29 +1,35 @@
{ {
"template": { "settings": {
"aliases": { "index": {
".elastic-connectors": {} "number_of_shards": "1",
"auto_expand_replicas": "0-1"
}
}, },
"mappings": { "mappings": {
"dynamic": "false", "_doc": {
"dynamic": "strict",
"_meta": { "_meta": {
"pipeline": { "version": "${elastic-connectors.version}",
"default_name": "search-default-ingestion", "managed_index_mappings_version": ${elastic-connectors.managed.index.version}
"default_extract_binary_content": true,
"default_run_ml_inference": true,
"default_reduce_whitespace": true
},
"version": ${xpack.application.connector.template.version}
}, },
"properties": { "properties": {
"api_key_id": { "api_key_id": {
"type": "keyword" "type": "keyword"
}, },
"api_key_secret_id": {
"type": "keyword"
},
"configuration": { "configuration": {
"dynamic": "false",
"type": "object" "type": "object"
}, },
"custom_scheduling": { "custom_scheduling": {
"dynamic": "false",
"type": "object" "type": "object"
}, },
"deleted": {
"type": "boolean"
},
"description": { "description": {
"type": "text" "type": "text"
}, },
@ -31,6 +37,7 @@
"type": "keyword" "type": "keyword"
}, },
"features": { "features": {
"dynamic": "false",
"properties": { "properties": {
"filtering_advanced_config": { "filtering_advanced_config": {
"type": "boolean" "type": "boolean"
@ -66,6 +73,7 @@
} }
}, },
"filtering": { "filtering": {
"dynamic": "false",
"properties": { "properties": {
"active": { "active": {
"properties": { "properties": {
@ -78,6 +86,7 @@
"type": "date" "type": "date"
}, },
"value": { "value": {
"dynamic": "false",
"type": "object" "type": "object"
} }
} }
@ -143,6 +152,7 @@
"type": "date" "type": "date"
}, },
"value": { "value": {
"dynamic": "false",
"type": "object" "type": "object"
} }
} }
@ -242,6 +252,7 @@
"type": "keyword" "type": "keyword"
}, },
"pipeline": { "pipeline": {
"dynamic": "false",
"properties": { "properties": {
"extract_binary_content": { "extract_binary_content": {
"type": "boolean" "type": "boolean"
@ -258,6 +269,7 @@
} }
}, },
"scheduling": { "scheduling": {
"dynamic": "false",
"properties": { "properties": {
"access_control": { "access_control": {
"properties": { "properties": {
@ -298,22 +310,13 @@
"type": "keyword" "type": "keyword"
}, },
"sync_cursor": { "sync_cursor": {
"dynamic": "false",
"type": "object" "type": "object"
}, },
"sync_now": { "sync_now": {
"type": "boolean" "type": "boolean"
},
"deleted": {
"type": "boolean"
} }
} }
} }
}, }
"_meta": {
"description": "Built-in mappings applied by default to elastic-connectors indices",
"managed": true
},
"version": ${xpack.application.connector.template.version}
} }

View file

@ -1,14 +0,0 @@
{
"template": {
"settings": {
"hidden": true,
"number_of_shards": "1",
"auto_expand_replicas": "0-1"
}
},
"_meta": {
"description": "Built-in settings applied by default to connector management indices",
"managed": true
},
"version": ${xpack.application.connector.template.version}
}

View file

@ -1,14 +0,0 @@
{
"index_patterns": ["${connectors-sync-jobs.index_pattern}"],
"priority": 100,
"composed_of": [
"elastic-connectors-settings",
"elastic-connectors-sync-jobs-mappings"
],
"allow_auto_create": true,
"_meta": {
"description": "Built-in template for elastic-connectors-sync-jobs",
"managed": true
},
"version": ${xpack.application.connector.template.version}
}

View file

@ -1,14 +0,0 @@
{
"index_patterns": ["${connectors.index_pattern}"],
"priority": 100,
"composed_of": [
"elastic-connectors-settings",
"elastic-connectors-mappings"
],
"allow_auto_create": true,
"_meta": {
"description": "Built-in template for elastic-connectors",
"managed": true
},
"version": ${xpack.application.connector.template.version}
}

View file

@ -46,6 +46,7 @@ import org.elasticsearch.xpack.application.analytics.action.TransportPostAnalyti
import org.elasticsearch.xpack.application.analytics.action.TransportPutAnalyticsCollectionAction; import org.elasticsearch.xpack.application.analytics.action.TransportPutAnalyticsCollectionAction;
import org.elasticsearch.xpack.application.analytics.ingest.AnalyticsEventIngestConfig; import org.elasticsearch.xpack.application.analytics.ingest.AnalyticsEventIngestConfig;
import org.elasticsearch.xpack.application.connector.ConnectorAPIFeature; import org.elasticsearch.xpack.application.connector.ConnectorAPIFeature;
import org.elasticsearch.xpack.application.connector.ConnectorIndexService;
import org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry; import org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry;
import org.elasticsearch.xpack.application.connector.action.DeleteConnectorAction; import org.elasticsearch.xpack.application.connector.action.DeleteConnectorAction;
import org.elasticsearch.xpack.application.connector.action.GetConnectorAction; import org.elasticsearch.xpack.application.connector.action.GetConnectorAction;
@ -124,6 +125,7 @@ import org.elasticsearch.xpack.application.connector.secrets.action.TransportDel
import org.elasticsearch.xpack.application.connector.secrets.action.TransportGetConnectorSecretAction; import org.elasticsearch.xpack.application.connector.secrets.action.TransportGetConnectorSecretAction;
import org.elasticsearch.xpack.application.connector.secrets.action.TransportPostConnectorSecretAction; import org.elasticsearch.xpack.application.connector.secrets.action.TransportPostConnectorSecretAction;
import org.elasticsearch.xpack.application.connector.secrets.action.TransportPutConnectorSecretAction; import org.elasticsearch.xpack.application.connector.secrets.action.TransportPutConnectorSecretAction;
import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJobIndexService;
import org.elasticsearch.xpack.application.connector.syncjob.action.CancelConnectorSyncJobAction; import org.elasticsearch.xpack.application.connector.syncjob.action.CancelConnectorSyncJobAction;
import org.elasticsearch.xpack.application.connector.syncjob.action.CheckInConnectorSyncJobAction; import org.elasticsearch.xpack.application.connector.syncjob.action.CheckInConnectorSyncJobAction;
import org.elasticsearch.xpack.application.connector.syncjob.action.ClaimConnectorSyncJobAction; import org.elasticsearch.xpack.application.connector.syncjob.action.ClaimConnectorSyncJobAction;
@ -477,7 +479,12 @@ public class EnterpriseSearch extends Plugin implements ActionPlugin, SystemInde
@Override @Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) { public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
Collection<SystemIndexDescriptor> systemIndices = new ArrayList<>( Collection<SystemIndexDescriptor> systemIndices = new ArrayList<>(
List.of(SearchApplicationIndexService.getSystemIndexDescriptor(), QueryRulesIndexService.getSystemIndexDescriptor()) List.of(
SearchApplicationIndexService.getSystemIndexDescriptor(),
QueryRulesIndexService.getSystemIndexDescriptor(),
ConnectorSyncJobIndexService.getSystemIndexDescriptor(),
ConnectorIndexService.getSystemIndexDescriptor()
)
); );
if (ConnectorSecretsFeature.isEnabled()) { if (ConnectorSecretsFeature.isEnabled()) {

View file

@ -10,10 +10,12 @@ package org.elasticsearch.xpack.application.connector;
import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DelegatingActionListener; import org.elasticsearch.action.DelegatingActionListener;
import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
@ -33,6 +35,7 @@ import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder; import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.index.query.WildcardQueryBuilder; import org.elasticsearch.index.query.WildcardQueryBuilder;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script; import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType; import org.elasticsearch.script.ScriptType;
@ -59,6 +62,7 @@ import org.elasticsearch.xpack.application.connector.filtering.FilteringValidati
import org.elasticsearch.xpack.application.connector.filtering.FilteringValidationState; import org.elasticsearch.xpack.application.connector.filtering.FilteringValidationState;
import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJob; import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJob;
import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJobIndexService; import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJobIndexService;
import org.elasticsearch.xpack.core.template.TemplateUtils;
import java.time.Instant; import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
@ -76,6 +80,7 @@ import java.util.stream.Collectors;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.application.connector.ConnectorFiltering.fromXContentBytesConnectorFiltering; import static org.elasticsearch.xpack.application.connector.ConnectorFiltering.fromXContentBytesConnectorFiltering;
import static org.elasticsearch.xpack.application.connector.ConnectorFiltering.sortFilteringRulesByOrder; import static org.elasticsearch.xpack.application.connector.ConnectorFiltering.sortFilteringRulesByOrder;
import static org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry.CONNECTORS_ALLOWED_PRODUCT_ORIGINS;
import static org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry.MANAGED_CONNECTOR_INDEX_PREFIX; import static org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry.MANAGED_CONNECTOR_INDEX_PREFIX;
import static org.elasticsearch.xpack.core.ClientHelper.CONNECTORS_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.CONNECTORS_ORIGIN;
@ -87,7 +92,20 @@ public class ConnectorIndexService {
// The client to interact with the system index (internal user). // The client to interact with the system index (internal user).
private final Client clientWithOrigin; private final Client clientWithOrigin;
public static final String CONNECTOR_INDEX_NAME = ConnectorTemplateRegistry.CONNECTOR_INDEX_NAME_PATTERN; // TODO use proper version IDs (see org/elasticsearch/xpack/application/rules/QueryRulesIndexService.java)
// TODO if this version is updated, a test should be added to
// javaRestTest/java/org/elasticsearch/xpack/application/FullClusterRestartIT.java
private static final int CONNECTORS_INDEX_VERSION = 1;
// TODO rename to CONNECTOR_ALIAS_NAME
public static final String CONNECTOR_INDEX_NAME = ".elastic-connectors";
public static final String CONNECTOR_INDEX_PREFIX = ".elastic-connectors-v";
public static final String CONNECTOR_CONCRETE_INDEX_NAME = CONNECTOR_INDEX_PREFIX + CONNECTORS_INDEX_VERSION;
// The index pattern needs a stricter regex to prevent conflicts with .elastic-connectors-sync-jobs
public static final String CONNECTOR_INDEX_NAME_PATTERN = CONNECTOR_INDEX_PREFIX + "*";
private static final String CONNECTORS_MAPPING_VERSION_VARIABLE = "elastic-connectors.version";
private static final String CONNECTORS_MAPPING_MANAGED_VERSION_VARIABLE = "elastic-connectors.managed.index.version";
/** /**
* @param client A client for executing actions on the connector index * @param client A client for executing actions on the connector index
@ -96,6 +114,36 @@ public class ConnectorIndexService {
this.clientWithOrigin = new OriginSettingClient(client, CONNECTORS_ORIGIN); this.clientWithOrigin = new OriginSettingClient(client, CONNECTORS_ORIGIN);
} }
/**
* Returns the {@link SystemIndexDescriptor} for the Connector system index.
*
* @return The {@link SystemIndexDescriptor} for the Connector system index.
*/
public static SystemIndexDescriptor getSystemIndexDescriptor() {
PutIndexTemplateRequest request = new PutIndexTemplateRequest();
String templateSource = TemplateUtils.loadTemplate(
"/elastic-connectors.json",
Version.CURRENT.toString(),
CONNECTORS_MAPPING_VERSION_VARIABLE,
Map.of(CONNECTORS_MAPPING_MANAGED_VERSION_VARIABLE, Integer.toString(CONNECTORS_INDEX_VERSION))
);
request.source(templateSource, XContentType.JSON);
// The index pattern needs a stricter regex to prevent conflicts with .elastic-connectors-sync-jobs
return SystemIndexDescriptor.builder()
.setIndexPattern(CONNECTOR_INDEX_NAME_PATTERN)
.setPrimaryIndex(CONNECTOR_CONCRETE_INDEX_NAME)
.setAliasName(CONNECTOR_INDEX_NAME)
.setDescription("Search connectors")
.setMappings(request.mappings())
.setSettings(request.settings())
.setOrigin(CONNECTORS_ORIGIN)
.setType(SystemIndexDescriptor.Type.EXTERNAL_MANAGED)
.setAllowedElasticProductOrigins(CONNECTORS_ALLOWED_PRODUCT_ORIGINS)
.setNetNew()
.build();
}
/** /**
* Creates or updates the {@link Connector} in the underlying index with a specific doc ID * Creates or updates the {@link Connector} in the underlying index with a specific doc ID
* if connectorId is provided. Otherwise, the connector doc is indexed with auto-generated doc ID. * if connectorId is provided. Otherwise, the connector doc is indexed with auto-generated doc ID.

View file

@ -8,25 +8,23 @@
package org.elasticsearch.xpack.application.connector; package org.elasticsearch.xpack.application.connector;
import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.ComponentTemplate;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.core.template.IndexTemplateConfig; import org.elasticsearch.xpack.core.template.IndexTemplateConfig;
import org.elasticsearch.xpack.core.template.IndexTemplateRegistry; import org.elasticsearch.xpack.core.template.IndexTemplateRegistry;
import org.elasticsearch.xpack.core.template.IngestPipelineConfig; import org.elasticsearch.xpack.core.template.IngestPipelineConfig;
import org.elasticsearch.xpack.core.template.JsonIngestPipelineConfig; import org.elasticsearch.xpack.core.template.JsonIngestPipelineConfig;
import java.io.IOException;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import static org.elasticsearch.xpack.core.ClientHelper.CLOUD_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.CONNECTORS_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.ENT_SEARCH_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.ENT_SEARCH_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.KIBANA_ORIGIN;
public class ConnectorTemplateRegistry extends IndexTemplateRegistry { public class ConnectorTemplateRegistry extends IndexTemplateRegistry {
@ -34,13 +32,6 @@ public class ConnectorTemplateRegistry extends IndexTemplateRegistry {
static final int REGISTRY_VERSION = 3; static final int REGISTRY_VERSION = 3;
// Connector indices constants // Connector indices constants
public static final String CONNECTOR_INDEX_NAME_PATTERN = ".elastic-connectors-v1";
public static final String CONNECTOR_TEMPLATE_NAME = "elastic-connectors";
public static final String CONNECTOR_SYNC_JOBS_INDEX_NAME_PATTERN = ".elastic-connectors-sync-jobs-v1";
public static final String CONNECTOR_SYNC_JOBS_TEMPLATE_NAME = "elastic-connectors-sync-jobs";
public static final String ACCESS_CONTROL_INDEX_PREFIX = ".search-acl-filter-"; public static final String ACCESS_CONTROL_INDEX_PREFIX = ".search-acl-filter-";
public static final String ACCESS_CONTROL_INDEX_NAME_PATTERN = ".search-acl-filter-*"; public static final String ACCESS_CONTROL_INDEX_NAME_PATTERN = ".search-acl-filter-*";
public static final String ACCESS_CONTROL_TEMPLATE_NAME = "search-acl-filter"; public static final String ACCESS_CONTROL_TEMPLATE_NAME = "search-acl-filter";
@ -58,51 +49,8 @@ public class ConnectorTemplateRegistry extends IndexTemplateRegistry {
// Variable used to replace template version in index templates // Variable used to replace template version in index templates
public static final String TEMPLATE_VERSION_VARIABLE = "xpack.application.connector.template.version"; public static final String TEMPLATE_VERSION_VARIABLE = "xpack.application.connector.template.version";
private static final String MAPPINGS_SUFFIX = "-mappings"; // Sources allowed to access system indices using X-elastic-product-origin header
public static final List<String> CONNECTORS_ALLOWED_PRODUCT_ORIGINS = List.of(KIBANA_ORIGIN, CONNECTORS_ORIGIN, CLOUD_ORIGIN);
private static final String SETTINGS_SUFFIX = "-settings";
private static final String JSON_EXTENSION = ".json";
static final Map<String, ComponentTemplate> COMPONENT_TEMPLATES;
static {
final Map<String, ComponentTemplate> componentTemplates = new HashMap<>();
for (IndexTemplateConfig config : List.of(
new IndexTemplateConfig(
CONNECTOR_TEMPLATE_NAME + MAPPINGS_SUFFIX,
ROOT_TEMPLATE_RESOURCE_PATH + CONNECTOR_TEMPLATE_NAME + MAPPINGS_SUFFIX + JSON_EXTENSION,
REGISTRY_VERSION,
TEMPLATE_VERSION_VARIABLE
),
new IndexTemplateConfig(
CONNECTOR_TEMPLATE_NAME + SETTINGS_SUFFIX,
ROOT_TEMPLATE_RESOURCE_PATH + CONNECTOR_TEMPLATE_NAME + SETTINGS_SUFFIX + JSON_EXTENSION,
REGISTRY_VERSION,
TEMPLATE_VERSION_VARIABLE
),
new IndexTemplateConfig(
CONNECTOR_SYNC_JOBS_TEMPLATE_NAME + MAPPINGS_SUFFIX,
ROOT_TEMPLATE_RESOURCE_PATH + CONNECTOR_SYNC_JOBS_TEMPLATE_NAME + MAPPINGS_SUFFIX + JSON_EXTENSION,
REGISTRY_VERSION,
TEMPLATE_VERSION_VARIABLE
),
new IndexTemplateConfig(
CONNECTOR_SYNC_JOBS_TEMPLATE_NAME + SETTINGS_SUFFIX,
ROOT_TEMPLATE_RESOURCE_PATH + CONNECTOR_TEMPLATE_NAME + SETTINGS_SUFFIX + JSON_EXTENSION,
REGISTRY_VERSION,
TEMPLATE_VERSION_VARIABLE
)
)) {
try (var parser = JsonXContent.jsonXContent.createParser(XContentParserConfiguration.EMPTY, config.loadBytes())) {
componentTemplates.put(config.getTemplateName(), ComponentTemplate.parse(parser));
} catch (IOException e) {
throw new AssertionError(e);
}
}
COMPONENT_TEMPLATES = Map.copyOf(componentTemplates);
}
@Override @Override
protected List<IngestPipelineConfig> getIngestPipelines() { protected List<IngestPipelineConfig> getIngestPipelines() {
@ -117,20 +65,6 @@ public class ConnectorTemplateRegistry extends IndexTemplateRegistry {
} }
static final Map<String, ComposableIndexTemplate> COMPOSABLE_INDEX_TEMPLATES = parseComposableTemplates( static final Map<String, ComposableIndexTemplate> COMPOSABLE_INDEX_TEMPLATES = parseComposableTemplates(
new IndexTemplateConfig(
CONNECTOR_TEMPLATE_NAME,
ROOT_TEMPLATE_RESOURCE_PATH + CONNECTOR_TEMPLATE_NAME + ".json",
REGISTRY_VERSION,
TEMPLATE_VERSION_VARIABLE,
Map.of("connectors.index_pattern", CONNECTOR_INDEX_NAME_PATTERN)
),
new IndexTemplateConfig(
CONNECTOR_SYNC_JOBS_TEMPLATE_NAME,
ROOT_TEMPLATE_RESOURCE_PATH + CONNECTOR_SYNC_JOBS_TEMPLATE_NAME + ".json",
REGISTRY_VERSION,
TEMPLATE_VERSION_VARIABLE,
Map.of("connectors-sync-jobs.index_pattern", CONNECTOR_SYNC_JOBS_INDEX_NAME_PATTERN)
),
new IndexTemplateConfig( new IndexTemplateConfig(
ACCESS_CONTROL_TEMPLATE_NAME, ACCESS_CONTROL_TEMPLATE_NAME,
ROOT_TEMPLATE_RESOURCE_PATH + ACCESS_CONTROL_TEMPLATE_NAME + ".json", ROOT_TEMPLATE_RESOURCE_PATH + ACCESS_CONTROL_TEMPLATE_NAME + ".json",
@ -154,11 +88,6 @@ public class ConnectorTemplateRegistry extends IndexTemplateRegistry {
return ENT_SEARCH_ORIGIN; return ENT_SEARCH_ORIGIN;
} }
@Override
protected Map<String, ComponentTemplate> getComponentTemplateConfigs() {
return COMPONENT_TEMPLATES;
}
@Override @Override
protected Map<String, ComposableIndexTemplate> getComposableTemplateConfigs() { protected Map<String, ComposableIndexTemplate> getComposableTemplateConfigs() {
return COMPOSABLE_INDEX_TEMPLATES; return COMPOSABLE_INDEX_TEMPLATES;

View file

@ -11,10 +11,12 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DelegatingActionListener; import org.elasticsearch.action.DelegatingActionListener;
import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.delete.DeleteResponse;
@ -40,6 +42,7 @@ import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
@ -49,10 +52,10 @@ import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.application.connector.Connector; import org.elasticsearch.xpack.application.connector.Connector;
import org.elasticsearch.xpack.application.connector.ConnectorFiltering; import org.elasticsearch.xpack.application.connector.ConnectorFiltering;
import org.elasticsearch.xpack.application.connector.ConnectorSyncStatus; import org.elasticsearch.xpack.application.connector.ConnectorSyncStatus;
import org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry;
import org.elasticsearch.xpack.application.connector.filtering.FilteringRules; import org.elasticsearch.xpack.application.connector.filtering.FilteringRules;
import org.elasticsearch.xpack.application.connector.syncjob.action.PostConnectorSyncJobAction; import org.elasticsearch.xpack.application.connector.syncjob.action.PostConnectorSyncJobAction;
import org.elasticsearch.xpack.application.connector.syncjob.action.UpdateConnectorSyncJobIngestionStatsAction; import org.elasticsearch.xpack.application.connector.syncjob.action.UpdateConnectorSyncJobIngestionStatsAction;
import org.elasticsearch.xpack.core.template.TemplateUtils;
import java.io.IOException; import java.io.IOException;
import java.time.Instant; import java.time.Instant;
@ -69,6 +72,7 @@ import java.util.stream.Stream;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.application.connector.ConnectorIndexService.CONNECTOR_INDEX_NAME; import static org.elasticsearch.xpack.application.connector.ConnectorIndexService.CONNECTOR_INDEX_NAME;
import static org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry.CONNECTORS_ALLOWED_PRODUCT_ORIGINS;
import static org.elasticsearch.xpack.core.ClientHelper.CONNECTORS_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.CONNECTORS_ORIGIN;
/** /**
@ -81,7 +85,17 @@ public class ConnectorSyncJobIndexService {
// The client to interact with the system index (internal user). // The client to interact with the system index (internal user).
private final Client clientWithOrigin; private final Client clientWithOrigin;
public static final String CONNECTOR_SYNC_JOB_INDEX_NAME = ConnectorTemplateRegistry.CONNECTOR_SYNC_JOBS_INDEX_NAME_PATTERN; // TODO use proper version IDs (see org/elasticsearch/xpack/application/rules/QueryRulesIndexService.java)
// TODO if this version is updated, a test should be added to
// javaRestTest/java/org/elasticsearch/xpack/application/FullClusterRestartIT.java
private static final int CONNECTOR_SYNC_JOB_INDEX_VERSION = 1;
public static final String CONNECTOR_SYNC_JOB_INDEX_NAME = ".elastic-connectors-sync-jobs";
public static final String CONNECTOR_SYNC_JOB_INDEX_PREFIX = ".elastic-connectors-sync-jobs-v";
public static final String CONNECTOR_SYNC_JOB_CONCRETE_INDEX_NAME = CONNECTOR_SYNC_JOB_INDEX_PREFIX + CONNECTOR_SYNC_JOB_INDEX_VERSION;
public static final String CONNECTOR_SYNC_JOB_INDEX_NAME_PATTERN = CONNECTOR_SYNC_JOB_INDEX_NAME + "*";
private static final String CONNECTOR_SYNC_JOB_MAPPING_VERSION_VARIABLE = "elastic-connectors-sync-jobs.version";
private static final String CONNECTOR_SYNC_JOB_MAPPING_MANAGED_VERSION_VARIABLE = "elastic-connectors-sync-jobs.managed.index.version";
/** /**
* @param client A client for executing actions on the connectors sync jobs index. * @param client A client for executing actions on the connectors sync jobs index.
@ -90,6 +104,35 @@ public class ConnectorSyncJobIndexService {
this.clientWithOrigin = new OriginSettingClient(client, CONNECTORS_ORIGIN); this.clientWithOrigin = new OriginSettingClient(client, CONNECTORS_ORIGIN);
} }
/**
* Returns the {@link SystemIndexDescriptor} for the Connector system index.
*
* @return The {@link SystemIndexDescriptor} for the Connector system index.
*/
public static SystemIndexDescriptor getSystemIndexDescriptor() {
PutIndexTemplateRequest request = new PutIndexTemplateRequest();
String templateSource = TemplateUtils.loadTemplate(
"/elastic-connectors-sync-jobs.json",
Version.CURRENT.toString(),
CONNECTOR_SYNC_JOB_MAPPING_VERSION_VARIABLE,
Map.of(CONNECTOR_SYNC_JOB_MAPPING_MANAGED_VERSION_VARIABLE, Integer.toString(CONNECTOR_SYNC_JOB_INDEX_VERSION))
);
request.source(templateSource, XContentType.JSON);
return SystemIndexDescriptor.builder()
.setIndexPattern(CONNECTOR_SYNC_JOB_INDEX_NAME_PATTERN)
.setPrimaryIndex(CONNECTOR_SYNC_JOB_CONCRETE_INDEX_NAME)
.setAliasName(CONNECTOR_SYNC_JOB_INDEX_NAME)
.setDescription("Search connectors sync jobs")
.setMappings(request.mappings())
.setSettings(request.settings())
.setOrigin(CONNECTORS_ORIGIN)
.setType(SystemIndexDescriptor.Type.EXTERNAL_MANAGED)
.setAllowedElasticProductOrigins(CONNECTORS_ALLOWED_PRODUCT_ORIGINS)
.setNetNew()
.build();
}
/** /**
* @param request Request for creating a connector sync job. * @param request Request for creating a connector sync job.
* @param listener Listener to respond to a successful response or an error. * @param listener Listener to respond to a successful response or an error.

View file

@ -14,7 +14,9 @@ import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Tuple; import org.elasticsearch.core.Tuple;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SystemIndexPlugin;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.MockScriptEngine; import org.elasticsearch.script.MockScriptEngine;
import org.elasticsearch.script.MockScriptPlugin; import org.elasticsearch.script.MockScriptPlugin;
@ -59,7 +61,6 @@ import static org.elasticsearch.xpack.application.connector.ConnectorTemplateReg
import static org.elasticsearch.xpack.application.connector.ConnectorTestUtils.getRandomConnectorFeatures; import static org.elasticsearch.xpack.application.connector.ConnectorTestUtils.getRandomConnectorFeatures;
import static org.elasticsearch.xpack.application.connector.ConnectorTestUtils.getRandomCronExpression; import static org.elasticsearch.xpack.application.connector.ConnectorTestUtils.getRandomCronExpression;
import static org.elasticsearch.xpack.application.connector.ConnectorTestUtils.randomConnectorFeatureEnabled; import static org.elasticsearch.xpack.application.connector.ConnectorTestUtils.randomConnectorFeatureEnabled;
import static org.elasticsearch.xpack.application.connector.ConnectorTestUtils.registerSimplifiedConnectorIndexTemplates;
import static org.hamcrest.CoreMatchers.anyOf; import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.not;
@ -72,7 +73,6 @@ public class ConnectorIndexServiceTests extends ESSingleNodeTestCase {
@Before @Before
public void setup() { public void setup() {
registerSimplifiedConnectorIndexTemplates(indicesAdmin());
this.connectorIndexService = new ConnectorIndexService(client()); this.connectorIndexService = new ConnectorIndexService(client());
} }
@ -80,6 +80,7 @@ public class ConnectorIndexServiceTests extends ESSingleNodeTestCase {
protected Collection<Class<? extends Plugin>> getPlugins() { protected Collection<Class<? extends Plugin>> getPlugins() {
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.getPlugins()); List<Class<? extends Plugin>> plugins = new ArrayList<>(super.getPlugins());
plugins.add(MockPainlessScriptEngine.TestPlugin.class); plugins.add(MockPainlessScriptEngine.TestPlugin.class);
plugins.add(ConnectorIndexServiceTests.TestPlugin.class);
return plugins; return plugins;
} }
@ -1612,4 +1613,24 @@ public class ConnectorIndexServiceTests extends ESSingleNodeTestCase {
} }
} }
/**
* Test plugin to register the {@link ConnectorIndexService} system index descriptor.
*/
public static class TestPlugin extends Plugin implements SystemIndexPlugin {
@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
return List.of(ConnectorIndexService.getSystemIndexDescriptor());
}
@Override
public String getFeatureName() {
return this.getClass().getSimpleName();
}
@Override
public String getFeatureDescription() {
return this.getClass().getCanonicalName();
}
}
} }

View file

@ -55,15 +55,13 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry.ACCESS_CONTROL_INDEX_NAME_PATTERN; import static org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry.ACCESS_CONTROL_INDEX_NAME_PATTERN;
import static org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry.CONNECTOR_INDEX_NAME_PATTERN;
import static org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry.CONNECTOR_SYNC_JOBS_INDEX_NAME_PATTERN;
import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.oneOf;
import static org.hamcrest.Matchers.sameInstance; import static org.hamcrest.Matchers.sameInstance;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -92,14 +90,6 @@ public class ConnectorTemplateRegistryTests extends ESTestCase {
DiscoveryNode node = DiscoveryNodeUtils.create("node"); DiscoveryNode node = DiscoveryNodeUtils.create("node");
DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build(); DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
Map<String, Integer> existingComponentTemplates = Map.of( Map<String, Integer> existingComponentTemplates = Map.of(
ConnectorTemplateRegistry.CONNECTOR_TEMPLATE_NAME + "-mappings",
ConnectorTemplateRegistry.REGISTRY_VERSION,
ConnectorTemplateRegistry.CONNECTOR_TEMPLATE_NAME + "-settings",
ConnectorTemplateRegistry.REGISTRY_VERSION,
ConnectorTemplateRegistry.CONNECTOR_SYNC_JOBS_TEMPLATE_NAME + "-mappings",
ConnectorTemplateRegistry.REGISTRY_VERSION,
ConnectorTemplateRegistry.CONNECTOR_SYNC_JOBS_TEMPLATE_NAME + "-settings",
ConnectorTemplateRegistry.REGISTRY_VERSION,
ConnectorTemplateRegistry.ACCESS_CONTROL_TEMPLATE_NAME, ConnectorTemplateRegistry.ACCESS_CONTROL_TEMPLATE_NAME,
ConnectorTemplateRegistry.REGISTRY_VERSION ConnectorTemplateRegistry.REGISTRY_VERSION
); );
@ -125,131 +115,6 @@ public class ConnectorTemplateRegistryTests extends ESTestCase {
}); });
} }
public void testThatNonExistingComponentTemplatesAreAddedImmediately() throws Exception {
DiscoveryNode node = DiscoveryNodeUtils.create("node");
DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
ClusterChangedEvent event = createClusterChangedEvent(
Collections.emptyMap(),
Collections.emptyMap(),
Collections.singletonMap(ConnectorTemplateRegistry.SEARCH_DEFAULT_PIPELINE_NAME, ConnectorTemplateRegistry.REGISTRY_VERSION),
Collections.emptyMap(),
nodes
);
AtomicInteger calledTimes = new AtomicInteger(0);
client.setVerifier((action, request, listener) -> verifyComponentTemplateInstalled(calledTimes, action, request, listener));
registry.clusterChanged(event);
assertBusy(() -> assertThat(calledTimes.get(), equalTo(registry.getComponentTemplateConfigs().size())));
calledTimes.set(0);
// attempting to register the event multiple times as a race condition can yield this test flaky, namely:
// when calling registry.clusterChanged(newEvent) the templateCreationsInProgress state that the IndexTemplateRegistry maintains
// might've not yet been updated to reflect that the first template registration was complete, so a second template registration
// will not be issued anymore, leaving calledTimes to 0
assertBusy(() -> {
// now delete all templates from the cluster state and let's retry
ClusterChangedEvent newEvent = createClusterChangedEvent(Collections.emptyMap(), Collections.emptyMap(), nodes);
registry.clusterChanged(newEvent);
assertThat(calledTimes.get(), greaterThan(4));
});
}
public void testThatVersionedOldComponentTemplatesAreUpgraded() throws Exception {
DiscoveryNode node = DiscoveryNodeUtils.create("node");
DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
ClusterChangedEvent event = createClusterChangedEvent(
Collections.emptyMap(),
Collections.singletonMap(
ConnectorTemplateRegistry.CONNECTOR_TEMPLATE_NAME + "-settings",
ConnectorTemplateRegistry.REGISTRY_VERSION - 1
),
Collections.singletonMap(ConnectorTemplateRegistry.SEARCH_DEFAULT_PIPELINE_NAME, ConnectorTemplateRegistry.REGISTRY_VERSION),
Collections.emptyMap(),
nodes
);
AtomicInteger calledTimes = new AtomicInteger(0);
client.setVerifier((action, request, listener) -> verifyComponentTemplateInstalled(calledTimes, action, request, listener));
registry.clusterChanged(event);
assertBusy(() -> assertThat(calledTimes.get(), equalTo(registry.getComponentTemplateConfigs().size())));
}
public void testThatUnversionedOldComponentTemplatesAreUpgraded() throws Exception {
DiscoveryNode node = DiscoveryNodeUtils.create("node");
DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
ClusterChangedEvent event = createClusterChangedEvent(
Collections.emptyMap(),
Collections.singletonMap(ConnectorTemplateRegistry.CONNECTOR_TEMPLATE_NAME + "-mappings", null),
Collections.singletonMap(ConnectorTemplateRegistry.SEARCH_DEFAULT_PIPELINE_NAME, ConnectorTemplateRegistry.REGISTRY_VERSION),
Collections.emptyMap(),
nodes
);
AtomicInteger calledTimes = new AtomicInteger(0);
client.setVerifier((action, request, listener) -> verifyComponentTemplateInstalled(calledTimes, action, request, listener));
registry.clusterChanged(event);
assertBusy(() -> assertThat(calledTimes.get(), equalTo(registry.getComponentTemplateConfigs().size())));
}
public void testSameOrHigherVersionComponentTemplateNotUpgraded() {
DiscoveryNode node = DiscoveryNodeUtils.create("node");
DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
Map<String, Integer> versions = new HashMap<>();
versions.put(ConnectorTemplateRegistry.CONNECTOR_TEMPLATE_NAME + "-mappings", ConnectorTemplateRegistry.REGISTRY_VERSION);
versions.put(ConnectorTemplateRegistry.CONNECTOR_TEMPLATE_NAME + "-settings", ConnectorTemplateRegistry.REGISTRY_VERSION);
versions.put(ConnectorTemplateRegistry.CONNECTOR_SYNC_JOBS_TEMPLATE_NAME + "-mappings", ConnectorTemplateRegistry.REGISTRY_VERSION);
versions.put(ConnectorTemplateRegistry.CONNECTOR_SYNC_JOBS_TEMPLATE_NAME + "-settings", ConnectorTemplateRegistry.REGISTRY_VERSION);
versions.put(ConnectorTemplateRegistry.ACCESS_CONTROL_TEMPLATE_NAME, ConnectorTemplateRegistry.REGISTRY_VERSION);
ClusterChangedEvent sameVersionEvent = createClusterChangedEvent(Collections.emptyMap(), versions, nodes);
client.setVerifier((action, request, listener) -> {
if (action == PutPipelineTransportAction.TYPE) {
// Ignore this, it's verified in another test
return AcknowledgedResponse.TRUE;
}
if (action instanceof PutComponentTemplateAction) {
fail("template should not have been re-installed");
return null;
} else if (action == ILMActions.PUT) {
// Ignore this, it's verified in another test
return AcknowledgedResponse.TRUE;
} else if (action == TransportPutComposableIndexTemplateAction.TYPE) {
// Ignore this, it's verified in another test
return AcknowledgedResponse.TRUE;
} else {
fail("client called with unexpected request:" + request.toString());
return null;
}
});
registry.clusterChanged(sameVersionEvent);
versions.clear();
versions.put(
ConnectorTemplateRegistry.CONNECTOR_TEMPLATE_NAME + "-mappings",
ConnectorTemplateRegistry.REGISTRY_VERSION + randomIntBetween(0, 1000)
);
versions.put(
ConnectorTemplateRegistry.CONNECTOR_TEMPLATE_NAME + "-settings",
ConnectorTemplateRegistry.REGISTRY_VERSION + randomIntBetween(0, 1000)
);
versions.put(
ConnectorTemplateRegistry.CONNECTOR_SYNC_JOBS_TEMPLATE_NAME + "-mappings",
ConnectorTemplateRegistry.REGISTRY_VERSION + randomIntBetween(0, 1000)
);
versions.put(
ConnectorTemplateRegistry.CONNECTOR_SYNC_JOBS_TEMPLATE_NAME + "-settings",
ConnectorTemplateRegistry.REGISTRY_VERSION + randomIntBetween(0, 1000)
);
versions.put(
ConnectorTemplateRegistry.ACCESS_CONTROL_TEMPLATE_NAME,
ConnectorTemplateRegistry.REGISTRY_VERSION + randomIntBetween(0, 1000)
);
ClusterChangedEvent higherVersionEvent = createClusterChangedEvent(Collections.emptyMap(), versions, nodes);
registry.clusterChanged(higherVersionEvent);
}
public void testThatMissingMasterNodeDoesNothing() { public void testThatMissingMasterNodeDoesNothing() {
DiscoveryNode localNode = DiscoveryNodeUtils.create("node"); DiscoveryNode localNode = DiscoveryNodeUtils.create("node");
DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").add(localNode).build(); DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").add(localNode).build();
@ -260,7 +125,7 @@ public class ConnectorTemplateRegistryTests extends ESTestCase {
}); });
ClusterChangedEvent event = createClusterChangedEvent( ClusterChangedEvent event = createClusterChangedEvent(
Collections.singletonMap(ConnectorTemplateRegistry.CONNECTOR_TEMPLATE_NAME, null), Collections.singletonMap(ConnectorTemplateRegistry.ACCESS_CONTROL_TEMPLATE_NAME, null),
Collections.emptyMap(), Collections.emptyMap(),
nodes nodes
); );
@ -357,10 +222,7 @@ public class ConnectorTemplateRegistryTests extends ESTestCase {
assertThat(putRequest.indexTemplate().version(), equalTo((long) ConnectorTemplateRegistry.REGISTRY_VERSION)); assertThat(putRequest.indexTemplate().version(), equalTo((long) ConnectorTemplateRegistry.REGISTRY_VERSION));
final List<String> indexPatterns = putRequest.indexTemplate().indexPatterns(); final List<String> indexPatterns = putRequest.indexTemplate().indexPatterns();
assertThat(indexPatterns, hasSize(1)); assertThat(indexPatterns, hasSize(1));
assertThat( assertThat(indexPatterns, contains(ACCESS_CONTROL_INDEX_NAME_PATTERN));
indexPatterns,
contains(oneOf(ACCESS_CONTROL_INDEX_NAME_PATTERN, CONNECTOR_INDEX_NAME_PATTERN, CONNECTOR_SYNC_JOBS_INDEX_NAME_PATTERN))
);
assertNotNull(listener); assertNotNull(listener);
return new TestPutIndexTemplateResponse(true); return new TestPutIndexTemplateResponse(true);
} else { } else {

View file

@ -7,7 +7,6 @@
package org.elasticsearch.xpack.application.connector; package org.elasticsearch.xpack.application.connector;
import org.elasticsearch.client.internal.IndicesAdminClient;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xcontent.XContentType;
@ -27,7 +26,6 @@ import org.elasticsearch.xpack.application.connector.filtering.FilteringRules;
import org.elasticsearch.xpack.application.connector.filtering.FilteringValidation; import org.elasticsearch.xpack.application.connector.filtering.FilteringValidation;
import org.elasticsearch.xpack.application.connector.filtering.FilteringValidationInfo; import org.elasticsearch.xpack.application.connector.filtering.FilteringValidationInfo;
import org.elasticsearch.xpack.application.connector.filtering.FilteringValidationState; import org.elasticsearch.xpack.application.connector.filtering.FilteringValidationState;
import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJob;
import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJobType; import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJobType;
import org.elasticsearch.xpack.core.scheduler.Cron; import org.elasticsearch.xpack.core.scheduler.Cron;
@ -47,55 +45,14 @@ import static org.elasticsearch.test.ESTestCase.randomBoolean;
import static org.elasticsearch.test.ESTestCase.randomFrom; import static org.elasticsearch.test.ESTestCase.randomFrom;
import static org.elasticsearch.test.ESTestCase.randomInt; import static org.elasticsearch.test.ESTestCase.randomInt;
import static org.elasticsearch.test.ESTestCase.randomList; import static org.elasticsearch.test.ESTestCase.randomList;
import static org.elasticsearch.test.ESTestCase.randomLong;
import static org.elasticsearch.test.ESTestCase.randomLongBetween; import static org.elasticsearch.test.ESTestCase.randomLongBetween;
import static org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry.CONNECTOR_INDEX_NAME_PATTERN; import static org.elasticsearch.test.ESTestCase.randomNonNegativeLong;
import static org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry.CONNECTOR_SYNC_JOBS_INDEX_NAME_PATTERN; import static org.elasticsearch.test.ESTestCase.randomShort;
import static org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry.CONNECTOR_SYNC_JOBS_TEMPLATE_NAME;
import static org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry.CONNECTOR_TEMPLATE_NAME;
public final class ConnectorTestUtils { public final class ConnectorTestUtils {
public static final String NULL_STRING = null; public static final String NULL_STRING = null;
/**
* Registers index templates for instances of {@link Connector} and {@link ConnectorSyncJob} with essential field mappings. This method
* only includes mappings for fields relevant to test cases, specifying field types to ensure correct ES query logic behavior.
*
* @param indicesAdminClient The Elasticsearch indices admin client used for template registration.
*/
public static void registerSimplifiedConnectorIndexTemplates(IndicesAdminClient indicesAdminClient) {
indicesAdminClient.preparePutTemplate(CONNECTOR_TEMPLATE_NAME)
.setPatterns(List.of(CONNECTOR_INDEX_NAME_PATTERN))
.setVersion(0)
.setMapping(
"service_type",
"type=keyword,store=true",
"status",
"type=keyword,store=true",
"index_name",
"type=keyword,store=true",
"configuration",
"type=object"
)
.get();
indicesAdminClient.preparePutTemplate(CONNECTOR_SYNC_JOBS_TEMPLATE_NAME)
.setPatterns(List.of(CONNECTOR_SYNC_JOBS_INDEX_NAME_PATTERN))
.setVersion(0)
.setMapping(
"job_type",
"type=keyword,store=true",
"connector.id",
"type=keyword,store=true",
"status",
"type=keyword,store=true"
)
.get();
}
public static PutConnectorAction.Request getRandomPutConnectorActionRequest() { public static PutConnectorAction.Request getRandomPutConnectorActionRequest() {
return new PutConnectorAction.Request( return new PutConnectorAction.Request(
randomAlphaOfLengthBetween(5, 15), randomAlphaOfLengthBetween(5, 15),
@ -144,9 +101,9 @@ public final class ConnectorTestUtils {
return new ConnectorSyncInfo.Builder().setLastAccessControlSyncError(randomFrom(new String[] { null, randomAlphaOfLength(10) })) return new ConnectorSyncInfo.Builder().setLastAccessControlSyncError(randomFrom(new String[] { null, randomAlphaOfLength(10) }))
.setLastAccessControlSyncScheduledAt(randomFrom(new Instant[] { null, ConnectorTestUtils.randomInstant() })) .setLastAccessControlSyncScheduledAt(randomFrom(new Instant[] { null, ConnectorTestUtils.randomInstant() }))
.setLastAccessControlSyncStatus(randomFrom(new ConnectorSyncStatus[] { null, getRandomSyncStatus() })) .setLastAccessControlSyncStatus(randomFrom(new ConnectorSyncStatus[] { null, getRandomSyncStatus() }))
.setLastDeletedDocumentCount(randomLong()) .setLastDeletedDocumentCount(randomNonNegativeLong())
.setLastIncrementalSyncScheduledAt(randomFrom(new Instant[] { null, ConnectorTestUtils.randomInstant() })) .setLastIncrementalSyncScheduledAt(randomFrom(new Instant[] { null, ConnectorTestUtils.randomInstant() }))
.setLastIndexedDocumentCount(randomLong()) .setLastIndexedDocumentCount(randomNonNegativeLong())
.setLastSyncError(randomFrom(new String[] { null, randomAlphaOfLength(10) })) .setLastSyncError(randomFrom(new String[] { null, randomAlphaOfLength(10) }))
.setLastSyncScheduledAt(randomFrom(new Instant[] { null, ConnectorTestUtils.randomInstant() })) .setLastSyncScheduledAt(randomFrom(new Instant[] { null, ConnectorTestUtils.randomInstant() }))
.setLastSyncStatus(randomFrom(new ConnectorSyncStatus[] { null, getRandomSyncStatus() })) .setLastSyncStatus(randomFrom(new ConnectorSyncStatus[] { null, getRandomSyncStatus() }))
@ -197,7 +154,7 @@ public final class ConnectorTestUtils {
public static ConnectorFiltering getRandomConnectorFiltering() { public static ConnectorFiltering getRandomConnectorFiltering() {
Instant currentTimestamp = Instant.now(); Instant currentTimestamp = Instant.now();
int order = randomInt(); int order = randomShort();
return new ConnectorFiltering.Builder().setActive( return new ConnectorFiltering.Builder().setActive(
new FilteringRules.Builder().setAdvancedSnippet( new FilteringRules.Builder().setAdvancedSnippet(

View file

@ -20,8 +20,11 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SystemIndexPlugin;
import org.elasticsearch.reindex.ReindexPlugin; import org.elasticsearch.reindex.ReindexPlugin;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.ESSingleNodeTestCase;
@ -58,7 +61,6 @@ import java.util.stream.Collectors;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry.ACCESS_CONTROL_INDEX_PREFIX; import static org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry.ACCESS_CONTROL_INDEX_PREFIX;
import static org.elasticsearch.xpack.application.connector.ConnectorTestUtils.registerSimplifiedConnectorIndexTemplates;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@ -86,14 +88,12 @@ public class ConnectorSyncJobIndexServiceTests extends ESSingleNodeTestCase {
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.getPlugins()); List<Class<? extends Plugin>> plugins = new ArrayList<>(super.getPlugins());
// Reindex plugin is required for testDeleteAllSyncJobsByConnectorId (supports delete_by_query) // Reindex plugin is required for testDeleteAllSyncJobsByConnectorId (supports delete_by_query)
plugins.add(ReindexPlugin.class); plugins.add(ReindexPlugin.class);
plugins.add(TestPlugin.class);
return plugins; return plugins;
} }
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
registerSimplifiedConnectorIndexTemplates(indicesAdmin());
connectorOneId = createConnector(ConnectorTestUtils.getRandomConnector()); connectorOneId = createConnector(ConnectorTestUtils.getRandomConnector());
connectorTwoId = createConnector(ConnectorTestUtils.getRandomConnector()); connectorTwoId = createConnector(ConnectorTestUtils.getRandomConnector());
connectorThreeId = createConnector(ConnectorTestUtils.getRandomConnectorWithDetachedIndex()); connectorThreeId = createConnector(ConnectorTestUtils.getRandomConnectorWithDetachedIndex());
@ -805,18 +805,18 @@ public class ConnectorSyncJobIndexServiceTests extends ESSingleNodeTestCase {
Instant requestLastSeen = request.getLastSeen(); Instant requestLastSeen = request.getLastSeen();
Map<String, Object> metadata = request.getMetadata(); Map<String, Object> metadata = request.getMetadata();
Long deletedDocumentCountAfterUpdate = (Long) syncJobSourceAfterUpdate.get( Long deletedDocumentCountAfterUpdate = ((Number) syncJobSourceAfterUpdate.get(
ConnectorSyncJob.DELETED_DOCUMENT_COUNT_FIELD.getPreferredName() ConnectorSyncJob.DELETED_DOCUMENT_COUNT_FIELD.getPreferredName()
); )).longValue();
Long indexedDocumentCountAfterUpdate = (Long) syncJobSourceAfterUpdate.get( Long indexedDocumentCountAfterUpdate = ((Number) syncJobSourceAfterUpdate.get(
ConnectorSyncJob.INDEXED_DOCUMENT_COUNT_FIELD.getPreferredName() ConnectorSyncJob.INDEXED_DOCUMENT_COUNT_FIELD.getPreferredName()
); )).longValue();
Long indexedDocumentVolumeAfterUpdate = (Long) syncJobSourceAfterUpdate.get( Long indexedDocumentVolumeAfterUpdate = ((Number) syncJobSourceAfterUpdate.get(
ConnectorSyncJob.INDEXED_DOCUMENT_VOLUME_FIELD.getPreferredName() ConnectorSyncJob.INDEXED_DOCUMENT_VOLUME_FIELD.getPreferredName()
); )).longValue();
Long totalDocumentCountAfterUpdate = (Long) syncJobSourceAfterUpdate.get( Long totalDocumentCountAfterUpdate = ((Number) syncJobSourceAfterUpdate.get(
ConnectorSyncJob.TOTAL_DOCUMENT_COUNT_FIELD.getPreferredName() ConnectorSyncJob.TOTAL_DOCUMENT_COUNT_FIELD.getPreferredName()
); )).longValue();
Instant lastSeenAfterUpdate = Instant.parse( Instant lastSeenAfterUpdate = Instant.parse(
(String) syncJobSourceAfterUpdate.get(ConnectorSyncJob.LAST_SEEN_FIELD.getPreferredName()) (String) syncJobSourceAfterUpdate.get(ConnectorSyncJob.LAST_SEEN_FIELD.getPreferredName())
); );
@ -1411,4 +1411,24 @@ public class ConnectorSyncJobIndexServiceTests extends ESSingleNodeTestCase {
// wait 10 seconds for connector creation // wait 10 seconds for connector creation
return index.get(TIMEOUT_SECONDS, TimeUnit.SECONDS).getId(); return index.get(TIMEOUT_SECONDS, TimeUnit.SECONDS).getId();
} }
/**
* Test plugin to register the {@link ConnectorSyncJobIndexService} system index descriptor.
*/
public static class TestPlugin extends Plugin implements SystemIndexPlugin {
@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
return List.of(ConnectorSyncJobIndexService.getSystemIndexDescriptor());
}
@Override
public String getFeatureName() {
return this.getClass().getSimpleName();
}
@Override
public String getFeatureDescription() {
return this.getClass().getCanonicalName();
}
}
} }

View file

@ -36,7 +36,7 @@ import static org.elasticsearch.test.ESTestCase.randomInstantBetween;
import static org.elasticsearch.test.ESTestCase.randomInt; import static org.elasticsearch.test.ESTestCase.randomInt;
import static org.elasticsearch.test.ESTestCase.randomLong; import static org.elasticsearch.test.ESTestCase.randomLong;
import static org.elasticsearch.test.ESTestCase.randomMap; import static org.elasticsearch.test.ESTestCase.randomMap;
import static org.elasticsearch.test.ESTestCase.randomNonNegativeLong; import static org.elasticsearch.test.ESTestCase.randomNonNegativeInt;
public class ConnectorSyncJobTestUtils { public class ConnectorSyncJobTestUtils {
@ -51,11 +51,11 @@ public class ConnectorSyncJobTestUtils {
.setCompletedAt(randomFrom(new Instant[] { null, randomInstantBetween(lowerBoundInstant, upperBoundInstant) })) .setCompletedAt(randomFrom(new Instant[] { null, randomInstantBetween(lowerBoundInstant, upperBoundInstant) }))
.setConnector(ConnectorTestUtils.getRandomSyncJobConnectorInfo()) .setConnector(ConnectorTestUtils.getRandomSyncJobConnectorInfo())
.setCreatedAt(randomInstantBetween(lowerBoundInstant, upperBoundInstant)) .setCreatedAt(randomInstantBetween(lowerBoundInstant, upperBoundInstant))
.setDeletedDocumentCount(randomLong()) .setDeletedDocumentCount(randomNonNegativeInt())
.setError(randomFrom(new String[] { null, randomAlphaOfLength(10) })) .setError(randomFrom(new String[] { null, randomAlphaOfLength(10) }))
.setId(randomAlphaOfLength(10)) .setId(randomAlphaOfLength(10))
.setIndexedDocumentCount(randomLong()) .setIndexedDocumentCount(randomNonNegativeInt())
.setIndexedDocumentVolume(randomLong()) .setIndexedDocumentVolume(randomNonNegativeInt())
.setJobType(getRandomConnectorJobType()) .setJobType(getRandomConnectorJobType())
.setLastSeen(randomFrom(new Instant[] { null, randomInstantBetween(lowerBoundInstant, upperBoundInstant) })) .setLastSeen(randomFrom(new Instant[] { null, randomInstantBetween(lowerBoundInstant, upperBoundInstant) }))
.setMetadata( .setMetadata(
@ -67,7 +67,7 @@ public class ConnectorSyncJobTestUtils {
) )
.setStartedAt(randomFrom(new Instant[] { null, randomInstantBetween(lowerBoundInstant, upperBoundInstant) })) .setStartedAt(randomFrom(new Instant[] { null, randomInstantBetween(lowerBoundInstant, upperBoundInstant) }))
.setStatus(ConnectorTestUtils.getRandomSyncStatus()) .setStatus(ConnectorTestUtils.getRandomSyncStatus())
.setTotalDocumentCount(randomLong()) .setTotalDocumentCount(randomNonNegativeInt())
.setTriggerMethod(getRandomConnectorSyncJobTriggerMethod()) .setTriggerMethod(getRandomConnectorSyncJobTriggerMethod())
.setWorkerHostname(randomAlphaOfLength(10)) .setWorkerHostname(randomAlphaOfLength(10))
.build(); .build();
@ -156,10 +156,10 @@ public class ConnectorSyncJobTestUtils {
return new UpdateConnectorSyncJobIngestionStatsAction.Request( return new UpdateConnectorSyncJobIngestionStatsAction.Request(
randomAlphaOfLength(10), randomAlphaOfLength(10),
randomNonNegativeLong(), (long) randomNonNegativeInt(),
randomNonNegativeLong(), (long) randomNonNegativeInt(),
randomNonNegativeLong(), (long) randomNonNegativeInt(),
randomNonNegativeLong(), (long) randomNonNegativeInt(),
randomInstantBetween(lowerBoundInstant, upperBoundInstant), randomInstantBetween(lowerBoundInstant, upperBoundInstant),
randomMap(2, 3, () -> new Tuple<>(randomAlphaOfLength(4), randomAlphaOfLength(4))) randomMap(2, 3, () -> new Tuple<>(randomAlphaOfLength(4), randomAlphaOfLength(4)))
); );
@ -173,10 +173,10 @@ public class ConnectorSyncJobTestUtils {
return new UpdateConnectorSyncJobIngestionStatsAction.Request( return new UpdateConnectorSyncJobIngestionStatsAction.Request(
syncJobId, syncJobId,
randomNonNegativeLong(), (long) randomNonNegativeInt(),
randomNonNegativeLong(), (long) randomNonNegativeInt(),
randomNonNegativeLong(), (long) randomNonNegativeInt(),
randomNonNegativeLong(), (long) randomNonNegativeInt(),
randomInstantBetween(lowerBoundInstant, upperBoundInstant), randomInstantBetween(lowerBoundInstant, upperBoundInstant),
randomMap(2, 3, () -> new Tuple<>(randomAlphaOfLength(4), randomAlphaOfLength(4))) randomMap(2, 3, () -> new Tuple<>(randomAlphaOfLength(4), randomAlphaOfLength(4)))
); );