[esArchiver] upgrade to new ES client (#89874)

Co-authored-by: spalger <spalger@users.noreply.github.com>
Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
Spencer 2021-02-02 16:55:41 -07:00 committed by GitHub
parent 3da4c6bb2c
commit 9246f398ea
23 changed files with 1724 additions and 1447 deletions


@@ -568,6 +568,7 @@
"@welldone-software/why-did-you-render": "^5.0.0",
"@yarnpkg/lockfile": "^1.1.0",
"abab": "^2.0.4",
"aggregate-error": "^3.1.0",
"angular-aria": "^1.8.0",
"angular-mocks": "^1.7.9",
"angular-recursion": "^1.0.5",


@@ -6,7 +6,7 @@
* Public License, v 1.
*/
import { Client } from 'elasticsearch';
import { Client } from '@elastic/elasticsearch';
import { ToolingLog, KbnClient } from '@kbn/dev-utils';
import { migrateKibanaIndex, createStats, cleanKibanaIndices } from '../lib';


@@ -10,7 +10,7 @@ import { resolve } from 'path';
import { createReadStream } from 'fs';
import { Readable } from 'stream';
import { ToolingLog, KbnClient } from '@kbn/dev-utils';
import { Client } from 'elasticsearch';
import { Client } from '@elastic/elasticsearch';
import { createPromiseFromStreams, concatStreamProviders } from '@kbn/utils';
@@ -92,7 +92,7 @@ export async function loadAction({
await client.indices.refresh({
index: '_all',
allowNoIndices: true,
allow_no_indices: true,
});
// If we affected the Kibana index, we need to ensure it's migrated...
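
Note: the rename from allowNoIndices to allow_no_indices above is a convention change, not just a one-off: the new @elastic/elasticsearch client takes request parameters in snake_case, mirroring the REST API, where the legacy client used camelCase. A minimal sketch of the new call shape (the helper function is illustrative only):

import { Client } from '@elastic/elasticsearch';

async function refreshAll(client: Client) {
  const resp = await client.indices.refresh({
    index: '_all',
    allow_no_indices: true, // snake_case, as the REST API spells it
  });
  // responses are envelopes: payload on `body`, HTTP status on `statusCode`
  return resp.statusCode;
}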


@@ -9,7 +9,7 @@
import { resolve } from 'path';
import { createWriteStream, mkdirSync } from 'fs';
import { Readable, Writable } from 'stream';
import { Client } from 'elasticsearch';
import { Client } from '@elastic/elasticsearch';
import { ToolingLog } from '@kbn/dev-utils';
import { createListStream, createPromiseFromStreams } from '@kbn/utils';


@@ -9,7 +9,7 @@
import { resolve } from 'path';
import { createReadStream } from 'fs';
import { Readable, Writable } from 'stream';
import { Client } from 'elasticsearch';
import { Client } from '@elastic/elasticsearch';
import { ToolingLog, KbnClient } from '@kbn/dev-utils';
import { createPromiseFromStreams } from '@kbn/utils';


@@ -19,7 +19,7 @@ import Fs from 'fs';
import { RunWithCommands, createFlagError, KbnClient, CA_CERT_PATH } from '@kbn/dev-utils';
import { readConfigFile } from '@kbn/test';
import legacyElasticsearch from 'elasticsearch';
import { Client } from '@elastic/elasticsearch';
import { EsArchiver } from './es_archiver';
@@ -115,10 +115,9 @@ export function runCli() {
throw createFlagError('--dir or --config must be defined');
}
const client = new legacyElasticsearch.Client({
host: esUrl,
const client = new Client({
node: esUrl,
ssl: esCa ? { ca: esCa } : undefined,
log: flags.verbose ? 'trace' : [],
});
addCleanupTask(() => client.close());
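
For reference, the constructor differences driving this hunk: the new client takes `node` instead of `host`, accepts TLS material under `ssl`, and has no `log` option (the legacy `log: 'trace'` setting simply goes away; the new client exposes event-based hooks instead). A minimal sketch, with the URL and CA path as placeholder values:

import { readFileSync } from 'fs';
import { Client } from '@elastic/elasticsearch';

const esUrl = 'https://localhost:9200'; // placeholder
const esCa = readFileSync('/path/to/ca.crt', 'utf8'); // placeholder path

const client = new Client({
  node: esUrl, // replaces the legacy `host` option
  ssl: esCa ? { ca: esCa } : undefined,
});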


@@ -6,7 +6,7 @@
* Public License, v 1.
*/
import { Client } from 'elasticsearch';
import { Client } from '@elastic/elasticsearch';
import { ToolingLog, KbnClient } from '@kbn/dev-utils';
import {


@@ -1,67 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* and the Server Side Public License, v 1; you may not use this file except in
* compliance with, at your election, the Elastic License or the Server Side
* Public License, v 1.
*/
import { Client } from 'elasticsearch';
import sinon from 'sinon';
import Chance from 'chance';
import { times } from 'lodash';
import { Stats } from '../../stats';
const chance = new Chance();
export const createStubStats = (): Stats =>
({
indexedDoc: sinon.stub(),
archivedDoc: sinon.stub(),
} as any);
export const createPersonDocRecords = (n: number) =>
times(n, () => ({
type: 'doc',
value: {
index: 'people',
type: 'person',
id: chance.natural(),
source: {
name: chance.name(),
birthday: chance.birthday(),
ssn: chance.ssn(),
},
},
}));
type MockClient = Client & {
assertNoPendingResponses: () => void;
};
export const createStubClient = (
responses: Array<(name: string, params: any) => any | Promise<any>> = []
): MockClient => {
const createStubClientMethod = (name: string) =>
sinon.spy(async (params) => {
if (responses.length === 0) {
throw new Error(`unexpected client.${name} call`);
}
const response = responses.shift();
return await response!(name, params);
});
return {
search: createStubClientMethod('search'),
scroll: createStubClientMethod('scroll'),
bulk: createStubClientMethod('bulk'),
assertNoPendingResponses() {
if (responses.length) {
throw new Error(`There are ${responses.length} unsent responses.`);
}
},
} as any;
};


@@ -6,128 +6,185 @@
* Public License, v 1.
*/
import sinon from 'sinon';
import { delay } from 'bluebird';
import { createListStream, createPromiseFromStreams, createConcatStream } from '@kbn/utils';
import {
createListStream,
createPromiseFromStreams,
createConcatStream,
createMapStream,
ToolingLog,
} from '@kbn/dev-utils';
import { createGenerateDocRecordsStream } from './generate_doc_records_stream';
import { Progress } from '../progress';
import { createStubStats, createStubClient } from './__mocks__/stubs';
import { createStats } from '../stats';
describe('esArchiver: createGenerateDocRecordsStream()', () => {
it('scrolls 1000 documents at a time', async () => {
const stats = createStubStats();
const client = createStubClient([
(name, params) => {
expect(name).toBe('search');
expect(params).toHaveProperty('index', 'logstash-*');
expect(params).toHaveProperty('size', 1000);
return {
hits: {
total: 0,
hits: [],
},
};
},
]);
const log = new ToolingLog();
const progress = new Progress();
await createPromiseFromStreams([
createListStream(['logstash-*']),
createGenerateDocRecordsStream({ client, stats, progress }),
]);
expect(progress.getTotal()).toBe(0);
expect(progress.getComplete()).toBe(0);
});
it('uses a 1 minute scroll timeout', async () => {
const stats = createStubStats();
const client = createStubClient([
(name, params) => {
expect(name).toBe('search');
expect(params).toHaveProperty('index', 'logstash-*');
expect(params).toHaveProperty('scroll', '1m');
expect(params).toHaveProperty('rest_total_hits_as_int', true);
return {
hits: {
total: 0,
hits: [],
},
};
},
]);
const progress = new Progress();
await createPromiseFromStreams([
createListStream(['logstash-*']),
createGenerateDocRecordsStream({ client, stats, progress }),
]);
expect(progress.getTotal()).toBe(0);
expect(progress.getComplete()).toBe(0);
});
it('consumes index names and scrolls completely before continuing', async () => {
const stats = createStubStats();
let checkpoint = Date.now();
const client = createStubClient([
async (name, params) => {
expect(name).toBe('search');
expect(params).toHaveProperty('index', 'index1');
await delay(200);
return {
_scroll_id: 'index1ScrollId',
hits: { total: 2, hits: [{ _id: 1, _index: '.kibana_foo' }] },
};
},
async (name, params) => {
expect(name).toBe('scroll');
expect(params).toHaveProperty('scrollId', 'index1ScrollId');
expect(Date.now() - checkpoint).not.toBeLessThan(200);
checkpoint = Date.now();
await delay(200);
return { hits: { total: 2, hits: [{ _id: 2, _index: 'foo' }] } };
},
async (name, params) => {
expect(name).toBe('search');
expect(params).toHaveProperty('index', 'index2');
expect(Date.now() - checkpoint).not.toBeLessThan(200);
checkpoint = Date.now();
await delay(200);
return { hits: { total: 0, hits: [] } };
},
]);
const progress = new Progress();
const docRecords = await createPromiseFromStreams([
createListStream(['index1', 'index2']),
createGenerateDocRecordsStream({ client, stats, progress }),
createConcatStream([]),
]);
expect(docRecords).toEqual([
it('transforms each input index to a stream of docs using scrollSearch helper', async () => {
const responses: any = {
foo: [
{
type: 'doc',
value: {
index: '.kibana_1',
type: undefined,
id: 1,
source: undefined,
body: {
hits: {
total: 5,
hits: [
{ _index: 'foo', _type: '_doc', _id: '0', _source: {} },
{ _index: 'foo', _type: '_doc', _id: '1', _source: {} },
{ _index: 'foo', _type: '_doc', _id: '2', _source: {} },
],
},
},
},
{
type: 'doc',
value: {
index: 'foo',
type: undefined,
id: 2,
source: undefined,
body: {
hits: {
total: 5,
hits: [
{ _index: 'foo', _type: '_doc', _id: '3', _source: {} },
{ _index: 'foo', _type: '_doc', _id: '4', _source: {} },
],
},
},
},
]);
sinon.assert.calledTwice(stats.archivedDoc as any);
expect(progress.getTotal()).toBe(2);
expect(progress.getComplete()).toBe(2);
});
],
bar: [
{
body: {
hits: {
total: 2,
hits: [
{ _index: 'bar', _type: '_doc', _id: '0', _source: {} },
{ _index: 'bar', _type: '_doc', _id: '1', _source: {} },
],
},
},
},
],
};
const client: any = {
helpers: {
scrollSearch: jest.fn(function* ({ index }) {
while (responses[index] && responses[index].length) {
yield responses[index].shift()!;
}
}),
},
};
const stats = createStats('test', log);
const progress = new Progress();
const results = await createPromiseFromStreams([
createListStream(['bar', 'foo']),
createGenerateDocRecordsStream({
client,
stats,
progress,
}),
createMapStream((record: any) => {
expect(record).toHaveProperty('type', 'doc');
expect(record.value.source).toEqual({});
expect(record.value.type).toBe('_doc');
expect(record.value.index).toMatch(/^(foo|bar)$/);
expect(record.value.id).toMatch(/^\d+$/);
return `${record.value.index}:${record.value.id}`;
}),
createConcatStream([]),
]);
expect(client.helpers.scrollSearch).toMatchInlineSnapshot(`
[MockFunction] {
"calls": Array [
Array [
Object {
"_source": "true",
"body": Object {
"query": undefined,
},
"index": "bar",
"rest_total_hits_as_int": true,
"scroll": "1m",
"size": 1000,
},
],
Array [
Object {
"_source": "true",
"body": Object {
"query": undefined,
},
"index": "foo",
"rest_total_hits_as_int": true,
"scroll": "1m",
"size": 1000,
},
],
],
"results": Array [
Object {
"type": "return",
"value": Object {},
},
Object {
"type": "return",
"value": Object {},
},
],
}
`);
expect(results).toMatchInlineSnapshot(`
Array [
"bar:0",
"bar:1",
"foo:0",
"foo:1",
"foo:2",
"foo:3",
"foo:4",
]
`);
expect(progress).toMatchInlineSnapshot(`
Progress {
"complete": 7,
"loggingInterval": undefined,
"total": 7,
}
`);
expect(stats).toMatchInlineSnapshot(`
Object {
"bar": Object {
"archived": false,
"configDocs": Object {
"tagged": 0,
"upToDate": 0,
"upgraded": 0,
},
"created": false,
"deleted": false,
"docs": Object {
"archived": 2,
"indexed": 0,
},
"skipped": false,
"waitForSnapshot": 0,
},
"foo": Object {
"archived": false,
"configDocs": Object {
"tagged": 0,
"upToDate": 0,
"upgraded": 0,
},
"created": false,
"deleted": false,
"docs": Object {
"archived": 5,
"indexed": 0,
},
"skipped": false,
"waitForSnapshot": 0,
},
}
`);
});


@@ -7,7 +7,7 @@
*/
import { Transform } from 'stream';
import { Client, SearchParams, SearchResponse } from 'elasticsearch';
import { Client } from '@elastic/elasticsearch';
import { Stats } from '../stats';
import { Progress } from '../progress';
@@ -30,31 +30,26 @@ export function createGenerateDocRecordsStream({
readableObjectMode: true,
async transform(index, enc, callback) {
try {
let remainingHits = 0;
let resp: SearchResponse<any> | null = null;
const iterator = client.helpers.scrollSearch({
index,
scroll: SCROLL_TIMEOUT,
size: SCROLL_SIZE,
_source: 'true',
body: {
query,
},
rest_total_hits_as_int: true,
});
while (!resp || remainingHits > 0) {
if (!resp) {
resp = await client.search({
index,
scroll: SCROLL_TIMEOUT,
size: SCROLL_SIZE,
_source: true,
body: {
query,
},
rest_total_hits_as_int: true, // not declared on SearchParams type
} as SearchParams);
remainingHits = resp.hits.total;
let remainingHits: number | null = null;
for await (const resp of iterator) {
if (remainingHits === null) {
remainingHits = resp.body.hits.total as number;
progress.addToTotal(remainingHits);
} else {
resp = await client.scroll({
scrollId: resp._scroll_id!,
scroll: SCROLL_TIMEOUT,
});
}
for (const hit of resp.hits.hits) {
for (const hit of resp.body.hits.hits) {
remainingHits -= 1;
stats.archivedDoc(hit._index);
this.push({
@@ -70,7 +65,7 @@ export function createGenerateDocRecordsStream({
});
}
progress.addToComplete(resp.hits.hits.length);
progress.addToComplete(resp.body.hits.hits.length);
}
callback(undefined);
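
The new implementation above leans on the client's scrollSearch helper, which issues the initial search plus all follow-up scroll requests itself and clears the scroll context when iteration ends, replacing the manual search/scroll loop. A minimal standalone sketch of the helper (index name and query are placeholders):

import { Client } from '@elastic/elasticsearch';

async function printAllDocs(client: Client) {
  const iterator = client.helpers.scrollSearch({
    index: 'my-index', // placeholder
    scroll: '1m',
    size: 1000,
    _source: 'true',
    body: { query: { match_all: {} } }, // placeholder query
    rest_total_hits_as_int: true,
  });

  // each iteration yields one full transport response; hits live under resp.body
  for await (const resp of iterator) {
    for (const hit of resp.body.hits.hits) {
      console.log(hit._index, hit._id);
    }
  }
}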


@@ -6,170 +6,278 @@
* Public License, v 1.
*/
import { delay } from 'bluebird';
import { createListStream, createPromiseFromStreams } from '@kbn/utils';
import {
createListStream,
createPromiseFromStreams,
ToolingLog,
createRecursiveSerializer,
} from '@kbn/dev-utils';
import { Progress } from '../progress';
import { createIndexDocRecordsStream } from './index_doc_records_stream';
import { createStubStats, createStubClient, createPersonDocRecords } from './__mocks__/stubs';
import { createStats } from '../stats';
const recordsToBulkBody = (records: any[]) => {
return records.reduce((acc, record) => {
const { index, id, source } = record.value;
const AT_LINE_RE = /^\s+at /m;
return [...acc, { index: { _index: index, _id: id } }, source];
}, [] as any[]);
};
expect.addSnapshotSerializer(
createRecursiveSerializer(
(v) => typeof v === 'string' && AT_LINE_RE.test(v),
(v: string) => {
const lines = v.split('\n');
const withoutStack: string[] = [];
describe('esArchiver: createIndexDocRecordsStream()', () => {
it('consumes doc records and sends to `_bulk` api', async () => {
const records = createPersonDocRecords(1);
const client = createStubClient([
async (name, params) => {
expect(name).toBe('bulk');
expect(params).toEqual({
body: recordsToBulkBody(records),
requestTimeout: 120000,
});
return { ok: true };
},
]);
const stats = createStubStats();
const progress = new Progress();
// move source lines to withoutStack, filtering out stacktrace lines
while (lines.length) {
const line = lines.shift()!;
await createPromiseFromStreams([
createListStream(records),
createIndexDocRecordsStream(client, stats, progress),
]);
if (!AT_LINE_RE.test(line)) {
withoutStack.push(line);
} else {
// push in representation of stack trace indented to match "at"
withoutStack.push(`${' '.repeat(line.indexOf('at'))}<stack trace>`);
client.assertNoPendingResponses();
expect(progress.getComplete()).toBe(1);
expect(progress.getTotal()).toBe(undefined);
});
// shift off all subsequent `at ...` lines
while (lines.length && AT_LINE_RE.test(lines[0])) {
lines.shift();
}
}
}
it('consumes multiple doc records and sends to `_bulk` api together', async () => {
const records = createPersonDocRecords(10);
const client = createStubClient([
async (name, params) => {
expect(name).toBe('bulk');
expect(params).toEqual({
body: recordsToBulkBody(records.slice(0, 1)),
requestTimeout: 120000,
});
return { ok: true };
},
async (name, params) => {
expect(name).toBe('bulk');
expect(params).toEqual({
body: recordsToBulkBody(records.slice(1)),
requestTimeout: 120000,
});
return { ok: true };
},
]);
const stats = createStubStats();
const progress = new Progress();
await createPromiseFromStreams([
createListStream(records),
createIndexDocRecordsStream(client, stats, progress),
]);
client.assertNoPendingResponses();
expect(progress.getComplete()).toBe(10);
expect(progress.getTotal()).toBe(undefined);
});
it('waits until request is complete before sending more', async () => {
const records = createPersonDocRecords(10);
const stats = createStubStats();
const start = Date.now();
const delayMs = 1234;
const client = createStubClient([
async (name, params) => {
expect(name).toBe('bulk');
expect(params).toEqual({
body: recordsToBulkBody(records.slice(0, 1)),
requestTimeout: 120000,
});
await delay(delayMs);
return { ok: true };
},
async (name, params) => {
expect(name).toBe('bulk');
expect(params).toEqual({
body: recordsToBulkBody(records.slice(1)),
requestTimeout: 120000,
});
expect(Date.now() - start).not.toBeLessThan(delayMs);
return { ok: true };
},
]);
const progress = new Progress();
await createPromiseFromStreams([
createListStream(records),
createIndexDocRecordsStream(client, stats, progress),
]);
client.assertNoPendingResponses();
expect(progress.getComplete()).toBe(10);
expect(progress.getTotal()).toBe(undefined);
});
it('sends a maximum of 300 documents at a time', async () => {
const records = createPersonDocRecords(301);
const stats = createStubStats();
const client = createStubClient([
async (name, params) => {
expect(name).toBe('bulk');
expect(params.body.length).toEqual(1 * 2);
return { ok: true };
},
async (name, params) => {
expect(name).toBe('bulk');
expect(params.body.length).toEqual(299 * 2);
return { ok: true };
},
async (name, params) => {
expect(name).toBe('bulk');
expect(params.body.length).toEqual(1 * 2);
return { ok: true };
},
]);
const progress = new Progress();
await createPromiseFromStreams([
createListStream(records),
createIndexDocRecordsStream(client, stats, progress),
]);
client.assertNoPendingResponses();
expect(progress.getComplete()).toBe(301);
expect(progress.getTotal()).toBe(undefined);
});
it('emits an error if any request fails', async () => {
const records = createPersonDocRecords(2);
const stats = createStubStats();
const client = createStubClient([
async () => ({ ok: true }),
async () => ({ errors: true, forcedError: true }),
]);
const progress = new Progress();
try {
await createPromiseFromStreams([
createListStream(records),
createIndexDocRecordsStream(client, stats, progress),
]);
throw new Error('expected stream to emit error');
} catch (err) {
expect(err.message).toMatch(/"forcedError":\s*true/);
return withoutStack.join('\n');
}
)
);
client.assertNoPendingResponses();
expect(progress.getComplete()).toBe(1);
expect(progress.getTotal()).toBe(undefined);
const log = new ToolingLog();
class MockClient {
helpers = {
bulk: jest.fn(),
};
}
const testRecords = [
{
type: 'doc',
value: {
index: 'foo',
id: '0',
source: {
hello: 'world',
},
},
},
{
type: 'doc',
value: {
index: 'foo',
id: '1',
source: {
hello: 'world',
},
},
},
{
type: 'doc',
value: {
index: 'foo',
id: '2',
source: {
hello: 'world',
},
},
},
{
type: 'doc',
value: {
index: 'foo',
id: '3',
source: {
hello: 'world',
},
},
},
];
it('indexes documents using the bulk client helper', async () => {
const client = new MockClient();
client.helpers.bulk.mockImplementation(async () => {});
const progress = new Progress();
const stats = createStats('test', log);
await createPromiseFromStreams([
createListStream(testRecords),
createIndexDocRecordsStream(client as any, stats, progress),
]);
expect(stats).toMatchInlineSnapshot(`
Object {
"foo": Object {
"archived": false,
"configDocs": Object {
"tagged": 0,
"upToDate": 0,
"upgraded": 0,
},
"created": false,
"deleted": false,
"docs": Object {
"archived": 0,
"indexed": 4,
},
"skipped": false,
"waitForSnapshot": 0,
},
}
`);
expect(progress).toMatchInlineSnapshot(`
Progress {
"complete": 4,
"loggingInterval": undefined,
"total": undefined,
}
`);
expect(client.helpers.bulk).toMatchInlineSnapshot(`
[MockFunction] {
"calls": Array [
Array [
Object {
"datasource": Array [
Object {
"hello": "world",
},
],
"onDocument": [Function],
"onDrop": [Function],
"retries": 5,
},
],
Array [
Object {
"datasource": Array [
Object {
"hello": "world",
},
Object {
"hello": "world",
},
Object {
"hello": "world",
},
],
"onDocument": [Function],
"onDrop": [Function],
"retries": 5,
},
],
],
"results": Array [
Object {
"type": "return",
"value": Promise {},
},
Object {
"type": "return",
"value": Promise {},
},
],
}
`);
});
describe('bulk helper onDocument param', () => {
it('returns index ops for each doc', async () => {
expect.assertions(testRecords.length);
const client = new MockClient();
client.helpers.bulk.mockImplementation(async ({ datasource, onDocument }) => {
for (const d of datasource) {
const op = onDocument(d);
expect(op).toEqual({
index: {
_index: 'foo',
_id: expect.stringMatching(/^\d$/),
},
});
}
});
const stats = createStats('test', log);
const progress = new Progress();
await createPromiseFromStreams([
createListStream(testRecords),
createIndexDocRecordsStream(client as any, stats, progress),
]);
});
it('returns create ops for each doc when instructed', async () => {
expect.assertions(testRecords.length);
const client = new MockClient();
client.helpers.bulk.mockImplementation(async ({ datasource, onDocument }) => {
for (const d of datasource) {
const op = onDocument(d);
expect(op).toEqual({
create: {
_index: 'foo',
_id: expect.stringMatching(/^\d$/),
},
});
}
});
const stats = createStats('test', log);
const progress = new Progress();
await createPromiseFromStreams([
createListStream(testRecords),
createIndexDocRecordsStream(client as any, stats, progress, true),
]);
});
});
describe('bulk helper onDrop param', () => {
it('throws an error reporting any docs which failed all retry attempts', async () => {
const client = new MockClient();
let counter = -1;
client.helpers.bulk.mockImplementation(async ({ datasource, onDrop }) => {
for (const d of datasource) {
counter++;
if (counter > 0) {
onDrop({
document: d,
error: {
reason: `${counter} conflicts with something`,
},
});
}
}
});
const stats = createStats('test', log);
const progress = new Progress();
const promise = createPromiseFromStreams([
createListStream(testRecords),
createIndexDocRecordsStream(client as any, stats, progress),
]);
await expect(promise).rejects.toThrowErrorMatchingInlineSnapshot(`
"
Error: Bulk doc failure [operation=index]:
doc: {\\"hello\\":\\"world\\"}
error: {\\"reason\\":\\"1 conflicts with something\\"}
<stack trace>
Error: Bulk doc failure [operation=index]:
doc: {\\"hello\\":\\"world\\"}
error: {\\"reason\\":\\"2 conflicts with something\\"}
<stack trace>
Error: Bulk doc failure [operation=index]:
doc: {\\"hello\\":\\"world\\"}
error: {\\"reason\\":\\"3 conflicts with something\\"}
<stack trace>"
`);
});
});


@@ -6,7 +6,8 @@
* Public License, v 1.
*/
import { Client } from 'elasticsearch';
import { Client } from '@elastic/elasticsearch';
import AggregateError from 'aggregate-error';
import { Writable } from 'stream';
import { Stats } from '../stats';
import { Progress } from '../progress';
@@ -18,24 +19,38 @@ export function createIndexDocRecordsStream(
useCreate: boolean = false
) {
async function indexDocs(docs: any[]) {
const body: any[] = [];
const operation = useCreate === true ? 'create' : 'index';
docs.forEach((doc) => {
stats.indexedDoc(doc.index);
body.push(
{
const ops = new WeakMap<any, any>();
const errors: string[] = [];
await client.helpers.bulk({
retries: 5,
datasource: docs.map((doc) => {
const body = doc.source;
ops.set(body, {
[operation]: {
_index: doc.index,
_id: doc.id,
},
},
doc.source
);
});
return body;
}),
onDocument(doc) {
return ops.get(doc);
},
onDrop(dropped) {
const dj = JSON.stringify(dropped.document);
const ej = JSON.stringify(dropped.error);
errors.push(`Bulk doc failure [operation=${operation}]:\n doc: ${dj}\n error: ${ej}`);
},
});
const resp = await client.bulk({ requestTimeout: 2 * 60 * 1000, body });
if (resp.errors) {
throw new Error(`Failed to index all documents: ${JSON.stringify(resp, null, 2)}`);
if (errors.length) {
throw new AggregateError(errors);
}
for (const doc of docs) {
stats.indexedDoc(doc.index);
}
}
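
The bulk helper used here chunks the datasource, retries rejected documents up to `retries` times, and reports documents that exhaust their retries through `onDrop`, replacing the old hand-built `_bulk` body and its response-wide `errors` flag. A minimal standalone sketch (index name is a placeholder; aggregate-error is the dependency added in package.json above):

import { Client } from '@elastic/elasticsearch';
import AggregateError from 'aggregate-error';

async function indexAll(client: Client, docs: Array<Record<string, unknown>>) {
  const failures: string[] = [];

  await client.helpers.bulk({
    datasource: docs,
    retries: 5, // retry rejected docs up to 5 times before dropping them
    // map each datasource item to its bulk action line
    onDocument: () => ({ index: { _index: 'my-index' } }), // placeholder index
    // called once per document that failed all retry attempts
    onDrop: (dropped) => {
      failures.push(JSON.stringify(dropped.error));
    },
  });

  if (failures.length) {
    throw new AggregateError(failures);
  }
}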


@@ -6,7 +6,7 @@
* Public License, v 1.
*/
import { Client } from 'elasticsearch';
import { Client } from '@elastic/elasticsearch';
import sinon from 'sinon';
import { ToolingLog } from '@kbn/dev-utils';
import { Stats } from '../../stats';
@@ -54,9 +54,11 @@ export const createStubDocRecord = (index: string, id: number) => ({
const createEsClientError = (errorType: string) => {
const err = new Error(`ES Client Error Stub "${errorType}"`);
(err as any).body = {
error: {
type: errorType,
(err as any).meta = {
body: {
error: {
type: errorType,
},
},
};
return err;
@@ -79,26 +81,25 @@ export const createStubClient = (
}
return {
[index]: {
mappings: {},
settings: {},
body: {
[index]: {
mappings: {},
settings: {},
},
},
};
}),
existsAlias: sinon.spy(({ name }) => {
return Promise.resolve(aliases.hasOwnProperty(name));
}),
getAlias: sinon.spy(async ({ index, name }) => {
if (index && existingIndices.indexOf(index) >= 0) {
const result = indexAlias(aliases, index);
return { [index]: { aliases: result ? { [result]: {} } : {} } };
return { body: { [index]: { aliases: result ? { [result]: {} } : {} } } };
}
if (name && aliases[name]) {
return { [aliases[name]]: { aliases: { [name]: {} } } };
return { body: { [aliases[name]]: { aliases: { [name]: {} } } } };
}
return { status: 404 };
return { statusCode: 404 };
}),
updateAliases: sinon.spy(async ({ body }) => {
body.actions.forEach(
@@ -110,14 +111,14 @@ export const createStubClient = (
}
);
return { ok: true };
return { body: { ok: true } };
}),
create: sinon.spy(async ({ index }) => {
if (existingIndices.includes(index) || aliases.hasOwnProperty(index)) {
throw createEsClientError('resource_already_exists_exception');
} else {
existingIndices.push(index);
return { ok: true };
return { body: { ok: true } };
}
}),
delete: sinon.spy(async ({ index }) => {
@@ -131,7 +132,7 @@
}
});
indices.forEach((ix) => existingIndices.splice(existingIndices.indexOf(ix), 1));
return { ok: true };
return { body: { ok: true } };
} else {
throw createEsClientError('index_not_found_exception');
}


@@ -56,15 +56,35 @@ describe('esArchiver: createCreateIndexStream()', () => {
createCreateIndexStream({ client, stats, log }),
]);
expect((client.indices.getAlias as sinon.SinonSpy).calledOnce).toBe(true);
expect((client.indices.getAlias as sinon.SinonSpy).args[0][0]).toEqual({
name: 'existing-index',
ignore: [404],
});
expect((client.indices.delete as sinon.SinonSpy).calledOnce).toBe(true);
expect((client.indices.delete as sinon.SinonSpy).args[0][0]).toEqual({
index: ['actual-index'],
});
expect((client.indices.getAlias as sinon.SinonSpy).args).toMatchInlineSnapshot(`
Array [
Array [
Object {
"name": Array [
"existing-index",
],
},
Object {
"ignore": Array [
404,
],
},
],
]
`);
expect((client.indices.delete as sinon.SinonSpy).args).toMatchInlineSnapshot(`
Array [
Array [
Object {
"index": Array [
"actual-index",
],
},
],
]
`);
sinon.assert.callCount(client.indices.create as sinon.SinonSpy, 3); // one failed create because of existing
});


@@ -9,7 +9,7 @@
import { Transform, Readable } from 'stream';
import { inspect } from 'util';
import { Client } from 'elasticsearch';
import { Client } from '@elastic/elasticsearch';
import { ToolingLog } from '@kbn/dev-utils';
import { Stats } from '../stats';
@@ -88,7 +88,10 @@ export function createCreateIndexStream({
return;
}
if (err?.body?.error?.type !== 'resource_already_exists_exception' || attemptNumber >= 3) {
if (
err?.meta?.body?.error?.type !== 'resource_already_exists_exception' ||
attemptNumber >= 3
) {
throw err;
}
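
The error shape changes the same way across the migration: the legacy client exposed the ES error payload at err.body, while the new client throws a ResponseError whose payload sits at err.meta.body. A minimal sketch of an equivalent guard using the errors namespace the new client exports:

import { Client, errors } from '@elastic/elasticsearch';

async function createIfMissing(client: Client, index: string) {
  try {
    await client.indices.create({ index });
  } catch (err) {
    // the ES error body moved from err.body (legacy) to err.meta.body (new client)
    if (
      err instanceof errors.ResponseError &&
      err.meta.body?.error?.type === 'resource_already_exists_exception'
    ) {
      return; // the index already exists; nothing to do
    }
    throw err;
  }
}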


@@ -6,8 +6,7 @@
* Public License, v 1.
*/
import { get } from 'lodash';
import { Client } from 'elasticsearch';
import { Client } from '@elastic/elasticsearch';
import { ToolingLog } from '@kbn/dev-utils';
import { Stats } from '../stats';
@@ -17,35 +16,45 @@ const PENDING_SNAPSHOT_STATUSES = ['INIT', 'STARTED', 'WAITING'];
export async function deleteIndex(options: {
client: Client;
stats: Stats;
index: string;
index: string | string[];
log: ToolingLog;
retryIfSnapshottingCount?: number;
}): Promise<void> {
const { client, stats, index, log, retryIfSnapshottingCount = 10 } = options;
const { client, stats, log, retryIfSnapshottingCount = 10 } = options;
const indices = [options.index].flat();
const getIndicesToDelete = async () => {
const aliasInfo = await client.indices.getAlias({ name: index, ignore: [404] });
return aliasInfo.status === 404 ? [index] : Object.keys(aliasInfo);
const resp = await client.indices.getAlias(
{
name: indices,
},
{
ignore: [404],
}
);
return resp.statusCode === 404 ? indices : Object.keys(resp.body);
};
try {
const indicesToDelete = await getIndicesToDelete();
await client.indices.delete({ index: indicesToDelete });
for (let i = 0; i < indicesToDelete.length; i++) {
const indexToDelete = indicesToDelete[i];
stats.deletedIndex(indexToDelete);
for (const index of indices) {
stats.deletedIndex(index);
}
} catch (error) {
if (retryIfSnapshottingCount > 0 && isDeleteWhileSnapshotInProgressError(error)) {
stats.waitingForInProgressSnapshot(index);
await waitForSnapshotCompletion(client, index, log);
for (const index of indices) {
stats.waitingForInProgressSnapshot(index);
}
await waitForSnapshotCompletion(client, indices, log);
return await deleteIndex({
...options,
retryIfSnapshottingCount: retryIfSnapshottingCount - 1,
});
}
if (get(error, 'body.error.type') !== 'index_not_found_exception') {
if (error?.meta?.body?.error?.type !== 'index_not_found_exception') {
throw error;
}
}
@@ -57,8 +66,8 @@ export async function deleteIndex(options: {
* @param {Error} error
* @return {Boolean}
*/
export function isDeleteWhileSnapshotInProgressError(error: object) {
return get(error, 'body.error.reason', '').startsWith(
export function isDeleteWhileSnapshotInProgressError(error: any) {
return (error?.meta?.body?.error?.reason ?? '').startsWith(
'Cannot delete indices that are being snapshotted'
);
}
@@ -67,10 +76,16 @@ export function isDeleteWhileSnapshotInProgressError(error: object) {
* Wait for the any snapshot in any repository that is
* snapshotting this index to complete.
*/
export async function waitForSnapshotCompletion(client: Client, index: string, log: ToolingLog) {
export async function waitForSnapshotCompletion(
client: Client,
index: string | string[],
log: ToolingLog
) {
const isSnapshotPending = async (repository: string, snapshot: string) => {
const {
snapshots: [status],
body: {
snapshots: [status],
},
} = await client.snapshot.status({
repository,
snapshot,
@@ -81,10 +96,13 @@ export async function waitForSnapshotCompletion(client: Client, index: string, log: ToolingLog) {
};
const getInProgressSnapshots = async (repository: string) => {
const { snapshots: inProgressSnapshots } = await client.snapshot.get({
const {
body: { snapshots: inProgressSnapshots },
} = await client.snapshot.get({
repository,
snapshot: '_current',
});
return inProgressSnapshots;
};
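
Two new-client conventions recur throughout this file's changes: per-request transport options such as `ignore` move into a separate second argument rather than being mixed into the request params, and every response is an envelope whose payload lives on `body` alongside `statusCode`. A minimal sketch of the getAlias pattern used by getIndicesToDelete:

import { Client } from '@elastic/elasticsearch';

async function resolveToConcreteIndices(client: Client, name: string | string[]) {
  // second argument carries transport options (ignore, requestTimeout, ...)
  const resp = await client.indices.getAlias({ name }, { ignore: [404] });

  // with `ignore: [404]` the client resolves instead of throwing, so the
  // status has to be checked on the envelope before reading resp.body
  return resp.statusCode === 404 ? [name].flat() : Object.keys(resp.body);
}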


@@ -7,7 +7,7 @@
*/
import { Transform } from 'stream';
import { Client } from 'elasticsearch';
import { Client } from '@elastic/elasticsearch';
import { ToolingLog } from '@kbn/dev-utils';
import { Stats } from '../stats';


@@ -44,8 +44,8 @@ describe('esArchiver: createGenerateIndexRecordsStream()', () => {
]);
const params = (client.indices.get as sinon.SinonSpy).args[0][0];
expect(params).toHaveProperty('filterPath');
const filters: string[] = params.filterPath;
expect(params).toHaveProperty('filter_path');
const filters: string[] = params.filter_path;
expect(filters.some((path) => path.includes('index.creation_date'))).toBe(true);
expect(filters.some((path) => path.includes('index.uuid'))).toBe(true);
expect(filters.some((path) => path.includes('index.version'))).toBe(true);


@@ -7,7 +7,7 @@
*/
import { Transform } from 'stream';
import { Client } from 'elasticsearch';
import { Client } from '@elastic/elasticsearch';
import { Stats } from '../stats';
export function createGenerateIndexRecordsStream(client: Client, stats: Stats) {
@@ -16,26 +16,30 @@ export function createGenerateIndexRecordsStream(client: Client, stats: Stats) {
readableObjectMode: true,
async transform(indexOrAlias, enc, callback) {
try {
const resp = (await client.indices.get({
index: indexOrAlias,
filterPath: [
'*.settings',
'*.mappings',
// remove settings that aren't really settings
'-*.settings.index.creation_date',
'-*.settings.index.uuid',
'-*.settings.index.version',
'-*.settings.index.provided_name',
'-*.settings.index.frozen',
'-*.settings.index.search.throttled',
'-*.settings.index.query',
'-*.settings.index.routing',
],
})) as Record<string, any>;
const resp = (
await client.indices.get({
index: indexOrAlias,
filter_path: [
'*.settings',
'*.mappings',
// remove settings that aren't really settings
'-*.settings.index.creation_date',
'-*.settings.index.uuid',
'-*.settings.index.version',
'-*.settings.index.provided_name',
'-*.settings.index.frozen',
'-*.settings.index.search.throttled',
'-*.settings.index.query',
'-*.settings.index.routing',
],
})
).body as Record<string, any>;
for (const [index, { settings, mappings }] of Object.entries(resp)) {
const {
[index]: { aliases },
body: {
[index]: { aliases },
},
} = await client.indices.getAlias({ index });
stats.archivedIndex(index, { settings, mappings });


@@ -6,7 +6,9 @@
* Public License, v 1.
*/
import { Client, CreateDocumentParams } from 'elasticsearch';
import { inspect } from 'util';
import { Client } from '@elastic/elasticsearch';
import { ToolingLog, KbnClient } from '@kbn/dev-utils';
import { Stats } from '../stats';
import { deleteIndex } from './delete_index';
@@ -57,13 +59,17 @@ export async function migrateKibanaIndex({
}) {
// we allow dynamic mappings on the index, as some interceptors are accessing documents before
// the migration is actually performed. The migrator will put the value back to `strict` after migration.
await client.indices.putMapping({
index: '.kibana',
body: {
dynamic: true,
await client.indices.putMapping(
{
index: '.kibana',
body: {
dynamic: true,
},
},
ignore: [404],
} as any);
{
ignore: [404],
}
);
await kbnClient.savedObjects.migrate();
}
@@ -75,9 +81,14 @@ export async function migrateKibanaIndex({
* index (e.g. we don't want to remove .kibana_task_manager or the like).
*/
async function fetchKibanaIndices(client: Client) {
const kibanaIndices = await client.cat.indices({ index: '.kibana*', format: 'json' });
const resp = await client.cat.indices<unknown>({ index: '.kibana*', format: 'json' });
const isKibanaIndex = (index: string) => /^\.kibana(?:_\d*)?$/.test(index);
return kibanaIndices.map((x: { index: string }) => x.index).filter(isKibanaIndex);
if (!Array.isArray(resp.body)) {
throw new Error(`expected response to be an array ${inspect(resp.body)}`);
}
return resp.body.map((x: { index: string }) => x.index).filter(isKibanaIndex);
}
const delay = (delayInMs: number) => new Promise((resolve) => setTimeout(resolve, delayInMs));
@@ -102,27 +113,31 @@ export async function cleanKibanaIndices({
}
while (true) {
const resp = await client.deleteByQuery({
index: `.kibana`,
body: {
query: {
bool: {
must_not: {
ids: {
values: ['space:default'],
const resp = await client.deleteByQuery(
{
index: `.kibana`,
body: {
query: {
bool: {
must_not: {
ids: {
values: ['space:default'],
},
},
},
},
},
},
ignore: [409],
});
{
ignore: [409],
}
);
if (resp.total !== resp.deleted) {
if (resp.body.total !== resp.body.deleted) {
log.warning(
'delete by query deleted %d of %d total documents, trying again',
resp.deleted,
resp.total
resp.body.deleted,
resp.body.total
);
await delay(200);
continue;
@@ -140,19 +155,23 @@
}
export async function createDefaultSpace({ index, client }: { index: string; client: Client }) {
await client.create({
index,
id: 'space:default',
ignore: 409,
body: {
type: 'space',
updated_at: new Date().toISOString(),
space: {
name: 'Default Space',
description: 'This is the default space',
disabledFeatures: [],
_reserved: true,
await client.create(
{
index,
id: 'space:default',
body: {
type: 'space',
updated_at: new Date().toISOString(),
space: {
name: 'Default Space',
description: 'This is the default space',
disabledFeatures: [],
_reserved: true,
},
},
},
} as CreateDocumentParams);
{
ignore: [409],
}
);
}

File diff suppressed because it is too large


@@ -14,7 +14,7 @@ import * as KibanaServer from './kibana_server';
export function EsArchiverProvider({ getService }: FtrProviderContext): EsArchiver {
const config = getService('config');
const client = getService('legacyEs');
const client = getService('es');
const log = getService('log');
const kibanaServer = getService('kibanaServer');
const retry = getService('retry');


@@ -7355,6 +7355,14 @@ aggregate-error@^3.0.0:
clean-stack "^2.0.0"
indent-string "^4.0.0"
aggregate-error@^3.1.0:
version "3.1.0"
resolved "https://registry.yarnpkg.com/aggregate-error/-/aggregate-error-3.1.0.tgz#92670ff50f5359bdb7a3e0d40d0ec30c5737687a"
integrity sha512-4I7Td01quW/RpocfNayFdFVk1qSuoh0E7JrbRJ16nH01HhKFQ88INq9Sd+nd72zqRySlr9BmDA8xlEJ6vJMrYA==
dependencies:
clean-stack "^2.0.0"
indent-string "^4.0.0"
airbnb-js-shims@^2.2.1:
version "2.2.1"
resolved "https://registry.yarnpkg.com/airbnb-js-shims/-/airbnb-js-shims-2.2.1.tgz#db481102d682b98ed1daa4c5baa697a05ce5c040"