Cache disk fingerprint->metric lookups in memory.
This commit is contained in:
parent
55cb835867
commit
71199e2c93
|
@ -175,14 +175,28 @@ func (s *memorySeriesStorage) AppendSample(sample model.Sample) error {
|
|||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
metric := sample.Metric
|
||||
fingerprint := model.NewFingerprintFromMetric(metric)
|
||||
series, ok := s.fingerprintToSeries[*fingerprint]
|
||||
fingerprint := model.NewFingerprintFromMetric(sample.Metric)
|
||||
series := s.getOrCreateSeries(sample.Metric, fingerprint)
|
||||
series.add(sample.Timestamp, sample.Value)
|
||||
|
||||
if s.wmCache != nil {
|
||||
s.wmCache.Set(fingerprint, &Watermarks{High: sample.Timestamp})
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *memorySeriesStorage) CreateEmptySeries(metric model.Metric) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
fingerprint := model.NewFingerprintFromMetric(metric)
|
||||
s.getOrCreateSeries(metric, fingerprint)
|
||||
}
|
||||
|
||||
func (s *memorySeriesStorage) getOrCreateSeries(metric model.Metric, fingerprint *model.Fingerprint) *stream {
|
||||
series, ok := s.fingerprintToSeries[*fingerprint]
|
||||
|
||||
if !ok {
|
||||
series = newStream(metric)
|
||||
s.fingerprintToSeries[*fingerprint] = series
|
||||
|
@ -201,10 +215,7 @@ func (s *memorySeriesStorage) AppendSample(sample model.Sample) error {
|
|||
s.labelNameToFingerprints[k] = labelNameValues
|
||||
}
|
||||
}
|
||||
|
||||
series.add(sample.Timestamp, sample.Value)
|
||||
|
||||
return nil
|
||||
return series
|
||||
}
|
||||
|
||||
// Append raw samples, bypassing indexing. Only used to add data to views,
|
||||
|
|
|
@ -666,6 +666,7 @@ func (t *TieredStorage) GetMetricForFingerprint(f *model.Fingerprint) (model.Met
|
|||
}
|
||||
if m == nil {
|
||||
m, err = t.DiskStorage.GetMetricForFingerprint(f)
|
||||
t.memoryArena.CreateEmptySeries(m)
|
||||
}
|
||||
return m, err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue