Backport to 7.x of PR #11824

Refactor: move PipelineConfig from Ruby to Java

Reimplement the Ruby class PipelineConfig in Java, keeping the method signatures where possible to limit changes in client code. This is one step of a larger effort to move all of the configuration code to Java.
Having all of that code in Java unlocks further reasoning about how to implement it better, and will probably improve performance during process startup.
Also moved the spec into a JUnit test and fixed the failing tests where needed.

Closes: #11824
This commit is contained in:
andsel 2020-04-14 18:01:44 +02:00 committed by J.A.R.V.I.S. - an Elastic git bot
parent 72f4e2f8e2
commit c6795731f1
32 changed files with 415 additions and 295 deletions

View file

@ -92,6 +92,7 @@ tasks.register("javaTests", Test) {
exclude '/org/logstash/config/ir/ConfigCompilerTest.class'
exclude '/org/logstash/config/ir/CompiledPipelineTest.class'
exclude '/org/logstash/config/ir/EventConditionTest.class'
exclude '/org/logstash/config/ir/PipelineConfigTest.class'
exclude '/org/logstash/config/ir/compiler/OutputDelegatorTest.class'
exclude '/org/logstash/config/ir/compiler/JavaCodecDelegatorTest.class'
exclude '/org/logstash/plugins/NamespacedMetricImplTest.class'
@ -107,6 +108,7 @@ tasks.register("rubyTests", Test) {
include '/org/logstash/config/ir/ConfigCompilerTest.class'
include '/org/logstash/config/ir/CompiledPipelineTest.class'
include '/org/logstash/config/ir/EventConditionTest.class'
include '/org/logstash/config/ir/PipelineConfigTest.class'
include '/org/logstash/config/ir/compiler/OutputDelegatorTest.class'
include '/org/logstash/config/ir/compiler/JavaCodecDelegatorTest.class'
include '/org/logstash/plugins/NamespacedMetricImplTest.class'

View file

@ -22,6 +22,7 @@ require "logstash/instrument/periodic_pollers"
require "logstash/pipeline"
require "logstash/webserver"
require "logstash/config/source_loader"
require "logstash/config/pipeline_config"
require "logstash/pipeline_action"
require "logstash/state_resolver"
require "logstash/pipelines_registry"

View file

@ -15,77 +15,6 @@
# specific language governing permissions and limitations
# under the License.
require "digest"
module LogStash module Config
  # Value object describing the full configuration of one pipeline: where it
  # came from (source), its id, the ordered config segments, the settings it
  # runs with, and when the configuration was read.
  class PipelineConfig
    include LogStash::Util::Loggable

    # Maps an inclusive range of merged (global) line numbers to the config
    # part (SourceWithMetadata) that contributed those lines.
    LineToSource = Struct.new("LineToSource", :bounds, :source)

    attr_reader :source, :pipeline_id, :config_parts, :settings, :read_at

    # @param source [Class] the config source class that produced the parts
    # @param pipeline_id [Symbol] the pipeline identifier
    # @param config_parts [Array<SourceWithMetadata>, SourceWithMetadata] one or more config segments
    # @param settings [LogStash::Settings] the settings for this pipeline
    def initialize(source, pipeline_id, config_parts, settings)
      @source = source
      @pipeline_id = pipeline_id
      # We can't use Array() since config_parts may be a java object!
      config_parts_array = config_parts.is_a?(Array) ? config_parts : [config_parts]
      # Sort deterministically so config_hash is stable regardless of the
      # order in which the parts were discovered.
      @config_parts = config_parts_array.sort_by { |config_part| [config_part.protocol.to_s, config_part.id] }
      @settings = settings
      @read_at = Time.now
    end

    # @return [String] SHA1 hex digest of the merged config string (memoized)
    def config_hash
      @config_hash ||= Digest::SHA1.hexdigest(config_string)
    end

    # @return [String] all config part texts joined by newlines (memoized;
    #   the original implementation rebuilt this string on every call)
    def config_string
      @config_string ||= config_parts.collect(&:text).join("\n")
    end

    # @return [Boolean] true when this is an internal (system) pipeline
    def system?
      @settings.get("pipeline.system")
    end

    # Equality is based on content (config_hash), pipeline_id and settings.
    # Returns false instead of raising when compared against an unrelated object.
    def ==(other)
      other.respond_to?(:config_hash) && other.respond_to?(:pipeline_id) &&
        config_hash == other.config_hash && pipeline_id == other.pipeline_id && settings == other.settings
    end

    # Logs every config part and the merged config at debug level.
    def display_debug_information
      logger.debug("-------- Logstash Config ---------")
      logger.debug("Config from source", :source => source, :pipeline_id => pipeline_id)

      config_parts.each do |config_part|
        logger.debug("Config string", :protocol => config_part.protocol, :id => config_part.id)
        logger.debug("\n\n#{config_part.text}")
      end
      logger.debug("Merged config")
      logger.debug("\n\n#{config_string}")
    end

    # Translates a line number in the merged config back to the originating
    # config part, returning a SourceWithMetadata positioned at the local line.
    #
    # @param global_line_number [Integer] 1-based line in the merged config
    # @param source_column [Integer] column to carry over into the result
    # @raise [IndexError] when no config part covers the given line
    def lookup_source(global_line_number, source_column)
      res = source_references.find { |line_to_source| line_to_source.bounds.include? global_line_number }
      if res == nil
        raise IndexError, "can't find the config segment related to line #{global_line_number}"
      end
      swm = res.source
      SourceWithMetadata.new(swm.getProtocol(), swm.getId(), global_line_number + 1 - res.bounds.begin, source_column, swm.getText())
    end

    private

    # Builds (once) the list of line-range -> config-part mappings used by
    # #lookup_source, accumulating a running offset over the parts.
    def source_references
      @source_refs ||= begin
        offset = 0
        source_refs = []
        config_parts.each do |config_part|
          # line numbers starts from 1 in text files
          lines_range = (config_part.getLine() + offset + 1..config_part.getLinesCount() + offset)
          source_segment = LineToSource.new(lines_range, config_part)
          source_refs << source_segment
          offset += config_part.getLinesCount()
        end
        source_refs.freeze
      end
    end
  end
end end
# Expose the Java implementation under the historical Ruby constant
# LogStash::Config::PipelineConfig so existing client code keeps working.
module LogStash::Config
java_import org.logstash.config.ir.PipelineConfig
end

View file

@ -212,7 +212,7 @@ module LogStash module Config module Source
return [] if config_parts.empty?
[PipelineConfig.new(self.class, @settings.get("pipeline.id").to_sym, config_parts, @settings)]
[org.logstash.config.ir.PipelineConfig.new(self.class, @settings.get("pipeline.id").to_sym, config_parts, @settings)]
end
def automatic_reload_with_config_string?

View file

@ -17,7 +17,6 @@
require "logstash/config/source/base"
require "logstash/config/modules_common"
require "logstash/config/pipeline_config"
module LogStash module Config module Source
class Modules < Base
@ -29,7 +28,7 @@ module LogStash module Config module Source
pipelines = LogStash::Config::ModulesCommon.pipeline_configs(@settings)
pipelines.map do |hash|
PipelineConfig.new(self, hash["pipeline_id"].to_sym,
org.logstash.config.ir.PipelineConfig.new(self.class, hash["pipeline_id"].to_sym,
org.logstash.common.SourceWithMetadata.new("module", hash["alt_name"], 0, 0, hash["config_string"]),
hash["settings"])
end

View file

@ -63,7 +63,6 @@ module LogStash module Config
sources do |source|
sources_loaders << source if source.match?
end
if sources_loaders.empty?
# This shouldn't happen with the settings object or with any external plugins.
# but lets add a guard so we fail fast.

View file

@ -34,7 +34,7 @@ module LogStash module PipelineAction
end
def pipeline_id
@pipeline_config.pipeline_id
@pipeline_config.pipeline_id.to_sym
end
# Make sure we execution system pipeline like the monitoring

View file

@ -29,7 +29,7 @@ module LogStash module PipelineAction
end
def pipeline_id
@pipeline_config.pipeline_id
@pipeline_config.pipeline_id.to_sym
end
def to_s

View file

@ -142,7 +142,7 @@ module LogStash
# @param pipeline_id [String, Symbol] the pipeline id
# @return [Pipeline] the pipeline object or nil if none for pipeline_id
def get_pipeline(pipeline_id)
state = @states.get(pipeline_id)
state = @states.get(pipeline_id.to_sym)
state.nil? ? nil : state.pipeline
end

View file

@ -41,7 +41,7 @@ module LogStash
end
end
configured_pipelines = pipeline_configs.collect(&:pipeline_id)
configured_pipelines = pipeline_configs.map { |config| config.pipeline_id.to_sym }
# If one of the running pipeline is not in the pipeline_configs, we assume that we need to
# stop it.

View file

@ -18,7 +18,6 @@
require "spec_helper"
require "stud/temporary"
require "logstash/inputs/generator"
require "logstash/config/pipeline_config"
require "logstash/config/source/local"
require_relative "../support/mocks_classes"
require "fileutils"
@ -85,7 +84,7 @@ describe LogStash::Agent do
it "should delegate settings to new pipeline" do
expect(LogStash::JavaPipeline).to receive(:new) do |arg1, arg2|
expect(arg1).to eq(config_string)
expect(arg1.to_s).to eq(config_string)
expect(arg2.to_hash).to include(agent_args)
end
subject.converge_state_and_update

View file

@ -1,166 +0,0 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
require "logstash/config/pipeline_config"
require "logstash/config/source/local"
# Spec for LogStash::Config::PipelineConfig: construction, part ordering,
# hashing, equality, system-pipeline detection, and merged-line-to-source
# remapping.
describe LogStash::Config::PipelineConfig do
let(:source) { LogStash::Config::Source::Local }
let(:pipeline_id) { :main }
# Parts are listed here in their canonical sort order: by protocol then id.
let(:ordered_config_parts) do
[
org.logstash.common.SourceWithMetadata.new("file", "/tmp/1", 0, 0, "input { generator1 }"),
org.logstash.common.SourceWithMetadata.new("file", "/tmp/2", 0, 0, "input { generator2 }"),
org.logstash.common.SourceWithMetadata.new("file", "/tmp/3", 0, 0, "input { generator3 }"),
org.logstash.common.SourceWithMetadata.new("file", "/tmp/4", 0, 0, "input { generator4 }"),
org.logstash.common.SourceWithMetadata.new("file", "/tmp/5", 0, 0, "input { generator5 }"),
org.logstash.common.SourceWithMetadata.new("file", "/tmp/6", 0, 0, "input { generator6 }"),
org.logstash.common.SourceWithMetadata.new("string", "config_string", 0, 0, "input { generator1 }"),
]
end
# The subject is built from a shuffled copy to prove the constructor sorts.
let(:unordered_config_parts) { ordered_config_parts.shuffle }
let(:settings) { LogStash::SETTINGS }
subject { described_class.new(source, pipeline_id, unordered_config_parts, settings) }
it "returns the source" do
expect(subject.source).to eq(source)
end
it "returns the pipeline id" do
expect(subject.pipeline_id).to eq(pipeline_id)
end
it "returns the sorted config parts" do
expect(subject.config_parts).to eq(ordered_config_parts)
end
it "returns the config_hash" do
expect(subject.config_hash).not_to be_nil
end
it "returns the merged `ConfigPart#config_string`" do
expect(subject.config_string).to eq(ordered_config_parts.collect(&:text).join("\n"))
end
it "records when the config was read" do
expect(subject.read_at).to be <= Time.now
end
# Equality depends on config content and pipeline_id, not on part ordering.
it "does object equality on config_hash and pipeline_id" do
another_exact_pipeline = described_class.new(source, pipeline_id, ordered_config_parts, settings)
expect(subject).to eq(another_exact_pipeline)
not_matching_pipeline = described_class.new(source, pipeline_id, [], settings)
expect(subject).not_to eq(not_matching_pipeline)
not_same_pipeline_id = described_class.new(source, :another_pipeline, unordered_config_parts, settings)
expect(subject).not_to eq(not_same_pipeline_id)
end
describe "#system?" do
context "when the pipeline is a system pipeline" do
# Overrides the `settings` let above with "pipeline.system" enabled.
let(:settings) { mock_settings({ "pipeline.system" => true })}
it "returns true if the pipeline is a system pipeline" do
expect(subject.system?).to be_truthy
end
end
context "when is not a system pipeline" do
it "returns false if the pipeline is not a system pipeline" do
expect(subject.system?).to be_falsey
end
end
end
# lookup_source maps a 1-based line number in the merged config back to the
# originating config part and its local line number.
describe "source and line remapping" do
context "when pipeline is constructed from single file single line" do
let (:pipeline_conf_string) { 'input { generator1 }' }
subject { described_class.new(source, pipeline_id, [org.logstash.common.SourceWithMetadata.new("file", "/tmp/1", 0, 0, pipeline_conf_string)], settings) }
it "return the same line of the queried" do
expect(subject.lookup_source(1, 0).getLine()).to eq(1)
end
end
context "when pipeline is constructed from single file" do
let (:pipeline_conf_string) { 'input {
generator1
}' }
subject { described_class.new(source, pipeline_id, [org.logstash.common.SourceWithMetadata.new("file", "/tmp/1", 0, 0, pipeline_conf_string)], settings) }
it "return the same line of the queried" do
expect(subject.lookup_source(1, 0).getLine()).to eq(1)
expect(subject.lookup_source(2, 0).getLine()).to eq(2)
end
it "throw exception if line is out of bound" do
expect { subject.lookup_source(100, -1) }.to raise_exception(IndexError)
end
end
context "when pipeline is constructed from multiple files" do
let (:pipeline_conf_string_part1) { 'input {
generator1
}' }
let (:pipeline_conf_string_part2) { 'output {
stdout
}' }
let(:merged_config_parts) do
[
org.logstash.common.SourceWithMetadata.new("file", "/tmp/input", 0, 0, pipeline_conf_string_part1),
org.logstash.common.SourceWithMetadata.new("file", "/tmp/output", 0, 0, pipeline_conf_string_part2)
]
end
subject { described_class.new(source, pipeline_id, merged_config_parts, settings) }
it "return the line of first segment" do
expect(subject.lookup_source(2, 0).getLine()).to eq(2)
expect(subject.lookup_source(2, 0).getId()).to eq("/tmp/input")
end
# Global line 4 is the first line of the second (output) segment.
it "return the line of second segment" do
expect(subject.lookup_source(4, 0).getLine()).to eq(1)
expect(subject.lookup_source(4, 0).getId()).to eq("/tmp/output")
end
it "throw exception if line is out of bound" do
expect { subject.lookup_source(100, 0) }.to raise_exception(IndexError)
end
end
# Guards against an off-by-one when a part ends with a trailing newline.
context "when pipeline is constructed from multiple files and the first has trailing newline" do
let (:pipeline_conf_string_part1) { "input {\n generator1\n}\n" }
let (:pipeline_conf_string_part2) { 'output {
stdout
}' }
let(:merged_config_parts) do
[
org.logstash.common.SourceWithMetadata.new("file", "/tmp/input", 0, 0, pipeline_conf_string_part1),
org.logstash.common.SourceWithMetadata.new("file", "/tmp/output", 0, 0, pipeline_conf_string_part2)
]
end
subject { described_class.new(source, pipeline_id, merged_config_parts, settings) }
it "shouldn't slide the mapping of subsequent" do
expect(subject.lookup_source(4, 0).getLine()).to eq(1)
expect(subject.lookup_source(4, 0).getId()).to eq("/tmp/output")
end
end
end
end

View file

@ -21,7 +21,7 @@ require_relative "../../support/helpers"
def temporary_pipeline_config(id, source, reader = "random_reader")
config_part = org.logstash.common.SourceWithMetadata.new("local", "...", 0, 0, "input {} output {}")
LogStash::Config::PipelineConfig.new(source, id, [config_part], LogStash::SETTINGS)
org.logstash.config.ir.PipelineConfig.new(source, id.to_sym, [config_part], LogStash::SETTINGS)
end
class DummySource < LogStash::Config::Source::Base

View file

@ -22,6 +22,7 @@ require_relative "../support/mocks_classes"
require_relative "../support/helpers"
require "stud/try"
require 'timeout'
require 'logstash/config/pipeline_config'
class DummyInput < LogStash::Inputs::Base
config_name "dummyinput"

View file

@ -20,9 +20,9 @@ require "logstash/pipelines_registry"
describe LogStash::PipelinesRegistry do
let(:pipeline_id) { "test" }
let(:pipeline_id) { "test".to_sym }
let(:pipeline) { double("Pipeline") }
let (:logger) { double("Logger") }
let(:logger) { double("Logger") }
context "at object creation" do
it "should be empty" do

View file

@ -19,7 +19,6 @@ require "spec_helper"
require_relative "../support/helpers"
require_relative "../support/matchers"
require "logstash/state_resolver"
require "logstash/config/pipeline_config"
require "logstash/pipeline"
require "ostruct"
require "digest"

View file

@ -86,7 +86,7 @@ def mock_pipeline_config(pipeline_id, config_string = nil, settings = {})
config_part = org.logstash.common.SourceWithMetadata.new("config_string", "config_string", 0, 0, config_string)
LogStash::Config::PipelineConfig.new(LogStash::Config::Source::Local, pipeline_id, config_part, settings)
org.logstash.config.ir.PipelineConfig.new(LogStash::Config::Source::Local, pipeline_id.to_sym, [config_part], settings)
end
def start_agent(agent)

View file

@ -17,7 +17,6 @@
require "rspec"
require "rspec/expectations"
require "logstash/config/pipeline_config"
require "stud/try"
RSpec::Matchers.define :be_a_metric_event do |namespace, type, *args|

View file

@ -76,7 +76,7 @@ module PipelineHelpers
let(:pipeline) do
settings.set_value("queue.drain", true)
LogStash::JavaPipeline.new(
LogStash::Config::PipelineConfig.new(
org.logstash.config.ir.PipelineConfig.new(
LogStash::Config::Source::Local, :main,
SourceWithMetadata.new(
"config_string", "config_string",

View file

@ -53,9 +53,9 @@ public final class ConfigCompiler {
* @throws InvalidIRException if the the configuration contains errors
*/
@SuppressWarnings("unchecked")
public static PipelineIR configToPipelineIR(final @SuppressWarnings("rawtypes") RubyArray sourcesWithMetadata,
public static PipelineIR configToPipelineIR(final List<SourceWithMetadata> sourcesWithMetadata,
final boolean supportEscapes) throws InvalidIRException {
return compileSources((List<SourceWithMetadata>) sourcesWithMetadata, supportEscapes);
return compileSources(sourcesWithMetadata, supportEscapes);
}
public static PipelineIR compileSources(List<SourceWithMetadata> sourcesWithMetadata, boolean supportEscapes) throws InvalidIRException {

View file

@ -0,0 +1,177 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.logstash.config.ir;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jruby.*;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.common.IncompleteSourceWithMetadataException;
import org.logstash.common.SourceWithMetadata;
import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.Collectors;
import static org.logstash.RubyUtil.RUBY;
/**
 * Java port of the Ruby class LogStash::Config::PipelineConfig: describes one
 * pipeline's configuration (source, id, ordered config parts, settings, read
 * timestamp) and maps merged-config line numbers back to their source part.
 */
public final class PipelineConfig {

    /**
     * Maps an inclusive range of merged (global) line numbers to the config
     * part that contributed those lines.
     */
    private static class LineToSource {
        private final int startLine;
        private final int endLine;
        private final SourceWithMetadata source;

        LineToSource(int startLine, int endLine, SourceWithMetadata source) {
            this.startLine = startLine;
            this.endLine = endLine;
            this.source = source;
        }

        boolean includeLine(int lineNumber) {
            return startLine <= lineNumber && lineNumber <= endLine;
        }
    }

    private static final Logger logger = LogManager.getLogger(PipelineConfig.class);

    private final RubyClass source;
    private final String pipelineId;
    private final List<SourceWithMetadata> confParts;
    private final RubyObject settings;
    private final LocalDateTime readAt;
    // Lazily computed caches; populated on first access.
    private String configHash;
    private String configString;
    private List<LineToSource> sourceRefs;

    /**
     * @param source the Ruby config source class that produced the parts
     * @param pipelineId the pipeline identifier symbol
     * @param uncastedConfigParts a single SourceWithMetadata or a RubyArray of them
     * @param logstashSettings the Ruby LogStash::Settings object for this pipeline
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public PipelineConfig(RubyClass source, RubySymbol pipelineId, RubyObject uncastedConfigParts, RubyObject logstashSettings) {
        // Accept either a Ruby Array of parts or a single part, mirroring the
        // original Ruby constructor's behavior.
        IRubyObject uncasted = uncastedConfigParts.checkArrayType();
        final RubyArray configParts = !uncasted.isNil() ?
                (RubyArray) uncasted :
                RubyArray.newArray(RUBY, uncastedConfigParts);

        this.source = source;
        this.pipelineId = pipelineId.toString();
        SourceWithMetadata[] castedConfigParts = (SourceWithMetadata[]) configParts.toJava(SourceWithMetadata[].class);
        List<SourceWithMetadata> sortedParts = Arrays.asList(castedConfigParts);
        // Deterministic order so configHash() is stable regardless of the
        // order in which the parts were discovered.
        sortedParts.sort(Comparator.comparing(SourceWithMetadata::getProtocol)
                .thenComparing(SourceWithMetadata::getId));
        this.confParts = sortedParts;
        this.settings = logstashSettings;
        this.readAt = LocalDateTime.now();
    }

    public RubyClass getSource() {
        return source;
    }

    public String getPipelineId() {
        return pipelineId;
    }

    public List<SourceWithMetadata> getConfigParts() {
        return confParts;
    }

    public LocalDateTime getReadAt() {
        return readAt;
    }

    public RubyObject getSettings() {
        return settings;
    }

    /** @return SHA1 hex digest of the merged config string (computed once). */
    public String configHash() {
        if (configHash == null) {
            configHash = DigestUtils.sha1Hex(configString());
        }
        return configHash;
    }

    /**
     * @return all config part texts joined by newlines. Computed once and
     *         cached; the previous implementation rebuilt the string on every
     *         call, defeating the cache field.
     */
    public String configString() {
        if (configString == null) {
            configString = confParts.stream().map(SourceWithMetadata::getText).collect(Collectors.joining("\n"));
        }
        return configString;
    }

    /** @return true when the "pipeline.system" setting is enabled. */
    public boolean isSystem() {
        return this.settings.callMethod(RUBY.getCurrentContext(), "get_value",
                RubyString.newString(RUBY, "pipeline.system"))
                .isTrue();
    }

    /**
     * Equality is based on content (configHash), pipelineId and settings,
     * matching the semantics of the Ruby implementation's {@code ==}.
     */
    @Override
    public boolean equals(Object other) {
        if (!(other instanceof PipelineConfig)) {
            return false;
        }
        PipelineConfig cother = (PipelineConfig) other;
        return configHash().equals(cother.configHash()) &&
                this.pipelineId.equals(cother.pipelineId) &&
                this.settings.eql(cother.settings);
    }

    @Override
    public int hashCode() {
        // Consistent with equals: equal objects share the same configHash.
        return this.configHash().hashCode();
    }

    /** Logs every config part and the merged config at debug level. */
    public void displayDebugInformation() {
        logger.debug("-------- Logstash Config ---------");
        logger.debug("Config from source, source: {}, pipeline_id:: {}", source, pipelineId);

        for (SourceWithMetadata configPart : this.confParts) {
            logger.debug("Config string, protocol: {}, id: {}", configPart.getProtocol(), configPart.getId());
            logger.debug("\n\n{}", configPart.getText());
        }
        logger.debug("Merged config");
        logger.debug("\n\n{}", this.configString());
    }

    /**
     * Translates a line number in the merged config back to the originating
     * config part, returning a SourceWithMetadata positioned at the local line.
     *
     * @param globalLineNumber 1-based line in the merged config
     * @param sourceColumn column to carry over into the result
     * @throws IllegalArgumentException when no config part covers the line
     */
    public SourceWithMetadata lookupSource(int globalLineNumber, int sourceColumn)
            throws IncompleteSourceWithMetadataException {
        LineToSource lts = this.sourceReferences().stream()
                .filter(lts1 -> lts1.includeLine(globalLineNumber))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("can't find the config segment related to line " + globalLineNumber));
        return new SourceWithMetadata(lts.source.getProtocol(), lts.source.getId(),
                globalLineNumber + 1 - lts.startLine, sourceColumn, lts.source.getText());
    }

    /**
     * Builds (once) the list of line-range to config-part mappings used by
     * {@link #lookupSource}, accumulating a running offset over the parts.
     */
    private List<LineToSource> sourceReferences() {
        if (this.sourceRefs == null) {
            int offset = 0;
            List<LineToSource> sourceRefs = new ArrayList<>();
            for (SourceWithMetadata configPart : confParts) {
                //line numbers starts from 1 in text files
                int startLine = configPart.getLine() + offset + 1;
                int endLine = configPart.getLinesCount() + offset;
                LineToSource sourceSegment = new LineToSource(startLine, endLine, configPart);
                sourceRefs.add(sourceSegment);
                offset += configPart.getLinesCount();
            }
            this.sourceRefs = Collections.unmodifiableList(sourceRefs);
        }
        return this.sourceRefs;
    }
}

View file

@ -50,10 +50,10 @@ import org.logstash.ackedqueue.QueueFactoryExt;
import org.logstash.ackedqueue.ext.JRubyAckedQueueExt;
import org.logstash.ackedqueue.ext.JRubyWrappedAckedQueueExt;
import org.logstash.common.DeadLetterQueueFactory;
import org.logstash.common.IncompleteSourceWithMetadataException;
import org.logstash.common.SourceWithMetadata;
import org.logstash.config.ir.ConfigCompiler;
import org.logstash.config.ir.InvalidIRException;
import org.logstash.config.ir.PipelineConfig;
import org.logstash.config.ir.PipelineIR;
import org.logstash.ext.JRubyAbstractQueueWriteClientExt;
import org.logstash.ext.JRubyWrappedWriteClientExt;
@ -115,7 +115,7 @@ public class AbstractPipelineExt extends RubyBasicObject {
private RubyString configString;
@SuppressWarnings("rawtypes")
private RubyArray configParts;
private List<SourceWithMetadata> configParts;
private RubyString configHash;
@ -151,7 +151,7 @@ public class AbstractPipelineExt extends RubyBasicObject {
);
pipelineSettings = pipelineConfig;
configString = (RubyString) pipelineSettings.callMethod(context, "config_string");
configParts = (RubyArray) pipelineSettings.callMethod(context, "config_parts");
configParts = pipelineSettings.toJava(PipelineConfig.class).getConfigParts();
configHash = context.runtime.newString(
Hex.encodeHexString(
MessageDigest.getInstance("SHA1").digest(configString.getBytes())
@ -397,10 +397,8 @@ public class AbstractPipelineExt extends RubyBasicObject {
@JRubyMethod(name = "pipeline_source_details", visibility = Visibility.PROTECTED)
@SuppressWarnings("rawtypes")
public RubyArray getPipelineSourceDetails(final ThreadContext context) {
RubyArray res = configParts;
List<RubyString> pipelineSources = new ArrayList<>(res.size());
for (IRubyObject part : res.toJavaArray()) {
SourceWithMetadata sourceWithMetadata = part.toJava(SourceWithMetadata.class);
List<RubyString> pipelineSources = new ArrayList<>(configParts.size());
for (SourceWithMetadata sourceWithMetadata : configParts) {
String protocol = sourceWithMetadata.getProtocol();
switch (protocol) {
case "string":

View file

@ -30,11 +30,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.jruby.javasupport.JavaUtil;
import org.jruby.runtime.builtin.IRubyObject;
import org.junit.Test;
import org.logstash.RubyUtil;
import org.logstash.common.IncompleteSourceWithMetadataException;
import org.logstash.common.SourceWithMetadata;
import org.logstash.config.ir.graph.Graph;
import org.logstash.config.ir.graph.PluginVertex;
@ -47,10 +43,9 @@ public class ConfigCompilerTest extends RubyEnvTestCase {
@Test
public void testConfigToPipelineIR() throws Exception {
IRubyObject swm = JavaUtil.convertJavaToRuby(
RubyUtil.RUBY, new SourceWithMetadata("proto", "path", 1, 1, "input {stdin{}} output{stdout{}}"));
SourceWithMetadata swm = new SourceWithMetadata("proto", "path", 1, 1, "input {stdin{}} output{stdout{}}");
final PipelineIR pipelineIR =
ConfigCompiler.configToPipelineIR(RubyUtil.RUBY.newArray(swm), false);
ConfigCompiler.configToPipelineIR(Collections.singletonList(swm), false);
assertThat(pipelineIR.getOutputPluginVertices().size(), is(1));
assertThat(pipelineIR.getFilterPluginVertices().size(), is(0));
}
@ -99,9 +94,8 @@ public class ConfigCompilerTest extends RubyEnvTestCase {
}
private static String graphHash(final String config) throws InvalidIRException {
IRubyObject swm = JavaUtil.convertJavaToRuby(
RubyUtil.RUBY, new SourceWithMetadata("proto", "path", 1, 1, config));
return ConfigCompiler.configToPipelineIR(RubyUtil.RUBY.newArray(swm), false).uniqueHash();
SourceWithMetadata swm = new SourceWithMetadata("proto", "path", 1, 1, config);
return ConfigCompiler.configToPipelineIR(Collections.singletonList(swm), false).uniqueHash();
}
@Test

View file

@ -0,0 +1,196 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.logstash.config.ir;
import org.jruby.*;
import org.jruby.runtime.builtin.IRubyObject;
import org.junit.Before;
import org.junit.Test;
import org.logstash.RubyUtil;
import org.logstash.common.IncompleteSourceWithMetadataException;
import org.logstash.common.SourceWithMetadata;
import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.Collectors;
import static org.junit.Assert.*;
public class PipelineConfigTest extends RubyEnvTestCase {
public static final String PIPELINE_ID = "main";
private RubyClass source;
private RubySymbol pipelineIdSym;
private String configMerged;
private SourceWithMetadata[] unorderedConfigParts;
private final static RubyObject SETTINGS = (RubyObject) RubyUtil.RUBY.evalScriptlet(
"require 'logstash/environment'\n" + // this is needed to register "pipeline.system" setting
"require 'logstash/settings'\n" +
"LogStash::SETTINGS");;
private PipelineConfig sut;
private SourceWithMetadata[] orderedConfigParts;
public static final String PIPELINE_CONFIG_PART_2 =
"output {\n" +
" stdout\n" +
"}";
public static final String PIPELINE_CONFIG_PART_1 =
"input {\n" +
" generator1\n" +
"}";
@Before
public void setUp() throws IncompleteSourceWithMetadataException {
source = RubyUtil.RUBY.getClass("LogStash::Config::Source::Local");
pipelineIdSym = RubySymbol.newSymbol(RubyUtil.RUBY, PIPELINE_ID);
orderedConfigParts = new SourceWithMetadata[]{
new SourceWithMetadata("file", "/tmp/1", 0, 0, "input { generator1 }"),
new SourceWithMetadata("file", "/tmp/2", 0, 0, "input { generator2 }"),
new SourceWithMetadata("file", "/tmp/3", 0, 0, "input { generator3 }"),
new SourceWithMetadata("file", "/tmp/4", 0, 0, "input { generator4 }"),
new SourceWithMetadata("file", "/tmp/5", 0, 0, "input { generator5 }"),
new SourceWithMetadata("file", "/tmp/6", 0, 0, "input { generator6 }"),
new SourceWithMetadata("string", "config_string", 0, 0, "input { generator1 }"),
};
configMerged = Arrays.stream(orderedConfigParts).map(SourceWithMetadata::getText).collect(Collectors.joining("\n"));
List<SourceWithMetadata> unorderedList = Arrays.asList(orderedConfigParts);
Collections.shuffle(unorderedList);
unorderedConfigParts = unorderedList.toArray(new SourceWithMetadata[0]);
sut = new PipelineConfig(source, pipelineIdSym, toRubyArray(unorderedConfigParts), SETTINGS);
}
@Test
public void testReturnsTheSource() {
assertEquals("returns the source", source, sut.getSource());
assertEquals("returns the pipeline id", PIPELINE_ID, sut.getPipelineId());
assertNotNull("returns the config_hash", sut.configHash());
assertEquals("returns the merged `ConfigPart#config_string`", configMerged, sut.configString());
assertTrue("records when the config was read", sut.getReadAt().isBefore(LocalDateTime.now()));
}
@SuppressWarnings("rawtypes")
private static RubyArray toRubyArray(SourceWithMetadata[] arr) {
List<IRubyObject> wrappedContent = Arrays.stream(arr).map(RubyUtil::toRubyObject).collect(Collectors.toList());
return RubyArray.newArray(RubyUtil.RUBY, wrappedContent);
}
@Test
public void testObjectEqualityOnConfigHashAndPipelineId() {
PipelineConfig anotherExactPipeline = new PipelineConfig(source, pipelineIdSym, toRubyArray(orderedConfigParts), SETTINGS);
assertEquals(anotherExactPipeline, sut);
PipelineConfig notMatchingPipeline = new PipelineConfig(source, pipelineIdSym, RubyArray.newEmptyArray(RubyUtil.RUBY), SETTINGS);
assertNotEquals(notMatchingPipeline, sut);
PipelineConfig notSamePipelineId = new PipelineConfig(source, RubySymbol.newSymbol(RubyUtil.RUBY, "another_pipeline"), toRubyArray(unorderedConfigParts), SETTINGS);
assertNotEquals(notSamePipelineId, sut);
}
@Test
public void testIsSystemWhenPipelineIsNotSystemPipeline() {
assertFalse("returns false if the pipeline is not a system pipeline", sut.isSystem());
}
@Test
public void testIsSystemWhenPipelineIsSystemPipeline() {
RubyObject mockedSettings = mockSettings(Collections.singletonMap("pipeline.system", true));
sut = new PipelineConfig(source, pipelineIdSym, toRubyArray(unorderedConfigParts), mockedSettings);
assertTrue("returns true if the pipeline is a system pipeline", sut.isSystem());
}
public RubyObject mockSettings(Map<String, Object> settingsValues) {
IRubyObject settings = SETTINGS.callMethod("clone");
settingsValues.forEach((k, v) -> {
RubyString rk = RubyString.newString(RubyUtil.RUBY, k);
IRubyObject rv = RubyUtil.toRubyObject(v);
settings.callMethod(RubyUtil.RUBY.getCurrentContext(), "set", new IRubyObject[]{rk, rv});
});
return (RubyObject) settings;
}
@Test
public void testSourceAndLineRemapping_pipelineDefinedInSingleFileOneLine() throws IncompleteSourceWithMetadataException {
String oneLinerPipeline = "input { generator1 }";
final SourceWithMetadata swm = new SourceWithMetadata("file", "/tmp/1", 0, 0, oneLinerPipeline);
sut = new PipelineConfig(source, pipelineIdSym, toRubyArray(new SourceWithMetadata[]{swm}), SETTINGS);
assertEquals("return the same line of the queried", 1, (int) sut.lookupSource(1, 0).getLine());
}
@Test
public void testSourceAndLineRemapping_pipelineDefinedInSingleFileMultiLine() throws IncompleteSourceWithMetadataException {
final SourceWithMetadata swm = new SourceWithMetadata("file", "/tmp/1", 0, 0, PIPELINE_CONFIG_PART_1);
sut = new PipelineConfig(source, pipelineIdSym, toRubyArray(new SourceWithMetadata[]{swm}), SETTINGS);
assertEquals("return the same line of the queried L1", 1, (int) sut.lookupSource(1, 0).getLine());
assertEquals("return the same line of the queried L2", 2, (int) sut.lookupSource(2, 0).getLine());
}
@Test(expected = IllegalArgumentException.class)
public void testSourceAndLineRemapping_pipelineDefinedInSingleFileMultiLine_dontmatch() throws IncompleteSourceWithMetadataException {
    final SourceWithMetadata multiLinePart =
            new SourceWithMetadata("file", "/tmp/1", 0, 0, PIPELINE_CONFIG_PART_1);
    sut = new PipelineConfig(source, pipelineIdSym, toRubyArray(new SourceWithMetadata[]{multiLinePart}), SETTINGS);
    // Line 100 is outside the configured part, so the lookup must throw.
    sut.lookupSource(100, -1);
}
@Test
public void testSourceAndLineRemapping_pipelineDefinedMInMultipleFiles() throws IncompleteSourceWithMetadataException {
    // Two config parts from two distinct files; the merged config concatenates
    // them, so lookups must be re-mapped back to the originating file.
    final SourceWithMetadata[] configFragments = {
            new SourceWithMetadata("file", "/tmp/input", 0, 0, PIPELINE_CONFIG_PART_1),
            new SourceWithMetadata("file", "/tmp/output", 0, 0, PIPELINE_CONFIG_PART_2)
    };
    sut = new PipelineConfig(source, pipelineIdSym, toRubyArray(configFragments), SETTINGS);
    // Merged line 2 still belongs to the first fragment.
    assertEquals("return the line of first segment", 2, (int) sut.lookupSource(2, 0).getLine());
    assertEquals("return the id of first segment", "/tmp/input", sut.lookupSource(2, 0).getId());
    // Merged line 4 falls into the second fragment, whose local line is 1.
    assertEquals("return the line of second segment", 1, (int) sut.lookupSource(4, 0).getLine());
    assertEquals("return the id of second segment", "/tmp/output", sut.lookupSource(4, 0).getId());
}
@Test(expected = IllegalArgumentException.class)
public void testSourceAndLineRemapping_pipelineDefinedMInMultipleFiles_dontmatch() throws IncompleteSourceWithMetadataException {
    final SourceWithMetadata[] configFragments = {
            new SourceWithMetadata("file", "/tmp/input", 0, 0, PIPELINE_CONFIG_PART_1),
            new SourceWithMetadata("file", "/tmp/output", 0, 0, PIPELINE_CONFIG_PART_2)
    };
    sut = new PipelineConfig(source, pipelineIdSym, toRubyArray(configFragments), SETTINGS);
    // Line 100 is beyond both fragments, so the lookup must throw.
    sut.lookupSource(100, 0);
}
@Test
public void testSourceAndLineRemapping_pipelineDefinedMInMultipleFiles_withEmptyLinesInTheMiddle() throws IncompleteSourceWithMetadataException {
    // The first fragment ends with a trailing newline; that blank line must not
    // shift the mapping of the fragment that follows it.
    final SourceWithMetadata[] configFragments = {
            new SourceWithMetadata("file", "/tmp/input", 0, 0, PIPELINE_CONFIG_PART_1 + "\n"),
            new SourceWithMetadata("file", "/tmp/output", 0, 0, PIPELINE_CONFIG_PART_2)
    };
    sut = new PipelineConfig(source, pipelineIdSym, toRubyArray(configFragments), SETTINGS);
    assertEquals("shouldn't slide the line mapping of subsequent", 1, (int) sut.lookupSource(4, 0).getLine());
    assertEquals("shouldn't slide the id mapping of subsequent", "/tmp/output", sut.lookupSource(4, 0).getId());
}
}

View file

@ -21,14 +21,11 @@
package org.logstash.plugins;
import co.elastic.logstash.api.*;
import org.jruby.RubyArray;
import org.jruby.RubyHash;
import org.jruby.RubyString;
import org.jruby.javasupport.JavaUtil;
import org.jruby.runtime.builtin.IRubyObject;
import org.junit.Test;
import org.logstash.RubyUtil;
import org.logstash.common.IncompleteSourceWithMetadataException;
import org.logstash.common.SourceWithMetadata;
import org.logstash.config.ir.ConfigCompiler;
import org.logstash.config.ir.InvalidIRException;
@ -109,12 +106,8 @@ public final class PluginFactoryExtTest extends RubyEnvTestCase {
assertEquals("Resolved config setting MUST be evaluated with substitution", envVars.get("CUSTOM"), id.toString());
}
@SuppressWarnings("rawtypes")
private static PipelineIR compilePipeline(SourceWithMetadata sourceWithMetadata) throws InvalidIRException {
RubyArray sourcesWithMetadata = RubyUtil.RUBY.newArray(JavaUtil.convertJavaToRuby(
RubyUtil.RUBY, sourceWithMetadata));
return ConfigCompiler.configToPipelineIR(sourcesWithMetadata, false);
return ConfigCompiler.configToPipelineIR(Collections.singletonList(sourceWithMetadata), false);
}
private static PluginFactoryExt.ExecutionContext createExecutionContextFactory() {

View file

@ -2,7 +2,6 @@
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.
require "logstash/config/pipeline_config"
require "logstash/config/source/base"
require "logstash/config/source_loader"
require "logstash/outputs/elasticsearch"
@ -112,7 +111,7 @@ module LogStash
end
end
LogStash::Config::PipelineConfig.new(self.class.name, pipeline_id.to_sym, config_part, settings)
Java::OrgLogstashConfigIr::PipelineConfig.new(self.class, pipeline_id.to_sym, [config_part], settings)
end
# This is a bit of a hack until we refactor the ElasticSearch plugins

View file

@ -4,7 +4,6 @@
require "logstash/agent"
require "monitoring/internal_pipeline_source"
require "logstash/config/pipeline_config"
require 'helpers/elasticsearch_options'
module LogStash
@ -181,7 +180,7 @@ module LogStash
logger.debug("compiled metrics pipeline config: ", :config => config)
config_part = org.logstash.common.SourceWithMetadata.new("x-pack-metrics", "internal_pipeline_source", config)
LogStash::Config::PipelineConfig.new(self, PIPELINE_ID.to_sym, config_part, settings)
Java::OrgLogstashConfigIr::PipelineConfig.new(self.class, PIPELINE_ID.to_sym, [config_part], settings)
end
def generate_pipeline_config(settings)

View file

@ -268,7 +268,7 @@ describe LogStash::ConfigManagement::ElasticsearchSource do
pipeline_config = subject.pipeline_configs
expect(pipeline_config.first.config_string).to match(config)
expect(pipeline_config.first.pipeline_id).to eq(pipeline_id.to_sym)
expect(pipeline_config.first.pipeline_id.to_sym).to eq(pipeline_id.to_sym)
end
it "ignores non-whitelisted and invalid settings" do
@ -295,7 +295,7 @@ describe LogStash::ConfigManagement::ElasticsearchSource do
pipeline_config = subject.pipeline_configs
expect(pipeline_config.first.config_string).to match(config)
expect(pipeline_config.first.pipeline_id).to eq(pipeline_id.to_sym)
expect(pipeline_config.first.pipeline_id.to_sym).to eq(pipeline_id.to_sym)
end
end
@ -397,7 +397,7 @@ describe LogStash::ConfigManagement::ElasticsearchSource do
pipeline_config = subject.pipeline_configs
expect(pipeline_config.first.config_string).to match(config)
expect(pipeline_config.first.pipeline_id).to eq(pipeline_id.to_sym)
expect(pipeline_config.first.pipeline_id.to_sym).to eq(pipeline_id.to_sym)
end
end
end
@ -433,7 +433,7 @@ describe LogStash::ConfigManagement::ElasticsearchSource do
pipeline_config = subject.pipeline_configs
expect(pipeline_config.collect(&:config_string)).to include(*pipelines.values)
expect(pipeline_config.collect(&:pipeline_id)).to include(*pipelines.keys.collect(&:to_sym))
expect(pipeline_config.map(&:pipeline_id).collect(&:to_sym)).to include(*pipelines.keys.collect(&:to_sym))
end
end
end

View file

@ -6,9 +6,11 @@ require 'x-pack/logstash_registry'
require 'logstash/devutils/rspec/spec_helper'
require 'logstash/json'
require 'filters/azure_event'
require 'logstash/config/pipeline_config'
describe LogStash::Filters::AzureEvent do
describe "Parses the admin activity log" do
let(:config) do
<<-CONFIG

View file

@ -14,7 +14,7 @@ describe LogStash::Inputs::Metrics::StateEventFactory do
let(:config) {
config_part = org.logstash.common.SourceWithMetadata.new("local", "...", 0, 0, "input { dummyblockinginput { } } output { null { } }")
LogStash::Config::PipelineConfig.new("DummySource", "fake_main", [config_part], LogStash::SETTINGS)
Java::OrgLogstashConfigIr::PipelineConfig.new("DummySource".class, "fake_main".to_sym, [config_part], LogStash::SETTINGS)
}
let(:pipeline_settings) { LogStash::Runner::SYSTEM_SETTINGS.clone.merge({

View file

@ -3,6 +3,7 @@
# you may not use this file except in compliance with the Elastic License.
require "monitoring/inputs/metrics/stats_event_factory"
require "logstash/config/pipeline_config"
require 'json'
describe LogStash::Inputs::Metrics::StatsEventFactory do

View file

@ -6,7 +6,6 @@ require "logstash-core"
require "logstash/agent"
require "logstash/runner"
require "monitoring/inputs/metrics"
require "logstash/config/pipeline_config"
require "logstash/config/source/local"
require 'license_checker/x_pack_info'
require "rspec/wait"
@ -53,7 +52,7 @@ describe LogStash::Monitoring::InternalPipelineSource do
let(:unordered_config_parts) { ordered_config_parts.shuffle }
let(:pipeline_config) { LogStash::Config::PipelineConfig.new(source, pipeline_id, unordered_config_parts, system_settings) }
let(:pipeline_config) { Java::OrgLogstashConfigIr::PipelineConfig.new(source, pipeline_id, unordered_config_parts, system_settings) }
let(:es_options) do
{