Mirror of https://github.com/prometheus-community/postgres_exporter, synced 2025-04-26 21:18:02 +00:00
PMM-12893 Use rolling strategy for connection utilization
parent 8210082259
commit ae81fb7c19
@@ -49,19 +49,8 @@ func (e *Exporter) discoverDatabaseDSNs() []string {
				continue
			}

			server, err := e.servers.GetServer(dsn)
			databaseNames, err := e.getDatabaseNames(dsn)
			if err != nil {
				level.Error(logger).Log("msg", "Error opening connection to database", "dsn", loggableDSN(dsn), "err", err)
				continue
			}
			dsns[dsn] = struct{}{}

			// If autoDiscoverDatabases is true, set first dsn as master database (Default: false)
			server.master = true

			databaseNames, err := queryDatabases(server)
			if err != nil {
				level.Error(logger).Log("msg", "Error querying databases", "dsn", loggableDSN(dsn), "err", err)
				continue
			}
			for _, databaseName := range databaseNames {
@@ -99,11 +88,40 @@ func (e *Exporter) discoverDatabaseDSNs() []string {
	return result
}

func (e *Exporter) getDatabaseNames(dsn string) ([]string, error) {
	if e.connSema != nil {
		if err := e.connSema.Acquire(e.ctx, 1); err != nil {
			level.Warn(logger).Log("msg", "Failed to acquire semaphore", "err", err)
			return nil, err
		}
		defer e.connSema.Release(1)
	}

	server, err := e.GetServer(dsn)
	if err != nil {
		level.Error(logger).Log("msg", "Error opening connection to database", "dsn", loggableDSN(dsn), "err", err)
		return nil, err // TODO
	}
	defer server.Close()

	// If autoDiscoverDatabases is true, set first dsn as master database (Default: false)
	server.master = true

	dbNames, err := queryDatabases(e.ctx, server)
	if err != nil {
		level.Error(logger).Log("msg", "Error querying databases", "dsn", loggableDSN(dsn), "err", err)
		return nil, err
	}

	return dbNames, nil
}

func (e *Exporter) scrapeDSN(ch chan<- prometheus.Metric, dsn string) error {
	server, err := e.servers.GetServer(dsn)
	server, err := e.GetServer(dsn)
	if err != nil {
		return err // TODO
	}
	defer server.Close()

	level.Debug(logger).Log("msg", "scrapeDSN:"+dsn)

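getDatabaseNames above wraps every database connection in a connSema.Acquire / Release pair, so auto-discovery cannot open more connections than the semaphore allows. A minimal, self-contained sketch of that guard, assuming a hypothetical listDatabases helper and the lib/pq driver in place of the exporter's Server type:

package main

import (
	"context"
	"database/sql"
	"log"

	_ "github.com/lib/pq" // assumption: any database/sql Postgres driver would do
	"golang.org/x/sync/semaphore"
)

// listDatabases is a hypothetical stand-in for getDatabaseNames: take a slot
// from the shared semaphore, open a connection, run one query, release the slot.
func listDatabases(ctx context.Context, sem *semaphore.Weighted, dsn string) ([]string, error) {
	if sem != nil {
		if err := sem.Acquire(ctx, 1); err != nil {
			return nil, err
		}
		defer sem.Release(1)
	}

	db, err := sql.Open("postgres", dsn)
	if err != nil {
		return nil, err
	}
	defer db.Close()

	rows, err := db.QueryContext(ctx, "SELECT datname FROM pg_database WHERE datallowconn")
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var names []string
	for rows.Next() {
		var name string
		if err := rows.Scan(&name); err != nil {
			return nil, err
		}
		names = append(names, name)
	}
	return names, rows.Err()
}

func main() {
	sem := semaphore.NewWeighted(5) // same budget the exporter's main() uses
	names, err := listDatabases(context.Background(), sem, "postgres://localhost/postgres?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	log.Println(names)
}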
@@ -23,16 +23,15 @@ import (
	"github.com/alecthomas/kingpin/v2"
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/prometheus-community/postgres_exporter/collector"
	"github.com/prometheus-community/postgres_exporter/config"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/collectors"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/prometheus/common/promlog"
	"github.com/prometheus/common/promlog/flag"
	"github.com/prometheus/common/version"
	"github.com/prometheus/exporter-toolkit/web"
	"github.com/prometheus/exporter-toolkit/web/kingpinflag"
	"golang.org/x/sync/semaphore"
)

var (
@@ -114,61 +113,18 @@ func main() {
		level.Warn(logger).Log("msg", "Constant labels on all metrics is DEPRECATED")
	}

	opts := []ExporterOpt{
		DisableDefaultMetrics(*disableDefaultMetrics),
		DisableSettingsMetrics(*disableSettingsMetrics),
		AutoDiscoverDatabases(*autoDiscoverDatabases),
		WithConstantLabels(*constantLabelsList),
		ExcludeDatabases(excludedDatabases),
		IncludeDatabases(*includeDatabases),
	}

	exporter := NewExporter(dsns, opts...)
	defer func() {
		exporter.servers.Close()
	}()

	versionCollector := version.NewCollector(exporterName)
	prometheus.MustRegister(versionCollector)

	prometheus.MustRegister(exporter)

	// TODO(@sysadmind): Remove this with multi-target support. We are removing multiple DSN support
	dsn := ""
	if len(dsns) > 0 {
		dsn = dsns[0]
	}

	cleanup, hr, mr, lr := initializePerconaExporters(dsns)
	defer cleanup()

	pe, err := collector.NewPostgresCollector(
		logger,
		excludedDatabases,
		dsn,
		[]string{},
	)
	if err != nil {
		level.Warn(logger).Log("msg", "Failed to create PostgresCollector", "err", err.Error())
	} else {
		prometheus.MustRegister(pe)
	}

	psCollector := collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})
	goCollector := collectors.NewGoCollector()

	promHandler := newHandler(map[string]prometheus.Collector{
		"exporter": exporter,
		"custom_query.hr": hr,
		"custom_query.mr": mr,
		"custom_query.lr": lr,
	globalCollectors := map[string]prometheus.Collector{
		"standard.process": psCollector,
		"standard.go": goCollector,
		"version": versionCollector,
		"postgres": pe,
	})
	}

	http.Handle(*metricsPath, promHandler)
	connSema := semaphore.NewWeighted(5)
	http.Handle(*metricsPath, Handler(logger, dsns, connSema, globalCollectors))

	if *metricsPath != "/" && *metricsPath != "" {
		landingConfig := web.LandingConfig{
@@ -190,7 +146,7 @@ func main() {
		http.Handle("/", landingPage)
	}

	http.HandleFunc("/probe", handleProbe(logger, excludedDatabases))
	http.HandleFunc("/probe", handleProbe(logger, excludedDatabases, connSema))

	level.Info(logger).Log("msg", "Listening on address", "address", *webConfig.WebListenAddresses)
	srv := &http.Server{}
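main() above now builds a single semaphore.NewWeighted(5) and hands it to both the metrics Handler and handleProbe, so every scrape path competes for the same five connection slots. A stripped-down sketch of that wiring, with placeholder handlers standing in for the exporter's real ones:

package main

import (
	"log"
	"net/http"

	"golang.org/x/sync/semaphore"
)

// guardedHandler is a hypothetical stand-in for Handler/handleProbe: every
// request must take a slot from the shared semaphore before touching the DB.
func guardedHandler(sem *semaphore.Weighted, name string) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if err := sem.Acquire(r.Context(), 1); err != nil {
			http.Error(w, "scrape canceled: "+err.Error(), http.StatusServiceUnavailable)
			return
		}
		defer sem.Release(1)
		w.Write([]byte(name + " scraped\n")) // the real handlers collect metrics here
	}
}

func main() {
	connSema := semaphore.NewWeighted(5) // one shared cap across all endpoints

	http.Handle("/metrics", guardedHandler(connSema, "metrics"))
	http.Handle("/probe", guardedHandler(connSema, "probe"))

	log.Fatal(http.ListenAndServe(":9187", nil))
}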
@@ -199,80 +155,3 @@ func main() {
		os.Exit(1)
	}
}

// handler wraps an unfiltered http.Handler but uses a filtered handler,
// created on the fly, if filtering is requested. Create instances with
// newHandler. It used for collectors filtering.
type handler struct {
	unfilteredHandler http.Handler
	collectors map[string]prometheus.Collector
}

func newHandler(collectors map[string]prometheus.Collector) *handler {
	h := &handler{collectors: collectors}

	innerHandler, err := h.innerHandler()
	if err != nil {
		level.Error(logger).Log("msg", "Couldn't create metrics handler", "error", err)
		os.Exit(1)
	}

	h.unfilteredHandler = innerHandler
	return h
}

// ServeHTTP implements http.Handler.
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	filters := r.URL.Query()["collect[]"]
	level.Debug(logger).Log("msg", "Collect query", "filters", filters)

	if len(filters) == 0 {
		// No filters, use the prepared unfiltered handler.
		h.unfilteredHandler.ServeHTTP(w, r)
		return
	}

	filteredHandler, err := h.innerHandler(filters...)
	if err != nil {
		level.Warn(logger).Log("msg", "Couldn't create filtered metrics handler", "error", err)
		w.WriteHeader(http.StatusBadRequest)
		w.Write([]byte(fmt.Sprintf("Couldn't create filtered metrics handler: %s", err))) // nolint: errcheck
		return
	}

	filteredHandler.ServeHTTP(w, r)
}

func (h *handler) innerHandler(filters ...string) (http.Handler, error) {
	registry := prometheus.NewRegistry()

	// register all collectors by default.
	if len(filters) == 0 {
		for name, c := range h.collectors {
			if err := registry.Register(c); err != nil {
				return nil, err
			}
			level.Debug(logger).Log("msg", "Collector was registered", "collector", name)
		}
	}

	// register only filtered collectors.
	for _, name := range filters {
		if c, ok := h.collectors[name]; ok {
			if err := registry.Register(c); err != nil {
				return nil, err
			}
			level.Debug(logger).Log("msg", "Collector was registered", "collector", name)
		}
	}

	handler := promhttp.HandlerFor(
		registry,
		promhttp.HandlerOpts{
			// ErrorLog: log.NewNopLogger() .NewErrorLogger(),
			ErrorHandling: promhttp.ContinueOnError,
		},
	)

	return handler, nil
}
@@ -1,16 +1,24 @@
package main

import (
	"context"
	"crypto/sha256"
	"fmt"
	"net/http"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"github.com/alecthomas/kingpin/v2"
	"github.com/blang/semver/v4"
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/prometheus-community/postgres_exporter/collector"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"golang.org/x/sync/semaphore"
)

type MetricResolution string
@@ -31,56 +39,180 @@ var (
	collectCustomQueryHrDirectory = kingpin.Flag("collect.custom_query.hr.directory", "Path to custom queries with high resolution directory.").Envar("PG_EXPORTER_EXTEND_QUERY_HR_PATH").String()
)

func initializePerconaExporters(dsn []string) (func(), *Exporter, *Exporter, *Exporter) {
// Handler returns a http.Handler that serves metrics. Can be used instead of
// run for hooking up custom HTTP servers.
func Handler(logger log.Logger, dsns []string, connSema *semaphore.Weighted, globalCollectors map[string]prometheus.Collector) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		seconds, err := strconv.Atoi(r.Header.Get("X-Prometheus-Scrape-Timeout-Seconds"))
		// To support also older ones vmagents.
		if err != nil {
			seconds = 10
		}

		ctx, cancel := context.WithTimeout(r.Context(), time.Duration(seconds)*time.Second)
		defer cancel()

		filters := r.URL.Query()["collect[]"]
		level.Debug(logger).Log("msg", "Collect query", "filters", filters)

		var f Filters
		if len(filters) == 0 {
			f.EnableAllCollectors = true
		} else {
			for _, filter := range filters {
				switch filter {
				case "standard.process":
					f.EnableProcessCollector = true
				case "standard.go":
					f.EnableGoCollector = true
				case "standard.version":
					f.EnableVersionCollector = true
				case "standard.default":
					f.EnableDefaultCollector = true
				case "standard.hr":
					f.EnableHRCollector = true
				case "standard.mr":
					f.EnableMRCollector = true
				case "standard.lr":
					f.EnableLRCollector = true
				case "postgres":
					f.EnablePostgresCollector = true
				}
			}
		}

		registry := makeRegistry(ctx, dsns, connSema, globalCollectors, f)

		// Delegate http serving to Prometheus client library, which will call collector.Collect.
		h := promhttp.HandlerFor(registry, promhttp.HandlerOpts{
			ErrorHandling: promhttp.ContinueOnError,
			// ErrorLog: logger, //TODO!!!
		})

		h.ServeHTTP(w, r)
	})
}

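Handler above derives a per-request context from the X-Prometheus-Scrape-Timeout-Seconds header, defaulting to 10 seconds for older agents, so a slow scrape is abandoned before Prometheus itself gives up. The timeout handling in isolation, with a dummy body where the real handler builds its registry:

package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"strconv"
	"time"
)

func scrapeHandler(w http.ResponseWriter, r *http.Request) {
	// Prometheus (and newer vmagent versions) send the scrape timeout as a header.
	seconds, err := strconv.Atoi(r.Header.Get("X-Prometheus-Scrape-Timeout-Seconds"))
	if err != nil || seconds <= 0 {
		seconds = 10 // fallback when the header is missing or unparsable
	}

	ctx, cancel := context.WithTimeout(r.Context(), time.Duration(seconds)*time.Second)
	defer cancel()

	// The real Handler builds a registry and collects under ctx; here we only
	// show that the deadline is in place.
	deadline, _ := ctx.Deadline()
	fmt.Fprintf(w, "collecting with deadline %s\n", deadline.Format(time.RFC3339))
}

func main() {
	http.HandleFunc("/metrics", scrapeHandler)
	log.Fatal(http.ListenAndServe(":9187", nil))
}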
// Filters is a struct to enable or disable collectors.
type Filters struct {
	EnableAllCollectors bool
	EnableLRCollector bool
	EnableMRCollector bool
	EnableHRCollector bool
	EnableDefaultCollector bool
	EnableGoCollector bool
	EnableVersionCollector bool
	EnableProcessCollector bool
	EnablePostgresCollector bool
}

// makeRegistry creates a new prometheus registry with default and percona exporters.
func makeRegistry(ctx context.Context, dsns []string, connSema *semaphore.Weighted, globlalCollectors map[string]prometheus.Collector, filters Filters) *prometheus.Registry {
	registry := prometheus.NewRegistry()

	excludedDatabases := strings.Split(*excludeDatabases, ",")
	logger.Log("msg", "Excluded databases", "databases", fmt.Sprintf("%v", excludedDatabases))

	queriesPath := map[MetricResolution]string{
		HR: *collectCustomQueryHrDirectory,
		MR: *collectCustomQueryMrDirectory,
		LR: *collectCustomQueryLrDirectory,
	}

	excludedDatabases := strings.Split(*excludeDatabases, ",")
	opts := []ExporterOpt{
		DisableDefaultMetrics(true),
		DisableSettingsMetrics(true),
		AutoDiscoverDatabases(*autoDiscoverDatabases),
		WithUserQueriesPath(queriesPath),
		WithConstantLabels(*constantLabelsList),
		ExcludeDatabases(excludedDatabases),
		WithConnectionsSemaphore(connSema),
		WithContext(ctx),
	}
	hrExporter := NewExporter(dsn,
		append(opts,
			CollectorName("custom_query.hr"),
			WithUserQueriesEnabled(HR),
			WithEnabled(*collectCustomQueryHr),
			WithConstantLabels(*constantLabelsList),
		)...,
	)
	prometheus.MustRegister(hrExporter)

	mrExporter := NewExporter(dsn,
		append(opts,
			CollectorName("custom_query.mr"),
			WithUserQueriesEnabled(MR),
			WithEnabled(*collectCustomQueryMr),
			WithConstantLabels(*constantLabelsList),
		)...,
	)
	prometheus.MustRegister(mrExporter)
	if filters.EnableAllCollectors || filters.EnableDefaultCollector {
		defaultExporter := NewExporter(dsns, append(
			opts,
			DisableDefaultMetrics(*disableDefaultMetrics),
			DisableSettingsMetrics(*disableSettingsMetrics),
			IncludeDatabases(*includeDatabases),
		)...)
		registry.MustRegister(defaultExporter)
	}

	lrExporter := NewExporter(dsn,
		append(opts,
			CollectorName("custom_query.lr"),
			WithUserQueriesEnabled(LR),
			WithEnabled(*collectCustomQueryLr),
			WithConstantLabels(*constantLabelsList),
		)...,
	)
	prometheus.MustRegister(lrExporter)
	if filters.EnableAllCollectors || filters.EnableHRCollector {
		hrExporter := NewExporter(dsns,
			append(opts,
				CollectorName("custom_query.hr"),
				WithUserQueriesEnabled(HR),
				WithEnabled(*collectCustomQueryHr),
				DisableDefaultMetrics(true),
				DisableSettingsMetrics(true),
				WithUserQueriesPath(queriesPath),
			)...)
		registry.MustRegister(hrExporter)

	return func() {
		hrExporter.servers.Close()
		mrExporter.servers.Close()
		lrExporter.servers.Close()
	}, hrExporter, mrExporter, lrExporter
}

	if filters.EnableAllCollectors || filters.EnableMRCollector {
		mrExporter := NewExporter(dsns,
			append(opts,
				CollectorName("custom_query.mr"),
				WithUserQueriesEnabled(MR),
				WithEnabled(*collectCustomQueryMr),
				DisableDefaultMetrics(true),
				DisableSettingsMetrics(true),
				WithUserQueriesPath(queriesPath),
			)...)
		registry.MustRegister(mrExporter)
	}

	if filters.EnableAllCollectors || filters.EnableLRCollector {
		lrExporter := NewExporter(dsns,
			append(opts,
				CollectorName("custom_query.lr"),
				WithUserQueriesEnabled(LR),
				WithEnabled(*collectCustomQueryLr),
				DisableDefaultMetrics(true),
				DisableSettingsMetrics(true),
				WithUserQueriesPath(queriesPath),
			)...)
		registry.MustRegister(lrExporter)
	}

	if filters.EnableAllCollectors || filters.EnableGoCollector {
		registry.MustRegister(globlalCollectors["standard.go"])
	}

	if filters.EnableAllCollectors || filters.EnableProcessCollector {
		registry.MustRegister(globlalCollectors["standard.process"])
	}

	if filters.EnableAllCollectors || filters.EnableVersionCollector {
		registry.MustRegister(globlalCollectors["version"])
	}

	if filters.EnableAllCollectors || filters.EnablePostgresCollector {
		// This chunk moved here from main.go
		// TODO(@sysadmind): Remove this with multi-target support. We are removing multiple DSN support
		dsn := ""
		if len(dsns) > 0 {
			dsn = dsns[0]
		}

		pe, err := collector.NewPostgresCollector(
			logger,
			excludedDatabases,
			dsn,
			[]string{},
			collector.WithContext(ctx),
			collector.WithConnectionsSemaphore(connSema),
		)
		if err != nil {
			level.Error(logger).Log("msg", "Failed to create PostgresCollector", "err", err.Error())
		} else {
			registry.MustRegister(pe)
		}
	}

	return registry
}

func (e *Exporter) loadCustomQueries(res MetricResolution, version semver.Version, server *Server) {
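makeRegistry above assembles a fresh prometheus.Registry for every request and registers only the collectors that the collect[] filters select (or everything when no filter is given). A reduced sketch of the same selection logic, using plain gauges in place of the exporter collectors:

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// available mirrors the role of globalCollectors: a fixed set of collectors
// that can be switched on per scrape. The gauges here are illustrative only.
var available = map[string]prometheus.Collector{
	"standard.go":      prometheus.NewGauge(prometheus.GaugeOpts{Name: "demo_go_metric"}),
	"standard.process": prometheus.NewGauge(prometheus.GaugeOpts{Name: "demo_process_metric"}),
}

// buildRegistry registers everything when no filters are given, otherwise only
// the requested subset, which is the same shape as makeRegistry above.
func buildRegistry(filters []string) *prometheus.Registry {
	reg := prometheus.NewRegistry()
	if len(filters) == 0 {
		for _, c := range available {
			reg.MustRegister(c)
		}
		return reg
	}
	for _, name := range filters {
		if c, ok := available[name]; ok {
			reg.MustRegister(c)
		}
	}
	return reg
}

func main() {
	http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
		reg := buildRegistry(r.URL.Query()["collect[]"])
		promhttp.HandlerFor(reg, promhttp.HandlerOpts{ErrorHandling: promhttp.ContinueOnError}).ServeHTTP(w, r)
	})
	log.Fatal(http.ListenAndServe(":9187", nil))
}

With this in place, a request such as /metrics?collect[]=standard.go exposes only the selected collector, which is how a scrape job can request just one subset of the exporters per scrape.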
@@ -14,17 +14,21 @@
package main

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"math"
	"regexp"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/blang/semver/v4"
	"github.com/go-kit/log/level"
	"github.com/prometheus/client_golang/prometheus"
	"golang.org/x/sync/semaphore"
)

// ColumnUsage should be one of several enum values which describe how a
@@ -432,7 +436,10 @@ type Exporter struct {

	// servers are used to allow re-using the DB connection between scrapes.
	// servers contains metrics map and query overrides.
	servers *Servers
	// servers *Servers

	connSema *semaphore.Weighted
	ctx context.Context
}

// ExporterOpt configures Exporter.
@@ -466,6 +473,20 @@ func WithEnabled(p bool) ExporterOpt {
	}
}

// WithContext sets context for the exporter.
func WithContext(ctx context.Context) ExporterOpt {
	return func(e *Exporter) {
		e.ctx = ctx
	}
}

// WithConnectionsSemaphore sets the semaphore for limiting the number of connections to the database instance.
func WithConnectionsSemaphore(sem *semaphore.Weighted) ExporterOpt {
	return func(e *Exporter) {
		e.connSema = sem
	}
}

// DisableSettingsMetrics configures pg_settings export.
func DisableSettingsMetrics(b bool) ExporterOpt {
	return func(e *Exporter) {
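WithContext and WithConnectionsSemaphore above follow the exporter's functional-option style: each option is a closure applied to the Exporter inside NewExporter. A generic, self-contained sketch of that pattern with a trimmed-down struct:

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/semaphore"
)

// exporter is a stripped-down stand-in for the real Exporter.
type exporter struct {
	ctx      context.Context
	connSema *semaphore.Weighted
}

// opt mirrors ExporterOpt: a function that mutates the exporter being built.
type opt func(*exporter)

func withContext(ctx context.Context) opt {
	return func(e *exporter) { e.ctx = ctx }
}

func withConnectionsSemaphore(sem *semaphore.Weighted) opt {
	return func(e *exporter) { e.connSema = sem }
}

func newExporter(opts ...opt) *exporter {
	e := &exporter{ctx: context.Background()} // defaults first
	for _, o := range opts {
		o(e) // options override the defaults in order
	}
	return e
}

func main() {
	e := newExporter(
		withContext(context.TODO()),
		withConnectionsSemaphore(semaphore.NewWeighted(5)),
	)
	fmt.Println(e.ctx != nil, e.connSema != nil)
}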
@@ -547,6 +568,7 @@ func NewExporter(dsn []string, opts ...ExporterOpt) *Exporter {
		dsn: dsn,
		builtinMetricMaps: builtinMetricMaps,
		enabled: true,
		ctx: context.Background(),
	}

	for _, opt := range opts {
@@ -554,11 +576,38 @@ func NewExporter(dsn []string, opts ...ExporterOpt) *Exporter {
	}

	e.setupInternalMetrics()
	e.servers = NewServers(ServerWithLabels(e.constantLabels))
	// e.servers = NewServers(ServerWithLabels(e.constantLabels))

	return e
}

// GetServer returns a new Server instance for the provided DSN.
func (e *Exporter) GetServer(dsn string, opts ...ServerOpt) (*Server, error) {
	var err error
	errCount := 0 // start at zero because we increment before doing work
	retries := 1
	var server *Server
	for {
		if errCount++; errCount > retries {
			return nil, err
		}

		server, err = NewServer(dsn, opts...)
		if err != nil {
			time.Sleep(time.Duration(errCount) * time.Second)
			continue
		}

		if err = server.Ping(); err != nil {
			server.Close()
			time.Sleep(time.Duration(errCount) * time.Second)
			continue
		}
		break
	}
	return server, nil
}

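The new Exporter.GetServer above dials a fresh Server, pings it, and retries with a linearly growing sleep before giving up, rather than reusing a cached connection between scrapes. A self-contained sketch of the same retry shape around a generic dial function (connect here is a hypothetical placeholder):

package main

import (
	"errors"
	"fmt"
	"time"
)

// connect is a hypothetical placeholder for NewServer followed by Ping.
func connect(dsn string) (string, error) {
	return "", errors.New("connection refused")
}

// dialWithRetry keeps the same shape as GetServer: attempt, sleep a number of
// seconds equal to the attempt count, and return the last error when the
// attempt budget is spent.
func dialWithRetry(dsn string, maxAttempts int) (string, error) {
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		var conn string
		if conn, err = connect(dsn); err == nil {
			return conn, nil
		}
		time.Sleep(time.Duration(attempt) * time.Second) // linear backoff
	}
	return "", err
}

func main() {
	_, err := dialWithRetry("postgres://localhost/postgres", 2)
	fmt.Println("gave up:", err)
}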
func (e *Exporter) setupInternalMetrics() {
	e.duration = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: namespace,
@@ -697,29 +746,45 @@ func (e *Exporter) scrape(ch chan<- prometheus.Metric) {
		dsns = e.discoverDatabaseDSNs()
	}

	var errorsCount int
	var connectionErrorsCount int
	var errorsCount atomic.Int32
	var connectionErrorsCount atomic.Int32
	var wg sync.WaitGroup

	for _, dsn := range dsns {
		if err := e.scrapeDSN(ch, dsn); err != nil {
			errorsCount++
		dsn := dsn
		wg.Add(1)
		go func() {
			defer wg.Done()

			level.Error(logger).Log("err", err)

			if _, ok := err.(*ErrorConnectToServer); ok {
				connectionErrorsCount++
			if e.connSema != nil {
				if err := e.connSema.Acquire(e.ctx, 1); err != nil {
					level.Warn(logger).Log("msg", "Failed to acquire semaphore", "err", err)
					return
				}
				defer e.connSema.Release(1)
			}
			}
			if err := e.scrapeDSN(ch, dsn); err != nil {
				errorsCount.Add(1)

				level.Error(logger).Log("err", err)

				if _, ok := err.(*ErrorConnectToServer); ok {
					connectionErrorsCount.Add(1)
				}
			}
		}()
	}

	wg.Wait()

	switch {
	case connectionErrorsCount >= len(dsns):
	case int(connectionErrorsCount.Load()) >= len(dsns):
		e.psqlUp.Set(0)
	default:
		e.psqlUp.Set(1) // Didn't fail, can mark connection as up for this scrape.
	}

	switch errorsCount {
	switch errorsCount.Load() {
	case 0:
		e.error.Set(0)
	default:
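The scrape loop above changes from sequential per-DSN scraping to one goroutine per DSN: each goroutine takes a semaphore slot, scrapes, and records failures in atomic counters that are only read after wg.Wait(). A self-contained sketch of that fan-out with a stubbed scrape function:

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"

	"golang.org/x/sync/semaphore"
)

// scrapeOne is a hypothetical stand-in for scrapeDSN.
func scrapeOne(dsn string) error {
	if dsn == "bad" {
		return errors.New("connection failed")
	}
	return nil
}

func main() {
	dsns := []string{"db1", "db2", "bad"}
	connSema := semaphore.NewWeighted(2) // at most two DSNs scraped at once
	ctx := context.Background()

	var errorsCount atomic.Int32
	var wg sync.WaitGroup

	for _, dsn := range dsns {
		dsn := dsn // capture the loop variable for the goroutine
		wg.Add(1)
		go func() {
			defer wg.Done()

			if err := connSema.Acquire(ctx, 1); err != nil {
				return
			}
			defer connSema.Release(1)

			if err := scrapeOne(dsn); err != nil {
				errorsCount.Add(1)
			}
		}()
	}

	wg.Wait()
	fmt.Printf("scraped %d DSNs, %d errors\n", len(dsns), errorsCount.Load())
}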
@@ -23,9 +23,10 @@ import (
	"github.com/prometheus-community/postgres_exporter/config"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"golang.org/x/sync/semaphore"
)

func handleProbe(logger log.Logger, excludeDatabases []string) http.HandlerFunc {
func handleProbe(logger log.Logger, excludeDatabases []string, connSema *semaphore.Weighted) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		ctx := r.Context()
		conf := c.GetConfig()
@@ -69,21 +70,24 @@ func handleProbe(logger log.Logger, excludeDatabases []string) http.HandlerFunc
			DisableDefaultMetrics(*disableDefaultMetrics),
			DisableSettingsMetrics(*disableSettingsMetrics),
			AutoDiscoverDatabases(*autoDiscoverDatabases),
			//WithUserQueriesPath(*queriesPath),
			// WithUserQueriesPath(*queriesPath),
			WithConstantLabels(*constantLabelsList),
			ExcludeDatabases(excludeDatabases),
			IncludeDatabases(*includeDatabases),
			WithContext(ctx),
			WithConnectionsSemaphore(connSema),
		}

		dsns := []string{dsn.GetConnectionString()}
		exporter := NewExporter(dsns, opts...)
		defer func() {
			exporter.servers.Close()
		}()

		// defer func() {
		// exporter.servers.Close()
		// }()
		registry.MustRegister(exporter)

		// Run the probe
		pc, err := collector.NewProbeCollector(tl, excludeDatabases, registry, dsn)
		pc, err := collector.NewProbeCollector(tl, excludeDatabases, registry, dsn, connSema)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
@@ -14,6 +14,7 @@
package main

import (
	"context"
	"errors"
	"fmt"

@@ -278,8 +279,8 @@ func addQueries(content []byte, pgVersion semver.Version, server *Server) error
	return nil
}

func queryDatabases(server *Server) ([]string, error) {
	rows, err := server.db.Query("SELECT datname FROM pg_database WHERE datallowconn = true AND datistemplate = false AND datname != current_database() AND has_database_privilege(current_user, datname, 'connect')")
func queryDatabases(ctx context.Context, server *Server) ([]string, error) {
	rows, err := server.db.QueryContext(ctx, "SELECT datname FROM pg_database WHERE datallowconn = true AND datistemplate = false AND datname != current_database() AND has_database_privilege(current_user, datname, 'connect')")
	if err != nil {
		return nil, fmt.Errorf("Error retrieving databases: %v", err)
	}
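Switching queryDatabases from Query to QueryContext above ties the catalog query to the scrape context, so a hung query is abandoned once the deadline set in Handler expires. A small sketch of that cancellation behaviour, assuming a reachable Postgres at the example DSN and using pg_sleep to simulate a slow query:

package main

import (
	"context"
	"database/sql"
	"fmt"
	"time"

	_ "github.com/lib/pq" // assumption: any database/sql Postgres driver would do
)

func main() {
	db, err := sql.Open("postgres", "postgres://localhost/postgres?sslmode=disable")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	// pg_sleep(5) outlives the 100ms deadline, so QueryContext returns an error
	// instead of tying up the connection for the full five seconds.
	rows, err := db.QueryContext(ctx, "SELECT pg_sleep(5)")
	if err == nil {
		rows.Close()
	}
	fmt.Println("query ended with:", err)
}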
@@ -174,6 +174,7 @@ func (s *Servers) GetServer(dsn string) (*Server, error) {
		s.servers[dsn] = server
	}
	if err = server.Ping(); err != nil {
		server.Close()
		delete(s.servers, dsn)
		time.Sleep(time.Duration(errCount) * time.Second)
		continue
@@ -24,6 +24,7 @@ import (
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/prometheus/client_golang/prometheus"
	"golang.org/x/sync/semaphore"
)

var (
@@ -92,6 +93,9 @@ type PostgresCollector struct {
	logger log.Logger

	instance *instance

	connSema *semaphore.Weighted
	ctx context.Context
}

type Option func(*PostgresCollector) error
@@ -157,6 +161,22 @@ func NewPostgresCollector(logger log.Logger, excludeDatabases []string, dsn stri
	return p, nil
}

// WithContext sets context for the collector.
func WithContext(ctx context.Context) Option {
	return func(c *PostgresCollector) error {
		c.ctx = ctx
		return nil
	}
}

// WithConnectionsSemaphore sets the semaphore for limiting the number of connections to the database instance.
func WithConnectionsSemaphore(sem *semaphore.Weighted) Option {
	return func(c *PostgresCollector) error {
		c.connSema = sem
		return nil
	}
}

// Describe implements the prometheus.Collector interface.
func (p PostgresCollector) Describe(ch chan<- *prometheus.Desc) {
	ch <- scrapeDurationDesc
@@ -170,6 +190,14 @@ func (p PostgresCollector) Collect(ch chan<- prometheus.Metric) {
	// copy the instance so that concurrent scrapes have independent instances
	inst := p.instance.copy()

	if p.connSema != nil {
		if err := p.connSema.Acquire(p.ctx, 1); err != nil {
			level.Warn(p.logger).Log("msg", "Failed to acquire semaphore", "err", err)
			return
		}
		defer p.connSema.Release(1)
	}

	// Set up the database connection for the collector.
	err := inst.setup()
	if err != nil {
@@ -21,6 +21,7 @@ import (
	"github.com/go-kit/log/level"
	"github.com/prometheus-community/postgres_exporter/config"
	"github.com/prometheus/client_golang/prometheus"
	"golang.org/x/sync/semaphore"
)

type ProbeCollector struct {
@@ -28,9 +29,10 @@ type ProbeCollector struct {
	collectors map[string]Collector
	logger log.Logger
	instance *instance
	connSema *semaphore.Weighted
}

func NewProbeCollector(logger log.Logger, excludeDatabases []string, registry *prometheus.Registry, dsn config.DSN) (*ProbeCollector, error) {
func NewProbeCollector(logger log.Logger, excludeDatabases []string, registry *prometheus.Registry, dsn config.DSN, connSema *semaphore.Weighted) (*ProbeCollector, error) {
	collectors := make(map[string]Collector)
	initiatedCollectorsMtx.Lock()
	defer initiatedCollectorsMtx.Unlock()
@@ -68,6 +70,7 @@ func NewProbeCollector(logger log.Logger, excludeDatabases []string, registry *p
		collectors: collectors,
		logger: logger,
		instance: instance,
		connSema: connSema,
	}, nil
}

@@ -75,6 +78,12 @@ func (pc *ProbeCollector) Describe(ch chan<- *prometheus.Desc) {
}

func (pc *ProbeCollector) Collect(ch chan<- prometheus.Metric) {
	if err := pc.connSema.Acquire(context.TODO(), 1); err != nil {
		level.Warn(pc.logger).Log("msg", "Failed to acquire semaphore", "err", err)
		return
	}
	defer pc.connSema.Release(1)

	// Set up the database connection for the collector.
	err := pc.instance.setup()
	if err != nil {