267 lines
7.8 KiB
Go
267 lines
7.8 KiB
Go
|
// Copyright 2013 Prometheus Team
|
||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
// you may not use this file except in compliance with the License.
|
||
|
// You may obtain a copy of the License at
|
||
|
//
|
||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||
|
//
|
||
|
// Unless required by applicable law or agreed to in writing, software
|
||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
// See the License for the specific language governing permissions and
|
||
|
// limitations under the License.
|
||
|
|
||
|
package metric
|
||
|
|
||
|
import (
|
||
|
"code.google.com/p/goprotobuf/proto"
|
||
|
"github.com/prometheus/prometheus/coding"
|
||
|
"github.com/prometheus/prometheus/model"
|
||
|
dto "github.com/prometheus/prometheus/model/generated"
|
||
|
"github.com/prometheus/prometheus/storage"
|
||
|
"github.com/prometheus/prometheus/storage/raw"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
// curator is responsible for effectuating a given curation policy across the
|
||
|
// stored samples on-disk. This is useful to compact sparse sample values into
|
||
|
// single sample entities to reduce keyspace load on the datastore.
|
||
|
type curator struct {
|
||
|
// stop functions as a channel that when empty allows the curator to operate.
|
||
|
// The moment a value is ingested inside of it, the curator goes into drain
|
||
|
// mode.
|
||
|
stop chan bool
|
||
|
// samples is the on-disk metric store that is scanned for compaction
|
||
|
// candidates.
|
||
|
samples raw.Persistence
|
||
|
// watermarks is the on-disk store that is scanned for high watermarks for
|
||
|
// given metrics.
|
||
|
watermarks raw.Persistence
|
||
|
// cutOff represents the most recent time up to which values will be curated.
|
||
|
cutOff time.Time
|
||
|
// groupingQuantity represents the number of samples below which encountered
|
||
|
// samples will be dismembered and reaggregated into larger groups.
|
||
|
groupingQuantity uint32
|
||
|
// curationState is the on-disk store where the curation remarks are made for
|
||
|
// how much progress has been made.
|
||
|
curationState raw.Persistence
|
||
|
}
|
||
|
|
||
|
// newCurator builds a new curator for the given LevelDB databases.
|
||
|
func newCurator(cutOff time.Time, groupingQuantity uint32, curationState, samples, watermarks raw.Persistence) curator {
|
||
|
return curator{
|
||
|
cutOff: cutOff,
|
||
|
stop: make(chan bool),
|
||
|
samples: samples,
|
||
|
curationState: curationState,
|
||
|
watermarks: watermarks,
|
||
|
groupingQuantity: groupingQuantity,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// run facilitates the curation lifecycle.
|
||
|
func (c curator) run() (err error) {
|
||
|
var (
|
||
|
decoder watermarkDecoder
|
||
|
filter = watermarkFilter{
|
||
|
stop: c.stop,
|
||
|
}
|
||
|
operator = watermarkOperator{
|
||
|
olderThan: c.cutOff,
|
||
|
groupSize: c.groupingQuantity,
|
||
|
curationState: c.curationState,
|
||
|
}
|
||
|
)
|
||
|
|
||
|
_, err = c.watermarks.ForEach(decoder, filter, operator)
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// drain instructs the curator to stop at the next convenient moment as to not
|
||
|
// introduce data inconsistencies.
|
||
|
func (c curator) drain() {
|
||
|
if len(c.stop) == 0 {
|
||
|
c.stop <- true
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// watermarkDecoder converts (dto.Fingerprint, dto.MetricHighWatermark) doubles
|
||
|
// into (model.Fingerprint, model.Watermark) doubles.
|
||
|
type watermarkDecoder struct{}
|
||
|
|
||
|
func (w watermarkDecoder) DecodeKey(in interface{}) (out interface{}, err error) {
|
||
|
var (
|
||
|
key = &dto.Fingerprint{}
|
||
|
bytes = in.([]byte)
|
||
|
)
|
||
|
|
||
|
err = proto.Unmarshal(bytes, key)
|
||
|
if err != nil {
|
||
|
panic(err)
|
||
|
}
|
||
|
|
||
|
out = model.NewFingerprintFromRowKey(*key.Signature)
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
func (w watermarkDecoder) DecodeValue(in interface{}) (out interface{}, err error) {
|
||
|
var (
|
||
|
dto = &dto.MetricHighWatermark{}
|
||
|
bytes = in.([]byte)
|
||
|
)
|
||
|
|
||
|
err = proto.Unmarshal(bytes, dto)
|
||
|
if err != nil {
|
||
|
panic(err)
|
||
|
}
|
||
|
|
||
|
out = model.NewWatermarkFromHighWatermarkDTO(dto)
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// watermarkFilter determines whether to include or exclude candidate
|
||
|
// values from the curation process by virtue of how old the high watermark is.
|
||
|
type watermarkFilter struct {
|
||
|
// curationState is the table of CurationKey to CurationValues that remark on
|
||
|
// far along the curation process has gone for a given metric fingerprint.
|
||
|
curationState raw.Persistence
|
||
|
// stop, when non-empty, instructs the filter to stop operation.
|
||
|
stop chan bool
|
||
|
}
|
||
|
|
||
|
func (w watermarkFilter) Filter(key, value interface{}) (result storage.FilterResult) {
|
||
|
var (
|
||
|
fingerprint = key.(model.Fingerprint)
|
||
|
watermark = value.(model.Watermark)
|
||
|
curationKey = fingerprint.ToDTO()
|
||
|
rawCurationValue []byte
|
||
|
err error
|
||
|
curationValue = &dto.CurationValue{}
|
||
|
)
|
||
|
|
||
|
rawCurationValue, err = w.curationState.Get(coding.NewProtocolBufferEncoder(curationKey))
|
||
|
if err != nil {
|
||
|
panic(err)
|
||
|
}
|
||
|
|
||
|
err = proto.Unmarshal(rawCurationValue, curationValue)
|
||
|
if err != nil {
|
||
|
panic(err)
|
||
|
}
|
||
|
|
||
|
switch {
|
||
|
case model.NewCurationRemarkFromDTO(curationValue).OlderThanLimit(watermark.Time):
|
||
|
result = storage.ACCEPT
|
||
|
case len(w.stop) != 0:
|
||
|
result = storage.STOP
|
||
|
default:
|
||
|
result = storage.SKIP
|
||
|
}
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// watermarkOperator scans over the curator.samples table for metrics whose
|
||
|
// high watermark has been determined to be allowable for curation. This type
|
||
|
// is individually responsible for compaction.
|
||
|
//
|
||
|
// The scanning starts from CurationRemark.LastCompletionTimestamp and goes
|
||
|
// forward until the stop point or end of the series is reached.
|
||
|
type watermarkOperator struct {
|
||
|
// olderThan functions as the cutoff when scanning curator.samples for
|
||
|
// uncurated samples to compact. The operator scans forward in the samples
|
||
|
// until olderThan is reached and then stops operation for samples that occur
|
||
|
// after it.
|
||
|
olderThan time.Time
|
||
|
// groupSize is the target quantity of samples to group together for a given
|
||
|
// to-be-written sample. Observed samples of less than groupSize are combined
|
||
|
// up to groupSize if possible. The protocol does not define the behavior if
|
||
|
// observed chunks are larger than groupSize.
|
||
|
groupSize uint32
|
||
|
// curationState is the table of CurationKey to CurationValues that remark on
|
||
|
// far along the curation process has gone for a given metric fingerprint.
|
||
|
curationState raw.Persistence
|
||
|
}
|
||
|
|
||
|
func (w watermarkOperator) Operate(key, value interface{}) (err *storage.OperatorError) {
|
||
|
var (
|
||
|
fingerprint = key.(model.Fingerprint)
|
||
|
watermark = value.(model.Watermark)
|
||
|
queryErr error
|
||
|
hasBeenCurated bool
|
||
|
curationConsistent bool
|
||
|
)
|
||
|
hasBeenCurated, queryErr = w.hasBeenCurated(fingerprint)
|
||
|
if queryErr != nil {
|
||
|
err = &storage.OperatorError{queryErr, false}
|
||
|
return
|
||
|
}
|
||
|
|
||
|
if !hasBeenCurated {
|
||
|
// curate
|
||
|
return
|
||
|
}
|
||
|
|
||
|
curationConsistent, queryErr = w.curationConsistent(fingerprint, watermark)
|
||
|
if queryErr != nil {
|
||
|
err = &storage.OperatorError{queryErr, false}
|
||
|
return
|
||
|
}
|
||
|
if curationConsistent {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// curate
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// hasBeenCurated answers true if the provided Fingerprint has been curated in
|
||
|
// in the past.
|
||
|
func (w watermarkOperator) hasBeenCurated(f model.Fingerprint) (curated bool, err error) {
|
||
|
curationKey := &dto.CurationKey{
|
||
|
Fingerprint: f.ToDTO(),
|
||
|
OlderThan: proto.Int64(w.olderThan.Unix()),
|
||
|
MinimumGroupSize: proto.Uint32(w.groupSize),
|
||
|
}
|
||
|
|
||
|
curated, err = w.curationState.Has(coding.NewProtocolBufferEncoder(curationKey))
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// curationConsistent determines whether the given metric is in a dirty state
|
||
|
// and needs curation.
|
||
|
func (w watermarkOperator) curationConsistent(f model.Fingerprint, watermark model.Watermark) (consistent bool, err error) {
|
||
|
var (
|
||
|
rawValue []byte
|
||
|
curationValue = &dto.CurationValue{}
|
||
|
curationKey = &dto.CurationKey{
|
||
|
Fingerprint: f.ToDTO(),
|
||
|
OlderThan: proto.Int64(w.olderThan.Unix()),
|
||
|
MinimumGroupSize: proto.Uint32(w.groupSize),
|
||
|
}
|
||
|
)
|
||
|
|
||
|
rawValue, err = w.curationState.Get(coding.NewProtocolBufferEncoder(curationKey))
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
err = proto.Unmarshal(rawValue, curationValue)
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
curationRemark := model.NewCurationRemarkFromDTO(curationValue)
|
||
|
if !curationRemark.OlderThanLimit(watermark.Time) {
|
||
|
consistent = true
|
||
|
return
|
||
|
}
|
||
|
|
||
|
return
|
||
|
}
|