mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
parent
208e480d97
commit
1cc5358892
8 changed files with 222 additions and 107 deletions
|
@ -7,7 +7,6 @@ require "logstash/pipeline"
|
|||
require "logstash/webserver"
|
||||
require "logstash/config/source_loader"
|
||||
require "logstash/pipeline_action"
|
||||
require "logstash/converge_result"
|
||||
require "logstash/state_resolver"
|
||||
require "stud/trap"
|
||||
require "uri"
|
||||
|
|
|
@ -1,102 +0,0 @@
|
|||
# encoding: utf-8
|
||||
|
||||
module LogStash
|
||||
# This class allow us to keep track and uniform all the return values from the
|
||||
# action task
|
||||
class ConvergeResult
|
||||
class ActionResult
|
||||
attr_reader :executed_at
|
||||
|
||||
def initialize
|
||||
@executed_at = LogStash::Timestamp.now
|
||||
end
|
||||
|
||||
# Until all the action have more granularity in the validation
|
||||
# or execution we make the ConvergeResult works with primitives and exceptions
|
||||
def self.create(action, action_result)
|
||||
if action_result.is_a?(ActionResult)
|
||||
action_result
|
||||
elsif action_result.is_a?(Exception)
|
||||
FailedAction.from_exception(action_result)
|
||||
elsif action_result == true
|
||||
SuccessfulAction.new
|
||||
elsif action_result == false
|
||||
FailedAction.from_action(action, action_result)
|
||||
else
|
||||
raise LogStash::Error, "Don't know how to handle `#{action_result.class}` for `#{action}`"
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
class FailedAction < ActionResult
|
||||
attr_reader :message, :backtrace
|
||||
|
||||
def initialize(message, backtrace = nil)
|
||||
super()
|
||||
|
||||
@message = message
|
||||
@backtrace = backtrace
|
||||
end
|
||||
|
||||
def self.from_exception(exception)
|
||||
FailedAction.new(exception.message, exception.backtrace)
|
||||
end
|
||||
|
||||
def self.from_action(action, action_result)
|
||||
FailedAction.new("Could not execute action: #{action}, action_result: #{action_result}")
|
||||
end
|
||||
|
||||
def successful?
|
||||
false
|
||||
end
|
||||
end
|
||||
|
||||
class SuccessfulAction < ActionResult
|
||||
def successful?
|
||||
true
|
||||
end
|
||||
end
|
||||
|
||||
def initialize(expected_actions_count)
|
||||
@expected_actions_count = expected_actions_count
|
||||
@actions = java.util.concurrent.ConcurrentHashMap.new
|
||||
end
|
||||
|
||||
def add(action, action_result)
|
||||
@actions[action] = ActionResult.create(action, action_result)
|
||||
end
|
||||
|
||||
def failed_actions
|
||||
filter_by_successful_state(false)
|
||||
end
|
||||
|
||||
def successful_actions
|
||||
filter_by_successful_state(true)
|
||||
end
|
||||
|
||||
def complete?
|
||||
total == @expected_actions_count
|
||||
end
|
||||
|
||||
def success?
|
||||
failed_actions.empty? && complete?
|
||||
end
|
||||
|
||||
def fails_count
|
||||
failed_actions.size
|
||||
end
|
||||
|
||||
def success_count
|
||||
successful_actions.size
|
||||
end
|
||||
|
||||
def total
|
||||
@actions.size
|
||||
end
|
||||
|
||||
private
|
||||
def filter_by_successful_state(predicate)
|
||||
@actions.select { |action, action_result| action_result.successful? == predicate }
|
||||
end
|
||||
end
|
||||
end
|
|
@ -2,7 +2,6 @@
|
|||
require "logstash/pipeline_action/base"
|
||||
require "logstash/pipeline"
|
||||
require "logstash/java_pipeline"
|
||||
require "logstash/converge_result"
|
||||
|
||||
module LogStash module PipelineAction
|
||||
class Create < Base
|
||||
|
|
|
@ -2,7 +2,6 @@
|
|||
require "logstash/pipeline_action/base"
|
||||
require "logstash/pipeline_action/create"
|
||||
require "logstash/pipeline_action/stop"
|
||||
require "logstash/converge_result"
|
||||
|
||||
module LogStash module PipelineAction
|
||||
class Reload < Base
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
# encoding: utf-8
|
||||
require "logstash/pipeline_action/base"
|
||||
require "logstash/converge_result"
|
||||
|
||||
module LogStash module PipelineAction
|
||||
class Stop < Base
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
# encoding: utf-8
|
||||
require "logstash/converge_result"
|
||||
require "logstash/pipeline_action/stop"
|
||||
require "spec_helper"
|
||||
|
||||
|
|
|
@ -16,6 +16,7 @@ import org.logstash.common.BufferedTokenizerExt;
|
|||
import org.logstash.config.ir.compiler.FilterDelegatorExt;
|
||||
import org.logstash.config.ir.compiler.OutputDelegatorExt;
|
||||
import org.logstash.config.ir.compiler.OutputStrategyExt;
|
||||
import org.logstash.execution.*;
|
||||
import org.logstash.execution.AbstractWrappedQueueExt;
|
||||
import org.logstash.execution.EventDispatcherExt;
|
||||
import org.logstash.execution.ExecutionContextExt;
|
||||
|
@ -171,6 +172,14 @@ public final class RubyUtil {
|
|||
|
||||
public static final RubyClass SHUTDOWN_WATCHER_CLASS;
|
||||
|
||||
public static final RubyClass CONVERGE_RESULT_CLASS;
|
||||
|
||||
public static final RubyClass ACTION_RESULT_CLASS;
|
||||
|
||||
public static final RubyClass FAILED_ACTION_CLASS;
|
||||
|
||||
public static final RubyClass SUCCESSFUL_ACTION_CLASS;
|
||||
|
||||
public static final RubyClass PIPELINE_REPORTER_SNAPSHOT_CLASS;
|
||||
|
||||
public static final RubyClass QUEUE_FACTORY_CLASS;
|
||||
|
@ -469,6 +478,19 @@ public final class RubyUtil {
|
|||
PIPELINE_REPORTER_SNAPSHOT_CLASS.defineAnnotatedMethods(
|
||||
PipelineReporterExt.SnapshotExt.class
|
||||
);
|
||||
CONVERGE_RESULT_CLASS = setupLogstashClass(ConvergeResultExt::new, ConvergeResultExt.class);
|
||||
ACTION_RESULT_CLASS = CONVERGE_RESULT_CLASS.defineClassUnder(
|
||||
"ActionResult", RUBY.getObject(), ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR
|
||||
);
|
||||
ACTION_RESULT_CLASS.defineAnnotatedMethods(ConvergeResultExt.ActionResultExt.class);
|
||||
SUCCESSFUL_ACTION_CLASS = CONVERGE_RESULT_CLASS.defineClassUnder(
|
||||
"SuccessfulAction", ACTION_RESULT_CLASS, ConvergeResultExt.SuccessfulActionExt::new
|
||||
);
|
||||
SUCCESSFUL_ACTION_CLASS.defineAnnotatedMethods(ConvergeResultExt.SuccessfulActionExt.class);
|
||||
FAILED_ACTION_CLASS = CONVERGE_RESULT_CLASS.defineClassUnder(
|
||||
"FailedAction", ACTION_RESULT_CLASS, ConvergeResultExt.FailedActionExt::new
|
||||
);
|
||||
FAILED_ACTION_CLASS.defineAnnotatedMethods(ConvergeResultExt.FailedActionExt.class);
|
||||
HOOKS_REGISTRY_CLASS =
|
||||
PLUGINS_MODULE.defineClassUnder("HooksRegistry", RUBY.getObject(), HooksRegistryExt::new);
|
||||
HOOKS_REGISTRY_CLASS.defineAnnotatedMethods(HooksRegistryExt.class);
|
||||
|
|
|
@ -0,0 +1,200 @@
|
|||
package org.logstash.execution;
|
||||
|
||||
import org.jruby.*;
|
||||
import org.jruby.anno.JRubyClass;
|
||||
import org.jruby.anno.JRubyMethod;
|
||||
import org.jruby.javasupport.JavaUtil;
|
||||
import org.jruby.runtime.ThreadContext;
|
||||
import org.jruby.runtime.builtin.IRubyObject;
|
||||
import org.logstash.RubyUtil;
|
||||
import org.logstash.ext.JrubyTimestampExtLibrary;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
@JRubyClass(name = "ConvergeResult")
|
||||
public class ConvergeResultExt extends RubyObject {
|
||||
private IRubyObject expectedActionsCount;
|
||||
private ConcurrentHashMap<IRubyObject, ActionResultExt> actions;
|
||||
|
||||
public ConvergeResultExt(Ruby runtime, RubyClass metaClass) {
|
||||
super(runtime, metaClass);
|
||||
}
|
||||
|
||||
@JRubyMethod
|
||||
public IRubyObject initialize(final ThreadContext context, final IRubyObject expectedActionsCount) {
|
||||
this.expectedActionsCount = expectedActionsCount;
|
||||
this.actions = new ConcurrentHashMap<>();
|
||||
return this;
|
||||
}
|
||||
|
||||
@JRubyMethod
|
||||
public IRubyObject add(final ThreadContext context, final IRubyObject action, final IRubyObject actionResult) {
|
||||
return this.actions.put(action, ActionResultExt.create(context, null, action, actionResult));
|
||||
}
|
||||
|
||||
@JRubyMethod(name = "failed_actions")
|
||||
public IRubyObject failedActions(final ThreadContext context) {
|
||||
return JavaUtil.convertJavaToUsableRubyObject(context.runtime, filterBySuccessfulState(context, context.fals));
|
||||
}
|
||||
|
||||
@JRubyMethod(name = "successful_actions")
|
||||
public IRubyObject successfulActions(final ThreadContext context) {
|
||||
return JavaUtil.convertJavaToUsableRubyObject(context.runtime, filterBySuccessfulState(context, context.tru));
|
||||
}
|
||||
|
||||
@JRubyMethod(name = "complete?")
|
||||
public IRubyObject isComplete(final ThreadContext context) {
|
||||
return total(context).eql(expectedActionsCount) ? context.tru : context.fals;
|
||||
}
|
||||
|
||||
@JRubyMethod
|
||||
public IRubyObject total(final ThreadContext context) {
|
||||
return RubyUtil.RUBY.newFixnum(actions.size());
|
||||
}
|
||||
|
||||
@JRubyMethod(name = "success?")
|
||||
public IRubyObject isSuccess(final ThreadContext context) {
|
||||
return filterBySuccessfulState(context, context.fals).isEmpty() && isComplete(context).isTrue()
|
||||
? context.tru : context.fals;
|
||||
}
|
||||
|
||||
@JRubyMethod(name = "fails_count")
|
||||
public IRubyObject failsCount(final ThreadContext context) {
|
||||
return failedActions(context).callMethod(context, "size");
|
||||
}
|
||||
|
||||
@JRubyMethod(name = "success_count")
|
||||
public IRubyObject successCount(final ThreadContext context) {
|
||||
return successfulActions(context).callMethod(context, "size");
|
||||
}
|
||||
|
||||
private Map<IRubyObject, ActionResultExt> filterBySuccessfulState(
|
||||
final ThreadContext context, final IRubyObject predicate) {
|
||||
final Map<IRubyObject, ActionResultExt> result = new HashMap<>();
|
||||
actions.entrySet().stream().filter(el -> el.getValue().isSuccessful(context).eql(predicate))
|
||||
.forEach(entry -> result.put(entry.getKey(), entry.getValue()));
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
@JRubyClass(name = "ActionResult")
|
||||
public static abstract class ActionResultExt extends RubyBasicObject {
|
||||
private IRubyObject executedAt;
|
||||
|
||||
protected ActionResultExt(Ruby runtime, RubyClass metaClass) {
|
||||
super(runtime, metaClass);
|
||||
}
|
||||
|
||||
@JRubyMethod(meta = true)
|
||||
public static ActionResultExt create(final ThreadContext context, final IRubyObject recv,
|
||||
final IRubyObject action, final IRubyObject actionResult) {
|
||||
final ActionResultExt result;
|
||||
if (actionResult instanceof ActionResultExt) {
|
||||
result = (ActionResultExt) actionResult;
|
||||
} else if (actionResult.getMetaClass().isKindOfModule(context.runtime.getException())) {
|
||||
result = FailedActionExt.fromException(context, null, actionResult);
|
||||
} else if (actionResult.eql(context.tru)) {
|
||||
result = new SuccessfulActionExt(context.runtime, RubyUtil.SUCCESSFUL_ACTION_CLASS).initialize(context);
|
||||
} else if (actionResult.eql(context.fals)) {
|
||||
result = FailedActionExt.fromAction(context, RubyUtil.FAILED_ACTION_CLASS, action, actionResult);
|
||||
} else {
|
||||
throw context.runtime.newRaiseException(
|
||||
RubyUtil.LOGSTASH_ERROR,
|
||||
String.format("Don't know how to handle `%s` for `%s`", actionResult.getMetaClass(), action)
|
||||
);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@JRubyMethod
|
||||
public IRubyObject initialize(final ThreadContext context) {
|
||||
executedAt = JrubyTimestampExtLibrary.RubyTimestamp.ruby_now(context, null);
|
||||
return this;
|
||||
}
|
||||
|
||||
@JRubyMethod(name = "executed_at")
|
||||
public final IRubyObject getExecutedAt() {
|
||||
return executedAt;
|
||||
}
|
||||
|
||||
@JRubyMethod(name = "successful?")
|
||||
public final IRubyObject isSuccessful(final ThreadContext context) {
|
||||
return getSuccessFul() ? context.tru : context.fals;
|
||||
}
|
||||
|
||||
protected abstract boolean getSuccessFul();
|
||||
}
|
||||
|
||||
@JRubyClass(name = "FailedAction")
|
||||
public static final class FailedActionExt extends ActionResultExt {
|
||||
private IRubyObject message;
|
||||
private IRubyObject backtrace;
|
||||
|
||||
public FailedActionExt(Ruby runtime, RubyClass metaClass) {
|
||||
super(runtime, metaClass);
|
||||
}
|
||||
|
||||
@JRubyMethod(optional = 1)
|
||||
public FailedActionExt initialize(final ThreadContext context, final IRubyObject[] args) {
|
||||
super.initialize(context);
|
||||
message = args[0];
|
||||
backtrace = args.length > 1 ? args[1] : context.nil;
|
||||
return this;
|
||||
}
|
||||
|
||||
@JRubyMethod(name = "message")
|
||||
public IRubyObject getMessage() {
|
||||
return message;
|
||||
}
|
||||
|
||||
@JRubyMethod(name = "backtrace")
|
||||
public IRubyObject getBacktrace() {
|
||||
return backtrace;
|
||||
}
|
||||
|
||||
@JRubyMethod(name = "from_exception", meta = true)
|
||||
public static ActionResultExt fromException(final ThreadContext context, final IRubyObject recv,
|
||||
final IRubyObject exception) {
|
||||
final IRubyObject[] args = new IRubyObject[]{
|
||||
exception.callMethod(context, "message"), exception.callMethod(context, "backtrace")
|
||||
};
|
||||
return new FailedActionExt(context.runtime, RubyUtil.FAILED_ACTION_CLASS).initialize(context, args);
|
||||
}
|
||||
|
||||
@JRubyMethod(name = "from_action", meta = true)
|
||||
public static ActionResultExt fromAction(final ThreadContext context, final IRubyObject recv,
|
||||
final IRubyObject action, final IRubyObject actionResult) {
|
||||
final IRubyObject[] args = new IRubyObject[]{
|
||||
RubyUtil.RUBY.newString(
|
||||
String.format("Could not execute action: %s, action_result: %s", action, actionResult)
|
||||
),
|
||||
};
|
||||
return new FailedActionExt(context.runtime, RubyUtil.FAILED_ACTION_CLASS).initialize(context, args);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean getSuccessFul() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@JRubyClass(name = "SuccessfulAction")
|
||||
public static final class SuccessfulActionExt extends ActionResultExt {
|
||||
public SuccessfulActionExt(Ruby runtime, RubyClass metaClass) {
|
||||
super(runtime, metaClass);
|
||||
}
|
||||
|
||||
@JRubyMethod
|
||||
public SuccessfulActionExt initialize(final ThreadContext context) {
|
||||
super.initialize(context);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean getSuccessFul() {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue