first cut at jruby/threaded logstash

This commit is contained in:
Pete Fritchman 2011-02-13 16:25:22 -08:00 committed by Jordan Sissel
parent 9058b5050c
commit c081a9eec7
10 changed files with 201 additions and 139 deletions

View file

@ -1,9 +1,8 @@
#!/usr/bin/env ruby
#!/usr/bin/env jruby
$: << File.dirname($0) + "/../lib"
require "rubygems"
require "eventmachine"
require "logstash/agent"
require "optparse"
require "yaml"

View file

@ -0,0 +1,13 @@
# Example config that reads parsed logs from AMQP and prints to stdout
inputs:
linux-syslog:
- file:///var/log/messages
filters:
- grep:
linux-syslog:
- match:
@message: test
add_fields:
filter_test: hello world
outputs:
- stdout:///

View file

@ -1,9 +1,13 @@
# Example config that reads parsed logs from AMQP and prints to stdout
inputs:
all:
- amqp://localhost/topic/parsedlogs
#filters:
#field:
#- progname.include?("tester")
linux-syslog:
- file:///var/log/messages
filters:
- grep:
linux-syslog:
- match:
@message: test
add_fields:
filter_test: hello world
outputs:
- stdout:///

View file

@ -1,10 +1,13 @@
require "eventmachine"
require "eventmachine-tail"
require "logstash/filters"
require "logstash/inputs"
require "logstash/logging"
require "logstash/multiqueue"
require "logstash/namespace"
require "logstash/outputs"
require "java"
require "uri"
JThread = java.lang.Thread
# Collect logs, ship them out.
class LogStash::Agent
@ -18,6 +21,7 @@ class LogStash::Agent
log_to(STDERR)
@config = config
@threads = {}
@outputs = []
@inputs = []
@filters = []
@ -25,6 +29,8 @@ class LogStash::Agent
# - list of logs to monitor
# - log config
# - where to ship to
Thread::abort_on_exception = true
end # def initialize
public
@ -32,80 +38,100 @@ class LogStash::Agent
@logger = LogStash::Logger.new(target)
end # def log_to
# Register any event handlers with EventMachine
# Technically, this agent could listen for anything (files, sockets, amqp,
# stomp, etc).
public
def register
# TODO(sissel): warn when no inputs and no outputs are defined.
# TODO(sissel): Refactor this madness into a config lib
if (["inputs", "outputs"] & @config.keys).length == 0
$stderr.puts "No inputs or no outputs configured. This probably isn't what you want."
def run
if @config["inputs"].length == 0 or @config["outputs"].length == 0
raise "Must have both inputs and outputs configured."
end
# Register input and output stuff
inputs = @config["inputs"]
inputs.each do |value|
# If 'url' is an array, then inputs is a hash and the key is the type
if inputs.is_a?(Hash)
type, urls = value
else
raise "config error, no type for url #{urls.inspect}"
end
# XXX we should use a SizedQueue here (w/config params for size)
filter_queue = Queue.new
output_queue = MultiQueue.new
# Register input and output stuff
input_configs = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = [] } }
@config["inputs"].each do |url_type, urls|
# url could be a string or an array.
urls = [urls] if !urls.is_a?(Array)
urls.each do |url|
@logger.debug("Using input #{url} of type #{type}")
input = LogStash::Inputs.from_url(url, type) { |event| receive(event) }
input.logger = @logger
input.register
@inputs << input
urls.each do |url_str|
url = URI.parse(url_str)
input_type = url.scheme
input_configs[input_type][url_type] = url
end
end # each input
if @config.include?("filters")
filters = @config["filters"]
filters.collect { |x| x.to_a[0] }.each do |filter|
name, value = filter
@logger.debug("Using filter #{name} => #{value.inspect}")
filter = LogStash::Filters.from_name(name, value)
filter.logger = @logger
filter.register
@filters << filter
end # each filter
end # if we have filters
input_configs.each do |input_type, config|
if @config.include?("filters")
queue = filter_queue
else
queue = output_queue
end
input = LogStash::Inputs.from_name(input_type, config, queue)
@threads["input/#{input_type}"] = Thread.new do
JThread.currentThread().setName("input/#{input_type}")
input.run
end
end
# Create N filter-worker threads
if @config.include?("filters")
3.times do |n|
@threads["worker/filter/#{n}"] = Thread.new do
JThread.currentThread().setName("worker/filter/#{n}")
puts "top of worker/filter/#{n} thread"
filters = []
@config["filters"].collect { |x| x.to_a[0] }.each do |filter|
name, value = filter
@logger.info("Using filter #{name} => #{value.inspect}")
filter = LogStash::Filters.from_name(name, value)
filter.logger = @logger
filter.register
filters << filter
end
while event = filter_queue.pop
filters.each do |filter|
filter.filter(event)
if event.cancelled?
@logger.debug({:message => "Event cancelled",
:event => event,
:filter => filter.class,
})
break
end
end # filters.each
output_queue.push(event) unless event.cancelled?
end # event pop
end # Thread
end # N.times
end # if @config.include?("filters")
# Create output threads
@config["outputs"].each do |url|
@logger.debug("Using output #{url}")
output = LogStash::Outputs.from_url(url)
output.logger = @logger
output.register
@outputs << output
end # each output
queue = Queue.new
@threads["outputs/#{url}"] = Thread.new do
JThread.currentThread().setName("output:#{url}")
output = LogStash::Outputs.from_url(url)
while event = queue.pop
output.receive(event)
end
end # Thread
output_queue.add_queue(queue)
end
# Register any signal handlers
register_signal_handler
#register_signal_handler
while sleep 5
end
end # def register
public
def run(&block)
EventMachine.run do
self.register
yield if block_given?
end # EventMachine.run
end # def run
public
def stop
# TODO(sissel): Stop inputs, fluch outputs, wait for finish,
# TODO(petef): Stop inputs, fluch outputs, wait for finish,
# then stop the event loop
EventMachine.stop_event_loop
# EventMachine has no default way to indicate a 'stopping' state.
$EVENTMACHINE_STOPPING = true
end # def stop
protected
@ -164,8 +190,7 @@ class LogStash::Agent
end
when :INT
@logger.warn("SIGINT received. Shutting down.")
EventMachine::stop_event_loop
# TODO(sissel): Should have input/output/filter register shutdown
# TODO(petef): Should have input/output/filter register shutdown
# hooks.
end # case msg
end # @sigchannel.subscribe

View file

@ -3,21 +3,11 @@ require "logstash/ruby_fixes"
require "uri"
module LogStash::Inputs
# Given a URL, try to load the class that supports it.
# That is, if we have an input of "foo://blah/" then
# we will try to load logstash/inputs/foo and will
# expect a class LogStash::Inputs::Foo
public
def self.from_url(url, type, &block)
# Assume file paths if we start with "/"
url = "file://#{url}" if url.start_with?("/")
uri = URI.parse(url)
# TODO(sissel): Add error handling
# TODO(sissel): Allow plugin paths
klass = uri.scheme.capitalize
file = uri.scheme.downcase
def self.from_name(type, configs, output_queue)
klass = type.capitalize
file = type.downcase
require "logstash/inputs/#{file}"
LogStash::Inputs.const_get(klass).new(uri, type, &block)
end # def from_url
LogStash::Inputs.const_get(klass).new(configs, output_queue)
end # def from_name
end # module LogStash::Inputs

View file

@ -7,22 +7,24 @@ class LogStash::Inputs::Base
attr_accessor :logger
public
def initialize(url, type, config={}, &block)
def initialize(configs, output_queue)
@logger = LogStash::Logger.new(STDERR)
@url = url
@url = URI.parse(url) if url.is_a? String
@config = config
@callback = block
@type = type
@tags = []
@configs = configs
@output_queue = output_queue
#@url = url
#@url = URI.parse(url) if url.is_a? String
#@config = config
#@callback = block
#@type = type
#@tags = []
@urlopts = {}
if @url.query
@urlopts = CGI.parse(@url.query)
@urlopts.each do |k, v|
@urlopts[k] = v.last if v.is_a?(Array)
end
end
#@urlopts = {}
#if @url.query
# @urlopts = CGI.parse(@url.query)
# @urlopts.each do |k, v|
# @urlopts[k] = v.last if v.is_a?(Array)
# end
#end
end # def initialize
public

View file

@ -1,53 +1,66 @@
require "eventmachine-tail"
require "file/tail"
require "logstash/inputs/base"
require "logstash/namespace"
require "socket" # for Socket.gethostname
class LogStash::Inputs::File < LogStash::Inputs::Base
public
def initialize(url, type, config={}, &block)
def initialize(configs, output_queue)
super
# Hack the hostname into the url.
# This works since file:// urls don't generally have a host in it.
@url.host = Socket.gethostname
@output_queue = output_queue
@file_threads = {}
end # def initialize
public
def register
@logger.info("Registering #{@url}")
EventMachine::FileGlobWatchTail.new(@url.path, Reader, interval=60,
exclude=[], receiver=self)
end # def register
public
def receive(filetail, event)
url = @url.clone
url.path = filetail.path
@logger.debug(["original url", { :originalurl => @url, :newurl => url }])
event = LogStash::Event.new({
"@message" => event,
"@type" => @type,
"@tags" => @tags.clone,
})
event.source = url
@logger.debug(["Got event", event])
@callback.call(event)
end # def receive
private
class Reader < EventMachine::FileTail
def initialize(path, receiver)
super(path)
@receiver = receiver
@buffer = BufferedTokenizer.new # From eventmachine
def run
@configs.each do |type, url|
glob = url.path
if File.exists?(glob)
files = [glob]
else
files = Dir.glob(glob)
end
files.each do |file|
@file_threads[file] = Thread.new do
JThread.currentThread().setName("inputs/file/reader:#{file}")
watch(file, url, type, [type])
end
end
end
def receive_data(data)
# TODO(2.0): Support multiline log data
@buffer.extract(data).each do |line|
@receiver.receive(self, line)
end
end # def receive_data
end # class Reader
# http://jira.codehaus.org/browse/JRUBY-891
# NOTE(petef): IO::Select is broken in jruby, so we start a
# thread per file for now.
while sleep 5
# foo
end
#event = LogStash::Event.new({
# "@message" => event,
# "@type" => @type,
# "@tags" => @tags.clone,
#})
#event.source = url
#@logger.debug(["Got event", event])
#@callback.call(event)
end # def run
private
def watch(file, source_url, type, tags)
File.open(file, "r") do |f|
f.extend(File::Tail)
f.interval = 5
f.backward(0)
f.tail do |line|
e = LogStash::Event.new({
"@message" => line,
"@type" => type,
"@tags" => tags,
})
e.source = source_url.to_s
@output_queue.push(e)
end # f.tail
end # File.open
end
end # class LogStash::Inputs::File

View file

@ -0,0 +1,17 @@
class MultiQueue
def initialize(*queues)
@mutex = Mutex.new
@queues = queues
end
def push(object)
@queues.each { |q| q.push(object) }
end
public
def add_queue(queue)
@mutex.synchronize do
@queues << queue
end
end
end

View file

@ -3,13 +3,13 @@ require "uri"
module LogStash::Outputs
public
def self.from_url(url, &block)
def self.from_url(url)
uri = URI.parse(url)
# TODO(sissel): Add error handling
# TODO(sissel): Allow plugin paths
klass = uri.scheme.capitalize
file = uri.scheme
require "logstash/outputs/#{file}"
LogStash::Outputs.const_get(klass).new(uri, &block)
LogStash::Outputs.const_get(klass).new(uri)
end # def from_url
end # module LogStash::Outputs

View file

@ -8,10 +8,9 @@ class LogStash::Outputs::Base
attr_accessor :logger
public
def initialize(url, config={}, &block)
def initialize(url)
@url = url
@url = URI.parse(url) if url.is_a? String
@config = config
@logger = LogStash::Logger.new(STDOUT)
@urlopts = {}
if @url.query