Introducing the concept of ES capabilities (#164850)

## Summary

We recently got problems because some index creation settings are
rejected by stateless ES, causing the whole system to fail and Kibana to
terminate.

We can't really use feature flags for this, given:
1. it doesn't really make sense to use manual flags for something that
strictly depend on one of our dependency's capabilities
2. we're mixing the concept of "serverless" offering and "serverless"
build. Atm we sometimes run "serverless" Kibana against traditional ES,
meaning that the "serverless" info **cannot** be used to determine if
we're connected against a default or serverless version of ES.

This was something that was agreed a few weeks back, but never acted
upon.

## Introducing ES capabilities

This PR introduces the concept of elasticsearch "capabilities".

Those capabilities are built exclusively from info coming from the ES
cluster (and not by some config flag).

This first implementation simply exposes a `serverless` flag, that is
populated depending on the `build_flavor` field of the `info` API (`/`
endpoint).

The end goal would be to expose a real capabilities (e.g "what is
supported") list instead. But ideally this would be provided by some ES
API and not by us guessing what is supported depending on the build
flavor, so for now, just exposing whether we're connected to a default
of serverless ES will suffice.

### Using it to adapt some API calls during SO migration

This PR also adapts the `createIndex` and `cloneIndex` migration action
to use this information and change their request against ES accordingly
(removing some index creation parameters that are not supported).

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Pierre Gayvallet 2023-08-28 10:20:27 +02:00 committed by GitHub
parent 3c1e333275
commit 53173f1033
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
46 changed files with 695 additions and 68 deletions

View file

@ -29,4 +29,5 @@ export {
export { CoreElasticsearchRouteHandlerContext } from './src/elasticsearch_route_handler_context';
export { retryCallCluster, migrationRetryCallCluster } from './src/retry_call_cluster';
export { isInlineScriptingEnabled } from './src/is_scripting_enabled';
export { getCapabilitiesFromClient } from './src/get_capabilities';
export type { ClusterInfo } from './src/get_cluster_info';

View file

@ -23,3 +23,8 @@ export const isScriptingEnabledMock = jest.fn();
jest.doMock('./is_scripting_enabled', () => ({
isInlineScriptingEnabled: isScriptingEnabledMock,
}));
export const getClusterInfoMock = jest.fn();
jest.doMock('./get_cluster_info', () => ({
getClusterInfo$: getClusterInfoMock,
}));

View file

@ -16,10 +16,14 @@ jest.mock('./version_check/ensure_es_version', () => ({
pollEsNodesVersion: jest.fn(),
}));
import { MockClusterClient, isScriptingEnabledMock } from './elasticsearch_service.test.mocks';
import {
MockClusterClient,
isScriptingEnabledMock,
getClusterInfoMock,
} from './elasticsearch_service.test.mocks';
import type { NodesVersionCompatibility } from './version_check/ensure_es_version';
import { BehaviorSubject, firstValueFrom } from 'rxjs';
import { BehaviorSubject, firstValueFrom, of } from 'rxjs';
import { first, concatMap } from 'rxjs/operators';
import { REPO_ROOT } from '@kbn/repo-info';
import { Env } from '@kbn/config';
@ -81,6 +85,8 @@ beforeEach(() => {
isScriptingEnabledMock.mockResolvedValue(true);
getClusterInfoMock.mockReturnValue(of({}));
// @ts-expect-error TS does not get that `pollEsNodesVersion` is mocked
pollEsNodesVersionMocked.mockImplementation(pollEsNodesVersionActual);
});
@ -89,6 +95,7 @@ afterEach(async () => {
jest.clearAllMocks();
MockClusterClient.mockClear();
isScriptingEnabledMock.mockReset();
getClusterInfoMock.mockReset();
await elasticsearchService?.stop();
});

View file

@ -21,6 +21,7 @@ import type { InternalHttpServiceSetup } from '@kbn/core-http-server-internal';
import type {
UnauthorizedErrorHandler,
ElasticsearchClientConfig,
ElasticsearchCapabilities,
} from '@kbn/core-elasticsearch-server';
import { ClusterClient, AgentManager } from '@kbn/core-elasticsearch-client-server-internal';
@ -37,7 +38,8 @@ import { calculateStatus$ } from './status';
import { isValidConnection } from './is_valid_connection';
import { isInlineScriptingEnabled } from './is_scripting_enabled';
import { mergeConfig } from './merge_config';
import { getClusterInfo$ } from './get_cluster_info';
import { type ClusterInfo, getClusterInfo$ } from './get_cluster_info';
import { getElasticsearchCapabilities } from './get_capabilities';
export interface SetupDeps {
analytics: AnalyticsServiceSetup;
@ -57,6 +59,7 @@ export class ElasticsearchService
private executionContextClient?: IExecutionContext;
private esNodesCompatibility$?: Observable<NodesVersionCompatibility>;
private client?: ClusterClient;
private clusterInfo$?: Observable<ClusterInfo>;
private unauthorizedErrorHandler?: UnauthorizedErrorHandler;
private agentManager: AgentManager;
@ -104,14 +107,14 @@ export class ElasticsearchService
this.esNodesCompatibility$ = esNodesCompatibility$;
const clusterInfo$ = getClusterInfo$(this.client.asInternalUser);
registerAnalyticsContextProvider(deps.analytics, clusterInfo$);
this.clusterInfo$ = getClusterInfo$(this.client.asInternalUser);
registerAnalyticsContextProvider(deps.analytics, this.clusterInfo$);
return {
legacy: {
config$: this.config$,
},
clusterInfo$,
clusterInfo$: this.clusterInfo$,
esNodesCompatibility$,
status$: calculateStatus$(esNodesCompatibility$),
setUnauthorizedErrorHandler: (handler) => {
@ -140,6 +143,8 @@ export class ElasticsearchService
}
});
let capabilities: ElasticsearchCapabilities;
if (!config.skipStartupConnectionCheck) {
// Ensure that the connection is established and the product is valid before moving on
await isValidConnection(this.esNodesCompatibility$);
@ -155,11 +160,21 @@ export class ElasticsearchService
'Refer to https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-scripting-security.html for more info.'
);
}
capabilities = getElasticsearchCapabilities({
clusterInfo: await firstValueFrom(this.clusterInfo$!),
});
} else {
// skipStartupConnectionCheck is only used for unit testing, we default to base capabilities
capabilities = {
serverless: false,
};
}
return {
client: this.client!,
createClient: (type, clientConfig) => this.createClusterClient(type, config, clientConfig),
getCapabilities: () => capabilities,
};
}

View file

@ -0,0 +1,58 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { getElasticsearchCapabilities } from './get_capabilities';
import type { ClusterInfo } from './get_cluster_info';
describe('getElasticsearchCapabilities', () => {
const getClusterInfo = (parts: Partial<ClusterInfo>): ClusterInfo => ({
cluster_name: 'cluster_name',
cluster_uuid: 'uuid',
cluster_version: '13.42.9000',
cluster_build_flavor: 'default',
...parts,
});
describe('capabilities.serverless', () => {
it('is `true` when `build_flavor` is `serverless`', () => {
expect(
getElasticsearchCapabilities({
clusterInfo: getClusterInfo({ cluster_build_flavor: 'serverless' }),
})
).toEqual(
expect.objectContaining({
serverless: true,
})
);
});
it('is `false` when `build_flavor` is `default`', () => {
expect(
getElasticsearchCapabilities({
clusterInfo: getClusterInfo({ cluster_build_flavor: 'default' }),
})
).toEqual(
expect.objectContaining({
serverless: false,
})
);
});
it('is `false` when `build_flavor` is a random string', () => {
expect(
getElasticsearchCapabilities({
clusterInfo: getClusterInfo({ cluster_build_flavor: 'some totally random string' }),
})
).toEqual(
expect.objectContaining({
serverless: false,
})
);
});
});
});

View file

@ -0,0 +1,40 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { firstValueFrom } from 'rxjs';
import type {
ElasticsearchCapabilities,
ElasticsearchClient,
} from '@kbn/core-elasticsearch-server';
import { type ClusterInfo, getClusterInfo$ } from './get_cluster_info';
const SERVERLESS_BUILD_FLAVOR = 'serverless';
export const getElasticsearchCapabilities = ({
clusterInfo,
}: {
clusterInfo: ClusterInfo;
}): ElasticsearchCapabilities => {
const buildFlavor = clusterInfo.cluster_build_flavor;
return {
serverless: buildFlavor === SERVERLESS_BUILD_FLAVOR,
};
};
/**
* Returns the capabilities for the ES cluster the provided client is connected to.
*
* @internal
*/
export const getCapabilitiesFromClient = async (
client: ElasticsearchClient
): Promise<ElasticsearchCapabilities> => {
const clusterInfo = await firstValueFrom(getClusterInfo$(client));
return getElasticsearchCapabilities({ clusterInfo });
};

View file

@ -21,7 +21,7 @@ describe('getClusterInfo', () => {
number: '1.2.3',
lucene_version: '1.2.3',
build_date: 'DateString',
build_flavor: 'string',
build_flavor: 'default',
build_hash: 'string',
build_snapshot: true,
build_type: 'string',
@ -39,6 +39,7 @@ describe('getClusterInfo', () => {
const context$ = getClusterInfo$(internalClient);
await expect(firstValueFrom(context$)).resolves.toMatchInlineSnapshot(`
Object {
"cluster_build_flavor": "default",
"cluster_name": "cluster-name",
"cluster_uuid": "cluster_uuid",
"cluster_version": "1.2.3",
@ -52,6 +53,7 @@ describe('getClusterInfo', () => {
const context$ = getClusterInfo$(internalClient);
await expect(firstValueFrom(context$)).resolves.toMatchInlineSnapshot(`
Object {
"cluster_build_flavor": "default",
"cluster_name": "cluster-name",
"cluster_uuid": "cluster_uuid",
"cluster_version": "1.2.3",
@ -65,6 +67,7 @@ describe('getClusterInfo', () => {
const context$ = getClusterInfo$(internalClient);
await expect(firstValueFrom(context$)).resolves.toMatchInlineSnapshot(`
Object {
"cluster_build_flavor": "default",
"cluster_name": "cluster-name",
"cluster_uuid": "cluster_uuid",
"cluster_version": "1.2.3",
@ -72,6 +75,7 @@ describe('getClusterInfo', () => {
`);
await expect(firstValueFrom(context$)).resolves.toMatchInlineSnapshot(`
Object {
"cluster_build_flavor": "default",
"cluster_name": "cluster-name",
"cluster_uuid": "cluster_uuid",
"cluster_version": "1.2.3",

View file

@ -15,6 +15,7 @@ export interface ClusterInfo {
cluster_name: string;
cluster_uuid: string;
cluster_version: string;
cluster_build_flavor: string;
}
/**
@ -28,6 +29,7 @@ export function getClusterInfo$(internalClient: ElasticsearchClient): Observable
cluster_name: info.cluster_name,
cluster_uuid: info.cluster_uuid,
cluster_version: info.version.number,
cluster_build_flavor: info.version.build_flavor,
})),
retry({ delay: 1000 }),
shareReplay(1)

View file

@ -21,11 +21,17 @@ describe('registerAnalyticsContextProvider', () => {
test('it provides the context', async () => {
registerAnalyticsContextProvider(
analyticsMock,
of({ cluster_name: 'cluster-name', cluster_uuid: 'cluster_uuid', cluster_version: '1.2.3' })
of({
cluster_name: 'cluster-name',
cluster_uuid: 'cluster_uuid',
cluster_version: '1.2.3',
cluster_build_flavor: 'default',
})
);
const { context$ } = analyticsMock.registerContextProvider.mock.calls[0][0];
await expect(firstValueFrom(context$)).resolves.toMatchInlineSnapshot(`
Object {
"cluster_build_flavor": "default",
"cluster_name": "cluster-name",
"cluster_uuid": "cluster_uuid",
"cluster_version": "1.2.3",

View file

@ -27,6 +27,7 @@ export function registerAnalyticsContextProvider(
cluster_name: { type: 'keyword', _meta: { description: 'The Cluster Name' } },
cluster_uuid: { type: 'keyword', _meta: { description: 'The Cluster UUID' } },
cluster_version: { type: 'keyword', _meta: { description: 'The Cluster version' } },
cluster_build_flavor: { type: 'keyword', _meta: { description: 'The Cluster build flavor' } },
},
});
}

View file

@ -18,7 +18,9 @@ import {
import type {
ElasticsearchClientConfig,
ElasticsearchServiceSetup,
ElasticsearchServiceStart,
ElasticsearchServicePreboot,
ElasticsearchCapabilities,
} from '@kbn/core-elasticsearch-server';
import type {
ElasticsearchConfig,
@ -40,12 +42,14 @@ export type MockedElasticSearchServiceSetup = jest.Mocked<
};
};
export interface MockedElasticSearchServiceStart {
export type MockedElasticSearchServiceStart = jest.Mocked<
Omit<ElasticsearchServiceStart, 'client' | 'createClient'>
> & {
client: ClusterClientMock;
createClient: jest.MockedFunction<
(type: string, config?: Partial<ElasticsearchClientConfig>) => CustomClusterClientMock
>;
}
};
const createPrebootContractMock = () => {
const prebootContract: MockedElasticSearchServicePreboot = {
@ -69,6 +73,7 @@ const createStartContractMock = () => {
const startContract: MockedElasticSearchServiceStart = {
client: elasticsearchClientMock.createClusterClient(),
createClient: jest.fn((type: string) => elasticsearchClientMock.createCustomClusterClient()),
getCapabilities: jest.fn().mockReturnValue(createCapabilities()),
};
return startContract;
};
@ -90,6 +95,7 @@ const createInternalSetupContractMock = () => {
cluster_uuid: 'cluster-uuid',
cluster_name: 'cluster-name',
cluster_version: '8.0.0',
cluster_build_flavor: 'default',
}),
status$: new BehaviorSubject<ServiceStatus<ElasticsearchStatusMeta>>({
level: ServiceStatusLevels.available,
@ -117,6 +123,15 @@ const createMock = () => {
return mocked;
};
const createCapabilities = (
parts: Partial<ElasticsearchCapabilities> = {}
): ElasticsearchCapabilities => {
return {
serverless: false,
...parts,
};
};
export const elasticsearchServiceMock = {
create: createMock,
createInternalPreboot: createInternalPrebootContractMock,
@ -125,6 +140,7 @@ export const elasticsearchServiceMock = {
createSetup: createSetupContractMock,
createInternalStart: createInternalStartContractMock,
createStart: createStartContractMock,
createCapabilities,
...elasticsearchClientMock,
};

View file

@ -30,6 +30,7 @@ export type {
ElasticsearchServicePreboot,
ElasticsearchServiceStart,
ElasticsearchServiceSetup,
ElasticsearchCapabilities,
} from './src/contracts';
export type { IElasticsearchConfig, ElasticsearchSslConfig } from './src/elasticsearch_config';
export type { ElasticsearchRequestHandlerContext } from './src/request_handler_context';

View file

@ -126,6 +126,24 @@ export interface ElasticsearchServiceStart {
type: string,
clientConfig?: Partial<ElasticsearchClientConfig>
) => ICustomClusterClient;
/**
* Returns the capabilities for the default cluster.
*/
getCapabilities: () => ElasticsearchCapabilities;
}
/**
* Represent the capabilities supported by a given ES cluster.
*
* @public
*/
export interface ElasticsearchCapabilities {
/**
* Indicates whether we're connected to a serverless version of elasticsearch.
* Required because some options aren't working for serverless and code needs to have the info to react accordingly.
*/
serverless: boolean;
}
/**

View file

@ -302,6 +302,7 @@ export function createPluginStartContext<TPlugin, TPluginDependencies>(
elasticsearch: {
client: deps.elasticsearch.client,
createClient: deps.elasticsearch.createClient,
getCapabilities: deps.elasticsearch.getCapabilities,
},
executionContext: deps.executionContext,
http: {

View file

@ -519,6 +519,7 @@ Object {
"branch": Any<String>,
"buildNumber": Any<Number>,
"clusterInfo": Object {
"cluster_build_flavor": "default",
"cluster_name": "cluster-name",
"cluster_uuid": "cluster-uuid",
"cluster_version": "8.0.0",
@ -590,6 +591,7 @@ Object {
"branch": Any<String>,
"buildNumber": Any<Number>,
"clusterInfo": Object {
"cluster_build_flavor": "default",
"cluster_name": "cluster-name",
"cluster_uuid": "cluster-uuid",
"cluster_version": "8.0.0",
@ -720,6 +722,7 @@ Object {
"branch": Any<String>,
"buildNumber": Any<Number>,
"clusterInfo": Object {
"cluster_build_flavor": "default",
"cluster_name": "cluster-name",
"cluster_uuid": "cluster-uuid",
"cluster_version": "8.0.0",
@ -791,6 +794,7 @@ Object {
"branch": Any<String>,
"buildNumber": Any<Number>,
"clusterInfo": Object {
"cluster_build_flavor": "default",
"cluster_name": "cluster-name",
"cluster_uuid": "cluster-uuid",
"cluster_version": "8.0.0",
@ -858,6 +862,7 @@ Object {
"branch": Any<String>,
"buildNumber": Any<Number>,
"clusterInfo": Object {
"cluster_build_flavor": "default",
"cluster_name": "cluster-name",
"cluster_uuid": "cluster-uuid",
"cluster_version": "8.0.0",

View file

@ -25,6 +25,9 @@ Object {
"currentAlias": ".my-so-index",
"discardCorruptObjects": false,
"discardUnknownObjects": false,
"esCapabilities": Object {
"serverless": false,
},
"excludeFromUpgradeFilterHooks": Object {},
"excludeOnUpgradeQuery": Object {
"bool": Object {
@ -250,6 +253,9 @@ Object {
"currentAlias": ".my-so-index",
"discardCorruptObjects": false,
"discardUnknownObjects": false,
"esCapabilities": Object {
"serverless": false,
},
"excludeFromUpgradeFilterHooks": Object {},
"excludeOnUpgradeQuery": Object {
"bool": Object {
@ -479,6 +485,9 @@ Object {
"currentAlias": ".my-so-index",
"discardCorruptObjects": false,
"discardUnknownObjects": false,
"esCapabilities": Object {
"serverless": false,
},
"excludeFromUpgradeFilterHooks": Object {},
"excludeOnUpgradeQuery": Object {
"bool": Object {
@ -712,6 +721,9 @@ Object {
"currentAlias": ".my-so-index",
"discardCorruptObjects": false,
"discardUnknownObjects": false,
"esCapabilities": Object {
"serverless": false,
},
"excludeFromUpgradeFilterHooks": Object {},
"excludeOnUpgradeQuery": Object {
"bool": Object {
@ -981,6 +993,9 @@ Object {
"currentAlias": ".my-so-index",
"discardCorruptObjects": false,
"discardUnknownObjects": false,
"esCapabilities": Object {
"serverless": false,
},
"excludeFromUpgradeFilterHooks": Object {},
"excludeOnUpgradeQuery": Object {
"bool": Object {
@ -1217,6 +1232,9 @@ Object {
"currentAlias": ".my-so-index",
"discardCorruptObjects": false,
"discardUnknownObjects": false,
"esCapabilities": Object {
"serverless": false,
},
"excludeFromUpgradeFilterHooks": Object {},
"excludeOnUpgradeQuery": Object {
"bool": Object {

View file

@ -8,8 +8,8 @@
import { errors as EsErrors } from '@elastic/elasticsearch';
import { cloneIndex } from './clone_index';
import { setWriteBlock } from './set_write_block';
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
import { elasticsearchServiceMock } from '@kbn/core-elasticsearch-server-mocks';
import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors';
jest.mock('./catch_retryable_es_client_errors');
@ -36,11 +36,82 @@ describe('cloneIndex', () => {
elasticsearchClientMock.createErrorTransportRequestPromise(nonRetryableError)
);
it('calls client.indices.clone with the correct parameter for default ES', async () => {
const statefulCapabilities = elasticsearchServiceMock.createCapabilities({ serverless: false });
const task = cloneIndex({
client,
source: 'my_source_index',
target: 'my_target_index',
esCapabilities: statefulCapabilities,
});
try {
await task();
} catch (e) {
/** ignore */
}
expect(client.indices.clone.mock.calls[0][0]).toMatchInlineSnapshot(`
Object {
"index": "my_source_index",
"settings": Object {
"index": Object {
"auto_expand_replicas": "0-1",
"blocks.write": false,
"mapping": Object {
"total_fields": Object {
"limit": 1500,
},
},
"number_of_shards": 1,
"priority": 10,
"refresh_interval": "1s",
},
},
"target": "my_target_index",
"timeout": "60s",
"wait_for_active_shards": "all",
}
`);
});
it('calls client.indices.clone with the correct parameter for serverless ES', async () => {
const statelessCapabilities = elasticsearchServiceMock.createCapabilities({ serverless: true });
const task = cloneIndex({
client,
source: 'my_source_index',
target: 'my_target_index',
esCapabilities: statelessCapabilities,
});
try {
await task();
} catch (e) {
/** ignore */
}
expect(client.indices.clone.mock.calls[0][0]).toMatchInlineSnapshot(`
Object {
"index": "my_source_index",
"settings": Object {
"index": Object {
"blocks.write": false,
"mapping": Object {
"total_fields": Object {
"limit": 1500,
},
},
},
},
"target": "my_target_index",
"timeout": "60s",
"wait_for_active_shards": "all",
}
`);
});
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = cloneIndex({
client,
source: 'my_source_index',
target: 'my_target_index',
esCapabilities: elasticsearchServiceMock.createCapabilities(),
});
try {
await task();
@ -51,11 +122,17 @@ describe('cloneIndex', () => {
});
it('re-throws non retry-able errors', async () => {
const task = setWriteBlock({
const task = cloneIndex({
client: clientWithNonRetryableError,
index: 'my_index',
source: 'my_source_index',
target: 'my_target_index',
esCapabilities: elasticsearchServiceMock.createCapabilities(),
});
await task();
try {
await task();
} catch (e) {
/** ignore */
}
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(nonRetryableError);
});
});

View file

@ -10,7 +10,10 @@ import * as Either from 'fp-ts/lib/Either';
import * as TaskEither from 'fp-ts/lib/TaskEither';
import { pipe } from 'fp-ts/lib/pipeable';
import { errors as EsErrors } from '@elastic/elasticsearch';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type {
ElasticsearchClient,
ElasticsearchCapabilities,
} from '@kbn/core-elasticsearch-server';
import {
catchRetryableEsClientErrors,
type RetryableEsClientError,
@ -31,11 +34,13 @@ export type CloneIndexResponse = AcknowledgeResponse;
/** @internal */
export interface CloneIndexParams {
client: ElasticsearchClient;
esCapabilities: ElasticsearchCapabilities;
source: string;
target: string;
/** only used for testing */
timeout?: string;
}
/**
* Makes a clone of the source index into the target.
*
@ -48,6 +53,7 @@ export interface CloneIndexParams {
*/
export const cloneIndex = ({
client,
esCapabilities,
source,
target,
timeout = DEFAULT_TIMEOUT,
@ -59,16 +65,18 @@ export const cloneIndex = ({
RetryableEsClientError | IndexNotFound | ClusterShardLimitExceeded,
AcknowledgeResponse
> = () => {
return client.indices
.clone({
index: source,
target,
wait_for_active_shards: WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE,
settings: {
index: {
// The source we're cloning from will have a write block set, so
// we need to remove it to allow writes to our newly cloned index
'blocks.write': false,
const indexSettings = {
// The source we're cloning from will have a write block set, so
// we need to remove it to allow writes to our newly cloned index
'blocks.write': false,
// Increase the fields limit beyond the default of 1000
mapping: {
total_fields: { limit: 1500 },
},
// settings not being supported on serverless ES
...(esCapabilities.serverless
? {}
: {
// The rest of the index settings should have already been applied
// to the source index and will be copied to the clone target. But
// we repeat it here for explicitness.
@ -80,11 +88,16 @@ export const cloneIndex = ({
refresh_interval: '1s',
// Bump priority so that recovery happens before newer indices
priority: 10,
// Increase the fields limit beyond the default of 1000
mapping: {
total_fields: { limit: 1500 },
},
},
}),
};
return client.indices
.clone({
index: source,
target,
wait_for_active_shards: WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE,
settings: {
index: indexSettings,
},
timeout,
})

View file

@ -9,8 +9,8 @@
import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors';
import { errors as EsErrors } from '@elastic/elasticsearch';
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
import { elasticsearchServiceMock } from '@kbn/core-elasticsearch-server-mocks';
import { createIndex } from './create_index';
import { setWriteBlock } from './set_write_block';
jest.mock('./catch_retryable_es_client_errors');
@ -35,11 +35,87 @@ describe('createIndex', () => {
const clientWithNonRetryableError = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(nonRetryableError)
);
it('calls client.indices.create with the correct parameter for default ES', async () => {
const statefulCapabilities = elasticsearchServiceMock.createCapabilities({ serverless: false });
const task = createIndex({
client,
indexName: 'my_index',
mappings: { properties: {} },
esCapabilities: statefulCapabilities,
});
try {
await task();
} catch (e) {
/** ignore */
}
expect(client.indices.create.mock.calls[0][0]).toMatchInlineSnapshot(`
Object {
"aliases": Object {},
"index": "my_index",
"mappings": Object {
"properties": Object {},
},
"settings": Object {
"index": Object {
"auto_expand_replicas": "0-1",
"mapping": Object {
"total_fields": Object {
"limit": 1500,
},
},
"number_of_shards": 1,
"priority": 10,
"refresh_interval": "1s",
},
},
"timeout": "60s",
"wait_for_active_shards": "all",
}
`);
});
it('calls client.indices.create with the correct parameter for serverless ES', async () => {
const statelessCapabilities = elasticsearchServiceMock.createCapabilities({ serverless: true });
const task = createIndex({
client,
indexName: 'my_index',
mappings: { properties: {} },
esCapabilities: statelessCapabilities,
});
try {
await task();
} catch (e) {
/** ignore */
}
expect(client.indices.create.mock.calls[0][0]).toMatchInlineSnapshot(`
Object {
"aliases": Object {},
"index": "my_index",
"mappings": Object {
"properties": Object {},
},
"settings": Object {
"index": Object {
"mapping": Object {
"total_fields": Object {
"limit": 1500,
},
},
},
},
"timeout": "60s",
"wait_for_active_shards": "all",
}
`);
});
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = createIndex({
client,
indexName: 'new_index',
mappings: { properties: {} },
esCapabilities: elasticsearchServiceMock.createCapabilities(),
});
try {
await task();
@ -49,12 +125,19 @@ describe('createIndex', () => {
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
it('re-throws non retry-able errors', async () => {
const task = setWriteBlock({
const task = createIndex({
client: clientWithNonRetryableError,
index: 'my_index',
indexName: 'my_index',
mappings: { properties: {} },
esCapabilities: elasticsearchServiceMock.createCapabilities(),
});
await task();
try {
await task();
} catch (e) {
/** ignore */
}
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(nonRetryableError);
});
});

View file

@ -10,7 +10,10 @@ import * as Either from 'fp-ts/lib/Either';
import * as TaskEither from 'fp-ts/lib/TaskEither';
import { pipe } from 'fp-ts/lib/pipeable';
import * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type {
ElasticsearchClient,
ElasticsearchCapabilities,
} from '@kbn/core-elasticsearch-server';
import type { IndexMapping } from '@kbn/core-saved-objects-base-server-internal';
import {
catchRetryableEsClientErrors,
@ -19,6 +22,7 @@ import {
import {
DEFAULT_TIMEOUT,
INDEX_AUTO_EXPAND_REPLICAS,
INDEX_NUMBER_OF_SHARDS,
WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE,
} from './constants';
import { type IndexNotGreenTimeout, waitForIndexStatus } from './wait_for_index_status';
@ -42,6 +46,7 @@ export interface CreateIndexParams {
client: ElasticsearchClient;
indexName: string;
mappings: IndexMapping;
esCapabilities: ElasticsearchCapabilities;
aliases?: string[];
timeout?: string;
}
@ -62,6 +67,7 @@ export const createIndex = ({
client,
indexName,
mappings,
esCapabilities,
aliases = [],
timeout = DEFAULT_TIMEOUT,
}: CreateIndexParams): TaskEither.TaskEither<
@ -74,6 +80,28 @@ export const createIndex = ({
> = () => {
const aliasesObject = aliasArrayToRecord(aliases);
const indexSettings = {
// settings not being supported on serverless ES
...(esCapabilities.serverless
? {}
: {
// ES rule of thumb: shards should be several GB to 10's of GB, so
// Kibana is unlikely to cross that limit.
number_of_shards: INDEX_NUMBER_OF_SHARDS,
auto_expand_replicas: INDEX_AUTO_EXPAND_REPLICAS,
// Set an explicit refresh interval so that we don't inherit the
// value from incorrectly configured index templates (not required
// after we adopt system indices)
refresh_interval: '1s',
// Bump priority so that recovery happens before newer indices
priority: 10,
}),
// Increase the fields limit beyond the default of 1000
mapping: {
total_fields: { limit: 1500 },
},
};
return client.indices
.create({
index: indexName,
@ -87,22 +115,7 @@ export const createIndex = ({
mappings,
aliases: aliasesObject,
settings: {
index: {
// ES rule of thumb: shards should be several GB to 10's of GB, so
// Kibana is unlikely to cross that limit.
number_of_shards: 1,
auto_expand_replicas: INDEX_AUTO_EXPAND_REPLICAS,
// Set an explicit refresh interval so that we don't inherit the
// value from incorrectly configured index templates (not required
// after we adopt system indices)
refresh_interval: '1s',
// Bump priority so that recovery happens before newer indices
priority: 10,
// Increase the fields limit beyond the default of 1000
mapping: {
total_fields: { limit: 1500 },
},
},
index: indexSettings,
},
})
.then(() => {

View file

@ -17,6 +17,7 @@ import {
} from '@kbn/core-saved-objects-base-server-internal';
import type { Logger } from '@kbn/logging';
import { loggingSystemMock } from '@kbn/core-logging-server-mocks';
import { elasticsearchServiceMock } from '@kbn/core-elasticsearch-server-mocks';
import { createInitialState, type CreateInitialStateParams } from './initial_state';
import * as getOutdatedDocumentsQueryModule from './get_outdated_documents_query';
import { getOutdatedDocumentsQuery } from './get_outdated_documents_query';
@ -64,6 +65,7 @@ describe('createInitialState', () => {
typeRegistry,
docLinks,
logger,
esCapabilities: elasticsearchServiceMock.createCapabilities(),
};
});
@ -82,6 +84,9 @@ describe('createInitialState', () => {
"currentAlias": ".kibana_task_manager",
"discardCorruptObjects": false,
"discardUnknownObjects": false,
"esCapabilities": Object {
"serverless": false,
},
"excludeFromUpgradeFilterHooks": Object {},
"excludeOnUpgradeQuery": Object {
"bool": Object {

View file

@ -15,6 +15,7 @@ import type {
IndexTypesMap,
SavedObjectsMigrationConfigType,
} from '@kbn/core-saved-objects-base-server-internal';
import type { ElasticsearchCapabilities } from '@kbn/core-elasticsearch-server';
import {
getOutdatedDocumentsQuery,
type OutdatedDocumentsQueryParams,
@ -35,6 +36,7 @@ export interface CreateInitialStateParams extends OutdatedDocumentsQueryParams {
typeRegistry: ISavedObjectTypeRegistry;
docLinks: DocLinksServiceStart;
logger: Logger;
esCapabilities: ElasticsearchCapabilities;
}
/**
@ -54,6 +56,7 @@ export const createInitialState = ({
typeRegistry,
docLinks,
logger,
esCapabilities,
}: CreateInitialStateParams): InitState => {
const outdatedDocumentsQuery = getOutdatedDocumentsQuery({
coreMigrationVersionPerType,
@ -137,5 +140,6 @@ export const createInitialState = ({
knownTypes,
excludeFromUpgradeFilterHooks: excludeFilterHooks,
migrationDocLinks,
esCapabilities,
};
};

View file

@ -9,6 +9,7 @@
import { take } from 'rxjs/operators';
import { loggingSystemMock } from '@kbn/core-logging-server-mocks';
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
import { elasticsearchServiceMock } from '@kbn/core-elasticsearch-server-mocks';
import type { SavedObjectsType } from '@kbn/core-saved-objects-server';
import {
type MigrationResult,
@ -322,5 +323,6 @@ const mockOptions = (algorithm: 'v2' | 'zdt' = 'v2') => {
client: mockedClient,
docLinks: docLinksServiceMock.createSetupContract(),
nodeRoles: { backgroundTasks: true, ui: true, migrator: true },
esCapabilities: elasticsearchServiceMock.createCapabilities(),
};
};

View file

@ -15,7 +15,10 @@ import { BehaviorSubject } from 'rxjs';
import type { NodeRoles } from '@kbn/core-node-server';
import type { Logger } from '@kbn/logging';
import type { DocLinksServiceStart } from '@kbn/core-doc-links-server';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type {
ElasticsearchClient,
ElasticsearchCapabilities,
} from '@kbn/core-elasticsearch-server';
import {
type SavedObjectUnsanitizedDoc,
type ISavedObjectTypeRegistry,
@ -48,6 +51,7 @@ export interface KibanaMigratorOptions {
docLinks: DocLinksServiceStart;
waitForMigrationCompletion: boolean;
nodeRoles: NodeRoles;
esCapabilities: ElasticsearchCapabilities;
}
/**
@ -71,6 +75,8 @@ export class KibanaMigrator implements IKibanaMigrator {
private readonly docLinks: DocLinksServiceStart;
private readonly waitForMigrationCompletion: boolean;
private readonly nodeRoles: NodeRoles;
private readonly esCapabilities: ElasticsearchCapabilities;
public readonly kibanaVersion: string;
/**
@ -87,6 +93,7 @@ export class KibanaMigrator implements IKibanaMigrator {
docLinks,
waitForMigrationCompletion,
nodeRoles,
esCapabilities,
}: KibanaMigratorOptions) {
this.client = client;
this.kibanaIndex = kibanaIndex;
@ -109,6 +116,7 @@ export class KibanaMigrator implements IKibanaMigrator {
// operation so we cache the result
this.activeMappings = buildActiveMappings(this.mappingProperties);
this.docLinks = docLinks;
this.esCapabilities = esCapabilities;
}
public runMigrations({ rerun = false }: { rerun?: boolean } = {}): Promise<MigrationResult[]> {
@ -152,6 +160,7 @@ export class KibanaMigrator implements IKibanaMigrator {
serializer: this.serializer,
elasticsearchClient: this.client,
nodeRoles: this.nodeRoles,
esCapabilities: this.esCapabilities,
});
} else {
return runV2Migration({
@ -167,6 +176,7 @@ export class KibanaMigrator implements IKibanaMigrator {
elasticsearchClient: this.client,
mappingProperties: this.mappingProperties,
waitForMigrationCompletion: this.waitForMigrationCompletion,
esCapabilities: this.esCapabilities,
});
}
}

View file

@ -15,6 +15,7 @@ import * as Either from 'fp-ts/lib/Either';
import * as Option from 'fp-ts/lib/Option';
import { errors } from '@elastic/elasticsearch';
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
import { elasticsearchServiceMock } from '@kbn/core-elasticsearch-server-mocks';
import type { AllControlStates, State } from './state';
import { createInitialState } from './initial_state';
import { ByteSizeValue } from '@kbn/config-schema';
@ -63,6 +64,7 @@ describe('migrationsStateActionMachine', () => {
},
typeRegistry,
docLinks,
esCapabilities: elasticsearchServiceMock.createCapabilities(),
logger: mockLogger.get(),
});

View file

@ -9,6 +9,7 @@
import { chain } from 'lodash';
import * as Either from 'fp-ts/lib/Either';
import * as Option from 'fp-ts/lib/Option';
import { elasticsearchServiceMock } from '@kbn/core-elasticsearch-server-mocks';
import type { SavedObjectsRawDoc } from '@kbn/core-saved-objects-server';
import {
DEFAULT_INDEX_TYPES_MAP,
@ -125,6 +126,7 @@ describe('migrations v2 model', () => {
waitForMigrationCompletion: false,
mustRelocateDocuments: false,
indexTypesMap: DEFAULT_INDEX_TYPES_MAP,
esCapabilities: elasticsearchServiceMock.createCapabilities(),
};
const postInitState = {
...baseState,

View file

@ -137,6 +137,7 @@ export const nextActionMap = (
client,
indexName: state.targetIndex,
mappings: state.targetIndexMappings,
esCapabilities: state.esCapabilities,
}),
CREATE_REINDEX_TEMP: (state: CreateReindexTempState) =>
Actions.createIndex({
@ -144,6 +145,7 @@ export const nextActionMap = (
indexName: state.tempIndex,
aliases: [state.tempIndexAlias],
mappings: state.tempIndexMappings,
esCapabilities: state.esCapabilities,
}),
READY_TO_REINDEX_SYNC: () =>
Actions.synchronizeMigrators({
@ -194,7 +196,12 @@ export const nextActionMap = (
SET_TEMP_WRITE_BLOCK: (state: SetTempWriteBlock) =>
Actions.setWriteBlock({ client, index: state.tempIndex }),
CLONE_TEMP_TO_TARGET: (state: CloneTempToTarget) =>
Actions.cloneIndex({ client, source: state.tempIndex, target: state.targetIndex }),
Actions.cloneIndex({
client,
source: state.tempIndex,
target: state.targetIndex,
esCapabilities: state.esCapabilities,
}),
REFRESH_TARGET: (state: RefreshTarget) =>
Actions.refreshIndex({ client, index: state.targetIndex }),
CHECK_TARGET_MAPPINGS: (state: CheckTargetMappingsState) =>
@ -281,6 +288,7 @@ export const nextActionMap = (
client,
indexName: state.sourceIndex.value,
mappings: state.sourceIndexMappings.value,
esCapabilities: state.esCapabilities,
}),
LEGACY_REINDEX: (state: LegacyReindexState) =>
Actions.reindex({

View file

@ -10,6 +10,7 @@ import buffer from 'buffer';
import { ByteSizeValue } from '@kbn/config-schema';
import { docLinksServiceMock } from '@kbn/core-doc-links-server-mocks';
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
import { elasticsearchServiceMock } from '@kbn/core-elasticsearch-server-mocks';
import { loggingSystemMock } from '@kbn/core-logging-server-mocks';
import type { MigrationResult } from '@kbn/core-saved-objects-base-server-internal';
import { createInitialState } from './initial_state';
@ -79,6 +80,7 @@ describe('runResilientMigrator', () => {
typeRegistry: options.typeRegistry,
docLinks: options.docLinks,
logger: options.logger,
esCapabilities: options.esCapabilities,
});
// store the created initial state
@ -153,5 +155,6 @@ const mockOptions = (): RunResilientMigratorParams => {
},
typeRegistry: savedObjectTypeRegistryMock,
docLinks: docLinksServiceMock.createSetupContract(),
esCapabilities: elasticsearchServiceMock.createCapabilities(),
};
};

View file

@ -8,7 +8,10 @@
import type { Logger } from '@kbn/logging';
import type { DocLinksServiceStart } from '@kbn/core-doc-links-server';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type {
ElasticsearchClient,
ElasticsearchCapabilities,
} from '@kbn/core-elasticsearch-server';
import type { SavedObjectsMigrationVersion } from '@kbn/core-saved-objects-common';
import type { ISavedObjectTypeRegistry } from '@kbn/core-saved-objects-server';
import type {
@ -60,6 +63,7 @@ export interface RunResilientMigratorParams {
migrationsConfig: SavedObjectsMigrationConfigType;
typeRegistry: ISavedObjectTypeRegistry;
docLinks: DocLinksServiceStart;
esCapabilities: ElasticsearchCapabilities;
}
/**
@ -86,6 +90,7 @@ export async function runResilientMigrator({
migrationsConfig,
typeRegistry,
docLinks,
esCapabilities,
}: RunResilientMigratorParams): Promise<MigrationResult> {
const initialState = createInitialState({
kibanaVersion,
@ -101,6 +106,7 @@ export async function runResilientMigrator({
typeRegistry,
docLinks,
logger,
esCapabilities,
});
const migrationClient = client.child(MIGRATION_CLIENT_OPTIONS);
return migrationStateActionMachine({

View file

@ -9,6 +9,7 @@
import buffer from 'buffer';
import { loggingSystemMock } from '@kbn/core-logging-server-mocks';
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
import { elasticsearchServiceMock } from '@kbn/core-elasticsearch-server-mocks';
import {
type MigrationResult,
SavedObjectsSerializer,
@ -271,5 +272,6 @@ const mockOptions = (kibanaVersion = '8.2.3'): RunV2MigrationOpts => {
}),
serializer: new SavedObjectsSerializer(typeRegistry),
mappingProperties: buildTypesMappings(typeRegistry.getAllTypes()),
esCapabilities: elasticsearchServiceMock.createCapabilities(),
};
};

View file

@ -8,7 +8,10 @@
import type { Logger } from '@kbn/logging';
import type { DocLinksServiceStart } from '@kbn/core-doc-links-server';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type {
ElasticsearchClient,
ElasticsearchCapabilities,
} from '@kbn/core-elasticsearch-server';
import type {
ISavedObjectTypeRegistry,
ISavedObjectsSerializer,
@ -56,6 +59,8 @@ export interface RunV2MigrationOpts {
mappingProperties: SavedObjectsTypeMappingDefinitions;
/** Tells whether this instance should actively participate in the migration or not */
waitForMigrationCompletion: boolean;
/** Capabilities of the ES cluster we're using */
esCapabilities: ElasticsearchCapabilities;
}
export const runV2Migration = async (options: RunV2MigrationOpts): Promise<MigrationResult[]> => {
@ -147,6 +152,7 @@ export const runV2Migration = async (options: RunV2MigrationOpts): Promise<Migra
migrationsConfig: options.migrationConfig,
typeRegistry: options.typeRegistry,
docLinks: options.docLinks,
esCapabilities: options.esCapabilities,
});
},
};

View file

@ -14,6 +14,7 @@ import type {
SavedObjectTypeExcludeFromUpgradeFilterHook,
} from '@kbn/core-saved-objects-server';
import type { IndexMapping, IndexTypesMap } from '@kbn/core-saved-objects-base-server-internal';
import type { ElasticsearchCapabilities } from '@kbn/core-elasticsearch-server';
import type { ControlState } from './state_action_machine';
import type { AliasAction } from './actions';
import type { TransformErrorObjects } from './core';
@ -186,6 +187,9 @@ export interface BaseState extends ControlState {
* }
*/
readonly indexTypesMap: IndexTypesMap;
/** Capabilities of the ES cluster we're using */
readonly esCapabilities: ElasticsearchCapabilities;
}
export interface InitState extends BaseState {

View file

@ -27,6 +27,7 @@ export const createContext = ({
typeRegistry,
serializer,
nodeRoles,
esCapabilities,
}: CreateContextOps): MigratorContext => {
return {
migrationConfig,
@ -44,5 +45,6 @@ export const createContext = ({
batchSize: migrationConfig.batchSize,
discardCorruptObjects: Boolean(migrationConfig.discardCorruptObjects),
nodeRoles,
esCapabilities,
};
};

View file

@ -6,7 +6,10 @@
* Side Public License, v 1.
*/
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type {
ElasticsearchClient,
ElasticsearchCapabilities,
} from '@kbn/core-elasticsearch-server';
import type { NodeRoles } from '@kbn/core-node-server';
import type {
ISavedObjectTypeRegistry,
@ -53,4 +56,6 @@ export interface MigratorContext {
readonly discardCorruptObjects: boolean;
/** The node roles of the Kibana instance */
readonly nodeRoles: NodeRoles;
/** Capabilities of the ES cluster we're using */
readonly esCapabilities: ElasticsearchCapabilities;
}

View file

@ -6,7 +6,10 @@
* Side Public License, v 1.
*/
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type {
ElasticsearchClient,
ElasticsearchCapabilities,
} from '@kbn/core-elasticsearch-server';
import {
type SavedObjectsMigrationConfigType,
type MigrationResult,
@ -45,6 +48,8 @@ export interface MigrateIndexOptions {
elasticsearchClient: ElasticsearchClient;
/** The node roles of the Kibana instance */
readonly nodeRoles: NodeRoles;
/** Capabilities of the ES cluster we're using */
esCapabilities: ElasticsearchCapabilities;
}
export const migrateIndex = async ({

View file

@ -66,6 +66,7 @@ export const nextActionMap = (context: MigratorContext) => {
client,
indexName: state.currentIndex,
mappings: state.indexMappings,
esCapabilities: context.esCapabilities,
}),
UPDATE_INDEX_MAPPINGS: (state: UpdateIndexMappingsState) =>
Actions.updateAndPickupMappings({

View file

@ -9,7 +9,10 @@
import type { Logger } from '@kbn/logging';
import type { DocLinksServiceStart } from '@kbn/core-doc-links-server';
import type { NodeRoles } from '@kbn/core-node-server';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type {
ElasticsearchClient,
ElasticsearchCapabilities,
} from '@kbn/core-elasticsearch-server';
import type {
ISavedObjectTypeRegistry,
ISavedObjectsSerializer,
@ -43,6 +46,8 @@ export interface RunZeroDowntimeMigrationOpts {
elasticsearchClient: ElasticsearchClient;
/** The node roles of the Kibana instance */
nodeRoles: NodeRoles;
/** Capabilities of the ES cluster we're using */
esCapabilities: ElasticsearchCapabilities;
}
export const runZeroDowntimeMigration = async (

View file

@ -11,6 +11,7 @@ import {
ElasticsearchClientMock,
elasticsearchClientMock,
} from '@kbn/core-elasticsearch-client-server-mocks';
import { elasticsearchServiceMock } from '@kbn/core-elasticsearch-server-mocks';
import {
SavedObjectTypeRegistry,
type SavedObjectsMigrationConfigType,
@ -67,6 +68,7 @@ export const createContextMock = (
batchSize: 1000,
discardCorruptObjects: false,
nodeRoles: { migrator: true, ui: false, backgroundTasks: false },
esCapabilities: elasticsearchServiceMock.createCapabilities(),
...parts,
};
};

View file

@ -32,6 +32,7 @@
"@kbn/core-saved-objects-base-server-mocks",
"@kbn/core-elasticsearch-server-internal",
"@kbn/core-node-server",
"@kbn/core-elasticsearch-server-mocks",
],
"exclude": [
"target/**/*",

View file

@ -15,7 +15,10 @@ import type { CoreContext, CoreService } from '@kbn/core-base-server-internal';
import type { DocLinksServiceStart } from '@kbn/core-doc-links-server';
import type { KibanaRequest } from '@kbn/core-http-server';
import type { InternalHttpServiceSetup } from '@kbn/core-http-server-internal';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type {
ElasticsearchClient,
ElasticsearchCapabilities,
} from '@kbn/core-elasticsearch-server';
import type {
InternalElasticsearchServiceSetup,
InternalElasticsearchServiceStart,
@ -223,7 +226,8 @@ export class SavedObjectsService
elasticsearch.client.asInternalUser,
docLinks,
waitForMigrationCompletion,
node
node,
elasticsearch.getCapabilities()
);
this.migrator$.next(migrator);
@ -368,7 +372,8 @@ export class SavedObjectsService
client: ElasticsearchClient,
docLinks: DocLinksServiceStart,
waitForMigrationCompletion: boolean,
nodeInfo: NodeInfo
nodeInfo: NodeInfo,
esCapabilities: ElasticsearchCapabilities
): IKibanaMigrator {
return new KibanaMigrator({
typeRegistry: this.typeRegistry,
@ -381,6 +386,7 @@ export class SavedObjectsService
docLinks,
waitForMigrationCompletion,
nodeRoles: nodeInfo.roles,
esCapabilities,
});
}
}

View file

@ -5,7 +5,9 @@
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { defaultsDeep } from 'lodash';
import { Client, HttpConnection } from '@elastic/elasticsearch';
import { Cluster } from '@kbn/es';
import Path from 'path';
import { REPO_ROOT } from '@kbn/repo-info';
@ -14,8 +16,11 @@ import execa from 'execa';
import { CliArgs } from '@kbn/config';
import { createRoot, type TestElasticsearchUtils, type TestKibanaUtils } from './create_root';
export type TestServerlessESUtils = Pick<TestElasticsearchUtils, 'stop' | 'es'>;
export type TestServerlessESUtils = Pick<TestElasticsearchUtils, 'stop' | 'es'> & {
getClient: () => Client;
};
export type TestServerlessKibanaUtils = TestKibanaUtils;
export interface TestServerlessUtils {
startES: () => Promise<TestServerlessESUtils>;
startKibana: (abortSignal?: AbortSignal) => Promise<TestServerlessKibanaUtils>;
@ -37,9 +42,10 @@ export function createTestServerlessInstances({
adjustTimeout?.(120_000);
return {
startES: async () => {
const { stop } = await esUtils.start();
const { stop, getClient } = await esUtils.start();
return {
es: esUtils.es,
getClient,
stop,
};
},
@ -70,7 +76,10 @@ function createServerlessES() {
await es.runServerless({
basePath: Path.join(REPO_ROOT, '.es/es_test_serverless'),
});
// runServerless doesn't wait until the nodes are up
await waitUntilClusterReady(getServerlessESClient());
return {
getClient: getServerlessESClient,
stop: async () => {
// hack to stop the ES cluster
await execa('docker', ['container', 'stop', 'es01', 'es02', 'es03']);
@ -80,6 +89,30 @@ function createServerlessES() {
};
}
const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
const waitUntilClusterReady = async (client: Client, timeoutMs = 60 * 1000) => {
const started = Date.now();
while (started + timeoutMs > Date.now()) {
try {
await client.info();
break;
} catch (e) {
await delay(1000);
/* trap to continue */
}
}
};
const getServerlessESClient = () => {
return new Client({
// node ports not configurable from
node: 'http://localhost:9200',
Connection: HttpConnection,
});
};
const defaults = {
server: {
restrictInternalApis: true,
@ -95,6 +128,7 @@ const defaults = {
serviceAccountToken: 'BEEF',
},
};
function createServerlessKibana(settings = {}, cliArgs: Partial<CliArgs> = {}) {
return createRoot(defaultsDeep(settings, defaults), { ...cliArgs, serverless: true });
}

View file

@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { createTestServers, TestElasticsearchUtils } from '@kbn/core-test-helpers-kbn-server';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import { getCapabilitiesFromClient } from '@kbn/core-elasticsearch-server-internal';
describe('ES capabilities for traditional ES', () => {
let esServer: TestElasticsearchUtils;
let client: ElasticsearchClient;
beforeEach(async () => {
const { startES } = createTestServers({
adjustTimeout: (t: number) => jest.setTimeout(t),
settings: {
es: {
license: 'basic',
},
},
});
esServer = await startES();
client = esServer.es.getClient();
});
afterEach(async () => {
if (esServer) {
await esServer.stop();
}
});
it('returns the correct capabilities', async () => {
const capabilities = await getCapabilitiesFromClient(client);
expect(capabilities).toEqual({
serverless: false,
});
});
});

View file

@ -0,0 +1,41 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import {
TestServerlessESUtils,
createTestServerlessInstances,
} from '@kbn/core-test-helpers-kbn-server';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import { getCapabilitiesFromClient } from '@kbn/core-elasticsearch-server-internal';
// skipped because test serverless ES nodes are currently using static ports
// causing parallel jest runners to fail for obvious port conflicts reasons.
describe.skip('ES capabilities for serverless ES', () => {
let serverlessES: TestServerlessESUtils;
let client: ElasticsearchClient;
beforeEach(async () => {
const { startES } = createTestServerlessInstances({
adjustTimeout: jest.setTimeout,
});
serverlessES = await startES();
client = serverlessES.getClient();
});
afterEach(async () => {
await serverlessES?.stop();
});
it('returns the correct capabilities', async () => {
const capabilities = await getCapabilitiesFromClient(client);
expect(capabilities).toEqual({
serverless: true,
});
});
});

View file

@ -13,6 +13,7 @@ import { errors } from '@elastic/elasticsearch';
import type { TaskEither } from 'fp-ts/lib/TaskEither';
import type { SavedObjectsRawDoc } from '@kbn/core-saved-objects-server';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import { elasticsearchServiceMock } from '@kbn/core-elasticsearch-server-mocks';
import { createTestServers, type TestElasticsearchUtils } from '@kbn/core-test-helpers-kbn-server';
import {
bulkOverwriteTransformedDocuments,
@ -59,16 +60,19 @@ let esServer: TestElasticsearchUtils;
describe('migration actions', () => {
let client: ElasticsearchClient;
let esCapabilities: ReturnType<typeof elasticsearchServiceMock.createCapabilities>;
beforeAll(async () => {
esServer = await startES();
client = esServer.es.getClient().child(MIGRATION_CLIENT_OPTIONS);
esCapabilities = elasticsearchServiceMock.createCapabilities();
// Create test fixture data:
await createIndex({
client,
indexName: 'existing_index_with_docs',
aliases: ['existing_index_with_docs_alias'],
esCapabilities,
mappings: {
dynamic: true,
properties: {
@ -98,11 +102,17 @@ describe('migration actions', () => {
refresh: 'wait_for',
})();
await createIndex({ client, indexName: 'existing_index_2', mappings: { properties: {} } })();
await createIndex({
client,
indexName: 'existing_index_2',
mappings: { properties: {} },
esCapabilities,
})();
await createIndex({
client,
indexName: 'existing_index_with_write_block',
mappings: { properties: {} },
esCapabilities,
})();
await bulkOverwriteTransformedDocuments({
client,
@ -276,6 +286,7 @@ describe('migration actions', () => {
client,
indexName: 'new_index_without_write_block',
mappings: { properties: {} },
esCapabilities,
})();
});
it('resolves right when setting the write block succeeds', async () => {
@ -341,11 +352,13 @@ describe('migration actions', () => {
client,
indexName: 'existing_index_without_write_block_2',
mappings: { properties: {} },
esCapabilities,
})();
await createIndex({
client,
indexName: 'existing_index_with_write_block_2',
mappings: { properties: {} },
esCapabilities,
})();
await setWriteBlock({ client, index: 'existing_index_with_write_block_2' })();
});
@ -514,6 +527,7 @@ describe('migration actions', () => {
client,
source: 'existing_index_with_write_block',
target: 'clone_target_1',
esCapabilities,
});
expect.assertions(3);
await expect(task()).resolves.toMatchInlineSnapshot(`
@ -557,6 +571,7 @@ describe('migration actions', () => {
client,
source: 'existing_index_with_write_block',
target: 'clone_red_then_green_index',
esCapabilities,
})();
let indexGreen = false;
@ -609,6 +624,7 @@ describe('migration actions', () => {
source: 'existing_index_with_write_block',
target: 'clone_red_index',
timeout: '1s',
esCapabilities,
})();
await expect(cloneIndexPromise).resolves.toMatchInlineSnapshot(`
@ -637,6 +653,7 @@ describe('migration actions', () => {
source: 'existing_index_with_write_block',
target: 'clone_red_index',
timeout: '1s',
esCapabilities,
})();
await expect(cloneIndexPromise).resolves.toMatchInlineSnapshot(`
@ -665,6 +682,7 @@ describe('migration actions', () => {
source: 'existing_index_with_write_block',
target: 'clone_red_index',
timeout: '30s',
esCapabilities,
})();
await expect(cloneIndexPromise).resolves.toMatchInlineSnapshot(`
@ -679,7 +697,12 @@ describe('migration actions', () => {
});
it('resolves left index_not_found_exception if the source index does not exist', async () => {
expect.assertions(1);
const task = cloneIndex({ client, source: 'no_such_index', target: 'clone_target_3' });
const task = cloneIndex({
client,
source: 'no_such_index',
target: 'clone_target_3',
esCapabilities,
});
await expect(task()).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Left",
@ -697,6 +720,7 @@ describe('migration actions', () => {
client,
source: 'existing_index_with_write_block',
target: 'clone_target_4',
esCapabilities,
})();
await expect(cloneIndexPromise).resolves.toMatchInlineSnapshot(`
Object {
@ -866,7 +890,12 @@ describe('migration actions', () => {
expect.assertions(2);
// Simulate a reindex that only adds some of the documents from the
// source index into the target index
await createIndex({ client, indexName: 'reindex_target_4', mappings: { properties: {} } })();
await createIndex({
client,
indexName: 'reindex_target_4',
mappings: { properties: {} },
esCapabilities,
})();
const response = await client.search({ index: 'existing_index_with_docs', size: 1000 });
const sourceDocs = (response.hits?.hits as SavedObjectsRawDoc[])
.slice(0, 2)
@ -931,6 +960,7 @@ describe('migration actions', () => {
/** no title field */
},
},
esCapabilities,
})();
const {
@ -971,6 +1001,7 @@ describe('migration actions', () => {
dynamic: false,
properties: { title: { type: 'integer' } }, // integer is incompatible with string title
},
esCapabilities,
})();
const {
@ -1464,6 +1495,7 @@ describe('migration actions', () => {
dynamic: false,
properties: {},
},
esCapabilities,
})();
const sourceDocs = [
{ _source: { title: 'doc 1' } },
@ -1769,6 +1801,7 @@ describe('migration actions', () => {
indexName: 'create_new_index',
mappings: undefined as any,
timeout: '1nanos',
esCapabilities,
})();
await expect(createIndexPromise).resolves.toEqual({
_tag: 'Right',
@ -1809,6 +1842,7 @@ describe('migration actions', () => {
client,
indexName: 'red_then_yellow_index',
mappings: undefined as any,
esCapabilities,
})();
let indexYellow = false;
@ -1861,6 +1895,7 @@ describe('migration actions', () => {
client,
indexName: 'yellow_then_green_index',
mappings: undefined as any,
esCapabilities,
})();
let indexGreen = false;
@ -1893,6 +1928,7 @@ describe('migration actions', () => {
client,
indexName: 'create_index_1',
mappings: undefined as any,
esCapabilities,
})();
await expect(createIndexPromise).resolves.toMatchInlineSnapshot(`
Object {
@ -1907,7 +1943,12 @@ describe('migration actions', () => {
// Creating an index with the same name as an existing alias to induce
// failure
await expect(
createIndex({ client, indexName: 'existing_index_2_alias', mappings: undefined as any })()
createIndex({
client,
indexName: 'existing_index_2_alias',
mappings: undefined as any,
esCapabilities,
})()
).rejects.toThrow('invalid_index_name_exception');
});
});

View file

@ -8,6 +8,7 @@
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { InternalCoreStart } from '@kbn/core-lifecycle-server-internal';
import { Root } from '@kbn/core-root-server-internal';
import { elasticsearchServiceMock } from '@kbn/core-elasticsearch-server-mocks';
import type { ElasticsearchClient } from '../../../../..';
import {
createRootWithCorePlugins,
@ -48,6 +49,7 @@ describe('Elasticsearch Errors', () => {
client,
indexName: 'existing_index_with_write_block',
mappings: { properties: {} },
esCapabilities: elasticsearchServiceMock.createCapabilities(),
})();
await setWriteBlock({ client, index: 'existing_index_with_write_block' })();
});

View file

@ -16,7 +16,7 @@ import { ConfigService, Env } from '@kbn/config';
import { getEnvOptions } from '@kbn/config-mocks';
import { REPO_ROOT } from '@kbn/repo-info';
import { KibanaMigrator } from '@kbn/core-saved-objects-migration-server-internal';
import { elasticsearchServiceMock } from '@kbn/core-elasticsearch-server-mocks';
import {
SavedObjectConfig,
type SavedObjectsConfigType,
@ -311,6 +311,7 @@ const getMigrator = async ({
docLinks,
waitForMigrationCompletion: false, // ensure we have an active role in the migration
nodeRoles,
esCapabilities: elasticsearchServiceMock.createCapabilities(),
});
};