[Response Ops][Task Manager] Integration test for switching between task claim strategies (#186419)

Resolves https://github.com/elastic/kibana/issues/184941

## Summary

Adds integration test to verify that restarting Kibana with a different
task claim strategy does not break anything and tasks are claimed as
expected.

---------

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Ying Mao 2024-06-21 09:29:13 -04:00 committed by GitHub
parent e9a224d5b0
commit 6faadda1eb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 416 additions and 19 deletions

View file

@ -8,6 +8,35 @@
import deepmerge from 'deepmerge';
import { createTestServers, createRootWithCorePlugins } from '@kbn/core-test-helpers-kbn-server';
function createRoot(settings = {}) {
return createRootWithCorePlugins(
deepmerge(
{
logging: {
root: {
level: 'warn',
},
loggers: [
{
name: 'plugins.taskManager',
level: 'all',
},
{
name: 'plugins.taskManager.metrics-debugger',
level: 'warn',
},
{
name: 'plugins.taskManager.metrics-subscribe-debugger',
level: 'warn',
},
],
},
},
settings
),
{ oss: false }
);
}
export async function setupTestServers(settings = {}) {
const { startES } = createTestServers({
adjustTimeout: (t) => jest.setTimeout(t),
@ -20,25 +49,7 @@ export async function setupTestServers(settings = {}) {
const esServer = await startES();
const root = createRootWithCorePlugins(
deepmerge(
{
logging: {
root: {
level: 'warn',
},
loggers: [
{
name: 'plugins.taskManager',
level: 'all',
},
],
},
},
settings
),
{ oss: false }
);
const root = createRoot(settings);
await root.preboot();
const coreSetup = await root.setup();
@ -54,3 +65,20 @@ export async function setupTestServers(settings = {}) {
},
};
}
export async function setupKibanaServer(settings = {}) {
const root = createRoot(settings);
await root.preboot();
const coreSetup = await root.setup();
const coreStart = await root.start();
return {
kibanaServer: {
root,
coreSetup,
coreStart,
stop: async () => await root.shutdown(),
},
};
}

View file

@ -0,0 +1,369 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { v4 as uuidV4 } from 'uuid';
import { schema } from '@kbn/config-schema';
import { SerializedConcreteTaskInstance, TaskStatus } from '../task';
import type { TaskClaimingOpts } from '../queries/task_claiming';
import { injectTask, setupTestServers, retry } from './lib';
import { setupKibanaServer } from './lib/setup_test_servers';
const mockTaskTypeRunFn = jest.fn();
const mockCreateTaskRunner = jest.fn();
const mockTaskType = {
title: '',
description: '',
stateSchemaByVersion: {
1: {
up: (state: Record<string, unknown>) => ({ ...state, baz: state.baz || '' }),
schema: schema.object({
foo: schema.string(),
bar: schema.string(),
baz: schema.string(),
}),
},
},
createTaskRunner: mockCreateTaskRunner.mockImplementation(() => ({
run: mockTaskTypeRunFn,
})),
};
const { TaskClaiming: TaskClaimingMock } = jest.requireMock('../queries/task_claiming');
jest.mock('../queries/task_claiming', () => {
const actual = jest.requireActual('../queries/task_claiming');
return {
...actual,
TaskClaiming: jest.fn().mockImplementation((opts: TaskClaimingOpts) => {
// We need to register here because once the class is instantiated, adding
// definitions won't get claimed because of "partitionIntoClaimingBatches".
opts.definitions.registerTaskDefinitions({
fooType: mockTaskType,
});
return new actual.TaskClaiming(opts);
}),
};
});
describe('switch task claiming strategies', () => {
beforeEach(() => {
jest.clearAllMocks();
});
it('should switch from default to mget and still claim tasks', async () => {
const setupResultDefault = await setupTestServers();
const esServer = setupResultDefault.esServer;
let kibanaServer = setupResultDefault.kibanaServer;
let taskClaimingOpts: TaskClaimingOpts = TaskClaimingMock.mock.calls[0][0];
expect(taskClaimingOpts.strategy).toBe('default');
mockTaskTypeRunFn.mockImplementation(() => {
return { state: {} };
});
// inject a task to run and ensure it is claimed and run
const id1 = uuidV4();
await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
id: id1,
taskType: 'fooType',
params: { foo: true },
state: { foo: 'test', bar: 'test', baz: 'test' },
stateVersion: 4,
runAt: new Date(),
enabled: true,
scheduledAt: new Date(),
attempts: 0,
status: TaskStatus.Idle,
startedAt: null,
retryAt: null,
ownerId: null,
});
await retry(async () => {
expect(mockTaskTypeRunFn).toHaveBeenCalledTimes(1);
});
if (kibanaServer) {
await kibanaServer.stop();
}
const setupResultMget = await setupKibanaServer({
xpack: {
task_manager: {
claim_strategy: 'unsafe_mget',
},
},
});
kibanaServer = setupResultMget.kibanaServer;
taskClaimingOpts = TaskClaimingMock.mock.calls[1][0];
expect(taskClaimingOpts.strategy).toBe('unsafe_mget');
// inject a task to run and ensure it is claimed and run
const id2 = uuidV4();
await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
id: id2,
taskType: 'fooType',
params: { foo: true },
state: { foo: 'test', bar: 'test', baz: 'test' },
stateVersion: 4,
runAt: new Date(),
enabled: true,
scheduledAt: new Date(),
attempts: 0,
status: TaskStatus.Idle,
startedAt: null,
retryAt: null,
ownerId: null,
});
await retry(async () => {
expect(mockTaskTypeRunFn).toHaveBeenCalledTimes(2);
});
if (kibanaServer) {
await kibanaServer.stop();
}
if (esServer) {
await esServer.stop();
}
});
it('should switch from mget to default and still claim tasks', async () => {
const setupResultMget = await setupTestServers({
xpack: {
task_manager: {
claim_strategy: 'unsafe_mget',
},
},
});
const esServer = setupResultMget.esServer;
let kibanaServer = setupResultMget.kibanaServer;
let taskClaimingOpts: TaskClaimingOpts = TaskClaimingMock.mock.calls[0][0];
expect(taskClaimingOpts.strategy).toBe('unsafe_mget');
mockTaskTypeRunFn.mockImplementation(() => {
return { state: {} };
});
// inject a task to run and ensure it is claimed and run
const id1 = uuidV4();
await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
id: id1,
taskType: 'fooType',
params: { foo: true },
state: { foo: 'test', bar: 'test', baz: 'test' },
stateVersion: 4,
runAt: new Date(),
enabled: true,
scheduledAt: new Date(),
attempts: 0,
status: TaskStatus.Idle,
startedAt: null,
retryAt: null,
ownerId: null,
});
await retry(async () => {
expect(mockTaskTypeRunFn).toHaveBeenCalledTimes(1);
});
if (kibanaServer) {
await kibanaServer.stop();
}
const setupResultDefault = await setupKibanaServer();
kibanaServer = setupResultDefault.kibanaServer;
taskClaimingOpts = TaskClaimingMock.mock.calls[1][0];
expect(taskClaimingOpts.strategy).toBe('default');
// inject a task to run and ensure it is claimed and run
const id2 = uuidV4();
await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
id: id2,
taskType: 'fooType',
params: { foo: true },
state: { foo: 'test', bar: 'test', baz: 'test' },
stateVersion: 4,
runAt: new Date(),
enabled: true,
scheduledAt: new Date(),
attempts: 0,
status: TaskStatus.Idle,
startedAt: null,
retryAt: null,
ownerId: null,
});
await retry(async () => {
expect(mockTaskTypeRunFn).toHaveBeenCalledTimes(2);
});
if (kibanaServer) {
await kibanaServer.stop();
}
if (esServer) {
await esServer.stop();
}
});
it('should switch from default to mget and claim tasks that were running during shutdown', async () => {
const setupResultDefault = await setupTestServers();
const esServer = setupResultDefault.esServer;
let kibanaServer = setupResultDefault.kibanaServer;
let taskClaimingOpts: TaskClaimingOpts = TaskClaimingMock.mock.calls[0][0];
expect(taskClaimingOpts.strategy).toBe('default');
mockTaskTypeRunFn.mockImplementation(async () => {
await new Promise((resolve) => setTimeout(resolve, 2000));
return { state: {} };
});
// inject a task to run and ensure it is claimed and run
const id1 = uuidV4();
await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
id: id1,
taskType: 'fooType',
params: { foo: true },
state: { foo: 'test', bar: 'test', baz: 'test' },
stateVersion: 4,
runAt: new Date(),
enabled: true,
scheduledAt: new Date(),
attempts: 0,
status: TaskStatus.Idle,
startedAt: null,
timeoutOverride: '5s',
retryAt: null,
ownerId: null,
});
await retry(async () => {
expect(mockTaskTypeRunFn).toHaveBeenCalledTimes(1);
});
if (kibanaServer) {
await kibanaServer.stop();
}
const setupResultMget = await setupKibanaServer({
xpack: {
task_manager: {
claim_strategy: 'unsafe_mget',
},
},
});
kibanaServer = setupResultMget.kibanaServer;
taskClaimingOpts = TaskClaimingMock.mock.calls[1][0];
expect(taskClaimingOpts.strategy).toBe('unsafe_mget');
// task doc should still exist and be running
const task = await kibanaServer.coreStart.elasticsearch.client.asInternalUser.get<{
task: SerializedConcreteTaskInstance;
}>({
id: `task:${id1}`,
index: '.kibana_task_manager',
});
expect(task._source?.task?.status).toBe(TaskStatus.Running);
// task manager should pick up and claim the task that was running during shutdown
await retry(
async () => {
expect(mockTaskTypeRunFn).toHaveBeenCalledTimes(2);
},
{ times: 60, intervalMs: 1000 }
);
if (kibanaServer) {
await kibanaServer.stop();
}
if (esServer) {
await esServer.stop();
}
});
it('should switch from mget to default and claim tasks that were running during shutdown', async () => {
const setupResultMget = await setupTestServers({
xpack: {
task_manager: {
claim_strategy: 'unsafe_mget',
},
},
});
const esServer = setupResultMget.esServer;
let kibanaServer = setupResultMget.kibanaServer;
let taskClaimingOpts: TaskClaimingOpts = TaskClaimingMock.mock.calls[0][0];
expect(taskClaimingOpts.strategy).toBe('unsafe_mget');
mockTaskTypeRunFn.mockImplementation(async () => {
await new Promise((resolve) => setTimeout(resolve, 2000));
return { state: {} };
});
// inject a task to run and ensure it is claimed and run
const id1 = uuidV4();
await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
id: id1,
taskType: 'fooType',
params: { foo: true },
state: { foo: 'test', bar: 'test', baz: 'test' },
stateVersion: 4,
runAt: new Date(),
enabled: true,
scheduledAt: new Date(),
attempts: 0,
status: TaskStatus.Idle,
startedAt: null,
timeoutOverride: '5s',
retryAt: null,
ownerId: null,
});
await retry(async () => {
expect(mockTaskTypeRunFn).toHaveBeenCalledTimes(1);
});
if (kibanaServer) {
await kibanaServer.stop();
}
const setupResultDefault = await setupKibanaServer();
kibanaServer = setupResultDefault.kibanaServer;
taskClaimingOpts = TaskClaimingMock.mock.calls[1][0];
expect(taskClaimingOpts.strategy).toBe('default');
// task doc should still exist and be running
const task = await kibanaServer.coreStart.elasticsearch.client.asInternalUser.get<{
task: SerializedConcreteTaskInstance;
}>({
id: `task:${id1}`,
index: '.kibana_task_manager',
});
expect(task._source?.task?.status).toBe(TaskStatus.Running);
await retry(
async () => {
expect(mockTaskTypeRunFn).toHaveBeenCalledTimes(2);
},
{ times: 60, intervalMs: 1000 }
);
if (kibanaServer) {
await kibanaServer.stop();
}
if (esServer) {
await esServer.stop();
}
});
});