Merge pull request #222 from digitalocean/tbrekke/rbd-mirror

Add rbd-mirror health status
This commit is contained in:
Tyler Brekke 2022-08-24 14:24:28 -07:00 committed by GitHub
commit 63c7e97a32
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 420 additions and 49 deletions

View File

@ -16,8 +16,10 @@ package ceph
import (
"encoding/json"
"fmt"
"sync"
"github.com/Jeffail/gabs"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
@ -28,13 +30,14 @@ import (
// prometheus. It also implements a prometheus.Collector interface in order
// to register it correctly.
type Exporter struct {
mu sync.Mutex
Conn Conn
Cluster string
Config string
RgwMode int
Logger *logrus.Logger
Version *Version
mu sync.Mutex
Conn Conn
Cluster string
Config string
RgwMode int
RbdMirror bool
Logger *logrus.Logger
Version *Version
}
// NewExporter returns an initialized *Exporter
@ -60,6 +63,10 @@ func (exporter *Exporter) getCollectors() []prometheus.Collector {
NewCrashesCollector(exporter),
}
if exporter.RbdMirror {
standardCollectors = append(standardCollectors, NewRbdMirrorStatusCollector(exporter))
}
switch exporter.RgwMode {
case RGWModeForeground:
standardCollectors = append(standardCollectors, NewRGWCollector(exporter, false))
@ -86,6 +93,83 @@ func (exporter *Exporter) cephVersionCmd() []byte {
return cmd
}
func CephVersionsCmd() ([]byte, error) {
// Ceph versions
cmd, err := json.Marshal(map[string]interface{}{
"prefix": "versions",
"format": "json",
})
if err != nil {
return nil, fmt.Errorf("failed to marshal ceph versions command: %s", err)
}
return cmd, nil
}
func ParseCephVersions(buf []byte) (map[string]map[string]float64, error) {
// Rather than a dedicated type, have dynamic daemons and versions
// {"daemon": {"version1": 123, "version2": 234}}
parsed, err := gabs.ParseJSON(buf)
if err != nil {
return nil, err
}
parsedMap, err := parsed.ChildrenMap()
if err != nil {
return nil, err
}
versions := make(map[string]map[string]float64)
for daemonKey, innerObj := range parsedMap {
// Read each daemon, and overall counts
versionMap, err := innerObj.ChildrenMap()
if err == gabs.ErrNotObj {
continue
} else if err != nil {
return nil, err
}
versions[daemonKey] = make(map[string]float64)
for version, countContainer := range versionMap {
count, ok := countContainer.Data().(float64)
if ok {
versions[daemonKey][version] = count
}
}
}
return versions, nil
}
func (exporter *Exporter) setRbdMirror() error {
cmd, err := CephVersionsCmd()
if err != nil {
exporter.Logger.WithError(err).Panic("failed to marshal ceph versions command")
}
buf, _, err := exporter.Conn.MonCommand(cmd)
if err != nil {
exporter.Logger.WithError(err).WithField(
"args", string(cmd),
).Error("error executing mon command")
return err
}
versions, err := ParseCephVersions(buf)
if err != nil {
return err
}
// check to see if rbd-mirror is in ceph version output and not empty
if _, exists := versions["rbd-mirror"]; exists {
if len(versions["rbd-mirror"]) > 0 {
exporter.RbdMirror = true
}
}
return nil
}
func (exporter *Exporter) setCephVersion() error {
buf, _, err := exporter.Conn.MonCommand(exporter.cephVersionCmd())
if err != nil {
@ -121,6 +205,12 @@ func (exporter *Exporter) Describe(ch chan<- *prometheus.Desc) {
return
}
err = exporter.setRbdMirror()
if err != nil {
exporter.Logger.WithError(err).Error("failed to set rbd mirror")
return
}
for _, cc := range exporter.getCollectors() {
cc.Describe(ch)
}
@ -139,6 +229,12 @@ func (exporter *Exporter) Collect(ch chan<- prometheus.Metric) {
return
}
err = exporter.setRbdMirror()
if err != nil {
exporter.Logger.WithError(err).Error("failed to set rbd mirror")
return
}
for _, cc := range exporter.getCollectors() {
cc.Collect(ch)
}

View File

@ -325,7 +325,7 @@ func (m *MonitorCollector) collect() error {
}
// Ceph versions
cmd = m.cephVersionsCommand()
cmd, _ = CephVersionsCmd()
buf, _, err = m.conn.MonCommand(cmd)
if err != nil {
m.logger.WithError(err).WithField(
@ -335,35 +335,9 @@ func (m *MonitorCollector) collect() error {
return err
}
// Rather than a dedicated type, have dynamic daemons and versions
// {"daemon": {"version1": 123, "version2": 234}}
parsed, err := gabs.ParseJSON(buf)
versions, err := ParseCephVersions(buf)
if err != nil {
return err
}
parsedMap, err := parsed.ChildrenMap()
if err != nil {
return err
}
versions := make(map[string]map[string]float64)
for daemonKey, innerObj := range parsedMap {
// Read each daemon, and overall counts
versionMap, err := innerObj.ChildrenMap()
if err == gabs.ErrNotObj {
continue
} else if err != nil {
return err
}
versions[daemonKey] = make(map[string]float64)
for version, countContainer := range versionMap {
count, ok := countContainer.Data().(float64)
if ok {
versions[daemonKey][version] = count
}
}
m.logger.WithError(err).Error("error parsing ceph versions command")
}
// Ceph features
@ -379,12 +353,12 @@ func (m *MonitorCollector) collect() error {
// Like versions, the same with features
// {"daemon": [ ... ]}
parsed, err = gabs.ParseJSON(buf)
parsed, err := gabs.ParseJSON(buf)
if err != nil {
return err
}
parsedMap, err = parsed.ChildrenMap()
parsedMap, err := parsed.ChildrenMap()
if err != nil {
return err
}
@ -554,17 +528,6 @@ func (m *MonitorCollector) cephTimeSyncStatusCommand() []byte {
return cmd
}
func (m *MonitorCollector) cephVersionsCommand() []byte {
cmd, err := json.Marshal(map[string]interface{}{
"prefix": "versions",
"format": "json",
})
if err != nil {
m.logger.WithError(err).Panic("error marshalling ceph versions")
}
return cmd
}
func (m *MonitorCollector) cephFeaturesCommand() []byte {
cmd, err := json.Marshal(map[string]interface{}{
"prefix": "features",

176
ceph/rbd_mirror_status.go Normal file
View File

@ -0,0 +1,176 @@
// Copyright 2022 DigitalOcean
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ceph
import (
"encoding/json"
"os/exec"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
const rbdPath = "/usr/bin/rbd"
const (
// RbdMirrorOK denotes the status of the rbd-mirror when healthy.
RbdMirrorOK = "OK"
// RbdMirrorWarn denotes the status of rbd-mirror when unhealthy.
RbdMirrorWarn = "WARNING"
// RbdMirrorErr denotes the status of rbd-mirror when unhealthy but usually needs
// manual intervention.
RbdMirrorErr = "ERROR"
)
type rbdMirrorPoolStatus struct {
Summary struct {
Health string `json:"health"`
DaemonHealth string `json:"daemon_health"`
ImageHealth string `json:"image_health"`
States struct {
} `json:"states"`
} `json:"summary"`
}
// RbdMirrorStatusCollector displays statistics about each pool in the Ceph cluster.
type RbdMirrorStatusCollector struct {
config string
logger *logrus.Logger
version *Version
getRbdMirrorStatus func(config string) ([]byte, error)
// RbdMirrorStatus shows the overall health status of a rbd-mirror.
RbdMirrorStatus prometheus.Gauge
// RbdMirrorDaemonStatus shows the health status of a rbd-mirror daemons.
RbdMirrorDaemonStatus prometheus.Gauge
// RbdMirrorImageStatus shows the health status of rbd-mirror images.
RbdMirrorImageStatus prometheus.Gauge
}
// rbdMirrorStatus get the RBD Mirror Pool Status
var rbdMirrorStatus = func(config string) ([]byte, error) {
out, err := exec.Command(rbdPath, "-c", config, "mirror", "pool", "status", "--format", "json").Output()
if err != nil {
return nil, err
}
return out, nil
}
// NewRbdMirrorStatusCollector creates a new RbdMirrorStatusCollector instance
func NewRbdMirrorStatusCollector(exporter *Exporter) *RbdMirrorStatusCollector {
labels := make(prometheus.Labels)
labels["cluster"] = exporter.Cluster
collector := &RbdMirrorStatusCollector{
config: exporter.Config,
logger: exporter.Logger,
version: exporter.Version,
getRbdMirrorStatus: rbdMirrorStatus,
RbdMirrorStatus: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: cephNamespace,
Name: "rbd_mirror_pool_status",
Help: "Health status of rbd-mirror, can vary only between 3 states (err:2, warn:1, ok:0)",
ConstLabels: labels,
},
),
RbdMirrorDaemonStatus: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: cephNamespace,
Name: "rbd_mirror_pool_daemon_status",
Help: "Health status of rbd-mirror daemons, can vary only between 3 states (err:2, warn:1, ok:0)",
ConstLabels: labels,
},
),
RbdMirrorImageStatus: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: cephNamespace,
Name: "rbd_mirror_pool_image_status",
Help: "Health status of rbd-mirror images, can vary only between 3 states (err:2, warn:1, ok:0)",
ConstLabels: labels,
},
),
}
return collector
}
func (c *RbdMirrorStatusCollector) metricsList() []prometheus.Metric {
if c.version.IsAtLeast(Pacific) {
return []prometheus.Metric{
c.RbdMirrorStatus,
c.RbdMirrorDaemonStatus,
c.RbdMirrorImageStatus,
}
} else {
return []prometheus.Metric{
c.RbdMirrorStatus,
}
}
}
func (c *RbdMirrorStatusCollector) mirrorStatusStringToInt(status string) float64 {
switch status {
case RbdMirrorOK:
return 0
case RbdMirrorWarn:
return 1
case RbdMirrorErr:
return 2
default:
c.logger.Errorf("Unknown rbd-mirror status: %s", status)
return -1
}
}
// Describe provides the metrics descriptions to Prometheus
func (c *RbdMirrorStatusCollector) Describe(ch chan<- *prometheus.Desc) {
for _, metric := range c.metricsList() {
ch <- metric.Desc()
}
}
// Collect sends all the collected metrics Prometheus.
func (c *RbdMirrorStatusCollector) Collect(ch chan<- prometheus.Metric) {
status, err := rbdMirrorStatus(c.config)
if err != nil {
c.logger.WithError(err).Error("failed to run 'rbd mirror pool status'")
}
var rbdStatus rbdMirrorPoolStatus
if err = json.Unmarshal(status, &rbdStatus); err != nil {
c.logger.WithError(err).Error("failed to Unmarshal rbd mirror pool status output")
}
c.RbdMirrorStatus.Set(c.mirrorStatusStringToInt(rbdStatus.Summary.Health))
if c.version.IsAtLeast(Pacific) {
c.RbdMirrorDaemonStatus.Set(c.mirrorStatusStringToInt(rbdStatus.Summary.DaemonHealth))
c.RbdMirrorImageStatus.Set(c.mirrorStatusStringToInt(rbdStatus.Summary.ImageHealth))
}
for _, metric := range c.metricsList() {
ch <- metric
}
}

View File

@ -0,0 +1,136 @@
// Copyright 2022 DigitalOcean
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ceph
import (
"encoding/json"
"io/ioutil"
"net/http"
"net/http/httptest"
"regexp"
"testing"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
)
func setStatus(b []byte) {
rbdMirrorStatus = func(string) ([]byte, error) {
return b, nil
}
}
func TestRbdMirrorStatusCollector(t *testing.T) {
for _, tt := range []struct {
input []byte
reMatch []*regexp.Regexp
}{
{
input: []byte(`
{
"summary": {
"health": "WARNING",
"daemon_health": "OK",
"image_health": "WARNING",
"states": {
"unknown": 1
}
}
}`),
reMatch: []*regexp.Regexp{
regexp.MustCompile(`ceph_rbd_mirror_pool_status{cluster="ceph"} 1`),
regexp.MustCompile(`ceph_rbd_mirror_pool_image_status{cluster="ceph"} 1`),
regexp.MustCompile(`ceph_rbd_mirror_pool_daemon_status{cluster="ceph"} 0`),
},
},
{
input: []byte(`
{
"summary": {
"health": "WARNING",
"daemon_health": "WARNING",
"image_health": "OK",
"states": {}
}
}`),
reMatch: []*regexp.Regexp{
regexp.MustCompile(`ceph_rbd_mirror_pool_status{cluster="ceph"} 1`),
regexp.MustCompile(`ceph_rbd_mirror_pool_daemon_status{cluster="ceph"} 1`),
regexp.MustCompile(`ceph_rbd_mirror_pool_image_status{cluster="ceph"} 0`),
},
},
{
input: []byte(`
{
"summary": {
"health": "OK",
"daemon_health": "OK",
"image_health": "OK",
"states": {}
}
}`),
reMatch: []*regexp.Regexp{
regexp.MustCompile(`ceph_rbd_mirror_pool_status{cluster="ceph"} 0`),
regexp.MustCompile(`ceph_rbd_mirror_pool_daemon_status{cluster="ceph"} 0`),
regexp.MustCompile(`ceph_rbd_mirror_pool_image_status{cluster="ceph"} 0`),
},
},
{
input: []byte(`
{
"summary": {
"health": "ERROR",
"daemon_health": "OK",
"image_health": "ERROR",
"states": {}
}
}`),
reMatch: []*regexp.Regexp{
regexp.MustCompile(`ceph_rbd_mirror_pool_status{cluster="ceph"} 2`),
regexp.MustCompile(`ceph_rbd_mirror_pool_daemon_status{cluster="ceph"} 0`),
regexp.MustCompile(`ceph_rbd_mirror_pool_image_status{cluster="ceph"} 2`),
},
},
} {
func() {
collector := NewRbdMirrorStatusCollector(&Exporter{Cluster: "ceph", Version: Pacific, Logger: logrus.New()})
var rbdStatus rbdMirrorPoolStatus
setStatus(tt.input)
_ = json.Unmarshal([]byte(tt.input), &rbdStatus)
err := prometheus.Register(collector)
require.NoError(t, err)
defer prometheus.Unregister(collector)
server := httptest.NewServer(promhttp.Handler())
defer server.Close()
resp, err := http.Get(server.URL)
require.NoError(t, err)
defer resp.Body.Close()
buf, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)
for _, re := range tt.reMatch {
require.True(t, re.Match(buf))
}
}()
}
}