mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
parent
8810794e0b
commit
487afbc2d1
11 changed files with 60 additions and 102 deletions
|
@ -2,7 +2,6 @@
|
|||
|
||||
require "logstash/namespace"
|
||||
require "logstash/json"
|
||||
require "jruby_timestamp_ext"
|
||||
require "logstash/timestamp"
|
||||
|
||||
# Force loading the RubyUtil to ensure its loaded before the Event class is set up in Ruby since
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
# encoding: utf-8
|
||||
|
||||
require "jruby_acked_queue_ext"
|
||||
require "jruby_acked_batch_ext"
|
||||
# Force loading the RubyUtil to ensure its loaded before the WrappedAckedQueue class is set up in
|
||||
# Ruby since WrappedAckedQueue depends on Ruby classes that are dynamically set up by Java code.
|
||||
java_import org.logstash.RubyUtil
|
||||
|
||||
require "concurrent"
|
||||
# This is an adapted copy of the wrapped_synchronous_queue file
|
||||
# ideally this should be moved to Java/JRuby
|
||||
|
|
|
@ -1,11 +0,0 @@
|
|||
import org.jruby.Ruby;
|
||||
import org.jruby.runtime.load.BasicLibraryService;
|
||||
import org.logstash.ackedqueue.ext.JrubyAckedBatchExtLibrary;
|
||||
|
||||
public class JrubyAckedBatchExtService implements BasicLibraryService {
|
||||
@Override
|
||||
public boolean basicLoad(final Ruby runtime) {
|
||||
new JrubyAckedBatchExtLibrary().load(runtime, false);
|
||||
return true;
|
||||
}
|
||||
}
|
|
@ -1,13 +0,0 @@
|
|||
import org.jruby.Ruby;
|
||||
import org.jruby.runtime.load.BasicLibraryService;
|
||||
import org.logstash.ackedqueue.ext.JrubyAckedQueueExtLibrary;
|
||||
import org.logstash.ackedqueue.ext.JrubyAckedQueueMemoryExtLibrary;
|
||||
|
||||
public final class JrubyAckedQueueExtService implements BasicLibraryService {
|
||||
@Override
|
||||
public boolean basicLoad(final Ruby runtime) {
|
||||
new JrubyAckedQueueExtLibrary().load(runtime, false);
|
||||
new JrubyAckedQueueMemoryExtLibrary().load(runtime, false);
|
||||
return true;
|
||||
}
|
||||
}
|
|
@ -1,11 +0,0 @@
|
|||
import org.jruby.Ruby;
|
||||
import org.jruby.runtime.load.BasicLibraryService;
|
||||
import org.logstash.ext.JrubyTimestampExtLibrary;
|
||||
|
||||
public final class JrubyTimestampExtService implements BasicLibraryService {
|
||||
@Override
|
||||
public boolean basicLoad(final Ruby runtime) {
|
||||
new JrubyTimestampExtLibrary().load(runtime, false);
|
||||
return true;
|
||||
}
|
||||
}
|
|
@ -7,7 +7,13 @@ import org.jruby.RubyException;
|
|||
import org.jruby.RubyModule;
|
||||
import org.jruby.anno.JRubyClass;
|
||||
import org.jruby.exceptions.RaiseException;
|
||||
import org.jruby.runtime.ObjectAllocator;
|
||||
import org.jruby.runtime.builtin.IRubyObject;
|
||||
import org.logstash.ackedqueue.ext.JrubyAckedBatchExtLibrary;
|
||||
import org.logstash.ackedqueue.ext.JrubyAckedQueueExtLibrary;
|
||||
import org.logstash.ackedqueue.ext.JrubyAckedQueueMemoryExtLibrary;
|
||||
import org.logstash.ext.JrubyEventExtLibrary;
|
||||
import org.logstash.ext.JrubyTimestampExtLibrary;
|
||||
|
||||
/**
|
||||
* Utilities around interaction with the {@link Ruby} runtime.
|
||||
|
@ -26,6 +32,10 @@ public final class RubyUtil {
|
|||
|
||||
public static final RubyClass RUBY_EVENT_CLASS;
|
||||
|
||||
public static final RubyClass RUBY_ACKED_BATCH_CLASS;
|
||||
|
||||
public static final RubyClass RUBY_TIMESTAMP_CLASS;
|
||||
|
||||
public static final RubyClass PARSER_ERROR;
|
||||
|
||||
public static final RubyClass GENERATOR_ERROR;
|
||||
|
@ -35,8 +45,15 @@ public final class RubyUtil {
|
|||
static {
|
||||
RUBY = Ruby.getGlobalRuntime();
|
||||
LOGSTASH_MODULE = RUBY.getOrCreateModule("LogStash");
|
||||
RUBY_EVENT_CLASS = RUBY.defineClassUnder(
|
||||
"Event", RUBY.getObject(), JrubyEventExtLibrary.RubyEvent::new, LOGSTASH_MODULE
|
||||
RUBY_TIMESTAMP_CLASS = setupLogstashClass("Timestamp", new ObjectAllocator() {
|
||||
@Override
|
||||
public JrubyTimestampExtLibrary.RubyTimestamp allocate(final Ruby runtime,
|
||||
final RubyClass rubyClass) {
|
||||
return new JrubyTimestampExtLibrary.RubyTimestamp(runtime, rubyClass);
|
||||
}
|
||||
}, JrubyTimestampExtLibrary.RubyTimestamp.class);
|
||||
RUBY_EVENT_CLASS = setupLogstashClass(
|
||||
"Event", JrubyEventExtLibrary.RubyEvent::new, JrubyEventExtLibrary.RubyEvent.class
|
||||
);
|
||||
final RubyModule json = LOGSTASH_MODULE.defineOrGetModuleUnder("Json");
|
||||
LOGSTASH_ERROR = LOGSTASH_MODULE.defineClassUnder(
|
||||
|
@ -63,6 +80,20 @@ public final class RubyUtil {
|
|||
RUBY_EVENT_CLASS.setConstant("VERSION_ONE", RUBY.newString(Event.VERSION_ONE));
|
||||
RUBY_EVENT_CLASS.defineAnnotatedMethods(JrubyEventExtLibrary.RubyEvent.class);
|
||||
RUBY_EVENT_CLASS.defineAnnotatedConstants(JrubyEventExtLibrary.RubyEvent.class);
|
||||
RUBY_ACKED_BATCH_CLASS = setupLogstashClass("AckedBatch", new ObjectAllocator() {
|
||||
@Override
|
||||
public IRubyObject allocate(final Ruby runtime, final RubyClass rubyClass) {
|
||||
return new JrubyAckedBatchExtLibrary.RubyAckedBatch(runtime, rubyClass);
|
||||
}
|
||||
}, JrubyAckedBatchExtLibrary.RubyAckedBatch.class);
|
||||
setupLogstashClass(
|
||||
"AckedQueue", JrubyAckedQueueExtLibrary.RubyAckedQueue::new,
|
||||
JrubyAckedQueueExtLibrary.RubyAckedQueue.class
|
||||
);
|
||||
setupLogstashClass(
|
||||
"AckedMemoryQueue", JrubyAckedQueueMemoryExtLibrary.RubyAckedMemoryQueue::new,
|
||||
JrubyAckedQueueMemoryExtLibrary.RubyAckedMemoryQueue.class
|
||||
);
|
||||
}
|
||||
|
||||
private RubyUtil() {
|
||||
|
@ -80,6 +111,22 @@ public final class RubyUtil {
|
|||
return new RaiseException(e, new NativeException(runtime, runtime.getIOError(), e));
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets up a Java-defined {@link RubyClass} in the Logstash Ruby module.
|
||||
* @param name Name of the class
|
||||
* @param allocator Allocator of the class
|
||||
* @param jclass Underlying Java class that is annotated by {@link JRubyClass}
|
||||
* @return RubyClass
|
||||
*/
|
||||
private static RubyClass setupLogstashClass(final String name,
|
||||
final ObjectAllocator allocator, final Class<?> jclass) {
|
||||
final RubyClass clazz = RUBY.defineClassUnder(
|
||||
name, RUBY.getObject(), allocator, LOGSTASH_MODULE
|
||||
);
|
||||
clazz.defineAnnotatedMethods(jclass);
|
||||
return clazz;
|
||||
}
|
||||
|
||||
@JRubyClass(name = "Error")
|
||||
public static final class LogstashRubyError extends RubyException {
|
||||
|
||||
|
|
|
@ -9,10 +9,8 @@ import org.jruby.RubyClass;
|
|||
import org.jruby.RubyObject;
|
||||
import org.jruby.anno.JRubyClass;
|
||||
import org.jruby.anno.JRubyMethod;
|
||||
import org.jruby.runtime.ObjectAllocator;
|
||||
import org.jruby.runtime.ThreadContext;
|
||||
import org.jruby.runtime.builtin.IRubyObject;
|
||||
import org.jruby.runtime.load.Library;
|
||||
import org.logstash.Event;
|
||||
import org.logstash.RubyUtil;
|
||||
import org.logstash.ackedqueue.Batch;
|
||||
|
@ -20,18 +18,7 @@ import org.logstash.ackedqueue.Queueable;
|
|||
import org.logstash.ackedqueue.io.LongVector;
|
||||
import org.logstash.ext.JrubyEventExtLibrary;
|
||||
|
||||
public final class JrubyAckedBatchExtLibrary implements Library {
|
||||
|
||||
@Override
|
||||
public void load(Ruby runtime, boolean wrap) {
|
||||
RubyClass clazz = runtime.defineClassUnder("AckedBatch", runtime.getObject(), new ObjectAllocator() {
|
||||
public IRubyObject allocate(Ruby runtime, RubyClass rubyClass) {
|
||||
return new RubyAckedBatch(runtime, rubyClass);
|
||||
}
|
||||
}, RubyUtil.LOGSTASH_MODULE);
|
||||
|
||||
clazz.defineAnnotatedMethods(RubyAckedBatch.class);
|
||||
}
|
||||
public final class JrubyAckedBatchExtLibrary {
|
||||
|
||||
@JRubyClass(name = "AckedBatch")
|
||||
public static final class RubyAckedBatch extends RubyObject {
|
||||
|
@ -40,11 +27,10 @@ public final class JrubyAckedBatchExtLibrary implements Library {
|
|||
|
||||
public RubyAckedBatch(Ruby runtime, RubyClass klass) {
|
||||
super(runtime, klass);
|
||||
this.batch = null;
|
||||
}
|
||||
|
||||
public RubyAckedBatch(Ruby runtime, Batch batch) {
|
||||
super(runtime, RubyUtil.LOGSTASH_MODULE.getClass("AckedBatch"));
|
||||
super(runtime, RubyUtil.RUBY_ACKED_BATCH_CLASS);
|
||||
this.batch = batch;
|
||||
}
|
||||
|
||||
|
|
|
@ -11,7 +11,6 @@ import org.jruby.anno.JRubyMethod;
|
|||
import org.jruby.runtime.Arity;
|
||||
import org.jruby.runtime.ThreadContext;
|
||||
import org.jruby.runtime.builtin.IRubyObject;
|
||||
import org.jruby.runtime.load.Library;
|
||||
import org.logstash.Event;
|
||||
import org.logstash.RubyUtil;
|
||||
import org.logstash.ackedqueue.Batch;
|
||||
|
@ -21,15 +20,7 @@ import org.logstash.ackedqueue.io.FileCheckpointIO;
|
|||
import org.logstash.ackedqueue.io.MmapPageIO;
|
||||
import org.logstash.ext.JrubyEventExtLibrary;
|
||||
|
||||
public final class JrubyAckedQueueExtLibrary implements Library {
|
||||
|
||||
@Override
|
||||
public void load(Ruby runtime, boolean wrap) {
|
||||
runtime.defineClassUnder(
|
||||
"AckedQueue", runtime.getObject(), JrubyAckedQueueExtLibrary.RubyAckedQueue::new,
|
||||
RubyUtil.LOGSTASH_MODULE
|
||||
).defineAnnotatedMethods(JrubyAckedQueueExtLibrary.RubyAckedQueue.class);
|
||||
}
|
||||
public final class JrubyAckedQueueExtLibrary {
|
||||
|
||||
// TODO:
|
||||
// as a simplified first prototyping implementation, the Settings class is not exposed and the queue elements
|
||||
|
|
|
@ -11,7 +11,6 @@ import org.jruby.anno.JRubyMethod;
|
|||
import org.jruby.runtime.Arity;
|
||||
import org.jruby.runtime.ThreadContext;
|
||||
import org.jruby.runtime.builtin.IRubyObject;
|
||||
import org.jruby.runtime.load.Library;
|
||||
import org.logstash.Event;
|
||||
import org.logstash.RubyUtil;
|
||||
import org.logstash.ackedqueue.Batch;
|
||||
|
@ -21,16 +20,7 @@ import org.logstash.ackedqueue.io.ByteBufferPageIO;
|
|||
import org.logstash.ackedqueue.io.MemoryCheckpointIO;
|
||||
import org.logstash.ext.JrubyEventExtLibrary;
|
||||
|
||||
public final class JrubyAckedQueueMemoryExtLibrary implements Library {
|
||||
|
||||
@Override
|
||||
public void load(Ruby runtime, boolean wrap) {
|
||||
runtime.defineClassUnder(
|
||||
"AckedMemoryQueue", runtime.getObject(),
|
||||
JrubyAckedQueueMemoryExtLibrary.RubyAckedMemoryQueue::new,
|
||||
RubyUtil.LOGSTASH_MODULE
|
||||
).defineAnnotatedMethods(JrubyAckedQueueMemoryExtLibrary.RubyAckedMemoryQueue.class);
|
||||
}
|
||||
public final class JrubyAckedQueueMemoryExtLibrary {
|
||||
|
||||
// TODO:
|
||||
// as a simplified first prototyping implementation, the Settings class is not exposed and the queue elements
|
||||
|
|
|
@ -13,35 +13,13 @@ import org.jruby.anno.JRubyMethod;
|
|||
import org.jruby.exceptions.RaiseException;
|
||||
import org.jruby.javasupport.JavaUtil;
|
||||
import org.jruby.runtime.Arity;
|
||||
import org.jruby.runtime.ObjectAllocator;
|
||||
import org.jruby.runtime.ThreadContext;
|
||||
import org.jruby.runtime.builtin.IRubyObject;
|
||||
import org.jruby.runtime.load.Library;
|
||||
import org.logstash.ObjectMappers;
|
||||
import org.logstash.RubyUtil;
|
||||
import org.logstash.Timestamp;
|
||||
|
||||
public final class JrubyTimestampExtLibrary implements Library {
|
||||
|
||||
private static final ObjectAllocator ALLOCATOR = new ObjectAllocator() {
|
||||
public RubyTimestamp allocate(Ruby runtime, RubyClass rubyClass) {
|
||||
return new RubyTimestamp(runtime, rubyClass);
|
||||
}
|
||||
};
|
||||
|
||||
private static final RubyClass TIMESTAMP_CLASS = createTimestamp(RubyUtil.RUBY);
|
||||
|
||||
@Override
|
||||
public void load(Ruby runtime, boolean wrap) {
|
||||
createTimestamp(runtime);
|
||||
}
|
||||
|
||||
public static RubyClass createTimestamp(Ruby runtime) {
|
||||
RubyClass clazz =
|
||||
runtime.defineClassUnder("Timestamp", runtime.getObject(), ALLOCATOR, RubyUtil.LOGSTASH_MODULE);
|
||||
clazz.defineAnnotatedMethods(RubyTimestamp.class);
|
||||
return clazz;
|
||||
}
|
||||
public final class JrubyTimestampExtLibrary {
|
||||
|
||||
@JRubyClass(name = "Timestamp")
|
||||
@JsonSerialize(using = ObjectMappers.RubyTimestampSerializer.class)
|
||||
|
@ -61,7 +39,7 @@ public final class JrubyTimestampExtLibrary implements Library {
|
|||
}
|
||||
|
||||
public RubyTimestamp(Ruby runtime, Timestamp timestamp) {
|
||||
this(runtime, TIMESTAMP_CLASS, timestamp);
|
||||
this(runtime, RubyUtil.RUBY_TIMESTAMP_CLASS, timestamp);
|
||||
}
|
||||
|
||||
public RubyTimestamp(Ruby runtime) {
|
||||
|
|
|
@ -394,7 +394,7 @@ public final class EventTest {
|
|||
final Event event = new Event();
|
||||
final Timestamp timestamp = new Timestamp();
|
||||
event.setField("timestamp", new ConcreteJavaProxy(RubyUtil.RUBY,
|
||||
JrubyTimestampExtLibrary.createTimestamp(RubyUtil.RUBY).getRealClass(), timestamp
|
||||
RubyUtil.RUBY_TIMESTAMP_CLASS, timestamp
|
||||
));
|
||||
assertThat(event.getField("timestamp"), is(timestamp));
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue