plain codec for Java

Fixes #10791
This commit is contained in:
Dan Hermann 2019-05-15 10:04:29 -05:00
parent 6dd85ca2a8
commit 8a2b7be327
4 changed files with 309 additions and 13 deletions

View file

@ -0,0 +1,112 @@
package org.logstash.plugins.codecs;
import co.elastic.logstash.api.Codec;
import co.elastic.logstash.api.Configuration;
import co.elastic.logstash.api.Context;
import co.elastic.logstash.api.Event;
import co.elastic.logstash.api.LogstashPlugin;
import co.elastic.logstash.api.PluginConfigSpec;
import org.logstash.StringInterpolation;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;
/**
* The plain codec accepts input bytes as events with no decoding beyond the application of a specified
* character set. For encoding, an optional format string may be specified.
*/
@LogstashPlugin(name = "java_plain")
public class Plain implements Codec {
private static final PluginConfigSpec<String> CHARSET_CONFIG =
PluginConfigSpec.stringSetting("charset", "UTF-8");
private static final PluginConfigSpec<String> FORMAT_CONFIG =
PluginConfigSpec.stringSetting("format");
static final String MESSAGE_FIELD = "message";
private Context context;
private final Map<String, Object> map = new HashMap<>();
private final Charset charset;
private String format = null;
private String id;
private final CharBuffer charBuffer = ByteBuffer.allocateDirect(64 * 1024).asCharBuffer();
private final CharsetDecoder decoder;
/**
* Required constructor.
*
* @param configuration Logstash Configuration
* @param context Logstash Context
*/
public Plain(final Configuration configuration, final Context context) {
this(context, configuration.get(CHARSET_CONFIG), configuration.get(FORMAT_CONFIG));
}
private Plain(Context context, String charsetName, String format) {
this.context = context;
this.id = UUID.randomUUID().toString();
this.charset = Charset.forName(charsetName);
this.format = format;
decoder = charset.newDecoder();
decoder.onMalformedInput(CodingErrorAction.IGNORE);
}
@Override
public void decode(ByteBuffer buffer, Consumer<Map<String, Object>> eventConsumer) {
if (buffer.position() < buffer.limit()) {
decoder.decode(buffer, charBuffer, true);
charBuffer.flip();
eventConsumer.accept(simpleMap(charBuffer.toString()));
charBuffer.clear();
}
}
@Override
public void flush(ByteBuffer buffer, Consumer<Map<String, Object>> eventConsumer) {
decode(buffer, eventConsumer);
}
@Override
public void encode(Event event, OutputStream output) throws IOException {
String outputString = (format == null
? event.toString()
: StringInterpolation.evaluate(event, format));
output.write(outputString.getBytes(charset));
}
private Map<String, Object> simpleMap(String message) {
map.put(MESSAGE_FIELD, message);
return map;
}
@Override
public Collection<PluginConfigSpec<?>> configSchema() {
return Arrays.asList(CHARSET_CONFIG, FORMAT_CONFIG);
}
@Override
public String getId() {
return id;
}
@Override
public Codec cloneCodec() {
return new Plain(context, charset.name(), format);
}
}

View file

@ -12,12 +12,10 @@ import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -191,7 +189,7 @@ public class LineTest {
compareMessages(expectedMessages, eventConsumer.events, flushConsumer.events);
}
private static void compareMessages(String[] expectedMessages, List<Map<String, Object>> events, List<Map<String, Object>> flushedEvents) {
static void compareMessages(String[] expectedMessages, List<Map<String, Object>> events, List<Map<String, Object>> flushedEvents) {
if (expectedMessages != null) {
for (int k = 0; k < events.size(); k++) {
assertEquals(expectedMessages[k], events.get(k).get(Line.MESSAGE_FIELD));
@ -214,7 +212,7 @@ public class LineTest {
}
@Test
public void testDecodeWithCharset() throws Exception {
public void testDecodeWithCharset() {
TestEventConsumer flushConsumer = new TestEventConsumer();
// decode with cp-1252
@ -341,12 +339,3 @@ public class LineTest {
}
class TestEventConsumer implements Consumer<Map<String, Object>> {
List<Map<String, Object>> events = new ArrayList<>();
@Override
public void accept(Map<String, Object> stringObjectMap) {
events.add(new HashMap<>(stringObjectMap));
}
}

View file

@ -0,0 +1,178 @@
package org.logstash.plugins.codecs;
import co.elastic.logstash.api.Codec;
import org.junit.Assert;
import org.junit.Test;
import org.logstash.Event;
import org.logstash.plugins.ConfigurationImpl;
import org.logstash.plugins.TestContext;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.logstash.plugins.codecs.LineTest.compareMessages;
public class PlainTest {
@Test
public void testSimpleDecode() {
String input = new String("abc".getBytes(), Charset.forName("UTF-8"));
testDecode( null, input, 1, new String[]{input});
}
@Test
public void testEmptyDecode() {
String input = new String("".getBytes(), Charset.forName("UTF-8"));
testDecode( null, input, 0, new String[]{});
}
@Test
public void testDecodeWithUtf8() {
String input = new String("München 安装中文输入法".getBytes(), Charset.forName("UTF-8"));
testDecode(null, input, 1, new String[]{input});
}
@Test
public void testDecodeWithCharset() {
TestEventConsumer eventConsumer = new TestEventConsumer();
// decode with cp-1252
Plain cp1252decoder = new Plain(new ConfigurationImpl(Collections.singletonMap("charset", "cp1252")), new TestContext());
byte[] rightSingleQuoteInCp1252 = {(byte) 0x92};
ByteBuffer b1 = ByteBuffer.wrap(rightSingleQuoteInCp1252);
cp1252decoder.decode(b1, eventConsumer);
assertEquals(1, eventConsumer.events.size());
cp1252decoder.flush(b1, eventConsumer);
assertEquals(1, eventConsumer.events.size());
String fromCp1252 = (String) eventConsumer.events.get(0).get(Plain.MESSAGE_FIELD);
// decode with UTF-8
eventConsumer.events.clear();
Plain utf8decoder = new Plain(new ConfigurationImpl(Collections.emptyMap()), new TestContext());
byte[] rightSingleQuoteInUtf8 = {(byte) 0xE2, (byte) 0x80, (byte) 0x99};
ByteBuffer b2 = ByteBuffer.wrap(rightSingleQuoteInUtf8);
utf8decoder.decode(b2, eventConsumer);
assertEquals(1, eventConsumer.events.size());
utf8decoder.flush(b2, eventConsumer);
assertEquals(1, eventConsumer.events.size());
String fromUtf8 = (String) eventConsumer.events.get(0).get(Plain.MESSAGE_FIELD);
assertEquals(fromCp1252, fromUtf8);
}
private void testDecode(String charset, String inputString, Integer expectedPreflushEvents, String[] expectedMessages) {
Plain plain = getPlainCodec(charset);
byte[] inputBytes = null;
try {
inputBytes = inputString.getBytes("UTF-8");
} catch (UnsupportedEncodingException ex) {
Assert.fail();
}
TestEventConsumer eventConsumer = new TestEventConsumer();
ByteBuffer inputBuffer = ByteBuffer.wrap(inputBytes, 0, inputBytes.length);
plain.decode(inputBuffer, eventConsumer);
if (expectedPreflushEvents != null) {
assertEquals(expectedPreflushEvents.intValue(), eventConsumer.events.size());
}
inputBuffer.compact();
inputBuffer.flip();
// flushing the plain codec should never produce events
TestEventConsumer flushConsumer = new TestEventConsumer();
plain.flush(inputBuffer, flushConsumer);
assertEquals(0, flushConsumer.events.size());
compareMessages(expectedMessages, eventConsumer.events, flushConsumer.events);
}
private static Plain getPlainCodec(String charset) {
Map<String, Object> config = new HashMap<>();
if (charset != null) {
config.put("charset", charset);
}
return new Plain(new ConfigurationImpl(config), new TestContext());
}
@Test
public void testEncodeWithUtf8() throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
String message = new String("München 安装中文输入法".getBytes(), Charset.forName("UTF-8"));
Map<String, Object> config = new HashMap<>();
config.put("format", "%{message}");
Plain codec = new Plain(new ConfigurationImpl(config), new TestContext());
Event e1 = new Event(Collections.singletonMap("message", message));
codec.encode(e1, outputStream);
Assert.assertEquals(message, new String(outputStream.toByteArray(), Charset.forName("UTF-8")));
}
@Test
public void testEncodeWithCharset() throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
byte[] rightSingleQuoteInUtf8 = {(byte) 0xE2, (byte) 0x80, (byte) 0x99};
String rightSingleQuote = new String(rightSingleQuoteInUtf8, Charset.forName("UTF-8"));
// encode with cp-1252
Map<String, Object> config = new HashMap<>();
config.put("charset", "cp1252");
config.put("format", "%{message}");
config.put("delimiter", "");
Event e1 = new Event(Collections.singletonMap("message", rightSingleQuote));
Plain cp1252encoder = new Plain(new ConfigurationImpl(config), new TestContext());
byte[] rightSingleQuoteInCp1252 = {(byte) 0x92};
cp1252encoder.encode(e1, outputStream);
byte[] resultBytes = outputStream.toByteArray();
Assert.assertArrayEquals(rightSingleQuoteInCp1252, resultBytes);
}
@Test
public void testEncodeWithFormat() throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
Plain encoder = new Plain(new ConfigurationImpl(Collections.singletonMap("format", "%{host}-%{message}")), null);
String message = "Hello world";
String host = "test";
String expectedOutput = host + "-" + message;
Event e = new Event();
e.setField("message", message);
e.setField("host", host);
encoder.encode(e, outputStream);
String resultingString = outputStream.toString();
assertEquals(expectedOutput, resultingString);
}
@Test
public void testClone() throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
String charset = "cp1252";
byte[] rightSingleQuoteInUtf8 = {(byte) 0xE2, (byte) 0x80, (byte) 0x99};
String rightSingleQuote = new String(rightSingleQuoteInUtf8, Charset.forName("UTF-8"));
// encode with cp-1252
Map<String, Object> config = new HashMap<>();
config.put("charset", charset);
config.put("format", "%{message}");
Event e1 = new Event(Collections.singletonMap("message", rightSingleQuote));
Plain codec = new Plain(new ConfigurationImpl(config), new TestContext());
// clone codec
Codec clone = codec.cloneCodec();
Assert.assertEquals(codec.getClass(), clone.getClass());
Plain plain2 = (Plain)clone;
// verify charset and delimiter
byte[] rightSingleQuoteInCp1252 = {(byte) 0x92};
plain2.encode(e1, outputStream);
Assert.assertArrayEquals(rightSingleQuoteInCp1252, outputStream.toByteArray());
}
}

View file

@ -0,0 +1,17 @@
package org.logstash.plugins.codecs;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
public class TestEventConsumer implements Consumer<Map<String, Object>> {
List<Map<String, Object>> events = new ArrayList<>();
@Override
public void accept(Map<String, Object> stringObjectMap) {
events.add(new HashMap<>(stringObjectMap));
}
}