From 3fd1c2c0ebc3b712f788481bd67e3d688112681c Mon Sep 17 00:00:00 2001
From: Corin Lawson
Date: Fri, 25 Dec 2020 02:34:26 +1100
Subject: [PATCH] Introduce histogram support (#435)

* Introduce histogram support

Prior to this change, the custom queries were restricted to counters
and gauges. This change introduces a new ColumnUsage, namely HISTOGRAM,
that expects the column to contain an array of upper inclusive bounds
for each observation bucket in the emitted metric. It also expects
three more columns to be present with the suffixes:

- `_bucket`, containing an array of cumulative counters for the
  observation buckets;
- `_sum`, the total sum of all observed values; and
- `_count`, the count of events that have been observed.

A flag has been added to the MetricMap struct to easily identify
metrics that should emit a histogram, and the construction of a
histogram metric is aided by the pq.Array function and a new
dbToUint64 helper function. Finally, an example of usage is given in
queries.yaml.

fixes #402

Signed-off-by: Corin Lawson

* Introduce tests for histogram support

Prior to this change, the histogram support was untested. This change
introduces a new integration test that reads a user query containing a
number of histogram metrics. Also, additional checks have been added to
TestBooleanConversionToValueAndString to test dbToUint64.

Signed-off-by: Corin Lawson
---
 cmd/postgres_exporter/postgres_exporter.go    | 133 +++++++++++++++++-
 .../postgres_exporter_integration_test.go     |  23 +++
 .../postgres_exporter_test.go                 |  73 +++++++++-
 .../tests/user_queries_test.yaml              |  51 +++++++
 queries.yaml                                  |  44 ++++++
 5 files changed, 317 insertions(+), 7 deletions(-)
 create mode 100644 cmd/postgres_exporter/tests/user_queries_test.yaml
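The column convention described above maps directly onto Prometheus' constant-histogram constructor. As a minimal sketch (not part of the patch; metric name and values are invented), this is essentially what the exporter assembles from the `histogram`, `_bucket`, `_sum` and `_count` columns once they are scanned:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	desc := prometheus.NewDesc("example_histogram", "illustration only", nil, nil)

	// The HISTOGRAM column supplies the upper inclusive bounds; the
	// matching `_bucket` column supplies the cumulative counters.
	buckets := map[float64]uint64{1: 10, 2: 25, 4: 40, 8: 50}

	// The `_count` and `_sum` columns supply the two scalar arguments.
	m := prometheus.MustNewConstHistogram(desc, 50, 120.5, buckets)
	fmt.Println(m.Desc())
}
```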
diff --git a/cmd/postgres_exporter/postgres_exporter.go b/cmd/postgres_exporter/postgres_exporter.go
index 0f941616..9151ba3d 100644
--- a/cmd/postgres_exporter/postgres_exporter.go
+++ b/cmd/postgres_exporter/postgres_exporter.go
@@ -80,6 +80,7 @@ const (
 	GAUGE        ColumnUsage = iota // Use this column as a gauge
 	MAPPEDMETRIC ColumnUsage = iota // Use this column with the supplied mapping of text values
 	DURATION     ColumnUsage = iota // This column should be interpreted as a text duration (and converted to milliseconds)
+	HISTOGRAM    ColumnUsage = iota // Use this column as a histogram
 )
 
 // UnmarshalYAML implements the yaml.Unmarshaller interface.
@@ -169,6 +170,7 @@ type MetricMapNamespace struct {
 // be mapped to by the collector
 type MetricMap struct {
 	discard    bool                              // Should metric be discarded during mapping?
+	histogram  bool                              // Should metric be treated as a histogram?
 	vtype      prometheus.ValueType              // Prometheus valuetype
 	desc       *prometheus.Desc                  // Prometheus descriptor
 	conversion func(interface{}) (float64, bool) // Conversion function to turn PG result into float64
@@ -650,6 +652,27 @@ func makeDescMap(pgVersion semver.Version, serverLabels prometheus.Labels, metri
 					return dbToFloat64(in)
 				},
 			}
+		case HISTOGRAM:
+			thisMap[columnName] = MetricMap{
+				histogram: true,
+				vtype:     prometheus.UntypedValue,
+				desc:      prometheus.NewDesc(fmt.Sprintf("%s_%s", namespace, columnName), columnMapping.description, variableLabels, serverLabels),
+				conversion: func(in interface{}) (float64, bool) {
+					return dbToFloat64(in)
+				},
+			}
+			thisMap[columnName+"_bucket"] = MetricMap{
+				histogram: true,
+				discard:   true,
+			}
+			thisMap[columnName+"_sum"] = MetricMap{
+				histogram: true,
+				discard:   true,
+			}
+			thisMap[columnName+"_count"] = MetricMap{
+				histogram: true,
+				discard:   true,
+			}
 		case MAPPEDMETRIC:
 			thisMap[columnName] = MetricMap{
 				vtype: prometheus.GaugeValue,
@@ -721,6 +744,9 @@ func stringToColumnUsage(s string) (ColumnUsage, error) {
 	case "GAUGE":
 		u = GAUGE
 
+	case "HISTOGRAM":
+		u = HISTOGRAM
+
 	case "MAPPEDMETRIC":
 		u = MAPPEDMETRIC
 
@@ -772,6 +798,46 @@ func dbToFloat64(t interface{}) (float64, bool) {
 	}
 }
 
+// Convert database.sql types to uint64 for Prometheus consumption. Null types are mapped to 0.
+// string and []byte types are parsed as base-10 unsigned integers; values that fail to parse are mapped to 0 and !ok.
+func dbToUint64(t interface{}) (uint64, bool) {
+	switch v := t.(type) {
+	case uint64:
+		return v, true
+	case int64:
+		return uint64(v), true
+	case float64:
+		return uint64(v), true
+	case time.Time:
+		return uint64(v.Unix()), true
+	case []byte:
+		// Try and convert to string and then parse to a uint64
+		strV := string(v)
+		result, err := strconv.ParseUint(strV, 10, 64)
+		if err != nil {
+			log.Infoln("Could not parse []byte:", err)
+			return 0, false
+		}
+		return result, true
+	case string:
+		result, err := strconv.ParseUint(v, 10, 64)
+		if err != nil {
+			log.Infoln("Could not parse string:", err)
+			return 0, false
+		}
+		return result, true
+	case bool:
+		if v {
+			return 1, true
+		}
+		return 0, true
+	case nil:
+		return 0, true
+	default:
+		return 0, false
+	}
+}
+
 // Convert database.sql to string for Prometheus labels. Null types are mapped to empty strings.
 func dbToString(t interface{}) (string, bool) {
 	switch v := t.(type) {
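dbToUint64 mirrors the existing dbToFloat64 helper but feeds the histogram's cumulative counters, which Prometheus requires as uint64. A hypothetical walkthrough of its conversion rules (not part of the patch; it assumes it runs inside the patched package so dbToUint64 is in scope):

```go
// demoDbToUint64 is an illustrative sketch only.
func demoDbToUint64() {
	inputs := []interface{}{int64(42), float64(42.9), "42", []byte("42"), true, nil}
	for _, in := range inputs {
		v, ok := dbToUint64(in)
		// float64 values are truncated (42.9 -> 42), booleans map to 0/1,
		// nil maps to (0, true); an unparsable string yields (0, false).
		fmt.Println(v, ok)
	}
}
```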
@@ -1304,13 +1370,68 @@ func queryNamespaceMapping(server *Server, namespace string, mapping MetricMapNa
 					continue
 				}
 
-				value, ok := dbToFloat64(columnData[idx])
-				if !ok {
-					nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Unexpected error parsing column: ", namespace, columnName, columnData[idx])))
-					continue
+				if metricMapping.histogram {
+					var keys []float64
+					err = pq.Array(&keys).Scan(columnData[idx])
+					if err != nil {
+						return []prometheus.Metric{}, []error{}, errors.New(fmt.Sprintln("Error retrieving", columnName, "buckets:", namespace, err))
+					}
+
+					var values []int64
+					valuesIdx, ok := columnIdx[columnName+"_bucket"]
+					if !ok {
+						nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Missing column: ", namespace, columnName+"_bucket")))
+						continue
+					}
+					err = pq.Array(&values).Scan(columnData[valuesIdx])
+					if err != nil {
+						return []prometheus.Metric{}, []error{}, errors.New(fmt.Sprintln("Error retrieving", columnName, "bucket values:", namespace, err))
+					}
+
+					buckets := make(map[float64]uint64, len(keys))
+					for i, key := range keys {
+						if i >= len(values) {
+							break
+						}
+						buckets[key] = uint64(values[i])
+					}
+
+					idx, ok = columnIdx[columnName+"_sum"]
+					if !ok {
+						nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Missing column: ", namespace, columnName+"_sum")))
+						continue
+					}
+					sum, ok := dbToFloat64(columnData[idx])
+					if !ok {
+						nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Unexpected error parsing column: ", namespace, columnName+"_sum", columnData[idx])))
+						continue
+					}
+
+					idx, ok = columnIdx[columnName+"_count"]
+					if !ok {
+						nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Missing column: ", namespace, columnName+"_count")))
+						continue
+					}
+					count, ok := dbToUint64(columnData[idx])
+					if !ok {
+						nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Unexpected error parsing column: ", namespace, columnName+"_count", columnData[idx])))
+						continue
+					}
+
+					metric = prometheus.MustNewConstHistogram(
+						metricMapping.desc,
+						count, sum, buckets,
+						labels...,
+					)
+				} else {
+					value, ok := dbToFloat64(columnData[idx])
+					if !ok {
+						nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Unexpected error parsing column: ", namespace, columnName, columnData[idx])))
+						continue
+					}
+					// Generate the metric
+					metric = prometheus.MustNewConstMetric(metricMapping.desc, metricMapping.vtype, value, labels...)
 				}
-
-				// Generate the metric
-				metric = prometheus.MustNewConstMetric(metricMapping.desc, metricMapping.vtype, value, labels...)
 			} else {
 				// Unknown metric. Report as untyped if scan to float64 works, else note an error too.
 				metricLabel := fmt.Sprintf("%s_%s", namespace, columnName)
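The bucket bounds and counters arrive from Postgres as SQL arrays, which is why the histogram branch above leans on lib/pq's Array adapter rather than dbToFloat64. A self-contained sketch of that scanning step (illustrative only; the raw literal stands in for a columnData value):

```go
package main

import (
	"fmt"

	"github.com/lib/pq"
)

func main() {
	// database/sql hands Postgres arrays to the scanner as raw bytes;
	// pq.Array wraps a slice pointer in a sql.Scanner that decodes them.
	raw := []byte("{1,2,4,8}")

	var bounds []float64
	if err := pq.Array(&bounds).Scan(raw); err != nil {
		fmt.Println("scan failed:", err)
		return
	}
	fmt.Println(bounds) // [1 2 4 8]
}
```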
diff --git a/cmd/postgres_exporter/postgres_exporter_integration_test.go b/cmd/postgres_exporter/postgres_exporter_integration_test.go
index 0363af96..d575692a 100644
--- a/cmd/postgres_exporter/postgres_exporter_integration_test.go
+++ b/cmd/postgres_exporter/postgres_exporter_integration_test.go
@@ -126,3 +126,26 @@ func (s *IntegrationSuite) TestUnknownMetricParsingDoesntCrash(c *C) {
 	// scrape the exporter and make sure it works
 	exporter.scrape(ch)
 }
+
+// TestExtendQueriesDoesntCrash tests that specifying extend.query-path doesn't
+// crash.
+func (s *IntegrationSuite) TestExtendQueriesDoesntCrash(c *C) {
+	// Setup a dummy channel to consume metrics
+	ch := make(chan prometheus.Metric, 100)
+	go func() {
+		for range ch {
+		}
+	}()
+
+	dsn := os.Getenv("DATA_SOURCE_NAME")
+	c.Assert(dsn, Not(Equals), "")
+
+	exporter := NewExporter(
+		strings.Split(dsn, ","),
+		WithUserQueriesPath("tests/user_queries_test.yaml"),
+	)
+	c.Assert(exporter, NotNil)
+
+	// scrape the exporter and make sure it works
+	exporter.scrape(ch)
+}
diff --git a/cmd/postgres_exporter/postgres_exporter_test.go b/cmd/postgres_exporter/postgres_exporter_test.go
index 8222bf9d..0a471750 100644
--- a/cmd/postgres_exporter/postgres_exporter_test.go
+++ b/cmd/postgres_exporter/postgres_exporter_test.go
@@ -4,9 +4,11 @@ package main
 
 import (
 	"io/ioutil"
+	"math"
 	"os"
 	"reflect"
 	"testing"
+	"time"
 
 	"github.com/blang/semver"
 	"github.com/prometheus/client_golang/prometheus"
@@ -287,6 +289,22 @@ func UnsetEnvironment(c *C, d string) {
 	c.Assert(err, IsNil)
 }
 
+type isNaNChecker struct {
+	*CheckerInfo
+}
+
+var IsNaN Checker = &isNaNChecker{
+	&CheckerInfo{Name: "IsNaN", Params: []string{"value"}},
+}
+
+func (checker *isNaNChecker) Check(params []interface{}, names []string) (result bool, error string) {
+	param, ok := (params[0]).(float64)
+	if !ok {
+		return false, "obtained value type is not a float"
+	}
+	return math.IsNaN(param), ""
+}
+
 // test boolean metric type gets converted to float
 func (s *FunctionalSuite) TestBooleanConversionToValueAndString(c *C) {
 
@@ -294,6 +312,7 @@ func (s *FunctionalSuite) TestBooleanConversionToValueAndString(c *C) {
 		input          interface{}
 		expectedString string
 		expectedValue  float64
+		expectedCount  uint64
 		expectedOK     bool
 	}
 
@@ -302,19 +321,71 @@ func (s *FunctionalSuite) TestBooleanConversionToValueAndString(c *C) {
 			input:          true,
 			expectedString: "true",
 			expectedValue:  1.0,
+			expectedCount:  1,
 			expectedOK:     true,
 		},
 		{
 			input:          false,
 			expectedString: "false",
 			expectedValue:  0.0,
+			expectedCount:  0,
 			expectedOK:     true,
 		},
+		{
+			input:          nil,
+			expectedString: "",
+			expectedValue:  math.NaN(),
+			expectedCount:  0,
+			expectedOK:     true,
+		},
+		{
+			input:          TestCase{},
+			expectedString: "",
+			expectedValue:  math.NaN(),
+			expectedCount:  0,
+			expectedOK:     false,
+		},
+		{
+			input:          123.0,
+			expectedString: "123",
+			expectedValue:  123.0,
+			expectedCount:  123,
+			expectedOK:     true,
+		},
+		{
+			input:          "123",
+			expectedString: "123",
+			expectedValue:  123.0,
+			expectedCount:  123,
+			expectedOK:     true,
+		},
+		{
+			input:          []byte("123"),
+			expectedString: "123",
+			expectedValue:  123.0,
+			expectedCount:  123,
+			expectedOK:     true,
+		},
+		{
+			input:          time.Unix(1600000000, 0),
+			expectedString: "1600000000",
+			expectedValue:  1600000000.0,
+			expectedCount:  1600000000,
+			expectedOK:     true,
+		},
 	}
 
 	for _, cs := range cases {
 		value, ok := dbToFloat64(cs.input)
-		c.Assert(value, Equals, cs.expectedValue)
+		if math.IsNaN(cs.expectedValue) {
+			c.Assert(value, IsNaN)
+		} else {
+			c.Assert(value, Equals, cs.expectedValue)
+		}
+		c.Assert(ok, Equals, cs.expectedOK)
+
+		count, ok := dbToUint64(cs.input)
+		c.Assert(count, Equals, cs.expectedCount)
 		c.Assert(ok, Equals, cs.expectedOK)
 
 		str, ok := dbToString(cs.input)
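A note on the custom IsNaN checker introduced above: IEEE-754 NaN never compares equal to itself, so gocheck's Equals can never match an expected NaN and the suite needs a dedicated checker. In miniature:

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	nan := math.NaN()
	fmt.Println(nan == nan)      // false: NaN is not equal to itself
	fmt.Println(math.IsNaN(nan)) // true: detection must use math.IsNaN
}
```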
diff --git a/cmd/postgres_exporter/tests/user_queries_test.yaml b/cmd/postgres_exporter/tests/user_queries_test.yaml
new file mode 100644
index 00000000..c9a39655
--- /dev/null
+++ b/cmd/postgres_exporter/tests/user_queries_test.yaml
@@ -0,0 +1,51 @@
+random:
+  query: |
+    WITH data AS (SELECT floor(random()*10) AS d FROM generate_series(1,100)),
+         metrics AS (SELECT SUM(d) AS sum, COUNT(*) AS count FROM data),
+         buckets AS (SELECT le, SUM(CASE WHEN d <= le THEN 1 ELSE 0 END) AS d
+                     FROM data, UNNEST(ARRAY[1, 2, 4, 8]) AS le GROUP BY le)
+    SELECT
+      sum AS histogram_sum,
+      count AS histogram_count,
+      ARRAY_AGG(le) AS histogram,
+      ARRAY_AGG(d) AS histogram_bucket,
+      ARRAY_AGG(le) AS missing,
+      ARRAY_AGG(le) AS missing_sum,
+      ARRAY_AGG(d) AS missing_sum_bucket,
+      ARRAY_AGG(le) AS missing_count,
+      ARRAY_AGG(d) AS missing_count_bucket,
+      sum AS missing_count_sum,
+      ARRAY_AGG(le) AS unexpected_sum,
+      ARRAY_AGG(d) AS unexpected_sum_bucket,
+      'data' AS unexpected_sum_sum,
+      ARRAY_AGG(le) AS unexpected_count,
+      ARRAY_AGG(d) AS unexpected_count_bucket,
+      sum AS unexpected_count_sum,
+      'nan'::varchar AS unexpected_count_count,
+      ARRAY_AGG(le) AS unexpected_bytes,
+      ARRAY_AGG(d) AS unexpected_bytes_bucket,
+      sum AS unexpected_bytes_sum,
+      'nan'::bytea AS unexpected_bytes_count
+    FROM metrics, buckets GROUP BY 1,2
+  metrics:
+    - histogram:
+        usage: "HISTOGRAM"
+        description: "Random data"
+    - missing:
+        usage: "HISTOGRAM"
+        description: "nonfatal error"
+    - missing_sum:
+        usage: "HISTOGRAM"
+        description: "nonfatal error"
+    - missing_count:
+        usage: "HISTOGRAM"
+        description: "nonfatal error"
+    - unexpected_sum:
+        usage: "HISTOGRAM"
+        description: "nonfatal error"
+    - unexpected_count:
+        usage: "HISTOGRAM"
+        description: "nonfatal error"
+    - unexpected_bytes:
+        usage: "HISTOGRAM"
+        description: "nonfatal error"
diff --git a/queries.yaml b/queries.yaml
index d0fdc520..24abb9a9 100644
--- a/queries.yaml
+++ b/queries.yaml
@@ -228,3 +228,47 @@ pg_stat_statements:
     - blk_write_time_seconds:
         usage: "COUNTER"
         description: "Total time the statement spent writing blocks, in milliseconds (if track_io_timing is enabled, otherwise zero)"
+
+pg_stat_activity:
+  query: |
+    WITH
+      metrics AS (
+        SELECT
+          application_name,
+          SUM(EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - state_change))::bigint)::float AS process_idle_seconds_sum,
+          COUNT(*) AS process_idle_seconds_count
+        FROM pg_stat_activity
+        WHERE state = 'idle'
+        GROUP BY application_name
+      ),
+      buckets AS (
+        SELECT
+          application_name,
+          le,
+          SUM(
+            CASE WHEN EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - state_change)) <= le
+              THEN 1
+              ELSE 0
+            END
+          )::bigint AS bucket
+        FROM
+          pg_stat_activity,
+          UNNEST(ARRAY[1, 2, 5, 15, 30, 60, 90, 120, 300]) AS le
+        GROUP BY application_name, le
+        ORDER BY application_name, le
+      )
+    SELECT
+      application_name,
+      process_idle_seconds_sum,
+      process_idle_seconds_count,
+      ARRAY_AGG(le) AS process_idle_seconds,
+      ARRAY_AGG(bucket) AS process_idle_seconds_bucket
+    FROM metrics JOIN buckets USING (application_name)
+    GROUP BY 1, 2, 3
+  metrics:
+    - application_name:
+        usage: "LABEL"
+        description: "Application Name"
+    - process_idle_seconds:
+        usage: "HISTOGRAM"
+        description: "Idle time of server processes"
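For reference, a successful scrape of the pg_stat_activity query above yields one classic histogram per application_name. The sample below is illustrative only (values are invented, intermediate buckets are elided, and any per-server constLabels are omitted):

```
# HELP pg_stat_activity_process_idle_seconds Idle time of server processes
# TYPE pg_stat_activity_process_idle_seconds histogram
pg_stat_activity_process_idle_seconds_bucket{application_name="psql",le="1"} 0
pg_stat_activity_process_idle_seconds_bucket{application_name="psql",le="2"} 1
pg_stat_activity_process_idle_seconds_bucket{application_name="psql",le="5"} 2
# ... buckets for le="15" through le="300" elided ...
pg_stat_activity_process_idle_seconds_bucket{application_name="psql",le="+Inf"} 2
pg_stat_activity_process_idle_seconds_sum{application_name="psql"} 5
pg_stat_activity_process_idle_seconds_count{application_name="psql"} 2
```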