esArchiver datastream support (#132853) (#135139)

* aliases fallback

* nasty datastream support implementation

* datastreams stats method

* update filter stream

* datastream support for unload action

* create-index datastream support

* index records data stream support

* doc records data streams support

* [CI] Auto-commit changed files from 'node scripts/eslint --no-cache --fix'

* lint

* pull composable templates

* set data_stream as a separate property on documents

* force create bulk operation when datastream record

* [CI] Auto-commit changed files from 'node scripts/eslint --no-cache --fix'

* lint

* getIndexTemplate tests

* [CI] Auto-commit changed files from 'node scripts/precommit_hook.js --ref HEAD~1..HEAD --fix'

* share cache across transform executions

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
(cherry picked from commit 4c4f0f5d79)

Co-authored-by: Kevin Lacabane <kevin.lacabane@elastic.co>
This commit is contained in:
Kibana Machine 2022-06-24 12:29:41 -04:00 committed by GitHub
parent 4a289b5c8d
commit e4e991bf8d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 477 additions and 46 deletions

View file

@ -52,7 +52,7 @@ export async function saveAction({
// export and save the matching indices to mappings.json
createPromiseFromStreams([
createListStream(indices),
createGenerateIndexRecordsStream({ client, stats, keepIndexNames }),
createGenerateIndexRecordsStream({ client, stats, keepIndexNames, log }),
...createFormatArchiveStreams(),
createWriteStream(resolve(outputDir, 'mappings.json')),
] as [Readable, ...Writable[]]),

View file

@ -45,7 +45,7 @@ export async function unloadAction({
await createPromiseFromStreams([
createReadStream(resolve(inputDir, filename)) as Readable,
...createParseArchiveStreams({ gzip: isGzip(filename) }),
createFilterRecordsStream('index'),
createFilterRecordsStream((record) => ['index', 'data_stream'].includes(record.type)),
createDeleteIndexStream(client, stats, log),
] as [Readable, ...Writable[]]);
}

View file

@ -36,16 +36,29 @@ interface SearchResponses {
}>;
}
function createMockClient(responses: SearchResponses) {
function createMockClient(responses: SearchResponses, hasDataStreams = false) {
// TODO: replace with proper mocked client
const client: any = {
helpers: {
scrollSearch: jest.fn(function* ({ index }) {
if (hasDataStreams) {
index = `.ds-${index}`;
}
while (responses[index] && responses[index].length) {
yield responses[index].shift()!;
}
}),
},
indices: {
get: jest.fn(async ({ index }) => {
return { [index]: { data_stream: hasDataStreams && index.substring(4) } };
}),
getDataStream: jest.fn(async ({ name }) => {
if (!hasDataStreams) return { data_streams: [] };
return { data_streams: [{ name }] };
}),
},
};
return client;
}
@ -217,6 +230,35 @@ describe('esArchiver: createGenerateDocRecordsStream()', () => {
`);
});
it('supports data streams', async () => {
const hits = [
{ _index: '.ds-foo-datastream', _id: '0', _source: {} },
{ _index: '.ds-foo-datastream', _id: '1', _source: {} },
];
const responses = {
'.ds-foo-datastream': [{ body: { hits: { hits, total: hits.length } } }],
};
const client = createMockClient(responses, true);
const stats = createStats('test', log);
const progress = new Progress();
const results = await createPromiseFromStreams([
createListStream(['foo-datastream']),
createGenerateDocRecordsStream({
client,
stats,
progress,
}),
createMapStream((record: any) => {
return `${record.value.data_stream}:${record.value.id}`;
}),
createConcatStream([]),
]);
expect(results).toEqual(['foo-datastream:0', 'foo-datastream:1']);
});
describe('keepIndexNames', () => {
it('changes .kibana* index names if keepIndexNames is not enabled', async () => {
const hits = [{ _index: '.kibana_7.16.0_001', _id: '0', _source: {} }];

View file

@ -47,6 +47,10 @@ export function createGenerateDocRecordsStream({
}
);
const hasDatastreams =
(await client.indices.getDataStream({ name: index })).data_streams.length > 0;
const indexToDatastream = new Map();
let remainingHits: number | null = null;
for await (const resp of interator) {
@ -57,7 +61,17 @@ export function createGenerateDocRecordsStream({
for (const hit of resp.body.hits.hits) {
remainingHits -= 1;
stats.archivedDoc(hit._index);
if (hasDatastreams && !indexToDatastream.has(hit._index)) {
const {
[hit._index]: { data_stream: dataStream },
} = await client.indices.get({ index: hit._index, filter_path: ['*.data_stream'] });
indexToDatastream.set(hit._index, dataStream);
}
const dataStream = indexToDatastream.get(hit._index);
stats.archivedDoc(dataStream || hit._index);
this.push({
type: 'doc',
value: {
@ -65,6 +79,7 @@ export function createGenerateDocRecordsStream({
// when it is loaded it can skip migration, if possible
index:
hit._index.startsWith('.kibana') && !keepIndexNames ? '.kibana_1' : hit._index,
data_stream: dataStream,
id: hit._id,
source: hit._source,
},

View file

@ -243,6 +243,55 @@ describe('bulk helper onDocument param', () => {
createIndexDocRecordsStream(client as any, stats, progress, true),
]);
});
it('returns create ops for data stream documents', async () => {
const records = [
{
type: 'doc',
value: {
index: '.ds-foo-ds',
data_stream: 'foo-ds',
id: '0',
source: {
hello: 'world',
},
},
},
{
type: 'doc',
value: {
index: '.ds-foo-ds',
data_stream: 'foo-ds',
id: '1',
source: {
hello: 'world',
},
},
},
];
expect.assertions(records.length);
const client = new MockClient();
client.helpers.bulk.mockImplementation(async ({ datasource, onDocument }) => {
for (const d of datasource) {
const op = onDocument(d);
expect(op).toEqual({
create: {
_index: 'foo-ds',
_id: expect.stringMatching(/^\d$/),
},
});
}
});
const stats = createStats('test', log);
const progress = new Progress();
await createPromiseFromStreams([
createListStream(records),
createIndexDocRecordsStream(client as any, stats, progress),
]);
});
});
describe('bulk helper onDrop param', () => {

View file

@ -13,6 +13,11 @@ import { Stats } from '../stats';
import { Progress } from '../progress';
import { ES_CLIENT_HEADERS } from '../../client_headers';
enum BulkOperation {
Create = 'create',
Index = 'index',
}
export function createIndexDocRecordsStream(
client: Client,
stats: Stats,
@ -20,7 +25,7 @@ export function createIndexDocRecordsStream(
useCreate: boolean = false
) {
async function indexDocs(docs: any[]) {
const operation = useCreate === true ? 'create' : 'index';
const operation = useCreate === true ? BulkOperation.Create : BulkOperation.Index;
const ops = new WeakMap<any, any>();
const errors: string[] = [];
@ -29,9 +34,11 @@ export function createIndexDocRecordsStream(
retries: 5,
datasource: docs.map((doc) => {
const body = doc.source;
const op = doc.data_stream ? BulkOperation.Create : operation;
const index = doc.data_stream || doc.index;
ops.set(body, {
[operation]: {
_index: doc.index,
[op]: {
_index: index,
_id: doc.id,
},
});
@ -56,7 +63,7 @@ export function createIndexDocRecordsStream(
}
for (const doc of docs) {
stats.indexedDoc(doc.index);
stats.indexedDoc(doc.data_stream || doc.index);
}
}

View file

@ -33,3 +33,5 @@ export {
export { readDirectory } from './directory';
export { Progress } from './progress';
export { getIndexTemplate } from './index_template';

View file

@ -0,0 +1,105 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type { Client } from '@elastic/elasticsearch';
import sinon from 'sinon';
import { getIndexTemplate } from './index_template';
describe('esArchiver: index template', () => {
describe('getIndexTemplate', () => {
it('returns the index template', async () => {
const client = {
indices: {
getIndexTemplate: sinon.stub().resolves({
index_templates: [
{
index_template: {
index_patterns: ['pattern-*'],
template: {
mappings: { properties: { foo: { type: 'keyword' } } },
},
priority: 500,
composed_of: [],
data_stream: { hidden: false },
},
},
],
}),
},
} as unknown as Client;
const template = await getIndexTemplate(client, 'template-foo');
expect(template).toEqual({
name: 'template-foo',
index_patterns: ['pattern-*'],
template: {
mappings: { properties: { foo: { type: 'keyword' } } },
},
priority: 500,
data_stream: { hidden: false },
});
});
it('resolves component templates', async () => {
const client = {
indices: {
getIndexTemplate: sinon.stub().resolves({
index_templates: [
{
index_template: {
index_patterns: ['pattern-*'],
composed_of: ['the-settings', 'the-mappings'],
},
},
],
}),
},
cluster: {
getComponentTemplate: sinon
.stub()
.onFirstCall()
.resolves({
component_templates: [
{
component_template: {
template: {
aliases: { 'foo-alias': {} },
},
},
},
],
})
.onSecondCall()
.resolves({
component_templates: [
{
component_template: {
template: {
mappings: { properties: { foo: { type: 'keyword' } } },
},
},
},
],
}),
},
} as unknown as Client;
const template = await getIndexTemplate(client, 'template-foo');
expect(template).toEqual({
name: 'template-foo',
index_patterns: ['pattern-*'],
template: {
aliases: { 'foo-alias': {} },
mappings: { properties: { foo: { type: 'keyword' } } },
},
});
});
});
});

View file

@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { merge } from 'lodash';
import type { Client } from '@elastic/elasticsearch';
import { ES_CLIENT_HEADERS } from '../client_headers';
export const getIndexTemplate = async (client: Client, templateName: string) => {
const { index_templates: indexTemplates } = await client.indices.getIndexTemplate(
{ name: templateName },
{ headers: ES_CLIENT_HEADERS }
);
const {
index_template: { template, composed_of: composedOf = [], ...indexTemplate },
} = indexTemplates[0];
const components = await Promise.all(
composedOf.map(async (component) => {
const { component_templates: componentTemplates } = await client.cluster.getComponentTemplate(
{ name: component }
);
return componentTemplates[0].component_template.template;
})
);
return {
...indexTemplate,
name: templateName,
template: merge(template, ...components),
};
};

View file

@ -19,7 +19,9 @@ export const createStubStats = (): StubStats =>
({
createdIndex: sinon.stub(),
createdAliases: sinon.stub(),
createdDataStream: sinon.stub(),
deletedIndex: sinon.stub(),
deletedDataStream: sinon.stub(),
skippedIndex: sinon.stub(),
archivedIndex: sinon.stub(),
getTestSummary() {
@ -47,6 +49,11 @@ export const createStubIndexRecord = (index: string, aliases = {}) => ({
value: { index, aliases },
});
export const createStubDataStreamRecord = (dataStream: string, template: string) => ({
type: 'data_stream',
value: { data_stream: dataStream, template: { name: template } },
});
export const createStubDocRecord = (index: string, id: number) => ({
type: 'doc',
value: { index, id },
@ -140,5 +147,10 @@ export const createStubClient = (
exists: sinon.spy(async () => {
throw new Error('Do not use indices.exists(). React to errors instead.');
}),
createDataStream: sinon.spy(async ({ name }) => {}),
deleteDataStream: sinon.spy(async ({ name }) => {}),
putIndexTemplate: sinon.spy(async ({ name }) => {}),
deleteIndexTemplate: sinon.spy(async ({ name }) => {}),
},
} as any);

View file

@ -17,6 +17,7 @@ import { createCreateIndexStream } from './create_index_stream';
import {
createStubStats,
createStubIndexRecord,
createStubDataStreamRecord,
createStubDocRecord,
createStubClient,
createStubLogger,
@ -171,6 +172,19 @@ describe('esArchiver: createCreateIndexStream()', () => {
expect(output).toEqual(nonRecordValues);
});
it('creates data streams', async () => {
const client = createStubClient();
const stats = createStubStats();
await createPromiseFromStreams([
createListStream([createStubDataStreamRecord('foo-datastream', 'foo-template')]),
createCreateIndexStream({ client, stats, log }),
]);
sinon.assert.calledOnce(client.indices.putIndexTemplate as sinon.SinonSpy);
sinon.assert.calledOnce(client.indices.createDataStream as sinon.SinonSpy);
});
});
describe('deleteKibanaIndices', () => {

View file

@ -13,15 +13,18 @@ import * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { Client } from '@elastic/elasticsearch';
import { ToolingLog } from '@kbn/tooling-log';
import { IndicesPutIndexTemplateRequest } from '@elastic/elasticsearch/lib/api/types';
import { Stats } from '../stats';
import { deleteKibanaIndices } from './kibana_index';
import { deleteIndex } from './delete_index';
import { deleteDataStream } from './delete_data_stream';
import { ES_CLIENT_HEADERS } from '../../client_headers';
interface DocRecord {
value: estypes.IndicesIndexState & {
index: string;
type: string;
template?: IndicesPutIndexTemplateRequest;
};
}
@ -54,6 +57,43 @@ export function createCreateIndexStream({
stream.push(record);
}
async function handleDataStream(record: DocRecord, attempts = 1) {
if (docsOnly) return;
const { data_stream: dataStream, template } = record.value as {
data_stream: string;
template: IndicesPutIndexTemplateRequest;
};
try {
await client.indices.putIndexTemplate(template, {
headers: ES_CLIENT_HEADERS,
});
await client.indices.createDataStream(
{ name: dataStream },
{
headers: ES_CLIENT_HEADERS,
}
);
stats.createdDataStream(dataStream, template.name, { template });
} catch (err) {
if (err?.meta?.body?.error?.type !== 'resource_already_exists_exception' || attempts >= 3) {
throw err;
}
if (skipExisting) {
skipDocsFromIndices.add(dataStream);
stats.skippedIndex(dataStream);
return;
}
await deleteDataStream(client, dataStream, template.name);
stats.deletedDataStream(dataStream, template.name);
await handleDataStream(record, attempts + 1);
}
}
async function handleIndex(record: DocRecord) {
const { index, settings, mappings, aliases } = record.value;
const isKibanaTaskManager = index.startsWith('.kibana_task_manager');
@ -134,6 +174,10 @@ export function createCreateIndexStream({
await handleIndex(record);
break;
case 'data_stream':
await handleDataStream(record);
break;
case 'doc':
await handleDoc(this, record);
break;

View file

@ -0,0 +1,14 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type { Client } from '@elastic/elasticsearch';
export async function deleteDataStream(client: Client, datastream: string, template: string) {
await client.indices.deleteDataStream({ name: datastream });
await client.indices.deleteIndexTemplate({ name: template });
}

View file

@ -16,6 +16,7 @@ import {
createStubStats,
createStubClient,
createStubIndexRecord,
createStubDataStreamRecord,
createStubLogger,
} from './__mocks__/stubs';
@ -51,4 +52,25 @@ describe('esArchiver: createDeleteIndexStream()', () => {
sinon.assert.calledOnce(client.indices.delete as sinon.SinonSpy);
sinon.assert.notCalled(client.indices.exists as sinon.SinonSpy);
});
it('deletes data streams', async () => {
const stats = createStubStats();
const client = createStubClient([]);
await createPromiseFromStreams([
createListStream([createStubDataStreamRecord('foo-datastream', 'foo-template')]),
createDeleteIndexStream(client, stats, log),
]);
sinon.assert.calledOnce(stats.deletedDataStream as sinon.SinonSpy);
sinon.assert.notCalled(client.indices.create as sinon.SinonSpy);
sinon.assert.calledOnce(client.indices.deleteDataStream as sinon.SinonSpy);
sinon.assert.calledWith(client.indices.deleteDataStream as sinon.SinonSpy, {
name: 'foo-datastream',
});
sinon.assert.calledOnce(client.indices.deleteIndexTemplate as sinon.SinonSpy);
sinon.assert.calledWith(client.indices.deleteIndexTemplate as sinon.SinonSpy, {
name: 'foo-template',
});
});
});

View file

@ -13,6 +13,7 @@ import { ToolingLog } from '@kbn/tooling-log';
import { Stats } from '../stats';
import { deleteIndex } from './delete_index';
import { cleanKibanaIndices } from './kibana_index';
import { deleteDataStream } from './delete_data_stream';
export function createDeleteIndexStream(client: Client, stats: Stats, log: ToolingLog) {
return new Transform({
@ -20,7 +21,11 @@ export function createDeleteIndexStream(client: Client, stats: Stats, log: Tooli
writableObjectMode: true,
async transform(record, enc, callback) {
try {
if (!record || record.type === 'index') {
if (!record) {
log.warning(`deleteIndexStream: empty index provided`);
return callback();
}
if (record.type === 'index') {
const { index } = record.value;
if (index.startsWith('.kibana')) {
@ -28,6 +33,14 @@ export function createDeleteIndexStream(client: Client, stats: Stats, log: Tooli
} else {
await deleteIndex({ client, stats, log, index });
}
} else if (record.type === 'data_stream') {
const {
data_stream: dataStream,
template: { name },
} = record.value;
await deleteDataStream(client, dataStream, name);
stats.deletedDataStream(dataStream, name);
} else {
this.push(record);
}

View file

@ -9,10 +9,12 @@
import sinon from 'sinon';
import { createListStream, createPromiseFromStreams, createConcatStream } from '@kbn/utils';
import { createStubClient, createStubStats } from './__mocks__/stubs';
import { createStubClient, createStubLogger, createStubStats } from './__mocks__/stubs';
import { createGenerateIndexRecordsStream } from './generate_index_records_stream';
const log = createStubLogger();
describe('esArchiver: createGenerateIndexRecordsStream()', () => {
it('consumes index names and queries for the mapping of each', async () => {
const indices = ['index1', 'index2', 'index3', 'index4'];
@ -21,7 +23,7 @@ describe('esArchiver: createGenerateIndexRecordsStream()', () => {
await createPromiseFromStreams([
createListStream(indices),
createGenerateIndexRecordsStream({ client, stats }),
createGenerateIndexRecordsStream({ client, stats, log }),
]);
expect(stats.getTestSummary()).toEqual({
@ -40,7 +42,7 @@ describe('esArchiver: createGenerateIndexRecordsStream()', () => {
await createPromiseFromStreams([
createListStream(['index1']),
createGenerateIndexRecordsStream({ client, stats }),
createGenerateIndexRecordsStream({ client, stats, log }),
]);
const params = (client.indices.get as sinon.SinonSpy).args[0][0];
@ -58,7 +60,7 @@ describe('esArchiver: createGenerateIndexRecordsStream()', () => {
const indexRecords = await createPromiseFromStreams<any[]>([
createListStream(['index1', 'index2', 'index3']),
createGenerateIndexRecordsStream({ client, stats }),
createGenerateIndexRecordsStream({ client, stats, log }),
createConcatStream([]),
]);
@ -83,7 +85,7 @@ describe('esArchiver: createGenerateIndexRecordsStream()', () => {
const indexRecords = await createPromiseFromStreams([
createListStream(['index1']),
createGenerateIndexRecordsStream({ client, stats }),
createGenerateIndexRecordsStream({ client, stats, log }),
createConcatStream([]),
]);
@ -107,7 +109,7 @@ describe('esArchiver: createGenerateIndexRecordsStream()', () => {
const indexRecords = await createPromiseFromStreams([
createListStream(['.kibana_7.16.0_001']),
createGenerateIndexRecordsStream({ client, stats }),
createGenerateIndexRecordsStream({ client, stats, log }),
createConcatStream([]),
]);
@ -122,7 +124,7 @@ describe('esArchiver: createGenerateIndexRecordsStream()', () => {
const indexRecords = await createPromiseFromStreams([
createListStream(['.foo']),
createGenerateIndexRecordsStream({ client, stats }),
createGenerateIndexRecordsStream({ client, stats, log }),
createConcatStream([]),
]);
@ -137,7 +139,7 @@ describe('esArchiver: createGenerateIndexRecordsStream()', () => {
const indexRecords = await createPromiseFromStreams([
createListStream(['.kibana_7.16.0_001']),
createGenerateIndexRecordsStream({ client, stats, keepIndexNames: true }),
createGenerateIndexRecordsStream({ client, stats, log, keepIndexNames: true }),
createConcatStream([]),
]);

View file

@ -8,18 +8,28 @@
import type { Client } from '@elastic/elasticsearch';
import { Transform } from 'stream';
import { ToolingLog } from '@kbn/tooling-log';
import { Stats } from '../stats';
import { ES_CLIENT_HEADERS } from '../../client_headers';
import { getIndexTemplate } from '..';
const headers = {
headers: ES_CLIENT_HEADERS,
};
export function createGenerateIndexRecordsStream({
client,
stats,
keepIndexNames,
log,
}: {
client: Client;
stats: Stats;
keepIndexNames?: boolean;
log: ToolingLog;
}) {
const seenDatastreams = new Set();
return new Transform({
writableObjectMode: true,
readableObjectMode: true,
@ -32,6 +42,7 @@ export function createGenerateIndexRecordsStream({
filter_path: [
'*.settings',
'*.mappings',
'*.data_stream',
// remove settings that aren't really settings
'-*.settings.index.creation_date',
'-*.settings.index.uuid',
@ -44,37 +55,58 @@ export function createGenerateIndexRecordsStream({
],
},
{
headers: ES_CLIENT_HEADERS,
...headers,
meta: true,
}
)
).body;
for (const [index, { settings, mappings }] of Object.entries(resp)) {
const {
body: {
[index]: { aliases },
},
} = await client.indices.getAlias(
{ index },
{
headers: ES_CLIENT_HEADERS,
meta: true,
}
);
for (const [index, { data_stream: dataStream, settings, mappings }] of Object.entries(
resp
)) {
if (dataStream) {
log.info(`${index} will be saved as data_stream ${dataStream}`);
stats.archivedIndex(index, { settings, mappings });
this.push({
type: 'index',
value: {
// if keepIndexNames is false, rewrite the .kibana_* index to .kibana_1 so that
// when it is loaded it can skip migration, if possible
index: index.startsWith('.kibana') && !keepIndexNames ? '.kibana_1' : index,
settings,
mappings,
aliases,
},
});
if (seenDatastreams.has(dataStream)) {
log.info(`${dataStream} is already archived`);
continue;
}
const { data_streams: dataStreams } = await client.indices.getDataStream(
{ name: dataStream },
headers
);
const template = await getIndexTemplate(client, dataStreams[0].template);
seenDatastreams.add(dataStream);
stats.archivedIndex(dataStream, { template });
this.push({
type: 'data_stream',
value: {
data_stream: dataStream,
template,
},
});
} else {
const {
body: {
[index]: { aliases },
},
} = await client.indices.getAlias({ index }, { ...headers, meta: true });
stats.archivedIndex(index, { settings, mappings });
this.push({
type: 'index',
value: {
// if keepIndexNames is false, rewrite the .kibana_* index to .kibana_1 so that
// when it is loaded it can skip migration, if possible
index: index.startsWith('.kibana') && !keepIndexNames ? '.kibana_1' : index,
settings,
mappings,
aliases,
},
});
}
}
callback();

View file

@ -26,7 +26,7 @@ describe('esArchiver: createFilterRecordsStream()', () => {
},
chance.bool(),
]),
createFilterRecordsStream('type'),
createFilterRecordsStream((record) => record.type === 'type'),
createConcatStream([]),
]);
@ -45,7 +45,7 @@ describe('esArchiver: createFilterRecordsStream()', () => {
{ type: chance.word({ length: 10 }), value: {} },
{ type: chance.word({ length: 10 }), value: {} },
]),
createFilterRecordsStream(type1),
createFilterRecordsStream((record) => record.type === type1),
createConcatStream([]),
]);

View file

@ -8,13 +8,13 @@
import { Transform } from 'stream';
export function createFilterRecordsStream(type: string) {
export function createFilterRecordsStream(fn: (record: any) => boolean) {
return new Transform({
writableObjectMode: true,
readableObjectMode: true,
transform(record, enc, callback) {
if (record && record.type === type) {
if (record && fn(record)) {
callback(undefined, record);
} else {
callback();

View file

@ -83,6 +83,15 @@ export function createStats(name: string, log: ToolingLog) {
info('Deleted existing index %j', index);
}
/**
* Record that a data stream was deleted
* @param index
*/
public deletedDataStream(stream: string, template: string) {
getOrCreate(stream).deleted = true;
info('Deleted existing data stream %j with index template %j', stream, template);
}
/**
* Record that an index was created
* @param index
@ -95,6 +104,18 @@ export function createStats(name: string, log: ToolingLog) {
});
}
/**
* Record that a data stream was created
* @param index
*/
public createdDataStream(stream: string, template: string, metadata: Record<string, any> = {}) {
getOrCreate(stream).created = true;
info('Created data stream %j with index template %j', stream, template);
Object.keys(metadata).forEach((key) => {
debug('%j %s %j', stream, key, metadata[key]);
});
}
/**
* Record that an index was written to the archives
* @param index