From ed95a7a532f092950f8d36ea7263e754b5d2201c Mon Sep 17 00:00:00 2001 From: Jordan Sissel Date: Tue, 2 Jul 2013 10:27:27 -0700 Subject: [PATCH] - make the plain codec line-terminator aware --- lib/logstash/codecs/plain.rb | 18 +++++++++++++----- lib/logstash/inputs/stdin.rb | 11 ++++++----- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/lib/logstash/codecs/plain.rb b/lib/logstash/codecs/plain.rb index ce9a17eff..6c81770a4 100644 --- a/lib/logstash/codecs/plain.rb +++ b/lib/logstash/codecs/plain.rb @@ -16,15 +16,23 @@ class LogStash::Codecs::Plain < LogStash::Codecs::Base # # This only affects "plain" format logs since json is UTF-8 already. config :charset, :validate => ::Encoding.name_list, :default => "UTF-8" + + public + def register + require "logstash/util/buftok" + @buffer = FileWatch::BufferedTokenizer.new + end public def decode(data) - data.force_encoding(@charset) - if @charset != "UTF-8" - # Convert to UTF-8 if not in that character set. - data = data.encode("UTF-8", :invalid => :replace, :undef => :replace) + @buffer.extract(data).each do |line| + line.force_encoding(@charset) + if @charset != "UTF-8" + # Convert to UTF-8 if not in that character set. + line = line.encode("UTF-8", :invalid => :replace, :undef => :replace) + end + yield LogStash::Event.new({"message" => line}) end - yield LogStash::Event.new({"message" => data}) end # def decode public diff --git a/lib/logstash/inputs/stdin.rb b/lib/logstash/inputs/stdin.rb index 7e2a5c883..7f7ff960c 100644 --- a/lib/logstash/inputs/stdin.rb +++ b/lib/logstash/inputs/stdin.rb @@ -16,18 +16,19 @@ class LogStash::Inputs::Stdin < LogStash::Inputs::Base end # def register def run(queue) - require "ap" while true begin - line = $stdin.readline.chomp - @codec.decode(line) do |event| + # Based on some testing, there is no way to interrupt an IO.sysread nor + # IO.select call in JRuby. Bummer :( + data = $stdin.sysread(16384) + @codec.decode(data) do |event| event["source"] = @host event["type"] = @type if @type @tags && @tags.each { |t| event.tag(t) } queue << event end - rescue EOFError => ex - # stdin closed, finish + rescue EOFError, LogStash::ShutdownSignal + # stdin closed or a requested shutdown break end end # while true