Systemd refactor (#1254)

This reduces the system metric collection time by using a wait group
and go routines to allow the systemd metric calls happen concurrently.

Also, makes the start time, restarts, tasks_max, and tasks_current metrics disabled by default
because these can be time consuming to gather.

Signed-off-by: Paul Gier <pgier@redhat.com>
This commit is contained in:
Paul Gier 2019-02-11 16:27:21 -06:00 committed by Ben Kochie
parent 1ba436e194
commit cb9e23c536
3 changed files with 181 additions and 195 deletions

View File

@ -7,6 +7,8 @@
* The cpufreq metrics now separate the `cpufreq` and `scaling` data based on what the driver provides. #1248 * The cpufreq metrics now separate the `cpufreq` and `scaling` data based on what the driver provides. #1248
* The labels for the network_up metric have changed, see issue #1236 * The labels for the network_up metric have changed, see issue #1236
* Bonding collector now uses `mii_status` instead of `operstatus` #1124 * Bonding collector now uses `mii_status` instead of `operstatus` #1124
* Several systemd metrics have been turned off by default to improve performance #1254
These include unit_tasks_current, unit_tasks_max, service_restart_total, and unit_start_time_seconds
### Changes ### Changes
@ -16,6 +18,7 @@
* [CHANGE] Add TCPSynRetrans to netstat default filter #1143 * [CHANGE] Add TCPSynRetrans to netstat default filter #1143
* [CHANGE] Add a limit to the number of in-flight requests #1166 * [CHANGE] Add a limit to the number of in-flight requests #1166
* [CHANGE] Add separate cpufreq and scaling metrics #1248 * [CHANGE] Add separate cpufreq and scaling metrics #1248
* [CHANGE] Several systemd metrics have been turned off by default to improve performance #1254
* [ENHANCEMENT] Add Infiniband counters #1120 * [ENHANCEMENT] Add Infiniband counters #1120
* [ENHANCEMENT] Move network_up labels into new metric network_info #1236 * [ENHANCEMENT] Move network_up labels into new metric network_info #1236
* [FEATURE] Add a flag to disable exporter metrics #1148 * [FEATURE] Add a flag to disable exporter metrics #1148

View File

@ -20,6 +20,8 @@ import (
"math" "math"
"regexp" "regexp"
"strings" "strings"
"sync"
"time"
"github.com/coreos/go-systemd/dbus" "github.com/coreos/go-systemd/dbus"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@ -31,6 +33,9 @@ var (
unitWhitelist = kingpin.Flag("collector.systemd.unit-whitelist", "Regexp of systemd units to whitelist. Units must both match whitelist and not match blacklist to be included.").Default(".+").String() unitWhitelist = kingpin.Flag("collector.systemd.unit-whitelist", "Regexp of systemd units to whitelist. Units must both match whitelist and not match blacklist to be included.").Default(".+").String()
unitBlacklist = kingpin.Flag("collector.systemd.unit-blacklist", "Regexp of systemd units to blacklist. Units must both match whitelist and not match blacklist to be included.").Default(".+\\.scope").String() unitBlacklist = kingpin.Flag("collector.systemd.unit-blacklist", "Regexp of systemd units to blacklist. Units must both match whitelist and not match blacklist to be included.").Default(".+\\.scope").String()
systemdPrivate = kingpin.Flag("collector.systemd.private", "Establish a private, direct connection to systemd without dbus.").Bool() systemdPrivate = kingpin.Flag("collector.systemd.private", "Establish a private, direct connection to systemd without dbus.").Bool()
enableTaskMetrics = kingpin.Flag("collector.systemd.enable-task-metrics", "Enables service unit tasks metrics unit_tasks_current and unit_tasks_max").Bool()
enableRestartsMetrics = kingpin.Flag("collector.systemd.enable-restarts-metrics", "Enables service unit metric service_restart_total").Bool()
enableStartTimeMetrics = kingpin.Flag("collector.systemd.enable-start-time-metrics", "Enables service unit metric unit_start_time_seconds").Bool()
) )
type systemdCollector struct { type systemdCollector struct {
@ -118,34 +123,102 @@ func NewSystemdCollector() (Collector, error) {
}, nil }, nil
} }
// Update gathers metrics from systemd. Dbus collection is done in parallel
// to reduce wait time for responses.
func (c *systemdCollector) Update(ch chan<- prometheus.Metric) error { func (c *systemdCollector) Update(ch chan<- prometheus.Metric) error {
allUnits, err := c.getAllUnits() begin := time.Now()
conn, err := c.newDbus()
if err != nil {
return fmt.Errorf("couldn't get dbus connection: %s", err)
}
defer conn.Close()
allUnits, err := c.getAllUnits(conn)
if err != nil { if err != nil {
return fmt.Errorf("couldn't get units: %s", err) return fmt.Errorf("couldn't get units: %s", err)
} }
log.Debugf("systemd getAllUnits took %f", time.Since(begin).Seconds())
begin = time.Now()
summary := summarizeUnits(allUnits) summary := summarizeUnits(allUnits)
c.collectSummaryMetrics(ch, summary) c.collectSummaryMetrics(ch, summary)
log.Debugf("systemd collectSummaryMetrics took %f", time.Since(begin).Seconds())
begin = time.Now()
units := filterUnits(allUnits, c.unitWhitelistPattern, c.unitBlacklistPattern) units := filterUnits(allUnits, c.unitWhitelistPattern, c.unitBlacklistPattern)
c.collectUnitStatusMetrics(ch, units) log.Debugf("systemd filterUnits took %f", time.Since(begin).Seconds())
c.collectUnitStartTimeMetrics(ch, units)
c.collectUnitTasksCurrentMetrics(ch, units)
c.collectUnitTasksMaxMetrics(ch, units)
c.collectTimers(ch, units)
c.collectSockets(ch, units)
systemState, err := c.getSystemState() var wg sync.WaitGroup
if err != nil { defer wg.Wait()
return fmt.Errorf("couldn't get system state: %s", err)
wg.Add(1)
go func() {
defer wg.Done()
begin = time.Now()
c.collectUnitStatusMetrics(conn, ch, units)
log.Debugf("systemd collectUnitStatusMetrics took %f", time.Since(begin).Seconds())
}()
if *enableStartTimeMetrics {
wg.Add(1)
go func() {
defer wg.Done()
begin = time.Now()
c.collectUnitStartTimeMetrics(conn, ch, units)
log.Debugf("systemd collectUnitStartTimeMetrics took %f", time.Since(begin).Seconds())
}()
} }
c.collectSystemState(ch, systemState)
return nil if *enableTaskMetrics {
wg.Add(1)
go func() {
defer wg.Done()
begin = time.Now()
c.collectUnitTasksMetrics(conn, ch, units)
log.Debugf("systemd collectUnitTasksMetrics took %f", time.Since(begin).Seconds())
}()
}
wg.Add(1)
go func() {
defer wg.Done()
begin = time.Now()
c.collectTimers(conn, ch, units)
log.Debugf("systemd collectTimers took %f", time.Since(begin).Seconds())
}()
wg.Add(1)
go func() {
defer wg.Done()
begin = time.Now()
c.collectSockets(conn, ch, units)
log.Debugf("systemd collectSockets took %f", time.Since(begin).Seconds())
}()
begin = time.Now()
err = c.collectSystemState(conn, ch)
log.Debugf("systemd collectSystemState took %f", time.Since(begin).Seconds())
return err
} }
func (c *systemdCollector) collectUnitStatusMetrics(ch chan<- prometheus.Metric, units []unit) { func (c *systemdCollector) collectUnitStatusMetrics(conn *dbus.Conn, ch chan<- prometheus.Metric, units []unit) {
for _, unit := range units { for _, unit := range units {
serviceType := ""
if strings.HasSuffix(unit.Name, ".service") {
serviceTypeProperty, err := conn.GetUnitTypeProperty(unit.Name, "Service", "Type")
if err != nil {
log.Debugf("couldn't get unit '%s' Type: %s", unit.Name, err)
} else {
serviceType = serviceTypeProperty.Value.Value().(string)
}
} else if strings.HasSuffix(unit.Name, ".mount") {
serviceTypeProperty, err := conn.GetUnitTypeProperty(unit.Name, "Mount", "Type")
if err != nil {
log.Debugf("couldn't get unit '%s' Type: %s", unit.Name, err)
} else {
serviceType = serviceTypeProperty.Value.Value().(string)
}
}
for _, stateName := range unitStatesName { for _, stateName := range unitStatesName {
isActive := 0.0 isActive := 0.0
if stateName == unit.ActiveState { if stateName == unit.ActiveState {
@ -153,73 +226,126 @@ func (c *systemdCollector) collectUnitStatusMetrics(ch chan<- prometheus.Metric,
} }
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.unitDesc, prometheus.GaugeValue, isActive, c.unitDesc, prometheus.GaugeValue, isActive,
unit.Name, stateName, unit.serviceType) unit.Name, stateName, serviceType)
} }
if strings.HasSuffix(unit.Name, ".service") && unit.nRestarts != nil { if *enableRestartsMetrics && strings.HasSuffix(unit.Name, ".service") {
// NRestarts wasn't added until systemd 235.
restartsCount, err := conn.GetUnitTypeProperty(unit.Name, "Service", "NRestarts")
if err != nil {
log.Debugf("couldn't get unit '%s' NRestarts: %s", unit.Name, err)
} else {
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.nRestartsDesc, prometheus.CounterValue, c.nRestartsDesc, prometheus.CounterValue,
float64(*unit.nRestarts), unit.Name) float64(restartsCount.Value.Value().(uint32)), unit.Name)
}
} }
} }
} }
func (c *systemdCollector) collectSockets(ch chan<- prometheus.Metric, units []unit) { func (c *systemdCollector) collectSockets(conn *dbus.Conn, ch chan<- prometheus.Metric, units []unit) {
for _, unit := range units { for _, unit := range units {
if !strings.HasSuffix(unit.Name, ".socket") { if !strings.HasSuffix(unit.Name, ".socket") {
continue continue
} }
acceptedConnectionCount, err := conn.GetUnitTypeProperty(unit.Name, "Socket", "NAccepted")
if err != nil {
log.Debugf("couldn't get unit '%s' NAccepted: %s", unit.Name, err)
continue
}
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.socketAcceptedConnectionsDesc, prometheus.CounterValue, c.socketAcceptedConnectionsDesc, prometheus.CounterValue,
float64(unit.acceptedConnections), unit.Name) float64(acceptedConnectionCount.Value.Value().(uint32)), unit.Name)
currentConnectionCount, err := conn.GetUnitTypeProperty(unit.Name, "Socket", "NConnections")
if err != nil {
log.Debugf("couldn't get unit '%s' NConnections: %s", unit.Name, err)
continue
}
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.socketCurrentConnectionsDesc, prometheus.GaugeValue, c.socketCurrentConnectionsDesc, prometheus.GaugeValue,
float64(unit.currentConnections), unit.Name) float64(currentConnectionCount.Value.Value().(uint32)), unit.Name)
if unit.refusedConnections != nil {
// NRefused wasn't added until systemd 239.
refusedConnectionCount, err := conn.GetUnitTypeProperty(unit.Name, "Socket", "NRefused")
if err != nil {
//log.Debugf("couldn't get unit '%s' NRefused: %s", unit.Name, err)
} else {
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.socketRefusedConnectionsDesc, prometheus.GaugeValue, c.socketRefusedConnectionsDesc, prometheus.GaugeValue,
float64(*unit.refusedConnections), unit.Name) float64(refusedConnectionCount.Value.Value().(uint32)), unit.Name)
} }
} }
} }
func (c *systemdCollector) collectUnitStartTimeMetrics(ch chan<- prometheus.Metric, units []unit) { func (c *systemdCollector) collectUnitStartTimeMetrics(conn *dbus.Conn, ch chan<- prometheus.Metric, units []unit) {
var startTimeUsec uint64
for _, unit := range units { for _, unit := range units {
if unit.ActiveState != "active" {
startTimeUsec = 0
} else {
timestampValue, err := conn.GetUnitProperty(unit.Name, "ActiveEnterTimestamp")
if err != nil {
log.Debugf("couldn't get unit '%s' StartTimeUsec: %s", unit.Name, err)
continue
}
startTimeUsec = timestampValue.Value.Value().(uint64)
}
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.unitStartTimeDesc, prometheus.GaugeValue, c.unitStartTimeDesc, prometheus.GaugeValue,
float64(unit.startTimeUsec)/1e6, unit.Name) float64(startTimeUsec)/1e6, unit.Name)
} }
} }
func (c *systemdCollector) collectUnitTasksCurrentMetrics(ch chan<- prometheus.Metric, units []unit) { func (c *systemdCollector) collectUnitTasksMetrics(conn *dbus.Conn, ch chan<- prometheus.Metric, units []unit) {
var val uint64
for _, unit := range units { for _, unit := range units {
if unit.tasksCurrent != nil { if strings.HasSuffix(unit.Name, ".service") {
tasksCurrentCount, err := conn.GetUnitTypeProperty(unit.Name, "Service", "TasksCurrent")
if err != nil {
log.Debugf("couldn't get unit '%s' TasksCurrent: %s", unit.Name, err)
} else {
val = tasksCurrentCount.Value.Value().(uint64)
// Don't set if tasksCurrent if dbus reports MaxUint64.
if val != math.MaxUint64 {
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.unitTasksCurrentDesc, prometheus.GaugeValue, c.unitTasksCurrentDesc, prometheus.GaugeValue,
float64(*unit.tasksCurrent), unit.Name) float64(val), unit.Name)
} }
} }
} tasksMaxCount, err := conn.GetUnitTypeProperty(unit.Name, "Service", "TasksMax")
if err != nil {
func (c *systemdCollector) collectUnitTasksMaxMetrics(ch chan<- prometheus.Metric, units []unit) { log.Debugf("couldn't get unit '%s' TasksMax: %s", unit.Name, err)
for _, unit := range units { } else {
if unit.tasksMax != nil { val = tasksMaxCount.Value.Value().(uint64)
// Don't set if tasksMax if dbus reports MaxUint64.
if val != math.MaxUint64 {
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.unitTasksMaxDesc, prometheus.GaugeValue, c.unitTasksMaxDesc, prometheus.GaugeValue,
float64(*unit.tasksMax), unit.Name) float64(val), unit.Name)
}
}
} }
} }
} }
func (c *systemdCollector) collectTimers(ch chan<- prometheus.Metric, units []unit) { func (c *systemdCollector) collectTimers(conn *dbus.Conn, ch chan<- prometheus.Metric, units []unit) {
for _, unit := range units { for _, unit := range units {
if !strings.HasSuffix(unit.Name, ".timer") { if !strings.HasSuffix(unit.Name, ".timer") {
continue continue
} }
lastTriggerValue, err := conn.GetUnitTypeProperty(unit.Name, "Timer", "LastTriggerUSec")
if err != nil {
log.Debugf("couldn't get unit '%s' LastTriggerUSec: %s", unit.Name, err)
continue
}
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.timerLastTriggerDesc, prometheus.GaugeValue, c.timerLastTriggerDesc, prometheus.GaugeValue,
float64(unit.lastTriggerUsec)/1e6, unit.Name) float64(lastTriggerValue.Value.Value().(uint64))/1e6, unit.Name)
} }
} }
@ -230,12 +356,17 @@ func (c *systemdCollector) collectSummaryMetrics(ch chan<- prometheus.Metric, su
} }
} }
func (c *systemdCollector) collectSystemState(ch chan<- prometheus.Metric, systemState string) { func (c *systemdCollector) collectSystemState(conn *dbus.Conn, ch chan<- prometheus.Metric) error {
systemState, err := conn.GetManagerProperty("SystemState")
if err != nil {
return fmt.Errorf("couldn't get system state: %s", err)
}
isSystemRunning := 0.0 isSystemRunning := 0.0
if systemState == `"running"` { if systemState == `"running"` {
isSystemRunning = 1.0 isSystemRunning = 1.0
} }
ch <- prometheus.MustNewConstMetric(c.systemRunningDesc, prometheus.GaugeValue, isSystemRunning) ch <- prometheus.MustNewConstMetric(c.systemRunningDesc, prometheus.GaugeValue, isSystemRunning)
return nil
} }
func (c *systemdCollector) newDbus() (*dbus.Conn, error) { func (c *systemdCollector) newDbus() (*dbus.Conn, error) {
@ -247,37 +378,10 @@ func (c *systemdCollector) newDbus() (*dbus.Conn, error) {
type unit struct { type unit struct {
dbus.UnitStatus dbus.UnitStatus
lastTriggerUsec uint64
startTimeUsec uint64
tasksCurrent *uint64
tasksMax *uint64
nRestarts *uint32
serviceType string
acceptedConnections uint32
currentConnections uint32
refusedConnections *uint32
} }
// unitType gets the suffix after the last "." in the func (c *systemdCollector) getAllUnits(conn *dbus.Conn) ([]unit, error) {
// unit name and capitalizes the first letter
func (u *unit) unitType() string {
suffixIndex := strings.LastIndex(u.Name, ".") + 1
if suffixIndex < 1 || suffixIndex > len(u.Name) {
return ""
}
return strings.Title(u.Name[suffixIndex:])
}
func (c *systemdCollector) getAllUnits() ([]unit, error) {
conn, err := c.newDbus()
if err != nil {
return nil, fmt.Errorf("couldn't get dbus connection: %s", err)
}
defer conn.Close()
// Filter out any units that are not installed and are pulled in only as dependencies.
allUnits, err := conn.ListUnits() allUnits, err := conn.ListUnits()
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -287,96 +391,6 @@ func (c *systemdCollector) getAllUnits() ([]unit, error) {
unit := unit{ unit := unit{
UnitStatus: status, UnitStatus: status,
} }
unitType := unit.unitType()
if unitType == "Service" || unitType == "Mount" {
serviceType, err := conn.GetUnitTypeProperty(unit.Name, unitType, "Type")
if err != nil {
log.Debugf("couldn't get type for unit '%s': %s", unit.Name, err)
} else {
unit.serviceType = serviceType.Value.Value().(string)
}
}
if strings.HasSuffix(unit.Name, ".timer") {
lastTriggerValue, err := conn.GetUnitTypeProperty(unit.Name, "Timer", "LastTriggerUSec")
if err != nil {
log.Debugf("couldn't get unit '%s' LastTriggerUSec: %s", unit.Name, err)
continue
}
unit.lastTriggerUsec = lastTriggerValue.Value.Value().(uint64)
}
if strings.HasSuffix(unit.Name, ".service") {
// NRestarts wasn't added until systemd 235.
restartsCount, err := conn.GetUnitTypeProperty(unit.Name, "Service", "NRestarts")
if err != nil {
log.Debugf("couldn't get unit '%s' NRestarts: %s", unit.Name, err)
} else {
nRestarts := restartsCount.Value.Value().(uint32)
unit.nRestarts = &nRestarts
}
tasksCurrentCount, err := conn.GetUnitTypeProperty(unit.Name, "Service", "TasksCurrent")
if err != nil {
log.Debugf("couldn't get unit '%s' TasksCurrent: %s", unit.Name, err)
} else {
val := tasksCurrentCount.Value.Value().(uint64)
// Don't set if tasksCurrent if dbus reports MaxUint64.
if val != math.MaxUint64 {
unit.tasksCurrent = &val
}
}
tasksMaxCount, err := conn.GetUnitTypeProperty(unit.Name, "Service", "TasksMax")
if err != nil {
log.Debugf("couldn't get unit '%s' TasksMax: %s", unit.Name, err)
} else {
val := tasksMaxCount.Value.Value().(uint64)
// Don't set if tasksMax if dbus reports MaxUint64.
if val != math.MaxUint64 {
unit.tasksMax = &val
}
}
}
if strings.HasSuffix(unit.Name, ".socket") {
acceptedConnectionCount, err := conn.GetUnitTypeProperty(unit.Name, "Socket", "NAccepted")
if err != nil {
log.Debugf("couldn't get unit '%s' NAccepted: %s", unit.Name, err)
continue
}
unit.acceptedConnections = acceptedConnectionCount.Value.Value().(uint32)
currentConnectionCount, err := conn.GetUnitTypeProperty(unit.Name, "Socket", "NConnections")
if err != nil {
log.Debugf("couldn't get unit '%s' NConnections: %s", unit.Name, err)
continue
}
unit.currentConnections = currentConnectionCount.Value.Value().(uint32)
// NRefused wasn't added until systemd 239.
refusedConnectionCount, err := conn.GetUnitTypeProperty(unit.Name, "Socket", "NRefused")
if err != nil {
log.Debugf("couldn't get unit '%s' NRefused: %s", unit.Name, err)
} else {
nRefused := refusedConnectionCount.Value.Value().(uint32)
unit.refusedConnections = &nRefused
}
}
if unit.ActiveState != "active" {
unit.startTimeUsec = 0
} else {
timestampValue, err := conn.GetUnitProperty(unit.Name, "ActiveEnterTimestamp")
if err != nil {
log.Debugf("couldn't get unit '%s' StartTimeUsec: %s", unit.Name, err)
continue
}
unit.startTimeUsec = timestampValue.Value.Value().(uint64)
}
result = append(result, unit) result = append(result, unit)
} }
@ -410,13 +424,3 @@ func filterUnits(units []unit, whitelistPattern, blacklistPattern *regexp.Regexp
return filtered return filtered
} }
func (c *systemdCollector) getSystemState() (state string, err error) {
conn, err := c.newDbus()
if err != nil {
return "", fmt.Errorf("couldn't get dbus connection: %s", err)
}
state, err = conn.GetManagerProperty("SystemState")
conn.Close()
return state, err
}

View File

@ -18,7 +18,6 @@ import (
"testing" "testing"
"github.com/coreos/go-systemd/dbus" "github.com/coreos/go-systemd/dbus"
"github.com/prometheus/client_golang/prometheus"
) )
// Creates mock UnitLists // Creates mock UnitLists
@ -87,26 +86,6 @@ func getUnitListFixtures() [][]unit {
return [][]unit{fixture1, fixture2} return [][]unit{fixture1, fixture2}
} }
func TestSystemdCollectorDoesntCrash(t *testing.T) {
c, err := NewSystemdCollector()
if err != nil {
t.Fatal(err)
}
sink := make(chan prometheus.Metric)
go func() {
for {
<-sink
}
}()
fixtures := getUnitListFixtures()
collector := (c).(*systemdCollector)
for _, units := range fixtures {
collector.collectUnitStatusMetrics(sink, units)
collector.collectSockets(sink, units)
}
}
func TestSystemdIgnoreFilter(t *testing.T) { func TestSystemdIgnoreFilter(t *testing.T) {
fixtures := getUnitListFixtures() fixtures := getUnitListFixtures()
whitelistPattern := regexp.MustCompile("^foo$") whitelistPattern := regexp.MustCompile("^foo$")