From a4ac0e674769805a9b282edb9a7791e64cd977e3 Mon Sep 17 00:00:00 2001 From: Marc W <113890636+MarcWort@users.noreply.github.com> Date: Sat, 11 May 2024 14:59:55 +0200 Subject: [PATCH] feat: Add safe_wal_size and wal_status to replication_slot (#1027) * feat: Add safe_wal_size to replication_slot Signed-off-by: MarcWort <113890636+MarcWort@users.noreply.github.com> * feat: Add wal_status to replication_slot Signed-off-by: MarcWort <113890636+MarcWort@users.noreply.github.com> --------- Signed-off-by: MarcWort <113890636+MarcWort@users.noreply.github.com> --- collector/pg_replication_slot.go | 40 +++++++++++++++++++++++++-- collector/pg_replication_slot_test.go | 21 ++++++++------ 2 files changed, 51 insertions(+), 10 deletions(-) diff --git a/collector/pg_replication_slot.go b/collector/pg_replication_slot.go index 7f1ba003..1d29f849 100644 --- a/collector/pg_replication_slot.go +++ b/collector/pg_replication_slot.go @@ -63,6 +63,24 @@ var ( "whether the replication slot is active or not", []string{"slot_name", "slot_type"}, nil, ) + pgReplicationSlotSafeWal = prometheus.NewDesc( + prometheus.BuildFQName( + namespace, + replicationSlotSubsystem, + "safe_wal_size_bytes", + ), + "number of bytes that can be written to WAL such that this slot is not in danger of getting in state lost", + []string{"slot_name", "slot_type"}, nil, + ) + pgReplicationSlotWalStatus = prometheus.NewDesc( + prometheus.BuildFQName( + namespace, + replicationSlotSubsystem, + "wal_status", + ), + "availability of WAL files claimed by this slot", + []string{"slot_name", "slot_type", "wal_status"}, nil, + ) pgReplicationSlotQuery = `SELECT slot_name, @@ -73,7 +91,9 @@ var ( pg_current_wal_lsn() - '0/0' END AS current_wal_lsn, COALESCE(confirmed_flush_lsn, '0/0') - '0/0' AS confirmed_flush_lsn, - active + active, + safe_wal_size, + wal_status FROM pg_replication_slots;` ) @@ -92,7 +112,9 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance var walLSN sql.NullFloat64 var flushLSN sql.NullFloat64 var isActive sql.NullBool - if err := rows.Scan(&slotName, &slotType, &walLSN, &flushLSN, &isActive); err != nil { + var safeWalSize sql.NullInt64 + var walStatus sql.NullString + if err := rows.Scan(&slotName, &slotType, &walLSN, &flushLSN, &isActive, &safeWalSize, &walStatus); err != nil { return err } @@ -131,6 +153,20 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance pgReplicationSlotIsActiveDesc, prometheus.GaugeValue, isActiveValue, slotNameLabel, slotTypeLabel, ) + + if safeWalSize.Valid { + ch <- prometheus.MustNewConstMetric( + pgReplicationSlotSafeWal, + prometheus.GaugeValue, float64(safeWalSize.Int64), slotNameLabel, slotTypeLabel, + ) + } + + if walStatus.Valid { + ch <- prometheus.MustNewConstMetric( + pgReplicationSlotWalStatus, + prometheus.GaugeValue, 1, slotNameLabel, slotTypeLabel, walStatus.String, + ) + } } return rows.Err() } diff --git a/collector/pg_replication_slot_test.go b/collector/pg_replication_slot_test.go index 212050c4..174743ac 100644 --- a/collector/pg_replication_slot_test.go +++ b/collector/pg_replication_slot_test.go @@ -31,9 +31,9 @@ func TestPgReplicationSlotCollectorActive(t *testing.T) { inst := &instance{db: db} - columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active"} + columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"} rows := sqlmock.NewRows(columns). - AddRow("test_slot", "physical", 5, 3, true) + AddRow("test_slot", "physical", 5, 3, true, 323906992, "reserved") mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows) ch := make(chan prometheus.Metric) @@ -50,6 +50,8 @@ func TestPgReplicationSlotCollectorActive(t *testing.T) { {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 5, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 3, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 1, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 323906992, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical", "wal_status": "reserved"}, value: 1, metricType: dto.MetricType_GAUGE}, } convey.Convey("Metrics comparison", t, func() { @@ -72,9 +74,9 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) { inst := &instance{db: db} - columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active"} + columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"} rows := sqlmock.NewRows(columns). - AddRow("test_slot", "physical", 6, 12, false) + AddRow("test_slot", "physical", 6, 12, false, -4000, "extended") mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows) ch := make(chan prometheus.Metric) @@ -90,6 +92,8 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) { expected := []MetricResult{ {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 6, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 0, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: -4000, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical", "wal_status": "extended"}, value: 1, metricType: dto.MetricType_GAUGE}, } convey.Convey("Metrics comparison", t, func() { @@ -113,9 +117,9 @@ func TestPgReplicationSlotCollectorActiveNil(t *testing.T) { inst := &instance{db: db} - columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active"} + columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"} rows := sqlmock.NewRows(columns). - AddRow("test_slot", "physical", 6, 12, nil) + AddRow("test_slot", "physical", 6, 12, nil, nil, "lost") mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows) ch := make(chan prometheus.Metric) @@ -131,6 +135,7 @@ func TestPgReplicationSlotCollectorActiveNil(t *testing.T) { expected := []MetricResult{ {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 6, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 0, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical", "wal_status": "lost"}, value: 1, metricType: dto.MetricType_GAUGE}, } convey.Convey("Metrics comparison", t, func() { @@ -153,9 +158,9 @@ func TestPgReplicationSlotCollectorTestNilValues(t *testing.T) { inst := &instance{db: db} - columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active"} + columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"} rows := sqlmock.NewRows(columns). - AddRow(nil, nil, nil, nil, true) + AddRow(nil, nil, nil, nil, true, nil, nil) mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows) ch := make(chan prometheus.Metric)