mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
fix tcp output
This commit is contained in:
parent
de568d2b46
commit
67f271962e
1 changed files with 25 additions and 10 deletions
|
@ -2,21 +2,36 @@ require "logstash/outputs/base"
|
|||
require "logstash/namespace"
|
||||
|
||||
class LogStash::Outputs::Tcp < LogStash::Outputs::Base
|
||||
|
||||
config_name "tcp"
|
||||
|
||||
config :host, :validate => :string, :required => true
|
||||
config :port, :validate => :number, :required => true
|
||||
|
||||
public
|
||||
def initialize(params)
|
||||
super
|
||||
end # def initialize
|
||||
|
||||
public
|
||||
def register
|
||||
# TODO(sissel): Write generic validation methods
|
||||
if !@url.host or !@url.port
|
||||
@logger.fatal("No host or port given in #{self.class}: #{@url}")
|
||||
# TODO(sissel): Make this an actual exception class
|
||||
raise "configuration error"
|
||||
end
|
||||
|
||||
@connection = EventMachine::connect(@url.host, @url.port)
|
||||
@socket = nil
|
||||
end # def register
|
||||
|
||||
private
|
||||
def connect
|
||||
@socket = TCPSocket.new(@host, @port)
|
||||
end
|
||||
|
||||
public
|
||||
def receive(event)
|
||||
@connection.send_data(event.to_hash.to_json)
|
||||
@connection.send_data("\n")
|
||||
begin
|
||||
connect unless @socket
|
||||
@socket.write(event.to_hash.to_json)
|
||||
@socket.write("\n")
|
||||
rescue
|
||||
@logger.warn(["tcp output exception", @host, @port, $!])
|
||||
@socket = nil
|
||||
end
|
||||
end # def receive
|
||||
end # class LogStash::Outputs::Tcp
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue