Mirror of https://github.com/elastic/logstash.git, synced 2025-04-24 06:37:19 -04:00
- Purge post-refactor. The older stuff can be found in /tags/pre-agent-refactor
parent 791f15f633
commit 4684f65995
31 changed files with 3 additions and 1847 deletions
@@ -1,45 +0,0 @@
#!/usr/bin/ruby
#
require 'rubygems'
require 'socket'
require 'lib/net/messages/logkeys'
require 'mqrpc'

Thread::abort_on_exception = true

class LogKeysClient < MQRPC::Agent
  attr_reader :keys

  def initialize(opts)
    @log_type = opts[:log_type]
    @keys = []
    super(opts)
    start
  end

  def start
    msg = LogStash::Net::Messages::LogKeysRequest.new
    msg.log_type = @log_type
    sendmsg("logstash-index", msg)
    run
  end

  def LogKeysResponseHandler(msg)
    @keys = msg.keys
    close
  end
end

def main(args)
  client = LogKeysClient.new(:log_type => args[0], :host => "localhost")

  # Collate & print results.
  puts "Log keys for #{args[0]}:"
  client.keys.each { |t| puts " - #{t}" }

  return 0
end

if $0 == __FILE__
  exit main(ARGV)
end
@@ -1,43 +0,0 @@
#!/usr/bin/ruby
#
require 'rubygems'
require "socket"
require 'lib/net/messages/logtypes'
require 'mqrpc'

Thread::abort_on_exception = true

class LogTypesClient < MQRPC::Agent
  attr_reader :types

  def initialize(opts)
    @types = []
    super(opts)
    start
  end

  def start
    msg = LogStash::Net::Messages::LogTypesRequest.new
    sendmsg("logstash-index", msg)
    run
  end

  def LogTypesResponseHandler(msg)
    @types = msg.types
    close
  end
end

def main(args)
  client = LogTypesClient.new(:host => "localhost")

  # Collate & print results.
  puts "Log types:"
  client.types.each { |t| puts " - #{t}" }

  return 0
end

if $0 == __FILE__
  exit main(ARGV)
end
@@ -1,69 +0,0 @@
#!/usr/bin/env ruby

$: << File.join(File.dirname(__FILE__), "..")

require 'rubygems'
require 'lib/programs/agent'
require 'lib/program'
require 'logger'
require 'optparse'

$progname = File.basename($0)
$version = "0.3"
$logger = Logger.new(STDOUT)
$logger.level = Logger::INFO
$logger.progname = $progname
$logger.datetime_format = "%Y-%m-%d %H:%M:%S"

def main(args)
  options = parse_options(args)
  return LogStash::Programs::Agent.new(options).run
end # def main

def parse_options(args)
  options = {:daemonize => true,
             :logfile => nil,
             :pidfile => nil,
             :config => nil,
            }

  opts = OptionParser.new do |opts|
    opts.banner = "Usage: logstash-agent [options] configfile"
    opts.version = $version

    opts.on("-d", "--debug", "Enable debug output") do |x|
      $logger.level = Logger::DEBUG
    end

    opts.on("--pidfile FILE", "Path to pidfile") do |x|
      options[:pidfile] = x
    end

    opts.on("-f", "--foreground", "Do not daemonize") do |x|
      options[:daemonize] = false
    end

    opts.on("-l FILE", "--logfile FILE", "File path to put logs") do |x|
      options[:logfile] = x
    end
  end

  begin
    opts.order!(args)
  rescue
    $stderr.puts "#{$progname}: #{$!}"
    $stderr.puts opts
    exit(1)
  end

  if args.length != 1
    $stderr.puts "#{$progname}: must specify exactly one config file"
    $stderr.puts opts
    exit(1)
  end
  options[:config] = args.shift

  return options
end # def parse_options

exit main(ARGV)
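
For context, a hypothetical invocation of this script based on the options its parser accepts (all file paths here are invented):

  logstash-agent --pidfile /var/run/logstash-agent.pid \
                 --logfile /var/log/logstash-agent.log agent.yaml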
bin/logstashd (169 deletions)
@@ -1,169 +0,0 @@
#!/usr/bin/env ruby

$: << File.join(File.dirname(__FILE__), "..")

require 'rubygems'
require 'lib/net/servers/indexer'
require 'lib/net/servers/parser'
require 'logger'
require 'optparse'
require 'lib/program'

$progname = File.basename($0)
$version = "0.3"
$logger = Logger.new(STDOUT)
$logger.level = Logger::INFO
$logger.progname = $progname
$logger.datetime_format = "%Y-%m-%d %H:%M:%S"

class LogStash::Daemon < LogStash::Program
  def start_indexer
    pid = fork do
      $0 = "logstashd (indexer)"
      indexer = LogStash::Net::Servers::Indexer.new(@options[:config], $logger)
      indexer.run
      exit(0)
    end
    $logger.info "starting indexer (pid #{pid})"
    @children[pid] = :indexer
  end

  def start_parsers
    1.upto(@options[:parsers]) do |i|
      pid = fork do
        $0 = "logstashd (parser)"
        parser = LogStash::Net::Servers::Parser.new(@options[:config], $logger)
        parser.run
        exit(0)
      end
      $logger.info "starting parser #{i}/#{@options[:parsers]} (pid #{pid})"
      @children[pid] = :parser
    end
  end

  def run
    @children = {}
    super

    start_indexer if @options[:indexer]
    start_parsers if @options[:parsers] > 0

    $0 = "logstashd (supervisor)"

    termination_handler do
      @children.keys.each { |pid| Process.kill("TERM", pid) rescue nil }
    end

    # We do this lame loop instead of "Process.waitall" because of a bug
    # in ruby 1.8.5 related to handling SIGTERM.
    while @children.keys.length > 0
      # Ruby 1.8.5 has a bug with signal and syscall handlers.
      if RUBY_VERSION == "1.8.5"
        pid = Process.waitpid(-1, Process::WNOHANG)
      else
        pid = Process.waitpid(-1, 0)
      end

      if pid and !dying?
        $logger.fatal "pid #{pid} died unexpectedly (#{@children[pid]}), " \
                      "initiating shutdown"
        Process.kill("TERM", $$)
      end

      if RUBY_VERSION == "1.8.5"
        sleep(5)
      end
    end

    return 0
  end # def run
end # class LogStash::Daemon

def main(args)
  options = parse_options(args)
  logstashd = LogStash::Daemon.new(options)
  return logstashd.run
end

def parse_options(args)
  options = {:indexer => true,
             :parsers => 1,
             :parserset => false,
             :config => nil,
             :daemonize => true,
             :pidfile => nil,
            }

  opts = OptionParser.new do |opts|
    opts.banner = "Usage: logstashd [options] configfile"
    opts.version = $version

    opts.on("-d", "--debug", "Enable debug output") do |x|
      $logger.level = Logger::DEBUG
    end

    opts.on("-I", "--disable-indexer",
            "Disable indexer (default enabled)") do |x|
      options[:indexer] = false
    end

    opts.on("-p", "--parsers COUNT", Integer,
            "Number of parsers to run (default 1)") do |x|
      raise(ArgumentError, "parser count must be >= 0") if x < 0
      options[:parsers] = x
      if options[:parserset]
        $stderr.puts "can only specify -p N or -P once"
        exit(1)
      end
      options[:parserset] = true
    end

    opts.on("-P", "--disable-parser", "Disable parser") do |x|
      options[:parsers] = 0
      if options[:parserset]
        $stderr.puts "can only specify -p N or -P once"
        exit(1)
      end
      options[:parserset] = true
    end

    opts.on("--pidfile FILE", "Path to pidfile") do |x|
      options[:pidfile] = x
    end

    opts.on("-f", "--foreground", "Do not daemonize") do |x|
      options[:daemonize] = false
    end

    opts.on("-h", "--help", "Show this help message") do |x|
      puts opts
      exit(0)
    end

    opts.on("-l FILE", "--logfile FILE", "File path to put logs") do |x|
      options[:logfile] = x
    end
  end

  begin
    opts.order!(args)
  rescue
    $stderr.puts "#{$progname}: #{$!}"
    $stderr.puts opts
    exit(1)
  end

  if args.length != 1
    $stderr.puts "#{$progname}: must specify exactly one config file"
    $stderr.puts opts
    exit(1)
  end
  options[:config] = args.shift

  return options
end

exit main(ARGV)
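
A hypothetical invocation inferred from the option parser above (the config file name is invented): run the indexer plus four parsers in the foreground with debug output:

  logstashd -d -f -p 4 logstash.yaml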
@@ -1,34 +0,0 @@
#!/usr/bin/ruby
#
require "rubygems"
require "lib/net/clients/search"
require "lib/config/base"
require "lib/util"
require "set"

Thread::abort_on_exception = true

def main(args)
  if args.length != 3
    $stderr.puts "Usage: search configfile log_type query"
    return 1
  end
  stopwatch = LogStash::StopWatch.new
  client = LogStash::Net::Clients::Search.new(args[0])
  hits, results = client.search({
    :log_type => args[1],
    :query => args[2],
    :limit => 100,
  })

  # Collate & print results.
  puts "Duration: #{stopwatch.to_s(3)}"
  puts "Hits: #{hits}"
  puts ""
  puts results.sort_by { |r| r[0] }.collect { |r| r[1] }.join("\n")

  return 0
end

if $0 == __FILE__
  exit main(ARGV)
end
@@ -1,55 +0,0 @@
require 'lib/config/base'
require 'lib/logs'
require 'lib/log/json'
require 'lib/log/text'

module LogStash; module Config
  class AgentConfig < BaseConfig
    attr_reader :logs
    attr_reader :watched_paths
    attr_reader :logstash_dir
    attr_reader :pattern_dir

    def initialize(file)
      super(file)
      @logstash_dir = "/var/logstash"
      @pattern_dir = "/opt/logstash/patterns"
      @watched_paths = []
      @logs = LogStash::Logs.new

      data = YAML::load(::File.open(file).read())
      merge!(data)
    end

    def merge!(data)
      @pattern_dir = data["pattern_dir"] if data.has_key?("pattern_dir")
      @logstash_dir = data["logstash_dir"] if data.has_key?("logstash_dir")
      @watched_paths = data["watch"] if data.has_key?("watch")

      if data.has_key?("log-types")
        data["log-types"].each do |log_type, log_data|
          puts "Got log #{log_type}"
          log_config = {:type => log_type,
                        :date_key => log_data["date"]["key"],
                        :date_format => log_data["date"]["format"],
                        :logstash_dir => @logstash_dir,
                        :pattern_dir => @pattern_dir,
                        :elasticsearch_host => @elasticsearch_host,
                       }

          log = nil
          case log_data["type"]
          when "text"
            log_config[:grok_patterns] = log_data["patterns"]
            log = LogStash::Log::TextLog.new(log_config)
          when "json"
            log_config[:line_format] = log_data["display_format"]
            log = LogStash::Log::JsonLog.new(log_config)
          end

          @logs.register(log)
        end
      end
    end # def merge!
  end # class AgentConfig
end; end # module LogStash::Config
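
A sketch of a config file this class would accept, inferred from the keys merge! reads above; every value here is illustrative, not taken from the commit:

  logstash_dir: /var/logstash
  pattern_dir: /opt/logstash/patterns
  watch:
    - /var/log/messages
    - /var/log/apache2/*.log
  log-types:
    syslog:
      type: text
      patterns:
        - "%{SYSLOGLINE}"
      date:
        key: timestamp
        format: "%b %e %H:%M:%S"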
@@ -1,21 +0,0 @@
require 'rubygems'
require 'yaml'

module LogStash; module Config
  # Base config class. All configs need to know how to get to a broker.

  class BaseConfig
    attr_reader :elasticsearch_host
    def initialize(file)
      obj = YAML::load(::File.open(file).read())
      @elasticsearch_host = obj["elasticsearch_host"] || "localhost:9200"

      @mqhost = obj["mqhost"] || "localhost"
      @mqport = obj["mqport"] || 5672
      @mquser = obj["mquser"] || "guest"
      @mqpass = obj["mqpass"] || "guest"
      @mqvhost = obj["mqvhost"] || "/"
      @mqexchange = "logstash.topic"
    end # def initialize
  end # class BaseConfig
end; end # module LogStash::Config
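
A minimal broker config matching the defaults above (the values shown are just the documented fallbacks):

  elasticsearch_host: localhost:9200
  mqhost: localhost
  mqport: 5672
  mquser: guest
  mqpass: guest
  mqvhost: /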
@@ -1,42 +0,0 @@
require 'lib/config/base'
require 'lib/logs'
require 'lib/log/json'
require 'lib/log/text'

module LogStash; module Config
  class IndexerConfig < BaseConfig
    attr_reader :logs
    attr_reader :logstash_dir
    attr_reader :pattern_dir

    def initialize(file)
      super(file)
      obj = YAML::load(File.open(file).read())

      @logstash_dir = obj["logstash_dir"] || "/var/logstash"
      @pattern_dir = obj["pattern_dir"] || "/opt/logstash/patterns"
      @logs = LogStash::Logs.new

      obj["log-types"].each do |log_type, data|
        log = nil
        log_config = {:type => log_type,
                      :date_key => data["date"]["key"],
                      :date_format => data["date"]["format"],
                      :logstash_dir => @logstash_dir,
                      :pattern_dir => @pattern_dir,
                     }

        case data["type"]
        when "text"
          log_config[:grok_patterns] = data["patterns"]
          log = LogStash::Log::TextLog.new(log_config)
        when "json"
          log_config[:line_format] = data["display_format"]
          log = LogStash::Log::JsonLog.new(log_config)
        end

        @logs.register(log)
      end
    end # def initialize
  end # class IndexerConfig
end; end # module LogStash::Config
@@ -1,60 +0,0 @@
require 'rubygems' if __FILE__ == $0
require 'tokyocabinet'
require 'ap'

module LogStash; module DB;
  class Index
    def initialize(path)
      @tdb = TokyoCabinet::TDB::new
      @path = path
      open_db
    end # def initialize

    private
    def open_db
      ret = @tdb.open(@path, TokyoCabinet::TDB::OWRITER \
                      | TokyoCabinet::TDB::OCREAT | TokyoCabinet::TDB::ONOLCK)
      @tdb.setindex("@DATE", TokyoCabinet::TDB::ITDECIMAL)
      if !ret
        ecode = @tdb.ecode
        STDERR.puts("open error: #{@tdb.errmsg(ecode)}")
      end
    end

    public
    def index(data)
      key = @tdb.genuid
      ret = @tdb.put(key, data)
      if !ret
        ecode = @tdb.ecode
        STDERR.puts("put error: #{@tdb.errmsg(ecode)}")
      end
    end

    public
    def close
      @tdb.close
    end

    public
    def addindex(column, type)
      case type
      when "string"
        @tdb.setindex(column, TokyoCabinet::TDB::ITTOKEN)
        #@tdb.setindex(column, TokyoCabinet::TDB::ITLEXICAL)
      else
        STDERR.puts("Invalid index type: #{type}")
      end
    end
  end # class Index
end; end # module LogStash::DB

if __FILE__ == $0
  i = LogStash::DB::Index.new(ARGV[0])
  args = ARGV[1..-1]
  args.each do |arg|
    key, val = arg.split(":", 2)
    i.addindex(key, val)
  end
  i.close
end
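
A minimal usage sketch of this class, assuming the tokyocabinet gem is installed and this file is on the load path (the path and record values are invented); Tokyo Cabinet table records are string-to-string maps:

  require 'lib/db/index'

  idx = LogStash::DB::Index.new("/tmp/logs.tdb")
  idx.addindex("@LINE", "string")   # full-text token index on the line column
  idx.index({"@DATE" => Time.now.to_i.to_s, "@LINE" => "hello world"})
  idx.close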
@@ -1,64 +0,0 @@
require 'rubygems' if __FILE__ == $0
require 'tokyocabinet'
require 'ap'

module LogStash; module DB;
  class IndexReader
    def initialize(path)
      @tdb = TokyoCabinet::TDB::new
      @path = path
      open_db
    end # def initialize

    private
    def open_db
      ret = @tdb.open(@path, TokyoCabinet::TDB::OREADER | TokyoCabinet::TDB::ONOLCK)
      if !ret
        ecode = @tdb.ecode
        STDERR.puts("open error: #{@tdb.errmsg(ecode)}")
      end
    end

    public
    def each
      @tdb.iterinit
      while ((key = @tdb.iternext()) != nil)
        yield key, @tdb.get(key)
      end
    end

    public
    def search(conditions)
      query = TokyoCabinet::TDBQRY.new(@tdb)
      conditions.each do |key, value|
        #query.addcond(key, TokyoCabinet::TDBQRY::QCSTREQ, value)
        #query.addcond(key, TokyoCabinet::TDBQRY::QCSTRINC, value)
        query.addcond(key, TokyoCabinet::TDBQRY::QCFTSAND, value)
      end
      query.setorder("@DATE", TokyoCabinet::TDBQRY::QONUMASC)
      query.setlimit(10)
      results = query.search
      results.each do |key|
        data = @tdb.get(key)
        yield key, data
      end
    end
  end # class LogStash::DB::IndexReader
end; end # module LogStash::DB

if __FILE__ == $0
  i = LogStash::DB::IndexReader.new(ARGV[0])
  qargs = ARGV[1..-1]
  query = {}
  qargs.each do |arg|
    key, val = arg.split(":", 2)
    query[key] = val
  end

  ap query
  i.search(query) do |key, value|
    #ap [key, value["@DATE"], value["@LINE"]]
    puts value["@LINE"]
    #ap value
  end
end
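
Both DB files double as tiny command-line tools via their __FILE__ == $0 blocks. A hypothetical session (paths and query values invented):

  ruby lib/db/index.rb /tmp/logs.tdb @LINE:string       # add a token index
  ruby lib/db/indexreader.rb /tmp/logs.tdb @LINE:error  # full-text search; prints matching @LINE values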
lib/log.rb (125 deletions)
@@ -1,125 +0,0 @@
require 'date'
require 'json'
#require 'ferret'

module LogStash
  class LogException < StandardError
  end

  class LogNotImplementedException < StandardError
  end

  class Log
    REQUIRED_KEYS = [:type, :encoding]
    OPTIONAL_KEYS = [:attrs, :date_key, :date_format, :logstash_dir,
                     :pattern_dir, :elasticsearch_host]
    attr_accessor :attrs

    LogParseError = Class.new(StandardError)

    def initialize(config)
      check_hash_keys(config, REQUIRED_KEYS, OPTIONAL_KEYS)

      @home = config[:logstash_dir] || "/opt/logstash"
      @pattern_dir = config[:pattern_dir] || @home
      @attrs = {"log:type" => config[:type],
                "log:encoding" => config[:encoding]}
      if config[:attrs]
        if not config[:attrs].is_a?(Hash)
          throw LogException.new(":attrs must be a hash")
        end

        config[:attrs].keys.each do |key|
          next unless key.to_s[0..3] == "log:"
          throw LogException.new("extra attrs must not begin with" +
                                 " log: (#{key})")
        end

        @attrs.merge!(config[:attrs])
      end

      @config = config
    end

    # passed a string that represents an "entry" in :import_type
    def import_entry(entry)
      throw LogNotImplementedException.new
    end

    def index_dir
      return "#{@home}/var/indexes/#{@attrs["log:type"]}"
    end

    def create_index
      return if File.exists?(index_dir)

      field_infos = Ferret::Index::FieldInfos.new(:store => :no,
                                                  :term_vector => :no)
      field_infos.add_field(:@LINE,
                            :store => :compressed,
                            :index => :no)
      [:@DATE, :@LOG_TYPE, :@SOURCE_HOST].each do |special|
        field_infos.add_field(special,
                              :store => :compressed,
                              :index => :untokenized)
      end
      field_infos.create_index(index_dir)
    rescue
      $logger.fatal "error creating index for #{@config[:type]}: #{$!}"
      exit(4)
    end

    def get_index
      #create_index unless File.exists?(index_dir)
      #return Ferret::Index::Index.new(:path => index_dir)
      #http = EventMachine::HttpRequest.new("http://localhost:9200/logstash/#{@config[:type]}")
      #req = http.post :body => entry.to_json
    end

    def index(entry)
      #$logger.debug("Logging #{entry}")
      http = EventMachine::HttpRequest.new("http://#{@config[:elasticsearch_host]}/logstash/#{@config[:type]}")
      req = http.post :body => entry.to_json
    end

    def fix_date(res)
      time = nil
      if @config[:date_key] and @config[:date_format] and \
         res[@config[:date_key]]
        raw_date = res[@config[:date_key]]
        time = nil
        begin
          time = DateTime.strptime(raw_date, @config[:date_format])
        rescue ArgumentError
          # time didn't parse
          time = DateTime.now
        end
      end
      time ||= DateTime.now
      # Store times in UTC, so subtract the local offset below when needed.
      res["@DATE"] = time.strftime("%s").to_i

      # Assume local timezone if the date format for this log doesn't
      # include a timezone offset
      if !@config[:date_format].include?("%Z")
        res["@DATE"] -= Time.now.gmtoff
      end

      return res
    end

    private
    def check_hash_keys(hash, required_keys, optional_keys)
      required_keys.each do |key|
        next if hash.keys.member?(key)
        raise LogException.new("missing required key #{key}")
      end

      hash.keys.each do |key|
        next if required_keys.member?(key)
        next if optional_keys.member?(key)
        raise LogException.new("unknown key #{key}")
      end
    end
  end # class Log
end # module LogStash
lib/logs.rb (38 deletions)
@@ -1,38 +0,0 @@
module LogStash
  class LogsException < StandardError
  end

  class Logs
    def initialize
      @logs = {}
    end

    def register(log)
      if not log.is_a?(Log)
        throw LogsException.new("#{log} is not a Log object")
      end

      log_type = log.attrs["log:type"]
      if @logs.keys.member?(log_type)
        throw LogsException.new("#{log_type}: duplicate log_type")
      end

      @logs[log_type] = log
    end

    def [](log_type)
      return @logs[log_type]
    end

    def types
      return @logs.keys
    end

    def each
      @logs.each do |k,v|
        yield k,v
      end
    end # def each
  end # class Logs
end # module LogStash
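
A sketch of the registry contract, mirroring how AgentConfig constructs and registers logs above (the config hash is illustrative and assumes LogStash::Log::TextLog from lib/log/text):

  logs = LogStash::Logs.new
  logs.register(LogStash::Log::TextLog.new(:type => "syslog",
                                           :grok_patterns => ["%{SYSLOGLINE}"]))
  logs.types       # => ["syslog"]
  logs["syslog"]   # => the TextLog instance registered above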
lib/net.rb (13 deletions)
@@ -1,13 +0,0 @@
require 'rubygems'
require 'socket'
require 'time'

module LogStash
  class MessageServer
    def initialize
    end

    def register_request(name, version)
    end
  end
end
@@ -1,63 +0,0 @@
require 'lib/config/agent'
require 'lib/net/common'
require 'lib/net/messages/indexevent'
require 'lib/file/tail/since'
require 'socket'

module LogStash; module Net; module Clients
  class Agent < MQRPC::Agent
    def initialize(configfile, logger)
      @config = LogStash::Config::AgentConfig.new(configfile)
      MQRPC::logger = logger
      super(@config)
      @hostname = Socket.gethostname
      @msgs = []
      @log_threads = {}
      @logger = logger
    end # def initialize

    def log_watcher
      @logger.debug "Starting log_watcher loop"
      @config.sources.each do |file, logtype|
        next if @log_threads.member?(file)

        Dir.glob(file).each do |path|
          next if @log_threads.member?(path)
          next if File.directory?(path)
          @log_threads[path] = Thread.new do
            @logger.info "Watching #{path} (type #{logtype})"
            File::Tail::Since.new(path).tail do |line|
              index(logtype, line.chomp)
            end
            raise "File::Tail::Since croaked for #{file}!"
          end # Thread
        end # Dir.glob
      end # @config.sources.each
    end # def log_watcher

    def index(type, string)
      ier = LogStash::Net::Messages::IndexEventRequest.new
      ier.log_type = type
      ier.log_data = string.strip_upper_ascii
      ier.metadata["source_host"] = @hostname

      @logger.debug "Indexing #{type}: #{string}"
      ier.delayable = true
      sendmsg("logstash", ier)
    end # def index

    def IndexEventResponseHandler(msg)
      if msg.code != 0
        @logger.warn "Error indexing line (code=#{msg.code}): #{msg.error}"
      end
    end # def IndexEventResponseHandler

    def run
      EM.add_periodic_timer(60) do
        check_for_logs_to_watch
      end

      super
    end # def run
  end # class LogStash::Net::Clients::Agent
end; end; end # LogStash::Net::Clients
@@ -1,52 +0,0 @@
#!/usr/bin/ruby

require "rubygems"
require "lib/config/base"
require "uri"
require "json"
require "ap"
require "logger"
require "httpclient"

module LogStash; module Net; module Clients
  class ElasticSearch
    def initialize(config_file)
      @config = LogStash::Config::BaseConfig.new(config_file)
      @http = HTTPClient.new
      @logger = Logger.new(STDERR)
    end

    def _get(params, what, path = "")
      path.gsub!(/\/+$/, "")
      uri = URI.escape("http://#{@config.elasticsearch_host}#{path}/_#{what}")
      @logger.info("URL for #{what}: #{uri}")
      @logger.info("Body: #{params.to_json}")
      # ElasticSearch uses "GET" with a body, so we can't call .get() here.
      response = @http.request(:get, uri, query = nil, body = params.to_json)

      if response.status != 200
        ap JSON.parse(response.content)
        raise "Search failure (http code #{response.code})"
      end
      return JSON.parse(response.content)
    end

    def query(query, path = "")
      return ElasticSearch::SearchResults.new(_get(query, "search", path))
    end # def query

    def count(query, path = "")
      return _get(query, "count", path)["count"]
    end
  end

  class ElasticSearch::SearchResults
    attr_reader :hits
    attr_reader :results

    def initialize(data)
      @hits = data["hits"]["total"]
      @results = data["hits"]["hits"]
    end
  end # class LogStash::Net::Clients::ElasticSearch::SearchResults
end; end; end # module LogStash::Net::Clients
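
A hypothetical use of this client; the query hash follows the ElasticSearch query DSL of that era, and the config file name and query string are invented:

  es = LogStash::Net::Clients::ElasticSearch.new("logstash.yaml")
  r = es.query({"query" => {"query_string" => {"query" => "@LOG_TYPE:syslog"}}})
  puts "#{r.hits} hits"
  r.results.each { |hit| puts hit["_source"]["@LINE"] }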
@@ -1,155 +0,0 @@
#!/usr/bin/ruby
#
require "rubygems"
require "lib/config/base"
require "lib/net/messages/directory"
require "lib/net/messages/indexevent"
require "lib/net/messages/search"
require "lib/net/messages/searchhits"
require "lib/net/messages/ping"
require "set"

module LogStash::Net::Clients
  class Search < MQRPC::Agent
    attr_accessor :indexers
    attr_reader :hits
    attr_reader :responding
    attr_reader :results

    def initialize(config_file)
      @indexers = Array.new
      @responding = Array.new
      @hits = 0
      @results = []
      @result_mutex = Mutex.new
      config = LogStash::Config::BaseConfig.new(config_file)
      super(config)
      start
    end

    def start
      # find our indexers
      msg = LogStash::Net::Messages::DirectoryRequest.new
      op = sendmsg("logstash-directory", msg) do |response|
        DirectoryResponseHandler(response)
        :finished
      end

      op.wait_until_finished
    end

    def SearchResponseHandler(msg)
      @result_mutex.synchronize do
        msg.results.each do |result|
          @results << result
        end
        if msg.finished
          @responding << msg.indexer_id
          if @responding.length == @indexers.length
            close
          end
        end
      end
    end

    def SearchHitsResponseHandler(msg)
      @result_mutex.synchronize do
        @hits += msg.hits
      end
    end

    def DirectoryResponseHandler(msg)
      @indexers = msg.indexers
    end

    def search(options)
      query = options[:query]
      # Also skip things that need parsing when searching, by default.
      if !query.include?("@NEEDSPARSING")
        query = "(#{query}) AND -@NEEDSPARSING:1"
      end

      @logger.info "Query: #{query}"

      hits_msg = LogStash::Net::Messages::SearchHitsRequest.new
      hits_msg.log_type = options[:log_type]
      hits_msg.query = query
      search_msg = LogStash::Net::Messages::SearchRequest.new
      search_msg.log_type = options[:log_type]
      search_msg.query = query
      search_msg.limit = options[:limit]
      search_msg.offset = options[:offset]
      hits = 0
      results = []
      ops = []
      @indexers.each do |i|
        ops << sendmsg(i, hits_msg) do |msg|
          @logger.debug "Got #{msg.class} with age #{msg.age}"
          hits += msg.hits
          :finished
        end
        ops << sendmsg(i, search_msg) do |msg|
          @logger.debug "Got #{msg.class} with age #{msg.age}"
          msg.results.each do |result|
            results << result
          end
          :finished if msg.finished
        end
      end

      ops.each do |op|
        op.wait_until_finished
      end

      return [hits, results]
    end

    def searchhits(log_type, queries)
      if !queries.is_a?(Array)
        queries = [queries]
      end

      hits = Hash.new { |h,k| h[k] = 0 }
      ops = []

      queries.each do |query|
        options = {
          :query => query,
          :log_type => log_type,
        }

        # Also skip things that need parsing when searching, by default.
        if !query.include?("@NEEDSPARSING")
          realquery = "(#{query}) AND -@NEEDSPARSING:1"
        else
          realquery = query
        end

        @logger.info "Query: #{realquery}"

        hits_msg = LogStash::Net::Messages::SearchHitsRequest.new
        hits_msg.log_type = options[:log_type]
        hits_msg.query = realquery
        @indexers.each do |i|
          ops << sendmsg(i, hits_msg) do |msg|
            @logger.debug "Got #{msg.class} with age #{msg.age} (query: #{query})"
            hits[query] += msg.hits
            @logger.debug "Hits: #{msg.hits}"
            :finished
          end
        end
      end

      remaining = ops.length
      ops.each do |op|
        op.wait_until_finished
        remaining -= 1
        @logger.debug "Waiting for #{remaining} operations"
      end

      return hits
    end
  end
end # module LogStash::Net::Clients
@@ -1,56 +0,0 @@
require 'zlib'
require 'rubygems'
require 'eventmachine'

module LogStash; module Net;
  MAXMSGLEN = (1 << 20) # one megabyte message blocks

  class MessageCorrupt < StandardError
    attr_reader :expected_checksum
    attr_reader :data

    def initialize(checksum, data)
      @expected_checksum = checksum
      @data = data
      super("Corrupt message read. Expected checksum #{checksum}, got " +
            "#{data.checksum}")
    end # def initialize
  end # class MessageCorrupt
end; end # module LogStash::Net

# Add adler32 checksum from Zlib to String class
class String
  def adler32
    return Zlib.adler32(self)
  end # def adler32

  alias_method :checksum, :adler32

  def strip_upper_ascii
    # Ruby's JSON barfs if you try to encode upper ascii characters
    # as it assumes all strings are unicode.
    newstr = self.dup
    (0 .. newstr.length - 1).each do |i|
      break if !newstr[i]
      # ruby 1.9 String#[] returns a string, 1.8 returns an int;
      # force an int.
      if newstr[i].to_i >= 128
        newstr[i] = ""
      end
    end
    return newstr
  end # def strip_upper_ascii
end # class String

# EventMachine uses the ruby 1.8 method Thread#kill! (gone in 1.9),
# so let's fake it.
class Thread
  def kill!(*args)
    kill
  end # def kill!
end

if ENV.has_key?("USE_EPOLL")
  EventMachine.epoll
end
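
A quick illustration of the String extensions above, under ruby 1.8 (where String#[] returns an integer byte):

  "logstash".checksum == Zlib.adler32("logstash")   # => true, via the alias
  "caf\xe9".strip_upper_ascii                       # => "caf"; the 0xE9 byte is dropped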
@@ -1,7 +0,0 @@
require "mqrpc"

module LogStash; module Net; module Messages
  class BroadcastMessage < MQRPC::Message
    argument :queue
  end # class BroadcastMessage
end; end; end # module LogStash::Net::Messages
@@ -1,11 +0,0 @@
require "mqrpc"

module LogStash; module Net; module Messages
  class DirectoryRequest < MQRPC::RequestMessage
    # No message attributes
  end # class DirectoryRequest

  class DirectoryResponse < MQRPC::ResponseMessage
    argument :indexers
  end # class DirectoryResponse
end; end; end # module LogStash::Net::Messages
@@ -1,23 +0,0 @@
require "mqrpc"

module LogStash; module Net; module Messages
  class IndexEventRequest < MQRPC::RequestMessage
    argument :log_type
    argument :log_data
    argument :metadata

    def initialize
      super
      self.metadata = Hash.new
    end
  end # class IndexEventRequest

  class IndexEventResponse < MQRPC::ResponseMessage
    argument :code
    argument :error

    def success?
      return self.code == 0
    end
  end # class IndexEventResponse
end; end; end # module LogStash::Net::Messages
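
These request/response pairs are the wire protocol between agent, parser, and indexer. As a grounding example, lib/net/clients/agent.rb above builds an index request like this (sample values invented; sendmsg comes from MQRPC::Agent):

  req = LogStash::Net::Messages::IndexEventRequest.new
  req.log_type = "syslog"
  req.log_data = "Dec 10 04:32:01 myhost sshd[405]: ...".strip_upper_ascii
  req.metadata["source_host"] = Socket.gethostname
  req.delayable = true
  sendmsg("logstash", req)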
@@ -1,11 +0,0 @@
require "mqrpc"

module LogStash; module Net; module Messages
  class LogKeysRequest < MQRPC::RequestMessage
    argument :log_type
  end # class LogKeysRequest

  class LogKeysResponse < MQRPC::ResponseMessage
    argument :keys
  end # class LogKeysResponse
end; end; end # module LogStash::Net::Messages
@@ -1,12 +0,0 @@
require "mqrpc"

module LogStash; module Net; module Messages
  class LogTypesRequest < MQRPC::RequestMessage
    # No message attributes
  end # class LogTypesRequest

  class LogTypesResponse < MQRPC::ResponseMessage
    argument :log_type
    argument :types
  end # class LogTypesResponse
end; end; end # module LogStash::Net::Messages
@@ -1,16 +0,0 @@
require "mqrpc"

module LogStash; module Net; module Messages
  class PingRequest < MQRPC::RequestMessage
    argument :pingdata

    def initialize
      super
      self.pingdata = Time.now.to_f
    end
  end # class PingRequest

  class PingResponse < MQRPC::ResponseMessage
    argument :pingdata
  end # class PingResponse
end; end; end # module LogStash::Net::Messages
@@ -1,11 +0,0 @@
require "mqrpc"

module LogStash; module Net; module Messages
  class QuitRequest < MQRPC::RequestMessage
    # No attributes
  end # class QuitRequest

  class QuitResponse < MQRPC::ResponseMessage
    # No attributes
  end # class QuitResponse
end; end; end # module LogStash::Net::Messages
@@ -1,16 +0,0 @@
require "mqrpc"

module LogStash; module Net; module Messages
  class SearchRequest < MQRPC::RequestMessage
    argument :query
    argument :log_type
    argument :offset
    argument :limit
  end # class SearchRequest

  class SearchResponse < MQRPC::ResponseMessage
    argument :results
    argument :indexer_id
    argument :finished
  end # class SearchResponse
end; end; end # module LogStash::Net::Messages
@@ -1,14 +0,0 @@
require "mqrpc"

module LogStash; module Net; module Messages
  class SearchHitsRequest < MQRPC::RequestMessage
    argument :query
    argument :log_type
    argument :offset
    argument :limit
  end # class SearchHitsRequest

  class SearchHitsResponse < MQRPC::ResponseMessage
    argument :hits
  end # class SearchHitsResponse
end; end; end # module LogStash::Net::Messages
@@ -1,268 +0,0 @@
require 'rubygems'
require 'ferret'
require 'lib/config/indexer.rb'
require 'lib/log/text'
require 'lib/net/messages/broadcast'
require 'lib/net/messages/directory'
require 'lib/net/messages/indexevent'
require 'lib/net/messages/logkeys'
require 'lib/net/messages/logtypes'
require 'lib/net/messages/ping'
require 'lib/net/messages/search'
require 'lib/net/messages/searchhits'
require 'lib/util'
require 'mqrpc'
require 'mqrpc/functions/ping'
require 'pp'

module LogStash; module Net; module Servers
  class Indexer < MQRPC::Agent
    #include MQRPC::Functions::Ping
    # Default broadcast interval is 30, but for debugging/testing, sometimes
    # it's nice to set it lower.
    BROADCAST_INTERVAL = (ENV["BROADCAST_INTERVAL"] or 30).to_i
    SYNC_DELAY = 10

    def initialize(configfile, logger)
      @logger = logger
      @logger.progname = "indexer"
      @config = LogStash::Config::IndexerConfig.new(configfile)

      MQRPC::logger = @logger
      super(@config)
      @indexes = Hash.new
      @lines = Hash.new { |h,k| h[k] = 0 }
      @indexcount = 0
      @starttime = Time.now
      @indexers = Hash.new
      @indexers_mutex = Mutex.new
      @readers = Hash.new
      @searchers = Hash.new
      @qps = Hash.new
    end

    def IndexEventRequestHandler(request)
      response = LogStash::Net::Messages::IndexEventResponse.new(request)
      @indexcount += 1

      if @indexcount % 100 == 0
        duration = (Time.now.to_f - @starttime.to_f)
        @logger.debug "rate: %.2f/sec" % (@indexcount / duration)
      end

      log_type = request.log_type

      @indexes[log_type] ||= @config.logs[log_type].get_index
      @indexes[log_type] << request.log_data
      response.code = 0
      yield response
    end

    def LogTypesRequestHandler(request)
      @logger.debug "received LogTypesRequest"
      response = LogStash::Net::Messages::LogTypesResponse.new(request)
      response.types = @config.logs.types
      yield response
    end

    def LogKeysRequestHandler(request)
      @logger.debug "received LogKeysRequest"
      reader, search, qp = get_ferret(request.log_type)
      response = LogStash::Net::Messages::LogKeysResponse.new(request)
      response.keys = reader.fields
      response.log_type = request.log_type
      yield response
    end

    def get_ferret(log_type)
      # open the index every time otherwise we get stale results.
      reader = Ferret::Index::IndexReader.new(@config.logs[log_type].index_dir)
      searcher = Ferret::Search::Searcher.new(reader)
      @qps[log_type] ||= Ferret::QueryParser.new(
        :fields => reader.fields,
        :tokenized_fields => reader.tokenized_fields,
        :or_default => false)
      #return @readers[log_type], @searchers[log_type], @qps[log_type]
      return reader, searcher, @qps[log_type]
    end

    def SearchRequestHandler(request)
      @logger.debug "received SearchRequest (#{request.query.inspect} in " \
                    "#{request.log_type})"
      @logger.debug "message age is #{request.age} seconds"
      stopwatch = LogStash::StopWatch.new
      response = LogStash::Net::Messages::SearchResponse.new(request)
      response.indexer_id = @id

      if @config.logs[request.log_type].nil?
        @logger.warn "SearchRequest received for invalid log_type: " \
                     "#{request.log_type}"
        response.results = []
        response.finished = true
        yield response
        return
      end

      begin
        reader, search, qp = get_ferret(request.log_type)
      rescue Ferret::FileNotFoundError => e
        @logger.warn "get_ferret failed: #{e.inspect}"
        response.results = []
        response.finished = true
        yield response
        return
      end
      query = qp.parse(request.query)
      offset = (request.offset or 0)
      total = request.limit
      max_limit = 50
      results = []
      limit = max_limit

      done = false
      while !done
        done = true

        if total
          limit = [total, max_limit].min
          total -= limit
          if limit <= 0
            done = true
            next
          end
        end

        count = 0
        search.search_each(query, :limit => limit, :offset => offset,
                           :sort => "@DATE") do |docid, score|
          done = false
          result = reader[docid][:@LINE]
          count += 1
          results << [reader[docid][:@DATE], result]
        end

        if (total and count < limit)
          done = true
        end
        part_response = LogStash::Net::Messages::SearchResponse.new(request)
        part_response.results = results
        part_response.finished = false
        yield part_response
        results = []
        offset += count
      end
      response.results = []
      response.finished = true
      yield response

      @logger.info "SearchRequest for '#{request.query}' took #{stopwatch.to_s(4)}"
    end # def SearchRequestHandler

    def SearchHitsRequestHandler(request)
      @logger.debug "received SearchHitsRequest (#{request.query.inspect} in " \
                    "#{request.log_type})"
      response = LogStash::Net::Messages::SearchHitsResponse.new(request)
      if @config.logs[request.log_type].nil?
        @logger.warn "SearchHitsRequest received for invalid log_type: " \
                     "#{request.log_type}"
        response.hits = 0
        yield response
        return
      end

      begin
        reader, search, qp = get_ferret(request.log_type)
      rescue Ferret::FileNotFoundError => e
        @logger.warn "get_ferret failed: #{e.inspect}"
        response.hits = 0
        yield response
        return
      end
      query = qp.parse(request.query)
      offset = (request.offset or 0)

      # search_each returns number of hits, even if we don't yield them.
      hits = search.search_each(query, :limit => 1, :offset => offset) { |docid, score| }

      response.hits = hits
      yield response
    end # def SearchHitsRequestHandler

    def BroadcastMessageHandler(request)
      @logger.debug "received BroadcastMessage (from #{request.queue})"
      @indexers_mutex.synchronize do
        @indexers[request.queue] = Time.now
      end
    end

    def DirectoryRequestHandler(request)
      @logger.debug "received DirectoryRequest"
      response = LogStash::Net::Messages::DirectoryResponse.new(request)
      response.indexers = @indexers.keys
      yield response
    end

    # Special 'run' override because we have extra things to do:
    # - listen to generic "logstash-index" queue
    # - listen for indexer broadcasts on "logstash-broadcast"
    # - sync to disk once per minute.
    # - broadcast our presence to other indexers.
    # - respond to directory requests
    def run
      subscribe("logstash-index")
      subscribe_topic("logstash-broadcast")
      @syncer = Thread.new { syncer }
      @broadcaster = Thread.new { broadcaster }
      @directory_responder = Thread.new do
        # We wait to make sure we've seen some broadcasts before we start
        # answering directory requests.
        sleep(BROADCAST_INTERVAL + 5)
        subscribe("logstash-directory")
      end
      super
    end # def run

    def syncer
      synctime = Time.now + SYNC_DELAY
      loop do
        if Time.now > synctime
          @indexes.each do |log_type, index|
            # TODO: only run flush if we need to
            @logger.debug "Forcing a sync of #{log_type}"
            begin
              index.flush
            rescue EOFError
              @logger.fatal "#{log_type}'s index is corrupt: #{$!}."
              exit(2)
            end
          end

          synctime = Time.now + SYNC_DELAY
        end
        sleep(synctime - Time.now)
      end
    end # def syncer

    def broadcaster
      msg = LogStash::Net::Messages::BroadcastMessage.new
      msg.queue = @id
      loop do
        @logger.debug "Sending #{msg.class}"
        sendmsg_topic("logstash-broadcast", msg)
        sleep(BROADCAST_INTERVAL)
        @indexers_mutex.synchronize do
          cutoff = Time.now - (BROADCAST_INTERVAL * 2)
          @indexers.each do |queue, heartbeat|
            next if heartbeat > cutoff
            @logger.warn "dropping indexer #{queue}, last heartbeat at " \
                         "#{Time.at(heartbeat)}"
            @indexers.delete(queue)
          end
        end
      end
    end # def broadcaster
  end # class Indexer
end; end; end # module LogStash::Net::Servers
@@ -1,147 +0,0 @@
require 'rubygems'
#require 'net/http'
#require 'curb'
require 'em-http'

require 'lib/config/indexer.rb'
require 'lib/log/text'
require 'lib/net/messages/indexevent'
require 'lib/net/messages/ping'
require 'lib/net/messages/search'
require 'lib/net/messages/searchhits'
require 'mqrpc'
require 'pp'

class EventMachine::Protocols::HttpClient2::Request
  def __send_request
    az = @args[:authorization] and az = "Authorization: #{az}\r\n"

    is_post = (@args[:post_body] != nil)
    r = [
      "#{@args[:verb]} #{@args[:uri]} HTTP/#{@args[:version] || "1.1"}",
      "Host: #{@args[:host_header] || "_"}",
    ]

    r << az if az

    if is_post
      r << "Content-Length: #{@args[:post_body].length}"
      r << "Content-Type: application/json"
    end

    # end of request
    r << ""

    # Put post body
    if is_post
      r << @args[:post_body]
    end

    @conn.send_data r.join("\r\n")
  end
end

module LogStash; module Net; module Servers
  class Parser < MQRPC::Agent
    handle LogStash::Net::Messages::IndexEventRequest, :IndexEventRequestHandler
    handle LogStash::Net::Messages::IndexEventResponse, :IndexEventResponseHandler

    def initialize(configfile, logger)
      @config = LogStash::Config::IndexerConfig.new(configfile)
      @logger = logger
      @logger.progname = "parser"
      MQRPC::logger = @logger
      super(@config)
      @lines = Hash.new { |h,k| h[k] = 0 }
      @indexcount = 0
      @starttime = Time.now
      #@elasticsearchconn = EventMachine::Protocols::HttpClient2.connect("127.0.0.1", 9200)
      @elasticsearchconn = \
        EventMachine::HttpRequest.new("http://127.0.0.1:9200/log/stash")
    end

    def IndexEventRequestHandler(request)
      @logger.debug "received IndexEventRequest (for type " \
                    "#{request.log_type}): #{request.log_data}"
      response = LogStash::Net::Messages::IndexEventResponse.new(request)
      @indexcount += 1

      if @indexcount % 100 == 0
        duration = (Time.now.to_f - @starttime.to_f)
        @logger.debug "rate: %.2f/sec" % (@indexcount / duration)
      end

      log_type = request.log_type
      entry = nil
      reason = "unknown; parse_entry returned without an exception"
      begin
        entry = @config.logs[log_type].parse_entry(request.log_data)
      rescue LogStash::Log::LogParseError
        reason = $!
      end

      if !entry
        @logger.warn "Failed parsing line: #{reason}: #{request.log_data}"
        response.code = 1
        response.error = "Entry was #{entry.inspect} (log parsing " \
                         "failed: #{reason})"
        entry = {
          "@NEEDSPARSING" => 1,
          "@LINE" => request.log_data
        }
      else
        response.code = 0
      end
      yield response
      entry["@LOG_TYPE"] = log_type

      # Make a new ID for this request before we forward it.
      #request.generate_id!

      # Now we have a hash for the log data, send it to the indexer
      #request.log_data = entry

      start = Time.now
      req = @elasticsearchconn.post({
        #:uri => "/log/stash",
        #:post_body => entry.to_json,
        :body => entry.to_json,
      })
      req.callback do |response|
        #@logger.debug "Duration f" + response.response_header.status
        @logger.debug "Duration: #{Time.now - start}"
      end

      # Push our message onto the queue
      #curl = Curl::Easy.new("http://localhost:9200/log/stash")
      #curl.headers["Content-Type"] = "application/json"
      #curl.post_body = entry.to_json
      #s = Time.now
      #@logger.debug "Starting index request"
      #curl.perform
      #@logger.debug "Response: #{curl.response_code} #{curl.body_str}"
      #@logger.debug "Duration: #{Time.now - s}"
      #@logger.debug "Duration: " + curl.methods.grep(/_time$/).sort.collect { |a| [a, curl.send(a)] }.join(", ")
    end

    def IndexEventResponseHandler(response)
      # This message comes from the indexer; we don't need to really
      # do anything with it.
    end

    def PingRequestHandler(request)
      @logger.debug "received PingRequest (#{request.pingdata})"
      response = LogStash::Net::Messages::PingResponse.new(request)
      response.pingdata = request.pingdata
      yield response
    end

    # Special 'run' override because we subscribe to the parser queue.
    def run
      subscribe("logstash")
      super
    end # def run
  end # class Parser
end; end; end # module LogStash::Net::Servers
lib/program.rb (107 deletions)
@@ -1,107 +0,0 @@
module LogStash
  class Program
    class PidFileLockFailed < StandardError
    end # class LogStash::Program::PidFileLockFailed

    def initialize(options)
      @pidfile = options[:pidfile]
      @logfile = options[:logfile]
      @daemonize = options[:daemonize]
      @options = options
      @dying = false
    end

    def run
      Thread::abort_on_exception = true
      grab_pidfile if @pidfile
      redirect_io
      daemonize if @daemonize
      termination_handler do
        puts "Default termination signal handler being invoked."
      end
      yield @options if block_given?
    end # def run

    def termination_handler(&block)
      @termination_callback = block
      Signal.trap("INT") do
        Process.kill("TERM", $$)
      end

      Signal.trap("TERM") do
        dying
        $logger.warn "received SIGTERM, shutting down"
        @termination_callback.call if @termination_callback
        Process.waitall
        if @pidfile_fd
          @pidfile_fd.close
          File.delete(@pidfile)
        end
        exit(5)
      end
    end # def termination_handler

    def redirect_io
      if @logfile != nil
        logfd = File.open(@logfile, "a")
        logfd.sync = true
        $stdout.reopen(logfd)
        $stderr.reopen(logfd)
      else
        # Require a logfile for daemonization
        if @daemonize
          $stderr.puts "Daemonizing requires you specify a logfile."
          raise ArgumentError.new("daemonize is true, but no logfile is specified")
        end
      end
    end # def redirect_io

    def grab_pidfile
      if @pidfile
        pidfile = File.open(@pidfile, IO::RDWR | IO::CREAT)
        gotlock = pidfile.flock(File::LOCK_EX | File::LOCK_NB)
        if !gotlock
          owner = pidfile.read()
          if owner.length == 0
            owner = "unknown"
          end
          $stderr.puts "Failed to get lock on #{@pidfile}; owned by #{owner}"
          raise LogStash::Program::PidFileLockFailed.new(@pidfile)
        end
        pidfile.truncate(0)
        pidfile.puts $$
        pidfile.flush
        @pidfile_fd = pidfile
      end
    end # def grab_pidfile

    def daemonize
      fork and exit(0)
      Process.setsid

      # Copied mostly from Daemons.daemonize, but since the ruby 1.8 'daemons'
      # and gem 'daemons' have api variances, let's do it ourselves since nobody
      # agrees.
      trap("SIGHUP", "IGNORE")
      ObjectSpace.each_object(IO) do |io|
        # closing STDIN is ok, but keep STDOUT and STDERR;
        # close everything else
        next if [$stdout, $stderr].include?(io)
        begin
          unless io.closed?
            io.close
          end
        rescue ::Exception
        end
      end
    end # def daemonize

    def dying
      @dying = true
    end

    def dying?
      return @dying
    end
  end # class LogStash::Program
end # module LogStash
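
A minimal sketch of the lifecycle this class provides (the option values are invented; run yields the options hash when given a block):

  program = LogStash::Program.new(:pidfile => "/var/run/example.pid",
                                  :logfile => "/var/log/example.log",
                                  :daemonize => true)
  program.run do |options|
    # long-running work here; SIGINT/SIGTERM fire the termination handler
  end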
@@ -1,95 +0,0 @@
require 'lib/config/agent'
require 'lib/db/index'
require 'lib/program'
require 'grok'
require 'set'
require 'ap'
require 'socket' # for Socket.gethostname
require 'eventmachine'
require 'eventmachine-tail'
require 'em-http'

PROGRESS_AMOUNT = 500

class Reader < EventMachine::FileTail
  def initialize(path, agent)
    super(path)
    @agent = agent
    @buffer = BufferedTokenizer.new
  end

  def receive_data(data)
    @buffer.extract(data).each do |line|
      @agent.process(path, line)
    end
  end # def receive_data
end # class Reader

module LogStash; module Programs;
  class Agent < LogStash::Program
    public
    def initialize(options)
      super(options)
      @config = LogStash::Config::AgentConfig.new(options[:config])
      @config.merge!(options)
      @indexes = Hash.new { |h,k| h[k] = @config.logs[k] }

      @hostname = Socket.gethostname
      @needs_flushing = Set.new
      @count = 0
      @start = Time.now
    end

    public
    def run
      EventMachine.run do
        super
        setup_watches

        EventMachine.add_periodic_timer(1) do
          flush_indexes
        end
        ap @options
      end # EventMachine.run
    end # def run

    public
    def process(path, line)
      matched = false
      @config.logs.each do |name, log|
        begin
          entry = log.parse_entry(line)
          if entry
            entry["@SOURCE_FILE"] = path
            entry["@SOURCE_HOST"] = @hostname
            matched = true
            #ap entry
            index(name, entry)
            break
          end
        rescue LogStash::Log::LogParseError => e
          # ignore
        end
      end # @config.logs.each

      if !matched
        puts "nomatch in #{path}: #{line}"
      end
    end # def process

    private
    def publish(name, entry)
      # publish the entry
    end

    private
    def setup_watches
      #handler = EventMachine::FileGlobWatchTail.new(Reader, self)
      @config.watched_paths.each do |path|
        $logger.info("Watching #{path}")
        EventMachine::FileGlobWatchTail.new(path, Reader, interval=60,
                                            exclude=[], agent=self)
      end
    end # def setup_watches
  end # class Agent
end; end # module LogStash::Programs
@@ -2,11 +2,9 @@
<span>
<%= form :action => url(:controller => "search", :action => "query"),
         :method => :get do %>
  <%= text_field :name => "q", :label => "Query: ", :value => escape_html(params[:q]), :size => 100 %>
  <!--
  <%= text_field :name => "log_type", :label => "log type", :value => escape_html(params[:log_type]) %>
  <br>
  -->
  <%= text_field :name => "q", :label => "Query: ", :value => escape_html(params[:q]), :size => 100 %>
  <%= hidden_field :name => "graphperiod", :value => 60*60*24 %>
  <%= hidden_field :name => "graphstep", :value => 60*60 %>
  <%= submit "Search" %>
<% end =%>
</span>