mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
Allow multiple client to RELP input
This commit is contained in:
parent
9f5dd4ddb6
commit
dd3b1bb80a
3 changed files with 189 additions and 65 deletions
|
@ -1,6 +1,7 @@
|
|||
require "logstash/inputs/base"
|
||||
require "logstash/namespace"
|
||||
require "logstash/util/relp"
|
||||
require "logstash/util/socket_peer"
|
||||
|
||||
|
||||
# Read RELP events over a TCP socket.
|
||||
|
@ -14,6 +15,7 @@ require "logstash/util/relp"
|
|||
# Message acks only function as far as messages being put into the queue for
|
||||
# filters; anything lost after that point will not be retransmitted
|
||||
class LogStash::Inputs::Relp < LogStash::Inputs::Base
|
||||
class Interrupted < StandardError; end
|
||||
|
||||
config_name "relp"
|
||||
plugin_status "experimental"
|
||||
|
@ -35,29 +37,35 @@ class LogStash::Inputs::Relp < LogStash::Inputs::Base
|
|||
end # def register
|
||||
|
||||
private
|
||||
def relp_stream(relpsocket,output_queue,event_source)
|
||||
def relp_stream(relpserver,socket,output_queue,event_source)
|
||||
loop do
|
||||
frame = relpsocket.syslog_read
|
||||
frame = relpserver.syslog_read(socket)
|
||||
event = self.to_event(frame['message'],event_source)
|
||||
output_queue << event
|
||||
#To get this far, the message must have made it into the queue for
|
||||
#filtering. I don't think it's possible to wait for output before ack
|
||||
#without fundamentally breaking the plugin architecture
|
||||
relpsocket.ack(frame['txnr'])
|
||||
relpserver.ack(socket, frame['txnr'])
|
||||
end
|
||||
end
|
||||
|
||||
public
|
||||
def run(output_queue)
|
||||
@thread = Thread.current
|
||||
loop do
|
||||
begin
|
||||
# Start a new thread for each connection.
|
||||
Thread.start(@relp_server.accept) do |rs|
|
||||
@logger.debug("Relp Connection to #{rs.peer} created")
|
||||
Thread.start(@relp_server.accept) do |client|
|
||||
rs = client[0]
|
||||
socket = client[1]
|
||||
# monkeypatch a 'peer' method onto the socket.
|
||||
socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
|
||||
peer = socket.peer
|
||||
@logger.debug("Relp Connection to #{peer} created")
|
||||
begin
|
||||
relp_stream(rs,output_queue,"relp://#{@host}:#{@port}/#{rs.peer}")
|
||||
relp_stream(rs,socket, output_queue,"relp://#{peer}")
|
||||
rescue Relp::ConnectionClosed => e
|
||||
@logger.debug("Relp Connection to #{rs.peer} Closed")
|
||||
@logger.debug("Relp Connection to #{peer} Closed")
|
||||
rescue Relp::RelpError => e
|
||||
@logger.warn('Relp error: '+e.class.to_s+' '+e.message)
|
||||
#TODO: Still not happy with this, are they all warn level?
|
||||
|
@ -69,12 +77,22 @@ class LogStash::Inputs::Relp < LogStash::Inputs::Base
|
|||
@logger.warn('Relp client trying to open connection with something other than open:'+e.message)
|
||||
rescue Relp::InsufficientCommands
|
||||
@logger.warn('Relp client incapable of syslog')
|
||||
rescue IOError, Interrupted
|
||||
if @interrupted
|
||||
# Intended shutdown, get out of the loop
|
||||
@relp_server.shutdown
|
||||
break
|
||||
else
|
||||
# Else it was a genuine IOError caused by something else, so propagate it up..
|
||||
raise
|
||||
end
|
||||
end
|
||||
end # loop
|
||||
end # def run
|
||||
|
||||
def teardown
|
||||
@relp_server.shutdown
|
||||
@interrupted = true
|
||||
@thread.raise(Interrupted.new)
|
||||
end
|
||||
end # class LogStash::Inputs::Relp
|
||||
|
||||
|
|
|
@ -30,10 +30,10 @@ class Relp#This isn't much use on its own, but gives RelpServer and RelpClient t
|
|||
return valid_commands.include?(command)
|
||||
end
|
||||
|
||||
def frame_write(frame)
|
||||
def frame_write(socket, frame)
|
||||
unless self.server? #I think we have to trust a server to be using the correct txnr
|
||||
#Only allow txnr to be 0 or be determined automatically
|
||||
frame['txnr'] = self.nexttxnr unless frame['txnr']==0
|
||||
frame['txnr'] = self.nexttxnr() unless frame['txnr']==0
|
||||
end
|
||||
frame['txnr'] = frame['txnr'].to_s
|
||||
frame['message'] = '' if frame['message'].nil?
|
||||
|
@ -45,37 +45,39 @@ class Relp#This isn't much use on its own, but gives RelpServer and RelpClient t
|
|||
frame['message']
|
||||
].join(' ').strip
|
||||
begin
|
||||
@socket.write(wiredata)
|
||||
@logger.debug? and @logger.debug("Writing to socket", :data => wiredata)
|
||||
socket.write(wiredata)
|
||||
#Ending each frame with a newline is required in the specifications
|
||||
#Doing it a separately is useful (but a bit of a bodge) because
|
||||
#for some reason it seems to take 2 writes after the server closes the
|
||||
#connection before we get an exception
|
||||
@socket.write("\n")
|
||||
socket.write("\n")
|
||||
rescue Errno::EPIPE,IOError,Errno::ECONNRESET#TODO: is this sufficient to catch all broken connections?
|
||||
raise ConnectionClosed
|
||||
end
|
||||
frame['txnr'].to_i
|
||||
return frame['txnr'].to_i
|
||||
end
|
||||
|
||||
def frame_read
|
||||
def frame_read(socket)
|
||||
begin
|
||||
frame = Hash.new
|
||||
frame['txnr'] = @socket.readline(' ').strip.to_i
|
||||
frame['command'] = @socket.readline(' ').strip
|
||||
frame['txnr'] = socket.readline(' ').strip.to_i
|
||||
frame['command'] = socket.readline(' ').strip
|
||||
|
||||
#Things get a little tricky here because if the length is 0 it is not followed by a space.
|
||||
leading_digit=@socket.read(1)
|
||||
leading_digit=socket.read(1)
|
||||
if leading_digit=='0' then
|
||||
frame['datalen'] = 0
|
||||
frame['message'] = ''
|
||||
else
|
||||
frame['datalen'] = (leading_digit + @socket.readline(' ')).strip.to_i
|
||||
frame['message'] = @socket.read(frame['datalen'])
|
||||
frame['datalen'] = (leading_digit + socket.readline(' ')).strip.to_i
|
||||
frame['message'] = socket.read(frame['datalen'])
|
||||
end
|
||||
@logger.debug? and @logger.debug("Read frame", :frame => frame)
|
||||
rescue EOFError,Errno::ECONNRESET,IOError
|
||||
raise ConnectionClosed
|
||||
end
|
||||
if ! self.valid_command?(frame['command'])#TODO: is this enough to catch framing errors?
|
||||
if ! self.valid_command?(frame['command'])#TODO: is this enough to catch framing errors?
|
||||
if self.server?
|
||||
self.serverclose
|
||||
else
|
||||
|
@ -93,16 +95,10 @@ class Relp#This isn't much use on its own, but gives RelpServer and RelpClient t
|
|||
end
|
||||
|
||||
class RelpServer < Relp
|
||||
|
||||
def peer
|
||||
if @peer.nil?
|
||||
@peer = @socket.peeraddr[3]#TODO: is this the best thing to report? I don't think so...
|
||||
end
|
||||
@peer
|
||||
end
|
||||
|
||||
def initialize(host,port,required_commands=[])
|
||||
|
||||
@logger = Cabin::Channel.get(LogStash)
|
||||
|
||||
@server=true
|
||||
|
||||
#These are things that are part of the basic protocol, but only valid in one direction (rsp, close etc.)
|
||||
|
@ -111,20 +107,29 @@ class RelpServer < Relp
|
|||
#These are extra commands that we require, otherwise refuse the connection
|
||||
@required_relp_commands = required_commands
|
||||
|
||||
@server=TCPServer.new(host,port)#TODO: rescue if port is already in use (Errno::EADDRINUSE)
|
||||
begin
|
||||
@server = TCPServer.new(host, port)
|
||||
rescue Errno::EADDRINUSE
|
||||
@logger.error("Could not start RELP server: Address in use",
|
||||
:host => host, :port => port)
|
||||
raise
|
||||
end
|
||||
@logger.info? and @logger.info("Started RELP Server", :host => host, :port => port)
|
||||
end
|
||||
|
||||
|
||||
def accept
|
||||
@socket=@server.accept
|
||||
frame=self.frame_read
|
||||
socket = @server.accept
|
||||
frame=self.frame_read(socket)
|
||||
if frame['command'] == 'open'
|
||||
offer=Hash[*frame['message'].scan(/^(.*)=(.*)$/).flatten]
|
||||
if offer['relp_version'].nil?
|
||||
@logger.warn("No relp version specified")
|
||||
#if no version specified, relp spec says we must close connection
|
||||
self.serverclose
|
||||
self.serverclose(socket)
|
||||
raise RelpError, 'No relp_version specified'
|
||||
#subtracting one array from the other checks to see if all elements in @required_relp_commands are present in the offer
|
||||
elsif ! (@required_relp_commands - offer['commands'].split(',')).empty?
|
||||
@logger.warn("Not all required commands are available", :required => @required_relp_commands, :offer => offer['commands'])
|
||||
#Tell them why we're closing the connection:
|
||||
response_frame = Hash.new
|
||||
response_frame['txnr'] = frame['txnr']
|
||||
|
@ -132,9 +137,8 @@ class RelpServer < Relp
|
|||
response_frame['message'] = '500 Required command(s) '
|
||||
+ (@required_relp_commands - offer['commands'].split(',')).join(',')
|
||||
+ ' not offered'
|
||||
self.frame_write(response_frame)
|
||||
|
||||
self.serverclose
|
||||
self.frame_write(socket,response_frame)
|
||||
self.serverclose(socket)
|
||||
raise InsufficientCommands, offer['commands']
|
||||
+ ' offered, require ' + @required_relp_commands.join(',')
|
||||
else
|
||||
|
@ -147,18 +151,18 @@ class RelpServer < Relp
|
|||
response_frame['message'] += 'relp_version=' + RelpVersion + "\n"
|
||||
response_frame['message'] += 'relp_software=' + RelpSoftware + "\n"
|
||||
response_frame['message'] += 'commands=' + @required_relp_commands.join(',')#TODO: optional ones
|
||||
self.frame_write(response_frame)
|
||||
return self
|
||||
self.frame_write(socket, response_frame)
|
||||
return self, socket
|
||||
end
|
||||
else
|
||||
self.serverclose
|
||||
self.serverclose(socket)
|
||||
raise InappropriateCommand, frame['command'] + ' expecting open'
|
||||
end
|
||||
end
|
||||
|
||||
#This does not ack the frame, just reads it
|
||||
def syslog_read
|
||||
frame = self.frame_read
|
||||
def syslog_read(socket)
|
||||
frame = self.frame_read(socket)
|
||||
if frame['command'] == 'syslog'
|
||||
return frame
|
||||
elsif frame['command'] == 'close'
|
||||
|
@ -166,39 +170,38 @@ class RelpServer < Relp
|
|||
response_frame = Hash.new
|
||||
response_frame['txnr'] = frame['txnr']
|
||||
response_frame['command'] = 'rsp'
|
||||
self.frame_write(response_frame)
|
||||
self.serverclose
|
||||
self.frame_write(socket,response_frame)
|
||||
self.serverclose(socket)
|
||||
raise ConnectionClosed
|
||||
else
|
||||
#the client is trying to do something unexpected
|
||||
self.serverclose
|
||||
self.serverclose(socket)
|
||||
raise InappropriateCommand, frame['command'] + ' expecting syslog'
|
||||
end
|
||||
end
|
||||
|
||||
def serverclose
|
||||
def serverclose(socket)
|
||||
frame = Hash.new
|
||||
frame['txnr'] = 0
|
||||
frame['command'] = 'serverclose'
|
||||
begin
|
||||
self.frame_write(frame)
|
||||
@socket.close
|
||||
self.frame_write(socket,frame)
|
||||
socket.close
|
||||
rescue ConnectionClosed
|
||||
end
|
||||
end
|
||||
|
||||
def shutdown
|
||||
@server.shutdown
|
||||
@server.close
|
||||
rescue Exception#@server might already be down
|
||||
end
|
||||
|
||||
def ack(txnr)
|
||||
def ack(socket, txnr)
|
||||
frame = Hash.new
|
||||
frame['txnr'] = txnr
|
||||
frame['command'] = 'rsp'
|
||||
frame['message'] = '200 OK'
|
||||
self.frame_write(frame)
|
||||
self.frame_write(socket, frame)
|
||||
end
|
||||
|
||||
end
|
||||
|
@ -208,7 +211,8 @@ class RelpClient < Relp
|
|||
|
||||
def initialize(host,port,required_commands = [],buffer_size = 128,
|
||||
retransmission_timeout=10)
|
||||
|
||||
@logger = Cabin::Channel.get(LogStash)
|
||||
@logger.info? and @logger.info("Starting RELP client", :host => host, :port => port)
|
||||
@server = false
|
||||
@buffer = Hash.new
|
||||
|
||||
|
@ -231,45 +235,45 @@ class RelpClient < Relp
|
|||
offer['message'] = 'relp_version=' + RelpVersion + "\n"
|
||||
offer['message'] += 'relp_software=' + RelpSoftware + "\n"
|
||||
offer['message'] += 'commands=' + @required_relp_commands.join(',')#TODO: add optional ones
|
||||
self.frame_write(offer)
|
||||
response_frame = self.frame_read
|
||||
|
||||
unless response_frame['message'][0,3] == '200'
|
||||
self.frame_write(@socket, offer)
|
||||
response_frame = self.frame_read(@socket)
|
||||
if response_frame['message'][0,3] != '200'
|
||||
raise RelpError,response_frame['message']
|
||||
end
|
||||
|
||||
response=Hash[*response_frame['message'][7..-1].scan(/^(.*)=(.*)$/).flatten]
|
||||
if response['relp_version'].nil?
|
||||
#if no version specified, relp spec says we must close connection
|
||||
self.close
|
||||
self.close()
|
||||
raise RelpError, 'No relp_version specified; offer: '
|
||||
+ response_frame['message'][6..-1].scan(/^(.*)=(.*)$/).flatten
|
||||
|
||||
#subtracting one array from the other checks to see if all elements in @required_relp_commands are present in the offer
|
||||
elsif ! (@required_relp_commands - response['commands'].split(',')).empty?
|
||||
#if it can't receive syslog it's useless to us; close the connection
|
||||
self.close
|
||||
self.close()
|
||||
raise InsufficientCommands, response['commands'] + ' offered, require '
|
||||
+ @required_relp_commands.join(',')
|
||||
end
|
||||
#If we've got this far with no problems, we're good to go
|
||||
@logger.info? and @logger.info("Connection establish with server")
|
||||
|
||||
#This thread deals with responses that come back
|
||||
reader = Thread.start do |parent|
|
||||
reader = Thread.start do
|
||||
loop do
|
||||
f = self.frame_read
|
||||
f = self.frame_read(@socket)
|
||||
if f['command'] == 'rsp' && f['message'] == '200 OK'
|
||||
@buffer.delete(f['txnr'])
|
||||
elsif f['command'] == 'rsp' && f['message'][0,1] == '5'
|
||||
#TODO: What if we get an error for something we're already retransmitted due to timeout?
|
||||
new_txnr = self.frame_write(@buffer[f['txnr']])
|
||||
new_txnr = self.frame_write(@socket, @buffer[f['txnr']])
|
||||
@buffer[new_txnr] = @buffer[f['txnr']]
|
||||
@buffer.delete(f['txnr'])
|
||||
elsif f['command'] == 'serverclose' || f['txnr'] == @close_txnr
|
||||
parent.raise ConnectionClosed#TODO: raising errors like this makes no sense
|
||||
break
|
||||
else
|
||||
#Don't know what's going on if we get here, but it can't be good
|
||||
parent.raise RelpError#TODO: raising errors like this makes no sense
|
||||
raise RelpError#TODO: raising errors like this makes no sense
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -280,7 +284,7 @@ class RelpClient < Relp
|
|||
loop do
|
||||
#This returns old txnrs that are still present
|
||||
(@buffer.keys & old_buffer.keys).each do |txnr|
|
||||
new_txnr = self.frame_write(@buffer[txnr])
|
||||
new_txnr = self.frame_write(@socket, @buffer[txnr])
|
||||
@buffer[new_txnr] = @buffer[txnr]
|
||||
@buffer.delete(txnr)
|
||||
end
|
||||
|
@ -294,7 +298,7 @@ class RelpClient < Relp
|
|||
def close
|
||||
frame = Hash.new
|
||||
frame['command'] = 'close'
|
||||
@close_txnr=self.frame_write(frame)
|
||||
@close_txnr=self.frame_write(@socket, frame)
|
||||
#TODO: ought to properly wait for a reply etc. The serverclose will make it work though
|
||||
sleep @retransmission_timeout
|
||||
@socket.close#TODO: shutdown?
|
||||
|
@ -310,7 +314,7 @@ class RelpClient < Relp
|
|||
frame['command'] = 'syslog'
|
||||
frame['message'] = logline
|
||||
|
||||
txnr = self.frame_write(frame)
|
||||
txnr = self.frame_write(@socket, frame)
|
||||
@buffer[txnr] = frame
|
||||
end
|
||||
|
||||
|
|
102
spec/inputs/relp.rb
Normal file
102
spec/inputs/relp.rb
Normal file
|
@ -0,0 +1,102 @@
|
|||
# coding: utf-8
|
||||
require "test_utils"
|
||||
require "socket"
|
||||
require "logstash/util/relp"
|
||||
|
||||
describe "inputs/relp" do
|
||||
extend LogStash::RSpec
|
||||
|
||||
describe "Single client connection" do
|
||||
event_count = 10
|
||||
port = 5511
|
||||
config <<-CONFIG
|
||||
input {
|
||||
relp {
|
||||
type => "blah"
|
||||
port => #{port}
|
||||
}
|
||||
}
|
||||
CONFIG
|
||||
|
||||
th = Thread.current
|
||||
input do |plugins|
|
||||
relp = plugins.first
|
||||
|
||||
#Define test output
|
||||
sequence = 0
|
||||
output = Shiftback.new do |event|
|
||||
sequence += 1
|
||||
relp.teardown if sequence == event_count
|
||||
begin
|
||||
insist { event.message } == "Hello"
|
||||
rescue Exception => failure
|
||||
# Get out of the threads nets
|
||||
th.raise failure
|
||||
end
|
||||
end
|
||||
|
||||
#Run input in a separate thread
|
||||
relp.register
|
||||
thread = Thread.new(relp, output) do |*args|
|
||||
relp.run(output)
|
||||
end
|
||||
|
||||
#Send events from clients
|
||||
client = RelpClient.new("0.0.0.0", port, ["syslog"])
|
||||
event_count.times do |value|
|
||||
client.syslog_write("Hello")
|
||||
end
|
||||
#Do not call client.close as the connection termination will be
|
||||
#initiated by the relp server
|
||||
#wait for input termination
|
||||
thread.join()
|
||||
end # input
|
||||
end
|
||||
describe "Two client connection" do
|
||||
event_count = 100
|
||||
port = 5511
|
||||
config <<-CONFIG
|
||||
input {
|
||||
relp {
|
||||
type => "blah"
|
||||
port => #{port}
|
||||
}
|
||||
}
|
||||
CONFIG
|
||||
|
||||
th = Thread.current
|
||||
input do |plugins|
|
||||
sequence = 0
|
||||
relp = plugins.first
|
||||
output = Shiftback.new do |event|
|
||||
sequence += 1
|
||||
relp.teardown if sequence == event_count
|
||||
begin
|
||||
insist { event.message } == "Hello"
|
||||
rescue Exception => failure
|
||||
# Get out of the threads nets
|
||||
th.raise failure
|
||||
end
|
||||
end
|
||||
|
||||
relp.register
|
||||
#Run input in a separate thread
|
||||
thread = Thread.new(relp, output) do |*args|
|
||||
relp.run(output)
|
||||
end
|
||||
|
||||
#Send events from clients sockets
|
||||
client = RelpClient.new("0.0.0.0", port, ["syslog"])
|
||||
client2 = RelpClient.new("0.0.0.0", port, ["syslog"])
|
||||
event_count.times do |value|
|
||||
client.syslog_write("Hello")
|
||||
client2.syslog_write("Hello")
|
||||
end
|
||||
#Do not call client.close as the connection termination will be
|
||||
#initiated by the relp server
|
||||
|
||||
#wait for input termination
|
||||
thread.join
|
||||
end # input
|
||||
end
|
||||
end
|
Loading…
Add table
Add a link
Reference in a new issue