Merge branch 'master' of git://github.com/logstash/logstash

This commit is contained in:
Alex Wheeler 2012-11-14 14:33:23 -05:00
commit 5abc9b3e95
44 changed files with 985 additions and 274 deletions

1
.gitignore vendored
View file

@ -1,6 +1,7 @@
.*.swp
*.gem
*.class
Gemfile.lock
*.tar.gz
*.jar
.bundle

View file

@ -1,5 +1,49 @@
1.1.5 (????????????????)
- irc input now stores nick
1.1.6 (???)
## Overview of this release:
- bug fixes
## general
- fixed internal dependency versioning on 'addressable' gem (LOGSTASH-694)
1.1.5 (November 10, 2012)
## Overview of this release:
* New inputs: zenoss, gemfire
* New outputs: lumberjack, gemfire
* Many UTF-8 crashing bugs were resolved
## general
- new runner command 'rspec' - lets you run rspec tests from the jar
This means you should now be able to write external tests that execute your
logstash configs and verify functionality.
- "file not found" errors related to paths that had "jar:" prefixes should
now work. (Fixes LOGSTASH-649, LOGSTASH-642, LOGSTASH-655)
- several plugins received UTF-8-related fixes (file, lumberjack, etc)
File bugs if you see any UTF-8 related crashes.
- 'json_event' format inputs will now respect 'tags' (#239, patch by
Tim Laszlo)
- logstash no longer uses nor recommends bundler (see 'gembag.rb'). The
Gemfile will be purged in the near future.
- amqp plugins are now marked 'unsupported' as there is no active maintainer
nor is there source of active support in the community. If you're interested
in maintainership, please email the mailling list or contact Jordan!
## inputs
- irc: now stores irc nick
- new: zenoss (#232, patch by Chet Luther)
- new: gemfire (#235, patch by Andrea Campi)
- bugfix: udp: skip close() call if we're already closed (#238, patch by kcrayon)
## filters
- bugfix: fix for zeromq filter initializer (#237, patch by Tom Howe)
## outputs
- new: lumberjack
- new: gemfire output (#234, patch by Andrea Campi)
- improved: nagios_ncsa (patch by Tomas Doran)
- improved: elasticsearch: permit setting 'host' even if embedded. Also set the
host default to 'localhost' when using embedded. These fixes should help resolve
issues new users have when their distros surprisingly block multicast by default.
- improved: elasticsearch: failed index attempts will be retried
1.1.4 (October 28, 2012)
## Overview of this release:

View file

@ -58,7 +58,7 @@ Contributors:
* Mike Worth (MikeWorth)
* Nic Williams (drnic)
* Tomas Doran (bobtfish)
* zuazo
* Xabier de Zuazo (zuazo)
Note: If you've sent me patches, bug reports, or otherwise contributed to
logstash, and you aren't on the list above and want to be, please let me know

View file

@ -1,191 +0,0 @@
PATH
remote: .
specs:
logstash (1.1.4-java)
addressable (= 2.2.6)
aws-sdk
bunny
cabin (= 0.4.4)
cinch
ffi
ffi-rzmq (= 0.9.3)
filewatch (= 0.5.0)
ftw (~> 0.0.22)
gelf (= 1.3.2)
gelfd (= 0.2.0)
geoip (>= 1.1.0)
gmetric (= 0.1.3)
haml
heroku
jls-grok (= 0.10.7)
jls-lumberjack
jruby-elasticsearch (= 0.0.14)
jruby-httpclient
jruby-openssl
jruby-win32ole
json
mail
minitest
mongo
onstomp
pry
rack
redis
riak-client (= 1.0.3)
riemann-client (= 0.0.6)
sass
sinatra
statsd-ruby (= 0.3.0)
stud
uuidtools
xml-simple
xmpp4r (= 0.5)
GEM
remote: http://rubygems.org/
specs:
activesupport (3.2.8)
i18n (~> 0.6)
multi_json (~> 1.0)
addressable (2.2.6)
aws-sdk (1.6.9)
httparty (~> 0.7)
json (~> 1.4)
nokogiri (>= 1.4.4)
uuidtools (~> 2.1)
backports (2.3.0)
beefcake (0.3.7)
bouncy-castle-java (1.5.0146.1)
bson (1.7.0-java)
builder (3.1.4)
bunny (0.8.0)
cabin (0.4.4)
json
cinch (2.0.3)
coderay (1.0.8)
diff-lcs (1.1.3)
excon (0.16.7)
ffi (1.1.5-java)
ffi-rzmq (0.9.3)
ffi
filewatch (0.5.0)
ftw (0.0.22)
addressable (= 2.2.6)
backports (= 2.3.0)
cabin (> 0)
http_parser.rb (= 0.5.3)
json (= 1.6.5)
minitest (> 0)
gelf (1.3.2)
json
gelfd (0.2.0)
geoip (1.2.0)
gmetric (0.1.3)
haml (3.1.7)
heroku (2.33.0)
heroku-api (~> 0.3.5)
launchy (>= 0.3.2)
netrc (~> 0.7.7)
rest-client (~> 1.6.1)
rubyzip
heroku-api (0.3.5)
excon (~> 0.16.1)
http_parser.rb (0.5.3-java)
httparty (0.9.0)
multi_json (~> 1.0)
multi_xml
i18n (0.6.1)
insist (0.0.7)
jls-grok (0.10.7)
cabin (~> 0.4.0)
jls-lumberjack (0.0.2)
jruby-elasticsearch (0.0.14)
jruby-httpclient (1.1.0-java)
jruby-openssl (0.7.7)
bouncy-castle-java (>= 1.5.0146.1)
jruby-win32ole (0.8.5)
json (1.6.5-java)
launchy (2.0.3-java)
spoon (~> 0.0.1)
mail (2.4.4)
i18n (>= 0.4.0)
mime-types (~> 1.16)
treetop (~> 1.4.8)
metaclass (0.0.1)
method_source (0.8.1)
mime-types (1.19)
minitest (4.1.0)
mocha (0.12.7)
metaclass (~> 0.0.1)
mongo (1.7.0)
bson (~> 1.7.0)
mtrc (0.0.4)
multi_json (1.3.6)
multi_xml (0.5.1)
netrc (0.7.7)
nokogiri (1.5.5-java)
onstomp (1.0.7)
polyglot (0.3.3)
pry (0.9.10-java)
coderay (~> 1.0.5)
method_source (~> 0.8)
slop (~> 3.3.1)
spoon (~> 0.0)
rack (1.4.1)
rack-protection (1.2.0)
rack
redis (3.0.2)
rest-client (1.6.7)
mime-types (>= 1.16)
riak-client (1.0.3)
beefcake (~> 0.3.7)
builder (>= 2.1.2)
i18n (>= 0.4.0)
multi_json (~> 1.0)
riemann-client (0.0.6)
beefcake (>= 0.3.5)
mtrc (>= 0.0.4)
trollop (>= 1.16.2)
rspec (2.11.0)
rspec-core (~> 2.11.0)
rspec-expectations (~> 2.11.0)
rspec-mocks (~> 2.11.0)
rspec-core (2.11.1)
rspec-expectations (2.11.3)
diff-lcs (~> 1.1.3)
rspec-mocks (2.11.3)
rubyzip (0.9.9)
sass (3.2.1)
shoulda (3.3.2)
shoulda-context (~> 1.0.1)
shoulda-matchers (~> 1.4.1)
shoulda-context (1.0.1)
shoulda-matchers (1.4.1)
activesupport (>= 3.0.0)
sinatra (1.3.3)
rack (~> 1.3, >= 1.3.6)
rack-protection (~> 1.2)
tilt (~> 1.3, >= 1.3.3)
slop (3.3.3)
spoon (0.0.1)
statsd-ruby (0.3.0)
stud (0.0.6)
tilt (1.3.3)
treetop (1.4.11)
polyglot
polyglot (>= 0.3.1)
trollop (2.0)
uuidtools (2.1.3)
xml-simple (1.1.2)
xmpp4r (0.5)
PLATFORMS
java
DEPENDENCIES
insist (= 0.0.7)
logstash!
mocha
rspec
shoulda
spoon

View file

@ -17,10 +17,9 @@ JRUBYC=$(WITH_JRUBY) jrubyc
ELASTICSEARCH_URL=http://github.com/downloads/elasticsearch/elasticsearch
ELASTICSEARCH=vendor/jar/elasticsearch-$(ELASTICSEARCH_VERSION)
JODA=vendor/jar/joda-time-$(JODA_VERSION)/joda-time-$(JODA_VERSION).jar
GEOIP=vendor/geoip/GeoCityLite.dat
GEOIP_URL=http://geolite.maxmind.com/download/geoip/database/GeoLiteCity.dat.gz
GEOIP=vendor/geoip/GeoLiteCity.dat
GEOIP_URL=http://logstash.objects.dreamhost.com/maxmind/GeoLiteCity-2012-11-09.dat.gz
PLUGIN_FILES=$(shell git ls-files | egrep '^lib/logstash/(inputs|outputs|filters)/' | egrep -v '/(base|threadable).rb$$|/inputs/ganglia/')
GEM_HOME=build/gems
QUIET=@
WGET=$(shell which wget 2>/dev/null)
@ -39,15 +38,15 @@ default: jar
# Figure out if we're using wget or curl
.PHONY: wget-or-curl
wget-or-curl:
ifeq ($(WGET),)
ifeq ($(CURL),)
ifeq ($(WGET),)
@echo "wget or curl are required."
exit 1
else
DOWNLOAD_COMMAND=curl -L -k -o
DOWNLOAD_COMMAND=wget -q --no-check-certificate -O
endif
else
DOWNLOAD_COMMAND=wget --no-check-certificate -O
DOWNLOAD_COMMAND=curl -s -L -k -o
endif
# Compile config grammar (ragel -> ruby)
@ -79,8 +78,7 @@ copy-ruby-files: | build/ruby
| (cd lib; cpio -p --make-directories ../build/ruby)
$(QUIET)find ./test -name '*.rb' | sed -e 's,^\./test/,,' \
| (cd test; cpio -p --make-directories ../build/ruby)
$(QUIET)find ./spec -name '*.rb' | sed -e 's,^\./spec/,,' \
| (cd spec; cpio -p --make-directories ../build/ruby)
$(QUIET)rsync -av ./spec build/ruby
vendor:
$(QUIET)mkdir $@
@ -119,8 +117,7 @@ vendor/geoip: | vendor
$(QUIET)mkdir $@
$(GEOIP): | vendor/geoip
$(QUIET)wget -q -O - http://geolite.maxmind.com/download/geoip/database/GeoLiteCity.dat.gz \
| gzip -dc - > $@
$(QUIET)wget -q -O - $(GEOIP_URL) | gzip -dc - > $@
# Always run vendor/bundle
.PHONY: fix-bundler
@ -130,14 +127,11 @@ fix-bundler:
.PHONY: vendor-gems
vendor-gems: | vendor/bundle
$(GEM_HOME)/bin/bundle: | $(JRUBY)
@echo "=> Installing bundler ($@)"
$(QUIET)GEM_HOME=$(GEM_HOME) $(WITH_JRUBY) gem install bundler
.PHONY: vendor/bundle
vendor/bundle: | $(GEM_HOME)/bin/bundle fix-bundler
vendor/bundle: | vendor
@echo "=> Installing gems to $@..."
$(QUIET)GEM_HOME=$(GEM_HOME) $(JRUBY_CMD) --1.9 $(GEM_HOME)/bin/bundle install --deployment
#$(QUIET)GEM_HOME=$(GEM_HOME) $(JRUBY_CMD) --1.9 $(GEM_HOME)/bin/bundle install --deployment
$(QUIET)GEM_HOME=./vendor/bundle/jruby/1.9/ GEM_PATH= $(JRUBY_CMD) --1.9 ./gembag.rb logstash.gemspec
@# Purge any junk that fattens our jar without need!
@# The riak gem includes previous gems in the 'pkg' dir. :(
-rm -rf $@/jruby/1.9/gems/riak-client-1.0.3/pkg
@ -197,10 +191,31 @@ build/logstash-$(VERSION)-monolithic.jar: JAR_ARGS+=-C lib logstash/certs
build/logstash-$(VERSION)-monolithic.jar: JAR_ARGS+=-C lib logstash/web/views
build/logstash-$(VERSION)-monolithic.jar: JAR_ARGS+=patterns
build/logstash-$(VERSION)-monolithic.jar:
$(QUIET)rm -f $@
$(QUIET)jar cfe $@ logstash.runner $(JAR_ARGS)
$(QUIET)jar i $@
@echo "Created $@"
build/flatgems: | build vendor/bundle
mkdir $@
for i in $(VENDOR_DIR)/gems/*/lib; do \
rsync -av $$i/ $@/lib ; \
done
flatjar: build/logstash-$(VERSION)-flatjar.jar
build/jar: | build build/flatgems build/monolith
$(QUIET)mkdir build/jar
$(QUIET)rsync -av --delete build/flatgems/lib/ build/monolith/ build/ruby/ patterns build/jar/
$(QUIET)(cd lib; rsync -av --delete logstash/web/public ../build/jar/logstash/web/public)
$(QUIET)(cd lib; rsync -av --delete logstash/web/views ../build/jar/logstash/web/views)
$(QUIET)(cd lib; rsync -av --delete logstash/certs ../build/jar/logstash/certs)
build/logstash-$(VERSION)-flatjar.jar: | build/jar
$(QUIET)rm -f $@
$(QUIET)jar cfe $@ logstash.runner -C build/jar .
$(QUIET)jar i $@
@echo "Created $@"
update-jar: copy-ruby-files
$(QUIET)jar uf build/logstash-$(VERSION)-monolithic.jar -C build/ruby .

View file

@ -9,16 +9,22 @@ are pretty much free to use it however you want in whatever way.
For more info, see <http://logstash.net/>
Need help? Try #logstash on freenode irc or the logstash-users@googlegroups.com mailing list.
## Building
To work on the code without building a jar, install rvm and run the following:
rvm install 1.6.8
rvm use 1.6.8
export JRUBY_OPTS=--1.9
bundle install
bundle exec ruby bin/logstash agent [options]
# Install JRuby with rvm
rvm install jruby-1.7.0
rvm use jruby-1.7.0
jar releases are available here: <http://semicomplete.com/files/logstash/>
# Install logstash dependencies
ruby gembag.rb logstash.gemspec
# Run logstash
bin/logstash agent [options]
jar releases are available here: <http://logstash.objects.dreamhost.com/>
If you want to build the jar yourself, run:
make jar

View file

@ -18,13 +18,15 @@ Do this:
## Code Style
* comment everything you can think of.
* indentation: 2 spaces
* between methods: 1 line
* sort your requires
* long lines should wrap at 80 characters. If you wrap at an operator (or, +,
etc) start the next line with that operator.
* long lines should wrap at 80 characters. If you wrap at an operator ('or',
'+', etc) start the next line with that operator.
* parentheses on function definitions/calls
* explicit is better than implicit
* implicit returns are forbidden except in the case of a single expression
The point is consistency and documentation. If you see inconsistencies, let me
know, and I'll fix them :)
@ -55,6 +57,7 @@ Short example:
some_really_long_function_call_blah_blah_blah(arg1,
arg2, arg3, arg4)
# indent the 'when' inside a 'case'.
case foo
when "bar"
puts "Hello world"

View file

@ -1,13 +1,9 @@
#!/usr/bin/env ruby
#!/bin/sh
$: << File.dirname($0) + "/../lib"
if [ -d .git ] ; then
export GEM_HOME=./vendor/bundle/jruby/1.9
export GEM_PATH=
fi
require "rubygems"
require "logstash/runner"
# If the first argument is a flag, assume agent.
if ARGV[0] =~ /^-/
ARGV.unshift("agent")
end
LogStash::Runner.new.main(ARGV)
export RUBYLIB=./lib
ruby lib/logstash/runner.rb "$@"

View file

@ -6,7 +6,7 @@ layout: content_right
<h3> general </h3>
<ul>
<li> <a href="http://semicomplete.com/files/logstash/logstash-%VERSION%-monolithic.jar"> download logstash %VERSION% </a> </li>
<li> <a href="https://logstash.objects.dreamhost.com/release/logstash-%VERSION%-monolithic.jar"> download logstash %VERSION% </a> </li>
<li> <a href="flags"> command-line flags </a> </li>
<li> <a href="configuration"> configuration file overview </a> </li>
<li> <a href="extending"> writing your own plugins </a> </li>

View file

@ -16,10 +16,15 @@ presentation I gave at CarolinaCon 2011:
logstash, how you can use it, some alternatives, logging best practices,
parsing tools, etc. Video also below:
<!--
<embed src="http://blip.tv/play/gvE9grjcdQI" type="application/x-shockwave-flash" width="480" height="296" allowscriptaccess="always" allowfullscreen="true"></embed>
The slides are available online here: [slides](http://goo.gl/68c62). The slides
include speaker notes (click 'actions' then 'speaker notes').
-->
<iframe width="480" height="296" src="http://www.youtube.com/embed/RuUFnog29M4" frameborder="0" allowfullscreen></iframe>
The slides are available online here: [slides](http://semicomplete.com/presentations/logstash-puppetconf-2012/).
## Getting Help
@ -33,7 +38,7 @@ for such things, that works for me, too.)
## Download It
[Download logstash-%VERSION%](http://semicomplete.com/files/logstash/logstash-%VERSION%-monolithic.jar)
[Download logstash-%VERSION%](https://logstash.objects.dreamhost.com/release/logstash-%VERSION%-monolithic.jar)
## What's next?

View file

@ -15,6 +15,13 @@ while conveying to the end-user a set of expectations about that plugin. This
allows you to make more informed decisions about when and where to use the
functionality provided by the new plugin.
## Unsupported
This plugin is not supported. It should work, but if you have any problems with
it you are unlikely to find any help due to lack of experience in the community.
Additionally, this label may mean that a plugin has no active maintainer.
## Experimental
When a plugin is in the `experimental` state, it is essentially untested. This

View file

@ -83,7 +83,7 @@ _This assumes you have the `curl` command installed._
You should get back some output like so:
"logstash-2012.07.02" : {
           "index" : "logstash-2012.07.02"
"index" : "logstash-2012.07.02"
This means Logstash created a new index based on today's date. Likely your data is in there as well:

33
gembag.rb Normal file
View file

@ -0,0 +1,33 @@
#!/usr/bin/env ruby
require "rubygems/specification"
require "rubygems/commands/install_command"
gemspec = ARGV.shift
spec = Gem::Specification.load(gemspec)
deps = [spec.development_dependencies, spec.runtime_dependencies].flatten
# target for now
target = "vendor/bundle/jruby/1.9/"
deps.each do |dep|
#cmd = "gem install --install-dir #{target} #{dep.name} -v '#{dep.requirement}'"
installer = Gem::Commands::InstallCommand.new
installer.options[:generate_rdoc] = false
installer.options[:generate_ri] = false
installer.options[:version] = dep.requirement
installer.options[:args] = [dep.name]
installer.options[:install_dir] = target
begin
installer.execute
rescue Gem::SystemExitException => e
if e.exit_code != 0
puts "Installation of #{dep.to_s} failed"
raise
end
end
end

View file

@ -172,6 +172,8 @@ module LogStash::Config::Mixin
def validate_plugin_status
docmsg = "For more information about plugin statuses, see http://logstash.net/docs/#{LOGSTASH_VERSION}/plugin-status "
case @plugin_status
when "unsupported"
@logger.warn("Using unsupported plugin '#{@config_name}'. This plugin isn't well supported by the community and likely has no maintainer. #{docmsg}")
when "experimental"
@logger.warn("Using experimental plugin '#{@config_name}'. This plugin is untested and may change in the future. #{docmsg}")
when "beta"

View file

@ -40,6 +40,11 @@ class LogStash::Event
@cancelled = true
end # def cancel
public
def uncancel
@cancelled = false
end # def uncancel
public
def cancelled?
return @cancelled

View file

@ -227,6 +227,7 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base
def flush
events = []
@pending.each do |key, value|
value.uncancel
events << value
end
@pending.clear

View file

@ -51,8 +51,8 @@ class LogStash::Filters::ZeroMQ < LogStash::Filters::Base
config :sockopt, :validate => :hash
public
def initialize
super
def initialize(params)
super(params)
@threadsafe = false
end

View file

@ -3,6 +3,11 @@ require "logstash/namespace"
# Pull events from an AMQP exchange.
#
# <b> NOTE: THIS IS ONLY KNOWN TO WORK WITH RECENT RELEASES OF RABBITMQ. Any
# other amqp broker will not work with this plugin. I do not know why. If you
# need support for brokers other than rabbitmq, please file bugs here:
# <https://github.com/ruby-amqp/bunny> </b>
#
# AMQP is a messaging system. It requires you to run an AMQP server or 'broker'
# Examples of AMQP servers are [RabbitMQ](http://www.rabbitmq.com/) and
# [QPid](http://qpid.apache.org/)
@ -12,7 +17,7 @@ require "logstash/namespace"
class LogStash::Inputs::Amqp < LogStash::Inputs::Threadable
config_name "amqp"
plugin_status "beta"
plugin_status "unsupported"
# Your amqp broker's custom arguments. For mirrored queues in RabbitMQ: [ "x-ha-policy", "all" ]
config :arguments, :validate => :array, :default => []

View file

@ -95,6 +95,7 @@ class LogStash::Inputs::Base < LogStash::Plugin
when "json_event"
begin
event = LogStash::Event.from_json(raw)
event.tags += @tags
if @message_format
event.message ||= event.sprintf(@message_format)
end

View file

@ -126,6 +126,8 @@ class LogStash::Inputs::File < LogStash::Inputs::Base
hostname = Socket.gethostname
@tail.subscribe do |path, line|
path = path
line = line.force_encoding("UTF-8")
source = Addressable::URI.new(:scheme => "file", :host => hostname, :path => path).to_s
@logger.debug("Received line", :path => path, :line => line)
e = to_event(line, source)

View file

@ -0,0 +1,227 @@
require "logstash/inputs/threadable"
require "logstash/namespace"
# Push events to a GemFire region.
#
# GemFire is an object database.
#
# To use this plugin you need to add gemfire.jar to your CLASSPATH.
# Using format=json requires jackson.jar too; use of continuous
# queries requires antlr.jar.
#
# Note: this plugin has only been tested with GemFire 7.0.
#
class LogStash::Inputs::Gemfire < LogStash::Inputs::Threadable
config_name "gemfire"
plugin_status "experimental"
# Your client cache name
config :name, :validate => :string, :default => "logstash"
# The path to a GemFire client cache XML file.
#
# Example:
#
# <client-cache>
# <pool name="client-pool" subscription-enabled="true" subscription-redundancy="1">
# <locator host="localhost" port="31331"/>
# </pool>
# <region name="Logstash">
# <region-attributes refid="CACHING_PROXY" pool-name="client-pool" >
# </region-attributes>
# </region>
# </client-cache>
#
config :cache_xml_file, :validate => :string, :default => nil
# The region name
config :region_name, :validate => :string, :default => "Logstash"
# A regexp to use when registering interest for cache events.
# Ignored if a :query is specified.
config :interest_regexp, :validate => :string, :default => ".*"
# A query to run as a GemFire "continuous query"; if specified it takes
# precedence over :interest_regexp which will be ignore.
#
# Important: use of continuous queries requires subscriptions to be enabled on the client pool.
config :query, :validate => :string, :default => nil
# How the message is serialized in the cache. Can be one of "json" or "plain"; default is plain
config :serialization, :validate => :string, :default => nil
public
def initialize(params)
super
@format ||= "plain"
end # def initialize
public
def register
import com.gemstone.gemfire.cache.AttributesMutator
import com.gemstone.gemfire.cache.InterestResultPolicy
import com.gemstone.gemfire.cache.client.ClientCacheFactory
import com.gemstone.gemfire.cache.client.ClientRegionShortcut
import com.gemstone.gemfire.cache.query.CqQuery
import com.gemstone.gemfire.cache.query.CqAttributes
import com.gemstone.gemfire.cache.query.CqAttributesFactory
import com.gemstone.gemfire.cache.query.QueryService
import com.gemstone.gemfire.cache.query.SelectResults
import com.gemstone.gemfire.pdx.JSONFormatter
@logger.info("Registering input", :plugin => self)
end # def register
def run(queue)
return if terminating?
connect
@logstash_queue = queue
if @query
continuous_query(@query)
else
register_interest(@interest_regexp)
end
end # def run
def teardown
@cache.close if @cache
@cache = nil
finished
end # def teardown
protected
def connect
begin
@logger.debug("Connecting to GemFire #{@name}")
@cache = ClientCacheFactory.new.
set("name", @name).
set("cache-xml-file", @cache_xml_file).create
@logger.debug("Created cache #{@cache.inspect}")
rescue => e
if terminating?
return
else
@logger.error("Gemfire connection error (during connect), will reconnect",
:exception => e, :backtrace => e.backtrace)
sleep(1)
retry
end
end
@region = @cache.getRegion(@region_name);
@logger.debug("Created region #{@region.inspect}")
end # def connect
protected
def continuous_query(query)
qs = @cache.getQueryService
cqAf = CqAttributesFactory.new
cqAf.addCqListener(self)
cqa = cqAf.create
@logger.debug("Running continuous query #{query}")
cq = qs.newCq("logstashCQ" + self.object_id.to_s, query, cqa)
cq.executeWithInitialResults
end
def register_interest(interest)
@region.getAttributesMutator.addCacheListener(self)
@region.registerInterestRegex(interest, InterestResultPolicy::NONE, false, true)
end
def deserialize_message(message)
if @serialization == "json"
message ? JSONFormatter.toJSON(message) : "{}"
else
message
end
end
def process_event(event, event_name, source)
message = deserialize_message(event)
e = to_event(message, source)
if e
@logstash_queue << e
end
end
# multiple interfaces
def close
end
#
# CqListener interface
#
def onEvent(event)
key = event.getKey
newValue = event.getNewValue
@logger.debug("onEvent #{event.getQueryOperation} #{key} #{newValue}")
process_event(event.getNewValue, "onEvent", "gemfire://query/#{key}/#{event.getQueryOperation}")
end
def onError(event)
@logger.debug("onError #{event}")
end
#
# CacheListener interface
#
protected
def afterCreate(event)
regionName = event.getRegion.getName
key = event.getKey
newValue = event.getNewValue
@logger.debug("afterCreate #{regionName} #{key} #{newValue}")
process_event(event.getNewValue, "afterCreate", "gemfire://#{regionName}/#{key}/afterCreate")
end
def afterDestroy(event)
regionName = event.getRegion.getName
key = event.getKey
newValue = event.getNewValue
@logger.debug("afterDestroy #{regionName} #{key} #{newValue}")
process_event(nil, "afterDestroy", "gemfire://#{regionName}/#{key}/afterDestroy")
end
def afterUpdate(event)
regionName = event.getRegion.getName
key = event.getKey
oldValue = event.getOldValue
newValue = event.getNewValue
@logger.debug("afterUpdate #{regionName} #{key} #{oldValue} -> #{newValue}")
process_event(event.getNewValue, "afterUpdate", "gemfire://#{regionName}/#{key}/afterUpdate")
end
def afterRegionLive(event)
@logger.debug("afterRegionLive #{event}")
end
def afterRegionCreate(event)
@logger.debug("afterRegionCreate #{event}")
end
def afterRegionClear(event)
@logger.debug("afterRegionClear #{event}")
end
def afterRegionDestroy(event)
@logger.debug("afterRegionDestroy #{event}")
end
def afterRegionInvalidate(event)
@logger.debug("afterRegionInvalidate #{event}")
end
end # class LogStash::Inputs::Amqp

View file

@ -44,7 +44,9 @@ class LogStash::Inputs::Lumberjack < LogStash::Inputs::Base
event = to_event(l.delete("line"), source)
# take any remaining fields in the lumberjack event and merge it as a
# field in the logstash event.
event.fields.merge!(l)
l.each do |key, value|
event[key] = value
end
output_queue << event
end
end # def run

View file

@ -48,7 +48,7 @@ class LogStash::Inputs::Udp < LogStash::Inputs::Base
def udp_listener(output_queue)
@logger.info("Starting UDP listener", :address => "#{@host}:#{@port}")
if @udp
if @udp && ! @udp.closed?
@udp.close
end

View file

@ -65,6 +65,7 @@ class LogStash::Inputs::Xmpp < LogStash::Inputs::Base
queue << e
end
end # @client.add_message_callback
sleep
end # def run
end # def class LogStash:Inputs::Xmpp

View file

@ -32,7 +32,7 @@ class LogStash::Inputs::ZeroMQ < LogStash::Inputs::Base
# you can change the 'mode' setting
# TODO (lusis) add req/rep MAYBE
# TODO (lusis) add router/dealer
config :topology, :validate => ["pushpull", "pubsub", "pair"]
config :topology, :validate => ["pushpull", "pubsub", "pair"], :required => true
# 0mq topic
# This is used for the 'pubsub' topology only

View file

@ -3,6 +3,11 @@ require "logstash/namespace"
# Push events to an AMQP exchange.
#
# <b> NOTE: THIS IS ONLY KNOWN TO WORK WITH RECENT RELEASES OF RABBITMQ. Any
# other amqp broker will not work with this plugin. I do not know why. If you
# need support for brokers other than rabbitmq, please file bugs here:
# <https://github.com/ruby-amqp/bunny> </b>
#
# AMQP is a messaging system. It requires you to run an AMQP server or 'broker'
# Examples of AMQP servers are [RabbitMQ](http://www.rabbitmq.com/) and
# [QPid](http://qpid.apache.org/)
@ -10,7 +15,7 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
MQTYPES = [ "fanout", "direct", "topic" ]
config_name "amqp"
plugin_status "beta"
plugin_status "unsupported"
# Your amqp server address
config :host, :validate => :string, :required => true

View file

@ -82,19 +82,14 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
@logger.setup_log4j
if @embedded
# Check for settings that are incompatible with @embedded
%w(host).each do |name|
if instance_variable_get("@#{name}")
@logger.error("outputs/elasticsearch: You cannot specify " \
"'embedded => true' and also set '#{name}'")
raise "Invalid configuration detected. Please fix."
end
end
# Default @host with embedded to localhost. This should help avoid
# newbies tripping on ubuntu and other distros that have a default
# firewall that blocks multicast.
@host ||= "localhost"
# Start elasticsearch local.
start_local_elasticsearch
end
require "jruby-elasticsearch"
@logger.info("New ElasticSearch output", :cluster => @cluster,
@ -167,10 +162,15 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
#timer.stop
decrement_inflight_request_count
end.on(:failure) do |exception|
@logger.warn("Failed to index an event", :exception => exception,
@logger.warn("Failed to index an event, will retry", :exception => exception,
:event => event.to_hash)
#timer.stop
decrement_inflight_request_count
# Failed to index, try again after a short sleep (incase our hammering is
# the problem).
sleep(0.200)
receive(event)
end
# Execute this request asynchronously.

View file

@ -0,0 +1,102 @@
require "logstash/outputs/base"
require "logstash/namespace"
# Push events to a GemFire region.
#
# GemFire is an object database.
#
# To use this plugin you need to add gemfire.jar to your CLASSPATH;
# using format=json requires jackson.jar too.
#
# Note: this plugin has only been tested with GemFire 7.0.
#
class LogStash::Outputs::Gemfire < LogStash::Outputs::Base
config_name "gemfire"
plugin_status "experimental"
# Your client cache name
config :name, :validate => :string, :default => "logstash"
# The path to a GemFire client cache XML file.
#
# Example:
#
# <client-cache>
# <pool name="client-pool">
# <locator host="localhost" port="31331"/>
# </pool>
# <region name="Logstash">
# <region-attributes refid="CACHING_PROXY" pool-name="client-pool" >
# </region-attributes>
# </region>
# </client-cache>
#
config :cache_xml_file, :validate => :string, :default => nil
# The region name
config :region_name, :validate => :string, :default => "Logstash"
# A sprintf format to use when building keys
config :key_format, :validate => :string, :default => "%{@source}-%{@timestamp}"
public
def register
import com.gemstone.gemfire.cache.client.ClientCacheFactory
import com.gemstone.gemfire.pdx.JSONFormatter
@logger.info("Registering output", :plugin => self)
connect
end # def register
public
def connect
begin
@logger.debug("Connecting to GemFire #{@name}")
@cache = ClientCacheFactory.new.
set("name", @name).
set("cache-xml-file", @cache_xml_file).create
@logger.debug("Created cache #{@cache.inspect}")
rescue => e
if terminating?
return
else
@logger.error("Gemfire connection error (during connect), will reconnect",
:exception => e, :backtrace => e.backtrace)
sleep(1)
retry
end
end
@region = @cache.getRegion(@region_name);
@logger.debug("Created region #{@region.inspect}")
end # def connect
public
def receive(event)
return unless output?(event)
@logger.debug("Sending event", :destination => to_s, :event => event)
key = event.sprintf @key_format
message = JSONFormatter.fromJSON(event.to_json)
@logger.debug("Publishing message", { :destination => to_s, :message => message, :key => key })
@region.put(key, message)
end # def receive
public
def to_s
return "gemfire://#{name}"
end
public
def teardown
@cache.close if @cache
@cache = nil
finished
end # def teardown
end # class LogStash::Outputs::Gemfire

View file

@ -0,0 +1,50 @@
class LogStash::Outputs::Lumberjack < LogStash::Outputs::Base
config_name "lumberjack"
plugin_status "experimental"
# list of addresses lumberjack can send to
config :hosts, :validate => :array, :required => true
# the port to connect to
config :port, :validate => :number, :required => true
# ssl certificate to use
config :ssl_certificate, :validate => :string, :required => true
# window size
config :window_size, :validate => :number, :default => 5000
public
def register
require 'lumberjack/client'
connect
end # def register
public
def receive(event)
return unless output?(event)
begin
@client.write("line" => event.message, "host" => event.source_host, "file" => event.source_path)
rescue Exception => e
@logger.log("Client write error", :e => e, :backtrace => e.backtrace)
connect
retry
end
end # def receive
private
def connect
@logger.info("Connecting to lumberjack server.", :addresses => @hosts, :port => @port,
:ssl_certificate => @ssl_certificate, :window_size => @window_size)
begin
@client = Lumberjack::Client.new(:addresses => @hosts, :port => @port,
:ssl_certificate => @ssl_certificate, :window_size => @window_size)
rescue Exception => e
@logger.error("All hosts unavailable, sleeping", :hosts => @hosts, :e => e,
:backtrace => e.backtrace, :host => @client.host)
sleep(10)
retry
end
end
end

View file

@ -85,10 +85,11 @@ class LogStash::Outputs::NagiosNsca < LogStash::Outputs::Base
msg.gsub!("'", "&#146;")
status = event.sprintf(@nagios_status)
if status.to_i != status
if status.to_i.to_s != status # Check it round-trips to int correctly
msg = "status '#{status}' is not numeric"
status = 2
else
status = status.to_i
if status > 3 || status < 0
msg "status must be > 0 and <= 3, not #{status}"
status = 2

View file

@ -31,7 +31,7 @@ class LogStash::Outputs::ZeroMQ < LogStash::Outputs::Base
# you can change the 'mode' setting
# TODO (lusis) add req/rep MAYBE
# TODO (lusis) add router/dealer
config :topology, :validate => ["pushpull", "pubsub", "pair"]
config :topology, :validate => ["pushpull", "pubsub", "pair"], :required => true
# 0mq topic
# This is used for the 'pubsub' topology only

View file

@ -1,3 +1,16 @@
# Monkeypatch for JRUBY-6970
module Kernel
alias_method :require_JRUBY_6970_hack, :require
def require(path)
if path =~ /^jar:file:.+!.+/
path = path.gsub(/^jar:/, "")
puts "JRUBY-6970: require(#{path})" if ENV["REQUIRE_DEBUG"] == "1"
end
return require_JRUBY_6970_hack(path)
end
end
require "rubygems"
require "logstash/namespace"
require "logstash/program"
@ -74,6 +87,36 @@ class LogStash::Runner
@runners << test
return test.run(args)
end,
"rspec" => lambda do
require "rspec/core/runner"
require "rspec"
fixedargs = args.collect do |arg|
# if the arg ends in .rb or has a "/" in it, assume it's a path.
if arg =~ /\.rb$/ || arg =~ /\//
# check if it's a file, if not, try inside the jar if we are in it.
if !File.exists?(arg) && __FILE__ =~ /file:.*\.jar!\//
# Try inside the jar.
jar_root = __FILE__.gsub(/!.*/,"!")
newpath = File.join(jar_root, args.first)
if File.exists?(newpath)
# Add the 'spec' dir to the load path so specs can run
specpath = File.join(jar_root, "spec")
$: << specpath unless $:.include?(specpath)
newpath
else
arg
end
else
arg
end
else
arg
end
end # args.collect
RSpec::Core::Runner.run(fixedargs)
return []
end,
"irb" => lambda do
require "irb"
return IRB.start(__FILE__)

View file

@ -1,5 +1,5 @@
# The version of logstash.
LOGSTASH_VERSION = "1.1.4"
LOGSTASH_VERSION = "1.1.6.dev"
# Note to authors: this should not include dashes because 'gem' barfs if
# you include a dash in the version string.

View file

@ -7,6 +7,7 @@ Gem::Specification.new do |gem|
gem.description = %q{Library that contains the classes required to create LogStash events}
gem.summary = %q{Library that contains the classes required to create LogStash events}
gem.homepage = "https://github.com/logstash/logstash"
gem.license = "Apache License (2.0)"
gem.files = %w{
lib/logstash-event.rb

View file

@ -17,14 +17,14 @@ Gem::Specification.new do |gem|
gem.version = LOGSTASH_VERSION
# Core dependencies
gem.add_runtime_dependency "cabin", ["0.4.4"]
gem.add_runtime_dependency "cabin", ["0.5.0"]
gem.add_runtime_dependency "json"
gem.add_runtime_dependency "minitest" # for running the tests from the jar
gem.add_runtime_dependency "pry"
gem.add_runtime_dependency "stud"
# Web dependencies
gem.add_runtime_dependency "ftw", ["~> 0.0.22"]
gem.add_runtime_dependency "ftw", ["~> 0.0.23"]
gem.add_runtime_dependency "haml"
gem.add_runtime_dependency "rack"
gem.add_runtime_dependency "sass"
@ -34,7 +34,7 @@ Gem::Specification.new do |gem|
#TODO Can these be optional?
gem.add_runtime_dependency "aws-sdk"
gem.add_runtime_dependency "heroku"
gem.add_runtime_dependency "addressable", ["2.2.6"]
gem.add_runtime_dependency "addressable", ["~> 2.2.6"]
gem.add_runtime_dependency "bunny"
gem.add_runtime_dependency "ffi"
gem.add_runtime_dependency "ffi-rzmq", ["0.9.3"]
@ -42,7 +42,7 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency "gelfd", ["0.2.0"]
gem.add_runtime_dependency "gelf", ["1.3.2"]
gem.add_runtime_dependency "gmetric", ["0.1.3"]
gem.add_runtime_dependency "jls-grok", ["0.10.7"]
gem.add_runtime_dependency "jls-grok", ["0.10.8"]
gem.add_runtime_dependency "mail"
gem.add_runtime_dependency "mongo"
gem.add_runtime_dependency "onstomp"
@ -53,7 +53,7 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency "uuidtools" # For generating amqp queue names
gem.add_runtime_dependency "xml-simple"
gem.add_runtime_dependency "xmpp4r", ["0.5"]
gem.add_runtime_dependency "jls-lumberjack"
gem.add_runtime_dependency "jls-lumberjack", ["0.0.4"]
gem.add_runtime_dependency "geoip", [">= 1.1.0"]
gem.add_runtime_dependency "beefcake", "0.3.7"
@ -71,9 +71,10 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency "cinch" # cinch requires 1.9.1+
end
gem.add_development_dependency "spoon"
gem.add_development_dependency "mocha"
gem.add_development_dependency "shoulda"
gem.add_development_dependency "rspec"
gem.add_development_dependency "insist", "0.0.7"
# These are runtime-deps so you can do 'java -jar logstash.jar rspec <test>'
gem.add_runtime_dependency "spoon"
gem.add_runtime_dependency "mocha"
gem.add_runtime_dependency "shoulda"
gem.add_runtime_dependency "rspec"
gem.add_runtime_dependency "insist", "0.0.8"
end

View file

@ -1,2 +1,2 @@
# NetScreen firewall logs
NETSCREENSESSIONLOG %{SYSLOGDATE:date} %{IPORHOST:device} %{IPORHOST}: NetScreen device_id=%{WORD:device_id}%{DATA}: start_time=%{QUOTEDSTRING:start_time} duration=%{INT:duration} policy_id=%{INT:policy_id} service=%{DATA:service} proto=%{INT:proto} src zone=%{WORD:src_zone} dst zone=%{WORD:dst_zone} action=%{WORD:action} sent=%{INT:sent} rcvd=%{INT:rcvd} src=%{IPORHOST:src_ip} dst=%{IPORHOST:dst_ip} src_port=%{INT:src_port} dst_port=%{INT:dst_port} src-xlated ip=%{IPORHOST:src_xlated_ip} port=%{INT:src_xlated_port} dst-xlated ip=%{IPORHOST:dst_xlated_ip} port=%{INT:dst_xlated_port} session_id=%{INT:session_id} reason=%{GREEDYDATA:reason}
NETSCREENSESSIONLOG %{SYSLOGTIMESTAMP:date} %{IPORHOST:device} %{IPORHOST}: NetScreen device_id=%{WORD:device_id}%{DATA}: start_time=%{QUOTEDSTRING:start_time} duration=%{INT:duration} policy_id=%{INT:policy_id} service=%{DATA:service} proto=%{INT:proto} src zone=%{WORD:src_zone} dst zone=%{WORD:dst_zone} action=%{WORD:action} sent=%{INT:sent} rcvd=%{INT:rcvd} src=%{IPORHOST:src_ip} dst=%{IPORHOST:dst_ip} src_port=%{INT:src_port} dst_port=%{INT:dst_port} src-xlated ip=%{IPORHOST:src_xlated_ip} port=%{INT:src_xlated_port} dst-xlated ip=%{IPORHOST:dst_xlated_ip} port=%{INT:dst_xlated_port} session_id=%{INT:session_id} reason=%{GREEDYDATA:reason}

View file

@ -30,12 +30,12 @@ HOSTPORT (?:%{IPORHOST=~/\./}:%{POSINT})
# paths
PATH (?:%{UNIXPATH}|%{WINPATH})
UNIXPATH (?:/(?:[\w_%!$@:.,-]+|\\.)*)+
UNIXPATH (?>/(?>[\w_%!$@:.,-]+|\\.)*)+
#UNIXPATH (?<![\w\/])(?:/[^\/\s?*]*)+
LINUXTTY (?:/dev/pts/%{NONNEGINT})
BSDTTY (?:/dev/tty[pq][a-z0-9])
LINUXTTY (?>/dev/pts/%{NONNEGINT})
BSDTTY (?>/dev/tty[pq][a-z0-9])
TTY (?:%{BSDTTY}|%{LINUXTTY})
WINPATH (?:[A-Za-z]+:|\\)(?:\\[^\\?*]*)+
WINPATH (?>[A-Za-z]+:|\\)(?:\\[^\\?*]*)+
URIPROTO [A-Za-z]+(\+[A-Za-z+]+)?
URIHOST %{IPORHOST}(?::%{POSINT:port})?
# uripath comes loosely from RFC1738, but mostly from what Firefox

91
pl.rb Normal file
View file

@ -0,0 +1,91 @@
# pipeline tests
$: << "lib"
require "logstash/config/file"
config = LogStash::Config::File.new(nil, ARGV[0])
agent = LogStash::Agent.new
inputs, filters, outputs = agent.instance_eval { parse_config(config) }
inputs.collect(&:register)
filters.collect(&:register)
outputs.collect(&:register)
i2f = SizedQueue.new(16)
f2o = SizedQueue.new(16)
i2f = f2o if filters.empty?
input_threads = inputs.collect do |i|
t = Thread.new do
begin
i.run(i2f)
rescue => e
puts :input => i.class, :exception => e
end
end
t[:name] = i.class
t
end
#input_supervisor_thread = Thread.new do
#while true
#input_threads.collect(&:join)
#i2f << :shutdown
#end
#end
filter_thread = Thread.new(filters) do |filters|
if filters.any?
event = i2f.pop
filters.each do |filter|
filter.filter(event)
end
f2o << event
end
end
filter_thread[:name] = "filterworker"
output_thread = Thread.new do
begin
while true
event = f2o.pop
outputs.each do |output|
output.receive(event)
end
end
rescue => e
puts :output_thread => e
end
end
output_thread[:name] = "outputworker"
def twait(thread)
begin
puts :waiting => thread[:name]
thread.join
puts :donewaiting => thread[:name]
rescue => e
puts thread => e
end
end
def shutdown(input, filter, output)
input.each do |i|
i.raise("SHUTDOWN")
twait(i)
end
#filter.raise("SHUTDOWN")
#twait(filter)
output.raise("SHUTDOWN")
twait(output)
end
trap("INT") do
puts "SIGINT"; shutdown(input_threads, filter_thread, output_thread)
exit 1
end
#[*input_threads, filter_thread, output_thread].collect(&:join)
sleep 30

118
pl2.rb Normal file
View file

@ -0,0 +1,118 @@
$: << "lib"
require "logstash/config/file"
class Pipeline
class ShutdownSignal; end
def initialize(configstr)
# hacks for now to parse a config string
config = LogStash::Config::File.new(nil, configstr)
agent = LogStash::Agent.new
@inputs, @filters, @outputs = agent.instance_eval { parse_config(config) }
@inputs.collect(&:register)
@filters.collect(&:register)
@outputs.collect(&:register)
@input_to_filter = SizedQueue(16)
@filter_to_output = SizedQueue(16)
# If no filters, pipe inputs to outputs
if @filters.empty?
input_to_filter = filter_to_output
end
end
def run
# one thread per input
@input_threads = @inputs.collect do |input|
Thread.new(input) do |input|
inputworker(input)
end
end
# one filterworker thread
#@filter_threads = @filters.collect do |input
# TODO(sissel): THIS IS WHERE I STOPPED WORKING
# one outputworker thread
# Now monitor input threads state
# if all inputs are terminated, send shutdown signal to @input_to_filter
end
def inputworker(plugin)
begin
plugin.run(@input_to_filter)
rescue ShutdownSignal
plugin.teardown
rescue => e
@logger.error("Exception in plugin #{plugin.class}, restarting plugin.",
"plugin" => plugin.inspect, "exception" => e)
plugin.teardown
retry
end
end # def
def filterworker
begin
while true
event << @input_to_filter
break if event == :shutdown
@filters.each do |filter|
filter.filter(event)
end
next if event.cancelled?
@filter_to_output << event
end
rescue => e
@logger.error("Exception in plugin #{plugin.class}",
"plugin" => plugin.inspect, "exception" => e)
end
@filters.each(&:teardown)
end # def filterworker
def outputworker
begin
while true
event << @filter_to_output
break if event == :shutdown
@outputs.each do |output|
output.receive(event)
end
end
rescue => e
@logger.error("Exception in plugin #{plugin.class}",
"plugin" => plugin.inspect, "exception" => e)
end
@outputs.each(&:teardown)
end # def filterworker
end # class Pipeline
def twait(thread)
begin
puts :waiting => thread[:name]
thread.join
puts :donewaiting => thread[:name]
rescue => e
puts thread => e
end
end
def shutdown(input, filter, output)
input.each do |i|
i.raise("SHUTDOWN")
end
#filter.raise("SHUTDOWN")
#twait(filter)
output.raise("SHUTDOWN")
twait(output)
end
trap("INT") do
puts "SIGINT"; shutdown(input_threads, filter_thread, output_thread)
exit 1
end

View file

@ -7,7 +7,7 @@ describe LogStash::Filters::Date do
describe "performance test of java syntax parsing" do
event_count = 100000
min_rate = 10000
min_rate = 5000
max_duration = event_count / min_rate
input = "Nov 24 01:29:01 -0800"

View file

@ -0,0 +1,27 @@
require "test_utils"
require "grok-pure"
require "timeout"
describe "grok known timeout failures" do
extend LogStash::RSpec
describe "user reported timeout" do
config <<-'CONFIG'
filter {
grok {
match => [ "@message", "%{SYSLOGBASE:ts1} \[\#\|%{TIMESTAMP_ISO8601:ts2}\|%{DATA} for %{PATH:url} = %{POSINT:delay} ms.%{GREEDYDATA}" ]
}
}
CONFIG
start = Time.now
line = 'Nov 13 19:23:34 qa-api1 glassfish: [#|2012-11-13T19:23:25.604+0000|INFO|glassfish3.1.2|com.locusenergy.platform.messages.LocusMessage|_ThreadID=59;_ThreadName=Thread-2;|API TIMER - Cache HIT user: null for /kiosks/194/energyreadings/data?tz=America/New_York&fields=kwh&type=gen&end=2012-11-13T23:59:59&start=2010-12-16T00:00:00-05:00&gran=yearly = 5 ms.|#]'
sample line do
duration = Time.now - start
insist { duration } < 0.03
end
end
end
__END__

16
spec/jar.rb Normal file
View file

@ -0,0 +1,16 @@
require "insist"
describe "logstash jar features" do
before :each do
@jar_root = __FILE__.split("!").first + "!"
end
it "must be only run from a jar" do
insist { __FILE__ } =~ /file:.*!/
end
it "must contain GeoLiteCity.dat" do
path = File.join(@jar_root, "GeoLiteCity.dat")
insist { File }.exists?(path)
end
end

View file

@ -0,0 +1,66 @@
# This spec covers the question here:
# https://groups.google.com/forum/?fromgroups=#!topic/logstash-users/Ec8ISgamIfo
require "test_utils"
describe "https://groups.google.com/forum/?fromgroups=#!topic/logstash-users/Ec8ISgamIfo" do
extend LogStash::RSpec
config <<-'CONFIG'
filter {
multiline {
type => "java-log"
pattern => "^20"
negate => "true"
what => "previous"
}
grok {
type => "java-log"
tags => [ "dev", "console", "multiline" ]
singles => true
add_tag => "mytag"
match => [ "@message", "^%{DATESTAMP:log_time}%{SPACE}\[%{PROG:thread}\]%{SPACE}%{LOGLEVEL:log_level}%{SPACE}%{WORD:class_name}%{GREEDYDATA}"]
}
}
CONFIG
type "java-log"
tags "dev", "console"
line1 = '2012-11-13 13:55:37,706 [appname.connector.http.mule.default.receiver.14] INFO LoggerMessageProcessor - SRC message is <soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:icc="http://researchnow.com/schema/icc"'
line2 = "hello world"
sample line1 do
insist { subject.tags }.include?("dev")
insist { subject.tags }.include?("console")
# This is not a multiline event, so it won't get tagged as multiline
reject { subject.tags }.include?("multiline")
# Since this event doesn't have the 'multiline' tag, grok will not act on
# it, so it should not have the 'mytag' tag given in the grok filter's
# add_tag setting.
reject { subject.tags }.include?("mytag")
end
# Try with a proper multiline event
sample [ line1, line2 ] do
insist { subject.count } == 1
event = subject.first # get the first event.
insist { event.tags }.include?("dev")
insist { event.tags }.include?("console")
insist { event.tags }.include?("multiline")
# grok shouldn't fail.
reject { event.tags }.include?("_grokparsefailure")
# Verify grok is working and pulling out certain fields
insist { event.tags }.include?("mytag")
insist { event["log_time"] } == "2012-11-13 13:55:37,706"
insist { event["thread"] } == "appname.connector.http.mule.default.receiver.14"
insist { event["log_level"] } == "INFO"
insist { event["class_name"] } == "LoggerMessageProcessor"
end
end

View file

@ -18,12 +18,24 @@ module LogStash
@config_str = configstr
end # def config
def type(default_type)
@default_type = default_type
end
def tags(*tags)
@default_tags = tags
puts "Setting default tags: #{@default_tags}"
end
def sample(event, &block)
default_type = @default_type || "default"
default_tags = @default_tags || []
require "logstash/config/file"
config = LogStash::Config::File.new(nil, @config_str)
agent = LogStash::Agent.new
@inputs, @filters, @outputs = agent.instance_eval { parse_config(config) }
[@inputs, @filters, @outputs].flatten.each do |plugin|
plugin.logger = Cabin::Channel.get
plugin.register
end
@ -38,7 +50,8 @@ module LogStash
event = [event] unless event.is_a?(Array)
event = event.collect do |e|
if e.is_a?(String)
LogStash::Event.new("@message" => e)
LogStash::Event.new("@message" => e, "@type" => default_type,
"@tags" => default_tags)
else
LogStash::Event.new(e)
end
@ -53,13 +66,25 @@ module LogStash
results << e unless e.cancelled?
end
filters.select { |f| f.respond_to?(:flush) }.each do |filter|
event = filter.flush
results += event if event
# do any flushing.
filters.each_with_index do |filter, i|
if filter.respond_to?(:flush)
# get any event from flushing
list = filter.flush
if list
list.each do |e|
filters[i+1 .. -1].each do |f|
f.filter(e)
end
results << e unless e.cancelled?
end
end # if list
end # filter.respond_to?(:flush)
end # filters.each_with_index
@results = results
end
end # before :all
if multiple_events
subject { @results }
else