- start working on splitting things in to components. I want it to be possible

to run a single agent that does log reading, parsing, and indexing locally.
This commit is contained in:
Jordan Sissel 2010-10-17 03:19:45 +00:00
parent ae21f3d7c0
commit d3280be19b
2 changed files with 55 additions and 56 deletions

47
lib/components/agent.rb Normal file
View file

@ -0,0 +1,47 @@
require "eventmachine"
require "eventmachine-tail"
class Reader < EventMachine::FileTail
def initialize(path, agent)
super(path)
@agent = agent
@buffer = BufferedTokenizer.new # From eventmachine
end
def receive_data(data)
# TODO(sissel): Support multiline log data
@buffer.extract(data).each do |line|
# Package it up into an event object before passing it along.
@agent.process(path, line)
end
end # def receive_data
end # class Reader
# Collect logs, ship them out.
module LogStash; module Components; class Agent
attr_reader :config
def initialize(config)
@config = config
# Config should have:
# - list of logs to monitor
# - log config
# - where to ship to
end # def initialize
# Register any event handlers with EventMachine
# Technically, this agent could listen for anything (files, sockets, amqp,
# stomp, etc).
def register
@config["logs"].each do |path|
EventMachine::FileGlobWatchTail.new(path, Reader, interval=60,
exclude=[], agent=self)
end # each log
end # def register
# Process a message
def process(source, message)
puts "#{source}: #{message}"
end # def process
end; end; end; # class LogStash::Components::Agent

View file

@ -11,7 +11,7 @@ require 'em-http'
PROGRESS_AMOUNT = 500
class GrokReader < EventMachine::FileTail
class Reader < EventMachine::FileTail
def initialize(path, agent)
super(path)
@agent = agent
@ -23,8 +23,7 @@ class GrokReader < EventMachine::FileTail
@agent.process(path, line)
end
end # def receive_data
end # class GrokReader
end # class Reader
module LogStash; module Programs;
class Agent < LogStash::Program
@ -76,68 +75,21 @@ module LogStash; module Programs;
if !matched
puts "nomatch in #{path}: #{line}"
end
end # def process
private
def index(name, entry)
@indexes[name].index(entry)
#logstash_index(name, entry)
#elastic_index(name, entry)
end
def logstash_index(name, entry)
@index.index(entry)
@count += 1
if @count % PROGRESS_AMOUNT == 0
#flush_indexes
#puts "match #{name} in #{path}: #{line}"
puts "count: #{@count} #{PROGRESS_AMOUNT / (Time.now - @start)}"
@start = Time.now
end
end
def elastic_index(name, entry)
http = EventMachine::HttpRequest.new("http://localhost:9200/logstash/#{name}")
req = http.post :body => entry.to_json
@count += 1
puts "count: #{@count} #{PROGRESS_AMOUNT / (Time.now - @start)}"
req.callback do
if @count % PROGRESS_AMOUNT == 0
@start = Time.now
end
end
end
def ferret_index(name, entry)
@indexes[name] << entry
@needs_flushing << name
@count += 1
if @count % PROGRESS_AMOUNT == 0
#flush_indexes
#puts "match #{name} in #{path}: #{line}"
puts "count: #{@count} #{AMOUNT / (Time.now - @start)}"
@start = Time.now
end
def publish(name, entry)
# publish the entry
end
private
def setup_watches
#handler = EventMachine::FileGlobWatchTail.new(GrokReader, self)
#handler = EventMachine::FileGlobWatchTail.new(Reader, self)
@config.watched_paths.each do |path|
$logger.warn("Watching #{path}")
EventMachine::FileGlobWatchTail.new(path, GrokReader, interval=60,
exclude=[], agent=self)
$logger.info("Watching #{path}")
EventMachine::FileGlobWatchTail.new(path, Reader, interval=60,
exclude=[], agent=self)
end
end
private
def flush_indexes
@needs_flushing.each do |name|
$logger.warn("Flushing #{name}")
@indexes[name].flush
end
@needs_flushing.clear
end
end # class Agent
end; end # module LogStash::Programs