[Fleet] Use event.ingested where possible for data stream last activity (#116641)

* use event.ingested for datastream last activity

* remove ms precision from datastream dates

* add timestamp check to test

* fix type error

* split test out to be less complex and more reliable 🤞

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Mark Hopkin 2021-11-01 15:51:12 +00:00 committed by GitHub
parent 96a1d3186b
commit dcdc0f8e8f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 108 additions and 49 deletions

View file

@ -100,7 +100,9 @@ export const DataStreamListPage: React.FunctionComponent<{}> = () => {
}),
render: (date: DataStream['last_activity_ms']) => {
try {
const formatter = fieldFormats.getInstance('date');
const formatter = fieldFormats.getInstance('date', {
pattern: 'MMM D, YYYY @ HH:mm:ss',
});
return formatter.convert(date);
} catch (e) {
return <FormattedDate value={date} year="numeric" month="short" day="2-digit" />;

View file

@ -127,7 +127,7 @@ export const getListHandler: RequestHandler = async (context, request, response)
type: '',
package: dataStream._meta?.package?.name || '',
package_version: '',
last_activity_ms: dataStream.maximum_timestamp,
last_activity_ms: dataStream.maximum_timestamp, // overridden below if maxIngestedTimestamp agg returns a result
size_in_bytes: dataStream.store_size_bytes,
dashboards: [],
};
@ -156,6 +156,11 @@ export const getListHandler: RequestHandler = async (context, request, response)
},
},
aggs: {
maxIngestedTimestamp: {
max: {
field: 'event.ingested',
},
},
dataset: {
terms: {
field: 'data_stream.dataset',
@ -178,12 +183,20 @@ export const getListHandler: RequestHandler = async (context, request, response)
},
});
const { maxIngestedTimestamp } = dataStreamAggs as Record<
string,
estypes.AggregationsValueAggregate
>;
const { dataset, namespace, type } = dataStreamAggs as Record<
string,
estypes.AggregationsMultiBucketAggregate<{ key?: string }>
estypes.AggregationsMultiBucketAggregate<{ key?: string; value?: number }>
>;
// Set values from backing indices query
// some integrations e.g custom logs don't have event.ingested
if (maxIngestedTimestamp?.value) {
dataStreamResponse.last_activity_ms = maxIngestedTimestamp?.value;
}
dataStreamResponse.dataset = dataset.buckets[0]?.key || '';
dataStreamResponse.namespace = namespace.buckets[0]?.key || '';
dataStreamResponse.type = type.buckets[0]?.key || '';

View file

@ -6,9 +6,14 @@
*/
import expect from '@kbn/expect';
import { keyBy } from 'lodash';
import { FtrProviderContext } from '../../../api_integration/ftr_provider_context';
import { skipIfNoDockerRegistry } from '../../helpers';
interface IndexResponse {
_id: string;
_index: string;
}
export default function (providerContext: FtrProviderContext) {
const { getService } = providerContext;
const supertest = getService('supertest');
@ -33,34 +38,51 @@ export default function (providerContext: FtrProviderContext) {
};
const seedDataStreams = async () => {
await es.transport.request({
method: 'POST',
path: `/${logsTemplateName}-default/_doc`,
body: {
'@timestamp': '2015-01-01',
logs_test_name: 'test',
data_stream: {
dataset: `${pkgName}.test_logs`,
namespace: 'default',
type: 'logs',
const responses = [];
responses.push(
await es.transport.request({
method: 'POST',
path: `/${logsTemplateName}-default/_doc`,
body: {
'@timestamp': '2015-01-01',
logs_test_name: 'test',
data_stream: {
dataset: `${pkgName}.test_logs`,
namespace: 'default',
type: 'logs',
},
},
},
});
await es.transport.request({
method: 'POST',
path: `/${metricsTemplateName}-default/_doc`,
body: {
'@timestamp': '2015-01-01',
logs_test_name: 'test',
data_stream: {
dataset: `${pkgName}.test_metrics`,
namespace: 'default',
type: 'metrics',
})
);
responses.push(
await es.transport.request({
method: 'POST',
path: `/${metricsTemplateName}-default/_doc`,
body: {
'@timestamp': '2015-01-01',
logs_test_name: 'test',
data_stream: {
dataset: `${pkgName}.test_metrics`,
namespace: 'default',
type: 'metrics',
},
},
},
});
})
);
return responses as IndexResponse[];
};
const getSeedDocsFromResponse = async (indexResponses: IndexResponse[]) =>
Promise.all(
indexResponses.map((indexResponse) =>
es.transport.request({
method: 'GET',
path: `/${indexResponse._index}/_doc/${indexResponse._id}`,
})
)
);
const getDataStreams = async () => {
return await supertest.get(`/api/fleet/data_streams`).set('kbn-xsrf', 'xxxx');
};
@ -93,36 +115,58 @@ export default function (providerContext: FtrProviderContext) {
expect(body).to.eql({ data_streams: [] });
});
it('should return correct data stream information', async function () {
it('should return correct basic data stream information', async function () {
await seedDataStreams();
await retry.tryForTime(10000, async () => {
const { body } = await getDataStreams();
return expect(
body.data_streams.map((dataStream: any) => {
// eslint-disable-next-line @typescript-eslint/naming-convention
const { index, size_in_bytes, ...rest } = dataStream;
return rest;
})
).to.eql([
{
dataset: 'datastreams.test_logs',
namespace: 'default',
type: 'logs',
package: 'datastreams',
package_version: '0.1.0',
last_activity_ms: 1420070400000,
dashboards: [],
},
// we can't compare the array directly as the order is unpredictable
const expectedStreamsByDataset = keyBy(
[
{
dataset: 'datastreams.test_metrics',
namespace: 'default',
type: 'metrics',
package: 'datastreams',
package_version: '0.1.0',
last_activity_ms: 1420070400000,
dashboards: [],
},
]);
{
dataset: 'datastreams.test_logs',
namespace: 'default',
type: 'logs',
package: 'datastreams',
package_version: '0.1.0',
dashboards: [],
},
],
'dataset'
);
await retry.tryForTime(10000, async () => {
const { body } = await getDataStreams();
expect(body.data_streams.length).to.eql(2);
body.data_streams.forEach((dataStream: any) => {
// eslint-disable-next-line @typescript-eslint/naming-convention
const { index, size_in_bytes, last_activity_ms, ...coreFields } = dataStream;
expect(expectedStreamsByDataset[coreFields.dataset]).not.to.eql(undefined);
expect(coreFields).to.eql(expectedStreamsByDataset[coreFields.dataset]);
});
});
});
it('should use event.ingested instead of @timestamp for last_activity_ms', async function () {
const seedResponse = await seedDataStreams();
const docs = await getSeedDocsFromResponse(seedResponse);
const docsByDataset: Record<string, any> = keyBy(docs, '_source.data_stream.dataset');
await retry.tryForTime(10000, async () => {
const { body } = await getDataStreams();
expect(body.data_streams.length).to.eql(2);
body.data_streams.forEach((dataStream: any) => {
expect(docsByDataset[dataStream.dataset]).not.to.eql(undefined);
const expectedTimestamp = new Date(
docsByDataset[dataStream.dataset]?._source?.event?.ingested
).getTime();
expect(dataStream.last_activity_ms).to.eql(expectedTimestamp);
});
});
});