mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
Added origins of pipeline's configuration (es config string, the paths of config files used, module).
closes 9630 Fixes #11130
This commit is contained in:
parent
652e1b70ad
commit
1cbaeebb4d
6 changed files with 137 additions and 55 deletions
|
@ -109,7 +109,11 @@ module LogStash; class JavaPipeline < JavaBasePipeline
|
||||||
@finished_run.make_true
|
@finished_run.make_true
|
||||||
rescue => e
|
rescue => e
|
||||||
close
|
close
|
||||||
logger.error("Pipeline aborted due to error", default_logging_keys(:exception => e, :backtrace => e.backtrace))
|
pipeline_log_params = default_logging_keys(
|
||||||
|
:exception => e,
|
||||||
|
:backtrace => e.backtrace,
|
||||||
|
"pipeline.sources" => pipeline_source_details)
|
||||||
|
logger.error("Pipeline aborted due to error", pipeline_log_params)
|
||||||
ensure
|
ensure
|
||||||
@finished_execution.make_true
|
@finished_execution.make_true
|
||||||
end
|
end
|
||||||
|
@ -225,11 +229,14 @@ module LogStash; class JavaPipeline < JavaBasePipeline
|
||||||
config_metric.gauge(:graph, ::LogStash::Config::LIRSerializer.serialize(lir))
|
config_metric.gauge(:graph, ::LogStash::Config::LIRSerializer.serialize(lir))
|
||||||
config_metric.gauge(:cluster_uuids, resolve_cluster_uuids)
|
config_metric.gauge(:cluster_uuids, resolve_cluster_uuids)
|
||||||
|
|
||||||
@logger.info("Starting pipeline", default_logging_keys(
|
pipeline_log_params = default_logging_keys(
|
||||||
"pipeline.workers" => pipeline_workers,
|
"pipeline.workers" => pipeline_workers,
|
||||||
"pipeline.batch.size" => batch_size,
|
"pipeline.batch.size" => batch_size,
|
||||||
"pipeline.batch.delay" => batch_delay,
|
"pipeline.batch.delay" => batch_delay,
|
||||||
"pipeline.max_inflight" => max_inflight))
|
"pipeline.max_inflight" => max_inflight,
|
||||||
|
"pipeline.sources" => pipeline_source_details)
|
||||||
|
@logger.info("Starting pipeline", pipeline_log_params)
|
||||||
|
|
||||||
if max_inflight > MAX_INFLIGHT_WARN_THRESHOLD
|
if max_inflight > MAX_INFLIGHT_WARN_THRESHOLD
|
||||||
@logger.warn("CAUTION: Recommended inflight events max exceeded! Logstash will run with up to #{max_inflight} events in memory in your current configuration. If your message sizes are large this may cause instability with the default heap size. Please consider setting a non-standard heap size, changing the batch size (currently #{batch_size}), or changing the number of pipeline workers (currently #{pipeline_workers})", default_logging_keys)
|
@logger.warn("CAUTION: Recommended inflight events max exceeded! Logstash will run with up to #{max_inflight} events in memory in your current configuration. If your message sizes are large this may cause instability with the default heap size. Please consider setting a non-standard heap size, changing the batch size (currently #{batch_size}), or changing the number of pipeline workers (currently #{pipeline_workers})", default_logging_keys)
|
||||||
end
|
end
|
||||||
|
|
|
@ -164,10 +164,12 @@ module LogStash; class Pipeline < BasePipeline
|
||||||
collect_stats
|
collect_stats
|
||||||
collect_dlq_stats
|
collect_dlq_stats
|
||||||
|
|
||||||
@logger.info("Starting pipeline", default_logging_keys(
|
pipeline_log_params = default_logging_keys(
|
||||||
"pipeline.workers" => settings.get("pipeline.workers"),
|
"pipeline.workers" => settings.get("pipeline.workers"),
|
||||||
"pipeline.batch.size" => settings.get("pipeline.batch.size"),
|
"pipeline.batch.size" => settings.get("pipeline.batch.size"),
|
||||||
"pipeline.batch.delay" => settings.get("pipeline.batch.delay")))
|
"pipeline.batch.delay" => settings.get("pipeline.batch.delay"),
|
||||||
|
"pipeline.sources" => pipeline_source_details)
|
||||||
|
@logger.info("Starting pipeline", pipeline_log_params)
|
||||||
|
|
||||||
@finished_execution.make_false
|
@finished_execution.make_false
|
||||||
@finished_run.make_false
|
@finished_run.make_false
|
||||||
|
@ -180,7 +182,11 @@ module LogStash; class Pipeline < BasePipeline
|
||||||
@finished_run.make_true
|
@finished_run.make_true
|
||||||
rescue => e
|
rescue => e
|
||||||
close
|
close
|
||||||
@logger.error("Pipeline aborted due to error", default_logging_keys(:exception => e, :backtrace => e.backtrace))
|
pipeline_log_params = default_logging_keys(
|
||||||
|
:exception => e,
|
||||||
|
:backtrace => e.backtrace,
|
||||||
|
"pipeline.sources" => pipeline_source_details)
|
||||||
|
@logger.error("Pipeline aborted due to error", pipeline_log_params)
|
||||||
ensure
|
ensure
|
||||||
@finished_execution.make_true
|
@finished_execution.make_true
|
||||||
end
|
end
|
||||||
|
|
|
@ -6,7 +6,9 @@ import java.nio.file.Files;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
import java.security.MessageDigest;
|
import java.security.MessageDigest;
|
||||||
import java.security.NoSuchAlgorithmException;
|
import java.security.NoSuchAlgorithmException;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import org.apache.commons.codec.binary.Hex;
|
import org.apache.commons.codec.binary.Hex;
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
|
@ -29,6 +31,7 @@ import org.logstash.ackedqueue.ext.JRubyAckedQueueExt;
|
||||||
import org.logstash.ackedqueue.ext.JRubyWrappedAckedQueueExt;
|
import org.logstash.ackedqueue.ext.JRubyWrappedAckedQueueExt;
|
||||||
import org.logstash.common.DeadLetterQueueFactory;
|
import org.logstash.common.DeadLetterQueueFactory;
|
||||||
import org.logstash.common.IncompleteSourceWithMetadataException;
|
import org.logstash.common.IncompleteSourceWithMetadataException;
|
||||||
|
import org.logstash.common.SourceWithMetadata;
|
||||||
import org.logstash.config.ir.ConfigCompiler;
|
import org.logstash.config.ir.ConfigCompiler;
|
||||||
import org.logstash.config.ir.PipelineIR;
|
import org.logstash.config.ir.PipelineIR;
|
||||||
import org.logstash.ext.JRubyAbstractQueueWriteClientExt;
|
import org.logstash.ext.JRubyAbstractQueueWriteClientExt;
|
||||||
|
@ -364,6 +367,35 @@ public class AbstractPipelineExt extends RubyBasicObject {
|
||||||
.initialize(inputQueueClient, pipelineId.asJavaString(), metric, pluginId);
|
.initialize(inputQueueClient, pipelineId.asJavaString(), metric, pluginId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@JRubyMethod(name = "pipeline_source_details", visibility = Visibility.PROTECTED)
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
public RubyArray getPipelineSourceDetails(final ThreadContext context) {
|
||||||
|
RubyArray res = (RubyArray) pipelineSettings.callMethod(context, "config_parts");
|
||||||
|
List<RubyString> pipelineSources = new ArrayList<>(res.size());
|
||||||
|
for (IRubyObject part : res.toJavaArray()) {
|
||||||
|
SourceWithMetadata sourceWithMetadata = part.toJava(SourceWithMetadata.class);
|
||||||
|
String protocol = sourceWithMetadata.getProtocol();
|
||||||
|
switch (protocol) {
|
||||||
|
case "string":
|
||||||
|
pipelineSources.add(RubyString.newString(context.runtime, "config string"));
|
||||||
|
break;
|
||||||
|
case "file":
|
||||||
|
pipelineSources.add(RubyString.newString(context.runtime, sourceWithMetadata.getId()));
|
||||||
|
break;
|
||||||
|
case "x-pack-metrics":
|
||||||
|
pipelineSources.add(RubyString.newString(context.runtime, "monitoring pipeline"));
|
||||||
|
break;
|
||||||
|
case "x-pack-config-management":
|
||||||
|
pipelineSources.add(RubyString.newString(context.runtime, "central pipeline management"));
|
||||||
|
break;
|
||||||
|
case "module":
|
||||||
|
pipelineSources.add(RubyString.newString(context.runtime, "module"));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return RubyArray.newArray(context.runtime, pipelineSources);
|
||||||
|
}
|
||||||
|
|
||||||
protected final IRubyObject getSetting(final ThreadContext context, final String name) {
|
protected final IRubyObject getSetting(final ThreadContext context, final String name) {
|
||||||
return settings.callMethod(context, "get_value", context.runtime.newString(name));
|
return settings.callMethod(context, "get_value", context.runtime.newString(name));
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,47 +0,0 @@
|
||||||
require_relative '../framework/fixture'
|
|
||||||
require_relative '../framework/settings'
|
|
||||||
require_relative '../services/logstash_service'
|
|
||||||
require_relative '../framework/helpers'
|
|
||||||
require "logstash/devutils/rspec/spec_helper"
|
|
||||||
require "yaml"
|
|
||||||
|
|
||||||
describe "Test Logstash Pipeline id" do
|
|
||||||
before(:all) {
|
|
||||||
@fixture = Fixture.new(__FILE__)
|
|
||||||
# used in multiple LS tests
|
|
||||||
@ls = @fixture.get_service("logstash")
|
|
||||||
}
|
|
||||||
|
|
||||||
after(:all) {
|
|
||||||
@fixture.teardown
|
|
||||||
}
|
|
||||||
|
|
||||||
before(:each) {
|
|
||||||
# backup the application settings file -- logstash.yml
|
|
||||||
FileUtils.cp(@ls.application_settings_file, "#{@ls.application_settings_file}.original")
|
|
||||||
}
|
|
||||||
|
|
||||||
after(:each) {
|
|
||||||
@ls.teardown
|
|
||||||
# restore the application settings file -- logstash.yml
|
|
||||||
FileUtils.mv("#{@ls.application_settings_file}.original", @ls.application_settings_file)
|
|
||||||
}
|
|
||||||
|
|
||||||
let(:temp_dir) { Stud::Temporary.directory("logstash-pipelinelog-test") }
|
|
||||||
let(:config) { @fixture.config("root") }
|
|
||||||
|
|
||||||
it "should write logs with pipeline.id" do
|
|
||||||
pipeline_name = "custom_pipeline"
|
|
||||||
settings = {
|
|
||||||
"path.logs" => temp_dir,
|
|
||||||
"pipeline.id" => pipeline_name
|
|
||||||
}
|
|
||||||
IO.write(@ls.application_settings_file, settings.to_yaml)
|
|
||||||
@ls.spawn_logstash("-w", "1" , "-e", config)
|
|
||||||
@ls.wait_for_logstash
|
|
||||||
sleep 2 until @ls.exited?
|
|
||||||
plainlog_file = "#{temp_dir}/logstash-plain.log"
|
|
||||||
expect(File.exists?(plainlog_file)).to be true
|
|
||||||
expect(IO.read(plainlog_file) =~ /\[logstash.javapipeline\s*\]\[#{pipeline_name}\]/).to be > 0
|
|
||||||
end
|
|
||||||
end
|
|
84
qa/integration/specs/pipeline_log_spec.rb
Normal file
84
qa/integration/specs/pipeline_log_spec.rb
Normal file
|
@ -0,0 +1,84 @@
|
||||||
|
require_relative '../framework/fixture'
|
||||||
|
require_relative '../framework/settings'
|
||||||
|
require_relative '../services/logstash_service'
|
||||||
|
require_relative '../framework/helpers'
|
||||||
|
require "logstash/devutils/rspec/spec_helper"
|
||||||
|
require "yaml"
|
||||||
|
|
||||||
|
describe "Test Logstash Pipeline id" do
|
||||||
|
before(:all) {
|
||||||
|
@fixture = Fixture.new(__FILE__)
|
||||||
|
# used in multiple LS tests
|
||||||
|
@ls = @fixture.get_service("logstash")
|
||||||
|
}
|
||||||
|
|
||||||
|
after(:all) {
|
||||||
|
@fixture.teardown
|
||||||
|
}
|
||||||
|
|
||||||
|
before(:each) {
|
||||||
|
# backup the application settings file -- logstash.yml
|
||||||
|
FileUtils.cp(@ls.application_settings_file, "#{@ls.application_settings_file}.original")
|
||||||
|
}
|
||||||
|
|
||||||
|
after(:each) {
|
||||||
|
@ls.teardown
|
||||||
|
# restore the application settings file -- logstash.yml
|
||||||
|
FileUtils.mv("#{@ls.application_settings_file}.original", @ls.application_settings_file)
|
||||||
|
}
|
||||||
|
|
||||||
|
let(:temp_dir) { Stud::Temporary.directory("logstash-pipelinelog-test") }
|
||||||
|
let(:config) { @fixture.config("root") }
|
||||||
|
let(:initial_config_file) { config_to_temp_file(@fixture.config("root")) }
|
||||||
|
|
||||||
|
it "should write logs with pipeline.id" do
|
||||||
|
pipeline_name = "custom_pipeline"
|
||||||
|
settings = {
|
||||||
|
"path.logs" => temp_dir,
|
||||||
|
"pipeline.id" => pipeline_name
|
||||||
|
}
|
||||||
|
IO.write(@ls.application_settings_file, settings.to_yaml)
|
||||||
|
@ls.spawn_logstash("-w", "1" , "-e", config)
|
||||||
|
wait_logstash_process_terminate()
|
||||||
|
plainlog_file = "#{temp_dir}/logstash-plain.log"
|
||||||
|
expect(File.exists?(plainlog_file)).to be true
|
||||||
|
expect(IO.read(plainlog_file) =~ /\[logstash.javapipeline\s*\]\[#{pipeline_name}\]/).to be > 0
|
||||||
|
end
|
||||||
|
|
||||||
|
it "write pipeline config in logs - source:config string" do
|
||||||
|
pipeline_name = "custom_pipeline"
|
||||||
|
settings = {
|
||||||
|
"path.logs" => temp_dir,
|
||||||
|
"pipeline.id" => pipeline_name
|
||||||
|
}
|
||||||
|
IO.write(@ls.application_settings_file, settings.to_yaml)
|
||||||
|
@ls.spawn_logstash("-w", "1" , "-e", config)
|
||||||
|
wait_logstash_process_terminate()
|
||||||
|
plainlog_file = "#{temp_dir}/logstash-plain.log"
|
||||||
|
expect(File.exists?(plainlog_file)).to be true
|
||||||
|
expect(IO.read(plainlog_file) =~ /Starting pipeline.*"pipeline.sources"=>\["config string"\]/).to be > 0
|
||||||
|
end
|
||||||
|
|
||||||
|
it "write pipeline config in logs - source:config file" do
|
||||||
|
pipeline_name = "custom_pipeline"
|
||||||
|
settings = {
|
||||||
|
"path.logs" => temp_dir,
|
||||||
|
"pipeline.id" => pipeline_name
|
||||||
|
}
|
||||||
|
IO.write(@ls.application_settings_file, settings.to_yaml)
|
||||||
|
@ls.spawn_logstash("-w", "1", "-f", "#{initial_config_file}")
|
||||||
|
wait_logstash_process_terminate()
|
||||||
|
plainlog_file = "#{temp_dir}/logstash-plain.log"
|
||||||
|
expect(File.exists?(plainlog_file)).to be true
|
||||||
|
expect(IO.read(plainlog_file) =~ /Starting pipeline.*"pipeline.sources"=>\["#{initial_config_file}"\]/).to be > 0
|
||||||
|
end
|
||||||
|
|
||||||
|
@private
|
||||||
|
def wait_logstash_process_terminate
|
||||||
|
num_retries = 100
|
||||||
|
try(num_retries) do
|
||||||
|
expect(@ls.exited?).to be(true)
|
||||||
|
end
|
||||||
|
expect(@ls.exit_code).to be(0)
|
||||||
|
end
|
||||||
|
end
|
Loading…
Add table
Add a link
Reference in a new issue