mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
update beanstalk input/output
This commit is contained in:
parent
1f86bdcd70
commit
0e34d43a66
2 changed files with 35 additions and 29 deletions
|
@ -1,41 +1,43 @@
|
|||
require "em-jack"
|
||||
require "logstash/inputs/base"
|
||||
require "logstash/namespace"
|
||||
require "beanstalk-client"
|
||||
|
||||
class LogStash::Inputs::Beanstalk < LogStash::Inputs::Base
|
||||
|
||||
config_name "beanstalk"
|
||||
config :host, :validate => :string, :required => true
|
||||
config :port, :validate => :number
|
||||
config :tube, :validate => :string, :required => true
|
||||
|
||||
public
|
||||
def initialize(params)
|
||||
super
|
||||
raise "issue/17: needs refactor to support configfile"
|
||||
|
||||
if @url.path == "" or @url.path == "/"
|
||||
raise "must specify a tube for beanstalk output"
|
||||
end
|
||||
@port ||= 11300
|
||||
end # def initialize
|
||||
|
||||
public
|
||||
def register
|
||||
tube = @url.path[1..-1] # Skip leading '/'
|
||||
port = @url.port || 11300
|
||||
@beanstalk = EMJack::Connection.new(:host => @url.host,
|
||||
:port => port,
|
||||
:tube => tube)
|
||||
@beanstalk.each_job do |job|
|
||||
# TODO(petef): support pools of beanstalkd servers
|
||||
# TODO(petef): check for errors
|
||||
@beanstalk = Beanstalk::Pool.new(["#{@host}:#{@port}"])
|
||||
@beanstalk.watch(@tube)
|
||||
end # def register
|
||||
|
||||
public
|
||||
def run(output_queue)
|
||||
loop do
|
||||
job = @beanstalk.reserve
|
||||
begin
|
||||
event = LogStash::Event.from_json(job.body)
|
||||
rescue => e
|
||||
@logger.warn(["Trouble parsing beanstalk job",
|
||||
{:error => e.message, :body => job.body,
|
||||
:backtrace => e.backtrace}])
|
||||
@beanstalk.bury(job, 0)
|
||||
job.bury(job, 0)
|
||||
end
|
||||
|
||||
receive(event)
|
||||
@beanstalk.delete(job)
|
||||
end # @beanstalk.each_job
|
||||
end # def register
|
||||
output_queue << event
|
||||
job.delete
|
||||
end
|
||||
end # def run
|
||||
end # class LogStash::Inputs::Beanstalk
|
||||
|
|
|
@ -1,33 +1,37 @@
|
|||
require "logstash/outputs/base"
|
||||
require "logstash/namespace"
|
||||
require "em-jack"
|
||||
require "beanstalk-client"
|
||||
|
||||
class LogStash::Outputs::Beanstalk < LogStash::Outputs::Base
|
||||
|
||||
config_name "beanstalk"
|
||||
config :host, :validate => :string, :required => true
|
||||
config :port, :validate => :number
|
||||
config :tube, :validate => :string, :required => true
|
||||
config :priority, :validate => :number
|
||||
config :delay, :validate => :number
|
||||
config :ttr, :validate => :number
|
||||
|
||||
public
|
||||
def initialize(params)
|
||||
super
|
||||
|
||||
@ttr = @urlopts["ttr"] || 300;
|
||||
if @url.path == "" or @url.path == "/"
|
||||
raise "must specify a tube for beanstalk output"
|
||||
end
|
||||
@port ||= 11300
|
||||
@priority ||= 65536
|
||||
@delay ||= 0
|
||||
@ttr ||= 300
|
||||
end
|
||||
|
||||
public
|
||||
def register
|
||||
tube = @url.path[1..-1] # Skip leading '/'
|
||||
port = @url.port || 11300
|
||||
@beanstalk = EMJack::Connection.new(:host => @url.host,
|
||||
:port => port,
|
||||
:tube => tube)
|
||||
# TODO(petef): support pools of beanstalkd servers
|
||||
# TODO(petef): check for errors
|
||||
@beanstalk = Beanstalk::Pool.new(["#{@host}:#{@port}"])
|
||||
@beanstalk.use(@tube)
|
||||
end # def register
|
||||
|
||||
public
|
||||
def receive(event)
|
||||
@beanstalk.put(event.to_json, :ttr => @ttr)
|
||||
end # def receive
|
||||
@beanstalk.put(event.to_json, @priority, @delay, @ttr)
|
||||
end # def register
|
||||
end # class LogStash::Outputs::Beanstalk
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue