Metrics: More consistent serialization, Witness support for DLQ, PQ, and Java clean up

* Additional consistent serialization (don't check for null value, continuation of https://github.com/elastic/logstash/pull/8009 / 5d83a71aa4)
* Add support for Dead Letter Queue
* Add support for persitent Queue
* Scope clean up

Part of #7788

Fixes #8023
This commit is contained in:
Jake Landis 2017-08-17 19:22:33 -05:00
parent f08a628b5c
commit 17c9d668cd
20 changed files with 711 additions and 72 deletions

View file

@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import org.logstash.instrument.metrics.Metric;
import org.logstash.instrument.metrics.gauge.BooleanGauge;
import org.logstash.instrument.metrics.gauge.LongGauge;
import org.logstash.instrument.metrics.gauge.TextGauge;
import java.io.IOException;
@ -22,6 +23,7 @@ final public class ConfigWitness implements SerializableWitness {
private final LongGauge workers;
private final LongGauge batchDelay;
private final LongGauge configReloadInterval;
private final TextGauge deadLetterQueuePath;
private final Snitch snitch;
private final static String KEY = "config";
private static final Serializer SERIALIZER = new Serializer();
@ -37,6 +39,7 @@ final public class ConfigWitness implements SerializableWitness {
workers = new LongGauge("workers");
batchDelay = new LongGauge("batch_delay");
configReloadInterval = new LongGauge("config_reload_interval");
deadLetterQueuePath = new TextGauge("dead_letter_queue_path");
snitch = new Snitch(this);
}
@ -85,6 +88,15 @@ final public class ConfigWitness implements SerializableWitness {
deadLetterQueueEnabled.set(enabled);
}
/**
* The configured path for the dead letter queue.
*
* @param path the path used by the dead letter queue
*/
public void deadLetterQueuePath(String path) {
deadLetterQueuePath.set(path);
}
/**
* The number of configured workers
*
@ -141,6 +153,7 @@ final public class ConfigWitness implements SerializableWitness {
MetricSerializer<Metric<Long>> longSerializer = MetricSerializer.Get.longSerializer(gen);
MetricSerializer<Metric<Boolean>> booleanSerializer = MetricSerializer.Get.booleanSerializer(gen);
MetricSerializer<Metric<String>> stringSerializer = MetricSerializer.Get.stringSerializer(gen);
longSerializer.serialize(witness.batchSize);
longSerializer.serialize(witness.workers);
@ -148,6 +161,7 @@ final public class ConfigWitness implements SerializableWitness {
longSerializer.serialize(witness.configReloadInterval);
booleanSerializer.serialize(witness.configReloadAutomatic);
booleanSerializer.serialize(witness.deadLetterQueueEnabled);
stringSerializer.serialize(witness.deadLetterQueuePath);
gen.writeEndObject();
}
}
@ -155,10 +169,10 @@ final public class ConfigWitness implements SerializableWitness {
/**
* The snitch for the errors. Used to retrieve discrete metric values.
*/
public static class Snitch {
public class Snitch {
private final ConfigWitness witness;
Snitch(ConfigWitness witness) {
private Snitch(ConfigWitness witness) {
this.witness = witness;
}
@ -166,9 +180,9 @@ final public class ConfigWitness implements SerializableWitness {
/**
* Gets the configured batch delay
*
* @return the batch delay
* @return the batch delay. May be {@code null}
*/
public long batchDelay() {
public Long batchDelay() {
return witness.batchDelay.getValue();
}
@ -176,9 +190,9 @@ final public class ConfigWitness implements SerializableWitness {
/**
* Gets the configured batch size
*
* @return the batch size
* @return the batch size. May be {@code null}
*/
public long batchSize() {
public Long batchSize() {
return witness.batchSize.getValue();
}
@ -188,15 +202,16 @@ final public class ConfigWitness implements SerializableWitness {
* @return true if configured for automatic, false otherwise
*/
public boolean configReloadAutomatic() {
return witness.configReloadAutomatic.getValue();
Boolean reload = witness.configReloadAutomatic.getValue();
return reload == null ? false : reload;
}
/**
* Gets the configured reload interval
*
* @return the configured reload interval
* @return the configured reload interval. May be {@code null}
*/
public long configReloadInterval() {
public Long configReloadInterval() {
return witness.configReloadInterval.getValue();
}
@ -206,20 +221,26 @@ final public class ConfigWitness implements SerializableWitness {
* @return true if the dead letter queue is configured to be enabled, false otherwise
*/
public boolean deadLetterQueueEnabled() {
return witness.deadLetterQueueEnabled.getValue();
Boolean enabled = witness.deadLetterQueueEnabled.getValue();
return enabled == null ? false : enabled;
}
/**
* Gets the path that the dead letter queue is configured.
*
* @return the configured path for the dead letter queue. May be {@code null}
*/
public String deadLetterQueuePath() {
return witness.deadLetterQueuePath.getValue();
}
/**
* Gets the number of configured workers
*
* @return the configured number of workers.
* @return the configured number of workers. May be {@code null}
*/
public long workers() {
public Long workers() {
return witness.workers.getValue();
}
}
}

View file

@ -0,0 +1,110 @@
package org.logstash.instrument.witness;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import org.logstash.instrument.metrics.Metric;
import org.logstash.instrument.metrics.gauge.LongGauge;
import java.io.IOException;
/**
* Witness for the Dead Letter Queue
*/
@JsonSerialize(using = DeadLetterQueueWitness.Serializer.class)
public class DeadLetterQueueWitness implements SerializableWitness {
private static String KEY = "dead_letter_queue";
private static final Serializer SERIALIZER = new Serializer();
private final Snitch snitch;
private final LongGauge queueSizeInBytes;
/**
* Constructor
*/
public DeadLetterQueueWitness() {
queueSizeInBytes = new LongGauge("queue_size_in_bytes");
snitch = new Snitch(this);
}
/**
* Set the dead letter queue size, represented in bytes
*
* @param size the byte size of the queue
*/
public void queueSizeInBytes(long size) {
queueSizeInBytes.set(size);
}
/**
* Get a reference to associated snitch to get discrete metric values.
*
* @return the associate {@link Snitch}
*/
public Snitch snitch() {
return this.snitch;
}
@Override
public void genJson(JsonGenerator gen, SerializerProvider provider) throws IOException {
SERIALIZER.innerSerialize(this, gen, provider);
}
/**
* The Jackson serializer.
*/
static class Serializer extends StdSerializer<DeadLetterQueueWitness> {
/**
* Default constructor - required for Jackson
*/
public Serializer() {
this(DeadLetterQueueWitness.class);
}
/**
* Constructor
*
* @param t the type to serialize
*/
protected Serializer(Class<DeadLetterQueueWitness> t) {
super(t);
}
@Override
public void serialize(DeadLetterQueueWitness witness, JsonGenerator gen, SerializerProvider provider) throws IOException {
gen.writeStartObject();
innerSerialize(witness, gen, provider);
gen.writeEndObject();
}
void innerSerialize(DeadLetterQueueWitness witness, JsonGenerator gen, SerializerProvider provider) throws IOException {
gen.writeObjectFieldStart(KEY);
MetricSerializer<Metric<Long>> longSerializer = MetricSerializer.Get.longSerializer(gen);
longSerializer.serialize(witness.queueSizeInBytes);
gen.writeEndObject();
}
}
/**
* The snitch for the dead letter queue. Used to retrieve discrete metric values.
*/
public class Snitch {
private final DeadLetterQueueWitness witness;
private Snitch(DeadLetterQueueWitness witness) {
this.witness = witness;
}
/**
* Gets the queue size in bytes
*
* @return the queue size in bytes. May be {@code null}
*/
public Long queueSizeInBytes() {
return witness.queueSizeInBytes.getValue();
}
}
}

View file

@ -85,7 +85,7 @@ public class ErrorWitness implements SerializableWitness {
/**
* The Jackson serializer.
*/
public static class Serializer extends StdSerializer<ErrorWitness> {
static class Serializer extends StdSerializer<ErrorWitness> {
/**
* Default constructor - required for Jackson
@ -122,10 +122,10 @@ public class ErrorWitness implements SerializableWitness {
/**
* The snitch for the errors. Used to retrieve discrete metric values.
*/
public static class Snitch {
public class Snitch {
private final ErrorWitness witness;
Snitch(ErrorWitness witness) {
private Snitch(ErrorWitness witness) {
this.witness = witness;
}

View file

@ -131,7 +131,7 @@ final public class EventsWitness implements SerializableWitness {
/**
* The Jackson serializer.
*/
public static class Serializer extends StdSerializer<EventsWitness> {
static class Serializer extends StdSerializer<EventsWitness> {
/**
* Default constructor - required for Jackson
@ -171,11 +171,11 @@ final public class EventsWitness implements SerializableWitness {
/**
* The snitch for the {@link EventsWitness}. Allows to read discrete metrics values.
*/
public static class Snitch {
public class Snitch {
private final EventsWitness witness;
Snitch(EventsWitness witness) {
private Snitch(EventsWitness witness) {
this.witness = witness;
}

View file

@ -36,8 +36,9 @@ public interface MetricSerializer<T extends Metric<?>> {
*/
static MetricSerializer<Metric<Long>> longSerializer(JsonGenerator gen) {
return m -> {
if (m != null && m.getValue() != null) {
gen.writeNumberField(m.getName(), m.getValue());
if (m != null) {
Long value = m.getValue();
gen.writeNumberField(m.getName(), value == null ? 0 : value);
}
};
}
@ -50,8 +51,9 @@ public interface MetricSerializer<T extends Metric<?>> {
*/
static MetricSerializer<Metric<Boolean>> booleanSerializer(JsonGenerator gen) {
return m -> {
if (m != null && m.getValue() != null) {
gen.writeBooleanField(m.getName(), m.getValue());
if (m != null) {
Boolean value = m.getValue();
gen.writeBooleanField(m.getName(), value == null ? false : value);
}
};
}
@ -64,7 +66,7 @@ public interface MetricSerializer<T extends Metric<?>> {
*/
static MetricSerializer<Metric<String>> stringSerializer(JsonGenerator gen) {
return m -> {
if (m != null && m.getValue() != null) {
if (m != null) {
gen.writeStringField(m.getName(), m.getValue());
}
};

View file

@ -18,6 +18,7 @@ final public class PipelineWitness implements SerializableWitness {
private final ConfigWitness configWitness;
private final PluginsWitness pluginsWitness;
private final QueueWitness queueWitness;
private final DeadLetterQueueWitness deadLetterQueueWitness;
private final String KEY;
private static final Serializer SERIALIZER = new Serializer();
@ -33,6 +34,7 @@ final public class PipelineWitness implements SerializableWitness {
this.configWitness = new ConfigWitness();
this.pluginsWitness = new PluginsWitness();
this.queueWitness = new QueueWitness();
this.deadLetterQueueWitness = new DeadLetterQueueWitness();
}
/**
@ -44,6 +46,14 @@ final public class PipelineWitness implements SerializableWitness {
return configWitness;
}
/**
* Get a reference to the associated dead letter queue witness
* @return The associated {@link DeadLetterQueueWitness}
*/
public DeadLetterQueueWitness dlq() {
return deadLetterQueueWitness;
}
/**
* Get a reference to associated events witness
*
@ -132,7 +142,7 @@ final public class PipelineWitness implements SerializableWitness {
/**
* The Jackson serializer.
*/
public static class Serializer extends StdSerializer<PipelineWitness> {
static class Serializer extends StdSerializer<PipelineWitness> {
/**
* Default constructor - required for Jackson
@ -163,6 +173,9 @@ final public class PipelineWitness implements SerializableWitness {
witness.plugins().genJson(gen, provider);
witness.reloads().genJson(gen, provider);
witness.queue().genJson(gen, provider);
if (witness.config().snitch().deadLetterQueueEnabled()) {
witness.dlq().genJson(gen, provider);
}
gen.writeEndObject();
}
}

View file

@ -45,7 +45,7 @@ final public class PipelinesWitness implements SerializableWitness {
/**
* The Jackson serializer.
*/
public static class Serializer extends StdSerializer<PipelinesWitness> {
static class Serializer extends StdSerializer<PipelinesWitness> {
/**
* Default constructor - required for Jackson

View file

@ -70,7 +70,7 @@ public class PluginWitness implements SerializableWitness {
/**
* The Jackson JSON serializer.
*/
public static class Serializer extends StdSerializer<PluginWitness> {
static class Serializer extends StdSerializer<PluginWitness> {
/**
* Default constructor - required for Jackson
@ -106,11 +106,11 @@ public class PluginWitness implements SerializableWitness {
/**
* Snitch for a plugin. Provides discrete metric values.
*/
public static class Snitch {
public class Snitch {
private final PluginWitness witness;
Snitch(PluginWitness witness) {
private Snitch(PluginWitness witness) {
this.witness = witness;
}

View file

@ -103,7 +103,7 @@ public class PluginsWitness implements SerializableWitness {
/**
* The Jackson serializer.
*/
public static class Serializer extends StdSerializer<PluginsWitness> {
static class Serializer extends StdSerializer<PluginsWitness> {
/**
* Default constructor - required for Jackson

View file

@ -4,6 +4,8 @@ import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import org.logstash.instrument.metrics.Metric;
import org.logstash.instrument.metrics.gauge.LongGauge;
import org.logstash.instrument.metrics.gauge.TextGauge;
import java.io.IOException;
@ -15,7 +17,10 @@ import java.io.IOException;
final public class QueueWitness implements SerializableWitness {
private final TextGauge type;
private final LongGauge events; // note this is NOT an EventsWitness
private final Snitch snitch;
private final CapacityWitness capacity;
private final DataWitness data;
private final static String KEY = "queue";
private static final Serializer SERIALIZER = new Serializer();
@ -24,7 +29,37 @@ final public class QueueWitness implements SerializableWitness {
*/
public QueueWitness() {
type = new TextGauge("type");
events = new LongGauge("events");
snitch = new Snitch(this);
capacity = new CapacityWitness();
data = new DataWitness();
}
/**
* The number of events currently in the queue.
*
* @param count the count of events currently in the queue
*/
public void events(long count) {
events.set(count);
}
/**
* Get the capacity witness for this queue.
*
* @return the associated {@link CapacityWitness}
*/
public CapacityWitness capacity() {
return capacity;
}
/**
* Get the data witness for this queue.
*
* @return the associated {@link DataWitness}
*/
public DataWitness data() {
return data;
}
/**
@ -50,10 +85,222 @@ final public class QueueWitness implements SerializableWitness {
SERIALIZER.innerSerialize(this, gen, provider);
}
/**
* Inner witness for the queue capacity
*/
public class CapacityWitness {
private final LongGauge queueSizeInBytes;
private final LongGauge pageCapacityInBytes;
private final LongGauge maxQueueSizeInBytes;
private final LongGauge maxUnreadEvents;
private final Snitch snitch;
private final static String KEY = "capacity";
private CapacityWitness() {
queueSizeInBytes = new LongGauge("queue_size_in_bytes");
pageCapacityInBytes = new LongGauge("page_capacity_in_bytes");
maxQueueSizeInBytes = new LongGauge("max_queue_size_in_bytes");
maxUnreadEvents = new LongGauge("max_unread_events");
snitch = new Snitch(this);
}
/**
* Set the queue size for this queue, represented in bytes
*
* @param size the byte size of this queue
*/
public void queueSizeInBytes(long size) {
queueSizeInBytes.set(size);
}
/**
* Set the page capacity for this queue, represented in bytes.
*
* @param capacity the byte capacity of this queue.
*/
public void pageCapacityInBytes(long capacity) {
pageCapacityInBytes.set(capacity);
}
/**
* Set the max queue size, represented in bytes.
*
* @param max the max queue size of this queue.
*/
public void maxQueueSizeInBytes(long max) {
maxQueueSizeInBytes.set(max);
}
/**
* Set the max unread events count.
*
* @param max the max unread events.
*/
public void maxUnreadEvents(long max) {
maxUnreadEvents.set(max);
}
/**
* Get a reference to associated snitch to get discrete metric values.
*
* @return the associate {@link Snitch}
*/
public Snitch snitch() {
return snitch;
}
/**
* Snitch for queue capacity. Provides discrete metric values.
*/
public class Snitch {
private final CapacityWitness witness;
private Snitch(CapacityWitness witness) {
this.witness = witness;
}
/**
* Gets the queue size in bytes
*
* @return the queue size in bytes. May be {@code null}
*/
public Long queueSizeInBytes() {
return witness.queueSizeInBytes.getValue();
}
/**
* Gets the page queue capacity in bytes.
*
* @return the page queue capacity.
*/
public Long pageCapacityInBytes() {
return witness.pageCapacityInBytes.getValue();
}
/**
* Gets the max queue size in bytes.
*
* @return the max queue size.
*/
public Long maxQueueSizeInBytes() {
return witness.maxQueueSizeInBytes.getValue();
}
/**
* Get the max unread events from this queue.
*
* @return the max unread events.
*/
public Long maxUnreadEvents() {
return witness.maxUnreadEvents.getValue();
}
}
}
/**
* Inner witness for the queue data
*/
public class DataWitness {
private final TextGauge path;
private final LongGauge freeSpaceInBytes;
private final TextGauge storageType;
private final Snitch snitch;
private final static String KEY = "data";
private DataWitness() {
path = new TextGauge("path");
freeSpaceInBytes = new LongGauge("free_space_in_bytes");
storageType = new TextGauge("storage_type");
snitch = new Snitch(this);
}
/**
* Set the free space for this queue, represented in bytes
*
* @param space the free byte size for this queue
*/
public void freeSpaceInBytes(long space) {
freeSpaceInBytes.set(space);
}
/**
* Set the path for this persistent queue.
*
* @param path the path to the persistent queue
*/
public void path(String path) {
this.path.set(path);
}
/**
* Set the storage type for this queue.
*
* @param storageType the storage type for this queue.
*/
public void storageType(String storageType) {
this.storageType.set(storageType);
}
/**
* Get a reference to associated snitch to get discrete metric values.
*
* @return the associate {@link Snitch}
*/
public Snitch snitch() {
return snitch;
}
/**
* Snitch for queue capacity. Provides discrete metric values.
*/
public class Snitch {
private final DataWitness witness;
private Snitch(DataWitness witness) {
this.witness = witness;
}
/**
* Gets the path of this persistent queue.
*
* @return the path to the persistent queue. May be {@code null}
*/
public String path() {
return witness.path.getValue();
}
/**
* Gets the free space of the queue in bytes.
*
* @return the free space of the queue
*/
public Long freeSpaceInBytes() {
return witness.freeSpaceInBytes.getValue();
}
/**
* Gets the storage type of the queue.
*
* @return the storage type.
*/
public String storageType() {
return witness.storageType.getValue();
}
}
}
/**
* The Jackson serializer.
*/
public static class Serializer extends StdSerializer<QueueWitness> {
static class Serializer extends StdSerializer<QueueWitness> {
/**
* Default constructor - required for Jackson
*/
@ -79,7 +326,25 @@ final public class QueueWitness implements SerializableWitness {
void innerSerialize(QueueWitness witness, JsonGenerator gen, SerializerProvider provider) throws IOException {
gen.writeObjectFieldStart(KEY);
MetricSerializer.Get.stringSerializer(gen).serialize(witness.type);
MetricSerializer<Metric<Long>> longSerializer = MetricSerializer.Get.longSerializer(gen);
MetricSerializer<Metric<String>> stringSerializer = MetricSerializer.Get.stringSerializer(gen);
stringSerializer.serialize(witness.type);
if ("persisted".equals(witness.type.getValue())) {
longSerializer.serialize(witness.events);
//capacity
gen.writeObjectFieldStart(CapacityWitness.KEY);
longSerializer.serialize(witness.capacity.queueSizeInBytes);
longSerializer.serialize(witness.capacity.pageCapacityInBytes);
longSerializer.serialize(witness.capacity.maxQueueSizeInBytes);
longSerializer.serialize(witness.capacity.maxUnreadEvents);
gen.writeEndObject();
//data
gen.writeObjectFieldStart(DataWitness.KEY);
stringSerializer.serialize(witness.data.path);
longSerializer.serialize(witness.data.freeSpaceInBytes);
stringSerializer.serialize(witness.data.storageType);
gen.writeEndObject();
}
gen.writeEndObject();
}
}
@ -87,22 +352,31 @@ final public class QueueWitness implements SerializableWitness {
/**
* Snitch for queue. Provides discrete metric values.
*/
public static class Snitch {
public class Snitch {
private final QueueWitness witness;
Snitch(QueueWitness witness) {
private Snitch(QueueWitness witness) {
this.witness = witness;
}
/**
* Gets the type of queue
*
* @return the queue type.
* @return the queue type. May be {@code null}
*/
public String type() {
return witness.type.getValue();
}
/**
* Gets the number of events currently in the queue
*
* @return the count of events in the queue. {@code null}
*/
public Long events() {
return witness.events.getValue();
}
}
}

View file

@ -118,7 +118,7 @@ final public class ReloadWitness implements SerializableWitness {
/**
* The Jackson serializer.
*/
public static class Serializer extends StdSerializer<ReloadWitness> {
static class Serializer extends StdSerializer<ReloadWitness> {
/**
* Default constructor - required for Jackson

View file

@ -21,18 +21,21 @@ public class ConfigWitnessTest {
@Test
public void testBatchDelay() {
assertThat(witness.snitch().batchDelay()).isNull();
witness.batchDelay(99);
assertThat(witness.snitch().batchDelay()).isEqualTo(99);
}
@Test
public void testBatchSize() {
assertThat(witness.snitch().batchSize()).isNull();
witness.batchSize(98);
assertThat(witness.snitch().batchSize()).isEqualTo(98);
}
@Test
public void testConfigReloadAutomatic() {
assertThat(witness.snitch().configReloadAutomatic()).isFalse();
witness.configReloadAutomatic(true);
assertThat(witness.snitch().configReloadAutomatic()).isTrue();
witness.configReloadAutomatic(false);
@ -41,20 +44,30 @@ public class ConfigWitnessTest {
@Test
public void testConfigReloadInterval() {
assertThat(witness.snitch().configReloadInterval()).isNull();
witness.configReloadInterval(97);
assertThat(witness.snitch().configReloadInterval()).isEqualTo(97);
}
@Test
public void testDeadLetterQueueEnabled() {
assertThat(witness.snitch().deadLetterQueueEnabled()).isFalse();
witness.deadLetterQueueEnabled(true);
assertThat(witness.snitch().deadLetterQueueEnabled()).isTrue();
witness.deadLetterQueueEnabled(false);
assertThat(witness.snitch().deadLetterQueueEnabled()).isFalse();
}
@Test
public void testDeadLetterQueuePath() {
assertThat(witness.snitch().deadLetterQueuePath()).isNull();
witness.deadLetterQueuePath("/var/dlq");
assertThat(witness.snitch().deadLetterQueuePath()).isEqualTo("/var/dlq");
}
@Test
public void testWorkers() {
assertThat(witness.snitch().workers()).isNull();
witness.workers(96);
assertThat(witness.snitch().workers()).isEqualTo(96);
}
@ -68,7 +81,8 @@ public class ConfigWitnessTest {
@Test
public void testSerializeEmpty() throws Exception {
String json = witness.asJson();
assertThat(json).isEqualTo("{\"config\":{}}");
assertThat(json).isEqualTo("{\"config\":{\"batch_size\":0,\"workers\":0,\"batch_delay\":0,\"config_reload_interval\":0,\"config_reload_automatic\":false," +
"\"dead_letter_queue_enabled\":false,\"dead_letter_queue_path\":null}}");
}
@Test
@ -82,35 +96,48 @@ public class ConfigWitnessTest {
public void testSerializeWorkersSize() throws Exception {
witness.workers(888);
String json = witness.asJson();
assertThat(json).isEqualTo("{\"config\":{\"workers\":888}}");
assertThat(json).isEqualTo("{\"config\":{\"batch_size\":0,\"workers\":888,\"batch_delay\":0,\"config_reload_interval\":0,\"config_reload_automatic\":false," +
"\"dead_letter_queue_enabled\":false,\"dead_letter_queue_path\":null}}");
}
@Test
public void testSerializeBatchDelay() throws Exception {
witness.batchDelay(777);
String json = witness.asJson();
assertThat(json).isEqualTo("{\"config\":{\"batch_delay\":777}}");
assertThat(json).isEqualTo("{\"config\":{\"batch_size\":0,\"workers\":0,\"batch_delay\":777,\"config_reload_interval\":0,\"config_reload_automatic\":false," +
"\"dead_letter_queue_enabled\":false,\"dead_letter_queue_path\":null}}");
}
@Test
public void testSerializeAutoConfigReload() throws Exception {
witness.configReloadAutomatic(true);
String json = witness.asJson();
assertThat(json).isEqualTo("{\"config\":{\"config_reload_automatic\":true}}");
assertThat(json).isEqualTo("{\"config\":{\"batch_size\":0,\"workers\":0,\"batch_delay\":0,\"config_reload_interval\":0,\"config_reload_automatic\":true," +
"\"dead_letter_queue_enabled\":false,\"dead_letter_queue_path\":null}}");
}
@Test
public void testSerializeReloadInterval() throws Exception {
witness.configReloadInterval(666);
String json = witness.asJson();
assertThat(json).isEqualTo("{\"config\":{\"config_reload_interval\":666}}");
assertThat(json).isEqualTo("{\"config\":{\"batch_size\":0,\"workers\":0,\"batch_delay\":0,\"config_reload_interval\":666,\"config_reload_automatic\":false," +
"\"dead_letter_queue_enabled\":false,\"dead_letter_queue_path\":null}}");
}
@Test
public void testSerializeEnableDeadLetterQueue() throws Exception {
witness.deadLetterQueueEnabled(true);
String json = witness.asJson();
assertThat(json).isEqualTo("{\"config\":{\"dead_letter_queue_enabled\":true}}");
assertThat(json).isEqualTo("{\"config\":{\"batch_size\":0,\"workers\":0,\"batch_delay\":0,\"config_reload_interval\":0,\"config_reload_automatic\":false," +
"\"dead_letter_queue_enabled\":true,\"dead_letter_queue_path\":null}}");
}
@Test
public void testSerializeEnableDeadLetterPath() throws Exception {
witness.deadLetterQueuePath("/var/dlq");
String json = witness.asJson();
assertThat(json).isEqualTo("{\"config\":{\"batch_size\":0,\"workers\":0,\"batch_delay\":0,\"config_reload_interval\":0,\"config_reload_automatic\":false," +
"\"dead_letter_queue_enabled\":false,\"dead_letter_queue_path\":\"/var/dlq\"}}");
}
}

View file

@ -0,0 +1,47 @@
package org.logstash.instrument.witness;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Before;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Unit tests for {@link DeadLetterQueueWitness}
*/
public class DeadLetterQueueWitnessTest {
private DeadLetterQueueWitness witness;
@Before
public void setup() {
witness = new DeadLetterQueueWitness();
}
@Test
public void queueSizeInBytes() {
assertThat(witness.snitch().queueSizeInBytes()).isNull();
witness.queueSizeInBytes(99);
assertThat(witness.snitch().queueSizeInBytes()).isEqualTo(99);
}
@Test
public void testAsJson() throws Exception {
ObjectMapper mapper = new ObjectMapper();
assertThat(mapper.writeValueAsString(witness)).isEqualTo(witness.asJson());
}
@Test
public void testSerializeEmpty() throws Exception {
String json = witness.asJson();
assertThat(json).isEqualTo("{\"dead_letter_queue\":{\"queue_size_in_bytes\":0}}");
}
@Test
public void testSerializeQueueSize() throws Exception {
witness.queueSizeInBytes(98);
String json = witness.asJson();
assertThat(json).isEqualTo("{\"dead_letter_queue\":{\"queue_size_in_bytes\":98}}");
}
}

View file

@ -3,6 +3,7 @@ package org.logstash.instrument.witness;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Before;
import org.junit.Test;
import org.logstash.instrument.metrics.gauge.LongGauge;
import static org.assertj.core.api.Assertions.assertThat;
@ -47,14 +48,14 @@ public class ErrorWitnessTest {
@Test
public void testSerializeEmpty() throws Exception {
String json = witness.asJson();
assertThat(json).isEqualTo("{\"last_error\":{}}");
assertThat(json).isEqualTo("{\"last_error\":{\"message\":null,\"backtrace\":null}}");
}
@Test
public void testSerializeMessage() throws Exception {
witness.message("whoops");
String json = witness.asJson();
assertThat(json).isEqualTo("{\"last_error\":{\"message\":\"whoops\"}}");
assertThat(json).isEqualTo("{\"last_error\":{\"message\":\"whoops\",\"backtrace\":null}}");
}
@Test

View file

@ -15,13 +15,13 @@ public class PipelineWitnessTest {
private PipelineWitness witness;
@Before
public void setup(){
public void setup() {
witness = new PipelineWitness("default");
}
@Test
public void testNotNull(){
public void testNotNull() {
assertThat(witness.inputs("123")).isNotNull();
assertThat(witness.filters("456")).isNotNull();
assertThat(witness.outputs("789")).isNotNull();
@ -33,7 +33,7 @@ public class PipelineWitnessTest {
}
@Test
public void testForget(){
public void testForget() {
witness.inputs("123").events().in(99);
witness.filters("456").events().in(98);
witness.outputs("789").events().in(97);
@ -74,12 +74,12 @@ public class PipelineWitnessTest {
public void testSerializeEmpty() throws Exception {
String json = witness.asJson();
assertThat(json).isEqualTo("{\"default\":{\"events\":{\"duration_in_millis\":0,\"in\":0,\"out\":0,\"filtered\":0,\"queue_push_duration_in_millis\":0}," +
"\"plugins\":{\"inputs\":[],\"filters\":[],\"outputs\":[]},\"reloads\":{\"last_error\":{},\"successes\":0,\"last_success_timestamp\":null," +
"\"last_failure_timestamp\":null,\"failures\":0},\"queue\":{}}}");
"\"plugins\":{\"inputs\":[],\"filters\":[],\"outputs\":[]},\"reloads\":{\"last_error\":{\"message\":null,\"backtrace\":null},\"successes\":0," +
"\"last_success_timestamp\":null,\"last_failure_timestamp\":null,\"failures\":0},\"queue\":{\"type\":null}}}");
}
@Test
public void testSerializeEvents() throws Exception{
public void testSerializeEvents() throws Exception {
witness.events().in(99);
String json = witness.asJson();
assertThat(json).contains("99");
@ -90,7 +90,7 @@ public class PipelineWitnessTest {
}
@Test
public void testSerializePlugins() throws Exception{
public void testSerializePlugins() throws Exception {
witness.inputs("aaa");
witness.filters("bbb");
witness.outputs("ccc");
@ -103,17 +103,30 @@ public class PipelineWitnessTest {
}
@Test
public void testSerializeReloads() throws Exception{
public void testSerializeReloads() throws Exception {
witness.reloads().successes(98);
String json = witness.asJson();
assertThat(json).contains("98");
}
@Test
public void testSerializeQueue() throws Exception{
public void testSerializeQueue() throws Exception {
witness.queue().type("quantum");
String json = witness.asJson();
assertThat(json).contains("quantum");
}
/**
* Only serialize the DeadLetterQueue if enabled
* @throws Exception if an Exception is thrown.
*/
@Test
public void testSerializeDeadLetterQueue() throws Exception {
witness.config().deadLetterQueueEnabled(false);
String json = witness.asJson();
assertThat(json).doesNotContain("dead_letter_queue");
witness.config().deadLetterQueueEnabled(true);
json = witness.asJson();
assertThat(json).contains("\"dead_letter_queue\":{\"queue_size_in_bytes\":0}");
}
}

View file

@ -40,7 +40,7 @@ public class PluginWitnessTest {
@Test
public void testSerializationEmpty() throws Exception {
String json = witness.asJson();
assertThat(json).isEqualTo("{\"id\":\"123\",\"events\":{\"duration_in_millis\":0,\"in\":0,\"out\":0,\"filtered\":0,\"queue_push_duration_in_millis\":0}}");
assertThat(json).isEqualTo("{\"id\":\"123\",\"events\":{\"duration_in_millis\":0,\"in\":0,\"out\":0,\"filtered\":0,\"queue_push_duration_in_millis\":0},\"name\":null}");
}
@Test
@ -54,6 +54,6 @@ public class PluginWitnessTest {
public void testSerializationEvents() throws Exception {
witness.events().in();
String json = witness.asJson();
assertThat(json).isEqualTo("{\"id\":\"123\",\"events\":{\"duration_in_millis\":0,\"in\":1,\"out\":0,\"filtered\":0,\"queue_push_duration_in_millis\":0}}");
assertThat(json).isEqualTo("{\"id\":\"123\",\"events\":{\"duration_in_millis\":0,\"in\":1,\"out\":0,\"filtered\":0,\"queue_push_duration_in_millis\":0},\"name\":null}");
}
}

View file

@ -55,7 +55,7 @@ public class PluginsWitnessTest {
witness.inputs("foo");
String json = witness.asJson();
assertThat(json).isEqualTo("{\"plugins\":{\"inputs\":[{\"id\":\"foo\",\"events\":{\"duration_in_millis\":0,\"in\":0,\"out\":0,\"filtered\":0," +
"\"queue_push_duration_in_millis\":0}}],\"filters\":[],\"outputs\":[]}}");
"\"queue_push_duration_in_millis\":0},\"name\":null}],\"filters\":[],\"outputs\":[]}}");
witness.forgetAll();
json = witness.asJson();
assertThat(json).isEqualTo("{\"plugins\":{\"inputs\":[],\"filters\":[],\"outputs\":[]}}");
@ -66,7 +66,7 @@ public class PluginsWitnessTest {
witness.filters("foo");
String json = witness.asJson();
assertThat(json).isEqualTo("{\"plugins\":{\"inputs\":[],\"filters\":[{\"id\":\"foo\",\"events\":{\"duration_in_millis\":0,\"in\":0,\"out\":0,\"filtered\":0," +
"\"queue_push_duration_in_millis\":0}}],\"outputs\":[]}}");
"\"queue_push_duration_in_millis\":0},\"name\":null}],\"outputs\":[]}}");
witness.forgetAll();
json = witness.asJson();
assertThat(json).isEqualTo("{\"plugins\":{\"inputs\":[],\"filters\":[],\"outputs\":[]}}");
@ -77,7 +77,7 @@ public class PluginsWitnessTest {
witness.outputs("foo");
String json = witness.asJson();
assertThat(json).isEqualTo("{\"plugins\":{\"inputs\":[],\"filters\":[],\"outputs\":[{\"id\":\"foo\",\"events\":{\"duration_in_millis\":0,\"in\":0,\"out\":0," +
"\"filtered\":0,\"queue_push_duration_in_millis\":0}}]}}");
"\"filtered\":0,\"queue_push_duration_in_millis\":0},\"name\":null}]}}");
witness.forgetAll();
json = witness.asJson();
assertThat(json).isEqualTo("{\"plugins\":{\"inputs\":[],\"filters\":[],\"outputs\":[]}}");

View file

@ -15,15 +15,65 @@ public class QueueWitnessTest {
private QueueWitness witness;
@Before
public void setup(){
public void setup() {
witness = new QueueWitness();
}
@Test
public void testType(){
public void testType() {
witness.type("memory");
assertThat(witness.snitch().type()).isEqualTo("memory");
}
@Test
public void testEvents() {
assertThat(witness.snitch().events()).isNull();
witness.events(101);
assertThat(witness.snitch().events()).isEqualTo(101);
}
@Test
public void testQueueSizeInBytes(){
witness.capacity().queueSizeInBytes(99);
assertThat(witness.capacity().snitch().queueSizeInBytes()).isEqualTo(99);
}
@Test
public void testPageCapacityInBytes(){
witness.capacity().pageCapacityInBytes(98);
assertThat(witness.capacity().snitch().pageCapacityInBytes()).isEqualTo(98);
}
@Test
public void testMaxQueueSizeInBytes(){
witness.capacity().maxQueueSizeInBytes(97);
assertThat(witness.capacity().snitch().maxQueueSizeInBytes()).isEqualTo(97);
}
@Test
public void testMaxUnreadEvents(){
witness.capacity().maxUnreadEvents(96);
assertThat(witness.capacity().snitch().maxUnreadEvents()).isEqualTo(96);
}
@Test
public void testPath(){
witness.data().path("/var/ls/q");
assertThat(witness.data().snitch().path()).isEqualTo("/var/ls/q");
}
@Test
public void testFreeSpace(){
witness.data().freeSpaceInBytes(77);
assertThat(witness.data().snitch().freeSpaceInBytes()).isEqualTo(77);
}
@Test
public void testStorageType(){
witness.data().storageType("ext4");
assertThat(witness.data().snitch().storageType()).isEqualTo("ext4");
}
@Test
public void testAsJson() throws Exception {
ObjectMapper mapper = new ObjectMapper();
@ -31,16 +81,95 @@ public class QueueWitnessTest {
}
@Test
public void testSerializeEmpty() throws Exception{
public void testSerializeEmpty() throws Exception {
String json = witness.asJson();
assertThat(json).isEqualTo("{\"queue\":{}}");
assertThat(json).isEqualTo("{\"queue\":{\"type\":null}}");
}
@Test
public void testSerializeType() throws Exception{
public void testSerializeMemoryType() throws Exception {
witness.type("memory");
String json = witness.asJson();
assertThat(json).isEqualTo("{\"queue\":{\"type\":\"memory\"}}");
}
@Test
public void testSerializePersistedType() throws Exception {
witness.type("persisted");
String json = witness.asJson();
assertThat(json).isEqualTo("{\"queue\":{\"type\":\"persisted\",\"events\":0,\"capacity\":{\"queue_size_in_bytes\":0,\"page_capacity_in_bytes\":0," +
"\"max_queue_size_in_bytes\":0,\"max_unread_events\":0},\"data\":{\"path\":null,\"free_space_in_bytes\":0,\"storage_type\":null}}}");
}
@Test
public void testSerializeQueueSize() throws Exception {
witness.type("persisted");
witness.capacity().queueSizeInBytes(88);
String json = witness.asJson();
assertThat(json).isEqualTo("{\"queue\":{\"type\":\"persisted\",\"events\":0,\"capacity\":{\"queue_size_in_bytes\":88,\"page_capacity_in_bytes\":0," +
"\"max_queue_size_in_bytes\":0,\"max_unread_events\":0},\"data\":{\"path\":null,\"free_space_in_bytes\":0,\"storage_type\":null}}}");
}
@Test
public void testSerializeQueuePageCapacity() throws Exception {
witness.type("persisted");
witness.capacity().pageCapacityInBytes(87);
String json = witness.asJson();
assertThat(json).isEqualTo("{\"queue\":{\"type\":\"persisted\",\"events\":0,\"capacity\":{\"queue_size_in_bytes\":0,\"page_capacity_in_bytes\":87," +
"\"max_queue_size_in_bytes\":0,\"max_unread_events\":0},\"data\":{\"path\":null,\"free_space_in_bytes\":0,\"storage_type\":null}}}");
}
@Test
public void testSerializeMaxQueueSize() throws Exception {
witness.type("persisted");
witness.capacity().maxUnreadEvents(86);
String json = witness.asJson();
assertThat(json).isEqualTo("{\"queue\":{\"type\":\"persisted\",\"events\":0,\"capacity\":{\"queue_size_in_bytes\":0,\"page_capacity_in_bytes\":0," +
"\"max_queue_size_in_bytes\":0,\"max_unread_events\":86},\"data\":{\"path\":null,\"free_space_in_bytes\":0,\"storage_type\":null}}}");
}
@Test
public void testSerializeMaxUnreadEvents() throws Exception {
witness.type("persisted");
witness.capacity().maxUnreadEvents(85);
String json = witness.asJson();
assertThat(json).isEqualTo("{\"queue\":{\"type\":\"persisted\",\"events\":0,\"capacity\":{\"queue_size_in_bytes\":0,\"page_capacity_in_bytes\":0," +
"\"max_queue_size_in_bytes\":0,\"max_unread_events\":85},\"data\":{\"path\":null,\"free_space_in_bytes\":0,\"storage_type\":null}}}");
}
@Test
public void testSerializePath() throws Exception{
witness.type("persisted");
witness.data().path("/var/ls/q2");
String json = witness.asJson();
assertThat(json).isEqualTo("{\"queue\":{\"type\":\"persisted\",\"events\":0,\"capacity\":{\"queue_size_in_bytes\":0,\"page_capacity_in_bytes\":0," +
"\"max_queue_size_in_bytes\":0,\"max_unread_events\":0},\"data\":{\"path\":\"/var/ls/q2\",\"free_space_in_bytes\":0,\"storage_type\":null}}}");
}
@Test
public void testSerializeFreeSpace() throws Exception{
witness.type("persisted");
witness.data().freeSpaceInBytes(66);
String json = witness.asJson();
assertThat(json).isEqualTo("{\"queue\":{\"type\":\"persisted\",\"events\":0,\"capacity\":{\"queue_size_in_bytes\":0,\"page_capacity_in_bytes\":0," +
"\"max_queue_size_in_bytes\":0,\"max_unread_events\":0},\"data\":{\"path\":null,\"free_space_in_bytes\":66,\"storage_type\":null}}}");
}
@Test
public void testSerializeStorageType() throws Exception{
witness.type("persisted");
witness.data().storageType("xfs");
String json = witness.asJson();
assertThat(json).isEqualTo("{\"queue\":{\"type\":\"persisted\",\"events\":0,\"capacity\":{\"queue_size_in_bytes\":0,\"page_capacity_in_bytes\":0," +
"\"max_queue_size_in_bytes\":0,\"max_unread_events\":0},\"data\":{\"path\":null,\"free_space_in_bytes\":0,\"storage_type\":\"xfs\"}}}");
}
@Test
public void testSerializeEvents() throws Exception{
witness.type("persisted");
witness.events(102);
String json = witness.asJson();
assertThat(json).isEqualTo("{\"queue\":{\"type\":\"persisted\",\"events\":102,\"capacity\":{\"queue_size_in_bytes\":0,\"page_capacity_in_bytes\":0," +
"\"max_queue_size_in_bytes\":0,\"max_unread_events\":0},\"data\":{\"path\":null,\"free_space_in_bytes\":0,\"storage_type\":null}}}");
}
}

View file

@ -65,7 +65,8 @@ public class ReloadWitnessTest {
@Test
public void testSerializeEmpty() throws Exception {
String json = witness.asJson();
assertThat(json).isEqualTo("{\"reloads\":{\"last_error\":{},\"successes\":0,\"last_success_timestamp\":null,\"last_failure_timestamp\":null,\"failures\":0}}");
assertThat(json).isEqualTo("{\"reloads\":{\"last_error\":{\"message\":null,\"backtrace\":null},\"successes\":0,\"last_success_timestamp\":null," +
"\"last_failure_timestamp\":null,\"failures\":0}}");
}
@Test
@ -73,7 +74,7 @@ public class ReloadWitnessTest {
witness.success();
witness.lastSuccessTimestamp(rubyTimestamp);
String json = witness.asJson();
assertThat(json).isEqualTo("{\"reloads\":{\"last_error\":{},\"successes\":1,\"last_success_timestamp\":\"" + timestamp.toIso8601() +
assertThat(json).isEqualTo("{\"reloads\":{\"last_error\":{\"message\":null,\"backtrace\":null},\"successes\":1,\"last_success_timestamp\":\"" + timestamp.toIso8601() +
"\",\"last_failure_timestamp\":null,\"failures\":0}}");
}
@ -82,8 +83,8 @@ public class ReloadWitnessTest {
witness.failure();
witness.lastFailureTimestamp(rubyTimestamp);
String json = witness.asJson();
assertThat(json).isEqualTo("{\"reloads\":{\"last_error\":{},\"successes\":0,\"last_success_timestamp\":null,\"last_failure_timestamp\":\""
+ timestamp.toIso8601() + "\",\"failures\":1}}");
assertThat(json).isEqualTo("{\"reloads\":{\"last_error\":{\"message\":null,\"backtrace\":null},\"successes\":0,\"last_success_timestamp\":null," +
"\"last_failure_timestamp\":\"" + timestamp.toIso8601() + "\",\"failures\":1}}");
}
@Test

View file

@ -51,8 +51,9 @@ public class WitnessTest {
witness = new Witness();
String json = witness.asJson();
//empty pipelines
assertThat(json).contains("{\"events\":{\"duration_in_millis\":0,\"in\":0,\"out\":0,\"filtered\":0,\"queue_push_duration_in_millis\":0},\"reloads\":{\"last_error\":{}," +
"\"successes\":0,\"last_success_timestamp\":null,\"last_failure_timestamp\":null,\"failures\":0},\"pipelines\":{}}");
assertThat(json).isEqualTo("{\"events\":{\"duration_in_millis\":0,\"in\":0,\"out\":0,\"filtered\":0,\"queue_push_duration_in_millis\":0}," +
"\"reloads\":{\"last_error\":{\"message\":null,\"backtrace\":null},\"successes\":0,\"last_success_timestamp\":null,\"last_failure_timestamp\":null," +
"\"failures\":0},\"pipelines\":{}}");
}
@Test