mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
refactor jar loading
This commit is contained in:
parent
0ed8cf13c9
commit
6271cbf8fb
12 changed files with 114 additions and 48 deletions
|
@ -1,5 +1,6 @@
|
|||
# encoding: utf-8
|
||||
require "clamp" # gem 'clamp'
|
||||
require "logstash/environment"
|
||||
require "logstash/errors"
|
||||
require "i18n"
|
||||
|
||||
|
@ -16,7 +17,7 @@ class LogStash::Agent < Clamp::Command
|
|||
I18n.t("logstash.agent.flag.filterworkers"),
|
||||
:attribute_name => :filter_workers, :default => 1, &:to_i
|
||||
|
||||
option "--watchdog-timeout", "SECONDS",
|
||||
option "--watchdog-timeout", "SECONDS",
|
||||
I18n.t("logstash.agent.flag.watchdog-timeout"),
|
||||
:default => 10, &:to_f
|
||||
|
||||
|
@ -25,7 +26,7 @@ class LogStash::Agent < Clamp::Command
|
|||
:attribute_name => :log_file
|
||||
|
||||
# Old support for the '-v' flag'
|
||||
option "-v", :flag,
|
||||
option "-v", :flag,
|
||||
I18n.t("logstash.agent.flag.verbosity"),
|
||||
:attribute_name => :verbosity, :multivalued => true
|
||||
|
||||
|
@ -164,7 +165,7 @@ class LogStash::Agent < Clamp::Command
|
|||
end
|
||||
|
||||
if [:debug].include?(verbosity?) || debug?
|
||||
show_gems
|
||||
show_gems
|
||||
end
|
||||
end
|
||||
end # def show_version
|
||||
|
@ -179,14 +180,7 @@ class LogStash::Agent < Clamp::Command
|
|||
end # def show_version_ruby
|
||||
|
||||
def show_version_elasticsearch
|
||||
# Not running in the,jar? assume elasticsearch jars are
|
||||
# in ../../vendor/jar/...
|
||||
if __FILE__ !~ /^(?:jar:)?file:/
|
||||
jarpath = File.join(File.dirname(__FILE__), "../../vendor/jar/elasticsearch*/lib/*.jar")
|
||||
Dir.glob(jarpath).each do |jar|
|
||||
require jar
|
||||
end
|
||||
end
|
||||
LogStash::Environment.load_elasticsearch_jars!
|
||||
|
||||
$stdout.write("Elasticsearch: ");
|
||||
org.elasticsearch.Version::main([])
|
||||
|
|
33
lib/logstash/environment.rb
Normal file
33
lib/logstash/environment.rb
Normal file
|
@ -0,0 +1,33 @@
|
|||
require "logstash/errors"
|
||||
|
||||
module LogStash
|
||||
module Environment
|
||||
extend self
|
||||
|
||||
LOGSTASH_HOME = ::File.expand_path(::File.join(::File.dirname(__FILE__), "/../.."))
|
||||
JAR_DIR = ::File.join(LOGSTASH_HOME, "/vendor/jar")
|
||||
|
||||
def load_elasticsearch_jars!
|
||||
assess_jruby!
|
||||
|
||||
require "java"
|
||||
jars_path = ::File.join(JAR_DIR, "/elasticsearch*/lib/*.jar")
|
||||
jar_files = Dir.glob(jars_path)
|
||||
|
||||
raise(LogStash::EnvironmentError, "Could not find Elasticsearh jar files under #{JAR_DIR}") if jar_files.empty?
|
||||
|
||||
jar_files.each do |jar|
|
||||
loaded = require jar
|
||||
puts("Loaded #{jar}") if $DEBUG && loaded
|
||||
end
|
||||
end
|
||||
|
||||
def assess_jruby!(exception_class = nil, message = nil)
|
||||
raise(exception_class || LogStash::EnvironmentError, message || "JRuby is required") unless jruby?
|
||||
end
|
||||
|
||||
def jruby?
|
||||
RUBY_PLATFORM == "java"
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,6 +1,7 @@
|
|||
# encoding: utf-8
|
||||
module LogStash
|
||||
class Error < ::StandardError; end
|
||||
class EnvironmentError < Error; end
|
||||
class ConfigurationError < Error; end
|
||||
class PluginLoadingError < Error; end
|
||||
class ShutdownSignal < StandardError; end
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
# encoding: utf-8
|
||||
require "logstash/inputs/base"
|
||||
require "logstash/errors"
|
||||
require "logstash/environment"
|
||||
require "logstash/namespace"
|
||||
require "logstash/util/socket_peer"
|
||||
require "socket"
|
||||
|
@ -40,13 +42,14 @@ class LogStash::Inputs::Log4j < LogStash::Inputs::Base
|
|||
|
||||
public
|
||||
def register
|
||||
LogStash::Environment.load_elasticsearch_jars!
|
||||
require "java"
|
||||
require "jruby/serialization"
|
||||
|
||||
if __FILE__ !~ /^(jar:)?file:\/\//
|
||||
if File.exists?("vendor/jar/elasticsearch-0.90.3/lib/log4j-1.2.17.jar")
|
||||
require "vendor/jar/elasticsearch-0.90.3/lib/log4j-1.2.17.jar"
|
||||
end
|
||||
begin
|
||||
Java::OrgApacheLog4jSpi.const_get("LoggingEvent")
|
||||
rescue
|
||||
raise(LogStash::PluginLoadingError, "Log4j java library not loaded")
|
||||
end
|
||||
|
||||
if server?
|
||||
|
@ -62,7 +65,7 @@ class LogStash::Inputs::Log4j < LogStash::Inputs::Base
|
|||
# JRubyObjectInputStream uses JRuby class path to find the class to de-serialize to
|
||||
ois = JRubyObjectInputStream.new(java.io.BufferedInputStream.new(socket.to_inputstream))
|
||||
loop do
|
||||
# NOTE: event_raw is org.apache.log4j.spi.LoggingEvent
|
||||
# NOTE: log4j_obj is org.apache.log4j.spi.LoggingEvent
|
||||
log4j_obj = ois.readObject
|
||||
event = LogStash::Event.new("message" => log4j_obj.getRenderedMessage)
|
||||
decorate(event)
|
||||
|
@ -76,13 +79,13 @@ class LogStash::Inputs::Log4j < LogStash::Inputs::Base
|
|||
event["method"] = log4j_obj.getLocationInformation.getMethodName
|
||||
event["NDC"] = log4j_obj.getNDC if log4j_obj.getNDC
|
||||
event["stack_trace"] = log4j_obj.getThrowableStrRep.to_a.join("\n") if log4j_obj.getThrowableInformation
|
||||
|
||||
|
||||
# Add the MDC context properties to '@fields'
|
||||
if log4j_obj.getProperties
|
||||
log4j_obj.getPropertyKeySet.each do |key|
|
||||
event[key] = log4j_obj.getProperty(key)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
output_queue << event
|
||||
end # loop do
|
||||
|
|
|
@ -1,9 +0,0 @@
|
|||
# encoding: utf-8
|
||||
jarpath = File.join(File.dirname(__FILE__), "../../vendor/**/*.jar")
|
||||
Dir[jarpath].each do |jar|
|
||||
if $DEBUG
|
||||
puts "Loading #{jar}"
|
||||
end
|
||||
require jar
|
||||
end
|
||||
|
|
@ -1,5 +1,6 @@
|
|||
# encoding: utf-8
|
||||
require "logstash/namespace"
|
||||
require "logstash/environment"
|
||||
require "logstash/outputs/base"
|
||||
require "stud/buffer"
|
||||
require "socket" # for Socket.gethostname
|
||||
|
@ -196,11 +197,9 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
|
|||
|
||||
if ["node", "transport"].include?(@protocol)
|
||||
# Node or TransportClient; requires JRuby
|
||||
if RUBY_PLATFORM != "java"
|
||||
raise LogStash::PluginLoadingError, "This configuration requires JRuby. If you are not using JRuby, you must set 'protocol' to 'http'. For example: output { elasticsearch { protocol => \"http\" } }"
|
||||
end
|
||||
LogStash::Environment.assess_jruby!(LogStash::PluginLoadingError, "This configuration requires JRuby. If you are not using JRuby, you must set 'protocol' to 'http'. For example: output { elasticsearch { protocol => \"http\" } }")
|
||||
LogStash::Environment.load_elasticsearch_jars!
|
||||
|
||||
require "logstash/loadlibs"
|
||||
# setup log4j properties for Elasticsearch
|
||||
LogStash::Logger.setup_log4j(@logger)
|
||||
end
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
# encoding: utf-8
|
||||
require "logstash/environment"
|
||||
require "logstash/namespace"
|
||||
require "logstash/outputs/base"
|
||||
require "json"
|
||||
|
@ -84,14 +85,7 @@ class LogStash::Outputs::ElasticSearchRiver < LogStash::Outputs::Base
|
|||
|
||||
public
|
||||
def register
|
||||
|
||||
# TODO(sissel): find a better way of declaring where the elasticsearch
|
||||
# libraries are
|
||||
# TODO(sissel): can skip this step if we're running from a jar.
|
||||
jarpath = File.join(File.dirname(__FILE__), "../../../vendor/**/*.jar")
|
||||
Dir[jarpath].each do |jar|
|
||||
require jar
|
||||
end
|
||||
LogStash::Environment.load_elasticsearch_jars!
|
||||
prepare_river
|
||||
end
|
||||
|
||||
|
|
|
@ -128,6 +128,7 @@ class LogStash::Plugin
|
|||
#
|
||||
# And expects to find LogStash::Filters::Grok (or something similar based
|
||||
# on pattern matching
|
||||
|
||||
path = "logstash/#{type}s/#{name}"
|
||||
require(path)
|
||||
|
||||
|
@ -135,9 +136,11 @@ class LogStash::Plugin
|
|||
klass = nil
|
||||
#klass_sym = base.constants.find { |c| c.to_s =~ /^#{Regexp.quote(name)}$/i }
|
||||
#if klass_sym.nil?
|
||||
|
||||
|
||||
# Look for a plugin by the config_name
|
||||
klass_sym = base.constants.find { |k| base.const_get(k).config_name == name }
|
||||
# the namespace can contain constants which are not for plugins classes (do not respond to :config_name)
|
||||
# for example, the ElasticSearch output adds the LogStash::Outputs::Elasticsearch::Protocols namespace
|
||||
klass_sym = base.constants.find { |c| o = base.const_get(c); o.respond_to?(:config_name) && o.config_name == name }
|
||||
klass = base.const_get(klass_sym)
|
||||
|
||||
raise LoadError if klass.nil?
|
||||
|
|
13
spec/environmnet.rb
Normal file
13
spec/environmnet.rb
Normal file
|
@ -0,0 +1,13 @@
|
|||
require "logstash/environment"
|
||||
|
||||
describe LogStash::Environment do
|
||||
|
||||
it "should load elasticsarch jars" do
|
||||
expect {LogStash::Environment.load_elasticsearch_jars!}.to_not raise_error
|
||||
end
|
||||
|
||||
it "should raise when cannot find elasticsarch jars" do
|
||||
stub_const("LogStash::Environment::JAR_DIR", "/some/invalid/path")
|
||||
expect {LogStash::Environment.load_elasticsearch_jars!}.to raise_error(LogStash::EnvironmentError)
|
||||
end
|
||||
end
|
13
spec/inputs/log4j.rb
Normal file
13
spec/inputs/log4j.rb
Normal file
|
@ -0,0 +1,13 @@
|
|||
# encoding: utf-8
|
||||
|
||||
require "logstash/plugin"
|
||||
|
||||
describe "inputs/log4j" do
|
||||
|
||||
it "should register" do
|
||||
input = LogStash::Plugin.lookup("input", "log4j").new("mode" => "client")
|
||||
|
||||
# register will try to load jars and raise if it cannot find jars or if org.apache.log4j.spi.LoggingEvent class is not present
|
||||
expect {input.register}.to_not raise_error
|
||||
end
|
||||
end
|
|
@ -1,10 +1,18 @@
|
|||
require "test_utils"
|
||||
require "ftw"
|
||||
require "logstash/plugin"
|
||||
|
||||
describe "outputs/elasticsearch", :elasticsearch => true do
|
||||
describe "outputs/elasticsearch" do
|
||||
extend LogStash::RSpec
|
||||
|
||||
describe "ship lots of events w/ default index_type" do
|
||||
it "should register" do
|
||||
output = LogStash::Plugin.lookup("output", "elasticsearch").new("embedded" => "false", "protocol" => "transport", "manage_template" => "false")
|
||||
|
||||
# register will try to load jars and raise if it cannot find jars
|
||||
expect {output.register}.to_not raise_error
|
||||
end
|
||||
|
||||
describe "ship lots of events w/ default index_type", :elasticsearch => true do
|
||||
# Generate a random index name
|
||||
index = 10.times.collect { rand(10).to_s }.join("")
|
||||
type = 10.times.collect { rand(10).to_s }.join("")
|
||||
|
@ -64,7 +72,7 @@ describe "outputs/elasticsearch", :elasticsearch => true do
|
|||
end
|
||||
end
|
||||
|
||||
describe "testing index_type" do
|
||||
describe "testing index_type", :elasticsearch => true do
|
||||
describe "no type value" do
|
||||
# Generate a random index name
|
||||
index = 10.times.collect { rand(10).to_s }.join("")
|
||||
|
@ -159,7 +167,7 @@ describe "outputs/elasticsearch", :elasticsearch => true do
|
|||
end
|
||||
end
|
||||
|
||||
describe "action => ..." do
|
||||
describe "action => ...", :elasticsearch => true do
|
||||
index_name = 10.times.collect { rand(10).to_s }.join("")
|
||||
|
||||
config <<-CONFIG
|
||||
|
@ -201,7 +209,7 @@ describe "outputs/elasticsearch", :elasticsearch => true do
|
|||
end
|
||||
end
|
||||
|
||||
describe "default event type value" do
|
||||
describe "default event type value", :elasticsearch => true do
|
||||
# Generate a random index name
|
||||
index = 10.times.collect { rand(10).to_s }.join("")
|
||||
event_count = 100 + rand(100)
|
||||
|
@ -249,7 +257,7 @@ describe "outputs/elasticsearch", :elasticsearch => true do
|
|||
end
|
||||
end
|
||||
|
||||
describe "index template expected behavior" do
|
||||
describe "index template expected behavior", :elasticsearch => true do
|
||||
["node", "transport", "http"].each do |protocol|
|
||||
context "with protocol => #{protocol}" do
|
||||
subject do
|
||||
|
|
14
spec/outputs/elasticsearch_river.rb
Normal file
14
spec/outputs/elasticsearch_river.rb
Normal file
|
@ -0,0 +1,14 @@
|
|||
# encoding: utf-8
|
||||
|
||||
require "logstash/plugin"
|
||||
|
||||
describe "outputs/elasticsearch_river" do
|
||||
|
||||
it "should register" do
|
||||
output = LogStash::Plugin.lookup("output", "elasticsearch_river").new("es_host" => "localhost", "rabbitmq_host" => "localhost")
|
||||
output.stub(:prepare_river)
|
||||
|
||||
# register will try to load jars and raise if it cannot find jars
|
||||
expect {output.register}.to_not raise_error
|
||||
end
|
||||
end
|
Loading…
Add table
Add a link
Reference in a new issue