From 5db587238556003f8a8031f97ec4f867b563e4ec Mon Sep 17 00:00:00 2001 From: Mike Worth Date: Mon, 30 Jul 2012 17:21:57 +0100 Subject: [PATCH] Made required commands in handshake variables Syslog is no longer hardcoded in as the one required command, you can pass an array to RelpServer or RelpClient on creation. All elements in this array must be offered by the other participant for the handshake to complete. --- lib/logstash/inputs/relp.rb | 44 ++++++++++++--------- lib/logstash/util/relp.rb | 63 ++++++++++++++++++++++--------- test/logstash/inputs/test_relp.rb | 14 ++++++- 3 files changed, 85 insertions(+), 36 deletions(-) diff --git a/lib/logstash/inputs/relp.rb b/lib/logstash/inputs/relp.rb index 2ec67890b..6f98b1c9b 100644 --- a/lib/logstash/inputs/relp.rb +++ b/lib/logstash/inputs/relp.rb @@ -28,7 +28,7 @@ class LogStash::Inputs::Relp < LogStash::Inputs::Base public def register @logger.info("Starting relp input listener", :address => "#{@host}:#{@port}") - @server_socket = RelpServer.new(@host, @port) + @server_socket = RelpServer.new(@host, @port,['syslog']) end # def register private @@ -47,22 +47,32 @@ class LogStash::Inputs::Relp < LogStash::Inputs::Base public def run(output_queue) loop do - # Start a new thread for each connection. - Thread.start(@server_socket.accept) do |rs| - begin - relp_stream(@server_socket,output_queue,"relp://#{@host}:#{@port}/#{rs.peer}") - rescue Relp::ConnectionClosed => e - @logger.debug('Relp Connection to #{rs.peer} Closed') - rescue Relp::RelpError => e - @logger.warn('Relp error: '+e.class.to_s+' '+e.message) - #TODO: Still not happy with this, are they all warn level? - #Will this catch everything I want it to? - #Relp spec says to close connection on error, ensure this is the case - rs.serverclose - end - #Let garbage collection clear up after us - rs=nil - end # Thread.start + begin + # Start a new thread for each connection. + Thread.start(@server_socket.accept) do |rs| + begin + relp_stream(@server_socket,output_queue,"relp://#{@host}:#{@port}/#{rs.peer}") + rescue Relp::ConnectionClosed => e + @logger.debug('Relp Connection to #{rs.peer} Closed') + rescue Relp::RelpError => e + @logger.warn('Relp error: '+e.class.to_s+' '+e.message) + #TODO: Still not happy with this, are they all warn level? + #Will this catch everything I want it to? + #Relp spec says to close connection on error, ensure this is the case + rs.serverclose + end + #Let garbage collection clear up after us + rs=nil + end # Thread.start + rescue Relp::InsufficientCommands#TODO: why didn't it work when I included this is the same block as the other rescues? + @logger.warn('Relp client incapable of syslog') + end end # loop end # def run + + #TODO: Make sure this is doing the job properly + def teardown + @server_socket.serverclose + finished + end end # class LogStash::Inputs::Relp diff --git a/lib/logstash/util/relp.rb b/lib/logstash/util/relp.rb index c9a02000c..e47da87a3 100644 --- a/lib/logstash/util/relp.rb +++ b/lib/logstash/util/relp.rb @@ -1,27 +1,34 @@ require "socket" +#TODO: remove before release +require "pry" + class Relp#This isn't much use on its own, but gives RelpServer and RelpClient things RelpVersion='0'#TODO: spec says this is experimental, but rsyslog still seems to exclusively use it RelpSoftware='logstash,1.1.1,http://logstash.net'#TODO: this is a placeholder for now - RelpCommands=['syslog']#TODO: If this becomes a separate gem, make this variable, define required and optional ones class RelpError < StandardError; end class InvalidCommand < RelpError; end class InappropriateCommand < RelpError; end - class ConnectionClosed < RelpError; end + class ConnectionClosed < RelpError; end + class InsufficientCommands < RelpError; end - def self.valid_command?(command) + def valid_command?(command) valid_commands=Array.new - #Allow anything in the basic protocol + #Allow anything in the basic protocol for both directions valid_commands << 'open' valid_commands << 'close' - valid_commands << 'rsp' - #Allow anything we offered to accept - valid_commands += RelpCommands - #Don't accept serverclose or rsp as valid commands because this is the server TODO: generalise + #These are things that are part of the basic protocol, but only valid in one direction (rsp, close etc.) TODO: would they be invalid or just innapropriate? + valid_commands += @basic_relp_commands + + #These are extra commands that we require, otherwise refuse the connection TODO: some of these are only valid on one direction + valid_commands += @required_relp_commands + + #TODO: optional_relp_commands + #TODO: vague mentions of abort and starttls commands in spec need looking into return valid_commands.include?(command) end @@ -64,7 +71,7 @@ class Relp#This isn't much use on its own, but gives RelpServer and RelpClient t rescue EOFError raise ConnectionClosed #TODO: ECONNRESET end - if ! Relp.valid_command?(frame['command'])#TODO: is this enough to catch framing errors? + if ! self.valid_command?(frame['command'])#TODO: is this enough to catch framing errors? raise InvalidCommand,frame['command'] end return frame @@ -78,7 +85,14 @@ class RelpServer < Relp @socket.peeraddr[3]#TODO: is this the best thing to report? end - def initialize(host,port) + def initialize(host,port,required_commands=[]) + + #These are things that are part of the basic protocol, but only valid in one direction (rsp, close etc.) + @basic_relp_commands=['close']#TODO: check for others + + #These are extra commands that we require, otherwise refuse the connection + @required_relp_commands = required_commands + @server=TCPServer.new(host,port) end @@ -91,11 +105,12 @@ class RelpServer < Relp #if no version specified, relp spec says we must close connection self.serverclose raise RelpError, 'No relp_version specified' - elsif ! offer['commands'].split(',').include?('syslog') + #subtracting one array from the other checks to see if all elements in @required_relp_commands are present in the offer + elsif ! (@required_relp_commands - offer['commands'].split(',')).empty? #if it can't send us syslog it's useless to us; close the connection #TODO:Generalise relp class and make this optional (related to RelpCommands) self.serverclose - raise RelpError, 'Relp client incapable of syslog' + raise InsufficientCommands, offer['commands']+' offered, require '+@required_relp_commands.join(',') else #attempt to set up connection response_frame=Hash.new @@ -105,7 +120,7 @@ class RelpServer < Relp response_frame['message']='200 OK ' response_frame['message']+='relp_version='+RelpVersion+"\n" response_frame['message']+='relp_software='+RelpSoftware+"\n" - response_frame['message']+='commands='+RelpCommands.join(',') + response_frame['message']+='commands='+@required_relp_commands.join(',')#TODO: optional ones self.frame_write(response_frame) return self end @@ -141,7 +156,8 @@ class RelpServer < Relp frame['command']='serverclose' begin self.frame_write(frame) - @socket.close#TODO: shutdown? + @socket.shutdown + @socket.close#TODO: This isn't actually freeing up the port as shown by test_relp rescue#This catches the possibility of the client having already closed the connection end end @@ -157,14 +173,23 @@ end class RelpClient < Relp - def initialize(host,port) +# @relpcommands=['syslog']#TODO: If this becomes a separate gem, make this variable, define required and optional ones + + def initialize(host,port,required_commands=[]) + + #These are things that are part of the basic protocol, but only valid in one direction (rsp, close etc.) + @basic_relp_commands=['serverclose','rsp']#TODO: check for others + + #These are extra commands that we require, otherwise refuse the connection + @required_relp_commands = required_commands + @socket=TCPSocket.new(host,port) offer=Hash.new offer['txnr']=1 offer['command']='open' offer['message']='relp_version='+RelpVersion+"\n" offer['message']+='relp_software='+RelpSoftware+"\n" - offer['message']+='commands='+RelpCommands.join(',') + offer['message']+='commands='+@required_relp_commands.join(',')#TODO: add optional ones self.frame_write(offer) response_frame=self.frame_read raise RelpError if response_frame['message'][0,3]!='200' @@ -173,11 +198,13 @@ class RelpClient < Relp #if no version specified, relp spec says we must close connection self.close raise RelpError, 'No relp_version specified; offer: ',response_frame['message'][6..-1].scan(/^(.*)=(.*)$/).flatten - elsif ! response['commands'].split(',').include?('syslog') + + #subtracting one array from the other checks to see if all elements in @required_relp_commands are present in the offer + elsif ! (@required_relp_commands - response['commands'].split(',')).empty? #if it can't receive syslog it's useless to us; close the connection #TODO:Generalise relp class and make this optional (related to RelpCommands) self.close - raise RelpError, 'Relp server incapable of syslog' + raise InsufficientCommands, response['commands']+' offered, require '+@required_relp_commands.join(',') end #If we've got this far with no problems, we're good to go @lasttxnr=1 diff --git a/test/logstash/inputs/test_relp.rb b/test/logstash/inputs/test_relp.rb index 43c18cbf1..7f7653e21 100644 --- a/test/logstash/inputs/test_relp.rb +++ b/test/logstash/inputs/test_relp.rb @@ -10,6 +10,10 @@ require "logstash/util/relp" require "mocha" +#TODO: remove before release +require "pry" + + #TODO: I just copy/pasted all those^ which ones do I actually need? describe LogStash::Inputs::Relp do @@ -33,7 +37,7 @@ describe LogStash::Inputs::Relp do sleep(2) begin - rc=RelpClient.new('127.0.0.1',15515) + rc=RelpClient.new('127.0.0.1',15515,['syslog']) rc.syslog_write('This is the first relp test message') rc.syslog_write('This is the second relp test message') rc.syslog_write('This is the third relp test message') @@ -68,4 +72,12 @@ describe LogStash::Inputs::Relp do end end + test "RelpServer rejects invalid/innapropriate commands" do + #TODO: + end + + test "RelpServer refuses to connect if no syslog command available" do + #TODO: + end + end # testing for LogStash::Inputs::File