mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
- Add inputs/stomp test
- Add necessary changes to LogStash::Stomp::Handler to allow for testing and for using that class as a stomp client without subscriptions.
This commit is contained in:
parent
0a7f793476
commit
2494ea6d67
2 changed files with 122 additions and 4 deletions
|
@ -5,29 +5,42 @@ class LogStash::Stomp
|
|||
class Handler < EventMachine::Connection
|
||||
include EM::Protocols::Stomp
|
||||
|
||||
attr_accessor :should_subscribe
|
||||
attr_accessor :ready
|
||||
|
||||
def initialize(*args)
|
||||
super
|
||||
|
||||
@input = args[0]
|
||||
@logger = args[1]
|
||||
@url = args[2]
|
||||
@should_subscribe = true
|
||||
@ready = false
|
||||
end # def initialize
|
||||
|
||||
def connection_completed
|
||||
@logger.debug("Connected")
|
||||
connect :login => @url.user, :passcode => @url.password
|
||||
@ready = true
|
||||
end # def connection_completed
|
||||
|
||||
def unbind
|
||||
@logger.error(["Error when connecting to stomp broker", { :url => @url }])
|
||||
@logger.error(["Connection to stomp broker died, retrying.", { :url => @url }])
|
||||
@ready = false
|
||||
EventMachine::Timer.new(1) do
|
||||
reconnect(@url.host, @url.port)
|
||||
end
|
||||
end # def unbind
|
||||
|
||||
def receive_msg(message)
|
||||
@logger.debug(["receiving message", { :msg => message }])
|
||||
if message.command == "CONNECTED"
|
||||
@logger.debug(["subscribing to", { :path => @url.path }])
|
||||
subscribe @url.path
|
||||
return
|
||||
if @should_subscribe
|
||||
@logger.debug(["subscribing to", { :path => @url.path }])
|
||||
subscribe @url.path
|
||||
return
|
||||
end
|
||||
@ready = true
|
||||
end
|
||||
end # def receive_msg
|
||||
end # class Handler
|
||||
|
|
105
test/logstash/inputs/test_stomp.rb
Normal file
105
test/logstash/inputs/test_stomp.rb
Normal file
|
@ -0,0 +1,105 @@
|
|||
$:.unshift File.dirname(__FILE__) + "/../../../lib"
|
||||
$:.unshift File.dirname(__FILE__) + "/../../"
|
||||
|
||||
require "logstash/testcase"
|
||||
require "logstash/agent"
|
||||
require "logstash/stomp/handler"
|
||||
require "logstash/logging"
|
||||
|
||||
class TestInputStomp < LogStash::TestCase
|
||||
def em_setup
|
||||
@flags ||= ["-d"]
|
||||
# Launch stomp server on a random port
|
||||
stomp_done = false
|
||||
#1.upto(30) do
|
||||
stomp_done = true
|
||||
@stomp_pid = nil
|
||||
@port = 61613
|
||||
while false
|
||||
#@port = (rand * 30000 + 20000).to_i
|
||||
@port = 61613
|
||||
@stomp_pid = Process.fork do
|
||||
args = ["-p", @port.to_s, *@flags]
|
||||
exec("stompserver", "stompserver", *args)
|
||||
$stderr.puts("$!")
|
||||
exit 1
|
||||
end
|
||||
|
||||
# Let stompserver start up and try to start listening.
|
||||
# Hard to otherwise test this. Maybe a tcp connection with timeouts?
|
||||
sleep(2)
|
||||
Process.waitpid(@stomp_pid, Process::WNOHANG)
|
||||
if $? != nil and $?.exited?
|
||||
# Try again
|
||||
else
|
||||
stomp_done = true
|
||||
break
|
||||
end
|
||||
end
|
||||
|
||||
if !stomp_done
|
||||
raise "Stompserver failed to start (failure to find ephemeral port? stompserver not installed?)"
|
||||
end
|
||||
|
||||
@queue = "/queue/testing"
|
||||
config = {
|
||||
"inputs" => {
|
||||
@type => [
|
||||
"stomp://localhost:#{@port}#{@queue}"
|
||||
]
|
||||
},
|
||||
"outputs" => [
|
||||
"internal:///"
|
||||
]
|
||||
}
|
||||
|
||||
super(config)
|
||||
|
||||
@stomp = EventMachine::connect("127.0.0.1", @port, LogStash::Stomp::Handler,
|
||||
nil, LogStash::Logger.new(STDERR),
|
||||
URI.parse(config["inputs"][@type][0]))
|
||||
@stomp.should_subscribe = false
|
||||
end # def em_setup
|
||||
|
||||
def test_foo
|
||||
inputs = [
|
||||
LogStash::Event.new("@message" => "hello world", "@type" => @type),
|
||||
LogStash::Event.new("@message" => "one two three", "@type" => @type),
|
||||
LogStash::Event.new("@message" => "one two three", "@type" => @type,
|
||||
"@fields" => { "field1" => "value1"})
|
||||
]
|
||||
EventMachine::run do
|
||||
em_setup
|
||||
expected_events = inputs.clone
|
||||
@output.subscribe do |event|
|
||||
expect = expected_events.shift
|
||||
#ap :event => event.to_hash
|
||||
|
||||
assert_equal(expect.message, event.message, "@message")
|
||||
assert_equal(expect.type, event.type, "@type")
|
||||
assert_equal(expect.tags, event.tags, "@tags")
|
||||
assert_equal(expect.timestamp, event.timestamp, "@tags")
|
||||
assert_equal(expect.fields, event.fields, "@tags")
|
||||
@agent.stop if expected_events.size == 0
|
||||
end
|
||||
|
||||
timer = EM::PeriodicTimer.new(0.2) do
|
||||
next if !@stomp.ready
|
||||
|
||||
if inputs.size == 0
|
||||
timer.cancel
|
||||
next
|
||||
end
|
||||
|
||||
event = inputs.shift
|
||||
@stomp.send @queue, event.to_json
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def teardown
|
||||
if @stomp_pid
|
||||
Process.kill("KILL", @stomp_pid)
|
||||
end
|
||||
end # def teardown
|
||||
end # class TestInputStomp
|
Loading…
Add table
Add a link
Reference in a new issue