diff --git a/lib/logstash/inputs/relp.rb b/lib/logstash/inputs/relp.rb index 2ec67890b..6f98b1c9b 100644 --- a/lib/logstash/inputs/relp.rb +++ b/lib/logstash/inputs/relp.rb @@ -28,7 +28,7 @@ class LogStash::Inputs::Relp < LogStash::Inputs::Base public def register @logger.info("Starting relp input listener", :address => "#{@host}:#{@port}") - @server_socket = RelpServer.new(@host, @port) + @server_socket = RelpServer.new(@host, @port,['syslog']) end # def register private @@ -47,22 +47,32 @@ class LogStash::Inputs::Relp < LogStash::Inputs::Base public def run(output_queue) loop do - # Start a new thread for each connection. - Thread.start(@server_socket.accept) do |rs| - begin - relp_stream(@server_socket,output_queue,"relp://#{@host}:#{@port}/#{rs.peer}") - rescue Relp::ConnectionClosed => e - @logger.debug('Relp Connection to #{rs.peer} Closed') - rescue Relp::RelpError => e - @logger.warn('Relp error: '+e.class.to_s+' '+e.message) - #TODO: Still not happy with this, are they all warn level? - #Will this catch everything I want it to? - #Relp spec says to close connection on error, ensure this is the case - rs.serverclose - end - #Let garbage collection clear up after us - rs=nil - end # Thread.start + begin + # Start a new thread for each connection. + Thread.start(@server_socket.accept) do |rs| + begin + relp_stream(@server_socket,output_queue,"relp://#{@host}:#{@port}/#{rs.peer}") + rescue Relp::ConnectionClosed => e + @logger.debug('Relp Connection to #{rs.peer} Closed') + rescue Relp::RelpError => e + @logger.warn('Relp error: '+e.class.to_s+' '+e.message) + #TODO: Still not happy with this, are they all warn level? + #Will this catch everything I want it to? + #Relp spec says to close connection on error, ensure this is the case + rs.serverclose + end + #Let garbage collection clear up after us + rs=nil + end # Thread.start + rescue Relp::InsufficientCommands#TODO: why didn't it work when I included this is the same block as the other rescues? + @logger.warn('Relp client incapable of syslog') + end end # loop end # def run + + #TODO: Make sure this is doing the job properly + def teardown + @server_socket.serverclose + finished + end end # class LogStash::Inputs::Relp diff --git a/lib/logstash/util/relp.rb b/lib/logstash/util/relp.rb index c9a02000c..e47da87a3 100644 --- a/lib/logstash/util/relp.rb +++ b/lib/logstash/util/relp.rb @@ -1,27 +1,34 @@ require "socket" +#TODO: remove before release +require "pry" + class Relp#This isn't much use on its own, but gives RelpServer and RelpClient things RelpVersion='0'#TODO: spec says this is experimental, but rsyslog still seems to exclusively use it RelpSoftware='logstash,1.1.1,http://logstash.net'#TODO: this is a placeholder for now - RelpCommands=['syslog']#TODO: If this becomes a separate gem, make this variable, define required and optional ones class RelpError < StandardError; end class InvalidCommand < RelpError; end class InappropriateCommand < RelpError; end - class ConnectionClosed < RelpError; end + class ConnectionClosed < RelpError; end + class InsufficientCommands < RelpError; end - def self.valid_command?(command) + def valid_command?(command) valid_commands=Array.new - #Allow anything in the basic protocol + #Allow anything in the basic protocol for both directions valid_commands << 'open' valid_commands << 'close' - valid_commands << 'rsp' - #Allow anything we offered to accept - valid_commands += RelpCommands - #Don't accept serverclose or rsp as valid commands because this is the server TODO: generalise + #These are things that are part of the basic protocol, but only valid in one direction (rsp, close etc.) TODO: would they be invalid or just innapropriate? + valid_commands += @basic_relp_commands + + #These are extra commands that we require, otherwise refuse the connection TODO: some of these are only valid on one direction + valid_commands += @required_relp_commands + + #TODO: optional_relp_commands + #TODO: vague mentions of abort and starttls commands in spec need looking into return valid_commands.include?(command) end @@ -64,7 +71,7 @@ class Relp#This isn't much use on its own, but gives RelpServer and RelpClient t rescue EOFError raise ConnectionClosed #TODO: ECONNRESET end - if ! Relp.valid_command?(frame['command'])#TODO: is this enough to catch framing errors? + if ! self.valid_command?(frame['command'])#TODO: is this enough to catch framing errors? raise InvalidCommand,frame['command'] end return frame @@ -78,7 +85,14 @@ class RelpServer < Relp @socket.peeraddr[3]#TODO: is this the best thing to report? end - def initialize(host,port) + def initialize(host,port,required_commands=[]) + + #These are things that are part of the basic protocol, but only valid in one direction (rsp, close etc.) + @basic_relp_commands=['close']#TODO: check for others + + #These are extra commands that we require, otherwise refuse the connection + @required_relp_commands = required_commands + @server=TCPServer.new(host,port) end @@ -91,11 +105,12 @@ class RelpServer < Relp #if no version specified, relp spec says we must close connection self.serverclose raise RelpError, 'No relp_version specified' - elsif ! offer['commands'].split(',').include?('syslog') + #subtracting one array from the other checks to see if all elements in @required_relp_commands are present in the offer + elsif ! (@required_relp_commands - offer['commands'].split(',')).empty? #if it can't send us syslog it's useless to us; close the connection #TODO:Generalise relp class and make this optional (related to RelpCommands) self.serverclose - raise RelpError, 'Relp client incapable of syslog' + raise InsufficientCommands, offer['commands']+' offered, require '+@required_relp_commands.join(',') else #attempt to set up connection response_frame=Hash.new @@ -105,7 +120,7 @@ class RelpServer < Relp response_frame['message']='200 OK ' response_frame['message']+='relp_version='+RelpVersion+"\n" response_frame['message']+='relp_software='+RelpSoftware+"\n" - response_frame['message']+='commands='+RelpCommands.join(',') + response_frame['message']+='commands='+@required_relp_commands.join(',')#TODO: optional ones self.frame_write(response_frame) return self end @@ -141,7 +156,8 @@ class RelpServer < Relp frame['command']='serverclose' begin self.frame_write(frame) - @socket.close#TODO: shutdown? + @socket.shutdown + @socket.close#TODO: This isn't actually freeing up the port as shown by test_relp rescue#This catches the possibility of the client having already closed the connection end end @@ -157,14 +173,23 @@ end class RelpClient < Relp - def initialize(host,port) +# @relpcommands=['syslog']#TODO: If this becomes a separate gem, make this variable, define required and optional ones + + def initialize(host,port,required_commands=[]) + + #These are things that are part of the basic protocol, but only valid in one direction (rsp, close etc.) + @basic_relp_commands=['serverclose','rsp']#TODO: check for others + + #These are extra commands that we require, otherwise refuse the connection + @required_relp_commands = required_commands + @socket=TCPSocket.new(host,port) offer=Hash.new offer['txnr']=1 offer['command']='open' offer['message']='relp_version='+RelpVersion+"\n" offer['message']+='relp_software='+RelpSoftware+"\n" - offer['message']+='commands='+RelpCommands.join(',') + offer['message']+='commands='+@required_relp_commands.join(',')#TODO: add optional ones self.frame_write(offer) response_frame=self.frame_read raise RelpError if response_frame['message'][0,3]!='200' @@ -173,11 +198,13 @@ class RelpClient < Relp #if no version specified, relp spec says we must close connection self.close raise RelpError, 'No relp_version specified; offer: ',response_frame['message'][6..-1].scan(/^(.*)=(.*)$/).flatten - elsif ! response['commands'].split(',').include?('syslog') + + #subtracting one array from the other checks to see if all elements in @required_relp_commands are present in the offer + elsif ! (@required_relp_commands - response['commands'].split(',')).empty? #if it can't receive syslog it's useless to us; close the connection #TODO:Generalise relp class and make this optional (related to RelpCommands) self.close - raise RelpError, 'Relp server incapable of syslog' + raise InsufficientCommands, response['commands']+' offered, require '+@required_relp_commands.join(',') end #If we've got this far with no problems, we're good to go @lasttxnr=1 diff --git a/test/logstash/inputs/test_relp.rb b/test/logstash/inputs/test_relp.rb index 43c18cbf1..7f7653e21 100644 --- a/test/logstash/inputs/test_relp.rb +++ b/test/logstash/inputs/test_relp.rb @@ -10,6 +10,10 @@ require "logstash/util/relp" require "mocha" +#TODO: remove before release +require "pry" + + #TODO: I just copy/pasted all those^ which ones do I actually need? describe LogStash::Inputs::Relp do @@ -33,7 +37,7 @@ describe LogStash::Inputs::Relp do sleep(2) begin - rc=RelpClient.new('127.0.0.1',15515) + rc=RelpClient.new('127.0.0.1',15515,['syslog']) rc.syslog_write('This is the first relp test message') rc.syslog_write('This is the second relp test message') rc.syslog_write('This is the third relp test message') @@ -68,4 +72,12 @@ describe LogStash::Inputs::Relp do end end + test "RelpServer rejects invalid/innapropriate commands" do + #TODO: + end + + test "RelpServer refuses to connect if no syslog command available" do + #TODO: + end + end # testing for LogStash::Inputs::File