mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
- Refactor network code to use EventMachine
Message decoding is working, but I think we're silently dropping data somewhere.
This commit is contained in:
parent
35c8fe2670
commit
173840a827
9 changed files with 202 additions and 322 deletions
34
bin/agent.rb
34
bin/agent.rb
|
@ -9,7 +9,7 @@ require 'socket'
|
|||
|
||||
class Agent < LogStash::Net::MessageClient
|
||||
def initialize(host, port)
|
||||
super()
|
||||
super(host, port)
|
||||
@hostname = Socket.gethostname
|
||||
@host = host
|
||||
@port = port
|
||||
|
@ -45,7 +45,7 @@ class Agent < LogStash::Net::MessageClient
|
|||
$stdout.write(".")
|
||||
$stdout.flush
|
||||
#puts "Trying to send: #{ier.inspect}"
|
||||
sendmsg(ier)
|
||||
@connection.sendmsg(ier)
|
||||
sent = true
|
||||
rescue LogStash::Net::NoSocket
|
||||
# No client connection available, wait.
|
||||
|
@ -59,31 +59,6 @@ class Agent < LogStash::Net::MessageClient
|
|||
if msg.success?
|
||||
end
|
||||
end # def IndexEventResponseHandler
|
||||
|
||||
def run
|
||||
loop do
|
||||
done = false
|
||||
while !done
|
||||
begin
|
||||
done = connect(@host, @port);
|
||||
rescue Errno::ECONNREFUSED => e
|
||||
puts "Connection to #{@host}:#{@port} failed: #{e}"
|
||||
puts "Sleeping for retry."
|
||||
sleep 1
|
||||
end
|
||||
end
|
||||
puts "Connection OK"
|
||||
|
||||
begin
|
||||
loop do
|
||||
sendrecv(nil)
|
||||
end
|
||||
rescue LogStash::Net::MessageClientConnectionReset
|
||||
puts "Connection died, retrying..."
|
||||
end
|
||||
end
|
||||
end # def run
|
||||
|
||||
end
|
||||
|
||||
|
||||
|
@ -94,5 +69,8 @@ if $0 == __FILE__
|
|||
end
|
||||
host, port = ARGV[0].split(":")
|
||||
agent = Agent.new(host, port)
|
||||
agent.run
|
||||
|
||||
agent.run do |i|
|
||||
# nothing
|
||||
end
|
||||
end
|
||||
|
|
|
@ -2,8 +2,9 @@
|
|||
#
|
||||
|
||||
require "rubygems"
|
||||
require "lib/net/socketmux"
|
||||
require "lib/net/servers/indexer"
|
||||
|
||||
s = LogStash::Net::Servers::Indexer.new
|
||||
s.run
|
||||
s.run do |i|
|
||||
puts "OK"
|
||||
end
|
||||
|
|
|
@ -1,9 +1,28 @@
|
|||
require 'lib/net/socketmux'
|
||||
require 'rubygems'
|
||||
require 'eventmachine'
|
||||
require 'lib/net/socket'
|
||||
require 'lib/net/messages/ping.rb'
|
||||
|
||||
module LogStash; module Net
|
||||
# The MessageClient class exists only as an alias
|
||||
# to the MessageSocketMux. You should use the
|
||||
# client class if you are implementing a client.
|
||||
class MessageClient < MessageSocketMux
|
||||
class MessageClient
|
||||
attr_reader :connection
|
||||
|
||||
def initialize(host, port)
|
||||
@host = host
|
||||
@port = port
|
||||
end
|
||||
|
||||
def run
|
||||
EventMachine.run do
|
||||
connect(@host, @port)
|
||||
end
|
||||
end
|
||||
|
||||
def connect(host, port)
|
||||
@connection = EventMachine::connect(host, port, MessageSocket) do |m|
|
||||
m.handler = self
|
||||
end
|
||||
end
|
||||
|
||||
end # class MessageClient
|
||||
end; end # module LogStash::Net
|
||||
|
|
|
@ -3,6 +3,18 @@ require 'zlib'
|
|||
module LogStash; module Net;
|
||||
MAXMSGLEN = (1 << 20) # one megabyte message blocks
|
||||
|
||||
class MessageCorrupt < StandardError
|
||||
attr_reader :expected_checksum
|
||||
attr_reader :data
|
||||
|
||||
def initialize(checksum, data)
|
||||
@expected_checksum = checksum
|
||||
@data = data
|
||||
super("Corrupt message read. Expected checksum #{checksum}, got " +
|
||||
"#{data.checksum}")
|
||||
end # def initialize
|
||||
end # class MessageReaderCorruptMessage
|
||||
|
||||
end; end # module LogStash::Net
|
||||
|
||||
# Add adler32 checksum from Zlib to String class
|
||||
|
|
62
lib/net/messagepacket.rb
Normal file
62
lib/net/messagepacket.rb
Normal file
|
@ -0,0 +1,62 @@
|
|||
require 'rubygems'
|
||||
require 'lib/net/common'
|
||||
require 'json'
|
||||
|
||||
module LogStash; module Net
|
||||
class MessagePacket
|
||||
# 4 byte length
|
||||
# 4 byte checksum
|
||||
HEADERSIZE = 8
|
||||
|
||||
def self.each(data)
|
||||
done = false
|
||||
while !done
|
||||
have = data.length
|
||||
need = HEADERSIZE
|
||||
if have >= need
|
||||
need = data.unpack("N")[0] + HEADERSIZE
|
||||
if have >= need
|
||||
yield MessagePacket.new_from_encoded(data[0 .. need - 1])
|
||||
else
|
||||
done = true
|
||||
end
|
||||
else
|
||||
done = true
|
||||
end
|
||||
data[0 .. need - 1] = ""
|
||||
end
|
||||
end
|
||||
|
||||
def self.new_from_encoded(string)
|
||||
len = string.unpack("N")[0]
|
||||
len, checksum, data = string.unpack("NNA#{len}")
|
||||
|
||||
return MessagePacket.new(data, len=len, checksum=checksum)
|
||||
end
|
||||
|
||||
def initialize(data, len=nil, checksum=nil)
|
||||
@content = data
|
||||
@length = (len or data.length)
|
||||
@checksum = (checksum or data.checksum)
|
||||
|
||||
verify if length and checksum
|
||||
end
|
||||
|
||||
def verify
|
||||
if (@content.checksum != @checksum or @content.length != @length)
|
||||
$stderr.puts "FAIL"
|
||||
raise MessageCorrupt.new(@checksum, @content)
|
||||
end
|
||||
end
|
||||
|
||||
def encode
|
||||
return [@length, @checksum, @content].pack("NNA*")
|
||||
end
|
||||
|
||||
public
|
||||
attr_reader :length
|
||||
attr_reader :content
|
||||
attr_reader :checksum
|
||||
|
||||
end # class MessagePacket
|
||||
end; end # module LogStash::Net
|
|
@ -1,9 +1,27 @@
|
|||
require 'lib/net/socketmux'
|
||||
require 'rubygems'
|
||||
require 'eventmachine'
|
||||
require 'lib/net/socket'
|
||||
|
||||
module LogStash; module Net
|
||||
# The MessageServer class exists only as an alias
|
||||
# to the MessageSocketMux. You should use the
|
||||
# client class if you are implementing a client.
|
||||
class MessageServer < MessageSocketMux
|
||||
class MessageServer
|
||||
def initialize(host, port)
|
||||
@host = host
|
||||
@port = port
|
||||
end
|
||||
|
||||
def run
|
||||
EventMachine.run do
|
||||
listen(@host, @port)
|
||||
end
|
||||
end
|
||||
|
||||
def listen(host, port)
|
||||
EventMachine::start_server(host, port, MessageSocket) do |m|
|
||||
m.handler = self
|
||||
end
|
||||
end
|
||||
end # class MessageServer
|
||||
end; end # module LogStash::Net
|
||||
|
|
|
@ -17,8 +17,7 @@ module LogStash; module Net; module Servers
|
|||
|
||||
def initialize(addr="0.0.0.0", port=3001)
|
||||
# 'super' is not the same as 'super()', and we want super().
|
||||
super()
|
||||
listen(addr, port)
|
||||
super(addr, port)
|
||||
@indexes = Hash.new
|
||||
@lines = Hash.new { |h,k| h[k] = 0 }
|
||||
@indexcount = 0
|
||||
|
@ -29,7 +28,8 @@ module LogStash; module Net; module Servers
|
|||
response.id = request.id
|
||||
@indexcount += 1
|
||||
|
||||
print "\rK#{@indexcount}"
|
||||
print "\rK#{@indexcount} (vs #{request.id})"
|
||||
#puts "#{@indexcount} (id: #{request.id})"
|
||||
|
||||
log_type = request.log_type
|
||||
entry = $logs[log_type].parse_entry(request.log_data)
|
||||
|
@ -108,7 +108,7 @@ module LogStash; module Net; module Servers
|
|||
end
|
||||
|
||||
# Special 'run' override because we want sync to disk once per minute.
|
||||
def run
|
||||
def _run
|
||||
synctime = Time.now + SYNCDELAY
|
||||
sleeptime = 1
|
||||
loop do
|
||||
|
|
71
lib/net/socket.rb
Normal file
71
lib/net/socket.rb
Normal file
|
@ -0,0 +1,71 @@
|
|||
require 'rubygems'
|
||||
require 'lib/net/messagepacket'
|
||||
require 'eventmachine'
|
||||
|
||||
module LogStash; module Net
|
||||
# The MessageClient class exists only as an alias
|
||||
# to the MessageSocketMux. You should use the
|
||||
# client class if you are implementing a client.
|
||||
class MessageSocket < EventMachine::Connection
|
||||
# connection init callback from EventMachine::Connection
|
||||
def post_init
|
||||
#set_comm_inactivity_timeout(30)
|
||||
@buffer = ""
|
||||
end
|
||||
|
||||
# data receiver callback from EventMachine::Connection
|
||||
def receive_data(data)
|
||||
@buffer += data
|
||||
|
||||
len = 0
|
||||
count = 0
|
||||
MessagePacket.each(@buffer) do |packet|
|
||||
len += packet.length
|
||||
count += 1
|
||||
obj = JSON::load(packet.content)
|
||||
msg = Message.new_from_data(obj)
|
||||
|
||||
if !@handler
|
||||
$stderr.puts "No message handler set. Can't handle #{msg.class.name}"
|
||||
next
|
||||
end
|
||||
|
||||
name = msg.class.name.split(":")[-1]
|
||||
func = "#{name}Handler"
|
||||
if @handler.respond_to?(func):
|
||||
operation = lambda do
|
||||
@handler.send(func, msg) do |response|
|
||||
sendmsg(response)
|
||||
end
|
||||
end
|
||||
EventMachine.defer(operation, nil)
|
||||
#@handler.send(func, msg) do |response|
|
||||
#sendmsg(response)
|
||||
#end
|
||||
else
|
||||
$stderr.puts "#{@handler.class.name} does not support #{func}"
|
||||
end
|
||||
end
|
||||
|
||||
if len > 0
|
||||
puts "Removing #{len} bytes (#{count} packets)"
|
||||
@buffer[0 .. len - 1] = ""
|
||||
end
|
||||
end # def receive_data
|
||||
|
||||
def sendmsg(msg)
|
||||
if msg.is_a?(RequestMessage) and msg.id == nil
|
||||
msg.generate_id!
|
||||
end
|
||||
|
||||
data = msg.to_json
|
||||
packet = MessagePacket.new(data)
|
||||
#puts "Sending: #{packet.encode.inspect}"
|
||||
send_data(packet.encode)
|
||||
end
|
||||
|
||||
def handler=(obj)
|
||||
@handler = obj
|
||||
end
|
||||
end # class MessageSocket
|
||||
end; end # module LogStash::Net
|
|
@ -1,281 +0,0 @@
|
|||
require 'lib/net/common'
|
||||
require 'lib/net/message'
|
||||
require 'lib/net/messagestream'
|
||||
require 'lib/net/messagereader'
|
||||
require 'set'
|
||||
require 'socket'
|
||||
require 'thread'
|
||||
require 'time'
|
||||
|
||||
module LogStash; module Net
|
||||
class MessageClientConnectionReset < StandardError; end
|
||||
class NoSocket < StandardError; end
|
||||
|
||||
class MessageSocketMux
|
||||
def initialize
|
||||
@writelock = Mutex.new
|
||||
@server = nil
|
||||
@receiver = nil
|
||||
|
||||
# signal and signal observer are for allowing us to break
|
||||
# out of the select() call whenever sendmsg() is invoked.
|
||||
# sendmsg() puts a new writer on the list of @writers and we need
|
||||
# to rerun the select() to pick that change up.
|
||||
# We maybe should switch to EventMachine (like libevent) for
|
||||
# doing this event handling nonsense for us.
|
||||
@signal, @signal_observer = Socket::socketpair(Socket::PF_LOCAL,
|
||||
Socket::SOCK_DGRAM, 0)
|
||||
|
||||
# server_done is unused right now
|
||||
@server_done = false
|
||||
@receiver_done = false
|
||||
|
||||
# Socket list for readers and writers
|
||||
@readers = [@signal_observer]
|
||||
@writers = []
|
||||
|
||||
@msgoutstreams = Hash.new do
|
||||
|h,k| h[k] = LogStash::Net::MessageStream.new
|
||||
end
|
||||
@msgreaders = Hash.new do |h,k|
|
||||
h[k] = LogStash::Net::MessageReader.new(k)
|
||||
end
|
||||
|
||||
@ackwait = Set.new
|
||||
@done = false
|
||||
end
|
||||
|
||||
# Set up a server and listen on a port
|
||||
def listen(addr="0.0.0.0", port=0)
|
||||
@server = TCPServer.new(addr, port)
|
||||
@readers << @server
|
||||
end
|
||||
|
||||
# Connect to a remote server
|
||||
def connect(addr="0.0.0.0", port=0)
|
||||
@receiver = TCPSocket.new(addr, port)
|
||||
add_socket(@receiver)
|
||||
return true
|
||||
end
|
||||
|
||||
# Send a message. This method queues the message in the outbound
|
||||
# message queue. To actually make the message get sent on the wire
|
||||
# you need to call MessageSocketMux#sendrecv or MessageSocketMux#run
|
||||
#
|
||||
# If you are implementing a client, you can omit the 'sock' argument
|
||||
# because it will automatically send to the server you connected to.
|
||||
# If a socket is given, send to that specific socket.
|
||||
def sendmsg(msg, sock=nil)
|
||||
@writelock.synchronize do
|
||||
_sendmsg(msg, sock)
|
||||
end
|
||||
end
|
||||
|
||||
# Run indefinitely.
|
||||
# Ending conditions are when there are no sockets left open.
|
||||
# If you want to terminate the server (#listen or #connect) then
|
||||
# call MessageSocketMux#close
|
||||
def run
|
||||
while !@done
|
||||
sendrecv(nil)
|
||||
end
|
||||
end
|
||||
|
||||
# Wait for network data (input and output) for the given timeout
|
||||
# If timeout is nil, we will wait until there is data.
|
||||
# If timeout is a positive number, we will wait that number of seconds
|
||||
# or until there is data - whichever is first.
|
||||
# If timeout is zero, we will not wait at all for data.
|
||||
#
|
||||
# Returns true if there was network data handled, false otherwise.
|
||||
def sendrecv(timeout=nil)
|
||||
writers = @writers.select { |w| @msgoutstreams.has_key?(w) }
|
||||
had_receiver = @receiver != nil
|
||||
s_in, s_out, s_err = IO.select(@readers, writers, nil, timeout)
|
||||
|
||||
handle_in(s_in) if s_in
|
||||
handle_out(s_out) if s_out
|
||||
|
||||
# If we had a client (via connect()) before, but we don't now,
|
||||
# raise an exception so the client can make a decision.
|
||||
if (had_receiver and @receiver == nil)
|
||||
raise MessageClientConnectionReset
|
||||
end
|
||||
|
||||
# Return true if we got data at all.
|
||||
return (s_in != nil or s_out != nil)
|
||||
end
|
||||
|
||||
def close
|
||||
if @receiver
|
||||
@receiver_done = true
|
||||
# Don't close our writer yet. Wait until our outbound queue is empty.
|
||||
end
|
||||
|
||||
if @server
|
||||
@server_done = true
|
||||
# Stop accepting new connections.
|
||||
remove_reader(@server)
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
def add_socket(sock)
|
||||
@readers << sock
|
||||
#@writers << sock
|
||||
end
|
||||
|
||||
private
|
||||
def _sendmsg(msg, sock=nil)
|
||||
if msg == nil
|
||||
raise "msg is nil"
|
||||
end
|
||||
|
||||
# Handle if 'msg' is actually an array of messages
|
||||
if msg.is_a?(Array)
|
||||
msg.each do |m|
|
||||
_sendmsg(m, sock)
|
||||
end
|
||||
return
|
||||
end
|
||||
|
||||
if msg.is_a?(RequestMessage) and msg.id == nil
|
||||
msg.generate_id!
|
||||
end
|
||||
|
||||
sock = (sock or @receiver)
|
||||
if sock == nil
|
||||
raise NoSocket
|
||||
end
|
||||
if !@writers.include?(sock)
|
||||
@writers << sock
|
||||
@signal.write("x")
|
||||
end
|
||||
@msgoutstreams[sock] << msg
|
||||
|
||||
@ackwait << msg.id
|
||||
end # def _sendmsg
|
||||
|
||||
private
|
||||
def remove_writer(sock)
|
||||
puts "remove writer: #{caller[0]}"
|
||||
@writers.delete(sock)
|
||||
@msgoutstreams.delete(sock)
|
||||
@receiver = nil if sock == @receiver
|
||||
sock.close_write() rescue nil # Ignore close errors
|
||||
check_done
|
||||
end # def remove_writer
|
||||
|
||||
private
|
||||
def remove_reader(sock)
|
||||
puts "remove reader: #{caller[0]}"
|
||||
@readers.delete(sock)
|
||||
@msgreaders.delete(sock)
|
||||
@receiver = nil if sock == @receiver
|
||||
sock.close_read() rescue nil # Ignore close errors
|
||||
check_done
|
||||
end # def remove_reader
|
||||
|
||||
private
|
||||
def remove(sock)
|
||||
remove_writer(sock)
|
||||
remove_reader(sock)
|
||||
end; # def remove
|
||||
|
||||
private
|
||||
def check_done
|
||||
@done = (@writers.length == 0 and @readers.length == 0 and
|
||||
@receiver_done or @server_done)
|
||||
end # def check_done
|
||||
|
||||
private
|
||||
def handle_in(socks)
|
||||
socks.each do |sock|
|
||||
if sock == @server
|
||||
server_handle(sock)
|
||||
elsif sock == @signal_observer
|
||||
# clear signal
|
||||
@signal_observer.sysread(1)
|
||||
else
|
||||
client_handle(sock)
|
||||
end
|
||||
end
|
||||
end # def handle_in
|
||||
|
||||
private
|
||||
def handle_out(socks)
|
||||
# Lock early in the event we have to handle lots of sockets or messages
|
||||
# Locking too much causes slowdowns.
|
||||
@writelock.synchronize do
|
||||
socks.each do |sock|
|
||||
ms = @msgoutstreams[sock]
|
||||
if ms.message_count == 0
|
||||
if @receiver_done and sock == @receiver
|
||||
remove_writer(sock)
|
||||
end
|
||||
else
|
||||
# There are messages to send...
|
||||
encoded = ms.encode
|
||||
data = [encoded.length, encoded.checksum, encoded].pack("NNA*")
|
||||
len = data.length
|
||||
begin
|
||||
# TODO(sissel): use nonblocking writes and keep track of what
|
||||
# data has been written successfully.
|
||||
bytes = sock.write(data)
|
||||
rescue Errno::ECONNRESET, Errno::EPIPE => e
|
||||
$stderr.puts "write error, dropping connection (#{e})"
|
||||
remove(sock)
|
||||
end
|
||||
ms.clear
|
||||
|
||||
# We flushed, remove this writer from the list of things
|
||||
# we care to write to, for now.
|
||||
@writers.delete(sock)
|
||||
end # else / ms.message_count == 0
|
||||
end # socks.each
|
||||
end # @writelock.synchronize
|
||||
end # def handle_out
|
||||
|
||||
private
|
||||
def server_handle(sock)
|
||||
client = sock.accept_nonblock
|
||||
add_socket(client)
|
||||
end # def server_handle
|
||||
|
||||
private
|
||||
def client_handle(sock)
|
||||
begin
|
||||
@msgreaders[sock].each do |msg|
|
||||
message_handle(msg, sock) do |response|
|
||||
_sendmsg(response, sock)
|
||||
end
|
||||
end
|
||||
rescue EOFError, IOError, Errno::ECONNRESET => e
|
||||
remove_reader(sock)
|
||||
if sock == @receiver
|
||||
raise MessageClientConnectionReset
|
||||
end
|
||||
end
|
||||
end # def client_handle
|
||||
|
||||
private
|
||||
def message_handle(msg, sock)
|
||||
if msg.is_a?(ResponseMessage) and @ackwait.include?(msg.id)
|
||||
@ackwait.delete(msg.id)
|
||||
end
|
||||
|
||||
msgtype = msg.class.name.split(":")[-1]
|
||||
handler = "#{msgtype}Handler"
|
||||
if self.respond_to?(handler)
|
||||
Thread.new do
|
||||
self.send(handler, msg) do |reply|
|
||||
#yield reply if reply != nil
|
||||
sendmsg(reply, sock)
|
||||
end
|
||||
end
|
||||
else
|
||||
$stderr.puts "No handler for message class '#{msg.class.name}'"
|
||||
end
|
||||
end # def message_handle
|
||||
end # class MessageSocketMux
|
||||
end; end # module LogStash::Net
|
Loading…
Add table
Add a link
Reference in a new issue