mirror of
https://github.com/elastic/logstash.git
synced 2025-04-23 22:27:21 -04:00
- Move everything into the logstash subdir
This commit is contained in:
parent
ae50ca1d82
commit
d5d532cbc6
16 changed files with 176 additions and 50 deletions
|
@ -1,37 +0,0 @@
|
|||
require "logstash/namespace"
|
||||
require "grok" # rubygem 'grok'
|
||||
|
||||
class LogStash::Filters::Grok
|
||||
def initialize(config = {})
|
||||
@config = config
|
||||
@grok = Grok.new
|
||||
end # def initialize
|
||||
|
||||
def register
|
||||
@grok.add_patterns_from_file("patterns/grok-patterns")
|
||||
end
|
||||
|
||||
def filter(event)
|
||||
# parse it with grok
|
||||
message = event.message
|
||||
pattern = @grok.discover(message)
|
||||
@grok.compile(pattern)
|
||||
match = @grok.match(message)
|
||||
match.each_capture do |key, value|
|
||||
if key.include?(":")
|
||||
key = key.split(":")[1]
|
||||
end
|
||||
|
||||
if event[key].is_a? String
|
||||
event[key] = [event[key]]
|
||||
elsif event[key] == nil
|
||||
event[key] = []
|
||||
end
|
||||
|
||||
event[key] << value
|
||||
end
|
||||
|
||||
# TODO(sissel): Flatten single-entry arrays into a single value?
|
||||
return event
|
||||
end
|
||||
end # class LogStash::Filters::Grok
|
|
@ -25,25 +25,51 @@ class LogStash::Agent
|
|||
# stomp, etc).
|
||||
protected
|
||||
def register
|
||||
# TODO(sissel): warn when no inputs and no outputs are defined.
|
||||
# TODO(sissel): Refactor this madness into a config lib
|
||||
|
||||
# Register input and output stuff
|
||||
if @config.include?("input")
|
||||
@config["input"].each do |url|
|
||||
input = LogStash::Inputs.from_url(url) { |event| receive(event) }
|
||||
input.register
|
||||
@inputs << input
|
||||
if @config.include?("inputs")
|
||||
inputs = @config["inputs"]
|
||||
inputs.each do |value|
|
||||
# If 'url' is an array, then inputs is a hash and the key is a tag
|
||||
if inputs.is_a?(Hash)
|
||||
tag, urls = value
|
||||
else
|
||||
tag = nil
|
||||
urls = value
|
||||
end
|
||||
|
||||
# url could be a string or an array.
|
||||
urls = [urls] if !urls.is_a?(Array)
|
||||
|
||||
urls.each do |url|
|
||||
input = LogStash::Inputs.from_url(url) { |event| receive(event) }
|
||||
input.tag(tag) if tag
|
||||
input.register
|
||||
@inputs << input
|
||||
end
|
||||
end # each input
|
||||
end
|
||||
|
||||
if @config.include?("filter")
|
||||
@config["filter"].each do |name|
|
||||
filter = LogStash::Filters.from_name(name)
|
||||
if @config.include?("filters")
|
||||
filters = @config["filters"]
|
||||
filters.each do |value|
|
||||
# If value is an array, then "filters" is a hash.
|
||||
if filters.is_a?(Hash)
|
||||
name, filterconfig = value
|
||||
else
|
||||
name = value
|
||||
filterconfig = {}
|
||||
end
|
||||
filter = LogStash::Filters.from_name(name, filterconfig)
|
||||
filter.register
|
||||
@filters << filter
|
||||
end # each filter
|
||||
end
|
||||
|
||||
if @config.include?("output")
|
||||
@config["output"].each do |url|
|
||||
if @config.include?("outputs")
|
||||
@config["outputs"].each do |url|
|
||||
output = LogStash::Outputs.from_url(url)
|
||||
output.register
|
||||
@outputs << output
|
||||
|
@ -63,6 +89,9 @@ class LogStash::Agent
|
|||
@filters.each do |f|
|
||||
# TODO(sissel): Add ability for a filter to cancel/drop a message
|
||||
f.filter(event)
|
||||
if event.cancelled?
|
||||
break
|
||||
end
|
||||
end
|
||||
end # def filter
|
||||
|
||||
|
@ -77,6 +106,9 @@ class LogStash::Agent
|
|||
# Process a message
|
||||
def receive(event)
|
||||
filter(event)
|
||||
output(event)
|
||||
|
||||
if !event.cancelled?
|
||||
output(event)
|
||||
end
|
||||
end # def input
|
||||
end # class LogStash::Components::Agent
|
|
@ -4,6 +4,7 @@ require "logstash/time"
|
|||
# General event type. Will expand this in the future.
|
||||
module LogStash; class Event
|
||||
def initialize(data)
|
||||
@cancelled = false
|
||||
@data = data
|
||||
if !@data.include?(:received_timestamp)
|
||||
@data[:received_timestamp] = LogStash::Time.now.utc.to_iso8601
|
||||
|
@ -18,6 +19,14 @@ module LogStash; class Event
|
|||
return @data.to_json
|
||||
end
|
||||
|
||||
def cancel
|
||||
@cancelled = true
|
||||
end
|
||||
|
||||
def cancelled?
|
||||
return @cancelled
|
||||
end
|
||||
|
||||
def to_s
|
||||
#require "ap" rescue nil
|
||||
#if @data.respond_to?(:awesome_inspect)
|
||||
|
@ -51,4 +60,8 @@ module LogStash; class Event
|
|||
def to_hash
|
||||
return @data
|
||||
end # def to_hash
|
||||
|
||||
def include?(key)
|
||||
return @data.include?(key)
|
||||
end
|
||||
end; end # class LogStash::Event
|
|
@ -2,7 +2,7 @@
|
|||
require "logstash/namespace"
|
||||
|
||||
module LogStash::Filters
|
||||
def self.from_name(name)
|
||||
def self.from_name(name, *args)
|
||||
# TODO(sissel): Add error handling
|
||||
# TODO(sissel): Allow plugin paths
|
||||
klass = name.capitalize
|
||||
|
@ -12,6 +12,6 @@ module LogStash::Filters
|
|||
|
||||
# Get the class name from the Filters namespace and create a new instance.
|
||||
# for name == 'foo' this will call LogStash::Filters::Foo.new
|
||||
LogStash::Filters.const_get(klass).new
|
||||
LogStash::Filters.const_get(klass).new(*args)
|
||||
end # def from_url
|
||||
end # module LogStash::Filters
|
30
lib/logstash/filters/field.rb
Normal file
30
lib/logstash/filters/field.rb
Normal file
|
@ -0,0 +1,30 @@
|
|||
require "logstash/namespace"
|
||||
require "ostruct"
|
||||
|
||||
class LogStash::Filters::Field
|
||||
class EvalSpace < OpenStruct
|
||||
def get_binding
|
||||
return binding
|
||||
end
|
||||
end
|
||||
|
||||
def initialize(config = {})
|
||||
@config = config
|
||||
end # def initialize
|
||||
|
||||
def register
|
||||
# nothing to do
|
||||
end # def register
|
||||
|
||||
def filter(event)
|
||||
data = EvalSpace.new(event.to_hash)
|
||||
|
||||
@config.each do |condition|
|
||||
if data.instance_eval(condition)
|
||||
return event
|
||||
end
|
||||
end
|
||||
event.cancel
|
||||
return event
|
||||
end
|
||||
end # class LogStash::Filters::Grok
|
65
lib/logstash/filters/grok.rb
Normal file
65
lib/logstash/filters/grok.rb
Normal file
|
@ -0,0 +1,65 @@
|
|||
require "logstash/namespace"
|
||||
|
||||
gem "jls-grok", ">=0.2.3071"
|
||||
require "grok" # rubygem 'jls-grok'
|
||||
|
||||
class LogStash::Filters::Grok
|
||||
def initialize(config = {})
|
||||
@config = config
|
||||
@grokpiles = {}
|
||||
end # def initialize
|
||||
|
||||
def register
|
||||
# TODO(sissel): Make patterns files come from the config
|
||||
@config.each do |tag, tagconfig|
|
||||
pile = Grok::Pile.new
|
||||
pile.add_patterns_from_file("patterns/grok-patterns")
|
||||
pile.add_patterns_from_file("patterns/linux-syslog")
|
||||
tagconfig["patterns"].each do |pattern|
|
||||
pile.compile(pattern)
|
||||
end
|
||||
@grokpiles[tag] = pile
|
||||
end # @config.each
|
||||
end # def register
|
||||
|
||||
def filter(event)
|
||||
# parse it with grok
|
||||
message = event.message
|
||||
match = false
|
||||
|
||||
if event.include?("tags")
|
||||
event["tags"].each do |tag|
|
||||
if @grokpiles.include?(tag)
|
||||
pile = @grokpiles[tag]
|
||||
grok, match = pile.match(message)
|
||||
break if match
|
||||
end # @grokpiles.include?(tag)
|
||||
end # event["tags"].each
|
||||
else
|
||||
#pattern = @grok.discover(message)
|
||||
#@grok.compile(pattern)
|
||||
#match = @grok.match(message)
|
||||
end
|
||||
|
||||
if match
|
||||
match.each_capture do |key, value|
|
||||
if key.include?(":")
|
||||
key = key.split(":")[1]
|
||||
end
|
||||
|
||||
if event[key].is_a?(String)
|
||||
event[key] = [event[key]]
|
||||
elsif event[key] == nil
|
||||
event[key] = []
|
||||
end
|
||||
|
||||
event[key] << value
|
||||
end
|
||||
else
|
||||
event["PARSEFAILURE"] = 1
|
||||
end
|
||||
|
||||
# TODO(sissel): Flatten single-entry arrays into a single value?
|
||||
return event
|
||||
end
|
||||
end # class LogStash::Filters::Grok
|
|
@ -48,7 +48,19 @@ class LogStash::Inputs::Amqp
|
|||
end
|
||||
end # def register
|
||||
|
||||
# TODO(sissel): Refactor this into a general 'input' class
|
||||
# tag this input
|
||||
public
|
||||
def tag(newtag)
|
||||
@tags << newtag
|
||||
end
|
||||
|
||||
def receive(event)
|
||||
if !event.include?("tags")
|
||||
event["tags"] = @tags
|
||||
else
|
||||
event["tags"] << @tags
|
||||
end
|
||||
@callback.call(event)
|
||||
end # def event
|
||||
end # class LogStash::Inputs::Amqp
|
|
@ -9,17 +9,28 @@ class LogStash::Inputs::File
|
|||
@url = URI.parse(url) if url.is_a? String
|
||||
@config = config
|
||||
@callback = block
|
||||
@tags = []
|
||||
end
|
||||
|
||||
public
|
||||
def register
|
||||
EventMachine::FileGlobWatchTail.new(@url.path, Reader, interval=60,
|
||||
exclude=[], receiver=self)
|
||||
end
|
||||
|
||||
# TODO(sissel): Refactor this into a general 'input' class
|
||||
# tag this input
|
||||
public
|
||||
def tag(newtag)
|
||||
@tags << newtag
|
||||
end
|
||||
|
||||
public
|
||||
def receive(event)
|
||||
event = LogStash::Event.new({
|
||||
:source => @url.to_s,
|
||||
:message => event,
|
||||
:tags => @tags,
|
||||
})
|
||||
@callback.call(event)
|
||||
end # def event
|
Loading…
Add table
Add a link
Reference in a new issue