Implement batch database sample curator.

This commit introduces to Prometheus a batch database sample curator,
which corroborates the high watermarks for sample series against the
curation watermark table to see whether a curator of a given type
needs to be run.

The curator is an abstract executor, which runs various curation
strategies across the database.  It remarks the progress for each
type of curation processor that runs for a given sample series.

A curation procesor is responsible for effectuating the underlying
batch changes that are request.  In this commit, we introduce the
CompactionProcessor, which takes several bits of runtime metadata and
combine sparse sample entries in the database together to form larger
groups.  For instance, for a given series it would be possible to
have the curator effectuate the following grouping:

- Samples Older than Two Weeks: Grouped into Bunches of 10000
- Samples Older than One Week: Grouped into Bunches of 1000
- Samples Older than One Day: Grouped into Bunches of 100
- Samples Older than One Hour: Grouped into Bunches of 10

The benefits hereof of such a compaction are 1. a smaller search
space in the database keyspace, 2. better employment of compression
for repetious values, and 3. reduced seek times.
This commit is contained in:
Matt T. Proud 2013-04-05 18:03:45 +02:00
parent 169a7dc26c
commit b3e34c6658
18 changed files with 1758 additions and 816 deletions

View File

@ -29,6 +29,8 @@ var (
snappyVersion string
)
// BuildInfo encapsulates compile-time metadata about Prometheus made available
// via go tool ld such that this can be reported on-demand.
var BuildInfo = map[string]string{
"version": buildVersion,
"branch": buildBranch,

View File

@ -22,7 +22,7 @@ include ../Makefile.INCLUDE
generated: generated-stamp
generated-stamp: data.proto
protoc --proto_path=$(PREFIX)/include:. --go_out=generated/ --descriptor_set_out=generated/descriptor.blob data.proto
protoc --proto_path=$(PREFIX)/include:. --include_imports --go_out=generated/ --descriptor_set_out=generated/descriptor.blob data.proto
touch $@
clean:

View File

@ -14,6 +14,9 @@
package model
import (
"bytes"
"code.google.com/p/goprotobuf/proto"
"fmt"
dto "github.com/prometheus/prometheus/model/generated"
"time"
)
@ -24,16 +27,77 @@ type CurationRemark struct {
LastCompletionTimestamp time.Time
}
// OlderThanLimit answers whether this CurationRemark is older than the provided
// OlderThan answers whether this CurationRemark is older than the provided
// cutOff time.
func (c CurationRemark) OlderThanLimit(cutOff time.Time) bool {
return c.LastCompletionTimestamp.Before(cutOff)
func (c CurationRemark) OlderThan(t time.Time) bool {
return c.LastCompletionTimestamp.Before(t)
}
// Equal answers whether the two CurationRemarks are equivalent.
func (c CurationRemark) Equal(o CurationRemark) bool {
return c.LastCompletionTimestamp.Equal(o.LastCompletionTimestamp)
}
func (c CurationRemark) String() string {
return fmt.Sprintf("Last curated at %s", c.LastCompletionTimestamp)
}
// ToDTO generates the dto.CurationValue representation of this.
func (c CurationRemark) ToDTO() *dto.CurationValue {
return &dto.CurationValue{
LastCompletionTimestamp: proto.Int64(c.LastCompletionTimestamp.Unix()),
}
}
// NewCurationRemarkFromDTO builds CurationRemark from the provided
// dto.CurationValue object.
func NewCurationRemarkFromDTO(d *dto.CurationValue) CurationRemark {
return CurationRemark{
LastCompletionTimestamp: time.Unix(*d.LastCompletionTimestamp, 0),
LastCompletionTimestamp: time.Unix(*d.LastCompletionTimestamp, 0).UTC(),
}
}
// CurationKey provides a representation of dto.CurationKey with asociated
// business logic methods attached to it to enhance code readability.
type CurationKey struct {
Fingerprint Fingerprint
ProcessorMessageRaw []byte
ProcessorMessageTypeName string
IgnoreYoungerThan time.Duration
}
// Equal answers whether the two CurationKeys are equivalent.
func (c CurationKey) Equal(o CurationKey) (equal bool) {
switch {
case !c.Fingerprint.Equal(o.Fingerprint):
return
case bytes.Compare(c.ProcessorMessageRaw, o.ProcessorMessageRaw) != 0:
return
case c.ProcessorMessageTypeName != o.ProcessorMessageTypeName:
return
case c.IgnoreYoungerThan != o.IgnoreYoungerThan:
return
}
return true
}
// ToDTO generates a dto.CurationKey representation of this.
func (c CurationKey) ToDTO() *dto.CurationKey {
return &dto.CurationKey{
Fingerprint: c.Fingerprint.ToDTO(),
ProcessorMessageRaw: c.ProcessorMessageRaw,
ProcessorMessageTypeName: proto.String(c.ProcessorMessageTypeName),
IgnoreYoungerThan: proto.Int64(int64(c.IgnoreYoungerThan)),
}
}
// NewCurationKeyFromDTO builds CurationKey from the provided dto.CurationKey.
func NewCurationKeyFromDTO(d *dto.CurationKey) CurationKey {
return CurationKey{
Fingerprint: NewFingerprintFromDTO(d.Fingerprint),
ProcessorMessageRaw: d.ProcessorMessageRaw,
ProcessorMessageTypeName: *d.ProcessorMessageTypeName,
IgnoreYoungerThan: time.Duration(*d.IgnoreYoungerThan),
}
}

View File

@ -65,6 +65,14 @@ message MetricHighWatermark {
optional int64 timestamp = 1;
}
// CompactionProcessorDefinition models a curation process across the sample
// corpus that ensures that sparse samples.
message CompactionProcessorDefinition {
// minimum_group_size identifies how minimally samples should be grouped
// together to write a new SampleValueSeries chunk.
optional uint32 minimum_group_size = 1;
}
// CurationKey models the state of curation for a given metric fingerprint and
// its associated samples. The time series database only knows about compaction
// and resampling behaviors that are explicitly defined to it in its runtime
@ -73,19 +81,38 @@ message MetricHighWatermark {
// effectuation state for a given metric fingerprint is.
//
// For instance, how far along as a rule for (Fingerprint A, Samples Older Than
// B, and Grouped Together in Size of C) has been effectuated on-disk.
// B, and Curation Processor) has been effectuated on-disk.
message CurationKey {
// fingerprint identifies the fingerprint for the given policy.
optional Fingerprint fingerprint = 1;
// older_than represents in seconds relative to when curation cycle starts
// into the past when the curator should stop operating on a given metric
// fingerprint's samples:
optional Fingerprint fingerprint = 1;
// processor_message_type_name identifies the underlying message type that
// was used to encode processor_message_raw.
optional string processor_message_type_name = 2;
// processor_message_raw identifies the serialized ProcessorSignature for this
// operation.
optional bytes processor_message_raw = 3;
// ignore_younger_than represents in seconds relative to when the curation
// cycle start when the curator should stop operating. For instance, if
// the curation cycle starts at time T and the curation remark dictates that
// the curation should starts processing samples at time S, the curator should
// work from S until ignore_younger_than seconds before T:
//
// [Oldest Sample Time, time.Now().Sub(time.Second * older_than))
optional int64 older_than = 2;
// minimum_group_size identifies how minimally samples should be grouped
// together to write a new SampleValueSeries chunk.
optional uint32 minimum_group_size = 3;
// PAST NOW FUTURE
//
// S--------------->|----------T
// |---IYT----|
//
// [Curation Resumption Time (S), T - IYT)
optional int64 ignore_younger_than = 4;
// This could be populated by decoding the generated descriptor file into a
// FileDescriptorSet message and extracting the type definition for the given
// message schema that describes processor_message_type_name.
//
// optional google.protobuf.DescriptorProto processor_message_type_descriptor_raw = 5;
}
// CurationValue models the progress for a given CurationKey.

View File

@ -1,2 +1,2 @@
data.pb.go
descriptor.blob
descriptor.blob

View File

@ -92,7 +92,11 @@ func (s SampleValue) ToDTO() *float64 {
}
func (v SampleValue) MarshalJSON() ([]byte, error) {
return []byte(fmt.Sprintf("\"%f\"", v)), nil
return []byte(fmt.Sprintf(`"%f"`, v)), nil
}
func (v SampleValue) String() string {
return fmt.Sprint(float64(v))
}
func (s SamplePair) MarshalJSON() ([]byte, error) {
@ -108,6 +112,19 @@ func (s SamplePair) Equal(o SamplePair) bool {
return s.Value.Equal(o.Value) && s.Timestamp.Equal(o.Timestamp)
}
func (s SamplePair) ToDTO() (out *dto.SampleValueSeries_Value) {
out = &dto.SampleValueSeries_Value{
Timestamp: proto.Int64(s.Timestamp.Unix()),
Value: s.Value.ToDTO(),
}
return
}
func (s SamplePair) String() string {
return fmt.Sprintf("SamplePair at %s of %s", s.Timestamp, s.Value)
}
type Values []SamplePair
func (v Values) Len() int {
@ -174,6 +191,40 @@ func (v Values) TruncateBefore(t time.Time) (values Values) {
return
}
func (v Values) ToDTO() (out *dto.SampleValueSeries) {
out = &dto.SampleValueSeries{}
for _, value := range v {
out.Value = append(out.Value, value.ToDTO())
}
return
}
func (v Values) ToSampleKey(f Fingerprint) SampleKey {
return SampleKey{
Fingerprint: f,
FirstTimestamp: v[0].Timestamp,
LastTimestamp: v[len(v)-1].Timestamp,
SampleCount: uint32(len(v)),
}
}
func (v Values) String() string {
buffer := bytes.Buffer{}
fmt.Fprintf(&buffer, "[")
for i, value := range v {
fmt.Fprintf(&buffer, "%d. %s", i, value)
if i != len(v)-1 {
fmt.Fprintf(&buffer, "\n")
}
}
fmt.Fprintf(&buffer, "]")
return buffer.String()
}
func NewValuesFromDTO(dto *dto.SampleValueSeries) (v Values) {
for _, value := range dto.Value {
v = append(v, SamplePair{

View File

@ -15,6 +15,7 @@ package model
import (
"code.google.com/p/goprotobuf/proto"
"fmt"
"github.com/prometheus/prometheus/coding/indexable"
dto "github.com/prometheus/prometheus/model/generated"
"time"
@ -67,6 +68,10 @@ func (s SampleKey) ToPartialDTO(out *dto.SampleKey) {
return
}
func (s SampleKey) String() string {
return fmt.Sprintf("SampleKey for %s at %s to %s with %d values.", s.Fingerprint, s.FirstTimestamp, s.LastTimestamp, s.SampleCount)
}
// NewSampleKeyFromDTO builds a new SampleKey from a provided data-transfer
// object.
func NewSampleKeyFromDTO(dto *dto.SampleKey) SampleKey {

View File

@ -41,11 +41,11 @@ const (
// alert is used to track active (pending/firing) alerts over time.
type alert struct {
// The name of the alert.
name string
name string
// The vector element labelset triggering this alert.
metric model.Metric
metric model.Metric
// The state of the alert (PENDING or FIRING).
state alertState
state alertState
// The time when the alert first transitioned into PENDING state.
activeSince time.Time
}
@ -71,14 +71,14 @@ func (a alert) sample(timestamp time.Time, value model.SampleValue) model.Sample
// An alerting rule generates alerts from its vector expression.
type AlertingRule struct {
// The name of the alert.
name string
// The vector expression from which to generate alerts.
vector ast.VectorNode
name string
// The vector expression from which to generate alerts.
vector ast.VectorNode
// The duration for which a labelset needs to persist in the expression
// output vector before an alert transitions from PENDING to FIRING state.
holdDuration time.Duration
// Extra labels to attach to the resulting alert sample vectors.
labels model.LabelSet
labels model.LabelSet
// A map of alerts which are currently active (PENDING or FIRING), keyed by
// the fingerprint of the labelset they correspond to.
activeAlerts map[model.Fingerprint]*alert

View File

@ -34,6 +34,19 @@ const (
ACCEPT
)
func (f FilterResult) String() string {
switch f {
case STOP:
return "STOP"
case SKIP:
return "SKIP"
case ACCEPT:
return "ACCEPT"
}
panic("unknown")
}
type OperatorErrorType int
type OperatorError struct {

View File

@ -15,14 +15,36 @@ package metric
import (
"code.google.com/p/goprotobuf/proto"
"fmt"
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/model"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/raw"
"github.com/prometheus/prometheus/storage/raw/leveldb"
"strings"
"time"
)
// watermarkFilter determines whether to include or exclude candidate
// values from the curation process by virtue of how old the high watermark is.
type watermarkFilter struct {
// curationState is the data store for curation remarks.
curationState raw.Persistence
// ignoreYoungerThan conveys this filter's policy of not working on elements
// younger than a given relative time duration. This is persisted to the
// curation remark database (curationState) to indicate how far a given
// policy of this type has progressed.
ignoreYoungerThan time.Duration
// processor is the post-processor that performs whatever action is desired on
// the data that is deemed valid to be worked on.
processor processor
// stop functions as the global stop channel for all future operations.
stop chan bool
// stopAt is used to determine the elegibility of series for compaction.
stopAt time.Time
}
// curator is responsible for effectuating a given curation policy across the
// stored samples on-disk. This is useful to compact sparse sample values into
// single sample entities to reduce keyspace load on the datastore.
@ -31,51 +53,103 @@ type curator struct {
// The moment a value is ingested inside of it, the curator goes into drain
// mode.
stop chan bool
// samples is the on-disk metric store that is scanned for compaction
// candidates.
samples raw.Persistence
// watermarks is the on-disk store that is scanned for high watermarks for
// given metrics.
watermarks raw.Persistence
// recencyThreshold represents the most recent time up to which values will be
// curated.
recencyThreshold time.Duration
// groupingQuantity represents the number of samples below which encountered
// samples will be dismembered and reaggregated into larger groups.
groupingQuantity uint32
// curationState is the on-disk store where the curation remarks are made for
// how much progress has been made.
}
// watermarkDecoder converts (dto.Fingerprint, dto.MetricHighWatermark) doubles
// into (model.Fingerprint, model.Watermark) doubles.
type watermarkDecoder struct{}
// watermarkOperator scans over the curator.samples table for metrics whose
// high watermark has been determined to be allowable for curation. This type
// is individually responsible for compaction.
//
// The scanning starts from CurationRemark.LastCompletionTimestamp and goes
// forward until the stop point or end of the series is reached.
type watermarkOperator struct {
// curationState is the data store for curation remarks.
curationState raw.Persistence
// diskFrontier models the available seekable ranges for the provided
// sampleIterator.
diskFrontier diskFrontier
// ignoreYoungerThan is passed into the curation remark for the given series.
ignoreYoungerThan time.Duration
// processor is responsible for executing a given stategy on the
// to-be-operated-on series.
processor processor
// sampleIterator is a snapshotted iterator for the time series.
sampleIterator leveldb.Iterator
// samples
samples raw.Persistence
// stopAt is a cue for when to stop mutating a given series.
stopAt time.Time
}
// newCurator builds a new curator for the given LevelDB databases.
func newCurator(recencyThreshold time.Duration, groupingQuantity uint32, curationState, samples, watermarks raw.Persistence) curator {
func newCurator() curator {
return curator{
recencyThreshold: recencyThreshold,
stop: make(chan bool),
samples: samples,
curationState: curationState,
watermarks: watermarks,
groupingQuantity: groupingQuantity,
stop: make(chan bool),
}
}
// run facilitates the curation lifecycle.
func (c curator) run(instant time.Time) (err error) {
decoder := watermarkDecoder{}
filter := watermarkFilter{
stop: c.stop,
curationState: c.curationState,
groupSize: c.groupingQuantity,
recencyThreshold: c.recencyThreshold,
//
// recencyThreshold represents the most recent time up to which values will be
// curated.
// curationState is the on-disk store where the curation remarks are made for
// how much progress has been made.
func (c curator) run(ignoreYoungerThan time.Duration, instant time.Time, processor processor, curationState, samples, watermarks *leveldb.LevelDBPersistence) (err error) {
defer func(t time.Time) {
duration := float64(time.Since(t))
labels := map[string]string{
cutOff: fmt.Sprint(ignoreYoungerThan),
processorName: processor.Name(),
result: success,
}
if err != nil {
labels[result] = failure
}
curationDuration.IncrementBy(labels, duration)
curationDurations.Add(labels, duration)
}(time.Now())
iterator := samples.NewIterator(true)
defer iterator.Close()
diskFrontier, err := newDiskFrontier(iterator)
if err != nil {
return
}
operator := watermarkOperator{
olderThan: instant.Add(-1 * c.recencyThreshold),
groupSize: c.groupingQuantity,
curationState: c.curationState,
if diskFrontier == nil {
// No sample database exists; no work to do!
return
}
_, err = c.watermarks.ForEach(decoder, filter, operator)
decoder := watermarkDecoder{}
filter := watermarkFilter{
curationState: curationState,
processor: processor,
ignoreYoungerThan: ignoreYoungerThan,
stop: c.stop,
stopAt: instant.Add(-1 * ignoreYoungerThan),
}
// Right now, the ability to stop a curation is limited to the beginning of
// each fingerprint cycle. It is impractical to cease the work once it has
// begun for a given series.
operator := watermarkOperator{
curationState: curationState,
diskFrontier: *diskFrontier,
processor: processor,
ignoreYoungerThan: ignoreYoungerThan,
sampleIterator: iterator,
samples: samples,
stopAt: instant.Add(-1 * ignoreYoungerThan),
}
_, err = watermarks.ForEach(decoder, filter, operator)
return
}
@ -88,35 +162,27 @@ func (c curator) drain() {
}
}
// watermarkDecoder converts (dto.Fingerprint, dto.MetricHighWatermark) doubles
// into (model.Fingerprint, model.Watermark) doubles.
type watermarkDecoder struct{}
func (w watermarkDecoder) DecodeKey(in interface{}) (out interface{}, err error) {
var (
key = &dto.Fingerprint{}
bytes = in.([]byte)
)
key := &dto.Fingerprint{}
bytes := in.([]byte)
err = proto.Unmarshal(bytes, key)
if err != nil {
panic(err)
return
}
out = model.NewFingerprintFromRowKey(*key.Signature)
out = model.NewFingerprintFromDTO(key)
return
}
func (w watermarkDecoder) DecodeValue(in interface{}) (out interface{}, err error) {
var (
dto = &dto.MetricHighWatermark{}
bytes = in.([]byte)
)
dto := &dto.MetricHighWatermark{}
bytes := in.([]byte)
err = proto.Unmarshal(bytes, dto)
if err != nil {
panic(err)
return
}
out = model.NewWatermarkFromHighWatermarkDTO(dto)
@ -124,149 +190,181 @@ func (w watermarkDecoder) DecodeValue(in interface{}) (out interface{}, err erro
return
}
// watermarkFilter determines whether to include or exclude candidate
// values from the curation process by virtue of how old the high watermark is.
type watermarkFilter struct {
// curationState is the table of CurationKey to CurationValues that rema
// far along the curation process has gone for a given metric fingerprint.
curationState raw.Persistence
// stop, when non-empty, instructs the filter to stop operation.
stop chan bool
// groupSize refers to the target groupSize from the curator.
groupSize uint32
// recencyThreshold refers to the target recencyThreshold from the curator.
recencyThreshold time.Duration
func (w watermarkFilter) shouldStop() bool {
return len(w.stop) != 0
}
func (w watermarkFilter) Filter(key, value interface{}) (result storage.FilterResult) {
fingerprint := key.(model.Fingerprint)
watermark := value.(model.Watermark)
curationKey := &dto.CurationKey{
Fingerprint: fingerprint.ToDTO(),
MinimumGroupSize: proto.Uint32(w.groupSize),
OlderThan: proto.Int64(int64(w.recencyThreshold)),
func getCurationRemark(states raw.Persistence, processor processor, ignoreYoungerThan time.Duration, fingerprint model.Fingerprint) (remark *model.CurationRemark, err error) {
rawSignature, err := processor.Signature()
if err != nil {
return
}
curationKey := model.CurationKey{
Fingerprint: fingerprint,
ProcessorMessageRaw: rawSignature,
ProcessorMessageTypeName: processor.Name(),
IgnoreYoungerThan: ignoreYoungerThan,
}.ToDTO()
curationValue := &dto.CurationValue{}
rawCurationValue, err := w.curationState.Get(coding.NewProtocolBuffer(curationKey))
rawKey := coding.NewProtocolBuffer(curationKey)
has, err := states.Has(rawKey)
if err != nil {
panic(err)
return
}
if !has {
return
}
rawCurationValue, err := states.Get(rawKey)
if err != nil {
return
}
err = proto.Unmarshal(rawCurationValue, curationValue)
if err != nil {
panic(err)
return
}
switch {
case model.NewCurationRemarkFromDTO(curationValue).OlderThanLimit(watermark.Time):
result = storage.ACCEPT
case len(w.stop) != 0:
result = storage.STOP
default:
result = storage.SKIP
}
baseRemark := model.NewCurationRemarkFromDTO(curationValue)
remark = &baseRemark
return
}
// watermarkOperator scans over the curator.samples table for metrics whose
// high watermark has been determined to be allowable for curation. This type
// is individually responsible for compaction.
//
// The scanning starts from CurationRemark.LastCompletionTimestamp and goes
// forward until the stop point or end of the series is reached.
type watermarkOperator struct {
// olderThan functions as the cutoff when scanning curator.samples for
// uncurated samples to compact. The operator scans forward in the samples
// until olderThan is reached and then stops operation for samples that occur
// after it.
olderThan time.Time
// groupSize is the target quantity of samples to group together for a given
// to-be-written sample. Observed samples of less than groupSize are combined
// up to groupSize if possible. The protocol does not define the behavior if
// observed chunks are larger than groupSize.
groupSize uint32
// curationState is the table of CurationKey to CurationValues that remark on
// far along the curation process has gone for a given metric fingerprint.
curationState raw.Persistence
}
func (w watermarkFilter) Filter(key, value interface{}) (r storage.FilterResult) {
defer func() {
labels := map[string]string{
cutOff: fmt.Sprint(w.ignoreYoungerThan),
result: strings.ToLower(r.String()),
processorName: w.processor.Name(),
}
func (w watermarkOperator) Operate(key, value interface{}) (err *storage.OperatorError) {
var (
fingerprint = key.(model.Fingerprint)
watermark = value.(model.Watermark)
queryErr error
hasBeenCurated bool
curationConsistent bool
)
hasBeenCurated, queryErr = w.hasBeenCurated(fingerprint)
if queryErr != nil {
err = &storage.OperatorError{queryErr, false}
return
curationFilterOperations.Increment(labels)
}()
if w.shouldStop() {
return storage.STOP
}
if !hasBeenCurated {
// curate
fingerprint := key.(model.Fingerprint)
curationRemark, err := getCurationRemark(w.curationState, w.processor, w.ignoreYoungerThan, fingerprint)
if err != nil {
return
}
curationConsistent, queryErr = w.curationConsistent(fingerprint, watermark)
if queryErr != nil {
err = &storage.OperatorError{queryErr, false}
if curationRemark == nil {
r = storage.ACCEPT
return
}
if !curationRemark.OlderThan(w.stopAt) {
return storage.SKIP
}
watermark := value.(model.Watermark)
if !curationRemark.OlderThan(watermark.Time) {
return storage.SKIP
}
curationConsistent, err := w.curationConsistent(fingerprint, watermark)
if err != nil {
return
}
if curationConsistent {
return
return storage.SKIP
}
// curate
return
}
// hasBeenCurated answers true if the provided Fingerprint has been curated in
// in the past.
func (w watermarkOperator) hasBeenCurated(f model.Fingerprint) (curated bool, err error) {
curationKey := &dto.CurationKey{
Fingerprint: f.ToDTO(),
OlderThan: proto.Int64(w.olderThan.Unix()),
MinimumGroupSize: proto.Uint32(w.groupSize),
}
curated, err = w.curationState.Has(coding.NewProtocolBuffer(curationKey))
return
return storage.ACCEPT
}
// curationConsistent determines whether the given metric is in a dirty state
// and needs curation.
func (w watermarkOperator) curationConsistent(f model.Fingerprint, watermark model.Watermark) (consistent bool, err error) {
var (
rawValue []byte
curationValue = &dto.CurationValue{}
curationKey = &dto.CurationKey{
Fingerprint: f.ToDTO(),
OlderThan: proto.Int64(w.olderThan.Unix()),
MinimumGroupSize: proto.Uint32(w.groupSize),
}
)
rawValue, err = w.curationState.Get(coding.NewProtocolBuffer(curationKey))
func (w watermarkFilter) curationConsistent(f model.Fingerprint, watermark model.Watermark) (consistent bool, err error) {
curationRemark, err := getCurationRemark(w.curationState, w.processor, w.ignoreYoungerThan, f)
if err != nil {
return
}
err = proto.Unmarshal(rawValue, curationValue)
if err != nil {
return
}
curationRemark := model.NewCurationRemarkFromDTO(curationValue)
if !curationRemark.OlderThanLimit(watermark.Time) {
if !curationRemark.OlderThan(watermark.Time) {
consistent = true
return
}
return
}
func (w watermarkOperator) Operate(key, _ interface{}) (oErr *storage.OperatorError) {
fingerprint := key.(model.Fingerprint)
seriesFrontier, err := newSeriesFrontier(fingerprint, w.diskFrontier, w.sampleIterator)
if err != nil || seriesFrontier == nil {
// An anomaly with the series frontier is severe in the sense that some sort
// of an illegal state condition exists in the storage layer, which would
// probably signify an illegal disk frontier.
return &storage.OperatorError{err, false}
}
curationState, err := getCurationRemark(w.curationState, w.processor, w.ignoreYoungerThan, fingerprint)
if err != nil {
// An anomaly with the curation remark is likely not fatal in the sense that
// there was a decoding error with the entity and shouldn't be cause to stop
// work. The process will simply start from a pessimistic work time and
// work forward. With an idempotent processor, this is safe.
return &storage.OperatorError{err, true}
}
startKey := model.SampleKey{
Fingerprint: fingerprint,
FirstTimestamp: seriesFrontier.optimalStartTime(curationState),
}
prospectiveKey, err := coding.NewProtocolBuffer(startKey.ToDTO()).Encode()
if err != nil {
// An encoding failure of a key is no reason to stop.
return &storage.OperatorError{err, true}
}
if !w.sampleIterator.Seek(prospectiveKey) {
// LevelDB is picky about the seek ranges. If an iterator was invalidated,
// no work may occur, and the iterator cannot be recovered.
return &storage.OperatorError{fmt.Errorf("Illegal Condition: Iterator invalidated due to seek range."), false}
}
newestAllowedSample := w.stopAt
if !newestAllowedSample.Before(seriesFrontier.lastSupertime) {
newestAllowedSample = seriesFrontier.lastSupertime
}
lastTime, err := w.processor.Apply(w.sampleIterator, w.samples, newestAllowedSample, fingerprint)
if err != nil {
// We can't divine the severity of a processor error without refactoring the
// interface.
return &storage.OperatorError{err, false}
}
err = w.refreshCurationRemark(fingerprint, lastTime)
if err != nil {
// Under the assumption that the processors are idempotent, they can be
// re-run; thusly, the commitment of the curation remark is no cause
// to cease further progress.
return &storage.OperatorError{err, true}
}
return
}
func (w watermarkOperator) refreshCurationRemark(f model.Fingerprint, finished time.Time) (err error) {
signature, err := w.processor.Signature()
if err != nil {
return
}
curationKey := model.CurationKey{
Fingerprint: f,
ProcessorMessageRaw: signature,
ProcessorMessageTypeName: w.processor.Name(),
IgnoreYoungerThan: w.ignoreYoungerThan,
}.ToDTO()
curationValue := model.CurationRemark{
LastCompletionTimestamp: finished,
}.ToDTO()
err = w.curationState.Put(coding.NewProtocolBuffer(curationKey), coding.NewProtocolBuffer(curationValue))
return
}

View File

@ -1,523 +0,0 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
import (
"code.google.com/p/goprotobuf/proto"
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/coding/indexable"
"github.com/prometheus/prometheus/model"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/storage/raw/leveldb"
fixture "github.com/prometheus/prometheus/storage/raw/leveldb/test"
"testing"
"time"
)
type (
curationState struct {
fingerprint string
groupSize int
recencyThreshold time.Duration
lastCurated time.Time
}
watermarkState struct {
fingerprint string
lastAppended time.Time
}
sample struct {
time time.Time
value model.SampleValue
}
sampleGroup struct {
fingerprint string
values []sample
}
in struct {
curationStates fixture.Pairs
watermarkStates fixture.Pairs
sampleGroups fixture.Pairs
recencyThreshold time.Duration
groupSize uint32
}
)
func (c curationState) Get() (key, value coding.Encoder) {
key = coding.NewProtocolBuffer(&dto.CurationKey{
Fingerprint: model.NewFingerprintFromRowKey(c.fingerprint).ToDTO(),
MinimumGroupSize: proto.Uint32(uint32(c.groupSize)),
OlderThan: proto.Int64(int64(c.recencyThreshold)),
})
value = coding.NewProtocolBuffer(&dto.CurationValue{
LastCompletionTimestamp: proto.Int64(c.lastCurated.Unix()),
})
return
}
func (w watermarkState) Get() (key, value coding.Encoder) {
key = coding.NewProtocolBuffer(model.NewFingerprintFromRowKey(w.fingerprint).ToDTO())
value = coding.NewProtocolBuffer(model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO())
return
}
func (s sampleGroup) Get() (key, value coding.Encoder) {
key = coding.NewProtocolBuffer(&dto.SampleKey{
Fingerprint: model.NewFingerprintFromRowKey(s.fingerprint).ToDTO(),
Timestamp: indexable.EncodeTime(s.values[0].time),
LastTimestamp: proto.Int64(s.values[len(s.values)-1].time.Unix()),
SampleCount: proto.Uint32(uint32(len(s.values))),
})
series := &dto.SampleValueSeries{}
for _, value := range s.values {
series.Value = append(series.Value, &dto.SampleValueSeries_Value{
Timestamp: proto.Int64(value.time.Unix()),
Value: value.value.ToDTO(),
})
}
value = coding.NewProtocolBuffer(series)
return
}
func TestCurator(t *testing.T) {
var (
scenarios = []struct {
in in
}{
{
in: in{
recencyThreshold: 1 * time.Hour,
groupSize: 5,
curationStates: fixture.Pairs{
curationState{
fingerprint: "0001-A-1-Z",
groupSize: 5,
recencyThreshold: 1 * time.Hour,
lastCurated: testInstant.Add(-1 * 30 * time.Minute),
},
curationState{
fingerprint: "0002-A-2-Z",
groupSize: 5,
recencyThreshold: 1 * time.Hour,
lastCurated: testInstant.Add(-1 * 90 * time.Minute),
},
// This rule should effectively be ignored.
curationState{
fingerprint: "0002-A-2-Z",
groupSize: 2,
recencyThreshold: 30 * time.Minute,
lastCurated: testInstant.Add(-1 * 90 * time.Minute),
},
},
watermarkStates: fixture.Pairs{
watermarkState{
fingerprint: "0001-A-1-Z",
lastAppended: testInstant.Add(-1 * 15 * time.Minute),
},
watermarkState{
fingerprint: "0002-A-2-Z",
lastAppended: testInstant.Add(-1 * 15 * time.Minute),
},
},
sampleGroups: fixture.Pairs{
sampleGroup{
fingerprint: "0001-A-1-Z",
values: []sample{
{
time: testInstant.Add(-1 * 90 * time.Minute),
value: 0,
},
{
time: testInstant.Add(-1 * 85 * time.Minute),
value: 1,
},
{
time: testInstant.Add(-1 * 80 * time.Minute),
value: 2,
},
{
time: testInstant.Add(-1 * 75 * time.Minute),
value: 3,
},
{
time: testInstant.Add(-1 * 70 * time.Minute),
value: 4,
},
},
},
sampleGroup{
fingerprint: "0001-A-1-Z",
values: []sample{
{
time: testInstant.Add(-1 * 65 * time.Minute),
value: 0,
},
{
time: testInstant.Add(-1 * 60 * time.Minute),
value: 1,
},
{
time: testInstant.Add(-1 * 55 * time.Minute),
value: 2,
},
{
time: testInstant.Add(-1 * 50 * time.Minute),
value: 3,
},
{
time: testInstant.Add(-1 * 45 * time.Minute),
value: 4,
},
},
},
sampleGroup{
fingerprint: "0001-A-1-Z",
values: []sample{
{
time: testInstant.Add(-1 * 40 * time.Minute),
value: 0,
},
{
time: testInstant.Add(-1 * 35 * time.Minute),
value: 1,
},
{
time: testInstant.Add(-1 * 30 * time.Minute),
value: 2,
},
},
},
sampleGroup{
fingerprint: "0001-A-1-Z",
values: []sample{
{
time: testInstant.Add(-1 * 25 * time.Minute),
value: 0,
},
},
},
sampleGroup{
fingerprint: "0001-A-1-Z",
values: []sample{
{
time: testInstant.Add(-1 * 35 * time.Minute),
value: 1,
},
},
},
sampleGroup{
fingerprint: "0001-A-1-Z",
values: []sample{
{
time: testInstant.Add(-1 * 30 * time.Minute),
value: 2,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 90 * time.Minute),
value: 0,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 89 * time.Minute),
value: 1,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 88 * time.Minute),
value: 2,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 87 * time.Minute),
value: 3,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 86 * time.Minute),
value: 4,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 85 * time.Minute),
value: 5,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 84 * time.Minute),
value: 6,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 83 * time.Minute),
value: 7,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 82 * time.Minute),
value: 8,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 81 * time.Minute),
value: 9,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 80 * time.Minute),
value: 10,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 79 * time.Minute),
value: 11,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 78 * time.Minute),
value: 12,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 77 * time.Minute),
value: 13,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 76 * time.Minute),
value: 14,
},
{
time: testInstant.Add(-1 * 75 * time.Minute),
value: 15,
},
{
time: testInstant.Add(-1 * 74 * time.Minute),
value: 16,
},
{
time: testInstant.Add(-1 * 73 * time.Minute),
value: 17,
},
{
time: testInstant.Add(-1 * 72 * time.Minute),
value: 18,
},
{
time: testInstant.Add(-1 * 71 * time.Minute),
value: 19,
},
{
time: testInstant.Add(-1 * 70 * time.Minute),
value: 20,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 69 * time.Minute),
value: 21,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 68 * time.Minute),
value: 22,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 67 * time.Minute),
value: 23,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 66 * time.Minute),
value: 24,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 65 * time.Minute),
value: 25,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 64 * time.Minute),
value: 26,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 63 * time.Minute),
value: 27,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 62 * time.Minute),
value: 28,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 61 * time.Minute),
value: 29,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 60 * time.Minute),
value: 30,
},
},
},
},
},
},
}
)
for _, scenario := range scenarios {
curatorDirectory := fixture.NewPreparer(t).Prepare("curator", fixture.NewCassetteFactory(scenario.in.curationStates))
defer curatorDirectory.Close()
watermarkDirectory := fixture.NewPreparer(t).Prepare("watermark", fixture.NewCassetteFactory(scenario.in.watermarkStates))
defer watermarkDirectory.Close()
sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups))
defer sampleDirectory.Close()
curatorStates, err := leveldb.NewLevelDBPersistence(curatorDirectory.Path(), 0, 0)
if err != nil {
t.Fatal(err)
}
defer curatorStates.Close()
watermarkStates, err := leveldb.NewLevelDBPersistence(watermarkDirectory.Path(), 0, 0)
if err != nil {
t.Fatal(err)
}
defer watermarkStates.Close()
samples, err := leveldb.NewLevelDBPersistence(sampleDirectory.Path(), 0, 0)
if err != nil {
t.Fatal(err)
}
defer samples.Close()
c := newCurator(scenario.in.recencyThreshold, scenario.in.groupSize, curatorStates, samples, watermarkStates)
c.run(testInstant)
}
}

View File

@ -51,7 +51,7 @@ func newDiskFrontier(i leveldb.Iterator) (d *diskFrontier, err error) {
lastKey, err := extractSampleKey(i)
if err != nil {
panic(err)
panic(fmt.Sprintln(err, i.Key(), i.Value()))
}
if !i.SeekToFirst() || i.Key() == nil {
@ -90,10 +90,8 @@ func (f seriesFrontier) String() string {
// fingerprint. A nil diskFrontier will be returned if the series cannot
// be found in the store.
func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator) (s *seriesFrontier, err error) {
var (
lowerSeek = firstSupertime
upperSeek = lastSupertime
)
lowerSeek := firstSupertime
upperSeek := lastSupertime
// If the diskFrontier for this iterator says that the candidate fingerprint
// is outside of its seeking domain, there is no way that a seriesFrontier
@ -180,3 +178,44 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator)
return
}
// Contains indicates whether a given time value is within the recorded
// interval.
func (s seriesFrontier) Contains(t time.Time) bool {
return !(t.Before(s.firstSupertime) || t.After(s.lastTime))
}
// InSafeSeekRange indicates whether the time is within the recorded time range
// and is safely seekable such that a seek does not result in an iterator point
// after the last value of the series or outside of the entire store.
func (s seriesFrontier) InSafeSeekRange(t time.Time) (safe bool) {
if !s.Contains(t) {
return
}
if s.lastSupertime.Before(t) {
return
}
return true
}
func (s seriesFrontier) After(t time.Time) bool {
return s.firstSupertime.After(t)
}
// optimalStartTime indicates what the best start time for a curation operation
// should be given the curation remark.
func (s seriesFrontier) optimalStartTime(remark *model.CurationRemark) (t time.Time) {
switch {
case remark == nil:
t = s.firstSupertime
case s.After(remark.LastCompletionTimestamp):
t = s.firstSupertime
case !s.InSafeSeekRange(remark.LastCompletionTimestamp):
t = s.lastSupertime
default:
t = remark.LastCompletionTimestamp
}
return
}

View File

@ -54,6 +54,9 @@ const (
setLabelNameFingerprints = "set_label_name_fingerprints"
setLabelPairFingerprints = "set_label_pair_fingerprints"
writeMemory = "write_memory"
cutOff = "recency_threshold"
processorName = "processor"
)
var (
@ -65,6 +68,7 @@ var (
curationDuration = metrics.NewCounter()
curationDurations = metrics.NewHistogram(diskLatencyHistogram)
curationFilterOperations = metrics.NewCounter()
storageOperations = metrics.NewCounter()
storageOperationDurations = metrics.NewCounter()
storageLatency = metrics.NewHistogram(diskLatencyHistogram)
@ -88,4 +92,5 @@ func init() {
registry.Register("prometheus_metric_disk_latency_microseconds", "Latency for metric disk operations in microseconds.", registry.NilLabels, storageLatency)
registry.Register("prometheus_storage_operation_time_total_microseconds", "The total time spent performing a given storage operation.", registry.NilLabels, storageOperationDurations)
registry.Register("prometheus_storage_queue_sizes_total", "The various sizes and capacities of the storage queues.", registry.NilLabels, queueSizes)
registry.Register("curation_filter_operations_total", "The number of curation filter operations completed.", registry.NilLabels, curationFilterOperations)
}

View File

@ -164,12 +164,11 @@ func NewLevelDBMetricPersistence(baseDirectory string) (persistence *LevelDBMetr
}
func (l *LevelDBMetricPersistence) AppendSample(sample model.Sample) (err error) {
begin := time.Now()
defer func() {
defer func(begin time.Time) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: appendSample, result: success}, map[string]string{operation: appendSample, result: failure})
}()
}(time.Now())
err = l.AppendSamples(model.Samples{sample})
@ -220,23 +219,20 @@ func groupByFingerprint(samples model.Samples) map[model.Fingerprint]model.Sampl
// in the keyspace and returns a map of Fingerprint-Metric pairs that are
// absent.
func (l *LevelDBMetricPersistence) findUnindexedMetrics(candidates map[model.Fingerprint]model.Metric) (unindexed map[model.Fingerprint]model.Metric, err error) {
begin := time.Now()
defer func() {
defer func(begin time.Time) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: findUnindexedMetrics, result: success}, map[string]string{operation: findUnindexedMetrics, result: failure})
}()
}(time.Now())
unindexed = make(map[model.Fingerprint]model.Metric)
// Determine which metrics are unknown in the database.
for fingerprint, metric := range candidates {
var (
dto = model.MetricToDTO(metric)
indexHas, err = l.hasIndexMetric(dto)
)
dto := model.MetricToDTO(metric)
indexHas, err := l.hasIndexMetric(dto)
if err != nil {
panic(err)
return unindexed, err
}
if !indexHas {
unindexed[fingerprint] = metric
@ -252,12 +248,11 @@ func (l *LevelDBMetricPersistence) findUnindexedMetrics(candidates map[model.Fin
//
// This operation is idempotent.
func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint]model.Metric) (err error) {
begin := time.Now()
defer func() {
defer func(begin time.Time) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: indexLabelNames, result: success}, map[string]string{operation: indexLabelNames, result: failure})
}()
}(time.Now())
labelNameFingerprints := map[model.LabelName]utility.Set{}
@ -269,7 +264,6 @@ func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint
fingerprints, err := l.GetFingerprintsForLabelName(labelName)
if err != nil {
panic(err)
return err
}
@ -307,7 +301,6 @@ func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint
err = l.labelNameToFingerprints.Commit(batch)
if err != nil {
panic(err)
return
}
@ -320,12 +313,11 @@ func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint
//
// This operation is idempotent.
func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint]model.Metric) (err error) {
begin := time.Now()
defer func() {
defer func(begin time.Time) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: indexLabelPairs, result: success}, map[string]string{operation: indexLabelPairs, result: failure})
}()
}(time.Now())
labelPairFingerprints := map[model.LabelPair]utility.Set{}
@ -343,7 +335,6 @@ func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint
labelName: labelValue,
})
if err != nil {
panic(err)
return err
}
@ -382,7 +373,6 @@ func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint
err = l.labelSetToFingerprints.Commit(batch)
if err != nil {
panic(err)
return
}
@ -394,12 +384,11 @@ func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint
//
// This operation is idempotent.
func (l *LevelDBMetricPersistence) indexFingerprints(metrics map[model.Fingerprint]model.Metric) (err error) {
begin := time.Now()
defer func() {
defer func(begin time.Time) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: indexFingerprints, result: success}, map[string]string{operation: indexFingerprints, result: failure})
}()
}(time.Now())
batch := leveldb.NewBatch()
defer batch.Close()
@ -412,7 +401,7 @@ func (l *LevelDBMetricPersistence) indexFingerprints(metrics map[model.Fingerpri
err = l.fingerprintToMetrics.Commit(batch)
if err != nil {
panic(err)
return
}
return
@ -422,12 +411,11 @@ func (l *LevelDBMetricPersistence) indexFingerprints(metrics map[model.Fingerpri
// that are unknown to the storage stack, and then proceeds to update all
// affected indices.
func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerprint]model.Metric) (err error) {
begin := time.Now()
defer func() {
defer func(begin time.Time) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: indexMetrics, result: success}, map[string]string{operation: indexMetrics, result: failure})
}()
}(time.Now())
var (
absentMetrics map[model.Fingerprint]model.Metric
@ -435,7 +423,7 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri
absentMetrics, err = l.findUnindexedMetrics(fingerprints)
if err != nil {
panic(err)
return
}
if len(absentMetrics) == 0 {
@ -444,11 +432,9 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri
// TODO: For the missing fingerprints, determine what label names and pairs
// are absent and act accordingly and append fingerprints.
var (
doneBuildingLabelNameIndex = make(chan error)
doneBuildingLabelPairIndex = make(chan error)
doneBuildingFingerprintIndex = make(chan error)
)
doneBuildingLabelNameIndex := make(chan error)
doneBuildingLabelPairIndex := make(chan error)
doneBuildingFingerprintIndex := make(chan error)
go func() {
doneBuildingLabelNameIndex <- l.indexLabelNames(absentMetrics)
@ -462,22 +448,17 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri
doneBuildingFingerprintIndex <- l.indexFingerprints(absentMetrics)
}()
makeTopLevelIndex := true
err = <-doneBuildingLabelNameIndex
if err != nil {
panic(err)
makeTopLevelIndex = false
return
}
err = <-doneBuildingLabelPairIndex
if err != nil {
panic(err)
makeTopLevelIndex = false
return
}
err = <-doneBuildingFingerprintIndex
if err != nil {
panic(err)
makeTopLevelIndex = false
return
}
// If any of the preceding operations failed, we will have inconsistent
@ -485,62 +466,51 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri
// its state is used to determine whether to bulk update the other indices.
// Given that those operations are idempotent, it is OK to repeat them;
// however, it will consume considerable amounts of time.
if makeTopLevelIndex {
batch := leveldb.NewBatch()
defer batch.Close()
batch := leveldb.NewBatch()
defer batch.Close()
// WART: We should probably encode simple fingerprints.
for _, metric := range absentMetrics {
key := coding.NewProtocolBuffer(model.MetricToDTO(metric))
batch.Put(key, key)
}
// WART: We should probably encode simple fingerprints.
for _, metric := range absentMetrics {
key := coding.NewProtocolBuffer(model.MetricToDTO(metric))
batch.Put(key, key)
}
err := l.metricMembershipIndex.Commit(batch)
if err != nil {
panic(err)
// Not critical.
log.Println(err)
}
err = l.metricMembershipIndex.Commit(batch)
if err != nil {
return err
}
return
}
func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Fingerprint]model.Samples) (err error) {
begin := time.Now()
defer func() {
defer func(begin time.Time) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: refreshHighWatermarks, result: success}, map[string]string{operation: refreshHighWatermarks, result: failure})
}()
}(time.Now())
batch := leveldb.NewBatch()
defer batch.Close()
var (
mutationCount = 0
)
mutationCount := 0
for fingerprint, samples := range groups {
var (
key = &dto.Fingerprint{}
value = &dto.MetricHighWatermark{}
raw []byte
newestSampleTimestamp = samples[len(samples)-1].Timestamp
keyEncoded = coding.NewProtocolBuffer(key)
)
key := &dto.Fingerprint{}
value := &dto.MetricHighWatermark{}
raw := []byte{}
newestSampleTimestamp := samples[len(samples)-1].Timestamp
keyEncoded := coding.NewProtocolBuffer(key)
key.Signature = proto.String(fingerprint.ToRowKey())
raw, err = l.metricHighWatermarks.Get(keyEncoded)
if err != nil {
panic(err)
return
}
if raw != nil {
err = proto.Unmarshal(raw, value)
if err != nil {
panic(err)
continue
return
}
if newestSampleTimestamp.Before(time.Unix(*value.Timestamp, 0)) {
@ -554,19 +524,18 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger
err = l.metricHighWatermarks.Commit(batch)
if err != nil {
panic(err)
return
}
return
}
func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err error) {
begin := time.Now()
defer func() {
defer func(begin time.Time) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: appendSamples, result: success}, map[string]string{operation: appendSamples, result: failure})
}()
}(time.Now())
var (
fingerprintToSamples = groupByFingerprint(samples)
@ -630,17 +599,17 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err
err = l.metricSamples.Commit(samplesBatch)
if err != nil {
panic(err)
return
}
err = <-indexErrChan
if err != nil {
panic(err)
return
}
err = <-watermarkErrChan
if err != nil {
panic(err)
return
}
return
@ -691,13 +660,11 @@ func fingerprintsEqual(l *dto.Fingerprint, r *dto.Fingerprint) bool {
}
func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (value bool, err error) {
begin := time.Now()
defer func() {
defer func(begin time.Time) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: hasIndexMetric, result: success}, map[string]string{operation: hasIndexMetric, result: failure})
}()
}(time.Now())
dtoKey := coding.NewProtocolBuffer(dto)
value, err = l.metricMembershipIndex.Has(dtoKey)
@ -706,13 +673,11 @@ func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (value bool,
}
func (l *LevelDBMetricPersistence) HasLabelPair(dto *dto.LabelPair) (value bool, err error) {
begin := time.Now()
defer func() {
defer func(begin time.Time) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure})
}()
}(time.Now())
dtoKey := coding.NewProtocolBuffer(dto)
value, err = l.labelSetToFingerprints.Has(dtoKey)
@ -721,13 +686,11 @@ func (l *LevelDBMetricPersistence) HasLabelPair(dto *dto.LabelPair) (value bool,
}
func (l *LevelDBMetricPersistence) HasLabelName(dto *dto.LabelName) (value bool, err error) {
begin := time.Now()
defer func() {
defer func(begin time.Time) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: hasLabelName, result: success}, map[string]string{operation: hasLabelName, result: failure})
}()
}(time.Now())
dtoKey := coding.NewProtocolBuffer(dto)
value, err = l.labelNameToFingerprints.Has(dtoKey)
@ -736,13 +699,11 @@ func (l *LevelDBMetricPersistence) HasLabelName(dto *dto.LabelName) (value bool,
}
func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet model.LabelSet) (fps model.Fingerprints, err error) {
begin := time.Now()
defer func() {
defer func(begin time.Time) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelSet, result: success}, map[string]string{operation: getFingerprintsForLabelSet, result: failure})
}()
}(time.Now())
sets := []utility.Set{}
@ -786,13 +747,11 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet model.Lab
}
func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName model.LabelName) (fps model.Fingerprints, err error) {
begin := time.Now()
defer func() {
defer func(begin time.Time) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelName, result: success}, map[string]string{operation: getFingerprintsForLabelName, result: failure})
}()
}(time.Now())
raw, err := l.labelNameToFingerprints.Get(coding.NewProtocolBuffer(model.LabelNameToDTO(&labelName)))
if err != nil {
@ -815,13 +774,11 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName model.L
}
func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f model.Fingerprint) (m *model.Metric, err error) {
begin := time.Now()
defer func() {
defer func(begin time.Time) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: getMetricForFingerprint, result: success}, map[string]string{operation: getMetricForFingerprint, result: failure})
}()
}(time.Now())
raw, err := l.fingerprintToMetrics.Get(coding.NewProtocolBuffer(model.FingerprintToDTO(f)))
if err != nil {

233
storage/metric/processor.go Normal file
View File

@ -0,0 +1,233 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
import (
"code.google.com/p/goprotobuf/proto"
"fmt"
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/model"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/storage/raw"
"github.com/prometheus/prometheus/storage/raw/leveldb"
"time"
)
// processor models a post-processing agent that performs work given a sample
// corpus.
type processor interface {
// Name emits the name of this processor's signature encoder. It must be
// fully-qualified in the sense that it could be used via a Protocol Buffer
// registry to extract the descriptor to reassemble this message.
Name() string
// Signature emits a byte signature for this process for the purpose of
// remarking how far along it has been applied to the database.
Signature() (signature []byte, err error)
// Apply runs this processor against the sample set. sampleIterator expects
// to be pre-seeked to the initial starting position. The processor will
// run until up until stopAt has been reached. It is imperative that the
// provided stopAt is within the interval of the series frontier.
//
// Upon completion or error, the last time at which the processor finished
// shall be emitted in addition to any errors.
Apply(sampleIterator leveldb.Iterator, samples raw.Persistence, stopAt time.Time, fingerprint model.Fingerprint) (lastCurated time.Time, err error)
}
// compactionProcessor combines sparse values in the database together such
// that at least MinimumGroupSize is fulfilled from the
type compactionProcessor struct {
// MaximumMutationPoolBatch represents approximately the largest pending
// batch of mutation operations for the database before pausing to
// commit before resumption.
//
// A reasonable value would be (MinimumGroupSize * 2) + 1.
MaximumMutationPoolBatch int
// MinimumGroupSize represents the smallest allowed sample chunk size in the
// database.
MinimumGroupSize int
// signature is the byte representation of the compactionProcessor's settings,
// used for purely memoization purposes across an instance.
signature []byte
}
func (p compactionProcessor) Name() string {
return "io.prometheus.CompactionProcessorDefinition"
}
func (p *compactionProcessor) Signature() (out []byte, err error) {
if len(p.signature) == 0 {
out, err = proto.Marshal(&dto.CompactionProcessorDefinition{
MinimumGroupSize: proto.Uint32(uint32(p.MinimumGroupSize)),
})
p.signature = out
}
out = p.signature
return
}
func (p compactionProcessor) String() string {
return fmt.Sprintf("compactionProcess for minimum group size %d", p.MinimumGroupSize)
}
func (p compactionProcessor) Apply(sampleIterator leveldb.Iterator, samples raw.Persistence, stopAt time.Time, fingerprint model.Fingerprint) (lastCurated time.Time, err error) {
var pendingBatch raw.Batch = nil
defer func() {
if pendingBatch != nil {
pendingBatch.Close()
}
}()
var pendingMutations = 0
var pendingSamples model.Values
var sampleKey model.SampleKey
var sampleValues model.Values
var lastTouchedTime time.Time
var keyDropped bool
sampleKey, err = extractSampleKey(sampleIterator)
if err != nil {
return
}
sampleValues, err = extractSampleValues(sampleIterator)
if err != nil {
return
}
for lastCurated.Before(stopAt) && lastTouchedTime.Before(stopAt) {
switch {
// Furnish a new pending batch operation if none is available.
case pendingBatch == nil:
pendingBatch = leveldb.NewBatch()
// If there are no sample values to extract from the datastore, let's
// continue extracting more values to use. We know that the time.Before()
// block would prevent us from going into unsafe territory.
case len(sampleValues) == 0:
if !sampleIterator.Next() {
return lastCurated, fmt.Errorf("Illegal Condition: Invalid Iterator on Continuation")
}
keyDropped = false
sampleKey, err = extractSampleKey(sampleIterator)
if err != nil {
return
}
sampleValues, err = extractSampleValues(sampleIterator)
if err != nil {
return
}
// If the number of pending mutations exceeds the allowed batch amount,
// commit to disk and delete the batch. A new one will be recreated if
// necessary.
case pendingMutations >= p.MaximumMutationPoolBatch:
err = samples.Commit(pendingBatch)
if err != nil {
return
}
pendingMutations = 0
pendingBatch.Close()
pendingBatch = nil
case len(pendingSamples) == 0 && len(sampleValues) >= p.MinimumGroupSize:
lastTouchedTime = sampleValues[len(sampleValues)-1].Timestamp
sampleValues = model.Values{}
case len(pendingSamples)+len(sampleValues) < p.MinimumGroupSize:
if !keyDropped {
key := coding.NewProtocolBuffer(sampleKey.ToDTO())
pendingBatch.Drop(key)
keyDropped = true
}
pendingSamples = append(pendingSamples, sampleValues...)
lastTouchedTime = sampleValues[len(sampleValues)-1].Timestamp
sampleValues = model.Values{}
pendingMutations++
// If the number of pending writes equals the target group size
case len(pendingSamples) == p.MinimumGroupSize:
newSampleKey := pendingSamples.ToSampleKey(fingerprint)
key := coding.NewProtocolBuffer(newSampleKey.ToDTO())
value := coding.NewProtocolBuffer(pendingSamples.ToDTO())
pendingBatch.Put(key, value)
pendingMutations++
lastCurated = newSampleKey.FirstTimestamp.In(time.UTC)
if len(sampleValues) > 0 {
if !keyDropped {
key := coding.NewProtocolBuffer(sampleKey.ToDTO())
pendingBatch.Drop(key)
keyDropped = true
}
if len(sampleValues) > p.MinimumGroupSize {
pendingSamples = sampleValues[:p.MinimumGroupSize]
sampleValues = sampleValues[p.MinimumGroupSize:]
lastTouchedTime = sampleValues[len(sampleValues)-1].Timestamp
} else {
pendingSamples = sampleValues
lastTouchedTime = pendingSamples[len(pendingSamples)-1].Timestamp
sampleValues = model.Values{}
}
}
case len(pendingSamples)+len(sampleValues) >= p.MinimumGroupSize:
if !keyDropped {
key := coding.NewProtocolBuffer(sampleKey.ToDTO())
pendingBatch.Drop(key)
keyDropped = true
}
remainder := p.MinimumGroupSize - len(pendingSamples)
pendingSamples = append(pendingSamples, sampleValues[:remainder]...)
sampleValues = sampleValues[remainder:]
if len(sampleValues) == 0 {
lastTouchedTime = pendingSamples[len(pendingSamples)-1].Timestamp
} else {
lastTouchedTime = sampleValues[len(sampleValues)-1].Timestamp
}
pendingMutations++
default:
err = fmt.Errorf("Unhandled processing case.")
}
}
if len(sampleValues) > 0 || len(pendingSamples) > 0 {
pendingSamples = append(sampleValues, pendingSamples...)
newSampleKey := pendingSamples.ToSampleKey(fingerprint)
key := coding.NewProtocolBuffer(newSampleKey.ToDTO())
value := coding.NewProtocolBuffer(pendingSamples.ToDTO())
pendingBatch.Put(key, value)
pendingSamples = model.Values{}
pendingMutations++
lastCurated = newSampleKey.FirstTimestamp.In(time.UTC)
}
// This is not deferred due to the off-chance that a pre-existing commit
// failed.
if pendingBatch != nil && pendingMutations > 0 {
err = samples.Commit(pendingBatch)
if err != nil {
return
}
}
return
}

View File

@ -0,0 +1,962 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
import (
"code.google.com/p/goprotobuf/proto"
"fmt"
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/model"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/storage/raw/leveldb"
fixture "github.com/prometheus/prometheus/storage/raw/leveldb/test"
"testing"
"time"
)
type curationState struct {
fingerprint string
ignoreYoungerThan time.Duration
lastCurated time.Time
processor processor
}
type watermarkState struct {
fingerprint string
lastAppended time.Time
}
type sampleGroup struct {
fingerprint string
values model.Values
}
type in struct {
curationStates fixture.Pairs
watermarkStates fixture.Pairs
sampleGroups fixture.Pairs
ignoreYoungerThan time.Duration
groupSize uint32
processor processor
}
type out struct {
curationStates []curationState
sampleGroups []sampleGroup
}
func (c curationState) Get() (key, value coding.Encoder) {
signature, err := c.processor.Signature()
if err != nil {
panic(err)
}
key = coding.NewProtocolBuffer(model.CurationKey{
Fingerprint: model.NewFingerprintFromRowKey(c.fingerprint),
ProcessorMessageRaw: signature,
ProcessorMessageTypeName: c.processor.Name(),
IgnoreYoungerThan: c.ignoreYoungerThan,
}.ToDTO())
value = coding.NewProtocolBuffer(model.CurationRemark{
LastCompletionTimestamp: c.lastCurated,
}.ToDTO())
return
}
func (w watermarkState) Get() (key, value coding.Encoder) {
key = coding.NewProtocolBuffer(model.NewFingerprintFromRowKey(w.fingerprint).ToDTO())
value = coding.NewProtocolBuffer(model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO())
return
}
func (s sampleGroup) Get() (key, value coding.Encoder) {
key = coding.NewProtocolBuffer(model.SampleKey{
Fingerprint: model.NewFingerprintFromRowKey(s.fingerprint),
FirstTimestamp: s.values[0].Timestamp,
LastTimestamp: s.values[len(s.values)-1].Timestamp,
SampleCount: uint32(len(s.values)),
}.ToDTO())
value = coding.NewProtocolBuffer(s.values.ToDTO())
return
}
func TestCuratorCompactionProcessor(t *testing.T) {
scenarios := []struct {
in in
out out
}{
{
in: in{
processor: &compactionProcessor{
MinimumGroupSize: 5,
MaximumMutationPoolBatch: 15,
},
ignoreYoungerThan: 1 * time.Hour,
groupSize: 5,
curationStates: fixture.Pairs{
curationState{
fingerprint: "0001-A-1-Z",
ignoreYoungerThan: 1 * time.Hour,
lastCurated: testInstant.Add(-1 * 30 * time.Minute),
processor: &compactionProcessor{
MinimumGroupSize: 5,
MaximumMutationPoolBatch: 15,
},
},
curationState{
fingerprint: "0002-A-2-Z",
ignoreYoungerThan: 1 * time.Hour,
lastCurated: testInstant.Add(-1 * 90 * time.Minute),
processor: &compactionProcessor{
MinimumGroupSize: 5,
MaximumMutationPoolBatch: 15,
},
},
// This rule should effectively be ignored.
curationState{
fingerprint: "0002-A-2-Z",
processor: &compactionProcessor{
MinimumGroupSize: 2,
MaximumMutationPoolBatch: 15,
},
ignoreYoungerThan: 30 * time.Minute,
lastCurated: testInstant.Add(-1 * 90 * time.Minute),
},
},
watermarkStates: fixture.Pairs{
watermarkState{
fingerprint: "0001-A-1-Z",
lastAppended: testInstant.Add(-1 * 15 * time.Minute),
},
watermarkState{
fingerprint: "0002-A-2-Z",
lastAppended: testInstant.Add(-1 * 15 * time.Minute),
},
},
sampleGroups: fixture.Pairs{
sampleGroup{
fingerprint: "0001-A-1-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 90 * time.Minute),
Value: 0,
},
{
Timestamp: testInstant.Add(-1 * 85 * time.Minute),
Value: 1,
},
{
Timestamp: testInstant.Add(-1 * 80 * time.Minute),
Value: 2,
},
{
Timestamp: testInstant.Add(-1 * 75 * time.Minute),
Value: 3,
},
{
Timestamp: testInstant.Add(-1 * 70 * time.Minute),
Value: 4,
},
},
},
sampleGroup{
fingerprint: "0001-A-1-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 65 * time.Minute),
Value: 0.25,
},
{
Timestamp: testInstant.Add(-1 * 60 * time.Minute),
Value: 1.25,
},
{
Timestamp: testInstant.Add(-1 * 55 * time.Minute),
Value: 2.25,
},
{
Timestamp: testInstant.Add(-1 * 50 * time.Minute),
Value: 3.25,
},
{
Timestamp: testInstant.Add(-1 * 45 * time.Minute),
Value: 4.25,
},
},
},
sampleGroup{
fingerprint: "0001-A-1-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 40 * time.Minute),
Value: 0.50,
},
{
Timestamp: testInstant.Add(-1 * 35 * time.Minute),
Value: 1.50,
},
{
Timestamp: testInstant.Add(-1 * 30 * time.Minute),
Value: 2.50,
},
},
},
sampleGroup{
fingerprint: "0001-A-1-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 25 * time.Minute),
Value: 0.75,
},
},
},
sampleGroup{
fingerprint: "0001-A-1-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 20 * time.Minute),
Value: -2,
},
},
},
sampleGroup{
fingerprint: "0001-A-1-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 15 * time.Minute),
Value: -3,
},
},
},
sampleGroup{
// Moved into Block 1
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 90 * time.Minute),
Value: 0,
},
},
},
sampleGroup{
// Moved into Block 1
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 89 * time.Minute),
Value: 1,
},
},
},
sampleGroup{
// Moved into Block 1
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 88 * time.Minute),
Value: 2,
},
},
},
sampleGroup{
// Moved into Block 1
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 87 * time.Minute),
Value: 3,
},
},
},
sampleGroup{
// Moved into Block 1
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 86 * time.Minute),
Value: 4,
},
},
},
sampleGroup{
// Moved into Block 2
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 85 * time.Minute),
Value: 5,
},
},
},
sampleGroup{
// Moved into Block 2
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 84 * time.Minute),
Value: 6,
},
},
},
sampleGroup{
// Moved into Block 2
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 83 * time.Minute),
Value: 7,
},
},
},
sampleGroup{
// Moved into Block 2
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 82 * time.Minute),
Value: 8,
},
},
},
sampleGroup{
// Moved into Block 2
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 81 * time.Minute),
Value: 9,
},
},
},
sampleGroup{
// Moved into Block 3
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 80 * time.Minute),
Value: 10,
},
},
},
sampleGroup{
// Moved into Block 3
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 79 * time.Minute),
Value: 11,
},
},
},
sampleGroup{
// Moved into Block 3
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 78 * time.Minute),
Value: 12,
},
},
},
sampleGroup{
// Moved into Block 3
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 77 * time.Minute),
Value: 13,
},
},
},
sampleGroup{
// Moved into Blocks 3 and 4 and 5
fingerprint: "0002-A-2-Z",
values: model.Values{
{
// Block 3
Timestamp: testInstant.Add(-1 * 76 * time.Minute),
Value: 14,
},
{
// Block 4
Timestamp: testInstant.Add(-1 * 75 * time.Minute),
Value: 15,
},
{
// Block 4
Timestamp: testInstant.Add(-1 * 74 * time.Minute),
Value: 16,
},
{
// Block 4
Timestamp: testInstant.Add(-1 * 73 * time.Minute),
Value: 17,
},
{
// Block 4
Timestamp: testInstant.Add(-1 * 72 * time.Minute),
Value: 18,
},
{
// Block 4
Timestamp: testInstant.Add(-1 * 71 * time.Minute),
Value: 19,
},
{
// Block 5
Timestamp: testInstant.Add(-1 * 70 * time.Minute),
Value: 20,
},
},
},
sampleGroup{
// Moved into Block 5
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 69 * time.Minute),
Value: 21,
},
},
},
sampleGroup{
// Moved into Block 5
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 68 * time.Minute),
Value: 22,
},
},
},
sampleGroup{
// Moved into Block 5
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 67 * time.Minute),
Value: 23,
},
},
},
sampleGroup{
// Moved into Block 5
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 66 * time.Minute),
Value: 24,
},
},
},
sampleGroup{
// Moved into Block 6
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 65 * time.Minute),
Value: 25,
},
},
},
sampleGroup{
// Moved into Block 6
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 64 * time.Minute),
Value: 26,
},
},
},
sampleGroup{
// Moved into Block 6
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 63 * time.Minute),
Value: 27,
},
},
},
sampleGroup{
// Moved into Block 6
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 62 * time.Minute),
Value: 28,
},
},
},
sampleGroup{
// Moved into Block 6
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 61 * time.Minute),
Value: 29,
},
},
},
sampleGroup{
// Moved into Block 7
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 60 * time.Minute),
Value: 30,
},
},
},
},
},
out: out{
curationStates: []curationState{
{
fingerprint: "0001-A-1-Z",
ignoreYoungerThan: time.Hour,
lastCurated: testInstant.Add(-1 * 30 * time.Minute),
processor: &compactionProcessor{
MinimumGroupSize: 5,
MaximumMutationPoolBatch: 15,
},
},
{
fingerprint: "0002-A-2-Z",
ignoreYoungerThan: 30 * time.Minute,
lastCurated: testInstant.Add(-1 * 90 * time.Minute),
processor: &compactionProcessor{
MinimumGroupSize: 2,
MaximumMutationPoolBatch: 15,
},
},
{
fingerprint: "0002-A-2-Z",
ignoreYoungerThan: time.Hour,
lastCurated: testInstant.Add(-1 * 60 * time.Minute),
processor: &compactionProcessor{
MinimumGroupSize: 5,
MaximumMutationPoolBatch: 15,
},
},
},
sampleGroups: []sampleGroup{
{
fingerprint: "0001-A-1-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 90 * time.Minute),
Value: 0,
},
{
Timestamp: testInstant.Add(-1 * 85 * time.Minute),
Value: 1,
},
{
Timestamp: testInstant.Add(-1 * 80 * time.Minute),
Value: 2,
},
{
Timestamp: testInstant.Add(-1 * 75 * time.Minute),
Value: 3,
},
{
Timestamp: testInstant.Add(-1 * 70 * time.Minute),
Value: 4,
},
},
},
{
fingerprint: "0001-A-1-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 65 * time.Minute),
Value: 0.25,
},
{
Timestamp: testInstant.Add(-1 * 60 * time.Minute),
Value: 1.25,
},
{
Timestamp: testInstant.Add(-1 * 55 * time.Minute),
Value: 2.25,
},
{
Timestamp: testInstant.Add(-1 * 50 * time.Minute),
Value: 3.25,
},
{
Timestamp: testInstant.Add(-1 * 45 * time.Minute),
Value: 4.25,
},
},
},
{
fingerprint: "0001-A-1-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 40 * time.Minute),
Value: 0.50,
},
{
Timestamp: testInstant.Add(-1 * 35 * time.Minute),
Value: 1.50,
},
{
Timestamp: testInstant.Add(-1 * 30 * time.Minute),
Value: 2.50,
},
},
},
{
fingerprint: "0001-A-1-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 25 * time.Minute),
Value: 0.75,
},
},
},
{
fingerprint: "0001-A-1-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 20 * time.Minute),
Value: -2,
},
},
},
{
fingerprint: "0001-A-1-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 15 * time.Minute),
Value: -3,
},
},
},
{
// Block 1
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 90 * time.Minute),
Value: 0,
},
{
Timestamp: testInstant.Add(-1 * 89 * time.Minute),
Value: 1,
},
{
Timestamp: testInstant.Add(-1 * 88 * time.Minute),
Value: 2,
},
{
Timestamp: testInstant.Add(-1 * 87 * time.Minute),
Value: 3,
},
{
Timestamp: testInstant.Add(-1 * 86 * time.Minute),
Value: 4,
},
},
},
{
// Block 2
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 85 * time.Minute),
Value: 5,
},
{
Timestamp: testInstant.Add(-1 * 84 * time.Minute),
Value: 6,
},
{
Timestamp: testInstant.Add(-1 * 83 * time.Minute),
Value: 7,
},
{
Timestamp: testInstant.Add(-1 * 82 * time.Minute),
Value: 8,
},
{
Timestamp: testInstant.Add(-1 * 81 * time.Minute),
Value: 9,
},
},
},
{
// Block 3
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 80 * time.Minute),
Value: 10,
},
{
Timestamp: testInstant.Add(-1 * 79 * time.Minute),
Value: 11,
},
{
Timestamp: testInstant.Add(-1 * 78 * time.Minute),
Value: 12,
},
{
Timestamp: testInstant.Add(-1 * 77 * time.Minute),
Value: 13,
},
{
Timestamp: testInstant.Add(-1 * 76 * time.Minute),
Value: 14,
},
},
},
{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 75 * time.Minute),
Value: 15,
},
{
Timestamp: testInstant.Add(-1 * 74 * time.Minute),
Value: 16,
},
{
Timestamp: testInstant.Add(-1 * 73 * time.Minute),
Value: 17,
},
{
Timestamp: testInstant.Add(-1 * 72 * time.Minute),
Value: 18,
},
{
Timestamp: testInstant.Add(-1 * 71 * time.Minute),
Value: 19,
},
},
},
{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 70 * time.Minute),
Value: 20,
},
{
Timestamp: testInstant.Add(-1 * 69 * time.Minute),
Value: 21,
},
{
Timestamp: testInstant.Add(-1 * 68 * time.Minute),
Value: 22,
},
{
Timestamp: testInstant.Add(-1 * 67 * time.Minute),
Value: 23,
},
{
Timestamp: testInstant.Add(-1 * 66 * time.Minute),
Value: 24,
},
},
},
{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 65 * time.Minute),
Value: 25,
},
{
Timestamp: testInstant.Add(-1 * 64 * time.Minute),
Value: 26,
},
{
Timestamp: testInstant.Add(-1 * 63 * time.Minute),
Value: 27,
},
{
Timestamp: testInstant.Add(-1 * 62 * time.Minute),
Value: 28,
},
{
Timestamp: testInstant.Add(-1 * 61 * time.Minute),
Value: 29,
},
},
},
{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 60 * time.Minute),
Value: 30,
},
},
},
},
},
},
}
for i, scenario := range scenarios {
curatorDirectory := fixture.NewPreparer(t).Prepare("curator", fixture.NewCassetteFactory(scenario.in.curationStates))
defer curatorDirectory.Close()
watermarkDirectory := fixture.NewPreparer(t).Prepare("watermark", fixture.NewCassetteFactory(scenario.in.watermarkStates))
defer watermarkDirectory.Close()
sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups))
defer sampleDirectory.Close()
curatorStates, err := leveldb.NewLevelDBPersistence(curatorDirectory.Path(), 0, 0)
if err != nil {
t.Fatal(err)
}
defer curatorStates.Close()
watermarkStates, err := leveldb.NewLevelDBPersistence(watermarkDirectory.Path(), 0, 0)
if err != nil {
t.Fatal(err)
}
defer watermarkStates.Close()
samples, err := leveldb.NewLevelDBPersistence(sampleDirectory.Path(), 0, 0)
if err != nil {
t.Fatal(err)
}
defer samples.Close()
c := newCurator()
err = c.run(scenario.in.ignoreYoungerThan, testInstant, scenario.in.processor, curatorStates, samples, watermarkStates)
if err != nil {
t.Fatal(err)
}
iterator := curatorStates.NewIterator(true)
defer iterator.Close()
for j, expected := range scenario.out.curationStates {
switch j {
case 0:
if !iterator.SeekToFirst() {
t.Fatalf("%d.%d. could not seek to beginning.", i, j)
}
default:
if !iterator.Next() {
t.Fatalf("%d.%d. could not seek to next.", i, j)
}
}
curationKeyDto := &dto.CurationKey{}
curationValueDto := &dto.CurationValue{}
err = proto.Unmarshal(iterator.Key(), curationKeyDto)
if err != nil {
t.Fatalf("%d.%d. could not unmarshal: %s\n", i, j, err)
}
err = proto.Unmarshal(iterator.Value(), curationValueDto)
if err != nil {
t.Fatalf("%d.%d. could not unmarshal: %s\n", i, j, err)
}
curationKey := model.NewCurationKeyFromDTO(curationKeyDto)
actualCurationRemark := model.NewCurationRemarkFromDTO(curationValueDto)
signature, err := expected.processor.Signature()
if err != nil {
t.Fatal(err)
}
actualKey := curationKey
expectedKey := model.CurationKey{
Fingerprint: model.NewFingerprintFromRowKey(expected.fingerprint),
IgnoreYoungerThan: expected.ignoreYoungerThan,
ProcessorMessageRaw: signature,
ProcessorMessageTypeName: expected.processor.Name(),
}
if !actualKey.Equal(expectedKey) {
t.Fatalf("%d.%d. expected %s, got %s", i, j, expectedKey, actualKey)
}
expectedCurationRemark := model.CurationRemark{
LastCompletionTimestamp: expected.lastCurated,
}
if !actualCurationRemark.Equal(expectedCurationRemark) {
t.Fatalf("%d.%d. expected %s, got %s", i, j, expectedCurationRemark, actualCurationRemark)
}
}
iterator = samples.NewIterator(true)
defer iterator.Close()
for j, expected := range scenario.out.sampleGroups {
switch j {
case 0:
if !iterator.SeekToFirst() {
t.Fatalf("%d.%d. could not seek to beginning.", i, j)
}
default:
if !iterator.Next() {
t.Fatalf("%d.%d. could not seek to next, expected %s", i, j, expected)
}
}
sampleKey, err := extractSampleKey(iterator)
if err != nil {
t.Fatalf("%d.%d. error %s", i, j, err)
}
sampleValues, err := extractSampleValues(iterator)
if err != nil {
t.Fatalf("%d.%d. error %s", i, j, err)
}
if !model.NewFingerprintFromRowKey(expected.fingerprint).Equal(sampleKey.Fingerprint) {
t.Fatalf("%d.%d. expected fingerprint %s, got %s", i, j, expected.fingerprint, sampleKey.Fingerprint)
}
if int(sampleKey.SampleCount) != len(expected.values) {
t.Fatalf("%d.%d. expected %d values, got %d", i, j, len(expected.values), sampleKey.SampleCount)
}
if len(sampleValues) != len(expected.values) {
t.Fatalf("%d.%d. expected %d values, got %d", i, j, len(expected.values), len(sampleValues))
}
if !sampleKey.FirstTimestamp.Equal(expected.values[0].Timestamp) {
t.Fatalf("%d.%d. expected %s, got %s", i, j, expected.values[0].Timestamp, sampleKey.FirstTimestamp)
}
for k, actualValue := range sampleValues {
if expected.values[k].Value != actualValue.Value {
t.Fatalf("%d.%d.%d. expected %d, got %d", i, j, k, expected.values[k].Value, actualValue.Value)
}
if !expected.values[k].Timestamp.Equal(actualValue.Timestamp) {
t.Fatalf("%d.%d.%d. expected %s, got %s", i, j, k, expected.values[k].Timestamp, actualValue.Timestamp)
}
}
if !sampleKey.LastTimestamp.Equal(expected.values[len(expected.values)-1].Timestamp) {
fmt.Println("last", sampleValues[len(expected.values)-1].Value, expected.values[len(expected.values)-1].Value)
t.Errorf("%d.%d. expected %s, got %s", i, j, expected.values[len(expected.values)-1].Timestamp, sampleKey.LastTimestamp)
}
}
}
}

View File

@ -14,30 +14,34 @@
package leveldb
import (
"fmt"
"github.com/jmhodges/levigo"
"github.com/prometheus/prometheus/coding"
)
type batch struct {
batch *levigo.WriteBatch
drops uint32
puts uint32
}
func NewBatch() batch {
return batch{
func NewBatch() *batch {
return &batch{
batch: levigo.NewWriteBatch(),
}
}
func (b batch) Drop(key coding.Encoder) {
func (b *batch) Drop(key coding.Encoder) {
keyEncoded, err := key.Encode()
if err != nil {
panic(err)
}
b.drops++
b.batch.Delete(keyEncoded)
}
func (b batch) Put(key, value coding.Encoder) {
func (b *batch) Put(key, value coding.Encoder) {
keyEncoded, err := key.Encode()
if err != nil {
panic(err)
@ -46,6 +50,7 @@ func (b batch) Put(key, value coding.Encoder) {
if err != nil {
panic(err)
}
b.puts++
b.batch.Put(keyEncoded, valueEncoded)
}
@ -53,3 +58,7 @@ func (b batch) Put(key, value coding.Encoder) {
func (b batch) Close() {
b.batch.Close()
}
func (b batch) String() string {
return fmt.Sprintf("LevelDB batch with %d puts and %d drops.", b.puts, b.drops)
}

View File

@ -298,7 +298,7 @@ func (l *LevelDBPersistence) Commit(b raw.Batch) (err error) {
// tests, we could create a Batch struct that journals pending
// operations which the given Persistence implementation could convert
// to its specific commit requirements.
batch, ok := b.(batch)
batch, ok := b.(*batch)
if !ok {
panic("leveldb.batch expected")
}