Merge branch 'pyrho-master'

commit 13aa2b574f
Author: Jordan Sissel
Date:   2012-12-27 22:23:10 -08:00
5 changed files with 33 additions and 21 deletions

CHANGELOG

@@ -23,8 +23,8 @@
 - new: anonymize: supports many hash mechanisms (murmur3, sha, md5, etc) as
   well as IP address anonymization (#280, #261; patches by Richard Pijnenburg
   and Avishai Ish-Shalom)
-- filter: date: now accepts 'match' as a setting. Use of this is preferable
-  to the old syntax.
+- feature: date: now accepts 'match' as a setting. Use of this is preferable
+  to the old syntax. (#248, LOGSTASH-734, Patch by Louis Zuckerman)
 - improvement: grok: now accepts (?<foo>...) named captures. This lets you
   compose a pattern in the grok config without needing to define it in a
   patterns file. Example: (?<hostport>%{HOST}:%{POSINT}) to capture 'hostport'
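For reference, a minimal filter config exercising both changelog items above. This is a sketch only: the 'logdate' field and its date format are hypothetical.

    filter {
      grok {
        # Inline named capture; no patterns file entry required.
        pattern => "(?<hostport>%{HOST}:%{POSINT})"
      }
      date {
        # The new 'match' syntax: the field to parse, then its format(s).
        match => [ "logdate", "dd/MMM/yyyy:HH:mm:ss Z" ]
      }
    }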

lib/logstash/filters/mutate.rb

@@ -207,12 +207,14 @@ class LogStash::Filters::Mutate < LogStash::Filters::Base
   def replace(event)
     # TODO(sissel): use event.sprintf on the field names?
     @replace.each do |field, newvalue|
+      next unless event.include?(field)
       event[field] = event.sprintf(newvalue)
     end
   end # def replace

   def convert(event)
     @convert.each do |field, type|
+      next unless event.include?(field)
       original = event[field]
       # calls convert_{string,integer,float} depending on type requested.
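The two `next unless event.include?(field)` guards above make mutate skip fields an event does not carry, instead of creating or converting nil values. A hedged usage sketch with hypothetical field names:

    filter {
      mutate {
        # Both operations now apply only to events that actually carry
        # these fields; events without them pass through untouched.
        replace => [ "status", "%{status} (checked)" ]
        convert => [ "bytes", "integer" ]
      }
    }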

lib/logstash/outputs/elasticsearch.rb

@@ -37,8 +37,9 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
   # similar events to the same 'type'. String expansion '%{foo}' works here.
   config :index_type, :validate => :string, :default => "%{@type}"

-  # The document ID for the index. Overwrites any existing entry in elasticsearch with the same ID.
-  config :id, :validate => :string, :default => nil
+  # The document ID for the index. Useful for overwriting existing entries in
+  # elasticsearch with the same ID.
+  config :document_id, :validate => :string, :default => nil

   # The name of your cluster if you set it on the ElasticSearch side. Useful
   # for discovery.
@@ -163,11 +164,11 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
       end
     end

-    if id.nil?
+    if @document_id.nil?
       req = @client.index(index, type, event.to_hash)
     else
-      id = event.sprintf(@id)
+      id = event.sprintf(@document_id)
       req = @client.index(index, type, id, event.to_hash)
     end

     increment_inflight_request_count
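A minimal usage sketch for the renamed setting; '%{request_id}' stands in for whatever hypothetical event field uniquely identifies a document:

    output {
      elasticsearch {
        host => "localhost"
        # Re-shipping an event with the same request_id overwrites the
        # existing document rather than indexing a duplicate.
        document_id => "%{request_id}"
      }
    }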

lib/logstash/outputs/elasticsearch_http.rb

@@ -40,6 +40,10 @@ class LogStash::Outputs::ElasticSearchHTTP < LogStash::Outputs::Base
   # be used.
   config :flush_size, :validate => :number, :default => 100

+  # The document ID for the index. Useful for overwriting existing entries in
+  # elasticsearch with the same ID.
+  config :document_id, :validate => :string, :default => nil
+
   public
   def register
     require "ftw" # gem ftw
@@ -84,9 +88,12 @@ class LogStash::Outputs::ElasticSearchHTTP < LogStash::Outputs::Base
   end # def receive_single

   def receive_bulk(event, index, type)
+    header = { "index" => { "_index" => index, "_type" => type } }
+    unless @document_id.nil?
+      header["index"]["_id"] = event.sprintf(@document_id)
+    end
     @queue << [
-      { "index" => { "_index" => index, "_type" => type } }.to_json,
-      event.to_json
+      header.to_json, event.to_json
     ].join("\n")

     # Keep trying to flush while the queue is full.
@@ -98,6 +105,10 @@ class LogStash::Outputs::ElasticSearchHTTP < LogStash::Outputs::Base
     puts "Flushing #{@queue.count} events"
     # If we don't tack a trailing newline at the end, elasticsearch
     # doesn't seem to process the last event in this bulk index call.
+    #
+    # as documented here:
+    # http://www.elasticsearch.org/guide/reference/api/bulk.html
+    # "NOTE: the final line of data must end with a newline character \n."
     response = @agent.post!("http://#{@host}:#{@port}/_bulk",
                             :body => @queue.join("\n") + "\n")

lib/logstash/outputs/elasticsearch_river.rb

@@ -80,6 +80,10 @@ class LogStash::Outputs::ElasticSearchRiver < LogStash::Outputs::Base
   # AMQP persistence setting
   config :persistent, :validate => :boolean, :default => true

+  # The document ID for the index. Useful for overwriting existing entries in
+  # elasticsearch with the same ID.
+  config :document_id, :validate => :string, :default => nil
+
   public
   def register
@@ -199,20 +203,14 @@ class LogStash::Outputs::ElasticSearchRiver < LogStash::Outputs::Base
   public
   def receive(event)
     return unless output?(event)

+    # TODO(sissel): Refactor this to not use so much string concatenation.
     # River events have a format of
     #   "action\ndata\n"
     # where 'action' is index or delete, data is the data to index.
-    index_message = {
-      "index" => {
-        "_index" => event.sprintf(@index),
-        "_type" => event.sprintf(@index_type)
-      }
-    }.to_json + "\n"
-    index_message += event.to_json + "\n"
-    @mq.receive_raw(index_message)
+    header = { "index" => { "_index" => event.sprintf(@index), "_type" => event.sprintf(@index_type) } }
+    unless @document_id.nil?
+      header["index"]["_id"] = event.sprintf(@document_id)
+    end
+    @mq.receive_raw(header.to_json + "\n" + event.to_json + "\n")
   end # def receive
 end # LogStash::Outputs::ElasticSearchRiver
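On the bulk and river paths the configured ID lands in the "action" half of each "action\ndata\n" pair. A sketch with hypothetical values:

    output {
      elasticsearch_river {
        document_id => "%{request_id}"
        # A single event is then framed roughly as:
        #   {"index":{"_index":"logstash-2012.12.27","_type":"logs","_id":"abc123"}}
        #   {"@timestamp":"...","@message":"..."}
      }
    }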