testing new config syntax

This commit is contained in:
John E. Vincent 2012-02-04 12:58:24 -05:00
parent 4da55915af
commit a94997ebc1

View file

@ -15,7 +15,10 @@ class LogStash::Inputs::ZeroMQ < LogStash::Inputs::Base
config_name "zeromq"
plugin_status "experimental"
# 0mq socket address to connect or bind to
# 0mq socket address to connect or bind
# Please note that `inproc://` will not work with logstash
# As each we use a context per thread
# To listen on all addresses, use `*` as the ip address
config :address, :validate => :string, :default => "tcp://127.0.0.1:2120"
# 0mq queue size
@ -23,11 +26,24 @@ class LogStash::Inputs::ZeroMQ < LogStash::Inputs::Base
# 0mq topic (Used with ZMQ_SUBSCRIBE, see http://api.zeromq.org/2-1:zmq-setsockopt
# for 'ZMQ_SUBSCRIBE: Establish message filter')
config :queue_name, :validate => :string, :default => "" # default all
config :topic, :validate => :string, :default => "" # default all
# 0mq message pattern
# Instead of declaring socket_types in logstash,
# we abstract this to a 'pattern':
# This should cover most standard use cases
# In the event that you need finer-grained control,
# please use `socket_type` and not this option.
#
# To control if this is a sender or receiver, please set `mode` below
# TODO (lusis) fix req/rep
config :pattern, :validate => ["pushpull", "pubsub", "pair", "reqrep"]
# 0mq socket type
# There is no default.
config :socket_type, :validate => ["req","pull","sub","dealer","pair"], :required => true
# This setting conflicts with `pattern`
# TODO (lusis) fix req/rep
config :socket_type, :validate => ["req", "rep", "push", "pull", "sub", "pair"]
# 0mq swap size
# Controls buffering to disk
@ -35,7 +51,7 @@ class LogStash::Inputs::ZeroMQ < LogStash::Inputs::Base
# size in bytes
# Default: 15MB
# (ZMQ_SWAP)
config :swap_size, :validate => :number, :default => 15728640
config :swap_size, :validate => :number
# 0mq identity
# (ZMQ_IDENTITY)
@ -48,9 +64,8 @@ class LogStash::Inputs::ZeroMQ < LogStash::Inputs::Base
config :sockopt, :validate => :hash
# mode
# server mode binds
# server mode binds/listens
# client mode connects
# This only makes sense with "pair" types
config :mode, :validate => ["server", "client"], :default => "client"
@source = "0mq_#{@address}/#{@queue}"
@ -60,31 +75,78 @@ class LogStash::Inputs::ZeroMQ < LogStash::Inputs::Base
require "ffi-rzmq"
require "logstash/util/zeromq"
self.class.send(:include, LogStash::Util::ZeroMQ)
if @pattern
%w{socket_type}.each do |var|
if instance_variable_get("@#{var}")
@logger.error("inputs/zeromq: You cannot specify "\
"'pattern' and also set '#{var}'")
raise "Invalid configuration detected. Please fix."
end # if instance_variable_get
end # socket_type each
case @pattern
when "pubsub"
# Input with pubsub only makes sense as a subscriber
@socket_type = "sub"
when "pushpull"
# Input with pushpull
# We can either listen for events
# or poll for events
@mode == "client" ? (@socket_type = "pull") : (@socket_type = "push")
when "pair"
# Input with pair
# We can either listen for events
# or poll for them
@socket_type = "pair"
when "reqrep"
# Input with reqrep
# We can either listen or events
# or poll for them
@mode == "client" ? (@socket_type = "req") : (@socket_type = "rep")
end # case pattern
end # if pattern
case @socket_type
when "req"
zmq_const = ZMQ::REQ
@mode = "client"
@mode = "client" # req is by nature a client. it REQuests
when "rep"
zmq_const = ZMQ::REP
@mode = "server" # rep is by nature a server. it REPlies
when "pair"
zmq_const = ZMQ::PAIR
@mode ||= "server"
zmq_const = ZMQ::PAIR
@mode ||= "server" # pair can be either server or client.
when "pull"
zmq_const = ZMQ::PULL
@mode = "client"
@mode = "client" # pull is by nature a client.
when "push"
zmq_const = ZMQ::PUSH
@mode = "server" # push is by nature a server
when "sub"
zmq_const = ZMQ::SUB
@mode = "client"
end
@mode = "client" # sub is by nature a client
end # case socket_type
@zsocket = context.socket(zmq_const)
error_check(@zsocket.setsockopt(ZMQ::HWM, @queue_size),
"while setting ZMQ:HWM == #{@queue_size.inspect}")
error_check(@zsocket.setsockopt(ZMQ::LINGER, 1),
"while setting ZMQ::LINGER == 1)")
error_check(@zsocket.setsockopt(ZMQ::SWAP, @swap_size),
"while setting ZMQ::SWAP == #{@swap_size}")
if @swap_size
error_check(@zsocket.setsockopt(ZMQ::SWAP, @swap_size),
"while setting ZMQ::SWAP == #{@swap_size.inspect}")
end # if swap_size
if @identity
error_check(@zsocket.setsockopt(ZMQ::IDENTITY, @identity),
"while setting ZMQ::IDENTITY == #{@identity.inspect}")
end # if identity
if @socket_type == "sub"
error_check(@zsocket.setsockopt(ZMQ::SUBSCRIBE, @queue_name),
"while setting ZMQ:SUBSCRIBE == #{@queue.inspect}")
end
error_check(@zsocket.setsockopt(ZMQ::SUBSCRIBE, @topic),
"while setting ZMQ:SUBSCRIBE == #{@topic.inspect}")
end # if sub
setup(@zsocket, @address)
end # def register