Prevent Task Manager poller from constantly polling without delay (#172394)

Resolves https://github.com/elastic/kibana/issues/172214

In this PR, I'm making the following improvements to prevent Task
Manager from constantly hitting Elasticsearch with update by query
requests (a brief sketch of the combined behavior follows this list):
- Preventing the backpressure mechanism from changing the `pollInterval`
to anything greater than 60s
- Disabling the backpressure mechanism when the `pollInterval`
configured in `kibana.yml` is higher than 60s (this could be implemented
better, but I'm trying to keep the change small)
- Preventing the task poller from changing the `pollInterval` to a
`null` or `NaN` value, and logging a stack trace if it ever happens so we
can better understand the issue
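
For reference, here is a minimal sketch of how the capped backoff behaves, using the
constants this PR introduces. This is an illustration only, not the actual
implementation (see the `create_managed_configuration.ts` diff at the bottom):

```
// Sketch only: combines the cap, the "disable above 60s" behavior, and the
// null/NaN guard added in this PR. Names mirror the constants in the diff below.
const PREFERRED_MAX_POLL_INTERVAL = 60 * 1000; // upper bound for dynamic increases
const POLL_INTERVAL_INCREASE_PERCENTAGE = 1.2;

function nextPollIntervalOnError(previous: number, starting: number): number {
  const candidate = Math.min(
    Math.ceil(previous * POLL_INTERVAL_INCREASE_PERCENTAGE),
    // If the configured interval is already above 60s, the cap equals the configured
    // value, so the interval can never grow and the backpressure is effectively disabled.
    Math.max(PREFERRED_MAX_POLL_INTERVAL, starting)
  );
  // Defensive check: if the calculation ever yields null, NaN, or a negative number,
  // keep the previous value instead of propagating it to the poller.
  return Number.isSafeInteger(candidate) && candidate >= 0 ? candidate : previous;
}
```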

## To verify

1. Turn on Task Manager debug logging by setting the following in your
`kibana.yml` file.
```
logging:
  loggers:
    - name: plugins.taskManager
      level: debug
```

>Preventing the backpressure mechanism from changing the `pollInterval`
to anything greater than 60s

2. Apply the following custom code change to trigger the backoff mechanism
that increases the poll interval
```
diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts
index acc12b39ec0..6670afbc29b 100644
--- a/x-pack/plugins/task_manager/server/task_store.ts
+++ b/x-pack/plugins/task_manager/server/task_store.ts
@@ -14,6 +14,7 @@ import { SavedObjectError } from '@kbn/core-saved-objects-common';

 import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
 import type { SavedObjectsBulkDeleteResponse, Logger } from '@kbn/core/server';
+import { SavedObjectsErrorHelpers } from '@kbn/core/server';

 import {
   SavedObject,
@@ -136,6 +137,10 @@ export class TaskStore {
       // The poller doesn't need retry logic because it will try again at the next polling cycle
       maxRetries: 0,
     });
+
+    setInterval(() => {
+      this.errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('', ''));
+    }, 5000);
   }

   /**
```
3. Notice logs like `Poll interval configuration changing from X to Y
after seeing Z "too many request" and/or "execute [inline] script"
error(s)`, where Y keeps increasing over time but stops at `60000` and
never surpasses it. Reaching the limit takes a few minutes of Kibana
running, as the sketch below illustrates. Once you see `Task poller now
using interval of 60000ms`, you will no longer see these logs because the
poll interval has stopped increasing. You can wait a minute to be certain.
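
To give a sense of the timing, here's a back-of-the-envelope illustration (assuming the
default `3000`ms poll interval and the `1.2`x growth factor shown in the diff below) of
why it takes roughly three minutes to hit the cap:

```
// Illustration only: counts how many 10s adjustment cycles it takes for the default
// 3000ms poll interval to reach the 60s cap with the 1.2x backoff factor.
const PREFERRED_MAX_POLL_INTERVAL = 60 * 1000;
const POLL_INTERVAL_INCREASE_PERCENTAGE = 1.2;
const ADJUST_THROUGHPUT_INTERVAL = 10 * 1000; // errors are re-evaluated every 10s

let interval = 3000; // default xpack.task_manager.poll_interval
let adjustments = 0;
while (interval < PREFERRED_MAX_POLL_INTERVAL) {
  interval = Math.min(
    Math.ceil(interval * POLL_INTERVAL_INCREASE_PERCENTAGE),
    PREFERRED_MAX_POLL_INTERVAL
  );
  adjustments++;
}
// adjustments === 17, so roughly 170s (just under 3 minutes) before
// "Task poller now using interval of 60000ms" appears.
console.log(adjustments, (adjustments * ADJUST_THROUGHPUT_INTERVAL) / 1000);
```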

>Disabling the backpressure mechanism when the pollInterval configured
in kibana.yml is higher than 60s (could be implemented better but trying
to keep the code small)

4. Set `xpack.task_manager.poll_interval: 65000` in your `kibana.yml`
file (a value above the 60s cap, matching the error message in step 7)

5. Notice that the logs from step 3 no longer appear, but you still see
the max workers value decrease over time. The sketch below shows why the
poll interval stays put.
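
With a configured interval above 60s, the new cap resolves to the configured value
itself, so the backoff calculation is a no-op. A quick sketch of the arithmetic, using
the 65000ms value from the test added in this PR:

```
// Illustration only: the backoff becomes a no-op when the configured poll interval
// already exceeds the 60s preferred max (65000 is the value used in this PR's test).
const PREFERRED_MAX_POLL_INTERVAL = 60 * 1000;
const POLL_INTERVAL_INCREASE_PERCENTAGE = 1.2;
const startingPollInterval = 65000; // e.g. xpack.task_manager.poll_interval: 65000

const next = Math.min(
  Math.ceil(startingPollInterval * POLL_INTERVAL_INCREASE_PERCENTAGE), // 78000
  Math.max(PREFERRED_MAX_POLL_INTERVAL, startingPollInterval) // 65000
);
// next === 65000, i.e. unchanged, so no "Poll interval configuration changing from X to Y"
// log is emitted. The max workers backoff is a separate scan and still decreases over time.
console.log(next);
```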

>Preventing the task poller from changing the `pollInterval` to a `null`
or `NaN` value and logging a stack trace if ever it happens so we can
better understand the issue.

6. Apply the following code change to force a `null` value into the
observable pipeline (it's still unknown how this happens in production,
but it lets us verify the defensive code in a different module)
```
diff --git a/x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts b/x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts
index 52e11e431ba..da761a46555 100644
--- a/x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts
+++ b/x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts
@@ -134,7 +134,7 @@ function createPollIntervalScan(logger: Logger, startingPollInterval: number) {
         );
       }
     }
-    return newPollInterval;
+    return null;
   }, startingPollInterval);
 }
```

7. Notice the `Error: Expected the new interval to be a number > 0,
received: null but poller will keep using: 65000` error in the logs,
along with a stack trace.

8. Remove the custom `xpack.task_manager.poll_interval` configuration
from your `kibana.yml` file

9. Apply the following code change to force the outcome of the calculation to be `NaN`
```
diff --git a/x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts b/x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts
index 52e11e431ba..104997505c0 100644
--- a/x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts
+++ b/x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts
@@ -106,6 +106,7 @@ function createPollIntervalScan(logger: Logger, startingPollInterval: number) {
         Math.ceil(previousPollInterval * POLL_INTERVAL_INCREASE_PERCENTAGE),
         Math.max(PREFERRED_MAX_POLL_INTERVAL, startingPollInterval)
       );
+      newPollInterval = NaN;
       if (Number.isNaN(newPollInterval) || newPollInterval == null || newPollInterval < 0) {
         logger.error(
           `Poll interval configuration had an issue calculating the new poll interval: Math.min(Math.ceil(${previousPollInterval} * ${POLL_INTERVAL_INCREASE_PERCENTAGE}), Math.max(${PREFERRED_MAX_POLL_INTERVAL}, ${startingPollInterval})) = ${newPollInterval}`
```

10. Observe that the formula is logged as an error when the outcome is `NaN`:
```
Poll interval configuration had an issue calculating the new poll interval: Math.min(Math.ceil(null * 1.2), Math.max(60000, 3000)) = NaN
```

---------

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
4 changed files with 138 additions and 2 deletions:


```
@@ -184,6 +184,23 @@ describe('createManagedConfiguration()', () => {
       );
     });
 
+    test('should log a warning when an issue occurred in the calculating of the increased poll interval', async () => {
+      const { errors$ } = setupScenario(NaN);
+      errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
+      clock.tick(ADJUST_THROUGHPUT_INTERVAL);
+      expect(logger.error).toHaveBeenCalledWith(
+        'Poll interval configuration had an issue calculating the new poll interval: Math.min(Math.ceil(NaN * 1.2), Math.max(60000, NaN)) = NaN, will keep the poll interval unchanged (NaN)'
+      );
+    });
+
+    test('should log a warning when an issue occurred in the calculating of the decreased poll interval', async () => {
+      setupScenario(NaN);
+      clock.tick(ADJUST_THROUGHPUT_INTERVAL);
+      expect(logger.error).toHaveBeenCalledWith(
+        'Poll interval configuration had an issue calculating the new poll interval: Math.max(NaN, Math.floor(NaN * 0.95)) = NaN, will keep the poll interval unchanged (NaN)'
+      );
+    });
+
     test('should decrease configuration back to normal incrementally after an error is emitted', async () => {
       const { subscription, errors$ } = setupScenario(100);
       errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
@@ -210,5 +227,40 @@ describe('createManagedConfiguration()', () => {
       // 172.8 -> 173 from Math.ceil
       expect(subscription).toHaveBeenNthCalledWith(4, 173);
     });
+
+    test('should limit the upper bound to 60s by default', async () => {
+      const { subscription, errors$ } = setupScenario(3000);
+      for (let i = 0; i < 18; i++) {
+        errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
+        clock.tick(ADJUST_THROUGHPUT_INTERVAL);
+      }
+      expect(subscription).toHaveBeenNthCalledWith(2, 3600);
+      expect(subscription).toHaveBeenNthCalledWith(3, 4320);
+      expect(subscription).toHaveBeenNthCalledWith(4, 5184);
+      expect(subscription).toHaveBeenNthCalledWith(5, 6221);
+      expect(subscription).toHaveBeenNthCalledWith(6, 7466);
+      expect(subscription).toHaveBeenNthCalledWith(7, 8960);
+      expect(subscription).toHaveBeenNthCalledWith(8, 10752);
+      expect(subscription).toHaveBeenNthCalledWith(9, 12903);
+      expect(subscription).toHaveBeenNthCalledWith(10, 15484);
+      expect(subscription).toHaveBeenNthCalledWith(11, 18581);
+      expect(subscription).toHaveBeenNthCalledWith(12, 22298);
+      expect(subscription).toHaveBeenNthCalledWith(13, 26758);
+      expect(subscription).toHaveBeenNthCalledWith(14, 32110);
+      expect(subscription).toHaveBeenNthCalledWith(15, 38532);
+      expect(subscription).toHaveBeenNthCalledWith(16, 46239);
+      expect(subscription).toHaveBeenNthCalledWith(17, 55487);
+      expect(subscription).toHaveBeenNthCalledWith(18, 60000);
+    });
+
+    test('should not adjust poll interval dynamically if initial value is > 60s', async () => {
+      const { subscription, errors$ } = setupScenario(65000);
+      for (let i = 0; i < 5; i++) {
+        errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
+        clock.tick(ADJUST_THROUGHPUT_INTERVAL);
+      }
+      expect(subscription).toHaveBeenCalledTimes(1);
+      expect(subscription).toHaveBeenNthCalledWith(1, 65000);
+    });
   });
 });
```


```
@@ -13,6 +13,8 @@ import { isEsCannotExecuteScriptError } from './identify_es_error';
 
 const FLUSH_MARKER = Symbol('flush');
 export const ADJUST_THROUGHPUT_INTERVAL = 10 * 1000;
+export const PREFERRED_MAX_POLL_INTERVAL = 60 * 1000;
+export const MIN_WORKERS = 1;
 
 // When errors occur, reduce maxWorkers by MAX_WORKERS_DECREASE_PERCENTAGE
 // When errors no longer occur, start increasing maxWorkers by MAX_WORKERS_INCREASE_PERCENTAGE
@@ -65,7 +67,10 @@ function createMaxWorkersScan(logger: Logger, startingMaxWorkers: number) {
     if (errorCount > 0) {
       // Decrease max workers by MAX_WORKERS_DECREASE_PERCENTAGE while making sure it doesn't go lower than 1.
       // Using Math.floor to make sure the number is different than previous while not being a decimal value.
-      newMaxWorkers = Math.max(Math.floor(previousMaxWorkers * MAX_WORKERS_DECREASE_PERCENTAGE), 1);
+      newMaxWorkers = Math.max(
+        Math.floor(previousMaxWorkers * MAX_WORKERS_DECREASE_PERCENTAGE),
+        MIN_WORKERS
+      );
     } else {
       // Increase max workers by MAX_WORKERS_INCREASE_PERCENTAGE while making sure it doesn't go
       // higher than the starting value. Using Math.ceil to make sure the number is different than
@@ -95,7 +100,18 @@ function createPollIntervalScan(logger: Logger, startingPollInterval: number) {
     if (errorCount > 0) {
       // Increase poll interval by POLL_INTERVAL_INCREASE_PERCENTAGE and use Math.ceil to
       // make sure the number is different than previous while not being a decimal value.
-      newPollInterval = Math.ceil(previousPollInterval * POLL_INTERVAL_INCREASE_PERCENTAGE);
+      // Also ensure we don't go over PREFERRED_MAX_POLL_INTERVAL or startingPollInterval,
+      // whichever is greater.
+      newPollInterval = Math.min(
+        Math.ceil(previousPollInterval * POLL_INTERVAL_INCREASE_PERCENTAGE),
+        Math.ceil(Math.max(PREFERRED_MAX_POLL_INTERVAL, startingPollInterval))
+      );
+      if (!Number.isSafeInteger(newPollInterval) || newPollInterval < 0) {
+        logger.error(
+          `Poll interval configuration had an issue calculating the new poll interval: Math.min(Math.ceil(${previousPollInterval} * ${POLL_INTERVAL_INCREASE_PERCENTAGE}), Math.max(${PREFERRED_MAX_POLL_INTERVAL}, ${startingPollInterval})) = ${newPollInterval}, will keep the poll interval unchanged (${previousPollInterval})`
+        );
+        newPollInterval = previousPollInterval;
+      }
     } else {
       // Decrease poll interval by POLL_INTERVAL_DECREASE_PERCENTAGE and use Math.floor to
       // make sure the number is different than previous while not being a decimal value.
@@ -103,6 +119,12 @@ function createPollIntervalScan(logger: Logger, startingPollInterval: number) {
         startingPollInterval,
         Math.floor(previousPollInterval * POLL_INTERVAL_DECREASE_PERCENTAGE)
       );
+      if (!Number.isSafeInteger(newPollInterval) || newPollInterval < 0) {
+        logger.error(
+          `Poll interval configuration had an issue calculating the new poll interval: Math.max(${startingPollInterval}, Math.floor(${previousPollInterval} * ${POLL_INTERVAL_DECREASE_PERCENTAGE})) = ${newPollInterval}, will keep the poll interval unchanged (${previousPollInterval})`
+        );
+        newPollInterval = previousPollInterval;
+      }
     }
     if (newPollInterval !== previousPollInterval) {
       logger.debug(
```


```
@@ -90,6 +90,57 @@ describe('TaskPoller', () => {
     expect(work).toHaveBeenCalledTimes(4);
   });
 
+  test('poller ignores null pollInterval values', async () => {
+    const pollInterval = 100;
+    const pollInterval$ = new BehaviorSubject(pollInterval);
+    const work = jest.fn(async () => true);
+    const logger = loggingSystemMock.create().get();
+
+    createTaskPoller<void, boolean>({
+      initialPollInterval: pollInterval,
+      logger,
+      pollInterval$,
+      pollIntervalDelay$: of(0),
+      getCapacity: () => 1,
+      work,
+    }).start();
+
+    expect(work).toHaveBeenCalledTimes(1);
+
+    // `work` is async, we have to force a node `tick`
+    await new Promise((resolve) => setImmediate(resolve));
+    clock.tick(pollInterval);
+    expect(work).toHaveBeenCalledTimes(2);
+
+    pollInterval$.next(pollInterval * 2);
+    // `work` is async, we have to force a node `tick`
+    await new Promise((resolve) => setImmediate(resolve));
+    clock.tick(pollInterval);
+    expect(work).toHaveBeenCalledTimes(2);
+    clock.tick(pollInterval);
+    expect(work).toHaveBeenCalledTimes(3);
+
+    // Force null into the events
+    pollInterval$.next(null as unknown as number);
+    // `work` is async, we have to force a node `tick`
+    await new Promise((resolve) => setImmediate(resolve));
+    clock.tick(pollInterval);
+    expect(work).toHaveBeenCalledTimes(3);
+
+    // `work` is async, we have to force a node `tick`
+    await new Promise((resolve) => setImmediate(resolve));
+    clock.tick(pollInterval);
+    expect(work).toHaveBeenCalledTimes(4);
+
+    expect(logger.error).toHaveBeenCalledWith(
+      new Error(
+        'Expected the new interval to be a number > 0, received: null but poller will keep using: 200'
+      )
+    );
+  });
+
   test('filters interval polling on capacity', async () => {
     const pollInterval = 100;
```


```
@@ -85,6 +85,17 @@ export function createTaskPoller<T, H>({
       return;
     }
     pollInterval$.subscribe((interval) => {
+      if (!Number.isSafeInteger(interval) || interval < 0) {
+        // TODO: Investigate why we sometimes get null / NaN, causing the setTimeout logic to always schedule
+        // the next polling cycle to run immediately. If we don't see occurrences of this message by December 2024,
+        // we can remove the TODO and/or check because we now have a cap to how much we increase the poll interval.
+        logger.error(
+          new Error(
+            `Expected the new interval to be a number > 0, received: ${interval} but poller will keep using: ${pollInterval}`
+          )
+        );
+        return;
+      }
       pollInterval = interval;
       logger.debug(`Task poller now using interval of ${interval}ms`);
     });
```