mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 09:48:58 -04:00
Apply back pressure in Task Manager whenever Elasticsearch responds with a 429 (#75666)
* Make task manager maxWorkers and pollInterval observables (#75293)
* WIP step 1
* WIP step 2
* Cleanup
* Make maxWorkers an observable for the task pool
* Cleanup
* Fix test failures
* Use BehaviorSubject
* Add some tests
* Make the task manager store emit error events (#75679)
* Add errors$ observable to the task store
* Add unit tests
* Temporarily apply back pressure to maxWorkers and pollInterval when 429 errors occur (#77096)
* WIP
* Cleanup
* Add error count to message
* Reset observable values on stop
* Add comments
* Fix issues when changing configurations
* Cleanup code
* Cleanup pt2
* Some renames
* Fix typecheck
* Use observables to manage throughput
* Rename class
* Switch to createManagedConfiguration
* Add some comments
* Start unit tests
* Add logs
* Fix log level
* Attempt at adding integration tests
* Fix test failures
* Fix timer
* Revert "Fix timer"
This reverts commit 0817e5e6a5
.
* Use Symbol
* Fix merge scan
* replace startsWith with a timer that is scheduled to 0
* typo
Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
Co-authored-by: Gidi Meir Morris <github@gidi.io>
This commit is contained in:
parent
cd84ace53d
commit
e0bb8605b4
11 changed files with 883 additions and 121 deletions
|
@ -0,0 +1,105 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import sinon from 'sinon';
|
||||
import { mockLogger } from '../test_utils';
|
||||
import { TaskManager } from '../task_manager';
|
||||
import { savedObjectsRepositoryMock } from '../../../../../src/core/server/mocks';
|
||||
import {
|
||||
SavedObjectsSerializer,
|
||||
SavedObjectTypeRegistry,
|
||||
SavedObjectsErrorHelpers,
|
||||
} from '../../../../../src/core/server';
|
||||
import { ADJUST_THROUGHPUT_INTERVAL } from '../lib/create_managed_configuration';
|
||||
|
||||
describe('managed configuration', () => {
|
||||
let taskManager: TaskManager;
|
||||
let clock: sinon.SinonFakeTimers;
|
||||
const callAsInternalUser = jest.fn();
|
||||
const logger = mockLogger();
|
||||
const serializer = new SavedObjectsSerializer(new SavedObjectTypeRegistry());
|
||||
const savedObjectsClient = savedObjectsRepositoryMock.create();
|
||||
const config = {
|
||||
enabled: true,
|
||||
max_workers: 10,
|
||||
index: 'foo',
|
||||
max_attempts: 9,
|
||||
poll_interval: 3000,
|
||||
max_poll_inactivity_cycles: 10,
|
||||
request_capacity: 1000,
|
||||
};
|
||||
|
||||
beforeEach(() => {
|
||||
jest.resetAllMocks();
|
||||
callAsInternalUser.mockResolvedValue({ total: 0, updated: 0, version_conflicts: 0 });
|
||||
clock = sinon.useFakeTimers();
|
||||
taskManager = new TaskManager({
|
||||
config,
|
||||
logger,
|
||||
serializer,
|
||||
callAsInternalUser,
|
||||
taskManagerId: 'some-uuid',
|
||||
savedObjectsRepository: savedObjectsClient,
|
||||
});
|
||||
taskManager.registerTaskDefinitions({
|
||||
foo: {
|
||||
type: 'foo',
|
||||
title: 'Foo',
|
||||
createTaskRunner: jest.fn(),
|
||||
},
|
||||
});
|
||||
taskManager.start();
|
||||
// force rxjs timers to fire when they are scheduled for setTimeout(0) as the
|
||||
// sinon fake timers cause them to stall
|
||||
clock.tick(0);
|
||||
});
|
||||
|
||||
afterEach(() => clock.restore());
|
||||
|
||||
test('should lower max workers when Elasticsearch returns 429 error', async () => {
|
||||
savedObjectsClient.create.mockRejectedValueOnce(
|
||||
SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')
|
||||
);
|
||||
// Cause "too many requests" error to be thrown
|
||||
await expect(
|
||||
taskManager.schedule({
|
||||
taskType: 'foo',
|
||||
state: {},
|
||||
params: {},
|
||||
})
|
||||
).rejects.toThrowErrorMatchingInlineSnapshot(`"Too Many Requests"`);
|
||||
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
|
||||
expect(logger.warn).toHaveBeenCalledWith(
|
||||
'Max workers configuration is temporarily reduced after Elasticsearch returned 1 "too many request" error(s).'
|
||||
);
|
||||
expect(logger.debug).toHaveBeenCalledWith(
|
||||
'Max workers configuration changing from 10 to 8 after seeing 1 error(s)'
|
||||
);
|
||||
expect(logger.debug).toHaveBeenCalledWith('Task pool now using 10 as the max worker value');
|
||||
});
|
||||
|
||||
test('should increase poll interval when Elasticsearch returns 429 error', async () => {
|
||||
savedObjectsClient.create.mockRejectedValueOnce(
|
||||
SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')
|
||||
);
|
||||
// Cause "too many requests" error to be thrown
|
||||
await expect(
|
||||
taskManager.schedule({
|
||||
taskType: 'foo',
|
||||
state: {},
|
||||
params: {},
|
||||
})
|
||||
).rejects.toThrowErrorMatchingInlineSnapshot(`"Too Many Requests"`);
|
||||
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
|
||||
expect(logger.warn).toHaveBeenCalledWith(
|
||||
'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" error(s).'
|
||||
);
|
||||
expect(logger.debug).toHaveBeenCalledWith(
|
||||
'Poll interval configuration changing from 3000 to 3600 after seeing 1 error(s)'
|
||||
);
|
||||
expect(logger.debug).toHaveBeenCalledWith('Task poller now using interval of 3600ms');
|
||||
});
|
||||
});
|
|
@ -0,0 +1,213 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import sinon from 'sinon';
|
||||
import { Subject } from 'rxjs';
|
||||
import { mockLogger } from '../test_utils';
|
||||
import { SavedObjectsErrorHelpers } from '../../../../../src/core/server';
|
||||
import {
|
||||
createManagedConfiguration,
|
||||
ADJUST_THROUGHPUT_INTERVAL,
|
||||
} from './create_managed_configuration';
|
||||
|
||||
describe('createManagedConfiguration()', () => {
|
||||
let clock: sinon.SinonFakeTimers;
|
||||
const logger = mockLogger();
|
||||
|
||||
beforeEach(() => {
|
||||
jest.resetAllMocks();
|
||||
clock = sinon.useFakeTimers();
|
||||
});
|
||||
|
||||
afterEach(() => clock.restore());
|
||||
|
||||
test('returns observables with initialized values', async () => {
|
||||
const maxWorkersSubscription = jest.fn();
|
||||
const pollIntervalSubscription = jest.fn();
|
||||
const { maxWorkersConfiguration$, pollIntervalConfiguration$ } = createManagedConfiguration({
|
||||
logger,
|
||||
errors$: new Subject<Error>(),
|
||||
startingMaxWorkers: 1,
|
||||
startingPollInterval: 2,
|
||||
});
|
||||
maxWorkersConfiguration$.subscribe(maxWorkersSubscription);
|
||||
pollIntervalConfiguration$.subscribe(pollIntervalSubscription);
|
||||
expect(maxWorkersSubscription).toHaveBeenCalledTimes(1);
|
||||
expect(maxWorkersSubscription).toHaveBeenNthCalledWith(1, 1);
|
||||
expect(pollIntervalSubscription).toHaveBeenCalledTimes(1);
|
||||
expect(pollIntervalSubscription).toHaveBeenNthCalledWith(1, 2);
|
||||
});
|
||||
|
||||
test(`skips errors that aren't about too many requests`, async () => {
|
||||
const maxWorkersSubscription = jest.fn();
|
||||
const pollIntervalSubscription = jest.fn();
|
||||
const errors$ = new Subject<Error>();
|
||||
const { maxWorkersConfiguration$, pollIntervalConfiguration$ } = createManagedConfiguration({
|
||||
errors$,
|
||||
logger,
|
||||
startingMaxWorkers: 100,
|
||||
startingPollInterval: 100,
|
||||
});
|
||||
maxWorkersConfiguration$.subscribe(maxWorkersSubscription);
|
||||
pollIntervalConfiguration$.subscribe(pollIntervalSubscription);
|
||||
errors$.next(new Error('foo'));
|
||||
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
|
||||
expect(maxWorkersSubscription).toHaveBeenCalledTimes(1);
|
||||
expect(pollIntervalSubscription).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
describe('maxWorker configuration', () => {
|
||||
function setupScenario(startingMaxWorkers: number) {
|
||||
const errors$ = new Subject<Error>();
|
||||
const subscription = jest.fn();
|
||||
const { maxWorkersConfiguration$ } = createManagedConfiguration({
|
||||
errors$,
|
||||
startingMaxWorkers,
|
||||
logger,
|
||||
startingPollInterval: 1,
|
||||
});
|
||||
maxWorkersConfiguration$.subscribe(subscription);
|
||||
return { subscription, errors$ };
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
jest.resetAllMocks();
|
||||
clock = sinon.useFakeTimers();
|
||||
});
|
||||
|
||||
afterEach(() => clock.restore());
|
||||
|
||||
test('should decrease configuration at the next interval when an error is emitted', async () => {
|
||||
const { subscription, errors$ } = setupScenario(100);
|
||||
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
|
||||
clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1);
|
||||
expect(subscription).toHaveBeenCalledTimes(1);
|
||||
clock.tick(1);
|
||||
expect(subscription).toHaveBeenCalledTimes(2);
|
||||
expect(subscription).toHaveBeenNthCalledWith(2, 80);
|
||||
});
|
||||
|
||||
test('should log a warning when the configuration changes from the starting value', async () => {
|
||||
const { errors$ } = setupScenario(100);
|
||||
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
|
||||
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
|
||||
expect(logger.warn).toHaveBeenCalledWith(
|
||||
'Max workers configuration is temporarily reduced after Elasticsearch returned 1 "too many request" error(s).'
|
||||
);
|
||||
});
|
||||
|
||||
test('should increase configuration back to normal incrementally after an error is emitted', async () => {
|
||||
const { subscription, errors$ } = setupScenario(100);
|
||||
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
|
||||
clock.tick(ADJUST_THROUGHPUT_INTERVAL * 10);
|
||||
expect(subscription).toHaveBeenNthCalledWith(2, 80);
|
||||
expect(subscription).toHaveBeenNthCalledWith(3, 84);
|
||||
// 88.2- > 89 from Math.ceil
|
||||
expect(subscription).toHaveBeenNthCalledWith(4, 89);
|
||||
expect(subscription).toHaveBeenNthCalledWith(5, 94);
|
||||
expect(subscription).toHaveBeenNthCalledWith(6, 99);
|
||||
// 103.95 -> 100 from Math.min with starting value
|
||||
expect(subscription).toHaveBeenNthCalledWith(7, 100);
|
||||
// No new calls due to value not changing and usage of distinctUntilChanged()
|
||||
expect(subscription).toHaveBeenCalledTimes(7);
|
||||
});
|
||||
|
||||
test('should keep reducing configuration when errors keep emitting', async () => {
|
||||
const { subscription, errors$ } = setupScenario(100);
|
||||
for (let i = 0; i < 20; i++) {
|
||||
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
|
||||
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
|
||||
}
|
||||
expect(subscription).toHaveBeenNthCalledWith(2, 80);
|
||||
expect(subscription).toHaveBeenNthCalledWith(3, 64);
|
||||
// 51.2 -> 51 from Math.floor
|
||||
expect(subscription).toHaveBeenNthCalledWith(4, 51);
|
||||
expect(subscription).toHaveBeenNthCalledWith(5, 40);
|
||||
expect(subscription).toHaveBeenNthCalledWith(6, 32);
|
||||
expect(subscription).toHaveBeenNthCalledWith(7, 25);
|
||||
expect(subscription).toHaveBeenNthCalledWith(8, 20);
|
||||
expect(subscription).toHaveBeenNthCalledWith(9, 16);
|
||||
expect(subscription).toHaveBeenNthCalledWith(10, 12);
|
||||
expect(subscription).toHaveBeenNthCalledWith(11, 9);
|
||||
expect(subscription).toHaveBeenNthCalledWith(12, 7);
|
||||
expect(subscription).toHaveBeenNthCalledWith(13, 5);
|
||||
expect(subscription).toHaveBeenNthCalledWith(14, 4);
|
||||
expect(subscription).toHaveBeenNthCalledWith(15, 3);
|
||||
expect(subscription).toHaveBeenNthCalledWith(16, 2);
|
||||
expect(subscription).toHaveBeenNthCalledWith(17, 1);
|
||||
// No new calls due to value not changing and usage of distinctUntilChanged()
|
||||
expect(subscription).toHaveBeenCalledTimes(17);
|
||||
});
|
||||
});
|
||||
|
||||
describe('pollInterval configuration', () => {
|
||||
function setupScenario(startingPollInterval: number) {
|
||||
const errors$ = new Subject<Error>();
|
||||
const subscription = jest.fn();
|
||||
const { pollIntervalConfiguration$ } = createManagedConfiguration({
|
||||
logger,
|
||||
errors$,
|
||||
startingPollInterval,
|
||||
startingMaxWorkers: 1,
|
||||
});
|
||||
pollIntervalConfiguration$.subscribe(subscription);
|
||||
return { subscription, errors$ };
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
jest.resetAllMocks();
|
||||
clock = sinon.useFakeTimers();
|
||||
});
|
||||
|
||||
afterEach(() => clock.restore());
|
||||
|
||||
test('should increase configuration at the next interval when an error is emitted', async () => {
|
||||
const { subscription, errors$ } = setupScenario(100);
|
||||
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
|
||||
clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1);
|
||||
expect(subscription).toHaveBeenCalledTimes(1);
|
||||
clock.tick(1);
|
||||
expect(subscription).toHaveBeenCalledTimes(2);
|
||||
expect(subscription).toHaveBeenNthCalledWith(2, 120);
|
||||
});
|
||||
|
||||
test('should log a warning when the configuration changes from the starting value', async () => {
|
||||
const { errors$ } = setupScenario(100);
|
||||
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
|
||||
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
|
||||
expect(logger.warn).toHaveBeenCalledWith(
|
||||
'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" error(s).'
|
||||
);
|
||||
});
|
||||
|
||||
test('should decrease configuration back to normal incrementally after an error is emitted', async () => {
|
||||
const { subscription, errors$ } = setupScenario(100);
|
||||
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
|
||||
clock.tick(ADJUST_THROUGHPUT_INTERVAL * 10);
|
||||
expect(subscription).toHaveBeenNthCalledWith(2, 120);
|
||||
expect(subscription).toHaveBeenNthCalledWith(3, 114);
|
||||
// 108.3 -> 108 from Math.floor
|
||||
expect(subscription).toHaveBeenNthCalledWith(4, 108);
|
||||
expect(subscription).toHaveBeenNthCalledWith(5, 102);
|
||||
// 96.9 -> 100 from Math.max with the starting value
|
||||
expect(subscription).toHaveBeenNthCalledWith(6, 100);
|
||||
// No new calls due to value not changing and usage of distinctUntilChanged()
|
||||
expect(subscription).toHaveBeenCalledTimes(6);
|
||||
});
|
||||
|
||||
test('should increase configuration when errors keep emitting', async () => {
|
||||
const { subscription, errors$ } = setupScenario(100);
|
||||
for (let i = 0; i < 3; i++) {
|
||||
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
|
||||
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
|
||||
}
|
||||
expect(subscription).toHaveBeenNthCalledWith(2, 120);
|
||||
expect(subscription).toHaveBeenNthCalledWith(3, 144);
|
||||
// 172.8 -> 173 from Math.ceil
|
||||
expect(subscription).toHaveBeenNthCalledWith(4, 173);
|
||||
});
|
||||
});
|
||||
});
|
|
@ -0,0 +1,160 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { interval, merge, of, Observable } from 'rxjs';
|
||||
import { filter, mergeScan, map, scan, distinctUntilChanged, startWith } from 'rxjs/operators';
|
||||
import { SavedObjectsErrorHelpers } from '../../../../../src/core/server';
|
||||
import { Logger } from '../types';
|
||||
|
||||
const FLUSH_MARKER = Symbol('flush');
|
||||
export const ADJUST_THROUGHPUT_INTERVAL = 10 * 1000;
|
||||
|
||||
// When errors occur, reduce maxWorkers by MAX_WORKERS_DECREASE_PERCENTAGE
|
||||
// When errors no longer occur, start increasing maxWorkers by MAX_WORKERS_INCREASE_PERCENTAGE
|
||||
// until starting value is reached
|
||||
const MAX_WORKERS_DECREASE_PERCENTAGE = 0.8;
|
||||
const MAX_WORKERS_INCREASE_PERCENTAGE = 1.05;
|
||||
|
||||
// When errors occur, increase pollInterval by POLL_INTERVAL_INCREASE_PERCENTAGE
|
||||
// When errors no longer occur, start decreasing pollInterval by POLL_INTERVAL_DECREASE_PERCENTAGE
|
||||
// until starting value is reached
|
||||
const POLL_INTERVAL_DECREASE_PERCENTAGE = 0.95;
|
||||
const POLL_INTERVAL_INCREASE_PERCENTAGE = 1.2;
|
||||
|
||||
interface ManagedConfigurationOpts {
|
||||
logger: Logger;
|
||||
startingMaxWorkers: number;
|
||||
startingPollInterval: number;
|
||||
errors$: Observable<Error>;
|
||||
}
|
||||
|
||||
interface ManagedConfiguration {
|
||||
maxWorkersConfiguration$: Observable<number>;
|
||||
pollIntervalConfiguration$: Observable<number>;
|
||||
}
|
||||
|
||||
export function createManagedConfiguration({
|
||||
logger,
|
||||
startingMaxWorkers,
|
||||
startingPollInterval,
|
||||
errors$,
|
||||
}: ManagedConfigurationOpts): ManagedConfiguration {
|
||||
const errorCheck$ = countErrors(errors$, ADJUST_THROUGHPUT_INTERVAL);
|
||||
return {
|
||||
maxWorkersConfiguration$: errorCheck$.pipe(
|
||||
createMaxWorkersScan(logger, startingMaxWorkers),
|
||||
startWith(startingMaxWorkers),
|
||||
distinctUntilChanged()
|
||||
),
|
||||
pollIntervalConfiguration$: errorCheck$.pipe(
|
||||
createPollIntervalScan(logger, startingPollInterval),
|
||||
startWith(startingPollInterval),
|
||||
distinctUntilChanged()
|
||||
),
|
||||
};
|
||||
}
|
||||
|
||||
function createMaxWorkersScan(logger: Logger, startingMaxWorkers: number) {
|
||||
return scan((previousMaxWorkers: number, errorCount: number) => {
|
||||
let newMaxWorkers: number;
|
||||
if (errorCount > 0) {
|
||||
// Decrease max workers by MAX_WORKERS_DECREASE_PERCENTAGE while making sure it doesn't go lower than 1.
|
||||
// Using Math.floor to make sure the number is different than previous while not being a decimal value.
|
||||
newMaxWorkers = Math.max(Math.floor(previousMaxWorkers * MAX_WORKERS_DECREASE_PERCENTAGE), 1);
|
||||
} else {
|
||||
// Increase max workers by MAX_WORKERS_INCREASE_PERCENTAGE while making sure it doesn't go
|
||||
// higher than the starting value. Using Math.ceil to make sure the number is different than
|
||||
// previous while not being a decimal value
|
||||
newMaxWorkers = Math.min(
|
||||
startingMaxWorkers,
|
||||
Math.ceil(previousMaxWorkers * MAX_WORKERS_INCREASE_PERCENTAGE)
|
||||
);
|
||||
}
|
||||
if (newMaxWorkers !== previousMaxWorkers) {
|
||||
logger.debug(
|
||||
`Max workers configuration changing from ${previousMaxWorkers} to ${newMaxWorkers} after seeing ${errorCount} error(s)`
|
||||
);
|
||||
if (previousMaxWorkers === startingMaxWorkers) {
|
||||
logger.warn(
|
||||
`Max workers configuration is temporarily reduced after Elasticsearch returned ${errorCount} "too many request" error(s).`
|
||||
);
|
||||
}
|
||||
}
|
||||
return newMaxWorkers;
|
||||
}, startingMaxWorkers);
|
||||
}
|
||||
|
||||
function createPollIntervalScan(logger: Logger, startingPollInterval: number) {
|
||||
return scan((previousPollInterval: number, errorCount: number) => {
|
||||
let newPollInterval: number;
|
||||
if (errorCount > 0) {
|
||||
// Increase poll interval by POLL_INTERVAL_INCREASE_PERCENTAGE and use Math.ceil to
|
||||
// make sure the number is different than previous while not being a decimal value.
|
||||
newPollInterval = Math.ceil(previousPollInterval * POLL_INTERVAL_INCREASE_PERCENTAGE);
|
||||
} else {
|
||||
// Decrease poll interval by POLL_INTERVAL_DECREASE_PERCENTAGE and use Math.floor to
|
||||
// make sure the number is different than previous while not being a decimal value.
|
||||
newPollInterval = Math.max(
|
||||
startingPollInterval,
|
||||
Math.floor(previousPollInterval * POLL_INTERVAL_DECREASE_PERCENTAGE)
|
||||
);
|
||||
}
|
||||
if (newPollInterval !== previousPollInterval) {
|
||||
logger.debug(
|
||||
`Poll interval configuration changing from ${previousPollInterval} to ${newPollInterval} after seeing ${errorCount} error(s)`
|
||||
);
|
||||
if (previousPollInterval === startingPollInterval) {
|
||||
logger.warn(
|
||||
`Poll interval configuration is temporarily increased after Elasticsearch returned ${errorCount} "too many request" error(s).`
|
||||
);
|
||||
}
|
||||
}
|
||||
return newPollInterval;
|
||||
}, startingPollInterval);
|
||||
}
|
||||
|
||||
function countErrors(errors$: Observable<Error>, countInterval: number): Observable<number> {
|
||||
return merge(
|
||||
// Flush error count at fixed interval
|
||||
interval(countInterval).pipe(map(() => FLUSH_MARKER)),
|
||||
errors$.pipe(filter((e) => SavedObjectsErrorHelpers.isTooManyRequestsError(e)))
|
||||
).pipe(
|
||||
// When tag is "flush", reset the error counter
|
||||
// Otherwise increment the error counter
|
||||
mergeScan(({ count }, next) => {
|
||||
return next === FLUSH_MARKER
|
||||
? of(emitErrorCount(count), resetErrorCount())
|
||||
: of(incementErrorCount(count));
|
||||
}, emitErrorCount(0)),
|
||||
filter(isEmitEvent),
|
||||
map(({ count }) => count)
|
||||
);
|
||||
}
|
||||
|
||||
function emitErrorCount(count: number) {
|
||||
return {
|
||||
tag: 'emit',
|
||||
count,
|
||||
};
|
||||
}
|
||||
|
||||
function isEmitEvent(event: { tag: string; count: number }) {
|
||||
return event.tag === 'emit';
|
||||
}
|
||||
|
||||
function incementErrorCount(count: number) {
|
||||
return {
|
||||
tag: 'inc',
|
||||
count: count + 1,
|
||||
};
|
||||
}
|
||||
|
||||
function resetErrorCount() {
|
||||
return {
|
||||
tag: 'initial',
|
||||
count: 0,
|
||||
};
|
||||
}
|
|
@ -4,9 +4,9 @@
|
|||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { Subject, Observable, throwError, interval, timer, Subscription } from 'rxjs';
|
||||
import { exhaustMap, tap, takeUntil, switchMap, switchMapTo, catchError } from 'rxjs/operators';
|
||||
import { Subject, Observable, throwError, timer, Subscription } from 'rxjs';
|
||||
import { noop } from 'lodash';
|
||||
import { exhaustMap, tap, takeUntil, switchMap, switchMapTo, catchError } from 'rxjs/operators';
|
||||
|
||||
const DEFAULT_HEARTBEAT_INTERVAL = 1000;
|
||||
|
||||
|
@ -29,7 +29,7 @@ export function createObservableMonitor<T, E>(
|
|||
}: ObservableMonitorOptions<E> = {}
|
||||
): Observable<T> {
|
||||
return new Observable((subscriber) => {
|
||||
const subscription: Subscription = interval(heartbeatInterval)
|
||||
const subscription: Subscription = timer(0, heartbeatInterval)
|
||||
.pipe(
|
||||
// switch from the heartbeat interval to the instantiated observable until it completes / errors
|
||||
exhaustMap(() => takeUntilDurationOfInactivity(observableFactory(), inactivityTimeout)),
|
||||
|
|
|
@ -5,11 +5,11 @@
|
|||
*/
|
||||
|
||||
import _ from 'lodash';
|
||||
import { Subject } from 'rxjs';
|
||||
import { Subject, of, BehaviorSubject } from 'rxjs';
|
||||
import { Option, none, some } from 'fp-ts/lib/Option';
|
||||
import { createTaskPoller, PollingError, PollingErrorType } from './task_poller';
|
||||
import { fakeSchedulers } from 'rxjs-marbles/jest';
|
||||
import { sleep, resolvable, Resolvable } from '../test_utils';
|
||||
import { sleep, resolvable, Resolvable, mockLogger } from '../test_utils';
|
||||
import { asOk, asErr } from '../lib/result_type';
|
||||
|
||||
describe('TaskPoller', () => {
|
||||
|
@ -24,10 +24,12 @@ describe('TaskPoller', () => {
|
|||
|
||||
const work = jest.fn(async () => true);
|
||||
createTaskPoller<void, boolean>({
|
||||
pollInterval,
|
||||
logger: mockLogger(),
|
||||
pollInterval$: of(pollInterval),
|
||||
bufferCapacity,
|
||||
getCapacity: () => 1,
|
||||
work,
|
||||
workTimeout: pollInterval * 5,
|
||||
pollRequests$: new Subject<Option<void>>(),
|
||||
}).subscribe(() => {});
|
||||
|
||||
|
@ -41,8 +43,51 @@ describe('TaskPoller', () => {
|
|||
expect(work).toHaveBeenCalledTimes(1);
|
||||
|
||||
await sleep(0);
|
||||
await sleep(0);
|
||||
advance(pollInterval + 10);
|
||||
await sleep(0);
|
||||
expect(work).toHaveBeenCalledTimes(2);
|
||||
})
|
||||
);
|
||||
|
||||
test(
|
||||
'poller adapts to pollInterval changes',
|
||||
fakeSchedulers(async (advance) => {
|
||||
const pollInterval = 100;
|
||||
const pollInterval$ = new BehaviorSubject(pollInterval);
|
||||
const bufferCapacity = 5;
|
||||
|
||||
const work = jest.fn(async () => true);
|
||||
createTaskPoller<void, boolean>({
|
||||
logger: mockLogger(),
|
||||
pollInterval$,
|
||||
bufferCapacity,
|
||||
getCapacity: () => 1,
|
||||
work,
|
||||
workTimeout: pollInterval * 5,
|
||||
pollRequests$: new Subject<Option<void>>(),
|
||||
}).subscribe(() => {});
|
||||
|
||||
// `work` is async, we have to force a node `tick`
|
||||
await sleep(0);
|
||||
advance(pollInterval);
|
||||
expect(work).toHaveBeenCalledTimes(1);
|
||||
|
||||
pollInterval$.next(pollInterval * 2);
|
||||
|
||||
// `work` is async, we have to force a node `tick`
|
||||
await sleep(0);
|
||||
advance(pollInterval);
|
||||
expect(work).toHaveBeenCalledTimes(1);
|
||||
advance(pollInterval);
|
||||
expect(work).toHaveBeenCalledTimes(2);
|
||||
|
||||
pollInterval$.next(pollInterval / 2);
|
||||
|
||||
// `work` is async, we have to force a node `tick`
|
||||
await sleep(0);
|
||||
advance(pollInterval / 2);
|
||||
expect(work).toHaveBeenCalledTimes(3);
|
||||
})
|
||||
);
|
||||
|
||||
|
@ -56,9 +101,11 @@ describe('TaskPoller', () => {
|
|||
|
||||
let hasCapacity = true;
|
||||
createTaskPoller<void, boolean>({
|
||||
pollInterval,
|
||||
logger: mockLogger(),
|
||||
pollInterval$: of(pollInterval),
|
||||
bufferCapacity,
|
||||
work,
|
||||
workTimeout: pollInterval * 5,
|
||||
getCapacity: () => (hasCapacity ? 1 : 0),
|
||||
pollRequests$: new Subject<Option<void>>(),
|
||||
}).subscribe(() => {});
|
||||
|
@ -113,9 +160,11 @@ describe('TaskPoller', () => {
|
|||
const work = jest.fn(async () => true);
|
||||
const pollRequests$ = new Subject<Option<void>>();
|
||||
createTaskPoller<void, boolean>({
|
||||
pollInterval,
|
||||
logger: mockLogger(),
|
||||
pollInterval$: of(pollInterval),
|
||||
bufferCapacity,
|
||||
work,
|
||||
workTimeout: pollInterval * 5,
|
||||
getCapacity: () => 1,
|
||||
pollRequests$,
|
||||
}).subscribe(jest.fn());
|
||||
|
@ -157,9 +206,11 @@ describe('TaskPoller', () => {
|
|||
const work = jest.fn(async () => true);
|
||||
const pollRequests$ = new Subject<Option<void>>();
|
||||
createTaskPoller<void, boolean>({
|
||||
pollInterval,
|
||||
logger: mockLogger(),
|
||||
pollInterval$: of(pollInterval),
|
||||
bufferCapacity,
|
||||
work,
|
||||
workTimeout: pollInterval * 5,
|
||||
getCapacity: () => (hasCapacity ? 1 : 0),
|
||||
pollRequests$,
|
||||
}).subscribe(() => {});
|
||||
|
@ -200,9 +251,11 @@ describe('TaskPoller', () => {
|
|||
const work = jest.fn(async () => true);
|
||||
const pollRequests$ = new Subject<Option<string>>();
|
||||
createTaskPoller<string, boolean>({
|
||||
pollInterval,
|
||||
logger: mockLogger(),
|
||||
pollInterval$: of(pollInterval),
|
||||
bufferCapacity,
|
||||
work,
|
||||
workTimeout: pollInterval * 5,
|
||||
getCapacity: () => 1,
|
||||
pollRequests$,
|
||||
}).subscribe(() => {});
|
||||
|
@ -235,7 +288,8 @@ describe('TaskPoller', () => {
|
|||
const handler = jest.fn();
|
||||
const pollRequests$ = new Subject<Option<string>>();
|
||||
createTaskPoller<string, string[]>({
|
||||
pollInterval,
|
||||
logger: mockLogger(),
|
||||
pollInterval$: of(pollInterval),
|
||||
bufferCapacity,
|
||||
work: async (...args) => {
|
||||
await worker;
|
||||
|
@ -285,7 +339,8 @@ describe('TaskPoller', () => {
|
|||
type ResolvableTupple = [string, PromiseLike<void> & Resolvable];
|
||||
const pollRequests$ = new Subject<Option<ResolvableTupple>>();
|
||||
createTaskPoller<[string, Resolvable], string[]>({
|
||||
pollInterval,
|
||||
logger: mockLogger(),
|
||||
pollInterval$: of(pollInterval),
|
||||
bufferCapacity,
|
||||
work: async (...resolvables) => {
|
||||
await Promise.all(resolvables.map(([, future]) => future));
|
||||
|
@ -344,11 +399,13 @@ describe('TaskPoller', () => {
|
|||
const handler = jest.fn();
|
||||
const pollRequests$ = new Subject<Option<string>>();
|
||||
createTaskPoller<string, string[]>({
|
||||
pollInterval,
|
||||
logger: mockLogger(),
|
||||
pollInterval$: of(pollInterval),
|
||||
bufferCapacity,
|
||||
work: async (...args) => {
|
||||
throw new Error('failed to work');
|
||||
},
|
||||
workTimeout: pollInterval * 5,
|
||||
getCapacity: () => 5,
|
||||
pollRequests$,
|
||||
}).subscribe(handler);
|
||||
|
@ -383,9 +440,11 @@ describe('TaskPoller', () => {
|
|||
return callCount;
|
||||
});
|
||||
createTaskPoller<string, number>({
|
||||
pollInterval,
|
||||
logger: mockLogger(),
|
||||
pollInterval$: of(pollInterval),
|
||||
bufferCapacity,
|
||||
work,
|
||||
workTimeout: pollInterval * 5,
|
||||
getCapacity: () => 5,
|
||||
pollRequests$,
|
||||
}).subscribe(handler);
|
||||
|
@ -424,9 +483,11 @@ describe('TaskPoller', () => {
|
|||
const work = jest.fn(async () => {});
|
||||
const pollRequests$ = new Subject<Option<string>>();
|
||||
createTaskPoller<string, void>({
|
||||
pollInterval,
|
||||
logger: mockLogger(),
|
||||
pollInterval$: of(pollInterval),
|
||||
bufferCapacity,
|
||||
work,
|
||||
workTimeout: pollInterval * 5,
|
||||
getCapacity: () => 5,
|
||||
pollRequests$,
|
||||
}).subscribe(handler);
|
||||
|
|
|
@ -11,10 +11,11 @@
|
|||
import { performance } from 'perf_hooks';
|
||||
import { after } from 'lodash';
|
||||
import { Subject, merge, interval, of, Observable } from 'rxjs';
|
||||
import { mapTo, filter, scan, concatMap, tap, catchError } from 'rxjs/operators';
|
||||
import { mapTo, filter, scan, concatMap, tap, catchError, switchMap } from 'rxjs/operators';
|
||||
|
||||
import { pipe } from 'fp-ts/lib/pipeable';
|
||||
import { Option, none, map as mapOptional, getOrElse } from 'fp-ts/lib/Option';
|
||||
import { Logger } from '../types';
|
||||
import { pullFromSet } from '../lib/pull_from_set';
|
||||
import {
|
||||
Result,
|
||||
|
@ -30,12 +31,13 @@ import { timeoutPromiseAfter } from './timeout_promise_after';
|
|||
type WorkFn<T, H> = (...params: T[]) => Promise<H>;
|
||||
|
||||
interface Opts<T, H> {
|
||||
pollInterval: number;
|
||||
logger: Logger;
|
||||
pollInterval$: Observable<number>;
|
||||
bufferCapacity: number;
|
||||
getCapacity: () => number;
|
||||
pollRequests$: Observable<Option<T>>;
|
||||
work: WorkFn<T, H>;
|
||||
workTimeout?: number;
|
||||
workTimeout: number;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -52,7 +54,8 @@ interface Opts<T, H> {
|
|||
* of unique request argumets of type T. The queue holds all the buffered request arguments streamed in via pollRequests$
|
||||
*/
|
||||
export function createTaskPoller<T, H>({
|
||||
pollInterval,
|
||||
logger,
|
||||
pollInterval$,
|
||||
getCapacity,
|
||||
pollRequests$,
|
||||
bufferCapacity,
|
||||
|
@ -67,7 +70,13 @@ export function createTaskPoller<T, H>({
|
|||
// emit a polling event on demand
|
||||
pollRequests$,
|
||||
// emit a polling event on a fixed interval
|
||||
interval(pollInterval).pipe(mapTo(none))
|
||||
pollInterval$.pipe(
|
||||
switchMap((period) => {
|
||||
logger.debug(`Task poller now using interval of ${period}ms`);
|
||||
return interval(period);
|
||||
}),
|
||||
mapTo(none)
|
||||
)
|
||||
).pipe(
|
||||
// buffer all requests in a single set (to remove duplicates) as we don't want
|
||||
// work to take place in parallel (it could cause Task Manager to pull in the same
|
||||
|
@ -95,7 +104,7 @@ export function createTaskPoller<T, H>({
|
|||
await promiseResult<H, Error>(
|
||||
timeoutPromiseAfter<H, Error>(
|
||||
work(...pullFromSet(set, getCapacity())),
|
||||
workTimeout ?? pollInterval,
|
||||
workTimeout,
|
||||
() => new Error(`work has timed out`)
|
||||
)
|
||||
),
|
||||
|
|
|
@ -17,6 +17,7 @@ import {
|
|||
ISavedObjectsRepository,
|
||||
} from '../../../../src/core/server';
|
||||
import { Result, asOk, asErr, either, map, mapErr, promiseResult } from './lib/result_type';
|
||||
import { createManagedConfiguration } from './lib/create_managed_configuration';
|
||||
import { TaskManagerConfig } from './config';
|
||||
|
||||
import { Logger } from './types';
|
||||
|
@ -149,6 +150,13 @@ export class TaskManager {
|
|||
// pipe store events into the TaskManager's event stream
|
||||
this.store.events.subscribe((event) => this.events$.next(event));
|
||||
|
||||
const { maxWorkersConfiguration$, pollIntervalConfiguration$ } = createManagedConfiguration({
|
||||
logger: this.logger,
|
||||
errors$: this.store.errors$,
|
||||
startingMaxWorkers: opts.config.max_workers,
|
||||
startingPollInterval: opts.config.poll_interval,
|
||||
});
|
||||
|
||||
this.bufferedStore = new BufferedTaskStore(this.store, {
|
||||
bufferMaxOperations: opts.config.max_workers,
|
||||
logger: this.logger,
|
||||
|
@ -156,7 +164,7 @@ export class TaskManager {
|
|||
|
||||
this.pool = new TaskPool({
|
||||
logger: this.logger,
|
||||
maxWorkers: opts.config.max_workers,
|
||||
maxWorkers$: maxWorkersConfiguration$,
|
||||
});
|
||||
|
||||
const {
|
||||
|
@ -166,7 +174,8 @@ export class TaskManager {
|
|||
this.poller$ = createObservableMonitor<Result<FillPoolResult, PollingError<string>>, Error>(
|
||||
() =>
|
||||
createTaskPoller<string, FillPoolResult>({
|
||||
pollInterval,
|
||||
logger: this.logger,
|
||||
pollInterval$: pollIntervalConfiguration$,
|
||||
bufferCapacity: opts.config.request_capacity,
|
||||
getCapacity: () => this.pool.availableWorkers,
|
||||
pollRequests$: this.claimRequests$,
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
*/
|
||||
|
||||
import sinon from 'sinon';
|
||||
import { of, Subject } from 'rxjs';
|
||||
import { TaskPool, TaskPoolRunResult } from './task_pool';
|
||||
import { mockLogger, resolvable, sleep } from './test_utils';
|
||||
import { asOk } from './lib/result_type';
|
||||
|
@ -14,7 +15,7 @@ import moment from 'moment';
|
|||
describe('TaskPool', () => {
|
||||
test('occupiedWorkers are a sum of running tasks', async () => {
|
||||
const pool = new TaskPool({
|
||||
maxWorkers: 200,
|
||||
maxWorkers$: of(200),
|
||||
logger: mockLogger(),
|
||||
});
|
||||
|
||||
|
@ -26,7 +27,7 @@ describe('TaskPool', () => {
|
|||
|
||||
test('availableWorkers are a function of total_capacity - occupiedWorkers', async () => {
|
||||
const pool = new TaskPool({
|
||||
maxWorkers: 10,
|
||||
maxWorkers$: of(10),
|
||||
logger: mockLogger(),
|
||||
});
|
||||
|
||||
|
@ -36,9 +37,21 @@ describe('TaskPool', () => {
|
|||
expect(pool.availableWorkers).toEqual(7);
|
||||
});
|
||||
|
||||
test('availableWorkers is 0 until maxWorkers$ pushes a value', async () => {
|
||||
const maxWorkers$ = new Subject<number>();
|
||||
const pool = new TaskPool({
|
||||
maxWorkers$,
|
||||
logger: mockLogger(),
|
||||
});
|
||||
|
||||
expect(pool.availableWorkers).toEqual(0);
|
||||
maxWorkers$.next(10);
|
||||
expect(pool.availableWorkers).toEqual(10);
|
||||
});
|
||||
|
||||
test('does not run tasks that are beyond its available capacity', async () => {
|
||||
const pool = new TaskPool({
|
||||
maxWorkers: 2,
|
||||
maxWorkers$: of(2),
|
||||
logger: mockLogger(),
|
||||
});
|
||||
|
||||
|
@ -60,7 +73,7 @@ describe('TaskPool', () => {
|
|||
test('should log when marking a Task as running fails', async () => {
|
||||
const logger = mockLogger();
|
||||
const pool = new TaskPool({
|
||||
maxWorkers: 2,
|
||||
maxWorkers$: of(2),
|
||||
logger,
|
||||
});
|
||||
|
||||
|
@ -83,7 +96,7 @@ describe('TaskPool', () => {
|
|||
test('should log when running a Task fails', async () => {
|
||||
const logger = mockLogger();
|
||||
const pool = new TaskPool({
|
||||
maxWorkers: 3,
|
||||
maxWorkers$: of(3),
|
||||
logger,
|
||||
});
|
||||
|
||||
|
@ -106,7 +119,7 @@ describe('TaskPool', () => {
|
|||
test('should not log when running a Task fails due to the Task SO having been deleted while in flight', async () => {
|
||||
const logger = mockLogger();
|
||||
const pool = new TaskPool({
|
||||
maxWorkers: 3,
|
||||
maxWorkers$: of(3),
|
||||
logger,
|
||||
});
|
||||
|
||||
|
@ -117,11 +130,9 @@ describe('TaskPool', () => {
|
|||
|
||||
const result = await pool.run([mockTask(), taskFailedToRun, mockTask()]);
|
||||
|
||||
expect(logger.debug.mock.calls[0]).toMatchInlineSnapshot(`
|
||||
Array [
|
||||
"Task TaskType \\"shooooo\\" failed in attempt to run: Saved object [task/foo] not found",
|
||||
]
|
||||
`);
|
||||
expect(logger.debug).toHaveBeenCalledWith(
|
||||
'Task TaskType "shooooo" failed in attempt to run: Saved object [task/foo] not found'
|
||||
);
|
||||
expect(logger.warn).not.toHaveBeenCalled();
|
||||
|
||||
expect(result).toEqual(TaskPoolRunResult.RunningAllClaimedTasks);
|
||||
|
@ -130,7 +141,7 @@ describe('TaskPool', () => {
|
|||
test('Running a task which fails still takes up capacity', async () => {
|
||||
const logger = mockLogger();
|
||||
const pool = new TaskPool({
|
||||
maxWorkers: 1,
|
||||
maxWorkers$: of(1),
|
||||
logger,
|
||||
});
|
||||
|
||||
|
@ -147,7 +158,7 @@ describe('TaskPool', () => {
|
|||
|
||||
test('clears up capacity when a task completes', async () => {
|
||||
const pool = new TaskPool({
|
||||
maxWorkers: 1,
|
||||
maxWorkers$: of(1),
|
||||
logger: mockLogger(),
|
||||
});
|
||||
|
||||
|
@ -193,7 +204,7 @@ describe('TaskPool', () => {
|
|||
test('run cancels expired tasks prior to running new tasks', async () => {
|
||||
const logger = mockLogger();
|
||||
const pool = new TaskPool({
|
||||
maxWorkers: 2,
|
||||
maxWorkers$: of(2),
|
||||
logger,
|
||||
});
|
||||
|
||||
|
@ -251,7 +262,7 @@ describe('TaskPool', () => {
|
|||
const logger = mockLogger();
|
||||
const pool = new TaskPool({
|
||||
logger,
|
||||
maxWorkers: 20,
|
||||
maxWorkers$: of(20),
|
||||
});
|
||||
|
||||
const cancelled = resolvable();
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
* This module contains the logic that ensures we don't run too many
|
||||
* tasks at once in a given Kibana instance.
|
||||
*/
|
||||
import { Observable } from 'rxjs';
|
||||
import moment, { Duration } from 'moment';
|
||||
import { performance } from 'perf_hooks';
|
||||
import { padStart } from 'lodash';
|
||||
|
@ -16,7 +17,7 @@ import { TaskRunner } from './task_runner';
|
|||
import { isTaskSavedObjectNotFoundError } from './lib/is_task_not_found_error';
|
||||
|
||||
interface Opts {
|
||||
maxWorkers: number;
|
||||
maxWorkers$: Observable<number>;
|
||||
logger: Logger;
|
||||
}
|
||||
|
||||
|
@ -31,7 +32,7 @@ const VERSION_CONFLICT_MESSAGE = 'Task has been claimed by another Kibana servic
|
|||
* Runs tasks in batches, taking costs into account.
|
||||
*/
|
||||
export class TaskPool {
|
||||
private maxWorkers: number;
|
||||
private maxWorkers: number = 0;
|
||||
private running = new Set<TaskRunner>();
|
||||
private logger: Logger;
|
||||
|
||||
|
@ -44,8 +45,11 @@ export class TaskPool {
|
|||
* @prop {Logger} logger - The task manager logger.
|
||||
*/
|
||||
constructor(opts: Opts) {
|
||||
this.maxWorkers = opts.maxWorkers;
|
||||
this.logger = opts.logger;
|
||||
opts.maxWorkers$.subscribe((maxWorkers) => {
|
||||
this.logger.debug(`Task pool now using ${maxWorkers} as the max worker value`);
|
||||
this.maxWorkers = maxWorkers;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
import _ from 'lodash';
|
||||
import sinon from 'sinon';
|
||||
import uuid from 'uuid';
|
||||
import { filter, take } from 'rxjs/operators';
|
||||
import { filter, take, first } from 'rxjs/operators';
|
||||
import { Option, some, none } from 'fp-ts/lib/Option';
|
||||
|
||||
import {
|
||||
|
@ -66,8 +66,21 @@ const mockedDate = new Date('2019-02-12T21:01:22.479Z');
|
|||
|
||||
describe('TaskStore', () => {
|
||||
describe('schedule', () => {
|
||||
let store: TaskStore;
|
||||
|
||||
beforeAll(() => {
|
||||
store = new TaskStore({
|
||||
index: 'tasky',
|
||||
taskManagerId: '',
|
||||
serializer,
|
||||
callCluster: jest.fn(),
|
||||
maxAttempts: 2,
|
||||
definitions: taskDefinitions,
|
||||
savedObjectsRepository: savedObjectsClient,
|
||||
});
|
||||
});
|
||||
|
||||
async function testSchedule(task: unknown) {
|
||||
const callCluster = jest.fn();
|
||||
savedObjectsClient.create.mockImplementation(async (type: string, attributes: unknown) => ({
|
||||
id: 'testid',
|
||||
type,
|
||||
|
@ -75,15 +88,6 @@ describe('TaskStore', () => {
|
|||
references: [],
|
||||
version: '123',
|
||||
}));
|
||||
const store = new TaskStore({
|
||||
index: 'tasky',
|
||||
taskManagerId: '',
|
||||
serializer,
|
||||
callCluster,
|
||||
maxAttempts: 2,
|
||||
definitions: taskDefinitions,
|
||||
savedObjectsRepository: savedObjectsClient,
|
||||
});
|
||||
const result = await store.schedule(task as TaskInstance);
|
||||
|
||||
expect(savedObjectsClient.create).toHaveBeenCalledTimes(1);
|
||||
|
@ -176,12 +180,28 @@ describe('TaskStore', () => {
|
|||
/Unsupported task type "nope"/i
|
||||
);
|
||||
});
|
||||
|
||||
test('pushes error from saved objects client to errors$', async () => {
|
||||
const task: TaskInstance = {
|
||||
id: 'id',
|
||||
params: { hello: 'world' },
|
||||
state: { foo: 'bar' },
|
||||
taskType: 'report',
|
||||
};
|
||||
|
||||
const firstErrorPromise = store.errors$.pipe(first()).toPromise();
|
||||
savedObjectsClient.create.mockRejectedValue(new Error('Failure'));
|
||||
await expect(store.schedule(task)).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`);
|
||||
expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`);
|
||||
});
|
||||
});
|
||||
|
||||
describe('fetch', () => {
|
||||
async function testFetch(opts?: SearchOpts, hits: unknown[] = []) {
|
||||
const callCluster = sinon.spy(async (name: string, params?: unknown) => ({ hits: { hits } }));
|
||||
const store = new TaskStore({
|
||||
let store: TaskStore;
|
||||
const callCluster = jest.fn();
|
||||
|
||||
beforeAll(() => {
|
||||
store = new TaskStore({
|
||||
index: 'tasky',
|
||||
taskManagerId: '',
|
||||
serializer,
|
||||
|
@ -190,15 +210,19 @@ describe('TaskStore', () => {
|
|||
definitions: taskDefinitions,
|
||||
savedObjectsRepository: savedObjectsClient,
|
||||
});
|
||||
});
|
||||
|
||||
async function testFetch(opts?: SearchOpts, hits: unknown[] = []) {
|
||||
callCluster.mockResolvedValue({ hits: { hits } });
|
||||
|
||||
const result = await store.fetch(opts);
|
||||
|
||||
sinon.assert.calledOnce(callCluster);
|
||||
sinon.assert.calledWith(callCluster, 'search');
|
||||
expect(callCluster).toHaveBeenCalledTimes(1);
|
||||
expect(callCluster).toHaveBeenCalledWith('search', expect.anything());
|
||||
|
||||
return {
|
||||
result,
|
||||
args: callCluster.args[0][1],
|
||||
args: callCluster.mock.calls[0][1],
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -230,6 +254,13 @@ describe('TaskStore', () => {
|
|||
},
|
||||
});
|
||||
});
|
||||
|
||||
test('pushes error from call cluster to errors$', async () => {
|
||||
const firstErrorPromise = store.errors$.pipe(first()).toPromise();
|
||||
callCluster.mockRejectedValue(new Error('Failure'));
|
||||
await expect(store.fetch()).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`);
|
||||
expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`);
|
||||
});
|
||||
});
|
||||
|
||||
describe('claimAvailableTasks', () => {
|
||||
|
@ -928,9 +959,46 @@ if (doc['task.runAt'].size()!=0) {
|
|||
},
|
||||
]);
|
||||
});
|
||||
|
||||
test('pushes error from saved objects client to errors$', async () => {
|
||||
const callCluster = jest.fn();
|
||||
const store = new TaskStore({
|
||||
index: 'tasky',
|
||||
taskManagerId: '',
|
||||
serializer,
|
||||
callCluster,
|
||||
definitions: taskDefinitions,
|
||||
maxAttempts: 2,
|
||||
savedObjectsRepository: savedObjectsClient,
|
||||
});
|
||||
|
||||
const firstErrorPromise = store.errors$.pipe(first()).toPromise();
|
||||
callCluster.mockRejectedValue(new Error('Failure'));
|
||||
await expect(
|
||||
store.claimAvailableTasks({
|
||||
claimOwnershipUntil: new Date(),
|
||||
size: 10,
|
||||
})
|
||||
).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`);
|
||||
expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`);
|
||||
});
|
||||
});
|
||||
|
||||
describe('update', () => {
|
||||
let store: TaskStore;
|
||||
|
||||
beforeAll(() => {
|
||||
store = new TaskStore({
|
||||
index: 'tasky',
|
||||
taskManagerId: '',
|
||||
serializer,
|
||||
callCluster: jest.fn(),
|
||||
maxAttempts: 2,
|
||||
definitions: taskDefinitions,
|
||||
savedObjectsRepository: savedObjectsClient,
|
||||
});
|
||||
});
|
||||
|
||||
test('refreshes the index, handles versioning', async () => {
|
||||
const task = {
|
||||
runAt: mockedDate,
|
||||
|
@ -959,16 +1027,6 @@ if (doc['task.runAt'].size()!=0) {
|
|||
}
|
||||
);
|
||||
|
||||
const store = new TaskStore({
|
||||
index: 'tasky',
|
||||
taskManagerId: '',
|
||||
serializer,
|
||||
callCluster: jest.fn(),
|
||||
maxAttempts: 2,
|
||||
definitions: taskDefinitions,
|
||||
savedObjectsRepository: savedObjectsClient,
|
||||
});
|
||||
|
||||
const result = await store.update(task);
|
||||
|
||||
expect(savedObjectsClient.update).toHaveBeenCalledWith(
|
||||
|
@ -1002,28 +1060,116 @@ if (doc['task.runAt'].size()!=0) {
|
|||
version: '123',
|
||||
});
|
||||
});
|
||||
|
||||
test('pushes error from saved objects client to errors$', async () => {
|
||||
const task = {
|
||||
runAt: mockedDate,
|
||||
scheduledAt: mockedDate,
|
||||
startedAt: null,
|
||||
retryAt: null,
|
||||
id: 'task:324242',
|
||||
params: { hello: 'world' },
|
||||
state: { foo: 'bar' },
|
||||
taskType: 'report',
|
||||
attempts: 3,
|
||||
status: 'idle' as TaskStatus,
|
||||
version: '123',
|
||||
ownerId: null,
|
||||
};
|
||||
|
||||
const firstErrorPromise = store.errors$.pipe(first()).toPromise();
|
||||
savedObjectsClient.update.mockRejectedValue(new Error('Failure'));
|
||||
await expect(store.update(task)).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`);
|
||||
expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`);
|
||||
});
|
||||
});
|
||||
|
||||
describe('remove', () => {
|
||||
test('removes the task with the specified id', async () => {
|
||||
const id = `id-${_.random(1, 20)}`;
|
||||
const callCluster = jest.fn();
|
||||
const store = new TaskStore({
|
||||
describe('bulkUpdate', () => {
|
||||
let store: TaskStore;
|
||||
|
||||
beforeAll(() => {
|
||||
store = new TaskStore({
|
||||
index: 'tasky',
|
||||
taskManagerId: '',
|
||||
serializer,
|
||||
callCluster,
|
||||
callCluster: jest.fn(),
|
||||
maxAttempts: 2,
|
||||
definitions: taskDefinitions,
|
||||
savedObjectsRepository: savedObjectsClient,
|
||||
});
|
||||
});
|
||||
|
||||
test('pushes error from saved objects client to errors$', async () => {
|
||||
const task = {
|
||||
runAt: mockedDate,
|
||||
scheduledAt: mockedDate,
|
||||
startedAt: null,
|
||||
retryAt: null,
|
||||
id: 'task:324242',
|
||||
params: { hello: 'world' },
|
||||
state: { foo: 'bar' },
|
||||
taskType: 'report',
|
||||
attempts: 3,
|
||||
status: 'idle' as TaskStatus,
|
||||
version: '123',
|
||||
ownerId: null,
|
||||
};
|
||||
|
||||
const firstErrorPromise = store.errors$.pipe(first()).toPromise();
|
||||
savedObjectsClient.bulkUpdate.mockRejectedValue(new Error('Failure'));
|
||||
await expect(store.bulkUpdate([task])).rejects.toThrowErrorMatchingInlineSnapshot(
|
||||
`"Failure"`
|
||||
);
|
||||
expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`);
|
||||
});
|
||||
});
|
||||
|
||||
describe('remove', () => {
|
||||
let store: TaskStore;
|
||||
|
||||
beforeAll(() => {
|
||||
store = new TaskStore({
|
||||
index: 'tasky',
|
||||
taskManagerId: '',
|
||||
serializer,
|
||||
callCluster: jest.fn(),
|
||||
maxAttempts: 2,
|
||||
definitions: taskDefinitions,
|
||||
savedObjectsRepository: savedObjectsClient,
|
||||
});
|
||||
});
|
||||
|
||||
test('removes the task with the specified id', async () => {
|
||||
const id = `id-${_.random(1, 20)}`;
|
||||
const result = await store.remove(id);
|
||||
expect(result).toBeUndefined();
|
||||
expect(savedObjectsClient.delete).toHaveBeenCalledWith('task', id);
|
||||
});
|
||||
|
||||
test('pushes error from saved objects client to errors$', async () => {
|
||||
const id = `id-${_.random(1, 20)}`;
|
||||
const firstErrorPromise = store.errors$.pipe(first()).toPromise();
|
||||
savedObjectsClient.delete.mockRejectedValue(new Error('Failure'));
|
||||
await expect(store.remove(id)).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`);
|
||||
expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`);
|
||||
});
|
||||
});
|
||||
|
||||
describe('get', () => {
|
||||
let store: TaskStore;
|
||||
|
||||
beforeAll(() => {
|
||||
store = new TaskStore({
|
||||
index: 'tasky',
|
||||
taskManagerId: '',
|
||||
serializer,
|
||||
callCluster: jest.fn(),
|
||||
maxAttempts: 2,
|
||||
definitions: taskDefinitions,
|
||||
savedObjectsRepository: savedObjectsClient,
|
||||
});
|
||||
});
|
||||
|
||||
test('gets the task with the specified id', async () => {
|
||||
const id = `id-${_.random(1, 20)}`;
|
||||
const task = {
|
||||
|
@ -1041,7 +1187,6 @@ if (doc['task.runAt'].size()!=0) {
|
|||
ownerId: null,
|
||||
};
|
||||
|
||||
const callCluster = jest.fn();
|
||||
savedObjectsClient.get.mockImplementation(async (type: string, objectId: string) => ({
|
||||
id: objectId,
|
||||
type,
|
||||
|
@ -1053,22 +1198,20 @@ if (doc['task.runAt'].size()!=0) {
|
|||
version: '123',
|
||||
}));
|
||||
|
||||
const store = new TaskStore({
|
||||
index: 'tasky',
|
||||
taskManagerId: '',
|
||||
serializer,
|
||||
callCluster,
|
||||
maxAttempts: 2,
|
||||
definitions: taskDefinitions,
|
||||
savedObjectsRepository: savedObjectsClient,
|
||||
});
|
||||
|
||||
const result = await store.get(id);
|
||||
|
||||
expect(result).toEqual(task);
|
||||
|
||||
expect(savedObjectsClient.get).toHaveBeenCalledWith('task', id);
|
||||
});
|
||||
|
||||
test('pushes error from saved objects client to errors$', async () => {
|
||||
const id = `id-${_.random(1, 20)}`;
|
||||
const firstErrorPromise = store.errors$.pipe(first()).toPromise();
|
||||
savedObjectsClient.get.mockRejectedValue(new Error('Failure'));
|
||||
await expect(store.get(id)).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`);
|
||||
expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`);
|
||||
});
|
||||
});
|
||||
|
||||
describe('getLifecycle', () => {
|
||||
|
|
|
@ -121,6 +121,7 @@ export class TaskStore {
|
|||
public readonly maxAttempts: number;
|
||||
public readonly index: string;
|
||||
public readonly taskManagerId: string;
|
||||
public readonly errors$ = new Subject<Error>();
|
||||
|
||||
private callCluster: ElasticJs;
|
||||
private definitions: TaskDictionary<TaskDefinition>;
|
||||
|
@ -171,11 +172,17 @@ export class TaskStore {
|
|||
);
|
||||
}
|
||||
|
||||
const savedObject = await this.savedObjectsRepository.create<SerializedConcreteTaskInstance>(
|
||||
'task',
|
||||
taskInstanceToAttributes(taskInstance),
|
||||
{ id: taskInstance.id, refresh: false }
|
||||
);
|
||||
let savedObject;
|
||||
try {
|
||||
savedObject = await this.savedObjectsRepository.create<SerializedConcreteTaskInstance>(
|
||||
'task',
|
||||
taskInstanceToAttributes(taskInstance),
|
||||
{ id: taskInstance.id, refresh: false }
|
||||
);
|
||||
} catch (e) {
|
||||
this.errors$.next(e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
return savedObjectToConcreteTaskInstance(savedObject);
|
||||
}
|
||||
|
@ -333,12 +340,22 @@ export class TaskStore {
|
|||
*/
|
||||
public async update(doc: ConcreteTaskInstance): Promise<ConcreteTaskInstance> {
|
||||
const attributes = taskInstanceToAttributes(doc);
|
||||
const updatedSavedObject = await this.savedObjectsRepository.update<
|
||||
SerializedConcreteTaskInstance
|
||||
>('task', doc.id, attributes, {
|
||||
refresh: false,
|
||||
version: doc.version,
|
||||
});
|
||||
|
||||
let updatedSavedObject;
|
||||
try {
|
||||
updatedSavedObject = await this.savedObjectsRepository.update<SerializedConcreteTaskInstance>(
|
||||
'task',
|
||||
doc.id,
|
||||
attributes,
|
||||
{
|
||||
refresh: false,
|
||||
version: doc.version,
|
||||
}
|
||||
);
|
||||
} catch (e) {
|
||||
this.errors$.next(e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
return savedObjectToConcreteTaskInstance(
|
||||
// The SavedObjects update api forces a Partial on the `attributes` on the response,
|
||||
|
@ -362,8 +379,11 @@ export class TaskStore {
|
|||
return attrsById;
|
||||
}, new Map());
|
||||
|
||||
const updatedSavedObjects: Array<SavedObjectsUpdateResponse | Error> = (
|
||||
await this.savedObjectsRepository.bulkUpdate<SerializedConcreteTaskInstance>(
|
||||
let updatedSavedObjects: Array<SavedObjectsUpdateResponse | Error>;
|
||||
try {
|
||||
({ saved_objects: updatedSavedObjects } = await this.savedObjectsRepository.bulkUpdate<
|
||||
SerializedConcreteTaskInstance
|
||||
>(
|
||||
docs.map((doc) => ({
|
||||
type: 'task',
|
||||
id: doc.id,
|
||||
|
@ -373,8 +393,11 @@ export class TaskStore {
|
|||
{
|
||||
refresh: false,
|
||||
}
|
||||
)
|
||||
).saved_objects;
|
||||
));
|
||||
} catch (e) {
|
||||
this.errors$.next(e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
return updatedSavedObjects.map<BulkUpdateResult>((updatedSavedObject, index) =>
|
||||
isSavedObjectsUpdateResponse(updatedSavedObject)
|
||||
|
@ -404,7 +427,12 @@ export class TaskStore {
|
|||
* @returns {Promise<void>}
|
||||
*/
|
||||
public async remove(id: string): Promise<void> {
|
||||
await this.savedObjectsRepository.delete('task', id);
|
||||
try {
|
||||
await this.savedObjectsRepository.delete('task', id);
|
||||
} catch (e) {
|
||||
this.errors$.next(e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -414,7 +442,14 @@ export class TaskStore {
|
|||
* @returns {Promise<void>}
|
||||
*/
|
||||
public async get(id: string): Promise<ConcreteTaskInstance> {
|
||||
return savedObjectToConcreteTaskInstance(await this.savedObjectsRepository.get('task', id));
|
||||
let result;
|
||||
try {
|
||||
result = await this.savedObjectsRepository.get<SerializedConcreteTaskInstance>('task', id);
|
||||
} catch (e) {
|
||||
this.errors$.next(e);
|
||||
throw e;
|
||||
}
|
||||
return savedObjectToConcreteTaskInstance(result);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -438,14 +473,20 @@ export class TaskStore {
|
|||
private async search(opts: SearchOpts = {}): Promise<FetchResult> {
|
||||
const { query } = ensureQueryOnlyReturnsTaskObjects(opts);
|
||||
|
||||
const result = await this.callCluster('search', {
|
||||
index: this.index,
|
||||
ignoreUnavailable: true,
|
||||
body: {
|
||||
...opts,
|
||||
query,
|
||||
},
|
||||
});
|
||||
let result;
|
||||
try {
|
||||
result = await this.callCluster('search', {
|
||||
index: this.index,
|
||||
ignoreUnavailable: true,
|
||||
body: {
|
||||
...opts,
|
||||
query,
|
||||
},
|
||||
});
|
||||
} catch (e) {
|
||||
this.errors$.next(e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
const rawDocs = (result as SearchResponse<unknown>).hits.hits;
|
||||
|
||||
|
@ -464,17 +505,23 @@ export class TaskStore {
|
|||
{ max_docs }: UpdateByQueryOpts = {}
|
||||
): Promise<UpdateByQueryResult> {
|
||||
const { query } = ensureQueryOnlyReturnsTaskObjects(opts);
|
||||
const result = await this.callCluster('updateByQuery', {
|
||||
index: this.index,
|
||||
ignoreUnavailable: true,
|
||||
refresh: true,
|
||||
max_docs,
|
||||
conflicts: 'proceed',
|
||||
body: {
|
||||
...opts,
|
||||
query,
|
||||
},
|
||||
});
|
||||
let result;
|
||||
try {
|
||||
result = await this.callCluster('updateByQuery', {
|
||||
index: this.index,
|
||||
ignoreUnavailable: true,
|
||||
refresh: true,
|
||||
max_docs,
|
||||
conflicts: 'proceed',
|
||||
body: {
|
||||
...opts,
|
||||
query,
|
||||
},
|
||||
});
|
||||
} catch (e) {
|
||||
this.errors$.next(e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/naming-convention
|
||||
const { total, updated, version_conflicts } = result as UpdateDocumentByQueryResponse;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue