mirror of
https://github.com/elastic/logstash.git
synced 2025-04-23 14:17:58 -04:00
:register_method: true
:encode_method: true
:decode_method: true
:plugintype: codec
:pluginclass: Codecs
:pluginname: example
:pluginnamecap: Example
:plugintypecap: Codec
:sversion: '0.2.0'

:pluginrepo: https://github.com/logstash-plugins/logstash-codec-java_codec_example[example codec plugin]

:blockinput: true

//:getstarted: Let's step through creating a {plugintype} plugin using the https://github.com/logstash-plugins/logstash-codec-example/[example {plugintype} plugin].

//:methodheader: Logstash codecs must implement the `register` method, and the `decode` method or the `encode` method (or both).

[[java-codec-plugin]]

=== How to write a Java codec plugin

beta[]

NOTE: Java codecs are currently supported only for Java input and output plugins. They will not work with Ruby
input or output plugins.

// Pulls in shared section: Setting Up Environment
include::include/javapluginsetup.asciidoc[]

[float]
=== Code the plugin

The example codec plugin decodes messages separated by a configurable delimiter
and encodes messages by writing their string representation separated by a
delimiter. For example, if the codec were configured with `/` as the delimiter,
the input text `event1/event2/` would be decoded into two separate events with
`message` fields of `event1` and `event2`, respectively. Note that this is only
an example codec and does not cover all the edge cases that a production-grade
codec should cover.

Let's look at the main class of the example codec:

[source,java]
-----
@LogstashPlugin(name="java_codec_example")
public class JavaCodecExample implements Codec {

    public static final PluginConfigSpec<String> DELIMITER_CONFIG =
            PluginConfigSpec.stringSetting("delimiter", ",");

    private final String id;
    private final String delimiter;
    private final CharsetEncoder encoder;
    private Event currentEncodedEvent;
    private CharBuffer currentEncoding;

    public JavaCodecExample(final Configuration config, final Context context) {
        this(config.get(DELIMITER_CONFIG));
    }

    private JavaCodecExample(String delimiter) {
        this.id = UUID.randomUUID().toString();
        this.delimiter = delimiter;
        this.encoder = Charset.defaultCharset().newEncoder();
    }

    @Override
    public void decode(ByteBuffer byteBuffer, Consumer<Map<String, Object>> consumer) {
        // a not-production-grade delimiter decoder
        byte[] byteInput = new byte[byteBuffer.remaining()];
        byteBuffer.get(byteInput);
        if (byteInput.length > 0) {
            String input = new String(byteInput);
            String[] split = input.split(delimiter);
            for (String s : split) {
                Map<String, Object> map = new HashMap<>();
                map.put("message", s);
                consumer.accept(map);
            }
        }
    }

    @Override
    public void flush(ByteBuffer byteBuffer, Consumer<Map<String, Object>> consumer) {
        // if the codec maintains any internal state such as partially-decoded input, this
        // method should flush that state along with any additional input supplied in
        // the ByteBuffer

        decode(byteBuffer, consumer); // this is a simplistic implementation
    }

    @Override
    public boolean encode(Event event, ByteBuffer buffer) throws EncodeException {
        try {
            if (currentEncodedEvent != null && event != currentEncodedEvent) {
                throw new EncodeException("New event supplied before encoding of previous event was completed");
            } else if (currentEncodedEvent == null) {
                currentEncoding = CharBuffer.wrap(event.toString() + delimiter);
            }

            CoderResult result = encoder.encode(currentEncoding, buffer, true);
            buffer.flip();
            if (result.isError()) {
                result.throwException();
            }

            if (result.isOverflow()) {
                currentEncodedEvent = event;
                return false;
            } else {
                currentEncodedEvent = null;
                return true;
            }
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    @Override
    public Collection<PluginConfigSpec<?>> configSchema() {
        // should return a list of all configuration options for this plugin
        return Collections.singletonList(DELIMITER_CONFIG);
    }

    @Override
    public Codec cloneCodec() {
        return new JavaCodecExample(this.delimiter);
    }

    @Override
    public String getId() {
        return this.id;
    }
}
-----

Let's step through and examine each part of that class.

[float]
==== Class declaration

[source,java]
-----
@LogstashPlugin(name="java_codec_example")
public class JavaCodecExample implements Codec {
-----

Notes about the class declaration:

* All Java plugins must be annotated with the `@LogstashPlugin` annotation. Additionally:
** The `name` property of the annotation must be supplied and defines the name of the plugin as it will be used
in the Logstash pipeline definition. For example, this codec would be referenced in the codec section of
an appropriate input or output in the Logstash pipeline definition as `codec => java_codec_example { }`.
** The value of the `name` property must match the name of the class excluding casing and underscores.
* The class must implement the `co.elastic.logstash.api.Codec` interface.
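The naming rule above can be illustrated with a small sketch. It assumes that "excluding casing and underscores" means a plain case-insensitive comparison after underscores are removed; `PluginNameCheck` and its methods are hypothetical helpers, not part of the Logstash API.

```java
import java.util.Locale;

// Hypothetical helper, not part of the Logstash API.
final class PluginNameCheck {

    // Lower-cases the input and strips underscores.
    static String normalize(String s) {
        return s.replace("_", "").toLowerCase(Locale.ROOT);
    }

    // True when the annotation name and the class name agree
    // once casing and underscores are ignored.
    static boolean matches(String pluginName, String simpleClassName) {
        return normalize(pluginName).equals(normalize(simpleClassName));
    }
}
```

Under that assumption, `java_codec_example` matches `JavaCodecExample`, while `java_codec` does not.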

[float]
===== Plugin settings

The snippet below contains both the setting definition and the method referencing it:

[source,java]
-----
public static final PluginConfigSpec<String> DELIMITER_CONFIG =
        PluginConfigSpec.stringSetting("delimiter", ",");

@Override
public Collection<PluginConfigSpec<?>> configSchema() {
    return Collections.singletonList(DELIMITER_CONFIG);
}
-----

The `PluginConfigSpec` class allows developers to specify the settings that a
plugin supports, complete with setting name, data type, deprecation status,
required status, and default value. In this example, the `delimiter` setting
defines the delimiter on which the codec will split events. It is not a required
setting; if it is not explicitly set, it defaults to `,`.

The `configSchema` method must return a list of all settings that the plugin
supports. The Logstash execution engine will validate that all required
settings are present and that no unsupported settings are present.

[float]
===== Constructor and initialization

[source,java]
-----
private final String id;
private final String delimiter;
private final CharsetEncoder encoder;

public JavaCodecExample(final Configuration config, final Context context) {
    this(config.get(DELIMITER_CONFIG));
}

private JavaCodecExample(String delimiter) {
    this.id = UUID.randomUUID().toString();
    this.delimiter = delimiter;
    this.encoder = Charset.defaultCharset().newEncoder();
}
-----

All Java codec plugins must have a constructor taking a `Configuration` and a
`Context` argument. This is the constructor that will be used to instantiate
them at runtime. The retrieval and validation of all plugin settings should
occur in this constructor. In this example, the delimiter is retrieved from its
setting and stored in a field so that it can be used later in the `decode` and
`encode` methods. The codec's ID is initialized to a random UUID (as should be
done for most codecs), and the `encoder` field is initialized with a
`CharsetEncoder` for the JVM's default character set, which the `encode` method
uses to convert events to bytes.

Any additional initialization may occur in the constructor as well. If there are
any unrecoverable errors encountered in the configuration or initialization of
the codec plugin, a descriptive exception should be thrown. The exception will
be logged and will prevent Logstash from starting.
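As a rough illustration of validating a setting at construction time, the sketch below rejects an empty delimiter with a descriptive exception. It also addresses a detail of the example codec: `String.split` interprets its argument as a regular expression, so a delimiter such as `|` or `.` would not be matched literally. `DelimiterValidation` and `literalDelimiter` are hypothetical names, and the choice of `IllegalArgumentException` is an assumption rather than a Logstash requirement.

```java
import java.util.regex.Pattern;

// Hypothetical helper that a codec constructor could call; not part of the Logstash API.
final class DelimiterValidation {

    // Returns a pattern that matches the delimiter literally,
    // or throws a descriptive exception for an unusable value.
    static Pattern literalDelimiter(String delimiter) {
        if (delimiter == null || delimiter.isEmpty()) {
            throw new IllegalArgumentException(
                    "The 'delimiter' setting must be a non-empty string");
        }
        // Pattern.quote escapes regex metacharacters such as '|' or '.'
        return Pattern.compile(Pattern.quote(delimiter));
    }
}
```

A constructor could store the returned `Pattern` in a field and call `pattern.split(input)` in `decode` instead of `input.split(delimiter)`.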

[float]
==== Codec methods

[source,java]
-----
@Override
public void decode(ByteBuffer byteBuffer, Consumer<Map<String, Object>> consumer) {
    // a not-production-grade delimiter decoder
    byte[] byteInput = new byte[byteBuffer.remaining()];
    byteBuffer.get(byteInput);
    if (byteInput.length > 0) {
        String input = new String(byteInput);
        String[] split = input.split(delimiter);
        for (String s : split) {
            Map<String, Object> map = new HashMap<>();
            map.put("message", s);
            consumer.accept(map);
        }
    }
}

@Override
public void flush(ByteBuffer byteBuffer, Consumer<Map<String, Object>> consumer) {
    decode(byteBuffer, consumer); // this is a simplistic implementation
}

@Override
public boolean encode(Event event, ByteBuffer buffer) throws EncodeException {
    try {
        if (currentEncodedEvent != null && event != currentEncodedEvent) {
            throw new EncodeException("New event supplied before encoding of previous event was completed");
        } else if (currentEncodedEvent == null) {
            currentEncoding = CharBuffer.wrap(event.toString() + delimiter);
        }

        CoderResult result = encoder.encode(currentEncoding, buffer, true);
        buffer.flip();
        if (result.isError()) {
            result.throwException();
        }

        if (result.isOverflow()) {
            currentEncodedEvent = event;
            return false;
        } else {
            currentEncodedEvent = null;
            return true;
        }
    } catch (IOException e) {
        throw new IllegalStateException(e);
    }
}
-----

The `decode`, `flush`, and `encode` methods provide the core functionality of
the codec. Codecs may be used by inputs to decode a sequence or stream of bytes
into events or by outputs to encode events into a sequence of bytes.

The `decode` method decodes events from the specified `ByteBuffer` and passes
them to the provided `Consumer`. The input must provide a `ByteBuffer` that is
ready for reading with `byteBuffer.position()` indicating the next position to
read and `byteBuffer.limit()` indicating the first byte in the buffer that is
not safe to read. Codecs must ensure that `byteBuffer.position()` reflects the
last-read position before returning control to the input. The input is then
responsible for returning the buffer to write mode via either
`byteBuffer.clear()` or `byteBuffer.compact()` before resuming writes. In the
example above, the `decode` method simply splits the incoming byte stream on the
specified delimiter. A production-grade codec such as
https://github.com/elastic/logstash/blob/6.7/logstash-core/src/main/java/org/logstash/plugins/codecs/Line.java[`java-line`]
would not make the simplifying assumption that the end of the supplied byte
stream corresponds to the end of an event.
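The buffer hand-off described above can be sketched from the input's side using only `java.nio`. This is an illustrative sketch, not code from Logstash: `BufferHandoffSketch`, `handOff`, and `drain` are hypothetical names, and `drain` stands in for a codec's `decode` method that reads everything available.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of the read-mode / write-mode hand-off.
final class BufferHandoffSketch {

    // Stand-in for a codec's decode: consumes every readable byte,
    // which leaves position() equal to limit().
    static String drain(ByteBuffer readable) {
        byte[] bytes = new byte[readable.remaining()];
        readable.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // What an input might do for each chunk of incoming bytes.
    static String handOff(ByteBuffer buffer, byte[] incoming) {
        buffer.put(incoming);   // write mode: position marks the next free slot
        buffer.flip();          // read mode: position = 0, limit = bytes written
        String decoded = drain(buffer);
        buffer.compact();       // back to write mode, keeping any unread bytes
        return decoded;
    }
}
```

Using `compact()` rather than `clear()` preserves bytes that a codec chose not to consume, at the cost of copying them to the start of the buffer.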

The `flush` method works in coordination with the `decode` method to decode all
remaining events from the specified `ByteBuffer` along with any internal state
that may remain after previous calls to the `decode` method. As an example of
internal state that a codec might maintain, consider an input stream of bytes
`event1/event2/event3` with a delimiter of `/`. Due to buffering or other
reasons, the input might supply a partial stream of bytes such as `event1/eve`
to the codec's `decode` method. In this case, the codec could save the first
three characters `eve` of the second event rather than assuming that the
supplied byte stream ends on an event boundary. If the next call to `decode`
supplied the `nt2/ev` bytes, the codec would prepend the saved `eve` bytes to
produce the full `event2` event and then save the remaining `ev` bytes for
decoding when the remainder of the bytes for that event is supplied. A call to
`flush` signals the codec that the supplied bytes represent the end of an event
stream and all remaining bytes should be decoded to events. The `flush` example
above is a simplistic implementation that does not maintain any state about
partially-supplied byte streams across calls to `decode`.
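A decoder that keeps such state could look roughly like the sketch below. It is not the example plugin's code: `BufferingDelimiterDecoder` is a hypothetical class that mirrors the `decode` and `flush` signatures without implementing the `Codec` interface. It also assumes UTF-8 input whose chunk boundaries never split a multi-byte character.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stateful decoder; mirrors the Codec method shapes only.
final class BufferingDelimiterDecoder {
    private final String delimiter;
    // Text received so far that has not yet been terminated by a delimiter.
    private final StringBuilder remainder = new StringBuilder();

    BufferingDelimiterDecoder(String delimiter) {
        this.delimiter = delimiter;
    }

    void decode(ByteBuffer buffer, Consumer<Map<String, Object>> consumer) {
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        // Assumption: chunk boundaries do not split a multi-byte character.
        remainder.append(new String(bytes, StandardCharsets.UTF_8));
        int end;
        while ((end = remainder.indexOf(delimiter)) >= 0) {
            emit(remainder.substring(0, end), consumer);
            remainder.delete(0, end + delimiter.length());
        }
    }

    void flush(ByteBuffer buffer, Consumer<Map<String, Object>> consumer) {
        decode(buffer, consumer);
        if (remainder.length() > 0) {
            // End of stream: whatever is left is the final event.
            emit(remainder.toString(), consumer);
            remainder.setLength(0);
        }
    }

    private static void emit(String message, Consumer<Map<String, Object>> consumer) {
        Map<String, Object> map = new HashMap<>();
        map.put("message", message);
        consumer.accept(map);
    }
}
```

Fed `event1/eve`, then `nt2/ev`, then a `flush` with `ent3`, this sketch emits `event1`, `event2`, and `event3` in that order.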

The `encode` method encodes an event into a sequence of bytes and writes it into
the specified `ByteBuffer`. Under ideal circumstances, the entirety of the
event's encoding will fit into the supplied buffer. In cases where the buffer
has insufficient space to hold the event's encoding, the codec must fill the
buffer with as much of the event's encoding as possible and the `encode` method
must return `false`. The output must then call the `encode` method again with
the same event and a buffer that has more `buffer.remaining()` bytes, which it
typically does by draining the partial encoding from the supplied buffer. This
process must be repeated until the event's entire encoding is written to the
buffer, at which point the `encode` method will return `true`. Attempting to
call this method with a new event before the entirety of the previous event's
encoding has been written to a buffer must result in an `EncodeException`. As
the conceptual inverse of the `decode` method, the `encode` method must return
the buffer in a state from which it can be read, typically by calling
`buffer.flip()` before returning. In the example above, the `encode` method
attempts to write the event's encoding to the supplied buffer. If the buffer
contains sufficient free space, the entirety of the event is written and `true`
is returned. Otherwise, the method writes as much of the event's encoding to the
buffer as possible, returns `false`, and stores the remainder to be written to
the buffer in the next call to the `encode` method.
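The retry loop on the output's side can be sketched with the JDK alone. The sketch below is an approximation under stated assumptions: `EncodeLoopSketch` is a hypothetical class, its `encode` takes a `String` instead of a Logstash `Event`, it always encodes as UTF-8, and the buffer is assumed to hold at least one fully encoded character (4 bytes for UTF-8).

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CoderResult;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a codec plus the output loop that drives it.
final class EncodeLoopSketch {
    private final CharsetEncoder encoder = StandardCharsets.UTF_8.newEncoder();
    private CharBuffer pending; // unwritten part of the event being encoded

    // Mirrors the contract described above, with a String in place of an Event.
    boolean encode(String event, ByteBuffer buffer) {
        if (pending == null) {
            encoder.reset();
            pending = CharBuffer.wrap(event);
        }
        CoderResult result = encoder.encode(pending, buffer, true);
        buffer.flip(); // leave the buffer ready for reading
        if (result.isError()) {
            throw new IllegalStateException("Encoding failed: " + result);
        }
        if (result.isOverflow()) {
            return false; // more bytes remain; the caller must call again
        }
        pending = null;
        return true;
    }

    // Output side: drain the buffer and retry until the codec reports completion.
    static byte[] writeEvent(EncodeLoopSketch codec, String event, ByteBuffer buffer) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        boolean done;
        do {
            buffer.clear(); // return the buffer to write mode
            done = codec.encode(event, buffer);
            while (buffer.hasRemaining()) {
                sink.write(buffer.get());
            }
        } while (!done);
        return sink.toByteArray();
    }
}
```

With a deliberately small buffer such as `ByteBuffer.allocate(4)`, `writeEvent` calls `encode` several times for one event and still produces the complete encoding.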

[float]
==== cloneCodec method

[source,java]
-----
@Override
public Codec cloneCodec() {
    return new JavaCodecExample(this.delimiter);
}
-----

The `cloneCodec` method should return an identical instance of the codec with the exception of its ID. Because codecs
may be stateful across calls to their `decode` methods, input plugins that are multi-threaded should use a separate
instance of each codec via the `cloneCodec` method for each of their threads. Because a single codec instance is shared
across all pipeline workers in the output stage of the Logstash pipeline, codecs should _not_ retain state across calls
to their `encode` methods. In the example above, the codec is cloned with the same delimiter but a different ID.
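The cloning behavior described above can be illustrated with a minimal sketch. `CloneSketch` is a hypothetical class that mimics only the relevant parts of a codec (configuration, ID, and per-instance decode state); it does not implement the Logstash `Codec` interface.

```java
import java.util.UUID;

// Hypothetical stand-in for a stateful codec; not the Logstash Codec interface.
final class CloneSketch {
    private final String id = UUID.randomUUID().toString();
    private final String delimiter;
    // Per-instance decode state, for example a partially received event.
    private final StringBuilder partial = new StringBuilder();

    CloneSketch(String delimiter) {
        this.delimiter = delimiter;
    }

    // Same configuration, fresh ID, and no shared decode state.
    CloneSketch cloneCodec() {
        return new CloneSketch(delimiter);
    }

    String getId() {
        return id;
    }

    String getDelimiter() {
        return delimiter;
    }

    void remember(String fragment) {
        partial.append(fragment);
    }

    String remembered() {
        return partial.toString();
    }
}
```

A multi-threaded input would call `cloneCodec()` once per thread so that a fragment remembered on one thread never leaks into events decoded on another.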

[float]
==== getId method

[source,java]
-----
@Override
public String getId() {
    return id;
}
-----

For codec plugins, the `getId` method should always return the ID that was set at instantiation time. This is typically
a UUID.

[float]
==== Unit tests

Finally, unit tests are strongly encouraged.
The example codec plugin includes an
https://github.com/logstash-plugins/logstash-codec-java_codec_example/blob/master/src/test/java/org/logstash/javaapi/JavaCodecExampleTest.java[example unit test]
that you can use as a template for your own.

// Pulls in shared section about Packaging and Deploying
include::include/javapluginpkg.asciidoc[]

[float]
=== Run Logstash with the Java codec plugin

To test the plugin, start Logstash with:

[source,shell]
-----
echo "foo,bar" | bin/logstash --java-execution -e 'input { java_stdin { codec => java_codec_example } }'
-----

NOTE: The `--java-execution` flag to enable the Java execution engine is required as Java plugins are not supported
in the Ruby execution engine.

The expected Logstash output (excluding initialization) with the configuration above is:

[source,txt]
-----
{
      "@version" => "1",
       "message" => "foo",
    "@timestamp" => yyyy-MM-ddThh:mm:ss.SSSZ,
          "host" => "<yourHostName>"
}
{
      "@version" => "1",
       "message" => "bar\n",
    "@timestamp" => yyyy-MM-ddThh:mm:ss.SSSZ,
          "host" => "<yourHostName>"
}
-----

[float]
=== Feedback

If you have any feedback on Java plugin support in Logstash, please comment on our
https://github.com/elastic/logstash/issues/9215[main GitHub issue] or post in the
https://discuss.elastic.co/c/logstash[Logstash forum].

:pluginrepo!:
:sversion!:
:plugintypecap!:
:pluginnamecap!:
:pluginname!:
:pluginclass!:
:plugintype!: