kibana_usage_collection: optimize SO PIT searches (#168677)

## Summary

Properly loop though PIT search pages instead of fetching all documents
at once

---------

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Pierre Gayvallet 2023-10-15 12:51:42 +02:00 committed by GitHub
parent 81544b7f76
commit 02177248ec
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 97 additions and 267 deletions

View file

@ -1,28 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type {
ISavedObjectsRepository,
SavedObjectsCreatePointInTimeFinderOptions,
SavedObjectsFindResult,
} from '@kbn/core/server';
export async function fetchAllSavedObjects<T>(
soRepository: ISavedObjectsRepository,
findOptions: SavedObjectsCreatePointInTimeFinderOptions
): Promise<Array<SavedObjectsFindResult<T>>> {
const finder = soRepository.createPointInTimeFinder<T>({ ...findOptions, perPage: 1000 });
const allSavedObjects: Array<SavedObjectsFindResult<T>> = [];
for await (const { saved_objects: savedObjects } of finder.find()) {
allSavedObjects.push(...savedObjects);
}
return allSavedObjects;
}

View file

@ -177,18 +177,11 @@ describe('rollTotals', () => {
],
{ overwrite: true }
);
expect(savedObjectClient.delete).toHaveBeenCalledTimes(3);
expect(savedObjectClient.delete).toHaveBeenCalledWith(
SAVED_OBJECTS_DAILY_TYPE,
'appId-2:2020-01-01'
);
expect(savedObjectClient.delete).toHaveBeenCalledWith(
SAVED_OBJECTS_DAILY_TYPE,
'appId-1:2020-01-01'
);
expect(savedObjectClient.delete).toHaveBeenCalledWith(
SAVED_OBJECTS_DAILY_TYPE,
'appId-1:2020-01-01:viewId-1'
);
expect(savedObjectClient.bulkDelete).toHaveBeenCalledTimes(1);
expect(savedObjectClient.bulkDelete).toHaveBeenCalledWith([
{ type: SAVED_OBJECTS_DAILY_TYPE, id: 'appId-2:2020-01-01' },
{ type: SAVED_OBJECTS_DAILY_TYPE, id: 'appId-1:2020-01-01' },
{ type: SAVED_OBJECTS_DAILY_TYPE, id: 'appId-1:2020-01-01:viewId-1' },
]);
});
});

View file

@ -16,7 +16,6 @@ import {
SAVED_OBJECTS_TOTAL_TYPE,
} from '../saved_objects_types';
import { serializeKey } from './utils';
import { fetchAllSavedObjects } from '../fetch_all_saved_objects';
/**
* Moves all the daily documents into aggregated "total" documents as we don't care about any granularity after 90 days
@ -29,56 +28,56 @@ export async function rollTotals(logger: Logger, savedObjectsClient?: ISavedObje
}
try {
const [rawApplicationUsageTotals, rawApplicationUsageDaily] = await Promise.all([
fetchAllSavedObjects<ApplicationUsageTotal>(savedObjectsClient, {
type: SAVED_OBJECTS_TOTAL_TYPE,
}),
fetchAllSavedObjects<ApplicationUsageDaily>(savedObjectsClient, {
type: SAVED_OBJECTS_DAILY_TYPE,
filter: `${SAVED_OBJECTS_DAILY_TYPE}.attributes.timestamp < now-90d`,
}),
]);
const existingTotals = rawApplicationUsageTotals.reduce(
(
acc,
{
attributes: { appId, viewId = MAIN_APP_DEFAULT_VIEW_ID, numberOfClicks, minutesOnScreen },
}
) => {
const key = viewId === MAIN_APP_DEFAULT_VIEW_ID ? appId : serializeKey(appId, viewId);
// No need to sum because there should be 1 document per appId only
acc[key] = { appId, viewId, numberOfClicks, minutesOnScreen };
return acc;
},
{} as Record<
string,
{ appId: string; viewId: string; minutesOnScreen: number; numberOfClicks: number }
>
);
const totals = rawApplicationUsageDaily.reduce(
(acc, { attributes }) => {
const usageTotalsFinder = savedObjectsClient.createPointInTimeFinder<ApplicationUsageTotal>({
type: SAVED_OBJECTS_TOTAL_TYPE,
perPage: 200,
});
const existingTotals: Record<
string,
{ appId: string; viewId: string; minutesOnScreen: number; numberOfClicks: number }
> = {};
for await (const { saved_objects: savedObjects } of usageTotalsFinder.find()) {
for (const savedObject of savedObjects) {
const {
appId,
viewId = MAIN_APP_DEFAULT_VIEW_ID,
numberOfClicks,
minutesOnScreen,
} = attributes;
const key = viewId === MAIN_APP_DEFAULT_VIEW_ID ? appId : serializeKey(appId, viewId);
const existing = acc[key] || { minutesOnScreen: 0, numberOfClicks: 0 };
} = savedObject.attributes;
acc[key] = {
const key = viewId === MAIN_APP_DEFAULT_VIEW_ID ? appId : serializeKey(appId, viewId);
// No need to sum because there should be 1 document per appId only
existingTotals[key] = { appId, viewId, numberOfClicks, minutesOnScreen };
}
}
const usageDailyFinder = savedObjectsClient.createPointInTimeFinder<ApplicationUsageDaily>({
type: SAVED_OBJECTS_DAILY_TYPE,
filter: `${SAVED_OBJECTS_DAILY_TYPE}.attributes.timestamp < now-90d`,
perPage: 200,
});
const totals = { ...existingTotals };
const usageDailyIdsToDelete: string[] = [];
for await (const { saved_objects: savedObjects } of usageDailyFinder.find()) {
for (const savedObject of savedObjects) {
const {
appId,
viewId = MAIN_APP_DEFAULT_VIEW_ID,
numberOfClicks,
minutesOnScreen,
} = savedObject.attributes;
const key = viewId === MAIN_APP_DEFAULT_VIEW_ID ? appId : serializeKey(appId, viewId);
const existing = totals[key] || { minutesOnScreen: 0, numberOfClicks: 0 };
totals[key] = {
appId,
viewId,
numberOfClicks: numberOfClicks + existing.numberOfClicks,
minutesOnScreen: minutesOnScreen + existing.minutesOnScreen,
};
return acc;
},
{ ...existingTotals }
);
usageDailyIdsToDelete.push(savedObject.id);
}
}
await Promise.all([
Object.entries(totals).length &&
@ -90,8 +89,8 @@ export async function rollTotals(logger: Logger, savedObjectsClient?: ISavedObje
})),
{ overwrite: true }
),
...rawApplicationUsageDaily.map(
({ id }) => savedObjectsClient.delete(SAVED_OBJECTS_DAILY_TYPE, id) // There is no bulkDelete :(
savedObjectsClient.bulkDelete(
usageDailyIdsToDelete.map((id) => ({ id, type: SAVED_OBJECTS_DAILY_TYPE }))
),
]);
} catch (err) {

View file

@ -22,7 +22,6 @@ import { applicationUsageSchema } from './schema';
import { rollTotals, serializeKey } from './rollups';
import { ROLL_TOTAL_INDICES_INTERVAL, ROLL_INDICES_START } from './constants';
import type { ApplicationUsageTelemetryReport, ApplicationUsageViews } from './types';
import { fetchAllSavedObjects } from './fetch_all_saved_objects';
export const transformByApplicationViews = (
report: ApplicationUsageViews
@ -68,29 +67,27 @@ export function registerApplicationUsageCollector(
if (typeof savedObjectsClient === 'undefined') {
return;
}
const [rawApplicationUsageTotals, rawApplicationUsageDaily] = await Promise.all([
fetchAllSavedObjects<ApplicationUsageTotal>(savedObjectsClient, {
type: SAVED_OBJECTS_TOTAL_TYPE,
}),
fetchAllSavedObjects<ApplicationUsageDaily>(savedObjectsClient, {
type: SAVED_OBJECTS_DAILY_TYPE,
}),
]);
const applicationUsageFromTotals = rawApplicationUsageTotals.reduce(
(
acc,
{
attributes: {
appId,
viewId = MAIN_APP_DEFAULT_VIEW_ID,
minutesOnScreen,
numberOfClicks,
},
}
) => {
const existing = acc[appId] || { clicks_total: 0, minutes_on_screen_total: 0 };
acc[serializeKey(appId, viewId)] = {
const usageTotalsFinder = savedObjectsClient.createPointInTimeFinder<ApplicationUsageTotal>(
{
type: SAVED_OBJECTS_TOTAL_TYPE,
perPage: 200,
}
);
const applicationUsageFromTotals: ApplicationUsageTelemetryReport = {};
for await (const { saved_objects: savedObjects } of usageTotalsFinder.find()) {
for (const savedObject of savedObjects) {
const {
appId,
viewId = MAIN_APP_DEFAULT_VIEW_ID,
minutesOnScreen,
numberOfClicks,
} = savedObject.attributes;
const existing = applicationUsageFromTotals[appId] || {
clicks_total: 0,
minutes_on_screen_total: 0,
};
applicationUsageFromTotals[serializeKey(appId, viewId)] = {
appId,
viewId,
clicks_total: numberOfClicks + existing.clicks_total,
@ -102,28 +99,28 @@ export function registerApplicationUsageCollector(
minutes_on_screen_30_days: 0,
minutes_on_screen_90_days: 0,
};
return acc;
},
{} as ApplicationUsageTelemetryReport
);
}
}
const nowMinus7 = moment().subtract(7, 'days');
const nowMinus30 = moment().subtract(30, 'days');
const nowMinus90 = moment().subtract(90, 'days');
const applicationUsage = rawApplicationUsageDaily.reduce(
(
acc,
{
attributes: {
appId,
viewId = MAIN_APP_DEFAULT_VIEW_ID,
minutesOnScreen,
numberOfClicks,
timestamp,
},
}
) => {
const existing = acc[serializeKey(appId, viewId)] || {
const usageDailyFinder = savedObjectsClient.createPointInTimeFinder<ApplicationUsageDaily>({
type: SAVED_OBJECTS_DAILY_TYPE,
perPage: 200,
});
const applicationUsage = { ...applicationUsageFromTotals };
for await (const { saved_objects: savedObjects } of usageDailyFinder.find()) {
for (const savedObject of savedObjects) {
const {
appId,
viewId = MAIN_APP_DEFAULT_VIEW_ID,
minutesOnScreen,
numberOfClicks,
timestamp,
} = savedObject.attributes;
const existing = applicationUsage[serializeKey(appId, viewId)] || {
appId,
viewId,
clicks_total: 0,
@ -154,7 +151,7 @@ export function registerApplicationUsageCollector(
minutes_on_screen_90_days: existing.minutes_on_screen_90_days + minutesOnScreen,
};
acc[serializeKey(appId, viewId)] = {
applicationUsage[serializeKey(appId, viewId)] = {
...existing,
clicks_total: existing.clicks_total + numberOfClicks,
minutes_on_screen_total: existing.minutes_on_screen_total + minutesOnScreen,
@ -162,10 +159,8 @@ export function registerApplicationUsageCollector(
...(isInLast30Days ? last30Days : {}),
...(isInLast90Days ? last90Days : {}),
};
return acc;
},
applicationUsageFromTotals
);
}
}
return transformByApplicationViews(applicationUsage);
},

View file

@ -1,99 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { times } from 'lodash';
import { SavedObjectsFindOptions, SavedObjectsFindResult } from '@kbn/core/server';
import { savedObjectsClientMock } from '@kbn/core/server/mocks';
import { findAll } from './find_all';
describe('findAll', () => {
let savedObjectsClient: ReturnType<typeof savedObjectsClientMock.create>;
const createObj = (id: number): SavedObjectsFindResult => ({
type: 'type',
id: `id-${id}`,
attributes: {},
score: 1,
references: [],
});
beforeEach(() => {
savedObjectsClient = savedObjectsClientMock.create();
});
it('calls `client.createPointInTimeFinder` with the correct parameters', async () => {
const query: SavedObjectsFindOptions = {
type: ['some-type', 'another-type'],
};
savedObjectsClient.find.mockResolvedValue({
saved_objects: [],
total: 1,
per_page: 20,
page: 1,
});
await findAll(savedObjectsClient, query);
expect(savedObjectsClient.createPointInTimeFinder).toHaveBeenCalledTimes(1);
expect(savedObjectsClient.createPointInTimeFinder).toHaveBeenCalledWith(query);
});
it('returns the results from the PIT search', async () => {
const query: SavedObjectsFindOptions = {
type: ['some-type', 'another-type'],
};
savedObjectsClient.find.mockResolvedValue({
saved_objects: [createObj(1), createObj(2)],
total: 1,
per_page: 20,
page: 1,
});
const results = await findAll(savedObjectsClient, query);
expect(savedObjectsClient.find).toHaveBeenCalledTimes(1);
expect(savedObjectsClient.find).toHaveBeenCalledWith(
expect.objectContaining({
...query,
}),
undefined // internalOptions
);
expect(results).toEqual([createObj(1), createObj(2)]);
});
it('works when the PIT search returns multiple batches', async () => {
const query: SavedObjectsFindOptions = {
type: ['some-type', 'another-type'],
perPage: 2,
};
const objPerPage = 2;
let callCount = 0;
savedObjectsClient.find.mockImplementation(({}) => {
callCount++;
const firstInPage = (callCount - 1) * objPerPage + 1;
return Promise.resolve({
saved_objects:
callCount > 3
? [createObj(firstInPage)]
: [createObj(firstInPage), createObj(firstInPage + 1)],
total: objPerPage * 3,
per_page: objPerPage,
page: callCount!,
});
});
const results = await findAll(savedObjectsClient, query);
expect(savedObjectsClient.find).toHaveBeenCalledTimes(4);
expect(results).toEqual(times(7, (num) => createObj(num + 1)));
});
});

View file

@ -1,25 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import {
SavedObjectsClientContract,
SavedObject,
SavedObjectsCreatePointInTimeFinderOptions,
} from '@kbn/core/server';
export const findAll = async (
client: SavedObjectsClientContract,
findOptions: SavedObjectsCreatePointInTimeFinderOptions
): Promise<SavedObject[]> => {
const finder = client.createPointInTimeFinder(findOptions);
const results: SavedObject[] = [];
for await (const result of finder.find()) {
results.push(...result.saved_objects);
}
return results;
};

View file

@ -8,5 +8,4 @@
export { toSavedObjectWithMeta } from './to_saved_object_with_meta';
export { injectMetaAttributes } from './inject_meta_attributes';
export { findAll } from './find_all';
export { findRelationships } from './find_relationships';

View file

@ -10,7 +10,6 @@ import { schema } from '@kbn/config-schema';
import type { IRouter, SavedObjectsCreatePointInTimeFinderOptions } from '@kbn/core/server';
import { chain } from 'lodash';
import type { v1 } from '../../common';
import { findAll } from '../lib';
export const registerScrollForCountRoute = (router: IRouter) => {
router.post(
@ -45,7 +44,7 @@ export const registerScrollForCountRoute = (router: IRouter) => {
const client = getClient({ includedHiddenTypes });
const findOptions: SavedObjectsCreatePointInTimeFinderOptions = {
type: typesToInclude,
perPage: 1000,
perPage: 500,
};
if (searchString) {
findOptions.search = `${searchString}*`;
@ -56,18 +55,15 @@ export const registerScrollForCountRoute = (router: IRouter) => {
findOptions.hasReferenceOperator = 'OR';
}
const objects = await findAll(client, findOptions);
const counts = objects.reduce((accum, result) => {
const type = result.type;
accum[type] = accum[type] || 0;
accum[type]++;
return accum;
}, {} as Record<string, number>);
const counts: Record<string, number> = {};
for (const type of typesToInclude) {
if (!counts[type]) {
counts[type] = 0;
counts[type] = 0;
}
const finder = client.createPointInTimeFinder(findOptions);
for await (const { saved_objects: savedObjects } of finder.find()) {
for (const { type } of savedObjects) {
counts[type]++;
}
}