Merge the parallel logic of getSeriesForRange and metricForFingerprint
This commit is contained in:
parent
9445c7053d
commit
e8c1f30ab2
|
@ -500,9 +500,11 @@ func (s *memorySeriesStorage) MetricsForLabelMatchers(
|
||||||
|
|
||||||
result := map[model.Fingerprint]metric.Metric{}
|
result := map[model.Fingerprint]metric.Metric{}
|
||||||
for fp := range resFPs {
|
for fp := range resFPs {
|
||||||
if metric, ok := s.metricForFingerprint(fp, from, through); ok {
|
s.fpLocker.Lock(fp)
|
||||||
result[fp] = metric
|
if met, _, ok := s.metricForRange(fp, from, through); ok {
|
||||||
|
result[fp] = metric.Metric{Metric: met}
|
||||||
}
|
}
|
||||||
|
s.fpLocker.Unlock(fp)
|
||||||
}
|
}
|
||||||
for _, matcher := range filters {
|
for _, matcher := range filters {
|
||||||
for fp, met := range result {
|
for fp, met := range result {
|
||||||
|
@ -514,50 +516,46 @@ func (s *memorySeriesStorage) MetricsForLabelMatchers(
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
// metricForFingerprint returns the metric for the given fingerprint if the
|
// metricForRange returns the metric for the given fingerprint if the
|
||||||
// corresponding time series has samples between 'from' and 'through'.
|
// corresponding time series has samples between 'from' and 'through', together
|
||||||
func (s *memorySeriesStorage) metricForFingerprint(
|
// with a pointer to the series if it is in memory already. For a series that
|
||||||
|
// does not have samples between 'from' and 'through', the returned bool is
|
||||||
|
// false. For an archived series that does contain samples between 'from' and
|
||||||
|
// 'through', it returns (metric, nil, true).
|
||||||
|
//
|
||||||
|
// The caller must have locked the fp.
|
||||||
|
func (s *memorySeriesStorage) metricForRange(
|
||||||
fp model.Fingerprint,
|
fp model.Fingerprint,
|
||||||
from, through model.Time,
|
from, through model.Time,
|
||||||
) (metric.Metric, bool) {
|
) (model.Metric, *memorySeries, bool) {
|
||||||
s.fpLocker.Lock(fp)
|
|
||||||
defer s.fpLocker.Unlock(fp)
|
|
||||||
|
|
||||||
series, ok := s.fpToSeries.get(fp)
|
series, ok := s.fpToSeries.get(fp)
|
||||||
if ok {
|
if ok {
|
||||||
if series.lastTime.Before(from) || series.firstTime().After(through) {
|
if series.lastTime.Before(from) || series.firstTime().After(through) {
|
||||||
return metric.Metric{}, false
|
return nil, nil, false
|
||||||
}
|
}
|
||||||
// Wrap the returned metric in a copy-on-write (COW) metric here because
|
return series.metric, series, true
|
||||||
// the caller might mutate it.
|
|
||||||
return metric.Metric{
|
|
||||||
Metric: series.metric,
|
|
||||||
}, true
|
|
||||||
}
|
}
|
||||||
// From here on, we are only concerned with archived metrics.
|
// From here on, we are only concerned with archived metrics.
|
||||||
// If the high watermark of archived series is before 'from', we are done.
|
// If the high watermark of archived series is before 'from', we are done.
|
||||||
watermark := model.Time(atomic.LoadInt64((*int64)(&s.archiveHighWatermark)))
|
watermark := model.Time(atomic.LoadInt64((*int64)(&s.archiveHighWatermark)))
|
||||||
if watermark < from {
|
if watermark < from {
|
||||||
return metric.Metric{}, false
|
return nil, nil, false
|
||||||
}
|
}
|
||||||
if from.After(model.Earliest) || through.Before(model.Latest) {
|
if from.After(model.Earliest) || through.Before(model.Latest) {
|
||||||
// The range lookup is relatively cheap, so let's do it first if
|
// The range lookup is relatively cheap, so let's do it first if
|
||||||
// we have a chance the archived metric is not in the range.
|
// we have a chance the archived metric is not in the range.
|
||||||
has, first, last := s.persistence.hasArchivedMetric(fp)
|
has, first, last := s.persistence.hasArchivedMetric(fp)
|
||||||
if !has || first.After(through) || last.Before(from) {
|
if !has || first.After(through) || last.Before(from) {
|
||||||
return metric.Metric{}, false
|
return nil, nil, false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
met, _ := s.persistence.archivedMetric(fp) // Ignoring error, there is nothing we can do.
|
metric, err := s.persistence.archivedMetric(fp)
|
||||||
if met == nil {
|
if err != nil {
|
||||||
return metric.Metric{}, false
|
// archivedMetric has already flagged the storage as dirty in this case.
|
||||||
|
return nil, nil, false
|
||||||
}
|
}
|
||||||
|
return metric, nil, true
|
||||||
return metric.Metric{
|
|
||||||
Metric: met,
|
|
||||||
Copied: false,
|
|
||||||
}, true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// LabelValuesForLabelName implements Storage.
|
// LabelValuesForLabelName implements Storage.
|
||||||
|
@ -723,43 +721,20 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me
|
||||||
return series, nil
|
return series, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// getSeriesForRange is a helper method for preloadChunksForRange and preloadChunksForInstant.
|
// seriesForRange is a helper method for preloadChunksForRange and preloadChunksForInstant.
|
||||||
//
|
//
|
||||||
// The caller must have locked the fp.
|
// The caller must have locked the fp.
|
||||||
func (s *memorySeriesStorage) getSeriesForRange(
|
func (s *memorySeriesStorage) seriesForRange(
|
||||||
fp model.Fingerprint,
|
fp model.Fingerprint,
|
||||||
from model.Time, through model.Time,
|
from model.Time, through model.Time,
|
||||||
) *memorySeries {
|
) *memorySeries {
|
||||||
series, ok := s.fpToSeries.get(fp)
|
metric, series, ok := s.metricForRange(fp, from, through)
|
||||||
if ok {
|
if !ok {
|
||||||
if series.lastTime.Before(from) || series.firstTime().After(through) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return series
|
|
||||||
}
|
|
||||||
|
|
||||||
watermark := model.Time(atomic.LoadInt64((*int64)(&s.archiveHighWatermark)))
|
|
||||||
if watermark < from {
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
if series == nil {
|
||||||
has, first, last := s.persistence.hasArchivedMetric(fp)
|
series, _ = s.getOrCreateSeries(fp, metric)
|
||||||
if !has {
|
// getOrCreateSeries took care of quarantining already, so ignore the error.
|
||||||
s.invalidPreloadRequestsCount.Inc()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if last.Before(from) || first.After(through) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
metric, err := s.persistence.archivedMetric(fp)
|
|
||||||
if err != nil {
|
|
||||||
// Error already logged, storage declared dirty by archivedMetric.
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
series, err = s.getOrCreateSeries(fp, metric)
|
|
||||||
if err != nil {
|
|
||||||
// getOrCreateSeries took care of quarantining already.
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
return series
|
return series
|
||||||
}
|
}
|
||||||
|
@ -771,7 +746,7 @@ func (s *memorySeriesStorage) preloadChunksForRange(
|
||||||
s.fpLocker.Lock(fp)
|
s.fpLocker.Lock(fp)
|
||||||
defer s.fpLocker.Unlock(fp)
|
defer s.fpLocker.Unlock(fp)
|
||||||
|
|
||||||
series := s.getSeriesForRange(fp, from, through)
|
series := s.seriesForRange(fp, from, through)
|
||||||
if series == nil {
|
if series == nil {
|
||||||
return nil, nopIter
|
return nil, nopIter
|
||||||
}
|
}
|
||||||
|
@ -790,7 +765,7 @@ func (s *memorySeriesStorage) preloadChunksForInstant(
|
||||||
s.fpLocker.Lock(fp)
|
s.fpLocker.Lock(fp)
|
||||||
defer s.fpLocker.Unlock(fp)
|
defer s.fpLocker.Unlock(fp)
|
||||||
|
|
||||||
series := s.getSeriesForRange(fp, from, through)
|
series := s.seriesForRange(fp, from, through)
|
||||||
if series == nil {
|
if series == nil {
|
||||||
return nil, nopIter
|
return nil, nopIter
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ package local
|
||||||
import (
|
import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/prometheus/common/model"
|
||||||
"github.com/prometheus/prometheus/util/testutil"
|
"github.com/prometheus/prometheus/util/testutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -51,6 +52,7 @@ func NewTestStorage(t testutil.T, encoding chunkEncoding) (*memorySeriesStorage,
|
||||||
SyncStrategy: Adaptive,
|
SyncStrategy: Adaptive,
|
||||||
}
|
}
|
||||||
storage := NewMemorySeriesStorage(o)
|
storage := NewMemorySeriesStorage(o)
|
||||||
|
storage.(*memorySeriesStorage).archiveHighWatermark = model.Latest
|
||||||
if err := storage.Start(); err != nil {
|
if err := storage.Start(); err != nil {
|
||||||
directory.Close()
|
directory.Close()
|
||||||
t.Fatalf("Error creating storage: %s", err)
|
t.Fatalf("Error creating storage: %s", err)
|
||||||
|
|
Loading…
Reference in New Issue