Merge pull request #389 from zimride/improve-mongodb-output

Support mongoDb replica sets and local mongo _id
This commit is contained in:
Jordan Sissel 2013-05-28 23:53:49 -07:00
commit 4f2316c9d8

View file

@ -6,18 +6,13 @@ class LogStash::Outputs::Mongodb < LogStash::Outputs::Base
config_name "mongodb"
plugin_status "beta"
# your mongodb host
config :host, :validate => :string, :required => true
# the mongodb port
config :port, :validate => :number, :default => 27017
# a MongoDB URI to connect to
# See http://docs.mongodb.org/manual/reference/connection-string/
config :uri, :validate => :string, :required => true
# The database to use
config :database, :validate => :string, :required => true
config :user, :validate => :string, :required => false
config :password, :validate => :password, :required => false
# The collection to use. This value can use %{foo} values to dynamically
# select a collection based on data in the event.
config :collection, :validate => :string, :required => true
@ -30,19 +25,23 @@ class LogStash::Outputs::Mongodb < LogStash::Outputs::Base
# Number of seconds to wait after failure before retrying
config :retry_delay, :validate => :number, :default => 3, :required => false
# If true, a _id field will be added to the document before insertion.
# The _id field will use the timestamp of the event and overwrite an existing
# _id field in the event.
config :generateId, :validate => :boolean, :default => false
public
def register
require "mongo"
# TODO(petef): check for errors
db = Mongo::Connection.new(@host, @port).db(@database)
auth = true
if @user then
auth = db.authenticate(@user, @password.value) if @user
uriParsed=Mongo::URIParser.new(@uri)
conn = uriParsed.connection({})
if uriParsed.auths.length > 0
uriParsed.auths.each do |auth|
conn.add_auth(auth['db_name'], auth['username'], auth['password'])
end
conn.apply_saved_authentication()
end
if not auth then
raise RuntimeError, "MongoDB authentication failure"
end
@mongodb = db
@db = conn.db(@database)
end # def register
public
@ -54,15 +53,26 @@ class LogStash::Outputs::Mongodb < LogStash::Outputs::Base
# the mongodb driver wants time values as a ruby Time object.
# set the @timestamp value of the document to a ruby Time object, then.
document = event.to_hash.merge("@timestamp" => event.ruby_timestamp)
@mongodb.collection(event.sprintf(@collection)).insert(document)
else
@mongodb.collection(event.sprintf(@collection)).insert(event.to_hash)
document = event.to_hash
end
if @generateId
document['_id'] = BSON::ObjectId.new(nil, event.ruby_timestamp)
end
@db.collection(event.sprintf(@collection)).insert(document)
rescue => e
@logger.warn("Failed to send event to MongoDB", :event => event, :exception => e,
:backtrace => e.backtrace)
sleep @retry_delay
retry
if e.error_code == 11000
# On a duplicate key error, skip the insert.
# We could check if the duplicate key err is the _id key
# and generate a new primary key.
# If the duplicate key error is on another field, we have no way
# to fix the issue.
else
sleep @retry_delay
retry
end
end
end # def receive
end # class LogStash::Outputs::Mongodb