[Enterprise Search] Start syncs with new sync jobs (#154082)

## Summary

This changes the way Kibana triggers connector syncs by adding a sync
job instead of setting a flag on the connector document.
This commit is contained in:
Sander Philipse 2023-04-05 18:22:58 +02:00 committed by GitHub
parent aed01474b3
commit 8fa6d5d092
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 200 additions and 41 deletions

View file

@ -174,9 +174,9 @@ export interface ConnectorSyncJob {
filtering: FilteringRules | FilteringRules[] | null;
id: string;
index_name: string;
language: string;
language: string | null;
pipeline: IngestPipelineParams | null;
service_type: string;
service_type: string | null;
};
created_at: string;
deleted_document_count: number;
@ -184,12 +184,12 @@ export interface ConnectorSyncJob {
id: string;
indexed_document_count: number;
indexed_document_volume: number;
last_seen: string;
last_seen: string | null;
metadata: Record<string, unknown>;
started_at: string;
started_at: string | null;
status: SyncStatus;
trigger_method: TriggerMethod;
worker_hostname: string;
worker_hostname: string | null;
}
export type ConnectorSyncJobDocument = Omit<ConnectorSyncJob, 'id'>;

View file

@ -85,9 +85,9 @@ export const SyncJobFlyout: React.FC<SyncJobFlyoutProps> = ({ onClose, syncJob }
canceledAt={syncJob.canceled_at ?? ''}
cancelationRequestedAt={syncJob.cancelation_requested_at ?? ''}
syncRequestedAt={syncJob.created_at}
syncStarted={syncJob.started_at}
syncStarted={syncJob.started_at ?? ''}
completed={syncJob.completed_at ?? ''}
lastUpdated={syncJob.last_seen}
lastUpdated={syncJob.last_seen ?? ''}
triggerMethod={syncJob.trigger_method}
/>
</EuiFlexItem>

View file

@ -7,19 +7,19 @@
import { IScopedClusterClient } from '@kbn/core/server';
import { CONNECTORS_INDEX } from '../..';
import { CONNECTORS_INDEX, CONNECTORS_JOBS_INDEX } from '../..';
import { SyncStatus, TriggerMethod } from '../../../common/types/connectors';
import { ErrorCode } from '../../../common/types/error_codes';
import { startConnectorSync } from './start_sync';
describe('addConnector lib function', () => {
describe('startSync lib function', () => {
const mockClient = {
asCurrentUser: {
get: jest.fn(),
index: jest.fn(),
indices: {
refresh: jest.fn(),
},
update: jest.fn(),
},
asInternalUser: {},
};
@ -31,6 +31,7 @@ describe('addConnector lib function', () => {
it('should start a sync', async () => {
mockClient.asCurrentUser.get.mockImplementationOnce(() => {
return Promise.resolve({
_id: 'connectorId',
_source: {
api_key_id: null,
configuration: {},
@ -38,6 +39,7 @@ describe('addConnector lib function', () => {
custom_scheduling: {},
error: null,
index_name: 'index_name',
language: null,
last_seen: null,
last_sync_error: null,
last_sync_scheduled_at: null,
@ -58,27 +60,93 @@ describe('addConnector lib function', () => {
).resolves.toEqual({ _id: 'fakeId' });
expect(mockClient.asCurrentUser.index).toHaveBeenCalledWith({
document: {
api_key_id: null,
configuration: {},
created_at: null,
custom_scheduling: {},
cancelation_requested_at: null,
canceled_at: null,
completed_at: null,
connector: {
configuration: {},
filtering: null,
id: 'connectorId',
index_name: 'index_name',
language: null,
pipeline: null,
service_type: null,
},
created_at: expect.any(String),
deleted_document_count: 0,
error: null,
index_name: 'index_name',
indexed_document_count: 0,
indexed_document_volume: 0,
last_seen: null,
last_sync_error: null,
last_sync_scheduled_at: null,
last_sync_status: null,
last_synced: null,
scheduling: { enabled: true, interval: '1 2 3 4 5' },
service_type: null,
status: 'not connected',
sync_now: true,
metadata: {},
started_at: null,
status: SyncStatus.PENDING,
trigger_method: TriggerMethod.ON_DEMAND,
worker_hostname: null,
},
id: 'connectorId',
index: CONNECTORS_INDEX,
index: CONNECTORS_JOBS_INDEX,
});
expect(mockClient.asCurrentUser.indices.refresh).toHaveBeenCalledWith({
index: CONNECTORS_INDEX,
});
// Checks that starting a sync for a connector that has a service type, pipeline,
// language and filtering indexes a pending, on-demand sync job document into
// CONNECTORS_JOBS_INDEX, with the passed sync config merged into the configuration.
it('should start a sync with service type, pipeline and nextSyncConfig', async () => {
// Stub the connector lookup: the next `get` call resolves to this connector document.
mockClient.asCurrentUser.get.mockImplementationOnce(() => {
return Promise.resolve({
_source: {
api_key_id: null,
configuration: { config: { label: 'label', value: 'haha' } },
created_at: null,
custom_scheduling: {},
error: null,
filtering: [{ active: 'filtering' }],
index_name: 'index_name',
language: 'nl',
last_seen: null,
last_sync_error: null,
last_sync_status: null,
last_synced: null,
pipeline: { name: 'pipeline' },
scheduling: { enabled: true, interval: '1 2 3 4 5' },
service_type: 'service_type',
status: 'not connected',
sync_now: false,
},
index: CONNECTORS_INDEX,
});
});
// The stubbed `index` response is what startConnectorSync is expected to resolve to.
mockClient.asCurrentUser.index.mockImplementation(() => ({ _id: 'fakeId' }));
await expect(
startConnectorSync(mockClient as unknown as IScopedClusterClient, 'connectorId', 'syncConfig')
).resolves.toEqual({ _id: 'fakeId' });
expect(mockClient.asCurrentUser.index).toHaveBeenCalledWith({
document: {
cancelation_requested_at: null,
canceled_at: null,
completed_at: null,
connector: {
// The existing configuration entry is kept and the 'syncConfig' argument is
// added under the `nextSyncConfig` key.
configuration: {
config: { label: 'label', value: 'haha' },
nextSyncConfig: { label: 'nextSyncConfig', value: 'syncConfig' },
},
// Matches the `active` value of the single filtering entry in the fixture above.
filtering: 'filtering',
id: 'connectorId',
index_name: 'index_name',
language: 'nl',
pipeline: { name: 'pipeline' },
service_type: 'service_type',
},
// Only the type is asserted for the creation timestamp, not its value.
created_at: expect.any(String),
deleted_document_count: 0,
error: null,
indexed_document_count: 0,
indexed_document_volume: 0,
last_seen: null,
metadata: {},
started_at: null,
status: SyncStatus.PENDING,
trigger_method: TriggerMethod.ON_DEMAND,
worker_hostname: null,
},
index: CONNECTORS_JOBS_INDEX,
});
});
@ -91,4 +159,52 @@ describe('addConnector lib function', () => {
).rejects.toEqual(new Error(ErrorCode.RESOURCE_NOT_FOUND));
expect(mockClient.asCurrentUser.index).not.toHaveBeenCalled();
});
// Checks that for a connector whose service_type is 'elastic-crawler', starting a
// sync updates the connector document (sync_now: true) instead of indexing a sync job.
it('should set sync_now for crawler and not index a sync job', async () => {
// Stub the connector lookup: the next `get` call resolves to a crawler connector,
// including the _primary_term and _seq_no values asserted on further down.
mockClient.asCurrentUser.get.mockImplementationOnce(() => {
return Promise.resolve({
_primary_term: 1,
_seq_no: 10,
_source: {
api_key_id: null,
configuration: { config: { label: 'label', value: 'haha' } },
created_at: null,
custom_scheduling: {},
error: null,
filtering: [{ active: 'filtering' }],
index_name: 'index_name',
language: 'nl',
last_seen: null,
last_sync_error: null,
last_sync_status: null,
last_synced: null,
pipeline: { name: 'pipeline' },
scheduling: { enabled: true, interval: '1 2 3 4 5' },
service_type: 'elastic-crawler',
status: 'not connected',
sync_now: false,
},
index: CONNECTORS_INDEX,
});
});
// The stubbed `update` response is what startConnectorSync is expected to resolve to.
mockClient.asCurrentUser.update.mockImplementation(() => ({ _id: 'fakeId' }));
await expect(
startConnectorSync(mockClient as unknown as IScopedClusterClient, 'connectorId', 'syncConfig')
).resolves.toEqual({ _id: 'fakeId' });
// NOTE(review): this relies on the `index` mock having no calls recorded by earlier
// tests; presumably mocks are reset in a beforeEach not visible here — confirm.
expect(mockClient.asCurrentUser.index).not.toHaveBeenCalled();
expect(mockClient.asCurrentUser.update).toHaveBeenCalledWith({
doc: {
// The existing configuration entry is kept and the 'syncConfig' argument is
// added under the `nextSyncConfig` key.
configuration: {
config: { label: 'label', value: 'haha' },
nextSyncConfig: { label: 'nextSyncConfig', value: 'syncConfig' },
},
sync_now: true,
},
id: 'connectorId',
// Same values as the _primary_term / _seq_no of the stubbed `get` response.
if_primary_term: 1,
if_seq_no: 10,
index: CONNECTORS_INDEX,
});
});
});

View file

@ -7,9 +7,16 @@
import { IScopedClusterClient } from '@kbn/core/server';
import { CONNECTORS_INDEX } from '../..';
import { CONNECTORS_INDEX, CONNECTORS_JOBS_INDEX } from '../..';
import { ENTERPRISE_SEARCH_CONNECTOR_CRAWLER_SERVICE_TYPE } from '../../../common/constants';
import { ConnectorDocument } from '../../../common/types/connectors';
import {
ConnectorConfiguration,
ConnectorDocument,
ConnectorSyncJobDocument,
SyncStatus,
TriggerMethod,
} from '../../../common/types/connectors';
import { ErrorCode } from '../../../common/types/error_codes';
export const startConnectorSync = async (
@ -23,21 +30,57 @@ export const startConnectorSync = async (
});
const connector = connectorResult._source;
if (connector) {
if (nextSyncConfig) {
connector.configuration.nextSyncConfig = { label: 'nextSyncConfig', value: nextSyncConfig };
const configuration: ConnectorConfiguration = nextSyncConfig
? {
...connector.configuration,
nextSyncConfig: { label: 'nextSyncConfig', value: nextSyncConfig },
}
: connector.configuration;
const { filtering, index_name, language, pipeline, service_type } = connector;
const now = new Date().toISOString();
if (connector.service_type === ENTERPRISE_SEARCH_CONNECTOR_CRAWLER_SERVICE_TYPE) {
return await client.asCurrentUser.update({
doc: {
configuration,
sync_now: true,
},
id: connectorId,
if_primary_term: connectorResult._primary_term,
if_seq_no: connectorResult._seq_no,
index: CONNECTORS_INDEX,
});
}
const result = await client.asCurrentUser.index<ConnectorDocument>({
return await client.asCurrentUser.index<ConnectorSyncJobDocument>({
document: {
...connector,
sync_now: true,
cancelation_requested_at: null,
canceled_at: null,
completed_at: null,
connector: {
configuration,
filtering: filtering ? filtering[0]?.active ?? null : null,
id: connectorId,
index_name,
language,
pipeline: pipeline ?? null,
service_type,
},
created_at: now,
deleted_document_count: 0,
error: null,
indexed_document_count: 0,
indexed_document_volume: 0,
last_seen: null,
metadata: {},
started_at: null,
status: SyncStatus.PENDING,
trigger_method: TriggerMethod.ON_DEMAND,
worker_hostname: null,
},
id: connectorId,
index: CONNECTORS_INDEX,
index: CONNECTORS_JOBS_INDEX,
});
await client.asCurrentUser.indices.refresh({ index: CONNECTORS_INDEX });
return result;
} else {
throw new Error(ErrorCode.RESOURCE_NOT_FOUND);
}