diff --git a/cmd/postgres_exporter/datasource.go b/cmd/postgres_exporter/datasource.go index 0227edba..a41cceec 100644 --- a/cmd/postgres_exporter/datasource.go +++ b/cmd/postgres_exporter/datasource.go @@ -49,19 +49,8 @@ func (e *Exporter) discoverDatabaseDSNs() []string { continue } - server, err := e.servers.GetServer(dsn) + databaseNames, err := e.getDatabaseNames(dsn) if err != nil { - level.Error(logger).Log("msg", "Error opening connection to database", "dsn", loggableDSN(dsn), "err", err) - continue - } - dsns[dsn] = struct{}{} - - // If autoDiscoverDatabases is true, set first dsn as master database (Default: false) - server.master = true - - databaseNames, err := queryDatabases(server) - if err != nil { - level.Error(logger).Log("msg", "Error querying databases", "dsn", loggableDSN(dsn), "err", err) continue } for _, databaseName := range databaseNames { @@ -99,11 +88,40 @@ func (e *Exporter) discoverDatabaseDSNs() []string { return result } +func (e *Exporter) getDatabaseNames(dsn string) ([]string, error) { + if e.connSema != nil { + if err := e.connSema.Acquire(e.ctx, 1); err != nil { + level.Warn(logger).Log("msg", "Failed to acquire semaphore", "err", err) + return nil, err + } + defer e.connSema.Release(1) + } + + server, err := e.GetServer(dsn) + if err != nil { + level.Error(logger).Log("msg", "Error opening connection to database", "dsn", loggableDSN(dsn), "err", err) + return nil, err // TODO + } + defer server.Close() + + // If autoDiscoverDatabases is true, set first dsn as master database (Default: false) + server.master = true + + dbNames, err := queryDatabases(e.ctx, server) + if err != nil { + level.Error(logger).Log("msg", "Error querying databases", "dsn", loggableDSN(dsn), "err", err) + return nil, err + } + + return dbNames, nil +} + func (e *Exporter) scrapeDSN(ch chan<- prometheus.Metric, dsn string) error { - server, err := e.servers.GetServer(dsn) + server, err := e.GetServer(dsn) if err != nil { return err // TODO } + defer server.Close() level.Debug(logger).Log("msg", "scrapeDSN:"+dsn) diff --git a/cmd/postgres_exporter/main.go b/cmd/postgres_exporter/main.go index 16f25ed4..58d711b8 100644 --- a/cmd/postgres_exporter/main.go +++ b/cmd/postgres_exporter/main.go @@ -23,16 +23,15 @@ import ( "github.com/alecthomas/kingpin/v2" "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/prometheus-community/postgres_exporter/collector" "github.com/prometheus-community/postgres_exporter/config" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" - "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/common/promlog" "github.com/prometheus/common/promlog/flag" "github.com/prometheus/common/version" "github.com/prometheus/exporter-toolkit/web" "github.com/prometheus/exporter-toolkit/web/kingpinflag" + "golang.org/x/sync/semaphore" ) var ( @@ -114,61 +113,18 @@ func main() { level.Warn(logger).Log("msg", "Constant labels on all metrics is DEPRECATED") } - opts := []ExporterOpt{ - DisableDefaultMetrics(*disableDefaultMetrics), - DisableSettingsMetrics(*disableSettingsMetrics), - AutoDiscoverDatabases(*autoDiscoverDatabases), - WithConstantLabels(*constantLabelsList), - ExcludeDatabases(excludedDatabases), - IncludeDatabases(*includeDatabases), - } - - exporter := NewExporter(dsns, opts...) - defer func() { - exporter.servers.Close() - }() - versionCollector := version.NewCollector(exporterName) - prometheus.MustRegister(versionCollector) - - prometheus.MustRegister(exporter) - - // TODO(@sysadmind): Remove this with multi-target support. We are removing multiple DSN support - dsn := "" - if len(dsns) > 0 { - dsn = dsns[0] - } - - cleanup, hr, mr, lr := initializePerconaExporters(dsns) - defer cleanup() - - pe, err := collector.NewPostgresCollector( - logger, - excludedDatabases, - dsn, - []string{}, - ) - if err != nil { - level.Warn(logger).Log("msg", "Failed to create PostgresCollector", "err", err.Error()) - } else { - prometheus.MustRegister(pe) - } - psCollector := collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}) goCollector := collectors.NewGoCollector() - promHandler := newHandler(map[string]prometheus.Collector{ - "exporter": exporter, - "custom_query.hr": hr, - "custom_query.mr": mr, - "custom_query.lr": lr, + globalCollectors := map[string]prometheus.Collector{ "standard.process": psCollector, "standard.go": goCollector, "version": versionCollector, - "postgres": pe, - }) + } - http.Handle(*metricsPath, promHandler) + connSema := semaphore.NewWeighted(5) + http.Handle(*metricsPath, Handler(logger, dsns, connSema, globalCollectors)) if *metricsPath != "/" && *metricsPath != "" { landingConfig := web.LandingConfig{ @@ -190,7 +146,7 @@ func main() { http.Handle("/", landingPage) } - http.HandleFunc("/probe", handleProbe(logger, excludedDatabases)) + http.HandleFunc("/probe", handleProbe(logger, excludedDatabases, connSema)) level.Info(logger).Log("msg", "Listening on address", "address", *webConfig.WebListenAddresses) srv := &http.Server{} @@ -199,80 +155,3 @@ func main() { os.Exit(1) } } - -// handler wraps an unfiltered http.Handler but uses a filtered handler, -// created on the fly, if filtering is requested. Create instances with -// newHandler. It used for collectors filtering. -type handler struct { - unfilteredHandler http.Handler - collectors map[string]prometheus.Collector -} - -func newHandler(collectors map[string]prometheus.Collector) *handler { - h := &handler{collectors: collectors} - - innerHandler, err := h.innerHandler() - if err != nil { - level.Error(logger).Log("msg", "Couldn't create metrics handler", "error", err) - os.Exit(1) - } - - h.unfilteredHandler = innerHandler - return h -} - -// ServeHTTP implements http.Handler. -func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - filters := r.URL.Query()["collect[]"] - level.Debug(logger).Log("msg", "Collect query", "filters", filters) - - if len(filters) == 0 { - // No filters, use the prepared unfiltered handler. - h.unfilteredHandler.ServeHTTP(w, r) - return - } - - filteredHandler, err := h.innerHandler(filters...) - if err != nil { - level.Warn(logger).Log("msg", "Couldn't create filtered metrics handler", "error", err) - w.WriteHeader(http.StatusBadRequest) - w.Write([]byte(fmt.Sprintf("Couldn't create filtered metrics handler: %s", err))) // nolint: errcheck - return - } - - filteredHandler.ServeHTTP(w, r) -} - -func (h *handler) innerHandler(filters ...string) (http.Handler, error) { - registry := prometheus.NewRegistry() - - // register all collectors by default. - if len(filters) == 0 { - for name, c := range h.collectors { - if err := registry.Register(c); err != nil { - return nil, err - } - level.Debug(logger).Log("msg", "Collector was registered", "collector", name) - } - } - - // register only filtered collectors. - for _, name := range filters { - if c, ok := h.collectors[name]; ok { - if err := registry.Register(c); err != nil { - return nil, err - } - level.Debug(logger).Log("msg", "Collector was registered", "collector", name) - } - } - - handler := promhttp.HandlerFor( - registry, - promhttp.HandlerOpts{ - // ErrorLog: log.NewNopLogger() .NewErrorLogger(), - ErrorHandling: promhttp.ContinueOnError, - }, - ) - - return handler, nil -} diff --git a/cmd/postgres_exporter/percona_exporter.go b/cmd/postgres_exporter/percona_exporter.go index 22ace948..0b79a3c6 100644 --- a/cmd/postgres_exporter/percona_exporter.go +++ b/cmd/postgres_exporter/percona_exporter.go @@ -1,16 +1,24 @@ package main import ( + "context" "crypto/sha256" "fmt" + "net/http" "os" "path/filepath" + "strconv" "strings" + "time" "github.com/alecthomas/kingpin/v2" "github.com/blang/semver/v4" + "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/prometheus-community/postgres_exporter/collector" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "golang.org/x/sync/semaphore" ) type MetricResolution string @@ -31,56 +39,180 @@ var ( collectCustomQueryHrDirectory = kingpin.Flag("collect.custom_query.hr.directory", "Path to custom queries with high resolution directory.").Envar("PG_EXPORTER_EXTEND_QUERY_HR_PATH").String() ) -func initializePerconaExporters(dsn []string) (func(), *Exporter, *Exporter, *Exporter) { +// Handler returns a http.Handler that serves metrics. Can be used instead of +// run for hooking up custom HTTP servers. +func Handler(logger log.Logger, dsns []string, connSema *semaphore.Weighted, globalCollectors map[string]prometheus.Collector) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + seconds, err := strconv.Atoi(r.Header.Get("X-Prometheus-Scrape-Timeout-Seconds")) + // To support also older ones vmagents. + if err != nil { + seconds = 10 + } + + ctx, cancel := context.WithTimeout(r.Context(), time.Duration(seconds)*time.Second) + defer cancel() + + filters := r.URL.Query()["collect[]"] + level.Debug(logger).Log("msg", "Collect query", "filters", filters) + + var f Filters + if len(filters) == 0 { + f.EnableAllCollectors = true + } else { + for _, filter := range filters { + switch filter { + case "standard.process": + f.EnableProcessCollector = true + case "standard.go": + f.EnableGoCollector = true + case "standard.version": + f.EnableVersionCollector = true + case "standard.default": + f.EnableDefaultCollector = true + case "standard.hr": + f.EnableHRCollector = true + case "standard.mr": + f.EnableMRCollector = true + case "standard.lr": + f.EnableLRCollector = true + case "postgres": + f.EnablePostgresCollector = true + } + } + } + + registry := makeRegistry(ctx, dsns, connSema, globalCollectors, f) + + // Delegate http serving to Prometheus client library, which will call collector.Collect. + h := promhttp.HandlerFor(registry, promhttp.HandlerOpts{ + ErrorHandling: promhttp.ContinueOnError, + // ErrorLog: logger, //TODO!!! + }) + + h.ServeHTTP(w, r) + }) +} + +// Filters is a struct to enable or disable collectors. +type Filters struct { + EnableAllCollectors bool + EnableLRCollector bool + EnableMRCollector bool + EnableHRCollector bool + EnableDefaultCollector bool + EnableGoCollector bool + EnableVersionCollector bool + EnableProcessCollector bool + EnablePostgresCollector bool +} + +// makeRegistry creates a new prometheus registry with default and percona exporters. +func makeRegistry(ctx context.Context, dsns []string, connSema *semaphore.Weighted, globlalCollectors map[string]prometheus.Collector, filters Filters) *prometheus.Registry { + registry := prometheus.NewRegistry() + + excludedDatabases := strings.Split(*excludeDatabases, ",") + logger.Log("msg", "Excluded databases", "databases", fmt.Sprintf("%v", excludedDatabases)) + queriesPath := map[MetricResolution]string{ HR: *collectCustomQueryHrDirectory, MR: *collectCustomQueryMrDirectory, LR: *collectCustomQueryLrDirectory, } - excludedDatabases := strings.Split(*excludeDatabases, ",") opts := []ExporterOpt{ - DisableDefaultMetrics(true), - DisableSettingsMetrics(true), AutoDiscoverDatabases(*autoDiscoverDatabases), - WithUserQueriesPath(queriesPath), + WithConstantLabels(*constantLabelsList), ExcludeDatabases(excludedDatabases), + WithConnectionsSemaphore(connSema), + WithContext(ctx), } - hrExporter := NewExporter(dsn, - append(opts, - CollectorName("custom_query.hr"), - WithUserQueriesEnabled(HR), - WithEnabled(*collectCustomQueryHr), - WithConstantLabels(*constantLabelsList), - )..., - ) - prometheus.MustRegister(hrExporter) - mrExporter := NewExporter(dsn, - append(opts, - CollectorName("custom_query.mr"), - WithUserQueriesEnabled(MR), - WithEnabled(*collectCustomQueryMr), - WithConstantLabels(*constantLabelsList), - )..., - ) - prometheus.MustRegister(mrExporter) + if filters.EnableAllCollectors || filters.EnableDefaultCollector { + defaultExporter := NewExporter(dsns, append( + opts, + DisableDefaultMetrics(*disableDefaultMetrics), + DisableSettingsMetrics(*disableSettingsMetrics), + IncludeDatabases(*includeDatabases), + )...) + registry.MustRegister(defaultExporter) + } - lrExporter := NewExporter(dsn, - append(opts, - CollectorName("custom_query.lr"), - WithUserQueriesEnabled(LR), - WithEnabled(*collectCustomQueryLr), - WithConstantLabels(*constantLabelsList), - )..., - ) - prometheus.MustRegister(lrExporter) + if filters.EnableAllCollectors || filters.EnableHRCollector { + hrExporter := NewExporter(dsns, + append(opts, + CollectorName("custom_query.hr"), + WithUserQueriesEnabled(HR), + WithEnabled(*collectCustomQueryHr), + DisableDefaultMetrics(true), + DisableSettingsMetrics(true), + WithUserQueriesPath(queriesPath), + )...) + registry.MustRegister(hrExporter) - return func() { - hrExporter.servers.Close() - mrExporter.servers.Close() - lrExporter.servers.Close() - }, hrExporter, mrExporter, lrExporter + } + + if filters.EnableAllCollectors || filters.EnableMRCollector { + mrExporter := NewExporter(dsns, + append(opts, + CollectorName("custom_query.mr"), + WithUserQueriesEnabled(MR), + WithEnabled(*collectCustomQueryMr), + DisableDefaultMetrics(true), + DisableSettingsMetrics(true), + WithUserQueriesPath(queriesPath), + )...) + registry.MustRegister(mrExporter) + } + + if filters.EnableAllCollectors || filters.EnableLRCollector { + lrExporter := NewExporter(dsns, + append(opts, + CollectorName("custom_query.lr"), + WithUserQueriesEnabled(LR), + WithEnabled(*collectCustomQueryLr), + DisableDefaultMetrics(true), + DisableSettingsMetrics(true), + WithUserQueriesPath(queriesPath), + )...) + registry.MustRegister(lrExporter) + } + + if filters.EnableAllCollectors || filters.EnableGoCollector { + registry.MustRegister(globlalCollectors["standard.go"]) + } + + if filters.EnableAllCollectors || filters.EnableProcessCollector { + registry.MustRegister(globlalCollectors["standard.process"]) + } + + if filters.EnableAllCollectors || filters.EnableVersionCollector { + registry.MustRegister(globlalCollectors["version"]) + } + + if filters.EnableAllCollectors || filters.EnablePostgresCollector { + // This chunk moved here from main.go + // TODO(@sysadmind): Remove this with multi-target support. We are removing multiple DSN support + dsn := "" + if len(dsns) > 0 { + dsn = dsns[0] + } + + pe, err := collector.NewPostgresCollector( + logger, + excludedDatabases, + dsn, + []string{}, + collector.WithContext(ctx), + collector.WithConnectionsSemaphore(connSema), + ) + if err != nil { + level.Error(logger).Log("msg", "Failed to create PostgresCollector", "err", err.Error()) + } else { + registry.MustRegister(pe) + } + } + + return registry } func (e *Exporter) loadCustomQueries(res MetricResolution, version semver.Version, server *Server) { diff --git a/cmd/postgres_exporter/postgres_exporter.go b/cmd/postgres_exporter/postgres_exporter.go index ac0931bc..88f4b9d4 100644 --- a/cmd/postgres_exporter/postgres_exporter.go +++ b/cmd/postgres_exporter/postgres_exporter.go @@ -14,17 +14,21 @@ package main import ( + "context" "database/sql" "errors" "fmt" "math" "regexp" "strings" + "sync" + "sync/atomic" "time" "github.com/blang/semver/v4" "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" + "golang.org/x/sync/semaphore" ) // ColumnUsage should be one of several enum values which describe how a @@ -432,7 +436,10 @@ type Exporter struct { // servers are used to allow re-using the DB connection between scrapes. // servers contains metrics map and query overrides. - servers *Servers + // servers *Servers + + connSema *semaphore.Weighted + ctx context.Context } // ExporterOpt configures Exporter. @@ -466,6 +473,20 @@ func WithEnabled(p bool) ExporterOpt { } } +// WithContext sets context for the exporter. +func WithContext(ctx context.Context) ExporterOpt { + return func(e *Exporter) { + e.ctx = ctx + } +} + +// WithConnectionsSemaphore sets the semaphore for limiting the number of connections to the database instance. +func WithConnectionsSemaphore(sem *semaphore.Weighted) ExporterOpt { + return func(e *Exporter) { + e.connSema = sem + } +} + // DisableSettingsMetrics configures pg_settings export. func DisableSettingsMetrics(b bool) ExporterOpt { return func(e *Exporter) { @@ -547,6 +568,7 @@ func NewExporter(dsn []string, opts ...ExporterOpt) *Exporter { dsn: dsn, builtinMetricMaps: builtinMetricMaps, enabled: true, + ctx: context.Background(), } for _, opt := range opts { @@ -554,11 +576,38 @@ func NewExporter(dsn []string, opts ...ExporterOpt) *Exporter { } e.setupInternalMetrics() - e.servers = NewServers(ServerWithLabels(e.constantLabels)) + // e.servers = NewServers(ServerWithLabels(e.constantLabels)) return e } +// GetServer returns a new Server instance for the provided DSN. +func (e *Exporter) GetServer(dsn string, opts ...ServerOpt) (*Server, error) { + var err error + errCount := 0 // start at zero because we increment before doing work + retries := 1 + var server *Server + for { + if errCount++; errCount > retries { + return nil, err + } + + server, err = NewServer(dsn, opts...) + if err != nil { + time.Sleep(time.Duration(errCount) * time.Second) + continue + } + + if err = server.Ping(); err != nil { + server.Close() + time.Sleep(time.Duration(errCount) * time.Second) + continue + } + break + } + return server, nil +} + func (e *Exporter) setupInternalMetrics() { e.duration = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, @@ -697,29 +746,45 @@ func (e *Exporter) scrape(ch chan<- prometheus.Metric) { dsns = e.discoverDatabaseDSNs() } - var errorsCount int - var connectionErrorsCount int + var errorsCount atomic.Int32 + var connectionErrorsCount atomic.Int32 + var wg sync.WaitGroup for _, dsn := range dsns { - if err := e.scrapeDSN(ch, dsn); err != nil { - errorsCount++ + dsn := dsn + wg.Add(1) + go func() { + defer wg.Done() - level.Error(logger).Log("err", err) - - if _, ok := err.(*ErrorConnectToServer); ok { - connectionErrorsCount++ + if e.connSema != nil { + if err := e.connSema.Acquire(e.ctx, 1); err != nil { + level.Warn(logger).Log("msg", "Failed to acquire semaphore", "err", err) + return + } + defer e.connSema.Release(1) } - } + if err := e.scrapeDSN(ch, dsn); err != nil { + errorsCount.Add(1) + + level.Error(logger).Log("err", err) + + if _, ok := err.(*ErrorConnectToServer); ok { + connectionErrorsCount.Add(1) + } + } + }() } + wg.Wait() + switch { - case connectionErrorsCount >= len(dsns): + case int(connectionErrorsCount.Load()) >= len(dsns): e.psqlUp.Set(0) default: e.psqlUp.Set(1) // Didn't fail, can mark connection as up for this scrape. } - switch errorsCount { + switch errorsCount.Load() { case 0: e.error.Set(0) default: diff --git a/cmd/postgres_exporter/probe.go b/cmd/postgres_exporter/probe.go index a200ad2e..2d9c0814 100644 --- a/cmd/postgres_exporter/probe.go +++ b/cmd/postgres_exporter/probe.go @@ -23,9 +23,10 @@ import ( "github.com/prometheus-community/postgres_exporter/config" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "golang.org/x/sync/semaphore" ) -func handleProbe(logger log.Logger, excludeDatabases []string) http.HandlerFunc { +func handleProbe(logger log.Logger, excludeDatabases []string, connSema *semaphore.Weighted) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() conf := c.GetConfig() @@ -69,21 +70,24 @@ func handleProbe(logger log.Logger, excludeDatabases []string) http.HandlerFunc DisableDefaultMetrics(*disableDefaultMetrics), DisableSettingsMetrics(*disableSettingsMetrics), AutoDiscoverDatabases(*autoDiscoverDatabases), - //WithUserQueriesPath(*queriesPath), + // WithUserQueriesPath(*queriesPath), WithConstantLabels(*constantLabelsList), ExcludeDatabases(excludeDatabases), IncludeDatabases(*includeDatabases), + WithContext(ctx), + WithConnectionsSemaphore(connSema), } dsns := []string{dsn.GetConnectionString()} exporter := NewExporter(dsns, opts...) - defer func() { - exporter.servers.Close() - }() + + // defer func() { + // exporter.servers.Close() + // }() registry.MustRegister(exporter) // Run the probe - pc, err := collector.NewProbeCollector(tl, excludeDatabases, registry, dsn) + pc, err := collector.NewProbeCollector(tl, excludeDatabases, registry, dsn, connSema) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return diff --git a/cmd/postgres_exporter/queries.go b/cmd/postgres_exporter/queries.go index 8338c033..57a1b749 100644 --- a/cmd/postgres_exporter/queries.go +++ b/cmd/postgres_exporter/queries.go @@ -14,6 +14,7 @@ package main import ( + "context" "errors" "fmt" @@ -278,8 +279,8 @@ func addQueries(content []byte, pgVersion semver.Version, server *Server) error return nil } -func queryDatabases(server *Server) ([]string, error) { - rows, err := server.db.Query("SELECT datname FROM pg_database WHERE datallowconn = true AND datistemplate = false AND datname != current_database() AND has_database_privilege(current_user, datname, 'connect')") +func queryDatabases(ctx context.Context, server *Server) ([]string, error) { + rows, err := server.db.QueryContext(ctx, "SELECT datname FROM pg_database WHERE datallowconn = true AND datistemplate = false AND datname != current_database() AND has_database_privilege(current_user, datname, 'connect')") if err != nil { return nil, fmt.Errorf("Error retrieving databases: %v", err) } diff --git a/cmd/postgres_exporter/server.go b/cmd/postgres_exporter/server.go index 1cd4051b..574ce9f6 100644 --- a/cmd/postgres_exporter/server.go +++ b/cmd/postgres_exporter/server.go @@ -174,6 +174,7 @@ func (s *Servers) GetServer(dsn string) (*Server, error) { s.servers[dsn] = server } if err = server.Ping(); err != nil { + server.Close() delete(s.servers, dsn) time.Sleep(time.Duration(errCount) * time.Second) continue diff --git a/collector/collector.go b/collector/collector.go index 12112987..8391a296 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -24,6 +24,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" + "golang.org/x/sync/semaphore" ) var ( @@ -92,6 +93,9 @@ type PostgresCollector struct { logger log.Logger instance *instance + + connSema *semaphore.Weighted + ctx context.Context } type Option func(*PostgresCollector) error @@ -157,6 +161,22 @@ func NewPostgresCollector(logger log.Logger, excludeDatabases []string, dsn stri return p, nil } +// WithContext sets context for the collector. +func WithContext(ctx context.Context) Option { + return func(c *PostgresCollector) error { + c.ctx = ctx + return nil + } +} + +// WithConnectionsSemaphore sets the semaphore for limiting the number of connections to the database instance. +func WithConnectionsSemaphore(sem *semaphore.Weighted) Option { + return func(c *PostgresCollector) error { + c.connSema = sem + return nil + } +} + // Describe implements the prometheus.Collector interface. func (p PostgresCollector) Describe(ch chan<- *prometheus.Desc) { ch <- scrapeDurationDesc @@ -170,6 +190,14 @@ func (p PostgresCollector) Collect(ch chan<- prometheus.Metric) { // copy the instance so that concurrent scrapes have independent instances inst := p.instance.copy() + if p.connSema != nil { + if err := p.connSema.Acquire(p.ctx, 1); err != nil { + level.Warn(p.logger).Log("msg", "Failed to acquire semaphore", "err", err) + return + } + defer p.connSema.Release(1) + } + // Set up the database connection for the collector. err := inst.setup() if err != nil { diff --git a/collector/probe.go b/collector/probe.go index 4c0f0419..5746f725 100644 --- a/collector/probe.go +++ b/collector/probe.go @@ -21,6 +21,7 @@ import ( "github.com/go-kit/log/level" "github.com/prometheus-community/postgres_exporter/config" "github.com/prometheus/client_golang/prometheus" + "golang.org/x/sync/semaphore" ) type ProbeCollector struct { @@ -28,9 +29,10 @@ type ProbeCollector struct { collectors map[string]Collector logger log.Logger instance *instance + connSema *semaphore.Weighted } -func NewProbeCollector(logger log.Logger, excludeDatabases []string, registry *prometheus.Registry, dsn config.DSN) (*ProbeCollector, error) { +func NewProbeCollector(logger log.Logger, excludeDatabases []string, registry *prometheus.Registry, dsn config.DSN, connSema *semaphore.Weighted) (*ProbeCollector, error) { collectors := make(map[string]Collector) initiatedCollectorsMtx.Lock() defer initiatedCollectorsMtx.Unlock() @@ -68,6 +70,7 @@ func NewProbeCollector(logger log.Logger, excludeDatabases []string, registry *p collectors: collectors, logger: logger, instance: instance, + connSema: connSema, }, nil } @@ -75,6 +78,12 @@ func (pc *ProbeCollector) Describe(ch chan<- *prometheus.Desc) { } func (pc *ProbeCollector) Collect(ch chan<- prometheus.Metric) { + if err := pc.connSema.Acquire(context.TODO(), 1); err != nil { + level.Warn(pc.logger).Log("msg", "Failed to acquire semaphore", "err", err) + return + } + defer pc.connSema.Release(1) + // Set up the database connection for the collector. err := pc.instance.setup() if err != nil {