WIP: Add prelim multi-target support

- Remove multi server support from new collector package
- Add http handler for multi-target support

Signed-off-by: Joe Adams <github@joeadams.io>
This commit is contained in:
Joe Adams 2022-02-25 11:45:15 -05:00
parent 58cc383e8c
commit 713461df98
8 changed files with 239 additions and 171 deletions

View File

@ -162,6 +162,12 @@ func getDataSources() ([]string, error) {
uri = os.Getenv("DATA_SOURCE_URI")
}
// No datasources found. This allows us to support the multi-target pattern
// without an explicit datasource.
if uri == "" {
return []string{}, nil
}
dsn = "postgresql://" + ui + "@" + uri
return []string{dsn}, nil

View File

@ -85,16 +85,17 @@ func main() {
return
}
dsn, err := getDataSources()
dsns, err := getDataSources()
if err != nil {
level.Error(logger).Log("msg", "Failed reading data sources", "err", err.Error())
os.Exit(1)
}
if len(dsn) == 0 {
level.Error(logger).Log("msg", "Couldn't find environment variables describing the datasource to use")
os.Exit(1)
}
// TODO(@sysadmind): Remove this with multi-target support
// if len(dsn) == 0 {
// level.Error(logger).Log("msg", "Couldn't find environment variables describing the datasource to use")
// os.Exit(1)
// }
opts := []ExporterOpt{
DisableDefaultMetrics(*disableDefaultMetrics),
@ -106,7 +107,7 @@ func main() {
IncludeDatabases(*includeDatabases),
}
exporter := NewExporter(dsn, opts...)
exporter := NewExporter(dsns, opts...)
defer func() {
exporter.servers.Close()
}()
@ -115,6 +116,12 @@ func main() {
prometheus.MustRegister(exporter)
// TODO(@sysadmind): Remove this with multi-target support. We are removing multiple DSN support
dsn := ""
if len(dsns) > 0 {
dsn = dsns[0]
}
pe, err := collector.NewPostgresCollector(
logger,
dsn,
@ -122,9 +129,9 @@ func main() {
)
if err != nil {
level.Error(logger).Log("msg", "Failed to create PostgresCollector", "err", err.Error())
os.Exit(1)
} else {
prometheus.MustRegister(pe)
}
prometheus.MustRegister(pe)
http.Handle(*metricPath, promhttp.Handler())
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
@ -132,6 +139,8 @@ func main() {
w.Write(landingPage) // nolint: errcheck
})
http.HandleFunc("/probe", handleProbe(logger))
level.Info(logger).Log("msg", "Listening on address", "address", *listenAddress)
srv := &http.Server{Addr: *listenAddress}
if err := web.ListenAndServe(srv, *webConfig, logger); err != nil {

View File

@ -0,0 +1,91 @@
// Copyright 2022 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"net/http"
"time"
"github.com/go-kit/log"
"github.com/prometheus-community/postgres_exporter/collector"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// handleProbe returns an http.HandlerFunc implementing the multi-target
// ("probe") scrape pattern: the target DSN comes from the ?target= query
// parameter, and a fresh registry is built per request so metrics from
// different targets never mix.
func handleProbe(logger log.Logger) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		params := r.URL.Query()
		target := params.Get("target")
		if target == "" {
			http.Error(w, "target is required", http.StatusBadRequest)
			return
		}

		// TODO: Timeout
		// TODO: Auth Module

		probeSuccessGauge := prometheus.NewGauge(prometheus.GaugeOpts{
			Name: "probe_success",
			Help: "Displays whether or not the probe was a success",
		})
		probeDurationGauge := prometheus.NewGauge(prometheus.GaugeOpts{
			Name: "probe_duration_seconds",
			Help: "Returns how long the probe took to complete in seconds",
		})

		tl := log.With(logger, "target", target)

		start := time.Now()
		registry := prometheus.NewRegistry()
		registry.MustRegister(probeSuccessGauge)
		registry.MustRegister(probeDurationGauge)

		// Run the probe against the requested target. (Previously the target
		// was overwritten here with a hardcoded localhost test DSN, which made
		// every probe ignore its ?target= parameter — removed.)
		pc, err := collector.NewProbeCollector(tl, registry, target)
		if err != nil {
			probeSuccessGauge.Set(0)
			probeDurationGauge.Set(time.Since(start).Seconds())
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}

		// TODO: Which way should this be? Register or handle the collection manually?
		// Option 1: Register the collector; promhttp gathers it below.
		registry.MustRegister(pc)
		// Option 2: Handle the collection manually via pc.Update(r.Context()),
		// which would let us observe per-probe success directly. The collectors
		// themselves already support their own duration metrics.

		// TODO(review): success here only means the collector was constructed
		// and registered; wire in pc.Update to reflect actual collection outcome.
		probeSuccessGauge.Set(1)
		probeDurationGauge.Set(time.Since(start).Seconds())

		h := promhttp.HandlerFor(registry, promhttp.HandlerOpts{})
		h.ServeHTTP(w, r)
	}
}

View File

@ -15,6 +15,7 @@ package collector
import (
"context"
"database/sql"
"errors"
"fmt"
"sync"
@ -58,7 +59,7 @@ var (
)
type Collector interface {
Update(ctx context.Context, server *server, ch chan<- prometheus.Metric) error
Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error
}
func registerCollector(name string, isDefaultEnabled bool, createFunc func(logger log.Logger) (Collector, error)) {
@ -86,13 +87,13 @@ type PostgresCollector struct {
Collectors map[string]Collector
logger log.Logger
servers map[string]*server
db *sql.DB
}
type Option func(*PostgresCollector) error
// NewPostgresCollector creates a new PostgresCollector.
func NewPostgresCollector(logger log.Logger, dsns []string, filters []string, options ...Option) (*PostgresCollector, error) {
func NewPostgresCollector(logger log.Logger, dsn string, filters []string, options ...Option) (*PostgresCollector, error) {
p := &PostgresCollector{
logger: logger,
}
@ -136,17 +137,18 @@ func NewPostgresCollector(logger log.Logger, dsns []string, filters []string, op
p.Collectors = collectors
servers := make(map[string]*server)
for _, dsn := range dsns {
s, err := makeServer(dsn)
if err != nil {
return nil, err
}
servers[dsn] = s
if dsn == "" {
return nil, errors.New("empty dsn")
}
p.servers = servers
db, err := sql.Open("postgres", dsn)
if err != nil {
return nil, err
}
db.SetMaxOpenConns(1)
db.SetMaxIdleConns(1)
p.db = db
return p, nil
}
@ -160,32 +162,20 @@ func (p PostgresCollector) Describe(ch chan<- *prometheus.Desc) {
// Collect implements the prometheus.Collector interface.
func (p PostgresCollector) Collect(ch chan<- prometheus.Metric) {
ctx := context.TODO()
wg := sync.WaitGroup{}
wg.Add(len(p.servers))
for _, s := range p.servers {
go func(s *server) {
p.subCollect(ctx, s, ch)
wg.Done()
}(s)
}
wg.Wait()
}
func (p PostgresCollector) subCollect(ctx context.Context, server *server, ch chan<- prometheus.Metric) {
wg := sync.WaitGroup{}
wg.Add(len(p.Collectors))
for name, c := range p.Collectors {
go func(name string, c Collector) {
execute(ctx, name, c, server, ch, p.logger)
execute(ctx, name, c, p.db, ch, p.logger)
wg.Done()
}(name, c)
}
wg.Wait()
}
func execute(ctx context.Context, name string, c Collector, s *server, ch chan<- prometheus.Metric, logger log.Logger) {
func execute(ctx context.Context, name string, c Collector, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) {
begin := time.Now()
err := c.Update(ctx, s, ch)
err := c.Update(ctx, db, ch)
duration := time.Since(begin)
var success float64

View File

@ -15,6 +15,7 @@ package collector
import (
"context"
"database/sql"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
@ -36,15 +37,11 @@ var pgDatabase = map[string]*prometheus.Desc{
"size_bytes": prometheus.NewDesc(
"pg_database_size_bytes",
"Disk space used by the database",
[]string{"datname", "server"}, nil,
[]string{"datname"}, nil,
),
}
func (PGDatabaseCollector) Update(ctx context.Context, server *server, ch chan<- prometheus.Metric) error {
db, err := server.GetDB()
if err != nil {
return err
}
func (PGDatabaseCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error {
rows, err := db.QueryContext(ctx,
`SELECT pg_database.datname
,pg_database_size(pg_database.datname)
@ -63,7 +60,7 @@ func (PGDatabaseCollector) Update(ctx context.Context, server *server, ch chan<-
ch <- prometheus.MustNewConstMetric(
pgDatabase["size_bytes"],
prometheus.GaugeValue, float64(size), datname, server.GetName(),
prometheus.GaugeValue, float64(size), datname,
)
}
if err := rows.Err(); err != nil {

View File

@ -15,6 +15,7 @@ package collector
import (
"context"
"database/sql"
"time"
"github.com/go-kit/log"
@ -38,77 +39,72 @@ var statBGWriter = map[string]*prometheus.Desc{
"checkpoints_timed": prometheus.NewDesc(
prometheus.BuildFQName(namespace, bgWriterSubsystem, "checkpoints_timed_total"),
"Number of scheduled checkpoints that have been performed",
[]string{"server"},
[]string{},
prometheus.Labels{},
),
"checkpoints_req": prometheus.NewDesc(
prometheus.BuildFQName(namespace, bgWriterSubsystem, "checkpoints_req_total"),
"Number of requested checkpoints that have been performed",
[]string{"server"},
[]string{},
prometheus.Labels{},
),
"checkpoint_write_time": prometheus.NewDesc(
prometheus.BuildFQName(namespace, bgWriterSubsystem, "checkpoint_write_time_total"),
"Total amount of time that has been spent in the portion of checkpoint processing where files are written to disk, in milliseconds",
[]string{"server"},
[]string{},
prometheus.Labels{},
),
"checkpoint_sync_time": prometheus.NewDesc(
prometheus.BuildFQName(namespace, bgWriterSubsystem, "checkpoint_sync_time_total"),
"Total amount of time that has been spent in the portion of checkpoint processing where files are synchronized to disk, in milliseconds",
[]string{"server"},
[]string{},
prometheus.Labels{},
),
"buffers_checkpoint": prometheus.NewDesc(
prometheus.BuildFQName(namespace, bgWriterSubsystem, "buffers_checkpoint_total"),
"Number of buffers written during checkpoints",
[]string{"server"},
[]string{},
prometheus.Labels{},
),
"buffers_clean": prometheus.NewDesc(
prometheus.BuildFQName(namespace, bgWriterSubsystem, "buffers_clean_total"),
"Number of buffers written by the background writer",
[]string{"server"},
[]string{},
prometheus.Labels{},
),
"maxwritten_clean": prometheus.NewDesc(
prometheus.BuildFQName(namespace, bgWriterSubsystem, "maxwritten_clean_total"),
"Number of times the background writer stopped a cleaning scan because it had written too many buffers",
[]string{"server"},
[]string{},
prometheus.Labels{},
),
"buffers_backend": prometheus.NewDesc(
prometheus.BuildFQName(namespace, bgWriterSubsystem, "buffers_backend_total"),
"Number of buffers written directly by a backend",
[]string{"server"},
[]string{},
prometheus.Labels{},
),
"buffers_backend_fsync": prometheus.NewDesc(
prometheus.BuildFQName(namespace, bgWriterSubsystem, "buffers_backend_fsync_total"),
"Number of times a backend had to execute its own fsync call (normally the background writer handles those even when the backend does its own write)",
[]string{"server"},
[]string{},
prometheus.Labels{},
),
"buffers_alloc": prometheus.NewDesc(
prometheus.BuildFQName(namespace, bgWriterSubsystem, "buffers_alloc_total"),
"Number of buffers allocated",
[]string{"server"},
[]string{},
prometheus.Labels{},
),
"stats_reset": prometheus.NewDesc(
prometheus.BuildFQName(namespace, bgWriterSubsystem, "stats_reset_total"),
"Time at which these statistics were last reset",
[]string{"server"},
[]string{},
prometheus.Labels{},
),
}
func (PGStatBGWriterCollector) Update(ctx context.Context, server *server, ch chan<- prometheus.Metric) error {
db, err := server.GetDB()
if err != nil {
return err
}
func (PGStatBGWriterCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error {
row := db.QueryRowContext(ctx,
`SELECT
checkpoints_timed
@ -136,7 +132,7 @@ func (PGStatBGWriterCollector) Update(ctx context.Context, server *server, ch ch
var ba int
var sr time.Time
err = row.Scan(&cpt, &cpr, &cpwt, &cpst, &bcp, &bc, &mwc, &bb, &bbf, &ba, &sr)
err := row.Scan(&cpt, &cpr, &cpwt, &cpst, &bcp, &bc, &mwc, &bb, &bbf, &ba, &sr)
if err != nil {
return err
}
@ -145,67 +141,56 @@ func (PGStatBGWriterCollector) Update(ctx context.Context, server *server, ch ch
statBGWriter["checkpoints_timed"],
prometheus.CounterValue,
float64(cpt),
server.GetName(),
)
ch <- prometheus.MustNewConstMetric(
statBGWriter["checkpoints_req"],
prometheus.CounterValue,
float64(cpr),
server.GetName(),
)
ch <- prometheus.MustNewConstMetric(
statBGWriter["checkpoint_write_time"],
prometheus.CounterValue,
float64(cpwt),
server.GetName(),
)
ch <- prometheus.MustNewConstMetric(
statBGWriter["checkpoint_sync_time"],
prometheus.CounterValue,
float64(cpst),
server.GetName(),
)
ch <- prometheus.MustNewConstMetric(
statBGWriter["buffers_checkpoint"],
prometheus.CounterValue,
float64(bcp),
server.GetName(),
)
ch <- prometheus.MustNewConstMetric(
statBGWriter["buffers_clean"],
prometheus.CounterValue,
float64(bc),
server.GetName(),
)
ch <- prometheus.MustNewConstMetric(
statBGWriter["maxwritten_clean"],
prometheus.CounterValue,
float64(mwc),
server.GetName(),
)
ch <- prometheus.MustNewConstMetric(
statBGWriter["buffers_backend"],
prometheus.CounterValue,
float64(bb),
server.GetName(),
)
ch <- prometheus.MustNewConstMetric(
statBGWriter["buffers_backend_fsync"],
prometheus.CounterValue,
float64(bbf),
server.GetName(),
)
ch <- prometheus.MustNewConstMetric(
statBGWriter["buffers_alloc"],
prometheus.CounterValue,
float64(ba),
server.GetName(),
)
ch <- prometheus.MustNewConstMetric(
statBGWriter["stats_reset"],
prometheus.CounterValue,
float64(sr.Unix()),
server.GetName(),
)
return nil

90
collector/probe.go Normal file
View File

@ -0,0 +1,90 @@
// Copyright 2022 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector
import (
"context"
"database/sql"
"fmt"
"strings"
"sync"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
)
// ProbeCollector gathers metrics for a single probed target. It owns the
// per-request registry it was constructed with, the set of enabled
// collectors, and a dedicated database connection for the target DSN.
type ProbeCollector struct {
	registry   *prometheus.Registry
	collectors map[string]Collector
	logger     log.Logger
	// db is the connection pool for this probe's target, capped at one
	// open/idle connection by NewProbeCollector.
	db *sql.DB
}
// NewProbeCollector builds a ProbeCollector for the given target DSN,
// instantiating (or reusing from the package cache) every enabled collector
// and opening a single-connection pool to the target.
func NewProbeCollector(logger log.Logger, registry *prometheus.Registry, dsn string) (*ProbeCollector, error) {
	collectors := make(map[string]Collector)
	initiatedCollectorsMtx.Lock()
	defer initiatedCollectorsMtx.Unlock()
	for key, enabled := range collectorState {
		// TODO: Handle filters
		// if !*enabled || (len(f) > 0 && !f[key]) {
		// 	continue
		// }
		if !*enabled {
			continue
		}
		// Reuse an already-initiated collector if available; otherwise create
		// one and cache it for subsequent probes.
		if collector, ok := initiatedCollectors[key]; ok {
			collectors[key] = collector
		} else {
			collector, err := factories[key](log.With(logger, "collector", key))
			if err != nil {
				return nil, err
			}
			collectors[key] = collector
			initiatedCollectors[key] = collector
		}
	}

	// Normalize bare "host:port/db" targets into URL form. Accept both URL
	// schemes libpq recognizes; previously only "postgres://" was checked, so
	// a valid "postgresql://" DSN was double-prefixed into an invalid URL.
	if !strings.HasPrefix(dsn, "postgres://") && !strings.HasPrefix(dsn, "postgresql://") {
		dsn = fmt.Sprintf("postgres://%s", dsn)
	}

	db, err := sql.Open("postgres", dsn)
	if err != nil {
		return nil, err
	}
	// One connection is enough for a single scrape and keeps probe load on
	// the target predictable.
	db.SetMaxOpenConns(1)
	db.SetMaxIdleConns(1)

	return &ProbeCollector{
		registry:   registry,
		collectors: collectors,
		logger:     logger,
		db:         db,
	}, nil
}
// Describe implements prometheus.Collector. It intentionally sends no
// descriptors, which makes this an "unchecked" collector: its metrics are
// validated at gather time rather than registration time.
func (pc *ProbeCollector) Describe(ch chan<- *prometheus.Desc) {
}
// Collect implements prometheus.Collector by running every enabled
// collector against this probe's database connection concurrently and
// blocking until all of them have finished.
func (pc *ProbeCollector) Collect(ch chan<- prometheus.Metric) {
	var wg sync.WaitGroup
	for name, coll := range pc.collectors {
		wg.Add(1)
		go func(n string, c Collector) {
			defer wg.Done()
			execute(context.TODO(), n, c, pc.db, ch, pc.logger)
		}(name, coll)
	}
	wg.Wait()
}

View File

@ -1,100 +0,0 @@
// Copyright 2022 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector
import (
"database/sql"
"fmt"
"strings"
"github.com/lib/pq"
)
// server pairs a DSN with a human-readable name and a lazily-opened
// database handle.
type server struct {
	dsn  string
	name string
	// db is nil until GetDB opens the connection on first use.
	db *sql.DB
}

// makeServer builds a server for the given DSN, deriving its display name
// (a host:port fingerprint) from the DSN. The database connection itself is
// not opened here; GetDB opens it lazily.
func makeServer(dsn string) (*server, error) {
	name, err := parseServerName(dsn)
	if err != nil {
		return nil, err
	}
	return &server{
		dsn:  dsn,
		name: name,
	}, nil
}
// GetDB lazily opens and caches the database handle for this server.
// Subsequent calls return the same *sql.DB. The pool is capped at a single
// open/idle connection to limit exporter load on the target.
//
// NOTE(review): the lazy init is not guarded by a lock — assumed to be
// called from a single goroutine; confirm against callers.
func (s *server) GetDB() (*sql.DB, error) {
	if s.db == nil {
		conn, err := sql.Open("postgres", s.dsn)
		if err != nil {
			return nil, err
		}
		conn.SetMaxOpenConns(1)
		conn.SetMaxIdleConns(1)
		s.db = conn
	}
	return s.db, nil
}
// GetName returns the server's display name (its host:port fingerprint).
func (s *server) GetName() string {
	return s.name
}

// String implements fmt.Stringer using the server's display name.
func (s *server) String() string {
	return s.name
}
// parseServerName derives a stable "host:port" fingerprint from either a
// connection URL or a key=value DSN, defaulting missing parts to
// localhost:5432. It returns an error for a malformed DSN.
func parseServerName(url string) (string, error) {
	// pq.ParseURL converts URL form to key=value form; if the input is not a
	// URL, treat it as an already key=value DSN.
	dsn, err := pq.ParseURL(url)
	if err != nil {
		dsn = url
	}

	fields := make(map[string]string)
	for _, pair := range strings.Split(dsn, " ") {
		parts := strings.SplitN(pair, "=", 2)
		if len(parts) != 2 {
			return "", fmt.Errorf("malformed dsn %q", dsn)
		}
		// Newer versions of pq.ParseURL quote values so trim them off if they exist
		fields[strings.Trim(parts[0], "'\"")] = strings.Trim(parts[1], "'\"")
	}

	host := "localhost"
	if h, ok := fields["host"]; ok {
		host = h
	}
	port := "5432"
	if p, ok := fields["port"]; ok {
		port = p
	}
	return host + ":" + port, nil
}