mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
Fix relp input and update spec
This commit is contained in:
parent
e79f53646a
commit
dcaca7e184
2 changed files with 24 additions and 56 deletions
|
@ -44,7 +44,7 @@ class LogStash::Inputs::Relp < LogStash::Inputs::Base
|
|||
frame = relpserver.syslog_read(socket)
|
||||
@codec.decode(frame["message"]) do |event|
|
||||
decorate(event)
|
||||
event["host"] = _addressevent_source
|
||||
event["host"] = client_address
|
||||
output_queue << event
|
||||
end
|
||||
|
||||
|
|
|
@ -18,43 +18,28 @@ describe "inputs/relp" do
|
|||
}
|
||||
CONFIG
|
||||
|
||||
th = Thread.current
|
||||
input do |plugins|
|
||||
relp = plugins.first
|
||||
|
||||
#Define test output
|
||||
sequence = 0
|
||||
output = Shiftback.new do |event|
|
||||
sequence += 1
|
||||
relp.teardown if sequence == event_count
|
||||
begin
|
||||
insist { event.message } == "Hello"
|
||||
rescue Exception => failure
|
||||
# Get out of the threads nets
|
||||
th.raise failure
|
||||
end
|
||||
end
|
||||
|
||||
#Run input in a separate thread
|
||||
relp.register
|
||||
thread = Thread.new(relp, output) do |*args|
|
||||
relp.run(output)
|
||||
end
|
||||
input do |pipeline, queue|
|
||||
th = Thread.new { pipeline.run }
|
||||
sleep 0.1 while !pipeline.ready?
|
||||
|
||||
#Send events from clients
|
||||
client = RelpClient.new("0.0.0.0", port, ["syslog"])
|
||||
event_count.times do |value|
|
||||
client.syslog_write("Hello")
|
||||
client.syslog_write("Hello #{value}")
|
||||
end
|
||||
#Do not call client.close as the connection termination will be
|
||||
#initiated by the relp server
|
||||
#wait for input termination
|
||||
thread.join()
|
||||
|
||||
events = event_count.times.collect { queue.pop }
|
||||
event_count.times do |i|
|
||||
insist { events[i]["message"] } == "Hello #{i}"
|
||||
end
|
||||
|
||||
pipeline.shutdown
|
||||
th.join
|
||||
end # input
|
||||
end
|
||||
describe "Two client connection" do
|
||||
event_count = 100
|
||||
port = 5511
|
||||
port = 5512
|
||||
config <<-CONFIG
|
||||
input {
|
||||
relp {
|
||||
|
@ -64,39 +49,22 @@ describe "inputs/relp" do
|
|||
}
|
||||
CONFIG
|
||||
|
||||
th = Thread.current
|
||||
input do |plugins|
|
||||
sequence = 0
|
||||
relp = plugins.first
|
||||
output = Shiftback.new do |event|
|
||||
sequence += 1
|
||||
relp.teardown if sequence == event_count
|
||||
begin
|
||||
insist { event.message } == "Hello"
|
||||
rescue Exception => failure
|
||||
# Get out of the threads nets
|
||||
th.raise failure
|
||||
end
|
||||
end
|
||||
|
||||
relp.register
|
||||
#Run input in a separate thread
|
||||
thread = Thread.new(relp, output) do |*args|
|
||||
relp.run(output)
|
||||
end
|
||||
input do |pipeline, queue|
|
||||
Thread.new { pipeline.run }
|
||||
sleep 0.1 while !pipeline.ready?
|
||||
|
||||
#Send events from clients sockets
|
||||
client = RelpClient.new("0.0.0.0", port, ["syslog"])
|
||||
client2 = RelpClient.new("0.0.0.0", port, ["syslog"])
|
||||
|
||||
event_count.times do |value|
|
||||
client.syslog_write("Hello")
|
||||
client2.syslog_write("Hello")
|
||||
client.syslog_write("Hello from client")
|
||||
client2.syslog_write("Hello from client 2")
|
||||
end
|
||||
#Do not call client.close as the connection termination will be
|
||||
#initiated by the relp server
|
||||
|
||||
#wait for input termination
|
||||
thread.join
|
||||
|
||||
events = (event_count*2).times.collect { queue.pop }
|
||||
insist { events.select{|event| event["message"]=="Hello from client" }.size } == event_count
|
||||
insist { events.select{|event| event["message"]=="Hello from client 2" }.size } == event_count
|
||||
end # input
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue