mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 01:38:56 -04:00
* Update data streams mappings directly instead of querying for backing indices, update integration tests to test with multiple namespaces * Add flag to only update mappings of the current write index Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com> Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
parent
f687709ac4
commit
00c8a22187
2 changed files with 112 additions and 135 deletions
|
@ -11,7 +11,6 @@ import {
|
|||
TemplateRef,
|
||||
IndexTemplate,
|
||||
IndexTemplateMappings,
|
||||
DataType,
|
||||
} from '../../../../types';
|
||||
import { getRegistryDataStreamAssetBaseName } from '../index';
|
||||
|
||||
|
@ -26,8 +25,8 @@ interface MultiFields {
|
|||
export interface IndexTemplateMapping {
|
||||
[key: string]: any;
|
||||
}
|
||||
export interface CurrentIndex {
|
||||
indexName: string;
|
||||
export interface CurrentDataStream {
|
||||
dataStreamName: string;
|
||||
indexTemplate: IndexTemplate;
|
||||
}
|
||||
const DEFAULT_SCALING_FACTOR = 1000;
|
||||
|
@ -348,33 +347,31 @@ export const updateCurrentWriteIndices = async (
|
|||
): Promise<void> => {
|
||||
if (!templates.length) return;
|
||||
|
||||
const allIndices = await queryIndicesFromTemplates(callCluster, templates);
|
||||
const allIndices = await queryDataStreamsFromTemplates(callCluster, templates);
|
||||
if (!allIndices.length) return;
|
||||
return updateAllIndices(allIndices, callCluster);
|
||||
return updateAllDataStreams(allIndices, callCluster);
|
||||
};
|
||||
|
||||
function isCurrentIndex(item: CurrentIndex[] | undefined): item is CurrentIndex[] {
|
||||
function isCurrentDataStream(item: CurrentDataStream[] | undefined): item is CurrentDataStream[] {
|
||||
return item !== undefined;
|
||||
}
|
||||
|
||||
const queryIndicesFromTemplates = async (
|
||||
const queryDataStreamsFromTemplates = async (
|
||||
callCluster: CallESAsCurrentUser,
|
||||
templates: TemplateRef[]
|
||||
): Promise<CurrentIndex[]> => {
|
||||
const indexPromises = templates.map((template) => {
|
||||
return getIndices(callCluster, template);
|
||||
): Promise<CurrentDataStream[]> => {
|
||||
const dataStreamPromises = templates.map((template) => {
|
||||
return getDataStreams(callCluster, template);
|
||||
});
|
||||
const indexObjects = await Promise.all(indexPromises);
|
||||
return indexObjects.filter(isCurrentIndex).flat();
|
||||
const dataStreamObjects = await Promise.all(dataStreamPromises);
|
||||
return dataStreamObjects.filter(isCurrentDataStream).flat();
|
||||
};
|
||||
|
||||
const getIndices = async (
|
||||
const getDataStreams = async (
|
||||
callCluster: CallESAsCurrentUser,
|
||||
template: TemplateRef
|
||||
): Promise<CurrentIndex[] | undefined> => {
|
||||
): Promise<CurrentDataStream[] | undefined> => {
|
||||
const { templateName, indexTemplate } = template;
|
||||
// Until ES provides a way to update mappings of a data stream
|
||||
// get the last index of the data stream, which is the current write index
|
||||
const res = await callCluster('transport.request', {
|
||||
method: 'GET',
|
||||
path: `/_data_stream/${templateName}-*`,
|
||||
|
@ -382,26 +379,28 @@ const getIndices = async (
|
|||
const dataStreams = res.data_streams;
|
||||
if (!dataStreams.length) return;
|
||||
return dataStreams.map((dataStream: any) => ({
|
||||
indexName: dataStream.indices[dataStream.indices.length - 1].index_name,
|
||||
dataStreamName: dataStream.name,
|
||||
indexTemplate,
|
||||
}));
|
||||
};
|
||||
|
||||
const updateAllIndices = async (
|
||||
indexNameWithTemplates: CurrentIndex[],
|
||||
const updateAllDataStreams = async (
|
||||
indexNameWithTemplates: CurrentDataStream[],
|
||||
callCluster: CallESAsCurrentUser
|
||||
): Promise<void> => {
|
||||
const updateIndexPromises = indexNameWithTemplates.map(({ indexName, indexTemplate }) => {
|
||||
return updateExistingIndex({ indexName, callCluster, indexTemplate });
|
||||
});
|
||||
await Promise.all(updateIndexPromises);
|
||||
const updatedataStreamPromises = indexNameWithTemplates.map(
|
||||
({ dataStreamName, indexTemplate }) => {
|
||||
return updateExistingDataStream({ dataStreamName, callCluster, indexTemplate });
|
||||
}
|
||||
);
|
||||
await Promise.all(updatedataStreamPromises);
|
||||
};
|
||||
const updateExistingIndex = async ({
|
||||
indexName,
|
||||
const updateExistingDataStream = async ({
|
||||
dataStreamName,
|
||||
callCluster,
|
||||
indexTemplate,
|
||||
}: {
|
||||
indexName: string;
|
||||
dataStreamName: string;
|
||||
callCluster: CallESAsCurrentUser;
|
||||
indexTemplate: IndexTemplate;
|
||||
}) => {
|
||||
|
@ -416,53 +415,13 @@ const updateExistingIndex = async ({
|
|||
// try to update the mappings first
|
||||
try {
|
||||
await callCluster('indices.putMapping', {
|
||||
index: indexName,
|
||||
index: dataStreamName,
|
||||
body: mappings,
|
||||
write_index_only: true,
|
||||
});
|
||||
// if update fails, rollover data stream
|
||||
} catch (err) {
|
||||
try {
|
||||
// get the data_stream values to compose datastream name
|
||||
const searchDataStreamFieldsResponse = await callCluster('search', {
|
||||
index: indexTemplate.index_patterns[0],
|
||||
body: {
|
||||
size: 1,
|
||||
_source: ['data_stream.namespace', 'data_stream.type', 'data_stream.dataset'],
|
||||
query: {
|
||||
bool: {
|
||||
filter: [
|
||||
{
|
||||
exists: {
|
||||
field: 'data_stream.type',
|
||||
},
|
||||
},
|
||||
{
|
||||
exists: {
|
||||
field: 'data_stream.dataset',
|
||||
},
|
||||
},
|
||||
{
|
||||
exists: {
|
||||
field: 'data_stream.namespace',
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
if (searchDataStreamFieldsResponse.hits.total.value === 0)
|
||||
throw new Error('data_stream fields are missing from datastream indices');
|
||||
const {
|
||||
dataset,
|
||||
namespace,
|
||||
type,
|
||||
}: {
|
||||
dataset: string;
|
||||
namespace: string;
|
||||
type: DataType;
|
||||
} = searchDataStreamFieldsResponse.hits.hits[0]._source.data_stream;
|
||||
const dataStreamName = `${type}-${dataset}-${namespace}`;
|
||||
const path = `/${dataStreamName}/_rollover`;
|
||||
await callCluster('transport.request', {
|
||||
method: 'POST',
|
||||
|
@ -478,10 +437,10 @@ const updateExistingIndex = async ({
|
|||
if (!settings.index.default_pipeline) return;
|
||||
try {
|
||||
await callCluster('indices.putSettings', {
|
||||
index: indexName,
|
||||
index: dataStreamName,
|
||||
body: { index: { default_pipeline: settings.index.default_pipeline } },
|
||||
});
|
||||
} catch (err) {
|
||||
throw new Error(`could not update index template settings for ${indexName}`);
|
||||
throw new Error(`could not update index template settings for ${dataStreamName}`);
|
||||
}
|
||||
};
|
||||
|
|
|
@ -12,8 +12,6 @@ export default function (providerContext: FtrProviderContext) {
|
|||
const { getService } = providerContext;
|
||||
const supertest = getService('supertest');
|
||||
const es = getService('es');
|
||||
const dockerServers = getService('dockerServers');
|
||||
const server = dockerServers.get('registry');
|
||||
const pkgName = 'datastreams';
|
||||
const pkgVersion = '0.1.0';
|
||||
const pkgUpdateVersion = '0.2.0';
|
||||
|
@ -21,6 +19,7 @@ export default function (providerContext: FtrProviderContext) {
|
|||
const pkgUpdateKey = `${pkgName}-${pkgUpdateVersion}`;
|
||||
const logsTemplateName = `logs-${pkgName}.test_logs`;
|
||||
const metricsTemplateName = `metrics-${pkgName}.test_metrics`;
|
||||
const namespaces = ['default', 'foo', 'bar'];
|
||||
|
||||
const uninstallPackage = async (pkg: string) => {
|
||||
await supertest.delete(`/api/fleet/epm/packages/${pkg}`).set('kbn-xsrf', 'xxxx');
|
||||
|
@ -35,86 +34,105 @@ export default function (providerContext: FtrProviderContext) {
|
|||
|
||||
describe('datastreams', async () => {
|
||||
skipIfNoDockerRegistry(providerContext);
|
||||
|
||||
beforeEach(async () => {
|
||||
await installPackage(pkgKey);
|
||||
await es.transport.request({
|
||||
method: 'POST',
|
||||
path: `/${logsTemplateName}-default/_doc`,
|
||||
body: {
|
||||
'@timestamp': '2015-01-01',
|
||||
logs_test_name: 'test',
|
||||
data_stream: {
|
||||
dataset: `${pkgName}.test_logs`,
|
||||
namespace: 'default',
|
||||
type: 'logs',
|
||||
},
|
||||
},
|
||||
});
|
||||
await es.transport.request({
|
||||
method: 'POST',
|
||||
path: `/${metricsTemplateName}-default/_doc`,
|
||||
body: {
|
||||
'@timestamp': '2015-01-01',
|
||||
logs_test_name: 'test',
|
||||
data_stream: {
|
||||
dataset: `${pkgName}.test_metrics`,
|
||||
namespace: 'default',
|
||||
type: 'metrics',
|
||||
},
|
||||
},
|
||||
});
|
||||
await Promise.all(
|
||||
namespaces.map(async (namespace) => {
|
||||
const createLogsRequest = es.transport.request({
|
||||
method: 'POST',
|
||||
path: `/${logsTemplateName}-${namespace}/_doc`,
|
||||
body: {
|
||||
'@timestamp': '2015-01-01',
|
||||
logs_test_name: 'test',
|
||||
data_stream: {
|
||||
dataset: `${pkgName}.test_logs`,
|
||||
namespace,
|
||||
type: 'logs',
|
||||
},
|
||||
},
|
||||
});
|
||||
const createMetricsRequest = es.transport.request({
|
||||
method: 'POST',
|
||||
path: `/${metricsTemplateName}-${namespace}/_doc`,
|
||||
body: {
|
||||
'@timestamp': '2015-01-01',
|
||||
logs_test_name: 'test',
|
||||
data_stream: {
|
||||
dataset: `${pkgName}.test_metrics`,
|
||||
namespace,
|
||||
type: 'metrics',
|
||||
},
|
||||
},
|
||||
});
|
||||
return Promise.all([createLogsRequest, createMetricsRequest]);
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
if (!server.enabled) return;
|
||||
await es.transport.request({
|
||||
method: 'DELETE',
|
||||
path: `/_data_stream/${logsTemplateName}-default`,
|
||||
});
|
||||
await es.transport.request({
|
||||
method: 'DELETE',
|
||||
path: `/_data_stream/${metricsTemplateName}-default`,
|
||||
});
|
||||
await Promise.all(
|
||||
namespaces.map(async (namespace) => {
|
||||
const deleteLogsRequest = es.transport.request({
|
||||
method: 'DELETE',
|
||||
path: `/_data_stream/${logsTemplateName}-${namespace}`,
|
||||
});
|
||||
const deleteMetricsRequest = es.transport.request({
|
||||
method: 'DELETE',
|
||||
path: `/_data_stream/${metricsTemplateName}-${namespace}`,
|
||||
});
|
||||
return Promise.all([deleteLogsRequest, deleteMetricsRequest]);
|
||||
})
|
||||
);
|
||||
await uninstallPackage(pkgKey);
|
||||
await uninstallPackage(pkgUpdateKey);
|
||||
});
|
||||
|
||||
it('should list the logs and metrics datastream', async function () {
|
||||
const resLogsDatastream = await es.transport.request({
|
||||
method: 'GET',
|
||||
path: `/_data_stream/${logsTemplateName}-default`,
|
||||
namespaces.forEach(async (namespace) => {
|
||||
const resLogsDatastream = await es.transport.request({
|
||||
method: 'GET',
|
||||
path: `/_data_stream/${logsTemplateName}-${namespace}`,
|
||||
});
|
||||
const resMetricsDatastream = await es.transport.request({
|
||||
method: 'GET',
|
||||
path: `/_data_stream/${metricsTemplateName}-${namespace}`,
|
||||
});
|
||||
expect(resLogsDatastream.body.data_streams.length).equal(1);
|
||||
expect(resLogsDatastream.body.data_streams[0].indices.length).equal(1);
|
||||
expect(resMetricsDatastream.body.data_streams.length).equal(1);
|
||||
expect(resMetricsDatastream.body.data_streams[0].indices.length).equal(1);
|
||||
});
|
||||
const resMetricsDatastream = await es.transport.request({
|
||||
method: 'GET',
|
||||
path: `/_data_stream/${metricsTemplateName}-default`,
|
||||
});
|
||||
expect(resLogsDatastream.body.data_streams.length).equal(1);
|
||||
expect(resLogsDatastream.body.data_streams[0].indices.length).equal(1);
|
||||
expect(resMetricsDatastream.body.data_streams.length).equal(1);
|
||||
expect(resMetricsDatastream.body.data_streams[0].indices.length).equal(1);
|
||||
});
|
||||
|
||||
it('after update, it should have rolled over logs datastream because mappings are not compatible and not metrics', async function () {
|
||||
await installPackage(pkgUpdateKey);
|
||||
const resLogsDatastream = await es.transport.request({
|
||||
method: 'GET',
|
||||
path: `/_data_stream/${logsTemplateName}-default`,
|
||||
namespaces.forEach(async (namespace) => {
|
||||
const resLogsDatastream = await es.transport.request({
|
||||
method: 'GET',
|
||||
path: `/_data_stream/${logsTemplateName}-${namespace}`,
|
||||
});
|
||||
const resMetricsDatastream = await es.transport.request({
|
||||
method: 'GET',
|
||||
path: `/_data_stream/${metricsTemplateName}-${namespace}`,
|
||||
});
|
||||
expect(resLogsDatastream.body.data_streams[0].indices.length).equal(2);
|
||||
expect(resMetricsDatastream.body.data_streams[0].indices.length).equal(1);
|
||||
});
|
||||
const resMetricsDatastream = await es.transport.request({
|
||||
method: 'GET',
|
||||
path: `/_data_stream/${metricsTemplateName}-default`,
|
||||
});
|
||||
expect(resLogsDatastream.body.data_streams[0].indices.length).equal(2);
|
||||
expect(resMetricsDatastream.body.data_streams[0].indices.length).equal(1);
|
||||
});
|
||||
|
||||
it('should be able to upgrade a package after a rollover', async function () {
|
||||
await es.transport.request({
|
||||
method: 'POST',
|
||||
path: `/${logsTemplateName}-default/_rollover`,
|
||||
namespaces.forEach(async (namespace) => {
|
||||
await es.transport.request({
|
||||
method: 'POST',
|
||||
path: `/${logsTemplateName}-${namespace}/_rollover`,
|
||||
});
|
||||
const resLogsDatastream = await es.transport.request({
|
||||
method: 'GET',
|
||||
path: `/_data_stream/${logsTemplateName}-${namespace}`,
|
||||
});
|
||||
expect(resLogsDatastream.body.data_streams[0].indices.length).equal(2);
|
||||
});
|
||||
const resLogsDatastream = await es.transport.request({
|
||||
method: 'GET',
|
||||
path: `/_data_stream/${logsTemplateName}-default`,
|
||||
});
|
||||
expect(resLogsDatastream.body.data_streams[0].indices.length).equal(2);
|
||||
await installPackage(pkgUpdateKey);
|
||||
});
|
||||
});
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue