Merge pull request #6 from Bargs/ingest/simulateUpdates

PR Review Updates
Jim Unger 2016-03-09 09:03:26 -06:00
commit 2c7a7baaa8
14 changed files with 315 additions and 278 deletions

View file

@@ -1,22 +0,0 @@
const _ = require('lodash');
export default [
{ // set
typeId: 'set',
title: 'Set',
targetField: '',
getDefinition: function (processor) {
return {
'set' : {
'tag': processor.processorId,
'field' : processor.targetField ? processor.targetField : '',
'value': processor.value ? processor.value : ''
}
};
},
getDescription: function (processor) {
const target = (processor.targetField) ? processor.targetField : '?';
return `[${target}]`;
}
}
];
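For reference, a sketch of what the removed set processor type's getDefinition produced for a sample processor object; the sample values are illustrative. The same mapping now lives in the standalone set converter added later in this commit.

// Illustrative only: output of the removed getDefinition for a sample processor.
// getDefinition({ processorId: 'processor1', targetField: 'foo', value: 'bar' })
// => { set: { tag: 'processor1', field: 'foo', value: 'bar' } }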

View file

@@ -0,0 +1,87 @@
import expect from 'expect.js';
import _ from 'lodash';
import ingestSimulateApiKibanaToEsConverter from '../../converters/ingest_simulate_api_kibana_to_es_converter';
describe('ingestSimulateApiKibanaToEsConverter', function () {
it('populates the docs._source section and converts known processors', function () {
function buildSamplePipeline(input) {
return {
processors: [ { processor_id: 'processor1', type_id: 'set', target_field: 'bar', value: 'foo' } ],
input: input
};
}
function buildExpected(input) {
return {
pipeline : {
processors: [{
set: {
field: 'bar',
tag: 'processor1',
value: 'foo'
}
}]
},
'docs' : [
{ '_source': input }
]
};
}
let expected;
let actual;
expected = buildExpected(undefined);
actual = ingestSimulateApiKibanaToEsConverter(buildSamplePipeline(undefined));
expect(actual).to.eql(expected);
expected = buildExpected('foo');
actual = ingestSimulateApiKibanaToEsConverter(buildSamplePipeline('foo'));
expect(actual).to.eql(expected);
expected = buildExpected({ foo: 'bar' });
actual = ingestSimulateApiKibanaToEsConverter(buildSamplePipeline({ foo: 'bar' }));
expect(actual).to.eql(expected);
});
it('handles multiple processors', function () {
const pipeline = {
processors: [
{ processor_id: 'processor1', type_id: 'set', target_field: 'bar', value: 'foo' },
{ processor_id: 'processor2', type_id: 'set', target_field: 'bar', value: 'foo' },
],
input: {}
};
const expected = {
'pipeline': {
'processors': [
{
set: {
field: 'bar',
tag: 'processor1',
value: 'foo'
}
},
{
set: {
field: 'bar',
tag: 'processor2',
value: 'foo'
}
}
]
},
'docs': [
{'_source': {}}
]
};
const actual = ingestSimulateApiKibanaToEsConverter(pipeline);
expect(actual).to.eql(expected);
});
});

View file

@@ -1,158 +0,0 @@
const expect = require('expect.js');
const _ = require('lodash');
import { buildRequest } from '../ingest_simulate';
describe('buildRequest', function () {
const processorTypes = [
{
typeId: 'simple1',
getDefinition: function (processor) {
return {
'modified_value': `modified_${processor.value}`
};
}
},
{
typeId: 'simple2',
getDefinition: function (processor) {
return {
'value1': processor.value,
'value2': `${processor.typeId}-${processor.value}`
};
}
}
];
it('should throw an error if no processorTypes argument is passed or the argument is not a plain object', function () {
expect(buildRequest).to.throwException(/requires a processorTypes object array argument/);
expect(buildRequest).withArgs('').to.throwException(/requires a processorTypes object array argument/);
expect(buildRequest).withArgs({}).to.throwException(/requires a processorTypes object array argument/);
expect(buildRequest).withArgs([]).to.throwException(/requires a processorTypes object array argument/);
});
it('should throw an error if no pipeline argument is passed or the argument is not a plain object', function () {
expect(buildRequest).withArgs([{}], []).to.throwException(/requires a pipeline object argument/);
});
it('should throw an error if pipeline contains no processors', function () {
expect(buildRequest).withArgs([{}], {}).to.throwException(/pipeline contains no processors/);
expect(buildRequest).withArgs([{}], { processors: 'foo' }).to.throwException(/pipeline contains no processors/);
expect(buildRequest).withArgs([{}], { processors: {} }).to.throwException(/pipeline contains no processors/);
expect(buildRequest).withArgs([{}], { processors: [] }).to.throwException(/pipeline contains no processors/);
});
it('populates the docs._source section', function () {
function buildSamplePipeline(input) {
return {
processors: [ { typeId: 'simple1', value: 'foo' } ],
input: input
};
}
function buildExpected(input) {
return {
'pipeline' : {
'processors': [ { modified_value: 'modified_foo' } ]
},
'docs' : [
{ '_source': input }
]
};
}
let expected;
let actual;
expected = buildExpected(undefined);
actual = buildRequest(processorTypes, buildSamplePipeline(undefined));
expect(actual).to.eql(expected);
expected = buildExpected('foo');
actual = buildRequest(processorTypes, buildSamplePipeline('foo'));
expect(actual).to.eql(expected);
expected = buildExpected({ foo: 'bar' });
actual = buildRequest(processorTypes, buildSamplePipeline({ foo: 'bar' }));
expect(actual).to.eql(expected);
});
describe('populates the pipeline.processors section with type.getDefinition()', function () {
it(' - single processor type', function () {
const pipeline = {
processors: [ { typeId: 'simple1', value: 'foo' } ],
input: {}
};
const expected = {
'pipeline' : {
'processors': [ { modified_value: 'modified_foo' } ]
},
'docs' : [
{ '_source': {} }
]
};
const actual = buildRequest(processorTypes, pipeline);
expect(actual).to.eql(expected);
});
it(' - multiple of same type of processor type', function () {
const pipeline = {
processors: [
{ typeId: 'simple1', value: 'foo' },
{ typeId: 'simple1', value: 'bar' },
{ typeId: 'simple1', value: 'baz' }
],
input: {}
};
const expected = {
'pipeline' : {
'processors': [
{ modified_value: 'modified_foo' },
{ modified_value: 'modified_bar' },
{ modified_value: 'modified_baz' }
]
},
'docs' : [
{ '_source': {} }
]
};
const actual = buildRequest(processorTypes, pipeline);
expect(actual).to.eql(expected);
});
it(' - multiple processor types', function () {
const pipeline = {
processors: [
{ typeId: 'simple1', value: 'foo' },
{ typeId: 'simple2', value: 'bar' },
{ typeId: 'simple1', value: 'baz' }
],
input: {}
};
const expected = {
'pipeline' : {
'processors': [
{ modified_value: 'modified_foo' },
{ value1: 'bar', value2: 'simple2-bar' },
{ modified_value: 'modified_baz' }
]
},
'docs' : [
{ '_source': {} }
]
};
const actual = buildRequest(processorTypes, pipeline);
expect(actual).to.eql(expected);
});
});
});

View file

@@ -1,29 +1,25 @@
const expect = require('expect.js');
const _ = require('lodash');
import { processResponse } from '../ingest_simulate';
import processESIngestSimulateResponse from '../process_es_ingest_simulate_response';
describe('processResponse', function () {
describe('processESIngestSimulateResponse', function () {
it('returns a result for each processor in the pipeline', function () {
const pipeline = {
processors: [ { processorId: 'processor1' }, { processorId: 'processor2' } ]
};
const processors = [ { processorId: 'processor1' }, { processorId: 'processor2' } ];
const response = {
docs: [ { processor_results: [] } ]
};
const results = processResponse(pipeline, null, response);
const results = processESIngestSimulateResponse(processors, response);
expect(results.length).to.be(2);
});
it('each processor that does not receive a result will contain default info', function () {
const pipeline = {
processors: [
{ processorId: 'processor1', outputObject: 'foo' },
{ processorId: 'processor2', outputObject: 'bar' },
{ processorId: 'processor3', outputObject: 'baz' }
]
};
const processors = [
{ processorId: 'processor1', outputObject: 'foo' },
{ processorId: 'processor2', outputObject: 'bar' },
{ processorId: 'processor3', outputObject: 'baz' }
];
const response = {
docs: [ { processor_results: [] } ]
};
@@ -33,19 +29,17 @@ describe('processResponse', function () {
{ processorId: 'processor2', output: 'bar', error: undefined },
{ processorId: 'processor3', output: 'baz', error: undefined }
];
const actual = processResponse(pipeline, null, response);
const actual = processESIngestSimulateResponse(processors, response);
expect(actual).to.eql(expected);
});
it('each processor that receives a result will contain response info', function () {
const pipeline = {
processors: [
{ processorId: 'processor1', outputObject: 'foo' },
{ processorId: 'processor2', outputObject: 'bar' },
{ processorId: 'processor3', outputObject: 'baz' }
]
};
const processors = [
{ processorId: 'processor1', outputObject: 'foo' },
{ processorId: 'processor2', outputObject: 'bar' },
{ processorId: 'processor3', outputObject: 'baz' }
];
const response = {
docs: [ { processor_results: [
{ tag: 'processor2', doc: { _source: 'new_bar' }, error: undefined },
@@ -58,7 +52,7 @@ describe('processResponse', function () {
{ processorId: 'processor2', output: 'new_bar', error: undefined },
{ processorId: 'processor3', output: 'new_baz', error: undefined }
];
const actual = processResponse(pipeline, null, response);
const actual = processESIngestSimulateResponse(processors, response);
expect(actual).to.eql(expected);
});
@@ -66,13 +60,11 @@ describe('processResponse', function () {
describe('processors that return an error object', function () {
it('will be the root_cause reason if one exists', function () {
const pipeline = {
processors: [
{ processorId: 'processor1', outputObject: 'foo' },
{ processorId: 'processor2', outputObject: 'bar' },
{ processorId: 'processor3', outputObject: 'baz' }
]
};
const processors = [
{ processorId: 'processor1', outputObject: 'foo' },
{ processorId: 'processor2', outputObject: 'bar' },
{ processorId: 'processor3', outputObject: 'baz' }
];
const response = {
docs: [ { processor_results: [
{ tag: 'processor2', doc: { _source: 'new_bar' }, error: undefined },
@@ -89,19 +81,17 @@ describe('processResponse', function () {
{ processorId: 'processor2', output: 'new_bar', error: undefined },
{ processorId: 'processor3', output: undefined, error: { isNested: false, message: 'something bad happened'} }
];
const actual = processResponse(pipeline, null, response);
const actual = processESIngestSimulateResponse(processors, response);
expect(actual).to.eql(expected);
});
it('will be the root_cause type if reason does not exist', function () {
const pipeline = {
processors: [
{ processorId: 'processor1', outputObject: 'foo' },
{ processorId: 'processor2', outputObject: 'bar' },
{ processorId: 'processor3', outputObject: 'baz' }
]
};
const processors = [
{ processorId: 'processor1', outputObject: 'foo' },
{ processorId: 'processor2', outputObject: 'bar' },
{ processorId: 'processor3', outputObject: 'baz' }
];
const response = {
docs: [ { processor_results: [
{ tag: 'processor2', doc: { _source: 'new_bar' }, error: undefined },
@@ -118,20 +108,18 @@ describe('processResponse', function () {
{ processorId: 'processor2', output: 'new_bar', error: undefined },
{ processorId: 'processor3', output: undefined, error: { isNested: false, message: 'something bad happened'} }
];
const actual = processResponse(pipeline, null, response);
const actual = processESIngestSimulateResponse(processors, response);
expect(actual).to.eql(expected);
});
it('any processor after errored processor will be set to a nested error state', function () {
const pipeline = {
processors: [
{ processorId: 'processor0', outputObject: 'oof' },
{ processorId: 'processor1', outputObject: 'foo' },
{ processorId: 'processor2', outputObject: 'bar' },
{ processorId: 'processor3', outputObject: 'baz' }
]
};
const processors = [
{ processorId: 'processor0', outputObject: 'oof' },
{ processorId: 'processor1', outputObject: 'foo' },
{ processorId: 'processor2', outputObject: 'bar' },
{ processorId: 'processor3', outputObject: 'baz' }
];
const response = {
docs: [
{
@@ -153,7 +141,7 @@ describe('processResponse', function () {
{ processorId: 'processor2', output: undefined, error: { isNested: true, message: 'Invalid Parent Processor' } },
{ processorId: 'processor3', output: undefined, error: { isNested: true, message: 'Invalid Parent Processor' } }
];
const actual = processResponse(pipeline, null, response);
const actual = processESIngestSimulateResponse(processors, response);
expect(actual).to.eql(expected);
});

View file

@@ -0,0 +1,9 @@
export function set(processorApiDocument) {
return {
set: {
tag: processorApiDocument.processor_id,
field: processorApiDocument.target_field,
value: processorApiDocument.value
}
};
}
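A minimal usage sketch for the new set converter, using sample values drawn from the unit tests in this commit; the surrounding calling code and import path are illustrative.

// Illustrative: a Kibana-style (snake_cased) set processor document
// becomes an Elasticsearch ingest processor definition.
import { set } from './ingest_processor_api_kibana_to_es_converters';

const esSetProcessor = set({
  processor_id: 'processor1',
  target_field: 'foo',
  value: 'bar'
});
// => { set: { tag: 'processor1', field: 'foo', value: 'bar' } }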

View file

@@ -0,0 +1,17 @@
import _ from 'lodash';
import * as ingestProcessorApiKibanaToEsConverters from './ingest_processor_api_kibana_to_es_converters';
export default function ingestSimulateApiKibanaToEsConverter(simulateApiDocument) {
return {
pipeline: {
processors: _.map(simulateApiDocument.processors, (processor) => {
return ingestProcessorApiKibanaToEsConverters[processor.type_id](processor);
})
},
docs: [
{
_source: simulateApiDocument.input
}
]
};
}
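A sketch of the converter end to end, using the same shapes as the unit tests above; the variable names are illustrative.

// Illustrative: a simulate document from the Kibana API...
const simulateApiDocument = {
  processors: [
    { processor_id: 'processor1', type_id: 'set', target_field: 'bar', value: 'foo' }
  ],
  input: { foo: 'bar' }
};

// ...becomes the body for _ingest/pipeline/_simulate:
ingestSimulateApiKibanaToEsConverter(simulateApiDocument);
// => {
//      pipeline: { processors: [ { set: { field: 'bar', tag: 'processor1', value: 'foo' } } ] },
//      docs: [ { _source: { foo: 'bar' } } ]
//    }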

View file

@@ -6,46 +6,8 @@ function translateError(esError) {
return _.get(rootCause, 'reason') || _.get(rootCause, 'type');
}
export function buildRequest(processorTypes, pipeline) {
if (processorTypes === undefined ||
!_.isArray(processorTypes) ||
processorTypes.length === 0) {
throw new Error('requires a processorTypes object array argument');
}
if (pipeline === undefined || !_.isPlainObject(pipeline)) {
throw new Error('requires a pipeline object argument');
}
if (pipeline.processors === undefined ||
!_.isArray(pipeline.processors) ||
pipeline.processors.length === 0) {
throw new Error('pipeline contains no processors');
}
const processors = pipeline.processors;
const body = {
'pipeline': {
'processors': []
},
'docs': [
{
_source: pipeline.input
}
]
};
processors.forEach((processor) => {
const processorType = _.find(processorTypes, { 'typeId': processor.typeId });
const definition = processorType.getDefinition(processor);
body.pipeline.processors.push(definition);
});
return body;
};
export function processResponse(pipeline, err, resp) {
const results = pipeline.processors.map((processor) => {
export default function processESIngestSimulateResponse(processors, resp) {
const results = processors.map((processor) => {
return {
processorId: processor.processorId,
output: processor.outputObject,
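Based on the updated unit tests earlier in this commit, a sketch of the new calling convention: the processors array and the raw verbose simulate response are passed directly, with no pipeline wrapper or error argument. The sample data is illustrative.

// Illustrative: processors plus the raw ES _simulate?verbose=true response in,
// one result object per processor out.
processESIngestSimulateResponse(
  [ { processorId: 'processor1', outputObject: 'foo' } ],
  { docs: [ { processor_results: [
    { tag: 'processor1', doc: { _source: 'new_foo' }, error: undefined }
  ] } ] }
);
// => [ { processorId: 'processor1', output: 'new_foo', error: undefined } ]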

View file

@@ -0,0 +1,11 @@
import Joi from 'joi';
const base = Joi.object({
processor_id: Joi.string().required()
}).unknown();
export const set = base.keys({
type_id: Joi.string().only('set').required(),
target_field: Joi.string().required(),
value: Joi.any().required()
});
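A hedged sketch of what the set schema accepts and rejects, assuming the Joi.validate(value, schema) API returning { error, value } from the Joi versions of that era.

// Illustrative: processor_id, type_id: 'set', target_field and value are required;
// extra keys are tolerated because of .unknown().
Joi.validate(
  { processor_id: 'processor1', type_id: 'set', target_field: 'foo', value: 'bar' },
  set
); // error === null

Joi.validate(
  { processor_id: 'processor1', type_id: 'set', value: 'bar' }, // missing target_field
  set
); // error is a validation error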

View file

@@ -0,0 +1,8 @@
import Joi from 'joi';
import * as ingestProcessorSchemas from './resources/ingest_processor_schemas';
import _ from 'lodash';
export default Joi.object({
processors: Joi.array().items(_.values(ingestProcessorSchemas)).required().min(1),
input: Joi.object().required()
});
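For context, a payload that satisfies this schema; Hapi applies it through the route's validate.payload config, so the handler only sees documents of this shape.

// Illustrative: at least one known processor and an input object are required.
const validPayload = {
  processors: [
    { processor_id: 'processor1', type_id: 'set', target_field: 'foo', value: 'bar' }
  ],
  input: {}
};
// { processors: [], input: {} } or { processors: [{}], input: {} } would be rejected,
// which is what the functional tests below assert via 400 responses.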

View file

@@ -1,25 +1,32 @@
const _ = require('lodash');
import { buildRequest, processResponse } from '../../../lib/ingest_simulate';
import processorTypes from '../../../../common/ingest_processor_types';
import _ from 'lodash';
import processESIngestSimulateResponse from '../../../lib/process_es_ingest_simulate_response';
import simulateRequestSchema from '../../../lib/schemas/simulate_request_schema';
import ingestSimulateApiKibanaToEsConverter from '../../../lib/converters/ingest_simulate_api_kibana_to_es_converter';
import { keysToCamelCaseShallow, keysToSnakeCaseShallow } from '../../../lib/case_conversion';
module.exports = function registerSimulate(server) {
server.route({
path: '/api/kibana/ingest/simulate',
method: 'POST',
config: {
validate: {
payload: simulateRequestSchema
}
},
handler: function (request, reply) {
const client = server.plugins.elasticsearch.client;
const pipeline = request.payload;
const body = buildRequest(processorTypes, pipeline);
const boundCallWithRequest = _.partial(server.plugins.elasticsearch.callWithRequest, request);
const simulateApiDocument = request.payload;
const body = ingestSimulateApiKibanaToEsConverter(simulateApiDocument);
client.transport.request({
return boundCallWithRequest('transport.request', {
path: '_ingest/pipeline/_simulate',
query: { verbose: true },
method: 'POST',
body: body
},
function (err, resp) {
reply(processResponse(pipeline, err, resp));
});
})
.then(_.partial(processESIngestSimulateResponse, _.map(simulateApiDocument.processors, keysToCamelCaseShallow)))
.then((processors) => _.map(processors, keysToSnakeCaseShallow))
.then(reply);
}
});
};
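A hedged sketch of the round trip this handler performs, inferred from the converter, the response processor, and the functional tests in this commit; the concrete documents are illustrative.

// Illustrative request to the Kibana endpoint:
// POST /api/kibana/ingest/simulate
// { "processors": [ { "processor_id": "processor1", "type_id": "set",
//                     "target_field": "foo", "value": "bar" } ],
//   "input": {} }
//
// Body forwarded to Elasticsearch via callWithRequest('transport.request', ...):
// POST /_ingest/pipeline/_simulate?verbose=true
// { "pipeline": { "processors": [ { "set": { "tag": "processor1",
//                                            "field": "foo", "value": "bar" } } ] },
//   "docs": [ { "_source": {} } ] }
//
// Reply to the caller: one snake_cased result per processor, e.g.
// [ { "processor_id": "processor1", "output": { "foo": "bar" } } ]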

View file

@@ -0,0 +1,49 @@
define(function (require) {
var Promise = require('bluebird');
var createTestData = require('intern/dojo/node!../../../unit/api/ingest/data');
var _ = require('intern/dojo/node!lodash');
var expect = require('intern/dojo/node!expect.js');
const testPipeline = {
processors: [{
processor_id: 'processor1',
type_id: 'set',
target_field: 'foo',
value: 'bar'
}],
input: {}
};
return function (bdd, scenarioManager, request) {
bdd.describe('simulate', function simulatePipeline() {
bdd.it('should return 400 for an invalid payload', function invalidPayload() {
return Promise.all([
request.post('/kibana/ingest/simulate').expect(400),
request.post('/kibana/ingest/simulate')
.send({})
.expect(400),
// requires at least one processor
request.post('/kibana/ingest/simulate')
.send({input: {}, processors: []})
.expect(400),
// All processors must have a processor_id property and a type_id property
request.post('/kibana/ingest/simulate')
.send({input: {}, processors: [{}]})
.expect(400)
]);
});
bdd.it('should return 200 for a successful run', function () {
return request.post('/kibana/ingest/simulate')
.send(testPipeline)
.expect(200);
});
});
};
});

View file

@@ -8,6 +8,8 @@ define(function (require) {
var expect = require('intern/dojo/node!expect.js');
var post = require('./_post');
var del = require('./_del');
var simulate = require('./_simulate');
var processors = require('./processors/index');
bdd.describe('ingest API', function () {
var scenarioManager = new ScenarioManager(url.format(serverConfig.servers.elasticsearch));
@@ -23,5 +25,7 @@ define(function (require) {
post(bdd, scenarioManager, request);
del(bdd, scenarioManager, request);
simulate(bdd, scenarioManager, request);
processors(bdd, scenarioManager, request);
});
});

View file

@@ -0,0 +1,66 @@
define(function (require) {
var Promise = require('bluebird');
var _ = require('intern/dojo/node!lodash');
var expect = require('intern/dojo/node!expect.js');
const testPipeline = {
processors: [{
processor_id: 'processor1',
type_id: 'set',
target_field: 'foo',
value: 'bar'
}],
input: {}
};
return function (bdd, scenarioManager, request) {
bdd.describe('simulate - set processor', function simulatePipeline() {
bdd.it('should return 400 for an invalid payload', function invalidPayload() {
return Promise.all([
// Set processor requires a target_field property
request.post('/kibana/ingest/simulate')
.send({
input: {},
processors: [{
processor_id: 'processor1',
type_id: 'set',
value: 'bar'
}]
})
.expect(400)
]);
});
bdd.it('should return 200 for a valid simulate request', function validSetSimulate() {
return request.post('/kibana/ingest/simulate')
.send(testPipeline)
.expect(200);
});
bdd.it('should return a simulated output with the correct result for the given processor', function () {
return request.post('/kibana/ingest/simulate')
.send(testPipeline)
.expect(200)
.then(function (response) {
expect(response.body[0].output.foo).to.be.equal('bar');
});
});
bdd.it('should enforce snake case', function setSimulateSnakeCase() {
return request.post('/kibana/ingest/simulate')
.send({
processors: [{
processorId: 'processor1',
typeId: 'set',
targetField: 'foo',
value: 'bar'
}],
input: {}
})
.expect(400);
});
});
};
});

View file

@@ -0,0 +1,9 @@
define(function (require) {
var set = require('./_set');
return function processors(bdd, scenarioManager, request) {
set(bdd, scenarioManager, request);
};
});