mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
- start working on proper codec support in the tcp input
(json doesn't work yet because there is no json streaming library that I know of that works in both jruby and mri ruby)
This commit is contained in:
parent
ca99d83079
commit
1bb463ae2e
1 changed files with 30 additions and 31 deletions
|
@ -13,8 +13,7 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
|
|||
config_name "tcp"
|
||||
milestone 2
|
||||
|
||||
# XXX Refactor this to use the 'line' codec by default.
|
||||
default :codec => "plain"
|
||||
default :codec, "line"
|
||||
|
||||
# When mode is `server`, the address to listen on.
|
||||
# When mode is `client`, the address to connect to.
|
||||
|
@ -97,34 +96,34 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
|
|||
end # def register
|
||||
|
||||
private
|
||||
def handle_socket(socket, event_source, output_queue)
|
||||
begin
|
||||
while true
|
||||
buf = nil
|
||||
# NOTE(petef): the timeout only hits after the line is read
|
||||
# or socket dies
|
||||
# TODO(sissel): Why do we have a timeout here? What's the point?
|
||||
if @data_timeout == -1
|
||||
buf = readline(socket).chomp
|
||||
else
|
||||
Timeout::timeout(@data_timeout) do
|
||||
buf = readline(socket).chomp
|
||||
end
|
||||
def handle_socket(socket, event_source, output_queue, codec)
|
||||
while true
|
||||
buf = nil
|
||||
# NOTE(petef): the timeout only hits after the line is read
|
||||
# or socket dies
|
||||
# TODO(sissel): Why do we have a timeout here? What's the point?
|
||||
if @data_timeout == -1
|
||||
buf = read(socket)
|
||||
else
|
||||
Timeout::timeout(@data_timeout) do
|
||||
buf = read(socket)
|
||||
end
|
||||
@codec.decode(buf) do |event|
|
||||
event["source"] = event_source
|
||||
event["sslsubject"] = socket.peer_cert.subject if @ssl_enable && @ssl_verify
|
||||
output_queue << event
|
||||
end
|
||||
end # loop do
|
||||
rescue => e
|
||||
@logger.debug("Closing connection", :client => socket.peer,
|
||||
:exception => e, :backtrace => e.backtrace)
|
||||
rescue Timeout::Error
|
||||
@logger.debug("Closing connection after read timeout",
|
||||
:client => socket.peer)
|
||||
end # begin
|
||||
end
|
||||
codec.decode(buf) do |event|
|
||||
event["source"] = event_source
|
||||
event["sslsubject"] = socket.peer_cert.subject if @ssl_enable && @ssl_verify
|
||||
output_queue << event
|
||||
end
|
||||
end # loop do
|
||||
rescue => e
|
||||
codec.respond_to?(:flush) && codec.flush do |event|
|
||||
event["source"] = event_source
|
||||
event["sslsubject"] = socket.peer_cert.subject if @ssl_enable && @ssl_verify
|
||||
output_queue << event
|
||||
end
|
||||
|
||||
@logger.debug("An error occurred. Closing connection",
|
||||
:client => socket.peer, :exception => e)
|
||||
ensure
|
||||
begin
|
||||
socket.close
|
||||
|
@ -139,8 +138,8 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
|
|||
end # def server?
|
||||
|
||||
private
|
||||
def readline(socket)
|
||||
line = socket.readline
|
||||
def read(socket)
|
||||
return socket.sysread(16384)
|
||||
end # def readline
|
||||
|
||||
public
|
||||
|
@ -166,7 +165,7 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
|
|||
@logger.debug("Accepted connection", :client => s.peer,
|
||||
:server => "#{@host}:#{@port}")
|
||||
begin
|
||||
handle_socket(s, "tcp://#{s.peer}/", output_queue)
|
||||
handle_socket(s, "tcp://#{s.peer}/", output_queue, @codec.clone)
|
||||
rescue Interrupted
|
||||
s.close rescue nil
|
||||
end
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue