api: source pipelines that are fully-loaded (#14595) (#14622)

* specs: detangle out-of-band pipeline initialization

Our API tests were initializing their pipelines under test in an out-of-band
manner that prevented the agent from having complete knowledge of the
pipelines that were running. By providing a ConfigSource to our Agent's
SourceLoader, we can rely on the normal pipeline reload behaviour to ensure
that the agent fully manages the pipelines in question, as sketched below.
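
  A condensed, illustrative sketch of the new "api setup" shared context (the
  helper names come from the spec support files changed below; the generator
  configs here are shortened placeholders):

    settings      = mock_settings("config.reload.automatic" => true)
    config_source = make_config_source(settings)            # TestPipelineConfigSource
    config_source.add_pipeline('main', "input { generator {} } output { null {} }")

    @agent = make_test_agent(settings, config_source)        # agent owns the source
    @agent_execution_task = Stud::Task.new { @agent.execute } # converge/reload loop runs

    # later pipeline changes go through the same source and are picked up
    # by the agent's own auto-reload, instead of being created out-of-band:
    config_source.add_pipeline('secondary', "input { generator {} } output { null {} }")
    block_until(30) { @agent.running_pipelines.keys.include?(:secondary) }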

* api: do not emit pipeline that is not fully-initialized
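
  In short: a pipeline whose config metrics have not been registered yet now
  yields an empty node and is simply omitted from the node API's pipelines
  payload, instead of the whole endpoint returning 404. Roughly (see the node
  command diff below):

    def pipelines(options={})
      pipeline_ids.each_with_object({}) do |pipeline_id, result|
        pipeline_node = pipeline(pipeline_id, options)  # {} when MetricNotFound is raised
        result[pipeline_id] = pipeline_node unless pipeline_node.empty?
      end
    end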

(cherry picked from commit de49eba22a)

Co-authored-by: Ry Biesemeyer <yaauie@users.noreply.github.com>
github-actions[bot] 2022-10-11 20:53:40 -07:00 committed by GitHub
parent f6d40866a7
commit b38e374164
7 changed files with 119 additions and 24 deletions


@@ -300,6 +300,10 @@ class LogStash::Agent
     @pipelines_registry.loading_pipelines
   end
 
+  def loaded_pipelines
+    @pipelines_registry.loaded_pipelines
+  end
+
   def non_running_pipelines
     @pipelines_registry.non_running_pipelines
   end


@@ -36,7 +36,8 @@ module LogStash
       def pipelines(options={})
         pipeline_ids = service.get_shallow(:stats, :pipelines).keys
         pipeline_ids.each_with_object({}) do |pipeline_id, result|
-          result[pipeline_id] = pipeline(pipeline_id, options)
+          pipeline_node = pipeline(pipeline_id, options)
+          result[pipeline_id] = pipeline_node unless pipeline_node.empty?
         end
       rescue Instrument::MetricStore::MetricNotFound
         {}
@@ -62,8 +63,8 @@ module LogStash
           metrics.merge!(extended_stats)
         end
         metrics
-      rescue
-        {}
+      rescue LogStash::Instrument::MetricStore::MetricNotFound
+        {} # empty
       end
 
       def os


@@ -56,7 +56,6 @@ module LogStash
         opts = {:graph => as_boolean(params.fetch("graph", false)),
                 :vertices => as_boolean(params.fetch("vertices", false))}
         payload = node.pipelines(opts)
-        halt(404) if payload.empty?
         respond_with(:pipelines => payload )
       end


@@ -279,6 +279,10 @@ module LogStash
       select_pipelines { |state| state.loading? }
     end
 
+    def loaded_pipelines
+      select_pipelines { |state| !state.loading? }
+    end
+
     # @return [Hash{String=>Pipeline}]
     def non_running_pipelines
       select_pipelines { |state| state.terminated? }


@@ -37,8 +37,9 @@ describe LogStash::Api::Commands::Node do
       :explicit_id=>false,
       :type=>"plugin"}
   }
+  let(:api_service) { LogStash::Api::Service.new(@agent) }
   subject(:report) do
-    factory = ::LogStash::Api::CommandFactory.new(LogStash::Api::Service.new(@agent))
+    factory = ::LogStash::Api::CommandFactory.new(api_service)
     if report_method == :pipelines
       factory.build(:node).send(report_method, opts)
     elsif report_method == :pipeline
@@ -64,6 +65,25 @@ describe LogStash::Api::Commands::Node do
     end
   end
 
+  describe "#pipelines" do
+    let(:report_method) { :pipelines }
+    it "contains the running pipelines" do
+      expect(report).to include(:main, :secondary)
+    end
+    context 'when the `main` pipeline throws a MetricNotFound exception' do
+      before(:each) do
+        allow(api_service).to receive(:extract_metrics).and_call_original
+        expect(api_service).to receive(:extract_metrics)
+                                 .with([:stats, :pipelines, :main, :config], any_args)
+                                 .and_raise(LogStash::Instrument::MetricStore::MetricNotFound)
+      end
+      it 'does not contain the partially-constructed pipeline' do
+        expect(report).to include(:secondary)
+        expect(report).to_not include(:main)
+      end
+    end
+  end
+
   describe "#pipeline" do
     let(:report_method) { :pipeline }


@@ -48,14 +48,65 @@ def mock_settings(settings_values={})
   settings
 end
 
-def make_test_agent(settings=mock_settings)
+def make_test_agent(settings=mock_settings, config_source=nil)
   sl = LogStash::Config::SourceLoader.new
-  sl.add_source(LogStash::Config::Source::Local.new(settings))
+  sl.add_source(config_source || LogStash::Config::Source::Local.new(settings))
   sl
 
   ::LogStash::Agent.new(settings, sl)
 end
 
+def make_config_source(settings=mock_settings)
+  TestPipelineConfigSource.new(settings)
+end
+
+##
+# This TestPipelineConfigSource can be added to a LogStash::Config::SourceLoader
+# to provide pipeline config strings directly for testing purposes.
+class TestPipelineConfigSource
+  include LogStash::Util::Loggable
+
+  def initialize(settings)
+    @settings = settings
+    @pipelines = {}
+  end
+
+  def add_pipeline(pipeline_id, config_string, settings_overrides={})
+    logger.debug("adding pipeline `#{pipeline_id}` from string `#{config_string}` with additional settings `#{settings_overrides}`")
+    @pipelines[pipeline_id.to_sym] = compose_pipeline_config(pipeline_id, config_string, settings_overrides)
+  end
+
+  def remove_pipeline(pipeline_id)
+    logger.debug("removing pipeline `#{pipeline_id}`")
+    !!@pipelines.delete(pipeline_id.to_sym)
+  end
+
+  def pipeline_configs
+    @pipelines.values
+  end
+
+  def match?
+    true
+  end
+
+  def config_conflict?
+    false
+  end
+
+  private
+
+  def compose_pipeline_config(pipeline_id, config_string, pipeline_settings)
+    config_parts = [org.logstash.common.SourceWithMetadata.new("string", pipeline_id.to_s, config_string)]
+
+    merged_pipeline_settings = @settings.clone.tap do |s|
+      s.merge_pipeline_settings('pipeline.id' => pipeline_id)
+      s.merge_pipeline_settings('config.string' => config_string.dup.freeze)
+      s.merge_pipeline_settings(pipeline_settings)
+    end
+
+    org.logstash.config.ir.PipelineConfig.new(self.class, pipeline_id.to_sym, config_parts, merged_pipeline_settings)
+  end
+end
+
 def mock_pipeline(pipeline_id, reloadable = true, config_hash = nil)
   config_string = "input { stdin { id => '#{pipeline_id}' }}"
   settings = mock_settings("pipeline.id" => pipeline_id.to_s,


@@ -15,6 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
+require 'time'
+
 shared_context "execution_context" do
   let(:pipeline) { double("pipeline") }
   let(:pipeline_id) { :main }
@@ -34,28 +36,42 @@ shared_context "execution_context" do
 end
 
 shared_context "api setup" do
+  ##
+  # blocks until the condition returns true, or the limit has passed
+  # @return [true] if the condition was met
+  # @return [false] if the condition was NOT met
+  def block_until(limit_seconds, &condition)
+    deadline = Time.now + limit_seconds
+    loop.with_index do |_, try|
+      break if Time.now >= deadline
+      return true if condition.call
+      next_sleep = [(2.0**(try))/10, 2, deadline - Time.now].min
+      Kernel::sleep(next_sleep) unless next_sleep <= 0
+    end
+    # one last try
+    condition.call
+  end
+
   before :all do
     clear_data_dir
-    settings = mock_settings
-    config_string = "input { generator {id => 'api-generator-pipeline' count => 100 } } output { dummyoutput {} }"
-    settings.set("config.string", config_string)
-    settings.set("config.reload.automatic", false)
-    @agent = make_test_agent(settings)
-    @agent.execute
-    @pipelines_registry = LogStash::PipelinesRegistry.new
-    pipeline_config = mock_pipeline_config(:main, "input { generator { id => '123' } } output { null {} }")
-    pipeline_creator = LogStash::PipelineAction::Create.new(pipeline_config, @agent.metric)
-    expect(pipeline_creator.execute(@agent, @pipelines_registry)).to be_truthy
-    pipeline_config = mock_pipeline_config(:secondary, "input { generator { id => '123' } } output { null {} }")
-    pipeline_creator = LogStash::PipelineAction::Create.new(pipeline_config, @agent.metric)
-    expect(pipeline_creator.execute(@agent, @pipelines_registry)).to be_truthy
+    settings = mock_settings("config.reload.automatic" => true)
+    config_source = make_config_source(settings)
+    config_source.add_pipeline('main', "input { generator {id => 'api-generator-pipeline' count => 100 } } output { dummyoutput {} }")
+
+    @agent = make_test_agent(settings, config_source)
+    @agent_execution_task = Stud::Task.new { @agent.execute }
+    block_until(30) { @agent.loaded_pipelines.keys.include?(:main) } or fail('main pipeline did not come up')
+
+    config_source.add_pipeline('main', "input { generator { id => '123' } } output { null {} }")
+    config_source.add_pipeline('secondary', "input { generator { id => '123' } } output { null {} }")
+    block_until(30) { ([:main, :secondary] - @agent.running_pipelines.keys).empty? } or fail('pipelines did not come up')
   end
 
   after :all do
-    @pipelines_registry.running_pipelines.each do |_, pipeline|
-      pipeline.shutdown
-      pipeline.thread.join
-    end
+    @agent_execution_task.stop!
+    @agent_execution_task.wait
     @agent.shutdown
   end