diff --git a/lib/logstash/inputs/relp.rb b/lib/logstash/inputs/relp.rb new file mode 100644 index 000000000..6f9a444aa --- /dev/null +++ b/lib/logstash/inputs/relp.rb @@ -0,0 +1,77 @@ +require "logstash/inputs/base" +require "logstash/namespace" +require "logstash/util/relp" + + +# Read RELP events over a TCP socket. +# +#Application level acknowledgements allow assurance of no message loss. +# +#This only functions as far as messages being put into the queue for filters- +# anything lost after that point will not be retransmitted +class LogStash::Inputs::Relp < LogStash::Inputs::Base + + config_name "relp" + plugin_status "experimental" + + # The address to listen on. + config :host, :validate => :string, :default => "0.0.0.0" + + # The port to listen on. + config :port, :validate => :number, :required => true + + def initialize(*args) + super(*args) + end # def initialize + + public + def register + @logger.info("Starting relp input listener", :address => "#{@host}:#{@port}") + @relp_server = RelpServer.new(@host, @port,['syslog']) + end # def register + + private + def relp_stream(relpsocket,output_queue,event_source) + loop do + frame = relpsocket.syslog_read + event = self.to_event(frame['message'],event_source) + output_queue << event + #To get this far, the message must have made it into the queue for + #filtering. I don't think it's possible to wait for output before ack + #without fundamentally breaking the plugin architecture + relpsocket.ack(frame['txnr']) + end + end + + public + def run(output_queue) + loop do + begin + # Start a new thread for each connection. + Thread.start(@relp_server.accept) do |rs| + @logger.debug("Relp Connection to #{rs.peer} created") + begin + relp_stream(rs,output_queue,"relp://#{@host}:#{@port}/#{rs.peer}") + rescue Relp::ConnectionClosed => e + @logger.debug("Relp Connection to #{rs.peer} Closed") + rescue Relp::RelpError => e + @logger.warn('Relp error: '+e.class.to_s+' '+e.message) + #TODO: Still not happy with this, are they all warn level? + #Will this catch everything I want it to? + #Relp spec says to close connection on error, ensure this is the case + end + end # Thread.start + rescue Relp::InvalidCommand,Relp::InappropriateCommand => e + @logger.warn('Relp client trying to open connection with something other than open:'+e.message) + rescue Relp::InsufficientCommands + @logger.warn('Relp client incapable of syslog') + end + end # loop + end # def run + + def teardown + @relp_server.shutdown + end +end # class LogStash::Inputs::Relp + +#TODO: structured error logging diff --git a/lib/logstash/util/relp.rb b/lib/logstash/util/relp.rb new file mode 100644 index 000000000..0aed09a9c --- /dev/null +++ b/lib/logstash/util/relp.rb @@ -0,0 +1,321 @@ +require "socket" + +class Relp#This isn't much use on its own, but gives RelpServer and RelpClient things + + RelpVersion = '0'#TODO: spec says this is experimental, but rsyslog still seems to exclusively use it + RelpSoftware = 'logstash,1.1.1,http://logstash.net' + + class RelpError < StandardError; end + class InvalidCommand < RelpError; end + class InappropriateCommand < RelpError; end + class ConnectionClosed < RelpError; end + class InsufficientCommands < RelpError; end + + def valid_command?(command) + valid_commands = Array.new + + #Allow anything in the basic protocol for both directions + valid_commands << 'open' + valid_commands << 'close' + + #These are things that are part of the basic protocol, but only valid in one direction (rsp, close etc.) TODO: would they be invalid or just innapropriate? + valid_commands += @basic_relp_commands + + #These are extra commands that we require, otherwise refuse the connection TODO: some of these are only valid on one direction + valid_commands += @required_relp_commands + + #TODO: optional_relp_commands + + #TODO: vague mentions of abort and starttls commands in spec need looking into + return valid_commands.include?(command) + end + + def frame_write(frame) + unless self.server? #I think we have to trust a server to be using the correct txnr + #Only allow txnr to be 0 or be determined automatically + frame['txnr'] = self.nexttxnr unless frame['txnr']==0 + end + frame['txnr'] = frame['txnr'].to_s + frame['message'] = '' if frame['message'].nil? + frame['datalen'] = frame['message'].length.to_s + wiredata=[ + frame['txnr'], + frame['command'], + frame['datalen'], + frame['message'] + ].join(' ').strip + begin + @socket.write(wiredata) + #Ending each frame with a newline is required in the specifications + #Doing it a separately is useful (but a bit of a bodge) because + #for some reason it seems to take 2 writes after the server closes the + #connection before we get an exception + @socket.write("\n") + rescue Errno::EPIPE,IOError,Errno::ECONNRESET#TODO: is this sufficient to catch all broken connections? + raise ConnectionClosed + end + frame['txnr'].to_i + end + + def frame_read + begin + frame = Hash.new + frame['txnr'] = @socket.readline(' ').strip.to_i + frame['command'] = @socket.readline(' ').strip + + #Things get a little tricky here because if the length is 0 it is not followed by a space. + leading_digit=@socket.read(1) + if leading_digit=='0' then + frame['datalen'] = 0 + frame['message'] = '' + else + frame['datalen'] = (leading_digit + @socket.readline(' ')).strip.to_i + frame['message'] = @socket.read(frame['datalen']) + end + rescue EOFError,Errno::ECONNRESET,IOError + raise ConnectionClosed + end + if ! self.valid_command?(frame['command'])#TODO: is this enough to catch framing errors? + if self.server? + self.serverclose + else + self.close + end + raise InvalidCommand,frame['command'] + end + return frame + end + + def server? + @server + end + +end + +class RelpServer < Relp + + def peer + if @peer.nil? + @peer = @socket.peeraddr[3]#TODO: is this the best thing to report? I don't think so... + end + @peer + end + + def initialize(host,port,required_commands=[]) + + @server=true + + #These are things that are part of the basic protocol, but only valid in one direction (rsp, close etc.) + @basic_relp_commands = ['close']#TODO: check for others + + #These are extra commands that we require, otherwise refuse the connection + @required_relp_commands = required_commands + + @server=TCPServer.new(host,port)#TODO: rescue if port is already in use (Errno::EADDRINUSE) + end + + def accept + @socket=@server.accept + frame=self.frame_read + if frame['command'] == 'open' + offer=Hash[*frame['message'].scan(/^(.*)=(.*)$/).flatten] + if offer['relp_version'].nil? + #if no version specified, relp spec says we must close connection + self.serverclose + raise RelpError, 'No relp_version specified' + #subtracting one array from the other checks to see if all elements in @required_relp_commands are present in the offer + elsif ! (@required_relp_commands - offer['commands'].split(',')).empty? + #Tell them why we're closing the connection: + response_frame = Hash.new + response_frame['txnr'] = frame['txnr'] + response_frame['command'] = 'rsp' + response_frame['message'] = '500 Required command(s) ' + + (@required_relp_commands - offer['commands'].split(',')).join(',') + + ' not offered' + self.frame_write(response_frame) + + self.serverclose + raise InsufficientCommands, offer['commands'] + + ' offered, require ' + @required_relp_commands.join(',') + else + #attempt to set up connection + response_frame = Hash.new + response_frame['txnr'] = frame['txnr'] + response_frame['command'] = 'rsp' + + response_frame['message'] = '200 OK ' + response_frame['message'] += 'relp_version=' + RelpVersion + "\n" + response_frame['message'] += 'relp_software=' + RelpSoftware + "\n" + response_frame['message'] += 'commands=' + @required_relp_commands.join(',')#TODO: optional ones + self.frame_write(response_frame) + return self + end + else + self.serverclose + raise InappropriateCommand, frame['command'] + ' expecting open' + end + end + + #This does not ack the frame, just reads it + def syslog_read + frame = self.frame_read + if frame['command'] == 'syslog' + return frame + elsif frame['command'] == 'close' + #the client is closing the connection, acknowledge the close and act on it + response_frame = Hash.new + response_frame['txnr'] = frame['txnr'] + response_frame['command'] = 'rsp' + self.frame_write(response_frame) + self.serverclose + raise ConnectionClosed + else + #the client is trying to do something unexpected + self.serverclose + raise InappropriateCommand, frame['command'] + ' expecting syslog' + end + end + + def serverclose + frame = Hash.new + frame['txnr'] = 0 + frame['command'] = 'serverclose' + begin + self.frame_write(frame) + @socket.close + rescue ConnectionClosed + end + end + + def shutdown + @server.shutdown + @server.close + rescue Exception#@server might already be down + end + + def ack(txnr) + frame = Hash.new + frame['txnr'] = txnr + frame['command'] = 'rsp' + frame['message'] = '200 OK' + self.frame_write(frame) + end + +end + +#This is only used by the tests; any problems here are not as important as elsewhere +class RelpClient < Relp + + def initialize(host,port,required_commands = [],buffer_size = 128, + retransmission_timeout=10) + + @server = false + @buffer = Hash.new + + @buffer_size = buffer_size + @retransmission_timeout = retransmission_timeout + + #These are things that are part of the basic protocol, but only valid in one direction (rsp, close etc.) + @basic_relp_commands = ['serverclose','rsp']#TODO: check for others + + #These are extra commands that we require, otherwise refuse the connection + @required_relp_commands = required_commands + + @socket=TCPSocket.new(host,port) + + #This'll start the automatic frame numbering + @lasttxnr = 0 + + offer=Hash.new + offer['command'] = 'open' + offer['message'] = 'relp_version=' + RelpVersion + "\n" + offer['message'] += 'relp_software=' + RelpSoftware + "\n" + offer['message'] += 'commands=' + @required_relp_commands.join(',')#TODO: add optional ones + self.frame_write(offer) + response_frame = self.frame_read + + unless response_frame['message'][0,3] == '200' + raise RelpError,response_frame['message'] + end + + response=Hash[*response_frame['message'][7..-1].scan(/^(.*)=(.*)$/).flatten] + if response['relp_version'].nil? + #if no version specified, relp spec says we must close connection + self.close + raise RelpError, 'No relp_version specified; offer: ' + + response_frame['message'][6..-1].scan(/^(.*)=(.*)$/).flatten + + #subtracting one array from the other checks to see if all elements in @required_relp_commands are present in the offer + elsif ! (@required_relp_commands - response['commands'].split(',')).empty? + #if it can't receive syslog it's useless to us; close the connection + self.close + raise InsufficientCommands, response['commands'] + ' offered, require ' + + @required_relp_commands.join(',') + end + #If we've got this far with no problems, we're good to go + + #This thread deals with responses that come back + reader = Thread.start do |parent| + loop do + f = self.frame_read + if f['command'] == 'rsp' && f['message'] == '200 OK' + @buffer.delete(f['txnr']) + elsif f['command'] == 'rsp' && f['message'][0,1] == '5' + #TODO: What if we get an error for something we're already retransmitted due to timeout? + new_txnr = self.frame_write(@buffer[f['txnr']]) + @buffer[new_txnr] = @buffer[f['txnr']] + @buffer.delete(f['txnr']) + elsif f['command'] == 'serverclose' || f['txnr'] == @close_txnr + parent.raise ConnectionClosed#TODO: raising errors like this makes no sense + else + #Don't know what's going on if we get here, but it can't be good + parent.raise RelpError#TODO: raising errors like this makes no sense + end + end + end + + #While this one deals with frames for which we get no reply + Thread.start do + old_buffer = Hash.new + loop do + #This returns old txnrs that are still present + (@buffer.keys & old_buffer.keys).each do |txnr| + new_txnr = self.frame_write(@buffer[txnr]) + @buffer[new_txnr] = @buffer[txnr] + @buffer.delete(txnr) + end + old_buffer = @buffer + sleep @retransmission_timeout + end + end + end + + #TODO: have a way to get back unacked messages on close + def close + frame = Hash.new + frame['command'] = 'close' + @close_txnr=self.frame_write(frame) + #TODO: ought to properly wait for a reply etc. The serverclose will make it work though + sleep @retransmission_timeout + @socket.close#TODO: shutdown? + return @buffer + end + + def syslog_write(logline) + + #If the buffer is already full, wait until a gap opens up + sleep 0.1 until @buffer.length<@buffer_size + + frame = Hash.new + frame['command'] = 'syslog' + frame['message'] = logline + + txnr = self.frame_write(frame) + @buffer[txnr] = frame + end + + def nexttxnr + @lasttxnr += 1 + end + +end diff --git a/test/logstash/inputs/test_relp.rb b/test/logstash/inputs/test_relp.rb new file mode 100644 index 000000000..ed73f93fd --- /dev/null +++ b/test/logstash/inputs/test_relp.rb @@ -0,0 +1,134 @@ +require "rubygems" +require File.join(File.dirname(__FILE__), "..", "minitest") + +require "logstash/loadlibs" +require "logstash/testcase" +require "logstash/agent" +require "logstash/logging" +require "logstash/inputs/relp" +require "logstash/util/relp" + +require "mocha" + +#TODO: I just copy/pasted all those^ which ones do I actually need? + +describe LogStash::Inputs::Relp do + + before do + #TODO: port 15515 is what I tend to use; pick a default? + @input = LogStash::Inputs::Relp.new("type" => ["relp"], + "host" => ["127.0.0.1"], "port" => [15515]) + @input.register + end # before + + after do + @input.teardown + end # after + + test "Basic handshaking/message transmission" do + queue = Queue.new + thread = Thread.new { @input.run(queue) } + + # Let the input start listening. This is a crappy solution, but until + # plugins can notify "I am ready!" testing will be a bit awkward. + # TODO: could we not pull this from the logger? + sleep(2) + + begin + rc=RelpClient.new('127.0.0.1',15515,['syslog']) + rc.syslog_write('This is the first relp test message') + rc.syslog_write('This is the second relp test message') + rc.syslog_write('This is the third relp test message') + rc.syslog_write('This is the fourth relp test message') + rc.syslog_write('This is the fifth relp test message') + count=5 + + rc.close + + events=[] + + start = Time.new + + # Allow maximum of 2 seconds for events to show up + while (Time.new - start) < 2 && events.size != count + begin + event = queue.pop(true) # don't block + events << event if event + rescue ThreadError => e + # Fail on anything other than "queue empty" + raise e if e.to_s != "queue empty" + sleep(0.05) + end + end + + assert_equal(count, events.size, "Wanted #{count}, but got #{events.size} events") + assert_equal("This is the first relp test message", events.first.message) + assert_equal("This is the fifth relp test message", events.last.message) + + rescue Relp::RelpError => re + flunk re.class.to_s + ': ' + re.to_s#TODO: is there not a proper way to do this? + end + end + + test "RelpServer rejects invalid commands" do + #Need it to close the connection, but not bring down the whole server + queue = Queue.new + thread = Thread.new { @input.run(queue) } + + logger=Queue.new + (@input.instance_eval { @logger }).subscribe(logger) + + assert_raises(Relp::ConnectionClosed) do + rc = RelpClient.new('127.0.0.1',15515,['syslog']) + badframe = Hash.new + badframe['command'] = 'badcommand' + rc.frame_write(badframe) + #We can't detect that it's closed until we try to write to it again + #(delay is required for connection to be closed) + sleep 1 + rc.frame_write(badframe) + end + assert_equal("Relp error: Relp::InvalidCommand badcommand",logger.pop[:message]) + end + + test "RelpServer rejects inappropriate commands" do + #Need it to close the connection, but not bring down the whole server + queue = Queue.new + thread = Thread.new { @input.run(queue) } + + logger = Queue.new + (@input.instance_eval { @logger }).subscribe(logger) + + assert_raises(Relp::ConnectionClosed) do + rc = RelpClient.new('127.0.0.1',15515,['syslog']) + badframe = Hash.new + badframe['command'] = 'open'#it's not expecting open again + rc.frame_write(badframe) + #We can't detect that it's closed until we try to write to it again + #(but with something other than an open) + sleep 1 + badframe['command'] = 'syslog' + rc.frame_write(badframe) + end + assert_equal("Relp error: Relp::InappropriateCommand open expecting syslog", + logger.pop[:message]) + + end + + test "RelpServer refuses to connect if no syslog command available" do + + logger = Queue.new + (@input.instance_eval { @logger }).subscribe(logger) + + assert_raises(Relp::RelpError) do + queue = Queue.new + thread = Thread.new { @input.run(queue) } + rc = RelpClient.new('127.0.0.1',15515) + end + + assert_equal("Relp client incapable of syslog",logger.pop[:message]) + end + +end # testing for LogStash::Inputs::File + +#TODO: structured error logging