[API] Add CSV bulk indexing support to Kibana API

Matthew Bargar 2016-03-18 14:28:31 -04:00
parent 3f18f8e7a7
commit 40654d15d1
4 changed files with 109 additions and 0 deletions

View file

@@ -95,6 +95,7 @@
  "clipboard": "1.5.5",
  "commander": "2.8.1",
  "css-loader": "0.17.0",
  "csv": "0.4.6",
  "d3": "3.5.6",
  "elasticsearch": "10.1.2",
  "elasticsearch-browser": "10.1.2",
@@ -109,6 +110,7 @@
  "good-squeeze": "2.1.0",
  "gridster": "0.5.6",
  "hapi": "8.8.1",
  "highland": "2.7.2",
  "httpolyglot": "0.1.1",
  "imports-loader": "0.6.4",
  "jade": "1.11.0",

View file

@@ -0,0 +1,7 @@
import Joi from 'joi';

export default Joi.object({
  csv: Joi.object().required(),
  pipeline: Joi.boolean(),
  delimiter: Joi.string()
});
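
The schema mirrors the multipart fields the handler expects: csv is the uploaded file part (a stream object once hapi parses the payload), pipeline toggles ingest-pipeline processing, and delimiter overrides the default comma. A quick sketch of validating a payload shape against it (the import path and field values here are illustrative):

import Joi from 'joi';
import bulkRequestSchema from './bulk_request_schema'; // path is illustrative

// csv only needs to be an object for the schema; at runtime it is the hapi file stream.
const payload = { csv: {}, pipeline: true, delimiter: ';' };

Joi.validate(payload, bulkRequestSchema, (err) => {
  if (err) throw err;
  console.log('payload is valid');
});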

View file

@@ -2,10 +2,12 @@ import { registerPost } from './register_post';
import { registerDelete } from './register_delete';
import { registerProcessors } from './register_processors';
import { registerSimulate } from './register_simulate';
import { registerBulk } from './register_bulk';

export default function (server) {
  registerPost(server);
  registerDelete(server);
  registerProcessors(server);
  registerSimulate(server);
  registerBulk(server);
}

View file

@@ -0,0 +1,98 @@
import { Promise } from 'bluebird';
import { parse, transform } from 'csv';
import _ from 'lodash';
import hi from 'highland';
import { patternToIngest } from '../../../../common/lib/convert_pattern_and_ingest_name';
import { PassThrough } from 'stream';
import bulkRequestSchema from '../../../lib/schemas/bulk_request_schema';

export function registerBulk(server) {
  server.route({
    path: '/api/kibana/{id}/_bulk',
    method: 'POST',
    config: {
      payload: {
        output: 'stream',
        maxBytes: 1024 * 1024 * 1024
      },
      validate: {
        payload: bulkRequestSchema
      }
    },
    handler: function (req, reply) {
      const boundCallWithRequest = _.partial(server.plugins.elasticsearch.callWithRequest, req);
      const indexPattern = req.params.id;
      const usePipeline = req.payload.pipeline;
      const csv = req.payload.csv;
      const fileName = csv.hapi.filename;
      const responseStream = new PassThrough();
      const parser = parse({
        columns: true,
        auto_parse: true,
        delimiter: _.get(req.payload, 'delimiter', ',')
      });
      let currentLine = 2; // Starts at 2 since we parse the header separately

      responseStream.write('[');
      csv.pipe(parser);

      hi(parser)
      .consume((err, doc, push, next) => {
        if (err) {
          push(err, null);
          next();
        }
        else if (doc === hi.nil) {
          // pass nil (end event) along the stream
          push(null, doc);
        }
        else {
          push(null, {index: {_id: `L${currentLine} - ${fileName}`}});
          push(null, doc);
          currentLine++;
          next();
        }
      })
      .batch(2000)
      .map((bulkBody) => {
        const bulkParams = {
          index: indexPattern,
          type: 'default',
          body: bulkBody
        };

        if (usePipeline) {
          bulkParams.pipeline = patternToIngest(indexPattern);
        }

        return hi(boundCallWithRequest('bulk', bulkParams));
      })
      .parallel(2)
      .map((response) => {
        return JSON.stringify(_.reduce(response.items, (memo, docResponse) => {
          const indexResult = docResponse.index;

          if (indexResult.error) {
            if (_.isUndefined(_.get(memo, 'errors.index'))) {
              _.set(memo, 'errors.index', []);
            }
            memo.errors.index.push(_.pick(indexResult, ['_id', 'error']));
          }
          else {
            memo.created++;
          }

          return memo;
        }, {created: 0}));
      })
      .stopOnError((err, push) => {
        push(null, JSON.stringify({created: 0, errors: {other: [err.message]}}));
      })
      .intersperse(',')
      .append(']')
      .pipe(responseStream);

      reply(responseStream).type('application/json');
    }
  });
}
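
Taken together, the route accepts a multipart POST with the CSV file, batches parsed rows 2000 at a time into Elasticsearch bulk calls (two in flight), and streams back a JSON array with one summary object per batch. A rough client-side sketch of calling it follows; the hostname, index pattern, and file name are placeholders, and depending on the Kibana build an XSRF/version header may also be required:

import fs from 'fs';
import request from 'request'; // assumes the `request` package is available in the client project

request.post({
  url: 'http://localhost:5601/api/kibana/logstash-*/_bulk', // placeholder host and index pattern
  formData: {
    csv: fs.createReadStream('data.csv'), // file part; becomes req.payload.csv
    pipeline: 'true',                     // optional: run docs through the pattern's ingest pipeline
    delimiter: ','                        // optional: defaults to ','
  }
}, (err, res, body) => {
  if (err) throw err;
  // body looks like: [{"created":2000},{"created":137,"errors":{"index":[...]}}]
  console.log(JSON.parse(body));
});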