- Got things happy using STOMP

This commit is contained in:
Jordan Sissel 2009-09-06 22:13:06 +00:00
parent 42b767fc85
commit 1e33e3af19
4 changed files with 50 additions and 78 deletions

View file

@ -4,36 +4,39 @@ require 'rubygems'
require 'lib/net/client' require 'lib/net/client'
require 'lib/net/messages/indexevent' require 'lib/net/messages/indexevent'
require 'lib/file/tail/since' require 'lib/file/tail/since'
require 'stomp'
require 'socket' require 'socket'
class Agent < LogStash::Net::MessageClient class Agent < LogStash::Net::MessageClient
def initialize(host, port) def initialize(host, port)
super(host, port) super(username="", password="", host=host, port=port)
@hostname = Socket.gethostname @hostname = Socket.gethostname
@host = host
@port = port
@watcher = nil
# TODO(sissel): This should go into the network code
@needack = Hash.new
start_log_watcher
end # def initialize end # def initialize
def start_log_watcher def start_log_watcher
@t1 = Thread.new do #@t1 = Thread.new do
File::Tail::Since.new("/var/log/messages").tail do |line| #File::Tail::Since.new("/var/log/messages").tail do |line|
line.chomp! #line.chomp!
index("linux-syslog", line) #index("linux-syslog", line)
end #end
end ##end
@t2 = Thread.new do @t2 = Thread.new do
File::Tail::Since.new("/b/access").tail do |line| #File::Tail::Since.new("/b/access.10").tail do |line|
line.chomp! begin
index("httpd-access", line) count = 0
File.open("/b/access.1k").readlines.each do |line|
line.chomp!
index("httpd-access", line)
count += 1
break if count >= 3
end
rescue => e
$stderr.puts e.inspect
$stderr.puts caller.join("\n")
raise e
end end
exit
end end
end # def start_log_watcher end # def start_log_watcher
@ -43,23 +46,19 @@ class Agent < LogStash::Net::MessageClient
ier.log_data = string ier.log_data = string
ier.metadata["source_host"] = @hostname ier.metadata["source_host"] = @hostname
#$stdout.write(".") puts "Sending: #{ier}"
#$stdout.flush sendmsg("/queue/logstash", ier)
@connection.sendmsg(ier)
@needack[ier.id] = ier
sleeptime = 0.1
while @needack.length > 500
sleeptime = [sleeptime * 2, 5].min
$stderr.puts "Waiting for acks (#{sleeptime})... #{@needack.length}"
sleep(sleeptime)
end
end # def index end # def index
def IndexEventResponseHandler(msg) def IndexEventResponseHandler(msg)
@needack.delete(msg.id) puts "OK"
end # def IndexEventResponseHandler end # def IndexEventResponseHandler
def run
start_log_watcher
@t2.join
super
end
end end
@ -70,8 +69,5 @@ if $0 == __FILE__
end end
host, port = ARGV[0].split(":") host, port = ARGV[0].split(":")
agent = Agent.new(host, port) agent = Agent.new(host, port)
agent.run
agent.run do |i|
# nothing
end
end end

View file

@ -4,7 +4,6 @@
require "rubygems" require "rubygems"
require "lib/net/servers/indexer" require "lib/net/servers/indexer"
s = LogStash::Net::Servers::Indexer.new s = LogStash::Net::Servers::Indexer.new(host="snack.home")
s.run do |i| s.run
puts "OK"
end

View file

@ -26,31 +26,24 @@ module LogStash; module Net
end # def initialize end # def initialize
def subscribe(name) def subscribe(name)
puts "Subscribing to #{name}" #puts "Subscribing to #{name}"
@stomp.subscribe("/queue/#{name}", @stomp_options) do |stompmsg| @stomp.subscribe("/queue/#{name}", @stomp_options) do |stompmsg|
obj = JSON::load(stompmsg.body) obj = JSON::load(stompmsg.body)
message = Message.new_from_data(obj) message = Message.new_from_data(obj)
name = message.class.name.split(":")[-1] name = message.class.name.split(":")[-1]
func = "#{name}Handler" func = "#{name}Handler"
puts stompmsg #puts stompmsg
if @handler.respond_to?(func) if @handler.respond_to?(func)
puts "Handler found" #puts "Handler found"
@handler.send(func, message) do |response| @handler.send(func, message) do |response|
puts "response: #{response}" #puts "response: #{response}"
sendmsg(stompmsg.headers["reply-to"], response) sendmsg(stompmsg.headers["reply-to"], response)
end end
# We should allow the message handler to defer acking if they want # We should allow the message handler to defer acking if they want
# For instance, if we want to index things, but only want to ack # For instance, if we want to index things, but only want to ack
# things once we actually flush to disk. # things once we actually flush to disk.
puts "Acking message: #{stompmsg}" @stomp.acknowledge stompmsg
begin
@stomp.acknowledge stompmsg
rescue => e
puts e.inspect
raise e
end
puts "Ack done"
else else
$stderr.puts "#{@handler.class.name} does not support #{func}" $stderr.puts "#{@handler.class.name} does not support #{func}"
end # if @handler.respond_to?(func) end # if @handler.respond_to?(func)
@ -66,7 +59,7 @@ module LogStash; module Net
options = { options = {
"persistent" => true, "persistent" => true,
"reply-to" => "/queue/#{@id}", "reply-to" => "/queue/#{@id}",
#"ack" => "client", "ack" => "client",
} }
@stomp.send(destination, data, options) @stomp.send(destination, data, options)
end end
@ -74,5 +67,9 @@ module LogStash; module Net
def handler=(handler) def handler=(handler)
@handler = handler @handler = handler
end end
def close
@stomp.close
end
end # class MessageSocket end # class MessageSocket
end; end # module LogStash::Net end; end # module LogStash::Net

View file

@ -3,7 +3,7 @@
require 'rubygems' require 'rubygems'
require "socket" require "socket"
require "lib/net/message" require "lib/net/message"
require "lib/net/socketmux" require "lib/net/client"
require "lib/net/messages/indexevent" require "lib/net/messages/indexevent"
require "lib/net/messages/search" require "lib/net/messages/search"
require "lib/net/messages/ping" require "lib/net/messages/ping"
@ -15,43 +15,23 @@ $count = 0
$time = 0 $time = 0
$start = Time.now.to_f $start = Time.now.to_f
class Client < LogStash::Net::MessageSocketMux class Client < LogStash::Net::MessageClient
def gotresponse(msg)
$count += 1
$ids.delete(msg.id)
if $done and $ids.length == 0
puts "All messages ACK'd (#{$lastid})"
exit(0)
end
end
def SearchResponseHandler(msg) def SearchResponseHandler(msg)
#puts "Response (have #{$count} / want: #{$ids.length} acks); #{msg.inspect}" #puts "Response (have #{$count} / want: #{$ids.length} acks); #{msg.inspect}"
msg.results.each do |result| msg.results.each do |result|
puts result puts result
end end
if msg.finished if msg.finished
$done = true close
end end
#gotresponse(msg)
end end
end end
$me = Client.new $me = Client.new(host="localhost", port=61613)
$me.connect("localhost", 3001)
$ids = Set.new
msg = LogStash::Net::Messages::SearchRequest.new msg = LogStash::Net::Messages::SearchRequest.new
msg.log_type = ARGV[0] msg.log_type = ARGV[0]
msg.query = ARGV[1] msg.query = ARGV[1]
$me.sendmsg("/queue/logstash", msg)
msg = LogStash::Net::Messages::SearchRequest.new $me.run
msg.log_type = ARGV[0]
msg.query = ARGV[1]
$me.sendmsg(msg)
$me.close()
while !$done
$me.sendrecv(10)
end