Merge branch 'prometheus-community:master' into master

This commit is contained in:
winfredwz 2023-08-22 20:22:32 +08:00 committed by GitHub
commit efce2f4e49
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
43 changed files with 2808 additions and 341 deletions

View File

@ -1,3 +1,5 @@
---
# This action is synced from https://github.com/prometheus/prometheus
name: golangci-lint
on:
push:
@ -27,4 +29,4 @@ jobs:
- name: Lint
uses: golangci/golangci-lint-action@v3.4.0
with:
version: v1.51.2
version: v1.53.3

View File

@ -1,6 +1,7 @@
---
linters:
enable:
- misspell
- revive
issues:
@ -14,3 +15,9 @@ linters-settings:
exclude-functions:
# Never check for logger errors.
- (github.com/go-kit/log.Logger).Log
revive:
rules:
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#unused-parameter
- name: unused-parameter
severity: warning
disabled: true

View File

@ -20,5 +20,4 @@ rules:
config/testdata/section_key_dup.bad.yml
line-length: disable
truthy:
ignore: |
.github/workflows/*.yml
check-keys: false

View File

@ -1,3 +1,13 @@
## 0.13.2 / 2023-07-21
* [BUGFIX] Fix type issues on pg_postmaster metrics #828
* [BUGFIX] Fix pg_replication collector instantiation #854
* [BUGFIX] Fix pg_process_idle metrics #855
## 0.13.1 / 2023-06-27
* [BUGFIX] Make collectors not fail on null values #823
## 0.13.0 / 2023-06-21
BREAKING CHANGES:

View File

@ -55,13 +55,13 @@ ifneq ($(shell command -v gotestsum > /dev/null),)
endif
endif
PROMU_VERSION ?= 0.14.0
PROMU_VERSION ?= 0.15.0
PROMU_URL := https://github.com/prometheus/promu/releases/download/v$(PROMU_VERSION)/promu-$(PROMU_VERSION).$(GO_BUILD_PLATFORM).tar.gz
SKIP_GOLANGCI_LINT :=
GOLANGCI_LINT :=
GOLANGCI_LINT_OPTS ?=
GOLANGCI_LINT_VERSION ?= v1.51.2
GOLANGCI_LINT_VERSION ?= v1.53.3
# golangci-lint only supports linux, darwin and windows platforms on i386/amd64.
# windows isn't included here because of the path separator being different.
ifeq ($(GOHOSTOS),$(filter $(GOHOSTOS),linux darwin))

View File

@ -30,6 +30,26 @@ To use the multi-target functionality, send an http request to the endpoint `/pr
To avoid putting sensitive information like username and password in the URL, preconfigured auth modules are supported via the [auth_modules](#auth_modules) section of the config file. auth_modules for DSNs can be used with the `/probe` endpoint by specifying the `?auth_module=foo` http parameter.
Example Prometheus config:
```yaml
scrape_configs:
- job_name: 'postgres'
static_configs:
- targets:
- server1:5432
- server2:5432
metrics_path: /probe
params:
auth_module: [foo]
relabel_configs:
- source_labels: [__address__]
target_label: __param_target
- source_labels: [__param_target]
target_label: instance
- target_label: __address__
replacement: 127.0.0.1:9116 # The postgres exporter's real hostname:port.
```
## Configuration File
The configuration file controls the behavior of the exporter. It can be set using the `--config.file` command line flag and defaults to `postgres_exporter.yml`.
@ -73,7 +93,10 @@ This will build the docker image as `prometheuscommunity/postgres_exporter:${bra
* `[no-]collector.database`
Enable the database collector (default: enabled).
Enable the `database` collector (default: enabled).
* `[no-]collector.locks`
Enable the `locks` collector (default: enabled).
* `[no-]collector.postmaster`
Enable the `postmaster` collector (default: enabled).

View File

@ -1 +1 @@
0.13.0
0.13.1

View File

@ -176,15 +176,6 @@ var builtinMetricMaps = map[string]intermediateMetricMap{
true,
0,
},
"pg_locks": {
map[string]ColumnMapping{
"datname": {LABEL, "Name of this database", nil, nil},
"mode": {LABEL, "Type of Lock", nil, nil},
"count": {GAUGE, "Number of locks", nil, nil},
},
true,
0,
},
"pg_stat_replication": {
map[string]ColumnMapping{
"procpid": {DISCARD, "Process ID of a WAL sender process", nil, semver.MustParseRange("<9.2.0")},

View File

@ -46,31 +46,6 @@ type OverrideQuery struct {
// Overriding queries for namespaces above.
// TODO: validate this is a closed set in tests, and there are no overlaps
var queryOverrides = map[string][]OverrideQuery{
"pg_locks": {
{
semver.MustParseRange(">0.0.0"),
`SELECT pg_database.datname,tmp.mode,COALESCE(count,0) as count
FROM
(
VALUES ('accesssharelock'),
('rowsharelock'),
('rowexclusivelock'),
('shareupdateexclusivelock'),
('sharelock'),
('sharerowexclusivelock'),
('exclusivelock'),
('accessexclusivelock'),
('sireadlock')
) AS tmp(mode) CROSS JOIN pg_database
LEFT JOIN
(SELECT database, lower(mode) AS mode,count(*) AS count
FROM pg_locks WHERE database IS NOT NULL
GROUP BY database, lower(mode)
) AS tmp2
ON tmp.mode=tmp2.mode and pg_database.oid = tmp2.database ORDER BY 1`,
},
},
"pg_stat_replication": {
{
semver.MustParseRange(">=10.0.0"),

View File

@ -49,8 +49,14 @@ func readMetric(m prometheus.Metric) MetricResult {
func sanitizeQuery(q string) string {
q = strings.Join(strings.Fields(q), " ")
q = strings.Replace(q, "(", "\\(", -1)
q = strings.Replace(q, "?", "\\?", -1)
q = strings.Replace(q, ")", "\\)", -1)
q = strings.Replace(q, "[", "\\[", -1)
q = strings.Replace(q, "]", "\\]", -1)
q = strings.Replace(q, "{", "\\{", -1)
q = strings.Replace(q, "}", "\\}", -1)
q = strings.Replace(q, "*", "\\*", -1)
q = strings.Replace(q, "^", "\\^", -1)
q = strings.Replace(q, "$", "\\$", -1)
return q
}

View File

@ -15,6 +15,7 @@ package collector
import (
"context"
"database/sql"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
@ -79,38 +80,42 @@ func (c PGDatabaseCollector) Update(ctx context.Context, instance *instance, ch
var databases []string
for rows.Next() {
var datname string
var datname sql.NullString
if err := rows.Scan(&datname); err != nil {
return err
}
if !datname.Valid {
continue
}
// Ignore excluded databases
// Filtering is done here instead of in the query to avoid
// a complicated NOT IN query with a variable number of parameters
if sliceContains(c.excludedDatabases, datname) {
if sliceContains(c.excludedDatabases, datname.String) {
continue
}
databases = append(databases, datname)
databases = append(databases, datname.String)
}
// Query the size of the databases
for _, datname := range databases {
var size int64
var size sql.NullFloat64
err = db.QueryRowContext(ctx, pgDatabaseSizeQuery, datname).Scan(&size)
if err != nil {
return err
}
sizeMetric := 0.0
if size.Valid {
sizeMetric = size.Float64
}
ch <- prometheus.MustNewConstMetric(
pgDatabaseSizeDesc,
prometheus.GaugeValue, float64(size), datname,
prometheus.GaugeValue, sizeMetric, datname,
)
}
if err := rows.Err(); err != nil {
return err
}
return nil
return rows.Err()
}
func sliceContains(slice []string, s string) bool {

View File

@ -59,3 +59,43 @@ func TestPGDatabaseCollector(t *testing.T) {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}
// TODO add a null db test
func TestPGDatabaseCollectorNullMetric(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
mock.ExpectQuery(sanitizeQuery(pgDatabaseQuery)).WillReturnRows(sqlmock.NewRows([]string{"datname"}).
AddRow("postgres"))
mock.ExpectQuery(sanitizeQuery(pgDatabaseSizeQuery)).WithArgs("postgres").WillReturnRows(sqlmock.NewRows([]string{"pg_database_size"}).
AddRow(nil))
ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGDatabaseCollector{}
if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGDatabaseCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{"datname": "postgres"}, value: 0, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}

View File

@ -0,0 +1,115 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector
import (
"context"
"database/sql"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
)
const databaseWraparoundSubsystem = "database_wraparound"
func init() {
registerCollector(databaseWraparoundSubsystem, defaultDisabled, NewPGDatabaseWraparoundCollector)
}
type PGDatabaseWraparoundCollector struct {
log log.Logger
}
func NewPGDatabaseWraparoundCollector(config collectorConfig) (Collector, error) {
return &PGDatabaseWraparoundCollector{log: config.logger}, nil
}
var (
databaseWraparoundAgeDatfrozenxid = prometheus.NewDesc(
prometheus.BuildFQName(namespace, databaseWraparoundSubsystem, "age_datfrozenxid_seconds"),
"Age of the oldest transaction ID that has not been frozen.",
[]string{"datname"},
prometheus.Labels{},
)
databaseWraparoundAgeDatminmxid = prometheus.NewDesc(
prometheus.BuildFQName(namespace, databaseWraparoundSubsystem, "age_datminmxid_seconds"),
"Age of the oldest multi-transaction ID that has been replaced with a transaction ID.",
[]string{"datname"},
prometheus.Labels{},
)
databaseWraparoundQuery = `
SELECT
datname,
age(d.datfrozenxid) as age_datfrozenxid,
mxid_age(d.datminmxid) as age_datminmxid
FROM
pg_catalog.pg_database d
WHERE
d.datallowconn
`
)
func (c *PGDatabaseWraparoundCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
rows, err := db.QueryContext(ctx,
databaseWraparoundQuery)
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var datname sql.NullString
var ageDatfrozenxid, ageDatminmxid sql.NullFloat64
if err := rows.Scan(&datname, &ageDatfrozenxid, &ageDatminmxid); err != nil {
return err
}
if !datname.Valid {
level.Debug(c.log).Log("msg", "Skipping database with NULL name")
continue
}
if !ageDatfrozenxid.Valid {
level.Debug(c.log).Log("msg", "Skipping stat emission with NULL age_datfrozenxid")
continue
}
if !ageDatminmxid.Valid {
level.Debug(c.log).Log("msg", "Skipping stat emission with NULL age_datminmxid")
continue
}
ageDatfrozenxidMetric := ageDatfrozenxid.Float64
ch <- prometheus.MustNewConstMetric(
databaseWraparoundAgeDatfrozenxid,
prometheus.GaugeValue,
ageDatfrozenxidMetric, datname.String,
)
ageDatminmxidMetric := ageDatminmxid.Float64
ch <- prometheus.MustNewConstMetric(
databaseWraparoundAgeDatminmxid,
prometheus.GaugeValue,
ageDatminmxidMetric, datname.String,
)
}
if err := rows.Err(); err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,64 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector
import (
"context"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/smartystreets/goconvey/convey"
)
func TestPGDatabaseWraparoundCollector(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
columns := []string{
"datname",
"age_datfrozenxid",
"age_datminmxid",
}
rows := sqlmock.NewRows(columns).
AddRow("newreddit", 87126426, 0)
mock.ExpectQuery(sanitizeQuery(databaseWraparoundQuery)).WillReturnRows(rows)
ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGDatabaseWraparoundCollector{}
if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGDatabaseWraparoundCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{"datname": "newreddit"}, value: 87126426, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"datname": "newreddit"}, value: 0, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}

129
collector/pg_locks.go Normal file
View File

@ -0,0 +1,129 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector
import (
"context"
"database/sql"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
)
const locksSubsystem = "locks"
func init() {
registerCollector(locksSubsystem, defaultEnabled, NewPGLocksCollector)
}
type PGLocksCollector struct {
log log.Logger
}
func NewPGLocksCollector(config collectorConfig) (Collector, error) {
return &PGLocksCollector{
log: config.logger,
}, nil
}
var (
pgLocksDesc = prometheus.NewDesc(
prometheus.BuildFQName(
namespace,
locksSubsystem,
"count",
),
"Number of locks",
[]string{"datname", "mode"}, nil,
)
pgLocksQuery = `
SELECT
pg_database.datname as datname,
tmp.mode as mode,
COALESCE(count, 0) as count
FROM
(
VALUES
('accesssharelock'),
('rowsharelock'),
('rowexclusivelock'),
('shareupdateexclusivelock'),
('sharelock'),
('sharerowexclusivelock'),
('exclusivelock'),
('accessexclusivelock'),
('sireadlock')
) AS tmp(mode)
CROSS JOIN pg_database
LEFT JOIN (
SELECT
database,
lower(mode) AS mode,
count(*) AS count
FROM
pg_locks
WHERE
database IS NOT NULL
GROUP BY
database,
lower(mode)
) AS tmp2 ON tmp.mode = tmp2.mode
and pg_database.oid = tmp2.database
ORDER BY
1
`
)
// Update implements Collector and exposes database locks.
// It is called by the Prometheus registry when collecting metrics.
func (c PGLocksCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
// Query the list of databases
rows, err := db.QueryContext(ctx,
pgLocksQuery,
)
if err != nil {
return err
}
defer rows.Close()
var datname, mode sql.NullString
var count sql.NullInt64
for rows.Next() {
if err := rows.Scan(&datname, &mode, &count); err != nil {
return err
}
if !datname.Valid || !mode.Valid {
continue
}
countMetric := 0.0
if count.Valid {
countMetric = float64(count.Int64)
}
ch <- prometheus.MustNewConstMetric(
pgLocksDesc,
prometheus.GaugeValue, countMetric,
datname.String, mode.String,
)
}
if err := rows.Err(); err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,60 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector
import (
"context"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/smartystreets/goconvey/convey"
)
func TestPGLocksCollector(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
rows := sqlmock.NewRows([]string{"datname", "mode", "count"}).
AddRow("test", "exclusivelock", 42)
mock.ExpectQuery(sanitizeQuery(pgLocksQuery)).WillReturnRows(rows)
ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGLocksCollector{}
if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGLocksCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{"datname": "test", "mode": "exclusivelock"}, value: 42, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}

View File

@ -15,6 +15,7 @@ package collector
import (
"context"
"database/sql"
"github.com/prometheus/client_golang/prometheus"
)
@ -22,7 +23,7 @@ import (
const postmasterSubsystem = "postmaster"
func init() {
registerCollector(postmasterSubsystem, defaultEnabled, NewPGPostmasterCollector)
registerCollector(postmasterSubsystem, defaultDisabled, NewPGPostmasterCollector)
}
type PGPostmasterCollector struct {
@ -43,7 +44,7 @@ var (
[]string{}, nil,
)
pgPostmasterQuery = "SELECT pg_postmaster_start_time from pg_postmaster_start_time();"
pgPostmasterQuery = "SELECT extract(epoch from pg_postmaster_start_time) from pg_postmaster_start_time();"
)
func (c *PGPostmasterCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
@ -51,14 +52,18 @@ func (c *PGPostmasterCollector) Update(ctx context.Context, instance *instance,
row := db.QueryRowContext(ctx,
pgPostmasterQuery)
var startTimeSeconds float64
var startTimeSeconds sql.NullFloat64
err := row.Scan(&startTimeSeconds)
if err != nil {
return err
}
startTimeSecondsMetric := 0.0
if startTimeSeconds.Valid {
startTimeSecondsMetric = startTimeSeconds.Float64
}
ch <- prometheus.MustNewConstMetric(
pgPostMasterStartTimeSeconds,
prometheus.GaugeValue, startTimeSeconds,
prometheus.GaugeValue, startTimeSecondsMetric,
)
return nil
}

View File

@ -57,3 +57,39 @@ func TestPgPostmasterCollector(t *testing.T) {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}
func TestPgPostmasterCollectorNullTime(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
mock.ExpectQuery(sanitizeQuery(pgPostmasterQuery)).WillReturnRows(sqlmock.NewRows([]string{"pg_postmaster_start_time"}).
AddRow(nil))
ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGPostmasterCollector{}
if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGPostmasterCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{}, value: 0, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}

View File

@ -15,21 +15,24 @@ package collector
import (
"context"
"database/sql"
"github.com/go-kit/log"
"github.com/lib/pq"
"github.com/prometheus/client_golang/prometheus"
)
const processIdleSubsystem = "process_idle"
func init() {
registerCollector(processIdleSubsystem, defaultEnabled, NewPGProcessIdleCollector)
// Making this default disabled because we have no tests for it
registerCollector(processIdleSubsystem, defaultDisabled, NewPGProcessIdleCollector)
}
type PGProcessIdleCollector struct {
log log.Logger
}
const processIdleSubsystem = "process_idle"
func NewPGProcessIdleCollector(config collectorConfig) (Collector, error) {
return &PGProcessIdleCollector{log: config.logger}, nil
}
@ -37,7 +40,7 @@ func NewPGProcessIdleCollector(config collectorConfig) (Collector, error) {
var pgProcessIdleSeconds = prometheus.NewDesc(
prometheus.BuildFQName(namespace, processIdleSubsystem, "seconds"),
"Idle time of server processes",
[]string{"application_name"},
[]string{"state", "application_name"},
prometheus.Labels{},
)
@ -47,15 +50,17 @@ func (PGProcessIdleCollector) Update(ctx context.Context, instance *instance, ch
`WITH
metrics AS (
SELECT
state,
application_name,
SUM(EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - state_change))::bigint)::float AS process_idle_seconds_sum,
COUNT(*) AS process_idle_seconds_count
FROM pg_stat_activity
WHERE state = 'idle'
GROUP BY application_name
WHERE state ~ '^idle'
GROUP BY state, application_name
),
buckets AS (
SELECT
state,
application_name,
le,
SUM(
@ -67,40 +72,61 @@ func (PGProcessIdleCollector) Update(ctx context.Context, instance *instance, ch
FROM
pg_stat_activity,
UNNEST(ARRAY[1, 2, 5, 15, 30, 60, 90, 120, 300]) AS le
GROUP BY application_name, le
ORDER BY application_name, le
GROUP BY state, application_name, le
ORDER BY state, application_name, le
)
SELECT
state,
application_name,
process_idle_seconds_sum as seconds_sum,
process_idle_seconds_count as seconds_count,
ARRAY_AGG(le) AS seconds,
ARRAY_AGG(bucket) AS seconds_bucket
FROM metrics JOIN buckets USING (application_name)
GROUP BY 1, 2, 3;`)
FROM metrics JOIN buckets USING (state, application_name)
GROUP BY 1, 2, 3, 4;`)
var applicationName string
var secondsSum int64
var secondsCount uint64
var seconds []int64
var secondsBucket []uint64
var state sql.NullString
var applicationName sql.NullString
var secondsSum sql.NullFloat64
var secondsCount sql.NullInt64
var seconds []float64
var secondsBucket []int64
err := row.Scan(&applicationName, &secondsSum, &secondsCount, &seconds, &secondsBucket)
err := row.Scan(&state, &applicationName, &secondsSum, &secondsCount, pq.Array(&seconds), pq.Array(&secondsBucket))
if err != nil {
return err
}
var buckets = make(map[float64]uint64, len(seconds))
for i, second := range seconds {
if i >= len(secondsBucket) {
break
}
buckets[float64(second)] = secondsBucket[i]
buckets[second] = uint64(secondsBucket[i])
}
if err != nil {
return err
stateLabel := "unknown"
if state.Valid {
stateLabel = state.String
}
applicationNameLabel := "unknown"
if applicationName.Valid {
applicationNameLabel = applicationName.String
}
var secondsCountMetric uint64
if secondsCount.Valid {
secondsCountMetric = uint64(secondsCount.Int64)
}
secondsSumMetric := 0.0
if secondsSum.Valid {
secondsSumMetric = secondsSum.Float64
}
ch <- prometheus.MustNewConstHistogram(
pgProcessIdleSeconds,
secondsCount, float64(secondsSum), buckets,
applicationName,
secondsCountMetric, secondsSumMetric, buckets,
stateLabel, applicationNameLabel,
)
return nil
}

View File

@ -15,7 +15,6 @@ package collector
import (
"context"
"database/sql"
"github.com/prometheus/client_golang/prometheus"
)
@ -30,7 +29,7 @@ type PGReplicationCollector struct {
}
func NewPGReplicationCollector(collectorConfig) (Collector, error) {
return &PGPostmasterCollector{}, nil
return &PGReplicationCollector{}, nil
}
var (
@ -64,7 +63,8 @@ var (
END as is_replica`
)
func (c *PGReplicationCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error {
func (c *PGReplicationCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
row := db.QueryRowContext(ctx,
pgReplicationQuery,
)

View File

@ -15,6 +15,7 @@ package collector
import (
"context"
"database/sql"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
@ -65,11 +66,14 @@ var (
pgReplicationSlotQuery = `SELECT
slot_name,
pg_current_wal_lsn() - '0/0' AS current_wal_lsn,
coalesce(confirmed_flush_lsn, '0/0') - '0/0',
CASE WHEN pg_is_in_recovery() THEN
pg_last_wal_receive_lsn() - '0/0'
ELSE
pg_current_wal_lsn() - '0/0'
END AS current_wal_lsn,
COALESCE(confirmed_flush_lsn, '0/0') - '0/0',
active
FROM
pg_replication_slots;`
FROM pg_replication_slots;`
)
func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
@ -82,36 +86,45 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance
defer rows.Close()
for rows.Next() {
var slotName string
var walLSN int64
var flushLSN int64
var isActive bool
var slotName sql.NullString
var walLSN sql.NullFloat64
var flushLSN sql.NullFloat64
var isActive sql.NullBool
if err := rows.Scan(&slotName, &walLSN, &flushLSN, &isActive); err != nil {
return err
}
isActiveValue := 0
if isActive {
isActiveValue = 1
isActiveValue := 0.0
if isActive.Valid && isActive.Bool {
isActiveValue = 1.0
}
slotNameLabel := "unknown"
if slotName.Valid {
slotNameLabel = slotName.String
}
var walLSNMetric float64
if walLSN.Valid {
walLSNMetric = walLSN.Float64
}
ch <- prometheus.MustNewConstMetric(
pgReplicationSlotCurrentWalDesc,
prometheus.GaugeValue, float64(walLSN), slotName,
prometheus.GaugeValue, walLSNMetric, slotNameLabel,
)
if isActive {
if isActive.Valid && isActive.Bool {
var flushLSNMetric float64
if flushLSN.Valid {
flushLSNMetric = flushLSN.Float64
}
ch <- prometheus.MustNewConstMetric(
pgReplicationSlotCurrentFlushDesc,
prometheus.GaugeValue, float64(flushLSN), slotName,
prometheus.GaugeValue, flushLSNMetric, slotNameLabel,
)
}
ch <- prometheus.MustNewConstMetric(
pgReplicationSlotIsActiveDesc,
prometheus.GaugeValue, float64(isActiveValue), slotName,
prometheus.GaugeValue, isActiveValue, slotNameLabel,
)
}
if err := rows.Err(); err != nil {
return err
}
return nil
return rows.Err()
}

View File

@ -103,3 +103,84 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) {
}
}
func TestPgReplicationSlotCollectorActiveNil(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
columns := []string{"slot_name", "current_wal_lsn", "confirmed_flush_lsn", "active"}
rows := sqlmock.NewRows(columns).
AddRow("test_slot", 6, 12, nil)
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)
ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGReplicationSlotCollector{}
if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGReplicationSlotCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{"slot_name": "test_slot"}, value: 6, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"slot_name": "test_slot"}, value: 0, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}
func TestPgReplicationSlotCollectorTestNilValues(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
columns := []string{"slot_name", "current_wal_lsn", "confirmed_flush_lsn", "active"}
rows := sqlmock.NewRows(columns).
AddRow(nil, nil, nil, true)
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)
ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGReplicationSlotCollector{}
if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGReplicationSlotCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{"slot_name": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"slot_name": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"slot_name": "unknown"}, value: 1, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}

View File

@ -29,6 +29,8 @@ func TestPgReplicationCollector(t *testing.T) {
}
defer db.Close()
inst := &instance{db: db}
columns := []string{"lag", "is_replica"}
rows := sqlmock.NewRows(columns).
AddRow(1000, 1)
@ -39,7 +41,7 @@ func TestPgReplicationCollector(t *testing.T) {
defer close(ch)
c := PGReplicationCollector{}
if err := c.Update(context.Background(), db, ch); err != nil {
if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGReplicationCollector.Update: %s", err)
}
}()

View File

@ -0,0 +1,84 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector
import (
"context"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
)
const statActivityAutovacuumSubsystem = "stat_activity_autovacuum"
func init() {
registerCollector(statActivityAutovacuumSubsystem, defaultDisabled, NewPGStatActivityAutovacuumCollector)
}
type PGStatActivityAutovacuumCollector struct {
log log.Logger
}
func NewPGStatActivityAutovacuumCollector(config collectorConfig) (Collector, error) {
return &PGStatActivityAutovacuumCollector{log: config.logger}, nil
}
var (
statActivityAutovacuumAgeInSeconds = prometheus.NewDesc(
prometheus.BuildFQName(namespace, statActivityAutovacuumSubsystem, "timestamp_seconds"),
"Start timestamp of the vacuum process in seconds",
[]string{"relname"},
prometheus.Labels{},
)
statActivityAutovacuumQuery = `
SELECT
SPLIT_PART(query, '.', 2) AS relname,
EXTRACT(xact_start) AS timestamp_seconds
FROM
pg_catalog.pg_stat_activity
WHERE
query LIKE 'autovacuum:%'
`
)
func (PGStatActivityAutovacuumCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
rows, err := db.QueryContext(ctx,
statActivityAutovacuumQuery)
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var relname string
var ageInSeconds float64
if err := rows.Scan(&relname, &ageInSeconds); err != nil {
return err
}
ch <- prometheus.MustNewConstMetric(
statActivityAutovacuumAgeInSeconds,
prometheus.GaugeValue,
ageInSeconds, relname,
)
}
if err := rows.Err(); err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,62 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector
import (
"context"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/smartystreets/goconvey/convey"
)
func TestPGStatActivityAutovacuumCollector(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
columns := []string{
"relname",
"timestamp_seconds",
}
rows := sqlmock.NewRows(columns).
AddRow("test", 3600)
mock.ExpectQuery(sanitizeQuery(statActivityAutovacuumQuery)).WillReturnRows(rows)
ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGStatActivityAutovacuumCollector{}
if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGStatActivityAutovacuumCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{"relname": "test"}, value: 3600, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}

View File

@ -15,7 +15,7 @@ package collector
import (
"context"
"time"
"database/sql"
"github.com/prometheus/client_golang/prometheus"
)
@ -121,77 +121,113 @@ func (PGStatBGWriterCollector) Update(ctx context.Context, instance *instance, c
row := db.QueryRowContext(ctx,
statBGWriterQuery)
var cpt int
var cpr int
var cpwt float64
var cpst float64
var bcp int
var bc int
var mwc int
var bb int
var bbf int
var ba int
var sr time.Time
var cpt, cpr, bcp, bc, mwc, bb, bbf, ba sql.NullInt64
var cpwt, cpst sql.NullFloat64
var sr sql.NullTime
err := row.Scan(&cpt, &cpr, &cpwt, &cpst, &bcp, &bc, &mwc, &bb, &bbf, &ba, &sr)
if err != nil {
return err
}
cptMetric := 0.0
if cpt.Valid {
cptMetric = float64(cpt.Int64)
}
ch <- prometheus.MustNewConstMetric(
statBGWriterCheckpointsTimedDesc,
prometheus.CounterValue,
float64(cpt),
cptMetric,
)
cprMetric := 0.0
if cpr.Valid {
cprMetric = float64(cpr.Int64)
}
ch <- prometheus.MustNewConstMetric(
statBGWriterCheckpointsReqDesc,
prometheus.CounterValue,
float64(cpr),
cprMetric,
)
cpwtMetric := 0.0
if cpwt.Valid {
cpwtMetric = float64(cpwt.Float64)
}
ch <- prometheus.MustNewConstMetric(
statBGWriterCheckpointsReqTimeDesc,
prometheus.CounterValue,
float64(cpwt),
cpwtMetric,
)
cpstMetric := 0.0
if cpst.Valid {
cpstMetric = float64(cpst.Float64)
}
ch <- prometheus.MustNewConstMetric(
statBGWriterCheckpointsSyncTimeDesc,
prometheus.CounterValue,
float64(cpst),
cpstMetric,
)
bcpMetric := 0.0
if bcp.Valid {
bcpMetric = float64(bcp.Int64)
}
ch <- prometheus.MustNewConstMetric(
statBGWriterBuffersCheckpointDesc,
prometheus.CounterValue,
float64(bcp),
bcpMetric,
)
bcMetric := 0.0
if bc.Valid {
bcMetric = float64(bc.Int64)
}
ch <- prometheus.MustNewConstMetric(
statBGWriterBuffersCleanDesc,
prometheus.CounterValue,
float64(bc),
bcMetric,
)
mwcMetric := 0.0
if mwc.Valid {
mwcMetric = float64(mwc.Int64)
}
ch <- prometheus.MustNewConstMetric(
statBGWriterMaxwrittenCleanDesc,
prometheus.CounterValue,
float64(mwc),
mwcMetric,
)
bbMetric := 0.0
if bb.Valid {
bbMetric = float64(bb.Int64)
}
ch <- prometheus.MustNewConstMetric(
statBGWriterBuffersBackendDesc,
prometheus.CounterValue,
float64(bb),
bbMetric,
)
bbfMetric := 0.0
if bbf.Valid {
bbfMetric = float64(bbf.Int64)
}
ch <- prometheus.MustNewConstMetric(
statBGWriterBuffersBackendFsyncDesc,
prometheus.CounterValue,
float64(bbf),
bbfMetric,
)
baMetric := 0.0
if ba.Valid {
baMetric = float64(ba.Int64)
}
ch <- prometheus.MustNewConstMetric(
statBGWriterBuffersAllocDesc,
prometheus.CounterValue,
float64(ba),
baMetric,
)
srMetric := 0.0
if sr.Valid {
srMetric = float64(sr.Time.Unix())
}
ch <- prometheus.MustNewConstMetric(
statBGWriterStatsResetDesc,
prometheus.CounterValue,
float64(sr.Unix()),
srMetric,
)
return nil

View File

@ -51,7 +51,7 @@ func TestPGStatBGWriterCollector(t *testing.T) {
}
rows := sqlmock.NewRows(columns).
AddRow(354, 4945, 289097744, 1242257, 3275602074, 89320867, 450139, 2034563757, 0, 2725688749, srT)
AddRow(354, 4945, 289097744, 1242257, int64(3275602074), 89320867, 450139, 2034563757, 0, int64(2725688749), srT)
mock.ExpectQuery(sanitizeQuery(statBGWriterQuery)).WillReturnRows(rows)
ch := make(chan prometheus.Metric)
@ -88,3 +88,64 @@ func TestPGStatBGWriterCollector(t *testing.T) {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}
func TestPGStatBGWriterCollectorNullValues(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
columns := []string{
"checkpoints_timed",
"checkpoints_req",
"checkpoint_write_time",
"checkpoint_sync_time",
"buffers_checkpoint",
"buffers_clean",
"maxwritten_clean",
"buffers_backend",
"buffers_backend_fsync",
"buffers_alloc",
"stats_reset"}
rows := sqlmock.NewRows(columns).
AddRow(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
mock.ExpectQuery(sanitizeQuery(statBGWriterQuery)).WillReturnRows(rows)
ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGStatBGWriterCollector{}
if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGStatBGWriterCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}

View File

@ -17,6 +17,8 @@ import (
"context"
"database/sql"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
)
@ -26,10 +28,12 @@ func init() {
registerCollector(statDatabaseSubsystem, defaultEnabled, NewPGStatDatabaseCollector)
}
type PGStatDatabaseCollector struct{}
type PGStatDatabaseCollector struct {
log log.Logger
}
func NewPGStatDatabaseCollector(config collectorConfig) (Collector, error) {
return &PGStatDatabaseCollector{}, nil
return &PGStatDatabaseCollector{log: config.logger}, nil
}
var (
@ -202,12 +206,9 @@ var (
[]string{"datid", "datname"},
prometheus.Labels{},
)
)
func (PGStatDatabaseCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
rows, err := db.QueryContext(ctx,
`SELECT
statDatabaseQuery = `
SELECT
datid
,datname
,numbackends
@ -228,7 +229,13 @@ func (PGStatDatabaseCollector) Update(ctx context.Context, instance *instance, c
,blk_write_time
,stats_reset
FROM pg_stat_database;
`,
`
)
func (c *PGStatDatabaseCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
rows, err := db.QueryContext(ctx,
statDatabaseQuery,
)
if err != nil {
return err
@ -236,24 +243,8 @@ func (PGStatDatabaseCollector) Update(ctx context.Context, instance *instance, c
defer rows.Close()
for rows.Next() {
var datid string
var datname string
var numBackends float64
var xactCommit float64
var xactRollback float64
var blksRead float64
var blksHit float64
var tupReturned float64
var tupFetched float64
var tupInserted float64
var tupUpdated float64
var tupDeleted float64
var conflicts float64
var tempFiles float64
var tempBytes float64
var deadlocks float64
var blkReadTime float64
var blkWriteTime float64
var datid, datname sql.NullString
var numBackends, xactCommit, xactRollback, blksRead, blksHit, tupReturned, tupFetched, tupInserted, tupUpdated, tupDeleted, conflicts, tempFiles, tempBytes, deadlocks, blkReadTime, blkWriteTime sql.NullFloat64
var statsReset sql.NullTime
err := rows.Scan(
@ -281,151 +272,203 @@ func (PGStatDatabaseCollector) Update(ctx context.Context, instance *instance, c
return err
}
if !datid.Valid {
level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no datid")
continue
}
if !datname.Valid {
level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no datname")
continue
}
if !numBackends.Valid {
level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no numbackends")
continue
}
if !xactCommit.Valid {
level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no xact_commit")
continue
}
if !xactRollback.Valid {
level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no xact_rollback")
continue
}
if !blksRead.Valid {
level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no blks_read")
continue
}
if !blksHit.Valid {
level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no blks_hit")
continue
}
if !tupReturned.Valid {
level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no tup_returned")
continue
}
if !tupFetched.Valid {
level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no tup_fetched")
continue
}
if !tupInserted.Valid {
level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no tup_inserted")
continue
}
if !tupUpdated.Valid {
level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no tup_updated")
continue
}
if !tupDeleted.Valid {
level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no tup_deleted")
continue
}
if !conflicts.Valid {
level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no conflicts")
continue
}
if !tempFiles.Valid {
level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no temp_files")
continue
}
if !tempBytes.Valid {
level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no temp_bytes")
continue
}
if !deadlocks.Valid {
level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no deadlocks")
continue
}
if !blkReadTime.Valid {
level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no blk_read_time")
continue
}
if !blkWriteTime.Valid {
level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no blk_write_time")
continue
}
if !statsReset.Valid {
level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no stats_reset")
continue
}
labels := []string{datid.String, datname.String}
ch <- prometheus.MustNewConstMetric(
statDatabaseNumbackends,
prometheus.GaugeValue,
numBackends,
datid,
datname,
numBackends.Float64,
labels...,
)
ch <- prometheus.MustNewConstMetric(
statDatabaseXactCommit,
prometheus.CounterValue,
xactCommit,
datid,
datname,
xactCommit.Float64,
labels...,
)
ch <- prometheus.MustNewConstMetric(
statDatabaseXactRollback,
prometheus.CounterValue,
xactRollback,
datid,
datname,
xactRollback.Float64,
labels...,
)
ch <- prometheus.MustNewConstMetric(
statDatabaseBlksRead,
prometheus.CounterValue,
blksRead,
datid,
datname,
blksRead.Float64,
labels...,
)
ch <- prometheus.MustNewConstMetric(
statDatabaseBlksHit,
prometheus.CounterValue,
blksHit,
datid,
datname,
blksHit.Float64,
labels...,
)
ch <- prometheus.MustNewConstMetric(
statDatabaseTupReturned,
prometheus.CounterValue,
tupReturned,
datid,
datname,
tupReturned.Float64,
labels...,
)
ch <- prometheus.MustNewConstMetric(
statDatabaseTupFetched,
prometheus.CounterValue,
tupFetched,
datid,
datname,
tupFetched.Float64,
labels...,
)
ch <- prometheus.MustNewConstMetric(
statDatabaseTupInserted,
prometheus.CounterValue,
tupInserted,
datid,
datname,
tupInserted.Float64,
labels...,
)
ch <- prometheus.MustNewConstMetric(
statDatabaseTupUpdated,
prometheus.CounterValue,
tupUpdated,
datid,
datname,
tupUpdated.Float64,
labels...,
)
ch <- prometheus.MustNewConstMetric(
statDatabaseTupDeleted,
prometheus.CounterValue,
tupDeleted,
datid,
datname,
tupDeleted.Float64,
labels...,
)
ch <- prometheus.MustNewConstMetric(
statDatabaseConflicts,
prometheus.CounterValue,
conflicts,
datid,
datname,
conflicts.Float64,
labels...,
)
ch <- prometheus.MustNewConstMetric(
statDatabaseTempFiles,
prometheus.CounterValue,
tempFiles,
datid,
datname,
tempFiles.Float64,
labels...,
)
ch <- prometheus.MustNewConstMetric(
statDatabaseTempBytes,
prometheus.CounterValue,
tempBytes,
datid,
datname,
tempBytes.Float64,
labels...,
)
ch <- prometheus.MustNewConstMetric(
statDatabaseDeadlocks,
prometheus.CounterValue,
deadlocks,
datid,
datname,
deadlocks.Float64,
labels...,
)
ch <- prometheus.MustNewConstMetric(
statDatabaseBlkReadTime,
prometheus.CounterValue,
blkReadTime,
datid,
datname,
blkReadTime.Float64,
labels...,
)
ch <- prometheus.MustNewConstMetric(
statDatabaseBlkWriteTime,
prometheus.CounterValue,
blkWriteTime,
datid,
datname,
blkWriteTime.Float64,
labels...,
)
if statsReset.Valid {
ch <- prometheus.MustNewConstMetric(
statDatabaseStatsReset,
prometheus.CounterValue,
float64(statsReset.Time.Unix()),
datid,
datname,
)
} else {
ch <- prometheus.MustNewConstMetric(
statDatabaseStatsReset,
prometheus.CounterValue,
0,
datid,
datname,
)
}
ch <- prometheus.MustNewConstMetric(
statDatabaseStatsReset,
prometheus.CounterValue,
float64(statsReset.Time.Unix()),
labels...,
)
}
return nil
}

View File

@ -0,0 +1,408 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector
import (
"context"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/smartystreets/goconvey/convey"
)
func TestPGStatDatabaseCollector(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
columns := []string{
"datid",
"datname",
"numbackends",
"xact_commit",
"xact_rollback",
"blks_read",
"blks_hit",
"tup_returned",
"tup_fetched",
"tup_inserted",
"tup_updated",
"tup_deleted",
"conflicts",
"temp_files",
"temp_bytes",
"deadlocks",
"blk_read_time",
"blk_write_time",
"stats_reset",
}
srT, err := time.Parse("2006-01-02 15:04:05.00000-07", "2023-05-25 17:10:42.81132-07")
if err != nil {
t.Fatalf("Error parsing time: %s", err)
}
rows := sqlmock.NewRows(columns).
AddRow(
"pid",
"postgres",
354,
4945,
289097744,
1242257,
int64(3275602074),
89320867,
450139,
2034563757,
0,
int64(2725688749),
23,
52,
74,
925,
16,
823,
srT)
mock.ExpectQuery(sanitizeQuery(statDatabaseQuery)).WillReturnRows(rows)
ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGStatDatabaseCollector{
log: log.With(log.NewNopLogger(), "collector", "pg_stat_database"),
}
if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGStatDatabaseCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_GAUGE, value: 354},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 4945},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 289097744},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 1242257},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 3275602074},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 89320867},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 450139},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 2034563757},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 2725688749},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 23},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 52},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 74},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 925},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 16},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 823},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 1685059842},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}
func TestPGStatDatabaseCollectorNullValues(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
srT, err := time.Parse("2006-01-02 15:04:05.00000-07", "2023-05-25 17:10:42.81132-07")
if err != nil {
t.Fatalf("Error parsing time: %s", err)
}
inst := &instance{db: db}
columns := []string{
"datid",
"datname",
"numbackends",
"xact_commit",
"xact_rollback",
"blks_read",
"blks_hit",
"tup_returned",
"tup_fetched",
"tup_inserted",
"tup_updated",
"tup_deleted",
"conflicts",
"temp_files",
"temp_bytes",
"deadlocks",
"blk_read_time",
"blk_write_time",
"stats_reset",
}
rows := sqlmock.NewRows(columns).
AddRow(
nil,
"postgres",
354,
4945,
289097744,
1242257,
int64(3275602074),
89320867,
450139,
2034563757,
0,
int64(2725688749),
23,
52,
74,
925,
16,
823,
srT).
AddRow(
"pid",
"postgres",
354,
4945,
289097744,
1242257,
int64(3275602074),
89320867,
450139,
2034563757,
0,
int64(2725688749),
23,
52,
74,
925,
16,
823,
srT)
mock.ExpectQuery(sanitizeQuery(statDatabaseQuery)).WillReturnRows(rows)
ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGStatDatabaseCollector{
log: log.With(log.NewNopLogger(), "collector", "pg_stat_database"),
}
if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGStatDatabaseCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_GAUGE, value: 354},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 4945},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 289097744},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 1242257},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 3275602074},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 89320867},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 450139},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 2034563757},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 2725688749},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 23},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 52},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 74},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 925},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 16},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 823},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 1685059842},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}
func TestPGStatDatabaseCollectorRowLeakTest(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
columns := []string{
"datid",
"datname",
"numbackends",
"xact_commit",
"xact_rollback",
"blks_read",
"blks_hit",
"tup_returned",
"tup_fetched",
"tup_inserted",
"tup_updated",
"tup_deleted",
"conflicts",
"temp_files",
"temp_bytes",
"deadlocks",
"blk_read_time",
"blk_write_time",
"stats_reset",
}
srT, err := time.Parse("2006-01-02 15:04:05.00000-07", "2023-05-25 17:10:42.81132-07")
if err != nil {
t.Fatalf("Error parsing time: %s", err)
}
rows := sqlmock.NewRows(columns).
AddRow(
"pid",
"postgres",
354,
4945,
289097744,
1242257,
int64(3275602074),
89320867,
450139,
2034563757,
0,
int64(2725688749),
23,
52,
74,
925,
16,
823,
srT).
AddRow(
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
).
AddRow(
"pid",
"postgres",
355,
4946,
289097745,
1242258,
int64(3275602075),
89320868,
450140,
2034563758,
1,
int64(2725688750),
24,
53,
75,
926,
17,
824,
srT)
mock.ExpectQuery(sanitizeQuery(statDatabaseQuery)).WillReturnRows(rows)
ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGStatDatabaseCollector{
log: log.With(log.NewNopLogger(), "collector", "pg_stat_database"),
}
if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGStatDatabaseCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_GAUGE, value: 354},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 4945},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 289097744},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 1242257},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 3275602074},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 89320867},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 450139},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 2034563757},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 2725688749},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 23},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 52},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 74},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 925},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 16},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 823},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 1685059842},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_GAUGE, value: 355},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 4946},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 289097745},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 1242258},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 3275602075},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 89320868},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 450140},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 2034563758},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 1},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 2725688750},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 24},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 53},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 75},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 926},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 17},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 824},
{labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 1685059842},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}

View File

@ -15,7 +15,9 @@ package collector
import (
"context"
"database/sql"
"github.com/blang/semver/v4"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
)
@ -89,60 +91,117 @@ var (
)
ORDER BY seconds_total DESC
LIMIT 100;`
pgStatStatementsNewQuery = `SELECT
pg_get_userbyid(userid) as user,
pg_database.datname,
pg_stat_statements.queryid,
pg_stat_statements.calls as calls_total,
pg_stat_statements.total_exec_time / 1000.0 as seconds_total,
pg_stat_statements.rows as rows_total,
pg_stat_statements.blk_read_time / 1000.0 as block_read_seconds_total,
pg_stat_statements.blk_write_time / 1000.0 as block_write_seconds_total
FROM pg_stat_statements
JOIN pg_database
ON pg_database.oid = pg_stat_statements.dbid
WHERE
total_exec_time > (
SELECT percentile_cont(0.1)
WITHIN GROUP (ORDER BY total_exec_time)
FROM pg_stat_statements
)
ORDER BY seconds_total DESC
LIMIT 100;`
)
func (PGStatStatementsCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
query := pgStatStatementsQuery
if instance.version.GE(semver.MustParse("13.0.0")) {
query = pgStatStatementsNewQuery
}
db := instance.getDB()
rows, err := db.QueryContext(ctx,
pgStatStatementsQuery)
rows, err := db.QueryContext(ctx, query)
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var user string
var datname string
var queryid string
var callsTotal int64
var secondsTotal float64
var rowsTotal int64
var blockReadSecondsTotal float64
var blockWriteSecondsTotal float64
var user, datname, queryid sql.NullString
var callsTotal, rowsTotal sql.NullInt64
var secondsTotal, blockReadSecondsTotal, blockWriteSecondsTotal sql.NullFloat64
if err := rows.Scan(&user, &datname, &queryid, &callsTotal, &secondsTotal, &rowsTotal, &blockReadSecondsTotal, &blockWriteSecondsTotal); err != nil {
return err
}
userLabel := "unknown"
if user.Valid {
userLabel = user.String
}
datnameLabel := "unknown"
if datname.Valid {
datnameLabel = datname.String
}
queryidLabel := "unknown"
if queryid.Valid {
queryidLabel = queryid.String
}
callsTotalMetric := 0.0
if callsTotal.Valid {
callsTotalMetric = float64(callsTotal.Int64)
}
ch <- prometheus.MustNewConstMetric(
statSTatementsCallsTotal,
prometheus.CounterValue,
float64(callsTotal),
user, datname, queryid,
callsTotalMetric,
userLabel, datnameLabel, queryidLabel,
)
secondsTotalMetric := 0.0
if secondsTotal.Valid {
secondsTotalMetric = secondsTotal.Float64
}
ch <- prometheus.MustNewConstMetric(
statStatementsSecondsTotal,
prometheus.CounterValue,
secondsTotal,
user, datname, queryid,
secondsTotalMetric,
userLabel, datnameLabel, queryidLabel,
)
rowsTotalMetric := 0.0
if rowsTotal.Valid {
rowsTotalMetric = float64(rowsTotal.Int64)
}
ch <- prometheus.MustNewConstMetric(
statStatementsRowsTotal,
prometheus.CounterValue,
float64(rowsTotal),
user, datname, queryid,
rowsTotalMetric,
userLabel, datnameLabel, queryidLabel,
)
blockReadSecondsTotalMetric := 0.0
if blockReadSecondsTotal.Valid {
blockReadSecondsTotalMetric = blockReadSecondsTotal.Float64
}
ch <- prometheus.MustNewConstMetric(
statStatementsBlockReadSecondsTotal,
prometheus.CounterValue,
blockReadSecondsTotal,
user, datname, queryid,
blockReadSecondsTotalMetric,
userLabel, datnameLabel, queryidLabel,
)
blockWriteSecondsTotalMetric := 0.0
if blockWriteSecondsTotal.Valid {
blockWriteSecondsTotalMetric = blockWriteSecondsTotal.Float64
}
ch <- prometheus.MustNewConstMetric(
statStatementsBlockWriteSecondsTotal,
prometheus.CounterValue,
blockWriteSecondsTotal,
user, datname, queryid,
blockWriteSecondsTotalMetric,
userLabel, datnameLabel, queryidLabel,
)
}
if err := rows.Err(); err != nil {

View File

@ -17,6 +17,7 @@ import (
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/blang/semver/v4"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/smartystreets/goconvey/convey"
@ -29,7 +30,7 @@ func TestPGStateStatementsCollector(t *testing.T) {
}
defer db.Close()
inst := &instance{db: db}
inst := &instance{db: db, version: semver.MustParse("12.0.0")}
columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"}
rows := sqlmock.NewRows(columns).
@ -64,3 +65,89 @@ func TestPGStateStatementsCollector(t *testing.T) {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}
func TestPGStateStatementsCollectorNull(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db, version: semver.MustParse("13.3.7")}
columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"}
rows := sqlmock.NewRows(columns).
AddRow(nil, nil, nil, nil, nil, nil, nil, nil)
mock.ExpectQuery(sanitizeQuery(pgStatStatementsNewQuery)).WillReturnRows(rows)
ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGStatStatementsCollector{}
if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGStatStatementsCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{"user": "unknown", "datname": "unknown", "queryid": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{"user": "unknown", "datname": "unknown", "queryid": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{"user": "unknown", "datname": "unknown", "queryid": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{"user": "unknown", "datname": "unknown", "queryid": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{"user": "unknown", "datname": "unknown", "queryid": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}
func TestPGStateStatementsCollectorNewPG(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db, version: semver.MustParse("13.3.7")}
columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"}
rows := sqlmock.NewRows(columns).
AddRow("postgres", "postgres", 1500, 5, 0.4, 100, 0.1, 0.2)
mock.ExpectQuery(sanitizeQuery(pgStatStatementsNewQuery)).WillReturnRows(rows)
ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGStatStatementsCollector{}
if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGStatStatementsCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 5},
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.4},
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 100},
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.1},
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.2},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}

View File

@ -15,7 +15,7 @@ package collector
import (
"context"
"time"
"database/sql"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
@ -189,146 +189,235 @@ func (c *PGStatUserTablesCollector) Update(ctx context.Context, instance *instan
defer rows.Close()
for rows.Next() {
var datname string
var schemaname string
var relname string
var seqScan int64
var seqTupRead int64
var idxScan int64
var idxTupFetch int64
var nTupIns int64
var nTupUpd int64
var nTupDel int64
var nTupHotUpd int64
var nLiveTup int64
var nDeadTup int64
var nModSinceAnalyze int64
var lastVacuum time.Time
var lastAutovacuum time.Time
var lastAnalyze time.Time
var lastAutoanalyze time.Time
var vacuumCount int64
var autovacuumCount int64
var analyzeCount int64
var autoanalyzeCount int64
var datname, schemaname, relname sql.NullString
var seqScan, seqTupRead, idxScan, idxTupFetch, nTupIns, nTupUpd, nTupDel, nTupHotUpd, nLiveTup, nDeadTup,
nModSinceAnalyze, vacuumCount, autovacuumCount, analyzeCount, autoanalyzeCount sql.NullInt64
var lastVacuum, lastAutovacuum, lastAnalyze, lastAutoanalyze sql.NullTime
if err := rows.Scan(&datname, &schemaname, &relname, &seqScan, &seqTupRead, &idxScan, &idxTupFetch, &nTupIns, &nTupUpd, &nTupDel, &nTupHotUpd, &nLiveTup, &nDeadTup, &nModSinceAnalyze, &lastVacuum, &lastAutovacuum, &lastAnalyze, &lastAutoanalyze, &vacuumCount, &autovacuumCount, &analyzeCount, &autoanalyzeCount); err != nil {
return err
}
datnameLabel := "unknown"
if datname.Valid {
datnameLabel = datname.String
}
schemanameLabel := "unknown"
if schemaname.Valid {
schemanameLabel = schemaname.String
}
relnameLabel := "unknown"
if relname.Valid {
relnameLabel = relname.String
}
seqScanMetric := 0.0
if seqScan.Valid {
seqScanMetric = float64(seqScan.Int64)
}
ch <- prometheus.MustNewConstMetric(
statUserTablesSeqScan,
prometheus.CounterValue,
float64(seqScan),
datname, schemaname, relname,
seqScanMetric,
datnameLabel, schemanameLabel, relnameLabel,
)
seqTupReadMetric := 0.0
if seqTupRead.Valid {
seqTupReadMetric = float64(seqTupRead.Int64)
}
ch <- prometheus.MustNewConstMetric(
statUserTablesSeqTupRead,
prometheus.CounterValue,
float64(seqTupRead),
datname, schemaname, relname,
seqTupReadMetric,
datnameLabel, schemanameLabel, relnameLabel,
)
idxScanMetric := 0.0
if idxScan.Valid {
idxScanMetric = float64(idxScan.Int64)
}
ch <- prometheus.MustNewConstMetric(
statUserTablesIdxScan,
prometheus.CounterValue,
float64(idxScan),
datname, schemaname, relname,
idxScanMetric,
datnameLabel, schemanameLabel, relnameLabel,
)
idxTupFetchMetric := 0.0
if idxTupFetch.Valid {
idxTupFetchMetric = float64(idxTupFetch.Int64)
}
ch <- prometheus.MustNewConstMetric(
statUserTablesIdxTupFetch,
prometheus.CounterValue,
float64(idxTupFetch),
datname, schemaname, relname,
idxTupFetchMetric,
datnameLabel, schemanameLabel, relnameLabel,
)
nTupInsMetric := 0.0
if nTupIns.Valid {
nTupInsMetric = float64(nTupIns.Int64)
}
ch <- prometheus.MustNewConstMetric(
statUserTablesNTupIns,
prometheus.CounterValue,
float64(nTupIns),
datname, schemaname, relname,
nTupInsMetric,
datnameLabel, schemanameLabel, relnameLabel,
)
nTupUpdMetric := 0.0
if nTupUpd.Valid {
nTupUpdMetric = float64(nTupUpd.Int64)
}
ch <- prometheus.MustNewConstMetric(
statUserTablesNTupUpd,
prometheus.CounterValue,
float64(nTupUpd),
datname, schemaname, relname,
nTupUpdMetric,
datnameLabel, schemanameLabel, relnameLabel,
)
nTupDelMetric := 0.0
if nTupDel.Valid {
nTupDelMetric = float64(nTupDel.Int64)
}
ch <- prometheus.MustNewConstMetric(
statUserTablesNTupDel,
prometheus.CounterValue,
float64(nTupDel),
datname, schemaname, relname,
nTupDelMetric,
datnameLabel, schemanameLabel, relnameLabel,
)
nTupHotUpdMetric := 0.0
if nTupHotUpd.Valid {
nTupHotUpdMetric = float64(nTupHotUpd.Int64)
}
ch <- prometheus.MustNewConstMetric(
statUserTablesNTupHotUpd,
prometheus.CounterValue,
float64(nTupHotUpd),
datname, schemaname, relname,
nTupHotUpdMetric,
datnameLabel, schemanameLabel, relnameLabel,
)
nLiveTupMetric := 0.0
if nLiveTup.Valid {
nLiveTupMetric = float64(nLiveTup.Int64)
}
ch <- prometheus.MustNewConstMetric(
statUserTablesNLiveTup,
prometheus.GaugeValue,
float64(nLiveTup),
datname, schemaname, relname,
nLiveTupMetric,
datnameLabel, schemanameLabel, relnameLabel,
)
nDeadTupMetric := 0.0
if nDeadTup.Valid {
nDeadTupMetric = float64(nDeadTup.Int64)
}
ch <- prometheus.MustNewConstMetric(
statUserTablesNDeadTup,
prometheus.GaugeValue,
float64(nDeadTup),
datname, schemaname, relname,
nDeadTupMetric,
datnameLabel, schemanameLabel, relnameLabel,
)
nModSinceAnalyzeMetric := 0.0
if nModSinceAnalyze.Valid {
nModSinceAnalyzeMetric = float64(nModSinceAnalyze.Int64)
}
ch <- prometheus.MustNewConstMetric(
statUserTablesNModSinceAnalyze,
prometheus.GaugeValue,
float64(nModSinceAnalyze),
datname, schemaname, relname,
nModSinceAnalyzeMetric,
datnameLabel, schemanameLabel, relnameLabel,
)
lastVacuumMetric := 0.0
if lastVacuum.Valid {
lastVacuumMetric = float64(lastVacuum.Time.Unix())
}
ch <- prometheus.MustNewConstMetric(
statUserTablesLastVacuum,
prometheus.GaugeValue,
float64(lastVacuum.Unix()),
datname, schemaname, relname,
lastVacuumMetric,
datnameLabel, schemanameLabel, relnameLabel,
)
lastAutovacuumMetric := 0.0
if lastAutovacuum.Valid {
lastAutovacuumMetric = float64(lastAutovacuum.Time.Unix())
}
ch <- prometheus.MustNewConstMetric(
statUserTablesLastAutovacuum,
prometheus.GaugeValue,
float64(lastAutovacuum.Unix()),
datname, schemaname, relname,
lastAutovacuumMetric,
datnameLabel, schemanameLabel, relnameLabel,
)
lastAnalyzeMetric := 0.0
if lastAnalyze.Valid {
lastAnalyzeMetric = float64(lastAnalyze.Time.Unix())
}
ch <- prometheus.MustNewConstMetric(
statUserTablesLastAnalyze,
prometheus.GaugeValue,
float64(lastAnalyze.Unix()),
datname, schemaname, relname,
lastAnalyzeMetric,
datnameLabel, schemanameLabel, relnameLabel,
)
lastAutoanalyzeMetric := 0.0
if lastAutoanalyze.Valid {
lastAutoanalyzeMetric = float64(lastAutoanalyze.Time.Unix())
}
ch <- prometheus.MustNewConstMetric(
statUserTablesLastAutoanalyze,
prometheus.GaugeValue,
float64(lastAutoanalyze.Unix()),
datname, schemaname, relname,
lastAutoanalyzeMetric,
datnameLabel, schemanameLabel, relnameLabel,
)
vacuumCountMetric := 0.0
if vacuumCount.Valid {
vacuumCountMetric = float64(vacuumCount.Int64)
}
ch <- prometheus.MustNewConstMetric(
statUserTablesVacuumCount,
prometheus.CounterValue,
float64(vacuumCount),
datname, schemaname, relname,
vacuumCountMetric,
datnameLabel, schemanameLabel, relnameLabel,
)
autovacuumCountMetric := 0.0
if autovacuumCount.Valid {
autovacuumCountMetric = float64(autovacuumCount.Int64)
}
ch <- prometheus.MustNewConstMetric(
statUserTablesAutovacuumCount,
prometheus.CounterValue,
float64(autovacuumCount),
datname, schemaname, relname,
autovacuumCountMetric,
datnameLabel, schemanameLabel, relnameLabel,
)
analyzeCountMetric := 0.0
if analyzeCount.Valid {
analyzeCountMetric = float64(analyzeCount.Int64)
}
ch <- prometheus.MustNewConstMetric(
statUserTablesAnalyzeCount,
prometheus.CounterValue,
float64(analyzeCount),
datname, schemaname, relname,
analyzeCountMetric,
datnameLabel, schemanameLabel, relnameLabel,
)
autoanalyzeCountMetric := 0.0
if autoanalyzeCount.Valid {
autoanalyzeCountMetric = float64(autoanalyzeCount.Int64)
}
ch <- prometheus.MustNewConstMetric(
statUserTablesAutoanalyzeCount,
prometheus.CounterValue,
float64(autoanalyzeCount),
datname, schemaname, relname,
autoanalyzeCountMetric,
datnameLabel, schemanameLabel, relnameLabel,
)
}

View File

@ -138,3 +138,102 @@ func TestPGStatUserTablesCollector(t *testing.T) {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}
func TestPGStatUserTablesCollectorNullValues(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
columns := []string{
"datname",
"schemaname",
"relname",
"seq_scan",
"seq_tup_read",
"idx_scan",
"idx_tup_fetch",
"n_tup_ins",
"n_tup_upd",
"n_tup_del",
"n_tup_hot_upd",
"n_live_tup",
"n_dead_tup",
"n_mod_since_analyze",
"last_vacuum",
"last_autovacuum",
"last_analyze",
"last_autoanalyze",
"vacuum_count",
"autovacuum_count",
"analyze_count",
"autoanalyze_count"}
rows := sqlmock.NewRows(columns).
AddRow("postgres",
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil)
mock.ExpectQuery(sanitizeQuery(statUserTablesQuery)).WillReturnRows(rows)
ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGStatUserTablesCollector{}
if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGStatUserTablesCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_GAUGE, value: 0},
{labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_GAUGE, value: 0},
{labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_GAUGE, value: 0},
{labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_GAUGE, value: 0},
{labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_GAUGE, value: 0},
{labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_GAUGE, value: 0},
{labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_GAUGE, value: 0},
{labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{"datname": "postgres", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}

View File

@ -0,0 +1,269 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector
import (
"context"
"database/sql"
"fmt"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
)
func init() {
registerCollector(statWalReceiverSubsystem, defaultDisabled, NewPGStatWalReceiverCollector)
}
type PGStatWalReceiverCollector struct {
log log.Logger
}
const statWalReceiverSubsystem = "stat_wal_receiver"
func NewPGStatWalReceiverCollector(config collectorConfig) (Collector, error) {
return &PGStatWalReceiverCollector{log: config.logger}, nil
}
var (
labelCats = []string{"upstream_host", "slot_name", "status"}
statWalReceiverReceiveStartLsn = prometheus.NewDesc(
prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "receive_start_lsn"),
"First write-ahead log location used when WAL receiver is started represented as a decimal",
labelCats,
prometheus.Labels{},
)
statWalReceiverReceiveStartTli = prometheus.NewDesc(
prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "receive_start_tli"),
"First timeline number used when WAL receiver is started",
labelCats,
prometheus.Labels{},
)
statWalReceiverFlushedLSN = prometheus.NewDesc(
prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "flushed_lsn"),
"Last write-ahead log location already received and flushed to disk, the initial value of this field being the first log location used when WAL receiver is started represented as a decimal",
labelCats,
prometheus.Labels{},
)
statWalReceiverReceivedTli = prometheus.NewDesc(
prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "received_tli"),
"Timeline number of last write-ahead log location received and flushed to disk",
labelCats,
prometheus.Labels{},
)
statWalReceiverLastMsgSendTime = prometheus.NewDesc(
prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "last_msg_send_time"),
"Send time of last message received from origin WAL sender",
labelCats,
prometheus.Labels{},
)
statWalReceiverLastMsgReceiptTime = prometheus.NewDesc(
prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "last_msg_receipt_time"),
"Send time of last message received from origin WAL sender",
labelCats,
prometheus.Labels{},
)
statWalReceiverLatestEndLsn = prometheus.NewDesc(
prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "latest_end_lsn"),
"Last write-ahead log location reported to origin WAL sender as integer",
labelCats,
prometheus.Labels{},
)
statWalReceiverLatestEndTime = prometheus.NewDesc(
prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "latest_end_time"),
"Time of last write-ahead log location reported to origin WAL sender",
labelCats,
prometheus.Labels{},
)
statWalReceiverUpstreamNode = prometheus.NewDesc(
prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "upstream_node"),
"Node ID of the upstream node",
labelCats,
prometheus.Labels{},
)
pgStatWalColumnQuery = `
SELECT
column_name
FROM information_schema.columns
WHERE
table_name = 'pg_stat_wal_receiver' and
column_name = 'flushed_lsn'
`
pgStatWalReceiverQueryTemplate = `
SELECT
trim(both '''' from substring(conninfo from 'host=([^ ]*)')) as upstream_host,
slot_name,
status,
(receive_start_lsn- '0/0') % (2^52)::bigint as receive_start_lsn,
%s
receive_start_tli,
received_tli,
extract(epoch from last_msg_send_time) as last_msg_send_time,
extract(epoch from last_msg_receipt_time) as last_msg_receipt_time,
(latest_end_lsn - '0/0') % (2^52)::bigint as latest_end_lsn,
extract(epoch from latest_end_time) as latest_end_time,
substring(slot_name from 'repmgr_slot_([0-9]*)') as upstream_node
FROM pg_catalog.pg_stat_wal_receiver
`
)
func (c *PGStatWalReceiverCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
hasFlushedLSNRows, err := db.QueryContext(ctx, pgStatWalColumnQuery)
if err != nil {
return err
}
defer hasFlushedLSNRows.Close()
hasFlushedLSN := hasFlushedLSNRows.Next()
var query string
if hasFlushedLSN {
query = fmt.Sprintf(pgStatWalReceiverQueryTemplate, "(flushed_lsn - '0/0') % (2^52)::bigint as flushed_lsn,\n")
} else {
query = fmt.Sprintf(pgStatWalReceiverQueryTemplate, "")
}
rows, err := db.QueryContext(ctx, query)
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var upstreamHost, slotName, status sql.NullString
var receiveStartLsn, receiveStartTli, flushedLsn, receivedTli, latestEndLsn, upstreamNode sql.NullInt64
var lastMsgSendTime, lastMsgReceiptTime, latestEndTime sql.NullFloat64
if hasFlushedLSN {
if err := rows.Scan(&upstreamHost, &slotName, &status, &receiveStartLsn, &receiveStartTli, &flushedLsn, &receivedTli, &lastMsgSendTime, &lastMsgReceiptTime, &latestEndLsn, &latestEndTime, &upstreamNode); err != nil {
return err
}
} else {
if err := rows.Scan(&upstreamHost, &slotName, &status, &receiveStartLsn, &receiveStartTli, &receivedTli, &lastMsgSendTime, &lastMsgReceiptTime, &latestEndLsn, &latestEndTime, &upstreamNode); err != nil {
return err
}
}
if !upstreamHost.Valid {
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because upstream host is null")
continue
}
if !slotName.Valid {
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because slotname host is null")
continue
}
if !status.Valid {
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because status is null")
continue
}
labels := []string{upstreamHost.String, slotName.String, status.String}
if !receiveStartLsn.Valid {
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because receive_start_lsn is null")
continue
}
if !receiveStartTli.Valid {
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because receive_start_tli is null")
continue
}
if hasFlushedLSN && !flushedLsn.Valid {
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because flushed_lsn is null")
continue
}
if !receivedTli.Valid {
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because received_tli is null")
continue
}
if !lastMsgSendTime.Valid {
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because last_msg_send_time is null")
continue
}
if !lastMsgReceiptTime.Valid {
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because last_msg_receipt_time is null")
continue
}
if !latestEndLsn.Valid {
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because latest_end_lsn is null")
continue
}
if !latestEndTime.Valid {
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because latest_end_time is null")
continue
}
if !upstreamNode.Valid {
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because upstream_node is null")
continue
}
ch <- prometheus.MustNewConstMetric(
statWalReceiverReceiveStartLsn,
prometheus.CounterValue,
float64(receiveStartLsn.Int64),
labels...)
ch <- prometheus.MustNewConstMetric(
statWalReceiverReceiveStartTli,
prometheus.GaugeValue,
float64(receiveStartTli.Int64),
labels...)
if hasFlushedLSN {
ch <- prometheus.MustNewConstMetric(
statWalReceiverFlushedLSN,
prometheus.CounterValue,
float64(flushedLsn.Int64),
labels...)
}
ch <- prometheus.MustNewConstMetric(
statWalReceiverReceivedTli,
prometheus.GaugeValue,
float64(receivedTli.Int64),
labels...)
ch <- prometheus.MustNewConstMetric(
statWalReceiverLastMsgSendTime,
prometheus.CounterValue,
float64(lastMsgSendTime.Float64),
labels...)
ch <- prometheus.MustNewConstMetric(
statWalReceiverLastMsgReceiptTime,
prometheus.CounterValue,
float64(lastMsgReceiptTime.Float64),
labels...)
ch <- prometheus.MustNewConstMetric(
statWalReceiverLatestEndLsn,
prometheus.CounterValue,
float64(latestEndLsn.Int64),
labels...)
ch <- prometheus.MustNewConstMetric(
statWalReceiverLatestEndTime,
prometheus.CounterValue,
latestEndTime.Float64,
labels...)
ch <- prometheus.MustNewConstMetric(
statWalReceiverUpstreamNode,
prometheus.GaugeValue,
float64(upstreamNode.Int64),
labels...)
}
if err := rows.Err(); err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,186 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector
import (
"context"
"fmt"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/smartystreets/goconvey/convey"
)
var queryWithFlushedLSN = fmt.Sprintf(pgStatWalReceiverQueryTemplate, "(flushed_lsn - '0/0') % (2^52)::bigint as flushed_lsn,\n")
var queryWithNoFlushedLSN = fmt.Sprintf(pgStatWalReceiverQueryTemplate, "")
func TestPGStatWalReceiverCollectorWithFlushedLSN(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
infoSchemaColumns := []string{
"column_name",
}
infoSchemaRows := sqlmock.NewRows(infoSchemaColumns).
AddRow(
"flushed_lsn",
)
mock.ExpectQuery(sanitizeQuery(pgStatWalColumnQuery)).WillReturnRows(infoSchemaRows)
columns := []string{
"upstream_host",
"slot_name",
"status",
"receive_start_lsn",
"receive_start_tli",
"flushed_lsn",
"received_tli",
"last_msg_send_time",
"last_msg_receipt_time",
"latest_end_lsn",
"latest_end_time",
"upstream_node",
}
rows := sqlmock.NewRows(columns).
AddRow(
"foo",
"bar",
"stopping",
1200668684563608,
1687321285,
1200668684563609,
1687321280,
1687321275,
1687321276,
1200668684563610,
1687321277,
5,
)
mock.ExpectQuery(sanitizeQuery(queryWithFlushedLSN)).WillReturnRows(rows)
ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGStatWalReceiverCollector{}
if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PgStatWalReceiverCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1200668684563608, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1687321285, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1200668684563609, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1687321280, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1687321275, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1687321276, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1200668684563610, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1687321277, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 5, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}
func TestPGStatWalReceiverCollectorWithNoFlushedLSN(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
infoSchemaColumns := []string{
"column_name",
}
infoSchemaRows := sqlmock.NewRows(infoSchemaColumns)
mock.ExpectQuery(sanitizeQuery(pgStatWalColumnQuery)).WillReturnRows(infoSchemaRows)
columns := []string{
"upstream_host",
"slot_name",
"status",
"receive_start_lsn",
"receive_start_tli",
"received_tli",
"last_msg_send_time",
"last_msg_receipt_time",
"latest_end_lsn",
"latest_end_time",
"upstream_node",
}
rows := sqlmock.NewRows(columns).
AddRow(
"foo",
"bar",
"starting",
1200668684563608,
1687321285,
1687321280,
1687321275,
1687321276,
1200668684563610,
1687321277,
5,
)
mock.ExpectQuery(sanitizeQuery(queryWithNoFlushedLSN)).WillReturnRows(rows)
ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGStatWalReceiverCollector{}
if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PgStatWalReceiverCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 1200668684563608, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 1687321285, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 1687321280, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 1687321275, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 1687321276, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 1200668684563610, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 1687321277, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 5, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}

View File

@ -15,6 +15,7 @@ package collector
import (
"context"
"database/sql"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
@ -110,73 +111,112 @@ func (PGStatIOUserTablesCollector) Update(ctx context.Context, instance *instanc
defer rows.Close()
for rows.Next() {
var datname string
var schemaname string
var relname string
var heapBlksRead int64
var heapBlksHit int64
var idxBlksRead int64
var idxBlksHit int64
var toastBlksRead int64
var toastBlksHit int64
var tidxBlksRead int64
var tidxBlksHit int64
var datname, schemaname, relname sql.NullString
var heapBlksRead, heapBlksHit, idxBlksRead, idxBlksHit, toastBlksRead, toastBlksHit, tidxBlksRead, tidxBlksHit sql.NullInt64
if err := rows.Scan(&datname, &schemaname, &relname, &heapBlksRead, &heapBlksHit, &idxBlksRead, &idxBlksHit, &toastBlksRead, &toastBlksHit, &tidxBlksRead, &tidxBlksHit); err != nil {
return err
}
datnameLabel := "unknown"
if datname.Valid {
datnameLabel = datname.String
}
schemanameLabel := "unknown"
if schemaname.Valid {
schemanameLabel = schemaname.String
}
relnameLabel := "unknown"
if relname.Valid {
relnameLabel = relname.String
}
heapBlksReadMetric := 0.0
if heapBlksRead.Valid {
heapBlksReadMetric = float64(heapBlksRead.Int64)
}
ch <- prometheus.MustNewConstMetric(
statioUserTablesHeapBlksRead,
prometheus.CounterValue,
float64(heapBlksRead),
datname, schemaname, relname,
heapBlksReadMetric,
datnameLabel, schemanameLabel, relnameLabel,
)
heapBlksHitMetric := 0.0
if heapBlksHit.Valid {
heapBlksHitMetric = float64(heapBlksHit.Int64)
}
ch <- prometheus.MustNewConstMetric(
statioUserTablesHeapBlksHit,
prometheus.CounterValue,
float64(heapBlksHit),
datname, schemaname, relname,
heapBlksHitMetric,
datnameLabel, schemanameLabel, relnameLabel,
)
idxBlksReadMetric := 0.0
if idxBlksRead.Valid {
idxBlksReadMetric = float64(idxBlksRead.Int64)
}
ch <- prometheus.MustNewConstMetric(
statioUserTablesIdxBlksRead,
prometheus.CounterValue,
float64(idxBlksRead),
datname, schemaname, relname,
idxBlksReadMetric,
datnameLabel, schemanameLabel, relnameLabel,
)
idxBlksHitMetric := 0.0
if idxBlksHit.Valid {
idxBlksHitMetric = float64(idxBlksHit.Int64)
}
ch <- prometheus.MustNewConstMetric(
statioUserTablesIdxBlksHit,
prometheus.CounterValue,
float64(idxBlksHit),
datname, schemaname, relname,
idxBlksHitMetric,
datnameLabel, schemanameLabel, relnameLabel,
)
toastBlksReadMetric := 0.0
if toastBlksRead.Valid {
toastBlksReadMetric = float64(toastBlksRead.Int64)
}
ch <- prometheus.MustNewConstMetric(
statioUserTablesToastBlksRead,
prometheus.CounterValue,
float64(toastBlksRead),
datname, schemaname, relname,
toastBlksReadMetric,
datnameLabel, schemanameLabel, relnameLabel,
)
toastBlksHitMetric := 0.0
if toastBlksHit.Valid {
toastBlksHitMetric = float64(toastBlksHit.Int64)
}
ch <- prometheus.MustNewConstMetric(
statioUserTablesToastBlksHit,
prometheus.CounterValue,
float64(toastBlksHit),
datname, schemaname, relname,
toastBlksHitMetric,
datnameLabel, schemanameLabel, relnameLabel,
)
tidxBlksReadMetric := 0.0
if tidxBlksRead.Valid {
tidxBlksReadMetric = float64(tidxBlksRead.Int64)
}
ch <- prometheus.MustNewConstMetric(
statioUserTablesTidxBlksRead,
prometheus.CounterValue,
float64(tidxBlksRead),
datname, schemaname, relname,
tidxBlksReadMetric,
datnameLabel, schemanameLabel, relnameLabel,
)
tidxBlksHitMetric := 0.0
if tidxBlksHit.Valid {
tidxBlksHitMetric = float64(tidxBlksHit.Int64)
}
ch <- prometheus.MustNewConstMetric(
statioUserTablesTidxBlksHit,
prometheus.CounterValue,
float64(tidxBlksHit),
datname, schemaname, relname,
tidxBlksHitMetric,
datnameLabel, schemanameLabel, relnameLabel,
)
}
if err := rows.Err(); err != nil {
return err
}
return nil
return rows.Err()
}

View File

@ -88,3 +88,70 @@ func TestPGStatIOUserTablesCollector(t *testing.T) {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}
func TestPGStatIOUserTablesCollectorNullValues(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
columns := []string{
"datname",
"schemaname",
"relname",
"heap_blks_read",
"heap_blks_hit",
"idx_blks_read",
"idx_blks_hit",
"toast_blks_read",
"toast_blks_hit",
"tidx_blks_read",
"tidx_blks_hit",
}
rows := sqlmock.NewRows(columns).
AddRow(nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil,
nil)
mock.ExpectQuery(sanitizeQuery(statioUserTablesQuery)).WillReturnRows(rows)
ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGStatIOUserTablesCollector{}
if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGStatIOUserTablesCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{"datname": "unknown", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{"datname": "unknown", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{"datname": "unknown", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{"datname": "unknown", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{"datname": "unknown", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{"datname": "unknown", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{"datname": "unknown", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
{labels: labelMap{"datname": "unknown", "schemaname": "unknown", "relname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}

84
collector/pg_wal.go Normal file
View File

@ -0,0 +1,84 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector
import (
"context"
"github.com/prometheus/client_golang/prometheus"
)
const walSubsystem = "wal"
func init() {
registerCollector(walSubsystem, defaultEnabled, NewPGWALCollector)
}
type PGWALCollector struct {
}
func NewPGWALCollector(config collectorConfig) (Collector, error) {
return &PGWALCollector{}, nil
}
var (
pgWALSegments = prometheus.NewDesc(
prometheus.BuildFQName(
namespace,
walSubsystem,
"segments",
),
"Number of WAL segments",
[]string{}, nil,
)
pgWALSize = prometheus.NewDesc(
prometheus.BuildFQName(
namespace,
walSubsystem,
"size_bytes",
),
"Total size of WAL segments",
[]string{}, nil,
)
pgWALQuery = `
SELECT
COUNT(*) AS segments,
SUM(size) AS size
FROM pg_ls_waldir()
WHERE name ~ '^[0-9A-F]{24}$'`
)
func (c PGWALCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
row := db.QueryRowContext(ctx,
pgWALQuery,
)
var segments uint64
var size uint64
err := row.Scan(&segments, &size)
if err != nil {
return err
}
ch <- prometheus.MustNewConstMetric(
pgWALSegments,
prometheus.GaugeValue, float64(segments),
)
ch <- prometheus.MustNewConstMetric(
pgWALSize,
prometheus.GaugeValue, float64(size),
)
return nil
}

63
collector/pg_wal_test.go Normal file
View File

@ -0,0 +1,63 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector
import (
"context"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/smartystreets/goconvey/convey"
)
func TestPgWALCollector(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
columns := []string{"segments", "size"}
rows := sqlmock.NewRows(columns).
AddRow(47, 788529152)
mock.ExpectQuery(sanitizeQuery(pgWALQuery)).WillReturnRows(rows)
ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGWALCollector{}
if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGWALCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{}, value: 47, metricType: dto.MetricType_GAUGE},
{labels: labelMap{}, value: 788529152, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}

View File

@ -0,0 +1,80 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector
import (
"context"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
)
const xlogLocationSubsystem = "xlog_location"
func init() {
registerCollector(xlogLocationSubsystem, defaultDisabled, NewPGXlogLocationCollector)
}
type PGXlogLocationCollector struct {
log log.Logger
}
func NewPGXlogLocationCollector(config collectorConfig) (Collector, error) {
return &PGXlogLocationCollector{log: config.logger}, nil
}
var (
xlogLocationBytes = prometheus.NewDesc(
prometheus.BuildFQName(namespace, xlogLocationSubsystem, "bytes"),
"Postgres LSN (log sequence number) being generated on primary or replayed on replica (truncated to low 52 bits)",
[]string{},
prometheus.Labels{},
)
xlogLocationQuery = `
SELECT CASE
WHEN pg_is_in_recovery() THEN (pg_last_xlog_replay_location() - '0/0') % (2^52)::bigint
ELSE (pg_current_xlog_location() - '0/0') % (2^52)::bigint
END AS bytes
`
)
func (PGXlogLocationCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
rows, err := db.QueryContext(ctx,
xlogLocationQuery)
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var bytes float64
if err := rows.Scan(&bytes); err != nil {
return err
}
ch <- prometheus.MustNewConstMetric(
xlogLocationBytes,
prometheus.GaugeValue,
bytes,
)
}
if err := rows.Err(); err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,61 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector
import (
"context"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/smartystreets/goconvey/convey"
)
func TestPGXlogLocationCollector(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
columns := []string{
"bytes",
}
rows := sqlmock.NewRows(columns).
AddRow(53401)
mock.ExpectQuery(sanitizeQuery(xlogLocationQuery)).WillReturnRows(rows)
ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGXlogLocationCollector{}
if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGXlogLocationCollector.Update: %s", err)
}
}()
expected := []MetricResult{
{labels: labelMap{}, value: 53401, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}

10
go.mod
View File

@ -8,11 +8,11 @@ require (
github.com/blang/semver/v4 v4.0.0
github.com/go-kit/log v0.2.1
github.com/lib/pq v1.10.9
github.com/prometheus/client_golang v1.15.1
github.com/prometheus/client_golang v1.16.0
github.com/prometheus/client_model v0.4.0
github.com/prometheus/common v0.44.0
github.com/prometheus/exporter-toolkit v0.10.0
github.com/smartystreets/goconvey v1.8.0
github.com/smartystreets/goconvey v1.8.1
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
@ -32,14 +32,14 @@ require (
github.com/kr/text v0.2.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
github.com/smartystreets/assertions v1.13.1 // indirect
github.com/smarty/assertions v1.15.0 // indirect
github.com/xhit/go-str2duration/v2 v2.1.0 // indirect
golang.org/x/crypto v0.8.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/oauth2 v0.8.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sync v0.2.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
google.golang.org/appengine v1.6.7 // indirect

20
go.sum
View File

@ -49,23 +49,23 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.15.1 h1:8tXpTmJbyH5lydzFPoxSIJ0J46jdh3tylbvM1xCv0LI=
github.com/prometheus/client_golang v1.15.1/go.mod h1:e9yaBhRPU2pPNsZwE+JdQl0KEt1N9XgF6zxWmaC0xOk=
github.com/prometheus/client_golang v1.16.0 h1:yk/hx9hDbrGHovbci4BY+pRMfSuuat626eFsHb7tmT8=
github.com/prometheus/client_golang v1.16.0/go.mod h1:Zsulrv/L9oM40tJ7T815tM89lFEugiJ9HzIqaAx4LKc=
github.com/prometheus/client_model v0.4.0 h1:5lQXD3cAg1OXBf4Wq03gTrXHeaV0TQvGfUooCfx1yqY=
github.com/prometheus/client_model v0.4.0/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU=
github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY=
github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY=
github.com/prometheus/exporter-toolkit v0.10.0 h1:yOAzZTi4M22ZzVxD+fhy1URTuNRj/36uQJJ5S8IPza8=
github.com/prometheus/exporter-toolkit v0.10.0/go.mod h1:+sVFzuvV5JDyw+Ih6p3zFxZNVnKQa3x5qPmDSiPu4ZY=
github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI=
github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY=
github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+PymziUAg=
github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/smartystreets/assertions v1.13.1 h1:Ef7KhSmjZcK6AVf9YbJdvPYG9avaF0ZxudX+ThRdWfU=
github.com/smartystreets/assertions v1.13.1/go.mod h1:cXr/IwVfSo/RbCSPhoAPv73p3hlSdrBH/b3SdnW/LMY=
github.com/smartystreets/goconvey v1.8.0 h1:Oi49ha/2MURE0WexF052Z0m+BNSGirfjg5RL+JXWq3w=
github.com/smartystreets/goconvey v1.8.0/go.mod h1:EdX8jtrTIj26jmjCOVNMVSIYAtgexqXKHOXW2Dx9JLg=
github.com/smarty/assertions v1.15.0 h1:cR//PqUBUiQRakZWqBiFFQ9wb8emQGDb0HeGdqGByCY=
github.com/smarty/assertions v1.15.0/go.mod h1:yABtdzeQs6l1brC900WlRNwj6ZR55d7B+E8C6HtKdec=
github.com/smartystreets/goconvey v1.8.1 h1:qGjIddxOk4grTu9JPOU31tVfq3cNdBlNa5sSznIX1xY=
github.com/smartystreets/goconvey v1.8.1/go.mod h1:+/u4qLyY6x1jReYOp7GOM2FSt8aP9CzCZL03bI28W60=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
@ -80,8 +80,8 @@ golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/oauth2 v0.8.0 h1:6dkIjl3j3LtZ/O3sTgZTMsLKSftL/B8Zgq4huOIIUu8=
golang.org/x/oauth2 v0.8.0/go.mod h1:yr7u4HXZRm1R1kBWqr/xKNqewf0plRYoB7sla+BCIXE=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI=
golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=