diff --git a/CHANGELOG.md b/CHANGELOG.md index b97bd9f9..4ac268c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## master / unreleased + +* [ENHANCEMENT] Add pg_database_size_bytes metric #613 + ## 0.10.1 / 2022-01-14 * [BUGFIX] Fix broken log-level for values other than debug. #560 diff --git a/cmd/postgres_exporter/main.go b/cmd/postgres_exporter/main.go index 87ce17c8..987ea917 100644 --- a/cmd/postgres_exporter/main.go +++ b/cmd/postgres_exporter/main.go @@ -19,6 +19,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/prometheus-community/postgres_exporter/collector" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/common/promlog" @@ -114,6 +115,13 @@ func main() { prometheus.MustRegister(exporter) + pe, err := collector.NewPostgresCollector(logger, dsn) + if err != nil { + level.Error(logger).Log("msg", "Failed to create PostgresCollector", "err", err.Error()) + os.Exit(1) + } + prometheus.MustRegister(pe) + http.Handle(*metricPath, promhttp.Handler()) http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/html; charset=UTF-8") // nolint: errcheck diff --git a/collector/collector.go b/collector/collector.go new file mode 100644 index 00000000..f6a80b60 --- /dev/null +++ b/collector/collector.go @@ -0,0 +1,210 @@ +// Copyright 2022 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "gopkg.in/alecthomas/kingpin.v2" +) + +var ( + factories = make(map[string]func(logger log.Logger) (Collector, error)) + initiatedCollectorsMtx = sync.Mutex{} + initiatedCollectors = make(map[string]Collector) + collectorState = make(map[string]*bool) + forcedCollectors = map[string]bool{} // collectors which have been explicitly enabled or disabled +) + +const ( + // Namespace for all metrics. + namespace = "pg" + + defaultEnabled = true + // defaultDisabled = false +) + +var ( + scrapeDurationDesc = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "scrape", "collector_duration_seconds"), + "postgres_exporter: Duration of a collector scrape.", + []string{"collector"}, + nil, + ) + scrapeSuccessDesc = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "scrape", "collector_success"), + "postgres_exporter: Whether a collector succeeded.", + []string{"collector"}, + nil, + ) +) + +type Collector interface { + Update(ctx context.Context, server *server, ch chan<- prometheus.Metric) error +} + +func registerCollector(name string, isDefaultEnabled bool, createFunc func(logger log.Logger) (Collector, error)) { + var helpDefaultState string + if isDefaultEnabled { + helpDefaultState = "enabled" + } else { + helpDefaultState = "disabled" + } + + // Create flag for this collector + flagName := fmt.Sprintf("collector.%s", name) + flagHelp := fmt.Sprintf("Enable the %s collector (default: %s).", name, helpDefaultState) + defaultValue := fmt.Sprintf("%v", isDefaultEnabled) + + flag := kingpin.Flag(flagName, flagHelp).Default(defaultValue).Action(collectorFlagAction(name)).Bool() + collectorState[name] = flag + + // Register the create function for this collector + factories[name] = createFunc +} + +// PostgresCollector implements the prometheus.Collector interface. +type PostgresCollector struct { + Collectors map[string]Collector + logger log.Logger + + servers map[string]*server +} + +// NewPostgresCollector creates a new PostgresCollector. +func NewPostgresCollector(logger log.Logger, dsns []string, filters ...string) (*PostgresCollector, error) { + f := make(map[string]bool) + for _, filter := range filters { + enabled, exist := collectorState[filter] + if !exist { + return nil, fmt.Errorf("missing collector: %s", filter) + } + if !*enabled { + return nil, fmt.Errorf("disabled collector: %s", filter) + } + f[filter] = true + } + collectors := make(map[string]Collector) + initiatedCollectorsMtx.Lock() + defer initiatedCollectorsMtx.Unlock() + for key, enabled := range collectorState { + if !*enabled || (len(f) > 0 && !f[key]) { + continue + } + if collector, ok := initiatedCollectors[key]; ok { + collectors[key] = collector + } else { + collector, err := factories[key](log.With(logger, "collector", key)) + if err != nil { + return nil, err + } + collectors[key] = collector + initiatedCollectors[key] = collector + } + } + + servers := make(map[string]*server) + for _, dsn := range dsns { + s, err := makeServer(dsn) + if err != nil { + return nil, err + } + servers[dsn] = s + } + + return &PostgresCollector{ + Collectors: collectors, + logger: logger, + servers: servers, + }, nil +} + +// Describe implements the prometheus.Collector interface. +func (n PostgresCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- scrapeDurationDesc + ch <- scrapeSuccessDesc +} + +// Collect implements the prometheus.Collector interface. +func (n PostgresCollector) Collect(ch chan<- prometheus.Metric) { + ctx := context.TODO() + wg := sync.WaitGroup{} + wg.Add(len(n.servers)) + for _, s := range n.servers { + go func(s *server) { + n.subCollect(ctx, s, ch) + wg.Done() + }(s) + } + wg.Wait() +} + +func (n PostgresCollector) subCollect(ctx context.Context, server *server, ch chan<- prometheus.Metric) { + wg := sync.WaitGroup{} + wg.Add(len(n.Collectors)) + for name, c := range n.Collectors { + go func(name string, c Collector) { + execute(ctx, name, c, server, ch, n.logger) + wg.Done() + }(name, c) + } + wg.Wait() +} + +func execute(ctx context.Context, name string, c Collector, s *server, ch chan<- prometheus.Metric, logger log.Logger) { + begin := time.Now() + err := c.Update(ctx, s, ch) + duration := time.Since(begin) + var success float64 + + if err != nil { + if IsNoDataError(err) { + level.Debug(logger).Log("msg", "collector returned no data", "name", name, "duration_seconds", duration.Seconds(), "err", err) + } else { + level.Error(logger).Log("msg", "collector failed", "name", name, "duration_seconds", duration.Seconds(), "err", err) + } + success = 0 + } else { + level.Debug(logger).Log("msg", "collector succeeded", "name", name, "duration_seconds", duration.Seconds()) + success = 1 + } + ch <- prometheus.MustNewConstMetric(scrapeDurationDesc, prometheus.GaugeValue, duration.Seconds(), name) + ch <- prometheus.MustNewConstMetric(scrapeSuccessDesc, prometheus.GaugeValue, success, name) +} + +// collectorFlagAction generates a new action function for the given collector +// to track whether it has been explicitly enabled or disabled from the command line. +// A new action function is needed for each collector flag because the ParseContext +// does not contain information about which flag called the action. +// See: https://github.com/alecthomas/kingpin/issues/294 +func collectorFlagAction(collector string) func(ctx *kingpin.ParseContext) error { + return func(ctx *kingpin.ParseContext) error { + forcedCollectors[collector] = true + return nil + } +} + +// ErrNoData indicates the collector found no data to collect, but had no other error. +var ErrNoData = errors.New("collector returned no data") + +func IsNoDataError(err error) bool { + return err == ErrNoData +} diff --git a/collector/pg_database.go b/collector/pg_database.go new file mode 100644 index 00000000..5868f66d --- /dev/null +++ b/collector/pg_database.go @@ -0,0 +1,73 @@ +// Copyright 2022 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" +) + +func init() { + registerCollector("database", defaultEnabled, NewPGDatabaseCollector) +} + +type PGDatabaseCollector struct { + log log.Logger +} + +func NewPGDatabaseCollector(logger log.Logger) (Collector, error) { + return &PGDatabaseCollector{log: logger}, nil +} + +var pgDatabase = map[string]*prometheus.Desc{ + "size_bytes": prometheus.NewDesc( + "pg_database_size_bytes", + "Disk space used by the database", + []string{"datname", "server"}, nil, + ), +} + +func (PGDatabaseCollector) Update(ctx context.Context, server *server, ch chan<- prometheus.Metric) error { + db, err := server.GetDB() + if err != nil { + return err + } + rows, err := db.QueryContext(ctx, + `SELECT pg_database.datname + ,pg_database_size(pg_database.datname) + FROM pg_database;`) + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var datname string + var size int64 + if err := rows.Scan(&datname, &size); err != nil { + return err + } + + ch <- prometheus.MustNewConstMetric( + pgDatabase["size_bytes"], + prometheus.GaugeValue, float64(size), datname, server.GetName(), + ) + } + if err := rows.Err(); err != nil { + return err + } + return nil +} diff --git a/collector/server.go b/collector/server.go new file mode 100644 index 00000000..fa490a2c --- /dev/null +++ b/collector/server.go @@ -0,0 +1,100 @@ +// Copyright 2022 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "database/sql" + "fmt" + "strings" + + "github.com/lib/pq" +) + +type server struct { + dsn string + name string + db *sql.DB +} + +func makeServer(dsn string) (*server, error) { + name, err := parseServerName(dsn) + if err != nil { + return nil, err + } + return &server{ + dsn: dsn, + name: name, + }, nil +} + +func (s *server) GetDB() (*sql.DB, error) { + if s.db != nil { + return s.db, nil + } + + db, err := sql.Open("postgres", s.dsn) + if err != nil { + return nil, err + } + db.SetMaxOpenConns(1) + db.SetMaxIdleConns(1) + + s.db = db + + return s.db, nil +} + +func (s *server) GetName() string { + return s.name +} + +func (s *server) String() string { + return s.name +} + +func parseServerName(url string) (string, error) { + dsn, err := pq.ParseURL(url) + if err != nil { + dsn = url + } + + pairs := strings.Split(dsn, " ") + kv := make(map[string]string, len(pairs)) + for _, pair := range pairs { + splitted := strings.SplitN(pair, "=", 2) + if len(splitted) != 2 { + return "", fmt.Errorf("malformed dsn %q", dsn) + } + // Newer versions of pq.ParseURL quote values so trim them off if they exist + key := strings.Trim(splitted[0], "'\"") + value := strings.Trim(splitted[1], "'\"") + kv[key] = value + } + + var fingerprint string + + if host, ok := kv["host"]; ok { + fingerprint += host + } else { + fingerprint += "localhost" + } + + if port, ok := kv["port"]; ok { + fingerprint += ":" + port + } else { + fingerprint += ":5432" + } + + return fingerprint, nil +} diff --git a/queries.yaml b/queries.yaml index 1102da44..6f2008cb 100644 --- a/queries.yaml +++ b/queries.yaml @@ -146,18 +146,6 @@ pg_statio_user_tables: usage: "COUNTER" description: "Number of buffer hits in this table's TOAST table indexes (if any)" -pg_database: - query: "SELECT pg_database.datname, pg_database_size(pg_database.datname) as size_bytes FROM pg_database" - master: true - cache_seconds: 30 - metrics: - - datname: - usage: "LABEL" - description: "Name of the database" - - size_bytes: - usage: "GAUGE" - description: "Disk space used by the database" - # WARNING: This set of metrics can be very expensive on a busy server as every unique query executed will create an additional time series pg_stat_statements: query: "SELECT t2.rolname, t3.datname, queryid, calls, total_time / 1000 as total_time_seconds, min_time / 1000 as min_time_seconds, max_time / 1000 as max_time_seconds, mean_time / 1000 as mean_time_seconds, stddev_time / 1000 as stddev_time_seconds, rows, shared_blks_hit, shared_blks_read, shared_blks_dirtied, shared_blks_written, local_blks_hit, local_blks_read, local_blks_dirtied, local_blks_written, temp_blks_read, temp_blks_written, blk_read_time / 1000 as blk_read_time_seconds, blk_write_time / 1000 as blk_write_time_seconds FROM pg_stat_statements t1 JOIN pg_roles t2 ON (t1.userid=t2.oid) JOIN pg_database t3 ON (t1.dbid=t3.oid) WHERE t2.rolname != 'rdsadmin'"