[Fleet] Retry transient errors in agent status API (#189563)

## Summary

Use `retryTransientEsErrors` when fetching agent status to avoid flaky
ES errors throwing a 500.

I'd love to add tests for this, but it's a bit challenging as these
methods rely heavily on an ES runtime field to populate the status
values. The amount of mocking necessary to add Jest tests for these was
getting a bit ridiculous, so maybe FTR is a better way to go?
This commit is contained in:
Kyle Pollich 2024-07-31 14:13:59 -04:00 committed by GitHub
parent cf3c4056a3
commit b40beb17a6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 227 additions and 53 deletions

View file

@ -0,0 +1,155 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { errors as EsErrors } from '@elastic/elasticsearch';
import { createAppContextStartContractMock } from '../../mocks';
import { appContextService } from '../app_context';
import { getAgentStatusForAgentPolicy } from './status';
describe('getAgentStatusForAgentPolicy', () => {
beforeEach(async () => {
appContextService.start(createAppContextStartContractMock());
});
afterEach(() => {
appContextService.stop();
});
it('should return agent status for agent policy', async () => {
const esClient = {
search: jest.fn().mockResolvedValue({
aggregations: {
status: {
buckets: [
{
key: 'online',
doc_count: 1,
},
{
key: 'error',
doc_count: 2,
},
],
},
},
}),
};
const soClient = {
find: jest.fn().mockResolvedValue({
saved_objects: [
{
id: 'agentPolicyId',
attributes: {
name: 'Policy 1',
},
},
],
}),
};
const agentPolicyId = 'agentPolicyId';
const filterKuery = 'filterKuery';
const spaceId = 'spaceId';
const result = await getAgentStatusForAgentPolicy(
esClient as any,
soClient as any,
agentPolicyId,
filterKuery,
spaceId
);
expect(result).toEqual(
expect.objectContaining({
active: 3,
all: 3,
error: 2,
events: 0,
inactive: 0,
offline: 0,
online: 1,
other: 0,
total: 3,
unenrolled: 0,
updating: 0,
})
);
expect(esClient.search).toHaveBeenCalledTimes(1);
});
it('retries on 503', async () => {
const esClient = {
search: jest
.fn()
.mockRejectedValueOnce(
new EsErrors.ResponseError({ warnings: [], meta: {} as any, statusCode: 503 })
)
.mockResolvedValue({
aggregations: {
status: {
buckets: [
{
key: 'online',
doc_count: 1,
},
{
key: 'error',
doc_count: 2,
},
],
},
},
}),
};
const soClient = {
find: jest.fn().mockResolvedValue({
saved_objects: [
{
id: 'agentPolicyId',
attributes: {
name: 'Policy 1',
},
},
],
}),
};
const agentPolicyId = 'agentPolicyId';
const filterKuery = 'filterKuery';
const spaceId = 'spaceId';
const result = await getAgentStatusForAgentPolicy(
esClient as any,
soClient as any,
agentPolicyId,
filterKuery,
spaceId
);
expect(result).toEqual(
expect.objectContaining({
active: 3,
all: 3,
error: 2,
events: 0,
inactive: 0,
offline: 0,
online: 1,
other: 0,
total: 3,
unenrolled: 0,
updating: 0,
})
);
expect(esClient.search).toHaveBeenCalledTimes(2);
});
});

View file

@ -19,10 +19,12 @@ import { agentStatusesToSummary } from '../../../common/services';
import { AGENTS_INDEX } from '../../constants';
import type { AgentStatus } from '../../types';
import { FleetUnauthorizedError } from '../../errors';
import { FleetError, FleetUnauthorizedError } from '../../errors';
import { appContextService } from '../app_context';
import { retryTransientEsErrors } from '../epm/elasticsearch/retry';
import { getAgentById, removeSOAttributes } from './crud';
import { buildAgentStatusRuntimeField } from './build_status_runtime_field';
@ -104,28 +106,34 @@ export async function getAgentStatusForAgentPolicy(
let response;
try {
response = await esClient.search<
null,
{ status: AggregationsTermsAggregateBase<AggregationsStatusTermsBucketKeys> }
>({
index: AGENTS_INDEX,
size: 0,
query,
fields: Object.keys(runtimeFields),
runtime_mappings: runtimeFields,
aggregations: {
status: {
terms: {
field: 'status',
size: Object.keys(statuses).length,
response = await retryTransientEsErrors(
() =>
esClient.search<
null,
{ status: AggregationsTermsAggregateBase<AggregationsStatusTermsBucketKeys> }
>({
index: AGENTS_INDEX,
size: 0,
query,
fields: Object.keys(runtimeFields),
runtime_mappings: runtimeFields,
aggregations: {
status: {
terms: {
field: 'status',
size: Object.keys(statuses).length,
},
},
},
},
},
ignore_unavailable: true,
});
ignore_unavailable: true,
}),
{ logger }
);
} catch (error) {
logger.debug(`Error getting agent statuses: ${error}`);
throw error;
throw new FleetError(
`Unable to retrive agent statuses for policy ${agentPolicyId} due to error: ${error.message}`
);
}
const buckets = (response?.aggregations?.status?.buckets ||
@ -153,11 +161,14 @@ export async function getAgentStatusForAgentPolicy(
total: allActive,
};
}
export async function getIncomingDataByAgentsId(
esClient: ElasticsearchClient,
agentsIds: string[],
returnDataPreview: boolean = false
) {
const logger = appContextService.getLogger();
try {
const { has_all_requested: hasAllPrivileges } = await esClient.security.hasPrivileges({
body: {
@ -169,46 +180,51 @@ export async function getIncomingDataByAgentsId(
],
},
});
if (!hasAllPrivileges) {
throw new FleetUnauthorizedError('Missing permissions to read data streams indices');
}
const searchResult = await esClient.search({
index: DATA_STREAM_INDEX_PATTERN,
allow_partial_search_results: true,
_source: returnDataPreview,
timeout: '5s',
size: returnDataPreview ? MAX_AGENT_DATA_PREVIEW_SIZE : 0,
body: {
query: {
bool: {
filter: [
{
terms: {
'agent.id': agentsIds,
},
},
{
range: {
'@timestamp': {
gte: 'now-5m',
lte: 'now',
const searchResult = await retryTransientEsErrors(
() =>
esClient.search({
index: DATA_STREAM_INDEX_PATTERN,
allow_partial_search_results: true,
_source: returnDataPreview,
timeout: '5s',
size: returnDataPreview ? MAX_AGENT_DATA_PREVIEW_SIZE : 0,
body: {
query: {
bool: {
filter: [
{
terms: {
'agent.id': agentsIds,
},
},
{
range: {
'@timestamp': {
gte: 'now-5m',
lte: 'now',
},
},
},
],
},
},
aggs: {
agent_ids: {
terms: {
field: 'agent.id',
size: agentsIds.length,
},
},
],
},
},
aggs: {
agent_ids: {
terms: {
field: 'agent.id',
size: agentsIds.length,
},
},
},
},
});
}),
{ logger }
);
if (!searchResult.aggregations?.agent_ids) {
return {
@ -231,7 +247,10 @@ export async function getIncomingDataByAgentsId(
);
return { items, dataPreview };
} catch (e) {
throw new Error(e);
} catch (error) {
logger.debug(`Error getting incoming data for agents: ${error}`);
throw new FleetError(
`Unable to retrive incoming data for agents due to error: ${error.message}`
);
}
}