From 0f8695593e59fbc55402570cd3c73ece92d354d3 Mon Sep 17 00:00:00 2001
From: kaisecheng <69120390+kaisecheng@users.noreply.github.com>
Date: Wed, 19 Jul 2023 17:21:53 +0100
Subject: [PATCH] buildkite serverless test (#15150)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This commit adds a Buildkite pipeline to test against a serverless endpoint daily.

Tests cover:
- es-output
- es-input
- es-filter
- central pipeline management
- legacy monitoring
- dlq
- integration-filter
- kibana API
- metricbeat stack monitoring

Co-authored-by: João Duarte
---
 .buildkite/scripts/setup_java.sh                   |  10 ++
 .../serverless_integration_pipeline.yml            |  25 ++-
 .gitignore                                         |   3 +-
 ci/serverless/common.sh                            | 151 ++++++++++++++++++
 ci/serverless/config/logstash.yml                  |  10 ++
 ci/serverless/cpm_tests.sh                         |  48 ++++++
 ci/serverless/dlq_rspec_tests.sh                   |  22 +++
 .../elastic_integration_filter_tests.sh            |  54 +++++++
 ci/serverless/es_filter_tests.sh                   |  16 ++
 ci/serverless/es_input_tests.sh                    |  16 ++
 ci/serverless/es_output_tests.sh                   |  30 ++++
 ci/serverless/kibana_api_tests.sh                  |  61 +++++++
 ci/serverless/metricbeat/metricbeat.yml            |  20 +++
 ci/serverless/metricbeat_monitoring_tests.sh       |  73 +++++++++
 ci/serverless/monitoring_tests.sh                  |  24 +++
 ci/serverless/pipeline/001_es-output.conf          |  27 ++++
 ci/serverless/pipeline/002_es-filter.conf          |  26 +++
 ci/serverless/pipeline/003_es-input.conf           |  15 ++
 .../pipeline/004_integration-filter.conf           |  35 ++++
 ci/serverless/pipeline/005_uptime.conf             |   8 +
 ci/serverless/test_data/book.json                  |  14 ++
 ci/serverless/test_data/index_template.json        |  11 ++
 ci/serverless/test_data/ingest_pipeline.json       |  10 ++
 ci/serverless/test_data/stdin_stdout.json          |   6 +
 qa/integration/framework/fixture.rb                |   3 +-
 .../services/elasticsearch_service.rb              |   5 +
 qa/integration/specs/dlq_spec.rb                   |  13 +-
 qa/integration/specs/spec_helper.rb                |   4 +
 28 files changed, 733 insertions(+), 7 deletions(-)
 create mode 100755 .buildkite/scripts/setup_java.sh
 create mode 100755 ci/serverless/common.sh
 create mode 100644 ci/serverless/config/logstash.yml
 create mode 100755 ci/serverless/cpm_tests.sh
 create mode 100755 ci/serverless/dlq_rspec_tests.sh
 create mode 100755 ci/serverless/elastic_integration_filter_tests.sh
 create mode 100755 ci/serverless/es_filter_tests.sh
 create mode 100755 ci/serverless/es_input_tests.sh
 create mode 100755 ci/serverless/es_output_tests.sh
 create mode 100755 ci/serverless/kibana_api_tests.sh
 create mode 100644 ci/serverless/metricbeat/metricbeat.yml
 create mode 100755 ci/serverless/metricbeat_monitoring_tests.sh
 create mode 100755 ci/serverless/monitoring_tests.sh
 create mode 100644 ci/serverless/pipeline/001_es-output.conf
 create mode 100644 ci/serverless/pipeline/002_es-filter.conf
 create mode 100644 ci/serverless/pipeline/003_es-input.conf
 create mode 100644 ci/serverless/pipeline/004_integration-filter.conf
 create mode 100644 ci/serverless/pipeline/005_uptime.conf
 create mode 100644 ci/serverless/test_data/book.json
 create mode 100644 ci/serverless/test_data/index_template.json
 create mode 100644 ci/serverless/test_data/ingest_pipeline.json
 create mode 100644 ci/serverless/test_data/stdin_stdout.json

diff --git a/.buildkite/scripts/setup_java.sh b/.buildkite/scripts/setup_java.sh
new file mode 100755
index 000000000..6a0ae591d
--- /dev/null
+++ b/.buildkite/scripts/setup_java.sh
@@ -0,0 +1,10 @@
+#!/bin/bash
+
+set -e
+
+install_java() {
+  # TODO: let's think about regularly creating a custom image for Logstash which may align on version.yml definitions
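+  # assumes an apt-based agent image (the Buildkite agents below use family/core-ubuntu-2204)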
+  sudo apt update && sudo apt install -y openjdk-17-jdk && sudo apt install -y openjdk-17-jre
+}
+
+install_java
diff --git a/.buildkite/serverless_integration_pipeline.yml b/.buildkite/serverless_integration_pipeline.yml
index 03598e045..fc0ad48a6 100644
--- a/.buildkite/serverless_integration_pipeline.yml
+++ b/.buildkite/serverless_integration_pipeline.yml
@@ -1,3 +1,24 @@
+agents:
+  provider: "gcp"
+  machineType: "n1-standard-4"
+  image: family/core-ubuntu-2204
+
 steps:
-  - label: "Elasticsearch output test"
-    command: ./ci/serverless/serverless_core_rspec_tests.sh
+  - label: "DLQ rspec integration test"
+    command: ./.buildkite/scripts/setup_java.sh && ./ci/serverless/dlq_rspec_tests.sh
+  - label: "es-output test"
+    command: ./.buildkite/scripts/setup_java.sh && ./ci/serverless/es_output_tests.sh
+  - label: "es-input test"
+    command: ./.buildkite/scripts/setup_java.sh && ./ci/serverless/es_input_tests.sh
+  - label: "es-filter test"
+    command: ./.buildkite/scripts/setup_java.sh && ./ci/serverless/es_filter_tests.sh
+  - label: "elastic_integration filter test"
+    command: ./.buildkite/scripts/setup_java.sh && ./ci/serverless/elastic_integration_filter_tests.sh
+  - label: "central pipeline management test"
+    command: ./.buildkite/scripts/setup_java.sh && ./ci/serverless/cpm_tests.sh
+  - label: "Logstash legacy monitoring test"
+    command: ./.buildkite/scripts/setup_java.sh && ./ci/serverless/monitoring_tests.sh
+  - label: "Kibana API test"
+    command: ./.buildkite/scripts/setup_java.sh && ./ci/serverless/kibana_api_tests.sh
+  - label: "metricbeat stack monitoring test"
+    command: ./.buildkite/scripts/setup_java.sh && ./ci/serverless/metricbeat_monitoring_tests.sh
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
index 1b0d8f000..cc0ab0db4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -62,4 +62,5 @@ lib/pluginmanager/plugin_aliases.yml
 logstash-core/src/main/resources/org/logstash/plugins/plugin_aliases.yml
 spec/unit/plugin_manager/plugin_aliases.yml
 logstash-core/src/test/resources/org/logstash/plugins/plugin_aliases.yml
-qa/integration/fixtures/logs_rollover/log4j2.properties
\ No newline at end of file
+qa/integration/fixtures/logs_rollover/log4j2.properties
+ci/serverless/config/*.log
\ No newline at end of file
diff --git a/ci/serverless/common.sh b/ci/serverless/common.sh
new file mode 100755
index 000000000..7646bf2b6
--- /dev/null
+++ b/ci/serverless/common.sh
@@ -0,0 +1,151 @@
+#!/usr/bin/env bash
+set -ex
+
+export CURRENT_DIR="$(dirname "$0")"
+export INDEX_NAME="serverless_it_${BUILDKITE_BUILD_NUMBER:-`date +%s`}"
+# store all error messages
+export ERR_MSGS=()
+# numeric values representing the results of the checks. 0: pass, >0: fail
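+# a failed check stores its message in ERR_MSGS and its status in CHECKS at the same index (see add_check / print_result)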
+export CHECKS=()
+
+setup_vault() {
+  vault_path=secret/ci/elastic-logstash/serverless-test
+  set +x
+  export ES_ENDPOINT=$(vault read -field=es_host "${vault_path}")
+  export ES_USER=$(vault read -field=es_user "${vault_path}")
+  export ES_PW=$(vault read -field=es_user_pw "${vault_path}")
+  export KB_ENDPOINT=$(vault read -field=kb_host "${vault_path}")
+  set -x
+}
+
+build_logstash() {
+  ./gradlew clean bootstrap assemble installDefaultGems
+}
+
+index_test_data() {
+  curl -X POST -u "$ES_USER:$ES_PW" "$ES_ENDPOINT/$INDEX_NAME/_bulk" -H 'Content-Type: application/json' --data-binary @"$CURRENT_DIR/test_data/book.json"
+}
+
+# $1: check function
+run_cpm_logstash() {
+  # copy log4j
+  cp "$CURRENT_DIR/../../config/log4j2.properties" "$CURRENT_DIR/config/log4j2.properties"
+
+  # run logstash
+  $CURRENT_DIR/../../bin/logstash --path.settings "$CURRENT_DIR/config" 2>/dev/null &
+  export LS_PID=$!
+
+  check_logstash_readiness
+
+  $1 # check function
+
+  kill "$LS_PID" || true
+}
+
+# $1: pipeline file
+# $2: check function
+# run_logstash 001_es-output.conf check_es_output
+run_logstash() {
+  $CURRENT_DIR/../../bin/logstash -f "$1" 2>/dev/null &
+  export LS_PID=$!
+
+  check_logstash_readiness
+
+  $2 # check function
+
+  kill "$LS_PID" || true
+}
+
+
+# $1: number of tries
+# $n: check function with args - returns a non-empty string on pass
+count_down_check() {
+  count=$1
+  while ! [[ $("${@:2}") ]] && [[ $count -gt 0 ]]; do
+    count=$(( count - 1 ))
+    sleep 1
+  done
+
+  [[ $count -eq 0 ]] && echo "1" && return
+
+  echo "Passed check!"
+  echo "0"
+}
+
+
+check_logstash_readiness() {
+  curl_logstash() {
+    [[ $(curl --silent localhost:9600) ]] && echo "0"
+  }
+  check_readiness() {
+    count_down_check 120 curl_logstash
+  }
+  add_check check_readiness "Failed readiness check."
+
+  [[ "${CHECKS[-1]}" -eq "1" ]] && exit 1
+
+  echo "Logstash is up!"
+  return 0
+}
+
+# $1: jq filter
+# $2: expected value
+# check_logstash_api '.pipelines.main.plugins.outputs[0].documents.successes' '1'
+check_logstash_api() {
+  curl_node_stats() {
+    [[ $(curl --silent localhost:9600/_node/stats | jq "$1") -ge "$2" ]] && echo "0"
+  }
+
+  count_down_check 30 curl_node_stats "$1" "$2"
+}
+
+# add check result to CHECKS
+# $1: check function - expects the last char of the result to be 0 or a positive number
+# $2: err msg
+add_check() {
+  FEATURE_CHECK=$($1)
+  FEATURE_CHECK="${FEATURE_CHECK: -1}"
+
+  ERR_MSGS+=("$2")
+  CHECKS+=("$FEATURE_CHECK")
+}
+
+# check the log for lines that contain [ERROR] or [FATAL] and do not relate to "unreachable"
+check_err_log() {
+  LOG_FILE="$CURRENT_DIR/../../logs/logstash-plain.log"
+  LOG_CHECK=$(grep -E "\[ERROR\]|\[FATAL\]" "$LOG_FILE" | grep -cvE "unreachable|Connection refused") || true
+
+  ERR_MSGS+=("Found error in log")
+  CHECKS+=("$LOG_CHECK")
+}
+
+# if CHECKS[i] is 1, print ERR_MSGS[i]
+print_result() {
+  for i in "${!CHECKS[@]}"; do
+    [[ "${CHECKS[$i]}" -gt 0 ]] && echo "${ERR_MSGS[$i]}" || true
+  done
+}
+
+# exit 1 if one of the checks fails
+exit_with_code() {
+  for c in "${CHECKS[@]}"; do
+    [[ $c -gt 0 ]] && exit 1
+  done
+
+  exit 0
+}
+
+clean_up_and_get_result() {
+  [[ -n "$LS_PID" ]] && kill "$LS_PID" || true
+
+  check_err_log
+  print_result
+  exit_with_code
+}
+
+# common setup
+setup() {
+  setup_vault
+  build_logstash
+  trap clean_up_and_get_result INT TERM EXIT
+}
diff --git a/ci/serverless/config/logstash.yml b/ci/serverless/config/logstash.yml
new file mode 100644
index 000000000..2e9f298d9
--- /dev/null
+++ b/ci/serverless/config/logstash.yml
@@ -0,0 +1,10 @@
+xpack.management.enabled: true
+xpack.management.pipeline.id: ["gen_es"]
+xpack.management.elasticsearch.username: ${ES_USER}
+xpack.management.elasticsearch.password: ${ES_PW}
+xpack.management.elasticsearch.hosts: ["${ES_ENDPOINT}"]
+
+xpack.monitoring.enabled: true
+xpack.monitoring.elasticsearch.username: ${ES_USER}
+xpack.monitoring.elasticsearch.password: ${ES_PW}
+xpack.monitoring.elasticsearch.hosts: ["${ES_ENDPOINT}"]
\ No newline at end of file
diff --git a/ci/serverless/cpm_tests.sh b/ci/serverless/cpm_tests.sh
new file mode 100755
index 000000000..32b6c9643
--- /dev/null
+++ b/ci/serverless/cpm_tests.sh
@@ -0,0 +1,48 @@
+#!/usr/bin/env bash
+set -ex
+
+source ./$(dirname "$0")/common.sh
+
+export PIPELINE_NAME='gen_es'
+
+# update pipeline and check response code
+index_pipeline() {
+  RESP_CODE=$(curl -s -w "%{http_code}" -X PUT -u "$ES_USER:$ES_PW" "$ES_ENDPOINT/_logstash/pipeline/$1" -H 'Content-Type: application/json' -d "$2")
+  if [[ $RESP_CODE -ge '400' ]]; then
+    echo "Failed to update pipeline for Central Pipeline Management. Got $RESP_CODE from Elasticsearch"
+    exit 1
+  fi
+}
+
+# index pipeline to serverless ES
+index_cpm_pipelines() {
+  index_pipeline "$PIPELINE_NAME" '{
+    "pipeline": "input { generator { count => 100 } } output { elasticsearch { hosts => \"${ES_ENDPOINT}\" user => \"${ES_USER}\" password => \"${ES_PW}\" index => \"${INDEX_NAME}\" } }",
+    "last_modified": "2023-07-04T22:22:22.222Z",
+    "pipeline_metadata": { "version": "1"},
+    "username": "log.stash",
+    "pipeline_settings": {"pipeline.batch.delay": "50"}
+  }'
+}
+
+check_es_output() {
+  check_logstash_api '.pipelines.gen_es.plugins.outputs[0].documents.successes' '100'
+}
+
+check_plugin() {
+  add_check check_es_output "Failed central pipeline management check."
+}
+
+delete_pipeline() {
+  curl -u "$ES_USER:$ES_PW" -X DELETE "$ES_ENDPOINT/_logstash/pipeline/$PIPELINE_NAME" -H 'Content-Type: application/json';
+}
+
+cpm_clean_up_and_get_result() {
+  delete_pipeline
+  clean_up_and_get_result
+}
+
+setup
+trap cpm_clean_up_and_get_result INT TERM EXIT
+index_cpm_pipelines
+run_cpm_logstash check_plugin
diff --git a/ci/serverless/dlq_rspec_tests.sh b/ci/serverless/dlq_rspec_tests.sh
new file mode 100755
index 000000000..afe8a91b8
--- /dev/null
+++ b/ci/serverless/dlq_rspec_tests.sh
@@ -0,0 +1,22 @@
+#!/usr/bin/env bash
+set -ex
+
+vault_path=secret/ci/elastic-logstash/serverless-test
+
+export JRUBY_OPTS="-J-Xmx1g"
+export SERVERLESS=true
+set +x
+export ES_ENDPOINT=$(vault read -field=es_host "${vault_path}")
+export ES_USER=$(vault read -field=es_user "${vault_path}")
+export ES_PW=$(vault read -field=es_user_pw "${vault_path}")
+set -x
+
+./gradlew clean bootstrap assemble installDefaultGems unpackTarDistribution
+./gradlew :logstash-core:copyGemjar
+
+export GEM_PATH=vendor/bundle/jruby/3.1.0
+export GEM_HOME=vendor/bundle/jruby/3.1.0
+
+vendor/jruby/bin/jruby -S bundle install --with development
+
+vendor/jruby/bin/jruby -S bundle exec rspec -fd qa/integration/specs/dlq_spec.rb -e "using pipelines.yml"
diff --git a/ci/serverless/elastic_integration_filter_tests.sh b/ci/serverless/elastic_integration_filter_tests.sh
new file mode 100755
index 000000000..52f61f336
--- /dev/null
+++ b/ci/serverless/elastic_integration_filter_tests.sh
@@ -0,0 +1,54 @@
+#!/usr/bin/env bash
+set -ex
+
+source ./$(dirname "$0")/common.sh
+
+deploy_ingest_pipeline() {
+  PIPELINE_RESP_CODE=$(curl -s -w "%{http_code}" -o /dev/null -X PUT -u "$ES_USER:$ES_PW" "$ES_ENDPOINT/_ingest/pipeline/integration-logstash_test.events-default" \
+    -H 'Content-Type: application/json' \
+    --data-binary @"$CURRENT_DIR/test_data/ingest_pipeline.json")
+
+  TEMPLATE_RESP_CODE=$(curl -s -w "%{http_code}" -o /dev/null -X PUT -u "$ES_USER:$ES_PW" "$ES_ENDPOINT/_index_template/logs-serverless-default-template" \
+    -H 'Content-Type: application/json' \
+    --data-binary @"$CURRENT_DIR/test_data/index_template.json")
+
+  # the ingest pipeline is likely to be there from the last run
+  # failing to update the pipeline does not stop the test
+  if [[ $PIPELINE_RESP_CODE -ge '400' ]]; then
+    ERR_MSGS+=("Failed to update ingest pipeline. Got $PIPELINE_RESP_CODE")
+  fi
+
+  if [[ $TEMPLATE_RESP_CODE -ge '400' ]]; then
+    ERR_MSGS+=("Failed to update index template. Got $TEMPLATE_RESP_CODE")
+  fi
+}
+
+# the processor should append 'serverless' to message
+check_integration_filter() {
+  check_logstash_api '.pipelines.main.plugins.filters[] | select(.id == "mutate1") | .events.out' '1'
+}
+
+get_doc_msg_length() {
+  curl -s -u "$ES_USER:$ES_PW" "$ES_ENDPOINT/logs-$INDEX_NAME.004-default/_search?size=1" | jq '.hits.hits[0]._source.message | length'
+}
+
+# ensure no double run of the ingest pipeline
+# message = ['ok', 'serverless*']
+validate_ds_doc() {
+  [[ $(get_doc_msg_length) -eq "2" ]] && echo "0"
+}
+
+check_doc_no_duplication() {
+  count_down_check 20 validate_ds_doc
+}
+
+check_plugin() {
+  add_check check_integration_filter "Failed ingest pipeline processor check."
+  add_check check_doc_no_duplication "Failed ingest pipeline duplication check."
+}
+
+setup
+# install plugin
+"$CURRENT_DIR/../../bin/logstash-plugin" install logstash-filter-elastic_integration
+deploy_ingest_pipeline
+run_logstash "$CURRENT_DIR/pipeline/004_integration-filter.conf" check_plugin
diff --git a/ci/serverless/es_filter_tests.sh b/ci/serverless/es_filter_tests.sh
new file mode 100755
index 000000000..c86866a10
--- /dev/null
+++ b/ci/serverless/es_filter_tests.sh
@@ -0,0 +1,16 @@
+#!/usr/bin/env bash
+set -ex
+
+source ./$(dirname "$0")/common.sh
+
+check_es_filter() {
+  check_logstash_api '.pipelines.main.plugins.filters[] | select(.id == "ok") | .events.out' '1'
+}
+
+check_plugin() {
+  add_check check_es_filter "Failed es-filter check."
+}
+
+setup
+index_test_data
+run_logstash "$CURRENT_DIR/pipeline/002_es-filter.conf" check_plugin
diff --git a/ci/serverless/es_input_tests.sh b/ci/serverless/es_input_tests.sh
new file mode 100755
index 000000000..5924ebd9b
--- /dev/null
+++ b/ci/serverless/es_input_tests.sh
@@ -0,0 +1,16 @@
+#!/usr/bin/env bash
+set -ex
+
+source ./$(dirname "$0")/common.sh
+
+check_es_input() {
+  check_logstash_api '.pipelines.main.plugins.inputs[0].events.out' '1'
+}
+
+check_plugin() {
+  add_check check_es_input "Failed es-input check."
+}
+
+setup
+index_test_data
+run_logstash "$CURRENT_DIR/pipeline/003_es-input.conf" check_plugin
diff --git a/ci/serverless/es_output_tests.sh b/ci/serverless/es_output_tests.sh
new file mode 100755
index 000000000..e2babd14c
--- /dev/null
+++ b/ci/serverless/es_output_tests.sh
@@ -0,0 +1,30 @@
+#!/usr/bin/env bash
+set -ex
+
+source ./$(dirname "$0")/common.sh
+
+
+check_named_index() {
+  check_logstash_api '.pipelines.main.plugins.outputs[] | select(.id == "named_index") | .documents.successes' '1'
+}
+
+get_data_stream_count() {
+  curl -s -u "$ES_USER:$ES_PW" "$ES_ENDPOINT/logs-$INDEX_NAME.001-default/_count" | jq '.count'
+}
+
+compare_data_stream_count() {
+  [[ $(get_data_stream_count) -ge "$INITIAL_DATA_STREAM_CNT" ]] && echo "0"
+}
+
+check_data_stream_output() {
+  count_down_check 20 compare_data_stream_count
+}
+
+check_plugin() {
+  add_check check_named_index "Failed index check."
+  add_check check_data_stream_output "Failed data stream check."
+}
+
+setup
+export INITIAL_DATA_STREAM_CNT=$(get_data_stream_count)
+run_logstash "$CURRENT_DIR/pipeline/001_es-output.conf" check_plugin
\ No newline at end of file
diff --git a/ci/serverless/kibana_api_tests.sh b/ci/serverless/kibana_api_tests.sh
new file mode 100755
index 000000000..180a2c90b
--- /dev/null
+++ b/ci/serverless/kibana_api_tests.sh
@@ -0,0 +1,61 @@
+#!/usr/bin/env bash
+set -ex
+
+source ./$(dirname "$0")/common.sh
+
+export PIPELINE_NAME="stdin_stdout"
+export EXIT_CODE="0"
+
+create_pipeline() {
+  RESP_CODE=$(curl -s -w "%{http_code}" -o /dev/null -X PUT -u "$ES_USER:$ES_PW" "$KB_ENDPOINT/api/logstash/pipeline/$PIPELINE_NAME" \
+    -H 'Content-Type: application/json' -H 'kbn-xsrf: logstash' \
+    --data-binary @"$CURRENT_DIR/test_data/$PIPELINE_NAME.json")
+
+  if [[ $RESP_CODE -ge '400' ]]; then
+    EXIT_CODE=$(( EXIT_CODE + 1 ))
+    echo "Failed to create pipeline."
+  fi
+}
+
+get_pipeline() {
+  RESP_BODY=$(curl -s -X GET -u "$ES_USER:$ES_PW" "$KB_ENDPOINT/api/logstash/pipeline/$PIPELINE_NAME")
+
+  SOURCE_BODY=$(cat "$CURRENT_DIR/test_data/$PIPELINE_NAME.json")
+
+  if [[ $(echo "$RESP_BODY" | jq -r '.id') != "$PIPELINE_NAME" ]] ||\
+    [[ $(echo "$RESP_BODY" | jq -r '.pipeline') != "$(echo "$SOURCE_BODY" | jq -r '.pipeline')" ]] ||\
+    [[ $(echo "$RESP_BODY" | jq -r '.settings') != "$(echo "$SOURCE_BODY" | jq -r '.settings')" ]]; then
+
+    EXIT_CODE=$(( EXIT_CODE + 1 ))
+    echo "Failed to get pipeline."
+  fi
+
+}
+
+list_pipeline() {
+  RESP_BODY=$(curl -s -X GET -u "$ES_USER:$ES_PW" "$KB_ENDPOINT/api/logstash/pipelines" | jq --arg name "$PIPELINE_NAME" '.pipelines[] | select(.id==$name)' )
+  if [[ -z "$RESP_BODY" ]]; then
+    EXIT_CODE=$(( EXIT_CODE + 1 ))
+    echo "Failed to list pipeline."
+  fi
+}
+
+delete_pipeline() {
+  RESP_CODE=$(curl -s -w "%{http_code}" -o /dev/null -X DELETE -u "$ES_USER:$ES_PW" "$KB_ENDPOINT/api/logstash/pipeline/$PIPELINE_NAME" \
+    -H 'Content-Type: application/json' -H 'kbn-xsrf: logstash' \
+    --data-binary @"$CURRENT_DIR/test_data/$PIPELINE_NAME.json")
+
+  if [[ $RESP_CODE -ge '400' ]]; then
+    EXIT_CODE=$(( EXIT_CODE + 1 ))
+    echo "Failed to delete pipeline."
+  fi
+}
+
+setup_vault
+
+create_pipeline
+get_pipeline
+list_pipeline
+delete_pipeline
+
+exit $EXIT_CODE
\ No newline at end of file
diff --git a/ci/serverless/metricbeat/metricbeat.yml b/ci/serverless/metricbeat/metricbeat.yml
new file mode 100644
index 000000000..807f19282
--- /dev/null
+++ b/ci/serverless/metricbeat/metricbeat.yml
@@ -0,0 +1,20 @@
+metricbeat.config:
+  modules:
+    path: ${path.config}/modules.d/*.yml
+    reload.enabled: false
+
+output.elasticsearch:
+  hosts: ["${ES_ENDPOINT}"]
+  protocol: "https"
+  username: "${ES_USER}"
+  password: "${ES_PW}"
+
+metricbeat.modules:
+  - module: logstash
+    metricsets:
+      - node
+      - node_stats
+    period: 10s
+    hosts:
+      - localhost:9600
+    xpack.enabled: true
\ No newline at end of file
diff --git a/ci/serverless/metricbeat_monitoring_tests.sh b/ci/serverless/metricbeat_monitoring_tests.sh
new file mode 100755
index 000000000..15b486315
--- /dev/null
+++ b/ci/serverless/metricbeat_monitoring_tests.sh
@@ -0,0 +1,73 @@
+#!/usr/bin/env bash
+set -ex
+
+source ./$(dirname "$0")/common.sh
+
+get_cpu_arch() {
+  local arch=$(uname -m)
+
+  if [ "$arch" == "aarch64" ]; then
+    echo "arm64"
+  else
+    echo "$arch"
+  fi
+}
+
+export INDEX_NAME=".monitoring-logstash-8-mb"
+export OS=$(uname -s | tr '[:upper:]' '[:lower:]')
+export ARCH=$(get_cpu_arch)
+export BEATS_VERSION=$(curl -s "https://api.github.com/repos/elastic/beats/tags" | jq -r '.[0].name' | cut -c 2-)
+
+start_metricbeat() {
+  cd "$CURRENT_DIR"
+
+  MB_FILENAME="metricbeat-$BEATS_VERSION-$OS-$ARCH"
+  MB_DL_URL="https://artifacts.elastic.co/downloads/beats/metricbeat/$MB_FILENAME.tar.gz"
+
+  if [[ ! -d "$MB_FILENAME" ]]; then
+    curl -o "$MB_FILENAME.tar.gz" "$MB_DL_URL"
+    tar -zxf "$MB_FILENAME.tar.gz"
+  fi
+
+  chmod go-w "metricbeat/metricbeat.yml"
+  "$MB_FILENAME/metricbeat" -c "metricbeat/metricbeat.yml" &
+  export MB_PID=$!
+  cd -
+}
+
+stop_metricbeat() {
+  [[ -n "$MB_PID" ]] && kill "$MB_PID" || true
+}
+
+get_monitor_count() {
+  curl -s -u "$ES_USER:$ES_PW" "$ES_ENDPOINT/$INDEX_NAME/_count" | jq '.count'
+}
+
+compare_monitor_count() {
+  [[ $(get_monitor_count) -gt "$INITIAL_MONITOR_CNT" ]] && echo "0"
+}
+
+check_monitor_output() {
+  count_down_check 60 compare_monitor_count
+}
+
+check_plugin() {
+  add_check check_monitor_output "Failed metricbeat monitor check."
+}
+
+metricbeat_clean_up() {
+  exit_code=$?
+  ERR_MSGS+=("Unknown error!")
+  CHECKS+=("$exit_code")
+
+  stop_metricbeat
+
+  clean_up_and_get_result
+}
+
+setup
+trap metricbeat_clean_up INT TERM EXIT
+export INITIAL_MONITOR_CNT=$(get_monitor_count)
+
+start_metricbeat
+run_logstash "$CURRENT_DIR/pipeline/005_uptime.conf" check_plugin
diff --git a/ci/serverless/monitoring_tests.sh b/ci/serverless/monitoring_tests.sh
new file mode 100755
index 000000000..1d2fa7cf7
--- /dev/null
+++ b/ci/serverless/monitoring_tests.sh
@@ -0,0 +1,24 @@
+#!/usr/bin/env bash
+set -ex
+
+source ./$(dirname "$0")/common.sh
+
+get_monitor_count() {
+  curl -s -u "$ES_USER:$ES_PW" "$ES_ENDPOINT/.monitoring-logstash-7-*/_count" | jq '.count'
+}
+
+compare_monitor_count() {
+  [[ $(get_monitor_count) -gt "$INITIAL_MONITOR_CNT" ]] && echo "0"
+}
+
+check_monitor() {
+  count_down_check 20 compare_monitor_count
+}
+
+check() {
+  add_check check_monitor "Failed monitor check."
+}
+
+setup
+export INITIAL_MONITOR_CNT=$(get_monitor_count)
+run_cpm_logstash check
diff --git a/ci/serverless/pipeline/001_es-output.conf b/ci/serverless/pipeline/001_es-output.conf
new file mode 100644
index 000000000..33ba0967b
--- /dev/null
+++ b/ci/serverless/pipeline/001_es-output.conf
@@ -0,0 +1,27 @@
+input {
+  heartbeat {
+    interval => 1
+    add_field => {
+      "[data_stream][type]" => "logs"
+      "[data_stream][dataset]" => "${INDEX_NAME}.001"
+      "[data_stream][namespace]" => "default"
+    }
+  }
+}
+
+output {
+  elasticsearch {
+    id => "named_index"
+    hosts => ["${ES_ENDPOINT}"]
+    user => "${ES_USER}"
+    password => "${ES_PW}"
+    index => "${INDEX_NAME}"
+  }
+
+  elasticsearch {
+    id => "data_stream"
+    hosts => ["${ES_ENDPOINT}"]
+    user => "${ES_USER}"
+    password => "${ES_PW}"
+  }
+}
\ No newline at end of file
diff --git a/ci/serverless/pipeline/002_es-filter.conf b/ci/serverless/pipeline/002_es-filter.conf
new file mode 100644
index 000000000..b76570880
--- /dev/null
+++ b/ci/serverless/pipeline/002_es-filter.conf
@@ -0,0 +1,26 @@
+input {
+  heartbeat {
+    interval => 1
+  }
+}
+
+filter {
+  elasticsearch {
+    hosts => ["${ES_ENDPOINT}"]
+    user => "${ES_USER}"
+    password => "${ES_PW}"
+    index => "${INDEX_NAME}"
+    query => "*"
+    add_field => {"check" => "good"}
+  }
+
+  if [check] == "good" {
+    mutate { id => "ok" }
+  }
+}
+
+output {
+  stdout {
+    codec => dots
+  }
+}
\ No newline at end of file
diff --git a/ci/serverless/pipeline/003_es-input.conf b/ci/serverless/pipeline/003_es-input.conf
new file mode 100644
index 000000000..8bf3645cb
--- /dev/null
+++ b/ci/serverless/pipeline/003_es-input.conf
@@ -0,0 +1,15 @@
+input {
+  elasticsearch {
+    hosts => ["${ES_ENDPOINT}"]
+    user => "${ES_USER}"
+    password => "${ES_PW}"
+    index => "${INDEX_NAME}"
+    size => 100
+    schedule => "*/10 * * * * *"
+  }
+}
+output {
+  stdout {
+    codec => dots
+  }
+}
\ No newline at end of file
diff --git a/ci/serverless/pipeline/004_integration-filter.conf b/ci/serverless/pipeline/004_integration-filter.conf
new file mode 100644
index 000000000..b561b1350
--- /dev/null
+++ b/ci/serverless/pipeline/004_integration-filter.conf
@@ -0,0 +1,35 @@
+input {
+  heartbeat {
+    interval => 1
+    add_field => {
+      "[data_stream][type]" => "logs"
+      "[data_stream][dataset]" => "${INDEX_NAME}.004"
+      "[data_stream][namespace]" => "default"
+    }
+  }
+}
+filter {
+  elastic_integration {
+    hosts => "${ES_ENDPOINT}"
+    username => "${ES_USER}"
+    password => "${ES_PW}"
+    remove_field => ["_version"]
+    add_field => {"ingested" => "ok"}
+  }
+
+  if ([ingested] == "ok") and ([message][1] =~ 'serverless') {
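+    # [message][1] is the "serverless test ^_^ " entry appended by the ingest pipeline (see test_data/ingest_pipeline.json)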
+    mutate { id => "mutate1" }
+  }
+}
+output {
+  stdout {
+    codec => dots
+  }
+
+  elasticsearch {
+    id => "data_stream"
+    hosts => ["${ES_ENDPOINT}"]
+    user => "${ES_USER}"
+    password => "${ES_PW}"
+  }
+}
\ No newline at end of file
diff --git a/ci/serverless/pipeline/005_uptime.conf b/ci/serverless/pipeline/005_uptime.conf
new file mode 100644
index 000000000..b0a181281
--- /dev/null
+++ b/ci/serverless/pipeline/005_uptime.conf
@@ -0,0 +1,8 @@
+input {
+  exec { command => 'uptime' interval => 10 }
+}
+output {
+  stdout {
+    codec => dots
+  }
+}
\ No newline at end of file
diff --git a/ci/serverless/test_data/book.json b/ci/serverless/test_data/book.json
new file mode 100644
index 000000000..870054fa7
--- /dev/null
+++ b/ci/serverless/test_data/book.json
@@ -0,0 +1,14 @@
+{"index": {}}
+{"book_name": "The Great Gatsby", "author": "F. Scott Fitzgerald", "@timestamp": "1925-04-10T00:00:00"}
+{"index": {}}
+{"book_name": "To Kill a Mockingbird", "author": "Harper Lee", "@timestamp": "1960-07-11T00:00:00"}
+{"index": {}}
+{"book_name": "1984", "author": "George Orwell", "@timestamp": "1949-06-08T00:00:00"}
+{"index": {}}
+{"book_name": "Pride and Prejudice", "author": "Jane Austen", "@timestamp": "1813-01-28T00:00:00"}
+{"index": {}}
+{"book_name": "The Catcher in the Rye", "author": "J.D. Salinger", "@timestamp": "1951-07-16T00:00:00"}
+{"index": {}}
+{"book_name": "Moby Dick", "author": "Herman Melville", "@timestamp": "1851-10-18T00:00:00"}
+{"index": {}}
+{"book_name": "The Lord of the Rings", "author": "J.R.R. Tolkien", "@timestamp": "1954-07-29T00:00:00"}
diff --git a/ci/serverless/test_data/index_template.json b/ci/serverless/test_data/index_template.json
new file mode 100644
index 000000000..998a2bea4
--- /dev/null
+++ b/ci/serverless/test_data/index_template.json
@@ -0,0 +1,11 @@
+{
+  "index_patterns": ["logs-serverless*"],
+  "data_stream": { },
+  "priority": 500,
+  "template": {
+    "settings": {
+      "index.default_pipeline": "integration-logstash_test.events-default",
+      "index.lifecycle.name": "logs"
+    }
+  }
+}
\ No newline at end of file
diff --git a/ci/serverless/test_data/ingest_pipeline.json b/ci/serverless/test_data/ingest_pipeline.json
new file mode 100644
index 000000000..a192b5c53
--- /dev/null
+++ b/ci/serverless/test_data/ingest_pipeline.json
@@ -0,0 +1,10 @@
+{
+  "processors": [
+    {
+      "append": {
+        "field": "message",
+        "value": ["serverless test ^_^ "]
+      }
+    }
+  ]
+}
\ No newline at end of file
diff --git a/ci/serverless/test_data/stdin_stdout.json b/ci/serverless/test_data/stdin_stdout.json
new file mode 100644
index 000000000..d6e7acfb3
--- /dev/null
+++ b/ci/serverless/test_data/stdin_stdout.json
@@ -0,0 +1,6 @@
+{
+  "pipeline": "input { stdin {} } output { stdout {} }",
+  "settings": {
+    "queue.type": "persisted"
+  }
+}
\ No newline at end of file
diff --git a/qa/integration/framework/fixture.rb b/qa/integration/framework/fixture.rb
index e3985d152..b6873ad05 100644
--- a/qa/integration/framework/fixture.rb
+++ b/qa/integration/framework/fixture.rb
@@ -16,6 +16,7 @@
 # under the License.
 
 require_relative "../services/service_locator"
+require_relative '../specs/spec_helper'
 
 # A class that holds all fixtures for a given test file. This deals with
 # bootstrapping services, dealing with config files, inputs etc
@@ -43,7 +44,7 @@ class Fixture
     @test_file_location = test_file_location
     @settings = TestSettings.new(@test_file_location)
     @service_locator = ServiceLocator.new(@settings)
-    setup_services
+    setup_services unless serverless?
     @input = File.join(FIXTURES_DIR, @settings.get("input")) if @settings.is_set?("input")
     @actual_output = @settings.get("actual_output")
   end
diff --git a/qa/integration/services/elasticsearch_service.rb b/qa/integration/services/elasticsearch_service.rb
index 9aa536909..56642cfc3 100644
--- a/qa/integration/services/elasticsearch_service.rb
+++ b/qa/integration/services/elasticsearch_service.rb
@@ -16,6 +16,7 @@
 # under the License.
 
 require 'elasticsearch'
+require_relative '../specs/spec_helper'
 
 class ElasticsearchService < Service
   def initialize(settings)
@@ -23,6 +24,10 @@ class ElasticsearchService < Service
   end
 
   def get_client
+    @client ||= Elasticsearch::Client.new(
+      :hosts => ENV["ES_ENDPOINT"],
+      :user => ENV["ES_USER"], :password => ENV["ES_PW"]) if serverless?
+
     @client ||= Elasticsearch::Client.new(:hosts => "localhost:9200")
   end
 end
diff --git a/qa/integration/specs/dlq_spec.rb b/qa/integration/specs/dlq_spec.rb
index 2bacfd924..7583b1739 100644
--- a/qa/integration/specs/dlq_spec.rb
+++ b/qa/integration/specs/dlq_spec.rb
@@ -62,6 +62,13 @@ describe "Test Dead Letter Queue" do
   let!(:config_yaml_file) { ::File.join(settings_dir, "logstash.yml") }
   let!(:settings_dir) { Stud::Temporary.directory }
+  let(:serverless_es_config) do
+    if serverless?
+      " hosts => '${ES_ENDPOINT}' user => '${ES_USER}' password => '${ES_PW}' "
+    else
+      ""
+    end
+  end
 
   shared_examples_for "it can send 1000 documents to and index from the dlq" do
     it 'should index all documents' do
@@ -106,14 +113,14 @@
         "pipeline.workers" => 1,
         "dead_letter_queue.enable" => true,
         "pipeline.batch.size" => 100,
-        "config.string" => "input { generator { message => '#{message}' codec => \"json\" count => 1000 } } output { elasticsearch { index => \"test-index\" } }"
+        "config.string" => "input { generator { message => '#{message}' codec => \"json\" count => 1000 } } output { elasticsearch { index => \"test-index\" #{serverless_es_config} } }"
       }, {
         "pipeline.id" => "test2",
         "pipeline.workers" => 1,
         "dead_letter_queue.enable" => false,
         "pipeline.batch.size" => 100,
-        "config.string" => "input { dead_letter_queue { pipeline_id => 'test' path => \"#{dlq_dir}\" commit_offsets => true } } filter { mutate { remove_field => [\"ip\"] add_field => {\"mutated\" => \"true\" } } } output { elasticsearch { index => \"test-index\" } }"
+        "config.string" => "input { dead_letter_queue { pipeline_id => 'test' path => \"#{dlq_dir}\" commit_offsets => true } } filter { mutate { remove_field => [\"ip\"] add_field => {\"mutated\" => \"true\" } } } output { elasticsearch { index => \"test-index\" #{serverless_es_config} } }"
       }
     ]}
@@ -134,7 +141,7 @@
         filter {
           if ([ip]) { mutate { remove_field => [\"ip\"] add_field => { \"mutated\" => \"true\" } } }
         }
-        output { elasticsearch { index => \"test-index\" } }"
+        output { elasticsearch { index => \"test-index\" #{serverless_es_config} } }"
       }
     ]}
diff --git a/qa/integration/specs/spec_helper.rb b/qa/integration/specs/spec_helper.rb
index 3bc62f7af..6af6ec6a4 100644
--- a/qa/integration/specs/spec_helper.rb
+++ b/qa/integration/specs/spec_helper.rb
@@ -25,6 +25,10 @@ def clean_es(es_client)
   es_client.indices.refresh
 end
 
+def serverless?
+  ENV["SERVERLESS"] == "true"
+end
+
 RSpec.configure do |config|
   if RbConfig::CONFIG["host_os"] != "linux"
     exclude_tags = { :linux => true }