BUG: Memory Queue needs to always be drained

Fixes #9024
This commit is contained in:
Armin 2018-01-24 13:39:23 +01:00 committed by Andrew Cholakian
parent 499a982762
commit b30252fb19
3 changed files with 3 additions and 3 deletions

View file

@ -171,7 +171,7 @@ module LogStash; class JavaPipeline < JavaBasePipeline
@filter_queue_client.set_pipeline_metric(
metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :events])
)
@drain_queue = @settings.get_value("queue.drain")
@drain_queue = @settings.get_value("queue.drain") || settings.get("queue.type") == "memory"
@events_filtered = Concurrent::AtomicFixnum.new(0)
@events_consumed = Concurrent::AtomicFixnum.new(0)

View file

@ -183,7 +183,7 @@ module LogStash; class Pipeline < BasePipeline
@filter_queue_client.set_pipeline_metric(
metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :events])
)
@drain_queue = @settings.get_value("queue.drain")
@drain_queue = @settings.get_value("queue.drain") || settings.get("queue.type") == "memory"
@events_filtered = Concurrent::AtomicFixnum.new(0)

View file

@ -66,7 +66,7 @@ public final class WorkerLoop implements Runnable {
if (isFlush) {
flushing.set(false);
}
} while (!shutdownRequested && !isDraining(context));
} while (!shutdownRequested || isDraining(context));
//we are shutting down, queue is drained if it was required, now perform a final flush.
//for this we need to create a new empty batch to contain the final flushed events
final IRubyObject batch = readClient.callMethod(context, "new_batch");