[esArchiver] report progress when save/load takes > 10s (#35818) (#36396)

This commit is contained in:
Spencer 2019-05-09 13:22:29 -07:00 committed by GitHub
parent ee14952e83
commit b557df5145
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 144 additions and 12 deletions

View file

@ -34,6 +34,7 @@ import {
createCreateIndexStream,
createIndexDocRecordsStream,
migrateKibanaIndex,
Progress,
} from '../lib';
// pipe a series of streams into each other so that data and errors
@ -66,12 +67,16 @@ export async function loadAction({ name, skipExisting, client, dataDir, log, kib
{ objectMode: true }
);
const progress = new Progress('load progress');
progress.activate(log);
await createPromiseFromStreams([
recordStream,
createCreateIndexStream({ client, stats, skipExisting, log, kibanaUrl }),
createIndexDocRecordsStream(client, stats),
createIndexDocRecordsStream(client, stats, progress),
]);
progress.deactivate();
const result = stats.toJSON();
for (const [index, { docs }] of Object.entries(result)) {

View file

@ -33,6 +33,7 @@ import {
createGenerateIndexRecordsStream,
createFormatArchiveStreams,
createGenerateDocRecordsStream,
Progress
} from '../lib';
export async function saveAction({ name, indices, client, dataDir, log, raw }) {
@ -43,6 +44,9 @@ export async function saveAction({ name, indices, client, dataDir, log, raw }) {
await fromNode(cb => mkdirp(outputDir, cb));
const progress = new Progress();
progress.activate(log);
await Promise.all([
// export and save the matching indices to mappings.json
createPromiseFromStreams([
@ -55,12 +59,13 @@ export async function saveAction({ name, indices, client, dataDir, log, raw }) {
// export all documents from matching indexes into data.json.gz
createPromiseFromStreams([
createListStream(indices),
createGenerateDocRecordsStream(client, stats),
createGenerateDocRecordsStream(client, stats, progress),
...createFormatArchiveStreams({ gzip: !raw }),
createWriteStream(resolve(outputDir, `data.json${raw ? '' : '.gz'}`))
])
]);
progress.deactivate();
stats.forEachIndex((index, { docs }) => {
log.info('[%s] Archived %d docs from %j', name, docs.archived, index);
});

View file

@ -28,6 +28,7 @@ import {
} from '../../../../legacy/utils';
import { createGenerateDocRecordsStream } from '../generate_doc_records_stream';
import { Progress } from '../../progress';
import {
createStubStats,
createStubClient,
@ -50,10 +51,14 @@ describe('esArchiver: createGenerateDocRecordsStream()', () => {
}
]);
const progress = new Progress();
await createPromiseFromStreams([
createListStream(['logstash-*']),
createGenerateDocRecordsStream(client, stats)
createGenerateDocRecordsStream(client, stats, progress)
]);
expect(progress.getTotal()).to.be(0);
expect(progress.getComplete()).to.be(0);
});
it('uses a 1 minute scroll timeout', async () => {
@ -73,10 +78,14 @@ describe('esArchiver: createGenerateDocRecordsStream()', () => {
}
]);
const progress = new Progress();
await createPromiseFromStreams([
createListStream(['logstash-*']),
createGenerateDocRecordsStream(client, stats)
createGenerateDocRecordsStream(client, stats, progress)
]);
expect(progress.getTotal()).to.be(0);
expect(progress.getComplete()).to.be(0);
});
it('consumes index names and scrolls completely before continuing', async () => {
@ -110,12 +119,13 @@ describe('esArchiver: createGenerateDocRecordsStream()', () => {
}
]);
const progress = new Progress();
const docRecords = await createPromiseFromStreams([
createListStream([
'index1',
'index2',
]),
createGenerateDocRecordsStream(client, stats),
createGenerateDocRecordsStream(client, stats, progress),
createConcatStream([])
]);
@ -140,5 +150,7 @@ describe('esArchiver: createGenerateDocRecordsStream()', () => {
},
]);
sinon.assert.calledTwice(stats.archivedDoc);
expect(progress.getTotal()).to.be(2);
expect(progress.getComplete()).to.be(2);
});
});

View file

@ -25,6 +25,7 @@ import {
createPromiseFromStreams,
} from '../../../../legacy/utils';
import { Progress } from '../../progress';
import { createIndexDocRecordsStream } from '../index_doc_records_stream';
import {
createStubStats,
@ -57,13 +58,16 @@ describe('esArchiver: createIndexDocRecordsStream()', () => {
}
]);
const stats = createStubStats();
const progress = new Progress();
await createPromiseFromStreams([
createListStream(records),
createIndexDocRecordsStream(client, stats),
createIndexDocRecordsStream(client, stats, progress),
]);
client.assertNoPendingResponses();
expect(progress.getComplete()).to.be(1);
expect(progress.getTotal()).to.be(undefined);
});
it('consumes multiple doc records and sends to `_bulk` api together', async () => {
@ -85,13 +89,16 @@ describe('esArchiver: createIndexDocRecordsStream()', () => {
}
]);
const stats = createStubStats();
const progress = new Progress();
await createPromiseFromStreams([
createListStream(records),
createIndexDocRecordsStream(client, stats),
createIndexDocRecordsStream(client, stats, progress),
]);
client.assertNoPendingResponses();
expect(progress.getComplete()).to.be(10);
expect(progress.getTotal()).to.be(undefined);
});
it('waits until request is complete before sending more', async () => {
@ -117,13 +124,16 @@ describe('esArchiver: createIndexDocRecordsStream()', () => {
return { ok: true };
}
]);
const progress = new Progress();
await createPromiseFromStreams([
createListStream(records),
createIndexDocRecordsStream(client, stats)
createIndexDocRecordsStream(client, stats, progress)
]);
client.assertNoPendingResponses();
expect(progress.getComplete()).to.be(10);
expect(progress.getTotal()).to.be(undefined);
});
it('sends a maximum of 300 documents at a time', async () => {
@ -146,13 +156,16 @@ describe('esArchiver: createIndexDocRecordsStream()', () => {
return { ok: true };
},
]);
const progress = new Progress();
await createPromiseFromStreams([
createListStream(records),
createIndexDocRecordsStream(client, stats),
createIndexDocRecordsStream(client, stats, progress),
]);
client.assertNoPendingResponses();
expect(progress.getComplete()).to.be(301);
expect(progress.getTotal()).to.be(undefined);
});
it('emits an error if any request fails', async () => {
@ -162,11 +175,12 @@ describe('esArchiver: createIndexDocRecordsStream()', () => {
async () => ({ ok: true }),
async () => ({ errors: true, forcedError: true })
]);
const progress = new Progress();
try {
await createPromiseFromStreams([
createListStream(records),
createIndexDocRecordsStream(client, stats),
createIndexDocRecordsStream(client, stats, progress),
]);
throw new Error('expected stream to emit error');
} catch (err) {
@ -174,5 +188,7 @@ describe('esArchiver: createIndexDocRecordsStream()', () => {
}
client.assertNoPendingResponses();
expect(progress.getComplete()).to.be(1);
expect(progress.getTotal()).to.be(undefined);
});
});

View file

@ -22,7 +22,7 @@ import { Transform } from 'stream';
const SCROLL_SIZE = 1000;
const SCROLL_TIMEOUT = '1m';
export function createGenerateDocRecordsStream(client, stats) {
export function createGenerateDocRecordsStream(client, stats, progress) {
return new Transform({
writableObjectMode: true,
readableObjectMode: true,
@ -41,6 +41,7 @@ export function createGenerateDocRecordsStream(client, stats) {
rest_total_hits_as_int: true
});
remainingHits = resp.hits.total;
progress.addToTotal(remainingHits);
} else {
resp = await client.scroll({
scrollId: resp._scroll_id,
@ -63,6 +64,8 @@ export function createGenerateDocRecordsStream(client, stats) {
}
});
}
progress.addToComplete(resp.hits.hits.length);
}
callback(null);

View file

@ -19,7 +19,7 @@
import { Writable } from 'stream';
export function createIndexDocRecordsStream(client, stats) {
export function createIndexDocRecordsStream(client, stats, progress) {
async function indexDocs(docs) {
const body = [];
@ -51,6 +51,7 @@ export function createIndexDocRecordsStream(client, stats) {
async write(record, enc, callback) {
try {
await indexDocs([record.value]);
progress.addToComplete(1);
callback(null);
} catch (err) {
callback(err);
@ -60,6 +61,7 @@ export function createIndexDocRecordsStream(client, stats) {
async writev(chunks, callback) {
try {
await indexDocs(chunks.map(({ chunk: record }) => record.value));
progress.addToComplete(chunks.length);
callback(null);
} catch (err) {
callback(err);

View file

@ -48,3 +48,7 @@ export {
export {
readDirectory
} from './directory';
export {
Progress
} from './progress';

View file

@ -0,0 +1,85 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { ToolingLog } from '@kbn/dev-utils';
const SECOND = 1000;
/**
 * Tracks completion of a long-running transfer (esArchiver save/load) and,
 * while active, periodically reports progress through a ToolingLog.
 *
 * `total` and `complete` both start out undefined so callers can distinguish
 * "nothing reported yet" from an actual count of 0.
 */
export class Progress {
  // Expected number of items, accumulated via addToTotal(); undefined until first report.
  private total?: number;
  // Number of items finished so far, accumulated via addToComplete(); undefined until first report.
  private complete?: number;
  // Interval handle while logging is active; undefined otherwise.
  private loggingInterval?: NodeJS.Timeout;

  /** Expected total item count, or undefined if no total has been reported. */
  getTotal() {
    return this.total;
  }

  /** Completed item count, or undefined if no progress has been reported. */
  getComplete() {
    return this.complete;
  }

  /**
   * Percentage complete, rounded to an integer. Returns 0 when either counter
   * is still undefined, or when total is 0 (which would otherwise produce NaN
   * from 0/0 — e.g. after addToTotal(0) for an empty index).
   */
  getPercent() {
    if (this.complete === undefined || this.total === undefined || this.total === 0) {
      return 0;
    }

    return Math.round((this.complete / this.total) * 100);
  }

  /** True while periodic progress logging is running. */
  isActive() {
    return !!this.loggingInterval;
  }

  /**
   * Start logging progress to `log` every 10 seconds. Throws if already active.
   * Nothing is logged until some progress has been reported, so fast operations
   * that finish within the first interval stay silent.
   */
  activate(log: ToolingLog) {
    if (this.loggingInterval) {
      throw new Error('Progress is already active');
    }

    // if the action takes longer than 10 seconds, log info about the transfer every 10 seconds
    this.loggingInterval = setInterval(() => {
      if (this.complete === undefined) {
        // no progress reported yet — stay quiet
        return;
      }

      if (this.total === undefined) {
        // no total known (e.g. load from archive) — report the raw count only
        log.info('progress: %d', this.getComplete());
        return;
      }

      // %% is the printf-style escape for a literal percent sign
      log.info('progress: %d/%d (%d%%)', this.getComplete(), this.getTotal(), this.getPercent());
    }, 10 * SECOND);
  }

  /** Stop periodic logging. Throws if not currently active. */
  deactivate() {
    if (!this.loggingInterval) {
      throw new Error('Progress is not active');
    }

    clearInterval(this.loggingInterval);
    this.loggingInterval = undefined;
  }

  /** Add `n` to the expected total, initializing it on first call. */
  addToTotal(n: number) {
    this.total = this.total === undefined ? n : this.total + n;
  }

  /** Add `n` to the completed count, initializing it on first call. */
  addToComplete(n: number) {
    this.complete = this.complete === undefined ? n : this.complete + n;
  }
}