mirror of
https://github.com/elastic/logstash.git
synced 2025-06-28 09:46:03 -04:00
* Additional cleanify changes to ls2ls integ tests: replace heartbeat-input with reload option, set queue drain to get consistent result.
107 lines
4.3 KiB
Ruby
107 lines
4.3 KiB
Ruby
# Licensed to Elasticsearch B.V. under one or more contributor
|
|
# license agreements. See the NOTICE file distributed with
|
|
# this work for additional information regarding copyright
|
|
# ownership. Elasticsearch B.V. licenses this file to you under
|
|
# the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
require_relative '../framework/fixture'
|
|
require_relative '../framework/settings'
|
|
require_relative '../framework/helpers'
|
|
require_relative '../services/logstash_service'
|
|
|
|
require 'stud/temporary'
|
|
require 'logstash/devutils/rspec/spec_helper'
|
|
|
|
describe "Logstash to Logstash communication Integration test" do
|
|
|
|
before(:all) {
|
|
@fixture = Fixture.new(__FILE__)
|
|
}
|
|
|
|
after(:all) {
|
|
@fixture.teardown
|
|
}
|
|
|
|
def get_temp_path_dir
|
|
tmp_path = Stud::Temporary.pathname
|
|
tmp_data_path = File.join(tmp_path, "data")
|
|
FileUtils.mkdir_p(tmp_data_path)
|
|
tmp_data_path
|
|
end
|
|
|
|
def run_logstash_instance(config_name, options = {}, &block)
|
|
@next_api_port_offset = (@next_api_port_offset||100).next.modulo(1000) # cycle through 1000 possibles
|
|
api_port = 9600 + @next_api_port_offset
|
|
|
|
# to avoid LogstashService's clean-from-tarball default behaviour, we need
|
|
# to tell it where our LOGSTASH_HOME is in the existing service
|
|
existing_fixture_logstash_home = @fixture.get_service("logstash").logstash_home
|
|
logstash_service = LogstashService.new(@fixture.settings.override("ls_home_abs_path" => existing_fixture_logstash_home), api_port)
|
|
|
|
logstash_service.spawn_logstash("--node.name", config_name,
|
|
"--pipeline.id", config_name,
|
|
"--path.config", config_to_temp_file(@fixture.config(config_name, options)),
|
|
"--path.data", get_temp_path_dir,
|
|
"--api.http.port", api_port.to_s,
|
|
"--config.reload.automatic")
|
|
logstash_service.wait_for_rest_api
|
|
yield logstash_service
|
|
ensure
|
|
logstash_service.teardown
|
|
end
|
|
|
|
let(:num_retries) { 60 }
|
|
let(:num_events) { 1003 }
|
|
let(:config_options) {
|
|
{ :generator_count => num_events }
|
|
}
|
|
|
|
shared_examples "send events" do
|
|
let(:output_file_path_with_datetime) { "#{output_file_path}_#{DateTime.now.new_offset(0).strftime('%Y_%m_%d_%H_%M_%S')}" }
|
|
let(:all_config_options) {
|
|
config_options.merge({ :output_file_path => output_file_path_with_datetime })
|
|
}
|
|
|
|
it "successfully send events" do
|
|
run_logstash_instance(input_config_name, all_config_options) do |downstream_logstash_service|
|
|
run_logstash_instance(output_config_name, all_config_options) do |upstream_logstash_service|
|
|
|
|
try(num_retries) do
|
|
downstream_event_stats = downstream_logstash_service.monitoring_api.event_stats
|
|
expect(downstream_event_stats).to include({"in" => num_events}), lambda { "expected #{num_events} events to have been received by downstream" }
|
|
expect(downstream_event_stats).to include({"out" => num_events}), lambda { "expected #{num_events} events to have been processed by downstream" }
|
|
end
|
|
|
|
# make sure received events are in the file
|
|
file_output_path = File.join(downstream_logstash_service.logstash_home, output_file_path_with_datetime)
|
|
expect(File).to exist(file_output_path), "Logstash to Logstash output file: #{file_output_path} does not exist"
|
|
actual_lines = File.read(file_output_path).lines.to_a
|
|
expected_lines = (0...num_events).map { |sequence| "#{sequence}:Hello world!\n" }
|
|
expect(actual_lines).to match_array(expected_lines)
|
|
|
|
File.delete(file_output_path)
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
context "with baseline configs" do
|
|
let(:input_config_name) { "basic_ls_input" }
|
|
let(:output_config_name) { "basic_ls_output" }
|
|
let(:output_file_path) { "basic_ls_to_ls.output" }
|
|
|
|
include_examples "send events"
|
|
end
|
|
|
|
end
|