Merge branch 'master' of git://github.com/logstash/logstash

This commit is contained in:
Louis Zuckerman 2012-12-27 23:18:31 -05:00
commit adc25a29ee
57 changed files with 1931 additions and 326 deletions

View file

@ -2,9 +2,9 @@
## Overview of this release:
- grok now captures (?<somename>...) regexp into 'somename' field
- new 'charset' feature for inputs (for improved UTF-8 conversion support)
- TODO TODO TODO new faster start-time release jars are available, see the 'flatjar' download
option. This flatjar thing may have bugs, so both flatjar and monolithic are
available.
- TODO TODO TODO new faster start-time release jars are available, see the
'flatjar' download option. This flatjar thing may have bugs, so both flatjar
and monolithic are available.
## general
- fixed internal dependency versioning on 'addressable' gem (LOGSTASH-694)
@ -12,13 +12,19 @@
## inputs
- All inputs now have a 'charset' setting to help you inform logstash of the
text encoding of the input. This is useful if you have Shift_JIS or CP1252
text encoding of the input. This is useful if you have Shift_JIS or CP1251
encoded log files. This should help resolve the many UTF-8 bugs that were
reported recently.
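  For instance, a hypothetical file input reading CP1252-encoded logs might
  look like this (path and type are illustrative):

      input {
        file {
          path => "/var/log/legacy-app.log"
          type => "legacy"
          charset => "CP1252"
        }
      }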
- bugfix: zeromq: 'topology' is now a required setting
- lumberjack: jls-lumberjack gem updated to 0.0.7
- misc: lumberjack: jls-lumberjack gem updated to 0.0.7
- bugfix: stomp: fix startup problems causing early termination (#226)
## filters
- new: anonymize: supports many hash mechanisms (murmur3, sha, md5, etc) as
well as IP address anonymization (#280, #261; patches by Richard Pijnenburg
and Avishai Ish-Shalom)
- filter: date: now accepts 'match' as a setting. Use of this is preferable
to the old syntax.
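  As a sketch (field name and format are illustrative), the old per-field
  syntax and the new 'match' syntax compare like so:

      filter {
        # old syntax
        date { logdate => "MMM dd yyyy HH:mm:ss" }

        # new, preferred syntax
        date { match => [ "logdate", "MMM dd yyyy HH:mm:ss" ] }
      }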
- improvement: grok: now accepts (?<foo>...) named captures. This lets you
compose a pattern in the grok config without needing to define it in a
patterns file. Example: (?<hostport>%{HOST}:%{POSINT}) to capture 'hostport'
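  A minimal grok stanza using such an inline capture might look like this
  (the surrounding pattern text and field name are illustrative):

      filter {
        grok {
          pattern => "connected to (?<hostport>%{HOST}:%{POSINT})"
        }
      }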
@ -31,8 +37,13 @@
matched. (LOGSTASH-705)
- improvement: kv: Adds field_split, value_split, prefix, and container
settings. (#225, patch by Alex Wheeler)
- mutate: rename on a nonexistent field now does nothing as expected.
(LOGSTASH-757)
## outputs
- new: syslog output supporting both RFC3164 and RFC5424 (#180, patch by
ruckalvnet)
- new: cloudwatch output to emit metrics and other events to Amazon CloudWatch.
- bugfix: zeromq: 'topology' is now a required setting
- improvement: mongodb: new setting 'isodate', when true, stores the @timestamp
field as a mongodb date instead of a string. (#224, patch by Kevin Amorin)

View file

@ -5,21 +5,19 @@
#
JRUBY_VERSION=1.7.0
ELASTICSEARCH_VERSION=0.19.10
JODA_VERSION=2.1
#VERSION=$(shell ruby -r./lib/logstash/version -e 'puts LOGSTASH_VERSION')
VERSION=$(shell awk -F\" '/LOGSTASH_VERSION/ {print $$2}' lib/logstash/version.rb )
VERSION=$(shell awk -F\" '/LOGSTASH_VERSION/ {print $$2}' lib/logstash/version.rb)
WITH_JRUBY=java -jar $(shell pwd)/$(JRUBY) -S
JRUBY=vendor/jar/jruby-complete-$(JRUBY_VERSION).jar
JRUBY_URL=http://repository.codehaus.org/org/jruby/jruby-complete/$(JRUBY_VERSION)
JRUBY_CMD=java -jar $(JRUBY)
JRUBYC=$(WITH_JRUBY) jrubyc
ELASTICSEARCH_URL=http://github.com/downloads/elasticsearch/elasticsearch
ELASTICSEARCH_URL=http://download.elasticsearch.org/elasticsearch/elasticsearch
ELASTICSEARCH=vendor/jar/elasticsearch-$(ELASTICSEARCH_VERSION)
JODA=vendor/jar/joda-time-$(JODA_VERSION)/joda-time-$(JODA_VERSION).jar
GEOIP=vendor/geoip/GeoLiteCity.dat
GEOIP_URL=http://logstash.objects.dreamhost.com/maxmind/GeoLiteCity-2012-11-09.dat.gz
PLUGIN_FILES=$(shell git ls-files | egrep '^lib/logstash/(inputs|outputs|filters)/' | egrep -v '/(base|threadable).rb$$|/inputs/ganglia/')
PLUGIN_FILES=$(shell git ls-files | egrep '^lib/logstash/(inputs|outputs|filters)/[^/]+$$' | egrep -v '/(base|threadable).rb$$|/inputs/ganglia/')
QUIET=@
WGET=$(shell which wget 2>/dev/null)
@ -96,7 +94,7 @@ $(JRUBY): | vendor/jar
vendor/jar/elasticsearch-$(ELASTICSEARCH_VERSION).tar.gz: | wget-or-curl vendor/jar
@echo "=> Fetching elasticsearch"
$(QUIET)$(DOWNLOAD_COMMAND) $@ $(ELASTICSEARCH_URL)/elasticsearch-$(ELASTICSEARCH_VERSION).tar.gz
vendor/jar/graphtastic-rmiclient.jar: | wget-or-curl vendor/jar
@echo "=> Fetching graphtastic rmi client jar"
$(QUIET)$(DOWNLOAD_COMMAND) $@ http://cloud.github.com/downloads/NickPadilla/GraphTastic/graphtastic-rmiclient.jar
@ -108,12 +106,6 @@ $(ELASTICSEARCH): $(ELASTICSEARCH).tar.gz | vendor/jar
$(QUIET)tar -C $(shell dirname $@) -xf $< $(TAR_OPTS) --exclude '*sigar*' \
'elasticsearch-$(ELASTICSEARCH_VERSION)/lib/*.jar'
vendor/jar/joda-time-$(JODA_VERSION)-dist.tar.gz: | wget-or-curl vendor/jar
$(DOWNLOAD_COMMAND) $@ "http://downloads.sourceforge.net/project/joda-time/joda-time/$(JODA_VERSION)/joda-time-$(JODA_VERSION)-dist.tar.gz"
vendor/jar/joda-time-$(JODA_VERSION)/joda-time-$(JODA_VERSION).jar: vendor/jar/joda-time-$(JODA_VERSION)-dist.tar.gz | vendor/jar
tar -C vendor/jar -zxf $< joda-time-$(JODA_VERSION)/joda-time-$(JODA_VERSION).jar
vendor/geoip: | vendor
$(QUIET)mkdir $@
@ -132,7 +124,7 @@ vendor-gems: | vendor/bundle
vendor/bundle: | vendor $(JRUBY)
@echo "=> Installing gems to $@..."
#$(QUIET)GEM_HOME=$(GEM_HOME) $(JRUBY_CMD) --1.9 $(GEM_HOME)/bin/bundle install --deployment
$(QUIET)GEM_HOME=./vendor/bundle/jruby/1.9/ GEM_PATH= $(JRUBY_CMD) --1.9 ./gembag.rb logstash.gemspec
@# Purge any junk that fattens our jar without need!
@# The riak gem includes previous gems in the 'pkg' dir. :(
-rm -rf $@/jruby/1.9/gems/riak-client-1.0.3/pkg
@ -152,7 +144,7 @@ build/ruby: | build
# TODO(sissel): Skip sigar?
# Run this one always? Hmm..
.PHONY: build/monolith
build/monolith: $(ELASTICSEARCH) $(JRUBY) $(JODA) $(GEOIP) vendor-gems | build
build/monolith: $(ELASTICSEARCH) $(JRUBY) $(GEOIP) vendor-gems | build
build/monolith: compile copy-ruby-files vendor/jar/graphtastic-rmiclient.jar
-$(QUIET)mkdir -p $@
@# Unpack all the 3rdparty jars and any jars in gems
@ -164,10 +156,6 @@ build/monolith: compile copy-ruby-files vendor/jar/graphtastic-rmiclient.jar
$(QUIET)cp -r $$PWD/vendor/bundle/jruby/1.9/gems/jruby-openss*/lib/shared/openssl/* $@/openssl
$(QUIET)cp -r $$PWD/vendor/bundle/jruby/1.9/gems/jruby-openss*/lib/shared/jopenssl/* $@/jopenssl
$(QUIET)cp -r $$PWD/vendor/bundle/jruby/1.9/gems/jruby-openss*/lib/shared/openssl.rb $@/openssl.rb
@# Make sure joda-time gets unpacked last, so it overwrites the joda jruby
@# ships with.
$(QUIET)find $$PWD/vendor/jar/joda-time-$(JODA_VERSION) -name '*.jar' \
| (cd $@; xargs -tn1 jar xf)
@# Purge any extra files we don't need in META-INF (like manifests and
@# signature files)
-$(QUIET)rm -f $@/META-INF/*.LIST

View file

@ -11,7 +11,16 @@ The logstash agent has the following flags (also try using the '--help' flag)
<dl>
<dt> -f, --config CONFIGFILE </dt>
<dd> Load the logstash config from a specific file, directory, or a wildcard. If given a directory or wildcard, config files will be read in lexicographical order. </dd>
<dt> --log FILE </dt>
<dt> -e CONFIGSTRING </dt>
<dd> Use the given string as the configuration data. Same syntax as the
config file. If no input is specified, 'stdin { type => stdin }' is the
default. If no output is specified, 'stdout { debug => true }' is the
default. </dd>
<dt> -w, --filterworkers COUNT </dt>
<dd> Run COUNT filter workers (default: 1) </dd>
<dt> --watchdog-timeout TIMEOUT </dt>
<dd> Set watchdog timeout value. </dd>
<dt> -l, --log FILE </dt>
<dd> Log to a given path. Default is to log to stdout </dd>
<dt> -v </dt>
<dd> Increase verbosity. There are multiple levels of verbosity available with
@ -26,6 +35,9 @@ name, like --grok-foo.
## Web UI
The logstash web interface has the following flags (also try using the '--help'
flag)
<dl>
<dt> --log FILE </dt>
<dd> Log to a given path. Default is stdout. </dd>
@ -33,7 +45,9 @@ name, like --grok-foo.
<dd> Address on which to start webserver. Default is 0.0.0.0. </dd>
<dt> --port PORT </dt>
<dd> Port on which to start webserver. Default is 9292. </dd>
<dt> --backend URL </dt>
<dt> -B, --elasticsearch-bind-host ADDRESS </dt>
<dd> Address on which to bind the Elasticsearch node. </dd>
<dt> -b, --backend URL </dt>
<dd>The backend URL to use. Default is elasticsearch:/// (assumes multicast discovery).
You can specify elasticsearch://[host][:port]/[clustername]</dd>
</dl>

View file

@ -63,7 +63,7 @@ Building and installing Redis is fairly straightforward. While normally this wou
- Download Redis from http://redis.io/download (The latest stable release is likely what you want)
- Extract the source, change to the directory and run `make`
- Run Redis with `src/redis-server`
- Run Redis with `src/redis-server --loglevel verbose`
That's it.
@ -104,6 +104,7 @@ Put this in a file and call it 'shipper.conf' (or anything, really), and run:
This will take anything you type into this console and display it on the console. Additionally it will save events to Redis in a `list` named after the `key` value you provided.
### Testing the Redis output
To verify that the message made it into Redis, check your Redis window. You should see something like the following:
[83019] 02 Jul 12:51:02 - Accepted 127.0.0.1:58312
@ -148,8 +149,9 @@ sample config based on the previous section. Save this as `indexer.conf`
# these settings should match the output of the agent
data_type => "list"
key => "logstash"
# We use json_event here since the sender is a logstash agent
message_format => "json_event"
format => "json_event"
}
}

View file

@ -1,4 +1,5 @@
require "logstash/config/file"
require "logstash/config/file/yaml"
require "logstash/filterworker"
require "logstash/logging"
require "logstash/sized_queue"
@ -34,6 +35,7 @@ class LogStash::Agent
log_to(STDERR)
@config_path = nil
@config_string = nil
@is_yaml = false
@logfile = nil
# flag/config defaults
@ -140,7 +142,7 @@ class LogStash::Agent
# These are 'unknown' flags that begin --<plugin>-flag
# Put any plugin paths into the ruby library path for requiring later.
@plugin_paths.each do |p|
@logger.debug("Adding to ruby load path", :path => p)
@logger.debug? and @logger.debug("Adding to ruby load path", :path => p)
$:.unshift p
end
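A pattern applied throughout this commit: each debug call is guarded with
'@logger.debug?' so that, via short-circuit evaluation of 'and', the
argument hash is only built when debug logging is actually enabled. A
minimal sketch of the difference:

    # Always allocates the {:path => p} hash, even when debug is off:
    @logger.debug("Adding to ruby load path", :path => p)

    # Allocates the hash only when the debug level is enabled:
    @logger.debug? and @logger.debug("Adding to ruby load path", :path => p)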
@ -163,8 +165,8 @@ class LogStash::Agent
%w{inputs outputs filters}.each do |component|
@plugin_paths.each do |path|
plugin = File.join(path, component, name) + ".rb"
@logger.debug("Plugin flag found; trying to load it",
:flag => arg, :plugin => plugin)
@logger.debug? and @logger.debug("Plugin flag found; trying to load it",
:flag => arg, :plugin => plugin)
if File.file?(plugin)
@logger.info("Loading plugin", :plugin => plugin)
require plugin
@ -173,7 +175,7 @@ class LogStash::Agent
# and add any options to our option parser.
klass_name = name.capitalize
if c.const_defined?(klass_name)
@logger.debug("Found plugin class", :class => "#{c}::#{klass_name})")
@logger.debug? and @logger.debug("Found plugin class", :class => "#{c}::#{klass_name})")
klass = c.const_get(klass_name)
# See LogStash::Config::Mixin::DSL#options
klass.options(@opts)
@ -241,8 +243,8 @@ class LogStash::Agent
# Support directory of config files.
# https://logstash.jira.com/browse/LOGSTASH-106
if File.directory?(@config_path)
@logger.debug("Config path is a directory, scanning files",
:path => @config_path)
@logger.debug? and @logger.debug("Config path is a directory, scanning files",
:path => @config_path)
paths = Dir.glob(File.join(@config_path, "*")).sort
else
# Get a list of files matching a glob. If the user specified a single
@ -252,13 +254,25 @@ class LogStash::Agent
concatconfig = []
paths.each do |path|
concatconfig << File.new(path).read
file = File.new(path)
if File.extname(file) == '.yaml'
# assume always YAML if even one file is
@is_yaml = true
end
concatconfig << file.read
end
config = LogStash::Config::File.new(nil, concatconfig.join("\n"))
config_data = concatconfig.join("\n")
else # @config_string
# Given a config string by the user (via the '-e' flag)
config = LogStash::Config::File.new(nil, @config_string)
config_data = @config_string
end
if @is_yaml
config = LogStash::Config::File::Yaml.new(nil, config_data)
else
config = LogStash::Config::File.new(nil, config_data)
end
config.logger = @logger
config
end
@ -332,23 +346,23 @@ class LogStash::Agent
private
def start_input(input)
@logger.debug("Starting input", :plugin => input)
@logger.debug? and @logger.debug("Starting input", :plugin => input)
t = 0
# inputs should write directly to output queue if there are no filters.
input_target = @filters.length > 0 ? @filter_queue : @output_queue
# check to see if input supports multiple threads
if input.threadable
@logger.debug("Threadable input", :plugin => input)
@logger.debug? and @logger.debug("Threadable input", :plugin => input)
# start up extra threads if need be
(input.threads-1).times do
input_thread = input.clone
@logger.debug("Starting thread", :plugin => input, :thread => (t+=1))
@logger.debug? and @logger.debug("Starting thread", :plugin => input, :thread => (t+=1))
@plugins[input_thread] = Thread.new(input_thread, input_target) do |*args|
run_input(*args)
end
end
end
@logger.debug("Starting thread", :plugin => input, :thread => (t+=1))
@logger.debug? and @logger.debug("Starting thread", :plugin => input, :thread => (t+=1))
@plugins[input] = Thread.new(input, input_target) do |*args|
run_input(*args)
end
@ -356,7 +370,7 @@ class LogStash::Agent
private
def start_output(output)
@logger.debug("Starting output", :plugin => output)
@logger.debug? and @logger.debug("Starting output", :plugin => output)
queue = LogStash::SizedQueue.new(10 * @filterworker_count)
queue.logger = @logger
@output_queue.add_queue(queue)
@ -474,7 +488,7 @@ class LogStash::Agent
shutdown
break
end
@logger.debug("heartbeat")
@logger.debug? and @logger.debug("heartbeat")
end
end # def run_with_config
@ -740,7 +754,7 @@ class LogStash::Agent
begin
while event = queue.pop do
@logger.debug("Sending event", :target => output)
@logger.debug? and @logger.debug("Sending event", :target => output)
output.handle(event)
break if output.finished?
end
@ -768,8 +782,9 @@ class LogStash::Agent
remaining = @plugins.count do |plugin, thread|
plugin.is_a?(pluginclass) and plugin.running? and thread.alive?
end
@logger.debug("Plugins still running", :type => pluginclass,
:remaining => remaining)
@logger.debug? and @logger.debug("Plugins still running",
:type => pluginclass,
:remaining => remaining)
if remaining == 0
@logger.warn("All #{pluginclass} finished. Shutting down.")

View file

@ -18,18 +18,24 @@ class LogStash::Config::File
end
end # def initialize
def _get_config_data
if @string.nil?
File.new(@path).read
else
@string
end
end
def _get_config(data)
grammar = LogStash::Config::Grammar.new
grammar.parse(data)
grammar.config
end
public
def parse
grammar = LogStash::Config::Grammar.new
@config = _get_config(_get_config_data);
if @string.nil?
grammar.parse(File.new(@path).read)
else
grammar.parse(@string)
end
@config = grammar.config
registry = LogStash::Config::Registry::registry
each do |o|
# Load the base class for the type given (like inputs/base, or filters/base)

View file

@ -0,0 +1,8 @@
require "logstash/config/file"
require "yaml"
class LogStash::Config::File::Yaml < LogStash::Config::File
def _get_config(data)
return YAML.load(data)
end
end

lib/logstash/config/grammar.rb Executable file → Normal file
View file

@ -3,7 +3,7 @@
require "logstash/namespace"
# line 147 "grammar.rl"
# line 150 "grammar.rl"
class LogStash::Config::Grammar
@ -248,7 +248,7 @@ end
self.logstash_config_en_main = 55;
# line 156 "grammar.rl"
# line 159 "grammar.rl"
# END RAGEL DATA
@tokenstack = Array.new
@ -275,7 +275,7 @@ begin
cs = logstash_config_start
end
# line 175 "grammar.rl"
# line 178 "grammar.rl"
# END RAGEL INIT
begin
@ -469,7 +469,7 @@ when 10 then
#puts "Config component: #{name}"
end
when 12 then
# line 142 "grammar.rl"
# line 145 "grammar.rl"
begin
# Compute line and column of the cursor (p)
@ -521,11 +521,11 @@ when 10 then
#puts "Config component: #{name}"
end
when 11 then
# line 141 "grammar.rl"
# line 144 "grammar.rl"
begin
puts "END" end
when 12 then
# line 142 "grammar.rl"
# line 145 "grammar.rl"
begin
# Compute line and column of the cursor (p)
@ -546,7 +546,7 @@ end
end
end
# line 180 "grammar.rl"
# line 183 "grammar.rl"
# END RAGEL EXEC
rescue => e
# Compute line and column of the cursor (p)

View file

@ -302,23 +302,29 @@ module LogStash::Config::Mixin
elsif validator.is_a?(Symbol)
# TODO(sissel): Factor this out into a coercion method?
# TODO(sissel): Document this stuff.
value = hash_or_array(value)
case validator
when :hash
if value.size % 2 == 1
return false, "This field must contain an even number of items, got #{value.size}"
end
if value.is_a?(Hash)
result = value
else
if value.size % 2 == 1
return false, "This field must contain an even number of items, got #{value.size}"
end
# Convert the array the config parser produces into a hash.
result = {}
value.each_slice(2) do |key, value|
entry = result[key]
if entry.nil?
result[key] = value
else
if entry.is_a?(Array)
entry << value
# Convert the array the config parser produces into a hash.
result = {}
value.each_slice(2) do |key, value|
entry = result[key]
if entry.nil?
result[key] = value
else
result[key] = [entry, value]
if entry.is_a?(Array)
entry << value
else
result[key] = [entry, value]
end
end
end
end
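In effect, the :hash validator now accepts a real Hash unchanged, and
otherwise folds the flat key/value array produced by the config grammar
into a hash, collecting values for repeated keys into arrays. A small
sketch of that folding behavior (values illustrative):

    value = ["user", "alice", "tag", "a", "tag", "b"]
    result = {}
    value.each_slice(2) do |key, val|
      entry = result[key]
      if entry.nil?
        result[key] = val
      elsif entry.is_a?(Array)
        entry << val
      else
        result[key] = [entry, val]
      end
    end
    result # => {"user" => "alice", "tag" => ["a", "b"]}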
@ -342,11 +348,18 @@ module LogStash::Config::Mixin
return false, "Expected boolean, got #{value.inspect}"
end
if value.first !~ /^(true|false)$/
return false, "Expected boolean 'true' or 'false', got #{value.first.inspect}"
end
bool_value = value.first
if !!bool_value == bool_value
# is_a does not work for booleans
# we have Boolean and not a string
result = bool_value
else
if bool_value !~ /^(true|false)$/
return false, "Expected boolean 'true' or 'false', got #{bool_value.inspect}"
end
result = (value.first == "true")
result = (bool_value == "true")
end
when :ipaddr
if value.size > 1 # only one value wanted
return false, "Expected IPaddr, got #{value.inspect}"
@ -376,5 +389,12 @@ module LogStash::Config::Mixin
# Return the validator for later use, like with type coercion.
return true, result
end # def validate_value
def hash_or_array(value)
if !value.is_a?(Hash)
value = [*value] # coerce scalar to array if necessary
end
return value
end
end # module LogStash::Config::DSL
end # module LogStash::Config

View file

@ -1,7 +1,7 @@
require "json"
require "time"
require "date"
require "logstash/time"
require "logstash/time_addon"
require "logstash/namespace"
require "uri"
@ -16,6 +16,7 @@ class LogStash::Event
@cancelled = false
@data = {
"@source_host" => false,
"@source" => "unknown",
"@tags" => [],
"@fields" => {},
@ -24,7 +25,7 @@ class LogStash::Event
@data["@timestamp"] ||= LogStash::Time.now
end # def initialize
if RUBY_ENGINE == "jruby"
if defined?(RUBY_ENGINE) && RUBY_ENGINE == "jruby"
@@date_parser = Java::org.joda.time.format.ISODateTimeFormat.dateTimeParser.withOffsetParsed
else
# TODO(sissel): LOGSTASH-217
@ -91,16 +92,16 @@ class LogStash::Event
public
def source; @data["@source"]; end # def source
def source=(val)
uri = URI.parse(val) rescue nil
val = uri if uri
if val.is_a?(URI)
@data["@source"] = val.to_s
@data["@source_host"] = val.host
@data["@source_host"] = val.host if @data["@source_host"].nil?
@data["@source_path"] = val.path
else
@data["@source"] = val
@data["@source_host"] = val
@data["@source_host"] = val.host if @data["@source_host"].nil?
end
end # def source=
@ -124,6 +125,9 @@ class LogStash::Event
def tags; @data["@tags"]; end # def tags
def tags=(val); @data["@tags"] = val; end # def tags=
def id; @data["@id"]; end # def id
def id=(val); @data["@id"] = val; end # def id=
# field-related access
public
def [](key)
@ -190,13 +194,13 @@ class LogStash::Event
end # event.fields.each
end # def append
# Remove a field
# Remove a field. Returns the value of that field when deleted
public
def remove(field)
if @data.has_key?(field)
@data.delete(field)
return @data.delete(field)
else
@data["@fields"].delete(field)
return @data["@fields"].delete(field)
end
end # def remove
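Returning the deleted value makes move/rename operations a one-liner; the
mutate filter's rename fix later in this commit relies on exactly this
(field names here are illustrative):

    event["hostname"] = event.remove("host")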
@ -230,7 +234,7 @@ class LogStash::Event
# Got %{+%s}, support for unix epoch time
if RUBY_ENGINE != "jruby"
# This is really slow. See LOGSTASH-217
Date.parse(self.timestamp).to_i
Time.parse(self.timestamp).to_i
else
datetime = @@date_parser.parseDateTime(self.timestamp)
(datetime.getMillis / 1000).to_i

View file

@ -0,0 +1,87 @@
require "logstash/filters/base"
require "logstash/namespace"
# Anonymize fields by replacing values with a consistent hash.
class LogStash::Filters::Anonymize < LogStash::Filters::Base
config_name "anonymize"
plugin_status "experimental"
# The fields to be anonymized
config :fields, :validate => :array, :required => true
# Hashing key
# When using MURMUR3 the key is ignored but must still be set.
# When using IPV4_NETWORK, the key is the subnet prefix length.
config :key, :validate => :string, :required => true
# digest/hash type
config :algorithm, :validate => ['SHA', 'SHA1', 'SHA224', 'SHA256', 'SHA384', 'SHA512', 'MD4', 'MD5', "MURMUR3", "IPV4_NETWORK"], :required => true, :default => 'SHA1'
public
def register
# require any library and set the anonymize function
case @algorithm
when "IPV4_NETWORK"
require 'ipaddr'
class << self; alias_method :anonymize, :anonymize_ipv4_network; end
when "MURMUR3"
require "murmurhash3"
class << self; alias_method :anonymize, :anonymize_murmur3; end
else
require 'openssl'
class << self; alias_method :anonymize, :anonymize_openssl; end
end
end # def register
public
def filter(event)
return unless filter?(event)
@fields.each do |field|
event[field] = anonymize(event[field])
end
end # def filter
private
def anonymize_ipv4_network(ip_string)
IPAddr.new(ip_string).mask(@key.to_i).to_s
end
def anonymize_openssl(data)
digest = algorithm()
OpenSSL::HMAC.hexdigest(digest, @key, data)
end
def anonymize_murmur3(value)
case value
when Fixnum
MurmurHash3::V32.int_hash(value)
when String
MurmurHash3::V32.str_hash(value)
end
end
def algorithm
case @algorithm
when 'SHA'
return OpenSSL::Digest::SHA.new
when 'SHA1'
return OpenSSL::Digest::SHA1.new
when 'SHA224'
return OpenSSL::Digest::SHA224.new
when 'SHA256'
return OpenSSL::Digest::SHA256.new
when 'SHA384'
return OpenSSL::Digest::SHA384.new
when 'SHA512'
return OpenSSL::Digest::SHA512.new
when 'MD4'
return OpenSSL::Digest::MD4.new
when 'MD5'
return OpenSSL::Digest::MD5.new
else
@logger.error("Unknown algorithm")
end
end
end # class LogStash::Filters::Anonymize
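A hypothetical configuration using this filter to mask client IPs to their
/24 network (the field name is illustrative; for IPV4_NETWORK the key is
the subnet prefix length):

    filter {
      anonymize {
        fields => [ "clientip" ]
        algorithm => "IPV4_NETWORK"
        key => "24"
      }
    }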

View file

@ -105,12 +105,14 @@ class LogStash::Filters::Base < LogStash::Plugin
event[field] = [event[field]] if !event[field].is_a?(Array)
event[field] << event.sprintf(value)
end
@logger.debug("filters/#{self.class.name}: adding value to field",
:field => field, :value => value)
@logger.debug? and @logger.debug("filters/#{self.class.name}: adding " \
"value to field", :field => field,
:value => value)
end
(@add_tag or []).each do |tag|
@logger.debug("filters/#{self.class.name}: adding tag", :tag => tag)
@logger.debug? and @logger.debug("filters/#{self.class.name}: adding tag",
:tag => tag)
event.tags << event.sprintf(tag)
#event.tags |= [ event.sprintf(tag) ]
end
@ -119,7 +121,8 @@ class LogStash::Filters::Base < LogStash::Plugin
remove_tags = @remove_tag.map do |tag|
event.sprintf(tag)
end
@logger.debug("filters/#{self.class.name}: removing tags", :tags => (event.tags & remove_tags))
@logger.debug? and @logger.debug("filters/#{self.class.name}: removing tags",
:tags => (event.tags & remove_tags))
event.tags -= remove_tags
end
end # def filter_matched

View file

@ -23,12 +23,17 @@ class LogStash::Filters::CSV < LogStash::Filters::Base
# Optional.
config :fields, :validate => :array, :default => []
# Define the column separator value. If this is not specified, the default
# is a comma ','.
# Optional.
config :separator, :validate => :string, :default => ","
public
def register
@csv = {}
@config.each do |field, dest|
next if (RESERVED + ["fields"]).member?(field)
next if (RESERVED + ["fields", "separator"]).member?(field)
@csv[field] = dest
end
@ -60,7 +65,7 @@ class LogStash::Filters::CSV < LogStash::Filters::Base
raw = event[key].first
begin
values = CSV.parse_line(raw)
values = CSV.parse_line(raw, {:col_sep => @separator})
data = {}
values.each_index do |i|
field_name = @fields[i] || "field#{i+1}"
@ -82,3 +87,4 @@ class LogStash::Filters::CSV < LogStash::Filters::Base
@logger.debug("Event after csv filter", :event => event)
end # def filter
end # class LogStash::Filters::Csv
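With the new setting, a semicolon-delimited log line could be parsed with a
config along these lines (field names are illustrative):

    filter {
      csv {
        fields => [ "time", "level", "message" ]
        separator => ";"
      }
    }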

View file

@ -47,17 +47,47 @@ class LogStash::Filters::Date < LogStash::Filters::Base
# 2011-04-19T03:44:01.103Z
# * "UNIX" - will parse unix time in seconds since epoch
# * "UNIX_MS" - will parse unix time in milliseconds since epoch
# * "TAI64N" - will parse tai64n time values
#
# For example, if you have a field 'logdate' with a value that looks like 'Aug 13 2010 00:03:44'
# For example, if you have a field 'logdate' with a value that looks like
# 'Aug 13 2010 00:03:44'
# you would use this configuration:
#
# logdate => "MMM dd yyyy HH:mm:ss"
# logdate => "MMM dd YYYY HH:mm:ss"
#
# [dateformats]: http://download.oracle.com/javase/1.4.2/docs/api/java/text/SimpleDateFormat.html
config /[A-Za-z0-9_-]+/, :validate => :array
# An array with field name first, and format patterns following, [ field, formats... ]
# Using this more than once will have unpredictable results, so only use it once per date filter.
# The date formats allowed are anything allowed by Joda-Time (java time
# library), generally: [java.text.SimpleDateFormat][dateformats]
#
# An array with field name first, and format patterns following, [ field,
# formats... ]
#
# If your time field has multiple possible formats, you can do this:
#
# match => [ "logdate", "MMM dd YYY HH:mm:ss",
# "MMM d YYY HH:mm:ss", "ISO8601" ]
#
# The above will match a syslog (rfc3164) or iso8601 timestamp.
#
# There are a few special exceptions, the following format literals exist
# to help you save time and ensure correctness of date parsing.
#
# * "ISO8601" - should parse any valid ISO8601 timestamp, such as
# 2011-04-19T03:44:01.103Z
# * "UNIX" - will parse unix time in seconds since epoch
# * "UNIX_MS" - will parse unix time in milliseconds since epoch
# * "TAI64N" - will parse tai64n time values
#
# For example, if you have a field 'logdate' with a value that looks like
# 'Aug 13 2010 00:03:44', you would use this configuration:
#
# filter {
# date {
# match => [ "logdate", "MMM dd YYYY HH:mm:ss" ]
# }
# }
config :match, :validate => :array, :default => []
# LOGSTASH-34
@ -130,7 +160,12 @@ class LogStash::Filters::Date < LogStash::Filters::Base
when "UNIX_MS" # unix epoch in ms
parser = lambda { |date| org.joda.time.Instant.new(date.to_i).toDateTime }
when "TAI64N" # TAI64 with nanoseconds, -10000 accounts for leap seconds
parser = lambda { |date| org.joda.time.Instant.new((date[1..15].hex * 1000 - 10000)+(date[16..23].hex/1000000)).toDateTime }
parser = lambda do |date|
# Skip leading "@" if it is present (common in tai64n times)
date = date[1..-1] if date[0, 1] == "@"
org.joda.time.Instant.new((date[1..15].hex * 1000 - 10000)+(date[16..23].hex/1000000)).toDateTime
end
else
joda_parser = org.joda.time.format.DateTimeFormat.forPattern(format).withOffsetParsed
if (locale != nil)

View file

@ -104,13 +104,13 @@ class LogStash::Filters::Grok < LogStash::Filters::Base
# Have @@patterns_path show first. Last-in pattern definitions win; this
# will let folks redefine built-in patterns at runtime.
@patterns_dir = @@patterns_path.to_a + @patterns_dir
@logger.info("Grok patterns path", :patterns_dir => @patterns_dir)
@logger.info? and @logger.info("Grok patterns path", :patterns_dir => @patterns_dir)
@patterns_dir.each do |path|
# Can't read relative paths from jars, try to normalize away '../'
while path =~ /file:\/.*\.jar!.*\/\.\.\//
# replace /foo/bar/../baz => /foo/baz
path = path.gsub(/[^\/]+\/\.\.\//, "")
@logger.debug("In-jar path to read", :path => path)
@logger.debug? and @logger.debug("In-jar path to read", :path => path)
end
if File.directory?(path)
@ -118,14 +118,14 @@ class LogStash::Filters::Grok < LogStash::Filters::Base
end
Dir.glob(path).each do |file|
@logger.info("Grok loading patterns from file", :path => file)
@logger.info? and @logger.info("Grok loading patterns from file", :path => file)
@patternfiles << file
end
end
@patterns = Hash.new { |h,k| h[k] = [] }
@logger.info("Match data", :match => @match)
@logger.info? and @logger.info("Match data", :match => @match)
# TODO(sissel): Hash.merge actually overrides, not merges arrays.
# Work around it by implementing our own?
@ -143,9 +143,9 @@ class LogStash::Filters::Grok < LogStash::Filters::Base
add_patterns_from_files(@patternfiles, @patterns[field])
end
@logger.info("Grok compile", :field => field, :patterns => patterns)
@logger.info? and @logger.info("Grok compile", :field => field, :patterns => patterns)
patterns.each do |pattern|
@logger.debug("regexp: #{@type}/#{field}", :pattern => pattern)
@logger.debug? and @logger.debug("regexp: #{@type}/#{field}", :pattern => pattern)
@patterns[field].compile(pattern)
end
end # @config.each
@ -158,17 +158,17 @@ class LogStash::Filters::Grok < LogStash::Filters::Base
# parse it with grok
matched = false
@logger.debug("Running grok filter", :event => event);
@logger.debug? and @logger.debug("Running grok filter", :event => event);
done = false
@patterns.each do |field, pile|
break if done
if !event[field]
@logger.debug("Skipping match object, field not present",
:field => field, :event => event)
@logger.debug? and @logger.debug("Skipping match object, field not present",
:field => field, :event => event)
next
end
@logger.debug("Trying pattern", :pile => pile, :field => field)
@logger.debug? and @logger.debug("Trying pattern", :pile => pile, :field => field)
(event[field].is_a?(Array) ? event[field] : [event[field]]).each do |fieldvalue|
begin
# Coerce all field values to string. This turns arrays, hashes, numbers, etc
@ -197,8 +197,8 @@ class LogStash::Filters::Grok < LogStash::Filters::Base
# Permit typing of captures by giving an additional colon and a type,
# like: %{FOO:name:int} for int coercion.
if type_coerce
@logger.info("Match type coerce: #{type_coerce}")
@logger.info("Patt: #{grok.pattern}")
@logger.info? and @logger.info("Match type coerce: #{type_coerce}")
@logger.info? and @logger.info("Patt: #{grok.pattern}")
end
case type_coerce
@ -211,13 +211,12 @@ class LogStash::Filters::Grok < LogStash::Filters::Base
# Special casing to skip captures that represent the entire log message.
if fieldvalue == value and field == "@message"
# Skip patterns that match the entire message
@logger.debug("Skipping capture since it matches the whole line.", :field => key)
@logger.debug? and @logger.debug("Skipping capture since it matches the whole line.", :field => key)
next
end
if @named_captures_only && !is_named
@logger.debug("Skipping capture since it is not a named " \
"capture and named_captures_only is true.", :field => key)
@logger.debug? and @logger.debug("Skipping capture since it is not a named " "capture and named_captures_only is true.", :field => key)
next
end
@ -253,7 +252,7 @@ class LogStash::Filters::Grok < LogStash::Filters::Base
event.tags << "_grokparsefailure"
end
@logger.debug("Event now: ", :event => event)
@logger.debug? and @logger.debug("Event now: ", :event => event)
end # def filter
private
@ -272,8 +271,8 @@ class LogStash::Filters::Grok < LogStash::Filters::Base
# the end. I don't know if this is a bug or intentional, but we need
# to chomp it.
name, pattern = line.chomp.split(/\s+/, 2)
@logger.debug("Adding pattern from file", :name => name,
:pattern => pattern, :path => path)
@logger.debug? and @logger.debug("Adding pattern from file", :name => name,
:pattern => pattern, :path => path)
pile.add_pattern(name, pattern)
end
else

View file

@ -7,9 +7,9 @@ require "logstash/namespace"
# For example, if you have a log message which contains 'ip=1.2.3.4
# error=REFUSED', you can parse those automatically by doing:
#
# filter {
# kv { }
# }
# filter {
# kv { }
# }
#
# And you will get field 'ip' == "1.2.3.4" etc.
class LogStash::Filters::KV < LogStash::Filters::Base
@ -33,9 +33,10 @@ class LogStash::Filters::KV < LogStash::Filters::Base
#
# Example, to split out the args from a string such as
# '?pin=12345~0&d=123&e=foo@bar.com&oq=bobo&ss=12345':
#
#
# filter { kv { field_split => "&?" } }
config :field_split, :validate => :string, :default => ''
# Default to space character for backward compatibility
config :field_split, :validate => :string, :default => ' '
# A string of characters to use as delimiters for identifying key-value relations.
@ -77,7 +78,7 @@ class LogStash::Filters::KV < LogStash::Filters::Base
when Array; value.each { |v| kv_keys = parse(v, event, kv_keys) }
else
@logger.warn("kv filter has no support for this type of data",
:type => value.type, :value => value)
:type => value.class, :value => value)
end # case value
end
# If we have any keys, create/append the hash
@ -95,7 +96,7 @@ class LogStash::Filters::KV < LogStash::Filters::Base
if !event =~ /[@field_split]/
return kv_keys
end
scan_re = Regexp.new("([^ "+@field_split+@value_split+"]+)["+@value_split+"](?:\"([^\""+@field_split+"]+)\"|'([^'"+@field_split+"]+)'|([^ "+@field_split+"]+))")
scan_re = Regexp.new("((?:\\\\ |[^"+@field_split+@value_split+"])+)["+@value_split+"](?:\"([^\"]+)\"|'([^']+)'|((?:\\\\ |[^"+@field_split+"])+))")
text.scan(scan_re) do |key, v1, v2, v3|
value = v1 || v2 || v3
if !@trim.nil?

View file

@ -14,7 +14,8 @@ class LogStash::Filters::Metrics < LogStash::Filters::Base
def register
require "metriks"
require "socket"
@metric_meters = Hash.new { |h,k| h[k] = Metriks.meter(k) }
@metric_timers = Hash.new { |h,k| h[k] = Metriks.timer(k) }
end # def register
@ -33,6 +34,7 @@ class LogStash::Filters::Metrics < LogStash::Filters::Base
def flush
event = LogStash::Event.new
event.source_host = Socket.gethostname
@metric_meters.each do |name, metric|
event["#{name}.count"] = metric.count
event["#{name}.rate_1m"] = metric.one_minute_rate
@ -42,12 +44,13 @@ class LogStash::Filters::Metrics < LogStash::Filters::Base
@metric_timers.each do |name, metric|
event["#{name}.count"] = metric.count
event["#{name}.rate_1m"] = metric.one_mintute_rate
event["#{name}.rate_1m"] = metric.one_minute_rate
event["#{name}.rate_5m"] = metric.five_minute_rate
event["#{name}.rate_15m"] = metric.fifteen_minute_rate
event["#{name}.min"] = metric.min
event["#{name}.max"] = metric.max
event["#{name}.stddev"] = metric.stddev
event["#{name}.mean"] = metric.mean
end
filter_matched(event)

View file

@ -63,20 +63,25 @@ class LogStash::Filters::Mutate < LogStash::Filters::Base
# Convert a string field by applying a regular expression and a replacement
# if the field is not a string, no action will be taken
#
# this configuration takes an array consisting of 3 elements per field/substitution
# This configuration takes an array consisting of 3 elements per
# field/substitution.
#
# be aware of escaping any backslash in the config file
#
# for example:
#
# mutate {
# gsub => [
# # replace all forward slashes with underscore
# "fieldname", "\\/", "_",
# # replace backslashes, question marks, hashes and minuses with underscore
# "fieldname", "[\\?#-]", "_"
# ]
# }
# filter {
# mutate {
# gsub => [
# # replace all forward slashes with underscore
# "fieldname", "\\/", "_",
#
# # replace backslashes, question marks, hashes and minuses with
# # underscore
# "fieldname", "[\\?#-]", "_"
# ]
# }
# }
#
config :gsub, :validate => :array
@ -84,22 +89,58 @@ class LogStash::Filters::Mutate < LogStash::Filters::Base
#
# Example:
#
# mutate {
# uppercase => [ "fieldname" ]
# }
#
# filter {
# mutate {
# uppercase => [ "fieldname" ]
# }
# }
config :uppercase, :validate => :array
# Convert a string to its lowercase equivalent
#
# Example:
#
# mutate {
# lowercase => [ "fieldname" ]
# }
#
# filter {
# mutate {
# lowercase => [ "fieldname" ]
# }
# }
config :lowercase, :validate => :array
# Split a field to an array using a separator character. Only works on string
# fields.
#
# Example:
#
# filter {
# mutate {
# split => ["fieldname", ","]
# }
# }
config :split, :validate => :hash
# Join an array with a separator character; does nothing on non-array fields
#
# Example:
#
# filter {
# mutate {
# join => ["fieldname", ","]
# }
# }
config :join, :validate => :hash
# Strip whitespace
#
# Example:
#
# filter {
# mutate {
# strip => ["field1", "field2"]
# }
# }
config :strip, :validate => :array
public
def register
valid_conversions = %w(string integer float)
@ -139,6 +180,8 @@ class LogStash::Filters::Mutate < LogStash::Filters::Base
uppercase(event) if @uppercase
lowercase(event) if @lowercase
remove(event) if @remove
split(event) if @split
join(event) if @join
filter_matched(event)
end # def filter
@ -155,8 +198,8 @@ class LogStash::Filters::Mutate < LogStash::Filters::Base
def rename(event)
# TODO(sissel): use event.sprintf on the field names?
@rename.each do |old, new|
event[new] = event[old]
event.remove(old)
next unless event.include?(old)
event[new] = event.remove(old)
end
end # def rename
@ -254,4 +297,34 @@ class LogStash::Filters::Mutate < LogStash::Filters::Base
end
end
end # def lowercase
private
def split(event)
@split.each do |field, separator|
if event[field].is_a?(String)
event[field] = event[field].split(separator)
end
end
end
private
def join(event)
@join.each do |field, separator|
if event[field].is_a?(Array)
event[field] = event[field].join(separator)
end
end
end
private
def strip(event)
@strip.each do |field|
if event[field].is_a?(Array)
event[field] = event[field].map{|s| s.strip }
elsif event[field].is_a?(String)
event[field] = event[field].strip
end
end
end
end # class LogStash::Filters::Mutate

View file

@ -58,7 +58,7 @@ class LogStash::FilterWorker < LogStash::Plugin
end
events.each do |event|
@logger.debug("Pushing flushed events", :event => event)
@logger.debug? and @logger.debug("Pushing flushed events", :event => event)
@output_queue.push(event) unless event.cancelled?
end
end # def flusher
@ -95,14 +95,14 @@ class LogStash::FilterWorker < LogStash::Plugin
clear_watchdog
end
if event.cancelled?
@logger.debug("Event cancelled", :event => event,
:filter => filter.class)
@logger.debug? and @logger.debug("Event cancelled", :event => event,
:filter => filter.class)
break
end
end # @filters.each
@logger.debug("Event finished filtering", :event => event,
:thread => Thread.current[:name])
@logger.debug? and @logger.debug("Event finished filtering", :event => event,
:thread => Thread.current[:name])
@output_queue.push(event) unless event.cancelled?
end # events.each
end # def filter

View file

@ -30,8 +30,12 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Threadable
# Your amqp password
config :password, :validate => :password, :default => "guest"
# The name of the queue.
config :name, :validate => :string, :default => ""
# The name of the queue. Deprecated due to conflicts with the Puppet naming convention.
# Replaced by 'queue' variable. See LOGSTASH-755
config :name, :validate => :string, :deprecated => true
# The name of the queue.
config :queue, :validate => :string, :default => ""
# The name of the exchange to bind the queue. This is analogous to the 'amqp
# output' [config 'name'](../outputs/amqp)
@ -86,6 +90,14 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Threadable
public
def register
if @name
if @queue
@logger.error("'name' and 'queue' are the same setting, but 'name' is deprecated. Please use only 'queue'")
end
@queue = @name
end
@logger.info("Registering input #{@url}")
require "bunny" # rubygem 'bunny'
@vhost ||= "/"
@ -106,12 +118,12 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Threadable
amqp_credentials << @user if @user
amqp_credentials << ":#{@password}" if @password
@amqpurl += amqp_credentials unless amqp_credentials.nil?
@amqpurl += "#{@host}:#{@port}#{@vhost}/#{@name}"
@amqpurl += "#{@host}:#{@port}#{@vhost}/#{@queue}"
end # def register
def run(queue)
begin
@logger.debug("Connecting with AMQP settings #{@amqpsettings.inspect} to set up queue #{@name.inspect}")
@logger.debug("Connecting with AMQP settings #{@amqpsettings.inspect} to set up queue #{@queue.inspect}")
@bunny = Bunny.new(@amqpsettings)
return if terminating?
@bunny.start
@ -119,15 +131,15 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Threadable
@arguments_hash = Hash[*@arguments]
@queue = @bunny.queue(@name, {:durable => @durable, :auto_delete => @auto_delete, :exclusive => @exclusive, :arguments => @arguments_hash })
@queue.bind(@exchange, :key => @key)
@bunnyqueue = @bunny.queue(@queue, {:durable => @durable, :auto_delete => @auto_delete, :exclusive => @exclusive, :arguments => @arguments_hash })
@bunnyqueue.bind(@exchange, :key => @key)
@queue.subscribe({:ack => @ack}) do |data|
@bunnyqueue.subscribe({:ack => @ack}) do |data|
e = to_event(data[:payload], @amqpurl)
if e
queue << e
end
end # @queue.subscribe
end # @bunnyqueue.subscribe
rescue *[Bunny::ConnectionError, Bunny::ServerDownError] => e
@logger.error("AMQP connection error, will reconnect: #{e}")
@ -139,8 +151,8 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Threadable
end # def run
def teardown
@queue.unsubscribe unless @durable == true
@queue.delete unless @durable == true
@bunnyqueue.unsubscribe unless @durable == true
@bunnyqueue.delete unless @durable == true
@bunny.close if @bunny
finished
end # def teardown

View file

@ -107,6 +107,7 @@ class LogStash::Inputs::Base < LogStash::Plugin
:source => source, :exception => e,
:backtrace => e.backtrace)
event.message = raw
event.tags << "_jsonparsefailure"
end
when "json_event"
begin
@ -124,6 +125,7 @@ class LogStash::Inputs::Base < LogStash::Plugin
:input => raw, :source => source, :exception => e,
:backtrace => e.backtrace)
event.message = raw
event.tags << "_jsonparsefailure"
end
if event.source == "unknown"
@ -141,7 +143,7 @@ class LogStash::Inputs::Base < LogStash::Plugin
event[field] << event.sprintf(value)
end
logger.debug(["Received new event", {:source => source, :event => event}])
@logger.debug? and @logger.debug("Received new event", :source => source, :event => event)
return event
end # def to_event
end # class LogStash::Inputs::Base

View file

@ -0,0 +1,328 @@
require "date"
require "logstash/inputs/base"
require "logstash/namespace"
# Retrieve watchdog log events from a Drupal installation with DBLog enabled.
# The events are pulled out directly from the database.
# The original events are not deleted, and on every consecutive run only new
# events are pulled.
#
# The last watchdog event id that was processed is stored in the Drupal
# variable table with the name "logstash_last_wid". Delete this variable or
# set it to 0 if you want to re-import all events.
#
# More info on DBLog: http://drupal.org/documentation/modules/dblog
#
class LogStash::Inputs::DrupalDblog < LogStash::Inputs::Base
config_name "drupal_dblog"
plugin_status "experimental"
# Specify all Drupal databases that you wish to import from.
# This can be as many as you wish.
# The format is a hash, with a unique site name as the key, and a database
# URL as the value.
#
# Example:
# [
# "site1", "mysql://user1:password@host1.com/databasename",
# "other_site", "mysql://user2:password@otherhost.com/databasename",
# ...
# ]
config :databases, :validate => :hash
# By default, the event only contains the current user id as a field.
# If you wish to add the username as an additional field, set this to true.
config :add_usernames, :validate => :boolean, :default => false
# Time between checks in minutes.
config :interval, :validate => :number, :default => 10
# The amount of log messages that should be fetched with each query.
# Bulk fetching is done to prevent querying huge data sets when lots of
# messages are in the database.
config :bulksize, :validate => :number, :default => 5000
# Label this input with a type.
# Types are used mainly for filter activation.
#
#
# If you create an input with type "foobar", then only filters
# which also have type "foobar" will act on them.
#
# The type is also stored as part of the event itself, so you
# can also use the type to search for in the web interface.
config :type, :validate => :string, :default => 'watchdog'
public
def initialize(params)
super
@format = "json_event"
end # def initialize
public
def register
require "php_serialize"
if RUBY_PLATFORM == 'java'
require "logstash/inputs/drupal_dblog/jdbcconnection"
else
require "mysql2"
end
end # def register
public
def config_init(params)
super
dbs = {}
valid = true
@databases.each do |name, rawUri|
uri = URI(rawUri)
dbs[name] = {
"site" => name,
"scheme" => uri.scheme,
"host" => uri.host,
"user" => uri.user,
"password" => uri.password,
"database" => uri.path.sub('/', ''),
"port" => uri.port.to_i
}
if not (
uri.scheme and not uri.scheme.empty?\
and uri.host and not uri.host.empty?\
and uri.user and not uri.user.empty?\
and uri.password\
and uri.path and not uri.path.sub('/', '').empty?
)
@logger.error("Drupal DBLog: Invalid database URI for #{name} : #{rawUri}")
valid = false
end
if not uri.scheme == 'mysql'
@logger.error("Drupal DBLog: Only mysql databases are supported.")
valid = false
end
end
if not valid
@logger.error("Config validation failed.")
exit 1
end
@databases = dbs
end #def config_init
public
def run(output_queue)
@logger.info("Initializing drupal_dblog")
loop do
@logger.debug("Drupal DBLog: Starting to fetch new watchdog entries")
start = Time.now.to_i
@databases.each do |name, db|
@logger.debug("Drupal DBLog: Checking database #{name}")
check_database(output_queue, db)
@logger.info("Drupal DBLog: Retrieved all new watchdog messages from #{name}")
end
timeTaken = Time.now.to_i - start
@logger.info("Drupal DBLog: Fetched all new watchdog entries in #{timeTaken} seconds")
# If fetching of all databases took less time than the interval,
# sleep a bit.
sleepTime = @interval * 60 - timeTaken
if sleepTime > 0
@logger.debug("Drupal DBLog: Sleeping for #{sleepTime} seconds")
sleep(sleepTime)
end
end # loop
end # def run
private
def initialize_client(db)
if db["scheme"] == 'mysql'
if not db["port"] > 0
db["port"] = 3306
end
if RUBY_PLATFORM == 'java'
@client = LogStash::DrupalDblogJavaMysqlConnection.new(
db["host"],
db["user"],
db["password"],
db["database"],
db["port"]
)
else
@client = Mysql2::Client.new(
:host => db["host"],
:port => db["port"],
:username => db["user"],
:password => db["password"],
:database => db["database"]
)
end
end
end #def get_client
private
def check_database(output_queue, db)
begin
# connect to the MySQL server
initialize_client(db)
rescue Exception => e
@logger.error("Could not connect to database: " + e.message)
return
end #begin
begin
@sitename = db["site"]
@usermap = @add_usernames ? get_usermap : nil
# Retrieve last pulled watchdog entry id
initialLastWid = get_last_wid
lastWid = nil
if initialLastWid == false
lastWid = 0
set_last_wid(0, true)
else
lastWid = initialLastWid
end
# Fetch new entries, and create the event
while true
results = get_db_rows(lastWid)
if results.length() < 1
break
end
@logger.debug("Fetched " + results.length().to_s + " database rows")
results.each do |row|
event = build_event(row)
if event
output_queue << event
lastWid = row['wid'].to_s
end
end
set_last_wid(lastWid, false)
end
rescue Exception => e
@logger.error("Error while fetching messages: ", :error => e.message)
end # begin
# Close connection
@client.close
end # def check_database
def get_db_rows(lastWid)
query = 'SELECT * from watchdog WHERE wid > ' + lastWid.to_s + " ORDER BY wid asc LIMIT " + @bulksize.to_s
return @client.query(query)
end # def get_db_rows
private
def update_sitename
if @sitename == ""
result = @client.query('SELECT value FROM variable WHERE name="site_name"')
if result.first()
@sitename = PHP.unserialize(result.first()['value'])
end
end
end # def update_sitename
private
def get_last_wid
result = @client.query('SELECT value FROM variable WHERE name="logstash_last_wid"')
lastWid = false
if result.count() > 0
tmp = result.first()["value"].gsub("i:", "").gsub(";", "")
lastWid = tmp.to_i.to_s == tmp ? tmp : "0"
end
return lastWid
end # def get_last_wid
private
def set_last_wid(wid, insert)
wid = PHP.serialize(wid.to_i)
# Update last import wid variable
if insert
# Does not exist yet, so insert
@client.query('INSERT INTO variable (name, value) VALUES("logstash_last_wid", "' + wid + '")')
else
@client.query('UPDATE variable SET value="' + wid + '" WHERE name="logstash_last_wid"')
end
end # def set_last_wid
private
def get_usermap
map = {}
@client.query("SELECT uid, name FROM users").each do |row|
map[row["uid"]] = row["name"]
end
map[0] = "guest"
return map
end # def get_usermap
private
def build_event(row)
# Convert unix timestamp
timestamp = Time.at(row["timestamp"]).to_datetime.iso8601
msg = row["message"]
vars = {}
# Unserialize the variables, and construct the message
if row['variables'] != 'N;'
vars = PHP.unserialize(row["variables"])
if vars.is_a?(Hash)
vars.each_pair do |k, v|
if msg.scan(k).length() > 0
msg = msg.gsub(k.to_s, v.to_s)
else
# If not inside the message, add var as an additional field
row["variable_" + k] = v
end
end
end
end
row.delete("message")
row.delete("variables")
row.delete("timestamp")
row["severity"] = row["severity"].to_i
if @add_usernames and @usermap.has_key?(row["uid"])
row["user"] = @usermap[row["uid"]]
end
entry = {
"@timestamp" => timestamp,
"@tags" => [],
"@type" => "watchdog",
"@source" => @sitename,
"@fields" => row,
"@message" => msg
}
event = to_event(JSON.dump(entry), @sitename)
return event
end # def build_event
end # class LogStash::Inputs::DrupalDblog
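A hypothetical input block wiring this up (site name, credentials, and host
are illustrative):

    input {
      drupal_dblog {
        databases => [ "site1", "mysql://user:password@dbhost/drupal" ]
        add_usernames => true
        interval => 10
        type => "watchdog"
      }
    }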

View file

@ -0,0 +1,65 @@
require "java"
require "rubygems"
require "jdbc/mysql"
java_import "com.mysql.jdbc.Driver"
# A JDBC mysql connection class.
# The interface is compatible with the mysql2 API.
class LogStash::DrupalDblogJavaMysqlConnection
def initialize(host, username, password, database, port = nil)
port ||= 3306
address = "jdbc:mysql://#{host}:#{port}/#{database}"
@connection = java.sql.DriverManager.getConnection(address, username, password)
end # def initialize
def query(sql)
if sql =~ /select/i
return select(sql)
else
return update(sql)
end
end # def query
def select(sql)
stmt = @connection.createStatement
resultSet = stmt.executeQuery(sql)
meta = resultSet.getMetaData
column_count = meta.getColumnCount
rows = []
while resultSet.next
res = {}
(1..column_count).each do |i|
name = meta.getColumnName(i)
case meta.getColumnType(i)
when java.sql.Types::INTEGER
res[name] = resultSet.getInt(name)
else
res[name] = resultSet.getString(name)
end
end
rows << res
end
stmt.close
return rows
end # def select
def update(sql)
stmt = @connection.createStatement
stmt.execute_update(sql)
stmt.close
end # def update
def close
@connection.close
end # def close
end # class LogStash::DrupalDblogJavaMysqlConnection
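A rough usage sketch on JRuby (host, credentials, and query are
illustrative); for SELECTs, query() returns an array of row hashes,
mirroring the mysql2 API:

    require "logstash/inputs/drupal_dblog/jdbcconnection"

    conn = LogStash::DrupalDblogJavaMysqlConnection.new(
      "localhost", "drupal", "secret", "drupaldb"
    )
    conn.query("SELECT wid, message FROM watchdog LIMIT 5").each do |row|
      puts "#{row["wid"]}: #{row["message"]}"
    end
    conn.close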

View file

@ -5,19 +5,27 @@ require "socket"
# Pull events from a Windows Event Log
#
# To collect Events from the System Event Log, use a config like:
# input {
# eventlog {
# type => 'Win32-EventLog'
# name => 'System'
# }
# }
#
# input {
# eventlog {
# type => 'Win32-EventLog'
# name => 'System'
# }
# }
class LogStash::Inputs::EventLog < LogStash::Inputs::Base
config_name "eventlog"
plugin_status "beta"
# Event Log Name. Deprecated due to conflicts with the Puppet naming convention.
# Replaced by 'logfile' variable. See LOGSTASH-755
config :name, :validate => :string, :deprecated => true
# Event Log Name
config :name, :validate => :string, :required => true, :default => "System"
config :logfile, :validate => :string
#:required => true, :default => "System"
# TODO(sissel): Make 'logfile' required after :name is gone.
public
def initialize(params)
@ -27,8 +35,21 @@ class LogStash::Inputs::EventLog < LogStash::Inputs::Base
public
def register
if @name
@logger.warn("Please use 'logfile' instead of the 'name' setting")
if @logfile
@logger.error("'name' and 'logfile' are the same setting, but 'name' is deprecated. Please use only 'logfile'")
end
@logfile = @name
end
if @logfile.nil?
raise ArgumentError, "Missing required parameter 'logfile' for input/eventlog"
end
@hostname = Socket.gethostname
@logger.info("Registering input eventlog://#{@hostname}/#{@name}")
@logger.info("Registering input eventlog://#{@hostname}/#{@logfile}")
require "win32ole" # rubygem 'win32ole' ('jruby-win32ole' on JRuby)
end # def register
@ -43,7 +64,7 @@ class LogStash::Inputs::EventLog < LogStash::Inputs::Base
newest_shipped_event = latest_record_number
next_newest_shipped_event = newest_shipped_event
begin
@logger.debug("Tailing Windows Event Log '#{@name}'")
@logger.debug("Tailing Windows Event Log '#{@logfile}'")
loop do
event_index = 0
latest_events.each do |event|
@ -51,7 +72,7 @@ class LogStash::Inputs::EventLog < LogStash::Inputs::Base
timestamp = DateTime.strptime(event.TimeGenerated, "%Y%m%d%H%M%S").iso8601
timestamp[19..-1] = DateTime.now.iso8601[19..-1] # Copy over the correct TZ offset
e = LogStash::Event.new({
"@source" => "eventlog://#{@hostname}/#{@name}",
"@source" => "eventlog://#{@hostname}/#{@logfile}",
"@type" => @type,
"@timestamp" => timestamp
})
@ -81,7 +102,7 @@ class LogStash::Inputs::EventLog < LogStash::Inputs::Base
private
def latest_events
wmi_query = "select * from Win32_NTLogEvent where Logfile = '#{@name}'"
wmi_query = "select * from Win32_NTLogEvent where Logfile = '#{@logfile}'"
events = @wmi.ExecQuery(wmi_query)
end # def latest_events

View file

@ -17,7 +17,10 @@ class LogStash::Inputs::Gemfire < LogStash::Inputs::Threadable
plugin_status "experimental"
# Your client cache name
config :name, :validate => :string, :default => "logstash"
config :name, :validate => :string, :deprecated => true
# Your client cache name
config :cache_name, :validate => :string, :default => "logstash"
# The path to a GemFire client cache XML file.
#
@ -51,6 +54,13 @@ class LogStash::Inputs::Gemfire < LogStash::Inputs::Threadable
# How the message is serialized in the cache. Can be one of "json" or "plain"; default is plain
config :serialization, :validate => :string, :default => nil
if @name
if @cache_name
@logger.error("'name' and 'cache_name' are the same setting, but 'name' is deprecated. Please use only 'cache_name'")
end
@cache_name = @name
end
public
def initialize(params)
super
@ -97,10 +107,10 @@ class LogStash::Inputs::Gemfire < LogStash::Inputs::Threadable
protected
def connect
begin
@logger.debug("Connecting to GemFire #{@name}")
@logger.debug("Connecting to GemFire #{@cache_name}")
@cache = ClientCacheFactory.new.
set("name", @name).
set("name", @cache_name).
set("cache-xml-file", @cache_xml_file).create
@logger.debug("Created cache #{@cache.inspect}")

View file

@ -8,10 +8,10 @@ require "logstash/namespace"
#
# Recommended filters:
#
# filter {
# grok {
# pattern => "^%{TIMESTAMP_ISO8601:timestamp} %{WORD:component}\[%{WORD:process}(?:\.%{INT:instance:int})?\]: %{DATA:message}$"
# }
# filter {
# grok {
# pattern => "^%{TIMESTAMP_ISO8601:timestamp} %{WORD:component}\[%{WORD:process}(?:\.%{INT:instance:int})?\]: %{DATA:message}$"
# }
# date { timestamp => ISO8601 }
# }
class LogStash::Inputs::Heroku < LogStash::Inputs::Base

View file

@ -25,7 +25,7 @@ class LogStash::Inputs::Irc < LogStash::Inputs::Base
config :real, :validate => :string, :default => "logstash"
# IRC Server password
config :password, :validate => :password, :default => nil
config :password, :validate => :password
# Channels to listen to
config :channels, :validate => :array, :required => true

View file

@ -58,6 +58,13 @@ class LogStash::Inputs::Stomp < LogStash::Inputs::Base
e = to_event(msg.body, @stomp_url)
@output_queue << e if e
end
#In the event that there are only Stomp input plugin instances,
#the process would end prematurely. The above code runs, then returns
#control to the 'run' method below. After that, the 'run_input'
#method in agent.rb marks 'done' as 'true' and calls 'finish' on
#the Stomp plugin instance. 'Sleeping' the plugin keeps the
#instance alive.
sleep
end
public

View file

@ -67,10 +67,10 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
end # loop do
rescue => e
@logger.debug("Closing connection", :client => socket.peer,
:exception => e, :backtrace => e.backtrace)
:exception => e, :backtrace => e.backtrace)
rescue Timeout::Error
@logger.debug("Closing connection after read timeout",
:client => socket.peer)
:client => socket.peer)
end # begin
begin
@ -95,15 +95,26 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
if server?
loop do
# Start a new thread for each connection.
Thread.start(@server_socket.accept) do |s|
# TODO(sissel): put this block in its own method.
begin
Thread.start(@server_socket.accept) do |s|
# TODO(sissel): put this block in its own method.
# monkeypatch a 'peer' method onto the socket.
s.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
@logger.debug("Accepted connection", :client => s.peer,
:server => "#{@host}:#{@port}")
handle_socket(s, output_queue, "tcp://#{@host}:#{@port}/client/#{s.peer}")
end # Thread.start
# monkeypatch a 'peer' method onto the socket.
s.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
@logger.debug("Accepted connection", :client => s.peer,
:server => "#{@host}:#{@port}")
handle_socket(s, output_queue, "tcp://#{@host}:#{@port}/client/#{s.peer}")
end # Thread.start
rescue IOError
if @interrupted
#Intended shutdown, get out of the loop
break
else
# Else it was a genuine IOError caused by something else, so propagate it up.
raise
end
end
end # loop
else
loop do
@ -114,4 +125,12 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
end # loop
end
end # def run
public
def teardown
if server?
@interrupted = true
@server_socket.close
end
end # def teardown
end # class LogStash::Inputs::Tcp

View file

@ -28,8 +28,12 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
# The exchange type (fanout, topic, direct)
config :exchange_type, :validate => [ "fanout", "direct", "topic"], :required => true
# The name of the exchange. Deprecated due to conflicts with the puppet naming convention.
# Replaced by 'exchange' variable. See LOGSTASH-755
config :name, :validate => :string, :deprecated => true
# The name of the exchange
config :name, :validate => :string, :required => true
config :exchange, :validate => :string # TODO(sissel): Make it required when 'name' is gone
# Key to route to by default. Defaults to 'logstash'
#
@ -59,6 +63,13 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
def register
require "bunny" # rubygem 'bunny'
if @name
if @exchange
@logger.error("'name' and 'exchange' are the same setting, but 'name' is deprecated. Please use only 'exchange'")
end
@exchange = @name
end
@logger.info("Registering output", :plugin => self)
connect
end # def register
@ -78,7 +89,7 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
begin
@logger.debug("Connecting to AMQP", :settings => amqpsettings,
:exchange_type => @exchange_type, :name => @name)
:exchange_type => @exchange_type, :name => @exchange)
@bunny = Bunny.new(amqpsettings)
@bunny.start
rescue => e
@ -92,11 +103,11 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
end
end
@logger.debug("Declaring exchange", :name => @name, :type => @exchange_type,
@logger.debug("Declaring exchange", :name => @exchange, :type => @exchange_type,
:durable => @durable)
@exchange = @bunny.exchange(@name, :type => @exchange_type.to_sym, :durable => @durable)
@bunnyexchange = @bunny.exchange(@exchange, :type => @exchange_type.to_sym, :durable => @durable)
@logger.debug("Binding exchange", :name => @name, :key => @key)
@logger.debug("Binding exchange", :name => @exchange, :key => @key)
end # def connect
public
@ -118,9 +129,9 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
public
def receive_raw(message, key=@key)
begin
if @exchange
if @bunnyexchange
@logger.debug(["Publishing message", { :destination => to_s, :message => message, :key => key }])
@exchange.publish(message, :persistent => @persistent, :key => key)
@bunnyexchange.publish(message, :persistent => @persistent, :key => key)
else
@logger.warn("Tried to send message, but not connected to amqp yet.")
end
@ -133,14 +144,14 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
public
def to_s
return "amqp://#{@user}@#{@host}:#{@port}#{@vhost}/#{@exchange_type}/#{@name}\##{@key}"
return "amqp://#{@user}@#{@host}:#{@port}#{@vhost}/#{@exchange_type}/#{@exchange}\##{@key}"
end
public
def teardown
@bunny.close rescue nil
@bunny = nil
@exchange = nil
@bunnyexchange = nil
finished
end # def teardown
end # class LogStash::Outputs::Amqp
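A hedged example of this output configured with the new 'exchange' setting in place of the deprecated 'name'; the host value is illustrative:

    output {
      amqp {
        host => "amqp.example.com"   # illustrative broker host
        exchange => "logstash"       # replaces the deprecated 'name'
        exchange_type => "fanout"
      }
    }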

View file

@ -59,28 +59,28 @@ class LogStash::Outputs::Base < LogStash::Plugin
def output?(event)
if !@type.empty?
if event.type != @type
@logger.debug(["Dropping event because type doesn't match #{@type}", event])
@logger.debug? and @logger.debug(["Dropping event because type doesn't match #{@type}", event])
return false
end
end
if !@tags.empty?
if (event.tags & @tags).size != @tags.size
@logger.debug(["Dropping event because tags don't match #{@tags.inspect}", event])
@logger.debug? and @logger.debug(["Dropping event because tags don't match #{@tags.inspect}", event])
return false
end
end
if !@exclude_tags.empty?
if (diff_tags = (event.tags & @exclude_tags)).size != 0
@logger.debug(["Dropping event because tags contains excluded tags: #{diff_tags.inspect}", event])
@logger.debug? and @logger.debug(["Dropping event because tags contains excluded tags: #{diff_tags.inspect}", event])
return false
end
end
if !@fields.empty?
if (event.fields.keys & @fields).size != @fields.size
@logger.debug(["Dropping event because type doesn't match #{@fields.inspect}", event])
@logger.debug? and @logger.debug(["Dropping event because type doesn't match #{@fields.inspect}", event])
return false
end
end

View file

@ -1,10 +1,6 @@
require "logstash/outputs/base"
require "logstash/namespace"
require "thread"
require "rufus/scheduler"
require "aws"
# This output lets you aggregate and send metric data to AWS CloudWatch
#
# Configuration is done partly in this output and partly using fields added
@ -24,6 +20,20 @@ class LogStash::Outputs::CloudWatch < LogStash::Outputs::Base
config_name "cloudwatch"
plugin_status "experimental"
# Constants
# aggregate_key members
DIMENSIONS = "dimensions"
TIMESTAMP = "timestamp"
METRIC = "metric"
COUNT = "count"
UNIT = "unit"
SUM = "sum"
MIN = "min"
MAX = "max"
# Units
COUNT_UNIT = "Count"
NONE = "None"
# The AWS Region to send logs to.
config :region, :validate => :string, :default => "us-east-1"
@ -43,44 +53,72 @@ class LogStash::Outputs::CloudWatch < LogStash::Outputs::Base
# See here for allowed values: https://github.com/jmettraux/rufus-scheduler#the-time-strings-understood-by-rufus-scheduler
config :timeframe, :validate => :string, :default => "1m"
# How many events to queue before forcing a call to the CloudWatch API ahead of "timeframe" schedule
# Set this to the number of events-per-timeframe you will be sending to CloudWatch to avoid extra API calls
config :queue_size, :validate => :number, :default => 10000
# The default namespace to use for events which do not have a "CW_namespace" field
config :namespace, :validate => :string, :default => "Logstash"
# The name of the field used to set the metric name on an event
config :field_metric, :validate => :string, :default => "CW_metric"
# The name of the field used to set a different namespace per event
config :field_namespace, :validate => :string, :default => "CW_namespace"
# The name of the field used to set the units on an event metric
# The default metric name to use for events which do not have a "CW_metricname" field.
# If this is provided then all events which pass through this output will be aggregated and
# sent to CloudWatch, so use this carefully. Furthermore, when providing this option, you
# will probably want to also restrict events from passing through this output using event
# type, tag, and field matching
#
# At a minimum events must have a "metric name" to be sent to CloudWatch. This can be achieved
# either by providing a default here, as described above, OR by adding a "CW_metricname" field
# to the events themselves, as described below. By default, if no other configuration is
# provided besides a metric name, then events will be counted (Unit: Count, Value: 1)
# by their metric name (either this default or from their CW_metricname field)
config :metricname, :validate => :string
# The name of the field used to set the metric name on an event
config :field_metricname, :validate => :string, :default => "CW_metricname"
VALID_UNITS = ["Seconds", "Microseconds", "Milliseconds", "Bytes",
"Kilobytes", "Megabytes", "Gigabytes", "Terabytes",
"Bits", "Kilobits", "Megabits", "Gigabits", "Terabits",
"Percent", COUNT_UNIT, "Bytes/Second", "Kilobytes/Second",
"Megabytes/Second", "Gigabytes/Second", "Terabytes/Second",
"Bits/Second", "Kilobits/Second", "Megabits/Second",
"Gigabits/Second", "Terabits/Second", "Count/Second", NONE]
# The default unit to use for events which do not have a "CW_unit" field
config :unit, :validate => VALID_UNITS, :default => COUNT_UNIT
# The name of the field used to set the unit on an event metric
config :field_unit, :validate => :string, :default => "CW_unit"
# The default value to use for events which do not have a "CW_value" field
# If provided, this must be a string which can be converted to a float, for example...
# "1", "2.34", ".5", and "0.67"
config :value, :validate => :string, :default => "1"
# The name of the field used to set the value (float) on an event metric
config :field_value, :validate => :string, :default => "CW_value"
# The name of the field used to set the dimension name on an event metric
config :field_dimensionname, :validate => :string, :default => "CW_dimensionName"
# The default dimensions [ name, value, ... ] to use for events which do not have a "CW_dimensions" field
config :dimensions, :validate => :hash
# The name of the field used to set the dimension value on an event metric
config :field_dimensionvalue, :validate => :string, :default => "CW_dimensionValue"
# aggregate_key members
DIM_NAME = "dimensionName"
DIM_VALUE = "dimensionValue"
TIMESTAMP = "timestamp"
METRIC = "metric"
COUNT = "count"
UNIT = "unit"
SUM = "sum"
MIN = "min"
MAX = "max"
# Units
COUNT_UNIT = "Count"
NONE = "None"
# The name of the field used to set the dimensions on an event metric
# this field named here, if present in an event, must have an array of
# one or more key & value pairs, for example...
# add_field => [ "CW_dimensions", "Environment", "CW_dimensions", "prod" ]
# or, equivalently...
# add_field => [ "CW_dimensions", "Environment" ]
# add_field => [ "CW_dimensions", "prod" ]
config :field_dimensions, :validate => :string, :default => "CW_dimensions"
public
def register
require "thread"
require "rufus/scheduler"
require "aws"
AWS.config(
:access_key_id => @access_key,
:secret_access_key => @secret_key,
@ -88,15 +126,13 @@ class LogStash::Outputs::CloudWatch < LogStash::Outputs::Base
)
@cw = AWS::CloudWatch.new
@valid_units = ["Seconds", "Microseconds", "Milliseconds", "Bytes", "Kilobytes", "Megabytes", "Gigabytes", "Terabytes", "Bits", "Kilobits", "Megabits", "Gigabits", "Terabits", "Percent", COUNT_UNIT, "Bytes/Second", "Kilobytes/Second", "Megabytes/Second", "Gigabytes/Second", "Terabytes/Second", "Bits/Second", "Kilobits/Second", "Megabits/Second", "Gigabits/Second", "Terabits/Second", "Count/Second", NONE]
@event_queue = Queue.new
@event_queue = SizedQueue.new(@queue_size)
@scheduler = Rufus::Scheduler.start_new
@job = @scheduler.every @timeframe do
@logger.info("Scheduler Activated")
send(aggregate({}))
publish(aggregate({}))
end
end
end # def register
public
def receive(event)
@ -110,18 +146,23 @@ class LogStash::Outputs::CloudWatch < LogStash::Outputs::Base
return
end
return unless event.fields.member?(@field_metric)
return unless (event[@field_metricname] || @metricname)
if (@event_queue.length >= @event_queue.max)
@job.trigger
@logger.warn("Posted to AWS CloudWatch ahead of schedule. If you see this often, consider increasing the cloudwatch queue_size option.")
end
@logger.info("Queueing event", :event => event)
@event_queue << event
end # def receive
private
def send(aggregates)
aggregates.each { |namespace, data|
def publish(aggregates)
aggregates.each do |namespace, data|
@logger.info("Namespace, data: ", :namespace => namespace, :data => data)
metric_data = []
data.each { |aggregate_key, stats|
data.each do |aggregate_key, stats|
new_data = {
:metric_name => aggregate_key[METRIC],
:timestamp => aggregate_key[TIMESTAMP],
@ -133,36 +174,36 @@ class LogStash::Outputs::CloudWatch < LogStash::Outputs::Base
:maximum => stats[MAX],
}
}
if (aggregate_key[DIM_NAME] != nil && aggregate_key[DIM_VALUE] != nil)
new_data[:dimensions] = [{
:name => aggregate_key[DIM_NAME],
:value => aggregate_key[DIM_VALUE]
}]
dims = aggregate_key[DIMENSIONS]
if (dims.is_a?(Array) && dims.length > 0 && (dims.length % 2) == 0)
new_data[:dimensions] = Array.new
i = 0
while (i < dims.length)
new_data[:dimensions] << {:name => dims[i], :value => dims[i+1]}
i += 2
end
end
metric_data << new_data
} # data.each
end # data.each
begin
response = @cw.put_metric_data(
:namespace => namespace,
:metric_data => metric_data
)
@logger.info("Sent data to AWS CloudWatch OK")
@logger.info("Sent data to AWS CloudWatch OK", :namespace => namespace, :metric_data => metric_data)
rescue Exception => e
@logger.warn("Failed to send to AWS CloudWatch", :exception => e, :namespace => namespace, :metric_data => metric_data)
break
end
} # aggregates.each
end # aggregates.each
return aggregates
end
# def send
end # def publish
private
def aggregate(aggregates)
@logger.info("QUEUE SIZE ", :queuesize => @event_queue.size)
until @event_queue.empty? do
while !@event_queue.empty? do
begin
count(aggregates, @event_queue.pop(true))
rescue Exception => e
@ -171,52 +212,67 @@ class LogStash::Outputs::CloudWatch < LogStash::Outputs::Base
end
end
return aggregates
end
end # def aggregate
private
def count(aggregates, event)
# If the event doesn't declare a namespace, use the default
fnamespace = field(event, @field_namespace)
namespace = (fnamespace ? fnamespace : event.sprintf(@namespace))
# If the event doesn't declare a namespace, use the default
ns = field(event, @field_namespace)
namespace = (!ns) ? @namespace : ns
funit = field(event, @field_unit)
unit = (funit ? funit : event.sprintf(@unit))
unit = field(event, @field_unit)
value = field(event, @field_value)
fvalue = field(event, @field_value)
value = (fvalue ? fvalue : event.sprintf(@value))
# If neither Units nor Value is set, then we simply count the event
if (!unit && !value)
unit = COUNT
value = "1"
# We may get to this point with valid Units but missing value. Send zeros.
val = (!value) ? 0.0 : value.to_f
# Event provides exactly one (but not both) of value or unit
if ( (fvalue == nil) ^ (funit == nil) )
@logger.warn("Likely config error: event has one of #{@field_value} or #{@field_unit} fields but not both.", :event => event)
end
# If Units is still not set (or is invalid), then we know Value must BE set, so set Units to "None"
# And warn about misconfiguration
if (!unit || !@valid_units.include?(unit))
# If Unit is still not set or is invalid, warn about misconfiguration & use NONE
if (!VALID_UNITS.include?(unit))
unit = NONE
@logger.warn("Possible config error: CloudWatch Value found with invalid or missing Units")
@logger.warn("Likely config error: invalid or missing Units (#{unit.to_s}), using '#{NONE}' instead", :event => event)
end
if (!aggregates[namespace])
aggregates[namespace] = {}
@logger.info("INITIALIZING NAMESPACE DATA")
end
dims = event[@field_dimensions]
if (dims) # event provides dimensions
# validate the structure
if (!dims.is_a?(Array) || dims.length == 0 || (dims.length % 2) != 0)
@logger.warn("Likely config error: CloudWatch dimensions field (#{dims.to_s}) found which is not a positive- & even-length array. Ignoring it.", :event => event)
dims = nil
end
# Best case, we get here and exit the conditional because dims...
# - is an array
# - with positive length
# - and an even number of elements
elsif (@dimensions.is_a?(Hash)) # event did not provide dimensions, but the output has been configured with a default
dims = @dimensions.flatten.map{|d| event.sprintf(d)} # into the kind of array described just above
else
dims = nil
end
fmetric = field(event, @field_metricname)
aggregate_key = {
METRIC => field(event, @field_metric),
DIM_NAME => field(event, @field_dimensionname),
DIM_VALUE => field(event, @field_dimensionvalue),
METRIC => (fmetric ? fmetric : event.sprintf(@metricname)),
DIMENSIONS => dims,
UNIT => unit,
TIMESTAMP => normalizeTimestamp(event.timestamp)
TIMESTAMP => event.sprintf("%{+YYYY-MM-dd'T'HH:mm:00Z}")
}
if (!aggregates[namespace][aggregate_key])
aggregates[namespace][aggregate_key] = {}
end
# We may get to this point with valid Units but missing value. Send zeros.
val = (!value) ? 0.0 : value.to_f
if (!aggregates[namespace][aggregate_key][MAX] || val > aggregates[namespace][aggregate_key][MAX])
aggregates[namespace][aggregate_key][MAX] = val
end
@ -236,20 +292,19 @@ class LogStash::Outputs::CloudWatch < LogStash::Outputs::Base
else
aggregates[namespace][aggregate_key][SUM] += val
end
end
# Zeros out the seconds in a ISO8601 timestamp like event.timestamp
public
def normalizeTimestamp(time)
tz = (time[-1, 1] == "Z") ? "Z" : time[-5, 5]
totheminute = time[0..16]
normal = totheminute + "00.000" + tz
return normal
end
end # def count
private
def field(event, fieldname)
return event.fields.member?(fieldname) ? event.fields[fieldname][0] : nil
end
if !event[fieldname]
return nil
else
if event[fieldname].is_a?(Array)
return event[fieldname][0]
else
return event[fieldname]
end
end
end # def field
end # class LogStash::Outputs::CloudWatch
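A hedged sketch tying the settings above together: a filter stamps matching events with the CW_metricname and CW_dimensions fields (the values shown are illustrative), and the output aggregates and ships them each timeframe (AWS credentials omitted):

    filter {
      mutate {
        type => "apache"   # illustrative event type
        add_field => [ "CW_metricname", "http_errors",
                       "CW_dimensions", "Environment",
                       "CW_dimensions", "prod" ]
      }
    }
    output {
      cloudwatch {
        namespace => "AppLogs"   # illustrative namespace
        timeframe => "1m"
      }
    }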

View file

@ -5,14 +5,20 @@ require "logstash/outputs/base"
# output for logstash. If you plan on using the logstash web interface, you'll
# need to use this output.
#
# *NOTE*: The elasticsearch client is version 0.19.8. Your elasticsearch
# cluster must be running 0.19.x for API compatibility.
# *VERSION NOTE*: Your elasticsearch cluster must be running elasticsearch
# %ELASTICSEARCH_VERSION%. If you use any other version of elasticsearch,
# you should consider using the [elasticsearch_http](elasticsearch_http)
# output instead.
#
# If you want to set other elasticsearch options that are not exposed directly
# as config options, there are two options:
#
# * create an elasticsearch.yml file in the $PWD of the logstash process
# * pass in es.* java properties (java -Des.node.foo= or ruby -J-Des.node.foo=)
#
# This plugin will join your elasticsearch cluster, so it will show up in
# elasticsearch's cluster health status.
#
# You can learn more about elasticsearch at <http://elasticsearch.org>
class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
@ -31,6 +37,9 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
# similar events to the same 'type'. String expansion '%{foo}' works here.
config :index_type, :validate => :string, :default => "%{@type}"
# The document ID for the index. Overwrites any existing entry in elasticsearch with the same ID.
config :id, :validate => :string, :default => nil
# The name of your cluster if you set it on the ElasticSearch side. Useful
# for discovery.
config :cluster, :validate => :string
@ -154,7 +163,13 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
end
end
req = @client.index(index, type, event.to_hash)
if id.nil?
req = @client.index(index, type, event.to_hash)
else
id = event.sprintf(@id)
req = @client.index(index, type, id, event.to_hash)
end
increment_inflight_request_count
#timer = @logger.time("elasticsearch write")
req.on(:success) do |response|
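A hedged example of the new 'id' setting; %{request_id} is a hypothetical event field used as the document ID:

    output {
      elasticsearch {
        cluster => "my-cluster"   # illustrative
        id => "%{request_id}"     # hypothetical field; overwrites docs with the same ID
      }
    }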

View file

@ -3,8 +3,9 @@ require "logstash/outputs/base"
# This output lets you store logs in elasticsearch.
#
# This output differs from the 'elasticsearch' output by using the HTTP
# interface for indexing data with elasticsearch.
# This plugin uses the HTTP/REST interface to ElasticSearch, which usually
# lets you use any version of the elasticsearch server. It is known to work
# with elasticsearch %ELASTICSEARCH_VERSION%.
#
# You can learn more about elasticsearch at <http://elasticsearch.org>
class LogStash::Outputs::ElasticSearchHTTP < LogStash::Outputs::Base

View file

@ -57,9 +57,13 @@ class LogStash::Outputs::ElasticSearchRiver < LogStash::Outputs::Base
# AMQP vhost
config :vhost, :validate => :string, :default => "/"
# AMQP queue name
config :name, :validate => :string, :default => "elasticsearch"
# AMQP queue name. Deprecated due to conflicts with the puppet naming convention.
# Replaced by 'queue' variable. See LOGSTASH-755
config :name, :validate => :string, :deprecated => true
# AMQP queue name
config :queue, :validate => :string, :default => "elasticsearch"
# AMQP exchange name
config :exchange, :validate => :string, :default => "elasticsearch"
@ -78,6 +82,14 @@ class LogStash::Outputs::ElasticSearchRiver < LogStash::Outputs::Base
public
def register
if @name
if @queue
@logger.error("'name' and 'queue' are the same setting, but 'name' is deprecated. Please use only 'queue'")
end
@queue = @name
end
# TODO(sissel): find a better way of declaring where the elasticsearch
# libraries are
# TODO(sissel): can skip this step if we're running from a jar.
@ -126,7 +138,7 @@ class LogStash::Outputs::ElasticSearchRiver < LogStash::Outputs::Base
"user" => @user,
"pass" => @password,
"vhost" => @vhost,
"queue" => @name,
"queue" => @queue,
"exchange" => @exchange,
"routing_key" => @key,
"exchange_type" => @exchange_type,

View file

@ -29,7 +29,7 @@ class LogStash::Outputs::Gelf < LogStash::Outputs::Base
# useful if you want to parse the 'log level' from an event and use that
# as the gelf level/severity.
#
# Values here can be integers [0..7] inclusive or any of
# Values here can be integers [0..7] inclusive or any of
# "debug", "info", "warn", "error", "fatal", "unknown" (case insensitive).
# Single-character versions of these are also valid, "d", "i", "w", "e", "f",
# "u"
@ -88,9 +88,9 @@ class LogStash::Outputs::Gelf < LogStash::Outputs::Base
# If we leave that set, the gelf gem will extract the file and line number
# of the source file that logged the message (i.e. logstash/gelf.rb:138).
# With that set to false, it can use the actual event's filename (i.e.
# With that set to false, it can use the actual event's filename (i.e.
# /var/log/syslog), which is much more useful
@gelf.collect_file_and_line = false
@gelf.collect_file_and_line = false
# these are syslog words and abbreviations mapped to RFC 5424 integers
@level_map = {
@ -162,10 +162,10 @@ class LogStash::Outputs::Gelf < LogStash::Outputs::Base
if @level.is_a?(Array)
@level.each do |value|
parsed_value = event.sprintf(value)
if parsed_value
level = parsed_value
break
end
next if value.count('%{') > 0 and parsed_value == value
level = parsed_value
break
end
else
level = event.sprintf(@level.to_s)

View file

@ -16,7 +16,10 @@ class LogStash::Outputs::Gemfire < LogStash::Outputs::Base
plugin_status "experimental"
# Your client cache name
config :name, :validate => :string, :default => "logstash"
config :name, :validate => :string, :deprecated => true
# Your client cache name
config :cache_name, :validate => :string, :default => "logstash"
# The path to a GemFire client cache XML file.
#
@ -40,6 +43,14 @@ class LogStash::Outputs::Gemfire < LogStash::Outputs::Base
# A sprintf format to use when building keys
config :key_format, :validate => :string, :default => "%{@source}-%{@timestamp}"
if @name
if @cache_name
@logger.error("'name' and 'cache_name' are the same setting, but 'name' is deprecated. Please use only 'cache_name'")
end
@cache_name = @name
end
public
def register
import com.gemstone.gemfire.cache.client.ClientCacheFactory
@ -52,10 +63,10 @@ class LogStash::Outputs::Gemfire < LogStash::Outputs::Base
public
def connect
begin
@logger.debug("Connecting to GemFire #{@name}")
@logger.debug("Connecting to GemFire #{@cache_name}")
@cache = ClientCacheFactory.new.
set("name", @name).
set("name", @cache_name).
set("cache-xml-file", @cache_xml_file).create
@logger.debug("Created cache #{@cache.inspect}")
@ -90,7 +101,7 @@ class LogStash::Outputs::Gemfire < LogStash::Outputs::Base
public
def to_s
return "gemfire://#{name}"
return "gemfire://#{cache_name}"
end
public

View file

@ -28,6 +28,9 @@ class LogStash::Outputs::Graphite < LogStash::Outputs::Base
# coerced will be set to zero (0)
config :metrics, :validate => :hash, :required => true
# Enable debug output
config :debug, :validate => :boolean, :default => false
def register
connect
end # def register
@ -52,8 +55,13 @@ class LogStash::Outputs::Graphite < LogStash::Outputs::Base
# Catch exceptions like ECONNRESET and friends, reconnect on failure.
@metrics.each do |metric, value|
@logger.debug("processing", :metric => metric, :value => value)
message = [event.sprintf(metric), event.sprintf(value).to_f,
event.sprintf("%{+%s}")].join(" ")
@logger.debug("Sending carbon message", :message => message, :host => @host, :port => @port)
# TODO(sissel): Test error cases. Catch exceptions. Find fortune and glory.
begin
@socket.puts(message)

View file

@ -101,12 +101,16 @@ class LogStash::Outputs::Http < LogStash::Outputs::Base
else
request.body = encode(evt)
end
puts request
puts
puts request.body
#puts "#{request.port} / #{request.protocol}"
#puts request
#puts
#puts request.body
response = @agent.execute(request)
puts response
response.read_body { |c| puts c }
# Consume body to let this connection be reused
rbody = ""
response.read_body { |c| rbody << c }
#puts rbody
rescue Exception => e
@logger.warn("Unhandled exception", :request => request, :response => response, :exception => e, :stacktrace => e.backtrace)
end

View file

@ -24,6 +24,9 @@ class LogStash::Outputs::Irc < LogStash::Outputs::Base
# IRC Real name
config :real, :validate => :string, :default => "logstash"
# IRC server password
config :password, :validate => :password
# Channels to broadcast to
config :channels, :validate => :array, :required => true
@ -45,8 +48,6 @@ class LogStash::Outputs::Irc < LogStash::Outputs::Base
c.user = @user
c.realname = @real
c.channels = @channels
c.channels = @channels
c.channels = @channels
c.password = @password
end
Thread.new(@bot) do |bot|
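A hedged example with the now-optional server password; the host and channel values are illustrative:

    output {
      irc {
        host => "irc.example.com"
        channels => [ "#logstash" ]
        password => "secret"
      }
    }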

View file

@ -0,0 +1,139 @@
require "logstash/outputs/base"
require "logstash/namespace"
require "date"
# Send events to a syslog server.
#
# You can send messages compliant with RFC3164 or RFC5424.
# Both UDP and TCP syslog transports are supported.
class LogStash::Outputs::Syslog < LogStash::Outputs::Base
config_name "syslog"
plugin_status "experimental"
FACILITY_LABELS = [
"kernel",
"user-level",
"mail",
"daemon",
"security/authorization",
"syslogd",
"line printer",
"network news",
"uucp",
"clock",
"security/authorization",
"ftp",
"ntp",
"log audit",
"log alert",
"clock",
"local0",
"local1",
"local2",
"local3",
"local4",
"local5",
"local6",
"local7",
]
SEVERITY_LABELS = [
"emergency",
"alert",
"critical",
"error",
"warning",
"notice",
"informational",
"debug",
]
# syslog server address to connect to
config :host, :validate => :string, :required => true
# syslog server port to connect to
config :port, :validate => :number, :required => true
# syslog server protocol. You can choose between udp and tcp
config :protocol, :validate => ["tcp", "udp"], :default => "udp"
# facility label for syslog message
config :facility, :validate => FACILITY_LABELS, :required => true
# severity label for syslog message
config :severity, :validate => SEVERITY_LABELS, :required => true
# source host for syslog message
config :sourcehost, :validate => :string, :default => "%{@source_host}"
# timestamp for syslog message
config :timestamp, :validate => :string, :default => "%{@timestamp}"
# application name for syslog message
config :appname, :validate => :string, :default => "LOGSTASH"
# process id for syslog message
config :procid, :validate => :string, :default => "-"
# message id for syslog message
config :msgid, :validate => :string, :default => "-"
# syslog message format: you can choose between rfc3164 and rfc5424
config :rfc, :validate => ["rfc3164", "rfc5424"], :default => "rfc3164"
public
def register
@client_socket = nil
end
private
def udp?
@protocol == "udp"
end
private
def rfc3164?
@rfc == "rfc3164"
end
private
def connect
if udp?
@client_socket = UDPSocket.new
@client_socket.connect(@host, @port)
else
@client_socket = TCPSocket.new(@host, @port)
end
end
public
def receive(event)
return unless output?(event)
sourcehost = event.sprintf(@sourcehost)
facility_code = FACILITY_LABELS.index(@facility)
severity_code = SEVERITY_LABELS.index(@severity)
priority = (facility_code * 8) + severity_code
if rfc3164?
timestamp = DateTime.iso8601(event.sprintf(@timestamp)).strftime("%b %e %H:%M:%S")
syslog_msg = "<"+priority.to_s()+">"+timestamp+" "+sourcehost+" "+@appname+"["+@procid+"]: "+event.message
else
timestamp = DateTime.iso8601(event.sprintf(@timestamp)).rfc3339()
syslog_msg = "<"+priority.to_s()+">1 "+timestamp+" "+sourcehost+" "+@appname+" "+@procid+" "+@msgid+" - "+event.message
end
begin
connect unless @client_socket
@client_socket.write(syslog_msg + "\n")
rescue => e
@logger.warn(@protocol+" output exception", :host => @host, :port => @port,
:exception => e, :backtrace => e.backtrace)
@client_socket.close
end
end
end
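A hedged example of the new syslog output; the endpoint and the facility/severity choices are illustrative:

    output {
      syslog {
        host => "syslog.example.com"
        port => 514
        facility => "daemon"
        severity => "error"
        rfc => "rfc5424"   # or "rfc3164" (the default)
      }
    }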

View file

@ -26,6 +26,14 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base
# `client` connects to a server.
config :mode, :validate => ["server", "client"], :default => "client"
# The format to use when writing events to the socket. This value
# supports any string and can include %{name} and other dynamic
# strings.
#
# If this setting is omitted, the full json representation of the
# event will be written as a single line.
config :message_format, :validate => :string
class Client
public
def initialize(socket, logger)
@ -89,19 +97,22 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base
def receive(event)
return unless output?(event)
wire_event = event.to_hash.to_json + "\n"
if @message_format
output = event.sprintf(@message_format) + "\n"
else
output = event.to_hash.to_json + "\n"
end
if server?
@client_threads.each do |client_thread|
client_thread[:client].write(wire_event)
client_thread[:client].write(output)
end
@client_threads.reject! {|t| !t.alive? }
else
begin
connect unless @client_socket
@client_socket.write(event.to_hash.to_json)
@client_socket.write("\n")
@client_socket.write(output)
rescue => e
@logger.warn("tcp output exception", :host => @host, :port => @port,
:exception => e, :backtrace => e.backtrace)
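A hedged example of the new 'message_format' setting; without it the full JSON event is written, one per line. The host and format values are illustrative:

    output {
      tcp {
        host => "collector.example.com"
        port => 9999
        mode => "client"
        message_format => "%{@timestamp} %{@source_host} %{@message}"
      }
    }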

View file

@ -16,6 +16,41 @@ require "logstash/namespace"
require "logstash/program"
require "logstash/util"
if ENV["PROFILE_BAD_LOG_CALLS"]
# Set PROFILE_BAD_LOG_CALLS=1 in your environment if you want
# to track down logger calls that cause performance problems
#
# Related research here:
# https://github.com/jordansissel/experiments/tree/master/ruby/logger-string-vs-block
#
# Basically, the following wastes tons of effort creating objects that are
# never used if the log level hides the log:
#
# logger.debug("something happend", :what => Happened)
#
# This is shown to be 4x faster:
#
# logger.debug(...) if logger.debug?
#
# I originally intended to use RubyParser and SexpProcessor to
# process all the logstash ruby code offline, but it was much
# faster to write this monkeypatch to warn as things are called.
require "cabin/mixins/logger"
module Cabin::Mixins::Logger
LEVELS.keys.each do |level|
m = "original_#{level}".to_sym
predicate = "#{level}?".to_sym
alias_method m, level
define_method(level) do |*args|
if !send(predicate)
warn("Unconditional log call", :location => caller[0])
end
send(m, *args)
end
end
end
end
class LogStash::Runner
include LogStash::Program
@ -129,6 +164,9 @@ class LogStash::Runner
return @result
end
end
$: << File.expand_path("#{File.dirname(__FILE__)}/../../spec")
require "test_utils"
rspec = runner.new(fixedargs)
rspec.run
@runners << rspec

View file

@ -8,7 +8,7 @@ require "logstash/namespace"
# >> LogStash::Time.now.utc.to_iso8601
# => "2010-10-17 07:25:26.788704Z"
module LogStash::Time
if RUBY_ENGINE == "jruby"
if defined?(RUBY_ENGINE) && RUBY_ENGINE == "jruby"
require "java"
DateTime = org.joda.time.DateTime
DateTimeZone = org.joda.time.DateTimeZone

View file

@ -12,7 +12,7 @@
%i
You can click on any search result to see what kind of fields we know about
for that event. You can also click on the graph to zoom to that time period.
The query language is that of Lucene's string query (<a href="http://lucene.apache.org/java/3_4_0/queryparsersyntax.html">docs</a>).
The query language is that of Lucene's string query (<a href="http://lucene.apache.org/core/3_6_1/queryparsersyntax.html">docs</a>).
#visual

View file

@ -15,6 +15,7 @@ Gem::Specification.new do |gem|
lib/logstash/namespace.rb
lib/logstash/time.rb
lib/logstash/version.rb
spec/event.rb
LICENSE
}
@ -22,4 +23,7 @@ Gem::Specification.new do |gem|
gem.name = "logstash-event"
gem.require_paths = ["lib"]
gem.version = LOGSTASH_VERSION
gem.add_development_dependency "rspec"
gem.add_development_dependency "insist", "0.0.8"
end

View file

@ -57,6 +57,8 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency "jls-lumberjack", ["0.0.7"]
gem.add_runtime_dependency "geoip", [">= 1.1.0"]
gem.add_runtime_dependency "beefcake", "0.3.7"
gem.add_runtime_dependency "php-serialize" # For input drupal_dblog
gem.add_runtime_dependency "murmurhash3"
gem.add_runtime_dependency "rufus-scheduler"
if RUBY_PLATFORM == 'java'
@ -65,8 +67,10 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency "jruby-httpclient"
gem.add_runtime_dependency "jruby-openssl"
gem.add_runtime_dependency "jruby-win32ole"
gem.add_runtime_dependency "jdbc-mysql" # For input drupal_dblog
else
gem.add_runtime_dependency "excon"
gem.add_runtime_dependency "mysql2" # For input drupal_dblog
end
if RUBY_VERSION >= '1.9.1'

View file

@ -68,7 +68,7 @@ SECOND (?:(?:[0-5][0-9]|60)(?:[.,][0-9]+)?)
TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9])
# datestamp is YYYY/MM/DD-HH:MM:SS.UUUU (or something like it)
DATE_US %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR}
DATE_EU %{YEAR}[/-]%{MONTHNUM}[/-]%{MONTHDAY}
DATE_EU %{YEAR}[./-]%{MONTHNUM}[./-]%{MONTHDAY}
ISO8601_TIMEZONE (?:Z|[+-]%{HOUR}(?::?%{MINUTE}))
ISO8601_SECOND (?:%{SECOND}|60)
TIMESTAMP_ISO8601 %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}?
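With the '.' separator added above, DATE_EU now also matches dotted dates, e.g. 2012.12.27, in addition to 2012/12/27 and 2012-12-27.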

View file

@ -1,2 +1,2 @@
RUBY_LOGLEVEL (?:DEBUG|FATAL|ERROR|WARN|INFO)
RUBY_LOGGER [DFEWI], \[%{TIMESTAMP_ISO8601} #{POSINT:pid}\] *%{RUBY_LOGLEVEL} -- %{DATA:progname}: %{DATA:message}
RUBY_LOGGER [DFEWI], \[%{TIMESTAMP_ISO8601:timestamp} #%{POSINT:pid}\] *%{RUBY_LOGLEVEL:loglevel} -- *%{DATA:progname}: %{GREEDYDATA:message}
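An illustrative line in Ruby's default Logger format that the updated pattern is meant to capture (timestamp, pid, loglevel, progname, message):

    I, [2012-12-27T23:18:31.000000 #12345]  INFO -- myapp: hello world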

View file

@ -0,0 +1,63 @@
require "test_utils"
describe "apache common log format" do
extend LogStash::RSpec
# The logstash config goes here.
# At this time, only filters are supported.
config_yaml <<-CONFIG
filter:
- grok:
pattern: "%{COMBINEDAPACHELOG}"
singles: true
- date:
timestamp: "dd/MMM/yyyy:HH:mm:ss Z"
CONFIG
# Here we provide a sample log event for the testing suite.
#
# Any filters you define above will be applied the same way the logstash
# agent performs. Inside the 'sample ... ' block the 'subject' will be
# a LogStash::Event object for you to inspect and verify for correctness.
sample '198.151.8.4 - - [29/Aug/2012:20:17:38 -0400] "GET /favicon.ico HTTP/1.1" 200 3638 "-" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:14.0) Gecko/20100101 Firefox/14.0.1"' do
# These 'insist' and 'reject' calls use my 'insist' rubygem.
# See http://rubydoc.info/gems/insist for more info.
# Require that grok does not fail to parse this event.
reject { subject["@tags"] }.include?("_grokparsefailure")
# Ensure that grok captures certain expected fields.
insist { subject }.include?("agent")
insist { subject }.include?("bytes")
insist { subject }.include?("clientip")
insist { subject }.include?("httpversion")
insist { subject }.include?("timestamp")
insist { subject }.include?("verb")
insist { subject }.include?("response")
insist { subject }.include?("request")
# Ensure that those fields match expected values from the event.
insist { subject["clientip"] } == "198.151.8.4"
insist { subject["timestamp"] } == "29/Aug/2012:20:17:38 -0400"
insist { subject["verb"] } == "GET"
insist { subject["request"] } == "/favicon.ico"
insist { subject["httpversion"] } == "1.1"
insist { subject["response"] } == "200"
insist { subject["bytes"] } == "3638"
insist { subject["referrer"] } == '"-"'
insist { subject["agent"] } == "\"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:14.0) Gecko/20100101 Firefox/14.0.1\""
# Verify date parsing
insist { subject.timestamp } == "2012-08-30T00:17:38.000Z"
end
sample '61.135.248.195 - - [26/Sep/2012:11:49:20 -0400] "GET /projects/keynav/ HTTP/1.1" 200 18985 "" "Mozilla/5.0 (compatible; YodaoBot/1.0; http://www.yodao.com/help/webmaster/spider/; )"' do
reject { subject["@tags"] }.include?("_grokparsefailure")
insist { subject["clientip"] } == "61.135.248.195"
end
sample '72.14.164.185 - - [25/Sep/2012:12:05:02 -0400] "GET /robots.txt HTTP/1.1" 200 - "www.brandimensions.com" "BDFetch"' do
reject { subject["@tags"] }.include?("_grokparsefailure")
end
end

spec/filters/anonymize.rb Normal file
View file

@ -0,0 +1,185 @@
require "test_utils"
require "logstash/filters/anonymize"
describe LogStash::Filters::Anonymize do
extend LogStash::RSpec
describe "anonymize string with SHA alogrithm" do
# The logstash config goes here.
# At this time, only filters are supported.
config <<-CONFIG
filter {
anonymize {
fields => ["clientip"]
key => "longencryptionkey"
algorithm => 'SHA'
}
}
CONFIG
sample "@fields" => {"clientip" => "123.123.123.123"} do
insist { subject["clientip"] } == "0d01b2191194d261fa1a2e7c18a38d44953ab4e2"
end
end
describe "anonymize ipaddress with IPV4_NETWORK algorithm" do
# The logstash config goes here.
# At this time, only filters are supported.
config <<-CONFIG
filter {
anonymize {
fields => ["clientip"]
algorithm => "IPV4_NETWORK"
key => 24
}
}
CONFIG
sample "@fields" => {"clientip" => "233.255.13.44"} do
insist { subject["clientip"] } == "233.255.13.0"
end
end
describe "anonymize string with MURMUR3 algorithm" do
config <<-CONFIG
filter {
anonymize {
fields => ["clientip"]
algorithm => "MURMUR3"
key => ""
}
}
CONFIG
sample "@fields" => {"clientip" => "123.52.122.33"} do
insist { subject["clientip"] } == 1541804874
end
end
describe "anonymize string with SHA1 alogrithm" do
# The logstash config goes here.
# At this time, only filters are supported.
config <<-CONFIG
filter {
anonymize {
fields => ["clientip"]
key => "longencryptionkey"
algorithm => 'SHA1'
}
}
CONFIG
sample "@fields" => {"clientip" => "123.123.123.123"} do
insist { subject["clientip"] } == "fdc60acc4773dc5ac569ffb78fcb93c9630797f4"
end
end
describe "anonymize string with SHA224 alogrithm" do
# The logstash config goes here.
# At this time, only filters are supported.
config <<-CONFIG
filter {
anonymize {
fields => ["clientip"]
key => "longencryptionkey"
algorithm => 'SHA224'
}
}
CONFIG
sample "@fields" => {"clientip" => "123.123.123.123"} do
insist { subject["clientip"] } == "5744bbcc4f64acb6a805b7fee3013a8958cc8782d3fb0fb318cec915"
end
end
describe "anonymize string with SHA256 alogrithm" do
# The logstash config goes here.
# At this time, only filters are supported.
config <<-CONFIG
filter {
anonymize {
fields => ["clientip"]
key => "longencryptionkey"
algorithm => 'SHA256'
}
}
CONFIG
sample "@fields" => {"clientip" => "123.123.123.123"} do
insist { subject["clientip"] } == "345bec3eff242d53b568916c2610b3e393d885d6b96d643f38494fd74bf4a9ca"
end
end
describe "anonymize string with SHA384 alogrithm" do
# The logstash config goes here.
# At this time, only filters are supported.
config <<-CONFIG
filter {
anonymize {
fields => ["clientip"]
key => "longencryptionkey"
algorithm => 'SHA384'
}
}
CONFIG
sample "@fields" => {"clientip" => "123.123.123.123"} do
insist { subject["clientip"] } == "22d4c0e8c4fbcdc4887d2038fca7650f0e2e0e2457ff41c06eb2a980dded6749561c814fe182aff93e2538d18593947a"
end
end
describe "anonymize string with SHA512 alogrithm" do
# The logstash config goes here.
# At this time, only filters are supported.
config <<-CONFIG
filter {
anonymize {
fields => ["clientip"]
key => "longencryptionkey"
algorithm => 'SHA512'
}
}
CONFIG
sample "@fields" => {"clientip" => "123.123.123.123"} do
insist { subject["clientip"] } == "11c19b326936c08d6c50a3c847d883e5a1362e6a64dd55201a25f2c1ac1b673f7d8bf15b8f112a4978276d573275e3b14166e17246f670c2a539401c5bfdace8"
end
end
describe "anonymize string with MD4 alogrithm" do
# The logstash config goes here.
# At this time, only filters are supported.
config <<-CONFIG
filter {
anonymize {
fields => ["clientip"]
key => "longencryptionkey"
algorithm => 'MD4'
}
}
CONFIG
sample "@fields" => {"clientip" => "123.123.123.123"} do
insist { subject["clientip"] } == "0845cb571ab3646e51a07bcabf05e33d"
end
end
describe "anonymize string with MD5 alogrithm" do
# The logstash config goes here.
# At this time, only filters are supported.
config <<-CONFIG
filter {
anonymize {
fields => ["clientip"]
key => "longencryptionkey"
algorithm => 'MD5'
}
}
CONFIG
sample "@fields" => {"clientip" => "123.123.123.123"} do
insist { subject["clientip"] } == "9336c879e305c9604a3843fc3e75948f"
end
end
end

View file

@ -38,6 +38,21 @@ describe LogStash::Filters::CSV do
end
end
describe "custom separator" do
config <<-CONFIG
filter {
csv {
separator => ";"
}
}
CONFIG
sample "big,bird;sesame street" do
insist { subject["field1"] } == "big,bird"
insist { subject["field2"] } == "sesame street"
end
end
describe "parse csv with more data than defined field names" do
config <<-CONFIG
filter {

View file

@ -161,7 +161,28 @@ describe LogStash::Filters::Date do
end
end
describe "accept match config option with hash value like grep (LOGSTASH-735)" do
describe "TAI64N support" do
config <<-'CONFIG'
filter {
date {
t => TAI64N
}
}
CONFIG
# Try without leading "@"
sample({ "@fields" => { "t" => "4000000050d506482dbdf024" } }) do
insist { subject.timestamp } == "2012-12-22T01:00:46.767Z"
end
# Should still parse successfully if it's a full tai64n time (with leading
# '@')
sample({ "@fields" => { "t" => "@4000000050d506482dbdf024" } }) do
insist { subject.timestamp } == "2012-12-22T01:00:46.767Z"
end
end
describe "accept match config option with hash value (LOGSTASH-735)" do
config <<-CONFIG
filter {
date {
@ -179,3 +200,4 @@ describe LogStash::Filters::Date do
end
end
end

View file

@ -24,6 +24,19 @@ describe LogStash::Filters::KV do
end
describe "LOGSTASH-624: allow escaped space in key or value " do
config <<-CONFIG
filter {
kv { value_split => ':' }
}
CONFIG
sample 'IKE:=Quick\ Mode\ completion IKE\ IDs:=subnet:\ x.x.x.x\ (mask=\ 255.255.255.254)\ and\ host:\ y.y.y.y' do
insist { subject["IKE"] } == '=Quick\ Mode\ completion'
insist { subject['IKE\ IDs'] } == '=subnet:\ x.x.x.x\ (mask=\ 255.255.255.254)\ and\ host:\ y.y.y.y'
end
end
describe "test value_split" do
config <<-CONFIG
filter {
@ -61,6 +74,20 @@ describe LogStash::Filters::KV do
end
describe "delimited fields should override space default (reported by LOGSTASH-733)" do
config <<-CONFIG
filter {
kv { field_split => "|" }
}
CONFIG
sample "field1=test|field2=another test|field3=test3" do
insist { subject["field1"] } == "test"
insist { subject["field2"] } == "another test"
insist { subject["field3"] } == "test3"
end
end
describe "test prefix" do
config <<-CONFIG
filter {

View file

@ -133,7 +133,7 @@ describe LogStash::Filters::Mutate do
end
end
describe "regression - check grok+mutate" do
describe "regression - mutate should lowercase a field created by grok" do
config <<-CONFIG
filter {
grok {
@ -149,4 +149,20 @@ describe LogStash::Filters::Mutate do
insist { subject["foo"] } == ['hello']
end
end
describe "LOGSTASH-757: rename should do nothing with a missing field" do
config <<-CONFIG
filter {
mutate {
rename => [ "nosuchfield", "hello" ]
}
}
CONFIG
sample "whatever" do
reject { subject.fields }.include?("nosuchfield")
reject { subject.fields }.include?("hello")
end
end
end

spec/inputs/tcp.rb Normal file
View file

@ -0,0 +1,210 @@
# coding: utf-8
require "test_utils"
require "socket"
# Not sure why, but each test needs a different port
# TODO: timeout around the thread.join
describe "inputs/tcp" do
extend LogStash::RSpec
describe "read json_event" do
event_count = 10
port = 5511
config <<-CONFIG
input {
tcp {
type => "blah"
port => #{port}
format => "json_event"
}
}
CONFIG
th = Thread.current
input do |plugins|
sequence = 0
tcp = plugins.first
output = Shiftback.new do |event|
sequence += 1
tcp.teardown if sequence == event_count
begin
insist { event["sequence"] } == sequence -1
insist { event["message"]} == "Hello ü Û"
insist { event["message"].encoding } == Encoding.find("UTF-8")
rescue Exception => failure
# Propagate the failure out of this thread
th.raise failure
end
end
#Prepare input
tcp.register
#Run input in a separate thread
thread = Thread.new(tcp, output) do |*args|
tcp.run(output)
end
#Send events from clients sockets
event_count.times do |value|
client_socket = TCPSocket.new("0.0.0.0", port)
event = LogStash::Event.new("@fields" => { "message" => "Hello ü Û", "sequence" => value })
client_socket.puts event.to_json
client_socket.close
# micro sleep to ensure sequencing
sleep(0.1)
end
#wait for input termination
thread.join
end # input
end
describe "read plain events with system defaults, should works on UTF-8 system" do
event_count = 10
port = 5512
config <<-CONFIG
input {
tcp {
type => "blah"
port => #{port}
}
}
CONFIG
th = Thread.current
input do |plugins|
sequence = 0
tcp = plugins.first
output = Shiftback.new do |event|
sequence += 1
tcp.teardown if sequence == event_count
begin
insist { event.message } == "Hello ü Û"
insist { event.message.encoding } == Encoding.find("UTF-8")
rescue Exception => failure
# Propagate the failure out of this thread
th.raise failure
end
end
tcp.register
#Run input in a separate thread
thread = Thread.new(tcp, output) do |*args|
tcp.run(output)
end
#Send events from clients sockets
event_count.times do |value|
client_socket = TCPSocket.new("0.0.0.0", port)
client_socket.write "Hello ü Û"
client_socket.close
# micro sleep to ensure sequencing
sleep(0.1)
end
#wait for input termination
thread.join
end # input
end
describe "read plain events with UTF-8 like charset, to prove that something is wrong with previous failing test" do
event_count = 10
port = 5514
config <<-CONFIG
input {
tcp {
type => "blah"
port => #{port}
charset => "CP65001" #that's just an alias of UTF-8
}
}
CONFIG
th = Thread.current
# Catch aborting reception threads
input do |plugins|
sequence = 0
tcp = plugins.first
output = Shiftback.new do |event|
sequence += 1
tcp.teardown if sequence == event_count
begin
insist { event.message } == "Hello ü Û"
insist { event.message.encoding } == Encoding.find("UTF-8")
rescue Exception => failure
# Propagate the failure out of this thread
th.raise failure
end
end
tcp.register
#Run input in a separate thread
thread = Thread.new(tcp, output) do |*args|
tcp.run(output)
end
#Send events from clients sockets
event_count.times do |value|
client_socket = TCPSocket.new("0.0.0.0", port)
# puts "Encoding of client", client_socket.external_encoding, client_socket.internal_encoding
client_socket.write "Hello ü Û"
client_socket.close
# micro sleep to ensure sequencing, TODO must think of a cleaner solution
sleep(0.1)
end
#wait for input termination
#TODO: timeout
thread.join
end # input
end
describe "read plain events with ISO-8859-1 charset" do
event_count = 10
port = 5513
charset = "ISO-8859-1"
config <<-CONFIG
input {
tcp {
type => "blah"
port => #{port}
charset => "#{charset}"
}
}
CONFIG
th = Thread.current
input do |plugins|
sequence = 0
tcp = plugins.first
output = Shiftback.new do |event|
sequence += 1
tcp.teardown if sequence == event_count
begin
insist { event.message } == "Hello ü Û"
insist { event.message.encoding } == Encoding.find("UTF-8")
rescue Exception => failure
# Propagate the failure out of this thread
th.raise failure
end
end
tcp.register
#Run input in a separate thread
thread = Thread.new(tcp, output) do |*args|
tcp.run(output)
end
#Send events from clients sockets
event_count.times do |value|
client_socket = TCPSocket.new("0.0.0.0", port)
#Force client encoding
client_socket.set_encoding(charset)
client_socket.write "Hello ü Û"
client_socket.close
# micro sleep to ensure sequencing
sleep(0.1)
end
#wait for input termination
thread.join
end # input
end
end

View file

@ -1,4 +1,5 @@
require "insist"
require "logstash/agent"
require "logstash/event"
require "insist"
require "stud/try"
@ -18,6 +19,11 @@ module LogStash
@config_str = configstr
end # def config
def config_yaml(configstr)
@config_str = configstr
@is_yaml = true
end
def type(default_type)
@default_type = default_type
end
@ -30,8 +36,7 @@ module LogStash
def sample(event, &block)
default_type = @default_type || "default"
default_tags = @default_tags || []
require "logstash/config/file"
config = LogStash::Config::File.new(nil, @config_str)
config = get_config
agent = LogStash::Agent.new
@inputs, @filters, @outputs = agent.instance_eval { parse_config(config) }
[@inputs, @filters, @outputs].flatten.each do |plugin|
@ -95,8 +100,7 @@ module LogStash
end # def sample
def input(&block)
require "logstash/config/file"
config = LogStash::Config::File.new(nil, @config_str)
config = get_config
agent = LogStash::Agent.new
it "looks good" do
inputs, filters, outputs = agent.instance_eval { parse_config(config) }
@ -104,6 +108,16 @@ module LogStash
end
end # def input
def get_config
if @is_yaml
require "logstash/config/file/yaml"
config = LogStash::Config::File::Yaml.new(nil, @config_str)
else
require "logstash/config/file"
config = LogStash::Config::File.new(nil, @config_str)
end
end # def get_config
def agent(&block)
@agent_count ||= 0
require "logstash/agent"