diff --git a/etc/agent.lgtm.conf b/etc/agent.lgtm.conf index 5becab74b..a15b40504 100644 --- a/etc/agent.lgtm.conf +++ b/etc/agent.lgtm.conf @@ -18,12 +18,12 @@ input { output { amqp { host => "127.0.0.1" - type => "queue" + exchange_type => "queue" name => "rawlogs" } amqp { host => "127.0.0.1" - type => "topic" + exchange_type => "topic" name => "logsniff" } stdout { } diff --git a/lib/logstash/config/mixin.rb b/lib/logstash/config/mixin.rb index 131a9bed4..823df9947 100644 --- a/lib/logstash/config/mixin.rb +++ b/lib/logstash/config/mixin.rb @@ -109,10 +109,14 @@ module LogStash::Config::Mixin @config.find do |config_key, config_val| if (config_key.is_a?(Regexp) && key =~ config_key) \ || (config_key.is_a?(String) && key == config_key) - success, message = validate_value(value, config_val) - if !success - @logger.error("Failed #{@plugin_name}/#{key}: #{message}") + success, result = validate_value(value, config_val) + if success + params[key] = result + else + @logger.error("Failed #{@plugin_name}/#{key}: #{result}") end + + p "Result: #{key} #{result.inspect} / #{success}" is_valid &&= success end end # config.each @@ -121,18 +125,69 @@ module LogStash::Config::Mixin return is_valid end # def validate_check_parameter_values + def validator_find(key) + @config.each do |config_key, config_val| + if (config_key.is_a?(Regexp) && key =~ config_key) \ + || (config_key.is_a?(String) && key == config_key) + return config_val + end + end # @config.each + return nil + end + def validate_value(value, validator) # Validator comes from the 'config' pieces of plugins. # They look like this # config :mykey => lambda do |value| ... end # (see LogStash::Inputs::File for example) + result = nil + if validator.nil? return true elsif validator.is_a?(Proc) return validator.call(value) + elsif validator.is_a?(Array) + if value.size > 1 + return false, "Expected one of #{validator.inspect}, got #{value.inspect}" + end + + if !validator.include?(value.first) + return false, "Expected one of #{validator.inspect}, got #{value.inspect}" + end + result = value.first + elsif validator.is_a?(Symbol) + # TODO(sissel): Factor this out into a coersion method? + case validator + when :string + if value.size > 1 # only one value wanted + return false, "Expected string, got #{value.inspect}" + end + result = value.first + when :number + if value.size > 1 # only one value wanted + return false, "Expected number, got #{value.inspect}" + end + if value != value.to_i.to_s # Try convert to number + return false, "Expected number, got #{value.inspect}" + end + result = value.first.to_i + when :boolean + if value.size > 1 # only one value wanted + return false, "Expected boolean, got #{value.inspect}" + end + + if value.first !~ /^(true|false)$/ + return false, "Expected boolean 'true' or 'false', got #{value.inspect}" + end + + result = (value == "true") + end # case validator else return false, "Unknown validator #{validator.class}" end + + # Return the validator for later use, like with type coercion. + return true, result end # def validate_value end # module LogStash::Config::DSL end # module LogStash::Config diff --git a/lib/logstash/filters/base.rb b/lib/logstash/filters/base.rb index 9ab1c3864..191cae8a6 100644 --- a/lib/logstash/filters/base.rb +++ b/lib/logstash/filters/base.rb @@ -8,11 +8,15 @@ class LogStash::Filters::Base attr_accessor :logger config_name "filter" + config :type => :string public - def initialize(config = {}) + def initialize(params) @logger = LogStash::Logger.new(STDERR) - @config = config + if !self.class.validate(params) + @logger.error "Config validation failed." + exit 1 + end end # def initialize public diff --git a/lib/logstash/filters/grok.rb b/lib/logstash/filters/grok.rb index 9117c2fb7..080e9639b 100644 --- a/lib/logstash/filters/grok.rb +++ b/lib/logstash/filters/grok.rb @@ -7,8 +7,8 @@ require "grok" # rubygem 'jls-grok' class LogStash::Filters::Grok < LogStash::Filters::Base config_name "grok" - config :pattern => :string - config :patterns_dir => :path + config :pattern => :array + config :patterns_dir => :array config :drop_if_match => :boolean # googlecode/issue/26 public diff --git a/lib/logstash/inputs/amqp.rb b/lib/logstash/inputs/amqp.rb index 6b7acdb0f..0437aad3c 100644 --- a/lib/logstash/inputs/amqp.rb +++ b/lib/logstash/inputs/amqp.rb @@ -4,15 +4,25 @@ require "logstash/namespace" require "mq" # rubygem 'amqp' require "uuidtools" # rubygem 'uuidtools' require "cgi" +require "uri" class LogStash::Inputs::Amqp < LogStash::Inputs::Base MQTYPES = [ "fanout", "queue", "topic" ] config_name "amqp" - config "pantscon" => :string #LogStash::Config::Path + config "host" => (lambda do |value| + # Use URI to validate. + u = URI.parse("dummy:///") + begin + u.host = value + rescue => e + return false, "Invalid hostname #{value.inspect}" + end + return true + ) # config "host" public - def initialize(url, type, config={}, &block) + def initialize(params) super @mq = nil diff --git a/lib/logstash/inputs/beanstalk.rb b/lib/logstash/inputs/beanstalk.rb index 87ed957ba..c251e1a39 100644 --- a/lib/logstash/inputs/beanstalk.rb +++ b/lib/logstash/inputs/beanstalk.rb @@ -5,10 +5,12 @@ require "logstash/namespace" class LogStash::Inputs::Beanstalk < LogStash::Inputs::Base config_name "beanstalk" + config :tube => nil # TODO(sissel): needs validation? public - def initialize(url, type, config={}, &block) + def initialize(params) super + raise "issue/17: needs refactor to support configfile" if @url.path == "" or @url.path == "/" raise "must specify a tube for beanstalk output" diff --git a/lib/logstash/inputs/internal.rb b/lib/logstash/inputs/internal.rb index 44b5244be..a109ac558 100644 --- a/lib/logstash/inputs/internal.rb +++ b/lib/logstash/inputs/internal.rb @@ -9,8 +9,9 @@ class LogStash::Inputs::Internal < LogStash::Inputs::Base config_name "internal" public - def initialize(url, type, config={}, &block) + def initialize(params) super + raise "issue/17: needs refactor to support configfile" # Default host to the machine's hostname if it's not set @url.host ||= Socket.gethostname diff --git a/lib/logstash/inputs/stomp.rb b/lib/logstash/inputs/stomp.rb index e0403760e..7c3b52011 100644 --- a/lib/logstash/inputs/stomp.rb +++ b/lib/logstash/inputs/stomp.rb @@ -7,8 +7,9 @@ class LogStash::Inputs::Stomp < LogStash::Inputs::Base config_name "stomp" public - def initialize(url, config={}, &block) + def initialize(params) super + raise "issue/17: needs refactor to support configfile" @logger.debug(["Connecting", { :url => @url }]) end # def initialize diff --git a/lib/logstash/inputs/syslog.rb b/lib/logstash/inputs/syslog.rb index 105a24e04..934dbcd6d 100644 --- a/lib/logstash/inputs/syslog.rb +++ b/lib/logstash/inputs/syslog.rb @@ -9,6 +9,10 @@ class LogStash::Inputs::Syslog < LogStash::Inputs::Base config_name "syslog" + # The address to listen on + config :address => nil # TODO(sissel): needs validation + config :port => nil # TODO(sissel): needs validation + public def register if !@url.host or !@url.port diff --git a/lib/logstash/inputs/tcp.rb b/lib/logstash/inputs/tcp.rb index 37d71c300..7802055de 100644 --- a/lib/logstash/inputs/tcp.rb +++ b/lib/logstash/inputs/tcp.rb @@ -8,8 +8,9 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base config_name "tcp" public - def initialize(url, type, config={}, &block) + def initialize(params) super + raise "issue/17: needs refactor to support configfile" end # def initialize public diff --git a/lib/logstash/inputs/twitter.rb b/lib/logstash/inputs/twitter.rb index 8e6ed6b1a..a8d8d1c85 100644 --- a/lib/logstash/inputs/twitter.rb +++ b/lib/logstash/inputs/twitter.rb @@ -7,6 +7,7 @@ require "logstash/namespace" class LogStash::Inputs::Twitter < LogStash::Inputs::Base config_name "twitter" + config :query => nil # TODO(sissel): Validation? public def register diff --git a/lib/logstash/outputs/amqp.rb b/lib/logstash/outputs/amqp.rb index 4de6b0e7d..6eca58388 100644 --- a/lib/logstash/outputs/amqp.rb +++ b/lib/logstash/outputs/amqp.rb @@ -9,23 +9,17 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base config_name "amqp" config :host => :string - config :queue_type => :string - config :queue_name => :string + config :exchange_type => :string + config :name => :string + config :vhost => :string public - def initialize(url, config={}, &block) + def initialize(params) super - # Handle path /// or // - # vhost allowed to contain slashes - if @url.path =~ %r{^/((.*)/)?([^/]+)/([^/]+)} - unused, @vhost, @mqtype, @name = $~.captures - else - raise "amqp urls must have a path of //name or /vhost//name where is #{MQTYPES.join(", ")}" - end - - if !MQTYPES.include?(@mqtype) - raise "Invalid type '#{@mqtype}' must be one of #{MQTYPES.join(", ")}" + p @exchange_type => MQTYPES + if !MQTYPES.include?(@exchange_type) + raise "Invalid exchange_type, #{@exchange_type.inspect}, must be one of #{MQTYPES.join(", ")}" end end # def initialize @@ -41,19 +35,19 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base amqpsettings[:user] = @url.user if @url.user amqpsettings[:pass] = @url.password if @url.password amqpsettings[:logging] = query_args.include? "debug" - @logger.debug("Connecting with AMQP settings #{amqpsettings.inspect} to set up #{@mqtype.inspect} queue #{@name.inspect}") + @logger.debug("Connecting with AMQP settings #{amqpsettings.inspect} to set up #{@exchange_type.inspect} queue #{@name.inspect}") @amqp = AMQP.connect(amqpsettings) @mq = MQ.new(@amqp) @target = nil - case @mqtype + case @exchange_type when "fanout" @target = @mq.fanout(@name) when "queue" @target = @mq.queue(@name, :durable => @urlopts["durable"] ? true : false) when "topic" @target = @mq.topic(@name) - end # case @mqtype + end # case @exchange_type end # def register public diff --git a/lib/logstash/outputs/base.rb b/lib/logstash/outputs/base.rb index c9fbb735a..15a87b80a 100644 --- a/lib/logstash/outputs/base.rb +++ b/lib/logstash/outputs/base.rb @@ -13,16 +13,18 @@ class LogStash::Outputs::Base config_name "output" public - def initialize(url) - @url = url - @url = URI.parse(url) if url.is_a? String + def initialize(params) @logger = LogStash::Logger.new(STDOUT) - @urlopts = {} - if @url.query - @urlopts = CGI.parse(@url.query) - @urlopts.each do |k, v| - @urlopts[k] = v.last if v.is_a?(Array) - end + if !self.class.validate(params) + @logger.error "Config validation failed." + exit 1 + end + + params.each do |key, value| + validator = self.class.validator_find(key) + #value = params[key] + @logger.info("Setting: @#{key} = #{value.inspect}") + self.instance_variable_set("@#{key}", value) end end diff --git a/lib/logstash/outputs/beanstalk.rb b/lib/logstash/outputs/beanstalk.rb index 5681d4db8..3ea691efb 100644 --- a/lib/logstash/outputs/beanstalk.rb +++ b/lib/logstash/outputs/beanstalk.rb @@ -5,9 +5,10 @@ require "em-jack" class LogStash::Outputs::Beanstalk < LogStash::Outputs::Base config_name "beanstalk" + config :ttr => :number public - def initialize(url, config={}, &block) + def initialize(params) super @ttr = @urlopts["ttr"] || 300; diff --git a/lib/logstash/outputs/stdout.rb b/lib/logstash/outputs/stdout.rb index bb339dc4b..a27b74001 100644 --- a/lib/logstash/outputs/stdout.rb +++ b/lib/logstash/outputs/stdout.rb @@ -10,10 +10,6 @@ class LogStash::Outputs::Stdout < LogStash::Outputs::Base def initialize(*args) super - @opts = {} - if @url.path != "/" - @opts = @url.path[1..-1].split(",") - end end # def register public @@ -27,6 +23,6 @@ class LogStash::Outputs::Stdout < LogStash::Outputs::Base public def debug? - return @opts.member?("debug") + return @debug end end # class LogStash::Outputs::Stdout