- Add new indexing that uses TokyoCabinet table db for storage.

This commit is contained in:
Jordan Sissel 2010-04-14 07:44:15 +00:00
parent cbfa08081d
commit 5c9922be05
5 changed files with 197 additions and 5 deletions

97
lib/db/index.rb Normal file
View file

@ -0,0 +1,97 @@
require 'rubygems' if __FILE__ == $0
require 'tokyocabinet'
require 'ap'
module LogStash; module DB;
class Index
def initialize(path)
@tdb = TokyoCabinet::TDB::new
@path = path
open_db
end # def initialize
private
def open_db
ret = @tdb.open(@path, TokyoCabinet::TDB::OWRITER \
| TokyoCabinet::TDB::OCREAT | TokyoCabinet::TDB::ONOLCK)
@tdb.setindex("@DATE", TokyoCabinet::TDB::ITDECIMAL)
if !ret
ecode = @tdb.ecode
STDERR.puts("open error: #{@tdb.errmsg(ecode)}")
end
end
public
def index(data)
key = @tdb.genuid
ret = @tdb.put(key, data)
if !ret
ecode = @tdb.ecode
STDERR.puts("open error: #{@tdb.errmsg(ecode)}")
end
end
public
def close
@tdb.close
end
end # class Index
end; end # module LogStash::DB
class LogStash::DB::IndexReader
def initialize(path)
@tdb = TokyoCabinet::TDB::new
@path = path
open_db
end # def initialize
private
def open_db
ret = @tdb.open(@path, TokyoCabinet::TDB::OREADER | TokyoCabinet::TDB::ONOLCK)
if !ret
ecode = @tdb.ecode
STDERR.puts("open error: #{@tdb.errmsg(ecode)}")
end
end
public
def each
@tdb.iterinit
while ((key = @tdb.iternext()) != nil)
yield key, @tdb.get(key)
end
end
public
def search(conditions)
query = TokyoCabinet::TDBQRY.new(@tdb)
conditions.each do |key, value|
query.addcond(key, TokyoCabinet::TDBQRY::QCSTREQ, value)
end
results = query.search
results.each do |key|
data = @tdb.get(key)
yield key, data
end
end
end
if __FILE__ == $0
i = LogStash::DB::IndexReader.new(ARGV[0])
qargs = ARGV[1..-1]
#i.each do |key, val|
#ap [key, val]
#end
query = {}
qargs.each do |arg|
key, val = arg.split(":", 2)
query[key] = val
end
ap query
i.search(query) do |key, value|
ap [key, value["@DATE"], value["@LINE"]]
end
end

60
lib/db/indexreader.rb Normal file
View file

@ -0,0 +1,60 @@
require 'rubygems' if __FILE__ == $0
require 'tokyocabinet'
require 'ap'
module LogStash; module DB;
class IndexReader
def initialize(path)
@tdb = TokyoCabinet::TDB::new
@path = path
open_db
end # def initialize
private
def open_db
ret = @tdb.open(@path, TokyoCabinet::TDB::OREADER | TokyoCabinet::TDB::ONOLCK)
if !ret
ecode = @tdb.ecode
STDERR.puts("open error: #{@tdb.errmsg(ecode)}")
end
end
public
def each
@tdb.iterinit
while ((key = @tdb.iternext()) != nil)
yield key, @tdb.get(key)
end
end
public
def search(conditions)
query = TokyoCabinet::TDBQRY.new(@tdb)
conditions.each do |key, value|
#query.addcond(key, TokyoCabinet::TDBQRY::QCSTREQ, value)
query.addcond(key, TokyoCabinet::TDBQRY::QCSTRINC, value)
end
query.setorder("@DATE", TDBQRY::QONUMASC);
results = query.search
results.each do |key|
data = @tdb.get(key)
yield key, data
end
end
end # class LogStash::DB::IndexReader
end; end # module LogStash::DB
if __FILE__ == $0
i = LogStash::DB::IndexReader.new(ARGV[0])
qargs = ARGV[1..-1]
query = {}
qargs.each do |arg|
key, val = arg.split(":", 2)
query[key] = val
end
ap query
i.search(query) do |key, value|
ap [key, value["@DATE"], value["@LINE"]]
end
end

View file

@ -88,7 +88,7 @@ module LogStash
end
end
time ||= DateTime.now
res["@DATE"] = time.strftime("%s")
res["@DATE"] = time.strftime("%s").to_i
return res
end

View file

@ -1,6 +1,6 @@
require 'lib/config/agent'
require 'lib/db/index'
require 'lib/program'
require 'lib/file/tail'
require 'grok'
require 'set'
require 'ap'
@ -8,6 +8,7 @@ require 'socket' # for Socket.gethostname
require 'eventmachine'
require 'eventmachine-tail'
AMOUNT = 500
class GrokReader < EventMachine::FileTail
def initialize(path, agent)
super(path)
@ -30,9 +31,13 @@ module LogStash; module Programs;
super(options)
@config = LogStash::Config::AgentConfig.new(options[:config])
@config.merge!(options)
@indexes = Hash.new { |h,k| h[k] = @config.logs[k].get_index }
#@indexes = Hash.new { |h,k| h[k] = @config.logs[k].get_index }
@index = LogStash::DB::Index.new(@config.logstash_dir + "/index.tct")
@hostname = Socket.gethostname
@needs_flushing = Set.new
@count = 0
@start = Time.now
end
public
@ -50,26 +55,55 @@ module LogStash; module Programs;
public
def process(path, line)
matched = false
@config.logs.each do |name, log|
begin
entry = log.parse_entry(line)
if entry
entry["@SOURCE_FILE"] = path
entry["@SOURCE_HOST"] = @hostname
puts "match #{name} in #{path}: #{line}"
matched = true
#ap entry
index(name, entry)
break
end
rescue LogStash::Log::LogParseError => e
# ignore
end
end # @logs.each
end # @config.logs.each
if !matched
puts "nomatch in #{path}: #{line}"
end
end # def process
private
def index(name, entry)
logstash_index(name, entry)
end
def logstash_index(name, entry)
@index.index(entry)
@count += 1
if @count % AMOUNT == 0
#flush_indexes
#puts "match #{name} in #{path}: #{line}"
puts "count: #{@count} #{AMOUNT / (Time.now - @start)}"
@start = Time.now
end
end
def ferret_index(name, entry)
@indexes[name] << entry
@needs_flushing << name
@count += 1
if @count % AMOUNT == 0
#flush_indexes
#puts "match #{name} in #{path}: #{line}"
puts "count: #{@count} #{AMOUNT / (Time.now - @start)}"
@start = Time.now
end
end
private

View file

@ -2,6 +2,7 @@
watch:
- /var/log/messages
- /var/log/*.log
- /b/logs/*
logstash_dir: /c/logstash
pattern_dir: /c/logstash/patterns