[NP] Migrate logstash server side code to NP (#63135)

* convert api_integration test into TS

* create logstash NP plugin and move models

* move common/constants to NP

* type fetch all from scroll

* move route declaration to NP

* add licence checker wrapper

* register logstash route handlers in NP

* track logstash NP i18n

* address shaunak comment

* fix validation

* udpdate security tests since for new mock defaults

* address Pierres comments

* rename upgrade file route
This commit is contained in:
Mikhail Shustov 2020-04-14 17:45:40 +02:00 committed by GitHub
parent 4ce2412316
commit d015c24509
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
91 changed files with 959 additions and 873 deletions

View file

@ -24,7 +24,7 @@
"xpack.lens": "legacy/plugins/lens",
"xpack.licenseMgmt": "plugins/license_management",
"xpack.licensing": "plugins/licensing",
"xpack.logstash": "legacy/plugins/logstash",
"xpack.logstash": ["plugins/logstash", "legacy/plugins/logstash"],
"xpack.main": "legacy/plugins/xpack_main",
"xpack.maps": ["plugins/maps", "legacy/plugins/maps"],
"xpack.ml": ["plugins/ml", "legacy/plugins/ml"],

View file

@ -5,12 +5,8 @@
*/
import { resolve } from 'path';
import { registerLogstashPipelinesRoutes } from './server/routes/api/pipelines';
import { registerLogstashPipelineRoutes } from './server/routes/api/pipeline';
import { registerLogstashUpgradeRoutes } from './server/routes/api/upgrade';
import { registerLogstashClusterRoutes } from './server/routes/api/cluster';
import { registerLicenseChecker } from './server/lib/register_license_checker';
import { PLUGIN } from './common/constants';
import { PLUGIN } from '../../../plugins/logstash/common/constants';
export const logstash = kibana =>
new kibana.Plugin({
@ -32,9 +28,5 @@ export const logstash = kibana =>
},
init: server => {
registerLicenseChecker(server);
registerLogstashPipelinesRoutes(server);
registerLogstashPipelineRoutes(server);
registerLogstashUpgradeRoutes(server);
registerLogstashClusterRoutes(server);
},
});

View file

@ -13,7 +13,7 @@ import 'brace/mode/plain_text';
import 'brace/theme/github';
import { isEmpty } from 'lodash';
import { TOOLTIPS } from '../../../common/constants/tooltips';
import { TOOLTIPS } from '../../../../../../plugins/logstash/common/constants/tooltips';
import {
EuiButton,
EuiButtonEmpty,

View file

@ -10,7 +10,7 @@ import { npSetup } from 'ui/new_platform';
import { xpackInfo } from 'plugins/xpack_main/services/xpack_info';
import { FeatureCatalogueCategory } from '../../../../../../src/plugins/home/public';
// @ts-ignore
import { PLUGIN } from '../../common/constants';
import { PLUGIN } from '../../../../../plugins/logstash/common/constants';
const {
plugins: { home },

View file

@ -8,7 +8,7 @@ import { pick, capitalize } from 'lodash';
import { getSearchValue } from 'plugins/logstash/lib/get_search_value';
import { getMoment } from 'plugins/logstash/../common/lib/get_moment';
import { PIPELINE } from '../../../common/constants';
import { PIPELINE } from '../../../../../../plugins/logstash/common/constants';
/**
* Represents the model for listing pipelines in the UI

View file

@ -5,7 +5,7 @@
*/
import chrome from 'ui/chrome';
import { ROUTES } from '../../../common/constants';
import { ROUTES } from '../../../../../../plugins/logstash/common/constants';
import { Cluster } from 'plugins/logstash/models/cluster';
export class ClusterService {

View file

@ -7,7 +7,7 @@
import React from 'react';
import { toastNotifications } from 'ui/notify';
import { MarkdownSimple } from '../../../../../../../src/plugins/kibana_react/public';
import { PLUGIN } from '../../../common/constants';
import { PLUGIN } from '../../../../../../plugins/logstash/common/constants';
export class LogstashLicenseService {
constructor(xpackInfoService, kbnUrlService, $timeout) {

View file

@ -6,7 +6,7 @@
import moment from 'moment';
import chrome from 'ui/chrome';
import { ROUTES, MONITORING } from '../../../common/constants';
import { ROUTES, MONITORING } from '../../../../../../plugins/logstash/common/constants';
import { PipelineListItem } from 'plugins/logstash/models/pipeline_list_item';
export class MonitoringService {

View file

@ -5,7 +5,7 @@
*/
import chrome from 'ui/chrome';
import { ROUTES } from '../../../common/constants';
import { ROUTES } from '../../../../../../plugins/logstash/common/constants';
import { Pipeline } from 'plugins/logstash/models/pipeline';
export class PipelineService {

View file

@ -5,7 +5,7 @@
*/
import chrome from 'ui/chrome';
import { ROUTES, MONITORING } from '../../../common/constants';
import { ROUTES, MONITORING } from '../../../../../../plugins/logstash/common/constants';
import { PipelineListItem } from 'plugins/logstash/models/pipeline_list_item';
const RECENTLY_DELETED_PIPELINE_IDS_STORAGE_KEY = 'xpack.logstash.recentlyDeletedPipelines';

View file

@ -5,7 +5,7 @@
*/
import chrome from 'ui/chrome';
import { ROUTES } from '../../../common/constants';
import { ROUTES } from '../../../../../../plugins/logstash/common/constants';
export class UpgradeService {
constructor($http) {

View file

@ -1,18 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { once } from 'lodash';
const callWithRequest = once(server => {
const cluster = server.plugins.elasticsearch.createCluster('logstash');
return cluster.callWithRequest;
});
export const callWithRequestFactory = (server, request) => {
return (...args) => {
return callWithRequest(server)(request, ...args);
};
};

View file

@ -1,21 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import expect from '@kbn/expect';
import { wrapCustomError } from '../wrap_custom_error';
describe('wrap_custom_error', () => {
describe('#wrapCustomError', () => {
it('should return a Boom object', () => {
const originalError = new Error('I am an error');
const statusCode = 404;
const wrappedError = wrapCustomError(originalError, statusCode);
expect(wrappedError.isBoom).to.be(true);
expect(wrappedError.output.statusCode).to.equal(statusCode);
});
});
});

View file

@ -1,42 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import expect from '@kbn/expect';
import { wrapEsError } from '../wrap_es_error';
describe('wrap_es_error', () => {
describe('#wrapEsError', () => {
let originalError;
beforeEach(() => {
originalError = new Error('I am an error');
originalError.statusCode = 404;
});
it('should return a Boom object', () => {
const wrappedError = wrapEsError(originalError);
expect(wrappedError.isBoom).to.be(true);
});
it('should return the correct Boom object', () => {
const wrappedError = wrapEsError(originalError);
expect(wrappedError.output.statusCode).to.be(originalError.statusCode);
expect(wrappedError.output.payload.message).to.be(originalError.message);
});
it('should return invalid permissions message for 403 errors', () => {
const securityError = new Error('I am an error');
securityError.statusCode = 403;
const wrappedError = wrapEsError(securityError);
expect(wrappedError.isBoom).to.be(true);
expect(wrappedError.message).to.be(
'Insufficient user permissions for managing Logstash pipelines'
);
});
});
});

View file

@ -1,19 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import expect from '@kbn/expect';
import { wrapUnknownError } from '../wrap_unknown_error';
describe('wrap_unknown_error', () => {
describe('#wrapUnknownError', () => {
it('should return a Boom object', () => {
const originalError = new Error('I am an error');
const wrappedError = wrapUnknownError(originalError);
expect(wrappedError.isBoom).to.be(true);
});
});
});

View file

@ -1,18 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import Boom from 'boom';
/**
* Wraps a custom error into a Boom error response and returns it
*
* @param err Object error
* @param statusCode Error status code
* @return Object Boom error response
*/
export function wrapCustomError(err, statusCode) {
return Boom.boomify(err, { statusCode });
}

View file

@ -1,27 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import Boom from 'boom';
import { i18n } from '@kbn/i18n';
/**
* Wraps ES errors into a Boom error response and returns it
* This also handles the permissions issue gracefully
*
* @param err Object ES error
* @return Object Boom error response
*/
export function wrapEsError(err) {
const statusCode = err.statusCode;
if (statusCode === 403) {
return Boom.forbidden(
i18n.translate('xpack.logstash.insufficientUserPermissionsDescription', {
defaultMessage: 'Insufficient user permissions for managing Logstash pipelines',
})
);
}
return Boom.boomify(err, { statusCode: err.statusCode });
}

View file

@ -1,17 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import Boom from 'boom';
/**
* Wraps an unknown error into a Boom error response and returns it
*
* @param err Object Unknown error
* @return Object Boom error response
*/
export function wrapUnknownError(err) {
return Boom.boomify(err);
}

View file

@ -1,88 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import expect from '@kbn/expect';
import sinon from 'sinon';
import { fetchAllFromScroll } from '../fetch_all_from_scroll';
import { set } from 'lodash';
describe('fetch_all_from_scroll', () => {
let mockResponse;
let stubCallWithRequest;
beforeEach(() => {
mockResponse = {};
stubCallWithRequest = sinon.stub();
stubCallWithRequest.onCall(0).returns(
new Promise(resolve => {
const mockInnerResponse = {
hits: {
hits: ['newhit'],
},
_scroll_id: 'newScrollId',
};
return resolve(mockInnerResponse);
})
);
stubCallWithRequest.onCall(1).returns(
new Promise(resolve => {
const mockInnerResponse = {
hits: {
hits: [],
},
};
return resolve(mockInnerResponse);
})
);
});
describe('#fetchAllFromScroll', () => {
describe('when the passed-in response has no hits', () => {
beforeEach(() => {
set(mockResponse, 'hits.hits', []);
});
it('should return an empty array of hits', () => {
return fetchAllFromScroll(mockResponse).then(hits => {
expect(hits).to.eql([]);
});
});
it('should not call callWithRequest', () => {
return fetchAllFromScroll(mockResponse, stubCallWithRequest).then(() => {
expect(stubCallWithRequest.called).to.be(false);
});
});
});
describe('when the passed-in response has some hits', () => {
beforeEach(() => {
set(mockResponse, 'hits.hits', ['foo', 'bar']);
set(mockResponse, '_scroll_id', 'originalScrollId');
});
it('should return the hits from the response', () => {
return fetchAllFromScroll(mockResponse, stubCallWithRequest).then(hits => {
expect(hits).to.eql(['foo', 'bar', 'newhit']);
});
});
it('should call callWithRequest', () => {
return fetchAllFromScroll(mockResponse, stubCallWithRequest).then(() => {
expect(stubCallWithRequest.calledTwice).to.be(true);
const firstCallWithRequestCallArgs = stubCallWithRequest.args[0];
expect(firstCallWithRequestCallArgs[1].body.scroll_id).to.eql('originalScrollId');
const secondCallWithRequestCallArgs = stubCallWithRequest.args[1];
expect(secondCallWithRequestCallArgs[1].body.scroll_id).to.eql('newScrollId');
});
});
});
});
});

View file

@ -1,28 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { get } from 'lodash';
import { ES_SCROLL_SETTINGS } from '../../../common/constants';
export function fetchAllFromScroll(response, callWithRequest, hits = []) {
const newHits = get(response, 'hits.hits', []);
const scrollId = get(response, '_scroll_id');
if (newHits.length > 0) {
hits.push(...newHits);
return callWithRequest('scroll', {
body: {
scroll: ES_SCROLL_SETTINGS.KEEPALIVE,
scroll_id: scrollId,
},
}).then(innerResponse => {
return fetchAllFromScroll(innerResponse, callWithRequest, hits);
});
}
return Promise.resolve(hits);
}

View file

@ -1,69 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import expect from '@kbn/expect';
import { licensePreRoutingFactory } from '../license_pre_routing_factory';
describe('license_pre_routing_factory', () => {
describe('#logstashFeaturePreRoutingFactory', () => {
let mockServer;
let mockLicenseCheckResults;
beforeEach(() => {
mockServer = {
plugins: {
xpack_main: {
info: {
feature: () => ({
getLicenseCheckResults: () => mockLicenseCheckResults,
}),
},
},
},
};
});
it('only instantiates one instance per server', () => {
const firstInstance = licensePreRoutingFactory(mockServer);
const secondInstance = licensePreRoutingFactory(mockServer);
expect(firstInstance).to.be(secondInstance);
});
describe('isAvailable is false', () => {
beforeEach(() => {
mockLicenseCheckResults = {
isAvailable: false,
};
});
it('replies with 403', () => {
const licensePreRouting = licensePreRoutingFactory(mockServer);
const stubRequest = {};
expect(() => licensePreRouting(stubRequest)).to.throwException(response => {
expect(response).to.be.an(Error);
expect(response.isBoom).to.be(true);
expect(response.output.statusCode).to.be(403);
});
});
});
describe('isAvailable is true', () => {
beforeEach(() => {
mockLicenseCheckResults = {
isAvailable: true,
};
});
it('replies with nothing', () => {
const licensePreRouting = licensePreRoutingFactory(mockServer);
const stubRequest = {};
const response = licensePreRouting(stubRequest);
expect(response).to.be(null);
});
});
});
});

View file

@ -1,27 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { once } from 'lodash';
import { wrapCustomError } from '../error_wrappers';
import { PLUGIN } from '../../../common/constants';
export const licensePreRoutingFactory = once(server => {
const xpackMainPlugin = server.plugins.xpack_main;
// License checking and enable/disable logic
function licensePreRouting() {
const licenseCheckResults = xpackMainPlugin.info.feature(PLUGIN.ID).getLicenseCheckResults();
if (!licenseCheckResults.isAvailable) {
const error = new Error(licenseCheckResults.message);
const statusCode = 403;
throw wrapCustomError(error, statusCode);
}
return null;
}
return licensePreRouting;
});

View file

@ -6,7 +6,7 @@
import { mirrorPluginStatus } from '../../../../../server/lib/mirror_plugin_status';
import { checkLicense } from '../check_license';
import { PLUGIN } from '../../../common/constants';
import { PLUGIN } from '../../../../../../plugins/logstash/common/constants';
export function registerLicenseChecker(server) {
const xpackMainPlugin = server.plugins.xpack_main;

View file

@ -1,11 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { registerLoadRoute } from './register_load_route';
export function registerLogstashClusterRoutes(server) {
registerLoadRoute(server);
}

View file

@ -1,40 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import Boom from 'boom';
import { callWithRequestFactory } from '../../../lib/call_with_request_factory';
import { Cluster } from '../../../models/cluster';
import { licensePreRoutingFactory } from '../../../lib/license_pre_routing_factory';
function fetchCluster(callWithRequest) {
return callWithRequest('info');
}
export function registerLoadRoute(server) {
const licensePreRouting = licensePreRoutingFactory(server);
server.route({
path: '/api/logstash/cluster',
method: 'GET',
handler: (request, h) => {
const callWithRequest = callWithRequestFactory(server, request);
return fetchCluster(callWithRequest)
.then(responseFromES => ({
cluster: Cluster.fromUpstreamJSON(responseFromES).downstreamJSON,
}))
.catch(e => {
if (e.status === 403) {
return h.response();
}
throw Boom.internal(e);
});
},
config: {
pre: [licensePreRouting],
},
});
}

View file

@ -1,38 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { callWithRequestFactory } from '../../../lib/call_with_request_factory';
import { wrapEsError } from '../../../lib/error_wrappers';
import { INDEX_NAMES } from '../../../../common/constants';
import { licensePreRoutingFactory } from '../../../lib/license_pre_routing_factory';
function deletePipeline(callWithRequest, pipelineId) {
return callWithRequest('delete', {
index: INDEX_NAMES.PIPELINES,
id: pipelineId,
refresh: 'wait_for',
});
}
export function registerDeleteRoute(server) {
const licensePreRouting = licensePreRoutingFactory(server);
server.route({
path: '/api/logstash/pipeline/{id}',
method: 'DELETE',
handler: (request, h) => {
const callWithRequest = callWithRequestFactory(server, request);
const pipelineId = request.params.id;
return deletePipeline(callWithRequest, pipelineId)
.then(() => h.response().code(204))
.catch(e => wrapEsError(e));
},
config: {
pre: [licensePreRouting],
},
});
}

View file

@ -1,47 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import Boom from 'boom';
import { INDEX_NAMES } from '../../../../common/constants';
import { callWithRequestFactory } from '../../../lib/call_with_request_factory';
import { Pipeline } from '../../../models/pipeline';
import { licensePreRoutingFactory } from '../../../lib/license_pre_routing_factory';
function fetchPipeline(callWithRequest, pipelineId) {
return callWithRequest('get', {
index: INDEX_NAMES.PIPELINES,
id: pipelineId,
_source: ['description', 'username', 'pipeline', 'pipeline_settings'],
ignore: [404],
});
}
export function registerLoadRoute(server) {
const licensePreRouting = licensePreRoutingFactory(server);
server.route({
path: '/api/logstash/pipeline/{id}',
method: 'GET',
handler: request => {
const callWithRequest = callWithRequestFactory(server, request);
const pipelineId = request.params.id;
return fetchPipeline(callWithRequest, pipelineId)
.then(pipelineResponseFromES => {
if (!pipelineResponseFromES.found) {
throw Boom.notFound();
}
const pipeline = Pipeline.fromUpstreamJSON(pipelineResponseFromES);
return pipeline.downstreamJSON;
})
.catch(e => Boom.boomify(e));
},
config: {
pre: [licensePreRouting],
},
});
}

View file

@ -1,15 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { registerLoadRoute } from './register_load_route';
import { registerDeleteRoute } from './register_delete_route';
import { registerSaveRoute } from './register_save_route';
export function registerLogstashPipelineRoutes(server) {
registerLoadRoute(server);
registerDeleteRoute(server);
registerSaveRoute(server);
}

View file

@ -1,48 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { get } from 'lodash';
import { wrapEsError } from '../../../lib/error_wrappers';
import { INDEX_NAMES } from '../../../../common/constants';
import { callWithRequestFactory } from '../../../lib/call_with_request_factory';
import { Pipeline } from '../../../models/pipeline';
import { licensePreRoutingFactory } from '../../../lib/license_pre_routing_factory';
function savePipeline(callWithRequest, pipelineId, pipelineBody) {
return callWithRequest('index', {
index: INDEX_NAMES.PIPELINES,
id: pipelineId,
body: pipelineBody,
refresh: 'wait_for',
});
}
export function registerSaveRoute(server) {
const licensePreRouting = licensePreRoutingFactory(server);
server.route({
path: '/api/logstash/pipeline/{id}',
method: 'PUT',
handler: async (request, h) => {
let username;
if (server.plugins.security) {
const user = await server.plugins.security.getUser(request);
username = get(user, 'username');
}
const callWithRequest = callWithRequestFactory(server, request);
const pipelineId = request.params.id;
const pipeline = Pipeline.fromDownstreamJSON(request.payload, pipelineId, username);
return savePipeline(callWithRequest, pipeline.id, pipeline.upstreamJSON)
.then(() => h.response().code(204))
.catch(e => wrapEsError(e));
},
config: {
pre: [licensePreRouting],
},
});
}

View file

@ -1,7 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
export { registerLogstashPipelinesRoutes } from './register_pipelines_routes';

View file

@ -1,51 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { callWithRequestFactory } from '../../../lib/call_with_request_factory';
import { wrapUnknownError } from '../../../lib/error_wrappers';
import { INDEX_NAMES } from '../../../../common/constants';
import { licensePreRoutingFactory } from '../../../lib/license_pre_routing_factory';
function deletePipelines(callWithRequest, pipelineIds) {
const deletePromises = pipelineIds.map(pipelineId => {
return callWithRequest('delete', {
index: INDEX_NAMES.PIPELINES,
id: pipelineId,
refresh: 'wait_for',
})
.then(success => ({ success }))
.catch(error => ({ error }));
});
return Promise.all(deletePromises).then(results => {
const successes = results.filter(result => Boolean(result.success));
const errors = results.filter(result => Boolean(result.error));
return {
numSuccesses: successes.length,
numErrors: errors.length,
};
});
}
export function registerDeleteRoute(server) {
const licensePreRouting = licensePreRoutingFactory(server);
server.route({
path: '/api/logstash/pipelines/delete',
method: 'POST',
handler: request => {
const callWithRequest = callWithRequestFactory(server, request);
return deletePipelines(callWithRequest, request.payload.pipelineIds)
.then(results => ({ results }))
.catch(err => wrapUnknownError(err));
},
config: {
pre: [licensePreRouting],
},
});
}

View file

@ -1,52 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { wrapEsError } from '../../../lib/error_wrappers';
import { callWithRequestFactory } from '../../../lib/call_with_request_factory';
import { fetchAllFromScroll } from '../../../lib/fetch_all_from_scroll';
import { INDEX_NAMES, ES_SCROLL_SETTINGS } from '../../../../common/constants';
import { PipelineListItem } from '../../../models/pipeline_list_item';
import { licensePreRoutingFactory } from '../../../lib/license_pre_routing_factory';
function fetchPipelines(callWithRequest) {
const params = {
index: INDEX_NAMES.PIPELINES,
scroll: ES_SCROLL_SETTINGS.KEEPALIVE,
body: {
size: ES_SCROLL_SETTINGS.PAGE_SIZE,
},
ignore: [404],
};
return callWithRequest('search', params).then(response =>
fetchAllFromScroll(response, callWithRequest)
);
}
export function registerListRoute(server) {
const licensePreRouting = licensePreRoutingFactory(server);
server.route({
path: '/api/logstash/pipelines',
method: 'GET',
handler: request => {
const callWithRequest = callWithRequestFactory(server, request);
return fetchPipelines(callWithRequest)
.then((pipelinesHits = []) => {
const pipelines = pipelinesHits.map(pipeline => {
return PipelineListItem.fromUpstreamJSON(pipeline).downstreamJSON;
});
return { pipelines };
})
.catch(e => wrapEsError(e));
},
config: {
pre: [licensePreRouting],
},
});
}

View file

@ -1,13 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { registerListRoute } from './register_list_route';
import { registerDeleteRoute } from './register_delete_route';
export function registerLogstashPipelinesRoutes(server) {
registerListRoute(server);
registerDeleteRoute(server);
}

View file

@ -1,7 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
export { registerLogstashUpgradeRoutes } from './register_upgrade_routes';

View file

@ -1,56 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { callWithRequestFactory } from '../../../lib/call_with_request_factory';
import { wrapUnknownError } from '../../../lib/error_wrappers';
import { INDEX_NAMES } from '../../../../common/constants';
import { licensePreRoutingFactory } from '../../../lib/license_pre_routing_factory';
function doesIndexExist(callWithRequest) {
return callWithRequest('indices.exists', {
index: INDEX_NAMES.PIPELINES,
});
}
async function executeUpgrade(callWithRequest) {
// If index doesn't exist yet, there is no mapping to upgrade
if (!(await doesIndexExist(callWithRequest))) {
return;
}
return callWithRequest('indices.putMapping', {
index: INDEX_NAMES.PIPELINES,
body: {
properties: {
pipeline_settings: {
dynamic: false,
type: 'object',
},
},
},
});
}
export function registerExecuteRoute(server) {
const licensePreRouting = licensePreRoutingFactory(server);
server.route({
path: '/api/logstash/upgrade',
method: 'POST',
handler: async request => {
const callWithRequest = callWithRequestFactory(server, request);
try {
await executeUpgrade(callWithRequest);
return { is_upgraded: true };
} catch (err) {
throw wrapUnknownError(err);
}
},
config: {
pre: [licensePreRouting],
},
});
}

View file

@ -1,11 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { registerExecuteRoute } from './register_execute_route';
export function registerLogstashUpgradeRoutes(server) {
registerExecuteRoute(server);
}

View file

@ -53,6 +53,7 @@ const createLicenseMock = () => {
};
mock.check.mockReturnValue({ state: 'valid' });
mock.hasAtLeast.mockReturnValue(true);
mock.getFeature.mockReturnValue({ isAvailable: true, isEnabled: true });
return mock;
};
export const licenseMock = {

View file

@ -12,3 +12,4 @@ export const plugin = (context: PluginInitializerContext) => new LicensingPlugin
export * from '../common/types';
export * from './types';
export { config } from './licensing_config';
export { CheckLicense, wrapRouteWithLicenseCheck } from './wrap_route_with_license_check';

View file

@ -0,0 +1,71 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { httpServerMock } from 'src/core/server/mocks';
import { wrapRouteWithLicenseCheck, CheckLicense } from './wrap_route_with_license_check';
const context = {
licensing: {
license: {},
},
} as any;
const request = httpServerMock.createKibanaRequest();
describe('wrapRouteWithLicenseCheck', () => {
it('calls route handler if checkLicense returns "valid": true', async () => {
const checkLicense: CheckLicense = () => ({ valid: true, message: null });
const routeHandler = jest.fn();
const wrapper = wrapRouteWithLicenseCheck(checkLicense, routeHandler);
const response = httpServerMock.createResponseFactory();
await wrapper(context, request, response);
expect(routeHandler).toHaveBeenCalledTimes(1);
expect(routeHandler).toHaveBeenCalledWith(context, request, response);
});
it('does not call route handler if checkLicense returns "valid": false', async () => {
const checkLicense: CheckLicense = () => ({ valid: false, message: 'reason' });
const routeHandler = jest.fn();
const wrapper = wrapRouteWithLicenseCheck(checkLicense, routeHandler);
const response = httpServerMock.createResponseFactory();
await wrapper(context, request, response);
expect(routeHandler).toHaveBeenCalledTimes(0);
expect(response.forbidden).toHaveBeenCalledTimes(1);
expect(response.forbidden).toHaveBeenCalledWith({ body: 'reason' });
});
it('allows an exception to bubble up if handler throws', async () => {
const checkLicense: CheckLicense = () => ({ valid: true, message: null });
const routeHandler = () => {
throw new Error('reason');
};
const wrapper = wrapRouteWithLicenseCheck(checkLicense, routeHandler);
const response = httpServerMock.createResponseFactory();
await expect(wrapper(context, request, response)).rejects.toThrowErrorMatchingInlineSnapshot(
`"reason"`
);
});
it('allows an exception to bubble up if "checkLicense" throws', async () => {
const checkLicense: CheckLicense = () => {
throw new Error('reason');
};
const routeHandler = jest.fn();
const wrapper = wrapRouteWithLicenseCheck(checkLicense, routeHandler);
const response = httpServerMock.createResponseFactory();
await expect(wrapper(context, request, response)).rejects.toThrowErrorMatchingInlineSnapshot(
`"reason"`
);
expect(routeHandler).toHaveBeenCalledTimes(0);
});
});

View file

@ -0,0 +1,40 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import {
RequestHandler,
RequestHandlerContext,
KibanaRequest,
RouteMethod,
KibanaResponseFactory,
} from 'src/core/server';
import { ILicense } from '../common/types';
export type CheckLicense = (
license: ILicense
) => { valid: false; message: string } | { valid: true; message: null };
export function wrapRouteWithLicenseCheck<P, Q, B>(
checkLicense: CheckLicense,
handler: RequestHandler<P, Q, B>
): RequestHandler<P, Q, B> {
return async (
context: RequestHandlerContext,
request: KibanaRequest<P, Q, B, RouteMethod>,
response: KibanaResponseFactory
) => {
const licenseCheckResult = checkLicense(context.licensing.license);
if (licenseCheckResult.valid) {
return handler(context, request, response);
} else {
return response.forbidden({
body: licenseCheckResult.message,
});
}
};
}

View file

@ -0,0 +1,12 @@
{
"id": "logstash",
"version": "0.0.1",
"kibanaVersion": "kibana",
"configPath": ["xpack", "logstash"],
"requiredPlugins": [
"licensing"
],
"optionalPlugins": ["security"],
"server": true,
"ui": false
}

View file

@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { schema } from '@kbn/config-schema';
import { PluginInitializerContext, PluginConfigDescriptor } from 'src/core/server';
import { LogstashPlugin } from './plugin';
export const plugin = (context: PluginInitializerContext) => new LogstashPlugin(context);
export const config: PluginConfigDescriptor = {
schema: schema.object({
enabled: schema.boolean({ defaultValue: true }),
}),
};

View file

@ -0,0 +1,60 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { licensingMock } from '../../../../licensing/server/mocks';
import { checkLicense } from './check_license';
describe('check_license', function() {
describe('returns "valid": false & message when', () => {
it('license information is not available', () => {
const license = licensingMock.createLicenseMock();
license.isAvailable = false;
const { valid, message } = checkLicense(license);
expect(valid).toBe(false);
expect(message).toStrictEqual(expect.any(String));
});
it('license level is not enough', () => {
const license = licensingMock.createLicenseMock();
license.hasAtLeast.mockReturnValue(false);
const { valid, message } = checkLicense(license);
expect(valid).toBe(false);
expect(message).toStrictEqual(expect.any(String));
});
it('license is expired', () => {
const license = licensingMock.createLicenseMock();
license.isActive = false;
const { valid, message } = checkLicense(license);
expect(valid).toBe(false);
expect(message).toStrictEqual(expect.any(String));
});
it('elasticsearch security is disabled', () => {
const license = licensingMock.createLicenseMock();
license.getFeature.mockReturnValue({ isEnabled: false, isAvailable: false });
const { valid, message } = checkLicense(license);
expect(valid).toBe(false);
expect(message).toStrictEqual(expect.any(String));
});
});
it('returns "valid": true without message otherwise', () => {
const license = licensingMock.createLicenseMock();
const { valid, message } = checkLicense(license);
expect(valid).toBe(true);
expect(message).toBe(null);
});
});

View file

@ -0,0 +1,65 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { i18n } from '@kbn/i18n';
import { CheckLicense } from '../../../../licensing/server';
export const checkLicense: CheckLicense = license => {
if (!license.isAvailable) {
return {
valid: false,
message: i18n.translate(
'xpack.logstash.managementSection.notPossibleToManagePipelinesMessage',
{
defaultMessage:
'You cannot manage Logstash pipelines because license information is not available at this time.',
}
),
};
}
if (!license.hasAtLeast('standard')) {
return {
valid: false,
message: i18n.translate('xpack.logstash.managementSection.licenseDoesNotSupportDescription', {
defaultMessage:
'Your {licenseType} license does not support Logstash pipeline management features. Please upgrade your license.',
values: { licenseType: license.type },
}),
};
}
if (!license.isActive) {
return {
valid: false,
message: i18n.translate(
'xpack.logstash.managementSection.pipelineCrudOperationsNotAllowedDescription',
{
defaultMessage:
'You cannot edit, create, or delete your Logstash pipelines because your {licenseType} license has expired.',
values: { licenseType: license.type },
}
),
};
}
if (!license.getFeature('security').isEnabled) {
return {
valid: false,
message: i18n.translate('xpack.logstash.managementSection.enableSecurityDescription', {
defaultMessage:
'Security must be enabled in order to use Logstash pipeline management features.' +
' Please set xpack.security.enabled: true in your elasticsearch.yml.',
}),
};
}
return {
valid: true,
message: null,
};
};

View file

@ -4,4 +4,4 @@
* you may not use this file except in compliance with the Elastic License.
*/
export { callWithRequestFactory } from './call_with_request_factory';
export { checkLicense } from './check_license';

View file

@ -0,0 +1,71 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { fetchAllFromScroll } from './fetch_all_from_scroll';
describe('fetch_all_from_scroll', () => {
let stubCallWithRequest: jest.Mock;
beforeEach(() => {
stubCallWithRequest = jest.fn();
stubCallWithRequest
.mockResolvedValueOnce({
hits: {
hits: ['newhit'],
},
_scroll_id: 'newScrollId',
})
.mockResolvedValueOnce({
hits: {
hits: [],
},
});
});
describe('#fetchAllFromScroll', () => {
describe('when the passed-in response has no hits', () => {
const mockResponse = {
hits: {
hits: [],
},
};
it('should return an empty array of hits', async () => {
const hits = await fetchAllFromScroll(mockResponse as any, stubCallWithRequest);
expect(hits).toEqual([]);
});
it('should not call callWithRequest', async () => {
await fetchAllFromScroll(mockResponse as any, stubCallWithRequest);
expect(stubCallWithRequest).toHaveBeenCalledTimes(0);
});
});
describe('when the passed-in response has some hits', () => {
const mockResponse = {
hits: {
hits: ['foo', 'bar'],
},
_scroll_id: 'originalScrollId',
};
it('should return the hits from the response', async () => {
const hits = await fetchAllFromScroll(mockResponse as any, stubCallWithRequest);
expect(hits).toEqual(['foo', 'bar', 'newhit']);
});
it('should call callWithRequest', async () => {
await fetchAllFromScroll(mockResponse as any, stubCallWithRequest);
expect(stubCallWithRequest).toHaveBeenCalledTimes(2);
const firstCallWithRequestCallArgs = stubCallWithRequest.mock.calls[0];
expect(firstCallWithRequestCallArgs[1].body.scroll_id).toBe('originalScrollId');
const secondCallWithRequestCallArgs = stubCallWithRequest.mock.calls[1];
expect(secondCallWithRequestCallArgs[1].body.scroll_id).toBe('newScrollId');
});
});
});
});

View file

@ -0,0 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { APICaller } from 'src/core/server';
import { SearchResponse } from 'elasticsearch';
import { ES_SCROLL_SETTINGS } from '../../../common/constants';
import { Hits } from '../../types';
export async function fetchAllFromScroll(
response: SearchResponse<any>,
callWithRequest: APICaller,
hits: Hits = []
): Promise<Hits> {
const newHits = response.hits?.hits || [];
const scrollId = response._scroll_id;
if (newHits.length > 0) {
hits.push(...newHits);
const innerResponse = await callWithRequest('scroll', {
body: {
scroll: ES_SCROLL_SETTINGS.KEEPALIVE,
scroll_id: scrollId,
},
});
return await fetchAllFromScroll(innerResponse, callWithRequest, hits);
}
return hits;
}

View file

@ -4,8 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/
import expect from '@kbn/expect';
import { Cluster } from '../cluster';
import { Cluster } from './cluster';
describe('cluster', () => {
describe('Cluster', () => {
@ -16,7 +15,7 @@ describe('cluster', () => {
it('returns correct Cluster instance', () => {
const cluster = Cluster.fromUpstreamJSON(upstreamJSON);
expect(cluster.uuid).to.be(upstreamJSON.cluster_uuid);
expect(cluster.uuid).toEqual(upstreamJSON.cluster_uuid);
});
});
});

View file

@ -10,11 +10,12 @@ import { get } from 'lodash';
* This model deals with a cluster object from ES and converts it to Kibana downstream
*/
export class Cluster {
constructor(props) {
this.uuid = props.uuid;
public readonly uuid: string;
constructor({ uuid }: { uuid: string }) {
this.uuid = uuid;
}
get downstreamJSON() {
public get downstreamJSON() {
const json = {
uuid: this.uuid,
};
@ -23,8 +24,8 @@ export class Cluster {
}
// generate Pipeline object from elasticsearch response
static fromUpstreamJSON(upstreamCluster) {
const uuid = get(upstreamCluster, 'cluster_uuid');
static fromUpstreamJSON(upstreamCluster: Record<string, string>) {
const uuid = get<string>(upstreamCluster, 'cluster_uuid');
return new Cluster({ uuid });
}
}

View file

@ -4,8 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/
import expect from '@kbn/expect';
import { Pipeline } from '../pipeline';
import { Pipeline } from './pipeline';
describe('pipeline', () => {
describe('Pipeline', () => {
@ -25,10 +24,10 @@ describe('pipeline', () => {
it('returns correct Pipeline instance', () => {
const pipeline = Pipeline.fromUpstreamJSON(upstreamJSON);
expect(pipeline.id).to.be(upstreamJSON._id);
expect(pipeline.description).to.be(upstreamJSON._source.description);
expect(pipeline.username).to.be(upstreamJSON._source.username);
expect(pipeline.pipeline).to.be(upstreamJSON._source.pipeline);
expect(pipeline.id).toBe(upstreamJSON._id);
expect(pipeline.description).toBe(upstreamJSON._source.description);
expect(pipeline.username).toBe(upstreamJSON._source.username);
expect(pipeline.pipeline).toBe(upstreamJSON._source.pipeline);
});
it('throws if pipeline argument does not contain an id property', () => {
@ -39,7 +38,7 @@ describe('pipeline', () => {
const testFromUpstreamJsonError = () => {
return Pipeline.fromUpstreamJSON(badJSON);
};
expect(testFromUpstreamJsonError).to.throwError(
expect(testFromUpstreamJsonError).toThrowError(
/upstreamPipeline argument must contain an id property/i
);
});
@ -64,12 +63,12 @@ describe('pipeline', () => {
pipeline: 'input {} filter { grok {} }\n output {}',
};
// can't do an object level comparison because modified field is always `now`
expect(pipeline.upstreamJSON.last_modified).to.be.a('string');
expect(pipeline.upstreamJSON.description).to.be(expectedUpstreamJSON.description);
expect(pipeline.upstreamJSON.pipeline_metadata).to.eql(
expect(pipeline.upstreamJSON.last_modified).toStrictEqual(expect.any(String));
expect(pipeline.upstreamJSON.description).toBe(expectedUpstreamJSON.description);
expect(pipeline.upstreamJSON.pipeline_metadata).toEqual(
expectedUpstreamJSON.pipeline_metadata
);
expect(pipeline.upstreamJSON.pipeline).to.be(expectedUpstreamJSON.pipeline);
expect(pipeline.upstreamJSON.pipeline).toBe(expectedUpstreamJSON.pipeline);
});
});
});

View file

@ -9,19 +9,38 @@ import { badRequest } from 'boom';
import { get } from 'lodash';
import { i18n } from '@kbn/i18n';
interface PipelineOptions {
id: string;
description: string;
pipeline: string;
username?: string;
settings?: Record<string, any>;
}
interface DownstreamPipeline {
description: string;
pipeline: string;
settings?: Record<string, any>;
}
/**
* This model deals with a pipeline object from ES and converts it to Kibana downstream
*/
export class Pipeline {
constructor(props) {
this.id = props.id;
this.description = props.description;
this.username = props.username;
this.pipeline = props.pipeline;
this.settings = props.settings || {};
public readonly id: string;
public readonly description: string;
public readonly username?: string;
public readonly pipeline: string;
private readonly settings: Record<string, any>;
constructor(options: PipelineOptions) {
this.id = options.id;
this.description = options.description;
this.username = options.username;
this.pipeline = options.pipeline;
this.settings = options.settings || {};
}
get downstreamJSON() {
public get downstreamJSON() {
const json = {
id: this.id,
description: this.description,
@ -41,7 +60,7 @@ export class Pipeline {
* pipeline_metadata.type is the Logstash config type (future: LIR, json, etc)
* @return {[JSON]} [Elasticsearch JSON]
*/
get upstreamJSON() {
public get upstreamJSON() {
return {
description: this.description,
last_modified: moment().toISOString(),
@ -56,7 +75,11 @@ export class Pipeline {
}
// generate Pipeline object from kibana response
static fromDownstreamJSON(downstreamPipeline, pipelineId, username) {
static fromDownstreamJSON(
downstreamPipeline: DownstreamPipeline,
pipelineId: string,
username?: string
) {
const opts = {
id: pipelineId,
description: downstreamPipeline.description,
@ -69,7 +92,7 @@ export class Pipeline {
}
// generate Pipeline object from elasticsearch response
static fromUpstreamJSON(upstreamPipeline) {
static fromUpstreamJSON(upstreamPipeline: Record<string, any>) {
if (!upstreamPipeline._id) {
throw badRequest(
i18n.translate(
@ -80,13 +103,13 @@ export class Pipeline {
)
);
}
const id = get(upstreamPipeline, '_id');
const description = get(upstreamPipeline, '_source.description');
const username = get(upstreamPipeline, '_source.username');
const pipeline = get(upstreamPipeline, '_source.pipeline');
const settings = get(upstreamPipeline, '_source.pipeline_settings');
const id = get<string>(upstreamPipeline, '_id');
const description = get<string>(upstreamPipeline, '_source.description');
const username = get<string>(upstreamPipeline, '_source.username');
const pipeline = get<string>(upstreamPipeline, '_source.pipeline');
const settings = get<Record<string, any>>(upstreamPipeline, '_source.pipeline_settings');
const opts = { id, description, username, pipeline, settings };
const opts: PipelineOptions = { id, description, username, pipeline, settings };
return new Pipeline(opts);
}

View file

@ -4,8 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/
import expect from '@kbn/expect';
import { PipelineListItem } from '../pipeline_list_item';
import { PipelineListItem } from './pipeline_list_item';
describe('pipeline_list_item', () => {
describe('PipelineListItem', () => {
@ -21,15 +20,18 @@ describe('pipeline_list_item', () => {
username: 'elastic',
pipeline: 'input {} filter { grok {} }\n output {}',
},
_index: 'index',
_type: 'type',
_score: 100,
};
describe('fromUpstreamJSON factory method', () => {
it('returns correct PipelineListItem instance', () => {
const pipelineListItem = PipelineListItem.fromUpstreamJSON(upstreamJSON);
expect(pipelineListItem.id).to.be(upstreamJSON._id);
expect(pipelineListItem.description).to.be(upstreamJSON._source.description);
expect(pipelineListItem.username).to.be(upstreamJSON._source.username);
expect(pipelineListItem.last_modified).to.be(upstreamJSON._source.last_modified);
expect(pipelineListItem.id).toBe(upstreamJSON._id);
expect(pipelineListItem.description).toBe(upstreamJSON._source.description);
expect(pipelineListItem.username).toBe(upstreamJSON._source.username);
expect(pipelineListItem.last_modified).toBe(upstreamJSON._source.last_modified);
});
});
@ -42,7 +44,7 @@ describe('pipeline_list_item', () => {
username: 'elastic',
last_modified: '2017-05-14T02:50:51.250Z',
};
expect(pipelineListItem.downstreamJSON).to.eql(expectedDownstreamJSON);
expect(pipelineListItem.downstreamJSON).toEqual(expectedDownstreamJSON);
});
});
});

View file

@ -5,16 +5,21 @@
*/
import { get } from 'lodash';
import { Hit, PipelineListItemOptions } from '../../types';
export class PipelineListItem {
constructor(props) {
this.id = props.id;
this.description = props.description;
this.last_modified = props.last_modified;
this.username = props.username;
public readonly id: string;
public readonly description: string;
public readonly last_modified: string;
public readonly username: string;
constructor(options: PipelineListItemOptions) {
this.id = options.id;
this.description = options.description;
this.last_modified = options.last_modified;
this.username = options.username;
}
get downstreamJSON() {
public get downstreamJSON() {
const json = {
id: this.id,
description: this.description,
@ -29,12 +34,12 @@ export class PipelineListItem {
* Takes the json GET response from ES and constructs a pipeline model to be used
* in Kibana downstream
*/
static fromUpstreamJSON(pipeline) {
static fromUpstreamJSON(pipeline: Hit) {
const opts = {
id: pipeline._id,
description: get(pipeline, '_source.description'),
last_modified: get(pipeline, '_source.last_modified'),
username: get(pipeline, '_source.username'),
description: get<string>(pipeline, '_source.description'),
last_modified: get<string>(pipeline, '_source.last_modified'),
username: get<string>(pipeline, '_source.username'),
};
return new PipelineListItem(opts);

View file

@ -0,0 +1,51 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import {
CoreSetup,
CoreStart,
ICustomClusterClient,
Logger,
Plugin,
PluginInitializerContext,
} from 'src/core/server';
import { LicensingPluginSetup } from '../../licensing/server';
import { SecurityPluginSetup } from '../../security/server';
import { registerRoutes } from './routes';
interface SetupDeps {
licensing: LicensingPluginSetup;
security?: SecurityPluginSetup;
}
export class LogstashPlugin implements Plugin {
private readonly logger: Logger;
private esClient?: ICustomClusterClient;
private coreSetup?: CoreSetup;
constructor(context: PluginInitializerContext) {
this.logger = context.logger.get();
}
setup(core: CoreSetup, deps: SetupDeps) {
this.logger.debug('Setting up Logstash plugin');
this.coreSetup = core;
registerRoutes(core.http.createRouter(), deps.security);
}
start(core: CoreStart) {
const esClient = core.elasticsearch.legacy.createClient('logstash');
this.coreSetup!.http.registerRouteHandlerContext('logstash', async (context, request) => {
return { esClient: esClient.asScoped(request) };
});
}
stop() {
if (this.esClient) {
this.esClient.close();
}
}
}

View file

@ -3,5 +3,4 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
export { registerLogstashClusterRoutes } from './register_cluster_routes';
export { registerClusterLoadRoute } from './load';

View file

@ -0,0 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { IRouter } from 'src/core/server';
import { wrapRouteWithLicenseCheck } from '../../../../licensing/server';
import { Cluster } from '../../models/cluster';
import { checkLicense } from '../../lib/check_license';
export function registerClusterLoadRoute(router: IRouter) {
router.get(
{
path: '/api/logstash/cluster',
validate: false,
},
wrapRouteWithLicenseCheck(checkLicense, async (context, request, response) => {
try {
const client = context.logstash!.esClient;
const info = await client.callAsCurrentUser('info');
return response.ok({
body: {
cluster: Cluster.fromUpstreamJSON(info).downstreamJSON,
},
});
} catch (err) {
if (err.status === 403) {
return response.ok();
}
return response.internalError();
}
})
);
}

View file

@ -0,0 +1,28 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { IRouter } from 'src/core/server';
import { SecurityPluginSetup } from '../../../security/server';
import { registerClusterLoadRoute } from './cluster';
import {
registerPipelineDeleteRoute,
registerPipelineLoadRoute,
registerPipelineSaveRoute,
} from './pipeline';
import { registerPipelinesListRoute, registerPipelinesDeleteRoute } from './pipelines';
import { registerUpgradeRoute } from './upgrade';
export function registerRoutes(router: IRouter, security?: SecurityPluginSetup) {
registerClusterLoadRoute(router);
registerPipelineDeleteRoute(router);
registerPipelineLoadRoute(router);
registerPipelineSaveRoute(router, security);
registerPipelinesListRoute(router);
registerPipelinesDeleteRoute(router);
registerUpgradeRoute(router);
}

View file

@ -0,0 +1,38 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { schema } from '@kbn/config-schema';
import { IRouter } from 'src/core/server';
import { INDEX_NAMES } from '../../../common/constants';
import { wrapRouteWithLicenseCheck } from '../../../../licensing/server';
import { checkLicense } from '../../lib/check_license';
export function registerPipelineDeleteRoute(router: IRouter) {
router.delete(
{
path: '/api/logstash/pipeline/{id}',
validate: {
params: schema.object({
id: schema.string(),
}),
},
},
wrapRouteWithLicenseCheck(
checkLicense,
router.handleLegacyErrors(async (context, request, response) => {
const client = context.logstash!.esClient;
await client.callAsCurrentUser('delete', {
index: INDEX_NAMES.PIPELINES,
id: request.params.id,
refresh: 'wait_for',
});
return response.noContent();
})
)
);
}

View file

@ -3,7 +3,6 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
export { wrapCustomError } from './wrap_custom_error';
export { wrapEsError } from './wrap_es_error';
export { wrapUnknownError } from './wrap_unknown_error';
export { registerPipelineDeleteRoute } from './delete';
export { registerPipelineLoadRoute } from './load';
export { registerPipelineSaveRoute } from './save';

View file

@ -0,0 +1,46 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { schema } from '@kbn/config-schema';
import { IRouter } from 'src/core/server';
import { INDEX_NAMES } from '../../../common/constants';
import { Pipeline } from '../../models/pipeline';
import { wrapRouteWithLicenseCheck } from '../../../../licensing/server';
import { checkLicense } from '../../lib/check_license';
export function registerPipelineLoadRoute(router: IRouter) {
router.get(
{
path: '/api/logstash/pipeline/{id}',
validate: {
params: schema.object({
id: schema.string(),
}),
},
},
wrapRouteWithLicenseCheck(
checkLicense,
router.handleLegacyErrors(async (context, request, response) => {
const client = context.logstash!.esClient;
const result = await client.callAsCurrentUser('get', {
index: INDEX_NAMES.PIPELINES,
id: request.params.id,
_source: ['description', 'username', 'pipeline', 'pipeline_settings'],
ignore: [404],
});
if (!result.found) {
return response.notFound();
}
return response.ok({
body: Pipeline.fromUpstreamJSON(result).downstreamJSON,
});
})
)
);
}

View file

@ -0,0 +1,69 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { schema } from '@kbn/config-schema';
import { i18n } from '@kbn/i18n';
import { IRouter } from 'src/core/server';
import { INDEX_NAMES } from '../../../common/constants';
import { Pipeline } from '../../models/pipeline';
import { wrapRouteWithLicenseCheck } from '../../../../licensing/server';
import { SecurityPluginSetup } from '../../../../security/server';
import { checkLicense } from '../../lib/check_license';
export function registerPipelineSaveRoute(router: IRouter, security?: SecurityPluginSetup) {
router.put(
{
path: '/api/logstash/pipeline/{id}',
validate: {
params: schema.object({
id: schema.string(),
}),
body: schema.object({
id: schema.string(),
description: schema.string(),
pipeline: schema.string(),
username: schema.string(),
settings: schema.maybe(schema.object({}, { unknowns: 'allow' })),
}),
},
},
wrapRouteWithLicenseCheck(
checkLicense,
router.handleLegacyErrors(async (context, request, response) => {
try {
let username: string | undefined;
if (security) {
const user = await security.authc.getCurrentUser(request);
username = user?.username;
}
const client = context.logstash!.esClient;
const pipeline = Pipeline.fromDownstreamJSON(request.body, request.params.id, username);
await client.callAsCurrentUser('index', {
index: INDEX_NAMES.PIPELINES,
id: pipeline.id,
body: pipeline.upstreamJSON,
refresh: 'wait_for',
});
return response.noContent();
} catch (err) {
const statusCode = err.statusCode;
// handles the permissions issue of Elasticsearch
if (statusCode === 403) {
return response.forbidden({
body: i18n.translate('xpack.logstash.insufficientUserPermissionsDescription', {
defaultMessage: 'Insufficient user permissions for managing Logstash pipelines',
}),
});
}
throw err;
}
})
)
);
}

View file

@ -0,0 +1,54 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { schema } from '@kbn/config-schema';
import { APICaller, IRouter } from 'src/core/server';
import { wrapRouteWithLicenseCheck } from '../../../../licensing/server';
import { INDEX_NAMES } from '../../../common/constants';
import { checkLicense } from '../../lib/check_license';
async function deletePipelines(callWithRequest: APICaller, pipelineIds: string[]) {
const deletePromises = pipelineIds.map(pipelineId => {
return callWithRequest('delete', {
index: INDEX_NAMES.PIPELINES,
id: pipelineId,
refresh: 'wait_for',
})
.then(success => ({ success }))
.catch(error => ({ error }));
});
const results = await Promise.all(deletePromises);
const successes = results.filter(result => Reflect.has(result, 'success'));
const errors = results.filter(result => Reflect.has(result, 'error'));
return {
numSuccesses: successes.length,
numErrors: errors.length,
};
}
export function registerPipelinesDeleteRoute(router: IRouter) {
router.post(
{
path: '/api/logstash/pipelines/delete',
validate: {
body: schema.object({
pipelineIds: schema.arrayOf(schema.string()),
}),
},
},
wrapRouteWithLicenseCheck(
checkLicense,
router.handleLegacyErrors(async (context, request, response) => {
const client = context.logstash!.esClient;
const results = await deletePipelines(client.callAsCurrentUser, request.body.pipelineIds);
return response.ok({ body: { results } });
})
)
);
}

View file

@ -3,5 +3,5 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
export { registerLogstashPipelineRoutes } from './register_pipeline_routes';
export { registerPipelinesListRoute } from './list';
export { registerPipelinesDeleteRoute } from './delete';

View file

@ -0,0 +1,63 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { i18n } from '@kbn/i18n';
import { SearchResponse } from 'elasticsearch';
import { APICaller, IRouter } from 'src/core/server';
import { wrapRouteWithLicenseCheck } from '../../../../licensing/server';
import { INDEX_NAMES, ES_SCROLL_SETTINGS } from '../../../common/constants';
import { PipelineListItem } from '../../models/pipeline_list_item';
import { fetchAllFromScroll } from '../../lib/fetch_all_from_scroll';
import { checkLicense } from '../../lib/check_license';
async function fetchPipelines(callWithRequest: APICaller) {
const params = {
index: INDEX_NAMES.PIPELINES,
scroll: ES_SCROLL_SETTINGS.KEEPALIVE,
body: {
size: ES_SCROLL_SETTINGS.PAGE_SIZE,
},
ignore: [404],
};
const response = await callWithRequest<SearchResponse<any>>('search', params);
return fetchAllFromScroll(response, callWithRequest);
}
export function registerPipelinesListRoute(router: IRouter) {
router.get(
{
path: '/api/logstash/pipelines',
validate: false,
},
wrapRouteWithLicenseCheck(
checkLicense,
router.handleLegacyErrors(async (context, request, response) => {
try {
const client = context.logstash!.esClient;
const pipelinesHits = await fetchPipelines(client.callAsCurrentUser);
const pipelines = pipelinesHits.map(pipeline => {
return PipelineListItem.fromUpstreamJSON(pipeline).downstreamJSON;
});
return response.ok({ body: { pipelines } });
} catch (err) {
const statusCode = err.statusCode;
// handles the permissions issue of Elasticsearch
if (statusCode === 403) {
return response.forbidden({
body: i18n.translate('xpack.logstash.insufficientUserPermissionsDescription', {
defaultMessage: 'Insufficient user permissions for managing Logstash pipelines',
}),
});
}
throw err;
}
})
)
);
}

View file

@ -4,4 +4,4 @@
* you may not use this file except in compliance with the Elastic License.
*/
export { licensePreRoutingFactory } from './license_pre_routing_factory';
export { registerUpgradeRoute } from './upgrade';

View file

@ -0,0 +1,46 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { IRouter } from 'src/core/server';
import { wrapRouteWithLicenseCheck } from '../../../../licensing/server';
import { INDEX_NAMES } from '../../../common/constants';
import { checkLicense } from '../../lib/check_license';
export function registerUpgradeRoute(router: IRouter) {
router.post(
{
path: '/api/logstash/upgrade',
validate: false,
},
wrapRouteWithLicenseCheck(
checkLicense,
router.handleLegacyErrors(async (context, request, response) => {
const client = context.logstash!.esClient;
const doesIndexExist = await client.callAsCurrentUser('indices.exists', {
index: INDEX_NAMES.PIPELINES,
});
// If index doesn't exist yet, there is no mapping to upgrade
if (doesIndexExist) {
await client.callAsCurrentUser('indices.putMapping', {
index: INDEX_NAMES.PIPELINES,
body: {
properties: {
pipeline_settings: {
dynamic: false,
type: 'object',
},
},
},
});
}
return response.ok({ body: { is_upgraded: true } });
})
)
);
}

View file

@ -0,0 +1,27 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { SearchResponse } from 'elasticsearch';
import { IScopedClusterClient } from 'src/core/server';
type UnwrapArray<T> = T extends Array<infer U> ? U : never;
export type Hits = SearchResponse<any>['hits']['hits'];
export type Hit = UnwrapArray<Hits>;
export interface PipelineListItemOptions {
id: string;
description: string;
last_modified: string;
username: string;
}
declare module 'src/core/server' {
interface RequestHandlerContext {
logstash?: {
esClient: IScopedClusterClient;
};
}
}

View file

@ -78,14 +78,14 @@ describe('license features', function() {
expect(subscriptionHandler.mock.calls[1]).toMatchInlineSnapshot(`
Array [
Object {
"allowLogin": false,
"allowRbac": false,
"allowRoleDocumentLevelSecurity": false,
"allowRoleFieldLevelSecurity": false,
"allowSubFeaturePrivileges": false,
"showLinks": false,
"showLogin": false,
"showRoleMappingsManagement": false,
"allowLogin": true,
"allowRbac": true,
"allowRoleDocumentLevelSecurity": true,
"allowRoleFieldLevelSecurity": true,
"allowSubFeaturePrivileges": true,
"showLinks": true,
"showLogin": true,
"showRoleMappingsManagement": true,
},
]
`);

View file

@ -3,8 +3,9 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { FtrProviderContext } from '../../../ftr_provider_context';
export default function({ loadTestFile }) {
export default function({ loadTestFile }: FtrProviderContext) {
describe('cluster', () => {
loadTestFile(require.resolve('./load'));
});

View file

@ -5,8 +5,9 @@
*/
import expect from '@kbn/expect';
import { FtrProviderContext } from '../../../ftr_provider_context';
export default function({ getService }) {
export default function({ getService }: FtrProviderContext) {
const supertest = getService('supertest');
const es = getService('legacyEs');

View file

@ -3,8 +3,9 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { FtrProviderContext } from '../../ftr_provider_context';
export default function({ loadTestFile }) {
export default function({ loadTestFile }: FtrProviderContext) {
describe('logstash', () => {
loadTestFile(require.resolve('./pipelines'));
loadTestFile(require.resolve('./pipeline'));

View file

@ -3,8 +3,9 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { FtrProviderContext } from '../../../ftr_provider_context';
export default function({ getService }) {
export default function({ getService }: FtrProviderContext) {
const supertest = getService('supertest');
const esArchiver = getService('esArchiver');
describe('delete', () => {

View file

@ -3,8 +3,8 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
export default function({ loadTestFile }) {
import { FtrProviderContext } from '../../../ftr_provider_context';
export default function({ loadTestFile }: FtrProviderContext) {
describe('pipeline', () => {
loadTestFile(require.resolve('./load'));
loadTestFile(require.resolve('./save'));

View file

@ -5,9 +5,11 @@
*/
import expect from '@kbn/expect';
import pipeline from './fixtures/load';
import { FtrProviderContext } from '../../../ftr_provider_context';
export default function({ getService }) {
import pipeline from './fixtures/load.json';
export default function({ getService }: FtrProviderContext) {
const supertest = getService('supertest');
const esArchiver = getService('esArchiver');
describe('list', () => {

View file

@ -5,8 +5,9 @@
*/
import expect from '@kbn/expect';
import { FtrProviderContext } from '../../../ftr_provider_context';
export default function({ getService }) {
export default function({ getService }: FtrProviderContext) {
const supertest = getService('supertest');
const esArchiver = getService('esArchiver');
describe('save', () => {

View file

@ -3,8 +3,9 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { FtrProviderContext } from '../../../ftr_provider_context';
export default function({ getService }) {
export default function({ getService }: FtrProviderContext) {
const supertest = getService('supertest');
const esArchiver = getService('esArchiver');
describe('delete', () => {

View file

@ -3,8 +3,9 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { FtrProviderContext } from '../../../ftr_provider_context';
export default function({ loadTestFile }) {
export default function({ loadTestFile }: FtrProviderContext) {
describe('pipelines', () => {
loadTestFile(require.resolve('./list'));
loadTestFile(require.resolve('./delete'));

View file

@ -5,9 +5,10 @@
*/
import expect from '@kbn/expect';
import pipelineList from './fixtures/list';
import { FtrProviderContext } from '../../../ftr_provider_context';
import pipelineList from './fixtures/list.json';
export default function({ getService }) {
export default function({ getService }: FtrProviderContext) {
const supertest = getService('supertest');
const esArchiver = getService('esArchiver');
describe('list', () => {