[ML] File upload doc count chart (#173210)

Adds a doc count chart to the file upload flow, showing the progress of
doc ingestion.

Adds a new endpoint to the file upload plugin which is used to predict
the full time range of the index based on the first and last docs read
from the file. This allows us to draw the full time range on the chart
and watch the docs appear as the import progresses.
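
On the client, the importer calls this endpoint with a handful of sample docs, the ingest pipeline, and the time field; the server simulates the pipeline over the docs and returns the earliest and latest parsed timestamps. A minimal sketch of that call (assuming `http` is Kibana's `HttpSetup`; the helper name is illustrative):

```ts
import type { HttpSetup } from '@kbn/core/public';

// Shape returned by the new internal route: epoch milliseconds, or null if no
// timestamps could be parsed from the sample docs.
interface PreviewIndexTimeRange {
  start: number | null;
  end: number | null;
}

// Sketch of the client-side call made by the importer's previewIndexTimeRange();
// `http` stands in for the HttpSetup the plugin obtains via getHttp().
async function fetchPreviewIndexTimeRange(
  http: HttpSetup,
  docs: object[],
  pipeline: object,
  timeField: string
): Promise<PreviewIndexTimeRange> {
  return http.fetch<PreviewIndexTimeRange>('/internal/file_upload/preview_index_time_range', {
    method: 'POST',
    version: '1',
    body: JSON.stringify({ docs, pipeline, timeField }),
  });
}
```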



---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
James Gowdy 2024-01-22 13:25:15 +00:00 committed by GitHub
parent 64d7ded191
commit 8d353a6dd0
20 changed files with 1063 additions and 86 deletions


@ -17,7 +17,8 @@
"uiActions",
"charts",
"unifiedSearch",
"savedSearch"
"savedSearch",
"fieldFormats"
],
"optionalPlugins": [
"security",


@ -209,6 +209,7 @@ export const ResultsLinks: FC<Props> = ({
{createDataView && discoverLink && (
<EuiFlexItem>
<EuiCard
hasBorder
icon={<EuiIcon size="xxl" type={`discoverApp`} />}
title={
<FormattedMessage
@ -225,6 +226,7 @@ export const ResultsLinks: FC<Props> = ({
{indexManagementLink && (
<EuiFlexItem>
<EuiCard
hasBorder
icon={<EuiIcon size="xxl" type={`managementApp`} />}
title={
<FormattedMessage
@ -241,6 +243,7 @@ export const ResultsLinks: FC<Props> = ({
{dataViewsManagementLink && (
<EuiFlexItem>
<EuiCard
hasBorder
icon={<EuiIcon size="xxl" type={`managementApp`} />}
title={
<FormattedMessage
@ -255,6 +258,7 @@ export const ResultsLinks: FC<Props> = ({
)}
<EuiFlexItem>
<EuiCard
hasBorder
icon={<EuiIcon size="xxl" type={`filebeatApp`} />}
data-test-subj="fileDataVisFilebeatConfigLink"
title={
@ -271,6 +275,7 @@ export const ResultsLinks: FC<Props> = ({
asyncHrefCards.map((link) => (
<EuiFlexItem key={link.title}>
<EuiCard
hasBorder
icon={<EuiIcon size="xxl" type={link.icon} />}
data-test-subj="fileDataVisLink"
title={link.title}


@ -0,0 +1,74 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import React, { FC } from 'react';
import { Axis, Position } from '@elastic/charts';
import { MULTILAYER_TIME_AXIS_STYLE } from '@kbn/charts-plugin/common';
import type { LineChartPoint } from './event_rate_chart';
import { useDataVisualizerKibana } from '../../../kibana_context';
interface Props {
chartData?: LineChartPoint[];
}
// round to 2dp
function tickFormatter(d: number): string {
return (Math.round(d * 100) / 100).toString();
}
export const Axes: FC<Props> = ({ chartData }) => {
const yDomain = getYRange(chartData);
const {
services: { fieldFormats, uiSettings },
} = useDataVisualizerKibana();
const useLegacyTimeAxis = uiSettings.get('visualization:useLegacyTimeAxis', false);
const xAxisFormatter = fieldFormats.deserialize({ id: 'date' });
return (
<>
<Axis
id="bottom"
position={Position.Bottom}
showOverlappingTicks={true}
tickFormat={(value) => xAxisFormatter.convert(value)}
labelFormat={useLegacyTimeAxis ? undefined : () => ''}
timeAxisLayerCount={useLegacyTimeAxis ? 0 : 2}
style={useLegacyTimeAxis ? {} : MULTILAYER_TIME_AXIS_STYLE}
/>
<Axis id="left" position={Position.Left} tickFormat={tickFormatter} domain={yDomain} />
</>
);
};
function getYRange(chartData?: LineChartPoint[]) {
const fit = false;
if (chartData === undefined) {
return { fit, min: NaN, max: NaN };
}
if (chartData.length === 0) {
return { min: 0, max: 0, fit };
}
let max: number = Number.MIN_VALUE;
let min: number = Number.MAX_VALUE;
chartData.forEach((r) => {
max = Math.max(r.value, max);
min = Math.min(r.value, min);
});
const padding = (max - min) * 0.1;
max += padding;
min -= padding;
return {
min,
max,
fit,
};
}


@ -0,0 +1,211 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import React, { FC, useEffect, useState, useCallback, useRef, useMemo } from 'react';
import type { DataPublicPluginStart } from '@kbn/data-plugin/public';
import type { IImporter } from '@kbn/file-upload-plugin/public';
import moment, { type Moment } from 'moment';
import { useTimeBuckets } from '../../../common/hooks/use_time_buckets';
import { IMPORT_STATUS, type Statuses } from '../import_progress';
import { EventRateChart, type LineChartPoint } from './event_rate_chart';
import { runDocCountSearch } from './doc_count_search';
const BAR_TARGET = 150;
const PROGRESS_INCREMENT = 5;
const FINISHED_CHECKS = 3;
const ERROR_ATTEMPTS = 3;
const BACK_FILL_BUCKETS = 8;
export const DocCountChart: FC<{
statuses: Statuses;
dataStart: DataPublicPluginStart;
importer: IImporter;
}> = ({ statuses, dataStart, importer }) => {
const timeBuckets = useTimeBuckets();
const index = useMemo(() => importer.getIndex(), [importer]);
const timeField = useMemo(() => importer.getTimeField(), [importer]);
const [loading, setLoading] = useState(false);
const [loadingTimeRange, setLoadingTimeRange] = useState(false);
const [finished, setFinished] = useState(false);
const [previousProgress, setPreviousProgress] = useState(0);
const [lastNonZeroTimeMs, setLastNonZeroTimeMs] = useState<
{ index: number; time: number } | undefined
>(undefined);
const [eventRateChartData, setEventRateChartData] = useState<LineChartPoint[]>([]);
const [timeRange, setTimeRange] = useState<{ start: Moment; end: Moment } | undefined>(undefined);
const loadFullData = useRef(false);
const [errorAttempts, setErrorAttempts] = useState(ERROR_ATTEMPTS);
const recordFailure = useCallback(() => {
setErrorAttempts(errorAttempts - 1);
}, [errorAttempts]);
const loadData = useCallback(async () => {
if (timeField === undefined || index === undefined || timeRange === undefined) {
return;
}
setLoading(true);
timeBuckets.setInterval('auto');
const { start, end } = timeRange;
const fullData = loadFullData.current;
try {
const startMs =
fullData === true || lastNonZeroTimeMs === undefined
? start.valueOf()
: lastNonZeroTimeMs.time;
const endMs = end.valueOf();
if (start != null && end != null) {
timeBuckets.setBounds({
min: start,
max: end,
});
timeBuckets.setBarTarget(BAR_TARGET);
}
const data = await runDocCountSearch(
dataStart,
index,
timeField,
startMs,
endMs,
timeBuckets
);
const newData =
fullData === true
? data
: [...eventRateChartData].splice(0, lastNonZeroTimeMs?.index ?? 0).concat(data);
setEventRateChartData(newData);
setLastNonZeroTimeMs(findLastTimestamp(newData, BACK_FILL_BUCKETS));
} catch (error) {
recordFailure();
}
setLoading(false);
}, [
timeField,
index,
timeRange,
timeBuckets,
lastNonZeroTimeMs,
dataStart,
eventRateChartData,
recordFailure,
]);
const finishedChecks = useCallback(
async (counter: number) => {
loadData();
if (counter !== 0) {
setTimeout(() => {
finishedChecks(counter - 1);
}, 2 * 1000);
}
},
[loadData]
);
const loadTimeRange = useCallback(async () => {
if (loadingTimeRange === true) {
return;
}
setLoadingTimeRange(true);
try {
const { start, end } = await importer.previewIndexTimeRange();
if (start === null || end === null || start >= end) {
throw new Error('Invalid time range');
}
setTimeRange({ start: moment(start), end: moment(end) });
} catch (error) {
recordFailure();
}
setLoadingTimeRange(false);
}, [importer, loadingTimeRange, recordFailure]);
useEffect(
function loadProgress() {
if (errorAttempts === 0) {
return;
}
if (timeRange === undefined) {
loadTimeRange();
return;
}
if (loading === false && statuses.uploadProgress > 1 && statuses.uploadProgress < 100) {
if (statuses.uploadProgress - previousProgress > PROGRESS_INCREMENT) {
setPreviousProgress(statuses.uploadProgress);
loadData();
}
} else if (loading === false && statuses.uploadProgress === 100 && finished === false) {
setFinished(true);
finishedChecks(FINISHED_CHECKS);
loadFullData.current = true;
}
},
[
finished,
finishedChecks,
loadData,
loadTimeRange,
loading,
loadingTimeRange,
previousProgress,
statuses,
timeRange,
errorAttempts,
]
);
if (
timeField === undefined ||
statuses.indexCreatedStatus === IMPORT_STATUS.INCOMPLETE ||
statuses.ingestPipelineCreatedStatus === IMPORT_STATUS.INCOMPLETE ||
errorAttempts === 0 ||
eventRateChartData.length === 0
) {
return null;
}
return (
<>
<EventRateChart eventRateChartData={eventRateChartData} height={'150px'} width={'100%'} />
</>
);
};
/**
* Finds the last non-zero data point in the chart data.
* backFillOffset can be set to jump back a number of buckets from the final non-zero bucket,
* so that the next data load refreshes the last n buckets in case new documents have arrived.
* @param data LineChartPoint[]
* @param backFillOffset number
* @returns index and time of the last non-zero data point (offset by backFillOffset)
*/
function findLastTimestamp(data: LineChartPoint[], backFillOffset = 0) {
let lastNonZeroDataPoint = data[0].time;
let index = 0;
for (let i = 0; i < data.length; i++) {
if (data[i].value > 0) {
const backTrackIndex = i - backFillOffset >= 0 ? i - backFillOffset : i;
lastNonZeroDataPoint = data[backTrackIndex].time;
index = backTrackIndex;
} else {
break;
}
}
return { index, time: lastNonZeroDataPoint as number };
}


@ -0,0 +1,84 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { lastValueFrom } from 'rxjs';
import type { DataPublicPluginStart, IKibanaSearchResponse } from '@kbn/data-plugin/public';
import type { LineChartPoint } from './event_rate_chart';
import type { TimeBuckets } from '../../../../../common/services/time_buckets';
type EventRateResponse = IKibanaSearchResponse<
estypes.SearchResponse<
unknown,
{
eventRate: {
buckets: Array<{ key: number; doc_count: number }>;
};
}
>
>;
export async function runDocCountSearch(
dataStart: DataPublicPluginStart,
index: string,
timeField: string,
earliestMs: number,
latestMs: number,
timeBuckets: TimeBuckets
): Promise<LineChartPoint[]> {
const intervalMs = timeBuckets.getInterval().asMilliseconds();
const resp = await lastValueFrom(
dataStart.search.search<any, EventRateResponse>({
params: {
index,
body: {
size: 0,
query: {
bool: {
must: [
{
range: {
[timeField]: {
gte: earliestMs,
lte: latestMs,
format: 'epoch_millis',
},
},
},
{
match_all: {},
},
],
},
},
aggs: {
eventRate: {
date_histogram: {
field: timeField,
fixed_interval: `${intervalMs}ms`,
min_doc_count: 0,
extended_bounds: {
min: earliestMs,
max: latestMs,
},
},
},
},
},
},
})
);
if (resp.rawResponse.aggregations === undefined) {
return [];
}
return resp.rawResponse.aggregations.eventRate.buckets.map((b) => ({
time: b.key,
value: b.doc_count,
}));
}


@ -0,0 +1,77 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import React, { FC } from 'react';
import {
HistogramBarSeries,
Chart,
ScaleType,
Settings,
PartialTheme,
Tooltip,
TooltipType,
} from '@elastic/charts';
import { i18n } from '@kbn/i18n';
import { euiLightVars } from '@kbn/ui-theme';
import { Axes } from './axes';
import { useCurrentEuiTheme } from '../../../common/hooks/use_current_eui_theme';
export interface LineChartPoint {
time: number | string;
value: number;
}
interface Props {
eventRateChartData: LineChartPoint[];
height: string;
width: string;
}
export const EventRateChart: FC<Props> = ({ eventRateChartData, height, width }) => {
const { euiColorLightShade } = useCurrentEuiTheme();
const theme: PartialTheme = {
scales: { histogramPadding: 0.2 },
background: {
color: 'transparent',
},
axes: {
gridLine: {
horizontal: {
stroke: euiColorLightShade,
},
vertical: {
stroke: euiColorLightShade,
},
},
},
};
return (
<div
style={{ width, height }}
data-test-subj={`dataVisualizerEventRateChart ${
eventRateChartData.length ? 'withData' : 'empty'
}`}
>
<Chart>
<Axes />
<Tooltip type={TooltipType.None} />
<Settings theme={theme} locale={i18n.getLocale()} />
<HistogramBarSeries
id="event_rate"
xScaleType={ScaleType.Time}
yScaleType={ScaleType.Linear}
xAccessor={'time'}
yAccessors={['value']}
data={eventRateChartData}
color={euiLightVars.euiColorVis0}
/>
</Chart>
</div>
);
};


@ -0,0 +1,8 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export { DocCountChart } from './doc_count_chart';


@ -342,6 +342,7 @@ export class FileDataVisualizerView extends Component {
fileContents={fileContents}
data={data}
dataViewsContract={this.props.dataViewsContract}
dataStart={this.props.dataStart}
fileUpload={this.props.fileUpload}
getAdditionalLinks={this.props.getAdditionalLinks}
capabilities={this.props.capabilities}


@ -34,7 +34,7 @@ export class Failures extends Component<Props, State> {
_renderPaginationControl() {
return this.props.failedDocs.length > PAGE_SIZE ? (
<EuiPagination
pageCount={Math.floor(this.props.failedDocs.length / PAGE_SIZE)}
pageCount={Math.ceil(this.props.failedDocs.length / PAGE_SIZE)}
activePage={this.state.page}
onPageClick={(page) => this.setState({ page })}
compressed
@ -43,9 +43,8 @@ export class Failures extends Component<Props, State> {
}
render() {
const lastDocIndex = this.props.failedDocs.length - 1;
const startIndex = this.state.page * PAGE_SIZE;
const endIndex = startIndex + PAGE_SIZE > lastDocIndex ? lastDocIndex : startIndex + PAGE_SIZE;
const endIndex = startIndex + PAGE_SIZE;
return (
<EuiAccordion
id="failureList"


@ -22,13 +22,13 @@ import {
import { i18n } from '@kbn/i18n';
import { debounce } from 'lodash';
import { isPopulatedObject } from '@kbn/ml-is-populated-object';
import { ResultsLinks } from '../../../common/components/results_links';
import { FilebeatConfigFlyout } from '../../../common/components/filebeat_config_flyout';
import { ImportProgress, IMPORT_STATUS } from '../import_progress';
import { ImportErrors } from '../import_errors';
import { ImportSummary } from '../import_summary';
import { ImportSettings } from '../import_settings';
import { DocCountChart } from '../doc_count_chart';
import {
addCombinedFieldsToPipeline,
addCombinedFieldsToMappings,
@ -36,7 +36,6 @@ import {
} from '../../../common/components/combined_fields';
import { MODE as DATAVISUALIZER_MODE } from '../file_data_visualizer_view/constants';
const DEFAULT_TIME_FIELD = '@timestamp';
const DEFAULT_INDEX_SETTINGS = {};
const CONFIG_MODE = { SIMPLE: 0, ADVANCED: 1 };
@ -74,6 +73,7 @@ const DEFAULT_STATE = {
isFilebeatFlyoutVisible: false,
checkingValidIndex: false,
combinedFields: [],
importer: undefined,
};
export class ImportView extends Component {
@ -205,15 +205,6 @@ export class ImportView extends Component {
parseJSONStatus: success ? IMPORT_STATUS.COMPLETE : IMPORT_STATUS.FAILED,
});
// if an @timestamp field has been added to the
// mappings, use this field as the time field.
// This relies on the field being populated by
// the ingest pipeline on ingest
if (isPopulatedObject(mappings.properties, [DEFAULT_TIME_FIELD])) {
timeFieldName = DEFAULT_TIME_FIELD;
this.setState({ timeFieldName });
}
if (success) {
const importer = await fileUpload.importerFactory(format, {
excludeLinesPattern: results.exclude_lines_pattern,
@ -225,6 +216,7 @@ export class ImportView extends Component {
this.setState({
readStatus: success ? IMPORT_STATUS.COMPLETE : IMPORT_STATUS.FAILED,
reading: false,
importer,
});
if (readResp.success === false) {
@ -240,6 +232,9 @@ export class ImportView extends Component {
pipeline
);
timeFieldName = importer.getTimeField();
this.setState({ timeFieldName });
const indexCreated = initializeImportResp.index !== undefined;
this.setState({
indexCreatedStatus: indexCreated
@ -454,6 +449,7 @@ export class ImportView extends Component {
isFilebeatFlyoutVisible,
checkingValidIndex,
combinedFields,
importer,
} = this.state;
const createPipeline = pipelineString !== '';
@ -487,40 +483,44 @@ export class ImportView extends Component {
</EuiTitle>
</EuiPageHeader>
<EuiSpacer size="m" />
<EuiPanel data-test-subj="dataVisualizerFileImportSettingsPanel">
<EuiTitle size="s">
<h2>
<FormattedMessage
id="xpack.dataVisualizer.file.importView.importDataTitle"
defaultMessage="Import data"
/>
</h2>
</EuiTitle>
{initialized === false ? (
<EuiPanel
data-test-subj="dataVisualizerFileImportSettingsPanel"
hasShadow={false}
hasBorder
>
<EuiTitle size="s">
<h2>
<FormattedMessage
id="xpack.dataVisualizer.file.importView.importDataTitle"
defaultMessage="Import data"
/>
</h2>
</EuiTitle>
<ImportSettings
index={index}
dataView={dataView}
initialized={initialized}
onIndexChange={this.onIndexChange}
createDataView={createDataView}
onCreateDataViewChange={this.onCreateDataViewChange}
onDataViewChange={this.onDataViewChange}
indexSettingsString={indexSettingsString}
mappingsString={mappingsString}
pipelineString={pipelineString}
onIndexSettingsStringChange={this.onIndexSettingsStringChange}
onMappingsStringChange={this.onMappingsStringChange}
onPipelineStringChange={this.onPipelineStringChange}
indexNameError={indexNameError}
dataViewNameError={dataViewNameError}
combinedFields={combinedFields}
onCombinedFieldsChange={this.onCombinedFieldsChange}
results={this.props.results}
/>
<ImportSettings
index={index}
dataView={dataView}
initialized={initialized}
onIndexChange={this.onIndexChange}
createDataView={createDataView}
onCreateDataViewChange={this.onCreateDataViewChange}
onDataViewChange={this.onDataViewChange}
indexSettingsString={indexSettingsString}
mappingsString={mappingsString}
pipelineString={pipelineString}
onIndexSettingsStringChange={this.onIndexSettingsStringChange}
onMappingsStringChange={this.onMappingsStringChange}
onPipelineStringChange={this.onPipelineStringChange}
indexNameError={indexNameError}
dataViewNameError={dataViewNameError}
combinedFields={combinedFields}
onCombinedFieldsChange={this.onCombinedFieldsChange}
results={this.props.results}
/>
<EuiSpacer size="m" />
<EuiSpacer size="m" />
{(initialized === false || importing === true) && (
<EuiFlexGroup>
<EuiFlexItem grow={false}>
<EuiButton
@ -557,48 +557,24 @@ export class ImportView extends Component {
</EuiButtonEmpty>
</EuiFlexItem>
</EuiFlexGroup>
)}
{initialized === true && importing === false && (
<EuiFlexGroup>
<EuiFlexItem grow={false}>
<EuiButton onClick={this.clickReset}>
<FormattedMessage
id="xpack.dataVisualizer.file.importView.resetButtonLabel"
defaultMessage="Reset"
/>
</EuiButton>
</EuiFlexItem>
<EuiFlexItem grow={false}>
<EuiButton
onClick={() => this.props.onChangeMode(DATAVISUALIZER_MODE.READ)}
isDisabled={importing}
>
<FormattedMessage
id="xpack.dataVisualizer.file.importView.backButtonLabel"
defaultMessage="Back"
/>
</EuiButton>
</EuiFlexItem>
<EuiFlexItem grow={false}>
<EuiButtonEmpty onClick={() => this.props.onCancel()} isDisabled={importing}>
<FormattedMessage
id="xpack.dataVisualizer.file.importView.importNewButtonLabel"
defaultMessage="Import a new file"
/>
</EuiButtonEmpty>
</EuiFlexItem>
</EuiFlexGroup>
)}
</EuiPanel>
</EuiPanel>
) : null}
{initialized === true && (
<React.Fragment>
<EuiSpacer size="m" />
<EuiPanel>
<EuiPanel hasShadow={false} hasBorder>
<ImportProgress statuses={statuses} />
{importer !== undefined && importer.initialized() && (
<DocCountChart
statuses={statuses}
dataStart={this.props.dataStart}
importer={importer}
/>
)}
{imported === true && (
<React.Fragment>
<EuiSpacer size="m" />
@ -615,6 +591,27 @@ export class ImportView extends Component {
<EuiSpacer size="l" />
<EuiFlexGroup>
<EuiFlexItem grow={false}>
<EuiButton onClick={this.clickReset}>
<FormattedMessage
id="xpack.dataVisualizer.file.importView.resetButtonLabel"
defaultMessage="Reset"
/>
</EuiButton>
</EuiFlexItem>
<EuiFlexItem grow={false}>
<EuiButtonEmpty onClick={() => this.props.onCancel()} isDisabled={importing}>
<FormattedMessage
id="xpack.dataVisualizer.file.importView.importNewButtonLabel"
defaultMessage="Import a new file"
/>
</EuiButtonEmpty>
</EuiFlexItem>
</EuiFlexGroup>
<EuiSpacer size="l" />
<ResultsLinks
fieldStats={this.props.results?.field_stats}
index={index}


@ -20,7 +20,7 @@ interface Props {
export type FileDataVisualizerSpec = typeof FileDataVisualizer;
export const FileDataVisualizer: FC<Props> = ({ getAdditionalLinks }) => {
const coreStart = getCoreStart();
const { data, maps, embeddable, discover, share, security, fileUpload, cloud } =
const { data, maps, embeddable, discover, share, security, fileUpload, cloud, fieldFormats } =
getPluginsStart();
const services = {
data,
@ -30,6 +30,7 @@ export const FileDataVisualizer: FC<Props> = ({ getAdditionalLinks }) => {
share,
security,
fileUpload,
fieldFormats,
...coreStart,
};
@ -42,6 +43,7 @@ export const FileDataVisualizer: FC<Props> = ({ getAdditionalLinks }) => {
<CloudContext>
<FileDataVisualizerView
dataViewsContract={data.dataViews}
dataStart={data}
http={coreStart.http}
fileUpload={fileUpload}
getAdditionalLinks={getAdditionalLinks}


@ -70,7 +70,8 @@
"@kbn/ml-chi2test",
"@kbn/field-utils",
"@kbn/visualization-utils",
"@kbn/code-editor"
"@kbn/code-editor",
"@kbn/ui-theme"
],
"exclude": [
"target/**/*",


@ -23,10 +23,27 @@ const REDUCED_CHUNK_SIZE = 100;
export const MAX_CHUNK_CHAR_COUNT = 1000000;
export const IMPORT_RETRIES = 5;
const STRING_CHUNKS_MB = 100;
const DEFAULT_TIME_FIELD = '@timestamp';
export abstract class Importer implements IImporter {
protected _docArray: ImportDoc[] = [];
private _chunkSize = CHUNK_SIZE;
private _index: string | undefined;
private _pipeline: IngestPipeline | undefined;
private _timeFieldName: string | undefined;
private _initialized = false;
public initialized() {
return this._initialized;
}
public getIndex() {
return this._index;
}
public getTimeField() {
return this._timeFieldName;
}
public read(data: ArrayBuffer) {
const decoder = new TextDecoder();
@ -82,6 +99,19 @@ export abstract class Importer implements IImporter {
}
: {};
this._index = index;
this._pipeline = pipeline;
// if an @timestamp field has been added to the
// mappings, use this field as the time field.
// This relies on the field being populated by
// the ingest pipeline on ingest
this._timeFieldName = isPopulatedObject(mappings.properties, [DEFAULT_TIME_FIELD])
? DEFAULT_TIME_FIELD
: undefined;
this._initialized = true;
return await callImportRoute({
id: undefined,
index,
@ -180,6 +210,39 @@ export abstract class Importer implements IImporter {
return result;
}
private _getFirstReadDocs(count = 1): object[] {
const firstReadDocs = this._docArray.slice(0, count);
return firstReadDocs.map((doc) => (typeof doc === 'string' ? JSON.parse(doc) : doc));
}
private _getLastReadDocs(count = 1): object[] {
const lastReadDocs = this._docArray.slice(-count);
return lastReadDocs.map((doc) => (typeof doc === 'string' ? JSON.parse(doc) : doc));
}
public async previewIndexTimeRange() {
if (this._initialized === false || this._pipeline === undefined) {
throw new Error('Import has not been initialized');
}
// take the first and last 10 docs from the file, to reduce the chance of getting
// bad data or out of order data.
const firstDocs = this._getFirstReadDocs(10);
const lastDocs = this._getLastReadDocs(10);
const body = JSON.stringify({
docs: firstDocs.concat(lastDocs),
pipeline: this._pipeline,
timeField: this._timeFieldName,
});
return await getHttp().fetch<{ start: number | null; end: number | null }>({
path: `/internal/file_upload/preview_index_time_range`,
method: 'POST',
version: '1',
body,
});
}
}
function populateFailures(


@ -52,4 +52,8 @@ export interface IImporter {
pipelineId: string | undefined,
setImportProgress: (progress: number) => void
): Promise<ImportResults>;
initialized(): boolean;
getIndex(): string | undefined;
getTimeField(): string | undefined;
previewIndexTimeRange(): Promise<{ start: number | null; end: number | null }>;
}


@ -15,3 +15,4 @@ export type { Props as IndexNameFormProps } from './components/geo_upload_form/i
export type { FileUploadPluginStart } from './plugin';
export type { FileUploadComponentProps, FileUploadGeoResults } from './lazy_load_bundle';
export type { IImporter } from './importer/types';


@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import dateMath from '@kbn/datemath';
import type {
IngestPipeline,
IngestSimulateDocument,
} from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { IScopedClusterClient } from '@kbn/core/server';
type Doc = IngestSimulateDocument['_source'];
/**
* Returns the start and end time range in epoch milliseconds for a given set of documents
* @param client IScopedClusterClient
* @param timeField Time field name
* @param pipeline ingest pipeline config
* @param docs array of documents
* @returns start and end time range in epoch milliseconds
*/
export async function previewIndexTimeRange(
client: IScopedClusterClient,
timeField: string,
pipeline: IngestPipeline,
docs: Doc[]
): Promise<{ start: number | null; end: number | null }> {
const resp = await client.asInternalUser.ingest.simulate({
pipeline,
docs: docs.map((doc, i) => ({
_index: 'index',
_id: `id${i}`,
_source: doc,
})),
});
const timeFieldValues: string[] = resp.docs.map((doc) => doc.doc?._source[timeField]);
const epochs: number[] = timeFieldValues
.map((timeFieldValue) => dateMath.parse(timeFieldValue)?.valueOf())
.filter((epoch) => epoch !== undefined) as number[];
return {
start: Math.min(...epochs),
end: Math.max(...epochs),
};
}


@ -6,8 +6,8 @@
*/
import { schema } from '@kbn/config-schema';
import { IScopedClusterClient } from '@kbn/core/server';
import { CoreSetup, Logger } from '@kbn/core/server';
import type { IScopedClusterClient } from '@kbn/core/server';
import type { CoreSetup, Logger } from '@kbn/core/server';
import type {
IndicesIndexSettings,
MappingTypeMapping,
@ -26,8 +26,9 @@ import {
analyzeFileQuerySchema,
runtimeMappingsSchema,
} from './schemas';
import { StartDeps } from './types';
import type { StartDeps } from './types';
import { checkFileUploadPrivileges } from './check_privileges';
import { previewIndexTimeRange } from './preview_index_time_range';
function importData(
client: IScopedClusterClient,
@ -270,6 +271,49 @@ export function fileUploadRoutes(coreSetup: CoreSetup<StartDeps, unknown>, logge
runtimeMappings
);
return response.ok({
body: resp,
});
} catch (e) {
return response.customError(wrapError(e));
}
}
);
/**
* @apiGroup FileDataVisualizer
*
* @api {post} /internal/file_upload/preview_index_time_range Predict the time range for an index using example documents
* @apiName PreviewIndexTimeRange
* @apiDescription Predict the time range for an index using example documents
*/
router.versioned
.post({
path: '/internal/file_upload/preview_index_time_range',
access: 'internal',
options: {
tags: ['access:fileUpload:analyzeFile'],
},
})
.addVersion(
{
version: '1',
validate: {
request: {
body: schema.object({
docs: schema.arrayOf(schema.any()),
pipeline: schema.any(),
timeField: schema.string(),
}),
},
},
},
async (context, request, response) => {
try {
const { docs, pipeline, timeField } = request.body;
const esClient = (await context.core).elasticsearch.client;
const resp = await previewIndexTimeRange(esClient, timeField, pipeline, docs);
return response.ok({
body: resp,
});


@ -15,6 +15,7 @@
"@kbn/ml-is-populated-object",
"@kbn/config-schema",
"@kbn/code-editor",
"@kbn/datemath",
],
"exclude": [
"target/**/*",


@ -11,5 +11,6 @@ export default function ({ loadTestFile }: FtrProviderContext) {
describe('File upload', function () {
loadTestFile(require.resolve('./has_import_permission'));
loadTestFile(require.resolve('./index_exists'));
loadTestFile(require.resolve('./preview_index_time_range'));
});
}


@ -0,0 +1,353 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { ELASTIC_HTTP_VERSION_HEADER } from '@kbn/core-http-common';
import expect from '@kbn/expect';
import { FtrProviderContext } from '../../ftr_provider_context';
export default ({ getService }: FtrProviderContext) => {
const supertest = getService('supertest');
const fqPipeline = {
description: 'Ingest pipeline created by text structure finder',
processors: [
{
csv: {
field: 'message',
target_fields: ['time', 'airline', 'responsetime', 'sourcetype'],
ignore_missing: false,
},
},
{
date: {
field: 'time',
formats: ['yyyy-MM-dd HH:mm:ssXX'],
},
},
{
convert: {
field: 'responsetime',
type: 'double',
ignore_missing: true,
},
},
{
remove: {
field: 'message',
},
},
],
};
const fqTimeField = '@timestamp';
async function runRequest(docs: any[], pipeline: any, timeField: string) {
const { body } = await supertest
.post(`/internal/file_upload/preview_index_time_range`)
.set('kbn-xsrf', 'kibana')
.set(ELASTIC_HTTP_VERSION_HEADER, '1')
.send({ docs, pipeline, timeField })
.expect(200);
return body;
}
describe('POST /internal/file_upload/preview_index_time_range', () => {
it('should return the correct start and end for normal data', async () => {
const resp = await runRequest(
[
{
message: '2014-06-20 00:00:00Z,AAL,132.2046,farequote',
},
{
message: '2014-06-21 00:00:00Z,JZA,990.4628,farequote',
},
{
message: '2014-06-22 00:00:00Z,JBU,877.5927,farequote',
},
{
message: '2014-06-23 00:00:00Z,KLM,1355.4812,farequote',
},
{
message: '2014-06-24 00:00:00Z,NKS,9991.3981,farequote',
},
{
message: '2014-06-26 23:59:35Z,JBU,923.6772,farequote',
},
{
message: '2014-06-27 23:59:45Z,ACA,21.5385,farequote',
},
{
message: '2014-06-28 23:59:54Z,FFT,251.573,farequote',
},
{
message: '2014-06-29 23:59:54Z,ASA,78.2927,farequote',
},
{
message: '2014-06-30 23:59:56Z,AWE,19.6438,farequote',
},
],
fqPipeline,
fqTimeField
);
expect(resp).to.eql({
start: 1403222400000,
end: 1404172796000,
});
});
it('should return the correct start and end for normal data out of order', async () => {
const resp = await runRequest(
[
{
message: '2014-06-22 00:00:00Z,JBU,877.5927,farequote',
},
{
message: '2014-06-21 00:00:00Z,JZA,990.4628,farequote',
},
{
message: '2014-06-20 00:00:00Z,AAL,132.2046,farequote',
},
{
message: '2014-06-23 00:00:00Z,KLM,1355.4812,farequote',
},
{
message: '2014-06-24 00:00:00Z,NKS,9991.3981,farequote',
},
{
message: '2014-06-26 23:59:35Z,JBU,923.6772,farequote',
},
{
message: '2014-06-27 23:59:45Z,ACA,21.5385,farequote',
},
{
message: '2014-06-30 23:59:56Z,AWE,19.6438,farequote',
},
{
message: '2014-06-28 23:59:54Z,FFT,251.573,farequote',
},
{
message: '2014-06-29 23:59:54Z,ASA,78.2927,farequote',
},
],
fqPipeline,
fqTimeField
);
expect(resp).to.eql({
start: 1403222400000,
end: 1404172796000,
});
});
it('should return the correct start and end for data with bad last doc', async () => {
const resp = await runRequest(
[
{
message: '2014-06-20 00:00:00Z,AAL,132.2046,farequote',
},
{
message: '2014-06-21 00:00:00Z,JZA,990.4628,farequote',
},
{
message: '2014-06-22 00:00:00Z,JBU,877.5927,farequote',
},
{
message: '2014-06-23 00:00:00Z,KLM,1355.4812,farequote',
},
{
message: '2014-06-24 00:00:00Z,NKS,9991.3981,farequote',
},
{
message: '2014-06-26 23:59:35Z,JBU,923.6772,farequote',
},
{
message: '2014-06-27 23:59:45Z,ACA,21.5385,farequote',
},
{
message: '2014-06-28 23:59:54Z,FFT,251.573,farequote',
},
{
message: '2014-06-29 23:59:54Z,ASA,78.2927,farequote',
},
{
// bad data
message: '2014-06-bad 23:59:56Z,AWE,19.6438,farequote',
},
],
fqPipeline,
fqTimeField
);
expect(resp).to.eql({
start: 1403222400000,
end: 1404086394000,
});
});
it('should return the correct start and end for data with bad data near the end', async () => {
const resp = await runRequest(
[
{
message: '2014-06-20 00:00:00Z,AAL,132.2046,farequote',
},
{
message: '2014-06-21 00:00:00Z,JZA,990.4628,farequote',
},
{
message: '2014-06-22 00:00:00Z,JBU,877.5927,farequote',
},
{
message: '2014-06-23 00:00:00Z,KLM,1355.4812,farequote',
},
{
message: '2014-06-24 00:00:00Z,NKS,9991.3981,farequote',
},
{
message: '2014-06-26 23:59:35Z,JBU,923.6772,farequote',
},
{
message: '2014-06-27 23:59:45Z,ACA,21.5385,farequote',
},
{
message: '2014-06-28 23:59:54Z,FFT,251.573,farequote',
},
{
// bad data
message: '2014-06-bad 23:59:54Z,ASA,78.2927,farequote',
},
{
message: '2014-06-30 23:59:56Z,AWE,19.6438,farequote',
},
],
fqPipeline,
fqTimeField
);
expect(resp).to.eql({
start: 1403222400000,
end: 1404172796000,
});
});
it('should return the correct start and end for data with bad first doc', async () => {
const resp = await runRequest(
[
{
// bad data
message: '2014-06-bad 00:00:00Z,AAL,132.2046,farequote',
},
{
message: '2014-06-21 00:00:00Z,JZA,990.4628,farequote',
},
{
message: '2014-06-22 00:00:00Z,JBU,877.5927,farequote',
},
{
message: '2014-06-23 00:00:00Z,KLM,1355.4812,farequote',
},
{
message: '2014-06-24 00:00:00Z,NKS,9991.3981,farequote',
},
{
message: '2014-06-26 23:59:35Z,JBU,923.6772,farequote',
},
{
message: '2014-06-27 23:59:45Z,ACA,21.5385,farequote',
},
{
message: '2014-06-28 23:59:54Z,FFT,251.573,farequote',
},
{
message: '2014-06-29 23:59:54Z,ASA,78.2927,farequote',
},
{
message: '2014-06-30 23:59:56Z,AWE,19.6438,farequote',
},
],
fqPipeline,
fqTimeField
);
expect(resp).to.eql({
start: 1403308800000,
end: 1404172796000,
});
});
it('should return the correct start and end for data with bad data near the start', async () => {
const resp = await runRequest(
[
{
message: '2014-06-20 00:00:00Z,AAL,132.2046,farequote',
},
{
// bad data
message: '2014-06-bad 00:00:00Z,JZA,990.4628,farequote',
},
{
message: '2014-06-22 00:00:00Z,JBU,877.5927,farequote',
},
{
message: '2014-06-23 00:00:00Z,KLM,1355.4812,farequote',
},
{
message: '2014-06-24 00:00:00Z,NKS,9991.3981,farequote',
},
{
message: '2014-06-26 23:59:35Z,JBU,923.6772,farequote',
},
{
message: '2014-06-27 23:59:45Z,ACA,21.5385,farequote',
},
{
message: '2014-06-28 23:59:54Z,FFT,251.573,farequote',
},
{
message: '2014-06-29 23:59:54Z,ASA,78.2927,farequote',
},
{
message: '2014-06-30 23:59:56Z,AWE,19.6438,farequote',
},
],
fqPipeline,
fqTimeField
);
expect(resp).to.eql({
start: 1403222400000,
end: 1404172796000,
});
});
it('should return null start and end for entirely bad data', async () => {
const resp = await runRequest(
[
{
message: '2014-06-bad 00:00:00Z,AAL,132.2046,farequote',
},
{
message: '2014-06-bad 00:00:00Z,JZA,990.4628,farequote',
},
{
message: '2014-06-bad 00:00:00Z,JBU,877.5927,farequote',
},
{
message: '2014-06-bad 00:00:00Z,KLM,1355.4812,farequote',
},
],
fqPipeline,
fqTimeField
);
expect(resp).to.eql({
start: null,
end: null,
});
});
});
};