- Disable message buffering internal to logstash. Previously, sendmsg() would

push into an array which would get flushed when full or once per second.
  This was causing any normally-fast search request to take a minimum of 1 second.

  The original need for buffering was due to features of STOMP (not used
  anymore) that would limit the number of messages we could handle at once. We
  would buffer multiple messages to a single STOMP queue message so that we
  could handle multiple messages. This may not be a problem anymore with AMQP.

  Set BUFFER_DELAY_MESSAGES to true in socket.rb if you want to re-enable this.

- Add 'timestamp' attribute for all messages. This attribute is set
  automatically when it is sent with MessageSocket#sendmsg.
- Add blocking on MessageSocket#initialize to wait for the AMQP thread. Uses
  the same mutex/conditionvariable method we use to block LogStash::Operation
  instances.
- Removed old 'USE_MARSHAL' support as I think we're pretty happy with JSON now.
This commit is contained in:
Jordan Sissel 2009-10-25 03:24:02 +00:00
parent b9a273885f
commit 226027c024
4 changed files with 73 additions and 64 deletions

View file

@ -72,7 +72,7 @@ module LogStash::Net::Clients
query = "(#{query}) AND -@NEEDSPARSING:1"
end
puts "Query: #{query}"
@logger.info "Query: #{query}"
hits_msg = LogStash::Net::Messages::SearchHitsRequest.new
hits_msg.log_type = options[:log_type]
@ -87,10 +87,12 @@ module LogStash::Net::Clients
ops = []
@indexers.each do |i|
ops << sendmsg(i, hits_msg) do |msg|
@logger.debug "Got #{msg.class} with age #{msg.age}"
hits += msg.hits
:finished
end
ops << sendmsg(i, search_msg) do |msg|
@logger.debug "Got #{msg.class} with age #{msg.age}"
msg.results.each do |result|
results << result
end

View file

@ -33,20 +33,12 @@ module LogStash; module Net
@@translators = Hash.new
# Message attributes
def id
return @data["id"]
end
hashbind :id, "id"
hashbind :replyto, "reply-to"
hashbind :timestamp, "timestamp"
def id=(val)
return @data["id"] = val
end
def replyto
return @data["reply-to"]
end
def replyto=(val)
return @data["reply-to"] = val
def age
return Time.now.to_f - timestamp
end
# All message subclasses should register themselves here

View file

@ -11,13 +11,16 @@ require 'lib/net/messages/search'
require 'lib/net/messages/searchhits'
require 'lib/net/messages/ping'
require 'lib/config/indexer.rb'
require 'lib/util'
require 'ferret'
require 'lib/log/text'
require 'pp'
module LogStash; module Net; module Servers
class Indexer < LogStash::Net::MessageServer
BROADCAST_INTERVAL = 30
# Default broadcast interval is 30, but for debugging/testing, sometimes
# it's nice to set it lower.
BROADCAST_INTERVAL = (ENV["BROADCAST_INTERVAL"] or 30).to_i
SYNC_DELAY = 10
def initialize(configfile, logger)
@ -80,10 +83,6 @@ module LogStash; module Net; module Servers
end
def get_ferret(log_type)
#@readers[log_type] ||= Ferret::Index::IndexReader.new(
#@config.logs[log_type].index_dir)
#reader = @readers[log_type]
#@searchers[log_type] ||= Ferret::Search::Searcher.new(reader)
# open the index every time otherwise we get stale results.
reader = Ferret::Index::IndexReader.new(@config.logs[log_type].index_dir)
@ -99,6 +98,8 @@ module LogStash; module Net; module Servers
def SearchRequestHandler(request)
@logger.debug "received SearchRequest (#{request.query.inspect} in " \
"#{request.log_type})"
@logger.debug "message age is #{request.age} seconds"
stopwatch = LogStash::StopWatch.new
response = LogStash::Net::Messages::SearchResponse.new
response.id = request.id
response.indexer_id = @id
@ -164,6 +165,8 @@ module LogStash; module Net; module Servers
response.results = []
response.finished = true
yield response
@logger.info "SearchRequest for '#{request.query}' took #{stopwatch.to_s(4)}"
end # def SearchRequestHandler
def SearchHitsRequestHandler(request)
@ -241,7 +244,6 @@ module LogStash; module Net; module Servers
# TODO: only run flush if we need to
@logger.debug "Forcing a sync of #{log_type}"
index.flush
break
end
synctime = Time.now + SYNC_DELAY

View file

@ -4,8 +4,7 @@ require 'lib/net/messagepacket'
require 'mq'
require 'uuid'
USE_MARSHAL = ENV.has_key?("USE_MARSHAL")
BUFFER_DELAY_MESSAGES = false
# http://github.com/tmm1/amqp/issues/#issue/3
# This is our (lame) hack to at least notify the user that something is
# wrong.
@ -24,6 +23,11 @@ module AMQP
end
module LogStash; module Net
# A single message operation
# * Takes a callback to call when a message is received
# * Allows you to wait for the operation to complete.
# * An operation is 'complete' when the callback returns :finished
class Operation
def initialize(callback)
@mutex = Mutex.new
@ -47,9 +51,7 @@ module LogStash; module Net
end # def wait_until_finished
end # def Operation
# The MessageClient class exists only as an alias
# to the MessageSocketMux. You should use the
# client class if you are implementing a client.
# TODO: document this class
class MessageSocket
MAXBUF = 30
@ -61,13 +63,23 @@ module LogStash; module Net
@want_topics = []
@topics = []
@handler = self
@outbuffer = Hash.new { |h,k| h[k] = [] }
if BUFFER_DELAY_MESSAGES
@outbuffer = Hash.new { |h,k| h[k] = [] }
end
@mq = nil
@message_operations = Hash.new
start_amqp
@startuplock = Mutex.new
@mutex = Mutex.new
@cv = ConditionVariable.new
start_amqp(@mutex, @cv)
@mutex.synchronize do
@logger.debug "Waiting for @mq ..."
@cv.wait(@mutex)
end
end # def initialize
def start_amqp
def start_amqp(mutex, condvar)
@amqpthread = Thread.new do
# Create connection to AMQP, and in turn, the main EventMachine loop.
amqp_config = {:host => @config.mqhost,
@ -77,16 +89,24 @@ module LogStash; module Net
:vhost => @config.mqvhost,
}
AMQP.start(amqp_config) do
@mq = MQ.new
@logger.info "Subscribing to main queue #{@id}"
mq_q = @mq.queue(@id, :auto_delete => true)
mq_q.subscribe(:ack =>true) { |hdr, msg| handle_message(hdr, msg) }
handle_new_subscriptions
mutex.synchronize do
@mq = MQ.new
# Notify the main calling thread (MessageSocket#initialize) that
# we can continue
mq_q = @mq.queue(@id, :auto_delete => true)
mq_q.subscribe(:ack =>true) { |hdr, msg| handle_message(hdr, msg) }
handle_new_subscriptions
@logger.info "Subscribing to main queue #{@id}"
condvar.signal
end
EM.add_periodic_timer(5) { handle_new_subscriptions }
EM.add_periodic_timer(1) do
@outbuffer.each_key { |dest| flushout(dest) }
@outbuffer.clear
if BUFFER_DELAY_MESSAGES
EM.add_periodic_timer(1) do
@outbuffer.each_key { |dest| flushout(dest) }
@outbuffer.clear
end
end
end # AMQP.start
end
@ -101,21 +121,13 @@ module LogStash; module Net
end # def subscribe_topic
def handle_message(hdr, msg_body)
if USE_MARSHAL
obj = Marshal.load(msg_body)
else
obj = JSON::load(msg_body)
if !obj.is_a?(Array)
obj = [obj]
end
obj = JSON::load(msg_body)
if !obj.is_a?(Array)
obj = [obj]
end
obj.each do |item|
if USE_MARSHAL
message = item
else
message = Message.new_from_data(item)
end
message = Message.new_from_data(item)
name = message.class.name.split(":")[-1]
func = "#{name}Handler"
@ -170,14 +182,12 @@ module LogStash; module Net
def flushout(destination)
return unless @mq # wait until we are connected
msgs = @outbuffer[destination]
return if msgs.length == 0
if USE_MARSHAL
data = Marshal.dump(msgs)
else
data = msgs.to_json
if BUFFER_DELAY_MESSAGES
msgs = @outbuffer[destination]
return if msgs.length == 0
end
data = msgs.to_json
@mq.queue(destination).publish(data, :persistent => true)
msgs.clear
end
@ -187,24 +197,27 @@ module LogStash; module Net
if (msg.is_a?(RequestMessage) and msg.id == nil)
msg.generate_id!
end
msg.timestamp = Time.now.to_f
if USE_MARSHAL
data = Marshal.dump(msg)
else
data = msg.to_json
end
data = msg.to_json
@mq.topic("amq.topic").publish(data, :key => key)
end
def sendmsg(destination, msg, &callback)
return unless @mq # wait until we are connected
if (msg.is_a?(RequestMessage) and msg.id == nil)
msg.generate_id!
end
msg.timestamp = Time.now.to_f
msg.replyto = @id
@outbuffer[destination] << msg
if @outbuffer[destination].length > MAXBUF
flushout(destination)
if BUFFER_DELAY_MESSAGES
@outbuffer[destination] << msg
if @outbuffer[destination].length > MAXBUF
flushout(destination)
end
else
@mq.queue(destination).publish([msg].to_json, :persistent => true)
end
if block_given?