Silence PUMA errors when an exception occur in logstash core

When Logstash encounter an exception Puma was getting really noisy about
the file descriptor it used. Its because Puma has an infinite loop into
his reactor and when logstash is dying from the exception the FD get
removed under the puma threads.

Puma is using the constants `STDERR` and `STDOUT` to give information
about the errors, there is no easy way to provide Puma with a custom IO.
This PR hacks the Puma name to redefine both constants and send the
information to the logger using the `debug` level.

Also when we start the server we can pass a custom Puma::Events instance
to record some of the errors happening in the internal threads, this PR
also make sure we send all theses message to the configured logger.

Fixes: #5822

Fixes #5869
This commit is contained in:
Pier-Hugues Pellerin 2016-09-07 15:34:06 -04:00
parent 087a09ee71
commit 09a4d35687
3 changed files with 113 additions and 19 deletions

View file

@ -70,10 +70,38 @@ module LogStash
@server.stop(true) if @server
end
# Puma uses by default the STDERR an the STDOUT for all his error
# handling, the server class accept custom a events object that can accept custom io object,
# so I just wrap the logger into an IO like object.
class IOWrappedLogger
def initialize(logger)
@logger = logger
end
def sync=(v)
end
def puts(str)
# The logger only accept a str as the first argument
@logger.debug(str.to_s)
end
alias_method :write, :puts
alias_method :<<, :puts
end
def start_webserver(port)
# wrap any output that puma could generate into a wrapped logger
# use the puma namespace to override STDERR, STDOUT in that scope.
io_wrapped_logger = IOWrappedLogger.new(@logger)
::Puma.const_set("STDERR", io_wrapped_logger) unless ::Puma.const_defined?("STDERR")
::Puma.const_set("STDOUT", io_wrapped_logger) unless ::Puma.const_defined?("STDOUT")
app = LogStash::Api::RackApp.app(logger, agent, http_environment)
@server = ::Puma::Server.new(app)
events = ::Puma::Events.new(io_wrapped_logger, io_wrapped_logger)
@server = ::Puma::Server.new(app, events)
@server.add_tcp_listener(http_host, port)
logger.info("Succesfully started Logstash API", :port => @port)

View file

@ -1,6 +1,7 @@
# encoding: utf-8
# require "logstash/json"
require "logstash/webserver"
require_relative "../support/helpers"
require "socket"
require "spec_helper"
require "open-uri"
@ -9,11 +10,15 @@ def block_ports(range)
servers = []
range.each do |port|
server = TCPServer.new("localhost", port)
Thread.new do
client = server.accept rescue nil
begin
server = TCPServer.new("localhost", port)
Thread.new do
client = server.accept rescue nil
end
servers << server
rescue => e
# The port can already be taken
end
servers << server
end
sleep(1)
@ -36,29 +41,56 @@ describe LogStash::WebServer do
Thread.abort_on_exception = @abort
end
let(:logger) { double("logger") }
let(:logger) { LogStash::Logging::Logger.new("testing") }
let(:agent) { double("agent") }
let(:webserver) { double("webserver") }
before :each do
[:info, :warn, :error, :fatal, :debug].each do |level|
allow(logger).to receive(level)
end
[:info?, :warn?, :error?, :fatal?, :debug?].each do |level|
allow(logger).to receive(level)
end
allow(webserver).to receive(:address).and_return("127.0.0.1")
allow(agent).to receive(:webserver).and_return(webserver)
end
context "when the port is already in use and a range is provided" do
subject { LogStash::WebServer.new(logger,
agent,
{ :http_host => "localhost", :http_ports => port_range
})}
subject { LogStash::WebServer.new(logger,
agent,
{ :http_host => "localhost", :http_ports => port_range })}
let(:port_range) { 10000..10010 }
let(:port_range) { 10000..10010 }
context "when an exception occur in the server thread" do
let(:spy_output) { spy("stderr").as_null_object }
it "should not log to STDERR" do
backup_stderr = STDERR
backup_stdout = STDOUT
# We are redefining constants, so lets silence the warning
silence_warnings do
STDOUT = STDERR = spy_output
end
expect(spy_output).not_to receive(:puts).with(any_args)
expect(spy_output).not_to receive(:write).with(any_args)
# This trigger an infinite loop in the reactor
expect(IO).to receive(:select).and_raise(IOError).at_least(:once)
t = Thread.new do
subject.run
end
sleep(1)
# We cannot use stop here, since the code is stuck in an infinite loop
t.kill rescue nil
silence_warnings do
STDERR = backup_stderr
STDOUT = backup_stdout
end
end
end
context "when the port is already in use and a range is provided" do
after(:each) { free_ports(@servers) }
context "when we have available ports" do
@ -93,3 +125,29 @@ describe LogStash::WebServer do
end
end
end
describe LogStash::WebServer::IOWrappedLogger do
let(:logger) { spy("logger") }
let(:message) { "foobar" }
subject { described_class.new(logger) }
it "responds to puts" do
subject.puts(message)
expect(logger).to have_received(:debug).with(message)
end
it "responds to write" do
subject.write(message)
expect(logger).to have_received(:debug).with(message)
end
it "reponds to <<" do
subject << message
expect(logger).to have_received(:debug).with(message)
end
it "responds to sync=(v)" do
expect{ subject.sync = true }.not_to raise_error
end
end

View file

@ -0,0 +1,8 @@
# encoding: utf-8
def silence_warnings
warn_level = $VERBOSE
$VERBOSE = nil
yield
ensure
$VERBOSE = warn_level
end