diff --git a/logstash-core/lib/logstash/plugins/hooks_registry.rb b/logstash-core/lib/logstash/plugins/hooks_registry.rb index ed3e1d3f8..8bf9c3d84 100644 --- a/logstash-core/lib/logstash/plugins/hooks_registry.rb +++ b/logstash-core/lib/logstash/plugins/hooks_registry.rb @@ -1,63 +1 @@ -# encoding: utf-8 -module LogStash module Plugins - # This calls allow logstash to expose the endpoints for listeners - class HooksRegistry - java_import "java.util.concurrent.ConcurrentHashMap" - java_import "java.util.concurrent.CopyOnWriteArrayList" - - def initialize - @registered_emitters = ConcurrentHashMap.new - @registered_hooks = ConcurrentHashMap.new - end - - def register_emitter(emitter_scope, dispatcher) - @registered_emitters.put(emitter_scope, dispatcher) - sync_hooks - end - - def remove_emitter(emitter_scope) - @registered_emitters.remove(emitter_scope) - end - - def register_hooks(emitter_scope, callback) - callbacks = @registered_hooks.computeIfAbsent(emitter_scope) do - CopyOnWriteArrayList.new - end - - callbacks.add(callback) - sync_hooks - end - - def emitters_count - @registered_emitters.size - end - - def hooks_count(emitter_scope = nil) - if emitter_scope.nil? - @registered_hooks.elements().collect(&:size).reduce(0, :+) - else - callbacks = @registered_hooks.get(emitter_scope) - callbacks.nil? ? 0 : @registered_hooks.get(emitter_scope).size - end - end - - def registered_hook?(emitter_scope, klass) - callbacks = @registered_hooks[emitter_scope] - return false if callbacks.nil? - callbacks.collect(&:class).include?(klass) - end - - private - def sync_hooks - @registered_emitters.each do |emitter, dispatcher| - listeners = @registered_hooks.get(emitter) - - unless listeners.nil? - listeners.each do |listener| - dispatcher.add_listener(listener) - end - end - end - end - end -end end +# encoding: utf-8 \ No newline at end of file diff --git a/logstash-core/lib/logstash/plugins/registry.rb b/logstash-core/lib/logstash/plugins/registry.rb index cd3e20ab6..0ae9ad550 100644 --- a/logstash-core/lib/logstash/plugins/registry.rb +++ b/logstash-core/lib/logstash/plugins/registry.rb @@ -1,7 +1,6 @@ # encoding: utf-8 require "rubygems/package" require "logstash/plugin" -require "logstash/plugins/hooks_registry" require "logstash/modules/scaffold" require "logstash/codecs/base" require "logstash/filters/base" diff --git a/logstash-core/spec/logstash/plugins/hooks_registry_spec.rb b/logstash-core/spec/logstash/plugins/hooks_registry_spec.rb index 72817f9cf..300516a6c 100644 --- a/logstash-core/spec/logstash/plugins/hooks_registry_spec.rb +++ b/logstash-core/spec/logstash/plugins/hooks_registry_spec.rb @@ -1,5 +1,4 @@ # encoding: utf-8 -require "logstash/plugins/hooks_registry" describe LogStash::Plugins::HooksRegistry do class DummyEmitter diff --git a/logstash-core/src/main/java/org/logstash/RubyUtil.java b/logstash-core/src/main/java/org/logstash/RubyUtil.java index 69463a7ba..f988997f0 100644 --- a/logstash-core/src/main/java/org/logstash/RubyUtil.java +++ b/logstash-core/src/main/java/org/logstash/RubyUtil.java @@ -38,6 +38,7 @@ import org.logstash.instrument.metrics.NullNamespacedMetricExt; import org.logstash.log.LoggableExt; import org.logstash.log.LoggerExt; import org.logstash.log.SlowLoggerExt; +import org.logstash.plugins.HooksRegistryExt; import org.logstash.plugins.PluginFactoryExt; import org.logstash.plugins.UniversalPluginExt; @@ -161,6 +162,8 @@ public final class RubyUtil { public static final RubyClass PIPELINE_REPORTER_SNAPSHOT_CLASS; + public static final RubyClass HOOKS_REGISTRY_CLASS; + /** * Logstash Ruby Module. */ @@ -435,6 +438,9 @@ public final class RubyUtil { PIPELINE_REPORTER_SNAPSHOT_CLASS.defineAnnotatedMethods( PipelineReporterExt.SnapshotExt.class ); + HOOKS_REGISTRY_CLASS = + PLUGINS_MODULE.defineClassUnder("HooksRegistry", RUBY.getObject(), HooksRegistryExt::new); + HOOKS_REGISTRY_CLASS.defineAnnotatedMethods(HooksRegistryExt.class); RUBY.getGlobalVariables().set("$LS_JARS_LOADED", RUBY.newString("true")); RubyJavaIntegration.setupRubyJavaIntegration(RUBY); } diff --git a/logstash-core/src/main/java/org/logstash/plugins/HooksRegistryExt.java b/logstash-core/src/main/java/org/logstash/plugins/HooksRegistryExt.java new file mode 100644 index 000000000..b90b48d71 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/plugins/HooksRegistryExt.java @@ -0,0 +1,94 @@ +package org.logstash.plugins; + +import org.jruby.Ruby; +import org.jruby.RubyClass; +import org.jruby.RubyFixnum; +import org.jruby.RubyObject; +import org.jruby.anno.JRubyClass; +import org.jruby.anno.JRubyMethod; +import org.jruby.runtime.ThreadContext; +import org.jruby.runtime.builtin.IRubyObject; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; + +@JRubyClass(name = "HooksRegistry") +public final class HooksRegistryExt extends RubyObject { + private ConcurrentHashMap registeredEmitters; + private ConcurrentHashMap> registeredHooks; + + public HooksRegistryExt(Ruby runtime, RubyClass metaClass) { + super(runtime, metaClass); + } + + @JRubyMethod + public IRubyObject initialize(final ThreadContext context) { + registeredEmitters = new ConcurrentHashMap<>(); + registeredHooks = new ConcurrentHashMap<>(); + return this; + } + + @JRubyMethod(name = "register_emitter") + public IRubyObject registerEmitter(final ThreadContext context, final IRubyObject emitterScope, final IRubyObject dispatcher) { + registeredEmitters.put(emitterScope, dispatcher); + return syncHooks(context); + } + + @JRubyMethod(name = "remove_emitter") + public IRubyObject removeEmitter(final ThreadContext context, final IRubyObject emitterScope) { + return registeredEmitters.remove(emitterScope); + } + + @JRubyMethod(name = "register_hooks") + public IRubyObject registerHooks(final ThreadContext context, final IRubyObject emitterScope, final IRubyObject callback) { + final List callbacks = + registeredHooks.computeIfAbsent(emitterScope, iRubyObject -> new CopyOnWriteArrayList<>()); + callbacks.add(callback); + return syncHooks(context); + } + + @JRubyMethod(name = "emitters_count") + public IRubyObject emittersCount(final ThreadContext context) { + return RubyFixnum.newFixnum(context.runtime, registeredEmitters.size()); + } + + @JRubyMethod(name = "hooks_count", optional = 1) + public IRubyObject hooksCount(final ThreadContext context, final IRubyObject[] args) { + final IRubyObject emitterScope = args.length > 0 ? args[0] : context.nil; + final int hooksCount; + if (emitterScope.isNil()) { + hooksCount = registeredHooks.values().stream().mapToInt(List::size).sum(); + } else { + final List callbacks = registeredHooks.get(emitterScope); + if (callbacks == null) { + hooksCount = 0; + } else { + hooksCount = registeredHooks.get(emitterScope).size(); + } + } + return RubyFixnum.newFixnum(context.runtime, hooksCount); + } + + @JRubyMethod(name = "registered_hook?") + public IRubyObject ifRegisteredHook(final ThreadContext context, final IRubyObject emitterScope, final IRubyObject klass) { + final List callbacks = registeredHooks.get(emitterScope); + if (callbacks == null) { + return context.fals; + } + return callbacks.stream().map(IRubyObject::getMetaClass).anyMatch(clazz -> clazz.eql(klass)) + ? context.tru : context.fals; + } + + private IRubyObject syncHooks(final ThreadContext context) { + registeredEmitters.forEach((emitter, dispatcher) -> { + final List listeners = registeredHooks.get(emitter); + if (listeners != null) { + for (final IRubyObject listener : listeners) { + dispatcher.callMethod(context, "add_listener", listener); + } + } + }); + return context.nil; + } +} diff --git a/x-pack/spec/config_management/extension_spec.rb b/x-pack/spec/config_management/extension_spec.rb index d3ce55856..fa0c40478 100644 --- a/x-pack/spec/config_management/extension_spec.rb +++ b/x-pack/spec/config_management/extension_spec.rb @@ -6,7 +6,6 @@ require "spec_helper" require "logstash/environment" require "logstash/settings" require "logstash/util/time_value" -require "logstash/plugins/hooks_registry" require "config_management/extension" require "config_management/hooks"