Mirror of https://github.com/elastic/logstash.git (synced 2025-04-25 07:07:54 -04:00)

commit 8462bd05fa (parent 7e80e89c39)

    introduce log types, in addition to tags

9 changed files with 67 additions and 72 deletions
@@ -5,29 +5,29 @@
 #
 inputs:
   # Give a list of inputs. Tag them for easy query/filter later.
-  linux-syslog: # this is the 'linux-syslog' tag
+  linux-syslog: # this is the 'linux-syslog' type
   - /var/log/messages # watch /var/log/messages (uses eventmachine-tail)
   - /var/log/kern.log
   - /var/log/auth.log
   - /var/log/user.log
-  apache-access: # similar, different tag.
+  apache-access: # similar, different type.
   - /var/log/apache2/access.log
   - /b/access
   apache-error:
   - /var/log/apache2/error.log
 filters:
 - grok:
-    linux-syslog: # for logs tagged 'linux-syslog'
+    linux-syslog: # for logs of type 'linux-syslog'
      patterns:
      - %{SYSLOGLINE}
-    apache-access: # for logs tagged 'apache-error'
+    apache-access: # for logs of type 'apache-error'
      patterns:
      - %{COMBINEDAPACHELOG}
 - date:
-    linux-syslog: # for logs tagged 'linux-syslog'
+    linux-syslog: # for logs of type 'linux-syslog'
      # Look for a field 'timestamp' with this format, parse and it for the timestamp
      # This field comes from the SYSLOGLINE pattern
-     timestamp: %b %e %H:%M:%S
+     timestamp: "%b %e %H:%M:%S"
    apache-access:
      timestamp: "%d/%b/%Y:%H:%M:%S %Z"
 outputs:
@@ -35,21 +35,19 @@ class LogStash::Agent
     if @config.include?("inputs")
       inputs = @config["inputs"]
       inputs.each do |value|
-        # If 'url' is an array, then inputs is a hash and the key is a tag
+        # If 'url' is an array, then inputs is a hash and the key is the type
         if inputs.is_a?(Hash)
-          tag, urls = value
+          type, urls = value
         else
-          tag = nil
-          urls = value
+          raise "config error, no type for url #{urls.inspect}"
         end
 
         # url could be a string or an array.
         urls = [urls] if !urls.is_a?(Array)
 
         urls.each do |url|
-          @logger.debug("Using input #{url} with tag #{tag}")
-          input = LogStash::Inputs.from_url(url) { |event| receive(event) }
-          input.tag(tag) if tag
+          @logger.debug("Using input #{url} of type #{type}")
+          input = LogStash::Inputs.from_url(url, type) { |event| receive(event) }
           input.register
           @inputs << input
         end
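For context, the `inputs` section of the YAML example above parses into a Hash keyed by type, so the agent loop destructures each entry as in this standalone sketch (the Hash literal here is illustrative, not the agent's actual parsing code):

    # Standalone sketch of the destructuring done in LogStash::Agent above;
    # the Hash mimics what parsing the example YAML config would produce.
    inputs = {
      "linux-syslog"  => ["/var/log/messages", "/var/log/kern.log"],
      "apache-access" => "/var/log/apache2/access.log",  # a lone string is allowed too
    }

    inputs.each do |value|
      type, urls = value                    # each Hash entry arrives as [key, value]
      urls = [urls] if !urls.is_a?(Array)   # normalize a single URL to a one-element list
      urls.each do |url|
        puts "Using input #{url} of type #{type}"
      end
    end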
@@ -7,6 +7,7 @@ module LogStash; class Event
     @cancelled = false
     @data = {
       "@source" => "unknown",
+      "@type" => nil,
       "@tags" => [],
       "@fields" => {},
     }.merge(data)
@@ -38,6 +39,8 @@ module LogStash; class Event
   def source=(val); @data["@source"] = val; end # def source=
   def message; @data["@message"]; end # def message
   def message=; @data["@message"] = val; end # def message=
+  def type; @data["@type"]; end # def type
+  def type=(val); @data["@type"] = val; end # def type=
   def tags; @data["@tags"]; end # def tags
 
   # field-related access
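A minimal sketch of the new accessors; it assumes the Event class from this commit can be loaded (the require path is an assumption, and the source URL is made up):

    require "logstash/event"   # assumed load path for the class changed above

    event = LogStash::Event.new({"@source" => "file:///var/log/messages"})
    event.type        # => nil  ("@type" now defaults to nil in @data)
    event.type = "linux-syslog"
    event.type        # => "linux-syslog"
    event.tags        # => []   (tags are unchanged and coexist with the type)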
@@ -10,48 +10,45 @@ class LogStash::Filters::Date < LogStash::Filters::Base
   #
   # filters:
   #   date:
-  #     <tagname>:
+  #     <type>:
   #       <fieldname>: <format>
-  #     <tagname2>
+  #     <type>
   #       <fieldname>: <format>
   #
   # The format is whatever is supported by Ruby's DateTime.strptime
   def initialize(config = {})
     super
 
-    @tags = Hash.new { |h,k| h[k] = [] }
+    @types = Hash.new { |h,k| h[k] = [] }
   end # def initialize
 
   def register
-    @config.each do |tag, tagconfig|
-      @tags[tag] << tagconfig
+    @config.each do |type, typeconfig|
+      @logger.debug "Setting type #{type.inspect} to the config #{typeconfig.inspect}"
+      raise "date filter type \"#{type}\" defined more than once" unless @types[type].empty?
+      @types[type] = typeconfig
     end # @config.each
   end # def register
 
   def filter(event)
-    # TODO(sissel): crazy deep nesting here, refactor/redesign.
-    return if event.tags.empty?
-    event.tags.each do |tag|
-      next unless @tags.include?(tag)
-      @tags[tag].each do |tagconfig|
-        tagconfig.each do |field, format|
+    @logger.debug "DATE FILTER: received event of type #{event.type}"
+    return unless @types.member?(event.type)
+    @types[event.type].each do |field, format|
+      @logger.debug "DATE FILTER: type #{event.type}, looking for field #{field.inspect} with format #{format.inspect}"
       # TODO(sissel): check event.message, too.
-      if (event.fields.include?(field) rescue false)
+      if event.fields.member?(field)
        fieldvalue = event.fields[field]
        fieldvalue = [fieldvalue] if fieldvalue.is_a?(String)
        fieldvalue.each do |value|
-          #value = event["fields"][field]
          begin
            time = DateTime.strptime(value, format)
            event.timestamp = LogStash::Time.to_iso8601(time)
            @logger.debug "Parsed #{value.inspect} as #{event.timestamp}"
-          rescue => e
-            @logger.warn "Failed parsing date #{value.inspect} from field #{field} with format #{format.inspect}. Exception: #{e}"
+          rescue
+            @logger.warn "Failed parsing date #{value.inspect} from field #{field} with format #{format.inspect}: #{$!}"
          end
        end # fieldvalue.each
      end # if this event has a field we expect to be a timestamp
-        end # tagconfig.each
-      end # @tags[tag].each
-    end # event.tags.each
+    end # @types[event.type].each
   end # def filter
 end # class LogStash::Filters::Date
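The per-field parsing above boils down to `DateTime.strptime` with the format string from the config; a worked example with the syslog format from the YAML at the top of this commit (the timestamp string is made up, and the filter itself then converts the result with `LogStash::Time.to_iso8601`):

    require "date"

    # The same call the date filter makes for a 'linux-syslog' event,
    # using the "%b %e %H:%M:%S" format from the example config.
    value  = "Oct 11 22:14:15"            # illustrative syslog timestamp field
    format = "%b %e %H:%M:%S"
    time   = DateTime.strptime(value, format)
    time.month  # => 10
    time.day    # => 11
    time.hour   # => 22
    # The format carries no year, so DateTime.strptime falls back to its default
    # year; the filter then stores the value via LogStash::Time.to_iso8601(time).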
@@ -12,15 +12,15 @@ class LogStash::Filters::Grok < LogStash::Filters::Base
 
   def register
     # TODO(sissel): Make patterns files come from the config
-    @config.each do |tag, tagconfig|
-      @logger.debug("Registering tag with grok: #{tag}")
+    @config.each do |type, typeconfig|
+      @logger.debug("Registering type with grok: #{type}")
       pile = Grok::Pile.new
       pile.add_patterns_from_file("patterns/grok-patterns")
       pile.add_patterns_from_file("patterns/linux-syslog")
-      tagconfig["patterns"].each do |pattern|
+      typeconfig["patterns"].each do |pattern|
         pile.compile(pattern)
       end
-      @grokpiles[tag] = pile
+      @grokpiles[type] = pile
     end # @config.each
   end # def register
 
@@ -29,20 +29,14 @@ class LogStash::Filters::Grok < LogStash::Filters::Base
     message = event.message
     match = false
 
-    if !event.tags.empty?
-      event.tags.each do |tag|
-        if @grokpiles.include?(tag)
-          pile = @grokpiles[tag]
+    if event.type
+      if @grokpiles.include?(event.type)
+        pile = @grokpiles[event.type]
        grok, match = pile.match(message)
-          break if match
-        end # @grokpiles.include?(tag)
-      end # event.tags.each
-    else
+      end # @grokpiles.include?(event.type)
       # TODO(2.0): support grok pattern discovery
-      #pattern = @grok.discover(message)
-      #@grok.compile(pattern)
-      #match = @grok.match(message)
-      @logger.info("No known tag for #{event.source} (tags: #{event.tags.inspect})")
+    else
+      @logger.info("Unknown type for #{event.source} (type: #{event.type})")
       @logger.debug(event.to_hash)
     end
 
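After `register`, `@grokpiles` is just a Hash from type to a compiled `Grok::Pile`, so the lookup in `filter` is a plain hash fetch followed by a match. A rough sketch using only the calls that appear in the hunk above; the require line, pattern file locations, and the sample log line are assumptions:

    require "grok"   # assumed require for the grok library providing Grok::Pile

    # Roughly what register/filter do for a single type.
    pile = Grok::Pile.new
    pile.add_patterns_from_file("patterns/grok-patterns")
    pile.add_patterns_from_file("patterns/linux-syslog")
    pile.compile("%{SYSLOGLINE}")

    grokpiles = { "linux-syslog" => pile }   # shape of @grokpiles after register

    event_type = "linux-syslog"
    message    = "Oct 11 22:14:15 host sshd[123]: Accepted publickey for user"
    if grokpiles.include?(event_type)
      grok, match = grokpiles[event_type].match(message)
      # 'match' holds the match result (as in the hunk above); the filter
      # then copies the captured fields into the event.
    end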
@@ -3,7 +3,7 @@ require "logstash/namespace"
 require "uri"
 
 module LogStash::Inputs
-  def self.from_url(url, &block)
+  def self.from_url(url, type, &block)
     # Assume file paths if we start with "/"
     url = "file://#{url}" if url.start_with?("/")
 
@@ -13,6 +13,6 @@ module LogStash::Inputs
     klass = uri.scheme.capitalize
     file = uri.scheme
     require "logstash/inputs/#{file}"
-    LogStash::Inputs.const_get(klass).new(uri, &block)
+    LogStash::Inputs.const_get(klass).new(uri, type, &block)
   end # def from_url
 end # module LogStash::Inputs
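The dispatch in `from_url` is URI scheme, to class name, to require path; a worked trace for a file input, with the repository-specific require/const_get steps kept as comments since they need the logstash code on the load path:

    require "uri"

    # Worked trace of LogStash::Inputs.from_url("/var/log/messages", "linux-syslog").
    url = "/var/log/messages"
    url = "file://#{url}" if url.start_with?("/")   # => "file:///var/log/messages"

    uri   = URI.parse(url)
    klass = uri.scheme.capitalize   # => "File"
    file  = uri.scheme              # => "file"

    # The real method then does:
    #   require "logstash/inputs/#{file}"                        # logstash/inputs/file
    #   LogStash::Inputs.const_get(klass).new(uri, type, &block) # LogStash::Inputs::File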
@@ -4,21 +4,21 @@ require "mq" # rubygem 'amqp'
 require "uuidtools" # rubygem 'uuidtools'
 
 class LogStash::Inputs::Amqp < LogStash::Inputs::Base
-  TYPES = [ "fanout", "queue", "topic" ]
+  MQTYPES = [ "fanout", "queue", "topic" ]
 
-  def initialize(url, config={}, &block)
+  def initialize(url, type, config={}, &block)
     super
 
     @mq = nil
 
     # Handle path /<type>/<name>
-    unused, @type, @name = @url.path.split("/", 3)
-    if @type == nil or @name == nil
-      raise "amqp urls must have a path of /<type>/name where <type> is #{TYPES.join(", ")}"
+    unused, @mqtype, @name = @url.path.split("/", 3)
+    if @mqtype == nil or @name == nil
+      raise "amqp urls must have a path of /<type>/name where <type> is #{MQTYPES.join(", ")}"
     end
 
-    if !TYPES.include?(@type)
-      raise "Invalid type '#{@type}' must be one of #{TYPES.JOIN(", ")}"
+    if !MQTYPES.include?(@mqtype)
+      raise "Invalid type '#{@mqtype}' must be one of #{MQTYPES.JOIN(", ")}"
     end
   end
 
@@ -28,7 +28,7 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Base
     @target = nil
 
     @target = @mq.queue(UUIDTools::UUID.timestamp_create)
-    case @type
+    case @mqtype
     when "fanout"
       #@target.bind(MQ.fanout(@url.path, :durable => true))
       @target.bind(MQ.fanout(@url.path))
@@ -36,7 +36,7 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Base
       @target.bind(MQ.direct(@url.path))
     when "topic"
       @target.bind(MQ.topic(@url.path))
-    end # case @type
+    end # case @mqtype
 
     @target.subscribe(:ack => true) do |header, message|
       event = LogStash::Event.from_json(message)
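The MQ settings are still read from the URL path; a quick check of the `split("/", 3)` destructuring that now feeds `@mqtype` and `@name` (the example path is made up):

    # How the amqp input reads its exchange type and name from the URL path.
    path = "/topic/logstash-events"        # e.g. from amqp://host/topic/logstash-events
    unused, mqtype, name = path.split("/", 3)
    unused   # => ""        (the empty segment before the leading slash)
    mqtype   # => "topic"   (must be one of MQTYPES: fanout, queue, topic)
    name     # => "logstash-events"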
@@ -4,12 +4,13 @@ require "logstash/logging"
 require "uri"
 
 class LogStash::Inputs::Base
-  def initialize(url, config={}, &block)
+  def initialize(url, type, config={}, &block)
     @logger = LogStash::Logger.new(STDERR)
     @url = url
     @url = URI.parse(url) if url.is_a? String
     @config = config
     @callback = block
+    @type = type
     @tags = []
   end
 
@@ -22,6 +23,7 @@ class LogStash::Inputs::Base
   end
 
   def receive(event)
+    event.type = @type
     event.tags |= @tags # set union
     @callback.call(event)
   end
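In `receive`, the type is stamped directly while tags keep their set-union behavior; a tiny check of what `|=` does there (the tag names are invented for the example):

    # What "event.tags |= @tags  # set union" does in the base input's receive.
    event_tags = ["security"]              # tags already on the event
    input_tags = ["security", "archive"]   # @tags configured on the input

    event_tags |= input_tags
    event_tags   # => ["security", "archive"]  (union, no duplicate "security")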
@@ -3,7 +3,7 @@ require "eventmachine-tail"
 require "socket" # for Socket.gethostname
 
 class LogStash::Inputs::File < LogStash::Inputs::Base
-  def initialize(url, config={}, &block)
+  def initialize(url, type, config={}, &block)
     super
 
     # Hack the hostname into the url.
@@ -20,6 +20,7 @@ class LogStash::Inputs::File < LogStash::Inputs::Base
     event = LogStash::Event.new({
       "@source" => @url.to_s,
       "@message" => event,
+      "@type" => @type,
       "@tags" => @tags.clone,
     })
     @callback.call(event)
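One detail worth noting in the file input above: each event gets `@tags.clone` rather than `@tags`, presumably so that mutating one event's tags never leaks into the input's list or into other events. A small illustration of that Array#clone behavior (values invented):

    # Why passing @tags.clone into each new event matters.
    input_tags = ["production"]

    event_a_tags = input_tags.clone
    event_b_tags = input_tags.clone

    event_a_tags << "parsed"     # tag only this one event
    event_a_tags  # => ["production", "parsed"]
    event_b_tags  # => ["production"]
    input_tags    # => ["production"]   (untouched)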