diff --git a/bin/agent.rb b/bin/agent.rb index 2ea351a59..53e529cc5 100755 --- a/bin/agent.rb +++ b/bin/agent.rb @@ -3,6 +3,7 @@ require 'rubygems' require 'lib/net/client' require 'lib/net/messages/indexevent' +require 'lib/file/tail/since' require 'socket' @@ -18,12 +19,10 @@ class Agent < LogStash::Net::MessageClient def start_log_watcher Thread.new do - IO.popen("tail -0f /var/log/messages", "r") do |fd| - fd.each do |line| - puts "Found line: #{line}" - line.chomp! - index(line) - end + File::Tail::Since.new("/var/log/messages").tail do |line| + line.chomp! + puts "Found line: #{line}" + index(line) end end end # def start_log_watcher @@ -55,7 +54,6 @@ class Agent < LogStash::Net::MessageClient def run loop do - puts @done done = false while !done begin diff --git a/lib/log.rb b/lib/log.rb index a2d3bb656..4246fc751 100644 --- a/lib/log.rb +++ b/lib/log.rb @@ -43,13 +43,13 @@ module LogStash end def index_dir - return "#{home}/var/indexes/#{@attrs["log:name"]}" + return "#{@home}/var/indexes/#{@attrs["log:name"]}" end def create_index return if File.exists?(index_dir) - field_info = Ferret::Index::FieldInfos.new(:store => :no, + field_infos = Ferret::Index::FieldInfos.new(:store => :no, :term_vector => :no) field_infos.add_field(:@LINE, :store => :compressed, diff --git a/lib/log/text.rb b/lib/log/text.rb index 3d211ea85..b540ffb38 100644 --- a/lib/log/text.rb +++ b/lib/log/text.rb @@ -22,7 +22,7 @@ module LogStash end @grok_patterns = config.delete(:grok_patterns) - @home = ENV["LOGSTASH_HOME"] || "/opt/logstash" + @home = ENV["LOGSTASH_DIR"] || "/opt/logstash" if not File.exists?("#{@home}/patterns") throw StandardError.new("#{@home}/patterns/ does not exist") diff --git a/lib/net/messages/search.rb b/lib/net/messages/search.rb index 334e30b95..1e4e59ab5 100644 --- a/lib/net/messages/search.rb +++ b/lib/net/messages/search.rb @@ -30,5 +30,6 @@ module LogStash; module Net; module Messages # Message attributes hashbind :results, "/args/results" + hashbind :finished, "/args/finished" end # class SearchResponse end; end; end # module LogStash::Net::Messages diff --git a/lib/net/servers/indexer.rb b/lib/net/servers/indexer.rb index be3719033..d53688ac9 100644 --- a/lib/net/servers/indexer.rb +++ b/lib/net/servers/indexer.rb @@ -36,7 +36,7 @@ module LogStash; module Net; module Servers else response.code = 0 if not @indexes.member?(log_type) - @indexes[log_type] = @logs[log_type].get_index + @indexes[log_type] = $logs[log_type].get_index end entry["@LOG_TYPE"] = log_type @@ -53,19 +53,16 @@ module LogStash; module Net; module Servers end def SearchRequestHandler(request) - response = LogStash::Net::Messages::SearchResponse.new - response.id = request.id puts "Search for #{request.query.inspect}" reader = Ferret::Index::IndexReader.new($logs[request.log_type].index_dir) search = Ferret::Search::Searcher.new(reader) - #puts reader.fields.join("\n") + puts reader.fields.join("\n") qp = Ferret::QueryParser.new(:fields => reader.fields, :tokenized_fields => reader.tokenized_fields, :or_default => false) query = qp.parse(request.query) - response.results = [] search.search_each(query, :limit => :all, :sort => "@DATE") do |docid, score| result = reader[docid][:@LINE] @@ -74,6 +71,11 @@ module LogStash; module Net; module Servers response.results = [result] yield response end + response = LogStash::Net::Messages::SearchResponse.new + response.id = request.id + response.results = [] + response.finished = true + yield response end # Special 'run' override because we want sync to disk once per minute. diff --git a/sandbox/searchclient.rb b/sandbox/searchclient.rb index 87fbbc7e3..13c4ae97f 100644 --- a/sandbox/searchclient.rb +++ b/sandbox/searchclient.rb @@ -31,6 +31,9 @@ class Client < LogStash::Net::MessageSocketMux msg.results.each do |result| puts result end + if msg.finished + $done = true + end #gotresponse(msg) end end @@ -47,7 +50,8 @@ msg = LogStash::Net::Messages::SearchRequest.new msg.log_type = ARGV[0] msg.query = ARGV[1] $me.sendmsg(msg) -$done = true $me.close() -$done = true -$me.run + +while !$done + $me.sendrecv(10) +end