mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
Merge branch 'tcp-client-and-server' of https://github.com/dpiddy/logstash into dpiddy-tcp-client-and-server
This commit is contained in:
commit
25f720a954
2 changed files with 164 additions and 63 deletions
|
@ -6,67 +6,104 @@ require "timeout"
|
|||
# Read events over a TCP socket.
|
||||
#
|
||||
# Like stdin and file inputs, each event is assumed to be one line of text.
|
||||
#
|
||||
# Can either accept connections from clients or connect to a server,
|
||||
# depending on `mode`.
|
||||
class LogStash::Inputs::Tcp < LogStash::Inputs::Base
|
||||
|
||||
config_name "tcp"
|
||||
|
||||
# The address to listen on
|
||||
# When mode is `server`, the address to listen on.
|
||||
# When mode is `client`, the address to connect to.
|
||||
config :host, :validate => :string, :default => "0.0.0.0"
|
||||
|
||||
# the port to listen on
|
||||
|
||||
# When mode is `server`, the port to listen on.
|
||||
# When mode is `client`, the port to connect to.
|
||||
config :port, :validate => :number, :required => true
|
||||
|
||||
# Read timeout in seconds. If a particular tcp connection is
|
||||
# idle for more than this timeout period, we will assume
|
||||
# idle for more than this timeout period, we will assume
|
||||
# it is dead and close it.
|
||||
# If you never want to timeout, use -1.
|
||||
config :data_timeout, :validate => :number, :default => 5
|
||||
|
||||
# Mode to operate in. `server` listens for client connections,
|
||||
# `client` connects to a server.
|
||||
config :mode, :validate => ["server", "client"], :default => "server"
|
||||
|
||||
module SocketPeer
|
||||
public
|
||||
def peer
|
||||
"#{peeraddr[3]}:#{peeraddr[1]}"
|
||||
end # def peer
|
||||
end # module SocketPeer
|
||||
|
||||
public
|
||||
def register
|
||||
@logger.info("Starting tcp listener on #{@host}:#{@port}")
|
||||
@server = TCPServer.new(@host, @port)
|
||||
if server?
|
||||
@logger.info("Starting tcp input listener on #{@host}:#{@port}")
|
||||
@server_socket = TCPServer.new(@host, @port)
|
||||
end
|
||||
end # def register
|
||||
|
||||
private
|
||||
def handle_socket(socket, output_queue, event_source)
|
||||
begin
|
||||
loop do
|
||||
buf = nil
|
||||
# NOTE(petef): the timeout only hits after the line is read
|
||||
# or socket dies
|
||||
# TODO(sissel): Why do we have a timeout here? What's the point?
|
||||
if @data_timeout == -1
|
||||
buf = socket.readline
|
||||
else
|
||||
Timeout::timeout(@data_timeout) do
|
||||
buf = socket.readline
|
||||
end
|
||||
end
|
||||
e = self.to_event(buf, event_source)
|
||||
if e
|
||||
output_queue << e
|
||||
end
|
||||
end # loop do
|
||||
rescue => e
|
||||
@logger.debug(["Closing connection with #{socket.peer}", $!])
|
||||
@logger.debug(["Backtrace", e.backtrace])
|
||||
rescue Timeout::Error
|
||||
@logger.debug("Closing connection with #{socket.peer} after read timeout")
|
||||
end # begin
|
||||
|
||||
begin
|
||||
socket.close
|
||||
rescue IOError
|
||||
pass
|
||||
end # begin
|
||||
end
|
||||
|
||||
private
|
||||
def server?
|
||||
@mode == "server"
|
||||
end # def server?
|
||||
|
||||
public
|
||||
def run(output_queue)
|
||||
loop do
|
||||
# Start a new thread for each connection.
|
||||
Thread.start(@server.accept) do |s|
|
||||
# TODO(sissel): put this block in its own method.
|
||||
peer = "#{s.peeraddr[3]}:#{s.peeraddr[1]}"
|
||||
@logger.debug("Accepted connection from #{peer} on #{@host}:#{@port}")
|
||||
begin
|
||||
loop do
|
||||
buf = nil
|
||||
# NOTE(petef): the timeout only hits after the line is read
|
||||
# or socket dies
|
||||
# TODO(sissel): Why do we have a timeout here? What's the point?
|
||||
if @data_timeout == -1
|
||||
buf = s.readline
|
||||
else
|
||||
Timeout::timeout(@data_timeout) do
|
||||
buf = s.readline
|
||||
end
|
||||
end
|
||||
e = self.to_event(buf, "tcp://#{@host}:#{@port}/client/#{peer}")
|
||||
if e
|
||||
output_queue << e
|
||||
end
|
||||
end # loop do
|
||||
rescue => e
|
||||
@logger.debug(["Closing connection with #{peer}", $!])
|
||||
@logger.debug(["Backtrace", e.backtrace])
|
||||
rescue Timeout::Error
|
||||
@logger.debug("Closing connection with #{peer} after read timeout")
|
||||
end # begin
|
||||
|
||||
begin
|
||||
s.close
|
||||
rescue IOError
|
||||
pass
|
||||
end # begin
|
||||
end # Thread.start
|
||||
end # loop (outer)
|
||||
if server?
|
||||
loop do
|
||||
# Start a new thread for each connection.
|
||||
Thread.start(@server_socket.accept) do |s|
|
||||
# TODO(sissel): put this block in its own method.
|
||||
s.instance_eval { class << self; include SocketPeer end }
|
||||
@logger.debug("Accepted connection from #{s.peer} on #{@host}:#{@port}")
|
||||
handle_socket(s, output_queue, "tcp://#{@host}:#{@port}/client/#{s.peer}")
|
||||
end # Thread.start
|
||||
end # loop
|
||||
else
|
||||
loop do
|
||||
client_socket = TCPSocket.new(@host, @port)
|
||||
client_socket.instance_eval { class << self; include SocketPeer end }
|
||||
@logger.debug("Opened connection to #{client_socket.peer}")
|
||||
handle_socket(client_socket, output_queue, "tcp://#{client_socket.peer}/server")
|
||||
end # loop
|
||||
end
|
||||
end # def run
|
||||
end # class LogStash::Inputs::Tcp
|
||||
|
|
|
@ -1,45 +1,109 @@
|
|||
require "logstash/outputs/base"
|
||||
require "logstash/namespace"
|
||||
require "thread"
|
||||
|
||||
# This output writes each event in json format to
|
||||
# the specified host:port over tcp.
|
||||
|
||||
# Write events over a TCP socket.
|
||||
#
|
||||
# Each event json is separated by a newline.
|
||||
#
|
||||
# Can either accept connections from clients or connect to a server,
|
||||
# depending on `mode`.
|
||||
class LogStash::Outputs::Tcp < LogStash::Outputs::Base
|
||||
|
||||
config_name "tcp"
|
||||
|
||||
# The host to connect to
|
||||
# When mode is `server`, the address to listen on.
|
||||
# When mode is `client`, the address to connect to.
|
||||
config :host, :validate => :string, :required => true
|
||||
|
||||
# The port to connect to
|
||||
# When mode is `server`, the port to listen on.
|
||||
# When mode is `client`, the port to connect to.
|
||||
config :port, :validate => :number, :required => true
|
||||
|
||||
public
|
||||
def initialize(params)
|
||||
super
|
||||
end # def initialize
|
||||
# Mode to operate in. `server` listens for client connections,
|
||||
# `client` connects to a server.
|
||||
config :mode, :validate => ["server", "client"], :default => "client"
|
||||
|
||||
class Client
|
||||
public
|
||||
def initialize(socket, logger)
|
||||
@socket = socket
|
||||
@logger = logger
|
||||
@queue = Queue.new
|
||||
end
|
||||
|
||||
public
|
||||
def run
|
||||
loop do
|
||||
begin
|
||||
@socket.write(@queue.pop)
|
||||
rescue => e
|
||||
@logger.warn(["tcp output exception", @socket, $!])
|
||||
@logger.debug(["backtrace", e.backtrace])
|
||||
break
|
||||
end
|
||||
end
|
||||
end # def run
|
||||
|
||||
public
|
||||
def write(msg)
|
||||
@queue.push(msg)
|
||||
end # def write
|
||||
end # class Client
|
||||
|
||||
public
|
||||
def register
|
||||
@socket = nil
|
||||
if server?
|
||||
@logger.info("Starting tcp output listener on #{@host}:#{@port}")
|
||||
@server_socket = TCPServer.new(@host, @port)
|
||||
@client_threads = []
|
||||
|
||||
@accept_thread = Thread.new(@server_socket) do |server_socket|
|
||||
loop do
|
||||
client_thread = Thread.start(server_socket.accept) do |client_socket|
|
||||
client = Client.new(client_socket, @logger)
|
||||
Thread.current[:client] = client
|
||||
client.run
|
||||
end
|
||||
@client_threads << client_thread
|
||||
end
|
||||
end
|
||||
else
|
||||
@client_socket = nil
|
||||
end
|
||||
end # def register
|
||||
|
||||
private
|
||||
def connect
|
||||
@socket = TCPSocket.new(@host, @port)
|
||||
end
|
||||
@client_socket = TCPSocket.new(@host, @port)
|
||||
end # def connect
|
||||
|
||||
private
|
||||
def server?
|
||||
@mode == "server"
|
||||
end # def server?
|
||||
|
||||
public
|
||||
def receive(event)
|
||||
begin
|
||||
connect unless @socket
|
||||
@socket.write(event.to_hash.to_json)
|
||||
@socket.write("\n")
|
||||
rescue => e
|
||||
@logger.warn(["tcp output exception", @host, @port, $!])
|
||||
@logger.debug(["backtrace", e.backtrace])
|
||||
@socket = nil
|
||||
wire_event = event.to_hash.to_json + "\n"
|
||||
|
||||
if server?
|
||||
@client_threads.each do |client_thread|
|
||||
client_thread[:client].write(wire_event)
|
||||
end
|
||||
|
||||
@client_threads.reject! {|t| !t.alive? }
|
||||
else
|
||||
begin
|
||||
connect unless @client_socket
|
||||
@client_socket.write(event.to_hash.to_json)
|
||||
@client_socket.write("\n")
|
||||
rescue => e
|
||||
@logger.warn(["tcp output exception", @host, @port, $!])
|
||||
@logger.debug(["backtrace", e.backtrace])
|
||||
@client_socket = nil
|
||||
end
|
||||
end
|
||||
end # def receive
|
||||
end # class LogStash::Outputs::Tcp
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue