diff --git a/lib/logstash/outputs/mongodb.rb b/lib/logstash/outputs/mongodb.rb index d53ddd866..43183ba51 100644 --- a/lib/logstash/outputs/mongodb.rb +++ b/lib/logstash/outputs/mongodb.rb @@ -6,18 +6,13 @@ class LogStash::Outputs::Mongodb < LogStash::Outputs::Base config_name "mongodb" plugin_status "beta" - # your mongodb host - config :host, :validate => :string, :required => true - - # the mongodb port - config :port, :validate => :number, :default => 27017 - + # a MongoDB URI to connect to + # See http://docs.mongodb.org/manual/reference/connection-string/ + config :uri, :validate => :string, :required => true + # The database to use config :database, :validate => :string, :required => true - - config :user, :validate => :string, :required => false - config :password, :validate => :password, :required => false - + # The collection to use. This value can use %{foo} values to dynamically # select a collection based on data in the event. config :collection, :validate => :string, :required => true @@ -30,19 +25,23 @@ class LogStash::Outputs::Mongodb < LogStash::Outputs::Base # Number of seconds to wait after failure before retrying config :retry_delay, :validate => :number, :default => 3, :required => false + # If true, a _id field will be added to the document before insertion. + # The _id field will use the timestamp of the event and overwrite an existing + # _id field in the event. + config :generateId, :validate => :boolean, :default => false + public def register require "mongo" - # TODO(petef): check for errors - db = Mongo::Connection.new(@host, @port).db(@database) - auth = true - if @user then - auth = db.authenticate(@user, @password.value) if @user + uriParsed=Mongo::URIParser.new(@uri) + conn = uriParsed.connection({}) + if uriParsed.auths.length > 0 + uriParsed.auths.each do |auth| + conn.add_auth(auth['db_name'], auth['username'], auth['password']) + end + conn.apply_saved_authentication() end - if not auth then - raise RuntimeError, "MongoDB authentication failure" - end - @mongodb = db + @db = conn.db(@database) end # def register public @@ -54,15 +53,26 @@ class LogStash::Outputs::Mongodb < LogStash::Outputs::Base # the mongodb driver wants time values as a ruby Time object. # set the @timestamp value of the document to a ruby Time object, then. document = event.to_hash.merge("@timestamp" => event.ruby_timestamp) - @mongodb.collection(event.sprintf(@collection)).insert(document) else - @mongodb.collection(event.sprintf(@collection)).insert(event.to_hash) + document = event.to_hash end + if @generateId + document['_id'] = BSON::ObjectId.new(nil, event.ruby_timestamp) + end + @db.collection(event.sprintf(@collection)).insert(document) rescue => e @logger.warn("Failed to send event to MongoDB", :event => event, :exception => e, :backtrace => e.backtrace) - sleep @retry_delay - retry + if e.error_code == 11000 + # On a duplicate key error, skip the insert. + # We could check if the duplicate key err is the _id key + # and generate a new primary key. + # If the duplicate key error is on another field, we have no way + # to fix the issue. + else + sleep @retry_delay + retry + end end end # def receive end # class LogStash::Outputs::Mongodb