mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 01:13:23 -04:00
migrate logstash plugin to new ES client (#98064)
* migrate logstash plugin to new ES client * handle 404s * use default ES client
This commit is contained in:
parent
26962cd705
commit
328febabd3
11 changed files with 56 additions and 80 deletions
|
@ -5,6 +5,7 @@
|
|||
* 2.0.
|
||||
*/
|
||||
|
||||
import { estypes } from '@elastic/elasticsearch';
|
||||
import { Cluster } from './cluster';
|
||||
|
||||
describe('cluster', () => {
|
||||
|
@ -12,7 +13,7 @@ describe('cluster', () => {
|
|||
describe('fromUpstreamJSON factory method', () => {
|
||||
const upstreamJSON = {
|
||||
cluster_uuid: 'S-S4NNZDRV-g9c-JrIhx6A',
|
||||
};
|
||||
} as estypes.RootNodeInfoResponse;
|
||||
|
||||
it('returns correct Cluster instance', () => {
|
||||
const cluster = Cluster.fromUpstreamJSON(upstreamJSON);
|
||||
|
|
|
@ -5,28 +5,27 @@
|
|||
* 2.0.
|
||||
*/
|
||||
|
||||
import { get } from 'lodash';
|
||||
import { estypes } from '@elastic/elasticsearch';
|
||||
|
||||
/**
|
||||
* This model deals with a cluster object from ES and converts it to Kibana downstream
|
||||
*/
|
||||
export class Cluster {
|
||||
public readonly uuid: string;
|
||||
|
||||
constructor({ uuid }: { uuid: string }) {
|
||||
this.uuid = uuid;
|
||||
}
|
||||
|
||||
public get downstreamJSON() {
|
||||
const json = {
|
||||
return {
|
||||
uuid: this.uuid,
|
||||
};
|
||||
|
||||
return json;
|
||||
}
|
||||
|
||||
// generate Pipeline object from elasticsearch response
|
||||
static fromUpstreamJSON(upstreamCluster: Record<string, string>) {
|
||||
const uuid = get(upstreamCluster, 'cluster_uuid') as string;
|
||||
static fromUpstreamJSON(upstreamCluster: estypes.RootNodeInfoResponse) {
|
||||
const uuid = upstreamCluster.cluster_uuid;
|
||||
return new Cluster({ uuid });
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,20 +5,11 @@
|
|||
* 2.0.
|
||||
*/
|
||||
|
||||
import {
|
||||
CoreSetup,
|
||||
CoreStart,
|
||||
ILegacyCustomClusterClient,
|
||||
Logger,
|
||||
Plugin,
|
||||
PluginInitializerContext,
|
||||
} from 'src/core/server';
|
||||
import { CoreSetup, CoreStart, Logger, Plugin, PluginInitializerContext } from 'src/core/server';
|
||||
import { LicensingPluginSetup } from '../../licensing/server';
|
||||
import { PluginSetupContract as FeaturesPluginSetup } from '../../features/server';
|
||||
import { SecurityPluginSetup } from '../../security/server';
|
||||
|
||||
import { registerRoutes } from './routes';
|
||||
import type { LogstashRequestHandlerContext } from './types';
|
||||
|
||||
interface SetupDeps {
|
||||
licensing: LicensingPluginSetup;
|
||||
|
@ -28,8 +19,7 @@ interface SetupDeps {
|
|||
|
||||
export class LogstashPlugin implements Plugin {
|
||||
private readonly logger: Logger;
|
||||
private esClient?: ILegacyCustomClusterClient;
|
||||
private coreSetup?: CoreSetup;
|
||||
|
||||
constructor(context: PluginInitializerContext) {
|
||||
this.logger = context.logger.get();
|
||||
}
|
||||
|
@ -37,7 +27,6 @@ export class LogstashPlugin implements Plugin {
|
|||
setup(core: CoreSetup, deps: SetupDeps) {
|
||||
this.logger.debug('Setting up Logstash plugin');
|
||||
|
||||
this.coreSetup = core;
|
||||
registerRoutes(core.http.createRouter(), deps.security);
|
||||
|
||||
deps.features.registerElasticsearchFeature({
|
||||
|
@ -55,19 +44,5 @@ export class LogstashPlugin implements Plugin {
|
|||
});
|
||||
}
|
||||
|
||||
start(core: CoreStart) {
|
||||
const esClient = core.elasticsearch.legacy.createClient('logstash');
|
||||
|
||||
this.coreSetup!.http.registerRouteHandlerContext<LogstashRequestHandlerContext, 'logstash'>(
|
||||
'logstash',
|
||||
async (context, request) => {
|
||||
return { esClient: esClient.asScoped(request) };
|
||||
}
|
||||
);
|
||||
}
|
||||
stop() {
|
||||
if (this.esClient) {
|
||||
this.esClient.close();
|
||||
}
|
||||
}
|
||||
start(core: CoreStart) {}
|
||||
}
|
||||
|
|
|
@ -18,8 +18,8 @@ export function registerClusterLoadRoute(router: LogstashPluginRouter) {
|
|||
},
|
||||
wrapRouteWithLicenseCheck(checkLicense, async (context, request, response) => {
|
||||
try {
|
||||
const client = context.logstash!.esClient;
|
||||
const info = await client.callAsCurrentUser('info');
|
||||
const { client } = context.core.elasticsearch;
|
||||
const { body: info } = await client.asCurrentUser.info();
|
||||
return response.ok({
|
||||
body: {
|
||||
cluster: Cluster.fromUpstreamJSON(info).downstreamJSON,
|
||||
|
|
|
@ -23,14 +23,18 @@ export function registerPipelineDeleteRoute(router: LogstashPluginRouter) {
|
|||
wrapRouteWithLicenseCheck(
|
||||
checkLicense,
|
||||
router.handleLegacyErrors(async (context, request, response) => {
|
||||
const client = context.logstash!.esClient;
|
||||
const { id } = request.params;
|
||||
const { client } = context.core.elasticsearch;
|
||||
|
||||
await client.callAsCurrentUser('transport.request', {
|
||||
path: '/_logstash/pipeline/' + encodeURIComponent(request.params.id),
|
||||
method: 'DELETE',
|
||||
});
|
||||
|
||||
return response.noContent();
|
||||
try {
|
||||
await client.asCurrentUser.logstash.deletePipeline({ id });
|
||||
return response.noContent();
|
||||
} catch (e) {
|
||||
if (e.statusCode === 404) {
|
||||
return response.notFound();
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
})
|
||||
)
|
||||
);
|
||||
|
|
|
@ -25,13 +25,13 @@ export function registerPipelineLoadRoute(router: LogstashPluginRouter) {
|
|||
wrapRouteWithLicenseCheck(
|
||||
checkLicense,
|
||||
router.handleLegacyErrors(async (context, request, response) => {
|
||||
const client = context.logstash!.esClient;
|
||||
const { id } = request.params;
|
||||
const { client } = context.core.elasticsearch;
|
||||
|
||||
const result = await client.callAsCurrentUser('transport.request', {
|
||||
path: '/_logstash/pipeline/' + encodeURIComponent(request.params.id),
|
||||
method: 'GET',
|
||||
ignore: [404],
|
||||
});
|
||||
const { body: result } = await client.asCurrentUser.logstash.getPipeline(
|
||||
{ id },
|
||||
{ ignore: [404] }
|
||||
);
|
||||
|
||||
if (result[request.params.id] === undefined) {
|
||||
return response.notFound();
|
||||
|
|
|
@ -42,12 +42,11 @@ export function registerPipelineSaveRoute(
|
|||
username = user?.username;
|
||||
}
|
||||
|
||||
const client = context.logstash!.esClient;
|
||||
const { client } = context.core.elasticsearch;
|
||||
const pipeline = Pipeline.fromDownstreamJSON(request.body, request.params.id, username);
|
||||
|
||||
await client.callAsCurrentUser('transport.request', {
|
||||
path: '/_logstash/pipeline/' + encodeURIComponent(pipeline.id),
|
||||
method: 'PUT',
|
||||
await client.asCurrentUser.logstash.putPipeline({
|
||||
id: pipeline.id,
|
||||
body: pipeline.upstreamJSON,
|
||||
});
|
||||
|
||||
|
|
|
@ -6,19 +6,19 @@
|
|||
*/
|
||||
|
||||
import { schema } from '@kbn/config-schema';
|
||||
import { LegacyAPICaller } from 'src/core/server';
|
||||
import { ElasticsearchClient } from 'src/core/server';
|
||||
import { wrapRouteWithLicenseCheck } from '../../../../licensing/server';
|
||||
|
||||
import { checkLicense } from '../../lib/check_license';
|
||||
import type { LogstashPluginRouter } from '../../types';
|
||||
|
||||
async function deletePipelines(callWithRequest: LegacyAPICaller, pipelineIds: string[]) {
|
||||
async function deletePipelines(client: ElasticsearchClient, pipelineIds: string[]) {
|
||||
const deletePromises = pipelineIds.map((pipelineId) => {
|
||||
return callWithRequest('transport.request', {
|
||||
path: '/_logstash/pipeline/' + encodeURIComponent(pipelineId),
|
||||
method: 'DELETE',
|
||||
})
|
||||
.then((success) => ({ success }))
|
||||
return client.logstash
|
||||
.deletePipeline({
|
||||
id: pipelineId,
|
||||
})
|
||||
.then((response) => ({ success: response.body }))
|
||||
.catch((error) => ({ error }));
|
||||
});
|
||||
|
||||
|
@ -45,8 +45,8 @@ export function registerPipelinesDeleteRoute(router: LogstashPluginRouter) {
|
|||
wrapRouteWithLicenseCheck(
|
||||
checkLicense,
|
||||
router.handleLegacyErrors(async (context, request, response) => {
|
||||
const client = context.logstash.esClient;
|
||||
const results = await deletePipelines(client.callAsCurrentUser, request.body.pipelineIds);
|
||||
const client = context.core.elasticsearch.client.asCurrentUser;
|
||||
const results = await deletePipelines(client, request.body.pipelineIds);
|
||||
|
||||
return response.ok({ body: { results } });
|
||||
})
|
||||
|
|
|
@ -6,21 +6,22 @@
|
|||
*/
|
||||
|
||||
import { i18n } from '@kbn/i18n';
|
||||
import { LegacyAPICaller } from 'src/core/server';
|
||||
import { ElasticsearchClient } from 'src/core/server';
|
||||
import type { LogstashPluginRouter } from '../../types';
|
||||
import { wrapRouteWithLicenseCheck } from '../../../../licensing/server';
|
||||
|
||||
import { PipelineListItem } from '../../models/pipeline_list_item';
|
||||
import { checkLicense } from '../../lib/check_license';
|
||||
|
||||
async function fetchPipelines(callWithRequest: LegacyAPICaller) {
|
||||
const params = {
|
||||
path: '/_logstash/pipeline',
|
||||
method: 'GET',
|
||||
ignore: [404],
|
||||
};
|
||||
|
||||
return await callWithRequest('transport.request', params);
|
||||
async function fetchPipelines(client: ElasticsearchClient) {
|
||||
const { body } = await client.transport.request(
|
||||
{
|
||||
method: 'GET',
|
||||
path: '/_logstash/pipeline',
|
||||
},
|
||||
{ ignore: [404] }
|
||||
);
|
||||
return body;
|
||||
}
|
||||
|
||||
export function registerPipelinesListRoute(router: LogstashPluginRouter) {
|
||||
|
@ -33,8 +34,8 @@ export function registerPipelinesListRoute(router: LogstashPluginRouter) {
|
|||
checkLicense,
|
||||
router.handleLegacyErrors(async (context, request, response) => {
|
||||
try {
|
||||
const client = context.logstash!.esClient;
|
||||
const pipelinesRecord = (await fetchPipelines(client.callAsCurrentUser)) as Record<
|
||||
const { client } = context.core.elasticsearch;
|
||||
const pipelinesRecord = (await fetchPipelines(client.asCurrentUser)) as Record<
|
||||
string,
|
||||
any
|
||||
>;
|
||||
|
|
|
@ -5,7 +5,7 @@
|
|||
* 2.0.
|
||||
*/
|
||||
|
||||
import type { ILegacyScopedClusterClient, IRouter, RequestHandlerContext } from 'src/core/server';
|
||||
import type { IRouter, RequestHandlerContext } from 'src/core/server';
|
||||
import type { LicensingApiRequestHandlerContext } from '../../licensing/server';
|
||||
|
||||
export interface PipelineListItemOptions {
|
||||
|
@ -19,9 +19,6 @@ export interface PipelineListItemOptions {
|
|||
* @internal
|
||||
*/
|
||||
export interface LogstashRequestHandlerContext extends RequestHandlerContext {
|
||||
logstash: {
|
||||
esClient: ILegacyScopedClusterClient;
|
||||
};
|
||||
licensing: LicensingApiRequestHandlerContext;
|
||||
}
|
||||
|
||||
|
|
|
@ -10,13 +10,13 @@ import { FtrProviderContext } from '../../../ftr_provider_context';
|
|||
|
||||
export default function ({ getService }: FtrProviderContext) {
|
||||
const supertest = getService('supertest');
|
||||
const es = getService('legacyEs');
|
||||
const es = getService('es');
|
||||
|
||||
describe('load', () => {
|
||||
it('should return the ES cluster info', async () => {
|
||||
const { body } = await supertest.get('/api/logstash/cluster').expect(200);
|
||||
|
||||
const responseFromES = await es.info();
|
||||
const { body: responseFromES } = await es.info();
|
||||
expect(body.cluster.uuid).to.eql(responseFromES.cluster_uuid);
|
||||
});
|
||||
});
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue