// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
	"database/sql"
	"errors"
	"fmt"
	"time"

	"github.com/blang/semver/v4"
	"github.com/go-kit/log/level"
	"github.com/lib/pq"
	"github.com/prometheus/client_golang/prometheus"
)

// Query within a namespace mapping and emit metrics. Returns fatal errors if
// the scrape fails, and a slice of errors if they were non-fatal.
func queryNamespaceMapping(server *Server, namespace string, mapping MetricMapNamespace) ([]prometheus.Metric, []error, error) {
	// Check for a query override for this namespace
	query, found := server.queryOverrides[namespace]

	// Was this query disabled (i.e. nothing sensible can be queried on the current
	// version of PostgreSQL)?
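	// An empty override is how the exporter marks a namespace as disabled for the
	// connected server version (typically because no built-in or user-supplied
	// query override matched that version).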
	if query == "" && found {
		// Return success (no pertinent data)
		return []prometheus.Metric{}, []error{}, nil
	}

	// Don't fail on a bad scrape of one metric
	var rows *sql.Rows
	var err error

	if !found {
		// I've no idea how to avoid this properly at the moment, but this is
		// an admin tool so you're not injecting SQL right?
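		// With no override, the namespace name doubles as the relation to read,
		// e.g. namespace "pg_stat_database" becomes: SELECT * FROM pg_stat_database;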
		rows, err = server.db.Query(fmt.Sprintf("SELECT * FROM %s;", namespace)) // nolint: gas
	} else {
		rows, err = server.db.Query(query)
	}
	if err != nil {
		return []prometheus.Metric{}, []error{}, fmt.Errorf("Error running query on database %q: %s %v", server, namespace, err)
	}
	defer rows.Close() // nolint: errcheck

	var columnNames []string
	columnNames, err = rows.Columns()
	if err != nil {
		return []prometheus.Metric{}, []error{}, errors.New(fmt.Sprintln("Error retrieving column list for: ", namespace, err))
	}

	// Make a lookup map for the column indices
	var columnIdx = make(map[string]int, len(columnNames))
	for i, n := range columnNames {
		columnIdx[n] = i
	}

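	// rows.Scan needs one pointer per column, so scan every column into an
	// interface{} slot first and convert it to the right metric type afterwards.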
	var columnData = make([]interface{}, len(columnNames))
	var scanArgs = make([]interface{}, len(columnNames))
	for i := range columnData {
		scanArgs[i] = &columnData[i]
	}

	nonfatalErrors := []error{}

	metrics := make([]prometheus.Metric, 0)

	for rows.Next() {
		err = rows.Scan(scanArgs...)
		if err != nil {
			return []prometheus.Metric{}, []error{}, errors.New(fmt.Sprintln("Error retrieving rows:", namespace, err))
		}

		// Get the label values for this row.
		labels := make([]string, len(mapping.labels))
		for idx, label := range mapping.labels {
			labels[idx], _ = dbToString(columnData[columnIdx[label]])
		}

		// Loop over column names, and match to scan data. Unknown columns
		// will be filled with an untyped metric number *if* they can be
		// converted to float64s. NULLs are allowed and treated as NaN.
		for idx, columnName := range columnNames {
			var metric prometheus.Metric
			if metricMapping, ok := mapping.columnMappings[columnName]; ok {
				// Is this a metricy metric?
				if metricMapping.discard {
					continue
				}

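				// Histogram columns are assembled from companion columns the query is expected
				// to provide: for a column "foo", the bucket upper bounds come from "foo", the
				// cumulative bucket counts from "foo_bucket", and the totals from "foo_sum" and
				// "foo_count". A missing companion is reported as a non-fatal error.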
				if metricMapping.histogram {
					var keys []float64
					err = pq.Array(&keys).Scan(columnData[idx])
					if err != nil {
						return []prometheus.Metric{}, []error{}, errors.New(fmt.Sprintln("Error retrieving", columnName, "buckets:", namespace, err))
					}

					var values []int64
					valuesIdx, ok := columnIdx[columnName+"_bucket"]
					if !ok {
						nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Missing column: ", namespace, columnName+"_bucket")))
						continue
					}
					err = pq.Array(&values).Scan(columnData[valuesIdx])
					if err != nil {
						return []prometheus.Metric{}, []error{}, errors.New(fmt.Sprintln("Error retrieving", columnName, "bucket values:", namespace, err))
					}

					buckets := make(map[float64]uint64, len(keys))
					for i, key := range keys {
						if i >= len(values) {
							break
						}
						buckets[key] = uint64(values[i])
					}

					idx, ok = columnIdx[columnName+"_sum"]
					if !ok {
						nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Missing column: ", namespace, columnName+"_sum")))
						continue
					}
					sum, ok := dbToFloat64(columnData[idx])
					if !ok {
						nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Unexpected error parsing column: ", namespace, columnName+"_sum", columnData[idx])))
						continue
					}

					idx, ok = columnIdx[columnName+"_count"]
					if !ok {
						nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Missing column: ", namespace, columnName+"_count")))
						continue
					}
					count, ok := dbToUint64(columnData[idx])
					if !ok {
						nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Unexpected error parsing column: ", namespace, columnName+"_count", columnData[idx])))
						continue
					}

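					// prometheus.MustNewConstHistogram takes the buckets as a map of upper
					// bound -> cumulative count, which is how they were assembled above.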
					metric = prometheus.MustNewConstHistogram(
						metricMapping.desc,
						count, sum, buckets,
						labels...,
					)
				} else {
					value, ok := dbToFloat64(columnData[idx])
					if !ok {
						nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Unexpected error parsing column: ", namespace, columnName, columnData[idx])))
						continue
					}
					// Generate the metric
					metric = prometheus.MustNewConstMetric(metricMapping.desc, metricMapping.vtype, value, labels...)
				}
			} else {
				// Unknown metric. Report as untyped if scan to float64 works, else note an error too.
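				// The generated name is <namespace>_<column>; a hypothetical extra column
				// "foo" on pg_stat_database would surface as pg_stat_database_foo.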
				metricLabel := fmt.Sprintf("%s_%s", namespace, columnName)
				desc := prometheus.NewDesc(metricLabel, fmt.Sprintf("Unknown metric from %s", namespace), mapping.labels, server.labels)

				// It's not an error to fail here, since the values are
				// unexpected anyway.
				value, ok := dbToFloat64(columnData[idx])
				if !ok {
					nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Unparseable column type - discarding: ", namespace, columnName, err)))
					continue
				}
				metric = prometheus.MustNewConstMetric(desc, prometheus.UntypedValue, value, labels...)
			}
			metrics = append(metrics, metric)
		}
	}
	return metrics, nonfatalErrors, nil
}

// Iterate through all the namespace mappings in the exporter and run their
// queries.
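// The returned map contains, per namespace, the fatal error (if any) encountered
// while scraping it; non-fatal errors are only logged.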
func queryNamespaceMappings(ch chan<- prometheus.Metric, server *Server) map[string]error {
	// Return a map of namespace -> errors
	namespaceErrors := make(map[string]error)

	scrapeStart := time.Now()

	for namespace, mapping := range server.metricMap {
		level.Debug(logger).Log("msg", "Querying namespace", "namespace", namespace)

		if mapping.master && !server.master {
			level.Debug(logger).Log("msg", "Query skipped...")
			continue
		}

		// Check whether this query is restricted to a specific database server version range
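		// runonserver holds a semver range string (for example ">=10.0.0") taken from
		// the query definition; the query is skipped unless the server version matches.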
		if len(server.runonserver) > 0 {
			serVersion, _ := semver.Parse(server.lastMapVersion.String())
			runServerRange, _ := semver.ParseRange(server.runonserver)
			if !runServerRange(serVersion) {
				level.Debug(logger).Log("msg", "Query skipped for this database version", "version", server.lastMapVersion.String(), "target_version", server.runonserver)
				continue
			}
		}

		scrapeMetric := false
		// Check if the metric is cached
		server.cacheMtx.Lock()
		cachedMetric, found := server.metricCache[namespace]
		server.cacheMtx.Unlock()
		// If found, check whether the cached result needs to be refreshed
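		// A cached result older than the mapping's cacheSeconds is re-scraped;
		// anything fresher is served straight from the cache.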
		if found {
			if scrapeStart.Sub(cachedMetric.lastScrape).Seconds() > float64(mapping.cacheSeconds) {
				scrapeMetric = true
			}
		} else {
			scrapeMetric = true
		}

		var metrics []prometheus.Metric
		var nonFatalErrors []error
		var err error
		if scrapeMetric {
			metrics, nonFatalErrors, err = queryNamespaceMapping(server, namespace, mapping)
		} else {
			metrics = cachedMetric.metrics
		}

		// Serious error - a namespace disappeared
		if err != nil {
			namespaceErrors[namespace] = err
			level.Info(logger).Log("err", err)
		}
		// Non-serious errors - likely version or parsing problems.
		if len(nonFatalErrors) > 0 {
			for _, err := range nonFatalErrors {
				level.Info(logger).Log("err", err)
			}
		}

		// Emit the metrics into the channel
		for _, metric := range metrics {
			ch <- metric
		}

		if scrapeMetric {
			// Only cache if metric is meaningfully cacheable
			if mapping.cacheSeconds > 0 {
				server.cacheMtx.Lock()
				server.metricCache[namespace] = cachedMetrics{
					metrics:    metrics,
					lastScrape: scrapeStart,
				}
				server.cacheMtx.Unlock()
			}
		}
	}

	return namespaceErrors
}