[8.x] [Fleet] Use metering API in serverless (#200063) (#200663)

# Backport

This will backport the following commits from `main` to `8.x`:
- [[Fleet] Use metering API in serverless
(#200063)](https://github.com/elastic/kibana/pull/200063)

<!--- Backport version: 8.9.8 -->

### Questions ?
Please refer to the [Backport tool
documentation](https://github.com/sqren/backport)

<!--BACKPORT [{"author":{"name":"Nicolas
Chaulet","email":"nicolas.chaulet@elastic.co"},"sourceCommit":{"committedDate":"2024-11-18T21:36:27Z","message":"[Fleet]
Use metering API in serverless
(#200063)","sha":"1fd3f412e13ed234524915cc87d058f05b840528","branchLabelMapping":{"^v9.0.0$":"main","^v8.17.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["bug","release_note:skip","Team:Fleet","v9.0.0","backport:prev-minor"],"number":200063,"url":"https://github.com/elastic/kibana/pull/200063","mergeCommit":{"message":"[Fleet]
Use metering API in serverless
(#200063)","sha":"1fd3f412e13ed234524915cc87d058f05b840528"}},"sourceBranch":"main","suggestedTargetBranches":[],"targetPullRequestStates":[{"branch":"main","label":"v9.0.0","labelRegex":"^v9.0.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/200063","number":200063,"mergeCommit":{"message":"[Fleet]
Use metering API in serverless
(#200063)","sha":"1fd3f412e13ed234524915cc87d058f05b840528"}}]}]
BACKPORT-->
This commit is contained in:
Nicolas Chaulet 2024-11-19 13:14:37 -05:00 committed by GitHub
parent 5c707377dc
commit 1a6efcc3d4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 193 additions and 9 deletions

View file

@ -7,6 +7,7 @@ xpack.fleet.internal.disableILMPolicies: true
xpack.fleet.internal.activeAgentsSoftLimit: 25000
xpack.fleet.internal.onlyAllowAgentUpgradeToKnownVersions: true
xpack.fleet.internal.retrySetupOnBoot: true
xpack.fleet.internal.useMeteringApi: true
## Fine-tune the feature privileges.
xpack.features.overrides:

View file

@ -65,6 +65,7 @@ export interface FleetConfigType {
disableBundledPackagesCache?: boolean;
};
internal?: {
useMeteringApi?: boolean;
disableILMPolicies: boolean;
fleetServerStandalone: boolean;
onlyAllowAgentUpgradeToKnownVersions: boolean;

View file

@ -203,6 +203,9 @@ export const config: PluginConfigDescriptor = {
internal: schema.maybe(
schema.object({
useMeteringApi: schema.boolean({
defaultValue: false,
}),
disableILMPolicies: schema.boolean({
defaultValue: false,
}),

View file

@ -4,6 +4,7 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { Dictionary } from 'lodash';
import { keyBy, keys, merge } from 'lodash';
import type { RequestHandler } from '@kbn/core/server';
import pMap from 'p-map';
@ -13,9 +14,13 @@ import { KibanaSavedObjectType } from '../../../common/types';
import type { GetDataStreamsResponse } from '../../../common/types';
import { getPackageSavedObjects } from '../../services/epm/packages/get';
import { defaultFleetErrorHandler } from '../../errors';
import type { MeteringStats } from '../../services/data_streams';
import { dataStreamService } from '../../services/data_streams';
import { getDataStreamsQueryMetadata } from './get_data_streams_query_metadata';
import type { IndicesDataStreamsStatsDataStreamsStatsItem } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { ByteSizeValue } from '@kbn/config-schema';
import { appContextService } from '../../services';
const MANAGED_BY = 'fleet';
const LEGACY_MANAGED_BY = 'ingest-manager';
@ -51,10 +56,22 @@ export const getListHandler: RequestHandler = async (context, request, response)
};
try {
const useMeteringApi = appContextService.getConfig()?.internal?.useMeteringApi;
// Get matching data streams, their stats, and package SOs
const [dataStreamsInfo, dataStreamStats, packageSavedObjects] = await Promise.all([
const [
dataStreamsInfo,
dataStreamStatsOrUndefined,
dataStreamMeteringStatsorUndefined,
packageSavedObjects,
] = await Promise.all([
dataStreamService.getAllFleetDataStreams(esClient),
dataStreamService.getAllFleetDataStreamsStats(esClient),
useMeteringApi
? undefined
: dataStreamService.getAllFleetDataStreamsStats(elasticsearch.client.asSecondaryAuthUser),
useMeteringApi
? dataStreamService.getAllFleetMeteringStats(elasticsearch.client.asSecondaryAuthUser)
: undefined,
getPackageSavedObjects(savedObjects.client),
]);
@ -67,13 +84,24 @@ export const getListHandler: RequestHandler = async (context, request, response)
const dataStreamsInfoByName = keyBy<ESDataStreamInfo>(filteredDataStreamsInfo, 'name');
const filteredDataStreamsStats = dataStreamStats.filter(
(dss) => !!dataStreamsInfoByName[dss.data_stream]
);
const dataStreamsStatsByName = keyBy(filteredDataStreamsStats, 'data_stream');
let dataStreamsStatsByName: Dictionary<IndicesDataStreamsStatsDataStreamsStatsItem> = {};
if (dataStreamStatsOrUndefined) {
const filteredDataStreamsStats = dataStreamStatsOrUndefined.filter(
(dss) => !!dataStreamsInfoByName[dss.data_stream]
);
dataStreamsStatsByName = keyBy(filteredDataStreamsStats, 'data_stream');
}
let dataStreamsMeteringStatsByName: Dictionary<MeteringStats> = {};
if (dataStreamMeteringStatsorUndefined) {
dataStreamsMeteringStatsByName = keyBy(dataStreamMeteringStatsorUndefined, 'name');
}
// Combine data stream info
const dataStreams = merge(dataStreamsInfoByName, dataStreamsStatsByName);
const dataStreams = merge(
dataStreamsInfoByName,
dataStreamsStatsByName,
dataStreamsMeteringStatsByName
);
const dataStreamNames = keys(dataStreams);
// Map package SOs
@ -132,10 +160,14 @@ export const getListHandler: RequestHandler = async (context, request, response)
package: dataStream._meta?.package?.name || '',
package_version: '',
last_activity_ms: dataStream.maximum_timestamp, // overridden below if maxIngestedTimestamp agg returns a result
size_in_bytes: dataStream.store_size_bytes,
size_in_bytes: dataStream.store_size_bytes || dataStream.size_in_bytes,
// `store_size` should be available from ES due to ?human=true flag
// but fallback to bytes just in case
size_in_bytes_formatted: dataStream.store_size || `${dataStream.store_size_bytes}b`,
size_in_bytes_formatted:
dataStream.store_size ||
new ByteSizeValue(
dataStream.store_size_bytes || dataStream.size_in_bytes || 0
).toString(),
dashboards: [],
serviceDetails: null,
};

View file

@ -10,6 +10,15 @@ import type { ElasticsearchClient } from '@kbn/core/server';
const DATA_STREAM_INDEX_PATTERN = 'logs-*-*,metrics-*-*,traces-*-*,synthetics-*-*,profiling-*';
export interface MeteringStatsResponse {
datastreams: MeteringStats[];
}
export interface MeteringStats {
name: string;
num_docs: number;
size_in_bytes: number;
}
class DataStreamService {
public async getAllFleetDataStreams(esClient: ElasticsearchClient) {
const { data_streams: dataStreamsInfo } = await esClient.indices.getDataStream({
@ -19,6 +28,18 @@ class DataStreamService {
return dataStreamsInfo;
}
public async getAllFleetMeteringStats(esClient: ElasticsearchClient) {
const res = await esClient.transport.request<MeteringStatsResponse>({
path: `/_metering/stats`,
method: 'GET',
querystring: {
human: true,
},
});
return res.datastreams ?? [];
}
public async getAllFleetDataStreamsStats(esClient: ElasticsearchClient) {
const { data_streams: dataStreamStats } = await esClient.indices.dataStreamsStats({
name: DATA_STREAM_INDEX_PATTERN,

View file

@ -17,6 +17,7 @@ export default function (ctx: FtrProviderContext) {
const svlCommonApi = ctx.getService('svlCommonApi');
const supertestWithoutAuth = ctx.getService('supertestWithoutAuth');
const svlUserManager = ctx.getService('svlUserManager');
const es = ctx.getService('es');
let roleAuthc: RoleCredentials;
describe('fleet', function () {
@ -112,5 +113,130 @@ export default function (ctx: FtrProviderContext) {
});
expect(status).toBe(200);
});
describe('datastreams API', () => {
before(async () => {
await es.index({
refresh: 'wait_for',
index: 'logs-nginx.access-default',
document: {
agent: {
name: 'docker-fleet-agent',
id: 'ef5e274d-4b53-45e6-943a-a5bcf1a6f523',
ephemeral_id: '34369a4a-4f24-4a39-9758-85fc2429d7e2',
type: 'filebeat',
version: '8.5.0',
},
nginx: {
access: {
remote_ip_list: ['127.0.0.1'],
},
},
log: {
file: {
path: '/tmp/service_logs/access.log',
},
offset: 0,
},
elastic_agent: {
id: 'ef5e274d-4b53-45e6-943a-a5bcf1a6f523',
version: '8.5.0',
snapshot: false,
},
source: {
address: '127.0.0.1',
ip: '127.0.0.1',
},
url: {
path: '/server-status',
original: '/server-status',
},
tags: ['nginx-access'],
input: {
type: 'log',
},
'@timestamp': new Date().toISOString(),
_tmp: {},
ecs: {
version: '8.11.0',
},
related: {
ip: ['127.0.0.1'],
},
data_stream: {
namespace: 'default',
type: 'logs',
dataset: 'nginx.access',
},
host: {
hostname: 'docker-fleet-agent',
os: {
kernel: '5.15.49-linuxkit',
codename: 'focal',
name: 'Ubuntu',
family: 'debian',
type: 'linux',
version: '20.04.5 LTS (Focal Fossa)',
platform: 'ubuntu',
},
containerized: false,
ip: ['172.18.0.7'],
name: 'docker-fleet-agent',
id: '66392b0697b84641af8006d87aeb89f1',
mac: ['02-42-AC-12-00-07'],
architecture: 'x86_64',
},
http: {
request: {
method: 'GET',
},
response: {
status_code: 200,
body: {
bytes: 97,
},
},
version: '1.1',
},
event: {
agent_id_status: 'verified',
ingested: '2022-12-09T10:39:40Z',
created: '2022-12-09T10:39:38.896Z',
kind: 'event',
timezone: '+00:00',
category: ['web'],
type: ['access'],
dataset: 'nginx.access',
outcome: 'success',
},
user_agent: {
original: 'curl/7.64.0',
name: 'curl',
device: {
name: 'Other',
},
version: '7.64.0',
},
},
});
});
after(async () => {
await es.transport.request({
path: `/_data_stream/logs-nginx.access-default`,
method: 'delete',
});
});
it('it works', async () => {
const { body, status } = await supertestWithoutAuth
.get('/api/fleet/data_streams')
.set(svlCommonApi.getInternalRequestHeader())
.set(roleAuthc.apiKeyHeader);
expect(status).toBe(200);
expect(body.data_streams?.[0]?.index).toBe('logs-nginx.access-default');
});
});
});
}