Fix task manager polling flow controls (#153491)

Fixes https://github.com/elastic/kibana/issues/151938

In this PR, I'm re-writing the Task Manager poller so it doesn't run
concurrently when timeouts occur while also fixing the issue where
polling requests would pile up when polling takes time. To support this,
I've also made the following changes:
- Removed the observable monitor and the
`xpack.task_manager.max_poll_inactivity_cycles` setting
- Make the task store `search` and `updateByQuery` functions have no
retries. This prevents the request from retrying 5x whenever a timeout
occurs, causing each call taking up to 2 1/2 minutes before Kibana sees
the error (now down to 30s each). We have polling to manage retries in
these situations.
- Switch the task poller tests to use `sinon` for faking timers
- Removing the `assertStillInSetup` checks on plugin setup. Felt like a
maintenance burden that wasn't necessary to fix with my code changes.

The main code changes are within these files (to review thoroughly so
the polling cycle doesn't suddenly stop):
- x-pack/plugins/task_manager/server/polling/task_poller.ts
- x-pack/plugins/task_manager/server/polling_lifecycle.ts (easier to
review if you disregard whitespace `?w=1`)

## To verify
1. Tasks run normally (create a rule or something that goes through task
manager regularly).
2. When the update by query takes a while, the request is cancelled
after 30s or the time manually configured.
4. When the search for claimed tasks query takes a while, the request is
cancelled after 30s or the time manually configured.

**Tips:**
<details><summary>how to slowdown search for claimed task
queries</summary>

```
diff --git a/x-pack/plugins/task_manager/server/queries/task_claiming.ts b/x-pack/plugins/task_manager/server/queries/task_claiming.ts
index 07042650a37..2caefd63672 100644
--- a/x-pack/plugins/task_manager/server/queries/task_claiming.ts
+++ b/x-pack/plugins/task_manager/server/queries/task_claiming.ts
@@ -247,7 +247,7 @@ export class TaskClaiming {
         taskTypes,
       });

-    const docs = tasksUpdated > 0 ? await this.sweepForClaimedTasks(taskTypes, size) : [];
+    const docs = await this.sweepForClaimedTasks(taskTypes, size);

     this.emitEvents(docs.map((doc) => asTaskClaimEvent(doc.id, asOk(doc))));

@@ -346,6 +346,13 @@ export class TaskClaiming {
       size,
       sort: SortByRunAtAndRetryAt,
       seq_no_primary_term: true,
+      aggs: {
+        delay: {
+          shard_delay: {
+            value: '40s',
+          },
+        },
+      },
     });

     return docs;
```
</details>

<details><summary>how to slow down update by queries</summary>
Not the cleanest way but you'll see occasional request timeouts from the
updateByQuery calls. I had more luck creating rules running every 1s.

```
diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts
index a06ee7b918a..07aa81e5388 100644
--- a/x-pack/plugins/task_manager/server/task_store.ts
+++ b/x-pack/plugins/task_manager/server/task_store.ts
@@ -126,6 +126,7 @@ export class TaskStore {
       // Timeouts are retried and make requests timeout after (requestTimeout * (1 + maxRetries))
       // The poller doesn't need retry logic because it will try again at the next polling cycle
       maxRetries: 0,
+      requestTimeout: 900,
     });
   }

@@ -458,6 +459,7 @@ export class TaskStore {
           ignore_unavailable: true,
           refresh: true,
           conflicts: 'proceed',
+          requests_per_second: 1,
           body: {
             ...opts,
             max_docs,
```
</details>

---------

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Mike Côté 2023-05-03 09:33:10 -04:00 committed by GitHub
parent d96c5a636f
commit cb2e28d1e4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
29 changed files with 398 additions and 768 deletions

View file

@ -45,7 +45,6 @@ The API returns the following:
"timestamp": "2021-02-16T11:29:05.055Z",
"value": {
"request_capacity": 1000,
"max_poll_inactivity_cycles": 10,
"monitored_aggregated_stats_refresh_rate": 60000,
"monitored_stats_running_average_window": 50,
"monitored_task_execution_thresholds": {

View file

@ -98,7 +98,6 @@ The API returns the following:
"timestamp": "2021-02-16T11:29:05.055Z",
"value": {
"request_capacity": 1000,
"max_poll_inactivity_cycles": 10,
"monitored_aggregated_stats_refresh_rate": 60000,
"monitored_stats_running_average_window": 50,
"monitored_task_execution_thresholds": {
@ -297,7 +296,6 @@ Evaluating the health stats, you can see the following output under `stats.confi
--------------------------------------------------
{
"request_capacity": 1000,
"max_poll_inactivity_cycles": 10,
"monitored_aggregated_stats_refresh_rate": 60000,
"monitored_stats_running_average_window": 50,
"monitored_task_execution_thresholds": {
@ -322,7 +320,6 @@ Now suppose the output under `stats.configuration.value` is the following:
--------------------------------------------------
{
"request_capacity": 1000,
"max_poll_inactivity_cycles": 10,
"monitored_aggregated_stats_refresh_rate": 60000,
"monitored_stats_running_average_window": 50,
"monitored_task_execution_thresholds": {

View file

@ -400,7 +400,6 @@ kibana_vars=(
xpack.securitySolution.prebuiltRulesPackageVersion
xpack.spaces.maxSpaces
xpack.task_manager.max_attempts
xpack.task_manager.max_poll_inactivity_cycles
xpack.task_manager.max_workers
xpack.task_manager.monitored_aggregated_stats_refresh_rate
xpack.task_manager.monitored_stats_required_freshness

View file

@ -44,7 +44,6 @@ The task_manager can be configured via `taskManager` config options (e.g. `taskM
- `max_attempts` - The maximum number of times a task will be attempted before being abandoned as failed
- `poll_interval` - How often the background worker should check the task_manager index for more work
- `max_poll_inactivity_cycles` - How many poll intervals is work allowed to block polling for before it's timed out. This does not include task execution, as task execution does not block the polling, but rather includes work needed to manage Task Manager's state.
- `index` - **deprecated** The name of the index that the task_manager will use. This is deprecated, and will be removed starting in 8.0
- `max_workers` - The maximum number of tasks a Kibana will run concurrently (defaults to 10)
- `version_conflict_threshold` - The threshold percentage for workers experiencing version conflicts for shifting the polling interval

View file

@ -21,7 +21,6 @@ describe('config validation', () => {
"warn_threshold": 5000,
},
"max_attempts": 3,
"max_poll_inactivity_cycles": 10,
"max_workers": 10,
"monitored_aggregated_stats_refresh_rate": 60000,
"monitored_stats_health_verbose_log": Object {
@ -73,7 +72,6 @@ describe('config validation', () => {
"warn_threshold": 5000,
},
"max_attempts": 3,
"max_poll_inactivity_cycles": 10,
"max_workers": 10,
"monitored_aggregated_stats_refresh_rate": 60000,
"monitored_stats_health_verbose_log": Object {
@ -123,7 +121,6 @@ describe('config validation', () => {
"warn_threshold": 5000,
},
"max_attempts": 3,
"max_poll_inactivity_cycles": 10,
"max_workers": 10,
"monitored_aggregated_stats_refresh_rate": 60000,
"monitored_stats_health_verbose_log": Object {

View file

@ -10,7 +10,6 @@ import { schema, TypeOf } from '@kbn/config-schema';
export const MAX_WORKERS_LIMIT = 100;
export const DEFAULT_MAX_WORKERS = 10;
export const DEFAULT_POLL_INTERVAL = 3000;
export const DEFAULT_MAX_POLL_INACTIVITY_CYCLES = 10;
export const DEFAULT_VERSION_CONFLICT_THRESHOLD = 80;
export const DEFAULT_MAX_EPHEMERAL_REQUEST_CAPACITY = MAX_WORKERS_LIMIT;
@ -64,11 +63,6 @@ export const configSchema = schema.object(
defaultValue: DEFAULT_POLL_INTERVAL,
min: 100,
}),
/* How many poll interval cycles can work take before it's timed out. */
max_poll_inactivity_cycles: schema.number({
defaultValue: DEFAULT_MAX_POLL_INACTIVITY_CYCLES,
min: 1,
}),
/* How many requests can Task Manager buffer before it rejects new requests. */
request_capacity: schema.number({
// a nice round contrived number, feel free to change as we learn how it behaves

View file

@ -49,7 +49,6 @@ describe('EphemeralTaskLifecycle', () => {
max_attempts: 9,
poll_interval: 6000000,
version_conflict_threshold: 80,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_required_freshness: 5000,

View file

@ -6,6 +6,7 @@
*/
import sinon from 'sinon';
import { Client } from '@elastic/elasticsearch';
import { elasticsearchServiceMock, savedObjectsRepositoryMock } from '@kbn/core/server/mocks';
import { SavedObjectsErrorHelpers, Logger } from '@kbn/core/server';
import { ADJUST_THROUGHPUT_INTERVAL } from '../lib/create_managed_configuration';
@ -43,7 +44,6 @@ describe('managed configuration', () => {
max_attempts: 9,
poll_interval: 3000,
version_conflict_threshold: 80,
max_poll_inactivity_cycles: 10,
monitored_aggregated_stats_refresh_rate: 60000,
monitored_stats_health_verbose_log: {
enabled: false,
@ -87,6 +87,9 @@ describe('managed configuration', () => {
const coreStart = coreMock.createStart();
coreStart.elasticsearch = esStart;
esStart.client.asInternalUser.child.mockReturnValue(
esStart.client.asInternalUser as unknown as Client
);
coreStart.savedObjects.createInternalRepository.mockReturnValue(savedObjectsClient);
taskManagerStart = await taskManager.start(coreStart);
@ -146,7 +149,8 @@ describe('managed configuration', () => {
});
test('should lower max workers when Elasticsearch returns "cannot execute [inline] scripts" error', async () => {
esStart.client.asInternalUser.search.mockImplementationOnce(async () => {
const childEsClient = esStart.client.asInternalUser.child({}) as jest.Mocked<Client>;
childEsClient.search.mockImplementationOnce(async () => {
throw inlineScriptError;
});
@ -165,7 +169,8 @@ describe('managed configuration', () => {
});
test('should increase poll interval when Elasticsearch returns "cannot execute [inline] scripts" error', async () => {
esStart.client.asInternalUser.search.mockImplementationOnce(async () => {
const childEsClient = esStart.client.asInternalUser.child({}) as jest.Mocked<Client>;
childEsClient.search.mockImplementationOnce(async () => {
throw inlineScriptError;
});

View file

@ -422,7 +422,6 @@ function getMockMonitoredHealth(overrides = {}): MonitoredHealth {
value: {
max_workers: 10,
poll_interval: 3000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_running_average_window: 50,

View file

@ -942,7 +942,6 @@ function mockStats(
value: {
max_workers: 0,
poll_interval: 0,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_running_average_window: 50,

View file

@ -18,7 +18,6 @@ describe('Configuration Statistics Aggregator', () => {
poll_interval: 6000000,
version_conflict_threshold: 80,
monitored_stats_required_freshness: 6000000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_health_verbose_log: {
@ -61,7 +60,6 @@ describe('Configuration Statistics Aggregator', () => {
expect(initial.value).toEqual({
max_workers: 10,
poll_interval: 6000000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_running_average_window: 50,
@ -76,7 +74,6 @@ describe('Configuration Statistics Aggregator', () => {
expect(updatedWorkers.value).toEqual({
max_workers: 8,
poll_interval: 6000000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_running_average_window: 50,
@ -91,7 +88,6 @@ describe('Configuration Statistics Aggregator', () => {
expect(updatedInterval.value).toEqual({
max_workers: 8,
poll_interval: 3000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_running_average_window: 50,

View file

@ -14,7 +14,6 @@ import { ManagedConfiguration } from '../lib/create_managed_configuration';
const CONFIG_FIELDS_TO_EXPOSE = [
'request_capacity',
'max_poll_inactivity_cycles',
'monitored_aggregated_stats_refresh_rate',
'monitored_stats_running_average_window',
'monitored_task_execution_thresholds',

View file

@ -22,7 +22,6 @@ describe('createMonitoringStatsStream', () => {
poll_interval: 6000000,
version_conflict_threshold: 80,
monitored_stats_required_freshness: 6000000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_health_verbose_log: {
@ -77,7 +76,6 @@ describe('createMonitoringStatsStream', () => {
value: {
max_workers: 10,
poll_interval: 6000000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_running_average_window: 50,
@ -111,7 +109,6 @@ describe('createMonitoringStatsStream', () => {
value: {
max_workers: 10,
poll_interval: 6000000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_running_average_window: 50,
@ -145,7 +142,6 @@ describe('createMonitoringStatsStream', () => {
value: {
max_workers: 10,
poll_interval: 6000000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_running_average_window: 50,

View file

@ -42,7 +42,6 @@ const pluginInitializerContextParams = {
max_attempts: 9,
poll_interval: 3000,
version_conflict_threshold: 80,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_health_verbose_log: {
@ -97,50 +96,6 @@ describe('TaskManagerPlugin', () => {
);
});
test('throws if setup methods are called after start', async () => {
const pluginInitializerContext = coreMock.createPluginInitializerContext<TaskManagerConfig>(
pluginInitializerContextParams
);
const taskManagerPlugin = new TaskManagerPlugin(pluginInitializerContext);
const setupApi = await taskManagerPlugin.setup(coreMock.createSetup(), {
usageCollection: undefined,
});
// we only start a poller if we have task types that we support and we track
// phases (moving from Setup to Start) based on whether the poller is working
setupApi.registerTaskDefinitions({
setupTimeType: {
title: 'setupTimeType',
createTaskRunner: () => ({ async run() {} }),
},
});
await taskManagerPlugin.start(coreMock.createStart());
expect(() =>
setupApi.addMiddleware({
beforeSave: async (saveOpts) => saveOpts,
beforeRun: async (runOpts) => runOpts,
beforeMarkRunning: async (runOpts) => runOpts,
})
).toThrowErrorMatchingInlineSnapshot(
`"Cannot add Middleware after the task manager has started"`
);
expect(() =>
setupApi.registerTaskDefinitions({
lateRegisteredType: {
title: 'lateRegisteredType',
createTaskRunner: () => ({ async run() {} }),
},
})
).toThrowErrorMatchingInlineSnapshot(
`"Cannot register task definitions after the task manager has started"`
);
});
test('it logs a warning when the unsafe `exclude_task_types` config is used', async () => {
const pluginInitializerContext = coreMock.createPluginInitializerContext<TaskManagerConfig>({
...pluginInitializerContextParams,

View file

@ -183,11 +183,9 @@ export class TaskManagerPlugin
return {
index: TASK_MANAGER_INDEX,
addMiddleware: (middleware: Middleware) => {
this.assertStillInSetup('add Middleware');
this.middleware = addMiddlewareToChain(this.middleware, middleware);
},
registerTaskDefinitions: (taskDefinition: TaskDefinitionRegistry) => {
this.assertStillInSetup('register task definitions');
this.definitions.registerTaskDefinitions(taskDefinition);
},
};
@ -286,18 +284,6 @@ export class TaskManagerPlugin
getRegisteredTypes: () => this.definitions.getAllTypes(),
};
}
/**
* Ensures task manager hasn't started
*
* @param {string} the name of the operation being executed
* @returns void
*/
private assertStillInSetup(operation: string) {
if (this.taskPollingLifecycle?.isStarted) {
throw new Error(`Cannot ${operation} after the task manager has started`);
}
}
}
export function getElasticsearchAndSOAvailability(

View file

@ -5,7 +5,5 @@
* 2.0.
*/
export { createObservableMonitor } from './observable_monitor';
export { createTaskPoller, PollingError, PollingErrorType } from './task_poller';
export { timeoutPromiseAfter } from './timeout_promise_after';
export { delayOnClaimConflicts } from './delay_on_claim_conflicts';

View file

@ -1,171 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { interval, from, Subject } from 'rxjs';
import { map, concatMap, takeWhile, take } from 'rxjs/operators';
import { createObservableMonitor } from './observable_monitor';
import { times } from 'lodash';
describe('Poll Monitor', () => {
test('returns a cold observable so that the monitored Observable is only created on demand', async () => {
const instantiator = jest.fn(() => new Subject());
createObservableMonitor(instantiator);
expect(instantiator).not.toHaveBeenCalled();
});
test('subscribing to the observable instantiates a new observable and pipes its results through', async () => {
const instantiator = jest.fn(() => from([0, 1, 2]));
const monitoredObservable = createObservableMonitor(instantiator);
expect(instantiator).not.toHaveBeenCalled();
return new Promise<void>((resolve) => {
const next = jest.fn();
monitoredObservable.pipe(take(3)).subscribe({
next,
complete: () => {
expect(instantiator).toHaveBeenCalled();
expect(next).toHaveBeenCalledWith(0);
expect(next).toHaveBeenCalledWith(1);
expect(next).toHaveBeenCalledWith(2);
resolve();
},
});
});
});
test('unsubscribing from the monitor prevents the monitor from resubscribing to the observable', async () => {
const heartbeatInterval = 1000;
const instantiator = jest.fn(() => interval(100));
const monitoredObservable = createObservableMonitor(instantiator, { heartbeatInterval });
return new Promise<void>((resolve) => {
const next = jest.fn();
monitoredObservable.pipe(take(3)).subscribe({
next,
complete: () => {
expect(instantiator).toHaveBeenCalledTimes(1);
setTimeout(() => {
expect(instantiator).toHaveBeenCalledTimes(1);
resolve();
}, heartbeatInterval * 2);
},
});
});
});
test(`ensures the observable subscription hasn't closed at a fixed interval and reinstantiates if it has`, async () => {
let iteration = 0;
const instantiator = jest.fn(() => {
iteration++;
return interval(100).pipe(
map((index) => `${iteration}:${index}`),
// throw on 3rd value of the first iteration
map((value, index) => {
if (iteration === 1 && index === 3) {
throw new Error('Source threw an error!');
}
return value;
})
);
});
const onError = jest.fn();
const monitoredObservable = createObservableMonitor(instantiator, { onError });
return new Promise<void>((resolve) => {
const next = jest.fn();
const error = jest.fn();
monitoredObservable
.pipe(
// unsubscribe once we confirm we have successfully recovered from an error in the source
takeWhile(function validateExpectation() {
try {
[...times(3, (index) => `1:${index}`), ...times(5, (index) => `2:${index}`)].forEach(
(expecteArg) => {
expect(next).toHaveBeenCalledWith(expecteArg);
}
);
return false;
} catch {
return true;
}
})
)
.subscribe({
next,
error,
complete: () => {
expect(error).not.toHaveBeenCalled();
expect(onError).toHaveBeenCalledWith(new Error('Source threw an error!'));
resolve();
},
});
});
});
test(`ensures the observable subscription hasn't hung at a fixed interval and reinstantiates if it has`, async () => {
let iteration = 0;
const instantiator = jest.fn(() => {
iteration++;
return interval(100).pipe(
map((index) => `${iteration}:${index}`),
// hang on 3rd value of the first iteration
concatMap((value, index) => {
if (iteration === 1 && index === 3) {
return new Promise(() => {
// never resolve or reject, just hang for EVER
});
}
return Promise.resolve(value);
})
);
});
const onError = jest.fn();
const monitoredObservable = createObservableMonitor(instantiator, {
onError,
heartbeatInterval: 100,
inactivityTimeout: 500,
});
return new Promise<void>((resolve) => {
const next = jest.fn();
const error = jest.fn();
monitoredObservable
.pipe(
// unsubscribe once we confirm we have successfully recovered from an error in the source
takeWhile(function validateExpectation() {
try {
[...times(3, (index) => `1:${index}`), ...times(5, (index) => `2:${index}`)].forEach(
(expecteArg) => {
expect(next).toHaveBeenCalledWith(expecteArg);
}
);
return false;
} catch {
return true;
}
})
)
.subscribe({
next,
error,
complete: () => {
expect(error).not.toHaveBeenCalled();
expect(onError).toHaveBeenCalledWith(
new Error(`Observable Monitor: Hung Observable restarted after 500ms of inactivity`)
);
resolve();
},
});
});
});
});

View file

@ -1,81 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Subject, Observable, throwError, timer, Subscription } from 'rxjs';
import { noop } from 'lodash';
import { exhaustMap, tap, takeUntil, switchMap, switchMapTo, catchError } from 'rxjs/operators';
const DEFAULT_HEARTBEAT_INTERVAL = 1000;
// by default don't monitor inactivity as not all observables are expected
// to emit at any kind of fixed interval
const DEFAULT_INACTIVITY_TIMEOUT = 0;
export interface ObservableMonitorOptions<E> {
heartbeatInterval?: number;
inactivityTimeout?: number;
onError?: (err: E) => void;
}
export function createObservableMonitor<T, E>(
observableFactory: () => Observable<T>,
{
heartbeatInterval = DEFAULT_HEARTBEAT_INTERVAL,
inactivityTimeout = DEFAULT_INACTIVITY_TIMEOUT,
onError = noop,
}: ObservableMonitorOptions<E> = {}
): Observable<T> {
return new Observable((subscriber) => {
const subscription: Subscription = timer(0, heartbeatInterval)
.pipe(
// switch from the heartbeat interval to the instantiated observable until it completes / errors
exhaustMap(() => takeUntilDurationOfInactivity(observableFactory(), inactivityTimeout)),
// if an error is thrown, catch it, notify and try to recover
catchError((err: E, source$: Observable<T>) => {
onError(err);
// return source, which will allow our observable to recover from this error and
// keep pulling values out of it
return source$;
})
)
.subscribe(subscriber);
return () => {
subscription.unsubscribe();
};
});
}
function takeUntilDurationOfInactivity<T>(source$: Observable<T>, inactivityTimeout: number) {
// if there's a specified maximum duration of inactivity, only take values until that
// duration elapses without any new events
if (inactivityTimeout) {
// an observable which starts a timer every time a new value is passed in, replacing the previous timer
// if the timer goes off without having been reset by a fresh value, it will emit a single event - which will
// notify our monitor that the source has been inactive for too long
const inactivityMonitor$ = new Subject<void>();
return source$.pipe(
takeUntil(
inactivityMonitor$.pipe(
// on each new emited value, start a new timer, discarding the old one
switchMap(() => timer(inactivityTimeout)),
// every time a timer expires (meaning no new value came in on time to discard it)
// throw an error, forcing the monitor instantiate a new observable
switchMapTo(
throwError(
new Error(
`Observable Monitor: Hung Observable restarted after ${inactivityTimeout}ms of inactivity`
)
)
)
)
),
// poke `inactivityMonitor$` so it restarts the timer
tap(() => inactivityMonitor$.next())
);
}
return source$;
}

View file

@ -5,318 +5,338 @@
* 2.0.
*/
import sinon from 'sinon';
import { of, BehaviorSubject } from 'rxjs';
import { none } from 'fp-ts/lib/Option';
import { createTaskPoller, PollingError, PollingErrorType } from './task_poller';
import { fakeSchedulers } from 'rxjs-marbles/jest';
import { sleep, resolvable, Resolvable } from '../test_utils';
import { loggingSystemMock } from '@kbn/core/server/mocks';
import { asOk, asErr } from '../lib/result_type';
describe('TaskPoller', () => {
beforeEach(() => jest.useFakeTimers({ legacyFakeTimers: true }));
let clock: sinon.SinonFakeTimers;
test(
'intializes the poller with the provided interval',
fakeSchedulers(async (advance) => {
const pollInterval = 100;
const halfInterval = Math.floor(pollInterval / 2);
beforeEach(() => {
clock = sinon.useFakeTimers({ toFake: ['Date', 'setTimeout', 'clearTimeout'] });
});
const work = jest.fn(async () => true);
createTaskPoller<void, boolean>({
logger: loggingSystemMock.create().get(),
pollInterval$: of(pollInterval),
pollIntervalDelay$: of(0),
getCapacity: () => 1,
work,
workTimeout: pollInterval * 5,
}).subscribe(() => {});
afterEach(() => clock.restore());
// `work` is async, we have to force a node `tick`
await sleep(0);
advance(halfInterval);
expect(work).toHaveBeenCalledTimes(0);
advance(halfInterval);
test('intializes the poller with the provided interval', async () => {
const pollInterval = 100;
const halfInterval = Math.floor(pollInterval / 2);
await sleep(0);
expect(work).toHaveBeenCalledTimes(1);
const work = jest.fn(async () => true);
createTaskPoller<void, boolean>({
initialPollInterval: pollInterval,
logger: loggingSystemMock.create().get(),
pollInterval$: of(pollInterval),
pollIntervalDelay$: of(0),
getCapacity: () => 1,
work,
}).start();
await sleep(0);
await sleep(0);
advance(pollInterval + 10);
await sleep(0);
expect(work).toHaveBeenCalledTimes(2);
})
);
expect(work).toHaveBeenCalledTimes(1);
test(
'poller adapts to pollInterval changes',
fakeSchedulers(async (advance) => {
const pollInterval = 100;
const pollInterval$ = new BehaviorSubject(pollInterval);
// `work` is async, we have to force a node `tick`
await new Promise((resolve) => setImmediate(resolve));
clock.tick(halfInterval);
expect(work).toHaveBeenCalledTimes(1);
clock.tick(halfInterval);
const work = jest.fn(async () => true);
createTaskPoller<void, boolean>({
logger: loggingSystemMock.create().get(),
pollInterval$,
pollIntervalDelay$: of(0),
getCapacity: () => 1,
work,
workTimeout: pollInterval * 5,
}).subscribe(() => {});
await new Promise((resolve) => setImmediate(resolve));
expect(work).toHaveBeenCalledTimes(2);
// `work` is async, we have to force a node `tick`
await sleep(0);
advance(pollInterval);
expect(work).toHaveBeenCalledTimes(1);
await new Promise((resolve) => setImmediate(resolve));
clock.tick(pollInterval + 10);
await new Promise((resolve) => setImmediate(resolve));
expect(work).toHaveBeenCalledTimes(3);
});
pollInterval$.next(pollInterval * 2);
test('poller adapts to pollInterval changes', async () => {
const pollInterval = 100;
const pollInterval$ = new BehaviorSubject(pollInterval);
// `work` is async, we have to force a node `tick`
await sleep(0);
advance(pollInterval);
expect(work).toHaveBeenCalledTimes(1);
advance(pollInterval);
expect(work).toHaveBeenCalledTimes(2);
const work = jest.fn(async () => true);
createTaskPoller<void, boolean>({
initialPollInterval: pollInterval,
logger: loggingSystemMock.create().get(),
pollInterval$,
pollIntervalDelay$: of(0),
getCapacity: () => 1,
work,
}).start();
pollInterval$.next(pollInterval / 2);
expect(work).toHaveBeenCalledTimes(1);
// `work` is async, we have to force a node `tick`
await sleep(0);
advance(pollInterval / 2);
expect(work).toHaveBeenCalledTimes(3);
})
);
// `work` is async, we have to force a node `tick`
await new Promise((resolve) => setImmediate(resolve));
clock.tick(pollInterval);
expect(work).toHaveBeenCalledTimes(2);
test(
'filters interval polling on capacity',
fakeSchedulers(async (advance) => {
const pollInterval = 100;
pollInterval$.next(pollInterval * 2);
const work = jest.fn(async () => true);
// `work` is async, we have to force a node `tick`
await new Promise((resolve) => setImmediate(resolve));
clock.tick(pollInterval);
expect(work).toHaveBeenCalledTimes(2);
clock.tick(pollInterval);
expect(work).toHaveBeenCalledTimes(3);
let hasCapacity = true;
createTaskPoller<void, boolean>({
logger: loggingSystemMock.create().get(),
pollInterval$: of(pollInterval),
pollIntervalDelay$: of(0),
work,
workTimeout: pollInterval * 5,
getCapacity: () => (hasCapacity ? 1 : 0),
}).subscribe(() => {});
pollInterval$.next(pollInterval / 2);
expect(work).toHaveBeenCalledTimes(0);
// `work` is async, we have to force a node `tick`
await new Promise((resolve) => setImmediate(resolve));
clock.tick(pollInterval / 2);
expect(work).toHaveBeenCalledTimes(4);
});
await sleep(0);
advance(pollInterval);
expect(work).toHaveBeenCalledTimes(1);
test('filters interval polling on capacity', async () => {
const pollInterval = 100;
await sleep(0);
advance(pollInterval);
expect(work).toHaveBeenCalledTimes(2);
const work = jest.fn(async () => true);
hasCapacity = false;
let hasCapacity = true;
createTaskPoller<void, boolean>({
initialPollInterval: pollInterval,
logger: loggingSystemMock.create().get(),
pollInterval$: of(pollInterval),
pollIntervalDelay$: of(0),
work,
getCapacity: () => (hasCapacity ? 1 : 0),
}).start();
await sleep(0);
advance(pollInterval);
expect(work).toHaveBeenCalledTimes(1);
await sleep(0);
advance(pollInterval);
expect(work).toHaveBeenCalledTimes(2);
await new Promise((resolve) => setImmediate(resolve));
clock.tick(pollInterval);
expect(work).toHaveBeenCalledTimes(2);
await sleep(0);
advance(pollInterval);
expect(work).toHaveBeenCalledTimes(2);
await new Promise((resolve) => setImmediate(resolve));
clock.tick(pollInterval);
expect(work).toHaveBeenCalledTimes(3);
await sleep(0);
advance(pollInterval);
expect(work).toHaveBeenCalledTimes(2);
hasCapacity = false;
hasCapacity = true;
await new Promise((resolve) => setImmediate(resolve));
clock.tick(pollInterval);
await sleep(0);
advance(pollInterval);
expect(work).toHaveBeenCalledTimes(3);
await new Promise((resolve) => setImmediate(resolve));
clock.tick(pollInterval);
expect(work).toHaveBeenCalledTimes(3);
await sleep(0);
advance(pollInterval);
expect(work).toHaveBeenCalledTimes(4);
})
);
await new Promise((resolve) => setImmediate(resolve));
clock.tick(pollInterval);
expect(work).toHaveBeenCalledTimes(3);
test(
'waits for work to complete before emitting the next event',
fakeSchedulers(async (advance) => {
const pollInterval = 100;
await new Promise((resolve) => setImmediate(resolve));
clock.tick(pollInterval);
expect(work).toHaveBeenCalledTimes(3);
const worker = resolvable();
hasCapacity = true;
const handler = jest.fn();
createTaskPoller<string, string[]>({
logger: loggingSystemMock.create().get(),
pollInterval$: of(pollInterval),
pollIntervalDelay$: of(0),
work: async (...args) => {
await worker;
return args;
},
getCapacity: () => 5,
workTimeout: pollInterval * 5,
}).subscribe(handler);
await new Promise((resolve) => setImmediate(resolve));
clock.tick(pollInterval);
expect(work).toHaveBeenCalledTimes(4);
advance(pollInterval);
await new Promise((resolve) => setImmediate(resolve));
clock.tick(pollInterval);
expect(work).toHaveBeenCalledTimes(5);
});
// work should now be in progress
test('waits for work to complete before emitting the next event', async () => {
const pollInterval = 100;
advance(pollInterval);
await sleep(pollInterval);
const { promise: worker, resolve: resolveWorker } = createResolvablePromise();
expect(handler).toHaveBeenCalledTimes(0);
const handler = jest.fn();
const poller = createTaskPoller<string, string[]>({
initialPollInterval: pollInterval,
logger: loggingSystemMock.create().get(),
pollInterval$: of(pollInterval),
pollIntervalDelay$: of(0),
work: async (...args) => {
await worker;
return args;
},
getCapacity: () => 5,
});
poller.events$.subscribe(handler);
poller.start();
worker.resolve();
clock.tick(pollInterval);
advance(pollInterval);
await sleep(pollInterval);
// work should now be in progress
// We did 3x "advance(pollInterval)"
expect(handler).toHaveBeenCalledTimes(3);
clock.tick(pollInterval);
await new Promise((resolve) => setImmediate(resolve));
advance(pollInterval);
await sleep(pollInterval);
expect(handler).toHaveBeenCalledTimes(4);
})
);
expect(handler).toHaveBeenCalledTimes(0);
test(
'work times out when it exceeds a predefined amount of time',
fakeSchedulers(async (advance) => {
const pollInterval = 100;
const workTimeout = pollInterval * 2;
resolveWorker({});
const handler = jest.fn();
clock.tick(pollInterval);
await new Promise((resolve) => setImmediate(resolve));
type ResolvableTupple = [string, PromiseLike<void> & Resolvable];
createTaskPoller<[string, Resolvable], string[]>({
logger: loggingSystemMock.create().get(),
pollInterval$: of(pollInterval),
pollIntervalDelay$: of(0),
work: async () => {
return [];
},
getCapacity: () => 5,
workTimeout,
}).subscribe(handler);
expect(handler).toHaveBeenCalledTimes(1);
const one: ResolvableTupple = ['one', resolvable()];
clock.tick(pollInterval);
// split these into two payloads
advance(pollInterval);
expect(handler).toHaveBeenCalledTimes(1);
});
const two: ResolvableTupple = ['two', resolvable()];
const three: ResolvableTupple = ['three', resolvable()];
test('returns an error when polling for work fails', async () => {
const pollInterval = 100;
advance(workTimeout);
await sleep(workTimeout);
const handler = jest.fn();
const poller = createTaskPoller<string, string[]>({
initialPollInterval: pollInterval,
logger: loggingSystemMock.create().get(),
pollInterval$: of(pollInterval),
pollIntervalDelay$: of(0),
work: async (...args) => {
throw new Error('failed to work');
},
getCapacity: () => 5,
});
poller.events$.subscribe(handler);
poller.start();
// one resolves too late!
one[1].resolve();
clock.tick(pollInterval);
await new Promise((resolve) => setImmediate(resolve));
expect(handler).toHaveBeenCalledWith(
asErr(
new PollingError<string>(
'Failed to poll for work: Error: work has timed out',
PollingErrorType.WorkError,
none
)
)
);
expect(handler.mock.calls[0][0].error.type).toEqual(PollingErrorType.WorkError);
const expectedError = new PollingError<string>(
'Failed to poll for work: Error: failed to work',
PollingErrorType.WorkError,
none
);
expect(handler).toHaveBeenCalledWith(asErr(expectedError));
expect(handler.mock.calls[0][0].error.type).toEqual(PollingErrorType.WorkError);
});
// two and three in time
two[1].resolve();
three[1].resolve();
test('continues polling after work fails', async () => {
const pollInterval = 100;
advance(pollInterval);
await sleep(pollInterval);
const handler = jest.fn();
let callCount = 0;
const work = jest.fn(async () => {
callCount++;
if (callCount === 2) {
throw new Error('failed to work');
}
return callCount;
});
const poller = createTaskPoller<string, number>({
initialPollInterval: pollInterval,
logger: loggingSystemMock.create().get(),
pollInterval$: of(pollInterval),
pollIntervalDelay$: of(0),
work,
getCapacity: () => 5,
});
poller.events$.subscribe(handler);
poller.start();
expect(handler).toHaveBeenCalledTimes(4);
})
);
clock.tick(pollInterval);
await new Promise((resolve) => setImmediate(resolve));
test(
'returns an error when polling for work fails',
fakeSchedulers(async (advance) => {
const pollInterval = 100;
expect(handler).toHaveBeenCalledWith(asOk(1));
const handler = jest.fn();
createTaskPoller<string, string[]>({
logger: loggingSystemMock.create().get(),
pollInterval$: of(pollInterval),
pollIntervalDelay$: of(0),
work: async (...args) => {
throw new Error('failed to work');
},
workTimeout: pollInterval * 5,
getCapacity: () => 5,
}).subscribe(handler);
clock.tick(pollInterval);
await new Promise((resolve) => setImmediate(resolve));
advance(pollInterval);
await sleep(0);
const expectedError = new PollingError<string>(
'Failed to poll for work: Error: failed to work',
PollingErrorType.WorkError,
none
);
expect(handler).toHaveBeenCalledWith(asErr(expectedError));
expect(handler.mock.calls[1][0].error.type).toEqual(PollingErrorType.WorkError);
expect(handler).not.toHaveBeenCalledWith(asOk(2));
const expectedError = new PollingError<string>(
'Failed to poll for work: Error: failed to work',
PollingErrorType.WorkError,
none
);
expect(handler).toHaveBeenCalledWith(asErr(expectedError));
expect(handler.mock.calls[0][0].error.type).toEqual(PollingErrorType.WorkError);
})
);
clock.tick(pollInterval);
await new Promise((resolve) => setImmediate(resolve));
test(
'continues polling after work fails',
fakeSchedulers(async (advance) => {
const pollInterval = 100;
expect(handler).toHaveBeenCalledWith(asOk(3));
});
const handler = jest.fn();
let callCount = 0;
const work = jest.fn(async () => {
callCount++;
if (callCount === 2) {
throw new Error('failed to work');
}
return callCount;
});
createTaskPoller<string, number>({
logger: loggingSystemMock.create().get(),
pollInterval$: of(pollInterval),
pollIntervalDelay$: of(0),
work,
workTimeout: pollInterval * 5,
getCapacity: () => 5,
}).subscribe(handler);
test(`doesn't start polling until start is called`, async () => {
const pollInterval = 100;
advance(pollInterval);
await sleep(0);
const work = jest.fn(async () => true);
const taskPoller = createTaskPoller<void, boolean>({
initialPollInterval: pollInterval,
logger: loggingSystemMock.create().get(),
pollInterval$: of(pollInterval),
pollIntervalDelay$: of(0),
getCapacity: () => 1,
work,
});
expect(handler).toHaveBeenCalledWith(asOk(1));
expect(work).toHaveBeenCalledTimes(0);
advance(pollInterval);
await sleep(0);
clock.tick(pollInterval);
await new Promise((resolve) => setImmediate(resolve));
expect(work).toHaveBeenCalledTimes(0);
const expectedError = new PollingError<string>(
'Failed to poll for work: Error: failed to work',
PollingErrorType.WorkError,
none
);
expect(handler).toHaveBeenCalledWith(asErr(expectedError));
expect(handler.mock.calls[1][0].error.type).toEqual(PollingErrorType.WorkError);
expect(handler).not.toHaveBeenCalledWith(asOk(2));
clock.tick(pollInterval);
await new Promise((resolve) => setImmediate(resolve));
expect(work).toHaveBeenCalledTimes(0);
advance(pollInterval);
await sleep(0);
// Start the poller here
taskPoller.start();
await new Promise((resolve) => setImmediate(resolve));
expect(work).toHaveBeenCalledTimes(1);
expect(handler).toHaveBeenCalledWith(asOk(3));
})
);
clock.tick(pollInterval);
await new Promise((resolve) => setImmediate(resolve));
expect(work).toHaveBeenCalledTimes(2);
});
test(`stops polling after stop is called`, async () => {
const pollInterval = 100;
const work = jest.fn(async () => true);
const taskPoller = createTaskPoller<void, boolean>({
initialPollInterval: pollInterval,
logger: loggingSystemMock.create().get(),
pollInterval$: of(pollInterval),
pollIntervalDelay$: of(0),
getCapacity: () => 1,
work,
});
taskPoller.start();
await new Promise((resolve) => setImmediate(resolve));
expect(work).toHaveBeenCalledTimes(1);
clock.tick(pollInterval);
await new Promise((resolve) => setImmediate(resolve));
expect(work).toHaveBeenCalledTimes(2);
clock.tick(pollInterval);
await new Promise((resolve) => setImmediate(resolve));
expect(work).toHaveBeenCalledTimes(3);
// Stop the poller here
taskPoller.stop();
await new Promise((resolve) => setImmediate(resolve));
expect(work).toHaveBeenCalledTimes(3);
clock.tick(pollInterval);
await new Promise((resolve) => setImmediate(resolve));
expect(work).toHaveBeenCalledTimes(3);
clock.tick(pollInterval);
await new Promise((resolve) => setImmediate(resolve));
expect(work).toHaveBeenCalledTimes(3);
});
});
function createResolvablePromise() {
let resolve: (value: unknown) => void = () => {};
const promise = new Promise((r) => {
resolve = r;
});
// The "resolve = r;" code path is called before this
return { promise, resolve };
}

View file

@ -9,21 +9,21 @@
* This module contains the logic for polling the task manager index for new work.
*/
import { of, Observable, combineLatest, timer } from 'rxjs';
import { map, filter, concatMap, tap, catchError, switchMap } from 'rxjs/operators';
import { Observable, Subject } from 'rxjs';
import { Option, none } from 'fp-ts/lib/Option';
import { Logger } from '@kbn/core/server';
import { Result, map as mapResult, asOk, asErr, promiseResult } from '../lib/result_type';
import { timeoutPromiseAfter } from './timeout_promise_after';
import { Result, asOk, asErr } from '../lib/result_type';
type WorkFn<H> = () => Promise<H>;
interface Opts<H> {
logger: Logger;
initialPollInterval: number;
pollInterval$: Observable<number>;
pollIntervalDelay$: Observable<number>;
getCapacity: () => number;
work: () => Promise<H>;
workTimeout: number;
work: WorkFn<H>;
}
/**
@ -39,56 +39,80 @@ interface Opts<H> {
*/
export function createTaskPoller<T, H>({
logger,
initialPollInterval,
pollInterval$,
pollIntervalDelay$,
getCapacity,
work,
workTimeout,
}: Opts<H>): Observable<Result<H, PollingError<T>>> {
}: Opts<H>): {
start: () => void;
stop: () => void;
events$: Observable<Result<H, PollingError<T>>>;
} {
const hasCapacity = () => getCapacity() > 0;
let running: boolean = false;
let timeoutId: NodeJS.Timeout | null = null;
let hasSubscribed: boolean = false;
let pollInterval = initialPollInterval;
let pollIntervalDelay = 0;
const subject = new Subject<Result<H, PollingError<T>>>();
const requestWorkProcessing$ = combineLatest([
// emit a polling event on a fixed interval
pollInterval$.pipe(
tap((period) => {
logger.debug(`Task poller now using interval of ${period}ms`);
})
),
pollIntervalDelay$.pipe(
tap((pollDelay) => {
logger.debug(`Task poller now delaying emission by ${pollDelay}ms`);
})
),
])
.pipe(
// We don't have control over `pollDelay` in the poller, and a change to `delayOnClaimConflicts` could accidentally cause us to pause Task Manager
// polling for a far longer duration that we intended.
// Since the goal is to shift it within the range of `period`, we use modulo as a safe guard to ensure this doesn't happen.
switchMap(([period, pollDelay]) => timer(period + (pollDelay % period), period)),
map(() => none)
)
.pipe(
// only emit polling events when there's capacity to handle them
filter(hasCapacity),
// Run a cycle to poll for work
concatMap(async () => {
return mapResult<H, Error, Result<H, PollingError<T>>>(
await promiseResult<H, Error>(
timeoutPromiseAfter<H, Error>(
work(),
workTimeout,
() => new Error(`work has timed out`)
)
),
(workResult) => asOk(workResult),
(err: Error) => asPollingError<T>(err, PollingErrorType.WorkError)
);
}),
// catch errors during polling for work
catchError((err: Error) => of(asPollingError<T>(err, PollingErrorType.WorkError)))
);
async function runCycle() {
timeoutId = null;
const start = Date.now();
if (hasCapacity()) {
try {
const result = await work();
subject.next(asOk(result));
} catch (e) {
subject.next(asPollingError<T>(e, PollingErrorType.WorkError));
}
}
if (running) {
// Set the next runCycle call
timeoutId = setTimeout(
runCycle,
Math.max(pollInterval - (Date.now() - start) + (pollIntervalDelay % pollInterval), 0)
);
// Reset delay, it's designed to shuffle only once
pollIntervalDelay = 0;
}
}
return requestWorkProcessing$;
function subscribe() {
if (hasSubscribed) {
return;
}
pollInterval$.subscribe((interval) => {
pollInterval = interval;
logger.debug(`Task poller now using interval of ${interval}ms`);
});
pollIntervalDelay$.subscribe((delay) => {
pollIntervalDelay = delay;
logger.debug(`Task poller now delaying emission by ${delay}ms`);
});
hasSubscribed = true;
}
return {
events$: subject,
start: () => {
if (!running) {
running = true;
runCycle();
// We need to subscribe shortly after start. Otherwise, the observables start emiting events
// too soon for the task run statistics module to capture.
setTimeout(() => subscribe(), 0);
}
},
stop: () => {
if (timeoutId) {
clearTimeout(timeoutId);
timeoutId = null;
}
running = false;
},
};
}
export enum PollingErrorType {

View file

@ -1,34 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { timeoutPromiseAfter } from './timeout_promise_after';
const delay = (ms: number, result: unknown) =>
new Promise((resolve) => setTimeout(() => resolve(result), ms));
const delayRejection = (ms: number, result: unknown) =>
new Promise((resolve, reject) => setTimeout(() => reject(result), ms));
describe('Promise Timeout', () => {
test('resolves when wrapped promise resolves', async () => {
return expect(
timeoutPromiseAfter(delay(100, 'OK'), 1000, () => 'TIMEOUT ERR')
).resolves.toMatchInlineSnapshot(`"OK"`);
});
test('reject when wrapped promise rejects', async () => {
return expect(
timeoutPromiseAfter(delayRejection(100, 'ERR'), 1000, () => 'TIMEOUT ERR')
).rejects.toMatchInlineSnapshot(`"ERR"`);
});
test('reject it the timeout elapses', async () => {
return expect(
timeoutPromiseAfter(delay(1000, 'OK'), 100, () => 'TIMEOUT ERR')
).rejects.toMatchInlineSnapshot(`"TIMEOUT ERR"`);
});
});

View file

@ -1,17 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export function timeoutPromiseAfter<T, G>(
future: Promise<T>,
ms: number,
onTimeout: () => G
): Promise<T> {
return new Promise((resolve, reject) => {
setTimeout(() => reject(onTimeout()), ms);
future.then(resolve).catch(reject);
});
}

View file

@ -47,7 +47,6 @@ describe('TaskPollingLifecycle', () => {
max_attempts: 9,
poll_interval: 6000000,
version_conflict_threshold: 80,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_health_verbose_log: {

View file

@ -5,7 +5,7 @@
* 2.0.
*/
import { Subject, Observable, Subscription } from 'rxjs';
import { Subject, Observable } from 'rxjs';
import { pipe } from 'fp-ts/lib/pipeable';
import { map as mapOptional } from 'fp-ts/lib/Option';
import { tap } from 'rxjs/operators';
@ -32,12 +32,7 @@ import { fillPool, FillPoolResult, TimedFillPoolResult } from './lib/fill_pool';
import { Middleware } from './lib/middleware';
import { intervalFromNow } from './lib/intervals';
import { ConcreteTaskInstance } from './task';
import {
createTaskPoller,
PollingError,
PollingErrorType,
createObservableMonitor,
} from './polling';
import { createTaskPoller, PollingError, PollingErrorType } from './polling';
import { TaskPool } from './task_pool';
import { TaskManagerRunner, TaskRunner } from './task_running';
import { TaskStore } from './task_store';
@ -83,8 +78,6 @@ export class TaskPollingLifecycle {
public pool: TaskPool;
// all task related events (task claimed, task marked as running, etc.) are emitted through events$
private events$ = new Subject<TaskLifecycleEvent>();
// our subscription to the poller
private pollingSubscription: Subscription = Subscription.EMPTY;
private middleware: Middleware;
@ -153,8 +146,7 @@ export class TaskPollingLifecycle {
// pipe taskClaiming events into the lifecycle event stream
this.taskClaiming.events.subscribe(emitEvent);
const { max_poll_inactivity_cycles: maxPollInactivityCycles, poll_interval: pollInterval } =
config;
const { poll_interval: pollInterval } = config;
const pollIntervalDelay$ = delayOnClaimConflicts(
maxWorkersConfiguration$,
@ -164,65 +156,33 @@ export class TaskPollingLifecycle {
config.monitored_stats_running_average_window
).pipe(tap((delay) => emitEvent(asTaskManagerStatEvent('pollingDelay', asOk(delay)))));
// the task poller that polls for work on fixed intervals and on demand
const poller$: Observable<Result<TimedFillPoolResult, PollingError<string>>> =
createObservableMonitor<Result<TimedFillPoolResult, PollingError<string>>, Error>(
() =>
createTaskPoller<string, TimedFillPoolResult>({
logger,
pollInterval$: pollIntervalConfiguration$,
pollIntervalDelay$,
getCapacity: () => {
const capacity = this.pool.availableWorkers;
if (!capacity) {
// if there isn't capacity, emit a load event so that we can expose how often
// high load causes the poller to skip work (work isn't called when there is no capacity)
this.emitEvent(asTaskManagerStatEvent('load', asOk(this.pool.workerLoad)));
const poller = createTaskPoller<string, TimedFillPoolResult>({
logger,
initialPollInterval: pollInterval,
pollInterval$: pollIntervalConfiguration$,
pollIntervalDelay$,
getCapacity: () => {
const capacity = this.pool.availableWorkers;
if (!capacity) {
// if there isn't capacity, emit a load event so that we can expose how often
// high load causes the poller to skip work (work isn'tcalled when there is no capacity)
this.emitEvent(asTaskManagerStatEvent('load', asOk(this.pool.workerLoad)));
// Emit event indicating task manager utilization
this.emitEvent(
asTaskManagerStatEvent('workerUtilization', asOk(this.pool.workerLoad))
);
}
return capacity;
},
work: this.pollForWork,
// Time out the `work` phase if it takes longer than a certain number of polling cycles
// The `work` phase includes the prework needed *before* executing a task
// (such as polling for new work, marking tasks as running etc.) but does not
// include the time of actually running the task
workTimeout: pollInterval * maxPollInactivityCycles,
}).pipe(
tap(
mapOk(() => {
// Emit event indicating task manager utilization % at the end of a polling cycle
this.emitEvent(
asTaskManagerStatEvent('workerUtilization', asOk(this.pool.workerLoad))
);
})
)
),
{
heartbeatInterval: pollInterval,
// Time out the poller itself if it has failed to complete the entire stream for a certain amount of time.
// This is different that the `work` timeout above, as the poller could enter an invalid state where
// it fails to complete a cycle even thought `work` is completing quickly.
// We grant it a single cycle longer than the time alotted to `work` so that timing out the `work`
// doesn't get short circuited by the monitor reinstantiating the poller all together (a far more expensive
// operation than just timing out the `work` internally)
inactivityTimeout: pollInterval * (maxPollInactivityCycles + 1),
onError: (error) => {
logger.error(`[Task Poller Monitor]: ${error.message}`);
},
// Emit event indicating task manager utilization
this.emitEvent(asTaskManagerStatEvent('workerUtilization', asOk(this.pool.workerLoad)));
}
);
return capacity;
},
work: this.pollForWork,
});
this.subscribeToPoller(poller.events$);
elasticsearchAndSOAvailability$.subscribe((areESAndSOAvailable) => {
if (areESAndSOAvailable && !this.isStarted) {
if (areESAndSOAvailable) {
// start polling for work
this.pollingSubscription = this.subscribeToPoller(poller$);
} else if (!areESAndSOAvailable && this.isStarted) {
this.pollingSubscription.unsubscribe();
poller.start();
} else if (!areESAndSOAvailable) {
poller.stop();
this.pool.cancelRunningTasks();
}
});
@ -252,10 +212,6 @@ export class TaskPollingLifecycle {
});
};
public get isStarted() {
return !this.pollingSubscription.closed;
}
private pollForWork = async (): Promise<TimedFillPoolResult> => {
return fillPool(
// claim available tasks
@ -313,6 +269,14 @@ export class TaskPollingLifecycle {
})
)
)
.pipe(
tap(
mapOk(() => {
// Emit event indicating task manager utilization % at the end of a polling cycle
this.emitEvent(asTaskManagerStatEvent('workerUtilization', asOk(this.pool.workerLoad)));
})
)
)
.subscribe((result: Result<TimedFillPoolResult, PollingError<string>>) => {
this.emitEvent(
map(

View file

@ -678,7 +678,6 @@ function mockHealthStats(overrides = {}) {
value: {
max_workers: 10,
poll_interval: 3000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_running_average_window: 50,

View file

@ -5,6 +5,7 @@
* 2.0.
*/
import { Client } from '@elastic/elasticsearch';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import _ from 'lodash';
import { first } from 'rxjs/operators';
@ -227,9 +228,14 @@ describe('TaskStore', () => {
describe('fetch', () => {
let store: TaskStore;
let esClient: ReturnType<typeof elasticsearchServiceMock.createClusterClient>['asInternalUser'];
let childEsClient: ReturnType<
typeof elasticsearchServiceMock.createClusterClient
>['asInternalUser'];
beforeAll(() => {
esClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
childEsClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
esClient.child.mockReturnValue(childEsClient as unknown as Client);
store = new TaskStore({
index: 'tasky',
taskManagerId: '',
@ -242,17 +248,17 @@ describe('TaskStore', () => {
});
async function testFetch(opts?: SearchOpts, hits: Array<estypes.SearchHit<unknown>> = []) {
esClient.search.mockResponse({
childEsClient.search.mockResponse({
hits: { hits, total: hits.length },
} as estypes.SearchResponse);
const result = await store.fetch(opts);
expect(esClient.search).toHaveBeenCalledTimes(1);
expect(childEsClient.search).toHaveBeenCalledTimes(1);
return {
result,
args: esClient.search.mock.calls[0][0],
args: childEsClient.search.mock.calls[0][0],
};
}
@ -287,7 +293,7 @@ describe('TaskStore', () => {
test('pushes error from call cluster to errors$', async () => {
const firstErrorPromise = store.errors$.pipe(first()).toPromise();
esClient.search.mockRejectedValue(new Error('Failure'));
childEsClient.search.mockRejectedValue(new Error('Failure'));
await expect(store.fetch()).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`);
expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`);
});

View file

@ -99,6 +99,7 @@ export class TaskStore {
public readonly errors$ = new Subject<Error>();
private esClient: ElasticsearchClient;
private esClientWithoutRetries: ElasticsearchClient;
private definitions: TaskTypeDictionary;
private savedObjectsRepository: ISavedObjectsRepository;
private serializer: ISavedObjectsSerializer;
@ -121,6 +122,11 @@ export class TaskStore {
this.serializer = opts.serializer;
this.savedObjectsRepository = opts.savedObjectsRepository;
this.adHocTaskCounter = opts.adHocTaskCounter;
this.esClientWithoutRetries = opts.esClient.child({
// Timeouts are retried and make requests timeout after (requestTimeout * (1 + maxRetries))
// The poller doesn't need retry logic because it will try again at the next polling cycle
maxRetries: 0,
});
}
/**
@ -391,7 +397,7 @@ export class TaskStore {
try {
const {
hits: { hits: tasks },
} = await this.esClient.search<SavedObjectsRawDoc['_source']>({
} = await this.esClientWithoutRetries.search<SavedObjectsRawDoc['_source']>({
index: this.index,
ignore_unavailable: true,
body: {
@ -447,7 +453,7 @@ export class TaskStore {
const { query } = ensureQueryOnlyReturnsTaskObjects(opts);
try {
const // eslint-disable-next-line @typescript-eslint/naming-convention
{ total, updated, version_conflicts } = await this.esClient.updateByQuery({
{ total, updated, version_conflicts } = await this.esClientWithoutRetries.updateByQuery({
index: this.index,
ignore_unavailable: true,
refresh: true,

View file

@ -176,7 +176,6 @@ function getMockMonitoredHealth(overrides = {}): MonitoredHealth {
value: {
max_workers: 10,
poll_interval: 3000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_running_average_window: 50,

View file

@ -130,7 +130,6 @@ export default function ({ getService }: FtrProviderContext) {
expect(health.status).to.eql('OK');
expect(health.stats.configuration.value).to.eql({
poll_interval: 3000,
max_poll_inactivity_cycles: 10,
monitored_aggregated_stats_refresh_rate: monitoredAggregatedStatsRefreshRate,
monitored_stats_running_average_window: 50,
monitored_task_execution_thresholds: {