From 4aa8cd4996715213df9c4c51223e37c334f929c2 Mon Sep 17 00:00:00 2001 From: Felix Yuan Date: Fri, 14 Jul 2023 13:42:12 -0700 Subject: [PATCH 01/14] Gitlab collector: Database wraparound collector and test (#834) * Database wraparound collector and test --------- Signed-off-by: Felix Yuan Co-authored-by: Joe Adams --- collector/pg_database_wraparound.go | 115 +++++++++++++++++++++++ collector/pg_database_wraparound_test.go | 64 +++++++++++++ 2 files changed, 179 insertions(+) create mode 100644 collector/pg_database_wraparound.go create mode 100644 collector/pg_database_wraparound_test.go diff --git a/collector/pg_database_wraparound.go b/collector/pg_database_wraparound.go new file mode 100644 index 00000000..d4627063 --- /dev/null +++ b/collector/pg_database_wraparound.go @@ -0,0 +1,115 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + "database/sql" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" +) + +const databaseWraparoundSubsystem = "database_wraparound" + +func init() { + registerCollector(databaseWraparoundSubsystem, defaultDisabled, NewPGDatabaseWraparoundCollector) +} + +type PGDatabaseWraparoundCollector struct { + log log.Logger +} + +func NewPGDatabaseWraparoundCollector(config collectorConfig) (Collector, error) { + return &PGDatabaseWraparoundCollector{log: config.logger}, nil +} + +var ( + databaseWraparoundAgeDatfrozenxid = prometheus.NewDesc( + prometheus.BuildFQName(namespace, databaseWraparoundSubsystem, "age_datfrozenxid_seconds"), + "Age of the oldest transaction ID that has not been frozen.", + []string{"datname"}, + prometheus.Labels{}, + ) + databaseWraparoundAgeDatminmxid = prometheus.NewDesc( + prometheus.BuildFQName(namespace, databaseWraparoundSubsystem, "age_datminmxid_seconds"), + "Age of the oldest multi-transaction ID that has been replaced with a transaction ID.", + []string{"datname"}, + prometheus.Labels{}, + ) + + databaseWraparoundQuery = ` + SELECT + datname, + age(d.datfrozenxid) as age_datfrozenxid, + mxid_age(d.datminmxid) as age_datminmxid + FROM + pg_catalog.pg_database d + WHERE + d.datallowconn + ` +) + +func (c *PGDatabaseWraparoundCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + rows, err := db.QueryContext(ctx, + databaseWraparoundQuery) + + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var datname sql.NullString + var ageDatfrozenxid, ageDatminmxid sql.NullFloat64 + + if err := rows.Scan(&datname, &ageDatfrozenxid, &ageDatminmxid); err != nil { + return err + } + + if !datname.Valid { + level.Debug(c.log).Log("msg", "Skipping database with NULL name") + continue + } + if !ageDatfrozenxid.Valid { + level.Debug(c.log).Log("msg", "Skipping stat emission with NULL age_datfrozenxid") + continue + } + if !ageDatminmxid.Valid { + level.Debug(c.log).Log("msg", "Skipping stat emission with 
NULL age_datminmxid") + continue + } + + ageDatfrozenxidMetric := ageDatfrozenxid.Float64 + + ch <- prometheus.MustNewConstMetric( + databaseWraparoundAgeDatfrozenxid, + prometheus.GaugeValue, + ageDatfrozenxidMetric, datname.String, + ) + + ageDatminmxidMetric := ageDatminmxid.Float64 + ch <- prometheus.MustNewConstMetric( + databaseWraparoundAgeDatminmxid, + prometheus.GaugeValue, + ageDatminmxidMetric, datname.String, + ) + } + if err := rows.Err(); err != nil { + return err + } + return nil +} diff --git a/collector/pg_database_wraparound_test.go b/collector/pg_database_wraparound_test.go new file mode 100644 index 00000000..d0a74c36 --- /dev/null +++ b/collector/pg_database_wraparound_test.go @@ -0,0 +1,64 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" +) + +func TestPGDatabaseWraparoundCollector(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + inst := &instance{db: db} + columns := []string{ + "datname", + "age_datfrozenxid", + "age_datminmxid", + } + rows := sqlmock.NewRows(columns). 
+ AddRow("newreddit", 87126426, 0) + + mock.ExpectQuery(sanitizeQuery(databaseWraparoundQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGDatabaseWraparoundCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGDatabaseWraparoundCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{"datname": "newreddit"}, value: 87126426, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"datname": "newreddit"}, value: 0, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} From 12c12cf368cd9dabc305336bbec11f2f9d61ca93 Mon Sep 17 00:00:00 2001 From: Felix Yuan Date: Wed, 19 Jul 2023 14:24:08 -0700 Subject: [PATCH 02/14] Add a logger to stat_database collector to get better handle on error (also clean up some metric validity checks) Signed-off-by: Felix Yuan --- collector/pg_stat_database.go | 246 ++++++++++++++--------------- collector/pg_stat_database_test.go | 165 ++++++++++++------- 2 files changed, 225 insertions(+), 186 deletions(-) diff --git a/collector/pg_stat_database.go b/collector/pg_stat_database.go index 8a882f89..382ff782 100644 --- a/collector/pg_stat_database.go +++ b/collector/pg_stat_database.go @@ -17,6 +17,8 @@ import ( "context" "database/sql" + "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" ) @@ -26,10 +28,12 @@ func init() { registerCollector(statDatabaseSubsystem, defaultEnabled, NewPGStatDatabaseCollector) } -type PGStatDatabaseCollector struct{} +type PGStatDatabaseCollector struct { + log log.Logger +} func NewPGStatDatabaseCollector(config collectorConfig) (Collector, error) { - return &PGStatDatabaseCollector{}, nil + return &PGStatDatabaseCollector{log: config.logger}, nil } var ( @@ -228,7 +232,7 @@ var ( ` ) -func (PGStatDatabaseCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { +func (c *PGStatDatabaseCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { db := instance.getDB() rows, err := db.QueryContext(ctx, statDatabaseQuery, @@ -267,217 +271,203 @@ func (PGStatDatabaseCollector) Update(ctx context.Context, instance *instance, c if err != nil { return err } - datidLabel := "unknown" - if datid.Valid { - datidLabel = datid.String + + if !datid.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no datid") + continue } - datnameLabel := "unknown" - if datname.Valid { - datnameLabel = datname.String + if !datname.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no datname") + continue + } + if !numBackends.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no numbackends") + continue + } + if !xactCommit.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no xact_commit") + continue + } + if !xactRollback.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no xact_rollback") + continue + } + if !blksRead.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no blks_read") + continue + } + if !blksHit.Valid { + 
level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no blks_hit") + continue + } + if !tupReturned.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no tup_returned") + continue + } + if !tupFetched.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no tup_fetched") + continue + } + if !tupInserted.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no tup_inserted") + continue + } + if !tupUpdated.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no tup_updated") + continue + } + if !tupDeleted.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no tup_deleted") + continue + } + if !conflicts.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no conflicts") + continue + } + if !tempFiles.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no temp_files") + continue + } + if !tempBytes.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no temp_bytes") + continue + } + if !deadlocks.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no deadlocks") + continue + } + if !blkReadTime.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no blk_read_time") + continue + } + if !blkWriteTime.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no blk_write_time") + continue + } + if !statsReset.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no stats_reset") + continue } - numBackendsMetric := 0.0 - if numBackends.Valid { - numBackendsMetric = numBackends.Float64 - } + labels := []string{datid.String, datname.String} + ch <- prometheus.MustNewConstMetric( statDatabaseNumbackends, prometheus.GaugeValue, - numBackendsMetric, - datidLabel, - datnameLabel, + numBackends.Float64, + labels..., ) - xactCommitMetric := 0.0 - if xactCommit.Valid { - xactCommitMetric = xactCommit.Float64 - } ch <- prometheus.MustNewConstMetric( statDatabaseXactCommit, prometheus.CounterValue, - xactCommitMetric, - datidLabel, - datnameLabel, + xactCommit.Float64, + labels..., ) - xactRollbackMetric := 0.0 - if xactRollback.Valid { - xactRollbackMetric = xactRollback.Float64 - } ch <- prometheus.MustNewConstMetric( statDatabaseXactRollback, prometheus.CounterValue, - xactRollbackMetric, - datidLabel, - datnameLabel, + xactRollback.Float64, + labels..., ) - blksReadMetric := 0.0 - if blksRead.Valid { - blksReadMetric = blksRead.Float64 - } ch <- prometheus.MustNewConstMetric( statDatabaseBlksRead, prometheus.CounterValue, - blksReadMetric, - datidLabel, - datnameLabel, + blksRead.Float64, + labels..., ) - blksHitMetric := 0.0 - if blksHit.Valid { - blksHitMetric = blksHit.Float64 - } ch <- prometheus.MustNewConstMetric( statDatabaseBlksHit, prometheus.CounterValue, - blksHitMetric, - datidLabel, - datnameLabel, + blksHit.Float64, + labels..., ) - tupReturnedMetric := 0.0 - if tupReturned.Valid { - tupReturnedMetric = tupReturned.Float64 - } ch <- prometheus.MustNewConstMetric( statDatabaseTupReturned, prometheus.CounterValue, - tupReturnedMetric, - datidLabel, - datnameLabel, + tupReturned.Float64, + labels..., ) - tupFetchedMetric := 0.0 - if tupFetched.Valid { - tupFetchedMetric = tupFetched.Float64 - } ch <- prometheus.MustNewConstMetric( statDatabaseTupFetched, prometheus.CounterValue, - tupFetchedMetric, - datidLabel, - 
datnameLabel, + tupFetched.Float64, + labels..., ) - tupInsertedMetric := 0.0 - if tupInserted.Valid { - tupInsertedMetric = tupInserted.Float64 - } ch <- prometheus.MustNewConstMetric( statDatabaseTupInserted, prometheus.CounterValue, - tupInsertedMetric, - datidLabel, - datnameLabel, + tupInserted.Float64, + labels..., ) - tupUpdatedMetric := 0.0 - if tupUpdated.Valid { - tupUpdatedMetric = tupUpdated.Float64 - } ch <- prometheus.MustNewConstMetric( statDatabaseTupUpdated, prometheus.CounterValue, - tupUpdatedMetric, - datidLabel, - datnameLabel, + tupUpdated.Float64, + labels..., ) - tupDeletedMetric := 0.0 - if tupDeleted.Valid { - tupDeletedMetric = tupDeleted.Float64 - } ch <- prometheus.MustNewConstMetric( statDatabaseTupDeleted, prometheus.CounterValue, - tupDeletedMetric, - datidLabel, - datnameLabel, + tupDeleted.Float64, + labels..., ) - conflictsMetric := 0.0 - if conflicts.Valid { - conflictsMetric = conflicts.Float64 - } ch <- prometheus.MustNewConstMetric( statDatabaseConflicts, prometheus.CounterValue, - conflictsMetric, - datidLabel, - datnameLabel, + conflicts.Float64, + labels..., ) - tempFilesMetric := 0.0 - if tempFiles.Valid { - tempFilesMetric = tempFiles.Float64 - } ch <- prometheus.MustNewConstMetric( statDatabaseTempFiles, prometheus.CounterValue, - tempFilesMetric, - datidLabel, - datnameLabel, + tempFiles.Float64, + labels..., ) - tempBytesMetric := 0.0 - if tempBytes.Valid { - tempBytesMetric = tempBytes.Float64 - } ch <- prometheus.MustNewConstMetric( statDatabaseTempBytes, prometheus.CounterValue, - tempBytesMetric, - datidLabel, - datnameLabel, + tempBytes.Float64, + labels..., ) - deadlocksMetric := 0.0 - if deadlocks.Valid { - deadlocksMetric = deadlocks.Float64 - } ch <- prometheus.MustNewConstMetric( statDatabaseDeadlocks, prometheus.CounterValue, - deadlocksMetric, - datidLabel, - datnameLabel, + deadlocks.Float64, + labels..., ) - blkReadTimeMetric := 0.0 - if blkReadTime.Valid { - blkReadTimeMetric = blkReadTime.Float64 - } ch <- prometheus.MustNewConstMetric( statDatabaseBlkReadTime, prometheus.CounterValue, - blkReadTimeMetric, - datidLabel, - datnameLabel, + blkReadTime.Float64, + labels..., ) - blkWriteTimeMetric := 0.0 - if blkWriteTime.Valid { - blkWriteTimeMetric = blkWriteTime.Float64 - } ch <- prometheus.MustNewConstMetric( statDatabaseBlkWriteTime, prometheus.CounterValue, - blkWriteTimeMetric, - datidLabel, - datnameLabel, + blkWriteTime.Float64, + labels..., ) - statsResetMetric := 0.0 - if statsReset.Valid { - statsResetMetric = float64(statsReset.Time.Unix()) - } ch <- prometheus.MustNewConstMetric( statDatabaseStatsReset, prometheus.CounterValue, - statsResetMetric, - datidLabel, - datnameLabel, + float64(statsReset.Time.Unix()), + labels..., ) } return nil diff --git a/collector/pg_stat_database_test.go b/collector/pg_stat_database_test.go index 1fe92eed..70c73eb5 100644 --- a/collector/pg_stat_database_test.go +++ b/collector/pg_stat_database_test.go @@ -18,6 +18,7 @@ import ( "time" "github.com/DATA-DOG/go-sqlmock" + "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" "github.com/smartystreets/goconvey/convey" @@ -86,7 +87,9 @@ func TestPGStatDatabaseCollector(t *testing.T) { ch := make(chan prometheus.Metric) go func() { defer close(ch) - c := PGStatDatabaseCollector{} + c := PGStatDatabaseCollector{ + log: log.With(log.NewNopLogger(), "collector", "pg_stat_database"), + } if err := c.Update(context.Background(), inst, ch); err != nil { t.Errorf("Error calling 
PGStatDatabaseCollector.Update: %s", err) @@ -131,6 +134,10 @@ func TestPGStatDatabaseCollectorNullValues(t *testing.T) { } defer db.Close() + srT, err := time.Parse("2006-01-02 15:04:05.00000-07", "2023-05-25 17:10:42.81132-07") + if err != nil { + t.Fatalf("Error parsing time: %s", err) + } inst := &instance{db: db} columns := []string{ @@ -158,31 +165,52 @@ func TestPGStatDatabaseCollectorNullValues(t *testing.T) { rows := sqlmock.NewRows(columns). AddRow( nil, - nil, - nil, - nil, - nil, - nil, - nil, - nil, - nil, - nil, - nil, - nil, - nil, - nil, - nil, - nil, - nil, - nil, - nil, - ) + "postgres", + 354, + 4945, + 289097744, + 1242257, + int64(3275602074), + 89320867, + 450139, + 2034563757, + 0, + int64(2725688749), + 23, + 52, + 74, + 925, + 16, + 823, + srT). + AddRow( + "pid", + "postgres", + 354, + 4945, + 289097744, + 1242257, + int64(3275602074), + 89320867, + 450139, + 2034563757, + 0, + int64(2725688749), + 23, + 52, + 74, + 925, + 16, + 823, + srT) mock.ExpectQuery(sanitizeQuery(statDatabaseQuery)).WillReturnRows(rows) ch := make(chan prometheus.Metric) go func() { defer close(ch) - c := PGStatDatabaseCollector{} + c := PGStatDatabaseCollector{ + log: log.With(log.NewNopLogger(), "collector", "pg_stat_database"), + } if err := c.Update(context.Background(), inst, ch); err != nil { t.Errorf("Error calling PGStatDatabaseCollector.Update: %s", err) @@ -190,23 +218,23 @@ func TestPGStatDatabaseCollectorNullValues(t *testing.T) { }() expected := []MetricResult{ - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_GAUGE, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_GAUGE, value: 354}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: 
dto.MetricType_COUNTER, value: 4945}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 289097744}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 1242257}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 3275602074}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 89320867}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 450139}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 2034563757}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 2725688749}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 23}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 52}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 74}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 925}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 16}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 823}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 1685059842}, } convey.Convey("Metrics comparison", t, func() { @@ -296,14 +324,35 @@ func TestPGStatDatabaseCollectorRowLeakTest(t *testing.T) { nil, nil, nil, - ) - + ). 
+ AddRow( + "pid", + "postgres", + 355, + 4946, + 289097745, + 1242258, + int64(3275602075), + 89320868, + 450140, + 2034563758, + 1, + int64(2725688750), + 24, + 53, + 75, + 926, + 17, + 824, + srT) mock.ExpectQuery(sanitizeQuery(statDatabaseQuery)).WillReturnRows(rows) ch := make(chan prometheus.Metric) go func() { defer close(ch) - c := PGStatDatabaseCollector{} + c := PGStatDatabaseCollector{ + log: log.With(log.NewNopLogger(), "collector", "pg_stat_database"), + } if err := c.Update(context.Background(), inst, ch); err != nil { t.Errorf("Error calling PGStatDatabaseCollector.Update: %s", err) @@ -328,23 +377,23 @@ func TestPGStatDatabaseCollectorRowLeakTest(t *testing.T) { {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 16}, {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 823}, {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 1685059842}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_GAUGE, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_GAUGE, value: 355}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 4946}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 289097745}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 1242258}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 3275602075}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 
89320868}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 450140}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 2034563758}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 1}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 2725688750}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 24}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 53}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 75}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 926}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 17}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 824}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 1685059842}, } convey.Convey("Metrics comparison", t, func() { From 24a45f2fe3856671449e83479ea7a511f27b3a20 Mon Sep 17 00:00:00 2001 From: Joe Adams Date: Fri, 21 Jul 2023 14:20:19 -0400 Subject: [PATCH 03/14] Update changelog for release 0.13.2 (#872) Signed-off-by: Joe Adams --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 318e3648..f9ab3cb2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## 0.13.2 / 2023-07-21 + +* [BUGFIX] Fix type issues on pg_postmaster metrics #828 +* [BUGFIX] Fix pg_replication collector instantiation #854 +* [BUGFIX] Fix pg_process_idle metrics #855 + ## 0.13.1 / 2023-06-27 * [BUGFIX] Make collectors not fail on null values #823 From dc3e813f430602dddeb15146a9960960c61a7ffd Mon Sep 17 00:00:00 2001 From: Felix Yuan Date: Fri, 21 Jul 2023 11:41:25 -0700 Subject: [PATCH 04/14] Gitlab Collector: Autovacuum collector and test (#840) * Autovacuum collector and test Signed-off-by: Felix Yuan * Update collector/pg_stat_activity_autovacuum.go Co-authored-by: Joe Adams Signed-off-by: Felix Yuan * Update collector/pg_stat_activity_autovacuum.go Co-authored-by: Joe Adams Signed-off-by: Felix Yuan * Use timestamp seconds Signed-off-by: Felix Yuan * query formating Signed-off-by: Felix Yuan * SQL format Signed-off-by: Felix Yuan * Loosen autovacuum query Signed-off-by: Felix Yuan --------- Signed-off-by: Felix Yuan Co-authored-by: Joe Adams --- collector/pg_stat_activity_autovacuum.go | 84 +++++++++++++++++++ collector/pg_stat_activity_autovacuum_test.go | 62 ++++++++++++++ 2 files changed, 146 insertions(+) create mode 100644 collector/pg_stat_activity_autovacuum.go create mode 100644 collector/pg_stat_activity_autovacuum_test.go diff --git a/collector/pg_stat_activity_autovacuum.go b/collector/pg_stat_activity_autovacuum.go new file mode 100644 index 00000000..5e2d2d2c --- /dev/null +++ b/collector/pg_stat_activity_autovacuum.go @@ -0,0 +1,84 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" +) + +const statActivityAutovacuumSubsystem = "stat_activity_autovacuum" + +func init() { + registerCollector(statActivityAutovacuumSubsystem, defaultDisabled, NewPGStatActivityAutovacuumCollector) +} + +type PGStatActivityAutovacuumCollector struct { + log log.Logger +} + +func NewPGStatActivityAutovacuumCollector(config collectorConfig) (Collector, error) { + return &PGStatActivityAutovacuumCollector{log: config.logger}, nil +} + +var ( + statActivityAutovacuumAgeInSeconds = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statActivityAutovacuumSubsystem, "timestamp_seconds"), + "Start timestamp of the vacuum process in seconds", + []string{"relname"}, + prometheus.Labels{}, + ) + + statActivityAutovacuumQuery = ` + SELECT + SPLIT_PART(query, '.', 2) AS relname, + EXTRACT(xact_start) AS timestamp_seconds + FROM + pg_catalog.pg_stat_activity + WHERE + query LIKE 'autovacuum:%' + ` +) + +func (PGStatActivityAutovacuumCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + rows, err := db.QueryContext(ctx, + statActivityAutovacuumQuery) + + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var relname string + var ageInSeconds float64 + + if err := rows.Scan(&relname, &ageInSeconds); err != nil { + return err + } + + ch <- prometheus.MustNewConstMetric( + statActivityAutovacuumAgeInSeconds, + prometheus.GaugeValue, + ageInSeconds, relname, + ) + } + if err := rows.Err(); err != nil { + return err + } + return nil +} diff --git a/collector/pg_stat_activity_autovacuum_test.go b/collector/pg_stat_activity_autovacuum_test.go new file mode 100644 index 00000000..a6fcdbca --- /dev/null +++ b/collector/pg_stat_activity_autovacuum_test.go @@ -0,0 +1,62 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" +) + +func TestPGStatActivityAutovacuumCollector(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + inst := &instance{db: db} + columns := []string{ + "relname", + "timestamp_seconds", + } + rows := sqlmock.NewRows(columns). 
+ AddRow("test", 3600) + + mock.ExpectQuery(sanitizeQuery(statActivityAutovacuumQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatActivityAutovacuumCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGStatActivityAutovacuumCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{"relname": "test"}, value: 3600, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} From 2d7e15275147ce7dfb7b7dc6f959d60e106c11a1 Mon Sep 17 00:00:00 2001 From: Felix Yuan Date: Fri, 21 Jul 2023 11:42:08 -0700 Subject: [PATCH 05/14] Gitlab Collector: Wal Receiver Collector and Test (#844) * Wal Receiver Collector and Test Signed-off-by: Felix Yuan * Add more escapes Signed-off-by: Felix Yuan * Corrections to wal_receiver Signed-off-by: Felix Yuan * Continue on null labels Signed-off-by: Felix Yuan * Skip nulls and log a message Signed-off-by: Felix Yuan * Redundant breaks Signed-off-by: Felix Yuan * Fix up walreceiver Signed-off-by: Felix Yuan * Remove extra label Signed-off-by: Felix Yuan * Update collector/pg_stat_walreceiver.go Co-authored-by: Ben Kochie Signed-off-by: Felix Yuan * Clean up the extra assignments Signed-off-by: Felix Yuan * Update collector/pg_stat_walreceiver.go Co-authored-by: Joe Adams Signed-off-by: Felix Yuan --------- Signed-off-by: Felix Yuan Co-authored-by: Ben Kochie Co-authored-by: Joe Adams --- collector/collector_test.go | 1 + collector/pg_stat_walreceiver.go | 269 ++++++++++++++++++++++++++ collector/pg_stat_walreceiver_test.go | 186 ++++++++++++++++++ 3 files changed, 456 insertions(+) create mode 100644 collector/pg_stat_walreceiver.go create mode 100644 collector/pg_stat_walreceiver_test.go diff --git a/collector/collector_test.go b/collector/collector_test.go index 00c21ed2..18101f00 100644 --- a/collector/collector_test.go +++ b/collector/collector_test.go @@ -49,6 +49,7 @@ func readMetric(m prometheus.Metric) MetricResult { func sanitizeQuery(q string) string { q = strings.Join(strings.Fields(q), " ") q = strings.Replace(q, "(", "\\(", -1) + q = strings.Replace(q, "?", "\\?", -1) q = strings.Replace(q, ")", "\\)", -1) q = strings.Replace(q, "[", "\\[", -1) q = strings.Replace(q, "]", "\\]", -1) diff --git a/collector/pg_stat_walreceiver.go b/collector/pg_stat_walreceiver.go new file mode 100644 index 00000000..3134c025 --- /dev/null +++ b/collector/pg_stat_walreceiver.go @@ -0,0 +1,269 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+package collector + +import ( + "context" + "database/sql" + "fmt" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" +) + +func init() { + registerCollector(statWalReceiverSubsystem, defaultDisabled, NewPGStatWalReceiverCollector) +} + +type PGStatWalReceiverCollector struct { + log log.Logger +} + +const statWalReceiverSubsystem = "stat_wal_receiver" + +func NewPGStatWalReceiverCollector(config collectorConfig) (Collector, error) { + return &PGStatWalReceiverCollector{log: config.logger}, nil +} + +var ( + labelCats = []string{"upstream_host", "slot_name", "status"} + statWalReceiverReceiveStartLsn = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "receive_start_lsn"), + "First write-ahead log location used when WAL receiver is started represented as a decimal", + labelCats, + prometheus.Labels{}, + ) + statWalReceiverReceiveStartTli = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "receive_start_tli"), + "First timeline number used when WAL receiver is started", + labelCats, + prometheus.Labels{}, + ) + statWalReceiverFlushedLSN = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "flushed_lsn"), + "Last write-ahead log location already received and flushed to disk, the initial value of this field being the first log location used when WAL receiver is started represented as a decimal", + labelCats, + prometheus.Labels{}, + ) + statWalReceiverReceivedTli = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "received_tli"), + "Timeline number of last write-ahead log location received and flushed to disk", + labelCats, + prometheus.Labels{}, + ) + statWalReceiverLastMsgSendTime = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "last_msg_send_time"), + "Send time of last message received from origin WAL sender", + labelCats, + prometheus.Labels{}, + ) + statWalReceiverLastMsgReceiptTime = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "last_msg_receipt_time"), + "Send time of last message received from origin WAL sender", + labelCats, + prometheus.Labels{}, + ) + statWalReceiverLatestEndLsn = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "latest_end_lsn"), + "Last write-ahead log location reported to origin WAL sender as integer", + labelCats, + prometheus.Labels{}, + ) + statWalReceiverLatestEndTime = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "latest_end_time"), + "Time of last write-ahead log location reported to origin WAL sender", + labelCats, + prometheus.Labels{}, + ) + statWalReceiverUpstreamNode = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "upstream_node"), + "Node ID of the upstream node", + labelCats, + prometheus.Labels{}, + ) + + pgStatWalColumnQuery = ` + SELECT + column_name + FROM information_schema.columns + WHERE + table_name = 'pg_stat_wal_receiver' and + column_name = 'flushed_lsn' + ` + + pgStatWalReceiverQueryTemplate = ` + SELECT + trim(both '''' from substring(conninfo from 'host=([^ ]*)')) as upstream_host, + slot_name, + status, + (receive_start_lsn- '0/0') % (2^52)::bigint as receive_start_lsn, + %s +receive_start_tli, + received_tli, + extract(epoch from last_msg_send_time) as last_msg_send_time, + extract(epoch from last_msg_receipt_time) as last_msg_receipt_time, + (latest_end_lsn - '0/0') % 
(2^52)::bigint as latest_end_lsn, + extract(epoch from latest_end_time) as latest_end_time, + substring(slot_name from 'repmgr_slot_([0-9]*)') as upstream_node + FROM pg_catalog.pg_stat_wal_receiver + ` +) + +func (c *PGStatWalReceiverCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + hasFlushedLSNRows, err := db.QueryContext(ctx, pgStatWalColumnQuery) + if err != nil { + return err + } + + defer hasFlushedLSNRows.Close() + hasFlushedLSN := hasFlushedLSNRows.Next() + var query string + if hasFlushedLSN { + query = fmt.Sprintf(pgStatWalReceiverQueryTemplate, "(flushed_lsn - '0/0') % (2^52)::bigint as flushed_lsn,\n") + } else { + query = fmt.Sprintf(pgStatWalReceiverQueryTemplate, "") + } + rows, err := db.QueryContext(ctx, query) + if err != nil { + return err + } + defer rows.Close() + for rows.Next() { + var upstreamHost, slotName, status sql.NullString + var receiveStartLsn, receiveStartTli, flushedLsn, receivedTli, latestEndLsn, upstreamNode sql.NullInt64 + var lastMsgSendTime, lastMsgReceiptTime, latestEndTime sql.NullFloat64 + + if hasFlushedLSN { + if err := rows.Scan(&upstreamHost, &slotName, &status, &receiveStartLsn, &receiveStartTli, &flushedLsn, &receivedTli, &lastMsgSendTime, &lastMsgReceiptTime, &latestEndLsn, &latestEndTime, &upstreamNode); err != nil { + return err + } + } else { + if err := rows.Scan(&upstreamHost, &slotName, &status, &receiveStartLsn, &receiveStartTli, &receivedTli, &lastMsgSendTime, &lastMsgReceiptTime, &latestEndLsn, &latestEndTime, &upstreamNode); err != nil { + return err + } + } + if !upstreamHost.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because upstream host is null") + continue + } + + if !slotName.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because slotname host is null") + continue + } + + if !status.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because status is null") + continue + } + labels := []string{upstreamHost.String, slotName.String, status.String} + + if !receiveStartLsn.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because receive_start_lsn is null") + continue + } + if !receiveStartTli.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because receive_start_tli is null") + continue + } + if hasFlushedLSN && !flushedLsn.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because flushed_lsn is null") + continue + } + if !receivedTli.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because received_tli is null") + continue + } + if !lastMsgSendTime.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because last_msg_send_time is null") + continue + } + if !lastMsgReceiptTime.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because last_msg_receipt_time is null") + continue + } + if !latestEndLsn.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because latest_end_lsn is null") + continue + } + if !latestEndTime.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because latest_end_time is null") + continue + } + if !upstreamNode.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because upstream_node is null") + continue + } + ch <- prometheus.MustNewConstMetric( + statWalReceiverReceiveStartLsn, + prometheus.CounterValue, + float64(receiveStartLsn.Int64), + labels...) 
+ + ch <- prometheus.MustNewConstMetric( + statWalReceiverReceiveStartTli, + prometheus.GaugeValue, + float64(receiveStartTli.Int64), + labels...) + + if hasFlushedLSN { + ch <- prometheus.MustNewConstMetric( + statWalReceiverFlushedLSN, + prometheus.CounterValue, + float64(flushedLsn.Int64), + labels...) + } + + ch <- prometheus.MustNewConstMetric( + statWalReceiverReceivedTli, + prometheus.GaugeValue, + float64(receivedTli.Int64), + labels...) + + ch <- prometheus.MustNewConstMetric( + statWalReceiverLastMsgSendTime, + prometheus.CounterValue, + float64(lastMsgSendTime.Float64), + labels...) + + ch <- prometheus.MustNewConstMetric( + statWalReceiverLastMsgReceiptTime, + prometheus.CounterValue, + float64(lastMsgReceiptTime.Float64), + labels...) + + ch <- prometheus.MustNewConstMetric( + statWalReceiverLatestEndLsn, + prometheus.CounterValue, + float64(latestEndLsn.Int64), + labels...) + + ch <- prometheus.MustNewConstMetric( + statWalReceiverLatestEndTime, + prometheus.CounterValue, + latestEndTime.Float64, + labels...) + + ch <- prometheus.MustNewConstMetric( + statWalReceiverUpstreamNode, + prometheus.GaugeValue, + float64(upstreamNode.Int64), + labels...) + } + if err := rows.Err(); err != nil { + return err + } + return nil +} diff --git a/collector/pg_stat_walreceiver_test.go b/collector/pg_stat_walreceiver_test.go new file mode 100644 index 00000000..3e2418b2 --- /dev/null +++ b/collector/pg_stat_walreceiver_test.go @@ -0,0 +1,186 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "fmt" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" +) + +var queryWithFlushedLSN = fmt.Sprintf(pgStatWalReceiverQueryTemplate, "(flushed_lsn - '0/0') % (2^52)::bigint as flushed_lsn,\n") +var queryWithNoFlushedLSN = fmt.Sprintf(pgStatWalReceiverQueryTemplate, "") + +func TestPGStatWalReceiverCollectorWithFlushedLSN(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db} + infoSchemaColumns := []string{ + "column_name", + } + + infoSchemaRows := sqlmock.NewRows(infoSchemaColumns). + AddRow( + "flushed_lsn", + ) + + mock.ExpectQuery(sanitizeQuery(pgStatWalColumnQuery)).WillReturnRows(infoSchemaRows) + + columns := []string{ + "upstream_host", + "slot_name", + "status", + "receive_start_lsn", + "receive_start_tli", + "flushed_lsn", + "received_tli", + "last_msg_send_time", + "last_msg_receipt_time", + "latest_end_lsn", + "latest_end_time", + "upstream_node", + } + rows := sqlmock.NewRows(columns). 
+ AddRow( + "foo", + "bar", + "stopping", + 1200668684563608, + 1687321285, + 1200668684563609, + 1687321280, + 1687321275, + 1687321276, + 1200668684563610, + 1687321277, + 5, + ) + + mock.ExpectQuery(sanitizeQuery(queryWithFlushedLSN)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatWalReceiverCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PgStatWalReceiverCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1200668684563608, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1687321285, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1200668684563609, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1687321280, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1687321275, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1687321276, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1200668684563610, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1687321277, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 5, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } + +} + +func TestPGStatWalReceiverCollectorWithNoFlushedLSN(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db} + infoSchemaColumns := []string{ + "column_name", + } + + infoSchemaRows := sqlmock.NewRows(infoSchemaColumns) + + mock.ExpectQuery(sanitizeQuery(pgStatWalColumnQuery)).WillReturnRows(infoSchemaRows) + + columns := []string{ + "upstream_host", + "slot_name", + "status", + "receive_start_lsn", + "receive_start_tli", + "received_tli", + "last_msg_send_time", + "last_msg_receipt_time", + "latest_end_lsn", + "latest_end_time", + "upstream_node", + } + rows := sqlmock.NewRows(columns). 
+ AddRow( + "foo", + "bar", + "starting", + 1200668684563608, + 1687321285, + 1687321280, + 1687321275, + 1687321276, + 1200668684563610, + 1687321277, + 5, + ) + mock.ExpectQuery(sanitizeQuery(queryWithNoFlushedLSN)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatWalReceiverCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PgStatWalReceiverCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 1200668684563608, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 1687321285, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 1687321280, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 1687321275, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 1687321276, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 1200668684563610, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 1687321277, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 5, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } + +} From 74800f483a9bdcbd7efe0c4baf0f8ac1820f21db Mon Sep 17 00:00:00 2001 From: Felix Yuan Date: Fri, 21 Jul 2023 11:42:43 -0700 Subject: [PATCH 06/14] Gitlab collector: Xlog location collector and test (#849) * Xlog location collector and test Signed-off-by: Felix Yuan * Add more escapes Signed-off-by: Felix Yuan * Change to Gauge Signed-off-by: Felix Yuan --------- Signed-off-by: Felix Yuan --- collector/pg_xlog_location.go | 80 ++++++++++++++++++++++++++++++ collector/pg_xlog_location_test.go | 61 +++++++++++++++++++++++ 2 files changed, 141 insertions(+) create mode 100644 collector/pg_xlog_location.go create mode 100644 collector/pg_xlog_location_test.go diff --git a/collector/pg_xlog_location.go b/collector/pg_xlog_location.go new file mode 100644 index 00000000..92ac44ac --- /dev/null +++ b/collector/pg_xlog_location.go @@ -0,0 +1,80 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package collector + +import ( + "context" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" +) + +const xlogLocationSubsystem = "xlog_location" + +func init() { + registerCollector(xlogLocationSubsystem, defaultDisabled, NewPGXlogLocationCollector) +} + +type PGXlogLocationCollector struct { + log log.Logger +} + +func NewPGXlogLocationCollector(config collectorConfig) (Collector, error) { + return &PGXlogLocationCollector{log: config.logger}, nil +} + +var ( + xlogLocationBytes = prometheus.NewDesc( + prometheus.BuildFQName(namespace, xlogLocationSubsystem, "bytes"), + "Postgres LSN (log sequence number) being generated on primary or replayed on replica (truncated to low 52 bits)", + []string{}, + prometheus.Labels{}, + ) + + xlogLocationQuery = ` + SELECT CASE + WHEN pg_is_in_recovery() THEN (pg_last_xlog_replay_location() - '0/0') % (2^52)::bigint + ELSE (pg_current_xlog_location() - '0/0') % (2^52)::bigint + END AS bytes + ` +) + +func (PGXlogLocationCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + rows, err := db.QueryContext(ctx, + xlogLocationQuery) + + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var bytes float64 + + if err := rows.Scan(&bytes); err != nil { + return err + } + + ch <- prometheus.MustNewConstMetric( + xlogLocationBytes, + prometheus.GaugeValue, + bytes, + ) + } + if err := rows.Err(); err != nil { + return err + } + return nil +} diff --git a/collector/pg_xlog_location_test.go b/collector/pg_xlog_location_test.go new file mode 100644 index 00000000..561a7df9 --- /dev/null +++ b/collector/pg_xlog_location_test.go @@ -0,0 +1,61 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" +) + +func TestPGXlogLocationCollector(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + inst := &instance{db: db} + columns := []string{ + "bytes", + } + rows := sqlmock.NewRows(columns). 
+ AddRow(53401) + + mock.ExpectQuery(sanitizeQuery(xlogLocationQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGXlogLocationCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGXlogLocationCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{}, value: 53401, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} From f9277b04b7f614a03a4ca70194abe2045f17c36a Mon Sep 17 00:00:00 2001 From: Ben Kochie Date: Tue, 25 Jul 2023 16:20:37 +0200 Subject: [PATCH 07/14] Handle new pg_stat_statements column names (#874) Update pg_stat_statements collector to handle the new column names in PostgreSQL 13. Fixes: https://github.com/prometheus-community/postgres_exporter/issues/502 Signed-off-by: SuperQ --- collector/pg_stat_statements.go | 30 +++++++++++++++-- collector/pg_stat_statements_test.go | 50 ++++++++++++++++++++++++++-- 2 files changed, 75 insertions(+), 5 deletions(-) diff --git a/collector/pg_stat_statements.go b/collector/pg_stat_statements.go index bbfee1a2..e28c1c59 100644 --- a/collector/pg_stat_statements.go +++ b/collector/pg_stat_statements.go @@ -17,6 +17,7 @@ import ( "context" "database/sql" + "github.com/blang/semver/v4" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" ) @@ -90,12 +91,37 @@ var ( ) ORDER BY seconds_total DESC LIMIT 100;` + + pgStatStatementsNewQuery = `SELECT + pg_get_userbyid(userid) as user, + pg_database.datname, + pg_stat_statements.queryid, + pg_stat_statements.calls as calls_total, + pg_stat_statements.total_exec_time / 1000.0 as seconds_total, + pg_stat_statements.rows as rows_total, + pg_stat_statements.blk_read_time / 1000.0 as block_read_seconds_total, + pg_stat_statements.blk_write_time / 1000.0 as block_write_seconds_total + FROM pg_stat_statements + JOIN pg_database + ON pg_database.oid = pg_stat_statements.dbid + WHERE + total_time > ( + SELECT percentile_cont(0.1) + WITHIN GROUP (ORDER BY total_time) + FROM pg_stat_statements + ) + ORDER BY seconds_total DESC + LIMIT 100;` ) func (PGStatStatementsCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + query := pgStatStatementsQuery + if instance.version.GE(semver.MustParse("13.0.0")) { + query = pgStatStatementsNewQuery + } + db := instance.getDB() - rows, err := db.QueryContext(ctx, - pgStatStatementsQuery) + rows, err := db.QueryContext(ctx, query) if err != nil { return err diff --git a/collector/pg_stat_statements_test.go b/collector/pg_stat_statements_test.go index c4f89a60..08aba34c 100644 --- a/collector/pg_stat_statements_test.go +++ b/collector/pg_stat_statements_test.go @@ -17,6 +17,7 @@ import ( "testing" "github.com/DATA-DOG/go-sqlmock" + "github.com/blang/semver/v4" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" "github.com/smartystreets/goconvey/convey" @@ -29,7 +30,7 @@ func TestPGStateStatementsCollector(t *testing.T) { } defer db.Close() - inst := &instance{db: db} + inst := &instance{db: db, version: semver.MustParse("12.0.0")} columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", 
"block_write_seconds_total"} rows := sqlmock.NewRows(columns). @@ -72,12 +73,12 @@ func TestPGStateStatementsCollectorNull(t *testing.T) { } defer db.Close() - inst := &instance{db: db} + inst := &instance{db: db, version: semver.MustParse("13.3.7")} columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} rows := sqlmock.NewRows(columns). AddRow(nil, nil, nil, nil, nil, nil, nil, nil) - mock.ExpectQuery(sanitizeQuery(pgStatStatementsQuery)).WillReturnRows(rows) + mock.ExpectQuery(sanitizeQuery(pgStatStatementsNewQuery)).WillReturnRows(rows) ch := make(chan prometheus.Metric) go func() { @@ -107,3 +108,46 @@ func TestPGStateStatementsCollectorNull(t *testing.T) { t.Errorf("there were unfulfilled exceptions: %s", err) } } + +func TestPGStateStatementsCollectorNewPG(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db, version: semver.MustParse("13.3.7")} + + columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} + rows := sqlmock.NewRows(columns). + AddRow("postgres", "postgres", 1500, 5, 0.4, 100, 0.1, 0.2) + mock.ExpectQuery(sanitizeQuery(pgStatStatementsNewQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatStatementsCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGStatStatementsCollector.Update: %s", err) + } + }() + + expected := []MetricResult{ + {labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 5}, + {labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.4}, + {labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 100}, + {labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.1}, + {labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.2}, + } + + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} From 716ac23f202f664845d04c0d7b07000feae19c59 Mon Sep 17 00:00:00 2001 From: Ben Kochie Date: Tue, 25 Jul 2023 22:36:51 +0200 Subject: [PATCH 08/14] Fixup new pg_stats_statements query (#876) Fix all renames of `total_time` to `total_exec_time`. 
Fixes: https://github.com/prometheus-community/postgres_exporter/issues/502 Signed-off-by: SuperQ --- collector/pg_stat_statements.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/collector/pg_stat_statements.go b/collector/pg_stat_statements.go index e28c1c59..c03e78b9 100644 --- a/collector/pg_stat_statements.go +++ b/collector/pg_stat_statements.go @@ -105,9 +105,9 @@ var ( JOIN pg_database ON pg_database.oid = pg_stat_statements.dbid WHERE - total_time > ( + total_exec_time > ( SELECT percentile_cont(0.1) - WITHIN GROUP (ORDER BY total_time) + WITHIN GROUP (ORDER BY total_exec_time) FROM pg_stat_statements ) ORDER BY seconds_total DESC From 04bb60ce31575d30d4380630ed2f2b98010e69fe Mon Sep 17 00:00:00 2001 From: Ben Kochie Date: Tue, 15 Aug 2023 13:49:05 +0200 Subject: [PATCH 09/14] Add a multi-target example config (#890) Add an example Prometheus scrape config, similar to the blackbox_exporter's example config. Fixes: https://github.com/prometheus-community/postgres_exporter/issues/888 Signed-off-by: SuperQ --- README.md | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/README.md b/README.md index e3774c85..88c6c098 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,26 @@ To use the multi-target functionality, send an http request to the endpoint `/pr To avoid putting sensitive information like username and password in the URL, preconfigured auth modules are supported via the [auth_modules](#auth_modules) section of the config file. auth_modules for DSNs can be used with the `/probe` endpoint by specifying the `?auth_module=foo` http parameter. +Example Prometheus config: +```yaml +scrape_configs: + - job_name: 'postgres' + static_configs: + - targets: + - server1:5432 + - server2:5432 + metrics_path: /probe + params: + auth_module: [foo] + relabel_configs: + - source_labels: [__address__] + target_label: __param_target + - source_labels: [__param_target] + target_label: instance + - target_label: __address__ + replacement: 127.0.0.1:9116 # The postgres exporter's real hostname:port. +``` + ## Configuration File The configuration file controls the behavior of the exporter. It can be set using the `--config.file` command line flag and defaults to `postgres_exporter.yml`. From b74852a5353b4bf2cb2cc944f329d651fbf99064 Mon Sep 17 00:00:00 2001 From: Joe Adams Date: Wed, 23 Aug 2023 17:33:47 -0400 Subject: [PATCH 10/14] Delay database connection until scrape (#882) This no longer returns an error when creating a collector.instance when the database cannot be reached for the version query. This will resolve the entire postgresCollector not being registered for metrics collection when a database is not available. If the version query fails, the scrape will fail. Resolves #880 Signed-off-by: Joe Adams --- collector/collector.go | 8 ++++++++ collector/instance.go | 30 ++++++++++++++++++++++-------- collector/probe.go | 8 ++++++++ 3 files changed, 38 insertions(+), 8 deletions(-) diff --git a/collector/collector.go b/collector/collector.go index c1bf2af9..e06ef7d1 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -166,6 +166,14 @@ func (p PostgresCollector) Describe(ch chan<- *prometheus.Desc) { // Collect implements the prometheus.Collector interface. func (p PostgresCollector) Collect(ch chan<- prometheus.Metric) { ctx := context.TODO() + + // Set up the database connection for the collector. 
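+	// The connection is (re)established lazily at scrape time rather than when the
+	// exporter starts, so an unreachable database no longer prevents the collector
+	// from being registered; if setup fails, the error is logged and this scrape
+	// returns without emitting any metrics.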
+ err := p.instance.setup() + if err != nil { + level.Error(p.logger).Log("msg", "Error opening connection to database", "err", err) + return + } + wg := sync.WaitGroup{} wg.Add(len(p.Collectors)) for name, c := range p.Collectors { diff --git a/collector/instance.go b/collector/instance.go index 9b2bbf47..87eb0591 100644 --- a/collector/instance.go +++ b/collector/instance.go @@ -22,29 +22,43 @@ import ( ) type instance struct { + dsn string db *sql.DB version semver.Version } func newInstance(dsn string) (*instance, error) { - i := &instance{} + i := &instance{ + dsn: dsn, + } + + // "Create" a database handle to verify the DSN provided is valid. + // Open is not guaranteed to create a connection. db, err := sql.Open("postgres", dsn) if err != nil { return nil, err } + db.Close() + + return i, nil +} + +func (i *instance) setup() error { + db, err := sql.Open("postgres", i.dsn) + if err != nil { + return err + } db.SetMaxOpenConns(1) db.SetMaxIdleConns(1) i.db = db - version, err := queryVersion(db) + version, err := queryVersion(i.db) if err != nil { - db.Close() - return nil, err + return fmt.Errorf("error querying postgresql version: %w", err) + } else { + i.version = version } - - i.version = version - - return i, nil + return nil } func (i *instance) getDB() *sql.DB { diff --git a/collector/probe.go b/collector/probe.go index 834c6517..a7630272 100644 --- a/collector/probe.go +++ b/collector/probe.go @@ -18,6 +18,7 @@ import ( "sync" "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/prometheus-community/postgres_exporter/config" "github.com/prometheus/client_golang/prometheus" ) @@ -74,6 +75,13 @@ func (pc *ProbeCollector) Describe(ch chan<- *prometheus.Desc) { } func (pc *ProbeCollector) Collect(ch chan<- prometheus.Metric) { + // Set up the database connection for the collector. 
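+	// As in PostgresCollector.Collect, the connection to the probed target is only
+	// opened once a scrape arrives; on failure the error is logged and the probe
+	// returns no metrics for this request.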
+ err := pc.instance.setup() + if err != nil { + level.Error(pc.logger).Log("msg", "Error opening connection to database", "err", err) + return + } + wg := sync.WaitGroup{} wg.Add(len(pc.collectors)) for name, c := range pc.collectors { From 2402783205210bb61e01b3e4e15451e7d4655a7b Mon Sep 17 00:00:00 2001 From: Felix Yuan Date: Thu, 24 Aug 2023 00:51:26 -0700 Subject: [PATCH 11/14] Bugfix: Make statsreset nullable (#877) * Stats_reset as null seems to actually be legitimate for new databases, so don't fail for it --------- Signed-off-by: Felix Yuan Co-authored-by: Ben Kochie --- collector/pg_stat_database.go | 10 ++- collector/pg_stat_database_test.go | 98 ++++++++++++++++++++++++++++++ 2 files changed, 105 insertions(+), 3 deletions(-) diff --git a/collector/pg_stat_database.go b/collector/pg_stat_database.go index 382ff782..328afee2 100644 --- a/collector/pg_stat_database.go +++ b/collector/pg_stat_database.go @@ -344,9 +344,13 @@ func (c *PGStatDatabaseCollector) Update(ctx context.Context, instance *instance level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no blk_write_time") continue } + + statsResetMetric := 0.0 if !statsReset.Valid { - level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no stats_reset") - continue + level.Debug(c.log).Log("msg", "No metric for stats_reset, will collect 0 instead") + } + if statsReset.Valid { + statsResetMetric = float64(statsReset.Time.Unix()) } labels := []string{datid.String, datname.String} @@ -466,7 +470,7 @@ func (c *PGStatDatabaseCollector) Update(ctx context.Context, instance *instance ch <- prometheus.MustNewConstMetric( statDatabaseStatsReset, prometheus.CounterValue, - float64(statsReset.Time.Unix()), + statsResetMetric, labels..., ) } diff --git a/collector/pg_stat_database_test.go b/collector/pg_stat_database_test.go index 70c73eb5..fe1b1706 100644 --- a/collector/pg_stat_database_test.go +++ b/collector/pg_stat_database_test.go @@ -406,3 +406,101 @@ func TestPGStatDatabaseCollectorRowLeakTest(t *testing.T) { t.Errorf("there were unfulfilled exceptions: %s", err) } } + +func TestPGStatDatabaseCollectorTestNilStatReset(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db} + + columns := []string{ + "datid", + "datname", + "numbackends", + "xact_commit", + "xact_rollback", + "blks_read", + "blks_hit", + "tup_returned", + "tup_fetched", + "tup_inserted", + "tup_updated", + "tup_deleted", + "conflicts", + "temp_files", + "temp_bytes", + "deadlocks", + "blk_read_time", + "blk_write_time", + "stats_reset", + } + + rows := sqlmock.NewRows(columns). 
+ AddRow( + "pid", + "postgres", + 354, + 4945, + 289097744, + 1242257, + int64(3275602074), + 89320867, + 450139, + 2034563757, + 0, + int64(2725688749), + 23, + 52, + 74, + 925, + 16, + 823, + nil) + + mock.ExpectQuery(sanitizeQuery(statDatabaseQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatDatabaseCollector{ + log: log.With(log.NewNopLogger(), "collector", "pg_stat_database"), + } + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGStatDatabaseCollector.Update: %s", err) + } + }() + + expected := []MetricResult{ + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_GAUGE, value: 354}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 4945}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 289097744}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 1242257}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 3275602074}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 89320867}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 450139}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 2034563757}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 2725688749}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 23}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 52}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 74}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 925}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 16}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 823}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 0}, + } + + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} From ce74daee9216f8530bcf13fafc03c0d430bf0a1f Mon Sep 17 00:00:00 2001 From: Felix Yuan Date: Thu, 24 Aug 2023 00:55:26 -0700 Subject: [PATCH 12/14] Gitlab Collector: User Index io stats collector and test (#845) * User Index io stats collector and test --------- Signed-off-by: Felix Yuan --- collector/pg_statio_user_indexes.go | 118 +++++++++++++++++++++++ collector/pg_statio_user_indexes_test.go | 109 +++++++++++++++++++++ 2 files changed, 227 insertions(+) create mode 100644 collector/pg_statio_user_indexes.go create mode 100644 collector/pg_statio_user_indexes_test.go diff --git a/collector/pg_statio_user_indexes.go b/collector/pg_statio_user_indexes.go new file mode 100644 index 00000000..b5516338 --- /dev/null +++ 
b/collector/pg_statio_user_indexes.go @@ -0,0 +1,118 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "database/sql" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" +) + +func init() { + registerCollector(statioUserIndexesSubsystem, defaultDisabled, NewPGStatioUserIndexesCollector) +} + +type PGStatioUserIndexesCollector struct { + log log.Logger +} + +const statioUserIndexesSubsystem = "statio_user_indexes" + +func NewPGStatioUserIndexesCollector(config collectorConfig) (Collector, error) { + return &PGStatioUserIndexesCollector{log: config.logger}, nil +} + +var ( + statioUserIndexesIdxBlksRead = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statioUserIndexesSubsystem, "idx_blks_read_total"), + "Number of disk blocks read from this index", + []string{"schemaname", "relname", "indexrelname"}, + prometheus.Labels{}, + ) + statioUserIndexesIdxBlksHit = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statioUserIndexesSubsystem, "idx_blks_hit_total"), + "Number of buffer hits in this index", + []string{"schemaname", "relname", "indexrelname"}, + prometheus.Labels{}, + ) + + statioUserIndexesQuery = ` + SELECT + schemaname, + relname, + indexrelname, + idx_blks_read, + idx_blks_hit + FROM pg_statio_user_indexes + ` +) + +func (c *PGStatioUserIndexesCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + rows, err := db.QueryContext(ctx, + statioUserIndexesQuery) + + if err != nil { + return err + } + defer rows.Close() + for rows.Next() { + var schemaname, relname, indexrelname sql.NullString + var idxBlksRead, idxBlksHit sql.NullFloat64 + + if err := rows.Scan(&schemaname, &relname, &indexrelname, &idxBlksRead, &idxBlksHit); err != nil { + return err + } + schemanameLabel := "unknown" + if schemaname.Valid { + schemanameLabel = schemaname.String + } + relnameLabel := "unknown" + if relname.Valid { + relnameLabel = relname.String + } + indexrelnameLabel := "unknown" + if indexrelname.Valid { + indexrelnameLabel = indexrelname.String + } + labels := []string{schemanameLabel, relnameLabel, indexrelnameLabel} + + idxBlksReadMetric := 0.0 + if idxBlksRead.Valid { + idxBlksReadMetric = idxBlksRead.Float64 + } + ch <- prometheus.MustNewConstMetric( + statioUserIndexesIdxBlksRead, + prometheus.CounterValue, + idxBlksReadMetric, + labels..., + ) + + idxBlksHitMetric := 0.0 + if idxBlksHit.Valid { + idxBlksHitMetric = idxBlksHit.Float64 + } + ch <- prometheus.MustNewConstMetric( + statioUserIndexesIdxBlksHit, + prometheus.CounterValue, + idxBlksHitMetric, + labels..., + ) + } + if err := rows.Err(); err != nil { + return err + } + return nil +} diff --git a/collector/pg_statio_user_indexes_test.go b/collector/pg_statio_user_indexes_test.go new file mode 100644 index 00000000..17401216 --- /dev/null +++ b/collector/pg_statio_user_indexes_test.go @@ -0,0 +1,109 @@ +// Copyright 2023 The Prometheus 
Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" +) + +func TestPgStatioUserIndexesCollector(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + inst := &instance{db: db} + columns := []string{ + "schemaname", + "relname", + "indexrelname", + "idx_blks_read", + "idx_blks_hit", + } + rows := sqlmock.NewRows(columns). + AddRow("public", "pgtest_accounts", "pgtest_accounts_pkey", 8, 9) + + mock.ExpectQuery(sanitizeQuery(statioUserIndexesQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatioUserIndexesCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGStatioUserIndexesCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{"schemaname": "public", "relname": "pgtest_accounts", "indexrelname": "pgtest_accounts_pkey"}, value: 8, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"schemaname": "public", "relname": "pgtest_accounts", "indexrelname": "pgtest_accounts_pkey"}, value: 9, metricType: dto.MetricType_COUNTER}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} + +func TestPgStatioUserIndexesCollectorNull(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + inst := &instance{db: db} + columns := []string{ + "schemaname", + "relname", + "indexrelname", + "idx_blks_read", + "idx_blks_hit", + } + rows := sqlmock.NewRows(columns). 
+ AddRow(nil, nil, nil, nil, nil) + + mock.ExpectQuery(sanitizeQuery(statioUserIndexesQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatioUserIndexesCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGStatioUserIndexesCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{"schemaname": "unknown", "relname": "unknown", "indexrelname": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"schemaname": "unknown", "relname": "unknown", "indexrelname": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} From ce4ee0507fb644f166021449b105c5d468203874 Mon Sep 17 00:00:00 2001 From: Mathis Raguin Date: Thu, 24 Aug 2023 09:58:41 +0200 Subject: [PATCH 13/14] Update README to reflect changes made in #828 (#894) Signed-off-by: Mathis Raguin --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 88c6c098..5a41f50e 100644 --- a/README.md +++ b/README.md @@ -102,7 +102,7 @@ This will build the docker image as `prometheuscommunity/postgres_exporter:${bra Enable the `postmaster` collector (default: enabled). * `[no-]collector.process_idle` - Enable the `process_idle` collector (default: enabled). + Enable the `process_idle` collector (default: disabled). * `[no-]collector.replication` Enable the `replication` collector (default: enabled). From 589087912638d738d1dfeaffbace77660dc3e52e Mon Sep 17 00:00:00 2001 From: Felix Yuan Date: Fri, 25 Aug 2023 02:20:10 -0700 Subject: [PATCH 14/14] Gitlab Collector: Long running transactions collector and test (#836) * Long running transactions collector and test --------- Signed-off-by: Felix Yuan Co-authored-by: Ben Kochie --- collector/pg_long_running_transactions.go | 93 +++++++++++++++++++ .../pg_long_running_transactions_test.go | 63 +++++++++++++ 2 files changed, 156 insertions(+) create mode 100644 collector/pg_long_running_transactions.go create mode 100644 collector/pg_long_running_transactions_test.go diff --git a/collector/pg_long_running_transactions.go b/collector/pg_long_running_transactions.go new file mode 100644 index 00000000..ffd89d5f --- /dev/null +++ b/collector/pg_long_running_transactions.go @@ -0,0 +1,93 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package collector + +import ( + "context" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" +) + +const longRunningTransactionsSubsystem = "long_running_transactions" + +func init() { + registerCollector(longRunningTransactionsSubsystem, defaultDisabled, NewPGLongRunningTransactionsCollector) +} + +type PGLongRunningTransactionsCollector struct { + log log.Logger +} + +func NewPGLongRunningTransactionsCollector(config collectorConfig) (Collector, error) { + return &PGLongRunningTransactionsCollector{log: config.logger}, nil +} + +var ( + longRunningTransactionsCount = prometheus.NewDesc( + "pg_long_running_transactions", + "Current number of long running transactions", + []string{}, + prometheus.Labels{}, + ) + + longRunningTransactionsAgeInSeconds = prometheus.NewDesc( + prometheus.BuildFQName(namespace, longRunningTransactionsSubsystem, "oldest_timestamp_seconds"), + "The current maximum transaction age in seconds", + []string{}, + prometheus.Labels{}, + ) + + longRunningTransactionsQuery = ` + SELECT + COUNT(*) as transactions, + MAX(EXTRACT(EPOCH FROM clock_timestamp())) AS oldest_timestamp_seconds + FROM pg_catalog.pg_stat_activity + WHERE state is distinct from 'idle' AND query not like 'autovacuum:%' + ` +) + +func (PGLongRunningTransactionsCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + rows, err := db.QueryContext(ctx, + longRunningTransactionsQuery) + + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var transactions, ageInSeconds float64 + + if err := rows.Scan(&transactions, &ageInSeconds); err != nil { + return err + } + + ch <- prometheus.MustNewConstMetric( + longRunningTransactionsCount, + prometheus.GaugeValue, + transactions, + ) + ch <- prometheus.MustNewConstMetric( + longRunningTransactionsAgeInSeconds, + prometheus.GaugeValue, + ageInSeconds, + ) + } + if err := rows.Err(); err != nil { + return err + } + return nil +} diff --git a/collector/pg_long_running_transactions_test.go b/collector/pg_long_running_transactions_test.go new file mode 100644 index 00000000..eedda7c6 --- /dev/null +++ b/collector/pg_long_running_transactions_test.go @@ -0,0 +1,63 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" +) + +func TestPGLongRunningTransactionsCollector(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + inst := &instance{db: db} + columns := []string{ + "transactions", + "age_in_seconds", + } + rows := sqlmock.NewRows(columns). 
+ AddRow(20, 1200) + + mock.ExpectQuery(sanitizeQuery(longRunningTransactionsQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGLongRunningTransactionsCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGLongRunningTransactionsCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{}, value: 20, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{}, value: 1200, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +}
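Note: the collectors introduced in this series are registered with `defaultDisabled`, so they have to be switched on explicitly, e.g. `postgres_exporter --collector.xlog_location --collector.statio_user_indexes --collector.long_running_transactions` (assuming the `--[no-]collector.<name>` flag convention that the README excerpt above documents for `process_idle`).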