Ingest convert processor (#7310)

* Handle ingest convert processor
This commit is contained in:
Suyog Rao 2017-06-05 08:02:59 -07:00 committed by Suyog Rao
parent d0f66bb850
commit 2a67f3e15e
18 changed files with 186 additions and 1 deletions

View file

@ -0,0 +1,33 @@
package org.logstash.ingest;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import javax.script.Invocable;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
/**
* Ingest Convert DSL to Logstash Date Transpiler.
*/
public final class Convert {
private Convert() {
// Utility Wrapper for JS Script.
}
public static void main(final String... args) throws ScriptException, NoSuchMethodException {
try {
final ScriptEngine engine = JsUtil.engine();
Files.write(Paths.get(args[1]), ((String) ((Invocable) engine).invokeFunction(
"ingest_convert_to_logstash",
new String(
Files.readAllBytes(Paths.get(args[0])), StandardCharsets.UTF_8
)
)).getBytes(StandardCharsets.UTF_8));
} catch (final IOException ex) {
throw new IllegalStateException(ex);
}
}
}

View file

@ -12,7 +12,7 @@ final class JsUtil {
/**
* Script names used by the converter in correct load order.
*/
private static final String[] SCRIPTS = {"shared", "date", "grok", "geoip", "pipeline"};
private static final String[] SCRIPTS = {"shared", "date", "grok", "geoip", "pipeline", "convert"};
private JsUtil() {
// Utility Class

View file

@ -0,0 +1,30 @@
var IngestConvert = {
has_convert: function (processor) {
return !!processor["convert"];
},
convert_hash: function (processor) {
var convert_json = processor["convert"];
var mutate_contents = IngestConverter.create_field(
IngestConverter.quote_string(IngestConverter.dots_to_square_brackets(convert_json["field"])),
IngestConverter.quote_string(convert_json["type"])
);
return IngestConverter.create_hash("convert", mutate_contents);
}
};
/**
* Converts Ingest Convert JSON to LS Date filter.
*/
function ingest_convert_to_logstash(json) {
function map_processor(processor) {
return IngestConverter.filter_hash(
IngestConverter.create_hash(
"mutate", IngestConvert.convert_hash(processor)
)
);
}
return IngestConverter.filters_to_file(JSON.parse(json)["processors"].map(map_processor));
}

View file

@ -21,6 +21,11 @@ function ingest_pipeline_to_logstash(json) {
IngestConverter.create_hash("geoip", IngestGeoIp.geoip_hash(processor))
)
}
if (IngestConvert.has_convert(processor)) {
filter_blocks.push(
IngestConverter.create_hash("mutate", IngestConvert.convert_hash(processor))
)
}
return IngestConverter.join_hash_fields(filter_blocks);
}

View file

@ -0,0 +1,25 @@
package org.logstash.ingest;
import java.util.Arrays;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.runners.Parameterized.Parameters;
public final class ConvertTest extends IngestTest {
@Parameters
public static Iterable<String> data() {
return Arrays.asList("Convert", "DotsInConvertField", "ConvertBoolean", "ConvertString");
}
@Test
public void convertsConvertProcessorCorrectly() throws Exception {
final String convert = getResultPath(temp);
Convert.main(resourcePath(String.format("ingest%s.json", testCase)), convert);
assertThat(
utf8File(convert), is(utf8File(resourcePath(String.format("logstash%s.conf", testCase))))
);
}
}

View file

@ -18,6 +18,7 @@ public final class PipelineTest extends IngestTest {
GeoIpTest.data().forEach(cases::add);
DateTest.data().forEach(cases::add);
GrokTest.data().forEach(cases::add);
ConvertTest.data().forEach(cases::add);
return cases;
}

View file

@ -24,6 +24,12 @@
"field": "client.ip",
"target_field": "geo"
}
},
{
"convert": {
"field" : "bytes",
"type": "integer"
}
}
]
}

View file

@ -30,6 +30,12 @@
"field": "source.ip",
"target_field": "source.geo"
}
},
{
"convert": {
"field" : "[client][bytes]",
"type": "integer"
}
}
]
}

View file

@ -0,0 +1,10 @@
{
"processors": [
{
"convert": {
"field" : "bytes",
"type": "integer"
}
}
]
}

View file

@ -0,0 +1,10 @@
{
"processors": [
{
"convert": {
"field" : "delete",
"type": "boolean"
}
}
]
}

View file

@ -0,0 +1,10 @@
{
"processors": [
{
"convert": {
"field" : "blah",
"type": "string"
}
}
]
}

View file

@ -0,0 +1,11 @@
{
"description": "Pipeline to parse Apache logs",
"processors": [
{
"convert": {
"field" : "client.bytes",
"type": "float"
}
}
]
}

View file

@ -16,4 +16,9 @@ filter {
source => "[client][ip]"
target => "geo"
}
mutate {
convert {
"bytes" => "integer"
}
}
}

View file

@ -20,4 +20,9 @@ filter {
source => "[source][ip]"
target => "[source][geo]"
}
mutate {
convert {
"[client][bytes]" => "integer"
}
}
}

View file

@ -0,0 +1,7 @@
filter {
mutate {
convert {
"bytes" => "integer"
}
}
}

View file

@ -0,0 +1,7 @@
filter {
mutate {
convert {
"delete" => "boolean"
}
}
}

View file

@ -0,0 +1,7 @@
filter {
mutate {
convert {
"blah" => "string"
}
}
}

View file

@ -0,0 +1,7 @@
filter {
mutate {
convert {
"[client][bytes]" => "float"
}
}
}