From ae81fb7c1948072dced2e8299c1b17ee38246f73 Mon Sep 17 00:00:00 2001
From: Artem Gavrilov <artem.gavrilov@percona.com>
Date: Fri, 16 Feb 2024 18:24:38 +0200
Subject: [PATCH] PMM-12893 Use rolling strategy for connection utilization

---
 cmd/postgres_exporter/datasource.go        |  44 +++--
 cmd/postgres_exporter/main.go              | 133 +------------
 cmd/postgres_exporter/percona_exporter.go  | 206 +++++++++++++++++----
 cmd/postgres_exporter/postgres_exporter.go |  91 +++++++--
 cmd/postgres_exporter/probe.go             |  16 +-
 cmd/postgres_exporter/queries.go           |   5 +-
 cmd/postgres_exporter/server.go            |   1 +
 collector/collector.go                     |  28 +++
 collector/probe.go                         |  11 +-
 9 files changed, 336 insertions(+), 199 deletions(-)

diff --git a/cmd/postgres_exporter/datasource.go b/cmd/postgres_exporter/datasource.go
index 0227edba..a41cceec 100644
--- a/cmd/postgres_exporter/datasource.go
+++ b/cmd/postgres_exporter/datasource.go
@@ -49,19 +49,8 @@ func (e *Exporter) discoverDatabaseDSNs() []string {
 			continue
 		}
 
-		server, err := e.servers.GetServer(dsn)
+		databaseNames, err := e.getDatabaseNames(dsn)
 		if err != nil {
-			level.Error(logger).Log("msg", "Error opening connection to database", "dsn", loggableDSN(dsn), "err", err)
-			continue
-		}
-		dsns[dsn] = struct{}{}
-
-		// If autoDiscoverDatabases is true, set first dsn as master database (Default: false)
-		server.master = true
-
-		databaseNames, err := queryDatabases(server)
-		if err != nil {
-			level.Error(logger).Log("msg", "Error querying databases", "dsn", loggableDSN(dsn), "err", err)
 			continue
 		}
 		for _, databaseName := range databaseNames {
@@ -99,11 +88,40 @@ func (e *Exporter) discoverDatabaseDSNs() []string {
 	return result
 }
 
+func (e *Exporter) getDatabaseNames(dsn string) ([]string, error) {
+	if e.connSema != nil {
+		if err := e.connSema.Acquire(e.ctx, 1); err != nil {
+			level.Warn(logger).Log("msg", "Failed to acquire semaphore", "err", err)
+			return nil, err
+		}
+		defer e.connSema.Release(1)
+	}
+
+	server, err := e.GetServer(dsn)
+	if err != nil {
+		level.Error(logger).Log("msg", "Error opening connection to database", "dsn", loggableDSN(dsn), "err", err)
+		return nil, err // TODO
+	}
+	defer server.Close()
+
+	// If autoDiscoverDatabases is true, set first dsn as master database (Default: false)
+	server.master = true
+
+	dbNames, err := queryDatabases(e.ctx, server)
+	if err != nil {
+		level.Error(logger).Log("msg", "Error querying databases", "dsn", loggableDSN(dsn), "err", err)
+		return nil, err
+	}
+
+	return dbNames, nil
+}
+
 func (e *Exporter) scrapeDSN(ch chan<- prometheus.Metric, dsn string) error {
-	server, err := e.servers.GetServer(dsn)
+	server, err := e.GetServer(dsn)
 	if err != nil {
 		return err // TODO
 	}
+	defer server.Close()
 
 	level.Debug(logger).Log("msg", "scrapeDSN:"+dsn)
 
diff --git a/cmd/postgres_exporter/main.go b/cmd/postgres_exporter/main.go
index 16f25ed4..58d711b8 100644
--- a/cmd/postgres_exporter/main.go
+++ b/cmd/postgres_exporter/main.go
@@ -23,16 +23,15 @@ import (
 	"github.com/alecthomas/kingpin/v2"
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
-	"github.com/prometheus-community/postgres_exporter/collector"
 	"github.com/prometheus-community/postgres_exporter/config"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/collectors"
-	"github.com/prometheus/client_golang/prometheus/promhttp"
 	"github.com/prometheus/common/promlog"
 	"github.com/prometheus/common/promlog/flag"
 	"github.com/prometheus/common/version"
 	"github.com/prometheus/exporter-toolkit/web"
 	"github.com/prometheus/exporter-toolkit/web/kingpinflag"
+	"golang.org/x/sync/semaphore"
 )
 
 var (
@@ -114,61 +113,18 @@ func main() {
 		level.Warn(logger).Log("msg", "Constant labels on all metrics is DEPRECATED")
 	}
 
-	opts := []ExporterOpt{
-		DisableDefaultMetrics(*disableDefaultMetrics),
-		DisableSettingsMetrics(*disableSettingsMetrics),
-		AutoDiscoverDatabases(*autoDiscoverDatabases),
-		WithConstantLabels(*constantLabelsList),
-		ExcludeDatabases(excludedDatabases),
-		IncludeDatabases(*includeDatabases),
-	}
-
-	exporter := NewExporter(dsns, opts...)
-	defer func() {
-		exporter.servers.Close()
-	}()
-
 	versionCollector := version.NewCollector(exporterName)
-	prometheus.MustRegister(versionCollector)
-
-	prometheus.MustRegister(exporter)
-
-	// TODO(@sysadmind): Remove this with multi-target support. We are removing multiple DSN support
-	dsn := ""
-	if len(dsns) > 0 {
-		dsn = dsns[0]
-	}
-
-	cleanup, hr, mr, lr := initializePerconaExporters(dsns)
-	defer cleanup()
-
-	pe, err := collector.NewPostgresCollector(
-		logger,
-		excludedDatabases,
-		dsn,
-		[]string{},
-	)
-	if err != nil {
-		level.Warn(logger).Log("msg", "Failed to create PostgresCollector", "err", err.Error())
-	} else {
-		prometheus.MustRegister(pe)
-	}
-
 	psCollector := collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})
 	goCollector := collectors.NewGoCollector()
 
-	promHandler := newHandler(map[string]prometheus.Collector{
-		"exporter":         exporter,
-		"custom_query.hr":  hr,
-		"custom_query.mr":  mr,
-		"custom_query.lr":  lr,
+	globalCollectors := map[string]prometheus.Collector{
 		"standard.process": psCollector,
 		"standard.go":      goCollector,
 		"version":          versionCollector,
-		"postgres":         pe,
-	})
+	}
 
-	http.Handle(*metricsPath, promHandler)
+	connSema := semaphore.NewWeighted(5)
+	http.Handle(*metricsPath, Handler(logger, dsns, connSema, globalCollectors))
 
 	if *metricsPath != "/" && *metricsPath != "" {
 		landingConfig := web.LandingConfig{
@@ -190,7 +146,7 @@ func main() {
 		http.Handle("/", landingPage)
 	}
 
-	http.HandleFunc("/probe", handleProbe(logger, excludedDatabases))
+	http.HandleFunc("/probe", handleProbe(logger, excludedDatabases, connSema))
 
 	level.Info(logger).Log("msg", "Listening on address", "address", *webConfig.WebListenAddresses)
 	srv := &http.Server{}
@@ -199,80 +155,3 @@ func main() {
 		os.Exit(1)
 	}
 }
-
-// handler wraps an unfiltered http.Handler but uses a filtered handler,
-// created on the fly, if filtering is requested. Create instances with
-// newHandler. It used for collectors filtering.
-type handler struct {
-	unfilteredHandler http.Handler
-	collectors        map[string]prometheus.Collector
-}
-
-func newHandler(collectors map[string]prometheus.Collector) *handler {
-	h := &handler{collectors: collectors}
-
-	innerHandler, err := h.innerHandler()
-	if err != nil {
-		level.Error(logger).Log("msg", "Couldn't create metrics handler", "error", err)
-		os.Exit(1)
-	}
-
-	h.unfilteredHandler = innerHandler
-	return h
-}
-
-// ServeHTTP implements http.Handler.
-func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
-	filters := r.URL.Query()["collect[]"]
-	level.Debug(logger).Log("msg", "Collect query", "filters", filters)
-
-	if len(filters) == 0 {
-		// No filters, use the prepared unfiltered handler.
-		h.unfilteredHandler.ServeHTTP(w, r)
-		return
-	}
-
-	filteredHandler, err := h.innerHandler(filters...)
-	if err != nil {
-		level.Warn(logger).Log("msg", "Couldn't create filtered metrics handler", "error", err)
-		w.WriteHeader(http.StatusBadRequest)
-		w.Write([]byte(fmt.Sprintf("Couldn't create filtered metrics handler: %s", err))) // nolint: errcheck
-		return
-	}
-
-	filteredHandler.ServeHTTP(w, r)
-}
-
-func (h *handler) innerHandler(filters ...string) (http.Handler, error) {
-	registry := prometheus.NewRegistry()
-
-	// register all collectors by default.
-	if len(filters) == 0 {
-		for name, c := range h.collectors {
-			if err := registry.Register(c); err != nil {
-				return nil, err
-			}
-			level.Debug(logger).Log("msg", "Collector was registered", "collector", name)
-		}
-	}
-
-	// register only filtered collectors.
-	for _, name := range filters {
-		if c, ok := h.collectors[name]; ok {
-			if err := registry.Register(c); err != nil {
-				return nil, err
-			}
-			level.Debug(logger).Log("msg", "Collector was registered", "collector", name)
-		}
-	}
-
-	handler := promhttp.HandlerFor(
-		registry,
-		promhttp.HandlerOpts{
-			// ErrorLog:       log.NewNopLogger() .NewErrorLogger(),
-			ErrorHandling: promhttp.ContinueOnError,
-		},
-	)
-
-	return handler, nil
-}
diff --git a/cmd/postgres_exporter/percona_exporter.go b/cmd/postgres_exporter/percona_exporter.go
index 22ace948..0b79a3c6 100644
--- a/cmd/postgres_exporter/percona_exporter.go
+++ b/cmd/postgres_exporter/percona_exporter.go
@@ -1,16 +1,24 @@
 package main
 
 import (
+	"context"
 	"crypto/sha256"
 	"fmt"
+	"net/http"
 	"os"
 	"path/filepath"
+	"strconv"
 	"strings"
+	"time"
 
 	"github.com/alecthomas/kingpin/v2"
 	"github.com/blang/semver/v4"
+	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
+	"github.com/prometheus-community/postgres_exporter/collector"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
+	"golang.org/x/sync/semaphore"
 )
 
 type MetricResolution string
@@ -31,56 +39,180 @@ var (
 	collectCustomQueryHrDirectory = kingpin.Flag("collect.custom_query.hr.directory", "Path to custom queries with high resolution directory.").Envar("PG_EXPORTER_EXTEND_QUERY_HR_PATH").String()
 )
 
-func initializePerconaExporters(dsn []string) (func(), *Exporter, *Exporter, *Exporter) {
+// Handler returns a http.Handler that serves metrics. Can be used instead of
+// run for hooking up custom HTTP servers.
+func Handler(logger log.Logger, dsns []string, connSema *semaphore.Weighted, globalCollectors map[string]prometheus.Collector) http.Handler {
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		seconds, err := strconv.Atoi(r.Header.Get("X-Prometheus-Scrape-Timeout-Seconds"))
+		// To support also older ones vmagents.
+		if err != nil {
+			seconds = 10
+		}
+
+		ctx, cancel := context.WithTimeout(r.Context(), time.Duration(seconds)*time.Second)
+		defer cancel()
+
+		filters := r.URL.Query()["collect[]"]
+		level.Debug(logger).Log("msg", "Collect query", "filters", filters)
+
+		var f Filters
+		if len(filters) == 0 {
+			f.EnableAllCollectors = true
+		} else {
+			for _, filter := range filters {
+				switch filter {
+				case "standard.process":
+					f.EnableProcessCollector = true
+				case "standard.go":
+					f.EnableGoCollector = true
+				case "standard.version":
+					f.EnableVersionCollector = true
+				case "standard.default":
+					f.EnableDefaultCollector = true
+				case "standard.hr":
+					f.EnableHRCollector = true
+				case "standard.mr":
+					f.EnableMRCollector = true
+				case "standard.lr":
+					f.EnableLRCollector = true
+				case "postgres":
+					f.EnablePostgresCollector = true
+				}
+			}
+		}
+
+		registry := makeRegistry(ctx, dsns, connSema, globalCollectors, f)
+
+		// Delegate http serving to Prometheus client library, which will call collector.Collect.
+		h := promhttp.HandlerFor(registry, promhttp.HandlerOpts{
+			ErrorHandling: promhttp.ContinueOnError,
+			// ErrorLog:      logger, //TODO!!!
+		})
+
+		h.ServeHTTP(w, r)
+	})
+}
+
+// Filters is a struct to enable or disable collectors.
+type Filters struct {
+	EnableAllCollectors     bool
+	EnableLRCollector       bool
+	EnableMRCollector       bool
+	EnableHRCollector       bool
+	EnableDefaultCollector  bool
+	EnableGoCollector       bool
+	EnableVersionCollector  bool
+	EnableProcessCollector  bool
+	EnablePostgresCollector bool
+}
+
+// makeRegistry creates a new prometheus registry with default and percona exporters.
+func makeRegistry(ctx context.Context, dsns []string, connSema *semaphore.Weighted, globlalCollectors map[string]prometheus.Collector, filters Filters) *prometheus.Registry {
+	registry := prometheus.NewRegistry()
+
+	excludedDatabases := strings.Split(*excludeDatabases, ",")
+	logger.Log("msg", "Excluded databases", "databases", fmt.Sprintf("%v", excludedDatabases))
+
 	queriesPath := map[MetricResolution]string{
 		HR: *collectCustomQueryHrDirectory,
 		MR: *collectCustomQueryMrDirectory,
 		LR: *collectCustomQueryLrDirectory,
 	}
 
-	excludedDatabases := strings.Split(*excludeDatabases, ",")
 	opts := []ExporterOpt{
-		DisableDefaultMetrics(true),
-		DisableSettingsMetrics(true),
 		AutoDiscoverDatabases(*autoDiscoverDatabases),
-		WithUserQueriesPath(queriesPath),
+		WithConstantLabels(*constantLabelsList),
 		ExcludeDatabases(excludedDatabases),
+		WithConnectionsSemaphore(connSema),
+		WithContext(ctx),
 	}
-	hrExporter := NewExporter(dsn,
-		append(opts,
-			CollectorName("custom_query.hr"),
-			WithUserQueriesEnabled(HR),
-			WithEnabled(*collectCustomQueryHr),
-			WithConstantLabels(*constantLabelsList),
-		)...,
-	)
-	prometheus.MustRegister(hrExporter)
 
-	mrExporter := NewExporter(dsn,
-		append(opts,
-			CollectorName("custom_query.mr"),
-			WithUserQueriesEnabled(MR),
-			WithEnabled(*collectCustomQueryMr),
-			WithConstantLabels(*constantLabelsList),
-		)...,
-	)
-	prometheus.MustRegister(mrExporter)
+	if filters.EnableAllCollectors || filters.EnableDefaultCollector {
+		defaultExporter := NewExporter(dsns, append(
+			opts,
+			DisableDefaultMetrics(*disableDefaultMetrics),
+			DisableSettingsMetrics(*disableSettingsMetrics),
+			IncludeDatabases(*includeDatabases),
+		)...)
+		registry.MustRegister(defaultExporter)
+	}
 
-	lrExporter := NewExporter(dsn,
-		append(opts,
-			CollectorName("custom_query.lr"),
-			WithUserQueriesEnabled(LR),
-			WithEnabled(*collectCustomQueryLr),
-			WithConstantLabels(*constantLabelsList),
-		)...,
-	)
-	prometheus.MustRegister(lrExporter)
+	if filters.EnableAllCollectors || filters.EnableHRCollector {
+		hrExporter := NewExporter(dsns,
+			append(opts,
+				CollectorName("custom_query.hr"),
+				WithUserQueriesEnabled(HR),
+				WithEnabled(*collectCustomQueryHr),
+				DisableDefaultMetrics(true),
+				DisableSettingsMetrics(true),
+				WithUserQueriesPath(queriesPath),
+			)...)
+		registry.MustRegister(hrExporter)
 
-	return func() {
-		hrExporter.servers.Close()
-		mrExporter.servers.Close()
-		lrExporter.servers.Close()
-	}, hrExporter, mrExporter, lrExporter
+	}
+
+	if filters.EnableAllCollectors || filters.EnableMRCollector {
+		mrExporter := NewExporter(dsns,
+			append(opts,
+				CollectorName("custom_query.mr"),
+				WithUserQueriesEnabled(MR),
+				WithEnabled(*collectCustomQueryMr),
+				DisableDefaultMetrics(true),
+				DisableSettingsMetrics(true),
+				WithUserQueriesPath(queriesPath),
+			)...)
+		registry.MustRegister(mrExporter)
+	}
+
+	if filters.EnableAllCollectors || filters.EnableLRCollector {
+		lrExporter := NewExporter(dsns,
+			append(opts,
+				CollectorName("custom_query.lr"),
+				WithUserQueriesEnabled(LR),
+				WithEnabled(*collectCustomQueryLr),
+				DisableDefaultMetrics(true),
+				DisableSettingsMetrics(true),
+				WithUserQueriesPath(queriesPath),
+			)...)
+		registry.MustRegister(lrExporter)
+	}
+
+	if filters.EnableAllCollectors || filters.EnableGoCollector {
+		registry.MustRegister(globlalCollectors["standard.go"])
+	}
+
+	if filters.EnableAllCollectors || filters.EnableProcessCollector {
+		registry.MustRegister(globlalCollectors["standard.process"])
+	}
+
+	if filters.EnableAllCollectors || filters.EnableVersionCollector {
+		registry.MustRegister(globlalCollectors["version"])
+	}
+
+	if filters.EnableAllCollectors || filters.EnablePostgresCollector {
+		// This chunk moved here from main.go
+		// TODO(@sysadmind): Remove this with multi-target support. We are removing multiple DSN support
+		dsn := ""
+		if len(dsns) > 0 {
+			dsn = dsns[0]
+		}
+
+		pe, err := collector.NewPostgresCollector(
+			logger,
+			excludedDatabases,
+			dsn,
+			[]string{},
+			collector.WithContext(ctx),
+			collector.WithConnectionsSemaphore(connSema),
+		)
+		if err != nil {
+			level.Error(logger).Log("msg", "Failed to create PostgresCollector", "err", err.Error())
+		} else {
+			registry.MustRegister(pe)
+		}
+	}
+
+	return registry
 }
 
 func (e *Exporter) loadCustomQueries(res MetricResolution, version semver.Version, server *Server) {
diff --git a/cmd/postgres_exporter/postgres_exporter.go b/cmd/postgres_exporter/postgres_exporter.go
index ac0931bc..88f4b9d4 100644
--- a/cmd/postgres_exporter/postgres_exporter.go
+++ b/cmd/postgres_exporter/postgres_exporter.go
@@ -14,17 +14,21 @@
 package main
 
 import (
+	"context"
 	"database/sql"
 	"errors"
 	"fmt"
 	"math"
 	"regexp"
 	"strings"
+	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/blang/semver/v4"
 	"github.com/go-kit/log/level"
 	"github.com/prometheus/client_golang/prometheus"
+	"golang.org/x/sync/semaphore"
 )
 
 // ColumnUsage should be one of several enum values which describe how a
@@ -432,7 +436,10 @@ type Exporter struct {
 
 	// servers are used to allow re-using the DB connection between scrapes.
 	// servers contains metrics map and query overrides.
-	servers *Servers
+	// servers *Servers
+
+	connSema *semaphore.Weighted
+	ctx      context.Context
 }
 
 // ExporterOpt configures Exporter.
@@ -466,6 +473,20 @@ func WithEnabled(p bool) ExporterOpt {
 	}
 }
 
+// WithContext sets context for the exporter.
+func WithContext(ctx context.Context) ExporterOpt {
+	return func(e *Exporter) {
+		e.ctx = ctx
+	}
+}
+
+// WithConnectionsSemaphore sets the semaphore for limiting the number of connections to the database instance.
+func WithConnectionsSemaphore(sem *semaphore.Weighted) ExporterOpt {
+	return func(e *Exporter) {
+		e.connSema = sem
+	}
+}
+
 // DisableSettingsMetrics configures pg_settings export.
 func DisableSettingsMetrics(b bool) ExporterOpt {
 	return func(e *Exporter) {
@@ -547,6 +568,7 @@ func NewExporter(dsn []string, opts ...ExporterOpt) *Exporter {
 		dsn:               dsn,
 		builtinMetricMaps: builtinMetricMaps,
 		enabled:           true,
+		ctx:               context.Background(),
 	}
 
 	for _, opt := range opts {
@@ -554,11 +576,38 @@ func NewExporter(dsn []string, opts ...ExporterOpt) *Exporter {
 	}
 
 	e.setupInternalMetrics()
-	e.servers = NewServers(ServerWithLabels(e.constantLabels))
+	// e.servers = NewServers(ServerWithLabels(e.constantLabels))
 
 	return e
 }
 
+// GetServer returns a new Server instance for the provided DSN.
+func (e *Exporter) GetServer(dsn string, opts ...ServerOpt) (*Server, error) {
+	var err error
+	errCount := 0 // start at zero because we increment before doing work
+	retries := 1
+	var server *Server
+	for {
+		if errCount++; errCount > retries {
+			return nil, err
+		}
+
+		server, err = NewServer(dsn, opts...)
+		if err != nil {
+			time.Sleep(time.Duration(errCount) * time.Second)
+			continue
+		}
+
+		if err = server.Ping(); err != nil {
+			server.Close()
+			time.Sleep(time.Duration(errCount) * time.Second)
+			continue
+		}
+		break
+	}
+	return server, nil
+}
+
 func (e *Exporter) setupInternalMetrics() {
 	e.duration = prometheus.NewGauge(prometheus.GaugeOpts{
 		Namespace:   namespace,
@@ -697,29 +746,45 @@ func (e *Exporter) scrape(ch chan<- prometheus.Metric) {
 		dsns = e.discoverDatabaseDSNs()
 	}
 
-	var errorsCount int
-	var connectionErrorsCount int
+	var errorsCount atomic.Int32
+	var connectionErrorsCount atomic.Int32
+	var wg sync.WaitGroup
 
 	for _, dsn := range dsns {
-		if err := e.scrapeDSN(ch, dsn); err != nil {
-			errorsCount++
+		dsn := dsn
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
 
-			level.Error(logger).Log("err", err)
-
-			if _, ok := err.(*ErrorConnectToServer); ok {
-				connectionErrorsCount++
+			if e.connSema != nil {
+				if err := e.connSema.Acquire(e.ctx, 1); err != nil {
+					level.Warn(logger).Log("msg", "Failed to acquire semaphore", "err", err)
+					return
+				}
+				defer e.connSema.Release(1)
 			}
-		}
+			if err := e.scrapeDSN(ch, dsn); err != nil {
+				errorsCount.Add(1)
+
+				level.Error(logger).Log("err", err)
+
+				if _, ok := err.(*ErrorConnectToServer); ok {
+					connectionErrorsCount.Add(1)
+				}
+			}
+		}()
 	}
 
+	wg.Wait()
+
 	switch {
-	case connectionErrorsCount >= len(dsns):
+	case int(connectionErrorsCount.Load()) >= len(dsns):
 		e.psqlUp.Set(0)
 	default:
 		e.psqlUp.Set(1) // Didn't fail, can mark connection as up for this scrape.
 	}
 
-	switch errorsCount {
+	switch errorsCount.Load() {
 	case 0:
 		e.error.Set(0)
 	default:
diff --git a/cmd/postgres_exporter/probe.go b/cmd/postgres_exporter/probe.go
index a200ad2e..2d9c0814 100644
--- a/cmd/postgres_exporter/probe.go
+++ b/cmd/postgres_exporter/probe.go
@@ -23,9 +23,10 @@ import (
 	"github.com/prometheus-community/postgres_exporter/config"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
+	"golang.org/x/sync/semaphore"
 )
 
-func handleProbe(logger log.Logger, excludeDatabases []string) http.HandlerFunc {
+func handleProbe(logger log.Logger, excludeDatabases []string, connSema *semaphore.Weighted) http.HandlerFunc {
 	return func(w http.ResponseWriter, r *http.Request) {
 		ctx := r.Context()
 		conf := c.GetConfig()
@@ -69,21 +70,24 @@ func handleProbe(logger log.Logger, excludeDatabases []string) http.HandlerFunc
 			DisableDefaultMetrics(*disableDefaultMetrics),
 			DisableSettingsMetrics(*disableSettingsMetrics),
 			AutoDiscoverDatabases(*autoDiscoverDatabases),
-			//WithUserQueriesPath(*queriesPath),
+			// WithUserQueriesPath(*queriesPath),
 			WithConstantLabels(*constantLabelsList),
 			ExcludeDatabases(excludeDatabases),
 			IncludeDatabases(*includeDatabases),
+			WithContext(ctx),
+			WithConnectionsSemaphore(connSema),
 		}
 
 		dsns := []string{dsn.GetConnectionString()}
 		exporter := NewExporter(dsns, opts...)
-		defer func() {
-			exporter.servers.Close()
-		}()
+
+		// defer func() {
+		// 	exporter.servers.Close()
+		// }()
 		registry.MustRegister(exporter)
 
 		// Run the probe
-		pc, err := collector.NewProbeCollector(tl, excludeDatabases, registry, dsn)
+		pc, err := collector.NewProbeCollector(tl, excludeDatabases, registry, dsn, connSema)
 		if err != nil {
 			http.Error(w, err.Error(), http.StatusInternalServerError)
 			return
diff --git a/cmd/postgres_exporter/queries.go b/cmd/postgres_exporter/queries.go
index 8338c033..57a1b749 100644
--- a/cmd/postgres_exporter/queries.go
+++ b/cmd/postgres_exporter/queries.go
@@ -14,6 +14,7 @@
 package main
 
 import (
+	"context"
 	"errors"
 	"fmt"
 
@@ -278,8 +279,8 @@ func addQueries(content []byte, pgVersion semver.Version, server *Server) error
 	return nil
 }
 
-func queryDatabases(server *Server) ([]string, error) {
-	rows, err := server.db.Query("SELECT datname FROM pg_database WHERE datallowconn = true AND datistemplate = false AND datname != current_database() AND has_database_privilege(current_user, datname, 'connect')")
+func queryDatabases(ctx context.Context, server *Server) ([]string, error) {
+	rows, err := server.db.QueryContext(ctx, "SELECT datname FROM pg_database WHERE datallowconn = true AND datistemplate = false AND datname != current_database() AND has_database_privilege(current_user, datname, 'connect')")
 	if err != nil {
 		return nil, fmt.Errorf("Error retrieving databases: %v", err)
 	}
diff --git a/cmd/postgres_exporter/server.go b/cmd/postgres_exporter/server.go
index 1cd4051b..574ce9f6 100644
--- a/cmd/postgres_exporter/server.go
+++ b/cmd/postgres_exporter/server.go
@@ -174,6 +174,7 @@ func (s *Servers) GetServer(dsn string) (*Server, error) {
 			s.servers[dsn] = server
 		}
 		if err = server.Ping(); err != nil {
+			server.Close()
 			delete(s.servers, dsn)
 			time.Sleep(time.Duration(errCount) * time.Second)
 			continue
diff --git a/collector/collector.go b/collector/collector.go
index 12112987..8391a296 100644
--- a/collector/collector.go
+++ b/collector/collector.go
@@ -24,6 +24,7 @@ import (
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
 	"github.com/prometheus/client_golang/prometheus"
+	"golang.org/x/sync/semaphore"
 )
 
 var (
@@ -92,6 +93,9 @@ type PostgresCollector struct {
 	logger     log.Logger
 
 	instance *instance
+
+	connSema *semaphore.Weighted
+	ctx      context.Context
 }
 
 type Option func(*PostgresCollector) error
@@ -157,6 +161,22 @@ func NewPostgresCollector(logger log.Logger, excludeDatabases []string, dsn stri
 	return p, nil
 }
 
+// WithContext sets context for the collector.
+func WithContext(ctx context.Context) Option {
+	return func(c *PostgresCollector) error {
+		c.ctx = ctx
+		return nil
+	}
+}
+
+// WithConnectionsSemaphore sets the semaphore for limiting the number of connections to the database instance.
+func WithConnectionsSemaphore(sem *semaphore.Weighted) Option {
+	return func(c *PostgresCollector) error {
+		c.connSema = sem
+		return nil
+	}
+}
+
 // Describe implements the prometheus.Collector interface.
 func (p PostgresCollector) Describe(ch chan<- *prometheus.Desc) {
 	ch <- scrapeDurationDesc
@@ -170,6 +190,14 @@ func (p PostgresCollector) Collect(ch chan<- prometheus.Metric) {
 	// copy the instance so that concurrent scrapes have independent instances
 	inst := p.instance.copy()
 
+	if p.connSema != nil {
+		if err := p.connSema.Acquire(p.ctx, 1); err != nil {
+			level.Warn(p.logger).Log("msg", "Failed to acquire semaphore", "err", err)
+			return
+		}
+		defer p.connSema.Release(1)
+	}
+
 	// Set up the database connection for the collector.
 	err := inst.setup()
 	if err != nil {
diff --git a/collector/probe.go b/collector/probe.go
index 4c0f0419..5746f725 100644
--- a/collector/probe.go
+++ b/collector/probe.go
@@ -21,6 +21,7 @@ import (
 	"github.com/go-kit/log/level"
 	"github.com/prometheus-community/postgres_exporter/config"
 	"github.com/prometheus/client_golang/prometheus"
+	"golang.org/x/sync/semaphore"
 )
 
 type ProbeCollector struct {
@@ -28,9 +29,10 @@ type ProbeCollector struct {
 	collectors map[string]Collector
 	logger     log.Logger
 	instance   *instance
+	connSema   *semaphore.Weighted
 }
 
-func NewProbeCollector(logger log.Logger, excludeDatabases []string, registry *prometheus.Registry, dsn config.DSN) (*ProbeCollector, error) {
+func NewProbeCollector(logger log.Logger, excludeDatabases []string, registry *prometheus.Registry, dsn config.DSN, connSema *semaphore.Weighted) (*ProbeCollector, error) {
 	collectors := make(map[string]Collector)
 	initiatedCollectorsMtx.Lock()
 	defer initiatedCollectorsMtx.Unlock()
@@ -68,6 +70,7 @@ func NewProbeCollector(logger log.Logger, excludeDatabases []string, registry *p
 		collectors: collectors,
 		logger:     logger,
 		instance:   instance,
+		connSema:   connSema,
 	}, nil
 }
 
@@ -75,6 +78,12 @@ func (pc *ProbeCollector) Describe(ch chan<- *prometheus.Desc) {
 }
 
 func (pc *ProbeCollector) Collect(ch chan<- prometheus.Metric) {
+	if err := pc.connSema.Acquire(context.TODO(), 1); err != nil {
+		level.Warn(pc.logger).Log("msg", "Failed to acquire semaphore", "err", err)
+		return
+	}
+	defer pc.connSema.Release(1)
+
 	// Set up the database connection for the collector.
 	err := pc.instance.setup()
 	if err != nil {