mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
Add Persistent UUID for Agent
This adds two new fields 'id', and 'name' to the base metadata for API requests. These fields are now returned at all API endpoints by default. The `id` field is the persisted UUID, the name field is the custom name the user has passed in (defaulted to the hostname). I renamed `node_uuid` and `node_name` to just `id` and `name` to be inline with Elasticsearch. This also fixed a broken test double in `webserver_spec.rb` that was using doubles across threads which created hidden errors. Fixes #6224
This commit is contained in:
parent
ab8a5f3cf9
commit
80d21712c6
10 changed files with 117 additions and 28 deletions
|
@ -21,12 +21,12 @@ class LogStash::Agent
|
|||
include LogStash::Util::Loggable
|
||||
STARTED_AT = Time.now.freeze
|
||||
|
||||
attr_reader :metric, :node_name, :pipelines, :settings, :webserver
|
||||
attr_reader :metric, :name, :pipelines, :settings, :webserver
|
||||
attr_accessor :logger
|
||||
|
||||
# initialize method for LogStash::Agent
|
||||
# @param params [Hash] potential parameters are:
|
||||
# :node_name [String] - identifier for the agent
|
||||
# :name [String] - identifier for the agent
|
||||
# :auto_reload [Boolean] - enable reloading of pipelines
|
||||
# :reload_interval [Integer] - reload pipelines every X seconds
|
||||
def initialize(settings = LogStash::SETTINGS)
|
||||
|
@ -35,10 +35,12 @@ class LogStash::Agent
|
|||
@auto_reload = setting("config.reload.automatic")
|
||||
|
||||
@pipelines = {}
|
||||
@node_name = setting("node.name")
|
||||
@name = setting("node.name")
|
||||
@http_host = setting("http.host")
|
||||
@http_port = setting("http.port")
|
||||
@http_environment = setting("http.environment")
|
||||
# Generate / load the persistent uuid
|
||||
id
|
||||
|
||||
@config_loader = LogStash::Config::Loader.new(@logger)
|
||||
@reload_interval = setting("config.reload.interval")
|
||||
|
@ -131,8 +133,42 @@ class LogStash::Agent
|
|||
shutdown_pipelines
|
||||
end
|
||||
|
||||
def node_uuid
|
||||
@node_uuid ||= SecureRandom.uuid
|
||||
def id
|
||||
return @id if @id
|
||||
|
||||
uuid = nil
|
||||
if ::File.exists?(id_path)
|
||||
begin
|
||||
uuid = ::File.open(id_path) {|f| f.each_line.first.chomp }
|
||||
rescue => e
|
||||
logger.warn("Could not open persistent UUID file!",
|
||||
:path => id_path,
|
||||
:error => e.message,
|
||||
:class => e.class.name)
|
||||
end
|
||||
end
|
||||
|
||||
if !uuid
|
||||
uuid = SecureRandom.uuid
|
||||
logger.info("No persistent UUID file found. Generating new UUID",
|
||||
:uuid => uuid,
|
||||
:path => id_path)
|
||||
begin
|
||||
::File.open(id_path, 'w') {|f| f.write(uuid) }
|
||||
rescue => e
|
||||
logger.warn("Could not write persistent UUID file! Will use ephemeral UUID",
|
||||
:uuid => uuid,
|
||||
:path => id_path,
|
||||
:error => e.message,
|
||||
:class => e.class.name)
|
||||
end
|
||||
end
|
||||
|
||||
@id = uuid
|
||||
end
|
||||
|
||||
def id_path
|
||||
@id_path ||= ::File.join(settings.get("path.data"), "uuid")
|
||||
end
|
||||
|
||||
def running_pipelines?
|
||||
|
|
|
@ -7,7 +7,8 @@ module LogStash
|
|||
module Commands
|
||||
class DefaultMetadata < Commands::Base
|
||||
def all
|
||||
{:host => host, :version => version, :http_address => http_address}
|
||||
{:host => host, :version => version, :http_address => http_address,
|
||||
:id => service.agent.id, :name => service.agent.name}
|
||||
end
|
||||
|
||||
def host
|
||||
|
|
|
@ -30,7 +30,7 @@ class LogStash::Runner < Clamp::StrictCommand
|
|||
|
||||
# Node Settings
|
||||
option ["-n", "--node.name"], "NAME",
|
||||
I18n.t("logstash.runner.flag.node_name"),
|
||||
I18n.t("logstash.runner.flag.name"),
|
||||
:attribute_name => "node.name",
|
||||
:default => LogStash::SETTINGS.get_default("node.name")
|
||||
|
||||
|
|
|
@ -246,7 +246,7 @@ en:
|
|||
rubyshell: |+
|
||||
Drop to shell instead of running as normal.
|
||||
Valid shells are "irb" and "pry"
|
||||
node_name: |+
|
||||
name: |+
|
||||
Specify the name of this logstash instance, if no value is given
|
||||
it will default to the current hostname.
|
||||
agent: |+
|
||||
|
|
|
@ -25,16 +25,16 @@ module ResourceDSLMethods
|
|||
def test_api(expected, path)
|
||||
context "GET #{path}" do
|
||||
let(:payload) { LogStash::Json.load(last_response.body) }
|
||||
|
||||
|
||||
before(:all) do
|
||||
do_request { get path }
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
it "should respond OK" do
|
||||
expect(last_response).to be_ok
|
||||
end
|
||||
|
||||
|
||||
|
||||
describe "the default metadata" do
|
||||
it "should include the host" do
|
||||
expect(payload["host"]).to eql(Socket.gethostname)
|
||||
|
@ -47,11 +47,19 @@ module ResourceDSLMethods
|
|||
it "should include the http address" do
|
||||
expect(payload["http_address"]).to eql("#{Socket.gethostname}:#{::LogStash::WebServer::DEFAULT_PORTS.first}")
|
||||
end
|
||||
|
||||
it "should include the node name" do
|
||||
expect(payload["name"]).to eql(@runner.agent.name)
|
||||
end
|
||||
|
||||
it "should include the node id" do
|
||||
expect(payload["id"]).to eql(@runner.agent.id)
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
hash_to_mapping(expected).each do |resource_path,klass|
|
||||
dotted = resource_path.join(".")
|
||||
|
||||
|
||||
it "should set '#{dotted}' at '#{path}' to be a '#{klass}'" do
|
||||
expect(last_response).to be_ok # fail early if need be
|
||||
resource_path_value = resource_path.reduce(payload) do |acc,v|
|
||||
|
|
|
@ -3,6 +3,8 @@ require "spec_helper"
|
|||
require "stud/temporary"
|
||||
require "logstash/inputs/generator"
|
||||
require_relative "../support/mocks_classes"
|
||||
require "fileutils"
|
||||
require_relative "../support/helpers"
|
||||
|
||||
describe LogStash::Agent do
|
||||
|
||||
|
@ -13,9 +15,12 @@ describe LogStash::Agent do
|
|||
let(:config_file) { Stud::Temporary.pathname }
|
||||
let(:config_file_txt) { "input { generator { count => 100000 } } output { }" }
|
||||
|
||||
subject { LogStash::Agent.new(agent_settings) }
|
||||
subject { LogStash::Agent.new(agent_settings) }
|
||||
|
||||
before :each do
|
||||
# This MUST run first, before `subject` is invoked to ensure clean state
|
||||
clear_data_dir
|
||||
|
||||
File.open(config_file, "w") { |f| f.puts config_file_txt }
|
||||
agent_args.each do |key, value|
|
||||
agent_settings.set(key, value)
|
||||
|
@ -32,7 +37,7 @@ describe LogStash::Agent do
|
|||
end
|
||||
|
||||
it "fallback to hostname when no name is provided" do
|
||||
expect(LogStash::Agent.new.node_name).to eq(Socket.gethostname)
|
||||
expect(LogStash::Agent.new.name).to eq(Socket.gethostname)
|
||||
end
|
||||
|
||||
describe "register_pipeline" do
|
||||
|
@ -56,7 +61,21 @@ describe LogStash::Agent do
|
|||
end
|
||||
end
|
||||
|
||||
describe "#execute" do
|
||||
describe "#id" do
|
||||
let(:config_file_txt) { "" }
|
||||
let(:id_file_data) { File.open(subject.id_path) {|f| f.read } }
|
||||
|
||||
it "should return a UUID" do
|
||||
expect(subject.id).to be_a(String)
|
||||
expect(subject.id.size).to be > 0
|
||||
end
|
||||
|
||||
it "should write out the persistent UUID" do
|
||||
expect(id_file_data).to eql(subject.id)
|
||||
end
|
||||
end
|
||||
|
||||
describe "#execute" do
|
||||
let(:config_file_txt) { "input { generator { count => 100000 } } output { }" }
|
||||
|
||||
before :each do
|
||||
|
|
|
@ -42,13 +42,8 @@ describe LogStash::WebServer do
|
|||
end
|
||||
|
||||
let(:logger) { LogStash::Logging::Logger.new("testing") }
|
||||
let(:agent) { double("agent") }
|
||||
let(:webserver) { double("webserver") }
|
||||
|
||||
before :each do
|
||||
allow(webserver).to receive(:address).and_return("127.0.0.1")
|
||||
allow(agent).to receive(:webserver).and_return(webserver)
|
||||
end
|
||||
let(:agent) { OpenStruct.new({:webserver => webserver, :http_address => "127.0.0.1", :id => "myid", :name => "myname"}) }
|
||||
let(:webserver) { OpenStruct.new({}) }
|
||||
|
||||
subject { LogStash::WebServer.new(logger,
|
||||
agent,
|
||||
|
|
|
@ -6,3 +6,11 @@ def silence_warnings
|
|||
ensure
|
||||
$VERBOSE = warn_level
|
||||
end
|
||||
|
||||
def clear_data_dir
|
||||
data_path = agent_settings.get("path.data")
|
||||
Dir.foreach(data_path) do |f|
|
||||
next if f == "." || f == ".." || f == ".gitkeep"
|
||||
FileUtils.rm_rf(File.join(data_path, f))
|
||||
end
|
||||
end
|
||||
|
|
1
qa/integration/.ruby-version
Normal file
1
qa/integration/.ruby-version
Normal file
|
@ -0,0 +1 @@
|
|||
jruby-9.1.5.0
|
|
@ -4,6 +4,8 @@ require_relative '../services/logstash_service'
|
|||
require_relative '../framework/helpers'
|
||||
require "logstash/devutils/rspec/spec_helper"
|
||||
require "yaml"
|
||||
require 'json'
|
||||
require 'open-uri'
|
||||
|
||||
describe "Test Logstash instance" do
|
||||
before(:all) {
|
||||
|
@ -16,12 +18,12 @@ describe "Test Logstash instance" do
|
|||
after(:all) {
|
||||
@fixture.teardown
|
||||
}
|
||||
|
||||
|
||||
after(:each) {
|
||||
@ls1.teardown
|
||||
@ls2.teardown
|
||||
}
|
||||
|
||||
|
||||
let(:num_retries) { 10 }
|
||||
let(:config1) { config_to_temp_file(@fixture.config("root", { :port => random_port })) }
|
||||
let(:config2) { config_to_temp_file(@fixture.config("root", { :port => random_port })) }
|
||||
|
@ -35,7 +37,7 @@ describe "Test Logstash instance" do
|
|||
expect(is_port_open?(9600)).to be true
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
it "multiple of them can be started on the same box with automatically trying different ports for HTTP server" do
|
||||
@ls1.spawn_logstash("-f", config1)
|
||||
try(num_retries) do
|
||||
|
@ -52,7 +54,7 @@ describe "Test Logstash instance" do
|
|||
|
||||
expect(@ls1.process_id).not_to eq(@ls2.process_id)
|
||||
end
|
||||
|
||||
|
||||
it "gets the right version when asked" do
|
||||
expected = YAML.load_file(LogstashService::LS_VERSION_FILE)
|
||||
expect(@ls1.get_version.strip).to eq("logstash #{expected['logstash']}")
|
||||
|
@ -87,4 +89,23 @@ describe "Test Logstash instance" do
|
|||
expect(is_port_open?(port2)).to be true
|
||||
end
|
||||
end
|
||||
|
||||
def get_id
|
||||
JSON.parse(open("http://localhost:9600/").read)["id"]
|
||||
end
|
||||
|
||||
it "should keep the same id between restarts" do
|
||||
config_string = "input { tcp { port => #{port1} } }"
|
||||
|
||||
start_ls = lambda {
|
||||
@ls1.spawn_logstash("-e", config_string, "-f", config3)
|
||||
@ls1.wait_for_logstash
|
||||
}
|
||||
start_ls.call()
|
||||
first_id = get_id
|
||||
@ls1.teardown
|
||||
start_ls.call()
|
||||
second_id = get_id
|
||||
expect(first_id).to eq(second_id)
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue