mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
DLQ integration test improvements
Added single pipeline tests Fixes #8026
This commit is contained in:
parent
b1b3a117a6
commit
d1aa2864c6
1 changed files with 76 additions and 63 deletions
|
@ -20,8 +20,8 @@ describe "Test Dead Letter Queue" do
|
|||
end
|
||||
}
|
||||
|
||||
let(:logstash_service) { @fixture.get_service("logstash") }
|
||||
let(:dlq_dir) { Stud::Temporary.directory }
|
||||
|
||||
let(:dlq_config) {
|
||||
{
|
||||
"dead_letter_queue.enable" => true,
|
||||
|
@ -29,78 +29,91 @@ describe "Test Dead Letter Queue" do
|
|||
"log.level" => "debug"
|
||||
}
|
||||
}
|
||||
|
||||
let!(:settings_dir) { Stud::Temporary.directory }
|
||||
let!(:mp_settings_dir) { Stud::Temporary.directory }
|
||||
let!(:config_yaml) { dlq_config.to_yaml }
|
||||
let!(:config_yaml_file) { ::File.join(settings_dir, "logstash.yml") }
|
||||
let(:generator_config_file) { config_to_temp_file(@fixture.config("root",{ :dlq_dir => dlq_dir })) }
|
||||
|
||||
let!(:pipelines_yaml) { pipelines.to_yaml }
|
||||
let!(:pipelines_yaml_file) { ::File.join(settings_dir, "pipelines.yml") }
|
||||
let!(:settings_dir) { Stud::Temporary.directory }
|
||||
|
||||
it 'can index 1000 documents via dlq - single pipeline' do
|
||||
logstash_service = @fixture.get_service("logstash")
|
||||
logstash_service.start_background_with_config_settings(generator_config_file, settings_dir)
|
||||
es_service = @fixture.get_service("elasticsearch")
|
||||
es_client = es_service.get_client
|
||||
# Wait for es client to come up
|
||||
sleep(10)
|
||||
# now we test if all data was indexed by ES, but first refresh manually
|
||||
es_client.indices.refresh
|
||||
shared_examples_for "it can send 1000 documents to and index from the dlq" do
|
||||
it 'should index all documents' do
|
||||
es_service = @fixture.get_service("elasticsearch")
|
||||
es_client = es_service.get_client
|
||||
# Wait for es client to come up
|
||||
sleep(15)
|
||||
# test if all data was indexed by ES, but first refresh manually
|
||||
es_client.indices.refresh
|
||||
|
||||
logstash_service.wait_for_logstash
|
||||
try(50) do
|
||||
result = es_client.search(index: 'logstash-*', size: 0, q: '*')
|
||||
expect(result["hits"]["total"]).to eq(1000)
|
||||
logstash_service.wait_for_logstash
|
||||
try(75) do
|
||||
result = es_client.search(index: 'logstash-*', size: 0, q: '*')
|
||||
expect(result["hits"]["total"]).to eq(1000)
|
||||
end
|
||||
|
||||
result = es_client.search(index: 'logstash-*', size: 1, q: '*')
|
||||
s = result["hits"]["hits"][0]["_source"]
|
||||
expect(s["mutated"]).to eq("true")
|
||||
end
|
||||
|
||||
# randomly checked for results and structured fields
|
||||
result = es_client.search(index: 'logstash-*', size: 1, q: '*')
|
||||
s = result["hits"]["hits"][0]["_source"]
|
||||
expect(s["mutated"]).to eq("true")
|
||||
end
|
||||
|
||||
let(:pipelines) {[
|
||||
{
|
||||
"pipeline.id" => "test",
|
||||
"pipeline.workers" => 1,
|
||||
"dead_letter_queue.enable" => true,
|
||||
"pipeline.batch.size" => 1,
|
||||
"config.string" => "input { generator { message => '{\"test\":\"one\"}' codec => \"json\" count => 1000 } } filter { mutate { add_field => { \"geoip\" => \"somewhere\" } } } output { elasticsearch {} }"
|
||||
},
|
||||
{
|
||||
"pipeline.id" => "test2",
|
||||
"pipeline.workers" => 1,
|
||||
"dead_letter_queue.enable" => false,
|
||||
"pipeline.batch.size" => 1,
|
||||
"config.string" => "input { dead_letter_queue { pipeline_id => 'test' path => \"#{dlq_dir}\" commit_offsets => true } } filter { mutate { remove_field => [\"geoip\"] add_field => {\"mutated\" => \"true\" } } } output { elasticsearch {} }"
|
||||
}
|
||||
]}
|
||||
context 'using pipelines.yml' do
|
||||
let!(:pipelines_yaml) { pipelines.to_yaml }
|
||||
let!(:pipelines_yaml_file) { ::File.join(settings_dir, "pipelines.yml") }
|
||||
|
||||
let!(:pipelines_yaml) { pipelines.to_yaml }
|
||||
let!(:pipelines_yaml_file) { ::File.join(settings_dir, "pipelines.yml") }
|
||||
|
||||
|
||||
it 'can index 1000 documents via dlq - multi pipeline' do
|
||||
IO.write(pipelines_yaml_file, pipelines_yaml)
|
||||
logstash_service = @fixture.get_service("logstash")
|
||||
logstash_service.spawn_logstash("--path.settings", settings_dir, "--log.level=debug")
|
||||
es_service = @fixture.get_service("elasticsearch")
|
||||
es_client = es_service.get_client
|
||||
# Wait for es client to come up
|
||||
sleep(10)
|
||||
# test if all data was indexed by ES, but first refresh manually
|
||||
es_client.indices.refresh
|
||||
|
||||
logstash_service.wait_for_logstash
|
||||
try(50) do
|
||||
result = es_client.search(index: 'logstash-*', size: 0, q: '*')
|
||||
expect(result["hits"]["total"]).to eq(1000)
|
||||
before :each do
|
||||
IO.write(pipelines_yaml_file, pipelines_yaml)
|
||||
logstash_service.spawn_logstash("--path.settings", settings_dir, "--log.level=debug")
|
||||
end
|
||||
|
||||
result = es_client.search(index: 'logstash-*', size: 1, q: '*')
|
||||
s = result["hits"]["hits"][0]["_source"]
|
||||
expect(s["mutated"]).to eq("true")
|
||||
context 'with multiple pipelines' do
|
||||
let(:pipelines) {[
|
||||
{
|
||||
"pipeline.id" => "test",
|
||||
"pipeline.workers" => 1,
|
||||
"dead_letter_queue.enable" => true,
|
||||
"pipeline.batch.size" => 1,
|
||||
"config.string" => "input { generator { message => '{\"test\":\"one\"}' codec => \"json\" count => 1000 } } filter { mutate { add_field => { \"geoip\" => \"somewhere\" } } } output { elasticsearch {} }"
|
||||
},
|
||||
{
|
||||
"pipeline.id" => "test2",
|
||||
"pipeline.workers" => 1,
|
||||
"dead_letter_queue.enable" => false,
|
||||
"pipeline.batch.size" => 1,
|
||||
"config.string" => "input { dead_letter_queue { pipeline_id => 'test' path => \"#{dlq_dir}\" commit_offsets => true } } filter { mutate { remove_field => [\"geoip\"] add_field => {\"mutated\" => \"true\" } } } output { elasticsearch {} }"
|
||||
}
|
||||
]}
|
||||
|
||||
it_behaves_like 'it can send 1000 documents to and index from the dlq'
|
||||
end
|
||||
|
||||
context 'with a single pipeline' do
|
||||
let(:pipelines) {[
|
||||
{
|
||||
"pipeline.id" => "main",
|
||||
"pipeline.workers" => 1,
|
||||
"dead_letter_queue.enable" => true,
|
||||
"pipeline.batch.size" => 1,
|
||||
"config.string" => "
|
||||
input { generator{ message => '{\"test\":\"one\"}' codec => \"json\" count => 1000 }
|
||||
dead_letter_queue { path => \"#{dlq_dir}\" commit_offsets => true }
|
||||
}
|
||||
filter {
|
||||
if ([geoip]) { mutate { remove_field => [\"geoip\"] add_field => { \"mutated\" => \"true\" } } }
|
||||
else{ mutate { add_field => { \"geoip\" => \"somewhere\" } } }
|
||||
}
|
||||
output { elasticsearch {} }"
|
||||
}
|
||||
]}
|
||||
|
||||
it_behaves_like 'it can send 1000 documents to and index from the dlq'
|
||||
end
|
||||
end
|
||||
|
||||
context 'using logstash.yml and separate config file' do
|
||||
let(:generator_config_file) { config_to_temp_file(@fixture.config("root",{ :dlq_dir => dlq_dir })) }
|
||||
|
||||
before :each do
|
||||
logstash_service.start_background_with_config_settings(generator_config_file, settings_dir)
|
||||
end
|
||||
it_behaves_like 'it can send 1000 documents to and index from the dlq'
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue