mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 17:59:23 -04:00
* [esArchiver/deleteIndex] wait and retry if snapshot in progress * [esArchiver/deleteIndex] use recursion for retry * [esArchiver/waitForSnapshot] invert status check * [esArchiver] share delete-with-retry with create stream * [esArchiver/stats] include index name in message * [esArchiver/indexDelete] wait for snapshot completion up to three times * [esArchiver] log status of snapshot during checks
This commit is contained in:
parent
5603706023
commit
7d153525e6
6 changed files with 107 additions and 20 deletions
|
@ -48,7 +48,7 @@ export async function loadAction({ name, skipExisting, client, dataDir, log }) {
|
|||
|
||||
await createPromiseFromStreams([
|
||||
recordStream,
|
||||
createCreateIndexStream({ client, stats, skipExisting }),
|
||||
createCreateIndexStream({ client, stats, skipExisting, log }),
|
||||
createIndexDocRecordsStream(client, stats),
|
||||
]);
|
||||
|
||||
|
|
|
@ -27,7 +27,7 @@ export async function unloadAction({ name, client, dataDir, log }) {
|
|||
createReadStream(resolve(inputDir, filename)),
|
||||
...createParseArchiveStreams({ gzip: isGzip(filename) }),
|
||||
createFilterRecordsStream('index'),
|
||||
createDeleteIndexStream(client, stats)
|
||||
createDeleteIndexStream(client, stats, log)
|
||||
]);
|
||||
}
|
||||
|
||||
|
|
|
@ -2,7 +2,9 @@ import { Transform } from 'stream';
|
|||
|
||||
import { get } from 'lodash';
|
||||
|
||||
export function createCreateIndexStream({ client, stats, skipExisting }) {
|
||||
import { deleteIndex } from './delete_index';
|
||||
|
||||
export function createCreateIndexStream({ client, stats, skipExisting, log }) {
|
||||
const skipDocsFromIndices = new Set();
|
||||
|
||||
async function handleDoc(stream, record) {
|
||||
|
@ -35,8 +37,7 @@ export function createCreateIndexStream({ client, stats, skipExisting }) {
|
|||
return;
|
||||
}
|
||||
|
||||
await client.indices.delete({ index });
|
||||
stats.deletedIndex(index);
|
||||
await deleteIndex({ client, stats, index, log });
|
||||
await attemptToCreate(attemptNumber + 1);
|
||||
return;
|
||||
}
|
||||
|
|
92
src/es_archiver/lib/indices/delete_index.js
Normal file
92
src/es_archiver/lib/indices/delete_index.js
Normal file
|
@ -0,0 +1,92 @@
|
|||
import { get } from 'lodash';
|
||||
|
||||
// see https://github.com/elastic/elasticsearch/blob/99f88f15c5febbca2d13b5b5fda27b844153bf1a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java#L313-L319
|
||||
const PENDING_SNAPSHOT_STATUSES = [
|
||||
'INIT',
|
||||
'STARTED',
|
||||
'WAITING',
|
||||
];
|
||||
|
||||
export async function deleteIndex(options) {
|
||||
const {
|
||||
client,
|
||||
stats,
|
||||
index,
|
||||
log,
|
||||
retryIfSnapshottingCount = 3
|
||||
} = options;
|
||||
|
||||
try {
|
||||
await client.indices.delete({ index });
|
||||
stats.deletedIndex(index);
|
||||
} catch (error) {
|
||||
|
||||
if (retryIfSnapshottingCount > 0 && isDeleteWhileSnapshotInProgressError(error)) {
|
||||
stats.waitingForInProgressSnapshot(index);
|
||||
await waitForSnapshotCompletion(client, index, log);
|
||||
return await deleteIndex({
|
||||
...options,
|
||||
retryIfSnapshottingCount: retryIfSnapshottingCount - 1
|
||||
});
|
||||
}
|
||||
|
||||
if (get(error, 'body.error.type') !== 'index_not_found_exception') {
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine if an error is complaining about a delete while
|
||||
* a snapshot is in progress
|
||||
* @param {Error} error
|
||||
* @return {Boolean}
|
||||
*/
|
||||
export function isDeleteWhileSnapshotInProgressError(error) {
|
||||
return get(error, 'body.error.reason', '')
|
||||
.startsWith('Cannot delete indices that are being snapshotted');
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for the any snapshot in any respository that is
|
||||
* snapshotting this index to complete.
|
||||
*
|
||||
* @param {EsClient} client
|
||||
* @param {string} index the name of the index to look for
|
||||
* @return {Promise<undefined>}
|
||||
*/
|
||||
export async function waitForSnapshotCompletion(client, index, log) {
|
||||
const isSnapshotPending = async (repository, snapshot) => {
|
||||
const { snapshots: [status] } = await client.snapshot.status({
|
||||
repository,
|
||||
snapshot,
|
||||
});
|
||||
|
||||
log.debug(`Snapshot ${repository}/${snapshot} is ${status.state}`);
|
||||
return PENDING_SNAPSHOT_STATUSES.includes(status.state);
|
||||
};
|
||||
|
||||
const getInProgressSnapshots = async (repository) => {
|
||||
const { snapshots: inProgressSnapshots } = await client.snapshot.get({
|
||||
repository,
|
||||
snapshot: '_current'
|
||||
});
|
||||
return inProgressSnapshots;
|
||||
};
|
||||
|
||||
for (const repository of Object.keys(await client.snapshot.getRepository())) {
|
||||
const allInProgress = await getInProgressSnapshots(repository);
|
||||
const found = allInProgress.find(s => s.indices.includes(index));
|
||||
|
||||
if (!found) {
|
||||
continue;
|
||||
}
|
||||
|
||||
while (await isSnapshotPending(repository, found.snapshot)) {
|
||||
// wait a bit before getting status again
|
||||
await new Promise(resolve => setTimeout(resolve, 500));
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
}
|
|
@ -1,27 +1,15 @@
|
|||
import { Transform } from 'stream';
|
||||
|
||||
import { get } from 'lodash';
|
||||
|
||||
export function createDeleteIndexStream(client, stats) {
|
||||
|
||||
async function deleteIndex(index) {
|
||||
try {
|
||||
await client.indices.delete({ index });
|
||||
stats.deletedIndex(index);
|
||||
} catch (err) {
|
||||
if (get(err, 'body.error.type') !== 'index_not_found_exception') {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
}
|
||||
import { deleteIndex } from './delete_index';
|
||||
|
||||
export function createDeleteIndexStream(client, stats, log) {
|
||||
return new Transform({
|
||||
readableObjectMode: true,
|
||||
writableObjectMode: true,
|
||||
async transform(record, enc, callback) {
|
||||
try {
|
||||
if (!record || record.type === 'index') {
|
||||
await deleteIndex(record.value.index);
|
||||
await deleteIndex({ client, stats, log, index: record.value.index });
|
||||
} else {
|
||||
this.push(record);
|
||||
}
|
||||
|
|
|
@ -12,6 +12,7 @@ export function createStats(name, log) {
|
|||
deleted: false,
|
||||
created: false,
|
||||
archived: false,
|
||||
waitForSnapshot: 0,
|
||||
configDocs: {
|
||||
upgraded: 0,
|
||||
tagged: 0,
|
||||
|
@ -32,6 +33,11 @@ export function createStats(name, log) {
|
|||
info('Skipped restore for existing index %j', index);
|
||||
}
|
||||
|
||||
waitingForInProgressSnapshot(index) {
|
||||
getOrCreate(index).waitForSnapshot += 1;
|
||||
info('Waiting for snapshot of %j to complete', index);
|
||||
}
|
||||
|
||||
deletedIndex(index) {
|
||||
getOrCreate(index).deleted = true;
|
||||
info('Deleted existing index %j', index);
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue