mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
parent
75684020e5
commit
7d6acf1799
6 changed files with 101 additions and 66 deletions
|
@ -1,63 +1 @@
|
|||
# encoding: utf-8
|
||||
module LogStash module Plugins
|
||||
# This calls allow logstash to expose the endpoints for listeners
|
||||
class HooksRegistry
|
||||
java_import "java.util.concurrent.ConcurrentHashMap"
|
||||
java_import "java.util.concurrent.CopyOnWriteArrayList"
|
||||
|
||||
def initialize
|
||||
@registered_emitters = ConcurrentHashMap.new
|
||||
@registered_hooks = ConcurrentHashMap.new
|
||||
end
|
||||
|
||||
def register_emitter(emitter_scope, dispatcher)
|
||||
@registered_emitters.put(emitter_scope, dispatcher)
|
||||
sync_hooks
|
||||
end
|
||||
|
||||
def remove_emitter(emitter_scope)
|
||||
@registered_emitters.remove(emitter_scope)
|
||||
end
|
||||
|
||||
def register_hooks(emitter_scope, callback)
|
||||
callbacks = @registered_hooks.computeIfAbsent(emitter_scope) do
|
||||
CopyOnWriteArrayList.new
|
||||
end
|
||||
|
||||
callbacks.add(callback)
|
||||
sync_hooks
|
||||
end
|
||||
|
||||
def emitters_count
|
||||
@registered_emitters.size
|
||||
end
|
||||
|
||||
def hooks_count(emitter_scope = nil)
|
||||
if emitter_scope.nil?
|
||||
@registered_hooks.elements().collect(&:size).reduce(0, :+)
|
||||
else
|
||||
callbacks = @registered_hooks.get(emitter_scope)
|
||||
callbacks.nil? ? 0 : @registered_hooks.get(emitter_scope).size
|
||||
end
|
||||
end
|
||||
|
||||
def registered_hook?(emitter_scope, klass)
|
||||
callbacks = @registered_hooks[emitter_scope]
|
||||
return false if callbacks.nil?
|
||||
callbacks.collect(&:class).include?(klass)
|
||||
end
|
||||
|
||||
private
|
||||
def sync_hooks
|
||||
@registered_emitters.each do |emitter, dispatcher|
|
||||
listeners = @registered_hooks.get(emitter)
|
||||
|
||||
unless listeners.nil?
|
||||
listeners.each do |listener|
|
||||
dispatcher.add_listener(listener)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end end
|
||||
# encoding: utf-8
|
|
@ -1,7 +1,6 @@
|
|||
# encoding: utf-8
|
||||
require "rubygems/package"
|
||||
require "logstash/plugin"
|
||||
require "logstash/plugins/hooks_registry"
|
||||
require "logstash/modules/scaffold"
|
||||
require "logstash/codecs/base"
|
||||
require "logstash/filters/base"
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
# encoding: utf-8
|
||||
require "logstash/plugins/hooks_registry"
|
||||
|
||||
describe LogStash::Plugins::HooksRegistry do
|
||||
class DummyEmitter
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.logstash.instrument.metrics.NullNamespacedMetricExt;
|
|||
import org.logstash.log.LoggableExt;
|
||||
import org.logstash.log.LoggerExt;
|
||||
import org.logstash.log.SlowLoggerExt;
|
||||
import org.logstash.plugins.HooksRegistryExt;
|
||||
import org.logstash.plugins.PluginFactoryExt;
|
||||
import org.logstash.plugins.UniversalPluginExt;
|
||||
|
||||
|
@ -161,6 +162,8 @@ public final class RubyUtil {
|
|||
|
||||
public static final RubyClass PIPELINE_REPORTER_SNAPSHOT_CLASS;
|
||||
|
||||
public static final RubyClass HOOKS_REGISTRY_CLASS;
|
||||
|
||||
/**
|
||||
* Logstash Ruby Module.
|
||||
*/
|
||||
|
@ -435,6 +438,9 @@ public final class RubyUtil {
|
|||
PIPELINE_REPORTER_SNAPSHOT_CLASS.defineAnnotatedMethods(
|
||||
PipelineReporterExt.SnapshotExt.class
|
||||
);
|
||||
HOOKS_REGISTRY_CLASS =
|
||||
PLUGINS_MODULE.defineClassUnder("HooksRegistry", RUBY.getObject(), HooksRegistryExt::new);
|
||||
HOOKS_REGISTRY_CLASS.defineAnnotatedMethods(HooksRegistryExt.class);
|
||||
RUBY.getGlobalVariables().set("$LS_JARS_LOADED", RUBY.newString("true"));
|
||||
RubyJavaIntegration.setupRubyJavaIntegration(RUBY);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,94 @@
|
|||
package org.logstash.plugins;
|
||||
|
||||
import org.jruby.Ruby;
|
||||
import org.jruby.RubyClass;
|
||||
import org.jruby.RubyFixnum;
|
||||
import org.jruby.RubyObject;
|
||||
import org.jruby.anno.JRubyClass;
|
||||
import org.jruby.anno.JRubyMethod;
|
||||
import org.jruby.runtime.ThreadContext;
|
||||
import org.jruby.runtime.builtin.IRubyObject;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
@JRubyClass(name = "HooksRegistry")
|
||||
public final class HooksRegistryExt extends RubyObject {
|
||||
private ConcurrentHashMap<IRubyObject, IRubyObject> registeredEmitters;
|
||||
private ConcurrentHashMap<IRubyObject, List<IRubyObject>> registeredHooks;
|
||||
|
||||
public HooksRegistryExt(Ruby runtime, RubyClass metaClass) {
|
||||
super(runtime, metaClass);
|
||||
}
|
||||
|
||||
@JRubyMethod
|
||||
public IRubyObject initialize(final ThreadContext context) {
|
||||
registeredEmitters = new ConcurrentHashMap<>();
|
||||
registeredHooks = new ConcurrentHashMap<>();
|
||||
return this;
|
||||
}
|
||||
|
||||
@JRubyMethod(name = "register_emitter")
|
||||
public IRubyObject registerEmitter(final ThreadContext context, final IRubyObject emitterScope, final IRubyObject dispatcher) {
|
||||
registeredEmitters.put(emitterScope, dispatcher);
|
||||
return syncHooks(context);
|
||||
}
|
||||
|
||||
@JRubyMethod(name = "remove_emitter")
|
||||
public IRubyObject removeEmitter(final ThreadContext context, final IRubyObject emitterScope) {
|
||||
return registeredEmitters.remove(emitterScope);
|
||||
}
|
||||
|
||||
@JRubyMethod(name = "register_hooks")
|
||||
public IRubyObject registerHooks(final ThreadContext context, final IRubyObject emitterScope, final IRubyObject callback) {
|
||||
final List<IRubyObject> callbacks =
|
||||
registeredHooks.computeIfAbsent(emitterScope, iRubyObject -> new CopyOnWriteArrayList<>());
|
||||
callbacks.add(callback);
|
||||
return syncHooks(context);
|
||||
}
|
||||
|
||||
@JRubyMethod(name = "emitters_count")
|
||||
public IRubyObject emittersCount(final ThreadContext context) {
|
||||
return RubyFixnum.newFixnum(context.runtime, registeredEmitters.size());
|
||||
}
|
||||
|
||||
@JRubyMethod(name = "hooks_count", optional = 1)
|
||||
public IRubyObject hooksCount(final ThreadContext context, final IRubyObject[] args) {
|
||||
final IRubyObject emitterScope = args.length > 0 ? args[0] : context.nil;
|
||||
final int hooksCount;
|
||||
if (emitterScope.isNil()) {
|
||||
hooksCount = registeredHooks.values().stream().mapToInt(List::size).sum();
|
||||
} else {
|
||||
final List<IRubyObject> callbacks = registeredHooks.get(emitterScope);
|
||||
if (callbacks == null) {
|
||||
hooksCount = 0;
|
||||
} else {
|
||||
hooksCount = registeredHooks.get(emitterScope).size();
|
||||
}
|
||||
}
|
||||
return RubyFixnum.newFixnum(context.runtime, hooksCount);
|
||||
}
|
||||
|
||||
@JRubyMethod(name = "registered_hook?")
|
||||
public IRubyObject ifRegisteredHook(final ThreadContext context, final IRubyObject emitterScope, final IRubyObject klass) {
|
||||
final List<IRubyObject> callbacks = registeredHooks.get(emitterScope);
|
||||
if (callbacks == null) {
|
||||
return context.fals;
|
||||
}
|
||||
return callbacks.stream().map(IRubyObject::getMetaClass).anyMatch(clazz -> clazz.eql(klass))
|
||||
? context.tru : context.fals;
|
||||
}
|
||||
|
||||
private IRubyObject syncHooks(final ThreadContext context) {
|
||||
registeredEmitters.forEach((emitter, dispatcher) -> {
|
||||
final List<IRubyObject> listeners = registeredHooks.get(emitter);
|
||||
if (listeners != null) {
|
||||
for (final IRubyObject listener : listeners) {
|
||||
dispatcher.callMethod(context, "add_listener", listener);
|
||||
}
|
||||
}
|
||||
});
|
||||
return context.nil;
|
||||
}
|
||||
}
|
|
@ -6,7 +6,6 @@ require "spec_helper"
|
|||
require "logstash/environment"
|
||||
require "logstash/settings"
|
||||
require "logstash/util/time_value"
|
||||
require "logstash/plugins/hooks_registry"
|
||||
require "config_management/extension"
|
||||
require "config_management/hooks"
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue