From bc981e66c3585035c1130526488f36a7f7266680 Mon Sep 17 00:00:00 2001 From: Joe Adams Date: Thu, 1 Jul 2021 16:43:32 -0400 Subject: [PATCH] Refactor code into logical files Moves code into more manageable, logical files to group behavior together. This should help improve a developer's ability to navigate the code. Signed-off-by: Joe Adams --- cmd/postgres_exporter/datasource.go | 168 +++ cmd/postgres_exporter/main.go | 129 +++ cmd/postgres_exporter/namespace.go | 264 +++++ cmd/postgres_exporter/postgres_exporter.go | 1175 +------------------- cmd/postgres_exporter/queries.go | 303 +++++ cmd/postgres_exporter/server.go | 190 ++++ cmd/postgres_exporter/util.go | 218 ++++ 7 files changed, 1300 insertions(+), 1147 deletions(-) create mode 100644 cmd/postgres_exporter/datasource.go create mode 100644 cmd/postgres_exporter/main.go create mode 100644 cmd/postgres_exporter/namespace.go create mode 100644 cmd/postgres_exporter/queries.go create mode 100644 cmd/postgres_exporter/server.go create mode 100644 cmd/postgres_exporter/util.go diff --git a/cmd/postgres_exporter/datasource.go b/cmd/postgres_exporter/datasource.go new file mode 100644 index 00000000..3bbe2f0a --- /dev/null +++ b/cmd/postgres_exporter/datasource.go @@ -0,0 +1,168 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "fmt" + "io/ioutil" + "net/url" + "os" + "regexp" + "strings" + + "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" +) + +func (e *Exporter) discoverDatabaseDSNs() []string { + // connstring syntax is complex (and not sure if even regular). 
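+	// (a keyword/value connstring looks like e.g. "host=localhost port=5432 user=postgres")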
+ // we don't need to parse it, so just superficially validate that it starts + // with a valid-ish keyword pair + connstringRe := regexp.MustCompile(`^ *[a-zA-Z0-9]+ *= *[^= ]+`) + + dsns := make(map[string]struct{}) + for _, dsn := range e.dsn { + var dsnURI *url.URL + var dsnConnstring string + + if strings.HasPrefix(dsn, "postgresql://") { + var err error + dsnURI, err = url.Parse(dsn) + if err != nil { + level.Error(logger).Log("msg", "Unable to parse DSN as URI", "dsn", loggableDSN(dsn), "err", err) + continue + } + } else if connstringRe.MatchString(dsn) { + dsnConnstring = dsn + } else { + level.Error(logger).Log("msg", "Unable to parse DSN as either URI or connstring", "dsn", loggableDSN(dsn)) + continue + } + + server, err := e.servers.GetServer(dsn) + if err != nil { + level.Error(logger).Log("msg", "Error opening connection to database", "dsn", loggableDSN(dsn), "err", err) + continue + } + dsns[dsn] = struct{}{} + + // If autoDiscoverDatabases is true, set first dsn as master database (Default: false) + server.master = true + + databaseNames, err := queryDatabases(server) + if err != nil { + level.Error(logger).Log("msg", "Error querying databases", "dsn", loggableDSN(dsn), "err", err) + continue + } + for _, databaseName := range databaseNames { + if contains(e.excludeDatabases, databaseName) { + continue + } + + if len(e.includeDatabases) != 0 && !contains(e.includeDatabases, databaseName) { + continue + } + + if dsnURI != nil { + dsnURI.Path = databaseName + dsn = dsnURI.String() + } else { + // replacing one dbname with another is complicated. + // just append new dbname to override. + dsn = fmt.Sprintf("%s dbname=%s", dsnConnstring, databaseName) + } + dsns[dsn] = struct{}{} + } + } + + result := make([]string, len(dsns)) + index := 0 + for dsn := range dsns { + result[index] = dsn + index++ + } + + return result +} + +func (e *Exporter) scrapeDSN(ch chan<- prometheus.Metric, dsn string) error { + server, err := e.servers.GetServer(dsn) + + if err != nil { + return &ErrorConnectToServer{fmt.Sprintf("Error opening connection to database (%s): %s", loggableDSN(dsn), err.Error())} + } + + // Check if autoDiscoverDatabases is false, set dsn as master database (Default: false) + if !e.autoDiscoverDatabases { + server.master = true + } + + // Check if map versions need to be updated + if err := e.checkMapVersions(ch, server); err != nil { + level.Warn(logger).Log("msg", "Proceeding with outdated query maps, as the Postgres version could not be determined", "err", err) + } + + return server.Scrape(ch, e.disableSettingsMetrics) +} + +// try to get the DataSource +// DATA_SOURCE_NAME always wins so we do not break older versions +// reading secrets from files wins over secrets in environment variables +// DATA_SOURCE_NAME > DATA_SOURCE_{USER|PASS}_FILE > DATA_SOURCE_{USER|PASS} +func getDataSources() ([]string, error) { + var dsn = os.Getenv("DATA_SOURCE_NAME") + if len(dsn) != 0 { + return strings.Split(dsn, ","), nil + } + + var user, pass, uri string + + dataSourceUserFile := os.Getenv("DATA_SOURCE_USER_FILE") + if len(dataSourceUserFile) != 0 { + fileContents, err := ioutil.ReadFile(dataSourceUserFile) + if err != nil { + return nil, fmt.Errorf("failed loading data source user file %s: %s", dataSourceUserFile, err.Error()) + } + user = strings.TrimSpace(string(fileContents)) + } else { + user = os.Getenv("DATA_SOURCE_USER") + } + + dataSourcePassFile := os.Getenv("DATA_SOURCE_PASS_FILE") + if len(dataSourcePassFile) != 0 { + fileContents, err := 
ioutil.ReadFile(dataSourcePassFile) + if err != nil { + return nil, fmt.Errorf("failed loading data source pass file %s: %s", dataSourcePassFile, err.Error()) + } + pass = strings.TrimSpace(string(fileContents)) + } else { + pass = os.Getenv("DATA_SOURCE_PASS") + } + + ui := url.UserPassword(user, pass).String() + dataSrouceURIFile := os.Getenv("DATA_SOURCE_URI_FILE") + if len(dataSrouceURIFile) != 0 { + fileContents, err := ioutil.ReadFile(dataSrouceURIFile) + if err != nil { + return nil, fmt.Errorf("failed loading data source URI file %s: %s", dataSrouceURIFile, err.Error()) + } + uri = strings.TrimSpace(string(fileContents)) + } else { + uri = os.Getenv("DATA_SOURCE_URI") + } + + dsn = "postgresql://" + ui + "@" + uri + + return []string{dsn}, nil +} diff --git a/cmd/postgres_exporter/main.go b/cmd/postgres_exporter/main.go new file mode 100644 index 00000000..25f94a74 --- /dev/null +++ b/cmd/postgres_exporter/main.go @@ -0,0 +1,129 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "net/http" + "os" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/prometheus/common/promlog" + "github.com/prometheus/common/promlog/flag" + "github.com/prometheus/common/version" + "github.com/prometheus/exporter-toolkit/web" + webflag "github.com/prometheus/exporter-toolkit/web/kingpinflag" + "gopkg.in/alecthomas/kingpin.v2" +) + +var ( + listenAddress = kingpin.Flag("web.listen-address", "Address to listen on for web interface and telemetry.").Default(":9187").Envar("PG_EXPORTER_WEB_LISTEN_ADDRESS").String() + webConfig = webflag.AddFlags(kingpin.CommandLine) + metricPath = kingpin.Flag("web.telemetry-path", "Path under which to expose metrics.").Default("/metrics").Envar("PG_EXPORTER_WEB_TELEMETRY_PATH").String() + disableDefaultMetrics = kingpin.Flag("disable-default-metrics", "Do not include default metrics.").Default("false").Envar("PG_EXPORTER_DISABLE_DEFAULT_METRICS").Bool() + disableSettingsMetrics = kingpin.Flag("disable-settings-metrics", "Do not include pg_settings metrics.").Default("false").Envar("PG_EXPORTER_DISABLE_SETTINGS_METRICS").Bool() + autoDiscoverDatabases = kingpin.Flag("auto-discover-databases", "Whether to discover the databases on a server dynamically.").Default("false").Envar("PG_EXPORTER_AUTO_DISCOVER_DATABASES").Bool() + queriesPath = kingpin.Flag("extend.query-path", "Path to custom queries to run.").Default("").Envar("PG_EXPORTER_EXTEND_QUERY_PATH").String() + onlyDumpMaps = kingpin.Flag("dumpmaps", "Do not run, simply dump the maps.").Bool() + constantLabelsList = kingpin.Flag("constantLabels", "A list of label=value separated by comma(,).").Default("").Envar("PG_EXPORTER_CONSTANT_LABELS").String() + excludeDatabases = kingpin.Flag("exclude-databases", "A list of databases to remove when autoDiscoverDatabases is 
enabled").Default("").Envar("PG_EXPORTER_EXCLUDE_DATABASES").String() + includeDatabases = kingpin.Flag("include-databases", "A list of databases to include when autoDiscoverDatabases is enabled").Default("").Envar("PG_EXPORTER_INCLUDE_DATABASES").String() + metricPrefix = kingpin.Flag("metric-prefix", "A metric prefix can be used to have non-default (not \"pg\") prefixes for each of the metrics").Default("pg").Envar("PG_EXPORTER_METRIC_PREFIX").String() + logger = log.NewNopLogger() +) + +// Metric name parts. +const ( + // Namespace for all metrics. + namespace = "pg" + // Subsystems. + exporter = "exporter" + // The name of the exporter. + exporterName = "postgres_exporter" + // Metric label used for static string data thats handy to send to Prometheus + // e.g. version + staticLabelName = "static" + // Metric label used for server identification. + serverLabelName = "server" +) + +func main() { + kingpin.Version(version.Print(exporterName)) + promlogConfig := &promlog.Config{} + flag.AddFlags(kingpin.CommandLine, promlogConfig) + kingpin.HelpFlag.Short('h') + kingpin.Parse() + logger = promlog.New(promlogConfig) + + // landingPage contains the HTML served at '/'. + // TODO: Make this nicer and more informative. + var landingPage = []byte(` + Postgres exporter + +

Postgres exporter

+

Metrics

+ + + `) + + if *onlyDumpMaps { + dumpMaps() + return + } + + dsn, err := getDataSources() + if err != nil { + level.Error(logger).Log("msg", "Failed reading data sources", "err", err.Error()) + os.Exit(1) + } + + if len(dsn) == 0 { + level.Error(logger).Log("msg", "Couldn't find environment variables describing the datasource to use") + os.Exit(1) + } + + opts := []ExporterOpt{ + DisableDefaultMetrics(*disableDefaultMetrics), + DisableSettingsMetrics(*disableSettingsMetrics), + AutoDiscoverDatabases(*autoDiscoverDatabases), + WithUserQueriesPath(*queriesPath), + WithConstantLabels(*constantLabelsList), + ExcludeDatabases(*excludeDatabases), + IncludeDatabases(*includeDatabases), + } + + exporter := NewExporter(dsn, opts...) + defer func() { + exporter.servers.Close() + }() + + prometheus.MustRegister(version.NewCollector(exporterName)) + + prometheus.MustRegister(exporter) + + http.Handle(*metricPath, promhttp.Handler()) + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/html; charset=UTF-8") // nolint: errcheck + w.Write(landingPage) // nolint: errcheck + }) + + level.Info(logger).Log("msg", "Listening on address", "address", *listenAddress) + srv := &http.Server{Addr: *listenAddress} + if err := web.ListenAndServe(srv, *webConfig, logger); err != nil { + level.Error(logger).Log("msg", "Error running HTTP server", "err", err) + os.Exit(1) + } +} diff --git a/cmd/postgres_exporter/namespace.go b/cmd/postgres_exporter/namespace.go new file mode 100644 index 00000000..1b9e970e --- /dev/null +++ b/cmd/postgres_exporter/namespace.go @@ -0,0 +1,264 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "database/sql" + "errors" + "fmt" + "time" + + "github.com/blang/semver" + "github.com/go-kit/kit/log/level" + "github.com/lib/pq" + "github.com/prometheus/client_golang/prometheus" +) + +// Query within a namespace mapping and emit metrics. Returns fatal errors if +// the scrape fails, and a slice of errors if they were non-fatal. +func queryNamespaceMapping(server *Server, namespace string, mapping MetricMapNamespace) ([]prometheus.Metric, []error, error) { + // Check for a query override for this namespace + query, found := server.queryOverrides[namespace] + + // Was this query disabled (i.e. nothing sensible can be queried on cu + // version of PostgreSQL? + if query == "" && found { + // Return success (no pertinent data) + return []prometheus.Metric{}, []error{}, nil + } + + // Don't fail on a bad scrape of one metric + var rows *sql.Rows + var err error + + if !found { + // I've no idea how to avoid this properly at the moment, but this is + // an admin tool so you're not injecting SQL right? 
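+		// (the namespace here comes from the exporter's builtin metric maps or the
+		// operator-supplied queries file, not from scrape-time client input)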
+ rows, err = server.db.Query(fmt.Sprintf("SELECT * FROM %s;", namespace)) // nolint: gas + } else { + rows, err = server.db.Query(query) + } + if err != nil { + return []prometheus.Metric{}, []error{}, fmt.Errorf("Error running query on database %q: %s %v", server, namespace, err) + } + defer rows.Close() // nolint: errcheck + + var columnNames []string + columnNames, err = rows.Columns() + if err != nil { + return []prometheus.Metric{}, []error{}, errors.New(fmt.Sprintln("Error retrieving column list for: ", namespace, err)) + } + + // Make a lookup map for the column indices + var columnIdx = make(map[string]int, len(columnNames)) + for i, n := range columnNames { + columnIdx[n] = i + } + + var columnData = make([]interface{}, len(columnNames)) + var scanArgs = make([]interface{}, len(columnNames)) + for i := range columnData { + scanArgs[i] = &columnData[i] + } + + nonfatalErrors := []error{} + + metrics := make([]prometheus.Metric, 0) + + for rows.Next() { + err = rows.Scan(scanArgs...) + if err != nil { + return []prometheus.Metric{}, []error{}, errors.New(fmt.Sprintln("Error retrieving rows:", namespace, err)) + } + + // Get the label values for this row. + labels := make([]string, len(mapping.labels)) + for idx, label := range mapping.labels { + labels[idx], _ = dbToString(columnData[columnIdx[label]]) + } + + // Loop over column names, and match to scan data. Unknown columns + // will be filled with an untyped metric number *if* they can be + // converted to float64s. NULLs are allowed and treated as NaN. + for idx, columnName := range columnNames { + var metric prometheus.Metric + if metricMapping, ok := mapping.columnMappings[columnName]; ok { + // Is this a metricy metric? + if metricMapping.discard { + continue + } + + if metricMapping.histogram { + var keys []float64 + err = pq.Array(&keys).Scan(columnData[idx]) + if err != nil { + return []prometheus.Metric{}, []error{}, errors.New(fmt.Sprintln("Error retrieving", columnName, "buckets:", namespace, err)) + } + + var values []int64 + valuesIdx, ok := columnIdx[columnName+"_bucket"] + if !ok { + nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Missing column: ", namespace, columnName+"_bucket"))) + continue + } + err = pq.Array(&values).Scan(columnData[valuesIdx]) + if err != nil { + return []prometheus.Metric{}, []error{}, errors.New(fmt.Sprintln("Error retrieving", columnName, "bucket values:", namespace, err)) + } + + buckets := make(map[float64]uint64, len(keys)) + for i, key := range keys { + if i >= len(values) { + break + } + buckets[key] = uint64(values[i]) + } + + idx, ok = columnIdx[columnName+"_sum"] + if !ok { + nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Missing column: ", namespace, columnName+"_sum"))) + continue + } + sum, ok := dbToFloat64(columnData[idx]) + if !ok { + nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Unexpected error parsing column: ", namespace, columnName+"_sum", columnData[idx]))) + continue + } + + idx, ok = columnIdx[columnName+"_count"] + if !ok { + nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Missing column: ", namespace, columnName+"_count"))) + continue + } + count, ok := dbToUint64(columnData[idx]) + if !ok { + nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Unexpected error parsing column: ", namespace, columnName+"_count", columnData[idx]))) + continue + } + + metric = prometheus.MustNewConstHistogram( + metricMapping.desc, + count, sum, buckets, + labels..., + ) + } else { + value, ok := 
dbToFloat64(columnData[idx]) + if !ok { + nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Unexpected error parsing column: ", namespace, columnName, columnData[idx]))) + continue + } + // Generate the metric + metric = prometheus.MustNewConstMetric(metricMapping.desc, metricMapping.vtype, value, labels...) + } + } else { + // Unknown metric. Report as untyped if scan to float64 works, else note an error too. + metricLabel := fmt.Sprintf("%s_%s", namespace, columnName) + desc := prometheus.NewDesc(metricLabel, fmt.Sprintf("Unknown metric from %s", namespace), mapping.labels, server.labels) + + // Its not an error to fail here, since the values are + // unexpected anyway. + value, ok := dbToFloat64(columnData[idx]) + if !ok { + nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Unparseable column type - discarding: ", namespace, columnName, err))) + continue + } + metric = prometheus.MustNewConstMetric(desc, prometheus.UntypedValue, value, labels...) + } + metrics = append(metrics, metric) + } + } + return metrics, nonfatalErrors, nil +} + +// Iterate through all the namespace mappings in the exporter and run their +// queries. +func queryNamespaceMappings(ch chan<- prometheus.Metric, server *Server) map[string]error { + // Return a map of namespace -> errors + namespaceErrors := make(map[string]error) + + scrapeStart := time.Now() + + for namespace, mapping := range server.metricMap { + level.Debug(logger).Log("msg", "Querying namespace", "namespace", namespace) + + if mapping.master && !server.master { + level.Debug(logger).Log("msg", "Query skipped...") + continue + } + + // check if the query is to be run on specific database server version range or not + if len(server.runonserver) > 0 { + serVersion, _ := semver.Parse(server.lastMapVersion.String()) + runServerRange, _ := semver.ParseRange(server.runonserver) + if !runServerRange(serVersion) { + level.Debug(logger).Log("msg", "Query skipped for this database version", "version", server.lastMapVersion.String(), "target_version", server.runonserver) + continue + } + } + + scrapeMetric := false + // Check if the metric is cached + server.cacheMtx.Lock() + cachedMetric, found := server.metricCache[namespace] + server.cacheMtx.Unlock() + // If found, check if needs refresh from cache + if found { + if scrapeStart.Sub(cachedMetric.lastScrape).Seconds() > float64(mapping.cacheSeconds) { + scrapeMetric = true + } + } else { + scrapeMetric = true + } + + var metrics []prometheus.Metric + var nonFatalErrors []error + var err error + if scrapeMetric { + metrics, nonFatalErrors, err = queryNamespaceMapping(server, namespace, mapping) + } else { + metrics = cachedMetric.metrics + } + + // Serious error - a namespace disappeared + if err != nil { + namespaceErrors[namespace] = err + level.Info(logger).Log("err", err) + } + // Non-serious errors - likely version or parsing problems. 
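+		// They are logged below at info level and the scrape continues.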
+ if len(nonFatalErrors) > 0 { + for _, err := range nonFatalErrors { + level.Info(logger).Log("err", err) + } + } + + // Emit the metrics into the channel + for _, metric := range metrics { + ch <- metric + } + + if scrapeMetric { + // Only cache if metric is meaningfully cacheable + if mapping.cacheSeconds > 0 { + server.cacheMtx.Lock() + server.metricCache[namespace] = cachedMetrics{ + metrics: metrics, + lastScrape: scrapeStart, + } + server.cacheMtx.Unlock() + } + } + } + + return namespaceErrors +} diff --git a/cmd/postgres_exporter/postgres_exporter.go b/cmd/postgres_exporter/postgres_exporter.go index b2e9f75b..15804811 100644 --- a/cmd/postgres_exporter/postgres_exporter.go +++ b/cmd/postgres_exporter/postgres_exporter.go @@ -20,74 +20,34 @@ import ( "fmt" "io/ioutil" "math" - "net/http" - "net/url" - "os" "regexp" - "strconv" "strings" - "sync" "time" "github.com/blang/semver" - "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - "github.com/lib/pq" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/prometheus/common/promlog" - "github.com/prometheus/common/promlog/flag" - "github.com/prometheus/common/version" - "github.com/prometheus/exporter-toolkit/web" - webflag "github.com/prometheus/exporter-toolkit/web/kingpinflag" - "gopkg.in/alecthomas/kingpin.v2" - "gopkg.in/yaml.v2" -) - -var ( - listenAddress = kingpin.Flag("web.listen-address", "Address to listen on for web interface and telemetry.").Default(":9187").Envar("PG_EXPORTER_WEB_LISTEN_ADDRESS").String() - webConfig = webflag.AddFlags(kingpin.CommandLine) - metricPath = kingpin.Flag("web.telemetry-path", "Path under which to expose metrics.").Default("/metrics").Envar("PG_EXPORTER_WEB_TELEMETRY_PATH").String() - disableDefaultMetrics = kingpin.Flag("disable-default-metrics", "Do not include default metrics.").Default("false").Envar("PG_EXPORTER_DISABLE_DEFAULT_METRICS").Bool() - disableSettingsMetrics = kingpin.Flag("disable-settings-metrics", "Do not include pg_settings metrics.").Default("false").Envar("PG_EXPORTER_DISABLE_SETTINGS_METRICS").Bool() - autoDiscoverDatabases = kingpin.Flag("auto-discover-databases", "Whether to discover the databases on a server dynamically.").Default("false").Envar("PG_EXPORTER_AUTO_DISCOVER_DATABASES").Bool() - queriesPath = kingpin.Flag("extend.query-path", "Path to custom queries to run.").Default("").Envar("PG_EXPORTER_EXTEND_QUERY_PATH").String() - onlyDumpMaps = kingpin.Flag("dumpmaps", "Do not run, simply dump the maps.").Bool() - constantLabelsList = kingpin.Flag("constantLabels", "A list of label=value separated by comma(,).").Default("").Envar("PG_EXPORTER_CONSTANT_LABELS").String() - excludeDatabases = kingpin.Flag("exclude-databases", "A list of databases to remove when autoDiscoverDatabases is enabled").Default("").Envar("PG_EXPORTER_EXCLUDE_DATABASES").String() - includeDatabases = kingpin.Flag("include-databases", "A list of databases to include when autoDiscoverDatabases is enabled").Default("").Envar("PG_EXPORTER_INCLUDE_DATABASES").String() - metricPrefix = kingpin.Flag("metric-prefix", "A metric prefix can be used to have non-default (not \"pg\") prefixes for each of the metrics").Default("pg").Envar("PG_EXPORTER_METRIC_PREFIX").String() - logger = log.NewNopLogger() -) - -// Metric name parts. -const ( - // Namespace for all metrics. - namespace = "pg" - // Subsystems. - exporter = "exporter" - // The name of the exporter. 
- exporterName = "postgres_exporter" - // Metric label used for static string data thats handy to send to Prometheus - // e.g. version - staticLabelName = "static" - // Metric label used for server identification. - serverLabelName = "server" ) // ColumnUsage should be one of several enum values which describe how a // queried row is to be converted to a Prometheus metric. type ColumnUsage int -// nolint: golint const ( - DISCARD ColumnUsage = iota // Ignore this column - LABEL ColumnUsage = iota // Use this column as a label - COUNTER ColumnUsage = iota // Use this column as a counter - GAUGE ColumnUsage = iota // Use this column as a gauge - MAPPEDMETRIC ColumnUsage = iota // Use this column with the supplied mapping of text values - DURATION ColumnUsage = iota // This column should be interpreted as a text duration (and converted to milliseconds) - HISTOGRAM ColumnUsage = iota // Use this column as a histogram + // DISCARD ignores a column + DISCARD ColumnUsage = iota + // LABEL identifies a column as a label + LABEL ColumnUsage = iota + // COUNTER identifies a column as a counter + COUNTER ColumnUsage = iota + // GAUGE identifies a column as a gauge + GAUGE ColumnUsage = iota + // MAPPEDMETRIC identifies a column as a mapping of text values + MAPPEDMETRIC ColumnUsage = iota + // DURATION identifies a column as a text duration (and converted to milliseconds) + DURATION ColumnUsage = iota + // HISTOGRAM identifies a column as a histogram + HISTOGRAM ColumnUsage = iota ) // UnmarshalYAML implements the yaml.Unmarshaller interface. @@ -114,21 +74,9 @@ type MappingOptions struct { SupportedVersions semver.Range `yaml:"pg_version"` // Semantic version ranges which are supported. Unsupported columns are not queried (internally converted to DISCARD). } -// nolint: golint +// Mapping represents a set of MappingOptions type Mapping map[string]MappingOptions -// nolint: golint -type UserQuery struct { - Query string `yaml:"query"` - Metrics []Mapping `yaml:"metrics"` - Master bool `yaml:"master"` // Querying only for master database - CacheSeconds uint64 `yaml:"cache_seconds"` // Number of seconds to cache the namespace result metrics for. - RunOnServer string `yaml:"runonserver"` // Querying to run on which server version -} - -// nolint: golint -type UserQueries map[string]UserQuery - // Regex used to get the "short-version" from the postgres version field. var versionRegex = regexp.MustCompile(`^\w+ ((\d+)(\.\d+)?(\.\d+)?)`) var lowestSupportedVersion = semver.MustParse("9.1.0") @@ -361,254 +309,6 @@ var builtinMetricMaps = map[string]intermediateMetricMap{ }, } -// OverrideQuery 's are run in-place of simple namespace look ups, and provide -// advanced functionality. But they have a tendency to postgres version specific. -// There aren't too many versions, so we simply store customized versions using -// the semver matching we do for columns. -type OverrideQuery struct { - versionRange semver.Range - query string -} - -// Overriding queries for namespaces above. 
-// TODO: validate this is a closed set in tests, and there are no overlaps -var queryOverrides = map[string][]OverrideQuery{ - "pg_locks": { - { - semver.MustParseRange(">0.0.0"), - `SELECT pg_database.datname,tmp.mode,COALESCE(count,0) as count - FROM - ( - VALUES ('accesssharelock'), - ('rowsharelock'), - ('rowexclusivelock'), - ('shareupdateexclusivelock'), - ('sharelock'), - ('sharerowexclusivelock'), - ('exclusivelock'), - ('accessexclusivelock'), - ('sireadlock') - ) AS tmp(mode) CROSS JOIN pg_database - LEFT JOIN - (SELECT database, lower(mode) AS mode,count(*) AS count - FROM pg_locks WHERE database IS NOT NULL - GROUP BY database, lower(mode) - ) AS tmp2 - ON tmp.mode=tmp2.mode and pg_database.oid = tmp2.database ORDER BY 1`, - }, - }, - - "pg_stat_replication": { - { - semver.MustParseRange(">=10.0.0"), - ` - SELECT *, - (case pg_is_in_recovery() when 't' then null else pg_current_wal_lsn() end) AS pg_current_wal_lsn, - (case pg_is_in_recovery() when 't' then null else pg_wal_lsn_diff(pg_current_wal_lsn(), pg_lsn('0/0'))::float end) AS pg_current_wal_lsn_bytes, - (case pg_is_in_recovery() when 't' then null else pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn)::float end) AS pg_wal_lsn_diff - FROM pg_stat_replication - `, - }, - { - semver.MustParseRange(">=9.2.0 <10.0.0"), - ` - SELECT *, - (case pg_is_in_recovery() when 't' then null else pg_current_xlog_location() end) AS pg_current_xlog_location, - (case pg_is_in_recovery() when 't' then null else pg_xlog_location_diff(pg_current_xlog_location(), replay_location)::float end) AS pg_xlog_location_diff - FROM pg_stat_replication - `, - }, - { - semver.MustParseRange("<9.2.0"), - ` - SELECT *, - (case pg_is_in_recovery() when 't' then null else pg_current_xlog_location() end) AS pg_current_xlog_location - FROM pg_stat_replication - `, - }, - }, - - "pg_replication_slots": { - { - semver.MustParseRange(">=9.4.0 <10.0.0"), - ` - SELECT slot_name, database, active, pg_xlog_location_diff(pg_current_xlog_location(), restart_lsn) - FROM pg_replication_slots - `, - }, - { - semver.MustParseRange(">=10.0.0"), - ` - SELECT slot_name, database, active, pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) - FROM pg_replication_slots - `, - }, - }, - - "pg_stat_archiver": { - { - semver.MustParseRange(">=0.0.0"), - ` - SELECT *, - extract(epoch from now() - last_archived_time) AS last_archive_age - FROM pg_stat_archiver - `, - }, - }, - - "pg_stat_activity": { - // This query only works - { - semver.MustParseRange(">=9.2.0"), - ` - SELECT - pg_database.datname, - tmp.state, - COALESCE(count,0) as count, - COALESCE(max_tx_duration,0) as max_tx_duration - FROM - ( - VALUES ('active'), - ('idle'), - ('idle in transaction'), - ('idle in transaction (aborted)'), - ('fastpath function call'), - ('disabled') - ) AS tmp(state) CROSS JOIN pg_database - LEFT JOIN - ( - SELECT - datname, - state, - count(*) AS count, - MAX(EXTRACT(EPOCH FROM now() - xact_start))::float AS max_tx_duration - FROM pg_stat_activity GROUP BY datname,state) AS tmp2 - ON tmp.state = tmp2.state AND pg_database.datname = tmp2.datname - `, - }, - { - semver.MustParseRange("<9.2.0"), - ` - SELECT - datname, - 'unknown' AS state, - COALESCE(count(*),0) AS count, - COALESCE(MAX(EXTRACT(EPOCH FROM now() - xact_start))::float,0) AS max_tx_duration - FROM pg_stat_activity GROUP BY datname - `, - }, - }, -} - -// Convert the query override file to the version-specific query override file -// for the exporter. 
-func makeQueryOverrideMap(pgVersion semver.Version, queryOverrides map[string][]OverrideQuery) map[string]string { - resultMap := make(map[string]string) - for name, overrideDef := range queryOverrides { - // Find a matching semver. We make it an error to have overlapping - // ranges at test-time, so only 1 should ever match. - matched := false - for _, queryDef := range overrideDef { - if queryDef.versionRange(pgVersion) { - resultMap[name] = queryDef.query - matched = true - break - } - } - if !matched { - level.Warn(logger).Log("msg", "No query matched override, disabling metric space", "name", name) - resultMap[name] = "" - } - } - - return resultMap -} - -func parseUserQueries(content []byte) (map[string]intermediateMetricMap, map[string]string, error) { - var userQueries UserQueries - - err := yaml.Unmarshal(content, &userQueries) - if err != nil { - return nil, nil, err - } - - // Stores the loaded map representation - metricMaps := make(map[string]intermediateMetricMap) - newQueryOverrides := make(map[string]string) - - for metric, specs := range userQueries { - level.Debug(logger).Log("msg", "New user metric namespace from YAML metric", "metric", metric, "cache_seconds", specs.CacheSeconds) - newQueryOverrides[metric] = specs.Query - metricMap, ok := metricMaps[metric] - if !ok { - // Namespace for metric not found - add it. - newMetricMap := make(map[string]ColumnMapping) - metricMap = intermediateMetricMap{ - columnMappings: newMetricMap, - master: specs.Master, - cacheSeconds: specs.CacheSeconds, - } - metricMaps[metric] = metricMap - } - for _, metric := range specs.Metrics { - for name, mappingOption := range metric { - var columnMapping ColumnMapping - tmpUsage, _ := stringToColumnUsage(mappingOption.Usage) - columnMapping.usage = tmpUsage - columnMapping.description = mappingOption.Description - - // TODO: we should support cu - columnMapping.mapping = nil - // Should we support this for users? - columnMapping.supportedVersions = nil - - metricMap.columnMappings[name] = columnMapping - } - } - } - return metricMaps, newQueryOverrides, nil -} - -// Add queries to the builtinMetricMaps and queryOverrides maps. Added queries do not -// respect version requirements, because it is assumed that the user knows -// what they are doing with their version of postgres. -// -// This function modifies metricMap and queryOverrideMap to contain the new -// queries. -// TODO: test code for all cu. -// TODO: the YAML this supports is "non-standard" - we should move away from it. 
-func addQueries(content []byte, pgVersion semver.Version, server *Server) error { - metricMaps, newQueryOverrides, err := parseUserQueries(content) - if err != nil { - return err - } - // Convert the loaded metric map into exporter representation - partialExporterMap := makeDescMap(pgVersion, server.labels, metricMaps) - - // Merge the two maps (which are now quite flatteend) - for k, v := range partialExporterMap { - _, found := server.metricMap[k] - if found { - level.Debug(logger).Log("msg", "Overriding metric from user YAML file", "metric", k) - } else { - level.Debug(logger).Log("msg", "Adding new metric from user YAML file", "metric", k) - } - server.metricMap[k] = v - } - - // Merge the query override map - for k, v := range newQueryOverrides { - _, found := server.queryOverrides[k] - if found { - level.Debug(logger).Log("msg", "Overriding query override from user YAML file", "query_override", k) - } else { - level.Debug(logger).Log("msg", "Adding new query override from user YAML file", "query_override", k) - } - server.queryOverrides[k] = v - } - return nil -} - // Turn the MetricMap column mapping into a prometheus descriptor mapping. func makeDescMap(pgVersion semver.Version, serverLabels prometheus.Labels, metricMaps map[string]intermediateMetricMap) map[string]MetricMapNamespace { var metricMap = make(map[string]MetricMapNamespace) @@ -745,359 +445,11 @@ func makeDescMap(pgVersion semver.Version, serverLabels prometheus.Labels, metri return metricMap } -// convert a string to the corresponding ColumnUsage -func stringToColumnUsage(s string) (ColumnUsage, error) { - var u ColumnUsage - var err error - switch s { - case "DISCARD": - u = DISCARD - - case "LABEL": - u = LABEL - - case "COUNTER": - u = COUNTER - - case "GAUGE": - u = GAUGE - - case "HISTOGRAM": - u = HISTOGRAM - - case "MAPPEDMETRIC": - u = MAPPEDMETRIC - - case "DURATION": - u = DURATION - - default: - err = fmt.Errorf("wrong ColumnUsage given : %s", s) - } - - return u, err -} - -// Convert database.sql types to float64s for Prometheus consumption. Null types are mapped to NaN. string and []byte -// types are mapped as NaN and !ok -func dbToFloat64(t interface{}) (float64, bool) { - switch v := t.(type) { - case int64: - return float64(v), true - case float64: - return v, true - case time.Time: - return float64(v.Unix()), true - case []byte: - // Try and convert to string and then parse to a float64 - strV := string(v) - result, err := strconv.ParseFloat(strV, 64) - if err != nil { - level.Info(logger).Log("msg", "Could not parse []byte", "err", err) - return math.NaN(), false - } - return result, true - case string: - result, err := strconv.ParseFloat(v, 64) - if err != nil { - level.Info(logger).Log("msg", "Could not parse string", "err", err) - return math.NaN(), false - } - return result, true - case bool: - if v { - return 1.0, true - } - return 0.0, true - case nil: - return math.NaN(), true - default: - return math.NaN(), false - } -} - -// Convert database.sql types to uint64 for Prometheus consumption. Null types are mapped to 0. 
string and []byte -// types are mapped as 0 and !ok -func dbToUint64(t interface{}) (uint64, bool) { - switch v := t.(type) { - case uint64: - return v, true - case int64: - return uint64(v), true - case float64: - return uint64(v), true - case time.Time: - return uint64(v.Unix()), true - case []byte: - // Try and convert to string and then parse to a uint64 - strV := string(v) - result, err := strconv.ParseUint(strV, 10, 64) - if err != nil { - level.Info(logger).Log("msg", "Could not parse []byte", "err", err) - return 0, false - } - return result, true - case string: - result, err := strconv.ParseUint(v, 10, 64) - if err != nil { - level.Info(logger).Log("msg", "Could not parse string", "err", err) - return 0, false - } - return result, true - case bool: - if v { - return 1, true - } - return 0, true - case nil: - return 0, true - default: - return 0, false - } -} - -// Convert database.sql to string for Prometheus labels. Null types are mapped to empty strings. -func dbToString(t interface{}) (string, bool) { - switch v := t.(type) { - case int64: - return fmt.Sprintf("%v", v), true - case float64: - return fmt.Sprintf("%v", v), true - case time.Time: - return fmt.Sprintf("%v", v.Unix()), true - case nil: - return "", true - case []byte: - // Try and convert to string - return string(v), true - case string: - return v, true - case bool: - if v { - return "true", true - } - return "false", true - default: - return "", false - } -} - -func parseFingerprint(url string) (string, error) { - dsn, err := pq.ParseURL(url) - if err != nil { - dsn = url - } - - pairs := strings.Split(dsn, " ") - kv := make(map[string]string, len(pairs)) - for _, pair := range pairs { - splitted := strings.SplitN(pair, "=", 2) - if len(splitted) != 2 { - return "", fmt.Errorf("malformed dsn %q", dsn) - } - kv[splitted[0]] = splitted[1] - } - - var fingerprint string - - if host, ok := kv["host"]; ok { - fingerprint += host - } else { - fingerprint += "localhost" - } - - if port, ok := kv["port"]; ok { - fingerprint += ":" + port - } else { - fingerprint += ":5432" - } - - return fingerprint, nil -} - -func loggableDSN(dsn string) string { - pDSN, err := url.Parse(dsn) - if err != nil { - return "could not parse DATA_SOURCE_NAME" - } - // Blank user info if not nil - if pDSN.User != nil { - pDSN.User = url.UserPassword(pDSN.User.Username(), "PASSWORD_REMOVED") - } - - return pDSN.String() -} - type cachedMetrics struct { metrics []prometheus.Metric lastScrape time.Time } -// Server describes a connection to Postgres. -// Also it contains metrics map and query overrides. -type Server struct { - db *sql.DB - labels prometheus.Labels - master bool - runonserver string - - // Last version used to calculate metric map. If mismatch on scrape, - // then maps are recalculated. - lastMapVersion semver.Version - // Currently active metric map - metricMap map[string]MetricMapNamespace - // Currently active query overrides - queryOverrides map[string]string - mappingMtx sync.RWMutex - // Currently cached metrics - metricCache map[string]cachedMetrics - cacheMtx sync.Mutex -} - -// ServerOpt configures a server. -type ServerOpt func(*Server) - -// ServerWithLabels configures a set of labels. -func ServerWithLabels(labels prometheus.Labels) ServerOpt { - return func(s *Server) { - for k, v := range labels { - s.labels[k] = v - } - } -} - -// NewServer establishes a new connection using DSN. 
-func NewServer(dsn string, opts ...ServerOpt) (*Server, error) { - fingerprint, err := parseFingerprint(dsn) - if err != nil { - return nil, err - } - - db, err := sql.Open("postgres", dsn) - if err != nil { - return nil, err - } - db.SetMaxOpenConns(1) - db.SetMaxIdleConns(1) - - level.Info(logger).Log("msg", "Established new database connection", "fingerprint", fingerprint) - - s := &Server{ - db: db, - master: false, - labels: prometheus.Labels{ - serverLabelName: fingerprint, - }, - metricCache: make(map[string]cachedMetrics), - } - - for _, opt := range opts { - opt(s) - } - - return s, nil -} - -// Close disconnects from Postgres. -func (s *Server) Close() error { - return s.db.Close() -} - -// Ping checks connection availability and possibly invalidates the connection if it fails. -func (s *Server) Ping() error { - if err := s.db.Ping(); err != nil { - if cerr := s.Close(); cerr != nil { - level.Error(logger).Log("msg", "Error while closing non-pinging DB connection", "server", s, "err", cerr) - } - return err - } - return nil -} - -// String returns server's fingerprint. -func (s *Server) String() string { - return s.labels[serverLabelName] -} - -// Scrape loads metrics. -func (s *Server) Scrape(ch chan<- prometheus.Metric, disableSettingsMetrics bool) error { - s.mappingMtx.RLock() - defer s.mappingMtx.RUnlock() - - var err error - - if !disableSettingsMetrics && s.master { - if err = querySettings(ch, s); err != nil { - err = fmt.Errorf("error retrieving settings: %s", err) - } - } - - errMap := queryNamespaceMappings(ch, s) - if len(errMap) > 0 { - err = fmt.Errorf("queryNamespaceMappings returned %d errors", len(errMap)) - } - - return err -} - -// Servers contains a collection of servers to Postgres. -type Servers struct { - m sync.Mutex - servers map[string]*Server - opts []ServerOpt -} - -// NewServers creates a collection of servers to Postgres. -func NewServers(opts ...ServerOpt) *Servers { - return &Servers{ - servers: make(map[string]*Server), - opts: opts, - } -} - -// GetServer returns established connection from a collection. -func (s *Servers) GetServer(dsn string) (*Server, error) { - s.m.Lock() - defer s.m.Unlock() - var err error - var ok bool - errCount := 0 // start at zero because we increment before doing work - retries := 1 - var server *Server - for { - if errCount++; errCount > retries { - return nil, err - } - server, ok = s.servers[dsn] - if !ok { - server, err = NewServer(dsn, s.opts...) - if err != nil { - time.Sleep(time.Duration(errCount) * time.Second) - continue - } - s.servers[dsn] = server - } - if err = server.Ping(); err != nil { - delete(s.servers, dsn) - time.Sleep(time.Duration(errCount) * time.Second) - continue - } - break - } - return server, nil -} - -// Close disconnects from all known servers. -func (s *Servers) Close() { - s.m.Lock() - defer s.m.Unlock() - for _, server := range s.servers { - if err := server.Close(); err != nil { - level.Error(logger).Log("msg", "Failed to close connection", "server", server, "err", err) - } - } -} - // Exporter collects Postgres metrics. It implements prometheus.Collector. type Exporter struct { // Holds a reference to the build in column mappings. 
Currently this is for testing purposes @@ -1214,15 +566,11 @@ func NewExporter(dsn []string, opts ...ExporterOpt) *Exporter { } e.setupInternalMetrics() - e.setupServers() + e.servers = NewServers(ServerWithLabels(e.constantLabels)) return e } -func (e *Exporter) setupServers() { - e.servers = NewServers(ServerWithLabels(e.constantLabels)) -} - func (e *Exporter) setupInternalMetrics() { e.duration = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, @@ -1282,277 +630,29 @@ func newDesc(subsystem, name, help string, labels prometheus.Labels) *prometheus ) } -func queryDatabases(server *Server) ([]string, error) { - rows, err := server.db.Query("SELECT datname FROM pg_database WHERE datallowconn = true AND datistemplate = false AND datname != current_database()") +func checkPostgresVersion(db *sql.DB, server string) (semver.Version, string, error) { + level.Debug(logger).Log("msg", "Querying PostgreSQL version", "server", server) + versionRow := db.QueryRow("SELECT version();") + var versionString string + err := versionRow.Scan(&versionString) if err != nil { - return nil, fmt.Errorf("Error retrieving databases: %v", err) - } - defer rows.Close() // nolint: errcheck - - var databaseName string - result := make([]string, 0) - for rows.Next() { - err = rows.Scan(&databaseName) - if err != nil { - return nil, errors.New(fmt.Sprintln("Error retrieving rows:", err)) - } - result = append(result, databaseName) - } - - return result, nil -} - -// Query within a namespace mapping and emit metrics. Returns fatal errors if -// the scrape fails, and a slice of errors if they were non-fatal. -func queryNamespaceMapping(server *Server, namespace string, mapping MetricMapNamespace) ([]prometheus.Metric, []error, error) { - // Check for a query override for this namespace - query, found := server.queryOverrides[namespace] - - // Was this query disabled (i.e. nothing sensible can be queried on cu - // version of PostgreSQL? - if query == "" && found { - // Return success (no pertinent data) - return []prometheus.Metric{}, []error{}, nil - } - - // Don't fail on a bad scrape of one metric - var rows *sql.Rows - var err error - - if !found { - // I've no idea how to avoid this properly at the moment, but this is - // an admin tool so you're not injecting SQL right? - rows, err = server.db.Query(fmt.Sprintf("SELECT * FROM %s;", namespace)) // nolint: gas - } else { - rows, err = server.db.Query(query) + return semver.Version{}, "", fmt.Errorf("Error scanning version string on %q: %v", server, err) } + semanticVersion, err := parseVersion(versionString) if err != nil { - return []prometheus.Metric{}, []error{}, fmt.Errorf("Error running query on database %q: %s %v", server, namespace, err) - } - defer rows.Close() // nolint: errcheck - - var columnNames []string - columnNames, err = rows.Columns() - if err != nil { - return []prometheus.Metric{}, []error{}, errors.New(fmt.Sprintln("Error retrieving column list for: ", namespace, err)) + return semver.Version{}, "", fmt.Errorf("Error parsing version string on %q: %v", server, err) } - // Make a lookup map for the column indices - var columnIdx = make(map[string]int, len(columnNames)) - for i, n := range columnNames { - columnIdx[n] = i - } - - var columnData = make([]interface{}, len(columnNames)) - var scanArgs = make([]interface{}, len(columnNames)) - for i := range columnData { - scanArgs[i] = &columnData[i] - } - - nonfatalErrors := []error{} - - metrics := make([]prometheus.Metric, 0) - - for rows.Next() { - err = rows.Scan(scanArgs...) 
- if err != nil { - return []prometheus.Metric{}, []error{}, errors.New(fmt.Sprintln("Error retrieving rows:", namespace, err)) - } - - // Get the label values for this row. - labels := make([]string, len(mapping.labels)) - for idx, label := range mapping.labels { - labels[idx], _ = dbToString(columnData[columnIdx[label]]) - } - - // Loop over column names, and match to scan data. Unknown columns - // will be filled with an untyped metric number *if* they can be - // converted to float64s. NULLs are allowed and treated as NaN. - for idx, columnName := range columnNames { - var metric prometheus.Metric - if metricMapping, ok := mapping.columnMappings[columnName]; ok { - // Is this a metricy metric? - if metricMapping.discard { - continue - } - - if metricMapping.histogram { - var keys []float64 - err = pq.Array(&keys).Scan(columnData[idx]) - if err != nil { - return []prometheus.Metric{}, []error{}, errors.New(fmt.Sprintln("Error retrieving", columnName, "buckets:", namespace, err)) - } - - var values []int64 - valuesIdx, ok := columnIdx[columnName+"_bucket"] - if !ok { - nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Missing column: ", namespace, columnName+"_bucket"))) - continue - } - err = pq.Array(&values).Scan(columnData[valuesIdx]) - if err != nil { - return []prometheus.Metric{}, []error{}, errors.New(fmt.Sprintln("Error retrieving", columnName, "bucket values:", namespace, err)) - } - - buckets := make(map[float64]uint64, len(keys)) - for i, key := range keys { - if i >= len(values) { - break - } - buckets[key] = uint64(values[i]) - } - - idx, ok = columnIdx[columnName+"_sum"] - if !ok { - nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Missing column: ", namespace, columnName+"_sum"))) - continue - } - sum, ok := dbToFloat64(columnData[idx]) - if !ok { - nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Unexpected error parsing column: ", namespace, columnName+"_sum", columnData[idx]))) - continue - } - - idx, ok = columnIdx[columnName+"_count"] - if !ok { - nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Missing column: ", namespace, columnName+"_count"))) - continue - } - count, ok := dbToUint64(columnData[idx]) - if !ok { - nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Unexpected error parsing column: ", namespace, columnName+"_count", columnData[idx]))) - continue - } - - metric = prometheus.MustNewConstHistogram( - metricMapping.desc, - count, sum, buckets, - labels..., - ) - } else { - value, ok := dbToFloat64(columnData[idx]) - if !ok { - nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Unexpected error parsing column: ", namespace, columnName, columnData[idx]))) - continue - } - // Generate the metric - metric = prometheus.MustNewConstMetric(metricMapping.desc, metricMapping.vtype, value, labels...) - } - } else { - // Unknown metric. Report as untyped if scan to float64 works, else note an error too. - metricLabel := fmt.Sprintf("%s_%s", namespace, columnName) - desc := prometheus.NewDesc(metricLabel, fmt.Sprintf("Unknown metric from %s", namespace), mapping.labels, server.labels) - - // Its not an error to fail here, since the values are - // unexpected anyway. - value, ok := dbToFloat64(columnData[idx]) - if !ok { - nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Unparseable column type - discarding: ", namespace, columnName, err))) - continue - } - metric = prometheus.MustNewConstMetric(desc, prometheus.UntypedValue, value, labels...) 
- } - metrics = append(metrics, metric) - } - } - return metrics, nonfatalErrors, nil -} - -// Iterate through all the namespace mappings in the exporter and run their -// queries. -func queryNamespaceMappings(ch chan<- prometheus.Metric, server *Server) map[string]error { - // Return a map of namespace -> errors - namespaceErrors := make(map[string]error) - - scrapeStart := time.Now() - - for namespace, mapping := range server.metricMap { - level.Debug(logger).Log("msg", "Querying namespace", "namespace", namespace) - - if mapping.master && !server.master { - level.Debug(logger).Log("msg", "Query skipped...") - continue - } - - // check if the query is to be run on specific database server version range or not - if len(server.runonserver) > 0 { - serVersion, _ := semver.Parse(server.lastMapVersion.String()) - runServerRange, _ := semver.ParseRange(server.runonserver) - if !runServerRange(serVersion) { - level.Debug(logger).Log("msg", "Query skipped for this database version", "version", server.lastMapVersion.String(), "target_version", server.runonserver) - continue - } - } - - scrapeMetric := false - // Check if the metric is cached - server.cacheMtx.Lock() - cachedMetric, found := server.metricCache[namespace] - server.cacheMtx.Unlock() - // If found, check if needs refresh from cache - if found { - if scrapeStart.Sub(cachedMetric.lastScrape).Seconds() > float64(mapping.cacheSeconds) { - scrapeMetric = true - } - } else { - scrapeMetric = true - } - - var metrics []prometheus.Metric - var nonFatalErrors []error - var err error - if scrapeMetric { - metrics, nonFatalErrors, err = queryNamespaceMapping(server, namespace, mapping) - } else { - metrics = cachedMetric.metrics - } - - // Serious error - a namespace disappeared - if err != nil { - namespaceErrors[namespace] = err - level.Info(logger).Log("err", err) - } - // Non-serious errors - likely version or parsing problems. - if len(nonFatalErrors) > 0 { - for _, err := range nonFatalErrors { - level.Info(logger).Log("err", err) - } - } - - // Emit the metrics into the channel - for _, metric := range metrics { - ch <- metric - } - - if scrapeMetric { - // Only cache if metric is meaningfully cacheable - if mapping.cacheSeconds > 0 { - server.cacheMtx.Lock() - server.metricCache[namespace] = cachedMetrics{ - metrics: metrics, - lastScrape: scrapeStart, - } - server.cacheMtx.Unlock() - } - } - } - - return namespaceErrors + return semanticVersion, versionString, nil } // Check and update the exporters query maps if the version has changed. 
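+// A failure here is non-fatal: scrapeDSN logs a warning and proceeds with the existing maps.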
func (e *Exporter) checkMapVersions(ch chan<- prometheus.Metric, server *Server) error { - level.Debug(logger).Log("msg", "Querying PostgreSQL version", "server", server) - versionRow := server.db.QueryRow("SELECT version();") - var versionString string - err := versionRow.Scan(&versionString) + semanticVersion, versionString, err := checkPostgresVersion(server.db, server.String()) if err != nil { - return fmt.Errorf("Error scanning version string on %q: %v", server, err) - } - semanticVersion, err := parseVersion(versionString) - if err != nil { - return fmt.Errorf("Error parsing version string on %q: %v", server, err) + return fmt.Errorf("Error fetching version string on %q: %v", server, err) } + if !e.disableDefaultMetrics && semanticVersion.LT(lowestSupportedVersion) { level.Warn(logger).Log("msg", "PostgreSQL version is lower than our lowest supported version", "server", server, "version", semanticVersion, "lowest_supported_version", lowestSupportedVersion) } @@ -1650,222 +750,3 @@ func (e *Exporter) scrape(ch chan<- prometheus.Metric) { e.error.Set(1) } } - -func (e *Exporter) discoverDatabaseDSNs() []string { - // connstring syntax is complex (and not sure if even regular). - // we don't need to parse it, so just superficially validate that it starts - // with a valid-ish keyword pair - connstringRe := regexp.MustCompile(`^ *[a-zA-Z0-9]+ *= *[^= ]+`) - - dsns := make(map[string]struct{}) - for _, dsn := range e.dsn { - var dsnURI *url.URL - var dsnConnstring string - - if strings.HasPrefix(dsn, "postgresql://") { - var err error - dsnURI, err = url.Parse(dsn) - if err != nil { - level.Error(logger).Log("msg", "Unable to parse DSN as URI", "dsn", loggableDSN(dsn), "err", err) - continue - } - } else if connstringRe.MatchString(dsn) { - dsnConnstring = dsn - } else { - level.Error(logger).Log("msg", "Unable to parse DSN as either URI or connstring", "dsn", loggableDSN(dsn)) - continue - } - - server, err := e.servers.GetServer(dsn) - if err != nil { - level.Error(logger).Log("msg", "Error opening connection to database", "dsn", loggableDSN(dsn), "err", err) - continue - } - dsns[dsn] = struct{}{} - - // If autoDiscoverDatabases is true, set first dsn as master database (Default: false) - server.master = true - - databaseNames, err := queryDatabases(server) - if err != nil { - level.Error(logger).Log("msg", "Error querying databases", "dsn", loggableDSN(dsn), "err", err) - continue - } - for _, databaseName := range databaseNames { - if contains(e.excludeDatabases, databaseName) { - continue - } - - if len(e.includeDatabases) != 0 && !contains(e.includeDatabases, databaseName) { - continue - } - - if dsnURI != nil { - dsnURI.Path = databaseName - dsn = dsnURI.String() - } else { - // replacing one dbname with another is complicated. - // just append new dbname to override. 
-				dsn = fmt.Sprintf("%s dbname=%s", dsnConnstring, databaseName)
-			}
-			dsns[dsn] = struct{}{}
-		}
-	}
-
-	result := make([]string, len(dsns))
-	index := 0
-	for dsn := range dsns {
-		result[index] = dsn
-		index++
-	}
-
-	return result
-}
-
-func (e *Exporter) scrapeDSN(ch chan<- prometheus.Metric, dsn string) error {
-	server, err := e.servers.GetServer(dsn)
-
-	if err != nil {
-		return &ErrorConnectToServer{fmt.Sprintf("Error opening connection to database (%s): %s", loggableDSN(dsn), err.Error())}
-	}
-
-	// Check if autoDiscoverDatabases is false, set dsn as master database (Default: false)
-	if !e.autoDiscoverDatabases {
-		server.master = true
-	}
-
-	// Check if map versions need to be updated
-	if err := e.checkMapVersions(ch, server); err != nil {
-		level.Warn(logger).Log("msg", "Proceeding with outdated query maps, as the Postgres version could not be determined", "err", err)
-	}
-
-	return server.Scrape(ch, e.disableSettingsMetrics)
-}
-
-// try to get the DataSource
-// DATA_SOURCE_NAME always wins so we do not break older versions
-// reading secrets from files wins over secrets in environment variables
-// DATA_SOURCE_NAME > DATA_SOURCE_{USER|PASS}_FILE > DATA_SOURCE_{USER|PASS}
-func getDataSources() ([]string, error) {
-	var dsn = os.Getenv("DATA_SOURCE_NAME")
-	if len(dsn) != 0 {
-		return strings.Split(dsn, ","), nil
-	}
-
-	var user, pass, uri string
-
-	dataSourceUserFile := os.Getenv("DATA_SOURCE_USER_FILE")
-	if len(dataSourceUserFile) != 0 {
-		fileContents, err := ioutil.ReadFile(dataSourceUserFile)
-		if err != nil {
-			return nil, fmt.Errorf("failed loading data source user file %s: %s", dataSourceUserFile, err.Error())
-		}
-		user = strings.TrimSpace(string(fileContents))
-	} else {
-		user = os.Getenv("DATA_SOURCE_USER")
-	}
-
-	dataSourcePassFile := os.Getenv("DATA_SOURCE_PASS_FILE")
-	if len(dataSourcePassFile) != 0 {
-		fileContents, err := ioutil.ReadFile(dataSourcePassFile)
-		if err != nil {
-			return nil, fmt.Errorf("failed loading data source pass file %s: %s", dataSourcePassFile, err.Error())
-		}
-		pass = strings.TrimSpace(string(fileContents))
-	} else {
-		pass = os.Getenv("DATA_SOURCE_PASS")
-	}
-
-	ui := url.UserPassword(user, pass).String()
-	dataSrouceURIFile := os.Getenv("DATA_SOURCE_URI_FILE")
-	if len(dataSrouceURIFile) != 0 {
-		fileContents, err := ioutil.ReadFile(dataSrouceURIFile)
-		if err != nil {
-			return nil, fmt.Errorf("failed loading data source URI file %s: %s", dataSrouceURIFile, err.Error())
-		}
-		uri = strings.TrimSpace(string(fileContents))
-	} else {
-		uri = os.Getenv("DATA_SOURCE_URI")
-	}
-
-	dsn = "postgresql://" + ui + "@" + uri
-
-	return []string{dsn}, nil
-}
-
-func contains(a []string, x string) bool {
-	for _, n := range a {
-		if x == n {
-			return true
-		}
-	}
-	return false
-}
-
-func main() {
-	kingpin.Version(version.Print(exporterName))
-	promlogConfig := &promlog.Config{}
-	flag.AddFlags(kingpin.CommandLine, promlogConfig)
-	kingpin.HelpFlag.Short('h')
-	kingpin.Parse()
-	logger = promlog.New(promlogConfig)
-
-	// landingPage contains the HTML served at '/'.
-	// TODO: Make this nicer and more informative.
-	var landingPage = []byte(`<html>
-	<head><title>Postgres exporter</title></head>
-	<body>
-	<h1>Postgres exporter</h1>
-	<p><a href='` + *metricPath + `'>Metrics</a></p>
-	</body>
-	</html>
-	`)
-
-	if *onlyDumpMaps {
-		dumpMaps()
-		return
-	}
-
-	dsn, err := getDataSources()
-	if err != nil {
-		level.Error(logger).Log("msg", "Failed reading data sources", "err", err.Error())
-		os.Exit(1)
-	}
-
-	if len(dsn) == 0 {
-		level.Error(logger).Log("msg", "Couldn't find environment variables describing the datasource to use")
-		os.Exit(1)
-	}
-
-	opts := []ExporterOpt{
-		DisableDefaultMetrics(*disableDefaultMetrics),
-		DisableSettingsMetrics(*disableSettingsMetrics),
-		AutoDiscoverDatabases(*autoDiscoverDatabases),
-		WithUserQueriesPath(*queriesPath),
-		WithConstantLabels(*constantLabelsList),
-		ExcludeDatabases(*excludeDatabases),
-		IncludeDatabases(*includeDatabases),
-	}
-
-	exporter := NewExporter(dsn, opts...)
-	defer func() {
-		exporter.servers.Close()
-	}()
-
-	prometheus.MustRegister(version.NewCollector(exporterName))
-
-	prometheus.MustRegister(exporter)
-
-	http.Handle(*metricPath, promhttp.Handler())
-	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
-		w.Header().Set("Content-Type", "text/html; charset=UTF-8") // nolint: errcheck
-		w.Write(landingPage) // nolint: errcheck
-	})
-
-	level.Info(logger).Log("msg", "Listening on address", "address", *listenAddress)
-	srv := &http.Server{Addr: *listenAddress}
-	if err := web.ListenAndServe(srv, *webConfig, logger); err != nil {
-		level.Error(logger).Log("msg", "Error running HTTP server", "err", err)
-		os.Exit(1)
-	}
-}
diff --git a/cmd/postgres_exporter/queries.go b/cmd/postgres_exporter/queries.go
new file mode 100644
index 00000000..903e1a27
--- /dev/null
+++ b/cmd/postgres_exporter/queries.go
@@ -0,0 +1,303 @@
+// Copyright 2021 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"errors"
+	"fmt"
+
+	"github.com/blang/semver"
+	"github.com/go-kit/kit/log/level"
+	"gopkg.in/yaml.v2"
+)
+
+// UserQuery represents a user defined query
+type UserQuery struct {
+	Query        string    `yaml:"query"`
+	Metrics      []Mapping `yaml:"metrics"`
+	Master       bool      `yaml:"master"`        // Run the query only against the master database
+	CacheSeconds uint64    `yaml:"cache_seconds"` // Number of seconds to cache the namespace result metrics for.
+	RunOnServer  string    `yaml:"runonserver"`   // Server version range this query runs on
+}
+
+// UserQueries represents a set of UserQuery objects
+type UserQueries map[string]UserQuery
+
+// OverrideQuery's are run in place of simple namespace lookups, and provide
+// advanced functionality. But they have a tendency to be postgres version
+// specific. There aren't too many versions, so we simply store customized
+// versions using the semver matching we do for columns.
+type OverrideQuery struct {
+	versionRange semver.Range
+	query        string
+}
+
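+// A note on matching, with a minimal sketch (illustrative values only, not
+// part of this patch): a semver.Range from blang/semver is a predicate over a
+// version, so exactly one OverrideQuery per namespace is expected to accept a
+// given server version.
+//
+//	wal := OverrideQuery{semver.MustParseRange(">=10.0.0"), "SELECT ... /* wal */"}
+//	xlog := OverrideQuery{semver.MustParseRange("<10.0.0"), "SELECT ... /* xlog */"}
+//	v := semver.MustParse("9.6.5")
+//	wal.versionRange(v)  // false
+//	xlog.versionRange(v) // true
+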
+// Overriding queries for namespaces above.
+// TODO: validate this is a closed set in tests, and there are no overlaps
+var queryOverrides = map[string][]OverrideQuery{
+	"pg_locks": {
+		{
+			semver.MustParseRange(">0.0.0"),
+			`SELECT pg_database.datname,tmp.mode,COALESCE(count,0) as count
+			FROM
+				(
+				  VALUES ('accesssharelock'),
+				         ('rowsharelock'),
+				         ('rowexclusivelock'),
+				         ('shareupdateexclusivelock'),
+				         ('sharelock'),
+				         ('sharerowexclusivelock'),
+				         ('exclusivelock'),
+				         ('accessexclusivelock'),
+				         ('sireadlock')
+				) AS tmp(mode) CROSS JOIN pg_database
+			LEFT JOIN
+				(SELECT database, lower(mode) AS mode,count(*) AS count
+				FROM pg_locks WHERE database IS NOT NULL
+				GROUP BY database, lower(mode)
+				) AS tmp2
+			ON tmp.mode=tmp2.mode and pg_database.oid = tmp2.database ORDER BY 1`,
+		},
+	},
+
+	"pg_stat_replication": {
+		{
+			semver.MustParseRange(">=10.0.0"),
+			`
+			SELECT *,
+				(case pg_is_in_recovery() when 't' then null else pg_current_wal_lsn() end) AS pg_current_wal_lsn,
+				(case pg_is_in_recovery() when 't' then null else pg_wal_lsn_diff(pg_current_wal_lsn(), pg_lsn('0/0'))::float end) AS pg_current_wal_lsn_bytes,
+				(case pg_is_in_recovery() when 't' then null else pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn)::float end) AS pg_wal_lsn_diff
+			FROM pg_stat_replication
+			`,
+		},
+		{
+			semver.MustParseRange(">=9.2.0 <10.0.0"),
+			`
+			SELECT *,
+				(case pg_is_in_recovery() when 't' then null else pg_current_xlog_location() end) AS pg_current_xlog_location,
+				(case pg_is_in_recovery() when 't' then null else pg_xlog_location_diff(pg_current_xlog_location(), replay_location)::float end) AS pg_xlog_location_diff
+			FROM pg_stat_replication
+			`,
+		},
+		{
+			semver.MustParseRange("<9.2.0"),
+			`
+			SELECT *,
+				(case pg_is_in_recovery() when 't' then null else pg_current_xlog_location() end) AS pg_current_xlog_location
+			FROM pg_stat_replication
+			`,
+		},
+	},
+
+	"pg_replication_slots": {
+		{
+			semver.MustParseRange(">=9.4.0 <10.0.0"),
+			`
+			SELECT slot_name, database, active, pg_xlog_location_diff(pg_current_xlog_location(), restart_lsn)
+			FROM pg_replication_slots
+			`,
+		},
+		{
+			semver.MustParseRange(">=10.0.0"),
+			`
+			SELECT slot_name, database, active, pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)
+			FROM pg_replication_slots
+			`,
+		},
+	},
+
+	"pg_stat_archiver": {
+		{
+			semver.MustParseRange(">=0.0.0"),
+			`
+			SELECT *,
+				extract(epoch from now() - last_archived_time) AS last_archive_age
+			FROM pg_stat_archiver
+			`,
+		},
+	},
+
+	"pg_stat_activity": {
+		// This query only works on PostgreSQL 9.2 and newer, where the state
+		// column exists; older versions fall through to the query below.
+		{
+			semver.MustParseRange(">=9.2.0"),
+			`
+			SELECT
+				pg_database.datname,
+				tmp.state,
+				COALESCE(count,0) as count,
+				COALESCE(max_tx_duration,0) as max_tx_duration
+			FROM
+				(
+				  VALUES ('active'),
+				         ('idle'),
+				         ('idle in transaction'),
+				         ('idle in transaction (aborted)'),
+				         ('fastpath function call'),
+				         ('disabled')
+				) AS tmp(state) CROSS JOIN pg_database
+			LEFT JOIN
+				(
+				SELECT
+					datname,
+					state,
+					count(*) AS count,
+					MAX(EXTRACT(EPOCH FROM now() - xact_start))::float AS max_tx_duration
+				FROM pg_stat_activity GROUP BY datname,state) AS tmp2
+			ON tmp.state = tmp2.state AND pg_database.datname = tmp2.datname
+			`,
+		},
+		{
+			semver.MustParseRange("<9.2.0"),
+			`
+			SELECT
+				datname,
+				'unknown' AS state,
+				COALESCE(count(*),0) AS count,
+				COALESCE(MAX(EXTRACT(EPOCH FROM now() - xact_start))::float,0) AS max_tx_duration
+			FROM pg_stat_activity GROUP BY datname
+			`,
+		},
+	},
+}
+
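+// For illustration (hypothetical, abridged): resolving the overrides above
+// for a 9.6.5 server picks the ">=9.2.0 <10.0.0" variants, so the resulting
+// map holds the xlog-based queries:
+//
+//	m := makeQueryOverrideMap(semver.MustParse("9.6.5"), queryOverrides)
+//	_ = m["pg_stat_replication"] // the pg_current_xlog_location() variant
+//	_ = m["pg_locks"]            // matches ">0.0.0", so always present
+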
+// Convert the query override definitions to the version-specific query
+// override map used by the exporter.
+func makeQueryOverrideMap(pgVersion semver.Version, queryOverrides map[string][]OverrideQuery) map[string]string {
+	resultMap := make(map[string]string)
+	for name, overrideDef := range queryOverrides {
+		// Find a matching semver. We make it an error to have overlapping
+		// ranges at test-time, so only 1 should ever match.
+		matched := false
+		for _, queryDef := range overrideDef {
+			if queryDef.versionRange(pgVersion) {
+				resultMap[name] = queryDef.query
+				matched = true
+				break
+			}
+		}
+		if !matched {
+			level.Warn(logger).Log("msg", "No query matched override, disabling metric space", "name", name)
+			resultMap[name] = ""
+		}
+	}
+
+	return resultMap
+}
+
+func parseUserQueries(content []byte) (map[string]intermediateMetricMap, map[string]string, error) {
+	var userQueries UserQueries
+
+	err := yaml.Unmarshal(content, &userQueries)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	// Stores the loaded map representation
+	metricMaps := make(map[string]intermediateMetricMap)
+	newQueryOverrides := make(map[string]string)
+
+	for metric, specs := range userQueries {
+		level.Debug(logger).Log("msg", "New user metric namespace from YAML metric", "metric", metric, "cache_seconds", specs.CacheSeconds)
+		newQueryOverrides[metric] = specs.Query
+		metricMap, ok := metricMaps[metric]
+		if !ok {
+			// Namespace for metric not found - add it.
+			newMetricMap := make(map[string]ColumnMapping)
+			metricMap = intermediateMetricMap{
+				columnMappings: newMetricMap,
+				master:         specs.Master,
+				cacheSeconds:   specs.CacheSeconds,
+			}
+			metricMaps[metric] = metricMap
+		}
+		for _, metric := range specs.Metrics {
+			for name, mappingOption := range metric {
+				var columnMapping ColumnMapping
+				tmpUsage, _ := stringToColumnUsage(mappingOption.Usage)
+				columnMapping.usage = tmpUsage
+				columnMapping.description = mappingOption.Description
+
+				// TODO: we should support cu
+				columnMapping.mapping = nil
+				// Should we support this for users?
+				columnMapping.supportedVersions = nil
+
+				metricMap.columnMappings[name] = columnMapping
+			}
+		}
+	}
+	return metricMaps, newQueryOverrides, nil
+}
+
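+// For illustration only (a hypothetical user file, not shipped with this
+// patch), parseUserQueries accepts YAML shaped like:
+//
+//	pg_postmaster:
+//	  query: "SELECT pg_postmaster_start_time as start_time_seconds from pg_postmaster_start_time()"
+//	  master: true
+//	  metrics:
+//	    - start_time_seconds:
+//	        usage: "GAUGE"
+//	        description: "Time at which postmaster started"
+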
+// Add queries to the builtinMetricMaps and queryOverrides maps. Added queries do not
+// respect version requirements, because it is assumed that the user knows
+// what they are doing with their version of postgres.
+//
+// This function modifies metricMap and queryOverrideMap to contain the new
+// queries.
+// TODO: test code for all cu.
+// TODO: the YAML this supports is "non-standard" - we should move away from it.
+func addQueries(content []byte, pgVersion semver.Version, server *Server) error {
+	metricMaps, newQueryOverrides, err := parseUserQueries(content)
+	if err != nil {
+		return err
+	}
+	// Convert the loaded metric map into exporter representation
+	partialExporterMap := makeDescMap(pgVersion, server.labels, metricMaps)
+
+	// Merge the two maps (which are now quite flattened)
+	for k, v := range partialExporterMap {
+		_, found := server.metricMap[k]
+		if found {
+			level.Debug(logger).Log("msg", "Overriding metric from user YAML file", "metric", k)
+		} else {
+			level.Debug(logger).Log("msg", "Adding new metric from user YAML file", "metric", k)
+		}
+		server.metricMap[k] = v
+	}
+
+	// Merge the query override map
+	for k, v := range newQueryOverrides {
+		_, found := server.queryOverrides[k]
+		if found {
+			level.Debug(logger).Log("msg", "Overriding query override from user YAML file", "query_override", k)
+		} else {
+			level.Debug(logger).Log("msg", "Adding new query override from user YAML file", "query_override", k)
+		}
+		server.queryOverrides[k] = v
+	}
+	return nil
+}
+
+func queryDatabases(server *Server) ([]string, error) {
+	rows, err := server.db.Query("SELECT datname FROM pg_database WHERE datallowconn = true AND datistemplate = false AND datname != current_database()")
+	if err != nil {
+		return nil, fmt.Errorf("Error retrieving databases: %v", err)
+	}
+	defer rows.Close() // nolint: errcheck
+
+	var databaseName string
+	result := make([]string, 0)
+	for rows.Next() {
+		err = rows.Scan(&databaseName)
+		if err != nil {
+			return nil, errors.New(fmt.Sprintln("Error retrieving rows:", err))
+		}
+		result = append(result, databaseName)
+	}
+
+	return result, nil
+}
diff --git a/cmd/postgres_exporter/server.go b/cmd/postgres_exporter/server.go
new file mode 100644
index 00000000..cf2469cc
--- /dev/null
+++ b/cmd/postgres_exporter/server.go
@@ -0,0 +1,190 @@
+// Copyright 2021 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"database/sql"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/blang/semver"
+	"github.com/go-kit/kit/log/level"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+// Server describes a connection to Postgres.
+// It also holds the metric map and query overrides for that connection.
+type Server struct {
+	db          *sql.DB
+	labels      prometheus.Labels
+	master      bool
+	runonserver string
+
+	// Last version used to calculate metric map. If mismatch on scrape,
+	// then maps are recalculated.
+	lastMapVersion semver.Version
+	// Currently active metric map
+	metricMap map[string]MetricMapNamespace
+	// Currently active query overrides
+	queryOverrides map[string]string
+	mappingMtx     sync.RWMutex
+	// Currently cached metrics
+	metricCache map[string]cachedMetrics
+	cacheMtx    sync.Mutex
+}
+
+// ServerOpt configures a server.
+type ServerOpt func(*Server)
+
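+// A minimal usage sketch (hypothetical DSN and label, error handling elided):
+//
+//	s, err := NewServer("postgresql://postgres:password@localhost:5432/postgres",
+//		ServerWithLabels(prometheus.Labels{"cluster": "primary"}))
+//	if err == nil {
+//		defer s.Close()
+//	}
+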
+// ServerWithLabels configures a set of labels.
+func ServerWithLabels(labels prometheus.Labels) ServerOpt {
+	return func(s *Server) {
+		for k, v := range labels {
+			s.labels[k] = v
+		}
+	}
+}
+
+// NewServer establishes a new connection using DSN.
+func NewServer(dsn string, opts ...ServerOpt) (*Server, error) {
+	fingerprint, err := parseFingerprint(dsn)
+	if err != nil {
+		return nil, err
+	}
+
+	db, err := sql.Open("postgres", dsn)
+	if err != nil {
+		return nil, err
+	}
+	db.SetMaxOpenConns(1)
+	db.SetMaxIdleConns(1)
+
+	level.Info(logger).Log("msg", "Established new database connection", "fingerprint", fingerprint)
+
+	s := &Server{
+		db:     db,
+		master: false,
+		labels: prometheus.Labels{
+			serverLabelName: fingerprint,
+		},
+		metricCache: make(map[string]cachedMetrics),
+	}
+
+	for _, opt := range opts {
+		opt(s)
+	}
+
+	return s, nil
+}
+
+// Close disconnects from Postgres.
+func (s *Server) Close() error {
+	return s.db.Close()
+}
+
+// Ping checks connection availability and possibly invalidates the connection if it fails.
+func (s *Server) Ping() error {
+	if err := s.db.Ping(); err != nil {
+		if cerr := s.Close(); cerr != nil {
+			level.Error(logger).Log("msg", "Error while closing non-pinging DB connection", "server", s, "err", cerr)
+		}
+		return err
+	}
+	return nil
+}
+
+// String returns server's fingerprint.
+func (s *Server) String() string {
+	return s.labels[serverLabelName]
+}
+
+// Scrape loads metrics.
+func (s *Server) Scrape(ch chan<- prometheus.Metric, disableSettingsMetrics bool) error {
+	s.mappingMtx.RLock()
+	defer s.mappingMtx.RUnlock()
+
+	var err error
+
+	if !disableSettingsMetrics && s.master {
+		if err = querySettings(ch, s); err != nil {
+			err = fmt.Errorf("error retrieving settings: %s", err)
+		}
+	}
+
+	errMap := queryNamespaceMappings(ch, s)
+	if len(errMap) > 0 {
+		err = fmt.Errorf("queryNamespaceMappings returned %d errors", len(errMap))
+	}
+
+	return err
+}
+
+// Servers contains a collection of servers to Postgres.
+type Servers struct {
+	m       sync.Mutex
+	servers map[string]*Server
+	opts    []ServerOpt
+}
+
+// NewServers creates a collection of servers to Postgres.
+func NewServers(opts ...ServerOpt) *Servers {
+	return &Servers{
+		servers: make(map[string]*Server),
+		opts:    opts,
+	}
+}
+
+// GetServer returns an established connection from the collection.
+func (s *Servers) GetServer(dsn string) (*Server, error) {
+	s.m.Lock()
+	defer s.m.Unlock()
+	var err error
+	var ok bool
+	errCount := 0 // start at zero because we increment before doing work
+	retries := 1
+	var server *Server
+	for {
+		if errCount++; errCount > retries {
+			return nil, err
+		}
+		server, ok = s.servers[dsn]
+		if !ok {
+			server, err = NewServer(dsn, s.opts...)
+			if err != nil {
+				time.Sleep(time.Duration(errCount) * time.Second)
+				continue
+			}
+			s.servers[dsn] = server
+		}
+		if err = server.Ping(); err != nil {
+			delete(s.servers, dsn)
+			time.Sleep(time.Duration(errCount) * time.Second)
+			continue
+		}
+		break
+	}
+	return server, nil
+}
+
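+// A minimal usage sketch (hypothetical DSN): GetServer dials on first use,
+// caches the connection per DSN, and retries a failed dial or ping once
+// before returning the error.
+//
+//	servers := NewServers(ServerWithLabels(prometheus.Labels{"env": "dev"}))
+//	server, err := servers.GetServer("host=localhost user=postgres")
+//	_, _ = server, err
+//	defer servers.Close()
+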
+// Close disconnects from all known servers.
+func (s *Servers) Close() {
+	s.m.Lock()
+	defer s.m.Unlock()
+	for _, server := range s.servers {
+		if err := server.Close(); err != nil {
+			level.Error(logger).Log("msg", "Failed to close connection", "server", server, "err", err)
+		}
+	}
+}
diff --git a/cmd/postgres_exporter/util.go b/cmd/postgres_exporter/util.go
new file mode 100644
index 00000000..5ecb5ec7
--- /dev/null
+++ b/cmd/postgres_exporter/util.go
@@ -0,0 +1,218 @@
+// Copyright 2021 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"fmt"
+	"math"
+	"net/url"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/go-kit/kit/log/level"
+	"github.com/lib/pq"
+)
+
+func contains(a []string, x string) bool {
+	for _, n := range a {
+		if x == n {
+			return true
+		}
+	}
+	return false
+}
+
+// stringToColumnUsage converts a string to the corresponding ColumnUsage.
+func stringToColumnUsage(s string) (ColumnUsage, error) {
+	var u ColumnUsage
+	var err error
+	switch s {
+	case "DISCARD":
+		u = DISCARD
+
+	case "LABEL":
+		u = LABEL
+
+	case "COUNTER":
+		u = COUNTER
+
+	case "GAUGE":
+		u = GAUGE
+
+	case "HISTOGRAM":
+		u = HISTOGRAM
+
+	case "MAPPEDMETRIC":
+		u = MAPPEDMETRIC
+
+	case "DURATION":
+		u = DURATION
+
+	default:
+		err = fmt.Errorf("wrong ColumnUsage given: %s", s)
+	}
+
+	return u, err
+}
+
+// Convert database.sql types to float64s for Prometheus consumption. Null types are mapped to NaN. string and []byte
+// types are mapped as NaN and !ok
+func dbToFloat64(t interface{}) (float64, bool) {
+	switch v := t.(type) {
+	case int64:
+		return float64(v), true
+	case float64:
+		return v, true
+	case time.Time:
+		return float64(v.Unix()), true
+	case []byte:
+		// Try and convert to string and then parse to a float64
+		strV := string(v)
+		result, err := strconv.ParseFloat(strV, 64)
+		if err != nil {
+			level.Info(logger).Log("msg", "Could not parse []byte", "err", err)
+			return math.NaN(), false
+		}
+		return result, true
+	case string:
+		result, err := strconv.ParseFloat(v, 64)
+		if err != nil {
+			level.Info(logger).Log("msg", "Could not parse string", "err", err)
+			return math.NaN(), false
+		}
+		return result, true
+	case bool:
+		if v {
+			return 1.0, true
+		}
+		return 0.0, true
+	case nil:
+		return math.NaN(), true
+	default:
+		return math.NaN(), false
+	}
+}
+
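+// Expected conversions, for illustration (hypothetical inputs):
+//
+//	dbToFloat64(int64(42))     // 42.0, true
+//	dbToFloat64([]byte("1.5")) // 1.5, true
+//	dbToFloat64([]byte("abc")) // NaN, false (and logs a parse failure)
+//	dbToFloat64(nil)           // NaN, true
+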
+// Convert database.sql types to uint64 for Prometheus consumption. Null types are mapped to 0. string and []byte
+// types are mapped as 0 and !ok
+func dbToUint64(t interface{}) (uint64, bool) {
+	switch v := t.(type) {
+	case uint64:
+		return v, true
+	case int64:
+		return uint64(v), true
+	case float64:
+		return uint64(v), true
+	case time.Time:
+		return uint64(v.Unix()), true
+	case []byte:
+		// Try and convert to string and then parse to a uint64
+		strV := string(v)
+		result, err := strconv.ParseUint(strV, 10, 64)
+		if err != nil {
+			level.Info(logger).Log("msg", "Could not parse []byte", "err", err)
+			return 0, false
+		}
+		return result, true
+	case string:
+		result, err := strconv.ParseUint(v, 10, 64)
+		if err != nil {
+			level.Info(logger).Log("msg", "Could not parse string", "err", err)
+			return 0, false
+		}
+		return result, true
+	case bool:
+		if v {
+			return 1, true
+		}
+		return 0, true
+	case nil:
+		return 0, true
+	default:
+		return 0, false
+	}
+}
+
+// Convert database.sql to string for Prometheus labels. Null types are mapped to empty strings.
+func dbToString(t interface{}) (string, bool) {
+	switch v := t.(type) {
+	case int64:
+		return fmt.Sprintf("%v", v), true
+	case float64:
+		return fmt.Sprintf("%v", v), true
+	case time.Time:
+		return fmt.Sprintf("%v", v.Unix()), true
+	case nil:
+		return "", true
+	case []byte:
+		// Try and convert to string
+		return string(v), true
+	case string:
+		return v, true
+	case bool:
+		if v {
+			return "true", true
+		}
+		return "false", true
+	default:
+		return "", false
+	}
+}
+
+func parseFingerprint(url string) (string, error) {
+	dsn, err := pq.ParseURL(url)
+	if err != nil {
+		dsn = url
+	}
+
+	pairs := strings.Split(dsn, " ")
+	kv := make(map[string]string, len(pairs))
+	for _, pair := range pairs {
+		splitted := strings.SplitN(pair, "=", 2)
+		if len(splitted) != 2 {
+			return "", fmt.Errorf("malformed dsn %q", dsn)
+		}
+		kv[splitted[0]] = splitted[1]
+	}
+
+	var fingerprint string
+
+	if host, ok := kv["host"]; ok {
+		fingerprint += host
+	} else {
+		fingerprint += "localhost"
+	}
+
+	if port, ok := kv["port"]; ok {
+		fingerprint += ":" + port
+	} else {
+		fingerprint += ":5432"
+	}
+
+	return fingerprint, nil
+}
+
+func loggableDSN(dsn string) string {
+	pDSN, err := url.Parse(dsn)
+	if err != nil {
+		return "could not parse DATA_SOURCE_NAME"
+	}
+	// Blank user info if not nil
+	if pDSN.User != nil {
+		pDSN.User = url.UserPassword(pDSN.User.Username(), "PASSWORD_REMOVED")
+	}
+
+	return pDSN.String()
+}
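+// Expected results, for illustration (hypothetical DSNs):
+//
+//	parseFingerprint("postgresql://user:s3cr3t@db.example.com:6432/app") // "db.example.com:6432", nil
+//	parseFingerprint("host=db.example.com user=postgres")                // "db.example.com:5432", nil
+//	loggableDSN("postgresql://user:s3cr3t@db.example.com:6432/app")
+//	// "postgresql://user:PASSWORD_REMOVED@db.example.com:6432/app"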