mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
Disabled: * EmptyLineAfterGuardClause * EmptyLineAfterMultilineCondition * EmptyLinesAroundAccessModifier Enabled: * Layout/EmptyLineAfterMagicComment * Layout/EmptyLineBetweenDefs * Layout/EmptyLines * Layout/EmptyLinesAroundArguments * Layout/EmptyLinesAroundAttributeAccessor * Layout/EmptyLinesAroundBeginBody * Layout/EmptyLinesAroundBlockBody * Layout/EmptyLinesAroundExceptionHandlingKeywords * Layout/EmptyLinesAroundMethodBody * Layout/EmptyLinesAroundModuleBody
156 lines
5.3 KiB
Ruby
156 lines
5.3 KiB
Ruby
# Licensed to Elasticsearch B.V. under one or more contributor
|
|
# license agreements. See the NOTICE file distributed with
|
|
# this work for additional information regarding copyright
|
|
# ownership. Elasticsearch B.V. licenses this file to you under
|
|
# the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
require_relative '../framework/fixture'
|
|
require_relative '../framework/settings'
|
|
require_relative '../services/logstash_service'
|
|
require_relative '../framework/helpers'
|
|
require "logstash/devutils/rspec/spec_helper"
|
|
require "socket"
|
|
require "yaml"
|
|
|
|
describe "Test Logstash service when multiple pipelines are used" do
|
|
before(:all) {
|
|
@fixture = Fixture.new(__FILE__)
|
|
}
|
|
|
|
after(:all) {
|
|
@fixture.teardown
|
|
}
|
|
|
|
let(:temporary_out_file_1) { Stud::Temporary.pathname }
|
|
let(:temporary_out_file_2) { Stud::Temporary.pathname }
|
|
|
|
let(:pipelines) {[
|
|
{
|
|
"pipeline.id" => "test",
|
|
"pipeline.workers" => 1,
|
|
"pipeline.batch.size" => 1,
|
|
"config.string" => "input { generator { count => 1 } } output { file { path => \"#{temporary_out_file_1}\" } }"
|
|
},
|
|
{
|
|
"pipeline.id" => "test2",
|
|
"pipeline.workers" => 1,
|
|
"pipeline.batch.size" => 1,
|
|
"config.string" => "input { generator { count => 1 } } output { file { path => \"#{temporary_out_file_2}\" } }"
|
|
}
|
|
]}
|
|
|
|
let!(:settings_dir) { Stud::Temporary.directory }
|
|
let!(:pipelines_yaml) { pipelines.to_yaml }
|
|
let!(:pipelines_yaml_file) { ::File.join(settings_dir, "pipelines.yml") }
|
|
|
|
let(:retry_attempts) { 40 }
|
|
|
|
let(:pipelines_yaml_file_permissions) { 0644 }
|
|
|
|
before(:each) do
|
|
IO.write(pipelines_yaml_file, pipelines_yaml)
|
|
File.chmod(pipelines_yaml_file_permissions, pipelines_yaml_file)
|
|
end
|
|
|
|
it "executes the multiple pipelines" do
|
|
logstash_service = @fixture.get_service("logstash")
|
|
logstash_service.spawn_logstash("--path.settings", settings_dir, "--log.level=debug")
|
|
try(retry_attempts) do
|
|
expect(logstash_service.exited?).to be(true)
|
|
end
|
|
expect(logstash_service.exit_code).to eq(0)
|
|
expect(File.exist?(temporary_out_file_1)).to be(true)
|
|
expect(IO.readlines(temporary_out_file_1).size).to eq(1)
|
|
expect(File.exist?(temporary_out_file_2)).to be(true)
|
|
expect(IO.readlines(temporary_out_file_2).size).to eq(1)
|
|
end
|
|
|
|
context 'effectively-empty pipelines.yml file' do
|
|
let!(:pipelines_yaml) do
|
|
<<~EOYAML
|
|
# this yaml file contains
|
|
# only comments and
|
|
# is effectively empty
|
|
EOYAML
|
|
end
|
|
|
|
it 'exits with helpful guidance' do
|
|
logstash_service = @fixture.get_service("logstash")
|
|
status = logstash_service.run('--path.settings', settings_dir, '--log.level=debug')
|
|
expect(status.exit_code).to_not be_zero
|
|
expect(status.stderr_and_stdout).to include('Pipelines YAML file is empty')
|
|
end
|
|
end
|
|
|
|
context 'unreadable pipelines.yml file' do
|
|
let(:pipelines_yaml_file_permissions) { 000 }
|
|
|
|
it 'exits with helpful guidance' do
|
|
logstash_service = @fixture.get_service("logstash")
|
|
status = logstash_service.run('--path.settings', settings_dir, '--log.level=debug')
|
|
expect(status.exit_code).to_not be_zero
|
|
expect(status.stderr_and_stdout).to include('Failed to read pipelines yaml file', 'Permission denied')
|
|
end
|
|
end
|
|
|
|
context 'readable pipelines.yml with invalid YAML contents' do
|
|
let!(:pipelines_yaml) do
|
|
<<~EOYAML
|
|
- pipeline.id: my_id
|
|
pipeline.workers: 1
|
|
# note: indentation not aligned will cause YAML parse error
|
|
pipeline.ordered: true
|
|
EOYAML
|
|
end
|
|
|
|
it 'exits with helpful guidance' do
|
|
logstash_service = @fixture.get_service("logstash")
|
|
status = logstash_service.run('--path.settings', settings_dir, '--log.level=debug')
|
|
expect(status.exit_code).to_not be_zero
|
|
expect(status.stderr_and_stdout).to include('Failed to parse contents of pipelines yaml file', 'SyntaxError:')
|
|
end
|
|
end
|
|
|
|
describe "inter-pipeline communication" do
|
|
let(:count) { 2 }
|
|
let(:pipelines) do
|
|
[
|
|
{
|
|
"pipeline.id" => "test",
|
|
"config.string" => "input { generator { count => #{count} } } output { pipeline { send_to => testaddr } }"
|
|
},
|
|
{
|
|
"pipeline.id" => "test2",
|
|
"config.string" => "input { pipeline { address => testaddr } } output { file { path => \"#{temporary_out_file_1}\" flush_interval => 0} }"
|
|
}
|
|
]
|
|
end
|
|
it "can communicate between pipelines" do
|
|
logstash_service = @fixture.get_service("logstash")
|
|
logstash_service.spawn_logstash("--path.settings", settings_dir, "--log.level=debug")
|
|
logstash_service.wait_for_logstash
|
|
|
|
# Wait for LS to come up
|
|
i = 0
|
|
until File.exist?(temporary_out_file_1) && IO.readlines(temporary_out_file_1).size >= count
|
|
i += 1
|
|
sleep 1
|
|
break if i > 30
|
|
end
|
|
expect(IO.readlines(temporary_out_file_1).size).to eq(count)
|
|
|
|
puts "Done"
|
|
end
|
|
end
|
|
end
|