diff --git a/lib/net/servers/indexer.rb b/lib/net/servers/indexer.rb index 7723ba29c..d2fde454b 100644 --- a/lib/net/servers/indexer.rb +++ b/lib/net/servers/indexer.rb @@ -14,7 +14,7 @@ require 'pp' module LogStash; module Net; module Servers class Indexer < LogStash::Net::MessageServer - SYNCDELAY = 3 + SYNCDELAY = 10 def initialize(configfile) #def initialize(*args) @@ -45,30 +45,12 @@ module LogStash; module Net; module Servers end log_type = request.log_type - entry = @config.logs[log_type].parse_entry(request.log_data) - #pp entry - if !entry - response.code = 1 - response.error = "Entry was #{entry.inspect} (log parsing failed)" - entry = { - "@NEEDSPARSING" => 1, - "@LINE" => request.log_data - } - else - response.code = 0 - end if not @indexes.member?(log_type) @indexes[log_type] = @config.logs[log_type].get_index end - entry["@LOG_TYPE"] = log_type - @indexes[log_type] << entry - - # only dump a response if there was an error. - if response.success? - yield response - end + @indexes[log_type] << request.log_data end def PingRequestHandler(request) @@ -170,7 +152,7 @@ module LogStash; module Net; module Servers # Special 'run' override because we want sync to disk once per minute. def run - subscribe("logstash") + subscribe("logstash-index") @syncer = Thread.new { syncer } super end # def run @@ -181,7 +163,8 @@ module LogStash; module Net; module Servers if Time.now > synctime @indexes.each do |log_type, index| puts "Time's up. Syncing #{log_type}" - index.commit + index.flush + break; end synctime = Time.now + SYNCDELAY diff --git a/lib/net/servers/parser.rb b/lib/net/servers/parser.rb new file mode 100644 index 000000000..17a9a176f --- /dev/null +++ b/lib/net/servers/parser.rb @@ -0,0 +1,85 @@ + +require 'rubygems' +require 'lib/net/server' +require 'lib/net/message' +require 'lib/net/messages/indexevent' +require 'lib/net/messages/search' +require 'lib/net/messages/searchhits' +require 'lib/net/messages/quit' +require 'lib/net/messages/ping' +require 'lib/config/indexer.rb' +require 'ferret' +require 'lib/log/text' +require 'pp' + +module LogStash; module Net; module Servers + class Parser < LogStash::Net::MessageServer + SYNCDELAY = 10 + + def initialize(configfile) + #def initialize(*args) + # 'super' is not the same as 'super()', and we want super(). + @config = LogStash::Config::IndexerConfig.new(configfile) + super(username="", password="", + host="localhost", port=61613) + #host=@config.stomphost, port=@config.stompport) + @indexes = Hash.new + @lines = Hash.new { |h,k| h[k] = 0 } + @indexcount = 0 + @starttime = Time.now + end + + def QuitRequestHandler(request) + puts "Got quit message, exiting..." + close + end + + def IndexEventRequestHandler(request) + response = LogStash::Net::Messages::IndexEventResponse.new + response.id = request.id + @indexcount += 1 + + if @indexcount % 100 == 0 + duration = (Time.now.to_f - @starttime.to_f) + puts "rate: %.2f/sec" % (@indexcount / duration) + end + + log_type = request.log_type + entry = @config.logs[log_type].parse_entry(request.log_data) + if !entry + response.code = 1 + response.error = "Entry was #{entry.inspect} (log parsing failed)" + entry = { + "@NEEDSPARSING" => 1, + "@LINE" => request.log_data + } + else + response.code = 0 + end + + if not @indexes.member?(log_type) + @indexes[log_type] = @config.logs[log_type].get_index + end + + entry["@LOG_TYPE"] = log_type + + # Now we have a hash for the log data, send it to the indexer + request.log_data = entry + sendmsg("/queue/logstash-index", request) + #@indexes[log_type] << entry + end + + def PingRequestHandler(request) + response = LogStash::Net::Messages::PingResponse.new + response.id = request.id + response.pingdata = request.pingdata + yield response + end + + # Special 'run' override because we want sync to disk once per minute. + def run + subscribe("logstash") + super + end # def run + end # Parser +end; end; end # LogStash::Net::Server