mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 17:28:26 -04:00
Refactor Plugins to access elasticsearch from CoreStart (#59915)
* x-pack/watcher: use Elasticsearch from CoreStart * x-pack/upgrade_assistant: use Elasticsearch from CoreStart * x-pack/actions: use Elasticsearch from CoreStart * x-pack/alerting: use Elasticsearch from CoreStart * x-pack/lens: use Elasticsearch from CoreStart * expressions: use Elasticsearch from CoreStart * x-pack/remote_clusters: remove unused Elasticsearch dependency on CoreSetup * x-pack/oss_telemetry: use Elasticsearch from CoreStart * Cleanup after #59886 * x-pack/watcher: create custom client only once * Revert "x-pack/watcher: create custom client only once" This reverts commit78fc4d2e93
. * Revert "x-pack/watcher: use Elasticsearch from CoreStart" This reverts commitb621af9388
. * x-pack/task_manager: use Elasticsearch from CoreStart * x-pack/event_log: use Elasticsearch from CoreStart * x-pack/alerting: use Elasticsearch from CoreStart * x-pack/apm: use Elasticsearch from CoreStart * x-pack/actions: use Elasticsearch from CoreStart * PR Feedback * APM review nits * Remove unused variable * Remove unused variable * x-pack/apm: better typesafety Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
parent
afb48bceca
commit
00a1144ae2
25 changed files with 148 additions and 125 deletions
|
@ -33,7 +33,12 @@ import { CoreService } from '../../types';
|
|||
import { merge } from '../../utils';
|
||||
import { CoreContext } from '../core_context';
|
||||
import { Logger } from '../logging';
|
||||
import { ClusterClient, ScopeableRequest } from './cluster_client';
|
||||
import {
|
||||
ClusterClient,
|
||||
ScopeableRequest,
|
||||
IClusterClient,
|
||||
ICustomClusterClient,
|
||||
} from './cluster_client';
|
||||
import { ElasticsearchClientConfig } from './elasticsearch_client_config';
|
||||
import { ElasticsearchConfig, ElasticsearchConfigType } from './elasticsearch_config';
|
||||
import { InternalHttpServiceSetup, GetAuthHeaders } from '../http/';
|
||||
|
@ -58,12 +63,14 @@ export class ElasticsearchService
|
|||
implements CoreService<InternalElasticsearchServiceSetup, ElasticsearchServiceStart> {
|
||||
private readonly log: Logger;
|
||||
private readonly config$: Observable<ElasticsearchConfig>;
|
||||
private subscription: Subscription | undefined;
|
||||
private subscription?: Subscription;
|
||||
private stop$ = new Subject();
|
||||
private kibanaVersion: string;
|
||||
createClient: InternalElasticsearchServiceSetup['createClient'] | undefined;
|
||||
dataClient: InternalElasticsearchServiceSetup['dataClient'] | undefined;
|
||||
adminClient: InternalElasticsearchServiceSetup['adminClient'] | undefined;
|
||||
private createClient?: (
|
||||
type: string,
|
||||
clientConfig?: Partial<ElasticsearchClientConfig>
|
||||
) => ICustomClusterClient;
|
||||
private adminClient?: IClusterClient;
|
||||
|
||||
constructor(private readonly coreContext: CoreContext) {
|
||||
this.kibanaVersion = coreContext.env.packageInfo.version;
|
||||
|
|
|
@ -201,7 +201,7 @@ export class Server {
|
|||
uiSettings: uiSettingsStart,
|
||||
};
|
||||
|
||||
const pluginsStart = await this.plugins.start(this.coreStart!);
|
||||
const pluginsStart = await this.plugins.start(this.coreStart);
|
||||
|
||||
await this.legacy.start({
|
||||
core: {
|
||||
|
|
|
@ -26,7 +26,7 @@ import { register, registryFactory, Registry, Fn } from '@kbn/interpreter/common
|
|||
|
||||
import Boom from 'boom';
|
||||
import { schema } from '@kbn/config-schema';
|
||||
import { CoreSetup, Logger } from 'src/core/server';
|
||||
import { CoreSetup, Logger, APICaller } from 'src/core/server';
|
||||
import { ExpressionsServerSetupDependencies } from './plugin';
|
||||
import { typeSpecs, ExpressionType } from '../common';
|
||||
import { serializeProvider } from '../common';
|
||||
|
@ -97,7 +97,10 @@ export const createLegacyServerEndpoints = (
|
|||
* @param {*} handlers - The Canvas handlers
|
||||
* @param {*} fnCall - Describes the function being run `{ functionName, args, context }`
|
||||
*/
|
||||
async function runFunction(handlers: any, fnCall: any) {
|
||||
async function runFunction(
|
||||
handlers: { environment: string; elasticsearchClient: APICaller },
|
||||
fnCall: any
|
||||
) {
|
||||
const { functionName, args, context } = fnCall;
|
||||
const { deserialize } = serializeProvider(registries.types.toJS());
|
||||
const fnDef = registries.serverFunctions.toJS()[functionName];
|
||||
|
@ -112,18 +115,14 @@ export const createLegacyServerEndpoints = (
|
|||
* results back using ND-JSON.
|
||||
*/
|
||||
plugins.bfetch.addBatchProcessingRoute(`/api/interpreter/fns`, request => {
|
||||
const scopedClient = core.elasticsearch.dataClient.asScoped(request);
|
||||
const handlers = {
|
||||
environment: 'server',
|
||||
elasticsearchClient: async (
|
||||
endpoint: string,
|
||||
clientParams: Record<string, any> = {},
|
||||
options?: any
|
||||
) => scopedClient.callAsCurrentUser(endpoint, clientParams, options),
|
||||
};
|
||||
|
||||
return {
|
||||
onBatchItem: async (fnCall: any) => {
|
||||
const [coreStart] = await core.getStartServices();
|
||||
const handlers = {
|
||||
environment: 'server',
|
||||
elasticsearchClient: coreStart.elasticsearch.legacy.client.asScoped(request)
|
||||
.callAsCurrentUser,
|
||||
};
|
||||
const result = await runFunction(handlers, fnCall);
|
||||
if (typeof result === 'undefined') {
|
||||
throw new Error(`Function ${fnCall.functionName} did not return anything.`);
|
||||
|
|
|
@ -99,6 +99,9 @@ describe('Actions Plugin', () => {
|
|||
savedObjects: {
|
||||
client: {},
|
||||
},
|
||||
elasticsearch: {
|
||||
adminClient: jest.fn(),
|
||||
},
|
||||
},
|
||||
} as any,
|
||||
httpServerMock.createKibanaRequest(),
|
||||
|
|
|
@ -11,13 +11,13 @@ import {
|
|||
Plugin,
|
||||
CoreSetup,
|
||||
CoreStart,
|
||||
IClusterClient,
|
||||
KibanaRequest,
|
||||
Logger,
|
||||
SharedGlobalConfig,
|
||||
RequestHandler,
|
||||
IContextProvider,
|
||||
SavedObjectsServiceStart,
|
||||
ElasticsearchServiceStart,
|
||||
} from '../../../../src/core/server';
|
||||
|
||||
import {
|
||||
|
@ -89,7 +89,6 @@ export class ActionsPlugin implements Plugin<Promise<PluginSetupContract>, Plugi
|
|||
|
||||
private readonly logger: Logger;
|
||||
private serverBasePath?: string;
|
||||
private adminClient?: IClusterClient;
|
||||
private taskRunnerFactory?: TaskRunnerFactory;
|
||||
private actionTypeRegistry?: ActionTypeRegistry;
|
||||
private actionExecutor?: ActionExecutor;
|
||||
|
@ -173,7 +172,6 @@ export class ActionsPlugin implements Plugin<Promise<PluginSetupContract>, Plugi
|
|||
this.actionTypeRegistry = actionTypeRegistry;
|
||||
this.serverBasePath = core.http.basePath.serverBasePath;
|
||||
this.actionExecutor = actionExecutor;
|
||||
this.adminClient = core.elasticsearch.adminClient;
|
||||
this.spaces = plugins.spaces?.spacesService;
|
||||
|
||||
registerBuiltInActionTypes({
|
||||
|
@ -233,7 +231,6 @@ export class ActionsPlugin implements Plugin<Promise<PluginSetupContract>, Plugi
|
|||
actionTypeRegistry,
|
||||
taskRunnerFactory,
|
||||
kibanaIndex,
|
||||
adminClient,
|
||||
isESOUsingEphemeralEncryptionKey,
|
||||
preconfiguredActions,
|
||||
} = this;
|
||||
|
@ -242,7 +239,7 @@ export class ActionsPlugin implements Plugin<Promise<PluginSetupContract>, Plugi
|
|||
logger,
|
||||
eventLogger: this.eventLogger!,
|
||||
spaces: this.spaces,
|
||||
getServices: this.getServicesFactory(core.savedObjects),
|
||||
getServices: this.getServicesFactory(core.savedObjects, core.elasticsearch),
|
||||
encryptedSavedObjectsPlugin: plugins.encryptedSavedObjects,
|
||||
actionTypeRegistry: actionTypeRegistry!,
|
||||
preconfiguredActions,
|
||||
|
@ -282,7 +279,7 @@ export class ActionsPlugin implements Plugin<Promise<PluginSetupContract>, Plugi
|
|||
savedObjectsClient: core.savedObjects.getScopedClient(request),
|
||||
actionTypeRegistry: actionTypeRegistry!,
|
||||
defaultKibanaIndex: await kibanaIndex,
|
||||
scopedClusterClient: adminClient!.asScoped(request),
|
||||
scopedClusterClient: core.elasticsearch.legacy.client.asScoped(request),
|
||||
preconfiguredActions,
|
||||
});
|
||||
},
|
||||
|
@ -291,11 +288,11 @@ export class ActionsPlugin implements Plugin<Promise<PluginSetupContract>, Plugi
|
|||
}
|
||||
|
||||
private getServicesFactory(
|
||||
savedObjects: SavedObjectsServiceStart
|
||||
savedObjects: SavedObjectsServiceStart,
|
||||
elasticsearch: ElasticsearchServiceStart
|
||||
): (request: KibanaRequest) => Services {
|
||||
const { adminClient } = this;
|
||||
return request => ({
|
||||
callCluster: adminClient!.asScoped(request).callAsCurrentUser,
|
||||
callCluster: elasticsearch.legacy.client.asScoped(request).callAsCurrentUser,
|
||||
savedObjectsClient: savedObjects.getScopedClient(request),
|
||||
});
|
||||
}
|
||||
|
@ -303,12 +300,8 @@ export class ActionsPlugin implements Plugin<Promise<PluginSetupContract>, Plugi
|
|||
private createRouteHandlerContext = (
|
||||
defaultKibanaIndex: string
|
||||
): IContextProvider<RequestHandler<any, any, any>, 'actions'> => {
|
||||
const {
|
||||
actionTypeRegistry,
|
||||
adminClient,
|
||||
isESOUsingEphemeralEncryptionKey,
|
||||
preconfiguredActions,
|
||||
} = this;
|
||||
const { actionTypeRegistry, isESOUsingEphemeralEncryptionKey, preconfiguredActions } = this;
|
||||
|
||||
return async function actionsRouteHandlerContext(context, request) {
|
||||
return {
|
||||
getActionsClient: () => {
|
||||
|
@ -321,7 +314,7 @@ export class ActionsPlugin implements Plugin<Promise<PluginSetupContract>, Plugi
|
|||
savedObjectsClient: context.core.savedObjects.client,
|
||||
actionTypeRegistry: actionTypeRegistry!,
|
||||
defaultKibanaIndex,
|
||||
scopedClusterClient: adminClient!.asScoped(request),
|
||||
scopedClusterClient: context.core.elasticsearch.adminClient,
|
||||
preconfiguredActions,
|
||||
});
|
||||
},
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { Logger, CoreSetup } from 'kibana/server';
|
||||
import { Logger, CoreSetup, APICaller } from 'kibana/server';
|
||||
import moment from 'moment';
|
||||
import {
|
||||
RunContext,
|
||||
|
@ -62,7 +62,11 @@ async function scheduleTasks(logger: Logger, taskManager: TaskManagerStartContra
|
|||
export function telemetryTaskRunner(logger: Logger, core: CoreSetup, kibanaIndex: string) {
|
||||
return ({ taskInstance }: RunContext) => {
|
||||
const { state } = taskInstance;
|
||||
const callCluster = core.elasticsearch.adminClient.callAsInternalUser;
|
||||
const callCluster = (...args: Parameters<APICaller>) => {
|
||||
return core.getStartServices().then(([{ elasticsearch: { legacy: { client } } }]) =>
|
||||
client.callAsInternalUser(...args)
|
||||
);
|
||||
};
|
||||
return {
|
||||
async run() {
|
||||
return Promise.all([
|
||||
|
|
|
@ -19,7 +19,6 @@ import { TaskRunnerFactory } from './task_runner';
|
|||
import { AlertsClientFactory } from './alerts_client_factory';
|
||||
import { LicenseState } from './lib/license_state';
|
||||
import {
|
||||
IClusterClient,
|
||||
KibanaRequest,
|
||||
Logger,
|
||||
PluginInitializerContext,
|
||||
|
@ -29,6 +28,7 @@ import {
|
|||
IContextProvider,
|
||||
RequestHandler,
|
||||
SharedGlobalConfig,
|
||||
ElasticsearchServiceStart,
|
||||
} from '../../../../src/core/server';
|
||||
|
||||
import {
|
||||
|
@ -94,7 +94,6 @@ export class AlertingPlugin {
|
|||
private readonly logger: Logger;
|
||||
private alertTypeRegistry?: AlertTypeRegistry;
|
||||
private readonly taskRunnerFactory: TaskRunnerFactory;
|
||||
private adminClient?: IClusterClient;
|
||||
private serverBasePath?: string;
|
||||
private licenseState: LicenseState | null = null;
|
||||
private isESOUsingEphemeralEncryptionKey?: boolean;
|
||||
|
@ -119,7 +118,6 @@ export class AlertingPlugin {
|
|||
}
|
||||
|
||||
public async setup(core: CoreSetup, plugins: AlertingPluginsSetup): Promise<PluginSetupContract> {
|
||||
this.adminClient = core.elasticsearch.adminClient;
|
||||
this.licenseState = new LicenseState(plugins.licensing.license$);
|
||||
this.spaces = plugins.spaces?.spacesService;
|
||||
this.security = plugins.security;
|
||||
|
@ -223,7 +221,7 @@ export class AlertingPlugin {
|
|||
|
||||
taskRunnerFactory.initialize({
|
||||
logger,
|
||||
getServices: this.getServicesFactory(core.savedObjects),
|
||||
getServices: this.getServicesFactory(core.savedObjects, core.elasticsearch),
|
||||
spaceIdToNamespace: this.spaceIdToNamespace,
|
||||
actionsPlugin: plugins.actions,
|
||||
encryptedSavedObjectsPlugin: plugins.encryptedSavedObjects,
|
||||
|
@ -263,11 +261,11 @@ export class AlertingPlugin {
|
|||
};
|
||||
|
||||
private getServicesFactory(
|
||||
savedObjects: SavedObjectsServiceStart
|
||||
savedObjects: SavedObjectsServiceStart,
|
||||
elasticsearch: ElasticsearchServiceStart
|
||||
): (request: KibanaRequest) => Services {
|
||||
const { adminClient } = this;
|
||||
return request => ({
|
||||
callCluster: adminClient!.asScoped(request).callAsCurrentUser,
|
||||
callCluster: elasticsearch.legacy.client.asScoped(request).callAsCurrentUser,
|
||||
savedObjectsClient: savedObjects.getScopedClient(request),
|
||||
});
|
||||
}
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { Logger, CoreSetup } from 'kibana/server';
|
||||
import { Logger, CoreSetup, APICaller } from 'kibana/server';
|
||||
import moment from 'moment';
|
||||
import {
|
||||
RunContext,
|
||||
|
@ -65,7 +65,12 @@ async function scheduleTasks(logger: Logger, taskManager: TaskManagerStartContra
|
|||
export function telemetryTaskRunner(logger: Logger, core: CoreSetup, kibanaIndex: string) {
|
||||
return ({ taskInstance }: RunContext) => {
|
||||
const { state } = taskInstance;
|
||||
const callCluster = core.elasticsearch.adminClient.callAsInternalUser;
|
||||
const callCluster = (...args: Parameters<APICaller>) => {
|
||||
return core.getStartServices().then(([{ elasticsearch: { legacy: { client } } }]) =>
|
||||
client.callAsInternalUser(...args)
|
||||
);
|
||||
};
|
||||
|
||||
return {
|
||||
async run() {
|
||||
return Promise.all([
|
||||
|
|
|
@ -3,7 +3,13 @@
|
|||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
import { PluginInitializerContext, Plugin, CoreSetup } from 'src/core/server';
|
||||
import {
|
||||
PluginInitializerContext,
|
||||
Plugin,
|
||||
CoreSetup,
|
||||
CoreStart,
|
||||
Logger
|
||||
} from 'src/core/server';
|
||||
import { Observable, combineLatest, AsyncSubject } from 'rxjs';
|
||||
import { map, take } from 'rxjs/operators';
|
||||
import { Server } from 'hapi';
|
||||
|
@ -37,6 +43,8 @@ export interface APMPluginContract {
|
|||
}
|
||||
|
||||
export class APMPlugin implements Plugin<APMPluginContract> {
|
||||
private currentConfig?: APMConfig;
|
||||
private logger?: Logger;
|
||||
legacySetup$: AsyncSubject<LegacySetup>;
|
||||
constructor(private readonly initContext: PluginInitializerContext) {
|
||||
this.initContext = initContext;
|
||||
|
@ -56,7 +64,7 @@ export class APMPlugin implements Plugin<APMPluginContract> {
|
|||
actions?: ActionsPlugin['setup'];
|
||||
}
|
||||
) {
|
||||
const logger = this.initContext.logger.get();
|
||||
this.logger = this.initContext.logger.get();
|
||||
const config$ = this.initContext.config.create<APMXPackConfig>();
|
||||
const mergedConfig$ = combineLatest(plugins.apm_oss.config$, config$).pipe(
|
||||
map(([apmOssConfig, apmConfig]) => mergeConfigs(apmOssConfig, apmConfig))
|
||||
|
@ -71,49 +79,40 @@ export class APMPlugin implements Plugin<APMPluginContract> {
|
|||
}
|
||||
|
||||
this.legacySetup$.subscribe(__LEGACY => {
|
||||
createApmApi().init(core, { config$: mergedConfig$, logger, __LEGACY });
|
||||
createApmApi().init(core, {
|
||||
config$: mergedConfig$,
|
||||
logger: this.logger!,
|
||||
__LEGACY
|
||||
});
|
||||
});
|
||||
|
||||
const currentConfig = await mergedConfig$.pipe(take(1)).toPromise();
|
||||
this.currentConfig = await mergedConfig$.pipe(take(1)).toPromise();
|
||||
|
||||
if (
|
||||
plugins.taskManager &&
|
||||
plugins.usageCollection &&
|
||||
currentConfig['xpack.apm.telemetryCollectionEnabled']
|
||||
this.currentConfig['xpack.apm.telemetryCollectionEnabled']
|
||||
) {
|
||||
createApmTelemetry({
|
||||
core,
|
||||
config$: mergedConfig$,
|
||||
usageCollector: plugins.usageCollection,
|
||||
taskManager: plugins.taskManager,
|
||||
logger
|
||||
logger: this.logger
|
||||
});
|
||||
}
|
||||
|
||||
// create agent configuration index without blocking setup lifecycle
|
||||
createApmAgentConfigurationIndex({
|
||||
esClient: core.elasticsearch.dataClient,
|
||||
config: currentConfig,
|
||||
logger
|
||||
});
|
||||
// create custom action index without blocking setup lifecycle
|
||||
createApmCustomLinkIndex({
|
||||
esClient: core.elasticsearch.dataClient,
|
||||
config: currentConfig,
|
||||
logger
|
||||
});
|
||||
|
||||
plugins.home.tutorials.registerTutorial(
|
||||
tutorialProvider({
|
||||
isEnabled: currentConfig['xpack.apm.ui.enabled'],
|
||||
indexPatternTitle: currentConfig['apm_oss.indexPattern'],
|
||||
isEnabled: this.currentConfig['xpack.apm.ui.enabled'],
|
||||
indexPatternTitle: this.currentConfig['apm_oss.indexPattern'],
|
||||
cloud: plugins.cloud,
|
||||
indices: {
|
||||
errorIndices: currentConfig['apm_oss.errorIndices'],
|
||||
metricsIndices: currentConfig['apm_oss.metricsIndices'],
|
||||
onboardingIndices: currentConfig['apm_oss.onboardingIndices'],
|
||||
sourcemapIndices: currentConfig['apm_oss.sourcemapIndices'],
|
||||
transactionIndices: currentConfig['apm_oss.transactionIndices']
|
||||
errorIndices: this.currentConfig['apm_oss.errorIndices'],
|
||||
metricsIndices: this.currentConfig['apm_oss.metricsIndices'],
|
||||
onboardingIndices: this.currentConfig['apm_oss.onboardingIndices'],
|
||||
sourcemapIndices: this.currentConfig['apm_oss.sourcemapIndices'],
|
||||
transactionIndices: this.currentConfig['apm_oss.transactionIndices']
|
||||
}
|
||||
})
|
||||
);
|
||||
|
@ -127,12 +126,29 @@ export class APMPlugin implements Plugin<APMPluginContract> {
|
|||
getApmIndices: async () =>
|
||||
getApmIndices({
|
||||
savedObjectsClient: await getInternalSavedObjectsClient(core),
|
||||
config: currentConfig
|
||||
config: await mergedConfig$.pipe(take(1)).toPromise()
|
||||
})
|
||||
};
|
||||
}
|
||||
|
||||
public async start() {}
|
||||
public start(core: CoreStart) {
|
||||
if (this.currentConfig == null || this.logger == null) {
|
||||
throw new Error('APMPlugin needs to be setup before calling start()');
|
||||
}
|
||||
|
||||
// create agent configuration index without blocking start lifecycle
|
||||
createApmAgentConfigurationIndex({
|
||||
esClient: core.elasticsearch.legacy.client,
|
||||
config: this.currentConfig,
|
||||
logger: this.logger
|
||||
});
|
||||
// create custom action index without blocking start lifecycle
|
||||
createApmCustomLinkIndex({
|
||||
esClient: core.elasticsearch.legacy.client,
|
||||
config: this.currentConfig,
|
||||
logger: this.logger
|
||||
});
|
||||
}
|
||||
|
||||
public stop() {}
|
||||
}
|
||||
|
|
|
@ -21,7 +21,7 @@ beforeEach(() => {
|
|||
clusterClient = elasticsearchServiceMock.createClusterClient();
|
||||
clusterClientAdapter = new ClusterClientAdapter({
|
||||
logger,
|
||||
clusterClient,
|
||||
clusterClientPromise: Promise.resolve(clusterClient),
|
||||
});
|
||||
});
|
||||
|
||||
|
|
|
@ -14,7 +14,7 @@ export type IClusterClientAdapter = PublicMethodsOf<ClusterClientAdapter>;
|
|||
|
||||
export interface ConstructorOpts {
|
||||
logger: Logger;
|
||||
clusterClient: EsClusterClient;
|
||||
clusterClientPromise: Promise<EsClusterClient>;
|
||||
}
|
||||
|
||||
export interface QueryEventsBySavedObjectResult {
|
||||
|
@ -26,11 +26,11 @@ export interface QueryEventsBySavedObjectResult {
|
|||
|
||||
export class ClusterClientAdapter {
|
||||
private readonly logger: Logger;
|
||||
private readonly clusterClient: EsClusterClient;
|
||||
private readonly clusterClientPromise: Promise<EsClusterClient>;
|
||||
|
||||
constructor(opts: ConstructorOpts) {
|
||||
this.logger = opts.logger;
|
||||
this.clusterClient = opts.clusterClient;
|
||||
this.clusterClientPromise = opts.clusterClientPromise;
|
||||
}
|
||||
|
||||
public async indexDocument(doc: any): Promise<void> {
|
||||
|
@ -201,7 +201,8 @@ export class ClusterClientAdapter {
|
|||
private async callEs(operation: string, body?: any): Promise<any> {
|
||||
try {
|
||||
this.debug(`callEs(${operation}) calls:`, body);
|
||||
const result = await this.clusterClient.callAsInternalUser(operation, body);
|
||||
const clusterClient = await this.clusterClientPromise;
|
||||
const result = await clusterClient.callAsInternalUser(operation, body);
|
||||
this.debug(`callEs(${operation}) result:`, result);
|
||||
return result;
|
||||
} catch (err) {
|
||||
|
|
|
@ -32,7 +32,7 @@ export function createEsContext(params: EsContextCtorParams): EsContext {
|
|||
|
||||
export interface EsContextCtorParams {
|
||||
logger: Logger;
|
||||
clusterClient: EsClusterClient;
|
||||
clusterClientPromise: Promise<EsClusterClient>;
|
||||
indexNameRoot: string;
|
||||
}
|
||||
|
||||
|
@ -50,7 +50,7 @@ class EsContextImpl implements EsContext {
|
|||
this.initialized = false;
|
||||
this.esAdapter = new ClusterClientAdapter({
|
||||
logger: params.logger,
|
||||
clusterClient: params.clusterClient,
|
||||
clusterClientPromise: params.clusterClientPromise,
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
@ -66,7 +66,9 @@ export class Plugin implements CorePlugin<IEventLogService, IEventLogClientServi
|
|||
logger: this.systemLogger,
|
||||
// TODO: get index prefix from config.get(kibana.index)
|
||||
indexNameRoot: kibanaIndex,
|
||||
clusterClient: core.elasticsearch.adminClient,
|
||||
clusterClientPromise: core
|
||||
.getStartServices()
|
||||
.then(([{ elasticsearch }]) => elasticsearch.legacy.client),
|
||||
});
|
||||
|
||||
this.eventLogService = new EventLogService({
|
||||
|
|
|
@ -184,7 +184,10 @@ export function telemetryTaskRunner(
|
|||
) {
|
||||
return ({ taskInstance }: RunContext) => {
|
||||
const { state } = taskInstance;
|
||||
const callCluster = core.elasticsearch.adminClient.callAsInternalUser;
|
||||
const callCluster = async (...args: Parameters<APICaller>) => {
|
||||
const [coreStart] = await core.getStartServices();
|
||||
return coreStart.elasticsearch.legacy.client.callAsInternalUser(...args);
|
||||
};
|
||||
|
||||
return {
|
||||
async run() {
|
||||
|
|
|
@ -17,12 +17,12 @@ import {
|
|||
export function registerTasks({
|
||||
taskManager,
|
||||
logger,
|
||||
elasticsearch,
|
||||
getStartServices,
|
||||
config,
|
||||
}: {
|
||||
taskManager?: TaskManagerSetupContract;
|
||||
logger: Logger;
|
||||
elasticsearch: CoreSetup['elasticsearch'];
|
||||
getStartServices: CoreSetup['getStartServices'];
|
||||
config: Observable<{ kibana: { index: string } }>;
|
||||
}) {
|
||||
if (!taskManager) {
|
||||
|
@ -30,13 +30,17 @@ export function registerTasks({
|
|||
return;
|
||||
}
|
||||
|
||||
const esClientPromise = getStartServices().then(
|
||||
([{ elasticsearch }]) => elasticsearch.legacy.client
|
||||
);
|
||||
|
||||
taskManager.registerTaskDefinitions({
|
||||
[VIS_TELEMETRY_TASK]: {
|
||||
title: 'X-Pack telemetry calculator for Visualizations',
|
||||
type: VIS_TELEMETRY_TASK,
|
||||
createTaskRunner({ taskInstance }: { taskInstance: TaskInstance }) {
|
||||
return {
|
||||
run: visualizationsTaskRunner(taskInstance, config, elasticsearch),
|
||||
run: visualizationsTaskRunner(taskInstance, config, esClientPromise),
|
||||
};
|
||||
},
|
||||
},
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
|
||||
import { Observable } from 'rxjs';
|
||||
import _, { countBy, groupBy, mapValues } from 'lodash';
|
||||
import { APICaller, CoreSetup } from 'kibana/server';
|
||||
import { APICaller, IClusterClient } from 'src/core/server';
|
||||
import { getNextMidnight } from '../../get_next_midnight';
|
||||
import { TaskInstance } from '../../../../../task_manager/server';
|
||||
import { ESSearchHit } from '../../../../../apm/typings/elasticsearch';
|
||||
|
@ -73,17 +73,15 @@ async function getStats(callCluster: APICaller, index: string) {
|
|||
export function visualizationsTaskRunner(
|
||||
taskInstance: TaskInstance,
|
||||
config: Observable<{ kibana: { index: string } }>,
|
||||
es: CoreSetup['elasticsearch']
|
||||
esClientPromise: Promise<IClusterClient>
|
||||
) {
|
||||
const { callAsInternalUser: callCluster } = es.createClient('data');
|
||||
|
||||
return async () => {
|
||||
let stats;
|
||||
let error;
|
||||
|
||||
try {
|
||||
const index = (await config.toPromise()).kibana.index;
|
||||
stats = await getStats(callCluster, index);
|
||||
stats = await getStats((await esClientPromise).callAsInternalUser, index);
|
||||
} catch (err) {
|
||||
if (err.constructor === Error) {
|
||||
error = err.message;
|
||||
|
|
|
@ -35,7 +35,7 @@ export class OssTelemetryPlugin implements Plugin {
|
|||
registerTasks({
|
||||
taskManager: deps.taskManager,
|
||||
logger: this.logger,
|
||||
elasticsearch: core.elasticsearch,
|
||||
getStartServices: core.getStartServices,
|
||||
config: this.config,
|
||||
});
|
||||
registerCollectors(
|
||||
|
|
|
@ -4,9 +4,10 @@
|
|||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { APICaller, CoreSetup } from 'kibana/server';
|
||||
import { APICaller } from 'kibana/server';
|
||||
|
||||
import { of } from 'rxjs';
|
||||
import { elasticsearchServiceMock } from '../../../../../src/core/server/mocks';
|
||||
import {
|
||||
ConcreteTaskInstance,
|
||||
TaskStatus,
|
||||
|
@ -43,10 +44,11 @@ const defaultMockSavedObjects = [
|
|||
|
||||
const defaultMockTaskDocs = [getMockTaskInstance()];
|
||||
|
||||
export const getMockEs = (mockCallWithInternal: APICaller = getMockCallWithInternal()) =>
|
||||
(({
|
||||
createClient: () => ({ callAsInternalUser: mockCallWithInternal }),
|
||||
} as unknown) as CoreSetup['elasticsearch']);
|
||||
export const getMockEs = async (mockCallWithInternal: APICaller = getMockCallWithInternal()) => {
|
||||
const client = elasticsearchServiceMock.createClusterClient();
|
||||
(client.callAsInternalUser as any) = mockCallWithInternal;
|
||||
return client;
|
||||
};
|
||||
|
||||
export const getMockCallWithInternal = (hits: unknown[] = defaultMockSavedObjects): APICaller => {
|
||||
return ((() => {
|
||||
|
|
|
@ -29,15 +29,9 @@ export class RemoteClustersServerPlugin implements Plugin<void, void, any, any>
|
|||
this.licenseStatus = { valid: false };
|
||||
}
|
||||
|
||||
async setup(
|
||||
{ http, elasticsearch: elasticsearchService }: CoreSetup,
|
||||
{ licensing, cloud }: Dependencies
|
||||
) {
|
||||
const elasticsearch = await elasticsearchService.adminClient;
|
||||
async setup({ http }: CoreSetup, { licensing, cloud }: Dependencies) {
|
||||
const router = http.createRouter();
|
||||
const routeDependencies: RouteDependencies = {
|
||||
elasticsearch,
|
||||
elasticsearchService,
|
||||
router,
|
||||
getLicenseStatus: () => this.licenseStatus,
|
||||
config: {
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { IRouter, ElasticsearchServiceSetup, IClusterClient } from 'kibana/server';
|
||||
import { IRouter } from 'kibana/server';
|
||||
import { LicensingPluginSetup } from '../../licensing/server';
|
||||
import { CloudSetup } from '../../cloud/server';
|
||||
|
||||
|
@ -16,8 +16,6 @@ export interface Dependencies {
|
|||
export interface RouteDependencies {
|
||||
router: IRouter;
|
||||
getLicenseStatus: () => LicenseStatus;
|
||||
elasticsearchService: ElasticsearchServiceSetup;
|
||||
elasticsearch: IClusterClient;
|
||||
config: {
|
||||
isCloudEnabled: boolean;
|
||||
};
|
||||
|
|
|
@ -38,17 +38,16 @@ export class TaskManagerPlugin
|
|||
public setup(core: CoreSetup, plugins: any): TaskManagerSetupContract {
|
||||
const logger = this.initContext.logger.get('taskManager');
|
||||
const config$ = this.initContext.config.create<TaskManagerConfig>();
|
||||
const elasticsearch = core.elasticsearch.adminClient;
|
||||
return {
|
||||
registerLegacyAPI: once((__LEGACY: PluginLegacyDependencies) => {
|
||||
config$.subscribe(async config => {
|
||||
const [{ savedObjects }] = await core.getStartServices();
|
||||
const [{ savedObjects, elasticsearch }] = await core.getStartServices();
|
||||
const savedObjectsRepository = savedObjects.createInternalRepository(['task']);
|
||||
this.legacyTaskManager$.next(
|
||||
createTaskManager(core, {
|
||||
logger,
|
||||
config,
|
||||
elasticsearch,
|
||||
elasticsearch: elasticsearch.legacy.client,
|
||||
savedObjectsRepository,
|
||||
savedObjectsSerializer: savedObjects.createSerializer(),
|
||||
})
|
||||
|
|
|
@ -58,7 +58,7 @@ describe('Upgrade Assistant Usage Collector', () => {
|
|||
}),
|
||||
},
|
||||
elasticsearch: {
|
||||
adminClient: clusterClient,
|
||||
legacy: { client: clusterClient },
|
||||
},
|
||||
};
|
||||
});
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
import { set } from 'lodash';
|
||||
import {
|
||||
APICaller,
|
||||
ElasticsearchServiceSetup,
|
||||
ElasticsearchServiceStart,
|
||||
ISavedObjectsRepository,
|
||||
SavedObjectsServiceStart,
|
||||
} from 'src/core/server';
|
||||
|
@ -51,7 +51,7 @@ async function getDeprecationLoggingStatusValue(callAsCurrentUser: APICaller): P
|
|||
}
|
||||
|
||||
export async function fetchUpgradeAssistantMetrics(
|
||||
{ adminClient }: ElasticsearchServiceSetup,
|
||||
{ legacy: { client: esClient } }: ElasticsearchServiceStart,
|
||||
savedObjects: SavedObjectsServiceStart
|
||||
): Promise<UpgradeAssistantTelemetry> {
|
||||
const savedObjectsRepository = savedObjects.createInternalRepository();
|
||||
|
@ -60,7 +60,7 @@ export async function fetchUpgradeAssistantMetrics(
|
|||
UPGRADE_ASSISTANT_TYPE,
|
||||
UPGRADE_ASSISTANT_DOC_ID
|
||||
);
|
||||
const callAsInternalUser = adminClient.callAsInternalUser.bind(adminClient);
|
||||
const callAsInternalUser = esClient.callAsInternalUser.bind(esClient);
|
||||
const deprecationLoggingStatusValue = await getDeprecationLoggingStatusValue(callAsInternalUser);
|
||||
|
||||
const getTelemetrySavedObject = (
|
||||
|
@ -107,7 +107,7 @@ export async function fetchUpgradeAssistantMetrics(
|
|||
}
|
||||
|
||||
interface Dependencies {
|
||||
elasticsearch: ElasticsearchServiceSetup;
|
||||
elasticsearch: ElasticsearchServiceStart;
|
||||
savedObjects: SavedObjectsServiceStart;
|
||||
usageCollection: UsageCollectionSetup;
|
||||
}
|
||||
|
|
|
@ -11,7 +11,6 @@ import {
|
|||
CoreStart,
|
||||
PluginInitializerContext,
|
||||
Logger,
|
||||
ElasticsearchServiceSetup,
|
||||
SavedObjectsClient,
|
||||
SavedObjectsServiceStart,
|
||||
} from '../../../../src/core/server';
|
||||
|
@ -40,7 +39,6 @@ export class UpgradeAssistantServerPlugin implements Plugin {
|
|||
|
||||
// Properties set at setup
|
||||
private licensing?: LicensingPluginSetup;
|
||||
private elasticSearchService?: ElasticsearchServiceSetup;
|
||||
|
||||
// Properties set at start
|
||||
private savedObjectsServiceStart?: SavedObjectsServiceStart;
|
||||
|
@ -59,10 +57,9 @@ export class UpgradeAssistantServerPlugin implements Plugin {
|
|||
}
|
||||
|
||||
setup(
|
||||
{ http, elasticsearch, getStartServices, capabilities }: CoreSetup,
|
||||
{ http, getStartServices, capabilities }: CoreSetup,
|
||||
{ usageCollection, cloud, licensing }: PluginsSetup
|
||||
) {
|
||||
this.elasticSearchService = elasticsearch;
|
||||
this.licensing = licensing;
|
||||
|
||||
const router = http.createRouter();
|
||||
|
@ -88,13 +85,13 @@ export class UpgradeAssistantServerPlugin implements Plugin {
|
|||
registerTelemetryRoutes(dependencies);
|
||||
|
||||
if (usageCollection) {
|
||||
getStartServices().then(([{ savedObjects }]) => {
|
||||
getStartServices().then(([{ savedObjects, elasticsearch }]) => {
|
||||
registerUpgradeAssistantUsageCollector({ elasticsearch, usageCollection, savedObjects });
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
start({ savedObjects }: CoreStart) {
|
||||
start({ savedObjects, elasticsearch }: CoreStart) {
|
||||
this.savedObjectsServiceStart = savedObjects;
|
||||
|
||||
// The ReindexWorker uses a map of request headers that contain the authentication credentials
|
||||
|
@ -107,7 +104,7 @@ export class UpgradeAssistantServerPlugin implements Plugin {
|
|||
this.worker = createReindexWorker({
|
||||
credentialStore: this.credentialStore,
|
||||
licensing: this.licensing!,
|
||||
elasticsearchService: this.elasticSearchService!,
|
||||
elasticsearchService: elasticsearch,
|
||||
logger: this.logger,
|
||||
savedObjects: new SavedObjectsClient(
|
||||
this.savedObjectsServiceStart.createInternalRepository()
|
||||
|
|
|
@ -5,7 +5,7 @@
|
|||
*/
|
||||
import { schema } from '@kbn/config-schema';
|
||||
import {
|
||||
ElasticsearchServiceSetup,
|
||||
ElasticsearchServiceStart,
|
||||
kibanaResponseFactory,
|
||||
Logger,
|
||||
SavedObjectsClient,
|
||||
|
@ -38,7 +38,7 @@ import { GetBatchQueueResponse, PostBatchResponse } from './types';
|
|||
|
||||
interface CreateReindexWorker {
|
||||
logger: Logger;
|
||||
elasticsearchService: ElasticsearchServiceSetup;
|
||||
elasticsearchService: ElasticsearchServiceStart;
|
||||
credentialStore: CredentialStore;
|
||||
savedObjects: SavedObjectsClient;
|
||||
licensing: LicensingPluginSetup;
|
||||
|
@ -51,8 +51,8 @@ export function createReindexWorker({
|
|||
savedObjects,
|
||||
licensing,
|
||||
}: CreateReindexWorker) {
|
||||
const { adminClient } = elasticsearchService;
|
||||
return new ReindexWorker(savedObjects, credentialStore, adminClient, logger, licensing);
|
||||
const esClient = elasticsearchService.legacy.client;
|
||||
return new ReindexWorker(savedObjects, credentialStore, esClient, logger, licensing);
|
||||
}
|
||||
|
||||
const mapAnyErrorToKibanaHttpResponse = (e: any) => {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue