Merge branch 'logstash-master'
commit 20f8cf8ea0

24 changed files with 555 additions and 140 deletions

CHANGELOG (29)

@@ -1,9 +1,38 @@
1.0.7 ( ????? )
  - logstash 'web' now allows you to specify the elasticsearch clustername;
    --backend elasticsearch://[host[:port]]/[clustername]
  - GELF output now supports dynamic strings for level and facility
    https://logstash.jira.com/browse/LOGSTASH-83
  - 'amqp' output supports persistent messages over AMQP, now. Tunable.
    https://logstash.jira.com/browse/LOGSTASH-81
  - Redis input and output are now supported. (Contributed by dokipen)
  - Add shutdown processing. Shutdown sequence will start if SIGINT or SIGTERM
    are received or if all inputs finish (like stdin). The shutdown sequence
    always starts with the inputs. The sequence progresses using the same pipeline
    as the inputs/filters/outputs, so all in-flight events should finish
    getting processed before the final shutdown event makes its way to the
    outputs.
  - Add retries to unhandled input exceptions (LOGSTASH-84)

1.0.6 (May 11, 2011)
  * Remove 'sigar' from monolithic jar packaging. This removes a boatload of
    unnecessary warning messages on startup whenever you use the elasticsearch
    output or logstash-web.
    Issue: https://logstash.jira.com/browse/LOGSTASH-79

1.0.5 (May 10, 2011)
  * Fix queues when durable is set to true

1.0.4 (May 9, 2011)
  * Fix bugs in syslog input

1.0.2 (May 8, 2011)
  * Fix default-value handling for configs when the validation type is
    'password'

1.0.1 (May 7, 2011)
  * Fix password auth for amqp and stomp (Reported by Luke Macken)
  * Fix default elasticsearch target for logstash-web (Reported by Donald Gordon)

1.0.0 (May 6, 2011)
  * First major release.

Rakefile (77)

@@ -1,9 +1,8 @@
require 'tempfile'
require 'ftools' # fails in 1.9.2

# TODO(sissel): load the gemspec and parse the version from it instead.
LOGSTASH_VERSION = "1.0.4"

require File.join(File.dirname(__FILE__), "VERSION") # For LOGSTASH_VERSION

# Compile config grammar (ragel -> ruby)
file "lib/logstash/config/grammar.rb" => ["lib/logstash/config/grammar.rl"] do
  sh "make -C lib/logstash/config grammar.rb"

@@ -30,22 +29,35 @@ end

task :clean do
  sh "rm -rf .bundle"
  sh "rm -rf build-jar-thin"
  sh "rm -rf build-jar"
  #sh "rm -rf build-jar-thin"
  #sh "rm -rf build-jar"
  sh "rm -rf build"
  sh "rm -rf vendor"
end

task :compile => "lib/logstash/config/grammar.rb" do |t|
  mkdir_p "build"
  sh "rm -rf lib/net"
  target = "build/ruby"
  mkdir_p target if !File.directory?(target)
  #sh "rm -rf lib/net"
  Dir.chdir("lib") do
    args = Dir.glob("**/*.rb")
    sh "jrubyc", "-t", "../build", *args
    rel_target = File.join("..", target)
    sh "jrubyc", "-t", rel_target, "logstash/runner"
    files = Dir.glob("**/*.rb")
    files.each do |file|
      d = File.join(rel_target, File.dirname(file))
      mkdir_p d if !File.directory?(d)
      cp file, File.join(d, File.basename(file))
    end
  end

  Dir.chdir("test") do
    args = Dir.glob("**/*.rb")
    sh "jrubyc", "-t", "../build", *args
    rel_target = File.join("..", target)
    files = Dir.glob("**/*.rb")
    files.each do |file|
      d = File.join(rel_target, File.dirname(file))
      mkdir_p d if !File.directory?(d)
      cp file, File.join(d, File.basename(file))
    end
  end
end

@@ -62,7 +74,7 @@ VERSIONS = {

namespace :vendor do
  file "vendor/jar" do |t|
    mkdir_p t.name
    mkdir_p t.name if !File.directory?(t.name)
  end

  # Download jruby.jar

@@ -126,13 +138,20 @@ namespace :package do
  end # package:monolith:tar

  task :jar => monolith_deps do
    mkdir_p "build-jar"
    builddir = "build/monolith-jar"
    mkdir_p builddir if !File.directory?(builddir)

    # Unpack all the 3rdparty jars and any jars in gems
    Dir.glob("vendor/{bundle,jar}/**/*.jar").each do |jar|
      puts "=> Unpacking #{jar} into build-jar/"
      Dir.chdir("build-jar") do
        sh "jar xf ../#{jar}"
      if jar =~ /sigar.*\.jar$/
        puts "=> Skipping #{jar} (sigar not needed)"
        next
      end

      puts "=> Unpacking #{jar} into #{builddir}/"
      relative_path = File.join(builddir.split(File::SEPARATOR).collect { |a| ".." })
      Dir.chdir(builddir) do
        sh "jar xf #{relative_path}/#{jar}"
      end
    end

@@ -148,12 +167,11 @@ namespace :package do
    # Purge any extra files we don't need in META-INF (like manifests and
    # jar signatures)
    ["INDEX.LIST", "MANIFEST.MF", "ECLIPSEF.RSA", "ECLIPSEF.SF"].each do |file|
      File.delete(File.join("build-jar", "META-INF", file)) rescue nil
      File.delete(File.join(builddir, "META-INF", file)) rescue nil
    end
    #FileUtils.rm_r(File.join("build-jar", "META-INF")) rescue nil

    output = "logstash-#{LOGSTASH_VERSION}-monolithic.jar"
    sh "jar cfe #{output} logstash.runner -C build-jar ."
    sh "jar cfe #{output} logstash.runner -C #{builddir} ."

    jar_update_args = []

@@ -165,11 +183,11 @@ namespace :package do
    gem_dirs = %w{bin doc gems specifications}
    gem_root = File.join(%w{vendor bundle jruby 1.8})
    # for each dir, build args: -C vendor/bundle/jruby/1.8 bin, etc
    gem_jar_args = gem_dirs.collect { |dir| ["-C", gem_root, dir ] }.flatten
    gem_jar_args = gem_dirs.collect { |d| ["-C", gem_root, d ] }.flatten
    jar_update_args += gem_jar_args

    # Add our compiled ruby code
    jar_update_args += %w{ -C build . }
    jar_update_args += %w{ -C build/ruby . }

    # Add web stuff
    jar_update_args += %w{ -C lib logstash/web/public }

@@ -188,14 +206,15 @@ namespace :package do
  end # namespace monolith

  task :jar => [ "vendor:jruby", "vendor:gems", "compile" ] do
    builddir = "build-jar-thin"
    mkdir_p builddir
    builddir = "build/thin-jar"
    mkdir_p builddir if !File.directory?(builddir)

    # Unpack jruby
    relative_path = File.join(builddir.split(File::SEPARATOR).collect { |a| ".." })
    Dir.glob("vendor/jar/jruby-complete-1.6.0.jar").each do |jar|
      puts "=> Unpacking #{jar} into #{builddir}/"
      Dir.chdir(builddir) do
        sh "jar xf ../#{jar}"
        sh "jar xf #{relative_path}/#{jar}"
      end
    end

@@ -258,9 +277,13 @@ task :doccopy => [:require_output_env] do

  Dir.glob("docs/**/*").each do |doc|
    dir = File.join(ENV["output"], File.dirname(doc).gsub(/docs\/?/, ""))
    mkdir_p dir
    puts "Copy #{doc} => #{dir}"
    cp(doc, dir)
    mkdir_p dir if !File.directory?(dir)
    if File.directory?(doc)
      mkdir_p doc
    else
      puts "Copy #{doc} => #{dir}"
      cp(doc, dir)
    end
  end
end

VERSION.rb (1, new file)

@@ -0,0 +1 @@
LOGSTASH_VERSION = "1.0.6"

docs/extending/example-add-a-new-filter.md (117, new file)

@@ -0,0 +1,117 @@
---
title: How to extend - logstash
layout: content_right
---
# Add a new filter

This document shows you how to add a new filter to logstash.

For a general overview of how to add a new plugin, see [the extending
logstash](.) overview.

## Write code.

Let's write a 'hello world' filter. This filter will replace the 'message' in
the event with "Hello world!"

First, logstash expects plugins in a certain directory structure: logstash/TYPE/PLUGIN_NAME.rb

Since we're creating a filter, let's mkdir this:

    mkdir -p logstash/filters/
    cd logstash/filters

Now add the code:

    # Call this file 'foo.rb' (in logstash/filters, as above)
    require "logstash/filters/base"
    require "logstash/namespace"

    class LogStash::Filters::Foo < LogStash::Filters::Base

      # Setting the config_name here is required. This is how you
      # configure this filter from your logstash config.
      #
      # filter {
      #   foo { ... }
      # }
      config_name "foo"

      # Replace the message with this value.
      config :message, :validate => :string

      public
      def register
        # nothing to do
      end # def register

      public
      def filter(event)
        if @message
          # Replace the event message with our message as configured in the
          # config file.
          # If no message is specified, do nothing.
          event.message = @message
        end
      end # def filter
    end # class LogStash::Filters::Foo

## Add it to your configuration

For this simple example, let's just use stdin input and stdout output.
The config file looks like this:

    input {
      stdin { type => "foo" }
    }
    filter {
      foo {
        type => "foo"
        message => "Hello world!"
      }
    }
    output {
      stdout { }
    }

Call this file 'example.conf'

## Tell logstash about it.

Depending on how you installed logstash, you have a few ways of including this
plugin.

You can use the agent's --pluginpath flag to specify where the root of your
plugin tree is. In our case, it's the current directory.

    % logstash --pluginpath . -f example.conf

If you use the monolith jar release of logstash, you have an additional option:
you can include the plugin right in the jar file.

    % jar -uf logstash-1.0.5-monolithic.jar logstash/filters/foo.rb

    # Verify it's in the right location in the jar!
    % jar tf logstash-1.0.5-monolithic.jar | grep foo.rb
    logstash/filters/foo.rb

    % java -jar logstash-1.0.5-monolithic.jar agent -f example.conf

## Example running

In the example below, I typed in "the quick brown fox" after running the java
command.

    % java -jar logstash-1.0.5-monolithic.jar agent -f example.conf
    the quick brown fox
    2011-05-12T01:05:09.495000Z stdin://snack.home/: Hello world!

The output is the standard logstash stdout output, but in this case our "the
quick brown fox" message was replaced with "Hello world!"

All done! :)

@@ -6,8 +6,6 @@ layout: content_right

You can add your own input, output, or filter plugins to logstash.

DOCS - TBD

If you're looking to extend logstash today, please look at the existing plugins.

Good examples include:

@@ -38,3 +36,9 @@ Outputs have two methods: register and receive.

* 'register' is called per plugin instantiation. Do any of your initialization here.
* 'receive' is called when an event gets pushed to your output

## Example: new filter

Learn by example how to [add a new filter to logstash](example-add-a-new-filter)

@@ -33,17 +33,26 @@ CLASSPATH environment variable to include any elasticsearch jar files.
Using this method to download logstash will install all ruby dependencies.

* You must have jruby already
* If you use elasticsearch, you'll have to add it and its jars to the java
  classpath. (See below for web interface notes.)
* If you use grok, you'll need libgrok installed.

### web interface

* You have elasticsearch already
* You'll need to know the path to your elasticsearch lib directory.

    % CLASSPATH=elasticsearch-0.16.0/lib/*.jar logstash-web
    >> Thin web server (v1.2.7 codename No Hup)
    >> Maximum connections set to 1024
    >> Listening on 0.0.0.0:9292, CTRL+C to stop
    % CLASSPATH=$(ls /opt/elasticsearch/lib/*.jar | tr '\n' ':') logstash-web
    Thin web server (v1.2.7 codename No Hup)
    Maximum connections set to 1024
    Listening on 0.0.0.0:9292, CTRL+C to stop

For the above, replace '/opt/elasticsearch/lib' with wherever you downloaded
and unpacked elasticsearch.

### agent

    % logstash -f youragent.conf

    # Or if you need elasticsearch:
    % CLASSPATH=$(ls /opt/elasticsearch/lib/*.jar | tr '\n' ':') logstash -f youragent.conf

@@ -35,8 +35,8 @@ for such things, that works for me, too.)

logstash releases come in a few flavors.

* [Monolithic jar](http://semicomplete.com/files/logstash/logstash-1.0.4-monolithic.jar)
* [rubygem](https://github.com/downloads/logstash/releases/logstash-1.0.4.gem)
* [Monolithic jar](http://semicomplete.com/files/logstash/logstash-1.0.6-monolithic.jar)
* [rubygem](https://github.com/downloads/logstash/releases/logstash-1.0.6.gem)
* [`gem install logstash`](http://rubygems.org/gems/logstash)

## What's next?

@@ -4,6 +4,7 @@
require "java"
require "logstash/config/file"
require "logstash/filters"
require "logstash/filterworker"
require "logstash/inputs"
require "logstash/logging"
require "logstash/multiqueue"

@@ -11,6 +12,7 @@ require "logstash/namespace"
require "logstash/outputs"
require "logstash/util"
require "optparse"
require "thread"
require "uri"

# TODO(sissel): only enable this if we are in debug mode.

@@ -38,7 +40,8 @@ class LogStash::Agent
    @verbose = 0
    @daemonize = false

    @threads = {}
    @plugins = {}
    @plugins_mutex = Mutex.new
    @outputs = []
    @inputs = []
    @filters = []

@@ -51,6 +54,7 @@ class LogStash::Agent
    # TODO(sissel): Other default plugin paths?

    Thread::abort_on_exception = true
    @is_shutting_down = false
  end # def initialize

  public

@@ -113,7 +117,7 @@ class LogStash::Agent
    # These are 'unknown' flags that begin --<plugin>-flag
    # Put any plugin paths into the ruby library path for requiring later.
    @plugin_paths.each do |p|
      @logger.info "Adding #{p.inspect} to ruby load path"
      @logger.debug("Adding #{p.inspect} to ruby load path")
      $:.unshift p
    end

@@ -145,7 +149,7 @@ class LogStash::Agent
      # and add any options to our option parser.
      klass_name = name.capitalize
      if c.const_defined?(klass_name)
        @logger.info("Found plugin class #{c}::#{klass_name})")
        @logger.debug("Found plugin class #{c}::#{klass_name})")
        klass = c.const_get(klass_name)
        # See LogStash::Config::Mixin::DSL#options
        klass.options(@opts)

@@ -215,6 +219,7 @@ class LogStash::Agent
  public
  def run(&block)
    LogStash::Util::set_thread_name(self.class.name)
    register_signal_handlers

    ok = parse_options
    if !ok

@@ -261,52 +266,34 @@ class LogStash::Agent
    end

    # NOTE(petef) we should use a SizedQueue here (w/config params for size)
    #filter_queue = Queue.new
    filter_queue = SizedQueue.new(10)
    output_queue = LogStash::MultiQueue.new

    ready_queue = Queue.new
    @ready_queue = Queue.new

    # inputs should write directly to output queue if there are no filters.
    input_target = @filters.length > 0 ? filter_queue : output_queue
    # Start inputs
    @inputs.each do |input|
      @logger.info(["Starting input", input])
      @threads[input] = Thread.new(input_target) do |input_target|
        LogStash::Util::set_thread_name("input|#{input.inspect}")
        input.logger = @logger
        input.register
        ready_queue << input
        input.run(input_target)
      end # new thread for this input
      @logger.debug(["Starting input", input])
      @plugins[input] = Thread.new(input, input_target) do |*args|
        run_input(*args)
      end
    end # @inputs.each

    # Create N filter-worker threads
    if @filters.length > 0
      1.times do |n|
        @logger.info("Starting filter worker thread #{n}")
        @threads["filter|worker|#{n}"] = Thread.new do
          LogStash::Util::set_thread_name("filter|worker|#{n}")
          @filters.each do |filter|
            filter.logger = @logger
            filter.register
        # TODO(sissel): factor this out into a 'filterworker' that accepts
        # 'shutdown'
        # Start a filter worker
        filterworker = LogStash::FilterWorker.new(@filters, filter_queue,
                                                  output_queue)
        filterworker.logger = @logger
        @plugins[filterworker] = \
          Thread.new(filterworker, n, output_queue) do |*args|
            run_filter(*args)
          end

          while event = filter_queue.pop
            filters.each do |filter|
              filter.filter(event)
              if event.cancelled?
                @logger.debug({:message => "Event cancelled",
                               :event => event,
                               :filter => filter.class,
                })
                break
              end
            end # filters.each

            @logger.debug(["Event finished filtering", event])
            output_queue.push(event) unless event.cancelled?
          end # event pop
        end # Thread.new
      end # N.times
    end # if @filters.length > 0

@@ -315,30 +302,14 @@ class LogStash::Agent
    @outputs.each do |output|
      queue = SizedQueue.new(10)
      output_queue.add_queue(queue)
      @threads["outputs/#{output.to_s}"] = Thread.new(queue) do |queue|
        output.register
        ready_queue << output
        begin
          LogStash::Util::set_thread_name("output/#{output.to_s}")
          output.logger = @logger

          while event = queue.pop do
            @logger.debug("Sending event to #{output.to_s}")
            output.receive(event)
          end
        rescue Exception => e
          @logger.warn(["Output #{output.to_s} thread exception", e])
          @logger.debug(["Output #{output.to_s} thread exception backtrace",
                         e.backtrace])
          # TODO(sissel): should we abort after too many failures?
          retry
        end # begin/rescue
      end # Thread.new
      @plugins[output] = Thread.new(output, queue) do |*args|
        run_output(*args)
      end
    end # @outputs.each

    # Wait for all inputs and outputs to be registered.
    wait_count = outputs.size + inputs.size
    while wait_count > 0 and ready_queue.pop
    while wait_count > 0 and @ready_queue.pop
      wait_count -= 1
    end

@@ -357,6 +328,7 @@ class LogStash::Agent
    # then stop the event loop
  end # def stop

  # TODO(sissel): Is this method even used anymore?
  protected
  def filter(event)
    @filters.each do |f|

@@ -365,14 +337,16 @@ class LogStash::Agent
    end
  end # def filter

  # TODO(sissel): Is this method even used anymore?
  protected
  def output(event)
    # TODO(sissel): write to a multiqueue and do 1 thread per output?
    @outputs.each do |o|
      o.receive(event)
      o.handle(event)
    end # each output
  end # def output

  # TODO(sissel): Is this method even used anymore?
  protected
  # Process a message
  def receive(event)

@@ -383,29 +357,153 @@ class LogStash::Agent
    end
  end # def input

  # Shutdown the agent.
  protected
  def shutdown
    return if @is_shutting_down

    @is_shutting_down = true
    Thread.new do
      LogStash::Util::set_thread_name("logstash shutdown process")

      finished_queue = Queue.new
      # Tell everything to shutdown.
      @plugins.each do |plugin, thread|
        plugin.shutdown(finished_queue)
      end

      # Now wait until the queues we were given are empty.
      #@logger.debug(@plugins)
      loop do
        @logger.debug("Waiting for plugins to finish.")
        remaining = @plugins.select { |plugin, thread| plugin.running? }
        break if remaining.size == 0

        plugin = finished_queue.pop
        @logger.debug("#{plugin.to_s} finished, waiting on #{remaining.size} plugins")
      end # loop

      # When we get here, all inputs have finished, all messages are done
      @logger.info("Shutdown complete")
      java.lang.System.exit(0)
    end
  end # def shutdown

  public
  def register_signal_handler
  def register_signal_handlers
    # TODO(sissel): This doesn't work well in jruby since ObjectSpace is disabled
    # by default.
    Signal.trap("USR2") do
    #Signal.trap("USR2") do
      # TODO(sissel): Make this a function.
      #counts = Hash.new { |h,k| h[k] = 0 }
      #ObjectSpace.each_object do |obj|
        #counts[obj.class] += 1
      #end

      @logger.info("SIGUSR1 received. Dumping state")
      @logger.info("#{self.class.name} config")
      @logger.info(["  Inputs:", @inputs])
      @logger.info(["  Filters:", @filters])
      @logger.info(["  Outputs:", @outputs])
      #@logger.info("SIGUSR1 received. Dumping state")
      #@logger.info("#{self.class.name} config")
      #@logger.info(["  Inputs:", @inputs])
      #@logger.info(["  Filters:", @filters])
      ##@logger.info(["  Outputs:", @outputs])

      #@logger.info("Dumping counts of objects by class")
      #counts.sort { |a,b| a[1] <=> b[1] or a[0] <=> b[0] }.each do |key, value|
        #@logger.info("Class: [#{value}] #{key}")
      #end
    end # SIGUSR1
    ##end
    #end # SIGUSR1

    Signal.trap("INT") do
      shutdown
    end

    Signal.trap("TERM") do
      shutdown
    end
  end # def register_signal_handler

  private
  def run_input(input, queue)
    LogStash::Util::set_thread_name("input|#{input.to_s}")
    input.logger = @logger
    input.register

    @ready_queue << input
    done = false

    while !done
      begin
        input.run(queue)
        done = true
      rescue => e
        @logger.warn(["Input #{input.to_s} thread exception", e])
        @logger.debug(["Input #{input.to_s} thread exception backtrace",
                       e.backtrace])
        @logger.error("Restarting input #{input.to_s} due to exception")
        retry # This jumps to the top of this proc (to the start of 'do')
      end
    end

    # If we get here, the plugin finished, check if we need to shutdown.
    shutdown_if_none_running(LogStash::Inputs::Base, queue)
  end # def run_input

  # Run a filter thread
  public
  def run_filter(filterworker, index, output_queue)
    LogStash::Util::set_thread_name("filter|worker|#{index}")
    filterworker.run

    # If we get here, the plugin finished, check if we need to shutdown.
    shutdown_if_none_running(LogStash::FilterWorker, output_queue)
  end # def run_filter

  # TODO(sissel): Factor this into an 'outputworker'
  def run_output(output, queue)
    LogStash::Util::set_thread_name("output|#{output.to_s}")
    output.register
    output.logger = @logger
    @ready_queue << output

    # TODO(sissel): We need a 'reset' or 'restart' method to call on errors

    begin
      while event = queue.pop do
        @logger.debug("Sending event to #{output.to_s}")
        output.handle(event)
      end
    rescue Exception => e
      @logger.warn(["Output #{output.to_s} thread exception", e])
      @logger.debug(["Output #{output.to_s} thread exception backtrace",
                     e.backtrace])
      # TODO(sissel): should we abort after too many failures?
      retry
    end # begin/rescue

    # If we get here, the plugin finished, check if we need to shutdown.
    shutdown_if_none_running(LogStash::Outputs::Base)
  end # def run_output

  def shutdown_if_none_running(pluginclass, queue=nil)
    # Send shutdown signal if all inputs are done.
    @plugins_mutex.synchronize do

      # Look for plugins of type 'pluginclass' (or a subclass)
      # If none are running, start the shutdown sequence and
      # send the 'shutdown' event down the pipeline.
      remaining = @plugins.count do |plugin, thread|
        plugin.is_a?(pluginclass) and plugin.running?
      end
      @logger.debug("#{pluginclass} still running: #{remaining}")

      if remaining == 0
        @logger.debug("All #{pluginclass} finished. Shutting down.")

        # Send 'shutdown' to the filters.
        queue << LogStash::SHUTDOWN if !queue.nil?
        shutdown
      end # if remaining == 0
    end # @plugins_mutex.synchronize
  end # def shutdown_if_none_running
end # class LogStash::Agent

if __FILE__ == $0

@@ -53,7 +53,7 @@ module LogStash::Config::Mixin
        next if params.include?(name.to_s)
        if opts.include?(:default) and (name.is_a?(Symbol) or name.is_a?(String))
          if opts[:validate] == :password
            @logger.info("Converting default value in #{self.class.name} (#{name}) to password object")
            @logger.debug("Converting default value in #{self.class.name} (#{name}) to password object")
            params[name.to_s] = ::LogStash::Util::Password.new(opts[:default])
          else
            params[name.to_s] = opts[:default]

@@ -1,8 +1,9 @@
require "logstash/namespace"
require "logstash/logging"
require "logstash/plugin"
require "logstash/config/mixin"

class LogStash::Filters::Base
class LogStash::Filters::Base < LogStash::Plugin
  include LogStash::Config::Mixin

  attr_accessor :logger

@@ -31,8 +31,8 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Base
  # The vhost to use
  config :vhost, :validate => :string, :default => "/"

  # Is this exchange durable?
  config :durable, :validate => :boolean, :default => false
  # Is this exchange durable? (aka; Should it survive a broker restart?)
  config :durable, :validate => :boolean, :default => true

  # Enable or disable debugging
  config :debug, :validate => :boolean, :default => false

@@ -68,7 +68,7 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Base
    begin
      @bunny.start

      @queue = @bunny.queue(@name)
      @queue = @bunny.queue(@name, :durable => @durable)
      exchange = @bunny.exchange(@name, :type => @exchange_type.to_sym, :durable => @durable)
      @queue.bind(exchange)

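For reference, a minimal config sketch of an amqp input using the new durable default; the broker host, exchange name, and exchange type below are illustrative assumptions, not values taken from this commit:

    input {
      amqp {
        host => "amqp.example.com"        # assumed broker address
        name => "logstash-exchange"       # assumed exchange/queue name
        exchange_type => "fanout"         # assumed exchange type
        type => "amqp-events"
        durable => true                   # new default: queue and exchange survive a broker restart
      }
    }
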
@@ -1,9 +1,10 @@
require "logstash/namespace"
require "logstash/event"
require "logstash/plugin"
require "logstash/logging"
require "logstash/config/mixin"

class LogStash::Inputs::Base
class LogStash::Inputs::Base < LogStash::Plugin
  include LogStash::Config::Mixin
  attr_accessor :logger

@@ -11,19 +11,19 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
  # Name is used for logging in case there are multiple instances.
  config :name, :validate => :string, :default => "default"

  # The hostname of your redis server. Default hostname is 127.0.0.1.
  config :host, :validate => :string
  # The hostname of your redis server.
  config :host, :validate => :string, :default => "127.0.0.1"

  # The port to connect on. The default port is 6379.
  config :port, :validate => :number
  # The port to connect on.
  config :port, :validate => :number, :default => 6379

  # The redis database number. Db is 0 by default.
  config :db, :validate => :number
  # The redis database number.
  config :db, :validate => :number, :default => 0

  # Initial connection timeout in seconds. Default timeout is 5 seconds.
  config :timeout, :validate => :number
  # Initial connection timeout in seconds.
  config :timeout, :validate => :number, :default => 5

  # Password to authenticate with. There is no authentication by default.
  # Password to authenticate with. There is no authentication by default.
  config :password, :validate => :password

  # The name of a redis list (we'll use BLPOP against this). Dynamic names are

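A usage sketch of the new redis input with its defaults spelled out; the list-name option is cut off in this diff, so it is left out here rather than guessed:

    input {
      redis {
        type => "redis-events"
        host => "127.0.0.1"   # default
        port => 6379          # default
        db => 0               # default
        timeout => 5          # default, in seconds
        # plus the redis list option described above (its name is truncated in this diff)
      }
    }
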
@@ -18,7 +18,13 @@ class LogStash::Inputs::Stdin < LogStash::Inputs::Base
  def run(queue)
    loop do
      event = LogStash::Event.new
      event.message = $stdin.readline.chomp
      begin
        event.message = $stdin.readline.chomp
      rescue *[EOFError, IOError] => e
        @logger.info("Got EOF from stdin input. Ending")
        finished
        return
      end
      event.type = @type
      event.tags = @tags.clone rescue []
      event.source = "stdin://#{@host}/"

@@ -26,4 +32,9 @@ class LogStash::Inputs::Stdin < LogStash::Inputs::Base
      queue << event
    end # loop
  end # def run

  public
  def teardown
    $stdin.close
  end # def teardown
end # class LogStash::Inputs::Stdin

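A quick way to exercise the new EOF handling together with the shutdown sequence from this commit is a plain stdin-to-stdout config (a sketch mirroring the filter tutorial above); pipe something finite into the agent, for example via echo, and the agent should now exit cleanly once stdin closes:

    input {
      stdin { type => "stdin-demo" }
    }
    output {
      stdout { }
    }
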
@@ -8,4 +8,6 @@ module LogStash
  module Config; end
  module File; end
  module Web; end

  SHUTDOWN = :shutdown
end # module LogStash

@@ -27,8 +27,12 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
  # The vhost to use
  config :vhost, :validate => :string, :default => "/"

  # Is this exchange durable?
  config :durable, :validate => :boolean, :default => false
  # Is this exchange durable? (aka; Should it survive a broker restart?)
  config :durable, :validate => :boolean, :default => true

  # Should messages persist to disk on the AMQP broker until they are read by a
  # consumer?
  config :persistent, :validate => :boolean, :default => true

  # Enable or disable debugging
  config :debug, :validate => :boolean, :default => false

@@ -73,7 +77,7 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
    loop do
      @logger.debug(["Sending event", { :destination => to_s, :event => event }])
      begin
        @target.publish(event.to_json)
        @target.publish(event.to_json, :persistent => @persistent)
        break;
      rescue *[Bunny::ServerDownError, Errno::ECONNRESET] => e
        @logger.error("AMQP connection error, will reconnect: #{e}")

@@ -93,4 +97,9 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
  def to_s
    return "amqp://#{@user}@#{@host}:#{@port}#{@vhost}/#{@exchange_type}/#{@name}"
  end

  public
  def teardown
    @bunny.close_connection
  end # def teardown
end # class LogStash::Outputs::Amqp

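A config sketch of the tunable persistence the CHANGELOG mentions; the broker host and exchange name are illustrative assumptions:

    output {
      amqp {
        host => "amqp.example.com"        # assumed broker address
        exchange_type => "fanout"
        name => "logstash-exchange"       # assumed exchange name
        durable => true                   # exchange survives a broker restart (new default)
        persistent => true                # messages persist on the broker (new option, default true)
      }
    }
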
@@ -1,11 +1,12 @@
require "cgi"
require "logstash/event"
require "logstash/logging"
require "logstash/plugin"
require "logstash/namespace"
require "logstash/config/mixin"
require "uri"

class LogStash::Outputs::Base
class LogStash::Outputs::Base < LogStash::Plugin
  include LogStash::Config::Mixin

  attr_accessor :logger

@@ -27,4 +28,14 @@ class LogStash::Outputs::Base
  def receive(event)
    raise "#{self.class}#receive must be overridden"
  end # def receive

  public
  def handle(event)
    if event == LogStash::SHUTDOWN
      finished
      return
    end

    receive(event)
  end # def handle
end # class LogStash::Outputs::Base

@@ -18,36 +18,66 @@ class LogStash::Outputs::Gelf < LogStash::Outputs::Base
  # The GELF chunksize
  config :chunksize, :validate => :number, :default => 1420

  # The GELF message level
  config :level, :validate => :number, :default => 1
  # The GELF message level. Dynamic values like %{level} are permitted here;
  # useful if you want to parse the 'log level' from an event and use that
  # as the gelf level/severity.
  #
  # Values here can be integers [0..7] inclusive or any of
  # "debug", "info", "warn", "error", "fatal", "unknown" (case insensitive).
  # Single-character versions of these are also valid, "d", "i", "w", "e", "f",
  # "u"
  config :level, :validate => :string, :default => "INFO"

  # The GELF facility.
  # The GELF facility. Dynamic values like %{foo} are permitted here; this
  # is useful if you need to use a value from the event as the facility name.
  config :facility, :validate => :string, :default => "logstash-gelf"

  public
  def register
    require "gelf" # rubygem 'gelf'
    option_hash = Hash.new
    option_hash['level'] = @level
    option_hash['facility'] = @facility
    #option_hash['level'] = @level
    #option_hash['facility'] = @facility

    @gelf = GELF::Notifier.new(@host, @port, @chunksize, option_hash)
    #@gelf = GELF::Notifier.new(@host, @port, @chunksize, option_hash)
    @gelf = GELF::Notifier.new(@host, @port, @chunksize)

    # This sets the 'log level' of gelf; since we're forwarding messages, we'll
    # want to forward *all* messages, so set level to 0 so all messages get
    # shipped
    @gelf.level = 0

    @level_map = {
      "debug" => 7, "d" => 7,
      "info" => 6, "i" => 6,
      "warn" => 5, "w" => 5,
      "error" => 4, "e" => 4,
      "fatal" => 3, "f" => 3,
      "unknown" => 1, "u" => 1,
    }
  end # def register

  public
  def receive(event)
    # We have to make our own hash here because GELF expects a hash
    # with a specific format.
    m = Hash.new
    m["short_message"] = (event.fields["message"] or event.message)
    m["full_message"] = (event.message)
    m["host"] = event["@source_host"]
    m["file"] = event["@source_path"]
    m["level"] = 1

    event.fields.each do |name, value|
      next if value == nil or value.empty?
      m["#{name}"] = value
    end

    # Allow 'INFO', 'I', or a number for 'level'
    level = event.sprintf(@level.to_s)
    m["level"] = (@level_map[level.downcase] || level).to_i
    m["facility"] = event.sprintf(@facility)
    m["timestamp"] = event.timestamp

    @gelf.notify!(m)
  end # def receive
end # class LogStash::Outputs::Gelf

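A config sketch of the dynamic level and facility strings added above; it assumes your events carry 'level' and 'program' fields to interpolate, and the graylog host is a placeholder:

    output {
      gelf {
        host => "graylog.example.com"   # placeholder GELF/graylog2 server
        level => "%{level}"             # per event: "info", "WARN", "e", or 0..7 all map via @level_map
        facility => "%{program}"        # per-event facility name (assumes a 'program' field)
      }
    }
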
@@ -11,18 +11,17 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base
  # Name is used for logging in case there are multiple instances.
  config :name, :validate => :string, :default => 'default'

  # The hostname of your redis server. Hostname is 127.0.0.1 by default.
  config :host, :validate => :string
  # The hostname of your redis server.
  config :host, :validate => :string, :default => "127.0.0.1"

  # The port to connect on. Port is 6379 by default.
  config :port, :validate => :number
  # The port to connect on.
  config :port, :validate => :number, :default => 6379

  # The redis database number. Db is 0 by default.
  config :db, :validate => :number
  # The redis database number.
  config :db, :validate => :number, :default => 0

  # Redis initial connection timeout in seconds. Timeout is 5 seconds by
  # default.
  config :timeout, :validate => :number
  # Redis initial connection timeout in seconds.
  config :timeout, :validate => :number, :default => 5

  # Password to authenticate with. There is no authentication by default.
  config :password, :validate => :password

@@ -28,6 +28,11 @@ class LogStash::Outputs::Stdout < LogStash::Outputs::Base

  public
  def receive(event)
    if event == LogStash::SHUTDOWN
      finished
      return
    end

    if @debug
      if HAVE_AWESOME_PRINT
        ap event.to_hash

lib/logstash/plugin.rb (58, new file)

@@ -0,0 +1,58 @@
require "logstash/namespace"
require "logstash/logging"
require "logstash/config/mixin"

class LogStash::Plugin

  # This method is called when someone or something wants this plugin to shut
  # down. When you successfully shutdown, you must call 'finished'
  # You must also call 'super' in any subclasses.
  public
  def shutdown(queue)
    # By default, shutdown is assumed a no-op for all plugins.
    # If you need to take special efforts to shutdown (like waiting for
    # an operation to complete, etc.)
    teardown
    @logger.info("Got shutdown signal for #{self}")

    @shutdown_queue = queue
    if @plugin_state == :finished
      finished
    else
      @plugin_state = :terminating
    end
  end # def shutdown

  # You should call this method when you (the plugin) are done with work
  # forever.
  public
  def finished
    if @shutdown_queue
      @logger.info("Sending shutdown event to agent queue. (plugin #{to_s})")
      @shutdown_queue << self
    end

    if @plugin_state != :finished
      @logger.info("Plugin #{to_s} is finished")
      @plugin_state = :finished
    end
  end # def finished

  # Subclasses should implement this teardown method if you need to perform any
  # special tasks during shutdown (like flushing, etc.)
  public
  def teardown
    # nothing by default
  end

  public
  def finished?
    return @plugin_state == :finished
  end # def finished?

  public
  def running?
    return @plugin_state != :finished
  end # def running?

end # class LogStash::Plugin

@@ -1,6 +1,7 @@

require "logstash/namespace"
require "logstash/logging"
require "logstash/plugin"
require "logstash/event"

class LogStash::Search::Base

@@ -60,9 +60,11 @@ class LogStash::Web::Server < Sinatra::Base
    when "elasticsearch"
      # if host is nil, it will
      # TODO(sissel): Support 'cluster' name?
      cluster_name = (backend_url.path != "/" ? backend_url.path[1..-1] : nil)
      @backend = LogStash::Search::ElasticSearch.new(
        :host => backend_url.host,
        :port => backend_url.port
        :port => backend_url.port,
        :cluster => cluster_name
      )
    when "twitter"
      require "logstash/search/twitter"

@@ -118,7 +120,9 @@ opts = OptionParser.new do |opts|
  end

  opts.on("-b", "--backend URL",
          "The backend URL to use. Default is elasticsearch:/// (assumes multicast discovery)") do |url|
          "The backend URL to use. Default is elasticsearch:/// (assumes " \
          "multicast discovery); You can specify " \
          "elasticsearch://[host][:port]/[clustername]") do |url|
    settings.backend_url = url
  end
end

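With the cluster handling above, the new backend flag from the CHANGELOG can be exercised much like the existing logstash-web examples in the docs; the host and cluster name here are placeholders:

    % logstash-web --backend elasticsearch://es.example.com/my-cluster
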
@@ -1,3 +1,5 @@
require File.join(File.dirname(__FILE__), "VERSION") # For LOGSTASH_VERSION

Gem::Specification.new do |spec|
  files = []
  paths = %w{lib examples etc patterns}

@@ -16,7 +18,7 @@ Gem::Specification.new do |spec|
  #rev = %x{svn info}.split("\n").grep(/Revision:/).first.split(" ").last.to_i
  rev = Time.now.strftime("%Y%m%d%H%M%S")
  spec.name = "logstash"
  spec.version = "1.0.4"
  spec.version = LOGSTASH_VERSION
  spec.summary = "logstash - log and event management"
  spec.description = "scalable log and event management (search, archive, pipeline)"
  spec.license = "Apache License (2.0)"