Provide DLQ writer interface to Java plugins

Fixes #10790
This commit is contained in:
Dan Hermann 2019-05-15 10:37:33 -05:00
parent c22edf07f6
commit 9a06b204bf
7 changed files with 85 additions and 7 deletions

View file

@ -1,7 +1,6 @@
package co.elastic.logstash.api; package co.elastic.logstash.api;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.logstash.common.io.DeadLetterQueueWriter;
/** /**
* Provides Logstash context to plugins. * Provides Logstash context to plugins.

View file

@ -0,0 +1,12 @@
package co.elastic.logstash.api;
import java.io.IOException;
public interface DeadLetterQueueWriter {
void writeEntry(Event event, Plugin plugin, String reason) throws IOException;
boolean isOpen();
long getCurrentQueueSize();
}

View file

@ -0,0 +1,32 @@
package org.logstash.common;
import co.elastic.logstash.api.DeadLetterQueueWriter;
import co.elastic.logstash.api.Event;
import co.elastic.logstash.api.Plugin;
import java.io.IOException;
import java.util.Objects;
public class DLQWriterAdapter implements DeadLetterQueueWriter {
private final org.logstash.common.io.DeadLetterQueueWriter dlqWriter;
public DLQWriterAdapter(org.logstash.common.io.DeadLetterQueueWriter dlqWriter) {
this.dlqWriter = Objects.requireNonNull(dlqWriter);
}
@Override
public void writeEntry(Event event, Plugin plugin, String reason) throws IOException {
dlqWriter.writeEntry((org.logstash.Event) event, plugin.getName(), plugin.getId(), reason);
}
@Override
public boolean isOpen() {
return dlqWriter != null && dlqWriter.isOpen();
}
@Override
public long getCurrentQueueSize() {
return dlqWriter != null ? dlqWriter.getCurrentQueueSize() : 0;
}
}

View file

@ -0,0 +1,33 @@
package org.logstash.common;
import co.elastic.logstash.api.DeadLetterQueueWriter;
import co.elastic.logstash.api.Event;
import co.elastic.logstash.api.Plugin;
import java.io.IOException;
public class NullDeadLetterQueueWriter implements DeadLetterQueueWriter {
private static final NullDeadLetterQueueWriter INSTANCE = new NullDeadLetterQueueWriter();
private NullDeadLetterQueueWriter() {
}
public static NullDeadLetterQueueWriter getInstance() {
return INSTANCE;
}
@Override
public void writeEntry(Event event, Plugin plugin, String reason) throws IOException {
// no-op
}
@Override
public boolean isOpen() {
return false;
}
@Override
public long getCurrentQueueSize() {
return 0;
}
}

View file

@ -1,6 +1,7 @@
package org.logstash.plugins; package org.logstash.plugins;
import co.elastic.logstash.api.Context; import co.elastic.logstash.api.Context;
import co.elastic.logstash.api.DeadLetterQueueWriter;
import co.elastic.logstash.api.Event; import co.elastic.logstash.api.Event;
import co.elastic.logstash.api.EventFactory; import co.elastic.logstash.api.EventFactory;
import co.elastic.logstash.api.Metric; import co.elastic.logstash.api.Metric;
@ -9,7 +10,6 @@ import co.elastic.logstash.api.Plugin;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.logstash.ConvertedMap; import org.logstash.ConvertedMap;
import org.logstash.common.io.DeadLetterQueueWriter;
import java.io.Serializable; import java.io.Serializable;
import java.util.Map; import java.util.Map;

View file

@ -3,6 +3,7 @@ package org.logstash.plugins;
import co.elastic.logstash.api.Codec; import co.elastic.logstash.api.Codec;
import co.elastic.logstash.api.Configuration; import co.elastic.logstash.api.Configuration;
import co.elastic.logstash.api.Context; import co.elastic.logstash.api.Context;
import co.elastic.logstash.api.DeadLetterQueueWriter;
import co.elastic.logstash.api.Filter; import co.elastic.logstash.api.Filter;
import co.elastic.logstash.api.Input; import co.elastic.logstash.api.Input;
import co.elastic.logstash.api.Output; import co.elastic.logstash.api.Output;
@ -21,7 +22,8 @@ import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject; import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil; import org.logstash.RubyUtil;
import org.logstash.common.AbstractDeadLetterQueueWriterExt; import org.logstash.common.AbstractDeadLetterQueueWriterExt;
import org.logstash.common.io.DeadLetterQueueWriter; import org.logstash.common.DLQWriterAdapter;
import org.logstash.common.NullDeadLetterQueueWriter;
import org.logstash.config.ir.PipelineIR; import org.logstash.config.ir.PipelineIR;
import org.logstash.config.ir.compiler.AbstractFilterDelegatorExt; import org.logstash.config.ir.compiler.AbstractFilterDelegatorExt;
import org.logstash.config.ir.compiler.AbstractOutputDelegatorExt; import org.logstash.config.ir.compiler.AbstractOutputDelegatorExt;
@ -395,7 +397,7 @@ public final class PluginFactoryExt {
} }
public Context toContext(PluginLookup.PluginType pluginType, AbstractNamespacedMetricExt metric) { public Context toContext(PluginLookup.PluginType pluginType, AbstractNamespacedMetricExt metric) {
DeadLetterQueueWriter dlq = null; DeadLetterQueueWriter dlq = NullDeadLetterQueueWriter.getInstance();
if (pluginType == PluginLookup.PluginType.OUTPUT) { if (pluginType == PluginLookup.PluginType.OUTPUT) {
if (dlqWriter instanceof AbstractDeadLetterQueueWriterExt.PluginDeadLetterQueueWriterExt) { if (dlqWriter instanceof AbstractDeadLetterQueueWriterExt.PluginDeadLetterQueueWriterExt) {
IRubyObject innerWriter = IRubyObject innerWriter =
@ -403,8 +405,8 @@ public final class PluginFactoryExt {
.innerWriter(RubyUtil.RUBY.getCurrentContext()); .innerWriter(RubyUtil.RUBY.getCurrentContext());
if (innerWriter != null) { if (innerWriter != null) {
if (innerWriter.getJavaClass().equals(DeadLetterQueueWriter.class)) { if (org.logstash.common.io.DeadLetterQueueWriter.class.isAssignableFrom(innerWriter.getJavaClass())) {
dlq = innerWriter.toJava(DeadLetterQueueWriter.class); dlq = new DLQWriterAdapter(innerWriter.toJava(org.logstash.common.io.DeadLetterQueueWriter.class));
} }
} }
} }

View file

@ -1,11 +1,11 @@
package org.logstash.plugins; package org.logstash.plugins;
import co.elastic.logstash.api.Context; import co.elastic.logstash.api.Context;
import co.elastic.logstash.api.DeadLetterQueueWriter;
import co.elastic.logstash.api.EventFactory; import co.elastic.logstash.api.EventFactory;
import co.elastic.logstash.api.NamespacedMetric; import co.elastic.logstash.api.NamespacedMetric;
import co.elastic.logstash.api.Plugin; import co.elastic.logstash.api.Plugin;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.logstash.common.io.DeadLetterQueueWriter;
public class TestContext implements Context { public class TestContext implements Context {