mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 17:28:26 -04:00
[Fleet] Improving bulk actions for more than 10k agents (#134565)
* changed getAllAgentsByKuery to query all agents with pit and search_after * added internal api to test pit query * changed reassign to work on batches of 10k * unenroll in batches * upgrade in batches * fixed upgrade * added tests * cleanup * revert changes in getAllAgentsByKuery * renamed perPage to batchSize in bulk actions * fixed test * try catch around close pit Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
parent
b475ed6a81
commit
2732f26419
15 changed files with 516 additions and 87 deletions
|
@ -87,6 +87,7 @@ export interface Agent extends AgentBase {
|
|||
access_api_key?: string;
|
||||
status?: AgentStatus;
|
||||
packages: string[];
|
||||
sort?: Array<number | string | null>;
|
||||
}
|
||||
|
||||
export interface AgentSOAttributes extends AgentBase {
|
||||
|
|
|
@ -195,7 +195,7 @@ export const postBulkAgentsReassignHandler: RequestHandler<
|
|||
const results = await AgentService.reassignAgents(
|
||||
soClient,
|
||||
esClient,
|
||||
agentOptions,
|
||||
{ ...agentOptions, batchSize: request.body.batchSize },
|
||||
request.body.policy_id
|
||||
);
|
||||
|
||||
|
|
|
@ -65,6 +65,7 @@ export const postBulkAgentsUnenrollHandler: RequestHandler<
|
|||
...agentOptions,
|
||||
revoke: request.body?.revoke,
|
||||
force: request.body?.force,
|
||||
batchSize: request.body?.batchSize,
|
||||
});
|
||||
const body = results.items.reduce<PostBulkAgentUnenrollResponse>((acc, so) => {
|
||||
acc[so.id] = {
|
||||
|
|
|
@ -97,6 +97,7 @@ export const postBulkAgentsUpgradeHandler: RequestHandler<
|
|||
force,
|
||||
rollout_duration_seconds: upgradeDurationSeconds,
|
||||
start_time: startTime,
|
||||
batchSize,
|
||||
} = request.body;
|
||||
const kibanaVersion = appContextService.getKibanaVersion();
|
||||
try {
|
||||
|
@ -122,6 +123,7 @@ export const postBulkAgentsUpgradeHandler: RequestHandler<
|
|||
force,
|
||||
upgradeDurationSeconds,
|
||||
startTime,
|
||||
batchSize,
|
||||
};
|
||||
const results = await AgentService.sendUpgradeAgentsActions(soClient, esClient, upgradeOptions);
|
||||
const body = results.items.reduce<PostBulkAgentUpgradeResponse>((acc, so) => {
|
||||
|
|
|
@ -9,7 +9,7 @@ import type { ElasticsearchClient } from '@kbn/core/server';
|
|||
|
||||
import type { Agent } from '../../types';
|
||||
|
||||
import { getAgentsByKuery } from './crud';
|
||||
import { errorsToResults, getAgentsByKuery, processAgentsInBatches } from './crud';
|
||||
|
||||
jest.mock('../../../common', () => ({
|
||||
...jest.requireActual('../../../common'),
|
||||
|
@ -19,26 +19,28 @@ jest.mock('../../../common', () => ({
|
|||
describe('Agents CRUD test', () => {
|
||||
let esClientMock: ElasticsearchClient;
|
||||
let searchMock: jest.Mock;
|
||||
|
||||
beforeEach(() => {
|
||||
searchMock = jest.fn();
|
||||
esClientMock = {
|
||||
search: searchMock,
|
||||
openPointInTime: jest.fn().mockResolvedValue({ id: '1' }),
|
||||
closePointInTime: jest.fn(),
|
||||
} as unknown as ElasticsearchClient;
|
||||
});
|
||||
|
||||
function getEsResponse(ids: string[], total: number) {
|
||||
return {
|
||||
hits: {
|
||||
total,
|
||||
hits: ids.map((id: string) => ({
|
||||
_id: id,
|
||||
_source: {},
|
||||
})),
|
||||
},
|
||||
};
|
||||
}
|
||||
describe('getAgentsByKuery', () => {
|
||||
beforeEach(() => {
|
||||
searchMock = jest.fn();
|
||||
esClientMock = {
|
||||
search: searchMock,
|
||||
} as unknown as ElasticsearchClient;
|
||||
});
|
||||
|
||||
function getEsResponse(ids: string[], total: number) {
|
||||
return {
|
||||
hits: {
|
||||
total,
|
||||
hits: ids.map((id: string) => ({
|
||||
_id: id,
|
||||
_source: {},
|
||||
})),
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
it('should return upgradeable on first page', async () => {
|
||||
searchMock
|
||||
.mockImplementationOnce(() => Promise.resolve(getEsResponse(['1', '2', '3', '4', '5'], 7)))
|
||||
|
@ -192,4 +194,85 @@ describe('Agents CRUD test', () => {
|
|||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('processAgentsInBatches', () => {
|
||||
const mockProcessAgents = (agents: Agent[]) =>
|
||||
Promise.resolve({ items: agents.map((agent) => ({ id: agent.id, success: true })) });
|
||||
it('should return results for multiple batches', async () => {
|
||||
searchMock
|
||||
.mockImplementationOnce(() => Promise.resolve(getEsResponse(['1', '2'], 3)))
|
||||
.mockImplementationOnce(() => Promise.resolve(getEsResponse(['3'], 3)));
|
||||
|
||||
const response = await processAgentsInBatches(
|
||||
esClientMock,
|
||||
{
|
||||
kuery: 'active:true',
|
||||
batchSize: 2,
|
||||
showInactive: false,
|
||||
},
|
||||
mockProcessAgents
|
||||
);
|
||||
expect(response).toEqual({
|
||||
items: [
|
||||
{ id: '1', success: true },
|
||||
{ id: '2', success: true },
|
||||
{ id: '3', success: true },
|
||||
],
|
||||
});
|
||||
});
|
||||
|
||||
it('should return results for one batch', async () => {
|
||||
searchMock.mockImplementationOnce(() => Promise.resolve(getEsResponse(['1', '2', '3'], 3)));
|
||||
|
||||
const response = await processAgentsInBatches(
|
||||
esClientMock,
|
||||
{
|
||||
kuery: 'active:true',
|
||||
showInactive: false,
|
||||
},
|
||||
mockProcessAgents
|
||||
);
|
||||
expect(response).toEqual({
|
||||
items: [
|
||||
{ id: '1', success: true },
|
||||
{ id: '2', success: true },
|
||||
{ id: '3', success: true },
|
||||
],
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('errorsToResults', () => {
|
||||
it('should transform errors to results', () => {
|
||||
const results = errorsToResults([{ id: '1' } as Agent, { id: '2' } as Agent], {
|
||||
'1': new Error('error'),
|
||||
});
|
||||
expect(results).toEqual([
|
||||
{ id: '1', success: false, error: new Error('error') },
|
||||
{ id: '2', success: true },
|
||||
]);
|
||||
});
|
||||
|
||||
it('should transform errors to results with skip success', () => {
|
||||
const results = errorsToResults(
|
||||
[{ id: '1' } as Agent, { id: '2' } as Agent],
|
||||
{ '1': new Error('error') },
|
||||
undefined,
|
||||
true
|
||||
);
|
||||
expect(results).toEqual([{ id: '1', success: false, error: new Error('error') }]);
|
||||
});
|
||||
|
||||
it('should transform errors to results preserve order', () => {
|
||||
const results = errorsToResults(
|
||||
[{ id: '1' } as Agent, { id: '2' } as Agent],
|
||||
{ '1': new Error('error') },
|
||||
['2', '1']
|
||||
);
|
||||
expect(results).toEqual([
|
||||
{ id: '2', success: true },
|
||||
{ id: '1', success: false, error: new Error('error') },
|
||||
]);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
|
||||
import Boom from '@hapi/boom';
|
||||
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
|
||||
import type { SortResults } from '@elastic/elasticsearch/lib/api/types';
|
||||
import type { SavedObjectsClientContract, ElasticsearchClient } from '@kbn/core/server';
|
||||
|
||||
import type { KueryNode } from '@kbn/es-query';
|
||||
|
@ -68,6 +69,7 @@ export type GetAgentsOptions =
|
|||
| {
|
||||
kuery: string;
|
||||
showInactive?: boolean;
|
||||
perPage?: number;
|
||||
};
|
||||
|
||||
export async function getAgents(esClient: ElasticsearchClient, options: GetAgentsOptions) {
|
||||
|
@ -90,6 +92,113 @@ export async function getAgents(esClient: ElasticsearchClient, options: GetAgent
|
|||
return agents;
|
||||
}
|
||||
|
||||
export async function getAgentsByKueryPit(
|
||||
esClient: ElasticsearchClient,
|
||||
options: ListWithKuery & {
|
||||
showInactive: boolean;
|
||||
pitId: string;
|
||||
searchAfter?: SortResults;
|
||||
}
|
||||
): Promise<{
|
||||
agents: Agent[];
|
||||
total: number;
|
||||
page: number;
|
||||
perPage: number;
|
||||
}> {
|
||||
const {
|
||||
page = 1,
|
||||
perPage = 20,
|
||||
sortField = 'enrolled_at',
|
||||
sortOrder = 'desc',
|
||||
kuery,
|
||||
showInactive = false,
|
||||
showUpgradeable,
|
||||
searchAfter,
|
||||
} = options;
|
||||
const { pitId } = options;
|
||||
const filters = [];
|
||||
|
||||
if (kuery && kuery !== '') {
|
||||
filters.push(kuery);
|
||||
}
|
||||
|
||||
if (showInactive === false) {
|
||||
filters.push(ACTIVE_AGENT_CONDITION);
|
||||
}
|
||||
|
||||
const kueryNode = _joinFilters(filters);
|
||||
const body = kueryNode ? { query: toElasticsearchQuery(kueryNode) } : {};
|
||||
|
||||
const queryAgents = async (from: number, size: number) => {
|
||||
return esClient.search<FleetServerAgent, {}>({
|
||||
from,
|
||||
size,
|
||||
track_total_hits: true,
|
||||
rest_total_hits_as_int: true,
|
||||
body: {
|
||||
...body,
|
||||
sort: [{ [sortField]: { order: sortOrder } }],
|
||||
},
|
||||
pit: {
|
||||
id: pitId,
|
||||
keep_alive: '1m',
|
||||
},
|
||||
...(searchAfter ? { search_after: searchAfter, from: 0 } : {}),
|
||||
});
|
||||
};
|
||||
|
||||
const res = await queryAgents((page - 1) * perPage, perPage);
|
||||
|
||||
let agents = res.hits.hits.map(searchHitToAgent);
|
||||
let total = res.hits.total as number;
|
||||
|
||||
// filtering for a range on the version string will not work,
|
||||
// nor does filtering on a flattened field (local_metadata), so filter here
|
||||
if (showUpgradeable) {
|
||||
// query all agents, then filter upgradeable, and return the requested page and correct total
|
||||
// if there are more than SO_SEARCH_LIMIT agents, the logic falls back to same as before
|
||||
if (total < SO_SEARCH_LIMIT) {
|
||||
const response = await queryAgents(0, SO_SEARCH_LIMIT);
|
||||
agents = response.hits.hits
|
||||
.map(searchHitToAgent)
|
||||
.filter((agent) => isAgentUpgradeable(agent, appContextService.getKibanaVersion()));
|
||||
total = agents.length;
|
||||
const start = (page - 1) * perPage;
|
||||
agents = agents.slice(start, start + perPage);
|
||||
} else {
|
||||
agents = agents.filter((agent) =>
|
||||
isAgentUpgradeable(agent, appContextService.getKibanaVersion())
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
agents,
|
||||
total,
|
||||
page,
|
||||
perPage,
|
||||
};
|
||||
}
|
||||
|
||||
export async function openAgentsPointInTime(esClient: ElasticsearchClient): Promise<string> {
|
||||
const pitKeepAlive = '10m';
|
||||
const pitRes = await esClient.openPointInTime({
|
||||
index: AGENTS_INDEX,
|
||||
keep_alive: pitKeepAlive,
|
||||
});
|
||||
return pitRes.id;
|
||||
}
|
||||
|
||||
export async function closeAgentsPointInTime(esClient: ElasticsearchClient, pitId: string) {
|
||||
try {
|
||||
await esClient.closePointInTime({ id: pitId });
|
||||
} catch (error) {
|
||||
appContextService
|
||||
.getLogger()
|
||||
.warn(`Error closing point in time with id: ${pitId}. Error: ${error.message}`);
|
||||
}
|
||||
}
|
||||
|
||||
export async function getAgentsByKuery(
|
||||
esClient: ElasticsearchClient,
|
||||
options: ListWithKuery & {
|
||||
|
@ -168,6 +277,83 @@ export async function getAgentsByKuery(
|
|||
};
|
||||
}
|
||||
|
||||
export async function processAgentsInBatches(
|
||||
esClient: ElasticsearchClient,
|
||||
options: Omit<ListWithKuery, 'page' | 'perPage'> & {
|
||||
showInactive: boolean;
|
||||
batchSize?: number;
|
||||
},
|
||||
processAgents: (
|
||||
agents: Agent[],
|
||||
includeSuccess: boolean
|
||||
) => Promise<{ items: BulkActionResult[] }>
|
||||
): Promise<{ items: BulkActionResult[] }> {
|
||||
const pitId = await openAgentsPointInTime(esClient);
|
||||
|
||||
const perPage = options.batchSize ?? SO_SEARCH_LIMIT;
|
||||
|
||||
const res = await getAgentsByKueryPit(esClient, {
|
||||
...options,
|
||||
page: 1,
|
||||
perPage,
|
||||
pitId,
|
||||
});
|
||||
|
||||
let currentAgents = res.agents;
|
||||
// include successful agents if total agents does not exceed 10k
|
||||
const skipSuccess = res.total > SO_SEARCH_LIMIT;
|
||||
|
||||
let results = await processAgents(currentAgents, skipSuccess);
|
||||
let allAgentsProcessed = currentAgents.length;
|
||||
|
||||
while (allAgentsProcessed < res.total) {
|
||||
const lastAgent = currentAgents[currentAgents.length - 1];
|
||||
const nextPage = await getAgentsByKueryPit(esClient, {
|
||||
...options,
|
||||
page: 1,
|
||||
perPage,
|
||||
pitId,
|
||||
searchAfter: lastAgent.sort!,
|
||||
});
|
||||
currentAgents = nextPage.agents;
|
||||
const currentResults = await processAgents(currentAgents, skipSuccess);
|
||||
results = { items: results.items.concat(currentResults.items) };
|
||||
allAgentsProcessed += currentAgents.length;
|
||||
}
|
||||
|
||||
await closeAgentsPointInTime(esClient, pitId);
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
export function errorsToResults(
|
||||
agents: Agent[],
|
||||
errors: Record<Agent['id'], Error>,
|
||||
agentIds?: string[],
|
||||
skipSuccess?: boolean
|
||||
): BulkActionResult[] {
|
||||
if (!skipSuccess) {
|
||||
const givenOrder = agentIds ? agentIds : agents.map((agent) => agent.id);
|
||||
return givenOrder.map((agentId) => {
|
||||
const hasError = agentId in errors;
|
||||
const result: BulkActionResult = {
|
||||
id: agentId,
|
||||
success: !hasError,
|
||||
};
|
||||
if (hasError) {
|
||||
result.error = errors[agentId];
|
||||
}
|
||||
return result;
|
||||
});
|
||||
} else {
|
||||
return Object.entries(errors).map(([agentId, error]) => ({
|
||||
id: agentId,
|
||||
success: false,
|
||||
error,
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
export async function getAllAgentsByKuery(
|
||||
esClient: ElasticsearchClient,
|
||||
options: Omit<ListWithKuery, 'page' | 'perPage'> & {
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
*/
|
||||
|
||||
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
|
||||
|
||||
import type { SortResults } from '@elastic/elasticsearch/lib/api/types';
|
||||
import type { SearchHit } from '@kbn/core/types/elasticsearch';
|
||||
|
||||
import type { Agent, AgentSOAttributes, FleetServerAgent } from '../../types';
|
||||
|
@ -17,7 +17,7 @@ type FleetServerAgentESResponse =
|
|||
| estypes.SearchResponse<FleetServerAgent>['hits']['hits'][0]
|
||||
| SearchHit<FleetServerAgent>;
|
||||
|
||||
export function searchHitToAgent(hit: FleetServerAgentESResponse): Agent {
|
||||
export function searchHitToAgent(hit: FleetServerAgentESResponse & { sort?: SortResults }): Agent {
|
||||
// @ts-expect-error @elastic/elasticsearch MultiGetHit._source is optional
|
||||
const agent: Agent = {
|
||||
id: hit._id,
|
||||
|
@ -26,6 +26,7 @@ export function searchHitToAgent(hit: FleetServerAgentESResponse): Agent {
|
|||
access_api_key: undefined,
|
||||
status: undefined,
|
||||
packages: hit._source?.packages ?? [],
|
||||
sort: hit.sort,
|
||||
};
|
||||
|
||||
agent.status = getAgentStatus(agent);
|
||||
|
|
|
@ -14,10 +14,11 @@ import { AgentReassignmentError, HostedAgentPolicyRestrictionRelatedError } from
|
|||
|
||||
import {
|
||||
getAgentDocuments,
|
||||
getAgents,
|
||||
getAgentPolicyForAgent,
|
||||
updateAgent,
|
||||
bulkUpdateAgents,
|
||||
processAgentsInBatches,
|
||||
errorsToResults,
|
||||
} from './crud';
|
||||
import type { GetAgentsOptions } from '.';
|
||||
import { createAgentAction } from './actions';
|
||||
|
@ -78,7 +79,7 @@ function isMgetDoc(doc?: estypes.MgetResponseItem<unknown>): doc is estypes.GetG
|
|||
export async function reassignAgents(
|
||||
soClient: SavedObjectsClientContract,
|
||||
esClient: ElasticsearchClient,
|
||||
options: ({ agents: Agent[] } | GetAgentsOptions) & { force?: boolean },
|
||||
options: ({ agents: Agent[] } | GetAgentsOptions) & { force?: boolean; batchSize?: number },
|
||||
newAgentPolicyId: string
|
||||
): Promise<{ items: BulkActionResult[] }> {
|
||||
const newAgentPolicy = await agentPolicyService.get(soClient, newAgentPolicyId);
|
||||
|
@ -107,10 +108,46 @@ export async function reassignAgents(
|
|||
}
|
||||
}
|
||||
} else if ('kuery' in options) {
|
||||
givenAgents = await getAgents(esClient, options);
|
||||
return await processAgentsInBatches(
|
||||
esClient,
|
||||
{
|
||||
kuery: options.kuery,
|
||||
showInactive: options.showInactive ?? false,
|
||||
batchSize: options.batchSize,
|
||||
},
|
||||
async (agents: Agent[], skipSuccess: boolean) =>
|
||||
await reassignBatch(
|
||||
soClient,
|
||||
esClient,
|
||||
newAgentPolicyId,
|
||||
agents,
|
||||
outgoingErrors,
|
||||
undefined,
|
||||
skipSuccess
|
||||
)
|
||||
);
|
||||
}
|
||||
const givenOrder =
|
||||
'agentIds' in options ? options.agentIds : givenAgents.map((agent) => agent.id);
|
||||
|
||||
return await reassignBatch(
|
||||
soClient,
|
||||
esClient,
|
||||
newAgentPolicyId,
|
||||
givenAgents,
|
||||
outgoingErrors,
|
||||
'agentIds' in options ? options.agentIds : undefined
|
||||
);
|
||||
}
|
||||
|
||||
async function reassignBatch(
|
||||
soClient: SavedObjectsClientContract,
|
||||
esClient: ElasticsearchClient,
|
||||
newAgentPolicyId: string,
|
||||
givenAgents: Agent[],
|
||||
outgoingErrors: Record<Agent['id'], Error>,
|
||||
agentIds?: string[],
|
||||
skipSuccess?: boolean
|
||||
): Promise<{ items: BulkActionResult[] }> {
|
||||
const errors: Record<Agent['id'], Error> = { ...outgoingErrors };
|
||||
|
||||
const hostedPolicies = await getHostedPolicies(soClient, givenAgents);
|
||||
|
||||
|
@ -137,7 +174,7 @@ export async function reassignAgents(
|
|||
agents.push(result.value);
|
||||
} else {
|
||||
const id = givenAgents[index].id;
|
||||
outgoingErrors[id] = result.reason;
|
||||
errors[id] = result.reason;
|
||||
}
|
||||
return agents;
|
||||
}, []);
|
||||
|
@ -153,18 +190,6 @@ export async function reassignAgents(
|
|||
}))
|
||||
);
|
||||
|
||||
const orderedOut = givenOrder.map((agentId) => {
|
||||
const hasError = agentId in outgoingErrors;
|
||||
const result: BulkActionResult = {
|
||||
id: agentId,
|
||||
success: !hasError,
|
||||
};
|
||||
if (hasError) {
|
||||
result.error = outgoingErrors[agentId];
|
||||
}
|
||||
return result;
|
||||
});
|
||||
|
||||
const now = new Date().toISOString();
|
||||
await createAgentAction(esClient, {
|
||||
agents: agentsToUpdate.map((agent) => agent.id),
|
||||
|
@ -172,5 +197,5 @@ export async function reassignAgents(
|
|||
type: 'POLICY_REASSIGN',
|
||||
});
|
||||
|
||||
return { items: orderedOut };
|
||||
return { items: errorsToResults(givenAgents, errors, agentIds, skipSuccess) };
|
||||
}
|
||||
|
|
|
@ -13,12 +13,14 @@ import { HostedAgentPolicyRestrictionRelatedError } from '../../errors';
|
|||
|
||||
import { createAgentAction } from './actions';
|
||||
import type { GetAgentsOptions } from './crud';
|
||||
import { errorsToResults } from './crud';
|
||||
import {
|
||||
getAgentById,
|
||||
getAgents,
|
||||
updateAgent,
|
||||
getAgentPolicyForAgent,
|
||||
bulkUpdateAgents,
|
||||
processAgentsInBatches,
|
||||
} from './crud';
|
||||
import { getHostedPolicies, isHostedAgent } from './hosted_agent';
|
||||
|
||||
|
@ -69,11 +71,35 @@ export async function unenrollAgents(
|
|||
options: GetAgentsOptions & {
|
||||
force?: boolean;
|
||||
revoke?: boolean;
|
||||
batchSize?: number;
|
||||
}
|
||||
): Promise<{ items: BulkActionResult[] }> {
|
||||
// start with all agents specified
|
||||
const givenAgents = await getAgents(esClient, options);
|
||||
if ('agentIds' in options) {
|
||||
const givenAgents = await getAgents(esClient, options);
|
||||
return await unenrollBatch(soClient, esClient, givenAgents, options);
|
||||
}
|
||||
return await processAgentsInBatches(
|
||||
esClient,
|
||||
{
|
||||
kuery: options.kuery,
|
||||
showInactive: options.showInactive ?? false,
|
||||
batchSize: options.batchSize,
|
||||
},
|
||||
async (agents: Agent[], skipSuccess?: boolean) =>
|
||||
await unenrollBatch(soClient, esClient, agents, options, skipSuccess)
|
||||
);
|
||||
}
|
||||
|
||||
async function unenrollBatch(
|
||||
soClient: SavedObjectsClientContract,
|
||||
esClient: ElasticsearchClient,
|
||||
givenAgents: Agent[],
|
||||
options: {
|
||||
force?: boolean;
|
||||
revoke?: boolean;
|
||||
},
|
||||
skipSuccess?: boolean
|
||||
): Promise<{ items: BulkActionResult[] }> {
|
||||
// Filter to those not already unenrolled, or unenrolling
|
||||
const agentsEnrolled = givenAgents.filter((agent) => {
|
||||
if (options.revoke) {
|
||||
|
@ -124,20 +150,8 @@ export async function unenrollAgents(
|
|||
agentsToUpdate.map(({ id }) => ({ agentId: id, data: updateData }))
|
||||
);
|
||||
|
||||
const getResultForAgent = (agent: Agent) => {
|
||||
const hasError = agent.id in outgoingErrors;
|
||||
const result: BulkActionResult = {
|
||||
id: agent.id,
|
||||
success: !hasError,
|
||||
};
|
||||
if (hasError) {
|
||||
result.error = outgoingErrors[agent.id];
|
||||
}
|
||||
return result;
|
||||
};
|
||||
|
||||
return {
|
||||
items: givenAgents.map(getResultForAgent),
|
||||
items: errorsToResults(givenAgents, outgoingErrors, undefined, skipSuccess),
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
@ -21,13 +21,8 @@ import { AGENT_ACTIONS_INDEX, AGENT_ACTIONS_RESULTS_INDEX } from '../../../commo
|
|||
|
||||
import { createAgentAction } from './actions';
|
||||
import type { GetAgentsOptions } from './crud';
|
||||
import {
|
||||
getAgentDocuments,
|
||||
getAgents,
|
||||
updateAgent,
|
||||
bulkUpdateAgents,
|
||||
getAgentPolicyForAgent,
|
||||
} from './crud';
|
||||
import { errorsToResults, processAgentsInBatches } from './crud';
|
||||
import { getAgentDocuments, updateAgent, bulkUpdateAgents, getAgentPolicyForAgent } from './crud';
|
||||
import { searchHitToAgent } from './helpers';
|
||||
import { getHostedPolicies, isHostedAgent } from './hosted_agent';
|
||||
|
||||
|
@ -85,6 +80,7 @@ export async function sendUpgradeAgentsActions(
|
|||
force?: boolean;
|
||||
upgradeDurationSeconds?: number;
|
||||
startTime?: string;
|
||||
batchSize?: number;
|
||||
}
|
||||
) {
|
||||
// Full set of agents
|
||||
|
@ -104,9 +100,37 @@ export async function sendUpgradeAgentsActions(
|
|||
}
|
||||
}
|
||||
} else if ('kuery' in options) {
|
||||
givenAgents = await getAgents(esClient, options);
|
||||
return await processAgentsInBatches(
|
||||
esClient,
|
||||
{
|
||||
kuery: options.kuery,
|
||||
showInactive: options.showInactive ?? false,
|
||||
batchSize: options.batchSize,
|
||||
},
|
||||
async (agents: Agent[], skipSuccess: boolean) =>
|
||||
await upgradeBatch(soClient, esClient, agents, outgoingErrors, options, skipSuccess)
|
||||
);
|
||||
}
|
||||
|
||||
return await upgradeBatch(soClient, esClient, givenAgents, outgoingErrors, options);
|
||||
}
|
||||
|
||||
async function upgradeBatch(
|
||||
soClient: SavedObjectsClientContract,
|
||||
esClient: ElasticsearchClient,
|
||||
givenAgents: Agent[],
|
||||
outgoingErrors: Record<Agent['id'], Error>,
|
||||
options: ({ agents: Agent[] } | GetAgentsOptions) & {
|
||||
version: string;
|
||||
sourceUri?: string | undefined;
|
||||
force?: boolean;
|
||||
upgradeDurationSeconds?: number;
|
||||
startTime?: string;
|
||||
},
|
||||
skipSuccess?: boolean
|
||||
): Promise<{ items: BulkActionResult[] }> {
|
||||
const errors: Record<Agent['id'], Error> = { ...outgoingErrors };
|
||||
|
||||
const hostedPolicies = await getHostedPolicies(soClient, givenAgents);
|
||||
|
||||
// results from getAgents with options.kuery '' (or even 'active:false') may include hosted agents
|
||||
|
@ -141,7 +165,7 @@ export async function sendUpgradeAgentsActions(
|
|||
agents.push(result.value);
|
||||
} else {
|
||||
const id = givenAgents[index].id;
|
||||
outgoingErrors[id] = result.reason;
|
||||
errors[id] = result.reason;
|
||||
}
|
||||
return agents;
|
||||
}, []);
|
||||
|
@ -183,22 +207,14 @@ export async function sendUpgradeAgentsActions(
|
|||
}))
|
||||
);
|
||||
|
||||
const givenOrder =
|
||||
'agentIds' in options ? options.agentIds : agentsToCheckUpgradeable.map((agent) => agent.id);
|
||||
|
||||
const orderedOut = givenOrder.map((agentId) => {
|
||||
const hasError = agentId in outgoingErrors;
|
||||
const result: BulkActionResult = {
|
||||
id: agentId,
|
||||
success: !hasError,
|
||||
};
|
||||
if (hasError) {
|
||||
result.error = outgoingErrors[agentId];
|
||||
}
|
||||
return result;
|
||||
});
|
||||
|
||||
return { items: orderedOut };
|
||||
return {
|
||||
items: errorsToResults(
|
||||
givenAgents,
|
||||
errors,
|
||||
'agentIds' in options ? options.agentIds : undefined,
|
||||
skipSuccess
|
||||
),
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -59,6 +59,7 @@ export const PostBulkAgentUnenrollRequestSchema = {
|
|||
agents: schema.oneOf([schema.arrayOf(schema.string()), schema.string()]),
|
||||
force: schema.maybe(schema.boolean()),
|
||||
revoke: schema.maybe(schema.boolean()),
|
||||
batchSize: schema.maybe(schema.number()),
|
||||
}),
|
||||
};
|
||||
|
||||
|
@ -97,6 +98,7 @@ export const PostBulkAgentUpgradeRequestSchema = {
|
|||
},
|
||||
})
|
||||
),
|
||||
batchSize: schema.maybe(schema.number()),
|
||||
}),
|
||||
};
|
||||
|
||||
|
@ -113,6 +115,7 @@ export const PostBulkAgentReassignRequestSchema = {
|
|||
body: schema.object({
|
||||
policy_id: schema.string(),
|
||||
agents: schema.oneOf([schema.arrayOf(schema.string()), schema.string()]),
|
||||
batchSize: schema.maybe(schema.number()),
|
||||
}),
|
||||
};
|
||||
|
||||
|
|
|
@ -198,6 +198,31 @@ export default function (providerContext: FtrProviderContext) {
|
|||
});
|
||||
});
|
||||
|
||||
it('should bulk reassign multiple agents by kuery in batches', async () => {
|
||||
const { body: unenrolledBody } = await supertest
|
||||
.post(`/api/fleet/agents/bulk_reassign`)
|
||||
.set('kbn-xsrf', 'xxx')
|
||||
.send({
|
||||
agents: 'active: true',
|
||||
policy_id: 'policy2',
|
||||
batchSize: 2,
|
||||
})
|
||||
.expect(200);
|
||||
|
||||
expect(unenrolledBody).to.eql({
|
||||
agent1: { success: true },
|
||||
agent2: { success: true },
|
||||
agent3: { success: true },
|
||||
agent4: { success: true },
|
||||
});
|
||||
|
||||
const { body } = await supertest.get(`/api/fleet/agents`).set('kbn-xsrf', 'xxx');
|
||||
expect(body.total).to.eql(4);
|
||||
body.items.forEach((agent: any) => {
|
||||
expect(agent.policy_id).to.eql('policy2');
|
||||
});
|
||||
});
|
||||
|
||||
it('should throw an error for invalid policy id for bulk reassign', async () => {
|
||||
await supertest
|
||||
.post(`/api/fleet/agents/bulk_reassign`)
|
||||
|
|
|
@ -197,5 +197,27 @@ export default function (providerContext: FtrProviderContext) {
|
|||
const { body } = await supertest.get(`/api/fleet/agents`);
|
||||
expect(body.total).to.eql(0);
|
||||
});
|
||||
|
||||
it('/agents/bulk_unenroll should allow to unenroll multiple agents by kuery in batches', async () => {
|
||||
const { body: unenrolledBody } = await supertest
|
||||
.post(`/api/fleet/agents/bulk_unenroll`)
|
||||
.set('kbn-xsrf', 'xxx')
|
||||
.send({
|
||||
agents: 'active: true',
|
||||
revoke: true,
|
||||
batchSize: 2,
|
||||
})
|
||||
.expect(200);
|
||||
|
||||
expect(unenrolledBody).to.eql({
|
||||
agent1: { success: true },
|
||||
agent2: { success: true },
|
||||
agent3: { success: true },
|
||||
agent4: { success: true },
|
||||
});
|
||||
|
||||
const { body } = await supertest.get(`/api/fleet/agents`);
|
||||
expect(body.total).to.eql(0);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
|
|
@ -21,7 +21,7 @@ export default function (providerContext: FtrProviderContext) {
|
|||
const kibanaServer = getService('kibanaServer');
|
||||
const supertestWithoutAuth = getService('supertestWithoutAuth');
|
||||
|
||||
describe('Agents upgrade', () => {
|
||||
describe('fleet_upgrade_agent', () => {
|
||||
skipIfNoDockerRegistry(providerContext);
|
||||
before(async () => {
|
||||
await esArchiver.load('x-pack/test/functional/es_archives/fleet/agents');
|
||||
|
@ -483,6 +483,52 @@ export default function (providerContext: FtrProviderContext) {
|
|||
expect(typeof agent2data.body.item.upgrade_started_at).to.be('undefined');
|
||||
});
|
||||
|
||||
it('should bulk upgrade multiple agents by kuery in batches', async () => {
|
||||
await es.update({
|
||||
id: 'agent1',
|
||||
refresh: 'wait_for',
|
||||
index: AGENTS_INDEX,
|
||||
body: {
|
||||
doc: {
|
||||
local_metadata: { elastic: { agent: { upgradeable: true, version: '0.0.0' } } },
|
||||
},
|
||||
},
|
||||
});
|
||||
await es.update({
|
||||
id: 'agent2',
|
||||
refresh: 'wait_for',
|
||||
index: AGENTS_INDEX,
|
||||
body: {
|
||||
doc: {
|
||||
local_metadata: {
|
||||
elastic: {
|
||||
agent: { upgradeable: false, version: '0.0.0' },
|
||||
},
|
||||
},
|
||||
upgrade_started_at: undefined,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const { body: unenrolledBody } = await supertest
|
||||
.post(`/api/fleet/agents/bulk_upgrade`)
|
||||
.set('kbn-xsrf', 'xxx')
|
||||
.send({
|
||||
agents: 'active:true',
|
||||
version: fleetServerVersion,
|
||||
batchSize: 2,
|
||||
})
|
||||
.expect(200);
|
||||
|
||||
expect(unenrolledBody).to.eql({
|
||||
agent4: { success: false, error: 'agent4 is not upgradeable' },
|
||||
agent3: { success: false, error: 'agent3 is not upgradeable' },
|
||||
agent2: { success: false, error: 'agent2 is not upgradeable' },
|
||||
agent1: { success: true },
|
||||
agentWithFS: { success: false, error: 'agentWithFS is not upgradeable' },
|
||||
});
|
||||
});
|
||||
|
||||
it('should not upgrade an unenrolling agent during bulk_upgrade', async () => {
|
||||
await supertest.post(`/api/fleet/agents/agent1/unenroll`).set('kbn-xsrf', 'xxx').send({
|
||||
revoke: true,
|
||||
|
|
|
@ -9,7 +9,8 @@
|
|||
"policy_id": "policy1",
|
||||
"type": "PERMANENT",
|
||||
"local_metadata": {},
|
||||
"user_provided_metadata": {}
|
||||
"user_provided_metadata": {},
|
||||
"enrolled_at": "2022-06-21T12:14:25Z"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -25,7 +26,8 @@
|
|||
"policy_id": "policy1",
|
||||
"type": "PERMANENT",
|
||||
"local_metadata": {},
|
||||
"user_provided_metadata": {}
|
||||
"user_provided_metadata": {},
|
||||
"enrolled_at": "2022-06-21T12:15:25Z"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -41,7 +43,8 @@
|
|||
"policy_id": "policy1",
|
||||
"type": "PERMANENT",
|
||||
"local_metadata": {},
|
||||
"user_provided_metadata": {}
|
||||
"user_provided_metadata": {},
|
||||
"enrolled_at": "2022-06-21T12:16:25Z"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -57,7 +60,8 @@
|
|||
"policy_id": "policy1",
|
||||
"type": "PERMANENT",
|
||||
"local_metadata": {},
|
||||
"user_provided_metadata": {}
|
||||
"user_provided_metadata": {},
|
||||
"enrolled_at": "2022-06-21T12:17:25Z"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue