- Support message buffering again, but now support both buffering and

non-buffering (ie; send message now). This is akin to TCP_NODELAY but is
  configurable on a per-message basis by calling 'message.want_buffer(true)'
This commit is contained in:
Jordan Sissel 2009-10-25 04:29:06 +00:00
parent f35a1ea88a
commit 9af8615fe0
4 changed files with 25 additions and 21 deletions

View file

@ -32,6 +32,7 @@ module LogStash; module Net; module Clients
ier.metadata["source_host"] = @hostname
@logger.debug "Indexing #{type}: #{string}"
ier.want_buffer(true)
sendmsg("logstash", ier)
end # def index

View file

@ -41,6 +41,14 @@ module LogStash; module Net
return Time.now.to_f - timestamp
end
def buffer?
return @buffer
end
def want_buffer(want_buffer=true)
@buffer = want_buffer
end
# All message subclasses should register themselves here
# This will allow Message.new_from_data to automatically return
# the correct message instance.
@ -60,6 +68,7 @@ module LogStash; module Net
def initialize
@data = Hash.new
want_buffer(false)
end
def self.new_from_data(data)

View file

@ -255,7 +255,9 @@ module LogStash; module Net; module Servers
def broadcaster
msg = LogStash::Net::Messages::BroadcastMessage.new
msg.queue = @id
msg.want_buffer(false)
loop do
@logger.info "Sending #{msg.class}"
sendmsg_topic("logstash-broadcast", msg)
sleep(BROADCAST_INTERVAL)
@indexers_mutex.synchronize do

View file

@ -4,7 +4,6 @@ require 'lib/net/messagepacket'
require 'mq'
require 'uuid'
BUFFER_DELAY_MESSAGES = false
# http://github.com/tmm1/amqp/issues/#issue/3
# This is our (lame) hack to at least notify the user that something is
# wrong.
@ -53,7 +52,7 @@ module LogStash; module Net
# TODO: document this class
class MessageSocket
MAXBUF = 30
MAXBUF = 20
def initialize(config, logger)
@id = UUID::generate
@ -63,9 +62,7 @@ module LogStash; module Net
@want_topics = []
@topics = []
@handler = self
if BUFFER_DELAY_MESSAGES
@outbuffer = Hash.new { |h,k| h[k] = [] }
end
@outbuffer = Hash.new { |h,k| h[k] = [] }
@mq = nil
@message_operations = Hash.new
@startuplock = Mutex.new
@ -93,20 +90,18 @@ module LogStash; module Net
@mq = MQ.new
# Notify the main calling thread (MessageSocket#initialize) that
# we can continue
mq_q = @mq.queue(@id, :auto_delete => true)
mq_q.subscribe(:ack =>true) { |hdr, msg| handle_message(hdr, msg) }
handle_new_subscriptions
@logger.info "Subscribing to main queue #{@id}"
condvar.signal
end
@logger.info "Subscribing to main queue #{@id}"
mq_q = @mq.queue(@id, :auto_delete => true)
mq_q.subscribe(:ack =>true) { |hdr, msg| handle_message(hdr, msg) }
handle_new_subscriptions
EM.add_periodic_timer(5) { handle_new_subscriptions }
if BUFFER_DELAY_MESSAGES
EM.add_periodic_timer(1) do
@outbuffer.each_key { |dest| flushout(dest) }
@outbuffer.clear
end
EM.add_periodic_timer(1) do
@outbuffer.each_key { |dest| flushout(dest) }
@outbuffer.clear
end
end # AMQP.start
end
@ -182,11 +177,8 @@ module LogStash; module Net
def flushout(destination)
return unless @mq # wait until we are connected
if BUFFER_DELAY_MESSAGES
msgs = @outbuffer[destination]
return if msgs.length == 0
end
msgs = @outbuffer[destination]
return if msgs.length == 0
data = msgs.to_json
@mq.queue(destination).publish(data, :persistent => true)
msgs.clear
@ -211,7 +203,7 @@ module LogStash; module Net
msg.timestamp = Time.now.to_f
msg.replyto = @id
if BUFFER_DELAY_MESSAGES
if msg.buffer?
@outbuffer[destination] << msg
if @outbuffer[destination].length > MAXBUF
flushout(destination)