// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metric

import (
	"code.google.com/p/goprotobuf/proto"
	"fmt"
	"github.com/prometheus/prometheus/coding"
	"github.com/prometheus/prometheus/model"
	dto "github.com/prometheus/prometheus/model/generated"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/storage/raw"
	"github.com/prometheus/prometheus/storage/raw/leveldb"
	"strings"
	"time"
)

// CurationState contains high-level curation state information for the
// heads-up-display.
type CurationState struct {
	// Active reports whether a curation pass is in progress. The filter
	// publishes true for every fingerprint it examines; Run publishes false
	// when it returns.
	Active bool
	// Name is the name of the processor performing the curation.
	Name string
	// Limit is the ignoreYoungerThan duration of the running curation policy.
	Limit time.Duration
	// Fingerprint is the fingerprint of the series most recently examined by
	// the filter.
	Fingerprint model.Fingerprint
}

// watermarkFilter determines whether to include or exclude candidate
// values from the curation process by virtue of how old the high watermark is.
type watermarkFilter struct {
	// curationState is the data store for curation remarks.
	curationState raw.Persistence
	// ignoreYoungerThan conveys this filter's policy of not working on elements
	// younger than a given relative time duration. This is persisted to the
	// curation remark database (curationState) to indicate how far a given
	// policy of this type has progressed.
	ignoreYoungerThan time.Duration
	// processor is the post-processor that performs whatever action is desired on
	// the data that is deemed valid to be worked on.
	processor Processor
	// stop functions as the global stop channel for all future operations. The
	// filter never receives from it; it only checks whether a value is queued.
	stop chan bool
	// stopAt is used to determine the eligibility of series for compaction.
	stopAt time.Time
	// status is the outbound channel for notifying the status page of its state.
	status chan CurationState
}

// Curator is responsible for effectuating a given curation policy across the
// stored samples on-disk. This is useful to compact sparse sample values into
// single sample entities to reduce keyspace load on the datastore.
type Curator struct {
	// Stop functions as a channel that when empty allows the curator to operate.
	// The moment a value is ingested inside of it, the curator goes into drain
	// mode.
	Stop chan bool
}

// watermarkDecoder converts (dto.Fingerprint, dto.MetricHighWatermark) pairs
// into (model.Fingerprint, model.Watermark) pairs.
type watermarkDecoder struct{}

// watermarkOperator scans over the curator.samples table for metrics whose
// high watermark has been determined to be allowable for curation. This type
// is individually responsible for compaction.
//
// The scanning starts from CurationRemark.LastCompletionTimestamp and goes
// forward until the stop point or end of the series is reached.
type watermarkOperator struct {
	// curationState is the data store for curation remarks.
	curationState raw.Persistence
	// diskFrontier models the available seekable ranges for the provided
	// sampleIterator.
	diskFrontier diskFrontier
	// ignoreYoungerThan is passed into the curation remark for the given series.
	ignoreYoungerThan time.Duration
	// processor is responsible for executing a given strategy on the
	// to-be-operated-on series.
	processor Processor
	// sampleIterator is a snapshotted iterator for the time series.
	sampleIterator leveldb.Iterator
	// samples is the sample store that is handed to the processor alongside
	// sampleIterator when the processor is applied to a series.
	samples raw.Persistence
	// stopAt is a cue for when to stop mutating a given series.
	stopAt time.Time
}

// Run facilitates the curation lifecycle.
//
// The recency threshold (instant minus ignoreYoungerThan) represents the most
// recent time up to which values will be curated.
// curationState is the on-disk store where the curation remarks are made for // how much progress has been made. func (c Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, processor Processor, curationState, samples, watermarks *leveldb.LevelDBPersistence, status chan CurationState) (err error) { defer func(t time.Time) { duration := float64(time.Since(t) / time.Millisecond) labels := map[string]string{ cutOff: fmt.Sprint(ignoreYoungerThan), processorName: processor.Name(), result: success, } if err != nil { labels[result] = failure } curationDuration.IncrementBy(labels, duration) curationDurations.Add(labels, duration) }(time.Now()) defer func() { select { case status <- CurationState{Active: false}: case <-status: default: } }() iterator := samples.NewIterator(true) defer iterator.Close() diskFrontier, err := newDiskFrontier(iterator) if err != nil { return } if diskFrontier == nil { // No sample database exists; no work to do! return } decoder := watermarkDecoder{} filter := watermarkFilter{ curationState: curationState, ignoreYoungerThan: ignoreYoungerThan, processor: processor, status: status, stop: c.Stop, stopAt: instant.Add(-1 * ignoreYoungerThan), } // Right now, the ability to stop a curation is limited to the beginning of // each fingerprint cycle. It is impractical to cease the work once it has // begun for a given series. operator := watermarkOperator{ curationState: curationState, diskFrontier: *diskFrontier, processor: processor, ignoreYoungerThan: ignoreYoungerThan, sampleIterator: iterator, samples: samples, stopAt: instant.Add(-1 * ignoreYoungerThan), } _, err = watermarks.ForEach(decoder, filter, operator) return } // drain instructs the curator to stop at the next convenient moment as to not // introduce data inconsistencies. 
func (c Curator) Drain() { if len(c.Stop) == 0 { c.Stop <- true } } func (w watermarkDecoder) DecodeKey(in interface{}) (out interface{}, err error) { key := &dto.Fingerprint{} bytes := in.([]byte) err = proto.Unmarshal(bytes, key) if err != nil { return } out = model.NewFingerprintFromDTO(key) return } func (w watermarkDecoder) DecodeValue(in interface{}) (out interface{}, err error) { dto := &dto.MetricHighWatermark{} bytes := in.([]byte) err = proto.Unmarshal(bytes, dto) if err != nil { return } out = model.NewWatermarkFromHighWatermarkDTO(dto) return } func (w watermarkFilter) shouldStop() bool { return len(w.stop) != 0 } func getCurationRemark(states raw.Persistence, processor Processor, ignoreYoungerThan time.Duration, fingerprint model.Fingerprint) (remark *model.CurationRemark, err error) { rawSignature, err := processor.Signature() if err != nil { return } curationKey := model.CurationKey{ Fingerprint: fingerprint, ProcessorMessageRaw: rawSignature, ProcessorMessageTypeName: processor.Name(), IgnoreYoungerThan: ignoreYoungerThan, }.ToDTO() curationValue := &dto.CurationValue{} rawKey := coding.NewProtocolBuffer(curationKey) has, err := states.Has(rawKey) if err != nil { return } if !has { return } rawCurationValue, err := states.Get(rawKey) if err != nil { return } err = proto.Unmarshal(rawCurationValue, curationValue) if err != nil { return } baseRemark := model.NewCurationRemarkFromDTO(curationValue) remark = &baseRemark return } func (w watermarkFilter) Filter(key, value interface{}) (r storage.FilterResult) { fingerprint := key.(model.Fingerprint) defer func() { labels := map[string]string{ cutOff: fmt.Sprint(w.ignoreYoungerThan), result: strings.ToLower(r.String()), processorName: w.processor.Name(), } curationFilterOperations.Increment(labels) }() defer func() { select { case w.status <- CurationState{ Active: true, Name: w.processor.Name(), Limit: w.ignoreYoungerThan, Fingerprint: fingerprint, }: case <-w.status: default: } }() if w.shouldStop() { 
return storage.STOP } curationRemark, err := getCurationRemark(w.curationState, w.processor, w.ignoreYoungerThan, fingerprint) if err != nil { return } if curationRemark == nil { r = storage.ACCEPT return } if !curationRemark.OlderThan(w.stopAt) { return storage.SKIP } watermark := value.(model.Watermark) if !curationRemark.OlderThan(watermark.Time) { return storage.SKIP } curationConsistent, err := w.curationConsistent(fingerprint, watermark) if err != nil { return } if curationConsistent { return storage.SKIP } return storage.ACCEPT } // curationConsistent determines whether the given metric is in a dirty state // and needs curation. func (w watermarkFilter) curationConsistent(f model.Fingerprint, watermark model.Watermark) (consistent bool, err error) { curationRemark, err := getCurationRemark(w.curationState, w.processor, w.ignoreYoungerThan, f) if err != nil { return } if !curationRemark.OlderThan(watermark.Time) { consistent = true } return } func (w watermarkOperator) Operate(key, _ interface{}) (oErr *storage.OperatorError) { fingerprint := key.(model.Fingerprint) seriesFrontier, err := newSeriesFrontier(fingerprint, w.diskFrontier, w.sampleIterator) if err != nil || seriesFrontier == nil { // An anomaly with the series frontier is severe in the sense that some sort // of an illegal state condition exists in the storage layer, which would // probably signify an illegal disk frontier. return &storage.OperatorError{error: err, Continuable: false} } curationState, err := getCurationRemark(w.curationState, w.processor, w.ignoreYoungerThan, fingerprint) if err != nil { // An anomaly with the curation remark is likely not fatal in the sense that // there was a decoding error with the entity and shouldn't be cause to stop // work. The process will simply start from a pessimistic work time and // work forward. With an idempotent processor, this is safe. 
return &storage.OperatorError{error: err, Continuable: true} } startKey := model.SampleKey{ Fingerprint: fingerprint, FirstTimestamp: seriesFrontier.optimalStartTime(curationState), } prospectiveKey, err := coding.NewProtocolBuffer(startKey.ToDTO()).Encode() if err != nil { // An encoding failure of a key is no reason to stop. return &storage.OperatorError{error: err, Continuable: true} } if !w.sampleIterator.Seek(prospectiveKey) { // LevelDB is picky about the seek ranges. If an iterator was invalidated, // no work may occur, and the iterator cannot be recovered. return &storage.OperatorError{error: fmt.Errorf("Illegal Condition: Iterator invalidated due to seek range."), Continuable: false} } newestAllowedSample := w.stopAt if !newestAllowedSample.Before(seriesFrontier.lastSupertime) { newestAllowedSample = seriesFrontier.lastSupertime } lastTime, err := w.processor.Apply(w.sampleIterator, w.samples, newestAllowedSample, fingerprint) if err != nil { // We can't divine the severity of a processor error without refactoring the // interface. return &storage.OperatorError{error: err, Continuable: false} } err = w.refreshCurationRemark(fingerprint, lastTime) if err != nil { // Under the assumption that the processors are idempotent, they can be // re-run; thusly, the commitment of the curation remark is no cause // to cease further progress. return &storage.OperatorError{error: err, Continuable: true} } return } func (w watermarkOperator) refreshCurationRemark(f model.Fingerprint, finished time.Time) (err error) { signature, err := w.processor.Signature() if err != nil { return } curationKey := model.CurationKey{ Fingerprint: f, ProcessorMessageRaw: signature, ProcessorMessageTypeName: w.processor.Name(), IgnoreYoungerThan: w.ignoreYoungerThan, }.ToDTO() curationValue := model.CurationRemark{ LastCompletionTimestamp: finished, }.ToDTO() err = w.curationState.Put(coding.NewProtocolBuffer(curationKey), coding.NewProtocolBuffer(curationValue)) return }