[7.x] [esArchiver/edit] add command for temporarily extracting archives (#30472) (#30479)

* [esArchiver/edit] add command for temporarily extracting archives (#30472)

* [esArchiver/edit] add command for temporarily extracting archives

* [esArchiver/edit] fix method description

* [esArchiver/edit] fix return value doc

* [esArchiver] support editing all or sections of archives

* fix import
This commit is contained in:
Spencer 2019-02-12 08:19:17 -08:00 committed by GitHub
parent 00a7ae5326
commit 8ff5ecbf2a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 112 additions and 0 deletions

View file

@ -0,0 +1,74 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { resolve, relative } from 'path';
import Fs from 'fs';
import { createGunzip, createGzip, Z_BEST_COMPRESSION } from 'zlib';
import { promisify } from 'util';
import globby from 'globby';
import { createPromiseFromStreams } from '../../legacy/utils';
const unlinkAsync = promisify(Fs.unlink);
export async function editAction({ prefix, dataDir, log, handler }) {
const archives = (
await globby('**/*.gz', {
cwd: prefix ? resolve(dataDir, prefix) : dataDir,
absolute: true
})
).map(path => ({
path,
rawPath: path.slice(0, -3)
}));
await Promise.all(archives.map(async archive => {
await createPromiseFromStreams([
Fs.createReadStream(archive.path),
createGunzip(),
Fs.createWriteStream(archive.rawPath)
]);
await unlinkAsync(archive.path);
log.info(
`Extracted %s to %s`,
relative(process.cwd(), archive.path),
relative(process.cwd(), archive.rawPath)
);
}));
await handler();
await Promise.all(archives.map(async archive => {
await createPromiseFromStreams([
Fs.createReadStream(archive.rawPath),
createGzip({ level: Z_BEST_COMPRESSION }),
Fs.createWriteStream(archive.path)
]);
await unlinkAsync(archive.rawPath);
log.info(
`Archived %s to %s`,
relative(process.cwd(), archive.rawPath),
relative(process.cwd(), archive.path)
);
}));
}

View file

@ -22,3 +22,4 @@ export { loadAction } from './load';
export { unloadAction } from './unload';
export { rebuildAllAction } from './rebuild_all';
export { emptyKibanaIndexAction } from './empty_kibana_index';
export { editAction } from './edit';

View file

@ -26,6 +26,7 @@
import { resolve } from 'path';
import { readFileSync } from 'fs';
import { format as formatUrl } from 'url';
import readline from 'readline';
import { Command } from 'commander';
import elasticsearch from 'elasticsearch';
@ -68,6 +69,24 @@ cmd.command('empty-kibana-index')
.description('[internal] Delete any Kibana indices, and initialize the Kibana index as Kibana would do on startup.')
.action(() => execute(archiver => archiver.emptyKibanaIndex()));
cmd.command('edit [prefix]')
.description('extract the archives under the prefix, wait for edits to be completed, and then recompress the archives')
.action(prefix => (
execute(archiver => archiver.edit(prefix, async () => {
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout
});
await new Promise(resolve => {
rl.question(`Press enter when you're done`, () => {
rl.close();
resolve();
});
});
}))
));
cmd.command('rebuild-all')
.description('[internal] read and write all archives in --dir to remove any inconsistencies')
.action(() => execute(archiver => archiver.rebuildAll()));

View file

@ -23,6 +23,7 @@ import {
unloadAction,
rebuildAllAction,
emptyKibanaIndexAction,
editAction,
} from './actions';
export class EsArchiver {
@ -106,6 +107,23 @@ export class EsArchiver {
});
}
/**
* Extract the gzipped files in an archive, then call the handler. When it
* resolves re-archive the gzipped files.
*
* @param {String} prefix optional prefix to limit archives that are extracted
* @param {() => Promise<any>} handler
* @return Promise<void>
*/
async edit(prefix, handler) {
return await editAction({
prefix,
log: this.log,
dataDir: this.dataDir,
handler
});
}
/**
* Just like load, but skips any existing index
*