mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
Enabled: * SpaceInsideArrayLiteralBrackets * SpaceInsideParens * SpaceInsidePercentLiteralDelimiters * SpaceInsideStringInterpolation * Add enforced style for SpaceInsideStringInterpolation Enabled without offenses: * SpaceInsideArrayPercentLiteral * Layout/SpaceInsideRangeLiteral * Layout/SpaceInsideReferenceBrackets
171 lines
5.8 KiB
Ruby
171 lines
5.8 KiB
Ruby
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
|
# or more contributor license agreements. Licensed under the Elastic License;
|
|
# you may not use this file except in compliance with the Elastic License.
|
|
|
|
require "belzebuth"
|
|
require "yaml"
|
|
require "elasticsearch"
|
|
require "fileutils"
|
|
require "stud/try"
|
|
require "open3"
|
|
require "time"
|
|
|
|
VERSIONS_YML_PATH = File.join(File.dirname(__FILE__), "..", "..", "..", "..", "versions.yml")
|
|
VERSION_PATH = File.join(File.dirname(__FILE__), "..", "..", "..", "VERSION")
|
|
VERSION = File.exist?(VERSIONS_YML_PATH) ? YAML.load_file(VERSIONS_YML_PATH)['logstash'] : File.read(VERSION_PATH).strip
|
|
|
|
def get_logstash_path
|
|
ENV["LOGSTASH_PATH"] || File.join(File.dirname(__FILE__), "../../../../")
|
|
end
|
|
|
|
def get_elasticsearch_path
|
|
ENV["ELASTICSEARCH_PATH"] || File.join(File.dirname(__FILE__), "../../../../build/elasticsearch")
|
|
end
|
|
|
|
def elastic_password
|
|
'elasticpass'
|
|
end
|
|
|
|
#
|
|
# Elasticsearch
|
|
#
|
|
def elasticsearch(options = {})
|
|
temporary_path_data = Stud::Temporary.directory
|
|
default_settings = {
|
|
"path.data" => temporary_path_data,
|
|
|
|
"xpack.monitoring.collection.enabled" => true,
|
|
"xpack.security.enabled" => true,
|
|
"action.destructive_requires_name" => false
|
|
}
|
|
settings = default_settings.merge(options.fetch(:settings, {}))
|
|
settings_arguments = settings.collect { |k, v| "-E#{k}=#{v}" }
|
|
|
|
unless bootstrap_password_exists?
|
|
bootstrap_elastic_password
|
|
end
|
|
|
|
# Launch in the background and wait for /started/ stdout
|
|
cmd = "bin/elasticsearch #{settings_arguments.join(' ')}"
|
|
puts "Running elasticsearch: #{cmd}"
|
|
response = Belzebuth.run(cmd, { :directory => get_elasticsearch_path, :wait_condition => /license.*valid/, :timeout => 15 * 60 })
|
|
unless response.successful?
|
|
raise "Could not start Elasticsearch, response: #{response}"
|
|
end
|
|
|
|
start_es_xpack_trial
|
|
|
|
response
|
|
end
|
|
|
|
def start_es_xpack_trial
|
|
if elasticsearch_client.perform_request(:get, '_license').body['license']['type'] != 'trial'
|
|
resp = elasticsearch_client.perform_request(:post, '_license/start_trial', "acknowledge" => true)
|
|
if resp.body["trial_was_started"] != true
|
|
raise "Trial not started: #{resp.body}"
|
|
end
|
|
end
|
|
end
|
|
|
|
def bootstrap_elastic_password
|
|
# we can't use Belzebuth here since the library doesn't support STDIN injection
|
|
cmd = "bin/elasticsearch-keystore add bootstrap.password -f -x"
|
|
result = Dir.chdir(get_elasticsearch_path) do |dir|
|
|
_, status = Open3.capture2(cmd, :stdin_data => elastic_password)
|
|
status
|
|
end
|
|
unless result.success?
|
|
raise "Something went wrong when installing xpack,\ncmd: #{cmd}\nresponse: #{response}"
|
|
end
|
|
end
|
|
|
|
def bootstrap_password_exists?
|
|
cmd = "bin/elasticsearch-keystore list"
|
|
response = Belzebuth.run(cmd, { :directory => get_elasticsearch_path })
|
|
response.successful? && response.stdout_lines.any? { |line| line =~ /^bootstrap.password$/ }
|
|
end
|
|
|
|
def elasticsearch_xpack_installed?
|
|
cmd = "bin/elasticsearch-plugin list"
|
|
response = Belzebuth.run(cmd, { :directory => get_elasticsearch_path })
|
|
response.stdout_lines.any? { |line| line =~ /x-pack/ }
|
|
end
|
|
|
|
def elasticsearch_client(options = { :url => "http://elastic:#{elastic_password}@localhost:9200" })
|
|
Elasticsearch::Client.new(options)
|
|
end
|
|
|
|
def es_version
|
|
response = elasticsearch_client.perform_request(:get, "")
|
|
major, minor = response.body["version"]["number"].split(".")
|
|
[major.to_i, minor.to_i]
|
|
end
|
|
|
|
def push_elasticsearch_config(pipeline_id, config, version = "1")
|
|
major, minor = es_version
|
|
if major >= 8 || (major == 7 && minor >= 10)
|
|
elasticsearch_client.perform_request(:put, "_logstash/pipeline/#{pipeline_id}", {},
|
|
{ :pipeline => config, :username => "log.stash", :pipeline_metadata => {:version => version },
|
|
:pipeline_settings => {"pipeline.batch.delay": "50"}, :last_modified => Time.now.utc.iso8601})
|
|
else
|
|
elasticsearch_client.index :index => '.logstash', :type => "_doc", id: pipeline_id, :body => { :pipeline => config }
|
|
end
|
|
end
|
|
|
|
def cleanup_elasticsearch(index = MONITORING_INDEXES)
|
|
elasticsearch_client.indices.delete :index => index
|
|
elasticsearch_client.indices.refresh
|
|
end
|
|
|
|
def cleanup_system_indices(pipeline_ids)
|
|
major, minor = es_version
|
|
|
|
if major >= 8 || (major == 7 && minor >= 10)
|
|
pipeline_ids.each do |id|
|
|
begin
|
|
elasticsearch_client.perform_request(:delete, "_logstash/pipeline/#{id}")
|
|
rescue Elasticsearch::Transport::Transport::Errors::NotFound => e
|
|
puts ".logstash can be empty #{e.message}"
|
|
end
|
|
end
|
|
else
|
|
cleanup_elasticsearch(".logstash*")
|
|
end
|
|
|
|
elasticsearch_client.indices.refresh
|
|
end
|
|
|
|
def logstash_command_append(cmd, argument, value)
|
|
if cmd !~ /#{Regexp.escape(argument)}/
|
|
cmd << " #{argument} #{value}"
|
|
else
|
|
puts "Argument '#{argument}' already exist in the command: #{cmd}"
|
|
end
|
|
|
|
cmd
|
|
end
|
|
|
|
def logstash(cmd, options = {})
|
|
logstash_with_empty_default(cmd, options, {"xpack.monitoring.enabled" => true})
|
|
end
|
|
|
|
def logstash_with_empty_default(cmd, options = {}, default_settings = {})
|
|
temporary_settings = Stud::Temporary.directory
|
|
temporary_data = Stud::Temporary.directory
|
|
|
|
cmd = logstash_command_append(cmd, "--path.settings", temporary_settings)
|
|
cmd = logstash_command_append(cmd, "--path.data", temporary_data)
|
|
|
|
logstash_yaml = File.join(temporary_settings, "logstash.yml")
|
|
IO.write(logstash_yaml, YAML::dump(default_settings.merge(options.fetch(:settings, {}))))
|
|
FileUtils.cp(File.join(get_logstash_path, "config", "log4j2.properties"), File.join(temporary_settings, "log4j2.properties"))
|
|
|
|
puts "Running logstash with #{cmd} in #{get_logstash_path} with settings #{options.inspect}"
|
|
Belzebuth.run(cmd, {:directory => get_logstash_path }.merge(options.fetch(:belzebuth, { })))
|
|
end
|
|
|
|
def verify_response!(cmd, response)
|
|
unless response.successful?
|
|
raise "Something went wrong when installing xpack,\ncmd: #{cmd}\nresponse: #{response}"
|
|
end
|
|
end
|