From c6795731f15f884faeaf67ec2984d808d7e44e18 Mon Sep 17 00:00:00 2001 From: andsel Date: Tue, 14 Apr 2020 18:01:44 +0200 Subject: [PATCH] Backport to 7.x of PR #11824 Refactor: move PipelineConfig from Ruby to Java Reimplement the Ruby class PipelinceConfig in Java trying to keep the method signatures to limit the changes in client code, this is a step of other that intend to move all the configuration code in Java language. Having all that code in Java unlock some reasoning about how to better implement it and probably an improvement in performance during process startup. Moved also the spec into a JUnit and fixed here and there the failing tests Closes: #11824 --- logstash-core/build.gradle | 2 + logstash-core/lib/logstash/agent.rb | 1 + .../lib/logstash/config/pipeline_config.rb | 77 +------ .../lib/logstash/config/source/local.rb | 2 +- .../lib/logstash/config/source/modules.rb | 3 +- .../lib/logstash/config/source_loader.rb | 1 - .../lib/logstash/pipeline_action/create.rb | 2 +- .../lib/logstash/pipeline_action/reload.rb | 2 +- .../lib/logstash/pipelines_registry.rb | 2 +- logstash-core/lib/logstash/state_resolver.rb | 2 +- logstash-core/spec/logstash/agent_spec.rb | 3 +- .../logstash/config/pipeline_config_spec.rb | 166 --------------- .../logstash/config/source_loader_spec.rb | 2 +- logstash-core/spec/logstash/pipeline_spec.rb | 1 + .../spec/logstash/pipelines_registry_spec.rb | 4 +- .../spec/logstash/state_resolver_spec.rb | 1 - logstash-core/spec/support/helpers.rb | 2 +- logstash-core/spec/support/matchers.rb | 1 - .../spec/support/pipeline/pipeline_helpers.rb | 2 +- .../logstash/config/ir/ConfigCompiler.java | 4 +- .../logstash/config/ir/PipelineConfig.java | 177 ++++++++++++++++ .../execution/AbstractPipelineExt.java | 12 +- .../config/ir/ConfigCompilerTest.java | 14 +- .../config/ir/PipelineConfigTest.java | 196 ++++++++++++++++++ .../plugins/PluginFactoryExtTest.java | 9 +- .../config_management/elasticsearch_source.rb | 3 +- x-pack/lib/monitoring/monitoring.rb | 3 +- .../elasticsearch_source_spec.rb | 8 +- .../modules/azure/filters/azure_event_spec.rb | 2 + .../metrics/state_event_factory_spec.rb | 2 +- .../metrics/stats_event_factory_spec.rb | 1 + .../internal_pipeline_source_spec.rb | 3 +- 32 files changed, 415 insertions(+), 295 deletions(-) delete mode 100644 logstash-core/spec/logstash/config/pipeline_config_spec.rb create mode 100644 logstash-core/src/main/java/org/logstash/config/ir/PipelineConfig.java create mode 100644 logstash-core/src/test/java/org/logstash/config/ir/PipelineConfigTest.java diff --git a/logstash-core/build.gradle b/logstash-core/build.gradle index f01e2271d..4d308557b 100644 --- a/logstash-core/build.gradle +++ b/logstash-core/build.gradle @@ -92,6 +92,7 @@ tasks.register("javaTests", Test) { exclude '/org/logstash/config/ir/ConfigCompilerTest.class' exclude '/org/logstash/config/ir/CompiledPipelineTest.class' exclude '/org/logstash/config/ir/EventConditionTest.class' + exclude '/org/logstash/config/ir/PipelineConfigTest.class' exclude '/org/logstash/config/ir/compiler/OutputDelegatorTest.class' exclude '/org/logstash/config/ir/compiler/JavaCodecDelegatorTest.class' exclude '/org/logstash/plugins/NamespacedMetricImplTest.class' @@ -107,6 +108,7 @@ tasks.register("rubyTests", Test) { include '/org/logstash/config/ir/ConfigCompilerTest.class' include '/org/logstash/config/ir/CompiledPipelineTest.class' include '/org/logstash/config/ir/EventConditionTest.class' + include '/org/logstash/config/ir/PipelineConfigTest.class' include '/org/logstash/config/ir/compiler/OutputDelegatorTest.class' include '/org/logstash/config/ir/compiler/JavaCodecDelegatorTest.class' include '/org/logstash/plugins/NamespacedMetricImplTest.class' diff --git a/logstash-core/lib/logstash/agent.rb b/logstash-core/lib/logstash/agent.rb index 3ecd47e16..d8725146f 100644 --- a/logstash-core/lib/logstash/agent.rb +++ b/logstash-core/lib/logstash/agent.rb @@ -22,6 +22,7 @@ require "logstash/instrument/periodic_pollers" require "logstash/pipeline" require "logstash/webserver" require "logstash/config/source_loader" +require "logstash/config/pipeline_config" require "logstash/pipeline_action" require "logstash/state_resolver" require "logstash/pipelines_registry" diff --git a/logstash-core/lib/logstash/config/pipeline_config.rb b/logstash-core/lib/logstash/config/pipeline_config.rb index 66ca1aa4a..32194bf89 100644 --- a/logstash-core/lib/logstash/config/pipeline_config.rb +++ b/logstash-core/lib/logstash/config/pipeline_config.rb @@ -15,77 +15,6 @@ # specific language governing permissions and limitations # under the License. -require "digest" - -module LogStash module Config - class PipelineConfig - include LogStash::Util::Loggable - - LineToSource = Struct.new("LineToSource", :bounds, :source) - - attr_reader :source, :pipeline_id, :config_parts, :settings, :read_at - - def initialize(source, pipeline_id, config_parts, settings) - @source = source - @pipeline_id = pipeline_id - # We can't use Array() since config_parts may be a java object! - config_parts_array = config_parts.is_a?(Array) ? config_parts : [config_parts] - @config_parts = config_parts_array.sort_by { |config_part| [config_part.protocol.to_s, config_part.id] } - @settings = settings - @read_at = Time.now - end - - def config_hash - @config_hash ||= Digest::SHA1.hexdigest(config_string) - end - - def config_string - @config_string = config_parts.collect(&:text).join("\n") - end - - def system? - @settings.get("pipeline.system") - end - - def ==(other) - config_hash == other.config_hash && pipeline_id == other.pipeline_id && settings == other.settings - end - - def display_debug_information - logger.debug("-------- Logstash Config ---------") - logger.debug("Config from source", :source => source, :pipeline_id => pipeline_id) - - config_parts.each do |config_part| - logger.debug("Config string", :protocol => config_part.protocol, :id => config_part.id) - logger.debug("\n\n#{config_part.text}") - end - logger.debug("Merged config") - logger.debug("\n\n#{config_string}") - end - - def lookup_source(global_line_number, source_column) - res = source_references.find { |line_to_source| line_to_source.bounds.include? global_line_number } - if res == nil - raise IndexError, "can't find the config segment related to line #{global_line_number}" - end - swm = res.source - SourceWithMetadata.new(swm.getProtocol(), swm.getId(), global_line_number + 1 - res.bounds.begin, source_column, swm.getText()) - end - - private - def source_references - @source_refs ||= begin - offset = 0 - source_refs = [] - config_parts.each do |config_part| - #line numbers starts from 1 in text files - lines_range = (config_part.getLine() + offset + 1..config_part.getLinesCount() + offset) - source_segment = LineToSource.new(lines_range, config_part) - source_refs << source_segment - offset += config_part.getLinesCount() - end - source_refs.freeze - end - end - end -end end +module LogStash::Config + java_import org.logstash.config.ir.PipelineConfig +end \ No newline at end of file diff --git a/logstash-core/lib/logstash/config/source/local.rb b/logstash-core/lib/logstash/config/source/local.rb index d23ae69de..dc43ba287 100644 --- a/logstash-core/lib/logstash/config/source/local.rb +++ b/logstash-core/lib/logstash/config/source/local.rb @@ -212,7 +212,7 @@ module LogStash module Config module Source return [] if config_parts.empty? - [PipelineConfig.new(self.class, @settings.get("pipeline.id").to_sym, config_parts, @settings)] + [org.logstash.config.ir.PipelineConfig.new(self.class, @settings.get("pipeline.id").to_sym, config_parts, @settings)] end def automatic_reload_with_config_string? diff --git a/logstash-core/lib/logstash/config/source/modules.rb b/logstash-core/lib/logstash/config/source/modules.rb index 9bddbd977..47794465c 100644 --- a/logstash-core/lib/logstash/config/source/modules.rb +++ b/logstash-core/lib/logstash/config/source/modules.rb @@ -17,7 +17,6 @@ require "logstash/config/source/base" require "logstash/config/modules_common" -require "logstash/config/pipeline_config" module LogStash module Config module Source class Modules < Base @@ -29,7 +28,7 @@ module LogStash module Config module Source pipelines = LogStash::Config::ModulesCommon.pipeline_configs(@settings) pipelines.map do |hash| - PipelineConfig.new(self, hash["pipeline_id"].to_sym, + org.logstash.config.ir.PipelineConfig.new(self.class, hash["pipeline_id"].to_sym, org.logstash.common.SourceWithMetadata.new("module", hash["alt_name"], 0, 0, hash["config_string"]), hash["settings"]) end diff --git a/logstash-core/lib/logstash/config/source_loader.rb b/logstash-core/lib/logstash/config/source_loader.rb index 11959eb2a..d7c6d07a3 100644 --- a/logstash-core/lib/logstash/config/source_loader.rb +++ b/logstash-core/lib/logstash/config/source_loader.rb @@ -63,7 +63,6 @@ module LogStash module Config sources do |source| sources_loaders << source if source.match? end - if sources_loaders.empty? # This shouldn't happen with the settings object or with any external plugins. # but lets add a guard so we fail fast. diff --git a/logstash-core/lib/logstash/pipeline_action/create.rb b/logstash-core/lib/logstash/pipeline_action/create.rb index 55c8c6ecf..3a3531e15 100644 --- a/logstash-core/lib/logstash/pipeline_action/create.rb +++ b/logstash-core/lib/logstash/pipeline_action/create.rb @@ -34,7 +34,7 @@ module LogStash module PipelineAction end def pipeline_id - @pipeline_config.pipeline_id + @pipeline_config.pipeline_id.to_sym end # Make sure we execution system pipeline like the monitoring diff --git a/logstash-core/lib/logstash/pipeline_action/reload.rb b/logstash-core/lib/logstash/pipeline_action/reload.rb index 78001e827..4dc9bc77c 100644 --- a/logstash-core/lib/logstash/pipeline_action/reload.rb +++ b/logstash-core/lib/logstash/pipeline_action/reload.rb @@ -29,7 +29,7 @@ module LogStash module PipelineAction end def pipeline_id - @pipeline_config.pipeline_id + @pipeline_config.pipeline_id.to_sym end def to_s diff --git a/logstash-core/lib/logstash/pipelines_registry.rb b/logstash-core/lib/logstash/pipelines_registry.rb index 433a397e6..cb9e37527 100644 --- a/logstash-core/lib/logstash/pipelines_registry.rb +++ b/logstash-core/lib/logstash/pipelines_registry.rb @@ -142,7 +142,7 @@ module LogStash # @param pipeline_id [String, Symbol] the pipeline id # @return [Pipeline] the pipeline object or nil if none for pipeline_id def get_pipeline(pipeline_id) - state = @states.get(pipeline_id) + state = @states.get(pipeline_id.to_sym) state.nil? ? nil : state.pipeline end diff --git a/logstash-core/lib/logstash/state_resolver.rb b/logstash-core/lib/logstash/state_resolver.rb index 72e177978..386921b1f 100644 --- a/logstash-core/lib/logstash/state_resolver.rb +++ b/logstash-core/lib/logstash/state_resolver.rb @@ -41,7 +41,7 @@ module LogStash end end - configured_pipelines = pipeline_configs.collect(&:pipeline_id) + configured_pipelines = pipeline_configs.map { |config| config.pipeline_id.to_sym } # If one of the running pipeline is not in the pipeline_configs, we assume that we need to # stop it. diff --git a/logstash-core/spec/logstash/agent_spec.rb b/logstash-core/spec/logstash/agent_spec.rb index d69c8671b..f49cb7c5a 100644 --- a/logstash-core/spec/logstash/agent_spec.rb +++ b/logstash-core/spec/logstash/agent_spec.rb @@ -18,7 +18,6 @@ require "spec_helper" require "stud/temporary" require "logstash/inputs/generator" -require "logstash/config/pipeline_config" require "logstash/config/source/local" require_relative "../support/mocks_classes" require "fileutils" @@ -85,7 +84,7 @@ describe LogStash::Agent do it "should delegate settings to new pipeline" do expect(LogStash::JavaPipeline).to receive(:new) do |arg1, arg2| - expect(arg1).to eq(config_string) + expect(arg1.to_s).to eq(config_string) expect(arg2.to_hash).to include(agent_args) end subject.converge_state_and_update diff --git a/logstash-core/spec/logstash/config/pipeline_config_spec.rb b/logstash-core/spec/logstash/config/pipeline_config_spec.rb deleted file mode 100644 index a7011b372..000000000 --- a/logstash-core/spec/logstash/config/pipeline_config_spec.rb +++ /dev/null @@ -1,166 +0,0 @@ -# Licensed to Elasticsearch B.V. under one or more contributor -# license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright -# ownership. Elasticsearch B.V. licenses this file to you under -# the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -require "logstash/config/pipeline_config" -require "logstash/config/source/local" - -describe LogStash::Config::PipelineConfig do - let(:source) { LogStash::Config::Source::Local } - let(:pipeline_id) { :main } - let(:ordered_config_parts) do - [ - org.logstash.common.SourceWithMetadata.new("file", "/tmp/1", 0, 0, "input { generator1 }"), - org.logstash.common.SourceWithMetadata.new("file", "/tmp/2", 0, 0, "input { generator2 }"), - org.logstash.common.SourceWithMetadata.new("file", "/tmp/3", 0, 0, "input { generator3 }"), - org.logstash.common.SourceWithMetadata.new("file", "/tmp/4", 0, 0, "input { generator4 }"), - org.logstash.common.SourceWithMetadata.new("file", "/tmp/5", 0, 0, "input { generator5 }"), - org.logstash.common.SourceWithMetadata.new("file", "/tmp/6", 0, 0, "input { generator6 }"), - org.logstash.common.SourceWithMetadata.new("string", "config_string", 0, 0, "input { generator1 }"), - ] - end - - let(:unordered_config_parts) { ordered_config_parts.shuffle } - let(:settings) { LogStash::SETTINGS } - - subject { described_class.new(source, pipeline_id, unordered_config_parts, settings) } - - it "returns the source" do - expect(subject.source).to eq(source) - end - - it "returns the pipeline id" do - expect(subject.pipeline_id).to eq(pipeline_id) - end - - it "returns the sorted config parts" do - expect(subject.config_parts).to eq(ordered_config_parts) - end - - it "returns the config_hash" do - expect(subject.config_hash).not_to be_nil - end - - it "returns the merged `ConfigPart#config_string`" do - expect(subject.config_string).to eq(ordered_config_parts.collect(&:text).join("\n")) - end - - it "records when the config was read" do - expect(subject.read_at).to be <= Time.now - end - - it "does object equality on config_hash and pipeline_id" do - another_exact_pipeline = described_class.new(source, pipeline_id, ordered_config_parts, settings) - expect(subject).to eq(another_exact_pipeline) - - not_matching_pipeline = described_class.new(source, pipeline_id, [], settings) - expect(subject).not_to eq(not_matching_pipeline) - - not_same_pipeline_id = described_class.new(source, :another_pipeline, unordered_config_parts, settings) - expect(subject).not_to eq(not_same_pipeline_id) - end - - describe "#system?" do - context "when the pipeline is a system pipeline" do - let(:settings) { mock_settings({ "pipeline.system" => true })} - - it "returns true if the pipeline is a system pipeline" do - expect(subject.system?).to be_truthy - end - end - - context "when is not a system pipeline" do - it "returns false if the pipeline is not a system pipeline" do - expect(subject.system?).to be_falsey - end - end - end - - describe "source and line remapping" do - context "when pipeline is constructed from single file single line" do - let (:pipeline_conf_string) { 'input { generator1 }' } - subject { described_class.new(source, pipeline_id, [org.logstash.common.SourceWithMetadata.new("file", "/tmp/1", 0, 0, pipeline_conf_string)], settings) } - it "return the same line of the queried" do - expect(subject.lookup_source(1, 0).getLine()).to eq(1) - end - end - - context "when pipeline is constructed from single file" do - let (:pipeline_conf_string) { 'input { - generator1 - }' } - subject { described_class.new(source, pipeline_id, [org.logstash.common.SourceWithMetadata.new("file", "/tmp/1", 0, 0, pipeline_conf_string)], settings) } - - it "return the same line of the queried" do - expect(subject.lookup_source(1, 0).getLine()).to eq(1) - expect(subject.lookup_source(2, 0).getLine()).to eq(2) - end - - it "throw exception if line is out of bound" do - expect { subject.lookup_source(100, -1) }.to raise_exception(IndexError) - end - end - - context "when pipeline is constructed from multiple files" do - let (:pipeline_conf_string_part1) { 'input { - generator1 - }' } - let (:pipeline_conf_string_part2) { 'output { - stdout - }' } - let(:merged_config_parts) do - [ - org.logstash.common.SourceWithMetadata.new("file", "/tmp/input", 0, 0, pipeline_conf_string_part1), - org.logstash.common.SourceWithMetadata.new("file", "/tmp/output", 0, 0, pipeline_conf_string_part2) - ] - end - subject { described_class.new(source, pipeline_id, merged_config_parts, settings) } - - it "return the line of first segment" do - expect(subject.lookup_source(2, 0).getLine()).to eq(2) - expect(subject.lookup_source(2, 0).getId()).to eq("/tmp/input") - end - - it "return the line of second segment" do - expect(subject.lookup_source(4, 0).getLine()).to eq(1) - expect(subject.lookup_source(4, 0).getId()).to eq("/tmp/output") - end - - it "throw exception if line is out of bound" do - expect { subject.lookup_source(100, 0) }.to raise_exception(IndexError) - end - end - - context "when pipeline is constructed from multiple files and the first has trailing newline" do - let (:pipeline_conf_string_part1) { "input {\n generator1\n}\n" } - let (:pipeline_conf_string_part2) { 'output { - stdout - }' } - let(:merged_config_parts) do - [ - org.logstash.common.SourceWithMetadata.new("file", "/tmp/input", 0, 0, pipeline_conf_string_part1), - org.logstash.common.SourceWithMetadata.new("file", "/tmp/output", 0, 0, pipeline_conf_string_part2) - ] - end - subject { described_class.new(source, pipeline_id, merged_config_parts, settings) } - - it "shouldn't slide the mapping of subsequent" do - expect(subject.lookup_source(4, 0).getLine()).to eq(1) - expect(subject.lookup_source(4, 0).getId()).to eq("/tmp/output") - end - end - end -end diff --git a/logstash-core/spec/logstash/config/source_loader_spec.rb b/logstash-core/spec/logstash/config/source_loader_spec.rb index 309900ee8..87d812d9f 100644 --- a/logstash-core/spec/logstash/config/source_loader_spec.rb +++ b/logstash-core/spec/logstash/config/source_loader_spec.rb @@ -21,7 +21,7 @@ require_relative "../../support/helpers" def temporary_pipeline_config(id, source, reader = "random_reader") config_part = org.logstash.common.SourceWithMetadata.new("local", "...", 0, 0, "input {} output {}") - LogStash::Config::PipelineConfig.new(source, id, [config_part], LogStash::SETTINGS) + org.logstash.config.ir.PipelineConfig.new(source, id.to_sym, [config_part], LogStash::SETTINGS) end class DummySource < LogStash::Config::Source::Base diff --git a/logstash-core/spec/logstash/pipeline_spec.rb b/logstash-core/spec/logstash/pipeline_spec.rb index cf21e1136..d1058e762 100644 --- a/logstash-core/spec/logstash/pipeline_spec.rb +++ b/logstash-core/spec/logstash/pipeline_spec.rb @@ -22,6 +22,7 @@ require_relative "../support/mocks_classes" require_relative "../support/helpers" require "stud/try" require 'timeout' +require 'logstash/config/pipeline_config' class DummyInput < LogStash::Inputs::Base config_name "dummyinput" diff --git a/logstash-core/spec/logstash/pipelines_registry_spec.rb b/logstash-core/spec/logstash/pipelines_registry_spec.rb index 7a7183544..5a36b6332 100644 --- a/logstash-core/spec/logstash/pipelines_registry_spec.rb +++ b/logstash-core/spec/logstash/pipelines_registry_spec.rb @@ -20,9 +20,9 @@ require "logstash/pipelines_registry" describe LogStash::PipelinesRegistry do - let(:pipeline_id) { "test" } + let(:pipeline_id) { "test".to_sym } let(:pipeline) { double("Pipeline") } - let (:logger) { double("Logger") } + let(:logger) { double("Logger") } context "at object creation" do it "should be empty" do diff --git a/logstash-core/spec/logstash/state_resolver_spec.rb b/logstash-core/spec/logstash/state_resolver_spec.rb index 0d301fb16..8775bc3ac 100644 --- a/logstash-core/spec/logstash/state_resolver_spec.rb +++ b/logstash-core/spec/logstash/state_resolver_spec.rb @@ -19,7 +19,6 @@ require "spec_helper" require_relative "../support/helpers" require_relative "../support/matchers" require "logstash/state_resolver" -require "logstash/config/pipeline_config" require "logstash/pipeline" require "ostruct" require "digest" diff --git a/logstash-core/spec/support/helpers.rb b/logstash-core/spec/support/helpers.rb index 9d0e77c9d..52f67964d 100644 --- a/logstash-core/spec/support/helpers.rb +++ b/logstash-core/spec/support/helpers.rb @@ -86,7 +86,7 @@ def mock_pipeline_config(pipeline_id, config_string = nil, settings = {}) config_part = org.logstash.common.SourceWithMetadata.new("config_string", "config_string", 0, 0, config_string) - LogStash::Config::PipelineConfig.new(LogStash::Config::Source::Local, pipeline_id, config_part, settings) + org.logstash.config.ir.PipelineConfig.new(LogStash::Config::Source::Local, pipeline_id.to_sym, [config_part], settings) end def start_agent(agent) diff --git a/logstash-core/spec/support/matchers.rb b/logstash-core/spec/support/matchers.rb index b87f08c17..71a37cd00 100644 --- a/logstash-core/spec/support/matchers.rb +++ b/logstash-core/spec/support/matchers.rb @@ -17,7 +17,6 @@ require "rspec" require "rspec/expectations" -require "logstash/config/pipeline_config" require "stud/try" RSpec::Matchers.define :be_a_metric_event do |namespace, type, *args| diff --git a/logstash-core/spec/support/pipeline/pipeline_helpers.rb b/logstash-core/spec/support/pipeline/pipeline_helpers.rb index 0fccded8a..d6ccbddf0 100644 --- a/logstash-core/spec/support/pipeline/pipeline_helpers.rb +++ b/logstash-core/spec/support/pipeline/pipeline_helpers.rb @@ -76,7 +76,7 @@ module PipelineHelpers let(:pipeline) do settings.set_value("queue.drain", true) LogStash::JavaPipeline.new( - LogStash::Config::PipelineConfig.new( + org.logstash.config.ir.PipelineConfig.new( LogStash::Config::Source::Local, :main, SourceWithMetadata.new( "config_string", "config_string", diff --git a/logstash-core/src/main/java/org/logstash/config/ir/ConfigCompiler.java b/logstash-core/src/main/java/org/logstash/config/ir/ConfigCompiler.java index de7e3797e..f94a3ca47 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/ConfigCompiler.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/ConfigCompiler.java @@ -53,9 +53,9 @@ public final class ConfigCompiler { * @throws InvalidIRException if the the configuration contains errors */ @SuppressWarnings("unchecked") - public static PipelineIR configToPipelineIR(final @SuppressWarnings("rawtypes") RubyArray sourcesWithMetadata, + public static PipelineIR configToPipelineIR(final List sourcesWithMetadata, final boolean supportEscapes) throws InvalidIRException { - return compileSources((List) sourcesWithMetadata, supportEscapes); + return compileSources(sourcesWithMetadata, supportEscapes); } public static PipelineIR compileSources(List sourcesWithMetadata, boolean supportEscapes) throws InvalidIRException { diff --git a/logstash-core/src/main/java/org/logstash/config/ir/PipelineConfig.java b/logstash-core/src/main/java/org/logstash/config/ir/PipelineConfig.java new file mode 100644 index 000000000..c5ab1b00d --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/config/ir/PipelineConfig.java @@ -0,0 +1,177 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.logstash.config.ir; + +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.jruby.*; +import org.jruby.runtime.builtin.IRubyObject; +import org.logstash.common.IncompleteSourceWithMetadataException; +import org.logstash.common.SourceWithMetadata; + +import java.time.LocalDateTime; +import java.util.*; +import java.util.stream.Collectors; + +import static org.logstash.RubyUtil.RUBY; + +public final class PipelineConfig { + + private static class LineToSource { + private final int startLine; + private final int endLine; + private final SourceWithMetadata source; + + LineToSource(int startLine, int endLine, SourceWithMetadata source) { + this.startLine = startLine; + this.endLine = endLine; + this.source = source; + } + + boolean includeLine(int lineNumber) { + return startLine <= lineNumber && lineNumber <= endLine; + } + } + + private static final Logger logger = LogManager.getLogger(PipelineConfig.class); + + private RubyClass source; + private String pipelineId; + private List confParts; + private RubyObject settings; + private LocalDateTime readAt; + private String configHash; + private String configString; + private List sourceRefs; + + @SuppressWarnings({"rawtypes", "unchecked"}) + public PipelineConfig(RubyClass source, RubySymbol pipelineId, RubyObject uncastedConfigParts, RubyObject logstashSettings) { + IRubyObject uncasted = uncastedConfigParts.checkArrayType(); + final RubyArray configParts = !uncasted.isNil() ? + (RubyArray) uncasted : + RubyArray.newArray(RUBY, uncastedConfigParts); + + this.source = source; + this.pipelineId = pipelineId.toString(); + SourceWithMetadata[] castedConfigParts = (SourceWithMetadata[]) configParts.toJava(SourceWithMetadata[].class); + List confParts = Arrays.asList(castedConfigParts); + confParts.sort(Comparator.comparing(SourceWithMetadata::getProtocol) + .thenComparing(SourceWithMetadata::getId)); + this.confParts = confParts; + this.settings = logstashSettings; + this.readAt = LocalDateTime.now(); + } + + public RubyClass getSource() { + return source; + } + + public String getPipelineId() { + return pipelineId; + } + + public List getConfigParts() { + return confParts; + } + + public LocalDateTime getReadAt() { + return readAt; + } + + public RubyObject getSettings() { + return settings; + } + + public String configHash() { + if (configHash == null) { + configHash = DigestUtils.sha1Hex(configString()); + } + return configHash; + } + + public String configString() { + this.configString = confParts.stream().map(SourceWithMetadata::getText).collect(Collectors.joining("\n")); + return this.configString; + } + + public boolean isSystem() { + return this.settings.callMethod(RUBY.getCurrentContext(), "get_value", + RubyString.newString(RUBY, "pipeline.system")) + .isTrue(); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof PipelineConfig)) { + return false; + } + PipelineConfig cother = (PipelineConfig) other; + return configHash().equals(cother.configHash()) && + this.pipelineId.equals(cother.pipelineId) && + this.settings.eql(cother.settings); + } + + @Override + public int hashCode() { + return this.configHash().hashCode(); + } + + public void displayDebugInformation() { + logger.debug("-------- Logstash Config ---------"); + logger.debug("Config from source, source: {}, pipeline_id:: {}", source, pipelineId); + + for (SourceWithMetadata configPart : this.confParts) { + logger.debug("Config string, protocol: {}, id: {}", configPart.getProtocol(), configPart.getId()); + logger.debug("\n\n{}", configPart.getText()); + } + logger.debug("Merged config"); + logger.debug("\n\n{}", this.configString()); + } + + public SourceWithMetadata lookupSource(int globalLineNumber, int sourceColumn) + throws IncompleteSourceWithMetadataException { + LineToSource lts = this.sourceReferences().stream() + .filter(lts1 -> lts1.includeLine(globalLineNumber)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("can't find the config segment related to line " + globalLineNumber)); + return new SourceWithMetadata(lts.source.getProtocol(), lts.source.getId(), + globalLineNumber + 1 - lts.startLine, sourceColumn, lts.source.getText()); + } + + private List sourceReferences() { + if (this.sourceRefs == null) { + int offset = 0; + List sourceRefs = new ArrayList<>(); + + for (SourceWithMetadata configPart : confParts) { + //line numbers starts from 1 in text files + int startLine = configPart.getLine() + offset + 1; + int endLine = configPart.getLinesCount() + offset; + LineToSource sourceSegment = new LineToSource(startLine, endLine, configPart); + sourceRefs.add(sourceSegment); + offset += configPart.getLinesCount(); + } + this.sourceRefs = Collections.unmodifiableList(sourceRefs); + } + return this.sourceRefs; + } + +} diff --git a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java index 49ce69a52..148378466 100644 --- a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java @@ -50,10 +50,10 @@ import org.logstash.ackedqueue.QueueFactoryExt; import org.logstash.ackedqueue.ext.JRubyAckedQueueExt; import org.logstash.ackedqueue.ext.JRubyWrappedAckedQueueExt; import org.logstash.common.DeadLetterQueueFactory; -import org.logstash.common.IncompleteSourceWithMetadataException; import org.logstash.common.SourceWithMetadata; import org.logstash.config.ir.ConfigCompiler; import org.logstash.config.ir.InvalidIRException; +import org.logstash.config.ir.PipelineConfig; import org.logstash.config.ir.PipelineIR; import org.logstash.ext.JRubyAbstractQueueWriteClientExt; import org.logstash.ext.JRubyWrappedWriteClientExt; @@ -115,7 +115,7 @@ public class AbstractPipelineExt extends RubyBasicObject { private RubyString configString; @SuppressWarnings("rawtypes") - private RubyArray configParts; + private List configParts; private RubyString configHash; @@ -151,7 +151,7 @@ public class AbstractPipelineExt extends RubyBasicObject { ); pipelineSettings = pipelineConfig; configString = (RubyString) pipelineSettings.callMethod(context, "config_string"); - configParts = (RubyArray) pipelineSettings.callMethod(context, "config_parts"); + configParts = pipelineSettings.toJava(PipelineConfig.class).getConfigParts(); configHash = context.runtime.newString( Hex.encodeHexString( MessageDigest.getInstance("SHA1").digest(configString.getBytes()) @@ -397,10 +397,8 @@ public class AbstractPipelineExt extends RubyBasicObject { @JRubyMethod(name = "pipeline_source_details", visibility = Visibility.PROTECTED) @SuppressWarnings("rawtypes") public RubyArray getPipelineSourceDetails(final ThreadContext context) { - RubyArray res = configParts; - List pipelineSources = new ArrayList<>(res.size()); - for (IRubyObject part : res.toJavaArray()) { - SourceWithMetadata sourceWithMetadata = part.toJava(SourceWithMetadata.class); + List pipelineSources = new ArrayList<>(configParts.size()); + for (SourceWithMetadata sourceWithMetadata : configParts) { String protocol = sourceWithMetadata.getProtocol(); switch (protocol) { case "string": diff --git a/logstash-core/src/test/java/org/logstash/config/ir/ConfigCompilerTest.java b/logstash-core/src/test/java/org/logstash/config/ir/ConfigCompilerTest.java index e803bfbec..379a67d0e 100644 --- a/logstash-core/src/test/java/org/logstash/config/ir/ConfigCompilerTest.java +++ b/logstash-core/src/test/java/org/logstash/config/ir/ConfigCompilerTest.java @@ -30,11 +30,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import org.jruby.javasupport.JavaUtil; -import org.jruby.runtime.builtin.IRubyObject; import org.junit.Test; -import org.logstash.RubyUtil; -import org.logstash.common.IncompleteSourceWithMetadataException; import org.logstash.common.SourceWithMetadata; import org.logstash.config.ir.graph.Graph; import org.logstash.config.ir.graph.PluginVertex; @@ -47,10 +43,9 @@ public class ConfigCompilerTest extends RubyEnvTestCase { @Test public void testConfigToPipelineIR() throws Exception { - IRubyObject swm = JavaUtil.convertJavaToRuby( - RubyUtil.RUBY, new SourceWithMetadata("proto", "path", 1, 1, "input {stdin{}} output{stdout{}}")); + SourceWithMetadata swm = new SourceWithMetadata("proto", "path", 1, 1, "input {stdin{}} output{stdout{}}"); final PipelineIR pipelineIR = - ConfigCompiler.configToPipelineIR(RubyUtil.RUBY.newArray(swm), false); + ConfigCompiler.configToPipelineIR(Collections.singletonList(swm), false); assertThat(pipelineIR.getOutputPluginVertices().size(), is(1)); assertThat(pipelineIR.getFilterPluginVertices().size(), is(0)); } @@ -99,9 +94,8 @@ public class ConfigCompilerTest extends RubyEnvTestCase { } private static String graphHash(final String config) throws InvalidIRException { - IRubyObject swm = JavaUtil.convertJavaToRuby( - RubyUtil.RUBY, new SourceWithMetadata("proto", "path", 1, 1, config)); - return ConfigCompiler.configToPipelineIR(RubyUtil.RUBY.newArray(swm), false).uniqueHash(); + SourceWithMetadata swm = new SourceWithMetadata("proto", "path", 1, 1, config); + return ConfigCompiler.configToPipelineIR(Collections.singletonList(swm), false).uniqueHash(); } @Test diff --git a/logstash-core/src/test/java/org/logstash/config/ir/PipelineConfigTest.java b/logstash-core/src/test/java/org/logstash/config/ir/PipelineConfigTest.java new file mode 100644 index 000000000..841042a43 --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/config/ir/PipelineConfigTest.java @@ -0,0 +1,196 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.logstash.config.ir; + +import org.jruby.*; +import org.jruby.runtime.builtin.IRubyObject; +import org.junit.Before; +import org.junit.Test; +import org.logstash.RubyUtil; +import org.logstash.common.IncompleteSourceWithMetadataException; +import org.logstash.common.SourceWithMetadata; + +import java.time.LocalDateTime; +import java.util.*; +import java.util.stream.Collectors; + +import static org.junit.Assert.*; + +public class PipelineConfigTest extends RubyEnvTestCase { + + public static final String PIPELINE_ID = "main"; + private RubyClass source; + private RubySymbol pipelineIdSym; + private String configMerged; + private SourceWithMetadata[] unorderedConfigParts; + + private final static RubyObject SETTINGS = (RubyObject) RubyUtil.RUBY.evalScriptlet( + "require 'logstash/environment'\n" + // this is needed to register "pipeline.system" setting + "require 'logstash/settings'\n" + + "LogStash::SETTINGS");; + private PipelineConfig sut; + private SourceWithMetadata[] orderedConfigParts; + public static final String PIPELINE_CONFIG_PART_2 = + "output {\n" + + " stdout\n" + + "}"; + public static final String PIPELINE_CONFIG_PART_1 = + "input {\n" + + " generator1\n" + + "}"; + + @Before + public void setUp() throws IncompleteSourceWithMetadataException { + + source = RubyUtil.RUBY.getClass("LogStash::Config::Source::Local"); + pipelineIdSym = RubySymbol.newSymbol(RubyUtil.RUBY, PIPELINE_ID); + + orderedConfigParts = new SourceWithMetadata[]{ + new SourceWithMetadata("file", "/tmp/1", 0, 0, "input { generator1 }"), + new SourceWithMetadata("file", "/tmp/2", 0, 0, "input { generator2 }"), + new SourceWithMetadata("file", "/tmp/3", 0, 0, "input { generator3 }"), + new SourceWithMetadata("file", "/tmp/4", 0, 0, "input { generator4 }"), + new SourceWithMetadata("file", "/tmp/5", 0, 0, "input { generator5 }"), + new SourceWithMetadata("file", "/tmp/6", 0, 0, "input { generator6 }"), + new SourceWithMetadata("string", "config_string", 0, 0, "input { generator1 }"), + }; + + configMerged = Arrays.stream(orderedConfigParts).map(SourceWithMetadata::getText).collect(Collectors.joining("\n")); + + List unorderedList = Arrays.asList(orderedConfigParts); + Collections.shuffle(unorderedList); + unorderedConfigParts = unorderedList.toArray(new SourceWithMetadata[0]); + + sut = new PipelineConfig(source, pipelineIdSym, toRubyArray(unorderedConfigParts), SETTINGS); + } + + @Test + public void testReturnsTheSource() { + assertEquals("returns the source", source, sut.getSource()); + assertEquals("returns the pipeline id", PIPELINE_ID, sut.getPipelineId()); + assertNotNull("returns the config_hash", sut.configHash()); + assertEquals("returns the merged `ConfigPart#config_string`", configMerged, sut.configString()); + assertTrue("records when the config was read", sut.getReadAt().isBefore(LocalDateTime.now())); + } + + @SuppressWarnings("rawtypes") + private static RubyArray toRubyArray(SourceWithMetadata[] arr) { + List wrappedContent = Arrays.stream(arr).map(RubyUtil::toRubyObject).collect(Collectors.toList()); + return RubyArray.newArray(RubyUtil.RUBY, wrappedContent); + } + + @Test + public void testObjectEqualityOnConfigHashAndPipelineId() { + PipelineConfig anotherExactPipeline = new PipelineConfig(source, pipelineIdSym, toRubyArray(orderedConfigParts), SETTINGS); + assertEquals(anotherExactPipeline, sut); + + PipelineConfig notMatchingPipeline = new PipelineConfig(source, pipelineIdSym, RubyArray.newEmptyArray(RubyUtil.RUBY), SETTINGS); + assertNotEquals(notMatchingPipeline, sut); + + PipelineConfig notSamePipelineId = new PipelineConfig(source, RubySymbol.newSymbol(RubyUtil.RUBY, "another_pipeline"), toRubyArray(unorderedConfigParts), SETTINGS); + assertNotEquals(notSamePipelineId, sut); + } + + @Test + public void testIsSystemWhenPipelineIsNotSystemPipeline() { + assertFalse("returns false if the pipeline is not a system pipeline", sut.isSystem()); + } + + @Test + public void testIsSystemWhenPipelineIsSystemPipeline() { + RubyObject mockedSettings = mockSettings(Collections.singletonMap("pipeline.system", true)); + sut = new PipelineConfig(source, pipelineIdSym, toRubyArray(unorderedConfigParts), mockedSettings); + + assertTrue("returns true if the pipeline is a system pipeline", sut.isSystem()); + } + + public RubyObject mockSettings(Map settingsValues) { + IRubyObject settings = SETTINGS.callMethod("clone"); + settingsValues.forEach((k, v) -> { + RubyString rk = RubyString.newString(RubyUtil.RUBY, k); + IRubyObject rv = RubyUtil.toRubyObject(v); + settings.callMethod(RubyUtil.RUBY.getCurrentContext(), "set", new IRubyObject[]{rk, rv}); + }); + return (RubyObject) settings; + } + + @Test + public void testSourceAndLineRemapping_pipelineDefinedInSingleFileOneLine() throws IncompleteSourceWithMetadataException { + String oneLinerPipeline = "input { generator1 }"; + final SourceWithMetadata swm = new SourceWithMetadata("file", "/tmp/1", 0, 0, oneLinerPipeline); + sut = new PipelineConfig(source, pipelineIdSym, toRubyArray(new SourceWithMetadata[]{swm}), SETTINGS); + + assertEquals("return the same line of the queried", 1, (int) sut.lookupSource(1, 0).getLine()); + } + + @Test + public void testSourceAndLineRemapping_pipelineDefinedInSingleFileMultiLine() throws IncompleteSourceWithMetadataException { + final SourceWithMetadata swm = new SourceWithMetadata("file", "/tmp/1", 0, 0, PIPELINE_CONFIG_PART_1); + sut = new PipelineConfig(source, pipelineIdSym, toRubyArray(new SourceWithMetadata[]{swm}), SETTINGS); + + assertEquals("return the same line of the queried L1", 1, (int) sut.lookupSource(1, 0).getLine()); + assertEquals("return the same line of the queried L2", 2, (int) sut.lookupSource(2, 0).getLine()); + } + + @Test(expected = IllegalArgumentException.class) + public void testSourceAndLineRemapping_pipelineDefinedInSingleFileMultiLine_dontmatch() throws IncompleteSourceWithMetadataException { + final SourceWithMetadata swm = new SourceWithMetadata("file", "/tmp/1", 0, 0, PIPELINE_CONFIG_PART_1); + sut = new PipelineConfig(source, pipelineIdSym, toRubyArray(new SourceWithMetadata[]{swm}), SETTINGS); + + sut.lookupSource(100, -1); + } + + @Test + public void testSourceAndLineRemapping_pipelineDefinedMInMultipleFiles() throws IncompleteSourceWithMetadataException { + final SourceWithMetadata[] parts = { + new SourceWithMetadata("file", "/tmp/input", 0, 0, PIPELINE_CONFIG_PART_1), + new SourceWithMetadata("file", "/tmp/output", 0, 0, PIPELINE_CONFIG_PART_2) + }; + sut = new PipelineConfig(source, pipelineIdSym, toRubyArray(parts), SETTINGS); + + assertEquals("return the line of first segment", 2, (int) sut.lookupSource(2, 0).getLine()); + assertEquals("return the id of first segment", "/tmp/input", sut.lookupSource(2, 0).getId()); + assertEquals("return the line of second segment", 1, (int) sut.lookupSource(4, 0).getLine()); + assertEquals("return the id of second segment", "/tmp/output", sut.lookupSource(4, 0).getId()); + } + + @Test(expected = IllegalArgumentException.class) + public void testSourceAndLineRemapping_pipelineDefinedMInMultipleFiles_dontmatch() throws IncompleteSourceWithMetadataException { + final SourceWithMetadata[] parts = { + new SourceWithMetadata("file", "/tmp/input", 0, 0, PIPELINE_CONFIG_PART_1), + new SourceWithMetadata("file", "/tmp/output", 0, 0, PIPELINE_CONFIG_PART_2) + }; + sut = new PipelineConfig(source, pipelineIdSym, toRubyArray(parts), SETTINGS); + + sut.lookupSource(100, 0); + } + + @Test + public void testSourceAndLineRemapping_pipelineDefinedMInMultipleFiles_withEmptyLinesInTheMiddle() throws IncompleteSourceWithMetadataException { + final SourceWithMetadata[] parts = { + new SourceWithMetadata("file", "/tmp/input", 0, 0, PIPELINE_CONFIG_PART_1 + "\n"), + new SourceWithMetadata("file", "/tmp/output", 0, 0, PIPELINE_CONFIG_PART_2) + }; + sut = new PipelineConfig(source, pipelineIdSym, toRubyArray(parts), SETTINGS); + + assertEquals("shouldn't slide the line mapping of subsequent", 1, (int) sut.lookupSource(4, 0).getLine()); + assertEquals("shouldn't slide the id mapping of subsequent", "/tmp/output", sut.lookupSource(4, 0).getId()); + } +} diff --git a/logstash-core/src/test/java/org/logstash/plugins/PluginFactoryExtTest.java b/logstash-core/src/test/java/org/logstash/plugins/PluginFactoryExtTest.java index 30d106032..f97c21d1b 100644 --- a/logstash-core/src/test/java/org/logstash/plugins/PluginFactoryExtTest.java +++ b/logstash-core/src/test/java/org/logstash/plugins/PluginFactoryExtTest.java @@ -21,14 +21,11 @@ package org.logstash.plugins; import co.elastic.logstash.api.*; -import org.jruby.RubyArray; import org.jruby.RubyHash; import org.jruby.RubyString; -import org.jruby.javasupport.JavaUtil; import org.jruby.runtime.builtin.IRubyObject; import org.junit.Test; import org.logstash.RubyUtil; -import org.logstash.common.IncompleteSourceWithMetadataException; import org.logstash.common.SourceWithMetadata; import org.logstash.config.ir.ConfigCompiler; import org.logstash.config.ir.InvalidIRException; @@ -109,12 +106,8 @@ public final class PluginFactoryExtTest extends RubyEnvTestCase { assertEquals("Resolved config setting MUST be evaluated with substitution", envVars.get("CUSTOM"), id.toString()); } - @SuppressWarnings("rawtypes") private static PipelineIR compilePipeline(SourceWithMetadata sourceWithMetadata) throws InvalidIRException { - RubyArray sourcesWithMetadata = RubyUtil.RUBY.newArray(JavaUtil.convertJavaToRuby( - RubyUtil.RUBY, sourceWithMetadata)); - - return ConfigCompiler.configToPipelineIR(sourcesWithMetadata, false); + return ConfigCompiler.configToPipelineIR(Collections.singletonList(sourceWithMetadata), false); } private static PluginFactoryExt.ExecutionContext createExecutionContextFactory() { diff --git a/x-pack/lib/config_management/elasticsearch_source.rb b/x-pack/lib/config_management/elasticsearch_source.rb index edc4af3b1..9e04121fc 100644 --- a/x-pack/lib/config_management/elasticsearch_source.rb +++ b/x-pack/lib/config_management/elasticsearch_source.rb @@ -2,7 +2,6 @@ # or more contributor license agreements. Licensed under the Elastic License; # you may not use this file except in compliance with the Elastic License. -require "logstash/config/pipeline_config" require "logstash/config/source/base" require "logstash/config/source_loader" require "logstash/outputs/elasticsearch" @@ -112,7 +111,7 @@ module LogStash end end - LogStash::Config::PipelineConfig.new(self.class.name, pipeline_id.to_sym, config_part, settings) + Java::OrgLogstashConfigIr::PipelineConfig.new(self.class, pipeline_id.to_sym, [config_part], settings) end # This is a bit of a hack until we refactor the ElasticSearch plugins diff --git a/x-pack/lib/monitoring/monitoring.rb b/x-pack/lib/monitoring/monitoring.rb index 4d1ff269e..9a5fb9100 100644 --- a/x-pack/lib/monitoring/monitoring.rb +++ b/x-pack/lib/monitoring/monitoring.rb @@ -4,7 +4,6 @@ require "logstash/agent" require "monitoring/internal_pipeline_source" -require "logstash/config/pipeline_config" require 'helpers/elasticsearch_options' module LogStash @@ -181,7 +180,7 @@ module LogStash logger.debug("compiled metrics pipeline config: ", :config => config) config_part = org.logstash.common.SourceWithMetadata.new("x-pack-metrics", "internal_pipeline_source", config) - LogStash::Config::PipelineConfig.new(self, PIPELINE_ID.to_sym, config_part, settings) + Java::OrgLogstashConfigIr::PipelineConfig.new(self.class, PIPELINE_ID.to_sym, [config_part], settings) end def generate_pipeline_config(settings) diff --git a/x-pack/spec/config_management/elasticsearch_source_spec.rb b/x-pack/spec/config_management/elasticsearch_source_spec.rb index d7d3250ef..7eb385dd1 100644 --- a/x-pack/spec/config_management/elasticsearch_source_spec.rb +++ b/x-pack/spec/config_management/elasticsearch_source_spec.rb @@ -268,7 +268,7 @@ describe LogStash::ConfigManagement::ElasticsearchSource do pipeline_config = subject.pipeline_configs expect(pipeline_config.first.config_string).to match(config) - expect(pipeline_config.first.pipeline_id).to eq(pipeline_id.to_sym) + expect(pipeline_config.first.pipeline_id.to_sym).to eq(pipeline_id.to_sym) end it "ignores non-whitelisted and invalid settings" do @@ -295,7 +295,7 @@ describe LogStash::ConfigManagement::ElasticsearchSource do pipeline_config = subject.pipeline_configs expect(pipeline_config.first.config_string).to match(config) - expect(pipeline_config.first.pipeline_id).to eq(pipeline_id.to_sym) + expect(pipeline_config.first.pipeline_id.to_sym).to eq(pipeline_id.to_sym) end end @@ -397,7 +397,7 @@ describe LogStash::ConfigManagement::ElasticsearchSource do pipeline_config = subject.pipeline_configs expect(pipeline_config.first.config_string).to match(config) - expect(pipeline_config.first.pipeline_id).to eq(pipeline_id.to_sym) + expect(pipeline_config.first.pipeline_id.to_sym).to eq(pipeline_id.to_sym) end end end @@ -433,7 +433,7 @@ describe LogStash::ConfigManagement::ElasticsearchSource do pipeline_config = subject.pipeline_configs expect(pipeline_config.collect(&:config_string)).to include(*pipelines.values) - expect(pipeline_config.collect(&:pipeline_id)).to include(*pipelines.keys.collect(&:to_sym)) + expect(pipeline_config.map(&:pipeline_id).collect(&:to_sym)).to include(*pipelines.keys.collect(&:to_sym)) end end end diff --git a/x-pack/spec/modules/azure/filters/azure_event_spec.rb b/x-pack/spec/modules/azure/filters/azure_event_spec.rb index 43db6f7e9..a12671193 100644 --- a/x-pack/spec/modules/azure/filters/azure_event_spec.rb +++ b/x-pack/spec/modules/azure/filters/azure_event_spec.rb @@ -6,9 +6,11 @@ require 'x-pack/logstash_registry' require 'logstash/devutils/rspec/spec_helper' require 'logstash/json' require 'filters/azure_event' +require 'logstash/config/pipeline_config' describe LogStash::Filters::AzureEvent do + describe "Parses the admin activity log" do let(:config) do <<-CONFIG diff --git a/x-pack/spec/monitoring/inputs/metrics/state_event_factory_spec.rb b/x-pack/spec/monitoring/inputs/metrics/state_event_factory_spec.rb index ad8498a68..5c6e0cc74 100644 --- a/x-pack/spec/monitoring/inputs/metrics/state_event_factory_spec.rb +++ b/x-pack/spec/monitoring/inputs/metrics/state_event_factory_spec.rb @@ -14,7 +14,7 @@ describe LogStash::Inputs::Metrics::StateEventFactory do let(:config) { config_part = org.logstash.common.SourceWithMetadata.new("local", "...", 0, 0, "input { dummyblockinginput { } } output { null { } }") - LogStash::Config::PipelineConfig.new("DummySource", "fake_main", [config_part], LogStash::SETTINGS) + Java::OrgLogstashConfigIr::PipelineConfig.new("DummySource".class, "fake_main".to_sym, [config_part], LogStash::SETTINGS) } let(:pipeline_settings) { LogStash::Runner::SYSTEM_SETTINGS.clone.merge({ diff --git a/x-pack/spec/monitoring/inputs/metrics/stats_event_factory_spec.rb b/x-pack/spec/monitoring/inputs/metrics/stats_event_factory_spec.rb index 351323a20..905356fe8 100644 --- a/x-pack/spec/monitoring/inputs/metrics/stats_event_factory_spec.rb +++ b/x-pack/spec/monitoring/inputs/metrics/stats_event_factory_spec.rb @@ -3,6 +3,7 @@ # you may not use this file except in compliance with the Elastic License. require "monitoring/inputs/metrics/stats_event_factory" +require "logstash/config/pipeline_config" require 'json' describe LogStash::Inputs::Metrics::StatsEventFactory do diff --git a/x-pack/spec/monitoring/internal_pipeline_source_spec.rb b/x-pack/spec/monitoring/internal_pipeline_source_spec.rb index 3ebaabf12..4a23e9f9c 100644 --- a/x-pack/spec/monitoring/internal_pipeline_source_spec.rb +++ b/x-pack/spec/monitoring/internal_pipeline_source_spec.rb @@ -6,7 +6,6 @@ require "logstash-core" require "logstash/agent" require "logstash/runner" require "monitoring/inputs/metrics" -require "logstash/config/pipeline_config" require "logstash/config/source/local" require 'license_checker/x_pack_info' require "rspec/wait" @@ -53,7 +52,7 @@ describe LogStash::Monitoring::InternalPipelineSource do let(:unordered_config_parts) { ordered_config_parts.shuffle } - let(:pipeline_config) { LogStash::Config::PipelineConfig.new(source, pipeline_id, unordered_config_parts, system_settings) } + let(:pipeline_config) { Java::OrgLogstashConfigIr::PipelineConfig.new(source, pipeline_id, unordered_config_parts, system_settings) } let(:es_options) do {