mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
--http.port Now accept a port range
This PR does a few modifications for our webserver: - Add a PortRange setting type. - allow the --http.port option to accept a port range or a single port, when a range is provided Logstash will incrementally try this list to find a free port for the webserver. - Logstash will report in the log which port it is using. (INFO LEVEL) - It refactors a few methods of the webserver. - It adds test for the binding of the ports. Notes: Port range can be defined in the logstash.yml or pass via the command line like this. `bin/logstash -e 'input { generator {}} output { null {}}' --log.level verbose --http.port 2000-2005` Fixes #5774
This commit is contained in:
parent
f68b2c0841
commit
672b369c41
13 changed files with 348 additions and 44 deletions
|
@ -91,9 +91,10 @@
|
|||
#
|
||||
# http.host: "127.0.0.1"
|
||||
#
|
||||
# Bind port for the metrics REST endpoint
|
||||
# Bind port for the metrics REST endpoint, this option also accept a range
|
||||
# (9600-9700) and logstash will pick up the first available ports.
|
||||
#
|
||||
# http.port: 9600
|
||||
# http.port: 9600-9700
|
||||
#
|
||||
# ------------ Debugging Settings --------------
|
||||
#
|
||||
|
|
3
docs/static/command-line-flags.asciidoc
vendored
3
docs/static/command-line-flags.asciidoc
vendored
|
@ -90,7 +90,8 @@ added[5.0.0-alpha3, Command-line flags have dots instead of dashes in their name
|
|||
The bind address for the metrics REST endpoint. The default is "127.0.0.1".
|
||||
|
||||
*`--http.port HTTP_PORT`*::
|
||||
The bind port for the metrics REST endpoint. The default is 9600.
|
||||
The bind port for the metrics REST endpoint. The default is 9600-9700.
|
||||
This settings accept a range of the format 9600-9700 and Logstash will pick up the first available port.
|
||||
|
||||
*`--pipeline.unsafe_shutdown`*::
|
||||
Force Logstash to exit during shutdown even if there are still inflight events
|
||||
|
|
|
@ -135,7 +135,7 @@ class LogStash::Agent
|
|||
|
||||
private
|
||||
def start_webserver
|
||||
options = {:http_host => @http_host, :http_port => @http_port, :http_environment => @http_environment }
|
||||
options = {:http_host => @http_host, :http_ports => @http_port, :http_environment => @http_environment }
|
||||
@webserver = LogStash::WebServer.new(@logger, self, options)
|
||||
Thread.new(@webserver) do |webserver|
|
||||
LogStash::Util.set_thread_name("Api Webserver")
|
||||
|
|
|
@ -9,7 +9,7 @@ module LogStash
|
|||
def all
|
||||
{:host => host, :version => version, :http_address => http_address}
|
||||
end
|
||||
|
||||
|
||||
def host
|
||||
Socket.gethostname
|
||||
end
|
||||
|
|
|
@ -38,7 +38,7 @@ module LogStash
|
|||
Setting::String.new("path.log", nil, false),
|
||||
Setting::String.new("log.format", "plain", true, ["json", "plain"]),
|
||||
Setting::String.new("http.host", "127.0.0.1"),
|
||||
Setting::Port.new("http.port", 9600),
|
||||
Setting::PortRange.new("http.port", 9600..9700),
|
||||
Setting::String.new("http.environment", "production"),
|
||||
].each {|setting| SETTINGS.register(setting) }
|
||||
|
||||
|
|
|
@ -278,8 +278,51 @@ module LogStash
|
|||
end
|
||||
|
||||
class Port < Integer
|
||||
VALID_PORT_RANGE = 1..65535
|
||||
|
||||
def initialize(name, default=nil, strict=true)
|
||||
super(name, default, strict) {|value| value >= 1 && value <= 65535 }
|
||||
super(name, default, strict) { |value| valid?(value) }
|
||||
end
|
||||
|
||||
def valid?(port)
|
||||
VALID_PORT_RANGE.cover?(port)
|
||||
end
|
||||
end
|
||||
|
||||
class PortRange < Coercible
|
||||
PORT_SEPARATOR = "-"
|
||||
|
||||
def initialize(name, default=nil, strict=true)
|
||||
super(name, ::Range, default, strict=true) { |value| valid?(value) }
|
||||
end
|
||||
|
||||
def valid?(range)
|
||||
Port::VALID_PORT_RANGE.first <= range.first && Port::VALID_PORT_RANGE.last >= range.last
|
||||
end
|
||||
|
||||
def coerce(value)
|
||||
case value
|
||||
when ::Range
|
||||
value
|
||||
when ::Fixnum
|
||||
value..value
|
||||
when ::String
|
||||
first, last = value.split(PORT_SEPARATOR)
|
||||
last = first if last.nil?
|
||||
begin
|
||||
(Integer(first))..(Integer(last))
|
||||
rescue ArgumentError # Trap and reraise a more human error
|
||||
raise ArgumentError.new("Could not coerce #{value} into a port range")
|
||||
end
|
||||
else
|
||||
raise ArgumentError.new("Could not coerce #{value} into a port range")
|
||||
end
|
||||
end
|
||||
|
||||
def validate(value)
|
||||
unless valid?(value)
|
||||
raise ArgumentError.new("Invalid value \"#{value}, valid options are within the range of #{Port::VALID_PORT_RANGE.first}-#{Port::VALID_PORT_RANGE.last}")
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -331,7 +374,6 @@ module LogStash
|
|||
end
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
SETTINGS = Settings.new
|
||||
|
|
|
@ -1,73 +1,84 @@
|
|||
# encoding: utf-8
|
||||
require "logstash/api/rack_app"
|
||||
require "puma"
|
||||
require "puma/server"
|
||||
require "logstash/api/rack_app"
|
||||
require "concurrent"
|
||||
|
||||
module LogStash
|
||||
module LogStash
|
||||
class WebServer
|
||||
extend Forwardable
|
||||
|
||||
attr_reader :logger, :status, :config, :options, :cli_options, :runner, :binder, :events, :http_host, :http_port, :http_environment, :agent
|
||||
attr_reader :logger, :status, :config, :options, :runner, :binder, :events, :http_host, :http_ports, :http_environment, :agent
|
||||
|
||||
def_delegator :@runner, :stats
|
||||
|
||||
DEFAULT_HOST = "127.0.0.1".freeze
|
||||
DEFAULT_PORT = 9600.freeze
|
||||
DEFAULT_PORTS = (9600..9700).freeze
|
||||
DEFAULT_ENVIRONMENT = 'production'.freeze
|
||||
|
||||
def initialize(logger, agent, options={})
|
||||
@logger = logger
|
||||
@agent = agent
|
||||
@http_host = options[:http_host] || DEFAULT_HOST
|
||||
@http_port = options[:http_port] || DEFAULT_PORT
|
||||
@http_ports = options[:http_ports] || DEFAULT_PORTS
|
||||
@http_environment = options[:http_environment] || DEFAULT_ENVIRONMENT
|
||||
@options = {}
|
||||
@cli_options = options.merge({ :rackup => ::File.join(::File.dirname(__FILE__), "api", "init.ru"),
|
||||
:binds => ["tcp://#{http_host}:#{http_port}"],
|
||||
:debug => logger.debug?,
|
||||
# Prevent puma from queueing request when not able to properly handling them,
|
||||
# fixed https://github.com/elastic/logstash/issues/4674. See
|
||||
# https://github.com/puma/puma/pull/640 for mode internal details in PUMA.
|
||||
:queue_requests => false
|
||||
})
|
||||
@status = nil
|
||||
@status = nil
|
||||
@running = Concurrent::AtomicBoolean.new(false)
|
||||
end
|
||||
|
||||
def run
|
||||
log "=== puma start: #{Time.now} ==="
|
||||
logger.debug("Starting puma")
|
||||
|
||||
stop # Just in case
|
||||
|
||||
app = LogStash::Api::RackApp.app(logger, agent, http_environment)
|
||||
@server = ::Puma::Server.new(app)
|
||||
@server.add_tcp_listener(http_host, http_port)
|
||||
running!
|
||||
|
||||
@server.run.join
|
||||
rescue Errno::EADDRINUSE
|
||||
message = "Logstash tried to bind to port #{@http_port}, but the port is already in use. You can specify a new port by launching logtash with the --http-port option."
|
||||
raise Errno::EADDRINUSE.new(message)
|
||||
http_ports.each_with_index do |port, idx|
|
||||
begin
|
||||
if running?
|
||||
@port = port
|
||||
logger.debug("Trying to start WebServer", :port => @port)
|
||||
start_webserver(@port)
|
||||
else
|
||||
break # we are closing down the server so just get out of the loop
|
||||
end
|
||||
rescue Errno::EADDRINUSE
|
||||
if http_ports.count == 1
|
||||
raise Errno::EADDRINUSE.new(I18n.t("logstash.web_api.cant_bind_to_port", :port => http_ports.first))
|
||||
elsif idx == http_ports.count-1
|
||||
raise Errno::EADDRINUSE.new(I18n.t("logstash.web_api.cant_bind_to_port_in_range", :http_ports => http_ports))
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def log(str)
|
||||
logger.debug(str)
|
||||
def running!
|
||||
@running.make_true
|
||||
end
|
||||
|
||||
def error(str)
|
||||
logger.error(str)
|
||||
def running?
|
||||
@running.value
|
||||
end
|
||||
|
||||
def address
|
||||
"#{http_host}:#{http_port}"
|
||||
"#{http_host}:#{@port}"
|
||||
end
|
||||
|
||||
# Empty method, this method is required because of the puma usage we make through
|
||||
# the Single interface, https://github.com/puma/puma/blob/master/lib/puma/single.rb#L82
|
||||
# for more details. This can always be implemented when we want to keep track of this
|
||||
# bit of data.
|
||||
def write_state; end
|
||||
|
||||
def stop(options={})
|
||||
@running.make_false
|
||||
@server.stop(true) if @server
|
||||
end
|
||||
|
||||
def start_webserver(port)
|
||||
app = LogStash::Api::RackApp.app(logger, agent, http_environment)
|
||||
|
||||
@server = ::Puma::Server.new(app)
|
||||
@server.add_tcp_listener(http_host, port)
|
||||
|
||||
logger.info("Succesfully started Logstash API", :port => @port)
|
||||
|
||||
@server.run.join
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -73,6 +73,10 @@ en:
|
|||
non_reloadable_config_register: |-
|
||||
Logstash is not able to start since configuration auto reloading was enabled but the configuration contains plugins that don't support it. Quitting...
|
||||
web_api:
|
||||
cant_bind_to_port: |-
|
||||
Logstash tried to bind to port %{port}, but the port is already in use. You can specify a new port by launching logtash with the --http-port option."
|
||||
cant_bind_to_port_in_range: |-
|
||||
Logstash tried to bind to port range %{http_ports}, but all the ports are already in use. You can specify a new port by launching logtash with the --http-port option."
|
||||
hot_threads:
|
||||
title: |-
|
||||
::: {%{hostname}}
|
||||
|
|
|
@ -35,7 +35,7 @@ module ResourceDSLMethods
|
|||
end
|
||||
|
||||
it "should include the http address" do
|
||||
expect(payload["http_address"]).to eql("#{Socket.gethostname}:#{::LogStash::WebServer::DEFAULT_PORT}")
|
||||
expect(payload["http_address"]).to eql("#{Socket.gethostname}:#{::LogStash::WebServer::DEFAULT_PORTS.first}")
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -19,7 +19,7 @@ end
|
|||
module LogStash
|
||||
class DummyAgent < Agent
|
||||
def start_webserver
|
||||
@webserver = Struct.new(:address).new("#{Socket.gethostname}:#{::LogStash::WebServer::DEFAULT_PORT}")
|
||||
@webserver = Struct.new(:address).new("#{Socket.gethostname}:#{::LogStash::WebServer::DEFAULT_PORTS.first}")
|
||||
end
|
||||
def stop_webserver; end
|
||||
end
|
||||
|
|
|
@ -189,6 +189,64 @@ describe LogStash::Runner do
|
|||
allow(pipeline).to receive(:shutdown)
|
||||
end
|
||||
|
||||
context "when :http.host is defined by the user" do
|
||||
it "should pass the value to the webserver" do
|
||||
expect(LogStash::Agent).to receive(:new) do |settings|
|
||||
expect(settings.set?("http.host")).to be(true)
|
||||
expect(settings.get("http.host")).to eq("localhost")
|
||||
end
|
||||
|
||||
args = ["--http.host", "localhost", "-e", pipeline_string]
|
||||
subject.run("bin/logstash", args)
|
||||
end
|
||||
end
|
||||
|
||||
context "when :http.host is not defined by the user" do
|
||||
it "should pass the value to the webserver" do
|
||||
expect(LogStash::Agent).to receive(:new) do |settings|
|
||||
expect(settings.set?("http.host")).to be_falsey
|
||||
expect(settings.get("http.host")).to eq("127.0.0.1")
|
||||
end
|
||||
|
||||
args = ["-e", pipeline_string]
|
||||
subject.run("bin/logstash", args)
|
||||
end
|
||||
end
|
||||
|
||||
context "when :http.port is defined by the user" do
|
||||
it "should pass a single value to the webserver" do
|
||||
expect(LogStash::Agent).to receive(:new) do |settings|
|
||||
expect(settings.set?("http.port")).to be(true)
|
||||
expect(settings.get("http.port")).to eq(10000..10000)
|
||||
end
|
||||
|
||||
args = ["--http.port", "10000", "-e", pipeline_string]
|
||||
subject.run("bin/logstash", args)
|
||||
end
|
||||
|
||||
it "should pass a range value to the webserver" do
|
||||
expect(LogStash::Agent).to receive(:new) do |settings|
|
||||
expect(settings.set?("http.port")).to be(true)
|
||||
expect(settings.get("http.port")).to eq(10000..20000)
|
||||
end
|
||||
|
||||
args = ["--http.port", "10000-20000", "-e", pipeline_string]
|
||||
subject.run("bin/logstash", args)
|
||||
end
|
||||
end
|
||||
|
||||
context "when no :http.port is not defined by the user" do
|
||||
it "should use the default settings" do
|
||||
expect(LogStash::Agent).to receive(:new) do |settings|
|
||||
expect(settings.set?("http.port")).to be_falsey
|
||||
expect(settings.get("http.port")).to eq(9600..9700)
|
||||
end
|
||||
|
||||
args = ["-e", pipeline_string]
|
||||
subject.run("bin/logstash", args)
|
||||
end
|
||||
end
|
||||
|
||||
context "when :pipeline_workers is not defined by the user" do
|
||||
it "should not pass the value to the pipeline" do
|
||||
expect(LogStash::Agent).to receive(:new) do |settings|
|
||||
|
@ -270,5 +328,4 @@ describe LogStash::Runner do
|
|||
end
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
|
|
93
logstash-core/spec/logstash/settings/port_range_spec.rb
Normal file
93
logstash-core/spec/logstash/settings/port_range_spec.rb
Normal file
|
@ -0,0 +1,93 @@
|
|||
# encoding: utf-8
|
||||
#
|
||||
require "logstash/settings"
|
||||
require "spec_helper"
|
||||
|
||||
describe LogStash::Setting::PortRange do
|
||||
|
||||
context "When the value is a Fixnum" do
|
||||
subject { LogStash::Setting::PortRange.new("mynewtest", 9000) }
|
||||
|
||||
it "coerces the value in a range" do
|
||||
expect { subject }.not_to raise_error
|
||||
end
|
||||
|
||||
it "returns a range" do
|
||||
expect(subject.value).to eq(9000..9000)
|
||||
end
|
||||
|
||||
it "can update the range" do
|
||||
subject.set(10000)
|
||||
expect(subject.value).to eq(10000..10000)
|
||||
end
|
||||
end
|
||||
|
||||
context "When the value is a string" do
|
||||
subject { LogStash::Setting::PortRange.new("mynewtest", "9000-10000") }
|
||||
|
||||
it "coerces a string range with the format (9000-10000)" do
|
||||
expect { subject }.not_to raise_error
|
||||
end
|
||||
|
||||
it "refuses when then upper port is out of range" do
|
||||
expect { LogStash::Setting::PortRange.new("mynewtest", "1000-95000") }.to raise_error
|
||||
end
|
||||
|
||||
it "returns a range" do
|
||||
expect(subject.value).to eq(9000..10000)
|
||||
end
|
||||
|
||||
it "can update the range" do
|
||||
subject.set("500-1000")
|
||||
expect(subject.value).to eq(500..1000)
|
||||
end
|
||||
end
|
||||
|
||||
context "when the value is a garbage string" do
|
||||
subject { LogStash::Setting::PortRange.new("mynewtest", "fsdfnsdkjnfjs") }
|
||||
|
||||
it "raises an argument error" do
|
||||
expect { subject }.to raise_error
|
||||
end
|
||||
|
||||
|
||||
it "raises an exception on update" do
|
||||
expect { LogStash::Setting::PortRange.new("mynewtest", 10000).set("dsfnsdknfksdnfjksdnfjns") }.to raise_error
|
||||
end
|
||||
end
|
||||
|
||||
context "when the value is an unkown type" do
|
||||
subject { LogStash::Setting::PortRange.new("mynewtest", 0.1) }
|
||||
|
||||
|
||||
it "raises an argument error" do
|
||||
expect { subject }.to raise_error
|
||||
end
|
||||
|
||||
|
||||
it "raises an exception on update" do
|
||||
expect { LogStash::Setting::PortRange.new("mynewtest", 10000).set(0.1) }.to raise_error
|
||||
end
|
||||
end
|
||||
|
||||
context "When value is a range" do
|
||||
subject { LogStash::Setting::PortRange.new("mynewtest", 9000..10000) }
|
||||
|
||||
it "accepts a ruby range as the default value" do
|
||||
expect { subject }.not_to raise_error
|
||||
end
|
||||
|
||||
it "can update the range" do
|
||||
subject.set(500..1000)
|
||||
expect(subject.value).to eq(500..1000)
|
||||
end
|
||||
|
||||
it "refuses when then upper port is out of range" do
|
||||
expect { LogStash::Setting::PortRange.new("mynewtest", 9000..1000000) }.to raise_error
|
||||
end
|
||||
|
||||
it "raise an exception on when port are out of range" do
|
||||
expect { LogStash::Setting::PortRange.new("mynewtest", -1000..1000) }.to raise_error
|
||||
end
|
||||
end
|
||||
end
|
95
logstash-core/spec/logstash/webserver_spec.rb
Normal file
95
logstash-core/spec/logstash/webserver_spec.rb
Normal file
|
@ -0,0 +1,95 @@
|
|||
# encoding: utf-8
|
||||
# require "logstash/json"
|
||||
require "logstash/webserver"
|
||||
require "socket"
|
||||
require "spec_helper"
|
||||
require "open-uri"
|
||||
|
||||
def block_ports(range)
|
||||
servers = []
|
||||
|
||||
range.each do |port|
|
||||
server = TCPServer.new("localhost", port)
|
||||
Thread.new do
|
||||
client = server.accept rescue nil
|
||||
end
|
||||
servers << server
|
||||
end
|
||||
|
||||
sleep(1)
|
||||
servers
|
||||
end
|
||||
|
||||
def free_ports(servers)
|
||||
servers.each do |t|
|
||||
t.close rescue nil # the threads are blocked just kill
|
||||
end
|
||||
end
|
||||
|
||||
describe LogStash::WebServer do
|
||||
before :all do
|
||||
@abort = Thread.abort_on_exception
|
||||
Thread.abort_on_exception = true
|
||||
end
|
||||
|
||||
after :all do
|
||||
Thread.abort_on_exception = @abort
|
||||
end
|
||||
|
||||
let(:logger) { double("logger") }
|
||||
let(:agent) { double("agent") }
|
||||
let(:webserver) { double("webserver") }
|
||||
|
||||
before :each do
|
||||
[:info, :warn, :error, :fatal, :debug].each do |level|
|
||||
allow(logger).to receive(level)
|
||||
end
|
||||
[:info?, :warn?, :error?, :fatal?, :debug?].each do |level|
|
||||
allow(logger).to receive(level)
|
||||
end
|
||||
|
||||
allow(webserver).to receive(:address).and_return("127.0.0.1")
|
||||
allow(agent).to receive(:webserver).and_return(webserver)
|
||||
end
|
||||
|
||||
context "when the port is already in use and a range is provided" do
|
||||
subject { LogStash::WebServer.new(logger,
|
||||
agent,
|
||||
{ :http_host => "localhost", :http_ports => port_range
|
||||
})}
|
||||
|
||||
let(:port_range) { 10000..10010 }
|
||||
after(:each) { free_ports(@servers) }
|
||||
|
||||
context "when we have available ports" do
|
||||
before(:each) do
|
||||
@servers = block_ports(10000..10005)
|
||||
end
|
||||
|
||||
it "successfully find an available port" do
|
||||
t = Thread.new do
|
||||
subject.run
|
||||
end
|
||||
|
||||
sleep(1)
|
||||
|
||||
response = open("http://localhost:10006").read
|
||||
expect { LogStash::Json.load(response) }.not_to raise_error
|
||||
expect(subject.address).to eq("localhost:10006")
|
||||
|
||||
subject.stop
|
||||
t.kill rescue nil
|
||||
end
|
||||
end
|
||||
|
||||
context "when all the ports are taken" do
|
||||
before(:each) do
|
||||
@servers = block_ports(port_range)
|
||||
end
|
||||
|
||||
it "raise an exception" do
|
||||
expect { subject.run }.to raise_error(Errno::EADDRINUSE, /Logstash tried to bind to port range/)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
Loading…
Add table
Add a link
Reference in a new issue