Don't stop Task Manager polling when ES or SO is unavailable (#209794)

Resolves: #203470

This PR removes the code that stops task polling when Elasticsearch or the
Saved Objects (SO) service becomes unavailable, so Task Manager now relies
solely on its back pressure mechanism.
Errors with 502 and 504 status codes are also counted toward back pressure
(the checks now treat any 5xx status the same way), to make sure every failure
mode that takes ES or SO down is covered.
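
For reference, below is a condensed sketch of the broadened status-code check. The function name `countsTowardBackPressure` is hypothetical; the real condition lives in `countErrors` (see the diff below) and additionally covers general Saved Objects errors, script-execution errors, and cluster-block exceptions.

```ts
// Hypothetical condensed form of the check in countErrors():
// 429 (throttling) and any 5xx from msearch or bulkPartialUpdate
// count toward back pressure.
function countsTowardBackPressure(
  msearchStatus: number | undefined,
  bulkUpdateStatus: number | undefined
): boolean {
  return (
    msearchStatus === 429 ||
    (msearchStatus !== undefined && msearchStatus >= 500) ||
    bulkUpdateStatus === 429 ||
    (bulkUpdateStatus !== undefined && bulkUpdateStatus >= 500)
  );
}
```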

## To verify:

Force the Elasticsearch version check to throw an error:

https://github.com/elastic/kibana/blob/main/src/core/packages/elasticsearch/server-internal/src/version_check/ensure_es_version.ts#L189

Then mock the response of `this.esClientWithoutRetries.msearch` in the task store
[here](https://github.com/elastic/kibana/blob/main/x-pack/platform/plugins/shared/task_manager/server/task_store.ts#L584).

Example:

```ts
// Stubbed msearch result: a single failed response reporting a 503 status
const responses = [
  {
    error: {
      type: 'not found',
    },
    took: 1000,
    timed_out: false,
    hits: { hits: [] },
    _shards: {
      failed: 1,
      successful: 0,
      total: 1,
    },
    status: 503,
  },
];
```
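
With this stub in place, the task store should surface an `MsearchError` carrying the 503 status (the same error type the tests below emit into `errors$`), which feeds the error counter used for back pressure.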
Expect [back pressure](https://github.com/elastic/kibana/blob/main/x-pack/platform/plugins/shared/task_manager/server/lib/create_managed_configuration.ts#L182) to return a longer poll interval.
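
As a rough illustration of what a "longer poll interval" means, here is a hypothetical sketch of the adjustment, not the actual implementation in `create_managed_configuration.ts`: the capacity factor matches the 10 → 8 drop asserted in the tests below, while the poll-interval factor is an assumed value.

```ts
// Hypothetical sketch: on each ADJUST_THROUGHPUT_INTERVAL tick, observed
// errors shrink task capacity and stretch the poll interval; both recover
// once errors stop. Constants are illustrative, not Kibana's.
const CAPACITY_DECREASE_FACTOR = 0.8; // matches the 10 -> 8 drop in the tests
const POLL_INTERVAL_INCREASE_FACTOR = 1.2; // assumed value

function adjustForErrors(
  errorCount: number,
  capacity: number,
  pollInterval: number
): { capacity: number; pollInterval: number } {
  if (errorCount === 0) {
    return { capacity, pollInterval }; // nothing to back off from
  }
  return {
    capacity: Math.max(1, Math.floor(capacity * CAPACITY_DECREASE_FACTOR)),
    pollInterval: Math.ceil(pollInterval * POLL_INTERVAL_INCREASE_FACTOR),
  };
}
```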
4 changed files with 39 additions and 53 deletions

#### `create_managed_configuration.test.ts`

```diff
@@ -207,6 +207,17 @@ describe('createManagedConfiguration()', () => {
   });

   describe('mget claim strategy', () => {
+    test('should not decrease configuration at the next interval when an error without status code is emitted', async () => {
+      const { subscription, errors$ } = setupScenario(10);
+      errors$.next(new Error());
+      clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1);
+      expect(subscription).toHaveBeenCalledTimes(1);
+      expect(subscription).toHaveBeenNthCalledWith(1, 10);
+      clock.tick(1);
+      expect(subscription).toHaveBeenCalledTimes(1);
+      expect(subscription).toHaveBeenNthCalledWith(1, 10);
+    });
+
     test('should decrease configuration at the next interval when an msearch 429 error is emitted', async () => {
       const { subscription, errors$ } = setupScenario(10);
       errors$.next(new MsearchError(429));
@@ -266,6 +277,17 @@ describe('createManagedConfiguration()', () => {
       expect(subscription).toHaveBeenNthCalledWith(2, 8);
     });

+    test('should decrease configuration at the next interval when an msearch 502 error is emitted', async () => {
+      const { subscription, errors$ } = setupScenario(10);
+      errors$.next(new MsearchError(502));
+      clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1);
+      expect(subscription).toHaveBeenCalledTimes(1);
+      expect(subscription).toHaveBeenNthCalledWith(1, 10);
+      clock.tick(1);
+      expect(subscription).toHaveBeenCalledTimes(2);
+      expect(subscription).toHaveBeenNthCalledWith(2, 8);
+    });
+
     test('should decrease configuration at the next interval when a bulkPartialUpdate 503 error is emitted', async () => {
       const { subscription, errors$ } = setupScenario(10);
       errors$.next(
@@ -279,6 +301,17 @@ describe('createManagedConfiguration()', () => {
       expect(subscription).toHaveBeenNthCalledWith(2, 8);
     });

+    test('should decrease configuration at the next interval when an msearch 504 error is emitted', async () => {
+      const { subscription, errors$ } = setupScenario(10);
+      errors$.next(new MsearchError(504));
+      clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1);
+      expect(subscription).toHaveBeenCalledTimes(1);
+      expect(subscription).toHaveBeenNthCalledWith(1, 10);
+      clock.tick(1);
+      expect(subscription).toHaveBeenCalledTimes(2);
+      expect(subscription).toHaveBeenNthCalledWith(2, 8);
+    });
+
     test('should not change configuration at the next interval when other msearch error is emitted', async () => {
       const { subscription, errors$ } = setupScenario(10);
       errors$.next(new MsearchError(404));
```

#### `create_managed_configuration.ts`

```diff
@@ -193,11 +193,9 @@ export function countErrors(
         SavedObjectsErrorHelpers.isGeneralError(e) ||
         isEsCannotExecuteScriptError(e) ||
         getMsearchStatusCode(e) === 429 ||
-        getMsearchStatusCode(e) === 500 ||
-        getMsearchStatusCode(e) === 503 ||
+        (getMsearchStatusCode(e) !== undefined && getMsearchStatusCode(e)! >= 500) ||
         getBulkUpdateStatusCode(e) === 429 ||
-        getBulkUpdateStatusCode(e) === 500 ||
-        getBulkUpdateStatusCode(e) === 503 ||
+        (getBulkUpdateStatusCode(e) !== undefined && getBulkUpdateStatusCode(e)! >= 500) ||
         isClusterBlockException(e)
       )
     )
```

#### `polling_lifecycle.test.ts`

```diff
@@ -185,22 +185,6 @@ describe('TaskPollingLifecycle', () => {
   });

   describe('stop', () => {
-    test('stops polling once the ES and SavedObjects services become unavailable', () => {
-      const elasticsearchAndSOAvailability$ = new Subject<boolean>();
-      new TaskPollingLifecycle({ elasticsearchAndSOAvailability$, ...taskManagerOpts });
-      elasticsearchAndSOAvailability$.next(true);
-      clock.tick(150);
-      expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalled();
-      elasticsearchAndSOAvailability$.next(false);
-      mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable.mockClear();
-      clock.tick(150);
-      expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).not.toHaveBeenCalled();
-    });
-
     test('stops polling if stop() is called', () => {
       const elasticsearchAndSOAvailability$ = new Subject<boolean>();
       const pollingLifecycle = new TaskPollingLifecycle({
@@ -223,30 +207,6 @@ describe('TaskPollingLifecycle', () => {
       clock.tick(100);
       expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalledTimes(1);
     });
-
-    test('restarts polling once the ES and SavedObjects services become available again', () => {
-      const elasticsearchAndSOAvailability$ = new Subject<boolean>();
-      new TaskPollingLifecycle({
-        elasticsearchAndSOAvailability$,
-        ...taskManagerOpts,
-      });
-      elasticsearchAndSOAvailability$.next(true);
-      clock.tick(150);
-      expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalled();
-      elasticsearchAndSOAvailability$.next(false);
-      mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable.mockClear();
-      clock.tick(150);
-      expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).not.toHaveBeenCalled();
-      elasticsearchAndSOAvailability$.next(true);
-      clock.tick(150);
-      expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalled();
-    });
   });

   describe('claimAvailableTasks', () => {
```

#### `polling_lifecycle.ts`

```diff
@@ -97,6 +97,7 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEvent> {
   private logger: Logger;
   private poller: TaskPoller<string, TimedFillPoolResult>;
+  private started = false;

   public pool: TaskPool;
@@ -121,9 +122,9 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEvent> {
   constructor({
     logger,
     middleware,
+    config,
     // Elasticsearch and SavedObjects availability status
     elasticsearchAndSOAvailability$,
-    config,
     taskStore,
     definitions,
     executionContext,
@@ -223,15 +224,9 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEvent> {
     this.subscribeToPoller(this.poller.events$);

     elasticsearchAndSOAvailability$.subscribe((areESAndSOAvailable) => {
-      if (areESAndSOAvailable) {
       // start polling for work
+      if (areESAndSOAvailable && !this.started) {
         this.poller.start();
-      } else if (!areESAndSOAvailable) {
-        this.logger.info(
-          `Stopping the task poller because Elasticsearch and/or saved-objects service became unavailable`
-        );
-        this.poller.stop();
-        this.pool.cancelRunningTasks();
+        this.started = true;
       }
     });
   }
```