diff --git a/lib/logstash/inputs/base.rb b/lib/logstash/inputs/base.rb index 194fe8aeb..ee6928483 100644 --- a/lib/logstash/inputs/base.rb +++ b/lib/logstash/inputs/base.rb @@ -97,4 +97,19 @@ class LogStash::Inputs::Base < LogStash::Plugin def to_event(raw, source) raise LogStash::ThisMethodWasRemoved("LogStash::Inputs::Base#to_event - you should use codecs now instead of to_event. Not sure what this means? Get help on logstash-users@googlegroups.com!") end # def to_event + + protected + def decorate(event) + # Only set 'type' if not already set. This is backwards-compatible behavior + event["type"] = @type if @type && !event.include?("type") + + if @tags.any? + event["tags"] ||= [] + event["tags"] += @tags + end + + @add_field.each do |field, value| + event[field] = value + end + end end # class LogStash::Inputs::Base diff --git a/lib/logstash/inputs/elasticsearch.rb b/lib/logstash/inputs/elasticsearch.rb index fdf7c7da7..ef128edec 100644 --- a/lib/logstash/inputs/elasticsearch.rb +++ b/lib/logstash/inputs/elasticsearch.rb @@ -77,6 +77,7 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base # Hack to make codecs work @codec.decode(event.to_json) do |event| + decorate(event) output_queue << event end end diff --git a/lib/logstash/inputs/exec.rb b/lib/logstash/inputs/exec.rb index 800e2b0d8..ae26c0594 100644 --- a/lib/logstash/inputs/exec.rb +++ b/lib/logstash/inputs/exec.rb @@ -40,6 +40,7 @@ class LogStash::Inputs::Exec < LogStash::Inputs::Base out = IO.popen(@command) # out.read will block until the process finishes. @codec.decode(out.read) do |event| + decorate(event) event["source"] = "exec://#{Socket.gethostname}" event["command"] = @command queue << event diff --git a/lib/logstash/inputs/file.rb b/lib/logstash/inputs/file.rb index cdfd69974..3909a1c54 100644 --- a/lib/logstash/inputs/file.rb +++ b/lib/logstash/inputs/file.rb @@ -130,8 +130,8 @@ class LogStash::Inputs::File < LogStash::Inputs::Base source = "file://#{hostname}/#{path.gsub("\\","/")}" @logger.debug? && @logger.debug("Received line", :path => path, :line => line) @codec.decode(line) do |event| + decorate(event) event["source"] = source - event["type"] = @type if @type queue << event end end diff --git a/lib/logstash/inputs/ganglia.rb b/lib/logstash/inputs/ganglia.rb index dc3e84a8a..95573f0c3 100644 --- a/lib/logstash/inputs/ganglia.rb +++ b/lib/logstash/inputs/ganglia.rb @@ -117,9 +117,7 @@ class LogStash::Inputs::Ganglia < LogStash::Inputs::Base return nil unless data event=LogStash::Event.new - #event['@timestamp'] = Time.now.to_i event["source"] = source - event["type"] = @type data["program"] = "ganglia" event["log_host"] = data["hostname"] diff --git a/lib/logstash/inputs/gemfire.rb b/lib/logstash/inputs/gemfire.rb index e08c9cccf..2ffff6b17 100644 --- a/lib/logstash/inputs/gemfire.rb +++ b/lib/logstash/inputs/gemfire.rb @@ -143,6 +143,7 @@ class LogStash::Inputs::Gemfire < LogStash::Inputs::Threadable def process_event(event, event_name, source) message = deserialize_message(event) @codec.decode(message) do |event| + decorate(event) event["source"] = source @logstash_queue << event end diff --git a/lib/logstash/inputs/generator.rb b/lib/logstash/inputs/generator.rb index 0fb4675eb..a49c38858 100644 --- a/lib/logstash/inputs/generator.rb +++ b/lib/logstash/inputs/generator.rb @@ -67,6 +67,7 @@ class LogStash::Inputs::Generator < LogStash::Inputs::Threadable while !finished? && (@count <= 0 || number < @count) @lines.each do |line| @codec.decode(line.clone) do |event| + decorate(event) event["source"] = source event["sequence"] = number queue << event diff --git a/lib/logstash/inputs/heroku.rb b/lib/logstash/inputs/heroku.rb index 6353f0166..b6e661500 100644 --- a/lib/logstash/inputs/heroku.rb +++ b/lib/logstash/inputs/heroku.rb @@ -41,6 +41,7 @@ class LogStash::Inputs::Heroku < LogStash::Inputs::Base # this to 0 makes it fetch *all* events, not what I want. client.read_logs(@app, ["tail=1", "num=1"]) do |chunk| @codec.decode(chunk).each do |event| + decorate(event) event["app"] = @app queue << event end diff --git a/lib/logstash/inputs/irc.rb b/lib/logstash/inputs/irc.rb index 6b7e96605..b8ee753ad 100644 --- a/lib/logstash/inputs/irc.rb +++ b/lib/logstash/inputs/irc.rb @@ -76,6 +76,7 @@ class LogStash::Inputs::Irc < LogStash::Inputs::Base msg = @irc_queue.pop if msg.user @codec.decode(msg.message) do |event| + decorate(event) event["channel"] = msg.channel.to_s event["nick"] = msg.user.nick event["server"] = "#{@host}:#{@port}" diff --git a/lib/logstash/inputs/lumberjack.rb b/lib/logstash/inputs/lumberjack.rb index 7e675ff5c..ca1a8f636 100644 --- a/lib/logstash/inputs/lumberjack.rb +++ b/lib/logstash/inputs/lumberjack.rb @@ -43,6 +43,7 @@ class LogStash::Inputs::Lumberjack < LogStash::Inputs::Base def run(output_queue) @lumberjack.run do |l| @codec.decode(l.delete("line")) do |event| + decorate(event) l.each { |k,v| event[k] = v } output_queue << event end diff --git a/lib/logstash/inputs/pipe.rb b/lib/logstash/inputs/pipe.rb index 9b6dcad15..ae05e037a 100644 --- a/lib/logstash/inputs/pipe.rb +++ b/lib/logstash/inputs/pipe.rb @@ -38,6 +38,7 @@ class LogStash::Inputs::Pipe < LogStash::Inputs::Base source = "pipe://#{hostname}/#{command}" @logger.debug? && @logger.debug("Received line", :command => command, :line => line) @codec.decode(line) do |event| + decorate(event) event["source"] = source queue << event end diff --git a/lib/logstash/inputs/rabbitmq/bunny.rb b/lib/logstash/inputs/rabbitmq/bunny.rb index 208427d45..e6f879c69 100644 --- a/lib/logstash/inputs/rabbitmq/bunny.rb +++ b/lib/logstash/inputs/rabbitmq/bunny.rb @@ -106,6 +106,7 @@ class LogStash::Inputs::RabbitMQ @consumer = Bunny::Consumer.new(@ch, @q) @q.subscribe(:manual_ack => @ack, :block => true) do |delivery_info, properties, data| @codec.decode(data) do |event| + decorate(event) event["source"] = @connection_url @output_queue << event end diff --git a/lib/logstash/inputs/rabbitmq/hot_bunnies.rb b/lib/logstash/inputs/rabbitmq/hot_bunnies.rb index e6174c499..ace39816f 100644 --- a/lib/logstash/inputs/rabbitmq/hot_bunnies.rb +++ b/lib/logstash/inputs/rabbitmq/hot_bunnies.rb @@ -104,6 +104,7 @@ class LogStash::Inputs::RabbitMQ # in an @ivar even though we use a blocking version of HB::Queue#subscribe @consumer = @q.build_consumer(:block => true) do |metadata, data| @codec.decode(data) do |event| + decorate(event) event["source"] = @connection_url @output_queue << event if event @ch.ack(metadata.delivery_tag) if @ack diff --git a/lib/logstash/inputs/redis.rb b/lib/logstash/inputs/redis.rb index 6e199d23e..773049e69 100644 --- a/lib/logstash/inputs/redis.rb +++ b/lib/logstash/inputs/redis.rb @@ -127,6 +127,7 @@ EOF def queue_event(msg, output_queue) begin @codec.decode(msg) do |event| + decorate(event) output_queue << event end rescue => e # parse or event creation error diff --git a/lib/logstash/inputs/relp.rb b/lib/logstash/inputs/relp.rb index ec423efbf..95dc64864 100644 --- a/lib/logstash/inputs/relp.rb +++ b/lib/logstash/inputs/relp.rb @@ -43,6 +43,7 @@ class LogStash::Inputs::Relp < LogStash::Inputs::Base loop do frame = relpserver.syslog_read(socket) @codec.decode(frame["message"]) do |event| + decorate(event) event["source"] = event_source output_queue << event end diff --git a/lib/logstash/inputs/s3.rb b/lib/logstash/inputs/s3.rb index 7dc3576d4..c742f989d 100644 --- a/lib/logstash/inputs/s3.rb +++ b/lib/logstash/inputs/s3.rb @@ -229,6 +229,7 @@ class LogStash::Inputs::S3 < LogStash::Inputs::Base end else @codec.decode(line) do |event| + decorate(event) unless metadata[:version].nil? event["cloudfront_version"] = metadata[:version] end diff --git a/lib/logstash/inputs/sqs.rb b/lib/logstash/inputs/sqs.rb index 750481603..5aaf90a3b 100644 --- a/lib/logstash/inputs/sqs.rb +++ b/lib/logstash/inputs/sqs.rb @@ -114,6 +114,7 @@ class LogStash::Inputs::SQS < LogStash::Inputs::Threadable @sqs_queue.receive_message(receive_opts) do |message| if message @codec.decode(message.body) do |event| + decorate(event) event["source"] = @sqs_queue if @id_field event[@id_field] = message.id diff --git a/lib/logstash/inputs/stdin.rb b/lib/logstash/inputs/stdin.rb index b6283873b..aa313fb00 100644 --- a/lib/logstash/inputs/stdin.rb +++ b/lib/logstash/inputs/stdin.rb @@ -24,9 +24,8 @@ class LogStash::Inputs::Stdin < LogStash::Inputs::Base # IO.select call in JRuby. Bummer :( data = $stdin.sysread(16384) @codec.decode(data) do |event| + decorate(event) event["source"] = @host - event["type"] = @type if @type - @tags && @tags.each { |t| event.tag(t) } queue << event end rescue EOFError, LogStash::ShutdownSignal diff --git a/lib/logstash/inputs/stomp.rb b/lib/logstash/inputs/stomp.rb index 4f40a4a4e..e6ca38191 100644 --- a/lib/logstash/inputs/stomp.rb +++ b/lib/logstash/inputs/stomp.rb @@ -62,6 +62,7 @@ class LogStash::Inputs::Stomp < LogStash::Inputs::Base def subscription_handler @client.subscribe(@destination) do |msg| @codec.decode(msg.body) do |event| + decorate(event) @output_queue << event end end diff --git a/lib/logstash/inputs/syslog.rb b/lib/logstash/inputs/syslog.rb index 4844a8a0a..77bb8904a 100644 --- a/lib/logstash/inputs/syslog.rb +++ b/lib/logstash/inputs/syslog.rb @@ -119,6 +119,7 @@ class LogStash::Inputs::Syslog < LogStash::Inputs::Base # Ruby uri sucks, so don't use it. source = "syslog://#{client[3]}/" @codec.decode(payload) do |event| + decorate(event) event["source"] = client[3] syslog_relay(event) output_queue << event diff --git a/lib/logstash/inputs/tcp.rb b/lib/logstash/inputs/tcp.rb index 8df0026be..49e42c85f 100644 --- a/lib/logstash/inputs/tcp.rb +++ b/lib/logstash/inputs/tcp.rb @@ -110,6 +110,7 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base end end codec.decode(buf) do |event| + decorate(event) event["source"] = event_source event["sslsubject"] = socket.peer_cert.subject if @ssl_enable && @ssl_verify output_queue << event diff --git a/lib/logstash/inputs/udp.rb b/lib/logstash/inputs/udp.rb index a2a1354eb..4788e38f6 100644 --- a/lib/logstash/inputs/udp.rb +++ b/lib/logstash/inputs/udp.rb @@ -59,6 +59,7 @@ class LogStash::Inputs::Udp < LogStash::Inputs::Base loop do payload, client = @udp.recvfrom(@buffer_size) @codec.decode(payload) do |event| + decorate(event) event["source"] = "#{client[3]}:#{client[1]}" output_queue << event end diff --git a/lib/logstash/inputs/unix.rb b/lib/logstash/inputs/unix.rb index f3a08f3a9..5a91d0d7a 100644 --- a/lib/logstash/inputs/unix.rb +++ b/lib/logstash/inputs/unix.rb @@ -79,6 +79,7 @@ class LogStash::Inputs::Unix < LogStash::Inputs::Base end end @codec.decode(buf) do |event| + decorate(event) event["source"] = event_source output_queue << e end diff --git a/lib/logstash/inputs/websocket.rb b/lib/logstash/inputs/websocket.rb index df4c2a007..3cbc580bf 100644 --- a/lib/logstash/inputs/websocket.rb +++ b/lib/logstash/inputs/websocket.rb @@ -35,6 +35,7 @@ class LogStash::Inputs::Websocket < LogStash::Inputs::Base websocket = agent.websocket!(@url) websocket.each do |payload| @codec.decode(payload) do |event| + decorate(event) output_queue << event end end diff --git a/lib/logstash/inputs/xmpp.rb b/lib/logstash/inputs/xmpp.rb index 6e22f45bf..e6beae8d3 100644 --- a/lib/logstash/inputs/xmpp.rb +++ b/lib/logstash/inputs/xmpp.rb @@ -53,6 +53,7 @@ class LogStash::Inputs::Xmpp < LogStash::Inputs::Base @muc.join(room) @muc.on_message do |time,from,body| @codec.decode(body) do |event| + decorate(event) event["room"] = room event["from"] = from queue << event diff --git a/lib/logstash/inputs/zeromq.rb b/lib/logstash/inputs/zeromq.rb index 82cb814ee..8f231ff98 100644 --- a/lib/logstash/inputs/zeromq.rb +++ b/lib/logstash/inputs/zeromq.rb @@ -143,6 +143,7 @@ class LogStash::Inputs::ZeroMQ < LogStash::Inputs::Base @sender ||= "zmq+#{@topology}://#{host}/#{@type}" @codec.decode(msg) do |event| + decorate(event) output_queue << event end end