[Fleet] fix issue that not all agents went back to Healthy after Cancel Upgrade (#150047)

## Summary

Related to https://github.com/elastic/kibana/issues/148709

Using info log as can't set log level in QA cluster that the perf CI job
uses.

Fixed a bug where the cancel didn't effect an ongoing upgrade action,
and this resulted in agents being left in `Updating` state.

Steps to reproduce:
- enroll 100k agents with horde 
- start upgrade of 100k agents
- click `Cancel` shortly after (see in the logs that upgrade batches are
still ongoing e.g. 40k done)
- the 40k agents go back to `Healthy` state, the remaining 60k continue
with the upgrade even though the action is cancelled.
- expected: the upgrade action shouldn't continue if the action was
cancelled, so all 100k agents should go back to `Healthy`.
- When choosing a rollout period (e.g. 1h, 2h), a small number of agents
will be upgraded quickly (e.g. 100), the rest should stop upgrading.

Tested locally:
- upgrade 461 healthy agents to 8.2.3 with 1h rollout period and 50
batch size
```
POST kbn:/api/fleet/agents/bulk_upgrade
{"version":"8.2.3","agents":"agent.version: \"8.2.2\"","rollout_duration_seconds":3600,"start_time":"2023-02-02T09:03:56.330Z", "batchSize": 50}
```
- cancel immediately (copy actionId from previous response)
```
POST kbn:/api/fleet/agents/actions/7bd779c2-4688-441d-a701-c2e6fb54bbe0/cancel
```
- check the logs that upgrade batches after cancel are skipped, and
agents set to updating are reverted back to healthy
- a few agents might take a little longer to go back to `Healthy`, these
are the ones already started the upgrade on the agents.

```
[2023-02-02T11:23:27.472+01:00][INFO ][plugins.fleet] Running action asynchronously, actionId: 7bd779c2-4688-441d-a701-c2e6fb54bbe0, total agents:461
[2023-02-02T11:23:27.601+01:00][INFO ][plugins.fleet] Continuing batch of actionId:7bd779c2-4688-441d-a701-c2e6fb54bbe0 of 50 agents of upgrade
[2023-02-02T11:23:28.564+01:00][INFO ][plugins.fleet] Continuing batch of actionId:7bd779c2-4688-441d-a701-c2e6fb54bbe0 of 50 agents of upgrade
[2023-02-02T11:23:29.648+01:00][INFO ][plugins.fleet] Continuing batch of actionId:7bd779c2-4688-441d-a701-c2e6fb54bbe0 of 50 agents of upgrade
[2023-02-02T11:23:30.766+01:00][INFO ][plugins.fleet] Continuing batch of actionId:7bd779c2-4688-441d-a701-c2e6fb54bbe0 of 50 agents of upgrade
[2023-02-02T11:23:32.791+01:00][INFO ][plugins.fleet] Continuing batch of actionId:7bd779c2-4688-441d-a701-c2e6fb54bbe0 of 50 agents of upgrade
[2023-02-02T11:23:33.809+01:00][INFO ][plugins.fleet] Continuing batch of actionId:7bd779c2-4688-441d-a701-c2e6fb54bbe0 of 50 agents of upgrade
[2023-02-02T11:23:34.868+01:00][INFO ][plugins.fleet] Continuing batch of actionId:7bd779c2-4688-441d-a701-c2e6fb54bbe0 of 50 agents of upgrade

[2023-02-02T11:23:37.073+01:00][INFO ][plugins.fleet] Skipping batch of actionId:7bd779c2-4688-441d-a701-c2e6fb54bbe0 of 50 agents as the upgrade was cancelled
[2023-02-02T11:23:37.197+01:00][INFO ][plugins.fleet] Skipping batch of actionId:7bd779c2-4688-441d-a701-c2e6fb54bbe0 of 50 agents as the upgrade was cancelled
[2023-02-02T11:23:37.349+01:00][INFO ][plugins.fleet] Skipping batch of actionId:7bd779c2-4688-441d-a701-c2e6fb54bbe0 of 11 agents as the upgrade was cancelled

[2023-02-02T11:23:37.434+01:00][INFO ][plugins.fleet] processed 461 agents, took 9866ms

[2023-02-02T11:23:40.977+01:00][INFO ][plugins.fleet] Moving back 50 agents from updating to healthy state after cancel upgrade
[2023-02-02T11:23:41.239+01:00][INFO ][plugins.fleet] Moving back 50 agents from updating to healthy state after cancel upgrade
[2023-02-02T11:23:41.411+01:00][INFO ][plugins.fleet] Moving back 50 agents from updating to healthy state after cancel upgrade
[2023-02-02T11:23:42.324+01:00][INFO ][plugins.fleet] Moving back 50 agents from updating to healthy state after cancel upgrade
[2023-02-02T11:23:42.615+01:00][INFO ][plugins.fleet] Moving back 50 agents from updating to healthy state after cancel upgrade
[2023-02-02T11:23:42.791+01:00][INFO ][plugins.fleet] Moving back 50 agents from updating to healthy state after cancel upgrade
```

<img width="1265" alt="image"
src="https://user-images.githubusercontent.com/90178898/216298731-6689f12b-ac0a-41e1-9495-1f40ded1ad53.png">
<img width="1244" alt="image"
src="https://user-images.githubusercontent.com/90178898/216299584-877b0d79-8b82-4e48-8bee-937f3971a58c.png">

In action status we can see that out of 461, 350 agents received the
action, 111 were not started.
What I don't understand is why are those 350 agents still upgraded,
while if I test with a big batch size, only 1 agent out of 350 is
upgraded.

EDIT: I think this happened because I used an old `start_time` in my API
call, so start_time + rollout seconds have passed, and this resulted in
all agents immediately upgrading. When I did another test with a current
`start_time`, the agents went back to previous version as expected.

```
GET kbn:/api/fleet/agents/action_status
    {
      "actionId": "7bd779c2-4688-441d-a701-c2e6fb54bbe0",
      "nbAgentsActionCreated": 350,
      "nbAgentsAck": 350,
      "version": "8.2.3",
      "startTime": "2023-02-02T09:03:56.330Z",
      "type": "UPGRADE",
      "nbAgentsActioned": 461,
      "status": "CANCELLED",
      "expiration": "2023-02-02T11:03:56.330Z",
      "creationTime": "2023-02-02T10:23:35.708Z",
      "nbAgentsFailed": 0,
      "cancellationTime": "2023-02-02T10:23:39.960Z",
      "completionTime": "2023-02-02T10:23:47.000Z"
    },
```

<img width="1245" alt="image"
src="https://user-images.githubusercontent.com/90178898/216300585-602088f2-4ec7-47e2-91f2-f3840ec5fafb.png">

---------

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Julia Bardi 2023-02-06 15:57:37 +01:00 committed by GitHub
parent ee1985978f
commit 8968b74762
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 151 additions and 48 deletions

View file

@ -23,7 +23,7 @@ export async function getActionStatuses(
options: ListWithKuery
): Promise<ActionStatus[]> {
const actions = await _getActions(esClient, options);
const cancelledActions = await _getCancelledActions(esClient);
const cancelledActions = await getCancelledActions(esClient);
let acks: any;
try {
@ -135,7 +135,7 @@ export async function getActionStatuses(
return results;
}
async function _getCancelledActions(
export async function getCancelledActions(
esClient: ElasticsearchClient
): Promise<Array<{ actionId: string; timestamp?: string }>> {
const res = await esClient.search<FleetServerAgentAction>({
@ -144,7 +144,7 @@ async function _getCancelledActions(
size: SO_SEARCH_LIMIT,
query: {
bool: {
must: [
filter: [
{
term: {
type: 'CANCEL',

View file

@ -7,6 +7,9 @@
import { elasticsearchServiceMock } from '@kbn/core/server/mocks';
import { createAppContextStartContractMock } from '../../mocks';
import { appContextService } from '../app_context';
import { cancelAgentAction } from './actions';
import { bulkUpdateAgents } from './crud';
@ -15,6 +18,13 @@ jest.mock('./crud');
const mockedBulkUpdateAgents = bulkUpdateAgents as jest.Mock;
describe('Agent actions', () => {
beforeEach(async () => {
appContextService.start(createAppContextStartContractMock());
});
afterEach(() => {
appContextService.stop();
});
describe('cancelAgentAction', () => {
it('throw if the target action is not found', async () => {
const esClient = elasticsearchServiceMock.createInternalClient();
@ -28,7 +38,7 @@ describe('Agent actions', () => {
);
});
it('should create one CANCEL action for each action found', async () => {
it('should create one CANCEL action for each UPGRADE action found', async () => {
const esClient = elasticsearchServiceMock.createInternalClient();
esClient.search.mockResolvedValue({
hits: {
@ -38,6 +48,7 @@ describe('Agent actions', () => {
action_id: 'action1',
agents: ['agent1', 'agent2'],
expiration: '2022-05-12T18:16:18.019Z',
type: 'UPGRADE',
},
},
{
@ -45,6 +56,7 @@ describe('Agent actions', () => {
action_id: 'action1',
agents: ['agent3', 'agent4'],
expiration: '2022-05-12T18:16:18.019Z',
type: 'UPGRADE',
},
},
],

View file

@ -234,61 +234,110 @@ export async function getUnenrollAgentActions(
}
export async function cancelAgentAction(esClient: ElasticsearchClient, actionId: string) {
const res = await esClient.search<FleetServerAgentAction>({
index: AGENT_ACTIONS_INDEX,
query: {
bool: {
must: [
{
term: {
action_id: actionId,
const getUpgradeActions = async () => {
const res = await esClient.search<FleetServerAgentAction>({
index: AGENT_ACTIONS_INDEX,
query: {
bool: {
filter: [
{
term: {
action_id: actionId,
},
},
},
],
],
},
},
},
size: SO_SEARCH_LIMIT,
});
size: SO_SEARCH_LIMIT,
});
if (res.hits.hits.length === 0) {
throw new AgentActionNotFoundError('Action not found');
}
if (res.hits.hits.length === 0) {
throw new AgentActionNotFoundError('Action not found');
}
const upgradeActions: FleetServerAgentAction[] = res.hits.hits
.map((hit) => hit._source as FleetServerAgentAction)
.filter(
(action: FleetServerAgentAction | undefined): boolean =>
!!action && !!action.agents && !!action.action_id && action.type === 'UPGRADE'
);
return upgradeActions;
};
const cancelActionId = uuidv4();
const now = new Date().toISOString();
for (const hit of res.hits.hits) {
if (!hit._source || !hit._source.agents || !hit._source.action_id) {
continue;
}
if (hit._source.type === 'UPGRADE') {
const errors = {};
await bulkUpdateAgents(
esClient,
hit._source.agents.map((agentId) => ({
agentId,
data: {
upgraded_at: null,
upgrade_started_at: null,
},
})),
errors
);
if (Object.keys(errors).length > 0) {
appContextService
.getLogger()
.debug(`Errors while bulk updating agents for cancel action: ${JSON.stringify(errors)}`);
}
}
const cancelledActions: Array<{ agents: string[] }> = [];
const createAction = async (action: FleetServerAgentAction) => {
await createAgentAction(esClient, {
id: cancelActionId,
type: 'CANCEL',
agents: hit._source.agents,
agents: action.agents!,
data: {
target_id: hit._source.action_id,
target_id: action.action_id,
},
created_at: now,
expiration: hit._source.expiration,
expiration: action.expiration,
});
cancelledActions.push({
agents: action.agents!,
});
};
let upgradeActions = await getUpgradeActions();
for (const action of upgradeActions) {
await createAction(action);
}
const updateAgentsToHealthy = async (action: FleetServerAgentAction) => {
appContextService
.getLogger()
.info(
`Moving back ${
action.agents!.length
} agents from updating to healthy state after cancel upgrade`
);
const errors = {};
await bulkUpdateAgents(
esClient,
action.agents!.map((agentId: string) => ({
agentId,
data: {
upgraded_at: null,
upgrade_started_at: null,
},
})),
errors
);
if (Object.keys(errors).length > 0) {
appContextService
.getLogger()
.info(`Errors while bulk updating agents for cancel action: ${JSON.stringify(errors)}`);
}
};
for (const action of upgradeActions) {
await updateAgentsToHealthy(action);
}
// At the end of cancel, doing one more query on upgrade action to find those docs that were possibly created by a concurrent upgrade action.
// This is to make sure we cancel all upgrade batches.
upgradeActions = await getUpgradeActions();
if (cancelledActions.length < upgradeActions.length) {
const missingBatches = upgradeActions.filter(
(upgradeAction) =>
!cancelledActions.some(
(cancelled) => upgradeAction.agents && cancelled.agents[0] === upgradeAction.agents[0]
)
);
appContextService.getLogger().debug(`missing batches to cancel: ${missingBatches.length}`);
if (missingBatches.length > 0) {
for (const missingBatch of missingBatches) {
await createAction(missingBatch);
await updateAgentsToHealthy(missingBatch);
}
}
}
return {

View file

@ -8,12 +8,22 @@
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { appContextService } from '../app_context';
import type { Agent } from '../../types';
import { createAppContextStartContractMock } from '../../mocks';
import { sendUpgradeAgentsActions } from './upgrade';
import { createClientMock } from './action.mock';
import { getRollingUpgradeOptions } from './upgrade_action_runner';
import { getRollingUpgradeOptions, upgradeBatch } from './upgrade_action_runner';
jest.mock('./action_status', () => {
return {
getCancelledActions: jest.fn().mockResolvedValue([
{
actionId: 'cancelled-action',
},
]),
};
});
describe('sendUpgradeAgentsActions (plural)', () => {
beforeEach(async () => {
@ -102,6 +112,14 @@ describe('sendUpgradeAgentsActions (plural)', () => {
expect(doc.upgraded_at).toEqual(null);
}
});
it('skip upgrade if action id is cancelled', async () => {
const { soClient, esClient, agentInRegularDoc } = createClientMock();
const agents = [{ id: agentInRegularDoc._id } as Agent];
await upgradeBatch(soClient, esClient, agents, {}, {
actionId: 'cancelled-action',
} as any);
});
});
describe('getRollingUpgradeOptions', () => {

View file

@ -25,6 +25,7 @@ import { bulkUpdateAgents } from './crud';
import { createErrorActionResults, createAgentAction } from './actions';
import { getHostedPolicies, isHostedAgent } from './hosted_agent';
import { BulkActionTaskType } from './bulk_action_types';
import { getCancelledActions } from './action_status';
export class UpgradeActionRunner extends ActionRunner {
protected async processAgents(agents: Agent[]): Promise<{ actionId: string }> {
@ -40,6 +41,11 @@ export class UpgradeActionRunner extends ActionRunner {
}
}
const isActionIdCancelled = async (esClient: ElasticsearchClient, actionId: string) => {
const cancelledActions = await getCancelledActions(esClient);
return cancelledActions.filter((action) => action.actionId === actionId).length > 0;
};
export async function upgradeBatch(
soClient: SavedObjectsClientContract,
esClient: ElasticsearchClient,
@ -108,6 +114,24 @@ export async function upgradeBatch(
options.upgradeDurationSeconds
);
if (options.actionId && (await isActionIdCancelled(esClient, options.actionId))) {
appContextService
.getLogger()
.info(
`Skipping batch of actionId:${options.actionId} of ${givenAgents.length} agents as the upgrade was cancelled`
);
return {
actionId: options.actionId,
};
}
if (options.actionId) {
appContextService
.getLogger()
.info(
`Continuing batch of actionId:${options.actionId} of ${givenAgents.length} agents of upgrade`
);
}
await bulkUpdateAgents(
esClient,
agentsToUpdate.map((agent) => ({