* In client ack mode, StompServer will only send one message at a time, waiting

for an ack for that message before sending another. Work around this:
  - batch up messages to sendmsg() and flush when there are more than 10 in the
    queue or there has been more than 1 second since flushing and the queue is
    non-empty

This increases the indexing rate by a factor of 6.
This commit is contained in:
Jordan Sissel 2009-09-06 23:38:04 +00:00
parent 1e33e3af19
commit f55513ba12
4 changed files with 61 additions and 17 deletions

View file

@ -1,4 +1,4 @@
#!/usr/bin/env ruby #!/usr/bin/env ruby
require 'rubygems' require 'rubygems'
require 'lib/net/client' require 'lib/net/client'
@ -12,6 +12,7 @@ class Agent < LogStash::Net::MessageClient
def initialize(host, port) def initialize(host, port)
super(username="", password="", host=host, port=port) super(username="", password="", host=host, port=port)
@hostname = Socket.gethostname @hostname = Socket.gethostname
@msgs = []
end # def initialize end # def initialize
def start_log_watcher def start_log_watcher
@ -30,14 +31,16 @@ class Agent < LogStash::Net::MessageClient
line.chomp! line.chomp!
index("httpd-access", line) index("httpd-access", line)
count += 1 count += 1
break if count >= 3 #break if count >= 3
end end
rescue => e rescue => e
$stderr.puts e.inspect $stderr.puts e.inspect
$stderr.puts caller.join("\n") $stderr.puts caller.join("\n")
raise e raise e
end end
#close
end end
@t2.join
end # def start_log_watcher end # def start_log_watcher
def index(type, string) def index(type, string)
@ -51,12 +54,10 @@ class Agent < LogStash::Net::MessageClient
end # def index end # def index
def IndexEventResponseHandler(msg) def IndexEventResponseHandler(msg)
puts "OK"
end # def IndexEventResponseHandler end # def IndexEventResponseHandler
def run def run
start_log_watcher start_log_watcher
@t2.join
super super
end end
end end

View file

@ -6,9 +6,9 @@ require 'uuid'
module LogStash; module Net module LogStash; module Net
class MessageServer < MessageSocket class MessageServer < MessageSocket
def run #def run
subscribe("logstash") #subscribe("logstash")
super #super
end #end
end # class MessageServer end # class MessageServer
end; end # module LogStash::Net end; end # module LogStash::Net

View file

@ -5,7 +5,6 @@ require 'lib/net/message'
require 'lib/net/messages/indexevent' require 'lib/net/messages/indexevent'
require 'lib/net/messages/search' require 'lib/net/messages/search'
require 'lib/net/messages/ping' require 'lib/net/messages/ping'
require 'ferret' require 'ferret'
require 'lib/log/text' require 'lib/log/text'
require 'config' require 'config'
@ -24,6 +23,11 @@ module LogStash; module Net; module Servers
@starttime = Time.now @starttime = Time.now
end end
def run
subscribe("logstash")
super
end
def IndexEventRequestHandler(request) def IndexEventRequestHandler(request)
response = LogStash::Net::Messages::IndexEventResponse.new response = LogStash::Net::Messages::IndexEventResponse.new
response.id = request.id response.id = request.id

View file

@ -22,14 +22,27 @@ module LogStash; module Net
} }
@handler = self @handler = self
@outbuffer = Hash.new { |h,k| h[k] = [] }
subscribe(@id) subscribe(@id)
end # def initialize end # def initialize
def subscribe(name) def subscribe(name)
#puts "Subscribing to #{name}" #puts "Subscribing to #{name}"
@stomp.subscribe("/queue/#{name}", @stomp_options) do |stompmsg| @stomp.subscribe("/queue/#{name}", @stomp_options) do |stompmsg|
obj = JSON::load(stompmsg.body) handle_message(stompmsg)
message = Message.new_from_data(obj) end # @stomp.subscribe
end # def subscribe
def handle_message(stompmsg)
obj = JSON::load(stompmsg.body)
if !obj.is_a?(Array)
obj = [obj]
end
#puts "Got #{obj.length} items"
obj.each do |item|
#puts item.inspect
message = Message.new_from_data(item)
name = message.class.name.split(":")[-1] name = message.class.name.split(":")[-1]
func = "#{name}Handler" func = "#{name}Handler"
#puts stompmsg #puts stompmsg
@ -43,28 +56,54 @@ module LogStash; module Net
# We should allow the message handler to defer acking if they want # We should allow the message handler to defer acking if they want
# For instance, if we want to index things, but only want to ack # For instance, if we want to index things, but only want to ack
# things once we actually flush to disk. # things once we actually flush to disk.
@stomp.acknowledge stompmsg
else else
$stderr.puts "#{@handler.class.name} does not support #{func}" $stderr.puts "#{@handler.class.name} does not support #{func}"
end # if @handler.respond_to?(func) end # if @handler.respond_to?(func)
end # @stomp.subscribe end
end # def subscribe @stomp.acknowledge stompmsg
end # def handle_message
def run def run
@flusher = Thread.new { flusher }
@stomp.join @stomp.join
end end
def sendmsg(destination, msg) def flusher
data = msg.to_json loop do
#puts @outbuffer.inspect
@outbuffer.each_key do |destination|
flushout(destination)
end
@outbuffer.clear
sleep 1
end
end
def flushout(destination)
msgs = @outbuffer[destination]
return if msgs.length == 0
data = msgs.to_json
options = { options = {
"persistent" => true, "persistent" => true,
"reply-to" => "/queue/#{@id}", "reply-to" => "/queue/#{@id}",
"ack" => "client",
} }
#puts "Flushing: #{data[0..40]}..."
@stomp.send(destination, data, options) @stomp.send(destination, data, options)
msgs.clear
end
def sendmsg(destination, msg)
#puts "Sending to #{destination}: #{msg}"
@outbuffer[destination] << msg
if @outbuffer[destination].length > 10
flushout(destination)
end
end end
def handler=(handler) def handler=(handler)
puts "Setting handler to #{handler.class.name}"
@handler = handler @handler = handler
end end