- Messages sent from parser -> indexer that came from an IndexEventRequest now

get pushed into a queue that is managed by a separate thread. This is
  necessary to prevent sendmsg() from blocking the main AMQP reader thread when
  sendmsg() might block due to sliding window closure.

  This queue length is unchecked, however, the correct fix is to unsubscribe
  from the input (the AMQP queue) and only resubscribe once our queue has
  cleared a bit.
This commit is contained in:
Jordan Sissel 2009-11-06 10:16:44 +00:00
parent 5e51250c20
commit c234e01f38
2 changed files with 23 additions and 3 deletions

View file

@ -33,7 +33,7 @@ module LogStash; module Net; module Clients
ier.metadata["source_host"] = @hostname
@logger.debug "Indexing #{type}: #{string}"
#ier.want_buffer(true)
ier.want_buffer(true)
sendmsg("logstash", ier)
end # def index

View file

@ -1,4 +1,3 @@
require 'rubygems'
require 'ferret'
require 'lib/config/indexer.rb'
@ -23,6 +22,25 @@ module LogStash; module Net; module Servers
@lines = Hash.new { |h,k| h[k] = 0 }
@indexcount = 0
@starttime = Time.now
#@indexerqueue = SizedQueue.new(MQRPC::Agent::MAXMESSAGEWAIT)
@indexerqueue = Queue.new
start_indexer_forwarder
end
def start_indexer_forwarder
Thread.new do
# TODO(sissel): If the queue exceeds a certain size,
# we should unsubscribe from the 'logstash' queue to indicate
# we cannot handle any more capacity at this time. Once the
# queue shrinks, we can re-subscribe.
while true do
request = @indexerqueue.pop
sendmsg("logstash-index", request)
end
end
end
def IndexEventRequestHandler(request)
@ -57,7 +75,9 @@ module LogStash; module Net; module Servers
# Now we have a hash for the log data, send it to the indexer
request.log_data = entry
sendmsg("logstash-index", request)
# Push our message onto the queue
@indexerqueue << request
end
def IndexEventResponseHandler(response)