mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
Merge remote branch 'origin/master'
Conflicts: lib/logstash/agent.rb
This commit is contained in:
commit
8a6935b3ec
5 changed files with 114 additions and 62 deletions
|
@ -213,7 +213,7 @@ class LogStash::Agent
|
|||
end # def configure
|
||||
|
||||
public
|
||||
def run
|
||||
def run(&block)
|
||||
LogStash::Util::set_thread_name(self.class.name)
|
||||
|
||||
ok = parse_options
|
||||
|
@ -221,11 +221,15 @@ class LogStash::Agent
|
|||
raise "Option parsing failed. See error log."
|
||||
end
|
||||
|
||||
|
||||
configure
|
||||
|
||||
# Load the config file
|
||||
config = LogStash::Config::File.new(@config_file)
|
||||
|
||||
run_with_config(config, &block)
|
||||
end # def run
|
||||
|
||||
def run_with_config(config)
|
||||
config.parse do |plugin|
|
||||
# 'plugin' is a has containing:
|
||||
# :type => the base class of the plugin (LogStash::Inputs::Base, etc)
|
||||
|
@ -261,6 +265,8 @@ class LogStash::Agent
|
|||
filter_queue = SizedQueue.new(10)
|
||||
output_queue = LogStash::MultiQueue.new
|
||||
|
||||
ready_queue = Queue.new
|
||||
|
||||
input_target = @filters.length > 0 ? filter_queue : output_queue
|
||||
# Start inputs
|
||||
@inputs.each do |input|
|
||||
|
@ -268,6 +274,7 @@ class LogStash::Agent
|
|||
@threads[input] = Thread.new(input_target) do |input_target|
|
||||
input.logger = @logger
|
||||
input.register
|
||||
ready_queue << input
|
||||
input.run(input_target)
|
||||
end # new thread for thsi input
|
||||
end # @inputs.each
|
||||
|
@ -309,6 +316,7 @@ class LogStash::Agent
|
|||
output_queue.add_queue(queue)
|
||||
@threads["outputs/#{output.to_s}"] = Thread.new(queue) do |queue|
|
||||
output.register
|
||||
ready_queue << output
|
||||
begin
|
||||
LogStash::Util::set_thread_name("output/#{output.to_s}")
|
||||
output.logger = @logger
|
||||
|
@ -321,14 +329,26 @@ class LogStash::Agent
|
|||
@logger.warn(["Output #{output.to_s} thread exception", e])
|
||||
@logger.debug(["Output #{output.to_s} thread exception backtrace",
|
||||
e.backtrace])
|
||||
# TODO(sissel): should we abort after too many failures?
|
||||
retry
|
||||
end
|
||||
end # begin/rescue
|
||||
end # Thread.new
|
||||
end # @outputs.each
|
||||
|
||||
# Wait for all inputs and outputs to be registered.
|
||||
wait_count = outputs.size + inputs.size
|
||||
while wait_count > 0 and ready_queue.pop
|
||||
wait_count -= 1
|
||||
end
|
||||
|
||||
# yield to a block in case someone's waiting for us to be done setting up
|
||||
# like tests, etc.
|
||||
yield if block_given?
|
||||
|
||||
# TODO(sissel): Monitor what's going on? Sleep forever? what?
|
||||
while sleep 5
|
||||
end
|
||||
end # def run
|
||||
end # def run_with_config
|
||||
|
||||
public
|
||||
def stop
|
||||
|
|
|
@ -5,14 +5,25 @@ require "logstash/agent"
|
|||
|
||||
class LogStash::Config::File
|
||||
public
|
||||
def initialize(file)
|
||||
@file = file
|
||||
end
|
||||
def initialize(path=nil, string=nil)
|
||||
@path = path
|
||||
@string = string
|
||||
|
||||
if (path.nil? and string.nil?) or (!path.nil? and !string.nil?)
|
||||
raise "Must give path or string, not both or neither"
|
||||
end
|
||||
end # def initialize
|
||||
|
||||
public
|
||||
def parse
|
||||
grammar = LogStash::Config::Grammar.new
|
||||
grammar.parse(File.new(@file).read)
|
||||
|
||||
if @string.nil?
|
||||
grammar.parse(File.new(@path).read)
|
||||
else
|
||||
grammar.parse(@string)
|
||||
end
|
||||
|
||||
@config = grammar.config
|
||||
|
||||
registry = LogStash::Config::Registry::registry
|
||||
|
|
|
@ -41,9 +41,9 @@ class LogStash::File::Manager
|
|||
|
||||
# TODO(sissel): inputs/base should do this.
|
||||
config["tag"] ||= []
|
||||
if !config["tag"].member?(config["type"])
|
||||
config["tag"] << config["type"]
|
||||
end
|
||||
#if !config["tag"].member?(config["type"])
|
||||
#config["tag"] << config["type"]
|
||||
#end
|
||||
|
||||
# TODO(sissel): Need to support file rotation, globs, etc
|
||||
begin
|
||||
|
@ -70,7 +70,7 @@ class LogStash::File::Manager
|
|||
"@type" => config["type"],
|
||||
"@tags" => config["tag"].dup,
|
||||
})
|
||||
e.source = "file://#{@hostname}/#{path}"
|
||||
e.source = "file://#{@hostname}#{path}"
|
||||
@logger.debug(["New event from file input", path, e])
|
||||
@output_queue << e
|
||||
end
|
||||
|
|
|
@ -2,38 +2,31 @@ require "logstash/namespace"
|
|||
require "logstash/outputs/base"
|
||||
|
||||
class LogStash::Outputs::Internal < LogStash::Outputs::Base
|
||||
|
||||
config_name "internal"
|
||||
|
||||
public
|
||||
def initialize(url, config={}, &block)
|
||||
super
|
||||
@callback = block
|
||||
end # def initialize
|
||||
attr_accessor :callback
|
||||
|
||||
public
|
||||
def register
|
||||
@logger.info("Registering output #{@url}")
|
||||
@logger.info("Registering internal output (for testing!)")
|
||||
@callbacks ||= []
|
||||
end # def register
|
||||
|
||||
public
|
||||
def receive(event)
|
||||
if !@callback
|
||||
if @callbacks.empty?
|
||||
@logger.error("No callback for output #{@url}, cannot receive")
|
||||
return
|
||||
end
|
||||
@callback.call(event)
|
||||
|
||||
@callbacks.each do |callback|
|
||||
callback.call(event)
|
||||
end
|
||||
end # def event
|
||||
|
||||
# Set the callback by passing a block of code
|
||||
public
|
||||
def callback(&block)
|
||||
@callback = block
|
||||
end
|
||||
|
||||
# Set the callback by passing a proc object
|
||||
public
|
||||
def callback=(proc_block)
|
||||
@callback = proc_block
|
||||
def subscribe(&block)
|
||||
@callbacks ||= []
|
||||
@callbacks << block
|
||||
end
|
||||
end # class LogStash::Outputs::Internal
|
||||
|
|
|
@ -2,54 +2,82 @@
|
|||
require 'rubygems'
|
||||
$:.unshift File.dirname(__FILE__) + "/../../../lib"
|
||||
$:.unshift File.dirname(__FILE__) + "/../../"
|
||||
|
||||
require "test/unit"
|
||||
require "tempfile"
|
||||
require "logstash/testcase"
|
||||
require "thread"
|
||||
require "logstash/loadlibs"
|
||||
require "logstash/agent"
|
||||
require "logstash/logging"
|
||||
require "logstash/util"
|
||||
require "socket"
|
||||
|
||||
|
||||
# TODO(sissel): refactor this so we can more easily specify tests.
|
||||
class TestInputFile < LogStash::TestCase
|
||||
def em_setup
|
||||
class TestInputFile < Test::Unit::TestCase
|
||||
def setup
|
||||
@tmpfile = Tempfile.new(self.class.name)
|
||||
@hostname = Socket.gethostname
|
||||
@type = "logstash-test"
|
||||
|
||||
config = {
|
||||
"inputs" => {
|
||||
@type => [
|
||||
"file://#{@tmpfile.path}"
|
||||
],
|
||||
},
|
||||
"outputs" => [
|
||||
"internal:///"
|
||||
]
|
||||
} # config
|
||||
@agent = LogStash::Agent.new
|
||||
config = LogStash::Config::File.new(path=nil, string=<<-CONFIG)
|
||||
input {
|
||||
file {
|
||||
path => "#{@tmpfile.path}"
|
||||
type => "#{@type}"
|
||||
}
|
||||
}
|
||||
|
||||
super(config)
|
||||
end
|
||||
output {
|
||||
internal { }
|
||||
}
|
||||
CONFIG
|
||||
|
||||
waitqueue = Queue.new
|
||||
|
||||
Thread.new do
|
||||
@agent.run_with_config(config) do
|
||||
waitqueue << :ready
|
||||
end
|
||||
end
|
||||
|
||||
# Wait for the agent to be ready.
|
||||
waitqueue.pop
|
||||
@output = @agent.outputs.first
|
||||
end # def setup
|
||||
|
||||
def test_simple
|
||||
data = [ "hello", "world", "hello world 1 2 3 4", "1", "2", "3", "4", "5" ]
|
||||
remaining = data.size
|
||||
EventMachine.run do
|
||||
em_setup
|
||||
expect_data = data.clone
|
||||
@output.subscribe do |event|
|
||||
expect_message = expect_data.shift
|
||||
assert_equal(expect_message, event.message)
|
||||
assert_equal("file://#{@hostname}#{@tmpfile.path}", event.source)
|
||||
assert_equal(@type, event.type, "type")
|
||||
assert_equal([], event.tags, "tags should be empty")
|
||||
expect_data = data.clone
|
||||
|
||||
# Done testing if we run out of data.
|
||||
@agent.stop if expect_data.size == 0
|
||||
end
|
||||
queue = Queue.new
|
||||
@output.subscribe do |event|
|
||||
queue << event
|
||||
end
|
||||
|
||||
# Write to the file periodically
|
||||
timer = EM::PeriodicTimer.new(0.2) do
|
||||
# Write to the file periodically
|
||||
Thread.new do
|
||||
LogStash::Util.set_thread_name("#{__FILE__} - periodic writer")
|
||||
loop do
|
||||
out = data.shift((rand * 3).to_i + 1).join("\n")
|
||||
@tmpfile.puts out
|
||||
@tmpfile.flush
|
||||
timer.cancel if data.length == 0
|
||||
break if data.length == 0
|
||||
end # loop
|
||||
end # timer thread
|
||||
|
||||
loop do
|
||||
event = queue.pop
|
||||
expect_message = expect_data.shift
|
||||
assert_equal(expect_message, event.message)
|
||||
assert_equal("file://#{@hostname}#{@tmpfile.path}", event.source)
|
||||
assert_equal(@type, event.type, "type")
|
||||
assert_equal([], event.tags, "tags should be empty")
|
||||
|
||||
# Done testing if we run out of data.
|
||||
if expect_data.size == 0
|
||||
@agent.stop
|
||||
break
|
||||
end
|
||||
end
|
||||
end # def test_simple
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue