Add cgroup information to the API

When Logstash runs under a Linux container we gather statistics about the cgroup and the
CPU usage. This information will show up in the `/_node/stats` API and the result will look like this:

```
  "os" : {
    "cgroup" : {
      "cpuacct" : {
        "usage" : 789470280230,
        "control_group" : "/user.slice/user-1000.slice"
      },
      "cpu" : {
        "cfs_quota_micros" : -1,
        "control_group" : "/user.slice/user-1000.slice",
        "stat" : {
          "number_of_times_throttled" : 0,
          "time_throttled_nanos" : 0,
          "number_of_periods" : 0
        },
        "cfs_period_micros" : 100000
      }
    }
  }
```
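
For a quick check against a running instance, the new section can be read back from the monitoring API. A minimal sketch, assuming a local Logstash with the HTTP API on its default port 9600:

```
# encoding: utf-8
require "net/http"
require "json"

# Fetch the node stats and pull out the cgroup section added by this commit.
stats = JSON.parse(Net::HTTP.get(URI("http://localhost:9600/_node/stats")))

if (cgroup = stats.dig("os", "cgroup"))
  puts "cpuacct usage (nanos): #{cgroup["cpuacct"]["usage"]}"
  # A cfs_quota_micros of -1 means no CPU quota is enforced on this group.
  puts "cfs quota (micros):    #{cgroup["cpu"]["cfs_quota_micros"]}"
else
  puts "no cgroup information; probably not running under a Linux container"
end
```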

Fixes: #6252

Fixes #6357
Pier-Hugues Pellerin 2016-12-01 15:05:16 -05:00
parent 8dfefad58a
commit 9c8bf203e2
9 changed files with 410 additions and 5 deletions

@@ -65,6 +65,14 @@ module LogStash
}
end
def os
service.get_shallow(:os)
rescue
# The only OS information currently fetched is about Linux
# containers.
{}
end
def gc
service.get_shallow(:jvm, :gc)
end

@@ -13,12 +13,16 @@ module LogStash
:jvm => jvm_payload,
:process => process_payload,
:pipeline => pipeline_payload,
- :reloads => reloads
+ :reloads => reloads,
+ :os => os_payload
}
respond_with(payload, {:filter => params["filter"]})
end
private
+ def os_payload
+ @stats.os
+ end
def events_payload
@stats.events

@@ -30,8 +30,9 @@ module LogStash
:jvm => {
:timestamp => stats_command.started_at,
:uptime_in_millis => stats_command.uptime,
- :memory => stats_command.memory
- }
+ :memory => stats_command.memory,
+ },
+ :os => stats_command.os
}
respond_with(payload, {:filter => params["filter"]})
end

@@ -12,6 +12,8 @@ module LogStash module Instrument module PeriodicPoller
:polling_timeout => 120
}
attr_reader :metric
public
def initialize(metric, options = {})
@metric = metric

@@ -0,0 +1,137 @@
# encoding: utf-8
require "pathname"
require "logstash/util/loggable"
# Logic from elasticsearch/core/src/main/java/org/elasticsearch/monitor/os/OsProbe.java
# Move to ruby to remove any existing dependency
module LogStash module Instrument module PeriodicPoller
class Cgroup
include LogStash::Util::Loggable
CONTROL_GROUP_RE = Regexp.compile("\\d+:([^:,]+(?:,[^:,]+)?):(/.*)")
CONTROLLER_SEPERATOR_RE = ","
PROC_SELF_CGROUP_FILE = Pathname.new("/proc/self/cgroup")
PROC_CGROUP_CPU_DIR = Pathname.new("/sys/fs/cgroup/cpu")
PROC_CGROUP_CPUACCT_DIR = Pathname.new("/sys/fs/cgroup/cpuacct")
GROUP_CPUACCT = "cpuacct"
CPUACCT_USAGE_FILE = "cpuacct.usage"
GROUP_CPU = "cpu"
CPU_FS_PERIOD_US_FILE = "cpu.cfs_period_us"
CPU_FS_QUOTA_US_FILE = "cpu.cfs_quota_us"
CPU_STATS_FILE = "cpu.stat"
class << self
def are_cgroup_available?
[::File.exist?(PROC_SELF_CGROUP_FILE),
Dir.exist?(PROC_CGROUP_CPU_DIR),
Dir.exist?(PROC_CGROUP_CPUACCT_DIR)].all?
end
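# Parses each line of /proc/self/cgroup, e.g. "4:cpuacct:/docker/<id>",
# into a hash of controller name => control group path.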
def control_groups
response = {}
read_proc_self_cgroup_lines.each do |line|
matches = CONTROL_GROUP_RE.match(line)
# multiple controllers, same hierarchy
controllers = matches[1].split(CONTROLLER_SEPERATOR_RE)
controllers.each { |controller| response[controller] = matches[2] }
end
response
end
def read_first_line(path)
IO.readlines(path).first
end
def cgroup_cpuacct_usage_nanos(control_group)
read_first_line(::File.join(PROC_CGROUP_CPUACCT_DIR, control_group, CPUACCT_USAGE_FILE)).to_i
end
def cgroup_cpu_fs_period_micros(control_group)
# the cpu.cfs_* files live under the cpu controller hierarchy
read_first_line(::File.join(PROC_CGROUP_CPU_DIR, control_group, CPU_FS_PERIOD_US_FILE)).to_i
end
def cgroup_cpu_fs_quota_micros(control_group)
read_first_line(::File.join(PROC_CGROUP_CPU_DIR, control_group, CPU_FS_QUOTA_US_FILE)).to_i
end
def read_proc_self_cgroup_lines
IO.readlines(PROC_SELF_CGROUP_FILE)
end
class CpuStats
attr_reader :number_of_periods, :number_of_times_throttled, :time_throttled_nanos
def initialize(number_of_periods, number_of_times_throttled, time_throttled_nanos)
@number_of_periods = number_of_periods
@number_of_times_throttled = number_of_times_throttled
@time_throttled_nanos = time_throttled_nanos
end
end
def read_sys_fs_cgroup_cpuacct_cpu_stat(control_group)
IO.readlines(::File.join(PROC_CGROUP_CPU_DIR, control_group, CPU_STATS_FILE))
end
def cgroup_cpuacct_cpu_stat(control_group)
lines = read_sys_fs_cgroup_cpuacct_cpu_stat(control_group)
number_of_periods = -1
number_of_times_throttled = -1
time_throttled_nanos = -1
lines.each do |line|
fields = line.split(/\s+/)
case fields.first
when "nr_periods" then number_of_periods = fields[1].to_i
when "nr_throttled" then number_of_times_throttled= fields[1].to_i
when "throttled_time" then time_throttled_nanos = fields[1].to_i
end
end
CpuStats.new(number_of_periods, number_of_times_throttled, time_throttled_nanos)
end
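# Assembles the cpuacct and cpu controller data into the hash exposed
# under os.cgroup in the node stats API; returns nil when anything fails.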
def get_all
groups = control_groups
return if groups.empty?
cgroups_stats = {
:cpuacct => {},
:cpu => {}
}
cpuacct_group = groups[GROUP_CPUACCT]
cgroups_stats[:cpuacct][:control_group] = cpuacct_group
cgroups_stats[:cpuacct][:usage] = cgroup_cpuacct_usage_nanos(cpuacct_group)
cpu_group = groups[GROUP_CPU]
cgroups_stats[:cpu][:control_group] = cpu_group
cgroups_stats[:cpu][:cfs_period_micros] = cgroup_cpu_fs_period_micros(cpu_group)
cgroups_stats[:cpu][:cfs_quota_micros] = cgroup_cpu_fs_quota_micros(cpu_group)
cpu_stats = cgroup_cpuacct_cpu_stat(cpu_group)
cgroups_stats[:cpu][:stat] = {
:number_of_periods => cpu_stats.number_of_periods,
:number_of_times_throttled => cpu_stats.number_of_times_throttled,
:time_throttled_nanos => cpu_stats.time_throttled_nanos
}
cgroups_stats
rescue => e
logger.debug("Error, cannot retrieve cgroups information", :exception => e.class.name, :message => e.message) if logger.debug?
nil
end
def get
are_cgroup_available? ? get_all : nil
end
end
end
end end end
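
The class is self-contained, so its parsing can be exercised directly. A hypothetical sketch; the `/docker/abc123` control-group path is illustrative, not taken from a real host:

```
# encoding: utf-8
require "logstash/instrument/periodic_poller/cgroup"

cgroup = LogStash::Instrument::PeriodicPoller::Cgroup

# CONTROL_GROUP_RE splits "hierarchy-id:controller[,controller]:path" lines.
matches = cgroup::CONTROL_GROUP_RE.match("3:cpu,cpuacct:/docker/abc123")
p matches[1].split(cgroup::CONTROLLER_SEPERATOR_RE) # => ["cpu", "cpuacct"]
p matches[2]                                        # => "/docker/abc123"

# On a Linux host with the cgroup files mounted, .get returns the same hash
# that ends up under os.cgroup in the stats API; elsewhere it returns nil.
if (stats = cgroup.get)
  p stats[:cpu][:cfs_quota_micros]
end
```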

@@ -39,7 +39,6 @@ module LogStash module Instrument module PeriodicPoller
def initialize(metric, options = {})
super(metric, options)
- @metric = metric
@load_average = LoadAverage.create
end
@@ -114,7 +113,7 @@ module LogStash module Instrument module PeriodicPoller
metric.gauge([:jvm, :process, :cpu], :load_average, load_average) unless load_average.nil?
end
def collect_jvm_metrics(data)
runtime_mx_bean = ManagementFactory.getRuntimeMXBean()
metric.gauge([:jvm], :uptime_in_millis, runtime_mx_bean.getUptime())

@@ -1,5 +1,6 @@
# encoding: utf-8
require "logstash/instrument/periodic_poller/base"
require "logstash/instrument/periodic_poller/cgroup"
module LogStash module Instrument module PeriodicPoller
class Os < Base
@@ -8,6 +9,26 @@ module LogStash module Instrument module PeriodicPoller
end
def collect
collect_cgroup
end
def collect_cgroup
if stats = Cgroup.get
save_metric([:os], :cgroup, stats)
end
end
# Recursively walks the cgroup hash and records each leaf value as a gauge metric
def save_metric(namespace, k, v)
if v.is_a?(Hash)
v.each do |new_key, new_value|
n = namespace.dup
n << k.to_sym
save_metric(n, new_key, new_value)
end
else
metric.gauge(namespace, k.to_sym, v)
end
end
end
end; end; end
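
The recursion in save_metric is what turns the nested cgroup hash into individual gauges. A standalone sketch of the same idea, with a hypothetical record block standing in for metric.gauge:

```
# encoding: utf-8
# Toy version of save_metric: flattens a nested hash into gauge calls.
# The `record` block is hypothetical and stands in for metric.gauge.
def save_metric(namespace, k, v, &record)
  if v.is_a?(Hash)
    v.each { |new_key, new_value| save_metric(namespace + [k.to_sym], new_key, new_value, &record) }
  else
    record.call(namespace, k.to_sym, v)
  end
end

stats = { :cpuacct => { :usage => 1982 }, :cpu => { :stat => { :number_of_periods => 0 } } }
save_metric([:os], :cgroup, stats) { |ns, k, v| puts "#{(ns + [k]).join('.')} = #{v}" }
# os.cgroup.cpuacct.usage = 1982
# os.cgroup.cpu.stat.number_of_periods = 0
```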

@@ -0,0 +1,148 @@
# encoding: utf-8
require "logstash/instrument/periodic_poller/cgroup"
require "spec_helper"
describe LogStash::Instrument::PeriodicPoller::Cgroup do
subject { described_class }
context ".are_cgroup_available?" do
context "all the file exist" do
before do
allow(::File).to receive(:exist?).with(subject::PROC_SELF_CGROUP_FILE).and_return(true)
allow(::Dir).to receive(:exist?).with(subject::PROC_CGROUP_CPU_DIR).and_return(true)
allow(::Dir).to receive(:exist?).with(subject::PROC_CGROUP_CPUACCT_DIR).and_return(true)
end
it "returns true" do
expect(subject.are_cgroup_available?).to be_truthy
end
end
context "not all the file exist" do
before do
allow(::File).to receive(:exist?).with(subject::PROC_SELF_CGROUP_FILE).and_return(true)
allow(::Dir).to receive(:exist?).with(subject::PROC_CGROUP_CPU_DIR).and_return(false)
allow(::Dir).to receive(:exist?).with(subject::PROC_CGROUP_CPUACCT_DIR).and_return(true)
end
it "returns false" do
expect(subject.are_cgroup_available?).to be_falsey
end
end
end
context ".control_groups" do
let(:proc_self_cgroup_content) {
%w(14:name=systemd,holaunlimited:/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61
13:pids:/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61
12:hugetlb:/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61
11:net_prio:/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61
10:perf_event:/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61
9:net_cls:/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61
8:freezer:/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61
7:devices:/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61
6:memory:/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61
5:blkio:/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61
4:cpuacct:/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61
3:cpu:/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61
2:cpuset:/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61
1:name=openrc:/docker) }
before do
allow(subject).to receive(:read_proc_self_cgroup_lines).and_return(proc_self_cgroup_content)
end
it "returns the control groups" do
expect(subject.control_groups).to match({
"name=systemd" => "/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61",
"holaunlimited" => "/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61",
"pids" => "/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61",
"hugetlb" => "/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61",
"net_prio" => "/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61",
"perf_event" => "/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61",
"net_cls" => "/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61",
"freezer" => "/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61",
"devices" => "/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61",
"memory" => "/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61",
"blkio" => "/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61",
"cpuacct" => "/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61",
"cpu" => "/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61",
"cpuset" => "/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61",
"name=openrc" => "/docker"
})
end
end
context ".get_all" do
context "when we can retreive the stats" do
let(:cpuacct_control_group) { "/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61" }
let(:cpuacct_usage) { 1982 }
let(:cpu_control_group) { "/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61" }
let(:cfs_period_micros) { 500 }
let(:cfs_quota_micros) { 98 }
let(:cpu_stats_number_of_periods) { 1 }
let(:cpu_stats_number_of_time_throttled) { 2 }
let(:cpu_stats_time_throttled_nanos) { 3 }
let(:proc_self_cgroup_content) {
%W(14:name=systemd,holaunlimited:/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61
13:pids:/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61
12:hugetlb:/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61
11:net_prio:/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61
10:perf_event:/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61
9:net_cls:/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61
8:freezer:/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61
7:devices:/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61
6:memory:/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61
5:blkio:/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61
4:cpuacct:#{cpuacct_control_group}
3:cpu:#{cpu_control_group}
2:cpuset:/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61
1:name=openrc:/docker) }
let(:cpu_stat_file_content) {
[
"nr_periods #{cpu_stats_number_of_periods}",
"nr_throttled #{cpu_stats_number_of_time_throttled}",
"throttled_time #{cpu_stats_time_throttled_nanos}"
]
}
before do
allow(subject).to receive(:read_proc_self_cgroup_lines).and_return(proc_self_cgroup_content)
allow(subject).to receive(:read_sys_fs_cgroup_cpuacct_cpu_stat).and_return(cpu_stat_file_content)
allow(subject).to receive(:cgroup_cpuacct_usage_nanos).with(cpuacct_control_group).and_return(cpuacct_usage)
allow(subject).to receive(:cgroup_cpu_fs_period_micros).with(cpu_control_group).and_return(cfs_period_micros)
allow(subject).to receive(:cgroup_cpu_fs_quota_micros).with(cpu_control_group).and_return(cfs_quota_micros)
end
it "returns all the stats" do
expect(subject.get_all).to match(
:cpuacct => {
:control_group => cpuacct_control_group,
:usage => cpuacct_usage,
},
:cpu => {
:control_group => cpu_control_group,
:cfs_period_micros => cfs_period_micros,
:cfs_quota_micros => cfs_quota_micros,
:stat => {
:number_of_periods => cpu_stats_number_of_periods,
:number_of_times_throttled => cpu_stats_number_of_time_throttled,
:time_throttled_nanos => cpu_stats_time_throttled_nanos
}
}
)
end
end
context "when an exception is raised" do
before do
allow(subject).to receive(:control_groups).and_raise("Something went wrong")
end
it "returns nil" do
expect(subject.get_all).to be_nil
end
end
end
end

@@ -0,0 +1,85 @@
# encoding: utf-8
require "logstash/instrument/periodic_poller/os"
require "logstash/instrument/metric"
require "logstash/instrument/collector"
describe LogStash::Instrument::PeriodicPoller::Os do
let(:metric) { LogStash::Instrument::Metric.new(LogStash::Instrument::Collector.new) }
context "recorded cgroup metrics (mocked cgroup env)" do
subject { described_class.new(metric, {})}
let(:snapshot_store) { metric.collector.snapshot_metric.metric_store }
let(:os_metrics) { snapshot_store.get_shallow(:os) }
let(:cpuacct_control_group) { "/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61" }
let(:cpuacct_usage) { 1982 }
let(:cpu_control_group) { "/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61" }
let(:cpu_period_micros) { 500 }
let(:cpu_quota_micros) { 98 }
let(:cpu_stats_number_of_periods) { 1 }
let(:cpu_stats_number_of_time_throttled) { 2 }
let(:cpu_stats_time_throttled_nanos) { 3 }
let(:proc_self_cgroup_content) {
%W(14:name=systemd,holaunlimited:/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61
13:pids:/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61
12:hugetlb:/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61
11:net_prio:/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61
10:perf_event:/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61
9:net_cls:/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61
8:freezer:/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61
7:devices:/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61
6:memory:/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61
5:blkio:/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61
4:cpuacct:#{cpuacct_control_group}
3:cpu:#{cpu_control_group}
2:cpuset:/docker/a10687343f90e97bbb1f7181bd065a42de96c40c4aa91764a9d526ea30475f61
1:name=openrc:/docker) }
let(:cpu_stat_file_content) {
[
"nr_periods #{cpu_stats_number_of_periods}",
"nr_throttled #{cpu_stats_number_of_time_throttled}",
"throttled_time #{cpu_stats_time_throttled_nanos}"
]
}
before do
allow(LogStash::Instrument::PeriodicPoller::Cgroup).to receive(:are_cgroup_available?).and_return(true)
allow(LogStash::Instrument::PeriodicPoller::Cgroup).to receive(:read_proc_self_cgroup_lines).and_return(proc_self_cgroup_content)
allow(LogStash::Instrument::PeriodicPoller::Cgroup).to receive(:read_sys_fs_cgroup_cpuacct_cpu_stat).and_return(cpu_stat_file_content)
allow(LogStash::Instrument::PeriodicPoller::Cgroup).to receive(:cgroup_cpuacct_usage_nanos).with(cpuacct_control_group).and_return(cpuacct_usage)
allow(LogStash::Instrument::PeriodicPoller::Cgroup).to receive(:cgroup_cpu_fs_period_micros).with(cpu_control_group).and_return(cpu_period_micros)
allow(LogStash::Instrument::PeriodicPoller::Cgroup).to receive(:cgroup_cpu_fs_quota_micros).with(cpu_control_group).and_return(cpu_quota_micros)
subject.collect
end
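# Helper: walks the metric snapshot along the given path and returns the
# recorded metric's value.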
def mval(*metric_path)
metric_path.reduce(os_metrics) {|acc,k| acc[k]}.value
end
it "should have a value for #{[:cgroup, :cpuacc, :control_group]} that is a String" do
expect(mval(:cgroup, :cpuacct, :control_group)).to be_a(String)
end
it "should have a value for #{[:cgroup, :cpu, :control_group]} that is a String" do
expect(mval(:cgroup, :cpu, :control_group)).to be_a(String)
end
[
[:cgroup, :cpuacct, :usage],
[:cgroup, :cpu, :cfs_period_micros],
[:cgroup, :cpu, :cfs_quota_micros],
[:cgroup, :cpu, :stat, :number_of_periods],
[:cgroup, :cpu, :stat, :number_of_times_throttled],
[:cgroup, :cpu, :stat, :time_throttled_nanos]
].each do |path|
path = Array(path)
it "should have a value for #{path} that is Numeric" do
expect(mval(*path)).to be_a(Numeric)
end
end
end
end