From 84d938ebdc308555d5d7bfcfab2036cabb5c2b62 Mon Sep 17 00:00:00 2001 From: Colin Surprenant Date: Mon, 26 May 2014 17:21:13 -0400 Subject: [PATCH] replace json parsers with JrJackson and Oj refactored timestamps with new Timestamp class closes #1434 --- lib/logstash/codecs/edn.rb | 14 ++- lib/logstash/codecs/edn_lines.rb | 12 +- lib/logstash/codecs/fluent.rb | 14 ++- lib/logstash/codecs/graphite.rb | 6 +- lib/logstash/codecs/json.rb | 16 +-- lib/logstash/codecs/json_lines.rb | 12 +- lib/logstash/codecs/json_spooler.rb | 7 +- lib/logstash/codecs/line.rb | 8 +- lib/logstash/codecs/msgpack.rb | 11 +- lib/logstash/codecs/multiline.rb | 9 +- lib/logstash/codecs/netflow.rb | 5 +- lib/logstash/codecs/noop.rb | 6 +- lib/logstash/codecs/oldlogstashjson.rb | 15 +-- lib/logstash/codecs/plain.rb | 8 +- lib/logstash/codecs/rubydebug.rb | 4 +- lib/logstash/codecs/spool.rb | 8 +- lib/logstash/event.rb | 95 +++++++------- lib/logstash/filters/date.rb | 16 +-- lib/logstash/filters/json.rb | 22 ++-- lib/logstash/filters/metrics.rb | 2 +- lib/logstash/filters/multiline.rb | 2 +- lib/logstash/filters/sleep.rb | 6 +- lib/logstash/inputs/collectd.rb | 3 +- lib/logstash/inputs/elasticsearch.rb | 57 ++++----- lib/logstash/inputs/eventlog.rb | 11 +- lib/logstash/inputs/gelf.rb | 6 +- lib/logstash/inputs/graphite.rb | 5 +- lib/logstash/inputs/imap.rb | 4 +- lib/logstash/inputs/sqs.rb | 7 +- lib/logstash/inputs/twitter.rb | 11 +- lib/logstash/java_integration.rb | 41 +++++++ lib/logstash/json.rb | 53 ++++++++ lib/logstash/outputs/csv.rb | 14 +-- lib/logstash/outputs/elasticsearch.rb | 3 +- .../outputs/elasticsearch/protocol.rb | 12 +- lib/logstash/outputs/elasticsearch_http.rb | 5 +- lib/logstash/outputs/elasticsearch_river.rb | 8 +- lib/logstash/outputs/gelf.rb | 2 +- lib/logstash/outputs/http.rb | 7 +- lib/logstash/outputs/juggernaut.rb | 3 +- lib/logstash/outputs/pagerduty.rb | 9 +- lib/logstash/outputs/rabbitmq/bunny.rb | 5 +- lib/logstash/outputs/s3.rb | 116 +++++++++--------- lib/logstash/outputs/sns.rb | 2 +- lib/logstash/plugin.rb | 2 + lib/logstash/time_addon.rb | 25 ---- lib/logstash/timestamp.rb | 92 ++++++++++++++ lib/logstash/util.rb | 42 ++++++- logstash.gemspec | 3 +- spec/codecs/edn.rb | 29 ++++- spec/codecs/edn_lines.rb | 32 ++++- spec/codecs/graphite.rb | 14 +-- spec/codecs/json.rb | 9 +- spec/codecs/json_lines.rb | 15 +-- spec/codecs/json_spooler.rb | 74 +++++------ spec/codecs/msgpack.rb | 32 +++-- spec/codecs/oldlogstashjson.rb | 19 +-- spec/event.rb | 47 +++++++ spec/examples/graphite-input.rb | 2 +- spec/examples/parse-apache-logs.rb | 2 +- spec/filters/date.rb | 36 +++--- spec/filters/json.rb | 9 +- spec/inputs/elasticsearch.rb | 80 ++++++++++++ spec/inputs/gelf.rb | 44 +++---- spec/inputs/tcp.rb | 7 +- spec/json.rb | 94 ++++++++++++++ spec/outputs/elasticsearch.rb | 21 ++-- spec/outputs/elasticsearch_http.rb | 13 +- spec/outputs/file.rb | 5 +- spec/outputs/redis.rb | 5 +- spec/support/date-http.rb | 2 +- spec/test_utils.rb | 9 +- spec/timestamp.rb | 35 ++++++ tools/Gemfile.jruby-1.9.lock | 77 ++++++------ 74 files changed, 1053 insertions(+), 505 deletions(-) create mode 100644 lib/logstash/java_integration.rb create mode 100644 lib/logstash/json.rb delete mode 100644 lib/logstash/time_addon.rb create mode 100644 lib/logstash/timestamp.rb create mode 100644 spec/inputs/elasticsearch.rb create mode 100644 spec/json.rb create mode 100644 spec/timestamp.rb diff --git a/lib/logstash/codecs/edn.rb b/lib/logstash/codecs/edn.rb index f5686db0b..449b7cec4 100644 --- a/lib/logstash/codecs/edn.rb 
+++ b/lib/logstash/codecs/edn.rb
@@ -1,5 +1,6 @@
 require "logstash/codecs/base"
 require "logstash/codecs/line"
+require "logstash/util"
 
 class LogStash::Codecs::EDN < LogStash::Codecs::Base
   config_name "edn"
@@ -14,15 +15,20 @@ class LogStash::Codecs::EDN < LogStash::Codecs::Base
   def decode(data)
     begin
       yield LogStash::Event.new(EDN.read(data))
-    rescue
-      @logger.info("EDN parse failure. Falling back to plain-text", :error => e, :data => data)
+    rescue => e
+      @logger.warn("EDN parse failure. Falling back to plain-text", :error => e, :data => data)
       yield LogStash::Event.new("message" => data)
     end
   end
 
   public
-  def encode(data)
-    @on_event.call(data.to_hash.to_edn)
+  def encode(event)
+    # use normalize to make sure returned Hash is pure Ruby for
+    # #to_edn which relies on pure Ruby object recognition
+    data = LogStash::Util.normalize(event.to_hash)
+    # timestamp is serialized as an iso8601 string
+    # merge to avoid modifying data which could have side effects if multiple outputs
+    @on_event.call(data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601).to_edn)
   end
 end
diff --git a/lib/logstash/codecs/edn_lines.rb b/lib/logstash/codecs/edn_lines.rb
index 8b6b490c2..3c4a0a38b 100644
--- a/lib/logstash/codecs/edn_lines.rb
+++ b/lib/logstash/codecs/edn_lines.rb
@@ -1,5 +1,6 @@
 require "logstash/codecs/base"
 require "logstash/codecs/line"
+require "logstash/util"
 
 class LogStash::Codecs::EDNLines < LogStash::Codecs::Base
   config_name "edn_lines"
@@ -22,15 +23,20 @@ class LogStash::Codecs::EDNLines < LogStash::Codecs::Base
       begin
         yield LogStash::Event.new(EDN.read(event["message"]))
       rescue => e
-        @logger.info("EDN parse failure. Falling back to plain-text", :error => e, :data => data)
+        @logger.warn("EDN parse failure. Falling back to plain-text", :error => e, :data => data)
         yield LogStash::Event.new("message" => data)
       end
     end
   end
 
   public
-  def encode(data)
-    @on_event.call(data.to_hash.to_edn + "\n")
+  def encode(event)
+    # use normalize to make sure returned Hash is pure Ruby for
+    # #to_edn which relies on pure Ruby object recognition
+    data = LogStash::Util.normalize(event.to_hash)
+    # timestamp is serialized as an iso8601 string
+    # merge to avoid modifying data which could have side effects if multiple outputs
+    @on_event.call(data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601).to_edn + NL)
   end
 end
diff --git a/lib/logstash/codecs/fluent.rb b/lib/logstash/codecs/fluent.rb
index d1e6acd33..cbcaf3ebe 100644
--- a/lib/logstash/codecs/fluent.rb
+++ b/lib/logstash/codecs/fluent.rb
@@ -1,6 +1,8 @@
 # encoding: utf-8
 require "logstash/codecs/base"
 require "logstash/util/charset"
+require "logstash/timestamp"
+require "logstash/util"
 
 # This codec handles fluentd's msgpack schema.
 #
@@ -38,7 +40,7 @@ class LogStash::Codecs::Fluent < LogStash::Codecs::Base
     @decoder.feed(data)
     @decoder.each do |tag, epochtime, map|
       event = LogStash::Event.new(map.merge(
-        "@timestamp" => Time.at(epochtime),
+        LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime),
         "tags" => tag
       ))
       yield event
@@ -48,8 +50,14 @@ class LogStash::Codecs::Fluent < LogStash::Codecs::Base
   public
   def encode(event)
     tag = event["tags"] || "log"
-    epochtime = event["@timestamp"].to_i
-    @on_event.call(MessagePack.pack([ tag, epochtime, event.to_hash ]))
+    epochtime = event.timestamp.to_i
+
+    # use normalize to make sure returned Hash is pure Ruby for
+    # MessagePack#pack which relies on pure Ruby object recognition
+    data = LogStash::Util.normalize(event.to_hash)
+    # timestamp is serialized as an iso8601 string
+    # merge to avoid modifying data which could have side effects if multiple outputs
+    @on_event.call(MessagePack.pack([tag, epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)]))
   end # def encode
 end # class LogStash::Codecs::Fluent
diff --git a/lib/logstash/codecs/graphite.rb b/lib/logstash/codecs/graphite.rb
index e3510f3b6..4471df9aa 100644
--- a/lib/logstash/codecs/graphite.rb
+++ b/lib/logstash/codecs/graphite.rb
@@ -1,7 +1,7 @@
 # encoding: utf-8
 require "logstash/codecs/base"
 require "logstash/codecs/line"
-require "json"
+require "logstash/timestamp"
 
 # This codec will encode and decode Graphite formated lines.
 class LogStash::Codecs::Graphite < LogStash::Codecs::Base
@@ -52,7 +52,7 @@ class LogStash::Codecs::Graphite < LogStash::Codecs::Base
   def decode(data)
     @lines.decode(data) do |event|
       name, value, time = event["message"].split(" ")
-      yield LogStash::Event.new(name => value.to_f, "@timestamp" => Time.at(time.to_i).gmtime)
+      yield LogStash::Event.new(name => value.to_f, LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(time.to_i))
     end # @lines.decode
   end # def decode
@@ -93,7 +93,7 @@ class LogStash::Codecs::Graphite < LogStash::Codecs::Base
     if messages.empty?
       @logger.debug("Message is empty, not emiting anything.", :messages => messages)
     else
-      message = messages.join("\n") + "\n"
+      message = messages.join(NL) + NL
       @logger.debug("Emiting carbon messages", :messages => messages)
       @on_event.call(message)
diff --git a/lib/logstash/codecs/json.rb b/lib/logstash/codecs/json.rb
index 718498cad..1ba0163f8 100644
--- a/lib/logstash/codecs/json.rb
+++ b/lib/logstash/codecs/json.rb
@@ -1,9 +1,9 @@
 # encoding: utf-8
 require "logstash/codecs/base"
-require "logstash/codecs/line"
-require "json"
+require "logstash/util/charset"
+require "logstash/json"
 
-# This codec may be used to decode (via inputs) and encode (via outputs) 
+# This codec may be used to decode (via inputs) and encode (via outputs)
 # full JSON messages. If you are streaming JSON messages delimited
 # by '\n' then see the `json_lines` codec.
 # Encoding will result in a single JSON string.
@@ -28,21 +28,21 @@ class LogStash::Codecs::JSON < LogStash::Codecs::Base
     @converter = LogStash::Util::Charset.new(@charset)
     @converter.logger = @logger
   end
-  
+
   public
   def decode(data)
     data = @converter.convert(data)
     begin
-      yield LogStash::Event.new(JSON.parse(data))
-    rescue JSON::ParserError => e
+      yield LogStash::Event.new(LogStash::Json.load(data))
+    rescue LogStash::Json::ParserError => e
      @logger.info("JSON parse failure. 
Falling back to plain-text", :error => e, :data => data) yield LogStash::Event.new("message" => data) end end # def decode public - def encode(data) - @on_event.call(data.to_json) + def encode(event) + @on_event.call(event.to_json) end # def encode end # class LogStash::Codecs::JSON diff --git a/lib/logstash/codecs/json_lines.rb b/lib/logstash/codecs/json_lines.rb index e3dac771d..f319340f7 100644 --- a/lib/logstash/codecs/json_lines.rb +++ b/lib/logstash/codecs/json_lines.rb @@ -1,7 +1,7 @@ # encoding: utf-8 require "logstash/codecs/base" require "logstash/codecs/line" -require "json" +require "logstash/json" # This codec will decode streamed JSON that is newline delimited. # For decoding line-oriented JSON payload in the redis or file inputs, @@ -29,14 +29,14 @@ class LogStash::Codecs::JSONLines < LogStash::Codecs::Base @lines = LogStash::Codecs::Line.new @lines.charset = @charset end - + public def decode(data) @lines.decode(data) do |event| begin - yield LogStash::Event.new(JSON.parse(event["message"])) - rescue JSON::ParserError => e + yield LogStash::Event.new(LogStash::Json.load(event["message"])) + rescue LogStash::Json::ParserError => e @logger.info("JSON parse failure. Falling back to plain-text", :error => e, :data => data) yield LogStash::Event.new("message" => event["message"]) end @@ -44,10 +44,10 @@ class LogStash::Codecs::JSONLines < LogStash::Codecs::Base end # def decode public - def encode(data) + def encode(event) # Tack on a \n for now because previously most of logstash's JSON # outputs emitted one per line, and whitespace is OK in json. - @on_event.call(data.to_json + "\n") + @on_event.call(event.to_json + NL) end # def encode end # class LogStash::Codecs::JSON diff --git a/lib/logstash/codecs/json_spooler.rb b/lib/logstash/codecs/json_spooler.rb index a143971ee..2dc512b2f 100644 --- a/lib/logstash/codecs/json_spooler.rb +++ b/lib/logstash/codecs/json_spooler.rb @@ -1,6 +1,7 @@ # encoding: utf-8 require "logstash/codecs/base" require "logstash/codecs/spool" +require "logstash/json" # This is the base class for logstash codecs. class LogStash::Codecs::JsonSpooler < LogStash::Codecs::Spool @@ -14,14 +15,14 @@ class LogStash::Codecs::JsonSpooler < LogStash::Codecs::Spool public def decode(data) - super(JSON.parse(data.force_encoding(Encoding::UTF_8))) do |event| + super(LogStash::Json.load(data.force_encoding(Encoding::UTF_8))) do |event| yield event end end # def decode public - def encode(data) - super(data) + def encode(event) + super(event) end # def encode end # class LogStash::Codecs::Json diff --git a/lib/logstash/codecs/line.rb b/lib/logstash/codecs/line.rb index 21ae47a89..f673708a5 100644 --- a/lib/logstash/codecs/line.rb +++ b/lib/logstash/codecs/line.rb @@ -30,7 +30,7 @@ class LogStash::Codecs::Line < LogStash::Codecs::Base @converter = LogStash::Util::Charset.new(@charset) @converter.logger = @logger end - + public def decode(data) @buffer.extract(data).each do |line| @@ -47,11 +47,11 @@ class LogStash::Codecs::Line < LogStash::Codecs::Base end public - def encode(data) + def encode(event) if data.is_a? 
LogStash::Event and @format
-      @on_event.call(data.sprintf(@format) + "\n")
+      @on_event.call(event.sprintf(@format) + NL)
     else
-      @on_event.call(data.to_s + "\n")
+      @on_event.call(event.to_s + NL)
     end
   end # def encode
 
diff --git a/lib/logstash/codecs/msgpack.rb b/lib/logstash/codecs/msgpack.rb
index 05dedf449..b2f45e28a 100644
--- a/lib/logstash/codecs/msgpack.rb
+++ b/lib/logstash/codecs/msgpack.rb
@@ -1,5 +1,7 @@
 # encoding: utf-8
 require "logstash/codecs/base"
+require "logstash/timestamp"
+require "logstash/util"
 
 class LogStash::Codecs::Msgpack < LogStash::Codecs::Base
   config_name "msgpack"
@@ -18,7 +20,6 @@ class LogStash::Codecs::Msgpack < LogStash::Codecs::Base
     begin
       # Msgpack does not care about UTF-8
       event = LogStash::Event.new(MessagePack.unpack(data))
-      event["@timestamp"] = Time.at(event["@timestamp"]).utc if event["@timestamp"].is_a? Float
       event["tags"] ||= []
       if @format
         event["message"] ||= event.sprintf(@format)
@@ -36,8 +37,12 @@ class LogStash::Codecs::Msgpack < LogStash::Codecs::Base
 
   public
   def encode(event)
-    event["@timestamp"] = event["@timestamp"].to_f
-    @on_event.call event.to_hash.to_msgpack
+    # use normalize to make sure returned Hash is pure Ruby for
+    # MessagePack#pack which relies on pure Ruby object recognition
+    data = LogStash::Util.normalize(event.to_hash)
+    # timestamp is serialized as an iso8601 string
+    # merge to avoid modifying data which could have side effects if multiple outputs
+    @on_event.call(MessagePack.pack(data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)))
   end # def encode
 end # class LogStash::Codecs::Msgpack
diff --git a/lib/logstash/codecs/multiline.rb b/lib/logstash/codecs/multiline.rb
index 1bbdecb0d..4815ffa96 100644
--- a/lib/logstash/codecs/multiline.rb
+++ b/lib/logstash/codecs/multiline.rb
@@ -1,6 +1,7 @@
 # encoding: utf-8
 require "logstash/codecs/base"
 require "logstash/util/charset"
+require "logstash/timestamp"
 
 # The multiline codec will collapse multiline messages and merge them into a
 # single event.
@@ -159,13 +160,13 @@ class LogStash::Codecs::Multiline < LogStash::Codecs::Base
   end # def decode
 
   def buffer(text)
-    @time = Time.now.utc if @buffer.empty?
+    @time = LogStash::Timestamp.now if @buffer.empty?
     @buffer << text
   end
 
   def flush(&block)
     if @buffer.any?
-      event = LogStash::Event.new("@timestamp" => @time, "message" => @buffer.join("\n"))
+      event = LogStash::Event.new(LogStash::Event::TIMESTAMP => @time, "message" => @buffer.join(NL))
 
       # Tag multiline events
       event.tag @multiline_tag if @multiline_tag && @buffer.size > 1
@@ -185,9 +186,9 @@ class LogStash::Codecs::Multiline < LogStash::Codecs::Base
   end
 
   public
-  def encode(data)
+  def encode(event)
     # Nothing to do.
-    @on_event.call(data)
+    @on_event.call(event)
   end # def encode
 end # class LogStash::Codecs::Plain
diff --git a/lib/logstash/codecs/netflow.rb b/lib/logstash/codecs/netflow.rb
index 278cd214d..360de0fd8 100644
--- a/lib/logstash/codecs/netflow.rb
+++ b/lib/logstash/codecs/netflow.rb
@@ -1,6 +1,7 @@
 # encoding: utf-8
 require "logstash/filters/base"
 require "logstash/namespace"
+require "logstash/timestamp"
 
 # The "netflow" codec is for decoding Netflow v5/v9 flows.
class LogStash::Codecs::Netflow < LogStash::Codecs::Base @@ -90,7 +91,7 @@ class LogStash::Codecs::Netflow < LogStash::Codecs::Base # # The flowset header gives us the UTC epoch seconds along with # residual nanoseconds so we can set @timestamp to that easily - event["@timestamp"] = Time.at(flowset.unix_sec, flowset.unix_nsec / 1000).utc + event.timestamp = LogStash::Timestamp.at(flowset.unix_sec, flowset.unix_nsec / 1000) event[@target] = {} # Copy some of the pertinent fields in the header to the event @@ -190,7 +191,7 @@ class LogStash::Codecs::Netflow < LogStash::Codecs::Base records.each do |r| event = LogStash::Event.new( - "@timestamp" => Time.at(flowset.unix_sec).utc, + LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(flowset.unix_sec), @target => {} ) diff --git a/lib/logstash/codecs/noop.rb b/lib/logstash/codecs/noop.rb index 8a7a0d721..ed74a685f 100644 --- a/lib/logstash/codecs/noop.rb +++ b/lib/logstash/codecs/noop.rb @@ -5,15 +5,15 @@ class LogStash::Codecs::Noop < LogStash::Codecs::Base config_name "noop" milestone 1 - + public def decode(data) yield data end # def decode public - def encode(data) - @on_event.call data + def encode(event) + @on_event.call event end # def encode end # class LogStash::Codecs::Noop diff --git a/lib/logstash/codecs/oldlogstashjson.rb b/lib/logstash/codecs/oldlogstashjson.rb index d815ece33..0cd5ce749 100644 --- a/lib/logstash/codecs/oldlogstashjson.rb +++ b/lib/logstash/codecs/oldlogstashjson.rb @@ -1,5 +1,6 @@ # encoding: utf-8 require "logstash/codecs/base" +require "logstash/json" class LogStash::Codecs::OldLogStashJSON < LogStash::Codecs::Base config_name "oldlogstashjson" @@ -14,8 +15,8 @@ class LogStash::Codecs::OldLogStashJSON < LogStash::Codecs::Base public def decode(data) begin - obj = JSON.parse(data.force_encoding(Encoding::UTF_8)) - rescue JSON::ParserError => e + obj = LogStash::Json.load(data.force_encoding(Encoding::UTF_8)) + rescue LogStash::Json::ParserError => e @logger.info("JSON parse failure. Falling back to plain-text", :error => e, :data => data) yield LogStash::Event.new("message" => data) return @@ -33,24 +34,24 @@ class LogStash::Codecs::OldLogStashJSON < LogStash::Codecs::Base end # def decode public - def encode(data) + def encode(event) h = {} # Convert the new logstash schema to the old one. V0_TO_V1.each do |key, val| - h[key] = data[val] if data.include?(val) + h[key] = event[val] if event.include?(val) end - data.to_hash.each do |field, val| + event.to_hash.each do |field, val| # TODO: might be better to V1_TO_V0 = V0_TO_V1.invert during # initialization than V0_TO_V1.has_value? within loop next if field == "@version" or V0_TO_V1.has_value?(field) - h["@fields"] = {} if h["@fields"].nil? + h["@fields"] ||= {} h["@fields"][field] = val end # Tack on a \n because JSON outputs 1.1.x had them. - @on_event.call(h.to_json + "\n") + @on_event.call(LogStash::Json.dump(h) + NL) end # def encode end # class LogStash::Codecs::OldLogStashJSON diff --git a/lib/logstash/codecs/plain.rb b/lib/logstash/codecs/plain.rb index 40071f5ad..5fe86aacd 100644 --- a/lib/logstash/codecs/plain.rb +++ b/lib/logstash/codecs/plain.rb @@ -37,11 +37,11 @@ class LogStash::Codecs::Plain < LogStash::Codecs::Base end # def decode public - def encode(data) - if data.is_a? 
LogStash::Event and @format - @on_event.call(data.sprintf(@format)) + def encode(event) + if event.is_a?(LogStash::Event) and @format + @on_event.call(event.sprintf(@format)) else - @on_event.call(data.to_s) + @on_event.call(event.to_s) end end # def encode diff --git a/lib/logstash/codecs/rubydebug.rb b/lib/logstash/codecs/rubydebug.rb index 607131a29..fa53a5ec6 100644 --- a/lib/logstash/codecs/rubydebug.rb +++ b/lib/logstash/codecs/rubydebug.rb @@ -18,8 +18,8 @@ class LogStash::Codecs::RubyDebug < LogStash::Codecs::Base end # def decode public - def encode(data) - @on_event.call(data.to_hash.awesome_inspect + "\n") + def encode(event) + @on_event.call(event.to_hash.awesome_inspect + NL) end # def encode end # class LogStash::Codecs::Dots diff --git a/lib/logstash/codecs/spool.rb b/lib/logstash/codecs/spool.rb index 67ff480b0..04fdd05bd 100644 --- a/lib/logstash/codecs/spool.rb +++ b/lib/logstash/codecs/spool.rb @@ -16,15 +16,15 @@ class LogStash::Codecs::Spool < LogStash::Codecs::Base end # def decode public - def encode(data) - @buffer = [] if @buffer.nil? - #buffer size is hard coded for now until a + def encode(event) + @buffer ||= [] + #buffer size is hard coded for now until a #better way to pass args into codecs is implemented if @buffer.length >= @spool_size @on_event.call @buffer @buffer = [] else - @buffer << data + @buffer << event end end # def encode diff --git a/lib/logstash/event.rb b/lib/logstash/event.rb index 1604ad603..f9806a09c 100644 --- a/lib/logstash/event.rb +++ b/lib/logstash/event.rb @@ -1,23 +1,12 @@ # encoding: utf-8 -require "json" require "time" require "date" +require "cabin" require "logstash/namespace" require "logstash/util/fieldreference" require "logstash/util/accessors" -require "logstash/time_addon" - -# Use a custom serialization for jsonifying Time objects. -# TODO(sissel): Put this in a separate file. -class Time - def to_json(*args) - return iso8601(3).to_json(*args) - end - - def inspect - return to_json - end -end +require "logstash/timestamp" +require "logstash/json" # the logstash event object. # @@ -48,23 +37,17 @@ class LogStash::Event TIMESTAMP = "@timestamp" VERSION = "@version" VERSION_ONE = "1" + TIMESTAMP_FAILURE_TAG = "_timestampparsefailure" + TIMESTAMP_FAILURE_FIELD = "_@timestamp" public - def initialize(data={}) + def initialize(data = {}) + @logger = Cabin::Channel.get(LogStash) @cancelled = false - @data = data @accessors = LogStash::Util::Accessors.new(data) - - data[VERSION] = VERSION_ONE if !@data.include?(VERSION) - if data.include?(TIMESTAMP) - t = data[TIMESTAMP] - if t.is_a?(String) - data[TIMESTAMP] = LogStash::Time.parse_iso8601(t) - end - else - data[TIMESTAMP] = ::Time.now.utc - end + @data[VERSION] ||= VERSION_ONE + @data[TIMESTAMP] = init_timestamp(@data[TIMESTAMP]) end # def initialize public @@ -101,7 +84,7 @@ class LogStash::Event else public def to_s - return self.sprintf("#{self["@timestamp"].iso8601} %{host} %{message}") + return self.sprintf("#{timestamp.to_iso8601} %{host} %{message}") end # def to_s end @@ -119,23 +102,18 @@ class LogStash::Event # field-related access public - def [](str) - if str[0,1] == CHAR_PLUS - # nothing? 
-    else
-      # return LogStash::Util::FieldReference.exec(str, @data)
-      @accessors.get(str)
-    end
+  def [](fieldref)
+    @accessors.get(fieldref)
   end # def []
 
   public
   # keep []= implementation in sync with spec/test_utils.rb monkey patch
   # which redefines []= but using @accessors.strict_set
-  def []=(str, value)
-    if str == TIMESTAMP && !value.is_a?(Time)
-      raise TypeError, "The field '@timestamp' must be a Time, not a #{value.class} (#{value})"
+  def []=(fieldref, value)
+    if fieldref == TIMESTAMP && !value.is_a?(LogStash::Timestamp)
+      raise TypeError, "The field '@timestamp' must be a LogStash::Timestamp, not a #{value.class} (#{value})"
     end
-    @accessors.set(str, value)
+    @accessors.set(fieldref, value)
   end # def []=
 
   public
@@ -144,12 +122,13 @@
   end
 
   public
-  def to_json(*args)
-    return @data.to_json(*args)
+  def to_json
+    LogStash::Json.dump(@data)
   end # def to_json
+
   public
   def to_hash
-    return @data
+    @data
   end # def to_hash
 
   public
@@ -161,7 +140,7 @@
 
     #convert timestamp if it is a String
     if @data[TIMESTAMP].is_a?(String)
-      @data[TIMESTAMP] = LogStash::Time.parse_iso8601(@data[TIMESTAMP])
+      @data[TIMESTAMP] = LogStash::Timestamp.parse_iso8601(@data[TIMESTAMP])
     end
   end
 
@@ -183,11 +162,8 @@
   # Remove a field or field reference. Returns the value of that field when
   # deleted
   public
-  def remove(str)
-    # return LogStash::Util::FieldReference.exec(str, @data) do |obj, key|
-    #   next obj.delete(key)
-    # end
-    @accessors.del(str)
+  def remove(fieldref)
+    @accessors.del(fieldref)
   end # def remove
 
   # sprintf. This could use a better method name.
@@ -219,9 +195,9 @@
 
         if key == "+%s"
           # Got %{+%s}, support for unix epoch time
-          next @data["@timestamp"].to_i
+          next @data[TIMESTAMP].to_i
         elsif key[0,1] == "+"
-          t = @data["@timestamp"]
+          t = @data[TIMESTAMP]
           formatter = org.joda.time.format.DateTimeFormat.forPattern(key[1 .. -1])\
                         .withZone(org.joda.time.DateTimeZone::UTC)
           #next org.joda.time.Instant.new(t.tv_sec * 1000 + t.tv_usec / 1000).toDateTime.toString(formatter)
@@ -238,7 +214,7 @@
         when Array
           value.join(",") # Join by ',' if value is an array
         when Hash
-          value.to_json # Convert hashes to json
+          LogStash::Json.dump(value) # Convert hashes to json
         else
           value # otherwise return the value
         end # case value
@@ -251,4 +227,23 @@
     self["tags"] ||= []
     self["tags"] << value unless self["tags"].include?(value)
   end
+
+  private
+
+  def init_timestamp(o)
+    begin
+      timestamp = o ? LogStash::Timestamp.coerce(o) : LogStash::Timestamp.now
+      return timestamp if timestamp
+
+      @logger.warn("Unrecognized #{TIMESTAMP} value, setting current time to #{TIMESTAMP}, original in #{TIMESTAMP_FAILURE_FIELD} field", :value => o.inspect)
+    rescue LogStash::TimestampParserError => e
+      @logger.warn("Error parsing #{TIMESTAMP} string, setting current time to #{TIMESTAMP}, original in #{TIMESTAMP_FAILURE_FIELD} field", :value => o.inspect, :exception => e.message)
+    end
+
+    @data["tags"] ||= []
+    @data["tags"] << TIMESTAMP_FAILURE_TAG unless @data["tags"].include?(TIMESTAMP_FAILURE_TAG)
+    @data[TIMESTAMP_FAILURE_FIELD] = o
+
+    LogStash::Timestamp.now
+  end
 end # class LogStash::Event
diff --git a/lib/logstash/filters/date.rb b/lib/logstash/filters/date.rb
index 87663f4ee..937127a64 100644
--- a/lib/logstash/filters/date.rb
+++ b/lib/logstash/filters/date.rb
@@ -1,6 +1,7 @@
 # encoding: utf-8
 require "logstash/filters/base"
 require "logstash/namespace"
+require "logstash/timestamp"
 
 # The date filter is used for parsing dates from fields, and then using that
 # date or timestamp as the logstash timestamp for the event.
@@ -88,7 +89,7 @@ class LogStash::Filters::Date < LogStash::Filters::Base
   config :target, :validate => :string, :default => "@timestamp"
 
   # LOGSTASH-34
-  DATEPATTERNS = %w{ y d H m s S } 
+  DATEPATTERNS = %w{ y d H m s S }
 
   public
   def initialize(config = {})
@@ -111,7 +112,7 @@ def register
     require "java"
     if @match.length < 2
-      raise LogStash::ConfigurationError, I18n.t("logstash.agent.configuration.invalid_plugin_register", 
+      raise LogStash::ConfigurationError, I18n.t("logstash.agent.configuration.invalid_plugin_register",
         :plugin => "filter", :type => "date",
         :error => "The match setting should contains first a field name and at least one date format, current value is #{@match}")
     end
@@ -137,16 +138,16 @@
         parser = lambda { |date| (date.to_f * 1000).to_i }
       when "UNIX_MS" # unix epoch in ms
         joda_instant = org.joda.time.Instant.java_class.constructor(Java::long).method(:new_instance)
-        parser = lambda do |date| 
+        parser = lambda do |date|
           #return joda_instant.call(date.to_i).to_java.toDateTime
           return date.to_i
         end
       when "TAI64N" # TAI64 with nanoseconds, -10000 accounts for leap seconds
         joda_instant = org.joda.time.Instant.java_class.constructor(Java::long).method(:new_instance)
-        parser = lambda do |date| 
+        parser = lambda do |date|
          # Skip leading "@" if it is present (common in tai64n times)
          date = date[1..-1] if date[0, 1] == "@"
-          #return joda_instant.call((date[1..15].hex * 1000 - 10000)+(date[16..23].hex/1000000)).to_java.toDateTime 
+          #return joda_instant.call((date[1..15].hex * 1000 - 10000)+(date[16..23].hex/1000000)).to_java.toDateTime
          return (date[1..15].hex * 1000 - 10000)+(date[16..23].hex/1000000)
         end
       else
@@ -204,8 +205,7 @@
           raise last_exception unless success
 
           # Convert joda DateTime to a ruby Time
-          event[@target] = Time.at(epochmillis / 1000, (epochmillis % 1000) * 1000).utc
-          #event[@target] = Time.at(epochmillis / 1000.0).utc
+          event[@target] = LogStash::Timestamp.at(epochmillis / 1000, (epochmillis % 1000) * 1000)
 
           @logger.debug? && @logger.debug("Date parsing done", :value => value, :timestamp => event[@target])
           filter_matched(event)
@@ -217,7 +217,7 @@
           # TODO(sissel): What do we do on a failure? Tag it like grok does? 
#raise e end # begin - end # fieldvalue.each + end # fieldvalue.each end # @parsers.each return event diff --git a/lib/logstash/filters/json.rb b/lib/logstash/filters/json.rb index 2f6d47135..688844ff7 100644 --- a/lib/logstash/filters/json.rb +++ b/lib/logstash/filters/json.rb @@ -1,10 +1,12 @@ # encoding: utf-8 require "logstash/filters/base" require "logstash/namespace" +require "logstash/json" +require "logstash/timestamp" # This is a JSON parsing filter. It takes an existing field which contains JSON and # expands it into an actual data structure within the Logstash event. -# +# # By default it will place the parsed JSON in the root (top level) of the Logstash event, but this # filter can be configured to place the JSON into any arbitrary event field, using the # `target` configuration. @@ -45,8 +47,6 @@ class LogStash::Filters::Json < LogStash::Filters::Base # NOTE: if the `target` field already exists, it will be overwritten! config :target, :validate => :string - TIMESTAMP = "@timestamp" - public def register # Nothing to do here @@ -60,6 +60,8 @@ class LogStash::Filters::Json < LogStash::Filters::Base return unless event.include?(@source) + # TODO(colin) this field merging stuff below should be handled in Event. + source = event[@source] if @target.nil? # Default is to write to the root of the event. @@ -75,18 +77,16 @@ class LogStash::Filters::Json < LogStash::Filters::Base begin # TODO(sissel): Note, this will not successfully handle json lists - # like your text is '[ 1,2,3 ]' JSON.parse gives you an array (correctly) + # like your text is '[ 1,2,3 ]' json parser gives you an array (correctly) # which won't merge into a hash. If someone needs this, we can fix it # later. - dest.merge!(JSON.parse(source)) + dest.merge!(LogStash::Json.load(source)) # If no target, we target the root of the event object. This can allow - # you to overwrite @timestamp. If so, let's parse it as a timestamp! - if !@target && event[TIMESTAMP].is_a?(String) - # This is a hack to help folks who are mucking with @timestamp during - # their json filter. You aren't supposed to do anything with - # "@timestamp" outside of the date filter, but nobody listens... ;) - event[TIMESTAMP] = Time.parse(event[TIMESTAMP]).utc + # you to overwrite @timestamp and this will typically happen for json + # LogStash Event deserialized here. + if !@target && event.timestamp.is_a?(String) + event.timestamp = LogStash::Timestamp.parse_iso8601(event.timestamp) end filter_matched(event) diff --git a/lib/logstash/filters/metrics.rb b/lib/logstash/filters/metrics.rb index e22431d74..9d27b4b4b 100644 --- a/lib/logstash/filters/metrics.rb +++ b/lib/logstash/filters/metrics.rb @@ -158,7 +158,7 @@ class LogStash::Filters::Metrics < LogStash::Filters::Base return unless filter?(event) # TODO(piavlo): This should probably be moved to base filter class. 
- if @ignore_older_than > 0 && Time.now - event["@timestamp"] > @ignore_older_than + if @ignore_older_than > 0 && Time.now - event.timestamp.time > @ignore_older_than @logger.debug("Skipping metriks for old event", :event => event) return end diff --git a/lib/logstash/filters/multiline.rb b/lib/logstash/filters/multiline.rb index eafae06d1..600a7bca6 100644 --- a/lib/logstash/filters/multiline.rb +++ b/lib/logstash/filters/multiline.rb @@ -233,7 +233,7 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base def collapse_event!(event) event["message"] = event["message"].join("\n") if event["message"].is_a?(Array) - event["@timestamp"] = event["@timestamp"].first if event["@timestamp"].is_a?(Array) + event.timestamp = event.timestamp.first if event.timestamp.is_a?(Array) event end end # class LogStash::Filters::Multiline diff --git a/lib/logstash/filters/sleep.rb b/lib/logstash/filters/sleep.rb index 3b83b6442..3309f86af 100644 --- a/lib/logstash/filters/sleep.rb +++ b/lib/logstash/filters/sleep.rb @@ -12,7 +12,7 @@ class LogStash::Filters::Sleep < LogStash::Filters::Base # The length of time to sleep, in seconds, for every event. # - # This can be a number (eg, 0.5), or a string (eg, "%{foo}") + # This can be a number (eg, 0.5), or a string (eg, "%{foo}") # The second form (string with a field value) is useful if # you have an attribute of your event that you want to use # to indicate the amount of time to sleep. @@ -33,7 +33,7 @@ class LogStash::Filters::Sleep < LogStash::Filters::Base # # filter { # sleep { - # time => "1" # Sleep 1 second + # time => "1" # Sleep 1 second # every => 10 # on every 10th event # } # } @@ -89,7 +89,7 @@ class LogStash::Filters::Sleep < LogStash::Filters::Base end if @replay - clock = event["@timestamp"].to_f + clock = event.timestamp.to_f if @last_clock delay = clock - @last_clock time = delay/time diff --git a/lib/logstash/inputs/collectd.rb b/lib/logstash/inputs/collectd.rb index f5f6afb55..59eb3b4ad 100644 --- a/lib/logstash/inputs/collectd.rb +++ b/lib/logstash/inputs/collectd.rb @@ -2,6 +2,7 @@ require "date" require "logstash/inputs/base" require "logstash/namespace" +require "logstash/timestamp" require "socket" require "tempfile" require "time" @@ -107,7 +108,7 @@ class LogStash::Inputs::Collectd < LogStash::Inputs::Base def initialize(params) super BasicSocket.do_not_reverse_lookup = true - @timestamp = Time.now().utc + @timestamp = LogStash::Timestamp.now @collectd = {} @types = {} end # def initialize diff --git a/lib/logstash/inputs/elasticsearch.rb b/lib/logstash/inputs/elasticsearch.rb index d3e8b211e..d34630eaf 100644 --- a/lib/logstash/inputs/elasticsearch.rb +++ b/lib/logstash/inputs/elasticsearch.rb @@ -2,6 +2,7 @@ require "logstash/inputs/base" require "logstash/namespace" require "logstash/util/socket_peer" +require "logstash/json" # Read from an Elasticsearch cluster, based on search query results. # This is useful for replaying test logs, reindexing, etc. 
@@ -55,15 +56,16 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base def register require "ftw" @agent = FTW::Agent.new + params = { "q" => @query, "scroll" => @scroll, "size" => "#{@size}", } - params['search_type'] = "scan" if @scan - @url = "http://#{@host}:#{@port}/#{@index}/_search?#{encode(params)}" + @search_url = "http://#{@host}:#{@port}/#{@index}/_search?#{encode(params)}" + @scroll_url = "http://#{@host}:#{@port}/_search/scroll?#{encode({"scroll" => @scroll})}" end # def register private @@ -73,42 +75,41 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base end.join("&") end # def encode - public - def run(output_queue) - - # Execute the search request - response = @agent.get!(@url) + private + def execute_search_request + response = @agent.get!(@search_url) json = "" response.read_body { |c| json << c } - result = JSON.parse(json) + json + end + + private + def execute_scroll_request(scroll_id) + response = @agent.post!(@scroll_url, :body => scroll_id) + json = "" + response.read_body { |c| json << c } + json + end + + public + def run(output_queue) + result = LogStash::Json.load(execute_search_request) scroll_id = result["_scroll_id"] # When using the search_type=scan we don't get an initial result set. # So we do it here. if @scan - - scroll_params = { - "scroll" => @scroll - } - - scroll_url = "http://#{@host}:#{@port}/_search/scroll?#{encode(scroll_params)}" - response = @agent.post!(scroll_url, :body => scroll_id) - json = "" - response.read_body { |c| json << c } - result = JSON.parse(json) - + result = LogStash::Json.load(execute_scroll_request(scroll_id)) end - while true + loop do break if result.nil? hits = result["hits"]["hits"] break if hits.empty? hits.each do |hit| - event = hit["_source"] - # Hack to make codecs work - @codec.decode(event.to_json) do |event| + @codec.decode(LogStash::Json.dump(hit["_source"])) do |event| decorate(event) output_queue << event end @@ -118,15 +119,7 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base scroll_id = result["_scroll_id"] # Fetch the next result set - scroll_params = { - "scroll" => @scroll - } - scroll_url = "http://#{@host}:#{@port}/_search/scroll?#{encode(scroll_params)}" - - response = @agent.post!(scroll_url, :body => scroll_id) - json = "" - response.read_body { |c| json << c } - result = JSON.parse(json) + result = LogStash::Json.load(execute_scroll_request(scroll_id)) if result["error"] @logger.warn(result["error"], :request => scroll_url) diff --git a/lib/logstash/inputs/eventlog.rb b/lib/logstash/inputs/eventlog.rb index 92c0790fc..ac6e7bd93 100644 --- a/lib/logstash/inputs/eventlog.rb +++ b/lib/logstash/inputs/eventlog.rb @@ -1,6 +1,7 @@ # encoding: utf-8 require "logstash/inputs/base" require "logstash/namespace" +require "logstash/timestamp" require "socket" # This input will pull events from a (http://msdn.microsoft.com/en-us/library/windows/desktop/bb309026%28v=vs.85%29.aspx)[Windows Event Log]. 
@@ -60,14 +61,14 @@ class LogStash::Inputs::EventLog < LogStash::Inputs::Base "host" => @hostname, "path" => @logfile, "type" => @type, - "@timestamp" => timestamp + LogStash::Event::TIMESTAMP => timestamp ) %w{Category CategoryString ComputerName EventCode EventIdentifier EventType Logfile Message RecordNumber SourceName TimeGenerated TimeWritten Type User }.each{ - |property| e[property] = event.send property + |property| e[property] = event.send property } if RUBY_PLATFORM == "java" @@ -111,7 +112,7 @@ class LogStash::Inputs::EventLog < LogStash::Inputs::Base # parse the utc date string /(?\d{8})(?\d{6})\.\d{6}(?[\+-])(?\d{3})/ =~ wmi_time result = "#{w_date}T#{w_time}#{w_sign}" - # the offset is represented by the difference, in minutes, + # the offset is represented by the difference, in minutes, # between the local time zone and Greenwich Mean Time (GMT). if w_diff.to_i > 0 # calculate the timezone offset in hours and minutes @@ -121,8 +122,8 @@ class LogStash::Inputs::EventLog < LogStash::Inputs::Base else result.concat("0000") end - - return DateTime.strptime(result, "%Y%m%dT%H%M%S%z").iso8601 + + return LogStash::Timestamp.new(DateTime.strptime(result, "%Y%m%dT%H%M%S%z").to_time) end end # class LogStash::Inputs::EventLog diff --git a/lib/logstash/inputs/gelf.rb b/lib/logstash/inputs/gelf.rb index 777c3ce87..741c8b3a1 100644 --- a/lib/logstash/inputs/gelf.rb +++ b/lib/logstash/inputs/gelf.rb @@ -2,6 +2,8 @@ require "date" require "logstash/inputs/base" require "logstash/namespace" +require "logstash/json" +require "logstash/timestamp" require "socket" # This input will read GELF messages as events over the network, @@ -89,10 +91,10 @@ class LogStash::Inputs::Gelf < LogStash::Inputs::Base # Gelfd parser outputs null if it received and cached a non-final chunk next if data.nil? - event = LogStash::Event.new(JSON.parse(data)) + event = LogStash::Event.new(LogStash::Json.load(data)) event["source_host"] = client[3] if event["timestamp"].is_a?(Numeric) - event["@timestamp"] = Time.at(event["timestamp"]).gmtime + event.timestamp = LogStash::Timestamp.at(event["timestamp"]) event.remove("timestamp") end remap_gelf(event) if @remap diff --git a/lib/logstash/inputs/graphite.rb b/lib/logstash/inputs/graphite.rb index e590cc3bd..c2f2f25f1 100644 --- a/lib/logstash/inputs/graphite.rb +++ b/lib/logstash/inputs/graphite.rb @@ -1,6 +1,7 @@ # encoding: utf-8 require "logstash/inputs/tcp" require "logstash/namespace" +require "logstash/timestamp" # Receive graphite metrics. This plugin understands the text-based graphite # carbon protocol. 
Both 'N' and specific-timestamp forms are supported, example: @@ -18,8 +19,6 @@ class LogStash::Inputs::Graphite < LogStash::Inputs::Tcp config_name "graphite" milestone 1 - ISO8601_STRFTIME = "%04d-%02d-%02dT%02d:%02d:%02d.%06d%+03d:00".freeze - public def run(output_queue) @queue = output_queue @@ -33,7 +32,7 @@ class LogStash::Inputs::Graphite < LogStash::Inputs::Tcp event[name] = value.to_f if time != "N" - event["@timestamp"] = Time.at(time.to_i).gmtime + event.timestamp = LogStash::Timestamp.at(time.to_i) end @queue << event diff --git a/lib/logstash/inputs/imap.rb b/lib/logstash/inputs/imap.rb index ec2c9bccb..e980c9e9d 100644 --- a/lib/logstash/inputs/imap.rb +++ b/lib/logstash/inputs/imap.rb @@ -1,6 +1,7 @@ # encoding: utf-8 require "logstash/inputs/base" require "logstash/namespace" +require "logstash/timestamp" require "stud/interval" require "socket" # for Socket.gethostname @@ -11,7 +12,6 @@ require "socket" # for Socket.gethostname class LogStash::Inputs::IMAP < LogStash::Inputs::Base config_name "imap" milestone 1 - ISO8601_STRFTIME = "%04d-%02d-%02dT%02d:%02d:%02d.%06d%+03d:00".freeze default :codec, "plain" @@ -106,7 +106,7 @@ class LogStash::Inputs::IMAP < LogStash::Inputs::Base # event = LogStash::Event.new("message" => message) # Use the 'Date' field as the timestamp - event["@timestamp"] = mail.date.to_time.gmtime + event.timestamp = LogStash::Timestamp.new(mail.date.to_time) # Add fields: Add message.header_fields { |h| h.name=> h.value } mail.header_fields.each do |header| diff --git a/lib/logstash/inputs/sqs.rb b/lib/logstash/inputs/sqs.rb index c0d3e7560..8b0599aa6 100644 --- a/lib/logstash/inputs/sqs.rb +++ b/lib/logstash/inputs/sqs.rb @@ -1,12 +1,13 @@ # encoding: utf-8 require "logstash/inputs/threadable" require "logstash/namespace" +require "logstash/timestamp" require "logstash/plugin_mixins/aws_config" require "digest/sha2" # Pull events from an Amazon Web Services Simple Queue Service (SQS) queue. # -# SQS is a simple, scalable queue system that is part of the +# SQS is a simple, scalable queue system that is part of the # Amazon Web Services suite of tools. # # Although SQS is similar to other queuing systems like AMQP, it @@ -52,7 +53,7 @@ require "digest/sha2" # ] # } # ] -# } +# } # # See http://aws.amazon.com/iam/ for more details on setting up AWS identities. # @@ -124,7 +125,7 @@ class LogStash::Inputs::SQS < LogStash::Inputs::Threadable event[@md5_field] = message.md5 end if @sent_timestamp_field - event[@sent_timestamp_field] = message.sent_timestamp.utc + event[@sent_timestamp_field] = LogStash::Timestamp.new(message.sent_timestamp).utc end @logger.debug? && @logger.debug("Processed SQS message", :message_id => message.id, :message_md5 => message.md5, :sent_timestamp => message.sent_timestamp, :queue => @queue) output_queue << event diff --git a/lib/logstash/inputs/twitter.rb b/lib/logstash/inputs/twitter.rb index fecd0755c..333b21a56 100644 --- a/lib/logstash/inputs/twitter.rb +++ b/lib/logstash/inputs/twitter.rb @@ -1,7 +1,7 @@ # encoding: utf-8 require "logstash/inputs/base" require "logstash/namespace" -require "json" +require "logstash/timestamp" # Read events from the twitter streaming api. class LogStash::Inputs::Twitter < LogStash::Inputs::Base @@ -32,7 +32,7 @@ class LogStash::Inputs::Twitter < LogStash::Inputs::Base # will create an oauth token and secret bound to your account and that # application. config :oauth_token, :validate => :string, :required => true - + # Your oauth token secret. 
 #
 # To get this, login to twitter with whatever account you want,
@@ -67,12 +67,11 @@ class LogStash::Inputs::Twitter < LogStash::Inputs::Base
     @client.filter(:track => @keywords.join(",")) do |tweet|
       @logger.info? && @logger.info("Got tweet", :user => tweet.user.screen_name, :text => tweet.text)
       if @full_tweet
-        event = LogStash::Event.new(
-          tweet.to_hash.merge("@timestamp" => tweet.created_at.gmtime)
-        )
+        event = LogStash::Event.new(tweet.to_hash)
+        event.timestamp = LogStash::Timestamp.new(tweet.created_at)
       else
         event = LogStash::Event.new(
-          "@timestamp" => tweet.created_at.gmtime,
+          LogStash::Event::TIMESTAMP => LogStash::Timestamp.new(tweet.created_at),
           "message" => tweet.full_text,
           "user" => tweet.user.screen_name,
           "client" => tweet.source,
diff --git a/lib/logstash/java_integration.rb b/lib/logstash/java_integration.rb
new file mode 100644
index 000000000..2bfeb3e81
--- /dev/null
+++ b/lib/logstash/java_integration.rb
@@ -0,0 +1,41 @@
+require "java"
+
+# this is mainly for usage with JrJackson json parsing in :raw mode which generates
+# Java::JavaUtil::ArrayList and Java::JavaUtil::LinkedHashMap native objects for speed.
+# these objects already quack like their Ruby equivalents Array and Hash but they will
+# not test for is_a?(Array) or is_a?(Hash) and we do not want to include tests for
+# both classes everywhere. see LogStash::Json.
+
+class Java::JavaUtil::ArrayList
+  # have ArrayList objects report is_a?(Array) == true
+  def is_a?(clazz)
+    return true if clazz == Array
+    super
+  end
+end
+
+class Java::JavaUtil::LinkedHashMap
+  # have LinkedHashMap objects report is_a?(Hash) == true
+  def is_a?(clazz)
+    return true if clazz == Hash
+    super
+  end
+end
+
+class Array
+  # enable class equivalence between Array and ArrayList
+  # so that ArrayList will work with case o when Array ...
+  def self.===(other)
+    return true if other.is_a?(Java::JavaUtil::ArrayList)
+    super
+  end
+end
+
+class Hash
+  # enable class equivalence between Hash and LinkedHashMap
+  # so that LinkedHashMap will work with case o when Hash ...
+  def self.===(other)
+    return true if other.is_a?(Java::JavaUtil::LinkedHashMap)
+    super
+  end
+end
diff --git a/lib/logstash/json.rb b/lib/logstash/json.rb
new file mode 100644
index 000000000..d7fce4397
--- /dev/null
+++ b/lib/logstash/json.rb
@@ -0,0 +1,53 @@
+# encoding: utf-8
+require "logstash/environment"
+require "logstash/errors"
+if LogStash::Environment.jruby?
+  require "jrjackson"
+  require "logstash/java_integration"
+else
+  require "oj"
+end
+
+module LogStash
+  module Json
+    class ParserError < LogStash::Error; end
+    class GeneratorError < LogStash::Error; end
+
+    extend self
+
+    ### MRI
+
+    def mri_load(data)
+      Oj.load(data)
+    rescue Oj::ParseError => e
+      raise LogStash::Json::ParserError.new(e.message)
+    end
+
+    def mri_dump(o)
+      Oj.dump(o, :mode => :compat, :use_to_json => true)
+    rescue => e
+      raise LogStash::Json::GeneratorError.new(e.message)
+    end
+
+    ### JRuby
+
+    def jruby_load(data)
+      JrJackson::Raw.parse_raw(data)
+    rescue JrJackson::ParseError => e
+      raise LogStash::Json::ParserError.new(e.message)
+    end
+
+    def jruby_dump(o)
+      # test for enumerable here to work around an omission in JrJackson::Json.dump to
+      # also look for Java::JavaUtil::ArrayList, see TODO submit issue
+      o.is_a?(Enumerable) ? JrJackson::Raw.generate(o) : JrJackson::Json.dump(o)
+    rescue => e
+      raise LogStash::Json::GeneratorError.new(e.message)
+    end
+
+    prefix = LogStash::Environment.jruby? ?
"jruby" : "mri" + alias_method :load, "#{prefix}_load".to_sym + alias_method :dump, "#{prefix}_dump".to_sym + + end +end diff --git a/lib/logstash/outputs/csv.rb b/lib/logstash/outputs/csv.rb index d35cb67c0..42daa155b 100644 --- a/lib/logstash/outputs/csv.rb +++ b/lib/logstash/outputs/csv.rb @@ -1,6 +1,7 @@ require "csv" require "logstash/namespace" require "logstash/outputs/file" +require "logstash/json" # CSV output. # @@ -17,8 +18,8 @@ class LogStash::Outputs::CSV < LogStash::Outputs::File # If a field does not exist on the event, an empty string will be written. # Supports field reference syntax eg: `fields => ["field1", "[nested][field]"]`. config :fields, :validate => :array, :required => true - - # Options for CSV output. This is passed directly to the Ruby stdlib to\_csv function. + + # Options for CSV output. This is passed directly to the Ruby stdlib to\_csv function. # Full documentation is available here: [http://ruby-doc.org/stdlib-2.0.0/libdoc/csv/rdoc/index.html]. # A typical use case would be to use alternative column or row seperators eg: `csv_options => {"col_sep" => "\t" "row_sep" => "\r\n"}` gives tab seperated data with windows line endings config :csv_options, :validate => :hash, :required => false, :default => Hash.new @@ -26,7 +27,7 @@ class LogStash::Outputs::CSV < LogStash::Outputs::File public def register super - @csv_options = Hash[@csv_options.map{|(k,v)|[k.to_sym, v]}] + @csv_options = Hash[@csv_options.map{|(k, v)|[k.to_sym, v]}] end public @@ -44,12 +45,7 @@ class LogStash::Outputs::CSV < LogStash::Outputs::File private def get_value(name, event) val = event[name] - case val - when Hash - return val.to_json - else - return val - end + val.is_a?(Hash) ? LogStash::Json.dump(val) : val end end # class LogStash::Outputs::CSV diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb index 2e6458b09..d42eb9eee 100644 --- a/lib/logstash/outputs/elasticsearch.rb +++ b/lib/logstash/outputs/elasticsearch.rb @@ -2,6 +2,7 @@ require "logstash/namespace" require "logstash/environment" require "logstash/outputs/base" +require "logstash/json" require "stud/buffer" require "socket" # for Socket.gethostname @@ -276,7 +277,7 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base end template_json = IO.read(@template).gsub(/\n/,'') @logger.info("Using mapping template", :template => template_json) - return JSON.parse(template_json) + return LogStash::Json.load(template_json) end # def get_template protected diff --git a/lib/logstash/outputs/elasticsearch/protocol.rb b/lib/logstash/outputs/elasticsearch/protocol.rb index 876598ec4..1a860e945 100644 --- a/lib/logstash/outputs/elasticsearch/protocol.rb +++ b/lib/logstash/outputs/elasticsearch/protocol.rb @@ -80,7 +80,7 @@ module LogStash::Outputs::Elasticsearch bulk_ftw(actions) end end - + def bulk_esruby(actions) @client.bulk(:body => actions.collect do |action, args, source| if source @@ -97,9 +97,9 @@ module LogStash::Outputs::Elasticsearch body = actions.collect do |action, args, source| header = { action => args } if source - next [ header.to_json, NEWLINE, source.to_json, NEWLINE ] + next [ LogStash::Json.dump(header), NEWLINE, LogStash::Json.dump(source), NEWLINE ] else - next [ header.to_json, NEWLINE ] + next [ LogStash::Json.dump(header), NEWLINE ] end end.flatten.join("") begin @@ -170,7 +170,7 @@ module LogStash::Outputs::Elasticsearch @settings.put("node.client", true) @settings.put("http.enabled", false) - + if options[:client_settings] options[:client_settings].each 
do |key, value| @settings.put(key, value) @@ -182,7 +182,7 @@ module LogStash::Outputs::Elasticsearch def hosts(options) if options[:port].to_s =~ /^\d+-\d+$/ - # port ranges are 'host[port1-port2]' according to + # port ranges are 'host[port1-port2]' according to # http://www.elasticsearch.org/guide/reference/modules/discovery/zen/ # However, it seems to only query the first port. # So generate our own list of unicast hosts to scan. @@ -234,7 +234,7 @@ module LogStash::Outputs::Elasticsearch def template_put(name, template) request = org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder.new(@client.admin.indices, name) - request.setSource(template.to_json) + request.setSource(LogStash::Json.dump(template)) # execute the request and get the response, if it fails, we'll get an exception. request.get diff --git a/lib/logstash/outputs/elasticsearch_http.rb b/lib/logstash/outputs/elasticsearch_http.rb index 496e108e8..124b73905 100644 --- a/lib/logstash/outputs/elasticsearch_http.rb +++ b/lib/logstash/outputs/elasticsearch_http.rb @@ -1,6 +1,7 @@ # encoding: utf-8 require "logstash/namespace" require "logstash/outputs/base" +require "logstash/json" require "stud/buffer" # This output lets you store logs in Elasticsearch. @@ -123,7 +124,7 @@ class LogStash::Outputs::ElasticSearchHTTP < LogStash::Outputs::Base elsif response.status == 200 begin response.read_body { |c| json << c } - results = JSON.parse(json) + results = LogStash::Json.load(json) rescue Exception => e @logger.error("Error parsing JSON", :json => json, :results => results.to_s, :error => e.to_s) raise "Exception in parsing JSON", e @@ -204,7 +205,7 @@ class LogStash::Outputs::ElasticSearchHTTP < LogStash::Outputs::Base header = { "index" => { "_index" => index, "_type" => type } } header["index"]["_id"] = event.sprintf(@document_id) if !@document_id.nil? 
- [ header.to_json, newline, event.to_json, newline ] + [ LogStash::Json.dump(header), newline, event.to_json, newline ] end.flatten post(body.join("")) diff --git a/lib/logstash/outputs/elasticsearch_river.rb b/lib/logstash/outputs/elasticsearch_river.rb index c3ac9c6b5..365d2b53e 100644 --- a/lib/logstash/outputs/elasticsearch_river.rb +++ b/lib/logstash/outputs/elasticsearch_river.rb @@ -2,7 +2,7 @@ require "logstash/environment" require "logstash/namespace" require "logstash/outputs/base" -require "json" +require "logstash/json" require "uri" require "net/http" @@ -146,7 +146,7 @@ class LogStash::Outputs::ElasticSearchRiver < LogStash::Outputs::Base @logger.info("ElasticSearch using river", :config => river_config) Net::HTTP.start(@es_host, @es_port) do |http| req = Net::HTTP::Put.new(api_path) - req.body = river_config.to_json + req.body = LogStash::Json.dump(river_config) response = http.request(req) response.value() # raise an exception if error @logger.info("River created: #{response.body}") @@ -173,7 +173,7 @@ class LogStash::Outputs::ElasticSearchRiver < LogStash::Outputs::Base req = Net::HTTP::Get.new(@status_path) response = http.request(req) response.value - status = JSON.parse(response.body) + status = LogStash::Json.load(response.body) @logger.debug("Checking ES river status", :status => status) if status["_source"]["error"] reason = "ES river status: #{status["_source"]["error"]}" @@ -201,6 +201,6 @@ class LogStash::Outputs::ElasticSearchRiver < LogStash::Outputs::Base header["index"]["_id"] = event.sprintf(@document_id) end - @mq.publish_serialized(header.to_json + "\n" + event.to_json + "\n") + @mq.publish_serialized(LogStash::Json.dump(header) + "\n" + event.to_json + "\n") end # def receive end # LogStash::Outputs::ElasticSearchRiver diff --git a/lib/logstash/outputs/gelf.rb b/lib/logstash/outputs/gelf.rb index e6c072558..1d5dc161f 100644 --- a/lib/logstash/outputs/gelf.rb +++ b/lib/logstash/outputs/gelf.rb @@ -202,7 +202,7 @@ class LogStash::Outputs::Gelf < LogStash::Outputs::Base @logger.debug(["Sending GELF event", m]) begin - @gelf.notify!(m, :timestamp => event["@timestamp"].to_f) + @gelf.notify!(m, :timestamp => event.timestamp.to_f) rescue @logger.warn("Trouble sending GELF event", :gelf_event => m, :event => event, :error => $!) 
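For reference, every hunk above that previously called Hash#to_json or JSON.parse now goes through the LogStash::Json module introduced in lib/logstash/json.rb. Below is a minimal usage sketch, limited to the entry points visible in this patch (load, dump and the wrapped error classes); the inline JSON values are illustrative only and the sketch is not part of the patch:

    require "logstash/json"

    begin
      # load dispatches to JrJackson on JRuby and to Oj on MRI
      data = LogStash::Json.load('{"name":"logstash"}')  # => {"name" => "logstash"}
      # dump replaces the Hash#to_json calls removed in the hunks above
      json = LogStash::Json.dump(data)                   # => '{"name":"logstash"}'
    rescue LogStash::Json::ParserError => e
      # the codecs rescue this and fall back to a plain "message" => data event
    end
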
diff --git a/lib/logstash/outputs/http.rb b/lib/logstash/outputs/http.rb index 029fc961a..8b955d169 100644 --- a/lib/logstash/outputs/http.rb +++ b/lib/logstash/outputs/http.rb @@ -1,6 +1,7 @@ # encoding: utf-8 require "logstash/outputs/base" require "logstash/namespace" +require "logstash/json" class LogStash::Outputs::Http < LogStash::Outputs::Base # This output lets you `PUT` or `POST` events to a @@ -102,7 +103,7 @@ class LogStash::Outputs::Http < LogStash::Outputs::Base else @logger.error("Unknown verb:", :verb => @http_method) end - + if @headers @headers.each do |k,v| request.headers[k] = event.sprintf(v) @@ -113,7 +114,7 @@ class LogStash::Outputs::Http < LogStash::Outputs::Base begin if @format == "json" - request.body = evt.to_json + request.body = LogStash::Json.dump(evt) elsif @format == "message" request.body = event.sprintf(@message) else @@ -121,7 +122,7 @@ class LogStash::Outputs::Http < LogStash::Outputs::Base end #puts "#{request.port} / #{request.protocol}" #puts request - #puts + #puts #puts request.body response = @agent.execute(request) diff --git a/lib/logstash/outputs/juggernaut.rb b/lib/logstash/outputs/juggernaut.rb index e41f89217..f6985430b 100644 --- a/lib/logstash/outputs/juggernaut.rb +++ b/lib/logstash/outputs/juggernaut.rb @@ -2,6 +2,7 @@ require "logstash/outputs/base" require "logstash/namespace" require "logstash/event" +require "logstash/json" # Push messages to the juggernaut websockets server: # @@ -85,7 +86,7 @@ class LogStash::Outputs::Juggernaut < LogStash::Outputs::Base "data" => event["message"] } - @redis.publish 'juggernaut', juggernaut_message.to_json + @redis.publish 'juggernaut', LogStash::Json.dump(juggernaut_message) rescue => e @logger.warn("Failed to send event to redis", :event => event, :identity => identity, :exception => e, diff --git a/lib/logstash/outputs/pagerduty.rb b/lib/logstash/outputs/pagerduty.rb index 1c67e9a02..e48519862 100644 --- a/lib/logstash/outputs/pagerduty.rb +++ b/lib/logstash/outputs/pagerduty.rb @@ -1,13 +1,14 @@ # encoding: utf-8 require "logstash/outputs/base" require "logstash/namespace" +require "logstash/json" -# The PagerDuty output will send notifications based on pre-configured services +# The PagerDuty output will send notifications based on pre-configured services # and escalation policies. Logstash can send "trigger", "acknowledge" and "resolve" # event types. In addition, you may configure custom descriptions and event details. # The only required field is the PagerDuty "Service API Key", which can be found on # the service's web page on pagerduty.com. In the default case, the description and -# event details will be populated by Logstash, using `message`, `timestamp` and `host` data. +# event details will be populated by Logstash, using `message`, `timestamp` and `host` data. 
class LogStash::Outputs::PagerDuty < LogStash::Outputs::Base config_name "pagerduty" milestone 1 @@ -49,7 +50,7 @@ class LogStash::Outputs::PagerDuty < LogStash::Outputs::Base public def receive(event) return unless output?(event) - + pd_event = Hash.new pd_event[:service_key] = "#{@service_key}" pd_event[:incident_key] = event.sprintf(@incident_key) @@ -65,7 +66,7 @@ class LogStash::Outputs::PagerDuty < LogStash::Outputs::Base @logger.info("PD Event", :event => pd_event) begin request = Net::HTTP::Post.new(@pd_uri.path) - request.body = pd_event.to_json + request.body = LogStash::Json.dump(pd_event) @logger.debug("PD Request", :request => request.inspect) response = @client.request(request) @logger.debug("PD Response", :response => response.body) diff --git a/lib/logstash/outputs/rabbitmq/bunny.rb b/lib/logstash/outputs/rabbitmq/bunny.rb index 3a59234d2..cc83eacc0 100644 --- a/lib/logstash/outputs/rabbitmq/bunny.rb +++ b/lib/logstash/outputs/rabbitmq/bunny.rb @@ -1,4 +1,7 @@ # encoding: utf-8 + +require "logstash/json" + class LogStash::Outputs::RabbitMQ module BunnyImpl @@ -24,7 +27,7 @@ class LogStash::Outputs::RabbitMQ begin publish_serialized(event.to_json, key) - rescue JSON::GeneratorError => e + rescue LogStash::Json::GeneratorError => e @logger.warn("Trouble converting event to JSON", :exception => e, :event => event) end diff --git a/lib/logstash/outputs/s3.rb b/lib/logstash/outputs/s3.rb index e3a47b41b..c493d29b5 100644 --- a/lib/logstash/outputs/s3.rb +++ b/lib/logstash/outputs/s3.rb @@ -3,13 +3,13 @@ require "logstash/outputs/base" require "logstash/namespace" require "socket" # for Socket.gethostname -# TODO integrate aws_config in the future +# TODO integrate aws_config in the future #require "logstash/plugin_mixins/aws_config" # INFORMATION: # This plugin was created for store the logstash's events into Amazon Simple Storage Service (Amazon S3). -# For use it you needs authentications and an s3 bucket. +# For use it you needs authentications and an s3 bucket. # Be careful to have the permission to write file on S3's bucket and run logstash with super user for establish connection. # S3 plugin allows you to do something complex, let's explain:) @@ -23,9 +23,9 @@ require "socket" # for Socket.gethostname # "ip-10-228-27-95" : indicate you ip machine, if you have more logstash and writing on the same bucket for example. # "2013-04-18T10.00" : represents the time whenever you specify time_file. -# "tag_hello" : this indicate the event's tag, you can collect events with the same tag. -# "part0" : this means if you indicate size_file then it will generate more parts if you file.size > size_file. -# When a file is full it will pushed on bucket and will be deleted in temporary directory. +# "tag_hello" : this indicate the event's tag, you can collect events with the same tag. +# "part0" : this means if you indicate size_file then it will generate more parts if you file.size > size_file. +# When a file is full it will pushed on bucket and will be deleted in temporary directory. # If a file is empty is not pushed, but deleted. # This plugin have a system to restore the previous temporary files if something crash. @@ -35,7 +35,7 @@ require "socket" # for Socket.gethostname ## If you specify size_file and time_file then it will create file for each tag (if specified), when time_file or ## their size > size_file, it will be triggered then they will be pushed on s3's bucket and will delete from local disk. 
-## If you don't specify size_file, but time_file then it will create only one file for each tag (if specified). +## If you don't specify size_file, but time_file then it will create only one file for each tag (if specified). ## When time_file it will be triggered then the files will be pushed on s3's bucket and delete from local disk. ## If you don't specify time_file, but size_file then it will create files for each tag (if specified), @@ -46,15 +46,15 @@ require "socket" # for Socket.gethostname # INFORMATION ABOUT CLASS: -# I tried to comment the class at best i could do. +# I tried to comment the class at best i could do. # I think there are much thing to improve, but if you want some points to develop here a list: -# TODO Integrate aws_config in the future +# TODO Integrate aws_config in the future # TODO Find a method to push them all files when logtstash close the session. # TODO Integrate @field on the path file -# TODO Permanent connection or on demand? For now on demand, but isn't a good implementation. +# TODO Permanent connection or on demand? For now on demand, but isn't a good implementation. # Use a while or a thread to try the connection before break a time_out and signal an error. -# TODO If you have bugs report or helpful advice contact me, but remember that this code is much mine as much as yours, +# TODO If you have bugs report or helpful advice contact me, but remember that this code is much mine as much as yours, # try to work on it if you want :) @@ -63,30 +63,30 @@ require "socket" # for Socket.gethostname # This is an example of logstash config: # output { -# s3{ +# s3{ # access_key_id => "crazy_key" (required) # secret_access_key => "monkey_access_key" (required) # endpoint_region => "eu-west-1" (required) -# bucket => "boss_please_open_your_bucket" (required) +# bucket => "boss_please_open_your_bucket" (required) # size_file => 2048 (optional) # time_file => 5 (optional) -# format => "plain" (optional) +# format => "plain" (optional) # canned_acl => "private" (optional. Options are "private", "public_read", "public_read_write", "authenticated_read". Defaults to "private" ) # } # } # We analize this: -# access_key_id => "crazy_key" +# access_key_id => "crazy_key" # Amazon will give you the key for use their service if you buy it or try it. (not very much open source anyway) # secret_access_key => "monkey_access_key" # Amazon will give you the secret_access_key for use their service if you buy it or try it . (not very much open source anyway). -# endpoint_region => "eu-west-1" +# endpoint_region => "eu-west-1" # When you make a contract with Amazon, you should know where the services you use. -# bucket => "boss_please_open_your_bucket" +# bucket => "boss_please_open_your_bucket" # Be careful you have the permission to write on bucket and know the name. # size_file => 2048 @@ -95,7 +95,7 @@ require "socket" # for Socket.gethostname # time_file => 5 # Means, in minutes, the time before the files will be pushed on bucket. Is useful if you want to push the files every specific time. - + # format => "plain" # Means the format of events you want to store in the files @@ -105,7 +105,7 @@ require "socket" # for Socket.gethostname # LET'S ROCK AND ROLL ON THE CODE! class LogStash::Outputs::S3 < LogStash::Outputs::Base - #TODO integrate aws_config in the future + #TODO integrate aws_config in the future # include LogStash::PluginMixins::AwsConfig config_name "s3" @@ -113,7 +113,7 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base # Aws access_key. 
config :access_key_id, :validate => :string - + # Aws secret_access_key config :secret_access_key, :validate => :string @@ -125,24 +125,24 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base "eu-west-1", "ap-southeast-1", "ap-southeast-2", "ap-northeast-1", "sa-east-1", "us-gov-west-1"], :default => "us-east-1" - # Set the size of file in KB, this means that files on bucket when have dimension > file_size, they are stored in two or more file. + # Set the size of file in KB, this means that files on bucket when have dimension > file_size, they are stored in two or more file. # If you have tags then it will generate a specific size file for every tags - ##NOTE: define size of file is the better thing, because generate a local temporary file on disk and then put it in bucket. + ##NOTE: define size of file is the better thing, because generate a local temporary file on disk and then put it in bucket. config :size_file, :validate => :number, :default => 0 - # Set the time, in minutes, to close the current sub_time_section of bucket. + # Set the time, in minutes, to close the current sub_time_section of bucket. # If you define file_size you have a number of files in consideration of the section and the current tag. # 0 stay all time on listerner, beware if you specific 0 and size_file 0, because you will not put the file on bucket, # for now the only thing this plugin can do is to put the file when logstash restart. - config :time_file, :validate => :number, :default => 0 - + config :time_file, :validate => :number, :default => 0 + # The event format you want to store in files. Defaults to plain text. config :format, :validate => [ "json", "plain", "nil" ], :default => "plain" ## IMPORTANT: if you use multiple instance of s3, you should specify on one of them the "restore=> true" and on the others "restore => false". - ## This is hack for not destroy the new files after restoring the initial files. + ## This is hack for not destroy the new files after restoring the initial files. ## If you do not specify "restore => true" when logstash crashes or is restarted, the files are not sent into the bucket, - ## for example if you have single Instance. + ## for example if you have single Instance. config :restore, :validate => :boolean, :default => false # Aws canned ACL @@ -161,7 +161,7 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base :secret_access_key => @secret_access_key, :s3_endpoint => @endpoint_region ) - @s3 = AWS::S3.new + @s3 = AWS::S3.new end @@ -181,9 +181,9 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base # this method is used for write files on bucket. It accept the file and the name of file. def write_on_bucket (file_data, file_basename) - + # if you lose connection with s3, bad control implementation. - if ( @s3 == nil) + if ( @s3 == nil) aws_s3_config end @@ -195,31 +195,31 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base # prepare for write the file object = bucket.objects[file_basename] object.write(:file => file_data, :acl => @canned_acl) - + @logger.debug "S3: has written "+file_basename+" in bucket "+@bucket + " with canned ACL \"" + @canned_acl + "\"" end - + # this method is used for create new path for name the file def getFinalPath - - @pass_time = Time.now + + @pass_time = Time.now return @temp_directory+"ls.s3."+Socket.gethostname+"."+(@pass_time).strftime("%Y-%m-%dT%H.%M") end - # This method is used for restore the previous crash of logstash or to prepare the files to send in bucket. - # Take two parameter: flag and name. 
Flag indicate if you want to restore or not, name is the name of file + # This method is used for restore the previous crash of logstash or to prepare the files to send in bucket. + # Take two parameter: flag and name. Flag indicate if you want to restore or not, name is the name of file def upFile(flag, name) - + Dir[@temp_directory+name].each do |file| name_file = File.basename(file) - + if (flag == true) @logger.warn "S3: have found temporary file: "+name_file+", something has crashed before... Prepare for upload in bucket!" end - - if (!File.zero?(file)) + + if (!File.zero?(file)) write_on_bucket(file, name_file) if (flag == true) @@ -236,7 +236,7 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base # This method is used for create new empty temporary files for use. Flag is needed for indicate new subsection time_file. def newFile (flag) - + if (flag == true) @current_final_path = getFinalPath @sizeCounter = 0 @@ -258,7 +258,7 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base if (@tags.size != 0) @tag_path = "" for i in (0..@tags.size-1) - @tag_path += @tags[i].to_s+"." + @tag_path += @tags[i].to_s+"." end end @@ -267,16 +267,16 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base Dir.mkdir(@temp_directory) else @logger.debug "S3: Directory "+@temp_directory+" exist, nothing to do" - end - + end + if (@restore == true ) @logger.debug "S3: is attempting to verify previous crashes..." - - upFile(true, "*.txt") + + upFile(true, "*.txt") end - + newFile(true) - + if (time_file != 0) first_time = true @thread = time_alert(@time_file*60) do @@ -289,14 +289,14 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base end end end - + end - + public def receive(event) return unless output?(event) - - # Prepare format of Events + + # Prepare format of Events if (@format == "plain") message = self.class.format_message(event) elsif (@format == "json") @@ -304,20 +304,20 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base else message = event.to_s end - + if(time_file !=0) @logger.debug "S3: trigger files after "+((@pass_time+60*time_file)-Time.now).to_s end # if specific the size if(size_file !=0) - + if (@tempFile.size < @size_file ) @logger.debug "S3: File have size: "+@tempFile.size.to_s+" and size_file is: "+ @size_file.to_s @logger.debug "S3: put event into: "+File.basename(@tempFile) - # Put the event in the file, now! + # Put the event in the file, now! 
File.open(@tempFile, 'a') do |file| file.puts message file.write "\n" @@ -331,8 +331,8 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base newFile(false) end - - # else we put all in one file + + # else we put all in one file else @logger.debug "S3: put event into "+File.basename(@tempFile) @@ -341,11 +341,11 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base file.write "\n" end end - + end def self.format_message(event) - message = "Date: #{event["@timestamp"]}\n" + message = "Date: #{event[LogStash::Event::TIMESTAMP]}\n" message << "Source: #{event["source"]}\n" message << "Tags: #{event["tags"].join(', ')}\n" message << "Fields: #{event.to_hash.inspect}\n" diff --git a/lib/logstash/outputs/sns.rb b/lib/logstash/outputs/sns.rb index ed91b6b25..f882f65de 100644 --- a/lib/logstash/outputs/sns.rb +++ b/lib/logstash/outputs/sns.rb @@ -112,7 +112,7 @@ class LogStash::Outputs::Sns < LogStash::Outputs::Base end def self.format_message(event) - message = "Date: #{event["@timestamp"]}\n" + message = "Date: #{event.timestamp}\n" message << "Source: #{event["source"]}\n" message << "Tags: #{event["tags"].join(', ')}\n" message << "Fields: #{event.to_hash.inspect}\n" diff --git a/lib/logstash/plugin.rb b/lib/logstash/plugin.rb index 418ccb8a9..ce8de95a5 100644 --- a/lib/logstash/plugin.rb +++ b/lib/logstash/plugin.rb @@ -8,6 +8,8 @@ class LogStash::Plugin attr_accessor :params attr_accessor :logger + NL = "\n" + public def hash params.hash ^ diff --git a/lib/logstash/time_addon.rb b/lib/logstash/time_addon.rb deleted file mode 100644 index a5970332d..000000000 --- a/lib/logstash/time_addon.rb +++ /dev/null @@ -1,25 +0,0 @@ -# encoding: utf-8 -require "logstash/namespace" - -module LogStash::Time - ISO8601_STRFTIME = "%04d-%02d-%02dT%02d:%02d:%02d.%06d%+03d:00".freeze - def self.now - return Time.new.utc - end - - if RUBY_PLATFORM == "java" - JODA_ISO8601_PARSER = org.joda.time.format.ISODateTimeFormat.dateTimeParser - #JODA_ISO8601_PARSER = org.joda.time.format.DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ") - UTC = org.joda.time.DateTimeZone.forID("UTC") - def self.parse_iso8601(t) - millis = JODA_ISO8601_PARSER.parseMillis(t) - return Time.at(millis / 1000, (millis % 1000) * 1000) - end - else - def self.parse_iso8601(t) - # Warning, ruby's Time.parse is *really* terrible and slow. - return unless t.is_a?(String) - return Time.parse(t).gmtime - end - end -end # module LogStash::Time diff --git a/lib/logstash/timestamp.rb b/lib/logstash/timestamp.rb new file mode 100644 index 000000000..a96dfb02c --- /dev/null +++ b/lib/logstash/timestamp.rb @@ -0,0 +1,92 @@ +# encoding: utf-8 +require "logstash/environment" +require "logstash/json" +require "forwardable" +require "date" +require "time" + +module LogStash + class TimestampParserError < StandardError; end + + class Timestamp + extend Forwardable + + def_delegators :@time, :tv_usec, :usec, :year, :iso8601, :to_i, :tv_sec, :to_f, :to_edn + + attr_reader :time + + ISO8601_STRFTIME = "%04d-%02d-%02dT%02d:%02d:%02d.%06d%+03d:00".freeze + ISO8601_PRECISION = 3 + + def initialize(time = Time.new) + @time = time.utc + end + + def self.at(*args) + Timestamp.new(::Time.at(*args)) + end + + def self.parse(*args) + Timestamp.new(::Time.parse(*args)) + end + + def self.now + Timestamp.new(::Time.now) + end + + # coerce tries different strategies based on the time object class to convert into a Timestamp. 
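+ # For instance (illustrative only):
+ #   LogStash::Timestamp.coerce(Time.now)                   # => LogStash::Timestamp
+ #   LogStash::Timestamp.coerce("2014-05-26T21:21:13.000Z") # => LogStash::Timestamp
+ #   LogStash::Timestamp.coerce(:foo)                       # => nil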
+ # @param [String, Time, Timestamp] time the time object to try to coerce + # @return [Timestamp, nil] Timestamp will be returned if successful otherwise nil + # @raise [TimestampParserError] on String with invalid format + def self.coerce(time) + case time + when String + LogStash::Timestamp.parse_iso8601(time) + when LogStash::Timestamp + time + when Time + LogStash::Timestamp.new(time) + else + nil + end + end + + if LogStash::Environment.jruby? + JODA_ISO8601_PARSER = org.joda.time.format.ISODateTimeFormat.dateTimeParser + UTC = org.joda.time.DateTimeZone.forID("UTC") + + def self.parse_iso8601(t) + millis = JODA_ISO8601_PARSER.parseMillis(t) + LogStash::Timestamp.at(millis / 1000, (millis % 1000) * 1000) + rescue => e + raise(TimestampParserError, "invalid timestamp string #{t.inspect}, error=#{e.inspect}") + end + + else + + def self.parse_iso8601(t) + # warning, ruby's Time.parse is *really* terrible and slow. + LogStash::Timestamp.new(::Time.parse(t)) + rescue => e + raise(TimestampParserError, "invalid timestamp string #{t.inspect}, error=#{e.inspect}") + end + end + + def utc + @time.utc # modifies the receiver + self + end + alias_method :gmtime, :utc + + def to_json + LogStash::Json.dump(@time.iso8601(ISO8601_PRECISION)) + end + alias_method :inspect, :to_json + + def to_iso8601 + @time.iso8601(ISO8601_PRECISION) + end + alias_method :to_s, :to_iso8601 + + end +end diff --git a/lib/logstash/util.rb b/lib/logstash/util.rb index 76b5926b3..1ce6cd00e 100644 --- a/lib/logstash/util.rb +++ b/lib/logstash/util.rb @@ -1,5 +1,6 @@ # encoding: utf-8 require "logstash/namespace" +require "logstash/environment" module LogStash::Util UNAME = case RbConfig::CONFIG["host_os"] @@ -14,7 +15,7 @@ module LogStash::Util Java::java.lang.Thread.currentThread.setName(name) end Thread.current[:name] = name - + if UNAME == "linux" require "logstash/util/prctl" # prctl PR_SET_NAME allows up to 16 bytes for a process name @@ -34,7 +35,7 @@ module LogStash::Util dvalue = dst[name] if dvalue.is_a?(Hash) && svalue.is_a?(Hash) dvalue = hash_merge(dvalue, svalue) - elsif svalue.is_a?(Array) + elsif svalue.is_a?(Array) if dvalue.is_a?(Array) # merge arrays without duplicates. dvalue |= svalue @@ -58,7 +59,7 @@ module LogStash::Util return dst end # def self.hash_merge - + # Merge hash 'src' into 'dst' nondestructively # # Duplicate keys will become array values @@ -71,7 +72,7 @@ module LogStash::Util dvalue = dst[name] if dvalue.is_a?(Hash) && svalue.is_a?(Hash) dvalue = hash_merge(dvalue, svalue) - elsif svalue.is_a?(Array) + elsif svalue.is_a?(Array) if dvalue.is_a?(Array) # merge arrays without duplicates. dvalue += svalue @@ -103,4 +104,37 @@ module LogStash::Util end return dst end # def hash_merge_many + + + # normalize method definition based on platform. + # normalize is used to convert an object created through + # json deserialization from JrJackson in :raw mode to pure Ruby + # to support the pure Ruby object monkey patches. + # see logstash/json.rb and logstash/java_integration.rb + + if LogStash::Environment.jruby? + require "java" + + # recursively convert any Java LinkedHashMap and ArrayList to pure Ruby. + # will not recurse into pure Ruby objects. Pure Ruby objects should never + # contain LinkedHashMap or ArrayList since these are only created at + # initial deserialization, anything after (deeper) will be pure Ruby.
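+ # For example (illustrative only), on JRuby:
+ #   data = LogStash::Json.load('{"a":[1,2]}')  # may hold Java LinkedHashMap / ArrayList
+ #   LogStash::Util.normalize(data)             # => {"a" => [1, 2]} as pure Ruby objects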
+ def self.normalize(o) + case o + when Java::JavaUtil::LinkedHashMap + o.inject({}){|r, (k, v)| r[k] = normalize(v); r} + when Java::JavaUtil::ArrayList + o.map{|i| normalize(i)} + else + o + end + end + + else + + # identity function, pure Ruby object don't need normalization. + def self.normalize(o); o; end + end + + end # module LogStash::Util diff --git a/logstash.gemspec b/logstash.gemspec index 4917d83ed..702c5129e 100644 --- a/logstash.gemspec +++ b/logstash.gemspec @@ -17,7 +17,6 @@ Gem::Specification.new do |gem| # Core dependencies gem.add_runtime_dependency "cabin", [">=0.6.0"] #(Apache 2.0 license) - gem.add_runtime_dependency "json" #(ruby license) gem.add_runtime_dependency "minitest" # for running the tests from the jar, (MIT license) gem.add_runtime_dependency "pry" #(ruby license) gem.add_runtime_dependency "stud" #(Apache 2.0 license) @@ -68,9 +67,11 @@ Gem::Specification.new do |gem| gem.add_runtime_dependency "bouncy-castle-java", "1.5.0147" #(MIT license) gem.add_runtime_dependency "jruby-openssl", "0.8.7" #(CPL/GPL/LGPL license) gem.add_runtime_dependency "msgpack-jruby" #(Apache 2.0 license) + gem.add_runtime_dependency "jrjackson" #(Apache 2.0 license) else gem.add_runtime_dependency "excon" #(MIT license) gem.add_runtime_dependency "msgpack" #(Apache 2.0 license) + gem.add_runtime_dependency "oj" #(MIT-style license) end if RUBY_PLATFORM != 'java' diff --git a/spec/codecs/edn.rb b/spec/codecs/edn.rb index e04cc6595..5fa49e581 100644 --- a/spec/codecs/edn.rb +++ b/spec/codecs/edn.rb @@ -1,5 +1,6 @@ require "logstash/codecs/edn" require "logstash/event" +require "logstash/json" require "insist" require "edn" @@ -10,26 +11,46 @@ describe LogStash::Codecs::EDN do context "#decode" do it "should return an event from edn data" do - data = {"foo" => "bar", "baz" => {"bah" => ["a", "b", "c"]}} + data = {"foo" => "bar", "baz" => {"bah" => ["a", "b", "c"]}, "@timestamp" => "2014-05-30T02:52:17.929Z"} subject.decode(data.to_edn) do |event| insist { event }.is_a?(LogStash::Event) insist { event["foo"] } == data["foo"] insist { event["baz"] } == data["baz"] insist { event["bah"] } == data["bah"] + insist { event["@timestamp"].to_iso8601 } == data["@timestamp"] end end end context "#encode" do - it "should return edn data" do - data = {"foo" => "bar", "baz" => {"bah" => ["a","b","c"]}} + it "should return edn data from pure ruby hash" do + data = {"foo" => "bar", "baz" => {"bah" => ["a","b","c"]}, "@timestamp" => "2014-05-30T02:52:17.929Z"} event = LogStash::Event.new(data) got_event = false subject.on_event do |d| - insist { d.chomp } == LogStash::Event.new(data).to_hash.to_edn insist { EDN.read(d)["foo"] } == data["foo"] insist { EDN.read(d)["baz"] } == data["baz"] insist { EDN.read(d)["bah"] } == data["bah"] + insist { EDN.read(d)["@timestamp"] } == "2014-05-30T02:52:17.929Z" + got_event = true + end + subject.encode(event) + insist { got_event } + end + + # this is to test the case where the event data has been produced by json + # deserialization using JrJackson in :raw mode which creates Java LinkedHashMap + # and not Ruby Hash which will not be monkey patched with the #to_edn method + it "should return edn data from deserialized json with normalization" do + data = LogStash::Json.load('{"foo": "bar", "baz": {"bah": ["a","b","c"]}, "@timestamp": "2014-05-30T02:52:17.929Z"}') + event = LogStash::Event.new(data) + got_event = false + subject.on_event do |d| + insist { EDN.read(d)["foo"] } == data["foo"] + insist { EDN.read(d)["baz"] } == data["baz"] + insist { 
EDN.read(d)["bah"] } == data["bah"] + insist { EDN.read(d)["@timestamp"] } == "2014-05-30T02:52:17.929Z" + insist { EDN.read(d)["@timestamp"] } == event["@timestamp"].to_iso8601 got_event = true end subject.encode(event) diff --git a/spec/codecs/edn_lines.rb b/spec/codecs/edn_lines.rb index e5c1b711e..79a25ba84 100644 --- a/spec/codecs/edn_lines.rb +++ b/spec/codecs/edn_lines.rb @@ -1,5 +1,6 @@ require "logstash/codecs/edn_lines" require "logstash/event" +require "logstash/json" require "insist" require "edn" @@ -10,17 +11,18 @@ describe LogStash::Codecs::EDNLines do context "#decode" do it "should return an event from edn data" do - data = {"foo" => "bar", "baz" => {"bah" => ["a", "b", "c"]}} + data = {"foo" => "bar", "baz" => {"bah" => ["a", "b", "c"]}, "@timestamp" => "2014-05-30T02:52:17.929Z"} subject.decode(data.to_edn + "\n") do |event| insist { event }.is_a?(LogStash::Event) insist { event["foo"] } == data["foo"] insist { event["baz"] } == data["baz"] insist { event["bah"] } == data["bah"] + insist { event["@timestamp"].to_iso8601 } == data["@timestamp"] end end it "should return an event from edn data when a newline is recieved" do - data = {"foo" => "bar", "baz" => {"bah" => ["a","b","c"]}} + data = {"foo" => "bar", "baz" => {"bah" => ["a","b","c"]}, "@timestamp" => "2014-05-30T02:52:17.929Z"} subject.decode(data.to_edn) do |event| insist {false} end @@ -29,21 +31,39 @@ describe LogStash::Codecs::EDNLines do insist { event["foo"] } == data["foo"] insist { event["baz"] } == data["baz"] insist { event["bah"] } == data["bah"] + insist { event["@timestamp"].to_iso8601 } == data["@timestamp"] end end end context "#encode" do - it "should return edn data" do - data = {"foo" => "bar", "baz" => {"bah" => ["a","b","c"]}} + it "should return edn data from pure ruby hash" do + data = {"foo" => "bar", "baz" => {"bah" => ["a","b","c"]}, "@timestamp" => "2014-05-30T02:52:17.929Z"} event = LogStash::Event.new(data) got_event = false subject.on_event do |d| - insist { d.chomp } == LogStash::Event.new(data).to_hash.to_edn insist { EDN.read(d)["foo"] } == data["foo"] insist { EDN.read(d)["baz"] } == data["baz"] insist { EDN.read(d)["bah"] } == data["bah"] - got_event = true + insist { EDN.read(d)["@timestamp"] } == "2014-05-30T02:52:17.929Z" + insist { EDN.read(d)["@timestamp"] } == event["@timestamp"].to_iso8601 + got_event = true + end + subject.encode(event) + insist { got_event } + end + + it "should return edn data from deserialized json with normalization" do + data = LogStash::Json.load('{"foo": "bar", "baz": {"bah": ["a","b","c"]}, "@timestamp": "2014-05-30T02:52:17.929Z"}') + event = LogStash::Event.new(data) + got_event = false + subject.on_event do |d| + insist { EDN.read(d)["foo"] } == data["foo"] + insist { EDN.read(d)["baz"] } == data["baz"] + insist { EDN.read(d)["bah"] } == data["bah"] + insist { EDN.read(d)["@timestamp"] } == "2014-05-30T02:52:17.929Z" + insist { EDN.read(d)["@timestamp"] } == event["@timestamp"].to_iso8601 + got_event = true end subject.encode(event) insist { got_event } diff --git a/spec/codecs/graphite.rb b/spec/codecs/graphite.rb index e5be954f1..8be9ef0af 100644 --- a/spec/codecs/graphite.rb +++ b/spec/codecs/graphite.rb @@ -17,7 +17,7 @@ describe LogStash::Codecs::Graphite do insist { event[name] } == value end end - + it "should return multiple events given multiple graphite formated lines" do total_count = Random.rand(20) names = Array.new(total_count) { Random.srand.to_s(36) } @@ -32,7 +32,7 @@ describe LogStash::Codecs::Graphite do end insist { counter }
== total_count end - + it "should not return an event until newline is hit" do name = Random.srand.to_s(36) value = Random.rand*1000 @@ -50,7 +50,7 @@ describe LogStash::Codecs::Graphite do insist { event_returned } end end - + context "#encode" do it "should emit an graphite formatted line" do name = Random.srand.to_s(36) @@ -63,23 +63,23 @@ describe LogStash::Codecs::Graphite do end subject.encode(LogStash::Event.new("@timestamp" => timestamp)) end - + it "should treat fields as metrics if fields as metrics flag is set" do name = Random.srand.to_s(36) value = Random.rand*1000 - timestamp = Time.now.gmtime.to_i + timestamp = Time.now.gmtime subject.fields_are_metrics = true subject.on_event do |event| insist { event.is_a? String } insist { event } == "#{name} #{value} #{timestamp.to_i}\n" end subject.encode(LogStash::Event.new({name => value, "@timestamp" => timestamp})) - + #even if metrics param is set subject.metrics = {"foo" => 4} subject.encode(LogStash::Event.new({name => value, "@timestamp" => timestamp})) end - + it "should change the metric name format when metrics_format is set" do name = Random.srand.to_s(36) value = Random.rand*1000 diff --git a/spec/codecs/json.rb b/spec/codecs/json.rb index 11d879f35..4cb128534 100644 --- a/spec/codecs/json.rb +++ b/spec/codecs/json.rb @@ -1,5 +1,6 @@ require "logstash/codecs/json" require "logstash/event" +require "logstash/json" require "insist" describe LogStash::Codecs::JSON do @@ -10,7 +11,7 @@ describe LogStash::Codecs::JSON do context "#decode" do it "should return an event from json data" do data = {"foo" => "bar", "baz" => {"bah" => ["a","b","c"]}} - subject.decode(data.to_json) do |event| + subject.decode(LogStash::Json.dump(data)) do |event| insist { event.is_a? LogStash::Event } insist { event["foo"] } == data["foo"] insist { event["baz"] } == data["baz"] @@ -70,9 +71,9 @@ describe LogStash::Codecs::JSON do got_event = false subject.on_event do |d| insist { d.chomp } == LogStash::Event.new(data).to_json - insist { JSON.parse(d)["foo"] } == data["foo"] - insist { JSON.parse(d)["baz"] } == data["baz"] - insist { JSON.parse(d)["bah"] } == data["bah"] + insist { LogStash::Json.load(d)["foo"] } == data["foo"] + insist { LogStash::Json.load(d)["baz"] } == data["baz"] + insist { LogStash::Json.load(d)["bah"] } == data["bah"] got_event = true end subject.encode(event) diff --git a/spec/codecs/json_lines.rb b/spec/codecs/json_lines.rb index 40cdcba52..630e3fa7b 100644 --- a/spec/codecs/json_lines.rb +++ b/spec/codecs/json_lines.rb @@ -1,5 +1,6 @@ require "logstash/codecs/json_lines" require "logstash/event" +require "logstash/json" require "insist" describe LogStash::Codecs::JSONLines do @@ -10,17 +11,17 @@ describe LogStash::Codecs::JSONLines do context "#decode" do it "should return an event from json data" do data = {"foo" => "bar", "baz" => {"bah" => ["a","b","c"]}} - subject.decode(data.to_json+"\n") do |event| + subject.decode(LogStash::Json.dump(data) + "\n") do |event| insist { event.is_a? 
LogStash::Event } insist { event["foo"] } == data["foo"] insist { event["baz"] } == data["baz"] insist { event["bah"] } == data["bah"] end end - + it "should return an event from json data when a newline is recieved" do data = {"foo" => "bar", "baz" => {"bah" => ["a","b","c"]}} - subject.decode(data.to_json) do |event| + subject.decode(LogStash::Json.dump(data)) do |event| insist {false} end subject.decode("\n") do |event| @@ -64,10 +65,10 @@ describe LogStash::Codecs::JSONLines do event = LogStash::Event.new(data) got_event = false subject.on_event do |d| - insist { d.chomp } == LogStash::Event.new(data).to_json - insist { JSON.parse(d)["foo"] } == data["foo"] - insist { JSON.parse(d)["baz"] } == data["baz"] - insist { JSON.parse(d)["bah"] } == data["bah"] + insist { d } == "#{LogStash::Event.new(data).to_json}\n" + insist { LogStash::Json.load(d)["foo"] } == data["foo"] + insist { LogStash::Json.load(d)["baz"] } == data["baz"] + insist { LogStash::Json.load(d)["bah"] } == data["bah"] got_event = true end subject.encode(event) diff --git a/spec/codecs/json_spooler.rb b/spec/codecs/json_spooler.rb index 7cb78da0b..20aef79b5 100644 --- a/spec/codecs/json_spooler.rb +++ b/spec/codecs/json_spooler.rb @@ -1,43 +1,47 @@ require "logstash/codecs/json_spooler" require "logstash/event" +require "logstash/json" require "insist" describe LogStash::Codecs::JsonSpooler do - # subject do - # next LogStash::Codecs::JsonSpooler.new - # end + subject do + # mute deprecation message + expect_any_instance_of(LogStash::Codecs::JsonSpooler).to receive(:register).and_return(nil) - # context "#decode" do - # it "should return an event from spooled json data" do - # data = {"a" => 1} - # events = [LogStash::Event.new(data), LogStash::Event.new(data), - # LogStash::Event.new(data)] - # subject.decode(events.to_json) do |event| - # insist { event.is_a? LogStash::Event } - # insist { event["a"] } == data["a"] - # end - # end - # end + LogStash::Codecs::JsonSpooler.new + end - # context "#encode" do - # it "should return spooled json data" do - # data = {"foo" => "bar", "baz" => {"bah" => ["a","b","c"]}} - # subject.spool_size = 3 - # got_event = false - # subject.on_event do |d| - # events = JSON.parse(d) - # insist { events.is_a? Array } - # insist { events[0].is_a? LogStash::Event } - # insist { events[0]["foo"] } == data["foo"] - # insist { events[0]["baz"] } == data["baz"] - # insist { events[0]["bah"] } == data["bah"] - # insist { events.length } == 3 - # got_event = true - # end - # 3.times do - # subject.encode(LogStash::Event.new(data)) - # end - # insist { got_event } - # end - # end + context "#decode" do + it "should return an event from spooled json data" do + data = {"a" => 1} + events = [LogStash::Event.new(data), LogStash::Event.new(data), + LogStash::Event.new(data)] + subject.decode(LogStash::Json.dump(events)) do |event| + insist { event.is_a? LogStash::Event } + insist { event["a"] } == data["a"] + end + end + end + + context "#encode" do + it "should return spooled json data" do + data = {"foo" => "bar", "baz" => {"bah" => ["a","b","c"]}} + subject.spool_size = 3 + got_event = false + subject.on_event do |d| + events = LogStash::Json.load(d) + insist { events.is_a? Array } + insist { events[0].is_a? 
LogStash::Event } + insist { events[0]["foo"] } == data["foo"] + insist { events[0]["baz"] } == data["baz"] + insist { events[0]["bah"] } == data["bah"] + insist { events.length } == 3 + got_event = true + end + 3.times do + subject.encode(LogStash::Event.new(data)) + end + insist { got_event } + end + end end diff --git a/spec/codecs/msgpack.rb b/spec/codecs/msgpack.rb index fc36c1bab..ba0c451bd 100644 --- a/spec/codecs/msgpack.rb +++ b/spec/codecs/msgpack.rb @@ -2,38 +2,56 @@ require "logstash/codecs/msgpack" require "logstash/event" require "insist" -# Skip msgpack for now since Hash#to_msgpack seems to not be a valid method? -describe LogStash::Codecs::Msgpack, :if => false do +describe LogStash::Codecs::Msgpack do subject do next LogStash::Codecs::Msgpack.new end context "#decode" do it "should return an event from msgpack data" do - data = {"foo" => "bar", "baz" => {"bah" => ["a","b","c"]}} - subject.decode(data.to_msgpack) do |event| + data = {"foo" => "bar", "baz" => {"bah" => ["a","b","c"]}, "@timestamp" => "2014-05-30T02:52:17.929Z"} + subject.decode(MessagePack.pack(data)) do |event| insist { event.is_a? LogStash::Event } insist { event["foo"] } == data["foo"] insist { event["baz"] } == data["baz"] insist { event["bah"] } == data["bah"] + insist { event["@timestamp"].to_iso8601 } == data["@timestamp"] end end end context "#encode" do - it "should return msgpack data" do - data = {"foo" => "bar", "baz" => {"bah" => ["a","b","c"]}} + it "should return msgpack data from pure ruby hash" do + data = {"foo" => "bar", "baz" => {"bah" => ["a","b","c"]}, "@timestamp" => "2014-05-30T02:52:17.929Z"} event = LogStash::Event.new(data) got_event = false subject.on_event do |d| - insist { d } == LogStash::Event.new(data).to_hash.to_msgpack insist { MessagePack.unpack(d)["foo"] } == data["foo"] insist { MessagePack.unpack(d)["baz"] } == data["baz"] insist { MessagePack.unpack(d)["bah"] } == data["bah"] + insist { MessagePack.unpack(d)["@timestamp"] } == "2014-05-30T02:52:17.929Z" + insist { MessagePack.unpack(d)["@timestamp"] } == event["@timestamp"].to_iso8601 + got_event = true + end + subject.encode(event) + insist { got_event } + end + + it "should return msgpack data from deserialized json with normalization" do + data = LogStash::Json.load('{"foo": "bar", "baz": {"bah": ["a","b","c"]}, "@timestamp": "2014-05-30T02:52:17.929Z"}') + event = LogStash::Event.new(data) + got_event = false + subject.on_event do |d| + insist { MessagePack.unpack(d)["foo"] } == data["foo"] + insist { MessagePack.unpack(d)["baz"] } == data["baz"] + insist { MessagePack.unpack(d)["bah"] } == data["bah"] + insist { MessagePack.unpack(d)["@timestamp"] } == "2014-05-30T02:52:17.929Z" + insist { MessagePack.unpack(d)["@timestamp"] } == event["@timestamp"].to_iso8601 got_event = true end subject.encode(event) insist { got_event } end end + end diff --git a/spec/codecs/oldlogstashjson.rb b/spec/codecs/oldlogstashjson.rb index 163980637..3bb037b1a 100644 --- a/spec/codecs/oldlogstashjson.rb +++ b/spec/codecs/oldlogstashjson.rb @@ -1,5 +1,6 @@ require "logstash/codecs/oldlogstashjson" require "logstash/event" +require "logstash/json" require "insist" describe LogStash::Codecs::OldLogStashJSON do @@ -11,7 +12,7 @@ describe LogStash::Codecs::OldLogStashJSON do it "should return a new (v1) event from old (v0) json data" do data = {"@message" => "bar", "@source_host" => "localhost", "@tags" => ["a","b","c"]} - subject.decode(data.to_json) do |event| + subject.decode(LogStash::Json.dump(data)) do |event| insist { event.is_a? 
LogStash::Event } insist { event["@timestamp"] } != nil insist { event["type"] } == data["@type"] @@ -38,14 +39,14 @@ describe LogStash::Codecs::OldLogStashJSON do event = LogStash::Event.new(data) got_event = false subject.on_event do |d| - insist { JSON.parse(d)["@timestamp"] } != nil - insist { JSON.parse(d)["@type"] } == data["type"] - insist { JSON.parse(d)["@message"] } == data["message"] - insist { JSON.parse(d)["@source_host"] } == data["host"] - insist { JSON.parse(d)["@source_path"] } == data["path"] - insist { JSON.parse(d)["@tags"] } == data["tags"] - insist { JSON.parse(d)["@fields"]["bah"] } == "baz" - insist { JSON.parse(d)["@fields"]["@version"] } == nil + insist { LogStash::Json.load(d)["@timestamp"] } != nil + insist { LogStash::Json.load(d)["@type"] } == data["type"] + insist { LogStash::Json.load(d)["@message"] } == data["message"] + insist { LogStash::Json.load(d)["@source_host"] } == data["host"] + insist { LogStash::Json.load(d)["@source_path"] } == data["path"] + insist { LogStash::Json.load(d)["@tags"] } == data["tags"] + insist { LogStash::Json.load(d)["@fields"]["bah"] } == "baz" + insist { LogStash::Json.load(d)["@fields"]["@version"] } == nil got_event = true end subject.encode(event) diff --git a/spec/event.rb b/spec/event.rb index 17a283da0..e885c4f6e 100644 --- a/spec/event.rb +++ b/spec/event.rb @@ -247,4 +247,51 @@ describe LogStash::Event do end end end + + context "timestamp initialization" do + let(:logger) { double("logger") } + + it "should coerce timestamp" do + t = Time.iso8601("2014-06-12T00:12:17.114Z") + expect(LogStash::Timestamp).to receive(:coerce).exactly(3).times.and_call_original + insist{LogStash::Event.new("@timestamp" => t).timestamp.to_i} == t.to_i + insist{LogStash::Event.new("@timestamp" => LogStash::Timestamp.new(t)).timestamp.to_i} == t.to_i + insist{LogStash::Event.new("@timestamp" => "2014-06-12T00:12:17.114Z").timestamp.to_i} == t.to_i + end + + it "should assign current time when no timestamp" do + ts = LogStash::Timestamp.now + expect(LogStash::Timestamp).to receive(:now).and_return(ts) + insist{LogStash::Event.new({}).timestamp.to_i} == ts.to_i + end + + it "should tag and warn for invalid value" do + ts = LogStash::Timestamp.now + expect(LogStash::Timestamp).to receive(:now).twice.and_return(ts) + expect(Cabin::Channel).to receive(:get).twice.and_return(logger) + expect(logger).to receive(:warn).twice + + event = LogStash::Event.new("@timestamp" => :foo) + insist{event.timestamp.to_i} == ts.to_i + insist{event["tags"]} == [LogStash::Event::TIMESTAMP_FAILURE_TAG] + insist{event[LogStash::Event::TIMESTAMP_FAILURE_FIELD]} == :foo + + event = LogStash::Event.new("@timestamp" => 666) + insist{event.timestamp.to_i} == ts.to_i + insist{event["tags"]} == [LogStash::Event::TIMESTAMP_FAILURE_TAG] + insist{event[LogStash::Event::TIMESTAMP_FAILURE_FIELD]} == 666 + end + + it "should tag and warn for invalid string format" do + ts = LogStash::Timestamp.now + expect(LogStash::Timestamp).to receive(:now).and_return(ts) + expect(Cabin::Channel).to receive(:get).and_return(logger) + expect(logger).to receive(:warn) + + event = LogStash::Event.new("@timestamp" => "foo") + insist{event.timestamp.to_i} == ts.to_i + insist{event["tags"]} == [LogStash::Event::TIMESTAMP_FAILURE_TAG] + insist{event[LogStash::Event::TIMESTAMP_FAILURE_FIELD]} == "foo" + end + end end diff --git a/spec/examples/graphite-input.rb b/spec/examples/graphite-input.rb index fc86b09c4..b1f4e96f5 100644 --- a/spec/examples/graphite-input.rb +++ 
b/spec/examples/graphite-input.rb @@ -37,7 +37,7 @@ describe "receive graphite input", :if => RUBY_ENGINE == "jruby" do insist { subject["name"] } == "foo.bar.baz" insist { subject["value"] } == 4025.34 - insist { subject["@timestamp"] } == Time.iso8601("2013-03-30T01:22:02.000Z") + insist { subject["@timestamp"].time } == Time.iso8601("2013-03-30T01:22:02.000Z") end end diff --git a/spec/examples/parse-apache-logs.rb b/spec/examples/parse-apache-logs.rb index c7e145379..4407a95f2 100644 --- a/spec/examples/parse-apache-logs.rb +++ b/spec/examples/parse-apache-logs.rb @@ -55,7 +55,7 @@ describe "apache common log format", :if => RUBY_ENGINE == "jruby" do insist { subject["agent"] } == "\"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:14.0) Gecko/20100101 Firefox/14.0.1\"" # Verify date parsing - insist { subject.timestamp } == Time.iso8601("2012-08-30T00:17:38.000Z") + insist { subject.timestamp.time } == Time.iso8601("2012-08-30T00:17:38.000Z") end sample '61.135.248.195 - - [26/Sep/2012:11:49:20 -0400] "GET /projects/keynav/ HTTP/1.1" 200 18985 "" "Mozilla/5.0 (compatible; YodaoBot/1.0; http://www.yodao.com/help/webmaster/spider/; )"' do diff --git a/spec/filters/date.rb b/spec/filters/date.rb index dc27d39d6..2419f4399 100644 --- a/spec/filters/date.rb +++ b/spec/filters/date.rb @@ -52,9 +52,9 @@ RUBY_ENGINE == "jruby" and describe LogStash::Filters::Date do times.each do |input, output| sample("mydate" => input) do - begin + begin insist { subject["mydate"] } == input - insist { subject["@timestamp"] } == Time.iso8601(output).utc + insist { subject["@timestamp"].time } == Time.iso8601(output).utc rescue #require "pry"; binding.pry raise @@ -83,7 +83,7 @@ RUBY_ENGINE == "jruby" and describe LogStash::Filters::Date do times.each do |input, output| sample("mydate" => input) do insist { subject["mydate"] } == input - insist { subject["@timestamp"] } == Time.iso8601(output).utc + insist { subject["@timestamp"].time } == Time.iso8601(output).utc end end # times.each end @@ -109,7 +109,7 @@ RUBY_ENGINE == "jruby" and describe LogStash::Filters::Date do times.each do |input, output| sample("mydate" => input) do insist { subject["mydate"] } == input - insist { subject["@timestamp"] } == Time.iso8601(output).utc + insist { subject["@timestamp"].time } == Time.iso8601(output).utc end end # times.each end @@ -126,7 +126,7 @@ RUBY_ENGINE == "jruby" and describe LogStash::Filters::Date do sample("mydate" => "1350414944.123456") do # Joda time only supports milliseconds :\ - insist { subject.timestamp } == Time.iso8601("2012-10-16T12:15:44.123-07:00").utc + insist { subject.timestamp.time } == Time.iso8601("2012-10-16T12:15:44.123-07:00").utc end end @@ -153,7 +153,7 @@ RUBY_ENGINE == "jruby" and describe LogStash::Filters::Date do times.each do |input, output| sample("mydate" => input) do insist { subject["mydate"] } == input - insist { subject["@timestamp"] } == Time.iso8601(output) + insist { subject["@timestamp"].time } == Time.iso8601(output) end end # times.each end @@ -177,7 +177,7 @@ RUBY_ENGINE == "jruby" and describe LogStash::Filters::Date do locale => "en" } } - output { + output { null { } } CONFIG @@ -199,13 +199,13 @@ RUBY_ENGINE == "jruby" and describe LogStash::Filters::Date do # Try without leading "@" sample("t" => "4000000050d506482dbdf024") do - insist { subject.timestamp } == Time.iso8601("2012-12-22T01:00:46.767Z").utc + insist { subject.timestamp.time } == Time.iso8601("2012-12-22T01:00:46.767Z").utc end # Should still parse successfully if it's a full tai64n time (with leading # 
'@') sample("t" => "@4000000050d506482dbdf024") do - insist { subject.timestamp } == Time.iso8601("2012-12-22T01:00:46.767Z").utc + insist { subject.timestamp.time } == Time.iso8601("2012-12-22T01:00:46.767Z").utc end end @@ -223,28 +223,28 @@ RUBY_ENGINE == "jruby" and describe LogStash::Filters::Date do sample("mydate" => time) do insist { subject["mydate"] } == time - insist { subject["@timestamp"] } == Time.iso8601(time).utc + insist { subject["@timestamp"].time } == Time.iso8601(time).utc end end - + describe "support deep nested field access" do config <<-CONFIG - filter { + filter { date { match => [ "[data][deep]", "ISO8601" ] locale => "en" } } CONFIG - + sample("data" => { "deep" => "2013-01-01T00:00:00.000Z" }) do - insist { subject["@timestamp"] } == Time.iso8601("2013-01-01T00:00:00.000Z").utc + insist { subject["@timestamp"].time } == Time.iso8601("2013-01-01T00:00:00.000Z").utc end end describe "failing to parse should not throw an exception" do config <<-CONFIG - filter { + filter { date { match => [ "thedate", "yyyy/MM/dd" ] locale => "en" @@ -259,7 +259,7 @@ RUBY_ENGINE == "jruby" and describe LogStash::Filters::Date do describe "success to parse should apply on_success config(add_tag,add_field...)" do config <<-CONFIG - filter { + filter { date { match => [ "thedate", "yyyy/MM/dd" ] add_tag => "tagged" @@ -275,7 +275,7 @@ RUBY_ENGINE == "jruby" and describe LogStash::Filters::Date do describe "failing to parse should not apply on_success config(add_tag,add_field...)" do config <<-CONFIG - filter { + filter { date { match => [ "thedate", "yyyy/MM/dd" ] add_tag => "tagged" @@ -308,7 +308,7 @@ RUBY_ENGINE == "jruby" and describe LogStash::Filters::Date do times.each do |input, output| sample("mydate" => input) do insist { subject["mydate"] } == input - insist { subject["@timestamp"] } == Time.iso8601(output).utc + insist { subject["@timestamp"].time } == Time.iso8601(output).utc end end # times.each end diff --git a/spec/filters/json.rb b/spec/filters/json.rb index 7041a04ba..b571b9b40 100644 --- a/spec/filters/json.rb +++ b/spec/filters/json.rb @@ -1,5 +1,6 @@ require "test_utils" require "logstash/filters/json" +require "logstash/timestamp" describe LogStash::Filters::Json do extend LogStash::RSpec @@ -16,7 +17,7 @@ describe LogStash::Filters::Json do sample '{ "hello": "world", "list": [ 1, 2, 3 ], "hash": { "k": "v" } }' do insist { subject["hello"] } == "world" - insist { subject["list" ] } == [1,2,3] + insist { subject["list" ].to_a } == [1,2,3] # to_a for JRuby + JrJackson which creates Java ArrayList insist { subject["hash"] } == { "k" => "v" } end end @@ -34,7 +35,7 @@ describe LogStash::Filters::Json do sample '{ "hello": "world", "list": [ 1, 2, 3 ], "hash": { "k": "v" } }' do insist { subject["data"]["hello"] } == "world" - insist { subject["data"]["list" ] } == [1,2,3] + insist { subject["data"]["list" ].to_a } == [1,2,3] # to_a for JRuby + JrJackson which creates Java ArrayList insist { subject["data"]["hash"] } == { "k" => "v" } end end @@ -65,8 +66,8 @@ describe LogStash::Filters::Json do CONFIG sample "{ \"@timestamp\": \"2013-10-19T00:14:32.996Z\" }" do - insist { subject["@timestamp"] }.is_a?(Time) - insist { subject["@timestamp"].to_json } == "\"2013-10-19T00:14:32.996Z\"" + insist { subject["@timestamp"] }.is_a?(LogStash::Timestamp) + insist { LogStash::Json.dump(subject["@timestamp"]) } == "\"2013-10-19T00:14:32.996Z\"" end end diff --git a/spec/inputs/elasticsearch.rb b/spec/inputs/elasticsearch.rb new file mode 100644 index 000000000..d695a2ed8 ---
/dev/null +++ b/spec/inputs/elasticsearch.rb @@ -0,0 +1,80 @@ +require "test_utils" +require "logstash/inputs/elasticsearch" + +describe "inputs/elasticsearch" do + extend LogStash::RSpec + + search_response = <<-RESPONSE + { + "_scroll_id":"xxx", + "took":5, + "timed_out":false, + "_shards":{"total":15,"successful":15,"failed":0}, + "hits":{ + "total":1000050, + "max_score":1.0, + "hits":[ + { + "_index":"logstash2", + "_type":"logs", + "_id":"AmaqL7VuSWKF-F6N_Gz72g", + "_score":1.0, + "_source" : { + "message":"foobar", + "@version":"1", + "@timestamp":"2014-05-19T21:08:39.000Z", + "host":"colin-mbp13r" + } + } + ] + } + } + RESPONSE + + scroll_response = <<-RESPONSE + { + "hits":{ + "hits":[] + } + } + RESPONSE + + config <<-CONFIG + input { + elasticsearch { + host => "localhost" + scan => false + } + } + CONFIG + + it "should retrieve json event from elasticsearch" do + # I somewhat duplicated our "input" rspec extension because I needed to add mocks for the actual ES calls + # and rspec expectations need to be in an "it" statement but the "input" extension defines the "it" + # TODO(colin) see how we can improve our rspec extension to better integrate in these scenarios + + expect_any_instance_of(LogStash::Inputs::Elasticsearch).to receive(:execute_search_request).and_return(search_response) + expect_any_instance_of(LogStash::Inputs::Elasticsearch).to receive(:execute_scroll_request).with(any_args).and_return(scroll_response) + + pipeline = LogStash::Pipeline.new(config) + queue = Queue.new + pipeline.instance_eval do + @output_func = lambda { |event| queue << event } + end + pipeline_thread = Thread.new { pipeline.run } + event = queue.pop + + insist { event["message"] } == "foobar" + + # do not call pipeline.shutdown here, as it will stop the plugin execution randomly + # and maybe kill input before calling execute_scroll_request. + # TODO(colin) we should rework the pipeline shutdown to allow a soft/clean shutdown mechanism, + # using a shutdown event which can be fed into each plugin queue and when the plugin sees it, + # exits after completing its processing. + # + # pipeline.shutdown + # + # instead, since our scroll_response will terminate the plugin, we can just join the pipeline thread + pipeline_thread.join + end +end diff --git a/spec/inputs/gelf.rb b/spec/inputs/gelf.rb index e2cab136d..458b34a64 100644 --- a/spec/inputs/gelf.rb +++ b/spec/inputs/gelf.rb @@ -11,39 +11,39 @@ describe "inputs/gelf" do gelfclient = GELF::Notifier.new(host,port,chunksize) config <<-CONFIG -input { - gelf { - port => "#{port}" - host => "#{host}" - } -} + input { + gelf { + port => "#{port}" + host => "#{host}" + } + } CONFIG input do |pipeline, queue| Thread.new { pipeline.run } sleep 0.1 while !pipeline.ready? - - # generate random characters (message is zipped!) from printable ascii ( SPACE till ~ ) + + # generate random characters (message is zipped!) from printable ascii ( SPACE till ~ ) # to trigger gelf chunking s = StringIO.new - for i in 1..2000 - s << 32 + rand(126-32) + for i in 1..2000 + s << 32 + rand(126-32) end large_random = s.string - - [ "hello", - "world", - large_random, - "we survived gelf!" + + [ "hello", + "world", + large_random, + "we survived gelf!"
+ ].each do |m| + gelfclient.notify!( "short_message" => m ) + # poll at most 10 times + waits = 0 while waits < 10 and queue.size == 0 - sleep 0.1 - waits += 1 + sleep 0.1 + waits += 1 end - insist { queue.size } > 0 + insist { queue.size } > 0 insist { queue.pop["message"] } == m end diff --git a/spec/inputs/tcp.rb b/spec/inputs/tcp.rb index e4ea8312a..b666be464 100644 --- a/spec/inputs/tcp.rb +++ b/spec/inputs/tcp.rb @@ -1,6 +1,7 @@ # coding: utf-8 require "test_utils" require "socket" +require "logstash/json" describe "inputs/tcp", :socket => true do extend LogStash::RSpec @@ -89,7 +90,7 @@ describe "inputs/tcp", :socket => true do } socket = Stud.try(5.times) { TCPSocket.new("127.0.0.1", port) } - socket.puts(data.to_json) + socket.puts(LogStash::Json.dump(data)) socket.close event = queue.pop @@ -123,7 +124,7 @@ describe "inputs/tcp", :socket => true do } socket = Stud.try(5.times) { TCPSocket.new("127.0.0.1", port) } - socket.puts(data.to_json) + socket.puts(LogStash::Json.dump(data)) socket.close event = queue.pop @@ -157,7 +158,7 @@ describe "inputs/tcp", :socket => true do socket = Stud.try(5.times) { TCPSocket.new("127.0.0.1", port) } (1..5).each do |idx| data["idx"] = idx - socket.puts(data.to_json+"\n") + socket.puts(LogStash::Json.dump(data) + "\n") end # do socket.close diff --git a/spec/json.rb b/spec/json.rb new file mode 100644 index 000000000..147b6196d --- /dev/null +++ b/spec/json.rb @@ -0,0 +1,94 @@ +# encoding: utf-8 +require "logstash/json" +require "logstash/environment" +require "logstash/util" + +describe LogStash::Json do + + let(:hash) {{"a" => 1}} + let(:json_hash) {"{\"a\":1}"} + + let(:string) {"foobar"} + let(:json_string) {"\"foobar\""} + + let(:array) {["foo", "bar"]} + let(:json_array) {"[\"foo\",\"bar\"]"} + + let(:multi) { + [ + {:ruby => "foo bar baz", :json => "\"foo bar baz\""}, + {:ruby => "1", :json => "\"1\""}, + {:ruby => {"a" => true}, :json => "{\"a\":true}"}, + {:ruby => {"a" => nil}, :json => "{\"a\":null}"}, + {:ruby => ["a", "b"], :json => "[\"a\",\"b\"]"}, + {:ruby => [1, 2], :json => "[1,2]"}, + {:ruby => [1, nil], :json => "[1,null]"}, + {:ruby => {"a" => [1, 2]}, :json => "{\"a\":[1,2]}"}, + {:ruby => {"a" => {"b" => 2}}, :json => "{\"a\":{\"b\":2}}"}, + # {:ruby => , :json => }, + ] + } + + if LogStash::Environment.jruby? 
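+  # note: the LogStash::Json facade under test delegates to JrJackson on JRuby and
+  # to Oj on MRI (see lib/logstash/json.rb and the gemspec runtime dependencies),
+  # so each branch below asserts against its own platform's backend.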
+ + ### JRuby specific + + context "jruby deserialize" do + it "should respond to load and deserialize object" do + expect(JrJackson::Raw).to receive(:parse_raw).with(json_hash).and_call_original + expect(LogStash::Json.load(json_hash)).to eql(hash) + end + end + + context "jruby serialize" do + it "should respond to dump and serialize object" do + expect(JrJackson::Json).to receive(:dump).with(string).and_call_original + expect(LogStash::Json.dump(string)).to eql(json_string) + end + + it "should call JrJackson::Raw.generate for Hash" do + expect(JrJackson::Raw).to receive(:generate).with(hash).and_call_original + expect(LogStash::Json.dump(hash)).to eql(json_hash) + end + + it "should call JrJackson::Raw.generate for Array" do + expect(JrJackson::Raw).to receive(:generate).with(array).and_call_original + expect(LogStash::Json.dump(array)).to eql(json_array) + end + end + else + + ### MRI specific + + it "should respond to load and deserialize object on mri" do + expect(Oj).to receive(:load).with(json_hash).and_call_original + expect(LogStash::Json.load(json_hash)).to eql(hash) + end + + it "should respond to dump and serialize object on mri" do + expect(Oj).to receive(:dump).with(hash, anything).and_call_original + expect(LogStash::Json.dump(hash)).to eql(json_hash) + end + end + + ### non-specific + + it "should correctly deserialize" do + multi.each do |test| + # because JrJackson in :raw mode uses Java::JavaUtil::LinkedHashMap and + # Java::JavaUtil::ArrayList, we must cast to compare. + # other than that, they quack like their Ruby equivalents + expect(LogStash::Util.normalize(LogStash::Json.load(test[:json]))).to eql(test[:ruby]) + end + end + + it "should correctly serialize" do + multi.each do |test| + expect(LogStash::Json.dump(test[:ruby])).to eql(test[:json]) + end + end + + it "should raise Json::ParserError on invalid json" do + expect{LogStash::Json.load("abc")}.to raise_error LogStash::Json::ParserError + end +end diff --git a/spec/outputs/elasticsearch.rb b/spec/outputs/elasticsearch.rb index a41955e77..836c9ef56 100644 --- a/spec/outputs/elasticsearch.rb +++ b/spec/outputs/elasticsearch.rb @@ -1,6 +1,7 @@ require "test_utils" require "ftw" require "logstash/plugin" +require "logstash/json" describe "outputs/elasticsearch" do extend LogStash::RSpec @@ -53,7 +54,7 @@ describe "outputs/elasticsearch" do data = "" response = ftw.get!("http://127.0.0.1:9200/#{index}/_count?q=*") response.read_body { |chunk| data << chunk } - result = JSON.parse(data) + result = LogStash::Json.load(data) count = result["count"] insist { count } == event_count end @@ -61,7 +62,7 @@ describe "outputs/elasticsearch" do response = ftw.get!("http://127.0.0.1:9200/#{index}/_search?q=*&size=1000") data = "" response.read_body { |chunk| data << chunk } - result = JSON.parse(data) + result = LogStash::Json.load(data) result["hits"]["hits"].each do |doc|
insist { doc["_type"] } == "logs" end @@ -151,7 +152,7 @@ describe "outputs/elasticsearch" do data = "" response = ftw.get!("http://127.0.0.1:9200/#{index}/_count?q=*") response.read_body { |chunk| data << chunk } - result = JSON.parse(data) + result = LogStash::Json.load(data) count = result["count"] insist { count } == event_count end @@ -159,7 +160,7 @@ describe "outputs/elasticsearch" do response = ftw.get!("http://127.0.0.1:9200/#{index}/_search?q=*&size=1000") data = "" response.read_body { |chunk| data << chunk } - result = JSON.parse(data) + result = LogStash::Json.load(data) result["hits"]["hits"].each do |doc| insist { doc["_type"] } == "generated" end @@ -195,7 +196,7 @@ describe "outputs/elasticsearch" do data = "" response = ftw.get!("http://127.0.0.1:9200/#{index_name}/_count?q=*") response.read_body { |chunk| data << chunk } - result = JSON.parse(data) + result = LogStash::Json.load(data) count = result["count"] insist { count } == 100 end @@ -203,7 +204,7 @@ describe "outputs/elasticsearch" do response = ftw.get!("http://127.0.0.1:9200/#{index_name}/_search?q=*&size=1000") data = "" response.read_body { |chunk| data << chunk } - result = JSON.parse(data) + result = LogStash::Json.load(data) result["hits"]["hits"].each do |doc| insist { doc["_type"] } == "logs" end @@ -241,7 +242,7 @@ describe "outputs/elasticsearch" do data = "" response = ftw.get!("http://127.0.0.1:9200/#{index}/_count?q=*") response.read_body { |chunk| data << chunk } - result = JSON.parse(data) + result = LogStash::Json.load(data) count = result["count"] insist { count } == event_count end @@ -249,7 +250,7 @@ describe "outputs/elasticsearch" do response = ftw.get!("http://127.0.0.1:9200/#{index}/_search?q=*&size=1000") data = "" response.read_body { |chunk| data << chunk } - result = JSON.parse(data) + result = LogStash::Json.load(data) result["hits"]["hits"].each do |doc| insist { doc["_type"] } == "generated" end diff --git a/spec/outputs/elasticsearch_http.rb b/spec/outputs/elasticsearch_http.rb index d1b1072e0..f668b3719 100644 --- a/spec/outputs/elasticsearch_http.rb +++ b/spec/outputs/elasticsearch_http.rb @@ -1,4 +1,5 @@ require "test_utils" +require "logstash/json" describe "outputs/elasticsearch_http", :elasticsearch => true do extend LogStash::RSpec @@ -45,7 +46,7 @@ describe "outputs/elasticsearch_http", :elasticsearch => true do data = "" response = ftw.get!("http://127.0.0.1:9200/#{index}/_count?q=*") response.read_body { |chunk| data << chunk } - result = JSON.parse(data) + result = LogStash::Json.load(data) count = result["count"] insist { count } == event_count end @@ -53,7 +54,7 @@ describe "outputs/elasticsearch_http", :elasticsearch => true do response = ftw.get!("http://127.0.0.1:9200/#{index}/_search?q=*&size=1000") data = "" response.read_body { |chunk| data << chunk } - result = JSON.parse(data) + result = LogStash::Json.load(data) result["hits"]["hits"].each do |doc| # With no 'index_type' set, the document type should be the type # set on the input @@ -96,7 +97,7 @@ describe "outputs/elasticsearch_http", :elasticsearch => true do data = "" response = ftw.get!("http://127.0.0.1:9200/#{index}/_count?q=*") response.read_body { |chunk| data << chunk } - result = JSON.parse(data) + result = LogStash::Json.load(data) count = result["count"] insist { count } == event_count end @@ -104,7 +105,7 @@ describe "outputs/elasticsearch_http", :elasticsearch => true do response = ftw.get!("http://127.0.0.1:9200/#{index}/_search?q=*&size=1000") data = "" response.read_body { |chunk| data << chunk 
diff --git a/spec/outputs/elasticsearch_http.rb b/spec/outputs/elasticsearch_http.rb
index d1b1072e0..f668b3719 100644
--- a/spec/outputs/elasticsearch_http.rb
+++ b/spec/outputs/elasticsearch_http.rb
@@ -1,4 +1,5 @@
 require "test_utils"
+require "logstash/json"
 
 describe "outputs/elasticsearch_http", :elasticsearch => true do
   extend LogStash::RSpec
@@ -45,7 +46,7 @@ describe "outputs/elasticsearch_http", :elasticsearch => true do
       data = ""
       response = ftw.get!("http://127.0.0.1:9200/#{index}/_count?q=*")
       response.read_body { |chunk| data << chunk }
-      result = JSON.parse(data)
+      result = LogStash::Json.load(data)
       count = result["count"]
       insist { count } == event_count
     end
@@ -53,7 +54,7 @@ describe "outputs/elasticsearch_http", :elasticsearch => true do
     response = ftw.get!("http://127.0.0.1:9200/#{index}/_search?q=*&size=1000")
     data = ""
     response.read_body { |chunk| data << chunk }
-    result = JSON.parse(data)
+    result = LogStash::Json.load(data)
     result["hits"]["hits"].each do |doc|
       # With no 'index_type' set, the document type should be the type
       # set on the input
@@ -96,7 +97,7 @@ describe "outputs/elasticsearch_http", :elasticsearch => true do
       data = ""
       response = ftw.get!("http://127.0.0.1:9200/#{index}/_count?q=*")
       response.read_body { |chunk| data << chunk }
-      result = JSON.parse(data)
+      result = LogStash::Json.load(data)
       count = result["count"]
       insist { count } == event_count
     end
@@ -104,7 +105,7 @@ describe "outputs/elasticsearch_http", :elasticsearch => true do
     response = ftw.get!("http://127.0.0.1:9200/#{index}/_search?q=*&size=1000")
     data = ""
     response.read_body { |chunk| data << chunk }
-    result = JSON.parse(data)
+    result = LogStash::Json.load(data)
     result["hits"]["hits"].each do |doc|
       insist { doc["_type"] } == "logs"
     end
@@ -143,7 +144,7 @@ describe "outputs/elasticsearch_http", :elasticsearch => true do
       data = ""
       response = ftw.get!("http://127.0.0.1:9200/#{index}/_count?q=*")
       response.read_body { |chunk| data << chunk }
-      result = JSON.parse(data)
+      result = LogStash::Json.load(data)
       count = result["count"]
       insist { count } == event_count
     end
@@ -151,7 +152,7 @@ describe "outputs/elasticsearch_http", :elasticsearch => true do
     response = ftw.get!("http://127.0.0.1:9200/#{index}/_search?q=*&size=1000")
     data = ""
     response.read_body { |chunk| data << chunk }
-    result = JSON.parse(data)
+    result = LogStash::Json.load(data)
     result["hits"]["hits"].each do |doc|
       insist { doc["_type"] } == "generated"
     end
diff --git a/spec/outputs/file.rb b/spec/outputs/file.rb
index bdf6a7698..a49366bd4 100644
--- a/spec/outputs/file.rb
+++ b/spec/outputs/file.rb
@@ -1,5 +1,6 @@
 require "test_utils"
 require "logstash/outputs/file"
+require "logstash/json"
 require "tempfile"
 
 describe LogStash::Outputs::File do
@@ -28,7 +29,7 @@ describe LogStash::Outputs::File do
       line_num = 0
       # Now check all events for order and correctness.
       File.foreach(tmp_file) do |line|
-        event = LogStash::Event.new(JSON.parse(line))
+        event = LogStash::Event.new(LogStash::Json.load(line))
         insist {event["message"]} == "hello world"
         insist {event["sequence"]} == line_num
         line_num += 1
@@ -61,7 +62,7 @@ describe LogStash::Outputs::File do
       line_num = 0
       # Now check all events for order and correctness.
       Zlib::GzipReader.open(tmp_file.path).each_line do |line|
-        event = LogStash::Event.new(JSON.parse(line))
+        event = LogStash::Event.new(LogStash::Json.load(line))
         insist {event["message"]} == "hello world"
         insist {event["sequence"]} == line_num
         line_num += 1
diff --git a/spec/outputs/redis.rb b/spec/outputs/redis.rb
index 442d8b017..3c4dbeb04 100644
--- a/spec/outputs/redis.rb
+++ b/spec/outputs/redis.rb
@@ -1,5 +1,6 @@
 require "test_utils"
 require "logstash/outputs/redis"
+require "logstash/json"
 require "redis"
 
 describe LogStash::Outputs::Redis, :redis => true do
@@ -36,7 +37,7 @@ describe LogStash::Outputs::Redis, :redis => true do
       # Now check all events for order and correctness.
       event_count.times do |value|
         id, element = redis.blpop(key, 0)
-        event = LogStash::Event.new(JSON.parse(element))
+        event = LogStash::Event.new(LogStash::Json.load(element))
         insist { event["sequence"] } == value
         insist { event["message"] } == "hello world"
       end
@@ -84,7 +85,7 @@ describe LogStash::Outputs::Redis, :redis => true do
       # Now check all events for order and correctness.
       event_count.times do |value|
         id, element = redis.blpop(key, 0)
-        event = LogStash::Event.new(JSON.parse(element))
+        event = LogStash::Event.new(LogStash::Json.load(element))
         insist { event["sequence"] } == value
         insist { event["message"] } == "hello world"
       end
diff --git a/spec/support/date-http.rb b/spec/support/date-http.rb
index f9fd883f1..a6ac07966 100644
--- a/spec/support/date-http.rb
+++ b/spec/support/date-http.rb
@@ -13,6 +13,6 @@ describe "http dates", :if => RUBY_ENGINE == "jruby" do
   CONFIG
 
   sample("timestamp" => "25/Mar/2013:20:33:56 +0000") do
-    insist { subject["@timestamp"] } == Time.iso8601("2013-03-25T20:33:56.000Z")
+    insist { subject["@timestamp"].time } == Time.iso8601("2013-03-25T20:33:56.000Z")
   end
 end
diff --git a/spec/test_utils.rb b/spec/test_utils.rb
index bb9f712c8..f890552ef 100644
--- a/spec/test_utils.rb
+++ b/spec/test_utils.rb
@@ -1,5 +1,8 @@
 # encoding: utf-8
+require "logstash/json"
+require "logstash/timestamp"
+
 if ENV['COVERAGE']
   require 'simplecov'
   require 'coveralls'
@@ -42,8 +45,8 @@ puts("Using Accessor#strict_set for specs")
 # ugly, I know, but this avoids adding conditionals in performance critical section
 class LogStash::Event
   def []=(str, value)
-    if str == TIMESTAMP && !value.is_a?(Time)
-      raise TypeError, "The field '@timestamp' must be a Time, not a #{value.class} (#{value})"
+    if str == TIMESTAMP && !value.is_a?(LogStash::Timestamp)
+      raise TypeError, "The field '@timestamp' must be a LogStash::Timestamp, not a #{value.class} (#{value})"
     end
     @accessors.strict_set(str, value)
   end # def []=
@@ -69,7 +72,7 @@ module LogStash
     end
 
     def sample(sample_event, &block)
-      name = sample_event.is_a?(String) ? sample_event : sample_event.to_json
+      name = sample_event.is_a?(String) ? sample_event : LogStash::Json.dump(sample_event)
      name = name[0..50] + "..." if name.length > 50
 
      describe "\"#{name}\"" do
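The strict []= override in test_utils.rb now rejects anything that is not a LogStash::Timestamp, which is also why date-http.rb compares subject["@timestamp"].time against a Ruby Time. A small illustration of the new contract, assuming the Timestamp class this patch adds; the event fields are made up for the example, and the TypeError shown is raised by the spec-only override above, not by stock Event#[]=:

    require "logstash/event"
    require "logstash/timestamp"

    event = LogStash::Event.new("message" => "hello world")
    event["@timestamp"] = LogStash::Timestamp.now  # accepted
    event["@timestamp"].time                       # underlying Ruby Time, as in date-http.rb
    # event["@timestamp"] = Time.now               # raises TypeError under the strict
    #                                              # spec harness defined above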
diff --git a/spec/timestamp.rb b/spec/timestamp.rb
new file mode 100644
index 000000000..f6e6a0cee
--- /dev/null
+++ b/spec/timestamp.rb
@@ -0,0 +1,35 @@
+require "logstash/timestamp"
+
+describe LogStash::Timestamp do
+
+  it "should parse its own iso8601 output" do
+    t = Time.now
+    ts = LogStash::Timestamp.new(t)
+    expect(LogStash::Timestamp.parse_iso8601(ts.to_iso8601).to_i).to eq(t.to_i)
+  end
+
+  it "should coerce iso8601 string" do
+    t = Time.now
+    ts = LogStash::Timestamp.new(t)
+    expect(LogStash::Timestamp.coerce(ts.to_iso8601).to_i).to eq(t.to_i)
+  end
+
+  it "should coerce Time" do
+    t = Time.now
+    expect(LogStash::Timestamp.coerce(t).to_i).to eq(t.to_i)
+  end
+
+  it "should coerce Timestamp" do
+    t = LogStash::Timestamp.now
+    expect(LogStash::Timestamp.coerce(t).to_i).to eq(t.to_i)
+  end
+
+  it "should raise on invalid string coerce" do
+    expect{LogStash::Timestamp.coerce("foobar")}.to raise_error LogStash::TimestampParserError
+  end
+
+  it "should return nil on invalid object coerce" do
+    expect(LogStash::Timestamp.coerce(:foobar)).to be_nil
+  end
+
+end
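The new spec file above pins down the LogStash::Timestamp surface: construction from a Time, iso8601 round-tripping, and the coerce outcomes. As a quick reference, a sketch of that behavior (return values abbreviated; the exact fractional-second precision is whatever Timestamp#to_iso8601 emits):

    require "logstash/timestamp"

    LogStash::Timestamp.coerce("2013-03-25T20:33:56.000Z")  # String    -> Timestamp
    LogStash::Timestamp.coerce(Time.now)                    # Time      -> Timestamp
    LogStash::Timestamp.coerce(LogStash::Timestamp.now)     # Timestamp -> Timestamp
    LogStash::Timestamp.coerce(:foobar)                     # other     -> nil
    LogStash::Timestamp.coerce("not a date")                # raises LogStash::TimestampParserError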
diff --git a/tools/Gemfile.jruby-1.9.lock b/tools/Gemfile.jruby-1.9.lock
index dc11fd542..f05fb0e28 100644
--- a/tools/Gemfile.jruby-1.9.lock
+++ b/tools/Gemfile.jruby-1.9.lock
@@ -1,26 +1,28 @@
 GEM
   remote: https://rubygems.org/
   specs:
-    activesupport (3.2.17)
-      i18n (~> 0.6, >= 0.6.4)
-      multi_json (~> 1.0)
-    addressable (2.3.5)
-    atomic (1.1.15-java)
+    activesupport (4.1.1)
+      i18n (~> 0.6, >= 0.6.9)
+      json (~> 1.7, >= 1.7.7)
+      minitest (~> 5.1)
+      thread_safe (~> 0.1)
+      tzinfo (~> 1.1)
+    addressable (2.3.6)
+    atomic (1.1.16-java)
     avl_tree (1.1.3)
     awesome_print (1.2.0)
-    aws-sdk (1.35.0)
+    aws-sdk (1.41.0)
       json (~> 1.4)
       nokogiri (>= 1.4.4)
-      uuidtools (~> 2.1)
     backports (3.6.0)
     beefcake (0.3.7)
-    bindata (2.0.0)
+    bindata (2.1.0)
     blankslate (2.1.2.4)
     bouncy-castle-java (1.5.0147)
     buftok (0.1)
     builder (3.2.2)
     cabin (0.6.1)
-    ci_reporter (1.9.1)
+    ci_reporter (1.9.2)
       builder (>= 2.1.2)
     cinch (2.1.0)
     clamp (0.6.3)
@@ -33,20 +35,19 @@ GEM
       thor
     diff-lcs (1.2.5)
     docile (1.1.3)
-    edn (1.0.2)
+    edn (1.0.3)
       parslet (~> 1.4.0)
-    elasticsearch (1.0.1)
-      elasticsearch-api (= 1.0.1)
-      elasticsearch-transport (= 1.0.1)
-    elasticsearch-api (1.0.1)
+    elasticsearch (1.0.2)
+      elasticsearch-api (= 1.0.2)
+      elasticsearch-transport (= 1.0.2)
+    elasticsearch-api (1.0.2)
       multi_json
-    elasticsearch-transport (1.0.1)
+    elasticsearch-transport (1.0.2)
       faraday
       multi_json
     extlib (0.9.16)
     faraday (0.9.0)
       multipart-post (>= 1.2, < 3)
-    ffi (1.9.3)
     ffi (1.9.3-java)
     ffi-rzmq (1.0.0)
       ffi
@@ -59,23 +60,21 @@ GEM
     gelf (1.3.2)
       json
     gelfd (0.2.0)
-    geoip (1.3.5)
+    geoip (1.4.0)
     gmetric (0.1.3)
-    hitimes (1.2.1)
     hitimes (1.2.1-java)
     http (0.5.0)
       http_parser.rb
-    http_parser.rb (0.5.3)
     http_parser.rb (0.5.3-java)
     i18n (0.6.9)
     insist (1.0.0)
     jls-grok (0.10.12)
       cabin (>= 0.6.0)
     jls-lumberjack (0.0.20)
+    jrjackson (0.2.7)
     jruby-httpclient (1.1.1-java)
     jruby-openssl (0.8.7)
       bouncy-castle-java (>= 1.5.0147)
-    json (1.8.1)
     json (1.8.1-java)
     kramdown (1.3.3)
     mail (2.5.3)
@@ -90,16 +89,14 @@ GEM
       avl_tree (~> 1.1.2)
       hitimes (~> 1.1)
     mime-types (1.25.1)
-    mini_portile (0.5.2)
-    minitest (5.3.0)
-    mocha (1.0.0)
+    minitest (5.3.4)
+    mocha (1.1.0)
       metaclass (~> 0.0.1)
     msgpack-jruby (1.4.0-java)
-    multi_json (1.8.4)
+    multi_json (1.10.1)
     multipart-post (2.0.0)
     murmurhash3 (0.1.4)
-    nokogiri (1.6.1-java)
-      mini_portile (~> 0.5.0)
+    nokogiri (1.6.2.1-java)
     parslet (1.4.0)
       blankslate (~> 2.0)
     polyglot (0.3.4)
@@ -109,9 +106,9 @@ GEM
       slop (~> 3.4)
       spoon (~> 0.0)
     rack (1.5.2)
-    rack-protection (1.5.2)
+    rack-protection (1.5.3)
       rack
-    rbnacl (2.0.0)
+    rbnacl (3.1.0)
       ffi
     redis (3.0.7)
     rest-client (1.6.7)
@@ -120,7 +117,7 @@ GEM
       rspec-core (~> 2.14.0)
       rspec-expectations (~> 2.14.0)
       rspec-mocks (~> 2.14.0)
-    rspec-core (2.14.7)
+    rspec-core (2.14.8)
     rspec-expectations (2.14.5)
       diff-lcs (>= 1.1.3, < 2.0)
     rspec-mocks (2.14.6)
@@ -131,8 +128,8 @@ GEM
     shoulda (3.5.0)
       shoulda-context (~> 1.0, >= 1.0.1)
       shoulda-matchers (>= 1.4.1, < 3.0)
-    shoulda-context (1.1.6)
-    shoulda-matchers (2.5.0)
+    shoulda-context (1.2.1)
+    shoulda-matchers (2.6.1)
       activesupport (>= 3.0.0)
     simple_oauth (0.2.0)
     simplecov (0.8.2)
@@ -140,11 +137,11 @@ GEM
       multi_json
       simplecov-html (~> 0.8.0)
     simplecov-html (0.8.0)
-    sinatra (1.4.4)
+    sinatra (1.4.5)
      rack (~> 1.4)
      rack-protection (~> 1.4)
      tilt (~> 1.3, >= 1.3.4)
-    slop (3.4.7)
+    slop (3.5.0)
     snmp (1.1.1)
     spoon (0.0.4)
       ffi
@@ -154,11 +151,10 @@ GEM
       metriks
     term-ansicolor (1.3.0)
       tins (~> 1.0)
-    thor (0.18.1)
-    thread_safe (0.2.0-java)
-      atomic (>= 1.1.7, < 2)
+    thor (0.19.1)
+    thread_safe (0.3.3-java)
     tilt (1.4.1)
-    tins (1.0.0)
+    tins (1.3.0)
     treetop (1.4.15)
       polyglot
       polyglot (>= 0.3.1)
@@ -169,10 +165,9 @@ GEM
       http_parser.rb (~> 0.5.0)
       json (~> 1.8)
       simple_oauth (~> 0.2.0)
-    tzinfo (1.1.0)
+    tzinfo (1.2.0)
       thread_safe (~> 0.1)
-    user_agent_parser (2.1.2)
-    uuidtools (2.1.4)
+    user_agent_parser (2.1.5)
     xml-simple (1.1.3)
     xmpp4r (0.5)
 
@@ -206,9 +201,9 @@ DEPENDENCIES
   insist (= 1.0.0)
   jls-grok (= 0.10.12)
   jls-lumberjack (>= 0.0.20)
+  jrjackson
   jruby-httpclient
   jruby-openssl (= 0.8.7)
-  json
   kramdown
   mail
   march_hare (~> 2.1.0)