# logstash/qa/integration/specs/dlq_spec.rb

require_relative '../framework/fixture'
require_relative '../framework/settings'
require_relative '../services/logstash_service'
require_relative '../framework/helpers'
require "logstash/devutils/rspec/spec_helper"
describe "Test Dead Letter Queue" do
before(:all) {
@fixture = Fixture.new(__FILE__)
}
after(:all) {
@fixture.teardown
}
before(:each) {
IO.write(config_yaml_file, config_yaml)
}
after(:each) do
es_client = @fixture.get_service("elasticsearch").get_client
es_client.indices.delete(index: 'logstash-*') unless es_client.nil?
logstash_service.teardown
end
  let(:logstash_service) { @fixture.get_service("logstash") }
  let(:dlq_dir) { Stud::Temporary.directory }
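
  # Base logstash.yml settings shared by every variant: enable the DLQ and
  # point it at a scratch directory so each example starts with an empty queue.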
  let(:dlq_config) do
    {
      "dead_letter_queue.enable" => true,
      "path.dead_letter_queue" => dlq_dir,
      "log.level" => "debug"
    }
  end
  let!(:config_yaml) { dlq_config.to_yaml }
  let!(:config_yaml_file) { ::File.join(settings_dir, "logstash.yml") }
  let!(:settings_dir) { Stud::Temporary.directory }
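
  # Shared assertion: all 1000 events should end up indexed, and each document
  # should carry the "mutated" marker added by the DLQ reader pipeline.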
shared_examples_for "it can send 1000 documents to and index from the dlq" do
it 'should index all documents' do
es_service = @fixture.get_service("elasticsearch")
es_client = es_service.get_client
# test if all data was indexed by ES, but first refresh manually
es_client.indices.refresh
logstash_service.wait_for_logstash
try(60) do
begin
result = es_client.search(index: 'logstash-*', size: 0, q: '*')
hits = result["hits"]["total"]
rescue Elasticsearch::Transport::Transport::Errors::ServiceUnavailable => e
puts "Elasticsearch unavailable #{e.inspect}"
hits = 0
end
expect(hits).to eq(1000)
end
result = es_client.search(index: 'logstash-*', size: 1, q: '*')
s = result["hits"]["hits"][0]["_source"]
expect(s["mutated"]).to eq("true")
end
end

  context 'using pipelines.yml' do
    let!(:pipelines_yaml) { pipelines.to_yaml }
    let!(:pipelines_yaml_file) { ::File.join(settings_dir, "pipelines.yml") }

    before :each do
      IO.write(pipelines_yaml_file, pipelines_yaml)
      logstash_service.spawn_logstash("--path.settings", settings_dir, "--log.level=debug")
    end

    context 'with multiple pipelines' do
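      # "test" generates the failing events and writes them to its DLQ;
      # "test2" reads that DLQ back (commit_offsets => true persists the read
      # position), repairs the events, and indexes them.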
      let(:pipelines) do
        [
          {
            "pipeline.id" => "test",
            "pipeline.workers" => 1,
            "dead_letter_queue.enable" => true,
            "pipeline.batch.size" => 1,
            "config.string" => "input { generator { message => '{\"test\":\"one\"}' codec => \"json\" count => 1000 } } filter { mutate { add_field => { \"geoip\" => \"somewhere\" } } } output { elasticsearch {} }"
          },
          {
            "pipeline.id" => "test2",
            "pipeline.workers" => 1,
            "dead_letter_queue.enable" => false,
            "pipeline.batch.size" => 1,
            "config.string" => "input { dead_letter_queue { pipeline_id => 'test' path => \"#{dlq_dir}\" commit_offsets => true } } filter { mutate { remove_field => [\"geoip\"] add_field => { \"mutated\" => \"true\" } } } output { elasticsearch {} }"
          }
        ]
      end

      it_behaves_like 'it can send 1000 documents to and index from the dlq'
    end

    context 'with a single pipeline' do
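      # One pipeline both produces the failing events and replays its own DLQ.
      # The [geoip] conditional separates replayed events (which have the
      # field and get repaired) from fresh generator events (which get the
      # conflicting field added).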
      let(:pipelines) do
        [
          {
            "pipeline.id" => "main",
            "pipeline.workers" => 1,
            "dead_letter_queue.enable" => true,
            "pipeline.batch.size" => 1,
            "config.string" => "
              input {
                generator { message => '{\"test\":\"one\"}' codec => \"json\" count => 1000 }
                dead_letter_queue { path => \"#{dlq_dir}\" commit_offsets => true }
              }
              filter {
                if ([geoip]) { mutate { remove_field => [\"geoip\"] add_field => { \"mutated\" => \"true\" } } }
                else { mutate { add_field => { \"geoip\" => \"somewhere\" } } }
              }
              output { elasticsearch {} }"
          }
        ]
      end

      it_behaves_like 'it can send 1000 documents to and index from the dlq'
    end
  end

  context 'using logstash.yml and separate config file' do
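    # The pipeline definition comes from the fixture's "root" config template,
    # with the DLQ path interpolated, and is written to a temporary file.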
    let(:generator_config_file) { config_to_temp_file(@fixture.config("root", { :dlq_dir => dlq_dir })) }

    before :each do
      logstash_service.start_background_with_config_settings(generator_config_file, settings_dir)
    end

    it_behaves_like 'it can send 1000 documents to and index from the dlq'
  end
end