Migrate es-archiver to typescript (#56008)

* migrate lib/archives and lib/docs

* migrate lib/indices

* migrate the rest of /lib

* migrate /actions

* migrate es_archiver

* migrate cli

* migrate tests

* use proper log stub

* add Record typing

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
Pierre Gayvallet 2020-02-03 12:03:50 +01:00 committed by GitHub
parent fe86a86d14
commit 38dc1cbd3c
46 changed files with 577 additions and 394 deletions


@ -56,7 +56,7 @@ async function getSettingsFromFile(log: ToolingLog, path: string, settingOverrid
return transformDeprecations(settingsWithDefaults, logDeprecation);
}
export async function readConfigFile(log: ToolingLog, path: string, settingOverrides: any) {
export async function readConfigFile(log: ToolingLog, path: string, settingOverrides: any = {}) {
return new Config({
settings: await getSettingsFromFile(log, path, settingOverrides),
primary: true,
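
With settingOverrides now defaulting to {}, callers can drop the third argument. A minimal usage sketch, assuming a ToolingLog instance and the default functional config path used by the CLI further down:

import { ToolingLog } from '@kbn/dev-utils';
import { readConfigFile } from '@kbn/test';

async function example() {
  const log = new ToolingLog({ level: 'info', writeTo: process.stdout });
  // Before this change, an explicit {} had to be passed as the third argument.
  return await readConfigFile(log, 'test/functional/config.js');
}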


@ -22,12 +22,23 @@ import Fs from 'fs';
import { createGunzip, createGzip, Z_BEST_COMPRESSION } from 'zlib';
import { promisify } from 'util';
import globby from 'globby';
import { ToolingLog } from '@kbn/dev-utils';
import { createPromiseFromStreams } from '../../legacy/utils';
const unlinkAsync = promisify(Fs.unlink);
export async function editAction({ prefix, dataDir, log, handler }) {
export async function editAction({
prefix,
dataDir,
log,
handler,
}: {
prefix: string;
dataDir: string;
log: ToolingLog;
handler: () => Promise<any>;
}) {
const archives = (
await globby('**/*.gz', {
cwd: prefix ? resolve(dataDir, prefix) : dataDir,


@ -16,13 +16,25 @@
* specific language governing permissions and limitations
* under the License.
*/
import { Client } from 'elasticsearch';
import { ToolingLog, KbnClient } from '@kbn/dev-utils';
import { migrateKibanaIndex, deleteKibanaIndices, createStats } from '../lib';
export async function emptyKibanaIndexAction({ client, log, kbnClient }) {
export async function emptyKibanaIndexAction({
client,
log,
kbnClient,
}: {
client: Client;
log: ToolingLog;
kbnClient: KbnClient;
}) {
const stats = createStats('emptyKibanaIndex', log);
const kibanaPluginIds = await kbnClient.plugins.getEnabledIds();
await deleteKibanaIndices({ client, stats });
await migrateKibanaIndex({ client, log, stats, kibanaPluginIds });
await deleteKibanaIndices({ client, stats, log });
await migrateKibanaIndex({ client, log, kibanaPluginIds });
return stats;
}


@ -19,6 +19,9 @@
import { resolve } from 'path';
import { createReadStream } from 'fs';
import { Readable } from 'stream';
import { ToolingLog, KbnClient } from '@kbn/dev-utils';
import { Client } from 'elasticsearch';
import { createPromiseFromStreams, concatStreamProviders } from '../../legacy/utils';
@ -38,12 +41,26 @@ import {
// pipe a series of streams into each other so that data and errors
// flow from the first stream to the last. Errors from the last stream
// are not listened for
const pipeline = (...streams) =>
const pipeline = (...streams: Readable[]) =>
streams.reduce((source, dest) =>
source.once('error', error => dest.emit('error', error)).pipe(dest)
source.once('error', error => dest.emit('error', error)).pipe(dest as any)
);
export async function loadAction({ name, skipExisting, client, dataDir, log, kbnClient }) {
export async function loadAction({
name,
skipExisting,
client,
dataDir,
log,
kbnClient,
}: {
name: string;
skipExisting: boolean;
client: Client;
dataDir: string;
log: ToolingLog;
kbnClient: KbnClient;
}) {
const inputDir = resolve(dataDir, name);
const stats = createStats(name, log);
const files = prioritizeMappings(await readDirectory(inputDir));
@ -64,12 +81,12 @@ export async function loadAction({ name, skipExisting, client, dataDir, log, kbn
{ objectMode: true }
);
const progress = new Progress('load progress');
const progress = new Progress();
progress.activate(log);
await createPromiseFromStreams([
recordStream,
createCreateIndexStream({ client, stats, skipExisting, log, kibanaPluginIds }),
createCreateIndexStream({ client, stats, skipExisting, log }),
createIndexDocRecordsStream(client, stats, progress),
]);
@ -77,7 +94,7 @@ export async function loadAction({ name, skipExisting, client, dataDir, log, kbn
const result = stats.toJSON();
for (const [index, { docs }] of Object.entries(result)) {
if (!docs && docs.indexed > 0) {
if (docs && docs.indexed > 0) {
log.info('[%s] Indexed %d docs into %j', name, docs.indexed, index);
}
}
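
The pipeline helper above chains streams with reduce, forwarding each source's first error to its destination so a failure anywhere in the chain surfaces on the last stream (whose own errors, per the comment, are not listened for). A self-contained sketch of that behavior, with hypothetical stream names:

import { PassThrough, Readable } from 'stream';

const pipeline = (...streams: Readable[]) =>
  streams.reduce((source, dest) =>
    source.once('error', error => dest.emit('error', error)).pipe(dest as any)
  );

const first = new PassThrough();
const second = new PassThrough();
const tail = pipeline(first, second); // pipe() returns its destination, so tail === second
tail.on('error', err => console.error('propagated:', err.message));
first.emit('error', new Error('boom')); // forwarded to `second` by the once() handler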


@ -18,13 +18,12 @@
*/
import { resolve, dirname, relative } from 'path';
import { stat, rename, createReadStream, createWriteStream } from 'fs';
import { Readable, Writable } from 'stream';
import { fromNode } from 'bluebird';
import { ToolingLog } from '@kbn/dev-utils';
import { createPromiseFromStreams } from '../../legacy/utils';
import {
prioritizeMappings,
readDirectory,
@ -33,12 +32,20 @@ import {
createFormatArchiveStreams,
} from '../lib';
async function isDirectory(path) {
async function isDirectory(path: string): Promise<boolean> {
const stats = await fromNode(cb => stat(path, cb));
return stats.isDirectory();
}
export async function rebuildAllAction({ dataDir, log, rootDir = dataDir }) {
export async function rebuildAllAction({
dataDir,
log,
rootDir = dataDir,
}: {
dataDir: string;
log: ToolingLog;
rootDir?: string;
}) {
const childNames = prioritizeMappings(await readDirectory(dataDir));
for (const childName of childNames) {
const childPath = resolve(dataDir, childName);
@ -58,11 +65,11 @@ export async function rebuildAllAction({ dataDir, log, rootDir = dataDir }) {
const tempFile = childPath + (gzip ? '.rebuilding.gz' : '.rebuilding');
await createPromiseFromStreams([
createReadStream(childPath),
createReadStream(childPath) as Readable,
...createParseArchiveStreams({ gzip }),
...createFormatArchiveStreams({ gzip }),
createWriteStream(tempFile),
]);
] as [Readable, ...Writable[]]);
await fromNode(cb => rename(tempFile, childPath, cb));
log.info(`${archiveName} Rebuilt ${childName}`);


@ -19,9 +19,11 @@
import { resolve } from 'path';
import { createWriteStream, mkdirSync } from 'fs';
import { Readable, Writable } from 'stream';
import { Client } from 'elasticsearch';
import { ToolingLog } from '@kbn/dev-utils';
import { createListStream, createPromiseFromStreams } from '../../legacy/utils';
import {
createStats,
createGenerateIndexRecordsStream,
@ -30,7 +32,21 @@ import {
Progress,
} from '../lib';
export async function saveAction({ name, indices, client, dataDir, log, raw }) {
export async function saveAction({
name,
indices,
client,
dataDir,
log,
raw,
}: {
name: string;
indices: string | string[];
client: Client;
dataDir: string;
log: ToolingLog;
raw: boolean;
}) {
const outputDir = resolve(dataDir, name);
const stats = createStats(name, log);
@ -48,7 +64,7 @@ export async function saveAction({ name, indices, client, dataDir, log, raw }) {
createGenerateIndexRecordsStream(client, stats),
...createFormatArchiveStreams(),
createWriteStream(resolve(outputDir, 'mappings.json')),
]),
] as [Readable, ...Writable[]]),
// export all documents from matching indexes into data.json.gz
createPromiseFromStreams([
@ -56,7 +72,7 @@ export async function saveAction({ name, indices, client, dataDir, log, raw }) {
createGenerateDocRecordsStream(client, stats, progress),
...createFormatArchiveStreams({ gzip: !raw }),
createWriteStream(resolve(outputDir, `data.json${raw ? '' : '.gz'}`)),
]),
] as [Readable, ...Writable[]]),
]);
progress.deactivate();


@ -19,9 +19,11 @@
import { resolve } from 'path';
import { createReadStream } from 'fs';
import { Readable, Writable } from 'stream';
import { Client } from 'elasticsearch';
import { ToolingLog, KbnClient } from '@kbn/dev-utils';
import { createPromiseFromStreams } from '../../legacy/utils';
import {
isGzip,
createStats,
@ -32,7 +34,19 @@ import {
createDeleteIndexStream,
} from '../lib';
export async function unloadAction({ name, client, dataDir, log, kbnClient }) {
export async function unloadAction({
name,
client,
dataDir,
log,
kbnClient,
}: {
name: string;
client: Client;
dataDir: string;
log: ToolingLog;
kbnClient: KbnClient;
}) {
const inputDir = resolve(dataDir, name);
const stats = createStats(name, log);
const kibanaPluginIds = await kbnClient.plugins.getEnabledIds();
@ -42,11 +56,11 @@ export async function unloadAction({ name, client, dataDir, log, kbnClient }) {
log.info('[%s] Unloading indices from %j', name, filename);
await createPromiseFromStreams([
createReadStream(resolve(inputDir, filename)),
createReadStream(resolve(inputDir, filename)) as Readable,
...createParseArchiveStreams({ gzip: isGzip(filename) }),
createFilterRecordsStream('index'),
createDeleteIndexStream(client, stats, log, kibanaPluginIds),
]);
] as [Readable, ...Writable[]]);
}
return stats.toJSON();


@ -17,7 +17,7 @@
* under the License.
*/
/*************************************************************
/** ***********************************************************
*
* Run `node scripts/es_archiver --help` for usage information
*
@ -27,17 +27,17 @@ import { resolve } from 'path';
import { readFileSync } from 'fs';
import { format as formatUrl } from 'url';
import readline from 'readline';
import { Command } from 'commander';
import * as legacyElasticsearch from 'elasticsearch';
import { EsArchiver } from './es_archiver';
import { ToolingLog } from '@kbn/dev-utils';
import { readConfigFile } from '@kbn/test';
import { EsArchiver } from './es_archiver';
const cmd = new Command('node scripts/es_archiver');
const resolveConfigPath = v => resolve(process.cwd(), v);
const resolveConfigPath = (v: string) => resolve(process.cwd(), v);
const defaultConfigPath = resolveConfigPath('test/functional/config.js');
cmd
@ -56,6 +56,7 @@ cmd
defaultConfigPath
)
.on('--help', () => {
// eslint-disable-next-line no-console
console.log(readFileSync(resolve(__dirname, './cli_help.txt'), 'utf8'));
});
@ -95,10 +96,10 @@ cmd
output: process.stdout,
});
await new Promise(resolve => {
await new Promise(resolveInput => {
rl.question(`Press enter when you're done`, () => {
rl.close();
resolve();
resolveInput();
});
});
})
@ -112,12 +113,12 @@ cmd
cmd.parse(process.argv);
const missingCommand = cmd.args.every(a => !(a instanceof Command));
const missingCommand = cmd.args.every(a => !((a as any) instanceof Command));
if (missingCommand) {
execute();
}
async function execute(fn) {
async function execute(fn?: (esArchiver: EsArchiver, command: Command) => void): Promise<void> {
try {
const log = new ToolingLog({
level: cmd.verbose ? 'debug' : 'info',
@ -134,7 +135,7 @@ async function execute(fn) {
// log and count all validation errors
let errorCount = 0;
const error = msg => {
const error = (msg: string) => {
errorCount++;
log.error(msg);
};
@ -170,11 +171,12 @@ async function execute(fn) {
dataDir: resolve(cmd.dir),
kibanaUrl: cmd.kibanaUrl,
});
await fn(esArchiver, cmd);
await fn!(esArchiver, cmd);
} finally {
await client.close();
}
} catch (err) {
// eslint-disable-next-line no-console
console.log('FATAL ERROR', err.stack);
}
}
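
execute() now takes an optional handler, and the fn! non-null assertion encodes a contract rather than a runtime check: when no subcommand matched, execute() is expected to bail out during validation before the call is reached. A stripped-down sketch of that pattern (names hypothetical, not the actual CLI flow):

async function run(handler?: (value: number) => Promise<void> | void): Promise<void> {
  // ...validation is assumed to return early whenever no handler was registered...
  await handler!(42); // assertion: callers guarantee a handler by this point
}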


@ -1,39 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { ToolingLog } from '@kbn/dev-utils';
import { Client } from 'elasticsearch';
import { createStats } from './lib/stats';
export type JsonStats = ReturnType<ReturnType<typeof createStats>['toJSON']>;
export class EsArchiver {
constructor(options: { client: Client; dataDir: string; log: ToolingLog; kibanaUrl: string });
public save(
name: string,
indices: string | string[],
options?: { raw?: boolean }
): Promise<JsonStats>;
public load(name: string, options?: { skipExisting?: boolean }): Promise<JsonStats>;
public unload(name: string): Promise<JsonStats>;
public rebuildAll(): Promise<void>;
public edit(prefix: string, handler: () => Promise<void>): Promise<void>;
public loadIfNeeded(name: string): Promise<JsonStats>;
public emptyKibanaIndex(): Promise<JsonStats>;
}


@ -17,7 +17,8 @@
* under the License.
*/
import { KbnClient } from '@kbn/dev-utils';
import { Client } from 'elasticsearch';
import { ToolingLog, KbnClient } from '@kbn/dev-utils';
import {
saveAction,
@ -29,7 +30,22 @@ import {
} from './actions';
export class EsArchiver {
constructor({ client, dataDir, log, kibanaUrl }) {
private readonly client: Client;
private readonly dataDir: string;
private readonly log: ToolingLog;
private readonly kbnClient: KbnClient;
constructor({
client,
dataDir,
log,
kibanaUrl,
}: {
client: Client;
dataDir: string;
log: ToolingLog;
kibanaUrl: string;
}) {
this.client = client;
this.dataDir = dataDir;
this.log = log;
@ -46,7 +62,7 @@ export class EsArchiver {
* @property {Boolean} options.raw - should the archive be raw (unzipped) or not
* @return Promise<Stats>
*/
async save(name, indices, { raw = false } = {}) {
async save(name: string, indices: string | string[], { raw = false }: { raw?: boolean } = {}) {
return await saveAction({
name,
indices,
@ -66,9 +82,7 @@ export class EsArchiver {
* be ignored or overwritten
* @return Promise<Stats>
*/
async load(name, options = {}) {
const { skipExisting } = options;
async load(name: string, { skipExisting = false }: { skipExisting?: boolean } = {}) {
return await loadAction({
name,
skipExisting: !!skipExisting,
@ -85,7 +99,7 @@ export class EsArchiver {
* @param {String} name
* @return Promise<Stats>
*/
async unload(name) {
async unload(name: string) {
return await unloadAction({
name,
client: this.client,
@ -103,7 +117,6 @@ export class EsArchiver {
*/
async rebuildAll() {
return await rebuildAllAction({
client: this.client,
dataDir: this.dataDir,
log: this.log,
});
@ -117,7 +130,7 @@ export class EsArchiver {
* @param {() => Promise<any>} handler
* @return Promise<void>
*/
async edit(prefix, handler) {
async edit(prefix: string, handler: () => Promise<void>) {
return await editAction({
prefix,
log: this.log,
@ -132,7 +145,7 @@ export class EsArchiver {
* @param {String} name
* @return Promise<Stats>
*/
async loadIfNeeded(name) {
async loadIfNeeded(name: string) {
return await this.load(name, { skipExisting: true });
}
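
With the constructor and method signatures typed, misuse now fails at compile time. A hedged usage sketch (hosts, ports, and the archive directory are hypothetical; the client comes from the same legacy elasticsearch package the CLI imports):

import * as legacyElasticsearch from 'elasticsearch';
import { ToolingLog } from '@kbn/dev-utils';
import { EsArchiver } from './es_archiver';

async function example() {
  const log = new ToolingLog({ level: 'info', writeTo: process.stdout });
  const client = new legacyElasticsearch.Client({ host: 'http://localhost:9200' });
  const esArchiver = new EsArchiver({
    client,
    dataDir: 'test/functional/es_archives', // hypothetical archive directory
    log,
    kibanaUrl: 'http://localhost:5601',
  });

  await esArchiver.save('my_archive', ['my-index-*'], { raw: false });
  await esArchiver.load('my_archive', { skipExisting: true });
  await esArchiver.unload('my_archive');
}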


@ -1,20 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
export { EsArchiver } from './es_archiver';


@ -17,26 +17,26 @@
* under the License.
*/
import expect from '@kbn/expect';
import { uniq } from 'lodash';
import sinon from 'sinon';
import { createStats } from '../';
import expect from '@kbn/expect';
import { ToolingLog } from '@kbn/dev-utils';
function createBufferedLog() {
const log = new ToolingLog({
import { createStats } from '../';
function createBufferedLog(): ToolingLog & { buffer: string } {
const log: ToolingLog = new ToolingLog({
level: 'debug',
writeTo: {
write: chunk => (log.buffer += chunk),
write: chunk => ((log as any).buffer += chunk),
},
});
log.buffer = '';
return log;
(log as any).buffer = '';
return log as ToolingLog & { buffer: string };
}
function assertDeepClones(a, b) {
const path = [];
function assertDeepClones(a: any, b: any) {
const path: string[] = [];
try {
(function recurse(one, two) {
if (typeof one !== 'object' || typeof two !== 'object') {


@ -17,7 +17,7 @@
* under the License.
*/
import Stream from 'stream';
import Stream, { Readable, Writable } from 'stream';
import { createGunzip } from 'zlib';
import expect from '@kbn/expect';
@ -43,11 +43,11 @@ describe('esArchiver createFormatArchiveStreams', () => {
});
it('streams consume js values and produces buffers', async () => {
const output = await createPromiseFromStreams([
const output = await createPromiseFromStreams<Buffer[]>([
createListStream(INPUTS),
...createFormatArchiveStreams({ gzip: false }),
createConcatStream([]),
]);
] as [Readable, ...Writable[]]);
expect(output.length).to.be.greaterThan(0);
output.forEach(b => expect(b).to.be.a(Buffer));
@ -58,7 +58,7 @@ describe('esArchiver createFormatArchiveStreams', () => {
createListStream(INPUTS),
...createFormatArchiveStreams({ gzip: false }),
createConcatStream(''),
]);
] as [Readable, ...Writable[]]);
expect(json).to.be(INPUT_JSON);
});
@ -73,11 +73,11 @@ describe('esArchiver createFormatArchiveStreams', () => {
});
it('streams consume js values and produces buffers', async () => {
const output = await createPromiseFromStreams([
const output = await createPromiseFromStreams<Buffer[]>([
createListStream([1, 2, { foo: 'bar' }, [1, 2]]),
...createFormatArchiveStreams({ gzip: true }),
createConcatStream([]),
]);
] as [Readable, ...Writable[]]);
expect(output.length).to.be.greaterThan(0);
output.forEach(b => expect(b).to.be.a(Buffer));
@ -89,7 +89,7 @@ describe('esArchiver createFormatArchiveStreams', () => {
...createFormatArchiveStreams({ gzip: true }),
createGunzip(),
createConcatStream(''),
]);
] as [Readable, ...Writable[]]);
expect(output).to.be(INPUT_JSON);
});
});
@ -100,7 +100,7 @@ describe('esArchiver createFormatArchiveStreams', () => {
createListStream(INPUTS),
...createFormatArchiveStreams(),
createConcatStream(''),
]);
] as [Readable, ...Writable[]]);
expect(json).to.be(INPUT_JSON);
});


@ -17,7 +17,7 @@
* under the License.
*/
import Stream, { PassThrough, Transform } from 'stream';
import Stream, { PassThrough, Readable, Writable, Transform } from 'stream';
import { createGzip } from 'zlib';
import expect from '@kbn/expect';
@ -66,13 +66,13 @@ describe('esArchiver createParseArchiveStreams', () => {
]),
...createParseArchiveStreams({ gzip: false }),
createConcatStream([]),
]);
] as [Readable, ...Writable[]]);
expect(output).to.eql([{ a: 1 }, 1]);
});
it('provides each JSON object as soon as it is parsed', async () => {
let onReceived;
let onReceived: (resolved: any) => void;
const receivedPromise = new Promise(resolve => (onReceived = resolve));
const input = new PassThrough();
const check = new Transform({
@ -80,16 +80,16 @@ describe('esArchiver createParseArchiveStreams', () => {
readableObjectMode: true,
transform(chunk, env, callback) {
onReceived(chunk);
callback(null, chunk);
callback(undefined, chunk);
},
});
const finalPromise = createPromiseFromStreams([
input,
input as Readable,
...createParseArchiveStreams(),
check,
createConcatStream([]),
]);
] as [Readable, ...Writable[]]);
input.write(Buffer.from('{"a": 1}\n\n{"a":'));
expect(await receivedPromise).to.eql({ a: 1 });
@ -110,7 +110,7 @@ describe('esArchiver createParseArchiveStreams', () => {
]),
...createParseArchiveStreams({ gzip: false }),
createConcatStream(),
]);
] as [Readable, ...Writable[]]);
throw new Error('should have failed');
} catch (err) {
expect(err.message).to.contain('Unexpected number');
@ -149,7 +149,7 @@ describe('esArchiver createParseArchiveStreams', () => {
createGzip(),
...createParseArchiveStreams({ gzip: true }),
createConcatStream([]),
]);
] as [Readable, ...Writable[]]);
expect(output).to.eql([{ a: 1 }, { a: 2 }]);
});
@ -161,7 +161,7 @@ describe('esArchiver createParseArchiveStreams', () => {
createGzip(),
...createParseArchiveStreams({ gzip: true }),
createConcatStream([]),
]);
] as [Readable, ...Writable[]]);
expect(output).to.eql([]);
});
@ -173,7 +173,7 @@ describe('esArchiver createParseArchiveStreams', () => {
createListStream([Buffer.from('{"a": 1}')]),
...createParseArchiveStreams({ gzip: true }),
createConcatStream(),
]);
] as [Readable, ...Writable[]]);
throw new Error('should have failed');
} catch (err) {
expect(err.message).to.contain('incorrect header check');


@ -19,7 +19,7 @@
import { basename, extname } from 'path';
export function isGzip(path) {
export function isGzip(path: string) {
return extname(path) === '.gz';
}
@ -28,7 +28,7 @@ export function isGzip(path) {
* @param {String} path
* @return {Boolean}
*/
export function isMappingFile(path) {
export function isMappingFile(path: string) {
return basename(path, '.gz') === 'mappings.json';
}
@ -41,7 +41,7 @@ export function isMappingFile(path) {
* @param {Array<String>} filenames
* @return {Array<String>}
*/
export function prioritizeMappings(filenames) {
export function prioritizeMappings(filenames: string[]) {
return filenames.slice().sort((fa, fb) => {
if (isMappingFile(fa) === isMappingFile(fb)) return 0;
return isMappingFile(fb) ? 1 : -1;


@ -19,14 +19,12 @@
import { createGzip, Z_BEST_COMPRESSION } from 'zlib';
import { PassThrough } from 'stream';
import stringify from 'json-stable-stringify';
import { createMapStream, createIntersperseStream } from '../../../legacy/utils';
import { RECORD_SEPARATOR } from './constants';
export function createFormatArchiveStreams({ gzip = false } = {}) {
export function createFormatArchiveStreams({ gzip = false }: { gzip?: boolean } = {}) {
return [
createMapStream(record => stringify(record, { space: ' ' })),
createIntersperseStream(RECORD_SEPARATOR),


@ -18,7 +18,5 @@
*/
export { isGzip, prioritizeMappings } from './filenames';
export { createParseArchiveStreams } from './parse';
export { createFormatArchiveStreams } from './format';


@ -29,7 +29,7 @@ export function createParseArchiveStreams({ gzip = false } = {}) {
gzip ? createGunzip() : new PassThrough(),
createReplaceStream('\r\n', '\n'),
createSplitStream(RECORD_SEPARATOR),
createFilterStream(l => l.match(/[^\s]/)),
createMapStream(json => JSON.parse(json.trim())),
createFilterStream<string>(l => !!l.match(/[^\s]/)),
createMapStream<string>(json => JSON.parse(json.trim())),
];
}
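
The filter and map stages are now parameterized with <string>, matching what the split stream emits. A round-trip sketch in the style of the tests above, assuming the same blank-line record separator:

import { Readable, Writable } from 'stream';
import { createListStream, createConcatStream, createPromiseFromStreams } from '../../../legacy/utils';
import { createParseArchiveStreams } from './parse';

async function example() {
  const records = await createPromiseFromStreams<any[]>([
    createListStream([Buffer.from('{"a": 1}\n\n{"a": 2}')]),
    ...createParseArchiveStreams({ gzip: false }),
    createConcatStream([]),
  ] as [Readable, ...Writable[]]);
  return records; // [{ a: 1 }, { a: 2 }]; whitespace-only chunks are dropped by the filter
}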


@ -18,10 +18,9 @@
*/
import { readdir } from 'fs';
import { fromNode } from 'bluebird';
export async function readDirectory(path) {
const allNames = await fromNode(cb => readdir(path, cb));
export async function readDirectory(path: string) {
const allNames = await fromNode<string[]>(cb => readdir(path, cb));
return allNames.filter(name => !name.startsWith('.'));
}


@ -143,7 +143,7 @@ describe('esArchiver: createGenerateDocRecordsStream()', () => {
},
},
]);
sinon.assert.calledTwice(stats.archivedDoc);
sinon.assert.calledTwice(stats.archivedDoc as any);
expect(progress.getTotal()).to.be(2);
expect(progress.getComplete()).to.be(2);
});


@ -26,12 +26,12 @@ import { Progress } from '../../progress';
import { createIndexDocRecordsStream } from '../index_doc_records_stream';
import { createStubStats, createStubClient, createPersonDocRecords } from './stubs';
const recordsToBulkBody = records => {
const recordsToBulkBody = (records: any[]) => {
return records.reduce((acc, record) => {
const { index, id, source } = record.value;
return [...acc, { index: { _index: index, _id: id } }, source];
}, []);
}, [] as any[]);
};
describe('esArchiver: createIndexDocRecordsStream()', () => {


@ -17,17 +17,22 @@
* under the License.
*/
import { Client } from 'elasticsearch';
import sinon from 'sinon';
import Chance from 'chance';
import { times } from 'lodash';
import { Stats } from '../../stats';
const chance = new Chance();
export const createStubStats = () => ({
indexedDoc: sinon.stub(),
archivedDoc: sinon.stub(),
});
export const createStubStats = (): Stats =>
({
indexedDoc: sinon.stub(),
archivedDoc: sinon.stub(),
} as any);
export const createPersonDocRecords = n =>
export const createPersonDocRecords = (n: number) =>
times(n, () => ({
type: 'doc',
value: {
@ -42,15 +47,21 @@ export const createPersonDocRecords = n =>
},
}));
export const createStubClient = (responses = []) => {
const createStubClientMethod = name =>
type MockClient = Client & {
assertNoPendingResponses: () => void;
};
export const createStubClient = (
responses: Array<(name: string, params: any) => any | Promise<any>> = []
): MockClient => {
const createStubClientMethod = (name: string) =>
sinon.spy(async params => {
if (responses.length === 0) {
throw new Error(`unexpected client.${name} call`);
}
const response = responses.shift();
return await response(name, params);
return await response!(name, params);
});
return {
@ -63,5 +74,5 @@ export const createStubClient = (responses = []) => {
throw new Error(`There are ${responses.length} unsent responses.`);
}
},
};
} as any;
};


@ -18,33 +18,36 @@
*/
import { Transform } from 'stream';
import { Client, SearchParams, SearchResponse } from 'elasticsearch';
import { Stats } from '../stats';
import { Progress } from '../progress';
const SCROLL_SIZE = 1000;
const SCROLL_TIMEOUT = '1m';
export function createGenerateDocRecordsStream(client, stats, progress) {
export function createGenerateDocRecordsStream(client: Client, stats: Stats, progress: Progress) {
return new Transform({
writableObjectMode: true,
readableObjectMode: true,
async transform(index, enc, callback) {
try {
let remainingHits = null;
let resp = null;
let remainingHits = 0;
let resp: SearchResponse<any> | null = null;
while (!resp || remainingHits > 0) {
if (!resp) {
resp = await client.search({
index: index,
index,
scroll: SCROLL_TIMEOUT,
size: SCROLL_SIZE,
_source: true,
rest_total_hits_as_int: true,
});
rest_total_hits_as_int: true, // not declared on SearchParams type
} as SearchParams);
remainingHits = resp.hits.total;
progress.addToTotal(remainingHits);
} else {
resp = await client.scroll({
scrollId: resp._scroll_id,
scrollId: resp._scroll_id!,
scroll: SCROLL_TIMEOUT,
});
}
@ -68,7 +71,7 @@ export function createGenerateDocRecordsStream(client, stats, progress) {
progress.addToComplete(resp.hits.hits.length);
}
callback(null);
callback(undefined);
} catch (err) {
callback(err);
}


@ -17,11 +17,14 @@
* under the License.
*/
import { Client } from 'elasticsearch';
import { Writable } from 'stream';
import { Stats } from '../stats';
import { Progress } from '../progress';
export function createIndexDocRecordsStream(client, stats, progress) {
async function indexDocs(docs) {
const body = [];
export function createIndexDocRecordsStream(client: Client, stats: Stats, progress: Progress) {
async function indexDocs(docs: any[]) {
const body: any[] = [];
docs.forEach(doc => {
stats.indexedDoc(doc.index);


@ -30,7 +30,7 @@ export {
export { createFilterRecordsStream } from './records';
export { createStats } from './stats';
export { createStats, Stats } from './stats';
export {
isGzip,


@ -34,10 +34,13 @@ import {
createStubIndexRecord,
createStubDocRecord,
createStubClient,
createStubLogger,
} from './stubs';
const chance = new Chance();
const log = createStubLogger();
describe('esArchiver: createCreateIndexStream()', () => {
describe('defaults', () => {
it('deletes existing indices, creates all', async () => {
@ -48,15 +51,15 @@ describe('esArchiver: createCreateIndexStream()', () => {
createStubIndexRecord('existing-index'),
createStubIndexRecord('new-index'),
]),
createCreateIndexStream({ client, stats }),
createCreateIndexStream({ client, stats, log }),
]);
expect(stats.getTestSummary()).to.eql({
deletedIndex: 1,
createdIndex: 2,
});
sinon.assert.callCount(client.indices.delete, 1);
sinon.assert.callCount(client.indices.create, 3); // one failed create because of existing
sinon.assert.callCount(client.indices.delete as sinon.SinonSpy, 1);
sinon.assert.callCount(client.indices.create as sinon.SinonSpy, 3); // one failed create because of existing
});
it('deletes existing aliases, creates all', async () => {
@ -67,14 +70,19 @@ describe('esArchiver: createCreateIndexStream()', () => {
createStubIndexRecord('existing-index'),
createStubIndexRecord('new-index'),
]),
createCreateIndexStream({ client, stats, log: { debug: () => {} } }),
createCreateIndexStream({ client, stats, log }),
]);
expect(client.indices.getAlias.calledOnce).to.be.ok();
expect(client.indices.getAlias.args[0][0]).to.eql({ name: 'existing-index', ignore: [404] });
expect(client.indices.delete.calledOnce).to.be.ok();
expect(client.indices.delete.args[0][0]).to.eql({ index: ['actual-index'] });
sinon.assert.callCount(client.indices.create, 3); // one failed create because of existing
expect((client.indices.getAlias as sinon.SinonSpy).calledOnce).to.be.ok();
expect((client.indices.getAlias as sinon.SinonSpy).args[0][0]).to.eql({
name: 'existing-index',
ignore: [404],
});
expect((client.indices.delete as sinon.SinonSpy).calledOnce).to.be.ok();
expect((client.indices.delete as sinon.SinonSpy).args[0][0]).to.eql({
index: ['actual-index'],
});
sinon.assert.callCount(client.indices.create as sinon.SinonSpy, 3); // one failed create because of existing
});
it('passes through "hit" records', async () => {
@ -86,7 +94,7 @@ describe('esArchiver: createCreateIndexStream()', () => {
createStubDocRecord('index', 1),
createStubDocRecord('index', 2),
]),
createCreateIndexStream({ client, stats }),
createCreateIndexStream({ client, stats, log }),
createConcatStream([]),
]);
@ -101,11 +109,11 @@ describe('esArchiver: createCreateIndexStream()', () => {
createStubIndexRecord('index', { foo: {} }),
createStubDocRecord('index', 1),
]),
createCreateIndexStream({ client, stats }),
createCreateIndexStream({ client, stats, log }),
createConcatStream([]),
]);
sinon.assert.calledWith(client.indices.create, {
sinon.assert.calledWith(client.indices.create as sinon.SinonSpy, {
method: 'PUT',
index: 'index',
body: {
@ -126,7 +134,7 @@ describe('esArchiver: createCreateIndexStream()', () => {
const output = await createPromiseFromStreams([
createListStream([createStubIndexRecord('index'), ...randoms]),
createCreateIndexStream({ client, stats }),
createCreateIndexStream({ client, stats, log }),
createConcatStream([]),
]);
@ -140,7 +148,7 @@ describe('esArchiver: createCreateIndexStream()', () => {
const output = await createPromiseFromStreams([
createListStream(nonRecordValues),
createCreateIndexStream({ client, stats }),
createCreateIndexStream({ client, stats, log }),
createConcatStream([]),
]);
@ -161,6 +169,7 @@ describe('esArchiver: createCreateIndexStream()', () => {
createCreateIndexStream({
client,
stats,
log,
skipExisting: true,
}),
]);
@ -169,9 +178,12 @@ describe('esArchiver: createCreateIndexStream()', () => {
skippedIndex: 1,
createdIndex: 1,
});
sinon.assert.callCount(client.indices.delete, 0);
sinon.assert.callCount(client.indices.create, 2); // one failed create because of existing
expect(client.indices.create.args[0][0]).to.have.property('index', 'new-index');
sinon.assert.callCount(client.indices.delete as sinon.SinonSpy, 0);
sinon.assert.callCount(client.indices.create as sinon.SinonSpy, 2); // one failed create because of existing
expect((client.indices.create as sinon.SinonSpy).args[0][0]).to.have.property(
'index',
'new-index'
);
});
it('filters documents for skipped indices', async () => {
@ -190,6 +202,7 @@ describe('esArchiver: createCreateIndexStream()', () => {
createCreateIndexStream({
client,
stats,
log,
skipExisting: true,
}),
createConcatStream([]),
@ -199,8 +212,8 @@ describe('esArchiver: createCreateIndexStream()', () => {
skippedIndex: 1,
createdIndex: 1,
});
sinon.assert.callCount(client.indices.delete, 0);
sinon.assert.callCount(client.indices.create, 2); // one failed create because of existing
sinon.assert.callCount(client.indices.delete as sinon.SinonSpy, 0);
sinon.assert.callCount(client.indices.create as sinon.SinonSpy, 2); // one failed create because of existing
expect(output).to.have.length(2);
expect(output).to.eql([


@ -23,7 +23,14 @@ import { createListStream, createPromiseFromStreams } from '../../../../legacy/u
import { createDeleteIndexStream } from '../delete_index_stream';
import { createStubStats, createStubClient, createStubIndexRecord } from './stubs';
import {
createStubStats,
createStubClient,
createStubIndexRecord,
createStubLogger,
} from './stubs';
const log = createStubLogger();
describe('esArchiver: createDeleteIndexStream()', () => {
it('deletes the index without checking if it exists', async () => {
@ -32,13 +39,13 @@ describe('esArchiver: createDeleteIndexStream()', () => {
await createPromiseFromStreams([
createListStream([createStubIndexRecord('index1')]),
createDeleteIndexStream(client, stats),
createDeleteIndexStream(client, stats, log, []),
]);
sinon.assert.notCalled(stats.deletedIndex);
sinon.assert.notCalled(client.indices.create);
sinon.assert.calledOnce(client.indices.delete);
sinon.assert.notCalled(client.indices.exists);
sinon.assert.notCalled(stats.deletedIndex as sinon.SinonSpy);
sinon.assert.notCalled(client.indices.create as sinon.SinonSpy);
sinon.assert.calledOnce(client.indices.delete as sinon.SinonSpy);
sinon.assert.notCalled(client.indices.exists as sinon.SinonSpy);
});
it('reports the delete when the index existed', async () => {
@ -47,12 +54,12 @@ describe('esArchiver: createDeleteIndexStream()', () => {
await createPromiseFromStreams([
createListStream([createStubIndexRecord('index1')]),
createDeleteIndexStream(client, stats),
createDeleteIndexStream(client, stats, log, []),
]);
sinon.assert.calledOnce(stats.deletedIndex);
sinon.assert.notCalled(client.indices.create);
sinon.assert.calledOnce(client.indices.delete);
sinon.assert.notCalled(client.indices.exists);
sinon.assert.calledOnce(stats.deletedIndex as sinon.SinonSpy);
sinon.assert.notCalled(client.indices.create as sinon.SinonSpy);
sinon.assert.calledOnce(client.indices.delete as sinon.SinonSpy);
sinon.assert.notCalled(client.indices.exists as sinon.SinonSpy);
});
});


@ -45,10 +45,10 @@ describe('esArchiver: createGenerateIndexRecordsStream()', () => {
archivedIndex: 4,
});
sinon.assert.callCount(client.indices.get, 4);
sinon.assert.notCalled(client.indices.create);
sinon.assert.notCalled(client.indices.delete);
sinon.assert.notCalled(client.indices.exists);
sinon.assert.callCount(client.indices.get as sinon.SinonSpy, 4);
sinon.assert.notCalled(client.indices.create as sinon.SinonSpy);
sinon.assert.notCalled(client.indices.delete as sinon.SinonSpy);
sinon.assert.notCalled(client.indices.exists as sinon.SinonSpy);
});
it('filters index metadata from settings', async () => {
@ -60,9 +60,9 @@ describe('esArchiver: createGenerateIndexRecordsStream()', () => {
createGenerateIndexRecordsStream(client, stats),
]);
const params = client.indices.get.args[0][0];
const params = (client.indices.get as sinon.SinonSpy).args[0][0];
expect(params).to.have.property('filterPath');
const filters = params.filterPath;
const filters: string[] = params.filterPath;
expect(filters.some(path => path.includes('index.creation_date'))).to.be(true);
expect(filters.some(path => path.includes('index.uuid'))).to.be(true);
expect(filters.some(path => path.includes('index.version'))).to.be(true);
@ -73,7 +73,7 @@ describe('esArchiver: createGenerateIndexRecordsStream()', () => {
const stats = createStubStats();
const client = createStubClient(['index1', 'index2', 'index3']);
const indexRecords = await createPromiseFromStreams([
const indexRecords = await createPromiseFromStreams<any[]>([
createListStream(['index1', 'index2', 'index3']),
createGenerateIndexRecordsStream(client, stats),
createConcatStream([]),


@ -1,128 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import sinon from 'sinon';
export const createStubStats = () => ({
createdIndex: sinon.stub(),
createdAliases: sinon.stub(),
deletedIndex: sinon.stub(),
skippedIndex: sinon.stub(),
archivedIndex: sinon.stub(),
getTestSummary() {
const summary = {};
Object.keys(this).forEach(key => {
if (this[key].callCount) {
summary[key] = this[key].callCount;
}
});
return summary;
},
});
export const createStubIndexRecord = (index, aliases = {}) => ({
type: 'index',
value: { index, aliases },
});
export const createStubDocRecord = (index, id) => ({
type: 'doc',
value: { index, id },
});
const createEsClientError = errorType => {
const err = new Error(`ES Client Error Stub "${errorType}"`);
err.body = {
error: {
type: errorType,
},
};
return err;
};
const indexAlias = (aliases, index) => Object.keys(aliases).find(k => aliases[k] === index);
export const createStubClient = (existingIndices = [], aliases = {}) => ({
indices: {
get: sinon.spy(async ({ index }) => {
if (!existingIndices.includes(index)) {
throw createEsClientError('index_not_found_exception');
}
return {
[index]: {
mappings: {},
settings: {},
},
};
}),
existsAlias: sinon.spy(({ name }) => {
return Promise.resolve(aliases.hasOwnProperty(name));
}),
getAlias: sinon.spy(async ({ index, name }) => {
if (index && existingIndices.indexOf(index) >= 0) {
const result = indexAlias(aliases, index);
return { [index]: { aliases: result ? { [result]: {} } : {} } };
}
if (name && aliases[name]) {
return { [aliases[name]]: { aliases: { [name]: {} } } };
}
return { status: 404 };
}),
updateAliases: sinon.spy(async ({ body }) => {
body.actions.forEach(({ add: { index, alias } }) => {
if (!existingIndices.includes(index)) {
throw createEsClientError('index_not_found_exception');
}
existingIndices.push({ index, alias });
});
return { ok: true };
}),
create: sinon.spy(async ({ index }) => {
if (existingIndices.includes(index) || aliases.hasOwnProperty(index)) {
throw createEsClientError('resource_already_exists_exception');
} else {
existingIndices.push(index);
return { ok: true };
}
}),
delete: sinon.spy(async ({ index }) => {
const indices = Array.isArray(index) ? index : [index];
if (indices.every(ix => existingIndices.includes(ix))) {
// Delete aliases associated with our indices
indices.forEach(ix => {
const alias = Object.keys(aliases).find(k => aliases[k] === ix);
if (alias) {
delete aliases[alias];
}
});
indices.forEach(ix => existingIndices.splice(existingIndices.indexOf(ix), 1));
return { ok: true };
} else {
throw createEsClientError('index_not_found_exception');
}
}),
exists: sinon.spy(async () => {
throw new Error('Do not use indices.exists(). React to errors instead.');
}),
},
});


@ -0,0 +1,154 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Client } from 'elasticsearch';
import sinon from 'sinon';
import { ToolingLog } from '@kbn/dev-utils';
import { Stats } from '../../stats';
type StubStats = Stats & {
getTestSummary: () => Record<string, number>;
};
export const createStubStats = (): StubStats =>
({
createdIndex: sinon.stub(),
createdAliases: sinon.stub(),
deletedIndex: sinon.stub(),
skippedIndex: sinon.stub(),
archivedIndex: sinon.stub(),
getTestSummary() {
const summary: Record<string, number> = {};
Object.keys(this).forEach(key => {
if (this[key].callCount) {
summary[key] = this[key].callCount;
}
});
return summary;
},
} as any);
export const createStubLogger = (): ToolingLog =>
({
debug: sinon.stub(),
info: sinon.stub(),
success: sinon.stub(),
warning: sinon.stub(),
error: sinon.stub(),
} as any);
export const createStubIndexRecord = (index: string, aliases = {}) => ({
type: 'index',
value: { index, aliases },
});
export const createStubDocRecord = (index: string, id: number) => ({
type: 'doc',
value: { index, id },
});
const createEsClientError = (errorType: string) => {
const err = new Error(`ES Client Error Stub "${errorType}"`);
(err as any).body = {
error: {
type: errorType,
},
};
return err;
};
const indexAlias = (aliases: Record<string, any>, index: string) =>
Object.keys(aliases).find(k => aliases[k] === index);
type StubClient = Client;
export const createStubClient = (
existingIndices: string[] = [],
aliases: Record<string, any> = {}
): StubClient =>
({
indices: {
get: sinon.spy(async ({ index }) => {
if (!existingIndices.includes(index)) {
throw createEsClientError('index_not_found_exception');
}
return {
[index]: {
mappings: {},
settings: {},
},
};
}),
existsAlias: sinon.spy(({ name }) => {
return Promise.resolve(aliases.hasOwnProperty(name));
}),
getAlias: sinon.spy(async ({ index, name }) => {
if (index && existingIndices.indexOf(index) >= 0) {
const result = indexAlias(aliases, index);
return { [index]: { aliases: result ? { [result]: {} } : {} } };
}
if (name && aliases[name]) {
return { [aliases[name]]: { aliases: { [name]: {} } } };
}
return { status: 404 };
}),
updateAliases: sinon.spy(async ({ body }) => {
body.actions.forEach(
({ add: { index, alias } }: { add: { index: string; alias: string } }) => {
if (!existingIndices.includes(index)) {
throw createEsClientError('index_not_found_exception');
}
existingIndices.push({ index, alias } as any);
}
);
return { ok: true };
}),
create: sinon.spy(async ({ index }) => {
if (existingIndices.includes(index) || aliases.hasOwnProperty(index)) {
throw createEsClientError('resource_already_exists_exception');
} else {
existingIndices.push(index);
return { ok: true };
}
}),
delete: sinon.spy(async ({ index }) => {
const indices = Array.isArray(index) ? index : [index];
if (indices.every(ix => existingIndices.includes(ix))) {
// Delete aliases associated with our indices
indices.forEach(ix => {
const alias = Object.keys(aliases).find(k => aliases[k] === ix);
if (alias) {
delete aliases[alias];
}
});
indices.forEach(ix => existingIndices.splice(existingIndices.indexOf(ix), 1));
return { ok: true };
} else {
throw createEsClientError('index_not_found_exception');
}
}),
exists: sinon.spy(async () => {
throw new Error('Do not use indices.exists(). React to errors instead.');
}),
},
} as any);


@ -17,13 +17,36 @@
* under the License.
*/
import { Transform } from 'stream';
import { Transform, Readable } from 'stream';
import { get, once } from 'lodash';
import { Client } from 'elasticsearch';
import { ToolingLog } from '@kbn/dev-utils';
import { Stats } from '../stats';
import { deleteKibanaIndices } from './kibana_index';
import { deleteIndex } from './delete_index';
export function createCreateIndexStream({ client, stats, skipExisting, log }) {
interface DocRecord {
value: {
index: string;
type: string;
settings: Record<string, any>;
mappings: Record<string, any>;
aliases: Record<string, any>;
};
}
export function createCreateIndexStream({
client,
stats,
skipExisting = false,
log,
}: {
client: Client;
stats: Stats;
skipExisting?: boolean;
log: ToolingLog;
}) {
const skipDocsFromIndices = new Set();
// If we're trying to import Kibana index docs, we need to ensure that
@ -31,7 +54,7 @@ export function createCreateIndexStream({ client, stats, skipExisting, log }) {
// migrations. This only needs to be done once per archive load operation.
const deleteKibanaIndicesOnce = once(deleteKibanaIndices);
async function handleDoc(stream, record) {
async function handleDoc(stream: Readable, record: DocRecord) {
if (skipDocsFromIndices.has(record.value.index)) {
return;
}
@ -39,7 +62,7 @@ export function createCreateIndexStream({ client, stats, skipExisting, log }) {
stream.push(record);
}
async function handleIndex(record) {
async function handleIndex(record: DocRecord) {
const { index, settings, mappings, aliases } = record.value;
const isKibana = index.startsWith('.kibana');
@ -102,7 +125,7 @@ export function createCreateIndexStream({ client, stats, skipExisting, log }) {
break;
}
callback(null);
callback();
} catch (err) {
callback(err);
}


@ -18,22 +18,34 @@
*/
import { get } from 'lodash';
import { Client } from 'elasticsearch';
import { ToolingLog } from '@kbn/dev-utils';
import { Stats } from '../stats';
// see https://github.com/elastic/elasticsearch/blob/99f88f15c5febbca2d13b5b5fda27b844153bf1a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java#L313-L319
const PENDING_SNAPSHOT_STATUSES = ['INIT', 'STARTED', 'WAITING'];
export async function deleteIndex(options) {
export async function deleteIndex(options: {
client: Client;
stats: Stats;
index: string;
log: ToolingLog;
retryIfSnapshottingCount?: number;
}): Promise<void> {
const { client, stats, index, log, retryIfSnapshottingCount = 10 } = options;
const getIndicesToDelete = async () => {
const aliasInfo = await client.indices.getAlias({ name: index, ignore: [404] });
return aliasInfo.status === 404 ? index : Object.keys(aliasInfo);
return aliasInfo.status === 404 ? [index] : Object.keys(aliasInfo);
};
try {
const indicesToDelete = await getIndicesToDelete();
await client.indices.delete({ index: indicesToDelete });
stats.deletedIndex(indicesToDelete);
for (let i = 0; i < indicesToDelete.length; i++) {
const indexToDelete = indicesToDelete[i];
stats.deletedIndex(indexToDelete);
}
} catch (error) {
if (retryIfSnapshottingCount > 0 && isDeleteWhileSnapshotInProgressError(error)) {
stats.waitingForInProgressSnapshot(index);
@ -56,7 +68,7 @@ export async function deleteIndex(options) {
* @param {Error} error
* @return {Boolean}
*/
export function isDeleteWhileSnapshotInProgressError(error) {
export function isDeleteWhileSnapshotInProgressError(error: object) {
return get(error, 'body.error.reason', '').startsWith(
'Cannot delete indices that are being snapshotted'
);
@ -65,13 +77,9 @@ export function isDeleteWhileSnapshotInProgressError(error) {
/**
* Wait for the any snapshot in any repository that is
* snapshotting this index to complete.
*
* @param {EsClient} client
* @param {string} index the name of the index to look for
* @return {Promise<undefined>}
*/
export async function waitForSnapshotCompletion(client, index, log) {
const isSnapshotPending = async (repository, snapshot) => {
export async function waitForSnapshotCompletion(client: Client, index: string, log: ToolingLog) {
const isSnapshotPending = async (repository: string, snapshot: string) => {
const {
snapshots: [status],
} = await client.snapshot.status({
@ -83,7 +91,7 @@ export async function waitForSnapshotCompletion(client, index, log) {
return PENDING_SNAPSHOT_STATUSES.includes(status.state);
};
const getInProgressSnapshots = async repository => {
const getInProgressSnapshots = async (repository: string) => {
const { snapshots: inProgressSnapshots } = await client.snapshot.get({
repository,
snapshot: '_current',
@ -91,9 +99,9 @@ export async function waitForSnapshotCompletion(client, index, log) {
return inProgressSnapshots;
};
for (const repository of Object.keys(await client.snapshot.getRepository())) {
for (const repository of Object.keys(await client.snapshot.getRepository({} as any))) {
const allInProgress = await getInProgressSnapshots(repository);
const found = allInProgress.find(s => s.indices.includes(index));
const found = allInProgress.find((s: any) => s.indices.includes(index));
if (!found) {
continue;


@ -18,11 +18,19 @@
*/
import { Transform } from 'stream';
import { Client } from 'elasticsearch';
import { ToolingLog } from '@kbn/dev-utils';
import { Stats } from '../stats';
import { deleteIndex } from './delete_index';
import { cleanKibanaIndices } from './kibana_index';
export function createDeleteIndexStream(client, stats, log, kibanaPluginIds) {
export function createDeleteIndexStream(
client: Client,
stats: Stats,
log: ToolingLog,
kibanaPluginIds: string[]
) {
return new Transform({
readableObjectMode: true,
writableObjectMode: true,


@ -18,14 +18,16 @@
*/
import { Transform } from 'stream';
import { Client } from 'elasticsearch';
import { Stats } from '../stats';
export function createGenerateIndexRecordsStream(client, stats) {
export function createGenerateIndexRecordsStream(client: Client, stats: Stats) {
return new Transform({
writableObjectMode: true,
readableObjectMode: true,
async transform(indexOrAlias, enc, callback) {
try {
const resp = await client.indices.get({
const resp = (await client.indices.get({
index: indexOrAlias,
filterPath: [
'*.settings',
@ -36,7 +38,7 @@ export function createGenerateIndexRecordsStream(client, stats) {
'-*.settings.index.version',
'-*.settings.index.provided_name',
],
});
})) as Record<string, any>;
for (const [index, { settings, mappings }] of Object.entries(resp)) {
const {


@ -17,29 +17,34 @@
* under the License.
*/
import _ from 'lodash';
import { get } from 'lodash';
import fs from 'fs';
import path from 'path';
import Path from 'path';
import { promisify } from 'util';
import { toArray } from 'rxjs/operators';
import { Client, CreateDocumentParams } from 'elasticsearch';
import { ToolingLog } from '@kbn/dev-utils';
import { Stats } from '../stats';
import { deleteIndex } from './delete_index';
import { collectUiExports } from '../../../legacy/ui/ui_exports';
import { KibanaMigrator } from '../../../core/server/saved_objects/migrations';
import { SavedObjectsSchema } from '../../../core/server/saved_objects';
// @ts-ignore
import { collectUiExports } from '../../../legacy/ui/ui_exports';
// @ts-ignore
import { findPluginSpecs } from '../../../legacy/plugin_discovery';
/**
* Load the uiExports for a Kibana instance, only load uiExports from xpack if
* it is enabled in the Kibana server.
*/
const getUiExports = async kibanaPluginIds => {
const getUiExports = async (kibanaPluginIds: string[]) => {
const xpackEnabled = kibanaPluginIds.includes('xpack_main');
const { spec$ } = await findPluginSpecs({
plugins: {
scanDirs: [path.resolve(__dirname, '../../../legacy/core_plugins')],
paths: xpackEnabled ? [path.resolve(__dirname, '../../../../x-pack')] : [],
scanDirs: [Path.resolve(__dirname, '../../../legacy/core_plugins')],
paths: xpackEnabled ? [Path.resolve(__dirname, '../../../../x-pack')] : [],
},
});
@ -50,7 +55,15 @@ const getUiExports = async kibanaPluginIds => {
/**
* Deletes all indices that start with `.kibana`
*/
export async function deleteKibanaIndices({ client, stats, log }) {
export async function deleteKibanaIndices({
client,
stats,
log,
}: {
client: Client;
stats: Stats;
log: ToolingLog;
}) {
const indexNames = await fetchKibanaIndices(client);
if (!indexNames.length) {
return;
@ -76,37 +89,52 @@ export async function deleteKibanaIndices({ client, stats, log }) {
* builds up an object that implements just enough of the kbnMigrations interface
* as is required by migrations.
*/
export async function migrateKibanaIndex({ client, log, kibanaPluginIds }) {
export async function migrateKibanaIndex({
client,
log,
kibanaPluginIds,
}: {
client: Client;
log: ToolingLog;
kibanaPluginIds: string[];
}) {
const uiExports = await getUiExports(kibanaPluginIds);
const kibanaVersion = await loadKibanaVersion();
const config = {
const config: Record<string, string> = {
'xpack.task_manager.index': '.kibana_task_manager',
};
const logger = {
trace: log.verbose.bind(log),
debug: log.debug.bind(log),
info: log.info.bind(log),
warn: log.warning.bind(log),
error: log.error.bind(log),
fatal: log.error.bind(log),
log: (entry: any) => log.info(entry.message),
get: () => logger,
};
const migratorOptions = {
config: { get: path => config[path] },
config: { get: (path: string) => config[path] } as any,
savedObjectsConfig: {
scrollDuration: '5m',
batchSize: 100,
pollInterval: 100,
skip: false,
},
kibanaConfig: {
index: '.kibana',
},
logger: {
trace: log.verbose.bind(log),
debug: log.debug.bind(log),
info: log.info.bind(log),
warn: log.warning.bind(log),
error: log.error.bind(log),
},
version: kibanaVersion,
} as any,
logger,
kibanaVersion,
savedObjectSchemas: new SavedObjectsSchema(uiExports.savedObjectSchemas),
savedObjectMappings: uiExports.savedObjectMappings,
savedObjectMigrations: uiExports.savedObjectMigrations,
savedObjectValidations: uiExports.savedObjectValidations,
callCluster: (path, ...args) => _.get(client, path).call(client, ...args),
callCluster: (path: string, ...args: any[]) =>
(get(client, path) as Function).call(client, ...args),
};
return await new KibanaMigrator(migratorOptions).runMigrations();
@ -114,8 +142,8 @@ export async function migrateKibanaIndex({ client, log, kibanaPluginIds }) {
async function loadKibanaVersion() {
const readFile = promisify(fs.readFile);
const packageJson = await readFile(path.join(__dirname, '../../../../package.json'));
return JSON.parse(packageJson).version;
const packageJson = await readFile(Path.join(__dirname, '../../../../package.json'));
return JSON.parse(packageJson.toString('utf-8')).version;
}
/**
@ -123,16 +151,24 @@ async function loadKibanaVersion() {
* .kibana, .kibana_1, .kibana_323, etc. This finds all indices starting
* with .kibana, then filters out any that aren't actually Kibana's core
* index (e.g. we don't want to remove .kibana_task_manager or the like).
*
* @param {string} index
*/
async function fetchKibanaIndices(client) {
async function fetchKibanaIndices(client: Client) {
const kibanaIndices = await client.cat.indices({ index: '.kibana*', format: 'json' });
const isKibanaIndex = index => /^\.kibana(:?_\d*)?$/.test(index);
return kibanaIndices.map(x => x.index).filter(isKibanaIndex);
const isKibanaIndex = (index: string) => /^\.kibana(:?_\d*)?$/.test(index);
return kibanaIndices.map((x: { index: string }) => x.index).filter(isKibanaIndex);
}
export async function cleanKibanaIndices({ client, stats, log, kibanaPluginIds }) {
export async function cleanKibanaIndices({
client,
stats,
log,
kibanaPluginIds,
}: {
client: Client;
stats: Stats;
log: ToolingLog;
kibanaPluginIds: string[];
}) {
if (!kibanaPluginIds.includes('spaces')) {
return await deleteKibanaIndices({
client,
@ -178,7 +214,7 @@ export async function cleanKibanaIndices({ client, stats, log, kibanaPluginIds }
stats.deletedIndex('.kibana');
}
export async function createDefaultSpace({ index, client }) {
export async function createDefaultSpace({ index, client }: { index: string; client: Client }) {
await client.create({
index,
id: 'space:default',
@ -193,5 +229,5 @@ export async function createDefaultSpace({ index, client }) {
_reserved: true,
},
},
});
} as CreateDocumentParams);
}
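
The callCluster option resolves a dotted API path on the client with lodash get and invokes it with the client as its `this`. An isolated sketch of the same adapter (makeCallCluster is a hypothetical name):

import { get } from 'lodash';
import { Client } from 'elasticsearch';

const makeCallCluster = (client: Client) => (path: string, ...args: any[]) =>
  (get(client, path) as Function).call(client, ...args);

// makeCallCluster(client)('indices.get', { index: '.kibana' })
// behaves like client.indices.get({ index: '.kibana' }).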


@ -51,7 +51,7 @@ describe('esArchiver: createFilterRecordsStream()', () => {
it('produces record values that have a matching type', async () => {
const type1 = chance.word({ length: 5 });
const output = await createPromiseFromStreams([
const output = await createPromiseFromStreams<any[]>([
createListStream([
{ type: type1, value: {} },
{ type: type1, value: {} },


@ -19,14 +19,14 @@
import { Transform } from 'stream';
export function createFilterRecordsStream(type) {
export function createFilterRecordsStream(type: string) {
return new Transform({
writableObjectMode: true,
readableObjectMode: true,
transform(record, enc, callback) {
if (record && record.type === type) {
callback(null, record);
callback(undefined, record);
} else {
callback();
}


@ -37,6 +37,8 @@ export interface IndexStats {
};
}
export type Stats = ReturnType<typeof createStats>;
export function createStats(name: string, log: ToolingLog) {
const info = (msg: string, ...args: any[]) => log.info(`[${name}] ${msg}`, ...args);
const debug = (msg: string, ...args: any[]) => log.debug(`[${name}] ${msg}`, ...args);
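
Deriving Stats from the factory's return value keeps the exported type in lockstep with the implementation, with no parallel interface to maintain. The same pattern in miniature (createCounter is illustrative):

function createCounter() {
  let count = 0;
  return {
    increment: () => ++count,
    value: () => count,
  };
}

// Updates automatically when createCounter's return shape changes:
type Counter = ReturnType<typeof createCounter>;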


@ -18,3 +18,16 @@
*/
export function unset(object: object, rawPath: string): void;
export {
concatStreamProviders,
createConcatStream,
createFilterStream,
createIntersperseStream,
createListStream,
createMapStream,
createPromiseFromStreams,
createReduceStream,
createReplaceStream,
createSplitStream,
} from './streams';


@ -20,17 +20,17 @@
import { Readable, Transform, Writable, TransformOptions } from 'stream';
export function concatStreamProviders(
sourceProviders: Readable[],
sourceProviders: Array<() => Readable>,
options: TransformOptions
): Transform;
export function createIntersperseStream(intersperseChunk: string | Buffer): Transform;
export function createSplitStream<T>(splitChunk: T): Transform;
export function createListStream(items: any[]): Readable;
export function createListStream(items: any | any[]): Readable;
export function createReduceStream<T>(reducer: (value: any, chunk: T, enc: string) => T): Transform;
export function createPromiseFromStreams<T>([first, ...rest]: [Readable, ...Writable[]]): Promise<
T
>;
export function createConcatStream(initial: any): Transform;
export function createConcatStream(initial?: any): Transform;
export function createMapStream<T>(fn: (value: T, i: number) => void): Transform;
export function createReplaceStream(toReplace: string, replacement: string | Buffer): Transform;
export function createFilterStream<T>(fn: (obj: T) => boolean): Transform;
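
This [Readable, ...Writable[]] tuple is what drives the `as [Readable, ...Writable[]]` casts sprinkled through the diffs above: TypeScript infers an array literal as an array, not a tuple, so call sites assert the shape explicitly. A minimal sketch using the helpers declared here:

import { Readable, Writable } from 'stream';
import { createListStream, createConcatStream, createPromiseFromStreams } from './streams';

async function example() {
  const joined = await createPromiseFromStreams<string>([
    createListStream(['a', 'b', 'c']),
    createConcatStream(''), // concatenates string chunks
  ] as [Readable, ...Writable[]]);
  return joined; // 'abc'
}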