diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml index 100cf932..433f71b8 100644 --- a/.github/workflows/golangci-lint.yml +++ b/.github/workflows/golangci-lint.yml @@ -1,3 +1,5 @@ +--- +# This action is synced from https://github.com/prometheus/prometheus name: golangci-lint on: push: @@ -27,4 +29,4 @@ jobs: - name: Lint uses: golangci/golangci-lint-action@v3.4.0 with: - version: v1.51.2 + version: v1.53.3 diff --git a/.golangci.yml b/.golangci.yml index 7a03966a..96487c89 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,6 +1,7 @@ --- linters: enable: + - misspell - revive issues: @@ -14,3 +15,9 @@ linters-settings: exclude-functions: # Never check for logger errors. - (github.com/go-kit/log.Logger).Log + revive: + rules: + # https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#unused-parameter + - name: unused-parameter + severity: warning + disabled: true diff --git a/.yamllint b/.yamllint index 19552574..955a5a62 100644 --- a/.yamllint +++ b/.yamllint @@ -20,5 +20,4 @@ rules: config/testdata/section_key_dup.bad.yml line-length: disable truthy: - ignore: | - .github/workflows/*.yml + check-keys: false diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a13c53e..f9ab3cb2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,13 @@ +## 0.13.2 / 2023-07-21 + +* [BUGFIX] Fix type issues on pg_postmaster metrics #828 +* [BUGFIX] Fix pg_replication collector instantiation #854 +* [BUGFIX] Fix pg_process_idle metrics #855 + +## 0.13.1 / 2023-06-27 + +* [BUGFIX] Make collectors not fail on null values #823 + ## 0.13.0 / 2023-06-21 BREAKING CHANGES: diff --git a/Makefile.common b/Makefile.common index e372d347..0ce7ea46 100644 --- a/Makefile.common +++ b/Makefile.common @@ -55,13 +55,13 @@ ifneq ($(shell command -v gotestsum > /dev/null),) endif endif -PROMU_VERSION ?= 0.14.0 +PROMU_VERSION ?= 0.15.0 PROMU_URL := https://github.com/prometheus/promu/releases/download/v$(PROMU_VERSION)/promu-$(PROMU_VERSION).$(GO_BUILD_PLATFORM).tar.gz SKIP_GOLANGCI_LINT := GOLANGCI_LINT := GOLANGCI_LINT_OPTS ?= -GOLANGCI_LINT_VERSION ?= v1.51.2 +GOLANGCI_LINT_VERSION ?= v1.53.3 # golangci-lint only supports linux, darwin and windows platforms on i386/amd64. # windows isn't included here because of the path separator being different. ifeq ($(GOHOSTOS),$(filter $(GOHOSTOS),linux darwin)) diff --git a/README.md b/README.md index 912ea0c5..88c6c098 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,26 @@ To use the multi-target functionality, send an http request to the endpoint `/pr To avoid putting sensitive information like username and password in the URL, preconfigured auth modules are supported via the [auth_modules](#auth_modules) section of the config file. auth_modules for DSNs can be used with the `/probe` endpoint by specifying the `?auth_module=foo` http parameter. +Example Prometheus config: +```yaml +scrape_configs: + - job_name: 'postgres' + static_configs: + - targets: + - server1:5432 + - server2:5432 + metrics_path: /probe + params: + auth_module: [foo] + relabel_configs: + - source_labels: [__address__] + target_label: __param_target + - source_labels: [__param_target] + target_label: instance + - target_label: __address__ + replacement: 127.0.0.1:9116 # The postgres exporter's real hostname:port. +``` + ## Configuration File The configuration file controls the behavior of the exporter. It can be set using the `--config.file` command line flag and defaults to `postgres_exporter.yml`. @@ -73,7 +93,10 @@ This will build the docker image as `prometheuscommunity/postgres_exporter:${bra * `[no-]collector.database` - Enable the database collector (default: enabled). + Enable the `database` collector (default: enabled). + +* `[no-]collector.locks` + Enable the `locks` collector (default: enabled). * `[no-]collector.postmaster` Enable the `postmaster` collector (default: enabled). diff --git a/VERSION b/VERSION index 54d1a4f2..c317a918 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.13.0 +0.13.1 diff --git a/cmd/postgres_exporter/postgres_exporter.go b/cmd/postgres_exporter/postgres_exporter.go index 6b5908e4..fa34eecc 100644 --- a/cmd/postgres_exporter/postgres_exporter.go +++ b/cmd/postgres_exporter/postgres_exporter.go @@ -176,15 +176,6 @@ var builtinMetricMaps = map[string]intermediateMetricMap{ true, 0, }, - "pg_locks": { - map[string]ColumnMapping{ - "datname": {LABEL, "Name of this database", nil, nil}, - "mode": {LABEL, "Type of Lock", nil, nil}, - "count": {GAUGE, "Number of locks", nil, nil}, - }, - true, - 0, - }, "pg_stat_replication": { map[string]ColumnMapping{ "procpid": {DISCARD, "Process ID of a WAL sender process", nil, semver.MustParseRange("<9.2.0")}, diff --git a/cmd/postgres_exporter/queries.go b/cmd/postgres_exporter/queries.go index 14c14b4f..fa0b5c27 100644 --- a/cmd/postgres_exporter/queries.go +++ b/cmd/postgres_exporter/queries.go @@ -46,31 +46,6 @@ type OverrideQuery struct { // Overriding queries for namespaces above. // TODO: validate this is a closed set in tests, and there are no overlaps var queryOverrides = map[string][]OverrideQuery{ - "pg_locks": { - { - semver.MustParseRange(">0.0.0"), - `SELECT pg_database.datname,tmp.mode,COALESCE(count,0) as count - FROM - ( - VALUES ('accesssharelock'), - ('rowsharelock'), - ('rowexclusivelock'), - ('shareupdateexclusivelock'), - ('sharelock'), - ('sharerowexclusivelock'), - ('exclusivelock'), - ('accessexclusivelock'), - ('sireadlock') - ) AS tmp(mode) CROSS JOIN pg_database - LEFT JOIN - (SELECT database, lower(mode) AS mode,count(*) AS count - FROM pg_locks WHERE database IS NOT NULL - GROUP BY database, lower(mode) - ) AS tmp2 - ON tmp.mode=tmp2.mode and pg_database.oid = tmp2.database ORDER BY 1`, - }, - }, - "pg_stat_replication": { { semver.MustParseRange(">=10.0.0"), diff --git a/collector/collector_test.go b/collector/collector_test.go index 061de889..18101f00 100644 --- a/collector/collector_test.go +++ b/collector/collector_test.go @@ -49,8 +49,14 @@ func readMetric(m prometheus.Metric) MetricResult { func sanitizeQuery(q string) string { q = strings.Join(strings.Fields(q), " ") q = strings.Replace(q, "(", "\\(", -1) + q = strings.Replace(q, "?", "\\?", -1) q = strings.Replace(q, ")", "\\)", -1) + q = strings.Replace(q, "[", "\\[", -1) + q = strings.Replace(q, "]", "\\]", -1) + q = strings.Replace(q, "{", "\\{", -1) + q = strings.Replace(q, "}", "\\}", -1) q = strings.Replace(q, "*", "\\*", -1) + q = strings.Replace(q, "^", "\\^", -1) q = strings.Replace(q, "$", "\\$", -1) return q } diff --git a/collector/pg_database.go b/collector/pg_database.go index a4ea50d0..d2c4b206 100644 --- a/collector/pg_database.go +++ b/collector/pg_database.go @@ -15,6 +15,7 @@ package collector import ( "context" + "database/sql" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" @@ -79,38 +80,42 @@ func (c PGDatabaseCollector) Update(ctx context.Context, instance *instance, ch var databases []string for rows.Next() { - var datname string + var datname sql.NullString if err := rows.Scan(&datname); err != nil { return err } + if !datname.Valid { + continue + } // Ignore excluded databases // Filtering is done here instead of in the query to avoid // a complicated NOT IN query with a variable number of parameters - if sliceContains(c.excludedDatabases, datname) { + if sliceContains(c.excludedDatabases, datname.String) { continue } - databases = append(databases, datname) + databases = append(databases, datname.String) } // Query the size of the databases for _, datname := range databases { - var size int64 + var size sql.NullFloat64 err = db.QueryRowContext(ctx, pgDatabaseSizeQuery, datname).Scan(&size) if err != nil { return err } + sizeMetric := 0.0 + if size.Valid { + sizeMetric = size.Float64 + } ch <- prometheus.MustNewConstMetric( pgDatabaseSizeDesc, - prometheus.GaugeValue, float64(size), datname, + prometheus.GaugeValue, sizeMetric, datname, ) } - if err := rows.Err(); err != nil { - return err - } - return nil + return rows.Err() } func sliceContains(slice []string, s string) bool { diff --git a/collector/pg_database_test.go b/collector/pg_database_test.go index 058a6d25..b5052c5d 100644 --- a/collector/pg_database_test.go +++ b/collector/pg_database_test.go @@ -59,3 +59,43 @@ func TestPGDatabaseCollector(t *testing.T) { t.Errorf("there were unfulfilled exceptions: %s", err) } } + +// TODO add a null db test + +func TestPGDatabaseCollectorNullMetric(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db} + + mock.ExpectQuery(sanitizeQuery(pgDatabaseQuery)).WillReturnRows(sqlmock.NewRows([]string{"datname"}). + AddRow("postgres")) + + mock.ExpectQuery(sanitizeQuery(pgDatabaseSizeQuery)).WithArgs("postgres").WillReturnRows(sqlmock.NewRows([]string{"pg_database_size"}). + AddRow(nil)) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGDatabaseCollector{} + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGDatabaseCollector.Update: %s", err) + } + }() + + expected := []MetricResult{ + {labels: labelMap{"datname": "postgres"}, value: 0, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} diff --git a/collector/pg_database_wraparound.go b/collector/pg_database_wraparound.go new file mode 100644 index 00000000..d4627063 --- /dev/null +++ b/collector/pg_database_wraparound.go @@ -0,0 +1,115 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + "database/sql" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" +) + +const databaseWraparoundSubsystem = "database_wraparound" + +func init() { + registerCollector(databaseWraparoundSubsystem, defaultDisabled, NewPGDatabaseWraparoundCollector) +} + +type PGDatabaseWraparoundCollector struct { + log log.Logger +} + +func NewPGDatabaseWraparoundCollector(config collectorConfig) (Collector, error) { + return &PGDatabaseWraparoundCollector{log: config.logger}, nil +} + +var ( + databaseWraparoundAgeDatfrozenxid = prometheus.NewDesc( + prometheus.BuildFQName(namespace, databaseWraparoundSubsystem, "age_datfrozenxid_seconds"), + "Age of the oldest transaction ID that has not been frozen.", + []string{"datname"}, + prometheus.Labels{}, + ) + databaseWraparoundAgeDatminmxid = prometheus.NewDesc( + prometheus.BuildFQName(namespace, databaseWraparoundSubsystem, "age_datminmxid_seconds"), + "Age of the oldest multi-transaction ID that has been replaced with a transaction ID.", + []string{"datname"}, + prometheus.Labels{}, + ) + + databaseWraparoundQuery = ` + SELECT + datname, + age(d.datfrozenxid) as age_datfrozenxid, + mxid_age(d.datminmxid) as age_datminmxid + FROM + pg_catalog.pg_database d + WHERE + d.datallowconn + ` +) + +func (c *PGDatabaseWraparoundCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + rows, err := db.QueryContext(ctx, + databaseWraparoundQuery) + + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var datname sql.NullString + var ageDatfrozenxid, ageDatminmxid sql.NullFloat64 + + if err := rows.Scan(&datname, &ageDatfrozenxid, &ageDatminmxid); err != nil { + return err + } + + if !datname.Valid { + level.Debug(c.log).Log("msg", "Skipping database with NULL name") + continue + } + if !ageDatfrozenxid.Valid { + level.Debug(c.log).Log("msg", "Skipping stat emission with NULL age_datfrozenxid") + continue + } + if !ageDatminmxid.Valid { + level.Debug(c.log).Log("msg", "Skipping stat emission with NULL age_datminmxid") + continue + } + + ageDatfrozenxidMetric := ageDatfrozenxid.Float64 + + ch <- prometheus.MustNewConstMetric( + databaseWraparoundAgeDatfrozenxid, + prometheus.GaugeValue, + ageDatfrozenxidMetric, datname.String, + ) + + ageDatminmxidMetric := ageDatminmxid.Float64 + ch <- prometheus.MustNewConstMetric( + databaseWraparoundAgeDatminmxid, + prometheus.GaugeValue, + ageDatminmxidMetric, datname.String, + ) + } + if err := rows.Err(); err != nil { + return err + } + return nil +} diff --git a/collector/pg_database_wraparound_test.go b/collector/pg_database_wraparound_test.go new file mode 100644 index 00000000..d0a74c36 --- /dev/null +++ b/collector/pg_database_wraparound_test.go @@ -0,0 +1,64 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" +) + +func TestPGDatabaseWraparoundCollector(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + inst := &instance{db: db} + columns := []string{ + "datname", + "age_datfrozenxid", + "age_datminmxid", + } + rows := sqlmock.NewRows(columns). + AddRow("newreddit", 87126426, 0) + + mock.ExpectQuery(sanitizeQuery(databaseWraparoundQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGDatabaseWraparoundCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGDatabaseWraparoundCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{"datname": "newreddit"}, value: 87126426, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"datname": "newreddit"}, value: 0, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} diff --git a/collector/pg_locks.go b/collector/pg_locks.go new file mode 100644 index 00000000..d2c77ccd --- /dev/null +++ b/collector/pg_locks.go @@ -0,0 +1,129 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + "database/sql" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" +) + +const locksSubsystem = "locks" + +func init() { + registerCollector(locksSubsystem, defaultEnabled, NewPGLocksCollector) +} + +type PGLocksCollector struct { + log log.Logger +} + +func NewPGLocksCollector(config collectorConfig) (Collector, error) { + return &PGLocksCollector{ + log: config.logger, + }, nil +} + +var ( + pgLocksDesc = prometheus.NewDesc( + prometheus.BuildFQName( + namespace, + locksSubsystem, + "count", + ), + "Number of locks", + []string{"datname", "mode"}, nil, + ) + + pgLocksQuery = ` + SELECT + pg_database.datname as datname, + tmp.mode as mode, + COALESCE(count, 0) as count + FROM + ( + VALUES + ('accesssharelock'), + ('rowsharelock'), + ('rowexclusivelock'), + ('shareupdateexclusivelock'), + ('sharelock'), + ('sharerowexclusivelock'), + ('exclusivelock'), + ('accessexclusivelock'), + ('sireadlock') + ) AS tmp(mode) + CROSS JOIN pg_database + LEFT JOIN ( + SELECT + database, + lower(mode) AS mode, + count(*) AS count + FROM + pg_locks + WHERE + database IS NOT NULL + GROUP BY + database, + lower(mode) + ) AS tmp2 ON tmp.mode = tmp2.mode + and pg_database.oid = tmp2.database + ORDER BY + 1 + ` +) + +// Update implements Collector and exposes database locks. +// It is called by the Prometheus registry when collecting metrics. +func (c PGLocksCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + // Query the list of databases + rows, err := db.QueryContext(ctx, + pgLocksQuery, + ) + if err != nil { + return err + } + defer rows.Close() + + var datname, mode sql.NullString + var count sql.NullInt64 + + for rows.Next() { + if err := rows.Scan(&datname, &mode, &count); err != nil { + return err + } + + if !datname.Valid || !mode.Valid { + continue + } + + countMetric := 0.0 + if count.Valid { + countMetric = float64(count.Int64) + } + + ch <- prometheus.MustNewConstMetric( + pgLocksDesc, + prometheus.GaugeValue, countMetric, + datname.String, mode.String, + ) + } + if err := rows.Err(); err != nil { + return err + } + return nil +} diff --git a/collector/pg_locks_test.go b/collector/pg_locks_test.go new file mode 100644 index 00000000..99597ea2 --- /dev/null +++ b/collector/pg_locks_test.go @@ -0,0 +1,60 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" +) + +func TestPGLocksCollector(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db} + + rows := sqlmock.NewRows([]string{"datname", "mode", "count"}). + AddRow("test", "exclusivelock", 42) + + mock.ExpectQuery(sanitizeQuery(pgLocksQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGLocksCollector{} + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGLocksCollector.Update: %s", err) + } + }() + + expected := []MetricResult{ + {labels: labelMap{"datname": "test", "mode": "exclusivelock"}, value: 42, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} diff --git a/collector/pg_postmaster.go b/collector/pg_postmaster.go index eae82d56..b81e4f90 100644 --- a/collector/pg_postmaster.go +++ b/collector/pg_postmaster.go @@ -15,6 +15,7 @@ package collector import ( "context" + "database/sql" "github.com/prometheus/client_golang/prometheus" ) @@ -22,7 +23,7 @@ import ( const postmasterSubsystem = "postmaster" func init() { - registerCollector(postmasterSubsystem, defaultEnabled, NewPGPostmasterCollector) + registerCollector(postmasterSubsystem, defaultDisabled, NewPGPostmasterCollector) } type PGPostmasterCollector struct { @@ -43,7 +44,7 @@ var ( []string{}, nil, ) - pgPostmasterQuery = "SELECT pg_postmaster_start_time from pg_postmaster_start_time();" + pgPostmasterQuery = "SELECT extract(epoch from pg_postmaster_start_time) from pg_postmaster_start_time();" ) func (c *PGPostmasterCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { @@ -51,14 +52,18 @@ func (c *PGPostmasterCollector) Update(ctx context.Context, instance *instance, row := db.QueryRowContext(ctx, pgPostmasterQuery) - var startTimeSeconds float64 + var startTimeSeconds sql.NullFloat64 err := row.Scan(&startTimeSeconds) if err != nil { return err } + startTimeSecondsMetric := 0.0 + if startTimeSeconds.Valid { + startTimeSecondsMetric = startTimeSeconds.Float64 + } ch <- prometheus.MustNewConstMetric( pgPostMasterStartTimeSeconds, - prometheus.GaugeValue, startTimeSeconds, + prometheus.GaugeValue, startTimeSecondsMetric, ) return nil } diff --git a/collector/pg_postmaster_test.go b/collector/pg_postmaster_test.go index c40fe03a..8405b422 100644 --- a/collector/pg_postmaster_test.go +++ b/collector/pg_postmaster_test.go @@ -57,3 +57,39 @@ func TestPgPostmasterCollector(t *testing.T) { t.Errorf("there were unfulfilled exceptions: %s", err) } } + +func TestPgPostmasterCollectorNullTime(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db} + + mock.ExpectQuery(sanitizeQuery(pgPostmasterQuery)).WillReturnRows(sqlmock.NewRows([]string{"pg_postmaster_start_time"}). + AddRow(nil)) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGPostmasterCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGPostmasterCollector.Update: %s", err) + } + }() + + expected := []MetricResult{ + {labels: labelMap{}, value: 0, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} diff --git a/collector/pg_process_idle.go b/collector/pg_process_idle.go index 06244975..c401ab56 100644 --- a/collector/pg_process_idle.go +++ b/collector/pg_process_idle.go @@ -15,21 +15,24 @@ package collector import ( "context" + "database/sql" "github.com/go-kit/log" + "github.com/lib/pq" "github.com/prometheus/client_golang/prometheus" ) -const processIdleSubsystem = "process_idle" - func init() { - registerCollector(processIdleSubsystem, defaultEnabled, NewPGProcessIdleCollector) + // Making this default disabled because we have no tests for it + registerCollector(processIdleSubsystem, defaultDisabled, NewPGProcessIdleCollector) } type PGProcessIdleCollector struct { log log.Logger } +const processIdleSubsystem = "process_idle" + func NewPGProcessIdleCollector(config collectorConfig) (Collector, error) { return &PGProcessIdleCollector{log: config.logger}, nil } @@ -37,7 +40,7 @@ func NewPGProcessIdleCollector(config collectorConfig) (Collector, error) { var pgProcessIdleSeconds = prometheus.NewDesc( prometheus.BuildFQName(namespace, processIdleSubsystem, "seconds"), "Idle time of server processes", - []string{"application_name"}, + []string{"state", "application_name"}, prometheus.Labels{}, ) @@ -47,15 +50,17 @@ func (PGProcessIdleCollector) Update(ctx context.Context, instance *instance, ch `WITH metrics AS ( SELECT + state, application_name, SUM(EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - state_change))::bigint)::float AS process_idle_seconds_sum, COUNT(*) AS process_idle_seconds_count FROM pg_stat_activity - WHERE state = 'idle' - GROUP BY application_name + WHERE state ~ '^idle' + GROUP BY state, application_name ), buckets AS ( SELECT + state, application_name, le, SUM( @@ -67,40 +72,61 @@ func (PGProcessIdleCollector) Update(ctx context.Context, instance *instance, ch FROM pg_stat_activity, UNNEST(ARRAY[1, 2, 5, 15, 30, 60, 90, 120, 300]) AS le - GROUP BY application_name, le - ORDER BY application_name, le + GROUP BY state, application_name, le + ORDER BY state, application_name, le ) SELECT + state, application_name, process_idle_seconds_sum as seconds_sum, process_idle_seconds_count as seconds_count, ARRAY_AGG(le) AS seconds, ARRAY_AGG(bucket) AS seconds_bucket - FROM metrics JOIN buckets USING (application_name) - GROUP BY 1, 2, 3;`) + FROM metrics JOIN buckets USING (state, application_name) + GROUP BY 1, 2, 3, 4;`) - var applicationName string - var secondsSum int64 - var secondsCount uint64 - var seconds []int64 - var secondsBucket []uint64 + var state sql.NullString + var applicationName sql.NullString + var secondsSum sql.NullFloat64 + var secondsCount sql.NullInt64 + var seconds []float64 + var secondsBucket []int64 - err := row.Scan(&applicationName, &secondsSum, &secondsCount, &seconds, &secondsBucket) + err := row.Scan(&state, &applicationName, &secondsSum, &secondsCount, pq.Array(&seconds), pq.Array(&secondsBucket)) + if err != nil { + return err + } var buckets = make(map[float64]uint64, len(seconds)) for i, second := range seconds { if i >= len(secondsBucket) { break } - buckets[float64(second)] = secondsBucket[i] + buckets[second] = uint64(secondsBucket[i]) } - if err != nil { - return err + + stateLabel := "unknown" + if state.Valid { + stateLabel = state.String + } + + applicationNameLabel := "unknown" + if applicationName.Valid { + applicationNameLabel = applicationName.String + } + + var secondsCountMetric uint64 + if secondsCount.Valid { + secondsCountMetric = uint64(secondsCount.Int64) + } + secondsSumMetric := 0.0 + if secondsSum.Valid { + secondsSumMetric = secondsSum.Float64 } ch <- prometheus.MustNewConstHistogram( pgProcessIdleSeconds, - secondsCount, float64(secondsSum), buckets, - applicationName, + secondsCountMetric, secondsSumMetric, buckets, + stateLabel, applicationNameLabel, ) return nil } diff --git a/collector/pg_replication.go b/collector/pg_replication.go index 1a8a3569..790f8532 100644 --- a/collector/pg_replication.go +++ b/collector/pg_replication.go @@ -15,7 +15,6 @@ package collector import ( "context" - "database/sql" "github.com/prometheus/client_golang/prometheus" ) @@ -30,7 +29,7 @@ type PGReplicationCollector struct { } func NewPGReplicationCollector(collectorConfig) (Collector, error) { - return &PGPostmasterCollector{}, nil + return &PGReplicationCollector{}, nil } var ( @@ -64,7 +63,8 @@ var ( END as is_replica` ) -func (c *PGReplicationCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { +func (c *PGReplicationCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() row := db.QueryRowContext(ctx, pgReplicationQuery, ) diff --git a/collector/pg_replication_slot.go b/collector/pg_replication_slot.go index 4278923f..c625fd4d 100644 --- a/collector/pg_replication_slot.go +++ b/collector/pg_replication_slot.go @@ -15,6 +15,7 @@ package collector import ( "context" + "database/sql" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" @@ -65,11 +66,14 @@ var ( pgReplicationSlotQuery = `SELECT slot_name, - pg_current_wal_lsn() - '0/0' AS current_wal_lsn, - coalesce(confirmed_flush_lsn, '0/0') - '0/0', + CASE WHEN pg_is_in_recovery() THEN + pg_last_wal_receive_lsn() - '0/0' + ELSE + pg_current_wal_lsn() - '0/0' + END AS current_wal_lsn, + COALESCE(confirmed_flush_lsn, '0/0') - '0/0', active - FROM - pg_replication_slots;` + FROM pg_replication_slots;` ) func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { @@ -82,36 +86,45 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance defer rows.Close() for rows.Next() { - var slotName string - var walLSN int64 - var flushLSN int64 - var isActive bool + var slotName sql.NullString + var walLSN sql.NullFloat64 + var flushLSN sql.NullFloat64 + var isActive sql.NullBool if err := rows.Scan(&slotName, &walLSN, &flushLSN, &isActive); err != nil { return err } - isActiveValue := 0 - if isActive { - isActiveValue = 1 + isActiveValue := 0.0 + if isActive.Valid && isActive.Bool { + isActiveValue = 1.0 + } + slotNameLabel := "unknown" + if slotName.Valid { + slotNameLabel = slotName.String } + var walLSNMetric float64 + if walLSN.Valid { + walLSNMetric = walLSN.Float64 + } ch <- prometheus.MustNewConstMetric( pgReplicationSlotCurrentWalDesc, - prometheus.GaugeValue, float64(walLSN), slotName, + prometheus.GaugeValue, walLSNMetric, slotNameLabel, ) - if isActive { + if isActive.Valid && isActive.Bool { + var flushLSNMetric float64 + if flushLSN.Valid { + flushLSNMetric = flushLSN.Float64 + } ch <- prometheus.MustNewConstMetric( pgReplicationSlotCurrentFlushDesc, - prometheus.GaugeValue, float64(flushLSN), slotName, + prometheus.GaugeValue, flushLSNMetric, slotNameLabel, ) } ch <- prometheus.MustNewConstMetric( pgReplicationSlotIsActiveDesc, - prometheus.GaugeValue, float64(isActiveValue), slotName, + prometheus.GaugeValue, isActiveValue, slotNameLabel, ) } - if err := rows.Err(); err != nil { - return err - } - return nil + return rows.Err() } diff --git a/collector/pg_replication_slot_test.go b/collector/pg_replication_slot_test.go index cb25b755..7e91ea26 100644 --- a/collector/pg_replication_slot_test.go +++ b/collector/pg_replication_slot_test.go @@ -103,3 +103,84 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) { } } + +func TestPgReplicationSlotCollectorActiveNil(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db} + + columns := []string{"slot_name", "current_wal_lsn", "confirmed_flush_lsn", "active"} + rows := sqlmock.NewRows(columns). + AddRow("test_slot", 6, 12, nil) + mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGReplicationSlotCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGReplicationSlotCollector.Update: %s", err) + } + }() + + expected := []MetricResult{ + {labels: labelMap{"slot_name": "test_slot"}, value: 6, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"slot_name": "test_slot"}, value: 0, metricType: dto.MetricType_GAUGE}, + } + + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} + +func TestPgReplicationSlotCollectorTestNilValues(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db} + + columns := []string{"slot_name", "current_wal_lsn", "confirmed_flush_lsn", "active"} + rows := sqlmock.NewRows(columns). + AddRow(nil, nil, nil, true) + mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGReplicationSlotCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGReplicationSlotCollector.Update: %s", err) + } + }() + + expected := []MetricResult{ + {labels: labelMap{"slot_name": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"slot_name": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"slot_name": "unknown"}, value: 1, metricType: dto.MetricType_GAUGE}, + } + + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} diff --git a/collector/pg_replication_test.go b/collector/pg_replication_test.go index 4d240cdf..b6df698e 100644 --- a/collector/pg_replication_test.go +++ b/collector/pg_replication_test.go @@ -29,6 +29,8 @@ func TestPgReplicationCollector(t *testing.T) { } defer db.Close() + inst := &instance{db: db} + columns := []string{"lag", "is_replica"} rows := sqlmock.NewRows(columns). AddRow(1000, 1) @@ -39,7 +41,7 @@ func TestPgReplicationCollector(t *testing.T) { defer close(ch) c := PGReplicationCollector{} - if err := c.Update(context.Background(), db, ch); err != nil { + if err := c.Update(context.Background(), inst, ch); err != nil { t.Errorf("Error calling PGReplicationCollector.Update: %s", err) } }() diff --git a/collector/pg_stat_activity_autovacuum.go b/collector/pg_stat_activity_autovacuum.go new file mode 100644 index 00000000..5e2d2d2c --- /dev/null +++ b/collector/pg_stat_activity_autovacuum.go @@ -0,0 +1,84 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" +) + +const statActivityAutovacuumSubsystem = "stat_activity_autovacuum" + +func init() { + registerCollector(statActivityAutovacuumSubsystem, defaultDisabled, NewPGStatActivityAutovacuumCollector) +} + +type PGStatActivityAutovacuumCollector struct { + log log.Logger +} + +func NewPGStatActivityAutovacuumCollector(config collectorConfig) (Collector, error) { + return &PGStatActivityAutovacuumCollector{log: config.logger}, nil +} + +var ( + statActivityAutovacuumAgeInSeconds = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statActivityAutovacuumSubsystem, "timestamp_seconds"), + "Start timestamp of the vacuum process in seconds", + []string{"relname"}, + prometheus.Labels{}, + ) + + statActivityAutovacuumQuery = ` + SELECT + SPLIT_PART(query, '.', 2) AS relname, + EXTRACT(xact_start) AS timestamp_seconds + FROM + pg_catalog.pg_stat_activity + WHERE + query LIKE 'autovacuum:%' + ` +) + +func (PGStatActivityAutovacuumCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + rows, err := db.QueryContext(ctx, + statActivityAutovacuumQuery) + + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var relname string + var ageInSeconds float64 + + if err := rows.Scan(&relname, &ageInSeconds); err != nil { + return err + } + + ch <- prometheus.MustNewConstMetric( + statActivityAutovacuumAgeInSeconds, + prometheus.GaugeValue, + ageInSeconds, relname, + ) + } + if err := rows.Err(); err != nil { + return err + } + return nil +} diff --git a/collector/pg_stat_activity_autovacuum_test.go b/collector/pg_stat_activity_autovacuum_test.go new file mode 100644 index 00000000..a6fcdbca --- /dev/null +++ b/collector/pg_stat_activity_autovacuum_test.go @@ -0,0 +1,62 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" +) + +func TestPGStatActivityAutovacuumCollector(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + inst := &instance{db: db} + columns := []string{ + "relname", + "timestamp_seconds", + } + rows := sqlmock.NewRows(columns). + AddRow("test", 3600) + + mock.ExpectQuery(sanitizeQuery(statActivityAutovacuumQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatActivityAutovacuumCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGStatActivityAutovacuumCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{"relname": "test"}, value: 3600, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} diff --git a/collector/pg_stat_bgwriter.go b/collector/pg_stat_bgwriter.go index 2bdef8d4..ec446d58 100644 --- a/collector/pg_stat_bgwriter.go +++ b/collector/pg_stat_bgwriter.go @@ -15,7 +15,7 @@ package collector import ( "context" - "time" + "database/sql" "github.com/prometheus/client_golang/prometheus" ) @@ -121,77 +121,113 @@ func (PGStatBGWriterCollector) Update(ctx context.Context, instance *instance, c row := db.QueryRowContext(ctx, statBGWriterQuery) - var cpt int - var cpr int - var cpwt float64 - var cpst float64 - var bcp int - var bc int - var mwc int - var bb int - var bbf int - var ba int - var sr time.Time + var cpt, cpr, bcp, bc, mwc, bb, bbf, ba sql.NullInt64 + var cpwt, cpst sql.NullFloat64 + var sr sql.NullTime err := row.Scan(&cpt, &cpr, &cpwt, &cpst, &bcp, &bc, &mwc, &bb, &bbf, &ba, &sr) if err != nil { return err } + cptMetric := 0.0 + if cpt.Valid { + cptMetric = float64(cpt.Int64) + } ch <- prometheus.MustNewConstMetric( statBGWriterCheckpointsTimedDesc, prometheus.CounterValue, - float64(cpt), + cptMetric, ) + cprMetric := 0.0 + if cpr.Valid { + cprMetric = float64(cpr.Int64) + } ch <- prometheus.MustNewConstMetric( statBGWriterCheckpointsReqDesc, prometheus.CounterValue, - float64(cpr), + cprMetric, ) + cpwtMetric := 0.0 + if cpwt.Valid { + cpwtMetric = float64(cpwt.Float64) + } ch <- prometheus.MustNewConstMetric( statBGWriterCheckpointsReqTimeDesc, prometheus.CounterValue, - float64(cpwt), + cpwtMetric, ) + cpstMetric := 0.0 + if cpst.Valid { + cpstMetric = float64(cpst.Float64) + } ch <- prometheus.MustNewConstMetric( statBGWriterCheckpointsSyncTimeDesc, prometheus.CounterValue, - float64(cpst), + cpstMetric, ) + bcpMetric := 0.0 + if bcp.Valid { + bcpMetric = float64(bcp.Int64) + } ch <- prometheus.MustNewConstMetric( statBGWriterBuffersCheckpointDesc, prometheus.CounterValue, - float64(bcp), + bcpMetric, ) + bcMetric := 0.0 + if bc.Valid { + bcMetric = float64(bc.Int64) + } ch <- prometheus.MustNewConstMetric( statBGWriterBuffersCleanDesc, prometheus.CounterValue, - float64(bc), + bcMetric, ) + mwcMetric := 0.0 + if mwc.Valid { + mwcMetric = float64(mwc.Int64) + } ch <- prometheus.MustNewConstMetric( statBGWriterMaxwrittenCleanDesc, prometheus.CounterValue, - float64(mwc), + mwcMetric, ) + bbMetric := 0.0 + if bb.Valid { + bbMetric = float64(bb.Int64) + } ch <- prometheus.MustNewConstMetric( statBGWriterBuffersBackendDesc, prometheus.CounterValue, - float64(bb), + bbMetric, ) + bbfMetric := 0.0 + if bbf.Valid { + bbfMetric = float64(bbf.Int64) + } ch <- prometheus.MustNewConstMetric( statBGWriterBuffersBackendFsyncDesc, prometheus.CounterValue, - float64(bbf), + bbfMetric, ) + baMetric := 0.0 + if ba.Valid { + baMetric = float64(ba.Int64) + } ch <- prometheus.MustNewConstMetric( statBGWriterBuffersAllocDesc, prometheus.CounterValue, - float64(ba), + baMetric, ) + srMetric := 0.0 + if sr.Valid { + srMetric = float64(sr.Time.Unix()) + } ch <- prometheus.MustNewConstMetric( statBGWriterStatsResetDesc, prometheus.CounterValue, - float64(sr.Unix()), + srMetric, ) return nil diff --git a/collector/pg_stat_bgwriter_test.go b/collector/pg_stat_bgwriter_test.go index 11f55f6b..1c2cf98d 100644 --- a/collector/pg_stat_bgwriter_test.go +++ b/collector/pg_stat_bgwriter_test.go @@ -51,7 +51,7 @@ func TestPGStatBGWriterCollector(t *testing.T) { } rows := sqlmock.NewRows(columns). - AddRow(354, 4945, 289097744, 1242257, 3275602074, 89320867, 450139, 2034563757, 0, 2725688749, srT) + AddRow(354, 4945, 289097744, 1242257, int64(3275602074), 89320867, 450139, 2034563757, 0, int64(2725688749), srT) mock.ExpectQuery(sanitizeQuery(statBGWriterQuery)).WillReturnRows(rows) ch := make(chan prometheus.Metric) @@ -88,3 +88,64 @@ func TestPGStatBGWriterCollector(t *testing.T) { t.Errorf("there were unfulfilled exceptions: %s", err) } } + +func TestPGStatBGWriterCollectorNullValues(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db} + + columns := []string{ + "checkpoints_timed", + "checkpoints_req", + "checkpoint_write_time", + "checkpoint_sync_time", + "buffers_checkpoint", + "buffers_clean", + "maxwritten_clean", + "buffers_backend", + "buffers_backend_fsync", + "buffers_alloc", + "stats_reset"} + + rows := sqlmock.NewRows(columns). + AddRow(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) + mock.ExpectQuery(sanitizeQuery(statBGWriterQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatBGWriterCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGStatBGWriterCollector.Update: %s", err) + } + }() + + expected := []MetricResult{ + {labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0}, + } + + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} diff --git a/collector/pg_stat_database.go b/collector/pg_stat_database.go index bb39a84b..382ff782 100644 --- a/collector/pg_stat_database.go +++ b/collector/pg_stat_database.go @@ -17,6 +17,8 @@ import ( "context" "database/sql" + "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" ) @@ -26,10 +28,12 @@ func init() { registerCollector(statDatabaseSubsystem, defaultEnabled, NewPGStatDatabaseCollector) } -type PGStatDatabaseCollector struct{} +type PGStatDatabaseCollector struct { + log log.Logger +} func NewPGStatDatabaseCollector(config collectorConfig) (Collector, error) { - return &PGStatDatabaseCollector{}, nil + return &PGStatDatabaseCollector{log: config.logger}, nil } var ( @@ -202,12 +206,9 @@ var ( []string{"datid", "datname"}, prometheus.Labels{}, ) -) -func (PGStatDatabaseCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { - db := instance.getDB() - rows, err := db.QueryContext(ctx, - `SELECT + statDatabaseQuery = ` + SELECT datid ,datname ,numbackends @@ -228,7 +229,13 @@ func (PGStatDatabaseCollector) Update(ctx context.Context, instance *instance, c ,blk_write_time ,stats_reset FROM pg_stat_database; - `, + ` +) + +func (c *PGStatDatabaseCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + rows, err := db.QueryContext(ctx, + statDatabaseQuery, ) if err != nil { return err @@ -236,24 +243,8 @@ func (PGStatDatabaseCollector) Update(ctx context.Context, instance *instance, c defer rows.Close() for rows.Next() { - var datid string - var datname string - var numBackends float64 - var xactCommit float64 - var xactRollback float64 - var blksRead float64 - var blksHit float64 - var tupReturned float64 - var tupFetched float64 - var tupInserted float64 - var tupUpdated float64 - var tupDeleted float64 - var conflicts float64 - var tempFiles float64 - var tempBytes float64 - var deadlocks float64 - var blkReadTime float64 - var blkWriteTime float64 + var datid, datname sql.NullString + var numBackends, xactCommit, xactRollback, blksRead, blksHit, tupReturned, tupFetched, tupInserted, tupUpdated, tupDeleted, conflicts, tempFiles, tempBytes, deadlocks, blkReadTime, blkWriteTime sql.NullFloat64 var statsReset sql.NullTime err := rows.Scan( @@ -281,151 +272,203 @@ func (PGStatDatabaseCollector) Update(ctx context.Context, instance *instance, c return err } + if !datid.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no datid") + continue + } + if !datname.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no datname") + continue + } + if !numBackends.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no numbackends") + continue + } + if !xactCommit.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no xact_commit") + continue + } + if !xactRollback.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no xact_rollback") + continue + } + if !blksRead.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no blks_read") + continue + } + if !blksHit.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no blks_hit") + continue + } + if !tupReturned.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no tup_returned") + continue + } + if !tupFetched.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no tup_fetched") + continue + } + if !tupInserted.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no tup_inserted") + continue + } + if !tupUpdated.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no tup_updated") + continue + } + if !tupDeleted.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no tup_deleted") + continue + } + if !conflicts.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no conflicts") + continue + } + if !tempFiles.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no temp_files") + continue + } + if !tempBytes.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no temp_bytes") + continue + } + if !deadlocks.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no deadlocks") + continue + } + if !blkReadTime.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no blk_read_time") + continue + } + if !blkWriteTime.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no blk_write_time") + continue + } + if !statsReset.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no stats_reset") + continue + } + + labels := []string{datid.String, datname.String} + ch <- prometheus.MustNewConstMetric( statDatabaseNumbackends, prometheus.GaugeValue, - numBackends, - datid, - datname, + numBackends.Float64, + labels..., ) ch <- prometheus.MustNewConstMetric( statDatabaseXactCommit, prometheus.CounterValue, - xactCommit, - datid, - datname, + xactCommit.Float64, + labels..., ) ch <- prometheus.MustNewConstMetric( statDatabaseXactRollback, prometheus.CounterValue, - xactRollback, - datid, - datname, + xactRollback.Float64, + labels..., ) ch <- prometheus.MustNewConstMetric( statDatabaseBlksRead, prometheus.CounterValue, - blksRead, - datid, - datname, + blksRead.Float64, + labels..., ) ch <- prometheus.MustNewConstMetric( statDatabaseBlksHit, prometheus.CounterValue, - blksHit, - datid, - datname, + blksHit.Float64, + labels..., ) ch <- prometheus.MustNewConstMetric( statDatabaseTupReturned, prometheus.CounterValue, - tupReturned, - datid, - datname, + tupReturned.Float64, + labels..., ) ch <- prometheus.MustNewConstMetric( statDatabaseTupFetched, prometheus.CounterValue, - tupFetched, - datid, - datname, + tupFetched.Float64, + labels..., ) ch <- prometheus.MustNewConstMetric( statDatabaseTupInserted, prometheus.CounterValue, - tupInserted, - datid, - datname, + tupInserted.Float64, + labels..., ) ch <- prometheus.MustNewConstMetric( statDatabaseTupUpdated, prometheus.CounterValue, - tupUpdated, - datid, - datname, + tupUpdated.Float64, + labels..., ) ch <- prometheus.MustNewConstMetric( statDatabaseTupDeleted, prometheus.CounterValue, - tupDeleted, - datid, - datname, + tupDeleted.Float64, + labels..., ) ch <- prometheus.MustNewConstMetric( statDatabaseConflicts, prometheus.CounterValue, - conflicts, - datid, - datname, + conflicts.Float64, + labels..., ) ch <- prometheus.MustNewConstMetric( statDatabaseTempFiles, prometheus.CounterValue, - tempFiles, - datid, - datname, + tempFiles.Float64, + labels..., ) ch <- prometheus.MustNewConstMetric( statDatabaseTempBytes, prometheus.CounterValue, - tempBytes, - datid, - datname, + tempBytes.Float64, + labels..., ) ch <- prometheus.MustNewConstMetric( statDatabaseDeadlocks, prometheus.CounterValue, - deadlocks, - datid, - datname, + deadlocks.Float64, + labels..., ) ch <- prometheus.MustNewConstMetric( statDatabaseBlkReadTime, prometheus.CounterValue, - blkReadTime, - datid, - datname, + blkReadTime.Float64, + labels..., ) ch <- prometheus.MustNewConstMetric( statDatabaseBlkWriteTime, prometheus.CounterValue, - blkWriteTime, - datid, - datname, + blkWriteTime.Float64, + labels..., ) - if statsReset.Valid { - ch <- prometheus.MustNewConstMetric( - statDatabaseStatsReset, - prometheus.CounterValue, - float64(statsReset.Time.Unix()), - datid, - datname, - ) - } else { - ch <- prometheus.MustNewConstMetric( - statDatabaseStatsReset, - prometheus.CounterValue, - 0, - datid, - datname, - ) - } + ch <- prometheus.MustNewConstMetric( + statDatabaseStatsReset, + prometheus.CounterValue, + float64(statsReset.Time.Unix()), + labels..., + ) } return nil } diff --git a/collector/pg_stat_database_test.go b/collector/pg_stat_database_test.go new file mode 100644 index 00000000..70c73eb5 --- /dev/null +++ b/collector/pg_stat_database_test.go @@ -0,0 +1,408 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" +) + +func TestPGStatDatabaseCollector(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db} + + columns := []string{ + "datid", + "datname", + "numbackends", + "xact_commit", + "xact_rollback", + "blks_read", + "blks_hit", + "tup_returned", + "tup_fetched", + "tup_inserted", + "tup_updated", + "tup_deleted", + "conflicts", + "temp_files", + "temp_bytes", + "deadlocks", + "blk_read_time", + "blk_write_time", + "stats_reset", + } + + srT, err := time.Parse("2006-01-02 15:04:05.00000-07", "2023-05-25 17:10:42.81132-07") + if err != nil { + t.Fatalf("Error parsing time: %s", err) + } + + rows := sqlmock.NewRows(columns). + AddRow( + "pid", + "postgres", + 354, + 4945, + 289097744, + 1242257, + int64(3275602074), + 89320867, + 450139, + 2034563757, + 0, + int64(2725688749), + 23, + 52, + 74, + 925, + 16, + 823, + srT) + + mock.ExpectQuery(sanitizeQuery(statDatabaseQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatDatabaseCollector{ + log: log.With(log.NewNopLogger(), "collector", "pg_stat_database"), + } + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGStatDatabaseCollector.Update: %s", err) + } + }() + + expected := []MetricResult{ + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_GAUGE, value: 354}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 4945}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 289097744}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 1242257}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 3275602074}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 89320867}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 450139}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 2034563757}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 2725688749}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 23}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 52}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 74}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 925}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 16}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 823}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 1685059842}, + } + + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} + +func TestPGStatDatabaseCollectorNullValues(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + srT, err := time.Parse("2006-01-02 15:04:05.00000-07", "2023-05-25 17:10:42.81132-07") + if err != nil { + t.Fatalf("Error parsing time: %s", err) + } + inst := &instance{db: db} + + columns := []string{ + "datid", + "datname", + "numbackends", + "xact_commit", + "xact_rollback", + "blks_read", + "blks_hit", + "tup_returned", + "tup_fetched", + "tup_inserted", + "tup_updated", + "tup_deleted", + "conflicts", + "temp_files", + "temp_bytes", + "deadlocks", + "blk_read_time", + "blk_write_time", + "stats_reset", + } + + rows := sqlmock.NewRows(columns). + AddRow( + nil, + "postgres", + 354, + 4945, + 289097744, + 1242257, + int64(3275602074), + 89320867, + 450139, + 2034563757, + 0, + int64(2725688749), + 23, + 52, + 74, + 925, + 16, + 823, + srT). + AddRow( + "pid", + "postgres", + 354, + 4945, + 289097744, + 1242257, + int64(3275602074), + 89320867, + 450139, + 2034563757, + 0, + int64(2725688749), + 23, + 52, + 74, + 925, + 16, + 823, + srT) + mock.ExpectQuery(sanitizeQuery(statDatabaseQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatDatabaseCollector{ + log: log.With(log.NewNopLogger(), "collector", "pg_stat_database"), + } + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGStatDatabaseCollector.Update: %s", err) + } + }() + + expected := []MetricResult{ + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_GAUGE, value: 354}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 4945}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 289097744}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 1242257}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 3275602074}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 89320867}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 450139}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 2034563757}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 2725688749}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 23}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 52}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 74}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 925}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 16}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 823}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 1685059842}, + } + + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} +func TestPGStatDatabaseCollectorRowLeakTest(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db} + + columns := []string{ + "datid", + "datname", + "numbackends", + "xact_commit", + "xact_rollback", + "blks_read", + "blks_hit", + "tup_returned", + "tup_fetched", + "tup_inserted", + "tup_updated", + "tup_deleted", + "conflicts", + "temp_files", + "temp_bytes", + "deadlocks", + "blk_read_time", + "blk_write_time", + "stats_reset", + } + + srT, err := time.Parse("2006-01-02 15:04:05.00000-07", "2023-05-25 17:10:42.81132-07") + if err != nil { + t.Fatalf("Error parsing time: %s", err) + } + + rows := sqlmock.NewRows(columns). + AddRow( + "pid", + "postgres", + 354, + 4945, + 289097744, + 1242257, + int64(3275602074), + 89320867, + 450139, + 2034563757, + 0, + int64(2725688749), + 23, + 52, + 74, + 925, + 16, + 823, + srT). + AddRow( + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + ). + AddRow( + "pid", + "postgres", + 355, + 4946, + 289097745, + 1242258, + int64(3275602075), + 89320868, + 450140, + 2034563758, + 1, + int64(2725688750), + 24, + 53, + 75, + 926, + 17, + 824, + srT) + mock.ExpectQuery(sanitizeQuery(statDatabaseQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatDatabaseCollector{ + log: log.With(log.NewNopLogger(), "collector", "pg_stat_database"), + } + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGStatDatabaseCollector.Update: %s", err) + } + }() + + expected := []MetricResult{ + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_GAUGE, value: 354}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 4945}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 289097744}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 1242257}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 3275602074}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 89320867}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 450139}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 2034563757}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 2725688749}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 23}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 52}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 74}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 925}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 16}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 823}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 1685059842}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_GAUGE, value: 355}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 4946}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 289097745}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 1242258}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 3275602075}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 89320868}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 450140}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 2034563758}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 1}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 2725688750}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 24}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 53}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 75}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 926}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 17}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 824}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 1685059842}, + } + + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} diff --git a/collector/pg_stat_statements.go b/collector/pg_stat_statements.go index eb629c38..c03e78b9 100644 --- a/collector/pg_stat_statements.go +++ b/collector/pg_stat_statements.go @@ -15,7 +15,9 @@ package collector import ( "context" + "database/sql" + "github.com/blang/semver/v4" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" ) @@ -89,60 +91,117 @@ var ( ) ORDER BY seconds_total DESC LIMIT 100;` + + pgStatStatementsNewQuery = `SELECT + pg_get_userbyid(userid) as user, + pg_database.datname, + pg_stat_statements.queryid, + pg_stat_statements.calls as calls_total, + pg_stat_statements.total_exec_time / 1000.0 as seconds_total, + pg_stat_statements.rows as rows_total, + pg_stat_statements.blk_read_time / 1000.0 as block_read_seconds_total, + pg_stat_statements.blk_write_time / 1000.0 as block_write_seconds_total + FROM pg_stat_statements + JOIN pg_database + ON pg_database.oid = pg_stat_statements.dbid + WHERE + total_exec_time > ( + SELECT percentile_cont(0.1) + WITHIN GROUP (ORDER BY total_exec_time) + FROM pg_stat_statements + ) + ORDER BY seconds_total DESC + LIMIT 100;` ) func (PGStatStatementsCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + query := pgStatStatementsQuery + if instance.version.GE(semver.MustParse("13.0.0")) { + query = pgStatStatementsNewQuery + } + db := instance.getDB() - rows, err := db.QueryContext(ctx, - pgStatStatementsQuery) + rows, err := db.QueryContext(ctx, query) if err != nil { return err } defer rows.Close() for rows.Next() { - var user string - var datname string - var queryid string - var callsTotal int64 - var secondsTotal float64 - var rowsTotal int64 - var blockReadSecondsTotal float64 - var blockWriteSecondsTotal float64 + var user, datname, queryid sql.NullString + var callsTotal, rowsTotal sql.NullInt64 + var secondsTotal, blockReadSecondsTotal, blockWriteSecondsTotal sql.NullFloat64 if err := rows.Scan(&user, &datname, &queryid, &callsTotal, &secondsTotal, &rowsTotal, &blockReadSecondsTotal, &blockWriteSecondsTotal); err != nil { return err } + userLabel := "unknown" + if user.Valid { + userLabel = user.String + } + datnameLabel := "unknown" + if datname.Valid { + datnameLabel = datname.String + } + queryidLabel := "unknown" + if queryid.Valid { + queryidLabel = queryid.String + } + + callsTotalMetric := 0.0 + if callsTotal.Valid { + callsTotalMetric = float64(callsTotal.Int64) + } ch <- prometheus.MustNewConstMetric( statSTatementsCallsTotal, prometheus.CounterValue, - float64(callsTotal), - user, datname, queryid, + callsTotalMetric, + userLabel, datnameLabel, queryidLabel, ) + + secondsTotalMetric := 0.0 + if secondsTotal.Valid { + secondsTotalMetric = secondsTotal.Float64 + } ch <- prometheus.MustNewConstMetric( statStatementsSecondsTotal, prometheus.CounterValue, - secondsTotal, - user, datname, queryid, + secondsTotalMetric, + userLabel, datnameLabel, queryidLabel, ) + + rowsTotalMetric := 0.0 + if rowsTotal.Valid { + rowsTotalMetric = float64(rowsTotal.Int64) + } ch <- prometheus.MustNewConstMetric( statStatementsRowsTotal, prometheus.CounterValue, - float64(rowsTotal), - user, datname, queryid, + rowsTotalMetric, + userLabel, datnameLabel, queryidLabel, ) + + blockReadSecondsTotalMetric := 0.0 + if blockReadSecondsTotal.Valid { + blockReadSecondsTotalMetric = blockReadSecondsTotal.Float64 + } ch <- prometheus.MustNewConstMetric( statStatementsBlockReadSecondsTotal, prometheus.CounterValue, - blockReadSecondsTotal, - user, datname, queryid, + blockReadSecondsTotalMetric, + userLabel, datnameLabel, queryidLabel, ) + + blockWriteSecondsTotalMetric := 0.0 + if blockWriteSecondsTotal.Valid { + blockWriteSecondsTotalMetric = blockWriteSecondsTotal.Float64 + } ch <- prometheus.MustNewConstMetric( statStatementsBlockWriteSecondsTotal, prometheus.CounterValue, - blockWriteSecondsTotal, - user, datname, queryid, + blockWriteSecondsTotalMetric, + userLabel, datnameLabel, queryidLabel, ) } if err := rows.Err(); err != nil { diff --git a/collector/pg_stat_statements_test.go b/collector/pg_stat_statements_test.go index 241699ad..08aba34c 100644 --- a/collector/pg_stat_statements_test.go +++ b/collector/pg_stat_statements_test.go @@ -17,6 +17,7 @@ import ( "testing" "github.com/DATA-DOG/go-sqlmock" + "github.com/blang/semver/v4" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" "github.com/smartystreets/goconvey/convey" @@ -29,7 +30,7 @@ func TestPGStateStatementsCollector(t *testing.T) { } defer db.Close() - inst := &instance{db: db} + inst := &instance{db: db, version: semver.MustParse("12.0.0")} columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} rows := sqlmock.NewRows(columns). @@ -64,3 +65,89 @@ func TestPGStateStatementsCollector(t *testing.T) { t.Errorf("there were unfulfilled exceptions: %s", err) } } + +func TestPGStateStatementsCollectorNull(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db, version: semver.MustParse("13.3.7")} + + columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} + rows := sqlmock.NewRows(columns). + AddRow(nil, nil, nil, nil, nil, nil, nil, nil) + mock.ExpectQuery(sanitizeQuery(pgStatStatementsNewQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatStatementsCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGStatStatementsCollector.Update: %s", err) + } + }() + + expected := []MetricResult{ + {labels: labelMap{"user": "unknown", "datname": "unknown", "queryid": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"user": "unknown", "datname": "unknown", "queryid": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"user": "unknown", "datname": "unknown", "queryid": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"user": "unknown", "datname": "unknown", "queryid": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"user": "unknown", "datname": "unknown", "queryid": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, + } + + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} + +func TestPGStateStatementsCollectorNewPG(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db, version: semver.MustParse("13.3.7")} + + columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} + rows := sqlmock.NewRows(columns). + AddRow("postgres", "postgres", 1500, 5, 0.4, 100, 0.1, 0.2) + mock.ExpectQuery(sanitizeQuery(pgStatStatementsNewQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatStatementsCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGStatStatementsCollector.Update: %s", err) + } + }() + + expected := []MetricResult{ + {labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 5}, + {labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.4}, + {labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 100}, + {labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.1}, + {labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.2}, + } + + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} diff --git a/collector/pg_stat_user_tables.go b/collector/pg_stat_user_tables.go index 48ae96eb..949a0ea2 100644 --- a/collector/pg_stat_user_tables.go +++ b/collector/pg_stat_user_tables.go @@ -15,7 +15,7 @@ package collector import ( "context" - "time" + "database/sql" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" @@ -189,146 +189,235 @@ func (c *PGStatUserTablesCollector) Update(ctx context.Context, instance *instan defer rows.Close() for rows.Next() { - var datname string - var schemaname string - var relname string - var seqScan int64 - var seqTupRead int64 - var idxScan int64 - var idxTupFetch int64 - var nTupIns int64 - var nTupUpd int64 - var nTupDel int64 - var nTupHotUpd int64 - var nLiveTup int64 - var nDeadTup int64 - var nModSinceAnalyze int64 - var lastVacuum time.Time - var lastAutovacuum time.Time - var lastAnalyze time.Time - var lastAutoanalyze time.Time - var vacuumCount int64 - var autovacuumCount int64 - var analyzeCount int64 - var autoanalyzeCount int64 + var datname, schemaname, relname sql.NullString + var seqScan, seqTupRead, idxScan, idxTupFetch, nTupIns, nTupUpd, nTupDel, nTupHotUpd, nLiveTup, nDeadTup, + nModSinceAnalyze, vacuumCount, autovacuumCount, analyzeCount, autoanalyzeCount sql.NullInt64 + var lastVacuum, lastAutovacuum, lastAnalyze, lastAutoanalyze sql.NullTime if err := rows.Scan(&datname, &schemaname, &relname, &seqScan, &seqTupRead, &idxScan, &idxTupFetch, &nTupIns, &nTupUpd, &nTupDel, &nTupHotUpd, &nLiveTup, &nDeadTup, &nModSinceAnalyze, &lastVacuum, &lastAutovacuum, &lastAnalyze, &lastAutoanalyze, &vacuumCount, &autovacuumCount, &analyzeCount, &autoanalyzeCount); err != nil { return err } + datnameLabel := "unknown" + if datname.Valid { + datnameLabel = datname.String + } + schemanameLabel := "unknown" + if schemaname.Valid { + schemanameLabel = schemaname.String + } + relnameLabel := "unknown" + if relname.Valid { + relnameLabel = relname.String + } + + seqScanMetric := 0.0 + if seqScan.Valid { + seqScanMetric = float64(seqScan.Int64) + } ch <- prometheus.MustNewConstMetric( statUserTablesSeqScan, prometheus.CounterValue, - float64(seqScan), - datname, schemaname, relname, + seqScanMetric, + datnameLabel, schemanameLabel, relnameLabel, ) + + seqTupReadMetric := 0.0 + if seqTupRead.Valid { + seqTupReadMetric = float64(seqTupRead.Int64) + } ch <- prometheus.MustNewConstMetric( statUserTablesSeqTupRead, prometheus.CounterValue, - float64(seqTupRead), - datname, schemaname, relname, + seqTupReadMetric, + datnameLabel, schemanameLabel, relnameLabel, ) + + idxScanMetric := 0.0 + if idxScan.Valid { + idxScanMetric = float64(idxScan.Int64) + } ch <- prometheus.MustNewConstMetric( statUserTablesIdxScan, prometheus.CounterValue, - float64(idxScan), - datname, schemaname, relname, + idxScanMetric, + datnameLabel, schemanameLabel, relnameLabel, ) + + idxTupFetchMetric := 0.0 + if idxTupFetch.Valid { + idxTupFetchMetric = float64(idxTupFetch.Int64) + } ch <- prometheus.MustNewConstMetric( statUserTablesIdxTupFetch, prometheus.CounterValue, - float64(idxTupFetch), - datname, schemaname, relname, + idxTupFetchMetric, + datnameLabel, schemanameLabel, relnameLabel, ) + + nTupInsMetric := 0.0 + if nTupIns.Valid { + nTupInsMetric = float64(nTupIns.Int64) + } ch <- prometheus.MustNewConstMetric( statUserTablesNTupIns, prometheus.CounterValue, - float64(nTupIns), - datname, schemaname, relname, + nTupInsMetric, + datnameLabel, schemanameLabel, relnameLabel, ) + + nTupUpdMetric := 0.0 + if nTupUpd.Valid { + nTupUpdMetric = float64(nTupUpd.Int64) + } ch <- prometheus.MustNewConstMetric( statUserTablesNTupUpd, prometheus.CounterValue, - float64(nTupUpd), - datname, schemaname, relname, + nTupUpdMetric, + datnameLabel, schemanameLabel, relnameLabel, ) + + nTupDelMetric := 0.0 + if nTupDel.Valid { + nTupDelMetric = float64(nTupDel.Int64) + } ch <- prometheus.MustNewConstMetric( statUserTablesNTupDel, prometheus.CounterValue, - float64(nTupDel), - datname, schemaname, relname, + nTupDelMetric, + datnameLabel, schemanameLabel, relnameLabel, ) + + nTupHotUpdMetric := 0.0 + if nTupHotUpd.Valid { + nTupHotUpdMetric = float64(nTupHotUpd.Int64) + } ch <- prometheus.MustNewConstMetric( statUserTablesNTupHotUpd, prometheus.CounterValue, - float64(nTupHotUpd), - datname, schemaname, relname, + nTupHotUpdMetric, + datnameLabel, schemanameLabel, relnameLabel, ) + + nLiveTupMetric := 0.0 + if nLiveTup.Valid { + nLiveTupMetric = float64(nLiveTup.Int64) + } ch <- prometheus.MustNewConstMetric( statUserTablesNLiveTup, prometheus.GaugeValue, - float64(nLiveTup), - datname, schemaname, relname, + nLiveTupMetric, + datnameLabel, schemanameLabel, relnameLabel, ) + + nDeadTupMetric := 0.0 + if nDeadTup.Valid { + nDeadTupMetric = float64(nDeadTup.Int64) + } ch <- prometheus.MustNewConstMetric( statUserTablesNDeadTup, prometheus.GaugeValue, - float64(nDeadTup), - datname, schemaname, relname, + nDeadTupMetric, + datnameLabel, schemanameLabel, relnameLabel, ) + + nModSinceAnalyzeMetric := 0.0 + if nModSinceAnalyze.Valid { + nModSinceAnalyzeMetric = float64(nModSinceAnalyze.Int64) + } ch <- prometheus.MustNewConstMetric( statUserTablesNModSinceAnalyze, prometheus.GaugeValue, - float64(nModSinceAnalyze), - datname, schemaname, relname, + nModSinceAnalyzeMetric, + datnameLabel, schemanameLabel, relnameLabel, ) + + lastVacuumMetric := 0.0 + if lastVacuum.Valid { + lastVacuumMetric = float64(lastVacuum.Time.Unix()) + } ch <- prometheus.MustNewConstMetric( statUserTablesLastVacuum, prometheus.GaugeValue, - float64(lastVacuum.Unix()), - datname, schemaname, relname, + lastVacuumMetric, + datnameLabel, schemanameLabel, relnameLabel, ) + + lastAutovacuumMetric := 0.0 + if lastAutovacuum.Valid { + lastAutovacuumMetric = float64(lastAutovacuum.Time.Unix()) + } ch <- prometheus.MustNewConstMetric( statUserTablesLastAutovacuum, prometheus.GaugeValue, - float64(lastAutovacuum.Unix()), - datname, schemaname, relname, + lastAutovacuumMetric, + datnameLabel, schemanameLabel, relnameLabel, ) + + lastAnalyzeMetric := 0.0 + if lastAnalyze.Valid { + lastAnalyzeMetric = float64(lastAnalyze.Time.Unix()) + } ch <- prometheus.MustNewConstMetric( statUserTablesLastAnalyze, prometheus.GaugeValue, - float64(lastAnalyze.Unix()), - datname, schemaname, relname, + lastAnalyzeMetric, + datnameLabel, schemanameLabel, relnameLabel, ) + + lastAutoanalyzeMetric := 0.0 + if lastAutoanalyze.Valid { + lastAutoanalyzeMetric = float64(lastAutoanalyze.Time.Unix()) + } ch <- prometheus.MustNewConstMetric( statUserTablesLastAutoanalyze, prometheus.GaugeValue, - float64(lastAutoanalyze.Unix()), - datname, schemaname, relname, + lastAutoanalyzeMetric, + datnameLabel, schemanameLabel, relnameLabel, ) + + vacuumCountMetric := 0.0 + if vacuumCount.Valid { + vacuumCountMetric = float64(vacuumCount.Int64) + } ch <- prometheus.MustNewConstMetric( statUserTablesVacuumCount, prometheus.CounterValue, - float64(vacuumCount), - datname, schemaname, relname, + vacuumCountMetric, + datnameLabel, schemanameLabel, relnameLabel, ) + + autovacuumCountMetric := 0.0 + if autovacuumCount.Valid { + autovacuumCountMetric = float64(autovacuumCount.Int64) + } ch <- prometheus.MustNewConstMetric( statUserTablesAutovacuumCount, prometheus.CounterValue, - float64(autovacuumCount), - datname, schemaname, relname, + autovacuumCountMetric, + datnameLabel, schemanameLabel, relnameLabel, ) + + analyzeCountMetric := 0.0 + if analyzeCount.Valid { + analyzeCountMetric = float64(analyzeCount.Int64) + } ch <- prometheus.MustNewConstMetric( statUserTablesAnalyzeCount, prometheus.CounterValue, - float64(analyzeCount), - datname, schemaname, relname, + analyzeCountMetric, + datnameLabel, schemanameLabel, relnameLabel, ) + + autoanalyzeCountMetric := 0.0 + if autoanalyzeCount.Valid { + autoanalyzeCountMetric = float64(autoanalyzeCount.Int64) + } ch <- prometheus.MustNewConstMetric( statUserTablesAutoanalyzeCount, prometheus.CounterValue, - float64(autoanalyzeCount), - datname, schemaname, relname, + autoanalyzeCountMetric, + datnameLabel, schemanameLabel, relnameLabel, ) } diff --git a/collector/pg_stat_user_tables_test.go b/collector/pg_stat_user_tables_test.go index 8bb9bc31..e592fa5e 100644 --- a/collector/pg_stat_user_tables_test.go +++ b/collector/pg_stat_user_tables_test.go @@ -138,3 +138,102 @@ func TestPGStatUserTablesCollector(t *testing.T) { t.Errorf("there were unfulfilled exceptions: %s", err) } } + +func TestPGStatUserTablesCollectorNullValues(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db} + + columns := []string{ + "datname", + "schemaname", + "relname", + "seq_scan", + "seq_tup_read", + "idx_scan", + "idx_tup_fetch", + "n_tup_ins", + "n_tup_upd", + "n_tup_del", + "n_tup_hot_upd", + "n_live_tup", + "n_dead_tup", + "n_mod_since_analyze", + "last_vacuum", + "last_autovacuum", + "last_analyze", + "last_autoanalyze", + "vacuum_count", + "autovacuum_count", + "analyze_count", + "autoanalyze_count"} + rows := sqlmock.NewRows(columns). + AddRow("postgres", + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil) + mock.ExpectQuery(sanitizeQuery(statUserTablesQuery)).WillReturnRows(rows) + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatUserTablesCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGStatUserTablesCollector.Update: %s", err) + } + }() + + expected := []MetricResult{ + {labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_GAUGE, value: 0}, + {labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_GAUGE, value: 0}, + {labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_GAUGE, value: 0}, + {labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_GAUGE, value: 0}, + {labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_GAUGE, value: 0}, + {labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_GAUGE, value: 0}, + {labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_GAUGE, value: 0}, + {labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, + } + + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} diff --git a/collector/pg_stat_walreceiver.go b/collector/pg_stat_walreceiver.go new file mode 100644 index 00000000..3134c025 --- /dev/null +++ b/collector/pg_stat_walreceiver.go @@ -0,0 +1,269 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "database/sql" + "fmt" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" +) + +func init() { + registerCollector(statWalReceiverSubsystem, defaultDisabled, NewPGStatWalReceiverCollector) +} + +type PGStatWalReceiverCollector struct { + log log.Logger +} + +const statWalReceiverSubsystem = "stat_wal_receiver" + +func NewPGStatWalReceiverCollector(config collectorConfig) (Collector, error) { + return &PGStatWalReceiverCollector{log: config.logger}, nil +} + +var ( + labelCats = []string{"upstream_host", "slot_name", "status"} + statWalReceiverReceiveStartLsn = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "receive_start_lsn"), + "First write-ahead log location used when WAL receiver is started represented as a decimal", + labelCats, + prometheus.Labels{}, + ) + statWalReceiverReceiveStartTli = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "receive_start_tli"), + "First timeline number used when WAL receiver is started", + labelCats, + prometheus.Labels{}, + ) + statWalReceiverFlushedLSN = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "flushed_lsn"), + "Last write-ahead log location already received and flushed to disk, the initial value of this field being the first log location used when WAL receiver is started represented as a decimal", + labelCats, + prometheus.Labels{}, + ) + statWalReceiverReceivedTli = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "received_tli"), + "Timeline number of last write-ahead log location received and flushed to disk", + labelCats, + prometheus.Labels{}, + ) + statWalReceiverLastMsgSendTime = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "last_msg_send_time"), + "Send time of last message received from origin WAL sender", + labelCats, + prometheus.Labels{}, + ) + statWalReceiverLastMsgReceiptTime = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "last_msg_receipt_time"), + "Send time of last message received from origin WAL sender", + labelCats, + prometheus.Labels{}, + ) + statWalReceiverLatestEndLsn = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "latest_end_lsn"), + "Last write-ahead log location reported to origin WAL sender as integer", + labelCats, + prometheus.Labels{}, + ) + statWalReceiverLatestEndTime = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "latest_end_time"), + "Time of last write-ahead log location reported to origin WAL sender", + labelCats, + prometheus.Labels{}, + ) + statWalReceiverUpstreamNode = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "upstream_node"), + "Node ID of the upstream node", + labelCats, + prometheus.Labels{}, + ) + + pgStatWalColumnQuery = ` + SELECT + column_name + FROM information_schema.columns + WHERE + table_name = 'pg_stat_wal_receiver' and + column_name = 'flushed_lsn' + ` + + pgStatWalReceiverQueryTemplate = ` + SELECT + trim(both '''' from substring(conninfo from 'host=([^ ]*)')) as upstream_host, + slot_name, + status, + (receive_start_lsn- '0/0') % (2^52)::bigint as receive_start_lsn, + %s +receive_start_tli, + received_tli, + extract(epoch from last_msg_send_time) as last_msg_send_time, + extract(epoch from last_msg_receipt_time) as last_msg_receipt_time, + (latest_end_lsn - '0/0') % (2^52)::bigint as latest_end_lsn, + extract(epoch from latest_end_time) as latest_end_time, + substring(slot_name from 'repmgr_slot_([0-9]*)') as upstream_node + FROM pg_catalog.pg_stat_wal_receiver + ` +) + +func (c *PGStatWalReceiverCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + hasFlushedLSNRows, err := db.QueryContext(ctx, pgStatWalColumnQuery) + if err != nil { + return err + } + + defer hasFlushedLSNRows.Close() + hasFlushedLSN := hasFlushedLSNRows.Next() + var query string + if hasFlushedLSN { + query = fmt.Sprintf(pgStatWalReceiverQueryTemplate, "(flushed_lsn - '0/0') % (2^52)::bigint as flushed_lsn,\n") + } else { + query = fmt.Sprintf(pgStatWalReceiverQueryTemplate, "") + } + rows, err := db.QueryContext(ctx, query) + if err != nil { + return err + } + defer rows.Close() + for rows.Next() { + var upstreamHost, slotName, status sql.NullString + var receiveStartLsn, receiveStartTli, flushedLsn, receivedTli, latestEndLsn, upstreamNode sql.NullInt64 + var lastMsgSendTime, lastMsgReceiptTime, latestEndTime sql.NullFloat64 + + if hasFlushedLSN { + if err := rows.Scan(&upstreamHost, &slotName, &status, &receiveStartLsn, &receiveStartTli, &flushedLsn, &receivedTli, &lastMsgSendTime, &lastMsgReceiptTime, &latestEndLsn, &latestEndTime, &upstreamNode); err != nil { + return err + } + } else { + if err := rows.Scan(&upstreamHost, &slotName, &status, &receiveStartLsn, &receiveStartTli, &receivedTli, &lastMsgSendTime, &lastMsgReceiptTime, &latestEndLsn, &latestEndTime, &upstreamNode); err != nil { + return err + } + } + if !upstreamHost.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because upstream host is null") + continue + } + + if !slotName.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because slotname host is null") + continue + } + + if !status.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because status is null") + continue + } + labels := []string{upstreamHost.String, slotName.String, status.String} + + if !receiveStartLsn.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because receive_start_lsn is null") + continue + } + if !receiveStartTli.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because receive_start_tli is null") + continue + } + if hasFlushedLSN && !flushedLsn.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because flushed_lsn is null") + continue + } + if !receivedTli.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because received_tli is null") + continue + } + if !lastMsgSendTime.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because last_msg_send_time is null") + continue + } + if !lastMsgReceiptTime.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because last_msg_receipt_time is null") + continue + } + if !latestEndLsn.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because latest_end_lsn is null") + continue + } + if !latestEndTime.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because latest_end_time is null") + continue + } + if !upstreamNode.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because upstream_node is null") + continue + } + ch <- prometheus.MustNewConstMetric( + statWalReceiverReceiveStartLsn, + prometheus.CounterValue, + float64(receiveStartLsn.Int64), + labels...) + + ch <- prometheus.MustNewConstMetric( + statWalReceiverReceiveStartTli, + prometheus.GaugeValue, + float64(receiveStartTli.Int64), + labels...) + + if hasFlushedLSN { + ch <- prometheus.MustNewConstMetric( + statWalReceiverFlushedLSN, + prometheus.CounterValue, + float64(flushedLsn.Int64), + labels...) + } + + ch <- prometheus.MustNewConstMetric( + statWalReceiverReceivedTli, + prometheus.GaugeValue, + float64(receivedTli.Int64), + labels...) + + ch <- prometheus.MustNewConstMetric( + statWalReceiverLastMsgSendTime, + prometheus.CounterValue, + float64(lastMsgSendTime.Float64), + labels...) + + ch <- prometheus.MustNewConstMetric( + statWalReceiverLastMsgReceiptTime, + prometheus.CounterValue, + float64(lastMsgReceiptTime.Float64), + labels...) + + ch <- prometheus.MustNewConstMetric( + statWalReceiverLatestEndLsn, + prometheus.CounterValue, + float64(latestEndLsn.Int64), + labels...) + + ch <- prometheus.MustNewConstMetric( + statWalReceiverLatestEndTime, + prometheus.CounterValue, + latestEndTime.Float64, + labels...) + + ch <- prometheus.MustNewConstMetric( + statWalReceiverUpstreamNode, + prometheus.GaugeValue, + float64(upstreamNode.Int64), + labels...) + } + if err := rows.Err(); err != nil { + return err + } + return nil +} diff --git a/collector/pg_stat_walreceiver_test.go b/collector/pg_stat_walreceiver_test.go new file mode 100644 index 00000000..3e2418b2 --- /dev/null +++ b/collector/pg_stat_walreceiver_test.go @@ -0,0 +1,186 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "fmt" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" +) + +var queryWithFlushedLSN = fmt.Sprintf(pgStatWalReceiverQueryTemplate, "(flushed_lsn - '0/0') % (2^52)::bigint as flushed_lsn,\n") +var queryWithNoFlushedLSN = fmt.Sprintf(pgStatWalReceiverQueryTemplate, "") + +func TestPGStatWalReceiverCollectorWithFlushedLSN(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db} + infoSchemaColumns := []string{ + "column_name", + } + + infoSchemaRows := sqlmock.NewRows(infoSchemaColumns). + AddRow( + "flushed_lsn", + ) + + mock.ExpectQuery(sanitizeQuery(pgStatWalColumnQuery)).WillReturnRows(infoSchemaRows) + + columns := []string{ + "upstream_host", + "slot_name", + "status", + "receive_start_lsn", + "receive_start_tli", + "flushed_lsn", + "received_tli", + "last_msg_send_time", + "last_msg_receipt_time", + "latest_end_lsn", + "latest_end_time", + "upstream_node", + } + rows := sqlmock.NewRows(columns). + AddRow( + "foo", + "bar", + "stopping", + 1200668684563608, + 1687321285, + 1200668684563609, + 1687321280, + 1687321275, + 1687321276, + 1200668684563610, + 1687321277, + 5, + ) + + mock.ExpectQuery(sanitizeQuery(queryWithFlushedLSN)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatWalReceiverCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PgStatWalReceiverCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1200668684563608, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1687321285, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1200668684563609, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1687321280, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1687321275, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1687321276, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1200668684563610, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1687321277, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 5, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } + +} + +func TestPGStatWalReceiverCollectorWithNoFlushedLSN(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db} + infoSchemaColumns := []string{ + "column_name", + } + + infoSchemaRows := sqlmock.NewRows(infoSchemaColumns) + + mock.ExpectQuery(sanitizeQuery(pgStatWalColumnQuery)).WillReturnRows(infoSchemaRows) + + columns := []string{ + "upstream_host", + "slot_name", + "status", + "receive_start_lsn", + "receive_start_tli", + "received_tli", + "last_msg_send_time", + "last_msg_receipt_time", + "latest_end_lsn", + "latest_end_time", + "upstream_node", + } + rows := sqlmock.NewRows(columns). + AddRow( + "foo", + "bar", + "starting", + 1200668684563608, + 1687321285, + 1687321280, + 1687321275, + 1687321276, + 1200668684563610, + 1687321277, + 5, + ) + mock.ExpectQuery(sanitizeQuery(queryWithNoFlushedLSN)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatWalReceiverCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PgStatWalReceiverCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 1200668684563608, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 1687321285, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 1687321280, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 1687321275, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 1687321276, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 1200668684563610, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 1687321277, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 5, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } + +} diff --git a/collector/pg_statio_user_tables.go b/collector/pg_statio_user_tables.go index 03d54161..4315fda0 100644 --- a/collector/pg_statio_user_tables.go +++ b/collector/pg_statio_user_tables.go @@ -15,6 +15,7 @@ package collector import ( "context" + "database/sql" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" @@ -110,73 +111,112 @@ func (PGStatIOUserTablesCollector) Update(ctx context.Context, instance *instanc defer rows.Close() for rows.Next() { - var datname string - var schemaname string - var relname string - var heapBlksRead int64 - var heapBlksHit int64 - var idxBlksRead int64 - var idxBlksHit int64 - var toastBlksRead int64 - var toastBlksHit int64 - var tidxBlksRead int64 - var tidxBlksHit int64 + var datname, schemaname, relname sql.NullString + var heapBlksRead, heapBlksHit, idxBlksRead, idxBlksHit, toastBlksRead, toastBlksHit, tidxBlksRead, tidxBlksHit sql.NullInt64 if err := rows.Scan(&datname, &schemaname, &relname, &heapBlksRead, &heapBlksHit, &idxBlksRead, &idxBlksHit, &toastBlksRead, &toastBlksHit, &tidxBlksRead, &tidxBlksHit); err != nil { return err } + datnameLabel := "unknown" + if datname.Valid { + datnameLabel = datname.String + } + schemanameLabel := "unknown" + if schemaname.Valid { + schemanameLabel = schemaname.String + } + relnameLabel := "unknown" + if relname.Valid { + relnameLabel = relname.String + } + heapBlksReadMetric := 0.0 + if heapBlksRead.Valid { + heapBlksReadMetric = float64(heapBlksRead.Int64) + } ch <- prometheus.MustNewConstMetric( statioUserTablesHeapBlksRead, prometheus.CounterValue, - float64(heapBlksRead), - datname, schemaname, relname, + heapBlksReadMetric, + datnameLabel, schemanameLabel, relnameLabel, ) + + heapBlksHitMetric := 0.0 + if heapBlksHit.Valid { + heapBlksHitMetric = float64(heapBlksHit.Int64) + } ch <- prometheus.MustNewConstMetric( statioUserTablesHeapBlksHit, prometheus.CounterValue, - float64(heapBlksHit), - datname, schemaname, relname, + heapBlksHitMetric, + datnameLabel, schemanameLabel, relnameLabel, ) + + idxBlksReadMetric := 0.0 + if idxBlksRead.Valid { + idxBlksReadMetric = float64(idxBlksRead.Int64) + } ch <- prometheus.MustNewConstMetric( statioUserTablesIdxBlksRead, prometheus.CounterValue, - float64(idxBlksRead), - datname, schemaname, relname, + idxBlksReadMetric, + datnameLabel, schemanameLabel, relnameLabel, ) + + idxBlksHitMetric := 0.0 + if idxBlksHit.Valid { + idxBlksHitMetric = float64(idxBlksHit.Int64) + } ch <- prometheus.MustNewConstMetric( statioUserTablesIdxBlksHit, prometheus.CounterValue, - float64(idxBlksHit), - datname, schemaname, relname, + idxBlksHitMetric, + datnameLabel, schemanameLabel, relnameLabel, ) + + toastBlksReadMetric := 0.0 + if toastBlksRead.Valid { + toastBlksReadMetric = float64(toastBlksRead.Int64) + } ch <- prometheus.MustNewConstMetric( statioUserTablesToastBlksRead, prometheus.CounterValue, - float64(toastBlksRead), - datname, schemaname, relname, + toastBlksReadMetric, + datnameLabel, schemanameLabel, relnameLabel, ) + + toastBlksHitMetric := 0.0 + if toastBlksHit.Valid { + toastBlksHitMetric = float64(toastBlksHit.Int64) + } ch <- prometheus.MustNewConstMetric( statioUserTablesToastBlksHit, prometheus.CounterValue, - float64(toastBlksHit), - datname, schemaname, relname, + toastBlksHitMetric, + datnameLabel, schemanameLabel, relnameLabel, ) + + tidxBlksReadMetric := 0.0 + if tidxBlksRead.Valid { + tidxBlksReadMetric = float64(tidxBlksRead.Int64) + } ch <- prometheus.MustNewConstMetric( statioUserTablesTidxBlksRead, prometheus.CounterValue, - float64(tidxBlksRead), - datname, schemaname, relname, + tidxBlksReadMetric, + datnameLabel, schemanameLabel, relnameLabel, ) + + tidxBlksHitMetric := 0.0 + if tidxBlksHit.Valid { + tidxBlksHitMetric = float64(tidxBlksHit.Int64) + } ch <- prometheus.MustNewConstMetric( statioUserTablesTidxBlksHit, prometheus.CounterValue, - float64(tidxBlksHit), - datname, schemaname, relname, + tidxBlksHitMetric, + datnameLabel, schemanameLabel, relnameLabel, ) } - if err := rows.Err(); err != nil { - return err - } - return nil + return rows.Err() } diff --git a/collector/pg_statio_user_tables_test.go b/collector/pg_statio_user_tables_test.go index d57cab9f..c7304a38 100644 --- a/collector/pg_statio_user_tables_test.go +++ b/collector/pg_statio_user_tables_test.go @@ -88,3 +88,70 @@ func TestPGStatIOUserTablesCollector(t *testing.T) { t.Errorf("there were unfulfilled exceptions: %s", err) } } + +func TestPGStatIOUserTablesCollectorNullValues(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db} + + columns := []string{ + "datname", + "schemaname", + "relname", + "heap_blks_read", + "heap_blks_hit", + "idx_blks_read", + "idx_blks_hit", + "toast_blks_read", + "toast_blks_hit", + "tidx_blks_read", + "tidx_blks_hit", + } + rows := sqlmock.NewRows(columns). + AddRow(nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil) + mock.ExpectQuery(sanitizeQuery(statioUserTablesQuery)).WillReturnRows(rows) + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatIOUserTablesCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGStatIOUserTablesCollector.Update: %s", err) + } + }() + + expected := []MetricResult{ + {labels: labelMap{"datname": "unknown", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"datname": "unknown", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"datname": "unknown", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"datname": "unknown", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"datname": "unknown", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"datname": "unknown", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"datname": "unknown", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"datname": "unknown", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, + } + + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} diff --git a/collector/pg_wal.go b/collector/pg_wal.go new file mode 100644 index 00000000..afa8fcef --- /dev/null +++ b/collector/pg_wal.go @@ -0,0 +1,84 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + + "github.com/prometheus/client_golang/prometheus" +) + +const walSubsystem = "wal" + +func init() { + registerCollector(walSubsystem, defaultEnabled, NewPGWALCollector) +} + +type PGWALCollector struct { +} + +func NewPGWALCollector(config collectorConfig) (Collector, error) { + return &PGWALCollector{}, nil +} + +var ( + pgWALSegments = prometheus.NewDesc( + prometheus.BuildFQName( + namespace, + walSubsystem, + "segments", + ), + "Number of WAL segments", + []string{}, nil, + ) + pgWALSize = prometheus.NewDesc( + prometheus.BuildFQName( + namespace, + walSubsystem, + "size_bytes", + ), + "Total size of WAL segments", + []string{}, nil, + ) + + pgWALQuery = ` + SELECT + COUNT(*) AS segments, + SUM(size) AS size + FROM pg_ls_waldir() + WHERE name ~ '^[0-9A-F]{24}$'` +) + +func (c PGWALCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + row := db.QueryRowContext(ctx, + pgWALQuery, + ) + + var segments uint64 + var size uint64 + err := row.Scan(&segments, &size) + if err != nil { + return err + } + ch <- prometheus.MustNewConstMetric( + pgWALSegments, + prometheus.GaugeValue, float64(segments), + ) + ch <- prometheus.MustNewConstMetric( + pgWALSize, + prometheus.GaugeValue, float64(size), + ) + return nil +} diff --git a/collector/pg_wal_test.go b/collector/pg_wal_test.go new file mode 100644 index 00000000..745105a1 --- /dev/null +++ b/collector/pg_wal_test.go @@ -0,0 +1,63 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" +) + +func TestPgWALCollector(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db} + + columns := []string{"segments", "size"} + rows := sqlmock.NewRows(columns). + AddRow(47, 788529152) + mock.ExpectQuery(sanitizeQuery(pgWALQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGWALCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGWALCollector.Update: %s", err) + } + }() + + expected := []MetricResult{ + {labels: labelMap{}, value: 47, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{}, value: 788529152, metricType: dto.MetricType_GAUGE}, + } + + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} diff --git a/collector/pg_xlog_location.go b/collector/pg_xlog_location.go new file mode 100644 index 00000000..92ac44ac --- /dev/null +++ b/collector/pg_xlog_location.go @@ -0,0 +1,80 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" +) + +const xlogLocationSubsystem = "xlog_location" + +func init() { + registerCollector(xlogLocationSubsystem, defaultDisabled, NewPGXlogLocationCollector) +} + +type PGXlogLocationCollector struct { + log log.Logger +} + +func NewPGXlogLocationCollector(config collectorConfig) (Collector, error) { + return &PGXlogLocationCollector{log: config.logger}, nil +} + +var ( + xlogLocationBytes = prometheus.NewDesc( + prometheus.BuildFQName(namespace, xlogLocationSubsystem, "bytes"), + "Postgres LSN (log sequence number) being generated on primary or replayed on replica (truncated to low 52 bits)", + []string{}, + prometheus.Labels{}, + ) + + xlogLocationQuery = ` + SELECT CASE + WHEN pg_is_in_recovery() THEN (pg_last_xlog_replay_location() - '0/0') % (2^52)::bigint + ELSE (pg_current_xlog_location() - '0/0') % (2^52)::bigint + END AS bytes + ` +) + +func (PGXlogLocationCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + rows, err := db.QueryContext(ctx, + xlogLocationQuery) + + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var bytes float64 + + if err := rows.Scan(&bytes); err != nil { + return err + } + + ch <- prometheus.MustNewConstMetric( + xlogLocationBytes, + prometheus.GaugeValue, + bytes, + ) + } + if err := rows.Err(); err != nil { + return err + } + return nil +} diff --git a/collector/pg_xlog_location_test.go b/collector/pg_xlog_location_test.go new file mode 100644 index 00000000..561a7df9 --- /dev/null +++ b/collector/pg_xlog_location_test.go @@ -0,0 +1,61 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" +) + +func TestPGXlogLocationCollector(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + inst := &instance{db: db} + columns := []string{ + "bytes", + } + rows := sqlmock.NewRows(columns). + AddRow(53401) + + mock.ExpectQuery(sanitizeQuery(xlogLocationQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGXlogLocationCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGXlogLocationCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{}, value: 53401, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} diff --git a/go.mod b/go.mod index 6444a66e..1e1d70c4 100644 --- a/go.mod +++ b/go.mod @@ -8,11 +8,11 @@ require ( github.com/blang/semver/v4 v4.0.0 github.com/go-kit/log v0.2.1 github.com/lib/pq v1.10.9 - github.com/prometheus/client_golang v1.15.1 + github.com/prometheus/client_golang v1.16.0 github.com/prometheus/client_model v0.4.0 github.com/prometheus/common v0.44.0 github.com/prometheus/exporter-toolkit v0.10.0 - github.com/smartystreets/goconvey v1.8.0 + github.com/smartystreets/goconvey v1.8.1 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v3 v3.0.1 @@ -32,14 +32,14 @@ require ( github.com/kr/text v0.2.0 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect - github.com/prometheus/procfs v0.9.0 // indirect + github.com/prometheus/procfs v0.10.1 // indirect github.com/rogpeppe/go-internal v1.10.0 // indirect - github.com/smartystreets/assertions v1.13.1 // indirect + github.com/smarty/assertions v1.15.0 // indirect github.com/xhit/go-str2duration/v2 v2.1.0 // indirect golang.org/x/crypto v0.8.0 // indirect golang.org/x/net v0.10.0 // indirect golang.org/x/oauth2 v0.8.0 // indirect - golang.org/x/sync v0.1.0 // indirect + golang.org/x/sync v0.2.0 // indirect golang.org/x/sys v0.8.0 // indirect golang.org/x/text v0.9.0 // indirect google.golang.org/appengine v1.6.7 // indirect diff --git a/go.sum b/go.sum index 12673c11..c6efce43 100644 --- a/go.sum +++ b/go.sum @@ -49,23 +49,23 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v1.15.1 h1:8tXpTmJbyH5lydzFPoxSIJ0J46jdh3tylbvM1xCv0LI= -github.com/prometheus/client_golang v1.15.1/go.mod h1:e9yaBhRPU2pPNsZwE+JdQl0KEt1N9XgF6zxWmaC0xOk= +github.com/prometheus/client_golang v1.16.0 h1:yk/hx9hDbrGHovbci4BY+pRMfSuuat626eFsHb7tmT8= +github.com/prometheus/client_golang v1.16.0/go.mod h1:Zsulrv/L9oM40tJ7T815tM89lFEugiJ9HzIqaAx4LKc= github.com/prometheus/client_model v0.4.0 h1:5lQXD3cAg1OXBf4Wq03gTrXHeaV0TQvGfUooCfx1yqY= github.com/prometheus/client_model v0.4.0/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU= github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY= github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY= github.com/prometheus/exporter-toolkit v0.10.0 h1:yOAzZTi4M22ZzVxD+fhy1URTuNRj/36uQJJ5S8IPza8= github.com/prometheus/exporter-toolkit v0.10.0/go.mod h1:+sVFzuvV5JDyw+Ih6p3zFxZNVnKQa3x5qPmDSiPu4ZY= -github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI= -github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY= +github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+PymziUAg= +github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= -github.com/smartystreets/assertions v1.13.1 h1:Ef7KhSmjZcK6AVf9YbJdvPYG9avaF0ZxudX+ThRdWfU= -github.com/smartystreets/assertions v1.13.1/go.mod h1:cXr/IwVfSo/RbCSPhoAPv73p3hlSdrBH/b3SdnW/LMY= -github.com/smartystreets/goconvey v1.8.0 h1:Oi49ha/2MURE0WexF052Z0m+BNSGirfjg5RL+JXWq3w= -github.com/smartystreets/goconvey v1.8.0/go.mod h1:EdX8jtrTIj26jmjCOVNMVSIYAtgexqXKHOXW2Dx9JLg= +github.com/smarty/assertions v1.15.0 h1:cR//PqUBUiQRakZWqBiFFQ9wb8emQGDb0HeGdqGByCY= +github.com/smarty/assertions v1.15.0/go.mod h1:yABtdzeQs6l1brC900WlRNwj6ZR55d7B+E8C6HtKdec= +github.com/smartystreets/goconvey v1.8.1 h1:qGjIddxOk4grTu9JPOU31tVfq3cNdBlNa5sSznIX1xY= +github.com/smartystreets/goconvey v1.8.1/go.mod h1:+/u4qLyY6x1jReYOp7GOM2FSt8aP9CzCZL03bI28W60= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= @@ -80,8 +80,8 @@ golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/oauth2 v0.8.0 h1:6dkIjl3j3LtZ/O3sTgZTMsLKSftL/B8Zgq4huOIIUu8= golang.org/x/oauth2 v0.8.0/go.mod h1:yr7u4HXZRm1R1kBWqr/xKNqewf0plRYoB7sla+BCIXE= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= -golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI= +golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=