mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
commit
e153e4b85e
3 changed files with 532 additions and 0 deletions
77
lib/logstash/inputs/relp.rb
Normal file
77
lib/logstash/inputs/relp.rb
Normal file
|
@ -0,0 +1,77 @@
|
|||
require "logstash/inputs/base"
|
||||
require "logstash/namespace"
|
||||
require "logstash/util/relp"
|
||||
|
||||
|
||||
# Read RELP events over a TCP socket.
|
||||
#
|
||||
#Application level acknowledgements allow assurance of no message loss.
|
||||
#
|
||||
#This only functions as far as messages being put into the queue for filters-
|
||||
# anything lost after that point will not be retransmitted
|
||||
class LogStash::Inputs::Relp < LogStash::Inputs::Base
|
||||
|
||||
config_name "relp"
|
||||
plugin_status "experimental"
|
||||
|
||||
# The address to listen on.
|
||||
config :host, :validate => :string, :default => "0.0.0.0"
|
||||
|
||||
# The port to listen on.
|
||||
config :port, :validate => :number, :required => true
|
||||
|
||||
def initialize(*args)
|
||||
super(*args)
|
||||
end # def initialize
|
||||
|
||||
public
|
||||
def register
|
||||
@logger.info("Starting relp input listener", :address => "#{@host}:#{@port}")
|
||||
@relp_server = RelpServer.new(@host, @port,['syslog'])
|
||||
end # def register
|
||||
|
||||
private
|
||||
def relp_stream(relpsocket,output_queue,event_source)
|
||||
loop do
|
||||
frame = relpsocket.syslog_read
|
||||
event = self.to_event(frame['message'],event_source)
|
||||
output_queue << event
|
||||
#To get this far, the message must have made it into the queue for
|
||||
#filtering. I don't think it's possible to wait for output before ack
|
||||
#without fundamentally breaking the plugin architecture
|
||||
relpsocket.ack(frame['txnr'])
|
||||
end
|
||||
end
|
||||
|
||||
public
|
||||
def run(output_queue)
|
||||
loop do
|
||||
begin
|
||||
# Start a new thread for each connection.
|
||||
Thread.start(@relp_server.accept) do |rs|
|
||||
@logger.debug("Relp Connection to #{rs.peer} created")
|
||||
begin
|
||||
relp_stream(rs,output_queue,"relp://#{@host}:#{@port}/#{rs.peer}")
|
||||
rescue Relp::ConnectionClosed => e
|
||||
@logger.debug("Relp Connection to #{rs.peer} Closed")
|
||||
rescue Relp::RelpError => e
|
||||
@logger.warn('Relp error: '+e.class.to_s+' '+e.message)
|
||||
#TODO: Still not happy with this, are they all warn level?
|
||||
#Will this catch everything I want it to?
|
||||
#Relp spec says to close connection on error, ensure this is the case
|
||||
end
|
||||
end # Thread.start
|
||||
rescue Relp::InvalidCommand,Relp::InappropriateCommand => e
|
||||
@logger.warn('Relp client trying to open connection with something other than open:'+e.message)
|
||||
rescue Relp::InsufficientCommands
|
||||
@logger.warn('Relp client incapable of syslog')
|
||||
end
|
||||
end # loop
|
||||
end # def run
|
||||
|
||||
def teardown
|
||||
@relp_server.shutdown
|
||||
end
|
||||
end # class LogStash::Inputs::Relp
|
||||
|
||||
#TODO: structured error logging
|
321
lib/logstash/util/relp.rb
Normal file
321
lib/logstash/util/relp.rb
Normal file
|
@ -0,0 +1,321 @@
|
|||
require "socket"
|
||||
|
||||
class Relp#This isn't much use on its own, but gives RelpServer and RelpClient things
|
||||
|
||||
RelpVersion = '0'#TODO: spec says this is experimental, but rsyslog still seems to exclusively use it
|
||||
RelpSoftware = 'logstash,1.1.1,http://logstash.net'
|
||||
|
||||
class RelpError < StandardError; end
|
||||
class InvalidCommand < RelpError; end
|
||||
class InappropriateCommand < RelpError; end
|
||||
class ConnectionClosed < RelpError; end
|
||||
class InsufficientCommands < RelpError; end
|
||||
|
||||
def valid_command?(command)
|
||||
valid_commands = Array.new
|
||||
|
||||
#Allow anything in the basic protocol for both directions
|
||||
valid_commands << 'open'
|
||||
valid_commands << 'close'
|
||||
|
||||
#These are things that are part of the basic protocol, but only valid in one direction (rsp, close etc.) TODO: would they be invalid or just innapropriate?
|
||||
valid_commands += @basic_relp_commands
|
||||
|
||||
#These are extra commands that we require, otherwise refuse the connection TODO: some of these are only valid on one direction
|
||||
valid_commands += @required_relp_commands
|
||||
|
||||
#TODO: optional_relp_commands
|
||||
|
||||
#TODO: vague mentions of abort and starttls commands in spec need looking into
|
||||
return valid_commands.include?(command)
|
||||
end
|
||||
|
||||
def frame_write(frame)
|
||||
unless self.server? #I think we have to trust a server to be using the correct txnr
|
||||
#Only allow txnr to be 0 or be determined automatically
|
||||
frame['txnr'] = self.nexttxnr unless frame['txnr']==0
|
||||
end
|
||||
frame['txnr'] = frame['txnr'].to_s
|
||||
frame['message'] = '' if frame['message'].nil?
|
||||
frame['datalen'] = frame['message'].length.to_s
|
||||
wiredata=[
|
||||
frame['txnr'],
|
||||
frame['command'],
|
||||
frame['datalen'],
|
||||
frame['message']
|
||||
].join(' ').strip
|
||||
begin
|
||||
@socket.write(wiredata)
|
||||
#Ending each frame with a newline is required in the specifications
|
||||
#Doing it a separately is useful (but a bit of a bodge) because
|
||||
#for some reason it seems to take 2 writes after the server closes the
|
||||
#connection before we get an exception
|
||||
@socket.write("\n")
|
||||
rescue Errno::EPIPE,IOError,Errno::ECONNRESET#TODO: is this sufficient to catch all broken connections?
|
||||
raise ConnectionClosed
|
||||
end
|
||||
frame['txnr'].to_i
|
||||
end
|
||||
|
||||
def frame_read
|
||||
begin
|
||||
frame = Hash.new
|
||||
frame['txnr'] = @socket.readline(' ').strip.to_i
|
||||
frame['command'] = @socket.readline(' ').strip
|
||||
|
||||
#Things get a little tricky here because if the length is 0 it is not followed by a space.
|
||||
leading_digit=@socket.read(1)
|
||||
if leading_digit=='0' then
|
||||
frame['datalen'] = 0
|
||||
frame['message'] = ''
|
||||
else
|
||||
frame['datalen'] = (leading_digit + @socket.readline(' ')).strip.to_i
|
||||
frame['message'] = @socket.read(frame['datalen'])
|
||||
end
|
||||
rescue EOFError,Errno::ECONNRESET,IOError
|
||||
raise ConnectionClosed
|
||||
end
|
||||
if ! self.valid_command?(frame['command'])#TODO: is this enough to catch framing errors?
|
||||
if self.server?
|
||||
self.serverclose
|
||||
else
|
||||
self.close
|
||||
end
|
||||
raise InvalidCommand,frame['command']
|
||||
end
|
||||
return frame
|
||||
end
|
||||
|
||||
def server?
|
||||
@server
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
class RelpServer < Relp
|
||||
|
||||
def peer
|
||||
if @peer.nil?
|
||||
@peer = @socket.peeraddr[3]#TODO: is this the best thing to report? I don't think so...
|
||||
end
|
||||
@peer
|
||||
end
|
||||
|
||||
def initialize(host,port,required_commands=[])
|
||||
|
||||
@server=true
|
||||
|
||||
#These are things that are part of the basic protocol, but only valid in one direction (rsp, close etc.)
|
||||
@basic_relp_commands = ['close']#TODO: check for others
|
||||
|
||||
#These are extra commands that we require, otherwise refuse the connection
|
||||
@required_relp_commands = required_commands
|
||||
|
||||
@server=TCPServer.new(host,port)#TODO: rescue if port is already in use (Errno::EADDRINUSE)
|
||||
end
|
||||
|
||||
def accept
|
||||
@socket=@server.accept
|
||||
frame=self.frame_read
|
||||
if frame['command'] == 'open'
|
||||
offer=Hash[*frame['message'].scan(/^(.*)=(.*)$/).flatten]
|
||||
if offer['relp_version'].nil?
|
||||
#if no version specified, relp spec says we must close connection
|
||||
self.serverclose
|
||||
raise RelpError, 'No relp_version specified'
|
||||
#subtracting one array from the other checks to see if all elements in @required_relp_commands are present in the offer
|
||||
elsif ! (@required_relp_commands - offer['commands'].split(',')).empty?
|
||||
#Tell them why we're closing the connection:
|
||||
response_frame = Hash.new
|
||||
response_frame['txnr'] = frame['txnr']
|
||||
response_frame['command'] = 'rsp'
|
||||
response_frame['message'] = '500 Required command(s) '
|
||||
+ (@required_relp_commands - offer['commands'].split(',')).join(',')
|
||||
+ ' not offered'
|
||||
self.frame_write(response_frame)
|
||||
|
||||
self.serverclose
|
||||
raise InsufficientCommands, offer['commands']
|
||||
+ ' offered, require ' + @required_relp_commands.join(',')
|
||||
else
|
||||
#attempt to set up connection
|
||||
response_frame = Hash.new
|
||||
response_frame['txnr'] = frame['txnr']
|
||||
response_frame['command'] = 'rsp'
|
||||
|
||||
response_frame['message'] = '200 OK '
|
||||
response_frame['message'] += 'relp_version=' + RelpVersion + "\n"
|
||||
response_frame['message'] += 'relp_software=' + RelpSoftware + "\n"
|
||||
response_frame['message'] += 'commands=' + @required_relp_commands.join(',')#TODO: optional ones
|
||||
self.frame_write(response_frame)
|
||||
return self
|
||||
end
|
||||
else
|
||||
self.serverclose
|
||||
raise InappropriateCommand, frame['command'] + ' expecting open'
|
||||
end
|
||||
end
|
||||
|
||||
#This does not ack the frame, just reads it
|
||||
def syslog_read
|
||||
frame = self.frame_read
|
||||
if frame['command'] == 'syslog'
|
||||
return frame
|
||||
elsif frame['command'] == 'close'
|
||||
#the client is closing the connection, acknowledge the close and act on it
|
||||
response_frame = Hash.new
|
||||
response_frame['txnr'] = frame['txnr']
|
||||
response_frame['command'] = 'rsp'
|
||||
self.frame_write(response_frame)
|
||||
self.serverclose
|
||||
raise ConnectionClosed
|
||||
else
|
||||
#the client is trying to do something unexpected
|
||||
self.serverclose
|
||||
raise InappropriateCommand, frame['command'] + ' expecting syslog'
|
||||
end
|
||||
end
|
||||
|
||||
def serverclose
|
||||
frame = Hash.new
|
||||
frame['txnr'] = 0
|
||||
frame['command'] = 'serverclose'
|
||||
begin
|
||||
self.frame_write(frame)
|
||||
@socket.close
|
||||
rescue ConnectionClosed
|
||||
end
|
||||
end
|
||||
|
||||
def shutdown
|
||||
@server.shutdown
|
||||
@server.close
|
||||
rescue Exception#@server might already be down
|
||||
end
|
||||
|
||||
def ack(txnr)
|
||||
frame = Hash.new
|
||||
frame['txnr'] = txnr
|
||||
frame['command'] = 'rsp'
|
||||
frame['message'] = '200 OK'
|
||||
self.frame_write(frame)
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
#This is only used by the tests; any problems here are not as important as elsewhere
|
||||
class RelpClient < Relp
|
||||
|
||||
def initialize(host,port,required_commands = [],buffer_size = 128,
|
||||
retransmission_timeout=10)
|
||||
|
||||
@server = false
|
||||
@buffer = Hash.new
|
||||
|
||||
@buffer_size = buffer_size
|
||||
@retransmission_timeout = retransmission_timeout
|
||||
|
||||
#These are things that are part of the basic protocol, but only valid in one direction (rsp, close etc.)
|
||||
@basic_relp_commands = ['serverclose','rsp']#TODO: check for others
|
||||
|
||||
#These are extra commands that we require, otherwise refuse the connection
|
||||
@required_relp_commands = required_commands
|
||||
|
||||
@socket=TCPSocket.new(host,port)
|
||||
|
||||
#This'll start the automatic frame numbering
|
||||
@lasttxnr = 0
|
||||
|
||||
offer=Hash.new
|
||||
offer['command'] = 'open'
|
||||
offer['message'] = 'relp_version=' + RelpVersion + "\n"
|
||||
offer['message'] += 'relp_software=' + RelpSoftware + "\n"
|
||||
offer['message'] += 'commands=' + @required_relp_commands.join(',')#TODO: add optional ones
|
||||
self.frame_write(offer)
|
||||
response_frame = self.frame_read
|
||||
|
||||
unless response_frame['message'][0,3] == '200'
|
||||
raise RelpError,response_frame['message']
|
||||
end
|
||||
|
||||
response=Hash[*response_frame['message'][7..-1].scan(/^(.*)=(.*)$/).flatten]
|
||||
if response['relp_version'].nil?
|
||||
#if no version specified, relp spec says we must close connection
|
||||
self.close
|
||||
raise RelpError, 'No relp_version specified; offer: '
|
||||
+ response_frame['message'][6..-1].scan(/^(.*)=(.*)$/).flatten
|
||||
|
||||
#subtracting one array from the other checks to see if all elements in @required_relp_commands are present in the offer
|
||||
elsif ! (@required_relp_commands - response['commands'].split(',')).empty?
|
||||
#if it can't receive syslog it's useless to us; close the connection
|
||||
self.close
|
||||
raise InsufficientCommands, response['commands'] + ' offered, require '
|
||||
+ @required_relp_commands.join(',')
|
||||
end
|
||||
#If we've got this far with no problems, we're good to go
|
||||
|
||||
#This thread deals with responses that come back
|
||||
reader = Thread.start do |parent|
|
||||
loop do
|
||||
f = self.frame_read
|
||||
if f['command'] == 'rsp' && f['message'] == '200 OK'
|
||||
@buffer.delete(f['txnr'])
|
||||
elsif f['command'] == 'rsp' && f['message'][0,1] == '5'
|
||||
#TODO: What if we get an error for something we're already retransmitted due to timeout?
|
||||
new_txnr = self.frame_write(@buffer[f['txnr']])
|
||||
@buffer[new_txnr] = @buffer[f['txnr']]
|
||||
@buffer.delete(f['txnr'])
|
||||
elsif f['command'] == 'serverclose' || f['txnr'] == @close_txnr
|
||||
parent.raise ConnectionClosed#TODO: raising errors like this makes no sense
|
||||
else
|
||||
#Don't know what's going on if we get here, but it can't be good
|
||||
parent.raise RelpError#TODO: raising errors like this makes no sense
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
#While this one deals with frames for which we get no reply
|
||||
Thread.start do
|
||||
old_buffer = Hash.new
|
||||
loop do
|
||||
#This returns old txnrs that are still present
|
||||
(@buffer.keys & old_buffer.keys).each do |txnr|
|
||||
new_txnr = self.frame_write(@buffer[txnr])
|
||||
@buffer[new_txnr] = @buffer[txnr]
|
||||
@buffer.delete(txnr)
|
||||
end
|
||||
old_buffer = @buffer
|
||||
sleep @retransmission_timeout
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
#TODO: have a way to get back unacked messages on close
|
||||
def close
|
||||
frame = Hash.new
|
||||
frame['command'] = 'close'
|
||||
@close_txnr=self.frame_write(frame)
|
||||
#TODO: ought to properly wait for a reply etc. The serverclose will make it work though
|
||||
sleep @retransmission_timeout
|
||||
@socket.close#TODO: shutdown?
|
||||
return @buffer
|
||||
end
|
||||
|
||||
def syslog_write(logline)
|
||||
|
||||
#If the buffer is already full, wait until a gap opens up
|
||||
sleep 0.1 until @buffer.length<@buffer_size
|
||||
|
||||
frame = Hash.new
|
||||
frame['command'] = 'syslog'
|
||||
frame['message'] = logline
|
||||
|
||||
txnr = self.frame_write(frame)
|
||||
@buffer[txnr] = frame
|
||||
end
|
||||
|
||||
def nexttxnr
|
||||
@lasttxnr += 1
|
||||
end
|
||||
|
||||
end
|
134
test/logstash/inputs/test_relp.rb
Normal file
134
test/logstash/inputs/test_relp.rb
Normal file
|
@ -0,0 +1,134 @@
|
|||
require "rubygems"
|
||||
require File.join(File.dirname(__FILE__), "..", "minitest")
|
||||
|
||||
require "logstash/loadlibs"
|
||||
require "logstash/testcase"
|
||||
require "logstash/agent"
|
||||
require "logstash/logging"
|
||||
require "logstash/inputs/relp"
|
||||
require "logstash/util/relp"
|
||||
|
||||
require "mocha"
|
||||
|
||||
#TODO: I just copy/pasted all those^ which ones do I actually need?
|
||||
|
||||
describe LogStash::Inputs::Relp do
|
||||
|
||||
before do
|
||||
#TODO: port 15515 is what I tend to use; pick a default?
|
||||
@input = LogStash::Inputs::Relp.new("type" => ["relp"],
|
||||
"host" => ["127.0.0.1"], "port" => [15515])
|
||||
@input.register
|
||||
end # before
|
||||
|
||||
after do
|
||||
@input.teardown
|
||||
end # after
|
||||
|
||||
test "Basic handshaking/message transmission" do
|
||||
queue = Queue.new
|
||||
thread = Thread.new { @input.run(queue) }
|
||||
|
||||
# Let the input start listening. This is a crappy solution, but until
|
||||
# plugins can notify "I am ready!" testing will be a bit awkward.
|
||||
# TODO: could we not pull this from the logger?
|
||||
sleep(2)
|
||||
|
||||
begin
|
||||
rc=RelpClient.new('127.0.0.1',15515,['syslog'])
|
||||
rc.syslog_write('This is the first relp test message')
|
||||
rc.syslog_write('This is the second relp test message')
|
||||
rc.syslog_write('This is the third relp test message')
|
||||
rc.syslog_write('This is the fourth relp test message')
|
||||
rc.syslog_write('This is the fifth relp test message')
|
||||
count=5
|
||||
|
||||
rc.close
|
||||
|
||||
events=[]
|
||||
|
||||
start = Time.new
|
||||
|
||||
# Allow maximum of 2 seconds for events to show up
|
||||
while (Time.new - start) < 2 && events.size != count
|
||||
begin
|
||||
event = queue.pop(true) # don't block
|
||||
events << event if event
|
||||
rescue ThreadError => e
|
||||
# Fail on anything other than "queue empty"
|
||||
raise e if e.to_s != "queue empty"
|
||||
sleep(0.05)
|
||||
end
|
||||
end
|
||||
|
||||
assert_equal(count, events.size, "Wanted #{count}, but got #{events.size} events")
|
||||
assert_equal("This is the first relp test message", events.first.message)
|
||||
assert_equal("This is the fifth relp test message", events.last.message)
|
||||
|
||||
rescue Relp::RelpError => re
|
||||
flunk re.class.to_s + ': ' + re.to_s#TODO: is there not a proper way to do this?
|
||||
end
|
||||
end
|
||||
|
||||
test "RelpServer rejects invalid commands" do
|
||||
#Need it to close the connection, but not bring down the whole server
|
||||
queue = Queue.new
|
||||
thread = Thread.new { @input.run(queue) }
|
||||
|
||||
logger=Queue.new
|
||||
(@input.instance_eval { @logger }).subscribe(logger)
|
||||
|
||||
assert_raises(Relp::ConnectionClosed) do
|
||||
rc = RelpClient.new('127.0.0.1',15515,['syslog'])
|
||||
badframe = Hash.new
|
||||
badframe['command'] = 'badcommand'
|
||||
rc.frame_write(badframe)
|
||||
#We can't detect that it's closed until we try to write to it again
|
||||
#(delay is required for connection to be closed)
|
||||
sleep 1
|
||||
rc.frame_write(badframe)
|
||||
end
|
||||
assert_equal("Relp error: Relp::InvalidCommand badcommand",logger.pop[:message])
|
||||
end
|
||||
|
||||
test "RelpServer rejects inappropriate commands" do
|
||||
#Need it to close the connection, but not bring down the whole server
|
||||
queue = Queue.new
|
||||
thread = Thread.new { @input.run(queue) }
|
||||
|
||||
logger = Queue.new
|
||||
(@input.instance_eval { @logger }).subscribe(logger)
|
||||
|
||||
assert_raises(Relp::ConnectionClosed) do
|
||||
rc = RelpClient.new('127.0.0.1',15515,['syslog'])
|
||||
badframe = Hash.new
|
||||
badframe['command'] = 'open'#it's not expecting open again
|
||||
rc.frame_write(badframe)
|
||||
#We can't detect that it's closed until we try to write to it again
|
||||
#(but with something other than an open)
|
||||
sleep 1
|
||||
badframe['command'] = 'syslog'
|
||||
rc.frame_write(badframe)
|
||||
end
|
||||
assert_equal("Relp error: Relp::InappropriateCommand open expecting syslog",
|
||||
logger.pop[:message])
|
||||
|
||||
end
|
||||
|
||||
test "RelpServer refuses to connect if no syslog command available" do
|
||||
|
||||
logger = Queue.new
|
||||
(@input.instance_eval { @logger }).subscribe(logger)
|
||||
|
||||
assert_raises(Relp::RelpError) do
|
||||
queue = Queue.new
|
||||
thread = Thread.new { @input.run(queue) }
|
||||
rc = RelpClient.new('127.0.0.1',15515)
|
||||
end
|
||||
|
||||
assert_equal("Relp client incapable of syslog",logger.pop[:message])
|
||||
end
|
||||
|
||||
end # testing for LogStash::Inputs::File
|
||||
|
||||
#TODO: structured error logging
|
Loading…
Add table
Add a link
Reference in a new issue