mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
- Split indexer and parsers
This commit is contained in:
parent
a592c6fa1a
commit
9b4e10e757
2 changed files with 90 additions and 22 deletions
|
@ -14,7 +14,7 @@ require 'pp'
|
|||
|
||||
module LogStash; module Net; module Servers
|
||||
class Indexer < LogStash::Net::MessageServer
|
||||
SYNCDELAY = 3
|
||||
SYNCDELAY = 10
|
||||
|
||||
def initialize(configfile)
|
||||
#def initialize(*args)
|
||||
|
@ -45,30 +45,12 @@ module LogStash; module Net; module Servers
|
|||
end
|
||||
|
||||
log_type = request.log_type
|
||||
entry = @config.logs[log_type].parse_entry(request.log_data)
|
||||
#pp entry
|
||||
if !entry
|
||||
response.code = 1
|
||||
response.error = "Entry was #{entry.inspect} (log parsing failed)"
|
||||
entry = {
|
||||
"@NEEDSPARSING" => 1,
|
||||
"@LINE" => request.log_data
|
||||
}
|
||||
else
|
||||
response.code = 0
|
||||
end
|
||||
|
||||
if not @indexes.member?(log_type)
|
||||
@indexes[log_type] = @config.logs[log_type].get_index
|
||||
end
|
||||
|
||||
entry["@LOG_TYPE"] = log_type
|
||||
@indexes[log_type] << entry
|
||||
|
||||
# only dump a response if there was an error.
|
||||
if response.success?
|
||||
yield response
|
||||
end
|
||||
@indexes[log_type] << request.log_data
|
||||
end
|
||||
|
||||
def PingRequestHandler(request)
|
||||
|
@ -170,7 +152,7 @@ module LogStash; module Net; module Servers
|
|||
|
||||
# Special 'run' override because we want sync to disk once per minute.
|
||||
def run
|
||||
subscribe("logstash")
|
||||
subscribe("logstash-index")
|
||||
@syncer = Thread.new { syncer }
|
||||
super
|
||||
end # def run
|
||||
|
@ -181,7 +163,8 @@ module LogStash; module Net; module Servers
|
|||
if Time.now > synctime
|
||||
@indexes.each do |log_type, index|
|
||||
puts "Time's up. Syncing #{log_type}"
|
||||
index.commit
|
||||
index.flush
|
||||
break;
|
||||
end
|
||||
|
||||
synctime = Time.now + SYNCDELAY
|
||||
|
|
85
lib/net/servers/parser.rb
Normal file
85
lib/net/servers/parser.rb
Normal file
|
@ -0,0 +1,85 @@
|
|||
|
||||
require 'rubygems'
|
||||
require 'lib/net/server'
|
||||
require 'lib/net/message'
|
||||
require 'lib/net/messages/indexevent'
|
||||
require 'lib/net/messages/search'
|
||||
require 'lib/net/messages/searchhits'
|
||||
require 'lib/net/messages/quit'
|
||||
require 'lib/net/messages/ping'
|
||||
require 'lib/config/indexer.rb'
|
||||
require 'ferret'
|
||||
require 'lib/log/text'
|
||||
require 'pp'
|
||||
|
||||
module LogStash; module Net; module Servers
  # Parser: message-bus worker that consumes raw log events from the
  # "logstash" queue, parses each event into a structured hash using the
  # per-log-type parsers from the indexer config, and forwards the parsed
  # event to the indexer via the "/queue/logstash-index" queue.
  class Parser < LogStash::Net::MessageServer
    # Seconds between disk syncs; retained for interface compatibility
    # (the sync loop itself lives in the Indexer server).
    SYNCDELAY = 10

    # configfile: path to the indexer configuration file describing the
    # known log types and their parsers.
    def initialize(configfile)
      @config = LogStash::Config::IndexerConfig.new(configfile)
      # 'super' is not the same as 'super()', and we want explicit args here.
      # Positional arguments are username, password, host, port. (The
      # original spelled them `username=""` etc., which only creates
      # throwaway local variables — they are not keyword arguments.)
      super("", "", "localhost", 61613)
      #super("", "", @config.stomphost, @config.stompport)
      @indexes = Hash.new                    # log_type => index handle cache
      @lines = Hash.new { |h, k| h[k] = 0 }  # per-key line counters
      @indexcount = 0                        # events handled since startup
      @starttime = Time.now                  # for the periodic rate report
    end

    # Handle a quit request: log it and shut the server down.
    def QuitRequestHandler(request)
      puts "Got quit message, exiting..."
      close
    end

    # Parse one incoming log event and forward it to the indexer queue.
    # Events that fail to parse are wrapped in a "@NEEDSPARSING" hash so
    # the raw line is still indexed.
    def IndexEventRequestHandler(request)
      response = LogStash::Net::Messages::IndexEventResponse.new
      response.id = request.id
      @indexcount += 1

      # Periodic throughput report, every 100 events.
      if @indexcount % 100 == 0
        duration = (Time.now.to_f - @starttime.to_f)
        puts "rate: %.2f/sec" % (@indexcount / duration)
      end

      log_type = request.log_type
      entry = @config.logs[log_type].parse_entry(request.log_data)
      if entry
        response.code = 0
      else
        response.code = 1
        response.error = "Entry was #{entry.inspect} (log parsing failed)"
        entry = {
          "@NEEDSPARSING" => 1,
          "@LINE" => request.log_data
        }
      end
      # NOTE(review): `response` is populated but never yielded or sent from
      # this handler — confirm whether error responses should be delivered
      # back to the client.

      unless @indexes.member?(log_type)
        @indexes[log_type] = @config.logs[log_type].get_index
      end

      entry["@LOG_TYPE"] = log_type

      # Now we have a hash for the log data, send it to the indexer
      request.log_data = entry
      sendmsg("/queue/logstash-index", request)
    end

    # Echo the ping payload back to the sender.
    def PingRequestHandler(request)
      response = LogStash::Net::Messages::PingResponse.new
      response.id = request.id
      response.pingdata = request.pingdata
      yield response
    end

    # Special 'run' override: subscribe to the raw-event queue before
    # handing control to the MessageServer main loop.
    def run
      subscribe("logstash")
      super
    end # def run
  end # Parser
end; end; end # LogStash::Net::Servers
|
Loading…
Add table
Add a link
Reference in a new issue