From faa1c60ab8d7d387f74628bcf999dd715bbf762c Mon Sep 17 00:00:00 2001 From: Felix Yuan Date: Wed, 28 Jun 2023 10:34:27 -0700 Subject: [PATCH] Add archiver and archiver_test Signed-off-by: Felix Yuan --- collector/pg_archiver.go | 81 +++++++++++++++++++++++++++++ collector/pg_archiver_test.go | 96 +++++++++++++++++++++++++++++++++++ 2 files changed, 177 insertions(+) create mode 100644 collector/pg_archiver.go create mode 100644 collector/pg_archiver_test.go diff --git a/collector/pg_archiver.go b/collector/pg_archiver.go new file mode 100644 index 00000000..9c33a0e4 --- /dev/null +++ b/collector/pg_archiver.go @@ -0,0 +1,81 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" +) + +func init() { + registerCollector("archiver", defaultDisabled, NewPGArchiverCollector) +} + +type PGArchiverCollector struct { + log log.Logger +} + +const archiverSubsystem = "archiver" + +func NewPGArchiverCollector(config collectorConfig) (Collector, error) { + return &PGArchiverCollector{log: config.logger}, nil +} + +var ( + pgArchiverPendingWalCount = prometheus.NewDesc( + prometheus.BuildFQName(namespace, archiverSubsystem, "pending_wals"), + "Number of WAL files waiting to be archived", + []string{}, prometheus.Labels{}, + ) + + pgArchiverQuery = ` + WITH + current_wal_file AS ( + SELECT CASE WHEN NOT pg_is_in_recovery() THEN pg_walfile_name(pg_current_wal_insert_lsn()) ELSE NULL END pg_walfile_name + ), + current_wal AS ( + SELECT + ('x'||substring(pg_walfile_name,9,8))::bit(32)::int log, + ('x'||substring(pg_walfile_name,17,8))::bit(32)::int seg, + pg_walfile_name + FROM current_wal_file + ), + archive_wal AS( + SELECT + ('x'||substring(last_archived_wal,9,8))::bit(32)::int log, + ('x'||substring(last_archived_wal,17,8))::bit(32)::int seg, + last_archived_wal + FROM pg_stat_archiver + ) + SELECT coalesce(((cw.log - aw.log) * 256) + (cw.seg-aw.seg),'NaN'::float) as pending_wal_count FROM current_wal cw, archive_wal aw + ` +) + +func (c *PGArchiverCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + row := db.QueryRowContext(ctx, + pgArchiverQuery) + var pendingWalCount float64 + err := row.Scan(&pendingWalCount) + if err != nil { + return err + } + ch <- prometheus.MustNewConstMetric( + pgArchiverPendingWalCount, + prometheus.GaugeValue, + pendingWalCount, + ) + return nil +} diff --git a/collector/pg_archiver_test.go b/collector/pg_archiver_test.go new file mode 100644 index 00000000..e2bd5969 --- /dev/null +++ b/collector/pg_archiver_test.go @@ -0,0 +1,96 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "math" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" +) + +func TestPgArchiverCollector(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db} + mock.ExpectQuery(sanitizeQuery(pgArchiverQuery)).WillReturnRows(sqlmock.NewRows([]string{"pending_wal_count"}). + AddRow(5)) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGArchiverCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGArchiverCollector.Update: %s", err) + } + }() + + expected := []MetricResult{ + {labels: labelMap{}, value: 5, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} + +func TestPgArchiverNaNCollector(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db} + mock.ExpectQuery(sanitizeQuery(pgArchiverQuery)).WillReturnRows(sqlmock.NewRows([]string{"pending_wal_count"}). + AddRow(math.NaN())) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGArchiverCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGArchiverCollector.Update: %s", err) + } + }() + + expected := []MetricResult{ + {labels: labelMap{}, value: math.NaN(), metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect.labels, convey.ShouldResemble, m.labels) + convey.So(math.IsNaN(m.value), convey.ShouldResemble, math.IsNaN(expect.value)) + convey.So(expect.metricType, convey.ShouldEqual, m.metricType) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +}