[Fleet] Add retry logic to automatic agent upgrades (#212744)

## Summary

Relates https://github.com/elastic/ingest-dev/issues/4720

This PR adds retry logic to the task that handles automatic agent
upgrades originally implemented in
https://github.com/elastic/kibana/pull/211019.

Complementary fleet-server change which sets the agent's
`upgrade_attempts` to `null` once the upgrade is complete.:
https://github.com/elastic/fleet-server/pull/4528

### Approach

- A new `upgrade_attempts` property is added to agents and stored in the
agent doc (ES mapping update in
https://github.com/elastic/elasticsearch/pull/123256).
- When a bulk upgrade action is sent from the automatic upgrade task, it
pushes the timestamp of the upgrade to the affected agents'
`upgrade_attempts`.
- The default retry delays are `['30m', '1h', '2h', '4h', '8h', '16h',
'24h']` and can be overridden with the new
`xpack.fleet.autoUpgrades.retryDelays` setting.
- On every run, the automatic upgrade task will first process retries
and then query more agents if necessary (cf.
https://github.com/elastic/ingest-dev/issues/4720#issuecomment-2671660795).
- Once an agent has completed and failed the max retries defined by the
retry delays array, it is no longer retried.

### Testing

The ES query for fetching agents with existing `upgrade_attempts` needs
the updated mappings, so it might be necessary to pull the latest `main`
in the `elasticsearch` repo and run `yarn es source` instead of `yarn es
snapshot` (requires an up-to-date Java environment, currently 23).

In order to test that `upgrade_attempts` is set to `null` when the
upgrade is complete, fleet-server should be run in dev using the change
in https://github.com/elastic/fleet-server/pull/4528.

### Checklist

- [x] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios
- [x] The PR description includes the appropriate Release Notes section,
and the correct `release_note:*` label is applied per the
[guidelines](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)

### Identify risks

Low probability risk of incorrectly triggering agent upgrades. This
feature is currently behind the `enableAutomaticAgentUpgrades` feature
flag.

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
Co-authored-by: Julia Bardi <90178898+juliaElastic@users.noreply.github.com>
Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
Jill Guyonnet 2025-03-06 21:31:24 +01:00 committed by GitHub
parent 1531849d6f
commit bdbc2ef43f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 352 additions and 96 deletions

View file

@ -17675,6 +17675,13 @@
"nullable": true,
"type": "array"
},
"upgrade_attempts": {
"items": {
"type": "string"
},
"nullable": true,
"type": "array"
},
"upgrade_details": {
"additionalProperties": false,
"nullable": true,
@ -19722,6 +19729,13 @@
"nullable": true,
"type": "array"
},
"upgrade_attempts": {
"items": {
"type": "string"
},
"nullable": true,
"type": "array"
},
"upgrade_details": {
"additionalProperties": false,
"nullable": true,
@ -20206,6 +20220,13 @@
"nullable": true,
"type": "array"
},
"upgrade_attempts": {
"items": {
"type": "string"
},
"nullable": true,
"type": "array"
},
"upgrade_details": {
"additionalProperties": false,
"nullable": true,

View file

@ -17675,6 +17675,13 @@
"nullable": true,
"type": "array"
},
"upgrade_attempts": {
"items": {
"type": "string"
},
"nullable": true,
"type": "array"
},
"upgrade_details": {
"additionalProperties": false,
"nullable": true,
@ -19722,6 +19729,13 @@
"nullable": true,
"type": "array"
},
"upgrade_attempts": {
"items": {
"type": "string"
},
"nullable": true,
"type": "array"
},
"upgrade_details": {
"additionalProperties": false,
"nullable": true,
@ -20206,6 +20220,13 @@
"nullable": true,
"type": "array"
},
"upgrade_attempts": {
"items": {
"type": "string"
},
"nullable": true,
"type": "array"
},
"upgrade_details": {
"additionalProperties": false,
"nullable": true,

View file

@ -18992,6 +18992,11 @@ paths:
type: string
nullable: true
type: array
upgrade_attempts:
items:
type: string
nullable: true
type: array
upgrade_details:
additionalProperties: false
nullable: true
@ -19447,6 +19452,11 @@ paths:
type: string
nullable: true
type: array
upgrade_attempts:
items:
type: string
nullable: true
type: array
upgrade_details:
additionalProperties: false
nullable: true
@ -19790,6 +19800,11 @@ paths:
type: string
nullable: true
type: array
upgrade_attempts:
items:
type: string
nullable: true
type: array
upgrade_details:
additionalProperties: false
nullable: true

View file

@ -21113,6 +21113,11 @@ paths:
type: string
nullable: true
type: array
upgrade_attempts:
items:
type: string
nullable: true
type: array
upgrade_details:
additionalProperties: false
nullable: true
@ -21565,6 +21570,11 @@ paths:
type: string
nullable: true
type: array
upgrade_attempts:
items:
type: string
nullable: true
type: array
upgrade_details:
additionalProperties: false
nullable: true
@ -21907,6 +21917,11 @@ paths:
type: string
nullable: true
type: array
upgrade_attempts:
items:
type: string
nullable: true
type: array
upgrade_details:
additionalProperties: false
nullable: true

View file

@ -58,3 +58,5 @@ export const FLEET_ENROLLMENT_API_PREFIX = 'fleet-enrollment-api-keys';
export const REQUEST_DIAGNOSTICS_TIMEOUT_MS = 3 * 60 * 60 * 1000; // 3 hours;
export * from './mappings';
export const AUTO_UPGRADE_DEFAULT_RETRIES = ['30m', '1h', '2h', '4h', '8h', '16h', '24h'];

View file

@ -363,6 +363,9 @@ export const AGENT_MAPPINGS = {
},
},
},
upgrade_attempts: {
type: 'date',
},
// added to allow validation on status field
status: {
type: 'keyword',

View file

@ -84,6 +84,9 @@ export interface FleetConfigType {
};
};
createArtifactsBulkBatchSize?: number;
autoUpgrades?: {
retryDelays?: string[];
};
}
// Calling Object.entries(PackagesGroupedByStatus) gave `status: string`

View file

@ -99,6 +99,7 @@ interface AgentBase {
upgraded_at?: string | null;
upgrade_started_at?: string | null;
upgrade_details?: AgentUpgradeDetails;
upgrade_attempts?: string[] | null;
access_api_key_id?: string;
default_api_key?: string;
default_api_key_id?: string;
@ -275,6 +276,10 @@ export interface FleetServerAgent {
* Upgrade state of the Elastic Agent
*/
upgrade_details?: AgentUpgradeDetails;
/**
* List of timestamps of attempts of Elastic Agent automatic upgrades
*/
upgrade_attempts?: string[] | null;
access_api_key_id?: string;
agent?: FleetServerAgentMetadata;
/**

View file

@ -174,7 +174,7 @@ describe('SearchBar', () => {
describe('getFieldSpecs', () => {
it('returns fieldSpecs for Fleet agents', () => {
expect(getFieldSpecs(AGENTS_INDEX, AGENTS_PREFIX)).toHaveLength(73);
expect(getFieldSpecs(AGENTS_INDEX, AGENTS_PREFIX)).toHaveLength(74);
});
it('returns fieldSpecs for Fleet enrollment tokens', () => {

View file

@ -282,6 +282,11 @@ export const config: PluginConfigDescriptor = {
min: 400,
})
),
autoUpgrades: schema.maybe(
schema.object({
retryDelays: schema.maybe(schema.arrayOf(schema.string())),
})
),
},
{
validate: (configToValidate) => {

View file

@ -69,6 +69,7 @@ export function searchHitToAgent(
upgraded_at: hit._source?.upgraded_at,
upgrade_started_at: hit._source?.upgrade_started_at,
upgrade_details: hit._source?.upgrade_details,
upgrade_attempts: hit._source?.upgrade_attempts,
access_api_key_id: hit._source?.access_api_key_id,
default_api_key_id: hit._source?.default_api_key_id,
policy_id: hit._source?.policy_id,

View file

@ -125,3 +125,22 @@ export async function sendUpgradeAgentsActions(
return await upgradeBatch(esClient, givenAgents, outgoingErrors, options, currentSpaceId);
}
export async function sendAutomaticUpgradeAgentsActions(
soClient: SavedObjectsClientContract,
esClient: ElasticsearchClient,
options: {
agents: Agent[];
version: string;
upgradeDurationSeconds?: number;
}
): Promise<{ actionId: string }> {
const currentSpaceId = getCurrentNamespace(soClient);
return await upgradeBatch(
esClient,
options.agents,
{},
{ ...options, isAutomatic: true },
currentSpaceId
);
}

View file

@ -9,11 +9,13 @@ import type { ElasticsearchClient } from '@kbn/core/server';
import { v4 as uuidv4 } from 'uuid';
import moment from 'moment';
import semverGte from 'semver/functions/gte';
import {
getRecentUpgradeInfoForAgent,
getNotUpgradeableMessage,
isAgentUpgradeableToVersion,
AGENT_UPGARDE_DETAILS_SUPPORTED_VERSION,
} from '../../../common/services';
import type { Agent } from '../../types';
@ -168,6 +170,10 @@ export async function upgradeBatch(
data: {
upgraded_at: null,
upgrade_started_at: now,
...(options.isAutomatic &&
semverGte(agent.agent?.version ?? '0.0.0', AGENT_UPGARDE_DETAILS_SUPPORTED_VERSION)
? { upgrade_attempts: [now, ...(agent.upgrade_attempts ?? [])] }
: {}),
},
})),
errors

View file

@ -17,7 +17,7 @@ import { agentPolicyService, appContextService } from '../services';
import {
fetchAllAgentsByKuery,
getAgentsByKuery,
sendUpgradeAgentsActions,
sendAutomaticUpgradeAgentsActions,
} from '../services/agents';
import { isAgentUpgradeable } from '../../common/services';
import type { Agent, AgentPolicy } from '../types';
@ -47,9 +47,10 @@ const mockedFetchAllAgentsByKuery = fetchAllAgentsByKuery as jest.MockedFunction
typeof fetchAllAgentsByKuery
>;
const mockedGetAgentsByKuery = getAgentsByKuery as jest.MockedFunction<typeof getAgentsByKuery>;
const mockedSendUpgradeAgentsActions = sendUpgradeAgentsActions as jest.MockedFunction<
typeof sendUpgradeAgentsActions
>;
const mockedSendAutomaticUpgradeAgentsActions =
sendAutomaticUpgradeAgentsActions as jest.MockedFunction<
typeof sendAutomaticUpgradeAgentsActions
>;
const mockedIsAgentUpgradeable = isAgentUpgradeable as jest.MockedFunction<
typeof isAgentUpgradeable
>;
@ -79,8 +80,8 @@ const mockDefaultAgentPolicy = () => {
const generateAgents = (
nAgents: number,
agentPolicyId: string,
version: string,
agentPolicyId: string = 'agent-policy-1',
version: string = '8.15.0',
status: string = 'online'
) => {
return [
@ -150,12 +151,13 @@ describe('AutomaticAgentUpgradeTask', () => {
jest
.spyOn(appContextService, 'getExperimentalFeatures')
.mockReturnValue({ enableAutomaticAgentUpgrades: true } as any);
mockDefaultAgentPolicy();
mockedIsAgentUpgradeable.mockReturnValue(true);
mockedSendUpgradeAgentsActions.mockResolvedValue({ actionId: 'action-1' });
mockedSendAutomaticUpgradeAgentsActions.mockResolvedValue({ actionId: 'action-1' });
});
afterEach(() => {
jest.clearAllMocks();
jest.resetAllMocks();
});
it('Should not run if task is outdated', async () => {
@ -176,116 +178,106 @@ describe('AutomaticAgentUpgradeTask', () => {
});
it('Should upgrade eligible agents', async () => {
mockDefaultAgentPolicy();
const agents = generateAgents(10, 'agent-policy-1', '8.15.0');
const agents = generateAgents(10);
mockedGetAgentsByKuery
.mockResolvedValueOnce({ total: agents.length } as any) // active agents
.mockResolvedValueOnce({ total: 0 } as any); // agents on or updating to target version
mockedFetchAllAgentsByKuery.mockResolvedValue(getMockFetchAllAgentsByKuery(agents));
mockedFetchAllAgentsByKuery
.mockResolvedValueOnce(getMockFetchAllAgentsByKuery([])) // agents marked for retry
.mockResolvedValueOnce(getMockFetchAllAgentsByKuery(agents)); // active agents
await runTask();
expect(mockedSendUpgradeAgentsActions).toHaveBeenCalledWith(
expect(mockedSendAutomaticUpgradeAgentsActions).toHaveBeenCalledWith(
expect.anything(),
expect.anything(),
{
agents: agents.slice(0, 3),
version: '8.18.0',
isAutomatic: true,
}
);
});
it('Should take agents already on target version into account', async () => {
mockDefaultAgentPolicy();
const agents = [
...generateAgents(10, 'agent-policy-1', '8.15.0'),
{
id: 'agent-11',
policy_id: 'agent-policy-1',
status: 'online',
agent: { version: '8.18.0' },
},
] as Agent[];
...generateAgents(10),
...generateAgents(1, 'agent-policy-1', '8.18.0', 'online'),
];
mockedGetAgentsByKuery
.mockResolvedValueOnce({ total: agents.length } as any) // active agents
.mockResolvedValueOnce({ total: 1 } as any); // agents on or updating to target version
mockedFetchAllAgentsByKuery.mockResolvedValue(getMockFetchAllAgentsByKuery(agents));
mockedFetchAllAgentsByKuery
.mockResolvedValueOnce(getMockFetchAllAgentsByKuery([])) // agents marked for retry
.mockResolvedValue(getMockFetchAllAgentsByKuery(agents)); // active agents
await runTask();
expect(mockedSendUpgradeAgentsActions).toHaveBeenCalledWith(
expect(mockedSendAutomaticUpgradeAgentsActions).toHaveBeenCalledWith(
expect.anything(),
expect.anything(),
{
agents: agents.slice(0, 2),
version: '8.18.0',
isAutomatic: true,
}
);
});
it('Should take agents already upgrading to target version into account', async () => {
mockDefaultAgentPolicy();
const agents = [
...generateAgents(10, 'agent-policy-1', '8.15.0'),
{
id: 'agent-11',
policy_id: 'agent-policy-1',
status: 'updating',
agent: { version: '8.15.0' },
upgrade_details: { target_version: '8.18.0' },
},
] as Agent[];
...generateAgents(10),
...generateAgents(1, 'agent-policy-1', '8.15.0', 'updating'),
];
mockedGetAgentsByKuery
.mockResolvedValueOnce({ total: agents.length } as any) // active agents
.mockResolvedValueOnce({ total: 1 } as any); // agents on or updating to target version
mockedFetchAllAgentsByKuery.mockResolvedValue(getMockFetchAllAgentsByKuery(agents));
mockedFetchAllAgentsByKuery
.mockResolvedValueOnce(getMockFetchAllAgentsByKuery([])) // agents marked for retry
.mockResolvedValue(getMockFetchAllAgentsByKuery(agents));
await runTask();
expect(mockedSendUpgradeAgentsActions).toHaveBeenCalledWith(
expect(mockedSendAutomaticUpgradeAgentsActions).toHaveBeenCalledWith(
expect.anything(),
expect.anything(),
{
agents: agents.slice(0, 2),
version: '8.18.0',
isAutomatic: true,
}
);
});
it('Should not attempt to upgrade already upgrading agents', async () => {
mockDefaultAgentPolicy();
const agents = generateAgents(10, 'agent-policy-1', '8.15.0', 'updating');
mockedGetAgentsByKuery
.mockResolvedValueOnce({ total: agents.length } as any) // active agents
.mockResolvedValueOnce({ total: agents.length } as any); // agents on or updating to target version
mockedFetchAllAgentsByKuery.mockResolvedValue(getMockFetchAllAgentsByKuery(agents)); // active agents
mockedFetchAllAgentsByKuery
.mockResolvedValueOnce(getMockFetchAllAgentsByKuery([])) // agents marked for retry
.mockResolvedValue(getMockFetchAllAgentsByKuery(agents)); // active agents
await runTask();
expect(mockedSendUpgradeAgentsActions).not.toHaveBeenCalled();
expect(mockedSendAutomaticUpgradeAgentsActions).not.toHaveBeenCalled();
});
it('Should set a rollout duration for upgrade batches bigger than 10 agents', async () => {
mockDefaultAgentPolicy();
const agents = generateAgents(100, 'agent-policy-1', '8.15.0');
const agents = generateAgents(100);
mockedGetAgentsByKuery
.mockResolvedValueOnce({ total: agents.length } as any) // active agents
.mockResolvedValueOnce({ total: 0 } as any); // agents on or updating to target version
mockedFetchAllAgentsByKuery.mockResolvedValue(getMockFetchAllAgentsByKuery(agents));
mockedFetchAllAgentsByKuery
.mockResolvedValueOnce(getMockFetchAllAgentsByKuery([])) // agents marked for retry
.mockResolvedValue(getMockFetchAllAgentsByKuery(agents)); // active agents
await runTask();
expect(mockedSendUpgradeAgentsActions).toHaveBeenCalledWith(
expect(mockedSendAutomaticUpgradeAgentsActions).toHaveBeenCalledWith(
expect.anything(),
expect.anything(),
{
agents: agents.slice(0, 30),
version: '8.18.0',
upgradeDurationSeconds: 600,
isAutomatic: true,
}
);
});
@ -310,21 +302,22 @@ describe('AutomaticAgentUpgradeTask', () => {
})()
);
mockedGetAgentsByKuery
.mockResolvedValueOnce({ total: 0 } as any) // active agents for first policy
.mockResolvedValueOnce({ total: 10 } as any) // active agents
.mockResolvedValueOnce({ total: 0 } as any); // agents on or updating to target version
.mockResolvedValueOnce({ total: 0 } as any) // active agents for first policy batch
.mockResolvedValueOnce({ total: 10 } as any) // active agents for second policy batch
.mockResolvedValueOnce({ total: 0 } as any); // agents on or updating to target version (second policy batch)
const agents = generateAgents(10, 'agent-policy-501', '8.15.0');
mockedFetchAllAgentsByKuery.mockResolvedValue(getMockFetchAllAgentsByKuery(agents));
mockedFetchAllAgentsByKuery
.mockResolvedValueOnce(getMockFetchAllAgentsByKuery([])) // agents marked for retry
.mockResolvedValue(getMockFetchAllAgentsByKuery(agents)); // active agents
await runTask();
expect(mockedSendUpgradeAgentsActions).toHaveBeenCalledWith(
expect(mockedSendAutomaticUpgradeAgentsActions).toHaveBeenCalledWith(
expect.anything(),
expect.anything(),
{
agents: agents.slice(0, 3),
version: '8.18.0',
isAutomatic: true,
}
);
});
@ -338,38 +331,111 @@ describe('AutomaticAgentUpgradeTask', () => {
] as AgentPolicy[];
mockAgentPolicyService.fetchAllAgentPolicies =
getMockAgentPolicyFetchAllAgentPolicies(agentPolicies);
const agents = generateAgents(20, 'agent-policy-1', '8.15.0');
const agents = generateAgents(20);
const firstAgentsBatch = agents.slice(0, 10);
const secondAgentsBatch = agents.slice(10);
mockedGetAgentsByKuery
.mockResolvedValueOnce({ total: agents.length } as any) // active agents
.mockResolvedValueOnce({ total: 0 } as any); // agents on or updating to target version
mockedFetchAllAgentsByKuery.mockResolvedValue(
jest.fn(async function* () {
yield firstAgentsBatch;
yield secondAgentsBatch;
})()
);
mockedFetchAllAgentsByKuery
.mockResolvedValueOnce(getMockFetchAllAgentsByKuery([])) // agents marked for retry
.mockResolvedValueOnce(
jest.fn(async function* () {
yield firstAgentsBatch;
yield secondAgentsBatch;
})()
);
await runTask();
expect(mockedSendUpgradeAgentsActions).toHaveBeenCalledWith(
expect(mockedSendAutomaticUpgradeAgentsActions).toHaveBeenCalledWith(
expect.anything(),
expect.anything(),
{
agents: firstAgentsBatch,
version: '8.18.0',
upgradeDurationSeconds: 600,
isAutomatic: true,
}
);
expect(mockedSendUpgradeAgentsActions).toHaveBeenCalledWith(
expect(mockedSendAutomaticUpgradeAgentsActions).toHaveBeenCalledWith(
expect.anything(),
expect.anything(),
{
agents: secondAgentsBatch.slice(0, 4),
version: '8.18.0',
isAutomatic: true,
}
);
});
it('Should pick up agents in failed upgrade state for retry if they are ready', async () => {
jest
.spyOn(appContextService, 'getConfig')
.mockReturnValue({ autoUpgrades: { retryDelays: ['10m', '20m'] } } as any);
const agentPolicies = [
{
id: 'agent-policy-1',
required_versions: [{ version: '8.18.0', percentage: 100 }],
},
] as AgentPolicy[];
mockAgentPolicyService.fetchAllAgentPolicies =
getMockAgentPolicyFetchAllAgentPolicies(agentPolicies);
const getDate = (minutesAgo: number) => {
return new Date(Date.now() - minutesAgo * 60000).toISOString();
};
const agents = [
{
id: 'agent-1',
policy_id: 'agent-policy-1',
status: 'online',
agent: { version: '8.15.0' },
upgrade_details: {
target_version: '8.18.0',
state: 'UPG_FAILED',
},
upgrade_attempts: [getDate(20)], // should be picked up
},
{
id: 'agent-2',
policy_id: 'agent-policy-1',
status: 'online',
agent: { version: '8.15.0' },
upgrade_details: {
target_version: '8.18.0',
state: 'UPG_FAILED',
},
upgrade_attempts: [getDate(5)], // should NOT be picked up (not ready yet)
},
{
id: 'agent-3',
policy_id: 'agent-policy-1',
status: 'online',
agent: { version: '8.15.0' },
upgrade_details: {
target_version: '8.18.0',
state: 'UPG_FAILED',
},
upgrade_attempts: [getDate(20), getDate(10), getDate(5)], // should NOT be picked up (exceeded max attempts)
},
] as unknown as Agent[];
mockedGetAgentsByKuery
.mockResolvedValueOnce({ total: agents.length } as any) // active agents
.mockResolvedValueOnce({ total: 0 } as any); // agents on or updating to target version
mockedFetchAllAgentsByKuery
.mockResolvedValueOnce(getMockFetchAllAgentsByKuery(agents)) // agents marked for retry
.mockResolvedValue(getMockFetchAllAgentsByKuery([]));
await runTask();
expect(mockedSendAutomaticUpgradeAgentsActions).toHaveBeenCalledWith(
expect.anything(),
expect.anything(),
{
agents: agents.slice(0, 1),
version: '8.18.0',
}
);
});

View file

@ -21,7 +21,9 @@ import { getDeleteTaskRunResult } from '@kbn/task-manager-plugin/server/task';
import type { LoggerFactory } from '@kbn/core/server';
import { errors } from '@elastic/elasticsearch';
import semverGt from 'semver/functions/gt';
import moment from 'moment';
import { AUTO_UPGRADE_DEFAULT_RETRIES } from '../../common/constants';
import type {
Agent,
AgentPolicy,
@ -33,7 +35,7 @@ import { agentPolicyService, appContextService } from '../services';
import {
fetchAllAgentsByKuery,
getAgentsByKuery,
sendUpgradeAgentsActions,
sendAutomaticUpgradeAgentsActions,
} from '../services/agents';
import { AGENT_POLICY_SAVED_OBJECT_TYPE } from '../constants';
import { AgentStatusKueryHelper, isAgentUpgradeable } from '../../common/services';
@ -42,7 +44,7 @@ export const TYPE = 'fleet:automatic-agent-upgrade-task';
export const VERSION = '1.0.0';
const TITLE = 'Fleet Automatic agent upgrades';
const SCOPE = ['fleet'];
const INTERVAL = '1h';
const INTERVAL = '30m';
const TIMEOUT = '10m';
const AGENT_POLICIES_BATCHSIZE = 500;
const AGENTS_BATCHSIZE = 10000;
@ -64,6 +66,7 @@ export class AutomaticAgentUpgradeTask {
private logger: Logger;
private wasStarted: boolean = false;
private abortController = new AbortController();
private retryDelays: string[] = [];
constructor(setupContract: AutomaticAgentUpgradeTaskSetupContract) {
const { core, taskManager, logFactory } = setupContract;
@ -141,6 +144,8 @@ export class AutomaticAgentUpgradeTask {
const [coreStart] = await core.getStartServices();
const esClient = coreStart.elasticsearch.client.asInternalUser;
const soClient = new SavedObjectsClient(coreStart.savedObjects.createInternalRepository());
this.retryDelays =
appContextService.getConfig()?.autoUpgrades?.retryDelays ?? AUTO_UPGRADE_DEFAULT_RETRIES;
try {
await this.checkAgentPoliciesForAutomaticUpgrades(esClient, soClient);
@ -160,6 +165,12 @@ export class AutomaticAgentUpgradeTask {
this.logger.info(`[AutomaticAgentUpgradeTask] runTask() ended${msg ? ': ' + msg : ''}`);
}
private throwIfAborted() {
if (this.abortController.signal.aborted) {
throw new Error('Task was aborted');
}
}
private async checkAgentPoliciesForAutomaticUpgrades(
esClient: ElasticsearchClient,
soClient: SavedObjectsClientContract
@ -179,10 +190,7 @@ export class AutomaticAgentUpgradeTask {
return;
}
for (const agentPolicy of agentPolicyPageResults) {
if (this.abortController.signal.aborted) {
throw new Error('Task was aborted');
}
this.throwIfAborted();
await this.checkAgentPolicyForAutomaticUpgrades(esClient, soClient, agentPolicy);
}
}
@ -204,7 +212,7 @@ export class AutomaticAgentUpgradeTask {
const totalActiveAgents = await this.getAgentCount(
esClient,
soClient,
this.getActiveAgentsKuery(agentPolicy)
`policy_id:${agentPolicy.id} AND ${AgentStatusKueryHelper.buildKueryForActiveAgents()}`
);
if (totalActiveAgents === 0) {
this.logger.debug(
@ -237,15 +245,6 @@ export class AutomaticAgentUpgradeTask {
return res.total;
}
private getActiveAgentsKuery(agentPolicy: AgentPolicy) {
return `policy_id:${agentPolicy.id} AND ${AgentStatusKueryHelper.buildKueryForActiveAgents()}`;
}
private getOnOrUpdatingToVersionKuery(agentPolicy: AgentPolicy, version: string) {
const updatingToKuery = `(upgrade_details.target_version:${version} AND NOT upgrade_details.state:UPG_FAILED)`;
return `policy_id:${agentPolicy.id} AND (agent.version:${version} OR ${updatingToKuery})`;
}
private async processRequiredVersion(
esClient: ElasticsearchClient,
soClient: SavedObjectsClientContract,
@ -262,10 +261,11 @@ export class AutomaticAgentUpgradeTask {
(totalActiveAgents * requiredVersion.percentage) / 100
);
// Subtract total number of agents already or on or updating to target version.
const updatingToKuery = `(upgrade_details.target_version:${requiredVersion.version} AND NOT upgrade_details.state:UPG_FAILED)`;
const totalOnOrUpdatingToTargetVersionAgents = await this.getAgentCount(
esClient,
soClient,
this.getOnOrUpdatingToVersionKuery(agentPolicy, requiredVersion.version)
`policy_id:${agentPolicy.id} AND (agent.version:${requiredVersion.version} OR ${updatingToKuery})`
);
numberOfAgentsForUpgrade -= totalOnOrUpdatingToTargetVersionAgents;
// Return if target is already met.
@ -276,21 +276,44 @@ export class AutomaticAgentUpgradeTask {
return;
}
// Fetch all active agents assigned to the policy in batches.
// Handle retries.
const numberOfRetriedAgents = await this.processRetries(
esClient,
soClient,
agentPolicy,
requiredVersion.version
);
numberOfAgentsForUpgrade -= numberOfRetriedAgents;
if (numberOfAgentsForUpgrade <= 0) {
return;
}
// Fetch candidate agents assigned to the policy in batches.
// NB: ideally, we would query active agents on or below the target version. Unfortunately, this is not possible because agent.version
// is stored as text, so semver comparison cannot be done in the ES query (cf. https://github.com/elastic/kibana/issues/168604).
// As an imperfect alternative, sort agents by version. Since versions sort alphabetically, this will not always result in ascending semver sorting.
const activeAgentsFetcher = await fetchAllAgentsByKuery(esClient, soClient, {
kuery: this.getActiveAgentsKuery(agentPolicy),
const statusKuery =
'(status:online OR status:offline OR status:enrolling OR status:degraded OR status:error OR status:orphaned)'; // active status except updating
const oldStuckInUpdatingKuery = `(NOT upgrade_details:* AND status:updating AND NOT upgraded_at:* AND upgrade_started_at < now-2h)`; // agents pre 8.12.0 (without upgrade_details)
const newStuckInUpdatingKuery = `(upgrade_details.target_version:${requiredVersion.version} AND upgrade_details.state:UPG_FAILED)`;
const agentsFetcher = await fetchAllAgentsByKuery(esClient, soClient, {
kuery: `policy_id:${agentPolicy.id} AND (NOT upgrade_attempts:*) AND (${statusKuery} OR ${oldStuckInUpdatingKuery} OR ${newStuckInUpdatingKuery})`,
perPage: AGENTS_BATCHSIZE,
sortField: 'agent.version',
sortOrder: 'asc',
});
let { done, agents } = await this.getNextAgentsBatch(activeAgentsFetcher);
let { done, agents } = await this.getNextAgentsBatch(agentsFetcher);
if (agents.length === 0) {
this.logger.debug(
`[AutomaticAgentUpgradeTask] Agent policy ${agentPolicy.id}: no candidate agents found for upgrade (target version: ${requiredVersion.version}, percentage: ${requiredVersion.percentage})`
);
return;
}
let shouldProcessAgents = true;
while (shouldProcessAgents) {
this.throwIfAborted();
numberOfAgentsForUpgrade = await this.findAndUpgradeCandidateAgents(
esClient,
soClient,
@ -300,7 +323,7 @@ export class AutomaticAgentUpgradeTask {
agents
);
if (!done && numberOfAgentsForUpgrade > 0) {
({ done, agents } = await this.getNextAgentsBatch(activeAgentsFetcher));
({ done, agents } = await this.getNextAgentsBatch(agentsFetcher));
} else {
shouldProcessAgents = false;
}
@ -313,10 +336,65 @@ export class AutomaticAgentUpgradeTask {
}
}
private async processRetries(
esClient: ElasticsearchClient,
soClient: SavedObjectsClientContract,
agentPolicy: AgentPolicy,
version: string
) {
let retriedAgentsCounter = 0;
const retryingAgentsFetcher = await fetchAllAgentsByKuery(esClient, soClient, {
kuery: `policy_id:${agentPolicy.id} AND upgrade_details.target_version:${version} AND upgrade_details.state:UPG_FAILED AND upgrade_attempts:*`,
perPage: AGENTS_BATCHSIZE,
sortField: 'agent.version',
sortOrder: 'asc',
});
for await (const retryingAgentsPageResults of retryingAgentsFetcher) {
this.throwIfAborted();
// This function will return the total number of agents marked for retry so they're included in the count of agents for upgrade.
retriedAgentsCounter += retryingAgentsPageResults.length;
const agentsReadyForRetry = retryingAgentsPageResults.filter((agent) =>
this.isAgentReadyForRetry(agent, agentPolicy)
);
if (agentsReadyForRetry.length > 0) {
this.logger.info(
`[AutomaticAgentUpgradeTask] Agent policy ${agentPolicy.id}: retrying upgrade to ${version} for ${agentsReadyForRetry.length} agents`
);
await sendAutomaticUpgradeAgentsActions(soClient, esClient, {
agents: agentsReadyForRetry,
version,
...this.getUpgradeDurationSeconds(agentsReadyForRetry.length),
});
}
}
return retriedAgentsCounter;
}
private isAgentReadyForRetry(agent: Agent, agentPolicy: AgentPolicy) {
if (!agent.upgrade_attempts) {
return false;
}
if (agent.upgrade_attempts.length > this.retryDelays.length) {
this.logger.debug(
`[AutomaticAgentUpgradeTask] Agent policy ${agentPolicy.id}: max retry attempts exceeded for agent ${agent.id}`
);
return false;
}
const currentRetryDelay = moment
.duration('PT' + this.retryDelays[agent.upgrade_attempts.length - 1].toUpperCase()) // https://momentjs.com/docs/#/durations/
.asMilliseconds();
const lastUpgradeAttempt = Date.parse(agent.upgrade_attempts[0]);
return Date.now() - lastUpgradeAttempt >= currentRetryDelay;
}
private async getNextAgentsBatch(agentsFetcher: AsyncIterable<Agent[]>) {
const agentsFetcherIter = agentsFetcher[Symbol.asyncIterator]();
const agentsBatch = await agentsFetcherIter.next();
const agents: Agent[] = agentsBatch.value;
const agents: Agent[] = agentsBatch.value ?? [];
return {
done: agentsBatch.done,
agents: agents.filter((agent): agent is AgentWithDefinedVersion => agent.agent !== undefined),
@ -347,11 +425,10 @@ export class AutomaticAgentUpgradeTask {
this.logger.info(
`[AutomaticAgentUpgradeTask] Agent policy ${agentPolicy.id}: sending bulk upgrade to ${version} for ${agentsForUpgrade.length} agents`
);
await sendUpgradeAgentsActions(soClient, esClient, {
await sendAutomaticUpgradeAgentsActions(soClient, esClient, {
agents: agentsForUpgrade,
version,
...this.getUpgradeDurationSeconds(agentsForUpgrade.length),
isAutomatic: true,
});
}
@ -359,13 +436,7 @@ export class AutomaticAgentUpgradeTask {
}
private isAgentEligibleForUpgrade(agent: AgentWithDefinedVersion, version: string) {
return (
isAgentUpgradeable(agent) &&
(agent.status !== 'updating' ||
(AgentStatusKueryHelper.isStuckInUpdating(agent) &&
agent.upgrade_details?.target_version === version)) &&
semverGt(version, agent.agent.version)
);
return isAgentUpgradeable(agent) && semverGt(version, agent.agent.version);
}
private getUpgradeDurationSeconds(nAgents: number) {

View file

@ -161,6 +161,9 @@ export const AgentResponseSchema = schema.object({
}),
])
),
upgrade_attempts: schema.maybe(
schema.oneOf([schema.literal(null), schema.arrayOf(schema.string())])
),
access_api_key_id: schema.maybe(schema.string()),
default_api_key: schema.maybe(schema.string()),
default_api_key_id: schema.maybe(schema.string()),