add beanstalk input & output

This commit is contained in:
Pete Fritchman 2010-12-04 12:27:24 -05:00
parent af2fe853da
commit 9ebc0874ea
2 changed files with 56 additions and 0 deletions

View file

@ -0,0 +1,31 @@
require "logstash/inputs/base"
require "em-jack"
class LogStash::Inputs::Beanstalk < LogStash::Inputs::Base
def initialize(url, type, config={}, &block)
super
if @url.path == "" or @url.path == "/"
raise "must specify a tube for beanstalk output"
end
end
def register
tube = @url.path[1..-1] # Skip leading '/'
port = @url.port || 11300
@beanstalk = EMJack::Connection.new(:host => @url.host,
:port => port,
:tube => tube)
@beanstalk.each_job do |job|
begin
event = LogStash::Event.from_json(job.body)
receive(event)
@beanstalk.delete(job)
rescue
@logger.warn(["Trouble parsing beanstalk job",
{:error => $!, :body => job.body}])
@beanstalk.bury(job, 0)
end
end # @beanstalk.each_job
end # def register
end # class LogStash::Inputs::Beanstalk

View file

@ -0,0 +1,25 @@
require "logstash/outputs/base"
require "em-jack"
class LogStash::Outputs::Beanstalk < LogStash::Outputs::Base
def initialize(url, config={}, &block)
super
@ttr = @urlopts["ttr"] || 300;
if @url.path == "" or @url.path == "/"
raise "must specify a tube for beanstalk output"
end
end
def register
tube = @url.path[1..-1] # Skip leading '/'
port = @url.port || 11300
@beanstalk = EMJack::Connection.new(:host => @url.host,
:port => port,
:tube => tube)
end # def register
def receive(event)
@beanstalk.put(event.to_json, :ttr => @ttr)
end # def receive
end # class LogStash::Outputs::Beanstalk