From 96e4881de223bec943ca36be8b7862952e00b21c Mon Sep 17 00:00:00 2001 From: Jordan Sissel Date: Mon, 17 Aug 2009 08:47:05 +0000 Subject: [PATCH] - Fix Log#index_dir - SYNCDELAY for indexing to 10 seconds, for debugging/testing. - Batch search results to 10 per message block --- bin/agent.rb | 18 +++++++++++++----- lib/log.rb | 2 +- lib/net/messagereader.rb | 3 +-- lib/net/messagestream.rb | 3 --- lib/net/servers/indexer.rb | 18 +++++++++++------- 5 files changed, 26 insertions(+), 18 deletions(-) diff --git a/bin/agent.rb b/bin/agent.rb index 53e529cc5..657238912 100755 --- a/bin/agent.rb +++ b/bin/agent.rb @@ -21,22 +21,30 @@ class Agent < LogStash::Net::MessageClient Thread.new do File::Tail::Since.new("/var/log/messages").tail do |line| line.chomp! - puts "Found line: #{line}" - index(line) + index("linux-syslog", line) + end + end + + Thread.new do + File::Tail::Since.new("/b/access").tail do |line| + line.chomp! + index("httpd-access", line) end end end # def start_log_watcher - def index(string) + def index(type, string) ier = LogStash::Net::Messages::IndexEventRequest.new - ier.log_type = "linux-syslog" + ier.log_type = type ier.log_data = string ier.metadata["source_host"] = @hostname sent = false while !sent begin - puts "Trying to send: #{ier.inspect}" + $stdout.write(".") + $stdout.flush + #puts "Trying to send: #{ier.inspect}" sendmsg(ier) sent = true rescue LogStash::Net::NoSocket diff --git a/lib/log.rb b/lib/log.rb index 4246fc751..1df7eb701 100644 --- a/lib/log.rb +++ b/lib/log.rb @@ -43,7 +43,7 @@ module LogStash end def index_dir - return "#{@home}/var/indexes/#{@attrs["log:name"]}" + return "#{@home}/var/indexes/#{@attrs["log:type"]}" end def create_index diff --git a/lib/net/messagereader.rb b/lib/net/messagereader.rb index 5465543f2..9f2b6c08c 100644 --- a/lib/net/messagereader.rb +++ b/lib/net/messagereader.rb @@ -12,7 +12,7 @@ module LogStash; module Net; @expected_checksum = checksum @data = data super("Corrupt message read. Expected checksum #{checksum}, got " + - "#{data.checksum} / #{data.inspect}") + "#{data.checksum}") end # def initialize end # class MessageReaderCorruptMessage @@ -87,7 +87,6 @@ module LogStash; module Net; if have >= need length, checksum, data = @buffer.unpack("NNA#{need - HEADERSIZE}") if data.checksum != checksum - puts @buffer.inspect[0..200] raise MessageReaderCorruptMessage.new(checksum, data) end return true diff --git a/lib/net/messagestream.rb b/lib/net/messagestream.rb index a6faddda4..0655c2f28 100644 --- a/lib/net/messagestream.rb +++ b/lib/net/messagestream.rb @@ -31,9 +31,6 @@ module LogStash; module Net data = self.encode puts "Writing #{data.length} bytes to #{sock}" bytestream = [data.length, data.checksum, data].pack("NNA*") - if bytestream.length < 100 - puts bytestream.inspect - end sock.write(bytestream) self.clear end # def sendto diff --git a/lib/net/servers/indexer.rb b/lib/net/servers/indexer.rb index d53688ac9..8723fa889 100644 --- a/lib/net/servers/indexer.rb +++ b/lib/net/servers/indexer.rb @@ -13,7 +13,7 @@ require 'config' module LogStash; module Net; module Servers class Indexer < LogStash::Net::MessageServer - SYNCDELAY = 60 + SYNCDELAY = 10 def initialize(addr="0.0.0.0", port=3001) # 'super' is not the same as 'super()', and we want super(). @@ -26,7 +26,6 @@ module LogStash; module Net; module Servers def IndexEventRequestHandler(request) response = LogStash::Net::Messages::IndexEventResponse.new response.id = request.id - puts request.inspect log_type = request.log_type entry = $logs[log_type].parse_entry(request.log_data) @@ -63,17 +62,22 @@ module LogStash; module Net; module Servers :tokenized_fields => reader.tokenized_fields, :or_default => false) query = qp.parse(request.query) + results = [] search.search_each(query, :limit => :all, :sort => "@DATE") do |docid, score| result = reader[docid][:@LINE] - response = LogStash::Net::Messages::SearchResponse.new - response.id = request.id - response.results = [result] - yield response + results << result + if results.length > 10 + response = LogStash::Net::Messages::SearchResponse.new + response.id = request.id + response.results = results + yield response + results = [] + end end response = LogStash::Net::Messages::SearchResponse.new response.id = request.id - response.results = [] + response.results = results response.finished = true yield response end