[Lens] Add new set of functional tests for TSDB (#161463)

## Summary

Fixes #156473

This PR adds a new service for working with data streams and TSDB/TSDS
within the functional test context.

It also adds a new set of tests for TSDB scenarios:
* Tests for a dataView spanning a mixed set of indexes/streams (raw index,
another TSDB stream, downsampled TSDB stream)
* Tests for "upgraded" streams (`data-stream` => `TSDB`)
* Tests for "downgraded" streams (`TSDB` => `data-stream`)

Existing tests have also been refactored to work directly with
data streams where possible.
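
For reference, here is a minimal sketch of how a functional test could consume the new service (the `dataStreams` name matches the service registration in this PR; the import path, `myMapping`, and the stream names are placeholder assumptions, not code from this PR):

```ts
import type { MappingProperty } from '@elastic/elasticsearch/lib/api/types';
import type { FtrProviderContext } from '../../../ftr_provider_context'; // placeholder path

export default function ({ getService }: FtrProviderContext) {
  const dataStreams = getService('dataStreams');
  // placeholder mapping: annotate metrics/dimensions for time_series mode
  const myMapping: Record<string, MappingProperty> = {
    '@timestamp': { type: 'date' },
    request: { type: 'keyword', time_series_dimension: true },
    bytes_gauge: { type: 'long', time_series_metric: 'gauge' },
  };

  describe('my tsdb scenario', () => {
    before(async () => {
      // create a TSDS data stream (third argument enables time_series mode)
      await dataStreams.createDataStream('my_tsdb_stream', myMapping, true);
      // roll the stream over and downsample the old backing index
      const downsampled = await dataStreams.downsampleTSDBIndex('my_tsdb_stream', {
        isStream: true,
      });
      // a data view can now span `my_tsdb_stream,${downsampled}`
    });

    after(async () => {
      // deletes the stream plus its index/component templates
      await dataStreams.deleteDataStream('my_tsdb_stream');
    });
  });
}
```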

Flaky test runner 100/100:
https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/2658

### Checklist

Delete any items that are not applicable to this PR.

- [ ] Any text added follows [EUI's writing
guidelines](https://elastic.github.io/eui/#/guidelines/writing), uses
sentence case text and includes [i18n
support](https://github.com/elastic/kibana/blob/main/packages/kbn-i18n/README.md)
- [ ]
[Documentation](https://www.elastic.co/guide/en/kibana/master/development-documentation.html)
was added for features that require explanation or tutorials
- [ ] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios
- [ ] Any UI touched in this PR is usable by keyboard only (learn more
about [keyboard accessibility](https://webaim.org/techniques/keyboard/))
- [ ] Any UI touched in this PR does not create any new axe failures
(run axe in browser:
[FF](https://addons.mozilla.org/en-US/firefox/addon/axe-devtools/),
[Chrome](https://chrome.google.com/webstore/detail/axe-web-accessibility-tes/lhdoppojpmngadmnindnejefpokejbdd?hl=en-US))
- [ ] If a plugin configuration key changed, check if it needs to be
allowlisted in the cloud and added to the [docker
list](https://github.com/elastic/kibana/blob/main/src/dev/build/tasks/os_packages/docker_generator/resources/base/bin/kibana-docker)
- [ ] This renders correctly on smaller devices using a responsive
layout. (You can test this [in your
browser](https://www.browserstack.com/guide/responsive-testing-on-local-server))
- [ ] This was checked for [cross-browser
compatibility](https://www.elastic.co/support/matrix#matrix_browsers)


### Risk Matrix

Delete this section if it is not applicable to this PR.

Before closing this PR, invite QA, stakeholders, and other developers to
identify risks that should be tested prior to the change/feature
release.

When forming the risk matrix, consider some of the following examples
and how they may potentially impact the change:

| Risk | Probability | Severity | Mitigation/Notes |
|---------------------------|-------------|----------|-------------------------|
| Multiple Spaces—unexpected behavior in non-default Kibana Space. | Low | High | Integration tests will verify that all features are still supported in non-default Kibana Space and when user switches between spaces. |
| Multiple nodes—Elasticsearch polling might have race conditions when multiple Kibana nodes are polling for the same tasks. | High | Low | Tasks are idempotent, so executing them multiple times will not result in logical error, but will degrade performance. To test for this case we add plenty of unit tests around this logic and document manual testing procedure. |
| Code should gracefully handle cases when feature X or plugin Y are disabled. | Medium | High | Unit tests will verify that any feature flag or plugin combination still results in our service operational. |
| [See more potential risk examples](https://github.com/elastic/kibana/blob/main/RISK_MATRIX.mdx) | | | |


### For maintainers

- [ ] This was checked for breaking API changes and was [labeled
appropriately](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
Co-authored-by: Stratoula Kalafateli <efstratia.kalafateli@elastic.co>
Commit e8fefc6304 (parent 2796107353) by Marco Liberati, 2023-07-18 16:04:24 +02:00, committed via GitHub.
3 changed files with 964 additions and 95 deletions


@@ -7,89 +7,371 @@
import expect from '@kbn/expect';
import { partition } from 'lodash';
import moment from 'moment';
import { MappingProperty } from '@elastic/elasticsearch/lib/api/types';
import { FtrProviderContext } from '../../../ftr_provider_context';
const TEST_DOC_COUNT = 100;
const TIME_PICKER_FORMAT = 'MMM D, YYYY [@] HH:mm:ss.SSS';
const timeSeriesMetrics: Record<string, 'gauge' | 'counter'> = {
bytes_gauge: 'gauge',
bytes_counter: 'counter',
};
const timeSeriesDimensions = ['request', 'url'];
type TestDoc = Record<string, string | string[] | number | null | Record<string, unknown>>;
const testDocTemplate: TestDoc = {
agent: 'Mozilla/5.0 (X11; Linux x86_64; rv:6.0a1) Gecko/20110421 Firefox/6.0a1',
bytes: 6219,
clientip: '223.87.60.27',
extension: 'deb',
geo: {
srcdest: 'US:US',
src: 'US',
dest: 'US',
coordinates: { lat: 39.41042861, lon: -88.8454325 },
},
host: 'artifacts.elastic.co',
index: 'kibana_sample_data_logs',
ip: '223.87.60.27',
machine: { ram: 8589934592, os: 'win 8' },
memory: null,
message:
'223.87.60.27 - - [2018-07-22T00:39:02.912Z] "GET /elasticsearch/elasticsearch-6.3.2.deb_1 HTTP/1.1" 200 6219 "-" "Mozilla/5.0 (X11; Linux x86_64; rv:6.0a1) Gecko/20110421 Firefox/6.0a1"',
phpmemory: null,
referer: 'http://twitter.com/success/wendy-lawrence',
request: '/elasticsearch/elasticsearch-6.3.2.deb',
response: 200,
tags: ['success', 'info'],
'@timestamp': '2018-07-22T00:39:02.912Z',
url: 'https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.2.deb_1',
utc_time: '2018-07-22T00:39:02.912Z',
event: { dataset: 'sample_web_logs' },
bytes_gauge: 0,
bytes_counter: 0,
};
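// Build the field mappings for the test docs; with `tsdb` enabled, metric and
// dimension fields get time_series_metric/time_series_dimension annotations.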
function getDataMapping(
{ tsdb, removeTSDBFields }: { tsdb: boolean; removeTSDBFields?: boolean } = {
tsdb: false,
}
): Record<string, MappingProperty> {
const dataStreamMapping: Record<string, MappingProperty> = {
'@timestamp': {
type: 'date',
},
agent: {
fields: {
keyword: {
ignore_above: 256,
type: 'keyword',
},
},
type: 'text',
},
bytes: {
type: 'long',
},
bytes_counter: {
type: 'long',
},
bytes_gauge: {
type: 'long',
},
clientip: {
type: 'ip',
},
event: {
properties: {
dataset: {
type: 'keyword',
},
},
},
extension: {
fields: {
keyword: {
ignore_above: 256,
type: 'keyword',
},
},
type: 'text',
},
geo: {
properties: {
coordinates: {
type: 'geo_point',
},
dest: {
type: 'keyword',
},
src: {
type: 'keyword',
},
srcdest: {
type: 'keyword',
},
},
},
host: {
fields: {
keyword: {
ignore_above: 256,
type: 'keyword',
},
},
type: 'text',
},
index: {
fields: {
keyword: {
ignore_above: 256,
type: 'keyword',
},
},
type: 'text',
},
ip: {
type: 'ip',
},
machine: {
properties: {
os: {
fields: {
keyword: {
ignore_above: 256,
type: 'keyword',
},
},
type: 'text',
},
ram: {
type: 'long',
},
},
},
memory: {
type: 'double',
},
message: {
fields: {
keyword: {
ignore_above: 256,
type: 'keyword',
},
},
type: 'text',
},
phpmemory: {
type: 'long',
},
referer: {
type: 'keyword',
},
request: {
type: 'keyword',
},
response: {
fields: {
keyword: {
ignore_above: 256,
type: 'keyword',
},
},
type: 'text',
},
tags: {
fields: {
keyword: {
ignore_above: 256,
type: 'keyword',
},
},
type: 'text',
},
timestamp: {
path: '@timestamp',
type: 'alias',
},
url: {
type: 'keyword',
},
utc_time: {
type: 'date',
},
};
if (tsdb) {
// augment the current mapping
for (const [fieldName, fieldMapping] of Object.entries(dataStreamMapping || {})) {
if (
timeSeriesMetrics[fieldName] &&
(fieldMapping.type === 'double' || fieldMapping.type === 'long')
) {
fieldMapping.time_series_metric = timeSeriesMetrics[fieldName];
}
if (timeSeriesDimensions.includes(fieldName) && fieldMapping.type === 'keyword') {
fieldMapping.time_series_dimension = true;
}
}
} else if (removeTSDBFields) {
for (const fieldName of Object.keys(timeSeriesMetrics)) {
delete dataStreamMapping[fieldName];
}
}
return dataStreamMapping;
}
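// Sum the y-values of the first n bars of a series (used to compare document
// counts between the first and second half of the charted time window).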
function sumFirstNValues(n: number, bars: Array<{ y: number }>): number {
const indexes = Array(n)
.fill(1)
.map((_, i) => i);
let countSum = 0;
for (const index of indexes) {
if (bars[index]) {
countSum += bars[index].y;
}
}
return countSum;
}
export default function ({ getService, getPageObjects }: FtrProviderContext) {
const PageObjects = getPageObjects(['common', 'timePicker', 'lens', 'dashboard']);
const testSubjects = getService('testSubjects');
const es = getService('es');
const find = getService('find');
const log = getService('log');
const kibanaServer = getService('kibanaServer');
const dataStreams = getService('dataStreams');
const elasticChart = getService('elasticChart');
const indexPatterns = getService('indexPatterns');
const esArchiver = getService('esArchiver');
const createDocs = async (
esIndex: string,
{ isStream, removeTSDBFields }: { isStream: boolean; removeTSDBFields?: boolean },
startTime: string
) => {
log.info(
`Adding ${TEST_DOC_COUNT} docs to ${esIndex} with starting time from ${moment
.utc(startTime, TIME_PICKER_FORMAT)
.format(TIME_PICKER_FORMAT)} to ${moment
.utc(startTime, TIME_PICKER_FORMAT)
.add(2 * TEST_DOC_COUNT, 'seconds')
.format(TIME_PICKER_FORMAT)}`
);
const docs = Array<TestDoc>(TEST_DOC_COUNT)
.fill(testDocTemplate)
.map((templateDoc, i) => {
const timestamp = moment
.utc(startTime, TIME_PICKER_FORMAT)
.add(TEST_DOC_COUNT + i, 'seconds')
.format();
const doc: TestDoc = {
...templateDoc,
'@timestamp': timestamp,
utc_time: timestamp,
bytes_gauge: Math.floor(Math.random() * 10000 * i),
bytes_counter: 5000,
};
if (removeTSDBFields) {
for (const field of Object.keys(timeSeriesMetrics)) {
delete doc[field];
}
}
return doc;
});
// note: data streams only accept "create" actions in a bulk request
const result = await es.bulk(
{
index: esIndex,
body: docs.map((d) => `{"${isStream ? 'create' : 'index'}": {}}\n${JSON.stringify(d)}\n`),
},
{ meta: true }
);
const res = result.body;
if (res.errors) {
const resultsWithErrors = res.items
.filter(({ index }) => index?.error)
.map(({ index }) => index?.error);
for (const error of resultsWithErrors) {
log.error(`Error: ${JSON.stringify(error)}`);
}
const [indexExists, dataStreamExists] = await Promise.all([
es.indices.exists({ index: esIndex }),
es.indices.getDataStream({ name: esIndex }),
]);
log.debug(`Index exists: ${indexExists} - Data stream exists: ${dataStreamExists}`);
}
log.info(`Indexed ${res.items.length} test data docs.`);
};
describe('lens tsdb', function () {
const tsdbIndex = 'kibana_sample_data_logstsdb';
const tsdbDataView = tsdbIndex;
const tsdbEsArchive = 'test/functional/fixtures/es_archiver/kibana_sample_data_logs_tsdb';
const fromTime = 'Apr 16, 2023 @ 00:00:00.000';
const toTime = 'Jun 16, 2023 @ 00:00:00.000';
before(async () => {
log.info(`loading ${tsdbIndex} index...`);
await esArchiver.loadIfNeeded(tsdbEsArchive);
log.info(`creating a data view for "${tsdbDataView}"...`);
await indexPatterns.create(
{
title: tsdbDataView,
timeFieldName: '@timestamp',
},
{ override: true }
);
log.info(`updating settings to use the "${tsdbDataView}" dataView...`);
await kibanaServer.uiSettings.update({
'dateFormat:tz': 'UTC',
defaultIndex: '0ae0bc7a-e4ca-405c-ab67-f2b5913f2a51',
'timepicker:timeDefaults': `{ "from": "${fromTime}", "to": "${toTime}" }`,
});
});
after(async () => {
await kibanaServer.savedObjects.cleanStandardList();
await kibanaServer.uiSettings.replace({});
await es.indices.delete({ index: [tsdbIndex] });
});
describe('downsampling', () => {
const downsampleDataView: { index: string; dataView: string } = { index: '', dataView: '' };
before(async () => {
const downsampledTargetIndex = await dataStreams.downsampleTSDBIndex(tsdbIndex, {
isStream: false,
});
downsampleDataView.index = downsampledTargetIndex;
downsampleDataView.dataView = `${tsdbIndex},${downsampledTargetIndex}`;
log.info(`creating a data view for "${downsampleDataView.dataView}"...`);
await indexPatterns.create(
{
title: downsampleDataView.dataView,
timeFieldName: '@timestamp',
},
{ override: true }
);
});
after(async () => {
await es.indices.delete({ index: [downsampleDataView.index] });
});
describe('for regular metric', () => {
it('defaults to median for non-rolled up metric', async () => {
await PageObjects.common.navigateToApp('lens');
await PageObjects.timePicker.setAbsoluteRange(fromTime, toTime);
await PageObjects.lens.waitForField('bytes_gauge');
await PageObjects.lens.dragFieldToWorkspace('bytes_gauge', 'xyVisChart');
expect(await PageObjects.lens.getDimensionTriggerText('lnsXY_yDimensionPanel')).to.eql(
'Median of bytes_gauge'
);
});
@@ -101,17 +383,14 @@ export default function ({ getService, getPageObjects }: FtrProviderContext) {
});
});
describe('for rolled up metric (downsampled)', () => {
it('defaults to average for rolled up metric', async () => {
await PageObjects.lens.switchDataPanelIndexPattern(downsampleDataView.dataView);
await PageObjects.lens.removeLayer();
await PageObjects.lens.waitForField('bytes_gauge');
await PageObjects.lens.dragFieldToWorkspace('bytes_gauge', 'xyVisChart');
expect(await PageObjects.lens.getDimensionTriggerText('lnsXY_yDimensionPanel')).to.eql(
'Average of bytes_gauge'
);
});
it('shows warnings in editor when using median', async () => {
@@ -120,7 +399,7 @@ export default function ({ getService, getPageObjects }: FtrProviderContext) {
await testSubjects.click('lns-indexPatternDimension-median');
await PageObjects.lens.waitForVisualization('xyVisChart');
await PageObjects.lens.assertMessageListContains(
'Median of bytes_gauge uses a function that is unsupported by rolled up data. Select a different function or change the time range.',
'warning'
);
});
@@ -129,42 +408,20 @@
await PageObjects.dashboard.waitForRenderComplete();
await PageObjects.lens.assertMessageListContains(
'Median of bytes_gauge uses a function that is unsupported by rolled up data. Select a different function or change the time range.',
'warning'
);
});
});
});
describe('time series special field types support', () => {
before(async () => {
await PageObjects.common.navigateToApp('lens');
await PageObjects.lens.switchDataPanelIndexPattern(tsdbDataView);
await PageObjects.lens.goToTimeRange();
});
afterEach(async () => {
await PageObjects.lens.removeLayer();
});
@@ -289,5 +546,382 @@ export default function ({ getService, getPageObjects }: FtrProviderContext) {
}
}
});
describe('Scenarios with changing stream type', () => {
const now = moment().utc();
const fromMoment = now.clone().subtract(1, 'hour');
const toMoment = now.clone();
const fromTimeForScenarios = fromMoment.format(TIME_PICKER_FORMAT);
const toTimeForScenarios = toMoment.format(TIME_PICKER_FORMAT);
const getScenarios = (
initialIndex: string
): Array<{
name: string;
indexes: Array<{
index: string;
create?: boolean;
downsample?: boolean;
tsdb?: boolean;
removeTSDBFields?: boolean;
}>;
}> => [
{
name: 'Dataview with no additional stream/index',
indexes: [{ index: initialIndex }],
},
{
name: 'Dataview with an additional regular index',
indexes: [
{ index: initialIndex },
{ index: 'regular_index', create: true, removeTSDBFields: true },
],
},
{
name: 'Dataview with an additional downsampled TSDB stream',
indexes: [
{ index: initialIndex },
{ index: 'tsdb_index_2', create: true, tsdb: true, downsample: true },
],
},
{
name: 'Dataview with additional regular index and a downsampled TSDB stream',
indexes: [
{ index: initialIndex },
{ index: 'regular_index', create: true, removeTSDBFields: true },
{ index: 'tsdb_index_2', create: true, tsdb: true, downsample: true },
],
},
{
name: 'Dataview with an additional TSDB stream',
indexes: [{ index: initialIndex }, { index: 'tsdb_index_2', create: true, tsdb: true }],
},
];
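// Run the given test suite once per scenario above, handling index/stream
// creation, data view setup, and cleanup around it.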
function runTestsForEachScenario(
initialIndex: string,
testingFn: (
indexes: Array<{
index: string;
create?: boolean;
downsample?: boolean;
tsdb?: boolean;
removeTSDBFields?: boolean;
}>
) => void
): void {
for (const { name, indexes } of getScenarios(initialIndex)) {
describe(name, () => {
let dataViewName: string;
let downsampledTargetIndex: string = '';
before(async () => {
for (const { index, create, downsample, tsdb, removeTSDBFields } of indexes) {
if (create) {
if (tsdb) {
await dataStreams.createDataStream(
index,
getDataMapping({ tsdb, removeTSDBFields }),
tsdb
);
} else {
log.info(`creating an index "${index}" with mapping...`);
await es.indices.create({
index,
mappings: {
properties: getDataMapping({ tsdb: Boolean(tsdb), removeTSDBFields }),
},
});
}
// add data to the newly created index
await createDocs(
index,
{ isStream: Boolean(tsdb), removeTSDBFields },
fromTimeForScenarios
);
}
if (downsample) {
downsampledTargetIndex = await dataStreams.downsampleTSDBIndex(index, {
isStream: Boolean(tsdb),
});
}
}
dataViewName = `${indexes.map(({ index }) => index).join(',')}${
downsampledTargetIndex ? `,${downsampledTargetIndex}` : ''
}`;
log.info(`creating a data view for "${dataViewName}"...`);
await indexPatterns.create(
{
title: dataViewName,
timeFieldName: '@timestamp',
},
{ override: true }
);
await PageObjects.common.navigateToApp('lens');
await elasticChart.setNewChartUiDebugFlag(true);
// go to a time range covering both the initial and the post-change documents
await PageObjects.lens.goToTimeRange(
fromTimeForScenarios,
moment
.utc(toTimeForScenarios, TIME_PICKER_FORMAT)
.add(2, 'hour')
.format(TIME_PICKER_FORMAT) // consider also new documents
);
});
after(async () => {
for (const { index, create, tsdb } of indexes) {
if (create) {
if (tsdb) {
await dataStreams.deleteDataStream(index);
} else {
log.info(`deleting the index "${index}"...`);
await es.indices.delete({
index,
});
}
}
// no need to clean the specific downsample index as everything linked to the stream
// is cleaned up automatically
}
});
beforeEach(async () => {
await PageObjects.lens.switchDataPanelIndexPattern(dataViewName);
await PageObjects.lens.removeLayer();
});
testingFn(indexes);
});
}
}
describe('Data-stream upgraded to TSDB scenarios', () => {
const streamIndex = 'data_stream';
// a rollover cannot rename the stream; it just swaps the backing index underneath
const streamConvertedToTsdbIndex = streamIndex;
before(async () => {
log.info(`Creating "${streamIndex}" data stream...`);
await dataStreams.createDataStream(streamIndex, getDataMapping(), false);
// add some data to the stream
await createDocs(streamIndex, { isStream: true }, fromTimeForScenarios);
log.info(`Update settings for "${streamIndex}" dataView...`);
await kibanaServer.uiSettings.update({
'dateFormat:tz': 'UTC',
'timepicker:timeDefaults': '{ "from": "now-1y", "to": "now" }',
});
log.info(`Upgrade "${streamIndex}" stream to TSDB...`);
const tsdbMapping = getDataMapping({ tsdb: true });
await dataStreams.upgradeStreamToTSDB(streamIndex, tsdbMapping);
log.info(
`Add more data to new "${streamConvertedToTsdbIndex}" dataView (now with TSDB backing index)...`
);
// add some more data when upgraded
await createDocs(streamConvertedToTsdbIndex, { isStream: true }, toTimeForScenarios);
});
after(async () => {
await dataStreams.deleteDataStream(streamIndex);
});
runTestsForEachScenario(streamConvertedToTsdbIndex, (indexes) => {
it('should detect the data stream has now been upgraded to TSDB', async () => {
await PageObjects.lens.configureDimension({
dimension: 'lnsXY_xDimensionPanel > lns-empty-dimension',
operation: 'date_histogram',
field: '@timestamp',
});
await PageObjects.lens.configureDimension({
dimension: 'lnsXY_yDimensionPanel > lns-empty-dimension',
operation: 'min',
field: `bytes_counter`,
keepOpen: true,
});
expect(
await testSubjects.exists(`lns-indexPatternDimension-average incompatible`, {
timeout: 500,
})
).to.eql(false);
await PageObjects.lens.closeDimensionEditor();
});
it(`should visualize a date histogram chart for counter field`, async () => {
await PageObjects.lens.configureDimension({
dimension: 'lnsXY_xDimensionPanel > lns-empty-dimension',
operation: 'date_histogram',
field: '@timestamp',
});
// check the counter field works
await PageObjects.lens.configureDimension({
dimension: 'lnsXY_yDimensionPanel > lns-empty-dimension',
operation: 'min',
field: `bytes_counter`,
});
// also check that the overall document count is "indexes.length" times TEST_DOC_COUNT
await PageObjects.lens.configureDimension({
dimension: 'lnsXY_yDimensionPanel > lns-empty-dimension',
operation: 'count',
});
await PageObjects.lens.waitForVisualization('xyVisChart');
const data = await PageObjects.lens.getCurrentChartDebugState('xyVisChart');
const counterBars = data.bars![0].bars;
const countBars = data.bars![1].bars;
log.info('Check counter data before the upgrade');
// check there's some data before the upgrade
expect(counterBars[0].y).to.eql(5000);
log.info('Check counter data after the upgrade');
// check there's some data after the upgrade
expect(counterBars[counterBars.length - 1].y).to.eql(5000);
log.info('Check count before the upgrade');
const columnsToCheck = countBars.length / 2;
// before the upgrade the count is TEST_DOC_COUNT times the number of indexes
expect(sumFirstNValues(columnsToCheck, countBars)).to.eql(
indexes.length * TEST_DOC_COUNT
);
log.info('Check count after the upgrade');
// later there are only documents for the upgraded stream
expect(sumFirstNValues(columnsToCheck, [...countBars].reverse())).to.eql(
TEST_DOC_COUNT
);
});
});
});
describe('TSDB downgraded to regular data stream scenarios', () => {
const tsdbStream = 'tsdb_stream_dowgradable';
// a rollover cannot rename the stream; it just swaps the backing index underneath
const tsdbConvertedToStream = tsdbStream;
before(async () => {
log.info(`Creating "${tsdbStream}" data stream...`);
await dataStreams.createDataStream(tsdbStream, getDataMapping({ tsdb: true }), true);
// add some data to the stream
await createDocs(tsdbStream, { isStream: true }, fromTimeForScenarios);
log.info(`Update settings for "${tsdbStream}" dataView...`);
await kibanaServer.uiSettings.update({
'dateFormat:tz': 'UTC',
'timepicker:timeDefaults': '{ "from": "now-1y", "to": "now" }',
});
log.info(
`Downgrade "${tsdbStream}" stream into regular stream "${tsdbConvertedToStream}"...`
);
await dataStreams.downgradeTSDBtoStream(tsdbStream, getDataMapping({ tsdb: true }));
log.info(`Add more data to new "${tsdbConvertedToStream}" dataView (no longer TSDB)...`);
// add some more data once downgraded
await createDocs(tsdbConvertedToStream, { isStream: true }, toTimeForScenarios);
});
after(async () => {
await dataStreams.deleteDataStream(tsdbConvertedToStream);
});
runTestsForEachScenario(tsdbConvertedToStream, (indexes) => {
it('should keep TSDB restrictions only if a tsdb stream is in the dataView mix', async () => {
await PageObjects.lens.configureDimension({
dimension: 'lnsXY_xDimensionPanel > lns-empty-dimension',
operation: 'date_histogram',
field: '@timestamp',
});
await PageObjects.lens.configureDimension({
dimension: 'lnsXY_yDimensionPanel > lns-empty-dimension',
operation: 'min',
field: `bytes_counter`,
keepOpen: true,
});
expect(
await testSubjects.exists(`lns-indexPatternDimension-average incompatible`, {
timeout: 500,
})
).to.eql(indexes.some(({ tsdb }) => tsdb));
await PageObjects.lens.closeDimensionEditor();
});
it(`should visualize a date histogram chart for counter field`, async () => {
await PageObjects.lens.configureDimension({
dimension: 'lnsXY_xDimensionPanel > lns-empty-dimension',
operation: 'date_histogram',
field: '@timestamp',
});
// just check the data is shown
await PageObjects.lens.configureDimension({
dimension: 'lnsXY_yDimensionPanel > lns-empty-dimension',
operation: 'count',
});
await PageObjects.lens.waitForVisualization('xyVisChart');
const data = await PageObjects.lens.getCurrentChartDebugState('xyVisChart');
const bars = data.bars![0].bars;
const columnsToCheck = bars.length / 2;
log.info('Check count before the downgrade');
// before the downgrade the count is TEST_DOC_COUNT times the number of indexes
expect(sumFirstNValues(columnsToCheck, bars)).to.eql(indexes.length * TEST_DOC_COUNT);
log.info('Check count after the downgrade');
// later there are only documents from the downgraded stream
expect(sumFirstNValues(columnsToCheck, [...bars].reverse())).to.eql(TEST_DOC_COUNT);
});
it('should visualize data when moving the time window around the downgrade moment', async () => {
// first check the time window before the downgrade
await PageObjects.lens.goToTimeRange(
moment
.utc(fromTimeForScenarios, TIME_PICKER_FORMAT)
.subtract(1, 'hour')
.format(TIME_PICKER_FORMAT),
moment
.utc(fromTimeForScenarios, TIME_PICKER_FORMAT)
.add(1, 'hour')
.format(TIME_PICKER_FORMAT) // consider only the documents indexed before the downgrade
);
await PageObjects.lens.configureDimension({
dimension: 'lnsXY_xDimensionPanel > lns-empty-dimension',
operation: 'date_histogram',
field: '@timestamp',
});
await PageObjects.lens.configureDimension({
dimension: 'lnsXY_yDimensionPanel > lns-empty-dimension',
operation: 'count',
});
await PageObjects.lens.waitForVisualization('xyVisChart');
const dataBefore = await PageObjects.lens.getCurrentChartDebugState('xyVisChart');
const barsBefore = dataBefore.bars![0].bars;
expect(barsBefore.some(({ y }) => y)).to.eql(true);
// check after the downgrade
await PageObjects.lens.goToTimeRange(
moment
.utc(toTimeForScenarios, TIME_PICKER_FORMAT)
.add(1, 'second')
.format(TIME_PICKER_FORMAT),
moment
.utc(toTimeForScenarios, TIME_PICKER_FORMAT)
.add(2, 'hour')
.format(TIME_PICKER_FORMAT) // consider also new documents
);
await PageObjects.lens.waitForVisualization('xyVisChart');
const dataAfter = await PageObjects.lens.getCurrentChartDebugState('xyVisChart');
const barsAfter = dataAfter.bars![0].bars;
expect(barsAfter.some(({ y }) => y)).to.eql(true);
});
});
});
});
});
}


@@ -0,0 +1,233 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { MappingProperty } from '@elastic/elasticsearch/lib/api/types';
import type { FtrProviderContext } from '../ftr_provider_context';
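// Promise-based sleep helper used to space out retry attempts below.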
const waitFor = (time: number = 1000) => new Promise((r) => setTimeout(r, time));
/**
* High level interface to operate with Elasticsearch data streams and TSDS (time series data streams).
*/
export function DataStreamProvider({ getService }: FtrProviderContext) {
const es = getService('es');
const log = getService('log');
const retry = getService('retry');
const downsampleDefaultOptions = {
isStream: true,
interval: '1h',
deleteOriginal: false,
};
/**
* Downsample a data-stream or a specific backing index
* @param indexOrStream An index or a data stream
* @param options A set of options to configure the downsample.
* @param options.isStream Whether indexOrStream is a data stream: when true the stream is rolled over and its old backing index is downsampled (passing the wrong value will make the operation throw). Defaults to true.
* @param options.interval The fixed interval size for the downsampling. Defaults to '1h'.
* @param options.deleteOriginal Whether the original backing index (not the data stream!) should be deleted after downsampling. Defaults to false.
* @returns the name of the downsampled index
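* @example
* // hypothetical usage (names are placeholders): roll over a TSDS stream and
* // downsample its old backing index
* // const target = await dataStreams.downsampleTSDBIndex('my_tsdb_stream', { isStream: true });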
*/
async function downsampleTSDBIndex(
indexOrStream: string,
{
isStream = downsampleDefaultOptions.isStream,
interval = downsampleDefaultOptions.interval,
deleteOriginal = downsampleDefaultOptions.deleteOriginal,
}: { isStream: boolean; interval?: string; deleteOriginal?: boolean } = downsampleDefaultOptions
) {
let sourceIndex = indexOrStream;
// block and downsample work only at index level, so no direct data stream access
// there's some more work to do if a data stream is passed
if (isStream) {
log.info('Force a rollover for the data stream to get the backing "old_index"');
const res = await es.indices.rollover({
alias: indexOrStream,
});
sourceIndex = res.old_index;
}
const downsampledTargetIndex = `${indexOrStream}_downsampled`;
log.info(`add write block to "${sourceIndex}" index...`);
await es.indices.addBlock({ index: sourceIndex, block: 'write' });
let waitTime = 1000;
await retry.tryForTime(
15000,
async () => {
log.debug(
`Wait ${
waitTime / 1000
}s before running the downsampling to avoid a null_pointer_exception`
);
await waitFor(waitTime);
try {
log.info(`downsampling "${sourceIndex}" index...`);
await es.indices.downsample({
index: sourceIndex,
target_index: downsampledTargetIndex,
config: { fixed_interval: interval || downsampleDefaultOptions.interval },
});
} catch (err) {
// ignore this specific error
if (err.message.match(/resource_already_exists_exception/)) {
log.info(`ignoring resource_already_exists_exception...`);
return;
}
// increase the waiting time geometrically between attempts
waitTime = waitTime * 1.5;
// let everything else bubble up
throw err;
}
},
async () => {
// provide some debug info if the retry fails
if (isStream) {
const [exists, oldIndexExists] = await Promise.all([
es.indices.getDataStream({ name: indexOrStream }),
es.indices.exists({ index: sourceIndex }),
]);
log.debug(`Data stream exists: ${Boolean(exists)}; old_index exists: ${oldIndexExists}`);
} else {
const exists = await es.indices.exists({ index: indexOrStream });
log.debug(`Index exists: ${exists}`);
}
}
);
if (deleteOriginal) {
log.info(`Deleting original index ${sourceIndex}`);
await es.indices.delete({ index: sourceIndex });
}
return downsampledTargetIndex;
}
// @internal
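// Creates/updates the component and index templates backing a test data stream;
// the `tsdb` flag toggles the time_series index mode on or off.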
async function updateDataStreamTemplate(
stream: string,
mapping: Record<string, MappingProperty>,
tsdb?: boolean
) {
await es.cluster.putComponentTemplate({
name: `${stream}_mapping`,
template: {
settings: tsdb
? {
mode: 'time_series',
routing_path: 'request',
}
: { mode: undefined },
mappings: {
properties: mapping,
},
},
});
log.info(`Updating ${stream} index template${tsdb ? ' for TSDB' : ''}...`);
await es.indices.putIndexTemplate({
name: `${stream}_index_template`,
index_patterns: [stream],
data_stream: {},
composed_of: [`${stream}_mapping`],
_meta: {
description: `Template for ${stream} testing index`,
},
});
}
/**
* "Upgrade" a given data stream into a time series data series (TSDB/TSDS)
* @param stream the data stream name
* @param newMapping the new mapping already with time series metrics/dimensions configured
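* @example
* // hypothetical usage (names are placeholders): apply a mapping with time
* // series annotations and roll the stream over
* // await dataStreams.upgradeStreamToTSDB('my_stream', tsdbMapping);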
*/
async function upgradeStreamToTSDB(stream: string, newMapping: Record<string, MappingProperty>) {
// upload a new mapping for the stream index using the provided metric/dimension list,
// then rollover to upgrade the index type to time_series
log.info(`Updating ${stream} data stream component template with TSDB settings...`);
await updateDataStreamTemplate(stream, newMapping, true);
log.info('Rolling over the backing index for TSDB');
await es.indices.rollover({
alias: stream,
});
}
/**
* "Downgrade" a TSDB/TSDS data stream into a regular data stream
* @param tsdbStream the TSDB/TSDS data stream to "downgrade"
* @param newMapping the mapping to apply; any time series metric/dimension annotations are stripped out by this helper
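* @example
* // hypothetical usage (names are placeholders): strip the TSDB settings and
* // roll the stream back to a regular one
* // await dataStreams.downgradeTSDBtoStream('my_tsdb_stream', tsdbMapping);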
*/
async function downgradeTSDBtoStream(
tsdbStream: string,
newMapping: Record<string, MappingProperty>
) {
// strip out any time-series specific mapping
for (const fieldMapping of Object.values(newMapping || {})) {
if ('time_series_metric' in fieldMapping) {
delete fieldMapping.time_series_metric;
}
if ('time_series_dimension' in fieldMapping) {
delete fieldMapping.time_series_dimension;
}
}
log.info(`Updating ${tsdbStream} data stream component template to remove the TSDB settings...`);
await updateDataStreamTemplate(tsdbStream, newMapping, false);
// rollover to downgrade the index type to regular stream
log.info(`Rolling over the ${tsdbStream} data stream into a regular data stream...`);
await es.indices.rollover({
alias: tsdbStream,
});
}
/**
* Takes care of the entire process to create a data stream
* @param streamIndex name of the new data stream to create
* @param mappings the mapping to associate with the data stream
* @param tsdb when enabled it will configure the data stream as a TSDB/TSDS
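* @example
* // hypothetical usage (names are placeholders): create a TSDS using a mapping
* // that carries time series annotations
* // await dataStreams.createDataStream('my_tsdb_stream', tsdbMapping, true);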
*/
async function createDataStream(
streamIndex: string,
mappings: Record<string, MappingProperty>,
tsdb: boolean = true
) {
log.info(`Creating ${streamIndex} data stream component template...`);
await updateDataStreamTemplate(streamIndex, mappings, tsdb);
log.info(`Creating ${streamIndex} data stream index...`);
await es.indices.createDataStream({
name: streamIndex,
});
}
/**
* Takes care of deleting a data stream and cleaning up everything associated with it
* @param streamIndex name of the data stream
*/
async function deleteDataStream(streamIndex: string) {
log.info(`Delete ${streamIndex} data stream index...`);
await es.indices.deleteDataStream({ name: streamIndex });
log.info(`Delete ${streamIndex} index template...`);
await es.indices.deleteIndexTemplate({
name: `${streamIndex}_index_template`,
});
log.info(`Delete ${streamIndex} data stream component template...`);
await es.cluster.deleteComponentTemplate({
name: `${streamIndex}_mapping`,
});
}
return {
createDataStream,
deleteDataStream,
downsampleTSDBIndex,
upgradeStreamToTSDB,
downgradeTSDBtoStream,
};
}


@@ -68,6 +68,7 @@ import { ActionsServiceProvider } from './actions';
import { RulesServiceProvider } from './rules';
import { AiopsProvider } from './aiops';
import { SampleDataServiceProvider } from './sample_data';
import { DataStreamProvider } from './data_stream';
// define the name and providers for services that should be
// available to your tests. If you don't specify anything here
@@ -129,4 +130,5 @@ export const services = {
cases: CasesServiceProvider,
aiops: AiopsProvider,
sampleData: SampleDataServiceProvider,
dataStreams: DataStreamProvider,
};