[8.18] Use search after for finding gaps (#211040) (#213528)

# Backport

This will backport the following commits from `main` to `8.18`:
- [Use search after for finding gaps
(#211040)](https://github.com/elastic/kibana/pull/211040)

<!--- Backport version: 9.6.6 -->

### Questions ?
Please refer to the [Backport tool
documentation](https://github.com/sorenlouv/backport)

<!--BACKPORT [{"author":{"name":"Khristinin
Nikita","email":"nikita.khristinin@elastic.co"},"sourceCommit":{"committedDate":"2025-03-07T09:23:37Z","message":"Use
search after for finding gaps (#211040)\n\n## Use search after for
finding gaps\n\nIssue:
https://github.com/elastic/security-team/issues/11860\n\nTo be able
process more than 10.000 gaps per rule in one update cycle we\nneed to
implement search after loop for all gaps.\n\nFor the API I keep from and
size method, as it's much for client to use.\n\n<img width=\"1250\"
alt=\"Screenshot 2025-02-17 at 15 25
27\"\nsrc=\"https://github.com/user-attachments/assets/806b2245-8aad-4960-84f4-d2a2818a4a12\"\n/>\n\n---------\n\nCo-authored-by:
kibanamachine
<42973632+kibanamachine@users.noreply.github.com>\nCo-authored-by:
Elastic Machine
<elasticmachine@users.noreply.github.com>","sha":"b0ad5424b28f2667f05c7d2ac5560adc4d62af28","branchLabelMapping":{"^v9.1.0$":"main","^v8.19.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["release_note:skip","backport:prev-minor","backport:version","v8.18.0","v9.1.0"],"title":"Use
search after for finding
gaps","number":211040,"url":"https://github.com/elastic/kibana/pull/211040","mergeCommit":{"message":"Use
search after for finding gaps (#211040)\n\n## Use search after for
finding gaps\n\nIssue:
https://github.com/elastic/security-team/issues/11860\n\nTo be able
process more than 10.000 gaps per rule in one update cycle we\nneed to
implement search after loop for all gaps.\n\nFor the API I keep from and
size method, as it's much for client to use.\n\n<img width=\"1250\"
alt=\"Screenshot 2025-02-17 at 15 25
27\"\nsrc=\"https://github.com/user-attachments/assets/806b2245-8aad-4960-84f4-d2a2818a4a12\"\n/>\n\n---------\n\nCo-authored-by:
kibanamachine
<42973632+kibanamachine@users.noreply.github.com>\nCo-authored-by:
Elastic Machine
<elasticmachine@users.noreply.github.com>","sha":"b0ad5424b28f2667f05c7d2ac5560adc4d62af28"}},"sourceBranch":"main","suggestedTargetBranches":["8.18"],"targetPullRequestStates":[{"branch":"8.18","label":"v8.18.0","branchLabelMappingKey":"^v(\\d+).(\\d+).\\d+$","isSourceBranch":false,"state":"NOT_CREATED"},{"branch":"main","label":"v9.1.0","branchLabelMappingKey":"^v9.1.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/211040","number":211040,"mergeCommit":{"message":"Use
search after for finding gaps (#211040)\n\n## Use search after for
finding gaps\n\nIssue:
https://github.com/elastic/security-team/issues/11860\n\nTo be able
process more than 10.000 gaps per rule in one update cycle we\nneed to
implement search after loop for all gaps.\n\nFor the API I keep from and
size method, as it's much for client to use.\n\n<img width=\"1250\"
alt=\"Screenshot 2025-02-17 at 15 25
27\"\nsrc=\"https://github.com/user-attachments/assets/806b2245-8aad-4960-84f4-d2a2818a4a12\"\n/>\n\n---------\n\nCo-authored-by:
kibanamachine
<42973632+kibanamachine@users.noreply.github.com>\nCo-authored-by:
Elastic Machine
<elasticmachine@users.noreply.github.com>","sha":"b0ad5424b28f2667f05c7d2ac5560adc4d62af28"}}]}]
BACKPORT-->

---------

Co-authored-by: Khristinin Nikita <nikita.khristinin@elastic.co>
Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
Co-authored-by: Nikita Khristinin <nkhristinin@gmail.com>
This commit is contained in:
Kibana Machine 2025-03-11 23:59:35 +11:00 committed by GitHub
parent 194dd334fb
commit 92d3a3fa76
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 722 additions and 72 deletions

View file

@ -5,7 +5,7 @@
* 2.0.
*/
import { findGaps } from './find_gaps';
import { findGaps, findGapsSearchAfter } from './find_gaps';
import { gapStatus } from '../../../common/constants/gap_status';
import { loggerMock } from '@kbn/logging-mocks';
import { eventLogClientMock } from '@kbn/event-log-plugin/server/event_log_client.mock';
@ -155,3 +155,158 @@ describe('findGaps', () => {
);
});
});
describe('findGapsSearchAfter', () => {
const mockLogger = loggerMock.create();
const mockEventLogClient = eventLogClientMock.create();
beforeEach(() => {
jest.resetAllMocks();
});
it('should call findEventsBySavedObjectIdsSearchAfter with correct parameters', async () => {
mockEventLogClient.findEventsBySavedObjectIdsSearchAfter.mockResolvedValue({
total: 0,
data: [],
search_after: undefined,
pit_id: 'test-pit-id',
});
await findGapsSearchAfter({
eventLogClient: mockEventLogClient,
logger: mockLogger,
params: {
ruleId: 'test-rule',
start: '2024-01-01',
end: '2024-01-02',
perPage: 10,
statuses: [gapStatus.UNFILLED],
},
});
expect(mockEventLogClient.findEventsBySavedObjectIdsSearchAfter).toHaveBeenCalledWith(
'alert',
['test-rule'],
expect.objectContaining({
filter: expect.stringContaining('event.action: gap'),
sort: [{ sort_field: '@timestamp', sort_order: 'desc' }],
per_page: 10,
})
);
});
it('should use provided PIT and search_after values', async () => {
mockEventLogClient.findEventsBySavedObjectIdsSearchAfter.mockResolvedValue({
total: 0,
data: [],
search_after: ['2024-01-02'],
pit_id: 'test-pit-id',
});
await findGapsSearchAfter({
eventLogClient: mockEventLogClient,
logger: mockLogger,
params: {
ruleId: 'test-rule',
start: '2024-01-01',
end: '2024-01-02',
perPage: 10,
pitId: 'existing-pit-id',
searchAfter: ['2024-01-01'],
},
});
expect(mockEventLogClient.findEventsBySavedObjectIdsSearchAfter).toHaveBeenCalledWith(
'alert',
['test-rule'],
expect.objectContaining({
pit_id: 'existing-pit-id',
search_after: ['2024-01-01'],
})
);
});
it('should transform response data to Gap objects', async () => {
const mockResponse = {
total: 1,
data: [createMockGapEvent()],
search_after: ['2024-01-02'],
pit_id: 'test-pit-id',
};
mockEventLogClient.findEventsBySavedObjectIdsSearchAfter.mockResolvedValue(mockResponse);
const result = await findGapsSearchAfter({
eventLogClient: mockEventLogClient,
logger: mockLogger,
params: {
ruleId: 'test-rule',
perPage: 10,
start: '2024-01-01',
end: '2024-01-02',
},
});
expect(result.data[0]).toBeInstanceOf(Gap);
expect(result.data[0].range).toEqual({
gte: new Date('2024-01-01'),
lte: new Date('2024-01-02'),
});
expect(result.data[0].filledIntervals).toEqual([]);
expect(result.data[0].inProgressIntervals).toEqual([]);
expect(result.searchAfter).toEqual(['2024-01-02']);
expect(result.pitId).toBe('test-pit-id');
});
it('should handle custom sort field', async () => {
mockEventLogClient.findEventsBySavedObjectIdsSearchAfter.mockResolvedValue({
total: 0,
data: [],
search_after: undefined,
pit_id: 'test-pit-id',
});
await findGapsSearchAfter({
eventLogClient: mockEventLogClient,
logger: mockLogger,
params: {
ruleId: 'test-rule',
perPage: 10,
sortField: 'kibana.alert.rule.gap.total_gap_duration_ms',
sortOrder: 'asc',
start: '2024-01-01',
end: '2024-01-02',
},
});
expect(mockEventLogClient.findEventsBySavedObjectIdsSearchAfter).toHaveBeenCalledWith(
'alert',
['test-rule'],
expect.objectContaining({
sort: [{ sort_field: 'kibana.alert.rule.gap.total_gap_duration_ms', sort_order: 'asc' }],
})
);
});
it('should handle errors and log them', async () => {
const error = new Error('Test error');
mockEventLogClient.findEventsBySavedObjectIdsSearchAfter.mockRejectedValue(error);
await expect(
findGapsSearchAfter({
eventLogClient: mockEventLogClient,
logger: mockLogger,
params: {
ruleId: 'test-rule',
perPage: 10,
start: '2024-01-01',
end: '2024-01-02',
},
})
).rejects.toThrow(error);
expect(mockLogger.error).toHaveBeenCalledWith(
expect.stringContaining('Failed to find gaps with search after for rule test-rule')
);
});
});

View file

@ -7,12 +7,12 @@
import { IEventLogClient } from '@kbn/event-log-plugin/server';
import { Logger } from '@kbn/core/server';
import { SortResults } from '@elastic/elasticsearch/lib/api/types';
import { RULE_SAVED_OBJECT_TYPE } from '../../saved_objects';
import { FindGapsParams } from './types';
import { FindGapsParams, FindGapsSearchAfterParams } from './types';
import { Gap } from './gap';
import { transformToGap } from './transforms/transform_to_gap';
import { buildGapsFilter } from './build_gaps_filter';
export const findGaps = async ({
eventLogClient,
logger,
@ -59,3 +59,53 @@ export const findGaps = async ({
throw err;
}
};
/**
* This function is used to find gaps using search after.
* It's used when to be able process more than 10,000 gaps with stable sorting.
*/
export const findGapsSearchAfter = async ({
eventLogClient,
logger,
params,
}: {
eventLogClient: IEventLogClient;
logger: Logger;
params: FindGapsSearchAfterParams;
}): Promise<{
total: number;
data: Gap[];
searchAfter?: SortResults[];
pitId?: string;
}> => {
const { ruleId, start, end, perPage, statuses, sortField, sortOrder } = params;
try {
const filter = buildGapsFilter({ start, end, statuses });
const gapsResponse = await eventLogClient.findEventsBySavedObjectIdsSearchAfter(
RULE_SAVED_OBJECT_TYPE,
[ruleId],
{
filter,
sort: [
{
sort_field: sortField ?? '@timestamp',
sort_order: sortOrder ?? 'desc',
},
],
per_page: perPage,
pit_id: params?.pitId,
search_after: params?.searchAfter,
}
);
return {
total: gapsResponse.total,
data: transformToGap(gapsResponse),
searchAfter: gapsResponse.search_after,
pitId: gapsResponse.pit_id,
};
} catch (err) {
logger.error(`Failed to find gaps with search after for rule ${ruleId}: ${err.message}`);
throw err;
}
};

View file

@ -33,10 +33,9 @@ export const gapBaseSchema = schema.object({
in_progress_duration_ms: schema.number(),
});
export const findGapsParamsSchema = schema.object(
const findGapsBaseParamsSchema = schema.object(
{
end: schema.string(),
page: schema.number({ defaultValue: 1, min: 1 }),
perPage: schema.number({ defaultValue: 10, min: 0 }),
ruleId: schema.string(),
start: schema.string(),
@ -70,9 +69,20 @@ export const findGapsParamsSchema = schema.object(
}
);
export const findGapsParamsSchema = findGapsBaseParamsSchema.extends({
page: schema.number({ defaultValue: 1, min: 1 }),
});
export const findGapsByIdParamsSchema = schema.object({
gapIds: schema.arrayOf(schema.string()),
ruleId: schema.string(),
page: schema.number({ defaultValue: 1, min: 1 }),
perPage: schema.number({ defaultValue: 10, min: 1 }),
});
export const findGapsSearchAfterParamsSchema = findGapsBaseParamsSchema.extends({
pitId: schema.maybe(schema.string()),
searchAfter: schema.maybe(
schema.arrayOf(schema.oneOf([schema.string(), schema.number(), schema.boolean(), schema.any()]))
),
});

View file

@ -6,10 +6,16 @@
*/
import { TypeOf } from '@kbn/config-schema';
import { gapBaseSchema, findGapsParamsSchema, findGapsByIdParamsSchema } from '../schemas';
import {
gapBaseSchema,
findGapsParamsSchema,
findGapsByIdParamsSchema,
findGapsSearchAfterParamsSchema,
} from '../schemas';
export type GapBase = TypeOf<typeof gapBaseSchema>;
export type FindGapsParams = TypeOf<typeof findGapsParamsSchema>;
export type FindGapsSearchAfterParams = TypeOf<typeof findGapsSearchAfterParamsSchema>;
export type FindGapsByIdParams = TypeOf<typeof findGapsByIdParamsSchema>;
export interface Interval {

View file

@ -6,7 +6,7 @@
*/
import { updateGaps } from './update_gaps';
import { findGaps } from '../find_gaps';
import { findGapsSearchAfter } from '../find_gaps';
import { mgetGaps } from '../mget_gaps';
import { updateGapFromSchedule } from './update_gap_from_schedule';
import { calculateGapStateFromAllBackfills } from './calculate_gaps_state';
@ -49,14 +49,23 @@ describe('updateGaps', () => {
beforeEach(() => {
jest.resetAllMocks();
(findGaps as jest.Mock).mockResolvedValue({ data: [], total: 0 });
(findGapsSearchAfter as jest.Mock).mockResolvedValue({
data: [],
total: 0,
pitId: 'test-pit-id',
searchAfter: undefined,
});
(mgetGaps as jest.Mock).mockResolvedValue([]);
});
describe('updateGaps', () => {
it('should orchestrate the gap update process', async () => {
const testGap = createTestGap();
(findGaps as jest.Mock).mockResolvedValue({ data: [testGap], total: 1 });
(findGapsSearchAfter as jest.Mock).mockResolvedValue({
data: [testGap],
pitId: 'test-pit-id',
searchAfter: undefined,
});
await updateGaps({
ruleId: 'test-rule-id',
@ -70,22 +79,26 @@ describe('updateGaps', () => {
actionsClient: mockActionsClient,
});
expect(findGaps).toHaveBeenCalledWith({
expect(findGapsSearchAfter).toHaveBeenCalledWith({
eventLogClient: mockEventLogClient,
logger: mockLogger,
params: {
ruleId: 'test-rule-id',
start: '2024-01-01T00:00:00.000Z',
end: '2024-01-01T01:00:00.000Z',
page: 1,
perPage: 500,
statuses: ['partially_filled', 'unfilled'],
sortField: '@timestamp',
sortOrder: 'asc',
searchAfter: undefined,
pitId: undefined,
},
});
expect(mockEventLogger.updateEvents).toHaveBeenCalled();
expect(mockEventLogClient.closePointInTime).toHaveBeenCalledWith('test-pit-id');
});
it('should handle pagination', async () => {
it('should handle pagination with search_after', async () => {
const gaps = [
createTestGap(),
new Gap({
@ -102,13 +115,21 @@ describe('updateGaps', () => {
}),
];
// Mock first page with perPage items to trigger second page fetch
// Mock first page with search_after
const firstPageGaps = Array(500).fill(gaps[0]);
const secondPageGaps = [gaps[1]];
(findGaps as jest.Mock)
.mockResolvedValueOnce({ data: firstPageGaps, total: 501 })
.mockResolvedValueOnce({ data: secondPageGaps, total: 501 });
(findGapsSearchAfter as jest.Mock)
.mockResolvedValueOnce({
data: firstPageGaps,
pitId: 'test-pit-id',
searchAfter: ['2024-01-01T01:00:00.000Z'],
})
.mockResolvedValueOnce({
data: secondPageGaps,
pitId: 'test-pit-id',
searchAfter: undefined,
});
await updateGaps({
ruleId: 'test-rule-id',
@ -122,26 +143,33 @@ describe('updateGaps', () => {
actionsClient: mockActionsClient,
});
expect(findGaps).toHaveBeenCalledTimes(2);
expect(findGaps).toHaveBeenNthCalledWith(
expect(findGapsSearchAfter).toHaveBeenCalledTimes(2);
expect(findGapsSearchAfter).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
params: expect.objectContaining({ page: 1 }),
params: expect.objectContaining({
searchAfter: undefined,
pitId: undefined,
}),
})
);
expect(findGaps).toHaveBeenNthCalledWith(
expect(findGapsSearchAfter).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
params: expect.objectContaining({ page: 2 }),
params: expect.objectContaining({
searchAfter: ['2024-01-01T01:00:00.000Z'],
pitId: 'test-pit-id',
}),
})
);
expect(mockEventLogger.updateEvents).toHaveBeenCalledTimes(2);
expect(mockEventLogClient.closePointInTime).toHaveBeenCalledWith('test-pit-id');
});
});
describe('error handling', () => {
it('should handle findGaps errors', async () => {
(findGaps as jest.Mock).mockRejectedValue(new Error('Find gaps failed'));
(findGapsSearchAfter as jest.Mock).mockRejectedValue(new Error('Find gaps failed'));
await updateGaps({
ruleId: 'test-rule-id',
@ -163,7 +191,12 @@ describe('updateGaps', () => {
it('should retry on conflict errors and refetch gap', async () => {
const testGap = createTestGap();
const updatedGap = createTestGap();
(findGaps as jest.Mock).mockResolvedValue({ data: [testGap], total: 1 });
(findGapsSearchAfter as jest.Mock).mockResolvedValue({
data: [testGap],
total: 1,
pitId: 'test-pit-id',
searchAfter: undefined,
});
(mgetGaps as jest.Mock).mockResolvedValue([updatedGap]);
if (!testGap.internalFields?._id) {
@ -210,7 +243,12 @@ describe('updateGaps', () => {
it('should stop retrying after max attempts', async () => {
const testGap = createTestGap();
const updatedGap = createTestGap();
(findGaps as jest.Mock).mockResolvedValue({ data: [testGap], total: 1 });
(findGapsSearchAfter as jest.Mock).mockResolvedValue({
data: [testGap],
total: 1,
pitId: 'test-pit-id',
searchAfter: undefined,
});
(mgetGaps as jest.Mock).mockResolvedValue([updatedGap]);
if (!testGap.internalFields?._id) {
@ -253,7 +291,12 @@ describe('updateGaps', () => {
it('should handle direct schedule updates', async () => {
const testGap = createTestGap();
(findGaps as jest.Mock).mockResolvedValue({ data: [testGap], total: 1 });
(findGapsSearchAfter as jest.Mock).mockResolvedValue({
data: [testGap],
total: 1,
pitId: 'test-pit-id',
searchAfter: undefined,
});
const backfillSchedule = [
{
@ -285,7 +328,12 @@ describe('updateGaps', () => {
it('should trigger refetch when shouldRefetchAllBackfills is true', async () => {
const testGap = createTestGap();
(findGaps as jest.Mock).mockResolvedValue({ data: [testGap], total: 1 });
(findGapsSearchAfter as jest.Mock).mockResolvedValue({
data: [testGap],
total: 1,
pitId: 'test-pit-id',
searchAfter: undefined,
});
await updateGaps({
ruleId: 'test-rule-id',

View file

@ -8,9 +8,10 @@
import { Logger, ISavedObjectsRepository } from '@kbn/core/server';
import { IEventLogClient, IEventLogger } from '@kbn/event-log-plugin/server';
import { ActionsClient } from '@kbn/actions-plugin/server';
import { SortResults } from '@elastic/elasticsearch/lib/api/types';
import { BackfillClient } from '../../../backfill_client/backfill_client';
import { AlertingEventLogger } from '../../alerting_event_logger/alerting_event_logger';
import { findGaps } from '../find_gaps';
import { findGapsSearchAfter } from '../find_gaps';
import { Gap } from '../gap';
import { gapStatus } from '../../../../common/constants';
import { BackfillSchedule } from '../../../application/backfill/result/types';
@ -199,8 +200,8 @@ const updateGapBatch = async (
/**
* Update gaps for a given rule
* Trying to fetch gaps in batches
* Prepare them for update
* Using search_after pagination to process more than 10,000 gaps with stable sorting
* Prepare gaps for update
* Update them in bulk
* If there are conflicts, retry the failed gaps
*/
@ -225,46 +226,73 @@ export const updateGaps = async (params: UpdateGapsParams) => {
try {
const alertingEventLogger = new AlertingEventLogger(eventLogger);
let currentPage = 1;
let hasErrors = false;
let searchAfter: SortResults[] | undefined;
let pitId: string | undefined;
let iterationCount = 0;
// Circuit breaker to prevent infinite loops
// It should be enough to update 50,000,000 gaps
// 100000 * 500 = 50,000,000 millions gaps
const MAX_ITERATIONS = 100000;
while (true) {
const { data: gaps } = await findGaps({
eventLogClient,
logger,
params: {
ruleId,
start: start.toISOString(),
end: end.toISOString(),
page: currentPage,
perPage: PAGE_SIZE,
statuses: [gapStatus.PARTIALLY_FILLED, gapStatus.UNFILLED],
},
});
try {
while (true) {
if (iterationCount >= MAX_ITERATIONS) {
logger.warn(
`Circuit breaker triggered: Reached maximum number of iterations (${MAX_ITERATIONS}) while updating gaps for rule ${ruleId}`
);
break;
}
iterationCount++;
if (gaps.length > 0) {
const success = await updateGapBatch(gaps, {
backfillSchedule,
savedObjectsRepository,
shouldRefetchAllBackfills,
backfillClient,
actionsClient,
alertingEventLogger,
logger,
ruleId,
const gapsResponse = await findGapsSearchAfter({
eventLogClient,
logger,
params: {
ruleId,
start: start.toISOString(),
end: end.toISOString(),
perPage: PAGE_SIZE,
statuses: [gapStatus.PARTIALLY_FILLED, gapStatus.UNFILLED],
sortField: '@timestamp',
sortOrder: 'asc',
searchAfter,
pitId,
},
});
if (!success) {
hasErrors = true;
const { data: gaps, searchAfter: nextSearchAfter, pitId: nextPitId } = gapsResponse;
pitId = nextPitId;
if (gaps.length > 0) {
const success = await updateGapBatch(gaps, {
backfillSchedule,
savedObjectsRepository,
shouldRefetchAllBackfills,
backfillClient,
actionsClient,
alertingEventLogger,
logger,
ruleId,
eventLogClient,
});
if (!success) {
hasErrors = true;
}
}
}
if (gaps.length === 0 || gaps.length < PAGE_SIZE) {
break;
}
// Exit conditions: no more results or no next search_after
if (gaps.length === 0 || !nextSearchAfter) {
break;
}
currentPage++;
searchAfter = nextSearchAfter;
}
} finally {
if (pitId) {
await eventLogClient.closePointInTime(pitId);
}
}
if (hasErrors) {

View file

@ -30,6 +30,8 @@ const createClusterClientMock = () => {
shutdown: jest.fn(),
updateDocuments: jest.fn(),
queryEventsByDocumentIds: jest.fn(),
queryEventsBySavedObjectsSearchAfter: jest.fn(),
closePointInTime: jest.fn(),
refreshIndex: jest.fn(),
};
return mock;

View file

@ -2595,6 +2595,167 @@ describe('queryEventsByDocumentIds', () => {
});
});
describe('queryEventsBySavedObjectsSearchAfter', () => {
const defaultQuery = {
index: 'test-index',
namespace: undefined,
type: 'test-type',
ids: ['test-id'],
findOptions: {
per_page: 10,
sort: [{ sort_field: '@timestamp', sort_order: 'desc' }],
},
};
beforeEach(() => {
clusterClient.openPointInTime.mockResponse({
id: 'test-pit-id',
_shards: {
total: 1,
successful: 1,
failed: 0,
skipped: 0,
},
});
clusterClient.search.mockResponse({
hits: {
hits: [
{
_id: 'hit1',
_index: 'test-index',
_seq_no: 1,
_primary_term: 1,
_source: { field: 'value1' },
sort: ['2021-01-01T00:00:00.000Z'],
},
{
_id: 'hit2',
_index: 'test-index',
_seq_no: 2,
_primary_term: 1,
_source: { field: 'value2' },
sort: ['2021-01-01T00:00:01.000Z'],
},
],
total: { value: 100, relation: 'eq' },
},
took: 0,
timed_out: false,
_shards: {
failed: 0,
successful: 0,
total: 0,
skipped: 0,
},
});
});
test('should use seq_no_primary_term in query', async () => {
await clusterClientAdapter.queryEventsBySavedObjectsSearchAfter(defaultQuery);
expect(clusterClient.search).toHaveBeenCalledWith(
expect.objectContaining({
seq_no_primary_term: true,
})
);
});
test('should create new PIT when not provided', async () => {
await clusterClientAdapter.queryEventsBySavedObjectsSearchAfter(defaultQuery);
expect(clusterClient.openPointInTime).toHaveBeenCalledWith({
index: 'test-index',
keep_alive: '1m',
});
});
test('should not create new PIT when one is provided', async () => {
await clusterClientAdapter.queryEventsBySavedObjectsSearchAfter({
...defaultQuery,
findOptions: {
...defaultQuery.findOptions,
pit_id: 'existing-pit-id',
},
});
expect(clusterClient.openPointInTime).not.toHaveBeenCalled();
});
test('should use provided search_after in query', async () => {
await clusterClientAdapter.queryEventsBySavedObjectsSearchAfter({
...defaultQuery,
findOptions: {
...defaultQuery.findOptions,
search_after: ['2021-01-01T00:00:00.000Z'],
},
});
expect(clusterClient.search).toHaveBeenCalledWith(
expect.objectContaining({
body: expect.objectContaining({
search_after: ['2021-01-01T00:00:00.000Z'],
}),
})
);
});
test('should return properly formatted response', async () => {
const response = await clusterClientAdapter.queryEventsBySavedObjectsSearchAfter(defaultQuery);
expect(response).toEqual({
data: [
{
field: 'value1',
_id: 'hit1',
_index: 'test-index',
_seq_no: 1,
_primary_term: 1,
},
{
field: 'value2',
_id: 'hit2',
_index: 'test-index',
_seq_no: 2,
_primary_term: 1,
},
],
total: 100,
search_after: ['2021-01-01T00:00:01.000Z'],
pit_id: 'test-pit-id',
});
});
test('should clean up PIT on error', async () => {
clusterClient.search.mockRejectedValue(new Error('Search failed'));
await expect(
clusterClientAdapter.queryEventsBySavedObjectsSearchAfter(defaultQuery)
).rejects.toThrow('Search failed');
expect(clusterClient.closePointInTime).toHaveBeenCalledWith({ id: 'test-pit-id' });
});
});
describe('closePointInTime', () => {
test('should not call cluster when pitId is empty', async () => {
await clusterClientAdapter.closePointInTime('');
expect(clusterClient.closePointInTime).not.toHaveBeenCalled();
});
test('should call cluster with given pitId', async () => {
await clusterClientAdapter.closePointInTime('test-pit-id');
expect(clusterClient.closePointInTime).toHaveBeenCalledWith({ id: 'test-pit-id' });
});
test('should throw and log error when cluster call fails', async () => {
const error = new Error('Failed to close PIT');
clusterClient.closePointInTime.mockRejectedValue(error);
await expect(clusterClientAdapter.closePointInTime('test-pit-id')).rejects.toThrow(error);
expect(logger.error).toHaveBeenCalledWith('Failed to close point in time: Failed to close PIT');
});
});
describe('refreshIndex', () => {
test('should successfully refresh index', async () => {
clusterClient.indices.refresh.mockResolvedValue({});

View file

@ -15,7 +15,12 @@ import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { fromKueryExpression, toElasticsearchQuery, KueryNode, nodeBuilder } from '@kbn/es-query';
import { BulkResponse, long } from '@elastic/elasticsearch/lib/api/types';
import { IEvent, IValidatedEvent, SAVED_OBJECT_REL_PRIMARY } from '../types';
import { AggregateOptionsType, FindOptionsType, QueryOptionsType } from '../event_log_client';
import {
AggregateOptionsType,
FindOptionsType,
QueryOptionsType,
FindOptionsSearchAfterType,
} from '../event_log_client';
import { ParsedIndexAlias } from './init';
import { EsNames } from './names';
@ -93,6 +98,10 @@ export type FindEventsOptionsBySavedObjectFilter = QueryOptionsEventsBySavedObje
findOptions: FindOptionsType;
};
export type FindEventsOptionsSearchAfter = QueryOptionsEventsBySavedObjectFilter & {
findOptions: FindOptionsSearchAfterType;
};
export type AggregateEventsOptionsBySavedObjectFilter = QueryOptionsEventsBySavedObjectFilter & {
aggregateOptions: AggregateOptionsType;
};
@ -114,6 +123,13 @@ type AliasAny = any;
const LEGACY_ID_CUTOFF_VERSION = '8.0.0';
export interface QueryEventsBySavedObjectSearchAfterResult {
data: IValidatedEventInternalDocInfo[];
total: number;
search_after?: estypes.SortResults;
pit_id?: string;
}
export class ClusterClientAdapter<
TDoc extends {
body: AliasAny;
@ -677,6 +693,100 @@ export class ClusterClientAdapter<
throw err;
}
}
public async queryEventsBySavedObjectsSearchAfter(
queryOptions: FindEventsOptionsSearchAfter
): Promise<QueryEventsBySavedObjectSearchAfterResult> {
const { index, type, ids, findOptions } = queryOptions;
const {
per_page: perPage,
sort,
pit_id: existingPitId,
search_after: searchAfter,
} = findOptions;
const esClient = await this.elasticsearchClientPromise;
let pitId = existingPitId;
// Create new PIT if not provided
if (!pitId) {
const pitResponse = await esClient.openPointInTime({
index,
keep_alive: '1m',
});
pitId = pitResponse.id;
}
const query = getQueryBody(
this.logger,
queryOptions,
pick(queryOptions.findOptions, ['start', 'end', 'filter'])
);
const body: estypes.SearchRequest['body'] = {
size: perPage,
query,
pit: {
id: pitId,
keep_alive: '1m',
},
...(sort
? { sort: sort.map((s) => ({ [s.sort_field]: { order: s.sort_order } })) as estypes.Sort }
: { sort: [{ '@timestamp': { order: 'desc' } }, { _id: { order: 'desc' } }] }), // default sort
...(searchAfter ? { search_after: searchAfter } : {}),
};
try {
const {
hits: { hits, total },
} = await esClient.search<IValidatedEventInternalDocInfo>({
body,
track_total_hits: true,
seq_no_primary_term: true,
});
// Get the sort values from the last hit to use as search_after for next page
const lastHit = hits[hits.length - 1];
const nextSearchAfter = lastHit?.sort;
return {
data: hits.map((hit) => ({
...hit._source,
_id: hit._id!,
_index: hit._index,
_seq_no: hit._seq_no!,
_primary_term: hit._primary_term!,
})),
total: isNumber(total) ? total : total!.value,
search_after: nextSearchAfter,
pit_id: pitId,
};
} catch (err) {
try {
if (pitId) {
await esClient.closePointInTime({ id: pitId });
}
} catch (closeErr) {
this.logger.error(`Failed to close point in time: ${closeErr.message}`);
}
throw new Error(
`querying for Event Log by for type "${type}" and ids "${ids}" failed with: ${err.message}`
);
}
}
public async closePointInTime(pitId: string): Promise<void> {
if (!pitId) return;
try {
const esClient = await this.elasticsearchClientPromise;
await esClient.closePointInTime({ id: pitId });
} catch (err) {
this.logger.error(`Failed to close point in time: ${err.message}`);
throw err;
}
}
}
export function getQueryBodyWithAuthFilter(
@ -845,7 +955,10 @@ function getNamespaceQuery(namespace?: string) {
export function getQueryBody(
logger: Logger,
opts: FindEventsOptionsBySavedObjectFilter | AggregateEventsOptionsBySavedObjectFilter,
opts:
| FindEventsOptionsBySavedObjectFilter
| AggregateEventsOptionsBySavedObjectFilter
| FindEventsOptionsSearchAfter,
queryOptions: QueryOptionsType
) {
const { namespace, type, ids, legacyIds } = opts;

View file

@ -14,6 +14,8 @@ const createEventLogClientMock = () => {
aggregateEventsBySavedObjectIds: jest.fn(),
aggregateEventsWithAuthFilter: jest.fn(),
findEventsByDocumentIds: jest.fn(),
findEventsBySavedObjectIdsSearchAfter: jest.fn(),
closePointInTime: jest.fn(),
refreshIndex: jest.fn(),
};
return mock;

View file

@ -15,8 +15,12 @@ import { SpacesServiceStart } from '@kbn/spaces-plugin/server';
import { KueryNode } from '@kbn/es-query';
import { EsContext } from './es';
import { IEventLogClient } from './types';
import { QueryEventsBySavedObjectResult } from './es/cluster_client_adapter';
import {
QueryEventsBySavedObjectResult,
QueryEventsBySavedObjectSearchAfterResult,
} from './es/cluster_client_adapter';
import { SavedObjectBulkGetterResult } from './saved_object_provider_registry';
export type PluginClusterClient = Pick<IClusterClient, 'asInternalUser'>;
export type AdminClusterClient$ = Observable<PluginClusterClient>;
@ -57,6 +61,20 @@ export const queryOptionsSchema = schema.object({
filter: schema.maybe(schema.string()),
});
export const queryOptionsSearchAfterSchema = schema.object({
per_page: schema.number({ defaultValue: 10, min: 0 }),
pit_id: schema.maybe(schema.string()),
search_after: schema.maybe(
schema.arrayOf(schema.oneOf([schema.string(), schema.number(), schema.boolean(), schema.any()]))
),
start: optionalDateFieldSchema,
end: optionalDateFieldSchema,
sort: schema.arrayOf(sortSchema, {
defaultValue: [{ sort_field: '@timestamp', sort_order: 'asc' }],
}),
filter: schema.maybe(schema.string()),
});
export type QueryOptionsType = Pick<TypeOf<typeof queryOptionsSchema>, 'start' | 'end' | 'filter'>;
// page & perPage are required, other fields are optional
@ -72,6 +90,11 @@ export type AggregateOptionsType = Pick<TypeOf<typeof queryOptionsSchema>, 'filt
aggs: Record<string, estypes.AggregationsAggregationContainer>;
};
export type FindOptionsSearchAfterType = Omit<FindOptionsType, 'page'> & {
pit_id?: string;
search_after?: estypes.SortResults;
};
interface EventLogServiceCtorParams {
esContext: EsContext;
savedObjectGetter: SavedObjectBulkGetterResult;
@ -207,6 +230,30 @@ export class EventLogClient implements IEventLogClient {
await this.esContext.esAdapter.refreshIndex();
}
public async findEventsBySavedObjectIdsSearchAfter(
type: string,
ids: string[],
options?: Partial<FindOptionsSearchAfterType>,
legacyIds?: string[]
): Promise<QueryEventsBySavedObjectSearchAfterResult> {
const findOptions = queryOptionsSearchAfterSchema.validate(options ?? {});
await this.savedObjectGetter(type, ids);
return await this.esContext.esAdapter.queryEventsBySavedObjectsSearchAfter({
index: this.esContext.esNames.indexPattern,
namespace: await this.getNamespace(),
type,
ids,
findOptions,
legacyIds,
});
}
public async closePointInTime(pitId: string): Promise<void> {
return await this.esContext.esAdapter.closePointInTime(pitId);
}
private async getNamespace() {
const space = await this.spacesService?.getActiveSpace(this.request);
return space && this.spacesService?.spaceIdToNamespace(space.id);

View file

@ -13,11 +13,16 @@ export type { IEvent, IValidatedEvent } from '../generated/schemas';
export { EventSchema, ECS_VERSION } from '../generated/schemas';
import { BulkResponse } from '@elastic/elasticsearch/lib/api/types';
import { IEvent } from '../generated/schemas';
import { AggregateOptionsType, FindOptionsType } from './event_log_client';
import {
AggregateOptionsType,
FindOptionsSearchAfterType,
FindOptionsType,
} from './event_log_client';
import {
AggregateEventsBySavedObjectResult,
QueryEventsBySavedObjectResult,
InternalFields,
QueryEventsBySavedObjectSearchAfterResult,
} from './es/cluster_client_adapter';
export type {
@ -84,6 +89,13 @@ export interface IEventLogClient {
findEventsByDocumentIds(
docs: Array<{ _id: string; _index: string }>
): Promise<Pick<QueryEventsBySavedObjectResult, 'data'>>;
findEventsBySavedObjectIdsSearchAfter(
type: string,
ids: string[],
options?: Partial<FindOptionsSearchAfterType>,
legacyIds?: string[]
): Promise<QueryEventsBySavedObjectSearchAfterResult>;
closePointInTime(pitId: string): Promise<void>;
refreshIndex(): Promise<void>;
}

View file

@ -24,6 +24,7 @@ import {
EuiText,
EuiHealth,
EuiSuperDatePicker,
EuiTextColor,
} from '@elastic/eui';
import { useUserData } from '../../../../detections/components/user_info';
import { hasUserCRUDPermission } from '../../../../common/utils/privileges';
@ -182,10 +183,12 @@ export const RuleGaps = ({ ruleId, enabled }: { ruleId: string; enabled: boolean
sortOrder: sort.direction,
});
const totalItemCount = data?.total ?? 0;
const MaxItemCount = 10000;
const pagination = {
pageIndex,
pageSize,
totalItemCount: data?.total ?? 0,
totalItemCount: Math.min(totalItemCount, MaxItemCount),
};
const columns = getGapsTableColumns(hasCRUDPermissions, ruleId, enabled);
@ -269,7 +272,15 @@ export const RuleGaps = ({ ruleId, enabled }: { ruleId: string; enabled: boolean
</EuiFlexGroup>
</EuiFlexItem>
</EuiFlexGroup>
<EuiFlexGroup justifyContent="flexEnd">
<EuiFlexGroup justifyContent="spaceBetween">
<EuiFlexItem grow={false}>
{totalItemCount > MaxItemCount && (
<EuiTextColor color="danger">
{i18n.GAPS_TABLE_TOTAL_GAPS_LABEL(totalItemCount, MaxItemCount)}
</EuiTextColor>
)}
</EuiFlexItem>
<EuiFlexItem grow={false}>
{timelines.getLastUpdated({
showUpdating: isLoading,

View file

@ -131,3 +131,9 @@ export const GAP_FILL_DISABLED_MESSAGE = i18n.translate(
defaultMessage: 'Rule should be enabled to fill gaps',
}
);
export const GAPS_TABLE_TOTAL_GAPS_LABEL = (totalItems: number, maxItems: number) =>
i18n.translate('xpack.securitySolution.gapsTable.totalGapsLabel', {
values: { totalItems, maxItems },
defaultMessage: `More than {totalItems} gaps match filters provided. Showing first {maxItems}. Constrain filters further to view additional gaps.`,
});

View file

@ -295,10 +295,11 @@ export default function updateGapsTests({ getService }: FtrProviderContext) {
});
it('should fill gap with multiple backfills', async () => {
const parallelBackfills = 5;
const { space } = SuperuserAtSpace1;
const fiveDaysGapStart = moment(gapStart);
const fiveDaysGapEnd = moment(gapStart).add(5, 'days').toISOString();
const fiveDaysGapEnd = moment(gapStart).add(parallelBackfills, 'days').toISOString();
// Create a rule
const ruleResponse = await supertest
@ -320,12 +321,10 @@ export default function updateGapsTests({ getService }: FtrProviderContext) {
spaceId: space.id,
});
// Schedule two backfills that together cover the entire gap
const startBackfillTime = moment(gapStart);
const backfills = [];
for (let i = 0; i < 5; i++) {
for (let i = 0; i < parallelBackfills; i++) {
backfills.push({
rule_id: ruleId,
start: startBackfillTime.toISOString(),
@ -340,7 +339,7 @@ export default function updateGapsTests({ getService }: FtrProviderContext) {
.send(backfills);
expect(scheduleResponse.statusCode).to.eql(200);
expect(scheduleResponse.body).to.have.length(5);
expect(scheduleResponse.body).to.have.length(parallelBackfills);
// Wait for both backfills to complete
await Promise.all(
@ -364,7 +363,7 @@ export default function updateGapsTests({ getService }: FtrProviderContext) {
expect(finalGapResponse.body.total).to.eql(1);
const finalGap = finalGapResponse.body.data[0];
expect(finalGap.status).to.eql('filled');
expect(finalGap.filled_duration_ms).to.eql(432000000);
expect(finalGap.filled_duration_ms).to.eql(86400000 * parallelBackfills);
expect(finalGap.unfilled_duration_ms).to.eql(0);
expect(finalGap.filled_intervals).to.have.length(1);
expect(finalGap.filled_intervals[0].gte).to.eql(fiveDaysGapStart.toISOString());