Style and documentation.

This commit is contained in:
Dan Peterson 2011-07-14 09:10:19 -03:00
parent 8882a824e8
commit 6d973ec644
2 changed files with 82 additions and 51 deletions

View file

@ -6,14 +6,19 @@ require "timeout"
# Read events over a TCP socket.
#
# Like stdin and file inputs, each event is assumed to be one line of text.
#
# Can either accept connections from clients or connect to a server,
# depending on `mode`.
class LogStash::Inputs::Tcp < LogStash::Inputs::Base
config_name "tcp"
# The address to listen on
# When mode is `server`, the address to listen on.
# When mode is `client`, the address to connect to.
config :host, :validate => :string, :default => "0.0.0.0"
# the port to listen on
# When mode is `server`, the port to listen on.
# When mode is `client`, the port to connect to.
config :port, :validate => :number, :required => true
# Read timeout in seconds. If a particular tcp connection is
@ -22,25 +27,26 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
# If you never want to timeout, use -1.
config :data_timeout, :validate => :number, :default => 5
# Enable server.
config :server, :validate => :boolean, :default => true
# Mode to operate in. `server` listens for client connections,
# `client` connects to a server.
config :mode, :validate => ["server", "client"], :default => "server"
module SocketPeer
public
def peer
"#{peeraddr[3]}:#{peeraddr[1]}"
end
end
end # def peer
end # module SocketPeer
public
def register
if @server
@logger.info("Starting tcp listener on #{@host}:#{@port}")
@server = TCPServer.new(@host, @port)
else
@socket = nil
if server?
@logger.info("Starting tcp input listener on #{@host}:#{@port}")
@server_socket = TCPServer.new(@host, @port)
end
end # def register
private
def handle_socket(socket, output_queue, event_source)
begin
loop do
@ -74,25 +80,30 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
end # begin
end
private
def server?
@mode == "server"
end # def server?
public
def run(output_queue)
if @server
if server?
loop do
# Start a new thread for each connection.
Thread.start(@server.accept) do |s|
Thread.start(@server_socket.accept) do |s|
# TODO(sissel): put this block in its own method.
s.instance_eval { class << self; include SocketPeer end }
@logger.debug("Accepted connection from #{s.peer} on #{@host}:#{@port}")
handle_socket(s, output_queue, "tcp://#{@host}:#{@port}/client/#{s.peer}")
end # Thread.start
end # loop (outer)
end # loop
else
loop do
socket = TCPSocket.new(@host, @port)
socket.instance_eval { class << self; include SocketPeer end }
@logger.debug("Opened connection to #{socket.peer}")
handle_socket(socket, output_queue, "tcp://#{socket.peer}/server")
end
client_socket = TCPSocket.new(@host, @port)
client_socket.instance_eval { class << self; include SocketPeer end }
@logger.debug("Opened connection to #{client_socket.peer}")
handle_socket(client_socket, output_queue, "tcp://#{client_socket.peer}/server")
end # loop
end
end # def run
end # class LogStash::Inputs::Tcp

View file

@ -2,87 +2,107 @@ require "logstash/outputs/base"
require "logstash/namespace"
require "thread"
# This output writes each event in json format to
# the specified host:port over tcp.
# Write events over a TCP socket.
#
# Each event json is separated by a newline.
#
# Can either accept connections from clients or connect to a server,
# depending on `mode`.
class LogStash::Outputs::Tcp < LogStash::Outputs::Base
config_name "tcp"
# The host to connect or bind to
# When mode is `server`, the address to listen on.
# When mode is `client`, the address to connect to.
config :host, :validate => :string, :required => true
# The port to connect or bind to
# When mode is `server`, the port to listen on.
# When mode is `client`, the port to connect to.
config :port, :validate => :number, :required => true
# Enable server.
config :server, :validate => :boolean
# Mode to operate in. `server` listens for client connections,
# `client` connects to a server.
config :mode, :validate => ["server", "client"], :default => "client"
class Client < Thread
def initialize(socket)
class Client
public
def initialize(socket, logger)
@socket = socket
@logger = logger
@queue = Queue.new
end
super do
loop do
begin
@socket.write(@queue.pop)
rescue => e
@logger.warn(["tcp output exception", @socket, $!])
@logger.debug(["backtrace", e.backtrace])
break
end
public
def run
loop do
begin
@socket.write(@queue.pop)
rescue => e
@logger.warn(["tcp output exception", @socket, $!])
@logger.debug(["backtrace", e.backtrace])
break
end
end
end
end # def run
public
def write(msg)
@queue.push(msg)
end
end
end # def write
end # class Client
public
def register
if @server
if server?
@logger.info("Starting tcp output listener on #{@host}:#{@port}")
@server_socket = TCPServer.new(@host, @port)
@client_threads = []
@accept_thread = Thread.new(@server_socket) do |server|
@accept_thread = Thread.new(@server_socket) do |server_socket|
loop do
@client_threads << Client.new(server.accept)
client_thread = Thread.start(server_socket.accept) do |client_socket|
client = Client.new(client_socket, @logger)
Thread.current[:client] = client
client.run
end
@client_threads << client_thread
end
end
else
@socket = nil
@client_socket = nil
end
end # def register
private
def connect
@socket = TCPSocket.new(@host, @port)
end
@client_socket = TCPSocket.new(@host, @port)
end # def connect
private
def server?
@mode == "server"
end # def server?
public
def receive(event)
wire_event = event.to_hash.to_json + "\n"
if @server
if server?
@client_threads.each do |client_thread|
client_thread.write(wire_event)
client_thread[:client].write(wire_event)
end
@client_threads.reject! {|t| !t.alive? }
else
begin
connect unless @socket
@socket.write(event.to_hash.to_json)
@socket.write("\n")
connect unless @client_socket
@client_socket.write(event.to_hash.to_json)
@client_socket.write("\n")
rescue => e
@logger.warn(["tcp output exception", @host, @port, $!])
@logger.debug(["backtrace", e.backtrace])
@socket = nil
@client_socket = nil
end
end
end # def receive