Synchronize state convergence to improve test stability

This (hopefully) fixes bugs like the one seen in https://logstash-ci.elastic.co/job/elastic+logstash+master+multijob--ruby-unit-tests/169/consoleFull

We got an error message like the one below.

This looks like concurrent stuff happened because the test directly invokes `converge_state_and_update`. This mutex should prevent the race from occurring

```
00:55:35     [ERROR] 2018-05-23 00:55:35.829 [Converge PipelineAction::Create<main>] create - Attempted to create a pipeline that already exists! This shouldn't be possible {:pipeline_id=>:main, :pipelines=>{:main=>{:pipeline_id=>"main", :settings=>"#<LogStash::Settings:0x56816357 @transient_settings={}, @settings={\"node.name\"=>#<LogStash::Setting::String:0x62f94470 @possible_strings=[], @name=\"node.name\", @value=nil, @value_is_set=false, @klass=String, @strict=true, @default=\"15b67abeef07\", @validator_proc=nil>, \"path.config\"=>#<LogStash::Setting::NullableString:0x1d659cdd @possible_strings=[], @name=\"path.config\", @value=\"/tmp/studtmp-103b13a45b8fe779ee6b235b33ce483ac4c3db63372d41611bf7a95c0739\", @value_is_set=true, @klass=String, @strict=false, @default=nil, @validator_proc=nil>, \"path.data\"=>#<LogStash::Setting::WritableDirectory:0x40a3dffd @name=\"path.data\", @value=\"/tmp/d20180523-486-1a9nlli\", @value_is_set=true, @klass=String, @strict=false, @default=\"/opt/logstash/data\", @validator_proc=nil>, \"config.string\"=>#<LogStash::Setting::NullableString:0x68c2c3b7 @possible_strings=[], @name=\"config.string\", @value=nil, @value_is_set=false, @klass=String, @strict=false, @default=nil, @validator_proc=nil>, \"modules.cli\"=>#<LogStash::Setting::Modules:0x40b0c01b @name=\"modules.cli\", @value=nil, @value_is_set=false, @klass=LogStash::Util::ModulesSettingArray, @default=[], @validator_proc=nil>, \"modules\"=>#<LogStash::Setting::Modules:0x71023fa4 @name=\"modules\", @value=nil, @value_is_set=false, @klass=LogStash::Util::ModulesSettingArray, @default=[], @validator_proc=nil>, \"modules_list\"=>#<LogStash::Setting:0x2f005a4e @name=\"modules_list\", @value=nil, @value_is_set=false, @klass=Array, @strict=true, @default=[], @validator_proc=nil>, \"modules_variable_list\"=>#<LogStash::Setting:0x5121fb62 @name=\"modules_variable_list\", @value=nil, @value_is_set=false, @klass=Array, @strict=true, @default=[], @validator_proc=nil>, \"cloud.id\"=>#<LogStash::Setting::Modules:0x17574945 @name=\"cloud.id\", @value=nil, @value_is_set=false, @klass=LogStash::Util::CloudSettingId, @default=nil, @validator_proc=nil>, \"cloud.auth\"=>#<LogStash::Setting::Modules:0x77c8ab14 @name=\"cloud.auth\", @value=nil, @value_is_set=false, @klass=LogStash::Util::CloudSettingAuth, @default=nil, @validator_proc=nil>, \"modules_setup\"=>#<LogStash::Setting::Boolean:0x5a375d89 @name=\"modules_setup\", @value=nil, @value_is_set=false, @klass=Object, @default=false, @validator_proc=nil>, \"config.test_and_exit\"=>#<LogStash::Setting::Boolean:0x4134746 @name=\"config.test_and_exit\", @value=nil, @value_is_set=false, @klass=Object, @default=false, @validator_proc=nil>, \"config.reload.automatic\"=>#<LogStash::Setting::Boolean:0x29a32be0 @name=\"config.reload.automatic\", @value=nil, @value_is_set=false, @klass=Object, @default=false, @validator_proc=nil>, \"config.reload.interval\"=>#<LogStash::Setting::TimeValue:0x1b01b5cb @name=\"config.reload.interval\", @value=nil, @value_is_set=false, @klass=Fixnum, @default=3000000000, @validator_proc=nil>, \"config.support_escapes\"=>#<LogStash::Setting::Boolean:0x7b3a7837 @name=\"config.support_escapes\", @value=nil, @value_is_set=false, @klass=Object, @default=false, @validator_proc=nil>, \"metric.collect\"=>#<LogStash::Setting::Boolean:0x650d3961 @name=\"metric.collect\", @value=true, @value_is_set=true, @klass=Object, @default=true, @validator_proc=nil>, \"pipeline.id\"=>#<LogStash::Setting::String:0x55c1f20b @possible_strings=[], @name=\"pipeline.id\", @value=nil, @value_is_set=false, @klass=String, @strict=true, @default=\"main\", @validator_proc=nil>, \"pipeline.system\"=>#<LogStash::Setting::Boolean:0x4378aa95 @name=\"pipeline.system\", @value=nil, @value_is_set=false, @klass=Object, @default=false, @validator_proc=nil>, \"pipeline.workers\"=>#<LogStash::Setting::PositiveInteger:0x6bab890d @name=\"pipeline.workers\", @value=nil, @value_is_set=false, @klass=Integer, @default=16, @validator_proc=#<Proc:0x67e16328@/opt/logstash/logstash-core/lib/logstash/settings.rb:334>>, \"pipeline.output.workers\"=>#<LogStash::Setting::PositiveInteger:0x3d6a9475 @name=\"pipeline.output.workers\", @value=nil, @value_is_set=false, @klass=Integer, @default=1, @validator_proc=#<Proc:0x3b40ad4f@/opt/logstash/logstash-core/lib/logstash/settings.rb:334>>, \"pipeline.batch.size\"=>#<LogStash::Setting::PositiveInteger:0xc14b8cb @name=\"pipeline.batch.size\", @value=nil, @value_is_set=false, @klass=Integer, @default=125, @validator_proc=#<Proc:0x60806242@/opt/logstash/logstash-core/lib/logstash/settings.rb:334>>, \"pipeline.batch.delay\"=>#<LogStash::Setting::Numeric:0x9a09b1b @name=\"pipeline.batch.delay\", @value=nil, @value_is_set=false, @klass=Numeric, @default=50, @validator_proc=nil>, \"pipeline.unsafe_shutdown\"=>#<LogStash::Setting::Boolean:0x373475b5 @name=\"pipeline.unsafe_shutdown\", @value=nil, @value_is_set=false, @klass=Object, @default=false, @validator_proc=nil>, \"pipeline.java_execution\"=>#<LogStash::Setting::Boolean:0x1e4f4178 @name=\"pipeline.java_execution\", @value=nil, @value_is_set=false, @klass=Object, @default=true, @validator_proc=nil>, \"pipeline.reloadable\"=>#<LogStash::Setting::Boolean:0x7dbeb281 @name=\"pipeline.reloadable\", @value=nil, @value_is_set=false, @klass=Object, @default=true, @validator_proc=nil>, \"path.plugins\"=>#<LogStash::Setting:0x704b966e @name=\"path.plugins\", @value=nil, @value_is_set=false, @klass=Array, @strict=true, @default=[\"/tmp/studtmp-14955ca0fd3d079036abad8f09c7447c4227d5af0956ece8a86b5e239ed2\"], @validator_proc=nil>, \"interactive\"=>#<LogStash::Setting::NullableString:0x6446dc0f @possible_strings=[], @name=\"interactive\", @value=nil, @value_is_set=false, @klass=String, @strict=false, @default=nil, @validator_proc=nil>, \"config.debug\"=>#<LogStash::Setting::Boolean:0xe6057e @name=\"config.debug\", @value=nil, @value_is_set=false, @klass=Object, @default=false, @validator_proc=nil>, \"log.level\"=>#<LogStash::Setting::String:0x8c49559 @possible_strings=[\"fatal\", \"error\", \"warn\", \"debug\", \"info\", \"trace\"], @name=\"log.level\", @value=nil, @value_is_set=false, @klass=String, @strict=true, @default=\"info\", @validator_proc=nil>, \"version\"=>#<LogStash::Setting::Boolean:0x76648ce0 @name=\"version\", @value=nil, @value_is_set=false, @klass=Object, @default=false, @validator_proc=nil>, \"help\"=>#<LogStash::Setting::Boolean:0x4699aaa7 @name=\"help\", @value=nil, @value_is_set=false, @klass=Object, @default=false, @validator_proc=nil>, \"log.format\"=>#<LogStash::Setting::String:0x6adb8170 @possible_strings=[\"json\", \"plain\"], @name=\"log.format\", @value=nil, @value_is_set=false, @klass=String, @strict=true, @default=\"plain\", @validator_proc=nil>, \"http.host\"=>#<LogStash::Setting::String:0x18ec9e0b @possible_strings=[], @name=\"http.host\", @value=nil, @value_is_set=false, @klass=String, @strict=true, @default=\"127.0.0.1\", @validator_proc=nil>, \"http.port\"=>#<LogStash::Setting::PortRange:0x132bd323 @name=\"http.port\", @value=nil, @value_is_set=false, @klass=Range, @default=9600..9700, @validator_proc=#<Proc:0x25cd1037@/opt/logstash/logstash-core/lib/logstash/settings.rb:360>>, \"http.environment\"=>#<LogStash::Setting::String:0x793e0b6 @possible_strings=[], @name=\"http.environment\", @value=nil, @value_is_set=false, @klass=String, @strict=true, @default=\"production\", @validator_proc=nil>, \"queue.type\"=>#<LogStash::Setting::String:0x7b733a89 @possible_strings=[\"persisted\", \"memory\"], @name=\"queue.type\", @value=\"memory\", @value_is_set=true, @klass=String, @strict=true, @default=\"memory\", @validator_proc=nil>, \"queue.drain\"=>#<LogStash::Setting::Boolean:0x7b8dbe51 @name=\"queue.drain\", @value=nil, @value_is_set=false, @klass=Object, @default=false, @validator_proc=nil>, \"queue.page_capacity\"=>#<LogStash::Setting::Bytes:0x638367c6 @name=\"queue.page_capacity\", @value=nil, @value_is_set=false, @klass=Fixnum, @default=67108864, @validator_proc=#<Proc:0x146be4d@/opt/logstash/logstash-core/lib/logstash/settings.rb:484>>, \"queue.max_bytes\"=>#<LogStash::Setting::Bytes:0x55fe764e @name=\"queue.max_bytes\", @value=nil, @value_is_set=false, @klass=Fixnum, @default=1073741824, @validator_proc=#<Proc:0x43173596@/opt/logstash/logstash-core/lib/logstash/settings.rb:484>>, \"queue.max_events\"=>#<LogStash::Setting::Numeric:0x3bd2c765 @name=\"queue.max_events\", @value=nil, @value_is_set=false, @klass=Numeric, @default=0, @validator_proc=nil>, \"queue.checkpoint.acks\"=>#<LogStash::Setting::Numeric:0xf52b284 @name=\"queue.checkpoint.acks\", @value=nil, @value_is_set=false, @klass=Numeric, @default=1024, @validator_proc=nil>, \"queue.checkpoint.writes\"=>#<LogStash::Setting::Numeric:0x29b8e926 @name=\"queue.checkpoint.writes\", @value=nil, @value_is_set=false, @klass=Numeric, @default=1024, @validator_proc=nil>, \"queue.checkpoint.interval\"=>#<LogStash::Setting::Numeric:0x5379d202 @name=\"queue.checkpoint.interval\", @value=nil, @value_is_set=false, @klass=Numeric, @default=1000, @validator_proc=nil>, \"dead_letter_queue.enable\"=>#<LogStash::Setting::Boolean:0x7ebddee7 @name=\"dead_letter_queue.enable\", @value=nil, @value_is_set=false, @klass=Object, @default=false, @validator_proc=nil>, \"dead_letter_queue.max_bytes\"=>#<LogStash::Setting::Bytes:0x64618526 @name=\"dead_letter_queue.max_bytes\", @value=nil, @value_is_set=false, @klass=Fixnum, @default=1073741824, @validator_proc=#<Proc:0xa7ea155@/opt/logstash/logstash-core/lib/logstash/settings.rb:484>>, \"slowlog.threshold.warn\"=>#<LogStash::Setting::TimeValue:0x170a9bda @name=\"slowlog.threshold.warn\", @value=nil, @value_is_set=false, @klass=Fixnum, @default=-1, @validator_proc=nil>, \"slowlog.threshold.info\"=>#<LogStash::Setting::TimeValue:0x7d0353a @name=\"slowlog.threshold.info\", @value=nil, @value_is_set=false, @klass=Fixnum, @default=-1, @validator_proc=nil>, \"slowlog.threshold.debug\"=>#<LogStash::Setting::TimeValue:0x6ff0d853 @name=\"slowlog.threshold.debug\", @value=nil, @value_is_set=false, @klass=Fixnum, @default=-1, @validator_proc=nil>, \"slowlog.threshold.trace\"=>#<LogStash::Setting::TimeValue:0x10fbb8f1 @name=\"slowlog.threshold.trace\", @value=nil, @value_is_set=false, @klass=Fixnum, @default=-1, @validator_proc=nil>, \"keystore.classname\"=>#<LogStash::Setting::String:0x536c357f @possible_strings=[], @name=\"keystore.classname\", @value=nil, @value_is_set=false, @klass=String, @strict=true, @default=\"org.logstash.secret.store.backend.JavaKeyStore\", @validator_proc=nil>, \"keystore.file\"=>#<LogStash::Setting::String:0x5513b3cd @possible_strings=[], @name=\"keystore.file\", @value=nil, @value_is_set=false, @klass=String, @strict=false, @default=\"/opt/logstash/config/logstash.keystore\", @validator_proc=nil>, \"path.queue\"=>#<LogStash::Setting::WritableDirectory:0x3c48db7c @name=\"path.queue\", @value=nil, @value_is_set=false, @klass=String, @strict=false, @default=\"/opt/logstash/data/queue\", @validator_proc=nil>, \"path.dead_letter_queue\"=>#<LogStash::Setting::WritableDirectory:0x7139c034 @name=\"path.dead_letter_queue\", @value=nil, @value_is_set=false, @klass=String, @strict=false, @default=\"/opt/logstash/data/dead_letter_queue\", @validator_proc=nil>, \"path.settings\"=>#<LogStash::Setting::String:0x434cd4a1 @possible_strings=[], @name=\"path.settings\", @value=nil, @value_is_set=false, @klass=String, @strict=true, @default=\"/opt/logstash/config\", @validator_proc=nil>, \"path.logs\"=>#<LogStash::Setting::String:0xb898ade @possible_strings=[], @name=\"path.logs\", @value=nil, @value_is_set=false, @klass=String, @strict=true, @default=\"/opt/logstash/logs\", @validator_proc=nil>}>", :ready=><#Concurrent::AtomicBoolean:0x1ad0 value:true>, :running=><#Concurrent::AtomicBoolean:0x1ad4 value:false>, :flushing=>#<Java::JavaUtilConcurrentAtomic::AtomicBoolean:0x4ee76580>}}}
00:58:15           increases the successful reload count (FAILED - 1)
```

Fixes #9645
This commit is contained in:
Andrew Cholakian 2018-05-22 22:16:04 -05:00
parent 2c4cfeb990
commit b78d32dede

View file

@ -38,6 +38,7 @@ class LogStash::Agent
@pipeline_bus = org.logstash.plugins.pipeline.PipelineBus.new
@pipelines = java.util.concurrent.ConcurrentHashMap.new();
@converge_state_mutex = Mutex.new
@name = setting("node.name")
@http_host = setting("http.host")
@ -130,35 +131,41 @@ class LogStash::Agent
end
def converge_state_and_update
results = @source_loader.fetch
# We want to enforce that only one state converge event can happen at a time
# This is especially important in tests, where code will invoke the agent, then
# call this method directly
# TODO: Switch to an explicit queue for pending changes?
@converge_state_mutex.synchronize do
results = @source_loader.fetch
unless results.success?
if auto_reload?
logger.debug("Could not fetch the configuration to converge, will retry", :message => results.error, :retrying_in => @reload_interval)
return
else
raise "Could not fetch the configuration, message: #{results.error}"
unless results.success?
if auto_reload?
logger.debug("Could not fetch the configuration to converge, will retry", :message => results.error, :retrying_in => @reload_interval)
return
else
raise "Could not fetch the configuration, message: #{results.error}"
end
end
# We Lock any access on the pipelines, since the actions will modify the
# content of it.
converge_result = nil
pipeline_actions = resolve_actions(results.response)
converge_result = converge_state(pipeline_actions)
update_metrics(converge_result)
logger.info(
"Pipelines running",
:count => running_pipelines.size,
:running_pipelines => running_pipelines.keys,
:non_running_pipelines => non_running_pipelines.keys
) if converge_result.success? && converge_result.total > 0
dispatch_events(converge_result)
converge_result
end
# We Lock any access on the pipelines, since the actions will modify the
# content of it.
converge_result = nil
pipeline_actions = resolve_actions(results.response)
converge_result = converge_state(pipeline_actions)
update_metrics(converge_result)
logger.info(
"Pipelines running",
:count => running_pipelines.size,
:running_pipelines => running_pipelines.keys,
:non_running_pipelines => non_running_pipelines.keys
) if converge_result.success? && converge_result.total > 0
dispatch_events(converge_result)
converge_result
rescue => e
logger.error("An exception happened when converging configuration", :exception => e.class, :message => e.message, :backtrace => e.backtrace)
end