diff --git a/cmd/postgres_exporter/postgres_exporter.go b/cmd/postgres_exporter/postgres_exporter.go index 0f941616..9151ba3d 100644 --- a/cmd/postgres_exporter/postgres_exporter.go +++ b/cmd/postgres_exporter/postgres_exporter.go @@ -80,6 +80,7 @@ const ( GAUGE ColumnUsage = iota // Use this column as a gauge MAPPEDMETRIC ColumnUsage = iota // Use this column with the supplied mapping of text values DURATION ColumnUsage = iota // This column should be interpreted as a text duration (and converted to milliseconds) + HISTOGRAM ColumnUsage = iota // Use this column as a histogram ) // UnmarshalYAML implements the yaml.Unmarshaller interface. @@ -169,6 +170,7 @@ type MetricMapNamespace struct { // be mapped to by the collector type MetricMap struct { discard bool // Should metric be discarded during mapping? + histogram bool // Should metric be treated as a histogram? vtype prometheus.ValueType // Prometheus valuetype desc *prometheus.Desc // Prometheus descriptor conversion func(interface{}) (float64, bool) // Conversion function to turn PG result into float64 @@ -650,6 +652,27 @@ func makeDescMap(pgVersion semver.Version, serverLabels prometheus.Labels, metri return dbToFloat64(in) }, } + case HISTOGRAM: + thisMap[columnName] = MetricMap{ + histogram: true, + vtype: prometheus.UntypedValue, + desc: prometheus.NewDesc(fmt.Sprintf("%s_%s", namespace, columnName), columnMapping.description, variableLabels, serverLabels), + conversion: func(in interface{}) (float64, bool) { + return dbToFloat64(in) + }, + } + thisMap[columnName+"_bucket"] = MetricMap{ + histogram: true, + discard: true, + } + thisMap[columnName+"_sum"] = MetricMap{ + histogram: true, + discard: true, + } + thisMap[columnName+"_count"] = MetricMap{ + histogram: true, + discard: true, + } case MAPPEDMETRIC: thisMap[columnName] = MetricMap{ vtype: prometheus.GaugeValue, @@ -721,6 +744,9 @@ func stringToColumnUsage(s string) (ColumnUsage, error) { case "GAUGE": u = GAUGE + case "HISTOGRAM": + u = HISTOGRAM + case "MAPPEDMETRIC": u = MAPPEDMETRIC @@ -772,6 +798,46 @@ func dbToFloat64(t interface{}) (float64, bool) { } } +// Convert database.sql types to uint64 for Prometheus consumption. Null types are mapped to 0. string and []byte +// types are mapped as 0 and !ok +func dbToUint64(t interface{}) (uint64, bool) { + switch v := t.(type) { + case uint64: + return v, true + case int64: + return uint64(v), true + case float64: + return uint64(v), true + case time.Time: + return uint64(v.Unix()), true + case []byte: + // Try and convert to string and then parse to a uint64 + strV := string(v) + result, err := strconv.ParseUint(strV, 10, 64) + if err != nil { + log.Infoln("Could not parse []byte:", err) + return 0, false + } + return result, true + case string: + result, err := strconv.ParseUint(v, 10, 64) + if err != nil { + log.Infoln("Could not parse string:", err) + return 0, false + } + return result, true + case bool: + if v { + return 1, true + } + return 0, true + case nil: + return 0, true + default: + return 0, false + } +} + // Convert database.sql to string for Prometheus labels. Null types are mapped to empty strings. func dbToString(t interface{}) (string, bool) { switch v := t.(type) { @@ -1304,13 +1370,68 @@ func queryNamespaceMapping(server *Server, namespace string, mapping MetricMapNa continue } - value, ok := dbToFloat64(columnData[idx]) - if !ok { - nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Unexpected error parsing column: ", namespace, columnName, columnData[idx]))) - continue + if metricMapping.histogram { + var keys []float64 + err = pq.Array(&keys).Scan(columnData[idx]) + if err != nil { + return []prometheus.Metric{}, []error{}, errors.New(fmt.Sprintln("Error retrieving", columnName, "buckets:", namespace, err)) + } + + var values []int64 + valuesIdx, ok := columnIdx[columnName+"_bucket"] + if !ok { + nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Missing column: ", namespace, columnName+"_bucket"))) + continue + } + err = pq.Array(&values).Scan(columnData[valuesIdx]) + if err != nil { + return []prometheus.Metric{}, []error{}, errors.New(fmt.Sprintln("Error retrieving", columnName, "bucket values:", namespace, err)) + } + + buckets := make(map[float64]uint64, len(keys)) + for i, key := range keys { + if i >= len(values) { + break + } + buckets[key] = uint64(values[i]) + } + + idx, ok = columnIdx[columnName+"_sum"] + if !ok { + nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Missing column: ", namespace, columnName+"_sum"))) + continue + } + sum, ok := dbToFloat64(columnData[idx]) + if !ok { + nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Unexpected error parsing column: ", namespace, columnName+"_sum", columnData[idx]))) + continue + } + + idx, ok = columnIdx[columnName+"_count"] + if !ok { + nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Missing column: ", namespace, columnName+"_count"))) + continue + } + count, ok := dbToUint64(columnData[idx]) + if !ok { + nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Unexpected error parsing column: ", namespace, columnName+"_count", columnData[idx]))) + continue + } + + metric = prometheus.MustNewConstHistogram( + metricMapping.desc, + count, sum, buckets, + labels..., + ) + } else { + value, ok := dbToFloat64(columnData[idx]) + if !ok { + nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Unexpected error parsing column: ", namespace, columnName, columnData[idx]))) + continue + } + // Generate the metric + metric = prometheus.MustNewConstMetric(metricMapping.desc, metricMapping.vtype, value, labels...) } - // Generate the metric - metric = prometheus.MustNewConstMetric(metricMapping.desc, metricMapping.vtype, value, labels...) } else { // Unknown metric. Report as untyped if scan to float64 works, else note an error too. metricLabel := fmt.Sprintf("%s_%s", namespace, columnName) diff --git a/cmd/postgres_exporter/postgres_exporter_integration_test.go b/cmd/postgres_exporter/postgres_exporter_integration_test.go index 0363af96..d575692a 100644 --- a/cmd/postgres_exporter/postgres_exporter_integration_test.go +++ b/cmd/postgres_exporter/postgres_exporter_integration_test.go @@ -126,3 +126,26 @@ func (s *IntegrationSuite) TestUnknownMetricParsingDoesntCrash(c *C) { // scrape the exporter and make sure it works exporter.scrape(ch) } + +// TestExtendQueriesDoesntCrash tests that specifying extend.query-path doesn't +// crash. +func (s *IntegrationSuite) TestExtendQueriesDoesntCrash(c *C) { + // Setup a dummy channel to consume metrics + ch := make(chan prometheus.Metric, 100) + go func() { + for range ch { + } + }() + + dsn := os.Getenv("DATA_SOURCE_NAME") + c.Assert(dsn, Not(Equals), "") + + exporter := NewExporter( + strings.Split(dsn, ","), + WithUserQueriesPath("../user_queries_test.yaml"), + ) + c.Assert(exporter, NotNil) + + // scrape the exporter and make sure it works + exporter.scrape(ch) +} diff --git a/cmd/postgres_exporter/postgres_exporter_test.go b/cmd/postgres_exporter/postgres_exporter_test.go index 8222bf9d..0a471750 100644 --- a/cmd/postgres_exporter/postgres_exporter_test.go +++ b/cmd/postgres_exporter/postgres_exporter_test.go @@ -4,9 +4,11 @@ package main import ( "io/ioutil" + "math" "os" "reflect" "testing" + "time" "github.com/blang/semver" "github.com/prometheus/client_golang/prometheus" @@ -287,6 +289,22 @@ func UnsetEnvironment(c *C, d string) { c.Assert(err, IsNil) } +type isNaNChecker struct { + *CheckerInfo +} + +var IsNaN Checker = &isNaNChecker{ + &CheckerInfo{Name: "IsNaN", Params: []string{"value"}}, +} + +func (checker *isNaNChecker) Check(params []interface{}, names []string) (result bool, error string) { + param, ok := (params[0]).(float64) + if !ok { + return false, "obtained value type is not a float" + } + return math.IsNaN(param), "" +} + // test boolean metric type gets converted to float func (s *FunctionalSuite) TestBooleanConversionToValueAndString(c *C) { @@ -294,6 +312,7 @@ func (s *FunctionalSuite) TestBooleanConversionToValueAndString(c *C) { input interface{} expectedString string expectedValue float64 + expectedCount uint64 expectedOK bool } @@ -302,19 +321,71 @@ func (s *FunctionalSuite) TestBooleanConversionToValueAndString(c *C) { input: true, expectedString: "true", expectedValue: 1.0, + expectedCount: 1, expectedOK: true, }, { input: false, expectedString: "false", expectedValue: 0.0, + expectedCount: 0, + expectedOK: true, + }, + { + input: nil, + expectedString: "", + expectedValue: math.NaN(), + expectedCount: 0, + expectedOK: true, + }, + { + input: TestCase{}, + expectedString: "", + expectedValue: math.NaN(), + expectedCount: 0, + expectedOK: false, + }, + { + input: 123.0, + expectedString: "123", + expectedValue: 123.0, + expectedCount: 123, + expectedOK: true, + }, + { + input: "123", + expectedString: "123", + expectedValue: 123.0, + expectedCount: 123, + expectedOK: true, + }, + { + input: []byte("123"), + expectedString: "123", + expectedValue: 123.0, + expectedCount: 123, + expectedOK: true, + }, + { + input: time.Unix(1600000000, 0), + expectedString: "1600000000", + expectedValue: 1600000000.0, + expectedCount: 1600000000, expectedOK: true, }, } for _, cs := range cases { value, ok := dbToFloat64(cs.input) - c.Assert(value, Equals, cs.expectedValue) + if math.IsNaN(cs.expectedValue) { + c.Assert(value, IsNaN) + } else { + c.Assert(value, Equals, cs.expectedValue) + } + c.Assert(ok, Equals, cs.expectedOK) + + count, ok := dbToUint64(cs.input) + c.Assert(count, Equals, cs.expectedCount) c.Assert(ok, Equals, cs.expectedOK) str, ok := dbToString(cs.input) diff --git a/cmd/postgres_exporter/tests/user_queries_test.yaml b/cmd/postgres_exporter/tests/user_queries_test.yaml new file mode 100644 index 00000000..c9a39655 --- /dev/null +++ b/cmd/postgres_exporter/tests/user_queries_test.yaml @@ -0,0 +1,51 @@ +random: + query: | + WITH data AS (SELECT floor(random()*10) AS d FROM generate_series(1,100)), + metrics AS (SELECT SUM(d) AS sum, COUNT(*) AS count FROM data), + buckets AS (SELECT le, SUM(CASE WHEN d <= le THEN 1 ELSE 0 END) AS d + FROM data, UNNEST(ARRAY[1, 2, 4, 8]) AS le GROUP BY le) + SELECT + sum AS histogram_sum, + count AS histogram_count, + ARRAY_AGG(le) AS histogram, + ARRAY_AGG(d) AS histogram_bucket, + ARRAY_AGG(le) AS missing, + ARRAY_AGG(le) AS missing_sum, + ARRAY_AGG(d) AS missing_sum_bucket, + ARRAY_AGG(le) AS missing_count, + ARRAY_AGG(d) AS missing_count_bucket, + sum AS missing_count_sum, + ARRAY_AGG(le) AS unexpected_sum, + ARRAY_AGG(d) AS unexpected_sum_bucket, + 'data' AS unexpected_sum_sum, + ARRAY_AGG(le) AS unexpected_count, + ARRAY_AGG(d) AS unexpected_count_bucket, + sum AS unexpected_count_sum, + 'nan'::varchar AS unexpected_count_count, + ARRAY_AGG(le) AS unexpected_bytes, + ARRAY_AGG(d) AS unexpected_bytes_bucket, + sum AS unexpected_bytes_sum, + 'nan'::bytea AS unexpected_bytes_count + FROM metrics, buckets GROUP BY 1,2 + metrics: + - histogram: + usage: "HISTOGRAM" + description: "Random data" + - missing: + usage: "HISTOGRAM" + description: "nonfatal error" + - missing_sum: + usage: "HISTOGRAM" + description: "nonfatal error" + - missing_count: + usage: "HISTOGRAM" + description: "nonfatal error" + - unexpected_sum: + usage: "HISTOGRAM" + description: "nonfatal error" + - unexpected_count: + usage: "HISTOGRAM" + description: "nonfatal error" + - unexpected_bytes: + usage: "HISTOGRAM" + description: "nonfatal error" diff --git a/queries.yaml b/queries.yaml index d0fdc520..24abb9a9 100644 --- a/queries.yaml +++ b/queries.yaml @@ -228,3 +228,47 @@ pg_stat_statements: - blk_write_time_seconds: usage: "COUNTER" description: "Total time the statement spent writing blocks, in milliseconds (if track_io_timing is enabled, otherwise zero)" + +pg_stat_activity: + query: | + WITH + metrics AS ( + SELECT + application_name, + SUM(EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - state_change))::bigint)::float AS process_idle_seconds_sum, + COUNT(*) AS process_idle_seconds_count + FROM pg_stat_activity + WHERE state = 'idle' + GROUP BY application_name + ), + buckets AS ( + SELECT + application_name, + le, + SUM( + CASE WHEN EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - state_change)) <= le + THEN 1 + ELSE 0 + END + )::bigint AS bucket + FROM + pg_stat_activity, + UNNEST(ARRAY[1, 2, 5, 15, 30, 60, 90, 120, 300]) AS le + GROUP BY application_name, le + ORDER BY application_name, le + ) + SELECT + application_name, + process_idle_seconds_sum, + process_idle_seconds_count, + ARRAY_AGG(le) AS process_idle_seconds, + ARRAY_AGG(bucket) AS process_idle_seconds_bucket + FROM metrics JOIN buckets USING (application_name) + GROUP BY 1, 2, 3 + metrics: + - application_name: + usage: "LABEL" + description: "Application Name" + - process_idle_seconds: + usage: "HISTOGRAM" + description: "Idle time of server processes"