[ingest] Creates the simulate api route

This commit is contained in:
Jim Unger 2016-02-04 16:22:23 -06:00
parent 57008e6410
commit d2098090d6
8 changed files with 557 additions and 0 deletions

View file

@ -0,0 +1,22 @@
const _ = require('lodash');
export default [
{ // set
typeId: 'set',
title: 'Set',
targetField: '',
getDefinition: function (processor) {
return {
'set' : {
'tag': processor.processorId,
'field' : processor.targetField ? processor.targetField : '',
'value': processor.value ? processor.value : ''
}
};
},
getDescription: function (processor) {
const target = (processor.targetField) ? processor.targetField : '?';
return `[${target}]`;
}
}
];

View file

@ -0,0 +1,87 @@
var expect = require('expect.js');
var sinon = require('sinon');
var keysDeep = require('../keys_deep');
describe('keys deep', function () {
it('should list first level properties', function () {
let object = {
property1: 'value1',
property2: 'value2'
};
let expected = [
'property1',
'property2'
];
const keys = keysDeep(object);
expect(keys).to.eql(expected);
});
it('should list nested properties', function () {
let object = {
property1: 'value1',
property2: 'value2',
property3: {
subProperty1: 'value1.1'
}
};
let expected = [
'property1',
'property2',
'property3.subProperty1',
'property3'
];
const keys = keysDeep(object);
expect(keys).to.eql(expected);
});
it('should recursivly list nested properties', function () {
let object = {
property1: 'value1',
property2: 'value2',
property3: {
subProperty1: 'value1.1',
subProperty2: {
prop1: 'value1.2.1',
prop2: 'value2.2.2'
},
subProperty3: 'value1.3'
}
};
let expected = [
'property1',
'property2',
'property3.subProperty1',
'property3.subProperty2.prop1',
'property3.subProperty2.prop2',
'property3.subProperty2',
'property3.subProperty3',
'property3'
];
const keys = keysDeep(object);
expect(keys).to.eql(expected);
});
it('should list array properties, but not contents', function () {
let object = {
property1: 'value1',
property2: [ 'item1', 'item2' ]
};
let expected = [
'property1',
'property2'
];
const keys = keysDeep(object);
expect(keys).to.eql(expected);
});
});

View file

@ -0,0 +1,21 @@
const _ = require('lodash');
export default function keysDeep(object, base) {
let result = [];
let delimitedBase = base ? base + '.' : '';
_.forEach(object, (value, key) => {
var fullKey = delimitedBase + key;
if (_.isPlainObject(value)) {
result = result.concat(keysDeep(value, fullKey));
} else {
result.push(fullKey);
}
});
if (base) {
result.push(base);
}
return result;
};

View file

@ -0,0 +1,158 @@
const expect = require('expect.js');
const _ = require('lodash');
import { buildRequest } from '../ingest_simulate';
describe('buildRequest', function () {
const processorTypes = [
{
typeId: 'simple1',
getDefinition: function (processor) {
return {
'modified_value': `modified_${processor.value}`
};
}
},
{
typeId: 'simple2',
getDefinition: function (processor) {
return {
'value1': processor.value,
'value2': `${processor.typeId}-${processor.value}`
};
}
}
];
it('should throw an error if no processorTypes argument is passed or the argument is not a plain object', function () {
expect(buildRequest).to.throwException(/requires a processorTypes object array argument/);
expect(buildRequest).withArgs('').to.throwException(/requires a processorTypes object array argument/);
expect(buildRequest).withArgs({}).to.throwException(/requires a processorTypes object array argument/);
expect(buildRequest).withArgs([]).to.throwException(/requires a processorTypes object array argument/);
});
it('should throw an error if no pipeline argument is passed or the argument is not a plain object', function () {
expect(buildRequest).withArgs([{}], []).to.throwException(/requires a pipeline object argument/);
});
it('should throw an error if pipeline contains no processors', function () {
expect(buildRequest).withArgs([{}], {}).to.throwException(/pipeline contains no processors/);
expect(buildRequest).withArgs([{}], { processors: 'foo' }).to.throwException(/pipeline contains no processors/);
expect(buildRequest).withArgs([{}], { processors: {} }).to.throwException(/pipeline contains no processors/);
expect(buildRequest).withArgs([{}], { processors: [] }).to.throwException(/pipeline contains no processors/);
});
it('populates the docs._source section', function () {
function buildSamplePipeline(input) {
return {
processors: [ { typeId: 'simple1', value: 'foo' } ],
input: input
};
}
function buildExpected(input) {
return {
'pipeline' : {
'processors': [ { modified_value: 'modified_foo' } ]
},
'docs' : [
{ '_source': input }
]
};
}
let expected;
let actual;
expected = buildExpected(undefined);
actual = buildRequest(processorTypes, buildSamplePipeline(undefined));
expect(actual).to.eql(expected);
expected = buildExpected('foo');
actual = buildRequest(processorTypes, buildSamplePipeline('foo'));
expect(actual).to.eql(expected);
expected = buildExpected({ foo: 'bar' });
actual = buildRequest(processorTypes, buildSamplePipeline({ foo: 'bar' }));
expect(actual).to.eql(expected);
});
describe('populates the pipeline.processors section with type.getDefinition()', function () {
it(' - single processor type', function () {
const pipeline = {
processors: [ { typeId: 'simple1', value: 'foo' } ],
input: {}
};
const expected = {
'pipeline' : {
'processors': [ { modified_value: 'modified_foo' } ]
},
'docs' : [
{ '_source': {} }
]
};
const actual = buildRequest(processorTypes, pipeline);
expect(actual).to.eql(expected);
});
it(' - multiple of same type of processor type', function () {
const pipeline = {
processors: [
{ typeId: 'simple1', value: 'foo' },
{ typeId: 'simple1', value: 'bar' },
{ typeId: 'simple1', value: 'baz' }
],
input: {}
};
const expected = {
'pipeline' : {
'processors': [
{ modified_value: 'modified_foo' },
{ modified_value: 'modified_bar' },
{ modified_value: 'modified_baz' }
]
},
'docs' : [
{ '_source': {} }
]
};
const actual = buildRequest(processorTypes, pipeline);
expect(actual).to.eql(expected);
});
it(' - multiple processor types', function () {
const pipeline = {
processors: [
{ typeId: 'simple1', value: 'foo' },
{ typeId: 'simple2', value: 'bar' },
{ typeId: 'simple1', value: 'baz' }
],
input: {}
};
const expected = {
'pipeline' : {
'processors': [
{ modified_value: 'modified_foo' },
{ value1: 'bar', value2: 'simple2-bar' },
{ modified_value: 'modified_baz' }
]
},
'docs' : [
{ '_source': {} }
]
};
const actual = buildRequest(processorTypes, pipeline);
expect(actual).to.eql(expected);
});
});
});

View file

@ -0,0 +1,164 @@
const expect = require('expect.js');
const _ = require('lodash');
import { processResponse } from '../ingest_simulate';
describe('processResponse', function () {
it('returns a result for each processor in the pipeline', function () {
const pipeline = {
processors: [ { processorId: 'processor1' }, { processorId: 'processor2' } ]
};
const response = {
docs: [ { processor_results: [] } ]
};
const results = processResponse(pipeline, null, response);
expect(results.length).to.be(2);
});
it('each processor that does not receive a result will contain default info', function () {
const pipeline = {
processors: [
{ processorId: 'processor1', outputObject: 'foo' },
{ processorId: 'processor2', outputObject: 'bar' },
{ processorId: 'processor3', outputObject: 'baz' }
]
};
const response = {
docs: [ { processor_results: [] } ]
};
const expected = [
{ processorId: 'processor1', output: 'foo', error: undefined },
{ processorId: 'processor2', output: 'bar', error: undefined },
{ processorId: 'processor3', output: 'baz', error: undefined }
];
const actual = processResponse(pipeline, null, response);
expect(actual).to.eql(expected);
});
it('each processor that receives a result will contain response info', function () {
const pipeline = {
processors: [
{ processorId: 'processor1', outputObject: 'foo' },
{ processorId: 'processor2', outputObject: 'bar' },
{ processorId: 'processor3', outputObject: 'baz' }
]
};
const response = {
docs: [ { processor_results: [
{ tag: 'processor2', doc: { _source: 'new_bar' }, error: undefined },
{ tag: 'processor3', doc: { _source: 'new_baz' }, error: undefined }
] } ]
};
const expected = [
{ processorId: 'processor1', output: 'foo', error: undefined },
{ processorId: 'processor2', output: 'new_bar', error: undefined },
{ processorId: 'processor3', output: 'new_baz', error: undefined }
];
const actual = processResponse(pipeline, null, response);
expect(actual).to.eql(expected);
});
describe('processors that return an error object', function () {
it('will be the root_cause reason if one exists', function () {
const pipeline = {
processors: [
{ processorId: 'processor1', outputObject: 'foo' },
{ processorId: 'processor2', outputObject: 'bar' },
{ processorId: 'processor3', outputObject: 'baz' }
]
};
const response = {
docs: [ { processor_results: [
{ tag: 'processor2', doc: { _source: 'new_bar' }, error: undefined },
{
tag: 'processor3',
doc: 'dummy',
error: { root_cause: [ { reason: 'something bad happened', type: 'general exception' } ] }
}
] } ]
};
const expected = [
{ processorId: 'processor1', output: 'foo', error: undefined },
{ processorId: 'processor2', output: 'new_bar', error: undefined },
{ processorId: 'processor3', output: undefined, error: { isNested: false, message: 'something bad happened'} }
];
const actual = processResponse(pipeline, null, response);
expect(actual).to.eql(expected);
});
it('will be the root_cause type if reason does not exists', function () {
const pipeline = {
processors: [
{ processorId: 'processor1', outputObject: 'foo' },
{ processorId: 'processor2', outputObject: 'bar' },
{ processorId: 'processor3', outputObject: 'baz' }
]
};
const response = {
docs: [ { processor_results: [
{ tag: 'processor2', doc: { _source: 'new_bar' }, error: undefined },
{
tag: 'processor3',
doc: 'dummy',
error: { root_cause: [ { type: 'something bad happened' } ] }
}
] } ]
};
const expected = [
{ processorId: 'processor1', output: 'foo', error: undefined },
{ processorId: 'processor2', output: 'new_bar', error: undefined },
{ processorId: 'processor3', output: undefined, error: { isNested: false, message: 'something bad happened'} }
];
const actual = processResponse(pipeline, null, response);
expect(actual).to.eql(expected);
});
it('any processor after errored processor will be set to a nested error state', function () {
const pipeline = {
processors: [
{ processorId: 'processor0', outputObject: 'oof' },
{ processorId: 'processor1', outputObject: 'foo' },
{ processorId: 'processor2', outputObject: 'bar' },
{ processorId: 'processor3', outputObject: 'baz' }
]
};
const response = {
docs: [
{
processor_results: [
{ tag: 'processor0', doc: { _source: 'new_oof' }, error: undefined },
{
tag: 'processor1',
doc: 'dummy',
error: { root_cause: [ { reason: 'something bad happened' } ] }
}
]
}
]
};
const expected = [
{ processorId: 'processor0', output: 'new_oof', error: undefined },
{ processorId: 'processor1', output: undefined, error: { isNested: false, message: 'something bad happened' } },
{ processorId: 'processor2', output: undefined, error: { isNested: true, message: 'Invalid Parent Processor' } },
{ processorId: 'processor3', output: undefined, error: { isNested: true, message: 'Invalid Parent Processor' } }
];
const actual = processResponse(pipeline, null, response);
expect(actual).to.eql(expected);
});
});
});

View file

@ -0,0 +1,79 @@
const _ = require('lodash');
function translateError(esError) {
const rootCause = _.get(esError, 'root_cause[0]');
return _.get(rootCause, 'reason') || _.get(rootCause, 'type');
}
export function buildRequest(processorTypes, pipeline) {
if (processorTypes === undefined ||
!_.isArray(processorTypes) ||
processorTypes.length === 0) {
throw new Error('requires a processorTypes object array argument');
}
if (pipeline === undefined || !_.isPlainObject(pipeline)) {
throw new Error('requires a pipeline object argument');
}
if (pipeline.processors === undefined ||
!_.isArray(pipeline.processors) ||
pipeline.processors.length === 0) {
throw new Error('pipeline contains no processors');
}
const processors = pipeline.processors;
const body = {
'pipeline': {
'processors': []
},
'docs': [
{
_source: pipeline.input
}
]
};
processors.forEach((processor) => {
const processorType = _.find(processorTypes, { 'typeId': processor.typeId });
const definition = processorType.getDefinition(processor);
body.pipeline.processors.push(definition);
});
return body;
};
export function processResponse(pipeline, err, resp) {
const results = pipeline.processors.map((processor) => {
return {
processorId: processor.processorId,
output: processor.outputObject,
error: undefined
};
});
const processorResults = _.get(resp, 'docs[0].processor_results');
processorResults.forEach((processorResult) => {
const processorId = _.get(processorResult, 'tag');
const output = _.get(processorResult, 'doc._source');
const error = _.get(processorResult, 'error');
const errorMessage = translateError(error);
const badResult = _.find(results, { 'processorId': processorId });
badResult.output = errorMessage ? undefined : output;
badResult.error = errorMessage ? { isNested: false, message: errorMessage } : undefined;
});
const errorIndex = _.findIndex(results, (result) => { return result.error !== undefined; });
if (errorIndex !== -1) {
for (let i = errorIndex + 1; i < results.length; i++) {
const badResult = results[i];
badResult.output = undefined;
badResult.error = { isNested: true, message: 'Invalid Parent Processor' };
}
}
return results;
};

View file

@ -1,4 +1,5 @@
export default function (server) {
require('./register_post')(server);
require('./register_delete')(server);
require('./register_simulate')(server);
}

View file

@ -0,0 +1,25 @@
const _ = require('lodash');
import { buildRequest, processResponse } from '../../../lib/ingest_simulate';
import processorTypes from '../../../../common/ingest_processor_types';
module.exports = function registerSimulate(server) {
server.route({
path: '/api/kibana/ingest/simulate',
method: 'POST',
handler: function (request, reply) {
const client = server.plugins.elasticsearch.client;
const pipeline = request.payload;
const body = buildRequest(processorTypes, pipeline);
client.transport.request({
path: '_ingest/pipeline/_simulate',
query: { verbose: true },
method: 'POST',
body: body
},
function (err, resp) {
reply(processResponse(pipeline, err, resp));
});
}
});
};