diff --git a/logstash-core/lib/logstash/agent.rb b/logstash-core/lib/logstash/agent.rb index 12ec1d1b6..db2cd7476 100644 --- a/logstash-core/lib/logstash/agent.rb +++ b/logstash-core/lib/logstash/agent.rb @@ -300,6 +300,10 @@ class LogStash::Agent @pipelines_registry.loading_pipelines end + def loaded_pipelines + @pipelines_registry.loaded_pipelines + end + def non_running_pipelines @pipelines_registry.non_running_pipelines end diff --git a/logstash-core/lib/logstash/api/commands/node.rb b/logstash-core/lib/logstash/api/commands/node.rb index cc540699e..fd2850db2 100644 --- a/logstash-core/lib/logstash/api/commands/node.rb +++ b/logstash-core/lib/logstash/api/commands/node.rb @@ -36,7 +36,8 @@ module LogStash def pipelines(options={}) pipeline_ids = service.get_shallow(:stats, :pipelines).keys pipeline_ids.each_with_object({}) do |pipeline_id, result| - result[pipeline_id] = pipeline(pipeline_id, options) + pipeline_node = pipeline(pipeline_id, options) + result[pipeline_id] = pipeline_node unless pipeline_node.empty? end rescue Instrument::MetricStore::MetricNotFound {} @@ -62,8 +63,8 @@ module LogStash metrics.merge!(extended_stats) end metrics - rescue - {} + rescue LogStash::Instrument::MetricStore::MetricNotFound + {} # empty end def os diff --git a/logstash-core/lib/logstash/api/modules/node.rb b/logstash-core/lib/logstash/api/modules/node.rb index a607726f3..039d16f24 100644 --- a/logstash-core/lib/logstash/api/modules/node.rb +++ b/logstash-core/lib/logstash/api/modules/node.rb @@ -56,7 +56,6 @@ module LogStash opts = {:graph => as_boolean(params.fetch("graph", false)), :vertices => as_boolean(params.fetch("vertices", false))} payload = node.pipelines(opts) - halt(404) if payload.empty? respond_with(:pipelines => payload ) end diff --git a/logstash-core/lib/logstash/pipelines_registry.rb b/logstash-core/lib/logstash/pipelines_registry.rb index 7d63d4e97..6032387ce 100644 --- a/logstash-core/lib/logstash/pipelines_registry.rb +++ b/logstash-core/lib/logstash/pipelines_registry.rb @@ -279,6 +279,10 @@ module LogStash select_pipelines { |state| state.loading? } end + def loaded_pipelines + select_pipelines { |state| !state.loading? } + end + # @return [Hash{String=>Pipeline}] def non_running_pipelines select_pipelines { |state| state.terminated? } diff --git a/logstash-core/spec/logstash/api/commands/node_spec.rb b/logstash-core/spec/logstash/api/commands/node_spec.rb index 4e0f6c865..5e7d81ed6 100644 --- a/logstash-core/spec/logstash/api/commands/node_spec.rb +++ b/logstash-core/spec/logstash/api/commands/node_spec.rb @@ -37,8 +37,9 @@ describe LogStash::Api::Commands::Node do :explicit_id=>false, :type=>"plugin"} } + let(:api_service) { LogStash::Api::Service.new(@agent) } subject(:report) do - factory = ::LogStash::Api::CommandFactory.new(LogStash::Api::Service.new(@agent)) + factory = ::LogStash::Api::CommandFactory.new(api_service) if report_method == :pipelines factory.build(:node).send(report_method, opts) elsif report_method == :pipeline @@ -64,6 +65,25 @@ describe LogStash::Api::Commands::Node do end end + describe "#pipelines" do + let(:report_method) { :pipelines } + it "contains the running pipelines" do + expect(report).to include(:main, :secondary) + end + context 'when the `main` pipeline throws a MetricNotFound exception' do + before(:each) do + allow(api_service).to receive(:extract_metrics).and_call_original + expect(api_service).to receive(:extract_metrics) + .with([:stats, :pipelines, :main, :config], any_args) + .and_raise(LogStash::Instrument::MetricStore::MetricNotFound) + end + + it 'does not contain the partially-constructed pipeline' do + expect(report).to include(:secondary) + expect(report).to_not include(:main) + end + end + end describe "#pipeline" do let(:report_method) { :pipeline } diff --git a/logstash-core/spec/support/helpers.rb b/logstash-core/spec/support/helpers.rb index 52f67964d..66dc3364c 100644 --- a/logstash-core/spec/support/helpers.rb +++ b/logstash-core/spec/support/helpers.rb @@ -48,14 +48,65 @@ def mock_settings(settings_values={}) settings end -def make_test_agent(settings=mock_settings) +def make_test_agent(settings=mock_settings, config_source=nil) sl = LogStash::Config::SourceLoader.new - sl.add_source(LogStash::Config::Source::Local.new(settings)) + sl.add_source(config_source || LogStash::Config::Source::Local.new(settings)) sl ::LogStash::Agent.new(settings, sl) end +def make_config_source(settings=mock_settings) + TestPipelineConfigSource.new(settings) +end + +## +# This TestPipelineConfigSource can be added to a LogStash::Config::SourceLoader +# to provide pipeline config strings directly for testing purposes. +class TestPipelineConfigSource + include LogStash::Util::Loggable + + def initialize(settings) + @settings = settings + @pipelines = {} + end + + def add_pipeline(pipeline_id, config_string, settings_overrides={}) + logger.debug("adding pipeline `#{pipeline_id}` from string `#{config_string}` with additional settings `#{settings_overrides}`") + @pipelines[pipeline_id.to_sym] = compose_pipeline_config(pipeline_id, config_string, settings_overrides) + end + + def remove_pipeline(pipeline_id) + logger.debug("removing pipeline `#{pipeline_id}`") + !!@pipelines.delete(pipeline_id.to_sym) + end + + def pipeline_configs + @pipelines.values + end + + def match? + true + end + + def config_conflict? + false + end + + private + def compose_pipeline_config(pipeline_id, config_string, pipeline_settings) + config_parts = [org.logstash.common.SourceWithMetadata.new("string", pipeline_id.to_s, config_string)] + + merged_pipeline_settings = @settings.clone.tap do |s| + s.merge_pipeline_settings('pipeline.id' => pipeline_id) + s.merge_pipeline_settings('config.string' => config_string.dup.freeze) + s.merge_pipeline_settings(pipeline_settings) + end + + org.logstash.config.ir.PipelineConfig.new(self.class, pipeline_id.to_sym, config_parts, merged_pipeline_settings) + end +end + def mock_pipeline(pipeline_id, reloadable = true, config_hash = nil) config_string = "input { stdin { id => '#{pipeline_id}' }}" settings = mock_settings("pipeline.id" => pipeline_id.to_s, diff --git a/logstash-core/spec/support/shared_contexts.rb b/logstash-core/spec/support/shared_contexts.rb index 042869df5..5eb3f16a5 100644 --- a/logstash-core/spec/support/shared_contexts.rb +++ b/logstash-core/spec/support/shared_contexts.rb @@ -15,6 +15,8 @@ # specific language governing permissions and limitations # under the License. +require 'time' + shared_context "execution_context" do let(:pipeline) { double("pipeline") } let(:pipeline_id) { :main } @@ -34,28 +36,42 @@ shared_context "execution_context" do end shared_context "api setup" do + + ## + # blocks until the condition returns true, or the limit has passed + # @return [true] if the condition was met + # @return [false] if the condition was NOT met + def block_until(limit_seconds, &condition) + deadline = Time.now + limit_seconds + loop.with_index do |_,try| + break if Time.now >= deadline + return true if condition.call + + next_sleep = [(2.0**(try))/10, 2, deadline - Time.now].min + Kernel::sleep(next_sleep) unless next_sleep <= 0 + end + # one last try + condition.call + end + before :all do clear_data_dir - settings = mock_settings - config_string = "input { generator {id => 'api-generator-pipeline' count => 100 } } output { dummyoutput {} }" - settings.set("config.string", config_string) - settings.set("config.reload.automatic", false) - @agent = make_test_agent(settings) - @agent.execute - @pipelines_registry = LogStash::PipelinesRegistry.new - pipeline_config = mock_pipeline_config(:main, "input { generator { id => '123' } } output { null {} }") - pipeline_creator = LogStash::PipelineAction::Create.new(pipeline_config, @agent.metric) - expect(pipeline_creator.execute(@agent, @pipelines_registry)).to be_truthy - pipeline_config = mock_pipeline_config(:secondary, "input { generator { id => '123' } } output { null {} }") - pipeline_creator = LogStash::PipelineAction::Create.new(pipeline_config, @agent.metric) - expect(pipeline_creator.execute(@agent, @pipelines_registry)).to be_truthy + settings = mock_settings("config.reload.automatic" => true) + config_source = make_config_source(settings) + config_source.add_pipeline('main', "input { generator {id => 'api-generator-pipeline' count => 100 } } output { dummyoutput {} }") + + @agent = make_test_agent(settings, config_source) + @agent_execution_task = Stud::Task.new { @agent.execute } + block_until(30) { @agent.loaded_pipelines.keys.include?(:main) } or fail('main pipeline did not come up') + + config_source.add_pipeline('main', "input { generator { id => '123' } } output { null {} }") + config_source.add_pipeline('secondary', "input { generator { id => '123' } } output { null {} }") + block_until(30) { ([:main, :secondary] - @agent.running_pipelines.keys).empty? } or fail('pipelines did not come up') end after :all do - @pipelines_registry.running_pipelines.each do |_, pipeline| - pipeline.shutdown - pipeline.thread.join - end + @agent_execution_task.stop! + @agent_execution_task.wait @agent.shutdown end