Mirror of https://github.com/elastic/logstash.git (synced 2025-04-23 22:27:21 -04:00)
parent 54cb320adc
commit 8b6c8e035c
4 changed files with 0 additions and 398 deletions
@@ -1,144 +0,0 @@
require 'logstash/namespace'
require 'logstash/inputs/base'

# This input will read events from a Kafka topic. It uses the high level consumer API provided
# by Kafka to read messages from the broker. It also maintains the state of what has been
# consumed using Zookeeper. The default input codec is json.
#
# The only required configuration is the topic name. By default it will connect to a Zookeeper
# running on localhost. All the broker information is read from Zookeeper state.
#
# Ideally you should have as many threads as the number of partitions for a perfect balance --
# more threads than partitions means that some threads will be idle.
#
# For more information see http://kafka.apache.org/documentation.html#theconsumer
#
# Kafka consumer configuration: http://kafka.apache.org/documentation.html#consumerconfigs
#
class LogStash::Inputs::Kafka < LogStash::Inputs::Base
  config_name 'kafka'
  milestone 1

  default :codec, 'json'

  # Specifies the ZooKeeper connection string in the form hostname:port where host and port are
  # the host and port of a ZooKeeper server. You can also specify multiple hosts in the form
  # hostname1:port1,hostname2:port2,hostname3:port3.
  config :zk_connect, :validate => :string, :default => 'localhost:2181'
  # A string that uniquely identifies the group of consumer processes to which this consumer
  # belongs. By setting the same group id, multiple processes indicate that they are all part of
  # the same consumer group.
  config :group_id, :validate => :string, :default => 'logstash'
  # The topic to consume messages from
  config :topic_id, :validate => :string, :required => true
  # Specify whether to jump to the beginning of the queue when there is no initial offset in
  # ZooKeeper, or if an offset is out of range. If this is false, messages are consumed
  # from the latest offset
  config :reset_beginning, :validate => :boolean, :default => false
  # Number of threads to read from the partitions. Ideally you should have as many threads as the
  # number of partitions for a perfect balance. More threads than partitions means that some
  # threads will be idle. Fewer threads means a single thread could be consuming from more than
  # one partition
  config :consumer_threads, :validate => :number, :default => 1
  # Internal Logstash queue size used to hold events in memory after they have been read from Kafka
  config :queue_size, :validate => :number, :default => 20
  # When a new consumer joins a consumer group the set of consumers attempt to "rebalance" the
  # load to assign partitions to each consumer. If the set of consumers changes while this
  # assignment is taking place the rebalance will fail and retry. This setting controls the
  # maximum number of attempts before giving up.
  config :rebalance_max_retries, :validate => :number, :default => 4
  # Backoff time between retries during rebalance.
  config :rebalance_backoff_ms, :validate => :number, :default => 2000
  # Throw a timeout exception to the consumer if no message is available for consumption after
  # the specified interval
  config :consumer_timeout_ms, :validate => :number, :default => -1
  # Option to restart the consumer loop on error
  config :consumer_restart_on_error, :validate => :boolean, :default => true
  # Time in millis to wait for the consumer to restart after an error
  config :consumer_restart_sleep_ms, :validate => :number, :default => 0
  # Option to add Kafka metadata like topic and message size to the event
  config :decorate_events, :validate => :boolean, :default => false
  # A unique id for the consumer; generated automatically if not set.
  config :consumer_id, :validate => :string, :default => nil
  # The number of bytes of messages to attempt to fetch for each topic-partition in each fetch
  # request. These bytes will be read into memory for each partition, so this helps control
  # the memory used by the consumer. The fetch request size must be at least as large as the
  # maximum message size the server allows or else it is possible for the producer to send
  # messages larger than the consumer can fetch.
  config :fetch_message_max_bytes, :validate => :number, :default => 1048576

  public
  def register
    jarpath = File.join(File.dirname(__FILE__), "../../../vendor/jar/kafka*/libs/*.jar")
    Dir[jarpath].each do |jar|
      require jar
    end
    require 'jruby-kafka'
    options = {
      :zk_connect => @zk_connect,
      :group_id => @group_id,
      :topic_id => @topic_id,
      :rebalance_max_retries => @rebalance_max_retries,
      :rebalance_backoff_ms => @rebalance_backoff_ms,
      :consumer_timeout_ms => @consumer_timeout_ms,
      :consumer_restart_on_error => @consumer_restart_on_error,
      :consumer_restart_sleep_ms => @consumer_restart_sleep_ms,
      :consumer_id => @consumer_id,
      :fetch_message_max_bytes => @fetch_message_max_bytes
    }
    if @reset_beginning == true
      options[:reset_beginning] = 'from-beginning'
    end # if :reset_beginning
    @kafka_client_queue = SizedQueue.new(@queue_size)
    @consumer_group = Kafka::Group.new(options)
    @logger.info('Registering kafka', :group_id => @group_id, :topic_id => @topic_id, :zk_connect => @zk_connect)
  end # def register

  public
  def run(logstash_queue)
    java_import 'kafka.common.ConsumerRebalanceFailedException'
    @logger.info('Running kafka', :group_id => @group_id, :topic_id => @topic_id, :zk_connect => @zk_connect)
    begin
      @consumer_group.run(@consumer_threads, @kafka_client_queue)
      begin
        while true
          event = @kafka_client_queue.pop
          queue_event("#{event}", logstash_queue)
        end
      rescue LogStash::ShutdownSignal
        @logger.info('Kafka got shutdown signal')
        @consumer_group.shutdown()
      end
      until @kafka_client_queue.empty?
        queue_event("#{@kafka_client_queue.pop}", logstash_queue)
      end
      @logger.info('Done running kafka input')
    rescue => e
      @logger.warn('kafka client threw exception, restarting',
                   :exception => e)
      if @consumer_group.running?
        @consumer_group.shutdown()
      end
      sleep(Float(@consumer_restart_sleep_ms) * 1 / 1000)
      retry
    end
    finished
  end # def run

  private
  def queue_event(msg, output_queue)
    begin
      @codec.decode(msg) do |event|
        decorate(event)
        if @decorate_events
          event['kafka'] = {'msg_size' => msg.bytesize, 'topic' => @topic_id, 'consumer_group' => @group_id}
        end
        output_queue << event
      end # @codec.decode
    rescue => e # parse or event creation error
      @logger.error("Failed to create event", :message => msg, :exception => e,
                    :backtrace => e.backtrace)
    end # begin
  end # def queue_event

end # class LogStash::Inputs::Kafka
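The deleted input above is driven entirely by Logstash pipeline configuration. As a minimal usage sketch (not part of this commit; the topic name, ZooKeeper address, and thread count are illustrative placeholders), a pipeline using this input could look like:

    input {
      kafka {
        zk_connect       => "localhost:2181"
        topic_id         => "logs"
        group_id         => "logstash"
        consumer_threads => 4   # ideally equal to the topic's partition count
        reset_beginning  => false
      }
    }

Since the plugin defaults its codec to json, each Kafka message would be decoded as a JSON document; switching to a plain codec would be needed for raw text payloads.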
@@ -1,158 +0,0 @@
require 'logstash/namespace'
require 'logstash/outputs/base'

# Write events to a Kafka topic. This uses the Kafka Producer API to write messages to a topic on
# the broker.
#
# The only required configuration is the topic name. The default codec is json,
# so events will be persisted on the broker in json format. If you select a codec of plain,
# Logstash will encode your messages with not only the message but also with a timestamp and
# hostname. If you do not want anything but your message passing through, you should make the output
# configuration something like:
#     output {
#       kafka {
#         codec => plain {
#           format => "%{message}"
#         }
#       }
#     }
# For more information see http://kafka.apache.org/documentation.html#theproducer
#
# Kafka producer configuration: http://kafka.apache.org/documentation.html#producerconfigs
class LogStash::Outputs::Kafka < LogStash::Outputs::Base
  config_name 'kafka'
  milestone 1

  default :codec, 'json'
  # This is for bootstrapping and the producer will only use it for getting metadata (topics,
  # partitions and replicas). The socket connections for sending the actual data will be
  # established based on the broker information returned in the metadata. The format is
  # host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a
  # subset of brokers.
  config :broker_list, :validate => :string, :default => 'localhost:9092'
  # The topic to produce the messages to
  config :topic_id, :validate => :string, :required => true
  # This parameter allows you to specify the compression codec for all data generated by this
  # producer. Valid values are "none", "gzip" and "snappy".
  config :compression_codec, :validate => %w( none gzip snappy ), :default => 'none'
  # This parameter allows you to set whether compression should be turned on for particular
  # topics. If the compression codec is anything other than NoCompressionCodec,
  # enable compression only for specified topics if any. If the list of compressed topics is
  # empty, then enable the specified compression codec for all topics. If the compression codec
  # is NoCompressionCodec, compression is disabled for all topics
  config :compressed_topics, :validate => :string, :default => ''
  # This value controls when a produce request is considered completed. Specifically,
  # how many other brokers must have committed the data to their log and acknowledged this to the
  # leader. For more info, see -- http://kafka.apache.org/documentation.html#producerconfigs
  config :request_required_acks, :validate => [-1,0,1], :default => 0
  # The serializer class for messages. The default encoder takes a byte[] and returns the same byte[]
  config :serializer_class, :validate => :string, :default => 'kafka.serializer.StringEncoder'
  # The partitioner class for partitioning messages amongst partitions in the topic. The default
  # partitioner is based on the hash of the key. If the key is null,
  # the message is sent to a random partition in the broker.
  # NOTE: topic_metadata_refresh_interval_ms controls how long the producer will distribute to a
  # partition in the topic. This defaults to 10 mins, so the producer will continue to write to a
  # single partition for 10 mins before it switches
  config :partitioner_class, :validate => :string, :default => 'kafka.producer.DefaultPartitioner'
  # The amount of time the broker will wait trying to meet the request.required.acks requirement
  # before sending back an error to the client.
  config :request_timeout_ms, :validate => :number, :default => 10000
  # This parameter specifies whether the messages are sent asynchronously in a background thread.
  # Valid values are (1) async for asynchronous send and (2) sync for synchronous send. By
  # setting the producer to async we allow batching together of requests (which is great for
  # throughput) but open the possibility of a failure of the client machine dropping unsent data.
  config :producer_type, :validate => %w( sync async ), :default => 'sync'
  # The serializer class for keys (defaults to the same as for messages if nothing is given)
  config :key_serializer_class, :validate => :string, :default => nil
  # This property will cause the producer to automatically retry a failed send request. This
  # property specifies the number of retries when such failures occur. Note that setting a
  # non-zero value here can lead to duplicates in the case of network errors that cause a message
  # to be sent but the acknowledgement to be lost.
  config :message_send_max_retries, :validate => :number, :default => 3
  # Before each retry, the producer refreshes the metadata of relevant topics to see if a new
  # leader has been elected. Since leader election takes a bit of time,
  # this property specifies the amount of time that the producer waits before refreshing the
  # metadata.
  config :retry_backoff_ms, :validate => :number, :default => 100
  # The producer generally refreshes the topic metadata from brokers when there is a failure
  # (partition missing, leader not available...). It will also poll regularly (default: every
  # 10min so 600000ms). If you set this to a negative value, metadata will only get refreshed on
  # failure. If you set this to zero, the metadata will get refreshed after each message sent
  # (not recommended). Important note: the refresh happens only AFTER the message is sent,
  # so if the producer never sends a message the metadata is never refreshed
  config :topic_metadata_refresh_interval_ms, :validate => :number, :default => 600 * 1000
  # Maximum time to buffer data when using async mode. For example a setting of 100 will try to
  # batch together 100ms of messages to send at once. This will improve throughput but adds
  # message delivery latency due to the buffering.
  config :queue_buffering_max_ms, :validate => :number, :default => 5000
  # The maximum number of unsent messages that can be queued up by the producer when using async
  # mode before either the producer must be blocked or data must be dropped.
  config :queue_buffering_max_messages, :validate => :number, :default => 10000
  # The amount of time to block before dropping messages when running in async mode and the
  # buffer has reached queue.buffering.max.messages. If set to 0 events will be enqueued
  # immediately or dropped if the queue is full (the producer send call will never block). If set
  # to -1 the producer will block indefinitely and never willingly drop a send.
  config :queue_enqueue_timeout_ms, :validate => :number, :default => -1
  # The number of messages to send in one batch when using async mode. The producer will wait
  # until either this number of messages are ready to send or queue.buffer.max.ms is reached.
  config :batch_num_messages, :validate => :number, :default => 200
  # Socket write buffer size
  config :send_buffer_bytes, :validate => :number, :default => 100 * 1024
  # The client id is a user-specified string sent in each request to help trace calls. It should
  # logically identify the application making the request.
  config :client_id, :validate => :string, :default => ""

  public
  def register
    jarpath = File.join(File.dirname(__FILE__), "../../../vendor/jar/kafka*/libs/*.jar")
    Dir[jarpath].each do |jar|
      require jar
    end
    require 'jruby-kafka'
    options = {
      :broker_list => @broker_list,
      :compression_codec => @compression_codec,
      :compressed_topics => @compressed_topics,
      :request_required_acks => @request_required_acks,
      :serializer_class => @serializer_class,
      :partitioner_class => @partitioner_class,
      :request_timeout_ms => @request_timeout_ms,
      :producer_type => @producer_type,
      :key_serializer_class => @key_serializer_class,
      :message_send_max_retries => @message_send_max_retries,
      :retry_backoff_ms => @retry_backoff_ms,
      :topic_metadata_refresh_interval_ms => @topic_metadata_refresh_interval_ms,
      :queue_buffering_max_ms => @queue_buffering_max_ms,
      :queue_buffering_max_messages => @queue_buffering_max_messages,
      :queue_enqueue_timeout_ms => @queue_enqueue_timeout_ms,
      :batch_num_messages => @batch_num_messages,
      :send_buffer_bytes => @send_buffer_bytes,
      :client_id => @client_id
    }
    @producer = Kafka::Producer.new(options)
    @producer.connect()

    @logger.info('Registering kafka producer', :topic_id => @topic_id, :broker_list => @broker_list)

    @codec.on_event do |event|
      begin
        @producer.sendMsg(@topic_id, nil, event)
      rescue LogStash::ShutdownSignal
        @logger.info('Kafka producer got shutdown signal')
      rescue => e
        @logger.warn('kafka producer threw exception, restarting',
                     :exception => e)
      end
    end
  end # def register

  def receive(event)
    return unless output?(event)
    if event == LogStash::SHUTDOWN
      finished
      return
    end
    @codec.encode(event)
  end

end # class LogStash::Outputs::Kafka
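For symmetry with the input example, here is a hedged sketch of an output block using the producer settings declared above. The broker address and topic are placeholders, and the values shown merely echo documented defaults or valid options rather than recommendations from this commit:

    output {
      kafka {
        broker_list           => "localhost:9092"
        topic_id              => "logs"
        compression_codec     => "snappy"   # one of none, gzip, snappy
        request_required_acks => 1          # wait for the leader to acknowledge the write
        producer_type         => "async"    # batch sends in a background thread
        batch_num_messages    => 200
      }
    }

With the default json codec, each event is serialized to JSON before the producer send is invoked; the codec => plain { format => "%{message}" } form shown in the class comment sends only the raw message field.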
@@ -1,57 +0,0 @@
# encoding: utf-8

require 'rspec'
require 'insist'
require 'logstash/namespace'
require 'logstash/inputs/kafka'
require 'logstash/errors'

describe LogStash::Inputs::Kafka do

  let (:kafka_config) {{"topic_id" => "test"}}

  it 'should populate kafka config with default values' do
    kafka = LogStash::Inputs::Kafka.new(kafka_config)
    insist {kafka.zk_connect} == "localhost:2181"
    insist {kafka.topic_id} == "test"
    insist {kafka.group_id} == "logstash"
    insist {kafka.reset_beginning} == false
  end

  it "should register and load kafka jars without errors" do
    kafka = LogStash::Inputs::Kafka.new(kafka_config)
    kafka.register
  end

  it "should retrieve event from kafka" do
    # Extend class to control behavior
    class LogStash::Inputs::TestKafka < LogStash::Inputs::Kafka
      milestone 1
      private
      def queue_event(msg, output_queue)
        super(msg, output_queue)
        # need to raise exception here to stop the infinite loop
        raise LogStash::ShutdownSignal
      end
    end

    kafka = LogStash::Inputs::TestKafka.new(kafka_config)
    kafka.register

    class Kafka::Group
      public
      def run(a_numThreads, a_queue)
        a_queue << "Kafka message"
      end
    end

    logstash_queue = Queue.new
    kafka.run logstash_queue
    e = logstash_queue.pop
    insist { e["message"] } == "Kafka message"
    # no metadata by default
    insist { e["kafka"] } == nil
  end

end
@@ -1,39 +0,0 @@
# encoding: utf-8

require 'rspec'
require 'insist'
require 'logstash/namespace'
require "logstash/timestamp"
require 'logstash/outputs/kafka'

describe LogStash::Outputs::Kafka do

  let (:kafka_config) {{"topic_id" => "test"}}

  it 'should populate kafka config with default values' do
    kafka = LogStash::Outputs::Kafka.new(kafka_config)
    insist {kafka.broker_list} == "localhost:9092"
    insist {kafka.topic_id} == "test"
    insist {kafka.compression_codec} == "none"
    insist {kafka.serializer_class} == "kafka.serializer.StringEncoder"
    insist {kafka.partitioner_class} == "kafka.producer.DefaultPartitioner"
    insist {kafka.producer_type} == "sync"
  end

  it "should register and load kafka jars without errors" do
    kafka = LogStash::Outputs::Kafka.new(kafka_config)
    kafka.register
  end

  it "should send logstash event to kafka broker" do
    kafka = LogStash::Outputs::Kafka.new(kafka_config)
    kafka.register
    timestamp = LogStash::Timestamp.now
    expect_any_instance_of(Kafka::Producer)
      .to receive(:sendMsg)
      .with("test", nil, "{\"message\":\"hello world\",\"host\":\"test\",\"@timestamp\":\"#{timestamp}\",\"@version\":\"1\"}")
    e = LogStash::Event.new({"message" => "hello world", "host" => "test", "@timestamp" => timestamp})
    kafka.receive(e)
  end

end
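Taken together, the removed input and output could in principle be chained into a single relay pipeline. This is a hypothetical combination for illustration only (topic names and addresses are made up, and no such configuration appears in this commit):

    input {
      kafka {
        zk_connect => "localhost:2181"
        topic_id   => "raw_logs"
      }
    }
    output {
      kafka {
        broker_list => "localhost:9092"
        topic_id    => "relayed_logs"
      }
    }

Because both plugins default their codec to json, events would be consumed as JSON from the first topic and re-published as JSON to the second.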