diff --git a/bin/agent.rb b/bin/agent.rb index e83e1018c..bf7b69b98 100755 --- a/bin/agent.rb +++ b/bin/agent.rb @@ -17,30 +17,25 @@ class Agent < LogStash::Net::MessageClient def start_log_watcher #@t1 = Thread.new do - #File::Tail::Since.new("/var/log/messages").tail do |line| + #File::Tail::Since.new("/b/logs/auth.log.scorn").tail do |line| #line.chomp! #index("linux-syslog", line) #end - ##end + #end @t2 = Thread.new do - #File::Tail::Since.new("/b/access.10").tail do |line| - begin + File::Tail::Since.new("/b/access.10k").tail do |line| count = 0 - File.open("/b/access.1k").readlines.each do |line| - line.chomp! - index("httpd-access", line) - count += 1 - #break if count >= 3 + line.chomp! + count += 1 + if count % 1000 == 0 + #sleep 1 + puts count end - rescue => e - $stderr.puts e.inspect - $stderr.puts caller.join("\n") - raise e + index("httpd-access", line) + #break if count >= 1 end - #close end - @t2.join end # def start_log_watcher def index(type, string) @@ -49,11 +44,13 @@ class Agent < LogStash::Net::MessageClient ier.log_data = string ier.metadata["source_host"] = @hostname - puts "Sending: #{ier}" + #puts "Sending: #{ier}" sendmsg("/queue/logstash", ier) end # def index def IndexEventResponseHandler(msg) + return if msg.code == 0 + puts msg.inspect end # def IndexEventResponseHandler def run @@ -68,6 +65,7 @@ if $0 == __FILE__ puts "Usage: #{$0} host:port" exit 1 end + Thread::abort_on_exception = true host, port = ARGV[0].split(":") agent = Agent.new(host, port) agent.run diff --git a/bin/logstashd.rb b/bin/logstashd.rb index 587e4506c..8fa4c3987 100644 --- a/bin/logstashd.rb +++ b/bin/logstashd.rb @@ -4,6 +4,7 @@ require "rubygems" require "lib/net/servers/indexer" +Thread::abort_on_exception = true s = LogStash::Net::Servers::Indexer.new(host="snack.home") s.run diff --git a/bin/search.rb b/bin/search.rb old mode 100755 new mode 100644 index 74657b3b9..c6dab27c8 --- a/bin/search.rb +++ b/bin/search.rb @@ -1,18 +1,40 @@ #!/usr/bin/ruby - +# require 'rubygems' -require 'ferret' -require 'json' +require "socket" +require "lib/net/message" +require "lib/net/client" +require "lib/net/messages/indexevent" +require "lib/net/messages/search" +require "lib/net/messages/ping" +require "set" -include Ferret -include Ferret::Search +$done = false +$lastid = nil +$count = 0 +$time = 0 +$start = Time.now.to_f -reader = Index::IndexReader.new(ARGV[0]) -search = Searcher.new(reader) -qp = QueryParser.new(:fields => reader.fields, - :tokenized_fields => reader.tokenized_fields, - :or_default => false) -query = qp.parse(ARGV[1]) -search.search_each(query, :limit => :all, :sort => "@DATE") do |id, score| - puts "#{reader[id][:@SOURCE_HOST]} | #{reader[id][:@LOG_NAME]} | #{reader[id][:@LINE]}" +class Client < LogStash::Net::MessageClient + def SearchResponseHandler(msg) + msg.results.each do |result| + puts result + end + if msg.finished + close + end + end +end + +def main(args) + client = Client.new(host="localhost", port=61613) + msg = LogStash::Net::Messages::SearchRequest.new + msg.log_type = args[0] + msg.query = args[1] + client.sendmsg("/queue/logstash", msg) + client.run +end + +if $0 == __FILE__ + exit main(ARGV) end diff --git a/lib/net/message.rb b/lib/net/message.rb index 4102b15b5..1d1026f27 100644 --- a/lib/net/message.rb +++ b/lib/net/message.rb @@ -1,5 +1,6 @@ require "json" -#require "lib/net/messagestream" +# vim macro to replace 'hashbind :foo, "bar"' with two methods. +# yypkct:def lxf,s return @data[A]oend def Jdt:xf,s(val) return @data[A] = val end module BindToHash def hashbind(method, key) @@ -40,7 +41,13 @@ module LogStash; module Net @@translators = Array.new # Message attributes - hashbind :id, "/id" + def id + return @data["id"] + end + + def id=(val) + return @data["id"] = val + end # All message subclasses should register themselves here # This will allow Message.new_from_data to automatically return @@ -78,6 +85,11 @@ module LogStash; module Net class RequestMessage < Message @@idseq = 0 + def initialize + super + generate_id! + end + Message.translators << self def self.can_process?(data) return data.has_key?("request") @@ -94,8 +106,21 @@ module LogStash; module Net end # Message attributes - hashbind :name, "/request" - hashbind :args, "/args" + def name + return @data["request"] + end + + def name=(val) + return @data["request"] = val + end + + def args + return @data["args"] + end + + def args=(val) + return @data["args"] = val + end end # class RequestMessage class ResponseMessage < RequestMessage @@ -105,7 +130,13 @@ module LogStash; module Net end # Message attributes - hashbind :name, "/response" + def name + return @data["response"] + end + + def name=(val) + return @data["response"] = val + end # Report the success of the request this response is for. # Should be implemented by subclasses diff --git a/lib/net/messages/indexevent.rb b/lib/net/messages/indexevent.rb index de7d005f5..1ec02ea75 100644 --- a/lib/net/messages/indexevent.rb +++ b/lib/net/messages/indexevent.rb @@ -15,9 +15,28 @@ module LogStash; module Net; module Messages end # Message attributes - hashbind :log_type, "/args/type" - hashbind :log_data, "/args/message" - hashbind :metadata, "/args/metadata" + def log_type + return @data["args"]["type"] + end + + def log_type=(val) + return @data["args"]["type"] = val + end + def log_data + return @data["args"]["message"] + end + + def log_data=(val) + return @data["args"]["message"] = val + end + + def metadata + return @data["args"]["metadata"] + end + + def metadata=(val) + return @data["args"]["metadata"] = val + end end # class IndexEventRequest class IndexEventResponse < ResponseMessage diff --git a/lib/net/servers/indexer.rb b/lib/net/servers/indexer.rb index 2e71f2ca2..4c37dcd99 100644 --- a/lib/net/servers/indexer.rb +++ b/lib/net/servers/indexer.rb @@ -9,10 +9,9 @@ require 'ferret' require 'lib/log/text' require 'config' - module LogStash; module Net; module Servers class Indexer < LogStash::Net::MessageServer - SYNCDELAY = 10 + SYNCDELAY = 3 def initialize(*args) # 'super' is not the same as 'super()', and we want super(). @@ -23,11 +22,6 @@ module LogStash; module Net; module Servers @starttime = Time.now end - def run - subscribe("logstash") - super - end - def IndexEventRequestHandler(request) response = LogStash::Net::Messages::IndexEventResponse.new response.id = request.id @@ -43,16 +37,24 @@ module LogStash; module Net; module Servers if !entry response.code = 1 response.error = "Entry was #{entry.inspect} (log parsing failed)" + entry = { + "@NEEDSPARSING" => 1, + "@LINE" => request.log_data + } else response.code = 0 - if not @indexes.member?(log_type) - @indexes[log_type] = $logs[log_type].get_index - end - - entry["@LOG_TYPE"] = log_type - @indexes[log_type] << entry end - yield response + + if not @indexes.member?(log_type) + @indexes[log_type] = $logs[log_type].get_index + end + + entry["@LOG_TYPE"] = log_type + @indexes[log_type] << entry + + if response.code != 0 + yield response + end end def PingRequestHandler(request) @@ -103,6 +105,7 @@ module LogStash; module Net; module Servers response = LogStash::Net::Messages::SearchResponse.new response.id = request.id response.results = results + response.finished = false yield response results = [] offset += limit @@ -115,29 +118,25 @@ module LogStash; module Net; module Servers end # Special 'run' override because we want sync to disk once per minute. - def _run - synctime = Time.now + SYNCDELAY - sleeptime = 1 - loop do - active = sendrecv(sleeptime) - if !active - sleeptime *= 2 - if sleeptime > SYNCDELAY - sleeptime = SYNCDELAY - end - puts "No activity, sleeping for #{sleeptime}" - end + def run + subscribe("logstash") + @syncer = Thread.new { syncer } + super + end # def run + def syncer + synctime = Time.now + SYNCDELAY + loop do if Time.now > synctime @indexes.each do |log_type,index| puts "Time's up. Syncing #{log_type}" index.commit end - synctime = Time.now + 60 + synctime = Time.now + SYNCDELAY end + sleep(synctime - Time.now) end - end # def run - + end # def syncer end # Indexer end; end; end # LogStash::Net::Server diff --git a/lib/net/socket.rb b/lib/net/socket.rb index 6d40aa609..54e2fefe2 100644 --- a/lib/net/socket.rb +++ b/lib/net/socket.rb @@ -5,11 +5,14 @@ require 'lib/net/messagepacket' require 'uuid' require 'stomp' +USE_MARSHAL = false + module LogStash; module Net # The MessageClient class exists only as an alias # to the MessageSocketMux. You should use the # client class if you are implementing a client. class MessageSocket + MAXBUF = 30 def initialize(username='', password='', host='localhost', port=61613) @stomp = Stomp::Client.new(login=username, passcode=password, @@ -34,15 +37,22 @@ module LogStash; module Net end # def subscribe def handle_message(stompmsg) - obj = JSON::load(stompmsg.body) - if !obj.is_a?(Array) - obj = [obj] + if USE_MARSHAL + obj = Marshal.load(stompmsg.body) + else + obj = JSON::load(stompmsg.body) + if !obj.is_a?(Array) + obj = [obj] + end end #puts "Got #{obj.length} items" obj.each do |item| - #puts item.inspect - message = Message.new_from_data(item) + if USE_MARSHAL + message = item + else + message = Message.new_from_data(item) + end name = message.class.name.split(":")[-1] func = "#{name}Handler" #puts stompmsg @@ -83,7 +93,11 @@ module LogStash; module Net msgs = @outbuffer[destination] return if msgs.length == 0 - data = msgs.to_json + if USE_MARSHAL + data = Marshal.dump(msgs) + else + data = msgs.to_json + end options = { "persistent" => true, "reply-to" => "/queue/#{@id}", @@ -94,10 +108,13 @@ module LogStash; module Net end def sendmsg(destination, msg) + if msg.is_a?(RequestMessage) + msg.generate_id! + end #puts "Sending to #{destination}: #{msg}" @outbuffer[destination] << msg - if @outbuffer[destination].length > 10 + if @outbuffer[destination].length > MAXBUF flushout(destination) end end diff --git a/sandbox/searchclient.rb b/sandbox/searchclient.rb deleted file mode 100644 index 6daca3841..000000000 --- a/sandbox/searchclient.rb +++ /dev/null @@ -1,37 +0,0 @@ -#!/usr/bin/ruby -# -require 'rubygems' -require "socket" -require "lib/net/message" -require "lib/net/client" -require "lib/net/messages/indexevent" -require "lib/net/messages/search" -require "lib/net/messages/ping" -require "set" - -$done = false -$lastid = nil -$count = 0 -$time = 0 -$start = Time.now.to_f - -class Client < LogStash::Net::MessageClient - def SearchResponseHandler(msg) - #puts "Response (have #{$count} / want: #{$ids.length} acks); #{msg.inspect}" - msg.results.each do |result| - puts result - end - if msg.finished - close - end - end -end - -$me = Client.new(host="localhost", port=61613) - -msg = LogStash::Net::Messages::SearchRequest.new -msg.log_type = ARGV[0] -msg.query = ARGV[1] -$me.sendmsg("/queue/logstash", msg) - -$me.run