mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
- fix tcp input tests
This commit is contained in:
parent
4dc1207483
commit
a9b19c484d
1 changed files with 68 additions and 184 deletions
|
@ -2,213 +2,97 @@
|
|||
require "test_utils"
|
||||
require "socket"
|
||||
|
||||
# Not sure why but each test need a different port
|
||||
# TODO: timeout around the thread.join
|
||||
describe "inputs/tcp" do
|
||||
extend LogStash::RSpec
|
||||
|
||||
describe "read json_event" do
|
||||
|
||||
event_count = 10
|
||||
port = 5511
|
||||
config <<-CONFIG
|
||||
input {
|
||||
tcp {
|
||||
type => "blah"
|
||||
port => #{port}
|
||||
format => "json_event"
|
||||
input {
|
||||
tcp {
|
||||
port => #{port}
|
||||
}
|
||||
}
|
||||
}
|
||||
CONFIG
|
||||
|
||||
th = Thread.current
|
||||
input do |plugins|
|
||||
sequence = 0
|
||||
tcp = plugins.first
|
||||
output = Shiftback.new do |event|
|
||||
sequence += 1
|
||||
tcp.teardown if sequence == event_count
|
||||
begin
|
||||
insist { event["sequence"] } == sequence -1
|
||||
insist { event["message"]} == "Hello ü Û"
|
||||
insist { event["message"].encoding } == Encoding.find("UTF-8")
|
||||
rescue Exception => failure
|
||||
# Get out of the threads nets
|
||||
th.raise failure
|
||||
end
|
||||
input do |pipeline, queue|
|
||||
Thread.new { pipeline.run }
|
||||
sleep 0.1 while !pipeline.ready?
|
||||
|
||||
socket = Stud.try(5.times) { TCPSocket.new("127.0.0.1", port) }
|
||||
event_count.times do |i|
|
||||
# unicode smiley for testing unicode support!
|
||||
socket.puts("#{i} ☹")
|
||||
end
|
||||
#Prepare input
|
||||
tcp.register
|
||||
#Run input in a separate thread
|
||||
thread = Thread.new(tcp, output) do |*args|
|
||||
tcp.run(output)
|
||||
socket.close
|
||||
|
||||
events = event_count.times.collect { queue.pop }
|
||||
event_count.times do |i|
|
||||
insist { events[i]["message"] } == "#{i} ☹"
|
||||
end
|
||||
#Send events from clients sockets
|
||||
event_count.times do |value|
|
||||
client_socket = TCPSocket.new("0.0.0.0", port)
|
||||
event = LogStash::Event.new("@fields" => { "message" => "Hello ü Û", "sequence" => value })
|
||||
client_socket.puts event.to_json
|
||||
client_socket.close
|
||||
# micro sleep to ensure sequencing
|
||||
sleep(0.1)
|
||||
end
|
||||
#wait for input termination
|
||||
thread.join
|
||||
end # input
|
||||
end
|
||||
|
||||
describe "read plain events with system defaults, should works on UTF-8 system" do
|
||||
event_count = 10
|
||||
port = 5512
|
||||
config <<-CONFIG
|
||||
input {
|
||||
tcp {
|
||||
type => "blah"
|
||||
port => #{port}
|
||||
}
|
||||
}
|
||||
CONFIG
|
||||
|
||||
th = Thread.current
|
||||
input do |plugins|
|
||||
sequence = 0
|
||||
tcp = plugins.first
|
||||
output = Shiftback.new do |event|
|
||||
sequence += 1
|
||||
begin
|
||||
insist { event.message } == "Hello ü Û\n"
|
||||
insist { event.message.encoding } == Encoding.find("UTF-8")
|
||||
rescue Exception => failure
|
||||
# Get out of the threads nets
|
||||
th.raise failure
|
||||
end
|
||||
if sequence == event_count
|
||||
tcp.teardown
|
||||
end
|
||||
end
|
||||
|
||||
tcp.register
|
||||
#Run input in a separate thread
|
||||
thread = Thread.new(tcp, output) do |*args|
|
||||
tcp.run(output)
|
||||
end
|
||||
#Send events from clients sockets
|
||||
client_socket = TCPSocket.new("0.0.0.0", port)
|
||||
event_count.times do |value|
|
||||
client_socket.write "Hello ü Û\n"
|
||||
end
|
||||
client_socket.close
|
||||
#wait for input termination
|
||||
puts "Waiting for tcp input thread to finish"
|
||||
thread.join
|
||||
end # input
|
||||
end
|
||||
|
||||
describe "read plain events with UTF-8 like charset, to prove that something is wrong with previous failing test" do
|
||||
event_count = 10
|
||||
port = 5514
|
||||
config <<-CONFIG
|
||||
input {
|
||||
tcp {
|
||||
type => "blah"
|
||||
port => #{port}
|
||||
charset => "CP65001" #that's just an alias of UTF-8
|
||||
}
|
||||
}
|
||||
CONFIG
|
||||
|
||||
th = Thread.current
|
||||
# Catch aborting reception threads
|
||||
input do |plugins|
|
||||
sequence = 0
|
||||
tcp = plugins.first
|
||||
output = Shiftback.new do |event|
|
||||
sequence += 1
|
||||
tcp.teardown if sequence == event_count
|
||||
begin
|
||||
insist { event["@source_host"] } != nil
|
||||
insist { event["@source"] } != nil
|
||||
insist { event["@timestamp"] } != nil
|
||||
insist { event["@fields"] } != nil
|
||||
insist { event["@type"] } != nil
|
||||
insist { event["@tags"] } != nil
|
||||
insist { event.message } == "Hello ü Û"
|
||||
insist { event.message.encoding } == Encoding.find("UTF-8")
|
||||
rescue Exception => failure
|
||||
# Get out of the threads nets
|
||||
th.raise failure
|
||||
end
|
||||
end
|
||||
|
||||
tcp.register
|
||||
#Run input in a separate thread
|
||||
|
||||
thread = Thread.new(tcp, output) do |*args|
|
||||
tcp.run(output)
|
||||
end
|
||||
#Send events from clients sockets
|
||||
event_count.times do |value|
|
||||
client_socket = TCPSocket.new("0.0.0.0", port)
|
||||
# puts "Encoding of client", client_socket.external_encoding, client_socket.internal_encoding
|
||||
client_socket.write "Hello ü Û"
|
||||
client_socket.close
|
||||
# micro sleep to ensure sequencing, TODO must think of a cleaner solution
|
||||
sleep(0.1)
|
||||
end
|
||||
#wait for input termination
|
||||
#TODO: timeout
|
||||
thread.join
|
||||
end # input
|
||||
end
|
||||
|
||||
describe "read plain events with ISO-8859-1 charset" do
|
||||
event_count = 10
|
||||
describe "read events with plain codec and ISO-8859-1 charset" do
|
||||
port = 5513
|
||||
charset = "ISO-8859-1"
|
||||
config <<-CONFIG
|
||||
input {
|
||||
tcp {
|
||||
type => "blah"
|
||||
port => #{port}
|
||||
charset => "#{charset}"
|
||||
input {
|
||||
tcp {
|
||||
port => #{port}
|
||||
codec => plain { charset => "#{charset}" }
|
||||
}
|
||||
}
|
||||
}
|
||||
CONFIG
|
||||
|
||||
th = Thread.current
|
||||
input do |plugins|
|
||||
sequence = 0
|
||||
tcp = plugins.first
|
||||
output = Shiftback.new do |event|
|
||||
sequence += 1
|
||||
tcp.teardown if sequence == event_count
|
||||
begin
|
||||
insist { event.message } == "Hello ü Û"
|
||||
insist { event.message.encoding } == Encoding.find("UTF-8")
|
||||
rescue Exception => failure
|
||||
# Get out of the threads nets
|
||||
th.raise failure
|
||||
end
|
||||
end
|
||||
input do |pipeline, queue|
|
||||
Thread.new { pipeline.run }
|
||||
sleep 0.1 while !pipeline.ready?
|
||||
|
||||
tcp.register
|
||||
#Run input in a separate thread
|
||||
socket = Stud.try(5.times) { TCPSocket.new("127.0.0.1", port) }
|
||||
text = "\xA3" # the £ symbol in ISO-8859-1 aka Latin-1
|
||||
text.force_encoding("ISO-8859-1")
|
||||
socket.puts(text)
|
||||
socket.close
|
||||
|
||||
thread = Thread.new(tcp, output) do |*args|
|
||||
tcp.run(output)
|
||||
end
|
||||
#Send events from clients sockets
|
||||
event_count.times do |value|
|
||||
client_socket = TCPSocket.new("0.0.0.0", port)
|
||||
#Force client encoding
|
||||
client_socket.set_encoding(charset)
|
||||
client_socket.write "Hello ü Û"
|
||||
client_socket.close
|
||||
# micro sleep to ensure sequencing
|
||||
sleep(0.1)
|
||||
end
|
||||
#wait for input termination
|
||||
thread.join
|
||||
event = queue.pop
|
||||
# Make sure the 0xA3 latin-1 code converts correctly to UTF-8.
|
||||
insist { event["message"].size } == 1
|
||||
insist { event["message"].bytesize } == 2
|
||||
insist { event["message"] } == "£"
|
||||
end # input
|
||||
end
|
||||
|
||||
describe "read events with json codec" do
|
||||
port = 5514
|
||||
config <<-CONFIG
|
||||
input {
|
||||
tcp {
|
||||
port => #{port}
|
||||
codec => json
|
||||
}
|
||||
}
|
||||
CONFIG
|
||||
|
||||
input do |pipeline, queue|
|
||||
Thread.new { pipeline.run }
|
||||
sleep 0.1 while !pipeline.ready?
|
||||
|
||||
data = {
|
||||
"hello" => "world",
|
||||
"foo" => [1,2,3],
|
||||
"baz" => { "1" => "2" }
|
||||
}
|
||||
|
||||
socket = Stud.try(5.times) { TCPSocket.new("127.0.0.1", port) }
|
||||
socket.puts(data.to_json)
|
||||
socket.close
|
||||
|
||||
event = queue.pop
|
||||
insist { event["hello"] } == data["hello"]
|
||||
insist { event["foo"] } == data["foo"]
|
||||
insist { event["baz"] } == data["baz"]
|
||||
end # input
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue