diff --git a/lib/net/servers/indexer.rb b/lib/net/servers/indexer.rb index 8723fa889..1990f81fc 100644 --- a/lib/net/servers/indexer.rb +++ b/lib/net/servers/indexer.rb @@ -21,11 +21,15 @@ module LogStash; module Net; module Servers listen(addr, port) @indexes = Hash.new @lines = Hash.new { |h,k| h[k] = 0 } + @indexcount = 0 end def IndexEventRequestHandler(request) response = LogStash::Net::Messages::IndexEventResponse.new response.id = request.id + @indexcount += 1 + + print "\rK#{@indexcount}" log_type = request.log_type entry = $logs[log_type].parse_entry(request.log_data) @@ -63,17 +67,26 @@ module LogStash; module Net; module Servers :or_default => false) query = qp.parse(request.query) results = [] - search.search_each(query, :limit => :all, - :sort => "@DATE") do |docid, score| - result = reader[docid][:@LINE] - results << result - if results.length > 10 - response = LogStash::Net::Messages::SearchResponse.new - response.id = request.id - response.results = results - yield response - results = [] + offset = 0 + limit = 50 + + done = false + while !done + done = true + puts "Searching..." + search.search_each(query, :limit => limit, :offset => offset, + :sort => "@DATE") do |docid, score| + done = false + result = reader[docid][:@LINE] + results << result end + + response = LogStash::Net::Messages::SearchResponse.new + response.id = request.id + response.results = results + yield response + results = [] + offset += limit end response = LogStash::Net::Messages::SearchResponse.new response.id = request.id diff --git a/lib/net/socketmux.rb b/lib/net/socketmux.rb index 28c2fd343..70678687c 100644 --- a/lib/net/socketmux.rb +++ b/lib/net/socketmux.rb @@ -246,7 +246,7 @@ module LogStash; module Net def client_handle(sock) begin @msgreaders[sock].each do |msg| - message_handle(msg) do |response| + message_handle(msg, sock) do |response| _sendmsg(response, sock) end end @@ -259,7 +259,7 @@ module LogStash; module Net end # def client_handle private - def message_handle(msg) + def message_handle(msg, sock) if msg.is_a?(ResponseMessage) and @ackwait.include?(msg.id) @ackwait.delete(msg.id) end @@ -267,8 +267,11 @@ module LogStash; module Net msgtype = msg.class.name.split(":")[-1] handler = "#{msgtype}Handler" if self.respond_to?(handler) - self.send(handler, msg) do |reply| - yield reply if reply != nil + Thread.new do + self.send(handler, msg) do |reply| + #yield reply if reply != nil + sendmsg(reply, sock) + end end else $stderr.puts "No handler for message class '#{msg.class.name}'"