[Enterprise Search] Fix pending syncs logic (#159024)

## Summary

This fixes the way we handle pending syncs, now retrieved by looking at
the actually scheduled sync jobs instead of the no-longer-used
`sync_now` flag.
This commit is contained in:
Sander Philipse 2023-06-05 16:33:07 +02:00 committed by GitHub
parent 8f0beaeb7a
commit bcf431b4d1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 70 additions and 53 deletions

View file

@ -21,6 +21,8 @@ export interface AlwaysShowPattern {
}
export interface ElasticsearchIndex {
count: number; // Elasticsearch _count
has_in_progress_syncs?: boolean; // these default to false if not a connector or crawler
has_pending_syncs?: boolean;
health?: HealthStatus;
hidden: boolean;
name: IndexName;

View file

@ -56,10 +56,10 @@ const DEFAULT_VALUES = {
isSyncing: false,
isWaitingForSync: false,
lastUpdated: null,
localSyncNowValue: false,
pipelineData: undefined,
recheckIndexLoading: false,
syncStatus: null,
syncTriggeredLocally: false,
};
const CONNECTOR_VALUES = {
@ -100,32 +100,7 @@ describe('IndexViewLogic', () => {
it('should update values', () => {
CachedFetchIndexApiLogic.actions.apiSuccess({
...CONNECTOR_VALUES.index,
connector: { ...connectorIndex.connector!, sync_now: true },
});
expect(IndexViewLogic.values.connector).toEqual({
...connectorIndex.connector,
sync_now: true,
});
expect(IndexViewLogic.values.fetchIndexApiData).toEqual({
...CONNECTOR_VALUES.index,
connector: { ...connectorIndex.connector, sync_now: true },
});
expect(IndexViewLogic.values.fetchIndexApiData).toEqual({
...CONNECTOR_VALUES.index,
connector: { ...connectorIndex.connector, sync_now: true },
});
expect(IndexViewLogic.values.index).toEqual({
...CONNECTOR_VALUES.index,
connector: { ...connectorIndex.connector, sync_now: true },
});
expect(IndexViewLogic.values.indexData).toEqual({
...CONNECTOR_VALUES.index,
connector: { ...connectorIndex.connector, sync_now: true },
has_pending_syncs: true,
});
expect(IndexViewLogic.values).toEqual(
@ -136,7 +111,6 @@ describe('IndexViewLogic', () => {
isConnectorIndex: true,
isWaitingForSync: true,
lastUpdated: CONNECTOR_VALUES.lastUpdated,
localSyncNowValue: true,
pipelineData: undefined,
syncStatus: SyncStatus.COMPLETED,
})
@ -200,12 +174,14 @@ describe('IndexViewLogic', () => {
describe('StartSyncApiLogic.apiSuccess', () => {
it('should set localSyncNow to true', async () => {
mount({
localSyncNowValue: false,
});
StartSyncApiLogic.actions.apiSuccess({});
expect(IndexViewLogic.values.localSyncNowValue).toEqual(true);
expect(IndexViewLogic.values).toEqual(
expect.objectContaining({
...DEFAULT_VALUES,
isWaitingForSync: true,
syncTriggeredLocally: true,
})
);
});
});
});

View file

@ -82,11 +82,11 @@ export interface IndexViewValues {
isSyncing: boolean;
isWaitingForSync: boolean;
lastUpdated: string | null;
localSyncNowValue: boolean; // holds local value after update so UI updates correctly
pipelineData: IngestPipelineParams | undefined;
recheckIndexLoading: boolean;
resetFetchIndexLoading: boolean;
syncStatus: SyncStatus | null;
syncTriggeredLocally: boolean; // holds local value after update so UI updates correctly
}
export const IndexViewLogic = kea<MakeLogicType<IndexViewValues, IndexViewActions>>({
@ -160,14 +160,6 @@ export const IndexViewLogic = kea<MakeLogicType<IndexViewValues, IndexViewAction
}),
path: ['enterprise_search', 'content', 'index_view_logic'],
reducers: {
localSyncNowValue: [
false,
{
fetchIndexApiSuccess: (_, index) =>
isConnectorIndex(index) ? index.connector.sync_now : false,
startSyncApiSuccess: () => true,
},
],
recheckIndexLoading: [
false,
{
@ -175,6 +167,13 @@ export const IndexViewLogic = kea<MakeLogicType<IndexViewValues, IndexViewAction
resetRecheckIndexLoading: () => false,
},
],
syncTriggeredLocally: [
false,
{
fetchIndexApiSuccess: () => false,
startSyncApiSuccess: () => true,
},
],
},
selectors: ({ selectors }) => ({
connector: [
@ -246,8 +245,9 @@ export const IndexViewLogic = kea<MakeLogicType<IndexViewValues, IndexViewAction
indexData?.has_in_progress_syncs || syncStatus === SyncStatus.IN_PROGRESS,
],
isWaitingForSync: [
() => [selectors.fetchIndexApiData, selectors.localSyncNowValue],
(data, localSyncNowValue) => data?.connector?.sync_now || localSyncNowValue,
() => [selectors.indexData, selectors.syncTriggeredLocally],
(indexData: FetchIndexApiResponse | null, syncTriggeredLocally: boolean) =>
indexData?.has_pending_syncs || syncTriggeredLocally || false,
],
lastUpdated: [() => [selectors.fetchIndexApiData], (data) => getLastUpdated(data)],
pipelineData: [

View file

@ -9,6 +9,7 @@ import { ByteSizeValue } from '@kbn/config-schema';
import { IScopedClusterClient } from '@kbn/core/server';
import { ENTERPRISE_SEARCH_CONNECTOR_CRAWLER_SERVICE_TYPE } from '../../../common/constants';
import { SyncStatus } from '../../../common/types/connectors';
import { fetchConnectorByIndexName } from '../connectors/fetch_connectors';
import { fetchCrawlerByIndexName } from '../crawler/fetch_crawlers';
@ -32,7 +33,14 @@ describe('fetchIndex lib function', () => {
get: jest.fn(),
stats: jest.fn(),
},
search: jest.fn(),
search: jest.fn().mockReturnValue({
hits: {
hits: [
{ _source: { status: SyncStatus.IN_PROGRESS } },
{ _source: { status: SyncStatus.PENDING } },
],
},
}),
},
asInternalUser: {},
};
@ -66,6 +74,7 @@ describe('fetchIndex lib function', () => {
aliases: [],
count: 100,
has_in_progress_syncs: false,
has_pending_syncs: false,
health: 'green',
hidden: false,
name: 'index_name',
@ -100,6 +109,14 @@ describe('fetchIndex lib function', () => {
});
it('should return data and stats for index and connector if connector is present', async () => {
mockClient.asCurrentUser.search.mockReturnValue({
hits: {
hits: [
{ _source: { status: SyncStatus.CANCELED } },
{ _source: { status: SyncStatus.PENDING } },
],
},
});
mockClient.asCurrentUser.indices.get.mockImplementation(() =>
Promise.resolve({
index_name: { aliases: [], data: 'full index' },
@ -118,7 +135,7 @@ describe('fetchIndex lib function', () => {
).resolves.toEqual({
...result,
connector: { doc: 'doc', service_type: 'some-service-type' },
has_in_progress_syncs: true,
has_pending_syncs: true,
});
});
@ -150,6 +167,14 @@ describe('fetchIndex lib function', () => {
it('should return data and stats for index and crawler if a crawler registered as a connector is present', async () => {
mockClient.asCurrentUser.count.mockReturnValue({ count: 0 });
mockClient.asCurrentUser.search.mockReturnValue({
hits: {
hits: [
{ _source: { status: SyncStatus.IN_PROGRESS } },
{ _source: { status: SyncStatus.COMPLETED } },
],
},
});
mockClient.asCurrentUser.indices.get.mockImplementation(() =>
Promise.resolve({
index_name: { aliases: [], data: 'full index' },
@ -175,6 +200,8 @@ describe('fetchIndex lib function', () => {
connector: { doc: 'doc', service_type: ENTERPRISE_SEARCH_CONNECTOR_CRAWLER_SERVICE_TYPE },
count: 0,
crawler: { id: '1234' },
has_in_progress_syncs: true,
has_pending_syncs: false,
});
});

View file

@ -10,7 +10,7 @@ import { IScopedClusterClient } from '@kbn/core/server';
import { CONNECTORS_JOBS_INDEX } from '../..';
import { ENTERPRISE_SEARCH_CONNECTOR_CRAWLER_SERVICE_TYPE } from '../../../common/constants';
import { SyncStatus } from '../../../common/types/connectors';
import { ConnectorSyncJobDocument, SyncStatus } from '../../../common/types/connectors';
import { ElasticsearchIndexWithIngestion } from '../../../common/types/indices';
import { fetchConnectorByIndexName } from '../connectors/fetch_connectors';
import { fetchCrawlerByIndexName } from '../crawler/fetch_crawlers';
@ -20,19 +20,30 @@ import { mapIndexStats } from './utils/map_index_stats';
const hasInProgressSyncs = async (
client: IScopedClusterClient,
connectorId: string
): Promise<boolean> => {
const inProgressCount = await client.asCurrentUser.count({
): Promise<{ inProgress: boolean; pending: boolean }> => {
const syncs = await client.asCurrentUser.search<ConnectorSyncJobDocument>({
index: CONNECTORS_JOBS_INDEX,
query: {
bool: {
filter: [
{ term: { 'connector.id': connectorId } },
{ term: { status: SyncStatus.IN_PROGRESS } },
{
dis_max: {
queries: [
{ term: { status: SyncStatus.IN_PROGRESS } },
{ term: { status: SyncStatus.PENDING } },
],
},
},
],
},
},
});
return inProgressCount.count > 0;
const inProgress = syncs.hits.hits.some(
(sync) => sync._source?.status === SyncStatus.IN_PROGRESS
);
const pending = syncs.hits.hits.some((sync) => sync._source?.status === SyncStatus.PENDING);
return { inProgress, pending };
};
export const fetchIndex = async (
@ -53,12 +64,13 @@ export const fetchIndex = async (
const connector = await fetchConnectorByIndexName(client, index);
const hasInProgressSyncsResult = connector
? await hasInProgressSyncs(client, connector.id)
: false;
: { inProgress: false, pending: false };
const indexResult = {
count,
...mapIndexStats(indexData, indexStats, index),
has_in_progress_syncs: hasInProgressSyncsResult,
has_in_progress_syncs: hasInProgressSyncsResult.inProgress,
has_pending_syncs: hasInProgressSyncsResult.pending,
};
if (connector && connector.service_type !== ENTERPRISE_SEARCH_CONNECTOR_CRAWLER_SERVICE_TYPE) {