mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
revert multi_filter removal and add check for cancelled events
add filter and flush compiles functions specs missing multiline filter for core specs fixes #3100
This commit is contained in:
parent
c70f252fe4
commit
3dabba80fa
5 changed files with 174 additions and 54 deletions
|
@ -230,7 +230,7 @@ module LogStash; module Config; module AST
|
|||
return "start_input(#{variable_name})"
|
||||
when "filter"
|
||||
return <<-CODE
|
||||
#{variable_name}.filter(event) {|new_event| events << new_event }
|
||||
events = #{variable_name}.multi_filter(events)
|
||||
CODE
|
||||
when "output"
|
||||
return "#{variable_name}.handle(event)\n"
|
||||
|
|
|
@ -146,6 +146,25 @@ class LogStash::Filters::Base < LogStash::Plugin
|
|||
raise "#{self.class}#filter must be overidden"
|
||||
end # def filter
|
||||
|
||||
# in 1.5.0 multi_filter is meant to be used in the generated filter function in LogStash::Config::AST::Plugin only
|
||||
# and is temporary until we refactor the filter method interface to accept events list and return events list,
|
||||
# just list in multi_filter see https://github.com/elastic/logstash/issues/2872.
|
||||
# refactoring the filter method will mean updating all plugins which we want to avoid doing for 1.5.0.
|
||||
#
|
||||
# @param events [Array<LogStash::Event] list of events to filter
|
||||
# @return [Array<LogStash::Event] filtered events and any new events generated by the filter
|
||||
public
|
||||
def multi_filter(events)
|
||||
result = []
|
||||
events.each do |event|
|
||||
unless event.cancelled?
|
||||
result << event
|
||||
filter(event){|new_event| result << new_event}
|
||||
end
|
||||
end
|
||||
result
|
||||
end
|
||||
|
||||
public
|
||||
def execute(event, &block)
|
||||
filter(event, &block)
|
||||
|
|
|
@ -111,6 +111,7 @@ module LogStash
|
|||
CORE_SPECS_PLUGINS = %w(
|
||||
logstash-filter-clone
|
||||
logstash-filter-mutate
|
||||
logstash-filter-multiline
|
||||
logstash-input-generator
|
||||
logstash-input-stdin
|
||||
logstash-input-tcp
|
||||
|
|
|
@ -58,60 +58,141 @@ end
|
|||
|
||||
describe LogStash::Pipeline do
|
||||
|
||||
before(:each) do
|
||||
LogStash::Plugin.stub(:lookup)
|
||||
.with("input", "dummyinput").and_return(DummyInput)
|
||||
LogStash::Plugin.stub(:lookup)
|
||||
.with("codec", "plain").and_return(DummyCodec)
|
||||
LogStash::Plugin.stub(:lookup)
|
||||
.with("output", "dummyoutput").and_return(DummyOutput)
|
||||
end
|
||||
context "teardown" do
|
||||
|
||||
let(:test_config_without_output_workers) {
|
||||
<<-eos
|
||||
input {
|
||||
dummyinput {}
|
||||
}
|
||||
|
||||
output {
|
||||
dummyoutput {}
|
||||
}
|
||||
eos
|
||||
}
|
||||
|
||||
let(:test_config_with_output_workers) {
|
||||
<<-eos
|
||||
input {
|
||||
dummyinput {}
|
||||
}
|
||||
|
||||
output {
|
||||
dummyoutput {
|
||||
workers => 2
|
||||
}
|
||||
}
|
||||
eos
|
||||
}
|
||||
|
||||
context "output teardown" do
|
||||
it "should call teardown of output without output-workers" do
|
||||
pipeline = TestPipeline.new(test_config_without_output_workers)
|
||||
pipeline.run
|
||||
|
||||
expect(pipeline.outputs.size ).to eq(1)
|
||||
expect(pipeline.outputs.first.worker_plugins.size ).to eq(1)
|
||||
expect(pipeline.outputs.first.worker_plugins.first.num_teardowns ).to eq(1)
|
||||
before(:each) do
|
||||
LogStash::Plugin.stub(:lookup)
|
||||
.with("input", "dummyinput").and_return(DummyInput)
|
||||
LogStash::Plugin.stub(:lookup)
|
||||
.with("codec", "plain").and_return(DummyCodec)
|
||||
LogStash::Plugin.stub(:lookup)
|
||||
.with("output", "dummyoutput").and_return(DummyOutput)
|
||||
end
|
||||
|
||||
it "should call output teardown correctly with output workers" do
|
||||
pipeline = TestPipeline.new(test_config_with_output_workers)
|
||||
pipeline.run
|
||||
let(:test_config_without_output_workers) {
|
||||
<<-eos
|
||||
input {
|
||||
dummyinput {}
|
||||
}
|
||||
|
||||
expect(pipeline.outputs.size ).to eq(1)
|
||||
expect(pipeline.outputs.first.num_teardowns).to eq(0)
|
||||
pipeline.outputs.first.worker_plugins.each do |plugin|
|
||||
expect(plugin.num_teardowns ).to eq(1)
|
||||
output {
|
||||
dummyoutput {}
|
||||
}
|
||||
eos
|
||||
}
|
||||
|
||||
let(:test_config_with_output_workers) {
|
||||
<<-eos
|
||||
input {
|
||||
dummyinput {}
|
||||
}
|
||||
|
||||
output {
|
||||
dummyoutput {
|
||||
workers => 2
|
||||
}
|
||||
}
|
||||
eos
|
||||
}
|
||||
|
||||
context "output teardown" do
|
||||
it "should call teardown of output without output-workers" do
|
||||
pipeline = TestPipeline.new(test_config_without_output_workers)
|
||||
pipeline.run
|
||||
|
||||
expect(pipeline.outputs.size ).to eq(1)
|
||||
expect(pipeline.outputs.first.worker_plugins.size ).to eq(1)
|
||||
expect(pipeline.outputs.first.worker_plugins.first.num_teardowns ).to eq(1)
|
||||
end
|
||||
|
||||
it "should call output teardown correctly with output workers" do
|
||||
pipeline = TestPipeline.new(test_config_with_output_workers)
|
||||
pipeline.run
|
||||
|
||||
expect(pipeline.outputs.size ).to eq(1)
|
||||
expect(pipeline.outputs.first.num_teardowns).to eq(0)
|
||||
pipeline.outputs.first.worker_plugins.each do |plugin|
|
||||
expect(plugin.num_teardowns ).to eq(1)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
context "compiled flush function" do
|
||||
|
||||
context "cancelled events should not propagate down the filters" do
|
||||
config <<-CONFIG
|
||||
filter {
|
||||
multiline {
|
||||
pattern => "hello"
|
||||
what => next
|
||||
}
|
||||
multiline {
|
||||
pattern => "hello"
|
||||
what => next
|
||||
}
|
||||
}
|
||||
CONFIG
|
||||
|
||||
sample("hello") do
|
||||
expect(subject["message"]).to eq("hello")
|
||||
end
|
||||
end
|
||||
|
||||
context "new events should propagate down the filters" do
|
||||
config <<-CONFIG
|
||||
filter {
|
||||
clone {
|
||||
clones => ["clone1"]
|
||||
}
|
||||
multiline {
|
||||
pattern => "bar"
|
||||
what => previous
|
||||
}
|
||||
}
|
||||
CONFIG
|
||||
|
||||
sample(["foo", "bar"]) do
|
||||
expect(subject.size).to eq(2)
|
||||
|
||||
expect(subject[0]["message"]).to eq("foo\nbar")
|
||||
expect(subject[0]["type"]).to be_nil
|
||||
expect(subject[1]["message"]).to eq("foo\nbar")
|
||||
expect(subject[1]["type"]).to eq("clone1")
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
context "compiled filter funtions" do
|
||||
|
||||
context "new events should propagate down the filters" do
|
||||
config <<-CONFIG
|
||||
filter {
|
||||
clone {
|
||||
clones => ["clone1", "clone2"]
|
||||
}
|
||||
mutate {
|
||||
add_field => {"foo" => "bar"}
|
||||
}
|
||||
}
|
||||
CONFIG
|
||||
|
||||
sample("hello") do
|
||||
expect(subject.size).to eq(3)
|
||||
|
||||
expect(subject[0]["message"]).to eq("hello")
|
||||
expect(subject[0]["type"]).to be_nil
|
||||
expect(subject[0]["foo"]).to eq("bar")
|
||||
|
||||
expect(subject[1]["message"]).to eq("hello")
|
||||
expect(subject[1]["type"]).to eq("clone1")
|
||||
expect(subject[1]["foo"]).to eq("bar")
|
||||
|
||||
expect(subject[2]["message"]).to eq("hello")
|
||||
expect(subject[2]["type"]).to eq("clone2")
|
||||
expect(subject[2]["foo"]).to eq("bar")
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
|
|
|
@ -24,10 +24,29 @@ describe LogStash::Filters::Base do
|
|||
end
|
||||
|
||||
it "should provide class public API" do
|
||||
[:register, :filter, :execute, :threadsafe?, :filter_matched, :filter?, :teardown].each do |method|
|
||||
[:register, :filter, :multi_filter, :execute, :threadsafe?, :filter_matched, :filter?, :teardown].each do |method|
|
||||
expect(subject).to respond_to(method)
|
||||
end
|
||||
end
|
||||
|
||||
context "multi_filter" do
|
||||
let(:event1){LogStash::Event.new}
|
||||
let(:event2){LogStash::Event.new}
|
||||
|
||||
it "should multi_filter without new events" do
|
||||
allow(subject).to receive(:filter) do |event, &block|
|
||||
nil
|
||||
end
|
||||
expect(subject.multi_filter([event1])).to eq([event1])
|
||||
end
|
||||
|
||||
it "should multi_filter with new events" do
|
||||
allow(subject).to receive(:filter) do |event, &block|
|
||||
block.call(event2)
|
||||
end
|
||||
expect(subject.multi_filter([event1])).to eq([event1, event2])
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe LogStash::Filters::NOOP do
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue