diff --git a/bin/logstash b/bin/logstash index 5bdb4bd44..f87f83591 100755 --- a/bin/logstash +++ b/bin/logstash @@ -3,8 +3,8 @@ require "rubygems" require "eventmachine" require "logstash/agent" -require "yaml" require "optparse" +require "yaml" Settings = Struct.new(:config_file, :daemonize) diff --git a/etc/logstash-standalone.yaml b/etc/logstash-standalone.yaml index 1e75025d0..455475d25 100644 --- a/etc/logstash-standalone.yaml +++ b/etc/logstash-standalone.yaml @@ -31,9 +31,8 @@ filters: apache-access: timestamp: "%d/%b/%Y:%H:%M:%S %Z" outputs: -# For this demo, we'll write to websockets... - stdout:/// -- elasticsearch://localhost:9200/logs/all +#- elasticsearch://localhost:9200/logs/all # But we could write to mongodb, too. # - mongodb://localhost/parsedlogs # And also write to an AMQP topic diff --git a/lib/logstash/event.rb b/lib/logstash/event.rb index da6b23371..7552b0278 100644 --- a/lib/logstash/event.rb +++ b/lib/logstash/event.rb @@ -8,11 +8,11 @@ module LogStash; class Event @data = { "@source" => "unknown", "@tags" => [], - "fields" => {}, + "@fields" => {}, }.merge(data) - if !timestamp - timestamp = LogStash::Time.now.utc.to_iso8601 + if !@data.include?("@timestamp") + @data["@timestamp"] = LogStash::Time.now.utc.to_iso8601 end end # def initialize @@ -41,9 +41,9 @@ module LogStash; class Event def tags; @data["@tags"]; end # def tags # field-related access - def [](key); fields[key] end # def [] - def []=(key, value); fields[key] = value end # def []= - def fields; return @data["fields"] end # def fields + def [](key); @data["@fields"][key] end # def [] + def []=(key, value); @data["@fields"][key] = value end # def []= + def fields; return @data["@fields"] end # def fields def to_json; return @data.to_json end # def to_json diff --git a/lib/logstash/filters/base.rb b/lib/logstash/filters/base.rb new file mode 100644 index 000000000..def064b7b --- /dev/null +++ b/lib/logstash/filters/base.rb @@ -0,0 +1,17 @@ +require "logstash/namespace" +require "logstash/logging" + +class LogStash::Filters::Base + def initialize(config = {}) + @logger = LogStash::Logger.new(STDERR) + @config = config + end # def initialize + + def register + throw "#{self.class}#register must be overidden" + end # def register + + def filter(event) + throw "#{self.class}#filter must be overidden" + end # def filter +end # class LogStash::Filters::Base diff --git a/lib/logstash/filters/date.rb b/lib/logstash/filters/date.rb index 36a40c16b..9123b199e 100644 --- a/lib/logstash/filters/date.rb +++ b/lib/logstash/filters/date.rb @@ -1,8 +1,7 @@ -require "logstash/namespace" +require "logstash/filters/base" require "logstash/time" -require "logstash/logging" -class LogStash::Filters::Date +class LogStash::Filters::Date < LogStash::Filters::Base # The 'date' filter will take a value from your event and use it as the # event timestamp. This is useful for parsing logs generated on remote # servers or for importing old logs. @@ -18,9 +17,9 @@ class LogStash::Filters::Date # # The format is whatever is supported by Ruby's DateTime.strptime def initialize(config = {}) - @config = config + super + @tags = Hash.new { |h,k| h[k] = [] } - @logger = LogStash::Logger.new(STDERR) end # def initialize def register diff --git a/lib/logstash/filters/field.rb b/lib/logstash/filters/field.rb index 4ccefc284..caf2fd16d 100644 --- a/lib/logstash/filters/field.rb +++ b/lib/logstash/filters/field.rb @@ -1,7 +1,7 @@ -require "logstash/namespace" +require "logstash/filters/base" require "ostruct" -class LogStash::Filters::Field +class LogStash::Filters::Field < LogStash::Filters::Base class EvalSpace < OpenStruct def get_binding return binding @@ -9,7 +9,7 @@ class LogStash::Filters::Field end def initialize(config = {}) - @config = config + super end # def initialize def register @@ -26,4 +26,4 @@ class LogStash::Filters::Field end event.cancel end -end # class LogStash::Filters::Grok +end # class LogStash::Filters::Field diff --git a/lib/logstash/filters/grok.rb b/lib/logstash/filters/grok.rb index e5e815921..4db8face2 100644 --- a/lib/logstash/filters/grok.rb +++ b/lib/logstash/filters/grok.rb @@ -1,13 +1,12 @@ -require "logstash/namespace" -require "logstash/logging" +require "logstash/filters/base" gem "jls-grok", ">=0.2.3071" require "grok" # rubygem 'jls-grok' -class LogStash::Filters::Grok +class LogStash::Filters::Grok < LogStash::Filters::Base def initialize(config = {}) - @logger = LogStash::Logger.new(STDERR) - @config = config + super + @grokpiles = {} end # def initialize @@ -39,6 +38,7 @@ class LogStash::Filters::Grok end # @grokpiles.include?(tag) end # event.tags.each else + # TODO(2.0): support grok pattern discovery #pattern = @grok.discover(message) #@grok.compile(pattern) #match = @grok.match(message) @@ -68,7 +68,7 @@ class LogStash::Filters::Grok else # Tag this event if we can't parse it. We can use this later to # reparse+reindex logs if we improve the patterns given . - event.tags << "grokparsefailure" + event.tags << "_grokparsefailure" end end # def filter end # class LogStash::Filters::Grok diff --git a/lib/logstash/inputs/amqp.rb b/lib/logstash/inputs/amqp.rb index 84f2e398c..bac07a185 100644 --- a/lib/logstash/inputs/amqp.rb +++ b/lib/logstash/inputs/amqp.rb @@ -1,18 +1,14 @@ -require "logstash/namespace" -require "logstash/event" -require "uri" +require "logstash/inputs/base" require "amqp" # rubygem 'amqp' require "mq" # rubygem 'amqp' require "uuidtools" # rubygem 'uuidtools' -class LogStash::Inputs::Amqp +class LogStash::Inputs::Amqp < LogStash::Inputs::Base TYPES = [ "fanout", "queue", "topic" ] def initialize(url, config={}, &block) - @url = url - @url = URI.parse(url) if url.is_a? String - @config = config - @callback = block + super + @mq = nil # Handle path // @@ -22,7 +18,7 @@ class LogStash::Inputs::Amqp end if !TYPES.include?(@type) - raise "Invalid type '#{@type}' must be one 'fanout' or 'queue'" + raise "Invalid type '#{@type}' must be one of #{TYPES.JOIN(", ")}" end end @@ -48,16 +44,4 @@ class LogStash::Inputs::Amqp header.ack end end # def register - - # TODO(sissel): Refactor this into a general 'input' class - # tag this input - public - def tag(newtag) - @tags << newtag - end - - def receive(event) - event.tags |= @tags # set union - @callback.call(event) - end # def event end # class LogStash::Inputs::Amqp diff --git a/lib/logstash/inputs/base.rb b/lib/logstash/inputs/base.rb new file mode 100644 index 000000000..542d0686c --- /dev/null +++ b/lib/logstash/inputs/base.rb @@ -0,0 +1,28 @@ +require "logstash/namespace" +require "logstash/event" +require "logstash/logging" +require "uri" + +class LogStash::Inputs::Base + def initialize(url, config={}, &block) + @logger = LogStash::Logger.new(STDERR) + @url = url + @url = URI.parse(url) if url.is_a? String + @config = config + @callback = block + @tags = [] + end + + def register + throw "#{self.class}#register must be overidden" + end + + def tag(newtag) + @tags << newtag + end + + def receive(event) + event.tags |= @tags # set union + @callback.call(event) + end +end # class LogStash::Inputs::Base diff --git a/lib/logstash/inputs/file.rb b/lib/logstash/inputs/file.rb index 78cf2e93c..21130a8b8 100644 --- a/lib/logstash/inputs/file.rb +++ b/lib/logstash/inputs/file.rb @@ -1,40 +1,21 @@ -require "logstash/namespace" -require "logstash/event" +require "logstash/inputs/base" require "eventmachine-tail" -require "uri" require "socket" # for Socket.gethostname -class LogStash::Inputs::File +class LogStash::Inputs::File < LogStash::Inputs::Base def initialize(url, config={}, &block) - @logger = Logger.new(STDERR) - - @url = url - @url = URI.parse(url) if url.is_a? String + super # Hack the hostname into the url. # This works since file:// urls don't generally have a host in it. @url.host = Socket.gethostname - - @config = config - @callback = block - @tags = [] end - public def register EventMachine::FileGlobWatchTail.new(@url.path, Reader, interval=60, exclude=[], receiver=self) end - # TODO(sissel): Refactor this into a general 'input' class - # tag this input - public - def tag(newtag) - @logger.debug("Adding tag #{newtag} to #{@url}") - @tags << newtag - end - - public def receive(event) event = LogStash::Event.new({ "@source" => @url.to_s, @@ -44,7 +25,6 @@ class LogStash::Inputs::File @callback.call(event) end # def event - private class Reader < EventMachine::FileTail def initialize(path, receiver) super(path) @@ -53,7 +33,7 @@ class LogStash::Inputs::File end def receive_data(data) - # TODO(sissel): Support multiline log data + # TODO(2.0): Support multiline log data @buffer.extract(data).each do |line| @receiver.receive(line) end diff --git a/lib/logstash/outputs/amqp.rb b/lib/logstash/outputs/amqp.rb index 3ffbb7601..73cb8ca34 100644 --- a/lib/logstash/outputs/amqp.rb +++ b/lib/logstash/outputs/amqp.rb @@ -1,16 +1,11 @@ -require "logstash/namespace" -require "logstash/event" -require "uri" +require "logstash/outputs/base" require "amqp" # rubygem 'amqp' require "mq" # rubygem 'amqp' -class LogStash::Outputs::Amqp +class LogStash::Outputs::Amqp < LogStash::Outputs::Base TYPES = [ "fanout", "queue", "topic" ] def initialize(url, config={}, &block) - @url = url - @url = URI.parse(url) if url.is_a? String - @config = config - @mq = nil + super # Handle path // unused, @type, @name = @url.path.split("/", 3) diff --git a/lib/logstash/outputs/base.rb b/lib/logstash/outputs/base.rb new file mode 100644 index 000000000..0ca5fd2f3 --- /dev/null +++ b/lib/logstash/outputs/base.rb @@ -0,0 +1,19 @@ +require "logstash/namespace" +require "logstash/event" +require "uri" + +class LogStash::Outputs::Base + def initialize(url, config={}, &block) + @url = url + @url = URI.parse(url) if url.is_a? String + @config = config + end + + def register + throw "#{self.class}#register must be overidden" + end # def register + + def receive(event) + throw "#{self.class}#receive must be overidden" + end +end # class LogStash::Outputs::Base diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb index 19cd871cb..c2fa972c9 100644 --- a/lib/logstash/outputs/elasticsearch.rb +++ b/lib/logstash/outputs/elasticsearch.rb @@ -1,13 +1,9 @@ -require "logstash/namespace" -require "logstash/event" -require "uri" +require "logstash/outputs/base" require "em-http-request" -class LogStash::Outputs::Elasticsearch +class LogStash::Outputs::Elasticsearch < LogStash::Outputs::Base def initialize(url, config={}, &block) - @url = url - @url = URI.parse(url) if url.is_a? String - @config = config + super end def register @@ -19,7 +15,6 @@ class LogStash::Outputs::Elasticsearch end # def register def receive(event) - #http = EventMachine::HttpRequest.new(@httpurl.to_s).post :body => event.to_json req = @http.post :body => event.to_json req.errback do $stderr.puts "Request to index to #{@httpurl.to_s} failed. Event was #{event.to_s}" diff --git a/lib/logstash/outputs/mongodb.rb b/lib/logstash/outputs/mongodb.rb index 441752549..290ff10cf 100644 --- a/lib/logstash/outputs/mongodb.rb +++ b/lib/logstash/outputs/mongodb.rb @@ -1,13 +1,9 @@ -require "logstash/namespace" -require "logstash/event" -require "uri" +require "logstash/outputs/base" require "em-mongo" -class LogStash::Outputs::Mongodb +class LogStash::Outputs::Mongodb < LogStash::Outputs::Base def initialize(url, config={}, &block) - @url = url - @url = URI.parse(url) if url.is_a? String - @config = config + super end def register @@ -18,7 +14,6 @@ class LogStash::Outputs::Mongodb end # def register def receive(event) - puts "Got: #{event}" @mongodb.collection("events").insert(event.to_hash) end # def event -end # class LogStash::Outputs::Websocket +end # class LogStash::Outputs::Mongodb diff --git a/lib/logstash/outputs/stdout.rb b/lib/logstash/outputs/stdout.rb index 689c4f8a1..07e797804 100644 --- a/lib/logstash/outputs/stdout.rb +++ b/lib/logstash/outputs/stdout.rb @@ -1,12 +1,8 @@ -require "logstash/namespace" -require "logstash/event" -require "uri" +require "logstash/outputs/base" -class LogStash::Outputs::Stdout +class LogStash::Outputs::Stdout < LogStash::Outputs::Base def initialize(url, config={}, &block) - @url = url - @url = URI.parse(url) if url.is_a? String - @config = config + super end def register diff --git a/lib/logstash/outputs/websocket.rb b/lib/logstash/outputs/websocket.rb index da0047877..54445391b 100644 --- a/lib/logstash/outputs/websocket.rb +++ b/lib/logstash/outputs/websocket.rb @@ -1,17 +1,12 @@ -require "logstash/namespace" -require "logstash/event" -require "uri" +require "logstash/outputs/base" require "em-websocket" # rubygem 'em-websocket' -class LogStash::Outputs::Websocket +class LogStash::Outputs::Websocket < LogStash::Outputs::Base def initialize(url, config={}, &block) - @url = url - @url = URI.parse(url) if url.is_a? String - @config = config + super end def register - puts "register" @channel = EventMachine::Channel.new host = (@url.host or "0.0.0.0") port = (@url.port or 3000) diff --git a/logstash.gemspec b/logstash.gemspec index d168bf34d..04ccfdc6b 100644 --- a/logstash.gemspec +++ b/logstash.gemspec @@ -13,6 +13,8 @@ Gem::Specification.new do |spec| spec.description = "None yet" spec.add_dependency("eventmachine-tail") spec.add_dependency("jls-grok", ">= 0.2.3071") + spec.add_dependency("awesome_print") + spec.add_dependency("json") # TODO: In the future, make these optional # for websocket://