mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
- Rewrite Agent for local-running only. The goal here is mainly to make the
agent EventMachine-safe. Later I'll pull the index/parse functionality into a module that can be included into the agent for optional local-only or network modes.
This commit is contained in:
parent
7f66dcc2ea
commit
8965fa6c49
9 changed files with 244 additions and 25 deletions
|
@ -2,7 +2,8 @@
|
|||
|
||||
$: << File.join(File.dirname(__FILE__), "..")
|
||||
|
||||
require 'lib/net/clients/agent'
|
||||
require 'rubygems'
|
||||
require 'lib/programs/agent'
|
||||
require 'lib/program'
|
||||
require 'logger'
|
||||
require 'optparse'
|
||||
|
@ -14,18 +15,10 @@ $logger.level = Logger::INFO
|
|||
$logger.progname = $progname
|
||||
$logger.datetime_format = "%Y-%m-%d %H:%M:%S"
|
||||
|
||||
class Agent < LogStash::Program
|
||||
def run
|
||||
super
|
||||
agent = LogStash::Net::Clients::Agent.new(@options[:config], $logger)
|
||||
agent.run
|
||||
end
|
||||
end
|
||||
|
||||
def main(args)
|
||||
options = parse_options(args)
|
||||
Agent.new(options).run
|
||||
end
|
||||
return LogStash::Programs::Agent.new(options).run
|
||||
end # def main
|
||||
|
||||
def parse_options(args)
|
||||
options = {:daemonize => true,
|
||||
|
@ -71,6 +64,6 @@ def parse_options(args)
|
|||
options[:config] = args.shift
|
||||
|
||||
return options
|
||||
end
|
||||
end # def parse_options
|
||||
|
||||
exit main(ARGV)
|
||||
|
|
|
@ -1,15 +1,54 @@
|
|||
require "lib/config/base"
|
||||
require 'lib/config/base'
|
||||
require 'lib/logs'
|
||||
require 'lib/log/json'
|
||||
require 'lib/log/text'
|
||||
|
||||
module LogStash; module Config
|
||||
class AgentConfig < BaseConfig
|
||||
attr_reader :sources
|
||||
attr_reader :logs
|
||||
attr_reader :watched_paths
|
||||
attr_reader :logstash_dir
|
||||
attr_reader :pattern_dir
|
||||
|
||||
def initialize(file)
|
||||
super(file)
|
||||
obj = YAML::load(File.open(file).read())
|
||||
@logstash_dir = "/var/logstash"
|
||||
@pattern_dir = "/opt/logstash/patterns"
|
||||
@watched_paths = []
|
||||
@logs = LogStash::Logs.new
|
||||
|
||||
@sources = obj["sources"]
|
||||
end # def initialize
|
||||
data = YAML::load(::File.open(file).read())
|
||||
merge!(data)
|
||||
end
|
||||
|
||||
def merge!(data)
|
||||
@pattern_dir = data["pattern_dir"] if data.has_key?("pattern_dir")
|
||||
@logstash_dir = data["logstash_dir"] if data.has_key?("logstash_dir")
|
||||
@watched_paths = data["watch"] if data.has_key?("watch")
|
||||
|
||||
if data.has_key?("log-types")
|
||||
data["log-types"].each do |log_type, log_data|
|
||||
puts "Got log #{log_type}"
|
||||
log_config = {:type => log_type,
|
||||
:date_key => log_data["date"]["key"],
|
||||
:date_format => log_data["date"]["format"],
|
||||
:logstash_dir => @logstash_dir,
|
||||
:pattern_dir => @pattern_dir,
|
||||
}
|
||||
|
||||
log = nil
|
||||
case log_data["type"]
|
||||
when "text"
|
||||
log_config[:grok_patterns] = log_data["patterns"]
|
||||
log = LogStash::Log::TextLog.new(log_config)
|
||||
when "json"
|
||||
log_config[:line_format] = log_data["display_format"]
|
||||
log = LogStash::Log::JsonLog.new(log_config)
|
||||
end
|
||||
|
||||
@logs.register(log)
|
||||
end
|
||||
end
|
||||
end # def merge!
|
||||
end # class AgentConfig
|
||||
end; end # module LogStash::Config
|
||||
|
|
|
@ -1,12 +1,11 @@
|
|||
require 'rubygems'
|
||||
require 'mqrpc'
|
||||
require 'yaml'
|
||||
|
||||
module LogStash; module Config
|
||||
# Base config class. All configs need to know how to get to a broker.
|
||||
class BaseConfig < MQRPC::Config
|
||||
class BaseConfig
|
||||
def initialize(file)
|
||||
obj = YAML::load(File.open(file).read())
|
||||
obj = YAML::load(::File.open(file).read())
|
||||
@mqhost = obj["mqhost"] || "localhost"
|
||||
@mqport = obj["mqport"] || 5672
|
||||
@mquser = obj["mquser"] || "guest"
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
require 'rubygems'
|
||||
require 'date'
|
||||
require 'json'
|
||||
require 'ferret'
|
||||
|
||||
module LogStash
|
||||
class LogException < StandardError
|
||||
|
|
|
@ -27,5 +27,12 @@ module LogStash
|
|||
def types
|
||||
return @logs.keys
|
||||
end
|
||||
|
||||
def each
|
||||
@logs.each do |k,v|
|
||||
yield k,v
|
||||
end
|
||||
end # def each
|
||||
|
||||
end # class Logs
|
||||
end # module LogStash
|
||||
|
|
|
@ -1,8 +1,4 @@
|
|||
require 'rubygems'
|
||||
#require 'lib/util'
|
||||
|
||||
module LogStash
|
||||
|
||||
class Program
|
||||
class PidFileLockFailed < StandardError
|
||||
end # class LogStash::Program::PidFileLockFailed
|
||||
|
|
93
lib/programs/agent.rb
Normal file
93
lib/programs/agent.rb
Normal file
|
@ -0,0 +1,93 @@
|
|||
require 'lib/config/agent'
|
||||
require 'lib/program'
|
||||
require 'lib/file/tail'
|
||||
require 'grok'
|
||||
require 'set'
|
||||
require 'ap'
|
||||
require 'socket' # for Socket.gethostname
|
||||
require 'eventmachine'
|
||||
require 'eventmachine-tail'
|
||||
|
||||
class GrokReader < EventMachine::FileTail
|
||||
def initialize(path, agent)
|
||||
super(path)
|
||||
@agent = agent
|
||||
@buffer = BufferedTokenizer.new
|
||||
end
|
||||
|
||||
def receive_data(data)
|
||||
@buffer.extract(data).each do |line|
|
||||
@agent.process(path, line)
|
||||
end
|
||||
end # def receive_data
|
||||
|
||||
end # class GrokReader
|
||||
|
||||
module LogStash; module Programs;
|
||||
class Agent < LogStash::Program
|
||||
public
|
||||
def initialize(options)
|
||||
super(options)
|
||||
@config = LogStash::Config::AgentConfig.new(options[:config])
|
||||
@config.merge!(options)
|
||||
@indexes = Hash.new { |h,k| h[k] = @config.logs[k].get_index }
|
||||
@hostname = Socket.gethostname
|
||||
@needs_flushing = Set.new
|
||||
end
|
||||
|
||||
public
|
||||
def run
|
||||
EventMachine.run do
|
||||
super
|
||||
setup_watches
|
||||
|
||||
EventMachine.add_periodic_timer(1) do
|
||||
flush_indexes
|
||||
end
|
||||
ap @options
|
||||
end # EventMachine.run
|
||||
end # def run
|
||||
|
||||
public
|
||||
def process(path, line)
|
||||
@config.logs.each do |name, log|
|
||||
begin
|
||||
entry = log.parse_entry(line)
|
||||
if entry
|
||||
entry["@SOURCE_FILE"] = path
|
||||
entry["@SOURCE_HOST"] = @hostname
|
||||
puts "match #{name} in #{path}: #{line}"
|
||||
index(name, entry)
|
||||
break
|
||||
end
|
||||
rescue LogStash::Log::LogParseError => e
|
||||
# ignore
|
||||
end
|
||||
end # @logs.each
|
||||
end # def process
|
||||
|
||||
private
|
||||
def index(name, entry)
|
||||
@indexes[name] << entry
|
||||
@needs_flushing << name
|
||||
end
|
||||
|
||||
private
|
||||
def setup_watches
|
||||
handler = EventMachine::FileGlobWatchTail.new(GrokReader, self)
|
||||
@config.watched_paths.each do |path|
|
||||
$logger.warn("Watching #{path}")
|
||||
EventMachine::FileGlobWatch.new(path, handler)
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
def flush_indexes
|
||||
@needs_flushing.each do |name|
|
||||
$logger.warn("Flushing #{name}")
|
||||
@indexes[name].flush
|
||||
end
|
||||
@needs_flushing.clear
|
||||
end
|
||||
end # class Agent
|
||||
end; end # module LogStash::Programs
|
48
logstash-agent.yaml
Normal file
48
logstash-agent.yaml
Normal file
|
@ -0,0 +1,48 @@
|
|||
---
|
||||
watch:
|
||||
- /var/log/messages
|
||||
- /var/log/*.log
|
||||
|
||||
logstash_dir: /c/logstash
|
||||
pattern_dir: /c/logstash/patterns
|
||||
|
||||
log-types:
|
||||
linux-syslog:
|
||||
type: text
|
||||
date:
|
||||
key: date
|
||||
format: %b %e %H:%M:%S
|
||||
patterns:
|
||||
- %{SYSLOGPAMSESSION}
|
||||
- %{SYSLOGLINE}
|
||||
|
||||
httpd-access:
|
||||
type: text
|
||||
date:
|
||||
key: timestamp
|
||||
format: %d/%b/%Y:%H:%M:%S %Z
|
||||
patterns:
|
||||
- %{COMBINEDAPACHELOG}
|
||||
|
||||
glu:
|
||||
type: json
|
||||
date:
|
||||
key: timestamp
|
||||
format: %Y-%m-%dT%H:%M:%S
|
||||
display_format: "<%= entry['timestamp'] %> | <%= entry['level'] %> | <%= entry['context/sessionKey'] %> | <%= entry['sourceHostName'] %> | <%= entry['context/componentName'] %> | <%= entry['message'] %>"
|
||||
|
||||
netscreen:
|
||||
type: text
|
||||
date:
|
||||
key: date
|
||||
format: %b %e %H:%M:%S
|
||||
patterns:
|
||||
- %{NETSCREENSESSIONLOG}
|
||||
|
||||
haproxy:
|
||||
type: text
|
||||
date:
|
||||
key: date
|
||||
format: %b %e %H:%M:%S
|
||||
patterns:
|
||||
- %{HAPROXYHTTP}
|
44
logstashd.yaml
Normal file
44
logstashd.yaml
Normal file
|
@ -0,0 +1,44 @@
|
|||
---
|
||||
logstash_dir: /c/logstash
|
||||
pattern_dir: /c/logstash/patterns
|
||||
|
||||
log-types:
|
||||
linux-syslog:
|
||||
type: text
|
||||
date:
|
||||
key: date
|
||||
format: %b %e %H:%M:%S
|
||||
patterns:
|
||||
- %{SYSLOGPAMSESSION}
|
||||
- %{SYSLOGLINE}
|
||||
|
||||
httpd-access:
|
||||
type: text
|
||||
date:
|
||||
key: timestamp
|
||||
format: %d/%b/%Y:%H:%M:%S %Z
|
||||
patterns:
|
||||
- %{COMBINEDAPACHELOG}
|
||||
|
||||
glu:
|
||||
type: json
|
||||
date:
|
||||
key: timestamp
|
||||
format: %Y-%m-%dT%H:%M:%S
|
||||
display_format: "<%= entry['timestamp'] %> | <%= entry['level'] %> | <%= entry['context/sessionKey'] %> | <%= entry['sourceHostName'] %> | <%= entry['context/componentName'] %> | <%= entry['message'] %>"
|
||||
|
||||
netscreen:
|
||||
type: text
|
||||
date:
|
||||
key: date
|
||||
format: %b %e %H:%M:%S
|
||||
patterns:
|
||||
- %{NETSCREENSESSIONLOG}
|
||||
|
||||
haproxy:
|
||||
type: text
|
||||
date:
|
||||
key: date
|
||||
format: %b %e %H:%M:%S
|
||||
patterns:
|
||||
- %{HAPROXYHTTP}
|
Loading…
Add table
Add a link
Reference in a new issue