Optionally collect RGW GC task stats (#94)

* Optionally collect RGW GC task stats

* Minor changes per code review; add some additional tests to squeeze out extra coverage
ssobolewski authored 2018-08-01 07:37:07 -06:00 (committed by GitHub)
parent ae0f874abb
commit dc6ab9c636
4 changed files with 360 additions and 5 deletions
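
In brief, the new RGWCollector shells out to radosgw-admin -c <config> gc list --include-all (the Dockerfile change below adds ceph-common so that binary is available in the image), decodes the returned JSON task list, and counts a task as active once its expiration timestamp has passed, or as pending otherwise; object counts are tallied the same way. A minimal, self-contained sketch of that classification follows; the gcTask type and classify helper are illustrative names used only in this note, not code from the commit:

package main

import (
	"encoding/json"
	"fmt"
	"strings"
	"time"
)

// gcTask mirrors the fields of interest from the radosgw-admin gc list output.
type gcTask struct {
	Time string `json:"time"`
	Objs []struct {
		OID string `json:"oid"`
	} `json:"objs"`
}

// classify buckets GC tasks into active (expiration time already passed) and
// pending, and tallies the objects belonging to each bucket.
func classify(raw []byte, now time.Time) (activeTasks, activeObjects, pendingTasks, pendingObjects int, err error) {
	var tasks []gcTask
	if err = json.Unmarshal(raw, &tasks); err != nil {
		return
	}
	for _, t := range tasks {
		// radosgw-admin prints timestamps like "1975-01-01 16:31:09.0.564455s";
		// only the part before the first dot matches the reference layout.
		ts, perr := time.Parse("2006-01-02 15:04:05", strings.SplitN(t.Time, ".", 2)[0])
		if perr == nil && now.After(ts) {
			activeTasks++
			activeObjects += len(t.Objs)
		} else {
			pendingTasks++
			pendingObjects += len(t.Objs)
		}
	}
	return
}

func main() {
	raw := []byte(`[{"time": "1975-01-01 16:31:09.0.564455s", "objs": [{"oid": "a"}, {"oid": "b"}]}]`)
	at, ao, pt, po, _ := classify(raw, time.Now())
	fmt.Println(at, ao, pt, po) // prints: 1 2 0 0
}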


@@ -33,7 +33,7 @@ RUN apt-get update && \
RUN wget -q -O- 'https://download.ceph.com/keys/release.asc' | apt-key add -
RUN echo "deb https://download.ceph.com/debian-luminous xenial main" >> /etc/apt/sources.list && \
apt-get update && \
apt-get install -y --force-yes librados2 librbd1 && \
apt-get install -y --force-yes librados2 librbd1 ceph-common && \
rm -rf /var/lib/apt/lists/*

collectors/rgw.go (new file, 183 lines)

@@ -0,0 +1,183 @@
package collectors
import (
"encoding/json"
"log"
"os/exec"
"strings"
"time"
"github.com/prometheus/client_golang/prometheus"
)
const rgwGCTimeFormat = "2006-01-02 15:04:05"
const radosgwAdminPath = "/usr/bin/radosgw-admin"
type rgwTaskGC struct {
Tag string `json:"tag"`
Time string `json:"time"`
Objects []struct {
Pool string `json:"pool"`
OID string `json:"oid"`
Key string `json:"key"`
Instance string `json:"instance"`
} `json:"objs"`
}
// ExpiresAt returns the timestamp at which this task expires and becomes active
func (gc rgwTaskGC) ExpiresAt() time.Time {
tmp := strings.SplitN(gc.Time, ".", 2)
last, err := time.Parse(rgwGCTimeFormat, tmp[0])
if err != nil {
return time.Now()
}
return last
}
// rgwGetGCTaskList gets the RGW Garbage Collection task list
func rgwGetGCTaskList(config string) ([]byte, error) {
var (
out []byte
err error
)
if out, err = exec.Command(radosgwAdminPath, "-c", config, "gc", "list", "--include-all").Output(); err != nil {
return nil, err
}
return out, nil
}
// RGWCollector collects metrics from the RGW service
type RGWCollector struct {
config string
// ActiveTasks reports the number of active (i.e. already expired) RGW GC tasks
ActiveTasks *prometheus.GaugeVec
// ActiveObjects reports the total number of RGW GC objects contained in active tasks
ActiveObjects *prometheus.GaugeVec
// PendingTasks reports the number of RGW GC tasks queued but not yet expired
PendingTasks *prometheus.GaugeVec
// PendingObjects reports the total number of RGW GC objects contained in pending tasks
PendingObjects *prometheus.GaugeVec
getRGWGCTaskList func(string) ([]byte, error)
}
// NewRGWCollector creates an instance of the RGWCollector and instantiates
// the individual metrics that we can collect from the RGW service
func NewRGWCollector(cluster string, config string) *RGWCollector {
labels := make(prometheus.Labels)
labels["cluster"] = cluster
return &RGWCollector{
config: config,
getRGWGCTaskList: rgwGetGCTaskList,
ActiveTasks: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: cephNamespace,
Name: "rgw_gc_active_tasks",
Help: "RGW GC active task count",
ConstLabels: labels,
},
[]string{},
),
ActiveObjects: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: cephNamespace,
Name: "rgw_gc_active_objects",
Help: "RGW GC active object count",
ConstLabels: labels,
},
[]string{},
),
PendingTasks: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: cephNamespace,
Name: "rgw_gc_pending_tasks",
Help: "RGW GC pending task count",
ConstLabels: labels,
},
[]string{},
),
PendingObjects: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: cephNamespace,
Name: "rgw_gc_pending_objects",
Help: "RGW GC pending object count",
ConstLabels: labels,
},
[]string{},
),
}
}
func (r *RGWCollector) collectorList() []prometheus.Collector {
return []prometheus.Collector{
r.ActiveTasks,
r.ActiveObjects,
r.PendingTasks,
r.PendingObjects,
}
}
func (r *RGWCollector) collect() error {
data, err := r.getRGWGCTaskList(r.config)
if err != nil {
return err
}
tasks := make([]rgwTaskGC, 0)
err = json.Unmarshal(data, &tasks)
if err != nil {
return err
}
activeTaskCount := 0
activeObjectCount := 0
pendingTaskCount := 0
pendingObjectCount := 0
now := time.Now()
for _, task := range tasks {
if now.Sub(task.ExpiresAt()) > 0 {
// the timer has expired, so this task counts as active
activeTaskCount += 1
activeObjectCount += len(task.Objects)
} else {
pendingTaskCount += 1
pendingObjectCount += len(task.Objects)
}
}
r.ActiveTasks.WithLabelValues().Set(float64(activeTaskCount))
r.PendingTasks.WithLabelValues().Set(float64(pendingTaskCount))
r.ActiveObjects.WithLabelValues().Set(float64(activeObjectCount))
r.PendingObjects.WithLabelValues().Set(float64(pendingObjectCount))
return nil
}
// Describe sends the descriptors of each metric the RGWCollector defines
// to the provided prometheus channel.
func (r *RGWCollector) Describe(ch chan<- *prometheus.Desc) {
for _, metric := range r.collectorList() {
metric.Describe(ch)
}
}
// Collect sends all the collected metrics to the provided prometheus channel.
// It requires the caller to handle synchronization.
func (r *RGWCollector) Collect(ch chan<- prometheus.Metric) {
err := r.collect()
if err != nil {
log.Println("Failed to collect RGW GC stats", err)
}
for _, metric := range r.collectorList() {
metric.Collect(ch)
}
}

collectors/rgw_test.go (new file, 157 lines)

@@ -0,0 +1,157 @@
package collectors
import (
"errors"
"io/ioutil"
"net/http"
"net/http/httptest"
"regexp"
"testing"
"github.com/prometheus/client_golang/prometheus"
)
func TestRGWCollector(t *testing.T) {
for _, tt := range []struct {
input []byte
reMatch []*regexp.Regexp
reUnmatch []*regexp.Regexp
}{
{
input: []byte(`
[
{
"tag": "00000000-0001-0000-0000-9ec86fa9a561.9695966.3129536\u0000",
"time": "1975-01-01 16:31:09.0.564455s",
"objs": [
{
"pool": "pool.rgw.buckets.data",
"oid": "12345678-0001-0000-0000-000000000000.123456.1100__shadow_.tNcmQWnIAlJMd33ZIdhnLF9HoaY9TOv_1",
"key": "",
"instance": ""
},
{
"pool": "pool.rgw.buckets.data",
"oid": "12345678-0002-0000-0000-000000000000.123456.1100__shadow_.tNcmQWnIAlJMd33ZIdhnLF9HoaY9TOv_1",
"key": "",
"instance": ""
}
]
},
{
"tag": "00000000-0002-0000-0000-9ec86fa9a561.9695966.3129536\u0000",
"time": "1975-01-01 17:31:09.0.564455s",
"objs": [
{
"pool": "pool.rgw.buckets.data",
"oid": "12345678-0004-0000-0000-000000000000.123456.1100__shadow_.tNcmQWnIAlJMd33ZIdhnLF9HoaY9TOv_1",
"key": "",
"instance": ""
},
{
"pool": "pool.rgw.buckets.data",
"oid": "12345678-0005-0000-0000-000000000000.123456.1100__shadow_.tNcmQWnIAlJMd33ZIdhnLF9HoaY9TOv_1",
"key": "",
"instance": ""
}
]
},
{
"tag": "00000000-0002-0000-0000-9ec86fa9a561.9695966.3129536\u0000",
"time": "3075-01-01 11:30:09.0.123456s",
"objs": [
{
"pool": "pool.rgw.buckets.data",
"oid": "12345678-0001-5555-0000-000000000000.123456.1100__shadow_.tNcmQWnIAlJMd33ZIdhnLF9HoaY9TOv_1",
"key": "",
"instance": ""
},
{
"pool": "pool.rgw.buckets.data",
"oid": "12345678-0002-5555-0000-000000000000.123456.1100__shadow_.tNcmQWnIAlJMd33ZIdhnLF9HoaY9TOv_1",
"key": "",
"instance": ""
},
{
"pool": "pool.rgw.buckets.data",
"oid": "12345678-0003-5555-0000-000000000000.123456.1100__shadow_.tNcmQWnIAlJMd33ZIdhnLF9HoaY9TOv_1",
"key": "",
"instance": ""
}
]
}
]
`),
reMatch: []*regexp.Regexp{
regexp.MustCompile(`ceph_rgw_gc_active_tasks{cluster="ceph"} 2`),
regexp.MustCompile(`ceph_rgw_gc_active_objects{cluster="ceph"} 4`),
regexp.MustCompile(`ceph_rgw_gc_pending_tasks{cluster="ceph"} 1`),
regexp.MustCompile(`ceph_rgw_gc_pending_objects{cluster="ceph"} 3`),
},
},
{
input: []byte(`[]`),
reMatch: []*regexp.Regexp{
regexp.MustCompile(`ceph_rgw_gc_active_tasks{cluster="ceph"} 0`),
regexp.MustCompile(`ceph_rgw_gc_active_objects{cluster="ceph"} 0`),
regexp.MustCompile(`ceph_rgw_gc_pending_tasks{cluster="ceph"} 0`),
regexp.MustCompile(`ceph_rgw_gc_pending_objects{cluster="ceph"} 0`),
},
},
{
// force an error return from json deserialization
input: []byte(`[ { "bad-object": 17,,, ]`),
reUnmatch: []*regexp.Regexp{
regexp.MustCompile(`ceph_rgw_gc`),
},
},
{
// force an error return from getRGWGCTaskList
input: nil,
reUnmatch: []*regexp.Regexp{
regexp.MustCompile(`ceph_rgw_gc`),
},
},
} {
func() {
collector := NewRGWCollector("ceph", "")
collector.getRGWGCTaskList = func(config string) ([]byte, error) {
if tt.input != nil {
return tt.input, nil
}
return nil, errors.New("fake error")
}
if err := prometheus.Register(collector); err != nil {
t.Fatalf("collector failed to register: %s", err)
}
defer prometheus.Unregister(collector)
server := httptest.NewServer(prometheus.Handler())
defer server.Close()
resp, err := http.Get(server.URL)
if err != nil {
t.Fatalf("unexpected failed response from prometheus: %s", err)
}
defer resp.Body.Close()
buf, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatalf("failed reading server response: %s", err)
}
for _, re := range tt.reMatch {
if !re.Match(buf) {
t.Errorf("failed matching: %q", re)
}
}
for _, re := range tt.reUnmatch {
if re.Match(buf) {
t.Errorf("should not have matched %q", re)
}
}
}()
}
}


@@ -31,6 +31,11 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
)
const (
defaultCephClusterLabel = "ceph"
defaultCephConfigPath = "/etc/ceph/ceph.conf"
)
// This horrible thing is a copy of tcpKeepAliveListener, tweaked to
// specifically check if it hits EMFILE when doing an accept, and if so,
// terminate the process.
@@ -74,8 +79,8 @@ var _ prometheus.Collector = &CephExporter{}
// NewCephExporter creates an instance of CephExporter and returns a reference
// to it. We can choose to enable a collector to extract stats out of by adding
// it to the list of collectors.
func NewCephExporter(conn *rados.Conn, cluster string) *CephExporter {
return &CephExporter{
func NewCephExporter(conn *rados.Conn, cluster string, config string, withRGW bool) *CephExporter {
c := &CephExporter{
collectors: []prometheus.Collector{
collectors.NewClusterUsageCollector(conn, cluster),
collectors.NewPoolUsageCollector(conn, cluster),
@@ -84,6 +89,14 @@ func NewCephExporter(conn *rados.Conn, cluster string) *CephExporter {
collectors.NewOSDCollector(conn, cluster),
},
}
if withRGW {
c.collectors = append(c.collectors,
collectors.NewRGWCollector(cluster, config),
)
}
return c
}
// Describe sends all the descriptors of the collectors included to
@@ -113,6 +126,8 @@ func main() {
cephConfig = flag.String("ceph.config", "", "path to ceph config file")
cephUser = flag.String("ceph.user", "admin", "Ceph user to connect to cluster.")
withRGW = flag.Bool("with-rgw", false, "Enable collection of stats from RGW")
exporterConfig = flag.String("exporter.config", "/etc/ceph/exporter.yml", "Path to ceph exporter config.")
)
flag.Parse()
@@ -143,7 +158,7 @@
defer conn.Shutdown()
log.Printf("Starting ceph exporter for cluster: %s", cluster.ClusterLabel)
err = prometheus.Register(NewCephExporter(conn, cluster.ClusterLabel))
err = prometheus.Register(NewCephExporter(conn, cluster.ClusterLabel, cluster.ConfigFile, *withRGW))
if err != nil {
log.Fatalf("cannot export cluster: %s error: %v", cluster.ClusterLabel, err)
}
@@ -168,7 +183,7 @@
}
defer conn.Shutdown()
prometheus.MustRegister(NewCephExporter(conn, "ceph"))
prometheus.MustRegister(NewCephExporter(conn, defaultCephClusterLabel, defaultCephConfigPath, *withRGW))
}
http.Handle(*metricsPath, promhttp.Handler())
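
For completeness, a minimal sketch of wiring the new collector up by hand rather than through the -with-rgw flag shown above; the import path and listen address are assumptions for illustration, not taken from this diff, while the cluster label and config path reuse the defaults the commit introduces:

package main

import (
	"log"
	"net/http"

	"github.com/digitalocean/ceph_exporter/collectors" // assumed import path
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// Register only the RGW GC collector, using the defaults this commit
	// introduces for the cluster label and the ceph config path.
	prometheus.MustRegister(collectors.NewRGWCollector("ceph", "/etc/ceph/ceph.conf"))

	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":9128", nil)) // assumed listen address
}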