Merge remote-tracking branch 'logstash/master'

Conflicts:
	lib/logstash/inputs/redis.rb
	lib/logstash/outputs/redis.rb
Bob Corsaro 2011-05-26 10:20:02 -04:00
commit 1ba0c4dce6
47 changed files with 474 additions and 316 deletions

.gitignore vendored
View file

@ -3,5 +3,8 @@
*.class
*.tar.gz
*.jar
.bundle
build
local
test/setup/elasticsearch/elasticsearch-*
vendor

View file

@ -1,4 +1,36 @@
1.0.7 ( ????? )
1.0.11 (???)
- Fix bug in grep filter that would drop/cancel events if you had more
than one event type flowing through filters and didn't have a grep
filter defined for each type.
1.0.10 (May 23, 2011)
- Fix tcp input bug (LOGSTASH-88) that would drop connections.
- Grok patterns_dir (filter config) and --grok-patterns-dir (cmdline opt)
are now working.
- GELF output now properly sends extra fields from the log event (prefixed
with a "_") and sets timestamp to seconds-since-epoch (millisecond
precision and time zone information is lost, but this is the format GELF
asks for).
- Inputs support specifying the format of input data (see "format" and
"message_format" input config parameters).
- Grok filter no longer incorrectly tags _grokparsefailure when more than
one grok filter is enabled (for multiple types) or when an event has
no grok configuration for its type.
- Fix bug where an invalid HTTP Referer: would break grok parsing of the
log line (used to expect %{URI}). Since Referer: is not sanitized in
the HTTP layer, we cannot assume it will be a well formed %{URI}.
1.0.9 (May 18, 2011)
- Fix crash bug caused by refactoring that left 'break' calls in code
that no longer used loops.
1.0.8 (May 17, 2011)
- Remove beanstalk support because the library (beanstalk-client) is GPL3. I
am not a lawyer, but I'm not waiting around to have someone complain about
license incompatibilities.
- fix bug in jar build
1.0.7 (May 16, 2011)
- logstash 'web' now allows you to specify the elasticsearch clustername;
--backend elasticsearch://[host[:port]]/[clustername]
- GELF output now supports dynamic strings for level and facility
@ -7,10 +39,9 @@
https://logstash.jira.com/browse/LOGSTASH-81
- Redis input and output are now supported. (Contributed by dokipen)
- Add shutdown processing. Shutdown starts when all inputs finish (like
stdin). The shutdown sequence always starts the inputs. The sequence
progresses using the same pipeline as the inputs/filters/outputs, so all
in-flight events should finish getting processed before the final shutdown
event makes it's way to the outputs.
stdin) The sequence progresses using the same pipeline as the
inputs/filters/outputs, so all in-flight events should finish getting
processed before the final shutdown event makes its way to the outputs.
- Add retries to unhandled input exceptions (LOGSTASH-84)
1.0.6 (May 11, 2011)

View file

@ -12,3 +12,4 @@ Contributors:
* Naresh V (nareshov)
* John Vincent (lusis)
* Bob Corsaro (dokipen)
* kjoconnor

View file

@ -4,7 +4,7 @@ gem "bunny" # for amqp support, MIT-style license
gem "uuidtools" # for naming amqp queues, License ???
gem "filewatch", "~> 0.2.3" # for file tailing, BSD License
gem "jls-grok", "~> 0.4.7" # for grok filter, BSD License
gem "jruby-elasticsearch", "~> 0.0.7", BSD License
gem "jruby-elasticsearch", "~> 0.0.7" # BSD License
gem "stomp" # for stomp protocol, Apache 2.0 License
gem "json" # Ruby license
gem "awesome_print" # MIT License
@ -17,7 +17,6 @@ gem "haml" # License: MIT
# TODO(sissel): Put this into a group that's only used for monolith packaging
gem "mongo" # outputs/mongodb, License: Apache 2.0
gem "redis" # outputs/redis, License: MIT-style
gem "beanstalk-client" # for beanstalk support, License: GPL3
gem "gelf" # outputs/gelf, # License: MIT-style
# For testing/dev

View file

@ -2,7 +2,6 @@ GEM
remote: http://rubygems.org/
specs:
awesome_print (0.3.2)
beanstalk-client (1.1.0)
bson (1.3.0-java)
bunny (0.6.0)
daemons (1.1.2)
@ -44,7 +43,6 @@ PLATFORMS
DEPENDENCIES
awesome_print
beanstalk-client
bunny
filewatch (~> 0.2.3)
gelf

View file

@ -2,7 +2,8 @@
logstash is a tool for managing events and logs. You can use it to collect logs, parse them, and store them for later use (like, for searching). Speaking of searching, logstash comes with a web interface for searching and drilling into all of your logs.
It is fully free and fully open source. The license is New BSD, meaning you are pretty much free to use it however you want in whatever way.
It is fully free and fully open source. The license is Apache 2.0, meaning you
are pretty much free to use it however you want in whatever way.
For more info, see <http://logstash.net/>

View file

@ -1,5 +1,5 @@
require 'tempfile'
require 'ftools' # fails in 1.9.2
require "tempfile"
require "ftools" # fails in 1.9.2
require File.join(File.dirname(__FILE__), "VERSION") # For LOGSTASH_VERSION
@ -41,7 +41,7 @@ task :compile => "lib/logstash/config/grammar.rb" do |t|
#sh "rm -rf lib/net"
Dir.chdir("lib") do
rel_target = File.join("..", target)
sh "jrubyc", "-t", rel_target, "logstash/runner"
sh "jrubyc", "-t", rel_target, "logstash/runner.rb"
files = Dir.glob("**/*.rb")
files.each do |file|
d = File.join(rel_target, File.dirname(file))
@ -157,12 +157,12 @@ namespace :package do
# We compile stuff to build/...
# TODO(sissel): Could probably just use 'jar uf' for this?
#Dir.glob("build/**/*.class").each do |file|
#target = File.join("build-jar", file.gsub("build/", ""))
#mkdir_p File.dirname(target)
#puts "=> Copying #{file} => #{target}"
#File.copy(file, target)
#end
Dir.glob("build/ruby/**/*.class").each do |file|
target = File.join(builddir, file.gsub("build/ruby/", ""))
mkdir_p File.dirname(target)
puts "=> Copying #{file} => #{target}"
File.copy(file, target)
end
# Purge any extra files we don't need in META-INF (like manifests and
# jar signatures)
@ -274,9 +274,10 @@ task :doccopy => [:require_output_env] do
if ENV["output"].nil?
raise "No output variable set. Run like: 'rake docs output=path/to/output'"
end
output = ENV["output"].gsub("VERSION", LOGSTASH_VERSION)
Dir.glob("docs/**/*").each do |doc|
dir = File.join(ENV["output"], File.dirname(doc).gsub(/docs\/?/, ""))
dir = File.join(output, File.dirname(doc).gsub(/docs\/?/, ""))
mkdir_p dir if !File.directory?(dir)
if File.directory?(doc)
mkdir_p doc
@ -288,15 +289,17 @@ task :doccopy => [:require_output_env] do
end
task :docindex => [:require_output_env] do
sh "ruby docs/generate_index.rb #{ENV["output"]} > #{ENV["output"]}/index.html"
output = ENV["output"].gsub("VERSION", LOGSTASH_VERSION)
sh "ruby docs/generate_index.rb #{ENV["output"]} > #{output}/index.html"
end
task :docgen => [:require_output_env] do
if ENV["output"].nil?
raise "No output variable set. Run like: 'rake docgen output=path/to/output'"
end
output = ENV["output"].gsub("VERSION", LOGSTASH_VERSION)
sh "find lib/logstash/inputs lib/logstash/filters lib/logstash/outputs -type f -not -name 'base.rb' -a -name '*.rb'| xargs ruby docs/docgen.rb -o #{ENV["output"]}"
sh "find lib/logstash/inputs lib/logstash/filters lib/logstash/outputs -type f -not -name 'base.rb' -a -name '*.rb'| xargs ruby docs/docgen.rb -o #{output}"
end
task :publish do
@ -304,3 +307,18 @@ task :publish do
sh "gem push #{latest_gem}"
end
task :release do
docs_dir = File.join(File.dirname(__FILE__), "..", "logstash.github.com",
"docs", LOGSTASH_VERSION)
ENV["output"] = docs_dir
sh "sed -i -Re 's/1.0.[0-9]/#{LOGSTASH_VERSION}/'"
sh "git tag v#{LOGSTASH_VERSION}"
#Rake::Task["docs"].invoke
Rake::Task["package:gem"].invoke
Rake::Task["package:monolith:jar"].invoke
puts "Packaging complete."
puts "Run the following under ruby 1.8.7 (require bluecloth)"
puts "> rake docs output=#{docs_dir}"
end

View file

@ -1 +1 @@
LOGSTASH_VERSION = "1.0.6"
LOGSTASH_VERSION = "1.0.10"

View file

@ -1,4 +1,4 @@
#!/usr/bin/env ruby
#!/usr/bin/env jruby
$: << File.dirname($0) + "/../lib"
$: << File.dirname($0) + "/../test"

docs/.rvmrc Normal file
View file

@ -0,0 +1 @@
rvm 1.8.7

View file

@ -80,7 +80,7 @@ The above would internally be represented as this hash: `{ "field1" =>
Why this syntax? Well frankly it was easier than adding additional grammar to
the config language. Logstash may support ruby- or json-like hash syntax in the
future, but not otday.
future, but not today.
## Further reading

View file

@ -12,6 +12,8 @@ require "bluecloth" # for markdown parsing
$: << Dir.pwd
$: << File.join(File.dirname(__FILE__), "..", "lib")
require File.join(File.dirname(__FILE__), "..", "VERSION")
class LogStashConfigDocGenerator
COMMENT_RE = /^ *#(?: (.*)| *$)/
@ -66,6 +68,9 @@ class LogStashConfigDocGenerator
end # def add_comment
def add_config(code)
# trim off any possible multiline lambdas for a :validate
code.sub!(/, *:validate => \(lambda.*$/, '')
# call the code, which calls 'config' in this class.
# This will let us align comments with config options.
name, opts = eval(code)
@ -84,7 +89,7 @@ class LogStashConfigDocGenerator
name, description = eval(fixed_code)
@flags[name] = description
clear_comments
end # def add_config
end # def add_flag
def set_config_name(code)
name = eval(code)

View file

@ -29,6 +29,7 @@ This is what it might look like in your config file:
<%= section %> {
<%= name %> {
<% sorted_settings.each do |setting_name, config|
next if config[:deprecated]
if config[:validate].is_a?(Array)
annotation = "#{config[:validate].inspect}"
else
@ -49,10 +50,14 @@ This is what it might look like in your config file:
<h4>
<a name="setting_<%= setting_name %>">
<%= setting_name %><%= " (required setting)" if config[:required] %>
<%= " <strong>DEPRECATED</strong>" if config[:deprecated] %>
</a>
</h4>
<ul>
<% if config[:deprecated] -%>
<li> DEPRECATED WARNING: This config item is deprecated. It may be removed in a future version. </li>
<% end -%>
<% if config[:validate].is_a?(Symbol) -%>
<li> Value type is <%= config[:validate] %> </li>
<% elsif config[:validate].nil? -%>
@ -73,4 +78,4 @@ This is what it might look like in your config file:
<hr>
This is documentation from <a href="https://github.com/logstash/logstash/blob/master/<%= file %>"><%= file %>
This is documentation from <a href="https://github.com/logstash/logstash/blob/v<%= LOGSTASH_VERSION %>/<%= file %>"><%= file %>

View file

@ -10,7 +10,7 @@ If you're looking to extend logstash today, please look at the existing plugins.
Good examples include:
* [inputs/beanstalk](https://github.com/logstash/logstash/blob/master/lib/logstash/inputs/beanstalk.rb)
* [inputs/tcp](https://github.com/logstash/logstash/blob/master/lib/logstash/inputs/tcp.rb)
* [filters/multiline](https://github.com/logstash/logstash/blob/master/lib/logstash/filters/multiline.rb)
* [outputs/mongodb](https://github.com/logstash/logstash/blob/master/lib/logstash/outputs/mongodb.rb)

View file

@ -30,5 +30,5 @@ The logstash agent has the following flags (also try using the '--help' flag)
<dt> --port PORT </dt>
<dd> Port on which to start webserver. Default is 9292. </dd>
<dt> --backend URL </dt>
<dd> The backend URL to use. Default is elasticserach://localhost:9200/ </dd>
<dd> The backend URL to use. Default is elasticsearch://localhost:9200/ </dd>
</dl>

View file

@ -35,8 +35,8 @@ for such things, that works for me, too.)
logstash releases come in a few flavors.
* [Monolithic jar](http://semicomplete.com/files/logstash/logstash-1.0.6-monolithic.jar)
* [rubygem](https://github.com/downloads/logstash/releases/logstash-1.0.6.gem)
* [Monolithic jar](http://semicomplete.com/files/logstash/logstash-1.0.10-monolithic.jar)
* [rubygem](http://rubygems.org/gems/logstash/versions/1.0.10)
* [`gem install logstash`](http://rubygems.org/gems/logstash)
## What's next?

View file

@ -19,3 +19,29 @@ I do not suspect the 'x' (currently 1) will change frequently. It should only ch
if there are major, backwards-incompatible changes made to logstash, and I'm
trying to not make those changes, so logstash should forever be at 1.y,z,
right? ;)
# building a release.
* Make sure all tests pass
** rake test
* Update VERSION.rb
* Update docs/learn.md (fix download links)
* Ensure CHANGELOG is up-to-date
* git tag v$(ruby -r./VERSION -e 'puts LOGSTASH_VERSION')
* git push origin master
* git push --tags
* Build binaries
** rake package:gem
** rake package:monolith:jar
* rake docs output=../logstash.github.com/docs/VERSION
** Note: you will need to use c-ruby for this (ruby 1.8.7, etc)
** You'll need 'bluecloth' rubygem installed.
* cd ../logstash.github.com
** git add docs/$VERSION
** git commit -m "version $VERSION docs" && git push origin master
* Publish binaries
** Stage binaries at <tt>carrera.databits.net:/home/jls/s/files/logstash/</tt>
** rake publish
* Update #logstash IRC /topic
* Send announcement email to logstash-users@, include relevant download URLs &
changelog (see past emails for a template)

View file

@ -0,0 +1,20 @@
# Useful config for testing grok expressions (update "pattern" below)
input {
stdin {
type => test
}
}
filter {
grok {
type => "test"
pattern => "%{SYSLOGLINE}"
}
}
output {
stdout {
debug => true
}
}

View file

@ -0,0 +1,15 @@
# Example config demonstrating the use of message_format
input {
stdin {
type => test
format => json
message_format => "%{date} | %{user} | %{action} | %{reason}"
}
}
output {
stdout {
debug => true
}
}
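For reference, a stdin line such as {"date": "2011-05-26", "user": "bob", "action": "login", "reason": "ok"} would be parsed as JSON, each top-level key copied into the event's fields, and @message rebuilt from the message_format sprintf template (giving "2011-05-26 | bob | login | ok"); without message_format, @message stays the raw JSON. The input line here is illustrative; the behaviour comes from the new to_event helper in lib/logstash/inputs/base.rb further down.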

View file

@ -458,10 +458,13 @@ class LogStash::Agent
@logger.debug(["Input #{input.to_s} thread exception backtrace",
e.backtrace])
@logger.error("Restarting input #{input.to_s} due to exception")
sleep(1)
retry # This jumps to the top of this proc (to the start of 'do')
end
end
@logger.warn("Input #{input.to_s} shutting down")
# If we get here, the plugin finished, check if we need to shutdown.
shutdown_if_none_running(LogStash::Inputs::Base, queue)
end # def run_input
@ -472,6 +475,8 @@ class LogStash::Agent
LogStash::Util::set_thread_name("filter|worker|#{index}")
filterworker.run
@logger.warn("Filter worker ##{index} shutting down")
# If we get here, the plugin finished, check if we need to shutdown.
shutdown_if_none_running(LogStash::FilterWorker, output_queue)
end # def run_filter
@ -495,9 +500,12 @@ class LogStash::Agent
@logger.debug(["Output #{output.to_s} thread exception backtrace",
e.backtrace])
# TODO(sissel): should we abort after too many failures?
sleep(1)
retry
end # begin/rescue
@logger.warn("Output #{input.to_s} shutting down")
# If we get here, the plugin finished, check if we need to shutdown.
shutdown_if_none_running(LogStash::Outputs::Base)
end # def run_output

View file

@ -48,6 +48,15 @@ module LogStash::Config::Mixin
exit 1
end
# warn about deprecated variable use
params.each do |name, value|
opts = self.class.get_config[name]
if opts && opts[:deprecated]
@logger.warn("Deprecated config item #{name.inspect} set " +
"in #{self.class.name}")
end
end
# Set defaults from 'config :foo, :default => somevalue'
self.class.get_config.each do |name, opts|
next if params.include?(name.to_s)
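The warning above is driven by a :deprecated option on a plugin's config declaration, the same flag docgen.erb now renders as DEPRECATED. A minimal hypothetical sketch of how a plugin author would mark a setting (the plugin and option names are invented for illustration):

    require "logstash/inputs/base"
    require "logstash/namespace"

    class LogStash::Inputs::Example < LogStash::Inputs::Base
      config_name "example"

      # Old name kept for backwards compatibility; setting it now triggers the
      # "Deprecated config item ..." warning added above.
      config :old_setting, :validate => :string, :deprecated => true

      # Preferred replacement for the deprecated setting.
      config :new_setting, :validate => :string
    end

This sketch only shows the declarations; the mixin reads opts[:deprecated] at plugin setup time, so any truthy value enables the warning.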

View file

@ -46,6 +46,11 @@ class LogStash::Event
def timestamp; @data["@timestamp"]; end # def timestamp
def timestamp=(val); @data["@timestamp"] = val; end # def timestamp=
def unix_timestamp
time = @@date_parser.parseDateTime(timestamp)
return time.getMillis.to_f / 1000
end
public
def source; @data["@source"]; end # def source
def source=(val)
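unix_timestamp converts the ISO8601 @timestamp into float seconds since the epoch, keeping millisecond precision; the GELF output below truncates it with to_i. A rough usage sketch (assumes JRuby with logstash's libraries on the load path; the timestamp value is illustrative):

    require "logstash/event"

    event = LogStash::Event.new
    event.timestamp = "2011-05-26T14:20:02.123Z"   # ISO8601, as logstash stores it

    seconds = event.unix_timestamp   # float seconds since epoch (keeps the .123)
    puts seconds.to_i                # what outputs/gelf now sends as "timestamp"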

View file

@ -14,6 +14,7 @@ class LogStash::File::Manager
def initialize(output_queue)
@tail = FileWatch::TailGlob.new
@watching = Hash.new
@to_event = Hash.new
@watching_lock = Mutex.new
@file_threads = {}
@main_thread = nil
@ -36,11 +37,11 @@ class LogStash::File::Manager
end
public
def watch(paths, config)
def watch(paths, config, to_event)
@watching_lock.synchronize do
paths.each do |path|
if @watching[path]
raise ValueError, "cannot watch the same path #{path} more than once"
raise "cannot watch the same path #{path} more than once"
end
@logger.debug(["watching file", {:path => path, :config => config}])
@ -61,6 +62,7 @@ class LogStash::File::Manager
@tail.tail(path, tailconf) do |fullpath|
@logger.info("New file found: #{fullpath}")
@watching[fullpath] = config
@to_event[fullpath] = to_event
end
# TODO(sissel): Make FileWatch emit real exceptions
rescue RuntimeError
@ -83,16 +85,13 @@ class LogStash::File::Manager
# Maybe extend @tail.tail to accept a extra args that it will
# pass to subscribe's callback?
config = @watching[path]
to_event = @to_event[path]
@logger.debug(["Event from tail", { :path => path, :config => config }])
@buffers[path].extract(data).each do |line|
e = LogStash::Event.new({
"@message" => line,
"@type" => config["type"],
"@tags" => config["tag"].dup,
})
e.source = "file://#{@hostname}#{path}"
@logger.debug(["New event from file input", path, e])
@output_queue << e
e = to_event.call(line, "file://#{@hostname}#{path}")
if e
@output_queue << e
end
end
end
rescue Exception => e

View file

@ -14,7 +14,7 @@ require "logstash/time"
# backfilling old data. If you don't get the date correct in your
# event, then searching for them later will likely sort out of order.
#
# In the absense of this filter, logstash will choose a timestamp based on the
# In the absence of this filter, logstash will choose a timestamp based on the
# first time it sees the event (at input time), if the timestamp is not already
# set in the event. For example, with file input, the timestamp is set to the
# time of reading.
@ -168,6 +168,7 @@ class LogStash::Filters::Date < LogStash::Filters::Base
@logger.debug "Parsed #{value.inspect} as #{event.timestamp}"
rescue => e
@logger.warn "Failed parsing date #{value.inspect} from field #{field}: #{e}"
@logger.debug(["Backtrace", e.backtrace])
# Raising here will bubble all the way up and cause an exit.
# TODO(sissel): Maybe we shouldn't raise?
#raise e

View file

@ -42,7 +42,6 @@ class LogStash::Filters::Grep < LogStash::Filters::Base
def filter(event)
if event.type != @type
@logger.debug("grep: skipping type #{event.type} from #{event.source}")
event.cancel
return
end

View file

@ -1,5 +1,6 @@
require "logstash/filters/base"
require "logstash/namespace"
require "set"
# Parse arbitrary text and structure it.
# Grok is currently the best way in logstash to parse crappy unstructured log
@ -50,20 +51,17 @@ class LogStash::Filters::Grok < LogStash::Filters::Base
# requested in: googlecode/issue/26
config :drop_if_match, :validate => :boolean, :default => false
class << self
attr_accessor :patterns_dir
end
# Detect if we are running from a jarfile, pick the right path.
@@patterns_path ||= Set.new
if __FILE__ =~ /file:\/.*\.jar!.*/
self.patterns_dir = ["#{File.dirname(__FILE__)}/../../patterns/*"]
@@patterns_path += ["#{File.dirname(__FILE__)}/../../patterns/*"]
else
self.patterns_dir = ["#{File.dirname(__FILE__)}/../../../patterns/*"]
@@patterns_path += ["#{File.dirname(__FILE__)}/../../../patterns/*"]
end
# This flag becomes "--grok-patterns-path"
flag("--patterns-path PATH", "Colon-delimited path of patterns to load") do |val|
@patterns_dir += val.split(":")
@@patterns_path += val.split(":")
end
@@grokpiles = Hash.new { |h, k| h[k] = [] }
@ -75,12 +73,14 @@ class LogStash::Filters::Grok < LogStash::Filters::Base
require "grok" # rubygem 'jls-grok'
@pile = Grok::Pile.new
@logger.info("Grok patterns paths: #{self.class.patterns_dir.inspect}")
self.class.patterns_dir.each do |path|
@patterns_dir ||= []
@patterns_dir += @@patterns_path.to_a
@logger.info("Grok patterns path: #{@patterns_dir.join(":")}")
@patterns_dir.each do |path|
# Can't read relative paths from jars, try to normalize away '../'
while path =~ /file:\/.*\.jar!.*\/\.\.\//
# replace /foo/bar/../baz => /foo/baz
path.gsub!(/[^\/]+\/\.\.\//, "")
path = path.gsub(/[^\/]+\/\.\.\//, "")
@logger.debug "In-jar path to read: #{path}"
end
@ -109,6 +109,16 @@ class LogStash::Filters::Grok < LogStash::Filters::Base
# parse it with grok
match = false
# Only filter events we are configured for
if event.type != @type
return
end
if @@grokpiles[event.type].length == 0
@logger.debug("Skipping grok for event type=#{event.type} (no grokpiles defined)")
return
end
if !event.message.is_a?(Array)
messages = [event.message]
else
@ -119,7 +129,7 @@ class LogStash::Filters::Grok < LogStash::Filters::Base
@logger.debug(["Running grok filter", event])
@@grokpiles[event.type].each do |pile|
@logger.debug(["Trying pattern", pile])
@logger.debug(["Trying pattern for type #{event.type}", pile])
grok, match = @pile.match(message)
@logger.debug(["Result", { :grok => grok, :match => match }])
break if match

View file

@ -41,6 +41,8 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Base
def initialize(params)
super
@format ||= ["json_event"]
if !MQTYPES.include?(@exchange_type)
raise "Invalid type '#{@exchange_type}' must be one of #{MQTYPES.join(", ")}"
end
@ -50,14 +52,21 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Base
def register
@logger.info("Registering input #{@url}")
require "bunny" # rubygem 'bunny'
@vhost ||= "/"
@port ||= 5672
@amqpsettings = {
:vhost => (@vhost or "/"),
:vhost => @vhost,
:host => @host,
:port => (@port or 5672),
:port => @port,
}
@amqpsettings[:user] = @user if @user
@amqpsettings[:pass] = @password.value if @password
@amqpsettings[:logging] = @debug
@amqpurl = "amqp://"
if @user or @password
@amqpurl += "#{@user}:#{@password}@"
end
@amqpurl += "#{@host}:#{@port}#{@vhost}/#{@name}"
end # def register
def run(queue)
@ -73,14 +82,10 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Base
@queue.bind(exchange)
@queue.subscribe do |data|
begin
obj = JSON.parse(data[:payload])
rescue => e
@logger.error(["json parse error", { :exception => e }])
raise e
e = to_event(data[:payload], @amqpurl)
if e
queue << e
end
queue << LogStash::Event.new(obj)
end # @queue.subscribe
rescue *[Bunny::ConnectionError, Bunny::ServerDownError] => e
@logger.error("AMQP connection error, will reconnect: #{e}")

View file

@ -16,6 +16,21 @@ class LogStash::Inputs::Base < LogStash::Plugin
# Set this to true to enable debugging on an input.
config :debug, :validate => :boolean, :default => false
# The format of input data (plain, json, json_event)
config :format, :validate => (lambda do |value|
valid_formats = ["plain", "json", "json_event"]
if value.length != 1
false
else
valid_formats.member?(value.first)
end
end) # config :format
# If format is "json", an event sprintf string to build what
# the display @message should be (defaults to the raw JSON).
# sprintf format strings look like %{fieldname} or %{@metadata}.
config :message_format, :validate => :string
# Add any number of arbitrary tags to your event.
#
# This can help with processing later.
@ -48,4 +63,53 @@ class LogStash::Inputs::Base < LogStash::Plugin
def tag(newtag)
@tags << newtag
end # def tag
protected
def to_event(raw, source)
@format ||= ["plain"]
event = LogStash::Event.new
event.type = @type
event.tags = @tags.clone rescue []
event.source = source
case @format.first
when "plain":
event.message = raw
when "json":
begin
fields = JSON.parse(raw)
fields.each { |k, v| event[k] = v }
rescue => e
@logger.warn({:message => "Trouble parsing json input",
:input => raw,
:source => source,
})
@logger.debug(["Backtrace", e.backtrace])
return nil
end
if @message_format
event.message = event.sprintf(@message_format)
else
event.message = raw
end
when "json_event":
begin
event = LogStash::Event.from_json(raw)
rescue => e
@logger.warn({:message => "Trouble parsing json_event input",
:input => raw,
:source => source,
})
@logger.debug(["Backtrace", e.backtrace])
return nil
end
else
raise "unknown event format #{@format.first}, this should never happen"
end
logger.debug(["Received new event", {:source => source, :event => event}])
return event
end
end # class LogStash::Inputs::Base
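Plugins that previously built LogStash::Event objects by hand (stdin, tcp, syslog, redis, and so on in this commit) now delegate to this helper, so format, message_format, type, and tags are handled in one place. A hypothetical minimal input using it (the class name, setting, and source URL are invented for illustration):

    require "socket"
    require "logstash/inputs/base"
    require "logstash/namespace"

    class LogStash::Inputs::Heartbeat < LogStash::Inputs::Base
      config_name "heartbeat"

      # Seconds to sleep between events (illustrative setting).
      config :interval, :validate => :number, :default => 5

      public
      def register
        # nothing to set up for this illustrative plugin
      end # def register

      public
      def run(queue)
        loop do
          # to_event applies @format/@message_format and fills in type, tags, source.
          e = to_event("heartbeat", "heartbeat://#{Socket.gethostname}/")
          queue << e if e
          sleep(@interval)
        end
      end # def run
    end # class LogStash::Inputs::Heartbeat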

View file

@ -1,50 +0,0 @@
require "logstash/inputs/base"
require "logstash/namespace"
# Pull events from a beanstalk tube.
#
# TODO(sissel): Document where to learn more about beanstalk.
class LogStash::Inputs::Beanstalk < LogStash::Inputs::Base
config_name "beanstalk"
# The address of the beanstalk server
config :host, :validate => :string, :required => true
# The port of your beanstalk server
config :port, :validate => :number, :default => 11300
# The name of the beanstalk tube
config :tube, :validate => :string, :required => true
public
def register
require "beanstalk-client"
# TODO(petef): support pools of beanstalkd servers
# TODO(petef): check for errors
@beanstalk = Beanstalk::Pool.new(["#{@host}:#{@port}"])
@beanstalk.watch(@tube)
end # def register
public
def run(output_queue)
loop do
job = @beanstalk.reserve
begin
event = LogStash::Event.from_json(job.body)
rescue => e
@logger.warn(["Trouble parsing beanstalk job",
{:error => e.message, :body => job.body,
:backtrace => e.backtrace}])
job.bury(job, 0)
end
output_queue << event
job.delete
end
end # def run
public
def teardown
@beanstalk.close rescue nil
end # def teardown
end # class LogStash::Inputs::Beanstalk

View file

@ -46,6 +46,6 @@ class LogStash::Inputs::File < LogStash::Inputs::Base
end
end
@@filemanager.watch(@path, @config)
@@filemanager.watch(@path, @config, method(:to_event))
end # def run
end # class LogStash::Inputs::File

View file

@ -39,6 +39,14 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
# Maximum number of retries on a read before we give up.
config :retries, :validate => :number, :default => 5
public
def initialize(params)
super
@format ||= ["json_event"]
end # def initialize
public
def register
require 'redis'
@redis = nil
@ -46,8 +54,11 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
unless @list or @channel
raise "Must specify redis list or channel"
end
end
@redis_url = "redis://#{@password}@#{@host}:#{@port}/#{@db}"
end # def register
private
def connect
Redis.new(
:host => @host,
@ -56,8 +67,9 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
:db => @db,
:password => @password
)
end
end # def connect
public
def run(output_queue)
wait = Proc.new do |command, *args|
retries = @retries
@ -74,22 +86,20 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
response = @redis.send(command, *args)
end
retries = @retries
begin
output_queue << LogStash::Event.new(JSON.parse(response[1]))
rescue # parse or event creation error
@logger.error "failed to create event with '#{response[1]}'"
@logger.error $!
e = to_event(response[1], @redis_url)
if e
output_queue << e
end
rescue => e # redis error
@logger.warn(["Failed to get event from redis #{@name}. " +
"Will retry #{retries} times.", $!])
@logger.debug(["Backtrace", e.backtrace])
if retries <= 0
raise RuntimeError, "Redis connection failed too many times"
end
rescue # redis error
raise "Redis connection failed too many times" if retries <= 0
@redis = nil
@logger.warn "Failed to get event from redis #{@name}. "+
"Will retry #{retries} times."
@logger.warn $!
@logger.warn $!.backtrace
retries -= 1
sleep 1
retry
sleep(1)
end
end
@ -149,16 +159,16 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
@logger.error $!
@logger.error $!.backtrace
end
rescue # redis error
raise "Redis connection failed too many times" if retries <= 0
rescue => e # redis error
@logger.warn(["Failed to get event from redis #{@name}. " +
"Will retry #{retries} times.", $!])
@logger.debug(["Backtrace", e.backtrace])
if retries <= 0
raise RuntimeError, "Redis connection failed too many times"
end
@redis = nil
@logger.warn "Failed to get event from redis #{@name}. "+
"Will retry #{retries} times."
@logger.warn $!
@logger.warn $!.backtrace
retries -= 1
sleep 1
retry
sleep(1)
end
end # loop
end # Thread.new

View file

@ -17,19 +17,10 @@ class LogStash::Inputs::Stdin < LogStash::Inputs::Base
def run(queue)
loop do
event = LogStash::Event.new
begin
event.message = $stdin.readline.chomp
rescue *[EOFError, IOError] => e
@logger.info("Got EOF from stdin input. Ending")
finished
return
e = to_event($stdin.readline.chomp, "stdin://#{@host}/")
if e
queue << e
end
event.type = @type
event.tags = @tags.clone rescue []
event.source = "stdin://#{@host}/"
@logger.debug(["Got event", event])
queue << event
end # loop
end # def run

View file

@ -1,17 +1,17 @@
require "logstash/inputs/base"
require "logstash/namespace"
# TODO(sissel): This class doesn't work yet in JRuby. Google for
# 'execution expired stomp jruby' and you'll find the ticket.
# TODO(sissel): This class doesn't work yet in JRuby.
# http://jira.codehaus.org/browse/JRUBY-4941
# Stream events from a STOMP broker.
#
# TODO(sissel): Include info on where to learn about STOMP
# http://stomp.codehaus.org/
class LogStash::Inputs::Stomp < LogStash::Inputs::Base
config_name "stomp"
# The address of the STOMP server.
config :host, :validate => :string
config :host, :validate => :string, :default => "localhost"
# The port to connect to on your STOMP server.
config :port, :validate => :number, :default => 61613
@ -25,33 +25,37 @@ class LogStash::Inputs::Stomp < LogStash::Inputs::Base
# The destination to read events from.
#
# Example: "/topic/logstash"
config :destination, :validate => :string
config :destination, :validate => :string, :required => true
# Enable debugging output?
config :debug, :validate => :boolean, :default => false
public
def initialize(params)
super
@format ||= "json_event"
raise "Stomp input currently not supported. See " +
"http://jira.codehaus.org/browse/JRUBY-4941 and " +
"https://logstash.jira.com/browse/LOGSTASH-8"
end
public
def register
require "stomp"
if @destination == "" or @destination.nil?
@logger.error("No destination path given for stomp")
return
end
begin
@client = Stomp::Client.new(@user, @password.value, @host, @port)
rescue Errno::ECONNREFUSED
@logger.error("Connection refused to #{@host}:#{@port}...")
# TODO(sissel): Retry?
end
@client = Stomp::Client.new(@user, @password.value, @host, @port)
@stomp_url = "stomp://#{@user}:#{@password}@#{@host}:#{@port}/#{@destination}"
end # def register
def run(queue)
def run(output_queue)
@client.subscribe(@destination) do |msg|
@logger.debug(["Got message from stomp", { :msg => msg }])
#event = LogStash::Event.from_json(message.body)
#queue << event
e = to_event(msg.body, @stomp_url)
if e
output_queue << e
end
end
raise "disconnected from stomp server"
end # def run
end # class LogStash::Inputs::Stomp

View file

@ -19,6 +19,14 @@ class LogStash::Inputs::Syslog < LogStash::Inputs::Base
# The port to listen on
config :port, :validate => :number, :default => 514
public
def initialize(params)
super
# force "plain" format. others don't make sense here.
@format = ["plain"]
end # def initialize
public
def register
# This comes from RFC3164, mostly.
@ -36,8 +44,9 @@ class LogStash::Inputs::Syslog < LogStash::Inputs::Base
LogStash::Util::set_thread_name("input|syslog|udp")
begin
udp_listener(output_queue)
rescue
rescue => e
@logger.warn("syslog udp listener died: #{$!}")
@logger.debug(["Backtrace", e.backtrace])
sleep(5)
retry
end # begin
@ -48,8 +57,9 @@ class LogStash::Inputs::Syslog < LogStash::Inputs::Base
LogStash::Util::set_thread_name("input|syslog|tcp")
begin
tcp_listener(output_queue)
rescue
rescue => e
@logger.warn("syslog tcp listener died: #{$!}")
@logger.debug(["Backtrace", e.backtrace])
sleep(5)
retry
end # begin
@ -64,20 +74,13 @@ class LogStash::Inputs::Syslog < LogStash::Inputs::Base
loop do
line, client = server.recvfrom(9000)
p :client => client
p :line => line
begin
event = LogStash::Event.new({
"@message" => line.chomp,
"@type" => @type,
"@tags" => @tags.clone,
})
source = URI::Generic.new("syslog", nil, client[3], nil, nil, nil, nil, nil, nil, nil)
syslog_relay(event, source)
rescue => e
p :exception => e
source = URI::Generic.new("syslog", nil, client[3], nil, nil, nil, nil,
nil, nil, nil)
e = to_event(line.chomp, source.to_s)
if e
syslog_relay(e, source)
output_queue << e
end
output_queue << event
end
ensure
if server
@ -96,19 +99,18 @@ class LogStash::Inputs::Syslog < LogStash::Inputs::Base
ip, port = client.peeraddr[3], client.peeraddr[1]
@logger.warn("got connection from #{ip}:#{port}")
LogStash::Util::set_thread_name("input|syslog|tcp|#{ip}:#{port}}")
source_base = URI::Generic.new("syslog", nil, ip, nil, nil, nil, nil, nil, nil, nil)
source_base = URI::Generic.new("syslog", nil, ip, nil, nil, nil, nil,
nil, nil, nil)
client.each do |line|
event = LogStash::Event.new({
"@message" => line.chomp,
"@type" => @type,
"@tags" => @tags.clone,
})
source = source_base.dup
syslog_relay(event, source)
output_queue << event
end
end
end
e = to_event(line.chomp, source_base.to_s)
if e
source = source_base.dup
syslog_relay(e, source)
output_queue << e
end # e
end # client.each
end # Thread.new
end # loop do
ensure
server.close if server
end # def tcp_listener

View file

@ -44,17 +44,14 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
Timeout::timeout(@data_timeout) do
buf = s.readline
end
e = LogStash::Event.new({
"@message" => buf,
"@type" => @type,
"@tags" => @tags.clone,
})
e.source = "tcp://#{@host}:#{@port}/client/#{peer}"
@logger.debug(["Received message from #{peer}"], e)
output_queue << e
e = self.to_event(buf, "tcp://#{@host}:#{@port}/client/#{peer}")
if e
output_queue << e
end
end # loop do
rescue
@logger.debug("Closing connection with #{peer}")
rescue => e
@logger.debug(["Closing connection with #{peer}", $!])
@logger.debug(["Backtrace", e.backtrace])
rescue Timeout::Error
@logger.debug("Closing connection with #{peer} after read timeout")
end # begin

View file

@ -17,6 +17,15 @@ class LogStash::Inputs::Twitter < LogStash::Inputs::Base
# Any keywords to track in the twitter stream
config :keywords, :validate => :array, :required => true
public
def initialize(params)
super
# Force format to plain. Other values don't make any sense here.
@format = ["plain"]
end # def initialize
public
def register
# TODO(sissel): put buftok in logstash, too
require "filewatch/buftok"
@ -32,34 +41,23 @@ class LogStash::Inputs::Twitter < LogStash::Inputs::Base
@logger.debug :status => status
#@logger.debug("Got twitter status from @#{status[:user][:screen_name]}")
@logger.info("Got twitter status from @#{status["user"]["screen_name"]}")
event = LogStash::Event.new(
#"@message" => status[:text],
"@message" => status["text"],
"@type" => @type,
"@tags" => @tags.clone
)
e = to_event(status["text"], "http://twitter.com/#{status["user"]["screen_name"]}/status/#{status["id"]}")
next unless e
event.fields.merge!(
#"user" => (status[:user][:screen_name] rescue nil),
"user" => (status["user"]["screen_name"] rescue nil),
#"client" => (status[:source] rescue nil),
e.fields.merge!(
"user" => (status["user"]["screen_name"] rescue nil),
"client" => (status["source"] rescue nil),
#"retweeted" => (status[:retweeted] rescue nil)
"retweeted" => (status["retweeted"] rescue nil)
)
#event.fields["in-reply-to"] = status[:in_reply_to_status_id] if status[:in_reply_to_status_id]
event.fields["in-reply-to"] = status["in_reply_to_status_id"] if status["in_reply_to_status_id"]
e.fields["in-reply-to"] = status["in_reply_to_status_id"] if status["in_reply_to_status_id"]
#urls = status[:entities][:urls] rescue []
urls = status["entities"]["urls"] rescue []
if urls.size > 0
event.fields["urls"] = urls.collect { |u| u["url"] }
e.fields["urls"] = urls.collect { |u| u["url"] }
end
event.source = "http://twitter.com/#{event.fields["user"]}/status/#{status["id"]}"
@logger.debug(["Got event", event])
queue << event
queue << e
end # stream.track
# Some closure or error occured, sleep and try again.
@ -91,6 +89,7 @@ class LogStash::Inputs::Twitter < LogStash::Inputs::Base
yield status
rescue => e
@logger.error e
@logger.debug(["Backtrace", e.backtrace])
end
end # buffer.extract
end # response.read_body

View file

@ -63,12 +63,12 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
@logger.debug(["Connecting to AMQP", amqpsettings, @exchange_type, @name])
@bunny = Bunny.new(amqpsettings)
@bunny.start
break # success
rescue Bunny::ServerDownError => e
rescue => e
if terminating?
return
else
@logger.error("AMQP connection error, will reconnect: #{e}")
@logger.debug(["Backtrace", e.backtrace])
sleep(1)
retry
end
@ -81,8 +81,12 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
@logger.debug(["Sending event", { :destination => to_s, :event => event }])
begin
if @target
@target.publish(event.to_json, :persistent => @persistent)
break;
begin
@target.publish(event.to_json, :persistent => @persistent)
rescue JSON::GeneratorError
@logger.warn(["Trouble converting event to JSON", $!, event.to_hash])
return
end
else
@logger.warn("Tried to send message, but not connected to amqp yet.")
end

View file

@ -1,40 +0,0 @@
require "logstash/outputs/base"
require "logstash/namespace"
class LogStash::Outputs::Beanstalk < LogStash::Outputs::Base
config_name "beanstalk"
# The address of the beanstalk server
config :host, :validate => :string, :required => true
# The port of your beanstalk server
config :port, :validate => :number, :default => 11300
# The name of the beanstalk tube
config :tube, :validate => :string, :required => true
# The message priority (see beanstalk docs)
config :priority, :validate => :number, :default => 65536
# The message delay (see beanstalk docs)
config :delay, :validate => :number, :default => 0
# TODO(sissel): Document this
# See beanstalk documentation
config :ttr, :validate => :number, :default => 300
public
def register
require "beanstalk-client"
# TODO(petef): support pools of beanstalkd servers
# TODO(petef): check for errors
@beanstalk = Beanstalk::Pool.new(["#{@host}:#{@port}"])
@beanstalk.use(@tube)
end # def register
public
def receive(event)
@beanstalk.put(event.to_json, @priority, @delay, @ttr)
end # def register
end # class LogStash::Outputs::Beanstalk

View file

@ -69,15 +69,17 @@ class LogStash::Outputs::Gelf < LogStash::Outputs::Base
event.fields.each do |name, value|
next if value == nil or value.empty?
m["#{name}"] = value
name = "_id" if name == "id" # "_id" is reserved, so use "__id"
m["_#{name}"] = (value.length == 1) ? value.first : value
end
# Allow 'INFO' 'I' or number. for 'level'
level = event.sprintf(@level.to_s)
m["level"] = (@level_map[level.downcase] || level).to_i
m["facility"] = event.sprintf(@facility)
m["timestamp"] = event.timestamp
m["timestamp"] = event.unix_timestamp.to_i
@logger.debug(["Sending GELF event", m])
@gelf.notify!(m)
end # def receive
end # class LogStash::Outputs::Gelf
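Illustratively, the field loop and timestamp change above would turn event fields {"user" => ["bob"], "id" => ["42"]} into GELF message entries like the following (other keys such as the message itself are omitted; the level value shown is an assumption about what @level_map resolves "info" to):

    m = {}
    m["_user"]     = "bob"          # single-element field arrays are unwrapped
    m["__id"]      = "42"           # "id" is renamed because "_id" is reserved
    m["level"]     = 6              # e.g. "info" resolved through @level_map
    m["facility"]  = "logstash"
    m["timestamp"] = 1306419602     # event.unix_timestamp.to_i, seconds since epoch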

View file

@ -15,7 +15,7 @@ class LogStash::Outputs::Mongodb < LogStash::Outputs::Base
config :database, :validate => :string, :required => true
# The collection to use. This value can use %{foo} values to dynamically
# select a collection based on data in th eevent.
# select a collection based on data in the event.
config :collection, :validate => :string, :required => true
public

View file

@ -106,10 +106,11 @@ class LogStash::Outputs::Nagios < LogStash::Outputs::Base
f.puts(cmd)
f.flush # TODO(sissel): probably don't need this.
end
rescue
rescue => e
@logger.warn(["Skipping nagios output; error writing to command file",
{"error" => $!, "commandfile" => @commandfile,
"missed_event" => event}])
@logger.debug(["Backtrace", e.backtrace])
end
end # def receive
end # class LogStash::Outputs::Nagios

View file

@ -71,16 +71,17 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base
@redis.rpush event.sprintf(@list), event.to_json if @list
@redis.publish event.sprintf(@channel), event.to_json if @channel
@redis.exec if tx
rescue
rescue => e
# TODO(sissel): Be specific in the exceptions we rescue.
# Drop the redis connection to be picked up later during a retry.
@redis = nil
@logger.warn "Failed to log #{event.to_s} to redis #{@name}. "+
"Will retry #{tries} times."
@logger.warn $!
@logger.warn("Failed to log #{event.to_s} to redis #{@name}. "+
"Will retry #{tries} times.")
@logger.warn($!)
@logger.debug(["Backtrace", e.backtrace])
Thread.new do
sleep 1
receive event, tries - 1
receive(event, tries - 1)
end
end
end # def receive

View file

@ -3,10 +3,8 @@ require "logstash/namespace"
class LogStash::Outputs::Stdout < LogStash::Outputs::Base
begin
require "ap"
HAVE_AWESOME_PRINT = true
require "ap"
rescue LoadError
HAVE_AWESOME_PRINT = false
end
config_name "stdout"
@ -14,16 +12,19 @@ class LogStash::Outputs::Stdout < LogStash::Outputs::Base
# Enable debugging. Tries to pretty-print the entire event object.
config :debug, :validate => :boolean
public
def initialize(params)
super
#@debug ||= false
end
# Debug output format: ruby (default), json
config :debug_format, :default => ["ruby"], :validate => (lambda do |value|
valid_formats = ["ruby", "json"]
if value.length != 1
false
else
valid_formats.member?(value.first)
end
end) # config :debug_format
public
def register
# nothing to do
@print_method = method(:ap) rescue method(:p)
end
public
@ -34,10 +35,13 @@ class LogStash::Outputs::Stdout < LogStash::Outputs::Base
end
if @debug
if HAVE_AWESOME_PRINT
ap event.to_hash
case @debug_format.first
when "ruby":
@print_method.call(event.to_hash)
when "json":
puts event.to_json
else
p event.to_hash
raise "unknown debug_format #{@debug_format}, this should never happen"
end
else
puts event.to_s

View file

@ -8,7 +8,7 @@ class LogStash::Outputs::Stomp < LogStash::Outputs::Base
# The address of the STOMP server.
config :host, :validate => :string
# The port to connet to on your STOMP server.
# The port to connect to on your STOMP server.
config :port, :validate => :number, :default => 61613
# The username to authenticate with.

View file

@ -36,8 +36,9 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base
connect unless @socket
@socket.write(event.to_hash.to_json)
@socket.write("\n")
rescue
rescue => e
@logger.warn(["tcp output exception", @host, @port, $!])
@logger.debug(["backtrace", e.backtrace])
@socket = nil
end
end # def receive

View file

@ -1,28 +1,37 @@
require "rubygems"
require "logstash/namespace"
$: << File.join(File.dirname(__FILE__), "../")
command = ARGV.shift
class LogStash::Runner
def self.main(args)
$: << File.join(File.dirname(__FILE__), "../")
command = args.shift
commands = {
"agent" => proc do
require "logstash/agent"
agent = LogStash::Agent.new
agent.argv = ARGV
agent.run
end,
"web" => proc do
require "logstash/web/server"
end,
"test" => proc do
require "logstash_test_runner"
end
}
commands = {
"agent" => proc do
require "logstash/agent"
agent = LogStash::Agent.new
agent.argv = args
agent.run
end,
"web" => proc do
require "logstash/web/server"
end,
"test" => proc do
require "logstash_test_runner"
end
}
if commands.include?(command)
commands[command].call
else
$stderr.puts "No such command #{command.inspect}"
$stderr.puts "Available commands:"
$stderr.puts commands.keys.map { |s| " #{s}" }.join("\n")
exit 1
if commands.include?(command)
commands[command].call
else
$stderr.puts "No such command #{command.inspect}"
$stderr.puts "Available commands:"
$stderr.puts commands.keys.map { |s| " #{s}" }.join("\n")
exit 1
end
end # def self.main
end # class LogStash::Runner
if $0 == __FILE__
LogStash::Runner.main(ARGV)
end
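Wrapping the dispatch in LogStash::Runner.main lets the jrubyc-compiled jar (see the Rakefile change above, which now compiles logstash/runner.rb) and a plain ruby invocation share one entry point, and it can also be driven programmatically. A hypothetical invocation (the config path is made up):

    require "logstash/runner"

    # Roughly equivalent to running: logstash agent -f /etc/logstash/agent.conf
    LogStash::Runner.main(["agent", "-f", "/etc/logstash/agent.conf"])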

View file

@ -88,7 +88,7 @@ QS %{QUOTEDSTRING}
# Log formats
SYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:
COMBINEDAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response} (?:%{NUMBER:bytes}|-) "(?:%{URI:referrer}|-)" %{QS:agent}
COMBINEDAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"%{URI:referrer}"|%{QUOTEDSTRING:referrer}|"-") %{QS:agent}
#
# Custom formats
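The COMBINEDAPACHELOG change above is the pattern-level side of the CHANGELOG's Referer fix: a client can send arbitrary quoted junk in the Referer header, so the pattern now accepts a plain %{QUOTEDSTRING} (or "-") where it used to require a well-formed %{URI}. A quick hypothetical check with the jls-grok gem the grok filter uses (the patterns path and log line are illustrative):

    require "grok"   # rubygem 'jls-grok'

    grok = Grok.new
    Dir.glob("patterns/*").each { |file| grok.add_patterns_from_file(file) }
    grok.compile("%{COMBINEDAPACHELOG}")

    line = '1.2.3.4 - - [26/May/2011:10:20:02 -0400] "GET /index.html HTTP/1.1" ' \
           '200 123 "not a well-formed uri" "Mozilla/5.0"'
    puts grok.match(line) ? "matched" : "no match"   # expected: matched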