- restore 'type' 'tags' and 'add_field' feature that was lost during the

Great Codec Migration ;)
This commit is contained in:
Jordan Sissel 2013-08-30 17:12:11 -07:00
parent 9518347f62
commit eaaa295b22
26 changed files with 39 additions and 5 deletions

View file

@ -97,4 +97,19 @@ class LogStash::Inputs::Base < LogStash::Plugin
def to_event(raw, source)
raise LogStash::ThisMethodWasRemoved("LogStash::Inputs::Base#to_event - you should use codecs now instead of to_event. Not sure what this means? Get help on logstash-users@googlegroups.com!")
end # def to_event
protected
def decorate(event)
# Only set 'type' if not already set. This is backwards-compatible behavior
event["type"] = @type if @type && !event.include?("type")
if @tags.any?
event["tags"] ||= []
event["tags"] += @tags
end
@add_field.each do |field, value|
event[field] = value
end
end
end # class LogStash::Inputs::Base

View file

@ -77,6 +77,7 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
# Hack to make codecs work
@codec.decode(event.to_json) do |event|
decorate(event)
output_queue << event
end
end

View file

@ -40,6 +40,7 @@ class LogStash::Inputs::Exec < LogStash::Inputs::Base
out = IO.popen(@command)
# out.read will block until the process finishes.
@codec.decode(out.read) do |event|
decorate(event)
event["source"] = "exec://#{Socket.gethostname}"
event["command"] = @command
queue << event

View file

@ -130,8 +130,8 @@ class LogStash::Inputs::File < LogStash::Inputs::Base
source = "file://#{hostname}/#{path.gsub("\\","/")}"
@logger.debug? && @logger.debug("Received line", :path => path, :line => line)
@codec.decode(line) do |event|
decorate(event)
event["source"] = source
event["type"] = @type if @type
queue << event
end
end

View file

@ -117,9 +117,7 @@ class LogStash::Inputs::Ganglia < LogStash::Inputs::Base
return nil unless data
event=LogStash::Event.new
#event['@timestamp'] = Time.now.to_i
event["source"] = source
event["type"] = @type
data["program"] = "ganglia"
event["log_host"] = data["hostname"]

View file

@ -143,6 +143,7 @@ class LogStash::Inputs::Gemfire < LogStash::Inputs::Threadable
def process_event(event, event_name, source)
message = deserialize_message(event)
@codec.decode(message) do |event|
decorate(event)
event["source"] = source
@logstash_queue << event
end

View file

@ -67,6 +67,7 @@ class LogStash::Inputs::Generator < LogStash::Inputs::Threadable
while !finished? && (@count <= 0 || number < @count)
@lines.each do |line|
@codec.decode(line.clone) do |event|
decorate(event)
event["source"] = source
event["sequence"] = number
queue << event

View file

@ -41,6 +41,7 @@ class LogStash::Inputs::Heroku < LogStash::Inputs::Base
# this to 0 makes it fetch *all* events, not what I want.
client.read_logs(@app, ["tail=1", "num=1"]) do |chunk|
@codec.decode(chunk).each do |event|
decorate(event)
event["app"] = @app
queue << event
end

View file

@ -76,6 +76,7 @@ class LogStash::Inputs::Irc < LogStash::Inputs::Base
msg = @irc_queue.pop
if msg.user
@codec.decode(msg.message) do |event|
decorate(event)
event["channel"] = msg.channel.to_s
event["nick"] = msg.user.nick
event["server"] = "#{@host}:#{@port}"

View file

@ -43,6 +43,7 @@ class LogStash::Inputs::Lumberjack < LogStash::Inputs::Base
def run(output_queue)
@lumberjack.run do |l|
@codec.decode(l.delete("line")) do |event|
decorate(event)
l.each { |k,v| event[k] = v }
output_queue << event
end

View file

@ -38,6 +38,7 @@ class LogStash::Inputs::Pipe < LogStash::Inputs::Base
source = "pipe://#{hostname}/#{command}"
@logger.debug? && @logger.debug("Received line", :command => command, :line => line)
@codec.decode(line) do |event|
decorate(event)
event["source"] = source
queue << event
end

View file

@ -106,6 +106,7 @@ class LogStash::Inputs::RabbitMQ
@consumer = Bunny::Consumer.new(@ch, @q)
@q.subscribe(:manual_ack => @ack, :block => true) do |delivery_info, properties, data|
@codec.decode(data) do |event|
decorate(event)
event["source"] = @connection_url
@output_queue << event
end

View file

@ -104,6 +104,7 @@ class LogStash::Inputs::RabbitMQ
# in an @ivar even though we use a blocking version of HB::Queue#subscribe
@consumer = @q.build_consumer(:block => true) do |metadata, data|
@codec.decode(data) do |event|
decorate(event)
event["source"] = @connection_url
@output_queue << event if event
@ch.ack(metadata.delivery_tag) if @ack

View file

@ -127,6 +127,7 @@ EOF
def queue_event(msg, output_queue)
begin
@codec.decode(msg) do |event|
decorate(event)
output_queue << event
end
rescue => e # parse or event creation error

View file

@ -43,6 +43,7 @@ class LogStash::Inputs::Relp < LogStash::Inputs::Base
loop do
frame = relpserver.syslog_read(socket)
@codec.decode(frame["message"]) do |event|
decorate(event)
event["source"] = event_source
output_queue << event
end

View file

@ -229,6 +229,7 @@ class LogStash::Inputs::S3 < LogStash::Inputs::Base
end
else
@codec.decode(line) do |event|
decorate(event)
unless metadata[:version].nil?
event["cloudfront_version"] = metadata[:version]
end

View file

@ -114,6 +114,7 @@ class LogStash::Inputs::SQS < LogStash::Inputs::Threadable
@sqs_queue.receive_message(receive_opts) do |message|
if message
@codec.decode(message.body) do |event|
decorate(event)
event["source"] = @sqs_queue
if @id_field
event[@id_field] = message.id

View file

@ -24,9 +24,8 @@ class LogStash::Inputs::Stdin < LogStash::Inputs::Base
# IO.select call in JRuby. Bummer :(
data = $stdin.sysread(16384)
@codec.decode(data) do |event|
decorate(event)
event["source"] = @host
event["type"] = @type if @type
@tags && @tags.each { |t| event.tag(t) }
queue << event
end
rescue EOFError, LogStash::ShutdownSignal

View file

@ -62,6 +62,7 @@ class LogStash::Inputs::Stomp < LogStash::Inputs::Base
def subscription_handler
@client.subscribe(@destination) do |msg|
@codec.decode(msg.body) do |event|
decorate(event)
@output_queue << event
end
end

View file

@ -119,6 +119,7 @@ class LogStash::Inputs::Syslog < LogStash::Inputs::Base
# Ruby uri sucks, so don't use it.
source = "syslog://#{client[3]}/"
@codec.decode(payload) do |event|
decorate(event)
event["source"] = client[3]
syslog_relay(event)
output_queue << event

View file

@ -110,6 +110,7 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
end
end
codec.decode(buf) do |event|
decorate(event)
event["source"] = event_source
event["sslsubject"] = socket.peer_cert.subject if @ssl_enable && @ssl_verify
output_queue << event

View file

@ -59,6 +59,7 @@ class LogStash::Inputs::Udp < LogStash::Inputs::Base
loop do
payload, client = @udp.recvfrom(@buffer_size)
@codec.decode(payload) do |event|
decorate(event)
event["source"] = "#{client[3]}:#{client[1]}"
output_queue << event
end

View file

@ -79,6 +79,7 @@ class LogStash::Inputs::Unix < LogStash::Inputs::Base
end
end
@codec.decode(buf) do |event|
decorate(event)
event["source"] = event_source
output_queue << e
end

View file

@ -35,6 +35,7 @@ class LogStash::Inputs::Websocket < LogStash::Inputs::Base
websocket = agent.websocket!(@url)
websocket.each do |payload|
@codec.decode(payload) do |event|
decorate(event)
output_queue << event
end
end

View file

@ -53,6 +53,7 @@ class LogStash::Inputs::Xmpp < LogStash::Inputs::Base
@muc.join(room)
@muc.on_message do |time,from,body|
@codec.decode(body) do |event|
decorate(event)
event["room"] = room
event["from"] = from
queue << event

View file

@ -143,6 +143,7 @@ class LogStash::Inputs::ZeroMQ < LogStash::Inputs::Base
@sender ||= "zmq+#{@topology}://#{host}/#{@type}"
@codec.decode(msg) do |event|
decorate(event)
output_queue << event
end
end