- ruby-prof showed that hashbind was taking the most time, so convert away from it.

- make it easy to try using ruby  marshal instead of json for wire format, for comparing speed.
- Set Thread::abort_on_exception so when threads die due to simple syntax or
  name errors they don't die silently.
- Add basic search client
This commit is contained in:
Jordan Sissel 2009-09-08 06:05:09 +00:00
parent f55513ba12
commit d6cf0f34fb
8 changed files with 160 additions and 110 deletions

View file

@ -17,30 +17,25 @@ class Agent < LogStash::Net::MessageClient
def start_log_watcher
#@t1 = Thread.new do
#File::Tail::Since.new("/var/log/messages").tail do |line|
#File::Tail::Since.new("/b/logs/auth.log.scorn").tail do |line|
#line.chomp!
#index("linux-syslog", line)
#end
##end
#end
@t2 = Thread.new do
#File::Tail::Since.new("/b/access.10").tail do |line|
begin
File::Tail::Since.new("/b/access.10k").tail do |line|
count = 0
File.open("/b/access.1k").readlines.each do |line|
line.chomp!
index("httpd-access", line)
count += 1
#break if count >= 3
line.chomp!
count += 1
if count % 1000 == 0
#sleep 1
puts count
end
rescue => e
$stderr.puts e.inspect
$stderr.puts caller.join("\n")
raise e
index("httpd-access", line)
#break if count >= 1
end
#close
end
@t2.join
end # def start_log_watcher
def index(type, string)
@ -49,11 +44,13 @@ class Agent < LogStash::Net::MessageClient
ier.log_data = string
ier.metadata["source_host"] = @hostname
puts "Sending: #{ier}"
#puts "Sending: #{ier}"
sendmsg("/queue/logstash", ier)
end # def index
def IndexEventResponseHandler(msg)
return if msg.code == 0
puts msg.inspect
end # def IndexEventResponseHandler
def run
@ -68,6 +65,7 @@ if $0 == __FILE__
puts "Usage: #{$0} host:port"
exit 1
end
Thread::abort_on_exception = true
host, port = ARGV[0].split(":")
agent = Agent.new(host, port)
agent.run

View file

@ -4,6 +4,7 @@
require "rubygems"
require "lib/net/servers/indexer"
Thread::abort_on_exception = true
s = LogStash::Net::Servers::Indexer.new(host="snack.home")
s.run

48
bin/search.rb Executable file → Normal file
View file

@ -1,18 +1,40 @@
#!/usr/bin/ruby
#
require 'rubygems'
require 'ferret'
require 'json'
require "socket"
require "lib/net/message"
require "lib/net/client"
require "lib/net/messages/indexevent"
require "lib/net/messages/search"
require "lib/net/messages/ping"
require "set"
include Ferret
include Ferret::Search
$done = false
$lastid = nil
$count = 0
$time = 0
$start = Time.now.to_f
reader = Index::IndexReader.new(ARGV[0])
search = Searcher.new(reader)
qp = QueryParser.new(:fields => reader.fields,
:tokenized_fields => reader.tokenized_fields,
:or_default => false)
query = qp.parse(ARGV[1])
search.search_each(query, :limit => :all, :sort => "@DATE") do |id, score|
puts "#{reader[id][:@SOURCE_HOST]} | #{reader[id][:@LOG_NAME]} | #{reader[id][:@LINE]}"
class Client < LogStash::Net::MessageClient
def SearchResponseHandler(msg)
msg.results.each do |result|
puts result
end
if msg.finished
close
end
end
end
def main(args)
client = Client.new(host="localhost", port=61613)
msg = LogStash::Net::Messages::SearchRequest.new
msg.log_type = args[0]
msg.query = args[1]
client.sendmsg("/queue/logstash", msg)
client.run
end
if $0 == __FILE__
exit main(ARGV)
end

View file

@ -1,5 +1,6 @@
require "json"
#require "lib/net/messagestream"
# vim macro to replace 'hashbind :foo, "bar"' with two methods.
# yypkct:def lxf,s return @data[A]oend def Jdt:xf,s(val) return @data[A] = val end
module BindToHash
def hashbind(method, key)
@ -40,7 +41,13 @@ module LogStash; module Net
@@translators = Array.new
# Message attributes
hashbind :id, "/id"
def id
return @data["id"]
end
def id=(val)
return @data["id"] = val
end
# All message subclasses should register themselves here
# This will allow Message.new_from_data to automatically return
@ -78,6 +85,11 @@ module LogStash; module Net
class RequestMessage < Message
@@idseq = 0
def initialize
super
generate_id!
end
Message.translators << self
def self.can_process?(data)
return data.has_key?("request")
@ -94,8 +106,21 @@ module LogStash; module Net
end
# Message attributes
hashbind :name, "/request"
hashbind :args, "/args"
def name
return @data["request"]
end
def name=(val)
return @data["request"] = val
end
def args
return @data["args"]
end
def args=(val)
return @data["args"] = val
end
end # class RequestMessage
class ResponseMessage < RequestMessage
@ -105,7 +130,13 @@ module LogStash; module Net
end
# Message attributes
hashbind :name, "/response"
def name
return @data["response"]
end
def name=(val)
return @data["response"] = val
end
# Report the success of the request this response is for.
# Should be implemented by subclasses

View file

@ -15,9 +15,28 @@ module LogStash; module Net; module Messages
end
# Message attributes
hashbind :log_type, "/args/type"
hashbind :log_data, "/args/message"
hashbind :metadata, "/args/metadata"
def log_type
return @data["args"]["type"]
end
def log_type=(val)
return @data["args"]["type"] = val
end
def log_data
return @data["args"]["message"]
end
def log_data=(val)
return @data["args"]["message"] = val
end
def metadata
return @data["args"]["metadata"]
end
def metadata=(val)
return @data["args"]["metadata"] = val
end
end # class IndexEventRequest
class IndexEventResponse < ResponseMessage

View file

@ -9,10 +9,9 @@ require 'ferret'
require 'lib/log/text'
require 'config'
module LogStash; module Net; module Servers
class Indexer < LogStash::Net::MessageServer
SYNCDELAY = 10
SYNCDELAY = 3
def initialize(*args)
# 'super' is not the same as 'super()', and we want super().
@ -23,11 +22,6 @@ module LogStash; module Net; module Servers
@starttime = Time.now
end
def run
subscribe("logstash")
super
end
def IndexEventRequestHandler(request)
response = LogStash::Net::Messages::IndexEventResponse.new
response.id = request.id
@ -43,16 +37,24 @@ module LogStash; module Net; module Servers
if !entry
response.code = 1
response.error = "Entry was #{entry.inspect} (log parsing failed)"
entry = {
"@NEEDSPARSING" => 1,
"@LINE" => request.log_data
}
else
response.code = 0
if not @indexes.member?(log_type)
@indexes[log_type] = $logs[log_type].get_index
end
entry["@LOG_TYPE"] = log_type
@indexes[log_type] << entry
end
yield response
if not @indexes.member?(log_type)
@indexes[log_type] = $logs[log_type].get_index
end
entry["@LOG_TYPE"] = log_type
@indexes[log_type] << entry
if response.code != 0
yield response
end
end
def PingRequestHandler(request)
@ -103,6 +105,7 @@ module LogStash; module Net; module Servers
response = LogStash::Net::Messages::SearchResponse.new
response.id = request.id
response.results = results
response.finished = false
yield response
results = []
offset += limit
@ -115,29 +118,25 @@ module LogStash; module Net; module Servers
end
# Special 'run' override because we want sync to disk once per minute.
def _run
synctime = Time.now + SYNCDELAY
sleeptime = 1
loop do
active = sendrecv(sleeptime)
if !active
sleeptime *= 2
if sleeptime > SYNCDELAY
sleeptime = SYNCDELAY
end
puts "No activity, sleeping for #{sleeptime}"
end
def run
subscribe("logstash")
@syncer = Thread.new { syncer }
super
end # def run
def syncer
synctime = Time.now + SYNCDELAY
loop do
if Time.now > synctime
@indexes.each do |log_type,index|
puts "Time's up. Syncing #{log_type}"
index.commit
end
synctime = Time.now + 60
synctime = Time.now + SYNCDELAY
end
sleep(synctime - Time.now)
end
end # def run
end # def syncer
end # Indexer
end; end; end # LogStash::Net::Server

View file

@ -5,11 +5,14 @@ require 'lib/net/messagepacket'
require 'uuid'
require 'stomp'
USE_MARSHAL = false
module LogStash; module Net
# The MessageClient class exists only as an alias
# to the MessageSocketMux. You should use the
# client class if you are implementing a client.
class MessageSocket
MAXBUF = 30
def initialize(username='', password='', host='localhost', port=61613)
@stomp = Stomp::Client.new(login=username, passcode=password,
@ -34,15 +37,22 @@ module LogStash; module Net
end # def subscribe
def handle_message(stompmsg)
obj = JSON::load(stompmsg.body)
if !obj.is_a?(Array)
obj = [obj]
if USE_MARSHAL
obj = Marshal.load(stompmsg.body)
else
obj = JSON::load(stompmsg.body)
if !obj.is_a?(Array)
obj = [obj]
end
end
#puts "Got #{obj.length} items"
obj.each do |item|
#puts item.inspect
message = Message.new_from_data(item)
if USE_MARSHAL
message = item
else
message = Message.new_from_data(item)
end
name = message.class.name.split(":")[-1]
func = "#{name}Handler"
#puts stompmsg
@ -83,7 +93,11 @@ module LogStash; module Net
msgs = @outbuffer[destination]
return if msgs.length == 0
data = msgs.to_json
if USE_MARSHAL
data = Marshal.dump(msgs)
else
data = msgs.to_json
end
options = {
"persistent" => true,
"reply-to" => "/queue/#{@id}",
@ -94,10 +108,13 @@ module LogStash; module Net
end
def sendmsg(destination, msg)
if msg.is_a?(RequestMessage)
msg.generate_id!
end
#puts "Sending to #{destination}: #{msg}"
@outbuffer[destination] << msg
if @outbuffer[destination].length > 10
if @outbuffer[destination].length > MAXBUF
flushout(destination)
end
end

View file

@ -1,37 +0,0 @@
#!/usr/bin/ruby
#
require 'rubygems'
require "socket"
require "lib/net/message"
require "lib/net/client"
require "lib/net/messages/indexevent"
require "lib/net/messages/search"
require "lib/net/messages/ping"
require "set"
$done = false
$lastid = nil
$count = 0
$time = 0
$start = Time.now.to_f
class Client < LogStash::Net::MessageClient
def SearchResponseHandler(msg)
#puts "Response (have #{$count} / want: #{$ids.length} acks); #{msg.inspect}"
msg.results.each do |result|
puts result
end
if msg.finished
close
end
end
end
$me = Client.new(host="localhost", port=61613)
msg = LogStash::Net::Messages::SearchRequest.new
msg.log_type = ARGV[0]
msg.query = ARGV[1]
$me.sendmsg("/queue/logstash", msg)
$me.run