mirror of
https://github.com/elastic/logstash.git
synced 2025-04-23 22:27:21 -04:00
- Enhance the test example to show the ease of reading from local files
or simply reading parsed messages from an AMQP topic with the same code and only the configuration changed.
This commit is contained in:
parent
cf06e05019
commit
d0c4ccbfdc
2 changed files with 39 additions and 18 deletions
|
@ -1,4 +1,7 @@
|
|||
#!/usr/bin/env ruby
|
||||
#
|
||||
# How to trigger the 'evil ip' message:
|
||||
# % logger -t "pantscon" "naughty host 14.33.24.55 $RANDOM"
|
||||
|
||||
require "rubygems"
|
||||
require "eventmachine"
|
||||
|
@ -6,17 +9,11 @@ require "lib/components/agent"
|
|||
require "ap"
|
||||
|
||||
class MyAgent < LogStash::Components::Agent
|
||||
def initialize
|
||||
super({
|
||||
"input" => [
|
||||
"amqp://localhost/topic/parsed",
|
||||
]
|
||||
})
|
||||
end # def initialize
|
||||
|
||||
def receive(event)
|
||||
filter(event) # Invoke any filters
|
||||
|
||||
return unless event["progname"][0] == "pantscon"
|
||||
return unless event["message"] =~ /naughty host/
|
||||
return unless event.message =~ /naughty host/
|
||||
event["IP"].each do |ip|
|
||||
next unless ip.length > 0
|
||||
puts "Evil IP: #{ip}"
|
||||
|
@ -24,5 +21,20 @@ class MyAgent < LogStash::Components::Agent
|
|||
end # def receive
|
||||
end # class MyAgent
|
||||
|
||||
agent = MyAgent.new
|
||||
# Read a local file, parse it, and react accordingly (see MyAgent#receive)
|
||||
agent = MyAgent.new({
|
||||
"input" => [
|
||||
"/var/log/messages",
|
||||
],
|
||||
"filter" => [ "grok" ],
|
||||
})
|
||||
agent.run
|
||||
|
||||
# Read messages that we expect to be parsed by another agent. Reads
|
||||
# a particular AMQP topic for messages
|
||||
#agent = MyAgent.new({
|
||||
#"input" => [
|
||||
#"amqp://localhost/topic/parsed",
|
||||
#]
|
||||
#})
|
||||
#agent.run
|
||||
|
|
|
@ -58,16 +58,25 @@ class LogStash::Components::Agent
|
|||
end # EventMachine.run
|
||||
end # def run
|
||||
|
||||
protected
|
||||
def filter(event)
|
||||
@filters.each do |f|
|
||||
# TODO(sissel): Add ability for a filter to cancel/drop a message
|
||||
f.filter(event)
|
||||
end
|
||||
end # def filter
|
||||
|
||||
protected
|
||||
def output(event)
|
||||
@outputs.each do |o|
|
||||
o.receive(event)
|
||||
end # each output
|
||||
end # def output
|
||||
|
||||
protected
|
||||
# Process a message
|
||||
def receive(event)
|
||||
@filters.each do |filter|
|
||||
# TODO(sissel): Add ability for a filter to cancel/drop a message
|
||||
filter.filter(event)
|
||||
end
|
||||
|
||||
@outputs.each do |output|
|
||||
output.receive(event)
|
||||
end # each output
|
||||
filter(event)
|
||||
output(event)
|
||||
end # def input
|
||||
end # class LogStash::Components::Agent
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue