From 173840a8270ec65ae34b2a01d521569f5505f712 Mon Sep 17 00:00:00 2001 From: Jordan Sissel Date: Tue, 18 Aug 2009 09:01:39 +0000 Subject: [PATCH] - Refactor network code to use EventMachine Message decoding is working, but I think we're silently dropping data somewhere. --- bin/agent.rb | 34 +---- bin/logstashd.rb | 5 +- lib/net/client.rb | 29 +++- lib/net/common.rb | 12 ++ lib/net/messagepacket.rb | 62 ++++++++ lib/net/server.rb | 22 ++- lib/net/servers/indexer.rb | 8 +- lib/net/socket.rb | 71 ++++++++++ lib/net/socketmux.rb | 281 ------------------------------------- 9 files changed, 202 insertions(+), 322 deletions(-) create mode 100644 lib/net/messagepacket.rb create mode 100644 lib/net/socket.rb delete mode 100644 lib/net/socketmux.rb diff --git a/bin/agent.rb b/bin/agent.rb index 657238912..7a8758e06 100755 --- a/bin/agent.rb +++ b/bin/agent.rb @@ -9,7 +9,7 @@ require 'socket' class Agent < LogStash::Net::MessageClient def initialize(host, port) - super() + super(host, port) @hostname = Socket.gethostname @host = host @port = port @@ -45,7 +45,7 @@ class Agent < LogStash::Net::MessageClient $stdout.write(".") $stdout.flush #puts "Trying to send: #{ier.inspect}" - sendmsg(ier) + @connection.sendmsg(ier) sent = true rescue LogStash::Net::NoSocket # No client connection available, wait. @@ -59,31 +59,6 @@ class Agent < LogStash::Net::MessageClient if msg.success? end end # def IndexEventResponseHandler - - def run - loop do - done = false - while !done - begin - done = connect(@host, @port); - rescue Errno::ECONNREFUSED => e - puts "Connection to #{@host}:#{@port} failed: #{e}" - puts "Sleeping for retry." - sleep 1 - end - end - puts "Connection OK" - - begin - loop do - sendrecv(nil) - end - rescue LogStash::Net::MessageClientConnectionReset - puts "Connection died, retrying..." - end - end - end # def run - end @@ -94,5 +69,8 @@ if $0 == __FILE__ end host, port = ARGV[0].split(":") agent = Agent.new(host, port) - agent.run + + agent.run do |i| + # nothing + end end diff --git a/bin/logstashd.rb b/bin/logstashd.rb index ca632646e..81a32c929 100644 --- a/bin/logstashd.rb +++ b/bin/logstashd.rb @@ -2,8 +2,9 @@ # require "rubygems" -require "lib/net/socketmux" require "lib/net/servers/indexer" s = LogStash::Net::Servers::Indexer.new -s.run +s.run do |i| + puts "OK" +end diff --git a/lib/net/client.rb b/lib/net/client.rb index 226676aac..67ebfb8b2 100644 --- a/lib/net/client.rb +++ b/lib/net/client.rb @@ -1,9 +1,28 @@ -require 'lib/net/socketmux' +require 'rubygems' +require 'eventmachine' +require 'lib/net/socket' +require 'lib/net/messages/ping.rb' module LogStash; module Net - # The MessageClient class exists only as an alias - # to the MessageSocketMux. You should use the - # client class if you are implementing a client. - class MessageClient < MessageSocketMux + class MessageClient + attr_reader :connection + + def initialize(host, port) + @host = host + @port = port + end + + def run + EventMachine.run do + connect(@host, @port) + end + end + + def connect(host, port) + @connection = EventMachine::connect(host, port, MessageSocket) do |m| + m.handler = self + end + end + end # class MessageClient end; end # module LogStash::Net diff --git a/lib/net/common.rb b/lib/net/common.rb index 27be811e8..3b4ccf2c0 100644 --- a/lib/net/common.rb +++ b/lib/net/common.rb @@ -3,6 +3,18 @@ require 'zlib' module LogStash; module Net; MAXMSGLEN = (1 << 20) # one megabyte message blocks + class MessageCorrupt < StandardError + attr_reader :expected_checksum + attr_reader :data + + def initialize(checksum, data) + @expected_checksum = checksum + @data = data + super("Corrupt message read. Expected checksum #{checksum}, got " + + "#{data.checksum}") + end # def initialize + end # class MessageReaderCorruptMessage + end; end # module LogStash::Net # Add adler32 checksum from Zlib to String class diff --git a/lib/net/messagepacket.rb b/lib/net/messagepacket.rb new file mode 100644 index 000000000..da19bc560 --- /dev/null +++ b/lib/net/messagepacket.rb @@ -0,0 +1,62 @@ +require 'rubygems' +require 'lib/net/common' +require 'json' + +module LogStash; module Net + class MessagePacket + # 4 byte length + # 4 byte checksum + HEADERSIZE = 8 + + def self.each(data) + done = false + while !done + have = data.length + need = HEADERSIZE + if have >= need + need = data.unpack("N")[0] + HEADERSIZE + if have >= need + yield MessagePacket.new_from_encoded(data[0 .. need - 1]) + else + done = true + end + else + done = true + end + data[0 .. need - 1] = "" + end + end + + def self.new_from_encoded(string) + len = string.unpack("N")[0] + len, checksum, data = string.unpack("NNA#{len}") + + return MessagePacket.new(data, len=len, checksum=checksum) + end + + def initialize(data, len=nil, checksum=nil) + @content = data + @length = (len or data.length) + @checksum = (checksum or data.checksum) + + verify if length and checksum + end + + def verify + if (@content.checksum != @checksum or @content.length != @length) + $stderr.puts "FAIL" + raise MessageCorrupt.new(@checksum, @content) + end + end + + def encode + return [@length, @checksum, @content].pack("NNA*") + end + + public + attr_reader :length + attr_reader :content + attr_reader :checksum + + end # class MessagePacket +end; end # module LogStash::Net diff --git a/lib/net/server.rb b/lib/net/server.rb index cfa16dd83..747536367 100644 --- a/lib/net/server.rb +++ b/lib/net/server.rb @@ -1,9 +1,27 @@ -require 'lib/net/socketmux' +require 'rubygems' +require 'eventmachine' +require 'lib/net/socket' module LogStash; module Net # The MessageServer class exists only as an alias # to the MessageSocketMux. You should use the # client class if you are implementing a client. - class MessageServer < MessageSocketMux + class MessageServer + def initialize(host, port) + @host = host + @port = port + end + + def run + EventMachine.run do + listen(@host, @port) + end + end + + def listen(host, port) + EventMachine::start_server(host, port, MessageSocket) do |m| + m.handler = self + end + end end # class MessageServer end; end # module LogStash::Net diff --git a/lib/net/servers/indexer.rb b/lib/net/servers/indexer.rb index bf1e67033..a2bba6184 100644 --- a/lib/net/servers/indexer.rb +++ b/lib/net/servers/indexer.rb @@ -17,8 +17,7 @@ module LogStash; module Net; module Servers def initialize(addr="0.0.0.0", port=3001) # 'super' is not the same as 'super()', and we want super(). - super() - listen(addr, port) + super(addr, port) @indexes = Hash.new @lines = Hash.new { |h,k| h[k] = 0 } @indexcount = 0 @@ -29,7 +28,8 @@ module LogStash; module Net; module Servers response.id = request.id @indexcount += 1 - print "\rK#{@indexcount}" + print "\rK#{@indexcount} (vs #{request.id})" + #puts "#{@indexcount} (id: #{request.id})" log_type = request.log_type entry = $logs[log_type].parse_entry(request.log_data) @@ -108,7 +108,7 @@ module LogStash; module Net; module Servers end # Special 'run' override because we want sync to disk once per minute. - def run + def _run synctime = Time.now + SYNCDELAY sleeptime = 1 loop do diff --git a/lib/net/socket.rb b/lib/net/socket.rb new file mode 100644 index 000000000..62eb78032 --- /dev/null +++ b/lib/net/socket.rb @@ -0,0 +1,71 @@ +require 'rubygems' +require 'lib/net/messagepacket' +require 'eventmachine' + +module LogStash; module Net + # The MessageClient class exists only as an alias + # to the MessageSocketMux. You should use the + # client class if you are implementing a client. + class MessageSocket < EventMachine::Connection + # connection init callback from EventMachine::Connection + def post_init + #set_comm_inactivity_timeout(30) + @buffer = "" + end + + # data receiver callback from EventMachine::Connection + def receive_data(data) + @buffer += data + + len = 0 + count = 0 + MessagePacket.each(@buffer) do |packet| + len += packet.length + count += 1 + obj = JSON::load(packet.content) + msg = Message.new_from_data(obj) + + if !@handler + $stderr.puts "No message handler set. Can't handle #{msg.class.name}" + next + end + + name = msg.class.name.split(":")[-1] + func = "#{name}Handler" + if @handler.respond_to?(func): + operation = lambda do + @handler.send(func, msg) do |response| + sendmsg(response) + end + end + EventMachine.defer(operation, nil) + #@handler.send(func, msg) do |response| + #sendmsg(response) + #end + else + $stderr.puts "#{@handler.class.name} does not support #{func}" + end + end + + if len > 0 + puts "Removing #{len} bytes (#{count} packets)" + @buffer[0 .. len - 1] = "" + end + end # def receive_data + + def sendmsg(msg) + if msg.is_a?(RequestMessage) and msg.id == nil + msg.generate_id! + end + + data = msg.to_json + packet = MessagePacket.new(data) + #puts "Sending: #{packet.encode.inspect}" + send_data(packet.encode) + end + + def handler=(obj) + @handler = obj + end + end # class MessageSocket +end; end # module LogStash::Net diff --git a/lib/net/socketmux.rb b/lib/net/socketmux.rb deleted file mode 100644 index 70678687c..000000000 --- a/lib/net/socketmux.rb +++ /dev/null @@ -1,281 +0,0 @@ -require 'lib/net/common' -require 'lib/net/message' -require 'lib/net/messagestream' -require 'lib/net/messagereader' -require 'set' -require 'socket' -require 'thread' -require 'time' - -module LogStash; module Net - class MessageClientConnectionReset < StandardError; end - class NoSocket < StandardError; end - - class MessageSocketMux - def initialize - @writelock = Mutex.new - @server = nil - @receiver = nil - - # signal and signal observer are for allowing us to break - # out of the select() call whenever sendmsg() is invoked. - # sendmsg() puts a new writer on the list of @writers and we need - # to rerun the select() to pick that change up. - # We maybe should switch to EventMachine (like libevent) for - # doing this event handling nonsense for us. - @signal, @signal_observer = Socket::socketpair(Socket::PF_LOCAL, - Socket::SOCK_DGRAM, 0) - - # server_done is unused right now - @server_done = false - @receiver_done = false - - # Socket list for readers and writers - @readers = [@signal_observer] - @writers = [] - - @msgoutstreams = Hash.new do - |h,k| h[k] = LogStash::Net::MessageStream.new - end - @msgreaders = Hash.new do |h,k| - h[k] = LogStash::Net::MessageReader.new(k) - end - - @ackwait = Set.new - @done = false - end - - # Set up a server and listen on a port - def listen(addr="0.0.0.0", port=0) - @server = TCPServer.new(addr, port) - @readers << @server - end - - # Connect to a remote server - def connect(addr="0.0.0.0", port=0) - @receiver = TCPSocket.new(addr, port) - add_socket(@receiver) - return true - end - - # Send a message. This method queues the message in the outbound - # message queue. To actually make the message get sent on the wire - # you need to call MessageSocketMux#sendrecv or MessageSocketMux#run - # - # If you are implementing a client, you can omit the 'sock' argument - # because it will automatically send to the server you connected to. - # If a socket is given, send to that specific socket. - def sendmsg(msg, sock=nil) - @writelock.synchronize do - _sendmsg(msg, sock) - end - end - - # Run indefinitely. - # Ending conditions are when there are no sockets left open. - # If you want to terminate the server (#listen or #connect) then - # call MessageSocketMux#close - def run - while !@done - sendrecv(nil) - end - end - - # Wait for network data (input and output) for the given timeout - # If timeout is nil, we will wait until there is data. - # If timeout is a positive number, we will wait that number of seconds - # or until there is data - whichever is first. - # If timeout is zero, we will not wait at all for data. - # - # Returns true if there was network data handled, false otherwise. - def sendrecv(timeout=nil) - writers = @writers.select { |w| @msgoutstreams.has_key?(w) } - had_receiver = @receiver != nil - s_in, s_out, s_err = IO.select(@readers, writers, nil, timeout) - - handle_in(s_in) if s_in - handle_out(s_out) if s_out - - # If we had a client (via connect()) before, but we don't now, - # raise an exception so the client can make a decision. - if (had_receiver and @receiver == nil) - raise MessageClientConnectionReset - end - - # Return true if we got data at all. - return (s_in != nil or s_out != nil) - end - - def close - if @receiver - @receiver_done = true - # Don't close our writer yet. Wait until our outbound queue is empty. - end - - if @server - @server_done = true - # Stop accepting new connections. - remove_reader(@server) - end - end - - private - def add_socket(sock) - @readers << sock - #@writers << sock - end - - private - def _sendmsg(msg, sock=nil) - if msg == nil - raise "msg is nil" - end - - # Handle if 'msg' is actually an array of messages - if msg.is_a?(Array) - msg.each do |m| - _sendmsg(m, sock) - end - return - end - - if msg.is_a?(RequestMessage) and msg.id == nil - msg.generate_id! - end - - sock = (sock or @receiver) - if sock == nil - raise NoSocket - end - if !@writers.include?(sock) - @writers << sock - @signal.write("x") - end - @msgoutstreams[sock] << msg - - @ackwait << msg.id - end # def _sendmsg - - private - def remove_writer(sock) - puts "remove writer: #{caller[0]}" - @writers.delete(sock) - @msgoutstreams.delete(sock) - @receiver = nil if sock == @receiver - sock.close_write() rescue nil # Ignore close errors - check_done - end # def remove_writer - - private - def remove_reader(sock) - puts "remove reader: #{caller[0]}" - @readers.delete(sock) - @msgreaders.delete(sock) - @receiver = nil if sock == @receiver - sock.close_read() rescue nil # Ignore close errors - check_done - end # def remove_reader - - private - def remove(sock) - remove_writer(sock) - remove_reader(sock) - end; # def remove - - private - def check_done - @done = (@writers.length == 0 and @readers.length == 0 and - @receiver_done or @server_done) - end # def check_done - - private - def handle_in(socks) - socks.each do |sock| - if sock == @server - server_handle(sock) - elsif sock == @signal_observer - # clear signal - @signal_observer.sysread(1) - else - client_handle(sock) - end - end - end # def handle_in - - private - def handle_out(socks) - # Lock early in the event we have to handle lots of sockets or messages - # Locking too much causes slowdowns. - @writelock.synchronize do - socks.each do |sock| - ms = @msgoutstreams[sock] - if ms.message_count == 0 - if @receiver_done and sock == @receiver - remove_writer(sock) - end - else - # There are messages to send... - encoded = ms.encode - data = [encoded.length, encoded.checksum, encoded].pack("NNA*") - len = data.length - begin - # TODO(sissel): use nonblocking writes and keep track of what - # data has been written successfully. - bytes = sock.write(data) - rescue Errno::ECONNRESET, Errno::EPIPE => e - $stderr.puts "write error, dropping connection (#{e})" - remove(sock) - end - ms.clear - - # We flushed, remove this writer from the list of things - # we care to write to, for now. - @writers.delete(sock) - end # else / ms.message_count == 0 - end # socks.each - end # @writelock.synchronize - end # def handle_out - - private - def server_handle(sock) - client = sock.accept_nonblock - add_socket(client) - end # def server_handle - - private - def client_handle(sock) - begin - @msgreaders[sock].each do |msg| - message_handle(msg, sock) do |response| - _sendmsg(response, sock) - end - end - rescue EOFError, IOError, Errno::ECONNRESET => e - remove_reader(sock) - if sock == @receiver - raise MessageClientConnectionReset - end - end - end # def client_handle - - private - def message_handle(msg, sock) - if msg.is_a?(ResponseMessage) and @ackwait.include?(msg.id) - @ackwait.delete(msg.id) - end - - msgtype = msg.class.name.split(":")[-1] - handler = "#{msgtype}Handler" - if self.respond_to?(handler) - Thread.new do - self.send(handler, msg) do |reply| - #yield reply if reply != nil - sendmsg(reply, sock) - end - end - else - $stderr.puts "No handler for message class '#{msg.class.name}'" - end - end # def message_handle - end # class MessageSocketMux -end; end # module LogStash::Net