mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
avoid race condition when initializing events and pipelines metrics (#9950)
This commit is contained in:
parent
796eb3e4b3
commit
5a49102b89
6 changed files with 39 additions and 27 deletions
|
@ -106,16 +106,18 @@ public abstract class AbstractOutputDelegatorExt extends RubyObject {
|
|||
this.metric = metric;
|
||||
final ThreadContext context = RubyUtil.RUBY.getCurrentContext();
|
||||
this.id = RubyString.newString(context.runtime, id);
|
||||
namespacedMetric = metric.namespace(context, context.runtime.newSymbol(id));
|
||||
metricEvents = namespacedMetric.namespace(context, MetricKeys.EVENTS_KEY);
|
||||
namespacedMetric.gauge(
|
||||
context, MetricKeys.NAME_KEY, configName(context)
|
||||
);
|
||||
eventMetricOut = LongCounter.fromRubyBase(metricEvents, MetricKeys.OUT_KEY);
|
||||
eventMetricIn = LongCounter.fromRubyBase(metricEvents, MetricKeys.IN_KEY);
|
||||
eventMetricTime = LongCounter.fromRubyBase(
|
||||
metricEvents, MetricKeys.DURATION_IN_MILLIS_KEY
|
||||
);
|
||||
synchronized (metric) {
|
||||
namespacedMetric = metric.namespace(context, context.runtime.newSymbol(id));
|
||||
metricEvents = namespacedMetric.namespace(context, MetricKeys.EVENTS_KEY);
|
||||
namespacedMetric.gauge(
|
||||
context, MetricKeys.NAME_KEY, configName(context)
|
||||
);
|
||||
eventMetricOut = LongCounter.fromRubyBase(metricEvents, MetricKeys.OUT_KEY);
|
||||
eventMetricIn = LongCounter.fromRubyBase(metricEvents, MetricKeys.IN_KEY);
|
||||
eventMetricTime = LongCounter.fromRubyBase(
|
||||
metricEvents, MetricKeys.DURATION_IN_MILLIS_KEY
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract IRubyObject getConfigName(ThreadContext context);
|
||||
|
|
|
@ -53,15 +53,14 @@ public final class FilterDelegatorExt extends RubyObject {
|
|||
this.filter = filter;
|
||||
filterClass = filter.getSingletonClass().getRealClass();
|
||||
filterMethod = filterClass.searchMethod(FILTER_METHOD_NAME);
|
||||
final AbstractNamespacedMetricExt namespacedMetric =
|
||||
(AbstractNamespacedMetricExt) filter.callMethod(context, "metric");
|
||||
metricEvents = namespacedMetric.namespace(context, MetricKeys.EVENTS_KEY);
|
||||
eventMetricOut = LongCounter.fromRubyBase(metricEvents, MetricKeys.OUT_KEY);
|
||||
eventMetricIn = LongCounter.fromRubyBase(metricEvents, MetricKeys.IN_KEY);
|
||||
eventMetricTime = LongCounter.fromRubyBase(
|
||||
metricEvents, MetricKeys.DURATION_IN_MILLIS_KEY
|
||||
);
|
||||
namespacedMetric.gauge(context, MetricKeys.NAME_KEY, configName(context));
|
||||
final AbstractNamespacedMetricExt namespacedMetric = (AbstractNamespacedMetricExt) filter.callMethod(context, "metric");
|
||||
synchronized(namespacedMetric.getMetric()) {
|
||||
metricEvents = namespacedMetric.namespace(context, MetricKeys.EVENTS_KEY);
|
||||
eventMetricOut = LongCounter.fromRubyBase(metricEvents, MetricKeys.OUT_KEY);
|
||||
eventMetricIn = LongCounter.fromRubyBase(metricEvents, MetricKeys.IN_KEY);
|
||||
eventMetricTime = LongCounter.fromRubyBase(metricEvents, MetricKeys.DURATION_IN_MILLIS_KEY);
|
||||
namespacedMetric.gauge(context, MetricKeys.NAME_KEY, configName(context));
|
||||
}
|
||||
flushes = filter.respondsTo("flush");
|
||||
return this;
|
||||
}
|
||||
|
|
|
@ -52,19 +52,22 @@ public abstract class QueueReadClientBase extends RubyObject implements QueueRea
|
|||
@JRubyMethod(name = "set_events_metric")
|
||||
public IRubyObject setEventsMetric(final IRubyObject metric) {
|
||||
final AbstractNamespacedMetricExt namespacedMetric = (AbstractNamespacedMetricExt) metric;
|
||||
eventMetricOut = LongCounter.fromRubyBase(namespacedMetric, MetricKeys.OUT_KEY);
|
||||
eventMetricFiltered = LongCounter.fromRubyBase(namespacedMetric, MetricKeys.FILTERED_KEY);
|
||||
eventMetricTime = LongCounter.fromRubyBase(namespacedMetric, MetricKeys.DURATION_IN_MILLIS_KEY);
|
||||
synchronized(namespacedMetric.getMetric()) {
|
||||
eventMetricOut = LongCounter.fromRubyBase(namespacedMetric, MetricKeys.OUT_KEY);
|
||||
eventMetricFiltered = LongCounter.fromRubyBase(namespacedMetric, MetricKeys.FILTERED_KEY);
|
||||
eventMetricTime = LongCounter.fromRubyBase(namespacedMetric, MetricKeys.DURATION_IN_MILLIS_KEY);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
@JRubyMethod(name = "set_pipeline_metric")
|
||||
public IRubyObject setPipelineMetric(final IRubyObject metric) {
|
||||
final AbstractNamespacedMetricExt namespacedMetric = (AbstractNamespacedMetricExt) metric;
|
||||
pipelineMetricOut = LongCounter.fromRubyBase(namespacedMetric, MetricKeys.OUT_KEY);
|
||||
pipelineMetricFiltered = LongCounter.fromRubyBase(namespacedMetric, MetricKeys.FILTERED_KEY);
|
||||
pipelineMetricTime =
|
||||
LongCounter.fromRubyBase(namespacedMetric, MetricKeys.DURATION_IN_MILLIS_KEY);
|
||||
synchronized(namespacedMetric.getMetric()) {
|
||||
pipelineMetricOut = LongCounter.fromRubyBase(namespacedMetric, MetricKeys.OUT_KEY);
|
||||
pipelineMetricFiltered = LongCounter.fromRubyBase(namespacedMetric, MetricKeys.FILTERED_KEY);
|
||||
pipelineMetricTime = LongCounter.fromRubyBase(namespacedMetric, MetricKeys.DURATION_IN_MILLIS_KEY);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
|
@ -68,4 +68,6 @@ public abstract class AbstractNamespacedMetricExt extends AbstractMetricExt {
|
|||
protected abstract IRubyObject doIncrement(ThreadContext context, IRubyObject[] args);
|
||||
|
||||
protected abstract IRubyObject doDecrement(ThreadContext context, IRubyObject[] args);
|
||||
|
||||
public abstract AbstractMetricExt getMetric();
|
||||
}
|
||||
|
|
|
@ -103,4 +103,7 @@ public final class NamespacedMetricExt extends AbstractNamespacedMetricExt {
|
|||
name instanceof RubyArray ? name : RubyArray.newArray(context.runtime, name)
|
||||
));
|
||||
}
|
||||
|
||||
@Override
|
||||
public AbstractMetricExt getMetric() { return this.metric; }
|
||||
}
|
||||
|
|
|
@ -37,7 +37,7 @@ public final class NullNamespacedMetricExt extends AbstractNamespacedMetricExt {
|
|||
@JRubyMethod(optional = 2)
|
||||
public NullNamespacedMetricExt initialize(final ThreadContext context,
|
||||
final IRubyObject[] args) {
|
||||
this.metric = args.length > 0 && !args[0].isNil() ? (NullMetricExt) args[0] : null;
|
||||
this.metric = args.length > 0 && !args[0].isNil() ? (NullMetricExt) args[0] : new NullMetricExt(context.runtime, metaClass);
|
||||
final IRubyObject namespaceName = args.length == 2 ? args[1] : NULL;
|
||||
if (namespaceName instanceof RubyArray) {
|
||||
this.namespaceName = (RubyArray) namespaceName;
|
||||
|
@ -99,6 +99,9 @@ public final class NullNamespacedMetricExt extends AbstractNamespacedMetricExt {
|
|||
));
|
||||
}
|
||||
|
||||
@Override
|
||||
public AbstractMetricExt getMetric() { return this.metric; }
|
||||
|
||||
@JRubyClass(name = "NullCounter")
|
||||
public static final class NullCounter extends RubyObject {
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue