Merge pull request #241 from digitalocean/oldest-inactive-internal-poll
osd: Internally poll PG dump for oldest inactive PG tracking
This commit is contained in:
commit
76ec6f81de
101
ceph/osd.go
101
ceph/osd.go
|
@ -29,12 +29,12 @@ import (
|
|||
|
||||
// osdLabelFormat is a format string containing a single %v verb.
// NOTE(review): presumably used to render OSD label values such as
// "osd.<id>"; no caller is visible here — confirm.
const (
|
||||
osdLabelFormat = "osd.%v"
|
||||
)
|
||||
|
||||
// Scrub state values stored in the OSD scrub cache, plus the polling period
// used by the oldest-inactive-PG background loop.
const (
|
||||
// scrubStateIdle is the value every scrub cache entry is reset to before
// the PG stats are examined.
scrubStateIdle = 0
|
||||
// scrubStateScrubbing is selected when a PG state contains "scrubbing".
scrubStateScrubbing = 1
|
||||
// NOTE(review): presumably selected when the PG state also contains
// "deep"; the assignment is outside this view — confirm.
scrubStateDeepScrubbing = 2
|
||||
|
||||
// oldestInactivePGUpdatePeriod is how long oldestInactivePGLoop sleeps
// between PG dump polls, including after a failed poll.
oldestInactivePGUpdatePeriod = 10 * time.Second
|
||||
)
|
||||
|
||||
// OSDCollector displays statistics about OSD in the Ceph cluster.
|
||||
|
@ -54,9 +54,6 @@ type OSDCollector struct {
|
|||
// a PG to not have an active state in it.
|
||||
oldestInactivePGMap map[string]time.Time
|
||||
|
||||
// pgDumpBrief holds the content of PG dump brief
|
||||
pgDumpBrief cephPGDumpBrief
|
||||
|
||||
// CrushWeight is a persistent setting, and it affects how CRUSH assigns data to OSDs.
|
||||
// It displays the CRUSH weight for the OSD
|
||||
CrushWeight *prometheus.GaugeVec
|
||||
|
@ -159,7 +156,7 @@ func NewOSDCollector(exporter *Exporter) *OSDCollector {
|
|||
labels["cluster"] = exporter.Cluster
|
||||
osdLabels := []string{"osd", "device_class", "host", "rack", "root"}
|
||||
|
||||
return &OSDCollector{
|
||||
o := &OSDCollector{
|
||||
conn: exporter.Conn,
|
||||
logger: exporter.Logger,
|
||||
|
||||
|
@ -438,6 +435,9 @@ func NewOSDCollector(exporter *Exporter) *OSDCollector {
|
|||
},
|
||||
),
|
||||
}
|
||||
|
||||
go o.oldestInactivePGLoop()
|
||||
return o
|
||||
}
|
||||
|
||||
func (o *OSDCollector) collectorList() []prometheus.Collector {
|
||||
|
@ -951,7 +951,7 @@ func (o *OSDCollector) collectOSDDump() error {
|
|||
|
||||
}
|
||||
|
||||
func (o *OSDCollector) performPGDumpBrief() error {
|
||||
func (o *OSDCollector) performPGDumpBrief() (*cephPGDumpBrief, error) {
|
||||
args := o.cephPGDumpCommand()
|
||||
buf, _, err := o.conn.MgrCommand(args)
|
||||
if err != nil {
|
||||
|
@ -959,18 +959,23 @@ func (o *OSDCollector) performPGDumpBrief() error {
|
|||
"args", string(bytes.Join(args, []byte(","))),
|
||||
).Error("error executing mgr command")
|
||||
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
o.pgDumpBrief = cephPGDumpBrief{}
|
||||
if err := json.Unmarshal(buf, &o.pgDumpBrief); err != nil {
|
||||
return err
|
||||
pgDumpBrief := cephPGDumpBrief{}
|
||||
if err := json.Unmarshal(buf, &pgDumpBrief); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return nil
|
||||
return &pgDumpBrief, nil
|
||||
}
|
||||
|
||||
func (o *OSDCollector) collectOSDScrubState(ch chan<- prometheus.Metric) error {
|
||||
pgDumpBrief, err := o.performPGDumpBrief()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// need to reset the PG scrub state since the scrub might have ended within
|
||||
// the last prom scrape interval.
|
||||
// This forces us to report scrub state on all previously discovered OSDs We
|
||||
|
@ -980,7 +985,7 @@ func (o *OSDCollector) collectOSDScrubState(ch chan<- prometheus.Metric) error {
|
|||
o.osdScrubCache[i] = scrubStateIdle
|
||||
}
|
||||
|
||||
for _, pg := range o.pgDumpBrief.PGStats {
|
||||
for _, pg := range pgDumpBrief.PGStats {
|
||||
if strings.Contains(pg.State, "scrubbing") {
|
||||
scrubState := scrubStateScrubbing
|
||||
if strings.Contains(pg.State, "deep") {
|
||||
|
@ -1070,36 +1075,46 @@ func (o *OSDCollector) cephPGDumpCommand() [][]byte {
|
|||
return [][]byte{cmd}
|
||||
}
|
||||
|
||||
func (o *OSDCollector) collectPGStates(ch chan<- prometheus.Metric) error {
|
||||
// - See if there are PGs that we're tracking that are now active
|
||||
// - See if there are new ones to add
|
||||
// - Find the oldest one
|
||||
now := time.Now()
|
||||
oldestTime := now
|
||||
|
||||
for _, pg := range o.pgDumpBrief.PGStats {
|
||||
// If we were tracking it, and it's now active, remove it
|
||||
active := strings.Contains(pg.State, "active")
|
||||
if active {
|
||||
delete(o.oldestInactivePGMap, pg.PGID)
|
||||
func (o *OSDCollector) oldestInactivePGLoop() {
|
||||
for {
|
||||
pgDumpBrief, err := o.performPGDumpBrief()
|
||||
if err != nil {
|
||||
o.logger.WithError(err).Warning("failed to get latest PG dump for oldest inactive PG update")
|
||||
time.Sleep(oldestInactivePGUpdatePeriod)
|
||||
continue
|
||||
}
|
||||
|
||||
// Now see if it's not here, we'll need to track it now
|
||||
pgTime, ok := o.oldestInactivePGMap[pg.PGID]
|
||||
if !ok {
|
||||
pgTime = now
|
||||
o.oldestInactivePGMap[pg.PGID] = now
|
||||
// - See if there are PGs that we're tracking that are now active
|
||||
// - See if there are new ones to add
|
||||
// - Find the oldest one
|
||||
now := time.Now()
|
||||
oldestTime := now
|
||||
|
||||
for _, pg := range pgDumpBrief.PGStats {
|
||||
// If we were tracking it, and it's now active, remove it
|
||||
active := strings.Contains(pg.State, "active")
|
||||
if active {
|
||||
delete(o.oldestInactivePGMap, pg.PGID)
|
||||
continue
|
||||
}
|
||||
|
||||
// Now see if it's not here, we'll need to track it now
|
||||
pgTime, ok := o.oldestInactivePGMap[pg.PGID]
|
||||
if !ok {
|
||||
pgTime = now
|
||||
o.oldestInactivePGMap[pg.PGID] = now
|
||||
}
|
||||
|
||||
// And finally, track our oldest time
|
||||
if pgTime.Before(oldestTime) {
|
||||
oldestTime = pgTime
|
||||
}
|
||||
}
|
||||
|
||||
// And finally, track our oldest time
|
||||
if pgTime.Before(oldestTime) {
|
||||
oldestTime = pgTime
|
||||
}
|
||||
o.OldestInactivePG.Set(float64(now.Unix() - oldestTime.Unix()))
|
||||
|
||||
time.Sleep(oldestInactivePGUpdatePeriod)
|
||||
}
|
||||
|
||||
o.OldestInactivePG.Set(float64(now.Unix() - oldestTime.Unix()))
|
||||
return nil
|
||||
}
|
||||
|
||||
// Describe sends the descriptors of each OSDCollector related metrics we have
|
||||
|
@ -1169,21 +1184,13 @@ func (o *OSDCollector) Collect(ch chan<- prometheus.Metric, version *Version) {
|
|||
localWg.Add(1)
|
||||
go func() {
|
||||
defer localWg.Done()
|
||||
if err := o.performPGDumpBrief(); err != nil {
|
||||
o.logger.WithError(err).Error("error collecting PG dump metrics")
|
||||
if err := o.collectOSDScrubState(ch); err != nil {
|
||||
o.logger.WithError(err).Error("error collecting OSD scrub metrics")
|
||||
}
|
||||
}()
|
||||
|
||||
localWg.Wait()
|
||||
|
||||
// These don't run any mon/mgr commands, and are dependent on the goroutines completing
|
||||
if err := o.collectOSDScrubState(ch); err != nil {
|
||||
o.logger.WithError(err).Error("error collecting OSD scrub metrics")
|
||||
}
|
||||
if err := o.collectPGStates(ch); err != nil {
|
||||
o.logger.WithError(err).Error("error collecting PG state metrics")
|
||||
}
|
||||
|
||||
for _, metric := range o.collectorList() {
|
||||
metric.Collect(ch)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue