Merge pull request #5787 from cstyan/reshard-max-logging
Add metrics for max/min/desired shards to queue manager.
This commit is contained in:
commit
3b3eaf3496
|
@ -211,6 +211,26 @@
|
||||||
description: 'Prometheus %(prometheusName)s remote write is {{ printf "%%.1f" $value }}s behind for queue {{$labels.queue}}.' % $._config,
|
description: 'Prometheus %(prometheusName)s remote write is {{ printf "%%.1f" $value }}s behind for queue {{$labels.queue}}.' % $._config,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
  // Fires when the remote-write shard calculation for a queue keeps asking
  // for more shards than the configured maximum for that queue.
  alert: 'PrometheusRemoteWriteDesiredShards',
  expr: |||
    # Without max_over_time, failed scrapes could create false negatives, see
    # https://www.robustperception.io/alerting-on-gauges-in-prometheus-2-0 for details.
    #
    # Both gauges carry the same label set (including the per-queue label), so a
    # plain one-to-one comparison is used. Matching on(job, instance) only would
    # make the query fail as soon as an instance runs more than one queue.
    (
      max_over_time(prometheus_remote_storage_shards_desired{%(prometheusSelector)s}[5m])
    >
      max_over_time(prometheus_remote_storage_shards_max{%(prometheusSelector)s}[5m])
    )
  ||| % $._config,
  'for': '15m',
  labels: {
    severity: 'warning',
  },
  annotations: {
    summary: 'Prometheus remote write desired shards calculation wants to run more than configured max shards.',
    // $value is a float, so it is rendered directly rather than being passed
    // to printf as a format string. The max lookup is narrowed to the alerting
    // queue so that the reported limit belongs to the same queue.
    description: 'Prometheus %(prometheusName)s remote write desired shards calculation wants to run {{ $value }} shards for queue {{$labels.queue}}, which is more than the max of {{ printf `prometheus_remote_storage_shards_max{instance="%%s",queue="%%s",%(prometheusSelector)s}` $labels.instance $labels.queue | query | first | value }}.' % $._config,
  },
},
|
||||||
{
|
{
|
||||||
alert: 'PrometheusRuleFailures',
|
alert: 'PrometheusRuleFailures',
|
||||||
expr: |||
|
expr: |||
|
||||||
|
|
|
@ -143,6 +143,33 @@ var (
|
||||||
},
|
},
|
||||||
[]string{queue},
|
[]string{queue},
|
||||||
)
|
)
|
||||||
|
// maxNumShards is a gauge vector with one series per queue label value; per its Help text it reports the maximum number of shards the queue is allowed to run.
maxNumShards = promauto.NewGaugeVec(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: subsystem,
|
||||||
|
Name: "shards_max",
|
||||||
|
Help: "The maximum number of shards that the queue is allowed to run.",
|
||||||
|
},
|
||||||
|
[]string{queue},
|
||||||
|
)
|
||||||
|
// minNumShards is a gauge vector with one series per queue label value; per its Help text it reports the minimum number of shards the queue is allowed to run.
minNumShards = promauto.NewGaugeVec(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: subsystem,
|
||||||
|
Name: "shards_min",
|
||||||
|
Help: "The minimum number of shards that the queue is allowed to run.",
|
||||||
|
},
|
||||||
|
[]string{queue},
|
||||||
|
)
|
||||||
|
// desiredNumShards is a gauge vector with one series per queue label value; per its Help text it reports the shard count the queue's calculation wants to run, which may differ from the min/max limits.
desiredNumShards = promauto.NewGaugeVec(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: subsystem,
|
||||||
|
Name: "shards_desired",
|
||||||
|
Help: "The number of shards that the queues shard calculation wants to run based on the rate of samples in vs. samples out.",
|
||||||
|
},
|
||||||
|
[]string{queue},
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
// StorageClient defines an interface for sending a batch of samples to an
|
// StorageClient defines an interface for sending a batch of samples to an
|
||||||
|
@ -190,6 +217,9 @@ type QueueManager struct {
|
||||||
succeededSamplesTotal prometheus.Counter
|
succeededSamplesTotal prometheus.Counter
|
||||||
retriedSamplesTotal prometheus.Counter
|
retriedSamplesTotal prometheus.Counter
|
||||||
shardCapacity prometheus.Gauge
|
shardCapacity prometheus.Gauge
|
||||||
|
maxNumShards prometheus.Gauge
|
||||||
|
minNumShards prometheus.Gauge
|
||||||
|
desiredNumShards prometheus.Gauge
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewQueueManager builds a new QueueManager.
|
// NewQueueManager builds a new QueueManager.
|
||||||
|
@ -291,10 +321,16 @@ func (t *QueueManager) Start() {
|
||||||
t.succeededSamplesTotal = succeededSamplesTotal.WithLabelValues(name)
|
t.succeededSamplesTotal = succeededSamplesTotal.WithLabelValues(name)
|
||||||
t.retriedSamplesTotal = retriedSamplesTotal.WithLabelValues(name)
|
t.retriedSamplesTotal = retriedSamplesTotal.WithLabelValues(name)
|
||||||
t.shardCapacity = shardCapacity.WithLabelValues(name)
|
t.shardCapacity = shardCapacity.WithLabelValues(name)
|
||||||
|
t.maxNumShards = maxNumShards.WithLabelValues(name)
|
||||||
|
t.minNumShards = minNumShards.WithLabelValues(name)
|
||||||
|
t.desiredNumShards = desiredNumShards.WithLabelValues(name)
|
||||||
|
|
||||||
// Initialise some metrics.
|
// Initialise some metrics.
|
||||||
t.shardCapacity.Set(float64(t.cfg.Capacity))
|
t.shardCapacity.Set(float64(t.cfg.Capacity))
|
||||||
t.pendingSamplesMetric.Set(0)
|
t.pendingSamplesMetric.Set(0)
|
||||||
|
t.maxNumShards.Set(float64(t.cfg.MaxShards))
|
||||||
|
t.minNumShards.Set(float64(t.cfg.MinShards))
|
||||||
|
t.desiredNumShards.Set(float64(t.cfg.MinShards))
|
||||||
|
|
||||||
t.shards.start(t.numShards)
|
t.shards.start(t.numShards)
|
||||||
t.watcher.Start()
|
t.watcher.Start()
|
||||||
|
@ -334,6 +370,9 @@ func (t *QueueManager) Stop() {
|
||||||
succeededSamplesTotal.DeleteLabelValues(name)
|
succeededSamplesTotal.DeleteLabelValues(name)
|
||||||
retriedSamplesTotal.DeleteLabelValues(name)
|
retriedSamplesTotal.DeleteLabelValues(name)
|
||||||
shardCapacity.DeleteLabelValues(name)
|
shardCapacity.DeleteLabelValues(name)
|
||||||
|
maxNumShards.DeleteLabelValues(name)
|
||||||
|
minNumShards.DeleteLabelValues(name)
|
||||||
|
desiredNumShards.DeleteLabelValues(name)
|
||||||
}
|
}
|
||||||
|
|
||||||
// StoreSeries keeps track of which series we know about for lookups when sending samples to remote.
|
// StoreSeries keeps track of which series we know about for lookups when sending samples to remote.
|
||||||
|
@ -502,6 +541,7 @@ func (t *QueueManager) calculateDesiredShards() {
|
||||||
}
|
}
|
||||||
|
|
||||||
numShards := int(math.Ceil(desiredShards))
|
numShards := int(math.Ceil(desiredShards))
|
||||||
|
t.desiredNumShards.Set(float64(numShards))
|
||||||
if numShards > t.cfg.MaxShards {
|
if numShards > t.cfg.MaxShards {
|
||||||
numShards = t.cfg.MaxShards
|
numShards = t.cfg.MaxShards
|
||||||
} else if numShards < t.cfg.MinShards {
|
} else if numShards < t.cfg.MinShards {
|
||||||
|
|
Loading…
Reference in New Issue