From f55513ba12eba2b5fc4e7ce16553868573b24b65 Mon Sep 17 00:00:00 2001 From: Jordan Sissel Date: Sun, 6 Sep 2009 23:38:04 +0000 Subject: [PATCH] * In client ack mode, StompServer will only send one message at a time, waiting for an ack for that message before sending another. Work around this: - batch up messages to sendmsg() and flush when there are more than 10 in the queue or there has been more than 1 second since flushing and the queue is non-empty This increases the indexing rate by a factor of 6. --- bin/agent.rb | 9 ++++--- lib/net/server.rb | 8 +++--- lib/net/servers/indexer.rb | 6 ++++- lib/net/socket.rb | 55 ++++++++++++++++++++++++++++++++------ 4 files changed, 61 insertions(+), 17 deletions(-) diff --git a/bin/agent.rb b/bin/agent.rb index b12571b75..e83e1018c 100755 --- a/bin/agent.rb +++ b/bin/agent.rb @@ -1,4 +1,4 @@ -#!/usr/bin/env ruby + #!/usr/bin/env ruby require 'rubygems' require 'lib/net/client' @@ -12,6 +12,7 @@ class Agent < LogStash::Net::MessageClient def initialize(host, port) super(username="", password="", host=host, port=port) @hostname = Socket.gethostname + @msgs = [] end # def initialize def start_log_watcher @@ -30,14 +31,16 @@ class Agent < LogStash::Net::MessageClient line.chomp! index("httpd-access", line) count += 1 - break if count >= 3 + #break if count >= 3 end rescue => e $stderr.puts e.inspect $stderr.puts caller.join("\n") raise e end + #close end + @t2.join end # def start_log_watcher def index(type, string) @@ -51,12 +54,10 @@ class Agent < LogStash::Net::MessageClient end # def index def IndexEventResponseHandler(msg) - puts "OK" end # def IndexEventResponseHandler def run start_log_watcher - @t2.join super end end diff --git a/lib/net/server.rb b/lib/net/server.rb index 6dacfdcc1..41842f750 100644 --- a/lib/net/server.rb +++ b/lib/net/server.rb @@ -6,9 +6,9 @@ require 'uuid' module LogStash; module Net class MessageServer < MessageSocket - def run - subscribe("logstash") - super - end + #def run + #subscribe("logstash") + #super + #end end # class MessageServer end; end # module LogStash::Net diff --git a/lib/net/servers/indexer.rb b/lib/net/servers/indexer.rb index d3c4bccbd..2e71f2ca2 100644 --- a/lib/net/servers/indexer.rb +++ b/lib/net/servers/indexer.rb @@ -5,7 +5,6 @@ require 'lib/net/message' require 'lib/net/messages/indexevent' require 'lib/net/messages/search' require 'lib/net/messages/ping' - require 'ferret' require 'lib/log/text' require 'config' @@ -24,6 +23,11 @@ module LogStash; module Net; module Servers @starttime = Time.now end + def run + subscribe("logstash") + super + end + def IndexEventRequestHandler(request) response = LogStash::Net::Messages::IndexEventResponse.new response.id = request.id diff --git a/lib/net/socket.rb b/lib/net/socket.rb index f1e08931f..6d40aa609 100644 --- a/lib/net/socket.rb +++ b/lib/net/socket.rb @@ -22,14 +22,27 @@ module LogStash; module Net } @handler = self + @outbuffer = Hash.new { |h,k| h[k] = [] } subscribe(@id) end # def initialize def subscribe(name) #puts "Subscribing to #{name}" @stomp.subscribe("/queue/#{name}", @stomp_options) do |stompmsg| - obj = JSON::load(stompmsg.body) - message = Message.new_from_data(obj) + handle_message(stompmsg) + end # @stomp.subscribe + end # def subscribe + + def handle_message(stompmsg) + obj = JSON::load(stompmsg.body) + if !obj.is_a?(Array) + obj = [obj] + end + + #puts "Got #{obj.length} items" + obj.each do |item| + #puts item.inspect + message = Message.new_from_data(item) name = message.class.name.split(":")[-1] func = "#{name}Handler" #puts stompmsg @@ -43,28 +56,54 @@ module LogStash; module Net # We should allow the message handler to defer acking if they want # For instance, if we want to index things, but only want to ack # things once we actually flush to disk. - @stomp.acknowledge stompmsg else $stderr.puts "#{@handler.class.name} does not support #{func}" end # if @handler.respond_to?(func) - end # @stomp.subscribe - end # def subscribe + end + @stomp.acknowledge stompmsg + end # def handle_message def run + @flusher = Thread.new { flusher } @stomp.join end - def sendmsg(destination, msg) - data = msg.to_json + def flusher + loop do + #puts @outbuffer.inspect + @outbuffer.each_key do |destination| + flushout(destination) + end + @outbuffer.clear + sleep 1 + end + end + + def flushout(destination) + msgs = @outbuffer[destination] + return if msgs.length == 0 + + data = msgs.to_json options = { "persistent" => true, "reply-to" => "/queue/#{@id}", - "ack" => "client", } + #puts "Flushing: #{data[0..40]}..." @stomp.send(destination, data, options) + msgs.clear + end + + def sendmsg(destination, msg) + #puts "Sending to #{destination}: #{msg}" + @outbuffer[destination] << msg + + if @outbuffer[destination].length > 10 + flushout(destination) + end end def handler=(handler) + puts "Setting handler to #{handler.class.name}" @handler = handler end