Revert "PMM-12154 performance improvement."

This reverts commit 3e125ad52b.
Artem Gavrilov 2024-02-14 16:03:49 +02:00
parent 0b10d1aa11
commit 4328194f15
6 changed files with 57 additions and 78 deletions

View File

@@ -49,17 +49,19 @@ func (e *Exporter) discoverDatabaseDSNs() []string {
 			continue
 		}
-		server, err := e.servers.GetServer(dsn, e.resolutionEnabled)
+		server, err := e.servers.GetServer(dsn)
 		if err != nil {
 			level.Error(logger).Log("msg", "Error opening connection to database", "dsn", loggableDSN(dsn), "err", err)
 			continue
 		}
+		server.dbMtx.Lock()
 		dsns[dsn] = struct{}{}
 		// If autoDiscoverDatabases is true, set first dsn as master database (Default: false)
 		server.master = true
 		databaseNames, err := queryDatabases(server)
+		server.dbMtx.Unlock()
 		if err != nil {
 			level.Error(logger).Log("msg", "Error querying databases", "dsn", loggableDSN(dsn), "err", err)
 			continue
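The hunk above restores a per-Server mutex (dbMtx) so that discovery queries do not interleave with scrapes on the shared single connection. A minimal standalone sketch of that pattern, with simplified types and an illustrative SQL statement (not the exporter's actual queryDatabases):

package sketch

import (
	"database/sql"
	"sync"

	_ "github.com/lib/pq" // Postgres driver assumed here, registered under the "postgres" name.
)

// server is a simplified stand-in for the exporter's Server type:
// one shared *sql.DB guarded by a mutex so only one caller queries at a time.
type server struct {
	db    *sql.DB
	dbMtx sync.Mutex
}

// queryDatabases lists databases while holding dbMtx, mirroring how
// discoverDatabaseDSNs brackets queryDatabases with Lock/Unlock above.
func (s *server) queryDatabases() ([]string, error) {
	s.dbMtx.Lock()
	defer s.dbMtx.Unlock()

	rows, err := s.db.Query("SELECT datname FROM pg_database WHERE datallowconn")
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var names []string
	for rows.Next() {
		var name string
		if err := rows.Scan(&name); err != nil {
			return nil, err
		}
		names = append(names, name)
	}
	return names, rows.Err()
}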
@@ -100,7 +102,11 @@ func (e *Exporter) discoverDatabaseDSNs() []string {
 }
 func (e *Exporter) scrapeDSN(ch chan<- prometheus.Metric, dsn string) error {
-	server, err := e.servers.GetServer(dsn, e.resolutionEnabled)
+	server, err := e.servers.GetServer(dsn)
+	server.dbMtx.Lock()
+	defer server.dbMtx.Unlock()
+	level.Debug(logger).Log("msg", "scrapeDSN:"+dsn)
 	if err != nil {
 		return &ErrorConnectToServer{fmt.Sprintf("Error opening connection to database (%s): %s", loggableDSN(dsn), err.Error())}
@@ -116,7 +122,7 @@ func (e *Exporter) scrapeDSN(ch chan<- prometheus.Metric, dsn string) error {
 		level.Warn(logger).Log("msg", "Proceeding with outdated query maps, as the Postgres version could not be determined", "err", err)
 	}
-	return server.Scrape(ch, e.disableSettingsMetrics)
+	return server.Scrape(ch, e.disableSettingsMetrics, e.resolutionEnabled)
 }
 // try to get the DataSource
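Both call sites in this file show the same shape change: after the revert, the resolution is no longer part of the server lookup. GetServer takes only the DSN, and the resolution is handed to Scrape instead. A schematic sketch of that flow, with heavily pared-down placeholder types:

package sketch

import "github.com/prometheus/client_golang/prometheus"

// MetricResolution matches the type used in the restored signatures.
type MetricResolution string

// Server and Servers are reduced to the call shape visible in the diff.
type Server struct{}

func (s *Server) Scrape(ch chan<- prometheus.Metric, disableSettingsMetrics bool, res MetricResolution) error {
	// The real method ends in queryNamespaceMappings(ch, s, res); elided here.
	return nil
}

type Servers struct{ servers map[string]*Server }

// GetServer is keyed by DSN alone after the revert.
func (s *Servers) GetServer(dsn string) (*Server, error) {
	if srv, ok := s.servers[dsn]; ok {
		return srv, nil
	}
	return &Server{}, nil // the real code creates, caches, and pings; see the last hunk below
}

// scrapeDSN shows the flow above: resolve the cached server by DSN, then pass
// the resolution at scrape time rather than at lookup time.
func scrapeDSN(servers *Servers, ch chan<- prometheus.Metric, dsn string, res MetricResolution) error {
	server, err := servers.GetServer(dsn)
	if err != nil {
		return err
	}
	return server.Scrape(ch, false, res)
}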

View File

@@ -183,7 +183,7 @@ func queryNamespaceMapping(server *Server, namespace string, mapping MetricMapNa
 // Iterate through all the namespace mappings in the exporter and run their
 // queries.
-func queryNamespaceMappings(ch chan<- prometheus.Metric, server *Server) map[string]error {
+func queryNamespaceMappings(ch chan<- prometheus.Metric, server *Server, res MetricResolution) map[string]error {
 	// Return a map of namespace -> errors
 	namespaceErrors := make(map[string]error)

View File

@@ -2,7 +2,6 @@ package main
 import (
 	"crypto/sha256"
-	"database/sql"
 	"fmt"
 	"os"
 	"path/filepath"
@@ -51,7 +50,7 @@ func initializePerconaExporters(dsn []string, servers *Servers) (func(), *Export
 	hrExporter := NewExporter(dsn,
 		append(opts,
 			CollectorName("custom_query.hr"),
-			WithUserQueriesResolutionEnabled(HR),
+			WithUserQueriesEnabled(HR),
 			WithEnabled(*collectCustomQueryHr),
 			WithConstantLabels(*constantLabelsList),
 		)...,
@@ -61,7 +60,7 @@ func initializePerconaExporters(dsn []string, servers *Servers) (func(), *Export
 	mrExporter := NewExporter(dsn,
 		append(opts,
 			CollectorName("custom_query.mr"),
-			WithUserQueriesResolutionEnabled(MR),
+			WithUserQueriesEnabled(MR),
 			WithEnabled(*collectCustomQueryMr),
 			WithConstantLabels(*constantLabelsList),
 		)...,
@@ -71,7 +70,7 @@ func initializePerconaExporters(dsn []string, servers *Servers) (func(), *Export
 	lrExporter := NewExporter(dsn,
 		append(opts,
 			CollectorName("custom_query.lr"),
-			WithUserQueriesResolutionEnabled(LR),
+			WithUserQueriesEnabled(LR),
 			WithEnabled(*collectCustomQueryLr),
 			WithConstantLabels(*constantLabelsList),
 		)...,
@@ -128,21 +127,3 @@ func (e *Exporter) addCustomQueriesFromFile(path string, version semver.Version,
 	// Mark user queries as successfully loaded
 	e.userQueriesError.WithLabelValues(path, hashsumStr).Set(0)
 }
-// NewDB establishes a new connection using DSN.
-func NewDB(dsn string) (*sql.DB, error) {
-	fingerprint, err := parseFingerprint(dsn)
-	if err != nil {
-		return nil, err
-	}
-	db, err := sql.Open("postgres", dsn)
-	if err != nil {
-		return nil, err
-	}
-	db.SetMaxOpenConns(1)
-	db.SetMaxIdleConns(1)
-	level.Info(logger).Log("msg", "Established new database connection", "fingerprint", fingerprint)
-	return db, nil
-}

View File

@@ -452,14 +452,14 @@ func CollectorName(name string) ExporterOpt {
 	}
 }
-// WithUserQueriesResolutionEnabled enables resolution for user's queries.
-func WithUserQueriesResolutionEnabled(p MetricResolution) ExporterOpt {
+// WithUserQueriesEnabled enables user's queries.
+func WithUserQueriesEnabled(p MetricResolution) ExporterOpt {
 	return func(e *Exporter) {
 		e.resolutionEnabled = p
 	}
 }
-// WithEnabled enables user's queries.
+// WithUserQueriesEnabled enables user's queries.
 func WithEnabled(p bool) ExporterOpt {
 	return func(e *Exporter) {
 		e.enabled = p
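These are plain functional options: each one is a closure that mutates the Exporter being built. A condensed sketch of the pattern as it reads after the revert; the field and option names come from the diff, while the trimmed Exporter struct and the argument-free NewExporter are simplifications (the real constructor takes the DSNs first):

package sketch

type MetricResolution string

type Exporter struct {
	resolutionEnabled MetricResolution
	enabled           bool
}

// ExporterOpt configures one aspect of an Exporter.
type ExporterOpt func(*Exporter)

// WithUserQueriesEnabled records which resolution this exporter serves.
func WithUserQueriesEnabled(p MetricResolution) ExporterOpt {
	return func(e *Exporter) { e.resolutionEnabled = p }
}

// WithEnabled toggles the collector on or off.
func WithEnabled(p bool) ExporterOpt {
	return func(e *Exporter) { e.enabled = p }
}

// NewExporter applies the options in order, e.g.
// NewExporter(WithUserQueriesEnabled("hr"), WithEnabled(true)).
func NewExporter(opts ...ExporterOpt) *Exporter {
	e := &Exporter{}
	for _, opt := range opts {
		opt(e)
	}
	return e
}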
@@ -655,31 +655,31 @@ func (e *Exporter) checkMapVersions(ch chan<- prometheus.Metric, server *Server)
 	}
 	// Check if semantic version changed and recalculate maps if needed.
-	if semanticVersion.NE(server.lastMapVersion) || server.metricMap == nil {
-		level.Info(logger).Log("msg", "Semantic version changed", "server", server, "from", server.lastMapVersion, "to", semanticVersion)
+	//if semanticVersion.NE(server.lastMapVersion[e.resolutionEnabled]) || server.metricMap == nil {
+	//	level.Info(logger).Log("msg", "Semantic version changed", "server", server, "from", server.lastMapVersion[e.resolutionEnabled], "to", semanticVersion)
 	server.mappingMtx.Lock()
 	// Get Default Metrics only for master database
 	if !e.disableDefaultMetrics && server.master {
 		server.metricMap = makeDescMap(semanticVersion, server.labels, e.builtinMetricMaps)
 		server.queryOverrides = makeQueryOverrideMap(semanticVersion, queryOverrides)
 	} else {
 		server.metricMap = make(map[string]MetricMapNamespace)
 		server.queryOverrides = make(map[string]string)
-	}
-	server.lastMapVersion = semanticVersion
-	if e.userQueriesPath[e.resolutionEnabled] != "" {
-		// Clear the metric while reload
-		e.userQueriesError.Reset()
-	}
-	e.loadCustomQueries(e.resolutionEnabled, semanticVersion, server)
-	server.mappingMtx.Unlock()
 	}
+	//server.lastMapVersion[e.resolutionEnabled] = semanticVersion
+	if e.userQueriesPath[HR] != "" || e.userQueriesPath[MR] != "" || e.userQueriesPath[LR] != "" {
+		// Clear the metric while reload
+		e.userQueriesError.Reset()
+	}
+	e.loadCustomQueries(e.resolutionEnabled, semanticVersion, server)
+	server.mappingMtx.Unlock()
+	//}
 	// Output the version as a special metric only for master database
 	versionDesc := prometheus.NewDesc(fmt.Sprintf("%s_%s", namespace, staticLabelName),
 		"Version string as reported by postgres", []string{"version", "short_version"}, server.labels)

View File

@@ -62,11 +62,7 @@ func (s *IntegrationSuite) TestAllNamespacesReturnResults(c *C) {
 	for _, dsn := range s.e.dsn {
 		// Open a database connection
-		db, err := NewDB(dsn)
-		c.Assert(db, NotNil)
-		c.Assert(err, IsNil)
-		server, err := NewServer(dsn, db)
+		server, err := NewServer(dsn)
 		c.Assert(server, NotNil)
 		c.Assert(err, IsNil)

View File

@@ -28,6 +28,7 @@ import (
 // Also it contains metrics map and query overrides.
 type Server struct {
 	db          *sql.DB
+	dbMtx       sync.Mutex
 	labels      prometheus.Labels
 	master      bool
 	runonserver string
@@ -59,12 +60,21 @@ func ServerWithLabels(labels prometheus.Labels) ServerOpt {
 }
 // NewServer establishes a new connection using DSN.
-func NewServer(dsn string, db *sql.DB, opts ...ServerOpt) (*Server, error) {
+func NewServer(dsn string, opts ...ServerOpt) (*Server, error) {
 	fingerprint, err := parseFingerprint(dsn)
 	if err != nil {
 		return nil, err
 	}
+	db, err := sql.Open("postgres", dsn)
+	if err != nil {
+		return nil, err
+	}
+	db.SetMaxOpenConns(1)
+	db.SetMaxIdleConns(1)
+	level.Info(logger).Log("msg", "Established new database connection", "fingerprint", fingerprint)
 	s := &Server{
 		db:     db,
 		master: false,
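With NewDB gone, NewServer owns connection setup again, capped at one open and one idle connection per DSN. The same setup as a standalone helper, assuming a driver registered under the "postgres" name (lib/pq here):

package sketch

import (
	"database/sql"

	_ "github.com/lib/pq" // assumed driver registering the "postgres" name
)

// newDB shows the connection setup NewServer takes back over after the revert:
// a single open and a single idle connection, so each Server owns exactly one
// Postgres session.
func newDB(dsn string) (*sql.DB, error) {
	db, err := sql.Open("postgres", dsn) // lazy: no network I/O until first use
	if err != nil {
		return nil, err
	}
	db.SetMaxOpenConns(1)
	db.SetMaxIdleConns(1)
	return db, nil
}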
@@ -103,7 +113,7 @@ func (s *Server) String() string {
 }
 // Scrape loads metrics.
-func (s *Server) Scrape(ch chan<- prometheus.Metric, disableSettingsMetrics bool) error {
+func (s *Server) Scrape(ch chan<- prometheus.Metric, disableSettingsMetrics bool, res MetricResolution) error {
 	s.mappingMtx.RLock()
 	defer s.mappingMtx.RUnlock()
@@ -115,7 +125,7 @@ func (s *Server) Scrape(ch chan<- prometheus.Metric, disableSettingsMetrics bool
 		}
 	}
-	errMap := queryNamespaceMappings(ch, s)
+	errMap := queryNamespaceMappings(ch, s, res)
 	if len(errMap) > 0 {
 		err = fmt.Errorf("queryNamespaceMappings returned %d errors", len(errMap))
 		level.Error(logger).Log("msg", "NAMESPACE ERRORS FOUND")
@@ -131,7 +141,6 @@ func (s *Server) Scrape(ch chan<- prometheus.Metric, disableSettingsMetrics bool
 type Servers struct {
 	m       sync.Mutex
 	servers map[string]*Server
-	dbs     map[string]*sql.DB
 	opts    []ServerOpt
 }
@@ -139,47 +148,34 @@ type Servers struct {
 func NewServers(opts ...ServerOpt) *Servers {
 	return &Servers{
 		servers: make(map[string]*Server),
-		dbs:     make(map[string]*sql.DB),
 		opts:    opts,
 	}
 }
 // GetServer returns established connection from a collection.
-func (s *Servers) GetServer(dsn string, res MetricResolution) (*Server, error) {
+func (s *Servers) GetServer(dsn string) (*Server, error) {
 	s.m.Lock()
 	defer s.m.Unlock()
 	var err error
 	var ok bool
 	errCount := 0 // start at zero because we increment before doing work
 	retries := 1
-	var db *sql.DB
 	var server *Server
 	for {
 		if errCount++; errCount > retries {
 			return nil, err
 		}
-		db, ok = s.dbs[dsn]
+		server, ok = s.servers[dsn]
 		if !ok {
-			db, err = NewDB(dsn)
+			server, err = NewServer(dsn, s.opts...)
 			if err != nil {
 				time.Sleep(time.Duration(errCount) * time.Second)
 				continue
 			}
-			s.dbs[dsn] = db
-		}
-		key := dsn + ":" + string(res)
-		server, ok = s.servers[key]
-		if !ok {
-			server, err = NewServer(dsn, db, s.opts...)
-			if err != nil {
-				time.Sleep(time.Duration(errCount) * time.Second)
-				continue
-			}
-			s.servers[key] = server
+			s.servers[dsn] = server
 		}
 		if err = server.Ping(); err != nil {
-			delete(s.servers, key)
-			delete(s.dbs, dsn)
+			delete(s.servers, dsn)
 			time.Sleep(time.Duration(errCount) * time.Second)
 			continue
 		}
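The final hunk restores the DSN-keyed cache with its ping-and-retry loop. A self-contained sketch of that loop; conn and dial stand in for the exporter's Server and NewServer, and the attempt budget (retries = 1) mirrors the restored code:

package sketch

import (
	"fmt"
	"sync"
	"time"
)

// conn stands in for the exporter's Server; Ping is all the cache needs here.
type conn interface{ Ping() error }

type cache struct {
	m       sync.Mutex
	servers map[string]conn
	dial    func(dsn string) (conn, error) // stand-in for NewServer
}

// get mirrors the restored GetServer loop: look up by DSN, create on a miss,
// verify with Ping, and on failure drop the entry, back off, and try again
// until the attempt budget is spent.
func (c *cache) get(dsn string) (conn, error) {
	c.m.Lock()
	defer c.m.Unlock()

	var err error
	errCount := 0 // incremented before each attempt, as in the original
	retries := 1
	for {
		if errCount++; errCount > retries {
			return nil, fmt.Errorf("giving up on %q: %w", dsn, err)
		}
		srv, ok := c.servers[dsn]
		if !ok {
			if srv, err = c.dial(dsn); err != nil {
				time.Sleep(time.Duration(errCount) * time.Second)
				continue
			}
			c.servers[dsn] = srv
		}
		if err = srv.Ping(); err != nil {
			delete(c.servers, dsn)
			time.Sleep(time.Duration(errCount) * time.Second)
			continue
		}
		return srv, nil
	}
}

On a failed attempt the entry is dropped and the loop backs off before re-checking the budget, so with retries = 1 a single failure is terminal, matching the behaviour of the restored code above.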