mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
Refactored file input to use the addressable gem.
Ruby's std-lib URI parsing leaves something to be desired, so using addressable. Also added a test to make sure that the file input creates a correct source and refactored the file input tests to timeout after 5 seconds.
This commit is contained in:
parent
292035ef6f
commit
8f58bd9357
2 changed files with 51 additions and 3 deletions
|
@ -1,7 +1,9 @@
|
||||||
require "logstash/inputs/base"
|
require "logstash/inputs/base"
|
||||||
require "logstash/namespace"
|
require "logstash/namespace"
|
||||||
|
|
||||||
require "socket" # for Socket.gethostname
|
require "socket" # for Socket.gethostname
|
||||||
require "uri"
|
|
||||||
|
require "addressable/uri"
|
||||||
|
|
||||||
# Stream events from files.
|
# Stream events from files.
|
||||||
#
|
#
|
||||||
|
@ -68,7 +70,7 @@ class LogStash::Inputs::File < LogStash::Inputs::Base
|
||||||
hostname = Socket.gethostname
|
hostname = Socket.gethostname
|
||||||
|
|
||||||
tail.subscribe do |path, line|
|
tail.subscribe do |path, line|
|
||||||
source = URI::Generic.new("file", nil, hostname, nil, nil, path, nil, nil, nil).to_s
|
source = Addressable::URI.new(:scheme => "file", :host => hostname, :path => path).to_s
|
||||||
@logger.debug("Received line", :path => path, :line => line)
|
@logger.debug("Received line", :path => path, :line => line)
|
||||||
e = to_event(line, source)
|
e = to_event(line, source)
|
||||||
if e
|
if e
|
||||||
|
|
|
@ -9,7 +9,51 @@ require "logstash/inputs/file"
|
||||||
|
|
||||||
require "tempfile"
|
require "tempfile"
|
||||||
|
|
||||||
|
require "mocha"
|
||||||
|
|
||||||
describe LogStash::Inputs::File do
|
describe LogStash::Inputs::File do
|
||||||
|
|
||||||
|
test "file input sets source properly for events" do
|
||||||
|
stubbed_hostname = "mystubbedhostname"
|
||||||
|
Socket.stubs(:gethostname).returns(stubbed_hostname)
|
||||||
|
|
||||||
|
logfile = Tempfile.new("logstash")
|
||||||
|
begin
|
||||||
|
@input = LogStash::Inputs::File.new("type" => ["testing"], "path" => [logfile.path])
|
||||||
|
@input.register
|
||||||
|
|
||||||
|
queue = Queue.new
|
||||||
|
|
||||||
|
Thread.new { @input.run(queue) }
|
||||||
|
|
||||||
|
event = nil
|
||||||
|
start = Time.now
|
||||||
|
while event.nil? and (Time.now - start) <= 5.0
|
||||||
|
logfile.write("This is my log message.\n")
|
||||||
|
logfile.flush
|
||||||
|
|
||||||
|
begin
|
||||||
|
event = queue.pop(true)
|
||||||
|
rescue ThreadError => error
|
||||||
|
raise error unless error.to_s == "queue empty"
|
||||||
|
sleep(0.05)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
@input.teardown
|
||||||
|
|
||||||
|
refute_nil event
|
||||||
|
|
||||||
|
uri = Addressable::URI.parse(event.source)
|
||||||
|
assert_equal "file", uri.scheme
|
||||||
|
assert_equal stubbed_hostname, uri.host
|
||||||
|
assert_equal logfile.path, uri.path
|
||||||
|
ensure
|
||||||
|
logfile.close
|
||||||
|
logfile.unlink
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
test "file input sets source_path properly for events" do
|
test "file input sets source_path properly for events" do
|
||||||
logfile = Tempfile.new("logstash")
|
logfile = Tempfile.new("logstash")
|
||||||
begin
|
begin
|
||||||
|
@ -21,7 +65,8 @@ describe LogStash::Inputs::File do
|
||||||
Thread.new { @input.run(queue) }
|
Thread.new { @input.run(queue) }
|
||||||
|
|
||||||
event = nil
|
event = nil
|
||||||
while event.nil?
|
start = Time.now
|
||||||
|
while event.nil? and (Time.now - start) <= 5.0
|
||||||
logfile.write("This is my log message.\n")
|
logfile.write("This is my log message.\n")
|
||||||
logfile.flush
|
logfile.flush
|
||||||
|
|
||||||
|
@ -41,4 +86,5 @@ describe LogStash::Inputs::File do
|
||||||
logfile.unlink
|
logfile.unlink
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
end # testing for LogStash::Inputs::File
|
end # testing for LogStash::Inputs::File
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue