[streams] lifecycle - ingestion and total docs metadata (#210301)

Adds avg ingestion per day, total doc count and ingestion rate graph to
the lifecycle view.

We use the dataset quality plugin to compute these values. I've added a
query string to optionally retrieve the creation date of a data stream
in the `data_streams/stats` endpoint.

![Screenshot 2025-02-11 at 17 39
13](https://github.com/user-attachments/assets/9242ecbc-ebee-43da-b742-fbc0d0997bc2)

-----

@elastic/obs-ux-logs-team the change in dataset quality involves the
optional retrieval of the data streams creation date in the `/stats`
endpoint. There are other ways in dataset quality to get these
informations but they rely on queries to compute the data. In our case
these queries will always be unbounded and using the `/stats` would be
more efficient as it relies on cluster state.

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Kevin Lacabane 2025-02-17 13:35:20 +01:00 committed by GitHub
parent 0adce7a3db
commit 95b3f6e14d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 493 additions and 41 deletions

View file

@ -32,6 +32,7 @@ export const dataStreamStatRt = rt.intersection([
lastActivity: rt.number,
integration: rt.string,
totalDocs: rt.number,
creationDate: rt.number,
}),
]);

View file

@ -9,8 +9,12 @@ import type { PluginInitializerContext } from '@kbn/core/public';
import { DatasetQualityConfig } from '../common/plugin_config';
import { DatasetQualityPlugin } from './plugin';
export type { DataStreamStatServiceResponse } from '../common/data_streams_stats';
export type { DatasetQualityPluginSetup, DatasetQualityPluginStart } from './types';
export { DataStreamsStatsService } from './services/data_streams_stats/data_streams_stats_service';
export type { IDataStreamsStatsClient } from './services/data_streams_stats/types';
export function plugin(context: PluginInitializerContext<DatasetQualityConfig>) {
return new DatasetQualityPlugin(context);
}

View file

@ -41,12 +41,15 @@ export class DataStreamsStatsClient implements IDataStreamsStatsClient {
public async getDataStreamsStats(
params: GetDataStreamsStatsQuery
): Promise<DataStreamStatServiceResponse> {
const types = params.types.length === 0 ? KNOWN_TYPES : params.types;
const types =
'types' in params
? rison.encodeArray(params.types.length === 0 ? KNOWN_TYPES : params.types)
: undefined;
const response = await this.http
.get<GetDataStreamsStatsResponse>('/internal/dataset_quality/data_streams/stats', {
query: {
...params,
types: rison.encodeArray(types),
types,
},
})
.catch((error) => {

View file

@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { ElasticsearchClient } from '@kbn/core/server';
import { dataStreamService } from '../../services';
export async function getDataStreamsCreationDate({
esClient,
dataStreams,
}: {
esClient: ElasticsearchClient;
dataStreams: string[];
}) {
const matchingStreams = await dataStreamService.getMatchingDataStreams(esClient, dataStreams);
const streamByIndex = matchingStreams.reduce((acc, { name, indices }) => {
if (indices[0]) acc[indices[0].index_name] = name;
return acc;
}, {} as Record<string, string>);
const indices = Object.keys(streamByIndex);
if (indices.length === 0) {
return {};
}
// While _cat api is not recommended for application use this is the only way
// to retrieve the creation date in serverless for now. We should change this
// once a proper approach exists (see elastic/elasticsearch-serverless#3010)
const catIndices = await esClient.cat.indices({
index: indices,
h: ['creation.date', 'index'],
format: 'json',
});
return catIndices.reduce((acc, index) => {
const creationDate = index['creation.date'];
const indexName = index.index!;
const stream = streamByIndex[indexName];
acc[stream] = creationDate ? Number(creationDate) : undefined;
return acc;
}, {} as Record<string, number | undefined>);
}

View file

@ -6,6 +6,7 @@
*/
import * as t from 'io-ts';
import { toBooleanRt } from '@kbn/io-ts-utils';
import {
CheckAndLoadIntegrationResponse,
DataStreamDetails,
@ -38,15 +39,14 @@ import { getDegradedFieldValues } from './get_degraded_field_values';
import { getDegradedFields } from './get_degraded_fields';
import { getNonAggregatableDataStreams } from './get_non_aggregatable_data_streams';
import { updateFieldLimit } from './update_field_limit';
import { getDataStreamsCreationDate } from './get_data_streams_creation_date';
const statsRoute = createDatasetQualityServerRoute({
endpoint: 'GET /internal/dataset_quality/data_streams/stats',
params: t.type({
query: t.intersection([
t.type({ types: typesRt }),
t.partial({
datasetQuery: t.string,
}),
t.union([t.type({ types: typesRt }), t.type({ datasetQuery: t.string })]),
t.partial({ includeCreationDate: toBooleanRt }),
]),
}),
options: {
@ -81,15 +81,25 @@ const statsRoute = createDatasetQualityServerRoute({
return dataStream.userPrivileges.canMonitor;
});
const dataStreamsStats = isServerless
? await getDataStreamsMeteringStats({
esClient: esClientAsSecondaryAuthUser,
dataStreams: privilegedDataStreams.map((stream) => stream.name),
})
: await getDataStreamsStats({
esClient,
dataStreams: privilegedDataStreams.map((stream) => stream.name),
});
const dataStreamsNames = privilegedDataStreams.map((stream) => stream.name);
const [dataStreamsStats, dataStreamsCreationDate] = await Promise.all([
isServerless
? getDataStreamsMeteringStats({
esClient: esClientAsSecondaryAuthUser,
dataStreams: dataStreamsNames,
})
: getDataStreamsStats({
esClient,
dataStreams: dataStreamsNames,
}),
params.query.includeCreationDate
? getDataStreamsCreationDate({
esClient: esClientAsSecondaryAuthUser,
dataStreams: dataStreamsNames,
})
: ({} as Record<string, number | undefined>),
]);
return {
datasetUserPrivileges,
@ -97,6 +107,7 @@ const statsRoute = createDatasetQualityServerRoute({
dataStream.size = dataStreamsStats[dataStream.name]?.size;
dataStream.sizeBytes = dataStreamsStats[dataStream.name]?.sizeBytes;
dataStream.totalDocs = dataStreamsStats[dataStream.name]?.totalDocs;
dataStream.creationDate = dataStreamsCreationDate[dataStream.name];
return dataStream;
}),

View file

@ -15,7 +15,7 @@ import { reduceAsyncChunks } from '../utils/reduce_async_chunks';
class DataStreamService {
public async getMatchingDataStreams(
esClient: ElasticsearchClient,
datasetName: string
datasetName: string | string[]
): Promise<IndicesDataStream[]> {
try {
const { data_streams: dataStreamsInfo } = await esClient.indices.getDataStream({

View file

@ -15,6 +15,7 @@ import type { SharePublicStart } from '@kbn/share-plugin/public/plugin';
import { NavigationPublicStart } from '@kbn/navigation-plugin/public/types';
import type { SavedObjectTaggingPluginStart } from '@kbn/saved-objects-tagging-plugin/public';
import { fieldsMetadataPluginPublicMock } from '@kbn/fields-metadata-plugin/public/mocks';
import { DataStreamsStatsClient } from '@kbn/dataset-quality-plugin/public/services/data_streams_stats/data_streams_stats_client';
import type { StreamsAppKibanaContext } from '../public/hooks/use_kibana';
export function getMockStreamsAppContext(): StreamsAppKibanaContext {
@ -38,7 +39,7 @@ export function getMockStreamsAppContext(): StreamsAppKibanaContext {
},
},
services: {
query: jest.fn(),
dataStreamsClient: Promise.resolve({} as unknown as DataStreamsStatsClient),
},
isServerless: false,
};

View file

@ -19,6 +19,7 @@
"savedObjectsTagging",
"navigation",
"fieldsMetadata",
"datasetQuality"
],
"requiredBundles": [
"kibanaReact"

View file

@ -0,0 +1,10 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { formatNumber } from '@elastic/eui';
export const formatBytes = (value: number) => formatNumber(value, '0.0 b');

View file

@ -0,0 +1,55 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import datemath from '@kbn/datemath';
export const ingestionRateQuery = ({
index,
start,
end,
timestampField = '@timestamp',
bucketCount = 10,
}: {
index: string;
start: string;
end: string;
timestampField?: string;
bucketCount?: number;
}) => {
const startDate = datemath.parse(start);
const endDate = datemath.parse(end);
if (!startDate || !endDate) {
throw new Error(`Expected a valid start and end date but got [start: ${start} | end: ${end}]`);
}
const intervalInSeconds = Math.max(
Math.round(endDate.diff(startDate, 'seconds') / bucketCount),
1
);
return {
index,
track_total_hits: false,
body: {
size: 0,
query: {
bool: {
filter: [{ range: { [timestampField]: { gte: start, lte: end } } }],
},
},
aggs: {
docs_count: {
date_histogram: {
field: timestampField,
fixed_interval: `${intervalInSeconds}s`,
min_doc_count: 0,
},
},
},
},
};
};

View file

@ -0,0 +1,58 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import moment from 'moment';
import { IngestStreamGetResponse } from '@kbn/streams-schema';
import { DataStreamStatServiceResponse } from '@kbn/dataset-quality-plugin/public';
import { useKibana } from '../../../hooks/use_kibana';
import { useStreamsAppFetch } from '../../../hooks/use_streams_app_fetch';
export type DataStreamStats = DataStreamStatServiceResponse['dataStreamsStats'][number] & {
bytesPerDoc: number;
bytesPerDay: number;
};
export const useDataStreamStats = ({ definition }: { definition?: IngestStreamGetResponse }) => {
const {
services: { dataStreamsClient },
} = useKibana();
const statsFetch = useStreamsAppFetch(async () => {
if (!definition) {
return;
}
const client = await dataStreamsClient;
const {
dataStreamsStats: [dsStats],
} = await client.getDataStreamsStats({
datasetQuery: definition.stream.name,
includeCreationDate: true,
});
if (!dsStats || !dsStats.creationDate || !dsStats.sizeBytes) {
return undefined;
}
const daysSinceCreation = Math.max(
1,
Math.round(moment().diff(moment(dsStats.creationDate), 'days'))
);
return {
...dsStats,
bytesPerDay: dsStats.sizeBytes / daysSinceCreation,
bytesPerDoc: dsStats.totalDocs ? dsStats.sizeBytes / dsStats.totalDocs : 0,
};
}, [dataStreamsClient, definition]);
return {
stats: statsFetch.value,
isLoading: statsFetch.loading,
refresh: statsFetch.refresh,
error: statsFetch.error,
};
};

View file

@ -5,7 +5,7 @@
* 2.0.
*/
import { EuiFlexGroup, EuiFlexItem, EuiPanel } from '@elastic/eui';
import { EuiFlexGroup, EuiFlexItem, EuiPanel, EuiSpacer } from '@elastic/eui';
import React, { useMemo, useState } from 'react';
import {
IngestStreamGetResponse,
@ -27,6 +27,8 @@ import { useKibana } from '../../hooks/use_kibana';
import { EditLifecycleModal, LifecycleEditAction } from './modal';
import { RetentionSummary } from './summary';
import { RetentionMetadata } from './metadata';
import { IngestionRate } from './ingestion_rate';
import { useDataStreamStats } from './hooks/use_data_stream_stats';
import { getFormattedError } from '../../util/errors';
function useLifecycleState({
@ -112,6 +114,13 @@ export function StreamDetailLifecycle({
setUpdateInProgress,
} = useLifecycleState({ definition, isServerless });
const {
stats,
isLoading: isLoadingStats,
refresh: refreshStats,
error: statsError,
} = useDataStreamStats({ definition });
const { signal } = useAbortController();
if (!definition) {
@ -176,24 +185,38 @@ export function StreamDetailLifecycle({
ilmLocator={ilmLocator}
/>
<EuiFlexItem grow={false}>
<EuiPanel hasShadow={false} hasBorder paddingSize="s">
<EuiFlexGroup gutterSize="m">
<EuiFlexItem grow={1}>
<RetentionSummary definition={definition} />
</EuiFlexItem>
<EuiPanel grow={false} hasShadow={false} hasBorder paddingSize="s">
<EuiFlexGroup gutterSize="m">
<EuiFlexItem grow={1}>
<RetentionSummary definition={definition} />
</EuiFlexItem>
<EuiFlexItem grow={4}>
<RetentionMetadata
definition={definition}
lifecycleActions={lifecycleActions}
ilmLocator={ilmLocator}
openEditModal={(action) => setOpenEditModal(action)}
/>
</EuiFlexItem>
</EuiFlexGroup>
<EuiFlexItem grow={4}>
<RetentionMetadata
definition={definition}
lifecycleActions={lifecycleActions}
ilmLocator={ilmLocator}
openEditModal={(action) => setOpenEditModal(action)}
isLoadingStats={isLoadingStats}
stats={stats}
statsError={statsError}
/>
</EuiFlexItem>
</EuiFlexGroup>
</EuiPanel>
<EuiSpacer size="s" />
<EuiFlexGroup>
<EuiPanel hasShadow={false} hasBorder paddingSize="s">
<IngestionRate
definition={definition}
refreshStats={refreshStats}
isLoadingStats={isLoadingStats}
stats={stats}
/>
</EuiPanel>
</EuiFlexItem>
</EuiFlexGroup>
</>
);
}

View file

@ -0,0 +1,169 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import moment from 'moment';
import React from 'react';
import { lastValueFrom } from 'rxjs';
import { IKibanaSearchRequest, IKibanaSearchResponse } from '@kbn/search-types';
import { i18n } from '@kbn/i18n';
import { IngestStreamGetResponse } from '@kbn/streams-schema';
import {
EuiFlexGroup,
EuiFlexItem,
EuiLoadingChart,
EuiPanel,
EuiSpacer,
EuiText,
} from '@elastic/eui';
import { AreaSeries, Axis, Chart, Settings } from '@elastic/charts';
import { useDateRange } from '@kbn/observability-utils-browser/hooks/use_date_range';
import { useKibana } from '../../hooks/use_kibana';
import { DataStreamStats } from './hooks/use_data_stream_stats';
import { useStreamsAppFetch } from '../../hooks/use_streams_app_fetch';
import { ingestionRateQuery } from './helpers/ingestion_rate_query';
import { formatBytes } from './helpers/format_bytes';
import { StreamsAppSearchBar } from '../streams_app_search_bar';
export function IngestionRate({
definition,
stats,
isLoadingStats,
refreshStats,
}: {
definition?: IngestStreamGetResponse;
stats?: DataStreamStats;
isLoadingStats: boolean;
refreshStats: () => void;
}) {
const {
dependencies: {
start: { data },
},
} = useKibana();
const { timeRange, setTimeRange } = useDateRange({ data });
const {
loading: isLoadingIngestionRate,
value: ingestionRate,
error: ingestionRateError,
} = useStreamsAppFetch(
async ({ signal }) => {
if (!definition || isLoadingStats || !stats?.bytesPerDay) {
return;
}
const { rawResponse } = await lastValueFrom(
data.search.search<
IKibanaSearchRequest,
IKibanaSearchResponse<{
aggregations: { docs_count: { buckets: Array<{ key: string; doc_count: number }> } };
}>
>(
{
params: ingestionRateQuery({
start: timeRange.from,
end: timeRange.to,
index: definition.stream.name,
}),
},
{ abortSignal: signal }
)
);
return rawResponse.aggregations.docs_count.buckets.map(({ key, doc_count: docCount }) => ({
key,
value: docCount * stats.bytesPerDoc,
}));
},
[data.search, definition, stats, isLoadingStats, timeRange]
);
return (
<>
<EuiPanel hasShadow={false} hasBorder={false} paddingSize="s">
<EuiFlexGroup alignItems="center">
<EuiFlexItem grow={3}>
<EuiText>
<h5>
{i18n.translate('xpack.streams.streamDetailLifecycle.ingestionRatePanel', {
defaultMessage: 'Ingestion rate',
})}
</h5>
</EuiText>
</EuiFlexItem>
<EuiFlexItem grow={false}>
<StreamsAppSearchBar
dateRangeFrom={timeRange.from}
dateRangeTo={timeRange.to}
onQuerySubmit={({ dateRange }, isUpdate) => {
if (!isUpdate) {
refreshStats();
return;
}
if (dateRange) {
setTimeRange({
from: dateRange.from,
to: dateRange?.to,
mode: dateRange.mode,
});
}
}}
/>
</EuiFlexItem>
</EuiFlexGroup>
</EuiPanel>
<EuiSpacer size="s" />
{ingestionRateError ? (
<EuiFlexGroup
justifyContent="center"
alignItems="center"
style={{ width: '100%', height: '250px' }}
>
Failed to load ingestion rate
</EuiFlexGroup>
) : isLoadingIngestionRate || isLoadingStats || !ingestionRate ? (
<EuiFlexGroup
justifyContent="center"
alignItems="center"
style={{ width: '100%', height: '250px' }}
>
<EuiLoadingChart />
</EuiFlexGroup>
) : (
<Chart size={{ height: 250 }}>
<Settings showLegend={false} />
<AreaSeries
id="ingestionRate"
name="Ingestion rate"
data={ingestionRate}
color="#61A2FF"
xScaleType="time"
xAccessor={'key'}
yAccessors={['value']}
/>
<Axis
id="bottom-axis"
position="bottom"
tickFormat={(value) => moment(value).format('YYYY-MM-DD')}
gridLine={{ visible: false }}
/>
<Axis
id="left-axis"
position="left"
tickFormat={(value) => formatBytes(value)}
gridLine={{ visible: true }}
/>
</Chart>
)}
</>
);
}

View file

@ -26,6 +26,7 @@ import {
EuiFlexItem,
EuiHorizontalRule,
EuiLink,
EuiLoadingSpinner,
EuiPanel,
EuiPopover,
EuiText,
@ -33,21 +34,28 @@ import {
import { i18n } from '@kbn/i18n';
import { LifecycleEditAction } from './modal';
import { useStreamsAppRouter } from '../../hooks/use_streams_app_router';
import { DataStreamStats } from './hooks/use_data_stream_stats';
import { formatBytes } from './helpers/format_bytes';
export function RetentionMetadata({
definition,
ilmLocator,
lifecycleActions,
openEditModal,
stats,
isLoadingStats,
statsError,
}: {
definition: IngestStreamGetResponse;
ilmLocator?: LocatorPublic<IlmLocatorParams>;
lifecycleActions: Array<{ name: string; action: LifecycleEditAction }>;
openEditModal: (action: LifecycleEditAction) => void;
stats?: DataStreamStats;
isLoadingStats: boolean;
statsError?: Error;
}) {
const [isMenuOpen, { toggle: toggleMenu, off: closeMenu }] = useBoolean(false);
const router = useStreamsAppRouter();
const lifecycle = definition.effective_lifecycle;
const contextualMenu =
@ -171,6 +179,38 @@ export function RetentionMetadata({
</EuiFlexGroup>
}
/>
<EuiHorizontalRule margin="m" />
<MetadataRow
metadata={i18n.translate('xpack.streams.streamDetailLifecycle.ingestionRate', {
defaultMessage: 'Ingestion',
})}
value={
statsError ? (
'-'
) : isLoadingStats || !stats ? (
<EuiLoadingSpinner size="s" />
) : stats.bytesPerDay ? (
formatIngestionRate(stats.bytesPerDay)
) : (
'-'
)
}
/>
<EuiHorizontalRule margin="m" />
<MetadataRow
metadata={i18n.translate('xpack.streams.streamDetailLifecycle.totalDocs', {
defaultMessage: 'Total doc count',
})}
value={
statsError ? (
'-'
) : isLoadingStats || !stats ? (
<EuiLoadingSpinner size="s" />
) : (
stats.totalDocs
)
}
/>
</EuiPanel>
);
}
@ -197,3 +237,9 @@ function MetadataRow({
</EuiFlexGroup>
);
}
const formatIngestionRate = (bytesPerDay: number) => {
const perDay = formatBytes(bytesPerDay);
const perMonth = formatBytes(bytesPerDay * 30);
return `${perDay} / Day - ${perMonth} / Month`;
};

View file

@ -21,7 +21,7 @@ export function RetentionSummary({ definition }: { definition: IngestStreamGetRe
const summary = useMemo(() => summaryText(definition), [definition]);
return (
<EuiPanel hasShadow={false} hasBorder color="subdued" paddingSize="s">
<EuiPanel hasShadow={false} hasBorder color="subdued" paddingSize="m">
<EuiText>
<h5>
{i18n.translate('xpack.streams.streamDetailLifecycle.retentionSummaryLabel', {

View file

@ -18,6 +18,7 @@ import {
} from '@kbn/core/public';
import type { Logger } from '@kbn/logging';
import { STREAMS_APP_ID } from '@kbn/deeplinks-observability/constants';
import { DataStreamsStatsService } from '@kbn/dataset-quality-plugin/public';
import type {
ConfigSchema,
StreamsAppPublicSetup,
@ -119,7 +120,11 @@ export class StreamsAppPlugin
coreSetup.getStartServices(),
]);
const services: StreamsAppServices = {};
const services: StreamsAppServices = {
dataStreamsClient: new DataStreamsStatsService()
.start({ http: coreStart.http })
.getClient(),
};
return renderApp({
coreStart,

View file

@ -5,5 +5,8 @@
* 2.0.
*/
// eslint-disable-next-line @typescript-eslint/no-empty-interface
export interface StreamsAppServices {}
import { IDataStreamsStatsClient } from '@kbn/dataset-quality-plugin/public';
export interface StreamsAppServices {
dataStreamsClient: Promise<IDataStreamsStatsClient>;
}

View file

@ -55,6 +55,9 @@
"@kbn/deeplinks-analytics",
"@kbn/dashboard-plugin",
"@kbn/react-kibana-mount",
"@kbn/fields-metadata-plugin"
"@kbn/fields-metadata-plugin",
"@kbn/datemath",
"@kbn/dataset-quality-plugin",
"@kbn/search-types"
]
}

View file

@ -21,13 +21,15 @@ export default function ApiTest({ getService }: FtrProviderContext) {
async function callApiAs(
user: DatasetQualityApiClientKey,
types: Array<'logs' | 'metrics' | 'traces' | 'synthetics'> = ['logs']
types: Array<'logs' | 'metrics' | 'traces' | 'synthetics'> = ['logs'],
includeCreationDate = false
) {
return await datasetQualityApiClient[user]({
endpoint: 'GET /internal/dataset_quality/data_streams/stats',
params: {
query: {
types: rison.encodeArray(types),
includeCreationDate,
},
},
});
@ -152,6 +154,18 @@ export default function ApiTest({ getService }: FtrProviderContext) {
expect(stats.body.dataStreamsStats[0].totalDocs).greaterThan(0);
});
it('does not return creation date by default', async () => {
const stats = await callApiAs('datasetQualityMonitorUser');
expect(stats.body.dataStreamsStats[0].size).not.empty();
expect(stats.body.dataStreamsStats[0].creationDate).to.be(undefined);
});
it('returns creation date when specified', async () => {
const stats = await callApiAs('datasetQualityMonitorUser', ['logs'], true);
expect(stats.body.dataStreamsStats[0].size).not.empty();
expect(stats.body.dataStreamsStats[0].creationDate).greaterThan(0);
});
after(async () => {
await logsSynthtrace.clean();
await cleanLogIndexTemplate({ esClient: es });