diff --git a/bin/agent.rb b/bin/agent.rb index 081350a29..b12571b75 100755 --- a/bin/agent.rb +++ b/bin/agent.rb @@ -4,36 +4,39 @@ require 'rubygems' require 'lib/net/client' require 'lib/net/messages/indexevent' require 'lib/file/tail/since' +require 'stomp' require 'socket' class Agent < LogStash::Net::MessageClient def initialize(host, port) - super(host, port) + super(username="", password="", host=host, port=port) @hostname = Socket.gethostname - @host = host - @port = port - @watcher = nil - - # TODO(sissel): This should go into the network code - @needack = Hash.new - start_log_watcher end # def initialize def start_log_watcher - @t1 = Thread.new do - File::Tail::Since.new("/var/log/messages").tail do |line| - line.chomp! - index("linux-syslog", line) - end - end + #@t1 = Thread.new do + #File::Tail::Since.new("/var/log/messages").tail do |line| + #line.chomp! + #index("linux-syslog", line) + #end + ##end @t2 = Thread.new do - File::Tail::Since.new("/b/access").tail do |line| - line.chomp! - index("httpd-access", line) + #File::Tail::Since.new("/b/access.10").tail do |line| + begin + count = 0 + File.open("/b/access.1k").readlines.each do |line| + line.chomp! + index("httpd-access", line) + count += 1 + break if count >= 3 + end + rescue => e + $stderr.puts e.inspect + $stderr.puts caller.join("\n") + raise e end - exit end end # def start_log_watcher @@ -43,23 +46,19 @@ class Agent < LogStash::Net::MessageClient ier.log_data = string ier.metadata["source_host"] = @hostname - #$stdout.write(".") - #$stdout.flush - @connection.sendmsg(ier) - @needack[ier.id] = ier - - sleeptime = 0.1 - while @needack.length > 500 - sleeptime = [sleeptime * 2, 5].min - $stderr.puts "Waiting for acks (#{sleeptime})... #{@needack.length}" - sleep(sleeptime) - end - + puts "Sending: #{ier}" + sendmsg("/queue/logstash", ier) end # def index def IndexEventResponseHandler(msg) - @needack.delete(msg.id) + puts "OK" end # def IndexEventResponseHandler + + def run + start_log_watcher + @t2.join + super + end end @@ -70,8 +69,5 @@ if $0 == __FILE__ end host, port = ARGV[0].split(":") agent = Agent.new(host, port) - - agent.run do |i| - # nothing - end + agent.run end diff --git a/bin/logstashd.rb b/bin/logstashd.rb index 81a32c929..587e4506c 100644 --- a/bin/logstashd.rb +++ b/bin/logstashd.rb @@ -4,7 +4,6 @@ require "rubygems" require "lib/net/servers/indexer" -s = LogStash::Net::Servers::Indexer.new -s.run do |i| - puts "OK" -end +s = LogStash::Net::Servers::Indexer.new(host="snack.home") +s.run + diff --git a/lib/net/socket.rb b/lib/net/socket.rb index e64bb2206..f1e08931f 100644 --- a/lib/net/socket.rb +++ b/lib/net/socket.rb @@ -26,31 +26,24 @@ module LogStash; module Net end # def initialize def subscribe(name) - puts "Subscribing to #{name}" + #puts "Subscribing to #{name}" @stomp.subscribe("/queue/#{name}", @stomp_options) do |stompmsg| obj = JSON::load(stompmsg.body) message = Message.new_from_data(obj) name = message.class.name.split(":")[-1] func = "#{name}Handler" - puts stompmsg + #puts stompmsg if @handler.respond_to?(func) - puts "Handler found" + #puts "Handler found" @handler.send(func, message) do |response| - puts "response: #{response}" + #puts "response: #{response}" sendmsg(stompmsg.headers["reply-to"], response) end # We should allow the message handler to defer acking if they want # For instance, if we want to index things, but only want to ack # things once we actually flush to disk. - puts "Acking message: #{stompmsg}" - begin - @stomp.acknowledge stompmsg - rescue => e - puts e.inspect - raise e - end - puts "Ack done" + @stomp.acknowledge stompmsg else $stderr.puts "#{@handler.class.name} does not support #{func}" end # if @handler.respond_to?(func) @@ -66,7 +59,7 @@ module LogStash; module Net options = { "persistent" => true, "reply-to" => "/queue/#{@id}", - #"ack" => "client", + "ack" => "client", } @stomp.send(destination, data, options) end @@ -74,5 +67,9 @@ module LogStash; module Net def handler=(handler) @handler = handler end + + def close + @stomp.close + end end # class MessageSocket end; end # module LogStash::Net diff --git a/sandbox/searchclient.rb b/sandbox/searchclient.rb index 13c4ae97f..6daca3841 100644 --- a/sandbox/searchclient.rb +++ b/sandbox/searchclient.rb @@ -3,7 +3,7 @@ require 'rubygems' require "socket" require "lib/net/message" -require "lib/net/socketmux" +require "lib/net/client" require "lib/net/messages/indexevent" require "lib/net/messages/search" require "lib/net/messages/ping" @@ -15,43 +15,23 @@ $count = 0 $time = 0 $start = Time.now.to_f -class Client < LogStash::Net::MessageSocketMux - def gotresponse(msg) - $count += 1 - $ids.delete(msg.id) - - if $done and $ids.length == 0 - puts "All messages ACK'd (#{$lastid})" - exit(0) - end - end - +class Client < LogStash::Net::MessageClient def SearchResponseHandler(msg) #puts "Response (have #{$count} / want: #{$ids.length} acks); #{msg.inspect}" msg.results.each do |result| puts result end if msg.finished - $done = true + close end - #gotresponse(msg) end end -$me = Client.new -$me.connect("localhost", 3001) -$ids = Set.new +$me = Client.new(host="localhost", port=61613) msg = LogStash::Net::Messages::SearchRequest.new msg.log_type = ARGV[0] msg.query = ARGV[1] +$me.sendmsg("/queue/logstash", msg) -msg = LogStash::Net::Messages::SearchRequest.new -msg.log_type = ARGV[0] -msg.query = ARGV[1] -$me.sendmsg(msg) -$me.close() - -while !$done - $me.sendrecv(10) -end +$me.run