[Fleet] Bulk action refactor and small fixes (#142299)

* refactor to report errors in agent update for all actions

* fixed tests

* fixed types

* refactor to reduce duplication

* fixed test

* fix for cypress test

* passing error reason

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Julia Bardi 2022-10-03 10:17:38 +02:00 committed by GitHub
parent 8a298e4c8d
commit f8a90275d5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 145 additions and 146 deletions

View file

@ -35,6 +35,7 @@ const plugin: Cypress.PluginConfig = (on, config) => {
query,
ignore_unavailable: ignoreUnavailable,
refresh: true,
conflicts: 'proceed',
});
},
});

View file

@ -123,12 +123,22 @@ export const TagsAddRemove: React.FC<Props> = ({
// sending updated tags to add/remove, in case multiple actions are done quickly and the first one is not yet propagated
const updatedTagsToAdd = tagsToAdd.concat(
labels
.filter((tag) => tag.checked === 'on' && !selectedTags.includes(tag.label))
.filter(
(tag) =>
tag.checked === 'on' &&
!selectedTags.includes(tag.label) &&
!tagsToRemove.includes(tag.label)
)
.map((tag) => tag.label)
);
const updatedTagsToRemove = tagsToRemove.concat(
labels
.filter((tag) => tag.checked !== 'on' && selectedTags.includes(tag.label))
.filter(
(tag) =>
tag.checked !== 'on' &&
selectedTags.includes(tag.label) &&
!tagsToAdd.includes(tag.label)
)
.map((tag) => tag.label)
);

View file

@ -64,7 +64,10 @@ export async function getActionStatuses(
const matchingBucket = (acks?.aggregations?.ack_counts as any)?.buckets?.find(
(bucket: any) => bucket.key === action.actionId
);
const nbAgentsAck = (matchingBucket?.agent_count as any)?.value ?? 0;
const nbAgentsAck = Math.min(
matchingBucket?.doc_count ?? 0,
(matchingBucket?.agent_count as any)?.value ?? 0
);
const completionTime = (matchingBucket?.max_timestamp as any)?.value_as_string;
const nbAgentsActioned = action.nbAgentsActioned || action.nbAgentsActionCreated;
const complete = nbAgentsAck >= nbAgentsActioned;

View file

@ -92,10 +92,14 @@ describe('Agent actions', () => {
await cancelAgentAction(esClient, 'action1');
expect(mockedBulkUpdateAgents).toBeCalled();
expect(mockedBulkUpdateAgents).toBeCalledWith(expect.anything(), [
expect.objectContaining({ agentId: 'agent1' }),
expect.objectContaining({ agentId: 'agent2' }),
]);
expect(mockedBulkUpdateAgents).toBeCalledWith(
expect.anything(),
[
expect.objectContaining({ agentId: 'agent1' }),
expect.objectContaining({ agentId: 'agent2' }),
],
{}
);
});
});
});

View file

@ -8,6 +8,7 @@
import uuid from 'uuid';
import type { ElasticsearchClient } from '@kbn/core/server';
import { appContextService } from '../app_context';
import type {
Agent,
AgentAction,
@ -101,6 +102,32 @@ export async function bulkCreateAgentActions(
return actions;
}
export async function createErrorActionResults(
esClient: ElasticsearchClient,
actionId: string,
errors: Record<Agent['id'], Error>,
errorMessage: string
) {
const errorCount = Object.keys(errors).length;
if (errorCount > 0) {
appContextService
.getLogger()
.info(
`Writing error action results of ${errorCount} agents. Possibly failed validation: ${errorMessage}.`
);
// writing out error result for those agents that have errors, so the action is not going to stay in progress forever
await bulkCreateAgentActionResults(
esClient,
Object.keys(errors).map((agentId) => ({
agentId,
actionId,
error: errors[agentId].message,
}))
);
}
}
export async function bulkCreateAgentActionResults(
esClient: ElasticsearchClient,
results: Array<{
@ -227,6 +254,19 @@ export async function cancelAgentAction(esClient: ElasticsearchClient, actionId:
if (!hit._source || !hit._source.agents || !hit._source.action_id) {
continue;
}
if (hit._source.type === 'UPGRADE') {
await bulkUpdateAgents(
esClient,
hit._source.agents.map((agentId) => ({
agentId,
data: {
upgraded_at: null,
upgrade_started_at: null,
},
})),
{}
);
}
await createAgentAction(esClient, {
id: cancelActionId,
type: 'CANCEL',
@ -237,18 +277,6 @@ export async function cancelAgentAction(esClient: ElasticsearchClient, actionId:
created_at: now,
expiration: hit._source.expiration,
});
if (hit._source.type === 'UPGRADE') {
await bulkUpdateAgents(
esClient,
hit._source.agents.map((agentId) => ({
agentId,
data: {
upgraded_at: null,
upgrade_started_at: null,
},
}))
);
}
}
return {

View file

@ -11,7 +11,7 @@ import type { SavedObjectsClientContract, ElasticsearchClient } from '@kbn/core/
import type { KueryNode } from '@kbn/es-query';
import { fromKueryExpression, toElasticsearchQuery } from '@kbn/es-query';
import type { AgentSOAttributes, Agent, BulkActionResult, ListWithKuery } from '../../types';
import type { AgentSOAttributes, Agent, ListWithKuery } from '../../types';
import { appContextService, agentPolicyService } from '..';
import type { FleetServerAgent } from '../../../common/types';
import { SO_SEARCH_LIMIT } from '../../../common/constants';
@ -395,10 +395,11 @@ export async function bulkUpdateAgents(
updateData: Array<{
agentId: string;
data: Partial<AgentSOAttributes>;
}>
): Promise<{ items: BulkActionResult[] }> {
}>,
errors: { [key: string]: Error }
): Promise<void> {
if (updateData.length === 0) {
return { items: [] };
return;
}
const body = updateData.flatMap(({ agentId, data }) => [
@ -419,14 +420,12 @@ export async function bulkUpdateAgents(
refresh: 'wait_for',
});
return {
items: res.items.map((item) => ({
id: item.update!._id as string,
success: !item.update!.error,
res.items
.filter((item) => item.update!.error)
.forEach((item) => {
// @ts-expect-error it not assignable to ErrorCause
error: item.update!.error as Error,
})),
};
errors[item.update!._id as string] = item.update!.error as Error;
});
}
export async function deleteAgent(esClient: ElasticsearchClient, agentId: string) {

View file

@ -16,7 +16,7 @@ import { appContextService } from '../app_context';
import { ActionRunner } from './action_runner';
import { bulkUpdateAgents } from './crud';
import { bulkCreateAgentActionResults, createAgentAction } from './actions';
import { createErrorActionResults, createAgentAction } from './actions';
import { getHostedPolicies, isHostedAgent } from './hosted_agent';
import { BulkActionTaskType } from './bulk_actions_resolver';
@ -72,7 +72,7 @@ export async function reassignBatch(
throw new AgentReassignmentError('No agents to reassign, already assigned or hosted agents');
}
const res = await bulkUpdateAgents(
await bulkUpdateAgents(
esClient,
agentsToUpdate.map((agent) => ({
agentId: agent.id,
@ -80,18 +80,12 @@ export async function reassignBatch(
policy_id: options.newAgentPolicyId,
policy_revision: null,
},
}))
})),
errors
);
res.items
.filter((item) => !item.success)
.forEach((item) => {
errors[item.id] = item.error!;
});
const actionId = options.actionId ?? uuid();
const errorCount = Object.keys(errors).length;
const total = options.total ?? agentsToUpdate.length + errorCount;
const total = options.total ?? givenAgents.length;
const now = new Date().toISOString();
await createAgentAction(esClient, {
@ -105,23 +99,12 @@ export async function reassignBatch(
},
});
if (errorCount > 0) {
appContextService
.getLogger()
.info(
`Skipping ${errorCount} agents, as failed validation (already assigned or assigned to hosted policy)`
);
// writing out error result for those agents that failed validation, so the action is not going to stay in progress forever
await bulkCreateAgentActionResults(
esClient,
Object.keys(errors).map((agentId) => ({
agentId,
actionId,
error: errors[agentId].message,
}))
);
}
await createErrorActionResults(
esClient,
actionId,
errors,
'already assigned or assigned to hosted policy'
);
return { actionId };
}

View file

@ -115,7 +115,7 @@ describe('unenrollAgents (plural)', () => {
// calls ES update with correct values
const onlyRegular = [agentInRegularDoc._id, agentInRegularDoc2._id];
const calledWith = esClient.bulk.mock.calls[1][0];
const calledWith = esClient.bulk.mock.calls[0][0];
const ids = (calledWith as estypes.BulkRequest)?.body
?.filter((i: any) => i.update !== undefined)
.map((i: any) => i.update._id);
@ -128,7 +128,7 @@ describe('unenrollAgents (plural)', () => {
}
// hosted policy is updated in action results with error
const calledWithActionResults = esClient.bulk.mock.calls[0][0] as estypes.BulkRequest;
const calledWithActionResults = esClient.bulk.mock.calls[1][0] as estypes.BulkRequest;
// bulk write two line per create
expect(calledWithActionResults.body?.length).toBe(2);
const expectedObject = expect.objectContaining({
@ -170,7 +170,7 @@ describe('unenrollAgents (plural)', () => {
});
expect(esClient.bulk.mock.calls.length).toEqual(3);
const bulkBody = (esClient.bulk.mock.calls[1][0] as estypes.BulkRequest)?.body?.[1] as any;
const bulkBody = (esClient.bulk.mock.calls[2][0] as estypes.BulkRequest)?.body?.[1] as any;
expect(bulkBody.agent_id).toEqual(agentInRegularDoc._id);
expect(bulkBody.action_id).toEqual('other-action');
});
@ -227,7 +227,7 @@ describe('unenrollAgents (plural)', () => {
// calls ES update with correct values
const onlyRegular = [agentInRegularDoc._id, agentInRegularDoc2._id];
const calledWith = esClient.bulk.mock.calls[2][0];
const calledWith = esClient.bulk.mock.calls[0][0];
const ids = (calledWith as estypes.BulkRequest)?.body
?.filter((i: any) => i.update !== undefined)
.map((i: any) => i.update._id);
@ -239,13 +239,13 @@ describe('unenrollAgents (plural)', () => {
expect(doc).toHaveProperty('unenrolled_at');
}
const errorResults = esClient.bulk.mock.calls[1][0];
const errorResults = esClient.bulk.mock.calls[2][0];
const errorIds = (errorResults as estypes.BulkRequest)?.body
?.filter((i: any) => i.agent_id)
.map((i: any) => i.agent_id);
expect(errorIds).toEqual([agentInHostedDoc._id]);
const actionResults = esClient.bulk.mock.calls[0][0];
const actionResults = esClient.bulk.mock.calls[1][0];
const resultIds = (actionResults as estypes.BulkRequest)?.body
?.filter((i: any) => i.agent_id)
.map((i: any) => i.agent_id);
@ -290,7 +290,7 @@ describe('unenrollAgents (plural)', () => {
expect(unenrolledResponse.actionId).toBeDefined();
// calls ES update with correct values
const calledWith = esClient.bulk.mock.calls[1][0];
const calledWith = esClient.bulk.mock.calls[0][0];
const ids = (calledWith as estypes.BulkRequest)?.body
?.filter((i: any) => i.update !== undefined)
.map((i: any) => i.update._id);
@ -302,7 +302,7 @@ describe('unenrollAgents (plural)', () => {
expect(doc).toHaveProperty('unenrolled_at');
}
const actionResults = esClient.bulk.mock.calls[0][0];
const actionResults = esClient.bulk.mock.calls[1][0];
const resultIds = (actionResults as estypes.BulkRequest)?.body
?.filter((i: any) => i.agent_id)
.map((i: any) => i.agent_id);

View file

@ -25,6 +25,7 @@ import { bulkUpdateAgents } from './crud';
import {
bulkCreateAgentActionResults,
createAgentAction,
createErrorActionResults,
getUnenrollAgentActions,
} from './actions';
import { getHostedPolicies, isHostedAgent } from './hosted_agent';
@ -81,13 +82,24 @@ export async function unenrollBatch(
return agents;
}, []);
const now = new Date().toISOString();
// Update the necessary agents
const updateData = options.revoke
? { unenrolled_at: now, active: false }
: { unenrollment_started_at: now };
await bulkUpdateAgents(
esClient,
agentsToUpdate.map(({ id }) => ({ agentId: id, data: updateData })),
outgoingErrors
);
const actionId = options.actionId ?? uuid();
const errorCount = Object.keys(outgoingErrors).length;
const total = options.total ?? givenAgents.length;
const agentIds = agentsToUpdate.map((agent) => agent.id);
const now = new Date().toISOString();
if (options.revoke) {
// Get all API keys that need to be invalidated
await invalidateAPIKeysForAgents(agentsToUpdate);
@ -104,32 +116,11 @@ export async function unenrollBatch(
});
}
if (errorCount > 0) {
appContextService
.getLogger()
.info(
`Skipping ${errorCount} agents, as failed validation (cannot unenroll from a hosted policy or already unenrolled)`
);
// writing out error result for those agents that failed validation, so the action is not going to stay in progress forever
await bulkCreateAgentActionResults(
esClient,
Object.keys(outgoingErrors).map((agentId) => ({
agentId,
actionId,
error: outgoingErrors[agentId].message,
}))
);
}
// Update the necessary agents
const updateData = options.revoke
? { unenrolled_at: now, active: false }
: { unenrollment_started_at: now };
await bulkUpdateAgents(
await createErrorActionResults(
esClient,
agentsToUpdate.map(({ id }) => ({ agentId: id, data: updateData }))
actionId,
outgoingErrors,
'cannot unenroll from a hosted policy or already unenrolled'
);
return {

View file

@ -11,14 +11,16 @@ import { difference, uniq } from 'lodash';
import type { Agent } from '../../types';
import { appContextService } from '../app_context';
import { ActionRunner } from './action_runner';
import { bulkUpdateAgents } from './crud';
import { BulkActionTaskType } from './bulk_actions_resolver';
import { filterHostedPolicies } from './filter_hosted_agents';
import { bulkCreateAgentActionResults, createAgentAction } from './actions';
import {
createErrorActionResults,
bulkCreateAgentActionResults,
createAgentAction,
} from './actions';
export class UpdateAgentTagsActionRunner extends ActionRunner {
protected async processAgents(agents: Agent[]): Promise<{ actionId: string }> {
@ -90,12 +92,12 @@ export async function updateTagsBatch(
data: {
tags: getNewTags(agent),
},
}))
})),
errors
);
const actionId = options.actionId ?? uuid();
const total = options.total ?? givenAgents.length;
const errorCount = Object.keys(errors).length;
// creating an action doc so that update tags shows up in activity
await createAgentAction(esClient, {
@ -113,23 +115,12 @@ export async function updateTagsBatch(
}))
);
if (errorCount > 0) {
appContextService
.getLogger()
.info(
`Skipping ${errorCount} agents, as failed validation (cannot modified tags on hosted agents)`
);
// writing out error result for those agents that failed validation, so the action is not going to stay in progress forever
await bulkCreateAgentActionResults(
esClient,
Object.keys(errors).map((agentId) => ({
agentId,
actionId,
error: errors[agentId].message,
}))
);
}
await createErrorActionResults(
esClient,
actionId,
errors,
'cannot modified tags on hosted agents'
);
return { actionId };
}

View file

@ -50,7 +50,7 @@ describe('sendUpgradeAgentsActions (plural)', () => {
// calls ES update with correct values
const onlyRegular = [agentInRegularDoc._id, agentInRegularDoc2._id];
const calledWith = esClient.bulk.mock.calls[1][0];
const calledWith = esClient.bulk.mock.calls[0][0];
const ids = (calledWith as estypes.BulkRequest)?.body
?.filter((i: any) => i.update !== undefined)
.map((i: any) => i.update._id);
@ -64,7 +64,7 @@ describe('sendUpgradeAgentsActions (plural)', () => {
}
// hosted policy is updated in action results with error
const calledWithActionResults = esClient.bulk.mock.calls[0][0] as estypes.BulkRequest;
const calledWithActionResults = esClient.bulk.mock.calls[1][0] as estypes.BulkRequest;
// bulk write two line per create
expect(calledWithActionResults.body?.length).toBe(2);
const expectedObject = expect.objectContaining({

View file

@ -22,7 +22,7 @@ import { ActionRunner } from './action_runner';
import type { GetAgentsOptions } from './crud';
import { bulkUpdateAgents } from './crud';
import { bulkCreateAgentActionResults, createAgentAction } from './actions';
import { createErrorActionResults, createAgentAction } from './actions';
import { getHostedPolicies, isHostedAgent } from './hosted_agent';
import { BulkActionTaskType } from './bulk_actions_resolver';
@ -108,9 +108,20 @@ export async function upgradeBatch(
options.upgradeDurationSeconds
);
await bulkUpdateAgents(
esClient,
agentsToUpdate.map((agent) => ({
agentId: agent.id,
data: {
upgraded_at: null,
upgrade_started_at: now,
},
})),
errors
);
const actionId = options.actionId ?? uuid();
const errorCount = Object.keys(errors).length;
const total = options.total ?? agentsToUpdate.length + errorCount;
const total = options.total ?? givenAgents.length;
await createAgentAction(esClient, {
id: actionId,
@ -123,33 +134,11 @@ export async function upgradeBatch(
...rollingUpgradeOptions,
});
if (errorCount > 0) {
appContextService
.getLogger()
.info(
`Skipping ${errorCount} agents, as failed validation (cannot upgrade hosted agent or agent not upgradeable)`
);
// writing out error result for those agents that failed validation, so the action is not going to stay in progress forever
await bulkCreateAgentActionResults(
esClient,
Object.keys(errors).map((agentId) => ({
agentId,
actionId,
error: errors[agentId].message,
}))
);
}
await bulkUpdateAgents(
await createErrorActionResults(
esClient,
agentsToUpdate.map((agent) => ({
agentId: agent.id,
data: {
upgraded_at: null,
upgrade_started_at: now,
},
}))
actionId,
errors,
'cannot upgrade hosted agent or agent not upgradeable'
);
return {