mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 09:48:58 -04:00
Merge pull request #6675 from BigFunger/ingest-error-handling
[add data] Adds gsub processor and fixes processor error flow
This commit is contained in:
commit
889c9c1a73
25 changed files with 469 additions and 187 deletions
|
@ -12,7 +12,8 @@ app.directive('outputPreview', function () {
|
|||
template: outputPreviewTemplate,
|
||||
scope: {
|
||||
oldObject: '=',
|
||||
newObject: '='
|
||||
newObject: '=',
|
||||
error: '='
|
||||
},
|
||||
link: function ($scope, $el) {
|
||||
const div = $el.find('.visual')[0];
|
||||
|
@ -27,10 +28,10 @@ app.directive('outputPreview', function () {
|
|||
});
|
||||
|
||||
$scope.updateUi = function () {
|
||||
const left = $scope.oldObject;
|
||||
const right = $scope.newObject;
|
||||
let left = $scope.oldObject;
|
||||
let right = $scope.newObject;
|
||||
let delta = $scope.diffpatch.diff(left, right);
|
||||
if (!delta) delta = {};
|
||||
if (!delta || $scope.error) delta = {};
|
||||
|
||||
div.innerHTML = htmlFormat(delta, left);
|
||||
};
|
||||
|
|
|
@ -50,8 +50,6 @@ app.directive('pipelineSetup', function () {
|
|||
|
||||
//initiates the simulate call if the pipeline is dirty
|
||||
const simulatePipeline = debounce((event, message) => {
|
||||
if (!pipeline.dirty) return;
|
||||
|
||||
if (pipeline.processors.length === 0) {
|
||||
pipeline.updateOutput();
|
||||
return;
|
||||
|
|
|
@ -1,2 +1,3 @@
|
|||
import './processor_ui_container';
|
||||
import './processor_ui_gsub';
|
||||
import './processor_ui_set';
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
import uiModules from 'ui/modules';
|
||||
import _ from 'lodash';
|
||||
import keysDeep from '../lib/keys_deep';
|
||||
import template from '../views/processor_ui_gsub.html';
|
||||
|
||||
const app = uiModules.get('kibana');
|
||||
|
||||
//scope.processor, scope.pipeline are attached by the process_container.
|
||||
app.directive('processorUiGsub', function () {
|
||||
return {
|
||||
restrict: 'E',
|
||||
template: template,
|
||||
controller : function ($scope) {
|
||||
const processor = $scope.processor;
|
||||
const pipeline = $scope.pipeline;
|
||||
|
||||
function consumeNewInputObject() {
|
||||
$scope.fields = keysDeep(processor.inputObject);
|
||||
refreshFieldData();
|
||||
}
|
||||
|
||||
function refreshFieldData() {
|
||||
$scope.fieldData = _.get(processor.inputObject, processor.sourceField);
|
||||
}
|
||||
|
||||
function processorUiChanged() {
|
||||
pipeline.setDirty();
|
||||
}
|
||||
|
||||
$scope.$watch('processor.inputObject', consumeNewInputObject);
|
||||
|
||||
$scope.$watch('processor.sourceField', () => {
|
||||
refreshFieldData();
|
||||
processorUiChanged();
|
||||
});
|
||||
|
||||
$scope.$watch('processor.pattern', processorUiChanged);
|
||||
$scope.$watch('processor.replacement', processorUiChanged);
|
||||
}
|
||||
};
|
||||
});
|
|
@ -1,5 +1,5 @@
|
|||
import uiModules from 'ui/modules';
|
||||
import processorUiSetTemplate from '../views/processor_ui_set.html';
|
||||
import template from '../views/processor_ui_set.html';
|
||||
|
||||
const app = uiModules.get('kibana');
|
||||
|
||||
|
@ -7,13 +7,13 @@ const app = uiModules.get('kibana');
|
|||
app.directive('processorUiSet', function () {
|
||||
return {
|
||||
restrict: 'E',
|
||||
template: processorUiSetTemplate,
|
||||
template: template,
|
||||
controller : function ($scope) {
|
||||
const processor = $scope.processor;
|
||||
const pipeline = $scope.pipeline;
|
||||
|
||||
function processorUiChanged() {
|
||||
pipeline.dirty = true;
|
||||
pipeline.setDirty();
|
||||
}
|
||||
|
||||
$scope.$watch('processor.targetField', processorUiChanged);
|
||||
|
|
|
@ -10,7 +10,8 @@ app.directive('sourceData', function () {
|
|||
restrict: 'E',
|
||||
scope: {
|
||||
samples: '=',
|
||||
sample: '='
|
||||
sample: '=',
|
||||
disabled: '='
|
||||
},
|
||||
template: sourceDataTemplate,
|
||||
controller: function ($scope) {
|
||||
|
|
|
@ -34,7 +34,10 @@ describe('processor pipeline', function () {
|
|||
pipeline.processors[0].model = { bar: 'baz' };
|
||||
|
||||
const actual = pipeline.model;
|
||||
const expected = { input: pipeline.input, processors: [ pipeline.processors[0].model ]};
|
||||
const expected = {
|
||||
input: pipeline.input,
|
||||
processors: [ pipeline.processors[0].model ]
|
||||
};
|
||||
|
||||
expect(actual).to.eql(expected);
|
||||
});
|
||||
|
|
|
@ -1,20 +1,76 @@
|
|||
import _ from 'lodash';
|
||||
|
||||
function updateProcessorOutputs(pipeline, simulateResults) {
|
||||
simulateResults.forEach((result) => {
|
||||
const processor = pipeline.getProcessorById(result.processorId);
|
||||
|
||||
processor.outputObject = _.get(result, 'output');
|
||||
processor.error = _.get(result, 'error');
|
||||
});
|
||||
}
|
||||
|
||||
//Updates the error state of the pipeline and its processors
|
||||
//If a pipeline compile error is returned, lock all processors but the error
|
||||
//If a pipeline data error is returned, lock all processors after the error
|
||||
function updateErrorState(pipeline) {
|
||||
pipeline.hasCompileError = _.some(pipeline.processors, (processor) => {
|
||||
return _.get(processor, 'error.compile');
|
||||
});
|
||||
_.forEach(pipeline.processors, processor => {
|
||||
processor.locked = false;
|
||||
});
|
||||
|
||||
const errorIndex = _.findIndex(pipeline.processors, 'error');
|
||||
if (errorIndex === -1) return;
|
||||
|
||||
_.forEach(pipeline.processors, (processor, index) => {
|
||||
if (pipeline.hasCompileError && index !== errorIndex) {
|
||||
processor.locked = true;
|
||||
}
|
||||
if (!pipeline.hasCompileError && index > errorIndex) {
|
||||
processor.locked = true;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
function updateProcessorInputs(pipeline) {
|
||||
pipeline.processors.forEach((processor) => {
|
||||
//we don't want to change the inputObject if the parent processor
|
||||
//is in error because that can cause us to lose state.
|
||||
if (!_.get(processor, 'parent.error')) {
|
||||
//the parent property of the first processor is set to the pipeline.input.
|
||||
//In all other cases it is set to processor[index-1]
|
||||
if (!processor.parent.processorId) {
|
||||
processor.inputObject = _.cloneDeep(processor.parent);
|
||||
} else {
|
||||
processor.inputObject = _.cloneDeep(processor.parent.outputObject);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
export default class Pipeline {
|
||||
|
||||
constructor() {
|
||||
this.processors = [];
|
||||
this.counter = 0;
|
||||
this.processorCounter = 0;
|
||||
this.input = {};
|
||||
this.output = undefined;
|
||||
this.dirty = false;
|
||||
this.hasCompileError = false;
|
||||
}
|
||||
|
||||
get model() {
|
||||
return {
|
||||
const pipeline = {
|
||||
input: this.input,
|
||||
processors: _.map(this.processors, processor => processor.model)
|
||||
};
|
||||
return pipeline;
|
||||
}
|
||||
|
||||
setDirty() {
|
||||
this.dirty = true;
|
||||
}
|
||||
|
||||
load(pipeline) {
|
||||
|
@ -64,8 +120,8 @@ export default class Pipeline {
|
|||
add(ProcessorType) {
|
||||
const processors = this.processors;
|
||||
|
||||
this.counter += 1;
|
||||
const processorId = `processor_${this.counter}`;
|
||||
this.processorCounter += 1;
|
||||
const processorId = `processor_${this.processorCounter}`;
|
||||
const newProcessor = new ProcessorType(processorId);
|
||||
processors.push(newProcessor);
|
||||
|
||||
|
@ -88,16 +144,6 @@ export default class Pipeline {
|
|||
this.dirty = true;
|
||||
}
|
||||
|
||||
updateOutput() {
|
||||
const processors = this.processors;
|
||||
|
||||
this.output = undefined;
|
||||
if (processors.length > 0) {
|
||||
this.output = processors[processors.length - 1].outputObject;
|
||||
}
|
||||
this.dirty = false;
|
||||
}
|
||||
|
||||
getProcessorById(processorId) {
|
||||
const result = _.find(this.processors, { processorId });
|
||||
|
||||
|
@ -108,34 +154,22 @@ export default class Pipeline {
|
|||
return result;
|
||||
}
|
||||
|
||||
updateOutput() {
|
||||
const processors = this.processors;
|
||||
|
||||
this.output = undefined;
|
||||
if (processors.length > 0) {
|
||||
this.output = processors[processors.length - 1].outputObject;
|
||||
}
|
||||
this.dirty = false;
|
||||
}
|
||||
|
||||
// Updates the state of the pipeline and processors with the results
|
||||
// from an ingest simulate call.
|
||||
applySimulateResults(results) {
|
||||
//update the outputObject of each processor
|
||||
results.forEach((result) => {
|
||||
const processor = this.getProcessorById(result.processorId);
|
||||
|
||||
processor.outputObject = _.get(result, 'output');
|
||||
processor.error = _.get(result, 'error');
|
||||
});
|
||||
|
||||
//update the inputObject of each processor
|
||||
results.forEach((result) => {
|
||||
const processor = this.getProcessorById(result.processorId);
|
||||
|
||||
//we don't want to change the inputObject if the parent processor
|
||||
//is in error because that can cause us to lose state.
|
||||
if (!_.get(processor, 'error.isNested')) {
|
||||
//the parent property of the first processor is set to the pipeline.input.
|
||||
//In all other cases it is set to processor[index-1]
|
||||
if (!processor.parent.processorId) {
|
||||
processor.inputObject = _.cloneDeep(processor.parent);
|
||||
} else {
|
||||
processor.inputObject = _.cloneDeep(processor.parent.outputObject);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
applySimulateResults(simulateResults) {
|
||||
updateProcessorOutputs(this, simulateResults);
|
||||
updateErrorState(this);
|
||||
updateProcessorInputs(this);
|
||||
this.updateOutput();
|
||||
}
|
||||
|
||||
|
|
|
@ -22,6 +22,30 @@ class Processor {
|
|||
}
|
||||
}
|
||||
|
||||
export class Gsub extends Processor {
|
||||
constructor(processorId) {
|
||||
super(processorId, 'gsub', 'Gsub');
|
||||
this.sourceField = '';
|
||||
this.pattern = '';
|
||||
this.replacement = '';
|
||||
}
|
||||
|
||||
get description() {
|
||||
const source = this.sourceField || '?';
|
||||
return `[${source}] - /${this.pattern}/ -> '${this.replacement}'`;
|
||||
}
|
||||
|
||||
get model() {
|
||||
return {
|
||||
processorId: this.processorId,
|
||||
typeId: this.typeId,
|
||||
sourceField: this.sourceField || '',
|
||||
pattern: this.pattern || '',
|
||||
replacement: this.replacement || ''
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
export class Set extends Processor {
|
||||
constructor(processorId) {
|
||||
super(processorId, 'set', 'Set');
|
||||
|
@ -38,8 +62,8 @@ export class Set extends Processor {
|
|||
return {
|
||||
processorId: this.processorId,
|
||||
typeId: this.typeId,
|
||||
targetField: this.targetField,
|
||||
value: this.value
|
||||
targetField: this.targetField || '',
|
||||
value: this.value || ''
|
||||
};
|
||||
}
|
||||
};
|
||||
|
|
|
@ -26,7 +26,7 @@ processor-ui-container {
|
|||
background-color: @settings-pipeline-setup-processor-container-overlay-bg;
|
||||
}
|
||||
|
||||
&.dirty {
|
||||
&.locked {
|
||||
.overlay {
|
||||
display: block;
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
<source-data sample="sample" samples="samples"></source-data>
|
||||
<source-data sample="sample" samples="samples" disabled="pipeline.hasCompileError"></source-data>
|
||||
|
||||
<pipeline-output pipeline="pipeline"></pipeline-output>
|
||||
|
||||
|
@ -28,6 +28,6 @@
|
|||
</div>
|
||||
<button
|
||||
ng-click="pipeline.add(processorType.Type)"
|
||||
ng-disabled="!processorType">
|
||||
ng-disabled="!processorType || pipeline.hasCompileError">
|
||||
Add Processor
|
||||
</button>
|
||||
|
|
|
@ -5,7 +5,7 @@
|
|||
</processor-ui-container-header>
|
||||
<div
|
||||
class="processor-ui-container-body"
|
||||
ng-class="{dirty: processor.error.isNested}">
|
||||
ng-class="{locked: processor.locked}">
|
||||
<div
|
||||
class="processor-ui-container-body-content"
|
||||
ng-hide="processor.collapsed">
|
||||
|
@ -15,7 +15,11 @@
|
|||
{{processor.error.message}}
|
||||
</div>
|
||||
<div class="processor-ui-content"></div>
|
||||
<output-preview new-object="processor.outputObject" old-object="processor.inputObject"></output-preview>
|
||||
<output-preview
|
||||
new-object="processor.outputObject"
|
||||
old-object="processor.inputObject"
|
||||
error="processor.error">
|
||||
</output-preview>
|
||||
</div>
|
||||
<div class="overlay"></div>
|
||||
</div>
|
||||
|
|
|
@ -31,7 +31,8 @@
|
|||
tooltip-append-to-body="true"
|
||||
ng-click="pipeline.moveUp(processor)"
|
||||
type="button"
|
||||
class="btn btn-xs btn-default">
|
||||
class="btn btn-xs btn-default"
|
||||
ng-disabled="pipeline.hasCompileError">
|
||||
<i aria-hidden="true" class="fa fa-caret-up"></i>
|
||||
</button>
|
||||
|
||||
|
@ -41,7 +42,8 @@
|
|||
tooltip-append-to-body="true"
|
||||
ng-click="pipeline.moveDown(processor)"
|
||||
type="button"
|
||||
class="btn btn-xs btn-default">
|
||||
class="btn btn-xs btn-default"
|
||||
ng-disabled="pipeline.hasCompileError">
|
||||
<i aria-hidden="true" class="fa fa-caret-down"></i>
|
||||
</button>
|
||||
|
||||
|
@ -51,7 +53,8 @@
|
|||
tooltip-append-to-body="true"
|
||||
ng-click="pipeline.remove(processor)"
|
||||
type="button"
|
||||
class="btn btn-xs btn-danger">
|
||||
class="btn btn-xs btn-danger"
|
||||
ng-disabled="pipeline.hasCompileError && !processor.error">
|
||||
<i aria-hidden="true" class="fa fa-times"></i>
|
||||
</button>
|
||||
</div>
|
||||
|
|
|
@ -0,0 +1,20 @@
|
|||
<div class="form-group">
|
||||
<label>Field:</label>
|
||||
<select
|
||||
class="form-control"
|
||||
ng-options="field as field for field in fields"
|
||||
ng-model="processor.sourceField">
|
||||
</select>
|
||||
</div>
|
||||
<div class="form-group">
|
||||
<label>Field Data:</label>
|
||||
<pre>{{ fieldData }}</pre>
|
||||
</div>
|
||||
<div class="form-group">
|
||||
<label>Pattern:</label>
|
||||
<input type="text" class="form-control" ng-model="processor.pattern">
|
||||
</div>
|
||||
<div class="form-group">
|
||||
<label>Replacement:</label>
|
||||
<input type="text" class="form-control" ng-trim="false" ng-model="processor.replacement">
|
||||
</div>
|
|
@ -4,7 +4,8 @@
|
|||
<select
|
||||
class="form-control"
|
||||
ng-options="sample.message for sample in samples"
|
||||
ng-model="selectedSample">
|
||||
ng-model="selectedSample"
|
||||
ng-disabled="disabled">
|
||||
</select>
|
||||
<button
|
||||
aria-label="Previous Line"
|
||||
|
@ -12,7 +13,8 @@
|
|||
tooltip-append-to-body="true"
|
||||
ng-click="previousLine()"
|
||||
type="button"
|
||||
class="btn btn-xs btn-default">
|
||||
class="btn btn-xs btn-default"
|
||||
ng-disabled="disabled">
|
||||
<i aria-hidden="true" class="fa fa-chevron-left"></i>
|
||||
</button>
|
||||
<button
|
||||
|
@ -21,7 +23,8 @@
|
|||
tooltip-append-to-body="true"
|
||||
ng-click="nextLine()"
|
||||
type="button"
|
||||
class="btn btn-xs btn-default">
|
||||
class="btn btn-xs btn-default"
|
||||
ng-disabled="disabled">
|
||||
<i aria-hidden="true" class="fa fa-chevron-right"></i>
|
||||
</button>
|
||||
</div>
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
import processESIngestSimulateError from '../process_es_ingest_simulate_error';
|
||||
import expect from 'expect.js';
|
||||
import _ from 'lodash';
|
||||
|
||||
describe('processESIngestSimulateError', function () {
|
||||
|
||||
it('result will be returned for processor that threw the error', function () {
|
||||
const error = _.set({}, 'body.error.root_cause[0].reason', 'foobar');
|
||||
_.set(error, 'body.error.root_cause[0].header.processor_tag', 'processor1');
|
||||
|
||||
const expected = [
|
||||
{ processorId: 'processor1', error: { compile: true, message: 'foobar' } }
|
||||
];
|
||||
const actual = processESIngestSimulateError(error);
|
||||
|
||||
expect(_.isEqual(actual, expected)).to.be.ok();
|
||||
});
|
||||
|
||||
});
|
|
@ -4,55 +4,21 @@ import _ from 'lodash';
|
|||
|
||||
describe('processESIngestSimulateResponse', function () {
|
||||
|
||||
it('returns a result for each processor in the pipeline', function () {
|
||||
const processors = [ { processorId: 'processor1' }, { processorId: 'processor2' } ];
|
||||
const response = {
|
||||
docs: [ { processor_results: [] } ]
|
||||
};
|
||||
|
||||
const results = processESIngestSimulateResponse(processors, response);
|
||||
expect(results.length).to.be(2);
|
||||
});
|
||||
|
||||
it('each processor that does not receive a result will contain default info', function () {
|
||||
const processors = [
|
||||
{ processorId: 'processor1', outputObject: 'foo' },
|
||||
{ processorId: 'processor2', outputObject: 'bar' },
|
||||
{ processorId: 'processor3', outputObject: 'baz' }
|
||||
];
|
||||
const response = {
|
||||
docs: [ { processor_results: [] } ]
|
||||
};
|
||||
|
||||
const expected = [
|
||||
{ processorId: 'processor1', output: 'foo', error: undefined },
|
||||
{ processorId: 'processor2', output: 'bar', error: undefined },
|
||||
{ processorId: 'processor3', output: 'baz', error: undefined }
|
||||
];
|
||||
const actual = processESIngestSimulateResponse(processors, response);
|
||||
|
||||
expect(actual).to.eql(expected);
|
||||
});
|
||||
|
||||
it('each processor that receives a result will contain response info', function () {
|
||||
const processors = [
|
||||
{ processorId: 'processor1', outputObject: 'foo' },
|
||||
{ processorId: 'processor2', outputObject: 'bar' },
|
||||
{ processorId: 'processor3', outputObject: 'baz' }
|
||||
];
|
||||
const response = {
|
||||
docs: [ { processor_results: [
|
||||
{ tag: 'processor1', doc: { _source: 'new_foo' }, error: undefined },
|
||||
{ tag: 'processor2', doc: { _source: 'new_bar' }, error: undefined },
|
||||
{ tag: 'processor3', doc: { _source: 'new_baz' }, error: undefined }
|
||||
] } ]
|
||||
};
|
||||
|
||||
const expected = [
|
||||
{ processorId: 'processor1', output: 'foo', error: undefined },
|
||||
{ processorId: 'processor1', output: 'new_foo', error: undefined },
|
||||
{ processorId: 'processor2', output: 'new_bar', error: undefined },
|
||||
{ processorId: 'processor3', output: 'new_baz', error: undefined }
|
||||
];
|
||||
const actual = processESIngestSimulateResponse(processors, response);
|
||||
const actual = processESIngestSimulateResponse(response);
|
||||
|
||||
expect(actual).to.eql(expected);
|
||||
});
|
||||
|
@ -60,16 +26,11 @@ describe('processESIngestSimulateResponse', function () {
|
|||
describe('processors that return an error object', function () {
|
||||
|
||||
it('will be the root_cause reason if one exists', function () {
|
||||
const processors = [
|
||||
{ processorId: 'processor1', outputObject: 'foo' },
|
||||
{ processorId: 'processor2', outputObject: 'bar' },
|
||||
{ processorId: 'processor3', outputObject: 'baz' }
|
||||
];
|
||||
const response = {
|
||||
docs: [ { processor_results: [
|
||||
{ tag: 'processor2', doc: { _source: 'new_bar' }, error: undefined },
|
||||
{ tag: 'processor1', doc: { _source: 'new_foo' }, error: undefined },
|
||||
{
|
||||
tag: 'processor3',
|
||||
tag: 'processor2',
|
||||
doc: 'dummy',
|
||||
error: { root_cause: [ { reason: 'something bad happened', type: 'general exception' } ] }
|
||||
}
|
||||
|
@ -77,21 +38,15 @@ describe('processESIngestSimulateResponse', function () {
|
|||
};
|
||||
|
||||
const expected = [
|
||||
{ processorId: 'processor1', output: 'foo', error: undefined },
|
||||
{ processorId: 'processor2', output: 'new_bar', error: undefined },
|
||||
{ processorId: 'processor3', output: undefined, error: { isNested: false, message: 'something bad happened'} }
|
||||
{ processorId: 'processor1', output: 'new_foo', error: undefined },
|
||||
{ processorId: 'processor2', output: undefined, error: { compile: false, message: 'something bad happened'} }
|
||||
];
|
||||
const actual = processESIngestSimulateResponse(processors, response);
|
||||
const actual = processESIngestSimulateResponse(response);
|
||||
|
||||
expect(actual).to.eql(expected);
|
||||
});
|
||||
|
||||
it('will be the root_cause type if reason does not exists', function () {
|
||||
const processors = [
|
||||
{ processorId: 'processor1', outputObject: 'foo' },
|
||||
{ processorId: 'processor2', outputObject: 'bar' },
|
||||
{ processorId: 'processor3', outputObject: 'baz' }
|
||||
];
|
||||
const response = {
|
||||
docs: [ { processor_results: [
|
||||
{ tag: 'processor2', doc: { _source: 'new_bar' }, error: undefined },
|
||||
|
@ -104,44 +59,10 @@ describe('processESIngestSimulateResponse', function () {
|
|||
};
|
||||
|
||||
const expected = [
|
||||
{ processorId: 'processor1', output: 'foo', error: undefined },
|
||||
{ processorId: 'processor2', output: 'new_bar', error: undefined },
|
||||
{ processorId: 'processor3', output: undefined, error: { isNested: false, message: 'something bad happened'} }
|
||||
{ processorId: 'processor3', output: undefined, error: { compile: false, message: 'something bad happened'} }
|
||||
];
|
||||
const actual = processESIngestSimulateResponse(processors, response);
|
||||
|
||||
expect(actual).to.eql(expected);
|
||||
});
|
||||
|
||||
it('any processor after errored processor will be set to a nested error state', function () {
|
||||
const processors = [
|
||||
{ processorId: 'processor0', outputObject: 'oof' },
|
||||
{ processorId: 'processor1', outputObject: 'foo' },
|
||||
{ processorId: 'processor2', outputObject: 'bar' },
|
||||
{ processorId: 'processor3', outputObject: 'baz' }
|
||||
];
|
||||
const response = {
|
||||
docs: [
|
||||
{
|
||||
processor_results: [
|
||||
{ tag: 'processor0', doc: { _source: 'new_oof' }, error: undefined },
|
||||
{
|
||||
tag: 'processor1',
|
||||
doc: 'dummy',
|
||||
error: { root_cause: [ { reason: 'something bad happened' } ] }
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
};
|
||||
|
||||
const expected = [
|
||||
{ processorId: 'processor0', output: 'new_oof', error: undefined },
|
||||
{ processorId: 'processor1', output: undefined, error: { isNested: false, message: 'something bad happened' } },
|
||||
{ processorId: 'processor2', output: undefined, error: { isNested: true, message: 'Invalid Parent Processor' } },
|
||||
{ processorId: 'processor3', output: undefined, error: { isNested: true, message: 'Invalid Parent Processor' } }
|
||||
];
|
||||
const actual = processESIngestSimulateResponse(processors, response);
|
||||
const actual = processESIngestSimulateResponse(response);
|
||||
|
||||
expect(actual).to.eql(expected);
|
||||
});
|
||||
|
|
|
@ -1,3 +1,14 @@
|
|||
export function gsub(processorApiDocument) {
|
||||
return {
|
||||
gsub: {
|
||||
tag: processorApiDocument.processor_id,
|
||||
field: processorApiDocument.source_field,
|
||||
pattern: processorApiDocument.pattern,
|
||||
replacement: processorApiDocument.replacement
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
export function set(processorApiDocument) {
|
||||
return {
|
||||
set: {
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
const _ = require('lodash');
|
||||
|
||||
function buildError(error) {
|
||||
const errorMessage = _.get(error, 'body.error.root_cause[0].reason');
|
||||
return {
|
||||
compile: true,
|
||||
message: errorMessage
|
||||
};
|
||||
}
|
||||
|
||||
export default function processESIngestSimulateError(error) {
|
||||
const processorId = _.get(error, 'body.error.root_cause[0].header.processor_tag');
|
||||
if (!processorId) throw error;
|
||||
|
||||
const results = [
|
||||
{
|
||||
processorId: processorId,
|
||||
error: buildError(error)
|
||||
}
|
||||
];
|
||||
|
||||
return results;
|
||||
}
|
|
@ -1,41 +1,24 @@
|
|||
import _ from 'lodash';
|
||||
const _ = require('lodash');
|
||||
|
||||
function translateError(esError) {
|
||||
const rootCause = _.get(esError, 'root_cause[0]');
|
||||
function buildError(error) {
|
||||
const errorMessage = _.get(error, 'root_cause[0].reason') || _.get(error, 'root_cause[0].type');
|
||||
if (!errorMessage) return;
|
||||
|
||||
return _.get(rootCause, 'reason') || _.get(rootCause, 'type');
|
||||
return {
|
||||
compile: false,
|
||||
message: errorMessage
|
||||
};
|
||||
}
|
||||
|
||||
export default function processESIngestSimulateResponse(processors, resp) {
|
||||
const results = processors.map((processor) => {
|
||||
export default function processESIngestSimulateResponse(resp) {
|
||||
const processorResults = _.get(resp, 'docs[0].processor_results');
|
||||
const results = processorResults.map((processorResult) => {
|
||||
return {
|
||||
processorId: processor.processorId,
|
||||
output: processor.outputObject,
|
||||
error: undefined
|
||||
processorId: _.get(processorResult, 'tag'),
|
||||
output: _.get(processorResult, 'doc._source'),
|
||||
error: buildError(_.get(processorResult, 'error'))
|
||||
};
|
||||
});
|
||||
|
||||
const processorResults = _.get(resp, 'docs[0].processor_results');
|
||||
processorResults.forEach((processorResult) => {
|
||||
const processorId = _.get(processorResult, 'tag');
|
||||
const output = _.get(processorResult, 'doc._source');
|
||||
const error = _.get(processorResult, 'error');
|
||||
const errorMessage = translateError(error);
|
||||
const badResult = _.find(results, { 'processorId': processorId });
|
||||
|
||||
badResult.output = errorMessage ? undefined : output;
|
||||
badResult.error = errorMessage ? { isNested: false, message: errorMessage } : undefined;
|
||||
});
|
||||
|
||||
const errorIndex = _.findIndex(results, (result) => { return result.error !== undefined; });
|
||||
if (errorIndex !== -1) {
|
||||
for (let i = errorIndex + 1; i < results.length; i++) {
|
||||
const badResult = results[i];
|
||||
|
||||
badResult.output = undefined;
|
||||
badResult.error = { isNested: true, message: 'Invalid Parent Processor' };
|
||||
}
|
||||
}
|
||||
|
||||
return results;
|
||||
};
|
||||
|
|
|
@ -4,6 +4,13 @@ const base = Joi.object({
|
|||
processor_id: Joi.string().required()
|
||||
});
|
||||
|
||||
export const gsub = base.keys({
|
||||
type_id: Joi.string().only('gsub').required(),
|
||||
source_field: Joi.string().allow(''),
|
||||
pattern: Joi.string().allow(''),
|
||||
replacement: Joi.string().allow('')
|
||||
});
|
||||
|
||||
export const set = base.keys({
|
||||
type_id: Joi.string().only('set').required(),
|
||||
target_field: Joi.string().allow(''),
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
import _ from 'lodash';
|
||||
import processESIngestSimulateResponse from '../../../lib/process_es_ingest_simulate_response';
|
||||
import handleESError from '../../../lib/handle_es_error';
|
||||
import handleResponse from '../../../lib/process_es_ingest_simulate_response';
|
||||
import handleError from '../../../lib/process_es_ingest_simulate_error';
|
||||
import simulateRequestSchema from '../../../lib/schemas/simulate_request_schema';
|
||||
import ingestSimulateApiKibanaToEsConverter from '../../../lib/converters/ingest_simulate_api_kibana_to_es_converter';
|
||||
import { keysToCamelCaseShallow, keysToSnakeCaseShallow } from '../../../../common/lib/case_conversion';
|
||||
|
@ -24,9 +26,12 @@ export function registerSimulate(server) {
|
|||
method: 'POST',
|
||||
body: body
|
||||
})
|
||||
.then(_.partial(processESIngestSimulateResponse, _.map(simulateApiDocument.processors, keysToCamelCaseShallow)))
|
||||
.then(handleResponse, handleError)
|
||||
.then((processors) => _.map(processors, keysToSnakeCaseShallow))
|
||||
.then(reply);
|
||||
.then(reply)
|
||||
.catch((error) => {
|
||||
reply(handleESError(error));
|
||||
});
|
||||
}
|
||||
});
|
||||
};
|
||||
|
|
|
@ -34,6 +34,14 @@ define(function (require) {
|
|||
// All processors must have a processorId property and a typeId property
|
||||
request.post('/kibana/ingest/simulate')
|
||||
.send({input: {}, processors: [{}]})
|
||||
.expect(400),
|
||||
|
||||
request.post('/kibana/ingest/simulate')
|
||||
.send({input: {}, processors: ['foo']})
|
||||
.expect(400),
|
||||
|
||||
request.post('/kibana/ingest/simulate')
|
||||
.send({input: {}, processors: 'foo'})
|
||||
.expect(400)
|
||||
]);
|
||||
});
|
||||
|
@ -44,6 +52,106 @@ define(function (require) {
|
|||
.expect(200);
|
||||
});
|
||||
|
||||
bdd.describe('compilation errors', function simulatePipeline() {
|
||||
const pipeline = {
|
||||
input: { foo: '[message]' },
|
||||
processors: [
|
||||
{
|
||||
processor_id: 'processor1',
|
||||
type_id: 'set',
|
||||
target_field: 'foo',
|
||||
value: 'bar'
|
||||
},
|
||||
{
|
||||
processor_id: 'processor2',
|
||||
type_id: 'gsub',
|
||||
source_field: 'foo',
|
||||
pattern: '[',
|
||||
replacement: '<'
|
||||
},
|
||||
{
|
||||
processor_id: 'processor3',
|
||||
type_id: 'set',
|
||||
target_field: 'bar',
|
||||
value: 'baz'
|
||||
}
|
||||
]
|
||||
};
|
||||
|
||||
bdd.it('should return a 200 for a compile error caused by a processor', function () {
|
||||
request.post('/kibana/ingest/simulate')
|
||||
.send(pipeline)
|
||||
.expect(200)
|
||||
.then((response) => {
|
||||
expect(response.body[0].processor_id).to.be('processor2');
|
||||
expect(response.body[0].error.compile).to.be(true);
|
||||
});
|
||||
});
|
||||
|
||||
bdd.it('should only return a result for the processor that threw the error', function () {
|
||||
request.post('/kibana/ingest/simulate')
|
||||
.send(pipeline)
|
||||
.expect(200)
|
||||
.then((response) => {
|
||||
expect(response.body[0].processor_id).to.be('processor2');
|
||||
expect(response.body[0].error.compile).to.be(true);
|
||||
expect(response.body.length).to.be(1);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
bdd.describe('data errors', function simulatePipeline() {
|
||||
const pipeline = {
|
||||
input: { foo: '[message]' },
|
||||
processors: [
|
||||
{
|
||||
processor_id: 'processor1',
|
||||
type_id: 'set',
|
||||
target_field: 'foo',
|
||||
value: 'bar'
|
||||
},
|
||||
{
|
||||
processor_id: 'processor2',
|
||||
type_id: 'gsub',
|
||||
source_field: '', //invalid source field
|
||||
pattern: '\\[',
|
||||
replacement: '<'
|
||||
},
|
||||
{
|
||||
processor_id: 'processor3',
|
||||
type_id: 'set',
|
||||
target_field: 'bar',
|
||||
value: 'baz'
|
||||
}
|
||||
]
|
||||
};
|
||||
|
||||
bdd.it('should return 200 with non-compile error object for a processor with an invalid source_field', () => {
|
||||
return Promise.all([
|
||||
request.post('/kibana/ingest/simulate')
|
||||
.send(pipeline)
|
||||
.expect(200)
|
||||
.then((response) => {
|
||||
expect(response.body[0].error).to.be(undefined);
|
||||
expect(response.body[1].error.compile).to.be(false);
|
||||
expect(response.body[1].processor_id).to.be('processor2');
|
||||
})
|
||||
]);
|
||||
});
|
||||
|
||||
bdd.it('should return results up to and including the erroring processor', () => {
|
||||
return Promise.all([
|
||||
request.post('/kibana/ingest/simulate')
|
||||
.send(pipeline)
|
||||
.expect(200)
|
||||
.then((response) => {
|
||||
expect(response.body.length).to.be(2);
|
||||
})
|
||||
]);
|
||||
});
|
||||
|
||||
});
|
||||
|
||||
});
|
||||
};
|
||||
});
|
||||
|
|
70
test/unit/api/ingest/processors/_gsub.js
Normal file
70
test/unit/api/ingest/processors/_gsub.js
Normal file
|
@ -0,0 +1,70 @@
|
|||
define(function (require) {
|
||||
var Promise = require('bluebird');
|
||||
var _ = require('intern/dojo/node!lodash');
|
||||
var expect = require('intern/dojo/node!expect.js');
|
||||
|
||||
const testPipeline = {
|
||||
processors: [{
|
||||
processor_id: 'processor1',
|
||||
type_id: 'gsub',
|
||||
source_field: 'foo',
|
||||
pattern: 'bar',
|
||||
replacement: 'baz'
|
||||
}],
|
||||
input: { foo: 'bar' }
|
||||
};
|
||||
|
||||
return function (bdd, scenarioManager, request) {
|
||||
bdd.describe('simulate - gsub processor', () => {
|
||||
|
||||
bdd.it('should return 400 for an invalid payload', () => {
|
||||
return Promise.all([
|
||||
// GSub processor requires targetField property
|
||||
request.post('/kibana/ingest/simulate')
|
||||
.send({
|
||||
input: { foo: 'bar' },
|
||||
processors: [{
|
||||
processor_id: 'processor1',
|
||||
type_id: 'gsub',
|
||||
source_field: 42,
|
||||
pattern: 'bar',
|
||||
replacement: 'baz'
|
||||
}]
|
||||
})
|
||||
.expect(400)
|
||||
]);
|
||||
});
|
||||
|
||||
bdd.it('should return 200 for a valid simulate request', () => {
|
||||
return request.post('/kibana/ingest/simulate')
|
||||
.send(testPipeline)
|
||||
.expect(200);
|
||||
});
|
||||
|
||||
bdd.it('should return a simulated output with the correct result for the given processor', () => {
|
||||
return request.post('/kibana/ingest/simulate')
|
||||
.send(testPipeline)
|
||||
.expect(200)
|
||||
.then((response) => {
|
||||
expect(response.body[0].output.foo).to.be.equal('baz');
|
||||
});
|
||||
});
|
||||
|
||||
bdd.it('should enforce snake case', () => {
|
||||
return request.post('/kibana/ingest/simulate')
|
||||
.send({
|
||||
processors: [{
|
||||
processorId: 'processor1',
|
||||
typeId: 'gsub',
|
||||
sourceField: 'foo',
|
||||
pattern: 'bar',
|
||||
replacement: 'baz'
|
||||
}],
|
||||
input: { foo: 'bar' }
|
||||
})
|
||||
.expect(400);
|
||||
});
|
||||
|
||||
});
|
||||
};
|
||||
});
|
|
@ -1,8 +1,10 @@
|
|||
define(function (require) {
|
||||
var set = require('./_set');
|
||||
var gsub = require('./_gsub');
|
||||
|
||||
return function processors(bdd, scenarioManager, request) {
|
||||
set(bdd, scenarioManager, request);
|
||||
gsub(bdd, scenarioManager, request);
|
||||
};
|
||||
|
||||
});
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue