2013-02-07 10:38:01 +00:00
// Copyright 2013 Prometheus Team
2012-12-09 15:27:12 +00:00
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2013-02-08 17:03:26 +00:00
package metric
2012-12-09 15:27:12 +00:00
import (
2013-02-08 17:03:26 +00:00
"flag"
2013-04-28 12:47:43 +00:00
"fmt"
2013-02-08 17:03:26 +00:00
"sort"
2013-03-04 19:43:07 +00:00
"sync"
2012-12-12 11:53:34 +00:00
"time"
2012-12-09 15:27:12 +00:00
2013-06-08 08:27:44 +00:00
"code.google.com/p/goprotobuf/proto"
2013-08-12 15:18:02 +00:00
"github.com/golang/glog"
2013-06-08 08:27:44 +00:00
2013-06-25 12:02:27 +00:00
clientmodel "github.com/prometheus/client_golang/model"
2013-06-08 08:27:44 +00:00
"github.com/prometheus/prometheus/storage"
2013-08-05 15:31:49 +00:00
"github.com/prometheus/prometheus/storage/raw"
2013-06-08 08:27:44 +00:00
"github.com/prometheus/prometheus/storage/raw/leveldb"
"github.com/prometheus/prometheus/utility"
2013-08-12 15:18:02 +00:00
dto "github.com/prometheus/prometheus/model/generated"
2013-02-08 17:03:26 +00:00
)
2013-06-08 08:27:44 +00:00
const sortConcurrency = 2
2013-02-08 17:03:26 +00:00
type LevelDBMetricPersistence struct {
2013-08-06 10:00:31 +00:00
CurationRemarks CurationRemarker
2013-08-12 22:36:12 +00:00
FingerprintToMetrics FingerprintMetricIndex
LabelNameToFingerprints LabelNameFingerprintIndex
LabelSetToFingerprints LabelPairFingerprintIndex
2013-08-05 07:25:47 +00:00
MetricHighWatermarks HighWatermarker
2013-08-12 22:36:12 +00:00
MetricMembershipIndex MetricMembershipIndex
2013-08-07 10:07:35 +00:00
Indexer MetricIndexer
MetricSamples * leveldb . LevelDBPersistence
// The remaining indices will be replaced with generalized interface resolvers:
//
// type FingerprintResolver interface {
// GetFingerprintForMetric(clientmodel.Metric) (*clientmodel.Fingerprint, bool, error)
// GetFingerprintsForLabelName(clientmodel.LabelName) (clientmodel.Fingerprints, bool, error)
// GetFingerprintsForLabelSet(LabelPair) (clientmodel.Fingerprints, bool, error)
// }
// type MetricResolver interface {
// GetMetricsForFingerprint(clientmodel.Fingerprints) (FingerprintMetricMapping, bool, error)
// }
2013-02-08 17:03:26 +00:00
}
// Command-line flags that tune the LevelDB storage layer.
var (
	// leveldbChunkSize caps how many samples are written under a single key.
	leveldbChunkSize = flag.Int("leveldbChunkSize", 200, "Maximum number of samples stored under one key.")

	// These flag values are back of the envelope, though they seem sensible.
	// Please re-evaluate based on your own needs.
	//
	// Sizes are expressed in bytes; N<<20 is N MiB.
	curationRemarksCacheSize         = flag.Int("curationRemarksCacheSize", 5<<20, "The size for the curation remarks cache (bytes).")
	fingerprintsToLabelPairCacheSize = flag.Int("fingerprintsToLabelPairCacheSizeBytes", 25<<20, "The size for the fingerprint to label pair index (bytes).")
	highWatermarkCacheSize           = flag.Int("highWatermarksByFingerprintSizeBytes", 5<<20, "The size for the metric high watermarks (bytes).")
	labelNameToFingerprintsCacheSize = flag.Int("labelNameToFingerprintsCacheSizeBytes", 25<<20, "The size for the label name to metric fingerprint index (bytes).")
	labelPairToFingerprintsCacheSize = flag.Int("labelPairToFingerprintsCacheSizeBytes", 25<<20, "The size for the label pair to metric fingerprint index (bytes).")
	metricMembershipIndexCacheSize   = flag.Int("metricMembershipCacheSizeBytes", 5<<20, "The size for the metric membership index (bytes).")
	samplesByFingerprintCacheSize    = flag.Int("samplesByFingerprintCacheSizeBytes", 50<<20, "The size for the samples database (bytes).")
)
type (
	// leveldbOpener is a parameterless callback used to open one storage
	// subsystem.
	leveldbOpener func()

	// errorCloser is anything whose Close reports an error.
	errorCloser interface {
		Close() error
	}

	// closer is anything whose Close returns nothing.
	closer interface {
		Close()
	}
)
2013-02-08 17:03:26 +00:00
2013-04-01 11:22:38 +00:00
func ( l * LevelDBMetricPersistence ) Close ( ) {
2013-08-03 16:46:02 +00:00
var persistences = [ ] interface { } {
2013-05-02 10:49:13 +00:00
l . CurationRemarks ,
2013-08-12 22:36:12 +00:00
l . FingerprintToMetrics ,
l . LabelNameToFingerprints ,
l . LabelSetToFingerprints ,
2013-05-02 10:49:13 +00:00
l . MetricHighWatermarks ,
2013-08-12 22:36:12 +00:00
l . MetricMembershipIndex ,
2013-05-02 10:49:13 +00:00
l . MetricSamples ,
2013-02-08 17:03:26 +00:00
}
2013-04-01 11:22:38 +00:00
closerGroup := sync . WaitGroup { }
2013-02-08 17:03:26 +00:00
2013-08-03 16:46:02 +00:00
for _ , c := range persistences {
2013-04-01 11:22:38 +00:00
closerGroup . Add ( 1 )
2013-08-03 16:46:02 +00:00
go func ( c interface { } ) {
if c != nil {
switch closer := c . ( type ) {
case closer :
closer . Close ( )
case errorCloser :
if err := closer . Close ( ) ; err != nil {
2013-08-12 16:22:48 +00:00
glog . Error ( "Error closing persistence: " , err )
2013-08-03 16:46:02 +00:00
}
}
2013-04-24 09:51:40 +00:00
}
2013-04-01 11:22:38 +00:00
closerGroup . Done ( )
2013-08-03 16:46:02 +00:00
} ( c )
2013-02-08 17:03:26 +00:00
}
2013-04-01 11:22:38 +00:00
closerGroup . Wait ( )
2013-02-08 17:03:26 +00:00
}
2013-04-28 12:47:43 +00:00
func NewLevelDBMetricPersistence ( baseDirectory string ) ( * LevelDBMetricPersistence , error ) {
workers := utility . NewUncertaintyGroup ( 7 )
2013-02-08 17:03:26 +00:00
2013-06-25 12:02:27 +00:00
emission := new ( LevelDBMetricPersistence )
2013-02-08 17:03:26 +00:00
var subsystemOpeners = [ ] struct {
name string
opener leveldbOpener
} {
{
"Label Names and Value Pairs by Fingerprint" ,
func ( ) {
var err error
2013-08-12 22:36:12 +00:00
emission . FingerprintToMetrics , err = NewLevelDBFingerprintMetricIndex ( LevelDBFingerprintMetricIndexOptions {
2013-08-03 16:46:02 +00:00
LevelDBOptions : leveldb . LevelDBOptions {
2013-08-05 07:25:47 +00:00
Name : "Metrics by Fingerprint" ,
Purpose : "Index" ,
2013-08-03 16:46:02 +00:00
Path : baseDirectory + "/label_name_and_value_pairs_by_fingerprint" ,
CacheSizeBytes : * fingerprintsToLabelPairCacheSize ,
} ,
} )
2013-04-28 12:47:43 +00:00
workers . MayFail ( err )
2013-02-08 17:03:26 +00:00
} ,
} ,
{
"Samples by Fingerprint" ,
func ( ) {
var err error
2013-08-10 15:02:28 +00:00
emission . MetricSamples , err = leveldb . NewLevelDBPersistence ( leveldb . LevelDBOptions {
2013-08-05 07:25:47 +00:00
Name : "Samples" ,
Purpose : "Timeseries" ,
2013-08-03 15:25:03 +00:00
Path : baseDirectory + "/samples_by_fingerprint" ,
CacheSizeBytes : * fingerprintsToLabelPairCacheSize ,
2013-08-10 15:02:28 +00:00
} )
2013-04-28 12:47:43 +00:00
workers . MayFail ( err )
2013-02-08 17:03:26 +00:00
} ,
} ,
2013-03-15 02:24:28 +00:00
{
"High Watermarks by Fingerprint" ,
func ( ) {
var err error
2013-08-10 15:02:28 +00:00
emission . MetricHighWatermarks , err = NewLevelDBHighWatermarker ( LevelDBHighWatermarkerOptions {
2013-08-05 07:25:47 +00:00
LevelDBOptions : leveldb . LevelDBOptions {
Name : "High Watermarks" ,
Purpose : "The youngest sample in the database per metric." ,
Path : baseDirectory + "/high_watermarks_by_fingerprint" ,
CacheSizeBytes : * highWatermarkCacheSize ,
} } )
2013-04-28 12:47:43 +00:00
workers . MayFail ( err )
2013-03-15 02:24:28 +00:00
} ,
} ,
2013-02-08 17:03:26 +00:00
{
"Fingerprints by Label Name" ,
func ( ) {
var err error
2013-08-12 22:36:12 +00:00
emission . LabelNameToFingerprints , err = NewLevelLabelNameFingerprintIndex ( LevelDBLabelNameFingerprintIndexOptions {
2013-08-03 16:46:02 +00:00
LevelDBOptions : leveldb . LevelDBOptions {
2013-08-05 07:25:47 +00:00
Name : "Fingerprints by Label Name" ,
Purpose : "Index" ,
2013-08-03 16:46:02 +00:00
Path : baseDirectory + "/fingerprints_by_label_name" ,
CacheSizeBytes : * labelNameToFingerprintsCacheSize ,
} ,
} )
2013-04-28 12:47:43 +00:00
workers . MayFail ( err )
2013-02-08 17:03:26 +00:00
} ,
} ,
{
"Fingerprints by Label Name and Value Pair" ,
func ( ) {
var err error
2013-08-12 22:36:12 +00:00
emission . LabelSetToFingerprints , err = NewLevelDBLabelSetFingerprintIndex ( LevelDBLabelSetFingerprintIndexOptions {
2013-08-03 16:46:02 +00:00
LevelDBOptions : leveldb . LevelDBOptions {
2013-08-05 07:25:47 +00:00
Name : "Fingerprints by Label Pair" ,
Purpose : "Index" ,
2013-08-03 16:46:02 +00:00
Path : baseDirectory + "/fingerprints_by_label_name_and_value_pair" ,
CacheSizeBytes : * labelPairToFingerprintsCacheSize ,
} ,
} )
2013-04-28 12:47:43 +00:00
workers . MayFail ( err )
2013-02-08 17:03:26 +00:00
} ,
} ,
{
"Metric Membership Index" ,
func ( ) {
var err error
2013-08-12 22:36:12 +00:00
emission . MetricMembershipIndex , err = NewLevelDBMetricMembershipIndex (
2013-08-10 15:02:28 +00:00
LevelDBMetricMembershipIndexOptions {
2013-08-03 16:46:02 +00:00
LevelDBOptions : leveldb . LevelDBOptions {
2013-08-05 07:25:47 +00:00
Name : "Metric Membership" ,
Purpose : "Index" ,
2013-08-03 16:46:02 +00:00
Path : baseDirectory + "/metric_membership_index" ,
CacheSizeBytes : * metricMembershipIndexCacheSize ,
} ,
} )
2013-04-28 12:47:43 +00:00
workers . MayFail ( err )
} ,
} ,
{
"Sample Curation Remarks" ,
func ( ) {
var err error
2013-08-10 15:02:28 +00:00
emission . CurationRemarks , err = NewLevelDBCurationRemarker ( LevelDBCurationRemarkerOptions {
2013-08-06 10:00:31 +00:00
LevelDBOptions : leveldb . LevelDBOptions {
Name : "Sample Curation Remarks" ,
Purpose : "Ledger of Progress for Various Curators" ,
Path : baseDirectory + "/curation_remarks" ,
CacheSizeBytes : * curationRemarksCacheSize ,
} ,
} )
2013-04-28 12:47:43 +00:00
workers . MayFail ( err )
2013-02-08 17:03:26 +00:00
} ,
} ,
}
for _ , subsystem := range subsystemOpeners {
opener := subsystem . opener
go opener ( )
}
2013-04-28 12:47:43 +00:00
if ! workers . Wait ( ) {
for _ , err := range workers . Errors ( ) {
2013-08-12 16:22:48 +00:00
glog . Error ( "Could not open storage: " , err )
2013-02-08 17:03:26 +00:00
}
2013-04-28 12:47:43 +00:00
return nil , fmt . Errorf ( "Unable to open metric persistence." )
2013-02-08 17:03:26 +00:00
}
2013-08-07 10:07:35 +00:00
emission . Indexer = & TotalIndexer {
2013-08-12 22:36:12 +00:00
FingerprintToMetric : emission . FingerprintToMetrics ,
LabelNameToFingerprint : emission . LabelNameToFingerprints ,
LabelPairToFingerprint : emission . LabelSetToFingerprints ,
MetricMembership : emission . MetricMembershipIndex ,
2013-08-07 10:07:35 +00:00
}
2013-04-28 12:47:43 +00:00
return emission , nil
2013-02-08 17:03:26 +00:00
}
2013-06-25 12:02:27 +00:00
func ( l * LevelDBMetricPersistence ) AppendSample ( sample * clientmodel . Sample ) ( err error ) {
2013-04-05 16:03:45 +00:00
defer func ( begin time . Time ) {
2013-03-11 21:21:25 +00:00
duration := time . Since ( begin )
2013-02-08 17:03:26 +00:00
2013-03-01 17:51:36 +00:00
recordOutcome ( duration , err , map [ string ] string { operation : appendSample , result : success } , map [ string ] string { operation : appendSample , result : failure } )
2013-04-05 16:03:45 +00:00
} ( time . Now ( ) )
2013-02-08 17:03:26 +00:00
2013-06-25 12:02:27 +00:00
err = l . AppendSamples ( clientmodel . Samples { sample } )
2013-02-08 17:03:26 +00:00
return
}
2013-03-14 22:42:28 +00:00
// groupByFingerprint collects all of the provided samples, groups them
// together by their respective metric fingerprint, and finally sorts
// them chronologically.
2013-06-25 12:02:27 +00:00
func groupByFingerprint ( samples clientmodel . Samples ) map [ clientmodel . Fingerprint ] clientmodel . Samples {
fingerprintToSamples := map [ clientmodel . Fingerprint ] clientmodel . Samples { }
2013-02-08 17:03:26 +00:00
for _ , sample := range samples {
2013-06-25 12:02:27 +00:00
fingerprint := & clientmodel . Fingerprint { }
fingerprint . LoadFromMetric ( sample . Metric )
samples := fingerprintToSamples [ * fingerprint ]
2013-02-08 17:03:26 +00:00
samples = append ( samples , sample )
2013-06-25 12:02:27 +00:00
fingerprintToSamples [ * fingerprint ] = samples
2013-02-08 17:03:26 +00:00
}
2013-05-21 14:11:35 +00:00
sortingSemaphore := make ( chan bool , sortConcurrency )
doneSorting := sync . WaitGroup { }
2013-03-14 22:42:28 +00:00
2013-02-08 17:03:26 +00:00
for _ , samples := range fingerprintToSamples {
2013-03-04 19:43:07 +00:00
doneSorting . Add ( 1 )
2013-03-14 22:42:28 +00:00
2013-06-25 12:02:27 +00:00
sortingSemaphore <- true
go func ( samples clientmodel . Samples ) {
2013-02-08 17:03:26 +00:00
sort . Sort ( samples )
2013-06-25 12:02:27 +00:00
<- sortingSemaphore
2013-03-04 19:43:07 +00:00
doneSorting . Done ( )
2013-02-08 17:03:26 +00:00
} ( samples )
}
2013-03-04 19:43:07 +00:00
doneSorting . Wait ( )
2013-02-08 17:03:26 +00:00
2013-03-14 22:42:28 +00:00
return fingerprintToSamples
}
2013-03-07 19:01:32 +00:00
2013-06-25 12:02:27 +00:00
func ( l * LevelDBMetricPersistence ) refreshHighWatermarks ( groups map [ clientmodel . Fingerprint ] clientmodel . Samples ) ( err error ) {
2013-04-05 16:03:45 +00:00
defer func ( begin time . Time ) {
2013-03-15 02:24:28 +00:00
duration := time . Since ( begin )
recordOutcome ( duration , err , map [ string ] string { operation : refreshHighWatermarks , result : success } , map [ string ] string { operation : refreshHighWatermarks , result : failure } )
2013-04-05 16:03:45 +00:00
} ( time . Now ( ) )
2013-03-15 02:24:28 +00:00
2013-08-05 07:25:47 +00:00
b := FingerprintHighWatermarkMapping { }
for fp , ss := range groups {
if len ( ss ) == 0 {
2013-06-08 08:27:44 +00:00
continue
}
2013-03-15 02:24:28 +00:00
2013-08-05 07:25:47 +00:00
b [ fp ] = ss [ len ( ss ) - 1 ] . Timestamp
2013-03-15 02:24:28 +00:00
}
2013-08-05 07:25:47 +00:00
return l . MetricHighWatermarks . UpdateBatch ( b )
2013-03-15 02:24:28 +00:00
}
2013-06-25 12:02:27 +00:00
func ( l * LevelDBMetricPersistence ) AppendSamples ( samples clientmodel . Samples ) ( err error ) {
2013-04-05 16:03:45 +00:00
defer func ( begin time . Time ) {
2013-03-14 22:42:28 +00:00
duration := time . Since ( begin )
recordOutcome ( duration , err , map [ string ] string { operation : appendSamples , result : success } , map [ string ] string { operation : appendSamples , result : failure } )
2013-04-05 16:03:45 +00:00
} ( time . Now ( ) )
2013-03-14 22:42:28 +00:00
2013-05-21 14:11:35 +00:00
fingerprintToSamples := groupByFingerprint ( samples )
indexErrChan := make ( chan error , 1 )
watermarkErrChan := make ( chan error , 1 )
2013-03-14 22:42:28 +00:00
2013-06-25 12:02:27 +00:00
go func ( groups map [ clientmodel . Fingerprint ] clientmodel . Samples ) {
2013-08-03 16:46:02 +00:00
metrics := FingerprintMetricMapping { }
2013-03-14 23:55:50 +00:00
for fingerprint , samples := range groups {
metrics [ fingerprint ] = samples [ 0 ] . Metric
}
2013-08-07 10:07:35 +00:00
indexErrChan <- l . Indexer . IndexMetrics ( metrics )
2013-03-14 22:42:28 +00:00
} ( fingerprintToSamples )
2013-06-25 12:02:27 +00:00
go func ( groups map [ clientmodel . Fingerprint ] clientmodel . Samples ) {
2013-03-15 02:24:28 +00:00
watermarkErrChan <- l . refreshHighWatermarks ( groups )
} ( fingerprintToSamples )
2013-03-15 01:09:19 +00:00
samplesBatch := leveldb . NewBatch ( )
defer samplesBatch . Close ( )
2013-03-14 22:42:28 +00:00
2013-03-15 01:09:19 +00:00
for fingerprint , group := range fingerprintToSamples {
for {
lengthOfGroup := len ( group )
2013-03-14 22:42:28 +00:00
2013-03-15 01:09:19 +00:00
if lengthOfGroup == 0 {
break
}
2013-03-14 22:42:28 +00:00
2013-03-15 01:09:19 +00:00
take := * leveldbChunkSize
if lengthOfGroup < take {
take = lengthOfGroup
}
2013-03-14 22:42:28 +00:00
2013-03-15 01:09:19 +00:00
chunk := group [ 0 : take ]
group = group [ take : lengthOfGroup ]
2013-03-14 22:42:28 +00:00
2013-06-25 12:02:27 +00:00
key := SampleKey {
2013-05-17 10:58:15 +00:00
Fingerprint : & fingerprint ,
2013-04-21 17:16:15 +00:00
FirstTimestamp : chunk [ 0 ] . Timestamp ,
LastTimestamp : chunk [ take - 1 ] . Timestamp ,
SampleCount : uint32 ( take ) ,
2013-06-25 12:02:27 +00:00
}
2013-03-14 22:42:28 +00:00
2013-03-15 01:09:19 +00:00
value := & dto . SampleValueSeries { }
for _ , sample := range chunk {
value . Value = append ( value . Value , & dto . SampleValueSeries_Value {
Timestamp : proto . Int64 ( sample . Timestamp . Unix ( ) ) ,
2013-06-25 12:02:27 +00:00
Value : proto . Float64 ( float64 ( sample . Value ) ) ,
2013-03-15 01:09:19 +00:00
} )
2013-03-14 22:42:28 +00:00
}
2013-03-15 01:09:19 +00:00
2013-06-25 12:02:27 +00:00
k := & dto . SampleKey { }
key . Dump ( k )
samplesBatch . Put ( k , value )
2013-03-14 22:42:28 +00:00
}
2013-03-15 01:09:19 +00:00
}
2013-03-14 22:42:28 +00:00
2013-05-02 10:49:13 +00:00
err = l . MetricSamples . Commit ( samplesBatch )
2013-03-15 01:09:19 +00:00
if err != nil {
2013-04-05 16:03:45 +00:00
return
2013-03-15 01:09:19 +00:00
}
2013-03-14 22:42:28 +00:00
err = <- indexErrChan
if err != nil {
2013-04-05 16:03:45 +00:00
return
2013-03-14 22:42:28 +00:00
}
2013-02-08 17:03:26 +00:00
2013-03-15 02:24:28 +00:00
err = <- watermarkErrChan
if err != nil {
2013-04-05 16:03:45 +00:00
return
2013-03-15 02:24:28 +00:00
}
2013-02-08 17:03:26 +00:00
return
}
2013-06-25 12:02:27 +00:00
func extractSampleKey ( i leveldb . Iterator ) ( * SampleKey , error ) {
2013-04-21 17:16:15 +00:00
k := & dto . SampleKey { }
2013-06-25 12:02:27 +00:00
err := proto . Unmarshal ( i . Key ( ) , k )
2013-04-21 17:16:15 +00:00
if err != nil {
2013-06-25 12:02:27 +00:00
return nil , err
2013-03-07 02:16:20 +00:00
}
2013-06-25 12:02:27 +00:00
key := & SampleKey { }
key . Load ( k )
2012-12-25 12:50:36 +00:00
2013-06-25 12:02:27 +00:00
return key , nil
2012-12-25 12:50:36 +00:00
}
2013-06-25 12:02:27 +00:00
func extractSampleValues ( i leveldb . Iterator ) ( Values , error ) {
2013-04-22 11:30:16 +00:00
v := & dto . SampleValueSeries { }
2013-06-25 12:02:27 +00:00
err := proto . Unmarshal ( i . Value ( ) , v )
2013-04-22 11:30:16 +00:00
if err != nil {
2013-06-25 12:02:27 +00:00
return nil , err
2012-12-25 12:50:36 +00:00
}
2013-06-25 12:02:27 +00:00
return NewValuesFromDTO ( v ) , nil
2012-12-25 12:50:36 +00:00
}
2013-08-03 16:46:02 +00:00
func ( l * LevelDBMetricPersistence ) hasIndexMetric ( m clientmodel . Metric ) ( value bool , err error ) {
2013-04-05 16:03:45 +00:00
defer func ( begin time . Time ) {
2013-03-11 21:21:25 +00:00
duration := time . Since ( begin )
2013-01-23 16:18:45 +00:00
2013-03-01 17:51:36 +00:00
recordOutcome ( duration , err , map [ string ] string { operation : hasIndexMetric , result : success } , map [ string ] string { operation : hasIndexMetric , result : failure } )
2013-04-05 16:03:45 +00:00
} ( time . Now ( ) )
2013-01-23 16:18:45 +00:00
2013-08-12 22:36:12 +00:00
return l . MetricMembershipIndex . Has ( m )
2012-12-09 15:27:12 +00:00
}
2013-08-03 16:46:02 +00:00
func ( l * LevelDBMetricPersistence ) HasLabelPair ( p * LabelPair ) ( value bool , err error ) {
2013-04-05 16:03:45 +00:00
defer func ( begin time . Time ) {
2013-03-11 21:21:25 +00:00
duration := time . Since ( begin )
2013-01-23 16:18:45 +00:00
2013-03-01 17:51:36 +00:00
recordOutcome ( duration , err , map [ string ] string { operation : hasLabelPair , result : success } , map [ string ] string { operation : hasLabelPair , result : failure } )
2013-04-05 16:03:45 +00:00
} ( time . Now ( ) )
2013-01-23 16:18:45 +00:00
2013-08-12 22:36:12 +00:00
return l . LabelSetToFingerprints . Has ( p )
2012-12-09 15:27:12 +00:00
}
2013-08-03 16:46:02 +00:00
func ( l * LevelDBMetricPersistence ) HasLabelName ( n clientmodel . LabelName ) ( value bool , err error ) {
2013-04-05 16:03:45 +00:00
defer func ( begin time . Time ) {
2013-03-11 21:21:25 +00:00
duration := time . Since ( begin )
2013-01-23 16:18:45 +00:00
2013-03-01 17:51:36 +00:00
recordOutcome ( duration , err , map [ string ] string { operation : hasLabelName , result : success } , map [ string ] string { operation : hasLabelName , result : failure } )
2013-04-05 16:03:45 +00:00
} ( time . Now ( ) )
2013-01-23 16:18:45 +00:00
2013-08-12 22:36:12 +00:00
value , err = l . LabelNameToFingerprints . Has ( n )
2013-01-23 16:18:45 +00:00
return
2012-12-09 15:27:12 +00:00
}
2013-06-25 12:02:27 +00:00
func ( l * LevelDBMetricPersistence ) GetFingerprintsForLabelSet ( labelSet clientmodel . LabelSet ) ( fps clientmodel . Fingerprints , err error ) {
2013-04-05 16:03:45 +00:00
defer func ( begin time . Time ) {
2013-03-11 21:21:25 +00:00
duration := time . Since ( begin )
2012-12-09 15:27:12 +00:00
2013-03-01 17:51:36 +00:00
recordOutcome ( duration , err , map [ string ] string { operation : getFingerprintsForLabelSet , result : success } , map [ string ] string { operation : getFingerprintsForLabelSet , result : failure } )
2013-04-05 16:03:45 +00:00
} ( time . Now ( ) )
2012-12-09 15:27:12 +00:00
2013-01-13 10:15:01 +00:00
sets := [ ] utility . Set { }
2013-06-25 12:02:27 +00:00
for name , value := range labelSet {
2013-08-12 22:36:12 +00:00
fps , _ , err := l . LabelSetToFingerprints . Lookup ( & LabelPair {
2013-08-03 16:46:02 +00:00
Name : name ,
Value : value ,
} )
2012-12-25 12:50:36 +00:00
if err != nil {
2013-08-03 16:46:02 +00:00
return nil , err
2013-06-08 08:27:44 +00:00
}
2012-12-25 12:50:36 +00:00
2013-01-13 10:15:01 +00:00
set := utility . Set { }
2013-08-03 16:46:02 +00:00
for _ , fp := range fps {
2013-05-17 10:58:15 +00:00
set . Add ( * fp )
2012-12-09 15:27:12 +00:00
}
2013-01-13 10:15:01 +00:00
sets = append ( sets , set )
}
numberOfSets := len ( sets )
if numberOfSets == 0 {
2013-06-25 12:02:27 +00:00
return nil , nil
2013-01-13 10:15:01 +00:00
}
base := sets [ 0 ]
for i := 1 ; i < numberOfSets ; i ++ {
base = base . Intersection ( sets [ i ] )
}
for _ , e := range base . Elements ( ) {
2013-06-25 12:02:27 +00:00
fingerprint := e . ( clientmodel . Fingerprint )
2013-05-17 10:58:15 +00:00
fps = append ( fps , & fingerprint )
2012-12-09 15:27:12 +00:00
}
2013-06-25 12:02:27 +00:00
return fps , nil
2012-12-09 15:27:12 +00:00
}
2013-06-25 12:02:27 +00:00
func ( l * LevelDBMetricPersistence ) GetFingerprintsForLabelName ( labelName clientmodel . LabelName ) ( fps clientmodel . Fingerprints , err error ) {
2013-04-05 16:03:45 +00:00
defer func ( begin time . Time ) {
2013-03-11 21:21:25 +00:00
duration := time . Since ( begin )
2012-12-09 15:27:12 +00:00
2013-03-01 17:51:36 +00:00
recordOutcome ( duration , err , map [ string ] string { operation : getFingerprintsForLabelName , result : success } , map [ string ] string { operation : getFingerprintsForLabelName , result : failure } )
2013-04-05 16:03:45 +00:00
} ( time . Now ( ) )
2012-12-09 15:27:12 +00:00
2013-08-03 16:46:02 +00:00
// TODO(matt): Update signature to work with ok.
2013-08-12 22:36:12 +00:00
fps , _ , err = l . LabelNameToFingerprints . Lookup ( labelName )
2012-12-25 12:50:36 +00:00
2013-08-03 16:46:02 +00:00
return fps , err
2012-12-09 15:27:12 +00:00
}
2013-06-25 12:02:27 +00:00
func ( l * LevelDBMetricPersistence ) GetMetricForFingerprint ( f * clientmodel . Fingerprint ) ( m clientmodel . Metric , err error ) {
2013-04-05 16:03:45 +00:00
defer func ( begin time . Time ) {
2013-03-11 21:21:25 +00:00
duration := time . Since ( begin )
2012-12-25 12:50:36 +00:00
2013-03-01 17:51:36 +00:00
recordOutcome ( duration , err , map [ string ] string { operation : getMetricForFingerprint , result : success } , map [ string ] string { operation : getMetricForFingerprint , result : failure } )
2013-04-05 16:03:45 +00:00
} ( time . Now ( ) )
2012-12-25 12:50:36 +00:00
2013-08-03 16:46:02 +00:00
// TODO(matt): Update signature to work with ok.
2013-08-12 22:36:12 +00:00
m , _ , err = l . FingerprintToMetrics . Lookup ( f )
2012-12-12 11:53:34 +00:00
2013-06-08 08:27:44 +00:00
return m , nil
2012-12-12 11:53:34 +00:00
}
2013-06-25 12:02:27 +00:00
func ( l * LevelDBMetricPersistence ) GetValueAtTime ( f * clientmodel . Fingerprint , t time . Time ) Values {
2013-04-18 14:10:52 +00:00
panic ( "Not implemented" )
2012-12-12 11:53:34 +00:00
}
2013-06-25 12:02:27 +00:00
func ( l * LevelDBMetricPersistence ) GetBoundaryValues ( f * clientmodel . Fingerprint , i Interval ) Values {
2013-04-18 14:10:52 +00:00
panic ( "Not implemented" )
2012-12-19 19:34:54 +00:00
}
2013-06-25 12:02:27 +00:00
func ( l * LevelDBMetricPersistence ) GetRangeValues ( f * clientmodel . Fingerprint , i Interval ) Values {
2013-04-18 14:10:52 +00:00
panic ( "Not implemented" )
2012-12-12 11:53:34 +00:00
}
2013-02-06 16:06:39 +00:00
// MetricKeyDecoder is a stateless decoder whose DecodeKey turns serialized
// label pair keys into LabelPair values; its DecodeValue is a no-op.
type MetricKeyDecoder struct { }
func ( d * MetricKeyDecoder ) DecodeKey ( in interface { } ) ( out interface { } , err error ) {
2013-03-26 13:46:02 +00:00
unmarshaled := dto . LabelPair { }
err = proto . Unmarshal ( in . ( [ ] byte ) , & unmarshaled )
2013-02-06 16:06:39 +00:00
if err != nil {
return
}
2013-02-08 17:03:26 +00:00
2013-06-25 12:02:27 +00:00
out = LabelPair {
Name : clientmodel . LabelName ( * unmarshaled . Name ) ,
Value : clientmodel . LabelValue ( * unmarshaled . Value ) ,
2013-03-26 13:46:02 +00:00
}
2013-02-08 17:03:26 +00:00
return
2013-02-06 16:06:39 +00:00
}
// DecodeValue ignores its input and always returns a nil value and a nil
// error; this decoder only decodes keys.
func ( d * MetricKeyDecoder ) DecodeValue ( in interface { } ) ( out interface { } , err error ) {
return
}
2013-03-26 10:45:56 +00:00
type LabelNameFilter struct {
2013-06-25 12:02:27 +00:00
labelName clientmodel . LabelName
2013-03-26 10:45:56 +00:00
}
2013-02-06 16:06:39 +00:00
2013-03-26 13:46:02 +00:00
func ( f LabelNameFilter ) Filter ( key , value interface { } ) ( filterResult storage . FilterResult ) {
2013-06-25 12:02:27 +00:00
labelPair , ok := key . ( LabelPair )
2013-03-26 13:46:02 +00:00
if ok && labelPair . Name == f . labelName {
2013-02-07 10:38:01 +00:00
return storage . ACCEPT
}
return storage . SKIP
2013-02-06 16:06:39 +00:00
}
2013-03-26 10:45:56 +00:00
type CollectLabelValuesOp struct {
2013-06-25 12:02:27 +00:00
labelValues [ ] clientmodel . LabelValue
2013-02-06 16:06:39 +00:00
}
2013-03-26 10:45:56 +00:00
func ( op * CollectLabelValuesOp ) Operate ( key , value interface { } ) ( err * storage . OperatorError ) {
2013-06-25 12:02:27 +00:00
labelPair := key . ( LabelPair )
op . labelValues = append ( op . labelValues , clientmodel . LabelValue ( labelPair . Value ) )
2013-02-06 16:06:39 +00:00
return
}
2013-06-25 12:02:27 +00:00
func ( l * LevelDBMetricPersistence ) GetAllValuesForLabel ( labelName clientmodel . LabelName ) ( values clientmodel . LabelValues , err error ) {
2013-03-26 10:45:56 +00:00
filter := & LabelNameFilter {
labelName : labelName ,
}
labelValuesOp := & CollectLabelValuesOp { }
2013-02-06 16:06:39 +00:00
2013-08-12 22:36:12 +00:00
_ , err = l . LabelSetToFingerprints . ForEach ( & MetricKeyDecoder { } , filter , labelValuesOp )
2013-02-06 16:06:39 +00:00
if err != nil {
return
}
2013-03-26 10:45:56 +00:00
values = labelValuesOp . labelValues
2013-02-06 16:06:39 +00:00
return
}
2013-02-08 17:03:26 +00:00
2013-08-06 10:00:31 +00:00
// Prune compacts each database's keyspace serially.
2013-05-10 14:41:02 +00:00
//
// Beware that it would probably be imprudent to run this on a live user-facing
// server due to latency implications.
2013-08-05 15:31:49 +00:00
func ( l * LevelDBMetricPersistence ) Prune ( ) {
l . CurationRemarks . Prune ( )
2013-08-12 22:36:12 +00:00
l . FingerprintToMetrics . Prune ( )
l . LabelNameToFingerprints . Prune ( )
l . LabelSetToFingerprints . Prune ( )
2013-08-05 15:31:49 +00:00
l . MetricHighWatermarks . Prune ( )
2013-08-12 22:36:12 +00:00
l . MetricMembershipIndex . Prune ( )
2013-08-05 15:31:49 +00:00
l . MetricSamples . Prune ( )
2013-05-10 23:02:57 +00:00
}
2013-08-06 10:00:31 +00:00
func ( l * LevelDBMetricPersistence ) Sizes ( ) ( total uint64 , err error ) {
2013-05-10 23:02:57 +00:00
size := uint64 ( 0 )
2013-08-06 10:00:31 +00:00
if size , _ , err = l . CurationRemarks . Size ( ) ; err != nil {
2013-05-10 23:02:57 +00:00
return 0 , err
2013-05-10 14:41:02 +00:00
}
2013-05-10 23:02:57 +00:00
total += size
2013-08-12 22:36:12 +00:00
if size , _ , err = l . FingerprintToMetrics . Size ( ) ; err != nil {
2013-08-05 07:25:47 +00:00
return 0 , err
}
total += size
2013-05-10 23:02:57 +00:00
2013-08-12 22:36:12 +00:00
if size , _ , err = l . LabelNameToFingerprints . Size ( ) ; err != nil {
2013-08-05 07:25:47 +00:00
return 0 , err
}
total += size
2013-05-10 23:02:57 +00:00
2013-08-12 22:36:12 +00:00
if size , _ , err = l . LabelSetToFingerprints . Size ( ) ; err != nil {
2013-08-05 07:25:47 +00:00
return 0 , err
}
total += size
2013-05-10 23:02:57 +00:00
2013-08-05 07:25:47 +00:00
if size , _ , err = l . MetricHighWatermarks . Size ( ) ; err != nil {
2013-05-10 23:02:57 +00:00
return 0 , err
2013-05-10 14:41:02 +00:00
}
2013-05-10 23:02:57 +00:00
total += size
2013-08-12 22:36:12 +00:00
if size , _ , err = l . MetricMembershipIndex . Size ( ) ; err != nil {
2013-08-05 07:25:47 +00:00
return 0 , err
}
total += size
2013-05-10 23:02:57 +00:00
2013-08-06 10:00:31 +00:00
if size , err = l . MetricSamples . Size ( ) ; err != nil {
2013-05-10 23:02:57 +00:00
return 0 , err
2013-05-10 14:41:02 +00:00
}
2013-05-10 23:02:57 +00:00
total += size
2013-05-10 14:41:02 +00:00
2013-05-10 23:02:57 +00:00
return total , nil
2013-05-10 14:41:02 +00:00
}
2013-05-14 09:21:27 +00:00
2013-08-05 15:31:49 +00:00
func ( l * LevelDBMetricPersistence ) States ( ) raw . DatabaseStates {
return raw . DatabaseStates {
2013-08-05 07:25:47 +00:00
l . CurationRemarks . State ( ) ,
2013-08-12 22:36:12 +00:00
l . FingerprintToMetrics . State ( ) ,
l . LabelNameToFingerprints . State ( ) ,
l . LabelSetToFingerprints . State ( ) ,
2013-08-05 07:25:47 +00:00
l . MetricHighWatermarks . State ( ) ,
2013-08-12 22:36:12 +00:00
l . MetricMembershipIndex . State ( ) ,
2013-08-05 07:25:47 +00:00
l . MetricSamples . State ( ) ,
}
2013-05-14 09:21:27 +00:00
}