mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 09:48:58 -04:00
[Dataset Quality ] Apply chunking strategy for data stream stats retrieval (#194816)
## 📓 Summary Closes #192169 This work fixes the issue with some requests hitting the too-long HTTP line once we combine all the dataset names into a single request. We had a suggested strategy from the work done with #171735 , but it presented a couple of problems. - The HTTP line length issue occurs for an exceeding length of the request URL, which goes over 4096 bytes (4096 characters.) This also includes the whole URL protocol, domain, path and any other parameters, so assuming that we have 4096 characters for the `index` parameter is incorrect, as we would exceed the maximum anyway in a worst-case scenario, where we have a chunk of 16 values with length 255 chars. - Always chunking the requests in groups of 16 items might not be optimal in the most common scenario where we have short data stream patterns. I opted to adopt a different chunking strategy that optimizes each chunk so that we reduce the requests triggered on the cluster to a minimum. I'll leave more notes in the code to help with the review. --------- Co-authored-by: Marco Antonio Ghiani <marcoantonio.ghiani@elastic.co>
This commit is contained in:
parent
3a3f1300a5
commit
bff69e218a
6 changed files with 137 additions and 14 deletions
|
@ -119,7 +119,7 @@ export async function getDataStreamDetails({
|
|||
}
|
||||
|
||||
async function getDataStreamCreatedOn(esClient: ElasticsearchClient, dataStream: string) {
|
||||
const indexSettings = await dataStreamService.getDataSteamIndexSettings(esClient, dataStream);
|
||||
const indexSettings = await dataStreamService.getDataStreamIndexSettings(esClient, dataStream);
|
||||
|
||||
const indexesList = Object.values(indexSettings);
|
||||
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
*/
|
||||
|
||||
import type { ElasticsearchClient } from '@kbn/core/server';
|
||||
import { reduceAsyncChunks } from '../../../utils/reduce_async_chunks';
|
||||
|
||||
export interface MeteringStatsResponse {
|
||||
datastreams: Array<{
|
||||
|
@ -26,11 +27,13 @@ export async function getDataStreamsMeteringStats({
|
|||
return {};
|
||||
}
|
||||
|
||||
const { datastreams: dataStreamsStats } = await esClient.transport.request<MeteringStatsResponse>(
|
||||
{
|
||||
method: 'GET',
|
||||
path: `/_metering/stats/` + dataStreams.join(','),
|
||||
}
|
||||
const { datastreams: dataStreamsStats } = await reduceAsyncChunks(
|
||||
dataStreams,
|
||||
(dataStreamsChunk) =>
|
||||
esClient.transport.request<MeteringStatsResponse>({
|
||||
method: 'GET',
|
||||
path: `/_metering/stats/` + dataStreamsChunk.join(','),
|
||||
})
|
||||
);
|
||||
|
||||
return dataStreamsStats.reduce(
|
||||
|
|
|
@ -10,6 +10,7 @@ import type {
|
|||
IndicesDataStreamsStatsDataStreamsStatsItem,
|
||||
} from '@elastic/elasticsearch/lib/api/types';
|
||||
import type { ElasticsearchClient } from '@kbn/core/server';
|
||||
import { reduceAsyncChunks } from '../utils/reduce_async_chunks';
|
||||
|
||||
class DataStreamService {
|
||||
public async getMatchingDataStreams(
|
||||
|
@ -37,10 +38,11 @@ class DataStreamService {
|
|||
dataStreams: string[]
|
||||
): Promise<IndicesDataStreamsStatsDataStreamsStatsItem[]> {
|
||||
try {
|
||||
const { data_streams: dataStreamsStats } = await esClient.indices.dataStreamsStats({
|
||||
name: dataStreams.join(','),
|
||||
human: true,
|
||||
});
|
||||
const { data_streams: dataStreamsStats } = await reduceAsyncChunks(
|
||||
dataStreams,
|
||||
(dataStreamsChunk) =>
|
||||
esClient.indices.dataStreamsStats({ name: dataStreamsChunk.join(','), human: true })
|
||||
);
|
||||
|
||||
return dataStreamsStats;
|
||||
} catch (e) {
|
||||
|
@ -51,7 +53,7 @@ class DataStreamService {
|
|||
}
|
||||
}
|
||||
|
||||
public async getDataSteamIndexSettings(
|
||||
public async getDataStreamIndexSettings(
|
||||
esClient: ElasticsearchClient,
|
||||
dataStream: string
|
||||
): Promise<Awaited<ReturnType<ElasticsearchClient['indices']['getSettings']>>> {
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
import { chain, sumBy } from 'lodash';
|
||||
import type { ElasticsearchClient } from '@kbn/core/server';
|
||||
import { extractIndexNameFromBackingIndex } from '../../common/utils';
|
||||
import { reduceAsyncChunks } from '../utils/reduce_async_chunks';
|
||||
|
||||
interface IndexStatsResponse {
|
||||
docsCountPerDataStream: { [indexName: string]: number };
|
||||
|
@ -19,9 +20,9 @@ class IndexStatsService {
|
|||
dataStreams: string[]
|
||||
): Promise<IndexStatsResponse> {
|
||||
try {
|
||||
const index = dataStreams;
|
||||
|
||||
const { indices } = await esClient.indices.stats({ index, metric: ['docs'] });
|
||||
const { indices } = await reduceAsyncChunks(dataStreams, (indexChunk) =>
|
||||
esClient.indices.stats({ index: indexChunk, metric: ['docs'] })
|
||||
);
|
||||
|
||||
const docsCountPerDataStream = chain(indices || {})
|
||||
.map((indexStats, indexName) => ({
|
||||
|
|
|
@ -0,0 +1,58 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { reduceAsyncChunks } from './reduce_async_chunks';
|
||||
|
||||
describe('reduceAsyncChunks', () => {
|
||||
const spyChunkExecutor = jest
|
||||
.fn()
|
||||
.mockImplementation((chunk: string[]) =>
|
||||
Promise.resolve(chunk.map((str) => str.toUpperCase()))
|
||||
);
|
||||
|
||||
afterEach(() => {
|
||||
spyChunkExecutor.mockClear();
|
||||
});
|
||||
|
||||
it('should run a iterator mapping callback on each chunk and merge the result', async () => {
|
||||
const input = Array(20).fill('logs-dataset-default');
|
||||
const expected = Array(20).fill('LOGS-DATASET-DEFAULT');
|
||||
|
||||
const res = await reduceAsyncChunks(input, spyChunkExecutor);
|
||||
|
||||
expect(res).toEqual(expected);
|
||||
expect(spyChunkExecutor).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('should create chunks where the total strings length does not exceed the allowed maximum', async () => {
|
||||
const input = Array(1000).fill('logs-dataset-default'); // 20k chars => 20k/3072 => Expected 7 chunks
|
||||
const expected = Array(1000).fill('LOGS-DATASET-DEFAULT');
|
||||
const expectedChunks = 7;
|
||||
|
||||
const res = await reduceAsyncChunks(input, spyChunkExecutor);
|
||||
|
||||
expect(res).toEqual(expected);
|
||||
expect(spyChunkExecutor).toHaveBeenCalledTimes(expectedChunks);
|
||||
});
|
||||
|
||||
it('should maximize the chunks length the chunks count', async () => {
|
||||
const input = [
|
||||
...Array(1000).fill('logs-dataset_30letters-default'),
|
||||
...Array(1000).fill('logs-dataset-default'),
|
||||
]; // 30k chars + 20k chars + ~2k commas => 52k/3072 => Expected 17 chunks
|
||||
const expected = [
|
||||
...Array(1000).fill('LOGS-DATASET_30LETTERS-DEFAULT'),
|
||||
...Array(1000).fill('LOGS-DATASET-DEFAULT'),
|
||||
];
|
||||
const expectedChunks = 17;
|
||||
|
||||
const res = await reduceAsyncChunks(input, spyChunkExecutor);
|
||||
|
||||
expect(res).toEqual(expected);
|
||||
expect(spyChunkExecutor).toHaveBeenCalledTimes(expectedChunks);
|
||||
});
|
||||
});
|
|
@ -0,0 +1,59 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
import { Observable, OperatorFunction, from, lastValueFrom, mergeMap, reduce } from 'rxjs';
|
||||
import deepmerge from 'deepmerge';
|
||||
|
||||
type CallbackFn<TResult> = (chunk: string[], id: number) => Promise<TResult>;
|
||||
|
||||
const MAX_HTTP_LINE_LENGTH = 4096;
|
||||
// Apply an 80% threshold to the http line max length to guarantee enough space for url and potentially other parameters.
|
||||
// This value might need to vary as it's an estimate of how much we can reserve for the chunked list length.
|
||||
const MAX_CHUNK_LENGTH = MAX_HTTP_LINE_LENGTH * 0.75; // 4096 *0.75 === 3072 characters, as 1 chars = 1 byte
|
||||
|
||||
export const reduceAsyncChunks = <TResult>(list: string[], chunkExecutor: CallbackFn<TResult>) => {
|
||||
const result$ = from(list).pipe(
|
||||
bufferUntil(isLessThanMaxChunkLength),
|
||||
mergeMap((chunk, id) => from(chunkExecutor(chunk, id))),
|
||||
reduce((result, chunkResult) => deepmerge(result, chunkResult))
|
||||
);
|
||||
|
||||
return lastValueFrom(result$);
|
||||
};
|
||||
|
||||
/**
|
||||
* Support functions for reduceAsyncChunks
|
||||
*/
|
||||
const bufferUntil = <TItem>(
|
||||
predicate: (chunk: TItem[], currentItem: TItem) => boolean
|
||||
): OperatorFunction<TItem, TItem[]> => {
|
||||
return (source) =>
|
||||
new Observable((observer) => {
|
||||
let chunk: TItem[] = [];
|
||||
|
||||
return source.subscribe({
|
||||
next(currentItem) {
|
||||
if (predicate(chunk, currentItem)) {
|
||||
chunk.push(currentItem);
|
||||
} else {
|
||||
// Emit the current chunk, start a new one
|
||||
if (chunk.length > 0) observer.next(chunk);
|
||||
chunk = [currentItem]; // Reset the chunk with the current item
|
||||
}
|
||||
},
|
||||
complete() {
|
||||
// Emit the final chunk if it has any items
|
||||
if (chunk.length > 0) observer.next(chunk);
|
||||
observer.complete();
|
||||
},
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
const isLessThanMaxChunkLength = (chunk: string[], currentItem: string) => {
|
||||
const totalLength = [...chunk, currentItem].join().length;
|
||||
return totalLength <= MAX_CHUNK_LENGTH; // Allow the chunk until it exceeds the max chunk length
|
||||
};
|
Loading…
Add table
Add a link
Reference in a new issue