mirror of
https://github.com/elastic/logstash.git
synced 2025-04-23 22:27:21 -04:00
Theses core input are not currently used inside logstash so it make sense to remove them
Fixes #6268
This commit is contained in:
parent
26ab1f5474
commit
cd84576584
2 changed files with 0 additions and 98 deletions
|
@ -1,47 +0,0 @@
|
|||
# encoding: utf-8
|
||||
require "logstash/event"
|
||||
require "logstash/inputs/base"
|
||||
require "logstash/instrument/collector"
|
||||
|
||||
module LogStash module Inputs
|
||||
# The Metrics inputs is responable of registring itself to the collector.
|
||||
# The collector class will periodically emits new snapshot of the system,
|
||||
# The metrics need to take that information and transform it into
|
||||
# a `Logstash::Event`, which can be consumed by the shipper and send to
|
||||
# Elasticsearch
|
||||
class Metrics < LogStash::Inputs::Base
|
||||
config_name "metrics"
|
||||
milestone 3
|
||||
|
||||
def register
|
||||
end
|
||||
|
||||
def run(queue)
|
||||
@logger.debug("Metric: input started")
|
||||
@queue = queue
|
||||
|
||||
# we register to the collector after receiving the pipeline queue
|
||||
metric.collector.add_observer(self)
|
||||
|
||||
# Keep this plugin thread alive,
|
||||
# until we shutdown the metric pipeline
|
||||
sleep(1) while !stop?
|
||||
end
|
||||
|
||||
def stop
|
||||
@logger.debug("Metrics input: stopped")
|
||||
metric.collector.delete_observer(self)
|
||||
end
|
||||
|
||||
def update(snapshot)
|
||||
@logger.debug("Metrics input: received a new snapshot", :created_at => snapshot.created_at, :snapshot => snapshot, :event => snapshot.metric_store.to_event) if @logger.debug?
|
||||
|
||||
# The back pressure is handled in the collector's
|
||||
# scheduled task (running into his own thread) if something append to one of the listener it will
|
||||
# will timeout. In a sane pipeline, with a low traffic of events it shouldn't be a problems.
|
||||
snapshot.metric_store.each do |metric|
|
||||
@queue << LogStash::Event.new({ "@timestamp" => snapshot.created_at }.merge(metric.to_hash))
|
||||
end
|
||||
end
|
||||
end
|
||||
end;end
|
|
@ -1,51 +0,0 @@
|
|||
# encoding: utf-8
|
||||
require "logstash/inputs/metrics"
|
||||
require "spec_helper"
|
||||
|
||||
describe LogStash::Inputs::Metrics do
|
||||
let(:collector) { LogStash::Instrument::Collector.new }
|
||||
let(:metric) { LogStash::Instrument::Metric.new(collector) }
|
||||
let(:queue) { [] }
|
||||
|
||||
before :each do
|
||||
subject.metric = metric
|
||||
end
|
||||
|
||||
describe "#run" do
|
||||
it "should register itself to the collector observer" do
|
||||
expect(collector).to receive(:add_observer).with(subject)
|
||||
t = Thread.new { subject.run(queue) }
|
||||
sleep(0.1) # give a bit of time to the thread to start
|
||||
subject.stop
|
||||
end
|
||||
end
|
||||
|
||||
describe "#update" do
|
||||
it "should fill up the queue with received events" do
|
||||
Thread.new { subject.run(queue) }
|
||||
sleep(0.1)
|
||||
subject.stop
|
||||
|
||||
metric.increment([:root, :test], :plugin)
|
||||
|
||||
subject.update(collector.snapshot_metric)
|
||||
expect(queue.count).to eq(1)
|
||||
end
|
||||
end
|
||||
|
||||
describe "#stop" do
|
||||
it "should remove itself from the the collector observer" do
|
||||
expect(collector).to receive(:delete_observer).with(subject)
|
||||
t = Thread.new { subject.run(queue) }
|
||||
sleep(0.1) # give a bit of time to the thread to start
|
||||
subject.stop
|
||||
end
|
||||
|
||||
it "should unblock the input" do
|
||||
t = Thread.new { subject.run(queue) }
|
||||
sleep(0.1) # give a bit of time to the thread to start
|
||||
subject.do_stop
|
||||
wait_for { t.status }.to be_falsey
|
||||
end
|
||||
end
|
||||
end
|
Loading…
Add table
Add a link
Reference in a new issue