prometheus/storage/metric/curator.go

268 lines
7.8 KiB
Go

// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
import (
"code.google.com/p/goprotobuf/proto"
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/model"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/raw"
"time"
)
// curator is responsible for effectuating a given curation policy across the
// stored samples on-disk. This is useful to compact sparse sample values into
// single sample entities to reduce keyspace load on the datastore.
type curator struct {
// stop is the signalling channel for shutting the curator down. While it
// holds no value the curator is allowed to operate; as soon as a value is
// queued in it, the curator goes into drain mode.
// NOTE(review): drain and watermarkFilter inspect it with len(), which only
// ever reports queued values on a buffered channel.
stop chan bool
// samples is the on-disk metric store that is scanned for compaction
// candidates.
samples raw.Persistence
// watermarks is the on-disk store that is scanned for the high watermarks of
// given metrics. run iterates over this store.
watermarks raw.Persistence
// cutOff represents the most recent time up to which values will be curated.
cutOff time.Time
// groupingQuantity represents the number of samples below which encountered
// samples will be dismembered and reaggregated into larger groups.
groupingQuantity uint32
// curationState is the on-disk store where the curation remarks are made,
// recording how much progress has been made.
curationState raw.Persistence
}
// newCurator builds a new curator for the given LevelDB databases.
//
// cutOff is the most recent time up to which values are curated and
// groupingQuantity is the target group size handed to the operator.
// curationState, samples and watermarks are the backing stores the curator
// works against.
//
// The returned curator owns a fresh stop channel with room for exactly one
// pending signal.
func newCurator(cutOff time.Time, groupingQuantity uint32, curationState, samples, watermarks raw.Persistence) curator {
	return curator{
		cutOff: cutOff,
		// The stop signal is detected by looking at len(stop). On an unbuffered
		// channel len() is always zero and a send blocks until somebody
		// receives, so the channel needs one slot of buffering for a drain
		// request to be both deliverable and observable.
		stop:             make(chan bool, 1),
		samples:          samples,
		curationState:    curationState,
		watermarks:       watermarks,
		groupingQuantity: groupingQuantity,
	}
}
// run facilitates the curation lifecycle: it walks the watermarks store,
// decoding every entry with a watermarkDecoder, screening it with a
// watermarkFilter and handing it to a watermarkOperator.
//
// The error reported by the iteration is returned as is; the iteration's
// other result is discarded.
func (c curator) run() (err error) {
	screen := watermarkFilter{
		stop:          c.stop,
		curationState: c.curationState,
	}
	worker := watermarkOperator{
		olderThan:     c.cutOff,
		groupSize:     c.groupingQuantity,
		curationState: c.curationState,
	}

	_, err = c.watermarks.ForEach(watermarkDecoder{}, screen, worker)
	return err
}
// drain instructs the curator to stop at the next convenient moment as to not
// introduce data inconsistencies.
//
// The signal is posted with a non-blocking send, so drain never stalls its
// caller: when the stop channel already holds a signal, or cannot take one
// right now, the call returns without doing anything. Unlike a len() check
// followed by a plain send, this remains safe when drain is invoked from
// several goroutines at once.
func (c curator) drain() {
	select {
	case c.stop <- true:
		// Signal queued (or handed to a waiting receiver).
	default:
		// A signal is already pending, or nobody can take one; nothing to do.
	}
}
// watermarkDecoder converts (dto.Fingerprint, dto.MetricHighWatermark) pairs
// into (model.Fingerprint, model.Watermark) pairs. It carries no state.
type watermarkDecoder struct{}
// DecodeKey unmarshals a raw key, given as the serialized bytes of a
// dto.Fingerprint, and converts its signature through
// model.NewFingerprintFromRowKey.
//
// A failure to unmarshal is reported through err, with out left nil, rather
// than by panicking, so that the caller driving the iteration decides how to
// react to a corrupt entry.
//
// The method still panics if in is not a []byte, and dereferences the decoded
// signature unconditionally, so a key without a signature also panics.
func (w watermarkDecoder) DecodeKey(in interface{}) (out interface{}, err error) {
	var (
		key   = &dto.Fingerprint{}
		bytes = in.([]byte)
	)

	if err = proto.Unmarshal(bytes, key); err != nil {
		return nil, err
	}

	return model.NewFingerprintFromRowKey(*key.Signature), nil
}
// DecodeValue unmarshals a raw value, given as the serialized bytes of a
// dto.MetricHighWatermark, and converts it through
// model.NewWatermarkFromHighWatermarkDTO.
//
// A failure to unmarshal is reported through err, with out left nil, rather
// than by panicking, so that the caller driving the iteration decides how to
// react to a corrupt entry.
//
// The method still panics if in is not a []byte.
func (w watermarkDecoder) DecodeValue(in interface{}) (out interface{}, err error) {
	var (
		// Deliberately not named dto: that would shadow the imported dto
		// package for the remainder of the function.
		highWatermark = &dto.MetricHighWatermark{}
		bytes         = in.([]byte)
	)

	if err = proto.Unmarshal(bytes, highWatermark); err != nil {
		return nil, err
	}

	return model.NewWatermarkFromHighWatermarkDTO(highWatermark), nil
}
// watermarkFilter determines whether to include or exclude candidate
// values from the curation process by virtue of how old the high watermark is.
type watermarkFilter struct {
// curationState is the table of CurationKey to CurationValues that remark on
// how far along the curation process has gone for a given metric
// fingerprint.
curationState raw.Persistence
// stop, when non-empty, instructs the filter to stop operation. The filter
// only inspects its length and never receives from it.
stop chan bool
}
// Filter decides what should happen to one decoded watermark entry.
//
// key must be a model.Fingerprint and value a model.Watermark; anything else
// panics on the type assertion.
//
// The result is:
//   - storage.STOP when a stop signal is pending on w.stop. This is checked
//     first, before any datastore access, so that a drain request is honored
//     even for entries that would otherwise be accepted.
//   - storage.ACCEPT when the curation remark stored for the fingerprint
//     reports OlderThanLimit for the watermark's time.
//   - storage.SKIP otherwise.
//
// The method has no error return, so a failure to read or to unmarshal the
// curation remark panics.
func (w watermarkFilter) Filter(key, value interface{}) (result storage.FilterResult) {
	var (
		fingerprint = key.(model.Fingerprint)
		watermark   = value.(model.Watermark)
	)

	// A pending stop request outranks every other outcome.
	if len(w.stop) != 0 {
		return storage.STOP
	}

	rawCurationValue, err := w.curationState.Get(coding.NewProtocolBufferEncoder(fingerprint.ToDTO()))
	if err != nil {
		panic(err)
	}

	curationValue := &dto.CurationValue{}
	if err = proto.Unmarshal(rawCurationValue, curationValue); err != nil {
		panic(err)
	}

	if model.NewCurationRemarkFromDTO(curationValue).OlderThanLimit(watermark.Time) {
		return storage.ACCEPT
	}

	return storage.SKIP
}
// watermarkOperator scans over the curator.samples table for metrics whose
// high watermark has been determined to be allowable for curation. This type
// is individually responsible for compaction.
//
// The scanning starts from CurationRemark.LastCompletionTimestamp and goes
// forward until the stop point or end of the series is reached.
type watermarkOperator struct {
// olderThan functions as the cutoff when scanning curator.samples for
// uncurated samples to compact. The operator scans forward in the samples
// until olderThan is reached and then stops operation for samples that occur
// after it.
olderThan time.Time
// groupSize is the target quantity of samples to group together for a given
// to-be-written sample. Observed samples of less than groupSize are combined
// up to groupSize if possible. The protocol does not define the behavior if
// observed chunks are larger than groupSize.
groupSize uint32
// curationState is the table of CurationKey to CurationValues that remark on
// how far along the curation process has gone for a given metric
// fingerprint.
curationState raw.Persistence
}
// Operate is invoked for one (fingerprint, watermark) pair and determines
// whether the metric needs curating.
//
// key must be a model.Fingerprint and value a model.Watermark; anything else
// panics on the type assertion. Errors from querying the curation state are
// wrapped in a storage.OperatorError whose second field is false.
//
// NOTE: the curation work itself is not implemented yet; every path that does
// not hit a query error returns nil.
func (w watermarkOperator) Operate(key, value interface{}) (err *storage.OperatorError) {
	fp := key.(model.Fingerprint)
	mark := value.(model.Watermark)

	curated, lookupErr := w.hasBeenCurated(fp)
	switch {
	case lookupErr != nil:
		return &storage.OperatorError{lookupErr, false}
	case !curated:
		// curate
		return nil
	}

	consistent, lookupErr := w.curationConsistent(fp, mark)
	if lookupErr != nil {
		return &storage.OperatorError{lookupErr, false}
	}

	if !consistent {
		// curate
	}

	return nil
}
// hasBeenCurated answers true if the provided Fingerprint has been curated in
// the past, i.e. if the curation state store has an entry under the
// CurationKey formed from the fingerprint, the operator's cutoff (in Unix
// seconds) and its group size.
func (w watermarkOperator) hasBeenCurated(f model.Fingerprint) (curated bool, err error) {
	encodedKey := coding.NewProtocolBufferEncoder(&dto.CurationKey{
		Fingerprint:      f.ToDTO(),
		OlderThan:        proto.Int64(w.olderThan.Unix()),
		MinimumGroupSize: proto.Uint32(w.groupSize),
	})

	return w.curationState.Has(encodedKey)
}
// curationConsistent determines whether the given metric is in a dirty state
// and needs curation.
//
// It loads the curation remark stored under the CurationKey formed from the
// fingerprint, the operator's cutoff (in Unix seconds) and its group size. The
// metric is reported as consistent exactly when that remark does not report
// OlderThanLimit for the watermark's time. A failure to read or to unmarshal
// the remark is returned with consistent set to false.
func (w watermarkOperator) curationConsistent(f model.Fingerprint, watermark model.Watermark) (consistent bool, err error) {
	lookupKey := &dto.CurationKey{
		Fingerprint:      f.ToDTO(),
		OlderThan:        proto.Int64(w.olderThan.Unix()),
		MinimumGroupSize: proto.Uint32(w.groupSize),
	}

	stored, err := w.curationState.Get(coding.NewProtocolBufferEncoder(lookupKey))
	if err != nil {
		return false, err
	}

	remarkDTO := &dto.CurationValue{}
	if err = proto.Unmarshal(stored, remarkDTO); err != nil {
		return false, err
	}

	remark := model.NewCurationRemarkFromDTO(remarkDTO)
	return !remark.OlderThanLimit(watermark.Time), nil
}