[Fleet] fix bulk actions timing out sometimes (#205735)

## Summary

Closes https://github.com/elastic/ingest-dev/issues/4346

Update: changed the implementation to run the first attempt of bulk
action execution in the task too.

```
[2025-01-08T11:10:54.139+01:00][INFO ][plugins.fleet] Running action asynchronously, actionId: f8658178-cb1e-485d-9d2f-ad60ccc37bc9, total agents:10001
actionParams {
  kuery: '( fleet-agents.policy_id : ("d3448ae1-9e55-485e-b74c-83471cb06977")) and (status:online or (status:error or status:degraded) or (status:updating or status:unenrolling or status:enrolling) or status:offline)',
  revoke: false,
  force: undefined,
  batchSize: 10000,
  showInactive: true,
  spaceId: 'default',
  total: 10001,
  actionId: 'f8658178-cb1e-485d-9d2f-ad60ccc37bc9'
}
retryParams {
  pitId: 'gIuaBAEPLmZsZWV0LWFnZW50cy03FmRQN3pDT1gzUnFpNFkwdHJFdzJvbncAARZ5N2R5SnVSelN2bWNMang1THNfVkNRAAEAAAAAAAFpZhZiZnpybjZYSFRRNklYNlBqWHFzVU1nAAEWZFA3ekNPWDNScWk0WTB0ckV3Mm9udwAA',
  retryCount: 1
}
[2025-01-08T11:10:54.154+01:00][INFO ][plugins.fleet] Scheduling task fleet:unenroll_action:retry:f8658178-cb1e-485d-9d2f-ad60ccc37bc9
[2025-01-08T11:10:54.154+01:00][DEBUG][plugins.fleet] [Bulk Agent Unenroll API] Unenroll agents in 647ms
[2025-01-08T11:10:55.772+01:00][WARN ][http.server.Kibana] Event loop utilization for /julia/api/fleet/agents exceeded threshold of 250ms (351ms out of 633ms) and 15% (55%) 
[2025-01-08T11:10:57.235+01:00][INFO ][plugins.fleet] Running bulk action retry task
[2025-01-08T11:10:57.235+01:00][DEBUG][plugins.fleet] Running task fleet:unenroll_action:retry:f8658178-cb1e-485d-9d2f-ad60ccc37bc9
[2025-01-08T11:10:57.236+01:00][INFO ][plugins.fleet] Completed bulk action retry task
[2025-01-08T11:10:57.251+01:00][INFO ][plugins.fleet] Scheduling task fleet:unenroll_action:retry:check:f8658178-cb1e-485d-9d2f-ad60ccc37bc9
[2025-01-08T11:10:57.251+01:00][DEBUG][plugins.fleet] kuery: ( fleet-agents.policy_id : ("d3448ae1-9e55-485e-b74c-83471cb06977")) and (status:online or (status:error or status:degraded) or (status:updating or status:unenrolling or status:enrolling) or status:offline)
[2025-01-08T11:10:57.781+01:00][DEBUG][plugins.fleet] Action not found
[2025-01-08T11:10:58.891+01:00][DEBUG][plugins.fleet] Secrets storage requirements already met, turned on in settings
[2025-01-08T11:10:59.414+01:00][WARN ][http.server.Kibana] Event loop utilization for /julia/api/fleet/agent_status exceeded threshold of 250ms (294ms out of 348ms) and 15% (85%) 
[2025-01-08T11:10:59.806+01:00][WARN ][http.server.Kibana] Event loop utilization for /julia/api/fleet/agents exceeded threshold of 250ms (504ms out of 743ms) and 15% (68%) 
[2025-01-08T11:11:01.532+01:00][INFO ][plugins.fleet] Removing task fleet:unenroll_action:retry:check:f8658178-cb1e-485d-9d2f-ad60ccc37bc9
actionParams {
  kuery: '( fleet-agents.policy_id : ("d3448ae1-9e55-485e-b74c-83471cb06977")) and (status:online or (status:error or status:degraded) or (status:updating or status:unenrolling or status:enrolling) or status:offline)',
  revoke: false,
  batchSize: 10000,
  showInactive: true,
  spaceId: 'default',
  total: 10001,
  actionId: 'f8658178-cb1e-485d-9d2f-ad60ccc37bc9'
}
retryParams {
  pitId: 'gIuaBAEPLmZsZWV0LWFnZW50cy03FmRQN3pDT1gzUnFpNFkwdHJFdzJvbncAARZ5N2R5SnVSelN2bWNMang1THNfVkNRAAEAAAAAAAFpZhZiZnpybjZYSFRRNklYNlBqWHFzVU1nAAEWZFA3ekNPWDNScWk0WTB0ckV3Mm9udwAA',
  retryCount: 1,
  taskId: 'fleet:unenroll_action:retry:f8658178-cb1e-485d-9d2f-ad60ccc37bc9',
  searchAfter: [ 1736331016589, 'online-98', 119770 ]
}
[2025-01-08T11:11:01.564+01:00][INFO ][plugins.fleet] Scheduling task fleet:unenroll_action:retry:check:f8658178-cb1e-485d-9d2f-ad60ccc37bc9
[2025
```


## Old description

Bulk actions supposed to run async in a kibana task, and the API to
return quickly with an action id.
This was implemented
[here](38e74d7ee6 (diff-69fa063ab8857204486203a718ff4bd0cbf9652623279d1959316de8e83233ff))
and unintentionally broke when a tslint rule was introduced
[here](https://github.com/elastic/kibana/pull/181456/files#diff-69fa063ab8857204486203a718ff4bd0cbf9652623279d1959316de8e83233ff),
effectively letting the async code finish before the API returns. This
results in the API timing out sometimes when there are many agents.

Tested by creating 100k agent docs with the `create_agents` script and
bulk unenroll agents.

Logs before the change:
```
[2025-01-07T14:38:04.467+01:00][INFO ][plugins.fleet] Running action asynchronously, actionId: 2a57ac7a-0c1b-4a08-8709-6ccb683db995, total agents:100000
[2025-01-07T14:38:04.482+01:00][INFO ][plugins.fleet] Scheduling task fleet:unenroll_action:retry:check:2a57ac7a-0c1b-4a08-8709-6ccb683db995
[2025-01-07T14:38:04.482+01:00][DEBUG][plugins.fleet] kuery: status:* AND (fleet-agents.policy_id : ("d3448ae1-9e55-485e-b74c-83471cb06977"))
...
[2025-01-07T14:39:00.264+01:00][INFO ][plugins.fleet] Removing task fleet:unenroll_action:retry:check:2a57ac7a-0c1b-4a08-8709-6ccb683db995
[2025-01-07T14:39:00.290+01:00][INFO ][plugins.fleet] Scheduling task fleet:unenroll_action:retry:check:2a57ac7a-0c1b-4a08-8709-6ccb683db995
[2025-01-07T14:39:00.293+01:00][INFO ][plugins.fleet] processed 100000 agents, took 55811ms
[2025-01-07T14:39:00.293+01:00][INFO ][plugins.fleet] Removing task fleet:unenroll_action:retry:check:2a57ac7a-0c1b-4a08-8709-6ccb683db995
[2025-01-07T14:39:00.304+01:00][DEBUG][plugins.fleet] [Bulk Agent Unenroll API] Unenroll agents in 56027ms
```

Logs after the change:
```
[2025-01-07T14:42:10.178+01:00][INFO ][plugins.fleet] Running action asynchronously, actionId: c3e12928-bbfe-4731-bdc2-c47bcac727a7, total agents:100000
[2025-01-07T14:42:10.194+01:00][INFO ][plugins.fleet] Scheduling task fleet:unenroll_action:retry:check:c3e12928-bbfe-4731-bdc2-c47bcac727a7
[2025-01-07T14:42:10.195+01:00][DEBUG][plugins.fleet] kuery: status:* AND (fleet-agents.policy_id : ("d3448ae1-9e55-485e-b74c-83471cb06977"))
[2025-01-07T14:42:10.195+01:00][DEBUG][plugins.fleet] [Bulk Agent Unenroll API] Unenroll agents in 196ms
[2025-01-07T14:43:00.762+01:00][INFO ][plugins.fleet] processed 100000 agents, took 50567ms
[2025-01-07T14:43:00.762+01:00][INFO ][plugins.fleet] Removing task fleet:unenroll_action:retry:check:c3e12928-bbfe-4731-bdc2-c47bcac727a7
```
This commit is contained in:
Julia Bardi 2025-01-10 11:24:53 +01:00 committed by GitHub
parent 6e3c967285
commit 20fa1a54c1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 116 additions and 71 deletions

View file

@ -63,13 +63,8 @@ export abstract class ActionRunner {
protected abstract processAgents(agents: Agent[]): Promise<{ actionId: string }>;
/**
* Common runner logic accross all agent bulk actions
* Starts action execution immeditalely, asynchronously
* On errors, starts a task with Task Manager to retry max 3 times
* If the last batch was stored in state, retry continues from there (searchAfter)
*/
public async runActionAsyncWithRetry(): Promise<{ actionId: string }> {
// first attempt to run bulk action async in a task, called from API handlers
public async runActionAsyncTask(): Promise<{ actionId: string }> {
appContextService
.getLogger()
.info(
@ -82,6 +77,33 @@ export abstract class ActionRunner {
this.bulkActionsResolver = await appContextService.getBulkActionsResolver();
}
const taskId = this.bulkActionsResolver!.getTaskId(
this.actionParams.actionId!,
this.getTaskType()
);
await this.bulkActionsResolver!.run(
this.actionParams,
{
...this.retryParams,
retryCount: (this.retryParams.retryCount ?? 0) + 1,
},
this.getTaskType(),
taskId
);
return { actionId: this.actionParams.actionId! };
}
/**
* Common runner logic accross all agent bulk actions
* Starts action execution immeditalely, asynchronously
* On errors, starts a task with Task Manager to retry max 3 times
* If the last batch was stored in state, retry continues from there (searchAfter)
*/
public async runActionAsyncWithRetry(): Promise<{ actionId: string }> {
if (!this.bulkActionsResolver) {
this.bulkActionsResolver = await appContextService.getBulkActionsResolver();
}
// create task to check result with some delay, this runs in case of kibana crash too
this.checkTaskId = await this.createCheckResultTask();

View file

@ -144,7 +144,11 @@ export function createRetryTask(
appContextService
.getLogger()
.debug(`Retry #${retryParams.retryCount} of task ${taskInstance.id}`);
.debug(
retryParams.retryCount === 1
? `Running task ${taskInstance.id}`
: `Retry #${retryParams.retryCount} of task ${taskInstance.id}`
);
if (retryParams.searchAfter) {
appContextService.getLogger().info('Continuing task from batch ' + retryParams.searchAfter);

View file

@ -143,7 +143,7 @@ export async function reassignAgents(
newAgentPolicyId,
},
{ pitId: await openPointInTime(esClient) }
).runActionAsyncWithRetry();
).runActionAsyncTask();
}
}

View file

@ -88,6 +88,6 @@ export async function bulkRequestDiagnostics(
spaceId: currentSpaceId,
},
{ pitId: await openPointInTime(esClient) }
).runActionAsyncWithRetry();
).runActionAsyncTask();
}
}

View file

@ -116,7 +116,7 @@ export async function unenrollAgents(
total: res.total,
},
{ pitId: await openPointInTime(esClient) }
).runActionAsyncWithRetry();
).runActionAsyncTask();
}
}

View file

@ -44,7 +44,7 @@ const mockRunAsync = jest.fn().mockResolvedValue({});
jest.mock('./update_agent_tags_action_runner', () => ({
...jest.requireActual('./update_agent_tags_action_runner'),
UpdateAgentTagsActionRunner: jest.fn().mockImplementation(() => {
return { runActionAsyncWithRetry: mockRunAsync };
return { runActionAsyncWithRetry: mockRunAsync, runActionAsyncTask: mockRunAsync };
}),
}));

View file

@ -80,7 +80,7 @@ export async function updateAgentTags(
total: res.total,
},
{ pitId }
).runActionAsyncWithRetry();
).runActionAsyncTask();
}
return await updateTagsBatch(soClient, esClient, givenAgents, outgoingErrors, {

View file

@ -118,7 +118,7 @@ export async function sendUpgradeAgentsActions(
spaceId: currentSpaceId,
},
{ pitId: await openPointInTime(esClient) }
).runActionAsyncWithRetry();
).runActionAsyncTask();
}
}

View file

@ -108,7 +108,7 @@ export default function (providerContext: FtrProviderContext) {
await new Promise((resolve, reject) => {
let attempts = 0;
const intervalId = setInterval(async () => {
if (attempts > 2) {
if (attempts > 5) {
clearInterval(intervalId);
reject(new Error('action timed out'));
}

View file

@ -277,7 +277,7 @@ export default function (providerContext: FtrProviderContext) {
await new Promise((resolve, reject) => {
let attempts = 0;
const intervalId = setInterval(async () => {
if (attempts > 3) {
if (attempts > 10) {
clearInterval(intervalId);
reject(new Error('action timed out'));
}
@ -285,7 +285,6 @@ export default function (providerContext: FtrProviderContext) {
const {
body: { items: actionStatuses },
} = await supertest.get(`/api/fleet/agents/action_status`).set('kbn-xsrf', 'xxx');
const action = actionStatuses?.find((a: any) => a.actionId === actionId);
if (action && action.nbAgentsActioned === action.nbAgentsActionCreated) {
clearInterval(intervalId);

View file

@ -8,6 +8,42 @@
import expect from '@kbn/expect';
import { FtrProviderContext } from '../../../api_integration/ftr_provider_context';
function getBaseUrl(spaceId?: string) {
return spaceId ? `/s/${spaceId}` : '';
}
export async function pollResult(
supertestAgent: any,
actionId: string,
nbAgentsAck: number,
verifyActionResult: Function,
spaceId?: string
) {
await new Promise((resolve, reject) => {
let attempts = 0;
const intervalId = setInterval(async () => {
if (attempts > 4) {
clearInterval(intervalId);
reject(new Error('action timed out'));
}
++attempts;
const {
body: { items: actionStatuses },
} = await supertestAgent
.get(`${getBaseUrl(spaceId)}/api/fleet/agents/action_status`)
.set('kbn-xsrf', 'xxx');
const action = actionStatuses.find((a: any) => a.actionId === actionId);
if (action && action.nbAgentsAck === nbAgentsAck) {
clearInterval(intervalId);
await verifyActionResult();
resolve({});
}
}, 1000);
}).catch((e) => {
throw e;
});
}
export default function (providerContext: FtrProviderContext) {
const { getService } = providerContext;
const esArchiver = getService('esArchiver');
@ -66,34 +102,6 @@ export default function (providerContext: FtrProviderContext) {
expect(agent2data.body.item.tags).to.eql(['existingTag']);
});
async function pollResult(
actionId: string,
nbAgentsAck: number,
verifyActionResult: Function
) {
await new Promise((resolve, reject) => {
let attempts = 0;
const intervalId = setInterval(async () => {
if (attempts > 4) {
clearInterval(intervalId);
reject(new Error('action timed out'));
}
++attempts;
const {
body: { items: actionStatuses },
} = await supertest.get(`/api/fleet/agents/action_status`).set('kbn-xsrf', 'xxx');
const action = actionStatuses.find((a: any) => a.actionId === actionId);
if (action && action.nbAgentsAck === nbAgentsAck) {
clearInterval(intervalId);
await verifyActionResult();
resolve({});
}
}, 1000);
}).catch((e) => {
throw e;
});
}
it('should bulk update tags of multiple agents by kuery - add', async () => {
const { body: actionBody } = await supertest
.post(`/api/fleet/agents/bulk_update_agent_tags`)
@ -114,7 +122,7 @@ export default function (providerContext: FtrProviderContext) {
expect(body.total).to.eql(4);
};
await pollResult(actionId, 4, verifyActionResult);
await pollResult(supertest, actionId, 4, verifyActionResult);
});
it('should bulk update tags of multiple agents by kuery - remove', async () => {
@ -137,7 +145,7 @@ export default function (providerContext: FtrProviderContext) {
expect(body.total).to.eql(0);
};
await pollResult(actionId, 2, verifyActionResult);
await pollResult(supertest, actionId, 2, verifyActionResult);
});
it('should return 200 also if the kuery is valid', async () => {

View file

@ -1059,7 +1059,7 @@ export default function (providerContext: FtrProviderContext) {
await new Promise((resolve, reject) => {
let attempts = 0;
const intervalId = setInterval(async () => {
if (attempts > 4) {
if (attempts > 10) {
clearInterval(intervalId);
reject(new Error('action timed out'));
}

View file

@ -21,6 +21,7 @@ import {
createFleetAgent,
makeAgentsUpgradeable,
} from './helpers';
import { pollResult } from '../agents/update_agent_tags';
export default function (providerContext: FtrProviderContext) {
const { getService } = providerContext;
@ -299,7 +300,7 @@ export default function (providerContext: FtrProviderContext) {
await verifyNoAgentActions(TEST_SPACE_1);
// Add tag
await apiClient.bulkUpdateAgentTags(
let { actionId } = await apiClient.bulkUpdateAgentTags(
{
agents: '*',
tagsToAdd: ['space1'],
@ -307,43 +308,54 @@ export default function (providerContext: FtrProviderContext) {
TEST_SPACE_1
);
await verifyAgentsTags({
[defaultSpaceAgent1]: ['tag1'],
[defaultSpaceAgent2]: ['tag1'],
});
await verifyAgentsTags(
{
[testSpaceAgent1]: ['tag1', 'space1'],
[testSpaceAgent2]: ['tag1', 'space1'],
[testSpaceAgent3]: ['tag1', 'space1'],
},
TEST_SPACE_1
);
let verifyActionResult = async () => {
await verifyAgentsTags(
{
[testSpaceAgent1]: ['tag1', 'space1'],
[testSpaceAgent2]: ['tag1', 'space1'],
[testSpaceAgent3]: ['tag1', 'space1'],
},
TEST_SPACE_1
);
};
await pollResult(supertest, actionId, 3, verifyActionResult, TEST_SPACE_1);
await verifyNoAgentActions();
let actionStatus = await apiClient.getActionStatus(TEST_SPACE_1);
expect(actionStatus.items.length).to.eql(1);
await verifyAgentsTags({
[defaultSpaceAgent1]: ['tag1'],
[defaultSpaceAgent2]: ['tag1'],
});
// Remove tag
await apiClient.bulkUpdateAgentTags(
({ actionId } = await apiClient.bulkUpdateAgentTags(
{
agents: '',
tagsToRemove: ['space1'],
},
TEST_SPACE_1
);
));
verifyActionResult = async () => {
await verifyAgentsTags(
{
[testSpaceAgent1]: ['tag1'],
[testSpaceAgent2]: ['tag1'],
[testSpaceAgent3]: ['tag1'],
},
TEST_SPACE_1
);
};
await pollResult(supertest, actionId, 3, verifyActionResult, TEST_SPACE_1);
await verifyAgentsTags({
[defaultSpaceAgent1]: ['tag1'],
[defaultSpaceAgent2]: ['tag1'],
});
await verifyAgentsTags(
{
[testSpaceAgent1]: ['tag1'],
[testSpaceAgent2]: ['tag1'],
[testSpaceAgent3]: ['tag1'],
},
TEST_SPACE_1
);
await verifyNoAgentActions();
actionStatus = await apiClient.getActionStatus(TEST_SPACE_1);
expect(actionStatus.items.length).to.eql(2);