diff --git a/lib/logstash/inputs/eventlog.rb b/lib/logstash/inputs/eventlog.rb index 8c66e9372..3ff2891db 100644 --- a/lib/logstash/inputs/eventlog.rb +++ b/lib/logstash/inputs/eventlog.rb @@ -57,7 +57,8 @@ class LogStash::Inputs::EventLog < LogStash::Inputs::Base timestamp = to_timestamp(event.TimeGenerated) e = LogStash::Event.new( - "source" => "eventlog://#{@hostname}/#{@logfile}", + "host" => @hostname, + "path" => @logfile, "type" => @type, "@timestamp" => timestamp ) diff --git a/lib/logstash/inputs/exec.rb b/lib/logstash/inputs/exec.rb index ae26c0594..4cf176f35 100644 --- a/lib/logstash/inputs/exec.rb +++ b/lib/logstash/inputs/exec.rb @@ -34,6 +34,7 @@ class LogStash::Inputs::Exec < LogStash::Inputs::Base public def run(queue) + hostname = Socket.gethostname loop do start = Time.now @logger.info("Running exec", :command => @command) if @debug @@ -41,7 +42,7 @@ class LogStash::Inputs::Exec < LogStash::Inputs::Base # out.read will block until the process finishes. @codec.decode(out.read) do |event| decorate(event) - event["source"] = "exec://#{Socket.gethostname}" + event["host"] = hostname event["command"] = @command queue << event end diff --git a/lib/logstash/inputs/file.rb b/lib/logstash/inputs/file.rb index 3909a1c54..db7d130f5 100644 --- a/lib/logstash/inputs/file.rb +++ b/lib/logstash/inputs/file.rb @@ -127,10 +127,11 @@ class LogStash::Inputs::File < LogStash::Inputs::Base hostname = Socket.gethostname @tail.subscribe do |path, line| - source = "file://#{hostname}/#{path.gsub("\\","/")}" @logger.debug? && @logger.debug("Received line", :path => path, :line => line) @codec.decode(line) do |event| decorate(event) + event["host"] = hostname + event["path"] = path event["source"] = source queue << event end diff --git a/lib/logstash/inputs/ganglia.rb b/lib/logstash/inputs/ganglia.rb index 95573f0c3..1ec10ec49 100644 --- a/lib/logstash/inputs/ganglia.rb +++ b/lib/logstash/inputs/ganglia.rb @@ -67,11 +67,10 @@ class LogStash::Inputs::Ganglia < LogStash::Inputs::Base loop do packet, client = @udp.recvfrom(9000) - # Ruby uri sucks, so don't use it. - source = "ganglia://#{client[3]}/" - + # TODO(sissel): make this a codec... e = parse_packet(packet,source) unless e.nil? + e["host"] = client[3] # the IP address output_queue << e end end diff --git a/lib/logstash/inputs/gelf.rb b/lib/logstash/inputs/gelf.rb index 6356f7e61..cfc38f8b7 100644 --- a/lib/logstash/inputs/gelf.rb +++ b/lib/logstash/inputs/gelf.rb @@ -83,7 +83,7 @@ class LogStash::Inputs::Gelf < LogStash::Inputs::Base end event = LogStash::Event.new(JSON.parse(data)) - event["source"] = client[3] + event["host"] = client[3] if event["timestamp"].is_a?(Numeric) event["@timestamp"] = Time.at(event["timestamp"]).gmtime event.remove("timestamp") diff --git a/lib/logstash/inputs/generator.rb b/lib/logstash/inputs/generator.rb index a49c38858..5dd9eb15a 100644 --- a/lib/logstash/inputs/generator.rb +++ b/lib/logstash/inputs/generator.rb @@ -56,7 +56,6 @@ class LogStash::Inputs::Generator < LogStash::Inputs::Threadable def run(queue) number = 0 - source = "generator://#{@host}/" if @message == "stdin" @logger.info("Generator plugin reading a line from stdin") @@ -68,7 +67,7 @@ class LogStash::Inputs::Generator < LogStash::Inputs::Threadable @lines.each do |line| @codec.decode(line.clone) do |event| decorate(event) - event["source"] = source + event["host"] = @host event["sequence"] = number queue << event end @@ -78,7 +77,7 @@ class LogStash::Inputs::Generator < LogStash::Inputs::Threadable if @codec.respond_to?(:flush) @codec.flush do |event| - event["source"] = source + event["host"] = @hos queue << event end end diff --git a/lib/logstash/inputs/irc.rb b/lib/logstash/inputs/irc.rb index b8ee753ad..42b1fa55d 100644 --- a/lib/logstash/inputs/irc.rb +++ b/lib/logstash/inputs/irc.rb @@ -38,11 +38,6 @@ class LogStash::Inputs::Irc < LogStash::Inputs::Base # "#logstash". config :channels, :validate => :array, :required => true - - def initialize(*args) - super(*args) - end # def initialize - public def register require "cinch" diff --git a/lib/logstash/inputs/log4j.rb b/lib/logstash/inputs/log4j.rb index facdfa710..1e8f4495e 100644 --- a/lib/logstash/inputs/log4j.rb +++ b/lib/logstash/inputs/log4j.rb @@ -57,8 +57,8 @@ class LogStash::Inputs::Log4j < LogStash::Inputs::Base log4j_obj = ois.readObject event = LogStash::Event.new("message" => log4j_obj.getRenderedMessage, "source" => event_source) - event["source_host"] = socket.peer - event["source_path"] = log4j_obj.getLoggerName + event["host"] = socket.peer + event["path"] = log4j_obj.getLoggerName event["priority"] = log4j_obj.getLevel.toString event["logger_name"] = log4j_obj.getLoggerName event["thread"] = log4j_obj.getThreadName diff --git a/lib/logstash/inputs/pipe.rb b/lib/logstash/inputs/pipe.rb index ae05e037a..fa3f562b4 100644 --- a/lib/logstash/inputs/pipe.rb +++ b/lib/logstash/inputs/pipe.rb @@ -30,16 +30,17 @@ class LogStash::Inputs::Pipe < LogStash::Inputs::Base public def run(queue) - @pipe = IO.popen(command, mode="r") + @pipe = IO.popen(@command, mode="r") hostname = Socket.gethostname @pipe.each do |line| line = line.chomp - source = "pipe://#{hostname}/#{command}" - @logger.debug? && @logger.debug("Received line", :command => command, :line => line) + source = "pipe://#{hostname}/#{@command}" + @logger.debug? && @logger.debug("Received line", :command => @command, :line => line) @codec.decode(line) do |event| + event["host"] = hostname + event["command"] = @command decorate(event) - event["source"] = source queue << event end end diff --git a/lib/logstash/inputs/relp.rb b/lib/logstash/inputs/relp.rb index 95dc64864..d4244b137 100644 --- a/lib/logstash/inputs/relp.rb +++ b/lib/logstash/inputs/relp.rb @@ -39,12 +39,12 @@ class LogStash::Inputs::Relp < LogStash::Inputs::Base end # def register private - def relp_stream(relpserver,socket,output_queue,event_source) + def relp_stream(relpserver,socket,output_queue,client_address) loop do frame = relpserver.syslog_read(socket) @codec.decode(frame["message"]) do |event| decorate(event) - event["source"] = event_source + event["host"] = _addressevent_source output_queue << event end @@ -69,7 +69,7 @@ class LogStash::Inputs::Relp < LogStash::Inputs::Base peer = socket.peer @logger.debug("Relp Connection to #{peer} created") begin - relp_stream(rs,socket, output_queue,"relp://#{peer}") + relp_stream(rs,socket, output_queue, peer) rescue Relp::ConnectionClosed => e @logger.debug("Relp Connection to #{peer} Closed") rescue Relp::RelpError => e diff --git a/lib/logstash/inputs/snmptrap.rb b/lib/logstash/inputs/snmptrap.rb index 230d281ab..ad0680ddb 100644 --- a/lib/logstash/inputs/snmptrap.rb +++ b/lib/logstash/inputs/snmptrap.rb @@ -70,7 +70,7 @@ class LogStash::Inputs::Snmptrap < LogStash::Inputs::Base @snmptrap.on_trap_default do |trap| begin - event = LogStash::Event.new("message" => trap.inspect, "source" => trap.source_ip) + event = LogStash::Event.new("message" => trap.inspect, "host" => trap.source_ip) trap.each_varbind do |vb| event[vb.name.to_s] = vb.value.to_s end diff --git a/lib/logstash/inputs/stdin.rb b/lib/logstash/inputs/stdin.rb index aa313fb00..667291a64 100644 --- a/lib/logstash/inputs/stdin.rb +++ b/lib/logstash/inputs/stdin.rb @@ -25,7 +25,7 @@ class LogStash::Inputs::Stdin < LogStash::Inputs::Base data = $stdin.sysread(16384) @codec.decode(data) do |event| decorate(event) - event["source"] = @host + event["host"] = @host queue << event end rescue EOFError, LogStash::ShutdownSignal diff --git a/lib/logstash/inputs/syslog.rb b/lib/logstash/inputs/syslog.rb index 77bb8904a..09fa5fa86 100644 --- a/lib/logstash/inputs/syslog.rb +++ b/lib/logstash/inputs/syslog.rb @@ -117,10 +117,9 @@ class LogStash::Inputs::Syslog < LogStash::Inputs::Base loop do payload, client = @udp.recvfrom(9000) # Ruby uri sucks, so don't use it. - source = "syslog://#{client[3]}/" @codec.decode(payload) do |event| decorate(event) - event["source"] = client[3] + event["host"] = client[3] syslog_relay(event) output_queue << event end @@ -142,16 +141,10 @@ class LogStash::Inputs::Syslog < LogStash::Inputs::Base ip, port = client.peeraddr[3], client.peeraddr[1] @logger.info("new connection", :client => "#{ip}:#{port}") LogStash::Util::set_thread_name("input|syslog|tcp|#{ip}:#{port}}") - if ip.include?(":") # ipv6 - source = "syslog://[#{ip}]/" - else - source = "syslog://#{ip}/" - end - begin client.each do |line| @codec.decode(line) do |event| - event["source"] = ip + event["host"] = ip syslog_relay(event) output_queue << event end diff --git a/lib/logstash/inputs/tcp.rb b/lib/logstash/inputs/tcp.rb index 49e42c85f..9c8017fe3 100644 --- a/lib/logstash/inputs/tcp.rb +++ b/lib/logstash/inputs/tcp.rb @@ -96,7 +96,7 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base end # def register private - def handle_socket(socket, event_source, output_queue, codec) + def handle_socket(socket, client_address, output_queue, codec) while true buf = nil # NOTE(petef): the timeout only hits after the line is read @@ -111,7 +111,7 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base end codec.decode(buf) do |event| decorate(event) - event["source"] = event_source + event["host"] = client_address event["sslsubject"] = socket.peer_cert.subject if @ssl_enable && @ssl_verify output_queue << event end @@ -166,7 +166,7 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base @logger.debug("Accepted connection", :client => s.peer, :server => "#{@host}:#{@port}") begin - handle_socket(s, "tcp://#{s.peer}/", output_queue, @codec.clone) + handle_socket(s, s.peer, output_queue, @codec.clone) rescue Interrupted s.close rescue nil end @@ -213,7 +213,7 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base end client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end } @logger.debug("Opened connection", :client => "#{client_socket.peer}") - handle_socket(client_socket, "tcp://#{client_socket.peer}/server", output_queue) + handle_socket(client_socket, client_socket.peer, output_queue, @codec.clone) end # loop ensure client_socket.close diff --git a/lib/logstash/inputs/udp.rb b/lib/logstash/inputs/udp.rb index 4788e38f6..e363e76e6 100644 --- a/lib/logstash/inputs/udp.rb +++ b/lib/logstash/inputs/udp.rb @@ -60,7 +60,7 @@ class LogStash::Inputs::Udp < LogStash::Inputs::Base payload, client = @udp.recvfrom(@buffer_size) @codec.decode(payload) do |event| decorate(event) - event["source"] = "#{client[3]}:#{client[1]}" + event["host"] = client[3] output_queue << event end end diff --git a/lib/logstash/inputs/unix.rb b/lib/logstash/inputs/unix.rb index 5a91d0d7a..4cc594d11 100644 --- a/lib/logstash/inputs/unix.rb +++ b/lib/logstash/inputs/unix.rb @@ -1,5 +1,6 @@ require "logstash/inputs/base" require "logstash/namespace" +require "socket" # Read events over a UNIX socket. # @@ -64,8 +65,9 @@ class LogStash::Inputs::Unix < LogStash::Inputs::Base end # def register private - def handle_socket(socket, output_queue, event_source) + def handle_socket(socket, output_queue) begin + hostname = Socket.gethostname loop do buf = nil # NOTE(petef): the timeout only hits after the line is read @@ -80,7 +82,8 @@ class LogStash::Inputs::Unix < LogStash::Inputs::Base end @codec.decode(buf) do |event| decorate(event) - event["source"] = event_source + event["host"] = hostname + event["path"] = @path output_queue << e end end # loop do @@ -119,7 +122,7 @@ class LogStash::Inputs::Unix < LogStash::Inputs::Base @logger.debug("Accepted connection", :server => "#{@path}") begin - handle_socket(s, output_queue, "unix://#{@path}/") + handle_socket(s, output_queue) rescue Interrupted s.close rescue nil end @@ -142,8 +145,8 @@ class LogStash::Inputs::Unix < LogStash::Inputs::Base loop do client_socket = UNIXSocket.new(@path) client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end } - @logger.debug("Opened connection", :client => "#{@path}") - handle_socket(client_socket, output_queue, "unix://#{@path}/server") + @logger.debug("Opened connection", :client => @path) + handle_socket(client_socket, output_queue) end # loop end end # def run diff --git a/lib/logstash/inputs/varnishlog.rb b/lib/logstash/inputs/varnishlog.rb index 5505300a3..ecfdc0c92 100644 --- a/lib/logstash/inputs/varnishlog.rb +++ b/lib/logstash/inputs/varnishlog.rb @@ -10,7 +10,6 @@ class LogStash::Inputs::Varnishlog < LogStash::Inputs::Threadable public def register require 'varnish' - @source = "varnishlog://#{Socket.gethostname}/" @vd = Varnish::VSM.VSM_New Varnish::VSL.VSL_Setup(@vd) Varnish::VSL.VSL_Open(@vd, 1) @@ -19,6 +18,7 @@ class LogStash::Inputs::Varnishlog < LogStash::Inputs::Threadable def run(queue) @q = queue + @hostname = Socket.gethostname Varnish::VSL.VSL_Dispatch(@vd, self.method(:cb).to_proc, FFI::MemoryPointer.new(:pointer)) end # def run @@ -26,7 +26,7 @@ class LogStash::Inputs::Varnishlog < LogStash::Inputs::Threadable def cb(priv, tag, fd, len, spec, ptr, bitmap) begin str = ptr.read_string(len) - event = LogStash::Event.new("message" => str, "source" => @source) + event = LogStash::Event.new("message" => str, "host" => @host) event["varnish_tag"] = tag event["varnish_fd"] = fd event["varnish_spec"] = spec diff --git a/lib/logstash/inputs/wmi.rb b/lib/logstash/inputs/wmi.rb index 03213d943..a8891325b 100644 --- a/lib/logstash/inputs/wmi.rb +++ b/lib/logstash/inputs/wmi.rb @@ -32,7 +32,7 @@ class LogStash::Inputs::WMI < LogStash::Inputs::Base def register @host = Socket.gethostname - @logger.info("Registering input wmi://#{@host}/#{@query}") + @logger.info("Registering wmi input", :query => @query) if RUBY_PLATFORM == "java" # make use of the same fix used for the eventlog input @@ -53,7 +53,8 @@ class LogStash::Inputs::WMI < LogStash::Inputs::Base @wmi.ExecQuery(@query).each do |wmiobj| # create a single event for all properties in the collection event = LogStash::Event.new - event["source"] = @host + event["host"] = @host + decorate(event) wmiobj.Properties_.each do |prop| event[prop.name] = prop.value end diff --git a/lib/logstash/inputs/xmpp.rb b/lib/logstash/inputs/xmpp.rb index e6beae8d3..367060f90 100644 --- a/lib/logstash/inputs/xmpp.rb +++ b/lib/logstash/inputs/xmpp.rb @@ -63,11 +63,11 @@ class LogStash::Inputs::Xmpp < LogStash::Inputs::Base end # if @rooms @client.add_message_callback do |msg| # handle direct/private messages - source = "xmpp://#{msg.from.node}@#{msg.from.domain}/#{msg.from.resource}" - # accept normal msgs (skip presence updates, etc) if msg.body != nil @codec.decode(msg.body) do |event| + # Maybe "from" should just be a hash: + # { "node" => ..., "domain" => ..., "resource" => ... } event["from"] = "#{msg.from.node}@#{msg.from.domain}/#{msg.from.resource}" queue << event end diff --git a/lib/logstash/inputs/zenoss.rb b/lib/logstash/inputs/zenoss.rb index 2f5fafbb0..19b339001 100644 --- a/lib/logstash/inputs/zenoss.rb +++ b/lib/logstash/inputs/zenoss.rb @@ -73,16 +73,16 @@ class LogStash::Inputs::Zenoss < LogStash::Inputs::RabbitMQ next unless summary.occurrence.length > 0 occurrence = summary.occurrence[0] - timestamp = DateTime.strptime(occurrence.created_time.to_s, "%Q").to_s + #timestamp = DateTime.strptime(occurrence.created_time.to_s, "%Q").to_s + timestamp = Time.at(occurrence.created_time / 1000.0) # LogStash event properties. - event = LogStash::Event.new({ - "@source" => @rabbitmq_url, - "@type" => @type, + event = LogStash::Event.new( "@timestamp" => timestamp, - "@source_host" => occurrence.actor.element_title, - "@message" => occurrence.message, - }) + "type" => @type, + "host" => occurrence.actor.element_title, + "message" => occurrence.message, + ) # Direct mappings from summary. %w{uuid}.each do |property| diff --git a/lib/logstash/inputs/zeromq.rb b/lib/logstash/inputs/zeromq.rb index 8f231ff98..bf829922d 100644 --- a/lib/logstash/inputs/zeromq.rb +++ b/lib/logstash/inputs/zeromq.rb @@ -140,9 +140,9 @@ class LogStash::Inputs::ZeroMQ < LogStash::Inputs::Base @logger.debug("ZMQ receiving", :event => m2) msg = m2 end - @sender ||= "zmq+#{@topology}://#{host}/#{@type}" @codec.decode(msg) do |event| + event["host"] = host decorate(event) output_queue << event end