Fix kafka setup scripts

This commit updates the kafka setup scripts to ensure that the kafka setup is clean between builds, by
setting an explicit zookeeper data directory to be cleaned each time, and correctly overriding `log.dirs`
instead of `log.dir` to ensure that the kafka logs are written and wiped in a consistent place each time,
which helps when using the non-immutable images used in arm64 tests.
This commit is contained in:
Rob Bavey 2020-06-16 12:54:02 -04:00 committed by Robert Bavey
parent efbdd8d27c
commit 7a4b81363b
2 changed files with 18 additions and 14 deletions

View file

@ -17,6 +17,7 @@ KAFKA_HOME=$INSTALL_DIR/kafka
KAFKA_TOPIC=logstash_topic_plain
KAFKA_MESSAGES=37
KAFKA_LOGS_DIR=/tmp/ls_integration/kafka-logs
ZOOKEEPER_DATA_DIR=/tmp/ls_integration/zookeeper
setup_kafka() {
local version=$1
@ -25,17 +26,19 @@ setup_kafka() {
curl -s -o $INSTALL_DIR/kafka.tgz "https://mirrors.ocf.berkeley.edu/apache/kafka/$version/kafka_2.11-$version.tgz"
mkdir $KAFKA_HOME && tar xzf $INSTALL_DIR/kafka.tgz -C $KAFKA_HOME --strip-components 1
rm $INSTALL_DIR/kafka.tgz
echo "dataDir=$ZOOKEEPER_DATA_DIR" >> $KAFKA_HOME/config/zookeeper.properties
fi
}
start_kafka() {
echo "Starting ZooKeeper"
$KAFKA_HOME/bin/zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties
wait_for_port 2181
echo "Starting Kafka broker"
rm -rf ${KAFKA_LOGS_DIR}
mkdir -p ${KAFKA_LOGS_DIR}
$KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties --override delete.topic.enable=true --override advertised.host.name=127.0.0.1 --override log.dir=${KAFKA_LOGS_DIR} --override log.flush.interval.ms=200
rm -rf ${ZOOKEEPER_DATA_DIR}
mkdir -p ${ZOOKEEPER_DATA_DIR}
$KAFKA_HOME/bin/zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties
wait_for_port 2181
$KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties --override delete.topic.enable=true --override advertised.host.name=127.0.0.1 --override log.dir=${KAFKA_LOGS_DIR} --override log.dirs=${KAFKA_LOGS_DIR} --override log.flush.interval.ms=200
wait_for_port 9092
}

View file

@ -30,21 +30,22 @@ describe "Test Kafka Input" do
}
after(:all) {
@fixture.teardown
@fixture.teardown unless @fixture.nil?
}
it "can ingest 37 apache log lines from Kafka broker" do
logstash_service = @fixture.get_service("logstash")
logstash_service.start_background(@fixture.config)
unless @fixture.nil?
logstash_service = @fixture.get_service("logstash")
logstash_service.start_background(@fixture.config)
try(num_retries) do
expect(@fixture.output_exists?).to be true
end
try(num_retries) do
expect(@fixture.output_exists?).to be true
end
try(num_retries) do
count = File.foreach(@fixture.actual_output).inject(0) {|c, _| c+1}
expect(count).to eq(num_events)
try(num_retries) do
count = File.foreach(@fixture.actual_output).inject(0) {|c, _| c+1}
expect(count).to eq(num_events)
end
end
end
end