mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
- Call the message handler in a separate thread so we can stream responses (if
there are multiple responses) back to the requestor as they are generated. This allows a large search to stream data back while the search occurs. - Do searching in chunks of 50 results at a time so we can send results back to to the requesting client in parallel with the remainder of the search. - Emit count of messages seen for debugging (will remove later)
This commit is contained in:
parent
96e4881de2
commit
788781bf22
2 changed files with 30 additions and 14 deletions
|
@ -21,11 +21,15 @@ module LogStash; module Net; module Servers
|
|||
listen(addr, port)
|
||||
@indexes = Hash.new
|
||||
@lines = Hash.new { |h,k| h[k] = 0 }
|
||||
@indexcount = 0
|
||||
end
|
||||
|
||||
def IndexEventRequestHandler(request)
|
||||
response = LogStash::Net::Messages::IndexEventResponse.new
|
||||
response.id = request.id
|
||||
@indexcount += 1
|
||||
|
||||
print "\rK#{@indexcount}"
|
||||
|
||||
log_type = request.log_type
|
||||
entry = $logs[log_type].parse_entry(request.log_data)
|
||||
|
@ -63,17 +67,26 @@ module LogStash; module Net; module Servers
|
|||
:or_default => false)
|
||||
query = qp.parse(request.query)
|
||||
results = []
|
||||
search.search_each(query, :limit => :all,
|
||||
:sort => "@DATE") do |docid, score|
|
||||
result = reader[docid][:@LINE]
|
||||
results << result
|
||||
if results.length > 10
|
||||
response = LogStash::Net::Messages::SearchResponse.new
|
||||
response.id = request.id
|
||||
response.results = results
|
||||
yield response
|
||||
results = []
|
||||
offset = 0
|
||||
limit = 50
|
||||
|
||||
done = false
|
||||
while !done
|
||||
done = true
|
||||
puts "Searching..."
|
||||
search.search_each(query, :limit => limit, :offset => offset,
|
||||
:sort => "@DATE") do |docid, score|
|
||||
done = false
|
||||
result = reader[docid][:@LINE]
|
||||
results << result
|
||||
end
|
||||
|
||||
response = LogStash::Net::Messages::SearchResponse.new
|
||||
response.id = request.id
|
||||
response.results = results
|
||||
yield response
|
||||
results = []
|
||||
offset += limit
|
||||
end
|
||||
response = LogStash::Net::Messages::SearchResponse.new
|
||||
response.id = request.id
|
||||
|
|
|
@ -246,7 +246,7 @@ module LogStash; module Net
|
|||
def client_handle(sock)
|
||||
begin
|
||||
@msgreaders[sock].each do |msg|
|
||||
message_handle(msg) do |response|
|
||||
message_handle(msg, sock) do |response|
|
||||
_sendmsg(response, sock)
|
||||
end
|
||||
end
|
||||
|
@ -259,7 +259,7 @@ module LogStash; module Net
|
|||
end # def client_handle
|
||||
|
||||
private
|
||||
def message_handle(msg)
|
||||
def message_handle(msg, sock)
|
||||
if msg.is_a?(ResponseMessage) and @ackwait.include?(msg.id)
|
||||
@ackwait.delete(msg.id)
|
||||
end
|
||||
|
@ -267,8 +267,11 @@ module LogStash; module Net
|
|||
msgtype = msg.class.name.split(":")[-1]
|
||||
handler = "#{msgtype}Handler"
|
||||
if self.respond_to?(handler)
|
||||
self.send(handler, msg) do |reply|
|
||||
yield reply if reply != nil
|
||||
Thread.new do
|
||||
self.send(handler, msg) do |reply|
|
||||
#yield reply if reply != nil
|
||||
sendmsg(reply, sock)
|
||||
end
|
||||
end
|
||||
else
|
||||
$stderr.puts "No handler for message class '#{msg.class.name}'"
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue