PMM-12994 Add plugin information to replication slot collector

This commit is contained in:
Artem Gavrilov 2024-03-15 16:59:57 +02:00
parent 9586668338
commit fa5f3f84e7
2 changed files with 29 additions and 23 deletions

View File

@ -43,7 +43,7 @@ var (
"slot_current_wal_lsn", "slot_current_wal_lsn",
), ),
"current wal lsn value", "current wal lsn value",
[]string{"slot_name", "slot_type"}, nil, []string{"slot_name", "plugin", "slot_type"}, nil,
) )
pgReplicationSlotCurrentFlushDesc = prometheus.NewDesc( pgReplicationSlotCurrentFlushDesc = prometheus.NewDesc(
prometheus.BuildFQName( prometheus.BuildFQName(
@ -52,7 +52,7 @@ var (
"slot_confirmed_flush_lsn", "slot_confirmed_flush_lsn",
), ),
"last lsn confirmed flushed to the replication slot", "last lsn confirmed flushed to the replication slot",
[]string{"slot_name", "slot_type"}, nil, []string{"slot_name", "plugin", "slot_type"}, nil,
) )
pgReplicationSlotIsActiveDesc = prometheus.NewDesc( pgReplicationSlotIsActiveDesc = prometheus.NewDesc(
prometheus.BuildFQName( prometheus.BuildFQName(
@ -61,11 +61,12 @@ var (
"slot_is_active", "slot_is_active",
), ),
"whether the replication slot is active or not", "whether the replication slot is active or not",
[]string{"slot_name", "slot_type"}, nil, []string{"slot_name", "plugin", "slot_type"}, nil,
) )
pgReplicationSlotQuery = `SELECT pgReplicationSlotQuery = `SELECT
slot_name, slot_name,
plugin,
slot_type, slot_type,
CASE WHEN pg_is_in_recovery() THEN CASE WHEN pg_is_in_recovery() THEN
pg_last_wal_receive_lsn() - '0/0' pg_last_wal_receive_lsn() - '0/0'
@ -88,11 +89,12 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance
for rows.Next() { for rows.Next() {
var slotName sql.NullString var slotName sql.NullString
var plugin sql.NullString
var slotType sql.NullString var slotType sql.NullString
var walLSN sql.NullFloat64 var walLSN sql.NullFloat64
var flushLSN sql.NullFloat64 var flushLSN sql.NullFloat64
var isActive sql.NullBool var isActive sql.NullBool
if err := rows.Scan(&slotName, &slotType, &walLSN, &flushLSN, &isActive); err != nil { if err := rows.Scan(&slotName, &plugin, &slotType, &walLSN, &flushLSN, &isActive); err != nil {
return err return err
} }
@ -104,6 +106,10 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance
if slotName.Valid { if slotName.Valid {
slotNameLabel = slotName.String slotNameLabel = slotName.String
} }
pluginLabel := "unknown"
if plugin.Valid {
pluginLabel = plugin.String
}
slotTypeLabel := "unknown" slotTypeLabel := "unknown"
if slotType.Valid { if slotType.Valid {
slotTypeLabel = slotType.String slotTypeLabel = slotType.String
@ -115,7 +121,7 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance
} }
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
pgReplicationSlotCurrentWalDesc, pgReplicationSlotCurrentWalDesc,
prometheus.GaugeValue, walLSNMetric, slotNameLabel, slotTypeLabel, prometheus.GaugeValue, walLSNMetric, slotNameLabel, pluginLabel, slotTypeLabel,
) )
if isActive.Valid && isActive.Bool { if isActive.Valid && isActive.Bool {
var flushLSNMetric float64 var flushLSNMetric float64
@ -124,12 +130,12 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance
} }
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
pgReplicationSlotCurrentFlushDesc, pgReplicationSlotCurrentFlushDesc,
prometheus.GaugeValue, flushLSNMetric, slotNameLabel, slotTypeLabel, prometheus.GaugeValue, flushLSNMetric, slotNameLabel, pluginLabel, slotTypeLabel,
) )
} }
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
pgReplicationSlotIsActiveDesc, pgReplicationSlotIsActiveDesc,
prometheus.GaugeValue, isActiveValue, slotNameLabel, slotTypeLabel, prometheus.GaugeValue, isActiveValue, slotNameLabel, pluginLabel, slotTypeLabel,
) )
} }
return rows.Err() return rows.Err()

View File

@ -31,9 +31,9 @@ func TestPgReplicationSlotCollectorActive(t *testing.T) {
inst := &instance{db: db} inst := &instance{db: db}
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active"} columns := []string{"slot_name", "plugin", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active"}
rows := sqlmock.NewRows(columns). rows := sqlmock.NewRows(columns).
AddRow("test_slot", "physical", 5, 3, true) AddRow("test_slot", "test_decoding", "physical", 5, 3, true)
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows) mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)
ch := make(chan prometheus.Metric) ch := make(chan prometheus.Metric)
@ -47,9 +47,9 @@ func TestPgReplicationSlotCollectorActive(t *testing.T) {
}() }()
expected := []MetricResult{ expected := []MetricResult{
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 5, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"slot_name": "test_slot", "plugin": "test_decoding", "slot_type": "physical"}, value: 5, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 3, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"slot_name": "test_slot", "plugin": "test_decoding", "slot_type": "physical"}, value: 3, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 1, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"slot_name": "test_slot", "plugin": "test_decoding", "slot_type": "physical"}, value: 1, metricType: dto.MetricType_GAUGE},
} }
convey.Convey("Metrics comparison", t, func() { convey.Convey("Metrics comparison", t, func() {
@ -72,9 +72,9 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) {
inst := &instance{db: db} inst := &instance{db: db}
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active"} columns := []string{"slot_name", "plugin", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active"}
rows := sqlmock.NewRows(columns). rows := sqlmock.NewRows(columns).
AddRow("test_slot", "physical", 6, 12, false) AddRow("test_slot", "test_decoding", "physical", 6, 12, false)
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows) mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)
ch := make(chan prometheus.Metric) ch := make(chan prometheus.Metric)
@ -113,9 +113,9 @@ func TestPgReplicationSlotCollectorActiveNil(t *testing.T) {
inst := &instance{db: db} inst := &instance{db: db}
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active"} columns := []string{"slot_name", "plugin", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active"}
rows := sqlmock.NewRows(columns). rows := sqlmock.NewRows(columns).
AddRow("test_slot", "physical", 6, 12, nil) AddRow("test_slot", "test_decoding", "physical", 6, 12, nil)
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows) mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)
ch := make(chan prometheus.Metric) ch := make(chan prometheus.Metric)
@ -129,8 +129,8 @@ func TestPgReplicationSlotCollectorActiveNil(t *testing.T) {
}() }()
expected := []MetricResult{ expected := []MetricResult{
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 6, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"slot_name": "test_slot", "plugin": "test_decoding", "slot_type": "physical"}, value: 6, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 0, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"slot_name": "test_slot", "plugin": "test_decoding", "slot_type": "physical"}, value: 0, metricType: dto.MetricType_GAUGE},
} }
convey.Convey("Metrics comparison", t, func() { convey.Convey("Metrics comparison", t, func() {
@ -153,9 +153,9 @@ func TestPgReplicationSlotCollectorTestNilValues(t *testing.T) {
inst := &instance{db: db} inst := &instance{db: db}
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active"} columns := []string{"slot_name", "plugin", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active"}
rows := sqlmock.NewRows(columns). rows := sqlmock.NewRows(columns).
AddRow(nil, nil, nil, nil, true) AddRow(nil, nil, nil, nil, nil, true)
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows) mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)
ch := make(chan prometheus.Metric) ch := make(chan prometheus.Metric)
@ -169,9 +169,9 @@ func TestPgReplicationSlotCollectorTestNilValues(t *testing.T) {
}() }()
expected := []MetricResult{ expected := []MetricResult{
{labels: labelMap{"slot_name": "unknown", "slot_type": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"slot_name": "unknown", "plugin": "unknown", "slot_type": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"slot_name": "unknown", "slot_type": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"slot_name": "unknown", "plugin": "unknown", "slot_type": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"slot_name": "unknown", "slot_type": "unknown"}, value: 1, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"slot_name": "unknown", "plugin": "unknown", "slot_type": "unknown"}, value: 1, metricType: dto.MetricType_GAUGE},
} }
convey.Convey("Metrics comparison", t, func() { convey.Convey("Metrics comparison", t, func() {