mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
Prevent variable leak between test
When running this test with a newer version of rspec we got the warning that some values where leaking between example: ``` 1) LogStash::ShutdownWatcher when pipeline is not stalled .unsafe_shutdown = false shouldn't force the shutdown Failure/Error: m.call(*args) The use of doubles or partial doubles from rspec-mocks outside of the per-test lifecycle is not supported. # ./logstash-core/lib/logstash/shutdown_watcher.rb:67:in `pipeline_report_snapshot' # ./logstash-core/spec/logstash/shutdown_watcher_spec.rb:22:in `block in /Users/ph/es/second/logstash/logstash-core/spec/logstash/shutdown_watcher_spec.rb' # ./logstash-core/lib/logstash/shutdown_watcher.rb:44:in `block in start' # ./vendor/bundle/jruby/2.3.0/gems/stud-0.0.22/lib/stud/interval.rb:20:in `interval' # ./logstash-core/lib/logstash/shutdown_watcher.rb:42:in `start' # ./logstash-core/spec/logstash/shutdown_watcher_spec.rb:93:in `block in (root)' ``` We have added a few helpers method in the class that allowed us to simplify the expectation and removed any usage of `Thread#kill` Fixes #7152
This commit is contained in:
parent
1a2c0c8597
commit
b00b7f9656
2 changed files with 36 additions and 16 deletions
|
@ -1,4 +1,6 @@
|
|||
# encoding: utf-8
|
||||
require "concurrent/atomic/atomic_fixnum"
|
||||
require "concurrent/atomic/atomic_boolean"
|
||||
|
||||
module LogStash
|
||||
class ShutdownWatcher
|
||||
|
@ -16,6 +18,8 @@ module LogStash
|
|||
@report_every = report_every
|
||||
@abort_threshold = abort_threshold
|
||||
@reports = []
|
||||
@attempts_count = Concurrent::AtomicFixnum.new(0)
|
||||
@running = Concurrent::AtomicBoolean.new(false)
|
||||
end
|
||||
|
||||
def self.unsafe_shutdown=(boolean)
|
||||
|
@ -35,11 +39,26 @@ module LogStash
|
|||
self.class.logger
|
||||
end
|
||||
|
||||
def attempts_count
|
||||
@attempts_count.value
|
||||
end
|
||||
|
||||
def stop!
|
||||
@running.make_false
|
||||
end
|
||||
|
||||
def stopped?
|
||||
@running.false?
|
||||
end
|
||||
|
||||
def start
|
||||
sleep(@cycle_period)
|
||||
cycle_number = 0
|
||||
stalled_count = 0
|
||||
running!
|
||||
Stud.interval(@cycle_period) do
|
||||
@attempts_count.increment
|
||||
break if stopped?
|
||||
break unless @pipeline.thread.alive?
|
||||
@reports << pipeline_report_snapshot
|
||||
@reports.delete_at(0) if @reports.size > @report_every # expire old report
|
||||
|
@ -61,6 +80,8 @@ module LogStash
|
|||
end
|
||||
cycle_number = (cycle_number + 1) % @report_every
|
||||
end
|
||||
ensure
|
||||
stop!
|
||||
end
|
||||
|
||||
def pipeline_report_snapshot
|
||||
|
@ -90,5 +111,10 @@ module LogStash
|
|||
def force_exit
|
||||
exit(-1)
|
||||
end
|
||||
|
||||
private
|
||||
def running!
|
||||
@running.make_true
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -9,22 +9,12 @@ describe LogStash::ShutdownWatcher do
|
|||
let(:pipeline) { double("pipeline") }
|
||||
let(:reporter) { double("reporter") }
|
||||
let(:reporter_snapshot) { double("reporter snapshot") }
|
||||
report_count = 0
|
||||
|
||||
before :each do
|
||||
allow(pipeline).to receive(:reporter).and_return(reporter)
|
||||
allow(pipeline).to receive(:thread).and_return(Thread.current)
|
||||
allow(reporter).to receive(:snapshot).and_return(reporter_snapshot)
|
||||
allow(reporter_snapshot).to receive(:o_simple_hash).and_return({})
|
||||
|
||||
allow(subject).to receive(:pipeline_report_snapshot).and_wrap_original do |m, *args|
|
||||
report_count += 1
|
||||
m.call(*args)
|
||||
end
|
||||
end
|
||||
|
||||
after :each do
|
||||
report_count = 0
|
||||
end
|
||||
|
||||
context "when pipeline is stalled" do
|
||||
|
@ -69,8 +59,9 @@ describe LogStash::ShutdownWatcher do
|
|||
it "shouldn't force the shutdown" do
|
||||
expect(subject).to_not receive(:force_exit)
|
||||
thread = Thread.new(subject) {|subject| subject.start }
|
||||
sleep 0.1 until report_count > check_threshold
|
||||
thread.kill
|
||||
sleep 0.1 until subject.attempts_count > check_threshold
|
||||
subject.stop!
|
||||
expect(thread.join(60)).to_not be_nil
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -91,8 +82,9 @@ describe LogStash::ShutdownWatcher do
|
|||
it "should force the shutdown" do
|
||||
expect(subject).to_not receive(:force_exit)
|
||||
thread = Thread.new(subject) {|subject| subject.start }
|
||||
sleep 0.1 until report_count > check_threshold
|
||||
thread.kill
|
||||
sleep 0.1 until subject.attempts_count > check_threshold
|
||||
subject.stop!
|
||||
expect(thread.join(60)).to_not be_nil
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -105,8 +97,10 @@ describe LogStash::ShutdownWatcher do
|
|||
it "shouldn't force the shutdown" do
|
||||
expect(subject).to_not receive(:force_exit)
|
||||
thread = Thread.new(subject) {|subject| subject.start }
|
||||
sleep 0.1 until report_count > check_threshold
|
||||
thread.kill
|
||||
sleep 0.1 until subject.attempts_count > check_threshold
|
||||
subject.stop!
|
||||
thread.join
|
||||
expect(thread.join(60)).to_not be_nil
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue