add support for pipeline.ordered setting for java execution (#11552)

reuse rubyArray for single element batches

rename preserveBatchOrder to preserveEventOrder

allow boolean and string values for the pipeline.ordered setting, reorg validation

update docs

yml typo

Update docs/static/running-logstash-command-line.asciidoc

Co-Authored-By: Karen Metts <35154725+karenzone@users.noreply.github.com>

Update docs/static/running-logstash-command-line.asciidoc

Co-Authored-By: Karen Metts <35154725+karenzone@users.noreply.github.com>

java execution specs and spec support

docs corrections per review

typo

close not shutdown

Ruby pipeline spec
This commit is contained in:
Colin Surprenant 2020-01-29 14:05:47 -05:00 committed by GitHub
parent 5cbc9eeb47
commit 6a8bebffe6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 301 additions and 15 deletions

View file

@ -57,6 +57,15 @@
#
# pipeline.unsafe_shutdown: false
#
# Set the pipeline event ordering. Options are "auto" (the default), "true" or "false".
# "auto" will automatically enable ordering if the 'pipeline.workers' setting
# is also set to '1'.
# "true" will enforce ordering on the pipeline and prevent logstash from starting
# if there are multiple workers.
# "false" will disable any extra processing necessary for preserving ordering.
#
pipeline.ordered: auto
#
# ------------ Pipeline Configuration Settings --------------
#
# Where to fetch the pipeline configuration for the main pipeline

View file

@ -107,6 +107,19 @@ With this command, Logstash concatenates three config files, `/tmp/one`, `/tmp/t
backing up, or that the CPU is not saturated, consider increasing this number to better utilize
machine processing power. The default is the number of the host's CPU cores.
*`--pipeline.ordered ORDERED`*::
Preserves event order. Possible values are `auto` (default), `true` and `false`.
This setting
will work only when also using a single worker for the pipeline.
Note that when enabled, it may impact the performance of the filters
and output processing.
The `auto` option will automatically enable ordering if the
`pipeline.workers` setting is set to `1`.
Use `true` to enable ordering on the pipeline and prevent logstash
from starting if there are multiple workers.
Use `false` to disable any extra processing necessary for preserving
ordering.
*`-b, --pipeline.batch.size SIZE`*::
Size of batches the pipeline is to work in. This option defines the maximum number of events an
individual worker thread will collect from inputs before attempting to execute its filters and outputs.
@ -189,4 +202,3 @@ With this command, Logstash concatenates three config files, `/tmp/one`, `/tmp/t
*`-h, --help`*::
Print help

View file

@ -114,6 +114,22 @@ The `logstash.yml` file includes the following settings.
| (Beta) Load Java plugins in independent classloaders to isolate their dependencies.
| `false`
| `pipeline.ordered`
a|
Set the pipeline event ordering. Valid options are:
* `auto`
* `true`
* `false`
`auto` will automatically enable ordering if the `pipeline.workers` setting is also set to `1`.
`true` will enforce ordering on the pipeline and prevent logstash from starting
if there are multiple workers.
`false` will disable the processing required to preserve order. Ordering will not be
guaranteed, but you save the processing cost of preserving order.
| `auto`
| `path.config`
| The path to the Logstash config for the main pipeline. If you specify a directory or wildcard,
config files are read from the directory in alphabetical order.

View file

@ -45,6 +45,7 @@ module LogStash
Setting::Boolean.new("pipeline.reloadable", true),
Setting::Boolean.new("pipeline.plugin_classloaders", false),
Setting::Boolean.new("pipeline.separate_logs", false),
Setting::CoercibleString.new("pipeline.ordered", "auto", true, ["auto", "true", "false"]),
Setting.new("path.plugins", Array, []),
Setting::NullableString.new("interactive", nil, false),
Setting::Boolean.new("config.debug", false),

View file

@ -212,6 +212,7 @@ module LogStash; class JavaPipeline < JavaBasePipeline
maybe_setup_out_plugins
pipeline_workers = safe_pipeline_worker_count
@preserve_event_order = preserve_event_order?(pipeline_workers)
batch_size = settings.get("pipeline.batch.size")
batch_delay = settings.get("pipeline.batch.delay")
@ -488,7 +489,8 @@ module LogStash; class JavaPipeline < JavaBasePipeline
@flushRequested,
@flushing,
@shutdownRequested,
@drain_queue)
@drain_queue,
@preserve_event_order)
rescue => e
@logger.error(
"Worker loop initialization error",
@ -509,4 +511,19 @@ module LogStash; class JavaPipeline < JavaBasePipeline
keys[:thread] ||= thread.inspect if thread
keys
end
# Decide whether this pipeline run must preserve per-event ordering, based on
# the "pipeline.ordered" setting.
#
# "auto"  -> ordering is enabled only when "pipeline.workers" was explicitly
#            set by the user to 1 (a warning is logged about the cost).
# "true"  -> ordering is forced; aborts startup when more than one worker is
#            configured, since ordering cannot be preserved across workers.
# "false" -> ordering is disabled.
#
# @param pipeline_workers [Integer] the effective worker count for this run
# @return [Boolean] true when event order must be preserved
def preserve_event_order?(pipeline_workers)
  ordered = settings.get("pipeline.ordered")
  if ordered == "auto"
    if settings.set?("pipeline.workers") && settings.get("pipeline.workers") == 1
      @logger.warn("'pipeline.ordered' is enabled and is likely less efficient, consider disabling if preserving event order is not necessary")
      return true
    end
  elsif ordered == "true"
    fail("enabling the 'pipeline.ordered' setting requires the use of a single pipeline worker") if pipeline_workers > 1
    return true
  end
  false
end
end; end

View file

@ -293,6 +293,7 @@ module LogStash; class Pipeline < BasePipeline
maybe_setup_out_plugins
pipeline_workers = safe_pipeline_worker_count
verify_event_ordering!(pipeline_workers)
batch_size = settings.get("pipeline.batch.size")
batch_delay = settings.get("pipeline.batch.delay")
@ -653,4 +654,12 @@ module LogStash; class Pipeline < BasePipeline
def draining_queue?
@drain_queue ? !filter_queue_client.empty? : false
end
# Validate the "pipeline.ordered" setting for the Ruby execution engine.
# The Ruby execution keeps event order by design, but only when a single
# worker is used — so "true" combined with multiple workers is rejected.
#
# @param pipeline_workers [Integer] the effective worker count for this run
# @raise [RuntimeError] when ordering is forced but more than one worker is configured
def verify_event_ordering!(pipeline_workers)
  return unless settings.get("pipeline.ordered") == "true" && pipeline_workers > 1
  fail("enabling the 'pipeline.ordered' setting requires the use of a single pipeline worker")
end
end; end

View file

@ -108,6 +108,11 @@ class LogStash::Runner < Clamp::StrictCommand
:attribute_name => "pipeline.workers",
:default => LogStash::SETTINGS.get_default("pipeline.workers")
option "--pipeline.ordered", "ORDERED",
I18n.t("logstash.runner.flag.pipeline-ordered"),
:attribute_name => "pipeline.ordered",
:default => LogStash::SETTINGS.get_default("pipeline.ordered")
option ["--java-execution"], :flag,
I18n.t("logstash.runner.flag.java-execution"),
:attribute_name => "pipeline.java_execution",

View file

@ -31,6 +31,7 @@ module LogStash
"pipeline.reloadable",
"pipeline.system",
"pipeline.workers",
"pipeline.ordered",
"queue.checkpoint.acks",
"queue.checkpoint.interval",
"queue.checkpoint.writes",
@ -464,6 +465,28 @@ module LogStash
end
end
# The CoercibleString allows user to enter any value which coerces to a String.
# For example for true/false booleans; if the possible_strings are ["foo", "true", "false"]
# then these options in the config file or command line will be all valid: "foo", true, false, "true", "false"
#
class CoercibleString < Coercible
  # A setting whose value is coerced to a String, optionally restricted to a
  # whitelist of allowed strings. For example, with possible_strings
  # ["foo", "true", "false"], the values "foo", true, false, "true" and
  # "false" are all accepted from the config file or command line.
  def initialize(name, default=nil, strict=true, possible_strings=[], &validator_proc)
    @possible_strings = possible_strings
    super(name, Object, default, strict, &validator_proc)
  end

  # Turn any incoming value into its string representation.
  def coerce(value)
    value.to_s
  end

  # Run the base validation, then enforce the whitelist when one was given.
  def validate(value)
    super(value)
    return if @possible_strings.empty? || @possible_strings.include?(value)
    raise ArgumentError.new("Invalid value \"#{value}\". Options are: #{@possible_strings.inspect}")
  end
end
class ExistingFilePath < Setting
def initialize(name, default=nil, strict=true)
super(name, ::String, default, strict) do |file_path|

View file

@ -288,6 +288,18 @@ en:
Sets the ID of the pipeline.
pipeline-workers: |+
Sets the number of pipeline workers to run.
pipeline-ordered: |+
Preserve events order. Possible values are `auto` (default), `true` and `false`.
This setting
will only work when also using a single worker for the pipeline.
Note that when enabled, it may impact the performance of the filters
and output processing.
The `auto` option will automatically enable ordering if the
`pipeline.workers` setting is set to `1`.
Use `true` to enable ordering on the pipeline and prevent logstash
from starting if there are multiple workers.
Use `false` to disable any extra processing necessary for preserving
ordering.
java-execution: |+
Use Java execution engine.
plugin-classloaders: |+

View file

@ -3,7 +3,6 @@ require 'spec_helper'
require 'support/pipeline/pipeline_helpers'
module ConditionalFanciness
include PipelineHelpers
def description
return self.metadata[:description]
end
@ -63,6 +62,19 @@ end
describe "conditionals in filter" do
extend ConditionalFanciness
extend PipelineHelpers
let(:settings) do
# settings is used by sample_one.
# This was originally set directly in sample_one and
# pipeline.workers was also set to 1. I am preserving
# this setting here for the sake of minimizing change
# but unsure if this is actually required.
s = LogStash::SETTINGS.clone
s.set_value("pipeline.workers", 1)
s
end
describe "simple" do
config <<-CONFIG

View file

@ -52,6 +52,17 @@ end
describe LogStash::Filters::NOOP do
extend PipelineHelpers
let(:settings) do
# settings is used by sample_one.
# This was originally set directly in sample_one and
# pipeline.workers was also set to 1. I am preserving
# this setting here for the sake of minimizing change
# but unsure if this is actually required.
s = LogStash::SETTINGS.clone
s.set_value("pipeline.workers", 1)
s
end
describe "adding multiple values to one field" do
config <<-CONFIG

View file

@ -4,7 +4,6 @@ require "logstash/inputs/generator"
require "logstash/filters/drop"
require_relative "../support/mocks_classes"
require_relative "../support/helpers"
require_relative "../logstash/pipeline_reporter_spec" # for DummyOutput class
require 'support/pipeline/pipeline_helpers'
require "stud/try"
require 'timeout'
@ -401,6 +400,8 @@ describe LogStash::JavaPipeline do
context "compiled flush function" do
extend PipelineHelpers
let(:settings) { LogStash::SETTINGS.clone }
describe "flusher thread" do
before(:each) do
allow(LogStash::Plugin).to receive(:lookup).with("input", "dummyinput").and_return(DummyInput)
@ -448,6 +449,131 @@ describe LogStash::JavaPipeline do
end
end
context "batch order" do
extend PipelineHelpers
context "with a single worker and ordering enabled" do
let(:settings) do
s = LogStash::SETTINGS.clone
s.set_value("pipeline.workers", 1)
s.set_value("pipeline.ordered", "true")
s
end
config <<-CONFIG
filter {
if [message] =~ "\\d" {
mutate { add_tag => "digit" }
} else {
mutate { add_tag => "letter" }
}
}
CONFIG
sample_one(["a", "1", "b", "2", "c", "3"]) do
expect(subject.map{|e| e.get("message")}).to eq(["a", "1", "b", "2", "c", "3"])
end
end
context "with a multiple workers and ordering enabled" do
let(:settings) do
s = LogStash::SETTINGS.clone
s.set_value("pipeline.workers", 2)
s.set_value("pipeline.ordered", "true")
s
end
let(:config) { "input { } output { }" }
let(:pipeline) { mock_java_pipeline_from_string(config, settings) }
it "should raise error" do
expect{pipeline.run}.to raise_error(RuntimeError, /pipeline\.ordered/)
pipeline.close
end
end
context "with an explicit single worker ordering will auto enable" do
let(:settings) do
s = LogStash::SETTINGS.clone
s.set_value("pipeline.workers", 1)
s.set_value("pipeline.ordered", "auto")
s
end
config <<-CONFIG
filter {
if [message] =~ "\\d" {
mutate { add_tag => "digit" }
} else {
mutate { add_tag => "letter" }
}
}
CONFIG
sample_one(["a", "1", "b", "2", "c", "3"]) do
expect(subject.map{|e| e.get("message")}).to eq(["a", "1", "b", "2", "c", "3"])
end
end
context "with an implicit single worker ordering will not auto enable" do
let(:settings) do
s = LogStash::SETTINGS.clone
s.set_value("pipeline.ordered", "auto")
s
end
before(:each) do
# this is to make sure this test will be valid by having a pipeline.workers default value > 1
# and not explicitly set.
expect(settings.get_default("pipeline.workers")).to be > 1
expect(settings.set?("pipeline.workers")).to be_falsey
expect(LogStash::Plugin).to receive(:lookup).with("filter", "dummyfilter").at_least(1).time.and_return(DummyFilter)
expect(LogStash::Plugin).to receive(:lookup).with(any_args).at_least(3).time.and_call_original
end
config <<-CONFIG
filter {
# per above dummyfilter is not threadsafe hence will set the number of workers to 1
dummyfilter { }
if [message] =~ "\\d" {
mutate { add_tag => "digit" }
} else {
mutate { add_tag => "letter" }
}
}
CONFIG
sample_one(["a", "1", "b", "2", "c", "3"]) do
expect(subject.map{|e| e.get("message")}).to eq(["1", "2", "3", "a", "b", "c"])
end
end
context "with a single worker and ordering disabled" do
let(:settings) do
s = LogStash::SETTINGS.clone
s.set_value("pipeline.workers", 1)
s.set_value("pipeline.ordered", "false")
s
end
config <<-CONFIG
filter {
if [message] =~ "\\d" {
mutate { add_tag => "digit" }
} else {
mutate { add_tag => "letter" }
}
}
CONFIG
sample_one(["a", "1", "b", "2", "c", "3"]) do
expect(subject.map{|e| e.get("message")}).to eq(["1", "2", "3", "a", "b", "c"])
end
end
end
describe "max inflight warning" do
let(:config) { "input { dummyinput {} } output { dummyoutput {} }" }
let(:batch_size) { 1 }
@ -489,6 +615,8 @@ describe LogStash::JavaPipeline do
context "compiled filter functions" do
context "new events should propagate down the filters" do
extend PipelineHelpers
let(:settings) { LogStash::SETTINGS.clone }
config <<-CONFIG
filter {
clone {

View file

@ -1020,4 +1020,13 @@ describe LogStash::Pipeline do
end
end
end
context "event ordering" do
let(:pipeline) { mock_pipeline_from_string("input { } output { }", mock_settings("pipeline.ordered" => true, "pipeline.workers" => 2)) }
it "fail running when ordering is set to true and there are multiple workers" do
expect{pipeline.run}.to raise_error(RuntimeError, /pipeline\.ordered/)
pipeline.close
end
end
end

View file

@ -8,7 +8,6 @@ require "thread"
java_import org.logstash.common.SourceWithMetadata
module PipelineHelpers
class SpecSamplerInput < LogStash::Inputs::Base
config_name "spec_sampler_input"
@ -58,9 +57,7 @@ module PipelineHelpers
describe "\"#{name}\"" do
let(:pipeline) do
settings = ::LogStash::SETTINGS.clone
settings.set_value("queue.drain", true)
settings.set_value("pipeline.workers", 1)
LogStash::JavaPipeline.new(
LogStash::Config::PipelineConfig.new(
LogStash::Config::Source::Local, :main,
@ -73,9 +70,9 @@ module PipelineHelpers
end
let(:event) do
sample_event = [sample_event] unless sample_event.is_a?(Array)
next sample_event.collect do |e|
sample_event.map do |e|
e = { "message" => e } if e.is_a?(String)
next LogStash::Event.new(e)
LogStash::Event.new(e)
end
end

View file

@ -4,6 +4,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jruby.RubyArray;
import org.jruby.runtime.ThreadContext;
import org.logstash.RubyUtil;
import org.logstash.config.ir.CompiledPipeline;
@ -37,10 +38,19 @@ public final class WorkerLoop implements Runnable {
private final boolean drainQueue;
public WorkerLoop(final CompiledPipeline pipeline, final QueueReadClient readClient,
final LongAdder filteredCounter, final LongAdder consumedCounter,
final AtomicBoolean flushRequested, final AtomicBoolean flushing,
final AtomicBoolean shutdownRequested, final boolean drainQueue) {
private final boolean preserveEventOrder;
public WorkerLoop(
final CompiledPipeline pipeline,
final QueueReadClient readClient,
final LongAdder filteredCounter,
final LongAdder consumedCounter,
final AtomicBoolean flushRequested,
final AtomicBoolean flushing,
final AtomicBoolean shutdownRequested,
final boolean drainQueue,
final boolean preserveEventOrder)
{
this.consumedCounter = consumedCounter;
this.filteredCounter = filteredCounter;
this.execution = pipeline.buildExecution();
@ -49,6 +59,7 @@ public final class WorkerLoop implements Runnable {
this.flushRequested = flushRequested;
this.flushing = flushing;
this.shutdownRequested = shutdownRequested;
this.preserveEventOrder = preserveEventOrder;
}
@Override
@ -61,7 +72,7 @@ public final class WorkerLoop implements Runnable {
consumedCounter.add(batch.filteredSize());
final boolean isFlush = flushRequested.compareAndSet(true, false);
readClient.startMetrics(batch);
execution.compute(batch.to_a(), isFlush, false);
compute(batch, isFlush, false);
int filteredCount = batch.filteredSize();
filteredCounter.add(filteredCount);
readClient.addOutputMetrics(filteredCount);
@ -75,7 +86,7 @@ public final class WorkerLoop implements Runnable {
//for this we need to create a new empty batch to contain the final flushed events
final QueueBatch batch = readClient.newBatch();
readClient.startMetrics(batch);
execution.compute(batch.to_a(), true, true);
compute(batch, true, true);
readClient.closeBatch(batch);
} catch (final Exception ex) {
LOGGER.error(
@ -86,6 +97,20 @@ public final class WorkerLoop implements Runnable {
}
}
@SuppressWarnings("unchecked")
private void compute(final QueueBatch batch, final boolean flush, final boolean shutdown) {
    // Fast path: no ordering requirement, hand the whole batch to the compiled execution.
    if (!preserveEventOrder) {
        execution.compute(batch.to_a(), flush, shutdown);
        return;
    }
    // Ordered mode: push events through the pipeline one at a time, reusing a
    // single one-element RubyArray to avoid allocating a wrapper per event.
    @SuppressWarnings({"rawtypes"}) final RubyArray singleElementBatch = RubyUtil.RUBY.newArray(1);
    for (final Object event : batch.to_a()) {
        singleElementBatch.set(0, event);
        execution.compute(singleElementBatch, flush, shutdown);
    }
}
// True while queue-draining is enabled and the queue still has events to consume.
private boolean isDraining() {
    if (!drainQueue) {
        return false;
    }
    return !readClient.isEmpty();
}