[Logs UI] Load <LogStream> entries via async searches (#86899)

This PR replaces the plain HTTP routes previously used to load the log stream entries with async search strategy calls.
Felix Stürmer 2021-02-02 15:42:27 +01:00 committed by GitHub
parent 2a4d39aae4
commit 7fa30ba33e
82 changed files with 1938 additions and 2287 deletions
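
In practice, the `<LogStream>` component no longer calls `http.fetch('/api/log_entries/entries', ...)`; it submits a request to the data plugin's search service and consumes an observable of (possibly partial) responses. A condensed sketch of the new call pattern, assuming a `data` plugin start contract and timestamps in scope (the strategy name is defined in this diff; the import path is illustrative):

import { LOG_ENTRIES_SEARCH_STRATEGY } from '../common/search_strategies/log_entries/log_entries';

// Submit an async search request via the data plugin's search service.
const response$ = data.search.search(
  {
    params: {
      sourceId: 'default',
      startTimestamp,
      endTimestamp,
      size: 200,
      before: 'last', // cursor-based pagination: `before`, `after`, or neither
    },
  },
  { strategy: LOG_ENTRIES_SEARCH_STRATEGY }
);

// Responses stream in until the search has completed on the server.
response$.subscribe(({ rawResponse, isRunning, isPartial }) => {
  if (!isRunning && !isPartial) {
    // rawResponse.data holds entries, topCursor, bottomCursor, and hasMore* flags
  }
});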

View file

@@ -28,12 +28,6 @@ export interface InfraSource {
configuration: InfraSourceConfiguration;
/** The status of the source */
status: InfraSourceStatus;
/** A consecutive span of log entries surrounding a point in time */
logEntriesAround: InfraLogEntryInterval;
/** A consecutive span of log entries within an interval */
logEntriesBetween: InfraLogEntryInterval;
/** Sequences of log entries matching sets of highlighting queries within an interval */
logEntryHighlights: InfraLogEntryInterval[];
/** A snapshot of nodes */
snapshot?: InfraSnapshotResponse | null;
@@ -129,80 +123,6 @@ export interface InfraIndexField {
/** Whether the field should be displayed based on event.module and an ECS allowed list */
displayable: boolean;
}
/** A consecutive sequence of log entries */
export interface InfraLogEntryInterval {
/** The key corresponding to the start of the interval covered by the entries */
start?: InfraTimeKey | null;
/** The key corresponding to the end of the interval covered by the entries */
end?: InfraTimeKey | null;
/** Whether there are more log entries available before the start */
hasMoreBefore: boolean;
/** Whether there are more log entries available after the end */
hasMoreAfter: boolean;
/** The query the log entries were filtered by */
filterQuery?: string | null;
/** The query the log entries were highlighted with */
highlightQuery?: string | null;
/** A list of the log entries */
entries: InfraLogEntry[];
}
/** A representation of the log entry's position in the event stream */
export interface InfraTimeKey {
/** The timestamp of the event that the log entry corresponds to */
time: number;
/** The tiebreaker that disambiguates events with the same timestamp */
tiebreaker: number;
}
/** A log entry */
export interface InfraLogEntry {
/** A unique representation of the log entry's position in the event stream */
key: InfraTimeKey;
/** The log entry's id */
gid: string;
/** The source id */
source: string;
/** The columns used for rendering the log entry */
columns: InfraLogEntryColumn[];
}
/** A special built-in column that contains the log entry's timestamp */
export interface InfraLogEntryTimestampColumn {
/** The id of the corresponding column configuration */
columnId: string;
/** The timestamp */
timestamp: number;
}
/** A special built-in column that contains the log entry's constructed message */
export interface InfraLogEntryMessageColumn {
/** The id of the corresponding column configuration */
columnId: string;
/** A list of the formatted log entry segments */
message: InfraLogMessageSegment[];
}
/** A segment of the log entry message that was derived from a field */
export interface InfraLogMessageFieldSegment {
/** The field the segment was derived from */
field: string;
/** The segment's message */
value: string;
/** A list of highlighted substrings of the value */
highlights: string[];
}
/** A segment of the log entry message that was derived from a string literal */
export interface InfraLogMessageConstantSegment {
/** The segment's message */
constant: string;
}
/** A column that contains the value of a field of the log entry */
export interface InfraLogEntryFieldColumn {
/** The id of the corresponding column configuration */
columnId: string;
/** The field name of the column */
field: string;
/** The value of the field in the log entry */
value: string;
/** A list of highlighted substrings of the value */
highlights: string[];
}
export interface InfraSnapshotResponse {
/** Nodes of type host, container or pod grouped by 0, 1 or 2 terms */
@@ -276,21 +196,6 @@ export interface DeleteSourceResult {
// InputTypes
// ====================================================
export interface InfraTimeKeyInput {
time: number;
tiebreaker: number;
}
/** A highlighting definition */
export interface InfraLogEntryHighlightInput {
/** The query to highlight by */
query: string;
/** The number of highlighted documents to include beyond the beginning of the interval */
countBefore: number;
/** The number of highlighted documents to include beyond the end of the interval */
countAfter: number;
}
export interface InfraTimerangeInput {
/** The interval string to use for last bucket. The format is '{value}{unit}'. For example '5m' would return the metrics for the last 5 minutes of the timespan. */
interval: string;
@@ -381,34 +286,6 @@ export interface SourceQueryArgs {
/** The id of the source */
id: string;
}
export interface LogEntriesAroundInfraSourceArgs {
/** The sort key that corresponds to the point in time */
key: InfraTimeKeyInput;
/** The maximum number of preceding entries to return */
countBefore?: number | null;
/** The maximum number of following entries to return */
countAfter?: number | null;
/** The query to filter the log entries by */
filterQuery?: string | null;
}
export interface LogEntriesBetweenInfraSourceArgs {
/** The sort key that corresponds to the start of the interval */
startKey: InfraTimeKeyInput;
/** The sort key that corresponds to the end of the interval */
endKey: InfraTimeKeyInput;
/** The query to filter the log entries by */
filterQuery?: string | null;
}
export interface LogEntryHighlightsInfraSourceArgs {
/** The sort key that corresponds to the start of the interval */
startKey: InfraTimeKeyInput;
/** The sort key that corresponds to the end of the interval */
endKey: InfraTimeKeyInput;
/** The query to filter the log entries by */
filterQuery?: string | null;
/** The highlighting to apply to the log entries */
highlights: InfraLogEntryHighlightInput[];
}
export interface SnapshotInfraSourceArgs {
timerange: InfraTimerangeInput;
@@ -565,15 +442,6 @@ export type InfraSourceLogColumn =
| InfraSourceMessageLogColumn
| InfraSourceFieldLogColumn;
/** A column of a log entry */
export type InfraLogEntryColumn =
| InfraLogEntryTimestampColumn
| InfraLogEntryMessageColumn
| InfraLogEntryFieldColumn;
/** A segment of the log entry message */
export type InfraLogMessageSegment = InfraLogMessageFieldSegment | InfraLogMessageConstantSegment;
// ====================================================
// END: Typescript template
// ====================================================
@@ -582,46 +450,6 @@ export type InfraLogMessageSegment = InfraLogMessageFieldSegment | InfraLogMessa
// Documents
// ====================================================
export namespace LogEntryHighlightsQuery {
export type Variables = {
sourceId?: string | null;
startKey: InfraTimeKeyInput;
endKey: InfraTimeKeyInput;
filterQuery?: string | null;
highlights: InfraLogEntryHighlightInput[];
};
export type Query = {
__typename?: 'Query';
source: Source;
};
export type Source = {
__typename?: 'InfraSource';
id: string;
logEntryHighlights: LogEntryHighlights[];
};
export type LogEntryHighlights = {
__typename?: 'InfraLogEntryInterval';
start?: Start | null;
end?: End | null;
entries: Entries[];
};
export type Start = InfraTimeKeyFields.Fragment;
export type End = InfraTimeKeyFields.Fragment;
export type Entries = InfraLogEntryHighlightFields.Fragment;
}
export namespace MetricsQuery {
export type Variables = {
sourceId: string;
@@ -820,50 +648,6 @@ export namespace WaffleNodesQuery {
};
}
export namespace LogEntries {
export type Variables = {
sourceId?: string | null;
timeKey: InfraTimeKeyInput;
countBefore?: number | null;
countAfter?: number | null;
filterQuery?: string | null;
};
export type Query = {
__typename?: 'Query';
source: Source;
};
export type Source = {
__typename?: 'InfraSource';
id: string;
logEntriesAround: LogEntriesAround;
};
export type LogEntriesAround = {
__typename?: 'InfraLogEntryInterval';
start?: Start | null;
end?: End | null;
hasMoreBefore: boolean;
hasMoreAfter: boolean;
entries: Entries[];
};
export type Start = InfraTimeKeyFields.Fragment;
export type End = InfraTimeKeyFields.Fragment;
export type Entries = InfraLogEntryFields.Fragment;
}
export namespace SourceConfigurationFields {
export type Fragment = {
__typename?: 'InfraSourceConfiguration';
@@ -994,124 +778,3 @@ export namespace InfraSourceFields {
origin: string;
};
}
export namespace InfraLogEntryFields {
export type Fragment = {
__typename?: 'InfraLogEntry';
gid: string;
key: Key;
columns: Columns[];
};
export type Key = {
__typename?: 'InfraTimeKey';
time: number;
tiebreaker: number;
};
export type Columns =
| InfraLogEntryTimestampColumnInlineFragment
| InfraLogEntryMessageColumnInlineFragment
| InfraLogEntryFieldColumnInlineFragment;
export type InfraLogEntryTimestampColumnInlineFragment = {
__typename?: 'InfraLogEntryTimestampColumn';
columnId: string;
timestamp: number;
};
export type InfraLogEntryMessageColumnInlineFragment = {
__typename?: 'InfraLogEntryMessageColumn';
columnId: string;
message: Message[];
};
export type Message =
| InfraLogMessageFieldSegmentInlineFragment
| InfraLogMessageConstantSegmentInlineFragment;
export type InfraLogMessageFieldSegmentInlineFragment = {
__typename?: 'InfraLogMessageFieldSegment';
field: string;
value: string;
};
export type InfraLogMessageConstantSegmentInlineFragment = {
__typename?: 'InfraLogMessageConstantSegment';
constant: string;
};
export type InfraLogEntryFieldColumnInlineFragment = {
__typename?: 'InfraLogEntryFieldColumn';
columnId: string;
field: string;
value: string;
};
}
export namespace InfraLogEntryHighlightFields {
export type Fragment = {
__typename?: 'InfraLogEntry';
gid: string;
key: Key;
columns: Columns[];
};
export type Key = {
__typename?: 'InfraTimeKey';
time: number;
tiebreaker: number;
};
export type Columns =
| InfraLogEntryMessageColumnInlineFragment
| InfraLogEntryFieldColumnInlineFragment;
export type InfraLogEntryMessageColumnInlineFragment = {
__typename?: 'InfraLogEntryMessageColumn';
columnId: string;
message: Message[];
};
export type Message = InfraLogMessageFieldSegmentInlineFragment;
export type InfraLogMessageFieldSegmentInlineFragment = {
__typename?: 'InfraLogMessageFieldSegment';
field: string;
highlights: string[];
};
export type InfraLogEntryFieldColumnInlineFragment = {
__typename?: 'InfraLogEntryFieldColumn';
columnId: string;
field: string;
highlights: string[];
};
}

View file

@@ -53,6 +53,7 @@ export const logSourceColumnConfigurationRT = rt.union([
logSourceMessageColumnConfigurationRT,
logSourceFieldColumnConfigurationRT,
]);
export type LogSourceColumnConfiguration = rt.TypeOf<typeof logSourceColumnConfigurationRT>;
export const logSourceConfigurationPropertiesRT = rt.strict({
name: rt.string,

View file

@@ -6,87 +6,79 @@
import * as rt from 'io-ts';
import { TimeKey } from '../time';
import { logEntryCursorRT } from './log_entry_cursor';
import { jsonArrayRT } from '../typed_json';
export interface LogEntryOrigin {
id: string;
index: string;
type: string;
}
import { logEntryCursorRT } from './log_entry_cursor';
export type LogEntryTime = TimeKey;
export interface LogEntryFieldsMapping {
message: string;
tiebreaker: string;
time: string;
}
export function isEqual(time1: LogEntryTime, time2: LogEntryTime) {
return time1.time === time2.time && time1.tiebreaker === time2.tiebreaker;
}
export function isLess(time1: LogEntryTime, time2: LogEntryTime) {
return (
time1.time < time2.time || (time1.time === time2.time && time1.tiebreaker < time2.tiebreaker)
);
}
export function isLessOrEqual(time1: LogEntryTime, time2: LogEntryTime) {
return (
time1.time < time2.time || (time1.time === time2.time && time1.tiebreaker <= time2.tiebreaker)
);
}
export function isBetween(min: LogEntryTime, max: LogEntryTime, operand: LogEntryTime) {
return isLessOrEqual(min, operand) && isLessOrEqual(operand, max);
}
/**
* message parts
*/
export const logMessageConstantPartRT = rt.type({
constant: rt.string,
});
export type LogMessageConstantPart = rt.TypeOf<typeof logMessageConstantPartRT>;
export const logMessageFieldPartRT = rt.type({
field: rt.string,
value: jsonArrayRT,
highlights: rt.array(rt.string),
});
export type LogMessageFieldPart = rt.TypeOf<typeof logMessageFieldPartRT>;
export const logMessagePartRT = rt.union([logMessageConstantPartRT, logMessageFieldPartRT]);
export type LogMessagePart = rt.TypeOf<typeof logMessagePartRT>;
/**
* columns
*/
export const logTimestampColumnRT = rt.type({ columnId: rt.string, timestamp: rt.number });
export type LogTimestampColumn = rt.TypeOf<typeof logTimestampColumnRT>;
export const logFieldColumnRT = rt.type({
columnId: rt.string,
field: rt.string,
value: jsonArrayRT,
highlights: rt.array(rt.string),
});
export type LogFieldColumn = rt.TypeOf<typeof logFieldColumnRT>;
export const logMessageColumnRT = rt.type({
columnId: rt.string,
message: rt.array(logMessagePartRT),
});
export type LogMessageColumn = rt.TypeOf<typeof logMessageColumnRT>;
export const logColumnRT = rt.union([logTimestampColumnRT, logFieldColumnRT, logMessageColumnRT]);
export type LogColumn = rt.TypeOf<typeof logColumnRT>;
/**
* fields
*/
export const logEntryContextRT = rt.union([
rt.type({}),
rt.type({ 'container.id': rt.string }),
rt.type({ 'host.name': rt.string, 'log.file.path': rt.string }),
]);
export type LogEntryContext = rt.TypeOf<typeof logEntryContextRT>;
export const logEntryFieldRT = rt.type({
field: rt.string,
value: jsonArrayRT,
});
export type LogEntryField = rt.TypeOf<typeof logEntryFieldRT>;
/**
* entry
*/
export const logEntryRT = rt.type({
id: rt.string,
index: rt.string,
cursor: logEntryCursorRT,
columns: rt.array(logColumnRT),
context: logEntryContextRT,
});
export type LogMessageConstantPart = rt.TypeOf<typeof logMessageConstantPartRT>;
export type LogMessageFieldPart = rt.TypeOf<typeof logMessageFieldPartRT>;
export type LogMessagePart = rt.TypeOf<typeof logMessagePartRT>;
export type LogEntryContext = rt.TypeOf<typeof logEntryContextRT>;
export type LogEntry = rt.TypeOf<typeof logEntryRT>;
export type LogTimestampColumn = rt.TypeOf<typeof logTimestampColumnRT>;
export type LogFieldColumn = rt.TypeOf<typeof logFieldColumnRT>;
export type LogMessageColumn = rt.TypeOf<typeof logMessageColumnRT>;
export type LogColumn = rt.TypeOf<typeof logColumnRT>;

View file

@@ -11,9 +11,23 @@ export const logEntryCursorRT = rt.type({
time: rt.number,
tiebreaker: rt.number,
});
export type LogEntryCursor = rt.TypeOf<typeof logEntryCursorRT>;
export const logEntryBeforeCursorRT = rt.type({
before: rt.union([logEntryCursorRT, rt.literal('last')]),
});
export type LogEntryBeforeCursor = rt.TypeOf<typeof logEntryBeforeCursorRT>;
export const logEntryAfterCursorRT = rt.type({
after: rt.union([logEntryCursorRT, rt.literal('first')]),
});
export type LogEntryAfterCursor = rt.TypeOf<typeof logEntryAfterCursorRT>;
export const logEntryAroundCursorRT = rt.type({
center: logEntryCursorRT,
});
export type LogEntryAroundCursor = rt.TypeOf<typeof logEntryAroundCursorRT>;
export const getLogEntryCursorFromHit = (hit: { sort: [number, number] }) =>
decodeOrThrow(logEntryCursorRT)({
time: hit.sort[0],

View file

@@ -0,0 +1,74 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import * as rt from 'io-ts';
import { DslQuery } from '../../../../../../src/plugins/data/common';
import { logSourceColumnConfigurationRT } from '../../http_api/log_sources';
import {
logEntryAfterCursorRT,
logEntryBeforeCursorRT,
logEntryCursorRT,
logEntryRT,
} from '../../log_entry';
import { JsonObject, jsonObjectRT } from '../../typed_json';
import { searchStrategyErrorRT } from '../common/errors';
export const LOG_ENTRIES_SEARCH_STRATEGY = 'infra-log-entries';
const logEntriesBaseSearchRequestParamsRT = rt.intersection([
rt.type({
sourceId: rt.string,
startTimestamp: rt.number,
endTimestamp: rt.number,
size: rt.number,
}),
rt.partial({
query: jsonObjectRT,
columns: rt.array(logSourceColumnConfigurationRT),
highlightPhrase: rt.string,
}),
]);
export const logEntriesBeforeSearchRequestParamsRT = rt.intersection([
logEntriesBaseSearchRequestParamsRT,
logEntryBeforeCursorRT,
]);
export const logEntriesAfterSearchRequestParamsRT = rt.intersection([
logEntriesBaseSearchRequestParamsRT,
logEntryAfterCursorRT,
]);
export const logEntriesSearchRequestParamsRT = rt.union([
logEntriesBaseSearchRequestParamsRT,
logEntriesBeforeSearchRequestParamsRT,
logEntriesAfterSearchRequestParamsRT,
]);
export type LogEntriesSearchRequestParams = rt.TypeOf<typeof logEntriesSearchRequestParamsRT>;
export type LogEntriesSearchRequestQuery = JsonObject | DslQuery;
export const logEntriesSearchResponsePayloadRT = rt.intersection([
rt.type({
data: rt.intersection([
rt.type({
entries: rt.array(logEntryRT),
topCursor: rt.union([logEntryCursorRT, rt.null]),
bottomCursor: rt.union([logEntryCursorRT, rt.null]),
}),
rt.partial({
hasMoreBefore: rt.boolean,
hasMoreAfter: rt.boolean,
}),
]),
}),
rt.partial({
errors: rt.array(searchStrategyErrorRT),
}),
]);
export type LogEntriesSearchResponsePayload = rt.TypeOf<typeof logEntriesSearchResponsePayloadRT>;
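
For illustration, these codecs are used symmetrically by the client-side hooks added later in this diff: request params are encoded with `logEntriesSearchRequestParamsRT` and raw responses are decoded with `decodeOrThrow(logEntriesSearchResponsePayloadRT)`. A minimal sketch (import paths abbreviated; `rawResponse` assumed in scope):

import { decodeOrThrow } from '../../runtime_types';
import {
  logEntriesSearchRequestParamsRT,
  logEntriesSearchResponsePayloadRT,
} from './log_entries';

// Encode a request for the page after a cursor; the params union also
// accepts a `before` cursor or no cursor at all.
const params = logEntriesSearchRequestParamsRT.encode({
  sourceId: 'default',
  startTimestamp: 1595145600000,
  endTimestamp: 1595146500000,
  size: 200,
  after: { time: 1595145600000, tiebreaker: 0 },
});

// Decode a raw search response; this throws if the payload doesn't match the codec.
const { data, errors } = decodeOrThrow(logEntriesSearchResponsePayloadRT)(rawResponse);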

View file

@@ -5,8 +5,7 @@
*/
import * as rt from 'io-ts';
import { logEntryCursorRT } from '../../log_entry';
import { jsonArrayRT } from '../../typed_json';
import { logEntryCursorRT, logEntryFieldRT } from '../../log_entry';
import { searchStrategyErrorRT } from '../common/errors';
export const LOG_ENTRY_SEARCH_STRATEGY = 'infra-log-entry';
@@ -18,18 +17,11 @@ export const logEntrySearchRequestParamsRT = rt.type({
export type LogEntrySearchRequestParams = rt.TypeOf<typeof logEntrySearchRequestParamsRT>;
const logEntryFieldRT = rt.type({
field: rt.string,
value: jsonArrayRT,
});
export type LogEntryField = rt.TypeOf<typeof logEntryFieldRT>;
export const logEntryRT = rt.type({
id: rt.string,
index: rt.string,
fields: rt.array(logEntryFieldRT),
key: logEntryCursorRT,
cursor: logEntryCursorRT,
});
export type LogEntry = rt.TypeOf<typeof logEntryRT>;

View file

@@ -7,6 +7,8 @@
import * as rt from 'io-ts';
import { JsonArray, JsonObject, JsonValue } from '../../../../src/plugins/kibana_utils/common';
export { JsonArray, JsonObject, JsonValue };
export const jsonScalarRT = rt.union([rt.null, rt.boolean, rt.number, rt.string]);
export const jsonValueRT: rt.Type<JsonValue> = rt.recursion('JsonValue', () =>

View file

@@ -1,10 +1,12 @@
import { Meta, Story, Canvas, ArgsTable } from '@storybook/addon-docs/blocks';
import { Subject } from 'rxjs';
import { defer, of, Subject } from 'rxjs';
import { delay } from 'rxjs/operators';
import { I18nProvider } from '@kbn/i18n/react';
import { EuiThemeProvider } from '../../../../../../src/plugins/kibana_react/common';
import { KibanaContextProvider } from '../../../../../../src/plugins/kibana_react/public';
import { LOG_ENTRIES_SEARCH_STRATEGY } from '../../../common/search_strategies/log_entries/log_entries';
import { DEFAULT_SOURCE_CONFIGURATION } from '../../test_utils/source_configuration';
import { generateFakeEntries, ENTRIES_EMPTY } from '../../test_utils/entries';
@@ -15,30 +17,61 @@ import { LogStream } from './';
export const startTimestamp = 1595145600000;
export const endTimestamp = startTimestamp + 15 * 60 * 1000;
export const dataMock = {
search: {
search: ({ params }, options) => {
return defer(() => {
switch (options.strategy) {
case LOG_ENTRIES_SEARCH_STRATEGY:
if (params.after?.time === params.endTimestamp || params.before?.time === params.startTimestamp) {
return of({
id: 'EMPTY_FAKE_RESPONSE',
total: 1,
loaded: 1,
isRunning: false,
isPartial: false,
rawResponse: ENTRIES_EMPTY,
});
} else {
const entries = generateFakeEntries(
200,
params.startTimestamp,
params.endTimestamp,
params.columns || DEFAULT_SOURCE_CONFIGURATION.data.configuration.logColumns
);
return of({
id: 'FAKE_RESPONSE',
total: 1,
loaded: 1,
isRunning: false,
isPartial: false,
rawResponse: {
data: {
entries,
topCursor: entries[0].cursor,
bottomCursor: entries[entries.length - 1].cursor,
hasMoreBefore: false,
},
errors: [],
}
});
}
default:
return of({
id: 'FAKE_RESPONSE',
rawResponse: {},
});
}
}).pipe(delay(2000));
},
},
};
export const fetch = function (url, params) {
switch (url) {
case '/api/infra/log_source_configurations/default':
return DEFAULT_SOURCE_CONFIGURATION;
case '/api/log_entries/entries':
const body = JSON.parse(params.body);
if (body.after?.time === body.endTimestamp || body.before?.time === body.startTimestamp) {
return ENTRIES_EMPTY;
} else {
const entries = generateFakeEntries(
200,
body.startTimestamp,
body.endTimestamp,
body.columns || DEFAULT_SOURCE_CONFIGURATION.data.configuration.logColumns
);
return {
data: {
entries,
topCursor: entries[0].cursor,
bottomCursor: entries[entries.length - 1].cursor,
hasMoreBefore: false,
},
};
}
default:
return {};
}
@@ -67,7 +100,7 @@ export const Template = (args) => <LogStream {...args} />;
(story) => (
<I18nProvider>
<EuiThemeProvider>
<KibanaContextProvider services={{ http: { fetch }, uiSettings }}>
<KibanaContextProvider services={{ data: dataMock, http: { fetch }, uiSettings }}>
{story()}
</KibanaContextProvider>
</EuiThemeProvider>

View file

@@ -101,14 +101,14 @@ Read more at https://github.com/elastic/kibana/blob/master/src/plugins/kibana_re
// Internal state
const {
loadingState,
pageLoadingState,
entries,
hasMoreBefore,
hasMoreAfter,
fetchEntries,
fetchPreviousEntries,
fetchNextEntries,
fetchPreviousEntries,
hasMoreAfter,
hasMoreBefore,
isLoadingMore,
isReloading,
} = useLogStream({
sourceId,
startTimestamp,
@@ -118,12 +118,6 @@ Read more at https://github.com/elastic/kibana/blob/master/src/plugins/kibana_re
columns: customColumns,
});
// Derived state
const isReloading =
isLoadingSourceConfiguration || loadingState === 'uninitialized' || loadingState === 'loading';
const isLoadingMore = pageLoadingState === 'loading';
const columnConfigurations = useMemo(() => {
return sourceConfiguration ? customColumns ?? sourceConfiguration.configuration.logColumns : [];
}, [sourceConfiguration, customColumns]);
@@ -177,7 +171,7 @@ Read more at https://github.com/elastic/kibana/blob/master/src/plugins/kibana_re
items={streamItems}
scale="medium"
wrap={true}
isReloading={isReloading}
isReloading={isLoadingSourceConfiguration || isReloading}
isLoadingMore={isLoadingMore}
hasMoreBeforeStart={hasMoreBefore}
hasMoreAfterEnd={hasMoreAfter}

View file

@@ -32,7 +32,7 @@ describe('LogEntryActionsMenu component', () => {
fields: [{ field: 'host.ip', value: ['HOST_IP'] }],
id: 'ITEM_ID',
index: 'INDEX',
key: {
cursor: {
time: 0,
tiebreaker: 0,
},
@@ -62,7 +62,7 @@ describe('LogEntryActionsMenu component', () => {
fields: [{ field: 'container.id', value: ['CONTAINER_ID'] }],
id: 'ITEM_ID',
index: 'INDEX',
key: {
cursor: {
time: 0,
tiebreaker: 0,
},
@@ -92,7 +92,7 @@ describe('LogEntryActionsMenu component', () => {
fields: [{ field: 'kubernetes.pod.uid', value: ['POD_UID'] }],
id: 'ITEM_ID',
index: 'INDEX',
key: {
cursor: {
time: 0,
tiebreaker: 0,
},
@@ -126,7 +126,7 @@ describe('LogEntryActionsMenu component', () => {
],
id: 'ITEM_ID',
index: 'INDEX',
key: {
cursor: {
time: 0,
tiebreaker: 0,
},
@@ -158,7 +158,7 @@ describe('LogEntryActionsMenu component', () => {
fields: [],
id: 'ITEM_ID',
index: 'INDEX',
key: {
cursor: {
time: 0,
tiebreaker: 0,
},
@@ -192,7 +192,7 @@ describe('LogEntryActionsMenu component', () => {
fields: [{ field: 'trace.id', value: ['1234567'] }],
id: 'ITEM_ID',
index: 'INDEX',
key: {
cursor: {
time: 0,
tiebreaker: 0,
},
@@ -226,7 +226,7 @@ describe('LogEntryActionsMenu component', () => {
],
id: 'ITEM_ID',
index: 'INDEX',
key: {
cursor: {
time: 0,
tiebreaker: 0,
},
@@ -256,7 +256,7 @@ describe('LogEntryActionsMenu component', () => {
fields: [],
id: 'ITEM_ID',
index: 'INDEX',
key: {
cursor: {
time: 0,
tiebreaker: 0,
},

View file

@@ -7,10 +7,8 @@
import { EuiBasicTableColumn, EuiInMemoryTable } from '@elastic/eui';
import { i18n } from '@kbn/i18n';
import React, { useMemo } from 'react';
import {
LogEntry,
LogEntryField,
} from '../../../../common/search_strategies/log_entries/log_entry';
import { LogEntryField } from '../../../../common/log_entry';
import { LogEntry } from '../../../../common/search_strategies/log_entries/log_entry';
import { TimeKey } from '../../../../common/time';
import { FieldValue } from '../log_text_stream/field_value';
@@ -22,7 +20,7 @@ export const LogEntryFieldsTable: React.FC<{
() =>
onSetFieldFilter
? (field: LogEntryField) => () => {
onSetFieldFilter?.(`${field.field}:"${field.value}"`, logEntry.id, logEntry.key);
onSetFieldFilter?.(`${field.field}:"${field.value}"`, logEntry.id, logEntry.cursor);
}
: undefined,
[logEntry, onSetFieldFilter]

View file

@@ -5,7 +5,6 @@
*/
import { bisector } from 'd3-array';
import { compareToTimeKey, TimeKey } from '../../../../common/time';
import { LogEntry } from '../../../../common/log_entry';

View file

@@ -7,7 +7,6 @@
import React, { memo, useState, useCallback, useMemo } from 'react';
import { i18n } from '@kbn/i18n';
import { isEmpty } from 'lodash';
import { euiStyled } from '../../../../../../../src/plugins/kibana_react/common';
import { useUiTracker } from '../../../../../observability/public';
import { isTimestampColumn } from '../../../utils/log_entry';

View file

@@ -11,7 +11,11 @@ import {
logEntrySearchResponsePayloadRT,
LOG_ENTRY_SEARCH_STRATEGY,
} from '../../../common/search_strategies/log_entries/log_entry';
import { useDataSearch, useLatestPartialDataSearchResponse } from '../../utils/data_search';
import {
normalizeDataSearchResponses,
useDataSearch,
useLatestPartialDataSearchResponse,
} from '../../utils/data_search';
export const useLogEntry = ({
sourceId,
@@ -31,6 +35,7 @@ export const useLogEntry = ({
}
: null;
}, [sourceId, logEntryId]),
parseResponses: parseLogEntrySearchResponses,
});
const {
@@ -41,11 +46,7 @@ export const useLogEntry = ({
latestResponseErrors,
loaded,
total,
} = useLatestPartialDataSearchResponse(
logEntrySearchRequests$,
null,
decodeLogEntrySearchResponse
);
} = useLatestPartialDataSearchResponse(logEntrySearchRequests$);
return {
cancelRequest,
@@ -59,4 +60,7 @@ export const useLogEntry = ({
};
};
const decodeLogEntrySearchResponse = decodeOrThrow(logEntrySearchResponsePayloadRT);
const parseLogEntrySearchResponses = normalizeDataSearchResponses(
null,
decodeOrThrow(logEntrySearchResponsePayloadRT)
);

View file

@@ -1,42 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import gql from 'graphql-tag';
import { sharedFragments } from '../../../../common/graphql/shared';
export const logEntryHighlightsQuery = gql`
query LogEntryHighlightsQuery(
$sourceId: ID = "default"
$startKey: InfraTimeKeyInput!
$endKey: InfraTimeKeyInput!
$filterQuery: String
$highlights: [InfraLogEntryHighlightInput!]!
) {
source(id: $sourceId) {
id
logEntryHighlights(
startKey: $startKey
endKey: $endKey
filterQuery: $filterQuery
highlights: $highlights
) {
start {
...InfraTimeKeyFields
}
end {
...InfraTimeKeyFields
}
entries {
...InfraLogEntryHighlightFields
}
}
}
}
${sharedFragments.InfraTimeKey}
${sharedFragments.InfraLogEntryHighlightFields}
`;

View file

@@ -5,13 +5,12 @@
*/
import { useEffect, useMemo, useState } from 'react';
import { TimeKey } from '../../../../common/time';
import { useTrackedPromise } from '../../../utils/use_tracked_promise';
import { fetchLogEntriesHighlights } from './api/fetch_log_entries_highlights';
import { LogEntriesHighlightsResponse } from '../../../../common/http_api';
import { LogEntry } from '../../../../common/log_entry';
import { TimeKey } from '../../../../common/time';
import { useKibanaContextForPlugin } from '../../../hooks/use_kibana';
import { useTrackedPromise } from '../../../utils/use_tracked_promise';
import { fetchLogEntriesHighlights } from './api/fetch_log_entries_highlights';
export const useLogEntryHighlights = (
sourceId: string,

View file

@@ -4,15 +4,16 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { useMemo, useEffect } from 'react';
import useSetState from 'react-use/lib/useSetState';
import { useCallback, useEffect, useMemo } from 'react';
import usePrevious from 'react-use/lib/usePrevious';
import useSetState from 'react-use/lib/useSetState';
import { esKuery, esQuery, Query } from '../../../../../../../src/plugins/data/public';
import { fetchLogEntries } from '../log_entries/api/fetch_log_entries';
import { useTrackedPromise } from '../../../utils/use_tracked_promise';
import { LogEntryCursor, LogEntry } from '../../../../common/log_entry';
import { useKibanaContextForPlugin } from '../../../hooks/use_kibana';
import { LogEntry, LogEntryCursor } from '../../../../common/log_entry';
import { useSubscription } from '../../../utils/use_observable';
import { LogSourceConfigurationProperties } from '../log_source';
import { useFetchLogEntriesAfter } from './use_fetch_log_entries_after';
import { useFetchLogEntriesAround } from './use_fetch_log_entries_around';
import { useFetchLogEntriesBefore } from './use_fetch_log_entries_before';
interface LogStreamProps {
sourceId: string;
@@ -31,16 +32,6 @@ interface LogStreamState {
hasMoreAfter: boolean;
}
type LoadingState = 'uninitialized' | 'loading' | 'success' | 'error';
interface LogStreamReturn extends LogStreamState {
fetchEntries: () => void;
fetchPreviousEntries: () => void;
fetchNextEntries: () => void;
loadingState: LoadingState;
pageLoadingState: LoadingState;
}
const INITIAL_STATE: LogStreamState = {
entries: [],
topCursor: null,
@@ -50,11 +41,7 @@ const INITIAL_STATE: LogStreamState = {
hasMoreAfter: true,
};
const EMPTY_DATA = {
entries: [],
topCursor: null,
bottomCursor: null,
};
const LOG_ENTRIES_CHUNK_SIZE = 200;
export function useLogStream({
sourceId,
@@ -63,8 +50,7 @@
query,
center,
columns,
}: LogStreamProps): LogStreamReturn {
const { services } = useKibanaContextForPlugin();
}: LogStreamProps) {
const [state, setState] = useSetState<LogStreamState>(INITIAL_STATE);
// Ensure the pagination keeps working when the timerange gets extended
@@ -85,175 +71,151 @@
const parsedQuery = useMemo(() => {
if (!query) {
return null;
}
let q;
if (typeof query === 'string') {
q = esKuery.toElasticsearchQuery(esKuery.fromKueryExpression(query));
return undefined;
} else if (typeof query === 'string') {
return esKuery.toElasticsearchQuery(esKuery.fromKueryExpression(query));
} else if (query.language === 'kuery') {
q = esKuery.toElasticsearchQuery(esKuery.fromKueryExpression(query.query as string));
return esKuery.toElasticsearchQuery(esKuery.fromKueryExpression(query.query as string));
} else if (query.language === 'lucene') {
q = esQuery.luceneStringToDsl(query.query as string);
return esQuery.luceneStringToDsl(query.query as string);
} else {
return undefined;
}
return JSON.stringify(q);
}, [query]);
// Callbacks
const [entriesPromise, fetchEntries] = useTrackedPromise(
{
cancelPreviousOn: 'creation',
createPromise: () => {
setState(INITIAL_STATE);
const fetchPosition = center ? { center } : { before: 'last' };
return fetchLogEntries(
{
sourceId,
startTimestamp,
endTimestamp,
query: parsedQuery,
columns,
...fetchPosition,
},
services.http.fetch
);
},
onResolve: ({ data }) => {
setState((prevState) => ({
...data,
hasMoreBefore: data.hasMoreBefore ?? prevState.hasMoreBefore,
hasMoreAfter: data.hasMoreAfter ?? prevState.hasMoreAfter,
}));
},
},
[sourceId, startTimestamp, endTimestamp, query]
const commonFetchArguments = useMemo(
() => ({
sourceId,
startTimestamp,
endTimestamp,
query: parsedQuery,
columnOverrides: columns,
}),
[columns, endTimestamp, parsedQuery, sourceId, startTimestamp]
);
const [previousEntriesPromise, fetchPreviousEntries] = useTrackedPromise(
{
cancelPreviousOn: 'creation',
createPromise: () => {
if (state.topCursor === null) {
throw new Error(
'useLogState: Cannot fetch previous entries. No cursor is set.\nEnsure you have called `fetchEntries` at least once.'
);
}
const {
fetchLogEntriesAround,
isRequestRunning: isLogEntriesAroundRequestRunning,
logEntriesAroundSearchResponses$,
} = useFetchLogEntriesAround(commonFetchArguments);
if (!state.hasMoreBefore) {
return Promise.resolve({ data: EMPTY_DATA });
}
return fetchLogEntries(
{
sourceId,
startTimestamp,
endTimestamp,
query: parsedQuery,
before: state.topCursor,
},
services.http.fetch
);
},
onResolve: ({ data }) => {
if (!data.entries.length) {
return;
}
useSubscription(logEntriesAroundSearchResponses$, {
next: ({ before, after, combined }) => {
if ((before.response.data != null || after?.response.data != null) && !combined.isPartial) {
setState((prevState) => ({
...prevState,
entries: combined.entries,
hasMoreAfter: combined.hasMoreAfter ?? prevState.hasMoreAfter,
hasMoreBefore: combined.hasMoreBefore ?? prevState.hasMoreBefore,
bottomCursor: combined.bottomCursor,
topCursor: combined.topCursor,
}));
}
},
});
const {
fetchLogEntriesBefore,
isRequestRunning: isLogEntriesBeforeRequestRunning,
logEntriesBeforeSearchResponse$,
} = useFetchLogEntriesBefore(commonFetchArguments);
useSubscription(logEntriesBeforeSearchResponse$, {
next: ({ response: { data, isPartial } }) => {
if (data != null && !isPartial) {
setState((prevState) => ({
...prevState,
entries: [...data.entries, ...prevState.entries],
hasMoreBefore: data.hasMoreBefore ?? prevState.hasMoreBefore,
topCursor: data.topCursor ?? prevState.topCursor,
bottomCursor: prevState.bottomCursor ?? data.bottomCursor,
}));
},
}
},
[sourceId, startTimestamp, endTimestamp, query, state.topCursor]
);
});
const [nextEntriesPromise, fetchNextEntries] = useTrackedPromise(
{
cancelPreviousOn: 'creation',
createPromise: () => {
if (state.bottomCursor === null) {
throw new Error(
'useLogState: Cannot fetch next entries. No cursor is set.\nEnsure you have called `fetchEntries` at least once.'
);
}
const fetchPreviousEntries = useCallback(() => {
if (state.topCursor === null) {
throw new Error(
'useLogStream: Cannot fetch previous entries. No cursor is set.\nEnsure you have called `fetchEntries` at least once.'
);
}
if (!state.hasMoreAfter) {
return Promise.resolve({ data: EMPTY_DATA });
}
if (!state.hasMoreBefore) {
return;
}
return fetchLogEntries(
{
sourceId,
startTimestamp,
endTimestamp,
query: parsedQuery,
after: state.bottomCursor,
},
services.http.fetch
);
},
onResolve: ({ data }) => {
if (!data.entries.length) {
return;
}
fetchLogEntriesBefore(state.topCursor, LOG_ENTRIES_CHUNK_SIZE);
}, [fetchLogEntriesBefore, state.topCursor, state.hasMoreBefore]);
const {
fetchLogEntriesAfter,
isRequestRunning: isLogEntriesAfterRequestRunning,
logEntriesAfterSearchResponse$,
} = useFetchLogEntriesAfter(commonFetchArguments);
useSubscription(logEntriesAfterSearchResponse$, {
next: ({ response: { data, isPartial } }) => {
if (data != null && !isPartial) {
setState((prevState) => ({
...prevState,
entries: [...prevState.entries, ...data.entries],
hasMoreAfter: data.hasMoreAfter ?? prevState.hasMoreAfter,
topCursor: prevState.topCursor ?? data.topCursor,
bottomCursor: data.bottomCursor ?? prevState.bottomCursor,
}));
},
}
},
[sourceId, startTimestamp, endTimestamp, query, state.bottomCursor]
});
const fetchNextEntries = useCallback(() => {
if (state.bottomCursor === null) {
throw new Error(
'useLogStream: Cannot fetch next entries. No cursor is set.\nEnsure you have called `fetchEntries` at least once.'
);
}
if (!state.hasMoreAfter) {
return;
}
fetchLogEntriesAfter(state.bottomCursor, LOG_ENTRIES_CHUNK_SIZE);
}, [fetchLogEntriesAfter, state.bottomCursor, state.hasMoreAfter]);
const fetchEntries = useCallback(() => {
setState(INITIAL_STATE);
if (center) {
fetchLogEntriesAround(center, LOG_ENTRIES_CHUNK_SIZE);
} else {
fetchLogEntriesBefore('last', LOG_ENTRIES_CHUNK_SIZE);
}
}, [center, fetchLogEntriesAround, fetchLogEntriesBefore, setState]);
const isReloading = useMemo(
() =>
isLogEntriesAroundRequestRunning ||
(state.bottomCursor == null && state.topCursor == null && isLogEntriesBeforeRequestRunning),
[
isLogEntriesAroundRequestRunning,
isLogEntriesBeforeRequestRunning,
state.bottomCursor,
state.topCursor,
]
);
const loadingState = useMemo<LoadingState>(
() => convertPromiseStateToLoadingState(entriesPromise.state),
[entriesPromise.state]
const isLoadingMore = useMemo(
() => isLogEntriesBeforeRequestRunning || isLogEntriesAfterRequestRunning,
[isLogEntriesAfterRequestRunning, isLogEntriesBeforeRequestRunning]
);
const pageLoadingState = useMemo<LoadingState>(() => {
const states = [previousEntriesPromise.state, nextEntriesPromise.state];
if (states.includes('pending')) {
return 'loading';
}
if (states.includes('rejected')) {
return 'error';
}
if (states.includes('resolved')) {
return 'success';
}
return 'uninitialized';
}, [previousEntriesPromise.state, nextEntriesPromise.state]);
return {
...state,
fetchEntries,
fetchPreviousEntries,
fetchNextEntries,
loadingState,
pageLoadingState,
fetchPreviousEntries,
isLoadingMore,
isReloading,
};
}
function convertPromiseStateToLoadingState(
state: 'uninitialized' | 'pending' | 'resolved' | 'rejected'
): LoadingState {
switch (state) {
case 'uninitialized':
return 'uninitialized';
case 'pending':
return 'loading';
case 'resolved':
return 'success';
case 'rejected':
return 'error';
}
}

View file

@@ -0,0 +1,157 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { useCallback } from 'react';
import { Observable } from 'rxjs';
import { exhaustMap } from 'rxjs/operators';
import { IKibanaSearchRequest } from '../../../../../../../src/plugins/data/public';
import { LogSourceColumnConfiguration } from '../../../../common/http_api/log_sources';
import { LogEntryAfterCursor } from '../../../../common/log_entry';
import { decodeOrThrow } from '../../../../common/runtime_types';
import {
logEntriesSearchRequestParamsRT,
LogEntriesSearchRequestQuery,
LogEntriesSearchResponsePayload,
logEntriesSearchResponsePayloadRT,
LOG_ENTRIES_SEARCH_STRATEGY,
} from '../../../../common/search_strategies/log_entries/log_entries';
import {
flattenDataSearchResponseDescriptor,
normalizeDataSearchResponses,
ParsedDataSearchRequestDescriptor,
useDataSearch,
useDataSearchResponseState,
} from '../../../utils/data_search';
import { useOperator } from '../../../utils/use_observable';
export const useLogEntriesAfterRequest = ({
columnOverrides,
endTimestamp,
highlightPhrase,
query,
sourceId,
startTimestamp,
}: {
columnOverrides?: LogSourceColumnConfiguration[];
endTimestamp: number;
highlightPhrase?: string;
query?: LogEntriesSearchRequestQuery;
sourceId: string;
startTimestamp: number;
}) => {
const { search: fetchLogEntriesAfter, requests$: logEntriesAfterSearchRequests$ } = useDataSearch(
{
getRequest: useCallback(
(cursor: LogEntryAfterCursor['after'], size: number) => {
return !!sourceId
? {
request: {
params: logEntriesSearchRequestParamsRT.encode({
after: cursor,
columns: columnOverrides,
endTimestamp,
highlightPhrase,
query,
size,
sourceId,
startTimestamp,
}),
},
options: { strategy: LOG_ENTRIES_SEARCH_STRATEGY },
}
: null;
},
[columnOverrides, endTimestamp, highlightPhrase, query, sourceId, startTimestamp]
),
parseResponses: parseLogEntriesAfterSearchResponses,
}
);
return {
fetchLogEntriesAfter,
logEntriesAfterSearchRequests$,
};
};
export const useLogEntriesAfterResponse = <Request extends IKibanaSearchRequest>(
logEntriesAfterSearchRequests$: Observable<
ParsedDataSearchRequestDescriptor<Request, LogEntriesSearchResponsePayload['data'] | null>
>
) => {
const logEntriesAfterSearchResponse$ = useOperator(
logEntriesAfterSearchRequests$,
flattenLogEntriesAfterSearchResponse
);
const {
cancelRequest,
isRequestRunning,
isResponsePartial,
loaded,
total,
} = useDataSearchResponseState(logEntriesAfterSearchResponse$);
return {
cancelRequest,
isRequestRunning,
isResponsePartial,
loaded,
logEntriesAfterSearchRequests$,
logEntriesAfterSearchResponse$,
total,
};
};
export const useFetchLogEntriesAfter = ({
columnOverrides,
endTimestamp,
highlightPhrase,
query,
sourceId,
startTimestamp,
}: {
columnOverrides?: LogSourceColumnConfiguration[];
endTimestamp: number;
highlightPhrase?: string;
query?: LogEntriesSearchRequestQuery;
sourceId: string;
startTimestamp: number;
}) => {
const { fetchLogEntriesAfter, logEntriesAfterSearchRequests$ } = useLogEntriesAfterRequest({
columnOverrides,
endTimestamp,
highlightPhrase,
query,
sourceId,
startTimestamp,
});
const {
cancelRequest,
isRequestRunning,
isResponsePartial,
loaded,
logEntriesAfterSearchResponse$,
total,
} = useLogEntriesAfterResponse(logEntriesAfterSearchRequests$);
return {
cancelRequest,
fetchLogEntriesAfter,
isRequestRunning,
isResponsePartial,
loaded,
logEntriesAfterSearchResponse$,
total,
};
};
export const parseLogEntriesAfterSearchResponses = normalizeDataSearchResponses(
null,
decodeOrThrow(logEntriesSearchResponsePayloadRT)
);
const flattenLogEntriesAfterSearchResponse = exhaustMap(flattenDataSearchResponseDescriptor);

View file

@@ -0,0 +1,184 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { useCallback } from 'react';
import { combineLatest, Observable, Subject } from 'rxjs';
import { last, map, startWith, switchMap } from 'rxjs/operators';
import { LogSourceColumnConfiguration } from '../../../../common/http_api/log_sources';
import { LogEntryCursor } from '../../../../common/log_entry';
import { LogEntriesSearchRequestQuery } from '../../../../common/search_strategies/log_entries/log_entries';
import { flattenDataSearchResponseDescriptor } from '../../../utils/data_search';
import { useObservable, useObservableState } from '../../../utils/use_observable';
import { useLogEntriesAfterRequest } from './use_fetch_log_entries_after';
import { useLogEntriesBeforeRequest } from './use_fetch_log_entries_before';
export const useFetchLogEntriesAround = ({
columnOverrides,
endTimestamp,
highlightPhrase,
query,
sourceId,
startTimestamp,
}: {
columnOverrides?: LogSourceColumnConfiguration[];
endTimestamp: number;
highlightPhrase?: string;
query?: LogEntriesSearchRequestQuery;
sourceId: string;
startTimestamp: number;
}) => {
const { fetchLogEntriesBefore } = useLogEntriesBeforeRequest({
columnOverrides,
endTimestamp,
highlightPhrase,
query,
sourceId,
startTimestamp,
});
const { fetchLogEntriesAfter } = useLogEntriesAfterRequest({
columnOverrides,
endTimestamp,
highlightPhrase,
query,
sourceId,
startTimestamp,
});
type LogEntriesBeforeRequest = NonNullable<ReturnType<typeof fetchLogEntriesBefore>>;
type LogEntriesAfterRequest = NonNullable<ReturnType<typeof fetchLogEntriesAfter>>;
const logEntriesAroundSearchRequests$ = useObservable(
() => new Subject<[LogEntriesBeforeRequest, Observable<LogEntriesAfterRequest>]>(),
[]
);
const fetchLogEntriesAround = useCallback(
(cursor: LogEntryCursor, size: number) => {
const logEntriesBeforeSearchRequest = fetchLogEntriesBefore(cursor, Math.floor(size / 2));
if (logEntriesBeforeSearchRequest == null) {
return;
}
const logEntriesAfterSearchRequest$ = flattenDataSearchResponseDescriptor(
logEntriesBeforeSearchRequest
).pipe(
last(), // in the future we could start earlier if we receive partial results already
map((lastBeforeSearchResponse) => {
const cursorAfter = lastBeforeSearchResponse.response.data?.bottomCursor ?? {
time: cursor.time - 1,
tiebreaker: 0,
};
const logEntriesAfterSearchRequest = fetchLogEntriesAfter(
cursorAfter,
Math.ceil(size / 2)
);
if (logEntriesAfterSearchRequest == null) {
throw new Error('Failed to create request: no request args given');
}
return logEntriesAfterSearchRequest;
})
);
logEntriesAroundSearchRequests$.next([
logEntriesBeforeSearchRequest,
logEntriesAfterSearchRequest$,
]);
},
[fetchLogEntriesAfter, fetchLogEntriesBefore, logEntriesAroundSearchRequests$]
);
const logEntriesAroundSearchResponses$ = useObservable(
(inputs$) =>
inputs$.pipe(
switchMap(([currentSearchRequests$]) =>
currentSearchRequests$.pipe(
switchMap(([beforeRequest, afterRequest$]) => {
const beforeResponse$ = flattenDataSearchResponseDescriptor(beforeRequest);
const afterResponse$ = afterRequest$.pipe(
switchMap(flattenDataSearchResponseDescriptor),
startWith(undefined) // emit "before" response even if "after" hasn't started yet
);
return combineLatest([beforeResponse$, afterResponse$]);
}),
map(([beforeResponse, afterResponse]) => {
const loadedBefore = beforeResponse.response.loaded;
const loadedAfter = afterResponse?.response.loaded;
const totalBefore = beforeResponse.response.total;
const totalAfter = afterResponse?.response.total;
return {
before: beforeResponse,
after: afterResponse,
combined: {
isRunning:
(beforeResponse.response.isRunning || afterResponse?.response.isRunning) ??
false,
isPartial:
(beforeResponse.response.isPartial || afterResponse?.response.isPartial) ??
false,
loaded:
loadedBefore != null || loadedAfter != null
? (loadedBefore ?? 0) + (loadedAfter ?? 0)
: undefined,
total:
totalBefore != null || totalAfter != null
? (totalBefore ?? 0) + (totalAfter ?? 0)
: undefined,
entries: [
...(beforeResponse.response.data?.entries ?? []),
...(afterResponse?.response.data?.entries ?? []),
],
errors: [
...(beforeResponse.response.errors ?? []),
...(afterResponse?.response.errors ?? []),
],
hasMoreBefore: beforeResponse.response.data?.hasMoreBefore,
hasMoreAfter: afterResponse?.response.data?.hasMoreAfter,
topCursor: beforeResponse.response.data?.topCursor,
bottomCursor: afterResponse?.response.data?.bottomCursor,
},
};
})
)
)
),
[logEntriesAroundSearchRequests$]
);
const {
latestValue: {
before: latestBeforeResponse,
after: latestAfterResponse,
combined: latestCombinedResponse,
},
} = useObservableState(logEntriesAroundSearchResponses$, initialCombinedResponse);
const cancelRequest = useCallback(() => {
latestBeforeResponse?.abortController.abort();
latestAfterResponse?.abortController.abort();
}, [latestBeforeResponse, latestAfterResponse]);
return {
cancelRequest,
fetchLogEntriesAround,
isRequestRunning: latestCombinedResponse?.isRunning ?? false,
isResponsePartial: latestCombinedResponse?.isPartial ?? false,
loaded: latestCombinedResponse?.loaded,
logEntriesAroundSearchResponses$,
total: latestCombinedResponse?.total,
};
};
const initialCombinedResponse = {
before: undefined,
after: undefined,
combined: undefined,
} as const;

View file

@@ -0,0 +1,158 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { useCallback } from 'react';
import { Observable } from 'rxjs';
import { exhaustMap } from 'rxjs/operators';
import { IKibanaSearchRequest } from '../../../../../../../src/plugins/data/public';
import { LogSourceColumnConfiguration } from '../../../../common/http_api/log_sources';
import { LogEntryBeforeCursor } from '../../../../common/log_entry';
import { decodeOrThrow } from '../../../../common/runtime_types';
import {
logEntriesSearchRequestParamsRT,
LogEntriesSearchRequestQuery,
LogEntriesSearchResponsePayload,
logEntriesSearchResponsePayloadRT,
LOG_ENTRIES_SEARCH_STRATEGY,
} from '../../../../common/search_strategies/log_entries/log_entries';
import {
flattenDataSearchResponseDescriptor,
normalizeDataSearchResponses,
ParsedDataSearchRequestDescriptor,
useDataSearch,
useDataSearchResponseState,
} from '../../../utils/data_search';
import { useOperator } from '../../../utils/use_observable';
export const useLogEntriesBeforeRequest = ({
columnOverrides,
endTimestamp,
highlightPhrase,
query,
sourceId,
startTimestamp,
}: {
columnOverrides?: LogSourceColumnConfiguration[];
endTimestamp: number;
highlightPhrase?: string;
query?: LogEntriesSearchRequestQuery;
sourceId: string;
startTimestamp: number;
}) => {
const {
search: fetchLogEntriesBefore,
requests$: logEntriesBeforeSearchRequests$,
} = useDataSearch({
getRequest: useCallback(
(cursor: LogEntryBeforeCursor['before'], size: number) => {
return !!sourceId
? {
request: {
params: logEntriesSearchRequestParamsRT.encode({
before: cursor,
columns: columnOverrides,
endTimestamp,
highlightPhrase,
query,
size,
sourceId,
startTimestamp,
}),
},
options: { strategy: LOG_ENTRIES_SEARCH_STRATEGY },
}
: null;
},
[columnOverrides, endTimestamp, highlightPhrase, query, sourceId, startTimestamp]
),
parseResponses: parseLogEntriesBeforeSearchResponses,
});
return {
fetchLogEntriesBefore,
logEntriesBeforeSearchRequests$,
};
};
export const useLogEntriesBeforeResponse = <Request extends IKibanaSearchRequest>(
logEntriesBeforeSearchRequests$: Observable<
ParsedDataSearchRequestDescriptor<Request, LogEntriesSearchResponsePayload['data'] | null>
>
) => {
const logEntriesBeforeSearchResponse$ = useOperator(
logEntriesBeforeSearchRequests$,
flattenLogEntriesBeforeSearchResponse
);
const {
cancelRequest,
isRequestRunning,
isResponsePartial,
loaded,
total,
} = useDataSearchResponseState(logEntriesBeforeSearchResponse$);
return {
cancelRequest,
isRequestRunning,
isResponsePartial,
loaded,
logEntriesBeforeSearchRequests$,
logEntriesBeforeSearchResponse$,
total,
};
};
export const useFetchLogEntriesBefore = ({
columnOverrides,
endTimestamp,
highlightPhrase,
query,
sourceId,
startTimestamp,
}: {
columnOverrides?: LogSourceColumnConfiguration[];
endTimestamp: number;
highlightPhrase?: string;
query?: LogEntriesSearchRequestQuery;
sourceId: string;
startTimestamp: number;
}) => {
const { fetchLogEntriesBefore, logEntriesBeforeSearchRequests$ } = useLogEntriesBeforeRequest({
columnOverrides,
endTimestamp,
highlightPhrase,
query,
sourceId,
startTimestamp,
});
const {
cancelRequest,
isRequestRunning,
isResponsePartial,
loaded,
logEntriesBeforeSearchResponse$,
total,
} = useLogEntriesBeforeResponse(logEntriesBeforeSearchRequests$);
return {
cancelRequest,
fetchLogEntriesBefore,
isRequestRunning,
isResponsePartial,
loaded,
logEntriesBeforeSearchResponse$,
total,
};
};
export const parseLogEntriesBeforeSearchResponses = normalizeDataSearchResponses(
null,
decodeOrThrow(logEntriesSearchResponsePayloadRT)
);
const flattenLogEntriesBeforeSearchResponse = exhaustMap(flattenDataSearchResponseDescriptor);

View file

@@ -137,155 +137,6 @@
"isDeprecated": false,
"deprecationReason": null
},
{
"name": "logEntriesAround",
"description": "A consecutive span of log entries surrounding a point in time",
"args": [
{
"name": "key",
"description": "The sort key that corresponds to the point in time",
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": { "kind": "INPUT_OBJECT", "name": "InfraTimeKeyInput", "ofType": null }
},
"defaultValue": null
},
{
"name": "countBefore",
"description": "The maximum number of preceding to return",
"type": { "kind": "SCALAR", "name": "Int", "ofType": null },
"defaultValue": "0"
},
{
"name": "countAfter",
"description": "The maximum number of following to return",
"type": { "kind": "SCALAR", "name": "Int", "ofType": null },
"defaultValue": "0"
},
{
"name": "filterQuery",
"description": "The query to filter the log entries by",
"type": { "kind": "SCALAR", "name": "String", "ofType": null },
"defaultValue": null
}
],
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": { "kind": "OBJECT", "name": "InfraLogEntryInterval", "ofType": null }
},
"isDeprecated": false,
"deprecationReason": null
},
{
"name": "logEntriesBetween",
"description": "A consecutive span of log entries within an interval",
"args": [
{
"name": "startKey",
"description": "The sort key that corresponds to the start of the interval",
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": { "kind": "INPUT_OBJECT", "name": "InfraTimeKeyInput", "ofType": null }
},
"defaultValue": null
},
{
"name": "endKey",
"description": "The sort key that corresponds to the end of the interval",
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": { "kind": "INPUT_OBJECT", "name": "InfraTimeKeyInput", "ofType": null }
},
"defaultValue": null
},
{
"name": "filterQuery",
"description": "The query to filter the log entries by",
"type": { "kind": "SCALAR", "name": "String", "ofType": null },
"defaultValue": null
}
],
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": { "kind": "OBJECT", "name": "InfraLogEntryInterval", "ofType": null }
},
"isDeprecated": false,
"deprecationReason": null
},
{
"name": "logEntryHighlights",
"description": "Sequences of log entries matching sets of highlighting queries within an interval",
"args": [
{
"name": "startKey",
"description": "The sort key that corresponds to the start of the interval",
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": { "kind": "INPUT_OBJECT", "name": "InfraTimeKeyInput", "ofType": null }
},
"defaultValue": null
},
{
"name": "endKey",
"description": "The sort key that corresponds to the end of the interval",
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": { "kind": "INPUT_OBJECT", "name": "InfraTimeKeyInput", "ofType": null }
},
"defaultValue": null
},
{
"name": "filterQuery",
"description": "The query to filter the log entries by",
"type": { "kind": "SCALAR", "name": "String", "ofType": null },
"defaultValue": null
},
{
"name": "highlights",
"description": "The highlighting to apply to the log entries",
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": {
"kind": "LIST",
"name": null,
"ofType": {
"kind": "NON_NULL",
"name": null,
"ofType": {
"kind": "INPUT_OBJECT",
"name": "InfraLogEntryHighlightInput",
"ofType": null
}
}
}
},
"defaultValue": null
}
],
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": {
"kind": "LIST",
"name": null,
"ofType": {
"kind": "NON_NULL",
"name": null,
"ofType": { "kind": "OBJECT", "name": "InfraLogEntryInterval", "ofType": null }
}
}
},
"isDeprecated": false,
"deprecationReason": null
},
{
"name": "snapshot",
"description": "A snapshot of nodes",
@@ -993,37 +844,6 @@
"enumValues": null,
"possibleTypes": null
},
{
"kind": "INPUT_OBJECT",
"name": "InfraTimeKeyInput",
"description": "",
"fields": null,
"inputFields": [
{
"name": "time",
"description": "",
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": { "kind": "SCALAR", "name": "Float", "ofType": null }
},
"defaultValue": null
},
{
"name": "tiebreaker",
"description": "",
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": { "kind": "SCALAR", "name": "Float", "ofType": null }
},
"defaultValue": null
}
],
"interfaces": null,
"enumValues": null,
"possibleTypes": null
},
{
"kind": "SCALAR",
"name": "Int",
@@ -1034,486 +854,6 @@
"enumValues": null,
"possibleTypes": null
},
{
"kind": "OBJECT",
"name": "InfraLogEntryInterval",
"description": "A consecutive sequence of log entries",
"fields": [
{
"name": "start",
"description": "The key corresponding to the start of the interval covered by the entries",
"args": [],
"type": { "kind": "OBJECT", "name": "InfraTimeKey", "ofType": null },
"isDeprecated": false,
"deprecationReason": null
},
{
"name": "end",
"description": "The key corresponding to the end of the interval covered by the entries",
"args": [],
"type": { "kind": "OBJECT", "name": "InfraTimeKey", "ofType": null },
"isDeprecated": false,
"deprecationReason": null
},
{
"name": "hasMoreBefore",
"description": "Whether there are more log entries available before the start",
"args": [],
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": { "kind": "SCALAR", "name": "Boolean", "ofType": null }
},
"isDeprecated": false,
"deprecationReason": null
},
{
"name": "hasMoreAfter",
"description": "Whether there are more log entries available after the end",
"args": [],
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": { "kind": "SCALAR", "name": "Boolean", "ofType": null }
},
"isDeprecated": false,
"deprecationReason": null
},
{
"name": "filterQuery",
"description": "The query the log entries were filtered by",
"args": [],
"type": { "kind": "SCALAR", "name": "String", "ofType": null },
"isDeprecated": false,
"deprecationReason": null
},
{
"name": "highlightQuery",
"description": "The query the log entries were highlighted with",
"args": [],
"type": { "kind": "SCALAR", "name": "String", "ofType": null },
"isDeprecated": false,
"deprecationReason": null
},
{
"name": "entries",
"description": "A list of the log entries",
"args": [],
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": {
"kind": "LIST",
"name": null,
"ofType": {
"kind": "NON_NULL",
"name": null,
"ofType": { "kind": "OBJECT", "name": "InfraLogEntry", "ofType": null }
}
}
},
"isDeprecated": false,
"deprecationReason": null
}
],
"inputFields": null,
"interfaces": [],
"enumValues": null,
"possibleTypes": null
},
{
"kind": "OBJECT",
"name": "InfraTimeKey",
"description": "A representation of the log entry's position in the event stream",
"fields": [
{
"name": "time",
"description": "The timestamp of the event that the log entry corresponds to",
"args": [],
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": { "kind": "SCALAR", "name": "Float", "ofType": null }
},
"isDeprecated": false,
"deprecationReason": null
},
{
"name": "tiebreaker",
"description": "The tiebreaker that disambiguates events with the same timestamp",
"args": [],
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": { "kind": "SCALAR", "name": "Float", "ofType": null }
},
"isDeprecated": false,
"deprecationReason": null
}
],
"inputFields": null,
"interfaces": [],
"enumValues": null,
"possibleTypes": null
},
{
"kind": "OBJECT",
"name": "InfraLogEntry",
"description": "A log entry",
"fields": [
{
"name": "key",
"description": "A unique representation of the log entry's position in the event stream",
"args": [],
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": { "kind": "OBJECT", "name": "InfraTimeKey", "ofType": null }
},
"isDeprecated": false,
"deprecationReason": null
},
{
"name": "gid",
"description": "The log entry's id",
"args": [],
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": { "kind": "SCALAR", "name": "String", "ofType": null }
},
"isDeprecated": false,
"deprecationReason": null
},
{
"name": "source",
"description": "The source id",
"args": [],
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": { "kind": "SCALAR", "name": "String", "ofType": null }
},
"isDeprecated": false,
"deprecationReason": null
},
{
"name": "columns",
"description": "The columns used for rendering the log entry",
"args": [],
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": {
"kind": "LIST",
"name": null,
"ofType": {
"kind": "NON_NULL",
"name": null,
"ofType": { "kind": "UNION", "name": "InfraLogEntryColumn", "ofType": null }
}
}
},
"isDeprecated": false,
"deprecationReason": null
}
],
"inputFields": null,
"interfaces": [],
"enumValues": null,
"possibleTypes": null
},
{
"kind": "UNION",
"name": "InfraLogEntryColumn",
"description": "A column of a log entry",
"fields": null,
"inputFields": null,
"interfaces": null,
"enumValues": null,
"possibleTypes": [
{ "kind": "OBJECT", "name": "InfraLogEntryTimestampColumn", "ofType": null },
{ "kind": "OBJECT", "name": "InfraLogEntryMessageColumn", "ofType": null },
{ "kind": "OBJECT", "name": "InfraLogEntryFieldColumn", "ofType": null }
]
},
{
"kind": "OBJECT",
"name": "InfraLogEntryTimestampColumn",
"description": "A special built-in column that contains the log entry's timestamp",
"fields": [
{
"name": "columnId",
"description": "The id of the corresponding column configuration",
"args": [],
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": { "kind": "SCALAR", "name": "ID", "ofType": null }
},
"isDeprecated": false,
"deprecationReason": null
},
{
"name": "timestamp",
"description": "The timestamp",
"args": [],
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": { "kind": "SCALAR", "name": "Float", "ofType": null }
},
"isDeprecated": false,
"deprecationReason": null
}
],
"inputFields": null,
"interfaces": [],
"enumValues": null,
"possibleTypes": null
},
{
"kind": "OBJECT",
"name": "InfraLogEntryMessageColumn",
"description": "A special built-in column that contains the log entry's constructed message",
"fields": [
{
"name": "columnId",
"description": "The id of the corresponding column configuration",
"args": [],
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": { "kind": "SCALAR", "name": "ID", "ofType": null }
},
"isDeprecated": false,
"deprecationReason": null
},
{
"name": "message",
"description": "A list of the formatted log entry segments",
"args": [],
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": {
"kind": "LIST",
"name": null,
"ofType": {
"kind": "NON_NULL",
"name": null,
"ofType": { "kind": "UNION", "name": "InfraLogMessageSegment", "ofType": null }
}
}
},
"isDeprecated": false,
"deprecationReason": null
}
],
"inputFields": null,
"interfaces": [],
"enumValues": null,
"possibleTypes": null
},
{
"kind": "UNION",
"name": "InfraLogMessageSegment",
"description": "A segment of the log entry message",
"fields": null,
"inputFields": null,
"interfaces": null,
"enumValues": null,
"possibleTypes": [
{ "kind": "OBJECT", "name": "InfraLogMessageFieldSegment", "ofType": null },
{ "kind": "OBJECT", "name": "InfraLogMessageConstantSegment", "ofType": null }
]
},
{
"kind": "OBJECT",
"name": "InfraLogMessageFieldSegment",
"description": "A segment of the log entry message that was derived from a field",
"fields": [
{
"name": "field",
"description": "The field the segment was derived from",
"args": [],
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": { "kind": "SCALAR", "name": "String", "ofType": null }
},
"isDeprecated": false,
"deprecationReason": null
},
{
"name": "value",
"description": "The segment's message",
"args": [],
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": { "kind": "SCALAR", "name": "String", "ofType": null }
},
"isDeprecated": false,
"deprecationReason": null
},
{
"name": "highlights",
"description": "A list of highlighted substrings of the value",
"args": [],
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": {
"kind": "LIST",
"name": null,
"ofType": {
"kind": "NON_NULL",
"name": null,
"ofType": { "kind": "SCALAR", "name": "String", "ofType": null }
}
}
},
"isDeprecated": false,
"deprecationReason": null
}
],
"inputFields": null,
"interfaces": [],
"enumValues": null,
"possibleTypes": null
},
{
"kind": "OBJECT",
"name": "InfraLogMessageConstantSegment",
"description": "A segment of the log entry message that was derived from a string literal",
"fields": [
{
"name": "constant",
"description": "The segment's message",
"args": [],
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": { "kind": "SCALAR", "name": "String", "ofType": null }
},
"isDeprecated": false,
"deprecationReason": null
}
],
"inputFields": null,
"interfaces": [],
"enumValues": null,
"possibleTypes": null
},
{
"kind": "OBJECT",
"name": "InfraLogEntryFieldColumn",
"description": "A column that contains the value of a field of the log entry",
"fields": [
{
"name": "columnId",
"description": "The id of the corresponding column configuration",
"args": [],
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": { "kind": "SCALAR", "name": "ID", "ofType": null }
},
"isDeprecated": false,
"deprecationReason": null
},
{
"name": "field",
"description": "The field name of the column",
"args": [],
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": { "kind": "SCALAR", "name": "String", "ofType": null }
},
"isDeprecated": false,
"deprecationReason": null
},
{
"name": "value",
"description": "The value of the field in the log entry",
"args": [],
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": { "kind": "SCALAR", "name": "String", "ofType": null }
},
"isDeprecated": false,
"deprecationReason": null
},
{
"name": "highlights",
"description": "A list of highlighted substrings of the value",
"args": [],
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": {
"kind": "LIST",
"name": null,
"ofType": {
"kind": "NON_NULL",
"name": null,
"ofType": { "kind": "SCALAR", "name": "String", "ofType": null }
}
}
},
"isDeprecated": false,
"deprecationReason": null
}
],
"inputFields": null,
"interfaces": [],
"enumValues": null,
"possibleTypes": null
},
{
"kind": "INPUT_OBJECT",
"name": "InfraLogEntryHighlightInput",
"description": "A highlighting definition",
"fields": null,
"inputFields": [
{
"name": "query",
"description": "The query to highlight by",
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": { "kind": "SCALAR", "name": "String", "ofType": null }
},
"defaultValue": null
},
{
"name": "countBefore",
"description": "The number of highlighted documents to include beyond the beginning of the interval",
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": { "kind": "SCALAR", "name": "Int", "ofType": null }
},
"defaultValue": null
},
{
"name": "countAfter",
"description": "The number of highlighted documents to include beyond the end of the interval",
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": { "kind": "SCALAR", "name": "Int", "ofType": null }
},
"defaultValue": null
}
],
"interfaces": null,
"enumValues": null,
"possibleTypes": null
},
{
"kind": "INPUT_OBJECT",
"name": "InfraTimerangeInput",

View file

@ -30,12 +30,6 @@ export interface InfraSource {
configuration: InfraSourceConfiguration;
/** The status of the source */
status: InfraSourceStatus;
/** A consecutive span of log entries surrounding a point in time */
logEntriesAround: InfraLogEntryInterval;
/** A consecutive span of log entries within an interval */
logEntriesBetween: InfraLogEntryInterval;
/** Sequences of log entries matching sets of highlighting queries within an interval */
logEntryHighlights: InfraLogEntryInterval[];
/** A snapshot of nodes */
snapshot?: InfraSnapshotResponse | null;
@ -135,80 +129,6 @@ export interface InfraIndexField {
  /** Whether the field should be displayed based on event.module and an ECS allowed list */
displayable: boolean;
}
/** A consecutive sequence of log entries */
export interface InfraLogEntryInterval {
/** The key corresponding to the start of the interval covered by the entries */
start?: InfraTimeKey | null;
/** The key corresponding to the end of the interval covered by the entries */
end?: InfraTimeKey | null;
/** Whether there are more log entries available before the start */
hasMoreBefore: boolean;
/** Whether there are more log entries available after the end */
hasMoreAfter: boolean;
/** The query the log entries were filtered by */
filterQuery?: string | null;
/** The query the log entries were highlighted with */
highlightQuery?: string | null;
/** A list of the log entries */
entries: InfraLogEntry[];
}
/** A representation of the log entry's position in the event stream */
export interface InfraTimeKey {
/** The timestamp of the event that the log entry corresponds to */
time: number;
/** The tiebreaker that disambiguates events with the same timestamp */
tiebreaker: number;
}
/** A log entry */
export interface InfraLogEntry {
/** A unique representation of the log entry's position in the event stream */
key: InfraTimeKey;
/** The log entry's id */
gid: string;
/** The source id */
source: string;
/** The columns used for rendering the log entry */
columns: InfraLogEntryColumn[];
}
/** A special built-in column that contains the log entry's timestamp */
export interface InfraLogEntryTimestampColumn {
/** The id of the corresponding column configuration */
columnId: string;
/** The timestamp */
timestamp: number;
}
/** A special built-in column that contains the log entry's constructed message */
export interface InfraLogEntryMessageColumn {
/** The id of the corresponding column configuration */
columnId: string;
/** A list of the formatted log entry segments */
message: InfraLogMessageSegment[];
}
/** A segment of the log entry message that was derived from a field */
export interface InfraLogMessageFieldSegment {
/** The field the segment was derived from */
field: string;
/** The segment's message */
value: string;
/** A list of highlighted substrings of the value */
highlights: string[];
}
/** A segment of the log entry message that was derived from a string literal */
export interface InfraLogMessageConstantSegment {
/** The segment's message */
constant: string;
}
/** A column that contains the value of a field of the log entry */
export interface InfraLogEntryFieldColumn {
/** The id of the corresponding column configuration */
columnId: string;
/** The field name of the column */
field: string;
/** The value of the field in the log entry */
value: string;
/** A list of highlighted substrings of the value */
highlights: string[];
}
export interface InfraSnapshotResponse {
/** Nodes of type host, container or pod grouped by 0, 1 or 2 terms */
@ -282,21 +202,6 @@ export interface DeleteSourceResult {
// InputTypes
// ====================================================
export interface InfraTimeKeyInput {
time: number;
tiebreaker: number;
}
/** A highlighting definition */
export interface InfraLogEntryHighlightInput {
/** The query to highlight by */
query: string;
/** The number of highlighted documents to include beyond the beginning of the interval */
countBefore: number;
/** The number of highlighted documents to include beyond the end of the interval */
countAfter: number;
}
export interface InfraTimerangeInput {
  /** The interval string to use for the last bucket. The format is '{value}{unit}'. For example '5m' would return the metrics for the last 5 minutes of the timespan. */
interval: string;
@ -387,34 +292,6 @@ export interface SourceQueryArgs {
/** The id of the source */
id: string;
}
export interface LogEntriesAroundInfraSourceArgs {
/** The sort key that corresponds to the point in time */
key: InfraTimeKeyInput;
  /** The maximum number of preceding entries to return */
  countBefore?: number | null;
  /** The maximum number of following entries to return */
countAfter?: number | null;
/** The query to filter the log entries by */
filterQuery?: string | null;
}
export interface LogEntriesBetweenInfraSourceArgs {
/** The sort key that corresponds to the start of the interval */
startKey: InfraTimeKeyInput;
/** The sort key that corresponds to the end of the interval */
endKey: InfraTimeKeyInput;
/** The query to filter the log entries by */
filterQuery?: string | null;
}
export interface LogEntryHighlightsInfraSourceArgs {
/** The sort key that corresponds to the start of the interval */
startKey: InfraTimeKeyInput;
/** The sort key that corresponds to the end of the interval */
endKey: InfraTimeKeyInput;
/** The query to filter the log entries by */
filterQuery?: string | null;
/** The highlighting to apply to the log entries */
highlights: InfraLogEntryHighlightInput[];
}
export interface SnapshotInfraSourceArgs {
timerange: InfraTimerangeInput;
@ -571,15 +448,6 @@ export type InfraSourceLogColumn =
| InfraSourceMessageLogColumn
| InfraSourceFieldLogColumn;
/** A column of a log entry */
export type InfraLogEntryColumn =
| InfraLogEntryTimestampColumn
| InfraLogEntryMessageColumn
| InfraLogEntryFieldColumn;
/** A segment of the log entry message */
export type InfraLogMessageSegment = InfraLogMessageFieldSegment | InfraLogMessageConstantSegment;
// ====================================================
// END: Typescript template
// ====================================================
@ -588,46 +456,6 @@ export type InfraLogMessageSegment = InfraLogMessageFieldSegment | InfraLogMessa
// Documents
// ====================================================
export namespace LogEntryHighlightsQuery {
export type Variables = {
sourceId?: string | null;
startKey: InfraTimeKeyInput;
endKey: InfraTimeKeyInput;
filterQuery?: string | null;
highlights: InfraLogEntryHighlightInput[];
};
export type Query = {
__typename?: 'Query';
source: Source;
};
export type Source = {
__typename?: 'InfraSource';
id: string;
logEntryHighlights: LogEntryHighlights[];
};
export type LogEntryHighlights = {
__typename?: 'InfraLogEntryInterval';
start?: Start | null;
end?: End | null;
entries: Entries[];
};
export type Start = InfraTimeKeyFields.Fragment;
export type End = InfraTimeKeyFields.Fragment;
export type Entries = InfraLogEntryHighlightFields.Fragment;
}
export namespace MetricsQuery {
export type Variables = {
sourceId: string;
@ -826,50 +654,6 @@ export namespace WaffleNodesQuery {
};
}
export namespace LogEntries {
export type Variables = {
sourceId?: string | null;
timeKey: InfraTimeKeyInput;
countBefore?: number | null;
countAfter?: number | null;
filterQuery?: string | null;
};
export type Query = {
__typename?: 'Query';
source: Source;
};
export type Source = {
__typename?: 'InfraSource';
id: string;
logEntriesAround: LogEntriesAround;
};
export type LogEntriesAround = {
__typename?: 'InfraLogEntryInterval';
start?: Start | null;
end?: End | null;
hasMoreBefore: boolean;
hasMoreAfter: boolean;
entries: Entries[];
};
export type Start = InfraTimeKeyFields.Fragment;
export type End = InfraTimeKeyFields.Fragment;
export type Entries = InfraLogEntryFields.Fragment;
}
export namespace SourceConfigurationFields {
export type Fragment = {
__typename?: 'InfraSourceConfiguration';
@ -1000,124 +784,3 @@ export namespace InfraSourceFields {
origin: string;
};
}
export namespace InfraLogEntryFields {
export type Fragment = {
__typename?: 'InfraLogEntry';
gid: string;
key: Key;
columns: Columns[];
};
export type Key = {
__typename?: 'InfraTimeKey';
time: number;
tiebreaker: number;
};
export type Columns =
| InfraLogEntryTimestampColumnInlineFragment
| InfraLogEntryMessageColumnInlineFragment
| InfraLogEntryFieldColumnInlineFragment;
export type InfraLogEntryTimestampColumnInlineFragment = {
__typename?: 'InfraLogEntryTimestampColumn';
columnId: string;
timestamp: number;
};
export type InfraLogEntryMessageColumnInlineFragment = {
__typename?: 'InfraLogEntryMessageColumn';
columnId: string;
message: Message[];
};
export type Message =
| InfraLogMessageFieldSegmentInlineFragment
| InfraLogMessageConstantSegmentInlineFragment;
export type InfraLogMessageFieldSegmentInlineFragment = {
__typename?: 'InfraLogMessageFieldSegment';
field: string;
value: string;
};
export type InfraLogMessageConstantSegmentInlineFragment = {
__typename?: 'InfraLogMessageConstantSegment';
constant: string;
};
export type InfraLogEntryFieldColumnInlineFragment = {
__typename?: 'InfraLogEntryFieldColumn';
columnId: string;
field: string;
value: string;
};
}
export namespace InfraLogEntryHighlightFields {
export type Fragment = {
__typename?: 'InfraLogEntry';
gid: string;
key: Key;
columns: Columns[];
};
export type Key = {
__typename?: 'InfraTimeKey';
time: number;
tiebreaker: number;
};
export type Columns =
| InfraLogEntryMessageColumnInlineFragment
| InfraLogEntryFieldColumnInlineFragment;
export type InfraLogEntryMessageColumnInlineFragment = {
__typename?: 'InfraLogEntryMessageColumn';
columnId: string;
message: Message[];
};
export type Message = InfraLogMessageFieldSegmentInlineFragment;
export type InfraLogMessageFieldSegmentInlineFragment = {
__typename?: 'InfraLogMessageFieldSegment';
field: string;
highlights: string[];
};
export type InfraLogEntryFieldColumnInlineFragment = {
__typename?: 'InfraLogEntryFieldColumn';
columnId: string;
field: string;
highlights: string[];
};
}

View file

@ -127,6 +127,7 @@ export const CategoryExampleMessage: React.FunctionComponent<{
onClick: () => {
const logEntry: LogEntry = {
id,
index: '', // TODO: use real index when loading via async search
context,
cursor: { time: timestamp, tiebreaker },
columns: [],

View file

@ -28,6 +28,7 @@ export function generateFakeEntries(
const timestamp = i === count - 1 ? endTimestamp : startTimestamp + timestampStep * i;
entries.push({
id: `entry-${i}`,
index: 'logs-fake',
context: {},
cursor: { time: timestamp, tiebreaker: i },
columns: columns.map((column) => {

View file

@ -43,15 +43,35 @@ be issued by calling the returned `search()` function. For each new request the
hook emits an object describing the request and its state in the `requests$`
`Observable`.
Since the specific response shape depends on the data strategy used, the hook
takes a projection function that is responsible for decoding the response in
an appropriate way. Because most response projections follow a similar
pattern, there's a helper `normalizeDataSearchResponses(initialResponse,
parseRawResponse)`, which generates an RxJS operator that...
- emits an initial response containing the given `initialResponse` value
- applies `parseRawResponse` to the `rawResponse` property of each emitted response
- transforms transport layer errors as well as parsing errors into
`SearchStrategyError`s
```typescript
const parseMyCustomSearchResponse = normalizeDataSearchResponses(
'initial value',
decodeOrThrow(myCustomSearchResponsePayloadRT)
);
const { search, requests$ } = useDataSearch({
  getRequest: useCallback((searchTerm: string) => ({
    request: {
      params: {
        searchTerm
      },
    },
    options: {
      strategy: 'my-custom-search-strategy',
    },
  }), []),
  parseResponses: parseMyCustomSearchResponse,
});
```
@ -68,10 +88,6 @@ observables are unsubscribed from for proper cancellation if a new request has
been created. This uses RxJS's `switchMap()` operator under the hood. The hook
also makes sure that all observables are unsubscribed from on unmount.
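For illustration, the cancellation behavior can be reproduced with plain RxJS.
This is a minimal sketch, not part of the hooks themselves; the `trigger$`
subject and the `interval()` source are hypothetical stand-ins for the hook's
request and response observables:

```typescript
import { interval, Subject } from 'rxjs';
import { map, switchMap, take } from 'rxjs/operators';

const trigger$ = new Subject<string>();

trigger$
  .pipe(
    // switchMap unsubscribes from the previous inner observable as soon as a
    // new outer value arrives, which is what cancels superseded requests
    switchMap((name) => interval(100).pipe(take(3), map((i) => `${name}: ${i}`)))
  )
  .subscribe(console.log);

trigger$.next('first request'); // starts an inner observable
trigger$.next('second request'); // unsubscribes from the first one
```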
Since the specific response shape depends on the data strategy used, the hook
takes a projection function that is responsible for decoding the response in
an appropriate way.
A request can fail for various reasons, including server-side errors,
Elasticsearch shard failures and network failures. The intention is to map all
of them to a common `SearchStrategyError` interface. While the
@ -94,11 +110,7 @@ const {
latestResponseErrors,
loaded,
total,
} = useLatestPartialDataSearchResponse(
requests$,
'initialValue',
useMemo(() => decodeOrThrow(mySearchStrategyResponsePayloadRT), []),
);
} = useLatestPartialDataSearchResponse(requests$);
```
## Representing the request state to the user

View file

@ -0,0 +1,29 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { map } from 'rxjs/operators';
import { IKibanaSearchRequest } from '../../../../../../src/plugins/data/public';
import { ParsedDataSearchRequestDescriptor } from './types';
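/**
 * Merges each response emitted on the descriptor's `response$` with the
 * metadata of the request it belongs to, so consumers receive self-contained
 * response descriptors.
 */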
export const flattenDataSearchResponseDescriptor = <
Request extends IKibanaSearchRequest,
Response
>({
abortController,
options,
request,
response$,
}: ParsedDataSearchRequestDescriptor<Request, Response>) =>
response$.pipe(
map((response) => {
return {
abortController,
options,
request,
response,
};
})
);

View file

@ -4,6 +4,9 @@
* you may not use this file except in compliance with the Elastic License.
*/
export * from './flatten_data_search_response';
export * from './normalize_data_search_responses';
export * from './types';
export * from './use_data_search_request';
export * from './use_data_search_response_state';
export * from './use_latest_partial_data_search_response';

View file

@ -0,0 +1,77 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { Observable, of } from 'rxjs';
import { catchError, map, startWith } from 'rxjs/operators';
import { IKibanaSearchResponse } from '../../../../../../src/plugins/data/public';
import { AbortError } from '../../../../../../src/plugins/kibana_utils/public';
import { SearchStrategyError } from '../../../common/search_strategies/common/errors';
import { ParsedKibanaSearchResponse } from './types';
export type RawResponseParser<RawResponse, Response> = (
rawResponse: RawResponse
) => { data: Response; errors?: SearchStrategyError[] };
/**
* An operator factory that normalizes each {@link IKibanaSearchResponse} by
* parsing it into a {@link ParsedKibanaSearchResponse} and adding initial
* responses and error handling.
*
* @param initialResponse - The initial value to emit when a new request is
* handled.
 * @param parseRawResponse - The projection function to apply to each response
* payload. It should validate that the response payload is of the type {@link
* RawResponse} and decode it to a {@link Response}.
*
* @return An operator that adds parsing and error handling transformations to
* each response payload using the arguments given above.
*/
export const normalizeDataSearchResponses = <RawResponse, Response, InitialResponse>(
initialResponse: InitialResponse,
parseRawResponse: RawResponseParser<RawResponse, Response>
) => (
response$: Observable<IKibanaSearchResponse<RawResponse>>
): Observable<ParsedKibanaSearchResponse<Response | InitialResponse>> =>
response$.pipe(
map((response) => {
const { data, errors = [] } = parseRawResponse(response.rawResponse);
return {
data,
errors,
isPartial: response.isPartial ?? false,
isRunning: response.isRunning ?? false,
loaded: response.loaded,
total: response.total,
};
}),
startWith({
data: initialResponse,
errors: [],
isPartial: true,
isRunning: true,
loaded: 0,
total: undefined,
}),
catchError((error) =>
of({
data: initialResponse,
errors: [
error instanceof AbortError
? {
type: 'aborted' as const,
}
: {
type: 'generic' as const,
message: `${error.message ?? error}`,
},
],
isPartial: true,
isRunning: false,
loaded: 0,
total: undefined,
})
)
);

View file

@ -19,7 +19,17 @@ export interface DataSearchRequestDescriptor<Request extends IKibanaSearchReques
abortController: AbortController;
}
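/**
 * A request descriptor in which the response observable has already been
 * parsed into `ParsedKibanaSearchResponse`s by the hook that issued it.
 */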
export interface ParsedDataSearchRequestDescriptor<
Request extends IKibanaSearchRequest,
ResponseData
> {
request: Request;
options: ISearchOptions;
response$: Observable<ParsedKibanaSearchResponse<ResponseData>>;
abortController: AbortController;
}
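/**
 * A single search response whose `rawResponse` has been decoded into `data`
 * and whose failures have been normalized into `errors`.
 */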
export interface ParsedKibanaSearchResponse<ResponseData> {
total?: number;
loaded?: number;
isRunning: boolean;
@ -28,9 +38,12 @@ export interface NormalizedKibanaSearchResponse<ResponseData> {
errors: SearchStrategyError[];
}
export interface ParsedDataSearchResponseDescriptor<
Request extends IKibanaSearchRequest,
Response
> {
request: Request;
options: ISearchOptions;
response: ParsedKibanaSearchResponse<Response>;
abortController: AbortController;
}

View file

@ -17,6 +17,7 @@ import {
import { dataPluginMock } from '../../../../../../src/plugins/data/public/mocks';
import { createKibanaReactContext } from '../../../../../../src/plugins/kibana_react/public';
import { PluginKibanaContextValue } from '../../hooks/use_kibana';
import { normalizeDataSearchResponses } from './normalize_data_search_responses';
import { useDataSearch } from './use_data_search_request';
describe('useDataSearch hook', () => {
@ -34,6 +35,7 @@ describe('useDataSearch hook', () => {
() =>
useDataSearch({
getRequest,
parseResponses: noopParseResponse,
}),
{
wrapper: ({ children }) => <KibanaContextProvider>{children}</KibanaContextProvider>,
@ -48,7 +50,7 @@ describe('useDataSearch hook', () => {
expect(dataMock.search.search).not.toHaveBeenCalled();
});
it('creates search requests with the given params and options and parses the responses', async () => {
const dataMock = createDataPluginMock();
const searchResponseMock$ = of<IKibanaSearchResponse>({
rawResponse: {
@ -78,6 +80,7 @@ describe('useDataSearch hook', () => {
() =>
useDataSearch({
getRequest,
parseResponses: noopParseResponse,
}),
{
wrapper: ({ children }) => <KibanaContextProvider>{children}</KibanaContextProvider>,
@ -112,10 +115,11 @@ describe('useDataSearch hook', () => {
});
expect(firstRequest).toHaveProperty('options.strategy', 'test-search-strategy');
expect(firstRequest).toHaveProperty('response$', expect.any(Observable));
await expect(firstRequest.response$.toPromise()).resolves.toMatchObject({
data: {
firstKey: 'firstValue', // because this specific response parser just copies the raw response
},
errors: [],
});
});
@ -145,6 +149,7 @@ describe('useDataSearch hook', () => {
() =>
useDataSearch({
getRequest,
parseResponses: noopParseResponse,
}),
{
wrapper: ({ children }) => <KibanaContextProvider>{children}</KibanaContextProvider>,
@ -186,3 +191,8 @@ const createDataPluginMock = () => {
};
return dataMock;
};
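// A pass-through parser for the tests: it keeps the raw response as `data`
// and reports no errors.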
const noopParseResponse = normalizeDataSearchResponses(
null,
<Response extends any>(response: Response) => ({ data: response })
);

View file

@ -5,8 +5,8 @@
*/
import { useCallback } from 'react';
import { OperatorFunction, Subject } from 'rxjs';
import { share, tap } from 'rxjs/operators';
import {
IKibanaSearchRequest,
IKibanaSearchResponse,
@ -14,6 +14,7 @@ import {
} from '../../../../../../src/plugins/data/public';
import { useKibanaContextForPlugin } from '../../hooks/use_kibana';
import { tapUnsubscribe, useObservable } from '../use_observable';
import { ParsedDataSearchRequestDescriptor, ParsedKibanaSearchResponse } from './types';
export type DataSearchRequestFactory<Args extends any[], Request extends IKibanaSearchRequest> = (
...args: Args
@ -25,69 +26,74 @@ export type DataSearchRequestFactory<Args extends any[], Request extends IKibana
| null
| undefined;
type ParseResponsesOperator<RawResponse, Response> = OperatorFunction<
IKibanaSearchResponse<RawResponse>,
ParsedKibanaSearchResponse<Response>
>;
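/**
 * Creates a search request via the data plugin's `search()` service whenever
 * the returned `search()` function is called. A descriptor of each new request
 * and its parsed response observable is emitted on the returned `requests$`
 * observable.
 */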
export const useDataSearch = <
RequestFactoryArgs extends any[],
RequestParams,
Request extends IKibanaSearchRequest<RequestParams>,
RawResponse,
Response
>({
getRequest,
parseResponses,
}: {
getRequest: DataSearchRequestFactory<RequestFactoryArgs, Request>;
parseResponses: ParseResponsesOperator<RawResponse, Response>;
}) => {
const { services } = useKibanaContextForPlugin();
  const requests$ = useObservable(
() => new Subject<ParsedDataSearchRequestDescriptor<Request, Response>>(),
[]
);
const search = useCallback(
(...args: RequestFactoryArgs) => {
      const requestArgs = getRequest(...args);
if (requestArgs == null) {
return;
}
const abortController = new AbortController();
let isAbortable = true;
const newRequestDescriptor = {
...requestArgs,
abortController,
response$: services.data.search
.search<Request, IKibanaSearchResponse<RawResponse>>(requestArgs.request, {
abortSignal: abortController.signal,
...requestArgs.options,
})
.pipe(
// avoid aborting failed or completed requests
tap({
error: () => {
isAbortable = false;
},
complete: () => {
isAbortable = false;
},
}),
tapUnsubscribe(() => {
if (isAbortable) {
abortController.abort();
}
}),
parseResponses,
share()
),
};
requests$.next(newRequestDescriptor);
return newRequestDescriptor;
},
[getRequest, services.data.search, parseResponses, requests$]
);
return {

View file

@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { useCallback } from 'react';
import { Observable } from 'rxjs';
import { IKibanaSearchRequest } from '../../../../../../src/plugins/data/public';
import { useObservableState } from '../use_observable';
import { ParsedDataSearchResponseDescriptor } from './types';
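/**
 * Exposes the latest response of a response descriptor observable as React
 * state, along with a `cancelRequest` callback that aborts the underlying
 * search via the descriptor's `AbortController`.
 */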
export const useDataSearchResponseState = <
Request extends IKibanaSearchRequest,
Response,
InitialResponse
>(
response$: Observable<ParsedDataSearchResponseDescriptor<Request, Response | InitialResponse>>
) => {
const { latestValue } = useObservableState(response$, undefined);
const cancelRequest = useCallback(() => {
latestValue?.abortController.abort();
}, [latestValue]);
return {
cancelRequest,
isRequestRunning: latestValue?.response.isRunning ?? false,
isResponsePartial: latestValue?.response.isPartial ?? false,
latestResponseData: latestValue?.response.data,
latestResponseErrors: latestValue?.response.errors,
loaded: latestValue?.response.loaded,
total: latestValue?.response.total,
};
};

View file

@ -5,12 +5,9 @@
*/
import { act, renderHook } from '@testing-library/react-hooks';
import { BehaviorSubject, Observable, of, Subject } from 'rxjs';
import { IKibanaSearchRequest } from '../../../../../../src/plugins/data/public';
import { ParsedDataSearchRequestDescriptor, ParsedKibanaSearchResponse } from './types';
import { useLatestPartialDataSearchResponse } from './use_latest_partial_data_search_response';
describe('useLatestPartialDataSearchResponse hook', () => {
@ -19,25 +16,31 @@ describe('useLatestPartialDataSearchResponse hook', () => {
abortController: new AbortController(),
options: {},
request: { params: 'firstRequestParam' },
response$: new BehaviorSubject<ParsedKibanaSearchResponse<string>>({
data: 'initial',
isRunning: true,
isPartial: true,
errors: [],
}),
};
const secondRequest = {
abortController: new AbortController(),
options: {},
request: { params: 'secondRequestParam' },
response$: new BehaviorSubject<ParsedKibanaSearchResponse<string>>({
data: 'initial',
isRunning: true,
isPartial: true,
errors: [],
}),
};
const requests$ = new Subject<
ParsedDataSearchRequestDescriptor<IKibanaSearchRequest<string>, string>
>();
const { result } = renderHook(() => useLatestPartialDataSearchResponse(requests$));
expect(result).toHaveProperty('current.isRequestRunning', false);
expect(result).toHaveProperty('current.latestResponseData', undefined);
@ -52,37 +55,43 @@ describe('useLatestPartialDataSearchResponse hook', () => {
// first response of the first request arrives
act(() => {
firstRequest.response$.next({
data: 'request-1-response-1',
isRunning: true,
isPartial: true,
errors: [],
});
});
expect(result).toHaveProperty('current.isRequestRunning', true);
expect(result).toHaveProperty('current.latestResponseData', 'request-1-response-1');
// second request is started before the second response of the first request arrives
act(() => {
requests$.next(secondRequest);
secondRequest.response$.next({
data: 'request-2-response-1',
isRunning: true,
isPartial: true,
errors: [],
});
});
expect(result).toHaveProperty('current.isRequestRunning', true);
expect(result).toHaveProperty('current.latestResponseData', 'request-2-response-1');
// second response of the second request arrives
act(() => {
secondRequest.response$.next({
data: 'request-2-response-2',
isRunning: false,
isPartial: false,
errors: [],
});
});
expect(result).toHaveProperty('current.isRequestRunning', false);
expect(result).toHaveProperty('current.latestResponseData', 'request-2-response-2');
});
it("unsubscribes from the latest request's response observable on unmount", () => {
@ -92,20 +101,16 @@ describe('useLatestPartialDataSearchResponse hook', () => {
abortController: new AbortController(),
options: {},
request: { params: 'firstRequestParam' },
response$: new Observable<ParsedKibanaSearchResponse<string>>(() => {
return onUnsubscribe;
}),
};
const requests$ = of<ParsedDataSearchRequestDescriptor<IKibanaSearchRequest<string>, string>>(
firstRequest
);
const { unmount } = renderHook(() => useLatestPartialDataSearchResponse(requests$));
expect(onUnsubscribe).not.toHaveBeenCalled();

View file

@ -4,111 +4,40 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { Observable } from 'rxjs';
import { switchMap } from 'rxjs/operators';
import { IKibanaSearchRequest } from '../../../../../../src/plugins/data/public';
import { useOperator } from '../use_observable';
import { flattenDataSearchResponseDescriptor } from './flatten_data_search_response';
import { ParsedDataSearchRequestDescriptor, ParsedDataSearchResponseDescriptor } from './types';
import { useDataSearchResponseState } from './use_data_search_response_state';
export const useLatestPartialDataSearchResponse = <Request extends IKibanaSearchRequest, Response>(
requests$: Observable<ParsedDataSearchRequestDescriptor<Request, Response>>
) => {
  const latestResponse$: Observable<
ParsedDataSearchResponseDescriptor<Request, Response>
> = useOperator(requests$, flattenLatestDataSearchResponse);
const {
cancelRequest,
isRequestRunning,
isResponsePartial,
latestResponseData,
latestResponseErrors,
loaded,
total,
} = useDataSearchResponseState(latestResponse$);
return {
cancelRequest,
isRequestRunning,
isResponsePartial,
latestResponseData,
latestResponseErrors,
loaded,
total,
};
};
const flattenLatestDataSearchResponse = switchMap(flattenDataSearchResponseDescriptor);

View file

@ -5,9 +5,7 @@
*/
import { bisector } from 'd3-array';
import { compareToTimeKey, getIndexAtTimeKey, TimeKey, UniqueTimeKey } from '../../../common/time';
import { InfraLogEntryFields } from '../../graphql/types';
import {
LogEntry,
LogColumn,
@ -19,10 +17,6 @@ import {
LogMessageConstantPart,
} from '../../../common/log_entry';
export type LogEntryMessageSegment = InfraLogEntryFields.Message;
export type LogEntryConstantMessageSegment = InfraLogEntryFields.InfraLogMessageConstantSegmentInlineFragment;
export type LogEntryFieldMessageSegment = InfraLogEntryFields.InfraLogMessageFieldSegmentInlineFragment;
export const getLogEntryKey = (entry: { cursor: TimeKey }) => entry.cursor;
export const getUniqueLogEntryKey = (entry: { id: string; cursor: TimeKey }): UniqueTimeKey => ({

View file

@ -4,7 +4,6 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { InfraLogEntryHighlightFields } from '../../graphql/types';
import {
LogEntry,
LogColumn,
@ -14,13 +13,6 @@ import {
LogMessageFieldPart,
} from '../../../common/log_entry';
export type LogEntryHighlightColumn = InfraLogEntryHighlightFields.Columns;
export type LogEntryHighlightMessageColumn = InfraLogEntryHighlightFields.InfraLogEntryMessageColumnInlineFragment;
export type LogEntryHighlightFieldColumn = InfraLogEntryHighlightFields.InfraLogEntryFieldColumnInlineFragment;
export type LogEntryHighlightMessageSegment = InfraLogEntryHighlightFields.Message | {};
export type LogEntryHighlightFieldMessageSegment = InfraLogEntryHighlightFields.InfraLogMessageFieldSegmentInlineFragment;
export interface LogEntryHighlightsMap {
[entryId: string]: LogEntry[];
}

View file

@ -5,7 +5,8 @@
*/
import { useEffect, useRef, useState } from 'react';
import { BehaviorSubject, Observable, OperatorFunction, PartialObserver, Subscription } from 'rxjs';
import { switchMap } from 'rxjs/operators';
export const useLatest = <Value>(value: Value) => {
const valueRef = useRef(value);
@ -62,7 +63,9 @@ export const useSubscription = <InputValue>(
const fixedUnsubscribe = latestUnsubscribe.current;
const subscription = input$.subscribe({
next: (value) => {
return latestNext.current?.(value);
},
error: (value) => latestError.current?.(value),
complete: () => latestComplete.current?.(),
});
@ -78,6 +81,19 @@ export const useSubscription = <InputValue>(
return latestSubscription.current;
};
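/**
 * Applies `operator` to `input$` while keeping a stable subscription: a new
 * operator instance on re-render does not cause a re-subscription, but a new
 * `input$` does (via `switchMap`).
 */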
export const useOperator = <InputValue, OutputValue>(
input$: Observable<InputValue>,
operator: OperatorFunction<InputValue, OutputValue>
) => {
const latestOperator = useLatest(operator);
return useObservable(
(inputs$) =>
inputs$.pipe(switchMap(([currentInput$]) => latestOperator.current(currentInput$))),
[input$] as const
);
};
export const tapUnsubscribe = (onUnsubscribe: () => void) => <T>(source$: Observable<T>) => {
return new Observable<T>((subscriber) => {
const subscription = source$.subscribe({

View file

@ -56,12 +56,6 @@ export interface InfraSource {
configuration: InfraSourceConfiguration;
/** The status of the source */
status: InfraSourceStatus;
/** A consecutive span of log entries surrounding a point in time */
logEntriesAround: InfraLogEntryInterval;
/** A consecutive span of log entries within an interval */
logEntriesBetween: InfraLogEntryInterval;
/** Sequences of log entries matching sets of highlighting queries within an interval */
logEntryHighlights: InfraLogEntryInterval[];
/** A snapshot of nodes */
snapshot?: InfraSnapshotResponse | null;
@ -157,80 +151,6 @@ export interface InfraIndexField {
  /** Whether the field should be displayed based on event.module and an ECS allowed list */
displayable: boolean;
}
/** A consecutive sequence of log entries */
export interface InfraLogEntryInterval {
/** The key corresponding to the start of the interval covered by the entries */
start?: InfraTimeKey | null;
/** The key corresponding to the end of the interval covered by the entries */
end?: InfraTimeKey | null;
/** Whether there are more log entries available before the start */
hasMoreBefore: boolean;
/** Whether there are more log entries available after the end */
hasMoreAfter: boolean;
/** The query the log entries were filtered by */
filterQuery?: string | null;
/** The query the log entries were highlighted with */
highlightQuery?: string | null;
/** A list of the log entries */
entries: InfraLogEntry[];
}
/** A representation of the log entry's position in the event stream */
export interface InfraTimeKey {
/** The timestamp of the event that the log entry corresponds to */
time: number;
/** The tiebreaker that disambiguates events with the same timestamp */
tiebreaker: number;
}
/** A log entry */
export interface InfraLogEntry {
/** A unique representation of the log entry's position in the event stream */
key: InfraTimeKey;
/** The log entry's id */
gid: string;
/** The source id */
source: string;
/** The columns used for rendering the log entry */
columns: InfraLogEntryColumn[];
}
/** A special built-in column that contains the log entry's timestamp */
export interface InfraLogEntryTimestampColumn {
/** The id of the corresponding column configuration */
columnId: string;
/** The timestamp */
timestamp: number;
}
/** A special built-in column that contains the log entry's constructed message */
export interface InfraLogEntryMessageColumn {
/** The id of the corresponding column configuration */
columnId: string;
/** A list of the formatted log entry segments */
message: InfraLogMessageSegment[];
}
/** A segment of the log entry message that was derived from a field */
export interface InfraLogMessageFieldSegment {
/** The field the segment was derived from */
field: string;
/** The segment's message */
value: string;
/** A list of highlighted substrings of the value */
highlights: string[];
}
/** A segment of the log entry message that was derived from a string literal */
export interface InfraLogMessageConstantSegment {
/** The segment's message */
constant: string;
}
/** A column that contains the value of a field of the log entry */
export interface InfraLogEntryFieldColumn {
/** The id of the corresponding column configuration */
columnId: string;
/** The field name of the column */
field: string;
/** The value of the field in the log entry */
value: string;
/** A list of highlighted substrings of the value */
highlights: string[];
}
export interface InfraSnapshotResponse {
/** Nodes of type host, container or pod grouped by 0, 1 or 2 terms */
@ -304,21 +224,6 @@ export interface DeleteSourceResult {
// InputTypes
// ====================================================
export interface InfraTimeKeyInput {
time: number;
tiebreaker: number;
}
/** A highlighting definition */
export interface InfraLogEntryHighlightInput {
/** The query to highlight by */
query: string;
/** The number of highlighted documents to include beyond the beginning of the interval */
countBefore: number;
/** The number of highlighted documents to include beyond the end of the interval */
countAfter: number;
}
export interface InfraTimerangeInput {
  /** The interval string to use for the last bucket. The format is '{value}{unit}'. For example '5m' would return the metrics for the last 5 minutes of the timespan. */
interval: string;
@ -409,34 +314,6 @@ export interface SourceQueryArgs {
/** The id of the source */
id: string;
}
export interface LogEntriesAroundInfraSourceArgs {
/** The sort key that corresponds to the point in time */
key: InfraTimeKeyInput;
  /** The maximum number of preceding entries to return */
  countBefore?: number | null;
  /** The maximum number of following entries to return */
countAfter?: number | null;
/** The query to filter the log entries by */
filterQuery?: string | null;
}
export interface LogEntriesBetweenInfraSourceArgs {
/** The sort key that corresponds to the start of the interval */
startKey: InfraTimeKeyInput;
/** The sort key that corresponds to the end of the interval */
endKey: InfraTimeKeyInput;
/** The query to filter the log entries by */
filterQuery?: string | null;
}
export interface LogEntryHighlightsInfraSourceArgs {
/** The sort key that corresponds to the start of the interval */
startKey: InfraTimeKeyInput;
/** The sort key that corresponds to the end of the interval */
endKey: InfraTimeKeyInput;
/** The query to filter the log entries by */
filterQuery?: string | null;
/** The highlighting to apply to the log entries */
highlights: InfraLogEntryHighlightInput[];
}
export interface SnapshotInfraSourceArgs {
timerange: InfraTimerangeInput;
@ -593,15 +470,6 @@ export type InfraSourceLogColumn =
| InfraSourceMessageLogColumn
| InfraSourceFieldLogColumn;
/** A column of a log entry */
export type InfraLogEntryColumn =
| InfraLogEntryTimestampColumn
| InfraLogEntryMessageColumn
| InfraLogEntryFieldColumn;
/** A segment of the log entry message */
export type InfraLogMessageSegment = InfraLogMessageFieldSegment | InfraLogMessageConstantSegment;
// ====================================================
// END: Typescript template
// ====================================================
@ -650,12 +518,6 @@ export namespace InfraSourceResolvers {
configuration?: ConfigurationResolver<InfraSourceConfiguration, TypeParent, Context>;
/** The status of the source */
status?: StatusResolver<InfraSourceStatus, TypeParent, Context>;
/** A consecutive span of log entries surrounding a point in time */
logEntriesAround?: LogEntriesAroundResolver<InfraLogEntryInterval, TypeParent, Context>;
/** A consecutive span of log entries within an interval */
logEntriesBetween?: LogEntriesBetweenResolver<InfraLogEntryInterval, TypeParent, Context>;
/** Sequences of log entries matching sets of highlighting queries within an interval */
logEntryHighlights?: LogEntryHighlightsResolver<InfraLogEntryInterval[], TypeParent, Context>;
/** A snapshot of nodes */
snapshot?: SnapshotResolver<InfraSnapshotResponse | null, TypeParent, Context>;
@ -693,51 +555,6 @@ export namespace InfraSourceResolvers {
Parent = InfraSource,
Context = InfraContext
> = Resolver<R, Parent, Context>;
export type LogEntriesAroundResolver<
R = InfraLogEntryInterval,
Parent = InfraSource,
Context = InfraContext
> = Resolver<R, Parent, Context, LogEntriesAroundArgs>;
export interface LogEntriesAroundArgs {
/** The sort key that corresponds to the point in time */
key: InfraTimeKeyInput;
    /** The maximum number of preceding entries to return */
    countBefore?: number | null;
    /** The maximum number of following entries to return */
countAfter?: number | null;
/** The query to filter the log entries by */
filterQuery?: string | null;
}
export type LogEntriesBetweenResolver<
R = InfraLogEntryInterval,
Parent = InfraSource,
Context = InfraContext
> = Resolver<R, Parent, Context, LogEntriesBetweenArgs>;
export interface LogEntriesBetweenArgs {
/** The sort key that corresponds to the start of the interval */
startKey: InfraTimeKeyInput;
/** The sort key that corresponds to the end of the interval */
endKey: InfraTimeKeyInput;
/** The query to filter the log entries by */
filterQuery?: string | null;
}
export type LogEntryHighlightsResolver<
R = InfraLogEntryInterval[],
Parent = InfraSource,
Context = InfraContext
> = Resolver<R, Parent, Context, LogEntryHighlightsArgs>;
export interface LogEntryHighlightsArgs {
/** The sort key that corresponds to the start of the interval */
startKey: InfraTimeKeyInput;
/** The sort key that corresponds to the end of the interval */
endKey: InfraTimeKeyInput;
/** The query to filter the log entries by */
filterQuery?: string | null;
/** The highlighting to apply to the log entries */
highlights: InfraLogEntryHighlightInput[];
}
export type SnapshotResolver<
R = InfraSnapshotResponse | null,
@ -1059,229 +876,6 @@ export namespace InfraIndexFieldResolvers {
Context = InfraContext
> = Resolver<R, Parent, Context>;
}
/** A consecutive sequence of log entries */
export namespace InfraLogEntryIntervalResolvers {
export interface Resolvers<Context = InfraContext, TypeParent = InfraLogEntryInterval> {
/** The key corresponding to the start of the interval covered by the entries */
start?: StartResolver<InfraTimeKey | null, TypeParent, Context>;
/** The key corresponding to the end of the interval covered by the entries */
end?: EndResolver<InfraTimeKey | null, TypeParent, Context>;
/** Whether there are more log entries available before the start */
hasMoreBefore?: HasMoreBeforeResolver<boolean, TypeParent, Context>;
/** Whether there are more log entries available after the end */
hasMoreAfter?: HasMoreAfterResolver<boolean, TypeParent, Context>;
/** The query the log entries were filtered by */
filterQuery?: FilterQueryResolver<string | null, TypeParent, Context>;
/** The query the log entries were highlighted with */
highlightQuery?: HighlightQueryResolver<string | null, TypeParent, Context>;
/** A list of the log entries */
entries?: EntriesResolver<InfraLogEntry[], TypeParent, Context>;
}
export type StartResolver<
R = InfraTimeKey | null,
Parent = InfraLogEntryInterval,
Context = InfraContext
> = Resolver<R, Parent, Context>;
export type EndResolver<
R = InfraTimeKey | null,
Parent = InfraLogEntryInterval,
Context = InfraContext
> = Resolver<R, Parent, Context>;
export type HasMoreBeforeResolver<
R = boolean,
Parent = InfraLogEntryInterval,
Context = InfraContext
> = Resolver<R, Parent, Context>;
export type HasMoreAfterResolver<
R = boolean,
Parent = InfraLogEntryInterval,
Context = InfraContext
> = Resolver<R, Parent, Context>;
export type FilterQueryResolver<
R = string | null,
Parent = InfraLogEntryInterval,
Context = InfraContext
> = Resolver<R, Parent, Context>;
export type HighlightQueryResolver<
R = string | null,
Parent = InfraLogEntryInterval,
Context = InfraContext
> = Resolver<R, Parent, Context>;
export type EntriesResolver<
R = InfraLogEntry[],
Parent = InfraLogEntryInterval,
Context = InfraContext
> = Resolver<R, Parent, Context>;
}
/** A representation of the log entry's position in the event stream */
export namespace InfraTimeKeyResolvers {
export interface Resolvers<Context = InfraContext, TypeParent = InfraTimeKey> {
/** The timestamp of the event that the log entry corresponds to */
time?: TimeResolver<number, TypeParent, Context>;
/** The tiebreaker that disambiguates events with the same timestamp */
tiebreaker?: TiebreakerResolver<number, TypeParent, Context>;
}
export type TimeResolver<R = number, Parent = InfraTimeKey, Context = InfraContext> = Resolver<
R,
Parent,
Context
>;
export type TiebreakerResolver<
R = number,
Parent = InfraTimeKey,
Context = InfraContext
> = Resolver<R, Parent, Context>;
}
/** A log entry */
export namespace InfraLogEntryResolvers {
export interface Resolvers<Context = InfraContext, TypeParent = InfraLogEntry> {
/** A unique representation of the log entry's position in the event stream */
key?: KeyResolver<InfraTimeKey, TypeParent, Context>;
/** The log entry's id */
gid?: GidResolver<string, TypeParent, Context>;
/** The source id */
source?: SourceResolver<string, TypeParent, Context>;
/** The columns used for rendering the log entry */
columns?: ColumnsResolver<InfraLogEntryColumn[], TypeParent, Context>;
}
export type KeyResolver<
R = InfraTimeKey,
Parent = InfraLogEntry,
Context = InfraContext
> = Resolver<R, Parent, Context>;
export type GidResolver<R = string, Parent = InfraLogEntry, Context = InfraContext> = Resolver<
R,
Parent,
Context
>;
export type SourceResolver<R = string, Parent = InfraLogEntry, Context = InfraContext> = Resolver<
R,
Parent,
Context
>;
export type ColumnsResolver<
R = InfraLogEntryColumn[],
Parent = InfraLogEntry,
Context = InfraContext
> = Resolver<R, Parent, Context>;
}
/** A special built-in column that contains the log entry's timestamp */
export namespace InfraLogEntryTimestampColumnResolvers {
export interface Resolvers<Context = InfraContext, TypeParent = InfraLogEntryTimestampColumn> {
/** The id of the corresponding column configuration */
columnId?: ColumnIdResolver<string, TypeParent, Context>;
/** The timestamp */
timestamp?: TimestampResolver<number, TypeParent, Context>;
}
export type ColumnIdResolver<
R = string,
Parent = InfraLogEntryTimestampColumn,
Context = InfraContext
> = Resolver<R, Parent, Context>;
export type TimestampResolver<
R = number,
Parent = InfraLogEntryTimestampColumn,
Context = InfraContext
> = Resolver<R, Parent, Context>;
}
/** A special built-in column that contains the log entry's constructed message */
export namespace InfraLogEntryMessageColumnResolvers {
export interface Resolvers<Context = InfraContext, TypeParent = InfraLogEntryMessageColumn> {
/** The id of the corresponding column configuration */
columnId?: ColumnIdResolver<string, TypeParent, Context>;
/** A list of the formatted log entry segments */
message?: MessageResolver<InfraLogMessageSegment[], TypeParent, Context>;
}
export type ColumnIdResolver<
R = string,
Parent = InfraLogEntryMessageColumn,
Context = InfraContext
> = Resolver<R, Parent, Context>;
export type MessageResolver<
R = InfraLogMessageSegment[],
Parent = InfraLogEntryMessageColumn,
Context = InfraContext
> = Resolver<R, Parent, Context>;
}
/** A segment of the log entry message that was derived from a field */
export namespace InfraLogMessageFieldSegmentResolvers {
export interface Resolvers<Context = InfraContext, TypeParent = InfraLogMessageFieldSegment> {
/** The field the segment was derived from */
field?: FieldResolver<string, TypeParent, Context>;
/** The segment's message */
value?: ValueResolver<string, TypeParent, Context>;
/** A list of highlighted substrings of the value */
highlights?: HighlightsResolver<string[], TypeParent, Context>;
}
export type FieldResolver<
R = string,
Parent = InfraLogMessageFieldSegment,
Context = InfraContext
> = Resolver<R, Parent, Context>;
export type ValueResolver<
R = string,
Parent = InfraLogMessageFieldSegment,
Context = InfraContext
> = Resolver<R, Parent, Context>;
export type HighlightsResolver<
R = string[],
Parent = InfraLogMessageFieldSegment,
Context = InfraContext
> = Resolver<R, Parent, Context>;
}
/** A segment of the log entry message that was derived from a string literal */
export namespace InfraLogMessageConstantSegmentResolvers {
export interface Resolvers<Context = InfraContext, TypeParent = InfraLogMessageConstantSegment> {
/** The segment's message */
constant?: ConstantResolver<string, TypeParent, Context>;
}
export type ConstantResolver<
R = string,
Parent = InfraLogMessageConstantSegment,
Context = InfraContext
> = Resolver<R, Parent, Context>;
}
/** A column that contains the value of a field of the log entry */
export namespace InfraLogEntryFieldColumnResolvers {
export interface Resolvers<Context = InfraContext, TypeParent = InfraLogEntryFieldColumn> {
/** The id of the corresponding column configuration */
columnId?: ColumnIdResolver<string, TypeParent, Context>;
/** The field name of the column */
field?: FieldResolver<string, TypeParent, Context>;
/** The value of the field in the log entry */
value?: ValueResolver<string, TypeParent, Context>;
/** A list of highlighted substrings of the value */
highlights?: HighlightsResolver<string[], TypeParent, Context>;
}
export type ColumnIdResolver<
R = string,
Parent = InfraLogEntryFieldColumn,
Context = InfraContext
> = Resolver<R, Parent, Context>;
export type FieldResolver<
R = string,
Parent = InfraLogEntryFieldColumn,
Context = InfraContext
> = Resolver<R, Parent, Context>;
export type ValueResolver<
R = string,
Parent = InfraLogEntryFieldColumn,
Context = InfraContext
> = Resolver<R, Parent, Context>;
export type HighlightsResolver<
R = string[],
Parent = InfraLogEntryFieldColumn,
Context = InfraContext
> = Resolver<R, Parent, Context>;
}
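For orientation, a hedged sketch of how one of these generated resolver maps could be implemented; it assumes the conventional graphql-codegen shape where `Resolver<R, Parent, Context>` is a `(parent, args, context) => R | Promise<R>` function type, which this diff does not show:
// Hypothetical sketch, not part of this commit: a resolver map for the
// InfraTimeKey type. Each resolver simply projects a field off its parent.
const timeKeyResolvers: InfraTimeKeyResolvers.Resolvers = {
  time: (parent) => parent.time, // parent is an InfraTimeKey
  tiebreaker: (parent) => parent.tiebreaker,
};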
export namespace InfraSnapshotResponseResolvers {
export interface Resolvers<Context = InfraContext, TypeParent = InfraSnapshotResponse> {

View file

@ -215,6 +215,7 @@ function mapHitsToLogEntryDocuments(hits: SortedSearchHit[], fields: string[]):
return {
id: hit._id,
index: hit._index,
cursor: { time: hit.sort[0], tiebreaker: hit.sort[1] },
fields: logFields,
highlights: hit.highlight || {},
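The added cursor is derived directly from the hit's sort values, which uniquely position the entry when sorting by timestamp and tiebreaker. A minimal illustration with a hypothetical hit:
// Illustrative only: for a [timestamp, tiebreaker] sort, the two sort
// values returned by Elasticsearch double as the pagination cursor.
const exampleHit = { _id: 'abc', _index: 'logs-1', sort: [1605116827143, 7] };
const exampleCursor = { time: exampleHit.sort[0], tiebreaker: exampleHit.sort[1] };
// exampleCursor => { time: 1605116827143, tiebreaker: 7 }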

View file

@ -12,19 +12,19 @@ import {
LogEntriesSummaryHighlightsBucket,
LogEntriesRequest,
} from '../../../../common/http_api';
import { LogEntry, LogColumn } from '../../../../common/log_entry';
import { LogColumn, LogEntryCursor, LogEntry } from '../../../../common/log_entry';
import {
InfraSourceConfiguration,
InfraSources,
SavedSourceConfigurationFieldColumnRuntimeType,
} from '../../sources';
import { getBuiltinRules } from './builtin_rules';
import { getBuiltinRules } from '../../../services/log_entries/message/builtin_rules';
import {
CompiledLogMessageFormattingRule,
Fields,
Highlights,
compileFormattingRules,
} from './message';
} from '../../../services/log_entries/message/message';
import { KibanaFramework } from '../../adapters/framework/kibana_framework_adapter';
import { decodeOrThrow } from '../../../../common/runtime_types';
import {
@ -33,7 +33,6 @@ import {
CompositeDatasetKey,
createLogEntryDatasetsQuery,
} from './queries/log_entry_datasets';
import { LogEntryCursor } from '../../../../common/log_entry';
export interface LogEntriesParams {
startTimestamp: number;
@ -156,6 +155,7 @@ export class InfraLogEntriesDomain {
const entries = documents.map((doc) => {
return {
id: doc.id,
index: doc.index,
cursor: doc.cursor,
columns: columnDefinitions.map(
(column): LogColumn => {
@ -317,6 +317,7 @@ export type LogEntryQuery = JsonObject;
export interface LogEntryDocument {
id: string;
index: string;
fields: Fields;
highlights: Highlights;
cursor: LogEntryCursor;
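Under the extended interface, a document now carries both its origin index and its cursor. An illustrative value (field contents invented for the example):
// Sketch of a LogEntryDocument under the new shape; values are hypothetical.
const exampleDocument: LogEntryDocument = {
  id: 'gid-1',
  index: 'filebeat-7.10.0',
  fields: { message: ['hello world'] },
  highlights: {},
  cursor: { time: 1605116827143, tiebreaker: 7 },
};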

View file

@ -5,7 +5,6 @@
*/
import type { ILegacyScopedClusterClient } from 'src/core/server';
import { LogEntryContext } from '../../../common/log_entry';
import {
compareDatasetsByMaximumAnomalyScore,
getJobId,
@ -13,6 +12,7 @@ import {
logEntryCategoriesJobTypes,
CategoriesSort,
} from '../../../common/log_analysis';
import { LogEntryContext } from '../../../common/log_entry';
import { startTracingSpan } from '../../../common/performance_tracing';
import { decodeOrThrow } from '../../../common/runtime_types';
import type { MlAnomalyDetectors, MlSystem } from '../../types';

View file

@ -0,0 +1,318 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { ResponseError } from '@elastic/elasticsearch/lib/errors';
import { of, throwError } from 'rxjs';
import {
elasticsearchServiceMock,
savedObjectsClientMock,
uiSettingsServiceMock,
} from 'src/core/server/mocks';
import {
IEsSearchRequest,
IEsSearchResponse,
ISearchStrategy,
SearchStrategyDependencies,
} from 'src/plugins/data/server';
import { InfraSource } from '../../lib/sources';
import { createInfraSourcesMock } from '../../lib/sources/mocks';
import {
logEntriesSearchRequestStateRT,
logEntriesSearchStrategyProvider,
} from './log_entries_search_strategy';
describe('LogEntries search strategy', () => {
it('handles initial search requests', async () => {
const esSearchStrategyMock = createEsSearchStrategyMock({
id: 'ASYNC_REQUEST_ID',
isRunning: true,
rawResponse: {
took: 0,
_shards: { total: 1, failed: 0, skipped: 0, successful: 0 },
timed_out: false,
hits: { total: 0, max_score: 0, hits: [] },
},
});
const dataMock = createDataPluginMock(esSearchStrategyMock);
const sourcesMock = createInfraSourcesMock();
sourcesMock.getSourceConfiguration.mockResolvedValue(createSourceConfigurationMock());
const mockDependencies = createSearchStrategyDependenciesMock();
const logEntriesSearchStrategy = logEntriesSearchStrategyProvider({
data: dataMock,
sources: sourcesMock,
});
const response = await logEntriesSearchStrategy
.search(
{
params: {
sourceId: 'SOURCE_ID',
startTimestamp: 100,
endTimestamp: 200,
size: 3,
},
},
{},
mockDependencies
)
.toPromise();
expect(sourcesMock.getSourceConfiguration).toHaveBeenCalled();
expect(esSearchStrategyMock.search).toHaveBeenCalledWith(
expect.objectContaining({
params: expect.objectContaining({
index: 'log-indices-*',
body: expect.objectContaining({
fields: expect.arrayContaining(['event.dataset', 'message']),
}),
}),
}),
expect.anything(),
expect.anything()
);
expect(response.id).toEqual(expect.any(String));
expect(response.isRunning).toBe(true);
});
it('handles subsequent polling requests', async () => {
const esSearchStrategyMock = createEsSearchStrategyMock({
id: 'ASYNC_REQUEST_ID',
isRunning: false,
rawResponse: {
took: 1,
_shards: { total: 1, failed: 0, skipped: 0, successful: 1 },
timed_out: false,
hits: {
total: 0,
max_score: 0,
hits: [
{
_id: 'HIT_ID',
_index: 'HIT_INDEX',
_type: '_doc',
_score: 0,
_source: null,
fields: {
'@timestamp': [1605116827143],
'event.dataset': ['HIT_DATASET'],
MESSAGE_FIELD: ['HIT_MESSAGE'],
'container.id': ['HIT_CONTAINER_ID'],
},
sort: [1605116827143 as any, 1 as any], // casts needed: the sort values are incorrectly typed as string upstream
},
],
},
},
});
const dataMock = createDataPluginMock(esSearchStrategyMock);
const sourcesMock = createInfraSourcesMock();
sourcesMock.getSourceConfiguration.mockResolvedValue(createSourceConfigurationMock());
const mockDependencies = createSearchStrategyDependenciesMock();
const logEntriesSearchStrategy = logEntriesSearchStrategyProvider({
data: dataMock,
sources: sourcesMock,
});
const requestId = logEntriesSearchRequestStateRT.encode({
esRequestId: 'ASYNC_REQUEST_ID',
});
const response = await logEntriesSearchStrategy
.search(
{
id: requestId,
params: {
sourceId: 'SOURCE_ID',
startTimestamp: 100,
endTimestamp: 200,
size: 3,
},
},
{},
mockDependencies
)
.toPromise();
expect(sourcesMock.getSourceConfiguration).toHaveBeenCalled();
expect(esSearchStrategyMock.search).toHaveBeenCalled();
expect(response.id).toEqual(requestId);
expect(response.isRunning).toBe(false);
expect(response.rawResponse.data.entries).toEqual([
{
id: 'HIT_ID',
index: 'HIT_INDEX',
cursor: {
time: 1605116827143,
tiebreaker: 1,
},
columns: [
{
columnId: 'TIMESTAMP_COLUMN_ID',
timestamp: 1605116827143,
},
{
columnId: 'DATASET_COLUMN_ID',
field: 'event.dataset',
value: ['HIT_DATASET'],
highlights: [],
},
{
columnId: 'MESSAGE_COLUMN_ID',
message: [
{
field: 'MESSAGE_FIELD',
value: ['HIT_MESSAGE'],
highlights: [],
},
],
},
],
context: {
'container.id': 'HIT_CONTAINER_ID',
},
},
]);
});
it('forwards errors from the underlying search strategy', async () => {
const esSearchStrategyMock = createEsSearchStrategyMock({
id: 'ASYNC_REQUEST_ID',
isRunning: false,
rawResponse: {
took: 1,
_shards: { total: 1, failed: 0, skipped: 0, successful: 1 },
timed_out: false,
hits: { total: 0, max_score: 0, hits: [] },
},
});
const dataMock = createDataPluginMock(esSearchStrategyMock);
const sourcesMock = createInfraSourcesMock();
sourcesMock.getSourceConfiguration.mockResolvedValue(createSourceConfigurationMock());
const mockDependencies = createSearchStrategyDependenciesMock();
const logEntriesSearchStrategy = logEntriesSearchStrategyProvider({
data: dataMock,
sources: sourcesMock,
});
const response = logEntriesSearchStrategy.search(
{
id: logEntriesSearchRequestStateRT.encode({ esRequestId: 'UNKNOWN_ID' }),
params: {
sourceId: 'SOURCE_ID',
startTimestamp: 100,
endTimestamp: 200,
size: 3,
},
},
{},
mockDependencies
);
await expect(response.toPromise()).rejects.toThrowError(ResponseError);
});
it('forwards cancellation to the underlying search strategy', async () => {
const esSearchStrategyMock = createEsSearchStrategyMock({
id: 'ASYNC_REQUEST_ID',
isRunning: false,
rawResponse: {
took: 1,
_shards: { total: 1, failed: 0, skipped: 0, successful: 1 },
timed_out: false,
hits: { total: 0, max_score: 0, hits: [] },
},
});
const dataMock = createDataPluginMock(esSearchStrategyMock);
const sourcesMock = createInfraSourcesMock();
sourcesMock.getSourceConfiguration.mockResolvedValue(createSourceConfigurationMock());
const mockDependencies = createSearchStrategyDependenciesMock();
const logEntriesSearchStrategy = logEntriesSearchStrategyProvider({
data: dataMock,
sources: sourcesMock,
});
const requestId = logEntriesSearchRequestStateRT.encode({
esRequestId: 'ASYNC_REQUEST_ID',
});
await logEntriesSearchStrategy.cancel?.(requestId, {}, mockDependencies);
expect(esSearchStrategyMock.cancel).toHaveBeenCalled();
});
});
const createSourceConfigurationMock = (): InfraSource => ({
id: 'SOURCE_ID',
origin: 'stored' as const,
configuration: {
name: 'SOURCE_NAME',
description: 'SOURCE_DESCRIPTION',
logAlias: 'log-indices-*',
metricAlias: 'metric-indices-*',
inventoryDefaultView: 'DEFAULT_VIEW',
metricsExplorerDefaultView: 'DEFAULT_VIEW',
logColumns: [
{ timestampColumn: { id: 'TIMESTAMP_COLUMN_ID' } },
{
fieldColumn: {
id: 'DATASET_COLUMN_ID',
field: 'event.dataset',
},
},
{
messageColumn: { id: 'MESSAGE_COLUMN_ID' },
},
],
fields: {
pod: 'POD_FIELD',
host: 'HOST_FIELD',
container: 'CONTAINER_FIELD',
message: ['MESSAGE_FIELD'],
timestamp: 'TIMESTAMP_FIELD',
tiebreaker: 'TIEBREAKER_FIELD',
},
},
});
const createEsSearchStrategyMock = (esSearchResponse: IEsSearchResponse) => ({
search: jest.fn((esSearchRequest: IEsSearchRequest) => {
if (typeof esSearchRequest.id === 'string') {
if (esSearchRequest.id === esSearchResponse.id) {
return of(esSearchResponse);
} else {
return throwError(
new ResponseError({
body: {},
headers: {},
meta: {} as any,
statusCode: 404,
warnings: [],
})
);
}
} else {
return of(esSearchResponse);
}
}),
cancel: jest.fn().mockResolvedValue(undefined),
});
const createSearchStrategyDependenciesMock = (): SearchStrategyDependencies => ({
uiSettingsClient: uiSettingsServiceMock.createClient(),
esClient: elasticsearchServiceMock.createScopedClusterClient(),
savedObjectsClient: savedObjectsClientMock.create(),
});
// Using the official data plugin mock from within x-pack doesn't type-check,
// because the `licensing` plugin modifies the core `RequestHandlerContext` type.
const createDataPluginMock = (esSearchStrategyMock: ISearchStrategy): any => ({
search: {
getSearchStrategy: jest.fn().mockReturnValue(esSearchStrategyMock),
},
});

View file

@ -0,0 +1,245 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { pick } from '@kbn/std';
import * as rt from 'io-ts';
import { combineLatest, concat, defer, forkJoin, of } from 'rxjs';
import { concatMap, filter, map, shareReplay, take } from 'rxjs/operators';
import type {
IEsSearchRequest,
IKibanaSearchRequest,
IKibanaSearchResponse,
} from '../../../../../../src/plugins/data/common';
import type {
ISearchStrategy,
PluginStart as DataPluginStart,
} from '../../../../../../src/plugins/data/server';
import { LogSourceColumnConfiguration } from '../../../common/http_api/log_sources';
import {
getLogEntryCursorFromHit,
LogColumn,
LogEntry,
LogEntryAfterCursor,
logEntryAfterCursorRT,
LogEntryBeforeCursor,
logEntryBeforeCursorRT,
LogEntryContext,
} from '../../../common/log_entry';
import { decodeOrThrow } from '../../../common/runtime_types';
import {
LogEntriesSearchRequestParams,
logEntriesSearchRequestParamsRT,
LogEntriesSearchResponsePayload,
logEntriesSearchResponsePayloadRT,
} from '../../../common/search_strategies/log_entries/log_entries';
import type { IInfraSources } from '../../lib/sources';
import {
createAsyncRequestRTs,
createErrorFromShardFailure,
jsonFromBase64StringRT,
} from '../../utils/typed_search_strategy';
import {
CompiledLogMessageFormattingRule,
compileFormattingRules,
getBuiltinRules,
} from './message';
import {
createGetLogEntriesQuery,
getLogEntriesResponseRT,
getSortDirection,
LogEntryHit,
} from './queries/log_entries';
type LogEntriesSearchRequest = IKibanaSearchRequest<LogEntriesSearchRequestParams>;
type LogEntriesSearchResponse = IKibanaSearchResponse<LogEntriesSearchResponsePayload>;
export const logEntriesSearchStrategyProvider = ({
data,
sources,
}: {
data: DataPluginStart;
sources: IInfraSources;
}): ISearchStrategy<LogEntriesSearchRequest, LogEntriesSearchResponse> => {
const esSearchStrategy = data.search.getSearchStrategy('ese');
return {
search: (rawRequest, options, dependencies) =>
defer(() => {
const request = decodeOrThrow(asyncRequestRT)(rawRequest);
const sourceConfiguration$ = defer(() =>
sources.getSourceConfiguration(dependencies.savedObjectsClient, request.params.sourceId)
).pipe(take(1), shareReplay(1));
const messageFormattingRules$ = defer(() =>
sourceConfiguration$.pipe(
map(({ configuration }) =>
compileFormattingRules(getBuiltinRules(configuration.fields.message))
)
)
).pipe(take(1), shareReplay(1));
const recoveredRequest$ = of(request).pipe(
filter(asyncRecoveredRequestRT.is),
map(({ id: { esRequestId } }) => ({ id: esRequestId }))
);
const initialRequest$ = of(request).pipe(
filter(asyncInitialRequestRT.is),
concatMap(({ params }) =>
forkJoin([sourceConfiguration$, messageFormattingRules$]).pipe(
map(
([{ configuration }, messageFormattingRules]): IEsSearchRequest => {
return {
params: createGetLogEntriesQuery(
configuration.logAlias,
params.startTimestamp,
params.endTimestamp,
pickRequestCursor(params),
params.size + 1,
configuration.fields.timestamp,
configuration.fields.tiebreaker,
messageFormattingRules.requiredFields,
params.query,
params.highlightPhrase
),
};
}
)
)
)
);
const searchResponse$ = concat(recoveredRequest$, initialRequest$).pipe(
take(1),
concatMap((esRequest) => esSearchStrategy.search(esRequest, options, dependencies))
);
return combineLatest([searchResponse$, sourceConfiguration$, messageFormattingRules$]).pipe(
map(([esResponse, { configuration }, messageFormattingRules]) => {
const rawResponse = decodeOrThrow(getLogEntriesResponseRT)(esResponse.rawResponse);
const entries = rawResponse.hits.hits
.slice(0, request.params.size)
.map(getLogEntryFromHit(configuration.logColumns, messageFormattingRules));
const sortDirection = getSortDirection(pickRequestCursor(request.params));
if (sortDirection === 'desc') {
entries.reverse();
}
const hasMore = rawResponse.hits.hits.length > entries.length;
const hasMoreBefore = sortDirection === 'desc' ? hasMore : undefined;
const hasMoreAfter = sortDirection === 'asc' ? hasMore : undefined;
const { topCursor, bottomCursor } = getResponseCursors(entries);
const errors = (rawResponse._shards.failures ?? []).map(createErrorFromShardFailure);
return {
...esResponse,
...(esResponse.id
? { id: logEntriesSearchRequestStateRT.encode({ esRequestId: esResponse.id }) }
: {}),
rawResponse: logEntriesSearchResponsePayloadRT.encode({
data: { entries, topCursor, bottomCursor, hasMoreBefore, hasMoreAfter },
errors,
}),
};
})
);
}),
cancel: async (id, options, dependencies) => {
const { esRequestId } = decodeOrThrow(logEntriesSearchRequestStateRT)(id);
return await esSearchStrategy.cancel?.(esRequestId, options, dependencies);
},
};
};
// exported for tests
export const logEntriesSearchRequestStateRT = rt.string.pipe(jsonFromBase64StringRT).pipe(
rt.type({
esRequestId: rt.string,
})
);
const { asyncInitialRequestRT, asyncRecoveredRequestRT, asyncRequestRT } = createAsyncRequestRTs(
logEntriesSearchRequestStateRT,
logEntriesSearchRequestParamsRT
);
const getLogEntryFromHit = (
columnDefinitions: LogSourceColumnConfiguration[],
messageFormattingRules: CompiledLogMessageFormattingRule
) => (hit: LogEntryHit): LogEntry => {
const cursor = getLogEntryCursorFromHit(hit);
return {
id: hit._id,
index: hit._index,
cursor,
columns: columnDefinitions.map(
(column): LogColumn => {
if ('timestampColumn' in column) {
return {
columnId: column.timestampColumn.id,
timestamp: cursor.time,
};
} else if ('messageColumn' in column) {
return {
columnId: column.messageColumn.id,
message: messageFormattingRules.format(hit.fields, hit.highlight || {}),
};
} else {
return {
columnId: column.fieldColumn.id,
field: column.fieldColumn.field,
value: hit.fields[column.fieldColumn.field] ?? [],
highlights: hit.highlight?.[column.fieldColumn.field] ?? [],
};
}
}
),
context: getContextFromHit(hit),
};
};
const pickRequestCursor = (
params: LogEntriesSearchRequestParams
): LogEntryAfterCursor | LogEntryBeforeCursor | null => {
if (logEntryAfterCursorRT.is(params)) {
return pick(params, ['after']);
} else if (logEntryBeforeCursorRT.is(params)) {
return pick(params, ['before']);
}
return null;
};
const getContextFromHit = (hit: LogEntryHit): LogEntryContext => {
// Read all candidate context fields, then check that the ones that belong together are present and correctly typed
const containerId = hit.fields['container.id']?.[0];
const hostName = hit.fields['host.name']?.[0];
const logFilePath = hit.fields['log.file.path']?.[0];
if (typeof containerId === 'string') {
return { 'container.id': containerId };
}
if (typeof hostName === 'string' && typeof logFilePath === 'string') {
return { 'host.name': hostName, 'log.file.path': logFilePath };
}
return {};
};
function getResponseCursors(entries: LogEntry[]) {
const hasEntries = entries.length > 0;
const topCursor = hasEntries ? entries[0].cursor : null;
const bottomCursor = hasEntries ? entries[entries.length - 1].cursor : null;
return { topCursor, bottomCursor };
}
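Taken together, this implements the standard two-phase async search contract: the first call submits the ES async search and returns an encoded { esRequestId } as its id; subsequent calls with that id poll the same search. A hedged caller-side sketch (`strategy`, `params`, and `deps` are stand-ins, not names from this diff):
// Sketch only: polling against the strategy returned by
// logEntriesSearchStrategyProvider.
const first = await strategy.search({ params }, {}, deps).toPromise();
const final = first.isRunning
  ? await strategy.search({ id: first.id, params }, {}, deps).toPromise()
  : first;
// Cancellation is forwarded to the underlying ES strategy:
// await strategy.cancel?.(first.id, {}, deps);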

View file

@ -6,12 +6,18 @@
import { CoreSetup } from 'src/core/server';
import { LOG_ENTRY_SEARCH_STRATEGY } from '../../../common/search_strategies/log_entries/log_entry';
import { LOG_ENTRIES_SEARCH_STRATEGY } from '../../../common/search_strategies/log_entries/log_entries';
import { logEntriesSearchStrategyProvider } from './log_entries_search_strategy';
import { logEntrySearchStrategyProvider } from './log_entry_search_strategy';
import { LogEntriesServiceSetupDeps, LogEntriesServiceStartDeps } from './types';
export class LogEntriesService {
public setup(core: CoreSetup<LogEntriesServiceStartDeps>, setupDeps: LogEntriesServiceSetupDeps) {
core.getStartServices().then(([, startDeps]) => {
setupDeps.data.search.registerSearchStrategy(
LOG_ENTRIES_SEARCH_STRATEGY,
logEntriesSearchStrategyProvider({ ...setupDeps, ...startDeps })
);
setupDeps.data.search.registerSearchStrategy(
LOG_ENTRY_SEARCH_STRATEGY,
logEntrySearchStrategyProvider({ ...setupDeps, ...startDeps })

View file

@ -121,7 +121,7 @@ describe('LogEntry search strategy', () => {
expect(response.rawResponse.data).toEqual({
id: 'HIT_ID',
index: 'HIT_INDEX',
key: {
cursor: {
time: 1605116827143,
tiebreaker: 1,
},

View file

@ -119,6 +119,6 @@ const { asyncInitialRequestRT, asyncRecoveredRequestRT, asyncRequestRT } = creat
const createLogEntryFromHit = (hit: LogEntryHit) => ({
id: hit._id,
index: hit._index,
key: getLogEntryCursorFromHit(hit),
cursor: getLogEntryCursorFromHit(hit),
fields: Object.entries(hit.fields).map(([field, value]) => ({ field, value })),
});

View file

@ -0,0 +1,8 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
export * from './message';
export { getBuiltinRules } from './builtin_rules';

View file

@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
export const createSortClause = (
sortDirection: 'asc' | 'desc',
timestampField: string,
tiebreakerField: string
) => ({
sort: {
[timestampField]: sortDirection,
[tiebreakerField]: sortDirection,
},
});
export const createTimeRangeFilterClauses = (
startTimestamp: number,
endTimestamp: number,
timestampField: string
) => [
{
range: {
[timestampField]: {
gte: startTimestamp,
lte: endTimestamp,
format: 'epoch_millis',
},
},
},
];
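For illustration, with assumed field names the two helpers expand to plain query DSL fragments:
// Illustrative expansions (field names are examples, not defaults):
// createSortClause('asc', '@timestamp', '_doc')
//   => { sort: { '@timestamp': 'asc', '_doc': 'asc' } }
// createTimeRangeFilterClauses(100, 200, '@timestamp')
//   => [{ range: { '@timestamp': { gte: 100, lte: 200, format: 'epoch_millis' } } }]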

View file

@ -0,0 +1,141 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import type { RequestParams } from '@elastic/elasticsearch';
import * as rt from 'io-ts';
import {
LogEntryAfterCursor,
logEntryAfterCursorRT,
LogEntryBeforeCursor,
logEntryBeforeCursorRT,
} from '../../../../common/log_entry';
import { jsonArrayRT, JsonObject } from '../../../../common/typed_json';
import {
commonHitFieldsRT,
commonSearchSuccessResponseFieldsRT,
} from '../../../utils/elasticsearch_runtime_types';
import { createSortClause, createTimeRangeFilterClauses } from './common';
export const createGetLogEntriesQuery = (
logEntriesIndex: string,
startTimestamp: number,
endTimestamp: number,
cursor: LogEntryBeforeCursor | LogEntryAfterCursor | null | undefined,
size: number,
timestampField: string,
tiebreakerField: string,
fields: string[],
query?: JsonObject,
highlightTerm?: string
): RequestParams.AsyncSearchSubmit<Record<string, any>> => {
const sortDirection = getSortDirection(cursor);
const highlightQuery = createHighlightQuery(highlightTerm, fields);
return {
index: logEntriesIndex,
allow_no_indices: true,
track_scores: false,
track_total_hits: false,
body: {
size,
query: {
bool: {
filter: [
...(query ? [query] : []),
...(highlightQuery ? [highlightQuery] : []),
...createTimeRangeFilterClauses(startTimestamp, endTimestamp, timestampField),
],
},
},
fields,
_source: false,
...createSortClause(sortDirection, timestampField, tiebreakerField),
...createSearchAfterClause(cursor),
...createHighlightClause(highlightQuery, fields),
},
};
};
export const getSortDirection = (
cursor: LogEntryBeforeCursor | LogEntryAfterCursor | null | undefined
): 'asc' | 'desc' => (logEntryBeforeCursorRT.is(cursor) ? 'desc' : 'asc');
const createSearchAfterClause = (
cursor: LogEntryBeforeCursor | LogEntryAfterCursor | null | undefined
): { search_after?: [number, number] } => {
if (logEntryBeforeCursorRT.is(cursor) && cursor.before !== 'last') {
return {
search_after: [cursor.before.time, cursor.before.tiebreaker],
};
} else if (logEntryAfterCursorRT.is(cursor) && cursor.after !== 'first') {
return {
search_after: [cursor.after.time, cursor.after.tiebreaker],
};
}
return {};
};
const createHighlightClause = (highlightQuery: JsonObject | undefined, fields: string[]) =>
highlightQuery
? {
highlight: {
boundary_scanner: 'word',
fields: fields.reduce(
(highlightFieldConfigs, fieldName) => ({
...highlightFieldConfigs,
[fieldName]: {},
}),
{}
),
fragment_size: 1,
number_of_fragments: 100,
post_tags: [''],
pre_tags: [''],
highlight_query: highlightQuery,
},
}
: {};
const createHighlightQuery = (
highlightTerm: string | undefined,
fields: string[]
): JsonObject | undefined => {
if (highlightTerm) {
return {
multi_match: {
fields,
lenient: true,
query: highlightTerm,
type: 'phrase',
},
};
}
};
export const logEntryHitRT = rt.intersection([
commonHitFieldsRT,
rt.type({
fields: rt.record(rt.string, jsonArrayRT),
sort: rt.tuple([rt.number, rt.number]),
}),
rt.partial({
highlight: rt.record(rt.string, rt.array(rt.string)),
}),
]);
export type LogEntryHit = rt.TypeOf<typeof logEntryHitRT>;
export const getLogEntriesResponseRT = rt.intersection([
commonSearchSuccessResponseFieldsRT,
rt.type({
hits: rt.type({
hits: rt.array(logEntryHitRT),
}),
}),
]);
export type GetLogEntriesResponse = rt.TypeOf<typeof getLogEntriesResponseRT>;
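As an illustration of what the query builder emits, a { before: 'last' } cursor (argument values hypothetical) selects a descending sort and omits both search_after and highlighting, abridged:
// createGetLogEntriesQuery('logs-*', 100, 200, { before: 'last' }, 50,
//   '@timestamp', '_doc', ['message'])
// => {
//      index: 'logs-*',
//      allow_no_indices: true,
//      track_scores: false,
//      track_total_hits: false,
//      body: {
//        size: 50,
//        query: {
//          bool: {
//            filter: [
//              { range: { '@timestamp': { gte: 100, lte: 200, format: 'epoch_millis' } } },
//            ],
//          },
//        },
//        fields: ['message'],
//        _source: false,
//        sort: { '@timestamp': 'desc', '_doc': 'desc' },
//      },
//    }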

View file

@ -6,21 +6,17 @@
import expect from '@kbn/expect';
import { v4 as uuidv4 } from 'uuid';
import { decodeOrThrow } from '../../../../plugins/infra/common/runtime_types';
import {
LOG_ENTRIES_PATH,
logEntriesRequestRT,
logEntriesResponseRT,
} from '../../../../plugins/infra/common/http_api';
import {
LogTimestampColumn,
LogFieldColumn,
LogMessageColumn,
LogTimestampColumn,
} from '../../../../plugins/infra/common/log_entry';
import { decodeOrThrow } from '../../../../plugins/infra/common/runtime_types';
import { FtrProviderContext } from '../../ftr_provider_context';
const KEY_WITHIN_DATA_RANGE = {