diff --git a/logstash-core/src/main/java/org/logstash/Logstash.java b/logstash-core/src/main/java/org/logstash/Logstash.java index 4acf602db..7d387bfd5 100644 --- a/logstash-core/src/main/java/org/logstash/Logstash.java +++ b/logstash-core/src/main/java/org/logstash/Logstash.java @@ -20,17 +20,21 @@ package org.logstash; +import java.io.IOError; import java.io.IOException; import java.io.InputStream; import java.io.PrintStream; import java.nio.file.Path; import java.nio.file.Paths; +import java.security.AccessController; +import java.security.PrivilegedAction; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jruby.Ruby; import org.jruby.RubyException; import org.jruby.RubyInstanceConfig; -import org.jruby.RubyNumeric; +import org.jruby.RubySystemExit; import org.jruby.exceptions.RaiseException; import org.jruby.runtime.builtin.IRubyObject; @@ -59,6 +63,7 @@ public final class Logstash implements Runnable, AutoCloseable { ); } configureNashornDeprecationSwitchForJavaAbove11(); + installGlobalUncaughtExceptionHandler(); final Path home = Paths.get(lsHome).toAbsolutePath(); try ( @@ -66,20 +71,25 @@ public final class Logstash implements Runnable, AutoCloseable { ) { logstash.run(); } catch (final IllegalStateException e) { - String errorMessage[] = null; - if (e.getMessage().contains("Could not load FFI Provider")) { - errorMessage = new String[]{ - "\nError accessing temp directory: " + System.getProperty("java.io.tmpdir"), - "This often occurs because the temp directory has been mounted with NOEXEC or", - "the Logstash user has insufficient permissions on the directory. Possible", - "workarounds include setting the -Djava.io.tmpdir property in the jvm.options", - "file to an alternate directory or correcting the Logstash user's permissions." - }; + Throwable t = e; + String message = e.getMessage(); + if (message != null) { + if (message.startsWith(UNCLEAN_SHUTDOWN_PREFIX)) { + t = e.getCause(); // be less verbose with uncleanShutdown's wrapping exception + } else if (message.contains("Could not load FFI Provider")) { + message = + "Error accessing temp directory: " + System.getProperty("java.io.tmpdir") + + " this often occurs because the temp directory has been mounted with NOEXEC or" + + " the Logstash user has insufficient permissions on the directory. \n" + + "Possible workarounds include setting the -Djava.io.tmpdir property in the jvm.options" + + "file to an alternate directory or correcting the Logstash user's permissions."; + } } - handleCriticalError(e, errorMessage); + handleFatalError(message, t); } catch (final Throwable t) { - handleCriticalError(t, null); + handleFatalError("", t); } + System.exit(0); } @@ -92,16 +102,59 @@ public final class Logstash implements Runnable, AutoCloseable { } } - private static void handleCriticalError(Throwable t, String[] errorMessage) { - LOGGER.error(t); - if (errorMessage != null) { - for (String err : errorMessage) { - System.err.println(err); + private static void installGlobalUncaughtExceptionHandler() { + Thread.setDefaultUncaughtExceptionHandler((thread, e) -> { + if (e instanceof Error) { + handleFatalError("uncaught error (in thread " + thread.getName() + ")", e); + } else { + LOGGER.error("uncaught exception (in thread " + thread.getName() + ")", e); } + }); + } + + private static void handleFatalError(String message, Throwable t) { + LOGGER.fatal(message, t); + + if (t instanceof InternalError) { + halt(128); + } else if (t instanceof OutOfMemoryError) { + halt(127); + } else if (t instanceof StackOverflowError) { + halt(126); + } else if (t instanceof UnknownError) { + halt(125); + } else if (t instanceof IOError) { + halt(124); + } else if (t instanceof LinkageError) { + halt(123); + } else if (t instanceof Error) { + halt(120); } + System.exit(1); } + private static void halt(final int status) { + AccessController.doPrivileged(new PrivilegedHaltAction(status)); + } + + private static class PrivilegedHaltAction implements PrivilegedAction { + + private final int status; + + private PrivilegedHaltAction(final int status) { + this.status = status; + } + + @Override + public Void run() { + // we halt to prevent shutdown hooks from running + Runtime.getRuntime().halt(status); + return null; + } + + } + /** * Ctor. * @param home Logstash Root Directory @@ -132,11 +185,10 @@ public final class Logstash implements Runnable, AutoCloseable { Thread.currentThread().setContextClassLoader(ruby.getJRubyClassLoader()); ruby.runFromMain(script, config.displayedFileName()); } catch (final RaiseException ex) { - final RubyException rexep = ex.getException(); - if (ruby.getSystemExit().isInstance(rexep)) { - final IRubyObject status = - rexep.callMethod(ruby.getCurrentContext(), "status"); - if (status != null && !status.isNil() && RubyNumeric.fix2int(status) != 0) { + final RubyException re = ex.getException(); + if (re instanceof RubySystemExit) { + IRubyObject success = ((RubySystemExit) re).success_p(); + if (!success.isTrue()) { uncleanShutdown(ex); } } else { @@ -190,7 +242,10 @@ public final class Logstash implements Runnable, AutoCloseable { return resolved.toString(); } + private static final String UNCLEAN_SHUTDOWN_PREFIX = "Logstash stopped processing because of an error: "; + private static void uncleanShutdown(final Exception ex) { - throw new IllegalStateException("Logstash stopped processing because of an error: " + ex.getMessage(), ex); + throw new IllegalStateException(UNCLEAN_SHUTDOWN_PREFIX + ex.getMessage(), ex); } + } diff --git a/qa/integration/fixtures/fatal_error_spec.yml b/qa/integration/fixtures/fatal_error_spec.yml new file mode 100644 index 000000000..cbfc784af --- /dev/null +++ b/qa/integration/fixtures/fatal_error_spec.yml @@ -0,0 +1,3 @@ +--- +services: + - logstash diff --git a/qa/integration/specs/fatal_error_spec.rb b/qa/integration/specs/fatal_error_spec.rb new file mode 100644 index 000000000..910f921a4 --- /dev/null +++ b/qa/integration/specs/fatal_error_spec.rb @@ -0,0 +1,74 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +require_relative '../framework/fixture' +require_relative '../framework/helpers' +require_relative '../framework/settings' +require_relative '../services/logstash_service' +require "logstash/devutils/rspec/spec_helper" + +describe "uncaught exception" do + + before(:all) do + @fixture = Fixture.new(__FILE__) + @logstash = @fixture.get_service("logstash") + end + + after(:each) { @logstash.teardown } + + let(:timeout) { 90 } # seconds + let(:temp_dir) { Stud::Temporary.directory("logstash-error-test") } + + it "halts LS on fatal error" do + config = "input { generator { count => 1 message => 'a fatal error' } } " + # inline Ruby filter seems to catch everything (including java.lang.Error) so we exercise a thread throwing + config += "filter { ruby { code => 'Thread.start { raise java.lang.AssertionError.new event.get(\"message\") }' } }" + + spawn_logstash_and_wait_for_exit! config, timeout + + expect(@logstash.exit_code).to be 120 + + log_file = "#{temp_dir}/logstash-plain.log" + expect( File.exists?(log_file) ).to be true + expect( File.read(log_file) ).to match /\[FATAL\]\[org.logstash.Logstash.*?java.lang.AssertionError: a fatal error/m + end + + it "logs unexpected exception (from Java thread)" do + config = "input { generator { count => 1 message => 'unexpected' } } " + config += "filter { ruby { code => 'java.lang.Thread.new { raise java.io.EOFException.new event.get(\"message\") }.start; sleep(1.5)' } }" + + spawn_logstash_and_wait_for_exit! config, timeout + + expect(@logstash.exit_code).to be 0 # normal exit + + log_file = "#{temp_dir}/logstash-plain.log" + expect( File.exists?(log_file) ).to be true + expect( File.read(log_file) ).to match /\[ERROR\]\[org.logstash.Logstash.*?uncaught exception \(in thread .*?java.io.EOFException: unexpected/m + end + + def spawn_logstash_and_wait_for_exit!(config, timeout) + @logstash.spawn_logstash('-w', '1', '--path.logs', temp_dir, '-e', config) + + time = Time.now + while (Time.now - time) < timeout + sleep(0.1) + break if @logstash.exited? + end + raise 'LS process did not exit!' unless @logstash.exited? + end + +end