create base classes for inputs/outputs/filters

This commit is contained in:
Pete Fritchman 2010-10-31 04:46:12 +00:00
parent 2aba74e2f4
commit f7ad0a0a9a
17 changed files with 113 additions and 109 deletions

View file

@ -3,8 +3,8 @@
require "rubygems" require "rubygems"
require "eventmachine" require "eventmachine"
require "logstash/agent" require "logstash/agent"
require "yaml"
require "optparse" require "optparse"
require "yaml"
Settings = Struct.new(:config_file, :daemonize) Settings = Struct.new(:config_file, :daemonize)

View file

@ -31,9 +31,8 @@ filters:
apache-access: apache-access:
timestamp: "%d/%b/%Y:%H:%M:%S %Z" timestamp: "%d/%b/%Y:%H:%M:%S %Z"
outputs: outputs:
# For this demo, we'll write to websockets...
- stdout:/// - stdout:///
- elasticsearch://localhost:9200/logs/all #- elasticsearch://localhost:9200/logs/all
# But we could write to mongodb, too. # But we could write to mongodb, too.
# - mongodb://localhost/parsedlogs # - mongodb://localhost/parsedlogs
# And also write to an AMQP topic # And also write to an AMQP topic

View file

@ -8,11 +8,11 @@ module LogStash; class Event
@data = { @data = {
"@source" => "unknown", "@source" => "unknown",
"@tags" => [], "@tags" => [],
"fields" => {}, "@fields" => {},
}.merge(data) }.merge(data)
if !timestamp if !@data.include?("@timestamp")
timestamp = LogStash::Time.now.utc.to_iso8601 @data["@timestamp"] = LogStash::Time.now.utc.to_iso8601
end end
end # def initialize end # def initialize
@ -41,9 +41,9 @@ module LogStash; class Event
def tags; @data["@tags"]; end # def tags def tags; @data["@tags"]; end # def tags
# field-related access # field-related access
def [](key); fields[key] end # def [] def [](key); @data["@fields"][key] end # def []
def []=(key, value); fields[key] = value end # def []= def []=(key, value); @data["@fields"][key] = value end # def []=
def fields; return @data["fields"] end # def fields def fields; return @data["@fields"] end # def fields
def to_json; return @data.to_json end # def to_json def to_json; return @data.to_json end # def to_json

View file

@ -0,0 +1,17 @@
require "logstash/namespace"
require "logstash/logging"
class LogStash::Filters::Base
def initialize(config = {})
@logger = LogStash::Logger.new(STDERR)
@config = config
end # def initialize
def register
throw "#{self.class}#register must be overidden"
end # def register
def filter(event)
throw "#{self.class}#filter must be overidden"
end # def filter
end # class LogStash::Filters::Base

View file

@ -1,8 +1,7 @@
require "logstash/namespace" require "logstash/filters/base"
require "logstash/time" require "logstash/time"
require "logstash/logging"
class LogStash::Filters::Date class LogStash::Filters::Date < LogStash::Filters::Base
# The 'date' filter will take a value from your event and use it as the # The 'date' filter will take a value from your event and use it as the
# event timestamp. This is useful for parsing logs generated on remote # event timestamp. This is useful for parsing logs generated on remote
# servers or for importing old logs. # servers or for importing old logs.
@ -18,9 +17,9 @@ class LogStash::Filters::Date
# #
# The format is whatever is supported by Ruby's DateTime.strptime # The format is whatever is supported by Ruby's DateTime.strptime
def initialize(config = {}) def initialize(config = {})
@config = config super
@tags = Hash.new { |h,k| h[k] = [] } @tags = Hash.new { |h,k| h[k] = [] }
@logger = LogStash::Logger.new(STDERR)
end # def initialize end # def initialize
def register def register

View file

@ -1,7 +1,7 @@
require "logstash/namespace" require "logstash/filters/base"
require "ostruct" require "ostruct"
class LogStash::Filters::Field class LogStash::Filters::Field < LogStash::Filters::Base
class EvalSpace < OpenStruct class EvalSpace < OpenStruct
def get_binding def get_binding
return binding return binding
@ -9,7 +9,7 @@ class LogStash::Filters::Field
end end
def initialize(config = {}) def initialize(config = {})
@config = config super
end # def initialize end # def initialize
def register def register
@ -26,4 +26,4 @@ class LogStash::Filters::Field
end end
event.cancel event.cancel
end end
end # class LogStash::Filters::Grok end # class LogStash::Filters::Field

View file

@ -1,13 +1,12 @@
require "logstash/namespace" require "logstash/filters/base"
require "logstash/logging"
gem "jls-grok", ">=0.2.3071" gem "jls-grok", ">=0.2.3071"
require "grok" # rubygem 'jls-grok' require "grok" # rubygem 'jls-grok'
class LogStash::Filters::Grok class LogStash::Filters::Grok < LogStash::Filters::Base
def initialize(config = {}) def initialize(config = {})
@logger = LogStash::Logger.new(STDERR) super
@config = config
@grokpiles = {} @grokpiles = {}
end # def initialize end # def initialize
@ -39,6 +38,7 @@ class LogStash::Filters::Grok
end # @grokpiles.include?(tag) end # @grokpiles.include?(tag)
end # event.tags.each end # event.tags.each
else else
# TODO(2.0): support grok pattern discovery
#pattern = @grok.discover(message) #pattern = @grok.discover(message)
#@grok.compile(pattern) #@grok.compile(pattern)
#match = @grok.match(message) #match = @grok.match(message)
@ -68,7 +68,7 @@ class LogStash::Filters::Grok
else else
# Tag this event if we can't parse it. We can use this later to # Tag this event if we can't parse it. We can use this later to
# reparse+reindex logs if we improve the patterns given . # reparse+reindex logs if we improve the patterns given .
event.tags << "grokparsefailure" event.tags << "_grokparsefailure"
end end
end # def filter end # def filter
end # class LogStash::Filters::Grok end # class LogStash::Filters::Grok

View file

@ -1,18 +1,14 @@
require "logstash/namespace" require "logstash/inputs/base"
require "logstash/event"
require "uri"
require "amqp" # rubygem 'amqp' require "amqp" # rubygem 'amqp'
require "mq" # rubygem 'amqp' require "mq" # rubygem 'amqp'
require "uuidtools" # rubygem 'uuidtools' require "uuidtools" # rubygem 'uuidtools'
class LogStash::Inputs::Amqp class LogStash::Inputs::Amqp < LogStash::Inputs::Base
TYPES = [ "fanout", "queue", "topic" ] TYPES = [ "fanout", "queue", "topic" ]
def initialize(url, config={}, &block) def initialize(url, config={}, &block)
@url = url super
@url = URI.parse(url) if url.is_a? String
@config = config
@callback = block
@mq = nil @mq = nil
# Handle path /<type>/<name> # Handle path /<type>/<name>
@ -22,7 +18,7 @@ class LogStash::Inputs::Amqp
end end
if !TYPES.include?(@type) if !TYPES.include?(@type)
raise "Invalid type '#{@type}' must be one 'fanout' or 'queue'" raise "Invalid type '#{@type}' must be one of #{TYPES.JOIN(", ")}"
end end
end end
@ -48,16 +44,4 @@ class LogStash::Inputs::Amqp
header.ack header.ack
end end
end # def register end # def register
# TODO(sissel): Refactor this into a general 'input' class
# tag this input
public
def tag(newtag)
@tags << newtag
end
def receive(event)
event.tags |= @tags # set union
@callback.call(event)
end # def event
end # class LogStash::Inputs::Amqp end # class LogStash::Inputs::Amqp

View file

@ -0,0 +1,28 @@
require "logstash/namespace"
require "logstash/event"
require "logstash/logging"
require "uri"
class LogStash::Inputs::Base
def initialize(url, config={}, &block)
@logger = LogStash::Logger.new(STDERR)
@url = url
@url = URI.parse(url) if url.is_a? String
@config = config
@callback = block
@tags = []
end
def register
throw "#{self.class}#register must be overidden"
end
def tag(newtag)
@tags << newtag
end
def receive(event)
event.tags |= @tags # set union
@callback.call(event)
end
end # class LogStash::Inputs::Base

View file

@ -1,40 +1,21 @@
require "logstash/namespace" require "logstash/inputs/base"
require "logstash/event"
require "eventmachine-tail" require "eventmachine-tail"
require "uri"
require "socket" # for Socket.gethostname require "socket" # for Socket.gethostname
class LogStash::Inputs::File class LogStash::Inputs::File < LogStash::Inputs::Base
def initialize(url, config={}, &block) def initialize(url, config={}, &block)
@logger = Logger.new(STDERR) super
@url = url
@url = URI.parse(url) if url.is_a? String
# Hack the hostname into the url. # Hack the hostname into the url.
# This works since file:// urls don't generally have a host in it. # This works since file:// urls don't generally have a host in it.
@url.host = Socket.gethostname @url.host = Socket.gethostname
@config = config
@callback = block
@tags = []
end end
public
def register def register
EventMachine::FileGlobWatchTail.new(@url.path, Reader, interval=60, EventMachine::FileGlobWatchTail.new(@url.path, Reader, interval=60,
exclude=[], receiver=self) exclude=[], receiver=self)
end end
# TODO(sissel): Refactor this into a general 'input' class
# tag this input
public
def tag(newtag)
@logger.debug("Adding tag #{newtag} to #{@url}")
@tags << newtag
end
public
def receive(event) def receive(event)
event = LogStash::Event.new({ event = LogStash::Event.new({
"@source" => @url.to_s, "@source" => @url.to_s,
@ -44,7 +25,6 @@ class LogStash::Inputs::File
@callback.call(event) @callback.call(event)
end # def event end # def event
private
class Reader < EventMachine::FileTail class Reader < EventMachine::FileTail
def initialize(path, receiver) def initialize(path, receiver)
super(path) super(path)
@ -53,7 +33,7 @@ class LogStash::Inputs::File
end end
def receive_data(data) def receive_data(data)
# TODO(sissel): Support multiline log data # TODO(2.0): Support multiline log data
@buffer.extract(data).each do |line| @buffer.extract(data).each do |line|
@receiver.receive(line) @receiver.receive(line)
end end

View file

@ -1,16 +1,11 @@
require "logstash/namespace" require "logstash/outputs/base"
require "logstash/event"
require "uri"
require "amqp" # rubygem 'amqp' require "amqp" # rubygem 'amqp'
require "mq" # rubygem 'amqp' require "mq" # rubygem 'amqp'
class LogStash::Outputs::Amqp class LogStash::Outputs::Amqp < LogStash::Outputs::Base
TYPES = [ "fanout", "queue", "topic" ] TYPES = [ "fanout", "queue", "topic" ]
def initialize(url, config={}, &block) def initialize(url, config={}, &block)
@url = url super
@url = URI.parse(url) if url.is_a? String
@config = config
@mq = nil
# Handle path /<type>/<name> # Handle path /<type>/<name>
unused, @type, @name = @url.path.split("/", 3) unused, @type, @name = @url.path.split("/", 3)

View file

@ -0,0 +1,19 @@
require "logstash/namespace"
require "logstash/event"
require "uri"
class LogStash::Outputs::Base
def initialize(url, config={}, &block)
@url = url
@url = URI.parse(url) if url.is_a? String
@config = config
end
def register
throw "#{self.class}#register must be overidden"
end # def register
def receive(event)
throw "#{self.class}#receive must be overidden"
end
end # class LogStash::Outputs::Base

View file

@ -1,13 +1,9 @@
require "logstash/namespace" require "logstash/outputs/base"
require "logstash/event"
require "uri"
require "em-http-request" require "em-http-request"
class LogStash::Outputs::Elasticsearch class LogStash::Outputs::Elasticsearch < LogStash::Outputs::Base
def initialize(url, config={}, &block) def initialize(url, config={}, &block)
@url = url super
@url = URI.parse(url) if url.is_a? String
@config = config
end end
def register def register
@ -19,7 +15,6 @@ class LogStash::Outputs::Elasticsearch
end # def register end # def register
def receive(event) def receive(event)
#http = EventMachine::HttpRequest.new(@httpurl.to_s).post :body => event.to_json
req = @http.post :body => event.to_json req = @http.post :body => event.to_json
req.errback do req.errback do
$stderr.puts "Request to index to #{@httpurl.to_s} failed. Event was #{event.to_s}" $stderr.puts "Request to index to #{@httpurl.to_s} failed. Event was #{event.to_s}"

View file

@ -1,13 +1,9 @@
require "logstash/namespace" require "logstash/outputs/base"
require "logstash/event"
require "uri"
require "em-mongo" require "em-mongo"
class LogStash::Outputs::Mongodb class LogStash::Outputs::Mongodb < LogStash::Outputs::Base
def initialize(url, config={}, &block) def initialize(url, config={}, &block)
@url = url super
@url = URI.parse(url) if url.is_a? String
@config = config
end end
def register def register
@ -18,7 +14,6 @@ class LogStash::Outputs::Mongodb
end # def register end # def register
def receive(event) def receive(event)
puts "Got: #{event}"
@mongodb.collection("events").insert(event.to_hash) @mongodb.collection("events").insert(event.to_hash)
end # def event end # def event
end # class LogStash::Outputs::Websocket end # class LogStash::Outputs::Mongodb

View file

@ -1,12 +1,8 @@
require "logstash/namespace" require "logstash/outputs/base"
require "logstash/event"
require "uri"
class LogStash::Outputs::Stdout class LogStash::Outputs::Stdout < LogStash::Outputs::Base
def initialize(url, config={}, &block) def initialize(url, config={}, &block)
@url = url super
@url = URI.parse(url) if url.is_a? String
@config = config
end end
def register def register

View file

@ -1,17 +1,12 @@
require "logstash/namespace" require "logstash/outputs/base"
require "logstash/event"
require "uri"
require "em-websocket" # rubygem 'em-websocket' require "em-websocket" # rubygem 'em-websocket'
class LogStash::Outputs::Websocket class LogStash::Outputs::Websocket < LogStash::Outputs::Base
def initialize(url, config={}, &block) def initialize(url, config={}, &block)
@url = url super
@url = URI.parse(url) if url.is_a? String
@config = config
end end
def register def register
puts "register"
@channel = EventMachine::Channel.new @channel = EventMachine::Channel.new
host = (@url.host or "0.0.0.0") host = (@url.host or "0.0.0.0")
port = (@url.port or 3000) port = (@url.port or 3000)

View file

@ -13,6 +13,8 @@ Gem::Specification.new do |spec|
spec.description = "None yet" spec.description = "None yet"
spec.add_dependency("eventmachine-tail") spec.add_dependency("eventmachine-tail")
spec.add_dependency("jls-grok", ">= 0.2.3071") spec.add_dependency("jls-grok", ">= 0.2.3071")
spec.add_dependency("awesome_print")
spec.add_dependency("json")
# TODO: In the future, make these optional # TODO: In the future, make these optional
# for websocket:// # for websocket://