Mirror of https://github.com/elastic/kibana.git
parent 63ea35a54c
commit 6ce44342d6
12 changed files with 375 additions and 91 deletions

@@ -40,6 +40,7 @@ export async function loadAction({
   inputDir,
   skipExisting,
   useCreate,
+  docsOnly,
   client,
   log,
   kbnClient,
@@ -47,6 +48,7 @@ export async function loadAction({
   inputDir: string;
   skipExisting: boolean;
   useCreate: boolean;
+  docsOnly?: boolean;
   client: Client;
   log: ToolingLog;
   kbnClient: KbnClient;
@@ -76,7 +78,7 @@ export async function loadAction({
 
   await createPromiseFromStreams([
     recordStream,
-    createCreateIndexStream({ client, stats, skipExisting, log }),
+    createCreateIndexStream({ client, stats, skipExisting, docsOnly, log }),
     createIndexDocRecordsStream(client, stats, progress, useCreate),
   ]);

@@ -27,6 +27,7 @@ export async function saveAction({
   client,
   log,
   raw,
+  keepIndexNames,
   query,
 }: {
   outputDir: string;
@@ -34,6 +35,7 @@ export async function saveAction({
   client: Client;
   log: ToolingLog;
   raw: boolean;
+  keepIndexNames?: boolean;
   query?: Record<string, any>;
 }) {
   const name = relative(REPO_ROOT, outputDir);
@@ -50,7 +52,7 @@ export async function saveAction({
     // export and save the matching indices to mappings.json
     createPromiseFromStreams([
       createListStream(indices),
-      createGenerateIndexRecordsStream(client, stats),
+      createGenerateIndexRecordsStream({ client, stats, keepIndexNames }),
       ...createFormatArchiveStreams(),
       createWriteStream(resolve(outputDir, 'mappings.json')),
     ] as [Readable, ...Writable[]]),
@@ -58,7 +60,7 @@ export async function saveAction({
     // export all documents from matching indexes into data.json.gz
     createPromiseFromStreams([
       createListStream(indices),
-      createGenerateDocRecordsStream({ client, stats, progress, query }),
+      createGenerateDocRecordsStream({ client, stats, progress, keepIndexNames, query }),
       ...createFormatArchiveStreams({ gzip: !raw }),
       createWriteStream(resolve(outputDir, `data.json${raw ? '' : '.gz'}`)),
     ] as [Readable, ...Writable[]]),

@@ -143,11 +143,12 @@ export function runCli() {
       $ node scripts/es_archiver save test/functional/es_archives/my_test_data logstash-*
     `,
     flags: {
-      boolean: ['raw'],
+      boolean: ['raw', 'keep-index-names'],
       string: ['query'],
       help: `
-        --raw                don't gzip the archives
-        --query              query object to limit the documents being archived, needs to be properly escaped JSON
+        --raw                don't gzip the archives
+        --keep-index-names   don't change the names of Kibana indices to .kibana_1
+        --query              query object to limit the documents being archived, needs to be properly escaped JSON
       `,
     },
     async run({ flags, esArchiver, statsMeta }) {
@@ -168,6 +169,11 @@ export function runCli() {
         throw createFlagError('--raw does not take a value');
       }
 
+      const keepIndexNames = flags['keep-index-names'];
+      if (typeof keepIndexNames !== 'boolean') {
+        throw createFlagError('--keep-index-names does not take a value');
+      }
+
       const query = flags.query;
       let parsedQuery;
       if (typeof query === 'string' && query.length > 0) {
@@ -178,7 +184,7 @@ export function runCli() {
         }
       }
 
-      await esArchiver.save(path, indices, { raw, query: parsedQuery });
+      await esArchiver.save(path, indices, { raw, keepIndexNames, query: parsedQuery });
     },
   })
   .command({
@@ -196,9 +202,10 @@ export function runCli() {
       $ node scripts/es_archiver load my_test_data --config ../config.js
     `,
     flags: {
-      boolean: ['use-create'],
+      boolean: ['use-create', 'docs-only'],
       help: `
         --use-create       use create instead of index for loading documents
+        --docs-only        load only documents, not indices
       `,
     },
     async run({ flags, esArchiver, statsMeta }) {
@@ -217,7 +224,12 @@ export function runCli() {
         throw createFlagError('--use-create does not take a value');
       }
 
-      await esArchiver.load(path, { useCreate });
+      const docsOnly = flags['docs-only'];
+      if (typeof docsOnly !== 'boolean') {
+        throw createFlagError('--docs-only does not take a value');
+      }
+
+      await esArchiver.load(path, { useCreate, docsOnly });
     },
   })
   .command({

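For a quick sense of how the new flags compose with the existing ones, two hypothetical invocations in the style of the help text above (the archive path and index pattern are made up):

  $ node scripts/es_archiver save test/functional/es_archives/my_test_data '.kibana*' --keep-index-names
  $ node scripts/es_archiver load test/functional/es_archives/my_test_data --use-create --docs-only
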
@@ -50,16 +50,22 @@ export class EsArchiver {
    * @param {String|Array<String>} indices - the indices to archive
    * @param {Object} options
    * @property {Boolean} options.raw - should the archive be raw (unzipped) or not
+   * @property {Boolean} options.keepIndexNames - should the Kibana index name be kept as-is or renamed
    */
   async save(
     path: string,
     indices: string | string[],
-    { raw = false, query }: { raw?: boolean; query?: Record<string, any> } = {}
+    {
+      raw = false,
+      keepIndexNames = false,
+      query,
+    }: { raw?: boolean; keepIndexNames?: boolean; query?: Record<string, any> } = {}
   ) {
     return await saveAction({
       outputDir: Path.resolve(this.baseDir, path),
       indices,
       raw,
+      keepIndexNames,
       client: this.client,
       log: this.log,
       query,
@@ -74,18 +80,21 @@ export class EsArchiver {
    * @property {Boolean} options.skipExisting - should existing indices
    *           be ignored or overwritten
    * @property {Boolean} options.useCreate - use a create operation instead of index for documents
+   * @property {Boolean} options.docsOnly - load only documents, not indices
    */
   async load(
     path: string,
     {
       skipExisting = false,
       useCreate = false,
-    }: { skipExisting?: boolean; useCreate?: boolean } = {}
+      docsOnly = false,
+    }: { skipExisting?: boolean; useCreate?: boolean; docsOnly?: boolean } = {}
   ) {
     return await loadAction({
       inputDir: this.findArchive(path),
       skipExisting: !!skipExisting,
       useCreate: !!useCreate,
+      docsOnly,
       client: this.client,
       log: this.log,
       kbnClient: this.kbnClient,

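A minimal sketch of the extended programmatic API, assuming an existing EsArchiver instance and an archive named `my_test_data`; the import path, helper name, and index name are illustrative, not part of this commit:

  import type { EsArchiver } from '@kbn/es-archiver';

  // Hypothetical round trip exercising both new options.
  async function roundTrip(esArchiver: EsArchiver) {
    // Save the matching indices without collapsing .kibana_* names to .kibana_1.
    await esArchiver.save('my_test_data', '.kibana_8.0.0_001', { keepIndexNames: true });

    // Re-load only the documents; index creation (and Kibana index deletion) is skipped.
    await esArchiver.load('my_test_data', { useCreate: true, docsOnly: true });
  }
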
@@ -20,48 +20,24 @@ import { createStats } from '../stats';
 
 const log = new ToolingLog();
 
-it('transforms each input index to a stream of docs using scrollSearch helper', async () => {
-  const responses: any = {
-    foo: [
-      {
-        body: {
-          hits: {
-            total: 5,
-            hits: [
-              { _index: 'foo', _type: '_doc', _id: '0', _source: {} },
-              { _index: 'foo', _type: '_doc', _id: '1', _source: {} },
-              { _index: 'foo', _type: '_doc', _id: '2', _source: {} },
-            ],
-          },
-        },
-      },
-      {
-        body: {
-          hits: {
-            total: 5,
-            hits: [
-              { _index: 'foo', _type: '_doc', _id: '3', _source: {} },
-              { _index: 'foo', _type: '_doc', _id: '4', _source: {} },
-            ],
-          },
-        },
-      },
-    ],
-    bar: [
-      {
-        body: {
-          hits: {
-            total: 2,
-            hits: [
-              { _index: 'bar', _type: '_doc', _id: '0', _source: {} },
-              { _index: 'bar', _type: '_doc', _id: '1', _source: {} },
-            ],
-          },
-        },
-      },
-    ],
-  };
+interface SearchResponses {
+  [key: string]: Array<{
+    body: {
+      hits: {
+        total: number;
+        hits: Array<{
+          _index: string;
+          _type: string;
+          _id: string;
+          _source: Record<string, unknown>;
+        }>;
+      };
+    };
+  }>;
+}
+
+function createMockClient(responses: SearchResponses) {
   // TODO: replace with proper mocked client
   const client: any = {
     helpers: {
       scrollSearch: jest.fn(function* ({ index }) {
@@ -71,29 +47,76 @@ it('transforms each input index to a stream of docs using scrollSearch helper',
       }),
     },
   };
+  return client;
+}
 
-const stats = createStats('test', log);
-const progress = new Progress();
-
-const results = await createPromiseFromStreams([
-  createListStream(['bar', 'foo']),
-  createGenerateDocRecordsStream({
-    client,
-    stats,
-    progress,
-  }),
-  createMapStream((record: any) => {
-    expect(record).toHaveProperty('type', 'doc');
-    expect(record.value.source).toEqual({});
-    expect(record.value.type).toBe('_doc');
-    expect(record.value.index).toMatch(/^(foo|bar)$/);
-    expect(record.value.id).toMatch(/^\d+$/);
-    return `${record.value.index}:${record.value.id}`;
-  }),
-  createConcatStream([]),
-]);
-
-expect(client.helpers.scrollSearch).toMatchInlineSnapshot(`
+describe('esArchiver: createGenerateDocRecordsStream()', () => {
+  it('transforms each input index to a stream of docs using scrollSearch helper', async () => {
+    const responses = {
+      foo: [
+        {
+          body: {
+            hits: {
+              total: 5,
+              hits: [
+                { _index: 'foo', _type: '_doc', _id: '0', _source: {} },
+                { _index: 'foo', _type: '_doc', _id: '1', _source: {} },
+                { _index: 'foo', _type: '_doc', _id: '2', _source: {} },
+              ],
+            },
+          },
+        },
+        {
+          body: {
+            hits: {
+              total: 5,
+              hits: [
+                { _index: 'foo', _type: '_doc', _id: '3', _source: {} },
+                { _index: 'foo', _type: '_doc', _id: '4', _source: {} },
+              ],
+            },
+          },
+        },
+      ],
+      bar: [
+        {
+          body: {
+            hits: {
+              total: 2,
+              hits: [
+                { _index: 'bar', _type: '_doc', _id: '0', _source: {} },
+                { _index: 'bar', _type: '_doc', _id: '1', _source: {} },
+              ],
+            },
+          },
+        },
+      ],
+    };
+    const client = createMockClient(responses);
+
+    const stats = createStats('test', log);
+    const progress = new Progress();
+
+    const results = await createPromiseFromStreams([
+      createListStream(['bar', 'foo']),
+      createGenerateDocRecordsStream({
+        client,
+        stats,
+        progress,
+      }),
+      createMapStream((record: any) => {
+        expect(record).toHaveProperty('type', 'doc');
+        expect(record.value.source).toEqual({});
+        expect(record.value.type).toBe('_doc');
+        expect(record.value.index).toMatch(/^(foo|bar)$/);
+        expect(record.value.id).toMatch(/^\d+$/);
+        return `${record.value.index}:${record.value.id}`;
+      }),
+      createConcatStream([]),
+    ]);
+
+    expect(client.helpers.scrollSearch).toMatchInlineSnapshot(`
       [MockFunction] {
         "calls": Array [
           Array [
@@ -139,7 +162,7 @@ it('transforms each input index to a stream of docs using scrollSearch helper',
         ],
       }
     `);
-  expect(results).toMatchInlineSnapshot(`
+    expect(results).toMatchInlineSnapshot(`
       Array [
         "bar:0",
         "bar:1",
@@ -150,14 +173,14 @@ it('transforms each input index to a stream of docs using scrollSearch helper',
         "foo:4",
       ]
     `);
-  expect(progress).toMatchInlineSnapshot(`
+    expect(progress).toMatchInlineSnapshot(`
       Progress {
         "complete": 7,
         "loggingInterval": undefined,
         "total": 7,
       }
     `);
-  expect(stats).toMatchInlineSnapshot(`
+    expect(stats).toMatchInlineSnapshot(`
       Object {
         "bar": Object {
           "archived": false,
@@ -193,4 +216,80 @@ it('transforms each input index to a stream of docs using scrollSearch helper',
         },
       }
     `);
   });
+
+  describe('keepIndexNames', () => {
+    it('changes .kibana* index names if keepIndexNames is not enabled', async () => {
+      const hits = [{ _index: '.kibana_7.16.0_001', _type: '_doc', _id: '0', _source: {} }];
+      const responses = {
+        ['.kibana_7.16.0_001']: [{ body: { hits: { hits, total: hits.length } } }],
+      };
+      const client = createMockClient(responses);
+      const stats = createStats('test', log);
+      const progress = new Progress();
+
+      const results = await createPromiseFromStreams([
+        createListStream(['.kibana_7.16.0_001']),
+        createGenerateDocRecordsStream({
+          client,
+          stats,
+          progress,
+        }),
+        createMapStream((record: { value: { index: string; id: string } }) => {
+          return `${record.value.index}:${record.value.id}`;
+        }),
+        createConcatStream([]),
+      ]);
+      expect(results).toEqual(['.kibana_1:0']);
+    });
+
+    it('does not change non-.kibana* index names if keepIndexNames is not enabled', async () => {
+      const hits = [{ _index: '.foo', _type: '_doc', _id: '0', _source: {} }];
+      const responses = {
+        ['.foo']: [{ body: { hits: { hits, total: hits.length } } }],
+      };
+      const client = createMockClient(responses);
+      const stats = createStats('test', log);
+      const progress = new Progress();
+
+      const results = await createPromiseFromStreams([
+        createListStream(['.foo']),
+        createGenerateDocRecordsStream({
+          client,
+          stats,
+          progress,
+        }),
+        createMapStream((record: { value: { index: string; id: string } }) => {
+          return `${record.value.index}:${record.value.id}`;
+        }),
+        createConcatStream([]),
+      ]);
+      expect(results).toEqual(['.foo:0']);
+    });
+
+    it('does not change .kibana* index names if keepIndexNames is enabled', async () => {
+      const hits = [{ _index: '.kibana_7.16.0_001', _type: '_doc', _id: '0', _source: {} }];
+      const responses = {
+        ['.kibana_7.16.0_001']: [{ body: { hits: { hits, total: hits.length } } }],
+      };
+      const client = createMockClient(responses);
+      const stats = createStats('test', log);
+      const progress = new Progress();
+
+      const results = await createPromiseFromStreams([
+        createListStream(['.kibana_7.16.0_001']),
+        createGenerateDocRecordsStream({
+          client,
+          stats,
+          progress,
+          keepIndexNames: true,
+        }),
+        createMapStream((record: { value: { index: string; id: string } }) => {
+          return `${record.value.index}:${record.value.id}`;
+        }),
+        createConcatStream([]),
+      ]);
+      expect(results).toEqual(['.kibana_7.16.0_001:0']);
+    });
+  });
+});

@@ -19,11 +19,13 @@ export function createGenerateDocRecordsStream({
   client,
   stats,
   progress,
+  keepIndexNames,
   query,
 }: {
   client: Client;
   stats: Stats;
   progress: Progress;
+  keepIndexNames?: boolean;
   query?: Record<string, any>;
 }) {
   return new Transform({
@@ -59,9 +61,10 @@ export function createGenerateDocRecordsStream({
           this.push({
             type: 'doc',
             value: {
-              // always rewrite the .kibana_* index to .kibana_1 so that
+              // if keepIndexNames is false, rewrite the .kibana_* index to .kibana_1 so that
               // when it is loaded it can skip migration, if possible
-              index: hit._index.startsWith('.kibana') ? '.kibana_1' : hit._index,
+              index:
+                hit._index.startsWith('.kibana') && !keepIndexNames ? '.kibana_1' : hit._index,
               type: hit._type,
               id: hit._id,
               source: hit._source,

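The renaming rule above is self-contained enough to state on its own; a sketch of the equivalent logic (the helper name is invented for illustration):

  // Hypothetical distillation of the rewrite applied to each doc record:
  // .kibana_* indices collapse to .kibana_1 unless keepIndexNames is set,
  // so that a freshly loaded archive can skip saved-object migration.
  function archiveIndexName(index: string, keepIndexNames?: boolean): string {
    return index.startsWith('.kibana') && !keepIndexNames ? '.kibana_1' : index;
  }

  // archiveIndexName('.kibana_7.16.0_001')       === '.kibana_1'
  // archiveIndexName('.kibana_7.16.0_001', true) === '.kibana_7.16.0_001'
  // archiveIndexName('logstash-0')               === 'logstash-0'
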
@@ -0,0 +1,15 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+import type { deleteKibanaIndices } from './kibana_index';
+
+export const mockDeleteKibanaIndices = jest.fn() as jest.MockedFunction<typeof deleteKibanaIndices>;
+
+jest.mock('./kibana_index', () => ({
+  deleteKibanaIndices: mockDeleteKibanaIndices,
+}));

@@ -6,6 +6,8 @@
  * Side Public License, v 1.
  */
 
+import { mockDeleteKibanaIndices } from './create_index_stream.test.mock';
+
 import sinon from 'sinon';
 import Chance from 'chance';
 import { createPromiseFromStreams, createConcatStream, createListStream } from '@kbn/utils';
@@ -24,6 +26,10 @@ const chance = new Chance();
 
 const log = createStubLogger();
 
+beforeEach(() => {
+  mockDeleteKibanaIndices.mockClear();
+});
+
 describe('esArchiver: createCreateIndexStream()', () => {
   describe('defaults', () => {
     it('deletes existing indices, creates all', async () => {
@@ -167,6 +173,73 @@ describe('esArchiver: createCreateIndexStream()', () => {
     });
   });
 
+  describe('deleteKibanaIndices', () => {
+    function doTest(...indices: string[]) {
+      return createPromiseFromStreams([
+        createListStream(indices.map((index) => createStubIndexRecord(index))),
+        createCreateIndexStream({ client: createStubClient(), stats: createStubStats(), log }),
+        createConcatStream([]),
+      ]);
+    }
+
+    it('does not delete Kibana indices for indexes that do not start with .kibana', async () => {
+      await doTest('.foo');
+
+      expect(mockDeleteKibanaIndices).not.toHaveBeenCalled();
+    });
+
+    it('deletes Kibana indices at most once for indices that start with .kibana', async () => {
+      // If we are loading the main Kibana index, we should delete all Kibana indices for backwards compatibility reasons.
+      await doTest('.kibana_7.16.0_001', '.kibana_task_manager_7.16.0_001');
+
+      expect(mockDeleteKibanaIndices).toHaveBeenCalledTimes(1);
+      expect(mockDeleteKibanaIndices).toHaveBeenCalledWith(
+        expect.not.objectContaining({ onlyTaskManager: true })
+      );
+    });
+
+    it('deletes Kibana task manager index at most once, using onlyTaskManager: true', async () => {
+      // If we are loading the Kibana task manager index, we should only delete that index, not any other Kibana indices.
+      await doTest('.kibana_task_manager_7.16.0_001', '.kibana_task_manager_7.16.0_002');
+
+      expect(mockDeleteKibanaIndices).toHaveBeenCalledTimes(1);
+      expect(mockDeleteKibanaIndices).toHaveBeenCalledWith(
+        expect.objectContaining({ onlyTaskManager: true })
+      );
+    });
+
+    it('deletes Kibana task manager index AND deletes all Kibana indices', async () => {
+      // Because we are reading from a stream, we can't look ahead to see if we'll eventually wind up deleting all Kibana indices.
+      // So, we first delete only the Kibana task manager indices, then we wind up deleting all Kibana indices.
+      await doTest('.kibana_task_manager_7.16.0_001', '.kibana_7.16.0_001');
+
+      expect(mockDeleteKibanaIndices).toHaveBeenCalledTimes(2);
+      expect(mockDeleteKibanaIndices).toHaveBeenNthCalledWith(
+        1,
+        expect.objectContaining({ onlyTaskManager: true })
+      );
+      expect(mockDeleteKibanaIndices).toHaveBeenNthCalledWith(
+        2,
+        expect.not.objectContaining({ onlyTaskManager: true })
+      );
+    });
+  });
+
+  describe('docsOnly = true', () => {
+    it('passes through "hit" records without attempting to create indices', async () => {
+      const client = createStubClient();
+      const stats = createStubStats();
+      const output = await createPromiseFromStreams([
+        createListStream([createStubIndexRecord('index'), createStubDocRecord('index', 1)]),
+        createCreateIndexStream({ client, stats, log, docsOnly: true }),
+        createConcatStream([]),
+      ]);
+
+      sinon.assert.notCalled(client.indices.create as sinon.SinonSpy);
+      expect(output).toEqual([createStubDocRecord('index', 1)]);
+    });
+  });
+
   describe('skipExisting = true', () => {
     it('ignores preexisting indexes', async () => {
       const client = createStubClient(['existing-index']);

@@ -29,11 +29,13 @@ export function createCreateIndexStream({
   client,
   stats,
   skipExisting = false,
+  docsOnly = false,
   log,
 }: {
   client: Client;
   stats: Stats;
   skipExisting?: boolean;
+  docsOnly?: boolean;
   log: ToolingLog;
 }) {
   const skipDocsFromIndices = new Set();
@@ -42,6 +44,7 @@ export function createCreateIndexStream({
   // previous indices are removed so we're starting w/ a clean slate for
   // migrations. This only needs to be done once per archive load operation.
   let kibanaIndexAlreadyDeleted = false;
+  let kibanaTaskManagerIndexAlreadyDeleted = false;
 
   async function handleDoc(stream: Readable, record: DocRecord) {
     if (skipDocsFromIndices.has(record.value.index)) {
@@ -53,13 +56,21 @@ export function createCreateIndexStream({
 
   async function handleIndex(record: DocRecord) {
     const { index, settings, mappings, aliases } = record.value;
-    const isKibana = index.startsWith('.kibana');
+    const isKibanaTaskManager = index.startsWith('.kibana_task_manager');
+    const isKibana = index.startsWith('.kibana') && !isKibanaTaskManager;
+
+    if (docsOnly) {
+      return;
+    }
 
     async function attemptToCreate(attemptNumber = 1) {
       try {
         if (isKibana && !kibanaIndexAlreadyDeleted) {
-          await deleteKibanaIndices({ client, stats, log });
-          kibanaIndexAlreadyDeleted = true;
+          await deleteKibanaIndices({ client, stats, log }); // delete all .kibana* indices
+          kibanaIndexAlreadyDeleted = kibanaTaskManagerIndexAlreadyDeleted = true;
+        } else if (isKibanaTaskManager && !kibanaTaskManagerIndexAlreadyDeleted) {
+          await deleteKibanaIndices({ client, stats, onlyTaskManager: true, log }); // delete only .kibana_task_manager* indices
+          kibanaTaskManagerIndexAlreadyDeleted = true;
         }
 
         await client.indices.create(

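The two "already deleted" flags form a small state machine: a full .kibana* delete also covers the task manager indices, while a task-manager-only delete does not preclude a later full delete. A standalone sketch of that bookkeeping (names invented; the real logic lives in handleIndex/attemptToCreate above):

  // Hypothetical reduction of the delete-once bookkeeping.
  function makeKibanaIndexCleaner(
    deleteIndices: (opts: { onlyTaskManager: boolean }) => Promise<void>
  ) {
    let kibanaDeleted = false;
    let taskManagerDeleted = false;

    return async (index: string) => {
      const isTaskManager = index.startsWith('.kibana_task_manager');
      const isKibana = index.startsWith('.kibana') && !isTaskManager;

      if (isKibana && !kibanaDeleted) {
        await deleteIndices({ onlyTaskManager: false }); // drops every .kibana* index
        kibanaDeleted = taskManagerDeleted = true;
      } else if (isTaskManager && !taskManagerDeleted) {
        await deleteIndices({ onlyTaskManager: true }); // drops only .kibana_task_manager*
        taskManagerDeleted = true;
      }
    };
  }
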
@@ -21,7 +21,7 @@ describe('esArchiver: createGenerateIndexRecordsStream()', () => {
 
     await createPromiseFromStreams([
       createListStream(indices),
-      createGenerateIndexRecordsStream(client, stats),
+      createGenerateIndexRecordsStream({ client, stats }),
     ]);
 
     expect(stats.getTestSummary()).toEqual({
@@ -40,7 +40,7 @@ describe('esArchiver: createGenerateIndexRecordsStream()', () => {
 
     await createPromiseFromStreams([
       createListStream(['index1']),
-      createGenerateIndexRecordsStream(client, stats),
+      createGenerateIndexRecordsStream({ client, stats }),
     ]);
 
     const params = (client.indices.get as sinon.SinonSpy).args[0][0];
@@ -58,7 +58,7 @@ describe('esArchiver: createGenerateIndexRecordsStream()', () => {
 
     const indexRecords = await createPromiseFromStreams<any[]>([
       createListStream(['index1', 'index2', 'index3']),
-      createGenerateIndexRecordsStream(client, stats),
+      createGenerateIndexRecordsStream({ client, stats }),
       createConcatStream([]),
     ]);
 
@@ -83,7 +83,7 @@ describe('esArchiver: createGenerateIndexRecordsStream()', () => {
 
     const indexRecords = await createPromiseFromStreams([
       createListStream(['index1']),
-      createGenerateIndexRecordsStream(client, stats),
+      createGenerateIndexRecordsStream({ client, stats }),
       createConcatStream([]),
     ]);
 
@@ -99,4 +99,51 @@ describe('esArchiver: createGenerateIndexRecordsStream()', () => {
       },
     ]);
   });
+
+  describe('change index names', () => {
+    it('changes .kibana* index names if keepIndexNames is not enabled', async () => {
+      const stats = createStubStats();
+      const client = createStubClient(['.kibana_7.16.0_001']);
+
+      const indexRecords = await createPromiseFromStreams([
+        createListStream(['.kibana_7.16.0_001']),
+        createGenerateIndexRecordsStream({ client, stats }),
+        createConcatStream([]),
+      ]);
+
+      expect(indexRecords).toEqual([
+        { type: 'index', value: expect.objectContaining({ index: '.kibana_1' }) },
+      ]);
+    });
+
+    it('does not change non-.kibana* index names if keepIndexNames is not enabled', async () => {
+      const stats = createStubStats();
+      const client = createStubClient(['.foo']);
+
+      const indexRecords = await createPromiseFromStreams([
+        createListStream(['.foo']),
+        createGenerateIndexRecordsStream({ client, stats }),
+        createConcatStream([]),
+      ]);
+
+      expect(indexRecords).toEqual([
+        { type: 'index', value: expect.objectContaining({ index: '.foo' }) },
+      ]);
+    });
+
+    it('does not change .kibana* index names if keepIndexNames is enabled', async () => {
+      const stats = createStubStats();
+      const client = createStubClient(['.kibana_7.16.0_001']);
+
+      const indexRecords = await createPromiseFromStreams([
+        createListStream(['.kibana_7.16.0_001']),
+        createGenerateIndexRecordsStream({ client, stats, keepIndexNames: true }),
+        createConcatStream([]),
+      ]);
+
+      expect(indexRecords).toEqual([
+        { type: 'index', value: expect.objectContaining({ index: '.kibana_7.16.0_001' }) },
+      ]);
+    });
+  });
 });

@@ -11,7 +11,15 @@ import { Transform } from 'stream';
 import { Stats } from '../stats';
 import { ES_CLIENT_HEADERS } from '../../client_headers';
 
-export function createGenerateIndexRecordsStream(client: Client, stats: Stats) {
+export function createGenerateIndexRecordsStream({
+  client,
+  stats,
+  keepIndexNames,
+}: {
+  client: Client;
+  stats: Stats;
+  keepIndexNames?: boolean;
+}) {
   return new Transform({
     writableObjectMode: true,
     readableObjectMode: true,
@@ -59,9 +67,9 @@ export function createGenerateIndexRecordsStream(client: Client, stats: Stats) {
         this.push({
           type: 'index',
           value: {
-            // always rewrite the .kibana_* index to .kibana_1 so that
+            // if keepIndexNames is false, rewrite the .kibana_* index to .kibana_1 so that
             // when it is loaded it can skip migration, if possible
-            index: index.startsWith('.kibana') ? '.kibana_1' : index,
+            index: index.startsWith('.kibana') && !keepIndexNames ? '.kibana_1' : index,
             settings,
             mappings,
             aliases,

@@ -16,18 +16,21 @@ import { deleteIndex } from './delete_index';
 import { ES_CLIENT_HEADERS } from '../../client_headers';
 
 /**
- * Deletes all indices that start with `.kibana`
+ * Deletes all indices that start with `.kibana`, or if onlyTaskManager==true, all indices that start with `.kibana_task_manager`
  */
 export async function deleteKibanaIndices({
   client,
   stats,
+  onlyTaskManager = false,
   log,
 }: {
   client: Client;
   stats: Stats;
+  onlyTaskManager?: boolean;
   log: ToolingLog;
 }) {
-  const indexNames = await fetchKibanaIndices(client);
+  const indexPattern = onlyTaskManager ? '.kibana_task_manager*' : '.kibana*';
+  const indexNames = await fetchKibanaIndices(client, indexPattern);
   if (!indexNames.length) {
     return;
   }
@@ -75,9 +78,9 @@ function isKibanaIndex(index?: string): index is string {
   );
 }
 
-async function fetchKibanaIndices(client: Client) {
+async function fetchKibanaIndices(client: Client, indexPattern: string) {
   const resp = await client.cat.indices(
-    { index: '.kibana*', format: 'json' },
+    { index: indexPattern, format: 'json' },
     {
       headers: ES_CLIENT_HEADERS,
     }