mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 17:28:26 -04:00
[Response Ops][Task Manager] Provide a way for recurring tasks to indicate that they should be deleted. (#184776)
Resolves https://github.com/elastic/kibana/issues/181145 ## Summary Adds an optional flag `shouldDeleteTask` to a successful task run result. If this flag is set to true, task manager will remove the task at the end of the processing cycle. This allows tasks to gracefully inform us that they need to be deleted without throwing an unrecoverable error (the current way that tasks tell us they want to be deleted). Audited existing usages of `throwUnrecoverableError`. Other than usages within the alerting and actions task runner, which are thrown for valid error states, all other usages were by tasks that were considered outdated and should be deleted. Updated all those usages to return the `shouldDeleteTask` run result. --------- Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
parent
e06c42cfe6
commit
83c151278e
20 changed files with 252 additions and 32 deletions
|
@ -9,6 +9,7 @@ import { coreMock } from '@kbn/core/server/mocks';
|
|||
import { taskManagerMock } from '@kbn/task-manager-plugin/server/mocks';
|
||||
import type { TaskManagerSetupContract } from '@kbn/task-manager-plugin/server';
|
||||
import { TaskStatus } from '@kbn/task-manager-plugin/server';
|
||||
import { getDeleteTaskRunResult } from '@kbn/task-manager-plugin/server/task';
|
||||
import type { CoreSetup } from '@kbn/core/server';
|
||||
import type { ElasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
|
||||
|
||||
|
@ -197,6 +198,18 @@ describe('fleet metrics task', () => {
|
|||
});
|
||||
});
|
||||
|
||||
it('should not run if task is outdated', async () => {
|
||||
const result = await runTask({ ...MOCK_TASK_INSTANCE, id: 'old-id' });
|
||||
|
||||
expect(esClient.index).not.toHaveBeenCalled();
|
||||
expect(esClient.bulk).not.toHaveBeenCalled();
|
||||
|
||||
expect(appContextService.getLogger().info).toHaveBeenCalledWith(
|
||||
'Outdated task version: Got [old-id] from task instance. Current version is [Fleet-Metrics-Task:1.1.1]'
|
||||
);
|
||||
expect(result).toEqual(getDeleteTaskRunResult());
|
||||
});
|
||||
|
||||
it('should log errors from bulk create', async () => {
|
||||
esClient.bulk.mockResolvedValue({
|
||||
errors: true,
|
||||
|
|
|
@ -4,12 +4,12 @@
|
|||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
import { getDeleteTaskRunResult } from '@kbn/task-manager-plugin/server/task';
|
||||
import type {
|
||||
ConcreteTaskInstance,
|
||||
TaskManagerStartContract,
|
||||
TaskManagerSetupContract,
|
||||
} from '@kbn/task-manager-plugin/server';
|
||||
import { throwUnrecoverableError } from '@kbn/task-manager-plugin/server';
|
||||
import type { ElasticsearchClient } from '@kbn/core/server';
|
||||
import { withSpan } from '@kbn/apm-utils';
|
||||
|
||||
|
@ -68,8 +68,12 @@ export class FleetMetricsTask {
|
|||
}
|
||||
// Check that this task is current
|
||||
if (taskInstance.id !== this.taskId) {
|
||||
throwUnrecoverableError(new Error('Outdated task version for task: ' + taskInstance.id));
|
||||
return;
|
||||
appContextService
|
||||
.getLogger()
|
||||
.info(
|
||||
`Outdated task version: Got [${taskInstance.id}] from task instance. Current version is [${this.taskId}]`
|
||||
);
|
||||
return getDeleteTaskRunResult();
|
||||
}
|
||||
if (!this.esClient) {
|
||||
appContextService.getLogger().debug('esClient not set, skipping Fleet metrics task');
|
||||
|
|
|
@ -4,12 +4,12 @@
|
|||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
import { getDeleteTaskRunResult } from '@kbn/task-manager-plugin/server/task';
|
||||
import type {
|
||||
ConcreteTaskInstance,
|
||||
TaskManagerStartContract,
|
||||
TaskManagerSetupContract,
|
||||
} from '@kbn/task-manager-plugin/server';
|
||||
import { throwUnrecoverableError } from '@kbn/task-manager-plugin/server';
|
||||
import type { CoreSetup } from '@kbn/core/server';
|
||||
import { withSpan } from '@kbn/apm-utils';
|
||||
|
||||
|
@ -70,8 +70,12 @@ export class FleetUsageSender {
|
|||
}
|
||||
// Check that this task is current
|
||||
if (taskInstance.id !== this.taskId) {
|
||||
throwUnrecoverableError(new Error('Outdated task version for task: ' + taskInstance.id));
|
||||
return;
|
||||
appContextService
|
||||
.getLogger()
|
||||
.info(
|
||||
`Outdated task version: Got [${taskInstance.id}] from task instance. Current version is [${this.taskId}]`
|
||||
);
|
||||
return getDeleteTaskRunResult();
|
||||
}
|
||||
appContextService.getLogger().info('Running Fleet Usage telemetry send task');
|
||||
|
||||
|
|
|
@ -9,6 +9,7 @@ import { coreMock } from '@kbn/core/server/mocks';
|
|||
import { taskManagerMock } from '@kbn/task-manager-plugin/server/mocks';
|
||||
import type { TaskManagerSetupContract } from '@kbn/task-manager-plugin/server';
|
||||
import { TaskStatus } from '@kbn/task-manager-plugin/server';
|
||||
import { getDeleteTaskRunResult } from '@kbn/task-manager-plugin/server/task';
|
||||
import type { CoreSetup } from '@kbn/core/server';
|
||||
import type { ElasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
|
||||
import { loggingSystemMock } from '@kbn/core/server/mocks';
|
||||
|
@ -206,5 +207,14 @@ describe('check deleted files task', () => {
|
|||
{ signal: abortController.signal }
|
||||
);
|
||||
});
|
||||
|
||||
it('should not run if task is outdated', async () => {
|
||||
const result = await runTask({ ...MOCK_TASK_INSTANCE, id: 'old-id' });
|
||||
|
||||
expect(esClient.search).not.toHaveBeenCalled();
|
||||
expect(esClient.updateByQuery).not.toHaveBeenCalled();
|
||||
|
||||
expect(result).toEqual(getDeleteTaskRunResult());
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -11,7 +11,7 @@ import type {
|
|||
TaskManagerSetupContract,
|
||||
TaskManagerStartContract,
|
||||
} from '@kbn/task-manager-plugin/server';
|
||||
import { throwUnrecoverableError } from '@kbn/task-manager-plugin/server';
|
||||
import { getDeleteTaskRunResult } from '@kbn/task-manager-plugin/server/task';
|
||||
import type { LoggerFactory } from '@kbn/core/server';
|
||||
import { errors } from '@elastic/elasticsearch';
|
||||
|
||||
|
@ -102,7 +102,10 @@ export class CheckDeletedFilesTask {
|
|||
|
||||
// Check that this task is current
|
||||
if (taskInstance.id !== this.taskId) {
|
||||
throwUnrecoverableError(new Error('Outdated task version'));
|
||||
this.logger.info(
|
||||
`Outdated task version: Got [${taskInstance.id}] from task instance. Current version is [${this.taskId}]`
|
||||
);
|
||||
return getDeleteTaskRunResult();
|
||||
}
|
||||
|
||||
this.logger.info(`[runTask()] started`);
|
||||
|
|
|
@ -18,6 +18,7 @@ import { coreMock } from '@kbn/core/server/mocks';
|
|||
import { taskManagerMock } from '@kbn/task-manager-plugin/server/mocks';
|
||||
import type { TaskManagerSetupContract } from '@kbn/task-manager-plugin/server';
|
||||
import { TaskStatus } from '@kbn/task-manager-plugin/server';
|
||||
import { getDeleteTaskRunResult } from '@kbn/task-manager-plugin/server/task';
|
||||
import type { CoreSetup } from '@kbn/core/server';
|
||||
import type { ElasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
|
||||
import { TRANSFORM_STATES } from '../../../../common/constants';
|
||||
|
@ -126,6 +127,16 @@ describe('check metadata transforms task', () => {
|
|||
},
|
||||
} as unknown as TransportResult<TransformGetTransformStatsResponse>);
|
||||
|
||||
it('should not run if task is outdated', async () => {
|
||||
const result = await runTask({ ...MOCK_TASK_INSTANCE, id: 'old-id' });
|
||||
|
||||
expect(esClient.transform.getTransformStats).not.toHaveBeenCalled();
|
||||
expect(esClient.transform.stopTransform).not.toHaveBeenCalled();
|
||||
expect(esClient.transform.startTransform).not.toHaveBeenCalled();
|
||||
|
||||
expect(result).toEqual(getDeleteTaskRunResult());
|
||||
});
|
||||
|
||||
describe('transforms restart', () => {
|
||||
it('should stop task if transform stats response fails', async () => {
|
||||
esClient.transform.getTransformStats.mockRejectedValue({});
|
||||
|
|
|
@ -16,7 +16,7 @@ import type {
|
|||
TaskManagerSetupContract,
|
||||
TaskManagerStartContract,
|
||||
} from '@kbn/task-manager-plugin/server';
|
||||
import { throwUnrecoverableError } from '@kbn/task-manager-plugin/server';
|
||||
import { getDeleteTaskRunResult } from '@kbn/task-manager-plugin/server/task';
|
||||
import { ElasticsearchAssetType, FLEET_ENDPOINT_PACKAGE } from '@kbn/fleet-plugin/common';
|
||||
import type { EndpointAppContext } from '../../types';
|
||||
import { METADATA_TRANSFORMS_PATTERN } from '../../../../common/endpoint/constants';
|
||||
|
@ -105,7 +105,12 @@ export class CheckMetadataTransformsTask {
|
|||
// Check that this task is current
|
||||
if (taskInstance.id !== this.getTaskId()) {
|
||||
// old task, die
|
||||
throwUnrecoverableError(new Error('Outdated task version'));
|
||||
this.logger.info(
|
||||
`Outdated task version: Got [${
|
||||
taskInstance.id
|
||||
}] from task instance. Current version is [${this.getTaskId()}]`
|
||||
);
|
||||
return getDeleteTaskRunResult();
|
||||
}
|
||||
|
||||
const [{ elasticsearch }] = await core.getStartServices();
|
||||
|
|
|
@ -11,7 +11,6 @@ import type {
|
|||
} from '@kbn/task-manager-plugin/server';
|
||||
import type { Logger } from '@kbn/logging';
|
||||
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
|
||||
import { throwUnrecoverableError } from '@kbn/task-manager-plugin/server';
|
||||
import { EndpointError } from '../../../../common/endpoint/errors';
|
||||
import { CompleteExternalActionsTaskRunner } from './complete_external_actions_task_runner';
|
||||
import type { EndpointAppContext } from '../../types';
|
||||
|
@ -90,14 +89,6 @@ export class CompleteExternalResponseActionsTask {
|
|||
);
|
||||
}
|
||||
|
||||
if (taskInstance.id !== this.taskId) {
|
||||
throwUnrecoverableError(
|
||||
new EndpointError(
|
||||
`Outdated task version. Got [${taskInstance.id}] from task instance. Current version is [${this.taskId}]`
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
const { id: taskId, taskType } = taskInstance;
|
||||
|
||||
return new CompleteExternalActionsTaskRunner(
|
||||
|
|
|
@ -14,6 +14,11 @@ import { EndpointActionGenerator } from '../../../../common/endpoint/data_genera
|
|||
import { ENDPOINT_ACTION_RESPONSES_INDEX } from '../../../../common/endpoint/constants';
|
||||
import { waitFor } from '@testing-library/react';
|
||||
import { ResponseActionsConnectorNotConfiguredError } from '../../services/actions/clients/errors';
|
||||
import {
|
||||
COMPLETE_EXTERNAL_RESPONSE_ACTIONS_TASK_TYPE,
|
||||
COMPLETE_EXTERNAL_RESPONSE_ACTIONS_TASK_VERSION,
|
||||
} from './complete_external_actions_task';
|
||||
import { getDeleteTaskRunResult } from '@kbn/task-manager-plugin/server/task';
|
||||
|
||||
describe('CompleteExternalTaskRunner class', () => {
|
||||
let endpointContextServicesMock: ReturnType<typeof createMockEndpointAppContextService>;
|
||||
|
@ -25,7 +30,9 @@ describe('CompleteExternalTaskRunner class', () => {
|
|||
esClientMock = elasticsearchServiceMock.createElasticsearchClient();
|
||||
runnerInstance = new CompleteExternalActionsTaskRunner(
|
||||
endpointContextServicesMock,
|
||||
esClientMock
|
||||
esClientMock,
|
||||
'60s',
|
||||
`${COMPLETE_EXTERNAL_RESPONSE_ACTIONS_TASK_TYPE}-${COMPLETE_EXTERNAL_RESPONSE_ACTIONS_TASK_VERSION}`
|
||||
);
|
||||
const actionGenerator = new EndpointActionGenerator('seed');
|
||||
|
||||
|
@ -53,6 +60,22 @@ describe('CompleteExternalTaskRunner class', () => {
|
|||
);
|
||||
});
|
||||
|
||||
it('should do nothing if task instance id is outdated', async () => {
|
||||
runnerInstance = new CompleteExternalActionsTaskRunner(
|
||||
endpointContextServicesMock,
|
||||
esClientMock,
|
||||
'60s',
|
||||
'old-id'
|
||||
);
|
||||
const result = await runnerInstance.run();
|
||||
|
||||
expect(result).toEqual(getDeleteTaskRunResult());
|
||||
|
||||
expect(endpointContextServicesMock.createLogger().info).toHaveBeenCalledWith(
|
||||
`Outdated task version. Got [old-id] from task instance. Current version is [endpoint:complete-external-response-actions-1.0.0]`
|
||||
);
|
||||
});
|
||||
|
||||
it('should NOT log an error if agentType is not configured with a connector', async () => {
|
||||
(endpointContextServicesMock.getInternalResponseActionsClient as jest.Mock).mockImplementation(
|
||||
() => {
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
*/
|
||||
|
||||
import type { CancellableTask, RunContext, RunResult } from '@kbn/task-manager-plugin/server/task';
|
||||
import { getDeleteTaskRunResult } from '@kbn/task-manager-plugin/server/task';
|
||||
import type { Logger, ElasticsearchClient } from '@kbn/core/server';
|
||||
import type { BulkRequest } from '@elastic/elasticsearch/lib/api/types';
|
||||
import { ResponseActionsConnectorNotConfiguredError } from '../../services/actions/clients/errors';
|
||||
|
@ -17,6 +18,10 @@ import { QueueProcessor } from '../../utils/queue_processor';
|
|||
import type { LogsEndpointActionResponse } from '../../../../common/endpoint/types';
|
||||
import type { EndpointAppContextService } from '../../endpoint_app_context_services';
|
||||
import { ENDPOINT_ACTION_RESPONSES_INDEX } from '../../../../common/endpoint/constants';
|
||||
import {
|
||||
COMPLETE_EXTERNAL_RESPONSE_ACTIONS_TASK_TYPE,
|
||||
COMPLETE_EXTERNAL_RESPONSE_ACTIONS_TASK_VERSION,
|
||||
} from './complete_external_actions_task';
|
||||
|
||||
/**
|
||||
* A task manager runner responsible for checking the status of and completing pending actions
|
||||
|
@ -34,7 +39,7 @@ export class CompleteExternalActionsTaskRunner
|
|||
private readonly endpointContextServices: EndpointAppContextService,
|
||||
private readonly esClient: ElasticsearchClient,
|
||||
private readonly nextRunInterval: string = '60s',
|
||||
private readonly taskId?: string,
|
||||
private readonly taskInstanceId?: string,
|
||||
private readonly taskType?: string
|
||||
) {
|
||||
this.log = this.endpointContextServices.createLogger(
|
||||
|
@ -49,6 +54,10 @@ export class CompleteExternalActionsTaskRunner
|
|||
});
|
||||
}
|
||||
|
||||
private get taskId(): string {
|
||||
return `${COMPLETE_EXTERNAL_RESPONSE_ACTIONS_TASK_TYPE}-${COMPLETE_EXTERNAL_RESPONSE_ACTIONS_TASK_VERSION}`;
|
||||
}
|
||||
|
||||
private async queueBatchProcessor({
|
||||
batch,
|
||||
data,
|
||||
|
@ -94,6 +103,13 @@ export class CompleteExternalActionsTaskRunner
|
|||
}
|
||||
|
||||
public async run(): Promise<RunResult | void> {
|
||||
if (this.taskInstanceId !== this.taskId) {
|
||||
this.log.info(
|
||||
`Outdated task version. Got [${this.taskInstanceId}] from task instance. Current version is [${this.taskId}]`
|
||||
);
|
||||
return getDeleteTaskRunResult();
|
||||
}
|
||||
|
||||
this.log.debug(`Started: Checking status of external response actions`);
|
||||
this.abortController = new AbortController();
|
||||
|
||||
|
@ -118,7 +134,7 @@ export class CompleteExternalActionsTaskRunner
|
|||
this.endpointContextServices.getInternalResponseActionsClient({
|
||||
agentType,
|
||||
taskType: this.taskType,
|
||||
taskId: this.taskId,
|
||||
taskId: this.taskInstanceId,
|
||||
});
|
||||
|
||||
return agentTypeActionsClient
|
||||
|
|
|
@ -11,7 +11,7 @@ import type {
|
|||
TaskManagerSetupContract,
|
||||
TaskManagerStartContract,
|
||||
} from '@kbn/task-manager-plugin/server';
|
||||
import { throwUnrecoverableError } from '@kbn/task-manager-plugin/server';
|
||||
import { getDeleteTaskRunResult } from '@kbn/task-manager-plugin/server/task';
|
||||
import type { Tier } from '../../types';
|
||||
import { ProductTier } from '../../../common/product';
|
||||
import { NLP_CLEANUP_TASK_EVENT } from '../../telemetry/event_based_telemetry';
|
||||
|
@ -78,9 +78,10 @@ export class NLPCleanupTask {
|
|||
return {
|
||||
run: async () => {
|
||||
if (this.productTier === ProductTier.complete) {
|
||||
throwUnrecoverableError(
|
||||
new Error('Task no longer needed for current productTier, disabling...')
|
||||
this.logger.info(
|
||||
`Task ${taskInstance.id} no longer needed for current productTier, disabling...`
|
||||
);
|
||||
return getDeleteTaskRunResult();
|
||||
}
|
||||
return this.runTask(taskInstance, core);
|
||||
},
|
||||
|
@ -134,7 +135,7 @@ export class NLPCleanupTask {
|
|||
// Check that this task is current
|
||||
if (taskInstance.id !== this.taskId) {
|
||||
// old task, return
|
||||
throwUnrecoverableError(new Error('Outdated task version'));
|
||||
return getDeleteTaskRunResult();
|
||||
}
|
||||
|
||||
const [{ elasticsearch }] = await core.getStartServices();
|
||||
|
|
|
@ -14,6 +14,7 @@ import type {
|
|||
} from '@kbn/task-manager-plugin/server';
|
||||
import type { CloudSetup } from '@kbn/cloud-plugin/server';
|
||||
import { TaskStatus } from '@kbn/task-manager-plugin/server';
|
||||
import { getDeleteTaskRunResult } from '@kbn/task-manager-plugin/server/task';
|
||||
import { coreMock } from '@kbn/core/server/mocks';
|
||||
import { loggingSystemMock } from '@kbn/core-logging-server-mocks';
|
||||
import { taskManagerMock } from '@kbn/task-manager-plugin/server/mocks';
|
||||
|
@ -201,6 +202,15 @@ describe('SecurityUsageReportingTask', () => {
|
|||
);
|
||||
});
|
||||
|
||||
it('should do nothing if task instance id is outdated', async () => {
|
||||
const result = await runTask({ ...buildMockTaskInstance(), id: 'old-id' });
|
||||
|
||||
expect(result).toEqual(getDeleteTaskRunResult());
|
||||
|
||||
expect(reportUsageSpy).not.toHaveBeenCalled();
|
||||
expect(meteringCallbackMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
describe('lastSuccessfulReport', () => {
|
||||
it('should set lastSuccessfulReport correctly if report success', async () => {
|
||||
reportUsageSpy.mockResolvedValueOnce({ status: 201 });
|
||||
|
|
|
@ -8,8 +8,8 @@
|
|||
import type { Response } from 'node-fetch';
|
||||
import type { CoreSetup, Logger } from '@kbn/core/server';
|
||||
import type { ConcreteTaskInstance } from '@kbn/task-manager-plugin/server';
|
||||
import { getDeleteTaskRunResult } from '@kbn/task-manager-plugin/server/task';
|
||||
import type { CloudSetup } from '@kbn/cloud-plugin/server';
|
||||
import { throwUnrecoverableError } from '@kbn/task-manager-plugin/server';
|
||||
|
||||
import { usageReportingService } from '../common/services';
|
||||
import type {
|
||||
|
@ -114,7 +114,10 @@ export class SecurityUsageReportingTask {
|
|||
// Check that this task is current
|
||||
if (taskInstance.id !== this.taskId) {
|
||||
// old task, die
|
||||
throwUnrecoverableError(new Error('Outdated task version'));
|
||||
this.logger.info(
|
||||
`Outdated task version: Got [${taskInstance.id}] from task instance. Current version is [${this.taskId}]`
|
||||
);
|
||||
return getDeleteTaskRunResult();
|
||||
}
|
||||
|
||||
const [{ elasticsearch }] = await core.getStartServices();
|
||||
|
|
|
@ -56,6 +56,7 @@ export type SuccessfulRunResult = {
|
|||
state: Record<string, unknown>;
|
||||
taskRunError?: DecoratedError;
|
||||
shouldValidate?: boolean;
|
||||
shouldDeleteTask?: boolean;
|
||||
} & (
|
||||
| // ensure a SuccessfulRunResult can either specify a new `runAt` or a new `schedule`, but not both
|
||||
{
|
||||
|
@ -88,6 +89,11 @@ export type FailedRunResult = SuccessfulRunResult & {
|
|||
|
||||
export type RunResult = FailedRunResult | SuccessfulRunResult;
|
||||
|
||||
export const getDeleteTaskRunResult = () => ({
|
||||
state: {},
|
||||
shouldDeleteTask: true,
|
||||
});
|
||||
|
||||
export const isFailedRunResult = (result: unknown): result is FailedRunResult =>
|
||||
!!((result as FailedRunResult)?.error ?? false);
|
||||
|
||||
|
@ -205,6 +211,7 @@ export enum TaskStatus {
|
|||
Claiming = 'claiming',
|
||||
Running = 'running',
|
||||
Failed = 'failed',
|
||||
ShouldDelete = 'should_delete',
|
||||
Unrecognized = 'unrecognized',
|
||||
DeadLetter = 'dead_letter',
|
||||
}
|
||||
|
|
|
@ -24,7 +24,7 @@ import {
|
|||
TaskPersistence,
|
||||
asTaskManagerStatEvent,
|
||||
} from '../task_events';
|
||||
import { ConcreteTaskInstance, TaskStatus } from '../task';
|
||||
import { ConcreteTaskInstance, getDeleteTaskRunResult, TaskStatus } from '../task';
|
||||
import { SavedObjectsErrorHelpers } from '@kbn/core/server';
|
||||
import moment from 'moment';
|
||||
import { TaskDefinitionRegistry, TaskTypeDictionary } from '../task_type_dictionary';
|
||||
|
@ -1140,6 +1140,58 @@ describe('TaskManagerRunner', () => {
|
|||
expect(onTaskEvent).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
test(`doesn't reschedule recurring tasks that return shouldDeleteTask = true`, async () => {
|
||||
const id = _.random(1, 20).toString();
|
||||
const onTaskEvent = jest.fn();
|
||||
const {
|
||||
runner,
|
||||
store,
|
||||
instance: originalInstance,
|
||||
} = await readyToRunStageSetup({
|
||||
onTaskEvent,
|
||||
instance: {
|
||||
id,
|
||||
schedule: { interval: '20m' },
|
||||
status: TaskStatus.Running,
|
||||
startedAt: new Date(),
|
||||
enabled: true,
|
||||
},
|
||||
definitions: {
|
||||
bar: {
|
||||
title: 'Bar!',
|
||||
createTaskRunner: () => ({
|
||||
async run() {
|
||||
return getDeleteTaskRunResult();
|
||||
},
|
||||
}),
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
await runner.run();
|
||||
|
||||
expect(store.remove).toHaveBeenCalled();
|
||||
expect(store.update).not.toHaveBeenCalled();
|
||||
|
||||
expect(onTaskEvent).toHaveBeenCalledWith(
|
||||
withAnyTiming(
|
||||
asTaskRunEvent(
|
||||
id,
|
||||
asOk({
|
||||
persistence: TaskPersistence.Recurring,
|
||||
task: originalInstance,
|
||||
result: TaskRunResult.Deleted,
|
||||
isExpired: false,
|
||||
})
|
||||
)
|
||||
)
|
||||
);
|
||||
expect(onTaskEvent).toHaveBeenCalledWith(
|
||||
asTaskManagerStatEvent('runDelay', asOk(expect.any(Number)))
|
||||
);
|
||||
expect(onTaskEvent).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
test('tasks that return runAt override the schedule', async () => {
|
||||
const runAt = minutesFromNow(_.random(5));
|
||||
const { runner, store } = await readyToRunStageSetup({
|
||||
|
|
|
@ -119,6 +119,8 @@ export enum TaskRunResult {
|
|||
RetryScheduled = 'RetryScheduled',
|
||||
// Task has failed
|
||||
Failed = 'Failed',
|
||||
// Task deleted
|
||||
Deleted = 'Deleted',
|
||||
}
|
||||
|
||||
// A ConcreteTaskInstance which we *know* has a `startedAt` Date on it
|
||||
|
@ -620,7 +622,13 @@ export class TaskManagerRunner implements TaskRunner {
|
|||
schedule: reschedule,
|
||||
state,
|
||||
attempts = 0,
|
||||
shouldDeleteTask,
|
||||
}: SuccessfulRunResult & { attempts: number }) => {
|
||||
if (shouldDeleteTask) {
|
||||
// set the status to failed so task will get deleted
|
||||
return asOk({ status: TaskStatus.ShouldDelete });
|
||||
}
|
||||
|
||||
const { startedAt, schedule } = this.instance.task;
|
||||
|
||||
return asOk({
|
||||
|
@ -642,7 +650,10 @@ export class TaskManagerRunner implements TaskRunner {
|
|||
counterType: 'taskManagerTaskRunner',
|
||||
incrementBy: 1,
|
||||
});
|
||||
} else if (fieldUpdates.status === TaskStatus.Failed) {
|
||||
} else if (
|
||||
fieldUpdates.status === TaskStatus.Failed ||
|
||||
fieldUpdates.status === TaskStatus.ShouldDelete
|
||||
) {
|
||||
// Delete the SO instead so it doesn't remain in the index forever
|
||||
this.instance = asRan(this.instance.task);
|
||||
await this.removeTask();
|
||||
|
@ -667,6 +678,8 @@ export class TaskManagerRunner implements TaskRunner {
|
|||
|
||||
return fieldUpdates.status === TaskStatus.Failed
|
||||
? TaskRunResult.Failed
|
||||
: fieldUpdates.status === TaskStatus.ShouldDelete
|
||||
? TaskRunResult.Deleted
|
||||
: hasTaskRunFailed
|
||||
? TaskRunResult.SuccessRescheduled
|
||||
: TaskRunResult.RetryScheduled;
|
||||
|
|
|
@ -910,7 +910,7 @@ describe('TaskStore', () => {
|
|||
|
||||
describe('getLifecycle', () => {
|
||||
test('returns the task status if the task exists ', async () => {
|
||||
expect.assertions(6);
|
||||
expect.assertions(7);
|
||||
return Promise.all(
|
||||
Object.values(TaskStatus).map(async (status) => {
|
||||
const task = {
|
||||
|
|
|
@ -18,7 +18,7 @@ import {
|
|||
EphemeralTask,
|
||||
} from '@kbn/task-manager-plugin/server';
|
||||
import { DEFAULT_MAX_WORKERS } from '@kbn/task-manager-plugin/server/config';
|
||||
import { TaskPriority } from '@kbn/task-manager-plugin/server/task';
|
||||
import { getDeleteTaskRunResult, TaskPriority } from '@kbn/task-manager-plugin/server/task';
|
||||
import { initRoutes } from './init_routes';
|
||||
|
||||
// this plugin's dependendencies
|
||||
|
@ -167,6 +167,45 @@ export class SampleTaskManagerFixturePlugin
|
|||
},
|
||||
}),
|
||||
},
|
||||
sampleRecurringTaskThatDeletesItself: {
|
||||
title: 'Sample Recurring Task that Times Out',
|
||||
description: 'A sample task that requests deletion.',
|
||||
stateSchemaByVersion: {
|
||||
1: {
|
||||
up: (state: Record<string, unknown>) => ({ count: state.count }),
|
||||
schema: schema.object({
|
||||
count: schema.maybe(schema.number()),
|
||||
}),
|
||||
},
|
||||
},
|
||||
createTaskRunner: ({ taskInstance }: { taskInstance: ConcreteTaskInstance }) => ({
|
||||
async run() {
|
||||
const { state } = taskInstance;
|
||||
const prevState = state || { count: 0 };
|
||||
|
||||
const count = (prevState.count || 0) + 1;
|
||||
|
||||
const [{ elasticsearch }] = await core.getStartServices();
|
||||
await elasticsearch.client.asInternalUser.index({
|
||||
index: '.kibana_task_manager_test_result',
|
||||
body: {
|
||||
type: 'task',
|
||||
taskId: taskInstance.id,
|
||||
state: JSON.stringify(state),
|
||||
ranAt: new Date(),
|
||||
},
|
||||
refresh: true,
|
||||
});
|
||||
|
||||
if (count === 5) {
|
||||
return getDeleteTaskRunResult();
|
||||
}
|
||||
return {
|
||||
state: { count },
|
||||
};
|
||||
},
|
||||
}),
|
||||
},
|
||||
sampleAdHocTaskTimingOut: {
|
||||
title: 'Sample Ad-Hoc Task that Times Out',
|
||||
description: 'A sample task that times out.',
|
||||
|
|
|
@ -25,6 +25,7 @@ export default function ({ getService }: FtrProviderContext) {
|
|||
'sampleOneTimeTaskThrowingError',
|
||||
'sampleRecurringTaskTimingOut',
|
||||
'sampleRecurringTaskWhichHangs',
|
||||
'sampleRecurringTaskThatDeletesItself',
|
||||
'sampleTask',
|
||||
'sampleTaskWithLimitedConcurrency',
|
||||
'sampleTaskWithSingleConcurrency',
|
||||
|
|
|
@ -296,6 +296,20 @@ export default function ({ getService }: FtrProviderContext) {
|
|||
});
|
||||
});
|
||||
|
||||
it('should remove recurring task if task requests deletion', async () => {
|
||||
await scheduleTask({
|
||||
taskType: 'sampleRecurringTaskThatDeletesItself',
|
||||
schedule: { interval: '1s' },
|
||||
params: {},
|
||||
});
|
||||
|
||||
await retry.try(async () => {
|
||||
const history = await historyDocs();
|
||||
expect(history.length).to.eql(5);
|
||||
expect((await currentTasks()).docs).to.eql([]);
|
||||
});
|
||||
});
|
||||
|
||||
it('should use a given ID as the task document ID', async () => {
|
||||
const result = await scheduleTask({
|
||||
id: 'test-task-for-sample-task-plugin-to-test-task-manager',
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue