JAVAFICATION: Port EventDispatcher to Java

Fixes #9633
This commit is contained in:
Armin 2018-05-22 12:44:24 +02:00 committed by Armin Braun
parent 8759050aa4
commit 0dc5c4af64
6 changed files with 82 additions and 45 deletions

View file

@ -3,10 +3,8 @@ require "logstash/environment"
require "logstash/config/cpu_core_strategy"
require "logstash/instrument/collector"
require "logstash/instrument/periodic_pollers"
require "logstash/instrument/collector"
require "logstash/pipeline"
require "logstash/webserver"
require "logstash/event_dispatcher"
require "logstash/config/source_loader"
require "logstash/pipeline_action"
require "logstash/converge_result"

View file

@ -1,40 +1 @@
# encoding: utf-8
module LogStash
class EventDispatcher
java_import "java.util.concurrent.CopyOnWriteArraySet"
attr_reader :emitter
def initialize(emitter)
@emitter = emitter
@listeners = CopyOnWriteArraySet.new
end
# This operation is slow because we use a CopyOnWriteArrayList
# But the majority of the addition will be done at bootstrap time
# So add_listener shouldn't be called often at runtime.
#
# On the other hand the notification could be called really often.
def add_listener(listener)
@listeners.add(listener)
end
# This operation is slow because we use a `CopyOnWriteArrayList` as the backend, instead of a
# ConcurrentHashMap, but since we are mostly adding stuff and iterating the `CopyOnWriteArrayList`
# should provide a better performance.
#
# See note on add_listener, this method shouldn't be called really often.
def remove_listener(listener)
@listeners.remove(listener)
end
def fire(method_name, *arguments)
@listeners.each do |listener|
if listener.respond_to?(method_name)
listener.send(method_name, emitter, *arguments)
end
end
end
alias_method :execute, :fire
end
end
# Keeping this file for backwards compatibility with plugins that include it directly.

View file

@ -1,7 +1,5 @@
# encoding: utf-8
#
require "logstash/event_dispatcher"
describe LogStash::EventDispatcher do
class DummyEmitter
attr_reader :dispatcher

View file

@ -1,5 +1,4 @@
# encoding: utf-8
require "logstash/event_dispatcher"
require "logstash/plugins/hooks_registry"
describe LogStash::Plugins::HooksRegistry do

View file

@ -15,6 +15,7 @@ import org.logstash.common.BufferedTokenizerExt;
import org.logstash.config.ir.compiler.FilterDelegatorExt;
import org.logstash.config.ir.compiler.OutputDelegatorExt;
import org.logstash.config.ir.compiler.OutputStrategyExt;
import org.logstash.execution.EventDispatcherExt;
import org.logstash.execution.ExecutionContextExt;
import org.logstash.execution.QueueReadClientBase;
import org.logstash.ext.JRubyLogstashErrorsExt;
@ -153,6 +154,8 @@ public final class RubyUtil {
public static final RubyClass UNIVERSAL_PLUGIN_CLASS;
public static final RubyClass EVENT_DISPATCHER_CLASS;
/**
* Logstash Ruby Module.
*/
@ -416,6 +419,8 @@ public final class RubyUtil {
PLUGIN_FACTORY_CLASS.defineAnnotatedMethods(PluginFactoryExt.Plugins.class);
UNIVERSAL_PLUGIN_CLASS =
setupLogstashClass(UniversalPluginExt::new, UniversalPluginExt.class);
EVENT_DISPATCHER_CLASS =
setupLogstashClass(EventDispatcherExt::new, EventDispatcherExt.class);
RUBY.getGlobalVariables().set("$LS_JARS_LOADED", RUBY.newString("true"));
RubyJavaIntegration.setupRubyJavaIntegration(RUBY);
}

View file

@ -0,0 +1,76 @@
package org.logstash.execution;
import java.util.Collection;
import java.util.concurrent.CopyOnWriteArraySet;
import org.jruby.Ruby;
import org.jruby.RubyBasicObject;
import org.jruby.RubyClass;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
@JRubyClass(name = "EventDispatcher")
public final class EventDispatcherExt extends RubyBasicObject {
private final Collection<IRubyObject> listeners = new CopyOnWriteArraySet<>();
private IRubyObject emitter;
public EventDispatcherExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
@JRubyMethod
public EventDispatcherExt initialize(final ThreadContext context, final IRubyObject emitter) {
this.emitter = emitter;
return this;
}
@JRubyMethod
public IRubyObject emitter() {
return emitter;
}
/**
* This operation is slow because we use a CopyOnWriteArrayList
* But the majority of the addition will be done at bootstrap time
* So add_listener shouldn't be called often at runtime.
* On the other hand the notification could be called really often.
* @param context ThreadContext
* @param listener Listener
* @return Nil
*/
@JRubyMethod(name = "add_listener")
public IRubyObject addListener(final ThreadContext context, final IRubyObject listener) {
return listeners.add(listener) ? context.tru : context.fals;
}
/**
* This operation is slow because we use a `CopyOnWriteArrayList` as the backend, instead of a
* ConcurrentHashMap, but since we are mostly adding stuff and iterating the `CopyOnWriteArrayList`
* should provide a better performance.
* See note on add_listener, this method shouldn't be called really often.
* @param context ThreadContext
* @param listener Listener
* @return True iff listener was actually removed
*/
@JRubyMethod(name = "remove_listener")
public IRubyObject removeListener(final ThreadContext context, final IRubyObject listener) {
return listeners.remove(listener) ? context.tru : context.fals;
}
@JRubyMethod(name = {"execute", "fire"}, required = 1, rest = true)
public IRubyObject fire(final ThreadContext context, final IRubyObject[] arguments) {
final String methodName = arguments[0].asJavaString();
final IRubyObject[] args = new IRubyObject[arguments.length];
args[0] = emitter;
System.arraycopy(arguments, 1, args, 1, arguments.length - 1);
listeners.forEach(listener -> {
if (listener.respondsTo(methodName)) {
listener.callMethod(context, methodName, args);
}
});
return context.nil;
}
}