diff --git a/collector/pg_replication_slots.go b/collector/replication_slots.go similarity index 74% rename from collector/pg_replication_slots.go rename to collector/replication_slots.go index c21d7f41..e1c2f0a8 100644 --- a/collector/pg_replication_slots.go +++ b/collector/replication_slots.go @@ -44,16 +44,22 @@ var pgReplicationSlot = map[string]*prometheus.Desc{ "last lsn confirmed flushed to the replication slot", []string{"slot_name"}, nil, ), + "is_active": prometheus.NewDesc( + "pg_replication_slot_is_active", + "last lsn confirmed flushed to the replication slot", + []string{"slot_name"}, nil, + ), } func (PGReplicationSlotCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { rows, err := db.QueryContext(ctx, `SELECT slot_name, - pg_current_wal_lsn() AS current_wal_lsn, - confirmed_flush_lsn - FROM - pg_replication_slots;`) + pg_current_wal_lsn() - '0/0' AS current_wal_lsn, + coalesce(confirmed_flush_lsn, '0/0') - '0/0', + active + FROM + pg_replication_slots;`) if err != nil { return err } @@ -63,7 +69,8 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, db *sql.DB, ch cha var slot_name string var wal_lsn int64 var flush_lsn int64 - if err := rows.Scan(&slot_name, &wal_lsn, &flush_lsn); err != nil { + var is_active int + if err := rows.Scan(&slot_name, &wal_lsn, &flush_lsn, &is_active); err != nil { return err } @@ -71,9 +78,15 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, db *sql.DB, ch cha pgReplicationSlot["current_wal_lsn"], prometheus.GaugeValue, float64(wal_lsn), slot_name, ) + if (is_active == 1) { + ch <- prometheus.MustNewConstMetric( + pgReplicationSlot["confirmed_flush_lsn"], + prometheus.GaugeValue, float64(flush_lsn), slot_name, + ) + } ch <- prometheus.MustNewConstMetric( - pgReplicationSlot["confirmed_flush_lsn"], - prometheus.GaugeValue, float64(flush_lsn), slot_name, + pgReplicationSlot["is_active"], + prometheus.GaugeValue, int(flush_lsn), slot_name, ) } if err := rows.Err(); err != nil {