mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 17:28:26 -04:00
[Fleet] Task to publish Agent metrics (#168435)
## Summary
Closes https://github.com/elastic/ingest-dev/issues/2396
Added a new kibana task that publishes Agent metrics every minute to
data streams installed by fleet_server package.
Opened the pr for review, there are a few things to finalize, but the
core logic won't change much.
To test locally:
- Install fleet_server package 1.4.0 from
[this](https://github.com/elastic/integrations/pull/8145) pr to get the
mappings
- Start kibana locally, wait for a few minutes for the metrics task to
run (every minute)
- Go to discover, `metrics-*` index pattern, filter on
`data_stream.dataset: fleet_server.*`
- Expect data to be populated in `fleet_server.agent_status` and
`fleet_server.agent_versions` datasets.
<img width="1787" alt="image"
src="615af9df
-fe4b-4c17-8c8c-88646c403a18">
### Checklist
- [x] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios
---------
Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
parent
2015c9f686
commit
0350f17c54
16 changed files with 737 additions and 15 deletions
|
@ -50,6 +50,9 @@ xpack.fleet.internal.registry.excludePackages: [
|
|||
xpack.fleet.packages:
|
||||
- name: apm
|
||||
version: latest
|
||||
# fleet_server package installed to publish agent metrics
|
||||
- name: fleet_server
|
||||
version: latest
|
||||
## Disable APM UI components and API calls
|
||||
xpack.apm.featureFlags.agentConfigurationAvailable: false
|
||||
xpack.apm.featureFlags.configurableIndicesAvailable: false
|
||||
|
|
|
@ -50,6 +50,10 @@ xpack.fleet.internal.registry.excludePackages: [
|
|||
'symantec',
|
||||
'cyberark',
|
||||
]
|
||||
# fleet_server package installed to publish agent metrics
|
||||
xpack.fleet.packages:
|
||||
- name: fleet_server
|
||||
version: latest
|
||||
|
||||
xpack.ml.ad.enabled: true
|
||||
xpack.ml.dfa.enabled: true
|
||||
|
|
|
@ -34,6 +34,9 @@ export default async function ({ readConfigFile }) {
|
|||
|
||||
// to be re-enabled once kibana/issues/102552 is completed
|
||||
'--xpack.reporting.enabled=false',
|
||||
|
||||
// disable fleet task that writes to metrics.fleet_server.* data streams, impacting functional tests
|
||||
`--xpack.task_manager.unsafe.exclude_task_types=${JSON.stringify(['Fleet-Metrics-Task'])}`,
|
||||
],
|
||||
},
|
||||
|
||||
|
|
|
@ -58,13 +58,13 @@ export const getAgentUsage = async (
|
|||
};
|
||||
};
|
||||
|
||||
export interface AgentPerVersion {
|
||||
version: string;
|
||||
count: number;
|
||||
}
|
||||
|
||||
export interface AgentData {
|
||||
agents_per_version: Array<
|
||||
{
|
||||
version: string;
|
||||
count: number;
|
||||
} & AgentStatus
|
||||
>;
|
||||
agents_per_version: Array<AgentPerVersion & AgentStatus>;
|
||||
agent_checkin_status: {
|
||||
error: number;
|
||||
degraded: number;
|
||||
|
|
|
@ -84,6 +84,7 @@ export const createAppContextStartContractMock = (
|
|||
config$,
|
||||
kibanaVersion: '8.99.0', // Fake version :)
|
||||
kibanaBranch: 'main',
|
||||
kibanaInstanceId: '1',
|
||||
telemetryEventsSender: createMockTelemetryEventsSender(),
|
||||
bulkActionsResolver: {} as any,
|
||||
messageSigningService: createMessageSigningServiceMock(),
|
||||
|
|
|
@ -130,6 +130,8 @@ import { FleetActionsClient, type FleetActionsClientInterface } from './services
|
|||
import type { FilesClientFactory } from './services/files/types';
|
||||
import { PolicyWatcher } from './services/agent_policy_watch';
|
||||
import { getPackageSpecTagId } from './services/epm/kibana/assets/tag_assets';
|
||||
import { FleetMetricsTask } from './services/metrics/fleet_metrics_task';
|
||||
import { fetchAgentMetrics } from './services/metrics/fetch_agent_metrics';
|
||||
|
||||
export interface FleetSetupDeps {
|
||||
security: SecurityPluginSetup;
|
||||
|
@ -167,6 +169,7 @@ export interface FleetAppContext {
|
|||
isProductionMode: PluginInitializerContext['env']['mode']['prod'];
|
||||
kibanaVersion: PluginInitializerContext['env']['packageInfo']['version'];
|
||||
kibanaBranch: PluginInitializerContext['env']['packageInfo']['branch'];
|
||||
kibanaInstanceId: PluginInitializerContext['env']['instanceUuid'];
|
||||
cloud?: CloudSetup;
|
||||
logger?: Logger;
|
||||
httpSetup?: HttpServiceSetup;
|
||||
|
@ -251,6 +254,7 @@ export class FleetPlugin
|
|||
private isProductionMode: FleetAppContext['isProductionMode'];
|
||||
private kibanaVersion: FleetAppContext['kibanaVersion'];
|
||||
private kibanaBranch: FleetAppContext['kibanaBranch'];
|
||||
private kibanaInstanceId: FleetAppContext['kibanaInstanceId'];
|
||||
private httpSetup?: HttpServiceSetup;
|
||||
private securitySetup!: SecurityPluginSetup;
|
||||
private encryptedSavedObjectsSetup?: EncryptedSavedObjectsPluginSetup;
|
||||
|
@ -259,6 +263,7 @@ export class FleetPlugin
|
|||
private bulkActionsResolver?: BulkActionsResolver;
|
||||
private fleetUsageSender?: FleetUsageSender;
|
||||
private checkDeletedFilesTask?: CheckDeletedFilesTask;
|
||||
private fleetMetricsTask?: FleetMetricsTask;
|
||||
|
||||
private agentService?: AgentService;
|
||||
private packageService?: PackageService;
|
||||
|
@ -270,6 +275,7 @@ export class FleetPlugin
|
|||
this.isProductionMode = this.initializerContext.env.mode.prod;
|
||||
this.kibanaVersion = this.initializerContext.env.packageInfo.version;
|
||||
this.kibanaBranch = this.initializerContext.env.packageInfo.branch;
|
||||
this.kibanaInstanceId = this.initializerContext.env.instanceUuid;
|
||||
this.logger = this.initializerContext.logger.get();
|
||||
this.configInitialValue = this.initializerContext.config.get();
|
||||
this.telemetryEventsSender = new TelemetryEventsSender(this.logger.get('telemetry_events'));
|
||||
|
@ -440,6 +446,10 @@ export class FleetPlugin
|
|||
this.fleetUsageSender = new FleetUsageSender(deps.taskManager, core, fetch);
|
||||
registerFleetUsageLogger(deps.taskManager, async () => fetchAgentsUsage(core, config));
|
||||
|
||||
const fetchAgents = async (abortController: AbortController) =>
|
||||
await fetchAgentMetrics(core, abortController);
|
||||
this.fleetMetricsTask = new FleetMetricsTask(deps.taskManager, fetchAgents);
|
||||
|
||||
const router: FleetRouter = core.http.createRouter<FleetRequestHandlerContext>();
|
||||
// Allow read-only users access to endpoints necessary for Integrations UI
|
||||
// Only some endpoints require superuser so we pass a raw IRouter here
|
||||
|
@ -490,6 +500,7 @@ export class FleetPlugin
|
|||
isProductionMode: this.isProductionMode,
|
||||
kibanaVersion: this.kibanaVersion,
|
||||
kibanaBranch: this.kibanaBranch,
|
||||
kibanaInstanceId: this.kibanaInstanceId,
|
||||
httpSetup: this.httpSetup,
|
||||
cloud: this.cloud,
|
||||
logger: this.logger,
|
||||
|
@ -504,6 +515,7 @@ export class FleetPlugin
|
|||
this.fleetUsageSender?.start(plugins.taskManager);
|
||||
this.checkDeletedFilesTask?.start({ taskManager: plugins.taskManager });
|
||||
startFleetUsageLogger(plugins.taskManager);
|
||||
this.fleetMetricsTask?.start(plugins.taskManager, core.elasticsearch.client.asInternalUser);
|
||||
|
||||
const logger = appContextService.getLogger();
|
||||
|
||||
|
|
|
@ -62,6 +62,7 @@ class AppContextService {
|
|||
private isProductionMode: FleetAppContext['isProductionMode'] = false;
|
||||
private kibanaVersion: FleetAppContext['kibanaVersion'] = kibanaPackageJson.version;
|
||||
private kibanaBranch: FleetAppContext['kibanaBranch'] = kibanaPackageJson.branch;
|
||||
private kibanaInstanceId: FleetAppContext['kibanaInstanceId'] = '';
|
||||
private cloud?: CloudSetup;
|
||||
private logger: Logger | undefined;
|
||||
private httpSetup?: HttpServiceSetup;
|
||||
|
@ -86,6 +87,7 @@ class AppContextService {
|
|||
this.logger = appContext.logger;
|
||||
this.kibanaVersion = appContext.kibanaVersion;
|
||||
this.kibanaBranch = appContext.kibanaBranch;
|
||||
this.kibanaInstanceId = appContext.kibanaInstanceId;
|
||||
this.httpSetup = appContext.httpSetup;
|
||||
this.telemetryEventsSender = appContext.telemetryEventsSender;
|
||||
this.savedObjectsTagging = appContext.savedObjectsTagging;
|
||||
|
@ -209,6 +211,10 @@ class AppContextService {
|
|||
return this.kibanaBranch;
|
||||
}
|
||||
|
||||
public getKibanaInstanceId() {
|
||||
return this.kibanaInstanceId;
|
||||
}
|
||||
|
||||
public addExternalCallback(type: ExternalCallback[0], callback: ExternalCallback[1]) {
|
||||
if (!this.externalCallbacks.has(type)) {
|
||||
this.externalCallbacks.set(type, new Set());
|
||||
|
|
|
@ -22,12 +22,8 @@ export function getFilteredSearchPackages() {
|
|||
}
|
||||
|
||||
export function getFilteredInstallPackages() {
|
||||
const shouldFilterFleetServer = appContextService.getConfig()?.internal?.fleetServerStandalone;
|
||||
const filtered: string[] = [];
|
||||
// Do not allow to install Fleet server integration if configured to use standalone fleet server
|
||||
if (shouldFilterFleetServer) {
|
||||
filtered.push(FLEET_SERVER_PACKAGE);
|
||||
}
|
||||
|
||||
const excludePackages = appContextService.getConfig()?.internal?.registry?.excludePackages ?? [];
|
||||
|
||||
return filtered.concat(excludePackages);
|
||||
|
|
|
@ -332,7 +332,7 @@ describe('install', () => {
|
|||
expect(response.status).toEqual('already_installed');
|
||||
});
|
||||
|
||||
it('should not allow to install fleet_server if internal.fleetServerStandalone is configured', async () => {
|
||||
it('should allow to install fleet_server if internal.fleetServerStandalone is configured', async () => {
|
||||
jest.mocked(appContextService.getConfig).mockReturnValueOnce({
|
||||
internal: {
|
||||
fleetServerStandalone: true,
|
||||
|
@ -347,8 +347,7 @@ describe('install', () => {
|
|||
esClient: {} as ElasticsearchClient,
|
||||
});
|
||||
|
||||
expect(response.error).toBeDefined();
|
||||
expect(response.error?.message).toMatch(/fleet_server installation is not authorized/);
|
||||
expect(response.status).toEqual('installed');
|
||||
});
|
||||
});
|
||||
|
||||
|
|
|
@ -0,0 +1,92 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import type { ElasticsearchClientMock } from '@kbn/core/server/mocks';
|
||||
import { coreMock } from '@kbn/core/server/mocks';
|
||||
import type { CoreSetup } from '@kbn/core/server';
|
||||
|
||||
import { fetchAgentMetrics } from './fetch_agent_metrics';
|
||||
|
||||
jest.mock('../../collectors/agent_collectors', () => {
|
||||
return {
|
||||
getAgentUsage: jest.fn().mockResolvedValue({}),
|
||||
};
|
||||
});
|
||||
|
||||
describe('fetchAgentMetrics', () => {
|
||||
const { createSetup: coreSetupMock } = coreMock;
|
||||
const abortController = new AbortController();
|
||||
let mockCore: CoreSetup;
|
||||
let esClient: ElasticsearchClientMock;
|
||||
|
||||
beforeEach(async () => {
|
||||
mockCore = coreSetupMock();
|
||||
const [{ elasticsearch }] = await mockCore.getStartServices();
|
||||
esClient = elasticsearch.client.asInternalUser as ElasticsearchClientMock;
|
||||
});
|
||||
|
||||
it('should fetch agent metrics', async () => {
|
||||
esClient.search.mockResolvedValue({
|
||||
took: 5,
|
||||
timed_out: false,
|
||||
_shards: {
|
||||
total: 1,
|
||||
successful: 1,
|
||||
skipped: 0,
|
||||
failed: 0,
|
||||
},
|
||||
hits: {
|
||||
total: {
|
||||
value: 0,
|
||||
relation: 'eq',
|
||||
},
|
||||
hits: [],
|
||||
},
|
||||
aggregations: {
|
||||
versions: {
|
||||
buckets: [
|
||||
{
|
||||
key: '8.12.0',
|
||||
doc_count: 1,
|
||||
},
|
||||
],
|
||||
},
|
||||
upgrade_details: {
|
||||
buckets: [
|
||||
{
|
||||
key: 'UPG_REQUESTED',
|
||||
doc_count: 1,
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const result = await fetchAgentMetrics(mockCore, abortController);
|
||||
|
||||
expect(result).toEqual({
|
||||
agents: {},
|
||||
agents_per_version: [
|
||||
{
|
||||
version: '8.12.0',
|
||||
count: 1,
|
||||
},
|
||||
],
|
||||
upgrading_step: {
|
||||
downloading: 0,
|
||||
extracting: 0,
|
||||
failed: 0,
|
||||
replacing: 0,
|
||||
requested: 1,
|
||||
restarting: 0,
|
||||
rollback: 0,
|
||||
scheduled: 0,
|
||||
watching: 0,
|
||||
},
|
||||
});
|
||||
});
|
||||
});
|
|
@ -0,0 +1,180 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
|
||||
import type { CoreSetup } from '@kbn/core/server';
|
||||
|
||||
import { AGENTS_INDEX } from '../../../common';
|
||||
|
||||
import type { AgentPerVersion, AgentUsage } from '../../collectors/agent_collectors';
|
||||
import { getAgentUsage } from '../../collectors/agent_collectors';
|
||||
import { getInternalClients } from '../../collectors/helpers';
|
||||
import { appContextService } from '../app_context';
|
||||
import { retryTransientEsErrors } from '../epm/elasticsearch/retry';
|
||||
|
||||
export interface AgentMetrics {
|
||||
agents: AgentUsage;
|
||||
agents_per_version: AgentPerVersion[];
|
||||
upgrading_step: UpgradingSteps;
|
||||
}
|
||||
|
||||
export interface UpgradingSteps {
|
||||
requested: number;
|
||||
scheduled: number;
|
||||
downloading: number;
|
||||
extracting: number;
|
||||
replacing: number;
|
||||
restarting: number;
|
||||
watching: number;
|
||||
rollback: number;
|
||||
failed: number;
|
||||
}
|
||||
|
||||
export const fetchAgentMetrics = async (
|
||||
core: CoreSetup,
|
||||
abortController: AbortController
|
||||
): Promise<AgentMetrics | undefined> => {
|
||||
const [soClient, esClient] = await getInternalClients(core);
|
||||
if (!soClient || !esClient) {
|
||||
return;
|
||||
}
|
||||
const usage = {
|
||||
agents: await getAgentUsage(soClient, esClient),
|
||||
agents_per_version: await getAgentsPerVersion(esClient, abortController),
|
||||
upgrading_step: await getUpgradingSteps(esClient, abortController),
|
||||
};
|
||||
return usage;
|
||||
};
|
||||
|
||||
export const getAgentsPerVersion = async (
|
||||
esClient: ElasticsearchClient,
|
||||
abortController: AbortController
|
||||
): Promise<AgentPerVersion[]> => {
|
||||
try {
|
||||
const response = await retryTransientEsErrors(() =>
|
||||
esClient.search(
|
||||
{
|
||||
index: AGENTS_INDEX,
|
||||
query: {
|
||||
bool: {
|
||||
filter: [
|
||||
{
|
||||
term: {
|
||||
active: 'true',
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
size: 0,
|
||||
aggs: {
|
||||
versions: {
|
||||
terms: { field: 'agent.version' },
|
||||
},
|
||||
},
|
||||
},
|
||||
{ signal: abortController.signal }
|
||||
)
|
||||
);
|
||||
return ((response?.aggregations?.versions as any).buckets ?? []).map((bucket: any) => ({
|
||||
version: bucket.key,
|
||||
count: bucket.doc_count,
|
||||
}));
|
||||
} catch (error) {
|
||||
if (error.statusCode === 404) {
|
||||
appContextService.getLogger().debug('Index .fleet-agents does not exist yet.');
|
||||
} else {
|
||||
throw error;
|
||||
}
|
||||
return [];
|
||||
}
|
||||
};
|
||||
|
||||
export const getUpgradingSteps = async (
|
||||
esClient: ElasticsearchClient,
|
||||
abortController: AbortController
|
||||
): Promise<UpgradingSteps> => {
|
||||
const upgradingSteps = {
|
||||
requested: 0,
|
||||
scheduled: 0,
|
||||
downloading: 0,
|
||||
extracting: 0,
|
||||
replacing: 0,
|
||||
restarting: 0,
|
||||
watching: 0,
|
||||
rollback: 0,
|
||||
failed: 0,
|
||||
};
|
||||
try {
|
||||
const response = await retryTransientEsErrors(() =>
|
||||
esClient.search(
|
||||
{
|
||||
index: AGENTS_INDEX,
|
||||
query: {
|
||||
bool: {
|
||||
filter: [
|
||||
{
|
||||
term: {
|
||||
active: 'true',
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
size: 0,
|
||||
aggs: {
|
||||
upgrade_details: {
|
||||
terms: { field: 'upgrade_details.state' },
|
||||
},
|
||||
},
|
||||
},
|
||||
{ signal: abortController.signal }
|
||||
)
|
||||
);
|
||||
((response?.aggregations?.upgrade_details as any).buckets ?? []).forEach((bucket: any) => {
|
||||
switch (bucket.key) {
|
||||
case 'UPG_REQUESTED':
|
||||
upgradingSteps.requested = bucket.doc_count;
|
||||
break;
|
||||
case 'UPG_SCHEDULED':
|
||||
upgradingSteps.scheduled = bucket.doc_count;
|
||||
break;
|
||||
case 'UPG_DOWNLOADING':
|
||||
upgradingSteps.downloading = bucket.doc_count;
|
||||
break;
|
||||
case 'UPG_EXTRACTING':
|
||||
upgradingSteps.extracting = bucket.doc_count;
|
||||
break;
|
||||
case 'UPG_REPLACING':
|
||||
upgradingSteps.replacing = bucket.doc_count;
|
||||
break;
|
||||
case 'UPG_RESTARTING':
|
||||
upgradingSteps.restarting = bucket.doc_count;
|
||||
break;
|
||||
case 'UPG_WATCHING':
|
||||
upgradingSteps.watching = bucket.doc_count;
|
||||
break;
|
||||
case 'UPG_ROLLBACK':
|
||||
upgradingSteps.rollback = bucket.doc_count;
|
||||
break;
|
||||
case 'UPG_FAILED':
|
||||
upgradingSteps.failed = bucket.doc_count;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
});
|
||||
return upgradingSteps;
|
||||
} catch (error) {
|
||||
if (error.statusCode === 404) {
|
||||
appContextService.getLogger().debug('Index .fleet-agents does not exist yet.');
|
||||
} else {
|
||||
throw error;
|
||||
}
|
||||
return upgradingSteps;
|
||||
}
|
||||
};
|
|
@ -0,0 +1,218 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { coreMock } from '@kbn/core/server/mocks';
|
||||
import { taskManagerMock } from '@kbn/task-manager-plugin/server/mocks';
|
||||
import type { TaskManagerSetupContract } from '@kbn/task-manager-plugin/server';
|
||||
import { TaskStatus } from '@kbn/task-manager-plugin/server';
|
||||
import type { CoreSetup } from '@kbn/core/server';
|
||||
import type { ElasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
|
||||
|
||||
import { appContextService } from '../app_context';
|
||||
import { createAppContextStartContractMock } from '../../mocks';
|
||||
|
||||
import { FleetMetricsTask, TYPE, VERSION } from './fleet_metrics_task';
|
||||
|
||||
const MOCK_TASK_INSTANCE = {
|
||||
id: `${TYPE}:${VERSION}`,
|
||||
runAt: new Date(),
|
||||
attempts: 0,
|
||||
ownerId: '',
|
||||
status: TaskStatus.Running,
|
||||
startedAt: new Date(),
|
||||
scheduledAt: new Date(),
|
||||
retryAt: new Date(),
|
||||
params: {},
|
||||
state: {},
|
||||
taskType: TYPE,
|
||||
};
|
||||
|
||||
describe('fleet metrics task', () => {
|
||||
const { createSetup: coreSetupMock } = coreMock;
|
||||
const { createSetup: tmSetupMock, createStart: tmStartMock } = taskManagerMock;
|
||||
|
||||
let mockContract: ReturnType<typeof createAppContextStartContractMock>;
|
||||
let mockTask: FleetMetricsTask;
|
||||
let mockCore: CoreSetup;
|
||||
let mockTaskManagerSetup: jest.Mocked<TaskManagerSetupContract>;
|
||||
let mockFetchAgentMetrics: jest.Mock;
|
||||
|
||||
let esClient: ElasticsearchClientMock;
|
||||
beforeEach(async () => {
|
||||
mockContract = createAppContextStartContractMock();
|
||||
appContextService.start(mockContract);
|
||||
mockCore = coreSetupMock();
|
||||
mockTaskManagerSetup = tmSetupMock();
|
||||
const [{ elasticsearch }] = await mockCore.getStartServices();
|
||||
esClient = elasticsearch.client.asInternalUser as ElasticsearchClientMock;
|
||||
mockFetchAgentMetrics = jest.fn();
|
||||
mockTask = new FleetMetricsTask(mockTaskManagerSetup, async () => mockFetchAgentMetrics());
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
jest.clearAllMocks();
|
||||
});
|
||||
|
||||
describe('task lifecycle', () => {
|
||||
it('should create task', () => {
|
||||
expect(mockTask).toBeInstanceOf(FleetMetricsTask);
|
||||
});
|
||||
|
||||
it('should register task', () => {
|
||||
expect(mockTaskManagerSetup.registerTaskDefinitions).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should schedule task', async () => {
|
||||
const mockTaskManagerStart = tmStartMock();
|
||||
await mockTask.start(mockTaskManagerStart, esClient);
|
||||
expect(mockTaskManagerStart.ensureScheduled).toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe('task logic', () => {
|
||||
beforeEach(async () => {
|
||||
esClient.info.mockResolvedValue({
|
||||
cluster_uuid: 'cluster1',
|
||||
} as any);
|
||||
|
||||
mockFetchAgentMetrics.mockResolvedValue({
|
||||
agents: {
|
||||
total_all_statuses: 10,
|
||||
total_enrolled: 5,
|
||||
unenrolled: 5,
|
||||
healthy: 1,
|
||||
offline: 1,
|
||||
updating: 1,
|
||||
unhealthy: 1,
|
||||
inactive: 1,
|
||||
},
|
||||
upgrading_step: {
|
||||
scheduled: 1,
|
||||
requested: 1,
|
||||
},
|
||||
agents_per_version: [
|
||||
{
|
||||
version: '8.12.0',
|
||||
count: 3,
|
||||
},
|
||||
{
|
||||
version: '8.11.0',
|
||||
count: 2,
|
||||
},
|
||||
],
|
||||
});
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
jest.resetAllMocks();
|
||||
});
|
||||
|
||||
const runTask = async (taskInstance = MOCK_TASK_INSTANCE) => {
|
||||
const mockTaskManagerStart = tmStartMock();
|
||||
await mockTask.start(mockTaskManagerStart, esClient);
|
||||
const createTaskRunner =
|
||||
mockTaskManagerSetup.registerTaskDefinitions.mock.calls[0][0][TYPE].createTaskRunner;
|
||||
const taskRunner = createTaskRunner({ taskInstance });
|
||||
return taskRunner.run();
|
||||
};
|
||||
|
||||
it('should publish agent metrics', async () => {
|
||||
await runTask();
|
||||
|
||||
expect(esClient.index).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
index: 'metrics-fleet_server.agent_status-default',
|
||||
body: expect.objectContaining({
|
||||
'@timestamp': expect.any(String),
|
||||
data_stream: {
|
||||
dataset: 'fleet_server.agent_status',
|
||||
type: 'metrics',
|
||||
namespace: 'default',
|
||||
},
|
||||
cluster: { id: 'cluster1' },
|
||||
agent: { id: '1', version: '8.99.0', type: 'kibana' },
|
||||
fleet: {
|
||||
agents: {
|
||||
total: 10,
|
||||
enrolled: 5,
|
||||
unenrolled: 5,
|
||||
healthy: 1,
|
||||
offline: 1,
|
||||
updating: 1,
|
||||
unhealthy: 1,
|
||||
inactive: 1,
|
||||
upgrading_step: {
|
||||
scheduled: 1,
|
||||
requested: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
}),
|
||||
})
|
||||
);
|
||||
|
||||
expect(esClient.bulk).toHaveBeenCalledWith({
|
||||
index: 'metrics-fleet_server.agent_versions-default',
|
||||
operations: [
|
||||
{ create: {} },
|
||||
{
|
||||
'@timestamp': expect.any(String),
|
||||
agent: { id: '1', type: 'kibana', version: '8.99.0' },
|
||||
cluster: { id: 'cluster1' },
|
||||
data_stream: {
|
||||
dataset: 'fleet_server.agent_versions',
|
||||
namespace: 'default',
|
||||
type: 'metrics',
|
||||
},
|
||||
fleet: { agent: { count: 3, version: '8.12.0' } },
|
||||
},
|
||||
{ create: {} },
|
||||
{
|
||||
'@timestamp': expect.any(String),
|
||||
agent: { id: '1', type: 'kibana', version: '8.99.0' },
|
||||
cluster: { id: 'cluster1' },
|
||||
data_stream: {
|
||||
dataset: 'fleet_server.agent_versions',
|
||||
namespace: 'default',
|
||||
type: 'metrics',
|
||||
},
|
||||
fleet: { agent: { count: 2, version: '8.11.0' } },
|
||||
},
|
||||
],
|
||||
refresh: true,
|
||||
});
|
||||
});
|
||||
|
||||
it('should log errors from bulk create', async () => {
|
||||
esClient.bulk.mockResolvedValue({
|
||||
errors: true,
|
||||
items: [
|
||||
{
|
||||
create: {
|
||||
error: {
|
||||
reason: 'error from es',
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
create: {
|
||||
error: {
|
||||
reason: 'error from es',
|
||||
},
|
||||
},
|
||||
},
|
||||
],
|
||||
} as any);
|
||||
|
||||
await runTask();
|
||||
|
||||
expect(appContextService.getLogger().warn).toHaveBeenCalledWith(
|
||||
'Error occurred while publishing Fleet metrics: Error: error from es'
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
|
@ -0,0 +1,204 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
import type {
|
||||
ConcreteTaskInstance,
|
||||
TaskManagerStartContract,
|
||||
TaskManagerSetupContract,
|
||||
} from '@kbn/task-manager-plugin/server';
|
||||
import { throwUnrecoverableError } from '@kbn/task-manager-plugin/server';
|
||||
import type { ElasticsearchClient } from '@kbn/core/server';
|
||||
import { withSpan } from '@kbn/apm-utils';
|
||||
|
||||
import { uniq } from 'lodash';
|
||||
|
||||
import { appContextService } from '../app_context';
|
||||
|
||||
import type { AgentMetrics } from './fetch_agent_metrics';
|
||||
|
||||
export const TYPE = 'Fleet-Metrics-Task';
|
||||
export const VERSION = '1.0.0';
|
||||
const TITLE = 'Fleet Metrics Task';
|
||||
const TIMEOUT = '1m';
|
||||
const SCOPE = ['fleet'];
|
||||
const INTERVAL = '1m';
|
||||
|
||||
export class FleetMetricsTask {
|
||||
private taskManager?: TaskManagerStartContract;
|
||||
private wasStarted: boolean = false;
|
||||
private abortController = new AbortController();
|
||||
private esClient?: ElasticsearchClient;
|
||||
|
||||
constructor(
|
||||
taskManager: TaskManagerSetupContract,
|
||||
fetchAgentMetrics: (abortController: AbortController) => Promise<AgentMetrics | undefined>
|
||||
) {
|
||||
taskManager.registerTaskDefinitions({
|
||||
[TYPE]: {
|
||||
title: TITLE,
|
||||
timeout: TIMEOUT,
|
||||
maxAttempts: 1,
|
||||
createTaskRunner: ({ taskInstance }: { taskInstance: ConcreteTaskInstance }) => {
|
||||
return {
|
||||
run: async () => {
|
||||
return withSpan({ name: TYPE, type: 'metrics' }, () =>
|
||||
this.runTask(taskInstance, () => fetchAgentMetrics(this.abortController))
|
||||
);
|
||||
},
|
||||
|
||||
cancel: async () => {
|
||||
this.abortController.abort('task timed out');
|
||||
},
|
||||
};
|
||||
},
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
private runTask = async (
|
||||
taskInstance: ConcreteTaskInstance,
|
||||
fetchAgentMetrics: () => Promise<AgentMetrics | undefined>
|
||||
) => {
|
||||
if (!this.wasStarted) {
|
||||
appContextService.getLogger().debug('[runTask()] Aborted. Task not started yet');
|
||||
return;
|
||||
}
|
||||
// Check that this task is current
|
||||
if (taskInstance.id !== this.taskId) {
|
||||
throwUnrecoverableError(new Error('Outdated task version for task: ' + taskInstance.id));
|
||||
return;
|
||||
}
|
||||
if (!this.esClient) {
|
||||
appContextService.getLogger().info('esClient not set, skipping Fleet metrics task');
|
||||
return;
|
||||
}
|
||||
appContextService.getLogger().info('Running Fleet metrics task');
|
||||
|
||||
try {
|
||||
const agentMetrics = await fetchAgentMetrics();
|
||||
if (!agentMetrics) {
|
||||
return;
|
||||
}
|
||||
const { agents_per_version: agentsPerVersion, agents } = agentMetrics;
|
||||
const clusterInfo = await this.esClient.info();
|
||||
const getCommonFields = (dataset: string) => {
|
||||
return {
|
||||
data_stream: {
|
||||
dataset,
|
||||
type: 'metrics',
|
||||
namespace: 'default',
|
||||
},
|
||||
agent: {
|
||||
id: appContextService.getKibanaInstanceId(),
|
||||
version: appContextService.getKibanaVersion(),
|
||||
type: 'kibana',
|
||||
},
|
||||
cluster: {
|
||||
id: clusterInfo?.cluster_uuid ?? '',
|
||||
},
|
||||
};
|
||||
};
|
||||
const agentStatusDoc = {
|
||||
'@timestamp': new Date().toISOString(),
|
||||
...getCommonFields('fleet_server.agent_status'),
|
||||
fleet: {
|
||||
agents: {
|
||||
total: agents.total_all_statuses,
|
||||
enrolled: agents.total_enrolled,
|
||||
unenrolled: agents.unenrolled,
|
||||
healthy: agents.healthy,
|
||||
offline: agents.offline,
|
||||
updating: agents.updating,
|
||||
unhealthy: agents.unhealthy,
|
||||
inactive: agents.inactive,
|
||||
upgrading_step: agentMetrics.upgrading_step,
|
||||
},
|
||||
},
|
||||
};
|
||||
appContextService
|
||||
.getLogger()
|
||||
.trace('Agent status metrics: ' + JSON.stringify(agentStatusDoc));
|
||||
await this.esClient.index({
|
||||
index: 'metrics-fleet_server.agent_status-default',
|
||||
body: agentStatusDoc,
|
||||
refresh: true,
|
||||
});
|
||||
|
||||
if (agentsPerVersion.length === 0) return;
|
||||
|
||||
const operations = [];
|
||||
|
||||
for (const byVersion of agentsPerVersion) {
|
||||
const agentVersionsDoc = {
|
||||
'@timestamp': new Date().toISOString(),
|
||||
...getCommonFields('fleet_server.agent_versions'),
|
||||
fleet: {
|
||||
agent: {
|
||||
version: byVersion.version,
|
||||
count: byVersion.count,
|
||||
},
|
||||
},
|
||||
};
|
||||
operations.push(
|
||||
{
|
||||
create: {},
|
||||
},
|
||||
agentVersionsDoc
|
||||
);
|
||||
}
|
||||
|
||||
appContextService.getLogger().trace('Agent versions metrics: ' + JSON.stringify(operations));
|
||||
const resp = await this.esClient.bulk({
|
||||
operations,
|
||||
refresh: true,
|
||||
index: 'metrics-fleet_server.agent_versions-default',
|
||||
});
|
||||
if (resp.errors) {
|
||||
const errors = uniq(
|
||||
resp.items
|
||||
.filter((item) => !!item.create?.error)
|
||||
.map((item) => item.create?.error?.reason ?? '')
|
||||
);
|
||||
throw new Error(errors.join(', '));
|
||||
}
|
||||
} catch (error) {
|
||||
appContextService.getLogger().warn('Error occurred while publishing Fleet metrics: ' + error);
|
||||
}
|
||||
};
|
||||
|
||||
private get taskId() {
|
||||
return `${TYPE}:${VERSION}`;
|
||||
}
|
||||
|
||||
public async start(taskManager: TaskManagerStartContract, esClient: ElasticsearchClient) {
|
||||
this.taskManager = taskManager;
|
||||
this.esClient = esClient;
|
||||
|
||||
if (!taskManager) {
|
||||
appContextService.getLogger().error('missing required service during start');
|
||||
return;
|
||||
}
|
||||
|
||||
this.wasStarted = true;
|
||||
|
||||
try {
|
||||
appContextService.getLogger().info(`Task ${this.taskId} scheduled with interval 1h`);
|
||||
|
||||
await this.taskManager.ensureScheduled({
|
||||
id: this.taskId,
|
||||
taskType: TYPE,
|
||||
schedule: {
|
||||
interval: INTERVAL,
|
||||
},
|
||||
scope: SCOPE,
|
||||
state: {},
|
||||
params: {},
|
||||
});
|
||||
} catch (e) {
|
||||
appContextService.getLogger().error(`Error scheduling task, received error: ${e}`);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -51,6 +51,8 @@ export default async function ({ readConfigFile }) {
|
|||
'--xpack.discoverEnhanced.actions.exploreDataInContextMenu.enabled=true',
|
||||
'--savedObjects.maxImportPayloadBytes=10485760', // for OSS test management/_import_objects,
|
||||
'--savedObjects.allowHttpApiAccess=false', // override default to not allow hiddenFromHttpApis saved objects access to the http APIs see https://github.com/elastic/dev/issues/2200
|
||||
// disable fleet task that writes to metrics.fleet_server.* data streams, impacting functional tests
|
||||
`--xpack.task_manager.unsafe.exclude_task_types=${JSON.stringify(['Fleet-Metrics-Task'])}`,
|
||||
],
|
||||
},
|
||||
uiSettings: {
|
||||
|
|
|
@ -45,6 +45,7 @@ export default function ({ getService }: FtrProviderContext) {
|
|||
.filter((t: string) => !TEST_TYPES.includes(t))
|
||||
.sort();
|
||||
expect(types).to.eql([
|
||||
'Fleet-Metrics-Task',
|
||||
'Fleet-Usage-Logger',
|
||||
'Fleet-Usage-Sender',
|
||||
'ML:saved-objects-sync',
|
||||
|
|
|
@ -20,7 +20,8 @@ export default ({ getPageObject, getService }: FtrProviderContext) => {
|
|||
const cases = getService('cases');
|
||||
const find = getService('find');
|
||||
|
||||
describe('Cases persistable attachments', function () {
|
||||
// failing test https://github.com/elastic/kibana/issues/166592
|
||||
describe.skip('Cases persistable attachments', function () {
|
||||
// security_exception: action [indices:data/write/delete/byquery] is unauthorized for user [elastic] with effective roles [superuser] on restricted indices [.kibana_alerting_cases], this action is granted by the index privileges [delete,write,all]
|
||||
this.tags(['failsOnMKI']);
|
||||
describe('lens visualization', () => {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue