mirror of
https://github.com/elastic/logstash.git
synced 2025-04-23 22:27:21 -04:00
Add ingest append (#7324)
* Add Append processor converto * Fix bug in convert processor * Updated comment * add JSON processor convertor * use the right name for the variable * Fix the double space in convert
This commit is contained in:
parent
3e579a7785
commit
d0b5f3a5e4
30 changed files with 353 additions and 8 deletions
|
@ -0,0 +1,33 @@
|
|||
package org.logstash.ingest;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import javax.script.Invocable;
|
||||
import javax.script.ScriptEngine;
|
||||
import javax.script.ScriptException;
|
||||
|
||||
/**
|
||||
* Ingest Append DSL to Logstash mutate Transpiler.
|
||||
*/
|
||||
public final class Append {
|
||||
|
||||
private Append() {
|
||||
// Utility Wrapper for JS Script.
|
||||
}
|
||||
|
||||
public static void main(final String... args) throws ScriptException, NoSuchMethodException {
|
||||
try {
|
||||
final ScriptEngine engine = JsUtil.engine();
|
||||
Files.write(Paths.get(args[1]), ((String) ((Invocable) engine).invokeFunction(
|
||||
"ingest_append_to_logstash",
|
||||
new String(
|
||||
Files.readAllBytes(Paths.get(args[0])), StandardCharsets.UTF_8
|
||||
)
|
||||
)).getBytes(StandardCharsets.UTF_8));
|
||||
} catch (final IOException ex) {
|
||||
throw new IllegalStateException(ex);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -14,7 +14,7 @@ final class JsUtil {
|
|||
*/
|
||||
|
||||
private static final String[] SCRIPTS =
|
||||
{"shared", "date", "grok", "geoip", "gsub", "pipeline", "convert"};
|
||||
{"shared", "date", "grok", "geoip", "gsub", "pipeline", "convert", "append", "json"};
|
||||
|
||||
private JsUtil() {
|
||||
// Utility Class
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
package org.logstash.ingest;
|
||||
|
||||
import javax.script.Invocable;
|
||||
import javax.script.ScriptEngine;
|
||||
import javax.script.ScriptException;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
|
||||
/**
|
||||
* Ingest JSON processor DSL to Logstash json Transpiler.
|
||||
*/
|
||||
public class Json {
|
||||
private Json() {
|
||||
// Utility Wrapper for JS Script.
|
||||
}
|
||||
|
||||
public static void main(final String... args) throws ScriptException, NoSuchMethodException {
|
||||
try {
|
||||
final ScriptEngine engine = JsUtil.engine();
|
||||
Files.write(Paths.get(args[1]), ((String) ((Invocable) engine).invokeFunction(
|
||||
"ingest_json_to_logstash",
|
||||
new String(
|
||||
Files.readAllBytes(Paths.get(args[0])), StandardCharsets.UTF_8
|
||||
)
|
||||
)).getBytes(StandardCharsets.UTF_8));
|
||||
} catch (final IOException ex) {
|
||||
throw new IllegalStateException(ex);
|
||||
}
|
||||
}
|
||||
}
|
36
tools/ingest-converter/src/main/resources/ingest-append.js
Normal file
36
tools/ingest-converter/src/main/resources/ingest-append.js
Normal file
|
@ -0,0 +1,36 @@
|
|||
var IngestAppend = {
|
||||
has_append: function (processor) {
|
||||
return !!processor["append"];
|
||||
},
|
||||
append_hash: function (processor) {
|
||||
var append_json = processor["append"];
|
||||
var value_contents;
|
||||
var value = append_json["value"];
|
||||
if (Array.isArray(value)) {
|
||||
value_contents = IngestConverter.create_array(value);
|
||||
} else {
|
||||
value_contents = IngestConverter.quote_string(value);
|
||||
}
|
||||
var mutate_contents = IngestConverter.create_field(
|
||||
IngestConverter.quote_string(IngestConverter.dots_to_square_brackets(append_json["field"])),
|
||||
value_contents);
|
||||
return IngestConverter.create_field("add_field", IngestConverter.wrap_in_curly(mutate_contents));
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Converts Ingest Append JSON to LS mutate filter.
|
||||
*/
|
||||
function ingest_append_to_logstash(json) {
|
||||
|
||||
function map_processor(processor) {
|
||||
|
||||
return IngestConverter.filter_hash(
|
||||
IngestConverter.create_hash(
|
||||
"mutate", IngestAppend.append_hash(processor)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
return IngestConverter.filters_to_file(JSON.parse(json)["processors"].map(map_processor));
|
||||
}
|
|
@ -8,7 +8,7 @@ var IngestConvert = {
|
|||
IngestConverter.quote_string(IngestConverter.dots_to_square_brackets(convert_json["field"])),
|
||||
IngestConverter.quote_string(convert_json["type"])
|
||||
);
|
||||
return IngestConverter.create_hash("convert", mutate_contents);
|
||||
return IngestConverter.create_field("convert", IngestConverter.wrap_in_curly(mutate_contents));
|
||||
}
|
||||
};
|
||||
|
||||
|
|
44
tools/ingest-converter/src/main/resources/ingest-json.js
Normal file
44
tools/ingest-converter/src/main/resources/ingest-json.js
Normal file
|
@ -0,0 +1,44 @@
|
|||
var IngestJson = {
|
||||
has_json: function (processor) {
|
||||
return !!processor["json"];
|
||||
},
|
||||
json_hash: function (processor) {
|
||||
var json_data = processor["json"];
|
||||
var parts = [
|
||||
IngestConverter.create_field(
|
||||
"source",
|
||||
IngestConverter.quote_string(
|
||||
IngestConverter.dots_to_square_brackets(json_data["field"])
|
||||
)
|
||||
)
|
||||
];
|
||||
|
||||
if (json_data["target_field"]) {
|
||||
parts.push(
|
||||
IngestConverter.create_field(
|
||||
"target",
|
||||
IngestConverter.quote_string(
|
||||
IngestConverter.dots_to_square_brackets(json_data["target_field"])
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
return IngestConverter.join_hash_fields(parts);
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Converts Ingest json processor to LS json filter.
|
||||
*/
|
||||
function ingest_json_to_logstash(json) {
|
||||
|
||||
function map_processor(processor) {
|
||||
|
||||
return IngestConverter.filter_hash(
|
||||
IngestConverter.create_hash("json", IngestJson.json_hash(processor))
|
||||
)
|
||||
}
|
||||
|
||||
return IngestConverter.filters_to_file(JSON.parse(json)["processors"].map(map_processor));
|
||||
}
|
|
@ -31,6 +31,16 @@ function ingest_pipeline_to_logstash(json) {
|
|||
IngestConverter.create_hash("mutate", IngestGsub.gsub_hash(processor))
|
||||
);
|
||||
}
|
||||
if (IngestAppend.has_append(processor)) {
|
||||
filter_blocks.push(
|
||||
IngestConverter.create_hash("mutate", IngestAppend.append_hash(processor))
|
||||
);
|
||||
}
|
||||
if (IngestJson.has_json(processor)) {
|
||||
filter_blocks.push(
|
||||
IngestConverter.create_hash("json", IngestJson.json_hash(processor))
|
||||
);
|
||||
}
|
||||
return IngestConverter.join_hash_fields(filter_blocks);
|
||||
}
|
||||
|
||||
|
|
|
@ -90,6 +90,12 @@ var IngestConverter = {
|
|||
+ patterns.map(this.dots_to_square_brackets).map(this.quote_string).join(",\n")
|
||||
+ "\n]";
|
||||
},
|
||||
|
||||
create_array: function (ingest_array) {
|
||||
return "[\n"
|
||||
+ ingest_array.map(this.quote_string).join(",\n")
|
||||
+ "\n]";
|
||||
},
|
||||
|
||||
/**
|
||||
* Converts Ingest/JSON style pattern array to LS pattern array or string if the given array
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
package org.logstash.ingest;
|
||||
|
||||
import java.util.Arrays;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.junit.runners.Parameterized.Parameters;
|
||||
|
||||
public final class AppendTest extends IngestTest {
|
||||
|
||||
@Parameters
|
||||
public static Iterable<String> data() {
|
||||
return Arrays.asList("Append", "DotsInAppendField", "AppendScalar");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void convertsConvertProcessorCorrectly() throws Exception {
|
||||
final String append = getResultPath(temp);
|
||||
Append.main(resourcePath(String.format("ingest%s.json", testCase)), append);
|
||||
assertThat(
|
||||
utf8File(append), is(utf8File(resourcePath(String.format("logstash%s.conf", testCase))))
|
||||
);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,25 @@
|
|||
package org.logstash.ingest;
|
||||
|
||||
import java.util.Arrays;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.junit.runners.Parameterized.Parameters;
|
||||
|
||||
public final class JsonTest extends IngestTest {
|
||||
|
||||
@Parameters
|
||||
public static Iterable<String> data() {
|
||||
return Arrays.asList("Json", "DotsInJsonField", "JsonExtraFields");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void convertsConvertProcessorCorrectly() throws Exception {
|
||||
final String json = getResultPath(temp);
|
||||
Json.main(resourcePath(String.format("ingest%s.json", testCase)), json);
|
||||
assertThat(
|
||||
utf8File(json), is(utf8File(resourcePath(String.format("logstash%s.conf", testCase))))
|
||||
);
|
||||
}
|
||||
}
|
|
@ -20,6 +20,8 @@ public final class PipelineTest extends IngestTest {
|
|||
GrokTest.data().forEach(cases::add);
|
||||
ConvertTest.data().forEach(cases::add);
|
||||
GsubTest.data().forEach(cases::add);
|
||||
AppendTest.data().forEach(cases::add);
|
||||
JsonTest.data().forEach(cases::add);
|
||||
return cases;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,11 @@
|
|||
{
|
||||
"description": "Pipeline to parse Apache logs",
|
||||
"processors": [
|
||||
{
|
||||
"append": {
|
||||
"field" : "client",
|
||||
"value": ["host1", "host2"]
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
|
@ -0,0 +1,11 @@
|
|||
{
|
||||
"description": "Pipeline to parse Apache logs",
|
||||
"processors": [
|
||||
{
|
||||
"append": {
|
||||
"field" : "foo",
|
||||
"value": "bar"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
|
@ -30,6 +30,17 @@
|
|||
"field" : "bytes",
|
||||
"type": "integer"
|
||||
}
|
||||
},
|
||||
{
|
||||
"append": {
|
||||
"field" : "response_code",
|
||||
"value": ["200", "400", "503"]
|
||||
}
|
||||
},
|
||||
{
|
||||
"json": {
|
||||
"field": "string_source"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
|
|
@ -0,0 +1,11 @@
|
|||
{
|
||||
"description": "Pipeline to parse Apache logs",
|
||||
"processors": [
|
||||
{
|
||||
"append": {
|
||||
"field" : "client.ip",
|
||||
"value": ["127.0.0.1", "127.0.0.2"]
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
|
@ -0,0 +1,11 @@
|
|||
{
|
||||
"description": "ExampleJson",
|
||||
"processors": [
|
||||
{
|
||||
"json": {
|
||||
"field": "[foo][string_source]",
|
||||
"target_field": "[bar][json_target]"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
|
@ -0,0 +1,10 @@
|
|||
{
|
||||
"description": "ExampleJson",
|
||||
"processors": [
|
||||
{
|
||||
"json": {
|
||||
"field": "string_source"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
|
@ -0,0 +1,11 @@
|
|||
{
|
||||
"description": "ExampleJson",
|
||||
"processors": [
|
||||
{
|
||||
"json": {
|
||||
"field": "string_source",
|
||||
"target_field": "json_target"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
|
@ -0,0 +1,10 @@
|
|||
filter {
|
||||
mutate {
|
||||
add_field => {
|
||||
"client" => [
|
||||
"host1",
|
||||
"host2"
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,7 @@
|
|||
filter {
|
||||
mutate {
|
||||
add_field => {
|
||||
"foo" => "bar"
|
||||
}
|
||||
}
|
||||
}
|
|
@ -17,8 +17,20 @@ filter {
|
|||
target => "geo"
|
||||
}
|
||||
mutate {
|
||||
convert {
|
||||
convert => {
|
||||
"bytes" => "integer"
|
||||
}
|
||||
}
|
||||
mutate {
|
||||
add_field => {
|
||||
"response_code" => [
|
||||
"200",
|
||||
"400",
|
||||
"503"
|
||||
]
|
||||
}
|
||||
}
|
||||
json {
|
||||
source => "string_source"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,7 +21,7 @@ filter {
|
|||
target => "[source][geo]"
|
||||
}
|
||||
mutate {
|
||||
convert {
|
||||
convert => {
|
||||
"[client][bytes]" => "integer"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
filter {
|
||||
mutate {
|
||||
convert {
|
||||
convert => {
|
||||
"bytes" => "integer"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
filter {
|
||||
mutate {
|
||||
convert {
|
||||
convert => {
|
||||
"delete" => "boolean"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
filter {
|
||||
mutate {
|
||||
convert {
|
||||
convert => {
|
||||
"blah" => "string"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,10 @@
|
|||
filter {
|
||||
mutate {
|
||||
add_field => {
|
||||
"[client][ip]" => [
|
||||
"127.0.0.1",
|
||||
"127.0.0.2"
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,6 +1,6 @@
|
|||
filter {
|
||||
mutate {
|
||||
convert {
|
||||
convert => {
|
||||
"[client][bytes]" => "float"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,6 @@
|
|||
filter {
|
||||
json {
|
||||
source => "[foo][string_source]"
|
||||
target => "[bar][json_target]"
|
||||
}
|
||||
}
|
|
@ -0,0 +1,5 @@
|
|||
filter {
|
||||
json {
|
||||
source => "string_source"
|
||||
}
|
||||
}
|
|
@ -0,0 +1,6 @@
|
|||
filter {
|
||||
json {
|
||||
source => "string_source"
|
||||
target => "json_target"
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue