Merge branch 'master' of github.com:jordansissel/logstash

This commit is contained in:
Jordan Sissel 2010-11-22 15:17:52 -08:00
commit 22e5af6fee
22 changed files with 533 additions and 28 deletions

View file

@ -11,6 +11,8 @@ inputs:
- /var/log/apache2/access.log
apache-error:
- /var/log/apache2/error.log
testing:
- /tmp/logstashtest.log
filters:
- grok:
linux-syslog: # for logs of type 'linux-syslog'
@ -22,6 +24,12 @@ filters:
nagios:
patterns:
- %{NAGIOSLOGLINE}
loggly:
patterns:
- %{JAVASTACKTRACEPART}
testing:
patterns:
- %{JAVASTACKTRACEPART}
- date:
linux-syslog: # for logs of type 'linux-syslog'
# Look for a field 'timestamp' with this format; parse it and use it for the timestamp
@ -32,6 +40,13 @@ filters:
timestamp: "%d/%b/%Y:%H:%M:%S %Z"
nagios:
epochtime: %s
- multiline:
supervisorlogs:
pattern: ^\s
what: previous
testing:
pattern: ^\s
what: previous
outputs:
- stdout:///
#- elasticsearch://localhost:9200/logstash/all

View file

@ -0,0 +1,56 @@
#!/usr/bin/env ruby
# Smoke test wiring two agents together through AMQP: a "collector" agent
# reads from an internal channel and publishes to an AMQP topic, while a
# "receiver" agent subscribes to that topic and hands events to an internal
# output. A few test messages are pushed through the pair and each event
# that arrives at the far end is printed.
$: << "lib"
require "rubygems"
require "logstash/agent"
require "yaml"

collector_config = YAML.load <<"YAML"
---
inputs:
  foo:
  - internal:///
outputs:
- amqp://localhost/topic/logstash/testing
YAML

receiver_config = YAML.load <<"YAML"
---
inputs:
  foo:
  - amqp://localhost/topic/logstash/testing
outputs:
- internal:///
YAML

collector_agent = LogStash::Agent.new(collector_config)
receiver_agent = LogStash::Agent.new(receiver_config)
messages = ["hello world", "foo", "bar"]

EventMachine::run do
  receiver_agent.register
  collector_agent.register

  EM::next_tick do
    # Register output callback on the receiver
    internal_output = receiver_agent.outputs.find { |o| o.is_a?(LogStash::Outputs::Internal) }
    internal_output.callback do |event|
      puts event
      #puts expect.first == event.message
      #expect.shift
      #agent.stop
    end

    EM::next_tick do
      # Send input to the collector
      expect = messages.clone
      internal_input = collector_agent.inputs.find { |i| i.is_a?(LogStash::Inputs::Internal) }
      channel = internal_input.channel
      messages.each { |message| channel.push(message) }
    end
  end
end

View file

@ -9,6 +9,9 @@ require "logstash/logging"
# Collect logs, ship them out.
class LogStash::Agent
attr_reader :config
attr_reader :inputs
attr_reader :outputs
attr_reader :filters
def initialize(config)
@logger = LogStash::Logger.new(STDERR)
@ -26,7 +29,7 @@ class LogStash::Agent
# Register any event handlers with EventMachine
# Technically, this agent could listen for anything (files, sockets, amqp,
# stomp, etc).
protected
public
def register
# TODO(sissel): warn when no inputs and no outputs are defined.
# TODO(sissel): Refactor this madness into a config lib
@ -80,12 +83,20 @@ class LogStash::Agent
end # def register
public
def run
# Start the agent: enter the EventMachine reactor, register all configured
# plugins, then invoke the caller's block (if any) from inside the reactor
# so the caller can safely schedule additional work.
def run(&block)
  EventMachine.run do
    self.register
    yield if block_given?
  end # EventMachine.run
end # def run
public
# Stop the agent by shutting down the EventMachine reactor.
def stop
  # TODO(sissel): Stop inputs, flush outputs, wait for finish,
  # then stop the event loop
  EventMachine.stop_event_loop
end
protected
def filter(event)
@filters.each do |f|

View file

@ -66,5 +66,26 @@ module LogStash; class Event
# Return the underlying data hash for this event.
def to_hash; return @data end # def to_hash

# Replace this event's data wholesale with another event's data.
# NOTE(review): this stores a reference to the other event's hash,
# not a copy — mutations on either event will be visible to both.
def overwrite(event)
  @data = event.to_hash
end

# True if the underlying data hash contains the given key.
def include?(key); return @data.include?(key) end
# Append an event to this one: the messages are joined with a newline,
# the tag sets are unioned, and each of the other event's fields is
# merged in ('|=' union when the field already exists, which assumes
# field values are arrays — TODO(review): confirm callers always store
# array values here).
def append(event)
  self.message += "\n" + event.message
  self.tags |= event.tags

  # Append all fields
  event.fields.each do |name, value|
    if self.fields.include?(name)
      self.fields[name] |= value
    else
      self.fields[name] = value
    end
  end # event.fields.each
end
end; end # class LogStash::Event

View file

@ -0,0 +1,158 @@
# multiline filter
#
# This filter will collapse multiline messages into a single event.
#
require "logstash/filters/base"

class LogStash::Filters::Multiline < LogStash::Filters::Base
  # The 'multiline' filter joins lines that belong to the same logical
  # event into a single event, keyed by [source, type] so interleaved
  # streams are not mixed together.
  #
  # The config looks like this:
  #
  # filters:
  # - multiline:
  #     <type>:
  #       pattern: <regexp>
  #       what: next
  #     <type>:
  #       pattern: <regexp>
  #       what: previous
  #
  # The 'regexp' should match what you believe to be an indicator that
  # the field is part of a multi-line event
  #
  # The 'what' must be "previous" or "next" and indicates the relation
  # to the multi-line event.
  #
  # For example, java stack traces are multiline and usually have the message
  # starting at the far-left, then each subsequent line indented. Do this:
  #
  # filters:
  # - multiline:
  #     somefiletype:
  #       pattern: /^\s/
  #       what: previous
  #
  # This says that any line starting with whitespace belongs to the previous line.
  #
  # Another example is C line continuations (backslash). Here's how to do that:
  #
  # filters:
  # - multiline:
  #     somefiletype:
  #       pattern: /\\$/
  #       what: next
  #
  def initialize(config = {})
    super

    # @types maps event type => {"pattern" => Regexp, "what" => String}.
    # @pending maps [source, type] => the event currently being assembled.
    @types = Hash.new { |h,k| h[k] = [] }
    @pending = Hash.new
  end # def initialize

  # Validate each type's configuration and compile its pattern.
  def register
    @config.each do |type, typeconfig|
      # typeconfig will be a hash containing 'pattern' and 'what'
      @logger.debug "Setting type #{type.inspect} to the config #{typeconfig.inspect}"
      raise "type \"#{type}\" defined more than once" unless @types[type].empty?
      @types[type] = typeconfig

      if !typeconfig.include?("pattern")
        @logger.fatal(["'multiline' filter config for type #{type} is missing" \
                       " 'pattern' setting", typeconfig])
      end

      if !typeconfig.include?("what")
        @logger.fatal(["'multiline' filter config for type #{type} is missing" \
                       " 'what' setting", typeconfig])
      end

      if !["next", "previous"].include?(typeconfig["what"])
        @logger.fatal(["'multiline' filter config for type #{type} has " \
                       "invalid 'what' value. Must be 'next' or 'previous'",
                       typeconfig])
      end

      begin
        typeconfig["pattern"] = Regexp.new(typeconfig["pattern"])
      rescue RegexpError => e
        @logger.fatal(["Invalid pattern for multiline filter on type '#{type}'",
                       typeconfig, e])
      end
    end # @config.each
  end # def register

  # Process one event. Continuation lines are appended to the pending
  # event for their [source, type] stream and cancelled; completed events
  # are emitted by letting them pass through (possibly after swapping in
  # the pending event's data via overwrite).
  def filter(event)
    return unless @types.member?(event.type)
    typeconfig = @types[event.type]
    match = typeconfig["pattern"].match(event.message)
    key = [event.source, event.type]
    pending = @pending[key]

    # This runs for every line; keep it at debug to avoid log noise.
    @logger.debug(["Reg: ", typeconfig["pattern"], event.message, match])

    case typeconfig["what"]
    when "previous"
      if match
        event.tags |= ["multiline"]
        # previous previous line is part of this event.
        # append it to the event and cancel it
        if pending
          pending.append(event)
        else
          @pending[key] = event
        end
        event.cancel
      else
        # this line is not part of the previous event
        # if we have a pending event, it's done, send it.
        # put the current event into pending
        if pending
          tmp = event.to_hash
          event.overwrite(pending)
          @pending[key] = LogStash::Event.new(tmp)
        else
          @pending[key] = event
          event.cancel
        end # if/else pending
      end # if/else match
    when "next"
      if match
        event.tags |= ["multiline"]
        # this line is part of a multiline event, the next
        # line will be part, too, put it into pending.
        if pending
          pending.append(event)
        else
          @pending[key] = event
        end
        event.cancel
      else
        # if we have something in pending, join it with this message
        # and send it. otherwise, this is a new message and not part of
        # multiline, send it.
        if pending
          pending.append(event)
          event.overwrite(pending)
          @pending.delete(key)
        end
      end # if/else match
    else
      @logger.warn(["Unknown multiline 'what' value.", typeconfig])
    end # case typeconfig["what"]
  end # def filter

  # flush any pending messages for the given [source, type] stream.
  # Returns the pending event, or nil if nothing was pending.
  def flush(source, type)
    key = [source, type]
    if @pending[key]
      event = @pending[key]
      @pending.delete(key)
    end
    return event
  end
end # class LogStash::Filters::Multiline

View file

@ -23,7 +23,7 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Base
end
def register
@logger.info("Registering #{@url}")
@logger.info("Registering input #{@url}")
@amqp = AMQP.connect(:host => @url.host)
@mq = MQ.new(@amqp)
@target = nil

View file

@ -12,6 +12,7 @@ class LogStash::Inputs::File < LogStash::Inputs::Base
end
def register
@logger.info("Registering #{@url}")
EventMachine::FileGlobWatchTail.new(@url.path, Reader, interval=60,
exclude=[], receiver=self)
end # def register

View file

@ -0,0 +1,36 @@
require "logstash/inputs/base"
require "eventmachine-tail"
require "socket" # for Socket.gethostname

# In-process input: anything pushed onto @channel (an
# EventMachine::Channel) is delivered to this input's callback.
class LogStash::Inputs::Internal < LogStash::Inputs::Base
  attr_reader :channel

  def initialize(url, type, config={}, &block)
    super

    # Default host to the machine's hostname if it's not set
    @url.host ||= Socket.gethostname
    @channel = EventMachine::Channel.new
  end

  def register
    @logger.info("Registering input #{@url}")
    # Every object published to the channel flows through #receive.
    @channel.subscribe { |event| receive(event) }
  end # def register

  # Hand one event to the configured callback, wrapping bare payloads
  # (e.g. plain strings) in a LogStash::Event first.
  def receive(event)
    unless event.is_a?(LogStash::Event)
      event = LogStash::Event.new({
        "@message" => event,
        "@type" => @type,
        "@tags" => @tags.clone,
        "@source" => @url,
      })
    end

    @logger.debug(["Got event", event])
    @callback.call(event)
  end # def receive
end # class LogStash::Inputs::Internal

View file

@ -18,6 +18,9 @@ class LogStash::Logger < Logger
# Set default loglevel to WARN unless $DEBUG is set (run with 'ruby -d')
self.level = $DEBUG ? Logger::DEBUG: Logger::INFO
if ENV["LOGSTASH_DEBUG"]
self.level = Logger::DEBUG
end
# Conditional support for awesome_print
if !@@have_awesome_print && @@notify_awesome_print_load_failed

View file

@ -19,6 +19,7 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
end # def initialize
def register
@logger.info("Registering output #{@url}")
@amqp = AMQP.connect(:host => @url.host)
@mq = MQ.new(@amqp)
@target = nil

View file

@ -29,7 +29,7 @@ class LogStash::Outputs::Gelf < LogStash::Outputs::Base
next if value == nil or value.empty?
gelf.add_additional name, value
end
gelf.add_additional "@timestamp", event.timestamp
gelf.add_additional "event_timestamp", event.timestamp
gelf.send
end # def event
end # class LogStash::Outputs::Gelf

View file

@ -0,0 +1,30 @@
require "logstash/outputs/base"

# In-process output: delivers each event to a caller-supplied block
# instead of shipping it anywhere.
class LogStash::Outputs::Internal < LogStash::Outputs::Base
  def initialize(url, config={}, &block)
    super
    # The callback may be supplied at construction time...
    @callback = block
  end

  def register
    @logger.info("Registering output #{@url}")
  end # def register

  # Deliver one event to the configured callback; logs an error and drops
  # the event if no callback has been set.
  def receive(event)
    if !@callback
      @logger.error("No callback for output #{@url}, cannot receive")
      return
    end
    @callback.call(event)
  end # def receive

  # Set the callback by passing a block of code
  # NOTE(review): calling this without a block sets @callback to nil,
  # silently clearing any previously-set callback — verify that is intended.
  def callback(&block)
    @callback = block
  end

  # Set the callback by passing a proc object
  def callback=(proc_block)
    @callback = proc_block
  end
end # class LogStash::Outputs::Internal

View file

@ -0,0 +1,23 @@
require "logstash/outputs/base"

# TCP output: connects to url.host:url.port and writes each event as one
# newline-terminated JSON document.
class LogStash::Outputs::Tcp < LogStash::Outputs::Base
  def initialize(url, config={}, &block)
    super
  end

  def register
    # TODO(sissel): Write generic validation methods
    # Refuse to start unless both a host and a port were given.
    unless @url.host && @url.port
      @logger.fatal("No host or port given in #{self.class}: #{@url}")
      # TODO(sissel): Make this an actual exception class
      raise "configuration error"
    end
    @connection = EventMachine::connect(@url.host, @url.port)
  end # def register

  # Serialize the event hash to JSON and ship it, one document per line.
  def receive(event)
    payload = event.to_hash.to_json
    @connection.send_data(payload)
    @connection.send_data("\n")
  end # def receive
end # class LogStash::Outputs::Tcp

View file

@ -46,8 +46,10 @@ class LogStash::Web::ElasticSearch
data["duration"] = Time.now - start_time
# TODO(sissel): Plugin-ify this (Search filters!)
require "digest/md5"
data["hits"]["hits"].each do |hit|
# Search anonymization
#require "digest/md5"
#data["hits"]["hits"].each do |hit|
[].each do |hit|
event = LogStash::Event.new(hit["_source"])
event.to_hash.each do |key, value|
next unless value.is_a?(String)
@ -73,7 +75,7 @@ class LogStash::Web::ElasticSearch
end
req.errback do
@logger.warn(["Query failed", params, req.response])
yield :failure
yield({ "error" => req.response })
end
end # def search
end

View file

@ -88,7 +88,7 @@
var a = params[p].split("=");
var key = a[0]
var value = a[1]
logstash.params[key] = value
logstash.params[key] = unescape(value)
}
logstash.search(logstash.params.q)
return false;
@ -108,7 +108,6 @@
$(result_row_selector).live("click", function() {
var data = eval($("td.message", this).data("full"));
console.log(event);
/* Apply template to the dialog */
var query = $("#query").val().replace(/^\s+|\s+$/g, "")
var sanitize = function(str) {
@ -121,10 +120,12 @@
var template = $.template("inspector",
"<li>" +
"<b>(${type}) ${field}</b>:" +
"<a href='/search?q=" + query + " ${escape(field)}:${$item.sanitize(value)}'" +
" data-field='${escape(field)}' data-value='${$item.sanitize(value)}'>" +
"${value}" +
"</a>" +
"{{each(idx, val) value}}" +
"<a href='/search?q=" + query + " ${escape(field)}:${$item.sanitize(val)}'" +
" data-field='${escape(field)}' data-value='${$item.sanitize(val)}'>" +
"${val}" +
"</a>, " +
"{{/each}}" +
"</li>");
/* TODO(sissel): recurse through the data */
@ -134,12 +135,19 @@
if (/^[, ]*$/.test(value)) {
continue; /* Skip empty data fields */
}
if (!(value instanceof Array)) {
value = [value];
}
fields.push( { type: "field", field: i, value: value })
}
for (var i in data._source) {
if (i == "@fields") continue;
var value = data._source[i]
if (!(value instanceof Array)) {
value = [value];
}
if (i.charAt(0) == "@") { /* metadata */
fields.push( { type: "metadata", field: i, value: value });
} else { /* data */
@ -154,7 +162,11 @@
if (i == "_source") {
continue; /* already processed this one */
}
fields.push( { type: "metadata", field: i, value: data[i] })
value = data[i]
if (!(value instanceof Array)) {
value = [value];
}
fields.push( { type: "metadata", field: i, value: value })
}
fields.sort(function(a, b) {

View file

@ -49,11 +49,21 @@ class LogStash::Web::Server < Sinatra::Base
count = params["count"] = (params["count"] or 50).to_i
offset = params["offset"] = (params["offset"] or 0).to_i
elasticsearch.search(params) do |@results|
#p instance_variables
if @results.include?("error")
body haml :"search/error", :layout => !request.xhr?
next
end
@hits = (@results["hits"]["hits"] rescue [])
@total = (@results["hits"]["total"] rescue 0)
@graphpoints = []
@results["facets"]["by_hour"]["entries"].each do |entry|
@graphpoints << [entry["key"], entry["count"]]
begin
@results["facets"]["by_hour"]["entries"].each do |entry|
@graphpoints << [entry["key"], entry["count"]]
end
rescue => e
puts e
end
if count and offset
@ -69,12 +79,21 @@ class LogStash::Web::Server < Sinatra::Base
next_params = params.clone
next_params["offset"] = [offset + count, @total - count].min
@next_href = "?" + next_params.collect { |k,v| [URI.escape(k.to_s), URI.escape(v.to_s)].join("=") }.join("&")
last_params = next_params.clone
last_params["offset"] = @total - offset
@last_href = "?" + last_params.collect { |k,v| [URI.escape(k.to_s), URI.escape(v.to_s)].join("=") }.join("&")
end
if offset > 0
prev_params = params.clone
prev_params["offset"] = [offset - count, 0].max
@prev_href = "?" + prev_params.collect { |k,v| [URI.escape(k.to_s), URI.escape(v.to_s)].join("=") }.join("&")
if prev_params["offset"] > 0
first_params = prev_params.clone
first_params["offset"] = 0
@first_href = "?" + first_params.collect { |k,v| [URI.escape(k.to_s), URI.escape(v.to_s)].join("=") }.join("&")
end
end
body haml :"search/ajax", :layout => !request.xhr?

View file

@ -14,6 +14,9 @@
%strong
Results #{@result_start} - #{@result_end} of #{@total}
|
- if @first_href
%a.pager{ :href => @first_href } first
|
- if @prev_href
%a.pager{ :href => @prev_href }
prev
@ -22,16 +25,22 @@
- if @next_href
%a.pager{ :href => @next_href }
next
%table.results
%tr
%th timestamp
%th event
- @hits.reverse.each do |hit|
%tr.event
%td.timestamp&= hit["_source"]["@timestamp"]
%td.message{ :"data-full" => hit.to_json }&= hit["_source"]["@message"]
- if @last_href
|
%a.pager{ :href => @last_href }
last
- if @hits.length == 0
- if !params[:q]
No query given. How about <a href="?q=*" class="querychanger">this?</a>
- else
No results for query '#{params[:q]}'
- else
%table.results
%tr
%th timestamp
%th event
- @hits.reverse.each do |hit|
%tr.event
%td.timestamp&= hit["_source"]["@timestamp"]
%td.message{ :"data-full" => hit.to_json }
%pre&= hit["_source"]["@message"]

View file

@ -0,0 +1,3 @@
#error
%h4 The query '#{params["q"]}' resulted in the following error:
%pre&= @results["error"]

View file

@ -22,13 +22,18 @@ body
margin-top: 1em
#content table.results
font-family: monospace
#content td.event
#content td.message
vertical-align: top
padding: 1px
padding-bottom: 3px
white-space: pre-wrap
pre
white-space: pre-wrap
margin: 0
#content td.timestamp
white-space: nowrap
font-size: 85%
text-align: top
padding: 1px
//font-size: 85%
vertical-align: top
#content tr.selected
background-color: #FCE69D !important
#content tr.event:nth-child(2n)
@ -41,6 +46,8 @@ body
background-color: pink
border: 1px solid red
padding: 3px
pre
white-space: pre-wrap
#error h1
font-size: 130%
padding: 0

3
patterns/java Normal file
View file

@ -0,0 +1,3 @@
JAVACLASS (?:[a-zA-Z0-9-]+\.)+[A-Za-z0-9]+
JAVAFILE (?:[A-Za-z0-9_.-]+)
JAVASTACKTRACEPART at %{JAVACLASS:class}\.%{WORD:method}\(%{JAVAFILE:file}:%{NUMBER:line}\)

View file

@ -0,0 +1,35 @@
Feature: multiline filter
In order to ensure multiline filter is working
Events matching the multiline filter should be joined
Scenario: whitespace-leading lines (like java stack traces)
Given a multiline pattern of "^\s"
And a multiline what of "previous"
# We use quotes to wrap lines here because cucumber would otherwise
# trim the whitespace
When the inputs are
|hello world|
|" continued!"|
|one|
|two|
|" two again"|
Then the event message should be
|hello world\n continued!|
|one|
|two\n two again|
Scenario: '...' continuation with next
Given a multiline pattern of "\.\.\.$"
And a multiline what of "next"
# We use quotes to wrap lines here because cucumber would otherwise
# trim the whitespace
When the inputs are
|hello world... |
|" continued!"|
|one|
|two...|
|" two again"|
Then the event message should be
|hello world...\n continued!|
|one|
|two...\n two again|

View file

@ -0,0 +1,60 @@
$:.unshift("#{File.dirname(__FILE__)}/../../../lib")
require "logstash/event"
require "logstash/filters/multiline"
require "test/unit"

include Test::Unit::Assertions

# Capture the filter settings declared in the feature file.
Given /^a multiline pattern of "([^"]*)"$/ do |pattern|
  @pattern = pattern
end

Given /^a multiline what of "([^"]*)"$/ do |what|
  @what = what
end

# Build a fresh multiline filter from the captured settings, run every
# input line through it, keep the events that survive (not cancelled),
# and finally flush the filter to collect any trailing pending event.
When /^the inputs are$/ do |table|
  @eventtype = "testtype"
  @multiline = LogStash::Filters::Multiline.new({
    @eventtype => {
      "pattern" => @pattern,
      "what" => @what
    }
  })
  @multiline.register

  @events = []
  @source = "file:///test"
  table.raw.each do |row|
    # We are whitespace sensitive, and cucumber trims whitespace on
    # table data sets, so let's hack around it with quotes.
    message = row.first.gsub(/^"/, "").gsub(/"$/, "")
    event = LogStash::Event.new({})
    event.message = message
    event.source = @source
    event.type = @eventtype
    @multiline.filter(event)
    @events << event unless event.cancelled?
  end

  lastevent = @multiline.flush(@source, @eventtype)
  @events << lastevent if lastevent
end

# Compare each expected message (with literal \n expanded to a newline)
# against the collected events, in order.
Then /^the event message should be$/ do |table|
  # table is a Cucumber::Ast::Table
  assert_equal(table.raw.length, @events.length,
               "Should have #{table.raw.length} events, got #{@events.length}")
  table.raw.each_with_index do |row, event_idx|
    message = row.first.gsub(/\\n/, "\n")
    assert_equal(message, @events[event_idx].message, "Wrong message")
  end
end
1