Instrument indexing: queue length, batch sizes and latencies.

Change-Id: I60bcbd24b160e47d418a485d8cffa39344a257c6
This commit is contained in:
Julius Volz 2014-09-24 16:51:18 +02:00 committed by Bjoern Rabenstein
parent aea32b0b4b
commit a746fbb8bc
2 changed files with 65 additions and 1 deletions

View File

@ -141,6 +141,7 @@ func main() {
if err != nil {
glog.Fatal("Error opening disk persistence: ", err)
}
registry.MustRegister(persistence)
o := &local.MemorySeriesStorageOptions{
Persistence: persistence,

View File

@ -23,6 +23,7 @@ import (
"time"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
clientmodel "github.com/prometheus/client_golang/model"
@ -32,6 +33,9 @@ import (
)
const (
namespace = "prometheus"
subsystem = "persistence"
seriesFileName = "series.db"
seriesTempFileName = "series.db.tmp"
@ -82,10 +86,15 @@ type diskPersistence struct {
indexingQueue chan indexingOp
indexingStopped chan struct{}
indexingFlush chan chan int
indexingQueueLength prometheus.Gauge
indexingQueueCapacity prometheus.Metric
indexingBatchSizes prometheus.Summary
indexingBatchLatency prometheus.Summary
}
// NewDiskPersistence returns a newly allocated Persistence backed by local disk storage, ready to use.
func NewDiskPersistence(basePath string, chunkLen int) (Persistence, error) {
func NewDiskPersistence(basePath string, chunkLen int) (*diskPersistence, error) {
if err := os.MkdirAll(basePath, 0700); err != nil {
return nil, err
}
@ -119,11 +128,61 @@ func NewDiskPersistence(basePath string, chunkLen int) (Persistence, error) {
indexingQueue: make(chan indexingOp, indexingQueueCapacity),
indexingStopped: make(chan struct{}),
indexingFlush: make(chan chan int),
indexingQueueLength: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "indexing_queue_length",
Help: "The number of metrics waiting to be indexed.",
}),
indexingQueueCapacity: prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "indexing_queue_capacity"),
"The capacity of the indexing queue.",
nil, nil,
),
prometheus.GaugeValue,
float64(indexingQueueCapacity),
),
indexingBatchSizes: prometheus.NewSummary(
prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "indexing_batch_sizes",
Help: "Quantiles for indexing batch sizes (number of metrics per batch).",
},
),
indexingBatchLatency: prometheus.NewSummary(
prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "indexing_batch_latency_milliseconds",
Help: "Quantiles for batch indexing latencies in milliseconds.",
},
),
}
go p.processIndexingQueue()
return p, nil
}
// Describe implements prometheus.Collector. It sends the descriptors of the
// indexing queue length gauge and the queue capacity metric to ch, then lets
// the batch size and batch latency summaries describe themselves.
func (p *diskPersistence) Describe(ch chan<- *prometheus.Desc) {
	// Plain metrics: forward each descriptor directly.
	for _, m := range []prometheus.Metric{p.indexingQueueLength, p.indexingQueueCapacity} {
		ch <- m.Desc()
	}
	// Summaries describe themselves via their own Describe method.
	for _, c := range []prometheus.Collector{p.indexingBatchSizes, p.indexingBatchLatency} {
		c.Describe(ch)
	}
}
// Collect implements prometheus.Collector. On every call it first updates
// the queue length gauge with the number of operations currently buffered in
// the indexing queue, then sends the gauge, the capacity metric and the two
// batch summaries to ch.
func (p *diskPersistence) Collect(ch chan<- prometheus.Metric) {
	// Sample the channel length at collection time.
	pending := float64(len(p.indexingQueue))
	p.indexingQueueLength.Set(pending)

	for _, m := range []prometheus.Metric{p.indexingQueueLength, p.indexingQueueCapacity} {
		ch <- m
	}
	for _, c := range []prometheus.Collector{p.indexingBatchSizes, p.indexingBatchLatency} {
		c.Collect(ch)
	}
}
// GetFingerprintsForLabelPair implements Persistence.
func (p *diskPersistence) GetFingerprintsForLabelPair(lp metric.LabelPair) (clientmodel.Fingerprints, error) {
fps, _, err := p.labelPairToFingerprints.Lookup(lp)
@ -651,6 +710,10 @@ func (p *diskPersistence) processIndexingQueue() {
defer batchTimeout.Stop()
commitBatch := func() {
begin := time.Now()
// The arguments of a deferred call are evaluated when the defer statement
// executes, not when the surrounding function returns. The elapsed time
// must therefore be computed inside a deferred closure; passing
// time.Since(begin) directly to a deferred Observe would always record ~0.
defer func() {
	// Convert to float before dividing to keep sub-millisecond precision.
	p.indexingBatchLatency.Observe(float64(time.Since(begin)) / float64(time.Millisecond))
}()
p.indexingBatchSizes.Observe(float64(batchSize))
if err := p.labelPairToFingerprints.IndexBatch(pairToFPs); err != nil {
glog.Error("Error indexing label pair to fingerprints batch: ", err)
}