mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 17:59:23 -04:00
# Backport This will backport the following commits from `main` to `8.x`: - [Add more logs to Task Manager poller (#194741)](https://github.com/elastic/kibana/pull/194741) <!--- Backport version: 9.4.3 --> ### Questions ? Please refer to the [Backport tool documentation](https://github.com/sqren/backport) <!--BACKPORT [{"author":{"name":"Mike Côté","email":"mikecote@users.noreply.github.com"},"sourceCommit":{"committedDate":"2024-10-03T17:12:22Z","message":"Add more logs to Task Manager poller (#194741)\n\nIn this PR, I'm adding a few more logs to the task poller to indicate\r\ncritical events to the task poller.\r\n\r\n## To verify\r\n1. Startup Elasticsearch and Kibana (and ensure Elasticsearch data is\r\npersisted somewhere `yarn es snapshot -E path.data=...`)\r\n2. Observe the `Starting the task poller` message on startup\r\n3. Shut down Elasticsearch\r\n4. Observe the following messages:\r\n- `Stopping the task poller because Elasticsearch and/or saved-objects\r\nservice became unavailable`\r\n - `Stopping the task poller`\r\n - `Task poller finished running its last cycle`\r\n5. Startup Elasticsearch again\r\n6. Wait a while and observe the `Starting the task poller` message","sha":"790c5ce7c01fabde140675a24fd852d3f86f7ebc","branchLabelMapping":{"^v9.0.0$":"main","^v8.16.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["release_note:skip","Feature:Task Manager","Team:ResponseOps","v9.0.0","backport:prev-minor","v8.16.0"],"title":"Add more logs to Task Manager poller","number":194741,"url":"https://github.com/elastic/kibana/pull/194741","mergeCommit":{"message":"Add more logs to Task Manager poller (#194741)\n\nIn this PR, I'm adding a few more logs to the task poller to indicate\r\ncritical events to the task poller.\r\n\r\n## To verify\r\n1. Startup Elasticsearch and Kibana (and ensure Elasticsearch data is\r\npersisted somewhere `yarn es snapshot -E path.data=...`)\r\n2. Observe the `Starting the task poller` message on startup\r\n3. Shut down Elasticsearch\r\n4. Observe the following messages:\r\n- `Stopping the task poller because Elasticsearch and/or saved-objects\r\nservice became unavailable`\r\n - `Stopping the task poller`\r\n - `Task poller finished running its last cycle`\r\n5. Startup Elasticsearch again\r\n6. Wait a while and observe the `Starting the task poller` message","sha":"790c5ce7c01fabde140675a24fd852d3f86f7ebc"}},"sourceBranch":"main","suggestedTargetBranches":["8.x"],"targetPullRequestStates":[{"branch":"main","label":"v9.0.0","branchLabelMappingKey":"^v9.0.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/194741","number":194741,"mergeCommit":{"message":"Add more logs to Task Manager poller (#194741)\n\nIn this PR, I'm adding a few more logs to the task poller to indicate\r\ncritical events to the task poller.\r\n\r\n## To verify\r\n1. Startup Elasticsearch and Kibana (and ensure Elasticsearch data is\r\npersisted somewhere `yarn es snapshot -E path.data=...`)\r\n2. Observe the `Starting the task poller` message on startup\r\n3. Shut down Elasticsearch\r\n4. Observe the following messages:\r\n- `Stopping the task poller because Elasticsearch and/or saved-objects\r\nservice became unavailable`\r\n - `Stopping the task poller`\r\n - `Task poller finished running its last cycle`\r\n5. Startup Elasticsearch again\r\n6. Wait a while and observe the `Starting the task poller` message","sha":"790c5ce7c01fabde140675a24fd852d3f86f7ebc"}},{"branch":"8.x","label":"v8.16.0","branchLabelMappingKey":"^v8.16.0$","isSourceBranch":false,"state":"NOT_CREATED"}]}] BACKPORT--> Co-authored-by: Mike Côté <mikecote@users.noreply.github.com>
This commit is contained in:
parent
6a782989f8
commit
e422533e3d
4 changed files with 36 additions and 13 deletions
|
@ -239,13 +239,14 @@ describe('TaskPoller', () => {
|
||||||
const pollInterval = 100;
|
const pollInterval = 100;
|
||||||
|
|
||||||
const handler = jest.fn();
|
const handler = jest.fn();
|
||||||
|
const workError = new Error('failed to work');
|
||||||
const poller = createTaskPoller<string, string[]>({
|
const poller = createTaskPoller<string, string[]>({
|
||||||
initialPollInterval: pollInterval,
|
initialPollInterval: pollInterval,
|
||||||
logger: loggingSystemMock.create().get(),
|
logger: loggingSystemMock.create().get(),
|
||||||
pollInterval$: of(pollInterval),
|
pollInterval$: of(pollInterval),
|
||||||
pollIntervalDelay$: of(0),
|
pollIntervalDelay$: of(0),
|
||||||
work: async (...args) => {
|
work: async (...args) => {
|
||||||
throw new Error('failed to work');
|
throw workError;
|
||||||
},
|
},
|
||||||
getCapacity: () => 5,
|
getCapacity: () => 5,
|
||||||
});
|
});
|
||||||
|
@ -256,12 +257,13 @@ describe('TaskPoller', () => {
|
||||||
await new Promise((resolve) => setImmediate(resolve));
|
await new Promise((resolve) => setImmediate(resolve));
|
||||||
|
|
||||||
const expectedError = new PollingError<string>(
|
const expectedError = new PollingError<string>(
|
||||||
'Failed to poll for work: Error: failed to work',
|
'Failed to poll for work: failed to work',
|
||||||
PollingErrorType.WorkError,
|
PollingErrorType.WorkError,
|
||||||
none
|
none
|
||||||
);
|
);
|
||||||
expect(handler).toHaveBeenCalledWith(asErr(expectedError));
|
expect(handler).toHaveBeenCalledWith(asErr(expectedError));
|
||||||
expect(handler.mock.calls[0][0].error.type).toEqual(PollingErrorType.WorkError);
|
expect(handler.mock.calls[0][0].error.type).toEqual(PollingErrorType.WorkError);
|
||||||
|
expect(handler.mock.calls[0][0].error.stack).toContain(workError.stack);
|
||||||
});
|
});
|
||||||
|
|
||||||
test('continues polling after work fails', async () => {
|
test('continues polling after work fails', async () => {
|
||||||
|
@ -269,10 +271,11 @@ describe('TaskPoller', () => {
|
||||||
|
|
||||||
const handler = jest.fn();
|
const handler = jest.fn();
|
||||||
let callCount = 0;
|
let callCount = 0;
|
||||||
|
const workError = new Error('failed to work');
|
||||||
const work = jest.fn(async () => {
|
const work = jest.fn(async () => {
|
||||||
callCount++;
|
callCount++;
|
||||||
if (callCount === 2) {
|
if (callCount === 2) {
|
||||||
throw new Error('failed to work');
|
throw workError;
|
||||||
}
|
}
|
||||||
return callCount;
|
return callCount;
|
||||||
});
|
});
|
||||||
|
@ -296,12 +299,13 @@ describe('TaskPoller', () => {
|
||||||
await new Promise((resolve) => setImmediate(resolve));
|
await new Promise((resolve) => setImmediate(resolve));
|
||||||
|
|
||||||
const expectedError = new PollingError<string>(
|
const expectedError = new PollingError<string>(
|
||||||
'Failed to poll for work: Error: failed to work',
|
'Failed to poll for work: failed to work',
|
||||||
PollingErrorType.WorkError,
|
PollingErrorType.WorkError,
|
||||||
none
|
none
|
||||||
);
|
);
|
||||||
expect(handler).toHaveBeenCalledWith(asErr(expectedError));
|
expect(handler).toHaveBeenCalledWith(asErr(expectedError));
|
||||||
expect(handler.mock.calls[1][0].error.type).toEqual(PollingErrorType.WorkError);
|
expect(handler.mock.calls[1][0].error.type).toEqual(PollingErrorType.WorkError);
|
||||||
|
expect(handler.mock.calls[1][0].error.stack).toContain(workError.stack);
|
||||||
expect(handler).not.toHaveBeenCalledWith(asOk(2));
|
expect(handler).not.toHaveBeenCalledWith(asOk(2));
|
||||||
|
|
||||||
clock.tick(pollInterval);
|
clock.tick(pollInterval);
|
||||||
|
@ -342,7 +346,7 @@ describe('TaskPoller', () => {
|
||||||
await new Promise((resolve) => setImmediate(resolve));
|
await new Promise((resolve) => setImmediate(resolve));
|
||||||
|
|
||||||
const expectedError = new PollingError<string>(
|
const expectedError = new PollingError<string>(
|
||||||
'Failed to poll for work: Error: error getting capacity',
|
'Failed to poll for work: error getting capacity',
|
||||||
PollingErrorType.WorkError,
|
PollingErrorType.WorkError,
|
||||||
none
|
none
|
||||||
);
|
);
|
||||||
|
|
|
@ -65,6 +65,8 @@ export function createTaskPoller<T, H>({
|
||||||
if (hasCapacity()) {
|
if (hasCapacity()) {
|
||||||
const result = await work();
|
const result = await work();
|
||||||
subject.next(asOk(result));
|
subject.next(asOk(result));
|
||||||
|
} else {
|
||||||
|
logger.debug('Skipping polling cycle because there is no capacity available');
|
||||||
}
|
}
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
subject.next(asPollingError<T>(e, PollingErrorType.WorkError));
|
subject.next(asPollingError<T>(e, PollingErrorType.WorkError));
|
||||||
|
@ -73,11 +75,16 @@ export function createTaskPoller<T, H>({
|
||||||
if (running) {
|
if (running) {
|
||||||
// Set the next runCycle call
|
// Set the next runCycle call
|
||||||
timeoutId = setTimeout(
|
timeoutId = setTimeout(
|
||||||
() => runCycle().catch(() => {}),
|
() =>
|
||||||
|
runCycle().catch((e) => {
|
||||||
|
subject.next(asPollingError(e, PollingErrorType.PollerError));
|
||||||
|
}),
|
||||||
Math.max(pollInterval - (Date.now() - start) + (pollIntervalDelay % pollInterval), 0)
|
Math.max(pollInterval - (Date.now() - start) + (pollIntervalDelay % pollInterval), 0)
|
||||||
);
|
);
|
||||||
// Reset delay, it's designed to shuffle only once
|
// Reset delay, it's designed to shuffle only once
|
||||||
pollIntervalDelay = 0;
|
pollIntervalDelay = 0;
|
||||||
|
} else {
|
||||||
|
logger.info('Task poller finished running its last cycle');
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -113,14 +120,18 @@ export function createTaskPoller<T, H>({
|
||||||
events$: subject,
|
events$: subject,
|
||||||
start: () => {
|
start: () => {
|
||||||
if (!running) {
|
if (!running) {
|
||||||
|
logger.info('Starting the task poller');
|
||||||
running = true;
|
running = true;
|
||||||
runCycle().catch(() => {});
|
runCycle().catch((e) => {
|
||||||
|
subject.next(asPollingError(e, PollingErrorType.PollerError));
|
||||||
|
});
|
||||||
// We need to subscribe shortly after start. Otherwise, the observables start emiting events
|
// We need to subscribe shortly after start. Otherwise, the observables start emiting events
|
||||||
// too soon for the task run statistics module to capture.
|
// too soon for the task run statistics module to capture.
|
||||||
setTimeout(() => subscribe(), 0);
|
setTimeout(() => subscribe(), 0);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
stop: () => {
|
stop: () => {
|
||||||
|
logger.info('Stopping the task poller');
|
||||||
if (timeoutId) {
|
if (timeoutId) {
|
||||||
clearTimeout(timeoutId);
|
clearTimeout(timeoutId);
|
||||||
timeoutId = null;
|
timeoutId = null;
|
||||||
|
@ -134,21 +145,25 @@ export enum PollingErrorType {
|
||||||
WorkError,
|
WorkError,
|
||||||
WorkTimeout,
|
WorkTimeout,
|
||||||
RequestCapacityReached,
|
RequestCapacityReached,
|
||||||
|
PollerError,
|
||||||
}
|
}
|
||||||
|
|
||||||
function asPollingError<T>(err: string | Error, type: PollingErrorType, data: Option<T> = none) {
|
function asPollingError<T>(err: Error, type: PollingErrorType, data: Option<T> = none) {
|
||||||
return asErr(new PollingError<T>(`Failed to poll for work: ${err}`, type, data));
|
return asErr(new PollingError<T>(`Failed to poll for work: ${err.message}`, type, data, err));
|
||||||
}
|
}
|
||||||
|
|
||||||
export class PollingError<T> extends Error {
|
export class PollingError<T> extends Error {
|
||||||
public readonly type: PollingErrorType;
|
public readonly type: PollingErrorType;
|
||||||
public readonly data: Option<T>;
|
public readonly data: Option<T>;
|
||||||
public readonly source: TaskErrorSource;
|
public readonly source: TaskErrorSource;
|
||||||
constructor(message: string, type: PollingErrorType, data: Option<T>) {
|
constructor(message: string, type: PollingErrorType, data: Option<T>, cause?: Error) {
|
||||||
super(message);
|
super(message, { cause });
|
||||||
Object.setPrototypeOf(this, new.target.prototype);
|
Object.setPrototypeOf(this, new.target.prototype);
|
||||||
this.type = type;
|
this.type = type;
|
||||||
this.data = data;
|
this.data = data;
|
||||||
this.source = TaskErrorSource.FRAMEWORK;
|
this.source = TaskErrorSource.FRAMEWORK;
|
||||||
|
if (cause) {
|
||||||
|
this.stack = `${this.stack}\nCaused by:\n${cause.stack}`;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -515,9 +515,10 @@ describe('TaskPollingLifecycle', () => {
|
||||||
(event: TaskLifecycleEvent) => event.type === TaskEventType.TASK_POLLING_CYCLE
|
(event: TaskLifecycleEvent) => event.type === TaskEventType.TASK_POLLING_CYCLE
|
||||||
);
|
);
|
||||||
|
|
||||||
|
expect(pollingCycleEvent!.event.tag).toEqual('err');
|
||||||
expect(pollingCycleEvent!.event).toEqual({
|
expect(pollingCycleEvent!.event).toEqual({
|
||||||
tag: 'err',
|
tag: 'err',
|
||||||
error: new Error(`Failed to poll for work: Error: booo`),
|
error: new Error(`Failed to poll for work: booo`),
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
@ -199,6 +199,9 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
|
||||||
// start polling for work
|
// start polling for work
|
||||||
poller.start();
|
poller.start();
|
||||||
} else if (!areESAndSOAvailable) {
|
} else if (!areESAndSOAvailable) {
|
||||||
|
this.logger.info(
|
||||||
|
`Stopping the task poller because Elasticsearch and/or saved-objects service became unavailable`
|
||||||
|
);
|
||||||
poller.stop();
|
poller.stop();
|
||||||
this.pool.cancelRunningTasks();
|
this.pool.cancelRunningTasks();
|
||||||
}
|
}
|
||||||
|
@ -285,7 +288,7 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
|
||||||
mapOptional((id) => this.emitEvent(asTaskRunRequestEvent(id, asErr(error))))
|
mapOptional((id) => this.emitEvent(asTaskRunRequestEvent(id, asErr(error))))
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
this.logger.error(error.message);
|
this.logger.error(error.message, { error: { stack_trace: error.stack } });
|
||||||
|
|
||||||
// Emit event indicating task manager utilization % at the end of a polling cycle
|
// Emit event indicating task manager utilization % at the end of a polling cycle
|
||||||
// Because there was a polling error, no tasks were claimed so this represents the number of workers busy
|
// Because there was a polling error, no tasks were claimed so this represents the number of workers busy
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue