mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
- Some small changes to make it easier for testing
- Improve logging from input/output registration - Add additional way to put logstash in debug: set LOGSTASH_DEBUG in env - Add tcp output. Writes json-encoded messages
This commit is contained in:
parent
9aa551be74
commit
3dca9e22c9
7 changed files with 43 additions and 5 deletions
|
@ -9,6 +9,9 @@ require "logstash/logging"
|
|||
# Collect logs, ship them out.
|
||||
class LogStash::Agent
|
||||
attr_reader :config
|
||||
attr_reader :inputs
|
||||
attr_reader :outputs
|
||||
attr_reader :filters
|
||||
|
||||
def initialize(config)
|
||||
@logger = LogStash::Logger.new(STDERR)
|
||||
|
@ -26,7 +29,7 @@ class LogStash::Agent
|
|||
# Register any event handlers with EventMachine
|
||||
# Technically, this agent could listen for anything (files, sockets, amqp,
|
||||
# stomp, etc).
|
||||
protected
|
||||
public
|
||||
def register
|
||||
# TODO(sissel): warn when no inputs and no outputs are defined.
|
||||
# TODO(sissel): Refactor this madness into a config lib
|
||||
|
@ -80,12 +83,20 @@ class LogStash::Agent
|
|||
end # def register
|
||||
|
||||
public
|
||||
def run
|
||||
def run(&block)
|
||||
EventMachine.run do
|
||||
self.register
|
||||
yield if block_given?
|
||||
end # EventMachine.run
|
||||
end # def run
|
||||
|
||||
public
|
||||
def stop
|
||||
# TODO(sissel): Stop inputs, fluch outputs, wait for finish,
|
||||
# then stop the event loop
|
||||
EventMachine.stop_event_loop
|
||||
end
|
||||
|
||||
protected
|
||||
def filter(event)
|
||||
@filters.each do |f|
|
||||
|
|
|
@ -23,7 +23,7 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Base
|
|||
end
|
||||
|
||||
def register
|
||||
@logger.info("Registering #{@url}")
|
||||
@logger.info("Registering input #{@url}")
|
||||
@amqp = AMQP.connect(:host => @url.host)
|
||||
@mq = MQ.new(@amqp)
|
||||
@target = nil
|
||||
|
|
|
@ -15,7 +15,7 @@ class LogStash::Inputs::Internal < LogStash::Inputs::Base
|
|||
end
|
||||
|
||||
def register
|
||||
@logger.info("Registering #{@url}")
|
||||
@logger.info("Registering input #{@url}")
|
||||
@channel.subscribe do |event|
|
||||
receive(event)
|
||||
end
|
||||
|
|
|
@ -18,6 +18,9 @@ class LogStash::Logger < Logger
|
|||
|
||||
# Set default loglevel to WARN unless $DEBUG is set (run with 'ruby -d')
|
||||
self.level = $DEBUG ? Logger::DEBUG: Logger::INFO
|
||||
if ENV["LOGSTASH_DEBUG"]
|
||||
self.level = Logger::DEBUG
|
||||
end
|
||||
|
||||
# Conditional support for awesome_print
|
||||
if !@@have_awesome_print && @@notify_awesome_print_load_failed
|
||||
|
|
|
@ -19,6 +19,7 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
|
|||
end # def initialize
|
||||
|
||||
def register
|
||||
@logger.info("Registering output #{@url}")
|
||||
@amqp = AMQP.connect(:host => @url.host)
|
||||
@mq = MQ.new(@amqp)
|
||||
@target = nil
|
||||
|
|
|
@ -7,7 +7,7 @@ class LogStash::Outputs::Internal < LogStash::Outputs::Base
|
|||
end
|
||||
|
||||
def register
|
||||
# nothing to do
|
||||
@logger.info("Registering output #{@url}")
|
||||
end # def register
|
||||
|
||||
def receive(event)
|
||||
|
|
23
lib/logstash/outputs/tcp.rb
Normal file
23
lib/logstash/outputs/tcp.rb
Normal file
|
@ -0,0 +1,23 @@
|
|||
require "logstash/outputs/base"
|
||||
|
||||
class LogStash::Outputs::Tcp < LogStash::Outputs::Base
|
||||
def initialize(url, config={}, &block)
|
||||
super
|
||||
end
|
||||
|
||||
def register
|
||||
# TODO(sissel): Write generic validation methods
|
||||
if !@url.host or !@url.port
|
||||
@logger.fatal("No host or port given in #{self.class}: #{@url}")
|
||||
# TODO(sissel): Make this an actual exception class
|
||||
raise "configuration error"
|
||||
end
|
||||
|
||||
@connection = EventMachine::connect(@url.host, @url.port)
|
||||
end # def register
|
||||
|
||||
def receive(event)
|
||||
@connection.send_data(event.to_hash.to_json)
|
||||
@connection.send_data("\n")
|
||||
end # def receive
|
||||
end # class LogStash::Outputs::Tcp
|
Loading…
Add table
Add a link
Reference in a new issue