mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
- Components can now add global flags
This commit is contained in:
parent
c2ea10ed5d
commit
71903bdc22
3 changed files with 100 additions and 26 deletions
|
@ -44,7 +44,7 @@ class LogStash::Agent
|
||||||
@plugin_paths = []
|
@plugin_paths = []
|
||||||
|
|
||||||
# Add logstash's plugin path (plugin paths must contain inputs, outputs, filters)
|
# Add logstash's plugin path (plugin paths must contain inputs, outputs, filters)
|
||||||
@plugin_paths += File.dirname(__FILE__)
|
@plugin_paths << File.dirname(__FILE__)
|
||||||
|
|
||||||
# TODO(sissel): Other default plugin paths?
|
# TODO(sissel): Other default plugin paths?
|
||||||
|
|
||||||
|
@ -80,53 +80,94 @@ class LogStash::Agent
|
||||||
@verbose += 1
|
@verbose += 1
|
||||||
end
|
end
|
||||||
|
|
||||||
opts.on("-p PLUGIN_PATH", "--plugin_path PLUGIN_PATH",
|
opts.on("-p PLUGIN_PATH", "--pluginpath PLUGIN_PATH",
|
||||||
"A colon-delimited path to find plugins in.") do |path|
|
"A colon-delimited path to find plugins in.") do |path|
|
||||||
@plugin_paths += p unless @plugin_paths.include?(p)
|
path.split(":").each do |p|
|
||||||
|
@plugin_paths << p unless @plugin_paths.include?(p)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end # def options
|
||||||
|
|
||||||
# Parse options.
|
# Parse options.
|
||||||
private
|
private
|
||||||
def parse_options
|
def parse_options
|
||||||
@opts = OptionParser.new
|
@opts = OptionParser.new
|
||||||
|
|
||||||
# Step one is to add agent flags.
|
# Step one is to add agent flags.
|
||||||
options(@opts)
|
options(@opts)
|
||||||
|
|
||||||
# Apply agent flags, save unknown options for plugins to parse later.
|
# TODO(sissel): Check for plugin_path flags, add them to @plugin_paths.
|
||||||
@remaining_args = @opts.permute(@argv)
|
@argv.each_with_index do |arg, index|
|
||||||
|
next unless arg =~ /^(?:-p|--pluginpath)(?:=(.*))?$/
|
||||||
|
path = $1
|
||||||
|
if path.nil?
|
||||||
|
path = @argv[index + 1]
|
||||||
|
end
|
||||||
|
|
||||||
|
@plugin_paths += path.split(":")
|
||||||
|
end # @argv.each
|
||||||
|
|
||||||
# At this point, we should load any plugin-specific flags.
|
# At this point, we should load any plugin-specific flags.
|
||||||
# These are 'unknown' flags that begin --<plugin>-flag
|
# These are 'unknown' flags that begin --<plugin>-flag
|
||||||
# Put any plugin paths into the ruby library path for requiring later.
|
# Put any plugin paths into the ruby library path for requiring later.
|
||||||
@plugin_paths.each do |p|
|
@plugin_paths.each do |p|
|
||||||
@logger.info "Adding #{p.inspect} to $:"
|
@logger.info "Adding #{p.inspect} to ruby load path"
|
||||||
$:.unshift p
|
$:.unshift p
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# TODO(sissel): Go through all inputs, filters, and outputs to get the flags.
|
||||||
|
# Add plugin flags to @opts
|
||||||
|
|
||||||
# Load any plugins that we have flags for.
|
# Load any plugins that we have flags for.
|
||||||
# TODO(sissel): The --<plugin> flag support currently will load
|
# TODO(sissel): The --<plugin> flag support currently will load
|
||||||
# any matching plugins input, output, or filter. This means, for example,
|
# any matching plugins input, output, or filter. This means, for example,
|
||||||
# that the 'amqp' input *and* output plugin will be loaded if you pass
|
# that the 'amqp' input *and* output plugin will be loaded if you pass
|
||||||
# --amqp-foo flag. This might cause confusion, but it seems reasonable for
|
# --amqp-foo flag. This might cause confusion, but it seems reasonable for
|
||||||
# now that any same-named component will have the same flags.
|
# now that any same-named component will have the same flags.
|
||||||
remaining_args.each do |arg|
|
plugins = []
|
||||||
next unless arg =~ /^--[A-z]/ # skip things that don't look like flags
|
@argv.each do |arg|
|
||||||
name = arg.split("-")[2]
|
# skip things that don't look like plugin flags
|
||||||
|
next unless arg =~ /^--[A-z0-9]+-/
|
||||||
|
name = arg.split("-")[2] # pull the plugin name out
|
||||||
|
|
||||||
|
# Try to load any plugin by that name
|
||||||
%w{inputs outputs filters}.each do |component|
|
%w{inputs outputs filters}.each do |component|
|
||||||
@plugin_paths.each do |path|
|
@plugin_paths.each do |path|
|
||||||
plugin = File.join(path, component, name)
|
plugin = File.join(path, component, name) + ".rb"
|
||||||
|
@logger.debug("Flag #{arg} found; trying to load #{plugin}")
|
||||||
if File.file?(plugin)
|
if File.file?(plugin)
|
||||||
@logger.info("Loading plugin #{plugin}")
|
@logger.info("Loading plugin #{plugin}")
|
||||||
require plugin
|
require plugin
|
||||||
|
[LogStash::Inputs, LogStash::Filters, LogStash::Outputs].each do |c|
|
||||||
|
# If we get flag --foo-bar, check for LogStash::Inputs::Foo
|
||||||
|
# and add any options to our option parser.
|
||||||
|
klass_name = name.capitalize
|
||||||
|
if c.const_defined?(klass_name)
|
||||||
|
@logger.info("Found plugin class #{c}::#{klass_name})")
|
||||||
|
klass = c.const_get(klass_name)
|
||||||
|
# See LogStash::Config::Mixin::DSL#options
|
||||||
|
klass.options(@opts)
|
||||||
|
plugins << klass
|
||||||
|
end # c.const_defined?
|
||||||
|
end # each component type (input/filter/outputs)
|
||||||
end # if File.file?(plugin)
|
end # if File.file?(plugin)
|
||||||
end # @plugin_paths.each
|
end # @plugin_paths.each
|
||||||
end # %{inputs outputs filters}.each
|
end # %{inputs outputs filters}.each
|
||||||
end # remaining_args.each
|
|
||||||
|
|
||||||
# TODO(sissel): Go through all inputs, filters, and outputs to get the flags.
|
#if !found
|
||||||
logger.info("remaining_args" => remaining_args)
|
#@logger.fatal("Flag #{arg.inspect} requires plugin #{name}, but no plugin found.")
|
||||||
|
#return false
|
||||||
|
#end
|
||||||
|
end # @remaining_args.each
|
||||||
|
|
||||||
|
begin
|
||||||
|
@opts.parse!(@argv)
|
||||||
|
rescue OptionParser::InvalidOption => e
|
||||||
|
@logger.info e
|
||||||
|
raise e
|
||||||
|
end
|
||||||
|
|
||||||
|
return true
|
||||||
end # def parse_options
|
end # def parse_options
|
||||||
|
|
||||||
private
|
private
|
||||||
|
@ -165,7 +206,12 @@ class LogStash::Agent
|
||||||
public
|
public
|
||||||
def run
|
def run
|
||||||
JThread.currentThread().setName(self.class.name)
|
JThread.currentThread().setName(self.class.name)
|
||||||
parse_options
|
ok = parse_options
|
||||||
|
if !ok
|
||||||
|
raise "Option parsing failed. See error log."
|
||||||
|
end
|
||||||
|
|
||||||
|
|
||||||
configure
|
configure
|
||||||
|
|
||||||
# Load the config file
|
# Load the config file
|
||||||
|
@ -252,10 +298,10 @@ class LogStash::Agent
|
||||||
queue = SizedQueue.new(10)
|
queue = SizedQueue.new(10)
|
||||||
output_queue.add_queue(queue)
|
output_queue.add_queue(queue)
|
||||||
@threads["outputs/#{output.to_s}"] = Thread.new(queue) do |queue|
|
@threads["outputs/#{output.to_s}"] = Thread.new(queue) do |queue|
|
||||||
|
output.register
|
||||||
begin
|
begin
|
||||||
JThread.currentThread().setName("output/#{output.to_s}")
|
JThread.currentThread().setName("output/#{output.to_s}")
|
||||||
output.logger = @logger
|
output.logger = @logger
|
||||||
output.register
|
|
||||||
|
|
||||||
while event = queue.pop do
|
while event = queue.pop do
|
||||||
@logger.debug("Sending event to #{output.to_s}")
|
@logger.debug("Sending event to #{output.to_s}")
|
||||||
|
|
|
@ -59,7 +59,7 @@ module LogStash::Config::Mixin
|
||||||
end # def config_init
|
end # def config_init
|
||||||
|
|
||||||
module DSL
|
module DSL
|
||||||
|
attr_accessor :flags
|
||||||
# If name is given, set the name and return it.
|
# If name is given, set the name and return it.
|
||||||
# If no name given (nil), return the current name.
|
# If no name given (nil), return the current name.
|
||||||
def config_name(name=nil)
|
def config_name(name=nil)
|
||||||
|
@ -79,11 +79,24 @@ module LogStash::Config::Mixin
|
||||||
@required << name if opts[:required] == true
|
@required << name if opts[:required] == true
|
||||||
end # def config
|
end # def config
|
||||||
|
|
||||||
def flag(name, opts={})
|
def flag(*args, &block)
|
||||||
@flags ||= Hash.new
|
@flags ||= []
|
||||||
|
|
||||||
name = name.to_s if name.is_a?(Symbol)
|
@flags << {
|
||||||
@flags[name] = opts[:validate] # ok if this is nil
|
:args => args,
|
||||||
|
:block => block
|
||||||
|
}
|
||||||
|
end
|
||||||
|
|
||||||
|
def options(opts)
|
||||||
|
# add any options from this class
|
||||||
|
prefix = self.name.split("::").last.downcase
|
||||||
|
@flags.each do |flag|
|
||||||
|
flagpart = flag[:args].first.gsub(/^--/,"")
|
||||||
|
# TODO(sissel): logger things here could help debugging.
|
||||||
|
|
||||||
|
opts.on("--#{prefix}-#{flagpart}", *flag[:args][1..-1], &flag[:block])
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
# This is called whenever someone subclasses a class that has this mixin.
|
# This is called whenever someone subclasses a class that has this mixin.
|
||||||
|
|
|
@ -11,6 +11,15 @@ class LogStash::Filters::Grok < LogStash::Filters::Base
|
||||||
config :patterns_dir
|
config :patterns_dir
|
||||||
config :drop_if_match, :validate => :boolean # googlecode/issue/26
|
config :drop_if_match, :validate => :boolean # googlecode/issue/26
|
||||||
|
|
||||||
|
class << self
|
||||||
|
attr_reader :patterns_dir
|
||||||
|
end
|
||||||
|
|
||||||
|
flag("--patterns-path PATH", "Colon-delimited path of patterns to load") do |val|
|
||||||
|
@patterns_dir ||= ["#{File.dirname(__FILE__)}/../../../patterns/*"]
|
||||||
|
@patterns_dir += val.split(":")
|
||||||
|
end
|
||||||
|
|
||||||
@@grokpiles = Hash.new { |h, k| h[k] = [] }
|
@@grokpiles = Hash.new { |h, k| h[k] = [] }
|
||||||
@@grokpiles_lock = Mutex.new
|
@@grokpiles_lock = Mutex.new
|
||||||
|
|
||||||
|
@ -21,10 +30,16 @@ class LogStash::Filters::Grok < LogStash::Filters::Base
|
||||||
|
|
||||||
public
|
public
|
||||||
def register
|
def register
|
||||||
@patterns_dir ||= "#{File.dirname(__FILE__)}/../../../patterns/*"
|
|
||||||
@pile = Grok::Pile.new
|
@pile = Grok::Pile.new
|
||||||
Dir.glob(@patterns_dir).each do |path|
|
self.class.patterns_dir.each do |path|
|
||||||
@pile.add_patterns_from_file(path)
|
if File.directory?(path)
|
||||||
|
path = File.join(path, "*")
|
||||||
|
end
|
||||||
|
|
||||||
|
Dir.glob(path).each do |file|
|
||||||
|
@logger.info("Grok loading patterns from #{file}")
|
||||||
|
@pile.add_patterns_from_file(file)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@pattern.each do |pattern|
|
@pattern.each do |pattern|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue