prometheus/storage/metric/leveldb/mutable.go

204 lines
5.4 KiB
Go

// Copyright 2012 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package leveldb
import (
"code.google.com/p/goprotobuf/proto"
registry "github.com/matttproud/golang_instrumentation"
"github.com/matttproud/golang_instrumentation/metrics"
"github.com/matttproud/prometheus/coding"
"github.com/matttproud/prometheus/coding/indexable"
"github.com/matttproud/prometheus/model"
dto "github.com/matttproud/prometheus/model/generated"
"time"
)
var (
appendSuccessCount = &metrics.CounterMetric{}
appendFailureCount = &metrics.CounterMetric{}
)
func init() {
registry.Register("sample_append_success_count_total", "Total successfully appended samples", map[string]string{}, appendSuccessCount)
registry.Register("sample_append_failure_count_total", "Total sample append failures", map[string]string{}, appendFailureCount)
}
func (l *LevelDBMetricPersistence) setLabelPairFingerprints(labelPair *dto.LabelPair, fingerprints *dto.FingerprintCollection) error {
labelPairEncoded := coding.NewProtocolBufferEncoder(labelPair)
fingerprintsEncoded := coding.NewProtocolBufferEncoder(fingerprints)
return l.labelSetToFingerprints.Put(labelPairEncoded, fingerprintsEncoded)
}
func (l *LevelDBMetricPersistence) setLabelNameFingerprints(labelName *dto.LabelName, fingerprints *dto.FingerprintCollection) error {
labelNameEncoded := coding.NewProtocolBufferEncoder(labelName)
fingerprintsEncoded := coding.NewProtocolBufferEncoder(fingerprints)
return l.labelNameToFingerprints.Put(labelNameEncoded, fingerprintsEncoded)
}
func (l *LevelDBMetricPersistence) appendLabelPairFingerprint(labelPair *dto.LabelPair, fingerprint *dto.Fingerprint) (err error) {
has, err := l.HasLabelPair(labelPair)
if err != nil {
return
}
var fingerprints *dto.FingerprintCollection
if has {
fingerprints, err = l.getFingerprintsForLabelSet(labelPair)
if err != nil {
return
}
} else {
fingerprints = &dto.FingerprintCollection{}
}
fingerprints.Member = append(fingerprints.Member, fingerprint)
return l.setLabelPairFingerprints(labelPair, fingerprints)
}
func (l *LevelDBMetricPersistence) appendLabelNameFingerprint(labelPair *dto.LabelPair, fingerprint *dto.Fingerprint) (err error) {
labelName := &dto.LabelName{
Name: labelPair.Name,
}
has, err := l.HasLabelName(labelName)
if err != nil {
return
}
var fingerprints *dto.FingerprintCollection
if has {
fingerprints, err = l.GetLabelNameFingerprints(labelName)
if err != nil {
return
}
} else {
fingerprints = &dto.FingerprintCollection{}
}
fingerprints.Member = append(fingerprints.Member, fingerprint)
return l.setLabelNameFingerprints(labelName, fingerprints)
}
func (l *LevelDBMetricPersistence) appendFingerprints(m *dto.Metric) (err error) {
fingerprintDTO, err := model.MessageToFingerprintDTO(m)
if err != nil {
return
}
fingerprintKey := coding.NewProtocolBufferEncoder(fingerprintDTO)
metricDTOEncoder := coding.NewProtocolBufferEncoder(m)
err = l.fingerprintToMetrics.Put(fingerprintKey, metricDTOEncoder)
if err != nil {
return
}
labelCount := len(m.LabelPair)
labelPairErrors := make(chan error, labelCount)
labelNameErrors := make(chan error, labelCount)
for _, labelPair := range m.LabelPair {
go func(labelPair *dto.LabelPair) {
labelNameErrors <- l.appendLabelNameFingerprint(labelPair, fingerprintDTO)
}(labelPair)
go func(labelPair *dto.LabelPair) {
labelPairErrors <- l.appendLabelPairFingerprint(labelPair, fingerprintDTO)
}(labelPair)
}
for i := 0; i < cap(labelPairErrors); i++ {
err = <-labelPairErrors
if err != nil {
return
}
}
for i := 0; i < cap(labelNameErrors); i++ {
err = <-labelNameErrors
if err != nil {
return
}
}
return
}
func (l *LevelDBMetricPersistence) AppendSample(sample *model.Sample) (err error) {
defer func() {
var m *metrics.CounterMetric = appendSuccessCount
if err != nil {
m = appendFailureCount
}
m.Increment()
}()
operation := func() {
metricDTO := model.SampleToMetricDTO(sample)
indexHas, err := l.hasIndexMetric(metricDTO)
if err != nil {
return
}
if !indexHas {
err = l.indexMetric(metricDTO)
if err != nil {
return
}
err = l.appendFingerprints(metricDTO)
if err != nil {
return
}
}
fingerprintDTO, err := model.MessageToFingerprintDTO(metricDTO)
if err != nil {
return
}
sampleKeyDTO := &dto.SampleKey{
Fingerprint: fingerprintDTO,
Timestamp: indexable.EncodeTime(sample.Timestamp),
}
sampleValueDTO := &dto.SampleValue{
Value: proto.Float32(float32(sample.Value)),
}
sampleKeyEncoded := coding.NewProtocolBufferEncoder(sampleKeyDTO)
sampleValueEncoded := coding.NewProtocolBufferEncoder(sampleValueDTO)
err = l.metricSamples.Put(sampleKeyEncoded, sampleValueEncoded)
if err != nil {
return
}
}
// XXX: Problematic with panics.
accumulator := func(d time.Duration) {
ms := float64(d) / float64(time.Microsecond)
appendLatency.Add(ms)
}
metrics.InstrumentCall(operation, accumulator)
return
}