[Connectors] Use API to start on-demand connector sync (#184411)

This commit is contained in:
Jedr Blaszyk 2024-06-03 16:18:58 +02:00 committed by GitHub
parent 9abe56f1c6
commit 03a5ea5629
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 189 additions and 554 deletions

View file

@ -8,17 +8,17 @@
import { ElasticsearchClient } from '@kbn/core/server';
import { CONNECTORS_INDEX, CURRENT_CONNECTORS_JOB_INDEX } from '..';
import { SyncJobType, SyncStatus, TriggerMethod } from '../types/connectors';
import { CONNECTORS_ACCESS_CONTROL_INDEX_PREFIX } from '..';
import { errors } from '@elastic/elasticsearch';
import { SyncJobType } from '../types/connectors';
import { startConnectorSync } from './start_sync';
describe('startSync lib function', () => {
const mockClient = {
get: jest.fn(),
index: jest.fn(),
update: jest.fn(),
transport: {
request: jest.fn(),
},
};
beforeEach(() => {
@ -26,217 +26,79 @@ describe('startSync lib function', () => {
});
it('should start a full sync', async () => {
mockClient.get.mockImplementation(() => {
return Promise.resolve({
_id: 'connectorId',
_source: {
api_key_id: null,
configuration: {},
created_at: null,
custom_scheduling: {},
error: null,
index_name: 'index_name',
language: null,
last_access_control_sync_error: null,
last_access_control_sync_scheduled_at: null,
last_access_control_sync_status: null,
last_seen: null,
last_sync_error: null,
last_sync_scheduled_at: null,
last_sync_status: null,
last_synced: null,
scheduling: { enabled: true, interval: '1 2 3 4 5' },
service_type: null,
status: 'not connected',
sync_now: false,
},
index: CONNECTORS_INDEX,
});
});
mockClient.index.mockImplementation(() => ({ _id: 'fakeId' }));
mockClient.transport.request.mockImplementation(() => ({ id: '12345' }));
await expect(
startConnectorSync(mockClient as unknown as ElasticsearchClient, {
connectorId: 'connectorId',
jobType: SyncJobType.FULL,
})
).resolves.toEqual({ _id: 'fakeId' });
expect(mockClient.index).toHaveBeenCalledWith({
document: {
cancelation_requested_at: null,
canceled_at: null,
completed_at: null,
connector: {
configuration: {},
filtering: null,
id: 'connectorId',
index_name: 'index_name',
language: null,
pipeline: null,
service_type: null,
},
created_at: expect.any(String),
deleted_document_count: 0,
error: null,
indexed_document_count: 0,
indexed_document_volume: 0,
job_type: SyncJobType.FULL,
last_seen: null,
metadata: {},
started_at: null,
status: SyncStatus.PENDING,
total_document_count: null,
trigger_method: TriggerMethod.ON_DEMAND,
worker_hostname: null,
).resolves.toEqual({ id: '12345' });
expect(mockClient.transport.request).toHaveBeenCalledWith({
method: 'POST',
path: '/_connector/_sync_job',
body: {
id: 'connectorId',
job_type: 'full',
},
index: CURRENT_CONNECTORS_JOB_INDEX,
});
});
it('should not create index if there is no connector', async () => {
mockClient.get.mockImplementation(() => {
return Promise.resolve({});
});
await expect(
startConnectorSync(mockClient as unknown as ElasticsearchClient, {
connectorId: 'connectorId',
jobType: SyncJobType.FULL,
})
).rejects.toEqual(new Error('resource_not_found'));
expect(mockClient.index).not.toHaveBeenCalled();
});
it('should start an incremental sync', async () => {
mockClient.get.mockImplementation(() => {
return Promise.resolve({
_id: 'connectorId',
_source: {
api_key_id: null,
configuration: {},
created_at: null,
custom_scheduling: {},
error: null,
filtering: [],
index_name: 'index_name',
language: null,
last_access_control_sync_status: null,
last_seen: null,
last_sync_error: null,
last_sync_scheduled_at: null,
last_sync_status: null,
last_synced: null,
scheduling: { enabled: true, interval: '1 2 3 4 5' },
service_type: null,
status: 'not connected',
sync_now: false,
},
index: CONNECTORS_INDEX,
});
});
mockClient.index.mockImplementation(() => ({ _id: 'fakeId' }));
mockClient.transport.request.mockImplementation(() => ({ id: '12345' }));
await expect(
startConnectorSync(mockClient as unknown as ElasticsearchClient, {
connectorId: 'connectorId',
jobType: SyncJobType.INCREMENTAL,
})
).resolves.toEqual({ _id: 'fakeId' });
expect(mockClient.index).toHaveBeenCalledWith({
document: {
cancelation_requested_at: null,
canceled_at: null,
completed_at: null,
connector: {
configuration: {},
filtering: null,
id: 'connectorId',
index_name: 'index_name',
language: null,
pipeline: null,
service_type: null,
},
created_at: expect.any(String),
deleted_document_count: 0,
error: null,
indexed_document_count: 0,
indexed_document_volume: 0,
job_type: SyncJobType.INCREMENTAL,
last_seen: null,
metadata: {},
started_at: null,
status: SyncStatus.PENDING,
total_document_count: null,
trigger_method: TriggerMethod.ON_DEMAND,
worker_hostname: null,
).resolves.toEqual({ id: '12345' });
expect(mockClient.transport.request).toHaveBeenCalledWith({
method: 'POST',
path: '/_connector/_sync_job',
body: {
id: 'connectorId',
job_type: 'incremental',
},
index: CURRENT_CONNECTORS_JOB_INDEX,
});
});
it('should start an access control sync', async () => {
mockClient.get.mockImplementation(() => {
return Promise.resolve({
_id: 'connectorId',
_source: {
api_key_id: null,
configuration: {},
created_at: null,
custom_scheduling: {},
error: null,
index_name: 'search-index_name',
language: null,
last_access_control_sync_status: null,
last_seen: null,
last_sync_error: null,
last_sync_scheduled_at: null,
last_sync_status: null,
last_synced: null,
scheduling: { enabled: true, interval: '1 2 3 4 5' },
service_type: null,
status: 'not connected',
sync_now: false,
},
index: CONNECTORS_INDEX,
});
});
mockClient.index.mockImplementation(() => ({ _id: 'fakeId' }));
it('should start an access_control sync', async () => {
mockClient.transport.request.mockImplementation(() => ({ id: '12345' }));
await expect(
startConnectorSync(mockClient as unknown as ElasticsearchClient, {
connectorId: 'connectorId',
targetIndexName: '.search-acl-filter-index_name',
jobType: SyncJobType.ACCESS_CONTROL,
})
).resolves.toEqual({ _id: 'fakeId' });
expect(mockClient.index).toHaveBeenCalledWith({
document: {
cancelation_requested_at: null,
canceled_at: null,
completed_at: null,
connector: {
configuration: {},
filtering: null,
id: 'connectorId',
index_name: `${CONNECTORS_ACCESS_CONTROL_INDEX_PREFIX}index_name`,
language: null,
pipeline: null,
service_type: null,
},
created_at: expect.any(String),
deleted_document_count: 0,
error: null,
indexed_document_count: 0,
indexed_document_volume: 0,
job_type: SyncJobType.ACCESS_CONTROL,
last_seen: null,
metadata: {},
started_at: null,
status: SyncStatus.PENDING,
total_document_count: null,
trigger_method: TriggerMethod.ON_DEMAND,
worker_hostname: null,
).resolves.toEqual({ id: '12345' });
expect(mockClient.transport.request).toHaveBeenCalledWith({
method: 'POST',
path: '/_connector/_sync_job',
body: {
id: 'connectorId',
job_type: 'access_control',
},
index: CURRENT_CONNECTORS_JOB_INDEX,
});
});
it('sync not started if there is no connector', async () => {
const notFoundError = new errors.ResponseError({
statusCode: 404,
body: {
error: {
type: `document_missing_exception`,
},
},
} as any);
mockClient.transport.request.mockRejectedValueOnce(notFoundError);
await expect(
startConnectorSync(mockClient as unknown as ElasticsearchClient, {
connectorId: 'connectorId',
jobType: SyncJobType.FULL,
})
).rejects.toEqual(notFoundError);
});
});

View file

@ -8,84 +8,24 @@
import { ElasticsearchClient } from '@kbn/core/server';
import { CONNECTORS_INDEX, CURRENT_CONNECTORS_JOB_INDEX } from '..';
import {
ConnectorConfiguration,
ConnectorDocument,
SyncJobType,
SyncStatus,
TriggerMethod,
} from '../types/connectors';
import { isConfigEntry } from '../utils/is_category_entry';
import { SyncJobType } from '../types/connectors';
export const startConnectorSync = async (
client: ElasticsearchClient,
{
connectorId,
jobType,
targetIndexName,
}: {
connectorId: string;
jobType?: SyncJobType;
targetIndexName?: string;
}
) => {
const connectorResult = await client.get<ConnectorDocument>({
id: connectorId,
index: CONNECTORS_INDEX,
return await client.transport.request<{ id: string }>({
method: 'POST',
path: `/_connector/_sync_job`,
body: {
id: connectorId,
job_type: jobType,
},
});
const connector = connectorResult._source;
if (connector) {
const configuration = Object.entries(connector.configuration).reduce(
(acc, [key, configEntry]) => {
if (isConfigEntry(configEntry)) {
acc[key] = configEntry;
}
return acc;
},
{} as ConnectorConfiguration
);
const {
filtering,
index_name: connectorIndexName,
language,
pipeline,
service_type: serviceType,
} = connector;
const now = new Date().toISOString();
return await client.index({
document: {
cancelation_requested_at: null,
canceled_at: null,
completed_at: null,
connector: {
configuration,
filtering: filtering ? filtering[0]?.active ?? null : null,
id: connectorId,
index_name: targetIndexName || connectorIndexName,
language,
pipeline: pipeline ?? null,
service_type: serviceType,
},
created_at: now,
deleted_document_count: 0,
error: null,
indexed_document_count: 0,
indexed_document_volume: 0,
job_type: jobType || SyncJobType.FULL,
last_seen: null,
metadata: {},
started_at: null,
status: SyncStatus.PENDING,
total_document_count: null,
trigger_method: TriggerMethod.ON_DEMAND,
worker_hostname: null,
},
index: CURRENT_CONNECTORS_JOB_INDEX,
});
} else {
throw new Error('resource_not_found');
}
};

View file

@ -6,20 +6,23 @@
*/
import { IScopedClusterClient } from '@kbn/core/server';
import {
CONNECTORS_INDEX,
SyncJobType,
SyncStatus,
TriggerMethod,
CURRENT_CONNECTORS_JOB_INDEX,
} from '@kbn/search-connectors';
import { CONNECTORS_INDEX, SyncJobType } from '@kbn/search-connectors';
import { CONNECTORS_ACCESS_CONTROL_INDEX_PREFIX } from '../../../common/constants';
import { fetchConnectorById, startConnectorSync } from '@kbn/search-connectors';
import { ErrorCode } from '../../../common/types/error_codes';
import { startSync } from './start_sync';
jest.mock('@kbn/search-connectors', () => {
const originalModule = jest.requireActual('@kbn/search-connectors');
return {
...originalModule,
fetchConnectorById: jest.fn(),
startConnectorSync: jest.fn(),
};
});
describe('startSync lib function', () => {
const mockClient = {
asCurrentUser: {
@ -28,6 +31,9 @@ describe('startSync lib function', () => {
update: jest.fn(),
},
asInternalUser: {},
transport: {
request: jest.fn(),
},
};
beforeEach(() => {
@ -35,170 +41,71 @@ describe('startSync lib function', () => {
});
it('should start a full sync', async () => {
mockClient.asCurrentUser.get.mockImplementation(() => {
return Promise.resolve({
_id: 'connectorId',
_source: {
api_key_id: null,
configuration: {},
created_at: null,
custom_scheduling: {},
error: null,
index_name: 'index_name',
language: null,
last_access_control_sync_error: null,
last_access_control_sync_scheduled_at: null,
last_access_control_sync_status: null,
last_seen: null,
last_sync_error: null,
last_sync_scheduled_at: null,
last_sync_status: null,
last_synced: null,
scheduling: { enabled: true, interval: '1 2 3 4 5' },
service_type: null,
status: 'not connected',
sync_now: false,
},
index: CONNECTORS_INDEX,
});
(fetchConnectorById as jest.Mock).mockResolvedValue({
api_key_id: null,
configuration: {},
created_at: null,
custom_scheduling: {},
error: null,
id: 'connectorId',
index_name: 'index_name',
language: null,
last_access_control_sync_error: null,
last_access_control_sync_scheduled_at: null,
last_access_control_sync_status: null,
last_seen: null,
last_sync_error: null,
last_sync_scheduled_at: null,
last_sync_status: null,
last_synced: null,
scheduling: { enabled: true, interval: '1 2 3 4 5' },
service_type: null,
status: 'not connected',
sync_now: false,
});
mockClient.asCurrentUser.index.mockImplementation(() => ({ _id: 'fakeId' }));
(startConnectorSync as jest.Mock).mockResolvedValue({ id: 'fakeId' });
await expect(
startSync(mockClient as unknown as IScopedClusterClient, 'connectorId', SyncJobType.FULL)
).resolves.toEqual({ _id: 'fakeId' });
expect(mockClient.asCurrentUser.index).toHaveBeenCalledWith({
document: {
cancelation_requested_at: null,
canceled_at: null,
completed_at: null,
connector: {
configuration: {},
filtering: null,
id: 'connectorId',
index_name: 'index_name',
language: null,
pipeline: null,
service_type: null,
},
created_at: expect.any(String),
deleted_document_count: 0,
error: null,
indexed_document_count: 0,
indexed_document_volume: 0,
job_type: SyncJobType.FULL,
last_seen: null,
metadata: {},
started_at: null,
status: SyncStatus.PENDING,
total_document_count: null,
trigger_method: TriggerMethod.ON_DEMAND,
worker_hostname: null,
},
index: CURRENT_CONNECTORS_JOB_INDEX,
});
});
it('should start a full sync with service type, pipeline', async () => {
mockClient.asCurrentUser.get.mockImplementation(() => {
return Promise.resolve({
_source: {
api_key_id: null,
configuration: { config: { label: 'label', value: 'haha' } },
created_at: null,
custom_scheduling: {},
error: null,
filtering: [{ active: 'filtering' }],
index_name: 'index_name',
language: 'nl',
last_seen: null,
last_sync_error: null,
last_sync_status: null,
last_synced: null,
pipeline: { name: 'pipeline' },
scheduling: { enabled: true, interval: '1 2 3 4 5' },
service_type: 'service_type',
status: 'not connected',
sync_now: false,
},
index: CONNECTORS_INDEX,
});
});
mockClient.asCurrentUser.index.mockImplementation(() => ({ _id: 'fakeId' }));
).resolves.toEqual({ id: 'fakeId' });
await expect(
startSync(mockClient as unknown as IScopedClusterClient, 'connectorId', SyncJobType.FULL)
).resolves.toEqual({ _id: 'fakeId' });
expect(mockClient.asCurrentUser.index).toHaveBeenCalledWith({
document: {
cancelation_requested_at: null,
canceled_at: null,
completed_at: null,
connector: {
configuration: {
config: { label: 'label', value: 'haha' },
},
filtering: 'filtering',
id: 'connectorId',
index_name: 'index_name',
language: 'nl',
pipeline: { name: 'pipeline' },
service_type: 'service_type',
},
created_at: expect.any(String),
deleted_document_count: 0,
error: null,
indexed_document_count: 0,
indexed_document_volume: 0,
job_type: SyncJobType.FULL,
last_seen: null,
metadata: {},
started_at: null,
status: SyncStatus.PENDING,
total_document_count: null,
trigger_method: TriggerMethod.ON_DEMAND,
worker_hostname: null,
},
index: CURRENT_CONNECTORS_JOB_INDEX,
expect(startConnectorSync).toHaveBeenCalledWith(mockClient.asCurrentUser, {
connectorId: 'connectorId',
jobType: 'full',
});
});
it('should not create index if there is no connector', async () => {
mockClient.asCurrentUser.get.mockImplementation(() => {
return Promise.resolve({});
});
it('should not create job if there is no connector', async () => {
(fetchConnectorById as jest.Mock).mockResolvedValue(undefined);
await expect(
startSync(mockClient as unknown as IScopedClusterClient, 'connectorId', SyncJobType.FULL)
).rejects.toEqual(new Error(ErrorCode.RESOURCE_NOT_FOUND));
expect(mockClient.asCurrentUser.index).not.toHaveBeenCalled();
expect(startConnectorSync).not.toHaveBeenCalled();
});
it('should set sync_now for crawler and not index a sync job', async () => {
mockClient.asCurrentUser.get.mockImplementation(() => {
return Promise.resolve({
_primary_term: 1,
_seq_no: 10,
_source: {
api_key_id: null,
configuration: { config: { label: 'label', value: 'haha' } },
created_at: null,
custom_scheduling: {},
error: null,
filtering: [{ active: 'filtering' }],
index_name: 'index_name',
language: 'nl',
last_seen: null,
last_sync_error: null,
last_sync_status: null,
last_synced: null,
pipeline: { name: 'pipeline' },
scheduling: { enabled: true, interval: '1 2 3 4 5' },
service_type: 'elastic-crawler',
status: 'not connected',
sync_now: false,
},
index: CONNECTORS_INDEX,
});
(fetchConnectorById as jest.Mock).mockResolvedValue({
api_key_id: null,
configuration: { config: { label: 'label', value: 'haha' } },
created_at: null,
custom_scheduling: {},
error: null,
filtering: [{ active: 'filtering' }],
id: 'connectorId',
index_name: 'index_name',
language: 'nl',
last_seen: null,
last_sync_error: null,
last_sync_status: null,
last_synced: null,
pipeline: { name: 'pipeline' },
scheduling: { enabled: true, interval: '1 2 3 4 5' },
service_type: 'elastic-crawler',
status: 'not connected',
sync_now: false,
});
mockClient.asCurrentUser.update.mockImplementation(() => ({ _id: 'fakeId' }));
await expect(
@ -209,7 +116,7 @@ describe('startSync lib function', () => {
'syncConfig'
)
).resolves.toEqual({ _id: 'fakeId' });
expect(mockClient.asCurrentUser.index).not.toHaveBeenCalled();
expect(startConnectorSync).not.toHaveBeenCalled();
expect(mockClient.asCurrentUser.update).toHaveBeenCalledWith({
doc: {
configuration: {
@ -219,40 +126,35 @@ describe('startSync lib function', () => {
sync_now: true,
},
id: 'connectorId',
if_primary_term: 1,
if_seq_no: 10,
index: CONNECTORS_INDEX,
});
});
it('should start an incremental sync', async () => {
mockClient.asCurrentUser.get.mockImplementation(() => {
return Promise.resolve({
_id: 'connectorId',
_source: {
api_key_id: null,
configuration: {},
created_at: null,
custom_scheduling: {},
error: null,
filtering: [],
index_name: 'index_name',
language: null,
last_access_control_sync_status: null,
last_seen: null,
last_sync_error: null,
last_sync_scheduled_at: null,
last_sync_status: null,
last_synced: null,
scheduling: { enabled: true, interval: '1 2 3 4 5' },
service_type: null,
status: 'not connected',
sync_now: false,
},
index: CONNECTORS_INDEX,
});
(fetchConnectorById as jest.Mock).mockResolvedValue({
api_key_id: null,
configuration: {},
created_at: null,
custom_scheduling: {},
error: null,
id: 'connectorId',
index_name: 'index_name',
language: null,
last_access_control_sync_error: null,
last_access_control_sync_scheduled_at: null,
last_access_control_sync_status: null,
last_seen: null,
last_sync_error: null,
last_sync_scheduled_at: null,
last_sync_status: null,
last_synced: null,
scheduling: { enabled: true, interval: '1 2 3 4 5' },
service_type: null,
status: 'not connected',
sync_now: false,
});
mockClient.asCurrentUser.index.mockImplementation(() => ({ _id: 'fakeId' }));
(startConnectorSync as jest.Mock).mockResolvedValue({ id: 'fakeId' });
await expect(
startSync(
@ -260,70 +162,43 @@ describe('startSync lib function', () => {
'connectorId',
SyncJobType.INCREMENTAL
)
).resolves.toEqual({ _id: 'fakeId' });
expect(mockClient.asCurrentUser.index).toHaveBeenCalledWith({
document: {
cancelation_requested_at: null,
canceled_at: null,
completed_at: null,
connector: {
configuration: {},
filtering: null,
id: 'connectorId',
index_name: 'index_name',
language: null,
pipeline: null,
service_type: null,
},
created_at: expect.any(String),
deleted_document_count: 0,
error: null,
indexed_document_count: 0,
indexed_document_volume: 0,
job_type: SyncJobType.INCREMENTAL,
last_seen: null,
metadata: {},
started_at: null,
status: SyncStatus.PENDING,
total_document_count: null,
trigger_method: TriggerMethod.ON_DEMAND,
worker_hostname: null,
},
index: CURRENT_CONNECTORS_JOB_INDEX,
).resolves.toEqual({ id: 'fakeId' });
expect(startConnectorSync).toHaveBeenCalledWith(mockClient.asCurrentUser, {
connectorId: 'connectorId',
jobType: 'incremental',
});
});
it('should start an access control sync', async () => {
mockClient.asCurrentUser.get.mockImplementation(() => {
return Promise.resolve({
_id: 'connectorId',
_source: {
api_key_id: null,
configuration: {
use_document_level_security: {
value: true,
},
},
created_at: null,
custom_scheduling: {},
error: null,
index_name: 'search-index_name',
language: null,
last_access_control_sync_status: null,
last_seen: null,
last_sync_error: null,
last_sync_scheduled_at: null,
last_sync_status: null,
last_synced: null,
scheduling: { enabled: true, interval: '1 2 3 4 5' },
service_type: null,
status: 'not connected',
sync_now: false,
(fetchConnectorById as jest.Mock).mockResolvedValue({
api_key_id: null,
configuration: {
use_document_level_security: {
value: true,
},
index: CONNECTORS_INDEX,
});
},
created_at: null,
custom_scheduling: {},
error: null,
id: 'connectorId',
index_name: 'index_name',
language: null,
last_access_control_sync_error: null,
last_access_control_sync_scheduled_at: null,
last_access_control_sync_status: null,
last_seen: null,
last_sync_error: null,
last_sync_scheduled_at: null,
last_sync_status: null,
last_synced: null,
scheduling: { enabled: true, interval: '1 2 3 4 5' },
service_type: null,
status: 'not connected',
sync_now: false,
});
mockClient.asCurrentUser.index.mockImplementation(() => ({ _id: 'fakeId' }));
(startConnectorSync as jest.Mock).mockResolvedValue({ id: 'fakeId' });
await expect(
startSync(
@ -331,40 +206,11 @@ describe('startSync lib function', () => {
'connectorId',
SyncJobType.ACCESS_CONTROL
)
).resolves.toEqual({ _id: 'fakeId' });
expect(mockClient.asCurrentUser.index).toHaveBeenCalledWith({
document: {
cancelation_requested_at: null,
canceled_at: null,
completed_at: null,
connector: {
configuration: {
use_document_level_security: {
value: true,
},
},
filtering: null,
id: 'connectorId',
index_name: `${CONNECTORS_ACCESS_CONTROL_INDEX_PREFIX}search-index_name`,
language: null,
pipeline: null,
service_type: null,
},
created_at: expect.any(String),
deleted_document_count: 0,
error: null,
indexed_document_count: 0,
indexed_document_volume: 0,
job_type: SyncJobType.ACCESS_CONTROL,
last_seen: null,
metadata: {},
started_at: null,
status: SyncStatus.PENDING,
total_document_count: null,
trigger_method: TriggerMethod.ON_DEMAND,
worker_hostname: null,
},
index: CURRENT_CONNECTORS_JOB_INDEX,
).resolves.toEqual({ id: 'fakeId' });
expect(startConnectorSync).toHaveBeenCalledWith(mockClient.asCurrentUser, {
connectorId: 'connectorId',
jobType: 'access_control',
});
});
});

View file

@ -9,18 +9,15 @@ import { IScopedClusterClient } from '@kbn/core/server';
import {
ConnectorConfiguration,
ConnectorDocument,
SyncJobType,
CONNECTORS_INDEX,
startConnectorSync,
fetchConnectorById,
} from '@kbn/search-connectors';
import { isConfigEntry } from '../../../common/connectors/is_category_entry';
import {
CONNECTORS_ACCESS_CONTROL_INDEX_PREFIX,
ENTERPRISE_SEARCH_CONNECTOR_CRAWLER_SERVICE_TYPE,
} from '../../../common/constants';
import { ENTERPRISE_SEARCH_CONNECTOR_CRAWLER_SERVICE_TYPE } from '../../../common/constants';
import { ErrorCode } from '../../../common/types/error_codes';
@ -30,11 +27,8 @@ export const startSync = async (
jobType: SyncJobType,
nextSyncConfig?: string // only processed for elastic-crawler service types
) => {
const connectorResult = await client.asCurrentUser.get<ConnectorDocument>({
id: connectorId,
index: CONNECTORS_INDEX,
});
const connector = connectorResult._source;
const connector = await fetchConnectorById(client.asCurrentUser, connectorId);
if (connector) {
const config = Object.entries(connector.configuration).reduce((acc, [key, configEntry]) => {
if (isConfigEntry(configEntry)) {
@ -48,7 +42,7 @@ export const startSync = async (
nextSyncConfig: { label: 'nextSyncConfig', value: nextSyncConfig },
}
: config;
const { index_name } = connector;
if (
jobType === SyncJobType.ACCESS_CONTROL &&
!configuration.use_document_level_security?.value
@ -57,27 +51,20 @@ export const startSync = async (
}
if (connector.service_type === ENTERPRISE_SEARCH_CONNECTOR_CRAWLER_SERVICE_TYPE) {
// Crawler-specific actions are not migrated to Connector API
return await client.asCurrentUser.update({
doc: {
configuration,
sync_now: true,
},
id: connectorId,
if_primary_term: connectorResult._primary_term,
if_seq_no: connectorResult._seq_no,
index: CONNECTORS_INDEX,
});
}
const targetIndexName =
jobType === SyncJobType.ACCESS_CONTROL
? `${CONNECTORS_ACCESS_CONTROL_INDEX_PREFIX}${index_name}`
: index_name ?? undefined;
return await startConnectorSync(client.asCurrentUser, {
connectorId,
jobType,
targetIndexName,
});
} else {
throw new Error(ErrorCode.RESOURCE_NOT_FOUND);