Port Logger and SlowLogger to Java

Fixes #9520
This commit is contained in:
Dan Hermann 2018-04-30 06:27:20 -05:00
parent 09382d232a
commit 5b576f96d4
10 changed files with 289 additions and 163 deletions

View file

@ -1,6 +1,7 @@
# encoding: utf-8
#
java_import org.apache.logging.log4j.core.LoggerContext
java_import java.lang.IllegalArgumentException
module LogStash
module Api
@ -30,6 +31,9 @@ module LogStash
raise ArgumentError, I18n.t("logstash.web_api.logging.unrecognized_option", :option => remaining.keys.first)
end
respond_with({"acknowledged" => true})
rescue IllegalArgumentException => e
status 400
respond_with({"error" => e.message})
rescue ArgumentError => e
status 400
respond_with({"error" => e.message})

View file

@ -1,2 +1 @@
# encoding: utf-8
require "logstash/logging/logger"

View file

@ -1,158 +1 @@
require "uri"
module LogStash
module Logging
java_import org.apache.logging.log4j.Level
java_import org.apache.logging.log4j.LogManager
java_import org.apache.logging.log4j.core.config.Configurator
java_import org.apache.logging.log4j.core.config.DefaultConfiguration
java_import org.apache.logging.log4j.core.config.LoggerConfig
java_import org.logstash.log.LogstashLoggerContextFactory
java_import org.apache.logging.log4j.core.LoggerContext
java_import java.net.URI
class Logger
@@config_mutex = Mutex.new
def initialize(name)
@logger = LogManager.getLogger(name)
end
def debug?
@logger.is_debug_enabled
end
def info?
@logger.is_info_enabled
end
def error?
@logger.is_error_enabled
end
def warn?
@logger.is_warn_enabled
end
def fatal?
@logger.is_fatal_enabled
end
def trace?
@logger.is_trace_enabled
end
def debug(message, data = {})
@logger.debug(message, data)
end
def warn(message, data = {})
@logger.warn(message, data)
end
def info(message, data = {})
@logger.info(message, data)
end
def error(message, data = {})
@logger.error(message, data)
end
def fatal(message, data = {})
@logger.fatal(message, data)
end
def trace(message, data = {})
@logger.trace(message, data)
end
def self.configure_logging(level, path = LogManager::ROOT_LOGGER_NAME)
@@config_mutex.synchronize { set_level(level, path) }
rescue Exception => e
raise ArgumentError, "invalid level[#{level}] for logger[#{path}]"
end
def self.reconfigure(config_location)
@@config_mutex.synchronize do
config_location_uri = URI.create(config_location)
file_path = config_location_uri.path
if ::File.exists?(file_path)
logs_location = java.lang.System.getProperty("ls.logs")
puts "Sending Logstash's logs to #{logs_location} which is now configured via log4j2.properties"
#reconfigure the default context to use our log4j2.properties file
get_logging_context.setConfigLocation(URI.create(config_location))
#ensure everyone agrees which context to use for the LogManager
context_factory = LogstashLoggerContextFactory.new(get_logging_context)
LogManager.setFactory(context_factory)
else
# fall back to default config
puts "Could not find log4j2 configuration at path #{file_path}. Using default config which logs errors to the console"
end
end
end
# until dev_utils/rspec/spec_helper is changed, we need to have both methods
singleton_class.send(:alias_method, :initialize, :reconfigure)
def self.get_logging_context
return LoggerContext.getContext(false)
end
# Clone of org.apache.logging.log4j.core.config.Configurator.setLevel(), but ensure the proper context is used
def self.set_level(_level, path)
configuration = get_logging_context.getConfiguration()
level = Level.valueOf(_level)
if path.nil? || path.strip.empty?
root_logger = configuration.getRootLogger()
if root_logger.getLevel() != level
root_logger.setLevel(level)
get_logging_context.updateLoggers()
end
else
package_logger = configuration.getLoggerConfig(path)
if package_logger.name != path #no package logger found
configuration.addLogger(path, LoggerConfig.new(path, level, true))
get_logging_context.updateLoggers()
elsif package_logger.getLevel() != level
package_logger.setLevel(level)
get_logging_context.updateLoggers()
end
end
end
private_class_method :set_level
end
class SlowLogger
def initialize(name, warn_threshold, info_threshold, debug_threshold, trace_threshold)
slowlog_name = ["slowlog", name].join('.')
@slowlogger = LogManager.getLogger(slowlog_name)
@warn_threshold = warn_threshold
@info_threshold = info_threshold
@debug_threshold = debug_threshold
@trace_threshold = trace_threshold
end
def as_data(plugin_params, event, took_in_nanos)
{
:plugin_params => plugin_params,
:took_in_nanos => took_in_nanos,
:took_in_millis => took_in_nanos / 1000000,
:event => event.to_json
}
end
def on_event(message, plugin_params, event, took_in_nanos)
if @warn_threshold >= 0 and took_in_nanos > @warn_threshold
@slowlogger.warn(message, as_data(plugin_params, event, took_in_nanos))
elsif @info_threshold >= 0 and took_in_nanos > @info_threshold
@slowlogger.info(message, as_data(plugin_params, event, took_in_nanos))
elsif @debug_threshold >= 0 and took_in_nanos > @debug_threshold
@slowlogger.debug(message, as_data(plugin_params, event, took_in_nanos))
elsif @trace_threshold >= 0 and took_in_nanos > @trace_threshold
@slowlogger.trace(message, as_data(plugin_params, event, took_in_nanos))
end
end
end
end
end
# Keeping this file for backwards compatibility with plugins that include it directly.

View file

@ -33,6 +33,8 @@ import org.logstash.instrument.metrics.MetricExt;
import org.logstash.instrument.metrics.NamespacedMetricExt;
import org.logstash.instrument.metrics.NullMetricExt;
import org.logstash.instrument.metrics.NullNamespacedMetricExt;
import org.logstash.log.LoggerExt;
import org.logstash.log.SlowLoggerExt;
import org.logstash.plugins.PluginFactoryExt;
/**
@ -135,6 +137,10 @@ public final class RubyUtil {
public static final RubyClass PLUGIN_METRIC_FACTORY_CLASS;
public static final RubyClass LOGGER;
public static final RubyClass SLOW_LOGGER;
/**
* Logstash Ruby Module.
*/
@ -323,6 +329,14 @@ public final class RubyUtil {
FILTER_DELEGATOR_CLASS = setupLogstashClass(
FilterDelegatorExt::new, FilterDelegatorExt.class
);
final RubyModule loggingModule = LOGSTASH_MODULE.defineOrGetModuleUnder("Logging");
LOGGER = loggingModule.defineClassUnder("Logger", RUBY.getObject(), LoggerExt::new);
LOGGER.defineAnnotatedMethods(LoggerExt.class);
SLOW_LOGGER = loggingModule.defineClassUnder(
"SlowLogger", RUBY.getObject(), SlowLoggerExt::new);
SLOW_LOGGER.defineAnnotatedMethods(SlowLoggerExt.class);
final RubyModule json = LOGSTASH_MODULE.defineOrGetModuleUnder("Json");
final RubyClass stdErr = RUBY.getStandardError();
LOGSTASH_ERROR = LOGSTASH_MODULE.defineClassUnder(

View file

@ -0,0 +1,196 @@
package org.logstash.log;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.jruby.Ruby;
import org.jruby.RubyBoolean;
import org.jruby.RubyClass;
import org.jruby.RubyObject;
import org.jruby.RubyString;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.javasupport.JavaUtil;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import java.io.File;
import java.net.URI;
@JRubyClass(name = "Logger")
public class LoggerExt extends RubyObject {
private static final Object CONFIG_LOCK = new Object();
private Logger logger;
public LoggerExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
@JRubyMethod
public IRubyObject initialize(final ThreadContext context, final IRubyObject loggerName) {
logger = LogManager.getLogger(loggerName.asJavaString());
return this;
}
@JRubyMethod(name = "debug?")
public RubyBoolean isDebug(final ThreadContext context) {
return logger.isDebugEnabled() ? context.tru : context.fals;
}
@JRubyMethod(name = "error?")
public RubyBoolean isError(final ThreadContext context) {
return logger.isErrorEnabled() ? context.tru : context.fals;
}
@JRubyMethod(name = "warn?")
public RubyBoolean isWarn(final ThreadContext context) {
return logger.isWarnEnabled() ? context.tru : context.fals;
}
@JRubyMethod(name = "fatal?")
public RubyBoolean isFatal(final ThreadContext context) {
return logger.isDebugEnabled() ? context.tru : context.fals;
}
@JRubyMethod(name = "trace?")
public RubyBoolean isTrace(final ThreadContext context) {
return logger.isDebugEnabled() ? context.tru : context.fals;
}
@JRubyMethod(name = "debug", required = 1, optional = 1)
public IRubyObject rubyDebug(final ThreadContext context, final IRubyObject[] args) {
if (args.length > 1) {
logger.debug(args[0].asJavaString(), args[1]);
} else {
logger.debug(args[0].asJavaString());
}
return this;
}
@JRubyMethod(name = "warn", required = 1, optional = 1)
public IRubyObject rubyWarn(final ThreadContext context, final IRubyObject[] args) {
if (args.length > 1) {
logger.warn(args[0].asJavaString(), args[1]);
} else {
logger.warn(args[0].asJavaString());
}
return this;
}
@JRubyMethod(name = "info", required = 1, optional = 1)
public IRubyObject rubyInfo(final ThreadContext context, final IRubyObject[] args) {
if (args.length > 1) {
logger.info(args[0].asJavaString(), args[1]);
} else {
logger.info(args[0].asJavaString());
}
return this;
}
@JRubyMethod(name = "error", required = 1, optional = 1)
public IRubyObject rubyError(final ThreadContext context, final IRubyObject[] args) {
if (args.length > 1) {
logger.error(args[0].asJavaString(), args[1]);
} else {
logger.error(args[0].asJavaString());
}
return this;
}
@JRubyMethod(name = "fatal", required = 1, optional = 1)
public IRubyObject rubyFatal(final ThreadContext context, final IRubyObject[] args) {
if (args.length > 1) {
logger.fatal(args[0].asJavaString(), args[1]);
} else {
logger.fatal(args[0].asJavaString());
}
return this;
}
@JRubyMethod(name = "trace", required = 1, optional = 1)
public IRubyObject rubyTrace(final ThreadContext context, final IRubyObject[] args) {
if (args.length > 1) {
logger.trace(args[0].asJavaString(), args[1]);
} else {
logger.trace(args[0].asJavaString());
}
return this;
}
@JRubyMethod(name = "configure_logging", meta = true, required = 1, optional = 1)
public static IRubyObject configureLogging(final ThreadContext context, final IRubyObject self,
final IRubyObject args[]) {
synchronized (CONFIG_LOCK) {
RubyString path = args.length > 1 ? (RubyString) args[1] : null;
String level = args[0].asJavaString();
try {
setLevel(level, (path == null || path.isNil()) ? null : path.asJavaString());
} catch (Exception e) {
throw new IllegalArgumentException(
String.format("invalid level[%s] for logger[%s]", level, path));
}
}
return context.nil;
}
@JRubyMethod(meta = true)
public static IRubyObject reconfigure(final ThreadContext context, final IRubyObject self,
final IRubyObject configPath) {
synchronized (CONFIG_LOCK) {
URI configLocation = URI.create(configPath.asJavaString());
String filePath = configLocation.getPath();
File configFile = new File(filePath);
if (configFile.exists()) {
String logsLocation = System.getProperty("ls.logs");
System.out.println(String.format(
"Sending Logstash logs to %s which is now configured via log4j2.properties",
logsLocation));
LoggerContext loggerContext = LoggerContext.getContext(false);
loggerContext.setConfigLocation(configLocation);
LogManager.setFactory(new LogstashLoggerContextFactory(loggerContext));
} else {
System.out.println(String.format(
"Could not find log4j2 configuration at path %s. Using default config " +
"which logs errors to the console",
filePath));
}
}
return context.nil;
}
@JRubyMethod(name = "get_logging_context", meta = true)
public static IRubyObject getLoggingContext(final ThreadContext context,
final IRubyObject self) {
return JavaUtil.convertJavaToUsableRubyObject(
context.runtime, LoggerContext.getContext(false));
}
private static void setLevel(String level, String loggerPath) {
LoggerContext loggerContext = LoggerContext.getContext(false);
Configuration config = loggerContext.getConfiguration();
Level logLevel = Level.valueOf(level);
if (loggerPath == null || loggerPath.equals("")) {
LoggerConfig rootLogger = config.getRootLogger();
if (rootLogger.getLevel() != logLevel) {
rootLogger.setLevel(logLevel);
loggerContext.updateLoggers();
}
} else {
LoggerConfig packageLogger = config.getLoggerConfig(loggerPath);
if (!packageLogger.getName().equals(loggerPath)) {
config.addLogger(loggerPath, new LoggerConfig(loggerPath, logLevel, true));
loggerContext.updateLoggers();
} else if (packageLogger.getLevel() != logLevel) {
packageLogger.setLevel(logLevel);
loggerContext.updateLoggers();
}
}
}
}

View file

@ -0,0 +1,74 @@
package org.logstash.log;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jruby.Ruby;
import org.jruby.RubyClass;
import org.jruby.RubyHash;
import org.jruby.RubyNumeric;
import org.jruby.RubyObject;
import org.jruby.RubySymbol;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
@JRubyClass(name = "SlowLogger")
public class SlowLoggerExt extends RubyObject {
private static final RubySymbol PLUGIN_PARAMS = RubyUtil.RUBY.newSymbol("plugin_params");
private static final RubySymbol TOOK_IN_NANOS = RubyUtil.RUBY.newSymbol("took_in_nanos");
private static final RubySymbol TOOK_IN_MILLIS = RubyUtil.RUBY.newSymbol("took_in_millis");
private static final RubySymbol EVENT = RubyUtil.RUBY.newSymbol("event");
private static final RubyNumeric NANO_TO_MILLI = RubyUtil.RUBY.newFixnum(1000000);
private Logger slowLogger;
private long warnThreshold;
private long infoThreshold;
private long debugThreshold;
private long traceThreshold;
public SlowLoggerExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
@JRubyMethod(required = 5)
public IRubyObject initialize(final ThreadContext context, final IRubyObject[] args) {
String loggerName = args[0].asJavaString();
slowLogger = LogManager.getLogger("slowlog." + loggerName);
warnThreshold = ((RubyNumeric) args[1]).getLongValue();
infoThreshold = ((RubyNumeric) args[2]).getLongValue();
debugThreshold = ((RubyNumeric) args[3]).getLongValue();
traceThreshold = ((RubyNumeric) args[4]).getLongValue();
return this;
}
private RubyHash asData(final ThreadContext context, final IRubyObject pluginParams,
final IRubyObject event, final IRubyObject durationNanos) {
RubyHash data = RubyHash.newHash(context.runtime);
data.put(PLUGIN_PARAMS, pluginParams);
data.put(TOOK_IN_NANOS, durationNanos);
data.put(TOOK_IN_MILLIS, ((RubyNumeric)durationNanos).div(context, NANO_TO_MILLI));
data.put(EVENT, event.callMethod(context, "to_json"));
return data;
}
@JRubyMethod(name = "on_event", required = 4)
public IRubyObject onEvent(final ThreadContext context, final IRubyObject[] args) {
String message = args[0].asJavaString();
long eventDurationNanos = ((RubyNumeric)args[3]).getLongValue();
if (warnThreshold >= 0 && eventDurationNanos > warnThreshold) {
slowLogger.warn(message, asData(context, args[1], args[2], args[3]));
} else if (infoThreshold >= 0 && eventDurationNanos > infoThreshold) {
slowLogger.info(message, asData(context, args[1], args[2], args[3]));
} else if (debugThreshold >= 0 && eventDurationNanos > debugThreshold) {
slowLogger.debug(message, asData(context, args[1], args[2], args[3]));
} else if (traceThreshold >= 0 && eventDurationNanos > traceThreshold) {
slowLogger.trace(message, asData(context, args[1], args[2], args[3]));
}
return context.nil;
}
}

View file

@ -3,7 +3,6 @@
# you may not use this file except in compliance with the Elastic License.
require "logstash/bootstrap_check/default_config"
require "logstash/logging/logger"
java_import java.util.concurrent.TimeUnit

View file

@ -5,7 +5,6 @@
require "logstash/config/pipeline_config"
require "logstash/config/source/base"
require "logstash/config/source_loader"
require "logstash/logging/logger"
require "logstash/outputs/elasticsearch"
require "logstash/json"
require 'helpers/elasticsearch_options'

View file

@ -4,7 +4,6 @@
require "logstash/environment"
require "logstash/universal_plugin"
require "logstash/logging/logger"
require "logstash/runner"
require "config_management/hooks"
require "config_management/elasticsearch_source"

View file

@ -3,7 +3,6 @@
# you may not use this file except in compliance with the Elastic License.
require "logstash/runner"
require "logstash/logging/logger"
require "config_management/bootstrap_check"
require "config_management/elasticsearch_source"
require "logstash/config/source_loader"