[ML] APM Latency Correlations: Field/value candidates prioritization (#107370)

- Makes sure fields defined in `FIELDS_TO_ADD_AS_CANDIDATE` and fields prefixed with one of `FIELD_PREFIX_TO_ADD_AS_CANDIDATE` get queried first when retrieving the `correlation` and `ks-test` values.
- Correctly considers the `includeFrozen` parameter.
- The bulk of the PR is a refactor:
  - Moves `query_*` files to `queries` directory
  - Introduces `asyncSearchServiceStateProvider` to manage the state of the async search service in isolation so that we no longer mutate individual vars or plain objects.
  - Introduces `asyncSearchServiceLogProvider` and extends the log to not only store messages but original error messages retrieved from ES too.
  - Refactors some more functions into separate files and adds unit tests.
  - Removes some deprecated code no longer needed.
This commit is contained in:
Walter Rafelsberger 2021-08-11 10:46:35 +02:00 committed by GitHub
parent a444d8a4ab
commit 86c17daec2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
44 changed files with 1089 additions and 418 deletions

View file

@ -32,6 +32,7 @@ export interface SearchServiceParams {
export interface SearchServiceFetchParams extends SearchServiceParams {
index: string;
includeFrozen?: boolean;
}
export interface SearchServiceValue {
@ -50,5 +51,4 @@ export interface AsyncSearchProviderProgress {
loadedFieldCanditates: number;
loadedFieldValuePairs: number;
loadedHistograms: number;
getOverallProgress: () => number;
}

View file

@ -5,28 +5,25 @@
* 2.0.
*/
import { shuffle, range } from 'lodash';
import { range } from 'lodash';
import type { ElasticsearchClient } from 'src/core/server';
import type { ApmIndicesConfig } from '../../settings/apm_indices/get_apm_indices';
import { fetchTransactionDurationFieldCandidates } from './query_field_candidates';
import { fetchTransactionDurationFieldValuePairs } from './query_field_value_pairs';
import { fetchTransactionDurationPercentiles } from './query_percentiles';
import { fetchTransactionDurationCorrelation } from './query_correlation';
import { fetchTransactionDurationHistogramRangeSteps } from './query_histogram_range_steps';
import { fetchTransactionDurationRanges, HistogramItem } from './query_ranges';
import type {
AsyncSearchProviderProgress,
SearchServiceParams,
SearchServiceFetchParams,
SearchServiceValue,
} from '../../../../common/search_strategies/correlations/types';
import { computeExpectationsAndRanges } from './utils/aggregation_utils';
import { fetchTransactionDurationFractions } from './query_fractions';
const CORRELATION_THRESHOLD = 0.3;
const KS_TEST_THRESHOLD = 0.1;
const currentTimeAsString = () => new Date().toISOString();
import type { ApmIndicesConfig } from '../../settings/apm_indices/get_apm_indices';
import {
fetchTransactionDurationFieldCandidates,
fetchTransactionDurationFieldValuePairs,
fetchTransactionDurationFractions,
fetchTransactionDurationPercentiles,
fetchTransactionDurationHistograms,
fetchTransactionDurationHistogramRangeSteps,
fetchTransactionDurationRanges,
} from './queries';
import { computeExpectationsAndRanges } from './utils';
import { asyncSearchServiceLogProvider } from './async_search_service_log';
import { asyncSearchServiceStateProvider } from './async_search_service_state';
export const asyncSearchServiceProvider = (
esClient: ElasticsearchClient,
@ -34,40 +31,11 @@ export const asyncSearchServiceProvider = (
searchServiceParams: SearchServiceParams,
includeFrozen: boolean
) => {
let isCancelled = false;
let isRunning = true;
let error: Error;
let ccsWarning = false;
const log: string[] = [];
const logMessage = (message: string) =>
log.push(`${currentTimeAsString()}: ${message}`);
const { addLogMessage, getLogMessages } = asyncSearchServiceLogProvider();
const progress: AsyncSearchProviderProgress = {
started: Date.now(),
loadedHistogramStepsize: 0,
loadedOverallHistogram: 0,
loadedFieldCanditates: 0,
loadedFieldValuePairs: 0,
loadedHistograms: 0,
getOverallProgress: () =>
progress.loadedHistogramStepsize * 0.025 +
progress.loadedOverallHistogram * 0.025 +
progress.loadedFieldCanditates * 0.025 +
progress.loadedFieldValuePairs * 0.025 +
progress.loadedHistograms * 0.9,
};
const state = asyncSearchServiceStateProvider();
const values: SearchServiceValue[] = [];
let overallHistogram: HistogramItem[] | undefined;
let percentileThresholdValue: number;
const cancel = () => {
logMessage(`Service cancelled.`);
isCancelled = true;
};
const fetchCorrelations = async () => {
async function fetchCorrelations() {
let params: SearchServiceFetchParams | undefined;
try {
@ -75,6 +43,7 @@ export const asyncSearchServiceProvider = (
params = {
...searchServiceParams,
index: indices['apm_oss.transactionIndices'],
includeFrozen,
};
// 95th percentile to be displayed as a marker in the log log chart
@ -86,24 +55,27 @@ export const asyncSearchServiceProvider = (
params,
params.percentileThreshold ? [params.percentileThreshold] : undefined
);
percentileThresholdValue =
const percentileThresholdValue =
percentileThreshold[`${params.percentileThreshold}.0`];
state.setPercentileThresholdValue(percentileThresholdValue);
logMessage(
addLogMessage(
`Fetched ${params.percentileThreshold}th percentile value of ${percentileThresholdValue} based on ${totalDocs} documents.`
);
// finish early if we weren't able to identify the percentileThresholdValue.
if (percentileThresholdValue === undefined) {
logMessage(
addLogMessage(
`Abort service since percentileThresholdValue could not be determined.`
);
progress.loadedHistogramStepsize = 1;
progress.loadedOverallHistogram = 1;
progress.loadedFieldCanditates = 1;
progress.loadedFieldValuePairs = 1;
progress.loadedHistograms = 1;
isRunning = false;
state.setProgress({
loadedHistogramStepsize: 1,
loadedOverallHistogram: 1,
loadedFieldCanditates: 1,
loadedFieldValuePairs: 1,
loadedHistograms: 1,
});
state.setIsRunning(false);
return;
}
@ -111,12 +83,12 @@ export const asyncSearchServiceProvider = (
esClient,
params
);
progress.loadedHistogramStepsize = 1;
state.setProgress({ loadedHistogramStepsize: 1 });
logMessage(`Loaded histogram range steps.`);
addLogMessage(`Loaded histogram range steps.`);
if (isCancelled) {
isRunning = false;
if (state.getIsCancelled()) {
state.setIsRunning(false);
return;
}
@ -125,13 +97,13 @@ export const asyncSearchServiceProvider = (
params,
histogramRangeSteps
);
progress.loadedOverallHistogram = 1;
overallHistogram = overallLogHistogramChartData;
state.setProgress({ loadedOverallHistogram: 1 });
state.setOverallHistogram(overallLogHistogramChartData);
logMessage(`Loaded overall histogram chart data.`);
addLogMessage(`Loaded overall histogram chart data.`);
if (isCancelled) {
isRunning = false;
if (state.getIsCancelled()) {
state.setIsRunning(false);
return;
}
@ -142,10 +114,10 @@ export const asyncSearchServiceProvider = (
} = await fetchTransactionDurationPercentiles(esClient, params, percents);
const percentiles = Object.values(percentilesRecords);
logMessage(`Loaded percentiles.`);
addLogMessage(`Loaded percentiles.`);
if (isCancelled) {
isRunning = false;
if (state.getIsCancelled()) {
state.setIsRunning(false);
return;
}
@ -154,21 +126,22 @@ export const asyncSearchServiceProvider = (
params
);
logMessage(`Identified ${fieldCandidates.length} fieldCandidates.`);
addLogMessage(`Identified ${fieldCandidates.length} fieldCandidates.`);
progress.loadedFieldCanditates = 1;
state.setProgress({ loadedFieldCanditates: 1 });
const fieldValuePairs = await fetchTransactionDurationFieldValuePairs(
esClient,
params,
fieldCandidates,
progress
state,
addLogMessage
);
logMessage(`Identified ${fieldValuePairs.length} fieldValuePairs.`);
addLogMessage(`Identified ${fieldValuePairs.length} fieldValuePairs.`);
if (isCancelled) {
isRunning = false;
if (state.getIsCancelled()) {
state.setIsRunning(false);
return;
}
@ -181,114 +154,75 @@ export const asyncSearchServiceProvider = (
totalDocCount,
} = await fetchTransactionDurationFractions(esClient, params, ranges);
logMessage(`Loaded fractions and totalDocCount of ${totalDocCount}.`);
async function* fetchTransactionDurationHistograms() {
for (const item of shuffle(fieldValuePairs)) {
if (params === undefined || item === undefined || isCancelled) {
isRunning = false;
return;
}
// If one of the fields have an error
// We don't want to stop the whole process
try {
const {
correlation,
ksTest,
} = await fetchTransactionDurationCorrelation(
esClient,
params,
expectations,
ranges,
fractions,
totalDocCount,
item.field,
item.value
);
if (isCancelled) {
isRunning = false;
return;
}
if (
correlation !== null &&
correlation > CORRELATION_THRESHOLD &&
ksTest !== null &&
ksTest < KS_TEST_THRESHOLD
) {
const logHistogram = await fetchTransactionDurationRanges(
esClient,
params,
histogramRangeSteps,
item.field,
item.value
);
yield {
...item,
correlation,
ksTest,
histogram: logHistogram,
};
} else {
yield undefined;
}
} catch (e) {
// don't fail the whole process for individual correlation queries,
// just add the error to the internal log and check if we'd want to set the
// cross-cluster search compatibility warning to true.
logMessage(
`Failed to fetch correlation/kstest for '${item.field}/${item.value}'`
);
if (params?.index.includes(':')) {
ccsWarning = true;
}
yield undefined;
}
}
}
addLogMessage(`Loaded fractions and totalDocCount of ${totalDocCount}.`);
let loadedHistograms = 0;
for await (const item of fetchTransactionDurationHistograms()) {
for await (const item of fetchTransactionDurationHistograms(
esClient,
addLogMessage,
params,
state,
expectations,
ranges,
fractions,
histogramRangeSteps,
totalDocCount,
fieldValuePairs
)) {
if (item !== undefined) {
values.push(item);
state.addValue(item);
}
loadedHistograms++;
progress.loadedHistograms = loadedHistograms / fieldValuePairs.length;
state.setProgress({
loadedHistograms: loadedHistograms / fieldValuePairs.length,
});
}
logMessage(
`Identified ${values.length} significant correlations out of ${fieldValuePairs.length} field/value pairs.`
addLogMessage(
`Identified ${
state.getState().values.length
} significant correlations out of ${
fieldValuePairs.length
} field/value pairs.`
);
} catch (e) {
error = e;
state.setError(e);
}
if (error !== undefined && params?.index.includes(':')) {
ccsWarning = true;
if (state.getState().error !== undefined && params?.index.includes(':')) {
state.setCcsWarning(true);
}
isRunning = false;
};
state.setIsRunning(false);
}
fetchCorrelations();
return () => {
const sortedValues = values.sort((a, b) => b.correlation - a.correlation);
const {
ccsWarning,
error,
isRunning,
overallHistogram,
percentileThresholdValue,
progress,
} = state.getState();
return {
ccsWarning,
error,
log,
log: getLogMessages(),
isRunning,
loaded: Math.round(progress.getOverallProgress() * 100),
loaded: Math.round(state.getOverallProgress() * 100),
overallHistogram,
started: progress.started,
total: 100,
values: sortedValues,
values: state.getValuesSortedByCorrelation(),
percentileThresholdValue,
cancel,
cancel: () => {
addLogMessage(`Service cancelled.`);
state.setIsCancelled(true);
},
};
};
};

View file

@ -0,0 +1,30 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { asyncSearchServiceLogProvider } from './async_search_service_log';
describe('async search service', () => {
  describe('asyncSearchServiceLogProvider', () => {
    it('adds and retrieves messages from the log', async () => {
      const { addLogMessage, getLogMessages } = asyncSearchServiceLogProvider();

      // Replace the global `Date` constructor so that the timestamps in the
      // expected log lines below are deterministic.
      const mockDate = new Date(1392202800000);
      // @ts-ignore ignore the mockImplementation callback error
      const spy = jest.spyOn(global, 'Date').mockImplementation(() => mockDate);

      try {
        addLogMessage('the first message');
        addLogMessage('the second message');

        expect(getLogMessages()).toEqual([
          '2014-02-12T11:00:00.000Z: the first message',
          '2014-02-12T11:00:00.000Z: the second message',
        ]);
      } finally {
        // Restore the real `Date` even when an assertion above throws, so the
        // mocked constructor cannot leak into other tests.
        spy.mockRestore();
      }
    });
  });
});

View file

@ -0,0 +1,36 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { currentTimeAsString } from './utils';
/** A single entry of the async search service log. */
interface LogMessage {
// Time the entry was added, as returned by `currentTimeAsString()`.
timestamp: string;
// Human readable description of what happened.
message: string;
// Optional error details; only present when passed to `addLogMessage`.
error?: string;
}
/**
 * Creates an in-memory log that is private to the returned functions.
 *
 * @returns `addLogMessage` to append an entry and `getLogMessages` to read
 * the entries back as formatted strings.
 */
export const asyncSearchServiceLogProvider = () => {
  const entries: LogMessage[] = [];

  /**
   * Appends a timestamped entry to the log.
   *
   * @param message Text of the log entry.
   * @param error Optional error details; the `error` key is only stored
   * when a value is provided.
   */
  const addLogMessage = (message: string, error?: string) => {
    const entry: LogMessage = { timestamp: currentTimeAsString(), message };
    if (error !== undefined) {
      entry.error = error;
    }
    entries.push(entry);
  };

  /**
   * Returns all entries formatted as `<timestamp>: <message>`.
   * Stored error details are not part of the formatted output.
   */
  const getLogMessages = () =>
    entries.map(({ timestamp, message }) => `${timestamp}: ${message}`);

  return { addLogMessage, getLogMessages };
};
/** Shape of the object returned by `asyncSearchServiceLogProvider()`. */
export type AsyncSearchServiceLog = ReturnType<
typeof asyncSearchServiceLogProvider
>;

View file

@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { asyncSearchServiceStateProvider } from './async_search_service_state';
describe('async search service', () => {
  describe('asyncSearchServiceStateProvider', () => {
    // A freshly created provider must report the documented defaults.
    it('initializes with default state', () => {
      const stateProvider = asyncSearchServiceStateProvider();
      const {
        ccsWarning,
        error,
        isCancelled,
        isRunning,
        overallHistogram,
        progress,
      } = stateProvider.getState();
      const overallProgress = stateProvider.getOverallProgress();

      expect(ccsWarning).toBe(false);
      expect(error).toBe(undefined);
      expect(isCancelled).toBe(false);
      expect(isRunning).toBe(true);
      expect(overallHistogram).toBe(undefined);
      expect(progress.loadedFieldCanditates).toBe(0);
      expect(progress.loadedFieldValuePairs).toBe(0);
      expect(progress.loadedHistogramStepsize).toBe(0);
      expect(progress.loadedHistograms).toBe(0);
      expect(progress.loadedOverallHistogram).toBe(0);
      expect(progress.started > 0).toBe(true);
      expect(overallProgress).toBe(0);
    });

    // Every setter must be reflected by the next `getState()` call.
    it('returns updated state', () => {
      const stateProvider = asyncSearchServiceStateProvider();

      stateProvider.setCcsWarning(true);
      stateProvider.setError(new Error('the-error-message'));
      stateProvider.setIsCancelled(true);
      stateProvider.setIsRunning(false);
      stateProvider.setOverallHistogram([
        { key: 1392202800000, doc_count: 1234 },
      ]);
      stateProvider.setProgress({ loadedHistograms: 0.5 });

      const {
        ccsWarning,
        error,
        isCancelled,
        isRunning,
        overallHistogram,
        progress,
      } = stateProvider.getState();
      const overallProgress = stateProvider.getOverallProgress();

      expect(ccsWarning).toBe(true);
      expect(error?.message).toBe('the-error-message');
      expect(isCancelled).toBe(true);
      expect(isRunning).toBe(false);
      expect(overallHistogram).toEqual([
        { key: 1392202800000, doc_count: 1234 },
      ]);
      expect(progress.loadedFieldCanditates).toBe(0);
      expect(progress.loadedFieldValuePairs).toBe(0);
      expect(progress.loadedHistogramStepsize).toBe(0);
      expect(progress.loadedHistograms).toBe(0.5);
      expect(progress.loadedOverallHistogram).toBe(0);
      expect(progress.started > 0).toBe(true);
      // Only `loadedHistograms` was set: 0.5 * 0.9.
      expect(overallProgress).toBe(0.45);
    });
  });
});

View file

@ -0,0 +1,115 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type {
AsyncSearchProviderProgress,
SearchServiceValue,
} from '../../../../common/search_strategies/correlations/types';
import { HistogramItem } from './queries';
/**
 * Creates the state container for a single async search service run.
 *
 * All state is kept in closure variables and is only changed through the
 * returned setter functions. `getState()` returns an object holding the
 * current values of those variables.
 */
export const asyncSearchServiceStateProvider = () => {
  let ccsWarning = false;
  function setCcsWarning(d: boolean) {
    ccsWarning = d;
  }

  // Declared without an initializer: stays `undefined` until `setError` runs.
  let error: Error;
  function setError(d: Error) {
    error = d;
  }

  let isCancelled = false;
  function getIsCancelled() {
    return isCancelled;
  }
  function setIsCancelled(d: boolean) {
    isCancelled = d;
  }

  // A new state container starts in the running state.
  let isRunning = true;
  function setIsRunning(d: boolean) {
    isRunning = d;
  }

  let overallHistogram: HistogramItem[] | undefined;
  function setOverallHistogram(d: HistogramItem[]) {
    overallHistogram = d;
  }

  // Declared without an initializer: stays `undefined` until
  // `setPercentileThresholdValue` runs.
  let percentileThresholdValue: number;
  function setPercentileThresholdValue(d: number) {
    percentileThresholdValue = d;
  }

  let progress: AsyncSearchProviderProgress = {
    started: Date.now(),
    loadedHistogramStepsize: 0,
    loadedOverallHistogram: 0,
    loadedFieldCanditates: 0,
    loadedFieldValuePairs: 0,
    loadedHistograms: 0,
  };

  /**
   * Weighted sum of the individual progress values: the four preparatory
   * steps contribute 0.025 each, the histograms contribute 0.9.
   */
  function getOverallProgress() {
    return (
      progress.loadedHistogramStepsize * 0.025 +
      progress.loadedOverallHistogram * 0.025 +
      progress.loadedFieldCanditates * 0.025 +
      progress.loadedFieldValuePairs * 0.025 +
      progress.loadedHistograms * 0.9
    );
  }

  /**
   * Merges a partial update into the progress object. `started` is excluded
   * from the accepted keys. A new object is assigned on every call instead
   * of mutating the previous one.
   */
  function setProgress(
    d: Partial<Omit<AsyncSearchProviderProgress, 'started'>>
  ) {
    progress = {
      ...progress,
      ...d,
    };
  }

  const values: SearchServiceValue[] = [];
  function addValue(d: SearchServiceValue) {
    values.push(d);
  }

  /**
   * Returns the collected values ordered by descending `correlation`.
   *
   * A copy is sorted because `Array.prototype.sort` sorts in place; sorting
   * `values` directly would let this getter reorder the array that
   * `getState()` exposes.
   */
  function getValuesSortedByCorrelation() {
    return [...values].sort((a, b) => b.correlation - a.correlation);
  }

  /**
   * Returns the current state. Note that `values` is the internal array
   * itself (in insertion order), not a copy.
   */
  function getState() {
    return {
      ccsWarning,
      error,
      isCancelled,
      isRunning,
      overallHistogram,
      percentileThresholdValue,
      progress,
      values,
    };
  }

  return {
    addValue,
    getIsCancelled,
    getOverallProgress,
    getState,
    getValuesSortedByCorrelation,
    setCcsWarning,
    setError,
    setIsCancelled,
    setIsRunning,
    setOverallHistogram,
    setPercentileThresholdValue,
    setProgress,
  };
};
/** Shape of the object returned by `asyncSearchServiceStateProvider()`. */
export type AsyncSearchServiceState = ReturnType<
typeof asyncSearchServiceStateProvider
>;

View file

@ -76,3 +76,6 @@ export const PERCENTILES_STEP = 2;
export const TERMS_SIZE = 20;
export const SIGNIFICANT_FRACTION = 3;
export const SIGNIFICANT_VALUE_DIGITS = 3;
export const CORRELATION_THRESHOLD = 0.3;
export const KS_TEST_THRESHOLD = 0.1;

View file

@ -0,0 +1,70 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { getPrioritizedFieldValuePairs } from './get_prioritized_field_value_pairs';
describe('correlations', () => {
  describe('getPrioritizedFieldValuePairs', () => {
    // Neither field is prioritized, so the input order must be kept.
    it('returns fields without prioritization in the same order', () => {
      const prioritizedFieldValuePairs = getPrioritizedFieldValuePairs([
        { field: 'the-field-1', value: 'the-value-1' },
        { field: 'the-field-2', value: 'the-value-2' },
      ]);
      const fieldNames = prioritizedFieldValuePairs.map((d) => d.field);

      expect(fieldNames).toEqual(['the-field-1', 'the-field-2']);
    });

    // The prioritized field is already first.
    it('returns fields with already sorted prioritization in the same order', () => {
      const prioritizedFieldValuePairs = getPrioritizedFieldValuePairs([
        { field: 'service.version', value: 'the-value-1' },
        { field: 'the-field-2', value: 'the-value-2' },
      ]);
      const fieldNames = prioritizedFieldValuePairs.map((d) => d.field);

      expect(fieldNames).toEqual(['service.version', 'the-field-2']);
    });

    // The prioritized field comes last in the input and must move to the front.
    it('returns fields with unsorted prioritization in the corrected order', () => {
      const prioritizedFieldValuePairs = getPrioritizedFieldValuePairs([
        { field: 'the-field-1', value: 'the-value-1' },
        { field: 'service.version', value: 'the-value-2' },
      ]);
      const fieldNames = prioritizedFieldValuePairs.map((d) => d.field);

      expect(fieldNames).toEqual(['service.version', 'the-field-1']);
    });

    // Fields matched by prefix are prioritized as well.
    it('considers prefixes when sorting', () => {
      const prioritizedFieldValuePairs = getPrioritizedFieldValuePairs([
        { field: 'the-field-1', value: 'the-value-1' },
        { field: 'service.version', value: 'the-value-2' },
        { field: 'cloud.the-field-3', value: 'the-value-3' },
      ]);
      const fieldNames = prioritizedFieldValuePairs.map((d) => d.field);

      expect(fieldNames).toEqual([
        'service.version',
        'cloud.the-field-3',
        'the-field-1',
      ]);
    });
  });
});

View file

@ -0,0 +1,33 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { FIELDS_TO_ADD_AS_CANDIDATE } from '../constants';
import { hasPrefixToInclude } from '../utils';
import type { FieldValuePairs } from './query_field_value_pairs';
/**
 * Returns the field/value pairs ordered so that prioritized pairs come first.
 *
 * A pair is prioritized when its field is listed in
 * `FIELDS_TO_ADD_AS_CANDIDATE` or when `hasPrefixToInclude` returns true for
 * it. Pairs of equal priority compare as equal, so their relative order is
 * left to the sort implementation (kept as-is by a stable sort).
 *
 * The input array is not modified; a sorted copy is returned.
 *
 * @param fieldValuePairs The field/value pairs to order.
 * @returns A new array with the prioritized pairs moved to the front.
 */
export const getPrioritizedFieldValuePairs = (
  fieldValuePairs: FieldValuePairs
) => {
  const prioritizedFields = [...FIELDS_TO_ADD_AS_CANDIDATE];

  const isPrioritized = (field: string) =>
    prioritizedFields.includes(field) || hasPrefixToInclude(field);

  // Sort a copy: `Array.prototype.sort` sorts in place and would otherwise
  // reorder the caller's array as a side effect.
  // The comparator yields -1 when only `a` is prioritized, 1 when only `b`
  // is prioritized and 0 otherwise.
  return [...fieldValuePairs].sort(
    (a, b) => Number(isPrioritized(b.field)) - Number(isPrioritized(a.field))
  );
};

View file

@ -11,7 +11,12 @@ describe('correlations', () => {
describe('getQueryWithParams', () => {
it('returns the most basic query filtering on processor.event=transaction', () => {
const query = getQueryWithParams({
params: { index: 'apm-*', start: '2020', end: '2021' },
params: {
index: 'apm-*',
start: '2020',
end: '2021',
includeFrozen: false,
},
});
expect(query).toEqual({
bool: {
@ -41,6 +46,7 @@ describe('correlations', () => {
end: '2021',
environment: 'dev',
percentileThresholdValue: 75,
includeFrozen: false,
},
});
expect(query).toEqual({
@ -89,7 +95,12 @@ describe('correlations', () => {
it('returns a query considering a custom field/value pair', () => {
const query = getQueryWithParams({
params: { index: 'apm-*', start: '2020', end: '2021' },
params: {
index: 'apm-*',
start: '2020',
end: '2021',
includeFrozen: false,
},
fieldName: 'actualFieldName',
fieldValue: 'actualFieldValue',
});

View file

@ -10,11 +10,11 @@ import { getOrElse } from 'fp-ts/lib/Either';
import { pipe } from 'fp-ts/lib/pipeable';
import * as t from 'io-ts';
import { failure } from 'io-ts/lib/PathReporter';
import { TRANSACTION_DURATION } from '../../../../common/elasticsearch_fieldnames';
import type { SearchServiceFetchParams } from '../../../../common/search_strategies/correlations/types';
import { rangeRt } from '../../../routes/default_api_types';
import { getCorrelationsFilters } from '../../correlations/get_filters';
import { Setup, SetupTimeRange } from '../../helpers/setup_request';
import { TRANSACTION_DURATION } from '../../../../../common/elasticsearch_fieldnames';
import type { SearchServiceFetchParams } from '../../../../../common/search_strategies/correlations/types';
import { rangeRt } from '../../../../routes/default_api_types';
import { getCorrelationsFilters } from '../../../correlations/get_filters';
import { Setup, SetupTimeRange } from '../../../helpers/setup_request';
const getPercentileThresholdValueQuery = (
percentileThresholdValue: number | undefined

View file

@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { getRequestBase } from './get_request_base';
describe('correlations', () => {
  describe('getRequestBase', () => {
    // `includeFrozen: true` must turn `ignore_throttled` off.
    it('returns the request base parameters', () => {
      const params = { index: 'apm-*', includeFrozen: true };

      expect(getRequestBase(params)).toEqual({
        index: 'apm-*',
        ignore_throttled: false,
        ignore_unavailable: true,
      });
    });

    // Without `includeFrozen`, `ignore_throttled` falls back to `true`.
    it('defaults ignore_throttled to true', () => {
      const params = { index: 'apm-*' };

      expect(getRequestBase(params)).toEqual({
        index: 'apm-*',
        ignore_throttled: true,
        ignore_unavailable: true,
      });
    });
  });
});

View file

@ -0,0 +1,18 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { SearchServiceFetchParams } from '../../../../../common/search_strategies/correlations/types';
/**
 * Builds the request parameters shared by the search requests: the target
 * `index` plus the `ignore_throttled` / `ignore_unavailable` flags.
 *
 * @param params Fetch params; only `index` and `includeFrozen` are read.
 * @returns The base request object to spread into a search request.
 */
export const getRequestBase = (params: SearchServiceFetchParams) => {
  const { index, includeFrozen } = params;

  // matches APM's event client settings
  // `ignore_throttled` is the negation of `includeFrozen`; a missing
  // `includeFrozen` therefore results in `true`.
  const ignoreThrottled = !includeFrozen;

  return {
    index,
    ignore_throttled: ignoreThrottled,
    ignore_unavailable: true,
  };
};

View file

@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export { fetchTransactionDurationFieldCandidates } from './query_field_candidates';
export { fetchTransactionDurationFieldValuePairs } from './query_field_value_pairs';
export { fetchTransactionDurationFractions } from './query_fractions';
export { fetchTransactionDurationPercentiles } from './query_percentiles';
export { fetchTransactionDurationCorrelation } from './query_correlation';
export { fetchTransactionDurationHistograms } from './query_histograms_generator';
export { fetchTransactionDurationHistogramRangeSteps } from './query_histogram_range_steps';
export { fetchTransactionDurationRanges, HistogramItem } from './query_ranges';

View file

@ -15,7 +15,12 @@ import {
BucketCorrelation,
} from './query_correlation';
const params = { index: 'apm-*', start: '2020', end: '2021' };
const params = {
index: 'apm-*',
start: '2020',
end: '2021',
includeFrozen: false,
};
const expectations = [1, 3, 5];
const ranges = [{ to: 1 }, { from: 1, to: 3 }, { from: 3, to: 5 }, { from: 5 }];
const fractions = [1, 2, 4, 5];

View file

@ -9,10 +9,11 @@ import type { estypes } from '@elastic/elasticsearch';
import type { ElasticsearchClient } from 'src/core/server';
import { TRANSACTION_DURATION } from '../../../../common/elasticsearch_fieldnames';
import type { SearchServiceFetchParams } from '../../../../common/search_strategies/correlations/types';
import { TRANSACTION_DURATION } from '../../../../../common/elasticsearch_fieldnames';
import type { SearchServiceFetchParams } from '../../../../../common/search_strategies/correlations/types';
import { getQueryWithParams } from './get_query_with_params';
import { getRequestBase } from './get_request_base';
export interface HistogramItem {
key: number;
@ -88,7 +89,7 @@ export const getTransactionDurationCorrelationRequest = (
},
};
return {
index: params.index,
...getRequestBase(params),
body,
};
};

View file

@ -9,14 +9,20 @@ import type { estypes } from '@elastic/elasticsearch';
import type { ElasticsearchClient } from 'src/core/server';
import { hasPrefixToInclude } from '../utils/has_prefix_to_include';
import {
fetchTransactionDurationFieldCandidates,
getRandomDocsRequest,
hasPrefixToInclude,
shouldBeExcluded,
} from './query_field_candidates';
const params = { index: 'apm-*', start: '2020', end: '2021' };
const params = {
index: 'apm-*',
start: '2020',
end: '2021',
includeFrozen: false,
};
describe('query_field_candidates', () => {
describe('shouldBeExcluded', () => {
@ -79,6 +85,8 @@ describe('query_field_candidates', () => {
size: 1000,
},
index: params.index,
ignore_throttled: !params.includeFrozen,
ignore_unavailable: true,
});
});
});

View file

@ -9,17 +9,19 @@ import type { estypes } from '@elastic/elasticsearch';
import type { ElasticsearchClient } from 'src/core/server';
import type { SearchServiceFetchParams } from '../../../../common/search_strategies/correlations/types';
import type { SearchServiceFetchParams } from '../../../../../common/search_strategies/correlations/types';
import { getQueryWithParams } from './get_query_with_params';
import { Field } from './query_field_value_pairs';
import {
FIELD_PREFIX_TO_ADD_AS_CANDIDATE,
FIELD_PREFIX_TO_EXCLUDE_AS_CANDIDATE,
FIELDS_TO_ADD_AS_CANDIDATE,
FIELDS_TO_EXCLUDE_AS_CANDIDATE,
POPULATED_DOC_COUNT_SAMPLE_SIZE,
} from './constants';
} from '../constants';
import { hasPrefixToInclude } from '../utils';
import { getQueryWithParams } from './get_query_with_params';
import { getRequestBase } from './get_request_base';
import type { FieldName } from './query_field_value_pairs';
export const shouldBeExcluded = (fieldName: string) => {
return (
@ -30,16 +32,10 @@ export const shouldBeExcluded = (fieldName: string) => {
);
};
export const hasPrefixToInclude = (fieldName: string) => {
return FIELD_PREFIX_TO_ADD_AS_CANDIDATE.some((prefix) =>
fieldName.startsWith(prefix)
);
};
export const getRandomDocsRequest = (
params: SearchServiceFetchParams
): estypes.SearchRequest => ({
index: params.index,
...getRequestBase(params),
body: {
fields: ['*'],
_source: false,
@ -57,7 +53,7 @@ export const getRandomDocsRequest = (
export const fetchTransactionDurationFieldCandidates = async (
esClient: ElasticsearchClient,
params: SearchServiceFetchParams
): Promise<{ fieldCandidates: Field[] }> => {
): Promise<{ fieldCandidates: FieldName[] }> => {
const { index } = params;
// Get all fields with keyword mapping
const respMapping = await esClient.fieldCaps({

View file

@ -9,14 +9,20 @@ import type { estypes } from '@elastic/elasticsearch';
import type { ElasticsearchClient } from 'src/core/server';
import type { AsyncSearchProviderProgress } from '../../../../common/search_strategies/correlations/types';
import { asyncSearchServiceLogProvider } from '../async_search_service_log';
import { asyncSearchServiceStateProvider } from '../async_search_service_state';
import {
fetchTransactionDurationFieldValuePairs,
getTermsAggRequest,
} from './query_field_value_pairs';
const params = { index: 'apm-*', start: '2020', end: '2021' };
const params = {
index: 'apm-*',
start: '2020',
end: '2021',
includeFrozen: false,
};
describe('query_field_value_pairs', () => {
describe('getTermsAggRequest', () => {
@ -34,9 +40,6 @@ describe('query_field_value_pairs', () => {
'myFieldCandidate2',
'myFieldCandidate3',
];
const progress = {
loadedFieldValuePairs: 0,
} as AsyncSearchProviderProgress;
const esClientSearchMock = jest.fn((req: estypes.SearchRequest): {
body: estypes.SearchResponse;
@ -56,13 +59,19 @@ describe('query_field_value_pairs', () => {
search: esClientSearchMock,
} as unknown) as ElasticsearchClient;
const { addLogMessage, getLogMessages } = asyncSearchServiceLogProvider();
const state = asyncSearchServiceStateProvider();
const resp = await fetchTransactionDurationFieldValuePairs(
esClientMock,
params,
fieldCandidates,
progress
state,
addLogMessage
);
const { progress } = state.getState();
expect(progress.loadedFieldValuePairs).toBe(1);
expect(resp).toEqual([
{ field: 'myFieldCandidate1', value: 'myValue1' },
@ -73,6 +82,7 @@ describe('query_field_value_pairs', () => {
{ field: 'myFieldCandidate3', value: 'myValue2' },
]);
expect(esClientSearchMock).toHaveBeenCalledTimes(3);
expect(getLogMessages()).toEqual([]);
});
});
});

View file

@ -0,0 +1,124 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { ElasticsearchClient } from 'src/core/server';
import type { estypes } from '@elastic/elasticsearch';
import type { SearchServiceFetchParams } from '../../../../../common/search_strategies/correlations/types';
import type { AsyncSearchServiceLog } from '../async_search_service_log';
import type { AsyncSearchServiceState } from '../async_search_service_state';
import { TERMS_SIZE } from '../constants';
import { getQueryWithParams } from './get_query_with_params';
import { getRequestBase } from './get_request_base';
/** Name of a field, used e.g. as the `field` of a terms aggregation. */
export type FieldName = string;
/** A field name together with one of the values found for that field. */
interface FieldValuePair {
field: FieldName;
value: string;
}
/** List of field/value pairs. */
export type FieldValuePairs = FieldValuePair[];
/**
 * Builds the search request for a terms aggregation on a single field
 * candidate. No hits are requested (`size: 0`); only the `attribute_terms`
 * aggregation, capped at `TERMS_SIZE` buckets, is of interest.
 *
 * @param params - Search service parameters used for the request base and query.
 * @param fieldName - The field to aggregate terms on.
 * @returns The search request to be passed to the ES client.
 */
export const getTermsAggRequest = (
  params: SearchServiceFetchParams,
  fieldName: FieldName
): estypes.SearchRequest => {
  const requestBase = getRequestBase(params);
  const query = getQueryWithParams({ params });

  return {
    ...requestBase,
    body: {
      query,
      size: 0,
      aggs: {
        attribute_terms: {
          terms: { field: fieldName, size: TERMS_SIZE },
        },
      },
    },
  };
};
/**
 * Fetches the terms of one field candidate and turns every returned bucket
 * into a `{ field, value }` pair.
 *
 * This function never rejects because of a failing query: a response without
 * aggregations or a thrown error is reported through `addLogMessage` and an
 * empty list is returned instead.
 *
 * @param esClient - Client used to run the search.
 * @param params - Search service parameters forwarded to the request builder.
 * @param fieldName - Field candidate to fetch terms for.
 * @param addLogMessage - Log callback receiving a message and the stringified response/error.
 * @returns The field/value pairs for the candidate, or `[]` if none could be fetched.
 */
const fetchTransactionDurationFieldTerms = async (
  esClient: ElasticsearchClient,
  params: SearchServiceFetchParams,
  fieldName: string,
  addLogMessage: AsyncSearchServiceLog['addLogMessage']
): Promise<FieldValuePairs> => {
  try {
    const response = await esClient.search(
      getTermsAggRequest(params, fieldName)
    );
    const { aggregations } = response.body;

    if (aggregations === undefined) {
      addLogMessage(
        `Failed to fetch terms for field candidate ${fieldName} fieldValuePairs, no aggregations returned.`,
        JSON.stringify(response)
      );
      return [];
    }

    const termsAggregate = aggregations.attribute_terms as estypes.AggregationsMultiBucketAggregate<{
      key: string;
    }>;
    const termBuckets = termsAggregate?.buckets;

    // Missing or empty buckets simply mean there is nothing to report.
    if (!(termBuckets?.length >= 1)) {
      return [];
    }

    return termBuckets.map((bucket) => ({
      field: fieldName,
      value: bucket.key,
    }));
  } catch (e) {
    // NOTE(review): JSON.stringify() only serializes enumerable own properties,
    // so a plain Error may end up as "{}" in the log — confirm this is acceptable.
    addLogMessage(
      `Failed to fetch terms for field candidate ${fieldName} fieldValuePairs.`,
      JSON.stringify(e)
    );
    return [];
  }
};
/**
 * Runs `fn` for every field candidate strictly one after another (the next
 * call only starts once the previous promise has settled) and concatenates
 * all returned pairs in candidate order.
 *
 * @param fieldCandidates - Field names to process, in order.
 * @param fn - Async callback producing the pairs for one candidate.
 * @returns A flat list of all pairs returned by `fn`.
 */
async function fetchInSequence(
  fieldCandidates: FieldName[],
  fn: (fieldCandidate: string) => Promise<FieldValuePairs>
) {
  const collected = [];

  for (let i = 0; i < fieldCandidates.length; i++) {
    const pairs = await fn(fieldCandidates[i]);
    collected.push(...pairs);
  }

  return collected;
}
/**
 * Fetches the field/value pairs for all field candidates, one candidate at a
 * time, and reports progress on `state` after each candidate has been
 * processed.
 *
 * The reported `loadedFieldValuePairs` is the fraction of candidates handled
 * so far (1/n, 2/n, ... n/n). Nothing is reported when `fieldCandidates` is
 * empty.
 *
 * @param esClient - Client used to run the searches.
 * @param params - Search service parameters forwarded to each query.
 * @param fieldCandidates - Field names to fetch terms for, in order.
 * @param state - Async search service state receiving progress updates.
 * @param addLogMessage - Log callback used to report failing queries.
 * @returns All field/value pairs found across the candidates.
 */
export const fetchTransactionDurationFieldValuePairs = async (
  esClient: ElasticsearchClient,
  params: SearchServiceFetchParams,
  fieldCandidates: FieldName[],
  state: AsyncSearchServiceState,
  addLogMessage: AsyncSearchServiceLog['addLogMessage']
): Promise<FieldValuePairs> => {
  let processedCandidates = 0;

  const fetchTermsAndReportProgress = async (fieldCandidate: string) => {
    const fieldTerms = await fetchTransactionDurationFieldTerms(
      esClient,
      params,
      fieldCandidate,
      addLogMessage
    );

    processedCandidates += 1;
    state.setProgress({
      loadedFieldValuePairs: processedCandidates / fieldCandidates.length,
    });

    return fieldTerms;
  };

  return await fetchInSequence(fieldCandidates, fetchTermsAndReportProgress);
};

View file

@ -14,7 +14,12 @@ import {
getTransactionDurationRangesRequest,
} from './query_fractions';
const params = { index: 'apm-*', start: '2020', end: '2021' };
const params = {
index: 'apm-*',
start: '2020',
end: '2021',
includeFrozen: false,
};
const ranges = [{ to: 1 }, { from: 1, to: 3 }, { from: 3, to: 5 }, { from: 5 }];
describe('query_fractions', () => {

View file

@ -7,15 +7,18 @@
import { ElasticsearchClient } from 'kibana/server';
import { estypes } from '@elastic/elasticsearch';
import { SearchServiceFetchParams } from '../../../../common/search_strategies/correlations/types';
import { SearchServiceFetchParams } from '../../../../../common/search_strategies/correlations/types';
import { TRANSACTION_DURATION } from '../../../../../common/elasticsearch_fieldnames';
import { getQueryWithParams } from './get_query_with_params';
import { TRANSACTION_DURATION } from '../../../../common/elasticsearch_fieldnames';
import { getRequestBase } from './get_request_base';
export const getTransactionDurationRangesRequest = (
params: SearchServiceFetchParams,
ranges: estypes.AggregationsAggregationRange[]
): estypes.SearchRequest => ({
index: params.index,
...getRequestBase(params),
body: {
query: getQueryWithParams({ params }),
size: 0,

View file

@ -14,7 +14,12 @@ import {
getTransactionDurationHistogramRequest,
} from './query_histogram';
const params = { index: 'apm-*', start: '2020', end: '2021' };
const params = {
index: 'apm-*',
start: '2020',
end: '2021',
includeFrozen: false,
};
const interval = 100;
describe('query_histogram', () => {
@ -54,7 +59,9 @@ describe('query_histogram', () => {
},
size: 0,
},
index: 'apm-*',
index: params.index,
ignore_throttled: !params.includeFrozen,
ignore_unavailable: true,
});
});
});

View file

@ -9,36 +9,33 @@ import type { estypes } from '@elastic/elasticsearch';
import type { ElasticsearchClient } from 'src/core/server';
import { TRANSACTION_DURATION } from '../../../../common/elasticsearch_fieldnames';
import { TRANSACTION_DURATION } from '../../../../../common/elasticsearch_fieldnames';
import type {
HistogramItem,
ResponseHit,
SearchServiceFetchParams,
} from '../../../../common/search_strategies/correlations/types';
} from '../../../../../common/search_strategies/correlations/types';
import { getQueryWithParams } from './get_query_with_params';
import { getRequestBase } from './get_request_base';
export const getTransactionDurationHistogramRequest = (
params: SearchServiceFetchParams,
interval: number,
fieldName?: string,
fieldValue?: string
): estypes.SearchRequest => {
const query = getQueryWithParams({ params, fieldName, fieldValue });
return {
index: params.index,
body: {
query,
size: 0,
aggs: {
transaction_duration_histogram: {
histogram: { field: TRANSACTION_DURATION, interval },
},
): estypes.SearchRequest => ({
...getRequestBase(params),
body: {
query: getQueryWithParams({ params, fieldName, fieldValue }),
size: 0,
aggs: {
transaction_duration_histogram: {
histogram: { field: TRANSACTION_DURATION, interval },
},
},
};
};
},
});
export const fetchTransactionDurationHistogram = async (
esClient: ElasticsearchClient,

View file

@ -14,7 +14,12 @@ import {
getHistogramIntervalRequest,
} from './query_histogram_interval';
const params = { index: 'apm-*', start: '2020', end: '2021' };
const params = {
index: 'apm-*',
start: '2020',
end: '2021',
includeFrozen: false,
};
describe('query_histogram_interval', () => {
describe('getHistogramIntervalRequest', () => {
@ -58,6 +63,8 @@ describe('query_histogram_interval', () => {
size: 0,
},
index: params.index,
ignore_throttled: !params.includeFrozen,
ignore_unavailable: true,
});
});
});

View file

@ -9,17 +9,18 @@ import type { estypes } from '@elastic/elasticsearch';
import type { ElasticsearchClient } from 'src/core/server';
import { TRANSACTION_DURATION } from '../../../../common/elasticsearch_fieldnames';
import type { SearchServiceFetchParams } from '../../../../common/search_strategies/correlations/types';
import { TRANSACTION_DURATION } from '../../../../../common/elasticsearch_fieldnames';
import type { SearchServiceFetchParams } from '../../../../../common/search_strategies/correlations/types';
import { getQueryWithParams } from './get_query_with_params';
import { getRequestBase } from './get_request_base';
const HISTOGRAM_INTERVALS = 1000;
export const getHistogramIntervalRequest = (
params: SearchServiceFetchParams
): estypes.SearchRequest => ({
index: params.index,
...getRequestBase(params),
body: {
query: getQueryWithParams({ params }),
size: 0,

View file

@ -14,7 +14,12 @@ import {
getHistogramIntervalRequest,
} from './query_histogram_range_steps';
const params = { index: 'apm-*', start: '2020', end: '2021' };
const params = {
index: 'apm-*',
start: '2020',
end: '2021',
includeFrozen: false,
};
describe('query_histogram_range_steps', () => {
describe('getHistogramIntervalRequest', () => {
@ -58,6 +63,8 @@ describe('query_histogram_range_steps', () => {
size: 0,
},
index: params.index,
ignore_throttled: !params.includeFrozen,
ignore_unavailable: true,
});
});
});

View file

@ -11,10 +11,11 @@ import type { estypes } from '@elastic/elasticsearch';
import type { ElasticsearchClient } from 'src/core/server';
import { TRANSACTION_DURATION } from '../../../../common/elasticsearch_fieldnames';
import type { SearchServiceFetchParams } from '../../../../common/search_strategies/correlations/types';
import { TRANSACTION_DURATION } from '../../../../../common/elasticsearch_fieldnames';
import type { SearchServiceFetchParams } from '../../../../../common/search_strategies/correlations/types';
import { getQueryWithParams } from './get_query_with_params';
import { getRequestBase } from './get_request_base';
const getHistogramRangeSteps = (min: number, max: number, steps: number) => {
// A d3 based scale function as a helper to get equally distributed bins on a log scale.
@ -27,7 +28,7 @@ const getHistogramRangeSteps = (min: number, max: number, steps: number) => {
export const getHistogramIntervalRequest = (
params: SearchServiceFetchParams
): estypes.SearchRequest => ({
index: params.index,
...getRequestBase(params),
body: {
query: getQueryWithParams({ params }),
size: 0,

View file

@ -0,0 +1,134 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { estypes } from '@elastic/elasticsearch';
import type { ElasticsearchClient } from 'src/core/server';
import { asyncSearchServiceLogProvider } from '../async_search_service_log';
import { asyncSearchServiceStateProvider } from '../async_search_service_state';
import { fetchTransactionDurationHistograms } from './query_histograms_generator';
const params = {
index: 'apm-*',
start: '2020',
end: '2021',
includeFrozen: false,
};
const expectations = [1, 3, 5];
const ranges = [{ to: 1 }, { from: 1, to: 3 }, { from: 3, to: 5 }, { from: 5 }];
const fractions = [1, 2, 4, 5];
const totalDocCount = 1234;
const histogramRangeSteps = [1, 2, 4, 5];
const fieldValuePairs = [
{ field: 'the-field-name-1', value: 'the-field-value-1' },
{ field: 'the-field-name-2', value: 'the-field-value-2' },
{ field: 'the-field-name-2', value: 'the-field-value-3' },
];
describe('query_histograms_generator', () => {
  describe('fetchTransactionDurationHistograms', () => {
    // Creates an ES client double whose `search` always returns the given body.
    const setupEsClientMock = (responseBody: unknown) => {
      const esClientSearchMock = jest.fn((req: estypes.SearchRequest): {
        body: estypes.SearchResponse;
      } => ({
        body: responseBody as estypes.SearchResponse,
      }));

      const esClientMock = ({
        search: esClientSearchMock,
      } as unknown) as ElasticsearchClient;

      return { esClientMock, esClientSearchMock };
    };

    // Drains the generator with fresh state/log providers and reports what it
    // yielded: the defined items, the total number of yields and the log getter.
    const drainHistograms = async (esClientMock: ElasticsearchClient) => {
      const state = asyncSearchServiceStateProvider();
      const { addLogMessage, getLogMessages } = asyncSearchServiceLogProvider();

      const items = [];
      let loadedHistograms = 0;

      for await (const item of fetchTransactionDurationHistograms(
        esClientMock,
        addLogMessage,
        params,
        state,
        expectations,
        ranges,
        fractions,
        histogramRangeSteps,
        totalDocCount,
        fieldValuePairs
      )) {
        if (item !== undefined) {
          items.push(item);
        }
        loadedHistograms++;
      }

      return { items, loadedHistograms, getLogMessages };
    };

    it(`doesn't break on failing ES queries and adds messages to the log`, async () => {
      // An empty response body makes every correlation query fail.
      const { esClientMock, esClientSearchMock } = setupEsClientMock({});

      const { items, loadedHistograms, getLogMessages } = await drainHistograms(
        esClientMock
      );

      expect(items.length).toEqual(0);
      expect(loadedHistograms).toEqual(3);
      expect(esClientSearchMock).toHaveBeenCalledTimes(3);
      expect(getLogMessages().map((d) => d.split(': ')[1])).toEqual([
        "Failed to fetch correlation/kstest for 'the-field-name-1/the-field-value-1'",
        "Failed to fetch correlation/kstest for 'the-field-name-2/the-field-value-2'",
        "Failed to fetch correlation/kstest for 'the-field-name-2/the-field-value-3'",
      ]);
    });

    it('returns items with correlation and ks-test value', async () => {
      const { esClientMock, esClientSearchMock } = setupEsClientMock({
        aggregations: {
          latency_ranges: { buckets: [] },
          transaction_duration_correlation: { value: 0.6 },
          ks_test: { less: 0.001 },
          logspace_ranges: { buckets: [] },
        },
      });

      const { items, loadedHistograms, getLogMessages } = await drainHistograms(
        esClientMock
      );

      expect(items.length).toEqual(3);
      expect(loadedHistograms).toEqual(3);
      // Two searches per pair: one for the correlation, one for the histogram.
      expect(esClientSearchMock).toHaveBeenCalledTimes(6);
      expect(getLogMessages().length).toEqual(0);
    });
  });
});

View file

@ -0,0 +1,97 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { estypes } from '@elastic/elasticsearch';
import type { ElasticsearchClient } from 'src/core/server';
import type { SearchServiceFetchParams } from '../../../../../common/search_strategies/correlations/types';
import type { AsyncSearchServiceLog } from '../async_search_service_log';
import type { AsyncSearchServiceState } from '../async_search_service_state';
import { CORRELATION_THRESHOLD, KS_TEST_THRESHOLD } from '../constants';
import { getPrioritizedFieldValuePairs } from './get_prioritized_field_value_pairs';
import { fetchTransactionDurationCorrelation } from './query_correlation';
import { fetchTransactionDurationRanges } from './query_ranges';
import type { FieldValuePairs } from './query_field_value_pairs';
/**
 * Async generator that processes the prioritized field/value pairs one by one.
 *
 * For every pair exactly one value is yielded:
 * - the pair enriched with `correlation`, `ksTest` and `histogram` when the
 *   correlation is above `CORRELATION_THRESHOLD` and the ks-test value is
 *   below `KS_TEST_THRESHOLD`,
 * - `undefined` when the thresholds are not met or the query for that pair
 *   failed (the failure is written to the log instead of being thrown).
 *
 * When the state reports a cancellation, the state is marked as not running
 * and the generator finishes without yielding for the current pair.
 */
export async function* fetchTransactionDurationHistograms(
  esClient: ElasticsearchClient,
  addLogMessage: AsyncSearchServiceLog['addLogMessage'],
  params: SearchServiceFetchParams,
  state: AsyncSearchServiceState,
  expectations: number[],
  ranges: estypes.AggregationsAggregationRange[],
  fractions: number[],
  histogramRangeSteps: number[],
  totalDocCount: number,
  fieldValuePairs: FieldValuePairs
) {
  const prioritizedPairs = getPrioritizedFieldValuePairs(fieldValuePairs);

  for (const item of prioritizedPairs) {
    if (params === undefined || item === undefined || state.getIsCancelled()) {
      state.setIsRunning(false);
      return;
    }

    // A failing query for a single pair must not abort the remaining pairs,
    // so each pair is handled inside its own try/catch.
    try {
      const { correlation, ksTest } = await fetchTransactionDurationCorrelation(
        esClient,
        params,
        expectations,
        ranges,
        fractions,
        totalDocCount,
        item.field,
        item.value
      );

      // The search may have been cancelled while the query was in flight.
      if (state.getIsCancelled()) {
        state.setIsRunning(false);
        return;
      }

      // Guard clause: pairs that miss either threshold yield a placeholder.
      // The comparisons are negated (not flipped) so NaN keeps failing them.
      if (
        correlation === null ||
        !(correlation > CORRELATION_THRESHOLD) ||
        ksTest === null ||
        !(ksTest < KS_TEST_THRESHOLD)
      ) {
        yield undefined;
        continue;
      }

      const logHistogram = await fetchTransactionDurationRanges(
        esClient,
        params,
        histogramRangeSteps,
        item.field,
        item.value
      );

      yield {
        ...item,
        correlation,
        ksTest,
        histogram: logHistogram,
      };
    } catch (e) {
      // Record the error in the internal log and keep going. An index
      // pattern containing ':' additionally raises the cross-cluster search
      // compatibility warning.
      addLogMessage(
        `Failed to fetch correlation/kstest for '${item.field}/${item.value}'`,
        JSON.stringify(e)
      );
      if (params?.index.includes(':')) {
        state.setCcsWarning(true);
      }
      yield undefined;
    }
  }
}

View file

@ -14,7 +14,12 @@ import {
getTransactionDurationPercentilesRequest,
} from './query_percentiles';
const params = { index: 'apm-*', start: '2020', end: '2021' };
const params = {
index: 'apm-*',
start: '2020',
end: '2021',
includeFrozen: false,
};
describe('query_percentiles', () => {
describe('getTransactionDurationPercentilesRequest', () => {
@ -57,6 +62,8 @@ describe('query_percentiles', () => {
track_total_hits: true,
},
index: params.index,
ignore_throttled: !params.includeFrozen,
ignore_unavailable: true,
});
});
});

View file

@ -9,11 +9,12 @@ import type { estypes } from '@elastic/elasticsearch';
import type { ElasticsearchClient } from 'src/core/server';
import { TRANSACTION_DURATION } from '../../../../common/elasticsearch_fieldnames';
import type { SearchServiceFetchParams } from '../../../../common/search_strategies/correlations/types';
import { TRANSACTION_DURATION } from '../../../../../common/elasticsearch_fieldnames';
import type { SearchServiceFetchParams } from '../../../../../common/search_strategies/correlations/types';
import { getQueryWithParams } from './get_query_with_params';
import { SIGNIFICANT_VALUE_DIGITS } from './constants';
import { getRequestBase } from './get_request_base';
import { SIGNIFICANT_VALUE_DIGITS } from '../constants';
export interface HistogramItem {
key: number;
@ -36,7 +37,7 @@ export const getTransactionDurationPercentilesRequest = (
const query = getQueryWithParams({ params, fieldName, fieldValue });
return {
index: params.index,
...getRequestBase(params),
body: {
track_total_hits: true,
query,

View file

@ -14,7 +14,12 @@ import {
getTransactionDurationRangesRequest,
} from './query_ranges';
const params = { index: 'apm-*', start: '2020', end: '2021' };
const params = {
index: 'apm-*',
start: '2020',
end: '2021',
includeFrozen: false,
};
const rangeSteps = [1, 3, 5];
describe('query_ranges', () => {
@ -74,6 +79,8 @@ describe('query_ranges', () => {
size: 0,
},
index: params.index,
ignore_throttled: !params.includeFrozen,
ignore_unavailable: true,
});
});
});

View file

@ -9,10 +9,11 @@ import type { estypes } from '@elastic/elasticsearch';
import type { ElasticsearchClient } from 'src/core/server';
import { TRANSACTION_DURATION } from '../../../../common/elasticsearch_fieldnames';
import type { SearchServiceFetchParams } from '../../../../common/search_strategies/correlations/types';
import { TRANSACTION_DURATION } from '../../../../../common/elasticsearch_fieldnames';
import type { SearchServiceFetchParams } from '../../../../../common/search_strategies/correlations/types';
import { getQueryWithParams } from './get_query_with_params';
import { getRequestBase } from './get_request_base';
export interface HistogramItem {
key: number;
@ -47,7 +48,7 @@ export const getTransactionDurationRangesRequest = (
}
return {
index: params.index,
...getRequestBase(params),
body: {
query,
size: 0,

View file

@ -1,89 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { ElasticsearchClient } from 'src/core/server';
import type { estypes } from '@elastic/elasticsearch';
import type {
AsyncSearchProviderProgress,
SearchServiceFetchParams,
} from '../../../../common/search_strategies/correlations/types';
import { getQueryWithParams } from './get_query_with_params';
import { TERMS_SIZE } from './constants';
interface FieldValuePair {
field: string;
value: string;
}
type FieldValuePairs = FieldValuePair[];
export type Field = string;
/**
 * Builds the search request for a terms aggregation on one field candidate.
 * Only the `attribute_terms` aggregation (at most `TERMS_SIZE` buckets) is
 * requested; no hits are returned (`size: 0`).
 *
 * @param params - Search service parameters providing the index and query filters.
 * @param fieldName - The field to aggregate terms on.
 * @returns The search request to be passed to the ES client.
 */
export const getTermsAggRequest = (
  params: SearchServiceFetchParams,
  fieldName: string
): estypes.SearchRequest => {
  return {
    index: params.index,
    body: {
      query: getQueryWithParams({ params }),
      size: 0,
      aggs: {
        attribute_terms: {
          terms: { field: fieldName, size: TERMS_SIZE },
        },
      },
    },
  };
};
/**
 * Fetches the terms of every field candidate sequentially and returns them as
 * a flat list of `{ field, value }` pairs.
 *
 * `progress.loadedFieldValuePairs` is mutated in place before each query and
 * set to the fraction (position / total) of the candidate being processed.
 * Candidates whose query fails or returns no aggregations are skipped
 * silently.
 *
 * @param esClient - Client used to run the searches.
 * @param params - Search service parameters forwarded to the request builder.
 * @param fieldCandidates - Field names to fetch terms for, in order.
 * @param progress - Progress object that gets mutated by this function.
 * @returns All field/value pairs found across the candidates.
 */
export const fetchTransactionDurationFieldValuePairs = async (
  esClient: ElasticsearchClient,
  params: SearchServiceFetchParams,
  fieldCandidates: Field[],
  progress: AsyncSearchProviderProgress
): Promise<FieldValuePairs> => {
  const collectedPairs: FieldValuePairs = [];

  for (const [position, fieldName] of fieldCandidates.entries()) {
    // mutate progress: 1-based position of the current candidate over the total
    progress.loadedFieldValuePairs = (position + 1) / fieldCandidates.length;

    try {
      const resp = await esClient.search(getTermsAggRequest(params, fieldName));
      const { aggregations } = resp.body;

      if (aggregations === undefined) {
        continue;
      }

      const buckets = (aggregations.attribute_terms as estypes.AggregationsMultiBucketAggregate<{
        key: string;
      }>)?.buckets;

      if (buckets.length >= 1) {
        const pairs = buckets.map((bucket) => ({
          field: fieldName,
          value: bucket.key,
        }));
        collectedPairs.push(...pairs);
      }
    } catch (e) {
      // A failing candidate is skipped on purpose; continue with the next one.
    }
  }

  return collectedPairs;
};

View file

@ -5,7 +5,7 @@
* 2.0.
*/
import { computeExpectationsAndRanges } from './aggregation_utils';
import { computeExpectationsAndRanges } from './compute_expectations_and_ranges';
describe('aggregation utils', () => {
describe('computeExpectationsAndRanges', () => {

View file

@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { currentTimeAsString } from './current_time_as_string';
describe('aggregation utils', () => {
  describe('currentTimeAsString', () => {
    it('returns the current time as a string', () => {
      // Freeze "now" by making the Date constructor return a fixed instance.
      const mockDate = new Date(1392202800000);
      // @ts-ignore ignore the mockImplementation callback error
      const spy = jest.spyOn(global, 'Date').mockImplementation(() => mockDate);

      try {
        const timeString = currentTimeAsString();

        expect(timeString).toEqual('2014-02-12T11:00:00.000Z');
      } finally {
        // Restore the real Date even when the assertion above throws,
        // otherwise the mocked global would leak into subsequent tests.
        spy.mockRestore();
      }
    });
  });
});

View file

@ -0,0 +1,8 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
/**
 * Returns the current point in time formatted as an ISO 8601 string
 * (as produced by `Date.prototype.toISOString`).
 */
export const currentTimeAsString = () => {
  const now = new Date();
  return now.toISOString();
};

View file

@ -0,0 +1,28 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { FIELD_PREFIX_TO_ADD_AS_CANDIDATE } from '../constants';
import { hasPrefixToInclude } from './has_prefix_to_include';
describe('aggregation utils', () => {
  describe('hasPrefixToInclude', () => {
    it('returns true if the prefix is included', () => {
      FIELD_PREFIX_TO_ADD_AS_CANDIDATE.forEach((prefix) => {
        expect(hasPrefixToInclude(`${prefix}the-field-name`)).toBe(true);
      });
    });

    it('returns false if the field name does not start with the prefix', () => {
      // A known prefix that appears somewhere after the start must not match.
      FIELD_PREFIX_TO_ADD_AS_CANDIDATE.forEach((prefix) => {
        expect(
          hasPrefixToInclude(`unknown-prefix-.${prefix}the-field-name`)
        ).toBe(false);
      });

      // Independent of the configured prefixes, so asserted once outside the
      // loop; this also keeps the check alive if the prefix list is empty.
      expect(hasPrefixToInclude('the-field-name')).toBe(false);
    });
  });
});

View file

@ -0,0 +1,14 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { FIELD_PREFIX_TO_ADD_AS_CANDIDATE } from '../constants';
/**
 * Checks whether a field name starts with one of the prefixes listed in
 * `FIELD_PREFIX_TO_ADD_AS_CANDIDATE`.
 *
 * @param fieldName - The field name to check.
 * @returns `true` if the name begins with any of the prefixes, otherwise `false`.
 */
export const hasPrefixToInclude = (fieldName: string) => {
  for (const prefix of FIELD_PREFIX_TO_ADD_AS_CANDIDATE) {
    if (fieldName.startsWith(prefix)) {
      return true;
    }
  }
  return false;
};

View file

@ -5,5 +5,6 @@
* 2.0.
*/
export * from './math_utils';
export * from './aggregation_utils';
export { computeExpectationsAndRanges } from './compute_expectations_and_ranges';
export { currentTimeAsString } from './current_time_as_string';
export { hasPrefixToInclude } from './has_prefix_to_include';

View file

@ -1,26 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { getRandomInt } from './math_utils';
describe('math utils', () => {
  describe('getRandomInt', () => {
    it('returns a random integer within the given range', () => {
      // Non-integer bounds: every valid result lies strictly between them.
      const [min, max] = [0.9, 11.1];

      const randomInt = getRandomInt(min, max);

      expect(Number.isInteger(randomInt)).toBe(true);
      expect(randomInt).toBeGreaterThan(min);
      expect(randomInt).toBeLessThan(max);
    });

    it('returns 1 if given range only allows this integer', () => {
      expect(getRandomInt(0.9, 1.1)).toBe(1);
    });
  });
});

View file

@ -1,70 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { range } from 'lodash';
import { HistogramItem } from '../query_ranges';
import { asPreciseDecimal } from '../../../../../common/utils/formatters';
// From https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Math/random
/**
 * Returns a random integer between `min` and `max`. The bounds are first
 * tightened to integers (`min` is rounded up, `max` is rounded down) and are
 * both inclusive afterwards.
 */
export function getRandomInt(min: number, max: number) {
  const lowerBound = Math.ceil(min);
  const upperBound = Math.floor(max);
  // Number of integers in [lowerBound, upperBound], both ends included.
  const candidateCount = upperBound - lowerBound + 1;
  return Math.floor(Math.random() * candidateCount + lowerBound);
}
/**
 * Roughly compares two histograms by sampling random bins instead of
 * comparing every bin. Keys are compared after formatting them with
 * `significantFraction` decimals, counts after bucketing them with
 * `roundToNearest`, to account for floating point differences.
 *
 * Because the bins are sampled randomly, the result is not deterministic for
 * histograms that differ in only some bins.
 *
 * @param a - First histogram.
 * @param b - Second histogram.
 * @param options - `numBinsToSample`: how many random bins to compare
 *   (default 10); `significantFraction`: decimals used for the key comparison
 *   (default 3).
 * @returns `false` if the lengths differ or a sampled bin mismatches,
 *   otherwise `true`. Two empty histograms are considered equal.
 */
export const isHistogramRoughlyEqual = (
  a: HistogramItem[],
  b: HistogramItem[],
  { numBinsToSample = 10, significantFraction = 3 }
) => {
  if (a.length !== b.length) return false;

  // With no bins there is nothing to sample. Without this guard the sampled
  // index 0 would point at a non-existent bin and reading `.key` would throw.
  if (a.length === 0) return true;

  const lastIndex = a.length - 1;
  const sampledIndices = Array.from({ length: numBinsToSample }, () =>
    getRandomInt(0, lastIndex)
  );

  // NOTE(review): a sampled bin only counts as a mismatch when BOTH its key
  // and its rounded doc_count differ — confirm `&&` (rather than `||`) is intended.
  return !sampledIndices.some((idx) => {
    return (
      asPreciseDecimal(a[idx].key, significantFraction) !==
        asPreciseDecimal(b[idx].key, significantFraction) &&
      roundToNearest(a[idx].doc_count) !== roundToNearest(b[idx].doc_count)
    );
  });
};
/**
 * Buckets a number by rounding it up to the smallest multiple of `roundBy`
 * that is greater than or equal to `n + 1`.
 * E.g. if roundBy = 5, results will be 9 -> 10, 11 -> 15, 14 -> 15, 16 -> 20
 * (and 10 -> 15, since an exact multiple moves up to the next one).
 */
export const roundToNearest = (n: number, roundBy = 5) => {
  const multiples = Math.ceil((n + 1) / roundBy);
  return multiples * roundBy;
};
/**
 * Creates a rough, stringified fingerprint of a histogram.
 *
 * Bins are sampled at an even stride so that about `numBinsToSample` bins
 * contribute; each sampled bin becomes `<key>-<count>`, with the key formatted
 * to `significantFraction` decimals and the count bucketed by
 * `roundToNearest`. The sampled range ends before the last index, so the
 * final bin is never part of the fingerprint.
 *
 * @returns The JSON string of the sampled bin fingerprints.
 */
export const hashHistogram = (
  histogram: HistogramItem[],
  { significantFraction = 3, numBinsToSample = 10 }
) => {
  // Generate bins to sample evenly
  const stride = Math.ceil(histogram.length / numBinsToSample);
  const sampledIndices = range(0, histogram.length - 1, stride);

  const binFingerprints = sampledIndices.map((idx) => {
    const { key, doc_count: docCount } = histogram[idx];
    const roughKey = asPreciseDecimal(key, significantFraction);
    return `${roughKey}-${roundToNearest(docCount)}`;
  });

  return JSON.stringify(binFingerprints);
};