mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
- Add internal inputs for ruby-driven agents. Also needed for testing.
This commit is contained in:
parent
daefbe986d
commit
9aa551be74
2 changed files with 66 additions and 0 deletions
36
lib/logstash/inputs/internal.rb
Normal file
36
lib/logstash/inputs/internal.rb
Normal file
|
@ -0,0 +1,36 @@
|
|||
|
||||
require "logstash/inputs/base"
|
||||
require "eventmachine-tail"
|
||||
require "socket" # for Socket.gethostname
|
||||
|
||||
class LogStash::Inputs::Internal < LogStash::Inputs::Base
|
||||
attr_reader :channel
|
||||
|
||||
def initialize(url, type, config={}, &block)
|
||||
super
|
||||
|
||||
# Default host to the machine's hostname if it's not set
|
||||
@url.host ||= Socket.gethostname
|
||||
@channel = EventMachine::Channel.new
|
||||
end
|
||||
|
||||
def register
|
||||
@logger.info("Registering #{@url}")
|
||||
@channel.subscribe do |event|
|
||||
receive(event)
|
||||
end
|
||||
end # def register
|
||||
|
||||
def receive(event)
|
||||
if !event.is_a?(LogStash::Event)
|
||||
event = LogStash::Event.new({
|
||||
"@message" => event,
|
||||
"@type" => @type,
|
||||
"@tags" => @tags.clone,
|
||||
"@source" => @url,
|
||||
})
|
||||
end
|
||||
@logger.debug(["Got event", event])
|
||||
@callback.call(event)
|
||||
end # def receive
|
||||
end # class LogStash::Inputs::Internal
|
30
lib/logstash/outputs/internal.rb
Normal file
30
lib/logstash/outputs/internal.rb
Normal file
|
@ -0,0 +1,30 @@
|
|||
require "logstash/outputs/base"
|
||||
|
||||
class LogStash::Outputs::Internal < LogStash::Outputs::Base
|
||||
def initialize(url, config={}, &block)
|
||||
super
|
||||
@callback = block
|
||||
end
|
||||
|
||||
def register
|
||||
# nothing to do
|
||||
end # def register
|
||||
|
||||
def receive(event)
|
||||
if !@callback
|
||||
@logger.error("No callback for output #{@url}, cannot receive")
|
||||
return
|
||||
end
|
||||
@callback.call(event)
|
||||
end # def event
|
||||
|
||||
# Set the callback by passing a block of code
|
||||
def callback(&block)
|
||||
@callback = block
|
||||
end
|
||||
|
||||
# Set the callback by passing a proc object
|
||||
def callback=(proc_block)
|
||||
@callback = proc_block
|
||||
end
|
||||
end # class LogStash::Outputs::Internal
|
Loading…
Add table
Add a link
Reference in a new issue