mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
Made required commands in handshake variables
Syslog is no longer hardcoded in as the one required command, you can pass an array to RelpServer or RelpClient on creation. All elements in this array must be offered by the other participant for the handshake to complete.
This commit is contained in:
parent
db4e122614
commit
5db5872385
3 changed files with 85 additions and 36 deletions
|
@ -28,7 +28,7 @@ class LogStash::Inputs::Relp < LogStash::Inputs::Base
|
||||||
public
|
public
|
||||||
def register
|
def register
|
||||||
@logger.info("Starting relp input listener", :address => "#{@host}:#{@port}")
|
@logger.info("Starting relp input listener", :address => "#{@host}:#{@port}")
|
||||||
@server_socket = RelpServer.new(@host, @port)
|
@server_socket = RelpServer.new(@host, @port,['syslog'])
|
||||||
end # def register
|
end # def register
|
||||||
|
|
||||||
private
|
private
|
||||||
|
@ -47,22 +47,32 @@ class LogStash::Inputs::Relp < LogStash::Inputs::Base
|
||||||
public
|
public
|
||||||
def run(output_queue)
|
def run(output_queue)
|
||||||
loop do
|
loop do
|
||||||
# Start a new thread for each connection.
|
begin
|
||||||
Thread.start(@server_socket.accept) do |rs|
|
# Start a new thread for each connection.
|
||||||
begin
|
Thread.start(@server_socket.accept) do |rs|
|
||||||
relp_stream(@server_socket,output_queue,"relp://#{@host}:#{@port}/#{rs.peer}")
|
begin
|
||||||
rescue Relp::ConnectionClosed => e
|
relp_stream(@server_socket,output_queue,"relp://#{@host}:#{@port}/#{rs.peer}")
|
||||||
@logger.debug('Relp Connection to #{rs.peer} Closed')
|
rescue Relp::ConnectionClosed => e
|
||||||
rescue Relp::RelpError => e
|
@logger.debug('Relp Connection to #{rs.peer} Closed')
|
||||||
@logger.warn('Relp error: '+e.class.to_s+' '+e.message)
|
rescue Relp::RelpError => e
|
||||||
#TODO: Still not happy with this, are they all warn level?
|
@logger.warn('Relp error: '+e.class.to_s+' '+e.message)
|
||||||
#Will this catch everything I want it to?
|
#TODO: Still not happy with this, are they all warn level?
|
||||||
#Relp spec says to close connection on error, ensure this is the case
|
#Will this catch everything I want it to?
|
||||||
rs.serverclose
|
#Relp spec says to close connection on error, ensure this is the case
|
||||||
end
|
rs.serverclose
|
||||||
#Let garbage collection clear up after us
|
end
|
||||||
rs=nil
|
#Let garbage collection clear up after us
|
||||||
end # Thread.start
|
rs=nil
|
||||||
|
end # Thread.start
|
||||||
|
rescue Relp::InsufficientCommands#TODO: why didn't it work when I included this is the same block as the other rescues?
|
||||||
|
@logger.warn('Relp client incapable of syslog')
|
||||||
|
end
|
||||||
end # loop
|
end # loop
|
||||||
end # def run
|
end # def run
|
||||||
|
|
||||||
|
#TODO: Make sure this is doing the job properly
|
||||||
|
def teardown
|
||||||
|
@server_socket.serverclose
|
||||||
|
finished
|
||||||
|
end
|
||||||
end # class LogStash::Inputs::Relp
|
end # class LogStash::Inputs::Relp
|
||||||
|
|
|
@ -1,27 +1,34 @@
|
||||||
require "socket"
|
require "socket"
|
||||||
|
|
||||||
|
#TODO: remove before release
|
||||||
|
require "pry"
|
||||||
|
|
||||||
class Relp#This isn't much use on its own, but gives RelpServer and RelpClient things
|
class Relp#This isn't much use on its own, but gives RelpServer and RelpClient things
|
||||||
|
|
||||||
RelpVersion='0'#TODO: spec says this is experimental, but rsyslog still seems to exclusively use it
|
RelpVersion='0'#TODO: spec says this is experimental, but rsyslog still seems to exclusively use it
|
||||||
RelpSoftware='logstash,1.1.1,http://logstash.net'#TODO: this is a placeholder for now
|
RelpSoftware='logstash,1.1.1,http://logstash.net'#TODO: this is a placeholder for now
|
||||||
RelpCommands=['syslog']#TODO: If this becomes a separate gem, make this variable, define required and optional ones
|
|
||||||
|
|
||||||
class RelpError < StandardError; end
|
class RelpError < StandardError; end
|
||||||
class InvalidCommand < RelpError; end
|
class InvalidCommand < RelpError; end
|
||||||
class InappropriateCommand < RelpError; end
|
class InappropriateCommand < RelpError; end
|
||||||
class ConnectionClosed < RelpError; end
|
class ConnectionClosed < RelpError; end
|
||||||
|
class InsufficientCommands < RelpError; end
|
||||||
|
|
||||||
def self.valid_command?(command)
|
def valid_command?(command)
|
||||||
valid_commands=Array.new
|
valid_commands=Array.new
|
||||||
|
|
||||||
#Allow anything in the basic protocol
|
#Allow anything in the basic protocol for both directions
|
||||||
valid_commands << 'open'
|
valid_commands << 'open'
|
||||||
valid_commands << 'close'
|
valid_commands << 'close'
|
||||||
valid_commands << 'rsp'
|
|
||||||
|
|
||||||
#Allow anything we offered to accept
|
#These are things that are part of the basic protocol, but only valid in one direction (rsp, close etc.) TODO: would they be invalid or just innapropriate?
|
||||||
valid_commands += RelpCommands
|
valid_commands += @basic_relp_commands
|
||||||
#Don't accept serverclose or rsp as valid commands because this is the server TODO: generalise
|
|
||||||
|
#These are extra commands that we require, otherwise refuse the connection TODO: some of these are only valid on one direction
|
||||||
|
valid_commands += @required_relp_commands
|
||||||
|
|
||||||
|
#TODO: optional_relp_commands
|
||||||
|
|
||||||
#TODO: vague mentions of abort and starttls commands in spec need looking into
|
#TODO: vague mentions of abort and starttls commands in spec need looking into
|
||||||
return valid_commands.include?(command)
|
return valid_commands.include?(command)
|
||||||
end
|
end
|
||||||
|
@ -64,7 +71,7 @@ class Relp#This isn't much use on its own, but gives RelpServer and RelpClient t
|
||||||
rescue EOFError
|
rescue EOFError
|
||||||
raise ConnectionClosed #TODO: ECONNRESET
|
raise ConnectionClosed #TODO: ECONNRESET
|
||||||
end
|
end
|
||||||
if ! Relp.valid_command?(frame['command'])#TODO: is this enough to catch framing errors?
|
if ! self.valid_command?(frame['command'])#TODO: is this enough to catch framing errors?
|
||||||
raise InvalidCommand,frame['command']
|
raise InvalidCommand,frame['command']
|
||||||
end
|
end
|
||||||
return frame
|
return frame
|
||||||
|
@ -78,7 +85,14 @@ class RelpServer < Relp
|
||||||
@socket.peeraddr[3]#TODO: is this the best thing to report?
|
@socket.peeraddr[3]#TODO: is this the best thing to report?
|
||||||
end
|
end
|
||||||
|
|
||||||
def initialize(host,port)
|
def initialize(host,port,required_commands=[])
|
||||||
|
|
||||||
|
#These are things that are part of the basic protocol, but only valid in one direction (rsp, close etc.)
|
||||||
|
@basic_relp_commands=['close']#TODO: check for others
|
||||||
|
|
||||||
|
#These are extra commands that we require, otherwise refuse the connection
|
||||||
|
@required_relp_commands = required_commands
|
||||||
|
|
||||||
@server=TCPServer.new(host,port)
|
@server=TCPServer.new(host,port)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -91,11 +105,12 @@ class RelpServer < Relp
|
||||||
#if no version specified, relp spec says we must close connection
|
#if no version specified, relp spec says we must close connection
|
||||||
self.serverclose
|
self.serverclose
|
||||||
raise RelpError, 'No relp_version specified'
|
raise RelpError, 'No relp_version specified'
|
||||||
elsif ! offer['commands'].split(',').include?('syslog')
|
#subtracting one array from the other checks to see if all elements in @required_relp_commands are present in the offer
|
||||||
|
elsif ! (@required_relp_commands - offer['commands'].split(',')).empty?
|
||||||
#if it can't send us syslog it's useless to us; close the connection
|
#if it can't send us syslog it's useless to us; close the connection
|
||||||
#TODO:Generalise relp class and make this optional (related to RelpCommands)
|
#TODO:Generalise relp class and make this optional (related to RelpCommands)
|
||||||
self.serverclose
|
self.serverclose
|
||||||
raise RelpError, 'Relp client incapable of syslog'
|
raise InsufficientCommands, offer['commands']+' offered, require '+@required_relp_commands.join(',')
|
||||||
else
|
else
|
||||||
#attempt to set up connection
|
#attempt to set up connection
|
||||||
response_frame=Hash.new
|
response_frame=Hash.new
|
||||||
|
@ -105,7 +120,7 @@ class RelpServer < Relp
|
||||||
response_frame['message']='200 OK '
|
response_frame['message']='200 OK '
|
||||||
response_frame['message']+='relp_version='+RelpVersion+"\n"
|
response_frame['message']+='relp_version='+RelpVersion+"\n"
|
||||||
response_frame['message']+='relp_software='+RelpSoftware+"\n"
|
response_frame['message']+='relp_software='+RelpSoftware+"\n"
|
||||||
response_frame['message']+='commands='+RelpCommands.join(',')
|
response_frame['message']+='commands='+@required_relp_commands.join(',')#TODO: optional ones
|
||||||
self.frame_write(response_frame)
|
self.frame_write(response_frame)
|
||||||
return self
|
return self
|
||||||
end
|
end
|
||||||
|
@ -141,7 +156,8 @@ class RelpServer < Relp
|
||||||
frame['command']='serverclose'
|
frame['command']='serverclose'
|
||||||
begin
|
begin
|
||||||
self.frame_write(frame)
|
self.frame_write(frame)
|
||||||
@socket.close#TODO: shutdown?
|
@socket.shutdown
|
||||||
|
@socket.close#TODO: This isn't actually freeing up the port as shown by test_relp
|
||||||
rescue#This catches the possibility of the client having already closed the connection
|
rescue#This catches the possibility of the client having already closed the connection
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -157,14 +173,23 @@ end
|
||||||
|
|
||||||
class RelpClient < Relp
|
class RelpClient < Relp
|
||||||
|
|
||||||
def initialize(host,port)
|
# @relpcommands=['syslog']#TODO: If this becomes a separate gem, make this variable, define required and optional ones
|
||||||
|
|
||||||
|
def initialize(host,port,required_commands=[])
|
||||||
|
|
||||||
|
#These are things that are part of the basic protocol, but only valid in one direction (rsp, close etc.)
|
||||||
|
@basic_relp_commands=['serverclose','rsp']#TODO: check for others
|
||||||
|
|
||||||
|
#These are extra commands that we require, otherwise refuse the connection
|
||||||
|
@required_relp_commands = required_commands
|
||||||
|
|
||||||
@socket=TCPSocket.new(host,port)
|
@socket=TCPSocket.new(host,port)
|
||||||
offer=Hash.new
|
offer=Hash.new
|
||||||
offer['txnr']=1
|
offer['txnr']=1
|
||||||
offer['command']='open'
|
offer['command']='open'
|
||||||
offer['message']='relp_version='+RelpVersion+"\n"
|
offer['message']='relp_version='+RelpVersion+"\n"
|
||||||
offer['message']+='relp_software='+RelpSoftware+"\n"
|
offer['message']+='relp_software='+RelpSoftware+"\n"
|
||||||
offer['message']+='commands='+RelpCommands.join(',')
|
offer['message']+='commands='+@required_relp_commands.join(',')#TODO: add optional ones
|
||||||
self.frame_write(offer)
|
self.frame_write(offer)
|
||||||
response_frame=self.frame_read
|
response_frame=self.frame_read
|
||||||
raise RelpError if response_frame['message'][0,3]!='200'
|
raise RelpError if response_frame['message'][0,3]!='200'
|
||||||
|
@ -173,11 +198,13 @@ class RelpClient < Relp
|
||||||
#if no version specified, relp spec says we must close connection
|
#if no version specified, relp spec says we must close connection
|
||||||
self.close
|
self.close
|
||||||
raise RelpError, 'No relp_version specified; offer: ',response_frame['message'][6..-1].scan(/^(.*)=(.*)$/).flatten
|
raise RelpError, 'No relp_version specified; offer: ',response_frame['message'][6..-1].scan(/^(.*)=(.*)$/).flatten
|
||||||
elsif ! response['commands'].split(',').include?('syslog')
|
|
||||||
|
#subtracting one array from the other checks to see if all elements in @required_relp_commands are present in the offer
|
||||||
|
elsif ! (@required_relp_commands - response['commands'].split(',')).empty?
|
||||||
#if it can't receive syslog it's useless to us; close the connection
|
#if it can't receive syslog it's useless to us; close the connection
|
||||||
#TODO:Generalise relp class and make this optional (related to RelpCommands)
|
#TODO:Generalise relp class and make this optional (related to RelpCommands)
|
||||||
self.close
|
self.close
|
||||||
raise RelpError, 'Relp server incapable of syslog'
|
raise InsufficientCommands, response['commands']+' offered, require '+@required_relp_commands.join(',')
|
||||||
end
|
end
|
||||||
#If we've got this far with no problems, we're good to go
|
#If we've got this far with no problems, we're good to go
|
||||||
@lasttxnr=1
|
@lasttxnr=1
|
||||||
|
|
|
@ -10,6 +10,10 @@ require "logstash/util/relp"
|
||||||
|
|
||||||
require "mocha"
|
require "mocha"
|
||||||
|
|
||||||
|
#TODO: remove before release
|
||||||
|
require "pry"
|
||||||
|
|
||||||
|
|
||||||
#TODO: I just copy/pasted all those^ which ones do I actually need?
|
#TODO: I just copy/pasted all those^ which ones do I actually need?
|
||||||
|
|
||||||
describe LogStash::Inputs::Relp do
|
describe LogStash::Inputs::Relp do
|
||||||
|
@ -33,7 +37,7 @@ describe LogStash::Inputs::Relp do
|
||||||
sleep(2)
|
sleep(2)
|
||||||
|
|
||||||
begin
|
begin
|
||||||
rc=RelpClient.new('127.0.0.1',15515)
|
rc=RelpClient.new('127.0.0.1',15515,['syslog'])
|
||||||
rc.syslog_write('This is the first relp test message')
|
rc.syslog_write('This is the first relp test message')
|
||||||
rc.syslog_write('This is the second relp test message')
|
rc.syslog_write('This is the second relp test message')
|
||||||
rc.syslog_write('This is the third relp test message')
|
rc.syslog_write('This is the third relp test message')
|
||||||
|
@ -68,4 +72,12 @@ describe LogStash::Inputs::Relp do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
test "RelpServer rejects invalid/innapropriate commands" do
|
||||||
|
#TODO:
|
||||||
|
end
|
||||||
|
|
||||||
|
test "RelpServer refuses to connect if no syslog command available" do
|
||||||
|
#TODO:
|
||||||
|
end
|
||||||
|
|
||||||
end # testing for LogStash::Inputs::File
|
end # testing for LogStash::Inputs::File
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue