mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 01:38:56 -04:00
Make request timeout of updateByQuery in TM configurable (#176461)
Resolves: https://github.com/elastic/kibana-team/issues/721 As we discovered through an issue, it is an anti-pattern to time out and retry requests that are part of the polling cycle, because the retry puts the request back into the same queue it was previously ahead in. Therefore, we would like to make Task Manager's update-by-query request timeout configurable.
This commit is contained in:
parent
7bbea56c16
commit
a500ad64e7
15 changed files with 145 additions and 16 deletions
|
@ -136,6 +136,7 @@ uiSettings:
|
|||
|
||||
# Task Manager
|
||||
xpack.task_manager.allow_reading_invalid_state: false
|
||||
xpack.task_manager.request_timeouts.update_by_query: 60000
|
||||
|
||||
## TaskManager requeue invalid tasks, supports ZDT
|
||||
xpack.task_manager.requeue_invalid_tasks.enabled: true
|
||||
|
|
|
@ -42,6 +42,9 @@ describe('config validation', () => {
|
|||
},
|
||||
"poll_interval": 3000,
|
||||
"request_capacity": 1000,
|
||||
"request_timeouts": Object {
|
||||
"update_by_query": 30000,
|
||||
},
|
||||
"requeue_invalid_tasks": Object {
|
||||
"delay": 3000,
|
||||
"enabled": false,
|
||||
|
@ -102,6 +105,9 @@ describe('config validation', () => {
|
|||
},
|
||||
"poll_interval": 3000,
|
||||
"request_capacity": 1000,
|
||||
"request_timeouts": Object {
|
||||
"update_by_query": 30000,
|
||||
},
|
||||
"requeue_invalid_tasks": Object {
|
||||
"delay": 3000,
|
||||
"enabled": false,
|
||||
|
@ -165,6 +171,9 @@ describe('config validation', () => {
|
|||
},
|
||||
"poll_interval": 3000,
|
||||
"request_capacity": 1000,
|
||||
"request_timeouts": Object {
|
||||
"update_by_query": 30000,
|
||||
},
|
||||
"requeue_invalid_tasks": Object {
|
||||
"delay": 3000,
|
||||
"enabled": false,
|
||||
|
|
|
@ -62,6 +62,11 @@ const requeueInvalidTasksConfig = schema.object({
|
|||
max_attempts: schema.number({ defaultValue: 100, min: 1, max: 500 }),
|
||||
});
|
||||
|
||||
const requestTimeoutsConfig = schema.object({
|
||||
/* Request timeout, in milliseconds, for Task Manager's updateByQuery calls. Default: 30s, min: 10s, max: 10m. */
|
||||
update_by_query: schema.number({ defaultValue: 1000 * 30, min: 1000 * 10, max: 1000 * 60 * 10 }),
|
||||
});
|
||||
|
||||
export const configSchema = schema.object(
|
||||
{
|
||||
allow_reading_invalid_state: schema.boolean({ defaultValue: true }),
|
||||
|
@ -156,6 +161,7 @@ export const configSchema = schema.object(
|
|||
min: 1,
|
||||
}),
|
||||
claim_strategy: schema.string({ defaultValue: CLAIM_STRATEGY_DEFAULT }),
|
||||
request_timeouts: requestTimeoutsConfig,
|
||||
},
|
||||
{
|
||||
validate: (config) => {
|
||||
|
@ -179,3 +185,4 @@ export type RequeueInvalidTasksConfig = TypeOf<typeof requeueInvalidTasksConfig>
|
|||
export type TaskManagerConfig = TypeOf<typeof configSchema>;
|
||||
export type TaskExecutionFailureThreshold = TypeOf<typeof taskExecutionFailureThresholdSchema>;
|
||||
export type EventLoopDelayConfig = TypeOf<typeof eventLoopDelaySchema>;
|
||||
export type RequestTimeoutsConfig = TypeOf<typeof requestTimeoutsConfig>;
|
||||
|
|
|
@ -86,6 +86,9 @@ describe('EphemeralTaskLifecycle', () => {
|
|||
},
|
||||
metrics_reset_interval: 3000,
|
||||
claim_strategy: 'default',
|
||||
request_timeouts: {
|
||||
update_by_query: 1000,
|
||||
},
|
||||
...config,
|
||||
},
|
||||
elasticsearchAndSOAvailability$,
|
||||
|
|
|
@ -81,6 +81,9 @@ describe('managed configuration', () => {
|
|||
},
|
||||
metrics_reset_interval: 3000,
|
||||
claim_strategy: 'default',
|
||||
request_timeouts: {
|
||||
update_by_query: 1000,
|
||||
},
|
||||
});
|
||||
logger = context.logger.get('taskManager');
|
||||
|
||||
|
|
|
@ -6,14 +6,11 @@
|
|||
*/
|
||||
|
||||
import { v4 as uuidV4 } from 'uuid';
|
||||
import {
|
||||
type TestElasticsearchUtils,
|
||||
type TestKibanaUtils,
|
||||
} from '@kbn/core-test-helpers-kbn-server';
|
||||
import type { TestElasticsearchUtils, TestKibanaUtils } from '@kbn/core-test-helpers-kbn-server';
|
||||
import { schema } from '@kbn/config-schema';
|
||||
import { TaskStatus } from '../task';
|
||||
import { type TaskPollingLifecycleOpts } from '../polling_lifecycle';
|
||||
import { type TaskClaimingOpts } from '../queries/task_claiming';
|
||||
import type { TaskPollingLifecycleOpts } from '../polling_lifecycle';
|
||||
import type { TaskClaimingOpts } from '../queries/task_claiming';
|
||||
import { TaskManagerPlugin, type TaskManagerStartContract } from '../plugin';
|
||||
import { injectTask, setupTestServers, retry } from './lib';
|
||||
|
||||
|
|
|
@ -58,6 +58,9 @@ const config = {
|
|||
},
|
||||
metrics_reset_interval: 3000,
|
||||
claim_strategy: 'default',
|
||||
request_timeouts: {
|
||||
update_by_query: 1000,
|
||||
},
|
||||
};
|
||||
|
||||
const getStatsWithTimestamp = ({
|
||||
|
|
|
@ -74,6 +74,9 @@ const config: TaskManagerConfig = {
|
|||
version_conflict_threshold: 80,
|
||||
worker_utilization_running_average_window: 5,
|
||||
claim_strategy: 'default',
|
||||
request_timeouts: {
|
||||
update_by_query: 1000,
|
||||
},
|
||||
};
|
||||
|
||||
describe('createAggregator', () => {
|
||||
|
|
|
@ -54,6 +54,9 @@ describe('Configuration Statistics Aggregator', () => {
|
|||
},
|
||||
metrics_reset_interval: 3000,
|
||||
claim_strategy: 'default',
|
||||
request_timeouts: {
|
||||
update_by_query: 1000,
|
||||
},
|
||||
};
|
||||
|
||||
const managedConfig = {
|
||||
|
|
|
@ -59,6 +59,9 @@ describe('createMonitoringStatsStream', () => {
|
|||
},
|
||||
metrics_reset_interval: 3000,
|
||||
claim_strategy: 'default',
|
||||
request_timeouts: {
|
||||
update_by_query: 1000,
|
||||
},
|
||||
};
|
||||
|
||||
it('returns the initial config used to configure Task Manager', async () => {
|
||||
|
|
|
@ -79,6 +79,9 @@ const pluginInitializerContextParams = {
|
|||
},
|
||||
metrics_reset_interval: 3000,
|
||||
claim_strategy: 'default',
|
||||
request_timeouts: {
|
||||
update_by_query: 1000,
|
||||
},
|
||||
};
|
||||
|
||||
describe('TaskManagerPlugin', () => {
|
||||
|
|
|
@ -241,6 +241,7 @@ export class TaskManagerPlugin
|
|||
adHocTaskCounter: this.adHocTaskCounter,
|
||||
allowReadingInvalidState: this.config.allow_reading_invalid_state,
|
||||
logger: this.logger,
|
||||
requestTimeouts: this.config.request_timeouts,
|
||||
});
|
||||
|
||||
const managedConfiguration = createManagedConfiguration({
|
||||
|
|
|
@ -84,6 +84,9 @@ describe('TaskPollingLifecycle', () => {
|
|||
},
|
||||
metrics_reset_interval: 3000,
|
||||
claim_strategy: 'default',
|
||||
request_timeouts: {
|
||||
update_by_query: 1000,
|
||||
},
|
||||
},
|
||||
taskStore: mockTaskStore,
|
||||
logger: taskManagerLogger,
|
||||
|
|
|
@ -25,6 +25,7 @@ import { TaskTypeDictionary } from './task_type_dictionary';
|
|||
import { mockLogger } from './test_utils';
|
||||
import { AdHocTaskCounter } from './lib/adhoc_task_counter';
|
||||
import { asErr } from './lib/result_type';
|
||||
import { UpdateByQueryResponse } from '@elastic/elasticsearch/lib/api/types';
|
||||
|
||||
const mockGetValidatedTaskInstanceFromReading = jest.fn();
|
||||
const mockGetValidatedTaskInstanceForUpdating = jest.fn();
|
||||
|
@ -108,6 +109,9 @@ describe('TaskStore', () => {
|
|||
savedObjectsRepository: savedObjectsClient,
|
||||
adHocTaskCounter,
|
||||
allowReadingInvalidState: false,
|
||||
requestTimeouts: {
|
||||
update_by_query: 1000,
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
|
@ -280,6 +284,9 @@ describe('TaskStore', () => {
|
|||
savedObjectsRepository: savedObjectsClient,
|
||||
adHocTaskCounter,
|
||||
allowReadingInvalidState: false,
|
||||
requestTimeouts: {
|
||||
update_by_query: 1000,
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
|
@ -351,6 +358,9 @@ describe('TaskStore', () => {
|
|||
savedObjectsRepository: savedObjectsClient,
|
||||
adHocTaskCounter,
|
||||
allowReadingInvalidState: false,
|
||||
requestTimeouts: {
|
||||
update_by_query: 1000,
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
|
@ -459,6 +469,9 @@ describe('TaskStore', () => {
|
|||
savedObjectsRepository: savedObjectsClient,
|
||||
adHocTaskCounter,
|
||||
allowReadingInvalidState: false,
|
||||
requestTimeouts: {
|
||||
update_by_query: 1000,
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
|
@ -610,6 +623,9 @@ describe('TaskStore', () => {
|
|||
savedObjectsRepository: savedObjectsClient,
|
||||
adHocTaskCounter,
|
||||
allowReadingInvalidState: false,
|
||||
requestTimeouts: {
|
||||
update_by_query: 1000,
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
|
@ -693,6 +709,9 @@ describe('TaskStore', () => {
|
|||
savedObjectsRepository: savedObjectsClient,
|
||||
adHocTaskCounter,
|
||||
allowReadingInvalidState: false,
|
||||
requestTimeouts: {
|
||||
update_by_query: 1000,
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
|
@ -729,6 +748,9 @@ describe('TaskStore', () => {
|
|||
savedObjectsRepository: savedObjectsClient,
|
||||
adHocTaskCounter,
|
||||
allowReadingInvalidState: false,
|
||||
requestTimeouts: {
|
||||
update_by_query: 1000,
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
|
@ -768,6 +790,9 @@ describe('TaskStore', () => {
|
|||
savedObjectsRepository: savedObjectsClient,
|
||||
adHocTaskCounter,
|
||||
allowReadingInvalidState: false,
|
||||
requestTimeouts: {
|
||||
update_by_query: 1000,
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
|
@ -828,6 +853,9 @@ describe('TaskStore', () => {
|
|||
savedObjectsRepository: savedObjectsClient,
|
||||
adHocTaskCounter,
|
||||
allowReadingInvalidState: false,
|
||||
requestTimeouts: {
|
||||
update_by_query: 1000,
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
|
@ -923,6 +951,9 @@ describe('TaskStore', () => {
|
|||
savedObjectsRepository: savedObjectsClient,
|
||||
adHocTaskCounter,
|
||||
allowReadingInvalidState: false,
|
||||
requestTimeouts: {
|
||||
update_by_query: 1000,
|
||||
},
|
||||
});
|
||||
|
||||
expect(await store.getLifecycle(task.id)).toEqual(status);
|
||||
|
@ -945,6 +976,9 @@ describe('TaskStore', () => {
|
|||
savedObjectsRepository: savedObjectsClient,
|
||||
adHocTaskCounter,
|
||||
allowReadingInvalidState: false,
|
||||
requestTimeouts: {
|
||||
update_by_query: 1000,
|
||||
},
|
||||
});
|
||||
|
||||
expect(await store.getLifecycle(randomId())).toEqual(TaskLifecycleResult.NotFound);
|
||||
|
@ -965,6 +999,9 @@ describe('TaskStore', () => {
|
|||
savedObjectsRepository: savedObjectsClient,
|
||||
adHocTaskCounter,
|
||||
allowReadingInvalidState: false,
|
||||
requestTimeouts: {
|
||||
update_by_query: 1000,
|
||||
},
|
||||
});
|
||||
|
||||
return expect(store.getLifecycle(randomId())).rejects.toThrow('Bad Request');
|
||||
|
@ -985,6 +1022,9 @@ describe('TaskStore', () => {
|
|||
savedObjectsRepository: savedObjectsClient,
|
||||
adHocTaskCounter,
|
||||
allowReadingInvalidState: false,
|
||||
requestTimeouts: {
|
||||
update_by_query: 1000,
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
|
@ -1155,6 +1195,9 @@ describe('TaskStore', () => {
|
|||
savedObjectsRepository: savedObjectsClient,
|
||||
adHocTaskCounter,
|
||||
allowReadingInvalidState: false,
|
||||
requestTimeouts: {
|
||||
update_by_query: 1000,
|
||||
},
|
||||
});
|
||||
|
||||
expect(jest.requireMock('./task_validator').TaskValidator).toHaveBeenCalledWith({
|
||||
|
@ -1177,6 +1220,9 @@ describe('TaskStore', () => {
|
|||
savedObjectsRepository: savedObjectsClient,
|
||||
adHocTaskCounter,
|
||||
allowReadingInvalidState: true,
|
||||
requestTimeouts: {
|
||||
update_by_query: 1000,
|
||||
},
|
||||
});
|
||||
|
||||
expect(jest.requireMock('./task_validator').TaskValidator).toHaveBeenCalledWith({
|
||||
|
@ -1186,4 +1232,41 @@ describe('TaskStore', () => {
|
|||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('updateByQuery', () => {
|
||||
let store: TaskStore;
|
||||
let esClient: ReturnType<typeof elasticsearchServiceMock.createClusterClient>['asInternalUser'];
|
||||
let childEsClient: ReturnType<
|
||||
typeof elasticsearchServiceMock.createClusterClient
|
||||
>['asInternalUser'];
|
||||
|
||||
beforeAll(() => {
|
||||
esClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
|
||||
childEsClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
|
||||
esClient.child.mockReturnValue(childEsClient as unknown as Client);
|
||||
store = new TaskStore({
|
||||
logger: mockLogger(),
|
||||
index: 'tasky',
|
||||
taskManagerId: '',
|
||||
serializer,
|
||||
esClient,
|
||||
definitions: taskDefinitions,
|
||||
savedObjectsRepository: savedObjectsClient,
|
||||
adHocTaskCounter,
|
||||
allowReadingInvalidState: false,
|
||||
requestTimeouts: {
|
||||
update_by_query: 1000,
|
||||
},
|
||||
});
|
||||
});
|
||||
test('should pass requestTimeout', async () => {
|
||||
childEsClient.updateByQuery.mockResponse({
|
||||
hits: { hits: [], total: 0, updated: 100, version_conflicts: 0 },
|
||||
} as UpdateByQueryResponse);
|
||||
await store.updateByQuery({ script: '' }, { max_docs: 10 });
|
||||
expect(childEsClient.updateByQuery).toHaveBeenCalledWith(expect.any(Object), {
|
||||
requestTimeout: 1000,
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -24,6 +24,7 @@ import {
|
|||
ElasticsearchClient,
|
||||
} from '@kbn/core/server';
|
||||
|
||||
import { RequestTimeoutsConfig } from './config';
|
||||
import { asOk, asErr, Result } from './lib/result_type';
|
||||
|
||||
import {
|
||||
|
@ -48,6 +49,7 @@ export interface StoreOpts {
|
|||
adHocTaskCounter: AdHocTaskCounter;
|
||||
allowReadingInvalidState: boolean;
|
||||
logger: Logger;
|
||||
requestTimeouts: RequestTimeoutsConfig;
|
||||
}
|
||||
|
||||
export interface SearchOpts {
|
||||
|
@ -108,6 +110,7 @@ export class TaskStore {
|
|||
private savedObjectsRepository: ISavedObjectsRepository;
|
||||
private serializer: ISavedObjectsSerializer;
|
||||
private adHocTaskCounter: AdHocTaskCounter;
|
||||
private requestTimeouts: RequestTimeoutsConfig;
|
||||
|
||||
/**
|
||||
* Constructs a new TaskStore.
|
||||
|
@ -136,6 +139,7 @@ export class TaskStore {
|
|||
// The poller doesn't need retry logic because it will try again at the next polling cycle
|
||||
maxRetries: 0,
|
||||
});
|
||||
this.requestTimeouts = opts.requestTimeouts;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -492,17 +496,20 @@ export class TaskStore {
|
|||
const { query } = ensureQueryOnlyReturnsTaskObjects(opts);
|
||||
try {
|
||||
const // eslint-disable-next-line @typescript-eslint/naming-convention
|
||||
{ total, updated, version_conflicts } = await this.esClientWithoutRetries.updateByQuery({
|
||||
index: this.index,
|
||||
ignore_unavailable: true,
|
||||
refresh: true,
|
||||
conflicts: 'proceed',
|
||||
body: {
|
||||
...opts,
|
||||
max_docs,
|
||||
query,
|
||||
{ total, updated, version_conflicts } = await this.esClientWithoutRetries.updateByQuery(
|
||||
{
|
||||
index: this.index,
|
||||
ignore_unavailable: true,
|
||||
refresh: true,
|
||||
conflicts: 'proceed',
|
||||
body: {
|
||||
...opts,
|
||||
max_docs,
|
||||
query,
|
||||
},
|
||||
},
|
||||
});
|
||||
{ requestTimeout: this.requestTimeouts.update_by_query }
|
||||
);
|
||||
|
||||
const conflictsCorrectedForContinuation = correctVersionConflictsForContinuation(
|
||||
updated,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue