buildkite serverless test (#15150)

This commit adds a Buildkite pipeline that runs tests against a serverless Elasticsearch endpoint daily

Tests cover:
- es-output
- es-input
- es-filter
- central pipeline management
- legacy monitoring
- dlq
- integration-filter
- kibana API
- metricbeat stack monitoring
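For reference, each step below is self-contained; a sketch of running one suite locally (assumes access to the Vault path used by common.sh):

./.buildkite/scripts/setup_java.sh   # install OpenJDK 17 via apt
./ci/serverless/es_output_tests.sh   # build Logstash, run the checks, exit non-zero on failure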

Co-authored-by: João Duarte <jsvd@users.noreply.github.com>
Co-authored-by: João Duarte <jsvduarte@gmail.com>
kaisecheng 2023-07-19 17:21:53 +01:00 committed by GitHub
parent 2165d43e1a
commit 0f8695593e
28 changed files with 733 additions and 7 deletions

.buildkite/scripts/setup_java.sh Executable file

@@ -0,0 +1,10 @@
#!/bin/bash
set -e
install_java() {
# TODO: let's think about regularly creating a custom image for Logstash which may align on version.yml definitions
sudo apt update && sudo apt install -y openjdk-17-jdk && sudo apt install -y openjdk-17-jre
}
install_java


@@ -1,3 +1,24 @@
agents:
  provider: "gcp"
  machineType: "n1-standard-4"
  image: family/core-ubuntu-2204
steps:
  - label: "Elasticsearch output test"
    command: ./ci/serverless/serverless_core_rspec_tests.sh
  - label: "DLQ rspec integration test"
    command: ./.buildkite/scripts/setup_java.sh && ./ci/serverless/dlq_rspec_tests.sh
  - label: "es-output test"
    command: ./.buildkite/scripts/setup_java.sh && ./ci/serverless/es_output_tests.sh
  - label: "es-input test"
    command: ./.buildkite/scripts/setup_java.sh && ./ci/serverless/es_input_tests.sh
  - label: "es-filter test"
    command: ./.buildkite/scripts/setup_java.sh && ./ci/serverless/es_filter_tests.sh
  - label: "elastic_integration filter test"
    command: ./.buildkite/scripts/setup_java.sh && ./ci/serverless/elastic_integration_filter_tests.sh
  - label: "central pipeline management test"
    command: ./.buildkite/scripts/setup_java.sh && ./ci/serverless/cpm_tests.sh
  - label: "Logstash legacy monitoring test"
    command: ./.buildkite/scripts/setup_java.sh && ./ci/serverless/monitoring_tests.sh
  - label: "Kibana API test"
    command: ./.buildkite/scripts/setup_java.sh && ./ci/serverless/kibana_api_tests.sh
  - label: "metricbeat stack monitoring test"
    command: ./.buildkite/scripts/setup_java.sh && ./ci/serverless/metricbeat_monitoring_tests.sh

3
.gitignore vendored

@@ -62,4 +62,5 @@ lib/pluginmanager/plugin_aliases.yml
logstash-core/src/main/resources/org/logstash/plugins/plugin_aliases.yml
spec/unit/plugin_manager/plugin_aliases.yml
logstash-core/src/test/resources/org/logstash/plugins/plugin_aliases.yml
qa/integration/fixtures/logs_rollover/log4j2.properties
ci/serverless/config/*.log

151
ci/serverless/common.sh Executable file

@@ -0,0 +1,151 @@
#!/usr/bin/env bash
set -ex
export CURRENT_DIR="$(dirname "$0")"
export INDEX_NAME="serverless_it_${BUILDKITE_BUILD_NUMBER:-`date +%s`}"
# store all error messages
export ERR_MSGS=()
# numeric values representing the results of the checks. 0: pass, >0: fail
export CHECKS=()
setup_vault() {
vault_path=secret/ci/elastic-logstash/serverless-test
set +x
export ES_ENDPOINT=$(vault read -field=es_host "${vault_path}")
export ES_USER=$(vault read -field=es_user "${vault_path}")
export ES_PW=$(vault read -field=es_user_pw "${vault_path}")
export KB_ENDPOINT=$(vault read -field=kb_host "${vault_path}")
set -x
}
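# build Logstash from source so bin/logstash and the plugin manager are runnable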
build_logstash() {
./gradlew clean bootstrap assemble installDefaultGems
}
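# bulk-index the sample books from test_data/book.json into $INDEX_NAME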
index_test_data() {
curl -X POST -u "$ES_USER:$ES_PW" "$ES_ENDPOINT/$INDEX_NAME/_bulk" -H 'Content-Type: application/json' --data-binary @"$CURRENT_DIR/test_data/book.json"
}
# $1: check function
run_cpm_logstash() {
# copy log4j
cp "$CURRENT_DIR/../../config/log4j2.properties" "$CURRENT_DIR/config/log4j2.properties"
# run logstash
$CURRENT_DIR/../../bin/logstash --path.settings "$CURRENT_DIR/config" 2>/dev/null &
export LS_PID=$!
check_logstash_readiness
$1 # check function
kill "$LS_PID" || true
}
# $1: pipeline file
# $2: check function
# run_logstash 001_es-output.conf check_es_output
run_logstash() {
$CURRENT_DIR/../../bin/logstash -f "$1" 2>/dev/null &
export LS_PID=$!
check_logstash_readiness
$2 # check function
kill "$LS_PID" || true
}
# $1: number of tries
# $2..$n: check function with args - returns a non-empty string on pass
count_down_check() {
count=$1
while ! [[ $("${@:2}") ]] && [[ $count -gt 0 ]]; do
count=$(( count - 1 ))
sleep 1
done
[[ $count -eq 0 ]] && echo "1" && return
echo "Passed check!"
echo "0"
}
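# wait up to 120 seconds for the Logstash API on localhost:9600; exit 1 if it never becomes ready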
check_logstash_readiness() {
curl_logstash() {
[[ $(curl --silent localhost:9600) ]] && echo "0"
}
check_readiness() {
count_down_check 120 curl_logstash
}
add_check check_readiness "Failed readiness check."
[[ "${CHECKS[-1]}" -eq "1" ]] && exit 1
echo "Logstash is Up !"
return 0
}
# $1: jq filter
# $2: expected value
# check_logstash_api '.pipelines.main.plugins.outputs[0].documents.successes' '1'
check_logstash_api() {
curl_node_stats() {
[[ $(curl --silent localhost:9600/_node/stats | jq "$1") -ge "$2" ]] && echo "0"
}
count_down_check 30 curl_node_stats "$1" "$2"
}
# add check result to CHECKS
# $1: check function - the last character of its output is expected to be 0 or a positive number
# $2: err msg
add_check() {
FEATURE_CHECK=$($1)
FEATURE_CHECK="${FEATURE_CHECK: -1}"
ERR_MSGS+=("$2")
CHECKS+=("$FEATURE_CHECK")
}
# check the log for [ERROR] or [FATAL] lines that are not connectivity-related ("unreachable", "Connection refused")
check_err_log() {
LOG_FILE="$CURRENT_DIR/../../logs/logstash-plain.log"
LOG_CHECK=$(grep -E "\[ERROR\]|\[FATAL\]" "$LOG_FILE" | grep -cvE "unreachable|Connection refused") || true
ERR_MSGS+=("Found error in log")
CHECKS+=("$LOG_CHECK")
}
# if CHECKS[i] is 1, print ERR_MSGS[i]
print_result() {
for i in "${!CHECKS[@]}"; do
[[ "${CHECKS[$i]}" -gt 0 ]] && echo "${ERR_MSGS[$i]}" || true
done
}
# exit 1 if one of the checks fails
exit_with_code() {
for c in "${CHECKS[@]}"; do
[[ $c -gt 0 ]] && exit 1
done
exit 0
}
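# stop Logstash, scan the log for errors, print any failure messages and exit with the aggregate result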
clean_up_and_get_result() {
[[ -n "$LS_PID" ]] && kill "$LS_PID" || true
check_err_log
print_result
exit_with_code
}
# common setup
setup() {
setup_vault
build_logstash
trap clean_up_and_get_result INT TERM EXIT
}

ci/serverless/config/logstash.yml

@@ -0,0 +1,10 @@
xpack.management.enabled: true
xpack.management.pipeline.id: ["gen_es"]
xpack.management.elasticsearch.username: ${ES_USER}
xpack.management.elasticsearch.password: ${ES_PW}
xpack.management.elasticsearch.hosts: ["${ES_ENDPOINT}"]
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.username: ${ES_USER}
xpack.monitoring.elasticsearch.password: ${ES_PW}
xpack.monitoring.elasticsearch.hosts: ["${ES_ENDPOINT}"]

48
ci/serverless/cpm_tests.sh Executable file

@@ -0,0 +1,48 @@
#!/usr/bin/env bash
set -ex
source ./$(dirname "$0")/common.sh
export PIPELINE_NAME='gen_es'
# update pipeline and check response code
index_pipeline() {
RESP_CODE=$(curl -s -w "%{http_code}" -X PUT -u "$ES_USER:$ES_PW" "$ES_ENDPOINT/_logstash/pipeline/$1" -H 'Content-Type: application/json' -d "$2")
if [[ $RESP_CODE -ge '400' ]]; then
echo "failed to update pipeline for Central Pipeline Management. Got $RESP_CODE from Elasticsearch"
exit 1
fi
}
# index pipeline to serverless ES
index_cpm_pipelines() {
index_pipeline "$PIPELINE_NAME" '{
"pipeline": "input { generator { count => 100 } } output { elasticsearch { hosts => \"${ES_ENDPOINT}\" user => \"${ES_USER}\" password => \"${ES_PW}\" index=> \"${INDEX_NAME}\" } }",
"last_modified": "2023-07-04T22:22:22.222Z",
"pipeline_metadata": { "version": "1"},
"username": "log.stash",
"pipeline_settings": {"pipeline.batch.delay": "50"}
}'
}
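# the managed pipeline generates 100 events, so expect 100 successful writes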
check_es_output() {
check_logstash_api '.pipelines.gen_es.plugins.outputs[0].documents.successes' '100'
}
check_plugin() {
add_check check_es_output "Failed central pipeline management check."
}
delete_pipeline() {
curl -u "$ES_USER:$ES_PW" -X DELETE "$ES_ENDPOINT/_logstash/pipeline/$PIPELINE_NAME" -H 'Content-Type: application/json';
}
cpm_clean_up_and_get_result() {
delete_pipeline
clean_up_and_get_result
}
setup
trap cpm_clean_up_and_get_result INT TERM EXIT
index_cpm_pipelines
run_cpm_logstash check_plugin

ci/serverless/dlq_rspec_tests.sh Executable file

@@ -0,0 +1,22 @@
#!/usr/bin/env bash
set -ex
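# standalone script: it does not source common.sh, so read the serverless credentials from Vault directly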
vault_path=secret/ci/elastic-logstash/serverless-test
export JRUBY_OPTS="-J-Xmx1g"
export SERVERLESS=true
set +x
export ES_ENDPOINT=$(vault read -field=es_host "${vault_path}")
export ES_USER=$(vault read -field=es_user "${vault_path}")
export ES_PW=$(vault read -field=es_user_pw "${vault_path}")
set -x
./gradlew clean bootstrap assemble installDefaultGems unpackTarDistribution
./gradlew :logstash-core:copyGemjar
export GEM_PATH=vendor/bundle/jruby/3.1.0
export GEM_HOME=vendor/bundle/jruby/3.1.0
vendor/jruby/bin/jruby -S bundle install --with development
vendor/jruby/bin/jruby -S bundle exec rspec -fd qa/integration/specs/dlq_spec.rb -e "using pipelines.yml"

ci/serverless/elastic_integration_filter_tests.sh Executable file

@@ -0,0 +1,54 @@
#!/usr/bin/env bash
set -ex
source ./$(dirname "$0")/common.sh
deploy_ingest_pipeline() {
PIPELINE_RESP_CODE=$(curl -s -w "%{http_code}" -o /dev/null -X PUT -u "$ES_USER:$ES_PW" "$ES_ENDPOINT/_ingest/pipeline/integration-logstash_test.events-default" \
-H 'Content-Type: application/json' \
--data-binary @"$CURRENT_DIR/test_data/ingest_pipeline.json")
TEMPLATE_RESP_CODE=$(curl -s -w "%{http_code}" -o /dev/null -X PUT -u "$ES_USER:$ES_PW" "$ES_ENDPOINT/_index_template/logs-serverless-default-template" \
-H 'Content-Type: application/json' \
--data-binary @"$CURRENT_DIR/test_data/index_template.json")
# the ingest pipeline is likely already there from the last run
# failing to update pipeline does not stop the test
if [[ $PIPELINE_RESP_CODE -ge '400' ]]; then
ERR_MSGS+=("Failed to update ingest pipeline. Got $PIPELINE_RESP_CODE")
fi
if [[ $TEMPLATE_RESP_CODE -ge '400' ]]; then
ERR_MSGS+=("Failed to update index template. Got $TEMPLATE_RESP_CODE")
fi
}
# processor should append 'serverless' to message
check_integration_filter() {
check_logstash_api '.pipelines.main.plugins.filters[] | select(.id == "mutate1") | .events.out' '1'
}
get_doc_msg_length() {
curl -s -u "$ES_USER:$ES_PW" "$ES_ENDPOINT/logs-$INDEX_NAME.004-default/_search?size=1" | jq '.hits.hits[0]._source.message | length'
}
# ensure no double run of ingest pipeline
# message = ['ok', 'serverless*']
validate_ds_doc() {
[[ $(get_doc_msg_length) -eq "2" ]] && echo "0"
}
check_doc_no_duplication() {
count_down_check 20 validate_ds_doc
}
check_plugin() {
add_check check_integration_filter "Failed ingest pipeline processor check."
add_check check_doc_no_duplication "Failed ingest pipeline duplication check."
}
setup
# install plugin
"$CURRENT_DIR/../../bin/logstash-plugin" install logstash-filter-elastic_integration
deploy_ingest_pipeline
run_logstash "$CURRENT_DIR/pipeline/004_integration-filter.conf" check_plugin

ci/serverless/es_filter_tests.sh Executable file

@@ -0,0 +1,16 @@
#!/usr/bin/env bash
set -ex
source ./$(dirname "$0")/common.sh
check_es_filter() {
check_logstash_api '.pipelines.main.plugins.filters[] | select(.id == "ok") | .events.out' '1'
}
check_plugin() {
add_check check_es_filter "Failed es-filter check."
}
setup
index_test_data
run_logstash "$CURRENT_DIR/pipeline/002_es-filter.conf" check_plugin

16
ci/serverless/es_input_tests.sh Executable file

@@ -0,0 +1,16 @@
#!/usr/bin/env bash
set -ex
source ./$(dirname "$0")/common.sh
check_es_input() {
check_logstash_api '.pipelines.main.plugins.inputs[0].events.out' '1'
}
check_plugin() {
add_check check_es_input "Failed es-input check."
}
setup
index_test_data
run_logstash "$CURRENT_DIR/pipeline/003_es-input.conf" check_plugin

ci/serverless/es_output_tests.sh Executable file

@@ -0,0 +1,30 @@
#!/usr/bin/env bash
set -ex
source ./$(dirname "$0")/common.sh
check_named_index() {
check_logstash_api '.pipelines.main.plugins.outputs[] | select(.id == "named_index") | .documents.successes' '1'
}
get_data_stream_count() {
curl -s -u "$ES_USER:$ES_PW" "$ES_ENDPOINT/logs-$INDEX_NAME.001-default/_count" | jq '.count'
}
compare_data_stream_count() {
[[ $(get_data_stream_count) -ge "$INITIAL_DATA_STREAM_CNT" ]] && echo "0"
}
check_data_stream_output() {
count_down_check 20 compare_data_stream_count
}
check_plugin() {
add_check check_named_index "Failed index check."
add_check check_data_stream_output "Failed data stream check."
}
setup
export INITIAL_DATA_STREAM_CNT=$(get_data_stream_count)
run_logstash "$CURRENT_DIR/pipeline/001_es-output.conf" check_plugin

ci/serverless/kibana_api_tests.sh Executable file

@@ -0,0 +1,61 @@
#!/usr/bin/env bash
set -ex
source ./$(dirname "$0")/common.sh
export PIPELINE_NAME="stdin_stdout"
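# each failed API call increments EXIT_CODE; the script exits non-zero if any step failed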
export EXIT_CODE="0"
create_pipeline() {
RESP_CODE=$(curl -s -w "%{http_code}" -o /dev/null -X PUT -u "$ES_USER:$ES_PW" "$KB_ENDPOINT/api/logstash/pipeline/$PIPELINE_NAME" \
-H 'Content-Type: application/json' -H 'kbn-xsrf: logstash' \
--data-binary @"$CURRENT_DIR/test_data/$PIPELINE_NAME.json")
if [[ $RESP_CODE -ge '400' ]]; then
EXIT_CODE=$(( EXIT_CODE + 1 ))
echo "Failed to create pipeline."
fi
}
get_pipeline() {
RESP_BODY=$(curl -s -X GET -u "$ES_USER:$ES_PW" "$KB_ENDPOINT/api/logstash/pipeline/$PIPELINE_NAME")
SOURCE_BODY=$(cat "$CURRENT_DIR/test_data/$PIPELINE_NAME.json")
if [[ $(echo "$RESP_BODY" | jq -r '.id') != "$PIPELINE_NAME" ]] ||\
[[ $(echo "$RESP_BODY" | jq -r '.pipeline') != "$(echo "$SOURCE_BODY" | jq -r '.pipeline')" ]] ||\
[[ $(echo "$RESP_BODY" | jq -r '.settings') != "$(echo "$SOURCE_BODY" | jq -r '.settings')" ]]; then
EXIT_CODE=$(( EXIT_CODE + 1 ))
echo "Failed to get pipeline."
fi
}
list_pipeline() {
RESP_BODY=$(curl -s -X GET -u "$ES_USER:$ES_PW" "$KB_ENDPOINT/api/logstash/pipelines" | jq --arg name "$PIPELINE_NAME" '.pipelines[] | select(.id==$name)' )
if [[ -z "$RESP_BODY" ]]; then
EXIT_CODE=$(( EXIT_CODE + 1 ))
echo "Fail to list pipeline."
fi
}
delete_pipeline() {
RESP_CODE=$(curl -s -w "%{http_code}" -o /dev/null -X DELETE -u "$ES_USER:$ES_PW" "$KB_ENDPOINT/api/logstash/pipeline/$PIPELINE_NAME" \
-H 'Content-Type: application/json' -H 'kbn-xsrf: logstash' \
--data-binary @"$CURRENT_DIR/test_data/$PIPELINE_NAME.json")
if [[ $RESP_CODE -ge '400' ]]; then
EXIT_CODE=$(( EXIT_CODE + 1 ))
echo "Failed to delete pipeline."
fi
}
setup_vault
create_pipeline
get_pipeline
list_pipeline
delete_pipeline
exit $EXIT_CODE

ci/serverless/metricbeat/metricbeat.yml

@@ -0,0 +1,20 @@
metricbeat.config:
  modules:
    path: ${path.config}/modules.d/*.yml
    reload.enabled: false
output.elasticsearch:
  hosts: ["${ES_ENDPOINT}"]
  protocol: "https"
  username: "${ES_USER}"
  password: "${ES_PW}"
metricbeat.modules:
  - module: logstash
    metricsets:
      - node
      - node_stats
    period: 10s
    hosts:
      - localhost:9600
    xpack.enabled: true

ci/serverless/metricbeat_monitoring_tests.sh Executable file

@@ -0,0 +1,73 @@
#!/usr/bin/env bash
set -ex
source ./$(dirname "$0")/common.sh
get_cpu_arch() {
local arch=$(uname -m)
if [ "$arch" == "aarch64" ]; then
echo "arm64"
else
echo "$arch"
fi
}
export INDEX_NAME=".monitoring-logstash-8-mb"
export OS=$(uname -s | tr '[:upper:]' '[:lower:]')
export ARCH=$(get_cpu_arch)
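# resolve the newest Beats tag from GitHub (strip the leading 'v') so the Metricbeat download matches the latest release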
export BEATS_VERSION=$(curl -s "https://api.github.com/repos/elastic/beats/tags" | jq -r '.[0].name' | cut -c 2-)
start_metricbeat() {
cd "$CURRENT_DIR"
MB_FILENAME="metricbeat-$BEATS_VERSION-$OS-$ARCH"
MB_DL_URL="https://artifacts.elastic.co/downloads/beats/metricbeat/$MB_FILENAME.tar.gz"
if [[ ! -d "$MB_FILENAME" ]]; then
curl -o "$MB_FILENAME.tar.gz" "$MB_DL_URL"
tar -zxf "$MB_FILENAME.tar.gz"
fi
chmod go-w "metricbeat/metricbeat.yml"
"$MB_FILENAME/metricbeat" -c "metricbeat/metricbeat.yml" &
export MB_PID=$!
cd -
}
stop_metricbeat() {
[[ -n "$MB_PID" ]] && kill "$MB_PID" || true
}
get_monitor_count() {
curl -s -u "$ES_USER:$ES_PW" "$ES_ENDPOINT/$INDEX_NAME/_count" | jq '.count'
}
compare_monitor_count() {
[[ $(get_monitor_count) -gt "$INITIAL_MONITOR_CNT" ]] && echo "0"
}
check_monitor_output() {
count_down_check 60 compare_monitor_count
}
check_plugin() {
add_check check_monitor_output "Failed metricbeat monitor check."
}
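# capture the exit status of the failing command so unexpected errors surface alongside the check results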
metricbeat_clean_up() {
exit_code=$?
ERR_MSGS+=("Unknown error!")
CHECKS+=("$exit_code")
stop_metricbeat
clean_up_and_get_result
}
setup
trap metricbeat_clean_up INT TERM EXIT
export INITIAL_MONITOR_CNT=$(get_monitor_count)
start_metricbeat
run_logstash "$CURRENT_DIR/pipeline/005_uptime.conf" check_plugin

ci/serverless/monitoring_tests.sh Executable file

@@ -0,0 +1,24 @@
#!/usr/bin/env bash
set -ex
source ./$(dirname "$0")/common.sh
get_monitor_count() {
curl -s -u "$ES_USER:$ES_PW" "$ES_ENDPOINT/.monitoring-logstash-7-*/_count" | jq '.count'
}
compare_monitor_count() {
[[ $(get_monitor_count) -gt "$INITIAL_MONITOR_CNT" ]] && echo "0"
}
check_monitor() {
count_down_check 20 compare_monitor_count
}
check() {
add_check check_monitor "Failed monitor check."
}
setup
export INITIAL_MONITOR_CNT=$(get_monitor_count)
run_cpm_logstash check

ci/serverless/pipeline/001_es-output.conf

@@ -0,0 +1,27 @@
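# exercises both elasticsearch output modes: a named index and a logs-* data stream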
input {
heartbeat {
interval => 1
add_field => {
"[data_stream][type]" => "logs"
"[data_stream][dataset]" => "${INDEX_NAME}.001"
"[data_stream][namespace]" => "default"
}
}
}
output {
elasticsearch {
id => "named_index"
hosts => ["${ES_ENDPOINT}"]
user => "${ES_USER}"
password => "${ES_PW}"
index => "${INDEX_NAME}"
}
elasticsearch {
id => "data_stream"
hosts => ["${ES_ENDPOINT}"]
user => "${ES_USER}"
password => "${ES_PW}"
}
}

ci/serverless/pipeline/002_es-filter.conf

@@ -0,0 +1,26 @@
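# the elasticsearch filter enriches each heartbeat event; the mutate with id 'ok' runs only when the lookup succeeded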
input {
heartbeat {
interval => 1
}
}
filter {
elasticsearch {
hosts => ["${ES_ENDPOINT}"]
user => "${ES_USER}"
password => "${ES_PW}"
index => "${INDEX_NAME}"
query => "*"
add_field => {"check" => "good"}
}
if [check] == "good" {
mutate { id => "ok" }
}
}
output {
stdout {
codec => dots
}
}

ci/serverless/pipeline/003_es-input.conf

@@ -0,0 +1,15 @@
input {
elasticsearch {
hosts => ["${ES_ENDPOINT}"]
user => "${ES_USER}"
password => "${ES_PW}"
index => "${INDEX_NAME}"
size => 100
schedule => "*/10 * * * * *"
}
}
output {
stdout {
codec => dots
}
}

ci/serverless/pipeline/004_integration-filter.conf

@@ -0,0 +1,35 @@
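# elastic_integration applies the matching ES ingest pipeline inside Logstash;
# 'mutate1' fires only when the ingest processor appended the 'serverless' message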
input {
heartbeat {
interval => 1
add_field => {
"[data_stream][type]" => "logs"
"[data_stream][dataset]" => "${INDEX_NAME}.004"
"[data_stream][namespace]" => "default"
}
}
}
filter {
elastic_integration {
hosts => "${ES_ENDPOINT}"
username => "${ES_USER}"
password => "${ES_PW}"
remove_field => ["_version"]
add_field => {"ingested" => "ok"}
}
if ([ingested] == "ok") and ([message][1] =~ 'serverless' ) {
mutate { id => "mutate1" }
}
}
output {
stdout {
codec => dots
}
elasticsearch {
id => "data_stream"
hosts => ["${ES_ENDPOINT}"]
user => "${ES_USER}"
password => "${ES_PW}"
}
}

ci/serverless/pipeline/005_uptime.conf

@@ -0,0 +1,8 @@
input {
exec { command => 'uptime' interval => 10 }
}
output {
stdout {
codec => dots
}
}

ci/serverless/test_data/book.json

@@ -0,0 +1,14 @@
{"index": {}}
{"book_name": "The Great Gatsby", "author": "F. Scott Fitzgerald", "@timestamp": "1925-04-10T00:00:00"}
{"index": {}}
{"book_name": "To Kill a Mockingbird", "author": "Harper Lee", "@timestamp": "1960-07-11T00:00:00"}
{"index": {}}
{"book_name": "1984", "author": "George Orwell", "@timestamp": "1949-06-08T00:00:00"}
{"index": {}}
{"book_name": "Pride and Prejudice", "author": "Jane Austen", "@timestamp": "1813-01-28T00:00:00"}
{"index": {}}
{"book_name": "The Catcher in the Rye", "author": "J.D. Salinger", "@timestamp": "1951-07-16T00:00:00"}
{"index": {}}
{"book_name": "Moby Dick", "author": "Herman Melville", "@timestamp": "1851-10-18T00:00:00"}
{"index": {}}
{"book_name": "The Lord of the Rings", "author": "J.R.R. Tolkien", "@timestamp": "1954-07-29T00:00:00"}

ci/serverless/test_data/index_template.json

@@ -0,0 +1,11 @@
{
"index_patterns": ["logs-serverless*"],
"data_stream": { },
"priority": 500,
"template": {
"settings": {
"index.default_pipeline": "integration-logstash_test.events-default",
"index.lifecycle.name": "logs"
}
}
}

ci/serverless/test_data/ingest_pipeline.json

@@ -0,0 +1,10 @@
{
"processors": [
{
"append": {
"field": "message",
"value": ["serverless test ^_^ "]
}
}
]
}

ci/serverless/test_data/stdin_stdout.json

@@ -0,0 +1,6 @@
{
"pipeline": "input { stdin {} } output { stdout {} }",
"settings": {
"queue.type": "persisted"
}
}

qa/integration/framework/fixture.rb

@@ -16,6 +16,7 @@
# under the License.
require_relative "../services/service_locator"
require_relative '../specs/spec_helper'
# A class that holds all fixtures for a given test file. This deals with
# bootstrapping services, dealing with config files, inputs etc
@@ -43,7 +44,7 @@ class Fixture
@test_file_location = test_file_location
@settings = TestSettings.new(@test_file_location)
@service_locator = ServiceLocator.new(@settings)
setup_services
setup_services unless serverless?
@input = File.join(FIXTURES_DIR, @settings.get("input")) if @settings.is_set?("input")
@actual_output = @settings.get("actual_output")
end

qa/integration/services/elasticsearch_service.rb

@@ -16,6 +16,7 @@
# under the License.
require 'elasticsearch'
require_relative '../specs/spec_helper'
class ElasticsearchService < Service
def initialize(settings)
@@ -23,6 +24,10 @@ class ElasticsearchService < Service
end
def get_client
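# serverless runs connect to the remote endpoint using credentials from the environment; local runs keep using localhost:9200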
@client ||= Elasticsearch::Client.new(
:hosts => ENV["ES_ENDPOINT"],
:user => ENV["ES_USER"], :password => ENV["ES_PW"]) if serverless?
@client ||= Elasticsearch::Client.new(:hosts => "localhost:9200")
end
end

qa/integration/specs/dlq_spec.rb

@@ -62,6 +62,13 @@ describe "Test Dead Letter Queue" do
let!(:config_yaml_file) { ::File.join(settings_dir, "logstash.yml") }
let!(:settings_dir) { Stud::Temporary.directory }
let(:serverless_es_config) do
if serverless?
" hosts => '${ES_ENDPOINT}' user => '${ES_USER}' password => '${ES_PW}' "
else
""
end
end
shared_examples_for "it can send 1000 documents to and index from the dlq" do
it 'should index all documents' do
@@ -106,14 +113,14 @@ describe "Test Dead Letter Queue" do
"pipeline.workers" => 1,
"dead_letter_queue.enable" => true,
"pipeline.batch.size" => 100,
"config.string" => "input { generator { message => '#{message}' codec => \"json\" count => 1000 } } output { elasticsearch { index => \"test-index\" } }"
"config.string" => "input { generator { message => '#{message}' codec => \"json\" count => 1000 } } output { elasticsearch { index => \"test-index\" #{serverless_es_config} } }"
},
{
"pipeline.id" => "test2",
"pipeline.workers" => 1,
"dead_letter_queue.enable" => false,
"pipeline.batch.size" => 100,
"config.string" => "input { dead_letter_queue { pipeline_id => 'test' path => \"#{dlq_dir}\" commit_offsets => true } } filter { mutate { remove_field => [\"ip\"] add_field => {\"mutated\" => \"true\" } } } output { elasticsearch { index => \"test-index\" } }"
"config.string" => "input { dead_letter_queue { pipeline_id => 'test' path => \"#{dlq_dir}\" commit_offsets => true } } filter { mutate { remove_field => [\"ip\"] add_field => {\"mutated\" => \"true\" } } } output { elasticsearch { index => \"test-index\" #{serverless_es_config} } }"
}
]}
@@ -134,7 +141,7 @@ describe "Test Dead Letter Queue" do
filter {
if ([ip]) { mutate { remove_field => [\"ip\"] add_field => { \"mutated\" => \"true\" } } }
}
output { elasticsearch { index => \"test-index\" } }"
output { elasticsearch { index => \"test-index\" #{serverless_es_config} } }"
}
]}

qa/integration/specs/spec_helper.rb

@@ -25,6 +25,10 @@ def clean_es(es_client)
es_client.indices.refresh
end
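# the CI scripts export SERVERLESS=true so specs can gate serverless-only behavior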
def serverless?
ENV["SERVERLESS"] == "true"
end
RSpec.configure do |config|
if RbConfig::CONFIG["host_os"] != "linux"
exclude_tags = { :linux => true }