diff --git a/bin/search.rb b/bin/search.rb index 7325d56fb..a5b4f1df5 100755 --- a/bin/search.rb +++ b/bin/search.rb @@ -19,68 +19,68 @@ $count = 0 $time = 0 $start = Time.now.to_f -class Client < LogStash::Net::MessageClient +class SearchClient < LogStash::Net::MessageClient attr_accessor :indexers attr_reader :hits attr_reader :responding attr_reader :results - def initialize(*args) + def initialize(opts) + @log_type = opts[:log_type] + @query = opts[:query] @indexers = Array.new @responding = Array.new @hits = 0 @results = [] - super(*args) + @result_mutex = Mutex.new + super(opts) + start + end + + def start + # find our indexers + msg = LogStash::Net::Messages::DirectoryRequest.new + sendmsg("logstash-directory", msg) + run end def SearchResponseHandler(msg) - msg.results.each do |result| - @results << result - end - if msg.finished - @responding << msg.indexer_id - if @responding.length == @indexers.length - close + @result_mutex.synchronize do + msg.results.each do |result| + @results << result + end + if msg.finished + @responding << msg.indexer_id + if @responding.length == @indexers.length + close + end end end end def SearchHitsResponseHandler(msg) - @hits += msg.hits + @result_mutex.synchronize do + @hits += msg.hits + end end def DirectoryResponseHandler(msg) + hits_msg = LogStash::Net::Messages::SearchHitsRequest.new + hits_msg.log_type = @log_type + hits_msg.query = @query + search_msg = LogStash::Net::Messages::SearchRequest.new + search_msg.log_type = @log_type + search_msg.query = @query @indexers = msg.indexers - close + @indexers.each do |i| + sendmsg(i, hits_msg) + sendmsg(i, search_msg) + end end end def main(args) - client = Client.new - - # Find out what indexers are out there - msg = LogStash::Net::Messages::DirectoryRequest.new - client.sendmsg("logstash-directory", msg) - puts "about to .run" - client.run - puts "back from client.run" - indexers = client.indexers - - # Send queries to each indexer and collect the results - client = Client.new - client.indexers = indexers - hits_msg = LogStash::Net::Messages::SearchHitsRequest.new - hits_msg.log_type = args[0] - hits_msg.query = args[1] - search_msg = LogStash::Net::Messages::SearchRequest.new - search_msg.log_type = args[0] - search_msg.query = args[1] - indexers.each do |indexer| - puts "Querying #{indexer}" - client.sendmsg("/queue/#{indexer}", hits_msg) - client.sendmsg("/queue/#{indexer}", search_msg) - end - client.run + client = SearchClient.new(:log_type => args[0], :query => args[1]) # Collate & print results. puts "Hits: #{client.hits}" diff --git a/lib/net/servers/indexer.rb b/lib/net/servers/indexer.rb index 36d9b7e39..f6e6f1764 100644 --- a/lib/net/servers/indexer.rb +++ b/lib/net/servers/indexer.rb @@ -156,7 +156,6 @@ module LogStash; module Net; module Servers end # def SearchHitsRequestHandler def BroadcastMessageHandler(request) - puts "Broadcast from indexer #{request.queue}" @indexers_mutex.synchronize do @indexers[request.queue] = Time.now end @@ -165,7 +164,6 @@ module LogStash; module Net; module Servers def DirectoryRequestHandler(request) response = LogStash::Net::Messages::DirectoryResponse.new response.indexers = @indexers.keys - puts "got directory request!" yield response end @@ -212,7 +210,6 @@ module LogStash; module Net; module Servers msg = LogStash::Net::Messages::BroadcastMessage.new msg.queue = @id loop do - puts "broadcasting" sendmsg_topic("logstash-broadcast", msg) sleep(BROADCAST_INTERVAL) @indexers_mutex.synchronize do diff --git a/lib/net/socket.rb b/lib/net/socket.rb index 3c440551d..a50500f98 100644 --- a/lib/net/socket.rb +++ b/lib/net/socket.rb @@ -69,8 +69,7 @@ module LogStash; module Net if @close # set by 'close' method # TODO: need a cleaner way to stop the event loop after our reply # actually gets sent. - puts "found @close!" - EM.add_event_timer(1) { EM.stop_event_loop } + EM.add_timer(1) { EM.stop_event_loop } end end # def handle_message @@ -78,7 +77,7 @@ module LogStash; module Net # Create connection to AMQP, and in turn, the main EventMachine loop. AMQP.start(:host => "localhost") do @mq = MQ.new - mq_q = @mq.queue(@id, :exclusive => true, :auto_delete => true) + mq_q = @mq.queue(@id, :auto_delete => true) mq_q.subscribe(:ack =>true) { |hdr, msg| handle_message(hdr, msg) } handle_new_subscriptions @@ -103,9 +102,9 @@ module LogStash; module Net todo.each do |topic| #puts "Subscribing to topic #{topic}" exchange = @mq.topic("amq.topic") - mq_q = @mq.queue("#{@id}-#{topic}").bind(exchange, :key => topic, - :exclusive => true, - :auto_delete => true) + mq_q = @mq.queue("#{@id}-#{topic}", + :exclusive => true, + :auto_delete => true).bind(exchange, :key => topic) mq_q.subscribe { |hdr, msg| handle_message(hdr, msg) } @topics << topic end # todo.each