more Event java impl, relates to #4191

event clone

append in Ruby

tag & append

add timestamp assignment spec

comment out original classes

fix at signature

implemented del

remove from array

del test
This commit is contained in:
Colin Surprenant 2015-06-02 15:07:25 -04:00
parent 731d5dbb00
commit 4ada9363f9
12 changed files with 371 additions and 240 deletions

View file

@ -27,5 +27,13 @@ end
class LogStash::Event
  TIMESTAMP = "@timestamp"

  # Merge the fields of +event+ into this event, non-destructively for +event+.
  #
  # No accessor reset is required: merging cannot disrupt existing field paths,
  # and any newly created paths are picked up lazily on first access.
  #
  # @param event [LogStash::Event] the event whose fields are merged into self
  def append(event)
    LogStash::Util.hash_merge(to_hash, event.to_hash)
  end
end

View file

@ -5,93 +5,93 @@ require "forwardable"
require "date"
require "time"
module LogStash
# Raised when Timestamp.parse_iso8601 is given a string it cannot parse.
class TimestampParserError < StandardError; end
# Wrapper around a UTC Time, used for the event @timestamp field.
class Timestamp
extend Forwardable
include Comparable
# expose the common Time readers/operators directly on the wrapped @time
def_delegators :@time, :tv_usec, :usec, :year, :iso8601, :to_i, :tv_sec, :to_f, :to_edn, :<=>, :+
attr_reader :time
ISO8601_STRFTIME = "%04d-%02d-%02dT%02d:%02d:%02d.%06d%+03d:00".freeze
# fractional-second digits emitted by #to_iso8601
ISO8601_PRECISION = 3
# @param time [Time] the time to wrap; converted in place to UTC via Time#utc
def initialize(time = Time.new)
@time = time.utc
end
# Build a Timestamp from an epoch (same arguments as ::Time.at).
def self.at(*args)
Timestamp.new(::Time.at(*args))
end
# Build a Timestamp by parsing a string (same arguments as ::Time.parse).
def self.parse(*args)
Timestamp.new(::Time.parse(*args))
end
# Build a Timestamp for the current time.
def self.now
Timestamp.new(::Time.now)
end
# coerce tries different strategies based on the time object class to convert into a Timestamp.
# @param [String, Time, Timestamp] time the time object to try coerce
# @return [Timestamp, nil] Timestamp will be returned if successful otherwise nil
# @raise [TimestampParserError] on String with invalid format
def self.coerce(time)
case time
when String
LogStash::Timestamp.parse_iso8601(time)
when LogStash::Timestamp
time
when Time
LogStash::Timestamp.new(time)
else
nil
end
end
# Under JRuby use Joda-Time's ISO8601 parser; otherwise fall back to ::Time.parse.
if LogStash::Environment.jruby?
JODA_ISO8601_PARSER = org.joda.time.format.ISODateTimeFormat.dateTimeParser
UTC = org.joda.time.DateTimeZone.forID("UTC")
# @param t [String] an ISO8601 timestamp string
# @return [Timestamp]
# @raise [TimestampParserError] when the string cannot be parsed
def self.parse_iso8601(t)
millis = JODA_ISO8601_PARSER.parseMillis(t)
LogStash::Timestamp.at(millis / 1000, (millis % 1000) * 1000)
rescue => e
raise(TimestampParserError, "invalid timestamp string #{t.inspect}, error=#{e.inspect}")
end
else
# @param t [String] an ISO8601 timestamp string
# @return [Timestamp]
# @raise [TimestampParserError] when the string cannot be parsed
def self.parse_iso8601(t)
# warning, ruby's Time.parse is *really* terrible and slow.
LogStash::Timestamp.new(::Time.parse(t))
rescue => e
raise(TimestampParserError, "invalid timestamp string #{t.inspect}, error=#{e.inspect}")
end
end
# Convert the wrapped time to UTC in place and return self.
def utc
@time.utc # modifies the receiver
self
end
alias_method :gmtime, :utc
def to_json(*args)
# ignore arguments to respect accepted to_json method signature
"\"" + to_iso8601 + "\""
end
alias_method :inspect, :to_json
# Memoized ISO8601 rendering at ISO8601_PRECISION fractional digits.
# NOTE(review): memoization assumes the represented instant never changes after
# first render — utc/gmtime mutate @time's zone but not the instant, so this holds.
def to_iso8601
@iso8601 ||= @time.iso8601(ISO8601_PRECISION)
end
alias_method :to_s, :to_iso8601
# Subtract a Timestamp (uses its inner Time) or any value ::Time#- accepts.
def -(value)
@time - (value.is_a?(Timestamp) ? value.time : value)
end
end
end
# module LogStash
# class TimestampParserError < StandardError; end
#
# class Timestamp
# extend Forwardable
# include Comparable
#
# def_delegators :@time, :tv_usec, :usec, :year, :iso8601, :to_i, :tv_sec, :to_f, :to_edn, :<=>, :+
#
# attr_reader :time
#
# ISO8601_STRFTIME = "%04d-%02d-%02dT%02d:%02d:%02d.%06d%+03d:00".freeze
# ISO8601_PRECISION = 3
#
# def initialize(time = Time.new)
# @time = time.utc
# end
#
# def self.at(*args)
# Timestamp.new(::Time.at(*args))
# end
#
# def self.parse(*args)
# Timestamp.new(::Time.parse(*args))
# end
#
# def self.now
# Timestamp.new(::Time.now)
# end
#
# # coerce tries different strategies based on the time object class to convert into a Timestamp.
# # @param [String, Time, Timestamp] time the time object to try coerce
# # @return [Timestamp, nil] Timestamp will be returned if successful otherwise nil
# # @raise [TimestampParserError] on String with invalid format
# def self.coerce(time)
# case time
# when String
# LogStash::Timestamp.parse_iso8601(time)
# when LogStash::Timestamp
# time
# when Time
# LogStash::Timestamp.new(time)
# else
# nil
# end
# end
#
# if LogStash::Environment.jruby?
# JODA_ISO8601_PARSER = org.joda.time.format.ISODateTimeFormat.dateTimeParser
# UTC = org.joda.time.DateTimeZone.forID("UTC")
#
# def self.parse_iso8601(t)
# millis = JODA_ISO8601_PARSER.parseMillis(t)
# LogStash::Timestamp.at(millis / 1000, (millis % 1000) * 1000)
# rescue => e
# raise(TimestampParserError, "invalid timestamp string #{t.inspect}, error=#{e.inspect}")
# end
#
# else
#
# def self.parse_iso8601(t)
# # warning, ruby's Time.parse is *really* terrible and slow.
# LogStash::Timestamp.new(::Time.parse(t))
# rescue => e
# raise(TimestampParserError, "invalid timestamp string #{t.inspect}, error=#{e.inspect}")
# end
# end
#
# def utc
# @time.utc # modifies the receiver
# self
# end
# alias_method :gmtime, :utc
#
# def to_json(*args)
# # ignore arguments to respect accepted to_json method signature
# "\"" + to_iso8601 + "\""
# end
# alias_method :inspect, :to_json
#
# def to_iso8601
# @iso8601 ||= @time.iso8601(ISO8601_PRECISION)
# end
# alias_method :to_s, :to_iso8601
#
# def -(value)
# @time - (value.is_a?(Timestamp) ? value.time : value)
# end
# end
# end

View file

@ -3,121 +3,122 @@ require "logstash/namespace"
require "logstash/util"
require "thread_safe"
module LogStash::Util
# module LogStash::Util
#
# # PathCache is a singleton which globally caches the relation between a field reference and its
# # decomposition into a [key, path array] tuple. For example the field reference [foo][bar][baz]
# # is decomposed into ["baz", ["foo", "bar"]].
# module PathCache
# extend self
#
# # requiring libraries and defining constants is thread safe in JRuby so
# # PathCache::CACHE will be corretly initialized, once, when accessors.rb
# # will be first required
# CACHE = ThreadSafe::Cache.new
#
# def get(field_reference)
# # the "get_or_default(x, nil) || put(x, parse(x))" is ~2x faster than "get || put" because the get call is
# # proxied through the JRuby JavaProxy op_aref method. the correct idiom here would be to use
# # "compute_if_absent(x){parse(x)}" but because of the closure creation, it is ~1.5x slower than
# # "get_or_default || put".
# # this "get_or_default || put" is obviously non-atomic which is not really important here
# # since all threads will set the same value and this cache will stabilize very quickly after the first
# # few events.
# CACHE.get_or_default(field_reference, nil) || CACHE.put(field_reference, parse(field_reference))
# end
#
# def parse(field_reference)
# path = field_reference.split(/[\[\]]/).select{|s| !s.empty?}
# [path.pop, path]
# end
# end
#
# # Accessors uses a lookup table to speedup access of a field reference of the form
# # "[hello][world]" to the underlying store hash into {"hello" => {"world" => "foo"}}
# class Accessors
#
# # @param store [Hash] the backing data store field refereces point to
# def initialize(store)
# @store = store
#
# # @lut is a lookup table between a field reference and a [target, key] tuple
# # where target is the containing Hash or Array for key in @store.
# # this allows us to directly access the containing object for key instead of
# # walking the field reference path into the inner @store objects
# @lut = {}
# end
#
# # @param field_reference [String] the field reference
# # @return [Object] the value in @store for this field reference
# def get(field_reference)
# target, key = lookup(field_reference)
# return nil unless target
# target.is_a?(Array) ? target[key.to_i] : target[key]
# end
#
# # @param field_reference [String] the field reference
# # @param value [Object] the value to set in @store for this field reference
# # @return [Object] the value set
# def set(field_reference, value)
# target, key = lookup_or_create(field_reference)
# target[target.is_a?(Array) ? key.to_i : key] = value
# end
#
# # @param field_reference [String] the field reference to remove
# # @return [Object] the removed value in @store for this field reference
# def del(field_reference)
# target, key = lookup(field_reference)
# return nil unless target
# target.is_a?(Array) ? target.delete_at(key.to_i) : target.delete(key)
# end
#
# # @param field_reference [String] the field reference to test for inclusion in the store
# # @return [Boolean] true if the store contains a value for this field reference
# def include?(field_reference)
# target, key = lookup(field_reference)
# return false unless target
#
# target.is_a?(Array) ? !target[key.to_i].nil? : target.include?(key)
# end
#
# private
#
# # retrieve the [target, key] tuple associated with this field reference
# # @param field_reference [String] the field referece
# # @return [[Object, String]] the [target, key] tuple associated with this field reference
# def lookup(field_reference)
# @lut[field_reference] ||= find_target(field_reference)
# end
#
# # retrieve the [target, key] tuple associated with this field reference and create inner
# # container objects if they do not exists
# # @param field_reference [String] the field referece
# # @return [[Object, String]] the [target, key] tuple associated with this field reference
# def lookup_or_create(field_reference)
# @lut[field_reference] ||= find_or_create_target(field_reference)
# end
#
# # find the target container object in store for this field reference
# # @param field_reference [String] the field referece
# # @return [Object] the target container object in store associated with this field reference
# def find_target(field_reference)
# key, path = PathCache.get(field_reference)
# target = path.inject(@store) do |r, k|
# return nil unless r
# r[r.is_a?(Array) ? k.to_i : k]
# end
# target ? [target, key] : nil
# end
#
# # find the target container object in store for this field reference and create inner
# # container objects if they do not exists
# # @param field_reference [String] the field referece
# # @return [Object] the target container object in store associated with this field reference
# def find_or_create_target(accessor)
# key, path = PathCache.get(accessor)
# target = path.inject(@store) {|r, k| r[r.is_a?(Array) ? k.to_i : k] ||= {}}
# [target, key]
# end
# end # class Accessors
# end # module LogStash::Util
# PathCache is a singleton which globally caches the relation between a field
# reference and its decomposition into a [key, path array] tuple. For example
# the field reference [foo][bar][baz] is decomposed into ["baz", ["foo", "bar"]].
module PathCache
  extend self

  # requiring libraries and defining constants is thread safe in JRuby so
  # PathCache::CACHE will be correctly initialized, once, when accessors.rb
  # is first required
  CACHE = ThreadSafe::Cache.new

  # Return the memoized [key, path] tuple for field_reference, computing and
  # caching it on first use.
  #
  # the "get_or_default(x, nil) || put(x, parse(x))" is ~2x faster than "get || put"
  # because the get call is proxied through the JRuby JavaProxy op_aref method.
  # the correct idiom here would be "compute_if_absent(x){parse(x)}" but because
  # of the closure creation it is ~1.5x slower than "get_or_default || put".
  # this "get_or_default || put" is obviously non-atomic which is not really
  # important here since all threads will set the same value and this cache will
  # stabilize very quickly after the first few events.
  def get(field_reference)
    CACHE.get_or_default(field_reference, nil) || CACHE.put(field_reference, parse(field_reference))
  end

  # Split "[a][b][c]" into its components and return [last_key, leading_path].
  def parse(field_reference)
    path = field_reference.split(/[\[\]]/).reject(&:empty?)
    [path.pop, path]
  end
end
# Accessors uses a lookup table to speedup access of a field reference of the form
# "[hello][world]" to the underlying store hash into {"hello" => {"world" => "foo"}}
class Accessors
# @param store [Hash] the backing data store field references point to
def initialize(store)
@store = store
# @lut is a lookup table between a field reference and a [target, key] tuple
# where target is the containing Hash or Array for key in @store.
# this allows us to directly access the containing object for key instead of
# walking the field reference path into the inner @store objects
@lut = {}
end
# @param field_reference [String] the field reference
# @return [Object] the value in @store for this field reference
def get(field_reference)
target, key = lookup(field_reference)
return nil unless target
target.is_a?(Array) ? target[key.to_i] : target[key]
end
# @param field_reference [String] the field reference
# @param value [Object] the value to set in @store for this field reference
# @return [Object] the value set
def set(field_reference, value)
target, key = lookup_or_create(field_reference)
target[target.is_a?(Array) ? key.to_i : key] = value
end
# @param field_reference [String] the field reference to remove
# @return [Object] the removed value in @store for this field reference
# NOTE(review): the cached @lut tuple for this reference is not invalidated here;
# the cached container stays valid for repeated del/set on the same path, but
# confirm callers rebuild the Accessors when the store structure changes.
def del(field_reference)
target, key = lookup(field_reference)
return nil unless target
target.is_a?(Array) ? target.delete_at(key.to_i) : target.delete(key)
end
# @param field_reference [String] the field reference to test for inclusion in the store
# @return [Boolean] true if the store contains a value for this field reference
def include?(field_reference)
target, key = lookup(field_reference)
return false unless target
target.is_a?(Array) ? !target[key.to_i].nil? : target.include?(key)
end
private
# retrieve the [target, key] tuple associated with this field reference
# @param field_reference [String] the field reference
# @return [[Object, String]] the [target, key] tuple associated with this field reference
def lookup(field_reference)
@lut[field_reference] ||= find_target(field_reference)
end
# retrieve the [target, key] tuple associated with this field reference and create inner
# container objects if they do not exist
# @param field_reference [String] the field reference
# @return [[Object, String]] the [target, key] tuple associated with this field reference
def lookup_or_create(field_reference)
@lut[field_reference] ||= find_or_create_target(field_reference)
end
# find the target container object in store for this field reference
# @param field_reference [String] the field reference
# @return [Object] the target container object in store associated with this field reference
def find_target(field_reference)
key, path = PathCache.get(field_reference)
target = path.inject(@store) do |r, k|
return nil unless r
r[r.is_a?(Array) ? k.to_i : k]
end
target ? [target, key] : nil
end
# find the target container object in store for this field reference and create inner
# container objects if they do not exist
# @param accessor [String] the field reference (parameter name predates the rename)
# @return [Object] the target container object in store associated with this field reference
def find_or_create_target(accessor)
key, path = PathCache.get(accessor)
target = path.inject(@store) {|r, k| r[r.is_a?(Array) ? k.to_i : k] ||= {}}
[target, key]
end
end # class Accessors
end # module LogStash::Util

View file

@ -1,6 +1,8 @@
$LOAD_PATH << File.expand_path("../../../lib", __FILE__)
require "jruby_event/jruby_event"
require "logstash/util"
require "logstash/event"
TIMESTAMP = "@timestamp"
@ -106,6 +108,31 @@ describe LogStash::Event do
# now make sure the original map was not touched
expect(e.to_java.get_field(TIMESTAMP)).to be_kind_of(Java::ComLogstash::Timestamp)
end
it "should set timestamp" do
  event = LogStash::Event.new
  current = Time.now
  event["@timestamp"] = LogStash::Timestamp.at(current.to_i)
  # both the typed accessor and the generic field lookup must agree
  expect(event.timestamp.to_i).to eq(current.to_i)
  expect(event["@timestamp"].to_i).to eq(current.to_i)
end
end
context "append" do
  it "show append" do
    subject_event = LogStash::Event.new("message" => "hello world")
    other_event = LogStash::Event.new("message" => "another thing")
    subject_event.append(other_event)
    # appending a scalar onto an existing scalar field promotes it to an array
    expect(subject_event["message"]).to eq(["hello world", "another thing"])
  end
end
context "tags" do
  it "should tag" do
    event = LogStash::Event.new("message" => "hello world")
    tag = "foo"
    event["tags"] = []
    event["tags"] << tag unless event["tags"].include?(tag)
    # BUG FIX: the example previously made no assertion at all, so it could
    # never fail; verify the tag was actually recorded exactly once.
    expect(event["tags"]).to eq([tag])
  end
end
end
``

View file

@ -27,7 +27,21 @@ public class Accessors {
}
/**
 * Remove the value at the given field reference from the event data.
 *
 * @param reference field reference, e.g. "[foo][0]"
 * @return the removed value, or null if the path does not resolve
 * @throws ClassCastException if the resolved container is neither a Map nor a List
 */
public Object del(String reference) {
FieldReference field = PathCache.getInstance().cache(reference);
Object target = findTarget(field);
if (target != null) {
if (target instanceof Map) {
return ((Map<String, Object>) target).remove(field.getKey());
} else if (target instanceof List) {
// out-of-range list indices are treated as absent rather than throwing
int i = Integer.parseInt(field.getKey());
if (i < 0 || i >= ((List) target).size()) {
return null;
}
return ((List<Object>) target).remove(i);
} else {
throw new ClassCastException("expecting List or Map");
}
}
return null;
}
@ -90,9 +104,6 @@ public class Accessors {
private Object fetch(Object target, String key) {
if (target instanceof Map) {
Object result = ((Map<String, Object>) target).get(key);
// if (result != null) {
// System.out.println("fetch class=" + result.getClass().getName() + ", toString=" + result.toString());
// }
return result;
} else if (target instanceof List) {
int i = Integer.parseInt(key);
@ -100,9 +111,6 @@ public class Accessors {
return null;
}
Object result = ((List<Object>) target).get(i);
// if (result != null) {
// System.out.println("fetch class=" + result.getClass().getName() + ", toString=" + result.toString());
// }
return result;
} else {
throw new ClassCastException("expecting List or Map");

View file

@ -1,11 +1,9 @@
package com.logstash;
import org.codehaus.jackson.JsonGenerationException;
import java.io.IOException;
import java.util.Map;
public interface Event {
public interface Event extends Cloneable {
String toString();
@ -15,12 +13,14 @@ public interface Event {
boolean isCancelled();
Event clone();
Map<String, Object> getData();
void setData(Map<String, Object> data);
Accessors getAccessors();
void setAccessors(Accessors accessors);
Timestamp getTimestamp();
void setTimestamp(Timestamp t);
@ -43,4 +43,8 @@ public interface Event {
Event append(Event e);
String sprintf(String s) throws IOException;
void tag(String tag);
Event clone() throws CloneNotSupportedException;
}

View file

@ -51,11 +51,21 @@ public class EventImpl implements Event, Cloneable, Serializable {
return this.data;
}
/** Replace the backing data map wholesale. NOTE(review): the accessors are not
 * refreshed here — callers such as clone() must rebuild them themselves. */
@Override
public void setData(Map<String, Object> data) {
this.data = data;
}
/** @return the Accessors instance resolving field references against this event's data */
@Override
public Accessors getAccessors() {
return this.accessors;
}
/** Replace the accessors; used by clone() to point lookups at the clone's own data. */
@Override
public void setAccessors(Accessors accessors) {
this.accessors = accessors;
}
@Override
public void cancel() {
this.cancelled = true;
@ -137,8 +147,12 @@ public class EventImpl implements Event, Cloneable, Serializable {
return StringInterpolation.getInstance().evaluate(this, s);
}
public Event clone() {
throw new UnsupportedOperationException("clone() not yet implemented");
public Event clone()
throws CloneNotSupportedException
{
Event clone = (Event)super.clone();
clone.setAccessors(new Accessors(clone.getData()));
return clone;
}
public String toString() {
@ -172,19 +186,22 @@ public class EventImpl implements Event, Cloneable, Serializable {
}
} catch (IllegalArgumentException e) {
// TODO: add error logging
List<Object> tags = (List<Object>) this.data.get("tags");
if (tags == null) {
tags = new ArrayList<>();
this.data.put("tags", tags);
}
if (!tags.contains(TIMESTAMP_FAILURE_TAG)) {
tags.add(TIMESTAMP_FAILURE_TAG);
}
tag(TIMESTAMP_FAILURE_TAG);
this.data.put(TIMESTAMP_FAILURE_FIELD, o.toString());
return new Timestamp();
}
}
/**
 * Add {@code tag} to the event's "tags" list, creating the list on first use
 * and skipping the add when the tag is already present.
 */
public void tag(String tag) {
    Object existing = this.data.get("tags");
    List<Object> tagList;
    if (existing == null) {
        tagList = new ArrayList<>();
        this.data.put("tags", tagList);
    } else {
        tagList = (List<Object>) existing;
    }
    if (!tagList.contains(tag)) {
        tagList.add(tag);
    }
}
}

View file

@ -11,7 +11,7 @@ import org.jruby.RubyString;
import java.util.Date;
@JsonSerialize(using = TimestampSerializer.class)
public class Timestamp {
public class Timestamp implements Cloneable {
private DateTime time;
// TODO: is this DateTimeFormatter thread safe?
@ -49,6 +49,10 @@ public class Timestamp {
return time;
}
/** Replace the wrapped DateTime (used by clone()). */
public void setTime(DateTime time) {
this.time = time;
}
public static Timestamp now() {
return new Timestamp();
}
@ -60,4 +64,11 @@ public class Timestamp {
public String toString() {
return toIso8601();
}
/**
 * @return a field-level copy of this Timestamp.
 * NOTE(review): super.clone() already copies the time reference, so the
 * setTime call below is redundant (kept for explicitness); Joda-Time's
 * DateTime is immutable, so sharing the reference is safe.
 */
@Override
public Timestamp clone() throws CloneNotSupportedException {
Timestamp clone = (Timestamp)super.clone();
clone.setTime(this.getTime());
return clone;
}
}

View file

@ -41,7 +41,7 @@ public class JrubyEventExtLibrary implements Library {
}
/**
 * Construct a RubyEvent bound to the LogStash::Event metaclass.
 *
 * BUG FIX: removed the stale leftover delegation to the "Timestamp" class —
 * the residue left two this(...) calls in one constructor, which cannot
 * compile; the "Event" delegation is the intended one.
 */
public RubyEvent(Ruby runtime) {
    this(runtime, runtime.getModule("LogStash").getClass("Event"));
}
public RubyEvent(Ruby runtime, Event event) {
@ -115,6 +115,10 @@ public class JrubyEventExtLibrary implements Library {
} else if (value instanceof JrubyTimestampExtLibrary.RubyTimestamp) {
// RubyTimestamp could be assigned in another field than @timestamp
this.event.setField(r, ((JrubyTimestampExtLibrary.RubyTimestamp) value).getTimestamp());
} else if (value instanceof RubyArray) {
this.event.setField(r, new ArrayList<Object>(Arrays.asList(((RubyArray) value).toJavaArray())));
} else {
throw context.runtime.newTypeError("wrong argument type " + value.getMetaClass());
}
}
return value;
@ -171,7 +175,11 @@ public class JrubyEventExtLibrary implements Library {
/**
 * Ruby-visible Event#clone.
 *
 * BUG FIX: removed the stale leftover single-line return preceding the try
 * block — it made the try/catch unreachable, which cannot compile.
 *
 * Event.clone() declares CloneNotSupportedException; surface it to Ruby as a
 * RuntimeError instead of letting a checked exception escape.
 */
@JRubyMethod(name = "clone")
public IRubyObject ruby_clone(ThreadContext context)
{
    try {
        return RubyEvent.newRubyEvent(context.runtime, this.event.clone());
    } catch (CloneNotSupportedException e) {
        throw context.runtime.newRuntimeError(e.getMessage());
    }
}
@JRubyMethod(name = "overwrite", required = 1)
@ -232,5 +240,12 @@ public class JrubyEventExtLibrary implements Library {
// TODO: add UTF-8 validation
return value;
}
/**
 * Ruby-visible Event#tag(tag): append a tag to the event's "tags" list.
 *
 * FIX: dropped the redundant (RubyString) cast — the parameter is already
 * statically typed RubyString.
 *
 * @return nil
 */
@JRubyMethod(name = "tag", required = 1)
public IRubyObject ruby_tag(ThreadContext context, RubyString value)
{
    this.event.tag(value.asJavaString());
    return context.runtime.getNil();
}
}
}

View file

@ -121,14 +121,16 @@ public class JrubyTimestampExtLibrary implements Library {
return RubyString.newString(context.runtime, "\"" + this.timestamp.toIso8601() + "\"");
}
@JRubyMethod(name = "at", required = 1, meta = true)
public static IRubyObject ruby_at(ThreadContext context, IRubyObject recv, IRubyObject epoch)
@JRubyMethod(name = "at", required = 1, optional = 1, meta = true)
public static IRubyObject ruby_at(ThreadContext context, IRubyObject recv, IRubyObject[] args)
{
if (!(epoch instanceof RubyInteger)) {
throw context.runtime.newTypeError("wrong argument type " + epoch.getMetaClass() + " (expected integer Fixnum)");
RubyTime t;
if (args.length == 1) {
t = (RubyTime)RubyTime.at(context, context.runtime.getTime(), args[0]);
} else {
t = (RubyTime)RubyTime.at(context, context.runtime.getTime(), args[0], args[1]);
}
//
return RubyTimestamp.newRubyTimestamp(context.runtime, (((RubyInteger) epoch).getLongValue()));
return RubyTimestamp.newRubyTimestamp(context.runtime, new Timestamp(t.getDateTime()));
}
@JRubyMethod(name = "now", meta = true)

View file

@ -157,4 +157,20 @@ public class AccessorsTest {
assertEquals(accessors.lutGet(reference), data.get("foo"));
assertEquals(accessors.get(reference), "baz");
}
/**
 * del() removes list elements by index and map entries by key, returning the
 * removed value, and returns null for already-absent paths.
 *
 * FIX: replaced raw Map/List types with generics and put assertEquals
 * arguments in JUnit's (expected, actual) order so failure messages read
 * correctly.
 */
@Test
public void testDel() throws Exception {
    Map<String, Object> data = new HashMap<>();
    List<Object> inner = new ArrayList<>();
    data.put("foo", inner);
    inner.add("bar");
    data.put("bar", "baz");
    TestableAccessors accessors = new TestableAccessors(data);
    assertEquals("bar", accessors.del("[foo][0]"));
    // deleting the same index again finds an empty list -> null
    assertEquals(null, accessors.del("[foo][0]"));
    assertEquals(new ArrayList<>(), accessors.get("[foo]"));
    assertEquals("baz", accessors.del("[bar]"));
    assertEquals(null, accessors.get("[bar]"));
}
}

View file

@ -84,4 +84,26 @@ public class EventTest {
Event e = new EventImpl(data);
assertEquals("baz", e.getField("[foo][0][bar]"));
}
/**
 * clone() produces an event whose JSON rendering matches the original's.
 *
 * FIX: replaced raw Map/List types with generics.
 * NOTE(review): the expected JSON string depends on HashMap iteration order,
 * which is not guaranteed across JVM versions — consider comparing parsed
 * structures instead of raw strings.
 */
@Test
public void testClone() throws Exception {
    Map<String, Object> data = new HashMap<>();
    List<Object> l = new ArrayList<>();
    data.put("array", l);
    Map<String, Object> m = new HashMap<>();
    m.put("foo", "bar");
    l.add(m);
    data.put("foo", 1.0);
    data.put("bar", "bar");
    data.put("baz", 1);
    Event e = new EventImpl(data);
    Event f = e.clone();
    assertEquals("{\"bar\":\"bar\",\"@timestamp\":\"" + e.getTimestamp().toIso8601() + "\",\"array\":[{\"foo\":\"bar\"}],\"foo\":1.0,\"@version\":\"1\",\"baz\":1}", f.toJson());
    assertEquals(f.toJson(), e.toJson());
}
}