Fix: make sure LS exits when running into fatal errors (#12559)

Currently, LS does not respect fatal errors such as java.lang.OutOfMemoryError and continues executing.

This is dangerous since JVM errors are a legitimate reason to halt the process and not continue processing.

Additionally:

-   make sure we log the full stack-trace on fatal errors
-   halt the JVM wout executing finalizers/hook (scissors on how ES handles uncaught exceptions)
-   also, we should now be aware of a potentially unexpectedly dying thread

Back-port of #12470
This commit is contained in:
Karol Bucek 2021-01-13 06:45:16 +01:00 committed by GitHub
parent 98d3ac9d00
commit 53e73e9ccc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 155 additions and 23 deletions

View file

@ -20,17 +20,21 @@
package org.logstash;
import java.io.IOError;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.AccessController;
import java.security.PrivilegedAction;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jruby.Ruby;
import org.jruby.RubyException;
import org.jruby.RubyInstanceConfig;
import org.jruby.RubyNumeric;
import org.jruby.RubySystemExit;
import org.jruby.exceptions.RaiseException;
import org.jruby.runtime.builtin.IRubyObject;
@ -59,6 +63,7 @@ public final class Logstash implements Runnable, AutoCloseable {
);
}
configureNashornDeprecationSwitchForJavaAbove11();
installGlobalUncaughtExceptionHandler();
final Path home = Paths.get(lsHome).toAbsolutePath();
try (
@ -66,20 +71,25 @@ public final class Logstash implements Runnable, AutoCloseable {
) {
logstash.run();
} catch (final IllegalStateException e) {
String errorMessage[] = null;
if (e.getMessage().contains("Could not load FFI Provider")) {
errorMessage = new String[]{
"\nError accessing temp directory: " + System.getProperty("java.io.tmpdir"),
"This often occurs because the temp directory has been mounted with NOEXEC or",
"the Logstash user has insufficient permissions on the directory. Possible",
"workarounds include setting the -Djava.io.tmpdir property in the jvm.options",
"file to an alternate directory or correcting the Logstash user's permissions."
};
Throwable t = e;
String message = e.getMessage();
if (message != null) {
if (message.startsWith(UNCLEAN_SHUTDOWN_PREFIX)) {
t = e.getCause(); // be less verbose with uncleanShutdown's wrapping exception
} else if (message.contains("Could not load FFI Provider")) {
message =
"Error accessing temp directory: " + System.getProperty("java.io.tmpdir") +
" this often occurs because the temp directory has been mounted with NOEXEC or" +
" the Logstash user has insufficient permissions on the directory. \n" +
"Possible workarounds include setting the -Djava.io.tmpdir property in the jvm.options" +
"file to an alternate directory or correcting the Logstash user's permissions.";
}
}
handleCriticalError(e, errorMessage);
handleFatalError(message, t);
} catch (final Throwable t) {
handleCriticalError(t, null);
handleFatalError("", t);
}
System.exit(0);
}
@ -92,16 +102,59 @@ public final class Logstash implements Runnable, AutoCloseable {
}
}
private static void handleCriticalError(Throwable t, String[] errorMessage) {
LOGGER.error(t);
if (errorMessage != null) {
for (String err : errorMessage) {
System.err.println(err);
private static void installGlobalUncaughtExceptionHandler() {
Thread.setDefaultUncaughtExceptionHandler((thread, e) -> {
if (e instanceof Error) {
handleFatalError("uncaught error (in thread " + thread.getName() + ")", e);
} else {
LOGGER.error("uncaught exception (in thread " + thread.getName() + ")", e);
}
});
}
private static void handleFatalError(String message, Throwable t) {
LOGGER.fatal(message, t);
if (t instanceof InternalError) {
halt(128);
} else if (t instanceof OutOfMemoryError) {
halt(127);
} else if (t instanceof StackOverflowError) {
halt(126);
} else if (t instanceof UnknownError) {
halt(125);
} else if (t instanceof IOError) {
halt(124);
} else if (t instanceof LinkageError) {
halt(123);
} else if (t instanceof Error) {
halt(120);
}
System.exit(1);
}
private static void halt(final int status) {
AccessController.doPrivileged(new PrivilegedHaltAction(status));
}
private static class PrivilegedHaltAction implements PrivilegedAction<Void> {
private final int status;
private PrivilegedHaltAction(final int status) {
this.status = status;
}
@Override
public Void run() {
// we halt to prevent shutdown hooks from running
Runtime.getRuntime().halt(status);
return null;
}
}
/**
* Ctor.
* @param home Logstash Root Directory
@ -132,11 +185,10 @@ public final class Logstash implements Runnable, AutoCloseable {
Thread.currentThread().setContextClassLoader(ruby.getJRubyClassLoader());
ruby.runFromMain(script, config.displayedFileName());
} catch (final RaiseException ex) {
final RubyException rexep = ex.getException();
if (ruby.getSystemExit().isInstance(rexep)) {
final IRubyObject status =
rexep.callMethod(ruby.getCurrentContext(), "status");
if (status != null && !status.isNil() && RubyNumeric.fix2int(status) != 0) {
final RubyException re = ex.getException();
if (re instanceof RubySystemExit) {
IRubyObject success = ((RubySystemExit) re).success_p();
if (!success.isTrue()) {
uncleanShutdown(ex);
}
} else {
@ -190,7 +242,10 @@ public final class Logstash implements Runnable, AutoCloseable {
return resolved.toString();
}
private static final String UNCLEAN_SHUTDOWN_PREFIX = "Logstash stopped processing because of an error: ";
private static void uncleanShutdown(final Exception ex) {
throw new IllegalStateException("Logstash stopped processing because of an error: " + ex.getMessage(), ex);
throw new IllegalStateException(UNCLEAN_SHUTDOWN_PREFIX + ex.getMessage(), ex);
}
}

View file

@ -0,0 +1,3 @@
---
services:
- logstash

View file

@ -0,0 +1,74 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
require_relative '../framework/fixture'
require_relative '../framework/helpers'
require_relative '../framework/settings'
require_relative '../services/logstash_service'
require "logstash/devutils/rspec/spec_helper"
describe "uncaught exception" do
before(:all) do
@fixture = Fixture.new(__FILE__)
@logstash = @fixture.get_service("logstash")
end
after(:each) { @logstash.teardown }
let(:timeout) { 90 } # seconds
let(:temp_dir) { Stud::Temporary.directory("logstash-error-test") }
it "halts LS on fatal error" do
config = "input { generator { count => 1 message => 'a fatal error' } } "
# inline Ruby filter seems to catch everything (including java.lang.Error) so we exercise a thread throwing
config += "filter { ruby { code => 'Thread.start { raise java.lang.AssertionError.new event.get(\"message\") }' } }"
spawn_logstash_and_wait_for_exit! config, timeout
expect(@logstash.exit_code).to be 120
log_file = "#{temp_dir}/logstash-plain.log"
expect( File.exists?(log_file) ).to be true
expect( File.read(log_file) ).to match /\[FATAL\]\[org.logstash.Logstash.*?java.lang.AssertionError: a fatal error/m
end
it "logs unexpected exception (from Java thread)" do
config = "input { generator { count => 1 message => 'unexpected' } } "
config += "filter { ruby { code => 'java.lang.Thread.new { raise java.io.EOFException.new event.get(\"message\") }.start; sleep(1.5)' } }"
spawn_logstash_and_wait_for_exit! config, timeout
expect(@logstash.exit_code).to be 0 # normal exit
log_file = "#{temp_dir}/logstash-plain.log"
expect( File.exists?(log_file) ).to be true
expect( File.read(log_file) ).to match /\[ERROR\]\[org.logstash.Logstash.*?uncaught exception \(in thread .*?java.io.EOFException: unexpected/m
end
def spawn_logstash_and_wait_for_exit!(config, timeout)
@logstash.spawn_logstash('-w', '1', '--path.logs', temp_dir, '-e', config)
time = Time.now
while (Time.now - time) < timeout
sleep(0.1)
break if @logstash.exited?
end
raise 'LS process did not exit!' unless @logstash.exited?
end
end