Wait for Elasticsearch cluster health to be green/yellow on task manager index before starting the poller (#212785)

I've noticed some serverless projects would encounter `503` errors
shortly after "resuming". When this happens, Elasticsearch needs time to
restore indices and their data before it can fulfill requests
successfully. It was recommended to wait for the cluster / index to have
a healthy green (serverless) / yellow (stateful) status before starting
to run background tasks. This way the task manager will not encounter
503 errors as often which reflect into the metrics.

There are a few functional details to the changes I've made:
- Narrows the health call to the task manager index only
- Waits for green on serverless and yellow on stateful
- Has a timeout of 30s
- Will start claiming tasks after the timeout or when an error is
returned by the API call - to prevent a node not claiming tasks at all
(reduce risk, smoother introduction to this new constraint)

## To verify
- Ensure code reflects functional requirements
- Verify unit tests validate the functionality on various code paths
- Ensure Kibana starts claiming tasks on startup once the health API
responds (can also check on serverless and ECH. I spun up one of each
with this PR)

---------

Co-authored-by: Ying Mao <ying.mao@elastic.co>
This commit is contained in:
Mike Côté 2025-03-07 12:38:16 -05:00 committed by GitHub
parent a78f9c2efe
commit 5041031b5d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 305 additions and 136 deletions

View file

@ -101,6 +101,12 @@ describe('managed configuration', () => {
auto_calculate_default_ech_capacity: false,
};
async function runSetTimeout0() {
const promiseResult = new Promise((resolve) => setTimeout(resolve, 0));
clock.tick(0);
await promiseResult;
}
afterEach(() => clock.restore());
describe('managed poll interval with default claim strategy', () => {
@ -130,8 +136,10 @@ describe('managed configuration', () => {
taskManagerStart = await taskManager.start(coreStart, {});
// force rxjs timers to fire when they are scheduled for setTimeout(0) as the
// sinon fake timers cause them to stall
clock.tick(0);
// sinon fake timers cause them to stall. We need to do this a few times for the
// startup code to start monitoring the poll configuration properly.
await runSetTimeout0();
await runSetTimeout0();
});
test('should increase poll interval when Elasticsearch returns 429 error', async () => {
@ -226,8 +234,10 @@ describe('managed configuration', () => {
taskManagerStart = taskManager.start(coreStart, {});
// force rxjs timers to fire when they are scheduled for setTimeout(0) as the
// sinon fake timers cause them to stall
clock.tick(0);
// sinon fake timers cause them to stall. We need to do this a few times for the
// startup code to start monitoring the poll configuration properly.
await runSetTimeout0();
await runSetTimeout0();
});
test('should increase poll interval when Elasticsearch returns 429 error', async () => {
@ -326,8 +336,10 @@ describe('managed configuration', () => {
taskManagerStart = await taskManager.start(coreStart, {});
// force rxjs timers to fire when they are scheduled for setTimeout(0) as the
// sinon fake timers cause them to stall
clock.tick(0);
// sinon fake timers cause them to stall. We need to do this a few times for the
// startup code to start monitoring the poll configuration properly.
await runSetTimeout0();
await runSetTimeout0();
});
test('should lower capacity when Elasticsearch returns 429 error', async () => {

View file

@ -0,0 +1,204 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Subject } from 'rxjs';
import { bufferCount, take } from 'rxjs';
import { loggingSystemMock, elasticsearchServiceMock } from '@kbn/core/server/mocks';
import { CoreStatus, ServiceStatusLevels } from '@kbn/core/server';
import {
getElasticsearchAndSOAvailability,
GetElasticsearchAndSOAvailabilityOpts,
} from './get_es_and_so_availability';
import type { ClusterHealthHealthResponseBody } from '@elastic/elasticsearch/lib/api/types';
const logger = loggingSystemMock.createLogger();
const clusterClientMock = elasticsearchServiceMock.createClusterClient();
const getClusterClient = async () => clusterClientMock;
function getOpts(
overwrites: Partial<GetElasticsearchAndSOAvailabilityOpts>
): GetElasticsearchAndSOAvailabilityOpts {
return {
core$: new Subject<CoreStatus>(),
logger,
getClusterClient,
isServerless: false,
...overwrites,
};
}
describe('getElasticsearchAndSOAvailability', () => {
beforeEach(() => {
jest.resetAllMocks();
});
test('returns false when elasticsearch isnt avialable, so is avialable and elasticsearch is healthy', async () => {
const core$ = new Subject<CoreStatus>();
clusterClientMock.asInternalUser.cluster.health.mockResolvedValue(healthyEsResponse);
const availability = getElasticsearchAndSOAvailability(getOpts({ core$ }))
.pipe(take(3), bufferCount(3))
.toPromise();
core$.next(mockCoreStatusAvailability({ elasticsearch: false, savedObjects: true }));
expect(await availability).toEqual([false, false, false]);
core$.complete();
});
test('returns false when so isnt available, elasticsearch is available and elasticsearch is healthy', async () => {
const core$ = new Subject<CoreStatus>();
clusterClientMock.asInternalUser.cluster.health.mockResolvedValue(healthyEsResponse);
const availability = getElasticsearchAndSOAvailability(getOpts({ core$ }))
.pipe(take(3), bufferCount(3))
.toPromise();
core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: false }));
expect(await availability).toEqual([false, false, false]);
core$.complete();
});
test('returns true when both services are available and elasticsearch isnt healthy', async () => {
const core$ = new Subject<CoreStatus>();
clusterClientMock.asInternalUser.cluster.health.mockRejectedValue(
new Error('Request timed out')
);
const availability = getElasticsearchAndSOAvailability(getOpts({ core$ }))
.pipe(take(3), bufferCount(3))
.toPromise();
core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: true }));
expect(await availability).toEqual([false, false, true]);
expect(logger.error).toHaveBeenCalledWith(
'Error loading the cluster health. The task poller will start regardless. Error: Request timed out'
);
core$.complete();
});
test('returns true when both services are available and elasticsearch is healthy', async () => {
const core$ = new Subject<CoreStatus>();
clusterClientMock.asInternalUser.cluster.health.mockResolvedValue(healthyEsResponse);
const availability = getElasticsearchAndSOAvailability(getOpts({ core$ }))
.pipe(take(3), bufferCount(3))
.toPromise();
core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: true }));
expect(await availability).toEqual([false, false, true]);
core$.complete();
});
test('shift back and forth between values as status changes', async () => {
const core$ = new Subject<CoreStatus>();
clusterClientMock.asInternalUser.cluster.health.mockResolvedValue(healthyEsResponse);
const availability = getElasticsearchAndSOAvailability(getOpts({ core$ }))
.pipe(take(5), bufferCount(5))
.toPromise();
// Let the health API response processing go first
await new Promise((resolve) => setTimeout(resolve, 1));
core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: false }));
core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: true }));
core$.next(mockCoreStatusAvailability({ elasticsearch: false, savedObjects: false }));
expect(await availability).toEqual([false, false, false, true, false]);
core$.complete();
});
test('wait for health=green when serverless=true', async () => {
const core$ = new Subject<CoreStatus>();
clusterClientMock.asInternalUser.cluster.health.mockResolvedValue(healthyEsResponse);
const availability = getElasticsearchAndSOAvailability(getOpts({ core$, isServerless: true }))
.pipe(take(3), bufferCount(3))
.toPromise();
core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: true }));
expect(await availability).toEqual([false, false, true]);
expect(clusterClientMock.asInternalUser.cluster.health).toHaveBeenCalledWith({
wait_for_status: 'green',
timeout: '30s',
index: '.kibana_task_manager',
});
core$.complete();
});
test('wait for health=yellow when serverless=false', async () => {
const core$ = new Subject<CoreStatus>();
clusterClientMock.asInternalUser.cluster.health.mockResolvedValue(healthyEsResponse);
const availability = getElasticsearchAndSOAvailability(getOpts({ core$, isServerless: false }))
.pipe(take(3), bufferCount(3))
.toPromise();
core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: true }));
expect(await availability).toEqual([false, false, true]);
expect(clusterClientMock.asInternalUser.cluster.health).toHaveBeenCalledWith({
wait_for_status: 'yellow',
timeout: '30s',
index: '.kibana_task_manager',
});
core$.complete();
});
test('returns true when both services are available and elasticsearch cluster client fails to load', async () => {
const core$ = new Subject<CoreStatus>();
const availability = getElasticsearchAndSOAvailability(
getOpts({ core$, getClusterClient: jest.fn().mockRejectedValue(new Error('Failed to load')) })
)
.pipe(take(3), bufferCount(3))
.toPromise();
core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: true }));
expect(await availability).toEqual([false, false, true]);
expect(logger.error).toHaveBeenCalledWith(
'Error loading the cluster client to fetch cluster health. The task poller will start regardless. Error: Failed to load'
);
core$.complete();
});
});
function mockCoreStatusAvailability({
elasticsearch,
savedObjects,
}: {
elasticsearch: boolean;
savedObjects: boolean;
}) {
return {
elasticsearch: {
level: elasticsearch ? ServiceStatusLevels.available : ServiceStatusLevels.unavailable,
summary: '',
},
savedObjects: {
level: savedObjects ? ServiceStatusLevels.available : ServiceStatusLevels.unavailable,
summary: '',
},
};
}
const healthyEsResponse: ClusterHealthHealthResponseBody = {
cluster_name: 'elasticsearch',
status: 'green',
timed_out: false,
number_of_nodes: 1,
number_of_data_nodes: 1,
active_primary_shards: 34,
active_shards: 34,
relocating_shards: 0,
initializing_shards: 0,
unassigned_shards: 0,
unassigned_primary_shards: 0,
delayed_unassigned_shards: 0,
number_of_pending_tasks: 1,
number_of_in_flight_fetch: 0,
task_max_waiting_in_queue_millis: 0,
active_shards_percent_as_number: 100,
};

View file

@ -0,0 +1,68 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Observable, BehaviorSubject } from 'rxjs';
import { Logger, ServiceStatusLevels, CoreStatus, IClusterClient } from '@kbn/core/server';
import { TASK_MANAGER_INDEX } from '../constants';
export interface GetElasticsearchAndSOAvailabilityOpts {
core$: Observable<CoreStatus>;
isServerless: boolean;
logger: Logger;
getClusterClient: () => Promise<IClusterClient>;
}
export function getElasticsearchAndSOAvailability({
core$,
isServerless,
logger,
getClusterClient,
}: GetElasticsearchAndSOAvailabilityOpts): Observable<boolean> {
let isEsHealthy = false;
let isEsServiceAvailable = false;
let isSoServiceAvailable = false;
const result = new BehaviorSubject<boolean>(false);
core$.subscribe(({ elasticsearch, savedObjects }) => {
isEsServiceAvailable = elasticsearch.level === ServiceStatusLevels.available;
isSoServiceAvailable = savedObjects.level === ServiceStatusLevels.available;
result.next(isEsHealthy && isEsServiceAvailable && isSoServiceAvailable);
});
// Load cluster health to ensure task index is ready
getClusterClient()
.then((client) => {
client.asInternalUser.cluster
.health({
wait_for_status: isServerless ? 'green' : 'yellow',
timeout: '30s',
index: TASK_MANAGER_INDEX,
})
.then((healthResult) => {
logger.debug(`Cluster health: ${JSON.stringify(healthResult)}`);
isEsHealthy = true;
result.next(isEsHealthy && isEsServiceAvailable && isSoServiceAvailable);
})
.catch((e) => {
logger.error(
`Error loading the cluster health. The task poller will start regardless. Error: ${e.message}`
);
// Even if we can't load the cluster health, we should start the task
// poller in case the issue is unrelated.
isEsHealthy = true;
result.next(isEsHealthy && isEsServiceAvailable && isSoServiceAvailable);
});
})
.catch((e) => {
logger.error(
`Error loading the cluster client to fetch cluster health. The task poller will start regardless. Error: ${e.message}`
);
// Even if we can't load the cluster health, we should start the task
// poller in case the issue is unrelated.
isEsHealthy = true;
result.next(isEsHealthy && isEsServiceAvailable && isSoServiceAvailable);
});
return result;
}

View file

@ -5,14 +5,11 @@
* 2.0.
*/
import { TaskManagerPlugin, getElasticsearchAndSOAvailability } from './plugin';
import { TaskManagerPlugin } from './plugin';
import { KibanaDiscoveryService } from './kibana_discovery_service';
import { coreMock } from '@kbn/core/server/mocks';
import { TaskManagerConfig } from './config';
import { Subject } from 'rxjs';
import { bufferCount, take } from 'rxjs';
import { CoreStatus, ServiceStatusLevels } from '@kbn/core/server';
import { cloudMock } from '@kbn/cloud-plugin/public/mocks';
import { taskPollingLifecycleMock } from './polling_lifecycle.mock';
import { TaskPollingLifecycle } from './polling_lifecycle';
@ -215,99 +212,4 @@ describe('TaskManagerPlugin', () => {
expect(deleteCurrentNodeSpy).toHaveBeenCalledTimes(1);
});
});
describe('getElasticsearchAndSOAvailability', () => {
test('returns true when both services are available', async () => {
const core$ = new Subject<CoreStatus>();
const availability = getElasticsearchAndSOAvailability(core$)
.pipe(take(1), bufferCount(1))
.toPromise();
core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: true }));
expect(await availability).toEqual([true]);
});
test('returns false when both services are unavailable', async () => {
const core$ = new Subject<CoreStatus>();
const availability = getElasticsearchAndSOAvailability(core$)
.pipe(take(1), bufferCount(1))
.toPromise();
core$.next(mockCoreStatusAvailability({ elasticsearch: false, savedObjects: false }));
expect(await availability).toEqual([false]);
});
test('returns false when one service is unavailable but the other is available', async () => {
const core$ = new Subject<CoreStatus>();
const availability = getElasticsearchAndSOAvailability(core$)
.pipe(take(1), bufferCount(1))
.toPromise();
core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: false }));
expect(await availability).toEqual([false]);
});
test('shift back and forth between values as status changes', async () => {
const core$ = new Subject<CoreStatus>();
const availability = getElasticsearchAndSOAvailability(core$)
.pipe(take(3), bufferCount(3))
.toPromise();
core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: false }));
core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: true }));
core$.next(mockCoreStatusAvailability({ elasticsearch: false, savedObjects: false }));
expect(await availability).toEqual([false, true, false]);
});
test(`skips values when the status hasn't changed`, async () => {
const core$ = new Subject<CoreStatus>();
const availability = getElasticsearchAndSOAvailability(core$)
.pipe(take(3), bufferCount(3))
.toPromise();
core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: false }));
// still false, so shouldn't emit a second time
core$.next(mockCoreStatusAvailability({ elasticsearch: false, savedObjects: true }));
core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: true }));
// shouldn't emit as already true
core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: true }));
core$.next(mockCoreStatusAvailability({ elasticsearch: false, savedObjects: false }));
expect(await availability).toEqual([false, true, false]);
});
});
});
function mockCoreStatusAvailability({
elasticsearch,
savedObjects,
}: {
elasticsearch: boolean;
savedObjects: boolean;
}) {
return {
elasticsearch: {
level: elasticsearch ? ServiceStatusLevels.available : ServiceStatusLevels.unavailable,
summary: '',
},
savedObjects: {
level: savedObjects ? ServiceStatusLevels.available : ServiceStatusLevels.unavailable,
summary: '',
},
};
}

View file

@ -13,15 +13,7 @@ import type {
UsageCollectionStart,
UsageCounter,
} from '@kbn/usage-collection-plugin/server';
import {
PluginInitializerContext,
Plugin,
CoreSetup,
Logger,
CoreStart,
ServiceStatusLevels,
CoreStatus,
} from '@kbn/core/server';
import { PluginInitializerContext, Plugin, CoreSetup, Logger, CoreStart } from '@kbn/core/server';
import type { CloudSetup, CloudStart } from '@kbn/cloud-plugin/server';
import {
registerDeleteInactiveNodesTaskDefinition,
@ -52,6 +44,7 @@ import {
registerMarkRemovedTasksAsUnrecognizedDefinition,
scheduleMarkRemovedTasksAsUnrecognizedDefinition,
} from './removed_tasks/mark_removed_tasks_as_unrecognized';
import { getElasticsearchAndSOAvailability } from './lib/get_es_and_so_availability';
export interface TaskManagerSetupContract {
/**
@ -144,7 +137,16 @@ export class TaskManagerPlugin
core: CoreSetup<TaskManagerPluginsStart, TaskManagerStartContract>,
plugins: TaskManagerPluginsSetup
): TaskManagerSetupContract {
this.elasticsearchAndSOAvailability$ = getElasticsearchAndSOAvailability(core.status.core$);
const isServerless = this.initContext.env.packageInfo.buildFlavor === 'serverless';
const clusterClientPromise = core
.getStartServices()
.then(([coreServices]) => coreServices.elasticsearch.client);
this.elasticsearchAndSOAvailability$ = getElasticsearchAndSOAvailability({
core$: core.status.core$,
isServerless,
logger: this.logger,
getClusterClient: () => clusterClientPromise,
});
core.metrics
.getOpsMetrics$()
@ -165,10 +167,6 @@ export class TaskManagerPlugin
this.logger.info(`TaskManager is identified by the Kibana UUID: ${this.taskManagerId}`);
}
const startServicesPromise = core.getStartServices().then(([coreServices]) => ({
elasticsearch: coreServices.elasticsearch,
}));
this.usageCounter = plugins.usageCollection?.createUsageCounter(`taskManager`);
// Routes
@ -182,8 +180,7 @@ export class TaskManagerPlugin
usageCounter: this.usageCounter!,
kibanaVersion: this.kibanaVersion,
kibanaIndexName: core.savedObjects.getDefaultIndex(),
getClusterClient: () =>
startServicesPromise.then(({ elasticsearch }) => elasticsearch.client),
getClusterClient: () => clusterClientPromise,
shouldRunTasks: this.shouldRunBackgroundTasks,
docLinks: core.docLinks,
numOfKibanaInstances$: this.numOfKibanaInstances$,
@ -197,8 +194,7 @@ export class TaskManagerPlugin
usageCounter: this.usageCounter!,
kibanaVersion: this.kibanaVersion,
kibanaIndexName: core.savedObjects.getDefaultIndex(),
getClusterClient: () =>
startServicesPromise.then(({ elasticsearch }) => elasticsearch.client),
getClusterClient: () => clusterClientPromise,
});
metricsRoute({
router,
@ -422,16 +418,3 @@ export class TaskManagerPlugin
}
}
}
export function getElasticsearchAndSOAvailability(
core$: Observable<CoreStatus>
): Observable<boolean> {
return core$.pipe(
map(
({ elasticsearch, savedObjects }) =>
elasticsearch.level === ServiceStatusLevels.available &&
savedObjects.level === ServiceStatusLevels.available
),
distinctUntilChanged()
);
}