Introduce histogram support ()

* Introduce histogram support

Prior to this change, the custom queries were restricted to counters and
gauges.

This change introduces a new ColumnUsage, namely HISTOGRAM, that expects
the column to contain an array of upper inclusive bounds for each
observation bucket in the emitted metric.  It also expects three more
columns to be present with the suffixes:
- `_bucket`, containing an array of cumulative counters for the
  observation buckets;
- `_sum`, the total sum of all observed values; and
- `_count`, the count of events that have been observed.

A flag has been added to the MetricMap struct to identify metrics that
should emit a histogram, and the construction of a histogram metric is
aided by the pq.Array function and a new dbToUint64 helper function.
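
To make the mechanism concrete, below is a minimal, self-contained sketch
of that flow (the column name "requests", the bucket bounds and the values
are invented for this illustration and are not part of the change; error
handling is reduced to panics):

package main

import (
    "fmt"

    "github.com/lib/pq"
    "github.com/prometheus/client_golang/prometheus"
)

func main() {
    // Values as the database driver might hand them back for a metric
    // column named "requests" declared with usage HISTOGRAM.
    rawBounds := []byte("{0.1,0.5,1,5}") // requests: upper inclusive bounds
    rawCounts := []byte("{4,10,13,14}")  // requests_bucket: cumulative counters
    sum := 7.33                          // requests_sum: total of observed values
    count := uint64(14)                  // requests_count; the collector converts this with dbToUint64

    // pq.Array scans a Postgres array literal into a Go slice.
    var bounds []float64
    var counts []int64
    if err := pq.Array(&bounds).Scan(rawBounds); err != nil {
        panic(err)
    }
    if err := pq.Array(&counts).Scan(rawCounts); err != nil {
        panic(err)
    }

    // Pair each upper bound with its cumulative counter, as
    // queryNamespaceMapping does per result row.
    buckets := make(map[float64]uint64, len(bounds))
    for i, le := range bounds {
        buckets[le] = uint64(counts[i])
    }

    desc := prometheus.NewDesc("example_requests", "sketch of a histogram metric", nil, nil)
    fmt.Println(prometheus.MustNewConstHistogram(desc, count, sum, buckets))
}

In queryNamespaceMapping the same pairing happens for each row, and a
missing or unparsable companion column is reported as a non-fatal error.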

Finally, an example of usage is given in queries.yaml.

fixes 

Signed-off-by: Corin Lawson <corin@responsight.com>

* Introduces tests for histogram support

Prior to this change, histogram support was untested.

This change introduces a new integration test that reads a user query
containing a number of histogram metrics.  Also, additional checks have
been added to TestBooleanConversionToValueAndString to test dbToUint64.

Signed-off-by: Corin Lawson <corin@responsight.com>
Corin Lawson 2020-12-25 02:34:26 +11:00 committed by GitHub
parent 3864bbc9cd
commit 3fd1c2c0eb
5 changed files with 317 additions and 7 deletions

View File

@@ -80,6 +80,7 @@ const (
GAUGE ColumnUsage = iota // Use this column as a gauge
MAPPEDMETRIC ColumnUsage = iota // Use this column with the supplied mapping of text values
DURATION ColumnUsage = iota // This column should be interpreted as a text duration (and converted to milliseconds)
HISTOGRAM ColumnUsage = iota // Use this column as a histogram
)
// UnmarshalYAML implements the yaml.Unmarshaller interface.
@@ -169,6 +170,7 @@ type MetricMapNamespace struct {
// be mapped to by the collector
type MetricMap struct {
discard bool // Should metric be discarded during mapping?
histogram bool // Should metric be treated as a histogram?
vtype prometheus.ValueType // Prometheus valuetype
desc *prometheus.Desc // Prometheus descriptor
conversion func(interface{}) (float64, bool) // Conversion function to turn PG result into float64
@@ -650,6 +652,27 @@ func makeDescMap(pgVersion semver.Version, serverLabels prometheus.Labels, metri
return dbToFloat64(in)
},
}
case HISTOGRAM:
thisMap[columnName] = MetricMap{
histogram: true,
vtype: prometheus.UntypedValue,
desc: prometheus.NewDesc(fmt.Sprintf("%s_%s", namespace, columnName), columnMapping.description, variableLabels, serverLabels),
conversion: func(in interface{}) (float64, bool) {
return dbToFloat64(in)
},
}
thisMap[columnName+"_bucket"] = MetricMap{
histogram: true,
discard: true,
}
thisMap[columnName+"_sum"] = MetricMap{
histogram: true,
discard: true,
}
thisMap[columnName+"_count"] = MetricMap{
histogram: true,
discard: true,
}
case MAPPEDMETRIC:
thisMap[columnName] = MetricMap{
vtype: prometheus.GaugeValue,
@@ -721,6 +744,9 @@ func stringToColumnUsage(s string) (ColumnUsage, error) {
case "GAUGE":
u = GAUGE
case "HISTOGRAM":
u = HISTOGRAM
case "MAPPEDMETRIC":
u = MAPPEDMETRIC
@@ -772,6 +798,46 @@ func dbToFloat64(t interface{}) (float64, bool) {
}
}
// Convert database.sql types to uint64 for Prometheus consumption. Null types are mapped to 0. string and []byte
// types are parsed with strconv.ParseUint and map to 0 and !ok on failure.
func dbToUint64(t interface{}) (uint64, bool) {
switch v := t.(type) {
case uint64:
return v, true
case int64:
return uint64(v), true
case float64:
return uint64(v), true
case time.Time:
return uint64(v.Unix()), true
case []byte:
// Try and convert to string and then parse to a uint64
strV := string(v)
result, err := strconv.ParseUint(strV, 10, 64)
if err != nil {
log.Infoln("Could not parse []byte:", err)
return 0, false
}
return result, true
case string:
result, err := strconv.ParseUint(v, 10, 64)
if err != nil {
log.Infoln("Could not parse string:", err)
return 0, false
}
return result, true
case bool:
if v {
return 1, true
}
return 0, true
case nil:
return 0, true
default:
return 0, false
}
}
// Convert database.sql to string for Prometheus labels. Null types are mapped to empty strings.
func dbToString(t interface{}) (string, bool) {
switch v := t.(type) {
@@ -1304,13 +1370,68 @@ func queryNamespaceMapping(server *Server, namespace string, mapping MetricMapNa
continue
}
if metricMapping.histogram {
var keys []float64
err = pq.Array(&keys).Scan(columnData[idx])
if err != nil {
return []prometheus.Metric{}, []error{}, errors.New(fmt.Sprintln("Error retrieving", columnName, "buckets:", namespace, err))
}
var values []int64
valuesIdx, ok := columnIdx[columnName+"_bucket"]
if !ok {
nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Missing column: ", namespace, columnName+"_bucket")))
continue
}
err = pq.Array(&values).Scan(columnData[valuesIdx])
if err != nil {
return []prometheus.Metric{}, []error{}, errors.New(fmt.Sprintln("Error retrieving", columnName, "bucket values:", namespace, err))
}
buckets := make(map[float64]uint64, len(keys))
for i, key := range keys {
if i >= len(values) {
break
}
buckets[key] = uint64(values[i])
}
idx, ok = columnIdx[columnName+"_sum"]
if !ok {
nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Missing column: ", namespace, columnName+"_sum")))
continue
}
sum, ok := dbToFloat64(columnData[idx])
if !ok {
nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Unexpected error parsing column: ", namespace, columnName+"_sum", columnData[idx])))
continue
}
idx, ok = columnIdx[columnName+"_count"]
if !ok {
nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Missing column: ", namespace, columnName+"_count")))
continue
}
count, ok := dbToUint64(columnData[idx])
if !ok {
nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Unexpected error parsing column: ", namespace, columnName+"_count", columnData[idx])))
continue
}
metric = prometheus.MustNewConstHistogram(
metricMapping.desc,
count, sum, buckets,
labels...,
)
} else {
value, ok := dbToFloat64(columnData[idx])
if !ok {
nonfatalErrors = append(nonfatalErrors, errors.New(fmt.Sprintln("Unexpected error parsing column: ", namespace, columnName, columnData[idx])))
continue
}
// Generate the metric
metric = prometheus.MustNewConstMetric(metricMapping.desc, metricMapping.vtype, value, labels...)
}
} else {
// Unknown metric. Report as untyped if scan to float64 works, else note an error too.
metricLabel := fmt.Sprintf("%s_%s", namespace, columnName)

View File

@@ -126,3 +126,26 @@ func (s *IntegrationSuite) TestUnknownMetricParsingDoesntCrash(c *C) {
// scrape the exporter and make sure it works
exporter.scrape(ch)
}
// TestExtendQueriesDoesntCrash tests that specifying extend.query-path doesn't
// crash.
func (s *IntegrationSuite) TestExtendQueriesDoesntCrash(c *C) {
// Setup a dummy channel to consume metrics
ch := make(chan prometheus.Metric, 100)
go func() {
for range ch {
}
}()
dsn := os.Getenv("DATA_SOURCE_NAME")
c.Assert(dsn, Not(Equals), "")
exporter := NewExporter(
strings.Split(dsn, ","),
WithUserQueriesPath("../user_queries_test.yaml"),
)
c.Assert(exporter, NotNil)
// scrape the exporter and make sure it works
exporter.scrape(ch)
}

View File

@@ -4,9 +4,11 @@ package main
import (
"io/ioutil"
"math"
"os"
"reflect"
"testing"
"time"
"github.com/blang/semver"
"github.com/prometheus/client_golang/prometheus"
@@ -287,6 +289,22 @@ func UnsetEnvironment(c *C, d string) {
c.Assert(err, IsNil)
}
type isNaNChecker struct {
*CheckerInfo
}
var IsNaN Checker = &isNaNChecker{
&CheckerInfo{Name: "IsNaN", Params: []string{"value"}},
}
func (checker *isNaNChecker) Check(params []interface{}, names []string) (result bool, error string) {
param, ok := (params[0]).(float64)
if !ok {
return false, "obtained value type is not a float"
}
return math.IsNaN(param), ""
}
// test boolean metric type gets converted to float
func (s *FunctionalSuite) TestBooleanConversionToValueAndString(c *C) {
@@ -294,6 +312,7 @@ func (s *FunctionalSuite) TestBooleanConversionToValueAndString(c *C) {
input interface{}
expectedString string
expectedValue float64
expectedCount uint64
expectedOK bool
}
@@ -302,19 +321,71 @@
input: true,
expectedString: "true",
expectedValue: 1.0,
expectedCount: 1,
expectedOK: true,
},
{
input: false,
expectedString: "false",
expectedValue: 0.0,
expectedCount: 0,
expectedOK: true,
},
{
input: nil,
expectedString: "",
expectedValue: math.NaN(),
expectedCount: 0,
expectedOK: true,
},
{
input: TestCase{},
expectedString: "",
expectedValue: math.NaN(),
expectedCount: 0,
expectedOK: false,
},
{
input: 123.0,
expectedString: "123",
expectedValue: 123.0,
expectedCount: 123,
expectedOK: true,
},
{
input: "123",
expectedString: "123",
expectedValue: 123.0,
expectedCount: 123,
expectedOK: true,
},
{
input: []byte("123"),
expectedString: "123",
expectedValue: 123.0,
expectedCount: 123,
expectedOK: true,
},
{
input: time.Unix(1600000000, 0),
expectedString: "1600000000",
expectedValue: 1600000000.0,
expectedCount: 1600000000,
expectedOK: true,
},
}
for _, cs := range cases {
value, ok := dbToFloat64(cs.input)
if math.IsNaN(cs.expectedValue) {
c.Assert(value, IsNaN)
} else {
c.Assert(value, Equals, cs.expectedValue)
}
c.Assert(ok, Equals, cs.expectedOK)
count, ok := dbToUint64(cs.input)
c.Assert(count, Equals, cs.expectedCount)
c.Assert(ok, Equals, cs.expectedOK)
str, ok := dbToString(cs.input)

View File

@@ -0,0 +1,51 @@
random:
query: |
WITH data AS (SELECT floor(random()*10) AS d FROM generate_series(1,100)),
metrics AS (SELECT SUM(d) AS sum, COUNT(*) AS count FROM data),
buckets AS (SELECT le, SUM(CASE WHEN d <= le THEN 1 ELSE 0 END) AS d
FROM data, UNNEST(ARRAY[1, 2, 4, 8]) AS le GROUP BY le)
SELECT
sum AS histogram_sum,
count AS histogram_count,
ARRAY_AGG(le) AS histogram,
ARRAY_AGG(d) AS histogram_bucket,
ARRAY_AGG(le) AS missing,
ARRAY_AGG(le) AS missing_sum,
ARRAY_AGG(d) AS missing_sum_bucket,
ARRAY_AGG(le) AS missing_count,
ARRAY_AGG(d) AS missing_count_bucket,
sum AS missing_count_sum,
ARRAY_AGG(le) AS unexpected_sum,
ARRAY_AGG(d) AS unexpected_sum_bucket,
'data' AS unexpected_sum_sum,
ARRAY_AGG(le) AS unexpected_count,
ARRAY_AGG(d) AS unexpected_count_bucket,
sum AS unexpected_count_sum,
'nan'::varchar AS unexpected_count_count,
ARRAY_AGG(le) AS unexpected_bytes,
ARRAY_AGG(d) AS unexpected_bytes_bucket,
sum AS unexpected_bytes_sum,
'nan'::bytea AS unexpected_bytes_count
FROM metrics, buckets GROUP BY 1,2
metrics:
- histogram:
usage: "HISTOGRAM"
description: "Random data"
- missing:
usage: "HISTOGRAM"
description: "nonfatal error"
- missing_sum:
usage: "HISTOGRAM"
description: "nonfatal error"
- missing_count:
usage: "HISTOGRAM"
description: "nonfatal error"
- unexpected_sum:
usage: "HISTOGRAM"
description: "nonfatal error"
- unexpected_count:
usage: "HISTOGRAM"
description: "nonfatal error"
- unexpected_bytes:
usage: "HISTOGRAM"
description: "nonfatal error"

View File

@@ -228,3 +228,47 @@ pg_stat_statements:
- blk_write_time_seconds:
usage: "COUNTER"
description: "Total time the statement spent writing blocks, in milliseconds (if track_io_timing is enabled, otherwise zero)"
pg_stat_activity:
query: |
WITH
metrics AS (
SELECT
application_name,
SUM(EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - state_change))::bigint)::float AS process_idle_seconds_sum,
COUNT(*) AS process_idle_seconds_count
FROM pg_stat_activity
WHERE state = 'idle'
GROUP BY application_name
),
buckets AS (
SELECT
application_name,
le,
SUM(
CASE WHEN EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - state_change)) <= le
THEN 1
ELSE 0
END
)::bigint AS bucket
FROM
pg_stat_activity,
UNNEST(ARRAY[1, 2, 5, 15, 30, 60, 90, 120, 300]) AS le
GROUP BY application_name, le
ORDER BY application_name, le
)
SELECT
application_name,
process_idle_seconds_sum,
process_idle_seconds_count,
ARRAY_AGG(le) AS process_idle_seconds,
ARRAY_AGG(bucket) AS process_idle_seconds_bucket
FROM metrics JOIN buckets USING (application_name)
GROUP BY 1, 2, 3
metrics:
- application_name:
usage: "LABEL"
description: "Application Name"
- process_idle_seconds:
usage: "HISTOGRAM"
description: "Idle time of server processes"