[esArchiver] upgrade to new ES client (#89874) (#90109)

Co-authored-by: spalger <spalger@users.noreply.github.com>
Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>

Co-authored-by: spalger <spalger@users.noreply.github.com>
Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Spencer 2021-02-02 19:21:29 -07:00 committed by GitHub
parent 2af05b14ca
commit b4de7f47a5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 1726 additions and 1449 deletions

View file

@ -565,6 +565,7 @@
"@welldone-software/why-did-you-render": "^5.0.0", "@welldone-software/why-did-you-render": "^5.0.0",
"@yarnpkg/lockfile": "^1.1.0", "@yarnpkg/lockfile": "^1.1.0",
"abab": "^2.0.4", "abab": "^2.0.4",
"aggregate-error": "^3.1.0",
"angular-aria": "^1.8.0", "angular-aria": "^1.8.0",
"angular-mocks": "^1.7.9", "angular-mocks": "^1.7.9",
"angular-recursion": "^1.0.5", "angular-recursion": "^1.0.5",

View file

@ -6,7 +6,7 @@
* Public License, v 1. * Public License, v 1.
*/ */
import { Client } from 'elasticsearch'; import { Client } from '@elastic/elasticsearch';
import { ToolingLog, KbnClient } from '@kbn/dev-utils'; import { ToolingLog, KbnClient } from '@kbn/dev-utils';
import { migrateKibanaIndex, createStats, cleanKibanaIndices } from '../lib'; import { migrateKibanaIndex, createStats, cleanKibanaIndices } from '../lib';

View file

@ -10,7 +10,7 @@ import { resolve } from 'path';
import { createReadStream } from 'fs'; import { createReadStream } from 'fs';
import { Readable } from 'stream'; import { Readable } from 'stream';
import { ToolingLog, KbnClient } from '@kbn/dev-utils'; import { ToolingLog, KbnClient } from '@kbn/dev-utils';
import { Client } from 'elasticsearch'; import { Client } from '@elastic/elasticsearch';
import { createPromiseFromStreams, concatStreamProviders } from '@kbn/utils'; import { createPromiseFromStreams, concatStreamProviders } from '@kbn/utils';
@ -92,7 +92,7 @@ export async function loadAction({
await client.indices.refresh({ await client.indices.refresh({
index: '_all', index: '_all',
allowNoIndices: true, allow_no_indices: true,
}); });
// If we affected the Kibana index, we need to ensure it's migrated... // If we affected the Kibana index, we need to ensure it's migrated...

View file

@ -9,7 +9,7 @@
import { resolve } from 'path'; import { resolve } from 'path';
import { createWriteStream, mkdirSync } from 'fs'; import { createWriteStream, mkdirSync } from 'fs';
import { Readable, Writable } from 'stream'; import { Readable, Writable } from 'stream';
import { Client } from 'elasticsearch'; import { Client } from '@elastic/elasticsearch';
import { ToolingLog } from '@kbn/dev-utils'; import { ToolingLog } from '@kbn/dev-utils';
import { createListStream, createPromiseFromStreams } from '@kbn/utils'; import { createListStream, createPromiseFromStreams } from '@kbn/utils';

View file

@ -9,7 +9,7 @@
import { resolve } from 'path'; import { resolve } from 'path';
import { createReadStream } from 'fs'; import { createReadStream } from 'fs';
import { Readable, Writable } from 'stream'; import { Readable, Writable } from 'stream';
import { Client } from 'elasticsearch'; import { Client } from '@elastic/elasticsearch';
import { ToolingLog, KbnClient } from '@kbn/dev-utils'; import { ToolingLog, KbnClient } from '@kbn/dev-utils';
import { createPromiseFromStreams } from '@kbn/utils'; import { createPromiseFromStreams } from '@kbn/utils';

View file

@ -19,7 +19,7 @@ import Fs from 'fs';
import { RunWithCommands, createFlagError, KbnClient, CA_CERT_PATH } from '@kbn/dev-utils'; import { RunWithCommands, createFlagError, KbnClient, CA_CERT_PATH } from '@kbn/dev-utils';
import { readConfigFile } from '@kbn/test'; import { readConfigFile } from '@kbn/test';
import legacyElasticsearch from 'elasticsearch'; import { Client } from '@elastic/elasticsearch';
import { EsArchiver } from './es_archiver'; import { EsArchiver } from './es_archiver';
@ -115,10 +115,9 @@ export function runCli() {
throw createFlagError('--dir or --config must be defined'); throw createFlagError('--dir or --config must be defined');
} }
const client = new legacyElasticsearch.Client({ const client = new Client({
host: esUrl, node: esUrl,
ssl: esCa ? { ca: esCa } : undefined, ssl: esCa ? { ca: esCa } : undefined,
log: flags.verbose ? 'trace' : [],
}); });
addCleanupTask(() => client.close()); addCleanupTask(() => client.close());

View file

@ -6,7 +6,7 @@
* Public License, v 1. * Public License, v 1.
*/ */
import { Client } from 'elasticsearch'; import { Client } from '@elastic/elasticsearch';
import { ToolingLog, KbnClient } from '@kbn/dev-utils'; import { ToolingLog, KbnClient } from '@kbn/dev-utils';
import { import {

View file

@ -1,67 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* and the Server Side Public License, v 1; you may not use this file except in
* compliance with, at your election, the Elastic License or the Server Side
* Public License, v 1.
*/
import { Client } from 'elasticsearch';
import sinon from 'sinon';
import Chance from 'chance';
import { times } from 'lodash';
import { Stats } from '../../stats';
const chance = new Chance();
export const createStubStats = (): Stats =>
({
indexedDoc: sinon.stub(),
archivedDoc: sinon.stub(),
} as any);
export const createPersonDocRecords = (n: number) =>
times(n, () => ({
type: 'doc',
value: {
index: 'people',
type: 'person',
id: chance.natural(),
source: {
name: chance.name(),
birthday: chance.birthday(),
ssn: chance.ssn(),
},
},
}));
type MockClient = Client & {
assertNoPendingResponses: () => void;
};
export const createStubClient = (
responses: Array<(name: string, params: any) => any | Promise<any>> = []
): MockClient => {
const createStubClientMethod = (name: string) =>
sinon.spy(async (params) => {
if (responses.length === 0) {
throw new Error(`unexpected client.${name} call`);
}
const response = responses.shift();
return await response!(name, params);
});
return {
search: createStubClientMethod('search'),
scroll: createStubClientMethod('scroll'),
bulk: createStubClientMethod('bulk'),
assertNoPendingResponses() {
if (responses.length) {
throw new Error(`There are ${responses.length} unsent responses.`);
}
},
} as any;
};

View file

@ -6,128 +6,185 @@
* Public License, v 1. * Public License, v 1.
*/ */
import sinon from 'sinon'; import {
import { delay } from 'bluebird'; createListStream,
import { createListStream, createPromiseFromStreams, createConcatStream } from '@kbn/utils'; createPromiseFromStreams,
createConcatStream,
createMapStream,
ToolingLog,
} from '@kbn/dev-utils';
import { createGenerateDocRecordsStream } from './generate_doc_records_stream'; import { createGenerateDocRecordsStream } from './generate_doc_records_stream';
import { Progress } from '../progress'; import { Progress } from '../progress';
import { createStubStats, createStubClient } from './__mocks__/stubs'; import { createStats } from '../stats';
describe('esArchiver: createGenerateDocRecordsStream()', () => { const log = new ToolingLog();
it('scolls 1000 documents at a time', async () => {
const stats = createStubStats();
const client = createStubClient([
(name, params) => {
expect(name).toBe('search');
expect(params).toHaveProperty('index', 'logstash-*');
expect(params).toHaveProperty('size', 1000);
return {
hits: {
total: 0,
hits: [],
},
};
},
]);
const progress = new Progress(); it('transforms each input index to a stream of docs using scrollSearch helper', async () => {
await createPromiseFromStreams([ const responses: any = {
createListStream(['logstash-*']), foo: [
createGenerateDocRecordsStream({ client, stats, progress }),
]);
expect(progress.getTotal()).toBe(0);
expect(progress.getComplete()).toBe(0);
});
it('uses a 1 minute scroll timeout', async () => {
const stats = createStubStats();
const client = createStubClient([
(name, params) => {
expect(name).toBe('search');
expect(params).toHaveProperty('index', 'logstash-*');
expect(params).toHaveProperty('scroll', '1m');
expect(params).toHaveProperty('rest_total_hits_as_int', true);
return {
hits: {
total: 0,
hits: [],
},
};
},
]);
const progress = new Progress();
await createPromiseFromStreams([
createListStream(['logstash-*']),
createGenerateDocRecordsStream({ client, stats, progress }),
]);
expect(progress.getTotal()).toBe(0);
expect(progress.getComplete()).toBe(0);
});
it('consumes index names and scrolls completely before continuing', async () => {
const stats = createStubStats();
let checkpoint = Date.now();
const client = createStubClient([
async (name, params) => {
expect(name).toBe('search');
expect(params).toHaveProperty('index', 'index1');
await delay(200);
return {
_scroll_id: 'index1ScrollId',
hits: { total: 2, hits: [{ _id: 1, _index: '.kibana_foo' }] },
};
},
async (name, params) => {
expect(name).toBe('scroll');
expect(params).toHaveProperty('scrollId', 'index1ScrollId');
expect(Date.now() - checkpoint).not.toBeLessThan(200);
checkpoint = Date.now();
await delay(200);
return { hits: { total: 2, hits: [{ _id: 2, _index: 'foo' }] } };
},
async (name, params) => {
expect(name).toBe('search');
expect(params).toHaveProperty('index', 'index2');
expect(Date.now() - checkpoint).not.toBeLessThan(200);
checkpoint = Date.now();
await delay(200);
return { hits: { total: 0, hits: [] } };
},
]);
const progress = new Progress();
const docRecords = await createPromiseFromStreams([
createListStream(['index1', 'index2']),
createGenerateDocRecordsStream({ client, stats, progress }),
createConcatStream([]),
]);
expect(docRecords).toEqual([
{ {
type: 'doc', body: {
value: { hits: {
index: '.kibana_1', total: 5,
type: undefined, hits: [
id: 1, { _index: 'foo', _type: '_doc', _id: '0', _source: {} },
source: undefined, { _index: 'foo', _type: '_doc', _id: '1', _source: {} },
{ _index: 'foo', _type: '_doc', _id: '2', _source: {} },
],
},
}, },
}, },
{ {
type: 'doc', body: {
value: { hits: {
index: 'foo', total: 5,
type: undefined, hits: [
id: 2, { _index: 'foo', _type: '_doc', _id: '3', _source: {} },
source: undefined, { _index: 'foo', _type: '_doc', _id: '4', _source: {} },
],
},
}, },
}, },
]); ],
sinon.assert.calledTwice(stats.archivedDoc as any); bar: [
expect(progress.getTotal()).toBe(2); {
expect(progress.getComplete()).toBe(2); body: {
}); hits: {
total: 2,
hits: [
{ _index: 'bar', _type: '_doc', _id: '0', _source: {} },
{ _index: 'bar', _type: '_doc', _id: '1', _source: {} },
],
},
},
},
],
};
const client: any = {
helpers: {
scrollSearch: jest.fn(function* ({ index }) {
while (responses[index] && responses[index].length) {
yield responses[index].shift()!;
}
}),
},
};
const stats = createStats('test', log);
const progress = new Progress();
const results = await createPromiseFromStreams([
createListStream(['bar', 'foo']),
createGenerateDocRecordsStream({
client,
stats,
progress,
}),
createMapStream((record: any) => {
expect(record).toHaveProperty('type', 'doc');
expect(record.value.source).toEqual({});
expect(record.value.type).toBe('_doc');
expect(record.value.index).toMatch(/^(foo|bar)$/);
expect(record.value.id).toMatch(/^\d+$/);
return `${record.value.index}:${record.value.id}`;
}),
createConcatStream([]),
]);
expect(client.helpers.scrollSearch).toMatchInlineSnapshot(`
[MockFunction] {
"calls": Array [
Array [
Object {
"_source": "true",
"body": Object {
"query": undefined,
},
"index": "bar",
"rest_total_hits_as_int": true,
"scroll": "1m",
"size": 1000,
},
],
Array [
Object {
"_source": "true",
"body": Object {
"query": undefined,
},
"index": "foo",
"rest_total_hits_as_int": true,
"scroll": "1m",
"size": 1000,
},
],
],
"results": Array [
Object {
"type": "return",
"value": Object {},
},
Object {
"type": "return",
"value": Object {},
},
],
}
`);
expect(results).toMatchInlineSnapshot(`
Array [
"bar:0",
"bar:1",
"foo:0",
"foo:1",
"foo:2",
"foo:3",
"foo:4",
]
`);
expect(progress).toMatchInlineSnapshot(`
Progress {
"complete": 7,
"loggingInterval": undefined,
"total": 7,
}
`);
expect(stats).toMatchInlineSnapshot(`
Object {
"bar": Object {
"archived": false,
"configDocs": Object {
"tagged": 0,
"upToDate": 0,
"upgraded": 0,
},
"created": false,
"deleted": false,
"docs": Object {
"archived": 2,
"indexed": 0,
},
"skipped": false,
"waitForSnapshot": 0,
},
"foo": Object {
"archived": false,
"configDocs": Object {
"tagged": 0,
"upToDate": 0,
"upgraded": 0,
},
"created": false,
"deleted": false,
"docs": Object {
"archived": 5,
"indexed": 0,
},
"skipped": false,
"waitForSnapshot": 0,
},
}
`);
}); });

View file

@ -7,7 +7,7 @@
*/ */
import { Transform } from 'stream'; import { Transform } from 'stream';
import { Client, SearchParams, SearchResponse } from 'elasticsearch'; import { Client } from '@elastic/elasticsearch';
import { Stats } from '../stats'; import { Stats } from '../stats';
import { Progress } from '../progress'; import { Progress } from '../progress';
@ -30,31 +30,26 @@ export function createGenerateDocRecordsStream({
readableObjectMode: true, readableObjectMode: true,
async transform(index, enc, callback) { async transform(index, enc, callback) {
try { try {
let remainingHits = 0; const interator = client.helpers.scrollSearch({
let resp: SearchResponse<any> | null = null; index,
scroll: SCROLL_TIMEOUT,
size: SCROLL_SIZE,
_source: 'true',
body: {
query,
},
rest_total_hits_as_int: true,
});
while (!resp || remainingHits > 0) { let remainingHits: number | null = null;
if (!resp) {
resp = await client.search({ for await (const resp of interator) {
index, if (remainingHits === null) {
scroll: SCROLL_TIMEOUT, remainingHits = resp.body.hits.total as number;
size: SCROLL_SIZE,
_source: true,
body: {
query,
},
rest_total_hits_as_int: true, // not declared on SearchParams type
} as SearchParams);
remainingHits = resp.hits.total;
progress.addToTotal(remainingHits); progress.addToTotal(remainingHits);
} else {
resp = await client.scroll({
scrollId: resp._scroll_id!,
scroll: SCROLL_TIMEOUT,
});
} }
for (const hit of resp.hits.hits) { for (const hit of resp.body.hits.hits) {
remainingHits -= 1; remainingHits -= 1;
stats.archivedDoc(hit._index); stats.archivedDoc(hit._index);
this.push({ this.push({
@ -70,7 +65,7 @@ export function createGenerateDocRecordsStream({
}); });
} }
progress.addToComplete(resp.hits.hits.length); progress.addToComplete(resp.body.hits.hits.length);
} }
callback(undefined); callback(undefined);

View file

@ -6,170 +6,278 @@
* Public License, v 1. * Public License, v 1.
*/ */
import { delay } from 'bluebird'; import {
import { createListStream, createPromiseFromStreams } from '@kbn/utils'; createListStream,
createPromiseFromStreams,
ToolingLog,
createRecursiveSerializer,
} from '@kbn/dev-utils';
import { Progress } from '../progress'; import { Progress } from '../progress';
import { createIndexDocRecordsStream } from './index_doc_records_stream'; import { createIndexDocRecordsStream } from './index_doc_records_stream';
import { createStubStats, createStubClient, createPersonDocRecords } from './__mocks__/stubs'; import { createStats } from '../stats';
const recordsToBulkBody = (records: any[]) => { const AT_LINE_RE = /^\s+at /m;
return records.reduce((acc, record) => {
const { index, type, id, source } = record.value;
return [...acc, { index: { _index: index, _type: type, _id: id } }, source]; expect.addSnapshotSerializer(
}, [] as any[]); createRecursiveSerializer(
}; (v) => typeof v === 'string' && AT_LINE_RE.test(v),
(v: string) => {
const lines = v.split('\n');
const withoutStack: string[] = [];
describe('esArchiver: createIndexDocRecordsStream()', () => { // move source lines to withoutStack, filtering out stacktrace lines
it('consumes doc records and sends to `_bulk` api', async () => { while (lines.length) {
const records = createPersonDocRecords(1); const line = lines.shift()!;
const client = createStubClient([
async (name, params) => {
expect(name).toBe('bulk');
expect(params).toEqual({
body: recordsToBulkBody(records),
requestTimeout: 120000,
});
return { ok: true };
},
]);
const stats = createStubStats();
const progress = new Progress();
await createPromiseFromStreams([ if (!AT_LINE_RE.test(line)) {
createListStream(records), withoutStack.push(line);
createIndexDocRecordsStream(client, stats, progress), } else {
]); // push in representation of stack trace indented to match "at"
withoutStack.push(`${' '.repeat(line.indexOf('at'))}<stack trace>`);
client.assertNoPendingResponses(); // shift off all subsequent `at ...` lines
expect(progress.getComplete()).toBe(1); while (lines.length && AT_LINE_RE.test(lines[0])) {
expect(progress.getTotal()).toBe(undefined); lines.shift();
}); }
}
}
it('consumes multiple doc records and sends to `_bulk` api together', async () => { return withoutStack.join('\n');
const records = createPersonDocRecords(10);
const client = createStubClient([
async (name, params) => {
expect(name).toBe('bulk');
expect(params).toEqual({
body: recordsToBulkBody(records.slice(0, 1)),
requestTimeout: 120000,
});
return { ok: true };
},
async (name, params) => {
expect(name).toBe('bulk');
expect(params).toEqual({
body: recordsToBulkBody(records.slice(1)),
requestTimeout: 120000,
});
return { ok: true };
},
]);
const stats = createStubStats();
const progress = new Progress();
await createPromiseFromStreams([
createListStream(records),
createIndexDocRecordsStream(client, stats, progress),
]);
client.assertNoPendingResponses();
expect(progress.getComplete()).toBe(10);
expect(progress.getTotal()).toBe(undefined);
});
it('waits until request is complete before sending more', async () => {
const records = createPersonDocRecords(10);
const stats = createStubStats();
const start = Date.now();
const delayMs = 1234;
const client = createStubClient([
async (name, params) => {
expect(name).toBe('bulk');
expect(params).toEqual({
body: recordsToBulkBody(records.slice(0, 1)),
requestTimeout: 120000,
});
await delay(delayMs);
return { ok: true };
},
async (name, params) => {
expect(name).toBe('bulk');
expect(params).toEqual({
body: recordsToBulkBody(records.slice(1)),
requestTimeout: 120000,
});
expect(Date.now() - start).not.toBeLessThan(delayMs);
return { ok: true };
},
]);
const progress = new Progress();
await createPromiseFromStreams([
createListStream(records),
createIndexDocRecordsStream(client, stats, progress),
]);
client.assertNoPendingResponses();
expect(progress.getComplete()).toBe(10);
expect(progress.getTotal()).toBe(undefined);
});
it('sends a maximum of 300 documents at a time', async () => {
const records = createPersonDocRecords(301);
const stats = createStubStats();
const client = createStubClient([
async (name, params) => {
expect(name).toBe('bulk');
expect(params.body.length).toEqual(1 * 2);
return { ok: true };
},
async (name, params) => {
expect(name).toBe('bulk');
expect(params.body.length).toEqual(299 * 2);
return { ok: true };
},
async (name, params) => {
expect(name).toBe('bulk');
expect(params.body.length).toEqual(1 * 2);
return { ok: true };
},
]);
const progress = new Progress();
await createPromiseFromStreams([
createListStream(records),
createIndexDocRecordsStream(client, stats, progress),
]);
client.assertNoPendingResponses();
expect(progress.getComplete()).toBe(301);
expect(progress.getTotal()).toBe(undefined);
});
it('emits an error if any request fails', async () => {
const records = createPersonDocRecords(2);
const stats = createStubStats();
const client = createStubClient([
async () => ({ ok: true }),
async () => ({ errors: true, forcedError: true }),
]);
const progress = new Progress();
try {
await createPromiseFromStreams([
createListStream(records),
createIndexDocRecordsStream(client, stats, progress),
]);
throw new Error('expected stream to emit error');
} catch (err) {
expect(err.message).toMatch(/"forcedError":\s*true/);
} }
)
);
client.assertNoPendingResponses(); const log = new ToolingLog();
expect(progress.getComplete()).toBe(1);
expect(progress.getTotal()).toBe(undefined); class MockClient {
helpers = {
bulk: jest.fn(),
};
}
const testRecords = [
{
type: 'doc',
value: {
index: 'foo',
id: '0',
source: {
hello: 'world',
},
},
},
{
type: 'doc',
value: {
index: 'foo',
id: '1',
source: {
hello: 'world',
},
},
},
{
type: 'doc',
value: {
index: 'foo',
id: '2',
source: {
hello: 'world',
},
},
},
{
type: 'doc',
value: {
index: 'foo',
id: '3',
source: {
hello: 'world',
},
},
},
];
it('indexes documents using the bulk client helper', async () => {
const client = new MockClient();
client.helpers.bulk.mockImplementation(async () => {});
const progress = new Progress();
const stats = createStats('test', log);
await createPromiseFromStreams([
createListStream(testRecords),
createIndexDocRecordsStream(client as any, stats, progress),
]);
expect(stats).toMatchInlineSnapshot(`
Object {
"foo": Object {
"archived": false,
"configDocs": Object {
"tagged": 0,
"upToDate": 0,
"upgraded": 0,
},
"created": false,
"deleted": false,
"docs": Object {
"archived": 0,
"indexed": 4,
},
"skipped": false,
"waitForSnapshot": 0,
},
}
`);
expect(progress).toMatchInlineSnapshot(`
Progress {
"complete": 4,
"loggingInterval": undefined,
"total": undefined,
}
`);
expect(client.helpers.bulk).toMatchInlineSnapshot(`
[MockFunction] {
"calls": Array [
Array [
Object {
"datasource": Array [
Object {
"hello": "world",
},
],
"onDocument": [Function],
"onDrop": [Function],
"retries": 5,
},
],
Array [
Object {
"datasource": Array [
Object {
"hello": "world",
},
Object {
"hello": "world",
},
Object {
"hello": "world",
},
],
"onDocument": [Function],
"onDrop": [Function],
"retries": 5,
},
],
],
"results": Array [
Object {
"type": "return",
"value": Promise {},
},
Object {
"type": "return",
"value": Promise {},
},
],
}
`);
});
describe('bulk helper onDocument param', () => {
it('returns index ops for each doc', async () => {
expect.assertions(testRecords.length);
const client = new MockClient();
client.helpers.bulk.mockImplementation(async ({ datasource, onDocument }) => {
for (const d of datasource) {
const op = onDocument(d);
expect(op).toEqual({
index: {
_index: 'foo',
_id: expect.stringMatching(/^\d$/),
},
});
}
});
const stats = createStats('test', log);
const progress = new Progress();
await createPromiseFromStreams([
createListStream(testRecords),
createIndexDocRecordsStream(client as any, stats, progress),
]);
});
it('returns create ops for each doc when instructed', async () => {
expect.assertions(testRecords.length);
const client = new MockClient();
client.helpers.bulk.mockImplementation(async ({ datasource, onDocument }) => {
for (const d of datasource) {
const op = onDocument(d);
expect(op).toEqual({
create: {
_index: 'foo',
_id: expect.stringMatching(/^\d$/),
},
});
}
});
const stats = createStats('test', log);
const progress = new Progress();
await createPromiseFromStreams([
createListStream(testRecords),
createIndexDocRecordsStream(client as any, stats, progress, true),
]);
});
});
describe('bulk helper onDrop param', () => {
it('throws an error reporting any docs which failed all retry attempts', async () => {
const client = new MockClient();
let counter = -1;
client.helpers.bulk.mockImplementation(async ({ datasource, onDrop }) => {
for (const d of datasource) {
counter++;
if (counter > 0) {
onDrop({
document: d,
error: {
reason: `${counter} conflicts with something`,
},
});
}
}
});
const stats = createStats('test', log);
const progress = new Progress();
const promise = createPromiseFromStreams([
createListStream(testRecords),
createIndexDocRecordsStream(client as any, stats, progress),
]);
await expect(promise).rejects.toThrowErrorMatchingInlineSnapshot(`
"
Error: Bulk doc failure [operation=index]:
doc: {\\"hello\\":\\"world\\"}
error: {\\"reason\\":\\"1 conflicts with something\\"}
<stack trace>
Error: Bulk doc failure [operation=index]:
doc: {\\"hello\\":\\"world\\"}
error: {\\"reason\\":\\"2 conflicts with something\\"}
<stack trace>
Error: Bulk doc failure [operation=index]:
doc: {\\"hello\\":\\"world\\"}
error: {\\"reason\\":\\"3 conflicts with something\\"}
<stack trace>"
`);
}); });
}); });

View file

@ -6,7 +6,8 @@
* Public License, v 1. * Public License, v 1.
*/ */
import { Client } from 'elasticsearch'; import { Client } from '@elastic/elasticsearch';
import AggregateError from 'aggregate-error';
import { Writable } from 'stream'; import { Writable } from 'stream';
import { Stats } from '../stats'; import { Stats } from '../stats';
import { Progress } from '../progress'; import { Progress } from '../progress';
@ -18,25 +19,39 @@ export function createIndexDocRecordsStream(
useCreate: boolean = false useCreate: boolean = false
) { ) {
async function indexDocs(docs: any[]) { async function indexDocs(docs: any[]) {
const body: any[] = [];
const operation = useCreate === true ? 'create' : 'index'; const operation = useCreate === true ? 'create' : 'index';
docs.forEach((doc) => { const ops = new WeakMap<any, any>();
stats.indexedDoc(doc.index); const errors: string[] = [];
body.push(
{ await client.helpers.bulk({
retries: 5,
datasource: docs.map((doc) => {
const body = doc.source;
ops.set(body, {
[operation]: { [operation]: {
_index: doc.index, _index: doc.index,
_type: doc.type, _type: doc.type,
_id: doc.id, _id: doc.id,
}, },
}, });
doc.source return body;
); }),
onDocument(doc) {
return ops.get(doc);
},
onDrop(dropped) {
const dj = JSON.stringify(dropped.document);
const ej = JSON.stringify(dropped.error);
errors.push(`Bulk doc failure [operation=${operation}]:\n doc: ${dj}\n error: ${ej}`);
},
}); });
const resp = await client.bulk({ requestTimeout: 2 * 60 * 1000, body }); if (errors.length) {
if (resp.errors) { throw new AggregateError(errors);
throw new Error(`Failed to index all documents: ${JSON.stringify(resp, null, 2)}`); }
for (const doc of docs) {
stats.indexedDoc(doc.index);
} }
} }

View file

@ -6,7 +6,7 @@
* Public License, v 1. * Public License, v 1.
*/ */
import { Client } from 'elasticsearch'; import { Client } from '@elastic/elasticsearch';
import sinon from 'sinon'; import sinon from 'sinon';
import { ToolingLog } from '@kbn/dev-utils'; import { ToolingLog } from '@kbn/dev-utils';
import { Stats } from '../../stats'; import { Stats } from '../../stats';
@ -54,9 +54,11 @@ export const createStubDocRecord = (index: string, id: number) => ({
const createEsClientError = (errorType: string) => { const createEsClientError = (errorType: string) => {
const err = new Error(`ES Client Error Stub "${errorType}"`); const err = new Error(`ES Client Error Stub "${errorType}"`);
(err as any).body = { (err as any).meta = {
error: { body: {
type: errorType, error: {
type: errorType,
},
}, },
}; };
return err; return err;
@ -79,26 +81,25 @@ export const createStubClient = (
} }
return { return {
[index]: { body: {
mappings: {}, [index]: {
settings: {}, mappings: {},
settings: {},
},
}, },
}; };
}), }),
existsAlias: sinon.spy(({ name }) => {
return Promise.resolve(aliases.hasOwnProperty(name));
}),
getAlias: sinon.spy(async ({ index, name }) => { getAlias: sinon.spy(async ({ index, name }) => {
if (index && existingIndices.indexOf(index) >= 0) { if (index && existingIndices.indexOf(index) >= 0) {
const result = indexAlias(aliases, index); const result = indexAlias(aliases, index);
return { [index]: { aliases: result ? { [result]: {} } : {} } }; return { body: { [index]: { aliases: result ? { [result]: {} } : {} } } };
} }
if (name && aliases[name]) { if (name && aliases[name]) {
return { [aliases[name]]: { aliases: { [name]: {} } } }; return { body: { [aliases[name]]: { aliases: { [name]: {} } } } };
} }
return { status: 404 }; return { statusCode: 404 };
}), }),
updateAliases: sinon.spy(async ({ body }) => { updateAliases: sinon.spy(async ({ body }) => {
body.actions.forEach( body.actions.forEach(
@ -110,14 +111,14 @@ export const createStubClient = (
} }
); );
return { ok: true }; return { body: { ok: true } };
}), }),
create: sinon.spy(async ({ index }) => { create: sinon.spy(async ({ index }) => {
if (existingIndices.includes(index) || aliases.hasOwnProperty(index)) { if (existingIndices.includes(index) || aliases.hasOwnProperty(index)) {
throw createEsClientError('resource_already_exists_exception'); throw createEsClientError('resource_already_exists_exception');
} else { } else {
existingIndices.push(index); existingIndices.push(index);
return { ok: true }; return { body: { ok: true } };
} }
}), }),
delete: sinon.spy(async ({ index }) => { delete: sinon.spy(async ({ index }) => {
@ -131,7 +132,7 @@ export const createStubClient = (
} }
}); });
indices.forEach((ix) => existingIndices.splice(existingIndices.indexOf(ix), 1)); indices.forEach((ix) => existingIndices.splice(existingIndices.indexOf(ix), 1));
return { ok: true }; return { body: { ok: true } };
} else { } else {
throw createEsClientError('index_not_found_exception'); throw createEsClientError('index_not_found_exception');
} }

View file

@ -56,15 +56,35 @@ describe('esArchiver: createCreateIndexStream()', () => {
createCreateIndexStream({ client, stats, log }), createCreateIndexStream({ client, stats, log }),
]); ]);
expect((client.indices.getAlias as sinon.SinonSpy).calledOnce).toBe(true); expect((client.indices.getAlias as sinon.SinonSpy).args).toMatchInlineSnapshot(`
expect((client.indices.getAlias as sinon.SinonSpy).args[0][0]).toEqual({ Array [
name: 'existing-index', Array [
ignore: [404], Object {
}); "name": Array [
expect((client.indices.delete as sinon.SinonSpy).calledOnce).toBe(true); "existing-index",
expect((client.indices.delete as sinon.SinonSpy).args[0][0]).toEqual({ ],
index: ['actual-index'], },
}); Object {
"ignore": Array [
404,
],
},
],
]
`);
expect((client.indices.delete as sinon.SinonSpy).args).toMatchInlineSnapshot(`
Array [
Array [
Object {
"index": Array [
"actual-index",
],
},
],
]
`);
sinon.assert.callCount(client.indices.create as sinon.SinonSpy, 3); // one failed create because of existing sinon.assert.callCount(client.indices.create as sinon.SinonSpy, 3); // one failed create because of existing
}); });

View file

@ -9,7 +9,7 @@
import { Transform, Readable } from 'stream'; import { Transform, Readable } from 'stream';
import { inspect } from 'util'; import { inspect } from 'util';
import { Client } from 'elasticsearch'; import { Client } from '@elastic/elasticsearch';
import { ToolingLog } from '@kbn/dev-utils'; import { ToolingLog } from '@kbn/dev-utils';
import { Stats } from '../stats'; import { Stats } from '../stats';
@ -92,7 +92,10 @@ export function createCreateIndexStream({
return; return;
} }
if (err?.body?.error?.type !== 'resource_already_exists_exception' || attemptNumber >= 3) { if (
err?.meta?.body?.error?.type !== 'resource_already_exists_exception' ||
attemptNumber >= 3
) {
throw err; throw err;
} }

View file

@ -6,8 +6,7 @@
* Public License, v 1. * Public License, v 1.
*/ */
import { get } from 'lodash'; import { Client } from '@elastic/elasticsearch';
import { Client } from 'elasticsearch';
import { ToolingLog } from '@kbn/dev-utils'; import { ToolingLog } from '@kbn/dev-utils';
import { Stats } from '../stats'; import { Stats } from '../stats';
@ -17,35 +16,45 @@ const PENDING_SNAPSHOT_STATUSES = ['INIT', 'STARTED', 'WAITING'];
export async function deleteIndex(options: { export async function deleteIndex(options: {
client: Client; client: Client;
stats: Stats; stats: Stats;
index: string; index: string | string[];
log: ToolingLog; log: ToolingLog;
retryIfSnapshottingCount?: number; retryIfSnapshottingCount?: number;
}): Promise<void> { }): Promise<void> {
const { client, stats, index, log, retryIfSnapshottingCount = 10 } = options; const { client, stats, log, retryIfSnapshottingCount = 10 } = options;
const indices = [options.index].flat();
const getIndicesToDelete = async () => { const getIndicesToDelete = async () => {
const aliasInfo = await client.indices.getAlias({ name: index, ignore: [404] }); const resp = await client.indices.getAlias(
return aliasInfo.status === 404 ? [index] : Object.keys(aliasInfo); {
name: indices,
},
{
ignore: [404],
}
);
return resp.statusCode === 404 ? indices : Object.keys(resp.body);
}; };
try { try {
const indicesToDelete = await getIndicesToDelete(); const indicesToDelete = await getIndicesToDelete();
await client.indices.delete({ index: indicesToDelete }); await client.indices.delete({ index: indicesToDelete });
for (let i = 0; i < indicesToDelete.length; i++) { for (const index of indices) {
const indexToDelete = indicesToDelete[i]; stats.deletedIndex(index);
stats.deletedIndex(indexToDelete);
} }
} catch (error) { } catch (error) {
if (retryIfSnapshottingCount > 0 && isDeleteWhileSnapshotInProgressError(error)) { if (retryIfSnapshottingCount > 0 && isDeleteWhileSnapshotInProgressError(error)) {
stats.waitingForInProgressSnapshot(index); for (const index of indices) {
await waitForSnapshotCompletion(client, index, log); stats.waitingForInProgressSnapshot(index);
}
await waitForSnapshotCompletion(client, indices, log);
return await deleteIndex({ return await deleteIndex({
...options, ...options,
retryIfSnapshottingCount: retryIfSnapshottingCount - 1, retryIfSnapshottingCount: retryIfSnapshottingCount - 1,
}); });
} }
if (get(error, 'body.error.type') !== 'index_not_found_exception') { if (error?.meta?.body?.error?.type !== 'index_not_found_exception') {
throw error; throw error;
} }
} }
@ -57,8 +66,8 @@ export async function deleteIndex(options: {
* @param {Error} error * @param {Error} error
* @return {Boolean} * @return {Boolean}
*/ */
export function isDeleteWhileSnapshotInProgressError(error: object) { export function isDeleteWhileSnapshotInProgressError(error: any) {
return get(error, 'body.error.reason', '').startsWith( return (error?.meta?.body?.error?.reason ?? '').startsWith(
'Cannot delete indices that are being snapshotted' 'Cannot delete indices that are being snapshotted'
); );
} }
@ -67,10 +76,16 @@ export function isDeleteWhileSnapshotInProgressError(error: object) {
* Wait for the any snapshot in any repository that is * Wait for the any snapshot in any repository that is
* snapshotting this index to complete. * snapshotting this index to complete.
*/ */
export async function waitForSnapshotCompletion(client: Client, index: string, log: ToolingLog) { export async function waitForSnapshotCompletion(
client: Client,
index: string | string[],
log: ToolingLog
) {
const isSnapshotPending = async (repository: string, snapshot: string) => { const isSnapshotPending = async (repository: string, snapshot: string) => {
const { const {
snapshots: [status], body: {
snapshots: [status],
},
} = await client.snapshot.status({ } = await client.snapshot.status({
repository, repository,
snapshot, snapshot,
@ -81,10 +96,13 @@ export async function waitForSnapshotCompletion(client: Client, index: string, l
}; };
const getInProgressSnapshots = async (repository: string) => { const getInProgressSnapshots = async (repository: string) => {
const { snapshots: inProgressSnapshots } = await client.snapshot.get({ const {
body: { snapshots: inProgressSnapshots },
} = await client.snapshot.get({
repository, repository,
snapshot: '_current', snapshot: '_current',
}); });
return inProgressSnapshots; return inProgressSnapshots;
}; };

View file

@ -7,7 +7,7 @@
*/ */
import { Transform } from 'stream'; import { Transform } from 'stream';
import { Client } from 'elasticsearch'; import { Client } from '@elastic/elasticsearch';
import { ToolingLog } from '@kbn/dev-utils'; import { ToolingLog } from '@kbn/dev-utils';
import { Stats } from '../stats'; import { Stats } from '../stats';

View file

@ -44,8 +44,8 @@ describe('esArchiver: createGenerateIndexRecordsStream()', () => {
]); ]);
const params = (client.indices.get as sinon.SinonSpy).args[0][0]; const params = (client.indices.get as sinon.SinonSpy).args[0][0];
expect(params).toHaveProperty('filterPath'); expect(params).toHaveProperty('filter_path');
const filters: string[] = params.filterPath; const filters: string[] = params.filter_path;
expect(filters.some((path) => path.includes('index.creation_date'))).toBe(true); expect(filters.some((path) => path.includes('index.creation_date'))).toBe(true);
expect(filters.some((path) => path.includes('index.uuid'))).toBe(true); expect(filters.some((path) => path.includes('index.uuid'))).toBe(true);
expect(filters.some((path) => path.includes('index.version'))).toBe(true); expect(filters.some((path) => path.includes('index.version'))).toBe(true);

View file

@ -7,7 +7,7 @@
*/ */
import { Transform } from 'stream'; import { Transform } from 'stream';
import { Client } from 'elasticsearch'; import { Client } from '@elastic/elasticsearch';
import { Stats } from '../stats'; import { Stats } from '../stats';
export function createGenerateIndexRecordsStream(client: Client, stats: Stats) { export function createGenerateIndexRecordsStream(client: Client, stats: Stats) {
@ -16,26 +16,30 @@ export function createGenerateIndexRecordsStream(client: Client, stats: Stats) {
readableObjectMode: true, readableObjectMode: true,
async transform(indexOrAlias, enc, callback) { async transform(indexOrAlias, enc, callback) {
try { try {
const resp = (await client.indices.get({ const resp = (
index: indexOrAlias, await client.indices.get({
filterPath: [ index: indexOrAlias,
'*.settings', filter_path: [
'*.mappings', '*.settings',
// remove settings that aren't really settings '*.mappings',
'-*.settings.index.creation_date', // remove settings that aren't really settings
'-*.settings.index.uuid', '-*.settings.index.creation_date',
'-*.settings.index.version', '-*.settings.index.uuid',
'-*.settings.index.provided_name', '-*.settings.index.version',
'-*.settings.index.frozen', '-*.settings.index.provided_name',
'-*.settings.index.search.throttled', '-*.settings.index.frozen',
'-*.settings.index.query', '-*.settings.index.search.throttled',
'-*.settings.index.routing', '-*.settings.index.query',
], '-*.settings.index.routing',
})) as Record<string, any>; ],
})
).body as Record<string, any>;
for (const [index, { settings, mappings }] of Object.entries(resp)) { for (const [index, { settings, mappings }] of Object.entries(resp)) {
const { const {
[index]: { aliases }, body: {
[index]: { aliases },
},
} = await client.indices.getAlias({ index }); } = await client.indices.getAlias({ index });
stats.archivedIndex(index, { settings, mappings }); stats.archivedIndex(index, { settings, mappings });

View file

@ -6,7 +6,9 @@
* Public License, v 1. * Public License, v 1.
*/ */
import { Client, CreateDocumentParams } from 'elasticsearch'; import { inspect } from 'util';
import { Client } from '@elastic/elasticsearch';
import { ToolingLog, KbnClient } from '@kbn/dev-utils'; import { ToolingLog, KbnClient } from '@kbn/dev-utils';
import { Stats } from '../stats'; import { Stats } from '../stats';
import { deleteIndex } from './delete_index'; import { deleteIndex } from './delete_index';
@ -57,13 +59,17 @@ export async function migrateKibanaIndex({
}) { }) {
// we allow dynamic mappings on the index, as some interceptors are accessing documents before // we allow dynamic mappings on the index, as some interceptors are accessing documents before
// the migration is actually performed. The migrator will put the value back to `strict` after migration. // the migration is actually performed. The migrator will put the value back to `strict` after migration.
await client.indices.putMapping({ await client.indices.putMapping(
index: '.kibana', {
body: { index: '.kibana',
dynamic: true, body: {
dynamic: true,
},
}, },
ignore: [404], {
} as any); ignore: [404],
}
);
await kbnClient.savedObjects.migrate(); await kbnClient.savedObjects.migrate();
} }
@ -75,9 +81,14 @@ export async function migrateKibanaIndex({
* index (e.g. we don't want to remove .kibana_task_manager or the like). * index (e.g. we don't want to remove .kibana_task_manager or the like).
*/ */
async function fetchKibanaIndices(client: Client) { async function fetchKibanaIndices(client: Client) {
const kibanaIndices = await client.cat.indices({ index: '.kibana*', format: 'json' }); const resp = await client.cat.indices<unknown>({ index: '.kibana*', format: 'json' });
const isKibanaIndex = (index: string) => /^\.kibana(:?_\d*)?$/.test(index); const isKibanaIndex = (index: string) => /^\.kibana(:?_\d*)?$/.test(index);
return kibanaIndices.map((x: { index: string }) => x.index).filter(isKibanaIndex);
if (!Array.isArray(resp.body)) {
throw new Error(`expected response to be an array ${inspect(resp.body)}`);
}
return resp.body.map((x: { index: string }) => x.index).filter(isKibanaIndex);
} }
const delay = (delayInMs: number) => new Promise((resolve) => setTimeout(resolve, delayInMs)); const delay = (delayInMs: number) => new Promise((resolve) => setTimeout(resolve, delayInMs));
@ -102,28 +113,32 @@ export async function cleanKibanaIndices({
} }
while (true) { while (true) {
const resp = await client.deleteByQuery({ const resp = await client.deleteByQuery(
index: `.kibana`, {
body: { index: `.kibana`,
query: { body: {
bool: { query: {
must_not: { bool: {
ids: { must_not: {
type: '_doc', ids: {
values: ['space:default'], type: '_doc',
values: ['space:default'],
},
}, },
}, },
}, },
}, },
}, },
ignore: [409], {
}); ignore: [409],
}
);
if (resp.total !== resp.deleted) { if (resp.body.total !== resp.body.deleted) {
log.warning( log.warning(
'delete by query deleted %d of %d total documents, trying again', 'delete by query deleted %d of %d total documents, trying again',
resp.deleted, resp.body.deleted,
resp.total resp.body.total
); );
await delay(200); await delay(200);
continue; continue;
@ -141,20 +156,24 @@ export async function cleanKibanaIndices({
} }
export async function createDefaultSpace({ index, client }: { index: string; client: Client }) { export async function createDefaultSpace({ index, client }: { index: string; client: Client }) {
await client.create({ await client.create(
index, {
type: '_doc', index,
id: 'space:default', type: '_doc',
ignore: 409, id: 'space:default',
body: { body: {
type: 'space', type: 'space',
updated_at: new Date().toISOString(), updated_at: new Date().toISOString(),
space: { space: {
name: 'Default Space', name: 'Default Space',
description: 'This is the default space', description: 'This is the default space',
disabledFeatures: [], disabledFeatures: [],
_reserved: true, _reserved: true,
},
}, },
}, },
} as CreateDocumentParams); {
ignore: [409],
}
);
} }

File diff suppressed because it is too large Load diff

View file

@ -14,7 +14,7 @@ import * as KibanaServer from './kibana_server';
export function EsArchiverProvider({ getService }: FtrProviderContext): EsArchiver { export function EsArchiverProvider({ getService }: FtrProviderContext): EsArchiver {
const config = getService('config'); const config = getService('config');
const client = getService('legacyEs'); const client = getService('es');
const log = getService('log'); const log = getService('log');
const kibanaServer = getService('kibanaServer'); const kibanaServer = getService('kibanaServer');
const retry = getService('retry'); const retry = getService('retry');

View file

@ -7386,6 +7386,14 @@ aggregate-error@^3.0.0:
clean-stack "^2.0.0" clean-stack "^2.0.0"
indent-string "^4.0.0" indent-string "^4.0.0"
aggregate-error@^3.1.0:
version "3.1.0"
resolved "https://registry.yarnpkg.com/aggregate-error/-/aggregate-error-3.1.0.tgz#92670ff50f5359bdb7a3e0d40d0ec30c5737687a"
integrity sha512-4I7Td01quW/RpocfNayFdFVk1qSuoh0E7JrbRJ16nH01HhKFQ88INq9Sd+nd72zqRySlr9BmDA8xlEJ6vJMrYA==
dependencies:
clean-stack "^2.0.0"
indent-string "^4.0.0"
airbnb-js-shims@^2.2.1: airbnb-js-shims@^2.2.1:
version "2.2.1" version "2.2.1"
resolved "https://registry.yarnpkg.com/airbnb-js-shims/-/airbnb-js-shims-2.2.1.tgz#db481102d682b98ed1daa4c5baa697a05ce5c040" resolved "https://registry.yarnpkg.com/airbnb-js-shims/-/airbnb-js-shims-2.2.1.tgz#db481102d682b98ed1daa4c5baa697a05ce5c040"