Apply backpressure to the task poller whenever Elasticsearch requests respond with 503 errors (#196900)

Resolves: #195134

This PR adds 503 error check to the error filter of
`createManagedConfiguration` function, besides the 501 error .
So it applies backpressure to the task poller for 503 errors as well.
This commit is contained in:
Ersin Erdal 2024-10-24 01:16:45 +02:00 committed by GitHub
parent d1bf69cf3d
commit 292a7d384e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 36 additions and 1 deletions

View file

@ -184,6 +184,17 @@ describe('createManagedConfiguration()', () => {
expect(subscription).toHaveBeenNthCalledWith(2, 8);
});
test('should decrease configuration at the next interval when a 503 error is emitted', async () => {
const { subscription, errors$ } = setupScenario(10);
errors$.next(SavedObjectsErrorHelpers.createGenericNotFoundEsUnavailableError('a', 'b'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1);
expect(subscription).toHaveBeenCalledTimes(1);
expect(subscription).toHaveBeenNthCalledWith(1, 10);
clock.tick(1);
expect(subscription).toHaveBeenCalledTimes(2);
expect(subscription).toHaveBeenNthCalledWith(2, 8);
});
test('should log a warning when the configuration changes from the starting value', async () => {
const { errors$ } = setupScenario(10);
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
@ -235,6 +246,17 @@ describe('createManagedConfiguration()', () => {
expect(subscription).toHaveBeenNthCalledWith(2, 8);
});
test('should decrease configuration at the next interval when a 503 error is emitted', async () => {
const { subscription, errors$ } = setupScenario(10, CLAIM_STRATEGY_MGET);
errors$.next(SavedObjectsErrorHelpers.createGenericNotFoundEsUnavailableError('a', 'b'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1);
expect(subscription).toHaveBeenCalledTimes(1);
expect(subscription).toHaveBeenNthCalledWith(1, 10);
clock.tick(1);
expect(subscription).toHaveBeenCalledTimes(2);
expect(subscription).toHaveBeenNthCalledWith(2, 8);
});
test('should log a warning when the configuration changes from the starting value', async () => {
const { errors$ } = setupScenario(10, CLAIM_STRATEGY_MGET);
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
@ -305,6 +327,16 @@ describe('createManagedConfiguration()', () => {
expect(subscription).toHaveBeenNthCalledWith(2, 120);
});
test('should increase configuration at the next interval when a 503 error is emitted', async () => {
const { subscription, errors$ } = setupScenario(100);
errors$.next(SavedObjectsErrorHelpers.createGenericNotFoundEsUnavailableError('a', 'b'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1);
expect(subscription).toHaveBeenCalledTimes(1);
clock.tick(1);
expect(subscription).toHaveBeenCalledTimes(2);
expect(subscription).toHaveBeenNthCalledWith(2, 120);
});
test('should log a warning when the configuration changes from the starting value', async () => {
const { errors$ } = setupScenario(100);
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));

View file

@ -161,7 +161,10 @@ function countErrors(errors$: Observable<Error>, countInterval: number): Observa
interval(countInterval).pipe(map(() => FLUSH_MARKER)),
errors$.pipe(
filter(
(e) => SavedObjectsErrorHelpers.isTooManyRequestsError(e) || isEsCannotExecuteScriptError(e)
(e) =>
SavedObjectsErrorHelpers.isTooManyRequestsError(e) ||
SavedObjectsErrorHelpers.isEsUnavailableError(e) ||
isEsCannotExecuteScriptError(e)
)
)
).pipe(