Fix exclude-databases for collector package

The pg_database collector was not respecting the --exclude-databases flag and causing problems where databases were not accessible. This now respects the list of databases to exclude.

- Adjusts the Collector create func to take a config struct instead of a logger. This allows more changes like this in the future. I figured we would need to do this at some point but I wasn't sure if we could hold off.
- Split the database size collection to a separate query when database is not excluded.
- Comment some probe code that was not useful/accurate

Signed-off-by: Joe Adams <github@joeadams.io>
This commit is contained in:
Joe Adams 2022-10-03 22:30:07 -04:00
parent 2197e73643
commit 799f3e15b2
7 changed files with 97 additions and 39 deletions

View File

@ -14,8 +14,10 @@
package main
import (
"fmt"
"net/http"
"os"
"strings"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
@ -101,13 +103,16 @@ func main() {
os.Exit(1)
}
excludedDatabases := strings.Split(*excludeDatabases, ",")
logger.Log("msg", "Excluded databases", "databases", fmt.Sprintf("%v", excludedDatabases))
opts := []ExporterOpt{
DisableDefaultMetrics(*disableDefaultMetrics),
DisableSettingsMetrics(*disableSettingsMetrics),
AutoDiscoverDatabases(*autoDiscoverDatabases),
WithUserQueriesPath(*queriesPath),
WithConstantLabels(*constantLabelsList),
ExcludeDatabases(*excludeDatabases),
ExcludeDatabases(excludedDatabases),
IncludeDatabases(*includeDatabases),
}
@ -128,6 +133,7 @@ func main() {
pe, err := collector.NewPostgresCollector(
logger,
excludedDatabases,
dsn,
[]string{},
)
@ -143,7 +149,7 @@ func main() {
w.Write(landingPage) // nolint: errcheck
})
http.HandleFunc("/probe", handleProbe(logger))
http.HandleFunc("/probe", handleProbe(logger, excludedDatabases))
srv := &http.Server{}
if err := web.ListenAndServe(srv, webConfig, logger); err != nil {

View File

@ -484,9 +484,9 @@ func AutoDiscoverDatabases(b bool) ExporterOpt {
}
// ExcludeDatabases allows to filter out result from AutoDiscoverDatabases
func ExcludeDatabases(s string) ExporterOpt {
func ExcludeDatabases(s []string) ExporterOpt {
return func(e *Exporter) {
e.excludeDatabases = strings.Split(s, ",")
e.excludeDatabases = s
}
}

View File

@ -16,7 +16,6 @@ package main
import (
"fmt"
"net/http"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
@ -26,7 +25,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
)
func handleProbe(logger log.Logger) http.HandlerFunc {
func handleProbe(logger log.Logger, excludeDatabases []string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
conf := c.GetConfig()
@ -62,21 +61,21 @@ func handleProbe(logger log.Logger) http.HandlerFunc {
// TODO(@sysadmind): Timeout
probeSuccessGauge := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "probe_success",
Help: "Displays whether or not the probe was a success",
})
probeDurationGauge := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "probe_duration_seconds",
Help: "Returns how long the probe took to complete in seconds",
})
// probeSuccessGauge := prometheus.NewGauge(prometheus.GaugeOpts{
// Name: "probe_success",
// Help: "Displays whether or not the probe was a success",
// })
// probeDurationGauge := prometheus.NewGauge(prometheus.GaugeOpts{
// Name: "probe_duration_seconds",
// Help: "Returns how long the probe took to complete in seconds",
// })
tl := log.With(logger, "target", target)
start := time.Now()
// start := time.Now()
registry := prometheus.NewRegistry()
registry.MustRegister(probeSuccessGauge)
registry.MustRegister(probeDurationGauge)
// registry.MustRegister(probeSuccessGauge)
// registry.MustRegister(probeDurationGauge)
opts := []ExporterOpt{
DisableDefaultMetrics(*disableDefaultMetrics),
@ -96,10 +95,10 @@ func handleProbe(logger log.Logger) http.HandlerFunc {
registry.MustRegister(exporter)
// Run the probe
pc, err := collector.NewProbeCollector(tl, registry, dsn)
pc, err := collector.NewProbeCollector(tl, excludeDatabases, registry, dsn)
if err != nil {
probeSuccessGauge.Set(0)
probeDurationGauge.Set(time.Since(start).Seconds())
// probeSuccessGauge.Set(0)
// probeDurationGauge.Set(time.Since(start).Seconds())
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
@ -115,10 +114,6 @@ func handleProbe(logger log.Logger) http.HandlerFunc {
registry.MustRegister(pc)
duration := time.Since(start).Seconds()
probeDurationGauge.Set(duration)
probeSuccessGauge.Set(1)
// TODO check success, etc
h := promhttp.HandlerFor(registry, promhttp.HandlerOpts{})
h.ServeHTTP(w, r)

View File

@ -28,7 +28,7 @@ import (
)
var (
factories = make(map[string]func(logger log.Logger) (Collector, error))
factories = make(map[string]func(collectorConfig) (Collector, error))
initiatedCollectorsMtx = sync.Mutex{}
initiatedCollectors = make(map[string]Collector)
collectorState = make(map[string]*bool)
@ -62,7 +62,12 @@ type Collector interface {
Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error
}
func registerCollector(name string, isDefaultEnabled bool, createFunc func(logger log.Logger) (Collector, error)) {
type collectorConfig struct {
logger log.Logger
excludeDatabases []string
}
func registerCollector(name string, isDefaultEnabled bool, createFunc func(collectorConfig) (Collector, error)) {
var helpDefaultState string
if isDefaultEnabled {
helpDefaultState = "enabled"
@ -93,7 +98,7 @@ type PostgresCollector struct {
type Option func(*PostgresCollector) error
// NewPostgresCollector creates a new PostgresCollector.
func NewPostgresCollector(logger log.Logger, dsn string, filters []string, options ...Option) (*PostgresCollector, error) {
func NewPostgresCollector(logger log.Logger, excludeDatabases []string, dsn string, filters []string, options ...Option) (*PostgresCollector, error) {
p := &PostgresCollector{
logger: logger,
}
@ -126,7 +131,10 @@ func NewPostgresCollector(logger log.Logger, dsn string, filters []string, optio
if collector, ok := initiatedCollectors[key]; ok {
collectors[key] = collector
} else {
collector, err := factories[key](log.With(logger, "collector", key))
collector, err := factories[key](collectorConfig{
logger: log.With(logger, "collector", key),
excludeDatabases: excludeDatabases,
})
if err != nil {
return nil, err
}

View File

@ -26,11 +26,19 @@ func init() {
}
type PGDatabaseCollector struct {
log log.Logger
log log.Logger
excludedDatabases []string
}
func NewPGDatabaseCollector(logger log.Logger) (Collector, error) {
return &PGDatabaseCollector{log: logger}, nil
func NewPGDatabaseCollector(config collectorConfig) (Collector, error) {
exclude := config.excludeDatabases
if exclude == nil {
exclude = []string{}
}
return &PGDatabaseCollector{
log: config.logger,
excludedDatabases: exclude,
}, nil
}
var pgDatabase = map[string]*prometheus.Desc{
@ -41,20 +49,49 @@ var pgDatabase = map[string]*prometheus.Desc{
),
}
func (PGDatabaseCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error {
// Update implements Collector and exposes database size.
// It is called by the Prometheus registry when collecting metrics.
// The list of databases is retrieved from pg_database and filtered
// by the excludeDatabase config parameter. The tradeoff here is that
// we have to query the list of databases and then query the size of
// each database individually. This is because we can't filter the
// list of databases in the query because the list of excluded
// databases is dynamic.
func (c PGDatabaseCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error {
// Query the list of databases
rows, err := db.QueryContext(ctx,
`SELECT pg_database.datname
,pg_database_size(pg_database.datname)
FROM pg_database;`)
FROM pg_database;
`,
)
if err != nil {
return err
}
defer rows.Close()
var databases []string
for rows.Next() {
var datname string
if err := rows.Scan(&datname); err != nil {
return err
}
// Ignore excluded databases
// Filtering is done here instead of in the query to avoid
// a complicated NOT IN query with a variable number of parameters
if sliceContains(c.excludedDatabases, datname) {
continue
}
databases = append(databases, datname)
}
// Query the size of the databases
for _, datname := range databases {
var size int64
if err := rows.Scan(&datname, &size); err != nil {
err = db.QueryRowContext(ctx, "SELECT pg_database_size($1)", datname).Scan(&size)
if err != nil {
return err
}
@ -68,3 +105,12 @@ func (PGDatabaseCollector) Update(ctx context.Context, db *sql.DB, ch chan<- pro
}
return nil
}
func sliceContains(slice []string, s string) bool {
for _, item := range slice {
if item == s {
return true
}
}
return false
}

View File

@ -18,7 +18,6 @@ import (
"database/sql"
"time"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
)
@ -29,7 +28,7 @@ func init() {
type PGStatBGWriterCollector struct {
}
func NewPGStatBGWriterCollector(logger log.Logger) (Collector, error) {
func NewPGStatBGWriterCollector(collectorConfig) (Collector, error) {
return &PGStatBGWriterCollector{}, nil
}

View File

@ -30,7 +30,7 @@ type ProbeCollector struct {
db *sql.DB
}
func NewProbeCollector(logger log.Logger, registry *prometheus.Registry, dsn config.DSN) (*ProbeCollector, error) {
func NewProbeCollector(logger log.Logger, excludeDatabases []string, registry *prometheus.Registry, dsn config.DSN) (*ProbeCollector, error) {
collectors := make(map[string]Collector)
initiatedCollectorsMtx.Lock()
defer initiatedCollectorsMtx.Unlock()
@ -45,7 +45,11 @@ func NewProbeCollector(logger log.Logger, registry *prometheus.Registry, dsn con
if collector, ok := initiatedCollectors[key]; ok {
collectors[key] = collector
} else {
collector, err := factories[key](log.With(logger, "collector", key))
collector, err := factories[key](
collectorConfig{
logger: log.With(logger, "collector", key),
excludeDatabases: excludeDatabases,
})
if err != nil {
return nil, err
}