mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
- Purge old network code now implemented by MQRPC
This commit is contained in:
parent
6d41874029
commit
1f85aad3d5
5 changed files with 0 additions and 485 deletions
|
@ -1,16 +0,0 @@
|
|||
require 'rubygems'
|
||||
require 'lib/net/socket'
|
||||
require 'logger'
|
||||
require 'uuid'
|
||||
|
||||
module LogStash; module Net
|
||||
class MessageClient < MessageSocket
|
||||
def initialize(config, progname)
|
||||
logger = Logger.new(STDOUT)
|
||||
logger.progname = progname
|
||||
logger.datetime_format = "%Y-%m-%d %H:%M:%S"
|
||||
super(config, logger)
|
||||
end
|
||||
# Nothing, yet.
|
||||
end # class MessageClient
|
||||
end; end # module LogStash::Net
|
|
@ -1,138 +0,0 @@
|
|||
require 'json'
|
||||
require 'set'
|
||||
# vim macro to replace 'hashbind :foo, "bar"' with two methods.
|
||||
# yypkct:def lxf,s
return @data[A]oend
def Jdt:xf,s(val)
return @data[A] = val
end
|
||||
|
||||
module BindToHash
|
||||
def hashbind(method, key)
|
||||
hashpath = BindToHash.genhashpath(key)
|
||||
self.class_eval %(
|
||||
def #{method}
|
||||
return #{hashpath}
|
||||
end
|
||||
def #{method}=(val)
|
||||
#{hashpath} = val
|
||||
end
|
||||
)
|
||||
end
|
||||
|
||||
def self.genhashpath(key)
|
||||
path = key.split("/").select { |x| x.length > 0 }.map { |x| "[#{x.inspect}]" }
|
||||
return "@data#{path.join("")}"
|
||||
end
|
||||
end # modules BindToHash
|
||||
|
||||
module LogStash; module Net
|
||||
PROTOCOL_VERSION = 1
|
||||
|
||||
class Message
|
||||
extend BindToHash
|
||||
attr_accessor :data
|
||||
|
||||
# list of class instances that can identify messages
|
||||
@@translators = Hash.new
|
||||
|
||||
# Message attributes
|
||||
hashbind :id, "id"
|
||||
hashbind :replyto, "reply-to"
|
||||
hashbind :timestamp, "timestamp"
|
||||
|
||||
def age
|
||||
return Time.now.to_f - timestamp
|
||||
end
|
||||
|
||||
def buffer?
|
||||
return @buffer
|
||||
end
|
||||
|
||||
def want_buffer(want_buffer=true)
|
||||
@buffer = want_buffer
|
||||
end
|
||||
|
||||
# All message subclasses should register themselves here
|
||||
# This will allow Message.new_from_data to automatically return
|
||||
# the correct message instance.
|
||||
def self.translators
|
||||
return @@translators
|
||||
end
|
||||
|
||||
def self.register
|
||||
name = self.name.split(":")[-1]
|
||||
self.class_eval %(
|
||||
def _name
|
||||
return "#{name}"
|
||||
end
|
||||
)
|
||||
@@translators[name] = self
|
||||
end
|
||||
|
||||
def initialize
|
||||
@data = Hash.new
|
||||
want_buffer(false)
|
||||
end
|
||||
|
||||
def self.new_from_data(data)
|
||||
obj = nil
|
||||
#@@translators.each do |translator|
|
||||
name = data["type"]
|
||||
if @@translators.has_key?(name)
|
||||
obj = @@translators[name].new
|
||||
else
|
||||
$stderr.puts "No translator found for #{name} / #{data.inspect}"
|
||||
obj = Message.new
|
||||
end
|
||||
obj.data = data
|
||||
return obj
|
||||
end
|
||||
|
||||
def to_json(*args)
|
||||
return @data.to_json(*args)
|
||||
end
|
||||
|
||||
protected
|
||||
attr :data
|
||||
end # class Message
|
||||
|
||||
class RequestMessage < Message
|
||||
@@idseq = 0
|
||||
|
||||
def initialize
|
||||
super
|
||||
self.args = Hash.new
|
||||
self.name = self._name
|
||||
generate_id!
|
||||
end
|
||||
|
||||
def generate_id!
|
||||
@data["id"] = @@idseq
|
||||
@@idseq += 1
|
||||
end
|
||||
|
||||
# Message attributes
|
||||
def name
|
||||
return @data["type"]
|
||||
end
|
||||
|
||||
def name=(val)
|
||||
return @data["type"] = val
|
||||
end
|
||||
|
||||
def args
|
||||
return @data["args"]
|
||||
end
|
||||
|
||||
def args=(val)
|
||||
return @data["args"] = val
|
||||
end
|
||||
end # class RequestMessage
|
||||
|
||||
class ResponseMessage < RequestMessage
|
||||
#Message.translators << self
|
||||
|
||||
# Report the success of the request this response is for.
|
||||
# Should be implemented by subclasses
|
||||
def success?
|
||||
raise NotImplementedError
|
||||
end
|
||||
end # class ResponseMessage
|
||||
end; end # module LogStash::Net
|
|
@ -1,62 +0,0 @@
|
|||
require 'rubygems'
|
||||
require 'lib/net/common'
|
||||
require 'json'
|
||||
|
||||
module LogStash; module Net
|
||||
class MessagePacket
|
||||
# 4 byte length
|
||||
# 4 byte checksum
|
||||
HEADERSIZE = 8
|
||||
|
||||
def self.each(data)
|
||||
done = false
|
||||
while !done
|
||||
have = data.length
|
||||
need = HEADERSIZE
|
||||
if have >= need
|
||||
need = data.unpack("N")[0] + HEADERSIZE
|
||||
if have >= need
|
||||
yield MessagePacket.new_from_encoded(data[0 .. need - 1])
|
||||
else
|
||||
done = true
|
||||
end
|
||||
else
|
||||
done = true
|
||||
end
|
||||
data[0 .. need - 1] = ""
|
||||
end
|
||||
end
|
||||
|
||||
def self.new_from_encoded(string)
|
||||
len = string.unpack("N")[0]
|
||||
len, checksum, data = string.unpack("NNA#{len}")
|
||||
|
||||
return MessagePacket.new(data, len=len, checksum=checksum)
|
||||
end
|
||||
|
||||
def initialize(data, len=nil, checksum=nil)
|
||||
@content = data
|
||||
@length = (len or data.length)
|
||||
@checksum = (checksum or data.checksum)
|
||||
|
||||
verify if length and checksum
|
||||
end
|
||||
|
||||
def verify
|
||||
if (@content.checksum != @checksum or @content.length != @length)
|
||||
$stderr.puts "FAIL"
|
||||
raise MessageCorrupt.new(@checksum, @content)
|
||||
end
|
||||
end
|
||||
|
||||
def encode
|
||||
return [@length, @checksum, @content].pack("NNA*")
|
||||
end
|
||||
|
||||
public
|
||||
attr_reader :length
|
||||
attr_reader :content
|
||||
attr_reader :checksum
|
||||
|
||||
end # class MessagePacket
|
||||
end; end # module LogStash::Net
|
|
@ -1,6 +0,0 @@
|
|||
require 'lib/net/socket'
|
||||
|
||||
module LogStash; module Net
|
||||
class MessageServer < MessageSocket
|
||||
end # class MessageServer
|
||||
end; end # module LogStash::Net
|
|
@ -1,263 +0,0 @@
|
|||
require 'rubygems'
|
||||
require 'amqp'
|
||||
require 'lib/net/messagepacket'
|
||||
require 'lib/util'
|
||||
require 'mq'
|
||||
require 'uuid'
|
||||
require 'thread'
|
||||
|
||||
# http://github.com/tmm1/amqp/issues/#issue/3
|
||||
# This is our (lame) hack to at least notify the user that something is
|
||||
# wrong.
|
||||
module AMQP
|
||||
module Client
|
||||
alias :original_reconnect :reconnect
|
||||
def reconnect(*args)
|
||||
$logger.warn "reconnecting to broker (bad MQ settings?)"
|
||||
|
||||
# some rate limiting
|
||||
sleep(5)
|
||||
|
||||
original_reconnect(*args)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
module LogStash; module Net
|
||||
|
||||
# A single message operation
|
||||
# * Takes a callback to call when a message is received
|
||||
# * Allows you to wait for the operation to complete.
|
||||
# * An operation is 'complete' when the callback returns :finished
|
||||
class Operation
|
||||
def initialize(callback)
|
||||
@mutex = Mutex.new
|
||||
@callback = callback
|
||||
@cv = ConditionVariable.new
|
||||
@finished = false
|
||||
end # def initialize
|
||||
|
||||
def call(*args)
|
||||
@mutex.synchronize do
|
||||
ret = @callback.call(*args)
|
||||
if ret == :finished
|
||||
@finished = true
|
||||
@cv.signal
|
||||
else
|
||||
return ret
|
||||
end
|
||||
end
|
||||
end # def call
|
||||
|
||||
def wait_until_finished
|
||||
@mutex.synchronize do
|
||||
@cv.wait(@mutex) if !finished?
|
||||
end
|
||||
end # def wait_until_finished
|
||||
|
||||
def finished?
|
||||
return @finished
|
||||
end
|
||||
end # def Operation
|
||||
|
||||
# TODO: document this class
|
||||
class MessageSocket
|
||||
MAXBUF = 20
|
||||
|
||||
def initialize(config, logger)
|
||||
@id = UUID::generate
|
||||
@config, @logger = config, logger
|
||||
@want_queues = []
|
||||
@queues = []
|
||||
@want_topics = []
|
||||
@topics = []
|
||||
@handler = self
|
||||
@receive_queue = Queue.new
|
||||
@outbuffer = Hash.new { |h,k| h[k] = [] }
|
||||
#@slidingwindow = LogStash::SlidingWindowSet.new
|
||||
@mq = nil
|
||||
@message_operations = Hash.new
|
||||
@startuplock = Mutex.new
|
||||
|
||||
@mutex = Mutex.new
|
||||
@cv = ConditionVariable.new
|
||||
start_amqp(@mutex, @cv)
|
||||
@mutex.synchronize do
|
||||
@logger.debug "Waiting for @mq ..."
|
||||
@cv.wait(@mutex)
|
||||
end
|
||||
|
||||
start_receiver
|
||||
end # def initialize
|
||||
|
||||
def start_amqp(mutex, condvar)
|
||||
@amqpthread = Thread.new do
|
||||
# Create connection to AMQP, and in turn, the main EventMachine loop.
|
||||
amqp_config = {:host => @config.mqhost,
|
||||
:port => @config.mqport,
|
||||
:user => @config.mquser,
|
||||
:pass => @config.mqpass,
|
||||
:vhost => @config.mqvhost,
|
||||
}
|
||||
AMQP.start(amqp_config) do
|
||||
mutex.synchronize do
|
||||
@mq = MQ.new
|
||||
# Notify the main calling thread (MessageSocket#initialize) that
|
||||
# we can continue
|
||||
condvar.signal
|
||||
end
|
||||
|
||||
@logger.info "Subscribing to main queue #{@id}"
|
||||
mq_q = @mq.queue(@id, :auto_delete => true)
|
||||
#mq_q.subscribe(:ack =>true) { |hdr, msg| handle_message(hdr, msg) }
|
||||
mq_q.subscribe(:ack =>true) { |hdr, msg| @receive_queue << [hdr, msg] }
|
||||
handle_new_subscriptions
|
||||
|
||||
EM.add_periodic_timer(5) { handle_new_subscriptions }
|
||||
EM.add_periodic_timer(1) do
|
||||
@outbuffer.each_key { |dest| flushout(dest) }
|
||||
@outbuffer.clear
|
||||
end
|
||||
end # AMQP.start
|
||||
end
|
||||
end # def start_amqp
|
||||
|
||||
def start_receiver
|
||||
Thread.new do
|
||||
while true
|
||||
header, message = @receive_queue.pop
|
||||
handle_message(header, message)
|
||||
end
|
||||
end
|
||||
end # def start_receiver
|
||||
|
||||
def subscribe(name)
|
||||
@want_queues << name
|
||||
end # def subscribe
|
||||
|
||||
def subscribe_topic(name)
|
||||
@want_topics << name
|
||||
end # def subscribe_topic
|
||||
|
||||
def handle_message(hdr, msg_body)
|
||||
obj = JSON::load(msg_body)
|
||||
if !obj.is_a?(Array)
|
||||
obj = [obj]
|
||||
end
|
||||
|
||||
obj.each do |item|
|
||||
message = Message.new_from_data(item)
|
||||
#if @slidingwindow.include?(message.id)
|
||||
#puts "Removing ack for #{message.id}"
|
||||
#@slidingwindow.delete(message.id)
|
||||
#end
|
||||
name = message.class.name.split(":")[-1]
|
||||
func = "#{name}Handler"
|
||||
|
||||
if @message_operations.has_key?(message.id)
|
||||
operation = @message_operations[message.id]
|
||||
operation.call message
|
||||
elsif @handler.respond_to?(func)
|
||||
@handler.send(func, message) do |response|
|
||||
reply = message.replyto
|
||||
sendmsg(reply, response)
|
||||
end
|
||||
|
||||
# We should allow the message handler to defer acking if they want
|
||||
# For instance, if we want to index things, but only want to ack
|
||||
# things once we actually flush to disk.
|
||||
else
|
||||
$stderr.puts "#{@handler.class.name} does not support #{func}"
|
||||
end # if @handler.respond_to?(func)
|
||||
end
|
||||
hdr.ack
|
||||
|
||||
if @close # set by 'close' method
|
||||
EM.stop_event_loop
|
||||
end
|
||||
end # def handle_message
|
||||
|
||||
def run
|
||||
@amqpthread.join
|
||||
end # run
|
||||
|
||||
def handle_new_subscriptions
|
||||
todo = @want_queues - @queues
|
||||
todo.each do |queue|
|
||||
@logger.info "Subscribing to queue #{queue}"
|
||||
mq_q = @mq.queue(queue, :durable => true)
|
||||
mq_q.subscribe(:ack => true) { |hdr, msg| @receive_queue << [hdr, msg] }
|
||||
@queues << queue
|
||||
end # todo.each
|
||||
|
||||
todo = @want_topics - @topics
|
||||
todo.each do |topic|
|
||||
@logger.info "Subscribing to topic #{topic}"
|
||||
exchange = @mq.topic("amq.topic")
|
||||
mq_q = @mq.queue("#{@id}-#{topic}",
|
||||
:exclusive => true,
|
||||
:auto_delete => true).bind(exchange, :key => topic)
|
||||
mq_q.subscribe { |hdr, msg| @receive_queue << [hdr, msg] }
|
||||
@topics << topic
|
||||
end # todo.each
|
||||
end # handle_new_subscriptions
|
||||
|
||||
def flushout(destination)
|
||||
return unless @mq # wait until we are connected
|
||||
|
||||
msgs = @outbuffer[destination]
|
||||
return if msgs.length == 0
|
||||
data = msgs.to_json
|
||||
@mq.queue(destination, :durable => true).publish(data, :persistent => true)
|
||||
msgs.clear
|
||||
end
|
||||
|
||||
def sendmsg_topic(key, msg)
|
||||
return unless @mq # wait until we are connected
|
||||
if (msg.is_a?(RequestMessage) and msg.id == nil)
|
||||
msg.generate_id!
|
||||
end
|
||||
msg.timestamp = Time.now.to_f
|
||||
|
||||
data = msg.to_json
|
||||
@mq.topic("amq.topic").publish(data, :key => key)
|
||||
end
|
||||
|
||||
def sendmsg(destination, msg, &callback)
|
||||
return unless @mq # wait until we are connected
|
||||
if (msg.is_a?(RequestMessage) and msg.id == nil)
|
||||
msg.generate_id!
|
||||
end
|
||||
msg.timestamp = Time.now.to_f
|
||||
msg.replyto = @id
|
||||
|
||||
if (msg.is_a?(RequestMessage) and !msg.is_a?(ResponseMessage))
|
||||
@logger.info "Tracking #{msg.class.name}##{msg.id}"
|
||||
#@slidingwindow << msg.id
|
||||
end
|
||||
|
||||
if msg.buffer?
|
||||
@outbuffer[destination] << msg
|
||||
if @outbuffer[destination].length > MAXBUF
|
||||
flushout(destination)
|
||||
end
|
||||
else
|
||||
@mq.queue(destination, :durable => true).publish([msg].to_json, :persistent => true)
|
||||
end
|
||||
|
||||
if block_given?
|
||||
op = Operation.new(callback)
|
||||
@message_operations[msg.id] = op
|
||||
return op
|
||||
end
|
||||
end
|
||||
|
||||
def handler=(handler)
|
||||
@handler = handler
|
||||
end
|
||||
|
||||
def close
|
||||
@close = true
|
||||
end
|
||||
end # class MessageSocket
|
||||
end; end # module LogStash::Net
|
Loading…
Add table
Add a link
Reference in a new issue