From 434a633013b838ab2aa3964abf8786ddb860f3ee Mon Sep 17 00:00:00 2001 From: Jordan Sissel Date: Thu, 10 Sep 2009 09:59:24 +0000 Subject: [PATCH] - hashbind is back! Dynamically generate methods that we would normally hand-code. This method is as fast as the hand-coded method and with less writing. - Make message registration easier with a simple 'register' function: class Foo < Message register end We dynamically generate the message 'type' by the class name. --- INSTALL | 5 +- bin/agent.rb | 41 +++++++------- bin/logstashd.rb | 24 +++++++-- lib/net/common.rb | 7 ++- lib/net/message.rb | 97 +++++++++++++++------------------- lib/net/messages/indexevent.rb | 42 ++------------- lib/net/messages/ping.rb | 17 ++---- lib/net/messages/search.rb | 21 ++------ lib/net/servers/indexer.rb | 8 ++- lib/net/socket.rb | 5 +- 10 files changed, 107 insertions(+), 160 deletions(-) diff --git a/INSTALL b/INSTALL index 6c11938cf..2ca98c9ca 100644 --- a/INSTALL +++ b/INSTALL @@ -3,15 +3,12 @@ required gems: - json - ferret - file-tail -- eventmachine - +- stomp This code is not beta, not alpha, but like something unnamed where only three people in the world have gotten it to run. YMMV. Expect much debuggery. - - # Prereqs: These instructions assume you are on a relatively recent linux system, that you have a working (C) build environment, diff --git a/bin/agent.rb b/bin/agent.rb index 47e596c73..102ba4103 100755 --- a/bin/agent.rb +++ b/bin/agent.rb @@ -1,8 +1,9 @@ - #!/usr/bin/env ruby +#!/usr/bin/env ruby require 'rubygems' require 'lib/net/client' require 'lib/net/messages/indexevent' +require 'lib/net/messages/quit' require 'lib/file/tail/since' require 'stomp' require 'socket' @@ -16,26 +17,25 @@ class Agent < LogStash::Net::MessageClient end # def initialize def start_log_watcher - @t1 = Thread.new do - File::Tail::Since.new("/b/logs/auth.log.scorn").tail do |line| - line.chomp! - index("linux-syslog", line) - end - end - - #@t2 = Thread.new do - #File::Tail::Since.new("/b/access.10k").tail do |line| - #count = 0 + #@t1 = Thread.new do + #File::Tail::Since.new("/b/logs/auth.log.scorn").tail do |line| #line.chomp! - #count += 1 - ##if count % 1000 == 0 - ##sleep 1 - #puts count - #end - #index("httpd-access", line) - ##break if count >= 1 + #index("linux-syslog", line) #end #end + + @t2 = Thread.new do + count = 0 + #File::Tail::Since.new("/b/access.100").tail do |line| + File.open("/b/access.10k").each do |line| + line.chomp! + count += 1 + index("httpd-access", line) + puts count + break if count >= 10 + end + sendmsg("/queue/logstash", LogStash::Net::Messages::QuitRequest.new) + end end # def start_log_watcher def index(type, string) @@ -49,8 +49,9 @@ class Agent < LogStash::Net::MessageClient end # def index def IndexEventResponseHandler(msg) - return if msg.code == 0 - puts msg.inspect + if msg.code != 0 + puts msg.inspect + end end # def IndexEventResponseHandler def run diff --git a/bin/logstashd.rb b/bin/logstashd.rb index 8fa4c3987..38c59ec60 100644 --- a/bin/logstashd.rb +++ b/bin/logstashd.rb @@ -1,10 +1,26 @@ #!/usr/bin/env ruby -# -require "rubygems" -require "lib/net/servers/indexer" +require 'rubygems' +require 'lib/net/servers/indexer' +if ENV.has_key?("PROFILE") + require 'ruby-prof' + RubyProf.start + + #class String + #alias_method :orig_scan, :scan + #def scan(*args) + ##raise + #return orig_scan(*args) + #end + #end +end Thread::abort_on_exception = true -s = LogStash::Net::Servers::Indexer.new(host="snack.home") +s = LogStash::Net::Servers::Indexer.new(username='', password='', host="localhost") s.run +if ENV.has_key?("PROFILE") + result = RubyProf.stop + printer = RubyProf::FlatPrinter.new(result) + printer.print(STDOUT, 0) +end diff --git a/lib/net/common.rb b/lib/net/common.rb index 48a6a4d64..f519afac5 100644 --- a/lib/net/common.rb +++ b/lib/net/common.rb @@ -1,4 +1,5 @@ require 'zlib' +require 'rubygems' require 'eventmachine' module LogStash; module Net; @@ -39,9 +40,7 @@ class String end end end - - end - + end # def strip_upper_ascii end # class String # EventMachine uses ruby1.8 (not in 1.9) function Thread#kill!, @@ -49,7 +48,7 @@ end # class String class Thread def kill!(*args) kill - end + end # def kill! end if ENV.has_key?("USE_EPOLL") diff --git a/lib/net/message.rb b/lib/net/message.rb index 1d1026f27..c8cbb7ed4 100644 --- a/lib/net/message.rb +++ b/lib/net/message.rb @@ -1,32 +1,24 @@ -require "json" +require 'json' +require 'set' # vim macro to replace 'hashbind :foo, "bar"' with two methods. # yypkct:def lxf,s return @data[A]oend def Jdt:xf,s(val) return @data[A] = val end module BindToHash def hashbind(method, key) - self.class_eval do - define_method(method) { BindToHash.hashpath_get(@data, key) } - define_method("#{method}=") { |v| BindToHash.hashpath_set(@data, key, v) } - end + hashpath = BindToHash.genhashpath(key) + self.class_eval %( + def #{method} + return #{hashpath} + end + def #{method}=(val) + #{hashpath} = val + end + ) end - def self.hashpath_get(data, key) - elements = key.split("/") - elements[0..-2].each do |k| - next if k == "" - data = data[k] - end - return data[elements[-1]] - end - - def self.hashpath_set(data, key, value) - elements = key.split("/") - elements[0..-2].each do |k| - next if k == "" - data = data[k] - end - - data[elements[-1]] = value + def self.genhashpath(key) + path = key.split("/").select { |x| x.length > 0 }.map { |x| "[#{x.inspect}]" } + return "@data#{path.join("")}" end end # modules BindToHash @@ -37,8 +29,8 @@ module LogStash; module Net extend BindToHash attr_accessor :data - # Message ID sequence number - @@translators = Array.new + # list of class instances that can identify messages + @@translators = Hash.new # Message attributes def id @@ -56,18 +48,29 @@ module LogStash; module Net return @@translators end + def self.register + name = self.name.split(":")[-1] + self.class_eval %( + def _name + return "#{name}" + end + ) + puts "Register #{name}" + @@translators[name] = self + end + def initialize @data = Hash.new end def self.new_from_data(data) obj = nil - @@translators.each do |klass| - if klass.can_process?(data) - obj = klass.new - end - end - if !obj + #@@translators.each do |translator| + name = data["type"] + if @@translators.has_key?(name) + obj = @@translators[name].new + else + $stderr.puts "No translator found for #{name} / #{data.inspect}" obj = Message.new end obj.data = data @@ -85,19 +88,11 @@ module LogStash; module Net class RequestMessage < Message @@idseq = 0 - def initialize - super - generate_id! - end - - Message.translators << self - def self.can_process?(data) - return data.has_key?("request") - end - def initialize super self.args = Hash.new + self.name = self._name + generate_id! end def generate_id! @@ -107,36 +102,28 @@ module LogStash; module Net # Message attributes def name - return @data["request"] + return @data["type"] end def name=(val) - return @data["request"] = val + return @data["type"] = val end def args + #puts "args: #{@data["args"].inspect}" return @data["args"] end def args=(val) + #if val == nil + #puts caller.join("\n") + #end return @data["args"] = val end end # class RequestMessage class ResponseMessage < RequestMessage - Message.translators << self - def self.can_process?(data) - return data.has_key?("response") - end - - # Message attributes - def name - return @data["response"] - end - - def name=(val) - return @data["response"] = val - end + #Message.translators << self # Report the success of the request this response is for. # Should be implemented by subclasses diff --git a/lib/net/messages/indexevent.rb b/lib/net/messages/indexevent.rb index 1ec02ea75..cb4b4b135 100644 --- a/lib/net/messages/indexevent.rb +++ b/lib/net/messages/indexevent.rb @@ -3,52 +3,20 @@ require "lib/net/message" module LogStash; module Net; module Messages class IndexEventRequest < RequestMessage - Message.translators << self - def self.can_process?(data) - return (super(data) and data["request"] == "IndexEvent") - end + register def initialize super - self.name = "IndexEvent" self.metadata = Hash.new end - # Message attributes - def log_type - return @data["args"]["type"] - end - - def log_type=(val) - return @data["args"]["type"] = val - end - def log_data - return @data["args"]["message"] - end - - def log_data=(val) - return @data["args"]["message"] = val - end - - def metadata - return @data["args"]["metadata"] - end - - def metadata=(val) - return @data["args"]["metadata"] = val - end + hashbind :log_type, "/args/type" + hashbind :log_data, "/args/message" + hashbind :metadata, "/args/metadata" end # class IndexEventRequest class IndexEventResponse < ResponseMessage - Message.translators << self - def self.can_process?(data) - return (super(data) and data["response"] == "IndexEvent") - end - - def initialize - super - self.name = "IndexEvent" - end + register # Message attributes hashbind :code, "/args/code" diff --git a/lib/net/messages/ping.rb b/lib/net/messages/ping.rb index 708a3283c..ec91e1538 100644 --- a/lib/net/messages/ping.rb +++ b/lib/net/messages/ping.rb @@ -3,31 +3,20 @@ require "lib/net/message" module LogStash; module Net; module Messages class PingRequest < RequestMessage - Message.translators << self - def self.can_process?(data) - return (super(data) and data["request"] == "Ping") - end + register def initialize super - self.name = "Ping" self.pingdata = Time.now.to_f end # Message attributes hashbind :pingdata, "/args/pingdata" + end # class PingRequest class PingResponse < ResponseMessage - Message.translators << self - def self.can_process?(data) - return (super(data) and data["response"] == "Ping") - end - - def initialize - super - self.name = "Ping" - end + register # Message attributes hashbind :pingdata, "/args/pingdata" diff --git a/lib/net/messages/search.rb b/lib/net/messages/search.rb index 5c4951c71..0a0fa89bd 100644 --- a/lib/net/messages/search.rb +++ b/lib/net/messages/search.rb @@ -2,15 +2,7 @@ require "lib/net/message" module LogStash; module Net; module Messages class SearchRequest < RequestMessage - Message.translators << self - def self.can_process?(data) - return (super(data) and data["request"] == "Search") - end - - def initialize - super - self.name = "Search" - end + register # Message attributes hashbind :query, "/args/query" @@ -20,18 +12,11 @@ module LogStash; module Net; module Messages end # class SearchRequest class SearchResponse < ResponseMessage - Message.translators << self - def self.can_process?(data) - return (super(data) and data["response"] == "Search") - end - - def initialize - super - self.name = "Search" - end + register # Message attributes hashbind :results, "/args/results" hashbind :finished, "/args/finished" + end # class SearchResponse end; end; end # module LogStash::Net::Messages diff --git a/lib/net/servers/indexer.rb b/lib/net/servers/indexer.rb index 5a337fc5b..72c49846f 100644 --- a/lib/net/servers/indexer.rb +++ b/lib/net/servers/indexer.rb @@ -5,6 +5,7 @@ require 'lib/net/message' require 'lib/net/messages/indexevent' require 'lib/net/messages/search' require 'lib/net/messages/searchhits' +require 'lib/net/messages/quit' require 'lib/net/messages/ping' require 'ferret' require 'lib/log/text' @@ -23,12 +24,17 @@ module LogStash; module Net; module Servers @starttime = Time.now end + def QuitRequestHandler(request) + puts "Got quit message, exiting..." + close + end + def IndexEventRequestHandler(request) response = LogStash::Net::Messages::IndexEventResponse.new response.id = request.id @indexcount += 1 - if @indexcount % 100 == 0 + if @indexcount % 10 == 0 duration = (Time.now.to_f - @starttime.to_f) puts "%.2f" % (@indexcount / duration) end diff --git a/lib/net/socket.rb b/lib/net/socket.rb index 28fd336f8..bed1f69a6 100644 --- a/lib/net/socket.rb +++ b/lib/net/socket.rb @@ -1,11 +1,10 @@ require 'rubygems' require 'lib/net/stats' require 'lib/net/messagepacket' -#require 'eventmachine' require 'uuid' require 'stomp' -USE_MARSHAL = false +USE_MARSHAL = ENV.has_key?("USE_MARSHAL") module LogStash; module Net # The MessageClient class exists only as an alias @@ -124,7 +123,7 @@ module LogStash; module Net end def handler=(handler) - puts "Setting handler to #{handler.class.name}" + #puts "Setting handler to #{handler.class.name}" @handler = handler end