diff --git a/cmd/postgres_exporter/main.go b/cmd/postgres_exporter/main.go index 87ce17c8..987ea917 100644 --- a/cmd/postgres_exporter/main.go +++ b/cmd/postgres_exporter/main.go @@ -19,6 +19,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/prometheus-community/postgres_exporter/collector" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/common/promlog" @@ -114,6 +115,13 @@ func main() { prometheus.MustRegister(exporter) + pe, err := collector.NewPostgresCollector(logger, dsn) + if err != nil { + level.Error(logger).Log("msg", "Failed to create PostgresCollector", "err", err.Error()) + os.Exit(1) + } + prometheus.MustRegister(pe) + http.Handle(*metricPath, promhttp.Handler()) http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/html; charset=UTF-8") // nolint: errcheck diff --git a/cmd/postgres_exporter/server.go b/cmd/postgres_exporter/server.go index 6a08e998..8747dffa 100644 --- a/cmd/postgres_exporter/server.go +++ b/cmd/postgres_exporter/server.go @@ -14,16 +14,13 @@ package main import ( - "context" "database/sql" "fmt" - "log" "sync" "time" "github.com/blang/semver" "github.com/go-kit/log/level" - "github.com/prometheus-community/postgres_exporter/collector" "github.com/prometheus/client_golang/prometheus" ) @@ -131,17 +128,6 @@ func (s *Server) Scrape(ch chan<- prometheus.Metric, disableSettingsMetrics bool err = fmt.Errorf("queryNamespaceMappings returned %d errors", len(errMap)) } - { - pgdb := collector.NewPGDatabaseCollector() - metrics, err := pgdb.Update(context.TODO(), s.db, s.String()) - if err != nil { - log.Printf("Failed to scrape pg_database metrics: %s", err) - } - for _, m := range metrics { - ch <- m - } - } - return err } diff --git a/collector/collector.go b/collector/collector.go new file mode 100644 index 00000000..60a5510e --- /dev/null +++ b/collector/collector.go @@ -0,0 +1,210 @@ +// Copyright 2022 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "gopkg.in/alecthomas/kingpin.v2" +) + +var ( + factories = make(map[string]func(logger log.Logger) (Collector, error)) + initiatedCollectorsMtx = sync.Mutex{} + initiatedCollectors = make(map[string]Collector) + collectorState = make(map[string]*bool) + forcedCollectors = map[string]bool{} // collectors which have been explicitly enabled or disabled +) + +const ( + // Namespace for all metrics. + namespace = "pg" + + defaultEnabled = true + defaultDisabled = false +) + +var ( + scrapeDurationDesc = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "scrape", "collector_duration_seconds"), + "postgres_exporter: Duration of a collector scrape.", + []string{"collector"}, + nil, + ) + scrapeSuccessDesc = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "scrape", "collector_success"), + "postgres_exporter: Whether a collector succeeded.", + []string{"collector"}, + nil, + ) +) + +type Collector interface { + Update(ctx context.Context, server *server, ch chan<- prometheus.Metric) error +} + +func registerCollector(name string, isDefaultEnabled bool, createFunc func(logger log.Logger) (Collector, error)) { + var helpDefaultState string + if isDefaultEnabled { + helpDefaultState = "enabled" + } else { + helpDefaultState = "disabled" + } + + // Create flag for this collector + flagName := fmt.Sprintf("collector.%s", name) + flagHelp := fmt.Sprintf("Enable the %s collector (default: %s).", name, helpDefaultState) + defaultValue := fmt.Sprintf("%v", isDefaultEnabled) + + flag := kingpin.Flag(flagName, flagHelp).Default(defaultValue).Action(collectorFlagAction(name)).Bool() + collectorState[name] = flag + + // Register the create function for this collector + factories[name] = createFunc +} + +// PostgresCollector implements the prometheus.Collector interface. +type PostgresCollector struct { + Collectors map[string]Collector + logger log.Logger + + servers map[string]*server +} + +// NewPostgresCollector creates a new PostgresCollector. +func NewPostgresCollector(logger log.Logger, dsns []string, filters ...string) (*PostgresCollector, error) { + f := make(map[string]bool) + for _, filter := range filters { + enabled, exist := collectorState[filter] + if !exist { + return nil, fmt.Errorf("missing collector: %s", filter) + } + if !*enabled { + return nil, fmt.Errorf("disabled collector: %s", filter) + } + f[filter] = true + } + collectors := make(map[string]Collector) + initiatedCollectorsMtx.Lock() + defer initiatedCollectorsMtx.Unlock() + for key, enabled := range collectorState { + if !*enabled || (len(f) > 0 && !f[key]) { + continue + } + if collector, ok := initiatedCollectors[key]; ok { + collectors[key] = collector + } else { + collector, err := factories[key](log.With(logger, "collector", key)) + if err != nil { + return nil, err + } + collectors[key] = collector + initiatedCollectors[key] = collector + } + } + + servers := make(map[string]*server) + for _, dsn := range dsns { + s, err := makeServer(dsn) + if err != nil { + return nil, err + } + servers[dsn] = s + } + + return &PostgresCollector{ + Collectors: collectors, + logger: logger, + servers: servers, + }, nil +} + +// Describe implements the prometheus.Collector interface. +func (n PostgresCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- scrapeDurationDesc + ch <- scrapeSuccessDesc +} + +// Collect implements the prometheus.Collector interface. +func (n PostgresCollector) Collect(ch chan<- prometheus.Metric) { + ctx := context.TODO() + wg := sync.WaitGroup{} + wg.Add(len(n.servers)) + for _, s := range n.servers { + go func(s *server) { + n.subCollect(ctx, s, ch) + wg.Done() + }(s) + } + wg.Wait() +} + +func (n PostgresCollector) subCollect(ctx context.Context, server *server, ch chan<- prometheus.Metric) { + wg := sync.WaitGroup{} + wg.Add(len(n.Collectors)) + for name, c := range n.Collectors { + go func(name string, c Collector) { + execute(ctx, name, c, server, ch, n.logger) + wg.Done() + }(name, c) + } + wg.Wait() +} + +func execute(ctx context.Context, name string, c Collector, s *server, ch chan<- prometheus.Metric, logger log.Logger) { + begin := time.Now() + err := c.Update(ctx, s, ch) + duration := time.Since(begin) + var success float64 + + if err != nil { + if IsNoDataError(err) { + level.Debug(logger).Log("msg", "collector returned no data", "name", name, "duration_seconds", duration.Seconds(), "err", err) + } else { + level.Error(logger).Log("msg", "collector failed", "name", name, "duration_seconds", duration.Seconds(), "err", err) + } + success = 0 + } else { + level.Debug(logger).Log("msg", "collector succeeded", "name", name, "duration_seconds", duration.Seconds()) + success = 1 + } + ch <- prometheus.MustNewConstMetric(scrapeDurationDesc, prometheus.GaugeValue, duration.Seconds(), name) + ch <- prometheus.MustNewConstMetric(scrapeSuccessDesc, prometheus.GaugeValue, success, name) +} + +// collectorFlagAction generates a new action function for the given collector +// to track whether it has been explicitly enabled or disabled from the command line. +// A new action function is needed for each collector flag because the ParseContext +// does not contain information about which flag called the action. +// See: https://github.com/alecthomas/kingpin/issues/294 +func collectorFlagAction(collector string) func(ctx *kingpin.ParseContext) error { + return func(ctx *kingpin.ParseContext) error { + forcedCollectors[collector] = true + return nil + } +} + +// ErrNoData indicates the collector found no data to collect, but had no other error. +var ErrNoData = errors.New("collector returned no data") + +func IsNoDataError(err error) bool { + return err == ErrNoData +} diff --git a/collector/pg_database.go b/collector/pg_database.go index 82475045..5868f66d 100644 --- a/collector/pg_database.go +++ b/collector/pg_database.go @@ -1,4 +1,4 @@ -// Copyright 2021 The Prometheus Authors +// Copyright 2022 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -15,15 +15,21 @@ package collector import ( "context" - "database/sql" + "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" ) -type PGDatabaseCollector struct{} +func init() { + registerCollector("database", defaultEnabled, NewPGDatabaseCollector) +} -func NewPGDatabaseCollector() *PGDatabaseCollector { - return &PGDatabaseCollector{} +type PGDatabaseCollector struct { + log log.Logger +} + +func NewPGDatabaseCollector(logger log.Logger) (Collector, error) { + return &PGDatabaseCollector{log: logger}, nil } var pgDatabase = map[string]*prometheus.Desc{ @@ -34,14 +40,17 @@ var pgDatabase = map[string]*prometheus.Desc{ ), } -func (PGDatabaseCollector) Update(ctx context.Context, db *sql.DB, server string) ([]prometheus.Metric, error) { - metrics := []prometheus.Metric{} +func (PGDatabaseCollector) Update(ctx context.Context, server *server, ch chan<- prometheus.Metric) error { + db, err := server.GetDB() + if err != nil { + return err + } rows, err := db.QueryContext(ctx, `SELECT pg_database.datname ,pg_database_size(pg_database.datname) FROM pg_database;`) if err != nil { - return metrics, err + return err } defer rows.Close() @@ -49,15 +58,16 @@ func (PGDatabaseCollector) Update(ctx context.Context, db *sql.DB, server string var datname string var size int64 if err := rows.Scan(&datname, &size); err != nil { - return metrics, err + return err } - metrics = append(metrics, prometheus.MustNewConstMetric( + + ch <- prometheus.MustNewConstMetric( pgDatabase["size_bytes"], - prometheus.GaugeValue, float64(size), datname, server, - )) + prometheus.GaugeValue, float64(size), datname, server.GetName(), + ) } if err := rows.Err(); err != nil { - return metrics, err + return err } - return metrics, nil + return nil } diff --git a/collector/server.go b/collector/server.go new file mode 100644 index 00000000..fa490a2c --- /dev/null +++ b/collector/server.go @@ -0,0 +1,100 @@ +// Copyright 2022 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "database/sql" + "fmt" + "strings" + + "github.com/lib/pq" +) + +type server struct { + dsn string + name string + db *sql.DB +} + +func makeServer(dsn string) (*server, error) { + name, err := parseServerName(dsn) + if err != nil { + return nil, err + } + return &server{ + dsn: dsn, + name: name, + }, nil +} + +func (s *server) GetDB() (*sql.DB, error) { + if s.db != nil { + return s.db, nil + } + + db, err := sql.Open("postgres", s.dsn) + if err != nil { + return nil, err + } + db.SetMaxOpenConns(1) + db.SetMaxIdleConns(1) + + s.db = db + + return s.db, nil +} + +func (s *server) GetName() string { + return s.name +} + +func (s *server) String() string { + return s.name +} + +func parseServerName(url string) (string, error) { + dsn, err := pq.ParseURL(url) + if err != nil { + dsn = url + } + + pairs := strings.Split(dsn, " ") + kv := make(map[string]string, len(pairs)) + for _, pair := range pairs { + splitted := strings.SplitN(pair, "=", 2) + if len(splitted) != 2 { + return "", fmt.Errorf("malformed dsn %q", dsn) + } + // Newer versions of pq.ParseURL quote values so trim them off if they exist + key := strings.Trim(splitted[0], "'\"") + value := strings.Trim(splitted[1], "'\"") + kv[key] = value + } + + var fingerprint string + + if host, ok := kv["host"]; ok { + fingerprint += host + } else { + fingerprint += "localhost" + } + + if port, ok := kv["port"]; ok { + fingerprint += ":" + port + } else { + fingerprint += ":5432" + } + + return fingerprint, nil +}