Native support for Java plugins (beta) (#10232)

This commit is contained in:
Dan Hermann 2019-02-04 11:36:36 -06:00 committed by GitHub
parent f08b8c5076
commit 48ee9987cc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
54 changed files with 2536 additions and 677 deletions

View file

@ -25,8 +25,6 @@ module LogStash; class JavaPipeline < JavaBasePipeline
@worker_threads = []
@java_inputs_controller = org.logstash.execution.InputsController.new(lir_execution.javaInputs)
@drain_queue = settings.get_value("queue.drain") || settings.get("queue.type") == "memory"
@events_filtered = java.util.concurrent.atomic.LongAdder.new
@ -258,8 +256,13 @@ module LogStash; class JavaPipeline < JavaBasePipeline
end
def wait_inputs
@input_threads.each(&:join)
@java_inputs_controller.awaitStop
@input_threads.each do |thread|
if thread.class == Java::JavaObject
thread.to_java.join
else
thread.join
end
end
end
def start_inputs
@ -278,11 +281,14 @@ module LogStash; class JavaPipeline < JavaBasePipeline
# then after all input plugins are successfully registered, start them
inputs.each { |input| start_input(input) }
@java_inputs_controller.startInputs(self)
end
def start_input(plugin)
@input_threads << Thread.new { inputworker(plugin) }
if plugin.class == LogStash::JavaInputDelegator
@input_threads << plugin.start
else
@input_threads << Thread.new { inputworker(plugin) }
end
end
def inputworker(plugin)
@ -344,7 +350,6 @@ module LogStash; class JavaPipeline < JavaBasePipeline
def stop_inputs
@logger.debug("Closing inputs", default_logging_keys)
inputs.each(&:do_stop)
@java_inputs_controller.stopInputs
@logger.debug("Closed inputs", default_logging_keys)
end
@ -393,7 +398,7 @@ module LogStash; class JavaPipeline < JavaBasePipeline
end
def plugin_threads_info
input_threads = @input_threads.select {|t| t.alive? }
input_threads = @input_threads.select {|t| t.class == Thread && t.alive? }
worker_threads = @worker_threads.select {|t| t.alive? }
(input_threads + worker_threads).map {|t| Util.thread_info(t) }
end

View file

@ -98,11 +98,12 @@ describe LogStash::Event do
expect(subject.sprintf("bonjour")).to eq("bonjour")
end
it "should raise error when formatting %{+%s} when @timestamp field is missing" do
it "should not raise error and should format as empty string when @timestamp field is missing" do
str = "hello-%{+%s}"
subj = subject.clone
subj.remove("[@timestamp]")
expect{ subj.sprintf(str) }.to raise_error(LogStash::Error)
expect{ subj.sprintf(str) }.not_to raise_error(LogStash::Error)
expect(subj.sprintf(str)).to eq("hello-")
end
it "should report a time with %{+format} syntax", :if => RUBY_ENGINE == "jruby" do
@ -115,11 +116,11 @@ describe LogStash::Event do
expect(subject.sprintf("foo %{+YYYY-MM-dd} %{type}")).to eq("foo 2013-01-01 sprintf")
end
it "should raise error with %{+format} syntax when @timestamp field is missing", :if => RUBY_ENGINE == "jruby" do
it "should not raise error with %{+format} syntax when @timestamp field is missing", :if => RUBY_ENGINE == "jruby" do
str = "logstash-%{+YYYY}"
subj = subject.clone
subj.remove("[@timestamp]")
expect{ subj.sprintf(str) }.to raise_error(LogStash::Error)
expect{ subj.sprintf(str) }.not_to raise_error(LogStash::Error)
end
it "should report fields with %{field} syntax" do

View file

@ -0,0 +1,86 @@
package co.elastic.logstash.api;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.function.Consumer;
/**
* Logstash Java codec interface. Logstash codecs may be used by inputs to decode a sequence or stream of bytes
* into events or by outputs to encode events into a sequence of bytes.
*/
public interface Codec extends Plugin {
/**
* Decodes events from the specified {@link ByteBuffer} and passes them to the provided
* {@link Consumer}.
*
* <ul>
* <li>The client (typically an {@link Input}) must provide a {@link ByteBuffer} that
* is ready for reading, with {@link ByteBuffer#position} indicating the next
* position to read and {@link ByteBuffer#limit} indicating the first byte in the
* buffer that is not safe to read.</li>
*
* <li>Implementations of {@link Codec} must ensure that {@link ByteBuffer#position}
* reflects the last-read position before returning control.</li>
*
* <li>The client is then responsible for returning the buffer
* to write mode via either {@link ByteBuffer#clear} or {@link ByteBuffer#compact} before
* resuming writes.</li>
* </ul>
*
* @param buffer Input buffer from which events will be decoded.
* @param eventConsumer Consumer to which decoded events will be passed.
*/
void decode(ByteBuffer buffer, Consumer<Map<String, Object>> eventConsumer);
/**
* Decodes all remaining events from the specified {@link ByteBuffer} along with any internal
* state that may remain after previous calls to {@link #decode(ByteBuffer, Consumer)}.
* Typically called once at end-of-stream / input shutdown.
* @param buffer Input buffer from which events will be decoded.
* @param eventConsumer Consumer to which decoded events will be passed.
*/
void flush(ByteBuffer buffer, Consumer<Map<String, Object>> eventConsumer);
/**
* Encodes an {@link Event} and writes it into the specified {@link ByteBuffer}. Under ideal
* circumstances, the entirety of the event's encoding will fit into the supplied buffer. In cases
* where the buffer has insufficient space to hold the event's encoding, the buffer will be filled
* with as much of the event's encoding as possible, {@code false} will be returned, and the caller
* must call this method with the same event and a buffer that has more {@link Buffer#remaining()}
* bytes. That is typically done by draining the partial encoding from the supplied buffer. This
* process must be repeated until the event's entire encoding is written to the buffer at which
* point the method will return {@code true}. Attempting to call this method with a new event
* before the entirety of the previous event's encoding has been written to a buffer will result
* in an {@link EncodeException}.
*
* @param event The event to encode.
* @param buffer The buffer into which the encoding of the event should be written. Codec
* implementations are responsible for returning the buffer in a state from which it
* can be read, typically by calling {@link Buffer#flip()} before returning.
* @return {@code true} if the entirety or final segment of the event's encoding was written to
* the buffer. {@code false} if the buffer was incapable of holding the entirety or remainder of the
* event's encoding.
* @throws EncodeException if called with a new event before the entirety of the previous event's
* encoding was written to a buffer.
*/
boolean encode(Event event, ByteBuffer buffer) throws EncodeException;
/**
* Clones this {@link Codec}. All codecs should be capable of cloning themselves
* so that distinct instances of each codec can be supplied to multi-threaded
* inputs or outputs in cases where the codec is stateful.
* @return The cloned {@link Codec}.
*/
Codec cloneCodec();
/**
* Checked exception thrown by {@link #encode(Event, ByteBuffer)} when a new event is
* supplied before the previous event's encoding has been fully written out.
*/
class EncodeException extends Exception {
private static final long serialVersionUID = 1L;
public EncodeException(String message) {
super(message);
}
}
}

View file

@ -1,106 +1,37 @@
package co.elastic.logstash.api;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Map;
/**
* Configuration for Logstash Java plugins.
* Set of configuration settings for each plugin as read from the Logstash pipeline configuration.
*/
public final class Configuration {
private final Map<String, Object> rawSettings;
public interface Configuration {
/**
* @param raw Configuration Settings Map. Values are serialized.
* Strongly-typed accessor for a configuration setting.
* @param configSpec The setting specification for which to retrieve the setting value.
* @param <T> The type of the setting value to be retrieved.
* @return The value of the setting for the specified setting specification.
*/
public Configuration(final Map<String, Object> raw) {
this.rawSettings = raw;
}
<T> T get(PluginConfigSpec<T> configSpec);
@SuppressWarnings("unchecked")
public <T> T get(final PluginConfigSpec<T> configSpec) {
if (rawSettings.containsKey(configSpec.name())) {
Object o = rawSettings.get(configSpec.name());
if (configSpec.type().isAssignableFrom(o.getClass())) {
return (T) o;
} else {
throw new IllegalStateException(
String.format("Setting value for '%s' of type '%s' incompatible with defined type of '%s'",
configSpec.name(), o.getClass(), configSpec.type()));
}
} else {
return configSpec.defaultValue();
}
}
/**
* Weakly-typed accessor for a configuration setting.
* @param configSpec The setting specification for which to retrieve the setting value.
* @return The weakly-typed value of the setting for the specified setting specification.
*/
Object getRawValue(PluginConfigSpec<?> configSpec);
public Object getRawValue(final PluginConfigSpec<?> configSpec) {
return rawSettings.get(configSpec.name());
}
/**
* @param configSpec The setting specification for which to search.
* @return {@code true} if a value for the specified setting specification exists in
* this {@link Configuration}.
*/
boolean contains(PluginConfigSpec<?> configSpec);
public boolean contains(final PluginConfigSpec<?> configSpec) {
return rawSettings.containsKey(configSpec.name());
}
public Collection<String> allKeys() {
return rawSettings.keySet();
}
public static PluginConfigSpec<String> stringSetting(final String name) {
return new PluginConfigSpec<>(
name, String.class, null, false, false
);
}
public static PluginConfigSpec<String> stringSetting(final String name, final String defaultValue) {
return new PluginConfigSpec<>(
name, String.class, defaultValue, false, false
);
}
public static PluginConfigSpec<String> requiredStringSetting(final String name) {
return new PluginConfigSpec<>(name, String.class, null, false, true);
}
public static PluginConfigSpec<Long> numSetting(final String name) {
return new PluginConfigSpec<>(
name, Long.class, null, false, false
);
}
public static PluginConfigSpec<Long> numSetting(final String name, final long defaultValue) {
return new PluginConfigSpec<>(
name, Long.class, defaultValue, false, false
);
}
public static PluginConfigSpec<Path> pathSetting(final String name) {
return new PluginConfigSpec<>(name, Path.class, null, false, false);
}
public static PluginConfigSpec<Boolean> booleanSetting(final String name) {
return new PluginConfigSpec<>(name, Boolean.class, null, false, false);
}
@SuppressWarnings({"unchecked","rawtypes"})
public static PluginConfigSpec<Map<String, String>> hashSetting(final String name) {
return new PluginConfigSpec(name, Map.class, null, false, false);
}
@SuppressWarnings({"unchecked","rawtypes"})
public static <T> PluginConfigSpec<Map<String, T>> requiredFlatHashSetting(
final String name, Class<T> type) {
//TODO: enforce subtype
return new PluginConfigSpec(
name, Map.class, null, false, true
);
}
@SuppressWarnings({"unchecked","rawtypes"})
public static PluginConfigSpec<Map<String, Configuration>> requiredNestedHashSetting(
final String name, final Collection<PluginConfigSpec<?>> spec) {
return new PluginConfigSpec(
name, Map.class, null, false, true, spec
);
}
/**
* @return Collection of the names of all settings in this configuration as reported by
* {@link PluginConfigSpec#name()}.
*/
Collection<String> allKeys();
}

View file

@ -1,13 +1,31 @@
package co.elastic.logstash.api;
import org.apache.logging.log4j.Logger;
import org.logstash.common.io.DeadLetterQueueWriter;
/**
* Holds Logstash Environment.
* Provides Logstash context to plugins.
*/
public final class Context {
public interface Context {
/**
* Provides a dead letter queue (DLQ) writer, if configured, to output plugins. If no DLQ writer
* is configured or the plugin is not an output, {@code null} will be returned.
* @return {@link DeadLetterQueueWriter} instance if available or {@code null} otherwise.
*/
DeadLetterQueueWriter getDlqWriter();
/**
* Provides a {@link Logger} instance to plugins.
* @param plugin The plugin for which the logger should be supplied.
* @return The supplied Logger instance.
*/
Logger getLogger(Plugin plugin);
/**
* Provides an {@link EventFactory} for constructing instances of {@link Event}.
* @return The event factory.
*/
EventFactory getEventFactory();
public DeadLetterQueueWriter dlqWriter() {
return null;
}
}

View file

@ -0,0 +1,49 @@
package co.elastic.logstash.api;
import java.io.IOException;
import java.time.Instant;
import java.util.Map;
/**
* Event interface for Java plugins. Java plugins should not rely on the implementation details of any
* concrete implementations of the Event interface.
*/
public interface Event {
/**
* @return The event's field data as a map. NOTE(review): whether metadata is included
* is implementation-defined — confirm against the concrete Event implementation.
*/
Map<String, Object> getData();
/**
* @return The event's metadata as a map.
*/
Map<String, Object> getMetadata();
/**
* Marks this event as cancelled.
*/
void cancel();
/**
* Clears a previous cancellation of this event.
*/
void uncancel();
/**
* @return {@code true} if this event has been cancelled.
*/
boolean isCancelled();
/**
* @return The event's timestamp as an {@link Instant}.
*/
Instant getEventTimestamp();
/**
* Sets the event's timestamp.
* @param t New timestamp for the event.
*/
void setEventTimestamp(Instant t);
/**
* Retrieves the value for the given field reference.
* @param reference Field reference.
* @return The field's value.
*/
Object getField(String reference);
/**
* Retrieves the value for the given field reference without value conversion.
* NOTE(review): the exact conversion semantics relative to {@link #getField(String)}
* are implementation-defined — confirm against the concrete Event implementation.
* @param reference Field reference.
* @return The field's unconverted value.
*/
Object getUnconvertedField(String reference);
/**
* Sets the value for the given field reference.
* @param reference Field reference.
* @param value Value to set.
*/
void setField(String reference, Object value);
/**
* @param field Field reference to test for.
* @return {@code true} if the event includes the given field.
*/
boolean includes(String field);
/**
* @return Map representation of this event.
*/
Map<String, Object> toMap();
/**
* Overwrites this event with data from the supplied event.
* @param e Event supplying the overwriting data.
* @return The resulting event.
*/
Event overwrite(Event e);
/**
* Appends data from the supplied event to this event.
* @param e Event supplying the data to append.
* @return The resulting event.
*/
Event append(Event e);
/**
* Removes the field at the given path.
* @param path Field reference to remove.
* @return The value previously at that path, if any.
*/
Object remove(String path);
/**
* Formats this event according to the supplied Logstash sprintf-style format string.
* @param s Format string.
* @return The formatted string.
* @throws IOException If formatting fails.
*/
String sprintf(String s) throws IOException;
/**
* @return A copy of this event.
*/
Event clone();
String toString();
/**
* Adds the supplied value to this event's tags.
* @param tag Tag to add.
*/
void tag(String tag);
}

View file

@ -0,0 +1,18 @@
package co.elastic.logstash.api;
import java.io.Serializable;
import java.util.Map;
public interface EventFactory {
/**
* @return New and empty event.
*/
Event newEvent();
/**
* @param data Map from which the new event should copy its data.
* @return New event copied from the supplied map data.
*/
Event newEvent(final Map<? extends Serializable, Object> data);
}

View file

@ -0,0 +1,68 @@
package co.elastic.logstash.api;
import java.util.Collection;
import java.util.Collections;
/**
* Logstash Java filter interface. Logstash filters may perform a variety of actions on events as they flow
* through the Logstash event pipeline including:
*
* <ul>
* <li>Mutation -- Fields in events may be added, removed, or changed by a filter. This is the most common scenario
* for filters that perform various kinds of enrichment on events.</li>
* <li>Deletion -- Events may be removed from the event pipeline by a filter so that subsequent filters and outputs
* do not receive them.</li>
* <li>Creation -- A filter may insert new events into the event pipeline that will be seen only by subsequent
* filters and outputs.</li>
* <li>Observation -- Events may pass unchanged by a filter through the event pipeline. This may be useful in
* scenarios where a filter performs external actions (e.g., updating an external cache) based on the events observed
* in the event pipeline.</li>
* </ul>
*/
public interface Filter extends Plugin {
/**
* Events from the event pipeline are presented for filtering through this method. If the filter either mutates
* the events in-place or simply observes them, the incoming collection of events may be returned without
* modification. If the filter creates new events, those new events must be added to the returned collection.
* If the filter deletes events, the deleted events must be removed from the returned collection.
* @param events Collection of events to be filtered.
* @param matchListener Filter match listener to be notified for each matching event. See
* {@link FilterMatchListener} for more details.
* @return Collection of filtered events.
*/
Collection<Event> filter(Collection<Event> events, FilterMatchListener matchListener);
/**
* If this filter maintains state between calls to {@link #filter(Collection, FilterMatchListener)}, this
* method should return events for all state currently held by the filter. This method will never be called
* by the Logstash execution engine unless {@link #requiresFlush()} returns {@code true} for this filter.
* @param matchListener Filter match listener to be notified for each matching event. See
* {@link FilterMatchListener} for more details.
* @return Collection of events for all state currently held by the filter.
*/
default Collection<Event> flush(FilterMatchListener matchListener) {
return Collections.emptyList();
}
/**
* @return {@code true} if this filter maintains state between calls to
* {@link #filter(Collection, FilterMatchListener)} and therefore requires a flush upon pipeline
* shutdown to return the final events from the filter. The default implementation returns {@code false}
* as is appropriate for stateless filters.
*/
default boolean requiresFlush() {
return false;
}
/**
* @return {@code true} if this filter maintains state between calls to
* {@link #filter(Collection, FilterMatchListener)} and requires periodic calls to flush events from the filter.
* If {@code true}, {@link #requiresFlush()} must also return {@code true} for this filter. The default
* implementation returns {@code false} as is appropriate for stateless filters.
*/
default boolean requiresPeriodicFlush() {
return false;
}
}

View file

@ -0,0 +1,18 @@
package co.elastic.logstash.api;
/**
* Mechanism by which filters indicate which events "match". The common actions for filters such as {@code add_field}
* and {@code add_tag} are applied only to events that are designated as "matching". Some filters such as the
* <a href="https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html">grok filter</a> have a clear
* definition for what constitutes a matching event and will notify the listener only for matching events. Other
* filters such as the <a href="https://www.elastic.co/guide/en/logstash/current/plugins-filters-uuid.html">UUID
* filter</a> have no specific match criteria and should notify the listener for every event filtered.
*/
public interface FilterMatchListener {
/**
* Notify the filter match listener that the specified event "matches" the filter criteria.
* @param e Event that matches the filter criteria.
*/
void filterMatched(Event e);
}

View file

@ -0,0 +1,43 @@
package co.elastic.logstash.api;
import java.util.Map;
import java.util.function.Consumer;
/**
* Logstash Java input interface. Inputs produce events that flow through the Logstash event pipeline. Inputs are
* flexible and may produce events through many different mechanisms including:
*
* <ul>
* <li>a pull mechanism such as periodic queries of an external database</li>
* <li>a push mechanism such as events sent from clients to a local network port</li>
* <li>a timed computation such as a heartbeat</li>
* </ul>
*
* or any other mechanism that produces a useful stream of events. Event streams may be either finite or infinite.
* Logstash will run as long as any one of its inputs is still producing events.
*/
public interface Input extends Plugin {
/**
* Start the input and begin pushing events to the supplied {@link Consumer} instance. If the input produces
* an infinite stream of events, this method should loop until a {@link #stop()} request is made. If the
* input produces a finite stream of events, this method should terminate when the last event in the stream
* is produced.
* @param writer Consumer to which events should be pushed
*/
void start(Consumer<Map<String, Object>> writer);
/**
* Notifies the input to stop producing events. Inputs stop both asynchronously and cooperatively. Use the
* {@link #awaitStop()} method to block until the input has completed the stop process.
*/
void stop();
/**
* Blocks until the input has stopped producing events. Note that this method should <b>not</b> signal the
* input to stop as the {@link #stop()} method does.
* @throws InterruptedException On Interrupt
*/
void awaitStop() throws InterruptedException;
}

View file

@ -6,11 +6,15 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Logstash plugin annotation for finding plugins on the classpath and setting their name as used
* in the configuration syntax.
* Annotates a Logstash Java plugin. The value returned from {@link #name()} defines the name of the plugin as
* used in the Logstash pipeline configuration.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface LogstashPlugin {
/**
* @return Name of the plugin as used in the Logstash pipeline configuration.
*/
String name();
}

View file

@ -0,0 +1,31 @@
package co.elastic.logstash.api;
import java.util.Collection;
/**
* Logstash Java output interface. Outputs may send events to local sinks such as the console or a file or to remote
* systems such as Elasticsearch or other external systems.
*/
public interface Output extends Plugin {
/**
* Outputs Collection of {@link Event}.
* @param events Events to be sent through the output.
*/
void output(Collection<Event> events);
/**
* Notifies the output to stop sending events. Outputs with connections to external systems or other resources
* requiring cleanup should perform those tasks upon a stop notification. Outputs stop both asynchronously and
* cooperatively. Use the {@link #awaitStop()} method to block until an output has completed the stop process.
*/
void stop();
/**
* Blocks until the output has stopped sending events. Note that this method should <b>not</b> signal the
* output to stop as the {@link #stop()} method does.
* @throws InterruptedException On Interrupt
*/
void awaitStop() throws InterruptedException;
}

View file

@ -2,7 +2,33 @@ package co.elastic.logstash.api;
import java.util.Collection;
/**
* Base interface for Logstash Java plugins.
*/
public interface Plugin {
/**
* Provides all valid settings for this plugin as a collection of {@link PluginConfigSpec}. This will be used
* to validate against the configuration settings that are supplied to this plugin at runtime.
* @return Valid settings for this plugin.
*/
Collection<PluginConfigSpec<?>> configSchema();
/**
* @return Name for this plugin. The default implementation uses the name specified in the {@link LogstashPlugin}
* annotation, if available, and the class name otherwise.
*/
default String getName() {
LogstashPlugin annotation = getClass().getDeclaredAnnotation(LogstashPlugin.class);
return (annotation != null && !annotation.name().equals(""))
? annotation.name()
: getClass().getName();
}
/**
* @return ID for the plugin. Input, filter, and output plugins must return the ID value that was supplied
* to them at construction time. Codec plugins should generally create their own UUID at instantiation time
* and supply that as their ID.
*/
String getId();
}

View file

@ -2,8 +2,14 @@ package co.elastic.logstash.api;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* Plugin configuration specification. Allows the name, type, deprecation status, required status, and default
* value for each configuration setting to be defined.
* @param <T> The expected type of the setting value.
*/
public final class PluginConfigSpec<T> {
private final String name;
@ -16,14 +22,16 @@ public final class PluginConfigSpec<T> {
private final T defaultValue;
private String rawDefaultValue;
private final Collection<PluginConfigSpec<?>> children;
public PluginConfigSpec(final String name, final Class<T> type,
private PluginConfigSpec(final String name, final Class<T> type,
final T defaultValue, final boolean deprecated, final boolean required) {
this(name, type, defaultValue, deprecated, required, Collections.emptyList());
}
public PluginConfigSpec(final String name, final Class<T> type,
private PluginConfigSpec(final String name, final Class<T> type,
final T defaultValue, final boolean deprecated, final boolean required,
final Collection<PluginConfigSpec<?>> children) {
this.name = name;
@ -37,6 +45,96 @@ public final class PluginConfigSpec<T> {
this.children = children;
}
public static PluginConfigSpec<String> stringSetting(final String name) {
return new PluginConfigSpec<>(
name, String.class, null, false, false
);
}
public static PluginConfigSpec<String> stringSetting(final String name, final String defaultValue) {
return new PluginConfigSpec<>(
name, String.class, defaultValue, false, false
);
}
public static PluginConfigSpec<String> requiredStringSetting(final String name) {
return new PluginConfigSpec<>(name, String.class, null, false, true);
}
public static PluginConfigSpec<String> stringSetting(final String name, final String defaultValue, boolean deprecated, boolean required) {
return new PluginConfigSpec<>(name, String.class, defaultValue, deprecated, required);
}
public static PluginConfigSpec<Codec> codecSetting(final String name) {
return new PluginConfigSpec<>(
name, Codec.class, null, false, false
);
}
public static PluginConfigSpec<Codec> codecSetting(final String name, final String defaultCodecName) {
PluginConfigSpec<Codec> pcs = new PluginConfigSpec<>(
name, Codec.class, null, false, false
);
pcs.rawDefaultValue = defaultCodecName;
return pcs;
}
public static PluginConfigSpec<Codec> codecSetting(final String name, final Codec defaultValue, boolean deprecated, boolean required) {
return new PluginConfigSpec<>(name, Codec.class, defaultValue, deprecated, required);
}
public static PluginConfigSpec<Long> numSetting(final String name) {
return new PluginConfigSpec<>(
name, Long.class, null, false, false
);
}
public static PluginConfigSpec<Long> numSetting(final String name, final long defaultValue) {
return new PluginConfigSpec<>(
name, Long.class, defaultValue, false, false
);
}
public static PluginConfigSpec<Long> numSetting(final String name, final long defaultValue, boolean deprecated, boolean required) {
return new PluginConfigSpec<>(name, Long.class, defaultValue, deprecated, required);
}
public static PluginConfigSpec<Boolean> booleanSetting(final String name) {
return new PluginConfigSpec<>(name, Boolean.class, null, false, false);
}
public static PluginConfigSpec<Boolean> booleanSetting(final String name, final boolean defaultValue) {
return new PluginConfigSpec<>(name, Boolean.class, defaultValue, false, false);
}
public static PluginConfigSpec<Boolean> requiredBooleanSetting(final String name) {
return new PluginConfigSpec<>(name, Boolean.class, null, false, true);
}
public static PluginConfigSpec<Boolean> booleanSetting(final String name, final boolean defaultValue, boolean deprecated, boolean required) {
return new PluginConfigSpec<>(name, Boolean.class, defaultValue, deprecated, required);
}
@SuppressWarnings({"unchecked","rawtypes"})
public static PluginConfigSpec<Map<String, Object>> hashSetting(final String name) {
return new PluginConfigSpec(name, Map.class, null, false, false);
}
@SuppressWarnings({"unchecked","rawtypes"})
public static PluginConfigSpec<Map<String, Object>> hashSetting(final String name, Map<String, Object> defaultValue, boolean deprecated, boolean required) {
return new PluginConfigSpec(name, Map.class, defaultValue, deprecated, required);
}
@SuppressWarnings({"unchecked","rawtypes"})
public static PluginConfigSpec<List<Object>> arraySetting(final String name) {
return new PluginConfigSpec(name, List.class, null, false, false);
}
@SuppressWarnings({"unchecked","rawtypes"})
public static PluginConfigSpec<List<Object>> arraySetting(final String name, List<Object> defaultValue, boolean deprecated, boolean required) {
return new PluginConfigSpec(name, List.class, defaultValue, deprecated, required);
}
public Collection<PluginConfigSpec<?>> children() {
return children;
}
@ -61,4 +159,7 @@ public final class PluginConfigSpec<T> {
return type;
}
public String getRawDefaultValue() {
return rawDefaultValue;
}
}

View file

@ -3,115 +3,112 @@ package co.elastic.logstash.api;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* Utility methods for specifying common plugin config settings.
*/
public final class PluginHelper {
public static final PluginConfigSpec<Map<String, String>> ADD_FIELD_CONFIG =
Configuration.hashSetting("add_field");
public static final PluginConfigSpec<Map<String, Object>> ADD_FIELD_CONFIG =
PluginConfigSpec.hashSetting("add_field");
//public static final PluginConfigSpec<Array> ADD_TAG_CONFIG =
// Configuration.arraySetting("add_tag");
public static final PluginConfigSpec<List<Object>> ADD_TAG_CONFIG =
PluginConfigSpec.arraySetting("add_tag");
public static final PluginConfigSpec<String> CODEC_CONFIG =
Configuration.stringSetting("codec");
public static final PluginConfigSpec<Codec> CODEC_CONFIG =
PluginConfigSpec.codecSetting("codec");
public static final PluginConfigSpec<Boolean> ENABLE_METRIC_CONFIG =
Configuration.booleanSetting("enable_metric");
PluginConfigSpec.booleanSetting("enable_metric");
public static final PluginConfigSpec<String> ID_CONFIG =
Configuration.stringSetting("id");
PluginConfigSpec.stringSetting("id");
public static final PluginConfigSpec<Boolean> PERIODIC_FLUSH_CONFIG =
Configuration.booleanSetting("periodic_flush");
PluginConfigSpec.booleanSetting("periodic_flush");
//public static final PluginConfigSpec<Array> REMOVE_FIELD_CONFIG =
// Configuration.arraySetting("remove_field");
public static final PluginConfigSpec<List<Object>> REMOVE_FIELD_CONFIG =
PluginConfigSpec.arraySetting("remove_field");
//public static final PluginConfigSpec<Array> REMOVE_TAG_CONFIG =
// Configuration.arraySetting("remove_tag");
public static final PluginConfigSpec<List<Object>> REMOVE_TAG_CONFIG =
PluginConfigSpec.arraySetting("remove_tag");
//public static final PluginConfigSpec<Array> TAGS_CONFIG =
// Configuration.arraySetting("tags");
public static final PluginConfigSpec<List<Object>> TAGS_CONFIG =
PluginConfigSpec.arraySetting("tags");
public static final PluginConfigSpec<String> TYPE_CONFIG =
Configuration.stringSetting("type");
PluginConfigSpec.stringSetting("type");
/**
* @return Options that are common to all input plugins.
* @return Settings that are common to all input plugins.
*/
@SuppressWarnings("unchecked")
public static Collection<PluginConfigSpec<?>> commonInputOptions() {
return commonInputOptions(Collections.EMPTY_LIST);
public static Collection<PluginConfigSpec<?>> commonInputSettings() {
return Arrays.asList(ADD_FIELD_CONFIG, ENABLE_METRIC_CONFIG, CODEC_CONFIG, ID_CONFIG,
TAGS_CONFIG, TYPE_CONFIG);
}
/**
* Combines the provided list of options with the options that are common to all input plugins
* Combines the provided list of settings with the settings that are common to all input plugins
* ignoring any that are already present in the provided list. This allows plugins to override
* defaults and other values on the common config options.
* @param options provided list of options.
* @return combined list of options.
* defaults and other values on the common config settings.
* @param settings provided list of settings.
* @return combined list of settings.
*/
public static Collection<PluginConfigSpec<?>> commonInputOptions(Collection<PluginConfigSpec<?>> options) {
return combineOptions(options, Arrays.asList(ADD_FIELD_CONFIG, ENABLE_METRIC_CONFIG,
CODEC_CONFIG, ID_CONFIG, /*TAGS_CONFIG,*/ TYPE_CONFIG));
public static Collection<PluginConfigSpec<?>> commonInputSettings(Collection<PluginConfigSpec<?>> settings) {
return combineSettings(settings, commonInputSettings());
}
/**
* @return Options that are common to all output plugins.
* @return Settings that are common to all output plugins.
*/
@SuppressWarnings("unchecked")
public static Collection<PluginConfigSpec<?>> commonOutputOptions() {
return commonOutputOptions(Collections.EMPTY_LIST);
public static Collection<PluginConfigSpec<?>> commonOutputSettings() {
return Arrays.asList(ENABLE_METRIC_CONFIG, CODEC_CONFIG, ID_CONFIG);
}
/**
* Combines the provided list of options with the options that are common to all output plugins
* Combines the provided list of settings with the settings that are common to all output plugins
* ignoring any that are already present in the provided list. This allows plugins to override
* defaults and other values on the common config options.
* @param options provided list of options.
* @return combined list of options.
* defaults and other values on the common config settings.
* @param settings provided list of settings.
* @return combined list of settings.
*/
public static Collection<PluginConfigSpec<?>> commonOutputOptions(Collection<PluginConfigSpec<?>> options) {
return combineOptions(options, Arrays.asList(ENABLE_METRIC_CONFIG, CODEC_CONFIG, ID_CONFIG));
public static Collection<PluginConfigSpec<?>> commonOutputSettings(Collection<PluginConfigSpec<?>> settings) {
return combineSettings(settings, commonOutputSettings());
}
/**
* @return Options that are common to all filter plugins.
* @return Settings that are common to all filter plugins.
*/
@SuppressWarnings("unchecked")
public static Collection<PluginConfigSpec<?>> commonFilterOptions() {
return commonFilterOptions(Collections.EMPTY_LIST);
public static Collection<PluginConfigSpec<?>> commonFilterSettings() {
return Arrays.asList(ADD_FIELD_CONFIG, ADD_TAG_CONFIG, ENABLE_METRIC_CONFIG, ID_CONFIG,
PERIODIC_FLUSH_CONFIG , REMOVE_FIELD_CONFIG, REMOVE_TAG_CONFIG);
}
/**
* Combines the provided list of options with the options that are common to all filter plugins
* Combines the provided list of settings with the settings that are common to all filter plugins
* ignoring any that are already present in the provided list. This allows plugins to override
* defaults and other values on the common config options.
* @param options provided list of options.
* @return combined list of options.
* defaults and other values on the common config settings.
* @param settings provided list of settings.
* @return combined list of settings.
*/
public static Collection<PluginConfigSpec<?>> commonFilterOptions(Collection<PluginConfigSpec<?>> options) {
return combineOptions(options, Arrays.asList(ADD_FIELD_CONFIG, /*ADD_TAG_CONFIG,*/
ENABLE_METRIC_CONFIG, ID_CONFIG, PERIODIC_FLUSH_CONFIG /*, REMOVE_FIELD_CONFIG,
REMOVE_TAG_CONFIG*/));
public static Collection<PluginConfigSpec<?>> commonFilterSettings(Collection<PluginConfigSpec<?>> settings) {
return combineSettings(settings, commonFilterSettings());
}
@SuppressWarnings("rawtypes")
private static Collection<PluginConfigSpec<?>> combineOptions(
Collection<PluginConfigSpec<?>> providedOptions,
Collection<PluginConfigSpec<?>> commonOptions) {
List<PluginConfigSpec<?>> options = new ArrayList<>();
options.addAll(providedOptions);
for (PluginConfigSpec pcs : commonOptions) {
if (!options.contains(pcs)) {
options.add(pcs);
private static Collection<PluginConfigSpec<?>> combineSettings(
Collection<PluginConfigSpec<?>> providedSettings,
Collection<PluginConfigSpec<?>> commonSettings) {
List<PluginConfigSpec<?>> settings = new ArrayList<>(providedSettings);
for (PluginConfigSpec pcs : commonSettings) {
if (!settings.contains(pcs)) {
settings.add(pcs);
}
}
return options;
return settings;
}
}

View file

@ -1,50 +0,0 @@
package co.elastic.logstash.api.v0;
import co.elastic.logstash.api.Plugin;
import org.logstash.Event;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.function.Consumer;
/**
 * A Logstash Codec. Codecs decode events from a stream of bytes and encode events
 * back to a stream of bytes.
 */
public interface Codec extends Plugin {
/**
 * Decodes events from the specified {@link ByteBuffer} and passes them to the provided
 * {@link Consumer}.
 *
 * <ul>
 * <li>The client (typically an {@link Input}) must provide a {@link ByteBuffer} that
 * is ready for reading with {@link ByteBuffer#position} indicating the next
 * position to read and {@link ByteBuffer#limit} indicating the first byte in the
 * buffer that is not safe to read.</li>
 *
 * <li>Implementations of {@link Codec} must ensure that {@link ByteBuffer#position}
 * reflects the last-read position before returning control.</li>
 *
 * <li>The client is then responsible for returning the buffer
 * to write mode via either {@link ByteBuffer#clear} or {@link ByteBuffer#compact} before
 * resuming writes.</li>
 * </ul>
 *
 * @param buffer Input buffer from which events will be decoded.
 * @param eventConsumer Consumer to which decoded events will be passed.
 */
void decode(ByteBuffer buffer, Consumer<Map<String, Object>> eventConsumer);
/**
 * Decodes all remaining events from the specified {@link ByteBuffer} along with any internal
 * state that may remain after previous calls to {@link #decode(ByteBuffer, Consumer)}.
 * @param buffer Input buffer from which events will be decoded.
 * @param eventConsumer Consumer to which decoded events will be passed.
 */
void flush(ByteBuffer buffer, Consumer<Map<String, Object>> eventConsumer);
/**
 * Encodes an {@link Event} and writes it to the specified {@link OutputStream}.
 * @param event The event to encode.
 * @param output The stream to which the encoded event should be written.
 */
void encode(Event event, OutputStream output);
}

View file

@ -1,15 +0,0 @@
package co.elastic.logstash.api.v0;
import co.elastic.logstash.api.Plugin;
import org.logstash.Event;
import java.util.Collection;
/**
 * A Logstash Filter.
 */
public interface Filter extends Plugin {
/**
 * Filters a batch of events.
 * @param events Events to be filtered.
 * @return Filtered events to be passed to the next stage of the pipeline.
 */
Collection<Event> filter(Collection<Event> events);
}

View file

@ -1,30 +0,0 @@
package co.elastic.logstash.api.v0;
import co.elastic.logstash.api.Plugin;
import org.logstash.execution.queue.QueueWriter;
/**
 * A Logstash Pipeline Input that pushes events to a {@link QueueWriter}.
 */
public interface Input extends Plugin {
/**
 * Starts pushing {@link org.logstash.Event} instances to the given {@link QueueWriter}.
 * @param writer The queue writer to push decoded events to.
 */
void start(QueueWriter writer);
/**
 * Stops the input.
 * Stopping happens asynchronously; use {@link #awaitStop()} to make sure that the input has
 * finished.
 */
void stop();
/**
 * Blocks until the input execution has finished.
 * @throws InterruptedException On Interrupt
 */
void awaitStop() throws InterruptedException;
}

View file

@ -1,23 +0,0 @@
package co.elastic.logstash.api.v0;
import co.elastic.logstash.api.Plugin;
import org.logstash.Event;
import java.util.Collection;
/**
 * A Logstash Pipeline Output.
 */
public interface Output extends Plugin {
/**
 * Outputs Collection of {@link Event}.
 * @param events Events to Output
 */
void output(Collection<Event> events);
/**
 * Stops the output. Presumably stopping happens asynchronously, mirroring
 * {@link Input#stop()}; use {@link #awaitStop()} to wait for completion — confirm
 * with implementations.
 */
void stop();
/**
 * Blocks until the output has finished stopping.
 * @throws InterruptedException On Interrupt
 */
void awaitStop() throws InterruptedException;
}

View file

@ -1,14 +1,6 @@
package org.logstash;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.joda.time.DateTime;
@ -18,10 +10,20 @@ import org.jruby.RubySymbol;
import org.logstash.ackedqueue.Queueable;
import org.logstash.ext.JrubyTimestampExtLibrary;
import java.io.IOException;
import java.io.Serializable;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.logstash.ObjectMappers.CBOR_MAPPER;
import static org.logstash.ObjectMappers.JSON_MAPPER;
public final class Event implements Cloneable, Queueable {
public final class Event implements Cloneable, Queueable, co.elastic.logstash.api.Event {
private boolean cancelled;
private ConvertedMap data;
@ -91,34 +93,52 @@ public final class Event implements Cloneable, Queueable {
}
}
@Override
public ConvertedMap getData() {
return this.data;
}
@Override
public ConvertedMap getMetadata() {
return this.metadata;
}
@Override
public void cancel() {
this.cancelled = true;
}
@Override
public void uncancel() {
this.cancelled = false;
}
@Override
public boolean isCancelled() {
return this.cancelled;
}
public Timestamp getTimestamp() throws IOException {
@Override
public Instant getEventTimestamp() {
Timestamp t = getTimestamp();
return (t != null)
? Instant.ofEpochMilli(t.toEpochMilli())
: null;
}
@Override
public void setEventTimestamp(Instant timestamp) {
setTimestamp(timestamp != null
? new Timestamp(timestamp.toEpochMilli())
: new Timestamp(Instant.now().toEpochMilli()));
}
public Timestamp getTimestamp() {
final JrubyTimestampExtLibrary.RubyTimestamp timestamp =
(JrubyTimestampExtLibrary.RubyTimestamp) data.get(TIMESTAMP);
if (timestamp != null) {
return timestamp.getTimestamp();
} else {
throw new IOException("fails");
}
return (timestamp != null)
? timestamp.getTimestamp()
: null;
}
public void setTimestamp(Timestamp t) {
@ -127,11 +147,13 @@ public final class Event implements Cloneable, Queueable {
);
}
@Override
public Object getField(final String reference) {
final Object unconverted = getUnconvertedField(FieldReference.from(reference));
return unconverted == null ? null : Javafier.deep(unconverted);
}
@Override
public Object getUnconvertedField(final String reference) {
return getUnconvertedField(FieldReference.from(reference));
}
@ -147,6 +169,7 @@ public final class Event implements Cloneable, Queueable {
}
}
@Override
public void setField(final String reference, final Object value) {
setField(FieldReference.from(reference), value);
}
@ -166,6 +189,7 @@ public final class Event implements Cloneable, Queueable {
}
}
@Override
public boolean includes(final String field) {
return includes(FieldReference.from(field));
}
@ -231,19 +255,32 @@ public final class Event implements Cloneable, Queueable {
return result;
}
@Override
public Map<String, Object> toMap() {
return Cloner.deep(this.data);
}
@Override
public co.elastic.logstash.api.Event overwrite(co.elastic.logstash.api.Event e) {
if (e instanceof Event) {
return overwrite((Event)e);
}
return e;
}
@Override
public co.elastic.logstash.api.Event append(co.elastic.logstash.api.Event e) {
if (e instanceof Event) {
return append((Event)e);
}
return e;
}
public Event overwrite(Event e) {
this.data = e.data;
this.cancelled = e.cancelled;
try {
e.getTimestamp();
} catch (IOException exception) {
setTimestamp(new Timestamp());
}
Timestamp t = e.getTimestamp();
setTimestamp(t == null ? new Timestamp() : t);
return this;
}
@ -253,6 +290,7 @@ public final class Event implements Cloneable, Queueable {
return this;
}
@Override
public Object remove(final String path) {
return remove(FieldReference.from(path));
}
@ -261,6 +299,7 @@ public final class Event implements Cloneable, Queueable {
return Accessors.del(data, field);
}
@Override
public String sprintf(String s) throws IOException {
return StringInterpolation.evaluate(this, s);
}
@ -273,17 +312,16 @@ public final class Event implements Cloneable, Queueable {
return new Event(map);
}
@Override
public String toString() {
Object hostField = this.getField("host");
Object messageField = this.getField("message");
String hostMessageString = (hostField != null ? hostField.toString() : "%{host}") + " " + (messageField != null ? messageField.toString() : "%{message}");
try {
// getTimestamp throws an IOException if there is no @timestamp field, see #7613
return getTimestamp().toString() + " " + hostMessageString;
} catch (IOException e) {
return hostMessageString;
}
Timestamp t = getTimestamp();
return t != null
? t.toString() + " " + hostMessageString
: hostMessageString;
}
private static Timestamp initTimestamp(Object o) {
@ -326,6 +364,7 @@ public final class Event implements Cloneable, Queueable {
return null;
}
@Override
public void tag(final String tag) {
final Object tags = Accessors.get(data, TAGS_FIELD);
// short circuit the null case where we know we won't need deduplication step below at the end

View file

@ -16,6 +16,7 @@ import org.logstash.config.ir.compiler.AbstractFilterDelegatorExt;
import org.logstash.config.ir.compiler.AbstractOutputDelegatorExt;
import org.logstash.config.ir.compiler.FilterDelegatorExt;
import org.logstash.config.ir.compiler.JavaFilterDelegatorExt;
import org.logstash.config.ir.compiler.JavaInputDelegatorExt;
import org.logstash.config.ir.compiler.JavaOutputDelegatorExt;
import org.logstash.config.ir.compiler.OutputDelegatorExt;
import org.logstash.config.ir.compiler.OutputStrategyExt;
@ -109,6 +110,8 @@ public final class RubyUtil {
public static final RubyClass JAVA_FILTER_DELEGATOR_CLASS;
public static final RubyClass JAVA_INPUT_DELEGATOR_CLASS;
public static final RubyClass FILTER_DELEGATOR_CLASS;
public static final RubyClass OUTPUT_STRATEGY_REGISTRY;
@ -207,8 +210,6 @@ public final class RubyUtil {
public static final RubyClass JAVA_PIPELINE_CLASS;
public static final RubyClass JAVA_INPUT_WRAPPER_CLASS;
/**
* Logstash Ruby Module.
*/
@ -439,6 +440,7 @@ public final class RubyUtil {
ABSTRACT_FILTER_DELEGATOR_CLASS, FilterDelegatorExt::new,
FilterDelegatorExt.class
);
JAVA_INPUT_DELEGATOR_CLASS = setupLogstashClass(JavaInputDelegatorExt::new, JavaInputDelegatorExt.class);
final RubyModule loggingModule = LOGSTASH_MODULE.defineOrGetModuleUnder("Logging");
LOGGER = loggingModule.defineClassUnder("Logger", RUBY.getObject(), LoggerExt::new);
LOGGER.defineAnnotatedMethods(LoggerExt.class);
@ -452,8 +454,6 @@ public final class RubyUtil {
JAVA_PIPELINE_CLASS = setupLogstashClass(
ABSTRACT_PIPELINE_CLASS, JavaBasePipelineExt::new, JavaBasePipelineExt.class
);
JAVA_INPUT_WRAPPER_CLASS = setupLogstashClass(PluginFactoryExt.JavaInputWrapperExt::new,
PluginFactoryExt.JavaInputWrapperExt.class);
final RubyModule json = LOGSTASH_MODULE.defineOrGetModuleUnder("Json");
final RubyClass stdErr = RUBY.getStandardError();
LOGSTASH_ERROR = LOGSTASH_MODULE.defineClassUnder(

View file

@ -1,11 +1,12 @@
package org.logstash;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import java.util.List;
import java.util.Map;
public final class StringInterpolation {
private static final ThreadLocal<StringBuilder> STRING_BUILDER =
@ -28,7 +29,16 @@ public final class StringInterpolation {
// Utility Class
}
public static String evaluate(final Event event, final String template) throws IOException {
public static String evaluate(final co.elastic.logstash.api.Event event, final String template)
throws JsonProcessingException {
if (event instanceof Event) {
return evaluate((Event) event, template);
} else {
throw new IllegalStateException("Unknown event concrete class: " + event.getClass().getName());
}
}
public static String evaluate(final Event event, final String template) throws JsonProcessingException {
int open = template.indexOf("%{");
int close = template.indexOf('}', open);
if (open == -1 || close == -1) {
@ -41,13 +51,16 @@ public final class StringInterpolation {
builder.append(template, pos, open);
}
if (template.regionMatches(open + 2, "+%s", 0, close - open - 2)) {
builder.append(event.getTimestamp().getTime().getMillis() / 1000L);
Timestamp t = event.getTimestamp();
builder.append(t == null ? "" : t.getTime().getMillis() / 1000L);
} else if (template.charAt(open + 2) == '+') {
builder.append(
event.getTimestamp().getTime().toString(
DateTimeFormat.forPattern(template.substring(open + 3, close))
.withZone(DateTimeZone.UTC)
));
Timestamp t = event.getTimestamp();
builder.append(t != null
? event.getTimestamp().getTime().toString(
DateTimeFormat.forPattern(template.substring(open + 3, close))
.withZone(DateTimeZone.UTC))
: ""
);
} else {
final String found = template.substring(open + 2, close);
final Object value = event.getField(found);

View file

@ -68,6 +68,11 @@ public final class Timestamp implements Comparable<Timestamp>, Queueable {
return iso8601Formatter.print(this.time);
}
public long toEpochMilli() {
return time.getMillis();
}
// returns the fraction of a second as microseconds, not the number of microseconds since epoch
public long usec() {
// JodaTime only supports milliseconds precision we can only return usec at millisec precision.
// note that getMillis() return millis since epoch

View file

@ -1,8 +1,10 @@
package org.logstash.config.ir;
import co.elastic.logstash.api.Codec;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jruby.RubyHash;
import org.jruby.javasupport.JavaUtil;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.Rubyfier;
@ -19,16 +21,8 @@ import org.logstash.config.ir.graph.IfVertex;
import org.logstash.config.ir.graph.PluginVertex;
import org.logstash.config.ir.graph.Vertex;
import org.logstash.config.ir.imperative.PluginStatement;
import co.elastic.logstash.api.v0.Input;
import co.elastic.logstash.api.Configuration;
import co.elastic.logstash.api.Context;
import org.logstash.plugins.PluginFactoryExt;
import org.logstash.plugins.discovery.PluginRegistry;
import org.logstash.ext.JrubyEventExtLibrary;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -59,11 +53,6 @@ public final class CompiledPipeline {
*/
private final Collection<IRubyObject> inputs;
/**
* Configured Java Inputs.
*/
private final Collection<Input> javaInputs = new ArrayList<>();
/**
* Configured Filters, indexed by their ID as returned by {@link PluginVertex#getId()}.
*/
@ -93,7 +82,7 @@ public final class CompiledPipeline {
outputs = setupOutputs();
}
public Collection<IRubyObject> outputs() {
public Collection<AbstractOutputDelegatorExt> outputs() {
return Collections.unmodifiableCollection(outputs.values());
}
@ -102,11 +91,7 @@ public final class CompiledPipeline {
}
public Collection<IRubyObject> inputs() {
return inputs;
}
public Collection<Input> javaInputs() {
return javaInputs;
return Collections.unmodifiableCollection(inputs);
}
/**
@ -129,7 +114,7 @@ public final class CompiledPipeline {
final SourceWithMetadata source = v.getSourceWithMetadata();
res.put(v.getId(), pluginFactory.buildOutput(
RubyUtil.RUBY.newString(def.getName()), RubyUtil.RUBY.newFixnum(source.getLine()),
RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def), def.getArguments()
RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def), convertJavaArgs(def)
));
});
return res;
@ -147,7 +132,7 @@ public final class CompiledPipeline {
final SourceWithMetadata source = vertex.getSourceWithMetadata();
res.put(vertex.getId(), pluginFactory.buildFilter(
RubyUtil.RUBY.newString(def.getName()), RubyUtil.RUBY.newFixnum(source.getLine()),
RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def), def.getArguments()
RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def), convertJavaArgs(def)
));
}
return res;
@ -161,26 +146,11 @@ public final class CompiledPipeline {
final Collection<IRubyObject> nodes = new HashSet<>(vertices.size());
vertices.forEach(v -> {
final PluginDefinition def = v.getPluginDefinition();
final Class<Input> cls = PluginRegistry.getInputClass(def.getName());
if (cls != null) {
try {
final Constructor<Input> ctor = cls.getConstructor(Configuration.class, Context.class);
javaInputs.add(ctor.newInstance(new Configuration(def.getArguments()), new Context()));
} catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException ex) {
throw new IllegalStateException(ex);
}
} else {
final SourceWithMetadata source = v.getSourceWithMetadata();
IRubyObject o = pluginFactory.buildInput(
final SourceWithMetadata source = v.getSourceWithMetadata();
IRubyObject o = pluginFactory.buildInput(
RubyUtil.RUBY.newString(def.getName()), RubyUtil.RUBY.newFixnum(source.getLine()),
RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def), def.getArguments());
if (o instanceof PluginFactoryExt.JavaInputWrapperExt) {
javaInputs.add(((PluginFactoryExt.JavaInputWrapperExt)o).getInput());
} else {
nodes.add(o);
}
}
RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def), convertJavaArgs(def));
nodes.add(o);
});
return nodes;
}
@ -203,7 +173,7 @@ public final class CompiledPipeline {
toput = pluginFactory.buildCodec(
RubyUtil.RUBY.newString(codec.getName()),
Rubyfier.deep(RubyUtil.RUBY, codec.getArguments()),
def.getArguments()
codec.getArguments()
);
} else {
toput = value;
@ -213,6 +183,32 @@ public final class CompiledPipeline {
return converted;
}
/**
* Converts plugin arguments from the format provided by {@link PipelineIR} into coercible
* Java types for consumption by Java plugins.
* @param def PluginDefinition as provided by {@link PipelineIR}
* @return Map of plugin arguments as understood by the {@link RubyIntegration.PluginFactory}
* methods that create Java plugins
*/
private Map<String, Object> convertJavaArgs(final PluginDefinition def) {
for (final Map.Entry<String, Object> entry : def.getArguments().entrySet()) {
final Object value = entry.getValue();
final String key = entry.getKey();
final IRubyObject toput;
if (value instanceof PluginStatement) {
final PluginDefinition codec = ((PluginStatement) value).getPluginDefinition();
toput = pluginFactory.buildCodec(
RubyUtil.RUBY.newString(codec.getName()),
Rubyfier.deep(RubyUtil.RUBY, codec.getArguments()),
codec.getArguments()
);
Codec javaCodec = (Codec)JavaUtil.unwrapJavaValue(toput);
def.getArguments().put(key, javaCodec);
}
}
return def.getArguments();
}
/**
* Checks if a certain {@link Vertex} represents a {@link AbstractFilterDelegatorExt}.
* @param vertex Vertex to check

View file

@ -0,0 +1,194 @@
package org.logstash.config.ir.compiler;
import co.elastic.logstash.api.Event;
import org.jruby.RubyArray;
import org.logstash.RubyUtil;
import org.logstash.StringInterpolation;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import static co.elastic.logstash.api.PluginHelper.ADD_FIELD_CONFIG;
import static co.elastic.logstash.api.PluginHelper.ADD_TAG_CONFIG;
import static co.elastic.logstash.api.PluginHelper.REMOVE_FIELD_CONFIG;
import static co.elastic.logstash.api.PluginHelper.REMOVE_TAG_CONFIG;
import static co.elastic.logstash.api.PluginHelper.TAGS_CONFIG;
import static co.elastic.logstash.api.PluginHelper.TYPE_CONFIG;
/**
 * Implements common actions such as "add_field" and "tags" that can be specified
 * for various plugins.
 */
class CommonActions {

    /**
     * Resolves a plugin config entry to the event-decoration action it implies for
     * filters ({@code add_field}, {@code add_tag}, {@code remove_field}, {@code remove_tag}).
     *
     * @param actionDefinition Config entry (option name and value) to resolve.
     * @return Action to apply to each matched event, or {@code null} if the entry is not
     * a recognized filter action.
     */
    @SuppressWarnings("unchecked")
    static Consumer<Event> getFilterAction(Map.Entry<String, Object> actionDefinition) {
        String actionName = actionDefinition.getKey();
        if (actionName.equals(ADD_FIELD_CONFIG.name())) {
            return x -> addField(x, (Map<String, Object>) actionDefinition.getValue());
        }
        if (actionName.equals(ADD_TAG_CONFIG.name())) {
            return x -> addTag(x, (List<Object>) actionDefinition.getValue());
        }
        if (actionName.equals(REMOVE_FIELD_CONFIG.name())) {
            return x -> removeField(x, (List<String>) actionDefinition.getValue());
        }
        if (actionName.equals(REMOVE_TAG_CONFIG.name())) {
            return x -> removeTag(x, (List<String>) actionDefinition.getValue());
        }
        return null;
    }

    /**
     * Resolves a plugin config entry to the event-map transformation it implies for
     * inputs ({@code add_field}, {@code tags}, {@code type}).
     *
     * @param actionDefinition Config entry (option name and value) to resolve.
     * @return Transformation to apply to each event map, or {@code null} if the entry is
     * not a recognized input action.
     */
    @SuppressWarnings("unchecked")
    static Function<Map<String, Object>, Map<String, Object>> getInputAction(
            Map.Entry<String, Object> actionDefinition) {
        String actionName = actionDefinition.getKey();
        if (actionName.equals(ADD_FIELD_CONFIG.name())) {
            return x -> addField(x, (Map<String, Object>) actionDefinition.getValue());
        }
        if (actionName.equals(TAGS_CONFIG.name())) {
            return x -> addTag(x, (List<Object>) actionDefinition.getValue());
        }
        if (actionName.equals(TYPE_CONFIG.name())) {
            return x -> addType(x, (String) actionDefinition.getValue());
        }
        return null;
    }

    /**
     * Implements the {@code add_field} option for Logstash inputs.
     *
     * @param event Event on which to add the fields.
     * @param fieldsToAdd The fields to be added to the event.
     * @return Updated event.
     */
    static Map<String, Object> addField(Map<String, Object> event, Map<String, Object> fieldsToAdd) {
        // Round-trip through a real Event so field-reference syntax and templates apply.
        Event tempEvent = new org.logstash.Event(event);
        addField(tempEvent, fieldsToAdd);
        return tempEvent.getData();
    }

    /**
     * Implements the {@code add_field} option for Logstash filters.
     *
     * @param evt Event on which to add the fields.
     * @param fieldsToAdd The fields to be added to the event.
     */
    @SuppressWarnings("unchecked")
    static void addField(Event evt, Map<String, Object> fieldsToAdd) {
        try {
            for (Map.Entry<String, Object> entry : fieldsToAdd.entrySet()) {
                // Both keys and string values may contain %{} templates.
                String keyToSet = StringInterpolation.evaluate(evt, entry.getKey());
                Object val = evt.getField(keyToSet);
                Object valueToSet = entry.getValue() instanceof String
                        ? StringInterpolation.evaluate(evt, (String) entry.getValue())
                        : entry.getValue();
                if (val == null) {
                    evt.setField(keyToSet, valueToSet);
                } else if (val instanceof List) {
                    // Field already holds a list: append the new value.
                    ((List) val).add(valueToSet);
                    evt.setField(keyToSet, val);
                } else {
                    // Field holds a scalar: promote it to a two-element array.
                    RubyArray list = RubyArray.newArray(RubyUtil.RUBY, 2);
                    list.add(val);
                    list.add(valueToSet);
                    evt.setField(keyToSet, list);
                }
            }
        } catch (IOException ex) {
            throw new IllegalStateException(ex);
        }
    }

    /**
     * Implements the {@code tags} option for Logstash inputs.
     *
     * @param e Event on which to add the tags.
     * @param tags The tags to be added to the event.
     * @return Updated event.
     */
    static Map<String, Object> addTag(Map<String, Object> e, List<Object> tags) {
        Event tempEvent = new org.logstash.Event(e);
        addTag(tempEvent, tags);
        return tempEvent.getData();
    }

    /**
     * Implements the {@code add_tag} option for Logstash filters.
     *
     * @param evt Event on which to add the tags.
     * @param tags The tags to be added to the event.
     */
    static void addTag(Event evt, List<Object> tags) {
        try {
            for (Object o : tags) {
                // Tags may contain %{} templates.
                String tagToAdd = StringInterpolation.evaluate(evt, o.toString());
                evt.tag(tagToAdd);
            }
        } catch (IOException ex) {
            throw new IllegalStateException(ex);
        }
    }

    /**
     * Implements the {@code type} option for Logstash inputs.
     *
     * @param event Event on which to set the type.
     * @param type The type to set on the event.
     * @return Updated event.
     */
    static Map<String, Object> addType(Map<String, Object> event, String type) {
        // An existing type is never overwritten.
        event.putIfAbsent("type", type);
        return event;
    }

    /**
     * Implements the {@code remove_field} option for Logstash filters.
     *
     * @param evt Event from which to remove the fields.
     * @param fieldsToRemove The fields to remove from the event.
     */
    static void removeField(Event evt, List<String> fieldsToRemove) {
        try {
            for (String s : fieldsToRemove) {
                String fieldToRemove = StringInterpolation.evaluate(evt, s);
                evt.remove(fieldToRemove);
            }
        } catch (IOException ex) {
            throw new IllegalStateException(ex);
        }
    }

    /**
     * Implements the {@code remove_tag} option for Logstash filters.
     *
     * @param evt Event from which to remove the tags.
     * @param tagsToRemove The tags to remove from the event.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static void removeTag(Event evt, List<String> tagsToRemove) {
        Object o = evt.getField("tags");
        if (o instanceof List) {
            List tags = (List) o;
            if (!tags.isEmpty()) {
                try {
                    for (String s : tagsToRemove) {
                        String tagToRemove = StringInterpolation.evaluate(evt, s);
                        tags.remove(tagToRemove);
                    }
                    evt.setField("tags", tags);
                } catch (IOException ex) {
                    throw new IllegalStateException(ex);
                }
            }
        }
    }
}

View file

@ -1,5 +1,8 @@
package org.logstash.config.ir.compiler;
import co.elastic.logstash.api.Event;
import co.elastic.logstash.api.Filter;
import co.elastic.logstash.api.FilterMatchListener;
import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyClass;
@ -9,14 +12,15 @@ import org.jruby.RubySymbol;
import org.jruby.anno.JRubyClass;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.Event;
import org.logstash.RubyUtil;
import co.elastic.logstash.api.v0.Filter;
import org.logstash.ext.JrubyEventExtLibrary;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@JRubyClass(name = "JavaFilterDelegator")
@ -30,41 +34,55 @@ public class JavaFilterDelegatorExt extends AbstractFilterDelegatorExt {
private Filter filter;
private FilterMatchListener filterMatchListener;
public JavaFilterDelegatorExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
public static JavaFilterDelegatorExt create(final String configName, final String id,
final AbstractNamespacedMetricExt metric,
final Filter filter) {
final Filter filter, final Map<String, Object> pluginArgs) {
final JavaFilterDelegatorExt instance =
new JavaFilterDelegatorExt(RubyUtil.RUBY, RubyUtil.JAVA_FILTER_DELEGATOR_CLASS);
instance.configName = RubyUtil.RUBY.newString(configName);
instance.initMetrics(id, metric);
AbstractNamespacedMetricExt scopedMetric =
metric.namespace(RubyUtil.RUBY.getCurrentContext(), RubyUtil.RUBY.newSymbol(filter.getId()));
instance.initMetrics(id, scopedMetric);
instance.filter = filter;
instance.initializeFilterMatchListener(pluginArgs);
return instance;
}
@SuppressWarnings("unchecked")
@Override
protected RubyArray doMultiFilter(final RubyArray batch) {
List<Event> inputEvents = (List<Event>)batch.stream()
.map(x -> ((JrubyEventExtLibrary.RubyEvent)x).getEvent())
List<Event> inputEvents = (List<Event>) batch.stream()
.map(x -> ((JrubyEventExtLibrary.RubyEvent) x).getEvent())
.collect(Collectors.toList());
Collection<Event> outputEvents = filter.filter(inputEvents);
Collection<Event> outputEvents = filter.filter(inputEvents, filterMatchListener);
RubyArray newBatch = RubyArray.newArray(RubyUtil.RUBY, outputEvents.size());
for (Event outputEvent : outputEvents) {
newBatch.add(JrubyEventExtLibrary.RubyEvent.newRubyEvent(RubyUtil.RUBY, outputEvent));
newBatch.add(JrubyEventExtLibrary.RubyEvent.newRubyEvent(RubyUtil.RUBY, (org.logstash.Event)outputEvent));
}
return newBatch;
}
@Override
protected void doRegister(ThreadContext context) {}
protected void doRegister(ThreadContext context) {
}
@Override
protected IRubyObject doFlush(final ThreadContext context, final RubyHash options) {
// add flush() to Java filter API?
if (filter.requiresFlush()) {
Collection<Event> outputEvents = filter.flush(filterMatchListener);
RubyArray newBatch = RubyArray.newArray(RubyUtil.RUBY, outputEvents.size());
for (Event outputEvent : outputEvents) {
newBatch.add(JrubyEventExtLibrary.RubyEvent.newRubyEvent(RubyUtil.RUBY, (org.logstash.Event)outputEvent));
}
return newBatch;
}
return context.nil;
}
@ -100,11 +118,46 @@ public class JavaFilterDelegatorExt extends AbstractFilterDelegatorExt {
@Override
protected boolean getHasFlush() {
return false;
return filter.requiresFlush();
}
@Override
protected boolean getPeriodicFlush() {
return false;
return filter.requiresPeriodicFlush();
}
@SuppressWarnings("unchecked")
private void initializeFilterMatchListener(Map<String, Object> pluginArgs) {
List<Consumer<Event>> filterActions = new ArrayList<>();
for (Map.Entry<String, Object> entry : pluginArgs.entrySet()) {
Consumer<Event> filterAction =
CommonActions.getFilterAction(entry);
if (filterAction != null) {
filterActions.add(filterAction);
}
}
if (filterActions.size() == 0) {
this.filterMatchListener = e -> {
};
} else {
this.filterMatchListener = new DecoratingFilterMatchListener(filterActions);
}
}
static class DecoratingFilterMatchListener implements FilterMatchListener {
private final List<Consumer<Event>> actions;
DecoratingFilterMatchListener(List<Consumer<Event>> actions) {
this.actions = actions;
}
@Override
public void filterMatched(Event e) {
for (Consumer<Event> action : actions) {
action.accept(e);
}
}
}
}

View file

@ -0,0 +1,161 @@
package org.logstash.config.ir.compiler;
import co.elastic.logstash.api.Input;
import org.jruby.Ruby;
import org.jruby.RubyClass;
import org.jruby.RubyObject;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.javasupport.JavaObject;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.execution.JavaBasePipelineExt;
import org.logstash.execution.queue.QueueWriter;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.MetricKeys;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
/**
 * Adapts a native Java {@link Input} plugin to the Ruby-side input contract
 * (start, do_stop, stop?, metric, id, config_name, ...) expected by the pipeline.
 */
@JRubyClass(name = "JavaInputDelegator")
public class JavaInputDelegatorExt extends RubyObject {
private static final long serialVersionUID = 1L;
// Metric namespace scoped to this input's id; exposed to Ruby via metric / metric=.
private AbstractNamespacedMetricExt metric;
// Owning pipeline; supplies the queue writer used in start().
private JavaBasePipelineExt pipeline;
// The wrapped Java input plugin instance.
private Input input;
// Non-null only when plugin args include common input options that decorate pushed events
// (see initializeQueueWriter); otherwise events go straight to the pipeline queue writer.
private DecoratingQueueWriter decoratingQueueWriter;
public JavaInputDelegatorExt(Ruby runtime, RubyClass metaClass) {
super(runtime, metaClass);
}
/**
 * Builds a delegator for the given input: scopes the metric namespace to the input's id,
 * records the plugin-name gauge, and initializes the (optionally decorating) queue writer.
 */
public static JavaInputDelegatorExt create(final JavaBasePipelineExt pipeline,
final AbstractNamespacedMetricExt metric, final Input input,
final Map<String, Object> pluginArgs) {
final JavaInputDelegatorExt instance =
new JavaInputDelegatorExt(RubyUtil.RUBY, RubyUtil.JAVA_INPUT_DELEGATOR_CLASS);
AbstractNamespacedMetricExt scopedMetric = metric.namespace(RubyUtil.RUBY.getCurrentContext(), RubyUtil.RUBY.newSymbol(input.getId()));
scopedMetric.gauge(RubyUtil.RUBY.getCurrentContext(), MetricKeys.NAME_KEY, RubyUtil.RUBY.newString(input.getName()));
instance.setMetric(RubyUtil.RUBY.getCurrentContext(), scopedMetric);
instance.input = input;
instance.pipeline = pipeline;
instance.initializeQueueWriter(pluginArgs);
return instance;
}
/**
 * Starts the input on a dedicated, named thread and returns the thread wrapped as a
 * Ruby JavaObject (presumably joined by the Ruby pipeline during shutdown — the caller
 * must unwrap it before joining).
 */
@JRubyMethod(name = "start")
public IRubyObject start(final ThreadContext context) {
QueueWriter qw = pipeline.getQueueWriter(input.getId());
final QueueWriter queueWriter;
if (decoratingQueueWriter != null) {
// Route events through the decorator, which forwards to the pipeline writer.
decoratingQueueWriter.setInnerQueueWriter(qw);
queueWriter = decoratingQueueWriter;
} else {
queueWriter = qw;
}
Thread t = new Thread(() -> input.start(queueWriter::push));
// Thread name: "<pipelineId>_<pluginName>_<pluginId>"
t.setName(pipeline.pipelineId().asJavaString() + "_" + input.getName() + "_" + input.getId());
t.start();
return JavaObject.wrap(context.getRuntime(), t);
}
@JRubyMethod(name = "metric=")
public IRubyObject setMetric(final ThreadContext context, final IRubyObject metric) {
this.metric = (AbstractNamespacedMetricExt) metric;
return this;
}
@JRubyMethod(name = "metric")
public IRubyObject getMetric(final ThreadContext context) {
return this.metric;
}
// Ruby-visible plugin name (config_name), taken from the Java plugin.
@JRubyMethod(name = "config_name", meta = true)
public IRubyObject configName(final ThreadContext context) {
return context.getRuntime().newString(input.getName());
}
@JRubyMethod(name = "id")
public IRubyObject getId(final ThreadContext context) {
return context.getRuntime().newString(input.getId());
}
// Always Ruby false: the delegator manages its own thread in start().
@JRubyMethod(name = "threadable")
public IRubyObject isThreadable(final ThreadContext context) {
return context.fals;
}
// No-op: Java inputs perform their setup in the constructor.
@JRubyMethod(name = "register")
public IRubyObject register(final ThreadContext context) {
return this;
}
// No-op: shutdown is handled by do_stop below.
@JRubyMethod(name = "do_close")
public IRubyObject close(final ThreadContext context) {
return this;
}
// Always Ruby false; stop state is tracked inside the Java input itself.
@JRubyMethod(name = "stop?")
public IRubyObject isStopping(final ThreadContext context) {
return context.fals;
}
/**
 * Signals the input to stop and blocks until it reports it has stopped;
 * interruption while waiting is deliberately swallowed.
 */
@JRubyMethod(name = "do_stop")
public IRubyObject doStop(final ThreadContext context) {
try {
input.stop();
input.awaitStop();
} catch (InterruptedException ex) {
// do nothing
}
return this;
}
/**
 * Scans the plugin args for common input options (via CommonActions) and, when any are
 * present, prepares a DecoratingQueueWriter that applies them to each pushed event.
 * Leaves decoratingQueueWriter null when no decoration is needed.
 */
@SuppressWarnings("unchecked")
private void initializeQueueWriter(Map<String, Object> pluginArgs) {
List<Function<Map<String, Object>, Map<String, Object>>> inputActions = new ArrayList<>();
for (Map.Entry<String, Object> entry : pluginArgs.entrySet()) {
Function<Map<String, Object>, Map<String, Object>> inputAction =
CommonActions.getInputAction(entry);
if (inputAction != null) {
inputActions.add(inputAction);
}
}
if (inputActions.size() == 0) {
this.decoratingQueueWriter = null;
} else {
this.decoratingQueueWriter = new DecoratingQueueWriter(inputActions);
}
}
/**
 * QueueWriter that applies each configured input action to an event map before
 * forwarding it to the real pipeline queue writer.
 */
static class DecoratingQueueWriter implements QueueWriter {
// Target writer; set from start() before any push occurs.
private QueueWriter innerQueueWriter;
private final List<Function<Map<String, Object>, Map<String, Object>>> inputActions;
DecoratingQueueWriter(List<Function<Map<String, Object>, Map<String, Object>>> inputActions) {
this.inputActions = inputActions;
}
@Override
public void push(Map<String, Object> event) {
// Each action may replace the event map; the final result is what gets enqueued.
for (Function<Map<String, Object>, Map<String, Object>> action : inputActions) {
event = action.apply(event);
}
innerQueueWriter.push(event);
}
private void setInnerQueueWriter(QueueWriter innerQueueWriter) {
this.innerQueueWriter = innerQueueWriter;
}
}
}

View file

@ -5,6 +5,7 @@ import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import co.elastic.logstash.api.Event;
import org.jruby.Ruby;
import org.jruby.RubyClass;
import org.jruby.RubyString;
@ -12,9 +13,8 @@ import org.jruby.RubySymbol;
import org.jruby.anno.JRubyClass;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.Event;
import org.logstash.RubyUtil;
import co.elastic.logstash.api.v0.Output;
import co.elastic.logstash.api.Output;
import org.logstash.ext.JrubyEventExtLibrary;
import org.logstash.instrument.metrics.AbstractMetricExt;

View file

@ -1,12 +1,13 @@
package org.logstash.config.ir.compiler;
import co.elastic.logstash.api.Codec;
import org.jruby.RubyInteger;
import org.jruby.RubyString;
import org.jruby.runtime.builtin.IRubyObject;
import co.elastic.logstash.api.Configuration;
import co.elastic.logstash.api.Context;
import co.elastic.logstash.api.v0.Filter;
import co.elastic.logstash.api.v0.Input;
import co.elastic.logstash.api.Filter;
import co.elastic.logstash.api.Input;
import java.util.Map;
@ -63,5 +64,10 @@ public interface PluginFactory extends RubyIntegration.PluginFactory {
public IRubyObject buildCodec(final RubyString name, final IRubyObject args, Map<String, Object> pluginArgs) {
return rubyFactory.buildCodec(name, args, pluginArgs);
}
@Override
public Codec buildDefaultCodec(final String codecName) {
return null;
}
}
}

View file

@ -1,5 +1,6 @@
package org.logstash.config.ir.compiler;
import co.elastic.logstash.api.Codec;
import org.jruby.RubyInteger;
import org.jruby.RubyString;
import org.jruby.runtime.builtin.IRubyObject;
@ -30,5 +31,8 @@ public final class RubyIntegration {
Map<String, Object> pluginArgs);
IRubyObject buildCodec(RubyString name, IRubyObject args, Map<String, Object> pluginArgs);
Codec buildDefaultCodec(String codecName);
}
}

View file

@ -1,47 +0,0 @@
package org.logstash.execution;
import co.elastic.logstash.api.v0.Input;
import java.util.ArrayList;
import java.util.Collection;
/**
 * Provides a single point of control for a set of Java inputs.
 */
public class InputsController {
private final Collection<Input> inputs;
// Threads started by startInputs; retained but not joined here.
private ArrayList<Thread> threads = new ArrayList<>();
public InputsController(final Collection<Input> inputs) {
this.inputs = inputs;
}
/**
 * Starts each input on its own thread, writing to the queue writer obtained from
 * the supplied pipeline. Thread names are "input_<counter>_<className>".
 */
public void startInputs(final JavaBasePipelineExt provider) {
int inputCounter = 0;
for (Input input : inputs) {
String pluginName = input.getClass().getName(); // TODO: get annotated plugin name
Thread t = new Thread(() -> input.start(provider.getQueueWriter(pluginName)));
t.setName("input_" + (inputCounter++) + "_" + pluginName);
threads.add(t);
t.start();
}
}
/**
 * Signals every input to stop; does not wait for completion (see awaitStop).
 */
public void stopInputs() {
for (Input input : inputs) {
input.stop();
}
}
/**
 * Blocks until each input reports it has stopped, one input at a time;
 * interruption while waiting on any input is deliberately swallowed.
 */
public void awaitStop() {
// trivial implementation
for (Input input : inputs) {
try {
input.awaitStop();
} catch (InterruptedException e) {
// do nothing
}
}
}
}

View file

@ -1,8 +1,5 @@
package org.logstash.execution;
import java.security.NoSuchAlgorithmException;
import java.util.Collection;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jruby.Ruby;
@ -21,6 +18,10 @@ import org.logstash.execution.queue.QueueWriter;
import org.logstash.ext.JRubyWrappedWriteClientExt;
import org.logstash.plugins.PluginFactoryExt;
import java.security.NoSuchAlgorithmException;
import java.util.Collection;
import java.util.stream.Stream;
@JRubyClass(name = "JavaBasePipeline")
public final class JavaBasePipelineExt extends AbstractPipelineExt {
@ -90,7 +91,7 @@ public final class JavaBasePipelineExt extends AbstractPipelineExt {
}
@JRubyMethod(name = "reloadable?")
public RubyBoolean isReadloadable(final ThreadContext context) {
public RubyBoolean isReloadable(final ThreadContext context) {
return isConfiguredReloadable(context).isTrue() && reloadablePlugins(context).isTrue()
? context.tru : context.fals;
}

View file

@ -0,0 +1,70 @@
package org.logstash.plugins;
import co.elastic.logstash.api.Configuration;
import co.elastic.logstash.api.PluginConfigSpec;
import co.elastic.logstash.api.Codec;
import org.logstash.config.ir.compiler.RubyIntegration;
import java.util.Collection;
import java.util.Map;
/**
 * Configuration for Logstash Java plugins. Wraps the raw settings map supplied for a
 * plugin and resolves typed values against {@link PluginConfigSpec} definitions.
 */
public final class ConfigurationImpl implements Configuration {

    // Factory used to resolve default codec settings by name; may be null.
    private final RubyIntegration.PluginFactory pluginFactory;
    // Raw, serialized settings as supplied in the pipeline configuration.
    private final Map<String, Object> rawSettings;

    /**
     * @param raw Configuration settings map. Values are serialized.
     * @param pluginFactory Plugin factory for resolving default codecs by name.
     */
    public ConfigurationImpl(final Map<String, Object> raw, RubyIntegration.PluginFactory pluginFactory) {
        this.pluginFactory = pluginFactory;
        this.rawSettings = raw;
    }

    /**
     * @param raw Configuration Settings Map. Values are serialized.
     */
    public ConfigurationImpl(final Map<String, Object> raw) {
        this(raw, null);
    }

    /**
     * Resolves the value for the given spec: an explicitly supplied setting wins (and must be
     * type-compatible), otherwise codec specs with a raw default are built via the plugin
     * factory, otherwise the spec's default value is returned.
     *
     * @throws IllegalStateException if a supplied value is incompatible with the spec's type
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> T get(final PluginConfigSpec<T> configSpec) {
        final String settingName = configSpec.name();
        if (!rawSettings.containsKey(settingName)) {
            // Not supplied explicitly: codec defaults are resolved by name through the factory.
            if (configSpec.type() == Codec.class && configSpec.getRawDefaultValue() != null && pluginFactory != null) {
                Codec codec = pluginFactory.buildDefaultCodec(configSpec.getRawDefaultValue());
                return configSpec.type().cast(codec);
            }
            return configSpec.defaultValue();
        }
        final Object value = rawSettings.get(settingName);
        if (!configSpec.type().isAssignableFrom(value.getClass())) {
            throw new IllegalStateException(
                    String.format("Setting value for '%s' of type '%s' incompatible with defined type of '%s'",
                            settingName, value.getClass(), configSpec.type()));
        }
        return (T) value;
    }

    /** Returns the raw (unvalidated) value for the spec's setting name, or null if absent. */
    @Override
    public Object getRawValue(final PluginConfigSpec<?> configSpec) {
        return rawSettings.get(configSpec.name());
    }

    /** Returns true when a value was explicitly supplied for the spec's setting name. */
    @Override
    public boolean contains(final PluginConfigSpec<?> configSpec) {
        return rawSettings.containsKey(configSpec.name());
    }

    /** Returns the names of all explicitly supplied settings. */
    @Override
    public Collection<String> allKeys() {
        return rawSettings.keySet();
    }
}

View file

@ -0,0 +1,50 @@
package org.logstash.plugins;
import co.elastic.logstash.api.Context;
import co.elastic.logstash.api.Event;
import co.elastic.logstash.api.EventFactory;
import co.elastic.logstash.api.Plugin;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.logstash.ConvertedMap;
import org.logstash.common.io.DeadLetterQueueWriter;
import java.io.Serializable;
import java.util.Map;
/**
 * Default {@link Context} implementation handed to Java plugins: exposes the dead letter
 * queue writer (when available), a per-plugin-class logger, and an event factory.
 */
public class ContextImpl implements Context {

    // Writer for the dead letter queue; may be null when no DLQ is available.
    private DeadLetterQueueWriter dlqWriter;

    public ContextImpl(DeadLetterQueueWriter dlqWriter) {
        this.dlqWriter = dlqWriter;
    }

    @Override
    public DeadLetterQueueWriter getDlqWriter() {
        return dlqWriter;
    }

    /** Returns a logger named after the plugin's concrete class. */
    @Override
    public Logger getLogger(Plugin plugin) {
        return LogManager.getLogger(plugin.getClass());
    }

    /**
     * Returns a factory producing {@link org.logstash.Event} instances; a ConvertedMap
     * payload is passed through the dedicated constructor to avoid re-conversion.
     */
    @Override
    public EventFactory getEventFactory() {
        return new EventFactory() {
            @Override
            public Event newEvent() {
                return new org.logstash.Event();
            }

            @Override
            public Event newEvent(Map<? extends Serializable, Object> data) {
                return data instanceof ConvertedMap
                        ? new org.logstash.Event((ConvertedMap) data)
                        : new org.logstash.Event(data);
            }
        };
    }
}

View file

@ -1,35 +1,40 @@
package org.logstash.plugins;
import co.elastic.logstash.api.v0.Input;
import co.elastic.logstash.api.Configuration;
import co.elastic.logstash.api.Context;
import co.elastic.logstash.api.Codec;
import co.elastic.logstash.api.Filter;
import co.elastic.logstash.api.Input;
import co.elastic.logstash.api.Output;
import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyBasicObject;
import org.jruby.RubyClass;
import org.jruby.RubyHash;
import org.jruby.RubyInteger;
import org.jruby.RubyObject;
import org.jruby.RubyString;
import org.jruby.RubySymbol;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.javasupport.JavaUtil;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.common.AbstractDeadLetterQueueWriterExt;
import org.logstash.common.io.DeadLetterQueueWriter;
import org.logstash.config.ir.PipelineIR;
import org.logstash.config.ir.compiler.AbstractFilterDelegatorExt;
import org.logstash.config.ir.compiler.AbstractOutputDelegatorExt;
import org.logstash.config.ir.compiler.FilterDelegatorExt;
import org.logstash.config.ir.compiler.JavaFilterDelegatorExt;
import org.logstash.config.ir.compiler.JavaInputDelegatorExt;
import org.logstash.config.ir.compiler.JavaOutputDelegatorExt;
import org.logstash.config.ir.compiler.OutputDelegatorExt;
import org.logstash.config.ir.compiler.OutputStrategyExt;
import org.logstash.config.ir.compiler.RubyIntegration;
import org.logstash.config.ir.graph.Vertex;
import org.logstash.execution.ExecutionContextExt;
import co.elastic.logstash.api.Configuration;
import co.elastic.logstash.api.Context;
import co.elastic.logstash.api.v0.Filter;
import co.elastic.logstash.api.v0.Output;
import org.logstash.execution.JavaBasePipelineExt;
import org.logstash.instrument.metrics.AbstractMetricExt;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.MetricKeys;
@ -39,6 +44,7 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
@ -155,14 +161,6 @@ public final class PluginFactoryExt {
);
}
@JRubyMethod(required = 4)
public IRubyObject buildFilter(final ThreadContext context, final IRubyObject[] args) {
return buildFilter(
(RubyString) args[0], args[1].convertToInteger(), args[2].convertToInteger(),
args[3], null
);
}
@SuppressWarnings("unchecked")
@Override
public IRubyObject buildCodec(final RubyString name, final IRubyObject args, Map<String, Object> pluginArgs) {
@ -172,9 +170,12 @@ public final class PluginFactoryExt {
);
}
@JRubyMethod(required = 4)
public IRubyObject buildCodec(final ThreadContext context, final IRubyObject[] args) {
return buildCodec((RubyString) args[0], args[1], null);
@Override
public Codec buildDefaultCodec(String codecName) {
return (Codec) JavaUtil.unwrapJavaValue(plugin(
RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.CODEC,
codecName, 0, 0, Collections.emptyMap(), Collections.emptyMap()
));
}
@SuppressWarnings("unchecked")
@ -196,6 +197,8 @@ public final class PluginFactoryExt {
final int line, final int column, final Map<String, IRubyObject> args,
Map<String, Object> pluginArgs) {
final String id;
final PluginLookup.PluginClass pluginClass = PluginLookup.lookup(type, name);
if (type == PluginLookup.PluginType.CODEC) {
id = UUID.randomUUID().toString();
} else {
@ -221,7 +224,6 @@ public final class PluginFactoryExt {
}
pluginsById.add(id);
final AbstractNamespacedMetricExt typeScopedMetric = metrics.create(context, type.rubyLabel());
final PluginLookup.PluginClass pluginClass = PluginLookup.lookup(type, name);
if (pluginClass.language() == PluginLookup.PluginLanguage.RUBY) {
@ -263,8 +265,10 @@ public final class PluginFactoryExt {
Output output = null;
if (cls != null) {
try {
final Constructor<Output> ctor = cls.getConstructor(Configuration.class, Context.class);
output = ctor.newInstance(new Configuration(pluginArgs), new Context());
final Constructor<Output> ctor = cls.getConstructor(String.class, Configuration.class, Context.class);
Configuration config = new ConfigurationImpl(pluginArgs, this);
output = ctor.newInstance(id, config, executionContext.toContext(type));
PluginUtil.validateConfig(output, config);
} catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException ex) {
throw new IllegalStateException(ex);
}
@ -280,15 +284,17 @@ public final class PluginFactoryExt {
Filter filter = null;
if (cls != null) {
try {
final Constructor<Filter> ctor = cls.getConstructor(Configuration.class, Context.class);
filter = ctor.newInstance(new Configuration(pluginArgs), new Context());
final Constructor<Filter> ctor = cls.getConstructor(String.class, Configuration.class, Context.class);
Configuration config = new ConfigurationImpl(pluginArgs);
filter = ctor.newInstance(id, config, executionContext.toContext(type));
PluginUtil.validateConfig(filter, config);
} catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException ex) {
throw new IllegalStateException(ex);
}
}
if (filter != null) {
return JavaFilterDelegatorExt.create(name, id, typeScopedMetric, filter);
return JavaFilterDelegatorExt.create(name, id, typeScopedMetric, filter, pluginArgs);
} else {
throw new IllegalStateException("Unable to instantiate filter: " + pluginClass);
}
@ -297,47 +303,53 @@ public final class PluginFactoryExt {
Input input = null;
if (cls != null) {
try {
final Constructor<Input> ctor = cls.getConstructor(Configuration.class, Context.class);
input = ctor.newInstance(new Configuration(pluginArgs), new Context());
final Constructor<Input> ctor = cls.getConstructor(String.class, Configuration.class, Context.class);
Configuration config = new ConfigurationImpl(pluginArgs, this);
input = ctor.newInstance(id, config, executionContext.toContext(type));
PluginUtil.validateConfig(input, config);
} catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException ex) {
if (ex instanceof InvocationTargetException) {
throw new IllegalStateException((ex).getCause());
}
throw new IllegalStateException(ex);
}
}
if (input != null) {
return JavaInputWrapperExt.create(context, input);
return JavaInputDelegatorExt.create((JavaBasePipelineExt) executionContext.pipeline, typeScopedMetric, input, pluginArgs);
} else {
throw new IllegalStateException("Unable to instantiate input: " + pluginClass);
}
} else {
} else if (type == PluginLookup.PluginType.CODEC) {
final Class<Codec> cls = (Class<Codec>) pluginClass.klass();
Codec codec = null;
if (cls != null) {
try {
final Constructor<Codec> ctor = cls.getConstructor(Configuration.class, Context.class);
Configuration config = new ConfigurationImpl(pluginArgs);
codec = ctor.newInstance(config, executionContext.toContext(type));
PluginUtil.validateConfig(codec, config);
} catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException ex) {
if (ex instanceof InvocationTargetException) {
throw new IllegalStateException((ex).getCause());
}
throw new IllegalStateException(ex);
}
}
if (codec != null) {
return JavaUtil.convertJavaToRuby(RubyUtil.RUBY, codec);
} else {
throw new IllegalStateException("Unable to instantiate codec: " + pluginClass);
}
}
else {
throw new IllegalStateException("Unable to create plugin: " + pluginClass.toReadableString());
}
}
}
}
@JRubyClass(name = "JavaInputWrapper")
public static final class JavaInputWrapperExt extends RubyObject {
private static final long serialVersionUID = 1L;
private Input input;
public JavaInputWrapperExt(Ruby runtime, RubyClass metaClass) {
super(runtime, metaClass);
}
public static JavaInputWrapperExt create(ThreadContext context, Input input) {
JavaInputWrapperExt inputWrapper = new JavaInputWrapperExt(context.runtime, RubyUtil.JAVA_INPUT_WRAPPER_CLASS);
inputWrapper.input = input;
return inputWrapper;
}
public Input getInput() {
return input;
}
}
@JRubyClass(name = "ExecutionContextFactory")
public static final class ExecutionContext extends RubyBasicObject {
@ -371,6 +383,25 @@ public final class PluginFactoryExt {
context, new IRubyObject[]{pipeline, agent, id, classConfigName, dlqWriter}
);
}
/**
 * Builds the Java-plugin Context for the given plugin type. Only output plugins are
 * handed a dead letter queue writer: the wrapped Ruby DLQ writer is unwrapped to the
 * underlying Java DeadLetterQueueWriter when it is one; all other cases yield null.
 */
public Context toContext(PluginLookup.PluginType pluginType) {
DeadLetterQueueWriter dlq = null;
if (pluginType == PluginLookup.PluginType.OUTPUT) {
if (dlqWriter instanceof AbstractDeadLetterQueueWriterExt.PluginDeadLetterQueueWriterExt) {
IRubyObject innerWriter =
((AbstractDeadLetterQueueWriterExt.PluginDeadLetterQueueWriterExt) dlqWriter)
.innerWriter(RubyUtil.RUBY.getCurrentContext());
if (innerWriter != null) {
// Only unwrap when the inner Ruby object actually wraps the Java DLQ writer class.
if (innerWriter.getJavaClass().equals(DeadLetterQueueWriter.class)) {
dlq = (DeadLetterQueueWriter) innerWriter.toJava(DeadLetterQueueWriter.class);
}
}
}
}
return new ContextImpl(dlq);
}
}
@JRubyClass(name = "PluginMetricFactory")

View file

@ -89,7 +89,7 @@ public final class PluginLookup {
this.label = RubyUtil.RUBY.newString(label);
}
RubyString rubyLabel() {
public RubyString rubyLabel() {
return label;
}
}

View file

@ -0,0 +1,58 @@
package org.logstash.plugins;
import co.elastic.logstash.api.Configuration;
import co.elastic.logstash.api.Plugin;
import co.elastic.logstash.api.PluginConfigSpec;
import co.elastic.logstash.api.PluginHelper;
import com.google.common.annotations.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
/**
 * Utility methods for validating Java plugin configurations against their declared schemas.
 */
public class PluginUtil {

    private PluginUtil() { /* utility methods */ }

    // Fix: logger was previously created with PluginHelper.class, misattributing
    // validation errors to the wrong class in log output.
    private static final Logger LOGGER = LogManager.getLogger(PluginUtil.class);

    /**
     * Validates the supplied configuration against the plugin's config schema.
     * Every error found is logged, then a single exception is thrown.
     *
     * @param plugin plugin whose schema is validated against
     * @param config configuration supplied for the plugin
     * @throws IllegalStateException if any unknown or missing-required settings are found
     */
    public static void validateConfig(Plugin plugin, Configuration config) {
        List<String> configErrors = doValidateConfig(plugin, config);
        if (!configErrors.isEmpty()) {
            for (String err : configErrors) {
                LOGGER.error(err);
            }
            throw new IllegalStateException("Config errors found for plugin '" + plugin.getName() + "'");
        }
    }

    /**
     * Computes the list of validation errors for the plugin/configuration pair:
     * supplied settings absent from the plugin's schema, and required schema
     * settings that were not supplied.
     *
     * @param plugin plugin whose schema is validated against
     * @param config configuration supplied for the plugin
     * @return human-readable error messages; empty when the configuration is valid
     */
    @VisibleForTesting
    public static List<String> doValidateConfig(Plugin plugin, Configuration config) {
        List<String> configErrors = new ArrayList<>();
        List<String> configSchemaNames = plugin.configSchema().stream().map(PluginConfigSpec::name)
                .collect(Collectors.toList());

        // find config options that are invalid for the specified plugin
        Collection<String> providedConfig = config.allKeys();
        for (String configKey : providedConfig) {
            if (!configSchemaNames.contains(configKey)) {
                configErrors.add(String.format("Unknown setting '%s' specified for plugin '%s'",
                        configKey, plugin.getName()));
            }
        }

        // find required config options that are missing
        for (PluginConfigSpec<?> configSpec : plugin.configSchema()) {
            if (configSpec.required() && !providedConfig.contains(configSpec.name())) {
                configErrors.add(String.format("Required setting '%s' not specified for plugin '%s'",
                        configSpec.name(), plugin.getName()));
            }
        }
        return configErrors;
    }
}

View file

@ -1,59 +1,82 @@
package org.logstash.plugins.codecs;
import org.logstash.Event;
import org.logstash.StringInterpolation;
import co.elastic.logstash.api.v0.Codec;
import co.elastic.logstash.api.Codec;
import co.elastic.logstash.api.Configuration;
import co.elastic.logstash.api.LogstashPlugin;
import co.elastic.logstash.api.Context;
import co.elastic.logstash.api.PluginHelper;
import co.elastic.logstash.api.Event;
import co.elastic.logstash.api.LogstashPlugin;
import co.elastic.logstash.api.PluginConfigSpec;
import org.logstash.StringInterpolation;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;
@LogstashPlugin(name = "java-line")
import static org.logstash.ObjectMappers.JSON_MAPPER;
@LogstashPlugin(name = "java_line")
public class Line implements Codec {
public static final String DEFAULT_DELIMITER = "\n";
public static final String DEFAULT_DELIMITER = System.lineSeparator();
private static final PluginConfigSpec<String> CHARSET_CONFIG =
Configuration.stringSetting("charset", "UTF-8");
PluginConfigSpec.stringSetting("charset", "UTF-8");
private static final PluginConfigSpec<String> DELIMITER_CONFIG =
Configuration.stringSetting("delimiter", DEFAULT_DELIMITER);
PluginConfigSpec.stringSetting("delimiter", DEFAULT_DELIMITER);
private static final PluginConfigSpec<String> FORMAT_CONFIG =
Configuration.stringSetting("format");
PluginConfigSpec.stringSetting("format");
private Context context;
static final String MESSAGE_FIELD = "message";
private final Map<String, Object> map = new HashMap<>();
private final String delimiter;
private final Charset charset;
private String format = null;
private String id;
private final CharBuffer charBuffer = ByteBuffer.allocateDirect(64 * 1024).asCharBuffer();
private final CharsetDecoder decoder;
private final CharsetEncoder encoder;
private String remainder = "";
private Event currentEncodedEvent;
private CharBuffer currentEncoding;
/**
* Required constructor.
*
* @param configuration Logstash Configuration
* @param context Logstash Context
*/
public Line(final Configuration configuration, final Context context) {
delimiter = configuration.get(DELIMITER_CONFIG);
charset = Charset.forName(configuration.get(CHARSET_CONFIG));
format = configuration.get(FORMAT_CONFIG);
this(context, configuration.get(DELIMITER_CONFIG), configuration.get(CHARSET_CONFIG), configuration.get(FORMAT_CONFIG));
}
private Line(Context context, String delimiter, String charsetName, String format) {
this.context = context;
this.id = UUID.randomUUID().toString();
this.delimiter = delimiter;
this.charset = Charset.forName(charsetName);
this.format = format;
decoder = charset.newDecoder();
decoder.onMalformedInput(CodingErrorAction.IGNORE);
encoder = charset.newEncoder();
}
@Override
@ -101,28 +124,54 @@ public class Line implements Codec {
}
}
private static Map<String, Object> simpleMap(String message) {
HashMap<String, Object> simpleMap = new HashMap<>();
simpleMap.put(MESSAGE_FIELD, message);
return simpleMap;
}
@Override
public void encode(Event event, OutputStream output) {
public boolean encode(Event event, ByteBuffer buffer) throws EncodeException {
try {
String outputString = (format == null
? event.toJson()
: StringInterpolation.evaluate(event, format))
+ delimiter;
output.write(outputString.getBytes(charset));
if (currentEncodedEvent != null && event != currentEncodedEvent) {
throw new EncodeException("New event supplied before encoding of previous event was completed");
} else if (currentEncodedEvent == null) {
String eventEncoding = (format == null
? JSON_MAPPER.writeValueAsString(event.getData())
: StringInterpolation.evaluate(event, format))
+ delimiter;
currentEncoding = CharBuffer.wrap(eventEncoding);
}
CoderResult result = encoder.encode(currentEncoding, buffer, true);
buffer.flip();
if (result.isError()) {
result.throwException();
}
if (result.isOverflow()) {
currentEncodedEvent = event;
return false;
} else {
currentEncodedEvent = null;
return true;
}
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
private Map<String, Object> simpleMap(String message) {
map.put(MESSAGE_FIELD, message);
return map;
}
@Override
public Collection<PluginConfigSpec<?>> configSchema() {
return PluginHelper.commonInputOptions(
Arrays.asList(CHARSET_CONFIG, DELIMITER_CONFIG, FORMAT_CONFIG));
return Arrays.asList(CHARSET_CONFIG, DELIMITER_CONFIG, FORMAT_CONFIG);
}
@Override
public String getId() {
return id;
}
@Override
public Codec cloneCodec() {
return new Line(context, delimiter, charset.name(), format);
}
}

View file

@ -1,13 +1,13 @@
package org.logstash.plugins.discovery;
import org.logstash.plugins.PluginLookup;
import co.elastic.logstash.api.v0.Codec;
import co.elastic.logstash.api.Codec;
import co.elastic.logstash.api.Configuration;
import co.elastic.logstash.api.Context;
import co.elastic.logstash.api.v0.Filter;
import co.elastic.logstash.api.v0.Input;
import co.elastic.logstash.api.Filter;
import co.elastic.logstash.api.Input;
import co.elastic.logstash.api.LogstashPlugin;
import co.elastic.logstash.api.v0.Output;
import co.elastic.logstash.api.Output;
import java.lang.annotation.Annotation;
import java.lang.reflect.Constructor;

View file

@ -0,0 +1,62 @@
package org.logstash.plugins.filters;
import co.elastic.logstash.api.Configuration;
import co.elastic.logstash.api.Context;
import co.elastic.logstash.api.Event;
import co.elastic.logstash.api.LogstashPlugin;
import co.elastic.logstash.api.PluginConfigSpec;
import co.elastic.logstash.api.PluginHelper;
import co.elastic.logstash.api.Filter;
import co.elastic.logstash.api.FilterMatchListener;
import java.util.Arrays;
import java.util.Collection;
import java.util.UUID;
/**
 * Java filter that writes a random UUID into the configured target field of each event.
 * Existing values are preserved unless "overwrite" is enabled; every processed event is
 * reported to the filter match listener.
 */
@LogstashPlugin(name = "java_uuid")
public class Uuid implements Filter {

    public static final PluginConfigSpec<String> TARGET_CONFIG =
            PluginConfigSpec.requiredStringSetting("target");

    public static final PluginConfigSpec<Boolean> OVERWRITE_CONFIG =
            PluginConfigSpec.booleanSetting("overwrite", false);

    private String id;
    // Event field that receives the generated UUID.
    private String target;
    // When true, replaces any existing value in the target field.
    private boolean overwrite;

    /**
     * Required constructor.
     *
     * @param id Plugin id
     * @param configuration Logstash Configuration
     * @param context Logstash Context
     */
    public Uuid(final String id, final Configuration configuration, final Context context) {
        this.id = id;
        this.target = configuration.get(TARGET_CONFIG);
        this.overwrite = configuration.get(OVERWRITE_CONFIG);
    }

    @Override
    public Collection<Event> filter(Collection<Event> events, final FilterMatchListener filterMatchListener) {
        for (Event event : events) {
            boolean targetUnset = event.getField(target) == null;
            if (overwrite || targetUnset) {
                event.setField(target, UUID.randomUUID().toString());
            }
            // Every event counts as matched, whether or not the field was written.
            filterMatchListener.filterMatched(event);
        }
        return events;
    }

    @Override
    public Collection<PluginConfigSpec<?>> configSchema() {
        return PluginHelper.commonFilterSettings(Arrays.asList(TARGET_CONFIG, OVERWRITE_CONFIG));
    }

    @Override
    public String getId() {
        return id;
    }
}

View file

@ -1,16 +1,13 @@
package org.logstash.plugins.inputs;
import co.elastic.logstash.api.v0.Codec;
import co.elastic.logstash.api.Codec;
import co.elastic.logstash.api.Configuration;
import co.elastic.logstash.api.Context;
import co.elastic.logstash.api.v0.Input;
import co.elastic.logstash.api.Input;
import co.elastic.logstash.api.LogstashPlugin;
import co.elastic.logstash.api.PluginConfigSpec;
import co.elastic.logstash.api.PluginHelper;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.logstash.execution.queue.QueueWriter;
import org.logstash.plugins.discovery.PluginRegistry;
import java.io.FileDescriptor;
import java.io.FileInputStream;
@ -24,53 +21,54 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
@LogstashPlugin(name = "java-stdin")
@LogstashPlugin(name = "java_stdin")
public class Stdin implements Input, Consumer<Map<String, Object>> {
private static final Logger logger = LogManager.getLogger(Stdin.class);
private final Logger logger;
public static final PluginConfigSpec<String> CODEC_CONFIG =
Configuration.stringSetting("codec", "java-line");
public static final PluginConfigSpec<Codec> CODEC_CONFIG =
PluginConfigSpec.codecSetting("codec", "java_line");
private static final int BUFFER_SIZE = 64 * 1024;
private final LongAdder eventCounter = new LongAdder();
private String hostname;
private Codec codec;
private volatile boolean stopRequested = false;
private final CountDownLatch isStopped = new CountDownLatch(1);
private FileChannel input;
private QueueWriter writer;
private Consumer<Map<String, Object>> writer;
private String id;
/**
* Required Constructor Signature only taking a {@link Configuration}.
* Required constructor.
*
* @param id Plugin id
* @param configuration Logstash Configuration
* @param context Logstash Context
*/
public Stdin(final Configuration configuration, final Context context) {
this(configuration, context, new FileInputStream(FileDescriptor.in).getChannel());
public Stdin(final String id, final Configuration configuration, final Context context) {
this(id, configuration, context, new FileInputStream(FileDescriptor.in).getChannel());
}
Stdin(final Configuration configuration, final Context context, FileChannel inputChannel) {
Stdin(final String id, final Configuration configuration, final Context context, FileChannel inputChannel) {
logger = context.getLogger(this);
this.id = id;
try {
hostname = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
hostname = "[unknownHost]";
}
String codecName = configuration.get(CODEC_CONFIG);
codec = PluginRegistry.getCodec(codecName, configuration, context);
codec = configuration.get(CODEC_CONFIG);
if (codec == null) {
throw new IllegalStateException(String.format("Unable to obtain codec '%a'", codecName));
throw new IllegalStateException("Unable to obtain codec");
}
input = inputChannel;
}
@Override
public void start(QueueWriter writer) {
public void start(Consumer<Map<String, Object>> writer) {
this.writer = writer;
final ByteBuffer buffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
try {
@ -80,8 +78,7 @@ public class Stdin implements Input, Consumer<Map<String, Object>> {
buffer.compact();
}
} catch (AsynchronousCloseException e2) {
// do nothing -- this happens when stop is called during a pending read
logger.warn("Stop request interrupted pending read");
// do nothing -- this happens when stop is called while the read loop is blocked on input.read()
} catch (IOException e) {
stopRequested = true;
logger.error("Stopping stdin after read error", e);
@ -102,8 +99,7 @@ public class Stdin implements Input, Consumer<Map<String, Object>> {
@Override
public void accept(Map<String, Object> event) {
event.putIfAbsent("hostname", hostname);
writer.push(event);
eventCounter.increment();
writer.accept(event);
}
@Override
@ -123,6 +119,11 @@ public class Stdin implements Input, Consumer<Map<String, Object>> {
@Override
public Collection<PluginConfigSpec<?>> configSchema() {
return PluginHelper.commonInputOptions(Collections.singletonList(CODEC_CONFIG));
return PluginHelper.commonInputSettings(Collections.singletonList(CODEC_CONFIG));
}
@Override
public String getId() {
return id;
}
}

View file

@ -1,52 +1,68 @@
package org.logstash.plugins.outputs;
import co.elastic.logstash.api.v0.Codec;
import org.logstash.Event;
import co.elastic.logstash.api.LogstashPlugin;
import co.elastic.logstash.api.Configuration;
import co.elastic.logstash.api.Context;
import co.elastic.logstash.api.v0.Output;
import co.elastic.logstash.api.Event;
import co.elastic.logstash.api.LogstashPlugin;
import co.elastic.logstash.api.PluginConfigSpec;
import org.logstash.plugins.discovery.PluginRegistry;
import co.elastic.logstash.api.PluginHelper;
import co.elastic.logstash.api.Codec;
import co.elastic.logstash.api.Output;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
@LogstashPlugin(name = "java-stdout")
@LogstashPlugin(name = "java_stdout")
public class Stdout implements Output {
public static final PluginConfigSpec<String> CODEC_CONFIG =
Configuration.stringSetting("codec", "java-line");
public static final PluginConfigSpec<Codec> CODEC_CONFIG =
PluginConfigSpec.codecSetting("codec", "java-line");
private Codec codec;
private OutputStream outputStream;
private final CountDownLatch done = new CountDownLatch(1);
private String id;
private ByteBuffer encodeBuffer = ByteBuffer.wrap(new byte[16 * 1024]);
/**
* Required Constructor Signature only taking a {@link Configuration}.
* Required constructor.
*
* @param id Plugin id
* @param configuration Logstash Configuration
* @param context Logstash Context
*/
public Stdout(final Configuration configuration, final Context context) {
this(configuration, context, System.out);
public Stdout(final String id, final Configuration configuration, final Context context) {
this(id, configuration, context, System.out);
}
Stdout(final Configuration configuration, final Context context, OutputStream targetStream) {
Stdout(final String id, final Configuration configuration, final Context context, OutputStream targetStream) {
this.id = id;
this.outputStream = targetStream;
String codecName = configuration.get(CODEC_CONFIG);
codec = PluginRegistry.getCodec(codecName, configuration, context);
codec = configuration.get(CODEC_CONFIG);
if (codec == null) {
throw new IllegalStateException(String.format("Unable to obtain codec '%a'", codecName));
throw new IllegalStateException("Unable to obtain codec");
}
}
@Override
public void output(final Collection<Event> events) {
for (Event e : events) {
codec.encode(e, outputStream);
try {
boolean encodeCompleted;
for (Event e : events) {
encodeBuffer.clear();
do {
encodeCompleted = codec.encode(e, encodeBuffer);
outputStream.write(encodeBuffer.array(), encodeBuffer.position(), encodeBuffer.limit());
encodeBuffer.flip();
}
while (!encodeCompleted);
}
} catch (Codec.EncodeException | IOException ex) {
throw new IllegalStateException(ex);
}
}
@ -62,6 +78,11 @@ public class Stdout implements Output {
@Override
public Collection<PluginConfigSpec<?>> configSchema() {
return Collections.singletonList(CODEC_CONFIG);
return PluginHelper.commonOutputSettings(Collections.singletonList(CODEC_CONFIG));
}
@Override
public String getId() {
return id;
}
}

View file

@ -2,8 +2,11 @@ package org.logstash;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.Assert;
import org.junit.Test;
import java.time.Instant;
import static org.junit.Assert.*;
public class TimestampTest {
@ -41,4 +44,20 @@ public class TimestampTest {
assertEquals(DateTimeZone.UTC, t.getTime().getZone());
}
// Checks that usec() reports the microsecond component of the instant the
// Timestamp was constructed from.
// NOTE(review): the Timestamp is built from i.toEpochMilli(), which truncates
// to millisecond precision, but the expected value i.getNano() / 1000 carries
// full microsecond precision. Unless Timestamp.usec() only ever reflects the
// millisecond component, this assertion can fail whenever Instant.now() has a
// non-zero sub-millisecond part — TODO confirm Timestamp.usec() semantics.
@Test
public void testMicroseconds() {
Instant i = Instant.now();
Timestamp t1 = new Timestamp(i.toEpochMilli());
long usec = t1.usec();
Assert.assertEquals(i.getNano() / 1000, usec);
}
// Round-trip check: a Timestamp constructed from epoch milliseconds must
// report exactly those milliseconds back from toEpochMilli().
@Test
public void testEpochMillis() {
Instant i = Instant.now();
Timestamp t1 = new Timestamp(i.toEpochMilli());
long millis = t1.toEpochMilli();
Assert.assertEquals(i.toEpochMilli(), millis);
}
}

View file

@ -1,5 +1,6 @@
package org.logstash.config.ir;
import co.elastic.logstash.api.Codec;
import com.google.common.base.Strings;
import java.util.ArrayList;
import java.util.Collection;
@ -30,8 +31,8 @@ import org.logstash.config.ir.compiler.FilterDelegatorExt;
import org.logstash.config.ir.compiler.PluginFactory;
import org.logstash.ext.JrubyEventExtLibrary;
import co.elastic.logstash.api.Configuration;
import co.elastic.logstash.api.v0.Filter;
import co.elastic.logstash.api.v0.Input;
import co.elastic.logstash.api.Filter;
import co.elastic.logstash.api.Input;
import co.elastic.logstash.api.Context;
/**
@ -462,6 +463,11 @@ public final class CompiledPipelineTest extends RubyEnvTestCase {
throw new IllegalStateException("No codec setup expected in this test.");
}
@Override
public Codec buildDefaultCodec(String codecName) {
return null;
}
private static <T> T setupPlugin(final RubyString name,
final Map<String, Supplier<T>> suppliers) {
final String key = name.asJavaString();

View file

@ -0,0 +1,245 @@
package org.logstash.config.ir.compiler;
import org.junit.Assert;
import org.junit.Test;
import org.logstash.Event;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Unit tests for the common add/remove actions shared by Java plugins
 * ({@code CommonActions}): add_field, add_tag, add type, remove_field and
 * remove_tag. Each test constructs an {@code Event} (or a raw map for the
 * type tests), applies one action, and asserts on the resulting field state.
 */
public class CommonActionsTest {
// Reserved Logstash field name under which tags are stored.
private static final String TAGS = "tags";
// addField: scalar insert, scalar-to-list promotion, list append, non-string
// values, and %{field} dynamic references in both the field name and value.
@Test
public void testAddField() {
// add field to empty event
Event e = new Event();
String testField = "test_field";
String testStringValue = "test_value";
CommonActions.addField(e, Collections.singletonMap(testField, testStringValue));
Assert.assertEquals(testStringValue, e.getField(testField));
// add to existing field and convert to array value
e = new Event(Collections.singletonMap(testField, testStringValue));
CommonActions.addField(e, Collections.singletonMap(testField, testStringValue));
Object value = e.getField(testField);
Assert.assertTrue(value instanceof List);
Assert.assertEquals(2, ((List) value).size());
Assert.assertEquals(testStringValue, ((List) value).get(0));
Assert.assertEquals(testStringValue, ((List) value).get(1));
// add to existing array field
String testStringValue2 = "test_value2";
List<String> stringVals = Arrays.asList(testStringValue, testStringValue2);
e = new Event(Collections.singletonMap(testField, stringVals));
CommonActions.addField(e, Collections.singletonMap(testField, testStringValue));
value = e.getField(testField);
Assert.assertTrue(value instanceof List);
Assert.assertEquals(3, ((List) value).size());
Assert.assertEquals(testStringValue, ((List) value).get(0));
Assert.assertEquals(testStringValue2, ((List) value).get(1));
Assert.assertEquals(testStringValue, ((List) value).get(2));
// add non-string value to empty event
Long testLongValue = 42L;
e = new Event();
CommonActions.addField(e, Collections.singletonMap(testField, testLongValue));
Assert.assertEquals(testLongValue, e.getField(testField));
// add non-string value to existing field
e = new Event(Collections.singletonMap(testField, testStringValue));
CommonActions.addField(e, Collections.singletonMap(testField, testLongValue));
value = e.getField(testField);
Assert.assertTrue(value instanceof List);
Assert.assertEquals(2, ((List) value).size());
Assert.assertEquals(testStringValue, ((List) value).get(0));
Assert.assertEquals(testLongValue, ((List) value).get(1));
// add non-string value to existing array field
e = new Event(Collections.singletonMap(testField, stringVals));
CommonActions.addField(e, Collections.singletonMap(testField, testLongValue));
value = e.getField(testField);
Assert.assertTrue(value instanceof List);
Assert.assertEquals(3, ((List) value).size());
Assert.assertEquals(testStringValue, ((List) value).get(0));
Assert.assertEquals(testStringValue2, ((List) value).get(1));
Assert.assertEquals(testLongValue, ((List) value).get(2));
// add field/value with dynamic values: %{test_field} interpolates to
// "test_value", so both the new field name and value are derived from it
e = new Event(Collections.singletonMap(testField, testStringValue));
String newField = "%{" + testField + "}_field";
String newValue = "%{" + testField + "}_value";
CommonActions.addField(e, Collections.singletonMap(newField, newValue));
Assert.assertEquals(testStringValue + "_value", e.getField(testStringValue + "_field"));
}
// addTag: single tag, multiple tags, de-duplication, %{field} dynamic tag
// names, and stringification of non-string tags.
@Test
public void testAddTag() {
// add tag to empty event
Event e = new Event();
String testTag = "test_tag";
CommonActions.addTag(e, Collections.singletonList(testTag));
Object value = e.getField(TAGS);
Assert.assertTrue(value instanceof List);
Assert.assertEquals(1, ((List) value).size());
Assert.assertEquals(testTag, ((List) value).get(0));
// add two tags to empty event
e = new Event();
String testTag2 = "test_tag2";
CommonActions.addTag(e, Arrays.asList(testTag, testTag2));
value = e.getField(TAGS);
Assert.assertTrue(value instanceof List);
Assert.assertEquals(2, ((List) value).size());
Assert.assertEquals(testTag, ((List) value).get(0));
Assert.assertEquals(testTag2, ((List) value).get(1));
// add duplicate tag -- must not be added a second time
e = new Event();
e.tag(testTag);
CommonActions.addTag(e, Collections.singletonList(testTag));
value = e.getField(TAGS);
Assert.assertTrue(value instanceof List);
Assert.assertEquals(1, ((List) value).size());
Assert.assertEquals(testTag, ((List) value).get(0));
// add dynamically-named tag
e = new Event(Collections.singletonMap(testTag, testTag2));
CommonActions.addTag(e, Collections.singletonList("%{" + testTag + "}_foo"));
value = e.getField(TAGS);
Assert.assertTrue(value instanceof List);
Assert.assertEquals(1, ((List) value).size());
Assert.assertEquals(testTag2 + "_foo", ((List) value).get(0));
// add non-string tag -- expected to be stored as its string form
e = new Event();
Long nonStringTag = 42L;
CommonActions.addTag(e, Collections.singletonList(nonStringTag));
value = e.getField(TAGS);
Assert.assertTrue(value instanceof List);
Assert.assertEquals(1, ((List) value).size());
Assert.assertEquals(nonStringTag.toString(), ((List) value).get(0));
}
// addType: sets "type" only when absent; an existing type is never replaced.
@Test
public void testAddType() {
// add type to empty event
Map<String, Object> e = new HashMap<>();
String testType = "test_type";
Map<String, Object> e2 = CommonActions.addType(e, testType);
Assert.assertEquals(testType, e2.get("type"));
// add type to already-typed event -- existing type must win
e = new HashMap<>();
String existingType = "existing_type";
e.put("type", existingType);
e2 = CommonActions.addType(e, testType);
Assert.assertEquals(existingType, e2.get("type"));
}
// removeField: single field, non-existent field (no-op), multiple fields,
// and %{field} dynamic field names.
@Test
public void testRemoveField() {
// remove a field
Event e = new Event();
String testField = "test_field";
String testValue = "test_value";
e.setField(testField, testValue);
CommonActions.removeField(e, Collections.singletonList(testField));
Assert.assertFalse(e.getData().keySet().contains(testField));
// remove non-existent field -- the other field must be untouched
e = new Event();
String testField2 = "test_field2";
e.setField(testField2, testValue);
CommonActions.removeField(e, Collections.singletonList(testField));
Assert.assertFalse(e.getData().keySet().contains(testField));
Assert.assertTrue(e.getData().keySet().contains(testField2));
// remove multiple fields; "test_field" itself is not in the removal list
// (only "test_field0".."test_field2" are), so it must survive
e = new Event();
List<String> fields = new ArrayList<>();
for (int k = 0; k < 3; k++) {
String field = testField + k;
e.setField(field, testValue);
fields.add(field);
}
e.setField(testField, testValue);
CommonActions.removeField(e, fields);
for (String field : fields) {
Assert.assertFalse(e.getData().keySet().contains(field));
}
Assert.assertTrue(e.getData().keySet().contains(testField));
// remove dynamically-named field: %{other_field} resolves to "other_value"
e = new Event();
String otherField = "other_field";
String otherValue = "other_value";
e.setField(otherField, otherValue);
String derivativeField = otherValue + "_foo";
e.setField(derivativeField, otherValue);
CommonActions.removeField(e, Collections.singletonList("%{" + otherField + "}_foo"));
Assert.assertFalse(e.getData().keySet().contains(derivativeField));
Assert.assertTrue(e.getData().keySet().contains(otherField));
}
// removeTag: single tag, non-existent tag (no-op), multiple tags, a
// non-list "tags" field (left alone), and %{field} dynamic tag names.
@Test
public void testRemoveTag() {
// remove a tag
Event e = new Event();
String testTag = "test_tag";
e.tag(testTag);
CommonActions.removeTag(e, Collections.singletonList(testTag));
Object o = e.getField(TAGS);
Assert.assertTrue(o instanceof List);
Assert.assertEquals(0, ((List) o).size());
// remove non-existent tag
e = new Event();
e.tag(testTag);
CommonActions.removeTag(e, Collections.singletonList(testTag + "non-existent"));
o = e.getField(TAGS);
Assert.assertTrue(o instanceof List);
Assert.assertEquals(1, ((List) o).size());
Assert.assertEquals(testTag, ((List) o).get(0));
// remove multiple tags
e = new Event();
List<String> tags = new ArrayList<>();
for (int k = 0; k < 3; k++) {
String tag = testTag + k;
tags.add(tag);
e.tag(tag);
}
CommonActions.removeTag(e, tags);
o = e.getField(TAGS);
Assert.assertTrue(o instanceof List);
Assert.assertEquals(0, ((List) o).size());
// remove tags when "tags" field isn't a list of tags -- must be a no-op
e = new Event();
Long nonTagValue = 42L;
e.setField(TAGS, nonTagValue);
CommonActions.removeTag(e, Collections.singletonList(testTag));
o = e.getField(TAGS);
Assert.assertFalse(o instanceof List);
Assert.assertEquals(nonTagValue, o);
// remove dynamically-named tag
e = new Event();
String otherField = "other_field";
String otherValue = "other_value";
e.setField(otherField, otherValue);
e.tag(otherValue + "_foo");
CommonActions.removeTag(e, Collections.singletonList("%{" + otherField + "}_foo"));
o = e.getField(TAGS);
Assert.assertTrue(o instanceof List);
Assert.assertEquals(0, ((List) o).size());
}
}

View file

@ -1,12 +1,17 @@
package co.elastic.logstash.api;
package org.logstash.plugins;
import co.elastic.logstash.api.Configuration;
import co.elastic.logstash.api.PluginConfigSpec;
import co.elastic.logstash.api.Codec;
import org.junit.Assert;
import org.junit.Test;
import org.logstash.plugins.codecs.Line;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class ConfigurationTest {
public class ConfigurationImplTest {
private String stringKey = "string", numberKey = "number", booleanKey = "boolean";
private String stringValue = "stringValue";
@ -18,16 +23,16 @@ public class ConfigurationTest {
configValues.put(stringKey, stringValue);
configValues.put(numberKey, longValue);
configValues.put(booleanKey, booleanValue);
return new Configuration(configValues);
return new ConfigurationImpl(configValues);
}
@Test
public void testConfiguration() {
Configuration config = getTestConfiguration();
PluginConfigSpec<String> stringConfig = new PluginConfigSpec<>(stringKey, String.class, "", false, false);
PluginConfigSpec<Long> numberConfig = new PluginConfigSpec<>(numberKey, Long.class, 0L, false, false);
PluginConfigSpec<Boolean> booleanConfig = new PluginConfigSpec<>(booleanKey, Boolean.class, false, false, false);
PluginConfigSpec<String> stringConfig = PluginConfigSpec.stringSetting(stringKey, "", false, false);
PluginConfigSpec<Long> numberConfig = PluginConfigSpec.numSetting(numberKey, 0L, false, false);
PluginConfigSpec<Boolean> booleanConfig = PluginConfigSpec.booleanSetting(booleanKey, false, false, false);
Assert.assertEquals(stringValue, config.get(stringConfig));
Assert.assertEquals(longValue, (long) config.get(numberConfig));
@ -36,15 +41,15 @@ public class ConfigurationTest {
@Test
public void testDefaultValues() {
Configuration unsetConfig = new Configuration(new HashMap<>());
Configuration unsetConfig = new ConfigurationImpl(new HashMap<>());
String defaultStringValue = "defaultStringValue";
long defaultLongValue = 43L;
boolean defaultBooleanValue = false;
PluginConfigSpec<String> stringConfig = new PluginConfigSpec<>(stringKey, String.class, defaultStringValue, false, false);
PluginConfigSpec<Long> numberConfig = new PluginConfigSpec<>(numberKey, Long.class, defaultLongValue, false, false);
PluginConfigSpec<Boolean> booleanConfig = new PluginConfigSpec<>(booleanKey, Boolean.class, defaultBooleanValue, false, false);
PluginConfigSpec<String> stringConfig = PluginConfigSpec.stringSetting(stringKey, defaultStringValue, false, false);
PluginConfigSpec<Long> numberConfig = PluginConfigSpec.numSetting(numberKey, defaultLongValue, false, false);
PluginConfigSpec<Boolean> booleanConfig = PluginConfigSpec.booleanSetting(booleanKey, defaultBooleanValue, false, false);
Assert.assertEquals(defaultStringValue, unsetConfig.get(stringConfig));
Assert.assertEquals(defaultLongValue, (long) unsetConfig.get(numberConfig));
@ -60,9 +65,9 @@ public class ConfigurationTest {
public void testBrokenConfig() {
Configuration config = getTestConfiguration();
PluginConfigSpec<Long> brokenLongConfig = new PluginConfigSpec<>(stringKey, Long.class, 0L, false, false);
PluginConfigSpec<Boolean> brokenBooleanConfig = new PluginConfigSpec<>(numberKey, Boolean.class, false, false, false);
PluginConfigSpec<String> brokenStringConfig = new PluginConfigSpec<>(booleanKey, String.class, "", false, false);
PluginConfigSpec<Long> brokenLongConfig = PluginConfigSpec.numSetting(stringKey, 0L, false, false);
PluginConfigSpec<Boolean> brokenBooleanConfig = PluginConfigSpec.booleanSetting(numberKey, false, false, false);
PluginConfigSpec<String> brokenStringConfig = PluginConfigSpec.stringSetting(booleanKey, "", false, false);
try {
Long l = config.get(brokenLongConfig);
@ -91,4 +96,14 @@ public class ConfigurationTest {
Assert.fail("Did not throw correct exception for invalid config value type");
}
}
@Test
public void testDefaultCodec() {
PluginConfigSpec<Codec> codecConfig = PluginConfigSpec.codecSetting("codec", "java-line");
Configuration config = new ConfigurationImpl(Collections.emptyMap(), new TestPluginFactory());
Codec codec = config.get(codecConfig);
Assert.assertTrue(codec instanceof Line);
}
}

View file

@ -0,0 +1,221 @@
package org.logstash.plugins;
import co.elastic.logstash.api.Configuration;
import co.elastic.logstash.api.Plugin;
import co.elastic.logstash.api.PluginConfigSpec;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Parameterized tests for {@code PluginUtil.doValidateConfig}: each case pairs
 * a plugin config schema with a supplied configuration and states which
 * settings should be reported as unknown and which required settings should be
 * reported as missing. The assertions check both directions: every expected
 * error is present, and every produced error was expected.
 */
@RunWith(Parameterized.class)
public class PluginUtilValidateConfigTest {
// The single test case this instance runs (one instance per parameter row).
private ValidateConfigTestCase testCase;
public PluginUtilValidateConfigTest(ValidateConfigTestCase v) {
this.testCase = v;
}
// Builds the parameter table: (description, plugin, config,
// expected-unknown-settings, expected-missing-required-settings).
@SuppressWarnings("rawtypes")
@Parameterized.Parameters(name = "{index}: {0}")
public static Collection testParameters() {
List<ValidateConfigTestCase> testParameters = new ArrayList<>();
// optional config items, none provided
List<PluginConfigSpec<?>> configSpec01 = Arrays.asList(
PluginConfigSpec.stringSetting("foo1"), PluginConfigSpec.stringSetting("foo2"));
TestingPlugin p01 = new TestingPlugin(configSpec01);
Configuration config01 = new ConfigurationImpl(Collections.emptyMap());
testParameters.add(
new ValidateConfigTestCase("optional config items, none provided",
p01, config01, Collections.emptyList(), Collections.emptyList()));
// optional config items, some provided
List<PluginConfigSpec<?>> configSpec02 = Arrays.asList(
PluginConfigSpec.stringSetting("foo1"), PluginConfigSpec.stringSetting("foo2"));
TestingPlugin p02 = new TestingPlugin(configSpec02);
Configuration config02 = new ConfigurationImpl(Collections.singletonMap("foo1", "bar"));
testParameters.add(
new ValidateConfigTestCase("optional config items, some provided",
p02, config02, Collections.emptyList(), Collections.emptyList()));
// optional config items, all provided
List<PluginConfigSpec<?>> configSpec03 = Arrays.asList(
PluginConfigSpec.stringSetting("foo1"), PluginConfigSpec.stringSetting("foo2"));
TestingPlugin p03 = new TestingPlugin(configSpec03);
Map<String, Object> configMap03 = new HashMap<>();
configMap03.put("foo1", "bar");
configMap03.put("foo2", "bar");
Configuration config03 = new ConfigurationImpl(configMap03);
testParameters.add(
new ValidateConfigTestCase("optional config items, all provided",
p03, config03, Collections.emptyList(), Collections.emptyList()));
// optional config items, too many provided -- "foo3" is not in the schema
List<PluginConfigSpec<?>> configSpec04 = Arrays.asList(
PluginConfigSpec.stringSetting("foo1"), PluginConfigSpec.stringSetting("foo2"));
TestingPlugin p04 = new TestingPlugin(configSpec04);
Map<String, Object> configMap04 = new HashMap<>();
configMap04.put("foo1", "bar");
configMap04.put("foo2", "bar");
configMap04.put("foo3", "bar");
Configuration config04 = new ConfigurationImpl(configMap04);
testParameters.add(
new ValidateConfigTestCase("optional config items, too many provided",
p04, config04, Collections.singletonList("foo3"), Collections.emptyList()));
// required config items, all provided
List<PluginConfigSpec<?>> configSpec05 = Arrays.asList(PluginConfigSpec.requiredStringSetting("foo"));
TestingPlugin p05 = new TestingPlugin(configSpec05);
Configuration config05 = new ConfigurationImpl(Collections.singletonMap("foo", "bar"));
testParameters.add(
new ValidateConfigTestCase("required config items, all provided",
p05, config05, Collections.emptyList(), Collections.emptyList()));
// required config items, some provided -- "foo2" should be flagged missing
List<PluginConfigSpec<?>> configSpec06 = Arrays.asList(
PluginConfigSpec.requiredStringSetting("foo1"), PluginConfigSpec.requiredStringSetting("foo2"));
TestingPlugin p06 = new TestingPlugin(configSpec06);
Configuration config06 = new ConfigurationImpl(Collections.singletonMap("foo1", "bar"));
testParameters.add(
new ValidateConfigTestCase("required config items, some provided",
p06, config06, Collections.emptyList(), Collections.singletonList("foo2")));
// required config items, too many provided -- both error kinds at once
List<PluginConfigSpec<?>> configSpec07 = Arrays.asList(
PluginConfigSpec.requiredStringSetting("foo1"), PluginConfigSpec.requiredStringSetting("foo2"));
TestingPlugin p07 = new TestingPlugin(configSpec07);
Map<String, Object> configMap07 = new HashMap<>();
configMap07.put("foo1", "bar");
configMap07.put("foo3", "bar");
Configuration config07 = new ConfigurationImpl(configMap07);
testParameters.add(
new ValidateConfigTestCase("required config items, too many provided",
p07, config07, Collections.singletonList("foo3"), Collections.singletonList("foo2")));
// optional+required config items, some provided
List<PluginConfigSpec<?>> configSpec08 = Arrays.asList(
PluginConfigSpec.requiredStringSetting("foo1"), PluginConfigSpec.requiredStringSetting("foo2"),
PluginConfigSpec.stringSetting("foo3"), PluginConfigSpec.stringSetting("foo4"));
TestingPlugin p08 = new TestingPlugin(configSpec08);
Map<String, Object> configMap08 = new HashMap<>();
configMap08.put("foo1", "bar");
configMap08.put("foo2", "bar");
configMap08.put("foo3", "bar");
Configuration config08 = new ConfigurationImpl(configMap08);
testParameters.add(
new ValidateConfigTestCase("optional+required config items, some provided",
p08, config08, Collections.emptyList(), Collections.emptyList()));
// optional+required config items, some missing
List<PluginConfigSpec<?>> configSpec09 = Arrays.asList(
PluginConfigSpec.requiredStringSetting("foo1"), PluginConfigSpec.requiredStringSetting("foo2"),
PluginConfigSpec.stringSetting("foo3"), PluginConfigSpec.stringSetting("foo4"));
TestingPlugin p09 = new TestingPlugin(configSpec09);
Map<String, Object> configMap09 = new HashMap<>();
configMap09.put("foo1", "bar");
configMap09.put("foo3", "bar");
Configuration config09 = new ConfigurationImpl(configMap09);
testParameters.add(
new ValidateConfigTestCase("optional+required config items, some missing",
p09, config09, Collections.emptyList(), Collections.singletonList("foo2")));
// optional+required config items, some missing, some invalid
List<PluginConfigSpec<?>> configSpec10 = Arrays.asList(
PluginConfigSpec.requiredStringSetting("foo1"), PluginConfigSpec.requiredStringSetting("foo2"),
PluginConfigSpec.stringSetting("foo3"), PluginConfigSpec.stringSetting("foo4"));
TestingPlugin p10 = new TestingPlugin(configSpec10);
Map<String, Object> configMap10 = new HashMap<>();
configMap10.put("foo1", "bar");
configMap10.put("foo3", "bar");
configMap10.put("foo5", "bar");
Configuration config10 = new ConfigurationImpl(configMap10);
testParameters.add(
new ValidateConfigTestCase("optional+required config items, some missing, some invalid",
p10, config10, Collections.singletonList("foo5"), Collections.singletonList("foo2")));
return testParameters;
}
// Asserts that doValidateConfig produces exactly the expected errors: each
// expected error appears, each produced error maps back to an expectation,
// and the total count matches (no duplicates or extras).
@Test
public void testValidateConfig() {
List<String> configErrors = PluginUtil.doValidateConfig(testCase.plugin, testCase.config);
for (String expectedUnknown : testCase.expectedUnknownOptions) {
Assert.assertTrue(
String.format("Expected [Unknown setting '%s' specified for plugin '%s']",
expectedUnknown, testCase.plugin.getName()),
configErrors.contains(String.format(
"Unknown setting '%s' specified for plugin '%s'", expectedUnknown,
testCase.plugin.getName())));
}
for (String expectedRequired : testCase.expectedRequiredOptions) {
Assert.assertTrue(
String.format("Expected [Required setting '%s' not specified for plugin '%s']",
expectedRequired, testCase.plugin.getName()),
configErrors.contains(String.format(
"Required setting '%s' not specified for plugin '%s'", expectedRequired,
testCase.plugin.getName())));
}
// Reverse direction: parse the quoted setting name out of each produced
// error message and verify it was expected.
for (String configError : configErrors) {
if (configError.startsWith("Unknown")) {
int quoteIndex = configError.indexOf("'");
String configOption = configError.substring(
quoteIndex + 1, configError.indexOf("'", quoteIndex + 1));
Assert.assertTrue(
"Unexpected config error: " + configError,
testCase.expectedUnknownOptions.contains(configOption));
} else if (configError.startsWith("Required")) {
int quoteIndex = configError.indexOf("'");
String configOption = configError.substring(
quoteIndex + 1, configError.indexOf("'", quoteIndex + 1));
Assert.assertTrue(
"Unexpected config error: " + configError,
testCase.expectedRequiredOptions.contains(configOption));
} else {
Assert.fail("Unknown type of config error: " + configError);
}
}
Assert.assertEquals("Unexpected number of config errors",
testCase.expectedRequiredOptions.size() + testCase.expectedUnknownOptions.size(),
configErrors.size());
}
// Immutable-by-convention holder for one parameter row; toString() supplies
// the display name used by @Parameters(name = "{index}: {0}").
private static class ValidateConfigTestCase {
String description;
Plugin plugin;
Configuration config;
List<String> expectedUnknownOptions;
List<String> expectedRequiredOptions;
ValidateConfigTestCase(String description, Plugin plugin, Configuration config,
List<String> expectedUnknownOptions, List<String> expectedRequiredOptions) {
this.description = description;
this.plugin = plugin;
this.config = config;
this.expectedUnknownOptions = expectedUnknownOptions;
this.expectedRequiredOptions = expectedRequiredOptions;
}
@Override
public String toString() {
return description;
}
}
}

View file

@ -0,0 +1,24 @@
package org.logstash.plugins;
import co.elastic.logstash.api.Context;
import co.elastic.logstash.api.EventFactory;
import co.elastic.logstash.api.Plugin;
import org.apache.logging.log4j.Logger;
import org.logstash.common.io.DeadLetterQueueWriter;
/**
 * Minimal {@link Context} stub for unit tests. Every accessor returns
 * {@code null}, because the plugins exercised in these tests never use the
 * DLQ writer, logger, or event factory.
 */
public class TestContext implements Context {

    @Override
    public DeadLetterQueueWriter getDlqWriter() {
        return null;
    }

    @Override
    public Logger getLogger(Plugin plugin) {
        return null;
    }

    @Override
    public EventFactory getEventFactory() {
        return null;
    }
}

View file

@ -0,0 +1,41 @@
package org.logstash.plugins;
import co.elastic.logstash.api.Codec;
import org.jruby.RubyInteger;
import org.jruby.RubyString;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.config.ir.compiler.AbstractFilterDelegatorExt;
import org.logstash.config.ir.compiler.AbstractOutputDelegatorExt;
import org.logstash.config.ir.compiler.RubyIntegration;
import org.logstash.plugins.codecs.Line;
import java.util.Collections;
import java.util.Map;
/**
 * Stub {@code RubyIntegration.PluginFactory} for tests. The Ruby-side build
 * methods are unused by the tests and return {@code null}; only
 * {@link #buildDefaultCodec(String)} is functional, always producing a
 * {@link Line} codec with an empty configuration.
 */
public class TestPluginFactory implements RubyIntegration.PluginFactory {

    @Override
    public IRubyObject buildInput(RubyString pluginName, RubyInteger sourceLine, RubyInteger sourceColumn,
                                  IRubyObject rubyArgs, Map<String, Object> javaArgs) {
        return null;
    }

    @Override
    public AbstractOutputDelegatorExt buildOutput(RubyString pluginName, RubyInteger sourceLine,
                                                  RubyInteger sourceColumn, IRubyObject rubyArgs,
                                                  Map<String, Object> javaArgs) {
        return null;
    }

    @Override
    public AbstractFilterDelegatorExt buildFilter(RubyString pluginName, RubyInteger sourceLine,
                                                  RubyInteger sourceColumn, IRubyObject rubyArgs,
                                                  Map<String, Object> javaArgs) {
        return null;
    }

    @Override
    public IRubyObject buildCodec(RubyString pluginName, IRubyObject rubyArgs, Map<String, Object> javaArgs) {
        return null;
    }

    @Override
    public Codec buildDefaultCodec(String codecName) {
        // The requested codec name is ignored: tests always receive a
        // default-configured line codec.
        return new Line(new ConfigurationImpl(Collections.emptyMap()), new ContextImpl(null));
    }
}

View file

@ -0,0 +1,32 @@
package org.logstash.plugins;
import co.elastic.logstash.api.LogstashPlugin;
import co.elastic.logstash.api.Plugin;
import co.elastic.logstash.api.PluginConfigSpec;
import java.util.Collection;
import static org.logstash.plugins.TestingPlugin.TEST_PLUGIN_NAME;
/**
 * Bare-bones {@link Plugin} used by config-validation tests: it echoes back
 * whatever configuration schema it was constructed with and reports a fixed
 * plugin id.
 */
@LogstashPlugin(name = TEST_PLUGIN_NAME)
public class TestingPlugin implements Plugin {

    static final String TEST_PLUGIN_NAME = "test_plugin";
    static final String ID = "TestingPluginId";

    // Schema supplied by the test case; returned verbatim from configSchema().
    private final Collection<PluginConfigSpec<?>> schema;

    TestingPlugin(Collection<PluginConfigSpec<?>> schema) {
        this.schema = schema;
    }

    @Override
    public Collection<PluginConfigSpec<?>> configSchema() {
        return this.schema;
    }

    @Override
    public String getId() {
        return ID;
    }
}

View file

@ -1,10 +1,12 @@
package org.logstash.plugins.codecs;
import co.elastic.logstash.api.Codec;
import org.junit.Assert;
import org.junit.Test;
import org.logstash.Event;
import co.elastic.logstash.api.Configuration;
import org.logstash.plugins.ConfigurationImpl;
import org.logstash.plugins.TestContext;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
@ -125,41 +127,27 @@ public class LineTest {
byte[] bytes = input.getBytes();
assertTrue(bytes.length > input.length());
ByteBuffer b1 = ByteBuffer.allocate(BUFFER_SIZE);
System.out.println(b1);
b1.put(bytes, lastPos, 12);
System.out.println(b1);
b1.flip();
System.out.println(b1);
Line line = getLineCodec(null, null);
line.decode(b1, eventConsumer);
System.out.println(b1);
b1.compact();
System.out.println(b1);
int remaining = b1.remaining();
lastPos += BUFFER_SIZE;
b1.put(bytes, lastPos, remaining);
System.out.println(b1);
b1.flip();
System.out.println(b1);
line.decode(b1, eventConsumer);
System.out.println(b1);
b1.compact();
System.out.println(b1);
remaining = b1.remaining();
lastPos += remaining;
b1.put(bytes, lastPos, bytes.length - lastPos);
System.out.println(b1);
b1.flip();
System.out.println(b1);
line.decode(b1, eventConsumer);
System.out.println(b1);
b1.compact();
System.out.println(b1);
b1.flip();
System.out.println(b1);
line.flush(b1, eventConsumer);
}
@ -212,7 +200,7 @@ public class LineTest {
if (charset != null) {
config.put("charset", charset);
}
return new Line(new Configuration(config), null);
return new Line(new ConfigurationImpl(config), new TestContext());
}
@Test
@ -220,7 +208,7 @@ public class LineTest {
TestEventConsumer flushConsumer = new TestEventConsumer();
// decode with cp-1252
Line cp1252decoder = new Line(new Configuration(Collections.singletonMap("charset", "cp1252")), null);
Line cp1252decoder = new Line(new ConfigurationImpl(Collections.singletonMap("charset", "cp1252")), new TestContext());
byte[] rightSingleQuoteInCp1252 = {(byte) 0x92};
ByteBuffer b1 = ByteBuffer.wrap(rightSingleQuoteInCp1252);
cp1252decoder.decode(b1, flushConsumer);
@ -231,7 +219,7 @@ public class LineTest {
// decode with UTF-8
flushConsumer.events.clear();
Line utf8decoder = new Line(new Configuration(Collections.emptyMap()), null);
Line utf8decoder = new Line(new ConfigurationImpl(Collections.emptyMap()), new TestContext());
byte[] rightSingleQuoteInUtf8 = {(byte) 0xE2, (byte) 0x80, (byte) 0x99};
ByteBuffer b2 = ByteBuffer.wrap(rightSingleQuoteInUtf8);
utf8decoder.decode(b2, flushConsumer);
@ -243,60 +231,170 @@ public class LineTest {
}
@Test
public void testEncode() {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
Line line = new Line(new Configuration(Collections.emptyMap()), null);
Event e = new Event();
e.setField("myfield1", "myvalue1");
e.setField("myfield2", 42L);
line.encode(e, outputStream);
e.setField("myfield1", "myvalue2");
e.setField("myfield2", 43L);
line.encode(e, outputStream);
String delimiter = Line.DEFAULT_DELIMITER;
String resultingString = outputStream.toString();
// first delimiter should occur at the halfway point of the string
assertEquals(resultingString.indexOf(delimiter), (resultingString.length() / 2) - delimiter.length());
// second delimiter should occur at end of string
assertEquals(resultingString.lastIndexOf(delimiter), resultingString.length() - delimiter.length());
}
@Test
public void testEncodeWithCustomDelimiter() {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
String delimiter = "xyz";
Line line = new Line(new Configuration(Collections.singletonMap("delimiter", delimiter)), null);
Event e = new Event();
e.setField("myfield1", "myvalue1");
e.setField("myfield2", 42L);
line.encode(e, outputStream);
e.setField("myfield1", "myvalue2");
e.setField("myfield2", 43L);
line.encode(e, outputStream);
String resultingString = outputStream.toString();
// first delimiter should occur at the halfway point of the string
assertEquals(resultingString.indexOf(delimiter), (resultingString.length() / 2) - delimiter.length());
// second delimiter should occur at end of string
assertEquals(resultingString.lastIndexOf(delimiter), resultingString.length() - delimiter.length());
}
@Test
public void testEncodeWithFormat() {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
Line line = new Line(new Configuration(Collections.singletonMap("format", "%{host}-%{message}")), null);
public void testEncode() throws Codec.EncodeException {
String delimiter = "z";
String message = "Hello world";
String host = "test";
String expectedOutput = host + "-" + message + Line.DEFAULT_DELIMITER;
Event e = new Event();
e.setField("message", message);
e.setField("host", host);
String expectedResult = message + delimiter;
Map<String, Object> config = new HashMap<>();
config.put("delimiter", delimiter);
config.put("format", "%{message}");
Line line = new Line(new ConfigurationImpl(config), new TestContext());
Event e = new Event(Collections.singletonMap("message", message));
byte[] b = new byte[100];
ByteBuffer buffer = ByteBuffer.wrap(b);
boolean result = line.encode(e, buffer);
Assert.assertTrue(result);
String resultString = new String(buffer.array(), buffer.position(), buffer.limit());
Assert.assertEquals(expectedResult, resultString);
}
line.encode(e, outputStream);
@Test
public void testMultipleEncodesForEvent() throws Codec.EncodeException {
String delimiter = "z";
String message = "Hello world";
String expectedResult = message + delimiter;
Map<String, Object> config = new HashMap<>();
config.put("delimiter", delimiter);
config.put("format", "%{message}");
Line line = new Line(new ConfigurationImpl(config), new TestContext());
Event e = new Event(Collections.singletonMap("message", message));
byte[] b = new byte[10];
ByteBuffer buffer = ByteBuffer.wrap(b);
String resultingString = outputStream.toString();
assertEquals(expectedOutput, resultingString);
boolean result = line.encode(e, buffer);
Assert.assertFalse(result);
String resultString = new String(buffer.array(), buffer.position(), buffer.limit());
buffer.clear();
result = line.encode(e, buffer);
Assert.assertTrue(result);
resultString += new String(buffer.array(), buffer.position(), buffer.limit());
Assert.assertEquals(expectedResult, resultString);
}
// Verifies that starting to encode a second event while the previous event is
// still only partially encoded raises Codec.EncodeException.
@Test
public void testEncodeNewEventBeforeFinishingPreviousThrows() {
String delimiter = "z";
String message = "Hello world";
Map<String, Object> config = new HashMap<>();
config.put("delimiter", delimiter);
config.put("format", "%{message}");
Line line = new Line(new ConfigurationImpl(config), new TestContext());
Event e1 = new Event(Collections.singletonMap("message", message));
Event e2 = new Event(Collections.singletonMap("message", message));
// Buffer is deliberately too small (10 bytes) for "Hello world" + delimiter,
// so the first encode cannot complete in a single call.
byte[] b = new byte[10];
ByteBuffer buffer = ByteBuffer.wrap(b);
try {
line.encode(e1, buffer);
// e1 is still in flight — encoding e2 here must fail.
line.encode(e2, buffer);
Assert.fail("EncodeException should be thrown because previous event was not fully encoded");
} catch (Codec.EncodeException ex) {
// this exception should be thrown
}
}
// Verifies that a message containing multi-byte UTF-8 characters is encoded
// completely in one call when the buffer (100 bytes) is large enough.
@Test
public void testEncodeWithUtf8() throws Codec.EncodeException {
String delimiter = "z";
String message = "München 安装中文输入法";
Map<String, Object> config = new HashMap<>();
config.put("delimiter", delimiter);
config.put("format", "%{message}");
Line line = new Line(new ConfigurationImpl(config), new TestContext());
Event e1 = new Event(Collections.singletonMap("message", message));
byte[] b = new byte[100];
ByteBuffer buffer = ByteBuffer.wrap(b);
boolean result = line.encode(e1, buffer);
// true => the whole event (message + delimiter) fit in the buffer.
Assert.assertTrue(result);
String expectedResult = message + delimiter;
// NOTE(review): this String conversion uses the platform default charset —
// assumed to be UTF-8 for the comparison to hold; confirm test environment.
Assert.assertEquals(expectedResult, new String(buffer.array(), buffer.position(), buffer.limit()));
}
// Verifies that encoding never splits a multi-byte character across buffer
// boundaries: nine 3-byte UTF-8 chars into a 10-byte buffer must emit exactly
// three whole characters per pass, completing on the third call.
@Test
public void testEncodeAcrossMultibyteCharBoundary() throws Codec.EncodeException {
String message = "安安安安安安安安安";
String delimiter = "";
Map<String, Object> config = new HashMap<>();
config.put("delimiter", delimiter);
config.put("format", "%{message}");
Line line = new Line(new ConfigurationImpl(config), new TestContext());
Event e1 = new Event(Collections.singletonMap("message", message));
byte[] b = new byte[10];
ByteBuffer buffer = ByteBuffer.wrap(b);
// First pass: only 3 of 9 chars fit — encode reports "not finished" (false).
boolean result = line.encode(e1, buffer);
String intermediateResult = new String(buffer.array(), buffer.position(), buffer.limit());
Assert.assertFalse(result);
Assert.assertEquals("安安安", intermediateResult);
buffer.clear();
// Second pass: the next 3 chars, still unfinished.
result = line.encode(e1, buffer);
intermediateResult = new String(buffer.array(), buffer.position(), buffer.limit());
Assert.assertFalse(result);
Assert.assertEquals("安安安", intermediateResult);
buffer.clear();
// Third pass: final 3 chars plus the empty delimiter — encode completes (true).
result = line.encode(e1, buffer);
intermediateResult = new String(buffer.array(), buffer.position(), buffer.limit());
Assert.assertTrue(result);
Assert.assertEquals("安安安", intermediateResult);
}
// Verifies that the "charset" config option controls the byte encoding of the
// output: a right single quote (U+2019) must be emitted as the single cp1252
// byte 0x92 rather than its three-byte UTF-8 form.
@Test
public void testEncodeWithCharset() throws Exception {
byte[] rightSingleQuoteInUtf8 = {(byte) 0xE2, (byte) 0x80, (byte) 0x99};
// Decode the fixture bytes explicitly as UTF-8. The previous no-arg String
// constructor used the platform default charset, making the test flaky on
// non-UTF-8 platforms.
String rightSingleQuote = new String(rightSingleQuoteInUtf8, StandardCharsets.UTF_8);
// encode with cp-1252
Map<String, Object> config = new HashMap<>();
config.put("charset", "cp1252");
config.put("format", "%{message}");
config.put("delimiter", "");
Event e1 = new Event(Collections.singletonMap("message", rightSingleQuote));
// Renamed from "cp1252decoder": this codec instance is used for encoding.
Line cp1252Encoder = new Line(new ConfigurationImpl(config), new TestContext());
byte[] rightSingleQuoteInCp1252 = {(byte) 0x92};
byte[] b = new byte[100];
ByteBuffer buffer = ByteBuffer.wrap(b);
boolean result = cp1252Encoder.encode(e1, buffer);
Assert.assertTrue(result);
// Compare only the encoded region [position, limit).
byte[] resultBytes = new byte[buffer.limit() - buffer.position()];
System.arraycopy(buffer.array(), buffer.position(), resultBytes, 0, buffer.limit() - buffer.position());
Assert.assertArrayEquals(rightSingleQuoteInCp1252, resultBytes);
}
// Verifies that cloneCodec() produces an independent Line codec of the same
// class that preserves the configured charset and delimiter.
@Test
public void testClone() throws Codec.EncodeException {
String delimiter = "x";
String charset = "cp1252";
byte[] rightSingleQuoteInUtf8 = {(byte) 0xE2, (byte) 0x80, (byte) 0x99};
// Decode the fixture bytes explicitly as UTF-8; relying on the platform
// default charset (no-arg String constructor) makes the test
// environment-dependent.
String rightSingleQuote = new String(rightSingleQuoteInUtf8, StandardCharsets.UTF_8);
// encode with cp-1252
Map<String, Object> config = new HashMap<>();
config.put("charset", charset);
config.put("format", "%{message}");
config.put("delimiter", delimiter);
Event e1 = new Event(Collections.singletonMap("message", rightSingleQuote));
Line codec = new Line(new ConfigurationImpl(config), new TestContext());
// clone codec
Codec clone = codec.cloneCodec();
Assert.assertEquals(codec.getClass(), clone.getClass());
Line line2 = (Line)clone;
// verify charset and delimiter
// Expected output: U+2019 as cp1252 byte 0x92 followed by delimiter 'x' (0x78).
byte[] rightSingleQuoteAndXInCp1252 = {(byte) 0x92, (byte) 0x78};
byte[] b = new byte[100];
ByteBuffer buffer = ByteBuffer.wrap(b);
boolean result = line2.encode(e1, buffer);
Assert.assertTrue(result);
byte[] resultBytes = new byte[buffer.limit() - buffer.position()];
System.arraycopy(buffer.array(), buffer.position(), resultBytes, 0, buffer.limit() - buffer.position());
Assert.assertArrayEquals(rightSingleQuoteAndXInCp1252, resultBytes);
}
}
@ -307,6 +405,7 @@ class TestEventConsumer implements Consumer<Map<String, Object>> {
@Override
public void accept(Map<String, Object> stringObjectMap) {
events.add(stringObjectMap);
events.add(new HashMap<>(stringObjectMap));
}
}

View file

@ -0,0 +1,85 @@
package org.logstash.plugins.filters;
import co.elastic.logstash.api.Configuration;
import co.elastic.logstash.api.Event;
import co.elastic.logstash.api.FilterMatchListener;
import org.junit.Assert;
import org.junit.Test;
import org.logstash.plugins.ConfigurationImpl;
import org.logstash.plugins.ContextImpl;
import org.logstash.plugins.PluginUtil;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
 * Tests for the java_uuid filter: required-config validation, and UUID
 * generation with and without overwriting an existing target-field value.
 */
public class UuidTest {
private static final String ID = "uuid_test_id";
private static final NoopFilterMatchListener NO_OP_MATCH_LISTENER = new NoopFilterMatchListener();
// Validation must fail with IllegalStateException when required config is absent.
@Test
public void testUuidWithoutRequiredConfigThrows() {
try {
Configuration config = new ConfigurationImpl(Collections.emptyMap());
Uuid uuid = new Uuid(ID, config, new ContextImpl(null));
PluginUtil.validateConfig(uuid, config);
Assert.fail("java-uuid filter without required config should have thrown exception");
} catch (IllegalStateException ex) {
// Expected: validation reports config errors for the plugin by name.
Assert.assertTrue(ex.getMessage().contains("Config errors found for plugin 'java_uuid'"));
} catch (Exception ex2) {
Assert.fail("Unexpected exception for java-uuid filter without required config");
}
}
// Without the overwrite option, an existing value in the target field is preserved.
@Test
public void testUuidWithoutOverwrite() {
String targetField = "target_field";
String originalValue = "originalValue";
Map<String, Object> rawConfig = new HashMap<>();
rawConfig.put(Uuid.TARGET_CONFIG.name(), targetField);
Configuration config = new ConfigurationImpl(rawConfig);
Uuid uuid = new Uuid(ID, config, new ContextImpl(null));
PluginUtil.validateConfig(uuid, config);
org.logstash.Event e = new org.logstash.Event();
e.setField(targetField, originalValue);
Collection<Event> filteredEvents = uuid.filter(Collections.singletonList(e), NO_OP_MATCH_LISTENER);
Assert.assertEquals(1, filteredEvents.size());
Event finalEvent = filteredEvents.stream().findFirst().orElse(null);
Assert.assertNotNull(finalEvent);
// The pre-existing value must survive untouched.
Assert.assertEquals(originalValue, finalEvent.getField(targetField));
}
// With overwrite enabled, the existing value is replaced by a generated UUID.
@Test
public void testUuidWithOverwrite() {
String targetField = "target_field";
String originalValue = "originalValue";
Map<String, Object> rawConfig = new HashMap<>();
rawConfig.put(Uuid.TARGET_CONFIG.name(), targetField);
rawConfig.put(Uuid.OVERWRITE_CONFIG.name(), true);
Configuration config = new ConfigurationImpl(rawConfig);
Uuid uuid = new Uuid(ID, config, new ContextImpl(null));
PluginUtil.validateConfig(uuid, config);
org.logstash.Event e = new org.logstash.Event();
e.setField(targetField, originalValue);
Collection<Event> filteredEvents = uuid.filter(Collections.singletonList(e), NO_OP_MATCH_LISTENER);
Assert.assertEquals(1, filteredEvents.size());
Event finalEvent = filteredEvents.stream().findFirst().orElse(null);
Assert.assertNotNull(finalEvent);
Assert.assertNotEquals(originalValue, finalEvent.getField(targetField));
// Replacement must look like a standard 8-4-4-4-12 lowercase-hex UUID.
Assert.assertTrue(((String)finalEvent.getField(targetField)).matches("\\b[0-9a-f]{8}\\b-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-\\b[0-9a-f]{12}\\b"));
}
// Match listener that ignores all notifications.
private static class NoopFilterMatchListener implements FilterMatchListener {
@Override
public void filterMatched(Event e) {
}
}
}

View file

@ -1,9 +1,10 @@
package org.logstash.plugins.inputs;
import org.junit.Test;
import co.elastic.logstash.api.Configuration;
import org.logstash.plugins.ConfigurationImpl;
import org.logstash.plugins.TestContext;
import org.logstash.plugins.TestPluginFactory;
import org.logstash.plugins.codecs.Line;
import org.logstash.execution.queue.QueueWriter;
import java.io.IOException;
import java.io.RandomAccessFile;
@ -13,26 +14,31 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class StdinTest {
private static final String ID = "stdin_test_id";
private static volatile Throwable stdinError;
private static volatile Thread executingThread;
@Test
public void testSimpleEvent() throws IOException {
String testInput = "foo" + Line.DEFAULT_DELIMITER;
TestQueueWriter queueWriter = testStdin(testInput.getBytes());
TestConsumer queueWriter = testStdin(testInput.getBytes());
assertEquals(1, queueWriter.getEvents().size());
}
@Test
public void testEvents() throws IOException {
String testInput = "foo" + Line.DEFAULT_DELIMITER + "bar" + Line.DEFAULT_DELIMITER + "baz" + Line.DEFAULT_DELIMITER;
TestQueueWriter queueWriter = testStdin(testInput.getBytes());
TestConsumer queueWriter = testStdin(testInput.getBytes());
assertEquals(3, queueWriter.getEvents().size());
}
@ -40,7 +46,7 @@ public class StdinTest {
public void testUtf8Events() throws IOException {
String[] inputs = {"München1", "安装中文输入法", "München3"};
String testInput = String.join(Line.DEFAULT_DELIMITER, inputs) + Line.DEFAULT_DELIMITER;
TestQueueWriter queueWriter = testStdin(testInput.getBytes());
TestConsumer queueWriter = testStdin(testInput.getBytes());
List<Map<String, Object>> events = queueWriter.getEvents();
assertEquals(3, events.size());
@ -49,20 +55,35 @@ public class StdinTest {
}
}
private static TestQueueWriter testStdin(byte[] input) throws IOException {
TestQueueWriter queueWriter = new TestQueueWriter();
private static TestConsumer testStdin(byte[] input) throws IOException {
TestConsumer consumer = new TestConsumer();
try (FileChannel inChannel = getTestFileChannel(input)) {
Stdin stdin = new Stdin(new Configuration(Collections.emptyMap()), null, inChannel);
Thread t = new Thread(() -> stdin.start(queueWriter));
t.start();
try {
Thread.sleep(50);
stdin.awaitStop();
} catch (InterruptedException e) {
fail("Stdin.awaitStop failed with exception: " + e);
Stdin stdin = new Stdin(ID, new ConfigurationImpl(Collections.emptyMap(), new TestPluginFactory()), new TestContext(), inChannel);
executingThread = Thread.currentThread();
Thread t = new Thread(() -> stdin.start(consumer));
t.setName("StdinThread");
t.setUncaughtExceptionHandler((thread, throwable) -> {
stdinError = throwable;
thread.interrupt();
executingThread.interrupt();
});
t.start();
try {
Thread.sleep(50);
stdin.awaitStop();
} catch (InterruptedException e) {
if (stdinError != null) {
fail("Error in Stdin.start: " + stdinError);
} else {
fail("Stdin.awaitStop failed with exception: " + e);
}
}
} catch (Exception e) {
fail("Unexpected exception occurred: " + e);
}
}
return queueWriter;
return consumer;
}
private static FileChannel getTestFileChannel(byte[] testBytes) throws IOException {
@ -76,14 +97,14 @@ public class StdinTest {
}
class TestQueueWriter implements QueueWriter {
class TestConsumer implements Consumer<Map<String, Object>> {
private List<Map<String, Object>> events = new ArrayList<>();
@Override
public void push(Map<String, Object> event) {
public void accept(Map<String, Object> event) {
synchronized (this) {
events.add(event);
events.add(new HashMap<>(event));
}
}

View file

@ -1,9 +1,11 @@
package org.logstash.plugins.outputs;
import co.elastic.logstash.api.Configuration;
import co.elastic.logstash.api.Event;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.junit.Test;
import org.logstash.Event;
import org.logstash.plugins.ConfigurationImpl;
import org.logstash.plugins.TestContext;
import org.logstash.plugins.TestPluginFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@ -14,8 +16,10 @@ import java.util.Collections;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.logstash.ObjectMappers.JSON_MAPPER;
public class StdoutTest {
private static final String ID = "stdout_test_id";
private static boolean streamWasClosed = false;
/**
@ -31,7 +35,8 @@ public class StdoutTest {
super.close();
}
};
Stdout stdout = new Stdout(new Configuration(Collections.emptyMap()), null, dummyOutputStream);
Stdout stdout = new Stdout(ID, new ConfigurationImpl(Collections.emptyMap(), new TestPluginFactory()),
new TestContext(), dummyOutputStream);
stdout.output(getTestEvents());
stdout.stop();
@ -43,11 +48,12 @@ public class StdoutTest {
StringBuilder expectedOutput = new StringBuilder();
Collection<Event> testEvents = getTestEvents();
for (Event e : testEvents) {
expectedOutput.append(String.format(e.toJson() + "%n"));
expectedOutput.append(String.format(JSON_MAPPER.writeValueAsString(e.getData()) + "%n"));
}
OutputStream dummyOutputStream = new ByteArrayOutputStream(0);
Stdout stdout = new Stdout(new Configuration(Collections.emptyMap()), null, dummyOutputStream);
Stdout stdout = new Stdout(ID, new ConfigurationImpl(Collections.emptyMap(), new TestPluginFactory()),
new TestContext(), dummyOutputStream);
stdout.output(testEvents);
stdout.stop();
@ -55,11 +61,11 @@ public class StdoutTest {
}
private static Collection<Event> getTestEvents() {
Event e1 = new Event();
org.logstash.Event e1 = new org.logstash.Event();
e1.setField("myField", "event1");
Event e2 = new Event();
org.logstash.Event e2 = new org.logstash.Event();
e2.setField("myField", "event2");
Event e3 = new Event();
org.logstash.Event e3 = new org.logstash.Event();
e3.setField("myField", "event3");
return Arrays.asList(e1, e2, e3);
}