rework into topologies

This commit is contained in:
John E. Vincent 2012-02-05 00:38:40 -05:00
parent a94997ebc1
commit c331c7b8d0
2 changed files with 97 additions and 152 deletions

View file

@ -17,138 +17,75 @@ class LogStash::Inputs::ZeroMQ < LogStash::Inputs::Base
# 0mq socket address to connect or bind
# Please note that `inproc://` will not work with logstash
# As each we use a context per thread
# To listen on all addresses, use `*` as the ip address
config :address, :validate => :string, :default => "tcp://127.0.0.1:2120"
# as each we use a context per thread.
# By default, inputs bind/listen
# and outputs connect
config :address, :validate => :array, :default => ["tcp://*:2120"]
# 0mq queue size
config :queue_size, :validate => :number, :default => 20
# 0mq topology
# The default logstash topologies work as follows:
# * pushpull - inputs are pull, outputs are push
# * pubsub - inputs are subscribers, outputs are publishers
# * pair - inputs are clients, inputs are servers
#
# If the predefined topology flows don't work for you,
# you can change the 'mode' setting
# TODO (lusis) add req/rep MAYBE
# TODO (lusis) add router/dealer
config :topology, :validate => ["pushpull", "pubsub", "pair"]
# 0mq topic (Used with ZMQ_SUBSCRIBE, see http://api.zeromq.org/2-1:zmq-setsockopt
# for 'ZMQ_SUBSCRIBE: Establish message filter')
config :topic, :validate => :string, :default => "" # default all
# 0mq message pattern
# Instead of declaring socket_types in logstash,
# we abstract this to a 'pattern':
# This should cover most standard use cases
# In the event that you need finer-grained control,
# please use `socket_type` and not this option.
#
# To control if this is a sender or receiver, please set `mode` below
# TODO (lusis) fix req/rep
config :pattern, :validate => ["pushpull", "pubsub", "pair", "reqrep"]
# 0mq socket type
# There is no default.
# This setting conflicts with `pattern`
# TODO (lusis) fix req/rep
config :socket_type, :validate => ["req", "rep", "push", "pull", "sub", "pair"]
# 0mq swap size
# Controls buffering to disk
# in the event of messages counts exceeding the queue_size
# size in bytes
# Default: 15MB
# (ZMQ_SWAP)
config :swap_size, :validate => :number
# 0mq identity
# (ZMQ_IDENTITY)
config :identity, :validate => :string
# mode
# server mode binds/listens
# client mode connects
config :mode, :validate => ["server", "client"], :default => "server"
# 0mq socket options
# This exposes zmq_setsockopt
# for advanced tuning
# see http://api.zeromq.org/2-1:zmq-setsockopt for details
#
# This is where you would set values like:
# ZMQ::HWM - high water mark
# ZMQ::IDENTITY - named queues
# ZMQ::SWAP_SIZE - space for disk overflow
# ZMQ::SUBSCRIBE - topic filters for pubsub
#
# example: sockopt => ["ZMQ::HWM", 50, "ZMQ::IDENTITY", "my_named_queue"]
config :sockopt, :validate => :hash
# mode
# server mode binds/listens
# client mode connects
config :mode, :validate => ["server", "client"], :default => "client"
@source = "0mq_#{@address}/#{@queue}"
public
def register
require "ffi-rzmq"
require "logstash/util/zeromq"
self.class.send(:include, LogStash::Util::ZeroMQ)
if @pattern
%w{socket_type}.each do |var|
if instance_variable_get("@#{var}")
@logger.error("inputs/zeromq: You cannot specify "\
"'pattern' and also set '#{var}'")
raise "Invalid configuration detected. Please fix."
end # if instance_variable_get
end # socket_type each
case @pattern
when "pubsub"
# Input with pubsub only makes sense as a subscriber
@socket_type = "sub"
when "pushpull"
# Input with pushpull
# We can either listen for events
# or poll for events
@mode == "client" ? (@socket_type = "pull") : (@socket_type = "push")
when "pair"
# Input with pair
# We can either listen for events
# or poll for them
@socket_type = "pair"
when "reqrep"
# Input with reqrep
# We can either listen or events
# or poll for them
@mode == "client" ? (@socket_type = "req") : (@socket_type = "rep")
end # case pattern
end # if pattern
case @socket_type
when "req"
zmq_const = ZMQ::REQ
@mode = "client" # req is by nature a client. it REQuests
when "rep"
zmq_const = ZMQ::REP
@mode = "server" # rep is by nature a server. it REPlies
case @topology
when "pair"
zmq_const = ZMQ::PAIR
@mode ||= "server" # pair can be either server or client.
when "pull"
when "pushpull"
zmq_const = ZMQ::PULL
@mode = "client" # pull is by nature a client.
when "push"
zmq_const = ZMQ::PUSH
@mode = "server" # push is by nature a server
when "sub"
when "pubsub"
zmq_const = ZMQ::SUB
@mode = "client" # sub is by nature a client
end # case socket_type
@zsocket = context.socket(zmq_const)
error_check(@zsocket.setsockopt(ZMQ::HWM, @queue_size),
"while setting ZMQ:HWM == #{@queue_size.inspect}")
error_check(@zsocket.setsockopt(ZMQ::LINGER, 1),
"while setting ZMQ::LINGER == 1)")
if @swap_size
error_check(@zsocket.setsockopt(ZMQ::SWAP, @swap_size),
"while setting ZMQ::SWAP == #{@swap_size.inspect}")
end # if swap_size
if @identity
error_check(@zsocket.setsockopt(ZMQ::IDENTITY, @identity),
"while setting ZMQ::IDENTITY == #{@identity.inspect}")
end # if identity
if @socket_type == "sub"
error_check(@zsocket.setsockopt(ZMQ::SUBSCRIBE, @topic),
"while setting ZMQ:SUBSCRIBE == #{@topic.inspect}")
end # if sub
setup(@zsocket, @address)
# TODO (lusis)
# wireup sockopt hash
if @sockopt
@sockopt.each do |opt,value|
sockopt = opt.split('::')[1]
option = ZMQ.const_defined?(sockopt) ? ZMQ.const_get(sockopt) : ZMQ.const_missing(sockopt)
error_check(@zsocket.setsockopt(option, value),
"while setting #{opt} == 1)")
end
end
@address.each do |addr|
setup(@zsocket, addr)
end
end # def register
def teardown

View file

@ -14,72 +14,80 @@ class LogStash::Outputs::ZeroMQ < LogStash::Outputs::Base
config_name "zeromq"
plugin_status "experimental"
# 0mq socket address to connect or bind to
config :address, :validate => :string, :default => "tcp://127.0.0.1:2120"
# 0mq socket address to connect or bind
# Please note that `inproc://` will not work with logstash
# As each we use a context per thread
# By default, inputs bind/listen
# and outputs connect
config :address, :validate => :array, :default => ["tcp://127.0.0.1:2120"]
# 0mq queue size
config :queue_size, :validate => :number, :default => 20
# 0mq topology
# The default logstash topologies work as follows:
# * pushpull - inputs are pull, outputs are push
# * pubsub - inputs are subscribers, outputs are publishers
# * pair - inputs are clients, inputs are servers
#
# If the predefined topology flows don't work for you,
# you can change the 'mode' setting
# TODO (lusis) add req/rep MAYBE
# TODO (lusis) add router/dealer
config :topology, :validate => ["pushpull", "pubsub", "pair"]
# 0mq topic (Used with ZMQ_SUBSCRIBE, see http://api.zeromq.org/2-1:zmq-setsockopt
# for 'ZMQ_SUBSCRIBE: Establish message filter')
config :queue_name, :validate => :string, :default => ""
# 0mq socket type
# There is no default.
config :socket_type, :validate => ["rep","push","pub","router","pair"], :required => true
# 0mq swap size
# Controls buffering to disk
# in the event of messages counts exceeding the queue_size
# size in bytes
# Default: 15MB
# (ZMQ_SWAP)
config :swap_size, :validate => :number, :default => 15728640
# 0mq identity
# (ZMQ_IDENTITY)
config :identity, :validate => :string
# mode
# server mode binds/listens
# client mode connects
config :mode, :validate => ["server", "client"], :default => "client"
# 0mq socket options
# This exposes zmq_setsockopt
# for advanced tuning
# see http://api.zeromq.org/2-1:zmq-setsockopt for details
#
# This is where you would set values like:
# ZMQ::HWM - high water mark
# ZMQ::IDENTITY - named queues
# ZMQ::SWAP_SIZE - space for disk overflow
# ZMQ::SUBSCRIBE - topic filters for pubsub
#
# example: sockopt => ["ZMQ::HWM", 50, "ZMQ::IDENTITY", "my_named_queue"]
config :sockopt, :validate => :hash
# mode
# server mode binds/listens
# client mode connects
# This only makes sense with "pair" types
# default pair mode is server
config :mode, :validate => [ "server", "client"], :default => "server"
public
def register
require "ffi-rzmq"
require "logstash/util/zeromq"
self.class.send(:include, LogStash::Util::ZeroMQ)
case @socket_type
when "rep"
zmq_const = ZMQ::REP
@mode = "server"
# Translate topology shorthand to socket types
case @topology
when "pair"
zmq_const = ZMQ::PAIR
@mode ||= "client"
when "push"
when "pushpull"
zmq_const = ZMQ::PUSH
@mode = "server"
when "pub"
when "pubsub"
zmq_const = ZMQ::PUB
@mode = "server"
end
end # case socket_type
@zsocket = context.socket(zmq_const)
error_check(@zsocket.setsockopt(ZMQ::HWM, @queue_size),
"while setting ZMQ:HWM == #{@queue_size.inspect}")
error_check(@zsocket.setsockopt(ZMQ::LINGER, 1),
"while setting ZMQ::LINGER == 1)")
error_check(@zsocket.setsockopt(ZMQ::SWAP, @swap_size),
"while setting ZMQ::SWAP == #{@swap_size}")
setup(@zsocket, @address)
# TODO (lusis)
# wireup sockopt hash better
# making assumptions on split
if @sockopt
@sockopt.each do |opt,value|
sockopt = opt.split('::')[1]
option = ZMQ.const_defined?(sockopt) ? ZMQ.const_get(sockopt) : ZMQ.const_missing(sockopt)
error_check(@zsocket.setsockopt(option, value),
"while setting #{opt} == 1)")
end
end
@address.each do |addr|
setup(@zsocket, addr)
end
end # def register
public