[Fleet] Rollover data streams when package w/ TSDB setting changed is installed (#157869)

## Summary

Fixes https://github.com/elastic/kibana/issues/157345

When a package with a changed `index.mode` or `source.mode` setting is
installed, Fleet will now automatically perform a rollover to ensure the
correct setting is present on the resulting backing index.

There is an issue with Elasticsearch wherein toggling these settings
back and forth will incur a backing index range overlap error. See
https://github.com/elastic/elasticsearch/issues/96163.

To test
1. Install the `system` integration at version `1.28.0`
2. Create an integration policy for the `system` integration (a standard
default agent policy will do)
3. Enroll an agent in this policy, and allow it to ingest some data
4. Confirm that there are documents present in the
`metrics-system.cpu-default` data stream, and note its backing index via
Stack Management
5. Create a new `1.28.1` version of the `system` integration where
`elasticsearch.index_mode: time_series` is set and install it via
`elastic-package install --zip`
6. Confirm that a rollover occurs and the backing index for the
`metrics-system.cpu-default` data stream has been updated

### Checklist

Delete any items that are not applicable to this PR.

- [x] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios

---------

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Kyle Pollich 2023-05-16 14:16:14 -04:00 committed by GitHub
parent cf6f350ed2
commit 22e38472f6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 429 additions and 221 deletions

View file

@ -6,7 +6,10 @@
*/
import type { ElasticsearchClient, Logger } from '@kbn/core/server';
import type { IndicesIndexSettings } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type {
IndicesIndexSettings,
MappingTypeMapping,
} from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { Field, Fields } from '../../fields/field';
import type {
@ -642,7 +645,20 @@ const updateExistingDataStream = async ({
esClient: ElasticsearchClient;
logger: Logger;
}) => {
const existingDs = await esClient.indices.get({
index: dataStreamName,
});
const existingDsConfig = Object.values(existingDs);
const currentBackingIndexConfig = existingDsConfig.at(-1);
const currentIndexMode = currentBackingIndexConfig?.settings?.index?.mode;
// @ts-expect-error Property 'mode' does not exist on type 'MappingSourceField'
const currentSourceType = currentBackingIndexConfig.mappings?._source?.mode;
let settings: IndicesIndexSettings;
let mappings: MappingTypeMapping;
try {
const simulateResult = await retryTransientEsErrors(() =>
esClient.indices.simulateTemplate({
@ -651,7 +667,8 @@ const updateExistingDataStream = async ({
);
settings = simulateResult.template.settings;
const mappings = simulateResult.template.mappings;
mappings = simulateResult.template.mappings;
// for now, remove from object so as not to update stream or data stream properties of the index until type and name
// are added in https://github.com/elastic/kibana/issues/66551. namespace value we will continue
// to skip updating and assume the value in the index mapping is correct
@ -659,6 +676,8 @@ const updateExistingDataStream = async ({
delete mappings.properties.stream;
delete mappings.properties.data_stream;
}
logger.debug(`Updating mappings for ${dataStreamName}`);
await retryTransientEsErrors(
() =>
esClient.indices.putMapping({
@ -668,16 +687,38 @@ const updateExistingDataStream = async ({
}),
{ logger }
);
// if update fails, rollover data stream
// if update fails, rollover data stream and bail out
} catch (err) {
logger.error(`Mappings update for ${dataStreamName} failed`);
logger.error(err);
await rolloverDataStream(dataStreamName, esClient);
return;
}
// Trigger a rollover if the index mode or source type has changed
if (
currentIndexMode !== settings?.index?.mode ||
// @ts-expect-error Property 'mode' does not exist on type 'MappingSourceField'
currentSourceType !== mappings?._source?.mode
) {
logger.info(
`Index mode or source type has changed for ${dataStreamName}, triggering a rollover`
);
await rolloverDataStream(dataStreamName, esClient);
}
// update settings after mappings was successful to ensure
// pointing to the new pipeline is safe
// for now, only update the pipeline
if (!settings?.index?.default_pipeline) return;
if (!settings?.index?.default_pipeline) {
return;
}
try {
logger.debug(`Updating settings for ${dataStreamName}`);
await retryTransientEsErrors(
() =>
esClient.indices.putSettings({

View file

@ -14,18 +14,13 @@ import { setupFleetAndAgents } from '../agents/services';
export default function (providerContext: FtrProviderContext) {
const { getService } = providerContext;
const supertest = getService('supertest');
const es = getService('es');
const pkgName = 'datastreams';
const pkgVersion = '0.1.0';
const pkgUpdateVersion = '0.2.0';
const logsTemplateName = `logs-${pkgName}.test_logs`;
const metricsTemplateName = `metrics-${pkgName}.test_metrics`;
const namespaces = ['default', 'foo', 'bar'];
const supertest = getService('supertest');
const uninstallPackage = async (name: string, version: string) => {
await supertest.delete(`/api/fleet/epm/packages/${name}/${version}`).set('kbn-xsrf', 'xxxx');
};
const installPackage = async (name: string, version: string) => {
return await supertest
.post(`/api/fleet/epm/packages/${name}/${version}`)
@ -35,247 +30,315 @@ export default function (providerContext: FtrProviderContext) {
};
describe('datastreams', async () => {
skipIfNoDockerRegistry(providerContext);
setupFleetAndAgents(providerContext);
describe('standard integration', () => {
const pkgName = 'datastreams';
const pkgVersion = '0.1.0';
const pkgUpdateVersion = '0.2.0';
const logsTemplateName = `logs-${pkgName}.test_logs`;
const metricsTemplateName = `metrics-${pkgName}.test_metrics`;
const namespaces = ['default', 'foo', 'bar'];
beforeEach(async () => {
await installPackage(pkgName, pkgVersion);
await Promise.all(
namespaces.map(async (namespace) => {
const createLogsRequest = es.transport.request(
{
method: 'POST',
path: `/${logsTemplateName}-${namespace}/_doc`,
body: {
'@timestamp': '2015-01-01',
logs_test_name: 'test',
data_stream: {
dataset: `${pkgName}.test_logs`,
namespace,
type: 'logs',
skipIfNoDockerRegistry(providerContext);
setupFleetAndAgents(providerContext);
beforeEach(async () => {
await installPackage(pkgName, pkgVersion);
await Promise.all(
namespaces.map(async (namespace) => {
const createLogsRequest = es.transport.request(
{
method: 'POST',
path: `/${logsTemplateName}-${namespace}/_doc`,
body: {
'@timestamp': '2015-01-01',
logs_test_name: 'test',
data_stream: {
dataset: `${pkgName}.test_logs`,
namespace,
type: 'logs',
},
},
},
},
{ meta: true }
);
const createMetricsRequest = es.transport.request(
{
method: 'POST',
path: `/${metricsTemplateName}-${namespace}/_doc`,
body: {
'@timestamp': '2015-01-01',
logs_test_name: 'test',
data_stream: {
dataset: `${pkgName}.test_metrics`,
namespace,
type: 'metrics',
{ meta: true }
);
const createMetricsRequest = es.transport.request(
{
method: 'POST',
path: `/${metricsTemplateName}-${namespace}/_doc`,
body: {
'@timestamp': '2015-01-01',
logs_test_name: 'test',
data_stream: {
dataset: `${pkgName}.test_metrics`,
namespace,
type: 'metrics',
},
},
},
},
{ meta: true }
);
return Promise.all([createLogsRequest, createMetricsRequest]);
})
);
});
{ meta: true }
);
return Promise.all([createLogsRequest, createMetricsRequest]);
})
);
});
afterEach(async () => {
await Promise.all(
namespaces.map(async (namespace) => {
const deleteLogsRequest = es.transport.request(
afterEach(async () => {
await Promise.all(
namespaces.map(async (namespace) => {
const deleteLogsRequest = es.transport.request(
{
method: 'DELETE',
path: `/_data_stream/${logsTemplateName}-${namespace}`,
},
{ meta: true }
);
const deleteMetricsRequest = es.transport.request(
{
method: 'DELETE',
path: `/_data_stream/${metricsTemplateName}-${namespace}`,
},
{ meta: true }
);
return Promise.all([deleteLogsRequest, deleteMetricsRequest]);
})
);
await uninstallPackage(pkgName, pkgVersion);
await uninstallPackage(pkgName, pkgUpdateVersion);
});
it('should list the logs and metrics datastream', async function () {
await asyncForEach(namespaces, async (namespace) => {
const resLogsDatastream = await es.transport.request<any>(
{
method: 'DELETE',
method: 'GET',
path: `/_data_stream/${logsTemplateName}-${namespace}`,
},
{ meta: true }
);
const deleteMetricsRequest = es.transport.request(
const resMetricsDatastream = await es.transport.request<any>(
{
method: 'DELETE',
method: 'GET',
path: `/_data_stream/${metricsTemplateName}-${namespace}`,
},
{ meta: true }
);
return Promise.all([deleteLogsRequest, deleteMetricsRequest]);
})
);
await uninstallPackage(pkgName, pkgVersion);
await uninstallPackage(pkgName, pkgUpdateVersion);
});
expect(resLogsDatastream.body.data_streams.length).equal(1);
expect(resLogsDatastream.body.data_streams[0].indices.length).equal(1);
expect(resMetricsDatastream.body.data_streams.length).equal(1);
expect(resMetricsDatastream.body.data_streams[0].indices.length).equal(1);
});
});
it('should list the logs and metrics datastream', async function () {
await asyncForEach(namespaces, async (namespace) => {
const resLogsDatastream = await es.transport.request<any>(
{
method: 'GET',
path: `/_data_stream/${logsTemplateName}-${namespace}`,
},
{ meta: true }
);
const resMetricsDatastream = await es.transport.request<any>(
{
method: 'GET',
path: `/_data_stream/${metricsTemplateName}-${namespace}`,
},
{ meta: true }
);
expect(resLogsDatastream.body.data_streams.length).equal(1);
expect(resLogsDatastream.body.data_streams[0].indices.length).equal(1);
expect(resMetricsDatastream.body.data_streams.length).equal(1);
expect(resMetricsDatastream.body.data_streams[0].indices.length).equal(1);
it('after update, it should have rolled over logs datastream because mappings are not compatible and not metrics', async function () {
await installPackage(pkgName, pkgUpdateVersion);
await asyncForEach(namespaces, async (namespace) => {
const resLogsDatastream = await es.transport.request<any>(
{
method: 'GET',
path: `/_data_stream/${logsTemplateName}-${namespace}`,
},
{ meta: true }
);
const resMetricsDatastream = await es.transport.request<any>(
{
method: 'GET',
path: `/_data_stream/${metricsTemplateName}-${namespace}`,
},
{ meta: true }
);
expect(resLogsDatastream.body.data_streams[0].indices.length).equal(2);
expect(resMetricsDatastream.body.data_streams[0].indices.length).equal(1);
});
});
it('should be able to upgrade a package after a rollover', async function () {
await asyncForEach(namespaces, async (namespace) => {
await es.transport.request<any>(
{
method: 'POST',
path: `/${logsTemplateName}-${namespace}/_rollover`,
},
{ meta: true }
);
const resLogsDatastream = await es.transport.request<any>(
{
method: 'GET',
path: `/_data_stream/${logsTemplateName}-${namespace}`,
},
{ meta: true }
);
expect(resLogsDatastream.body.data_streams[0].indices.length).equal(2);
});
await installPackage(pkgName, pkgUpdateVersion);
});
describe('When enabling experimental data stream features', () => {
let agentPolicyId: string;
let packagePolicyId: string;
let packagePolicyData: any;
beforeEach(async () => {
const { body: agentPolicyResponse } = await supertest
.post(`/api/fleet/agent_policies`)
.set('kbn-xsrf', 'xxxx')
.send({
name: `Test policy ${uuidv4()}`,
namespace: 'default',
})
.expect(200);
agentPolicyId = agentPolicyResponse.item.id;
packagePolicyData = {
force: true,
name: `test-package-experimental-feature-${uuidv4()}`,
description: '',
namespace: 'default',
policy_id: agentPolicyId,
enabled: true,
inputs: [],
package: {
name: pkgName,
version: pkgVersion,
},
};
const { body: responseWithForce } = await supertest
.post(`/api/fleet/package_policies`)
.set('kbn-xsrf', 'xxxx')
.send(packagePolicyData)
.expect(200);
packagePolicyId = responseWithForce.item.id;
});
afterEach(async () => {
await supertest
.post(`/api/fleet/agent_policies/delete`)
.send({
agentPolicyId,
})
.set('kbn-xsrf', 'xxxx')
.expect(200);
});
async function getLogsDefaultBackingIndicesLength() {
const resLogsDatastream = await es.transport.request<any>(
{
method: 'GET',
path: `/_data_stream/${logsTemplateName}-${namespaces[0]}`,
},
{ meta: true }
);
return resLogsDatastream.body.data_streams[0].indices.length;
}
it('should rollover datstream after enabling a experimental datastream feature that need a rollover', async () => {
expect(await getLogsDefaultBackingIndicesLength()).to.be(1);
await supertest
.put(`/api/fleet/package_policies/${packagePolicyId}`)
.set('kbn-xsrf', 'xxxx')
.send({
...packagePolicyData,
package: {
...packagePolicyData.package,
experimental_data_stream_features: [
{
data_stream: logsTemplateName,
features: {
synthetic_source: false,
tsdb: false,
doc_value_only_numeric: true,
doc_value_only_other: true,
},
},
],
},
})
.expect(200);
// Datastream should have been rolled over
expect(await getLogsDefaultBackingIndicesLength()).to.be(2);
});
it('should allow updating a package policy with only a partial set of experimental datastream features', async () => {
await supertest
.put(`/api/fleet/package_policies/${packagePolicyId}`)
.set('kbn-xsrf', 'xxxx')
.send({
...packagePolicyData,
package: {
...packagePolicyData.package,
experimental_data_stream_features: [
{
data_stream: logsTemplateName,
features: {
synthetic_source: true,
},
},
],
},
})
.expect(200);
});
});
});
it('after update, it should have rolled over logs datastream because mappings are not compatible and not metrics', async function () {
await installPackage(pkgName, pkgUpdateVersion);
await asyncForEach(namespaces, async (namespace) => {
const resLogsDatastream = await es.transport.request<any>(
{
method: 'GET',
path: `/_data_stream/${logsTemplateName}-${namespace}`,
},
{ meta: true }
);
const resMetricsDatastream = await es.transport.request<any>(
{
method: 'GET',
path: `/_data_stream/${metricsTemplateName}-${namespace}`,
},
{ meta: true }
);
expect(resLogsDatastream.body.data_streams[0].indices.length).equal(2);
expect(resMetricsDatastream.body.data_streams[0].indices.length).equal(1);
});
});
describe('tsdb integration', () => {
const pkgName = 'no_tsdb_to_tsdb';
const pkgVersion = '0.1.0';
const pkgUpdateVersion = '0.2.0';
const logsTemplateName = `logs-${pkgName}.test`;
const namespace = 'default';
it('should be able to upgrade a package after a rollover', async function () {
await asyncForEach(namespaces, async (namespace) => {
await es.transport.request<any>(
{
method: 'POST',
path: `/${logsTemplateName}-${namespace}/_rollover`,
},
{ meta: true }
);
const resLogsDatastream = await es.transport.request<any>(
{
method: 'GET',
path: `/_data_stream/${logsTemplateName}-${namespace}`,
},
{ meta: true }
);
expect(resLogsDatastream.body.data_streams[0].indices.length).equal(2);
});
await installPackage(pkgName, pkgUpdateVersion);
});
describe('When enabling experimental data stream features', () => {
let agentPolicyId: string;
let packagePolicyId: string;
let packagePolicyData: any;
skipIfNoDockerRegistry(providerContext);
setupFleetAndAgents(providerContext);
beforeEach(async () => {
const { body: agentPolicyResponse } = await supertest
.post(`/api/fleet/agent_policies`)
.set('kbn-xsrf', 'xxxx')
.send({
name: `Test policy ${uuidv4()}`,
namespace: 'default',
})
.expect(200);
agentPolicyId = agentPolicyResponse.item.id;
packagePolicyData = {
force: true,
name: `test-package-experimental-feature-${uuidv4()}`,
description: '',
namespace: 'default',
policy_id: agentPolicyId,
enabled: true,
inputs: [],
package: {
name: pkgName,
version: pkgVersion,
},
};
const { body: responseWithForce } = await supertest
.post(`/api/fleet/package_policies`)
.set('kbn-xsrf', 'xxxx')
.send(packagePolicyData)
.expect(200);
await installPackage(pkgName, pkgVersion);
packagePolicyId = responseWithForce.item.id;
});
afterEach(async () => {
await supertest
.post(`/api/fleet/agent_policies/delete`)
.send({
agentPolicyId,
})
.set('kbn-xsrf', 'xxxx')
.expect(200);
});
async function getLogsDefaultBackingIndicesLength() {
const resLogsDatastream = await es.transport.request<any>(
// Create a sample document so the data stream is created
await es.transport.request(
{
method: 'GET',
path: `/_data_stream/${logsTemplateName}-${namespaces[0]}`,
method: 'POST',
path: `/${logsTemplateName}-${namespace}/_doc`,
body: {
'@timestamp': '2015-01-01',
logs_test_name: 'test',
data_stream: {
dataset: `${pkgName}.test_logs`,
namespace,
type: 'logs',
},
},
},
{ meta: true }
);
});
afterEach(async () => {
await es.transport.request(
{
method: 'DELETE',
path: `/_data_stream/${logsTemplateName}-${namespace}`,
},
{ meta: true }
);
return resLogsDatastream.body.data_streams[0].indices.length;
}
it('should rollover datstream after enabling a experimental datastream feature that need a rollover', async () => {
expect(await getLogsDefaultBackingIndicesLength()).to.be(1);
await supertest
.put(`/api/fleet/package_policies/${packagePolicyId}`)
.set('kbn-xsrf', 'xxxx')
.send({
...packagePolicyData,
package: {
...packagePolicyData.package,
experimental_data_stream_features: [
{
data_stream: logsTemplateName,
features: {
synthetic_source: false,
tsdb: false,
doc_value_only_numeric: true,
doc_value_only_other: true,
},
},
],
},
})
.expect(200);
// Datastream should have been rolled over
expect(await getLogsDefaultBackingIndicesLength()).to.be(2);
await uninstallPackage(pkgName, pkgVersion);
});
it('should allow updating a package policy with only a partial set of experimental datastream features', async () => {
await supertest
.put(`/api/fleet/package_policies/${packagePolicyId}`)
.set('kbn-xsrf', 'xxxx')
.send({
...packagePolicyData,
package: {
...packagePolicyData.package,
experimental_data_stream_features: [
{
data_stream: logsTemplateName,
features: {
synthetic_source: true,
},
},
],
},
})
.expect(200);
it('rolls over data stream when index_mode: time_series is set in the updated package version', async () => {
await installPackage(pkgName, pkgUpdateVersion);
const resLogsDatastream = await es.transport.request<any>(
{
method: 'GET',
path: `/_data_stream/${logsTemplateName}-${namespace}`,
},
{ meta: true }
);
expect(resLogsDatastream.body.data_streams[0].indices.length).equal(2);
});
});
});

View file

@ -0,0 +1,22 @@
- name: data_stream.type
type: constant_keyword
description: >
Data stream type.
- name: data_stream.dataset
type: constant_keyword
description: >
Data stream dataset.
- name: data_stream.namespace
type: constant_keyword
description: >
Data stream namespace.
- name: '@timestamp'
type: date
description: >
Event timestamp.
- name: 'some_field'
type: keyword
dimension: true
- name: 'some_metric_field'
type: integer
metric_type: gauge

View file

@ -0,0 +1,9 @@
title: Test Dataset
type: logs
elasticsearch:
index_template.mappings:
dynamic: false
index_template.settings:
index.lifecycle.name: reference

View file

@ -0,0 +1,3 @@
# Test package
Test package where tsdb is enabled in a newer version

View file

@ -0,0 +1,17 @@
format_version: 1.0.0
name: no_tsdb_to_tsdb
title: Test package where tsdb is enabled in a newer version
description: Test package where tsdb is enabled in a newer version
version: 0.1.0
categories: []
release: beta
type: integration
license: basic
owner:
github: elastic/fleet
requirement:
elasticsearch:
versions: '>7.7.0'
kibana:
versions: '>7.7.0'

View file

@ -0,0 +1,22 @@
- name: data_stream.type
type: constant_keyword
description: >
Data stream type.
- name: data_stream.dataset
type: constant_keyword
description: >
Data stream dataset.
- name: data_stream.namespace
type: constant_keyword
description: >
Data stream namespace.
- name: '@timestamp'
type: date
description: >
Event timestamp.
- name: 'some_field'
type: keyword
dimension: true
- name: 'some_metric_field'
type: integer
metric_type: gauge

View file

@ -0,0 +1,11 @@
title: Test Dataset
type: logs
elasticsearch:
index_mode: time_series
index_template.mappings:
dynamic: false
index_template.settings:
index.lifecycle.name: reference

View file

@ -0,0 +1,3 @@
# Test package
Test package where tsdb is enabled in a newer version

View file

@ -0,0 +1,17 @@
format_version: 1.0.0
name: no_tsdb_to_tsdb
title: Test package where tsdb is enabled in a newer version
description: Test package where tsdb is enabled in a newer version
version: 0.2.0
categories: []
release: beta
type: integration
license: basic
owner:
github: elastic/fleet
requirement:
elasticsearch:
versions: '>7.7.0'
kibana:
versions: '>7.7.0'