mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
- add codec support
- message_format is deprecated
This commit is contained in:
parent
f303105308
commit
4b3c0a34af
1 changed files with 30 additions and 26 deletions
|
@ -14,6 +14,8 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base
|
|||
config_name "tcp"
|
||||
milestone 2
|
||||
|
||||
default :codec, "json"
|
||||
|
||||
# When mode is `server`, the address to listen on.
|
||||
# When mode is `client`, the address to connect to.
|
||||
config :host, :validate => :string, :required => true
|
||||
|
@ -35,7 +37,7 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base
|
|||
#
|
||||
# If this setting is omitted, the full json representation of the
|
||||
# event will be written as a single line.
|
||||
config :message_format, :validate => :string
|
||||
config :message_format, :validate => :string, :deprecated => true
|
||||
|
||||
class Client
|
||||
public
|
||||
|
@ -52,7 +54,7 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base
|
|||
@socket.write(@queue.pop)
|
||||
rescue => e
|
||||
@logger.warn("tcp output exception", :socket => @socket,
|
||||
:exception => e, :backtrace => e.backtrace)
|
||||
:exception => e)
|
||||
break
|
||||
end
|
||||
end
|
||||
|
@ -81,8 +83,27 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base
|
|||
@client_threads << client_thread
|
||||
end
|
||||
end
|
||||
|
||||
@codec.on_event do |payload|
|
||||
@client_threads.each do |client_thread|
|
||||
client_thread[:client].write(payload)
|
||||
end
|
||||
@client_threads.reject! {|t| !t.alive? }
|
||||
end
|
||||
else
|
||||
@client_socket = nil
|
||||
@codec.on_event do |payload|
|
||||
begin
|
||||
connect unless @client_socket
|
||||
@client_socket.write(payload)
|
||||
rescue => e
|
||||
@logger.warn("tcp output exception", :host => @host, :port => @port,
|
||||
:exception => e, :backtrace => e.backtrace)
|
||||
sleep @reconnect_interval
|
||||
@client_socket = nil
|
||||
retry
|
||||
end
|
||||
end
|
||||
end
|
||||
end # def register
|
||||
|
||||
|
@ -100,29 +121,12 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base
|
|||
def receive(event)
|
||||
return unless output?(event)
|
||||
|
||||
if @message_format
|
||||
output = event.sprintf(@message_format) + "\n"
|
||||
else
|
||||
output = event.to_hash.to_json + "\n"
|
||||
end
|
||||
|
||||
if server?
|
||||
@client_threads.each do |client_thread|
|
||||
client_thread[:client].write(output)
|
||||
end
|
||||
|
||||
@client_threads.reject! {|t| !t.alive? }
|
||||
else
|
||||
begin
|
||||
connect unless @client_socket
|
||||
@client_socket.write(output)
|
||||
rescue => e
|
||||
@logger.warn("tcp output exception", :host => @host, :port => @port,
|
||||
:exception => e, :backtrace => e.backtrace)
|
||||
sleep @reconnect_interval
|
||||
@client_socket = nil
|
||||
retry
|
||||
end
|
||||
end
|
||||
#if @message_format
|
||||
#output = event.sprintf(@message_format) + "\n"
|
||||
#else
|
||||
#output = event.to_hash.to_json + "\n"
|
||||
#end
|
||||
|
||||
@codec.encode(event)
|
||||
end # def receive
|
||||
end # class LogStash::Outputs::Tcp
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue