- Move to use ElasticSearch's REST api for indexing instead of our own custom mqrpc+ferret instance.

This commit is contained in:
Jordan Sissel 2010-02-19 07:47:10 +00:00
parent 44d8eb4143
commit 056a1d6920

View file

@ -1,5 +1,6 @@
require 'rubygems'
require 'ferret'
require 'net/http'
require 'lib/config/indexer.rb'
require 'lib/log/text'
require 'lib/net/messages/indexevent'
@ -11,7 +12,8 @@ require 'pp'
module LogStash; module Net; module Servers
class Parser < MQRPC::Agent
SYNCDELAY = 10
handle LogStash::Net::Messages::IndexEventRequest, :IndexEventRequestHandler
handle LogStash::Net::Messages::IndexEventResponse, :IndexEventResponseHandler
def initialize(configfile, logger)
@config = LogStash::Config::IndexerConfig.new(configfile)
@ -22,25 +24,6 @@ module LogStash; module Net; module Servers
@lines = Hash.new { |h,k| h[k] = 0 }
@indexcount = 0
@starttime = Time.now
#@indexerqueue = SizedQueue.new(MQRPC::Agent::MAXMESSAGEWAIT)
@indexerqueue = Queue.new
start_indexer_forwarder
end
def start_indexer_forwarder
Thread.new do
# TODO(sissel): If the queue exceeds a certain size,
# we should unsubscribe from the 'logstash' queue to indicate
# we cannot handle any more capacity at this time. Once the
# queue shrinks, we can re-subscribe.
while true do
request = @indexerqueue.pop
sendmsg("logstash-index", request)
end
end
end
def IndexEventRequestHandler(request)
@ -79,13 +62,17 @@ module LogStash; module Net; module Servers
entry["@LOG_TYPE"] = log_type
# Make a new ID for this request before we forward it.
request.generate_id!
#request.generate_id!
# Now we have a hash for the log data, send it to the indexer
request.log_data = entry
#request.log_data = entry
# Push our message onto the queue
@indexerqueue << request
#@indexerqueue << request
req = ::Net::HTTP::Post.new("/log/stash", initheader = {'Content-Type' =>'application/json'})
req.body = entry.to_json
response = ::Net::HTTP.new("localhost", 9200).start {|http| http.request(req) }
@logger.debug "Response #{response.code} #{response.message}"
end
def IndexEventResponseHandler(response)