From 3dabba80faa27addf9c6005c075623ea9d6e4ef0 Mon Sep 17 00:00:00 2001 From: Colin Surprenant Date: Sat, 25 Apr 2015 15:13:33 +0200 Subject: [PATCH] revert multi_filter removal and add check for cancelled events add filter and flush compiles functions specs missing multiline filter for core specs fixes #3100 --- lib/logstash/config/config_ast.rb | 2 +- lib/logstash/filters/base.rb | 19 +++ rakelib/default_plugins.rb | 1 + spec/core/pipeline_spec.rb | 185 +++++++++++++++++++++--------- spec/filters/base_spec.rb | 21 +++- 5 files changed, 174 insertions(+), 54 deletions(-) diff --git a/lib/logstash/config/config_ast.rb b/lib/logstash/config/config_ast.rb index ad014ec5b..35ba4d440 100644 --- a/lib/logstash/config/config_ast.rb +++ b/lib/logstash/config/config_ast.rb @@ -230,7 +230,7 @@ module LogStash; module Config; module AST return "start_input(#{variable_name})" when "filter" return <<-CODE - #{variable_name}.filter(event) {|new_event| events << new_event } + events = #{variable_name}.multi_filter(events) CODE when "output" return "#{variable_name}.handle(event)\n" diff --git a/lib/logstash/filters/base.rb b/lib/logstash/filters/base.rb index 903912467..131661cfb 100644 --- a/lib/logstash/filters/base.rb +++ b/lib/logstash/filters/base.rb @@ -146,6 +146,25 @@ class LogStash::Filters::Base < LogStash::Plugin raise "#{self.class}#filter must be overidden" end # def filter + # in 1.5.0 multi_filter is meant to be used in the generated filter function in LogStash::Config::AST::Plugin only + # and is temporary until we refactor the filter method interface to accept events list and return events list, + # just list in multi_filter see https://github.com/elastic/logstash/issues/2872. + # refactoring the filter method will mean updating all plugins which we want to avoid doing for 1.5.0. + # + # @param events [Array 2 - } - } - eos - } - - context "output teardown" do - it "should call teardown of output without output-workers" do - pipeline = TestPipeline.new(test_config_without_output_workers) - pipeline.run - - expect(pipeline.outputs.size ).to eq(1) - expect(pipeline.outputs.first.worker_plugins.size ).to eq(1) - expect(pipeline.outputs.first.worker_plugins.first.num_teardowns ).to eq(1) + before(:each) do + LogStash::Plugin.stub(:lookup) + .with("input", "dummyinput").and_return(DummyInput) + LogStash::Plugin.stub(:lookup) + .with("codec", "plain").and_return(DummyCodec) + LogStash::Plugin.stub(:lookup) + .with("output", "dummyoutput").and_return(DummyOutput) end - it "should call output teardown correctly with output workers" do - pipeline = TestPipeline.new(test_config_with_output_workers) - pipeline.run + let(:test_config_without_output_workers) { + <<-eos + input { + dummyinput {} + } - expect(pipeline.outputs.size ).to eq(1) - expect(pipeline.outputs.first.num_teardowns).to eq(0) - pipeline.outputs.first.worker_plugins.each do |plugin| - expect(plugin.num_teardowns ).to eq(1) + output { + dummyoutput {} + } + eos + } + + let(:test_config_with_output_workers) { + <<-eos + input { + dummyinput {} + } + + output { + dummyoutput { + workers => 2 + } + } + eos + } + + context "output teardown" do + it "should call teardown of output without output-workers" do + pipeline = TestPipeline.new(test_config_without_output_workers) + pipeline.run + + expect(pipeline.outputs.size ).to eq(1) + expect(pipeline.outputs.first.worker_plugins.size ).to eq(1) + expect(pipeline.outputs.first.worker_plugins.first.num_teardowns ).to eq(1) + end + + it "should call output teardown correctly with output workers" do + pipeline = TestPipeline.new(test_config_with_output_workers) + pipeline.run + + expect(pipeline.outputs.size ).to eq(1) + expect(pipeline.outputs.first.num_teardowns).to eq(0) + pipeline.outputs.first.worker_plugins.each do |plugin| + expect(plugin.num_teardowns ).to eq(1) + end end end end + + context "compiled flush function" do + + context "cancelled events should not propagate down the filters" do + config <<-CONFIG + filter { + multiline { + pattern => "hello" + what => next + } + multiline { + pattern => "hello" + what => next + } + } + CONFIG + + sample("hello") do + expect(subject["message"]).to eq("hello") + end + end + + context "new events should propagate down the filters" do + config <<-CONFIG + filter { + clone { + clones => ["clone1"] + } + multiline { + pattern => "bar" + what => previous + } + } + CONFIG + + sample(["foo", "bar"]) do + expect(subject.size).to eq(2) + + expect(subject[0]["message"]).to eq("foo\nbar") + expect(subject[0]["type"]).to be_nil + expect(subject[1]["message"]).to eq("foo\nbar") + expect(subject[1]["type"]).to eq("clone1") + end + end + end + + context "compiled filter funtions" do + + context "new events should propagate down the filters" do + config <<-CONFIG + filter { + clone { + clones => ["clone1", "clone2"] + } + mutate { + add_field => {"foo" => "bar"} + } + } + CONFIG + + sample("hello") do + expect(subject.size).to eq(3) + + expect(subject[0]["message"]).to eq("hello") + expect(subject[0]["type"]).to be_nil + expect(subject[0]["foo"]).to eq("bar") + + expect(subject[1]["message"]).to eq("hello") + expect(subject[1]["type"]).to eq("clone1") + expect(subject[1]["foo"]).to eq("bar") + + expect(subject[2]["message"]).to eq("hello") + expect(subject[2]["type"]).to eq("clone2") + expect(subject[2]["foo"]).to eq("bar") + end + end + + end end diff --git a/spec/filters/base_spec.rb b/spec/filters/base_spec.rb index e49957fce..321b72965 100644 --- a/spec/filters/base_spec.rb +++ b/spec/filters/base_spec.rb @@ -24,10 +24,29 @@ describe LogStash::Filters::Base do end it "should provide class public API" do - [:register, :filter, :execute, :threadsafe?, :filter_matched, :filter?, :teardown].each do |method| + [:register, :filter, :multi_filter, :execute, :threadsafe?, :filter_matched, :filter?, :teardown].each do |method| expect(subject).to respond_to(method) end end + + context "multi_filter" do + let(:event1){LogStash::Event.new} + let(:event2){LogStash::Event.new} + + it "should multi_filter without new events" do + allow(subject).to receive(:filter) do |event, &block| + nil + end + expect(subject.multi_filter([event1])).to eq([event1]) + end + + it "should multi_filter with new events" do + allow(subject).to receive(:filter) do |event, &block| + block.call(event2) + end + expect(subject.multi_filter([event1])).to eq([event1, event2]) + end + end end describe LogStash::Filters::NOOP do