Add support for optional namespace caching. (#319)

In the user queries.yml file, the created namespaces can now be optionally
cached by setting cache_seconds, which will prevent the query being re-run
within that timeframe if previous results are available.

Supercedes #211, credit to @SamSaffron for the original PR.
This commit is contained in:
Will Rouesnel 2019-11-01 00:17:31 +11:00 committed by GitHub
parent 1385b4f658
commit 34fdb69ee2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 198 additions and 122 deletions

View File

@ -100,6 +100,7 @@ type Mapping map[string]MappingOptions
type UserQuery struct {
Query string `yaml:"query"`
Metrics []Mapping `yaml:"metrics"`
CacheSeconds uint64 `yaml:"cache_seconds"` // Number of seconds to cache the namespace result metrics for.
}
// nolint: golint
@ -134,10 +135,18 @@ func (cm *ColumnMapping) UnmarshalYAML(unmarshal func(interface{}) error) error
return unmarshal((*plain)(cm))
}
// intermediateMetricMap holds the partially loaded metric map parsing.
// This is mainly so we can parse cacheSeconds around.
type intermediateMetricMap struct {
columnMappings map[string]ColumnMapping
cacheSeconds uint64
}
// MetricMapNamespace groups metric maps under a shared set of labels.
type MetricMapNamespace struct {
labels []string // Label names for this namespace
columnMappings map[string]MetricMap // Column mappings in this namespace
cacheSeconds uint64 // Number of seconds this metric namespace can be cached. 0 disables.
}
// MetricMap stores the prometheus metric description which a given column will
@ -172,28 +181,16 @@ func dumpMaps() {
}
}
for column, details := range cmap {
for column, details := range cmap.columnMappings {
fmt.Printf(" %-40s %v\n", column, details)
}
fmt.Println()
}
}
var builtinMetricMaps = map[string]map[string]ColumnMapping{
"pg_stat_bgwriter": {
"checkpoints_timed": {COUNTER, "Number of scheduled checkpoints that have been performed", nil, nil},
"checkpoints_req": {COUNTER, "Number of requested checkpoints that have been performed", nil, nil},
"checkpoint_write_time": {COUNTER, "Total amount of time that has been spent in the portion of checkpoint processing where files are written to disk, in milliseconds", nil, nil},
"checkpoint_sync_time": {COUNTER, "Total amount of time that has been spent in the portion of checkpoint processing where files are synchronized to disk, in milliseconds", nil, nil},
"buffers_checkpoint": {COUNTER, "Number of buffers written during checkpoints", nil, nil},
"buffers_clean": {COUNTER, "Number of buffers written by the background writer", nil, nil},
"maxwritten_clean": {COUNTER, "Number of times the background writer stopped a cleaning scan because it had written too many buffers", nil, nil},
"buffers_backend": {COUNTER, "Number of buffers written directly by a backend", nil, nil},
"buffers_backend_fsync": {COUNTER, "Number of times a backend had to execute its own fsync call (normally the background writer handles those even when the backend does its own write)", nil, nil},
"buffers_alloc": {COUNTER, "Number of buffers allocated", nil, nil},
"stats_reset": {COUNTER, "Time at which these statistics were last reset", nil, nil},
},
var builtinMetricMaps = map[string]intermediateMetricMap{
"pg_stat_database": {
map[string]ColumnMapping{
"datid": {LABEL, "OID of a database", nil, nil},
"datname": {LABEL, "Name of this database", nil, nil},
"numbackends": {GAUGE, "Number of backends currently connected to this database. This is the only column in this view that returns a value reflecting current state; all other columns return the accumulated values since the last reset.", nil, nil},
@ -214,7 +211,10 @@ var builtinMetricMaps = map[string]map[string]ColumnMapping{
"blk_write_time": {COUNTER, "Time spent writing data file blocks by backends in this database, in milliseconds", nil, nil},
"stats_reset": {COUNTER, "Time at which these statistics were last reset", nil, nil},
},
0,
},
"pg_stat_database_conflicts": {
map[string]ColumnMapping{
"datid": {LABEL, "OID of a database", nil, nil},
"datname": {LABEL, "Name of this database", nil, nil},
"confl_tablespace": {COUNTER, "Number of queries in this database that have been canceled due to dropped tablespaces", nil, nil},
@ -223,12 +223,18 @@ var builtinMetricMaps = map[string]map[string]ColumnMapping{
"confl_bufferpin": {COUNTER, "Number of queries in this database that have been canceled due to pinned buffers", nil, nil},
"confl_deadlock": {COUNTER, "Number of queries in this database that have been canceled due to deadlocks", nil, nil},
},
0,
},
"pg_locks": {
map[string]ColumnMapping{
"datname": {LABEL, "Name of this database", nil, nil},
"mode": {LABEL, "Type of Lock", nil, nil},
"count": {GAUGE, "Number of locks", nil, nil},
},
0,
},
"pg_stat_replication": {
map[string]ColumnMapping{
"procpid": {DISCARD, "Process ID of a WAL sender process", nil, semver.MustParseRange("<9.2.0")},
"pid": {DISCARD, "Process ID of a WAL sender process", nil, semver.MustParseRange(">=9.2.0")},
"usesysid": {DISCARD, "OID of the user logged into this WAL sender process", nil, nil},
@ -270,12 +276,17 @@ var builtinMetricMaps = map[string]map[string]ColumnMapping{
"flush_lag": {DISCARD, "Time elapsed between flushing recent WAL locally and receiving notification that this standby server has written and flushed it (but not yet applied it). This can be used to gauge the delay that synchronous_commit level remote_flush incurred while committing if this server was configured as a synchronous standby.", nil, semver.MustParseRange(">=10.0.0")},
"replay_lag": {DISCARD, "Time elapsed between flushing recent WAL locally and receiving notification that this standby server has written, flushed and applied it. This can be used to gauge the delay that synchronous_commit level remote_apply incurred while committing if this server was configured as a synchronous standby.", nil, semver.MustParseRange(">=10.0.0")},
},
0,
},
"pg_stat_activity": {
map[string]ColumnMapping{
"datname": {LABEL, "Name of this database", nil, nil},
"state": {LABEL, "connection state", nil, semver.MustParseRange(">=9.2.0")},
"count": {GAUGE, "number of connections in this state", nil, nil},
"max_tx_duration": {GAUGE, "max duration in seconds any active transaction has been running", nil, nil},
},
0,
},
}
// OverrideQuery 's are run in-place of simple namespace look ups, and provide
@ -412,7 +423,7 @@ func makeQueryOverrideMap(pgVersion semver.Version, queryOverrides map[string][]
return resultMap
}
func parseUserQueries(content []byte) (map[string]map[string]ColumnMapping, map[string]string, error) {
func parseUserQueries(content []byte) (map[string]intermediateMetricMap, map[string]string, error) {
var userQueries UserQueries
err := yaml.Unmarshal(content, &userQueries)
@ -421,16 +432,20 @@ func parseUserQueries(content []byte) (map[string]map[string]ColumnMapping, map[
}
// Stores the loaded map representation
metricMaps := make(map[string]map[string]ColumnMapping)
metricMaps := make(map[string]intermediateMetricMap)
newQueryOverrides := make(map[string]string)
for metric, specs := range userQueries {
log.Debugln("New user metric namespace from YAML:", metric)
log.Debugln("New user metric namespace from YAML:", metric, "Will cache results for:", specs.CacheSeconds)
newQueryOverrides[metric] = specs.Query
metricMap, ok := metricMaps[metric]
if !ok {
// Namespace for metric not found - add it.
metricMap = make(map[string]ColumnMapping)
newMetricMap := make(map[string]ColumnMapping)
metricMap = intermediateMetricMap{
columnMappings: newMetricMap,
cacheSeconds: specs.CacheSeconds,
}
metricMaps[metric] = metricMap
}
for _, metric := range specs.Metrics {
@ -444,7 +459,8 @@ func parseUserQueries(content []byte) (map[string]map[string]ColumnMapping, map[
columnMapping.mapping = nil
// Should we support this for users?
columnMapping.supportedVersions = nil
metricMap[name] = columnMapping
metricMap.columnMappings[name] = columnMapping
}
}
}
@ -492,21 +508,21 @@ func addQueries(content []byte, pgVersion semver.Version, server *Server) error
}
// Turn the MetricMap column mapping into a prometheus descriptor mapping.
func makeDescMap(pgVersion semver.Version, serverLabels prometheus.Labels, metricMaps map[string]map[string]ColumnMapping) map[string]MetricMapNamespace {
func makeDescMap(pgVersion semver.Version, serverLabels prometheus.Labels, metricMaps map[string]intermediateMetricMap) map[string]MetricMapNamespace {
var metricMap = make(map[string]MetricMapNamespace)
for namespace, mappings := range metricMaps {
for namespace, intermediateMappings := range metricMaps {
thisMap := make(map[string]MetricMap)
// Get the constant labels
var variableLabels []string
for columnName, columnMapping := range mappings {
for columnName, columnMapping := range intermediateMappings.columnMappings {
if columnMapping.usage == LABEL {
variableLabels = append(variableLabels, columnName)
}
}
for columnName, columnMapping := range mappings {
for columnName, columnMapping := range intermediateMappings.columnMappings {
// Check column version compatibility for the current map
// Force to discard if not compatible.
if columnMapping.supportedVersions != nil {
@ -598,7 +614,7 @@ func makeDescMap(pgVersion semver.Version, serverLabels prometheus.Labels, metri
}
}
metricMap[namespace] = MetricMapNamespace{variableLabels, thisMap}
metricMap[namespace] = MetricMapNamespace{variableLabels, thisMap, intermediateMappings.cacheSeconds}
}
return metricMap
@ -744,6 +760,11 @@ func loggableDSN(dsn string) string {
return pDSN.String()
}
type cachedMetrics struct {
metrics []prometheus.Metric
lastScrape time.Time
}
// Server describes a connection to Postgres.
// Also it contains metrics map and query overrides.
type Server struct {
@ -759,6 +780,9 @@ type Server struct {
// Currently active query overrides
queryOverrides map[string]string
mappingMtx sync.RWMutex
// Currently cached metrics
metricCache map[string]cachedMetrics
cacheMtx sync.Mutex
}
// ServerOpt configures a server.
@ -795,6 +819,7 @@ func NewServer(dsn string, opts ...ServerOpt) (*Server, error) {
labels: prometheus.Labels{
serverLabelName: fingerprint,
},
metricCache: make(map[string]cachedMetrics),
}
for _, opt := range opts {
@ -908,7 +933,7 @@ func (s *Servers) Close() {
type Exporter struct {
// Holds a reference to the build in column mappings. Currently this is for testing purposes
// only, since it just points to the global.
builtinMetricMaps map[string]map[string]ColumnMapping
builtinMetricMaps map[string]intermediateMetricMap
disableDefaultMetrics, disableSettingsMetrics, autoDiscoverDatabases bool
@ -1123,7 +1148,7 @@ func queryDatabases(server *Server) ([]string, error) {
// Query within a namespace mapping and emit metrics. Returns fatal errors if
// the scrape fails, and a slice of errors if they were non-fatal.
func queryNamespaceMapping(ch chan<- prometheus.Metric, server *Server, namespace string, mapping MetricMapNamespace) ([]error, error) {
func queryNamespaceMapping(server *Server, namespace string, mapping MetricMapNamespace) ([]prometheus.Metric, []error, error) {
// Check for a query override for this namespace
query, found := server.queryOverrides[namespace]
@ -1131,7 +1156,7 @@ func queryNamespaceMapping(ch chan<- prometheus.Metric, server *Server, namespac
// version of PostgreSQL?
if query == "" && found {
// Return success (no pertinent data)
return []error{}, nil
return []prometheus.Metric{}, []error{}, nil
}
// Don't fail on a bad scrape of one metric
@ -1146,14 +1171,14 @@ func queryNamespaceMapping(ch chan<- prometheus.Metric, server *Server, namespac
rows, err = server.db.Query(query) // nolint: safesql
}
if err != nil {
return []error{}, fmt.Errorf("Error running query on database %q: %s %v", server, namespace, err)
return []prometheus.Metric{}, []error{}, fmt.Errorf("Error running query on database %q: %s %v", server, namespace, err)
}
defer rows.Close() // nolint: errcheck
var columnNames []string
columnNames, err = rows.Columns()
if err != nil {
return []error{}, errors.New(fmt.Sprintln("Error retrieving column list for: ", namespace, err))
return []prometheus.Metric{}, []error{}, errors.New(fmt.Sprintln("Error retrieving column list for: ", namespace, err))
}
// Make a lookup map for the column indices
@ -1170,10 +1195,12 @@ func queryNamespaceMapping(ch chan<- prometheus.Metric, server *Server, namespac
nonfatalErrors := []error{}
metrics := make([]prometheus.Metric, 0)
for rows.Next() {
err = rows.Scan(scanArgs...)
if err != nil {
return []error{}, errors.New(fmt.Sprintln("Error retrieving rows:", namespace, err))
return []prometheus.Metric{}, []error{}, errors.New(fmt.Sprintln("Error retrieving rows:", namespace, err))
}
// Get the label values for this row.
@ -1186,6 +1213,7 @@ func queryNamespaceMapping(ch chan<- prometheus.Metric, server *Server, namespac
// will be filled with an untyped metric number *if* they can be
// converted to float64s. NULLs are allowed and treated as NaN.
for idx, columnName := range columnNames {
var metric prometheus.Metric
if metricMapping, ok := mapping.columnMappings[columnName]; ok {
// Is this a metricy metric?
if metricMapping.discard {
@ -1197,9 +1225,8 @@ func queryNamespaceMapping(ch chan<- prometheus.Metric, server *Server, namespac
nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Unexpected error parsing column: ", namespace, columnName, columnData[idx])))
continue
}
// Generate the metric
ch <- prometheus.MustNewConstMetric(metricMapping.desc, metricMapping.vtype, value, labels...)
metric = prometheus.MustNewConstMetric(metricMapping.desc, metricMapping.vtype, value, labels...)
} else {
// Unknown metric. Report as untyped if scan to float64 works, else note an error too.
metricLabel := fmt.Sprintf("%s_%s", namespace, columnName)
@ -1212,11 +1239,12 @@ func queryNamespaceMapping(ch chan<- prometheus.Metric, server *Server, namespac
nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Unparseable column type - discarding: ", namespace, columnName, err)))
continue
}
ch <- prometheus.MustNewConstMetric(desc, prometheus.UntypedValue, value, labels...)
metric = prometheus.MustNewConstMetric(desc, prometheus.UntypedValue, value, labels...)
}
metrics = append(metrics, metric)
}
}
}
return nonfatalErrors, nil
return metrics, nonfatalErrors, nil
}
// Iterate through all the namespace mappings in the exporter and run their
@ -1225,9 +1253,33 @@ func queryNamespaceMappings(ch chan<- prometheus.Metric, server *Server) map[str
// Return a map of namespace -> errors
namespaceErrors := make(map[string]error)
scrapeStart := time.Now()
for namespace, mapping := range server.metricMap {
log.Debugln("Querying namespace: ", namespace)
nonFatalErrors, err := queryNamespaceMapping(ch, server, namespace, mapping)
scrapeMetric := false
// Check if the metric is cached
server.cacheMtx.Lock()
cachedMetric, found := server.metricCache[namespace]
server.cacheMtx.Unlock()
// If found, check if needs refresh from cache
if found {
if scrapeStart.Sub(cachedMetric.lastScrape).Seconds() > float64(mapping.cacheSeconds) {
scrapeMetric = true
}
} else {
scrapeMetric = true
}
var metrics []prometheus.Metric
var nonFatalErrors []error
var err error
if scrapeMetric {
metrics, nonFatalErrors, err = queryNamespaceMapping(server, namespace, mapping)
} else {
metrics = cachedMetric.metrics
}
// Serious error - a namespace disappeared
if err != nil {
namespaceErrors[namespace] = err
@ -1239,6 +1291,23 @@ func queryNamespaceMappings(ch chan<- prometheus.Metric, server *Server) map[str
log.Infoln(err.Error())
}
}
// Emit the metrics into the channel
for _, metric := range metrics {
ch <- metric
}
if scrapeMetric {
// Only cache if metric is meaningfully cacheable
if mapping.cacheSeconds > 0 {
server.cacheMtx.Lock()
server.metricCache[namespace] = cachedMetrics{
metrics: metrics,
lastScrape: scrapeStart,
}
server.cacheMtx.Unlock()
}
}
}
return namespaceErrors
@ -1399,7 +1468,7 @@ func (e *Exporter) discoverDatabaseDSNs() []string {
func (e *Exporter) scrapeDSN(ch chan<- prometheus.Metric, dsn string) error {
server, err := e.servers.GetServer(dsn)
if err != nil {
return &ErrorConnectToServer{fmt.Sprintf("Error opening connection to database (%s): %s", loggableDSN(dsn), err)}
return &ErrorConnectToServer{fmt.Sprintf("Error opening connection to database (%s): %s", loggableDSN(dsn), err.Error())}
}
// Check if map versions need to be updated

View File

@ -113,9 +113,12 @@ func (s *IntegrationSuite) TestUnknownMetricParsingDoesntCrash(c *C) {
c.Assert(exporter, NotNil)
// Convert the default maps into a list of empty maps.
emptyMaps := make(map[string]map[string]ColumnMapping, 0)
emptyMaps := make(map[string]intermediateMetricMap, 0)
for k := range exporter.builtinMetricMaps {
emptyMaps[k] = map[string]ColumnMapping{}
emptyMaps[k] = intermediateMetricMap{
map[string]ColumnMapping{},
0,
}
}
exporter.builtinMetricMaps = emptyMaps

View File

@ -26,11 +26,14 @@ func (s *FunctionalSuite) SetUpSuite(c *C) {
}
func (s *FunctionalSuite) TestSemanticVersionColumnDiscard(c *C) {
testMetricMap := map[string]map[string]ColumnMapping{
testMetricMap := map[string]intermediateMetricMap{
"test_namespace": {
map[string]ColumnMapping{
"metric_which_stays": {COUNTER, "This metric should not be eliminated", nil, nil},
"metric_which_discards": {COUNTER, "This metric should be forced to DISCARD", nil, nil},
},
0,
},
}
{
@ -51,9 +54,9 @@ func (s *FunctionalSuite) TestSemanticVersionColumnDiscard(c *C) {
// nolint: dupl
{
// Update the map so the discard metric should be eliminated
discardableMetric := testMetricMap["test_namespace"]["metric_which_discards"]
discardableMetric := testMetricMap["test_namespace"].columnMappings["metric_which_discards"]
discardableMetric.supportedVersions = semver.MustParseRange(">0.0.1")
testMetricMap["test_namespace"]["metric_which_discards"] = discardableMetric
testMetricMap["test_namespace"].columnMappings["metric_which_discards"] = discardableMetric
// Discard metric should be discarded
resultMap := makeDescMap(semver.MustParse("0.0.1"), prometheus.Labels{}, testMetricMap)
@ -72,9 +75,9 @@ func (s *FunctionalSuite) TestSemanticVersionColumnDiscard(c *C) {
// nolint: dupl
{
// Update the map so the discard metric should be kept but has a version
discardableMetric := testMetricMap["test_namespace"]["metric_which_discards"]
discardableMetric := testMetricMap["test_namespace"].columnMappings["metric_which_discards"]
discardableMetric.supportedVersions = semver.MustParseRange(">0.0.1")
testMetricMap["test_namespace"]["metric_which_discards"] = discardableMetric
testMetricMap["test_namespace"].columnMappings["metric_which_discards"] = discardableMetric
// Discard metric should be discarded
resultMap := makeDescMap(semver.MustParse("0.0.2"), prometheus.Labels{}, testMetricMap)

View File

@ -115,6 +115,7 @@ pg_statio_user_tables:
pg_database:
query: "SELECT pg_database.datname, pg_database_size(pg_database.datname) as size FROM pg_database"
cache_seconds: 30
metrics:
- datname:
usage: "LABEL"