Adding integration testing for filebeat and make the test a bit more flexible

The test includes theses scenario

- No TLS
- TLS Server Auth
- TLS Mutual Auth

Theses theses test relies on a certicate include in the fixtures to correctly run,
there is also a script to generate them.

Make the ServiceLocator class is now more flexible and use the pattern matching
to create the registry on the fly

Fixes #5887

Fixes #5917
This commit is contained in:
Pier-Hugues Pellerin 2016-09-08 14:03:46 -04:00 committed by Suyog Rao
parent 761ce30966
commit 3b5ac6008b
16 changed files with 361 additions and 17 deletions

4
qa/integration/.gitignore vendored Normal file
View file

@ -0,0 +1,4 @@
/services/filebeat
/fixtures/how.input
/services/elasticsearch
/services/kafka

View file

@ -0,0 +1,33 @@
---
services:
- filebeat
- logstash
config:
without_tls: |-
input {
beats {
port => 5044
}
}
tls_server_auth: |-
input {
beats {
ssl => true
port => 5044
ssl_certificate => '<%=options[:ssl_certificate]%>'
ssl_key => '<%=options[:ssl_key]%>'
}
}
tls_mutual_auth: |-
input {
beats {
ssl => true
port => 5044
ssl_certificate => '<%=options[:ssl_certificate]%>'
ssl_key => '<%=options[:ssl_key]%>'
ssl_verify_mode => "peer"
}
}
input: how_sample.input
setup_script: download_input.sh
teardown_script:

View file

@ -0,0 +1,19 @@
-----BEGIN CERTIFICATE-----
MIIC/TCCAeWgAwIBAgIJANC15+/gKMqlMA0GCSqGSIb3DQEBCwUAMBQxEjAQBgNV
BAMMCWxvY2FsaG9zdDAgFw0xNjA5MTIxMzU5MzBaGA8yMTE2MDgxOTEzNTkzMFow
FDESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB
CgKCAQEAza/8Zl6B7bYTQ+cMkHdX0Nu7yJxSx9Ymt9UKEM7YWoyg0EAqSjelzzLU
jeZnIWSMTdE+rbqBeG2GUA6eoT0N57KpGVPIiYGtq9r+mmmke/D3jG3q3OeKt3WE
ClvM1msjcgSQ+buF2Ew24yNbtTFPuSnbjTgTcJBGxVnQRD6PKH3654q+ydp2scp/
USZTAZL7p5JQbz6fNXwD67Uw5AZmHnu5djSHL6fdw2iw6UIXnaBLmxT8j/kX3WMj
1FxsTs38vJIYX3IDR/XJduk8+/9ru0t8fe6nckpVMzXFhokFk2dVNJVkii+lZqGf
AOsi7fePuFhj3PP+ms601q2CaCioDwIDAQABo1AwTjAdBgNVHQ4EFgQUnqDfyu95
7s73TyM0lwCIkGG80MkwHwYDVR0jBBgwFoAUnqDfyu957s73TyM0lwCIkGG80Mkw
DAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEACmrmIePvYzB9lEM7laFt
XDvOrH8mUCqGIF2S653GFIJULSmG5HdxiHyCzT8KL8XC0AJsUEQsMjV1wgW0Ttg/
OsT+69Tf1TCuPpYyz8C+fBEVZnjgKtfbVlc8cB1M/5y08v/eyn+Od30D44dYdgVC
IgIX8xNtcEb0QcRiO3dLBE8aoRMvdOK8AnQDSrHjbyLx/wFyTsgeF2P/duf7qfOc
ouYW6nQudZvb/UxiImJmYraZ8xSZUf0FuG53Z4HZHFeYJeMtgO0Myws2VeeGwmo3
/8P40pyaaGrde+cXb1M22XdI6ehcvyJ7pvft3wvNt269URKZSnW5GN6+pSP3vnhl
XA==
-----END CERTIFICATE-----

View file

@ -0,0 +1,28 @@
-----BEGIN PRIVATE KEY-----
MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDNr/xmXoHtthND
5wyQd1fQ27vInFLH1ia31QoQzthajKDQQCpKN6XPMtSN5mchZIxN0T6tuoF4bYZQ
Dp6hPQ3nsqkZU8iJga2r2v6aaaR78PeMberc54q3dYQKW8zWayNyBJD5u4XYTDbj
I1u1MU+5KduNOBNwkEbFWdBEPo8offrnir7J2naxyn9RJlMBkvunklBvPp81fAPr
tTDkBmYee7l2NIcvp93DaLDpQhedoEubFPyP+RfdYyPUXGxOzfy8khhfcgNH9cl2
6Tz7/2u7S3x97qdySlUzNcWGiQWTZ1U0lWSKL6VmoZ8A6yLt94+4WGPc8/6azrTW
rYJoKKgPAgMBAAECggEBAJDiqlQhtjwPyTpHgl4w8ra4UWpaUyVZACT1+AUm7te5
v31x9VgaIr30/CyIY5TuErat8gzGF3UREU9LPQouy6CuBNdGYTgbFI690CJM0ARO
FNHii1HX96qLKNtzFrLi/TDPbp+7K+jUpsUQnwTfG10+0tyC546e0nspbL7loknO
TsDF8JRtJAoP3Q018xeAVbqnzwMCRT8DR3Qj4Q91xPayK8QCZ/IpO9+79YNCkwez
AFxZPrhu0pgushgXW2aHFNaGWH3aw15ITcxjiZKfzjAvhHadNm4k5EznC0I3fvjf
CNb0YTOx5WiOpeaa2WTmM7cPqqKYR30ZKwlhSaoZFYECgYEA/9p8mSMS/6PzxgML
kIBRpBW9ba6NQDQTIZNBT9rcGSWHE26XUs+LGEhzhPOAFloRoxyxt35Z/WyuPzpW
wleHDCBDxzGKm2DX4dMnETMK1hrbApKoHIw2VGKTCt+4N9wGD/JB/5BeJIFWETO5
J7hgknUsRRQbobn8bnEnpPU6Ox8CgYEAzc4k1N/dk+URtcte4N0c+rM6lHrQKHQQ
f40SUR/Atoh0Vd/2k0QXGpBz6Vo3+icymnp5kVINEFS0v0xxwjM+Ln0lIYgfE0uw
29XLDhgA72sr4vXNd/WFOkctw37DIgyoZeZPFx4L1eH3sXDTfKQ2bbeQSKa2oosm
1ModklKY5RECgYAGiwPctrTprLuzOKi3DVEJLTUm9OvP1IUYurdU33f5fKgzqwRb
rcsU1+V8ZO3FpJAG59jzpFuQy5K5Mzi8x5hAiyKwmWe+CX/72naJThDc4Nrw+ecq
s7s/9TVldcs0QoA1MDDX5E1ECbq1vejiRyjRFgYHzjfWrvbxrBsjcNIIaQKBgQCG
/RF3dya03OkS3sPqpd1eKrTfsvZZslcn8nXzM+qJT1NPCEmJKn4k9F6yznQs5gzw
+ihGwxTCg2zSlOeAw2+jun3iUpBfbaQhbUd0hRYH87mjcipE5otuQEWqFrkS2k6S
F9AEj7afoUl/30s4U9VHfcFrhb4BO1hK9g8TbI3QoQKBgQD5rMVNynisRRVaM8Uh
dp0pf4NlDvAnL2xZAAFhZgNMCkR6luZzl9f4NPpLQYYg48ftbzDkYf/CGSA2ROCo
l34MFl51A2tX9dpNDGgEq0kunEFgfiOL0nC+oWoiKqexQpEmyep4i48Bt507MOld
abV3TDYKy6RlLfMTi14qZyYQvQ==
-----END PRIVATE KEY-----

View file

@ -0,0 +1,2 @@
#!/bin/sh
openssl req -subj '/CN=localhost/' -x509 -days $((100 * 365)) -batch -nodes -newkey rsa:2048 -keyout certificate.key -out certificate.crt

View file

@ -19,6 +19,6 @@ config: |-
#}
}
input: kafka_input.input
input: how_sample.input
setup_script: download_input.sh
teardown_script:

View file

@ -4,23 +4,47 @@ require_relative "../services/service_locator"
# bootstrapping services, dealing with config files, inputs etc
class Fixture
attr_reader :input
attr_reader :config
attr_reader :actual_output
attr_reader :test_dir
class TemplateContext
attr_reader :options
def initialize(options)
@options = options
end
def get_binding
binding
end
end
def initialize(test_file_location)
@test_file_location = test_file_location
@fixtures_dir = File.expand_path(File.join("..", "..", "fixtures"), __FILE__)
@settings = TestSettings.new(@test_file_location)
@service_locator = ServiceLocator.new(@settings)
setup_services
@config = @settings.get("config")
@input = File.join(@fixtures_dir, @settings.get("input")) if @settings.is_set?("input")
# this assumes current PWD.
# TODO: Remove this when we have an erb template for LS config so you can inject such stuff
@actual_output = @settings.get("actual_output")
end
def config(node = "root", options = nil)
if node == "root"
config = @settings.get("config")
else
config = @settings.get("config")[node]
end
if options != nil
ERB.new(config, nil, "-").result(TemplateContext.new(options).get_binding)
else
config
end
end
def get_service(name)
@service_locator.get_service(name)
end
@ -54,5 +78,4 @@ class Fixture
`#{script}`
end
end
end

View file

@ -15,9 +15,18 @@ class TestSettings
@suite_settings = YAML.load_file(@suite_settings_file)
# Per test settings, where one can override stuff and define test specific config
@test_settings = YAML.load_file(@tests_settings_file)
if is_set?("config")
config_string = get("config").gsub('\n','').split.join(" ")
@test_settings["config"] = config_string
if get("config").is_a?(Hash)
tmp = {}
get("config").each do |k, v|
tmp[k] = get("config")[k].gsub('\n','').split.join(" ")
end
@test_settings["config"] = tmp
else
config_string = get("config").gsub('\n','').split.join(" ")
@test_settings["config"] = config_string
end
end
end

View file

@ -10,9 +10,12 @@ Gem::Specification.new do |s|
# Files
s.test_files = s.files.grep(%r{^(test|spec|features)/})
# Gem dependencies
s.add_development_dependency 'elasticsearch'
s.add_development_dependency 'childprocess'
s.add_development_dependency 'rspec-wait'
s.add_development_dependency 'manticore'
end
s.add_development_dependency 'stud'
s.add_development_dependency 'pry'
end

View file

@ -0,0 +1,51 @@
# encoding: utf-8
class FilebeatService < Service
FILEBEAT_CMD = [File.join(File.dirname(__FILE__), "filebeat", "filebeat"), "-c"]
class BackgroundProcess
def initialize(cmd)
@client_out = Stud::Temporary.file
@client_out.sync
@process = ChildProcess.build(*cmd)
@process.duplex = true
@process.io.stdout = @process.io.stderr = @client_out
ChildProcess.posix_spawn = true
end
def start
@process.start
sleep(0.1)
self
end
def execution_output
@client_out.rewind
# can be used to helper debugging when a test fails
@execution_output = @client_out.read
end
def stop
begin
@process.poll_for_exit(5)
rescue ChildProcess::TimeoutError
Process.kill("KILL", @process.pid)
end
end
end
def initialize(settings)
super("filebeat", settings)
end
def run(config_path)
cmd = FILEBEAT_CMD.dup << config_path
puts "Starting Filebeat with #{cmd.join(" ")}"
@process = BackgroundProcess.new(cmd).start
end
def stop
@process.stop
end
end

View file

@ -0,0 +1,25 @@
#!/bin/bash
set -ex
current_dir="$(dirname "$0")"
if [ -n "${FILEBEAT_VERSION}" ]; then
echo "Filebeat version is $FILEBEAT_VERSION"
version=$FILEBEAT_VERSION
else
version=5.0.0-alpha5
fi
setup_fb() {
local version=$1
platform=`uname -s | tr '[:upper:]' '[:lower:]'`
architecture=`uname -m | tr '[:upper:]' '[:lower:]'`
download_url=https://download.elastic.co/beats/filebeat/filebeat-$version-$platform-$architecture.tar.gz
curl -sL $download_url > $current_dir/filebeat.tar.gz
mkdir $current_dir/filebeat
tar -xzf $current_dir/filebeat.tar.gz --strip-components=1 -C $current_dir/filebeat/.
rm $current_dir/filebeat.tar.gz
}
if [ ! -d $current_dir/filebeat ]; then
setup_fb $version
fi

View file

@ -1,7 +1,7 @@
require_relative "service"
class Kafka < Service
class KafkaService < Service
def initialize(settings)
super("kafka", settings)
end
end
end

View file

@ -5,7 +5,7 @@ require "bundler"
require "tempfile"
# A locally started Logstash service
class Logstash < Service
class LogstashService < Service
STDIN_CONFIG = "input {stdin {}} output { }"
RETRY_ATTEMPTS = 10
@ -95,7 +95,7 @@ class Logstash < Service
def teardown
if !@process.nil?
# todo: put this in a sleep-wait loop to kill it force kill
@process.io.stdin.close
@process.io.stdin.close rescue nil
@process.stop
@process = nil
end

View file

@ -1,19 +1,30 @@
require_relative "kafka"
require_relative "elasticsearch"
# encoding: utf-8
require_relative "service"
# This is a registry used in Fixtures so a test can get back any service class
# at runtime
# All new services should register here
class ServiceLocator
FILE_PATTERN = "_service.rb"
def initialize(settings)
@services = {}
@services["logstash"] = Logstash.new(settings)
@services["kafka"] = Kafka.new(settings)
@services["elasticsearch"] = ElasticsearchService.new(settings)
available_services do |name, klass|
@services[name] = klass.new(settings)
end
end
def get_service(name)
@services[name]
@services.fetch(name)
end
def available_services
Dir.glob(File.join(File.dirname(__FILE__), "*#{FILE_PATTERN}")).each do |f|
require f
basename = File.basename(f).gsub(/#{FILE_PATTERN}$/, "")
service_name = basename.downcase
klass = Object.const_get("#{service_name.capitalize}Service")
yield service_name, klass
end
end
end

View file

@ -0,0 +1,136 @@
require_relative '../framework/fixture'
require_relative '../framework/settings'
require "stud/temporary"
require "stud/try"
require "rspec/wait"
require "yaml"
require "fileutils"
describe "Beat Input", :integration => true do
before(:all) do
@fixture = Fixture.new(__FILE__)
end
after :each do
logstash_service.teardown
filebeat_service.stop
end
let(:max_retry) { 120 }
let(:registry_file) { Stud::Temporary.file.path }
let(:logstash_service) { @fixture.get_service("logstash") }
let(:filebeat_service) { @fixture.get_service("filebeat") }
let(:log_path) do
tmp_path = Stud::Temporary.file.path #get around ignore older completely
source = File.expand_path(@fixture.input)
FileUtils.cp(source, tmp_path)
tmp_path
end
let(:number_of_events) do
File.open(log_path, "r").readlines.size
end
shared_examples "send events" do
let(:filebeat_config_path) do
file = Stud::Temporary.file
file.write(YAML.dump(filebeat_config))
file.close
file.path
end
it "sucessfully send events" do
logstash_service.start_background(logstash_config)
process = filebeat_service.run(filebeat_config_path)
# It can take some delay for filebeat to connect to logstash and start sending data.
# Its possible that logstash isn't completely initialized here, we can get "Connection Refused"
begin
sleep(1) while (result = logstash_service.monitoring_api.event_stats).nil?
rescue
retry
end
Stud.try(max_retry.times, RSpec::Expectations::ExpectationNotMetError) do
result = logstash_service.monitoring_api.event_stats
expect(result["in"]).to eq(number_of_events)
end
end
end
context "Without TLS" do
let(:logstash_config) { @fixture.config("without_tls") }
let(:filebeat_config) do
{
"filebeat" => {
"prospectors" => [{ "paths" => [log_path], "input_type" => "log" }],
"registry_file" => registry_file,
"scan_frequency" => "1s"
},
"output" => {
"logstash" => { "hosts" => ["localhost:5044"] },
"logging" => { "level" => "debug" }
}
}
end
include_examples "send events"
end
context "With TLS" do
let(:certificate_directory) { File.expand_path(File.join(File.dirname(__FILE__), "..", "fixtures", "certificates")) }
let(:certificate) { File.join(certificate_directory, "certificate.crt") }
let(:ssl_key) { File.join(certificate_directory, "certificate.key") }
let(:certificate_authorities) { [certificate] }
context "Server auth" do
let(:logstash_config) { @fixture.config("tls_server_auth", { :ssl_certificate => certificate, :ssl_key => ssl_key}) }
let(:filebeat_config) do
{
"filebeat" => {
"prospectors" => [{ "paths" => [log_path], "input_type" => "log" }],
"registry_file" => registry_file,
"scan_frequency" => "1s"
},
"output" => {
"logstash" => {
"hosts" => ["localhost:5044"],
"tls" => {
"certificate_authorities" => certificate_authorities
}
},
"logging" => { "level" => "debug" }
}
}
end
include_examples "send events"
end
context "Mutual auth" do
let(:logstash_config) { @fixture.config("tls_mutual_auth", { :ssl_certificate => certificate, :ssl_key => ssl_key}) }
let(:filebeat_config) do
{
"filebeat" => {
"prospectors" => [{ "paths" => [log_path], "input_type" => "log" }],
"registry_file" => registry_file,
"scan_frequency" => "1s"
},
"output" => {
"logstash" => {
"hosts" => ["localhost:5044"],
"tls" => {
"certificate_authorities" => certificate_authorities,
"certificate" => certificate,
"certificate_key" => ssl_key
}
},
"logging" => { "level" => "debug" }
}
}
end
include_examples "send events"
end
end
end