feat(slo): cleanup temp summary documents task (#210264)

This commit is contained in:
Kevin Delemme 2025-02-20 11:35:24 -05:00 committed by GitHub
parent 8c90076d80
commit 875a42cf99
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
21 changed files with 624 additions and 77 deletions

View file

@ -6,14 +6,14 @@
*/
import * as t from 'io-ts';
const getOverviewParamsSchema = t.partial({
const getSLOStatsOverviewParamsSchema = t.partial({
query: t.partial({
kqlQuery: t.string,
filters: t.string,
}),
});
const getOverviewResponseSchema = t.type({
const getSLOStatsOverviewResponseSchema = t.type({
violated: t.number,
degrading: t.number,
stale: t.number,
@ -24,8 +24,8 @@ const getOverviewResponseSchema = t.type({
burnRateRecoveredAlerts: t.number,
});
type GetOverviewParams = t.TypeOf<typeof getOverviewParamsSchema.props.query>;
type GetOverviewResponse = t.OutputOf<typeof getOverviewResponseSchema>;
type GetSLOStatsOverviewParams = t.TypeOf<typeof getSLOStatsOverviewParamsSchema.props.query>;
type GetSLOStatsOverviewResponse = t.OutputOf<typeof getSLOStatsOverviewResponseSchema>;
export { getOverviewParamsSchema, getOverviewResponseSchema };
export type { GetOverviewParams, GetOverviewResponse };
export { getSLOStatsOverviewParamsSchema, getSLOStatsOverviewResponseSchema };
export type { GetSLOStatsOverviewParams, GetSLOStatsOverviewResponse };

View file

@ -14,6 +14,7 @@ export * from './find_definition';
export * from './get';
export * from './get_burn_rates';
export * from './get_slo_groupings';
export * from './get_slo_stats_overview';
export * from './get_preview_data';
export * from './reset';
export * from './manage';

View file

@ -9,6 +9,7 @@ import { schema, TypeOf } from '@kbn/config-schema';
export const configSchema = schema.object({
sloOrphanSummaryCleanUpTaskEnabled: schema.boolean({ defaultValue: true }),
tempSummaryCleanupTaskEnabled: schema.boolean({ defaultValue: true }),
enabled: schema.boolean({ defaultValue: true }),
experimental: schema.maybe(
schema.object({

View file

@ -5,18 +5,18 @@
* 2.0.
*/
import { getListOfSloSummaryIndices } from './summary_indices';
import { getSLOSummaryIndices } from './get_slo_summary_indices';
import { DEFAULT_STALE_SLO_THRESHOLD_HOURS, SUMMARY_DESTINATION_INDEX_PATTERN } from './constants';
describe('getListOfSloSummaryIndices', () => {
it('should return default index if disabled', function () {
describe('getSLOSummaryIndices', () => {
it('should return default local index if disabled', function () {
const settings = {
useAllRemoteClusters: false,
selectedRemoteClusters: [],
staleThresholdInHours: DEFAULT_STALE_SLO_THRESHOLD_HOURS,
};
const result = getListOfSloSummaryIndices(settings, []);
expect(result).toBe(SUMMARY_DESTINATION_INDEX_PATTERN);
const result = getSLOSummaryIndices(settings, []);
expect(result).toStrictEqual([SUMMARY_DESTINATION_INDEX_PATTERN]);
});
it('should return all remote clusters when enabled', function () {
@ -29,10 +29,12 @@ describe('getListOfSloSummaryIndices', () => {
{ name: 'cluster1', isConnected: true },
{ name: 'cluster2', isConnected: true },
];
const result = getListOfSloSummaryIndices(settings, clustersByName);
expect(result).toBe(
`${SUMMARY_DESTINATION_INDEX_PATTERN},cluster1:${SUMMARY_DESTINATION_INDEX_PATTERN},cluster2:${SUMMARY_DESTINATION_INDEX_PATTERN}`
);
const result = getSLOSummaryIndices(settings, clustersByName);
expect(result).toStrictEqual([
SUMMARY_DESTINATION_INDEX_PATTERN,
`cluster1:${SUMMARY_DESTINATION_INDEX_PATTERN}`,
`cluster2:${SUMMARY_DESTINATION_INDEX_PATTERN}`,
]);
});
it('should return selected when enabled', function () {
@ -45,9 +47,10 @@ describe('getListOfSloSummaryIndices', () => {
{ name: 'cluster1', isConnected: true },
{ name: 'cluster2', isConnected: true },
];
const result = getListOfSloSummaryIndices(settings, clustersByName);
expect(result).toBe(
`${SUMMARY_DESTINATION_INDEX_PATTERN},cluster1:${SUMMARY_DESTINATION_INDEX_PATTERN}`
);
const result = getSLOSummaryIndices(settings, clustersByName);
expect(result).toStrictEqual([
SUMMARY_DESTINATION_INDEX_PATTERN,
`cluster1:${SUMMARY_DESTINATION_INDEX_PATTERN}`,
]);
});
});

View file

@ -8,21 +8,22 @@
import { GetSLOSettingsResponse } from '@kbn/slo-schema';
import { SUMMARY_DESTINATION_INDEX_PATTERN } from './constants';
export const getListOfSloSummaryIndices = (
export const getSLOSummaryIndices = (
settings: GetSLOSettingsResponse,
clustersByName: Array<{ name: string; isConnected: boolean }>
) => {
remoteClusters: Array<{ name: string; isConnected: boolean }>
): string[] => {
const { useAllRemoteClusters, selectedRemoteClusters } = settings;
if (!useAllRemoteClusters && selectedRemoteClusters.length === 0) {
return SUMMARY_DESTINATION_INDEX_PATTERN;
return [SUMMARY_DESTINATION_INDEX_PATTERN];
}
const indices: string[] = [SUMMARY_DESTINATION_INDEX_PATTERN];
clustersByName.forEach(({ name, isConnected }) => {
if (isConnected && (useAllRemoteClusters || selectedRemoteClusters.includes(name))) {
indices.push(`${name}:${SUMMARY_DESTINATION_INDEX_PATTERN}`);
}
});
return indices.join(',');
return remoteClusters.reduce(
(acc, { name, isConnected }) => {
if (isConnected && (useAllRemoteClusters || selectedRemoteClusters.includes(name))) {
acc.push(`${name}:${SUMMARY_DESTINATION_INDEX_PATTERN}`);
}
return acc;
},
[SUMMARY_DESTINATION_INDEX_PATTERN]
);
};

View file

@ -8,7 +8,7 @@
import { EuiFlexGroup, EuiFlexItem, EuiSpacer, EuiText, EuiTitle, EuiPanel } from '@elastic/eui';
import { i18n } from '@kbn/i18n';
import React from 'react';
import { GetOverviewResponse } from '@kbn/slo-schema/src/rest_specs/routes/get_overview';
import { GetSLOStatsOverviewResponse } from '@kbn/slo-schema/src/rest_specs/routes/get_slo_stats_overview';
import { rulesLocatorID, RulesParams } from '@kbn/observability-plugin/public';
import { useAlertsUrl } from '../../../../hooks/use_alerts_url';
import { useKibana } from '../../../../hooks/use_kibana';
@ -18,7 +18,7 @@ export function SLOOverviewAlerts({
data,
isLoading,
}: {
data?: GetOverviewResponse;
data?: GetSLOStatsOverviewResponse;
isLoading: boolean;
}) {
const {

View file

@ -6,7 +6,7 @@
*/
import { buildQueryFromFilters, Filter } from '@kbn/es-query';
import { i18n } from '@kbn/i18n';
import { GetOverviewResponse } from '@kbn/slo-schema/src/rest_specs/routes/get_overview';
import { GetSLOStatsOverviewResponse } from '@kbn/slo-schema/src/rest_specs/routes/get_slo_stats_overview';
import { useQuery } from '@tanstack/react-query';
import { useMemo } from 'react';
import { SUMMARY_DESTINATION_INDEX_PATTERN } from '../../../../common/constants';
@ -29,7 +29,7 @@ interface UseFetchSloGroupsResponse {
isRefetching: boolean;
isSuccess: boolean;
isError: boolean;
data: GetOverviewResponse | undefined;
data: GetSLOStatsOverviewResponse | undefined;
}
export function useFetchSLOsOverview({

View file

@ -7,7 +7,7 @@
import { useFetcher } from '@kbn/observability-shared-plugin/public';
import { useEffect, useState } from 'react';
import { getListOfSloSummaryIndices } from '../../../../common/summary_indices';
import { getSLOSummaryIndices } from '../../../../common/get_slo_summary_indices';
import { useCreateDataView } from '../../../hooks/use_create_data_view';
import { useKibana } from '../../../hooks/use_kibana';
import { useGetSettings } from '../../slo_settings/hooks/use_get_settings';
@ -23,8 +23,8 @@ export const useSloSummaryDataView = () => {
useEffect(() => {
if (settings && remoteClusters) {
const summaryIndices = getListOfSloSummaryIndices(settings, remoteClusters);
setIndexPattern(summaryIndices);
const summaryIndices = getSLOSummaryIndices(settings, remoteClusters);
setIndexPattern(summaryIndices.join(','));
}
}, [settings, remoteClusters]);

View file

@ -9,8 +9,8 @@ import type {
KibanaRequest,
SavedObjectsClientContract,
} from '@kbn/core/server';
import { castArray, once } from 'lodash';
import { getListOfSummaryIndices, getSloSettings } from '../services/slo_settings';
import { once } from 'lodash';
import { getSloSettings, getSummaryIndices } from '../services/slo_settings';
export interface SloClient {
getSummaryIndices(): Promise<string[]>;
@ -24,17 +24,17 @@ export function getSloClientWithRequest({
esClient: ElasticsearchClient;
soClient: SavedObjectsClientContract;
}): SloClient {
const getListOfSummaryIndicesOnce = once(async () => {
const getSummaryIndicesOnce = once(async () => {
const settings = await getSloSettings(soClient);
const { indices } = await getListOfSummaryIndices(esClient, settings);
const { indices } = await getSummaryIndices(esClient, settings);
return castArray(indices);
return indices;
});
return {
getSummaryIndices: async () => {
return await getListOfSummaryIndicesOnce();
return await getSummaryIndicesOnce();
},
};
}

View file

@ -5,6 +5,7 @@
* 2.0.
*/
import { ALERTING_FEATURE_ID } from '@kbn/alerting-plugin/common';
import {
CoreSetup,
CoreStart,
@ -15,12 +16,12 @@ import {
PluginInitializerContext,
SavedObjectsClient,
} from '@kbn/core/server';
import { AlertsLocatorDefinition, sloFeatureId } from '@kbn/observability-plugin/common';
import { SLO_BURN_RATE_RULE_TYPE_ID } from '@kbn/rule-data-utils';
import { ALERTING_FEATURE_ID } from '@kbn/alerting-plugin/common';
import { KibanaFeatureScope } from '@kbn/features-plugin/common';
import { i18n } from '@kbn/i18n';
import { AlertsLocatorDefinition, sloFeatureId } from '@kbn/observability-plugin/common';
import { SLO_BURN_RATE_RULE_TYPE_ID } from '@kbn/rule-data-utils';
import { mapValues } from 'lodash';
import { getSloClientWithRequest } from './client';
import { registerSloUsageCollector } from './lib/collectors/register';
import { registerBurnRateRule } from './lib/rules/register_burn_rate_rule';
import { getSloServerRouteRepository } from './routes/get_slo_server_route_repository';
@ -30,6 +31,7 @@ import { SO_SLO_TYPE, slo } from './saved_objects';
import { SO_SLO_SETTINGS_TYPE, sloSettings } from './saved_objects/slo_settings';
import { DefaultResourceInstaller } from './services';
import { SloOrphanSummaryCleanupTask } from './services/tasks/orphan_summary_cleanup_task';
import { TempSummaryCleanupTask } from './services/tasks/temp_summary_cleanup_task';
import type {
SLOConfig,
SLOPluginSetupDependencies,
@ -37,7 +39,6 @@ import type {
SLOServerSetup,
SLOServerStart,
} from './types';
import { getSloClientWithRequest } from './client';
const sloRuleTypes = [SLO_BURN_RATE_RULE_TYPE_ID];
@ -50,6 +51,7 @@ export class SLOPlugin
private readonly isServerless: boolean;
private readonly isDev: boolean;
private sloOrphanCleanupTask?: SloOrphanSummaryCleanupTask;
private tempSummaryCleanupTask?: TempSummaryCleanupTask;
constructor(private readonly initContext: PluginInitializerContext) {
this.logger = this.initContext.logger.get();
@ -172,6 +174,13 @@ export class SLOPlugin
this.config
);
this.tempSummaryCleanupTask = new TempSummaryCleanupTask({
core,
taskManager: plugins.taskManager,
logFactory: this.initContext.logger,
config: this.config,
});
return {};
}
@ -183,6 +192,8 @@ export class SLOPlugin
?.start(plugins.taskManager, internalSoClient, internalEsClient)
.catch(() => {});
this.tempSummaryCleanupTask?.start(plugins).catch(() => {});
return {
getSloClientWithRequest: (request: KibanaRequest) => {
return getSloClientWithRequest({

View file

@ -5,13 +5,13 @@
* 2.0.
*/
import { getOverviewParamsSchema } from '@kbn/slo-schema/src/rest_specs/routes/get_overview';
import { GetSLOsOverview } from '../../services/get_slos_overview';
import { getSLOStatsOverviewParamsSchema } from '@kbn/slo-schema';
import { GetSLOStatsOverview } from '../../services/get_slo_stats_overview';
import { createSloServerRoute } from '../create_slo_server_route';
import { assertPlatinumLicense } from './utils/assert_platinum_license';
import { getSpaceId } from './utils/get_space_id';
export const getSLOsOverview = createSloServerRoute({
export const getSLOStatsOverview = createSloServerRoute({
endpoint: 'GET /internal/observability/slos/overview',
options: { access: 'internal' },
security: {
@ -19,7 +19,7 @@ export const getSLOsOverview = createSloServerRoute({
requiredPrivileges: ['slo_read'],
},
},
params: getOverviewParamsSchema,
params: getSLOStatsOverviewParamsSchema,
handler: async ({ context, params, request, logger, plugins }) => {
await assertPlatinumLicense(plugins);
@ -34,7 +34,7 @@ export const getSLOsOverview = createSloServerRoute({
const alerting = await plugins.alerting.start();
const rulesClient = await alerting.getRulesClientWithRequest(request);
const slosOverview = new GetSLOsOverview(
const slosOverview = new GetSLOStatsOverview(
soClient,
esClient,
spaceId,

View file

@ -26,7 +26,7 @@ import { getSloBurnRates } from './get_slo_burn_rates';
import { getSLOSuggestionsRoute } from './get_suggestions';
import { putSloSettings } from './put_slo_settings';
import { resetSLORoute } from './reset_slo';
import { getSLOsOverview } from './get_slos_overview';
import { getSLOStatsOverview } from './get_slo_stats_overview';
export const getSloRouteRepository = (isServerless?: boolean) => {
return {
@ -51,6 +51,6 @@ export const getSloRouteRepository = (isServerless?: boolean) => {
...resetSLORoute,
...findSLOGroupsRoute,
...getSLOSuggestionsRoute,
...getSLOsOverview,
...getSLOStatsOverview,
};
};

View file

@ -12,7 +12,7 @@ import {
Pagination,
sloGroupWithSummaryResponseSchema,
} from '@kbn/slo-schema';
import { getListOfSummaryIndices, getSloSettings } from './slo_settings';
import { getSummaryIndices, getSloSettings } from './slo_settings';
import { DEFAULT_SLO_GROUPS_PAGE_SIZE } from '../../common/constants';
import { IllegalArgumentError } from '../errors';
import { typedSearch } from '../utils/queries';
@ -53,7 +53,7 @@ export class FindSLOGroups {
const parsedFilters = parseStringFilters(filters, this.logger);
const settings = await getSloSettings(this.soClient);
const { indices } = await getListOfSummaryIndices(this.esClient, settings);
const { indices } = await getSummaryIndices(this.esClient, settings);
const hasSelectedTags = groupBy === 'slo.tags' && groupsFilter.length > 0;

View file

@ -5,22 +5,19 @@
* 2.0.
*/
import { SavedObjectsClientContract } from '@kbn/core-saved-objects-api-server';
import { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import { Logger } from '@kbn/logging';
import {
GetOverviewParams,
GetOverviewResponse,
} from '@kbn/slo-schema/src/rest_specs/routes/get_overview';
import { RulesClientApi } from '@kbn/alerting-plugin/server/types';
import { AlertsClient } from '@kbn/rule-registry-plugin/server';
import moment from 'moment';
import { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import { SavedObjectsClientContract } from '@kbn/core-saved-objects-api-server';
import { Logger } from '@kbn/logging';
import { AlertConsumers, SLO_RULE_TYPE_IDS } from '@kbn/rule-data-utils';
import { AlertsClient } from '@kbn/rule-registry-plugin/server';
import { GetSLOStatsOverviewParams, GetSLOStatsOverviewResponse } from '@kbn/slo-schema';
import moment from 'moment';
import { typedSearch } from '../utils/queries';
import { getSummaryIndices, getSloSettings } from './slo_settings';
import { getElasticsearchQueryOrThrow, parseStringFilters } from './transform_generators';
import { getListOfSummaryIndices, getSloSettings } from './slo_settings';
export class GetSLOsOverview {
export class GetSLOStatsOverview {
constructor(
private soClient: SavedObjectsClientContract,
private esClient: ElasticsearchClient,
@ -30,9 +27,9 @@ export class GetSLOsOverview {
private racClient: AlertsClient
) {}
public async execute(params: GetOverviewParams = {}): Promise<GetOverviewResponse> {
public async execute(params: GetSLOStatsOverviewParams): Promise<GetSLOStatsOverviewResponse> {
const settings = await getSloSettings(this.soClient);
const { indices } = await getListOfSummaryIndices(this.esClient, settings);
const { indices } = await getSummaryIndices(this.esClient, settings);
const kqlQuery = params.kqlQuery ?? '';
const filters = params.filters ?? '';

View file

@ -0,0 +1,100 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP
exports[`CleanUpTempSummary deletes the duplicated temp documents 1`] = `
[MockFunction] {
"calls": Array [
Array [
Object {
"body": Object {
"query": Object {
"bool": Object {
"minimum_should_match": 1,
"should": Array [
Object {
"bool": Object {
"must": Array [
Object {
"term": Object {
"isTempDoc": true,
},
},
Object {
"term": Object {
"slo.id": "slo-id-one",
},
},
Object {
"term": Object {
"spaceId": "space-one",
},
},
],
},
},
],
},
},
},
"conflicts": "proceed",
"index": ".slo-observability.summary-v3.4.temp",
"slices": "auto",
"wait_for_completion": false,
},
Object {
"signal": AbortSignal {},
},
],
Array [
Object {
"body": Object {
"query": Object {
"bool": Object {
"minimum_should_match": 1,
"should": Array [
Object {
"bool": Object {
"must": Array [
Object {
"term": Object {
"isTempDoc": true,
},
},
Object {
"term": Object {
"slo.id": "another-temp-id",
},
},
Object {
"term": Object {
"spaceId": "space-two",
},
},
],
},
},
],
},
},
},
"conflicts": "proceed",
"index": ".slo-observability.summary-v3.4.temp",
"slices": "auto",
"wait_for_completion": false,
},
Object {
"signal": AbortSignal {},
},
],
],
"results": Array [
Object {
"type": "return",
"value": Promise {},
},
Object {
"type": "return",
"value": Promise {},
},
],
}
`;

View file

@ -0,0 +1,135 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import {
ElasticsearchClientMock,
elasticsearchServiceMock,
loggingSystemMock,
} from '@kbn/core/server/mocks';
import { CleanUpTempSummary } from './clean_up_temp_summary';
const commonEsResponse = {
took: 100,
timed_out: false,
_shards: {
total: 1,
successful: 1,
skipped: 0,
failed: 0,
},
hits: {
hits: [],
},
};
describe('CleanUpTempSummary', () => {
let esClientMock: ElasticsearchClientMock;
let service: CleanUpTempSummary;
beforeEach(() => {
jest.useFakeTimers().setSystemTime(new Date('2025-02-10T15:00:00.000Z'));
esClientMock = elasticsearchServiceMock.createElasticsearchClient();
service = new CleanUpTempSummary(
esClientMock,
loggingSystemMock.createLogger(),
new AbortController()
);
});
afterAll(() => {
jest.useRealTimers();
});
it('returns early if there is no temporary documents', async () => {
esClientMock.count.mockResolvedValueOnce({
count: 0,
_shards: {
total: 1,
successful: 1,
skipped: 0,
failed: 0,
},
});
await service.execute();
expect(esClientMock.search).not.toHaveBeenCalled();
expect(esClientMock.deleteByQuery).not.toHaveBeenCalled();
});
it("deletes nothing when there isn't a duplicate temporary documents", async () => {
esClientMock.search.mockResolvedValueOnce({
...commonEsResponse,
aggregations: { duplicate_ids: { buckets: [] } },
});
await service.execute();
expect(esClientMock.deleteByQuery).not.toHaveBeenCalled();
expect(esClientMock.search).toHaveBeenCalledTimes(1);
});
it('deletes the duplicated temp documents', async () => {
// first search returns 1 duplicate and an indication for a next search
esClientMock.search.mockResolvedValueOnce({
...commonEsResponse,
aggregations: {
duplicate_ids: {
after_key: {
spaceId: 'space-two',
id: 'last-id',
},
buckets: [
{
key: {
spaceId: 'space-one',
id: 'slo-id-one',
},
},
],
},
},
});
// second search returns 1 duplicate and an indication for a next search
esClientMock.search.mockResolvedValueOnce({
...commonEsResponse,
aggregations: {
duplicate_ids: {
after_key: {
spaceId: 'space-three',
id: 'last-id-2',
},
buckets: [
{
key: {
spaceId: 'space-two',
id: 'another-temp-id',
},
},
],
},
},
});
// third search returns no duplicate and no more next page
esClientMock.search.mockResolvedValueOnce({
...commonEsResponse,
aggregations: {
duplicate_ids: {
buckets: [],
},
},
});
await service.execute();
expect(esClientMock.deleteByQuery).toHaveBeenCalledTimes(2);
expect(esClientMock.deleteByQuery).toMatchSnapshot();
expect(esClientMock.search).toHaveBeenCalledTimes(3);
});
});

View file

@ -0,0 +1,165 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { ElasticsearchClient, Logger } from '@kbn/core/server';
import {
SUMMARY_DESTINATION_INDEX_PATTERN,
SUMMARY_TEMP_INDEX_NAME,
} from '../../../common/constants';
interface AggBucketKey {
spaceId: string;
id: string;
}
interface AggBucket {
key: AggBucketKey;
doc_count: number;
}
export interface AggResults {
duplicate_ids: {
after_key: AggBucketKey | undefined;
buckets: AggBucket[];
};
}
export class CleanUpTempSummary {
constructor(
private readonly esClient: ElasticsearchClient,
private readonly logger: Logger,
private readonly abortController: AbortController
) {}
public async execute(): Promise<void> {
const openCircuitBreaker = await this.shouldOpenCircuitBreaker();
if (openCircuitBreaker) {
this.logger.debug('No temporary documents found, skipping.');
return;
}
let searchAfterKey: AggBucketKey | undefined;
do {
const { buckets, nextSearchAfterKey } = await this.findDuplicateTemporaryDocuments(
searchAfterKey
);
searchAfterKey = nextSearchAfterKey;
if (buckets.length > 0) {
await this.deleteDuplicateTemporaryDocuments(buckets);
}
} while (searchAfterKey);
}
private async shouldOpenCircuitBreaker() {
const results = await this.esClient.count(
{ index: SUMMARY_TEMP_INDEX_NAME, terminate_after: 1 },
{ signal: this.abortController.signal }
);
return results.count === 0;
}
private async findDuplicateTemporaryDocuments(searchAfterKey: AggBucketKey | undefined) {
this.logger.debug('Searching for duplicate temporary documents');
const results = await this.esClient.search<unknown, AggResults>(
{
index: SUMMARY_DESTINATION_INDEX_PATTERN,
size: 0,
aggs: {
duplicate_ids: {
composite: {
size: 10000,
after: searchAfterKey,
sources: [
{
spaceId: {
terms: {
field: 'spaceId',
},
},
},
{
id: {
terms: {
field: 'slo.id',
},
},
},
],
},
aggs: {
cardinality_istempdoc: {
cardinality: {
field: 'isTempDoc',
},
},
find_duplicates: {
bucket_selector: {
buckets_path: {
cardinality: 'cardinality_istempdoc',
},
script: 'params.cardinality == 2',
},
},
},
},
},
},
{ signal: this.abortController.signal }
);
const buckets = (results.aggregations?.duplicate_ids.buckets ?? []).map((bucket) => bucket.key);
const nextSearchAfterKey = results.aggregations?.duplicate_ids.after_key;
this.logger.debug(`Found ${buckets.length} duplicate temporary documents`);
return { buckets, nextSearchAfterKey };
}
private async deleteDuplicateTemporaryDocuments(buckets: AggBucketKey[]) {
this.logger.debug(`Deleting ${buckets.length} duplicate temporary documents`);
await this.esClient.deleteByQuery(
{
index: SUMMARY_TEMP_INDEX_NAME,
wait_for_completion: false,
slices: 'auto',
conflicts: 'proceed',
body: {
query: {
bool: {
should: buckets.map((bucket) => {
return {
bool: {
must: [
{
term: {
isTempDoc: true,
},
},
{
term: {
'slo.id': bucket.id,
},
},
{
term: {
spaceId: bucket.spaceId,
},
},
],
},
};
}),
minimum_should_match: 1,
},
},
},
},
{ signal: this.abortController.signal }
);
}
}

View file

@ -13,7 +13,7 @@ import {
DEFAULT_STALE_SLO_THRESHOLD_HOURS,
SUMMARY_DESTINATION_INDEX_PATTERN,
} from '../../common/constants';
import { getListOfSloSummaryIndices } from '../../common/summary_indices';
import { getSLOSummaryIndices } from '../../common/get_slo_summary_indices';
import { SLOSettings, StoredSLOSettings } from '../domain/models';
import { SO_SLO_SETTINGS_TYPE, sloSettingsObjectId } from '../saved_objects/slo_settings';
@ -56,13 +56,13 @@ export const storeSloSettings = async (
return sloSettingsSchema.encode(object.attributes);
};
export const getListOfSummaryIndices = async (
export const getSummaryIndices = async (
esClient: ElasticsearchClient,
settings: StoredSLOSettings
) => {
): Promise<{ indices: string[] }> => {
const { useAllRemoteClusters, selectedRemoteClusters } = settings;
if (!useAllRemoteClusters && selectedRemoteClusters.length === 0) {
return { indices: [SUMMARY_DESTINATION_INDEX_PATTERN], settings };
return { indices: [SUMMARY_DESTINATION_INDEX_PATTERN] };
}
const clustersByName = await esClient.cluster.remoteInfo();
@ -72,5 +72,5 @@ export const getListOfSummaryIndices = async (
isConnected: clustersByName[clusterName].connected,
}));
return { indices: getListOfSloSummaryIndices(settings, clusterInfo) };
return { indices: getSLOSummaryIndices(settings, clusterInfo) };
};

View file

@ -15,7 +15,7 @@ import { SUMMARY_DESTINATION_INDEX_PATTERN } from '../../../common/constants';
import { StoredSLOSettings } from '../../domain/models';
import { toHighPrecision } from '../../utils/number';
import { createEsParams, typedSearch } from '../../utils/queries';
import { getListOfSummaryIndices, getSloSettings } from '../slo_settings';
import { getSummaryIndices, getSloSettings } from '../slo_settings';
import { EsSummaryDocument } from '../summary_transform_generator/helpers/create_temp_summary';
import { getElasticsearchQueryOrThrow, parseStringFilters } from '../transform_generators';
import { fromRemoteSummaryDocumentToSloDefinition } from '../unsafe_federated/remote_summary_doc_to_slo';
@ -47,7 +47,7 @@ export class DefaultSummarySearchClient implements SummarySearchClient {
): Promise<Paginated<SummaryResult>> {
const parsedFilters = parseStringFilters(filters, this.logger);
const settings = await getSloSettings(this.soClient);
const { indices } = await getListOfSummaryIndices(this.esClient, settings);
const { indices } = await getSummaryIndices(this.esClient, settings);
const esParams = createEsParams({
index: indices,

View file

@ -0,0 +1,132 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { errors } from '@elastic/elasticsearch';
import { type CoreSetup, type Logger, type LoggerFactory } from '@kbn/core/server';
import { ConcreteTaskInstance, TaskManagerSetupContract } from '@kbn/task-manager-plugin/server';
import { getDeleteTaskRunResult } from '@kbn/task-manager-plugin/server/task';
import { SLOConfig, SLOPluginStartDependencies } from '../../types';
import { CleanUpTempSummary } from '../management/clean_up_temp_summary';
export const TYPE = 'slo:temp-summary-cleanup-task';
export const VERSION = '1.0.0';
interface TaskSetupContract {
taskManager: TaskManagerSetupContract;
core: CoreSetup;
logFactory: LoggerFactory;
config: SLOConfig;
}
export class TempSummaryCleanupTask {
private abortController = new AbortController();
private logger: Logger;
private config: SLOConfig;
private wasStarted: boolean = false;
constructor(setupContract: TaskSetupContract) {
const { core, config, taskManager, logFactory } = setupContract;
this.logger = logFactory.get(this.taskId);
this.config = config;
this.logger.debug('Registering task with [2m] timeout');
taskManager.registerTaskDefinitions({
[TYPE]: {
title: 'SLO temp summary cleanup task',
timeout: '2m',
maxAttempts: 1,
createTaskRunner: ({ taskInstance }: { taskInstance: ConcreteTaskInstance }) => {
return {
run: async () => {
return this.runTask(taskInstance, core);
},
cancel: async () => {
this.abortController.abort('Timed out');
},
};
},
},
});
}
public async start(plugins: SLOPluginStartDependencies) {
const hasCorrectLicense = (await plugins.licensing.getLicense()).hasAtLeast('platinum');
if (!hasCorrectLicense) {
this.logger.debug('Platinum license is required');
return;
}
if (!plugins.taskManager) {
this.logger.error('Missing required service during start');
return;
}
if (!this.config.tempSummaryCleanupTaskEnabled) {
this.logger.debug('Unscheduling task');
return await plugins.taskManager.removeIfExists(this.taskId);
}
this.logger.debug('Scheduling task with [1h] interval');
this.wasStarted = true;
try {
await plugins.taskManager.ensureScheduled({
id: this.taskId,
taskType: TYPE,
scope: ['observability', 'slo'],
schedule: {
interval: '1h',
},
state: {},
params: {},
});
} catch (e) {
this.logger.error(`Error scheduling task, error: ${e}`);
}
}
private get taskId(): string {
return `${TYPE}:${VERSION}`;
}
public async runTask(taskInstance: ConcreteTaskInstance, core: CoreSetup) {
if (!this.wasStarted) {
this.logger.debug('runTask Aborted. Task not started yet');
return;
}
if (taskInstance.id !== this.taskId) {
this.logger.debug(
`Outdated task version: Got [${taskInstance.id}], current version is [${this.taskId}]`
);
return getDeleteTaskRunResult();
}
this.logger.debug(`runTask() started`);
const [coreStart] = await core.getStartServices();
const esClient = coreStart.elasticsearch.client.asInternalUser;
try {
const cleanUpTempSummary = new CleanUpTempSummary(
esClient,
this.logger,
this.abortController
);
await cleanUpTempSummary.execute();
} catch (err) {
if (err instanceof errors.RequestAbortedError) {
this.logger.warn(`Request aborted due to timeout: ${err}`);
return;
}
this.logger.error(`Error: ${err}`);
}
}
}

View file

@ -175,6 +175,7 @@ export default function ({ getService }: FtrProviderContext) {
'security:telemetry-prebuilt-rule-alerts',
'security:telemetry-timelines',
'session_cleanup',
'slo:temp-summary-cleanup-task',
'task_manager:delete_inactive_background_task_nodes',
'task_manager:mark_removed_tasks_as_unrecognized',
]);