Mirror of https://github.com/elastic/logstash.git (synced 2025-04-24 14:47:19 -04:00)
- Start working on outputs/elasticsearch jruby transition. Now uses gem
  'jruby-elasticsearch', which uses the native Java ES API.
This commit is contained in:
parent 3e5441a54b
commit 1e4f70522b

2 changed files with 89 additions and 60 deletions
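For context, this is the jruby-elasticsearch usage pattern the commit moves toward, shown as a minimal sketch assembled only from calls visible in the diff below; the index name, type, and document hash are placeholder values, and the client is assumed to use its default connection settings:

    require "jruby-elasticsearch"

    # Native Java ES client; host/port configuration is still a TODO in this commit.
    client = ElasticSearch::Client.new
    request = client.index("logstash-2011.02.15", "event",   # placeholder index and type
                           { "@message" => "hello world" })  # placeholder document
    request.on(:success) do |response|
      puts "indexed ok"
    end.on(:failure) do |exception|
      puts "index failed: #{exception}"
    end
    request.execute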
@@ -1,8 +1,15 @@
require "em-http-request"
require "logstash/namespace"
require "logstash/outputs/amqp"
require "logstash/outputs/base"

# TODO(sissel): find a better way of declaring where the elasticsearch
# libraries are
Dir["/home/jls/build/elasticsearch-0.15.0//lib/*.jar"].each do |jar|
  require jar
end

require "jruby-elasticsearch"


class LogStash::Outputs::Elasticsearch < LogStash::Outputs::Base

  # http://host/index/type

@@ -15,68 +22,21 @@ class LogStash::Outputs::Elasticsearch < LogStash::Outputs::Base
  public
  def register
    @pending = []
    # Port?
    # Authentication?
    @esurl = @url.clone
    @esurl.scheme = "http"
    @esurl.path = "/" + @url.path.split("/")[1]
    defaults = {"method" => "http"}
    params = defaults.merge(@urlopts)

    # Describe this index to elasticsearch
    indexmap = {
      # The name of the index
      "settings" => {
        @url.path.split("/")[-1] => {
          "mappings" => {
            "@source" => { "type" => "string" },
            "@source_host" => { "type" => "string" },
            "@source_path" => { "type" => "string" },
            "@timestamp" => { "type" => "date" },
            "@tags" => { "type" => "string" },
            "@message" => { "type" => "string" },

            # TODO(sissel): Hack for now until this bug is resolved:
            # https://github.com/elasticsearch/elasticsearch/issues/issue/604
            "@fields" => {
              "type" => "object",
              "properties" => {
                "HOSTNAME" => { "type" => "string" },
              },
            }, # "@fields"
          }, # "properties"
        }, # index map for this index type.
      }, # "settings"
    } # ES Index

    #puts :waiting
    puts @esurl.to_s
    #sleep 10
    indexurl = @esurl.to_s
    indexmap_http = EventMachine::HttpRequest.new(indexurl)
    indexmap_req = indexmap_http.put :body => indexmap.to_json
    indexmap_req.callback do
      @logger.info(["Done configuring index", indexurl, indexmap])
      ready(params)
    end
    indexmap_req.errback do
      @logger.warn(["Failure configuring index (http failed to connect?)",
                    @esurl.to_s, indexmap])
      @logger.warn([indexmap_req])
      #sleep 30
      raise "Failure configuring index: #{@esurl.to_s}"

    end
    @callback = self.method(:receive_native)
    # TODO(sissel): host/port? etc?
    @client = ElasticSearch::Client.new
  end # def register

  # TODO(sissel): Needs migration to jrubyland
  public
  def ready(params)
    case params["method"]
    when "http"
      @logger.debug "ElasticSearch using http with URL #{@url.to_s}"
      @http = EventMachine::HttpRequest.new(@url.to_s)
      #@http = EventMachine::HttpRequest.new(@url.to_s)
      @callback = self.method(:receive_http)
    when "river"
      require "logstash/outputs/amqp"
      params["port"] ||= 5672
      auth = "#{params["user"] or "guest"}:#{params["pass"] or "guest"}"
      mq_url = URI::parse("amqp://#{auth}@#{params["host"]}:#{params["port"]}/queue/#{params["queue"]}?durable=1")
@@ -100,7 +60,7 @@ class LogStash::Outputs::Elasticsearch < LogStash::Outputs::Base
        },
      }
      @logger.debug(["ElasticSearch using river", river_config])
      http_setup = EventMachine::HttpRequest.new(em_url.to_s)
      #http_setup = EventMachine::HttpRequest.new(em_url.to_s)
      req = http_setup.put :body => river_config.to_json
      req.errback do
        @logger.warn "Error setting up river: #{req.response}"
@@ -147,6 +107,22 @@ class LogStash::Outputs::Elasticsearch < LogStash::Outputs::Base
    end
  end # def receive_http

  public
  def receive_native(event)
    index = event.sprintf(@index)
    type = event.sprintf(@type)
    # TODO(sissel): allow specifying the ID?
    # The document ID is how elasticsearch determines sharding hash, so it can
    # help performance if we allow folks to specify a specific ID.
    req = @client.index(index, type, event.to_hash)
    req.on(:success) do |response|
      @logger.info(["Successfully indexed", event.to_hash])
    end.on(:failure) do |exception|
      @logger.error(["Failed to index an event", exception, event.to_hash])
    end
    req.execute
  end # def receive_native

  public
  def receive_river(event)
    # bulk format; see http://www.elasticsearch.com/docs/elasticsearch/river/rabbitmq/
@@ -155,4 +131,52 @@ class LogStash::Outputs::Elasticsearch < LogStash::Outputs::Base
    index_message += event.to_hash.to_json + "\n"
    @mq.receive_raw(index_message)
  end # def receive_river

  private
  def old_create_index
    # TODO(sissel): this is leftover from the old eventmachine days
    # make sure we don't need it, or, convert it.

    # Describe this index to elasticsearch
    indexmap = {
      # The name of the index
      "settings" => {
        @url.path.split("/")[-1] => {
          "mappings" => {
            "@source" => { "type" => "string" },
            "@source_host" => { "type" => "string" },
            "@source_path" => { "type" => "string" },
            "@timestamp" => { "type" => "date" },
            "@tags" => { "type" => "string" },
            "@message" => { "type" => "string" },

            # TODO(sissel): Hack for now until this bug is resolved:
            # https://github.com/elasticsearch/elasticsearch/issues/issue/604
            "@fields" => {
              "type" => "object",
              "properties" => {
                "HOSTNAME" => { "type" => "string" },
              },
            }, # "@fields"
          }, # "properties"
        }, # index map for this index type.
      }, # "settings"
    } # ES Index

    #indexurl = @esurl.to_s
    #indexmap_http = EventMachine::HttpRequest.new(indexurl)
    #indexmap_req = indexmap_http.put :body => indexmap.to_json
    #indexmap_req.callback do
      #@logger.info(["Done configuring index", indexurl, indexmap])
      #ready(params)
    #end
    #indexmap_req.errback do
      #@logger.warn(["Failure configuring index (http failed to connect?)",
                    #@esurl.to_s, indexmap])
      #@logger.warn([indexmap_req])
      ##sleep 30
      #raise "Failure configuring index: #{@esurl.to_s}"
      #
    #end
  end # def old_create_index
end # class LogStash::Outputs::Elasticsearch

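The TODO near the top of this file flags the hard-coded elasticsearch jar path. A hypothetical sketch of making it configurable, assuming an ES_HOME environment variable that this commit does not introduce:

    # Hypothetical: resolve the elasticsearch jars from an assumed ES_HOME
    # environment variable, falling back to the current hard-coded build path.
    es_home = ENV["ES_HOME"] || "/home/jls/build/elasticsearch-0.15.0"
    Dir[File.join(es_home, "lib", "*.jar")].each do |jar|
      require jar
    end
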
@@ -13,22 +13,27 @@ Gem::Specification.new do |spec|
  spec.description = "scalable log and event management (search, archive, pipeline)"
  spec.license = "Apache License (2.0)"

  spec.add_dependency("eventmachine-tail")
  #spec.add_dependency("eventmachine-tail") # TODO(sissel): remove, not for jruby
  spec.add_dependency("json")

  # New for our JRuby stuff
  spec.add_dependency("file-tail")
  spec.add_dependency("jruby-elasticsearch", ">= 0.0.2")

  #spec.add_dependency("awesome_print")

  # For http requests (elasticsearch, etc)
  spec.add_dependency("em-http-request")
  #spec.add_dependency("em-http-request") # TODO(sissel): remove, not for jruby

  # For the 'grok' filter
  #spec.add_dependency("jls-grok", ">= 0.3.3209")

  # TODO: In the future, make these optional
  # for websocket://
  spec.add_dependency("em-websocket")
  #spec.add_dependency("em-websocket") # TODO(sissel): remove, not for jruby

  # For amqp://
  #spec.add_dependency("amqp", "~> 0.6.5") # amqp 0.7.0 is incompatible for now.
  #spec.add_dependency("amqp", "~> 0.6.5") # TODO(sissel): remove, not for jruby
  spec.add_dependency("bunny")
  spec.add_dependency("uuidtools")
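Several gems above are flagged "TODO(sissel): remove, not for jruby" while the JRuby-specific ones are added alongside them. A hypothetical sketch of how the dependency sets could eventually be split per platform (not something this commit does; the version string is a placeholder):

    # Hypothetical: build a separate gem per platform so MRI-only (EventMachine)
    # and JRuby-only dependencies do not end up in the same gem.
    Gem::Specification.new do |spec|
      spec.name    = "logstash"
      spec.version = "0.0.0"   # placeholder
      spec.summary = "scalable log and event management (search, archive, pipeline)"

      # shared dependencies
      spec.add_dependency("json")
      spec.add_dependency("bunny")
      spec.add_dependency("uuidtools")

      if RUBY_PLATFORM == "java"
        spec.platform = "java"
        spec.add_dependency("jruby-elasticsearch", ">= 0.0.2")
        spec.add_dependency("file-tail")
      else
        spec.add_dependency("eventmachine-tail")
        spec.add_dependency("em-http-request")
        spec.add_dependency("em-websocket")
      end
    end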