diff --git a/CHANGELOG b/CHANGELOG index dd2e3f82c..55ac0dba3 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -2,9 +2,9 @@ ## Overview of this release: - grok now captures (?<somename>...) regexp into 'somename' field - new 'charset' feature for inputs (for improved UTF-8 conversion support) - - TODO TODO TODO new faster start-time release jars are available, see the 'flatjar' download option. This flatjar thing may have bugs, so both flatjar and monolithic are available. + - TODO TODO TODO new faster start-time release jars are available, see the + 'flatjar' download option. This flatjar thing may have bugs, so both flatjar + and monolithic are available. ## general - fixed internal dependency versioning on 'addressable' gem (LOGSTASH-694) @@ -12,13 +12,19 @@ ## inputs - All inputs now have a 'charset' setting to help you inform logstash of the text encoding of the input. This is useful if you have Shift_JIS or CP1252 encoded log files. This should help resolve the many UTF-8 bugs that were reported recently. - bugfix: zeromq: 'topology' is now a required setting - - lumberjack: jls-lumberjack gem updated to 0.0.7 + - misc: lumberjack: jls-lumberjack gem updated to 0.0.7 + - bugfix: stomp: fix startup problems causing early termination (#226) ## filters + - new: anonymize: supports many hash mechanisms (murmur3, sha, md5, etc) as + well as IP address anonymization (#280, #261; patches by Richard Pijnenburg + and Avishai Ish-Shalom) + - improvement: date: now accepts 'match' as a setting. Use of this is preferable + to the old syntax. - improvement: grok: now accepts (?<somename>...) named captures. This lets you compose a pattern in the grok config without needing to define it in a patterns file. Example: (?<hostport>%{HOST}:%{POSINT}) to capture 'hostport' @@ -31,8 +37,13 @@ matched. (LOGSTASH-705) - improvement: kv: Adds field_split, value_split, prefix, and container settings. (#225, patch by Alex Wheeler) + - bugfix: mutate: rename on a nonexistent field now does nothing as expected. + (LOGSTASH-757) ## outputs + - new: syslog output supporting both RFC3164 and RFC5424 (#180, patch by + ruckalvnet) + - new: cloudwatch output to emit metrics and other events to Amazon CloudWatch. - bugfix: zeromq: 'topology' is now a required setting - improvement: mongodb: new setting 'isodate', when true, stores the @timestamp field as a mongodb date instead of a string. (#224, patch by Kevin Amorin)
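For reference, the 'match' syntax mentioned above looks like this in a config file (the same example appears in the date filter docs later in this diff):

    filter {
      date {
        match => [ "logdate", "MMM dd YYYY HH:mm:ss" ]
      }
    }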
diff --git a/Makefile b/Makefile index 9aa4c0e35..75d105bb8 100644 --- a/Makefile +++ b/Makefile @@ -5,21 +5,19 @@ # JRUBY_VERSION=1.7.0 ELASTICSEARCH_VERSION=0.19.10 -JODA_VERSION=2.1 #VERSION=$(shell ruby -r./lib/logstash/version -e 'puts LOGSTASH_VERSION') -VERSION=$(shell awk -F\" '/LOGSTASH_VERSION/ {print $$2}' lib/logstash/version.rb ) +VERSION=$(shell awk -F\" '/LOGSTASH_VERSION/ {print $$2}' lib/logstash/version.rb) WITH_JRUBY=java -jar $(shell pwd)/$(JRUBY) -S JRUBY=vendor/jar/jruby-complete-$(JRUBY_VERSION).jar JRUBY_URL=http://repository.codehaus.org/org/jruby/jruby-complete/$(JRUBY_VERSION) JRUBY_CMD=java -jar $(JRUBY) JRUBYC=$(WITH_JRUBY) jrubyc -ELASTICSEARCH_URL=http://github.com/downloads/elasticsearch/elasticsearch +ELASTICSEARCH_URL=http://download.elasticsearch.org/elasticsearch/elasticsearch ELASTICSEARCH=vendor/jar/elasticsearch-$(ELASTICSEARCH_VERSION) -JODA=vendor/jar/joda-time-$(JODA_VERSION)/joda-time-$(JODA_VERSION).jar GEOIP=vendor/geoip/GeoLiteCity.dat GEOIP_URL=http://logstash.objects.dreamhost.com/maxmind/GeoLiteCity-2012-11-09.dat.gz -PLUGIN_FILES=$(shell git ls-files | egrep '^lib/logstash/(inputs|outputs|filters)/' | egrep -v '/(base|threadable).rb$$|/inputs/ganglia/') +PLUGIN_FILES=$(shell git ls-files | egrep '^lib/logstash/(inputs|outputs|filters)/[^/]+$$' | egrep -v '/(base|threadable).rb$$|/inputs/ganglia/') QUIET=@ WGET=$(shell which wget 2>/dev/null) @@ -96,7 +94,7 @@ $(JRUBY): | vendor/jar vendor/jar/elasticsearch-$(ELASTICSEARCH_VERSION).tar.gz: | wget-or-curl vendor/jar @echo "=> Fetching elasticsearch" $(QUIET)$(DOWNLOAD_COMMAND) $@ $(ELASTICSEARCH_URL)/elasticsearch-$(ELASTICSEARCH_VERSION).tar.gz - + vendor/jar/graphtastic-rmiclient.jar: | wget-or-curl vendor/jar @echo "=> Fetching graphtastic rmi client jar" $(QUIET)$(DOWNLOAD_COMMAND) $@ http://cloud.github.com/downloads/NickPadilla/GraphTastic/graphtastic-rmiclient.jar @@ -108,12 +106,6 @@ $(ELASTICSEARCH): $(ELASTICSEARCH).tar.gz | vendor/jar $(QUIET)tar -C $(shell dirname $@) -xf $< $(TAR_OPTS) --exclude '*sigar*' \ 'elasticsearch-$(ELASTICSEARCH_VERSION)/lib/*.jar' -vendor/jar/joda-time-$(JODA_VERSION)-dist.tar.gz: | wget-or-curl vendor/jar - $(DOWNLOAD_COMMAND) $@ "http://downloads.sourceforge.net/project/joda-time/joda-time/$(JODA_VERSION)/joda-time-$(JODA_VERSION)-dist.tar.gz" - -vendor/jar/joda-time-$(JODA_VERSION)/joda-time-$(JODA_VERSION).jar: vendor/jar/joda-time-$(JODA_VERSION)-dist.tar.gz | vendor/jar - tar -C vendor/jar -zxf $< joda-time-$(JODA_VERSION)/joda-time-$(JODA_VERSION).jar - vendor/geoip: | vendor $(QUIET)mkdir $@ @@ -132,7 +124,7 @@ vendor-gems: | vendor/bundle vendor/bundle: | vendor $(JRUBY) @echo "=> Installing gems to $@..." #$(QUIET)GEM_HOME=$(GEM_HOME) $(JRUBY_CMD) --1.9 $(GEM_HOME)/bin/bundle install --deployment - $(QUIET)GEM_HOME=./vendor/bundle/jruby/1.9/ GEM_PATH= $(JRUBY_CMD) --1.9 ./gembag.rb logstash.gemspec + $(QUIET)GEM_HOME=./vendor/bundle/jruby/1.9/ GEM_PATH= $(JRUBY_CMD) --1.9 ./gembag.rb logstash.gemspec @# Purge any junk that fattens our jar without need! @# The riak gem includes previous gems in the 'pkg' dir. :( -rm -rf $@/jruby/1.9/gems/riak-client-1.0.3/pkg @@ -152,7 +144,7 @@ build/ruby: | build # TODO(sissel): Skip sigar? # Run this one always? Hmm..
.PHONY: build/monolith -build/monolith: $(ELASTICSEARCH) $(JRUBY) $(JODA) $(GEOIP) vendor-gems | build +build/monolith: $(ELASTICSEARCH) $(JRUBY) $(GEOIP) vendor-gems | build build/monolith: compile copy-ruby-files vendor/jar/graphtastic-rmiclient.jar -$(QUIET)mkdir -p $@ @# Unpack all the 3rdparty jars and any jars in gems @@ -164,10 +156,6 @@ build/monolith: compile copy-ruby-files vendor/jar/graphtastic-rmiclient.jar $(QUIET)cp -r $$PWD/vendor/bundle/jruby/1.9/gems/jruby-openss*/lib/shared/openssl/* $@/openssl $(QUIET)cp -r $$PWD/vendor/bundle/jruby/1.9/gems/jruby-openss*/lib/shared/jopenssl/* $@/jopenssl $(QUIET)cp -r $$PWD/vendor/bundle/jruby/1.9/gems/jruby-openss*/lib/shared/openssl.rb $@/openssl.rb - @# Make sure joda-time gets unpacked last, so it overwrites the joda jruby - @# ships with. - $(QUIET)find $$PWD/vendor/jar/joda-time-$(JODA_VERSION) -name '*.jar' \ - | (cd $@; xargs -tn1 jar xf) @# Purge any extra files we don't need in META-INF (like manifests and @# signature files) -$(QUIET)rm -f $@/META-INF/*.LIST diff --git a/docs/flags.md b/docs/flags.md index ee14036c1..fc9113426 100644 --- a/docs/flags.md +++ b/docs/flags.md @@ -11,7 +11,16 @@ The logstash agent has the following flags (also try using the '--help' flag)
-f, --config CONFIGFILE
Load the logstash config from a specific file, directory, or a wildcard. If given a directory or wildcard, config files will be read in lexicographic order (see the ordering sketch after this flag list).
-
--log FILE
+
-e CONFIGSTRING
+
Use the given string as the configuration data. Same syntax as the +config file. If no input is specified, 'stdin { type => stdin }' is +default. If no output is specified, 'stdout { debug => true }' is +default.
+
-w, --filterworkers COUNT
+
Run COUNT filter workers (default: 1)
+
--watchdog-timeout TIMEOUT
+
Set the watchdog timeout value in seconds.
+
-l, --log FILE
Log to a given path. Default is to log to stdout
-v
Increase verbosity. There are multiple levels of verbosity available with @@ -26,6 +35,9 @@ name, like --grok-foo. ## Web UI +The logstash web interface has the following flags (also try using the '--help' +flag) +
--log FILE
Log to a given path. Default is stdout.
@@ -33,7 +45,9 @@ name, like --grok-foo.
Address on which to start webserver. Default is 0.0.0.0.
--port PORT
Port on which to start webserver. Default is 9292.
-
--backend URL
+
-B, --elasticsearch-bind-host ADDRESS
+
Address on which to bind the elasticsearch node.
+
-b, --backend URL
The backend URL to use. Default is elasticsearch:/// (assumes multicast discovery). You can specify elasticsearch://[host][:port]/[clustername]
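The '-f' directory and wildcard handling described above maps to a sorted glob in lib/logstash/agent.rb (shown later in this diff). A minimal sketch of the ordering, with hypothetical file names:

    # Files are concatenated in lexicographic order, so numeric prefixes are a
    # handy way to control evaluation order.
    paths = Dir.glob(File.join("/etc/logstash/conf.d", "*")).sort
    # => ["10-input.conf", "20-filter.conf", "30-output.conf"] (for example)
    config_data = paths.map { |path| File.read(path) }.join("\n")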
diff --git a/docs/tutorials/getting-started-centralized.md b/docs/tutorials/getting-started-centralized.md index 0be15a1c3..c54371ed4 100644 --- a/docs/tutorials/getting-started-centralized.md +++ b/docs/tutorials/getting-started-centralized.md @@ -63,7 +63,7 @@ Building and installing Redis is fairly straightforward. While normally this wou - Download Redis from http://redis.io/download (The latest stable release is likely what you want) - Extract the source, change to the directory and run `make` -- Run Redis with `src/redis-server` +- Run Redis with `src/redis-server --loglevel verbose` That's it. @@ -104,6 +104,7 @@ Put this in a file and call it 'shipper.conf' (or anything, really), and run: This will take anything you type into this console and display it on the console. Additionally it will save events to Redis in a `list` named after the `key` value you provided. ### Testing the Redis output + To verify that the message made it into Redis, check your Redis window. You should see something like the following: [83019] 02 Jul 12:51:02 - Accepted 127.0.0.1:58312 @@ -148,8 +149,9 @@ sample config based on the previous section. Save this as `indexer.conf` # these settings should match the output of the agent data_type => "list" key => "logstash" + # We use json_event here since the sender is a logstash agent - message_format => "json_event" + format => "json_event" } }
diff --git a/lib/logstash/agent.rb b/lib/logstash/agent.rb index 6083a68db..65413b733 100644 --- a/lib/logstash/agent.rb +++ b/lib/logstash/agent.rb @@ -1,4 +1,5 @@ require "logstash/config/file" +require "logstash/config/file/yaml" require "logstash/filterworker" require "logstash/logging" require "logstash/sized_queue" @@ -34,6 +35,7 @@ class LogStash::Agent log_to(STDERR) @config_path = nil @config_string = nil + @is_yaml = false @logfile = nil # flag/config defaults @@ -140,7 +142,7 @@ class LogStash::Agent # These are 'unknown' flags that begin ---flag # Put any plugin paths into the ruby library path for requiring later. @plugin_paths.each do |p| - @logger.debug("Adding to ruby load path", :path => p) + @logger.debug? and @logger.debug("Adding to ruby load path", :path => p) $:.unshift p end @@ -163,8 +165,8 @@ class LogStash::Agent %w{inputs outputs filters}.each do |component| @plugin_paths.each do |path| plugin = File.join(path, component, name) + ".rb" - @logger.debug("Plugin flag found; trying to load it", - :flag => arg, :plugin => plugin) + @logger.debug? and @logger.debug("Plugin flag found; trying to load it", + :flag => arg, :plugin => plugin) if File.file?(plugin) @logger.info("Loading plugin", :plugin => plugin) require plugin @@ -173,7 +175,7 @@ class LogStash::Agent # and add any options to our option parser. klass_name = name.capitalize if c.const_defined?(klass_name) - @logger.debug("Found plugin class", :class => "#{c}::#{klass_name})") + @logger.debug? and @logger.debug("Found plugin class", :class => "#{c}::#{klass_name})") klass = c.const_get(klass_name) # See LogStash::Config::Mixin::DSL#options klass.options(@opts) @@ -241,8 +243,8 @@ class LogStash::Agent # Support directory of config files. # https://logstash.jira.com/browse/LOGSTASH-106 if File.directory?(@config_path) - @logger.debug("Config path is a directory, scanning files", - :path => @config_path) + @logger.debug? and @logger.debug("Config path is a directory, scanning files", + :path => @config_path) paths = Dir.glob(File.join(@config_path, "*")).sort else # Get a list of files matching a glob.
If the user specified a single @@ -252,13 +254,25 @@ class LogStash::Agent concatconfig = [] paths.each do |path| - concatconfig << File.new(path).read + file = File.new(path) + if File.extname(file) == '.yaml' + # treat the entire concatenated config as YAML if even one file is .yaml + @is_yaml = true + end + concatconfig << file.read end - config = LogStash::Config::File.new(nil, concatconfig.join("\n")) + config_data = concatconfig.join("\n") else # @config_string # Given a config string by the user (via the '-e' flag) - config = LogStash::Config::File.new(nil, @config_string) + config_data = @config_string end + + if @is_yaml + config = LogStash::Config::File::Yaml.new(nil, config_data) + else + config = LogStash::Config::File.new(nil, config_data) + end + config.logger = @logger config end @@ -332,23 +346,23 @@ class LogStash::Agent private def start_input(input) - @logger.debug("Starting input", :plugin => input) + @logger.debug? and @logger.debug("Starting input", :plugin => input) t = 0 # inputs should write directly to output queue if there are no filters. input_target = @filters.length > 0 ? @filter_queue : @output_queue # check to see if input supports multiple threads if input.threadable - @logger.debug("Threadable input", :plugin => input) + @logger.debug? and @logger.debug("Threadable input", :plugin => input) # start up extra threads if need be (input.threads-1).times do input_thread = input.clone - @logger.debug("Starting thread", :plugin => input, :thread => (t+=1)) + @logger.debug? and @logger.debug("Starting thread", :plugin => input, :thread => (t+=1)) @plugins[input_thread] = Thread.new(input_thread, input_target) do |*args| run_input(*args) end end end - @logger.debug("Starting thread", :plugin => input, :thread => (t+=1)) + @logger.debug? and @logger.debug("Starting thread", :plugin => input, :thread => (t+=1)) @plugins[input] = Thread.new(input, input_target) do |*args| run_input(*args) end @@ -356,7 +370,7 @@ class LogStash::Agent private def start_output(output) - @logger.debug("Starting output", :plugin => output) + @logger.debug? and @logger.debug("Starting output", :plugin => output) queue = LogStash::SizedQueue.new(10 * @filterworker_count) queue.logger = @logger @output_queue.add_queue(queue) @@ -474,7 +488,7 @@ class LogStash::Agent shutdown break end - @logger.debug("heartbeat") + @logger.debug? and @logger.debug("heartbeat") end end # def run_with_config @@ -740,7 +754,7 @@ class LogStash::Agent begin while event = queue.pop do - @logger.debug("Sending event", :target => output) + @logger.debug? and @logger.debug("Sending event", :target => output) output.handle(event) break if output.finished? end @@ -768,8 +782,9 @@ class LogStash::Agent remaining = @plugins.count do |plugin, thread| plugin.is_a?(pluginclass) and plugin.running? and thread.alive? end - @logger.debug("Plugins still running", :type => pluginclass, - :remaining => remaining) + @logger.debug? and @logger.debug("Plugins still running", + :type => pluginclass, + :remaining => remaining) if remaining == 0 @logger.warn("All #{pluginclass} finished. Shutting down.")
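The loader selection added above hinges entirely on file extension; a minimal sketch of the logic (the conf.d file names are hypothetical):

    paths = ["conf.d/10-input.conf", "conf.d/20-filter.yaml"]
    is_yaml = paths.any? { |path| File.extname(path) == ".yaml" }
    # one .yaml file switches the whole concatenated config to the YAML loader
    loader = is_yaml ? LogStash::Config::File::Yaml : LogStash::Config::File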
diff --git a/lib/logstash/config/file.rb b/lib/logstash/config/file.rb index 4a94e1fd3..3a3c0f983 100644 --- a/lib/logstash/config/file.rb +++ b/lib/logstash/config/file.rb @@ -18,18 +18,24 @@ class LogStash::Config::File end end # def initialize + def _get_config_data + if @string.nil? + File.new(@path).read + else + @string + end + end + + def _get_config(data) + grammar = LogStash::Config::Grammar.new + grammar.parse(data) + grammar.config + end + public def parse - grammar = LogStash::Config::Grammar.new + @config = _get_config(_get_config_data) - if @string.nil? - grammar.parse(File.new(@path).read) - else - grammar.parse(@string) - end - - @config = grammar.config - registry = LogStash::Config::Registry::registry each do |o| # Load the base class for the type given (like inputs/base, or filters/base)
diff --git a/lib/logstash/config/file/yaml.rb b/lib/logstash/config/file/yaml.rb new file mode 100755 index 000000000..2786f779e --- /dev/null +++ b/lib/logstash/config/file/yaml.rb @@ -0,0 +1,8 @@ +require "logstash/config/file" +require "yaml" + +class LogStash::Config::File::Yaml < LogStash::Config::File + def _get_config(data) + return YAML.load(data) + end +end
diff --git a/lib/logstash/config/grammar.rb b/lib/logstash/config/grammar.rb old mode 100755 new mode 100644 index 333af8248..3e7181d5a --- a/lib/logstash/config/grammar.rb +++ b/lib/logstash/config/grammar.rb @@ -3,7 +3,7 @@ require "logstash/namespace" -# line 147 "grammar.rl" +# line 150 "grammar.rl" class LogStash::Config::Grammar @@ -248,7 +248,7 @@ end self.logstash_config_en_main = 55; -# line 156 "grammar.rl" +# line 159 "grammar.rl" # END RAGEL DATA @tokenstack = Array.new @@ -275,7 +275,7 @@ begin cs = logstash_config_start end -# line 175 "grammar.rl" +# line 178 "grammar.rl" # END RAGEL INIT begin @@ -469,7 +469,7 @@ when 10 then #puts "Config component: #{name}" end when 12 then -# line 142 "grammar.rl" +# line 145 "grammar.rl" begin # Compute line and column of the cursor (p) @@ -521,11 +521,11 @@ when 10 then #puts "Config component: #{name}" end when 11 then -# line 141 "grammar.rl" +# line 144 "grammar.rl" begin puts "END" end when 12 then -# line 142 "grammar.rl" +# line 145 "grammar.rl" begin # Compute line and column of the cursor (p) @@ -546,7 +546,7 @@ end end end -# line 180 "grammar.rl" +# line 183 "grammar.rl" # END RAGEL EXEC rescue => e # Compute line and column of the cursor (p)
diff --git a/lib/logstash/config/mixin.rb b/lib/logstash/config/mixin.rb index cf8359c66..8cc1d3ff6 100644 --- a/lib/logstash/config/mixin.rb +++ b/lib/logstash/config/mixin.rb @@ -302,23 +302,29 @@ module LogStash::Config::Mixin elsif validator.is_a?(Symbol) # TODO(sissel): Factor this out into a coersion method? # TODO(sissel): Document this stuff. + value = hash_or_array(value) + case validator when :hash - if value.size % 2 == 1 - return false, "This field must contain an even number of items, got #{value.size}" - end + if value.is_a?(Hash) + result = value + else + if value.size % 2 == 1 + return false, "This field must contain an even number of items, got #{value.size}" + end - # Convert the array the config parser produces into a hash. - result = {} - value.each_slice(2) do |key, value| - entry = result[key] - if entry.nil? - result[key] = value - else - if entry.is_a?(Array) - entry << value + # Convert the array the config parser produces into a hash. + result = {} + value.each_slice(2) do |key, value| + entry = result[key] + if entry.nil?
+ result[key] = value else - if entry.is_a?(Array) entry << value else - result[key] = [entry, value] + if entry.is_a?(Array) + entry << value + else + result[key] = [entry, value] + end end end end @@ -342,11 +348,18 @@ module LogStash::Config::Mixin return false, "Expected boolean, got #{value.inspect}" end - if value.first !~ /^(true|false)$/ - return false, "Expected boolean 'true' or 'false', got #{value.first.inspect}" - end + bool_value = value.first + if !!bool_value == bool_value + # is_a does not work for booleans + # we have Boolean and not a string + result = bool_value + else + if bool_value !~ /^(true|false)$/ + return false, "Expected boolean 'true' or 'false', got #{bool_value.inspect}" + end - result = (value.first == "true") + result = (bool_value == "true") + end when :ipaddr if value.size > 1 # only one value wanted return false, "Expected IPaddr, got #{value.inspect}" @@ -376,5 +389,12 @@ module LogStash::Config::Mixin # Return the validator for later use, like with type coercion. return true, result end # def validate_value + + def hash_or_array(value) + if !value.is_a?(Hash) + value = [*value] # coerce scalar to array if necessary + end + return value + end end # module LogStash::Config::DSL end # module LogStash::Config
diff --git a/lib/logstash/event.rb b/lib/logstash/event.rb index c77845468..34c2e923c 100644 --- a/lib/logstash/event.rb +++ b/lib/logstash/event.rb @@ -1,7 +1,7 @@ require "json" require "time" require "date" -require "logstash/time" +require "logstash/time_addon" require "logstash/namespace" require "uri" @@ -16,6 +16,7 @@ class LogStash::Event @cancelled = false @data = { + "@source_host" => false, "@source" => "unknown", "@tags" => [], "@fields" => {}, @@ -24,7 +25,7 @@ class LogStash::Event @data["@timestamp"] ||= LogStash::Time.now end # def initialize - if RUBY_ENGINE == "jruby" + if defined?(RUBY_ENGINE) && RUBY_ENGINE == "jruby" @@date_parser = Java::org.joda.time.format.ISODateTimeFormat.dateTimeParser.withOffsetParsed else # TODO(sissel): LOGSTASH-217 @@ -91,16 +92,16 @@ class LogStash::Event public def source; @data["@source"]; end # def source - def source=(val) + def source=(val) uri = URI.parse(val) rescue nil val = uri if uri if val.is_a?(URI) @data["@source"] = val.to_s - @data["@source_host"] = val.host + @data["@source_host"] = val.host if @data["@source_host"].nil? @data["@source_path"] = val.path else @data["@source"] = val - @data["@source_host"] = val + @data["@source_host"] = val if @data["@source_host"].nil? end end # def source= @@ -124,6 +125,9 @@ class LogStash::Event def tags; @data["@tags"]; end # def tags def tags=(val); @data["@tags"] = val; end # def tags= + def id; @data["@id"]; end # def id + def id=(val); @data["@id"] = val; end # def id= + # field-related access public def [](key) @@ -190,13 +194,13 @@ class LogStash::Event end # event.fields.each end # def append - # Remove a field + # Remove a field. Returns the value of that field when deleted public def remove(field) if @data.has_key?(field) - @data.delete(field) + return @data.delete(field) else - @data["@fields"].delete(field) + return @data["@fields"].delete(field) end end # def remove @@ -230,7 +234,7 @@ class LogStash::Event # Got %{+%s}, support for unix epoch time if RUBY_ENGINE != "jruby" # This is really slow.
See LOGSTASH-217 - Date.parse(self.timestamp).to_i + Time.parse(self.timestamp).to_i else datetime = @@date_parser.parseDateTime(self.timestamp) (datetime.getMillis / 1000).to_i
diff --git a/lib/logstash/filters/anonymize.rb b/lib/logstash/filters/anonymize.rb new file mode 100644 index 000000000..6393f6cf8 --- /dev/null +++ b/lib/logstash/filters/anonymize.rb @@ -0,0 +1,87 @@ +require "logstash/filters/base" +require "logstash/namespace" + +# Anonymize fields by replacing values with a consistent hash. +class LogStash::Filters::Anonymize < LogStash::Filters::Base + config_name "anonymize" + plugin_status "experimental" + + # The fields to be anonymized + config :fields, :validate => :array, :required => true + + # Hashing key + # When using MURMUR3 the key is ignored but must still be set. + # When using IPV4_NETWORK key is the subnet prefix length + config :key, :validate => :string, :required => true + + # digest/hash type + config :algorithm, :validate => ['SHA', 'SHA1', 'SHA224', 'SHA256', 'SHA384', 'SHA512', 'MD4', 'MD5', "MURMUR3", "IPV4_NETWORK"], :required => true, :default => 'SHA1' + + public + def register + # require any library and set the anonymize function + case @algorithm + when "IPV4_NETWORK" + require 'ipaddr' + class << self; alias_method :anonymize, :anonymize_ipv4_network; end + when "MURMUR3" + require "murmurhash3" + class << self; alias_method :anonymize, :anonymize_murmur3; end + else + require 'openssl' + class << self; alias_method :anonymize, :anonymize_openssl; end + end + end # def register + + public + def filter(event) + return unless filter?(event) + @fields.each do |field| + event[field] = anonymize(event[field]) + end + end # def filter + + private + def anonymize_ipv4_network(ip_string) + IPAddr.new(ip_string).mask(@key.to_i).to_s + end + + def anonymize_openssl(data) + digest = algorithm() + OpenSSL::HMAC.hexdigest(digest, @key, data) + end + + def anonymize_murmur3(value) + case value + when Fixnum + MurmurHash3::V32.int_hash(value) + when String + MurmurHash3::V32.str_hash(value) + end + end + + def algorithm + + case @algorithm + when 'SHA' + return OpenSSL::Digest::SHA.new + when 'SHA1' + return OpenSSL::Digest::SHA1.new + when 'SHA224' + return OpenSSL::Digest::SHA224.new + when 'SHA256' + return OpenSSL::Digest::SHA256.new + when 'SHA384' + return OpenSSL::Digest::SHA384.new + when 'SHA512' + return OpenSSL::Digest::SHA512.new + when 'MD4' + return OpenSSL::Digest::MD4.new + when 'MD5' + return OpenSSL::Digest::MD5.new + else + @logger.error("Unknown algorithm") + end + end + +end # class LogStash::Filters::Anonymize
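To make the openssl branch above concrete, this is essentially what the filter computes per field value (key and value here are made up):

    require "openssl"
    digest = OpenSSL::Digest::SHA1.new            # the default 'SHA1' algorithm
    OpenSSL::HMAC.hexdigest(digest, "mykey", "10.1.2.3")
    # the same key and input always produce the same hex digest, so an
    # anonymized field stays stable (and correlatable) across events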
diff --git a/lib/logstash/filters/base.rb b/lib/logstash/filters/base.rb index 524830b7f..f9ff123bf 100644 --- a/lib/logstash/filters/base.rb +++ b/lib/logstash/filters/base.rb @@ -105,12 +105,14 @@ class LogStash::Filters::Base < LogStash::Plugin event[field] = [event[field]] if !event[field].is_a?(Array) event[field] << event.sprintf(value) end - @logger.debug("filters/#{self.class.name}: adding value to field", - :field => field, :value => value) + @logger.debug? and @logger.debug("filters/#{self.class.name}: adding " \ "value to field", :field => field, + :value => value) end (@add_tag or []).each do |tag| - @logger.debug("filters/#{self.class.name}: adding tag", :tag => tag) + @logger.debug? and @logger.debug("filters/#{self.class.name}: adding tag", + :tag => tag) event.tags << event.sprintf(tag) #event.tags |= [ event.sprintf(tag) ] end @@ -119,7 +121,8 @@ class LogStash::Filters::Base < LogStash::Plugin remove_tags = @remove_tag.map do |tag| event.sprintf(tag) end - @logger.debug("filters/#{self.class.name}: removing tags", :tags => (event.tags & remove_tags)) + @logger.debug? and @logger.debug("filters/#{self.class.name}: removing tags", + :tags => (event.tags & remove_tags)) event.tags -= remove_tags end end # def filter_matched
diff --git a/lib/logstash/filters/csv.rb b/lib/logstash/filters/csv.rb index bc878bc09..d3a5da9cd 100644 --- a/lib/logstash/filters/csv.rb +++ b/lib/logstash/filters/csv.rb @@ -23,12 +23,17 @@ class LogStash::Filters::CSV < LogStash::Filters::Base # Optional. config :fields, :validate => :array, :default => [] + # Define the column separator value. If this is not specified the default + # is a comma ',' + # Optional. + config :separator, :validate => :string, :default => "," + public def register @csv = {} @config.each do |field, dest| - next if (RESERVED + ["fields"]).member?(field) + next if (RESERVED + ["fields", "separator"]).member?(field) @csv[field] = dest end @@ -60,7 +65,7 @@ class LogStash::Filters::CSV < LogStash::Filters::Base raw = event[key].first begin - values = CSV.parse_line(raw) + values = CSV.parse_line(raw, {:col_sep => @separator}) data = {} values.each_index do |i| field_name = @fields[i] || "field#{i+1}" @@ -82,3 +87,4 @@ class LogStash::Filters::CSV < LogStash::Filters::Base @logger.debug("Event after csv filter", :event => event) end # def filter end # class LogStash::Filters::Csv +
diff --git a/lib/logstash/filters/date.rb b/lib/logstash/filters/date.rb index b63332301..1e39194cc 100644 --- a/lib/logstash/filters/date.rb +++ b/lib/logstash/filters/date.rb @@ -47,17 +47,47 @@ class LogStash::Filters::Date < LogStash::Filters::Base # 2011-04-19T03:44:01.103Z # * "UNIX" - will parse unix time in seconds since epoch # * "UNIX_MS" - will parse unix time in milliseconds since epoch + # * "TAI64N" - will parse tai64n time values # - # For example, if you have a field 'logdate' and with a value that looks like 'Aug 13 2010 00:03:44' + # For example, if you have a field 'logdate' with a value that looks like + # 'Aug 13 2010 00:03:44' # you would use this configuration: # - # logdate => "MMM dd yyyy HH:mm:ss" + # logdate => "MMM dd YYYY HH:mm:ss" # # [dateformats]: http://download.oracle.com/javase/1.4.2/docs/api/java/text/SimpleDateFormat.html config /[A-Za-z0-9_-]+/, :validate => :array - # An array with field name first, and format patterns following, [ field, formats... ] - # Using this more than once will have unpredictable results, so only use it once per date filter. + # The date formats allowed are anything allowed by Joda-Time (java time + # library), generally: [java.text.SimpleDateFormat][dateformats] + # + # An array with field name first, and format patterns following, [ field, + # formats... ] + # + # If your time field has multiple possible formats, you can do this: + # + # match => [ "logdate", "MMM dd YYYY HH:mm:ss", + # "MMM d YYYY HH:mm:ss", "ISO8601" ] + # + # The above will match a syslog (rfc3164) or iso8601 timestamp. + # + # There are a few special exceptions, the following format literals exist + # to help you save time and ensure correctness of date parsing.
+ # + # * "ISO8601" - should parse any valid ISO8601 timestamp, such as + # 2011-04-19T03:44:01.103Z + # * "UNIX" - will parse unix time in seconds since epoch + # * "UNIX_MS" - will parse unix time in milliseconds since epoch + # * "TAI64N" - will parse tai64n time values + # + # For example, if you have a field 'logdate' with a value that looks like + # 'Aug 13 2010 00:03:44', you would use this configuration: + # + # filter { + # date { + # match => [ "logdate", "MMM dd YYYY HH:mm:ss" ] + # } + # } config :match, :validate => :array, :default => [] # LOGSTASH-34 @@ -130,7 +160,12 @@ class LogStash::Filters::Date < LogStash::Filters::Base when "UNIX_MS" # unix epoch in ms parser = lambda { |date| org.joda.time.Instant.new(date.to_i).toDateTime } when "TAI64N" # TAI64 with nanoseconds, -10000 accounts for leap seconds - parser = lambda { |date| org.joda.time.Instant.new((date[1..15].hex * 1000 - 10000)+(date[16..23].hex/1000000)).toDateTime } + parser = lambda do |date| + # Skip leading "@" if it is present (common in tai64n times) + date = date[1..-1] if date[0, 1] == "@" + + org.joda.time.Instant.new((date[1..15].hex * 1000 - 10000)+(date[16..23].hex/1000000)).toDateTime + end else joda_parser = org.joda.time.format.DateTimeFormat.forPattern(format).withOffsetParsed if (locale != nil)
diff --git a/lib/logstash/filters/grok.rb b/lib/logstash/filters/grok.rb index 04995b74a..2c358c8fb 100644 --- a/lib/logstash/filters/grok.rb +++ b/lib/logstash/filters/grok.rb @@ -104,13 +104,13 @@ class LogStash::Filters::Grok < LogStash::Filters::Base # Have @@patterns_path show first. Last-in pattern definitions win; this # will let folks redefine built-in patterns at runtime. @patterns_dir = @@patterns_path.to_a + @patterns_dir - @logger.info("Grok patterns path", :patterns_dir => @patterns_dir) + @logger.info? and @logger.info("Grok patterns path", :patterns_dir => @patterns_dir) @patterns_dir.each do |path| # Can't read relative paths from jars, try to normalize away '../' while path =~ /file:\/.*\.jar!.*\/\.\.\// # replace /foo/bar/../baz => /foo/baz path = path.gsub(/[^\/]+\/\.\.\//, "") - @logger.debug("In-jar path to read", :path => path) + @logger.debug? and @logger.debug("In-jar path to read", :path => path) end if File.directory?(path) @@ -118,14 +118,14 @@ class LogStash::Filters::Grok < LogStash::Filters::Base end Dir.glob(path).each do |file| - @logger.info("Grok loading patterns from file", :path => file) + @logger.info? and @logger.info("Grok loading patterns from file", :path => file) @patternfiles << file end end @patterns = Hash.new { |h,k| h[k] = [] } - @logger.info("Match data", :match => @match) + @logger.info? and @logger.info("Match data", :match => @match) # TODO(sissel): Hash.merge actually overrides, not merges arrays. # Work around it by implementing our own? @@ -143,9 +143,9 @@ class LogStash::Filters::Grok < LogStash::Filters::Base add_patterns_from_files(@patternfiles, @patterns[field]) end - @logger.info("Grok compile", :field => field, :patterns => patterns) + @logger.info? and @logger.info("Grok compile", :field => field, :patterns => patterns) patterns.each do |pattern| - @logger.debug("regexp: #{@type}/#{field}", :pattern => pattern) + @logger.debug? and @logger.debug("regexp: #{@type}/#{field}", :pattern => pattern) @patterns[field].compile(pattern) end end # @config.each @@ -158,17 +158,17 @@ class LogStash::Filters::Grok < LogStash::Filters::Base # parse it with grok matched = false - @logger.debug("Running grok filter", :event => event); + @logger.debug?
and @logger.debug("Running grok filter", :event => event); done = false @patterns.each do |field, pile| break if done if !event[field] - @logger.debug("Skipping match object, field not present", - :field => field, :event => event) + @logger.debug? and @logger.debug("Skipping match object, field not present", + :field => field, :event => event) next end - @logger.debug("Trying pattern", :pile => pile, :field => field) + @logger.debug? and @logger.debug("Trying pattern", :pile => pile, :field => field) (event[field].is_a?(Array) ? event[field] : [event[field]]).each do |fieldvalue| begin # Coerce all field values to string. This turns arrays, hashes, numbers, etc @@ -197,8 +197,8 @@ class LogStash::Filters::Grok < LogStash::Filters::Base # Permit typing of captures by giving an additional colon and a type, # like: %{FOO:name:int} for int coercion. if type_coerce - @logger.info("Match type coerce: #{type_coerce}") - @logger.info("Patt: #{grok.pattern}") + @logger.info? and @logger.info("Match type coerce: #{type_coerce}") + @logger.info? and @logger.info("Patt: #{grok.pattern}") end case type_coerce @@ -211,13 +211,12 @@ class LogStash::Filters::Grok < LogStash::Filters::Base # Special casing to skip captures that represent the entire log message. if fieldvalue == value and field == "@message" # Skip patterns that match the entire message - @logger.debug("Skipping capture since it matches the whole line.", :field => key) + @logger.debug? and @logger.debug("Skipping capture since it matches the whole line.", :field => key) next end if @named_captures_only && !is_named - @logger.debug("Skipping capture since it is not a named " \ - "capture and named_captures_only is true.", :field => key) + @logger.debug? and @logger.debug("Skipping capture since it is not a named " "capture and named_captures_only is true.", :field => key) next end @@ -253,7 +252,7 @@ class LogStash::Filters::Grok < LogStash::Filters::Base event.tags << "_grokparsefailure" end - @logger.debug("Event now: ", :event => event) + @logger.debug? and @logger.debug("Event now: ", :event => event) end # def filter private @@ -272,8 +271,8 @@ class LogStash::Filters::Grok < LogStash::Filters::Base # the end. I don't know if this is a bug or intentional, but we need # to chomp it. name, pattern = line.chomp.split(/\s+/, 2) - @logger.debug("Adding pattern from file", :name => name, - :pattern => pattern, :path => path) + @logger.debug? and @logger.debug("Adding pattern from file", :name => name, + :pattern => pattern, :path => path) pile.add_pattern(name, pattern) end else diff --git a/lib/logstash/filters/kv.rb b/lib/logstash/filters/kv.rb index d38093691..3fbe0486c 100644 --- a/lib/logstash/filters/kv.rb +++ b/lib/logstash/filters/kv.rb @@ -7,9 +7,9 @@ require "logstash/namespace" # For example, if you have a log message which contains 'ip=1.2.3.4 # error=REFUSED', you can parse those automatically by doing: # -# filter { -# kv { } -# } +# filter { +# kv { } +# } # # And you will get field 'ip' == "1.2.3.4" etc. class LogStash::Filters::KV < LogStash::Filters::Base @@ -33,9 +33,10 @@ class LogStash::Filters::KV < LogStash::Filters::Base # # Example, to split out the args from a string such as # '?pin=12345~0&d=123&e=foo@bar.com&oq=bobo&ss=12345': - # + # + # Default to space character for backward compatibility # filter { kv { field_split => "&?" 
diff --git a/lib/logstash/filters/kv.rb b/lib/logstash/filters/kv.rb index d38093691..3fbe0486c 100644 --- a/lib/logstash/filters/kv.rb +++ b/lib/logstash/filters/kv.rb @@ -7,9 +7,9 @@ require "logstash/namespace" # For example, if you have a log message which contains 'ip=1.2.3.4 # error=REFUSED', you can parse those automatically by doing: # -# filter { -# kv { } -# } +# filter { +# kv { } +# } # # And you will get field 'ip' == "1.2.3.4" etc. class LogStash::Filters::KV < LogStash::Filters::Base @@ -33,9 +33,10 @@ class LogStash::Filters::KV < LogStash::Filters::Base # # Example, to split out the args from a string such as # '?pin=12345~0&d=123&e=foo@bar.com&oq=bobo&ss=12345': - # + # + # The default field_split is a single space, for backward compatibility. # filter { kv { field_split => "&?" } } - config :field_split, :validate => :string, :default => '' + config :field_split, :validate => :string, :default => ' ' # A string of characters to use as delimiters for identifying key-value relations. @@ -77,7 +78,7 @@ class LogStash::Filters::KV < LogStash::Filters::Base when Array; value.each { |v| kv_keys = parse(v, event, kv_keys) } else @logger.warn("kv filter has no support for this type of data", - :type => value.type, :value => value) + :type => value.class, :value => value) end # case value end # If we have any keys, create/append the hash @@ -95,7 +96,7 @@ if !event =~ /[@field_split]/ return kv_keys end - scan_re = Regexp.new("([^ "+@field_split+@value_split+"]+)["+@value_split+"](?:\"([^\""+@field_split+"]+)\"|'([^'"+@field_split+"]+)'|([^ "+@field_split+"]+))") + scan_re = Regexp.new("((?:\\\\ |[^"+@field_split+@value_split+"])+)["+@value_split+"](?:\"([^\"]+)\"|'([^']+)'|((?:\\\\ |[^"+@field_split+"])+))") text.scan(scan_re) do |key, v1, v2, v3| value = v1 || v2 || v3 if !@trim.nil?
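The rewritten scan_re above buys two things the old expression lacked: quoted values may now contain the field separator, and backslash-escaped separators survive inside keys and unquoted values. An illustration with made-up event text (using the new defaults, field_split => ' ' and value_split => '='):

    # 'user=jordan msg="hello world"' now yields
    #   user => "jordan"
    #   msg  => "hello world"
    # whereas the old expression excluded field_split characters even
    # inside quotes, truncating 'msg' at the space.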
diff --git a/lib/logstash/filters/metrics.rb b/lib/logstash/filters/metrics.rb index 7623eb1a3..7d924d307 100644 --- a/lib/logstash/filters/metrics.rb +++ b/lib/logstash/filters/metrics.rb @@ -14,7 +14,8 @@ class LogStash::Filters::Metrics < LogStash::Filters::Base def register require "metriks" - + require "socket" + @metric_meters = Hash.new { |h,k| h[k] = Metriks.meter(k) } @metric_timers = Hash.new { |h,k| h[k] = Metriks.timer(k) } end # def register @@ -33,6 +34,7 @@ class LogStash::Filters::Metrics < LogStash::Filters::Base def flush event = LogStash::Event.new + event.source_host = Socket.gethostname @metric_meters.each do |name, metric| event["#{name}.count"] = metric.count event["#{name}.rate_1m"] = metric.one_minute_rate @@ -42,12 +44,13 @@ class LogStash::Filters::Metrics < LogStash::Filters::Base @metric_timers.each do |name, metric| event["#{name}.count"] = metric.count - event["#{name}.rate_1m"] = metric.one_mintute_rate + event["#{name}.rate_1m"] = metric.one_minute_rate event["#{name}.rate_5m"] = metric.five_minute_rate event["#{name}.rate_15m"] = metric.fifteen_minute_rate event["#{name}.min"] = metric.min event["#{name}.max"] = metric.max event["#{name}.stddev"] = metric.stddev + event["#{name}.mean"] = metric.mean end filter_matched(event)
diff --git a/lib/logstash/filters/mutate.rb b/lib/logstash/filters/mutate.rb index 33ff575be..3c53551a7 100644 --- a/lib/logstash/filters/mutate.rb +++ b/lib/logstash/filters/mutate.rb @@ -63,20 +63,25 @@ class LogStash::Filters::Mutate < LogStash::Filters::Base # Convert a string field by applying a regular expression and a replacement # if the field is not a string, no action will be taken # - # this configuration takes an array consisting of 3 elements per field/substitution + # This configuration takes an array consisting of 3 elements per + # field/substitution. # # be aware of escaping any backslash in the config file # # for example: # - # mutate { - # gsub => [ - # # replace all forward slashes with underscore - # "fieldname", "\\/", "_", - # # replace backslashes, question marks, hashes and minuses with underscore - # "fieldname", "[\\?#-]", "_" - # ] - # } + # filter { + # mutate { + # gsub => [ + # # replace all forward slashes with underscore + # "fieldname", "\\/", "_", + # + # # replace backslashes, question marks, hashes and minuses with + # # underscore + # "fieldname", "[\\?#-]", "_" + # ] + # } + # } # config :gsub, :validate => :array @@ -84,22 +89,58 @@ class LogStash::Filters::Mutate < LogStash::Filters::Base # # Example: # - # mutate { - # uppercase => [ "fieldname" ] - # } - # + # filter { + # mutate { + # uppercase => [ "fieldname" ] + # } + # } config :uppercase, :validate => :array # Convert a string to its lowercase equivalent # # Example: # - # mutate { - # lowercase => [ "fieldname" ] - # } - # + # filter { + # mutate { + # lowercase => [ "fieldname" ] + # } + # } config :lowercase, :validate => :array + # Split a field to an array using a separator character. Only works on string + # fields. + # + # Example: + # + # filter { + # mutate { + # split => ["fieldname", ","] + # } + # } + config :split, :validate => :hash + + # Join an array with a separator character, does nothing on non-array fields + # + # Example: + # + # filter { + # mutate { + # join => ["fieldname", ","] + # } + # } + config :join, :validate => :hash + + # Strip leading and trailing whitespace + # + # Example: + # + # filter { + # mutate { + # strip => ["field1", "field2"] + # } + # } + config :strip, :validate => :array + public def register valid_conversions = %w(string integer float) @@ -139,6 +180,9 @@ class LogStash::Filters::Mutate < LogStash::Filters::Base uppercase(event) if @uppercase lowercase(event) if @lowercase remove(event) if @remove + split(event) if @split + join(event) if @join + strip(event) if @strip filter_matched(event) end # def filter @@ -155,8 +198,8 @@ class LogStash::Filters::Mutate < LogStash::Filters::Base def rename(event) # TODO(sissel): use event.sprintf on the field names? @rename.each do |old, new| - event[new] = event[old] - event.remove(old) + next unless event.include?(old) + event[new] = event.remove(old) end end # def rename @@ -254,4 +297,34 @@ class LogStash::Filters::Mutate < LogStash::Filters::Base end end end # def lowercase + + private + def split(event) + @split.each do |field, separator| + if event[field].is_a?(String) + event[field] = event[field].split(separator) + end + end + end + + private + def join(event) + @join.each do |field, separator| + if event[field].is_a?(Array) + event[field] = event[field].join(separator) + end + end + end + + private + def strip(event) + @strip.each do |field| + if event[field].is_a?(Array) + event[field] = event[field].map{|s| s.strip } + elsif event[field].is_a?(String) + event[field] = event[field].strip + end + end + end + end # class LogStash::Filters::Mutate
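The three new mutate operations above are thin wrappers over core Ruby methods, which is why split and join only touch String and Array fields respectively:

    "1,2,3".split(",")          # => ["1", "2", "3"]   (split)
    ["1", "2", "3"].join(",")   # => "1,2,3"           (join)
    "  padded  ".strip          # => "padded"          (strip)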
diff --git a/lib/logstash/filterworker.rb b/lib/logstash/filterworker.rb index ed9b00ca7..b1f47e9a9 100644 --- a/lib/logstash/filterworker.rb +++ b/lib/logstash/filterworker.rb @@ -58,7 +58,7 @@ class LogStash::FilterWorker < LogStash::Plugin end events.each do |event| - @logger.debug("Pushing flushed events", :event => event) + @logger.debug? and @logger.debug("Pushing flushed events", :event => event) @output_queue.push(event) unless event.cancelled? end end # def flusher @@ -95,14 +95,14 @@ class LogStash::FilterWorker < LogStash::Plugin clear_watchdog end if event.cancelled? - @logger.debug("Event cancelled", :event => event, - :filter => filter.class) + @logger.debug? and @logger.debug("Event cancelled", :event => event, + :filter => filter.class) break end end # @filters.each - @logger.debug("Event finished filtering", :event => event, - :thread => Thread.current[:name]) + @logger.debug? and @logger.debug("Event finished filtering", :event => event, + :thread => Thread.current[:name]) @output_queue.push(event) unless event.cancelled? end # events.each end # def filter
diff --git a/lib/logstash/inputs/amqp.rb b/lib/logstash/inputs/amqp.rb index cd7db790d..346227e9d 100644 --- a/lib/logstash/inputs/amqp.rb +++ b/lib/logstash/inputs/amqp.rb @@ -30,8 +30,12 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Threadable # Your amqp password config :password, :validate => :password, :default => "guest" - # The name of the queue. - config :name, :validate => :string, :default => "" + # The name of the queue. Deprecated due to conflicts with puppet naming convention. + # Replaced by the 'queue' setting. See LOGSTASH-755 + config :name, :validate => :string, :deprecated => true + + # The name of the queue. + config :queue, :validate => :string, :default => "" # The name of the exchange to bind the queue. This is analogous to the 'amqp # output' [config 'name'](../outputs/amqp) @@ -86,6 +90,14 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Threadable public def register + + if @name + if @queue + @logger.error("'name' and 'queue' are the same setting, but 'name' is deprecated. Please use only 'queue'") + end + @queue = @name + end + @logger.info("Registering input #{@url}") require "bunny" # rubygem 'bunny' @vhost ||= "/" @@ -106,12 +118,12 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Threadable amqp_credentials << @user if @user amqp_credentials << ":#{@password}" if @password @amqpurl += amqp_credentials unless amqp_credentials.nil? - @amqpurl += "#{@host}:#{@port}#{@vhost}/#{@name}" + @amqpurl += "#{@host}:#{@port}#{@vhost}/#{@queue}" end # def register def run(queue) begin - @logger.debug("Connecting with AMQP settings #{@amqpsettings.inspect} to set up queue #{@name.inspect}") + @logger.debug("Connecting with AMQP settings #{@amqpsettings.inspect} to set up queue #{@queue.inspect}") @bunny = Bunny.new(@amqpsettings) return if terminating?
@bunny.start @@ -119,15 +131,15 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Threadable @arguments_hash = Hash[*@arguments] - @queue = @bunny.queue(@name, {:durable => @durable, :auto_delete => @auto_delete, :exclusive => @exclusive, :arguments => @arguments_hash }) - @queue.bind(@exchange, :key => @key) + @bunnyqueue = @bunny.queue(@queue, {:durable => @durable, :auto_delete => @auto_delete, :exclusive => @exclusive, :arguments => @arguments_hash }) + @bunnyqueue.bind(@exchange, :key => @key) - @queue.subscribe({:ack => @ack}) do |data| + @bunnyqueue.subscribe({:ack => @ack}) do |data| e = to_event(data[:payload], @amqpurl) if e queue << e end - end # @queue.subscribe + end # @bunnyqueue.subscribe rescue *[Bunny::ConnectionError, Bunny::ServerDownError] => e @logger.error("AMQP connection error, will reconnect: #{e}") @@ -139,8 +151,8 @@ end # def run def teardown - @queue.unsubscribe unless @durable == true - @queue.delete unless @durable == true + @bunnyqueue.unsubscribe unless @durable == true + @bunnyqueue.delete unless @durable == true @bunny.close if @bunny finished end # def teardown
diff --git a/lib/logstash/inputs/base.rb b/lib/logstash/inputs/base.rb index 9ab98b500..37e306822 100644 --- a/lib/logstash/inputs/base.rb +++ b/lib/logstash/inputs/base.rb @@ -107,6 +107,7 @@ class LogStash::Inputs::Base < LogStash::Plugin :source => source, :exception => e, :backtrace => e.backtrace) event.message = raw + event.tags << "_jsonparsefailure" end when "json_event" begin @@ -124,6 +125,7 @@ class LogStash::Inputs::Base < LogStash::Plugin :input => raw, :source => source, :exception => e, :backtrace => e.backtrace) event.message = raw + event.tags << "_jsonparsefailure" end if event.source == "unknown" @@ -141,7 +143,7 @@ class LogStash::Inputs::Base < LogStash::Plugin event[field] << event.sprintf(value) end - logger.debug(["Received new event", {:source => source, :event => event}]) + @logger.debug? and @logger.debug("Received new event", :source => source, :event => event) return event end # def to_event end # class LogStash::Inputs::Base
diff --git a/lib/logstash/inputs/drupal_dblog.rb b/lib/logstash/inputs/drupal_dblog.rb new file mode 100644 index 000000000..5abdba025 --- /dev/null +++ b/lib/logstash/inputs/drupal_dblog.rb @@ -0,0 +1,328 @@ +require "date" +require "logstash/inputs/base" +require "logstash/namespace" + +# Retrieve watchdog log events from a Drupal installation with DBLog enabled. +# The events are pulled out directly from the database. +# The original events are not deleted, and on every consecutive run only new +# events are pulled. +# +# The last watchdog event id that was processed is stored in the Drupal +# variable table with the name "logstash_last_wid". Delete this variable or +# set it to 0 if you want to re-import all events. +# +# More info on DBLog: http://drupal.org/documentation/modules/dblog +# +class LogStash::Inputs::DrupalDblog < LogStash::Inputs::Base + config_name "drupal_dblog" + plugin_status "experimental" + + # Specify all drupal databases that you wish to import from. + # This can be as many as you wish. + # The format is a hash, with a unique site name as the key, and a database + # URL as the value. + # + # Example: + # [ + # "site1", "mysql://user1:password@host1.com/databasename", + # "other_site", "mysql://user2:password@otherhost.com/databasename", + # ...
+ # ] + config :databases, :validate => :hash + + # By default, the event only contains the current user id as a field. + # If you wish to add the username as an additional field, set this to true. + config :add_usernames, :validate => :boolean, :default => false + + # Time between checks in minutes. + config :interval, :validate => :number, :default => 10 + + # The number of log messages that should be fetched with each query. + # Bulk fetching is done to prevent querying huge data sets when lots of + # messages are in the database. + config :bulksize, :validate => :number, :default => 5000 + + # Label this input with a type. + # Types are used mainly for filter activation. + # + # + # If you create an input with type "foobar", then only filters + # which also have type "foobar" will act on them. + # + # The type is also stored as part of the event itself, so you + # can also use the type to search for it in the web interface. + config :type, :validate => :string, :default => 'watchdog' + + public + def initialize(params) + super + @format = "json_event" + end # def initialize + + public + def register + require "php_serialize" + + if RUBY_PLATFORM == 'java' + require "logstash/inputs/drupal_dblog/jdbcconnection" + else + require "mysql2" + end + end # def register + + public + def config_init(params) + super + + dbs = {} + valid = true + + @databases.each do |name, rawUri| + uri = URI(rawUri) + + dbs[name] = { + "site" => name, + "scheme" => uri.scheme, + "host" => uri.host, + "user" => uri.user, + "password" => uri.password, + "database" => uri.path.sub('/', ''), + "port" => uri.port.to_i + } + + if not ( + uri.scheme and not uri.scheme.empty?\ + and uri.host and not uri.host.empty?\ + and uri.user and not uri.user.empty?\ + and uri.password\ + and uri.path and not uri.path.sub('/', '').empty? + ) + @logger.error("Drupal DBLog: Invalid database URI for #{name} : #{rawUri}") + valid = false + end + if not uri.scheme == 'mysql' + @logger.error("Drupal DBLog: Only mysql databases are supported.") + valid = false + end + end + + if not valid + @logger.error("Config validation failed.") + exit 1 + end + + @databases = dbs + end #def config_init + + public + def run(output_queue) + @logger.info("Initializing drupal_dblog") + + loop do + @logger.debug("Drupal DBLog: Starting to fetch new watchdog entries") + start = Time.now.to_i + + @databases.each do |name, db| + @logger.debug("Drupal DBLog: Checking database #{name}") + check_database(output_queue, db) + @logger.info("Drupal DBLog: Retrieved all new watchdog messages from #{name}") + end + + timeTaken = Time.now.to_i - start + @logger.info("Drupal DBLog: Fetched all new watchdog entries in #{timeTaken} seconds") + + # If fetching of all databases took less time than the interval, + # sleep a bit.
+ sleepTime = @interval * 60 - timeTaken + if sleepTime > 0 + @logger.debug("Drupal DBLog: Sleeping for #{sleepTime} seconds") + sleep(sleepTime) + end + end # loop + end # def run + + private + def initialize_client(db) + if db["scheme"] == 'mysql' + + if not db["port"] > 0 + db["port"] = 3306 + end + + if RUBY_PLATFORM == 'java' + @client = LogStash::DrupalDblogJavaMysqlConnection.new( + db["host"], + db["user"], + db["password"], + db["database"], + db["port"] + ) + else + @client = Mysql2::Client.new( + :host => db["host"], + :port => db["port"], + :username => db["user"], + :password => db["password"], + :database => db["database"] + ) + end + end + end # def initialize_client + + private + def check_database(output_queue, db) + + begin + # connect to the MySQL server + initialize_client(db) + rescue Exception => e + @logger.error("Could not connect to database: " + e.message) + return + end #begin + + begin + @sitename = db["site"] + + @usermap = @add_usernames ? get_usermap : nil + + # Retrieve last pulled watchdog entry id + initialLastWid = get_last_wid + lastWid = nil + + + if initialLastWid == false + lastWid = 0 + set_last_wid(0, true) + else + lastWid = initialLastWid + end + + # Fetch new entries, and create the event + while true + results = get_db_rows(lastWid) + if results.length() < 1 + break + end + + @logger.debug("Fetched " + results.length().to_s + " database rows") + + results.each do |row| + event = build_event(row) + if event + output_queue << event + lastWid = row['wid'].to_s + end + end + + set_last_wid(lastWid, false) + end + rescue Exception => e + @logger.error("Error while fetching messages: ", :error => e.message) + end # begin + + # Close connection + @client.close + end # def check_database + + def get_db_rows(lastWid) + query = 'SELECT * from watchdog WHERE wid > ' + lastWid.to_s + " ORDER BY wid asc LIMIT " + @bulksize.to_s + return @client.query(query) + end # def get_db_rows + + private + def update_sitename + if @sitename == "" + result = @client.query('SELECT value FROM variable WHERE name="site_name"') + if result.first() + @sitename = PHP.unserialize(result.first()['value']) + end + end + end # def update_sitename + + private + def get_last_wid + result = @client.query('SELECT value FROM variable WHERE name="logstash_last_wid"') + lastWid = false + + if result.count() > 0 + tmp = result.first()["value"].gsub("i:", "").gsub(";", "") + lastWid = tmp.to_i.to_s == tmp ?
tmp : "0" + end + + return lastWid + end # def get_last_wid + + private + def set_last_wid(wid, insert) + wid = PHP.serialize(wid.to_i) + + # Update last import wid variable + if insert + # Does not exist yet, so insert + @client.query('INSERT INTO variable (name, value) VALUES("logstash_last_wid", "' + wid + '")') + else + @client.query('UPDATE variable SET value="' + wid + '" WHERE name="logstash_last_wid"') + end + end # def set_last_wid + + private + def get_usermap + map = {} + + @client.query("SELECT uid, name FROM users").each do |row| + map[row["uid"]] = row["name"] + end + + map[0] = "guest" + return map + end # def get_usermap + + private + def build_event(row) + # Convert unix timestamp + timestamp = Time.at(row["timestamp"]).to_datetime.iso8601 + + msg = row["message"] + vars = {} + + # Unserialize the variables, and construct the message + if row['variables'] != 'N;' + vars = PHP.unserialize(row["variables"]) + + if vars.is_a?(Hash) + vars.each_pair do |k, v| + if msg.scan(k).length() > 0 + msg = msg.gsub(k.to_s, v.to_s) + else + # If not inside the message, add var as an additional field + row["variable_" + k] = v + end + end + end + end + + row.delete("message") + row.delete("variables") + row.delete("timestamp") + + row["severity"] = row["severity"].to_i + + if @add_usernames and @usermap.has_key?(row["uid"]) + row["user"] = @usermap[row["uid"]] + end + + entry = { + "@timestamp" => timestamp, + "@tags" => [], + "@type" => "watchdog", + "@source" => @sitename, + "@fields" => row, + "@message" => msg + } + + event = to_event(JSON.dump(entry), @sitename) + + return event + end # def build_event + +end # class LogStash::Inputs::DrupalDblog diff --git a/lib/logstash/inputs/drupal_dblog/jdbcconnection.rb b/lib/logstash/inputs/drupal_dblog/jdbcconnection.rb new file mode 100644 index 000000000..85aea3d79 --- /dev/null +++ b/lib/logstash/inputs/drupal_dblog/jdbcconnection.rb @@ -0,0 +1,65 @@ +require "java" +require "rubygems" +require "jdbc/mysql" + +java_import "com.mysql.jdbc.Driver" + +# A JDBC mysql connection class. +# The interface is compatible with the mysql2 API. 
diff --git a/lib/logstash/inputs/drupal_dblog/jdbcconnection.rb b/lib/logstash/inputs/drupal_dblog/jdbcconnection.rb new file mode 100644 index 000000000..85aea3d79 --- /dev/null +++ b/lib/logstash/inputs/drupal_dblog/jdbcconnection.rb @@ -0,0 +1,65 @@ +require "java" +require "rubygems" +require "jdbc/mysql" + +java_import "com.mysql.jdbc.Driver" + +# A JDBC mysql connection class. +# The interface is compatible with the mysql2 API. +class LogStash::DrupalDblogJavaMysqlConnection + + def initialize(host, username, password, database, port = nil) + port ||= 3306 + + address = "jdbc:mysql://#{host}:#{port}/#{database}" + @connection = java.sql.DriverManager.getConnection(address, username, password) + end # def initialize + + def query(sql) + if sql =~ /select/i + return select(sql) + else + return update(sql) + end + end # def query + + def select(sql) + stmt = @connection.createStatement + resultSet = stmt.executeQuery(sql) + + meta = resultSet.getMetaData + column_count = meta.getColumnCount + + rows = [] + + while resultSet.next + res = {} + + (1..column_count).each do |i| + name = meta.getColumnName(i) + case meta.getColumnType(i) + when java.sql.Types::INTEGER + res[name] = resultSet.getInt(name) + else + res[name] = resultSet.getString(name) + end + end + + rows << res + end + + stmt.close + return rows + end # def select + + def update(sql) + stmt = @connection.createStatement + stmt.execute_update(sql) + stmt.close + end # def update + + def close + @connection.close + end # def close + +end # class LogStash::DrupalDblogJavaMysqlConnection
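Since the wrapper above mirrors the mysql2 API, calling code stays identical on both platforms. A hypothetical direct use (JRuby only; credentials are made up):

    conn = LogStash::DrupalDblogJavaMysqlConnection.new(
      "localhost", "user", "secret", "drupal")     # hypothetical credentials
    conn.query("SELECT wid, message FROM watchdog LIMIT 5").each do |row|
      puts row["message"]                          # rows come back as hashes
    end
    conn.close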
+ raise ArgumentError, "Missing required parameter 'logfile' for input/eventlog" + end + @hostname = Socket.gethostname - @logger.info("Registering input eventlog://#{@hostname}/#{@name}") + @logger.info("Registering input eventlog://#{@hostname}/#{@logfile}") require "win32ole" # rubygem 'win32ole' ('jruby-win32ole' on JRuby) end # def register @@ -43,7 +64,7 @@ class LogStash::Inputs::EventLog < LogStash::Inputs::Base newest_shipped_event = latest_record_number next_newest_shipped_event = newest_shipped_event begin - @logger.debug("Tailing Windows Event Log '#{@name}'") + @logger.debug("Tailing Windows Event Log '#{@logfile}'") loop do event_index = 0 latest_events.each do |event| @@ -51,7 +72,7 @@ class LogStash::Inputs::EventLog < LogStash::Inputs::Base timestamp = DateTime.strptime(event.TimeGenerated, "%Y%m%d%H%M%S").iso8601 timestamp[19..-1] = DateTime.now.iso8601[19..-1] # Copy over the correct TZ offset e = LogStash::Event.new({ - "@source" => "eventlog://#{@hostname}/#{@name}", + "@source" => "eventlog://#{@hostname}/#{@logfile}", "@type" => @type, "@timestamp" => timestamp }) @@ -81,7 +102,7 @@ class LogStash::Inputs::EventLog < LogStash::Inputs::Base private def latest_events - wmi_query = "select * from Win32_NTLogEvent where Logfile = '#{@name}'" + wmi_query = "select * from Win32_NTLogEvent where Logfile = '#{@logfile}'" events = @wmi.ExecQuery(wmi_query) end # def latest_events diff --git a/lib/logstash/inputs/gemfire.rb b/lib/logstash/inputs/gemfire.rb index b2230b055..2b38c4040 100644 --- a/lib/logstash/inputs/gemfire.rb +++ b/lib/logstash/inputs/gemfire.rb @@ -17,7 +17,10 @@ class LogStash::Inputs::Gemfire < LogStash::Inputs::Threadable plugin_status "experimental" # Your client cache name - config :name, :validate => :string, :default => "logstash" + config :name, :validate => :string, :deprecated => true + + # Your client cache name + config :cache_name, :validate => :string, :default => "logstash" # The path to a GemFire client cache XML file. # @@ -51,6 +54,13 @@ class LogStash::Inputs::Gemfire < LogStash::Inputs::Threadable # How the message is serialized in the cache. Can be one of "json" or "plain"; default is plain config :serialization, :validate => :string, :default => nil + if @name + if @cache_name + @logger.error("'name' and 'cache_name' are the same setting, but 'name' is deprecated. Please use only 'cache_name'") + end + @cache_name = @name + end + public def initialize(params) super @@ -97,10 +107,10 @@ class LogStash::Inputs::Gemfire < LogStash::Inputs::Threadable protected def connect begin - @logger.debug("Connecting to GemFire #{@name}") + @logger.debug("Connecting to GemFire #{@cache_name}") @cache = ClientCacheFactory.new. - set("name", @name). + set("name", @cache_name). 
set("cache-xml-file", @cache_xml_file).create @logger.debug("Created cache #{@cache.inspect}") diff --git a/lib/logstash/inputs/heroku.rb b/lib/logstash/inputs/heroku.rb index 802a3c629..2da8571e9 100644 --- a/lib/logstash/inputs/heroku.rb +++ b/lib/logstash/inputs/heroku.rb @@ -8,10 +8,10 @@ require "logstash/namespace" # # Recommended filters: # -# filter { -# grok { -# pattern => "^%{TIMESTAMP_ISO8601:timestamp} %{WORD:component}\[%{WORD:process}(?:\.%{INT:instance:int})?\]: %{DATA:message}$" -# } +# filter { +# grok { +# pattern => "^%{TIMESTAMP_ISO8601:timestamp} %{WORD:component}\[%{WORD:process}(?:\.%{INT:instance:int})?\]: %{DATA:message}$" +# } # date { timestamp => ISO8601 } # } class LogStash::Inputs::Heroku < LogStash::Inputs::Base diff --git a/lib/logstash/inputs/irc.rb b/lib/logstash/inputs/irc.rb index 431d1da1e..9cdaeac75 100644 --- a/lib/logstash/inputs/irc.rb +++ b/lib/logstash/inputs/irc.rb @@ -25,7 +25,7 @@ class LogStash::Inputs::Irc < LogStash::Inputs::Base config :real, :validate => :string, :default => "logstash" # IRC Server password - config :password, :validate => :password, :default => nil + config :password, :validate => :password # Channels to listen to config :channels, :validate => :array, :required => true diff --git a/lib/logstash/inputs/stomp.rb b/lib/logstash/inputs/stomp.rb index a15626399..6c13dbef5 100644 --- a/lib/logstash/inputs/stomp.rb +++ b/lib/logstash/inputs/stomp.rb @@ -58,6 +58,13 @@ class LogStash::Inputs::Stomp < LogStash::Inputs::Base e = to_event(msg.body, @stomp_url) @output_queue << e if e end + #In the event that there is only Stomp input plugin instances + #the process ends prematurely. The above code runs, and return + #the flow control to the 'run' method below. After that, the + #method "run_input" from agent.rb marks 'done' as 'true' and calls + #'finish' over the Stomp plugin instance. + #'Sleeping' the plugin leves the instance alive. + sleep end public diff --git a/lib/logstash/inputs/tcp.rb b/lib/logstash/inputs/tcp.rb index 7f63b0f8a..a94f84fcd 100644 --- a/lib/logstash/inputs/tcp.rb +++ b/lib/logstash/inputs/tcp.rb @@ -67,10 +67,10 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base end # loop do rescue => e @logger.debug("Closing connection", :client => socket.peer, - :exception => e, :backtrace => e.backtrace) + :exception => e, :backtrace => e.backtrace) rescue Timeout::Error @logger.debug("Closing connection after read timeout", - :client => socket.peer) + :client => socket.peer) end # begin begin @@ -95,15 +95,26 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base if server? loop do # Start a new thread for each connection. - Thread.start(@server_socket.accept) do |s| - # TODO(sissel): put this block in its own method. + begin + Thread.start(@server_socket.accept) do |s| + # TODO(sissel): put this block in its own method. - # monkeypatch a 'peer' method onto the socket. - s.instance_eval { class << self; include ::LogStash::Util::SocketPeer end } - @logger.debug("Accepted connection", :client => s.peer, - :server => "#{@host}:#{@port}") - handle_socket(s, output_queue, "tcp://#{@host}:#{@port}/client/#{s.peer}") - end # Thread.start + # monkeypatch a 'peer' method onto the socket. 
+            s.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
+            @logger.debug("Accepted connection", :client => s.peer,
+                          :server => "#{@host}:#{@port}")
+            handle_socket(s, output_queue, "tcp://#{@host}:#{@port}/client/#{s.peer}")
+
+          end # Thread.start
+        rescue IOError
+          if @interrupted
+            # Intended shutdown, get out of the loop
+            break
+          else
+            # Otherwise it was a genuine IOError caused by something else, so propagate it up.
+            raise
+          end
+        end
      end # loop
    else
      loop do
@@ -114,4 +125,12 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
       end # loop
     end
   end # def run
+
+  public
+  def teardown
+    if server?
+      @interrupted = true
+      @server_socket.close
+    end
+  end # def teardown
 end # class LogStash::Inputs::Tcp
diff --git a/lib/logstash/outputs/amqp.rb b/lib/logstash/outputs/amqp.rb
index bd1ff2f11..676d62ea8 100644
--- a/lib/logstash/outputs/amqp.rb
+++ b/lib/logstash/outputs/amqp.rb
@@ -28,8 +28,12 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
   # The exchange type (fanout, topic, direct)
   config :exchange_type, :validate => [ "fanout", "direct", "topic"], :required => true
 
+  # The name of the exchange. Deprecated due to conflicts with puppet naming convention.
+  # Replaced by 'exchange' variable. See LOGSTASH-755
+  config :name, :validate => :string, :deprecated => true
+
   # The name of the exchange
-  config :name, :validate => :string, :required => true
+  config :exchange, :validate => :string # TODO(sissel): Make it required when 'name' is gone
 
   # Key to route to by default. Defaults to 'logstash'
   #
@@ -59,6 +63,13 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
   def register
     require "bunny" # rubygem 'bunny'
 
+    if @name
+      if @exchange
+        @logger.error("'name' and 'exchange' are the same setting, but 'name' is deprecated.
Please use only 'exchange'") + end + @exchange = @name + end + @logger.info("Registering output", :plugin => self) connect end # def register @@ -78,7 +89,7 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base begin @logger.debug("Connecting to AMQP", :settings => amqpsettings, - :exchange_type => @exchange_type, :name => @name) + :exchange_type => @exchange_type, :name => @exchange) @bunny = Bunny.new(amqpsettings) @bunny.start rescue => e @@ -92,11 +103,11 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base end end - @logger.debug("Declaring exchange", :name => @name, :type => @exchange_type, + @logger.debug("Declaring exchange", :name => @exchange, :type => @exchange_type, :durable => @durable) - @exchange = @bunny.exchange(@name, :type => @exchange_type.to_sym, :durable => @durable) + @bunnyexchange = @bunny.exchange(@exchange, :type => @exchange_type.to_sym, :durable => @durable) - @logger.debug("Binding exchange", :name => @name, :key => @key) + @logger.debug("Binding exchange", :name => @exchange, :key => @key) end # def connect public @@ -118,9 +129,9 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base public def receive_raw(message, key=@key) begin - if @exchange + if @bunnyexchange @logger.debug(["Publishing message", { :destination => to_s, :message => message, :key => key }]) - @exchange.publish(message, :persistent => @persistent, :key => key) + @bunnyexchange.publish(message, :persistent => @persistent, :key => key) else @logger.warn("Tried to send message, but not connected to amqp yet.") end @@ -133,14 +144,14 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base public def to_s - return "amqp://#{@user}@#{@host}:#{@port}#{@vhost}/#{@exchange_type}/#{@name}\##{@key}" + return "amqp://#{@user}@#{@host}:#{@port}#{@vhost}/#{@exchange_type}/#{@exchange}\##{@key}" end public def teardown @bunny.close rescue nil @bunny = nil - @exchange = nil + @bunnyexchange = nil finished end # def teardown end # class LogStash::Outputs::Amqp diff --git a/lib/logstash/outputs/base.rb b/lib/logstash/outputs/base.rb index 37bab342b..f993036c4 100644 --- a/lib/logstash/outputs/base.rb +++ b/lib/logstash/outputs/base.rb @@ -59,28 +59,28 @@ class LogStash::Outputs::Base < LogStash::Plugin def output?(event) if !@type.empty? if event.type != @type - @logger.debug(["Dropping event because type doesn't match #{@type}", event]) + @logger.debug? and @logger.debug(["Dropping event because type doesn't match #{@type}", event]) return false end end if !@tags.empty? if (event.tags & @tags).size != @tags.size - @logger.debug(["Dropping event because tags don't match #{@tags.inspect}", event]) + @logger.debug? and @logger.debug(["Dropping event because tags don't match #{@tags.inspect}", event]) return false end end if !@exclude_tags.empty? if (diff_tags = (event.tags & @exclude_tags)).size != 0 - @logger.debug(["Dropping event because tags contains excluded tags: #{diff_tags.inspect}", event]) + @logger.debug? and @logger.debug(["Dropping event because tags contains excluded tags: #{diff_tags.inspect}", event]) return false end end if !@fields.empty? if (event.fields.keys & @fields).size != @fields.size - @logger.debug(["Dropping event because type doesn't match #{@fields.inspect}", event]) + @logger.debug? 
and @logger.debug(["Dropping event because type doesn't match #{@fields.inspect}", event]) return false end end diff --git a/lib/logstash/outputs/cloudwatch.rb b/lib/logstash/outputs/cloudwatch.rb index 8d42d9624..fe5b03dc7 100644 --- a/lib/logstash/outputs/cloudwatch.rb +++ b/lib/logstash/outputs/cloudwatch.rb @@ -1,10 +1,6 @@ require "logstash/outputs/base" require "logstash/namespace" -require "thread" -require "rufus/scheduler" -require "aws" - # This output lets you aggregate and send metric data to AWS CloudWatch # # Configuration is done partly in this output and partly using fields added @@ -24,6 +20,20 @@ class LogStash::Outputs::CloudWatch < LogStash::Outputs::Base config_name "cloudwatch" plugin_status "experimental" + # Constants + # aggregate_key members + DIMENSIONS = "dimensions" + TIMESTAMP = "timestamp" + METRIC = "metric" + COUNT = "count" + UNIT = "unit" + SUM = "sum" + MIN = "min" + MAX = "max" + # Units + COUNT_UNIT = "Count" + NONE = "None" + # The AWS Region to send logs to. config :region, :validate => :string, :default => "us-east-1" @@ -43,44 +53,72 @@ class LogStash::Outputs::CloudWatch < LogStash::Outputs::Base # See here for allowed values: https://github.com/jmettraux/rufus-scheduler#the-time-strings-understood-by-rufus-scheduler config :timeframe, :validate => :string, :default => "1m" + # How many events to queue before forcing a call to the CloudWatch API ahead of "timeframe" schedule + # Set this to the number of events-per-timeframe you will be sending to CloudWatch to avoid extra API calls + config :queue_size, :validate => :number, :default => 10000 + # The default namespace to use for events which do not have a "CW_namespace" field config :namespace, :validate => :string, :default => "Logstash" - # The name of the field used to set the metric name on an event - config :field_metric, :validate => :string, :default => "CW_metric" - # The name of the field used to set a different namespace per event config :field_namespace, :validate => :string, :default => "CW_namespace" - # The name of the field used to set the units on an event metric + # The default metric name to use for events which do not have a "CW_metricname" field. + # If this is provided then all events which pass through this output will be aggregated and + # sent to CloudWatch, so use this carefully. Furthermore, when providing this option, you + # will probably want to also restrict events from passing through this output using event + # type, tag, and field matching + # + # At a minimum events must have a "metric name" to be sent to CloudWatch. This can be achieved + # either by providing a default here, as described above, OR by adding a "CW_metricname" field + # to the events themselves, as described below. 
By default, if no other configuration is + # provided besides a metric name, then events will be counted (Unit: Count, Value: 1) + # by their metric name (either this default or from their CW_metricname field) + config :metricname, :validate => :string + + # The name of the field used to set the metric name on an event + config :field_metricname, :validate => :string, :default => "CW_metricname" + + VALID_UNITS = ["Seconds", "Microseconds", "Milliseconds", "Bytes", + "Kilobytes", "Megabytes", "Gigabytes", "Terabytes", + "Bits", "Kilobits", "Megabits", "Gigabits", "Terabits", + "Percent", COUNT_UNIT, "Bytes/Second", "Kilobytes/Second", + "Megabytes/Second", "Gigabytes/Second", "Terabytes/Second", + "Bits/Second", "Kilobits/Second", "Megabits/Second", + "Gigabits/Second", "Terabits/Second", "Count/Second", NONE] + + # The default unit to use for events which do not have a "CW_unit" field + config :unit, :validate => VALID_UNITS, :default => COUNT_UNIT + + # The name of the field used to set the unit on an event metric config :field_unit, :validate => :string, :default => "CW_unit" + # The default value to use for events which do not have a "CW_value" field + # If provided, this must be a string which can be converted to a float, for example... + # "1", "2.34", ".5", and "0.67" + config :value, :validate => :string, :default => "1" + # The name of the field used to set the value (float) on an event metric config :field_value, :validate => :string, :default => "CW_value" - # The name of the field used to set the dimension name on an event metric - config :field_dimensionname, :validate => :string, :default => "CW_dimensionName" + # The default dimensions [ name, value, ... ] to use for events which do not have a "CW_dimensions" field + config :dimensions, :validate => :hash - # The name of the field used to set the dimension value on an event metric - config :field_dimensionvalue, :validate => :string, :default => "CW_dimensionValue" - - # aggregate_key members - DIM_NAME = "dimensionName" - DIM_VALUE = "dimensionValue" - TIMESTAMP = "timestamp" - METRIC = "metric" - COUNT = "count" - UNIT = "unit" - SUM = "sum" - MIN = "min" - MAX = "max" - - # Units - COUNT_UNIT = "Count" - NONE = "None" + # The name of the field used to set the dimensions on an event metric + # this field named here, if present in an event, must have an array of + # one or more key & value pairs, for example... + # add_field => [ "CW_dimensions", "Environment", "CW_dimensions", "prod" ] + # or, equivalently... 
+ # add_field => [ "CW_dimensions", "Environment" ] + # add_field => [ "CW_dimensions", "prod" ] + config :field_dimensions, :validate => :string, :default => "CW_dimensions" public def register + require "thread" + require "rufus/scheduler" + require "aws" + AWS.config( :access_key_id => @access_key, :secret_access_key => @secret_key, @@ -88,15 +126,13 @@ class LogStash::Outputs::CloudWatch < LogStash::Outputs::Base ) @cw = AWS::CloudWatch.new - @valid_units = ["Seconds", "Microseconds", "Milliseconds", "Bytes", "Kilobytes", "Megabytes", "Gigabytes", "Terabytes", "Bits", "Kilobits", "Megabits", "Gigabits", "Terabits", "Percent", COUNT_UNIT, "Bytes/Second", "Kilobytes/Second", "Megabytes/Second", "Gigabytes/Second", "Terabytes/Second", "Bits/Second", "Kilobits/Second", "Megabits/Second", "Gigabits/Second", "Terabits/Second", "Count/Second", NONE] - - @event_queue = Queue.new + @event_queue = SizedQueue.new(@queue_size) @scheduler = Rufus::Scheduler.start_new @job = @scheduler.every @timeframe do @logger.info("Scheduler Activated") - send(aggregate({})) + publish(aggregate({})) end - end + end # def register public def receive(event) @@ -110,18 +146,23 @@ class LogStash::Outputs::CloudWatch < LogStash::Outputs::Base return end - return unless event.fields.member?(@field_metric) + return unless (event[@field_metricname] || @metricname) + + if (@event_queue.length >= @event_queue.max) + @job.trigger + @logger.warn("Posted to AWS CloudWatch ahead of schedule. If you see this often, consider increasing the cloudwatch queue_size option.") + end @logger.info("Queueing event", :event => event) @event_queue << event end # def receive private - def send(aggregates) - aggregates.each { |namespace, data| + def publish(aggregates) + aggregates.each do |namespace, data| @logger.info("Namespace, data: ", :namespace => namespace, :data => data) metric_data = [] - data.each { |aggregate_key, stats| + data.each do |aggregate_key, stats| new_data = { :metric_name => aggregate_key[METRIC], :timestamp => aggregate_key[TIMESTAMP], @@ -133,36 +174,36 @@ class LogStash::Outputs::CloudWatch < LogStash::Outputs::Base :maximum => stats[MAX], } } - if (aggregate_key[DIM_NAME] != nil && aggregate_key[DIM_VALUE] != nil) - new_data[:dimensions] = [{ - :name => aggregate_key[DIM_NAME], - :value => aggregate_key[DIM_VALUE] - }] + dims = aggregate_key[DIMENSIONS] + if (dims.is_a?(Array) && dims.length > 0 && (dims.length % 2) == 0) + new_data[:dimensions] = Array.new + i = 0 + while (i < dims.length) + new_data[:dimensions] << {:name => dims[i], :value => dims[i+1]} + i += 2 + end end metric_data << new_data - } # data.each + end # data.each begin response = @cw.put_metric_data( :namespace => namespace, :metric_data => metric_data ) - @logger.info("Sent data to AWS CloudWatch OK") + @logger.info("Sent data to AWS CloudWatch OK", :namespace => namespace, :metric_data => metric_data) rescue Exception => e @logger.warn("Failed to send to AWS CloudWatch", :exception => e, :namespace => namespace, :metric_data => metric_data) break end - } # aggregates.each + end # aggregates.each return aggregates - end - - # def send + end# def publish private def aggregate(aggregates) - @logger.info("QUEUE SIZE ", :queuesize => @event_queue.size) - until @event_queue.empty? do + while !@event_queue.empty? 
do begin count(aggregates, @event_queue.pop(true)) rescue Exception => e @@ -171,52 +212,67 @@ class LogStash::Outputs::CloudWatch < LogStash::Outputs::Base end end return aggregates - end + end # def aggregate private def count(aggregates, event) + # If the event doesn't declare a namespace, use the default + fnamespace = field(event, @field_namespace) + namespace = (fnamespace ? fnamespace : event.sprintf(@namespace)) - # If the event doesnt declare a namespace, use the default - ns = field(event, @field_namespace) - namespace = (!ns) ? @namespace : ns + funit = field(event, @field_unit) + unit = (funit ? funit : event.sprintf(@unit)) - unit = field(event, @field_unit) - value = field(event, @field_value) + fvalue = field(event, @field_value) + value = (fvalue ? fvalue : event.sprintf(@value)) - # If neither Units nor Value is set, then we simply count the event - if (!unit && !value) - unit = COUNT - value = "1" + # We may get to this point with valid Units but missing value. Send zeros. + val = (!value) ? 0.0 : value.to_f + + # Event provides exactly one (but not both) of value or unit + if ( (fvalue == nil) ^ (funit == nil) ) + @logger.warn("Likely config error: event has one of #{@field_value} or #{@field_unit} fields but not both.", :event => event) end - # If Units is still not set (or is invalid), then we know Value must BE set, so set Units to "None" - # And warn about misconfiguration - if (!unit || !@valid_units.include?(unit)) + # If Unit is still not set or is invalid warn about misconfiguration & use NONE + if (!VALID_UNITS.include?(unit)) unit = NONE - @logger.warn("Possible config error: CloudWatch Value found with invalid or missing Units") + @logger.warn("Likely config error: invalid or missing Units (#{unit.to_s}), using '#{NONE}' instead", :event => event) end - if (!aggregates[namespace]) aggregates[namespace] = {} - @logger.info("INITIALIZING NAMESPACE DATA") end + dims = event[@field_dimensions] + if (dims) # event provides dimensions + # validate the structure + if (!dims.is_a?(Array) || dims.length == 0 || (dims.length % 2) != 0) + @logger.warn("Likely config error: CloudWatch dimensions field (#{dims.to_s}) found which is not a positive- & even-length array. Ignoring it.", :event => event) + dims = nil + end + # Best case, we get here and exit the conditional because dims... + # - is an array + # - with positive length + # - and an even number of elements + elsif (@dimensions.is_a?(Hash)) # event did not provide dimensions, but the output has been configured with a default + dims = @dimensions.flatten.map{|d| event.sprintf(d)} # into the kind of array described just above + else + dims = nil + end + + fmetric = field(event, @field_metricname) aggregate_key = { - METRIC => field(event, @field_metric), - DIM_NAME => field(event, @field_dimensionname), - DIM_VALUE => field(event, @field_dimensionvalue), + METRIC => (fmetric ? fmetric : event.sprintf(@metricname)), + DIMENSIONS => dims, UNIT => unit, - TIMESTAMP => normalizeTimestamp(event.timestamp) + TIMESTAMP => event.sprintf("%{+YYYY-MM-dd'T'HH:mm:00Z}") } if (!aggregates[namespace][aggregate_key]) aggregates[namespace][aggregate_key] = {} end - # We may get to this point with valid Units but missing value. Send zeros. - val = (!value) ? 
0.0 : value.to_f - if (!aggregates[namespace][aggregate_key][MAX] || val > aggregates[namespace][aggregate_key][MAX]) aggregates[namespace][aggregate_key][MAX] = val end @@ -236,20 +292,19 @@ class LogStash::Outputs::CloudWatch < LogStash::Outputs::Base else aggregates[namespace][aggregate_key][SUM] += val end - end - - # Zeros out the seconds in a ISO8601 timestamp like event.timestamp - public - def normalizeTimestamp(time) - tz = (time[-1, 1] == "Z") ? "Z" : time[-5, 5] - totheminute = time[0..16] - normal = totheminute + "00.000" + tz - return normal - end + end # def count private def field(event, fieldname) - return event.fields.member?(fieldname) ? event.fields[fieldname][0] : nil - end + if !event[fieldname] + return nil + else + if event[fieldname].is_a?(Array) + return event[fieldname][0] + else + return event[fieldname] + end + end + end # def field end # class LogStash::Outputs::CloudWatch diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb index e01afbd7d..f4d7deaf3 100644 --- a/lib/logstash/outputs/elasticsearch.rb +++ b/lib/logstash/outputs/elasticsearch.rb @@ -5,14 +5,20 @@ require "logstash/outputs/base" # output for logstash. If you plan on using the logstash web interface, you'll # need to use this output. # -# *NOTE*: The elasticsearch client is version 0.19.8. Your elasticsearch -# cluster must be running 0.19.x for API compatibility. +# *VERSION NOTE*: Your elasticsearch cluster must be running elasticsearch +# %ELASTICSEARCH_VERSION%. If you use any other version of elasticsearch, +# you should consider using the [elasticsearch_http](elasticsearch_http) +# output instead. # # If you want to set other elasticsearch options that are not exposed directly # as config options, there are two options: +# # * create an elasticsearch.yml file in the $PWD of the logstash process # * pass in es.* java properties (java -Des.node.foo= or ruby -J-Des.node.foo=) # +# This plugin will join your elasticsearch cluster, so it will show up in +# elasticsearch's cluster health status. +# # You can learn more about elasticsearch at class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base @@ -31,6 +37,9 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base # similar events to the same 'type'. String expansion '%{foo}' works here. config :index_type, :validate => :string, :default => "%{@type}" + # The document ID for the index. Overwrites any existing entry in elasticsearch with the same ID. + config :id, :validate => :string, :default => nil + # The name of your cluster if you set it on the ElasticSearch side. Useful # for discovery. config :cluster, :validate => :string @@ -154,7 +163,13 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base end end - req = @client.index(index, type, event.to_hash) + if id.nil? + req = @client.index(index, type, event.to_hash) + else + id = event.sprintf(@id) + req = @client.index(index, type, id, event.to_hash) + end + increment_inflight_request_count #timer = @logger.time("elasticsearch write") req.on(:success) do |response| diff --git a/lib/logstash/outputs/elasticsearch_http.rb b/lib/logstash/outputs/elasticsearch_http.rb index 03f54fc15..8a5e9ce8d 100644 --- a/lib/logstash/outputs/elasticsearch_http.rb +++ b/lib/logstash/outputs/elasticsearch_http.rb @@ -3,8 +3,9 @@ require "logstash/outputs/base" # This output lets you store logs in elasticsearch. # -# This output differs from the 'elasticsearch' output by using the HTTP -# interface for indexing data with elasticsearch. 
+# This plugin uses the HTTP/REST interface to ElasticSearch, which usually
+# lets you use any version of elasticsearch server. It is known to work
+# with elasticsearch %ELASTICSEARCH_VERSION%
 #
 # You can learn more about elasticsearch at
 class LogStash::Outputs::ElasticSearchHTTP < LogStash::Outputs::Base
diff --git a/lib/logstash/outputs/elasticsearch_river.rb b/lib/logstash/outputs/elasticsearch_river.rb
index 605bae493..18075c7f5 100644
--- a/lib/logstash/outputs/elasticsearch_river.rb
+++ b/lib/logstash/outputs/elasticsearch_river.rb
@@ -57,9 +57,13 @@ class LogStash::Outputs::ElasticSearchRiver < LogStash::Outputs::Base
   # AMQP vhost
   config :vhost, :validate => :string, :default => "/"
 
-  # AMQP queue name
-  config :name, :validate => :string, :default => "elasticsearch"
+  # AMQP queue name. Deprecated due to conflicts with puppet naming convention.
+  # Replaced by 'queue' variable. See LOGSTASH-755
+  config :name, :validate => :string, :deprecated => true
 
+  # AMQP queue name
+  config :queue, :validate => :string, :default => "elasticsearch"
+
   # AMQP exchange name
   config :exchange, :validate => :string, :default => "elasticsearch"
 
@@ -78,6 +82,14 @@ class LogStash::Outputs::ElasticSearchRiver < LogStash::Outputs::Base
 
   public
   def register
+
+    if @name
+      if @queue
+        @logger.error("'name' and 'queue' are the same setting, but 'name' is deprecated. Please use only 'queue'")
+      end
+      @queue = @name
+    end
+
     # TODO(sissel): find a better way of declaring where the elasticsearch
     # libraries are
     # TODO(sissel): can skip this step if we're running from a jar.
@@ -126,7 +138,7 @@ class LogStash::Outputs::ElasticSearchRiver < LogStash::Outputs::Base
         "user" => @user,
         "pass" => @password,
         "vhost" => @vhost,
-        "queue" => @name,
+        "queue" => @queue,
         "exchange" => @exchange,
         "routing_key" => @key,
         "exchange_type" => @exchange_type,
diff --git a/lib/logstash/outputs/gelf.rb b/lib/logstash/outputs/gelf.rb
index 300115a03..884a8c6fd 100644
--- a/lib/logstash/outputs/gelf.rb
+++ b/lib/logstash/outputs/gelf.rb
@@ -29,7 +29,7 @@ class LogStash::Outputs::Gelf < LogStash::Outputs::Base
   # useful if you want to parse the 'log level' from an event and use that
   # as the gelf level/severity.
   #
-  # Values here can be integers [0..7] inclusive or any of 
+  # Values here can be integers [0..7] inclusive or any of
   # "debug", "info", "warn", "error", "fatal", "unknown" (case insensitive).
   # Single-character versions of these are also valid, "d", "i", "w", "e", "f",
   # "u"
@@ -88,9 +88,9 @@ class LogStash::Outputs::Gelf < LogStash::Outputs::Base
 
     # If we leave that set, the gelf gem will extract the file and line number
     # of the source file that logged the message (i.e. logstash/gelf.rb:138).
-    # With that set to false, it can use the actual event's filename (i.e. 
+    # With that set to false, it can use the actual event's filename (i.e.
# /var/log/syslog), which is much more useful - @gelf.collect_file_and_line = false + @gelf.collect_file_and_line = false # these are syslog words and abbreviations mapped to RFC 5424 integers @level_map = { @@ -162,10 +162,10 @@ class LogStash::Outputs::Gelf < LogStash::Outputs::Base if @level.is_a?(Array) @level.each do |value| parsed_value = event.sprintf(value) - if parsed_value - level = parsed_value - break - end + next if value.count('%{') > 0 and parsed_value == value + + level = parsed_value + break end else level = event.sprintf(@level.to_s) diff --git a/lib/logstash/outputs/gemfire.rb b/lib/logstash/outputs/gemfire.rb index 2bf51248d..708bd5d66 100644 --- a/lib/logstash/outputs/gemfire.rb +++ b/lib/logstash/outputs/gemfire.rb @@ -16,7 +16,10 @@ class LogStash::Outputs::Gemfire < LogStash::Outputs::Base plugin_status "experimental" # Your client cache name - config :name, :validate => :string, :default => "logstash" + config :name, :validate => :string, :deprecated => true + + # Your client cache name + config :cache_name, :validate => :string, :default => "logstash" # The path to a GemFire client cache XML file. # @@ -40,6 +43,14 @@ class LogStash::Outputs::Gemfire < LogStash::Outputs::Base # A sprintf format to use when building keys config :key_format, :validate => :string, :default => "%{@source}-%{@timestamp}" + if @name + if @cache_name + @logger.error("'name' and 'cache_name' are the same setting, but 'name' is deprecated. Please use only 'cache_name'") + end + @cache_name = @name + end + + public def register import com.gemstone.gemfire.cache.client.ClientCacheFactory @@ -52,10 +63,10 @@ class LogStash::Outputs::Gemfire < LogStash::Outputs::Base public def connect begin - @logger.debug("Connecting to GemFire #{@name}") + @logger.debug("Connecting to GemFire #{@cache_name}") @cache = ClientCacheFactory.new. - set("name", @name). + set("name", @cache_name). set("cache-xml-file", @cache_xml_file).create @logger.debug("Created cache #{@cache.inspect}") @@ -90,7 +101,7 @@ class LogStash::Outputs::Gemfire < LogStash::Outputs::Base public def to_s - return "gemfire://#{name}" + return "gemfire://#{cache_name}" end public diff --git a/lib/logstash/outputs/graphite.rb b/lib/logstash/outputs/graphite.rb index b0efee006..207427639 100644 --- a/lib/logstash/outputs/graphite.rb +++ b/lib/logstash/outputs/graphite.rb @@ -28,6 +28,9 @@ class LogStash::Outputs::Graphite < LogStash::Outputs::Base # coerced will zero (0) config :metrics, :validate => :hash, :required => true + # Enable debug output + config :debug, :validate => :boolean, :default => false + def register connect end # def register @@ -52,8 +55,13 @@ class LogStash::Outputs::Graphite < LogStash::Outputs::Base # Catch exceptions like ECONNRESET and friends, reconnect on failure. @metrics.each do |metric, value| + @logger.debug("processing", :metric => metric, :value => value) + message = [event.sprintf(metric), event.sprintf(value).to_f, event.sprintf("%{+%s}")].join(" ") + + @logger.debug("Sending carbon message", :message => message, :host => @host, :port => @port) + # TODO(sissel): Test error cases. Catch exceptions. Find fortune and glory. 
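+      # (The line written below is carbon's plaintext protocol,
+      # "metric value timestamp", e.g. "logstash.events 1.0 1356393600";
+      # example values invented for illustration.)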
begin @socket.puts(message) diff --git a/lib/logstash/outputs/http.rb b/lib/logstash/outputs/http.rb index 94efd02dc..480cf4b23 100644 --- a/lib/logstash/outputs/http.rb +++ b/lib/logstash/outputs/http.rb @@ -101,12 +101,16 @@ class LogStash::Outputs::Http < LogStash::Outputs::Base else request.body = encode(evt) end - puts request - puts - puts request.body + #puts "#{request.port} / #{request.protocol}" + #puts request + #puts + #puts request.body response = @agent.execute(request) - puts response - response.read_body { |c| puts c } + + # Consume body to let this connection be reused + rbody = "" + response.read_body { |c| rbody << c } + #puts rbody rescue Exception => e @logger.warn("Unhandled exception", :request => request, :response => response, :exception => e, :stacktrace => e.backtrace) end diff --git a/lib/logstash/outputs/irc.rb b/lib/logstash/outputs/irc.rb index 1db3d821c..00cdfaddf 100644 --- a/lib/logstash/outputs/irc.rb +++ b/lib/logstash/outputs/irc.rb @@ -24,6 +24,9 @@ class LogStash::Outputs::Irc < LogStash::Outputs::Base # IRC Real name config :real, :validate => :string, :default => "logstash" + # IRC server password + config :password, :validate => :password + # Channels to broadcast to config :channels, :validate => :array, :required => true @@ -45,8 +48,6 @@ class LogStash::Outputs::Irc < LogStash::Outputs::Base c.user = @user c.realname = @real c.channels = @channels - c.channels = @channels - c.channels = @channels c.password = @password end Thread.new(@bot) do |bot| diff --git a/lib/logstash/outputs/syslog.rb b/lib/logstash/outputs/syslog.rb new file mode 100644 index 000000000..81e80534d --- /dev/null +++ b/lib/logstash/outputs/syslog.rb @@ -0,0 +1,139 @@ +require "logstash/outputs/base" +require "logstash/namespace" +require "date" + +# Send events to a syslog server. +# +# You can send messages compliant with RFC3164 or RFC5424 +# UDP or TCP syslog transport is supported +class LogStash::Outputs::Syslog < LogStash::Outputs::Base + config_name "syslog" + plugin_status "experimental" + + FACILITY_LABELS = [ + "kernel", + "user-level", + "mail", + "daemon", + "security/authorization", + "syslogd", + "line printer", + "network news", + "uucp", + "clock", + "security/authorization", + "ftp", + "ntp", + "log audit", + "log alert", + "clock", + "local0", + "local1", + "local2", + "local3", + "local4", + "local5", + "local6", + "local7", + ] + + SEVERITY_LABELS = [ + "emergency", + "alert", + "critical", + "error", + "warning", + "notice", + "informational", + "debug", + ] + + # syslog server address to connect to + config :host, :validate => :string, :required => true + + # syslog server port to connect to + config :port, :validate => :number, :required => true + + # syslog server protocol. 
you can choose between udp and tcp + config :protocol, :validate => ["tcp", "udp"], :default => "udp" + + # facility label for syslog message + config :facility, :validate => FACILITY_LABELS, :required => true + + # severity label for syslog message + config :severity, :validate => SEVERITY_LABELS, :required => true + + # source host for syslog message + config :sourcehost, :validate => :string, :default => "%{@source_host}" + + # timestamp for syslog message + config :timestamp, :validate => :string, :default => "%{@timestamp}" + + # application name for syslog message + config :appname, :validate => :string, :default => "LOGSTASH" + + # process id for syslog message + config :procid, :validate => :string, :default => "-" + + # message id for syslog message + config :msgid, :validate => :string, :default => "-" + + # syslog message format: you can choose between rfc3164 or rfc5424 + config :rfc, :validate => ["rfc3164", "rfc5424"], :default => "rfc3164" + + + public + def register + @client_socket = nil + end + + private + def udp? + @protocol == "udp" + end + + private + def rfc3164? + @rfc == "rfc3164" + end + + private + def connect + if udp? + @client_socket = UDPSocket.new + @client_socket.connect(@host, @port) + else + @client_socket = TCPSocket.new(@host, @port) + end + end + + public + def receive(event) + return unless output?(event) + + sourcehost = event.sprintf(@sourcehost) + + facility_code = FACILITY_LABELS.index(@facility) + + severity_code = SEVERITY_LABELS.index(@severity) + + priority = (facility_code * 8) + severity_code + + if rfc3164? + timestamp = DateTime.iso8601(event.sprintf(@timestamp)).strftime("%b %e %H:%M:%S") + syslog_msg = "<"+priority.to_s()+">"+timestamp+" "+sourcehost+" "+@appname+"["+@procid+"]: "+event.message + else + timestamp = DateTime.iso8601(event.sprintf(@timestamp)).rfc3339() + syslog_msg = "<"+priority.to_s()+">1 "+timestamp+" "+sourcehost+" "+@appname+" "+@procid+" "+@msgid+" - "+event.message + end + + begin + connect unless @client_socket + @client_socket.write(syslog_msg + "\n") + rescue => e + @logger.warn(@protocol+" output exception", :host => @host, :port => @port, + :exception => e, :backtrace => e.backtrace) + @client_socket.close + end + end +end diff --git a/lib/logstash/outputs/tcp.rb b/lib/logstash/outputs/tcp.rb index f08aea807..b6e168807 100644 --- a/lib/logstash/outputs/tcp.rb +++ b/lib/logstash/outputs/tcp.rb @@ -26,6 +26,14 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base # `client` connects to a server. config :mode, :validate => ["server", "client"], :default => "client" + # The format to use when writing events to the file. This value + # supports any string and can include %{name} and other dynamic + # strings. + # + # If this setting is omitted, the full json representation of the + # event will be written as a single line. + config :message_format, :validate => :string + class Client public def initialize(socket, logger) @@ -89,19 +97,22 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base def receive(event) return unless output?(event) - wire_event = event.to_hash.to_json + "\n" + if @message_format + output = event.sprintf(@message_format) + "\n" + else + output = event.to_hash.to_json + "\n" + end if server? @client_threads.each do |client_thread| - client_thread[:client].write(wire_event) + client_thread[:client].write(output) end @client_threads.reject! {|t| !t.alive? 
}
     else
       begin
         connect unless @client_socket
-        @client_socket.write(event.to_hash.to_json)
-        @client_socket.write("\n")
+        @client_socket.write(output)
       rescue => e
         @logger.warn("tcp output exception", :host => @host, :port => @port,
                      :exception => e, :backtrace => e.backtrace)
diff --git a/lib/logstash/runner.rb b/lib/logstash/runner.rb
index 6ab8bf335..d8608f6f0 100644
--- a/lib/logstash/runner.rb
+++ b/lib/logstash/runner.rb
@@ -16,6 +16,41 @@ require "logstash/namespace"
 require "logstash/program"
 require "logstash/util"
 
+if ENV["PROFILE_BAD_LOG_CALLS"]
+  # Set PROFILE_BAD_LOG_CALLS=1 in your environment if you want
+  # to track down logger calls that cause performance problems
+  #
+  # Related research here:
+  #   https://github.com/jordansissel/experiments/tree/master/ruby/logger-string-vs-block
+  #
+  # Basically, the following wastes tons of effort creating objects that are
+  # never used if the log level hides the log:
+  #
+  #   logger.debug("something happened", :what => Happened)
+  #
+  # This is shown to be 4x faster:
+  #
+  #   logger.debug(...) if logger.debug?
+  #
+  # I originally intended to use RubyParser and SexpProcessor to
+  # process all the logstash ruby code offline, but it was much
+  # faster to write this monkeypatch to warn as things are called.
+  require "cabin/mixins/logger"
+  module Cabin::Mixins::Logger
+    LEVELS.keys.each do |level|
+      m = "original_#{level}".to_sym
+      predicate = "#{level}?".to_sym
+      alias_method m, level
+      define_method(level) do |*args|
+        if !send(predicate)
+          warn("Unconditional log call", :location => caller[0])
+        end
+        send(m, *args)
+      end
+    end
+  end
+end
+
 class LogStash::Runner
   include LogStash::Program
@@ -129,6 +164,9 @@ class LogStash::Runner
         return @result
       end
     end
+
+    $: << File.expand_path("#{File.dirname(__FILE__)}/../../spec")
+    require "test_utils"
     rspec = runner.new(fixedargs)
     rspec.run
     @runners << rspec
diff --git a/lib/logstash/time.rb b/lib/logstash/time_addon.rb
similarity index 95%
rename from lib/logstash/time.rb
rename to lib/logstash/time_addon.rb
index 7017e0272..8034f5dde 100644
--- a/lib/logstash/time.rb
+++ b/lib/logstash/time_addon.rb
@@ -8,7 +8,7 @@ require "logstash/namespace"
 # >> LogStash::Time.now.utc.to_iso8601
 # => "2010-10-17 07:25:26.788704Z"
 module LogStash::Time
-  if RUBY_ENGINE == "jruby"
+  if defined?(RUBY_ENGINE) && RUBY_ENGINE == "jruby"
     require "java"
     DateTime = org.joda.time.DateTime
     DateTimeZone = org.joda.time.DateTimeZone
diff --git a/lib/logstash/web/views/search/results.haml b/lib/logstash/web/views/search/results.haml
index 95157b3a8..606d3bb21 100644
--- a/lib/logstash/web/views/search/results.haml
+++ b/lib/logstash/web/views/search/results.haml
@@ -12,7 +12,7 @@
         %i
           You can click on any search result to see what kind of fields we
           know about for that event. You can also click on the graph to zoom
           to that time period.
-          The query language is that of Lucene's string query (docs).
+          The query language is that of Lucene's string query (docs).
#visual diff --git a/logstash-event.gemspec b/logstash-event.gemspec index 13e9306dd..9e162f04b 100644 --- a/logstash-event.gemspec +++ b/logstash-event.gemspec @@ -15,6 +15,7 @@ Gem::Specification.new do |gem| lib/logstash/namespace.rb lib/logstash/time.rb lib/logstash/version.rb + spec/event.rb LICENSE } @@ -22,4 +23,7 @@ Gem::Specification.new do |gem| gem.name = "logstash-event" gem.require_paths = ["lib"] gem.version = LOGSTASH_VERSION + + gem.add_development_dependency "rspec" + gem.add_development_dependency "insist", "0.0.8" end diff --git a/logstash.gemspec b/logstash.gemspec index 172b46971..02e345eca 100644 --- a/logstash.gemspec +++ b/logstash.gemspec @@ -57,6 +57,8 @@ Gem::Specification.new do |gem| gem.add_runtime_dependency "jls-lumberjack", ["0.0.7"] gem.add_runtime_dependency "geoip", [">= 1.1.0"] gem.add_runtime_dependency "beefcake", "0.3.7" + gem.add_runtime_dependency "php-serialize" # For input drupal_dblog + gem.add_runtime_dependency "murmurhash3" gem.add_runtime_dependency "rufus-scheduler" if RUBY_PLATFORM == 'java' @@ -65,8 +67,10 @@ Gem::Specification.new do |gem| gem.add_runtime_dependency "jruby-httpclient" gem.add_runtime_dependency "jruby-openssl" gem.add_runtime_dependency "jruby-win32ole" + gem.add_runtime_dependency "jdbc-mysql" # For input drupal_dblog else gem.add_runtime_dependency "excon" + gem.add_runtime_dependency "mysql2" # For input drupal_dblog end if RUBY_VERSION >= '1.9.1' diff --git a/patterns/grok-patterns b/patterns/grok-patterns index 4f9d4229e..2ed0045d1 100755 --- a/patterns/grok-patterns +++ b/patterns/grok-patterns @@ -68,7 +68,7 @@ SECOND (?:(?:[0-5][0-9]|60)(?:[.,][0-9]+)?) TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9]) # datestamp is YYYY/MM/DD-HH:MM:SS.UUUU (or something like it) DATE_US %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR} -DATE_EU %{YEAR}[/-]%{MONTHNUM}[/-]%{MONTHDAY} +DATE_EU %{YEAR}[./-]%{MONTHNUM}[./-]%{MONTHDAY} ISO8601_TIMEZONE (?:Z|[+-]%{HOUR}(?::?%{MINUTE})) ISO8601_SECOND (?:%{SECOND}|60) TIMESTAMP_ISO8601 %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}? diff --git a/patterns/ruby b/patterns/ruby index f8cbb990a..638274217 100644 --- a/patterns/ruby +++ b/patterns/ruby @@ -1,2 +1,2 @@ RUBY_LOGLEVEL (?:DEBUG|FATAL|ERROR|WARN|INFO) -RUBY_LOGGER [DFEWI], \[%{TIMESTAMP_ISO8601} #{POSINT:pid}\] *%{RUBY_LOGLEVEL} -- %{DATA:progname}: %{DATA:message} +RUBY_LOGGER [DFEWI], \[%{TIMESTAMP_ISO8601:timestamp} #%{POSINT:pid}\] *%{RUBY_LOGLEVEL:loglevel} -- *%{DATA:progname}: %{GREEDYDATA:message} diff --git a/spec/examples/parse-apache-logs-yaml.rb b/spec/examples/parse-apache-logs-yaml.rb new file mode 100644 index 000000000..876d260cd --- /dev/null +++ b/spec/examples/parse-apache-logs-yaml.rb @@ -0,0 +1,63 @@ +require "test_utils" + +describe "apache common log format" do + extend LogStash::RSpec + + # The logstash config goes here. + # At this time, only filters are supported. + config_yaml <<-CONFIG + filter: + - grok: + pattern: "%{COMBINEDAPACHELOG}" + singles: true + - date: + timestamp: "dd/MMM/yyyy:HH:mm:ss Z" + CONFIG + + # Here we provide a sample log event for the testing suite. + # + # Any filters you define above will be applied the same way the logstash + # agent performs. Inside the 'sample ... ' block the 'subject' will be + # a LogStash::Event object for you to inspect and verify for correctness. 
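+  # (For comparison, the YAML config above corresponds to the usual native
+  # syntax:
+  #
+  #     filter {
+  #       grok { pattern => "%{COMBINEDAPACHELOG}" singles => true }
+  #       date { timestamp => "dd/MMM/yyyy:HH:mm:ss Z" }
+  #     })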
+  sample '198.151.8.4 - - [29/Aug/2012:20:17:38 -0400] "GET /favicon.ico HTTP/1.1" 200 3638 "-" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:14.0) Gecko/20100101 Firefox/14.0.1"' do
+
+    # These 'insist' and 'reject' calls use my 'insist' rubygem.
+    # See http://rubydoc.info/gems/insist for more info.
+
+    # Require that grok does not fail to parse this event.
+    reject { subject["@tags"] }.include?("_grokparsefailure")
+
+    # Ensure that grok captures certain expected fields.
+    insist { subject }.include?("agent")
+    insist { subject }.include?("bytes")
+    insist { subject }.include?("clientip")
+    insist { subject }.include?("httpversion")
+    insist { subject }.include?("timestamp")
+    insist { subject }.include?("verb")
+    insist { subject }.include?("response")
+    insist { subject }.include?("request")
+
+    # Ensure that those fields match expected values from the event.
+    insist { subject["clientip"] } == "198.151.8.4"
+    insist { subject["timestamp"] } == "29/Aug/2012:20:17:38 -0400"
+    insist { subject["verb"] } == "GET"
+    insist { subject["request"] } == "/favicon.ico"
+    insist { subject["httpversion"] } == "1.1"
+    insist { subject["response"] } == "200"
+    insist { subject["bytes"] } == "3638"
+    insist { subject["referrer"] } == '"-"'
+    insist { subject["agent"] } == "\"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:14.0) Gecko/20100101 Firefox/14.0.1\""
+
+    # Verify date parsing
+    insist { subject.timestamp } == "2012-08-30T00:17:38.000Z"
+  end
+
+  sample '61.135.248.195 - - [26/Sep/2012:11:49:20 -0400] "GET /projects/keynav/ HTTP/1.1" 200 18985 "" "Mozilla/5.0 (compatible; YodaoBot/1.0; http://www.yodao.com/help/webmaster/spider/; )"' do
+    reject { subject["@tags"] }.include?("_grokparsefailure")
+    insist { subject["clientip"] } == "61.135.248.195"
+  end
+
+  sample '72.14.164.185 - - [25/Sep/2012:12:05:02 -0400] "GET /robots.txt HTTP/1.1" 200 - "www.brandimensions.com" "BDFetch"' do
+    reject { subject["@tags"] }.include?("_grokparsefailure")
+  end
+end
diff --git a/spec/filters/anonymize.rb b/spec/filters/anonymize.rb
new file mode 100644
index 000000000..94fb06958
--- /dev/null
+++ b/spec/filters/anonymize.rb
@@ -0,0 +1,185 @@
+require "test_utils"
+require "logstash/filters/anonymize"
+
+describe LogStash::Filters::Anonymize do
+  extend LogStash::RSpec
+
+  describe "anonymize string with SHA algorithm" do
+    # The logstash config goes here.
+    # At this time, only filters are supported.
+    config <<-CONFIG
+      filter {
+        anonymize {
+          fields => ["clientip"]
+          key => "longencryptionkey"
+          algorithm => 'SHA'
+        }
+      }
+    CONFIG
+
+    sample "@fields" => {"clientip" => "123.123.123.123"} do
+      insist { subject["clientip"] } == "0d01b2191194d261fa1a2e7c18a38d44953ab4e2"
+    end
+  end
+
+  describe "anonymize ipaddress with IPV4_NETWORK algorithm" do
+    # The logstash config goes here.
+    # At this time, only filters are supported.
+    config <<-CONFIG
+      filter {
+        anonymize {
+          fields => ["clientip"]
+          algorithm => "IPV4_NETWORK"
+          key => 24
+        }
+      }
+    CONFIG
+
+    sample "@fields" => {"clientip" => "233.255.13.44"} do
+      insist { subject["clientip"] } == "233.255.13.0"
+    end
+  end
+
+  describe "anonymize string with MURMUR3 algorithm" do
+    config <<-CONFIG
+      filter {
+        anonymize {
+          fields => ["clientip"]
+          algorithm => "MURMUR3"
+          key => ""
+        }
+      }
+    CONFIG
+
+    sample "@fields" => {"clientip" => "123.52.122.33"} do
+      insist { subject["clientip"] } == 1541804874
+    end
+  end
+
+  describe "anonymize string with SHA1 algorithm" do
+    # The logstash config goes here.
+    # At this time, only filters are supported.
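+    # (Note: for the keyed algorithms in this spec, the expected digest
+    # depends on both the field value and the 'key' setting; a different
+    # key yields a different digest.)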
+    config <<-CONFIG
+      filter {
+        anonymize {
+          fields => ["clientip"]
+          key => "longencryptionkey"
+          algorithm => 'SHA1'
+        }
+      }
+    CONFIG
+
+    sample "@fields" => {"clientip" => "123.123.123.123"} do
+      insist { subject["clientip"] } == "fdc60acc4773dc5ac569ffb78fcb93c9630797f4"
+    end
+  end
+
+  describe "anonymize string with SHA224 algorithm" do
+    # The logstash config goes here.
+    # At this time, only filters are supported.
+    config <<-CONFIG
+      filter {
+        anonymize {
+          fields => ["clientip"]
+          key => "longencryptionkey"
+          algorithm => 'SHA224'
+        }
+      }
+    CONFIG
+
+    sample "@fields" => {"clientip" => "123.123.123.123"} do
+      insist { subject["clientip"] } == "5744bbcc4f64acb6a805b7fee3013a8958cc8782d3fb0fb318cec915"
+    end
+  end
+
+  describe "anonymize string with SHA256 algorithm" do
+    # The logstash config goes here.
+    # At this time, only filters are supported.
+    config <<-CONFIG
+      filter {
+        anonymize {
+          fields => ["clientip"]
+          key => "longencryptionkey"
+          algorithm => 'SHA256'
+        }
+      }
+    CONFIG
+
+    sample "@fields" => {"clientip" => "123.123.123.123"} do
+      insist { subject["clientip"] } == "345bec3eff242d53b568916c2610b3e393d885d6b96d643f38494fd74bf4a9ca"
+    end
+  end
+
+  describe "anonymize string with SHA384 algorithm" do
+    # The logstash config goes here.
+    # At this time, only filters are supported.
+    config <<-CONFIG
+      filter {
+        anonymize {
+          fields => ["clientip"]
+          key => "longencryptionkey"
+          algorithm => 'SHA384'
+        }
+      }
+    CONFIG
+
+    sample "@fields" => {"clientip" => "123.123.123.123"} do
+      insist { subject["clientip"] } == "22d4c0e8c4fbcdc4887d2038fca7650f0e2e0e2457ff41c06eb2a980dded6749561c814fe182aff93e2538d18593947a"
+    end
+  end
+
+  describe "anonymize string with SHA512 algorithm" do
+    # The logstash config goes here.
+    # At this time, only filters are supported.
+    config <<-CONFIG
+      filter {
+        anonymize {
+          fields => ["clientip"]
+          key => "longencryptionkey"
+          algorithm => 'SHA512'
+        }
+      }
+    CONFIG
+
+    sample "@fields" => {"clientip" => "123.123.123.123"} do
+      insist { subject["clientip"] } == "11c19b326936c08d6c50a3c847d883e5a1362e6a64dd55201a25f2c1ac1b673f7d8bf15b8f112a4978276d573275e3b14166e17246f670c2a539401c5bfdace8"
+    end
+  end
+
+  describe "anonymize string with MD4 algorithm" do
+    # The logstash config goes here.
+    # At this time, only filters are supported.
+    config <<-CONFIG
+      filter {
+        anonymize {
+          fields => ["clientip"]
+          key => "longencryptionkey"
+          algorithm => 'MD4'
+        }
+      }
+    CONFIG
+
+    sample "@fields" => {"clientip" => "123.123.123.123"} do
+      insist { subject["clientip"] } == "0845cb571ab3646e51a07bcabf05e33d"
+    end
+  end
+
+  describe "anonymize string with MD5 algorithm" do
+    # The logstash config goes here.
+    # At this time, only filters are supported.
+ config <<-CONFIG + filter { + anonymize { + fields => ["clientip"] + key => "longencryptionkey" + algorithm => 'MD5' + } + } + CONFIG + + sample "@fields" => {"clientip" => "123.123.123.123"} do + insist { subject["clientip"] } == "9336c879e305c9604a3843fc3e75948f" + end + end + +end diff --git a/spec/filters/csv.rb b/spec/filters/csv.rb index 238d7f533..3de9e6be2 100644 --- a/spec/filters/csv.rb +++ b/spec/filters/csv.rb @@ -38,6 +38,21 @@ describe LogStash::Filters::CSV do end end + describe "custom separator" do + config <<-CONFIG + filter { + csv { + separator => ";" + } + } + CONFIG + + sample "big,bird;sesame street" do + insist { subject["field1"] } == "big,bird" + insist { subject["field2"] } == "sesame street" + end + end + describe "parse csv with more data than defined field names" do config <<-CONFIG filter { diff --git a/spec/filters/date.rb b/spec/filters/date.rb index c2381a1d3..a4dd2a827 100644 --- a/spec/filters/date.rb +++ b/spec/filters/date.rb @@ -161,7 +161,28 @@ describe LogStash::Filters::Date do end end - describe "accept match config option with hash value like grep (LOGSTASH-735)" do + describe "TAI64N support" do + config <<-'CONFIG' + filter { + date { + t => TAI64N + } + } + CONFIG + + # Try without leading "@" + sample({ "@fields" => { "t" => "4000000050d506482dbdf024" } }) do + insist { subject.timestamp } == "2012-12-22T01:00:46.767Z" + end + + # Should still parse successfully if it's a full tai64n time (with leading + # '@') + sample({ "@fields" => { "t" => "@4000000050d506482dbdf024" } }) do + insist { subject.timestamp } == "2012-12-22T01:00:46.767Z" + end + end + + describe "accept match config option with hash value (LOGSTASH-735)" do config <<-CONFIG filter { date { @@ -179,3 +200,4 @@ describe LogStash::Filters::Date do end end end + diff --git a/spec/filters/kv.rb b/spec/filters/kv.rb index 23214c8d6..7a3b27bbc 100644 --- a/spec/filters/kv.rb +++ b/spec/filters/kv.rb @@ -24,6 +24,19 @@ describe LogStash::Filters::KV do end + describe "LOGSTASH-624: allow escaped space in key or value " do + config <<-CONFIG + filter { + kv { value_split => ':' } + } + CONFIG + + sample 'IKE:=Quick\ Mode\ completion IKE\ IDs:=subnet:\ x.x.x.x\ (mask=\ 255.255.255.254)\ and\ host:\ y.y.y.y' do + insist { subject["IKE"] } == '=Quick\ Mode\ completion' + insist { subject['IKE\ IDs'] } == '=subnet:\ x.x.x.x\ (mask=\ 255.255.255.254)\ and\ host:\ y.y.y.y' + end + end + describe "test value_split" do config <<-CONFIG filter { @@ -61,6 +74,20 @@ describe LogStash::Filters::KV do end + describe "delimited fields should override space default (reported by LOGSTASH-733)" do + config <<-CONFIG + filter { + kv { field_split => "|" } + } + CONFIG + + sample "field1=test|field2=another test|field3=test3" do + insist { subject["field1"] } == "test" + insist { subject["field2"] } == "another test" + insist { subject["field3"] } == "test3" + end + end + describe "test prefix" do config <<-CONFIG filter { diff --git a/spec/filters/mutate.rb b/spec/filters/mutate.rb index c94b556e4..5f31b6a62 100644 --- a/spec/filters/mutate.rb +++ b/spec/filters/mutate.rb @@ -133,7 +133,7 @@ describe LogStash::Filters::Mutate do end end - describe "regression - check grok+mutate" do + describe "regression - mutate should lowercase a field created by grok" do config <<-CONFIG filter { grok { @@ -149,4 +149,20 @@ describe LogStash::Filters::Mutate do insist { subject["foo"] } == ['hello'] end end + + describe "LOGSTASH-757: rename should do nothing with a missing field" do + config <<-CONFIG + filter 
{
+      mutate {
+        rename => [ "nosuchfield", "hello" ]
+      }
+    }
+    CONFIG
+
+    sample "whatever" do
+      reject { subject.fields }.include?("nosuchfield")
+      reject { subject.fields }.include?("hello")
+    end
+  end
 end
+
diff --git a/spec/inputs/tcp.rb b/spec/inputs/tcp.rb
new file mode 100644
index 000000000..044d53810
--- /dev/null
+++ b/spec/inputs/tcp.rb
@@ -0,0 +1,210 @@
+# coding: utf-8
+require "test_utils"
+require "socket"
+
+# Not sure why, but each test needs a different port
+# TODO: timeout around the thread.join
+describe "inputs/tcp" do
+  extend LogStash::RSpec
+
+  describe "read json_event" do
+
+    event_count = 10
+    port = 5511
+    config <<-CONFIG
+      input {
+        tcp {
+          type => "blah"
+          port => #{port}
+          format => "json_event"
+        }
+      }
+    CONFIG
+
+    th = Thread.current
+    input do |plugins|
+      sequence = 0
+      tcp = plugins.first
+      output = Shiftback.new do |event|
+        sequence += 1
+        tcp.teardown if sequence == event_count
+        begin
+          insist { event["sequence"] } == sequence - 1
+          insist { event["message"] } == "Hello ü Û"
+          insist { event["message"].encoding } == Encoding.find("UTF-8")
+        rescue Exception => failure
+          # Propagate the failure out of the reception thread
+          th.raise failure
+        end
+      end
+      # Prepare input
+      tcp.register
+      # Run input in a separate thread
+      thread = Thread.new(tcp, output) do |*args|
+        tcp.run(output)
+      end
+      # Send events from client sockets
+      event_count.times do |value|
+        client_socket = TCPSocket.new("0.0.0.0", port)
+        event = LogStash::Event.new("@fields" => { "message" => "Hello ü Û", "sequence" => value })
+        client_socket.puts event.to_json
+        client_socket.close
+        # micro sleep to ensure sequencing
+        sleep(0.1)
+      end
+      # wait for input termination
+      thread.join
+    end # input
+  end
+
+  describe "read plain events with system defaults, should work on UTF-8 systems" do
+    event_count = 10
+    port = 5512
+    config <<-CONFIG
+      input {
+        tcp {
+          type => "blah"
+          port => #{port}
+        }
+      }
+    CONFIG
+
+    th = Thread.current
+    input do |plugins|
+      sequence = 0
+      tcp = plugins.first
+      output = Shiftback.new do |event|
+        sequence += 1
+        tcp.teardown if sequence == event_count
+        begin
+          insist { event.message } == "Hello ü Û"
+          insist { event.message.encoding } == Encoding.find("UTF-8")
+        rescue Exception => failure
+          # Propagate the failure out of the reception thread
+          th.raise failure
+        end
+      end
+
+      tcp.register
+      # Run input in a separate thread
+      thread = Thread.new(tcp, output) do |*args|
+        tcp.run(output)
+      end
+      # Send events from client sockets
+      event_count.times do |value|
+        client_socket = TCPSocket.new("0.0.0.0", port)
+        client_socket.write "Hello ü Û"
+        client_socket.close
+        # micro sleep to ensure sequencing
+        sleep(0.1)
+      end
+      # wait for input termination
+      thread.join
+    end # input
+  end
+
+  describe "read plain events with UTF-8 like charset, to prove that something is wrong with the previous failing test" do
+    event_count = 10
+    port = 5514
+    config <<-CONFIG
+      input {
+        tcp {
+          type => "blah"
+          port => #{port}
+          charset => "CP65001" # that's just an alias of UTF-8
+        }
+      }
+    CONFIG
+
+    th = Thread.current
+    # Catch aborting reception threads
+    input do |plugins|
+      sequence = 0
+      tcp = plugins.first
+      output = Shiftback.new do |event|
+        sequence += 1
+        tcp.teardown if sequence == event_count
+        begin
+          insist { event.message } == "Hello ü Û"
+          insist { event.message.encoding } == Encoding.find("UTF-8")
+        rescue Exception => failure
+          # Propagate the failure out of the reception thread
+          th.raise failure
+        end
+      end
+
+      tcp.register
+      # Run input in a separate thread
+
+      thread = Thread.new(tcp, output) do |*args|
+        tcp.run(output)
+      end
+      # Send events from client sockets
+      event_count.times do |value|
+        client_socket = TCPSocket.new("0.0.0.0", port)
+        # puts "Encoding of client", client_socket.external_encoding, client_socket.internal_encoding
+        client_socket.write "Hello ü Û"
+        client_socket.close
+        # micro sleep to ensure sequencing, TODO must think of a cleaner solution
+        sleep(0.1)
+      end
+      # wait for input termination
+      # TODO: timeout
+      thread.join
+    end # input
+  end
+
+  describe "read plain events with ISO-8859-1 charset" do
+    event_count = 10
+    port = 5513
+    charset = "ISO-8859-1"
+    config <<-CONFIG
+      input {
+        tcp {
+          type => "blah"
+          port => #{port}
+          charset => "#{charset}"
+        }
+      }
+    CONFIG
+
+    th = Thread.current
+    input do |plugins|
+      sequence = 0
+      tcp = plugins.first
+      output = Shiftback.new do |event|
+        sequence += 1
+        tcp.teardown if sequence == event_count
+        begin
+          insist { event.message } == "Hello ü Û"
+          insist { event.message.encoding } == Encoding.find("UTF-8")
+        rescue Exception => failure
+          # Propagate the failure out of the reception thread
+          th.raise failure
+        end
+      end
+
+      tcp.register
+      # Run input in a separate thread
+
+      thread = Thread.new(tcp, output) do |*args|
+        tcp.run(output)
+      end
+      # Send events from client sockets
+      event_count.times do |value|
+        client_socket = TCPSocket.new("0.0.0.0", port)
+        # Force client encoding
+        client_socket.set_encoding(charset)
+        client_socket.write "Hello ü Û"
+        client_socket.close
+        # micro sleep to ensure sequencing
+        sleep(0.1)
+      end
+      # wait for input termination
+      thread.join
+    end # input
+  end
+end
+
+
diff --git a/spec/test_utils.rb b/spec/test_utils.rb
index 6f3380b33..ec908b9a3 100644
--- a/spec/test_utils.rb
+++ b/spec/test_utils.rb
@@ -1,4 +1,5 @@
 require "insist"
+require "logstash/agent"
 require "logstash/event"
 require "insist"
 require "stud/try"
@@ -18,6 +19,11 @@ module LogStash
     @config_str = configstr
   end # def config
 
+  def config_yaml(configstr)
+    @config_str = configstr
+    @is_yaml = true
+  end
+
   def type(default_type)
     @default_type = default_type
   end
@@ -30,8 +36,7 @@ module LogStash
   def sample(event, &block)
     default_type = @default_type || "default"
     default_tags = @default_tags || []
-    require "logstash/config/file"
-    config = LogStash::Config::File.new(nil, @config_str)
+    config = get_config
     agent = LogStash::Agent.new
     @inputs, @filters, @outputs = agent.instance_eval { parse_config(config) }
     [@inputs, @filters, @outputs].flatten.each do |plugin|
@@ -95,8 +100,7 @@ module LogStash
   end # def sample
 
   def input(&block)
-    require "logstash/config/file"
-    config = LogStash::Config::File.new(nil, @config_str)
+    config = get_config
     agent = LogStash::Agent.new
     it "looks good" do
       inputs, filters, outputs = agent.instance_eval { parse_config(config) }
@@ -104,6 +108,16 @@ module LogStash
     end
   end # def input
 
+  def get_config
+    if @is_yaml
+      require "logstash/config/file/yaml"
+      config = LogStash::Config::File::Yaml.new(nil, @config_str)
+    else
+      require "logstash/config/file"
+      config = LogStash::Config::File.new(nil, @config_str)
+    end
+  end # def get_config
+
   def agent(&block)
     @agent_count ||= 0
     require "logstash/agent"