mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
Refactor: do not keep around JRuby context reference
there's no need for this and makes code base inconsistent
... also the original intent seems no longer relevant :
was introduced at 57e7a8a56b
> allows for a massive simplification for the invocation of filters and
outputs from the Java execution
Fixes #11587
This commit is contained in:
parent
e070af8e43
commit
8461a590a6
8 changed files with 15 additions and 23 deletions
|
@ -61,7 +61,7 @@ public final class ConvertedMap extends IdentityHashMap<String, Object> {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ConvertedMap newFromRubyHash(final RubyHash o) {
|
public static ConvertedMap newFromRubyHash(final RubyHash o) {
|
||||||
return newFromRubyHash(WorkerLoop.THREAD_CONTEXT.get(), o);
|
return newFromRubyHash(o.getRuntime().getCurrentContext(), o);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ConvertedMap newFromRubyHash(final ThreadContext context, final RubyHash o) {
|
public static ConvertedMap newFromRubyHash(final ThreadContext context, final RubyHash o) {
|
||||||
|
|
|
@ -11,7 +11,6 @@ import org.jruby.anno.JRubyMethod;
|
||||||
import org.jruby.runtime.ThreadContext;
|
import org.jruby.runtime.ThreadContext;
|
||||||
import org.jruby.runtime.builtin.IRubyObject;
|
import org.jruby.runtime.builtin.IRubyObject;
|
||||||
import org.logstash.RubyUtil;
|
import org.logstash.RubyUtil;
|
||||||
import org.logstash.execution.WorkerLoop;
|
|
||||||
import org.logstash.ext.JrubyEventExtLibrary;
|
import org.logstash.ext.JrubyEventExtLibrary;
|
||||||
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
|
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
|
||||||
import org.logstash.instrument.metrics.MetricKeys;
|
import org.logstash.instrument.metrics.MetricKeys;
|
||||||
|
@ -131,7 +130,7 @@ public abstract class AbstractFilterDelegatorExt extends RubyObject {
|
||||||
@SuppressWarnings("rawtypes")
|
@SuppressWarnings("rawtypes")
|
||||||
public RubyArray flush(final IRubyObject input) {
|
public RubyArray flush(final IRubyObject input) {
|
||||||
RubyHash options = (RubyHash) input;
|
RubyHash options = (RubyHash) input;
|
||||||
final ThreadContext context = WorkerLoop.THREAD_CONTEXT.get();
|
final ThreadContext context = options.getRuntime().getCurrentContext();
|
||||||
final IRubyObject newEvents = doFlush(context, options);
|
final IRubyObject newEvents = doFlush(context, options);
|
||||||
final RubyArray result;
|
final RubyArray result;
|
||||||
if (newEvents.isNil()) {
|
if (newEvents.isNil()) {
|
||||||
|
|
|
@ -5,6 +5,8 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
|
|
||||||
|
import org.jruby.Ruby;
|
||||||
import org.jruby.RubyString;
|
import org.jruby.RubyString;
|
||||||
import org.jruby.runtime.builtin.IRubyObject;
|
import org.jruby.runtime.builtin.IRubyObject;
|
||||||
import org.jruby.util.ByteList;
|
import org.jruby.util.ByteList;
|
||||||
|
@ -33,7 +35,6 @@ import org.logstash.config.ir.expression.binary.Or;
|
||||||
import org.logstash.config.ir.expression.binary.RegexEq;
|
import org.logstash.config.ir.expression.binary.RegexEq;
|
||||||
import org.logstash.config.ir.expression.unary.Not;
|
import org.logstash.config.ir.expression.unary.Not;
|
||||||
import org.logstash.config.ir.expression.unary.Truthy;
|
import org.logstash.config.ir.expression.unary.Truthy;
|
||||||
import org.logstash.execution.WorkerLoop;
|
|
||||||
import org.logstash.ext.JrubyEventExtLibrary;
|
import org.logstash.ext.JrubyEventExtLibrary;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -481,7 +482,7 @@ public interface EventCondition {
|
||||||
public boolean fulfilled(final JrubyEventExtLibrary.RubyEvent event) {
|
public boolean fulfilled(final JrubyEventExtLibrary.RubyEvent event) {
|
||||||
final Object tomatch = event.getEvent().getUnconvertedField(field);
|
final Object tomatch = event.getEvent().getUnconvertedField(field);
|
||||||
return tomatch instanceof RubyString &&
|
return tomatch instanceof RubyString &&
|
||||||
!((RubyString) tomatch).match(WorkerLoop.THREAD_CONTEXT.get(), regex).isNil();
|
!((RubyString) tomatch).match(RubyUtil.RUBY.getCurrentContext(), regex).isNil();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -490,10 +491,11 @@ public interface EventCondition {
|
||||||
private final boolean matches;
|
private final boolean matches;
|
||||||
|
|
||||||
private ConstantMatches(final Object constant, final String regex) {
|
private ConstantMatches(final Object constant, final String regex) {
|
||||||
|
final Ruby runtime = RubyUtil.RUBY;
|
||||||
this.matches = constant instanceof String &&
|
this.matches = constant instanceof String &&
|
||||||
!(RubyUtil.RUBY.newString((String) constant).match(
|
!(runtime.newString((String) constant).match(
|
||||||
WorkerLoop.THREAD_CONTEXT.get(),
|
runtime.getCurrentContext(),
|
||||||
RubyUtil.RUBY.newString(regex)).isNil());
|
runtime.newString(regex)).isNil());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -12,7 +12,6 @@ import org.jruby.anno.JRubyMethod;
|
||||||
import org.jruby.internal.runtime.methods.DynamicMethod;
|
import org.jruby.internal.runtime.methods.DynamicMethod;
|
||||||
import org.jruby.runtime.ThreadContext;
|
import org.jruby.runtime.ThreadContext;
|
||||||
import org.jruby.runtime.builtin.IRubyObject;
|
import org.jruby.runtime.builtin.IRubyObject;
|
||||||
import org.logstash.execution.WorkerLoop;
|
|
||||||
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
|
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
|
||||||
import org.logstash.instrument.metrics.counter.LongCounter;
|
import org.logstash.instrument.metrics.counter.LongCounter;
|
||||||
|
|
||||||
|
@ -106,7 +105,7 @@ public final class FilterDelegatorExt extends AbstractFilterDelegatorExt {
|
||||||
org.apache.logging.log4j.ThreadContext.put("plugin.id", pluginId.toString());
|
org.apache.logging.log4j.ThreadContext.put("plugin.id", pluginId.toString());
|
||||||
try {
|
try {
|
||||||
return (RubyArray) filterMethod.call(
|
return (RubyArray) filterMethod.call(
|
||||||
WorkerLoop.THREAD_CONTEXT.get(), filter, filterClass, FILTER_METHOD_NAME, batch);
|
getRuntime().getCurrentContext(), filter, filterClass, FILTER_METHOD_NAME, batch);
|
||||||
} finally {
|
} finally {
|
||||||
org.apache.logging.log4j.ThreadContext.remove("plugin.id");
|
org.apache.logging.log4j.ThreadContext.remove("plugin.id");
|
||||||
}
|
}
|
||||||
|
|
|
@ -76,9 +76,10 @@ public class JavaFilterDelegatorExt extends AbstractFilterDelegatorExt {
|
||||||
protected IRubyObject doFlush(final ThreadContext context, final RubyHash options) {
|
protected IRubyObject doFlush(final ThreadContext context, final RubyHash options) {
|
||||||
if (filter.requiresFlush()) {
|
if (filter.requiresFlush()) {
|
||||||
Collection<Event> outputEvents = filter.flush(filterMatchListener);
|
Collection<Event> outputEvents = filter.flush(filterMatchListener);
|
||||||
@SuppressWarnings("rawtypes") RubyArray newBatch = RubyArray.newArray(RubyUtil.RUBY, outputEvents.size());
|
final Ruby runtime = context.runtime;
|
||||||
|
@SuppressWarnings("rawtypes") RubyArray newBatch = RubyArray.newArray(runtime, outputEvents.size());
|
||||||
for (Event outputEvent : outputEvents) {
|
for (Event outputEvent : outputEvents) {
|
||||||
newBatch.add(JrubyEventExtLibrary.RubyEvent.newRubyEvent(RubyUtil.RUBY, (org.logstash.Event)outputEvent));
|
newBatch.add(JrubyEventExtLibrary.RubyEvent.newRubyEvent(runtime, (org.logstash.Event)outputEvent));
|
||||||
}
|
}
|
||||||
return newBatch;
|
return newBatch;
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,7 +12,6 @@ import org.jruby.runtime.Block;
|
||||||
import org.jruby.runtime.ThreadContext;
|
import org.jruby.runtime.ThreadContext;
|
||||||
import org.jruby.runtime.builtin.IRubyObject;
|
import org.jruby.runtime.builtin.IRubyObject;
|
||||||
import org.logstash.execution.ExecutionContextExt;
|
import org.logstash.execution.ExecutionContextExt;
|
||||||
import org.logstash.execution.WorkerLoop;
|
|
||||||
import org.logstash.ext.JrubyEventExtLibrary;
|
import org.logstash.ext.JrubyEventExtLibrary;
|
||||||
import org.logstash.instrument.metrics.AbstractMetricExt;
|
import org.logstash.instrument.metrics.AbstractMetricExt;
|
||||||
|
|
||||||
|
@ -78,7 +77,7 @@ OutputDelegatorExt extends AbstractOutputDelegatorExt {
|
||||||
try {
|
try {
|
||||||
final IRubyObject pluginId = this.getId();
|
final IRubyObject pluginId = this.getId();
|
||||||
org.apache.logging.log4j.ThreadContext.put("plugin.id", pluginId.toString());
|
org.apache.logging.log4j.ThreadContext.put("plugin.id", pluginId.toString());
|
||||||
strategy.multiReceive(WorkerLoop.THREAD_CONTEXT.get(), (IRubyObject) batch);
|
strategy.multiReceive(getRuntime().getCurrentContext(), (IRubyObject) batch);
|
||||||
} catch (final InterruptedException ex) {
|
} catch (final InterruptedException ex) {
|
||||||
throw new IllegalStateException(ex);
|
throw new IllegalStateException(ex);
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -12,14 +12,6 @@ import org.logstash.config.ir.compiler.Dataset;
|
||||||
|
|
||||||
public final class WorkerLoop implements Runnable {
|
public final class WorkerLoop implements Runnable {
|
||||||
|
|
||||||
/**
|
|
||||||
* Hard Reference to the Ruby {@link ThreadContext} for this thread. It is ok to keep
|
|
||||||
* a hard reference instead of Ruby's weak references here since we can expect worker threads
|
|
||||||
* to be runnable most of the time.
|
|
||||||
*/
|
|
||||||
public static final ThreadLocal<ThreadContext> THREAD_CONTEXT =
|
|
||||||
ThreadLocal.withInitial(RubyUtil.RUBY::getCurrentContext);
|
|
||||||
|
|
||||||
private static final Logger LOGGER = LogManager.getLogger(WorkerLoop.class);
|
private static final Logger LOGGER = LogManager.getLogger(WorkerLoop.class);
|
||||||
|
|
||||||
private final Dataset execution;
|
private final Dataset execution;
|
||||||
|
|
|
@ -23,7 +23,7 @@ public final class JrubyMemoryReadClientExtTest {
|
||||||
new ArrayBlockingQueue<>(10);
|
new ArrayBlockingQueue<>(10);
|
||||||
final JrubyMemoryReadClientExt client =
|
final JrubyMemoryReadClientExt client =
|
||||||
JrubyMemoryReadClientExt.create(queue, 5, 50);
|
JrubyMemoryReadClientExt.create(queue, 5, 50);
|
||||||
final ThreadContext context = WorkerLoop.THREAD_CONTEXT.get();
|
final ThreadContext context = client.getRuntime().getCurrentContext();
|
||||||
final QueueBatch batch = client.readBatch();
|
final QueueBatch batch = client.readBatch();
|
||||||
final RubyHash inflight = client.rubyGetInflightBatches(context);
|
final RubyHash inflight = client.rubyGetInflightBatches(context);
|
||||||
assertThat(inflight.size(), is(1));
|
assertThat(inflight.size(), is(1));
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue