Merge pull request #6121 from BigFunger/ingest-pipeline-setup-server

[ingest] Creates the simulate api route
This commit is contained in:
Matt Bargar 2016-03-09 10:11:59 -05:00
commit 4d7b468e3e
15 changed files with 594 additions and 0 deletions

View file

@ -0,0 +1,87 @@
var expect = require('expect.js');
var sinon = require('sinon');
var keysDeep = require('../keys_deep');
describe('keys deep', function () {
it('should list first level properties', function () {
let object = {
property1: 'value1',
property2: 'value2'
};
let expected = [
'property1',
'property2'
];
const keys = keysDeep(object);
expect(keys).to.eql(expected);
});
it('should list nested properties', function () {
let object = {
property1: 'value1',
property2: 'value2',
property3: {
subProperty1: 'value1.1'
}
};
let expected = [
'property1',
'property2',
'property3.subProperty1',
'property3'
];
const keys = keysDeep(object);
expect(keys).to.eql(expected);
});
it('should recursivly list nested properties', function () {
let object = {
property1: 'value1',
property2: 'value2',
property3: {
subProperty1: 'value1.1',
subProperty2: {
prop1: 'value1.2.1',
prop2: 'value2.2.2'
},
subProperty3: 'value1.3'
}
};
let expected = [
'property1',
'property2',
'property3.subProperty1',
'property3.subProperty2.prop1',
'property3.subProperty2.prop2',
'property3.subProperty2',
'property3.subProperty3',
'property3'
];
const keys = keysDeep(object);
expect(keys).to.eql(expected);
});
it('should list array properties, but not contents', function () {
let object = {
property1: 'value1',
property2: [ 'item1', 'item2' ]
};
let expected = [
'property1',
'property2'
];
const keys = keysDeep(object);
expect(keys).to.eql(expected);
});
});

View file

@ -0,0 +1,21 @@
const _ = require('lodash');
export default function keysDeep(object, base) {
let result = [];
let delimitedBase = base ? base + '.' : '';
_.forEach(object, (value, key) => {
var fullKey = delimitedBase + key;
if (_.isPlainObject(value)) {
result = result.concat(keysDeep(value, fullKey));
} else {
result.push(fullKey);
}
});
if (base) {
result.push(base);
}
return result;
};

View file

@ -0,0 +1,87 @@
import expect from 'expect.js';
import _ from 'lodash';
import ingestSimulateApiKibanaToEsConverter from '../../converters/ingest_simulate_api_kibana_to_es_converter';
describe('ingestSimulateApiKibanaToEsConverter', function () {
it('populates the docs._source section and converts known processors', function () {
function buildSamplePipeline(input) {
return {
processors: [ { processor_id: 'processor1', type_id: 'set', target_field: 'bar', value: 'foo' } ],
input: input
};
}
function buildExpected(input) {
return {
pipeline : {
processors: [{
set: {
field: 'bar',
tag: 'processor1',
value: 'foo'
}
}]
},
'docs' : [
{ '_source': input }
]
};
}
let expected;
let actual;
expected = buildExpected(undefined);
actual = ingestSimulateApiKibanaToEsConverter(buildSamplePipeline(undefined));
expect(actual).to.eql(expected);
expected = buildExpected('foo');
actual = ingestSimulateApiKibanaToEsConverter(buildSamplePipeline('foo'));
expect(actual).to.eql(expected);
expected = buildExpected({ foo: 'bar' });
actual = ingestSimulateApiKibanaToEsConverter(buildSamplePipeline({ foo: 'bar' }));
expect(actual).to.eql(expected);
});
it('handles multiple processors', function () {
const pipeline = {
processors: [
{ processor_id: 'processor1', type_id: 'set', target_field: 'bar', value: 'foo' },
{ processor_id: 'processor2', type_id: 'set', target_field: 'bar', value: 'foo' },
],
input: {}
};
const expected = {
'pipeline': {
'processors': [
{
set: {
field: 'bar',
tag: 'processor1',
value: 'foo'
}
},
{
set: {
field: 'bar',
tag: 'processor2',
value: 'foo'
}
}
]
},
'docs': [
{'_source': {}}
]
};
const actual = ingestSimulateApiKibanaToEsConverter(pipeline);
expect(actual).to.eql(expected);
});
});

View file

@ -0,0 +1,152 @@
const expect = require('expect.js');
const _ = require('lodash');
import processESIngestSimulateResponse from '../process_es_ingest_simulate_response';
describe('processESIngestSimulateResponse', function () {
it('returns a result for each processor in the pipeline', function () {
const processors = [ { processorId: 'processor1' }, { processorId: 'processor2' } ];
const response = {
docs: [ { processor_results: [] } ]
};
const results = processESIngestSimulateResponse(processors, response);
expect(results.length).to.be(2);
});
it('each processor that does not receive a result will contain default info', function () {
const processors = [
{ processorId: 'processor1', outputObject: 'foo' },
{ processorId: 'processor2', outputObject: 'bar' },
{ processorId: 'processor3', outputObject: 'baz' }
];
const response = {
docs: [ { processor_results: [] } ]
};
const expected = [
{ processorId: 'processor1', output: 'foo', error: undefined },
{ processorId: 'processor2', output: 'bar', error: undefined },
{ processorId: 'processor3', output: 'baz', error: undefined }
];
const actual = processESIngestSimulateResponse(processors, response);
expect(actual).to.eql(expected);
});
it('each processor that receives a result will contain response info', function () {
const processors = [
{ processorId: 'processor1', outputObject: 'foo' },
{ processorId: 'processor2', outputObject: 'bar' },
{ processorId: 'processor3', outputObject: 'baz' }
];
const response = {
docs: [ { processor_results: [
{ tag: 'processor2', doc: { _source: 'new_bar' }, error: undefined },
{ tag: 'processor3', doc: { _source: 'new_baz' }, error: undefined }
] } ]
};
const expected = [
{ processorId: 'processor1', output: 'foo', error: undefined },
{ processorId: 'processor2', output: 'new_bar', error: undefined },
{ processorId: 'processor3', output: 'new_baz', error: undefined }
];
const actual = processESIngestSimulateResponse(processors, response);
expect(actual).to.eql(expected);
});
describe('processors that return an error object', function () {
it('will be the root_cause reason if one exists', function () {
const processors = [
{ processorId: 'processor1', outputObject: 'foo' },
{ processorId: 'processor2', outputObject: 'bar' },
{ processorId: 'processor3', outputObject: 'baz' }
];
const response = {
docs: [ { processor_results: [
{ tag: 'processor2', doc: { _source: 'new_bar' }, error: undefined },
{
tag: 'processor3',
doc: 'dummy',
error: { root_cause: [ { reason: 'something bad happened', type: 'general exception' } ] }
}
] } ]
};
const expected = [
{ processorId: 'processor1', output: 'foo', error: undefined },
{ processorId: 'processor2', output: 'new_bar', error: undefined },
{ processorId: 'processor3', output: undefined, error: { isNested: false, message: 'something bad happened'} }
];
const actual = processESIngestSimulateResponse(processors, response);
expect(actual).to.eql(expected);
});
it('will be the root_cause type if reason does not exists', function () {
const processors = [
{ processorId: 'processor1', outputObject: 'foo' },
{ processorId: 'processor2', outputObject: 'bar' },
{ processorId: 'processor3', outputObject: 'baz' }
];
const response = {
docs: [ { processor_results: [
{ tag: 'processor2', doc: { _source: 'new_bar' }, error: undefined },
{
tag: 'processor3',
doc: 'dummy',
error: { root_cause: [ { type: 'something bad happened' } ] }
}
] } ]
};
const expected = [
{ processorId: 'processor1', output: 'foo', error: undefined },
{ processorId: 'processor2', output: 'new_bar', error: undefined },
{ processorId: 'processor3', output: undefined, error: { isNested: false, message: 'something bad happened'} }
];
const actual = processESIngestSimulateResponse(processors, response);
expect(actual).to.eql(expected);
});
it('any processor after errored processor will be set to a nested error state', function () {
const processors = [
{ processorId: 'processor0', outputObject: 'oof' },
{ processorId: 'processor1', outputObject: 'foo' },
{ processorId: 'processor2', outputObject: 'bar' },
{ processorId: 'processor3', outputObject: 'baz' }
];
const response = {
docs: [
{
processor_results: [
{ tag: 'processor0', doc: { _source: 'new_oof' }, error: undefined },
{
tag: 'processor1',
doc: 'dummy',
error: { root_cause: [ { reason: 'something bad happened' } ] }
}
]
}
]
};
const expected = [
{ processorId: 'processor0', output: 'new_oof', error: undefined },
{ processorId: 'processor1', output: undefined, error: { isNested: false, message: 'something bad happened' } },
{ processorId: 'processor2', output: undefined, error: { isNested: true, message: 'Invalid Parent Processor' } },
{ processorId: 'processor3', output: undefined, error: { isNested: true, message: 'Invalid Parent Processor' } }
];
const actual = processESIngestSimulateResponse(processors, response);
expect(actual).to.eql(expected);
});
});
});

View file

@ -0,0 +1,9 @@
export function set(processorApiDocument) {
return {
set: {
tag: processorApiDocument.processor_id,
field: processorApiDocument.target_field,
value: processorApiDocument.value
}
};
}

View file

@ -0,0 +1,17 @@
import _ from 'lodash';
import * as ingestProcessorApiKibanaToEsConverters from './ingest_processor_api_kibana_to_es_converters';
export default function ingestSimulateApiKibanaToEsConverter(simulateApiDocument) {
return {
pipeline: {
processors: _.map(simulateApiDocument.processors, (processor) => {
return ingestProcessorApiKibanaToEsConverters[processor.type_id](processor);
})
},
docs: [
{
_source: simulateApiDocument.input
}
]
};
}

View file

@ -0,0 +1,41 @@
const _ = require('lodash');
function translateError(esError) {
const rootCause = _.get(esError, 'root_cause[0]');
return _.get(rootCause, 'reason') || _.get(rootCause, 'type');
}
export default function processESIngestSimulateResponse(processors, resp) {
const results = processors.map((processor) => {
return {
processorId: processor.processorId,
output: processor.outputObject,
error: undefined
};
});
const processorResults = _.get(resp, 'docs[0].processor_results');
processorResults.forEach((processorResult) => {
const processorId = _.get(processorResult, 'tag');
const output = _.get(processorResult, 'doc._source');
const error = _.get(processorResult, 'error');
const errorMessage = translateError(error);
const badResult = _.find(results, { 'processorId': processorId });
badResult.output = errorMessage ? undefined : output;
badResult.error = errorMessage ? { isNested: false, message: errorMessage } : undefined;
});
const errorIndex = _.findIndex(results, (result) => { return result.error !== undefined; });
if (errorIndex !== -1) {
for (let i = errorIndex + 1; i < results.length; i++) {
const badResult = results[i];
badResult.output = undefined;
badResult.error = { isNested: true, message: 'Invalid Parent Processor' };
}
}
return results;
};

View file

@ -0,0 +1,11 @@
import Joi from 'joi';
const base = Joi.object({
processor_id: Joi.string().required()
}).unknown();
export const set = base.keys({
type_id: Joi.string().only('set').required(),
target_field: Joi.string().required(),
value: Joi.any().required()
});

View file

@ -0,0 +1,8 @@
import Joi from 'joi';
import * as ingestProcessorSchemas from './resources/ingest_processor_schemas';
import _ from 'lodash';
export default Joi.object({
processors: Joi.array().items(_.values(ingestProcessorSchemas)).required().min(1),
input: Joi.object().required()
});

View file

@ -1,4 +1,5 @@
export default function (server) {
require('./register_post')(server);
require('./register_delete')(server);
require('./register_simulate')(server);
}

View file

@ -0,0 +1,32 @@
import _ from 'lodash';
import processESIngestSimulateResponse from '../../../lib/process_es_ingest_simulate_response';
import simulateRequestSchema from '../../../lib/schemas/simulate_request_schema';
import ingestSimulateApiKibanaToEsConverter from '../../../lib/converters/ingest_simulate_api_kibana_to_es_converter';
import { keysToCamelCaseShallow, keysToSnakeCaseShallow } from '../../../lib/case_conversion';
module.exports = function registerSimulate(server) {
server.route({
path: '/api/kibana/ingest/simulate',
method: 'POST',
config: {
validate: {
payload: simulateRequestSchema
}
},
handler: function (request, reply) {
const boundCallWithRequest = _.partial(server.plugins.elasticsearch.callWithRequest, request);
const simulateApiDocument = request.payload;
const body = ingestSimulateApiKibanaToEsConverter(simulateApiDocument);
return boundCallWithRequest('transport.request', {
path: '_ingest/pipeline/_simulate',
query: { verbose: true },
method: 'POST',
body: body
})
.then(_.partial(processESIngestSimulateResponse, _.map(simulateApiDocument.processors, keysToCamelCaseShallow)))
.then((processors) => _.map(processors, keysToSnakeCaseShallow))
.then(reply);
}
});
};

View file

@ -0,0 +1,49 @@
define(function (require) {
var Promise = require('bluebird');
var createTestData = require('intern/dojo/node!../../../unit/api/ingest/data');
var _ = require('intern/dojo/node!lodash');
var expect = require('intern/dojo/node!expect.js');
const testPipeline = {
processors: [{
processor_id: 'processor1',
type_id: 'set',
target_field: 'foo',
value: 'bar'
}],
input: {}
};
return function (bdd, scenarioManager, request) {
bdd.describe('simulate', function simulatePipeline() {
bdd.it('should return 400 for an invalid payload', function invalidPayload() {
return Promise.all([
request.post('/kibana/ingest/simulate').expect(400),
request.post('/kibana/ingest/simulate')
.send({})
.expect(400),
// requires at least one processor
request.post('/kibana/ingest/simulate')
.send({input: {}, processors: []})
.expect(400),
// All processors must have a processorId property and a typeId property
request.post('/kibana/ingest/simulate')
.send({input: {}, processors: [{}]})
.expect(400)
]);
});
bdd.it('should return 200 for a successful run', function () {
return request.post('/kibana/ingest/simulate')
.send(testPipeline)
.expect(200);
});
});
};
});

View file

@ -8,6 +8,8 @@ define(function (require) {
var expect = require('intern/dojo/node!expect.js');
var post = require('./_post');
var del = require('./_del');
var simulate = require('./_simulate');
var processors = require('./processors/index');
bdd.describe('ingest API', function () {
var scenarioManager = new ScenarioManager(url.format(serverConfig.servers.elasticsearch));
@ -23,5 +25,7 @@ define(function (require) {
post(bdd, scenarioManager, request);
del(bdd, scenarioManager, request);
simulate(bdd, scenarioManager, request);
processors(bdd, scenarioManager, request);
});
});

View file

@ -0,0 +1,66 @@
define(function (require) {
var Promise = require('bluebird');
var _ = require('intern/dojo/node!lodash');
var expect = require('intern/dojo/node!expect.js');
const testPipeline = {
processors: [{
processor_id: 'processor1',
type_id: 'set',
target_field: 'foo',
value: 'bar'
}],
input: {}
};
return function (bdd, scenarioManager, request) {
bdd.describe('simulate - set processor', function simulatePipeline() {
bdd.it('should return 400 for an invalid payload', function invalidPayload() {
return Promise.all([
// Set processor requires targetField property
request.post('/kibana/ingest/simulate')
.send({
input: {},
processors: [{
processor_id: 'processor1',
type_id: 'set',
value: 'bar'
}]
})
.expect(400)
]);
});
bdd.it('should return 200 for a valid simulate request', function validSetSimulate() {
return request.post('/kibana/ingest/simulate')
.send(testPipeline)
.expect(200);
});
bdd.it('should return a simulated output with the correct result for the given processor', function () {
return request.post('/kibana/ingest/simulate')
.send(testPipeline)
.expect(200)
.then(function (response) {
expect(response.body[0].output.foo).to.be.equal('bar');
});
});
bdd.it('should enforce snake case', function setSimulateSnakeCase() {
return request.post('/kibana/ingest/simulate')
.send({
processors: [{
processorId: 'processor1',
typeId: 'set',
targetField: 'foo',
value: 'bar'
}],
input: {}
})
.expect(400);
});
});
};
});

View file

@ -0,0 +1,9 @@
define(function (require) {
var set = require('./_set');
return function processors(bdd, scenarioManager, request) {
set(bdd, scenarioManager, request);
};
});