osd: Internally poll PG dump for oldest active PG tracking
Without this, the granularity of the oldest active PG is based on external scrape frequency, and an unlucky sequence of scrapes could see the same PG inactive two scrapes in a row even though it was active in between. Preferably, we would update this even more often than 10 seconds, but PG dumps can take a while.
This commit is contained in:
parent
e6a0a46acf
commit
5b487fad9c
101
ceph/osd.go
101
ceph/osd.go
|
@ -29,12 +29,12 @@ import (
|
|||
|
||||
const (
|
||||
osdLabelFormat = "osd.%v"
|
||||
)
|
||||
|
||||
const (
|
||||
scrubStateIdle = 0
|
||||
scrubStateScrubbing = 1
|
||||
scrubStateDeepScrubbing = 2
|
||||
|
||||
oldestInactivePGUpdatePeriod = 10 * time.Second
|
||||
)
|
||||
|
||||
// OSDCollector displays statistics about OSD in the Ceph cluster.
|
||||
|
@ -54,9 +54,6 @@ type OSDCollector struct {
|
|||
// a PG to not have an active state in it.
|
||||
oldestInactivePGMap map[string]time.Time
|
||||
|
||||
// pgDumpBrief holds the content of PG dump brief
|
||||
pgDumpBrief cephPGDumpBrief
|
||||
|
||||
// CrushWeight is a persistent setting, and it affects how CRUSH assigns data to OSDs.
|
||||
// It displays the CRUSH weight for the OSD
|
||||
CrushWeight *prometheus.GaugeVec
|
||||
|
@ -159,7 +156,7 @@ func NewOSDCollector(exporter *Exporter) *OSDCollector {
|
|||
labels["cluster"] = exporter.Cluster
|
||||
osdLabels := []string{"osd", "device_class", "host", "rack", "root"}
|
||||
|
||||
return &OSDCollector{
|
||||
o := &OSDCollector{
|
||||
conn: exporter.Conn,
|
||||
logger: exporter.Logger,
|
||||
|
||||
|
@ -438,6 +435,9 @@ func NewOSDCollector(exporter *Exporter) *OSDCollector {
|
|||
},
|
||||
),
|
||||
}
|
||||
|
||||
go o.oldestInactivePGLoop()
|
||||
return o
|
||||
}
|
||||
|
||||
func (o *OSDCollector) collectorList() []prometheus.Collector {
|
||||
|
@ -951,7 +951,7 @@ func (o *OSDCollector) collectOSDDump() error {
|
|||
|
||||
}
|
||||
|
||||
func (o *OSDCollector) performPGDumpBrief() error {
|
||||
func (o *OSDCollector) performPGDumpBrief() (*cephPGDumpBrief, error) {
|
||||
args := o.cephPGDumpCommand()
|
||||
buf, _, err := o.conn.MgrCommand(args)
|
||||
if err != nil {
|
||||
|
@ -959,18 +959,23 @@ func (o *OSDCollector) performPGDumpBrief() error {
|
|||
"args", string(bytes.Join(args, []byte(","))),
|
||||
).Error("error executing mgr command")
|
||||
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
o.pgDumpBrief = cephPGDumpBrief{}
|
||||
if err := json.Unmarshal(buf, &o.pgDumpBrief); err != nil {
|
||||
return err
|
||||
pgDumpBrief := cephPGDumpBrief{}
|
||||
if err := json.Unmarshal(buf, &pgDumpBrief); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return nil
|
||||
return &pgDumpBrief, nil
|
||||
}
|
||||
|
||||
func (o *OSDCollector) collectOSDScrubState(ch chan<- prometheus.Metric) error {
|
||||
pgDumpBrief, err := o.performPGDumpBrief()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// need to reset the PG scrub state since the scrub might have ended within
|
||||
// the last prom scrape interval.
|
||||
// This forces us to report scrub state on all previously discovered OSDs We
|
||||
|
@ -980,7 +985,7 @@ func (o *OSDCollector) collectOSDScrubState(ch chan<- prometheus.Metric) error {
|
|||
o.osdScrubCache[i] = scrubStateIdle
|
||||
}
|
||||
|
||||
for _, pg := range o.pgDumpBrief.PGStats {
|
||||
for _, pg := range pgDumpBrief.PGStats {
|
||||
if strings.Contains(pg.State, "scrubbing") {
|
||||
scrubState := scrubStateScrubbing
|
||||
if strings.Contains(pg.State, "deep") {
|
||||
|
@ -1070,36 +1075,46 @@ func (o *OSDCollector) cephPGDumpCommand() [][]byte {
|
|||
return [][]byte{cmd}
|
||||
}
|
||||
|
||||
func (o *OSDCollector) collectPGStates(ch chan<- prometheus.Metric) error {
|
||||
// - See if there are PGs that we're tracking that are now active
|
||||
// - See if there are new ones to add
|
||||
// - Find the oldest one
|
||||
now := time.Now()
|
||||
oldestTime := now
|
||||
|
||||
for _, pg := range o.pgDumpBrief.PGStats {
|
||||
// If we were tracking it, and it's now active, remove it
|
||||
active := strings.Contains(pg.State, "active")
|
||||
if active {
|
||||
delete(o.oldestInactivePGMap, pg.PGID)
|
||||
func (o *OSDCollector) oldestInactivePGLoop() {
|
||||
for {
|
||||
pgDumpBrief, err := o.performPGDumpBrief()
|
||||
if err != nil {
|
||||
o.logger.WithError(err).Warning("failed to get latest PG dump for oldest inactive PG update")
|
||||
time.Sleep(oldestInactivePGUpdatePeriod)
|
||||
continue
|
||||
}
|
||||
|
||||
// Now see if it's not here, we'll need to track it now
|
||||
pgTime, ok := o.oldestInactivePGMap[pg.PGID]
|
||||
if !ok {
|
||||
pgTime = now
|
||||
o.oldestInactivePGMap[pg.PGID] = now
|
||||
// - See if there are PGs that we're tracking that are now active
|
||||
// - See if there are new ones to add
|
||||
// - Find the oldest one
|
||||
now := time.Now()
|
||||
oldestTime := now
|
||||
|
||||
for _, pg := range pgDumpBrief.PGStats {
|
||||
// If we were tracking it, and it's now active, remove it
|
||||
active := strings.Contains(pg.State, "active")
|
||||
if active {
|
||||
delete(o.oldestInactivePGMap, pg.PGID)
|
||||
continue
|
||||
}
|
||||
|
||||
// Now see if it's not here, we'll need to track it now
|
||||
pgTime, ok := o.oldestInactivePGMap[pg.PGID]
|
||||
if !ok {
|
||||
pgTime = now
|
||||
o.oldestInactivePGMap[pg.PGID] = now
|
||||
}
|
||||
|
||||
// And finally, track our oldest time
|
||||
if pgTime.Before(oldestTime) {
|
||||
oldestTime = pgTime
|
||||
}
|
||||
}
|
||||
|
||||
// And finally, track our oldest time
|
||||
if pgTime.Before(oldestTime) {
|
||||
oldestTime = pgTime
|
||||
}
|
||||
o.OldestInactivePG.Set(float64(now.Unix() - oldestTime.Unix()))
|
||||
|
||||
time.Sleep(oldestInactivePGUpdatePeriod)
|
||||
}
|
||||
|
||||
o.OldestInactivePG.Set(float64(now.Unix() - oldestTime.Unix()))
|
||||
return nil
|
||||
}
|
||||
|
||||
// Describe sends the descriptors of each OSDCollector related metrics we have
|
||||
|
@ -1169,21 +1184,13 @@ func (o *OSDCollector) Collect(ch chan<- prometheus.Metric, version *Version) {
|
|||
localWg.Add(1)
|
||||
go func() {
|
||||
defer localWg.Done()
|
||||
if err := o.performPGDumpBrief(); err != nil {
|
||||
o.logger.WithError(err).Error("error collecting PG dump metrics")
|
||||
if err := o.collectOSDScrubState(ch); err != nil {
|
||||
o.logger.WithError(err).Error("error collecting OSD scrub metrics")
|
||||
}
|
||||
}()
|
||||
|
||||
localWg.Wait()
|
||||
|
||||
// These don't run any mon/mgr commands, and are dependent on the goroutines completing
|
||||
if err := o.collectOSDScrubState(ch); err != nil {
|
||||
o.logger.WithError(err).Error("error collecting OSD scrub metrics")
|
||||
}
|
||||
if err := o.collectPGStates(ch); err != nil {
|
||||
o.logger.WithError(err).Error("error collecting PG state metrics")
|
||||
}
|
||||
|
||||
for _, metric := range o.collectorList() {
|
||||
metric.Collect(ch)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue