health: add client and recovery IO stats

This commit is contained in:
Vaibhav Bhembre 2016-02-18 19:09:26 +00:00
parent fa62215e9e
commit fc5806e901
2 changed files with 231 additions and 4 deletions

View File

@ -15,14 +15,27 @@
package collectors
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"log"
"regexp"
"strconv"
"strings"
"github.com/prometheus/client_golang/prometheus"
)
var (
	// recoveryIORateRegex matches a data rate with a two-letter unit,
	// e.g. "5779 MB/s" in the "recovery io" status line.
	// NOTE(review): `\w{2}` requires exactly two unit characters, so a
	// plain-bytes rate like "12 B/s" would not match — confirm whether
	// ceph ever emits single-letter units here.
	recoveryIORateRegex = regexp.MustCompile(`(\d+) (\w{2})/s`)
	// recoveryIOKeysRegex matches the keys-per-second figure, e.g. "4 keys/s".
	recoveryIOKeysRegex = regexp.MustCompile(`(\d+) keys/s`)
	// recoveryIOObjectsRegex matches the objects-per-second figure,
	// e.g. "1522 objects/s".
	recoveryIOObjectsRegex = regexp.MustCompile(`(\d+) objects/s`)
	// clientIOReadRegex matches the client read rate, e.g. "4273 kB/s rd".
	clientIOReadRegex = regexp.MustCompile(`(\d+) (\w{2})/s rd`)
	// clientIOWriteRegex matches the client write rate, e.g. "2740 MB/s wr".
	clientIOWriteRegex = regexp.MustCompile(`(\d+) (\w{2})/s wr`)
	// clientIOOpsRegex matches the client operations rate, e.g. "2863 op/s".
	clientIOOpsRegex = regexp.MustCompile(`(\d+) op/s`)
)
// ClusterHealthCollector collects information about the health of an overall cluster.
// It surfaces changes in the ceph parameters unlike data usage that ClusterUsageCollector
// does.
@ -90,6 +103,25 @@ type ClusterHealthCollector struct {
// RemappedPGs show the no. of PGs that are currently remapped and needs to be moved
// to newer OSDs.
RemappedPGs prometheus.Gauge
// RecoveryIORate shows the i/o rate at which the cluster is performing its ongoing
// recovery at.
RecoveryIORate prometheus.Gauge
// RecoveryIOKeys shows the rate of rados keys recovery.
RecoveryIOKeys prometheus.Gauge
// RecoveryIOObjects shows the rate of rados objects being recovered.
RecoveryIOObjects prometheus.Gauge
// ClientIORead shows the total client read i/o on the cluster.
ClientIORead prometheus.Gauge
// ClientIOWrite shows the total client write i/o on the cluster.
ClientIOWrite prometheus.Gauge
// ClientIOOps shows the rate of total operations conducted by all clients on the cluster.
ClientIOOps prometheus.Gauge
}
const (
@ -222,6 +254,48 @@ func NewClusterHealthCollector(conn Conn) *ClusterHealthCollector {
Help: "No. of PGs that are remapped and incurring cluster-wide movement",
},
),
RecoveryIORate: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: cephNamespace,
Name: "recovery_io_bytes",
Help: "Rate of bytes being recovered in cluster per second",
},
),
RecoveryIOKeys: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: cephNamespace,
Name: "recovery_io_keys",
Help: "Rate of keys being recovered in cluster per second",
},
),
RecoveryIOObjects: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: cephNamespace,
Name: "recovery_io_objects",
Help: "Rate of objects being recovered in cluster per second",
},
),
ClientIORead: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: cephNamespace,
Name: "client_io_read_bytes",
Help: "Rate of bytes being read by all clients per second",
},
),
ClientIOWrite: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: cephNamespace,
Name: "client_io_write_bytes",
Help: "Rate of bytes being written by all clients per second",
},
),
ClientIOOps: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: cephNamespace,
Name: "client_io_ops",
Help: "Total client ops on the cluster measured per second",
},
),
}
}
@ -243,6 +317,12 @@ func (c *ClusterHealthCollector) metricsList() []prometheus.Metric {
c.OSDsIn,
c.OSDsNum,
c.RemappedPGs,
c.RecoveryIORate,
c.RecoveryIOKeys,
c.RecoveryIOObjects,
c.ClientIORead,
c.ClientIOWrite,
c.ClientIOOps,
}
}
@ -265,7 +345,7 @@ type cephHealthStats struct {
}
func (c *ClusterHealthCollector) collect() error {
cmd := c.cephUsageCommand()
cmd := c.cephJSONUsage()
buf, _, err := c.conn.MonCommand(cmd)
if err != nil {
return err
@ -435,10 +515,25 @@ func (c *ClusterHealthCollector) collect() error {
return nil
}
func (c *ClusterHealthCollector) cephUsageCommand() []byte {
// format selects the output encoding requested from the ceph mon
// "status" command: machine-readable JSON or the human-readable plain text
// that collectRecoveryClientIO scrapes.
type format string

const (
	jsonFormat  format = "json"
	plainFormat format = "plain"
)
// cephPlainUsage returns the marshaled mon "status" command requesting
// plain-text output, suitable for passing to conn.MonCommand.
func (c *ClusterHealthCollector) cephPlainUsage() []byte {
	return c.cephUsageCommand(plainFormat)
}
// cephJSONUsage returns the marshaled mon "status" command requesting
// JSON output, suitable for passing to conn.MonCommand.
func (c *ClusterHealthCollector) cephJSONUsage() []byte {
	return c.cephUsageCommand(jsonFormat)
}
func (c *ClusterHealthCollector) cephUsageCommand(f format) []byte {
cmd, err := json.Marshal(map[string]interface{}{
"prefix": "status",
"format": "json",
"format": f,
})
if err != nil {
// panic! because ideally in no world this hard-coded input
@ -448,6 +543,117 @@ func (c *ClusterHealthCollector) cephUsageCommand() []byte {
return cmd
}
// collectRecoveryClientIO runs the plain-text mon "status" command and scrapes
// its output for the "recovery io" and "client io" summary lines, updating the
// corresponding gauges. Rates given in kB/MB/GB per second are normalized to
// bytes per second (decimal SI multipliers) before being recorded.
//
// It returns an error if the mon command fails, a matched number cannot be
// parsed, or a matched rate carries an unrecognized unit.
func (c *ClusterHealthCollector) collectRecoveryClientIO() error {
	cmd := c.cephPlainUsage()
	buf, _, err := c.conn.MonCommand(cmd)
	if err != nil {
		return err
	}

	sc := bufio.NewScanner(bytes.NewReader(buf))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())

		switch {
		case strings.HasPrefix(line, "recovery io"):
			if err := c.collectRecoveryIO(line); err != nil {
				return err
			}
		case strings.HasPrefix(line, "client io"):
			if err := c.collectClientIO(line); err != nil {
				return err
			}
		}
	}
	// Surface scanner failures instead of silently dropping them.
	return sc.Err()
}

// collectRecoveryIO parses a "recovery io ..." status line, e.g.
// "recovery io 5779 MB/s, 4 keys/s, 1522 objects/s", setting the recovery
// rate, keys, and objects gauges for whichever figures are present.
func (c *ClusterHealthCollector) collectRecoveryIO(line string) error {
	if matched := recoveryIORateRegex.FindStringSubmatch(line); len(matched) == 3 {
		v, err := toBytesPerSec(matched[1], matched[2])
		if err != nil {
			return err
		}
		c.RecoveryIORate.Set(v)
	}

	if matched := recoveryIOKeysRegex.FindStringSubmatch(line); len(matched) == 2 {
		v, err := strconv.Atoi(matched[1])
		if err != nil {
			return err
		}
		c.RecoveryIOKeys.Set(float64(v))
	}

	if matched := recoveryIOObjectsRegex.FindStringSubmatch(line); len(matched) == 2 {
		v, err := strconv.Atoi(matched[1])
		if err != nil {
			return err
		}
		c.RecoveryIOObjects.Set(float64(v))
	}
	return nil
}

// collectClientIO parses a "client io ..." status line, e.g.
// "client io 4273 kB/s rd, 2740 MB/s wr, 2863 op/s", setting the client
// read, write, and ops gauges for whichever figures are present.
func (c *ClusterHealthCollector) collectClientIO(line string) error {
	if matched := clientIOReadRegex.FindStringSubmatch(line); len(matched) == 3 {
		v, err := toBytesPerSec(matched[1], matched[2])
		if err != nil {
			return err
		}
		c.ClientIORead.Set(v)
	}

	if matched := clientIOWriteRegex.FindStringSubmatch(line); len(matched) == 3 {
		v, err := toBytesPerSec(matched[1], matched[2])
		if err != nil {
			return err
		}
		c.ClientIOWrite.Set(v)
	}

	if matched := clientIOOpsRegex.FindStringSubmatch(line); len(matched) == 2 {
		v, err := strconv.Atoi(matched[1])
		if err != nil {
			return err
		}
		c.ClientIOOps.Set(float64(v))
	}
	return nil
}

// toBytesPerSec converts a numeric string plus a case-insensitive unit
// ("kb", "mb" or "gb") into bytes per second, using the decimal (SI)
// multipliers. It returns an error for malformed numbers or unknown units.
func toBytesPerSec(value, unit string) (float64, error) {
	v, err := strconv.Atoi(value)
	if err != nil {
		return 0, err
	}
	switch strings.ToLower(unit) {
	case "gb":
		v = v * 1e9
	case "mb":
		v = v * 1e6
	case "kb":
		v = v * 1e3
	default:
		return 0, fmt.Errorf("can't parse units %q", unit)
	}
	return float64(v), nil
}
// Describe sends all the descriptions of individual metrics of ClusterHealthCollector
// to the provided prometheus channel.
func (c *ClusterHealthCollector) Describe(ch chan<- *prometheus.Desc) {
@ -461,7 +667,10 @@ func (c *ClusterHealthCollector) Describe(ch chan<- *prometheus.Desc) {
func (c *ClusterHealthCollector) Collect(ch chan<- prometheus.Metric) {
if err := c.collect(); err != nil {
log.Println("failed collecting cluster health metrics:", err)
return
}
if err := c.collectRecoveryClientIO(); err != nil {
log.Println("failed collecting cluster recovery/client io:", err)
}
for _, metric := range c.metricsList() {

View File

@ -284,6 +284,24 @@ func TestClusterHealthCollector(t *testing.T) {
regexp.MustCompile(`health_status 2`),
},
},
{
input: `
$ sudo ceph -s
cluster eff51be8-938a-4afa-b0d1-7a580b4ceb37
health HEALTH_OK
monmap e3: 3 mons at {mon01,mon02,mon03}
recovery io 5779 MB/s, 4 keys/s, 1522 objects/s
client io 4273 kB/s rd, 2740 MB/s wr, 2863 op/s
`,
regexes: []*regexp.Regexp{
regexp.MustCompile(`recovery_io_bytes 5.779e`),
regexp.MustCompile(`recovery_io_keys 4`),
regexp.MustCompile(`recovery_io_objects 1522`),
regexp.MustCompile(`client_io_ops 2863`),
regexp.MustCompile(`client_io_read_bytes 4.273e`),
regexp.MustCompile(`client_io_write_bytes 2.74e`),
},
},
} {
func() {
collector := NewClusterHealthCollector(NewNoopConn(tt.input))