Adds pg_stat_progress_vacuum collector

Co-authored-by: Ben Kochie <superq@gmail.com>
Signed-off-by: Ian Bibby <ian.bibby@reddit.com>
This commit is contained in:
Ian Bibby 2025-03-25 20:26:32 -07:00
parent 2ce65c324c
commit a50603fa27
3 changed files with 360 additions and 0 deletions

View File

@ -144,6 +144,9 @@ This will build the docker image as `prometheuscommunity/postgres_exporter:${bra
* `[no-]collector.stat_database`
Enable the `stat_database` collector (default: enabled).
* `[no-]collector.stat_progress_vacuum`
Enable the `stat_progress_vacuum` collector (default: enabled).
* `[no-]collector.stat_statements`
Enable the `stat_statements` collector (default: disabled).

View File

@ -0,0 +1,222 @@
// Copyright 2025 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector
import (
"context"
"database/sql"
"log/slog"
"github.com/prometheus/client_golang/prometheus"
)
// progressVacuumSubsystem is the name this collector is registered under and
// the subsystem component of every metric name built in this file.
const progressVacuumSubsystem = "stat_progress_vacuum"
// init adds the vacuum progress collector to the package's collector registry
// under progressVacuumSubsystem, using the defaultEnabled state.
func init() {
	registerCollector(
		progressVacuumSubsystem,
		defaultEnabled,
		NewPGStatProgressVacuumCollector,
	)
}
// PGStatProgressVacuumCollector reports the progress of running VACUUM
// operations. It is handed out as a Collector by
// NewPGStatProgressVacuumCollector.
type PGStatProgressVacuumCollector struct {
// log is the logger taken from the collector configuration at construction.
log *slog.Logger
}
// NewPGStatProgressVacuumCollector builds a vacuum progress collector that
// uses the logger carried by config. The returned error is always nil.
func NewPGStatProgressVacuumCollector(config collectorConfig) (Collector, error) {
	collector := &PGStatProgressVacuumCollector{
		log: config.logger,
	}
	return collector, nil
}
// vacuumPhases lists the human-readable vacuum phase names used as the value
// of the "phase" label. Update compares each entry's index with the numeric
// phase returned by the query, so the order of this slice is significant:
// entry i is the name reported for numeric phase i.
var vacuumPhases = []string{
"initializing",
"scanning heap",
"vacuuming indexes",
"vacuuming heap",
"cleaning up indexes",
"truncating heap",
"performing final cleanup",
}
// Metric descriptors and the SQL query used by PGStatProgressVacuumCollector.
// Every descriptor is labelled with datname and relname; the phase descriptor
// additionally carries a phase label.
var (
// statProgressVacuumPhase describes the per-phase gauge: one series per
// entry of vacuumPhases, set to 1 for the current phase and 0 otherwise.
statProgressVacuumPhase = prometheus.NewDesc(
prometheus.BuildFQName(namespace, progressVacuumSubsystem, "phase"),
"Current vacuum phase (1 = active, 0 = inactive). Label 'phase' is human-readable.",
[]string{"datname", "relname", "phase"},
nil,
)
// statProgressVacuumHeapBlksTotal describes the gauge fed from the query's
// heap_blks_total column. Note that the metric name suffix is "heap_blks".
statProgressVacuumHeapBlksTotal = prometheus.NewDesc(
prometheus.BuildFQName(namespace, progressVacuumSubsystem, "heap_blks"),
"Total number of heap blocks in the table being vacuumed.",
[]string{"datname", "relname"},
nil,
)
// statProgressVacuumHeapBlksScanned describes the gauge fed from the
// query's heap_blks_scanned column.
statProgressVacuumHeapBlksScanned = prometheus.NewDesc(
prometheus.BuildFQName(namespace, progressVacuumSubsystem, "heap_blks_scanned"),
"Number of heap blocks scanned so far.",
[]string{"datname", "relname"},
nil,
)
// statProgressVacuumHeapBlksVacuumed describes the gauge fed from the
// query's heap_blks_vacuumed column.
statProgressVacuumHeapBlksVacuumed = prometheus.NewDesc(
prometheus.BuildFQName(namespace, progressVacuumSubsystem, "heap_blks_vacuumed"),
"Number of heap blocks vacuumed so far.",
[]string{"datname", "relname"},
nil,
)
// statProgressVacuumIndexVacuumCount describes the gauge fed from the
// query's index_vacuum_count column.
statProgressVacuumIndexVacuumCount = prometheus.NewDesc(
prometheus.BuildFQName(namespace, progressVacuumSubsystem, "index_vacuums"),
"Number of completed index vacuum cycles.",
[]string{"datname", "relname"},
nil,
)
// statProgressVacuumMaxDeadTuples describes the gauge fed from the query's
// max_dead_tuples column (param6).
statProgressVacuumMaxDeadTuples = prometheus.NewDesc(
prometheus.BuildFQName(namespace, progressVacuumSubsystem, "max_dead_tuples"),
"Maximum number of dead tuples that can be stored before cleanup is performed.",
[]string{"datname", "relname"},
nil,
)
// statProgressVacuumNumDeadTuples describes the gauge fed from the query's
// num_dead_tuples column (param7).
statProgressVacuumNumDeadTuples = prometheus.NewDesc(
prometheus.BuildFQName(namespace, progressVacuumSubsystem, "num_dead_tuples"),
"Current number of dead tuples found so far.",
[]string{"datname", "relname"},
nil,
)
// This is the view definition of pg_stat_progress_vacuum, albeit without the conversion
// of "phase" to a human-readable string. We will prefer the numeric representation,
// which Update maps back to a name through vacuumPhases.
//
// The LEFT JOIN keeps progress rows whose datid has no pg_database match; those
// rows have a NULL datname.
//
// NOTE(review): relid::regclass is presumably resolved against the database this
// connection uses, while the progress function may report vacuums from other
// databases as well — confirm relname is meaningful for those rows.
// NOTE(review): param6/param7 are exposed as max_dead_tuples/num_dead_tuples;
// confirm these slots keep that meaning on every supported PostgreSQL version.
statProgressVacuumQuery = `SELECT
d.datname,
s.relid::regclass::text AS relname,
s.param1 AS phase,
s.param2 AS heap_blks_total,
s.param3 AS heap_blks_scanned,
s.param4 AS heap_blks_vacuumed,
s.param5 AS index_vacuum_count,
s.param6 AS max_dead_tuples,
s.param7 AS num_dead_tuples
FROM
pg_stat_get_progress_info('VACUUM'::text)
s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
LEFT JOIN
pg_database d ON s.datid = d.oid`
)
// Update runs statProgressVacuumQuery and sends one set of gauges to ch for
// every row returned.
//
// For each row it sends, in this order:
//   - one phase gauge per entry of vacuumPhases, set to 1 for the entry whose
//     index equals the row's numeric phase and 0 for all others;
//   - the heap blocks total, scanned and vacuumed gauges;
//   - the index vacuum count gauge;
//   - the max and current dead tuple gauges.
//
// A NULL datname or relname is reported as the label value "unknown". A NULL
// numeric column is reported as 0, and a NULL phase leaves every phase gauge
// at 0.
//
// It returns any error from running the query, scanning a row, or iterating
// the result set.
func (c *PGStatProgressVacuumCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
	db := instance.getDB()
	rows, err := db.QueryContext(ctx, statProgressVacuumQuery)
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var (
			datname, relname sql.NullString

			phase            sql.NullInt64
			heapBlksTotal    sql.NullInt64
			heapBlksScanned  sql.NullInt64
			heapBlksVacuumed sql.NullInt64
			indexVacuumCount sql.NullInt64
			maxDeadTuples    sql.NullInt64
			numDeadTuples    sql.NullInt64
		)
		if err := rows.Scan(
			&datname,
			&relname,
			&phase,
			&heapBlksTotal,
			&heapBlksScanned,
			&heapBlksVacuumed,
			&indexVacuumCount,
			&maxDeadTuples,
			&numDeadTuples,
		); err != nil {
			return err
		}

		datnameLabel := "unknown"
		if datname.Valid {
			datnameLabel = datname.String
		}
		relnameLabel := "unknown"
		if relname.Valid {
			relnameLabel = relname.String
		}

		// Emit every known phase so each series always exists; only the
		// current phase is 1. The label values are passed individually rather
		// than appended to a shared slice, so no iteration can alias another's
		// backing array.
		for i, phaseName := range vacuumPhases {
			v := 0.0
			if phase.Valid && phase.Int64 == int64(i) {
				v = 1.0
			}
			ch <- prometheus.MustNewConstMetric(statProgressVacuumPhase, prometheus.GaugeValue, v, datnameLabel, relnameLabel, phaseName)
		}

		// The remaining gauges share the same shape: a nullable counter that
		// is reported as 0 when NULL. The slice order is the emission order.
		gauges := []struct {
			desc  *prometheus.Desc
			value sql.NullInt64
		}{
			{statProgressVacuumHeapBlksTotal, heapBlksTotal},
			{statProgressVacuumHeapBlksScanned, heapBlksScanned},
			{statProgressVacuumHeapBlksVacuumed, heapBlksVacuumed},
			{statProgressVacuumIndexVacuumCount, indexVacuumCount},
			{statProgressVacuumMaxDeadTuples, maxDeadTuples},
			{statProgressVacuumNumDeadTuples, numDeadTuples},
		}
		for _, g := range gauges {
			v := 0.0
			if g.value.Valid {
				v = float64(g.value.Int64)
			}
			ch <- prometheus.MustNewConstMetric(g.desc, prometheus.GaugeValue, v, datnameLabel, relnameLabel)
		}
	}
	return rows.Err()
}

View File

@ -0,0 +1,135 @@
// Copyright 2025 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector
import (
"context"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/smartystreets/goconvey/convey"
)
// TestPGStatProgressVacuumCollector feeds the collector a single fully
// populated progress row (phase 3) through sqlmock and checks the exact
// sequence of metrics it emits: seven phase gauges with only "vacuuming heap"
// set to 1, followed by the six numeric gauges.
func TestPGStatProgressVacuumCollector(t *testing.T) {
	db, mock, err := sqlmock.New()
	if err != nil {
		t.Fatalf("Error opening a stub db connection: %s", err)
	}
	defer db.Close()

	inst := &instance{db: db}

	columns := []string{
		"datname", "relname", "phase", "heap_blks_total", "heap_blks_scanned",
		"heap_blks_vacuumed", "index_vacuum_count", "max_dead_tuples", "num_dead_tuples",
	}
	rows := sqlmock.NewRows(columns).AddRow(
		"postgres", "a_table", 3, 3000, 400, 200, 2, 500, 123)
	mock.ExpectQuery(sanitizeQuery(statProgressVacuumQuery)).WillReturnRows(rows)

	// The channel is unbuffered, so the collector runs in its own goroutine
	// and closes the channel once Update returns.
	ch := make(chan prometheus.Metric)
	go func() {
		defer close(ch)
		c := PGStatProgressVacuumCollector{}
		if err := c.Update(context.Background(), inst, ch); err != nil {
			t.Errorf("Error calling PGStatProgressVacuumCollector.Update; %+v", err)
		}
	}()

	expected := []MetricResult{
		{labels: labelMap{"datname": "postgres", "relname": "a_table", "phase": "initializing"}, metricType: dto.MetricType_GAUGE, value: 0},
		{labels: labelMap{"datname": "postgres", "relname": "a_table", "phase": "scanning heap"}, metricType: dto.MetricType_GAUGE, value: 0},
		{labels: labelMap{"datname": "postgres", "relname": "a_table", "phase": "vacuuming indexes"}, metricType: dto.MetricType_GAUGE, value: 0},
		{labels: labelMap{"datname": "postgres", "relname": "a_table", "phase": "vacuuming heap"}, metricType: dto.MetricType_GAUGE, value: 1},
		{labels: labelMap{"datname": "postgres", "relname": "a_table", "phase": "cleaning up indexes"}, metricType: dto.MetricType_GAUGE, value: 0},
		{labels: labelMap{"datname": "postgres", "relname": "a_table", "phase": "truncating heap"}, metricType: dto.MetricType_GAUGE, value: 0},
		{labels: labelMap{"datname": "postgres", "relname": "a_table", "phase": "performing final cleanup"}, metricType: dto.MetricType_GAUGE, value: 0},
		{labels: labelMap{"datname": "postgres", "relname": "a_table"}, metricType: dto.MetricType_GAUGE, value: 3000},
		{labels: labelMap{"datname": "postgres", "relname": "a_table"}, metricType: dto.MetricType_GAUGE, value: 400},
		{labels: labelMap{"datname": "postgres", "relname": "a_table"}, metricType: dto.MetricType_GAUGE, value: 200},
		{labels: labelMap{"datname": "postgres", "relname": "a_table"}, metricType: dto.MetricType_GAUGE, value: 2},
		{labels: labelMap{"datname": "postgres", "relname": "a_table"}, metricType: dto.MetricType_GAUGE, value: 500},
		{labels: labelMap{"datname": "postgres", "relname": "a_table"}, metricType: dto.MetricType_GAUGE, value: 123},
	}

	convey.Convey("Metrics comparison", t, func() {
		for _, expect := range expected {
			// A closed channel yields a nil metric; report that as a failure
			// here instead of passing nil on to readMetric.
			metric, ok := <-ch
			convey.So(ok, convey.ShouldBeTrue)
			if !ok {
				return
			}
			convey.So(readMetric(metric), convey.ShouldResemble, expect)
		}
	})

	// Drain whatever is left so the collector goroutine can always finish,
	// and flag any metric beyond the expected set.
	extra := 0
	for range ch {
		extra++
	}
	if extra != 0 {
		t.Errorf("Collector emitted %d unexpected extra metrics", extra)
	}

	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("There were unfulfilled expectations: %+v", err)
	}
}
// TestPGStatProgressVacuumCollectorNullValues feeds the collector a progress
// row in which everything except datname is NULL and checks that relname
// falls back to "unknown", every phase gauge is 0, and every numeric gauge
// is 0.
func TestPGStatProgressVacuumCollectorNullValues(t *testing.T) {
	db, mock, err := sqlmock.New()
	if err != nil {
		t.Fatalf("Error opening a stub db connection: %s", err)
	}
	defer db.Close()

	inst := &instance{db: db}

	columns := []string{
		"datname", "relname", "phase", "heap_blks_total", "heap_blks_scanned",
		"heap_blks_vacuumed", "index_vacuum_count", "max_dead_tuples", "num_dead_tuples",
	}
	rows := sqlmock.NewRows(columns).AddRow(
		"postgres", nil, nil, nil, nil, nil, nil, nil, nil)
	mock.ExpectQuery(sanitizeQuery(statProgressVacuumQuery)).WillReturnRows(rows)

	// The channel is unbuffered, so the collector runs in its own goroutine
	// and closes the channel once Update returns.
	ch := make(chan prometheus.Metric)
	go func() {
		defer close(ch)
		c := PGStatProgressVacuumCollector{}
		if err := c.Update(context.Background(), inst, ch); err != nil {
			t.Errorf("Error calling PGStatProgressVacuumCollector.Update; %+v", err)
		}
	}()

	expected := []MetricResult{
		{labels: labelMap{"datname": "postgres", "relname": "unknown", "phase": "initializing"}, metricType: dto.MetricType_GAUGE, value: 0},
		{labels: labelMap{"datname": "postgres", "relname": "unknown", "phase": "scanning heap"}, metricType: dto.MetricType_GAUGE, value: 0},
		{labels: labelMap{"datname": "postgres", "relname": "unknown", "phase": "vacuuming indexes"}, metricType: dto.MetricType_GAUGE, value: 0},
		{labels: labelMap{"datname": "postgres", "relname": "unknown", "phase": "vacuuming heap"}, metricType: dto.MetricType_GAUGE, value: 0},
		{labels: labelMap{"datname": "postgres", "relname": "unknown", "phase": "cleaning up indexes"}, metricType: dto.MetricType_GAUGE, value: 0},
		{labels: labelMap{"datname": "postgres", "relname": "unknown", "phase": "truncating heap"}, metricType: dto.MetricType_GAUGE, value: 0},
		{labels: labelMap{"datname": "postgres", "relname": "unknown", "phase": "performing final cleanup"}, metricType: dto.MetricType_GAUGE, value: 0},
		{labels: labelMap{"datname": "postgres", "relname": "unknown"}, metricType: dto.MetricType_GAUGE, value: 0},
		{labels: labelMap{"datname": "postgres", "relname": "unknown"}, metricType: dto.MetricType_GAUGE, value: 0},
		{labels: labelMap{"datname": "postgres", "relname": "unknown"}, metricType: dto.MetricType_GAUGE, value: 0},
		{labels: labelMap{"datname": "postgres", "relname": "unknown"}, metricType: dto.MetricType_GAUGE, value: 0},
		{labels: labelMap{"datname": "postgres", "relname": "unknown"}, metricType: dto.MetricType_GAUGE, value: 0},
		{labels: labelMap{"datname": "postgres", "relname": "unknown"}, metricType: dto.MetricType_GAUGE, value: 0},
	}

	convey.Convey("Metrics comparison", t, func() {
		for _, expect := range expected {
			// A closed channel yields a nil metric; report that as a failure
			// here instead of passing nil on to readMetric.
			metric, ok := <-ch
			convey.So(ok, convey.ShouldBeTrue)
			if !ok {
				return
			}
			// Actual value first, expected value last, as in the sibling test.
			convey.So(readMetric(metric), convey.ShouldResemble, expect)
		}
	})

	// Drain whatever is left so the collector goroutine can always finish,
	// and flag any metric beyond the expected set.
	extra := 0
	for range ch {
		extra++
	}
	if extra != 0 {
		t.Errorf("Collector emitted %d unexpected extra metrics", extra)
	}

	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("There were unfulfilled expectations: %+v", err)
	}
}