- use elasticsearch instead of an indexer

This commit is contained in:
Jordan Sissel 2010-03-04 06:33:59 +00:00
parent d84f742554
commit e020e8487e

View file

@ -1,6 +1,9 @@
require 'rubygems'
require 'ferret'
require 'net/http'
#require 'net/http'
#require 'curb'
require 'em-http'
require 'lib/config/indexer.rb'
require 'lib/log/text'
require 'lib/net/messages/indexevent'
@ -10,6 +13,35 @@ require 'lib/net/messages/searchhits'
require 'mqrpc'
require 'pp'
class EventMachine::Protocols::HttpClient2::Request
def __send_request
az = @args[:authorization] and az = "Authorization: #{az}\r\n"
is_post = (@args[:post_body] != nil)
r = [
"#{@args[:verb]} #{@args[:uri]} HTTP/#{@args[:version] || "1.1"}",
"Host: #{@args[:host_header] || "_"}",
]
r << az if az
if is_post
r << "Content-Length: #{@args[:post_body].length}"
r << "Content-Type: application/json"
end
# end of request
r << ""
# Put post body
if is_post
r << @args[:post_body]
end
@conn.send_data r.join("\r\n")
end
end
module LogStash; module Net; module Servers
class Parser < MQRPC::Agent
handle LogStash::Net::Messages::IndexEventRequest, :IndexEventRequestHandler
@ -24,6 +56,9 @@ module LogStash; module Net; module Servers
@lines = Hash.new { |h,k| h[k] = 0 }
@indexcount = 0
@starttime = Time.now
#@elasticsearchconn = EventMachine::Protocols::HttpClient2.connect("127.0.0.1", 9200)
@elasticsearchconn = \
EventMachine::HttpRequest.new("http://127.0.0.1:9200/log/stash")
end
def IndexEventRequestHandler(request)
@ -66,13 +101,30 @@ module LogStash; module Net; module Servers
# Now we have a hash for the log data, send it to the indexer
#request.log_data = entry
#
start = Time.now
req = @elasticsearchconn.post({
#:uri => "/log/stash",
#:post_body => entry.to_json,
:body => entry.to_json,
})
req.callback do |response|
#@logger.debug "Duration f" + response.response_header.status
@logger.debug "Duration: #{Time.now - start}"
end
# Push our message onto the queue
#@indexerqueue << request
req = ::Net::HTTP::Post.new("/log/stash", initheader = {'Content-Type' =>'application/json'})
req.body = entry.to_json
response = ::Net::HTTP.new("localhost", 9200).start {|http| http.request(req) }
@logger.debug "Response #{response.code} #{response.message}"
#curl = Curl::Easy.new("http://localhost:9200/log/stash")
#curl.headers["Content-Type"] = "application/json"
#curl.post_body = entry.to_json
#s = Time.now
#@logger.debug "Starting index request"
#curl.perform
#@logger.debug "Response: #{curl.response_code} #{curl.body_str}"
#@logger.debug "Duration: #{Time.now - s}"
#@logger.debug "Duration: " + curl.methods.grep(/_time$/).sort.collect { |a| [a, curl.send(a)] }.join(", ")
end
def IndexEventResponseHandler(response)