diff --git a/config/jvm.options b/config/jvm.options index ac27467d0..2d743c8b3 100644 --- a/config/jvm.options +++ b/config/jvm.options @@ -76,3 +76,6 @@ # Entropy source for randomness -Djava.security.egd=file:/dev/urandom + +# Copy the logging context from parent threads to children +-Dlog4j2.isThreadContextMapInheritable=true diff --git a/config/log4j2.properties b/config/log4j2.properties index a9eed0a3e..9a6af06e0 100644 --- a/config/log4j2.properties +++ b/config/log4j2.properties @@ -4,7 +4,7 @@ name = LogstashPropertiesConfig appender.console.type = Console appender.console.name = plain_console appender.console.layout.type = PatternLayout -appender.console.layout.pattern = [%d{ISO8601}][%-5p][%-25c] %m%n +appender.console.layout.pattern = [%d{ISO8601}][%-5p][%-25c]%notEmpty{[%X{pipeline.id}]} %m%n appender.json_console.type = Console appender.json_console.name = json_console @@ -21,7 +21,7 @@ appender.rolling.policies.time.type = TimeBasedTriggeringPolicy appender.rolling.policies.time.interval = 1 appender.rolling.policies.time.modulate = true appender.rolling.layout.type = PatternLayout -appender.rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c] %-.10000m%n +appender.rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c]%notEmpty{[%X{pipeline.id}]} %-.10000m%n appender.rolling.policies.size.type = SizeBasedTriggeringPolicy appender.rolling.policies.size.size = 100MB appender.rolling.strategy.type = DefaultRolloverStrategy diff --git a/logstash-core/lib/logstash/agent.rb b/logstash-core/lib/logstash/agent.rb index 94f3cabfb..098fc417d 100644 --- a/logstash-core/lib/logstash/agent.rb +++ b/logstash-core/lib/logstash/agent.rb @@ -89,6 +89,7 @@ class LogStash::Agent def execute @thread = Thread.current # this var is implicitly used by Stud.stop? + LogStash::Util.set_thread_name("Agent thread") logger.debug("Starting agent") transition_to_running @@ -307,7 +308,7 @@ class LogStash::Agent pipeline_actions.map do |action| Thread.new(action, converge_result) do |action, converge_result| - java.lang.Thread.currentThread().setName("Converge #{action}"); + LogStash::Util.set_thread_name("Converge #{action}") # We execute every task we need to converge the current state of pipelines # for every task we will record the action result, that will help us # the results of all the task will determine if the converge was successful or not diff --git a/logstash-core/lib/logstash/java_pipeline.rb b/logstash-core/lib/logstash/java_pipeline.rb index 1360dd5e0..4616ee792 100644 --- a/logstash-core/lib/logstash/java_pipeline.rb +++ b/logstash-core/lib/logstash/java_pipeline.rb @@ -8,6 +8,8 @@ require "logstash/instrument/collector" require "logstash/compiler" require "logstash/config/lir_serializer" +java_import org.apache.logging.log4j.ThreadContext + module LogStash; class JavaPipeline < JavaBasePipeline include LogStash::Util::Loggable attr_reader \ @@ -102,6 +104,7 @@ module LogStash; class JavaPipeline < JavaBasePipeline @thread = Thread.new do begin LogStash::Util.set_thread_name("pipeline.#{pipeline_id}") + ThreadContext.put("pipeline.id", pipeline_id) run @finished_run.make_true rescue => e @@ -236,6 +239,7 @@ module LogStash; class JavaPipeline < JavaBasePipeline pipeline_workers.times do |t| thread = Thread.new do Util.set_thread_name("[#{pipeline_id}]>worker#{t}") + ThreadContext.put("pipeline.id", pipeline_id) org.logstash.execution.WorkerLoop.new( lir_execution, filter_queue_client, @events_filtered, @events_consumed, @flushRequested, @flushing, @shutdownRequested, @drain_queue).run @@ -305,6 +309,7 @@ module LogStash; class JavaPipeline < JavaBasePipeline def inputworker(plugin) Util::set_thread_name("[#{pipeline_id}]<#{plugin.class.config_name}") + ThreadContext.put("pipeline.id", pipeline_id) begin plugin.run(wrapped_write_client(plugin.id.to_sym)) rescue => e diff --git a/logstash-core/lib/logstash/pipeline.rb b/logstash-core/lib/logstash/pipeline.rb index 82add7120..e9eb3d889 100644 --- a/logstash-core/lib/logstash/pipeline.rb +++ b/logstash-core/lib/logstash/pipeline.rb @@ -12,6 +12,8 @@ require "logstash/instrument/collector" require "logstash/filter_delegator" require "logstash/compiler" +java_import org.apache.logging.log4j.ThreadContext + module LogStash; class BasePipeline < AbstractPipeline include LogStash::Util::Loggable @@ -172,7 +174,8 @@ module LogStash; class Pipeline < BasePipeline @thread = Thread.new do begin - LogStash::Util.set_thread_name("pipeline.#{pipeline_id}") + LogStash::Util.set_thread_name("[#{pipeline_id}]-manager") + ThreadContext.put("pipeline.id", pipeline_id) run @finished_run.make_true rescue => e @@ -300,7 +303,8 @@ module LogStash; class Pipeline < BasePipeline pipeline_workers.times do |t| thread = Thread.new(batch_size, batch_delay, self) do |_b_size, _b_delay, _pipeline| - Util.set_thread_name("[#{pipeline_id}]>worker#{t}") + LogStash::Util::set_thread_name("[#{pipeline_id}]>worker#{t}") + ThreadContext.put("pipeline.id", pipeline_id) _pipeline.worker_loop(_b_size, _b_delay) end @worker_threads << thread @@ -430,6 +434,7 @@ module LogStash; class Pipeline < BasePipeline def inputworker(plugin) Util::set_thread_name("[#{pipeline_id}]<#{plugin.class.config_name}") + ThreadContext.put("pipeline.id", pipeline_id) begin plugin.run(wrapped_write_client(plugin.id.to_sym)) rescue => e @@ -535,6 +540,8 @@ module LogStash; class Pipeline < BasePipeline raise "Attempted to start flusher on a stopped pipeline!" if stopped? @flusher_thread = Thread.new do + LogStash::Util.set_thread_name("[#{pipeline_id}]-flusher-thread") + ThreadContext.put("pipeline.id", pipeline_id) while Stud.stoppable_sleep(5, 0.1) { stopped? } flush break if stopped? diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaInputDelegatorExt.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaInputDelegatorExt.java index 4ceb3f564..e91367f8c 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaInputDelegatorExt.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaInputDelegatorExt.java @@ -60,7 +60,10 @@ public class JavaInputDelegatorExt extends RubyObject { } else { queueWriter = qw; } - Thread t = new Thread(() -> input.start(queueWriter::push)); + Thread t = new Thread(() -> { + org.apache.logging.log4j.ThreadContext.put("pipeline.id", pipeline.pipelineId().toString()); + input.start(queueWriter::push); + }); t.setName(pipeline.pipelineId().asJavaString() + "_" + input.getName() + "_" + input.getId()); t.start(); return JavaObject.wrap(context.getRuntime(), t); diff --git a/qa/integration/fixtures/pipeline_id_log_spec.yml b/qa/integration/fixtures/pipeline_id_log_spec.yml new file mode 100644 index 000000000..661138bdf --- /dev/null +++ b/qa/integration/fixtures/pipeline_id_log_spec.yml @@ -0,0 +1,12 @@ +--- +services: + - logstash +config: |- + input { + generator { + count => 4 + } + } + output { + null {} + } diff --git a/qa/integration/specs/pipeline_id_log_spec.rb b/qa/integration/specs/pipeline_id_log_spec.rb new file mode 100644 index 000000000..b9da009e0 --- /dev/null +++ b/qa/integration/specs/pipeline_id_log_spec.rb @@ -0,0 +1,47 @@ +require_relative '../framework/fixture' +require_relative '../framework/settings' +require_relative '../services/logstash_service' +require_relative '../framework/helpers' +require "logstash/devutils/rspec/spec_helper" +require "yaml" + +describe "Test Logstash Pipeline id" do + before(:all) { + @fixture = Fixture.new(__FILE__) + # used in multiple LS tests + @ls = @fixture.get_service("logstash") + } + + after(:all) { + @fixture.teardown + } + + before(:each) { + # backup the application settings file -- logstash.yml + FileUtils.cp(@ls.application_settings_file, "#{@ls.application_settings_file}.original") + } + + after(:each) { + @ls.teardown + # restore the application settings file -- logstash.yml + FileUtils.mv("#{@ls.application_settings_file}.original", @ls.application_settings_file) + } + + let(:temp_dir) { Stud::Temporary.directory("logstash-pipelinelog-test") } + let(:config) { @fixture.config("root") } + + it "should write logs with pipeline.id" do + pipeline_name = "custom_pipeline" + settings = { + "path.logs" => temp_dir, + "pipeline.id" => pipeline_name + } + IO.write(@ls.application_settings_file, settings.to_yaml) + @ls.spawn_logstash("-w", "1" , "-e", config) + @ls.wait_for_logstash + sleep 2 until @ls.exited? + plainlog_file = "#{temp_dir}/logstash-plain.log" + expect(File.exists?(plainlog_file)).to be true + expect(IO.read(plainlog_file) =~ /\[logstash.javapipeline\s*\]\[#{pipeline_name}\]/).to be > 0 + end +end