Set mget task claim strategy as the default (#197070)

Resolves https://github.com/elastic/kibana/issues/194625

In this PR, I'm setting `mget` as the default task claiming strategy
along the following changes:
- Given we no longer need the 8.16 specific PRs
(https://github.com/elastic/kibana/pull/196317 and
https://github.com/elastic/kibana/pull/196757), I've also reverted them.
- Given we now use `met` as the default, I've renamed
`task_manager_claimer_mget` to `task_manager_claimer_update_by_query`
and made tests in that folder test using the `update_by_query` claim
strategy.
- Stabilize flaky tests caused by mget + polling for tasks more
frequently

Flaky test runners:
-
[[59b71bc](59b71bcdbe)]
https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/7197
-
[[aea910e](aea910e36d)]
https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/7199
-
[[4723ced](4723ced751)]
https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/7206
-
[[d28c8c5](d28c8c56f6)]
https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/7209
-
[[dd7773a](dd7773aeba)]
https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/7224

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Mike Côté 2024-10-25 08:57:46 -04:00 committed by GitHub
parent 4798c59158
commit c31f11e7d8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
44 changed files with 128 additions and 405 deletions

View file

@ -342,7 +342,7 @@ enabled:
- x-pack/test/spaces_api_integration/security_and_spaces/config_trial.ts
- x-pack/test/spaces_api_integration/security_and_spaces/copy_to_space_config_trial.ts
- x-pack/test/spaces_api_integration/spaces_only/config.ts
- x-pack/test/task_manager_claimer_mget/config.ts
- x-pack/test/task_manager_claimer_update_by_query/config.ts
- x-pack/test/ui_capabilities/security_and_spaces/config.ts
- x-pack/test/ui_capabilities/spaces_only/config.ts
- x-pack/test/upgrade_assistant_integration/config.ts

4
.github/CODEOWNERS vendored
View file

@ -745,7 +745,7 @@ x-pack/plugins/runtime_fields @elastic/kibana-management
packages/kbn-safer-lodash-set @elastic/kibana-security
x-pack/test/security_api_integration/plugins/saml_provider @elastic/kibana-security
x-pack/test/plugin_api_integration/plugins/sample_task_plugin @elastic/response-ops
x-pack/test/task_manager_claimer_mget/plugins/sample_task_plugin_mget @elastic/response-ops
x-pack/test/task_manager_claimer_update_by_query/plugins/sample_task_plugin_mget @elastic/response-ops
test/plugin_functional/plugins/saved_object_export_transforms @elastic/kibana-core
test/plugin_functional/plugins/saved_object_import_warnings @elastic/kibana-core
x-pack/test/saved_object_api_integration/common/plugins/saved_object_test_plugin @elastic/kibana-security
@ -1495,7 +1495,7 @@ x-pack/plugins/cloud_integrations/cloud_full_story/server/config.ts @elastic/kib
/x-pack/test/alerting_api_integration/observability @elastic/obs-ux-management-team
/x-pack/test/plugin_api_integration/test_suites/task_manager/ @elastic/response-ops
/x-pack/test/functional_with_es_ssl/apps/triggers_actions_ui/ @elastic/response-ops
/x-pack/test/task_manager_claimer_mget/ @elastic/response-ops
/x-pack/test/task_manager_claimer_update_by_query/ @elastic/response-ops
/docs/user/alerting/ @elastic/response-ops
/docs/management/connectors/ @elastic/response-ops
/x-pack/test/cases_api_integration/ @elastic/response-ops

View file

@ -207,7 +207,6 @@ uiSettings:
labs:dashboard:deferBelowFold: false
# Task Manager
xpack.task_manager.claim_strategy: mget
xpack.task_manager.allow_reading_invalid_state: false
xpack.task_manager.request_timeouts.update_by_query: 60000
xpack.task_manager.metrics_reset_interval: 120000

View file

@ -763,7 +763,7 @@
"@kbn/safer-lodash-set": "link:packages/kbn-safer-lodash-set",
"@kbn/saml-provider-plugin": "link:x-pack/test/security_api_integration/plugins/saml_provider",
"@kbn/sample-task-plugin": "link:x-pack/test/plugin_api_integration/plugins/sample_task_plugin",
"@kbn/sample-task-plugin-mget": "link:x-pack/test/task_manager_claimer_mget/plugins/sample_task_plugin_mget",
"@kbn/sample-task-plugin-update-by-query": "link:x-pack/test/task_manager_claimer_update_by_query/plugins/sample_task_plugin_mget",
"@kbn/saved-object-export-transforms-plugin": "link:test/plugin_functional/plugins/saved_object_export_transforms",
"@kbn/saved-object-import-warnings-plugin": "link:test/plugin_functional/plugins/saved_object_import_warnings",
"@kbn/saved-object-test-plugin": "link:x-pack/test/saved_object_api_integration/common/plugins/saved_object_test_plugin",

View file

@ -1484,8 +1484,8 @@
"@kbn/saml-provider-plugin/*": ["x-pack/test/security_api_integration/plugins/saml_provider/*"],
"@kbn/sample-task-plugin": ["x-pack/test/plugin_api_integration/plugins/sample_task_plugin"],
"@kbn/sample-task-plugin/*": ["x-pack/test/plugin_api_integration/plugins/sample_task_plugin/*"],
"@kbn/sample-task-plugin-mget": ["x-pack/test/task_manager_claimer_mget/plugins/sample_task_plugin_mget"],
"@kbn/sample-task-plugin-mget/*": ["x-pack/test/task_manager_claimer_mget/plugins/sample_task_plugin_mget/*"],
"@kbn/sample-task-plugin-update-by-query": ["x-pack/test/task_manager_claimer_update_by_query/plugins/sample_task_plugin_mget"],
"@kbn/sample-task-plugin-update-by-query/*": ["x-pack/test/task_manager_claimer_update_by_query/plugins/sample_task_plugin_mget/*"],
"@kbn/saved-object-export-transforms-plugin": ["test/plugin_functional/plugins/saved_object_export_transforms"],
"@kbn/saved-object-export-transforms-plugin/*": ["test/plugin_functional/plugins/saved_object_export_transforms/*"],
"@kbn/saved-object-import-warnings-plugin": ["test/plugin_functional/plugins/saved_object_import_warnings"],

View file

@ -14,6 +14,7 @@ describe('config validation', () => {
Object {
"allow_reading_invalid_state": true,
"auto_calculate_default_ech_capacity": false,
"claim_strategy": "mget",
"discovery": Object {
"active_nodes_lookback": "30s",
"interval": 10000,
@ -44,7 +45,7 @@ describe('config validation', () => {
"warn_threshold": 80,
},
},
"poll_interval": 3000,
"poll_interval": 500,
"request_capacity": 1000,
"request_timeouts": Object {
"update_by_query": 30000,
@ -66,7 +67,7 @@ describe('config validation', () => {
expect(() => {
configSchema.validate(config);
}).toThrowErrorMatchingInlineSnapshot(
`"The specified monitored_stats_required_freshness (100) is invalid, as it is below the poll_interval (3000)"`
`"The specified monitored_stats_required_freshness (100) is invalid, as it is below the poll_interval (500)"`
);
});
@ -76,6 +77,7 @@ describe('config validation', () => {
Object {
"allow_reading_invalid_state": true,
"auto_calculate_default_ech_capacity": false,
"claim_strategy": "mget",
"discovery": Object {
"active_nodes_lookback": "30s",
"interval": 10000,
@ -106,7 +108,7 @@ describe('config validation', () => {
"warn_threshold": 80,
},
},
"poll_interval": 3000,
"poll_interval": 500,
"request_capacity": 1000,
"request_timeouts": Object {
"update_by_query": 30000,
@ -136,6 +138,7 @@ describe('config validation', () => {
Object {
"allow_reading_invalid_state": true,
"auto_calculate_default_ech_capacity": false,
"claim_strategy": "mget",
"discovery": Object {
"active_nodes_lookback": "30s",
"interval": 10000,
@ -171,7 +174,7 @@ describe('config validation', () => {
"warn_threshold": 80,
},
},
"poll_interval": 3000,
"poll_interval": 500,
"request_capacity": 1000,
"request_timeouts": Object {
"update_by_query": 30000,

View file

@ -202,7 +202,7 @@ export const configSchema = schema.object(
max: 100,
min: 1,
}),
claim_strategy: schema.maybe(schema.string()),
claim_strategy: schema.string({ defaultValue: CLAIM_STRATEGY_MGET }),
request_timeouts: requestTimeoutsConfig,
auto_calculate_default_ech_capacity: schema.boolean({ defaultValue: false }),
},

View file

@ -52,13 +52,13 @@ describe('switch task claiming strategies', () => {
jest.clearAllMocks();
});
it('should switch from default to mget and still claim tasks', async () => {
it('should switch from default to update_by_query and still claim tasks', async () => {
const setupResultDefault = await setupTestServers();
const esServer = setupResultDefault.esServer;
let kibanaServer = setupResultDefault.kibanaServer;
let taskClaimingOpts: TaskClaimingOpts = TaskClaimingMock.mock.calls[0][0];
expect(taskClaimingOpts.strategy).toBe('update_by_query');
expect(taskClaimingOpts.strategy).toBe('mget');
mockTaskTypeRunFn.mockImplementation(() => {
return { state: {} };
@ -90,17 +90,17 @@ describe('switch task claiming strategies', () => {
await kibanaServer.stop();
}
const setupResultMget = await setupKibanaServer({
const setupResultUbq = await setupKibanaServer({
xpack: {
task_manager: {
claim_strategy: 'mget',
claim_strategy: 'update_by_query',
},
},
});
kibanaServer = setupResultMget.kibanaServer;
kibanaServer = setupResultUbq.kibanaServer;
taskClaimingOpts = TaskClaimingMock.mock.calls[1][0];
expect(taskClaimingOpts.strategy).toBe('mget');
expect(taskClaimingOpts.strategy).toBe('update_by_query');
// inject a task to run and ensure it is claimed and run
const id2 = uuidV4();
@ -132,19 +132,19 @@ describe('switch task claiming strategies', () => {
}
});
it('should switch from mget to default and still claim tasks', async () => {
const setupResultMget = await setupTestServers({
it('should switch from update_by_query to default and still claim tasks', async () => {
const setupResultUbq = await setupTestServers({
xpack: {
task_manager: {
claim_strategy: 'mget',
claim_strategy: 'update_by_query',
},
},
});
const esServer = setupResultMget.esServer;
let kibanaServer = setupResultMget.kibanaServer;
const esServer = setupResultUbq.esServer;
let kibanaServer = setupResultUbq.kibanaServer;
let taskClaimingOpts: TaskClaimingOpts = TaskClaimingMock.mock.calls[0][0];
expect(taskClaimingOpts.strategy).toBe('mget');
expect(taskClaimingOpts.strategy).toBe('update_by_query');
mockTaskTypeRunFn.mockImplementation(() => {
return { state: {} };
@ -180,7 +180,7 @@ describe('switch task claiming strategies', () => {
kibanaServer = setupResultDefault.kibanaServer;
taskClaimingOpts = TaskClaimingMock.mock.calls[1][0];
expect(taskClaimingOpts.strategy).toBe('update_by_query');
expect(taskClaimingOpts.strategy).toBe('mget');
// inject a task to run and ensure it is claimed and run
const id2 = uuidV4();
@ -212,13 +212,13 @@ describe('switch task claiming strategies', () => {
}
});
it('should switch from default to mget and claim tasks that were running during shutdown', async () => {
it('should switch from default to update_by_query and claim tasks that were running during shutdown', async () => {
const setupResultDefault = await setupTestServers();
const esServer = setupResultDefault.esServer;
let kibanaServer = setupResultDefault.kibanaServer;
let taskClaimingOpts: TaskClaimingOpts = TaskClaimingMock.mock.calls[0][0];
expect(taskClaimingOpts.strategy).toBe('update_by_query');
expect(taskClaimingOpts.strategy).toBe('mget');
mockTaskTypeRunFn.mockImplementation(async () => {
await new Promise((resolve) => setTimeout(resolve, 2000));
@ -252,17 +252,17 @@ describe('switch task claiming strategies', () => {
await kibanaServer.stop();
}
const setupResultMget = await setupKibanaServer({
const setupResultUbq = await setupKibanaServer({
xpack: {
task_manager: {
claim_strategy: 'mget',
claim_strategy: 'update_by_query',
},
},
});
kibanaServer = setupResultMget.kibanaServer;
kibanaServer = setupResultUbq.kibanaServer;
taskClaimingOpts = TaskClaimingMock.mock.calls[1][0];
expect(taskClaimingOpts.strategy).toBe('mget');
expect(taskClaimingOpts.strategy).toBe('update_by_query');
// task doc should still exist and be running
const task = await kibanaServer.coreStart.elasticsearch.client.asInternalUser.get<{
@ -290,19 +290,19 @@ describe('switch task claiming strategies', () => {
}
});
it('should switch from mget to default and claim tasks that were running during shutdown', async () => {
const setupResultMget = await setupTestServers({
it('should switch from update_by_query to default and claim tasks that were running during shutdown', async () => {
const setupResultUbq = await setupTestServers({
xpack: {
task_manager: {
claim_strategy: 'mget',
claim_strategy: 'update_by_query',
},
},
});
const esServer = setupResultMget.esServer;
let kibanaServer = setupResultMget.kibanaServer;
const esServer = setupResultUbq.esServer;
let kibanaServer = setupResultUbq.kibanaServer;
let taskClaimingOpts: TaskClaimingOpts = TaskClaimingMock.mock.calls[0][0];
expect(taskClaimingOpts.strategy).toBe('mget');
expect(taskClaimingOpts.strategy).toBe('update_by_query');
mockTaskTypeRunFn.mockImplementation(async () => {
await new Promise((resolve) => setTimeout(resolve, 2000));
@ -340,7 +340,7 @@ describe('switch task claiming strategies', () => {
kibanaServer = setupResultDefault.kibanaServer;
taskClaimingOpts = TaskClaimingMock.mock.calls[1][0];
expect(taskClaimingOpts.strategy).toBe('update_by_query');
expect(taskClaimingOpts.strategy).toBe('mget');
// task doc should still exist and be running
const task = await kibanaServer.coreStart.elasticsearch.client.asInternalUser.get<{

View file

@ -305,7 +305,7 @@ describe('task state validation', () => {
it('should fail the task run when setting allow_reading_invalid_state:false and reading an invalid state', async () => {
const logSpy = jest.spyOn(pollingLifecycleOpts.logger, 'warn');
const updateSpy = jest.spyOn(pollingLifecycleOpts.taskStore, 'bulkUpdate');
const updateSpy = jest.spyOn(pollingLifecycleOpts.taskStore, 'bulkPartialUpdate');
const id = uuidV4();
await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
@ -332,8 +332,7 @@ describe('task state validation', () => {
const found = calls.map((arr) => arr[0]).find((message) => message.match(expected) != null);
expect(found).toMatch(expected);
expect(updateSpy).toHaveBeenCalledWith(
expect.arrayContaining([expect.objectContaining({ id, taskType: 'fooType' })]),
{ validate: false }
expect.arrayContaining([expect.objectContaining({ id })])
);
});
});

View file

@ -1,226 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import {
CLAIM_STRATEGY_MGET,
CLAIM_STRATEGY_UPDATE_BY_QUERY,
DEFAULT_POLL_INTERVAL,
MGET_DEFAULT_POLL_INTERVAL,
} from '../config';
import { mockLogger } from '../test_utils';
import { setClaimStrategy } from './set_claim_strategy';
const getConfigWithoutClaimStrategy = () => ({
discovery: {
active_nodes_lookback: '30s',
interval: 10000,
},
kibanas_per_partition: 2,
capacity: 10,
max_attempts: 9,
allow_reading_invalid_state: false,
version_conflict_threshold: 80,
monitored_aggregated_stats_refresh_rate: 60000,
monitored_stats_health_verbose_log: {
enabled: false,
level: 'debug' as const,
warn_delayed_task_start_in_seconds: 60,
},
monitored_stats_required_freshness: 4000,
monitored_stats_running_average_window: 50,
request_capacity: 1000,
monitored_task_execution_thresholds: {
default: {
error_threshold: 90,
warn_threshold: 80,
},
custom: {},
},
ephemeral_tasks: {
enabled: true,
request_capacity: 10,
},
unsafe: {
exclude_task_types: [],
authenticate_background_task_utilization: true,
},
event_loop_delay: {
monitor: true,
warn_threshold: 5000,
},
worker_utilization_running_average_window: 5,
metrics_reset_interval: 3000,
request_timeouts: {
update_by_query: 1000,
},
poll_interval: DEFAULT_POLL_INTERVAL,
auto_calculate_default_ech_capacity: false,
});
const logger = mockLogger();
const deploymentIdUpdateByQuery = 'd2f0e7c6bc464a9b8b16e5730b9c40f9';
const deploymentIdMget = 'ab4e88d139f93d43024837d96144e7d4';
describe('setClaimStrategy', () => {
beforeEach(() => {
jest.clearAllMocks();
});
for (const isServerless of [true, false]) {
for (const isCloud of [true, false]) {
for (const isElasticStaffOwned of [true, false]) {
for (const deploymentId of [undefined, deploymentIdMget, deploymentIdUpdateByQuery]) {
for (const configuredStrategy of [CLAIM_STRATEGY_MGET, CLAIM_STRATEGY_UPDATE_BY_QUERY]) {
test(`should return config as is when claim strategy is already defined: isServerless=${isServerless}, isCloud=${isCloud}, isElasticStaffOwned=${isElasticStaffOwned}, deploymentId=${deploymentId}`, () => {
const config = {
...getConfigWithoutClaimStrategy(),
claim_strategy: configuredStrategy,
};
const returnedConfig = setClaimStrategy({
config,
logger,
isCloud,
isServerless,
isElasticStaffOwned,
deploymentId,
});
expect(returnedConfig).toStrictEqual(config);
if (deploymentId) {
expect(logger.info).toHaveBeenCalledWith(
`Using claim strategy ${configuredStrategy} as configured for deployment ${deploymentId}`
);
} else {
expect(logger.info).toHaveBeenCalledWith(
`Using claim strategy ${configuredStrategy} as configured`
);
}
});
}
}
}
}
}
for (const isCloud of [true, false]) {
for (const isElasticStaffOwned of [true, false]) {
for (const deploymentId of [undefined, deploymentIdMget, deploymentIdUpdateByQuery]) {
test(`should set claim strategy to mget if in serverless: isCloud=${isCloud}, isElasticStaffOwned=${isElasticStaffOwned}, deploymentId=${deploymentId}`, () => {
const config = getConfigWithoutClaimStrategy();
const returnedConfig = setClaimStrategy({
config,
logger,
isCloud,
isServerless: true,
isElasticStaffOwned,
deploymentId,
});
expect(returnedConfig.claim_strategy).toBe(CLAIM_STRATEGY_MGET);
expect(returnedConfig.poll_interval).toBe(MGET_DEFAULT_POLL_INTERVAL);
if (deploymentId) {
expect(logger.info).toHaveBeenCalledWith(
`Setting claim strategy to mget for serverless deployment ${deploymentId}`
);
} else {
expect(logger.info).toHaveBeenCalledWith(`Setting claim strategy to mget`);
}
});
}
}
}
test(`should set claim strategy to update_by_query if not cloud and not serverless`, () => {
const config = getConfigWithoutClaimStrategy();
const returnedConfig = setClaimStrategy({
config,
logger,
isCloud: false,
isElasticStaffOwned: false,
isServerless: false,
});
expect(returnedConfig.claim_strategy).toBe(CLAIM_STRATEGY_UPDATE_BY_QUERY);
expect(returnedConfig.poll_interval).toBe(DEFAULT_POLL_INTERVAL);
expect(logger.info).not.toHaveBeenCalled();
});
test(`should set claim strategy to update_by_query if cloud and not serverless with undefined deploymentId`, () => {
const config = getConfigWithoutClaimStrategy();
const returnedConfig = setClaimStrategy({
config,
logger,
isCloud: true,
isElasticStaffOwned: false,
isServerless: false,
});
expect(returnedConfig.claim_strategy).toBe(CLAIM_STRATEGY_UPDATE_BY_QUERY);
expect(returnedConfig.poll_interval).toBe(DEFAULT_POLL_INTERVAL);
expect(logger.info).not.toHaveBeenCalled();
});
test(`should set claim strategy to update_by_query if cloud and not serverless and deploymentId does not start with a or b`, () => {
const config = getConfigWithoutClaimStrategy();
const returnedConfig = setClaimStrategy({
config,
logger,
isCloud: true,
isElasticStaffOwned: false,
isServerless: false,
deploymentId: deploymentIdUpdateByQuery,
});
expect(returnedConfig.claim_strategy).toBe(CLAIM_STRATEGY_UPDATE_BY_QUERY);
expect(returnedConfig.poll_interval).toBe(DEFAULT_POLL_INTERVAL);
expect(logger.info).toHaveBeenCalledWith(
`Setting claim strategy to update_by_query for deployment ${deploymentIdUpdateByQuery}`
);
});
test(`should set claim strategy to mget if cloud, deploymentId does not start with a or b, not serverless and isElasticStaffOwned is true`, () => {
const config = getConfigWithoutClaimStrategy();
const returnedConfig = setClaimStrategy({
config,
logger,
isCloud: true,
isElasticStaffOwned: true,
isServerless: false,
deploymentId: deploymentIdUpdateByQuery,
});
expect(returnedConfig.claim_strategy).toBe(CLAIM_STRATEGY_MGET);
expect(returnedConfig.poll_interval).toBe(MGET_DEFAULT_POLL_INTERVAL);
expect(logger.info).toHaveBeenCalledWith(
`Setting claim strategy to mget for deployment ${deploymentIdUpdateByQuery}`
);
});
test(`should set claim strategy to mget if cloud and not serverless and deploymentId starts with a or b`, () => {
const config = getConfigWithoutClaimStrategy();
const returnedConfig = setClaimStrategy({
config,
logger,
isCloud: true,
isElasticStaffOwned: false,
isServerless: false,
deploymentId: deploymentIdMget,
});
expect(returnedConfig.claim_strategy).toBe(CLAIM_STRATEGY_MGET);
expect(returnedConfig.poll_interval).toBe(MGET_DEFAULT_POLL_INTERVAL);
expect(logger.info).toHaveBeenCalledWith(
`Setting claim strategy to mget for deployment ${deploymentIdMget}`
);
});
});

View file

@ -1,80 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Logger } from '@kbn/core/server';
import {
CLAIM_STRATEGY_MGET,
CLAIM_STRATEGY_UPDATE_BY_QUERY,
DEFAULT_POLL_INTERVAL,
MGET_DEFAULT_POLL_INTERVAL,
TaskManagerConfig,
} from '../config';
interface SetClaimStrategyOpts {
config: TaskManagerConfig;
deploymentId?: string;
isServerless: boolean;
isCloud: boolean;
isElasticStaffOwned: boolean;
logger: Logger;
}
export function setClaimStrategy(opts: SetClaimStrategyOpts): TaskManagerConfig {
// if the claim strategy is already defined, return immediately
if (opts.config.claim_strategy) {
opts.logger.info(
`Using claim strategy ${opts.config.claim_strategy} as configured${
opts.deploymentId ? ` for deployment ${opts.deploymentId}` : ''
}`
);
return opts.config;
}
if (opts.isServerless) {
// use mget for serverless
opts.logger.info(
`Setting claim strategy to mget${
opts.deploymentId ? ` for serverless deployment ${opts.deploymentId}` : ''
}`
);
return {
...opts.config,
claim_strategy: CLAIM_STRATEGY_MGET,
poll_interval: MGET_DEFAULT_POLL_INTERVAL,
};
}
let defaultToMget = false;
if (opts.isCloud && !opts.isServerless && opts.deploymentId) {
defaultToMget =
opts.deploymentId.startsWith('a') ||
opts.deploymentId.startsWith('b') ||
opts.isElasticStaffOwned;
if (defaultToMget) {
opts.logger.info(`Setting claim strategy to mget for deployment ${opts.deploymentId}`);
} else {
opts.logger.info(
`Setting claim strategy to update_by_query for deployment ${opts.deploymentId}`
);
}
}
if (defaultToMget) {
return {
...opts.config,
claim_strategy: CLAIM_STRATEGY_MGET,
poll_interval: MGET_DEFAULT_POLL_INTERVAL,
};
}
return {
...opts.config,
claim_strategy: CLAIM_STRATEGY_UPDATE_BY_QUERY,
poll_interval: DEFAULT_POLL_INTERVAL,
};
}

View file

@ -18,7 +18,7 @@ import {
ServiceStatusLevels,
CoreStatus,
} from '@kbn/core/server';
import type { CloudSetup, CloudStart } from '@kbn/cloud-plugin/server';
import type { CloudStart } from '@kbn/cloud-plugin/server';
import {
registerDeleteInactiveNodesTaskDefinition,
scheduleDeleteInactiveNodesTaskDefinition,
@ -45,7 +45,6 @@ import { metricsStream, Metrics } from './metrics';
import { TaskManagerMetricsCollector } from './metrics/task_metrics_collector';
import { TaskPartitioner } from './lib/task_partitioner';
import { getDefaultCapacity } from './lib/get_default_capacity';
import { setClaimStrategy } from './lib/set_claim_strategy';
export interface TaskManagerSetupContract {
/**
@ -127,19 +126,10 @@ export class TaskManagerPlugin
public setup(
core: CoreSetup<TaskManagerStartContract, unknown>,
plugins: { cloud?: CloudSetup; usageCollection?: UsageCollectionSetup }
plugins: { usageCollection?: UsageCollectionSetup }
): TaskManagerSetupContract {
this.elasticsearchAndSOAvailability$ = getElasticsearchAndSOAvailability(core.status.core$);
this.config = setClaimStrategy({
config: this.config,
deploymentId: plugins.cloud?.deploymentId,
isServerless: this.initContext.env.packageInfo.buildFlavor === 'serverless',
isCloud: plugins.cloud?.isCloudEnabled ?? false,
isElasticStaffOwned: plugins.cloud?.isElasticStaffOwned ?? false,
logger: this.logger,
});
core.metrics
.getOpsMetrics$()
.pipe(distinctUntilChanged())
@ -147,7 +137,7 @@ export class TaskManagerPlugin
this.heapSizeLimit = metrics.process.memory.heap.size_limit;
});
setupSavedObjects(core.savedObjects);
setupSavedObjects(core.savedObjects, this.config);
this.taskManagerId = this.initContext.env.instanceUuid;
if (!this.taskManagerId) {
@ -311,9 +301,9 @@ export class TaskManagerPlugin
this.config!.claim_strategy
} isBackgroundTaskNodeOnly=${this.isNodeBackgroundTasksOnly()} heapSizeLimit=${
this.heapSizeLimit
} defaultCapacity=${defaultCapacity} pollingInterval=${
this.config!.poll_interval
} autoCalculateDefaultEchCapacity=${this.config.auto_calculate_default_ech_capacity}`
} defaultCapacity=${defaultCapacity} autoCalculateDefaultEchCapacity=${
this.config.auto_calculate_default_ech_capacity
}`
);
const managedConfiguration = createManagedConfiguration({

View file

@ -14,7 +14,7 @@ import type { Logger, ExecutionContextStart } from '@kbn/core/server';
import { Result, asErr, mapErr, asOk, map, mapOk } from './lib/result_type';
import { ManagedConfiguration } from './lib/create_managed_configuration';
import { CLAIM_STRATEGY_UPDATE_BY_QUERY, TaskManagerConfig } from './config';
import { TaskManagerConfig, CLAIM_STRATEGY_UPDATE_BY_QUERY } from './config';
import {
TaskMarkRunning,
@ -141,7 +141,7 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
this.pool = new TaskPool({
logger,
strategy: config.claim_strategy!,
strategy: config.claim_strategy,
capacity$: capacityConfiguration$,
definitions: this.definitions,
});
@ -149,7 +149,7 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
this.taskClaiming = new TaskClaiming({
taskStore,
strategy: config.claim_strategy!,
strategy: config.claim_strategy,
maxAttempts: config.max_attempts,
excludedTaskTypes: config.unsafe.exclude_task_types,
definitions,
@ -238,7 +238,7 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
usageCounter: this.usageCounter,
config: this.config,
allowReadingInvalidState: this.config.allow_reading_invalid_state,
strategy: this.config.claim_strategy!,
strategy: this.config.claim_strategy,
getPollInterval: () => this.currentPollInterval,
});
};

View file

@ -9,6 +9,7 @@ import type { SavedObjectsServiceSetup } from '@kbn/core/server';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { backgroundTaskNodeMapping, taskMappings } from './mappings';
import { getMigrations } from './migrations';
import { TaskManagerConfig } from '../config';
import { getOldestIdleActionTask } from '../queries/oldest_idle_action_task';
import { TASK_MANAGER_INDEX } from '../constants';
import { backgroundTaskNodeModelVersions, taskModelVersions } from './model_versions';
@ -16,7 +17,10 @@ import { backgroundTaskNodeModelVersions, taskModelVersions } from './model_vers
export const TASK_SO_NAME = 'task';
export const BACKGROUND_TASK_NODE_SO_NAME = 'background-task-node';
export function setupSavedObjects(savedObjects: SavedObjectsServiceSetup) {
export function setupSavedObjects(
savedObjects: SavedObjectsServiceSetup,
config: TaskManagerConfig
) {
savedObjects.registerType({
name: TASK_SO_NAME,
namespaceType: 'agnostic',

View file

@ -1711,7 +1711,7 @@ instanceStateValue: true
reference
);
// @ts-expect-error doesnt handle total: number
expect(searchResult.body.hits.total.value).to.eql(1);
expect(searchResult.body.hits.total.value).to.be.greaterThan(0);
expectExpect(
// @ts-expect-error _source: unknown
JSON.parse(searchResult.body.hits.hits[0]._source.params.message)

View file

@ -15,7 +15,8 @@ export const END_DATE = '2020-01-01T00:00:00Z';
export const DOCUMENT_SOURCE = 'queryDataEndpointTests';
export const DOCUMENT_REFERENCE = '-na-';
export const TEST_CACHE_EXPIRATION_TIME = 10000;
// Higher than the configured yml setting to avoid race conditions
export const TEST_CACHE_EXPIRATION_TIME = 12000;
export async function createEsDocuments(
es: Client,

View file

@ -552,6 +552,8 @@ export default function createAlertsAsDataFlappingTest({ getService }: FtrProvid
status_change_threshold: 2,
})
.expect(200);
// wait so cache expires
await setTimeoutAsync(TEST_CACHE_EXPIRATION_TIME);
const pattern = {
alertA: [true, false, true, false, true, false, true, false],
@ -667,6 +669,8 @@ export default function createAlertsAsDataFlappingTest({ getService }: FtrProvid
status_change_threshold: 2,
})
.expect(200);
// wait so cache expires
await setTimeoutAsync(TEST_CACHE_EXPIRATION_TIME);
const pattern = {
alertA: [true, false, true, false, true, false, true, false],

View file

@ -108,7 +108,7 @@ export default ({ getPageObjects, getService }: FtrProviderContext) => {
return await createAlwaysFiringRule({
name: `test-rule-${testRunUuid}`,
schedule: {
interval: '1s',
interval: '3s',
},
actions: connectors.map((connector) => ({
id: connector.id,
@ -615,7 +615,7 @@ export default ({ getPageObjects, getService }: FtrProviderContext) => {
const rule = await createAlwaysFiringRule({
name: `test-rule-${testRunUuid}`,
schedule: {
interval: '1s',
interval: '3s',
},
notify_when: RuleNotifyWhen.THROTTLE,
throttle: '2d',

View file

@ -87,7 +87,7 @@ export default async function ({ readConfigFile }: FtrConfigProviderContext) {
'ruleStatusFilter',
'isUsingRuleCreateFlyout',
])}`,
`--xpack.alerting.rules.minimumScheduleInterval.value="2s"`,
`--xpack.alerting.rules.minimumScheduleInterval.value="5s"`,
`--xpack.actions.enabledActionTypes=${JSON.stringify(enabledActionTypes)}`,
`--xpack.actions.preconfiguredAlertHistoryEsIndex=false`,
`--xpack.actions.preconfigured=${JSON.stringify({

View file

@ -16,6 +16,7 @@ import {
} from '@kbn/core/server';
import { EventEmitter } from 'events';
import { TaskManagerStartContract } from '@kbn/task-manager-plugin/server';
import { BACKGROUND_TASK_NODE_SO_NAME } from '@kbn/task-manager-plugin/server/saved_objects';
const scope = 'testing';
const taskManagerQuery = {
@ -398,4 +399,40 @@ export function initRoutes(
}
}
);
router.post(
{
path: `/api/update_kibana_node`,
validate: {
body: schema.object({
id: schema.string(),
lastSeen: schema.string(),
}),
},
},
async function (
context: RequestHandlerContext,
req: KibanaRequest<any, any, any, any>,
res: KibanaResponseFactory
): Promise<IKibanaResponse<any>> {
const { id, lastSeen } = req.body;
const client = (await context.core).savedObjects.getClient({
includedHiddenTypes: [BACKGROUND_TASK_NODE_SO_NAME],
});
const node = await client.update(
BACKGROUND_TASK_NODE_SO_NAME,
id,
{
id,
last_seen: lastSeen,
},
{ upsert: { id, last_seen: lastSeen }, refresh: false, retryOnConflict: 3 }
);
return res.ok({
body: node,
});
}
);
}

View file

@ -125,11 +125,16 @@ export default function ({ getService }: FtrProviderContext) {
const monitoredAggregatedStatsRefreshRate = 5000;
describe('health', () => {
after(async () => {
// clean up after each test
return await request.delete('/api/sample_tasks').set('kbn-xsrf', 'xxx').expect(200);
});
it('should return basic configuration of task manager', async () => {
const health = await getHealth();
expect(health.status).to.eql('OK');
expect(health.stats.configuration.value).to.eql({
poll_interval: 3000,
poll_interval: 500,
monitored_aggregated_stats_refresh_rate: monitoredAggregatedStatsRefreshRate,
monitored_stats_running_average_window: 50,
monitored_task_execution_thresholds: {
@ -145,7 +150,7 @@ export default function ({ getService }: FtrProviderContext) {
as_workers: 10,
as_cost: 20,
},
claim_strategy: 'update_by_query',
claim_strategy: 'mget',
});
});

View file

@ -18,6 +18,7 @@ export default function ({ loadTestFile }: FtrProviderContext) {
loadTestFile(require.resolve('./task_management_removed_types'));
loadTestFile(require.resolve('./check_registered_task_types'));
loadTestFile(require.resolve('./kibana_discovery_service'));
loadTestFile(require.resolve('./task_partitions'));
loadTestFile(require.resolve('./migrations'));
});

View file

@ -563,21 +563,6 @@ export default function ({ getService }: FtrProviderContext) {
await releaseTasksWaitingForEventToComplete('releaseSecondWaveOfTasks');
});
it('should increment attempts when task fails on markAsRunning', async () => {
const originalTask = await scheduleTask({
taskType: 'sampleTask',
params: { throwOnMarkAsRunning: true },
});
expect(originalTask.attempts).to.eql(0);
// Wait for task manager to attempt running the task a second time
await retry.try(async () => {
const task = await currentTask(originalTask.id);
expect(task.attempts).to.eql(2);
});
});
it('should return a task run error result when trying to run a non-existent task', async () => {
// runSoon should fail
const failedRunSoonResult = await runTaskSoon({

View file

@ -56,6 +56,11 @@ export default function ({ getService }: FtrProviderContext) {
await esArchiver.unload('x-pack/test/functional/es_archives/task_manager_removed_types');
});
afterEach(async () => {
// clean up after each test
return await request.delete('/api/sample_tasks').set('kbn-xsrf', 'xxx').expect(200);
});
function scheduleTask(
task: Partial<ConcreteTaskInstance | DeprecatedConcreteTaskInstance>
): Promise<SerializedConcreteTaskInstance> {

View file

@ -166,7 +166,7 @@ export default function ({ getService }: FtrProviderContext) {
await updateKibanaNodes();
await setTimeoutAsync(10000);
const tasksToSchedule = [];
const tasksToSchedule: Array<Promise<SerializedConcreteTaskInstance>> = [];
for (let i = 0; i < 3; i++) {
tasksToSchedule.push(
scheduleTask({

View file

@ -20,7 +20,7 @@ export default async function ({ readConfigFile }: FtrConfigProviderContext) {
apps: integrationConfig.get('apps'),
screenshots: integrationConfig.get('screenshots'),
junit: {
reportName: 'Task Manager MGet Claimer Functional Tests',
reportName: 'Task Manager Update By Query Claimer Functional Tests',
},
kbnTestServer: {
...integrationConfig.get('kbnTestServer'),
@ -28,7 +28,7 @@ export default async function ({ readConfigFile }: FtrConfigProviderContext) {
...integrationConfig.get('kbnTestServer.serverArgs'),
'--xpack.eventLog.logEntries=true',
'--xpack.eventLog.indexEntries=true',
'--xpack.task_manager.claim_strategy="mget"',
'--xpack.task_manager.claim_strategy="update_by_query"',
'--xpack.task_manager.monitored_aggregated_stats_refresh_rate=5000',
'--xpack.task_manager.ephemeral_tasks.enabled=false',
'--xpack.task_manager.ephemeral_tasks.request_capacity=100',

View file

@ -1,9 +1,9 @@
{
"type": "plugin",
"id": "@kbn/sample-task-plugin-mget",
"id": "@kbn/sample-task-plugin-update-by-query",
"owner": "@elastic/response-ops",
"plugin": {
"id": "sampleTaskPluginMget",
"id": "sampleTaskPluginUpdateByQuery",
"server": true,
"browser": false,
"requiredPlugins": [

View file

@ -1,11 +1,11 @@
{
"name": "@kbn/sample-task-plugin-mget",
"name": "@kbn/sample-task-plugin-update-by-query",
"version": "1.0.0",
"kibana": {
"version": "kibana",
"templateVersion": "1.0.0"
},
"main": "target/test/task_manager_claimer_mget/plugins/sample_task_plugin_mget",
"main": "target/test/task_manager_claimer_update_by_query/plugins/sample_task_plugin_update_by_query",
"scripts": {
"kbn": "node ../../../../../scripts/kbn.js",
"build": "rm -rf './target' && ../../../../../node_modules/.bin/tsc"

View file

@ -371,9 +371,6 @@ export class SampleTaskManagerFixturePlugin
},
async beforeMarkRunning(context) {
if (context.taskInstance?.params?.originalParams?.throwOnMarkAsRunning) {
throw new Error(`Sample task ${context.taskInstance.id} threw on MarkAsRunning`);
}
return context;
},
});

View file

@ -131,7 +131,7 @@ export default function ({ getService }: FtrProviderContext) {
const health = await getHealth();
expect(health.status).to.eql('OK');
expect(health.stats.configuration.value).to.eql({
poll_interval: 500,
poll_interval: 3000,
monitored_aggregated_stats_refresh_rate: monitoredAggregatedStatsRefreshRate,
monitored_stats_running_average_window: 50,
monitored_task_execution_thresholds: {
@ -147,7 +147,7 @@ export default function ({ getService }: FtrProviderContext) {
as_workers: 10,
as_cost: 20,
},
claim_strategy: 'mget',
claim_strategy: 'update_by_query',
});
});

View file

@ -8,7 +8,7 @@
import { FtrProviderContext } from '../../ftr_provider_context';
export default function ({ loadTestFile }: FtrProviderContext) {
describe('task_manager with mget task claimer', function taskManagerSuite() {
describe('task_manager with update by query task claimer', function taskManagerSuite() {
loadTestFile(require.resolve('./task_priority'));
loadTestFile(require.resolve('./background_task_utilization_route'));
loadTestFile(require.resolve('./metrics_route'));
@ -16,7 +16,6 @@ export default function ({ loadTestFile }: FtrProviderContext) {
loadTestFile(require.resolve('./task_management'));
loadTestFile(require.resolve('./task_management_scheduled_at'));
loadTestFile(require.resolve('./task_management_removed_types'));
loadTestFile(require.resolve('./task_partitions'));
loadTestFile(require.resolve('./migrations'));
});

View file

@ -6224,7 +6224,7 @@
version "0.0.0"
uid ""
"@kbn/sample-task-plugin-mget@link:x-pack/test/task_manager_claimer_mget/plugins/sample_task_plugin_mget":
"@kbn/sample-task-plugin-update-by-query@link:x-pack/test/task_manager_claimer_update_by_query/plugins/sample_task_plugin_mget":
version "0.0.0"
uid ""