From e9c314dee60688a68e2d122c90366b8f0e9ba1ee Mon Sep 17 00:00:00 2001 From: Jordan Sissel Date: Fri, 11 Sep 2009 10:24:13 +0000 Subject: [PATCH] - Move from config.rb to config.yaml - logstashd now takes a config.yaml as the only argument. --- bin/agent.rb | 13 +++++---- bin/logstashd.rb | 46 +++++++++++++++++++------------ config.rb | 38 -------------------------- config.yaml | 37 +++++++++++++++++++++++++ lib/config/indexer.rb | 56 ++++++++++++++++++++++++++++++++++++++ lib/log.rb | 4 +-- lib/log/text.rb | 4 +-- lib/net/servers/indexer.rb | 41 +++++++++++++++------------- 8 files changed, 155 insertions(+), 84 deletions(-) delete mode 100644 config.rb create mode 100644 config.yaml create mode 100644 lib/config/indexer.rb diff --git a/bin/agent.rb b/bin/agent.rb index 102ba4103..8eee3cdc1 100755 --- a/bin/agent.rb +++ b/bin/agent.rb @@ -17,12 +17,13 @@ class Agent < LogStash::Net::MessageClient end # def initialize def start_log_watcher - #@t1 = Thread.new do + @t1 = Thread.new do #File::Tail::Since.new("/b/logs/auth.log.scorn").tail do |line| - #line.chomp! - #index("linux-syslog", line) - #end - #end + File.open("/b/logs/auth.log.scorn").each do |line| + line.chomp! + index("linux-syslog", line) + end + end @t2 = Thread.new do count = 0 @@ -32,7 +33,7 @@ class Agent < LogStash::Net::MessageClient count += 1 index("httpd-access", line) puts count - break if count >= 10 + #break if count >= 10 end sendmsg("/queue/logstash", LogStash::Net::Messages::QuitRequest.new) end diff --git a/bin/logstashd.rb b/bin/logstashd.rb index 38c59ec60..a92b86865 100644 --- a/bin/logstashd.rb +++ b/bin/logstashd.rb @@ -3,24 +3,36 @@ require 'rubygems' require 'lib/net/servers/indexer' -if ENV.has_key?("PROFILE") - require 'ruby-prof' - RubyProf.start - #class String - #alias_method :orig_scan, :scan - #def scan(*args) - ##raise - #return orig_scan(*args) +def main(args) + if ENV.has_key?("PROFILE") + require 'ruby-prof' + RubyProf.start + + #class String + #alias_method :orig_scan, :scan + #def scan(*args) + ##raise + #return orig_scan(*args) + #end #end - #end -end -Thread::abort_on_exception = true -s = LogStash::Net::Servers::Indexer.new(username='', password='', host="localhost") -s.run + end -if ENV.has_key?("PROFILE") - result = RubyProf.stop - printer = RubyProf::FlatPrinter.new(result) - printer.print(STDOUT, 0) + if args.length != 1 + puts "Usage: #{$0} configfile" + return 1 + end + Thread::abort_on_exception = true + s = LogStash::Net::Servers::Indexer.new(args[0]) + s.run + + if ENV.has_key?("PROFILE") + result = RubyProf.stop + printer = RubyProf::FlatPrinter.new(result) + printer.print(STDOUT, 0) + end + + return 0 end + +exit main(ARGV) diff --git a/config.rb b/config.rb deleted file mode 100644 index d165e2e41..000000000 --- a/config.rb +++ /dev/null @@ -1,38 +0,0 @@ -require 'lib/logs' -require 'lib/log/json' -require 'lib/log/text' - -include LogStash - -$logs = Logs.new - -# === define & register your logs below here -log = Log::TextLog.new({:type => "httpd-access", - :grok_patterns => ["%{COMBINEDAPACHELOG}"], - :date_key => "timestamp", - :date_format => "%d/%b/%Y:%H:%M:%S %Z", -}) -$logs.register log - -log = Log::JsonLog.new({:type => "glu", - :date_key => "timestamp", - :date_format => "%Y-%m-%dT%H:%M:%S", - :line_format => "<%= entry['timestamp'] %> | <%= entry['level'] %> | <%= entry['context/sessionKey'] %> | <%= entry['sourceHostName'] %> | <%= entry['context/componentName'] %> | <%= entry['message'] %>", -}) -$logs.register log - -log = Log::TextLog.new({:type => "netscreen", - :grok_patterns => ["%{NETSCREENSESSIONLOG}"], - :date_key => "date", - :date_format => "%b %e %H:%M:%S", -}) -$logs.register log - -log = Log::TextLog.new({:type => "linux-syslog", - :grok_patterns => ["%{SYSLOGPAMSESSION}", - "%{SYSLOGLINE}", - ], - :date_key => "date", - :date_format => "%b %e %H:%M:%S", -}) -$logs.register log diff --git a/config.yaml b/config.yaml new file mode 100644 index 000000000..18cc4b332 --- /dev/null +++ b/config.yaml @@ -0,0 +1,37 @@ +--- +stompserver: localhost:61613 +logstash_dir: /var/logstash +#logstash_dir: /home/jls/projects/logstash + +log-types: + httpd-access: + type: text + patterns: + - %{COMBINEDAPACHELOG} + date: + key: timestamp + format: %d/%b/%Y:%H:%M:%S %Z + + glu: + type: json + date: + key: timestamp + format: %Y-%m-%dT%H:%M:%S + display_format: "<%= entry['timestamp'] %> | <%= entry['level'] %> | <%= entry['context/sessionKey'] %> | <%= entry['sourceHostName'] %> | <%= entry['context/componentName'] %> | <%= entry['message'] %>" + + linux-syslog: + type: text + date: + key: date + format: %b %e %H:%M:%S + patterns: + - %{SYSLOGPAMSESSION} + - %{SYSLOGLINE} + + netscreen: + type: text + patterns: + - %{NETSCREENSESSIONLOG} + date: + key: date + format: %b %e %H:%M:%S diff --git a/lib/config/indexer.rb b/lib/config/indexer.rb new file mode 100644 index 000000000..ab7cb948a --- /dev/null +++ b/lib/config/indexer.rb @@ -0,0 +1,56 @@ +require 'yaml' +require 'lib/logs' +require 'lib/log/json' +require 'lib/log/text' + + + +module LogStash::Config + class IndexerConfig + attr_reader :logs + attr_reader :logstash_dir + + def initialize(file) + obj = YAML::load(File.open(file).read()) + + @stompserver = obj["stompserver"] + @logstash_dir = obj["logstash_dir"] + @logs = LogStash::Logs.new + + if @stompserver == nil + raise ArgumentError.new("stompserver is nil (#{file})") + end + + obj["log-types"].each do |log_type, data| + log = nil + #puts ":: #{log_type}" + case data["type"] + when "text" + + log = LogStash::Log::TextLog.new(:type => log_type, + :grok_patterns => data["patterns"], + :date_key => data["date"]["key"], + :date_format => data["date"]["format"], + :logstash_dir => @logstash_dir) + when "json" + log = LogStash::Log::JsonLog.new(:type => log_type, + :line_format => data["display_format"], + :date_key => data["date"]["key"], + :date_format => data["date"]["format"], + :logstash_dir => @logstash_dir) + end + + @logs.register(log) + end + end + + def stomphost + return @stompserver.split(":")[0] + end + + def stompport + port = @stompserver.split(":")[1].to_i + return (port == 0 ? 61613 : port) + end + end # class IndexerConfig +end # module LogStash::Config diff --git a/lib/log.rb b/lib/log.rb index 1df7eb701..cb9b69b31 100644 --- a/lib/log.rb +++ b/lib/log.rb @@ -11,13 +11,13 @@ module LogStash class Log REQUIRED_KEYS = [:type, :encoding] - OPTIONAL_KEYS = [:attrs, :date_key, :date_format] + OPTIONAL_KEYS = [:attrs, :date_key, :date_format, :logstash_dir] attr_accessor :attrs def initialize(config) check_hash_keys(config, REQUIRED_KEYS, OPTIONAL_KEYS) - @home = ENV["LOGSTASH_DIR"] || "/opt/logstash" + @home = config[:logstash_dir] || ENV["LOGSTASH_DIR"] || "/opt/logstash" @attrs = {"log:type" => config[:type], "log:encoding" => config[:encoding]} if config[:attrs] diff --git a/lib/log/text.rb b/lib/log/text.rb index b540ffb38..61ec45093 100644 --- a/lib/log/text.rb +++ b/lib/log/text.rb @@ -22,7 +22,8 @@ module LogStash end @grok_patterns = config.delete(:grok_patterns) - @home = ENV["LOGSTASH_DIR"] || "/opt/logstash" + + super(config) if not File.exists?("#{@home}/patterns") throw StandardError.new("#{@home}/patterns/ does not exist") @@ -43,7 +44,6 @@ module LogStash @groks << grok end - super(config) end def parse_entry(raw_entry) diff --git a/lib/net/servers/indexer.rb b/lib/net/servers/indexer.rb index 8607c745e..2a85bafe9 100644 --- a/lib/net/servers/indexer.rb +++ b/lib/net/servers/indexer.rb @@ -7,17 +7,21 @@ require 'lib/net/messages/search' require 'lib/net/messages/searchhits' require 'lib/net/messages/quit' require 'lib/net/messages/ping' +require 'lib/config/indexer.rb' require 'ferret' require 'lib/log/text' -require 'config' module LogStash; module Net; module Servers class Indexer < LogStash::Net::MessageServer SYNCDELAY = 3 - def initialize(*args) + def initialize(configfile) + #def initialize(*args) # 'super' is not the same as 'super()', and we want super(). - super(*args) + @config = LogStash::Config::IndexerConfig.new(configfile) + super(username="", password="", + host="localhost", port=61613) + #host=@config.stomphost, port=@config.stompport) @indexes = Hash.new @lines = Hash.new { |h,k| h[k] = 0 } @indexcount = 0 @@ -34,13 +38,13 @@ module LogStash; module Net; module Servers response.id = request.id @indexcount += 1 - if @indexcount % 10 == 0 + if @indexcount % 100 == 0 duration = (Time.now.to_f - @starttime.to_f) - puts "%.2f" % (@indexcount / duration) + puts "rate: %.2f/sec" % (@indexcount / duration) end log_type = request.log_type - entry = $logs[log_type].parse_entry(request.log_data) + entry = @config.logs[log_type].parse_entry(request.log_data) if !entry response.code = 1 response.error = "Entry was #{entry.inspect} (log parsing failed)" @@ -53,13 +57,15 @@ module LogStash; module Net; module Servers end if not @indexes.member?(log_type) - @indexes[log_type] = $logs[log_type].get_index + @indexes[log_type] = @config.logs[log_type].get_index end entry["@LOG_TYPE"] = log_type + #puts entry.inspect @indexes[log_type] << entry - if response.code != 0 + # only dump a response if there was an error. + if response.success? yield response end end @@ -76,15 +82,15 @@ module LogStash; module Net; module Servers response = LogStash::Net::Messages::SearchResponse.new response.id = request.id - if $logs[request.log_type].nil? + if @config.logs[request.log_type].nil? puts "invalid log type: #{request.log_type}" response.results = [] response.finished = true - puts response.inspect yield response return end - reader = Ferret::Index::IndexReader.new($logs[request.log_type].index_dir) + + reader = Ferret::Index::IndexReader.new(@config.logs[request.log_type].index_dir) search = Ferret::Search::Searcher.new(reader) qp = Ferret::QueryParser.new(:fields => reader.fields, :tokenized_fields => reader.tokenized_fields, @@ -95,9 +101,6 @@ module LogStash; module Net; module Servers max_limit = 50 results = [] limit = max_limit - # TODO(sissel): We need a way to say 'flush now' because this - # method will batch search results due to the likely efficiency - # in searching for batches of results. done = false while !done @@ -134,21 +137,21 @@ module LogStash; module Net; module Servers end response.results = [] response.finished = true - puts response.inspect yield response end # def SearchRequestHandler def SearchHitsRequestHandler(request) - puts "Search for #{request.query.inspect}" + puts "Search for hits on #{request.query.inspect}" response = LogStash::Net::Messages::SearchHitsResponse.new response.id = request.id - if $logs[request.log_type].nil? + if @config.logs[request.log_type].nil? puts "invalid log type: #{request.log_type}" response.hits = 0 yield response return end - reader = Ferret::Index::IndexReader.new($logs[request.log_type].index_dir) + + reader = Ferret::Index::IndexReader.new(@config.logs[request.log_type].index_dir) search = Ferret::Search::Searcher.new(reader) qp = Ferret::QueryParser.new(:fields => reader.fields, :tokenized_fields => reader.tokenized_fields, @@ -174,7 +177,7 @@ module LogStash; module Net; module Servers synctime = Time.now + SYNCDELAY loop do if Time.now > synctime - @indexes.each do |log_type,index| + @indexes.each do |log_type, index| puts "Time's up. Syncing #{log_type}" index.commit end