mirror of
https://github.com/prometheus/prometheus
synced 2024-12-28 09:42:22 +00:00
Merge pull request #259 from prometheus/optimize/fingerprinting-time
Optimize fingerprinting time and repointerize fingerprint usage
This commit is contained in:
commit
32c3510e66
@ -57,10 +57,10 @@ func NewCurationRemarkFromDTO(d *dto.CurationValue) CurationRemark {
|
||||
}
|
||||
}
|
||||
|
||||
// CurationKey provides a representation of dto.CurationKey with asociated
|
||||
// CurationKey provides a representation of dto.CurationKey with associated
|
||||
// business logic methods attached to it to enhance code readability.
|
||||
type CurationKey struct {
|
||||
Fingerprint Fingerprint
|
||||
Fingerprint *Fingerprint
|
||||
ProcessorMessageRaw []byte
|
||||
ProcessorMessageTypeName string
|
||||
IgnoreYoungerThan time.Duration
|
||||
|
@ -113,7 +113,7 @@ func LabelNameToDTO(l *LabelName) *dto.LabelName {
|
||||
}
|
||||
}
|
||||
|
||||
func FingerprintToDTO(f Fingerprint) *dto.Fingerprint {
|
||||
func FingerprintToDTO(f *Fingerprint) *dto.Fingerprint {
|
||||
return &dto.Fingerprint{
|
||||
Signature: proto.String(f.ToRowKey()),
|
||||
}
|
||||
|
@ -30,22 +30,8 @@ const (
|
||||
rowKeyDelimiter = "-"
|
||||
)
|
||||
|
||||
// Provides a compact representation of a Metric.
|
||||
type Fingerprint interface {
|
||||
// Transforms the fingerprint into a database row key.
|
||||
ToRowKey() string
|
||||
Hash() uint64
|
||||
FirstCharacterOfFirstLabelName() string
|
||||
LabelMatterLength() uint
|
||||
LastCharacterOfLastLabelValue() string
|
||||
ToDTO() *dto.Fingerprint
|
||||
Less(Fingerprint) bool
|
||||
Equal(Fingerprint) bool
|
||||
String() string
|
||||
}
|
||||
|
||||
// Builds a Fingerprint from a row key.
|
||||
func NewFingerprintFromRowKey(rowKey string) Fingerprint {
|
||||
func NewFingerprintFromRowKey(rowKey string) *Fingerprint {
|
||||
components := strings.Split(rowKey, rowKeyDelimiter)
|
||||
hash, err := strconv.ParseUint(components[0], 10, 64)
|
||||
if err != nil {
|
||||
@ -56,7 +42,7 @@ func NewFingerprintFromRowKey(rowKey string) Fingerprint {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return fingerprint{
|
||||
return &Fingerprint{
|
||||
hash: hash,
|
||||
firstCharacterOfFirstLabelName: components[1],
|
||||
labelMatterLength: uint(labelMatterLength),
|
||||
@ -65,12 +51,12 @@ func NewFingerprintFromRowKey(rowKey string) Fingerprint {
|
||||
}
|
||||
|
||||
// Builds a Fingerprint from a datastore entry.
|
||||
func NewFingerprintFromDTO(f *dto.Fingerprint) Fingerprint {
|
||||
func NewFingerprintFromDTO(f *dto.Fingerprint) *Fingerprint {
|
||||
return NewFingerprintFromRowKey(*f.Signature)
|
||||
}
|
||||
|
||||
// Decomposes a Metric into a Fingerprint.
|
||||
func NewFingerprintFromMetric(metric Metric) Fingerprint {
|
||||
func NewFingerprintFromMetric(metric Metric) *Fingerprint {
|
||||
labelLength := len(metric)
|
||||
labelNames := make([]string, 0, labelLength)
|
||||
|
||||
@ -103,7 +89,7 @@ func NewFingerprintFromMetric(metric Metric) Fingerprint {
|
||||
summer.Write([]byte(labelValue))
|
||||
}
|
||||
|
||||
return fingerprint{
|
||||
return &Fingerprint{
|
||||
firstCharacterOfFirstLabelName: firstCharacterOfFirstLabelName,
|
||||
hash: binary.LittleEndian.Uint64(summer.Sum(nil)),
|
||||
labelMatterLength: uint(labelMatterLength % 10),
|
||||
@ -112,7 +98,7 @@ func NewFingerprintFromMetric(metric Metric) Fingerprint {
|
||||
}
|
||||
|
||||
// A simplified representation of an entity.
|
||||
type fingerprint struct {
|
||||
type Fingerprint struct {
|
||||
// A hashed representation of the underyling entity. For our purposes, FNV-1A
|
||||
// 64-bit is used.
|
||||
hash uint64
|
||||
@ -121,41 +107,69 @@ type fingerprint struct {
|
||||
lastCharacterOfLastLabelValue string
|
||||
}
|
||||
|
||||
func (f fingerprint) String() string {
|
||||
func (f *Fingerprint) String() string {
|
||||
return f.ToRowKey()
|
||||
}
|
||||
|
||||
func (f fingerprint) ToRowKey() string {
|
||||
// Transforms the Fingerprint into a database row key.
|
||||
func (f *Fingerprint) ToRowKey() string {
|
||||
return strings.Join([]string{fmt.Sprintf("%020d", f.hash), f.firstCharacterOfFirstLabelName, fmt.Sprint(f.labelMatterLength), f.lastCharacterOfLastLabelValue}, rowKeyDelimiter)
|
||||
}
|
||||
|
||||
func (f fingerprint) ToDTO() *dto.Fingerprint {
|
||||
func (f *Fingerprint) ToDTO() *dto.Fingerprint {
|
||||
return &dto.Fingerprint{
|
||||
Signature: proto.String(f.ToRowKey()),
|
||||
}
|
||||
}
|
||||
|
||||
func (f fingerprint) Hash() uint64 {
|
||||
func (f *Fingerprint) Hash() uint64 {
|
||||
return f.hash
|
||||
}
|
||||
|
||||
func (f fingerprint) FirstCharacterOfFirstLabelName() string {
|
||||
func (f *Fingerprint) FirstCharacterOfFirstLabelName() string {
|
||||
return f.firstCharacterOfFirstLabelName
|
||||
}
|
||||
|
||||
func (f fingerprint) LabelMatterLength() uint {
|
||||
func (f *Fingerprint) LabelMatterLength() uint {
|
||||
return f.labelMatterLength
|
||||
}
|
||||
|
||||
func (f fingerprint) LastCharacterOfLastLabelValue() string {
|
||||
func (f *Fingerprint) LastCharacterOfLastLabelValue() string {
|
||||
return f.lastCharacterOfLastLabelValue
|
||||
}
|
||||
|
||||
func (f fingerprint) Less(o Fingerprint) bool {
|
||||
return f.String() < o.String()
|
||||
func (f *Fingerprint) Less(o *Fingerprint) bool {
|
||||
if f.hash < o.hash {
|
||||
return true
|
||||
}
|
||||
if f.hash > o.hash {
|
||||
return false
|
||||
}
|
||||
|
||||
if f.firstCharacterOfFirstLabelName < o.firstCharacterOfFirstLabelName {
|
||||
return true
|
||||
}
|
||||
if f.firstCharacterOfFirstLabelName > o.firstCharacterOfFirstLabelName {
|
||||
return false
|
||||
}
|
||||
|
||||
if f.labelMatterLength < o.labelMatterLength {
|
||||
return true
|
||||
}
|
||||
if f.labelMatterLength > o.labelMatterLength {
|
||||
return false
|
||||
}
|
||||
|
||||
if f.lastCharacterOfLastLabelValue < o.lastCharacterOfLastLabelValue {
|
||||
return true
|
||||
}
|
||||
if f.lastCharacterOfLastLabelValue > o.lastCharacterOfLastLabelValue {
|
||||
return false
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (f fingerprint) Equal(o Fingerprint) (equal bool) {
|
||||
func (f *Fingerprint) Equal(o *Fingerprint) (equal bool) {
|
||||
equal = f.Hash() == o.Hash()
|
||||
if !equal {
|
||||
return
|
||||
@ -178,7 +192,7 @@ func (f fingerprint) Equal(o Fingerprint) (equal bool) {
|
||||
|
||||
// Represents a collection of Fingerprint subject to a given natural sorting
|
||||
// scheme.
|
||||
type Fingerprints []Fingerprint
|
||||
type Fingerprints []*Fingerprint
|
||||
|
||||
func (f Fingerprints) Len() int {
|
||||
return len(f)
|
||||
|
@ -14,11 +14,12 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestFingerprintComparison(t *testing.T) {
|
||||
fingerprints := []fingerprint{
|
||||
fingerprints := []*Fingerprint{
|
||||
{
|
||||
hash: 0,
|
||||
firstCharacterOfFirstLabelName: "b",
|
||||
@ -66,3 +67,38 @@ func TestFingerprintComparison(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkFingerprinting(b *testing.B) {
|
||||
b.StopTimer()
|
||||
fps := []*Fingerprint{
|
||||
{
|
||||
hash: 0,
|
||||
firstCharacterOfFirstLabelName: "a",
|
||||
labelMatterLength: 2,
|
||||
lastCharacterOfLastLabelValue: "z",
|
||||
},
|
||||
{
|
||||
hash: 0,
|
||||
firstCharacterOfFirstLabelName: "a",
|
||||
labelMatterLength: 2,
|
||||
lastCharacterOfLastLabelValue: "z",
|
||||
},
|
||||
}
|
||||
for i := 0; i < 10; i++ {
|
||||
fps[0].Less(fps[1])
|
||||
}
|
||||
b.Logf("N: %v", b.N)
|
||||
b.StartTimer()
|
||||
|
||||
var pre runtime.MemStats
|
||||
runtime.ReadMemStats(&pre)
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
fps[0].Less(fps[1])
|
||||
}
|
||||
|
||||
var post runtime.MemStats
|
||||
runtime.ReadMemStats(&post)
|
||||
|
||||
b.Logf("allocs: %d items: ", post.TotalAlloc-pre.TotalAlloc)
|
||||
}
|
||||
|
@ -204,7 +204,7 @@ func (v Values) ToDTO() (out *dto.SampleValueSeries) {
|
||||
return
|
||||
}
|
||||
|
||||
func (v Values) ToSampleKey(f Fingerprint) SampleKey {
|
||||
func (v Values) ToSampleKey(f *Fingerprint) SampleKey {
|
||||
return SampleKey{
|
||||
Fingerprint: f,
|
||||
FirstTimestamp: v[0].Timestamp,
|
||||
|
@ -24,7 +24,7 @@ import (
|
||||
// SampleKey models the business logic around the data-transfer object
|
||||
// SampleKey.
|
||||
type SampleKey struct {
|
||||
Fingerprint Fingerprint
|
||||
Fingerprint *Fingerprint
|
||||
FirstTimestamp time.Time
|
||||
LastTimestamp time.Time
|
||||
SampleCount uint32
|
||||
|
@ -103,7 +103,7 @@ func (rule AlertingRule) Eval(timestamp time.Time, storage *metric.TieredStorage
|
||||
// Create pending alerts for any new vector elements in the alert expression.
|
||||
resultFingerprints := utility.Set{}
|
||||
for _, sample := range exprResult {
|
||||
fp := model.NewFingerprintFromMetric(sample.Metric)
|
||||
fp := *model.NewFingerprintFromMetric(sample.Metric)
|
||||
resultFingerprints.Add(fp)
|
||||
|
||||
if _, ok := rule.activeAlerts[fp]; !ok {
|
||||
|
@ -58,8 +58,8 @@ func (analyzer *QueryAnalyzer) Visit(node Node) {
|
||||
}
|
||||
n.fingerprints = fingerprints
|
||||
for _, fingerprint := range fingerprints {
|
||||
if !analyzer.IntervalRanges[fingerprint] {
|
||||
analyzer.IntervalRanges[fingerprint] = true
|
||||
if !analyzer.IntervalRanges[*fingerprint] {
|
||||
analyzer.IntervalRanges[*fingerprint] = true
|
||||
}
|
||||
}
|
||||
case *MatrixLiteral:
|
||||
@ -73,12 +73,12 @@ func (analyzer *QueryAnalyzer) Visit(node Node) {
|
||||
interval := n.interval
|
||||
// If an interval has already been recorded for this fingerprint, merge
|
||||
// it with the current interval.
|
||||
if oldInterval, ok := analyzer.FullRanges[fingerprint]; ok {
|
||||
if oldInterval, ok := analyzer.FullRanges[*fingerprint]; ok {
|
||||
if oldInterval > interval {
|
||||
interval = oldInterval
|
||||
}
|
||||
}
|
||||
analyzer.FullRanges[fingerprint] = interval
|
||||
analyzer.FullRanges[*fingerprint] = interval
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -32,7 +32,7 @@ type CurationState struct {
|
||||
Active bool
|
||||
Name string
|
||||
Limit time.Duration
|
||||
Fingerprint model.Fingerprint
|
||||
Fingerprint *model.Fingerprint
|
||||
}
|
||||
|
||||
// watermarkFilter determines whether to include or exclude candidate
|
||||
@ -206,7 +206,7 @@ func (w watermarkFilter) shouldStop() bool {
|
||||
return len(w.stop) != 0
|
||||
}
|
||||
|
||||
func getCurationRemark(states raw.Persistence, processor Processor, ignoreYoungerThan time.Duration, fingerprint model.Fingerprint) (remark *model.CurationRemark, err error) {
|
||||
func getCurationRemark(states raw.Persistence, processor Processor, ignoreYoungerThan time.Duration, fingerprint *model.Fingerprint) (remark *model.CurationRemark, err error) {
|
||||
rawSignature, err := processor.Signature()
|
||||
if err != nil {
|
||||
return
|
||||
@ -247,7 +247,7 @@ func getCurationRemark(states raw.Persistence, processor Processor, ignoreYounge
|
||||
}
|
||||
|
||||
func (w watermarkFilter) Filter(key, value interface{}) (r storage.FilterResult) {
|
||||
fingerprint := key.(model.Fingerprint)
|
||||
fingerprint := key.(*model.Fingerprint)
|
||||
|
||||
defer func() {
|
||||
labels := map[string]string{
|
||||
@ -304,7 +304,7 @@ func (w watermarkFilter) Filter(key, value interface{}) (r storage.FilterResult)
|
||||
|
||||
// curationConsistent determines whether the given metric is in a dirty state
|
||||
// and needs curation.
|
||||
func (w watermarkFilter) curationConsistent(f model.Fingerprint, watermark model.Watermark) (consistent bool, err error) {
|
||||
func (w watermarkFilter) curationConsistent(f *model.Fingerprint, watermark model.Watermark) (consistent bool, err error) {
|
||||
curationRemark, err := getCurationRemark(w.curationState, w.processor, w.ignoreYoungerThan, f)
|
||||
if err != nil {
|
||||
return
|
||||
@ -317,7 +317,7 @@ func (w watermarkFilter) curationConsistent(f model.Fingerprint, watermark model
|
||||
}
|
||||
|
||||
func (w watermarkOperator) Operate(key, _ interface{}) (oErr *storage.OperatorError) {
|
||||
fingerprint := key.(model.Fingerprint)
|
||||
fingerprint := key.(*model.Fingerprint)
|
||||
|
||||
seriesFrontier, err := newSeriesFrontier(fingerprint, w.diskFrontier, w.sampleIterator)
|
||||
if err != nil || seriesFrontier == nil {
|
||||
@ -371,7 +371,7 @@ func (w watermarkOperator) Operate(key, _ interface{}) (oErr *storage.OperatorEr
|
||||
return
|
||||
}
|
||||
|
||||
func (w watermarkOperator) refreshCurationRemark(f model.Fingerprint, finished time.Time) (err error) {
|
||||
func (w watermarkOperator) refreshCurationRemark(f *model.Fingerprint, finished time.Time) (err error) {
|
||||
signature, err := w.processor.Signature()
|
||||
if err != nil {
|
||||
return
|
||||
|
@ -29,9 +29,9 @@ import (
|
||||
// This is used to reduce the burden associated with LevelDB iterator
|
||||
// management.
|
||||
type diskFrontier struct {
|
||||
firstFingerprint model.Fingerprint
|
||||
firstFingerprint *model.Fingerprint
|
||||
firstSupertime time.Time
|
||||
lastFingerprint model.Fingerprint
|
||||
lastFingerprint *model.Fingerprint
|
||||
lastSupertime time.Time
|
||||
}
|
||||
|
||||
@ -39,7 +39,7 @@ func (f diskFrontier) String() string {
|
||||
return fmt.Sprintf("diskFrontier from %s at %s to %s at %s", f.firstFingerprint.ToRowKey(), f.firstSupertime, f.lastFingerprint.ToRowKey(), f.lastSupertime)
|
||||
}
|
||||
|
||||
func (f diskFrontier) ContainsFingerprint(fingerprint model.Fingerprint) bool {
|
||||
func (f diskFrontier) ContainsFingerprint(fingerprint *model.Fingerprint) bool {
|
||||
return !(fingerprint.Less(f.firstFingerprint) || f.lastFingerprint.Less(fingerprint))
|
||||
}
|
||||
|
||||
@ -89,7 +89,7 @@ func (f seriesFrontier) String() string {
|
||||
// newSeriesFrontier furnishes a populated diskFrontier for a given
|
||||
// fingerprint. A nil diskFrontier will be returned if the series cannot
|
||||
// be found in the store.
|
||||
func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator) (s *seriesFrontier, err error) {
|
||||
func newSeriesFrontier(f *model.Fingerprint, d diskFrontier, i leveldb.Iterator) (s *seriesFrontier, err error) {
|
||||
lowerSeek := firstSupertime
|
||||
upperSeek := lastSupertime
|
||||
|
||||
|
@ -45,11 +45,11 @@ type MetricPersistence interface {
|
||||
GetFingerprintsForLabelName(model.LabelName) (model.Fingerprints, error)
|
||||
|
||||
// Get the metric associated with the provided fingerprint.
|
||||
GetMetricForFingerprint(model.Fingerprint) (model.Metric, error)
|
||||
GetMetricForFingerprint(*model.Fingerprint) (model.Metric, error)
|
||||
|
||||
GetValueAtTime(model.Fingerprint, time.Time) model.Values
|
||||
GetBoundaryValues(model.Fingerprint, model.Interval) (first model.Values, second model.Values)
|
||||
GetRangeValues(model.Fingerprint, model.Interval) model.Values
|
||||
GetValueAtTime(*model.Fingerprint, time.Time) model.Values
|
||||
GetBoundaryValues(*model.Fingerprint, model.Interval) (first model.Values, second model.Values)
|
||||
GetRangeValues(*model.Fingerprint, model.Interval) model.Values
|
||||
|
||||
ForEachSample(IteratorsForFingerprintBuilder) (err error)
|
||||
|
||||
@ -64,16 +64,16 @@ type MetricPersistence interface {
|
||||
// View provides view of the values in the datastore subject to the request of a
|
||||
// preloading operation.
|
||||
type View interface {
|
||||
GetValueAtTime(model.Fingerprint, time.Time) model.Values
|
||||
GetBoundaryValues(model.Fingerprint, model.Interval) (first model.Values, second model.Values)
|
||||
GetRangeValues(model.Fingerprint, model.Interval) model.Values
|
||||
GetValueAtTime(*model.Fingerprint, time.Time) model.Values
|
||||
GetBoundaryValues(*model.Fingerprint, model.Interval) (first model.Values, second model.Values)
|
||||
GetRangeValues(*model.Fingerprint, model.Interval) model.Values
|
||||
|
||||
// Destroy this view.
|
||||
Close()
|
||||
}
|
||||
|
||||
type Series interface {
|
||||
Fingerprint() model.Fingerprint
|
||||
Fingerprint() *model.Fingerprint
|
||||
Metric() model.Metric
|
||||
}
|
||||
|
||||
|
@ -193,7 +193,7 @@ func groupByFingerprint(samples model.Samples) map[model.Fingerprint]model.Sampl
|
||||
)
|
||||
|
||||
for _, sample := range samples {
|
||||
fingerprint := model.NewFingerprintFromMetric(sample.Metric)
|
||||
fingerprint := *model.NewFingerprintFromMetric(sample.Metric)
|
||||
samples := fingerprintToSamples[fingerprint]
|
||||
samples = append(samples, sample)
|
||||
fingerprintToSamples[fingerprint] = samples
|
||||
@ -277,7 +277,7 @@ func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint
|
||||
}
|
||||
|
||||
for _, fingerprint := range fingerprints {
|
||||
fingerprintSet.Add(fingerprint)
|
||||
fingerprintSet.Add(*fingerprint)
|
||||
}
|
||||
}
|
||||
|
||||
@ -291,8 +291,9 @@ func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint
|
||||
|
||||
for labelName, fingerprintSet := range labelNameFingerprints {
|
||||
fingerprints := model.Fingerprints{}
|
||||
for fingerprint := range fingerprintSet {
|
||||
fingerprints = append(fingerprints, fingerprint.(model.Fingerprint))
|
||||
for e := range fingerprintSet {
|
||||
fingerprint := e.(model.Fingerprint)
|
||||
fingerprints = append(fingerprints, &fingerprint)
|
||||
}
|
||||
|
||||
sort.Sort(fingerprints)
|
||||
@ -348,7 +349,7 @@ func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint
|
||||
}
|
||||
|
||||
for _, fingerprint := range fingerprints {
|
||||
fingerprintSet.Add(fingerprint)
|
||||
fingerprintSet.Add(*fingerprint)
|
||||
}
|
||||
}
|
||||
|
||||
@ -362,8 +363,9 @@ func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint
|
||||
|
||||
for labelPair, fingerprintSet := range labelPairFingerprints {
|
||||
fingerprints := model.Fingerprints{}
|
||||
for fingerprint := range fingerprintSet {
|
||||
fingerprints = append(fingerprints, fingerprint.(model.Fingerprint))
|
||||
for e := range fingerprintSet {
|
||||
fingerprint := e.(model.Fingerprint)
|
||||
fingerprints = append(fingerprints, &fingerprint)
|
||||
}
|
||||
|
||||
sort.Sort(fingerprints)
|
||||
@ -578,7 +580,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err
|
||||
group = group[take:lengthOfGroup]
|
||||
|
||||
key := model.SampleKey{
|
||||
Fingerprint: fingerprint,
|
||||
Fingerprint: &fingerprint,
|
||||
FirstTimestamp: chunk[0].Timestamp,
|
||||
LastTimestamp: chunk[take-1].Timestamp,
|
||||
SampleCount: uint32(take),
|
||||
@ -722,7 +724,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet model.Lab
|
||||
|
||||
for _, m := range unmarshaled.Member {
|
||||
fp := model.NewFingerprintFromRowKey(*m.Signature)
|
||||
set.Add(fp)
|
||||
set.Add(*fp)
|
||||
}
|
||||
|
||||
sets = append(sets, set)
|
||||
@ -739,7 +741,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet model.Lab
|
||||
}
|
||||
for _, e := range base.Elements() {
|
||||
fingerprint := e.(model.Fingerprint)
|
||||
fps = append(fps, fingerprint)
|
||||
fps = append(fps, &fingerprint)
|
||||
}
|
||||
|
||||
return
|
||||
@ -772,7 +774,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName model.L
|
||||
return
|
||||
}
|
||||
|
||||
func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f model.Fingerprint) (m model.Metric, err error) {
|
||||
func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *model.Fingerprint) (m model.Metric, err error) {
|
||||
defer func(begin time.Time) {
|
||||
duration := time.Since(begin)
|
||||
|
||||
@ -799,15 +801,15 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f model.Fingerprint)
|
||||
return
|
||||
}
|
||||
|
||||
func (l LevelDBMetricPersistence) GetValueAtTime(f model.Fingerprint, t time.Time) (samples model.Values) {
|
||||
func (l LevelDBMetricPersistence) GetValueAtTime(f *model.Fingerprint, t time.Time) (samples model.Values) {
|
||||
panic("Not implemented")
|
||||
}
|
||||
|
||||
func (l LevelDBMetricPersistence) GetBoundaryValues(f model.Fingerprint, i model.Interval) (first model.Values, second model.Values) {
|
||||
func (l LevelDBMetricPersistence) GetBoundaryValues(f *model.Fingerprint, i model.Interval) (first model.Values, second model.Values) {
|
||||
panic("Not implemented")
|
||||
}
|
||||
|
||||
func (l *LevelDBMetricPersistence) GetRangeValues(f model.Fingerprint, i model.Interval) (samples model.Values) {
|
||||
func (l *LevelDBMetricPersistence) GetRangeValues(f *model.Fingerprint, i model.Interval) (samples model.Values) {
|
||||
panic("Not implemented")
|
||||
}
|
||||
|
||||
|
@ -114,11 +114,11 @@ func (s *memorySeriesStorage) AppendSamples(samples model.Samples) error {
|
||||
func (s *memorySeriesStorage) AppendSample(sample model.Sample) error {
|
||||
metric := sample.Metric
|
||||
fingerprint := model.NewFingerprintFromMetric(metric)
|
||||
series, ok := s.fingerprintToSeries[fingerprint]
|
||||
series, ok := s.fingerprintToSeries[*fingerprint]
|
||||
|
||||
if !ok {
|
||||
series = newStream(metric)
|
||||
s.fingerprintToSeries[fingerprint] = series
|
||||
s.fingerprintToSeries[*fingerprint] = series
|
||||
|
||||
for k, v := range metric {
|
||||
labelPair := model.LabelPair{
|
||||
@ -142,12 +142,12 @@ func (s *memorySeriesStorage) AppendSample(sample model.Sample) error {
|
||||
|
||||
// Append raw sample, bypassing indexing. Only used to add data to views, which
|
||||
// don't need to lookup by metric.
|
||||
func (s *memorySeriesStorage) appendSampleWithoutIndexing(f model.Fingerprint, timestamp time.Time, value model.SampleValue) {
|
||||
series, ok := s.fingerprintToSeries[f]
|
||||
func (s *memorySeriesStorage) appendSampleWithoutIndexing(f *model.Fingerprint, timestamp time.Time, value model.SampleValue) {
|
||||
series, ok := s.fingerprintToSeries[*f]
|
||||
|
||||
if !ok {
|
||||
series = newStream(model.Metric{})
|
||||
s.fingerprintToSeries[f] = series
|
||||
s.fingerprintToSeries[*f] = series
|
||||
}
|
||||
|
||||
series.add(timestamp, value)
|
||||
@ -164,7 +164,7 @@ func (s *memorySeriesStorage) GetFingerprintsForLabelSet(l model.LabelSet) (fing
|
||||
}]
|
||||
set := utility.Set{}
|
||||
for _, fingerprint := range values {
|
||||
set.Add(fingerprint)
|
||||
set.Add(*fingerprint)
|
||||
}
|
||||
sets = append(sets, set)
|
||||
}
|
||||
@ -180,7 +180,7 @@ func (s *memorySeriesStorage) GetFingerprintsForLabelSet(l model.LabelSet) (fing
|
||||
}
|
||||
for _, e := range base.Elements() {
|
||||
fingerprint := e.(model.Fingerprint)
|
||||
fingerprints = append(fingerprints, fingerprint)
|
||||
fingerprints = append(fingerprints, &fingerprint)
|
||||
}
|
||||
|
||||
return fingerprints, nil
|
||||
@ -194,8 +194,8 @@ func (s *memorySeriesStorage) GetFingerprintsForLabelName(l model.LabelName) (fi
|
||||
return fingerprints, nil
|
||||
}
|
||||
|
||||
func (s *memorySeriesStorage) GetMetricForFingerprint(f model.Fingerprint) (model.Metric, error) {
|
||||
series, ok := s.fingerprintToSeries[f]
|
||||
func (s *memorySeriesStorage) GetMetricForFingerprint(f *model.Fingerprint) (model.Metric, error) {
|
||||
series, ok := s.fingerprintToSeries[*f]
|
||||
if !ok {
|
||||
return nil, nil
|
||||
}
|
||||
@ -208,8 +208,8 @@ func (s *memorySeriesStorage) GetMetricForFingerprint(f model.Fingerprint) (mode
|
||||
return metric, nil
|
||||
}
|
||||
|
||||
func (s *memorySeriesStorage) GetValueAtTime(f model.Fingerprint, t time.Time) (samples model.Values) {
|
||||
series, ok := s.fingerprintToSeries[f]
|
||||
func (s *memorySeriesStorage) GetValueAtTime(f *model.Fingerprint, t time.Time) (samples model.Values) {
|
||||
series, ok := s.fingerprintToSeries[*f]
|
||||
if !ok {
|
||||
return samples
|
||||
}
|
||||
@ -249,12 +249,12 @@ func (s *memorySeriesStorage) GetValueAtTime(f model.Fingerprint, t time.Time) (
|
||||
return samples
|
||||
}
|
||||
|
||||
func (s *memorySeriesStorage) GetBoundaryValues(f model.Fingerprint, i model.Interval) (model.Values, model.Values) {
|
||||
func (s *memorySeriesStorage) GetBoundaryValues(f *model.Fingerprint, i model.Interval) (model.Values, model.Values) {
|
||||
return s.GetValueAtTime(f, i.OldestInclusive), s.GetValueAtTime(f, i.NewestInclusive)
|
||||
}
|
||||
|
||||
func (s *memorySeriesStorage) GetRangeValues(f model.Fingerprint, i model.Interval) (samples model.Values) {
|
||||
series, ok := s.fingerprintToSeries[f]
|
||||
func (s *memorySeriesStorage) GetRangeValues(f *model.Fingerprint, i model.Interval) (samples model.Values) {
|
||||
series, ok := s.fingerprintToSeries[*f]
|
||||
if !ok {
|
||||
return samples
|
||||
}
|
||||
|
@ -41,7 +41,7 @@ type Processor interface {
|
||||
//
|
||||
// Upon completion or error, the last time at which the processor finished
|
||||
// shall be emitted in addition to any errors.
|
||||
Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt time.Time, fingerprint model.Fingerprint) (lastCurated time.Time, err error)
|
||||
Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt time.Time, fingerprint *model.Fingerprint) (lastCurated time.Time, err error)
|
||||
}
|
||||
|
||||
// CompactionProcessor combines sparse values in the database together such
|
||||
@ -83,7 +83,7 @@ func (p CompactionProcessor) String() string {
|
||||
return fmt.Sprintf("compactionProcessor for minimum group size %d", p.MinimumGroupSize)
|
||||
}
|
||||
|
||||
func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt time.Time, fingerprint model.Fingerprint) (lastCurated time.Time, err error) {
|
||||
func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt time.Time, fingerprint *model.Fingerprint) (lastCurated time.Time, err error) {
|
||||
var pendingBatch raw.Batch = nil
|
||||
|
||||
defer func() {
|
||||
@ -262,7 +262,7 @@ func (p DeletionProcessor) String() string {
|
||||
return "deletionProcessor"
|
||||
}
|
||||
|
||||
func (p DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt time.Time, fingerprint model.Fingerprint) (lastCurated time.Time, err error) {
|
||||
func (p DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt time.Time, fingerprint *model.Fingerprint) (lastCurated time.Time, err error) {
|
||||
var pendingBatch raw.Batch = nil
|
||||
|
||||
defer func() {
|
||||
|
@ -21,7 +21,7 @@ import (
|
||||
|
||||
// scanJob models a range of queries.
|
||||
type scanJob struct {
|
||||
fingerprint model.Fingerprint
|
||||
fingerprint *model.Fingerprint
|
||||
operations ops
|
||||
}
|
||||
|
||||
|
@ -188,7 +188,7 @@ func AppendSampleAsPureSingleEntityAppendTests(p MetricPersistence, t test.Teste
|
||||
}
|
||||
}
|
||||
|
||||
func levelDBGetRangeValues(l *LevelDBMetricPersistence, fp model.Fingerprint, i model.Interval) (samples model.Values, err error) {
|
||||
func levelDBGetRangeValues(l *LevelDBMetricPersistence, fp *model.Fingerprint, i model.Interval) (samples model.Values, err error) {
|
||||
k := &dto.SampleKey{
|
||||
Fingerprint: fp.ToDTO(),
|
||||
Timestamp: indexable.EncodeTime(i.OldestInclusive),
|
||||
|
@ -471,7 +471,7 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
|
||||
return
|
||||
}
|
||||
|
||||
func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier *seriesFrontier, fingerprint model.Fingerprint, ts time.Time) (chunk model.Values) {
|
||||
func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier *seriesFrontier, fingerprint *model.Fingerprint, ts time.Time) (chunk model.Values) {
|
||||
var (
|
||||
targetKey = &dto.SampleKey{
|
||||
Fingerprint: fingerprint.ToDTO(),
|
||||
@ -582,17 +582,18 @@ func (t *TieredStorage) GetFingerprintsForLabelSet(labelSet model.LabelSet) (fin
|
||||
}
|
||||
fingerprintSet := map[model.Fingerprint]bool{}
|
||||
for _, fingerprint := range append(memFingerprints, diskFingerprints...) {
|
||||
fingerprintSet[fingerprint] = true
|
||||
fingerprintSet[*fingerprint] = true
|
||||
}
|
||||
for fingerprint := range fingerprintSet {
|
||||
fingerprints = append(fingerprints, fingerprint)
|
||||
fpCopy := fingerprint
|
||||
fingerprints = append(fingerprints, &fpCopy)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Get the metric associated with the provided fingerprint.
|
||||
func (t *TieredStorage) GetMetricForFingerprint(f model.Fingerprint) (m model.Metric, err error) {
|
||||
func (t *TieredStorage) GetMetricForFingerprint(f *model.Fingerprint) (m model.Metric, err error) {
|
||||
m, err = t.memoryArena.GetMetricForFingerprint(f)
|
||||
if err != nil {
|
||||
return
|
||||
|
@ -53,7 +53,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
|
||||
var (
|
||||
instant = time.Date(1984, 3, 30, 0, 0, 0, 0, time.Local)
|
||||
metric = model.Metric{model.MetricNameLabel: "request_count"}
|
||||
fingerprint = model.NewFingerprintFromMetric(metric)
|
||||
fingerprint = *model.NewFingerprintFromMetric(metric)
|
||||
scenarios = []struct {
|
||||
data []model.Sample
|
||||
in in
|
||||
@ -370,7 +370,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
|
||||
}
|
||||
|
||||
for j, atTime := range scenario.in.atTime {
|
||||
actual := v.GetValueAtTime(fingerprint, atTime.time)
|
||||
actual := v.GetValueAtTime(&fingerprint, atTime.time)
|
||||
|
||||
if len(actual) != len(scenario.out.atTime[j]) {
|
||||
t.Fatalf("%d.%d. expected %d output, got %d", i, j, len(scenario.out.atTime[j]), len(actual))
|
||||
|
@ -87,8 +87,9 @@ func (v viewRequestBuilder) ScanJobs() (j scanJobs) {
|
||||
for fingerprint, operations := range v.operations {
|
||||
sort.Sort(startsAtSort{operations})
|
||||
|
||||
fpCopy := fingerprint
|
||||
j = append(j, scanJob{
|
||||
fingerprint: fingerprint,
|
||||
fingerprint: &fpCopy,
|
||||
operations: optimize(operations),
|
||||
})
|
||||
|
||||
@ -104,7 +105,7 @@ type view struct {
|
||||
*memorySeriesStorage
|
||||
}
|
||||
|
||||
func (v view) appendSample(fingerprint model.Fingerprint, timestamp time.Time, value model.SampleValue) {
|
||||
func (v view) appendSample(fingerprint *model.Fingerprint, timestamp time.Time, value model.SampleValue) {
|
||||
v.memorySeriesStorage.appendSampleWithoutIndexing(fingerprint, timestamp, value)
|
||||
}
|
||||
|
||||
|
@ -144,17 +144,17 @@ func testBuilder(t test.Tester) {
|
||||
}
|
||||
|
||||
for _, atTime := range scenario.in.atTimes {
|
||||
fingerprint := model.NewFingerprintFromRowKey(atTime.fingerprint)
|
||||
fingerprint := *model.NewFingerprintFromRowKey(atTime.fingerprint)
|
||||
builder.GetMetricAtTime(fingerprint, atTime.time)
|
||||
}
|
||||
|
||||
for _, atInterval := range scenario.in.atIntervals {
|
||||
fingerprint := model.NewFingerprintFromRowKey(atInterval.fingerprint)
|
||||
fingerprint := *model.NewFingerprintFromRowKey(atInterval.fingerprint)
|
||||
builder.GetMetricAtInterval(fingerprint, atInterval.from, atInterval.through, atInterval.interval)
|
||||
}
|
||||
|
||||
for _, atRange := range scenario.in.atRanges {
|
||||
fingerprint := model.NewFingerprintFromRowKey(atRange.fingerprint)
|
||||
fingerprint := *model.NewFingerprintFromRowKey(atRange.fingerprint)
|
||||
builder.GetMetricRange(fingerprint, atRange.from, atRange.through)
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user