mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
- the 'lgtm' config now passes validation.
- Added enum support to the config, like this: config :mykey => ["value1", "value2", ...] - Added simple value validation (:string, :number, :boolean) - Added value coersion so :number would actually set a proper value of type number in the parameter hash.
This commit is contained in:
parent
9b09652bdd
commit
99b182645b
15 changed files with 118 additions and 46 deletions
|
@ -18,12 +18,12 @@ input {
|
|||
output {
|
||||
amqp {
|
||||
host => "127.0.0.1"
|
||||
type => "queue"
|
||||
exchange_type => "queue"
|
||||
name => "rawlogs"
|
||||
}
|
||||
amqp {
|
||||
host => "127.0.0.1"
|
||||
type => "topic"
|
||||
exchange_type => "topic"
|
||||
name => "logsniff"
|
||||
}
|
||||
stdout { }
|
||||
|
|
|
@ -109,10 +109,14 @@ module LogStash::Config::Mixin
|
|||
@config.find do |config_key, config_val|
|
||||
if (config_key.is_a?(Regexp) && key =~ config_key) \
|
||||
|| (config_key.is_a?(String) && key == config_key)
|
||||
success, message = validate_value(value, config_val)
|
||||
if !success
|
||||
@logger.error("Failed #{@plugin_name}/#{key}: #{message}")
|
||||
success, result = validate_value(value, config_val)
|
||||
if success
|
||||
params[key] = result
|
||||
else
|
||||
@logger.error("Failed #{@plugin_name}/#{key}: #{result}")
|
||||
end
|
||||
|
||||
p "Result: #{key} #{result.inspect} / #{success}"
|
||||
is_valid &&= success
|
||||
end
|
||||
end # config.each
|
||||
|
@ -121,18 +125,69 @@ module LogStash::Config::Mixin
|
|||
return is_valid
|
||||
end # def validate_check_parameter_values
|
||||
|
||||
def validator_find(key)
|
||||
@config.each do |config_key, config_val|
|
||||
if (config_key.is_a?(Regexp) && key =~ config_key) \
|
||||
|| (config_key.is_a?(String) && key == config_key)
|
||||
return config_val
|
||||
end
|
||||
end # @config.each
|
||||
return nil
|
||||
end
|
||||
|
||||
def validate_value(value, validator)
|
||||
# Validator comes from the 'config' pieces of plugins.
|
||||
# They look like this
|
||||
# config :mykey => lambda do |value| ... end
|
||||
# (see LogStash::Inputs::File for example)
|
||||
result = nil
|
||||
|
||||
if validator.nil?
|
||||
return true
|
||||
elsif validator.is_a?(Proc)
|
||||
return validator.call(value)
|
||||
elsif validator.is_a?(Array)
|
||||
if value.size > 1
|
||||
return false, "Expected one of #{validator.inspect}, got #{value.inspect}"
|
||||
end
|
||||
|
||||
if !validator.include?(value.first)
|
||||
return false, "Expected one of #{validator.inspect}, got #{value.inspect}"
|
||||
end
|
||||
result = value.first
|
||||
elsif validator.is_a?(Symbol)
|
||||
# TODO(sissel): Factor this out into a coersion method?
|
||||
case validator
|
||||
when :string
|
||||
if value.size > 1 # only one value wanted
|
||||
return false, "Expected string, got #{value.inspect}"
|
||||
end
|
||||
result = value.first
|
||||
when :number
|
||||
if value.size > 1 # only one value wanted
|
||||
return false, "Expected number, got #{value.inspect}"
|
||||
end
|
||||
if value != value.to_i.to_s # Try convert to number
|
||||
return false, "Expected number, got #{value.inspect}"
|
||||
end
|
||||
result = value.first.to_i
|
||||
when :boolean
|
||||
if value.size > 1 # only one value wanted
|
||||
return false, "Expected boolean, got #{value.inspect}"
|
||||
end
|
||||
|
||||
if value.first !~ /^(true|false)$/
|
||||
return false, "Expected boolean 'true' or 'false', got #{value.inspect}"
|
||||
end
|
||||
|
||||
result = (value == "true")
|
||||
end # case validator
|
||||
else
|
||||
return false, "Unknown validator #{validator.class}"
|
||||
end
|
||||
|
||||
# Return the validator for later use, like with type coercion.
|
||||
return true, result
|
||||
end # def validate_value
|
||||
end # module LogStash::Config::DSL
|
||||
end # module LogStash::Config
|
||||
|
|
|
@ -8,11 +8,15 @@ class LogStash::Filters::Base
|
|||
attr_accessor :logger
|
||||
|
||||
config_name "filter"
|
||||
config :type => :string
|
||||
|
||||
public
|
||||
def initialize(config = {})
|
||||
def initialize(params)
|
||||
@logger = LogStash::Logger.new(STDERR)
|
||||
@config = config
|
||||
if !self.class.validate(params)
|
||||
@logger.error "Config validation failed."
|
||||
exit 1
|
||||
end
|
||||
end # def initialize
|
||||
|
||||
public
|
||||
|
|
|
@ -7,8 +7,8 @@ require "grok" # rubygem 'jls-grok'
|
|||
class LogStash::Filters::Grok < LogStash::Filters::Base
|
||||
|
||||
config_name "grok"
|
||||
config :pattern => :string
|
||||
config :patterns_dir => :path
|
||||
config :pattern => :array
|
||||
config :patterns_dir => :array
|
||||
config :drop_if_match => :boolean # googlecode/issue/26
|
||||
|
||||
public
|
||||
|
|
|
@ -4,15 +4,25 @@ require "logstash/namespace"
|
|||
require "mq" # rubygem 'amqp'
|
||||
require "uuidtools" # rubygem 'uuidtools'
|
||||
require "cgi"
|
||||
require "uri"
|
||||
|
||||
class LogStash::Inputs::Amqp < LogStash::Inputs::Base
|
||||
MQTYPES = [ "fanout", "queue", "topic" ]
|
||||
|
||||
config_name "amqp"
|
||||
config "pantscon" => :string #LogStash::Config::Path
|
||||
config "host" => (lambda do |value|
|
||||
# Use URI to validate.
|
||||
u = URI.parse("dummy:///")
|
||||
begin
|
||||
u.host = value
|
||||
rescue => e
|
||||
return false, "Invalid hostname #{value.inspect}"
|
||||
end
|
||||
return true
|
||||
) # config "host"
|
||||
|
||||
public
|
||||
def initialize(url, type, config={}, &block)
|
||||
def initialize(params)
|
||||
super
|
||||
|
||||
@mq = nil
|
||||
|
|
|
@ -5,10 +5,12 @@ require "logstash/namespace"
|
|||
class LogStash::Inputs::Beanstalk < LogStash::Inputs::Base
|
||||
|
||||
config_name "beanstalk"
|
||||
config :tube => nil # TODO(sissel): needs validation?
|
||||
|
||||
public
|
||||
def initialize(url, type, config={}, &block)
|
||||
def initialize(params)
|
||||
super
|
||||
raise "issue/17: needs refactor to support configfile"
|
||||
|
||||
if @url.path == "" or @url.path == "/"
|
||||
raise "must specify a tube for beanstalk output"
|
||||
|
|
|
@ -9,8 +9,9 @@ class LogStash::Inputs::Internal < LogStash::Inputs::Base
|
|||
config_name "internal"
|
||||
|
||||
public
|
||||
def initialize(url, type, config={}, &block)
|
||||
def initialize(params)
|
||||
super
|
||||
raise "issue/17: needs refactor to support configfile"
|
||||
|
||||
# Default host to the machine's hostname if it's not set
|
||||
@url.host ||= Socket.gethostname
|
||||
|
|
|
@ -7,8 +7,9 @@ class LogStash::Inputs::Stomp < LogStash::Inputs::Base
|
|||
config_name "stomp"
|
||||
|
||||
public
|
||||
def initialize(url, config={}, &block)
|
||||
def initialize(params)
|
||||
super
|
||||
raise "issue/17: needs refactor to support configfile"
|
||||
@logger.debug(["Connecting", { :url => @url }])
|
||||
end # def initialize
|
||||
|
||||
|
|
|
@ -9,6 +9,10 @@ class LogStash::Inputs::Syslog < LogStash::Inputs::Base
|
|||
|
||||
config_name "syslog"
|
||||
|
||||
# The address to listen on
|
||||
config :address => nil # TODO(sissel): needs validation
|
||||
config :port => nil # TODO(sissel): needs validation
|
||||
|
||||
public
|
||||
def register
|
||||
if !@url.host or !@url.port
|
||||
|
|
|
@ -8,8 +8,9 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
|
|||
config_name "tcp"
|
||||
|
||||
public
|
||||
def initialize(url, type, config={}, &block)
|
||||
def initialize(params)
|
||||
super
|
||||
raise "issue/17: needs refactor to support configfile"
|
||||
end # def initialize
|
||||
|
||||
public
|
||||
|
|
|
@ -7,6 +7,7 @@ require "logstash/namespace"
|
|||
class LogStash::Inputs::Twitter < LogStash::Inputs::Base
|
||||
|
||||
config_name "twitter"
|
||||
config :query => nil # TODO(sissel): Validation?
|
||||
|
||||
public
|
||||
def register
|
||||
|
|
|
@ -9,23 +9,17 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
|
|||
|
||||
config_name "amqp"
|
||||
config :host => :string
|
||||
config :queue_type => :string
|
||||
config :queue_name => :string
|
||||
config :exchange_type => :string
|
||||
config :name => :string
|
||||
config :vhost => :string
|
||||
|
||||
public
|
||||
def initialize(url, config={}, &block)
|
||||
def initialize(params)
|
||||
super
|
||||
|
||||
# Handle path /<vhost>/<type>/<name> or /<type>/<name>
|
||||
# vhost allowed to contain slashes
|
||||
if @url.path =~ %r{^/((.*)/)?([^/]+)/([^/]+)}
|
||||
unused, @vhost, @mqtype, @name = $~.captures
|
||||
else
|
||||
raise "amqp urls must have a path of /<type>/name or /vhost/<type>/name where <type> is #{MQTYPES.join(", ")}"
|
||||
end
|
||||
|
||||
if !MQTYPES.include?(@mqtype)
|
||||
raise "Invalid type '#{@mqtype}' must be one of #{MQTYPES.join(", ")}"
|
||||
p @exchange_type => MQTYPES
|
||||
if !MQTYPES.include?(@exchange_type)
|
||||
raise "Invalid exchange_type, #{@exchange_type.inspect}, must be one of #{MQTYPES.join(", ")}"
|
||||
end
|
||||
end # def initialize
|
||||
|
||||
|
@ -41,19 +35,19 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
|
|||
amqpsettings[:user] = @url.user if @url.user
|
||||
amqpsettings[:pass] = @url.password if @url.password
|
||||
amqpsettings[:logging] = query_args.include? "debug"
|
||||
@logger.debug("Connecting with AMQP settings #{amqpsettings.inspect} to set up #{@mqtype.inspect} queue #{@name.inspect}")
|
||||
@logger.debug("Connecting with AMQP settings #{amqpsettings.inspect} to set up #{@exchange_type.inspect} queue #{@name.inspect}")
|
||||
@amqp = AMQP.connect(amqpsettings)
|
||||
@mq = MQ.new(@amqp)
|
||||
@target = nil
|
||||
|
||||
case @mqtype
|
||||
case @exchange_type
|
||||
when "fanout"
|
||||
@target = @mq.fanout(@name)
|
||||
when "queue"
|
||||
@target = @mq.queue(@name, :durable => @urlopts["durable"] ? true : false)
|
||||
when "topic"
|
||||
@target = @mq.topic(@name)
|
||||
end # case @mqtype
|
||||
end # case @exchange_type
|
||||
end # def register
|
||||
|
||||
public
|
||||
|
|
|
@ -13,16 +13,18 @@ class LogStash::Outputs::Base
|
|||
config_name "output"
|
||||
|
||||
public
|
||||
def initialize(url)
|
||||
@url = url
|
||||
@url = URI.parse(url) if url.is_a? String
|
||||
def initialize(params)
|
||||
@logger = LogStash::Logger.new(STDOUT)
|
||||
@urlopts = {}
|
||||
if @url.query
|
||||
@urlopts = CGI.parse(@url.query)
|
||||
@urlopts.each do |k, v|
|
||||
@urlopts[k] = v.last if v.is_a?(Array)
|
||||
if !self.class.validate(params)
|
||||
@logger.error "Config validation failed."
|
||||
exit 1
|
||||
end
|
||||
|
||||
params.each do |key, value|
|
||||
validator = self.class.validator_find(key)
|
||||
#value = params[key]
|
||||
@logger.info("Setting: @#{key} = #{value.inspect}")
|
||||
self.instance_variable_set("@#{key}", value)
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -5,9 +5,10 @@ require "em-jack"
|
|||
class LogStash::Outputs::Beanstalk < LogStash::Outputs::Base
|
||||
|
||||
config_name "beanstalk"
|
||||
config :ttr => :number
|
||||
|
||||
public
|
||||
def initialize(url, config={}, &block)
|
||||
def initialize(params)
|
||||
super
|
||||
|
||||
@ttr = @urlopts["ttr"] || 300;
|
||||
|
|
|
@ -10,10 +10,6 @@ class LogStash::Outputs::Stdout < LogStash::Outputs::Base
|
|||
def initialize(*args)
|
||||
super
|
||||
|
||||
@opts = {}
|
||||
if @url.path != "/"
|
||||
@opts = @url.path[1..-1].split(",")
|
||||
end
|
||||
end # def register
|
||||
|
||||
public
|
||||
|
@ -27,6 +23,6 @@ class LogStash::Outputs::Stdout < LogStash::Outputs::Base
|
|||
|
||||
public
|
||||
def debug?
|
||||
return @opts.member?("debug")
|
||||
return @debug
|
||||
end
|
||||
end # class LogStash::Outputs::Stdout
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue