From 21a19ed252ef2a7106ce511fd701b937481a7a45 Mon Sep 17 00:00:00 2001 From: Joe Adams Date: Wed, 9 Feb 2022 21:28:40 -0500 Subject: [PATCH 1/4] Add pg_database collector Converts the pg_database metrics from queries.yaml to a built in collector. This is enabled by default because it is not likely to be a performance problem and likely very useful data. Signed-off-by: Joe Adams --- CHANGELOG.md | 4 +++ cmd/postgres_exporter/server.go | 14 ++++++++ collector/pg_database.go | 63 +++++++++++++++++++++++++++++++++ queries.yaml | 12 ------- 4 files changed, 81 insertions(+), 12 deletions(-) create mode 100644 collector/pg_database.go diff --git a/CHANGELOG.md b/CHANGELOG.md index b97bd9f9..4ac268c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## master / unreleased + +* [ENHANCEMENT] Add pg_database_size_bytes metric #613 + ## 0.10.1 / 2022-01-14 * [BUGFIX] Fix broken log-level for values other than debug. #560 diff --git a/cmd/postgres_exporter/server.go b/cmd/postgres_exporter/server.go index 8747dffa..6a08e998 100644 --- a/cmd/postgres_exporter/server.go +++ b/cmd/postgres_exporter/server.go @@ -14,13 +14,16 @@ package main import ( + "context" "database/sql" "fmt" + "log" "sync" "time" "github.com/blang/semver" "github.com/go-kit/log/level" + "github.com/prometheus-community/postgres_exporter/collector" "github.com/prometheus/client_golang/prometheus" ) @@ -128,6 +131,17 @@ func (s *Server) Scrape(ch chan<- prometheus.Metric, disableSettingsMetrics bool err = fmt.Errorf("queryNamespaceMappings returned %d errors", len(errMap)) } + { + pgdb := collector.NewPGDatabaseCollector() + metrics, err := pgdb.Update(context.TODO(), s.db, s.String()) + if err != nil { + log.Printf("Failed to scrape pg_database metrics: %s", err) + } + for _, m := range metrics { + ch <- m + } + } + return err } diff --git a/collector/pg_database.go b/collector/pg_database.go new file mode 100644 index 00000000..a7cb66f9 --- /dev/null +++ b/collector/pg_database.go @@ -0,0 +1,63 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + "database/sql" + + "github.com/prometheus/client_golang/prometheus" +) + +type PGDatabaseCollector struct{} + +func NewPGDatabaseCollector() *PGDatabaseCollector { + return &PGDatabaseCollector{} +} + +var pgDatabase = map[string]*prometheus.Desc{ + "size_bytes": prometheus.NewDesc( + "pg_database_size_bytes", + "Disk space used by the database", + []string{"datname"}, nil, + ), +} + +func (PGDatabaseCollector) Update(ctx context.Context, db *sql.DB, server string) ([]prometheus.Metric, error) { + metrics := []prometheus.Metric{} + rows, err := db.QueryContext(ctx, + `SELECT pg_database.datname + ,pg_database_size(pg_database.datname) + FROM pg_database;`) + if err != nil { + return metrics, err + } + defer rows.Close() + + for rows.Next() { + var datname string + var size int64 + if err := rows.Scan(&datname, &size); err != nil { + return metrics, err + } + metrics = append(metrics, prometheus.MustNewConstMetric( + pgDatabase["size_bytes"], + prometheus.GaugeValue, float64(size), datname, + )) + } + if err := rows.Err(); err != nil { + return metrics, err + } + return metrics, nil +} diff --git a/queries.yaml b/queries.yaml index 1102da44..6f2008cb 100644 --- a/queries.yaml +++ b/queries.yaml @@ -146,18 +146,6 @@ pg_statio_user_tables: usage: "COUNTER" description: "Number of buffer hits in this table's TOAST table indexes (if any)" -pg_database: - query: "SELECT pg_database.datname, pg_database_size(pg_database.datname) as size_bytes FROM pg_database" - master: true - cache_seconds: 30 - metrics: - - datname: - usage: "LABEL" - description: "Name of the database" - - size_bytes: - usage: "GAUGE" - description: "Disk space used by the database" - # WARNING: This set of metrics can be very expensive on a busy server as every unique query executed will create an additional time series pg_stat_statements: query: "SELECT t2.rolname, t3.datname, queryid, calls, total_time / 1000 as total_time_seconds, min_time / 1000 as min_time_seconds, max_time / 1000 as max_time_seconds, mean_time / 1000 as mean_time_seconds, stddev_time / 1000 as stddev_time_seconds, rows, shared_blks_hit, shared_blks_read, shared_blks_dirtied, shared_blks_written, local_blks_hit, local_blks_read, local_blks_dirtied, local_blks_written, temp_blks_read, temp_blks_written, blk_read_time / 1000 as blk_read_time_seconds, blk_write_time / 1000 as blk_write_time_seconds FROM pg_stat_statements t1 JOIN pg_roles t2 ON (t1.userid=t2.oid) JOIN pg_database t3 ON (t1.dbid=t3.oid) WHERE t2.rolname != 'rdsadmin'" From 1981623d9d391a8076d52d3938d56f0648e11344 Mon Sep 17 00:00:00 2001 From: Joe Adams Date: Wed, 9 Feb 2022 21:56:48 -0500 Subject: [PATCH 2/4] Add missing server label Signed-off-by: Joe Adams --- collector/pg_database.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/collector/pg_database.go b/collector/pg_database.go index a7cb66f9..82475045 100644 --- a/collector/pg_database.go +++ b/collector/pg_database.go @@ -30,7 +30,7 @@ var pgDatabase = map[string]*prometheus.Desc{ "size_bytes": prometheus.NewDesc( "pg_database_size_bytes", "Disk space used by the database", - []string{"datname"}, nil, + []string{"datname", "server"}, nil, ), } @@ -53,7 +53,7 @@ func (PGDatabaseCollector) Update(ctx context.Context, db *sql.DB, server string } metrics = append(metrics, prometheus.MustNewConstMetric( pgDatabase["size_bytes"], - prometheus.GaugeValue, float64(size), datname, + prometheus.GaugeValue, float64(size), datname, server, )) } if err := rows.Err(); err != nil { From c3b02063690499f33e94ad9343e2c6922fe4f1af Mon Sep 17 00:00:00 2001 From: Joe Adams Date: Mon, 14 Feb 2022 21:12:27 -0500 Subject: [PATCH 3/4] Add collector interface Uses node_exporter style collector registration Signed-off-by: Joe Adams --- cmd/postgres_exporter/main.go | 8 ++ cmd/postgres_exporter/server.go | 14 --- collector/collector.go | 210 ++++++++++++++++++++++++++++++++ collector/pg_database.go | 38 +++--- collector/server.go | 100 +++++++++++++++ 5 files changed, 342 insertions(+), 28 deletions(-) create mode 100644 collector/collector.go create mode 100644 collector/server.go diff --git a/cmd/postgres_exporter/main.go b/cmd/postgres_exporter/main.go index 87ce17c8..987ea917 100644 --- a/cmd/postgres_exporter/main.go +++ b/cmd/postgres_exporter/main.go @@ -19,6 +19,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/prometheus-community/postgres_exporter/collector" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/common/promlog" @@ -114,6 +115,13 @@ func main() { prometheus.MustRegister(exporter) + pe, err := collector.NewPostgresCollector(logger, dsn) + if err != nil { + level.Error(logger).Log("msg", "Failed to create PostgresCollector", "err", err.Error()) + os.Exit(1) + } + prometheus.MustRegister(pe) + http.Handle(*metricPath, promhttp.Handler()) http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/html; charset=UTF-8") // nolint: errcheck diff --git a/cmd/postgres_exporter/server.go b/cmd/postgres_exporter/server.go index 6a08e998..8747dffa 100644 --- a/cmd/postgres_exporter/server.go +++ b/cmd/postgres_exporter/server.go @@ -14,16 +14,13 @@ package main import ( - "context" "database/sql" "fmt" - "log" "sync" "time" "github.com/blang/semver" "github.com/go-kit/log/level" - "github.com/prometheus-community/postgres_exporter/collector" "github.com/prometheus/client_golang/prometheus" ) @@ -131,17 +128,6 @@ func (s *Server) Scrape(ch chan<- prometheus.Metric, disableSettingsMetrics bool err = fmt.Errorf("queryNamespaceMappings returned %d errors", len(errMap)) } - { - pgdb := collector.NewPGDatabaseCollector() - metrics, err := pgdb.Update(context.TODO(), s.db, s.String()) - if err != nil { - log.Printf("Failed to scrape pg_database metrics: %s", err) - } - for _, m := range metrics { - ch <- m - } - } - return err } diff --git a/collector/collector.go b/collector/collector.go new file mode 100644 index 00000000..60a5510e --- /dev/null +++ b/collector/collector.go @@ -0,0 +1,210 @@ +// Copyright 2022 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "gopkg.in/alecthomas/kingpin.v2" +) + +var ( + factories = make(map[string]func(logger log.Logger) (Collector, error)) + initiatedCollectorsMtx = sync.Mutex{} + initiatedCollectors = make(map[string]Collector) + collectorState = make(map[string]*bool) + forcedCollectors = map[string]bool{} // collectors which have been explicitly enabled or disabled +) + +const ( + // Namespace for all metrics. + namespace = "pg" + + defaultEnabled = true + defaultDisabled = false +) + +var ( + scrapeDurationDesc = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "scrape", "collector_duration_seconds"), + "postgres_exporter: Duration of a collector scrape.", + []string{"collector"}, + nil, + ) + scrapeSuccessDesc = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "scrape", "collector_success"), + "postgres_exporter: Whether a collector succeeded.", + []string{"collector"}, + nil, + ) +) + +type Collector interface { + Update(ctx context.Context, server *server, ch chan<- prometheus.Metric) error +} + +func registerCollector(name string, isDefaultEnabled bool, createFunc func(logger log.Logger) (Collector, error)) { + var helpDefaultState string + if isDefaultEnabled { + helpDefaultState = "enabled" + } else { + helpDefaultState = "disabled" + } + + // Create flag for this collector + flagName := fmt.Sprintf("collector.%s", name) + flagHelp := fmt.Sprintf("Enable the %s collector (default: %s).", name, helpDefaultState) + defaultValue := fmt.Sprintf("%v", isDefaultEnabled) + + flag := kingpin.Flag(flagName, flagHelp).Default(defaultValue).Action(collectorFlagAction(name)).Bool() + collectorState[name] = flag + + // Register the create function for this collector + factories[name] = createFunc +} + +// PostgresCollector implements the prometheus.Collector interface. +type PostgresCollector struct { + Collectors map[string]Collector + logger log.Logger + + servers map[string]*server +} + +// NewPostgresCollector creates a new PostgresCollector. +func NewPostgresCollector(logger log.Logger, dsns []string, filters ...string) (*PostgresCollector, error) { + f := make(map[string]bool) + for _, filter := range filters { + enabled, exist := collectorState[filter] + if !exist { + return nil, fmt.Errorf("missing collector: %s", filter) + } + if !*enabled { + return nil, fmt.Errorf("disabled collector: %s", filter) + } + f[filter] = true + } + collectors := make(map[string]Collector) + initiatedCollectorsMtx.Lock() + defer initiatedCollectorsMtx.Unlock() + for key, enabled := range collectorState { + if !*enabled || (len(f) > 0 && !f[key]) { + continue + } + if collector, ok := initiatedCollectors[key]; ok { + collectors[key] = collector + } else { + collector, err := factories[key](log.With(logger, "collector", key)) + if err != nil { + return nil, err + } + collectors[key] = collector + initiatedCollectors[key] = collector + } + } + + servers := make(map[string]*server) + for _, dsn := range dsns { + s, err := makeServer(dsn) + if err != nil { + return nil, err + } + servers[dsn] = s + } + + return &PostgresCollector{ + Collectors: collectors, + logger: logger, + servers: servers, + }, nil +} + +// Describe implements the prometheus.Collector interface. +func (n PostgresCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- scrapeDurationDesc + ch <- scrapeSuccessDesc +} + +// Collect implements the prometheus.Collector interface. +func (n PostgresCollector) Collect(ch chan<- prometheus.Metric) { + ctx := context.TODO() + wg := sync.WaitGroup{} + wg.Add(len(n.servers)) + for _, s := range n.servers { + go func(s *server) { + n.subCollect(ctx, s, ch) + wg.Done() + }(s) + } + wg.Wait() +} + +func (n PostgresCollector) subCollect(ctx context.Context, server *server, ch chan<- prometheus.Metric) { + wg := sync.WaitGroup{} + wg.Add(len(n.Collectors)) + for name, c := range n.Collectors { + go func(name string, c Collector) { + execute(ctx, name, c, server, ch, n.logger) + wg.Done() + }(name, c) + } + wg.Wait() +} + +func execute(ctx context.Context, name string, c Collector, s *server, ch chan<- prometheus.Metric, logger log.Logger) { + begin := time.Now() + err := c.Update(ctx, s, ch) + duration := time.Since(begin) + var success float64 + + if err != nil { + if IsNoDataError(err) { + level.Debug(logger).Log("msg", "collector returned no data", "name", name, "duration_seconds", duration.Seconds(), "err", err) + } else { + level.Error(logger).Log("msg", "collector failed", "name", name, "duration_seconds", duration.Seconds(), "err", err) + } + success = 0 + } else { + level.Debug(logger).Log("msg", "collector succeeded", "name", name, "duration_seconds", duration.Seconds()) + success = 1 + } + ch <- prometheus.MustNewConstMetric(scrapeDurationDesc, prometheus.GaugeValue, duration.Seconds(), name) + ch <- prometheus.MustNewConstMetric(scrapeSuccessDesc, prometheus.GaugeValue, success, name) +} + +// collectorFlagAction generates a new action function for the given collector +// to track whether it has been explicitly enabled or disabled from the command line. +// A new action function is needed for each collector flag because the ParseContext +// does not contain information about which flag called the action. +// See: https://github.com/alecthomas/kingpin/issues/294 +func collectorFlagAction(collector string) func(ctx *kingpin.ParseContext) error { + return func(ctx *kingpin.ParseContext) error { + forcedCollectors[collector] = true + return nil + } +} + +// ErrNoData indicates the collector found no data to collect, but had no other error. +var ErrNoData = errors.New("collector returned no data") + +func IsNoDataError(err error) bool { + return err == ErrNoData +} diff --git a/collector/pg_database.go b/collector/pg_database.go index 82475045..5868f66d 100644 --- a/collector/pg_database.go +++ b/collector/pg_database.go @@ -1,4 +1,4 @@ -// Copyright 2021 The Prometheus Authors +// Copyright 2022 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -15,15 +15,21 @@ package collector import ( "context" - "database/sql" + "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" ) -type PGDatabaseCollector struct{} +func init() { + registerCollector("database", defaultEnabled, NewPGDatabaseCollector) +} -func NewPGDatabaseCollector() *PGDatabaseCollector { - return &PGDatabaseCollector{} +type PGDatabaseCollector struct { + log log.Logger +} + +func NewPGDatabaseCollector(logger log.Logger) (Collector, error) { + return &PGDatabaseCollector{log: logger}, nil } var pgDatabase = map[string]*prometheus.Desc{ @@ -34,14 +40,17 @@ var pgDatabase = map[string]*prometheus.Desc{ ), } -func (PGDatabaseCollector) Update(ctx context.Context, db *sql.DB, server string) ([]prometheus.Metric, error) { - metrics := []prometheus.Metric{} +func (PGDatabaseCollector) Update(ctx context.Context, server *server, ch chan<- prometheus.Metric) error { + db, err := server.GetDB() + if err != nil { + return err + } rows, err := db.QueryContext(ctx, `SELECT pg_database.datname ,pg_database_size(pg_database.datname) FROM pg_database;`) if err != nil { - return metrics, err + return err } defer rows.Close() @@ -49,15 +58,16 @@ func (PGDatabaseCollector) Update(ctx context.Context, db *sql.DB, server string var datname string var size int64 if err := rows.Scan(&datname, &size); err != nil { - return metrics, err + return err } - metrics = append(metrics, prometheus.MustNewConstMetric( + + ch <- prometheus.MustNewConstMetric( pgDatabase["size_bytes"], - prometheus.GaugeValue, float64(size), datname, server, - )) + prometheus.GaugeValue, float64(size), datname, server.GetName(), + ) } if err := rows.Err(); err != nil { - return metrics, err + return err } - return metrics, nil + return nil } diff --git a/collector/server.go b/collector/server.go new file mode 100644 index 00000000..fa490a2c --- /dev/null +++ b/collector/server.go @@ -0,0 +1,100 @@ +// Copyright 2022 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "database/sql" + "fmt" + "strings" + + "github.com/lib/pq" +) + +type server struct { + dsn string + name string + db *sql.DB +} + +func makeServer(dsn string) (*server, error) { + name, err := parseServerName(dsn) + if err != nil { + return nil, err + } + return &server{ + dsn: dsn, + name: name, + }, nil +} + +func (s *server) GetDB() (*sql.DB, error) { + if s.db != nil { + return s.db, nil + } + + db, err := sql.Open("postgres", s.dsn) + if err != nil { + return nil, err + } + db.SetMaxOpenConns(1) + db.SetMaxIdleConns(1) + + s.db = db + + return s.db, nil +} + +func (s *server) GetName() string { + return s.name +} + +func (s *server) String() string { + return s.name +} + +func parseServerName(url string) (string, error) { + dsn, err := pq.ParseURL(url) + if err != nil { + dsn = url + } + + pairs := strings.Split(dsn, " ") + kv := make(map[string]string, len(pairs)) + for _, pair := range pairs { + splitted := strings.SplitN(pair, "=", 2) + if len(splitted) != 2 { + return "", fmt.Errorf("malformed dsn %q", dsn) + } + // Newer versions of pq.ParseURL quote values so trim them off if they exist + key := strings.Trim(splitted[0], "'\"") + value := strings.Trim(splitted[1], "'\"") + kv[key] = value + } + + var fingerprint string + + if host, ok := kv["host"]; ok { + fingerprint += host + } else { + fingerprint += "localhost" + } + + if port, ok := kv["port"]; ok { + fingerprint += ":" + port + } else { + fingerprint += ":5432" + } + + return fingerprint, nil +} From 9dad33c39711b7e71dfae75320f8ecf33ca13be4 Mon Sep 17 00:00:00 2001 From: Joe Adams Date: Mon, 14 Feb 2022 21:14:17 -0500 Subject: [PATCH 4/4] Comment unused var Signed-off-by: Joe Adams --- collector/collector.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/collector/collector.go b/collector/collector.go index 60a5510e..f6a80b60 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -38,8 +38,8 @@ const ( // Namespace for all metrics. namespace = "pg" - defaultEnabled = true - defaultDisabled = false + defaultEnabled = true + // defaultDisabled = false ) var (