[Backport 8.x] benchmark script (#17283)

This commit cherry-picked the missing becnhmark script PRs
The deprecated artifacts-api is removed

[CI] benchmark uses the new artifacts-api (#17224)
[CI] benchmark readme (#16783)
Introduce a new flag to explicitly permit legacy monitoring (#16586) (Only take the benchmark script)
[ci] fix wrong queue type in benchmark marathon (#16465)
[CI] fix benchmark marathon (#16447)
[CI] benchmark dashboard and pipeline for testing against multiple versions (#16421)
This commit is contained in:
kaisecheng 2025-03-07 00:16:29 +00:00 committed by GitHub
parent fde903c93d
commit 4fd13730ee
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 648 additions and 272 deletions

View file

@ -0,0 +1,11 @@
agents:
provider: gcp
imageProject: elastic-images-prod
image: family/platform-ingest-logstash-ubuntu-2204
machineType: "n2-standard-16"
diskSizeGb: 100
diskType: pd-ssd
steps:
- label: "Benchmark Marathon"
command: .buildkite/scripts/benchmark/marathon.sh

View file

@ -0,0 +1,22 @@
## Steps to set up GCP instance to run benchmark script
- Create an instance "n2-standard-16" with Ubuntu image
- Install docker
- `sudo snap install docker`
- `sudo usermod -a -G docker $USER`
- Install jq
- Install vault
- `sudo snap install vault`
- `vault login --method github`
- `vault kv get -format json secret/ci/elastic-logstash/benchmark`
- Setup Elasticsearch index mapping and alias with `setup/*`
- Import Kibana dashboard with `save-objects/*`
- Run the benchmark script
- Send data to your own Elasticsearch. Customise `VAULT_PATH="secret/ci/elastic-logstash/your/path"`
- Run the script `main.sh`
- or run in background `nohup bash -x main.sh > log.log 2>&1 &`
## Notes
- Benchmarks should only be compared using the same hardware setup.
- Please do not send the test metrics to the benchmark cluster. You can set `VAULT_PATH` to send data and metrics to your own server.
- Run `all.sh` as calibration which gives you a baseline of performance in different versions.
- [#16586](https://github.com/elastic/logstash/pull/16586) allows legacy monitoring using the configuration `xpack.monitoring.allow_legacy_collection: true`, which is not recognized in version 8. To run benchmarks in version 8, use the script of the corresponding branch (e.g. `8.16`) instead of `main` in buildkite.

View file

@ -3,6 +3,7 @@ pipeline.workers: ${WORKER}
pipeline.batch.size: ${BATCH_SIZE}
queue.type: ${QTYPE}
xpack.monitoring.allow_legacy_collection: true
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.username: ${MONITOR_ES_USER}
xpack.monitoring.elasticsearch.password: ${MONITOR_ES_PW}

View file

@ -0,0 +1 @@
f74f1a28-25e9-494f-ba41-ca9f13d4446d

View file

@ -0,0 +1,315 @@
#!/usr/bin/env bash
set -eo pipefail
SCRIPT_PATH="$(dirname "${BASH_SOURCE[0]}")"
CONFIG_PATH="$SCRIPT_PATH/config"
source "$SCRIPT_PATH/util.sh"
usage() {
echo "Usage: $0 [FB_CNT] [QTYPE] [CPU] [MEM]"
echo "Example: $0 4 {persisted|memory|all} 2 2"
exit 1
}
parse_args() {
while [[ "$#" -gt 0 ]]; do
if [ -z "$FB_CNT" ]; then
FB_CNT=$1
elif [ -z "$QTYPE" ]; then
case $1 in
all | persisted | memory)
QTYPE=$1
;;
*)
echo "Error: wrong queue type $1"
usage
;;
esac
elif [ -z "$CPU" ]; then
CPU=$1
elif [ -z "$MEM" ]; then
MEM=$1
else
echo "Error: Too many arguments"
usage
fi
shift
done
# set default value
# number of filebeat
FB_CNT=${FB_CNT:-4}
# all | persisted | memory
QTYPE=${QTYPE:-all}
CPU=${CPU:-4}
MEM=${MEM:-4}
XMX=$((MEM / 2))
IFS=','
# worker multiplier: 1,2,4
MULTIPLIERS="${MULTIPLIERS:-1,2,4}"
read -ra MULTIPLIERS <<< "$MULTIPLIERS"
BATCH_SIZES="${BATCH_SIZES:-500}"
read -ra BATCH_SIZES <<< "$BATCH_SIZES"
# tags to json array
read -ra TAG_ARRAY <<< "$TAGS"
JSON_TAGS=$(printf '"%s",' "${TAG_ARRAY[@]}" | sed 's/,$//')
JSON_TAGS="[$JSON_TAGS]"
IFS=' '
echo "filebeats: $FB_CNT, cpu: $CPU, mem: $MEM, Queue: $QTYPE, worker multiplier: ${MULTIPLIERS[@]}, batch size: ${BATCH_SIZES[@]}"
}
get_secret() {
VAULT_PATH=${VAULT_PATH:-secret/ci/elastic-logstash/benchmark}
VAULT_DATA=$(vault kv get -format json $VAULT_PATH)
BENCHMARK_ES_HOST=$(echo $VAULT_DATA | jq -r '.data.es_host')
BENCHMARK_ES_USER=$(echo $VAULT_DATA | jq -r '.data.es_user')
BENCHMARK_ES_PW=$(echo $VAULT_DATA | jq -r '.data.es_pw')
MONITOR_ES_HOST=$(echo $VAULT_DATA | jq -r '.data.monitor_es_host')
MONITOR_ES_USER=$(echo $VAULT_DATA | jq -r '.data.monitor_es_user')
MONITOR_ES_PW=$(echo $VAULT_DATA | jq -r '.data.monitor_es_pw')
}
pull_images() {
echo "--- Pull docker images"
if [[ -n "$LS_VERSION" ]]; then
# pull image if it doesn't exist in local
[[ -z $(docker images -q docker.elastic.co/logstash/logstash:$LS_VERSION) ]] && docker pull "docker.elastic.co/logstash/logstash:$LS_VERSION"
else
# pull the latest snapshot logstash image
# select the SNAPSHOT artifact with the highest semantic version number
LS_VERSION=$( curl --retry-all-errors --retry 5 --retry-delay 1 -s "https://storage.googleapis.com/artifacts-api/snapshots/main.json" | jq -r '.version' )
BUILD_ID=$(curl --retry-all-errors --retry 5 --retry-delay 1 -s "https://storage.googleapis.com/artifacts-api/snapshots/main.json" | jq -r '.build_id')
ARCH=$(arch)
IMAGE_URL="https://snapshots.elastic.co/${BUILD_ID}/downloads/logstash/logstash-$LS_VERSION-docker-image-$ARCH.tar.gz"
IMAGE_FILENAME="$LS_VERSION.tar.gz"
echo "Download $LS_VERSION from $IMAGE_URL"
[[ ! -e $IMAGE_FILENAME ]] && curl -fsSL --retry-max-time 60 --retry 3 --retry-delay 5 -o "$IMAGE_FILENAME" "$IMAGE_URL"
[[ -z $(docker images -q docker.elastic.co/logstash/logstash:$LS_VERSION) ]] && docker load -i "$IMAGE_FILENAME"
fi
# pull filebeat image
FB_DEFAULT_VERSION="8.13.4"
FB_VERSION=${FB_VERSION:-$FB_DEFAULT_VERSION}
docker pull "docker.elastic.co/beats/filebeat:$FB_VERSION"
}
generate_logs() {
FLOG_FILE_CNT=${FLOG_FILE_CNT:-4}
SINGLE_SIZE=524288000
TOTAL_SIZE="$((FLOG_FILE_CNT * SINGLE_SIZE))"
FLOG_PATH="$SCRIPT_PATH/flog"
mkdir -p $FLOG_PATH
if [[ ! -e "$FLOG_PATH/log${FLOG_FILE_CNT}.log" ]]; then
echo "--- Generate logs in background. log: ${FLOG_FILE_CNT}, each size: 500mb"
docker run -d --name=flog --rm -v $FLOG_PATH:/go/src/data mingrammer/flog -t log -w -o "/go/src/data/log.log" -b $TOTAL_SIZE -p $SINGLE_SIZE
fi
}
check_logs() {
echo "--- Check log generation"
local cnt=0
until [[ -e "$FLOG_PATH/log${FLOG_FILE_CNT}.log" || $cnt -gt 600 ]]; do
echo "wait 30s" && sleep 30
cnt=$((cnt + 30))
done
ls -lah $FLOG_PATH
}
start_logstash() {
LS_CONFIG_PATH=$SCRIPT_PATH/ls/config
mkdir -p $LS_CONFIG_PATH
cp $CONFIG_PATH/pipelines.yml $LS_CONFIG_PATH/pipelines.yml
cp $CONFIG_PATH/logstash.yml $LS_CONFIG_PATH/logstash.yml
cp $CONFIG_PATH/uuid $LS_CONFIG_PATH/uuid
LS_JAVA_OPTS=${LS_JAVA_OPTS:--Xmx${XMX}g}
docker run -d --name=ls --net=host --cpus=$CPU --memory=${MEM}g -e LS_JAVA_OPTS="$LS_JAVA_OPTS" \
-e QTYPE="$QTYPE" -e WORKER="$WORKER" -e BATCH_SIZE="$BATCH_SIZE" \
-e BENCHMARK_ES_HOST="$BENCHMARK_ES_HOST" -e BENCHMARK_ES_USER="$BENCHMARK_ES_USER" -e BENCHMARK_ES_PW="$BENCHMARK_ES_PW" \
-e MONITOR_ES_HOST="$MONITOR_ES_HOST" -e MONITOR_ES_USER="$MONITOR_ES_USER" -e MONITOR_ES_PW="$MONITOR_ES_PW" \
-v $LS_CONFIG_PATH/logstash.yml:/usr/share/logstash/config/logstash.yml:ro \
-v $LS_CONFIG_PATH/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro \
-v $LS_CONFIG_PATH/uuid:/usr/share/logstash/data/uuid:ro \
docker.elastic.co/logstash/logstash:$LS_VERSION
}
start_filebeat() {
for ((i = 0; i < FB_CNT; i++)); do
FB_PATH="$SCRIPT_PATH/fb${i}"
mkdir -p $FB_PATH
cp $CONFIG_PATH/filebeat.yml $FB_PATH/filebeat.yml
docker run -d --name=fb$i --net=host --user=root \
-v $FB_PATH/filebeat.yml:/usr/share/filebeat/filebeat.yml \
-v $SCRIPT_PATH/flog:/usr/share/filebeat/flog \
docker.elastic.co/beats/filebeat:$FB_VERSION filebeat -e --strict.perms=false
done
}
capture_stats() {
CURRENT=$(jq -r '.flow.output_throughput.current' $NS_JSON)
local eps_1m=$(jq -r '.flow.output_throughput.last_1_minute' $NS_JSON)
local eps_5m=$(jq -r '.flow.output_throughput.last_5_minutes' $NS_JSON)
local worker_util=$(jq -r '.pipelines.main.flow.worker_utilization.last_1_minute' $NS_JSON)
local worker_concurr=$(jq -r '.pipelines.main.flow.worker_concurrency.last_1_minute' $NS_JSON)
local cpu_percent=$(jq -r '.process.cpu.percent' $NS_JSON)
local heap=$(jq -r '.jvm.mem.heap_used_in_bytes' $NS_JSON)
local non_heap=$(jq -r '.jvm.mem.non_heap_used_in_bytes' $NS_JSON)
local q_event_cnt=$(jq -r '.pipelines.main.queue.events_count' $NS_JSON)
local q_size=$(jq -r '.pipelines.main.queue.queue_size_in_bytes' $NS_JSON)
TOTAL_EVENTS_OUT=$(jq -r '.pipelines.main.events.out' $NS_JSON)
printf "current: %s, 1m: %s, 5m: %s, worker_utilization: %s, worker_concurrency: %s, cpu: %s, heap: %s, non-heap: %s, q_events: %s, q_size: %s, total_events_out: %s\n" \
$CURRENT $eps_1m $eps_5m $worker_util $worker_concurr $cpu_percent $heap $non_heap $q_event_cnt $q_size $TOTAL_EVENTS_OUT
}
aggregate_stats() {
local file_glob="$SCRIPT_PATH/$NS_DIR/${QTYPE:0:1}_w${WORKER}b${BATCH_SIZE}_*.json"
MAX_EPS_1M=$( jqmax '.flow.output_throughput.last_1_minute' "$file_glob" )
MAX_EPS_5M=$( jqmax '.flow.output_throughput.last_5_minutes' "$file_glob" )
MAX_WORKER_UTIL=$( jqmax '.pipelines.main.flow.worker_utilization.last_1_minute' "$file_glob" )
MAX_WORKER_CONCURR=$( jqmax '.pipelines.main.flow.worker_concurrency.last_1_minute' "$file_glob" )
MAX_Q_EVENT_CNT=$( jqmax '.pipelines.main.queue.events_count' "$file_glob" )
MAX_Q_SIZE=$( jqmax '.pipelines.main.queue.queue_size_in_bytes' "$file_glob" )
AVG_CPU_PERCENT=$( jqavg '.process.cpu.percent' "$file_glob" )
AVG_VIRTUAL_MEM=$( jqavg '.process.mem.total_virtual_in_bytes' "$file_glob" )
AVG_HEAP=$( jqavg '.jvm.mem.heap_used_in_bytes' "$file_glob" )
AVG_NON_HEAP=$( jqavg '.jvm.mem.non_heap_used_in_bytes' "$file_glob" )
}
send_summary() {
echo "--- Send summary to Elasticsearch"
# build json
local timestamp
timestamp=$(date -u +"%Y-%m-%dT%H:%M:%S")
SUMMARY="{\"timestamp\": \"$timestamp\", \"version\": \"$LS_VERSION\", \"cpu\": \"$CPU\", \"mem\": \"$MEM\", \"workers\": \"$WORKER\", \"batch_size\": \"$BATCH_SIZE\", \"queue_type\": \"$QTYPE\""
not_empty "$TOTAL_EVENTS_OUT" && SUMMARY="$SUMMARY, \"total_events_out\": \"$TOTAL_EVENTS_OUT\""
not_empty "$MAX_EPS_1M" && SUMMARY="$SUMMARY, \"max_eps_1m\": \"$MAX_EPS_1M\""
not_empty "$MAX_EPS_5M" && SUMMARY="$SUMMARY, \"max_eps_5m\": \"$MAX_EPS_5M\""
not_empty "$MAX_WORKER_UTIL" && SUMMARY="$SUMMARY, \"max_worker_utilization\": \"$MAX_WORKER_UTIL\""
not_empty "$MAX_WORKER_CONCURR" && SUMMARY="$SUMMARY, \"max_worker_concurrency\": \"$MAX_WORKER_CONCURR\""
not_empty "$AVG_CPU_PERCENT" && SUMMARY="$SUMMARY, \"avg_cpu_percentage\": \"$AVG_CPU_PERCENT\""
not_empty "$AVG_HEAP" && SUMMARY="$SUMMARY, \"avg_heap\": \"$AVG_HEAP\""
not_empty "$AVG_NON_HEAP" && SUMMARY="$SUMMARY, \"avg_non_heap\": \"$AVG_NON_HEAP\""
not_empty "$AVG_VIRTUAL_MEM" && SUMMARY="$SUMMARY, \"avg_virtual_memory\": \"$AVG_VIRTUAL_MEM\""
not_empty "$MAX_Q_EVENT_CNT" && SUMMARY="$SUMMARY, \"max_queue_events\": \"$MAX_Q_EVENT_CNT\""
not_empty "$MAX_Q_SIZE" && SUMMARY="$SUMMARY, \"max_queue_bytes_size\": \"$MAX_Q_SIZE\""
not_empty "$TAGS" && SUMMARY="$SUMMARY, \"tags\": $JSON_TAGS"
SUMMARY="$SUMMARY}"
tee summary.json << EOF
{"index": {}}
$SUMMARY
EOF
# send to ES
local resp
local err_status
resp=$(curl -s -X POST -u "$BENCHMARK_ES_USER:$BENCHMARK_ES_PW" "$BENCHMARK_ES_HOST/benchmark_summary/_bulk" -H 'Content-Type: application/json' --data-binary @"summary.json")
echo "$resp"
err_status=$(echo "$resp" | jq -r ".errors")
if [[ "$err_status" == "true" ]]; then
echo "Failed to send summary"
exit 1
fi
}
# $1: snapshot index
node_stats() {
NS_JSON="$SCRIPT_PATH/$NS_DIR/${QTYPE:0:1}_w${WORKER}b${BATCH_SIZE}_$1.json" # m_w8b1000_0.json
# curl inside container because docker on mac cannot resolve localhost to host network interface
docker exec -i ls curl localhost:9600/_node/stats > "$NS_JSON" 2> /dev/null
}
# $1: index
snapshot() {
node_stats $1
capture_stats
}
create_directory() {
NS_DIR="fb${FB_CNT}c${CPU}m${MEM}" # fb4c4m4
mkdir -p "$SCRIPT_PATH/$NS_DIR"
}
queue() {
for QTYPE in "persisted" "memory"; do
worker
done
}
worker() {
for m in "${MULTIPLIERS[@]}"; do
WORKER=$((CPU * m))
batch
done
}
batch() {
for BATCH_SIZE in "${BATCH_SIZES[@]}"; do
run_pipeline
stop_pipeline
done
}
run_pipeline() {
echo "--- Run pipeline. queue type: $QTYPE, worker: $WORKER, batch size: $BATCH_SIZE"
start_logstash
start_filebeat
docker ps
echo "(0) sleep 3m" && sleep 180
snapshot "0"
for i in {1..8}; do
echo "($i) sleep 30s" && sleep 30
snapshot "$i"
# print docker log when ingestion rate is zero
# remove '.' in number and return max val
[[ $(max -g "${CURRENT/./}" "0") -eq 0 ]] &&
docker logs fb0 &&
docker logs ls
done
aggregate_stats
send_summary
}
stop_pipeline() {
echo "--- Stop Pipeline"
for ((i = 0; i < FB_CNT; i++)); do
docker stop fb$i
docker rm fb$i
done
docker stop ls
docker rm ls
curl -u "$BENCHMARK_ES_USER:$BENCHMARK_ES_PW" -X DELETE $BENCHMARK_ES_HOST/_data_stream/logs-generic-default
echo " data stream deleted "
# TODO: clean page caches, reduce memory fragmentation
# https://github.com/elastic/logstash/pull/16191#discussion_r1647050216
}
clean_up() {
# stop log generation if it has not done yet
[[ -n $(docker ps | grep flog) ]] && docker stop flog || true
# remove image
docker image rm docker.elastic.co/logstash/logstash:$LS_VERSION
}

View file

@ -15,9 +15,8 @@ set -eo pipefail
# - The script sends a summary of EPS and resource usage to index `benchmark_summary`
# *******************************************************
SCRIPT_PATH="$(cd "$(dirname "$0")"; pwd)"
CONFIG_PATH="$SCRIPT_PATH/config"
source "$SCRIPT_PATH/util.sh"
SCRIPT_PATH="$(dirname "${BASH_SOURCE[0]}")"
source "$SCRIPT_PATH/core.sh"
## usage:
## main.sh FB_CNT QTYPE CPU MEM
@ -36,272 +35,9 @@ source "$SCRIPT_PATH/util.sh"
## MEM=4 # number of GB for Logstash container
## QTYPE=memory # queue type to test {persisted|memory|all}
## FB_CNT=4 # number of filebeats to use in benchmark
usage() {
echo "Usage: $0 [FB_CNT] [QTYPE] [CPU] [MEM]"
echo "Example: $0 4 {persisted|memory|all} 2 2"
exit 1
}
parse_args() {
while [[ "$#" -gt 0 ]]; do
if [ -z "$FB_CNT" ]; then
FB_CNT=$1
elif [ -z "$QTYPE" ]; then
case $1 in
all | persisted | memory)
QTYPE=$1
;;
*)
echo "Error: wrong queue type $1"
usage
;;
esac
elif [ -z "$CPU" ]; then
CPU=$1
elif [ -z "$MEM" ]; then
MEM=$1
else
echo "Error: Too many arguments"
usage
fi
shift
done
# set default value
# number of filebeat
FB_CNT=${FB_CNT:-4}
# all | persisted | memory
QTYPE=${QTYPE:-all}
CPU=${CPU:-4}
MEM=${MEM:-4}
XMX=$((MEM / 2))
IFS=','
# worker multiplier: 1,2,4
MULTIPLIERS="${MULTIPLIERS:-1,2,4}"
read -ra MULTIPLIERS <<< "$MULTIPLIERS"
BATCH_SIZES="${BATCH_SIZES:-500}"
read -ra BATCH_SIZES <<< "$BATCH_SIZES"
IFS=' '
echo "filebeats: $FB_CNT, cpu: $CPU, mem: $MEM, Queue: $QTYPE, worker multiplier: ${MULTIPLIERS[@]}, batch size: ${BATCH_SIZES[@]}"
}
get_secret() {
VAULT_PATH=secret/ci/elastic-logstash/benchmark
VAULT_DATA=$(vault kv get -format json $VAULT_PATH)
BENCHMARK_ES_HOST=$(echo $VAULT_DATA | jq -r '.data.es_host')
BENCHMARK_ES_USER=$(echo $VAULT_DATA | jq -r '.data.es_user')
BENCHMARK_ES_PW=$(echo $VAULT_DATA | jq -r '.data.es_pw')
MONITOR_ES_HOST=$(echo $VAULT_DATA | jq -r '.data.monitor_es_host')
MONITOR_ES_USER=$(echo $VAULT_DATA | jq -r '.data.monitor_es_user')
MONITOR_ES_PW=$(echo $VAULT_DATA | jq -r '.data.monitor_es_pw')
}
pull_images() {
echo "--- Pull docker images"
# pull the latest snapshot logstash image
if [[ -n "$LS_VERSION" ]]; then
docker pull "docker.elastic.co/logstash/logstash:$LS_VERSION"
else
# select the SNAPSHOT artifact with the highest semantic version number
LS_VERSION=$( curl --retry-all-errors --retry 5 --retry-delay 1 -s https://artifacts-api.elastic.co/v1/versions | jq -r '.versions | map(select(endswith("-SNAPSHOT"))) | max_by(rtrimstr("-SNAPSHOT")|split(".")|map(tonumber))' )
BUILD_ID=$(curl --retry-all-errors --retry 5 --retry-delay 1 -s "https://artifacts-api.elastic.co/v1/versions/${LS_VERSION}/builds/latest" | jq -re '.build.build_id')
ARCH=$(arch)
IMAGE_URL="https://snapshots.elastic.co/${BUILD_ID}/downloads/logstash/logstash-$LS_VERSION-docker-image-$ARCH.tar.gz"
IMAGE_FILENAME="$LS_VERSION.tar.gz"
echo "Download $LS_VERSION from $IMAGE_URL"
[[ ! -e $IMAGE_FILENAME ]] && curl -fsSL --retry-max-time 60 --retry 3 --retry-delay 5 -o "$IMAGE_FILENAME" "$IMAGE_URL"
[[ -z $(docker images -q docker.elastic.co/logstash/logstash:$LS_VERSION) ]] && docker load -i "$IMAGE_FILENAME"
fi
# pull filebeat image
FB_DEFAULT_VERSION="8.13.4"
FB_VERSION=${FB_VERSION:-$FB_DEFAULT_VERSION}
docker pull "docker.elastic.co/beats/filebeat:$FB_VERSION"
}
generate_logs() {
FLOG_PATH="$SCRIPT_PATH/flog"
mkdir -p $FLOG_PATH
if [[ ! -e "$FLOG_PATH/log4.log" ]]; then
echo "--- Generate logs in background. log: 5, size: 500mb"
docker run -d --name=flog --rm -v $FLOG_PATH:/go/src/data mingrammer/flog -t log -w -o "/go/src/data/log.log" -b 2621440000 -p 524288000
fi
}
check_logs() {
echo "--- Check log generation"
local cnt=0
until [[ -e "$FLOG_PATH/log4.log" || $cnt -gt 600 ]]; do
echo "wait 30s" && sleep 30
cnt=$((cnt + 30))
done
ls -lah $FLOG_PATH
}
start_logstash() {
LS_CONFIG_PATH=$SCRIPT_PATH/ls/config
mkdir -p $LS_CONFIG_PATH
cp $CONFIG_PATH/pipelines.yml $LS_CONFIG_PATH/pipelines.yml
cp $CONFIG_PATH/logstash.yml $LS_CONFIG_PATH/logstash.yml
LS_JAVA_OPTS=${LS_JAVA_OPTS:--Xmx${XMX}g}
docker run -d --name=ls --net=host --cpus=$CPU --memory=${MEM}g -e LS_JAVA_OPTS="$LS_JAVA_OPTS" \
-e QTYPE="$QTYPE" -e WORKER="$WORKER" -e BATCH_SIZE="$BATCH_SIZE" \
-e BENCHMARK_ES_HOST="$BENCHMARK_ES_HOST" -e BENCHMARK_ES_USER="$BENCHMARK_ES_USER" -e BENCHMARK_ES_PW="$BENCHMARK_ES_PW" \
-e MONITOR_ES_HOST="$MONITOR_ES_HOST" -e MONITOR_ES_USER="$MONITOR_ES_USER" -e MONITOR_ES_PW="$MONITOR_ES_PW" \
-v $LS_CONFIG_PATH/logstash.yml:/usr/share/logstash/config/logstash.yml:ro \
-v $LS_CONFIG_PATH/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro \
docker.elastic.co/logstash/logstash:$LS_VERSION
}
start_filebeat() {
for ((i = 0; i < FB_CNT; i++)); do
FB_PATH="$SCRIPT_PATH/fb${i}"
mkdir -p $FB_PATH
cp $CONFIG_PATH/filebeat.yml $FB_PATH/filebeat.yml
docker run -d --name=fb$i --net=host --user=root \
-v $FB_PATH/filebeat.yml:/usr/share/filebeat/filebeat.yml \
-v $SCRIPT_PATH/flog:/usr/share/filebeat/flog \
docker.elastic.co/beats/filebeat:$FB_VERSION filebeat -e --strict.perms=false
done
}
capture_stats() {
CURRENT=$(jq -r '.flow.output_throughput.current' $NS_JSON)
local eps_1m=$(jq -r '.flow.output_throughput.last_1_minute' $NS_JSON)
local eps_5m=$(jq -r '.flow.output_throughput.last_5_minutes' $NS_JSON)
local worker_util=$(jq -r '.pipelines.main.flow.worker_utilization.last_1_minute' $NS_JSON)
local worker_concurr=$(jq -r '.pipelines.main.flow.worker_concurrency.last_1_minute' $NS_JSON)
local cpu_percent=$(jq -r '.process.cpu.percent' $NS_JSON)
local heap=$(jq -r '.jvm.mem.heap_used_in_bytes' $NS_JSON)
local non_heap=$(jq -r '.jvm.mem.non_heap_used_in_bytes' $NS_JSON)
local q_event_cnt=$(jq -r '.pipelines.main.queue.events_count' $NS_JSON)
local q_size=$(jq -r '.pipelines.main.queue.queue_size_in_bytes' $NS_JSON)
TOTAL_EVENTS_OUT=$(jq -r '.pipelines.main.events.out' $NS_JSON)
printf "current: %s, 1m: %s, 5m: %s, worker_utilization: %s, worker_concurrency: %s, cpu: %s, heap: %s, non-heap: %s, q_events: %s, q_size: %s, total_events_out: %s\n" \
$CURRENT $eps_1m $eps_5m $worker_util $worker_concurr $cpu_percent $heap $non_heap $q_event_cnt $q_size $TOTAL_EVENTS_OUT
}
aggregate_stats() {
local file_glob="$SCRIPT_PATH/$NS_DIR/${QTYPE:0:1}_w${WORKER}b${BATCH_SIZE}_*.json"
MAX_EPS_1M=$( jqmax '.flow.output_throughput.last_1_minute' "$file_glob" )
MAX_EPS_5M=$( jqmax '.flow.output_throughput.last_5_minutes' "$file_glob" )
MAX_WORKER_UTIL=$( jqmax '.pipelines.main.flow.worker_utilization.last_1_minute' "$file_glob" )
MAX_WORKER_CONCURR=$( jqmax '.pipelines.main.flow.worker_concurrency.last_1_minute' "$file_glob" )
MAX_Q_EVENT_CNT=$( jqmax '.pipelines.main.queue.events_count' "$file_glob" )
MAX_Q_SIZE=$( jqmax '.pipelines.main.queue.queue_size_in_bytes' "$file_glob" )
AVG_CPU_PERCENT=$( jqavg '.process.cpu.percent' "$file_glob" )
AVG_VIRTUAL_MEM=$( jqavg '.process.mem.total_virtual_in_bytes' "$file_glob" )
AVG_HEAP=$( jqavg '.jvm.mem.heap_used_in_bytes' "$file_glob" )
AVG_NON_HEAP=$( jqavg '.jvm.mem.non_heap_used_in_bytes' "$file_glob" )
}
send_summary() {
echo "--- Send summary to Elasticsearch"
timestamp=$(date -u +"%Y-%m-%dT%H:%M:%S")
tee summary.json << EOF
{"index": {}}
{"timestamp": "$timestamp", "version": "$LS_VERSION", "cpu": "$CPU", "mem": "$MEM", "workers": "$WORKER", "batch_size": "$BATCH_SIZE", "queue_type": "$QTYPE", "total_events_out": "$TOTAL_EVENTS_OUT", "max_eps_1m": "$MAX_EPS_1M", "max_eps_5m": "$MAX_EPS_5M", "max_worker_utilization": "$MAX_WORKER_UTIL", "max_worker_concurrency": "$MAX_WORKER_CONCURR", "avg_cpu_percentage": "$AVG_CPU_PERCENT", "avg_heap": "$AVG_HEAP", "avg_non_heap": "$AVG_NON_HEAP", "avg_virtual_memory": "$AVG_VIRTUAL_MEM", "max_queue_events": "$MAX_Q_EVENT_CNT", "max_queue_bytes_size": "$MAX_Q_SIZE"}
EOF
curl -X POST -u "$BENCHMARK_ES_USER:$BENCHMARK_ES_PW" "$BENCHMARK_ES_HOST/benchmark_summary/_bulk" -H 'Content-Type: application/json' --data-binary @"summary.json"
echo ""
}
# $1: snapshot index
node_stats() {
NS_JSON="$SCRIPT_PATH/$NS_DIR/${QTYPE:0:1}_w${WORKER}b${BATCH_SIZE}_$1.json" # m_w8b1000_0.json
# curl inside container because docker on mac cannot resolve localhost to host network interface
docker exec -it ls curl localhost:9600/_node/stats > "$NS_JSON" 2> /dev/null
}
# $1: index
snapshot() {
node_stats $1
capture_stats
}
create_directory() {
NS_DIR="fb${FB_CNT}c${CPU}m${MEM}" # fb4c4m4
mkdir -p "$SCRIPT_PATH/$NS_DIR"
}
queue() {
for QTYPE in "persisted" "memory"; do
worker
done
}
worker() {
for m in "${MULTIPLIERS[@]}"; do
WORKER=$((CPU * m))
batch
done
}
batch() {
for BATCH_SIZE in "${BATCH_SIZES[@]}"; do
run_pipeline
stop_pipeline
done
}
run_pipeline() {
echo "--- Run pipeline. queue type: $QTYPE, worker: $WORKER, batch size: $BATCH_SIZE"
start_logstash
start_filebeat
docker ps
echo "(0) sleep 3m" && sleep 180
snapshot "0"
for i in {1..8}; do
echo "($i) sleep 30s" && sleep 30
snapshot "$i"
# print docker log when ingestion rate is zero
# remove '.' in number and return max val
[[ $(max -g "${CURRENT/./}" "0") -eq 0 ]] &&
docker logs fb0 &&
docker logs ls
done
aggregate_stats
send_summary
}
stop_pipeline() {
echo "--- Stop Pipeline"
for ((i = 0; i < FB_CNT; i++)); do
docker stop fb$i
docker rm fb$i
done
docker stop ls
docker rm ls
curl -u "$BENCHMARK_ES_USER:$BENCHMARK_ES_PW" -X DELETE $BENCHMARK_ES_HOST/_data_stream/logs-generic-default
echo " data stream deleted "
# TODO: clean page caches, reduce memory fragmentation
# https://github.com/elastic/logstash/pull/16191#discussion_r1647050216
}
## FLOG_FILE_CNT=4 # number of files to generate for ingestion
## VAULT_PATH=secret/path # vault path point to Elasticsearch credentials. The default value points to benchmark cluster.
## TAGS=test,other # tags with "," separator.
main() {
parse_args "$@"
get_secret
@ -317,8 +53,7 @@ main() {
worker
fi
# stop log generation if it has not done yet
[[ -n $(docker ps | grep flog) ]] && docker stop flog || true
clean_up
}
main "$@"

View file

@ -0,0 +1,44 @@
#!/usr/bin/env bash
set -eo pipefail
# *******************************************************
# Run benchmark for versions that have flow metrics
# When the hardware changes, run the marathon task to establish a new baseline.
# Usage:
# nohup bash -x all.sh > log.log 2>&1 &
# Accept env vars:
# STACK_VERSIONS=8.15.0,8.15.1,8.16.0-SNAPSHOT # versions to test. It is comma separator string
# *******************************************************
SCRIPT_PATH="$(dirname "${BASH_SOURCE[0]}")"
source "$SCRIPT_PATH/core.sh"
parse_stack_versions() {
IFS=','
STACK_VERSIONS="${STACK_VERSIONS:-8.6.0,8.7.0,8.8.0,8.9.0,8.10.0,8.11.0,8.12.0,8.13.0,8.14.0,8.15.0}"
read -ra STACK_VERSIONS <<< "$STACK_VERSIONS"
}
main() {
parse_stack_versions
parse_args "$@"
get_secret
generate_logs
check_logs
USER_QTYPE="$QTYPE"
for V in "${STACK_VERSIONS[@]}" ; do
LS_VERSION="$V"
QTYPE="$USER_QTYPE"
pull_images
create_directory
if [[ $QTYPE == "all" ]]; then
queue
else
worker
fi
done
}
main "$@"

View file

@ -0,0 +1,8 @@
## 20241210
Remove scripted field `5m_num` from dashboards
## 20240912
Updated runtime field `release` to return `true` when `version` contains "SNAPSHOT"
## 20240912
Initial dashboards

View file

@ -0,0 +1,14 @@
benchmark_objects.ndjson contains the following resources
- Dashboards
- daily snapshot
- released versions
- Data Views
- benchmark
- runtime fields
- | Fields Name | Type | Comment |
|--------------|---------------------------------------------------------------------------------------|--------------------------------------------------|
| versions_num | long | convert semantic versioning to number for graph sorting |
| release | boolean | `true` for released version. `false` for snapshot version. It is for graph filtering. |
To import objects to Kibana, navigate to Stack Management > Save Objects and click Import

File diff suppressed because one or more lines are too long

View file

@ -0,0 +1,6 @@
POST /_aliases
{
"actions": [
{ "add": { "index": "benchmark_summary_v2", "alias": "benchmark_summary" } }
]
}

View file

@ -0,0 +1,179 @@
PUT /benchmark_summary_v2/_mapping
{
"properties": {
"avg_cpu_percentage": {
"type": "float",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"avg_heap": {
"type": "float",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"avg_non_heap": {
"type": "float",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"avg_virtual_memory": {
"type": "float",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"batch_size": {
"type": "integer",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"cpu": {
"type": "float",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"max_eps_1m": {
"type": "float",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"max_eps_5m": {
"type": "float",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"max_queue_bytes_size": {
"type": "integer",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"max_queue_events": {
"type": "integer",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"max_worker_concurrency": {
"type": "float",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"max_worker_utilization": {
"type": "float",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"mem": {
"type": "float",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"queue_type": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"tag": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"timestamp": {
"type": "date"
},
"total_events_out": {
"type": "integer",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"version": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"workers": {
"type": "integer",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"tags" : {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
}

View file

@ -30,3 +30,12 @@ jqavg() {
jqmax() {
jq -r "$1 | select(. != null)" $2 | jq -s . | jq 'max'
}
# return true if $1 is non empty and not "null"
not_empty() {
if [[ -n "$1" && "$1" != "null" ]]; then
return 0
else
return 1
fi
}