[Enterprise Search] Add reasonable error handling for connector index absence (#161729)

This adds reasonable error handling and returns for connectors in case
the indices haven't been created yet. Specifically:

- Creating a connector or sync job targets the concrete index so the
index template can be triggered.
- Fetches of connectors or sync jobs return undefined or empty arrays if
the index doesn't exist.
This commit is contained in:
Sander Philipse 2023-07-12 18:34:14 +08:00 committed by GitHub
parent 707a637f42
commit 90b7b1a3c8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 267 additions and 1169 deletions

View file

@ -199,6 +199,20 @@ export const DEFAULT_PIPELINE_VALUES: IngestPipelineParams = {
run_ml_inference: false,
};
export interface DefaultConnectorsPipelineMeta {
default_extract_binary_content: boolean;
default_name: string;
default_reduce_whitespace: boolean;
default_run_ml_inference: boolean;
}
export const defaultConnectorsPipelineMeta: DefaultConnectorsPipelineMeta = {
default_extract_binary_content: DEFAULT_PIPELINE_VALUES.extract_binary_content,
default_name: DEFAULT_PIPELINE_NAME,
default_reduce_whitespace: DEFAULT_PIPELINE_VALUES.reduce_whitespace,
default_run_ml_inference: DEFAULT_PIPELINE_VALUES.run_ml_inference,
};
export enum INGESTION_METHOD_IDS {
API = 'api',
CONNECTOR = 'connector',

View file

@ -51,5 +51,6 @@ export const config: PluginConfigDescriptor<ConfigType> = {
export const CONNECTORS_INDEX = '.elastic-connectors';
export const CURRENT_CONNECTORS_INDEX = '.elastic-connectors-v1';
export const CONNECTORS_JOBS_INDEX = '.elastic-connectors-sync-jobs';
export const CURRENT_CONNECTORS_JOB_INDEX = '.elastic-connectors-v1';
export const CONNECTORS_VERSION = 1;
export const CRAWLERS_INDEX = '.ent-search-actastic-crawler2_configurations_v2';

View file

@ -1,399 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { CONNECTORS_INDEX, CONNECTORS_JOBS_INDEX, CONNECTORS_VERSION } from '..';
import { defaultConnectorsPipelineMeta, setupConnectorsIndices } from './setup_indices';
describe('Setup Indices', () => {
const mockClient = {
asCurrentUser: {
indices: {
create: jest.fn(),
get: jest.fn(),
stats: jest.fn(),
updateAliases: jest.fn(),
},
search: jest.fn(),
},
asInternalUser: {},
};
const connectorsIndexName = `${CONNECTORS_INDEX}-v${CONNECTORS_VERSION}`;
const jobsIndexName = `${CONNECTORS_JOBS_INDEX}-v${CONNECTORS_VERSION}`;
const connectorsMappings = {
_meta: {
version: CONNECTORS_VERSION,
pipeline: defaultConnectorsPipelineMeta,
},
dynamic: false,
properties: {
api_key_id: {
type: 'keyword',
},
configuration: {
type: 'object',
},
custom_scheduling: {
type: 'object',
},
description: { type: 'text' },
error: { type: 'keyword' },
features: {
properties: {
filtering_advanced_config: { type: 'boolean' },
filtering_rules: { type: 'boolean' },
incremental_sync: {
properties: {
enabled: { type: 'boolean' },
},
},
sync_rules: {
properties: {
basic: {
properties: {
enabled: { type: 'boolean' },
},
},
advanced: {
properties: {
enabled: { type: 'boolean' },
},
},
},
},
},
},
filtering: {
properties: {
active: {
properties: {
advanced_snippet: {
properties: {
created_at: { type: 'date' },
updated_at: { type: 'date' },
value: { type: 'object' },
},
},
rules: {
properties: {
created_at: { type: 'date' },
field: { type: 'keyword' },
id: { type: 'keyword' },
order: { type: 'short' },
policy: { type: 'keyword' },
rule: { type: 'keyword' },
updated_at: { type: 'date' },
value: { type: 'keyword' },
},
},
validation: {
properties: {
errors: {
properties: {
ids: { type: 'keyword' },
messages: { type: 'text' },
},
},
state: { type: 'keyword' },
},
},
},
},
domain: { type: 'keyword' },
draft: {
properties: {
advanced_snippet: {
properties: {
created_at: { type: 'date' },
updated_at: { type: 'date' },
value: { type: 'object' },
},
},
rules: {
properties: {
created_at: { type: 'date' },
field: { type: 'keyword' },
id: { type: 'keyword' },
order: { type: 'short' },
policy: { type: 'keyword' },
rule: { type: 'keyword' },
updated_at: { type: 'date' },
value: { type: 'keyword' },
},
},
validation: {
properties: {
errors: {
properties: {
ids: { type: 'keyword' },
messages: { type: 'text' },
},
},
state: { type: 'keyword' },
},
},
},
},
},
},
index_name: { type: 'keyword' },
is_native: { type: 'boolean' },
language: { type: 'keyword' },
last_access_control_sync_error: { type: 'keyword' },
last_access_control_sync_scheduled_at: { type: 'date' },
last_access_control_sync_status: { type: 'keyword' },
last_deleted_document_count: { type: 'long' },
last_incremental_sync_scheduled_at: { type: 'date' },
last_indexed_document_count: { type: 'long' },
last_seen: { type: 'date' },
last_sync_error: { type: 'keyword' },
last_sync_scheduled_at: { type: 'date' },
last_sync_status: { type: 'keyword' },
last_synced: { type: 'date' },
name: { type: 'keyword' },
pipeline: {
properties: {
extract_binary_content: { type: 'boolean' },
name: { type: 'keyword' },
reduce_whitespace: { type: 'boolean' },
run_ml_inference: { type: 'boolean' },
},
},
scheduling: {
properties: {
access_control: {
properties: {
enabled: { type: 'boolean' },
interval: { type: 'text' },
},
},
incremental: {
properties: {
enabled: { type: 'boolean' },
interval: { type: 'text' },
},
},
full: {
properties: {
enabled: { type: 'boolean' },
interval: { type: 'text' },
},
},
},
},
service_type: { type: 'keyword' },
status: { type: 'keyword' },
sync_cursor: { type: 'object' },
sync_now: { type: 'boolean' },
},
};
const connectorsJobsMappings = {
_meta: {
version: CONNECTORS_VERSION,
},
dynamic: false,
properties: {
cancelation_requested_at: { type: 'date' },
canceled_at: { type: 'date' },
completed_at: { type: 'date' },
connector: {
properties: {
configuration: { type: 'object' },
filtering: {
properties: {
advanced_snippet: {
properties: {
created_at: { type: 'date' },
updated_at: { type: 'date' },
value: { type: 'object' },
},
},
domain: { type: 'keyword' },
rules: {
properties: {
created_at: { type: 'date' },
field: { type: 'keyword' },
id: { type: 'keyword' },
order: { type: 'short' },
policy: { type: 'keyword' },
rule: { type: 'keyword' },
updated_at: { type: 'date' },
value: { type: 'keyword' },
},
},
warnings: {
properties: {
ids: { type: 'keyword' },
messages: { type: 'text' },
},
},
},
},
id: { type: 'keyword' },
index_name: { type: 'keyword' },
language: { type: 'keyword' },
pipeline: {
properties: {
extract_binary_content: { type: 'boolean' },
name: { type: 'keyword' },
reduce_whitespace: { type: 'boolean' },
run_ml_inference: { type: 'boolean' },
},
},
service_type: { type: 'keyword' },
sync_cursor: { type: 'object' },
},
},
created_at: { type: 'date' },
deleted_document_count: { type: 'integer' },
error: { type: 'keyword' },
indexed_document_count: { type: 'integer' },
indexed_document_volume: { type: 'integer' },
job_type: { type: 'keyword' },
last_seen: { type: 'date' },
metadata: { type: 'object' },
started_at: { type: 'date' },
status: {
type: 'keyword',
},
total_document_count: { type: 'integer' },
trigger_method: { type: 'keyword' },
worker_hostname: { type: 'keyword' },
},
};
beforeEach(() => {
jest.clearAllMocks();
});
describe('setupConnectorsIndices', () => {
it('should do nothing if indices exist', async () => {
const result = {
[connectorsIndexName]: {
mappings: {
_meta: {
version: CONNECTORS_VERSION,
},
},
},
[jobsIndexName]: {
mappings: {
_meta: {
version: CONNECTORS_VERSION,
},
},
},
};
mockClient.asCurrentUser.indices.get.mockImplementation(() => Promise.resolve(result));
mockClient.asCurrentUser.indices.create.mockImplementation(() => Promise.resolve());
mockClient.asCurrentUser.indices.updateAliases.mockImplementation(() => Promise.resolve());
await expect(setupConnectorsIndices(mockClient.asCurrentUser as any)).resolves.toEqual(
undefined
);
expect(mockClient.asCurrentUser.indices.create).not.toHaveBeenCalled();
expect(mockClient.asCurrentUser.indices.updateAliases).not.toHaveBeenCalled();
});
it('should do nothing if it hits race condition exist', async () => {
const result = {
[connectorsIndexName]: {
mappings: {
_meta: {
version: CONNECTORS_VERSION,
},
},
},
[jobsIndexName]: {
mappings: {
_meta: {
version: CONNECTORS_VERSION,
},
},
},
};
mockClient.asCurrentUser.indices.get.mockImplementation(() => Promise.resolve(result));
mockClient.asCurrentUser.indices.create.mockImplementation(() =>
Promise.reject({ meta: { body: { error: { type: 'resource_already_exists_exception' } } } })
);
mockClient.asCurrentUser.indices.updateAliases.mockImplementation(() => Promise.resolve());
await expect(setupConnectorsIndices(mockClient.asCurrentUser as any)).resolves.toEqual(
undefined
);
expect(mockClient.asCurrentUser.indices.create).not.toHaveBeenCalled();
expect(mockClient.asCurrentUser.indices.updateAliases).not.toHaveBeenCalled();
});
it('should create new index and update alias if connectors index does not exist', async () => {
const result = {
[jobsIndexName]: {
mappings: {
_meta: {
version: CONNECTORS_VERSION,
},
},
},
};
mockClient.asCurrentUser.indices.get.mockImplementation(() => Promise.resolve(result));
mockClient.asCurrentUser.indices.create.mockImplementation(() => Promise.resolve());
mockClient.asCurrentUser.indices.updateAliases.mockImplementation(() => Promise.resolve());
await expect(setupConnectorsIndices(mockClient.asCurrentUser as any)).resolves.toEqual(
undefined
);
expect(mockClient.asCurrentUser.indices.create).toHaveBeenCalledWith({
index: connectorsIndexName,
mappings: connectorsMappings,
settings: { auto_expand_replicas: '0-3', hidden: true, number_of_replicas: 0 },
});
expect(mockClient.asCurrentUser.indices.updateAliases).toHaveBeenCalledWith({
actions: [
{
add: {
aliases: [CONNECTORS_INDEX],
index: `${CONNECTORS_INDEX}-v${CONNECTORS_VERSION}`,
is_hidden: true,
is_write_index: true,
},
},
],
});
});
it('should create new jobs index and update alias if jobs index does not exist', async () => {
const result = {
[connectorsIndexName]: {
mappings: {
_meta: {
version: CONNECTORS_VERSION,
},
},
},
};
mockClient.asCurrentUser.indices.get.mockImplementation(() => Promise.resolve(result));
mockClient.asCurrentUser.indices.create.mockImplementation(() => Promise.resolve());
mockClient.asCurrentUser.indices.updateAliases.mockImplementation(() => Promise.resolve());
await expect(setupConnectorsIndices(mockClient.asCurrentUser as any)).resolves.toEqual(
undefined
);
expect(mockClient.asCurrentUser.indices.create).toHaveBeenCalledWith({
index: jobsIndexName,
mappings: connectorsJobsMappings,
settings: { auto_expand_replicas: '0-3', hidden: true, number_of_replicas: 0 },
});
expect(mockClient.asCurrentUser.indices.updateAliases).toHaveBeenCalledWith({
actions: [
{
add: {
aliases: [CONNECTORS_JOBS_INDEX],
index: `${CONNECTORS_JOBS_INDEX}-v${CONNECTORS_VERSION}`,
is_hidden: true,
is_write_index: true,
},
},
],
});
});
});
});

View file

@ -1,340 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import {
IndicesIndexSettings,
MappingProperty,
MappingTypeMapping,
} from '@elastic/elasticsearch/lib/api/types';
import { ElasticsearchClient } from '@kbn/core/server';
import { CONNECTORS_INDEX } from '..';
import { isResourceAlreadyExistsException } from '../utils/identify_exceptions';
export enum SETUP_ERRORS {
'insufficient_permissions',
'index_already_exists',
}
interface IndexDefinition {
aliases: string[];
mappings: MappingTypeMapping;
name: string;
settings: IndicesIndexSettings;
}
const connectorMappingsProperties: Record<string, MappingProperty> = {
api_key_id: { type: 'keyword' },
configuration: { type: 'object' },
custom_scheduling: { type: 'object' },
description: { type: 'text' },
error: { type: 'keyword' },
features: {
properties: {
filtering_advanced_config: { type: 'boolean' },
filtering_rules: { type: 'boolean' },
incremental_sync: {
properties: {
enabled: { type: 'boolean' },
},
},
sync_rules: {
properties: {
basic: {
properties: {
enabled: { type: 'boolean' },
},
},
advanced: {
properties: {
enabled: { type: 'boolean' },
},
},
},
},
},
},
filtering: {
properties: {
active: {
properties: {
advanced_snippet: {
properties: {
created_at: { type: 'date' },
updated_at: { type: 'date' },
value: { type: 'object' },
},
},
rules: {
properties: {
created_at: { type: 'date' },
field: { type: 'keyword' },
id: { type: 'keyword' },
order: { type: 'short' },
policy: { type: 'keyword' },
rule: { type: 'keyword' },
updated_at: { type: 'date' },
value: { type: 'keyword' },
},
},
validation: {
properties: {
errors: {
properties: {
ids: { type: 'keyword' },
messages: { type: 'text' },
},
},
state: { type: 'keyword' },
},
},
},
},
domain: { type: 'keyword' },
draft: {
properties: {
advanced_snippet: {
properties: {
created_at: { type: 'date' },
updated_at: { type: 'date' },
value: { type: 'object' },
},
},
rules: {
properties: {
created_at: { type: 'date' },
field: { type: 'keyword' },
id: { type: 'keyword' },
order: { type: 'short' },
policy: { type: 'keyword' },
rule: { type: 'keyword' },
updated_at: { type: 'date' },
value: { type: 'keyword' },
},
},
validation: {
properties: {
errors: {
properties: {
ids: { type: 'keyword' },
messages: { type: 'text' },
},
},
state: { type: 'keyword' },
},
},
},
},
},
},
index_name: { type: 'keyword' },
is_native: { type: 'boolean' },
language: { type: 'keyword' },
last_access_control_sync_error: { type: 'keyword' },
last_access_control_sync_scheduled_at: { type: 'date' },
last_access_control_sync_status: { type: 'keyword' },
last_deleted_document_count: { type: 'long' },
last_incremental_sync_scheduled_at: { type: 'date' },
last_indexed_document_count: { type: 'long' },
last_seen: { type: 'date' },
last_sync_error: { type: 'keyword' },
last_sync_scheduled_at: { type: 'date' },
last_sync_status: { type: 'keyword' },
last_synced: { type: 'date' },
name: { type: 'keyword' },
pipeline: {
properties: {
extract_binary_content: { type: 'boolean' },
name: { type: 'keyword' },
reduce_whitespace: { type: 'boolean' },
run_ml_inference: { type: 'boolean' },
},
},
scheduling: {
properties: {
access_control: {
properties: {
enabled: { type: 'boolean' },
interval: { type: 'text' },
},
},
incremental: {
properties: {
enabled: { type: 'boolean' },
interval: { type: 'text' },
},
},
full: {
properties: {
enabled: { type: 'boolean' },
interval: { type: 'text' },
},
},
},
},
service_type: { type: 'keyword' },
status: { type: 'keyword' },
sync_cursor: { type: 'object' },
sync_now: { type: 'boolean' },
};
const defaultSettings: IndicesIndexSettings = {
auto_expand_replicas: '0-3',
hidden: true,
number_of_replicas: 0,
};
export interface DefaultConnectorsPipelineMeta {
default_extract_binary_content: boolean;
default_name: string;
default_reduce_whitespace: boolean;
default_run_ml_inference: boolean;
}
export const defaultConnectorsPipelineMeta: DefaultConnectorsPipelineMeta = {
default_extract_binary_content: true,
default_name: 'ent-search-generic-ingestion',
default_reduce_whitespace: true,
default_run_ml_inference: true,
};
const indices: IndexDefinition[] = [
{
aliases: ['.elastic-connectors'],
mappings: {
_meta: {
pipeline: defaultConnectorsPipelineMeta,
version: 1,
},
dynamic: false,
properties: connectorMappingsProperties,
},
name: '.elastic-connectors-v1',
settings: defaultSettings,
},
{
aliases: ['.elastic-connectors-sync-jobs'],
mappings: {
_meta: {
version: 1,
},
dynamic: false,
properties: {
cancelation_requested_at: { type: 'date' },
canceled_at: { type: 'date' },
completed_at: { type: 'date' },
connector: {
properties: {
configuration: { type: 'object' },
filtering: {
properties: {
advanced_snippet: {
properties: {
created_at: { type: 'date' },
updated_at: { type: 'date' },
value: { type: 'object' },
},
},
domain: { type: 'keyword' },
rules: {
properties: {
created_at: { type: 'date' },
field: { type: 'keyword' },
id: { type: 'keyword' },
order: { type: 'short' },
policy: { type: 'keyword' },
rule: { type: 'keyword' },
updated_at: { type: 'date' },
value: { type: 'keyword' },
},
},
warnings: {
properties: {
ids: { type: 'keyword' },
messages: { type: 'text' },
},
},
},
},
id: { type: 'keyword' },
index_name: { type: 'keyword' },
language: { type: 'keyword' },
pipeline: {
properties: {
extract_binary_content: { type: 'boolean' },
name: { type: 'keyword' },
reduce_whitespace: { type: 'boolean' },
run_ml_inference: { type: 'boolean' },
},
},
service_type: { type: 'keyword' },
sync_cursor: { type: 'object' },
},
},
created_at: { type: 'date' },
deleted_document_count: { type: 'integer' },
error: { type: 'keyword' },
indexed_document_count: { type: 'integer' },
indexed_document_volume: { type: 'integer' },
job_type: { type: 'keyword' },
last_seen: { type: 'date' },
metadata: { type: 'object' },
started_at: { type: 'date' },
status: { type: 'keyword' },
total_document_count: { type: 'integer' },
trigger_method: { type: 'keyword' },
worker_hostname: { type: 'keyword' },
},
},
name: '.elastic-connectors-sync-jobs-v1',
settings: defaultSettings,
},
];
const createConnectorsIndex = async (
client: ElasticsearchClient,
indexDefinition: IndexDefinition
) => {
try {
const { aliases, mappings, name: index, settings } = indexDefinition;
await client.indices.create({
index,
mappings,
settings,
});
await client.indices.updateAliases({
actions: [
{
add: {
aliases,
index,
is_hidden: true,
is_write_index: true,
},
},
],
});
} catch (error) {
if (isResourceAlreadyExistsException(error)) {
// We hit a race condition, do nothing
return;
}
return error;
}
};
export const setupConnectorsIndices = async (client: ElasticsearchClient) => {
const connectorsIndexResponse = await client.indices.get({
index: `${CONNECTORS_INDEX}*`,
});
for (const indexDefinition of indices) {
if (!connectorsIndexResponse[indexDefinition.name]) {
await createConnectorsIndex(client, indexDefinition);
}
// TODO handle migrations once we start migrating stuff
}
};

View file

@ -7,12 +7,10 @@
import { IScopedClusterClient } from '@kbn/core/server';
import { CONNECTORS_INDEX } from '../..';
import { CURRENT_CONNECTORS_INDEX } from '../..';
import { ConnectorStatus } from '../../../common/types/connectors';
import { ErrorCode } from '../../../common/types/error_codes';
import { setupConnectorsIndices } from '../../index_management/setup_indices';
import { fetchCrawlerByIndexName } from '../crawler/fetch_crawlers';
import { textAnalysisSettings } from '../indices/text_analysis';
@ -20,10 +18,6 @@ import { addConnector } from './add_connector';
import { deleteConnectorById } from './delete_connector';
import { fetchConnectorByIndexName } from './fetch_connectors';
jest.mock('../../index_management/setup_indices', () => ({
setupConnectorsIndices: jest.fn(),
}));
jest.mock('./fetch_connectors', () => ({ fetchConnectorByIndexName: jest.fn() }));
jest.mock('./delete_connector', () => ({ deleteConnectorById: jest.fn() }));
jest.mock('../crawler/fetch_crawlers', () => ({ fetchCrawlerByIndexName: jest.fn() }));
@ -41,11 +35,6 @@ describe('addConnector lib function', () => {
asInternalUser: {},
};
const createConnectorsIndexExistsFn =
(connectorsIndexExists: boolean, defaultValue: boolean) =>
({ index }: { index: string }) =>
index === CONNECTORS_INDEX ? connectorsIndexExists : defaultValue;
beforeEach(() => {
jest.clearAllMocks();
});
@ -68,9 +57,7 @@ describe('addConnector lib function', () => {
it('should add connector', async () => {
mockClient.asCurrentUser.index.mockImplementation(() => ({ _id: 'fakeId' }));
mockClient.asCurrentUser.indices.exists.mockImplementation(
createConnectorsIndexExistsFn(true, false)
);
mockClient.asCurrentUser.indices.exists.mockImplementation(() => false);
(fetchConnectorByIndexName as jest.Mock).mockImplementation(() => undefined);
(fetchCrawlerByIndexName as jest.Mock).mockImplementation(() => undefined);
mockClient.asCurrentUser.indices.getMapping.mockImplementation(() => connectorsIndicesMapping);
@ -169,7 +156,7 @@ describe('addConnector lib function', () => {
status: ConnectorStatus.CREATED,
sync_now: false,
},
index: CONNECTORS_INDEX,
index: CURRENT_CONNECTORS_INDEX,
refresh: 'wait_for',
});
expect(mockClient.asCurrentUser.indices.create).toHaveBeenCalledWith({
@ -181,9 +168,7 @@ describe('addConnector lib function', () => {
it('should reject if index already exists', async () => {
mockClient.asCurrentUser.index.mockImplementation(() => ({ _id: 'fakeId' }));
mockClient.asCurrentUser.indices.exists.mockImplementation(
createConnectorsIndexExistsFn(true, true)
);
mockClient.asCurrentUser.indices.exists.mockImplementation(() => true);
(fetchConnectorByIndexName as jest.Mock).mockImplementation(() => undefined);
(fetchCrawlerByIndexName as jest.Mock).mockImplementation(() => undefined);
mockClient.asCurrentUser.indices.getMapping.mockImplementation(() => connectorsIndicesMapping);
@ -200,9 +185,7 @@ describe('addConnector lib function', () => {
it('should reject if connector already exists', async () => {
mockClient.asCurrentUser.index.mockImplementation(() => ({ _id: 'fakeId' }));
mockClient.asCurrentUser.indices.exists.mockImplementation(
createConnectorsIndexExistsFn(true, false)
);
mockClient.asCurrentUser.indices.exists.mockImplementation(() => false);
(fetchConnectorByIndexName as jest.Mock).mockImplementation(() => true);
(fetchCrawlerByIndexName as jest.Mock).mockImplementation(() => undefined);
mockClient.asCurrentUser.indices.getMapping.mockImplementation(() => connectorsIndicesMapping);
@ -219,9 +202,7 @@ describe('addConnector lib function', () => {
it('should reject if crawler already exists', async () => {
mockClient.asCurrentUser.index.mockImplementation(() => ({ _id: 'fakeId' }));
mockClient.asCurrentUser.indices.exists.mockImplementation(
createConnectorsIndexExistsFn(true, false)
);
mockClient.asCurrentUser.indices.exists.mockImplementation(() => false);
(fetchConnectorByIndexName as jest.Mock).mockImplementation(() => undefined);
(fetchCrawlerByIndexName as jest.Mock).mockImplementation(() => true);
mockClient.asCurrentUser.indices.getMapping.mockImplementation(() => connectorsIndicesMapping);
@ -238,9 +219,7 @@ describe('addConnector lib function', () => {
it('should reject with index already exists if connector and index already exist', async () => {
mockClient.asCurrentUser.index.mockImplementation(() => ({ _id: 'fakeId' }));
mockClient.asCurrentUser.indices.exists.mockImplementation(
createConnectorsIndexExistsFn(true, true)
);
mockClient.asCurrentUser.indices.exists.mockImplementation(() => true);
(fetchConnectorByIndexName as jest.Mock).mockImplementation(() => true);
(fetchCrawlerByIndexName as jest.Mock).mockImplementation(() => undefined);
mockClient.asCurrentUser.indices.getMapping.mockImplementation(() => connectorsIndicesMapping);
@ -257,9 +236,7 @@ describe('addConnector lib function', () => {
it('should replace connector if deleteExistingConnector flag is true', async () => {
mockClient.asCurrentUser.index.mockImplementation(() => ({ _id: 'fakeId' }));
mockClient.asCurrentUser.indices.exists.mockImplementation(
createConnectorsIndexExistsFn(true, false)
);
mockClient.asCurrentUser.indices.exists.mockImplementation(() => false);
(fetchConnectorByIndexName as jest.Mock).mockImplementation(() => ({ id: 'connectorId' }));
(fetchCrawlerByIndexName as jest.Mock).mockImplementation(() => undefined);
mockClient.asCurrentUser.indices.getMapping.mockImplementation(() => connectorsIndicesMapping);
@ -360,7 +337,7 @@ describe('addConnector lib function', () => {
status: ConnectorStatus.CREATED,
sync_now: false,
},
index: CONNECTORS_INDEX,
index: CURRENT_CONNECTORS_INDEX,
refresh: 'wait_for',
});
expect(mockClient.asCurrentUser.indices.create).toHaveBeenCalledWith({
@ -373,116 +350,4 @@ describe('addConnector lib function', () => {
},
});
});
it('should create index if no connectors index exists', async () => {
mockClient.asCurrentUser.indices.exists.mockImplementation(
createConnectorsIndexExistsFn(false, false)
);
(fetchConnectorByIndexName as jest.Mock).mockImplementation(() => false);
(fetchCrawlerByIndexName as jest.Mock).mockImplementation(() => undefined);
mockClient.asCurrentUser.indices.getMapping.mockImplementation(() => connectorsIndicesMapping);
await expect(
addConnector(mockClient as unknown as IScopedClusterClient, {
index_name: 'search-index_name',
is_native: false,
language: 'en',
})
).resolves.toEqual({ id: 'fakeId', index_name: 'search-index_name' });
expect(setupConnectorsIndices as jest.Mock).toHaveBeenCalledWith(mockClient.asCurrentUser);
expect(mockClient.asCurrentUser.index).toHaveBeenCalledWith({
document: {
api_key_id: null,
configuration: {},
custom_scheduling: {},
description: null,
error: null,
features: null,
filtering: [
{
active: {
advanced_snippet: {
created_at: expect.any(String),
updated_at: expect.any(String),
value: {},
},
rules: [
{
created_at: expect.any(String),
field: '_',
id: 'DEFAULT',
order: 0,
policy: 'include',
rule: 'regex',
updated_at: expect.any(String),
value: '.*',
},
],
validation: {
errors: [],
state: 'valid',
},
},
domain: 'DEFAULT',
draft: {
advanced_snippet: {
created_at: expect.any(String),
updated_at: expect.any(String),
value: {},
},
rules: [
{
created_at: expect.any(String),
field: '_',
id: 'DEFAULT',
order: 0,
policy: 'include',
rule: 'regex',
updated_at: expect.any(String),
value: '.*',
},
],
validation: {
errors: [],
state: 'valid',
},
},
},
],
index_name: 'search-index_name',
is_native: false,
language: 'en',
last_access_control_sync_error: null,
last_access_control_sync_scheduled_at: null,
last_access_control_sync_status: null,
last_incremental_sync_scheduled_at: null,
last_seen: null,
last_sync_error: null,
last_sync_scheduled_at: null,
last_sync_status: null,
last_synced: null,
name: 'index_name',
pipeline: {
extract_binary_content: true,
name: 'ent-search-generic-ingestion',
reduce_whitespace: true,
run_ml_inference: false,
},
scheduling: {
access_control: { enabled: false, interval: '0 0 0 * * ?' },
full: { enabled: false, interval: '0 0 0 * * ?' },
incremental: { enabled: false, interval: '0 0 0 * * ?' },
},
service_type: null,
status: ConnectorStatus.CREATED,
sync_now: false,
},
index: CONNECTORS_INDEX,
refresh: 'wait_for',
});
expect(mockClient.asCurrentUser.indices.create).toHaveBeenCalledWith({
index: 'search-index_name',
mappings: {},
settings: { ...textAnalysisSettings('en'), auto_expand_replicas: '0-3', number_of_shards: 2 },
});
});
});

View file

@ -7,17 +7,14 @@
import { IScopedClusterClient } from '@kbn/core/server';
import { CONNECTORS_INDEX, CONNECTORS_VERSION } from '../..';
import { CURRENT_CONNECTORS_INDEX } from '../..';
import { ConnectorDocument } from '../../../common/types/connectors';
import { ErrorCode } from '../../../common/types/error_codes';
import {
DefaultConnectorsPipelineMeta,
setupConnectorsIndices,
} from '../../index_management/setup_indices';
import { createConnectorDocument } from '../../utils/create_connector_document';
import { fetchCrawlerByIndexName } from '../crawler/fetch_crawlers';
import { createIndex } from '../indices/create_index';
import { getDefaultPipeline } from '../pipelines/get_default_pipeline';
import { deleteConnectorById } from './delete_connector';
@ -53,7 +50,7 @@ const createConnector = async (
const result = await client.asCurrentUser.index({
document,
index: CONNECTORS_INDEX,
index: CURRENT_CONNECTORS_INDEX,
refresh: 'wait_for',
});
await createIndex(client, document.index_name, language, false);
@ -71,31 +68,13 @@ export const addConnector = async (
service_type?: string;
}
): Promise<{ id: string; index_name: string }> => {
const connectorsIndexExists = await client.asCurrentUser.indices.exists({
index: CONNECTORS_INDEX,
});
if (!connectorsIndexExists) {
await setupConnectorsIndices(client.asCurrentUser);
}
const connectorsIndicesMapping = await client.asCurrentUser.indices.getMapping({
index: CONNECTORS_INDEX,
});
const pipeline: DefaultConnectorsPipelineMeta =
connectorsIndicesMapping[`${CONNECTORS_INDEX}-v${CONNECTORS_VERSION}`]?.mappings?._meta
?.pipeline;
const pipeline = await getDefaultPipeline(client);
const document = createConnectorDocument({
indexName: input.index_name,
isNative: input.is_native,
language: input.language,
pipeline: pipeline
? {
extract_binary_content: pipeline.default_extract_binary_content,
name: pipeline.default_name,
reduce_whitespace: pipeline.default_reduce_whitespace,
run_ml_inference: pipeline.default_run_ml_inference,
}
: null,
pipeline,
serviceType: input.service_type ?? null,
});

View file

@ -5,16 +5,24 @@
* 2.0.
*/
import { isIndexNotFoundException } from '@kbn/core-saved-objects-migration-server-internal';
import { IScopedClusterClient } from '@kbn/core/server';
import { CONNECTORS_INDEX } from '../..';
export async function fetchConnectorIndexNames(client: IScopedClusterClient): Promise<string[]> {
const result = await client.asCurrentUser.search({
_source: false,
fields: [{ field: 'index_name' }],
index: CONNECTORS_INDEX,
size: 10000,
});
return (result?.hits.hits ?? []).map((field) => field.fields?.index_name[0] ?? '');
try {
const result = await client.asCurrentUser.search({
_source: false,
fields: [{ field: 'index_name' }],
index: CONNECTORS_INDEX,
size: 10000,
});
return (result?.hits.hits ?? []).map((field) => field.fields?.index_name[0] ?? '');
} catch (error) {
if (isIndexNotFoundException(error)) {
return [];
}
throw error;
}
}

View file

@ -6,13 +6,28 @@
*/
import { CONNECTORS_INDEX } from '../..';
import { setupConnectorsIndices } from '../../index_management/setup_indices';
import { fetchConnectorById, fetchConnectorByIndexName, fetchConnectors } from './fetch_connectors';
jest.mock('../../index_management/setup_indices', () => ({
setupConnectorsIndices: jest.fn(),
}));
const indexNotFoundError = {
meta: {
body: {
error: {
type: 'index_not_found_exception',
},
},
},
};
const otherError = {
meta: {
body: {
error: {
type: 'other_error',
},
},
},
};
describe('fetchConnectors lib', () => {
const mockClient = {
@ -46,43 +61,21 @@ describe('fetchConnectors lib', () => {
index: CONNECTORS_INDEX,
});
});
it('should call setup connectors on index not found error', async () => {
mockClient.asCurrentUser.get.mockImplementationOnce(() =>
Promise.reject({
meta: {
body: {
error: {
type: 'index_not_found_exception',
},
},
},
})
);
it('should return undefined on index not found error', async () => {
mockClient.asCurrentUser.get.mockImplementationOnce(() => Promise.reject(indexNotFoundError));
await expect(fetchConnectorById(mockClient as any, 'id')).resolves.toEqual(undefined);
expect(mockClient.asCurrentUser.get).toHaveBeenCalledWith({
id: 'id',
index: CONNECTORS_INDEX,
});
expect(setupConnectorsIndices as jest.Mock).toHaveBeenCalledWith(mockClient.asCurrentUser);
});
it('should not call setup connectors on other errors', async () => {
mockClient.asCurrentUser.get.mockImplementationOnce(() =>
Promise.reject({
meta: {
body: {
error: {
type: 'other error',
},
},
},
})
);
await expect(fetchConnectorById(mockClient as any, 'id')).resolves.toEqual(undefined);
it('should throw on other errors', async () => {
mockClient.asCurrentUser.get.mockImplementationOnce(() => Promise.reject(otherError));
await expect(fetchConnectorById(mockClient as any, 'id')).rejects.toEqual(otherError);
expect(mockClient.asCurrentUser.get).toHaveBeenCalledWith({
id: 'id',
index: CONNECTORS_INDEX,
});
expect(setupConnectorsIndices as jest.Mock).not.toHaveBeenCalled();
});
});
describe('fetch connector by name', () => {
@ -106,15 +99,9 @@ describe('fetchConnectors lib', () => {
},
});
});
it('should call setup connectors on index not found error', async () => {
it('should return undefined on index not found error', async () => {
mockClient.asCurrentUser.search.mockImplementationOnce(() =>
Promise.reject({
meta: {
body: {
error: { type: 'index_not_found_exception' },
},
},
})
Promise.reject(indexNotFoundError)
);
await expect(fetchConnectorByIndexName(mockClient as any, 'id')).resolves.toEqual(undefined);
expect(mockClient.asCurrentUser.search).toHaveBeenCalledWith({
@ -125,21 +112,10 @@ describe('fetchConnectors lib', () => {
},
},
});
expect(setupConnectorsIndices as jest.Mock).toHaveBeenCalledWith(mockClient.asCurrentUser);
});
it('should not call setup connectors on other errors', async () => {
mockClient.asCurrentUser.search.mockImplementationOnce(() =>
Promise.reject({
meta: {
body: {
error: {
type: 'other error',
},
},
},
})
);
await expect(fetchConnectorByIndexName(mockClient as any, 'id')).resolves.toEqual(undefined);
it('should throw on other errors', async () => {
mockClient.asCurrentUser.search.mockImplementationOnce(() => Promise.reject(otherError));
await expect(fetchConnectorByIndexName(mockClient as any, 'id')).rejects.toEqual(otherError);
expect(mockClient.asCurrentUser.search).toHaveBeenCalledWith({
index: CONNECTORS_INDEX,
query: {
@ -148,7 +124,6 @@ describe('fetchConnectors lib', () => {
},
},
});
expect(setupConnectorsIndices as jest.Mock).not.toHaveBeenCalled();
});
});
describe('fetch connectors', () => {
@ -197,15 +172,9 @@ describe('fetchConnectors lib', () => {
});
expect(mockClient.asCurrentUser.search).toHaveBeenCalledTimes(3);
});
it('should call setup connectors on index not found error', async () => {
it('should return empty array on index not found error', async () => {
mockClient.asCurrentUser.search.mockImplementationOnce(() =>
Promise.reject({
meta: {
body: {
error: { type: 'index_not_found_exception' },
},
},
})
Promise.reject(indexNotFoundError)
);
await expect(fetchConnectors(mockClient as any)).resolves.toEqual([]);
expect(mockClient.asCurrentUser.search).toHaveBeenCalledWith({
@ -214,28 +183,16 @@ describe('fetchConnectors lib', () => {
query: { match_all: {} },
size: 1000,
});
expect(setupConnectorsIndices as jest.Mock).toHaveBeenCalledWith(mockClient.asCurrentUser);
});
it('should not call setup connectors on other errors', async () => {
mockClient.asCurrentUser.search.mockImplementationOnce(() =>
Promise.reject({
meta: {
body: {
error: {
type: 'other error',
},
},
},
})
);
await expect(fetchConnectors(mockClient as any)).resolves.toEqual([]);
it('should throw on other errors', async () => {
mockClient.asCurrentUser.search.mockImplementationOnce(() => Promise.reject(otherError));
await expect(fetchConnectors(mockClient as any)).rejects.toEqual(otherError);
expect(mockClient.asCurrentUser.search).toHaveBeenCalledWith({
from: 0,
index: CONNECTORS_INDEX,
query: { match_all: {} },
size: 1000,
});
expect(setupConnectorsIndices as jest.Mock).not.toHaveBeenCalled();
});
});
});

View file

@ -11,8 +11,6 @@ import { IScopedClusterClient } from '@kbn/core/server';
import { CONNECTORS_INDEX } from '../..';
import { Connector, ConnectorDocument } from '../../../common/types/connectors';
import { OptimisticConcurrency } from '../../../common/types/util_types';
import { setupConnectorsIndices } from '../../index_management/setup_indices';
import { isIndexNotFoundException } from '../../utils/identify_exceptions';
import { fetchAll } from '../fetch_all';
@ -34,9 +32,9 @@ export const fetchConnectorById = async (
: undefined;
} catch (error) {
if (isIndexNotFoundException(error)) {
await setupConnectorsIndices(client.asCurrentUser);
return undefined;
}
return undefined;
throw error;
}
};
@ -57,9 +55,9 @@ export const fetchConnectorByIndexName = async (
return result;
} catch (error) {
if (isIndexNotFoundException(error)) {
await setupConnectorsIndices(client.asCurrentUser);
return undefined;
}
return undefined;
throw error;
}
};
@ -75,8 +73,8 @@ export const fetchConnectors = async (
return await fetchAll<Connector>(client, CONNECTORS_INDEX, query);
} catch (error) {
if (isIndexNotFoundException(error)) {
await setupConnectorsIndices(client.asCurrentUser);
return [];
}
return [];
throw error;
}
};

View file

@ -5,14 +5,8 @@
* 2.0.
*/
import { setupConnectorsIndices } from '../../index_management/setup_indices';
import { fetchSyncJobsByConnectorId } from './fetch_sync_jobs';
jest.mock('../../index_management/setup_indices', () => ({
setupConnectorsIndices: jest.fn(),
}));
describe('fetchSyncJobs lib', () => {
const mockClient = {
asCurrentUser: {
@ -71,7 +65,7 @@ describe('fetchSyncJobs lib', () => {
});
expect(mockClient.asCurrentUser.search).not.toHaveBeenCalled();
});
it('should call setup connectors on index not found error', async () => {
it('should return empty array on index not found error', async () => {
mockClient.asCurrentUser.search.mockImplementationOnce(() =>
Promise.reject({
meta: {
@ -109,9 +103,8 @@ describe('fetchSyncJobs lib', () => {
},
},
});
expect(setupConnectorsIndices as jest.Mock).toHaveBeenCalledWith(mockClient.asCurrentUser);
});
it('should not call setup connectors on other errors', async () => {
it('should throw on other errors', async () => {
mockClient.asCurrentUser.search.mockImplementationOnce(() =>
Promise.reject({
meta: {
@ -147,7 +140,6 @@ describe('fetchSyncJobs lib', () => {
},
},
});
expect(setupConnectorsIndices as jest.Mock).not.toHaveBeenCalled();
});
});
});

View file

@ -12,7 +12,6 @@ import { ConnectorSyncJob, SyncJobType } from '../../../common/types/connectors'
import { Paginate } from '../../../common/types/pagination';
import { isNotNullish } from '../../../common/utils/is_not_nullish';
import { setupConnectorsIndices } from '../../index_management/setup_indices';
import { fetchWithPagination } from '../../utils/fetch_with_pagination';
import { isIndexNotFoundException } from '../../utils/identify_exceptions';
@ -82,10 +81,8 @@ export const fetchSyncJobsByConnectorId = async (
};
} catch (error) {
if (isIndexNotFoundException(error)) {
await setupConnectorsIndices(client.asCurrentUser);
return defaultResult;
} else {
throw error;
}
throw error;
}
};

View file

@ -7,7 +7,7 @@
import { IScopedClusterClient } from '@kbn/core/server';
import { CONNECTORS_INDEX, CONNECTORS_JOBS_INDEX } from '../..';
import { CONNECTORS_INDEX, CURRENT_CONNECTORS_JOB_INDEX } from '../..';
import { CONNECTORS_ACCESS_CONTROL_INDEX_PREFIX } from '../../../common/constants';
import { SyncJobType, SyncStatus, TriggerMethod } from '../../../common/types/connectors';
@ -95,7 +95,7 @@ describe('startSync lib function', () => {
trigger_method: TriggerMethod.ON_DEMAND,
worker_hostname: null,
},
index: CONNECTORS_JOBS_INDEX,
index: CURRENT_CONNECTORS_JOB_INDEX,
});
});
it('should start a full sync with service type, pipeline and nextSyncConfig', async () => {
@ -164,7 +164,7 @@ describe('startSync lib function', () => {
trigger_method: TriggerMethod.ON_DEMAND,
worker_hostname: null,
},
index: CONNECTORS_JOBS_INDEX,
index: CURRENT_CONNECTORS_JOB_INDEX,
});
});
@ -299,7 +299,7 @@ describe('startSync lib function', () => {
trigger_method: TriggerMethod.ON_DEMAND,
worker_hostname: null,
},
index: CONNECTORS_JOBS_INDEX,
index: CURRENT_CONNECTORS_JOB_INDEX,
});
});
@ -366,7 +366,7 @@ describe('startSync lib function', () => {
trigger_method: TriggerMethod.ON_DEMAND,
worker_hostname: null,
},
index: CONNECTORS_JOBS_INDEX,
index: CURRENT_CONNECTORS_JOB_INDEX,
});
});
});

View file

@ -7,7 +7,7 @@
import { IScopedClusterClient } from '@kbn/core/server';
import { CONNECTORS_INDEX, CONNECTORS_JOBS_INDEX } from '../..';
import { CONNECTORS_INDEX, CURRENT_CONNECTORS_JOB_INDEX } from '../..';
import { isConfigEntry } from '../../../common/connectors/is_category_entry';
import {
@ -99,7 +99,7 @@ export const startConnectorSync = async (
trigger_method: TriggerMethod.ON_DEMAND,
worker_hostname: null,
},
index: CONNECTORS_JOBS_INDEX,
index: CURRENT_CONNECTORS_JOB_INDEX,
});
} else {
throw new Error(ErrorCode.RESOURCE_NOT_FOUND);

View file

@ -12,6 +12,7 @@ import { CONNECTORS_JOBS_INDEX } from '../..';
import { ENTERPRISE_SEARCH_CONNECTOR_CRAWLER_SERVICE_TYPE } from '../../../common/constants';
import { ConnectorSyncJobDocument, SyncStatus } from '../../../common/types/connectors';
import { ElasticsearchIndexWithIngestion } from '../../../common/types/indices';
import { isIndexNotFoundException } from '../../utils/identify_exceptions';
import { fetchConnectorByIndexName } from '../connectors/fetch_connectors';
import { fetchCrawlerByIndexName } from '../crawler/fetch_crawlers';
@ -21,29 +22,36 @@ const hasInProgressSyncs = async (
client: IScopedClusterClient,
connectorId: string
): Promise<{ inProgress: boolean; pending: boolean }> => {
const syncs = await client.asCurrentUser.search<ConnectorSyncJobDocument>({
index: CONNECTORS_JOBS_INDEX,
query: {
bool: {
filter: [
{ term: { 'connector.id': connectorId } },
{
dis_max: {
queries: [
{ term: { status: SyncStatus.IN_PROGRESS } },
{ term: { status: SyncStatus.PENDING } },
],
try {
const syncs = await client.asCurrentUser.search<ConnectorSyncJobDocument>({
index: CONNECTORS_JOBS_INDEX,
query: {
bool: {
filter: [
{ term: { 'connector.id': connectorId } },
{
dis_max: {
queries: [
{ term: { status: SyncStatus.IN_PROGRESS } },
{ term: { status: SyncStatus.PENDING } },
],
},
},
},
],
],
},
},
},
});
const inProgress = syncs.hits.hits.some(
(sync) => sync._source?.status === SyncStatus.IN_PROGRESS
);
const pending = syncs.hits.hits.some((sync) => sync._source?.status === SyncStatus.PENDING);
return { inProgress, pending };
});
const inProgress = syncs.hits.hits.some(
(sync) => sync._source?.status === SyncStatus.IN_PROGRESS
);
const pending = syncs.hits.hits.some((sync) => sync._source?.status === SyncStatus.PENDING);
return { inProgress, pending };
} catch (error) {
if (isIndexNotFoundException(error)) {
return { inProgress: false, pending: false };
}
throw error;
}
};
export const fetchIndex = async (

View file

@ -10,24 +10,32 @@ import { IScopedClusterClient } from '@kbn/core/server';
import { CURRENT_CONNECTORS_INDEX } from '../..';
import { DEFAULT_PIPELINE_VALUES } from '../../../common/constants';
import { DefaultConnectorsPipelineMeta } from '../../../common/constants';
import { IngestPipelineParams } from '../../../common/types/connectors';
import { DefaultConnectorsPipelineMeta } from '../../index_management/setup_indices';
import { isIndexNotFoundException } from '../../utils/identify_exceptions';
export const getDefaultPipeline = async (
client: IScopedClusterClient
): Promise<IngestPipelineParams> => {
const mapping = await client.asCurrentUser.indices.getMapping({
index: CURRENT_CONNECTORS_INDEX,
});
const meta: DefaultConnectorsPipelineMeta | undefined =
mapping[CURRENT_CONNECTORS_INDEX]?.mappings._meta?.pipeline;
const mappedMapping: IngestPipelineParams = meta
? {
extract_binary_content: meta.default_extract_binary_content,
name: meta.default_name,
reduce_whitespace: meta.default_reduce_whitespace,
run_ml_inference: meta.default_run_ml_inference,
}
: DEFAULT_PIPELINE_VALUES;
return mappedMapping;
try {
const mapping = await client.asCurrentUser.indices.getMapping({
index: CURRENT_CONNECTORS_INDEX,
});
const meta: DefaultConnectorsPipelineMeta | undefined =
mapping[CURRENT_CONNECTORS_INDEX]?.mappings._meta?.pipeline;
const mappedMapping: IngestPipelineParams = meta
? {
extract_binary_content: meta.default_extract_binary_content,
name: meta.default_name,
reduce_whitespace: meta.default_reduce_whitespace,
run_ml_inference: meta.default_run_ml_inference,
}
: DEFAULT_PIPELINE_VALUES;
return mappedMapping;
} catch (error) {
if (isIndexNotFoundException(error)) {
return DEFAULT_PIPELINE_VALUES;
}
throw error;
}
};

View file

@ -9,12 +9,8 @@ import { IScopedClusterClient } from '@kbn/core/server';
import { CURRENT_CONNECTORS_INDEX } from '../..';
import { DefaultConnectorsPipelineMeta } from '../../../common/constants';
import { IngestPipelineParams } from '../../../common/types/connectors';
import {
DefaultConnectorsPipelineMeta,
setupConnectorsIndices,
} from '../../index_management/setup_indices';
import { isIndexNotFoundException } from '../../utils/identify_exceptions';
export const updateDefaultPipeline = async (
client: IScopedClusterClient,
@ -35,8 +31,7 @@ export const updateDefaultPipeline = async (
index: CURRENT_CONNECTORS_INDEX,
});
} catch (error) {
if (isIndexNotFoundException(error)) {
setupConnectorsIndices(client.asCurrentUser);
}
// TODO: Throw error saying you have to index a connector first
throw error;
}
};

View file

@ -13,125 +13,140 @@ import { CONNECTORS_INDEX, CONNECTORS_JOBS_INDEX } from '../..';
import { SyncJobsStats } from '../../../common/stats';
import { ConnectorStatus, SyncStatus } from '../../../common/types/connectors';
import { isIndexNotFoundException } from '../../utils/identify_exceptions';
export const fetchSyncJobsStats = async (client: IScopedClusterClient): Promise<SyncJobsStats> => {
const connectorIdsResult = await client.asCurrentUser.search({
index: CONNECTORS_INDEX,
scroll: '10s',
stored_fields: [],
});
const ids = connectorIdsResult.hits.hits.map((hit) => hit._id);
const orphanedJobsCountResponse = await client.asCurrentUser.count({
index: CONNECTORS_JOBS_INDEX,
query: {
bool: {
must_not: [
{
terms: {
'connector.id': ids,
},
},
],
},
},
});
const inProgressJobsCountResponse = await client.asCurrentUser.count({
index: CONNECTORS_JOBS_INDEX,
query: {
term: {
status: SyncStatus.IN_PROGRESS,
},
},
});
const idleJobsCountResponse = await client.asCurrentUser.count({
index: CONNECTORS_JOBS_INDEX,
query: {
bool: {
filter: [
{
term: {
status: SyncStatus.IN_PROGRESS,
},
},
{
range: {
last_seen: {
lt: moment().subtract(1, 'minute').toISOString(),
try {
const connectorIdsResult = await client.asCurrentUser.search({
index: CONNECTORS_INDEX,
scroll: '10s',
stored_fields: [],
});
const ids = connectorIdsResult.hits.hits.map((hit) => hit._id);
const orphanedJobsCountResponse = await client.asCurrentUser.count({
index: CONNECTORS_JOBS_INDEX,
query: {
bool: {
must_not: [
{
terms: {
'connector.id': ids,
},
},
},
],
],
},
},
},
});
});
const errorResponse = await client.asCurrentUser.count({
index: CONNECTORS_INDEX,
query: {
term: {
last_sync_status: SyncStatus.ERROR,
const inProgressJobsCountResponse = await client.asCurrentUser.count({
index: CONNECTORS_JOBS_INDEX,
query: {
term: {
status: SyncStatus.IN_PROGRESS,
},
},
},
});
});
const connectedResponse = await client.asCurrentUser.count({
index: CONNECTORS_INDEX,
query: {
bool: {
filter: [
{
term: {
status: ConnectorStatus.CONNECTED,
},
},
{
range: {
last_seen: {
gte: moment().subtract(30, 'minutes').toISOString(),
const idleJobsCountResponse = await client.asCurrentUser.count({
index: CONNECTORS_JOBS_INDEX,
query: {
bool: {
filter: [
{
term: {
status: SyncStatus.IN_PROGRESS,
},
},
},
],
},
},
});
const incompleteResponse = await client.asCurrentUser.count({
index: CONNECTORS_INDEX,
query: {
bool: {
should: [
{
bool: {
must_not: {
terms: {
status: [ConnectorStatus.CONNECTED, ConnectorStatus.ERROR],
{
range: {
last_seen: {
lt: moment().subtract(1, 'minute').toISOString(),
},
},
},
},
{
range: {
last_seen: {
lt: moment().subtract(30, 'minutes').toISOString(),
],
},
},
});
const errorResponse = await client.asCurrentUser.count({
index: CONNECTORS_INDEX,
query: {
term: {
last_sync_status: SyncStatus.ERROR,
},
},
});
const connectedResponse = await client.asCurrentUser.count({
index: CONNECTORS_INDEX,
query: {
bool: {
filter: [
{
term: {
status: ConnectorStatus.CONNECTED,
},
},
},
],
{
range: {
last_seen: {
gte: moment().subtract(30, 'minutes').toISOString(),
},
},
},
],
},
},
},
});
});
const response = {
connected: connectedResponse.count,
errors: errorResponse.count,
idle: idleJobsCountResponse.count,
in_progress: inProgressJobsCountResponse.count,
incomplete: incompleteResponse.count,
orphaned_jobs: orphanedJobsCountResponse.count,
};
const incompleteResponse = await client.asCurrentUser.count({
index: CONNECTORS_INDEX,
query: {
bool: {
should: [
{
bool: {
must_not: {
terms: {
status: [ConnectorStatus.CONNECTED, ConnectorStatus.ERROR],
},
},
},
},
{
range: {
last_seen: {
lt: moment().subtract(30, 'minutes').toISOString(),
},
},
},
],
},
},
});
return response;
const response = {
connected: connectedResponse.count,
errors: errorResponse.count,
idle: idleJobsCountResponse.count,
in_progress: inProgressJobsCountResponse.count,
incomplete: incompleteResponse.count,
orphaned_jobs: orphanedJobsCountResponse.count,
};
return response;
} catch (error) {
if (isIndexNotFoundException(error)) {
return {
connected: 0,
errors: 0,
idle: 0,
in_progress: 0,
incomplete: 0,
orphaned_jobs: 0,
};
}
throw error;
}
};