prometheus/storage/metric/memory.go

355 lines
8.3 KiB
Go
Raw Normal View History

2013-02-08 17:03:26 +00:00
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
import (
"github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/utility"
2013-05-21 16:12:02 +00:00
"sort"
"sync"
2013-02-08 17:03:26 +00:00
"time"
)
2013-05-21 16:12:02 +00:00
const (
// Assuming sample rate of 1 / 15Hz, this allows for one hour's worth of
// storage per metric without any major reallocations.
2013-05-22 11:42:44 +00:00
initialSeriesArenaSize = 4 * 60
2013-05-21 16:12:02 +00:00
)
2013-02-08 17:03:26 +00:00
// Models a given sample entry stored in the in-memory arena.
type value interface {
// Gets the given value.
get() model.SampleValue
}
// Models a single sample value. It presumes that there is either no subsequent
// value seen or that any subsequent values are of a different value.
type singletonValue model.SampleValue
func (v singletonValue) get() model.SampleValue {
return model.SampleValue(v)
}
type stream struct {
2013-05-21 16:12:02 +00:00
sync.RWMutex
2013-02-08 17:03:26 +00:00
metric model.Metric
2013-05-21 16:12:02 +00:00
values model.Values
2013-02-08 17:03:26 +00:00
}
func (s *stream) add(timestamp time.Time, value model.SampleValue) {
2013-05-21 16:12:02 +00:00
s.Lock()
defer s.Unlock()
// BUG(all): https://github.com/prometheus/prometheus/pull/265/files#r4336435.
2013-05-21 16:12:02 +00:00
s.values = append(s.values, model.SamplePair{
Timestamp: timestamp,
Value: value,
})
2013-02-08 17:03:26 +00:00
}
2013-05-21 16:12:02 +00:00
func (s *stream) clone() model.Values {
s.RLock()
defer s.RUnlock()
// BUG(all): Examine COW technique.
2013-05-21 16:12:02 +00:00
clone := make(model.Values, len(s.values))
copy(clone, s.values)
2013-03-07 01:16:39 +00:00
2013-05-21 16:12:02 +00:00
return clone
}
2013-02-08 17:03:26 +00:00
2013-05-21 16:12:02 +00:00
func (s *stream) getValueAtTime(t time.Time) model.Values {
s.RLock()
defer s.RUnlock()
// BUG(all): May be avenues for simplification.
2013-05-21 16:12:02 +00:00
l := len(s.values)
switch l {
case 0:
return model.Values{}
case 1:
return model.Values{s.values[0]}
default:
index := sort.Search(l, func(i int) bool {
return !s.values[i].Timestamp.Before(t)
})
if index == 0 {
return model.Values{s.values[0]}
2013-02-08 17:03:26 +00:00
}
2013-05-21 16:12:02 +00:00
if index == l {
return model.Values{s.values[l-1]}
2013-02-08 17:03:26 +00:00
}
2013-05-21 16:12:02 +00:00
if s.values[index].Timestamp.Equal(t) {
return model.Values{s.values[index]}
2013-03-07 01:16:39 +00:00
}
2013-05-21 16:12:02 +00:00
return model.Values{s.values[index-1], s.values[index]}
2013-02-08 17:03:26 +00:00
}
2013-05-21 16:12:02 +00:00
}
func (s *stream) getBoundaryValues(in model.Interval) model.Values {
s.RLock()
defer s.RUnlock()
oldest := sort.Search(len(s.values), func(i int) bool {
return !s.values[i].Timestamp.Before(in.OldestInclusive)
})
newest := sort.Search(len(s.values), func(i int) bool {
return s.values[i].Timestamp.After(in.NewestInclusive)
})
resultRange := s.values[oldest:newest]
switch len(resultRange) {
case 0:
return model.Values{}
case 1:
return model.Values{resultRange[0]}
default:
return model.Values{resultRange[0], resultRange[len(resultRange)-1]}
}
2013-05-21 16:12:02 +00:00
}
func (s *stream) getRangeValues(in model.Interval) model.Values {
s.RLock()
defer s.RUnlock()
oldest := sort.Search(len(s.values), func(i int) bool {
return !s.values[i].Timestamp.Before(in.OldestInclusive)
})
newest := sort.Search(len(s.values), func(i int) bool {
return s.values[i].Timestamp.After(in.NewestInclusive)
})
result := make(model.Values, newest-oldest)
copy(result, s.values[oldest:newest])
return result
2013-02-08 17:03:26 +00:00
}
func newStream(metric model.Metric) *stream {
return &stream{
2013-02-08 17:03:26 +00:00
metric: metric,
2013-05-22 11:42:44 +00:00
values: make(model.Values, 0, initialSeriesArenaSize),
2013-02-08 17:03:26 +00:00
}
}
type memorySeriesStorage struct {
2013-05-21 16:12:02 +00:00
sync.RWMutex
fingerprintToSeries map[model.Fingerprint]*stream
labelPairToFingerprints map[model.LabelPair]model.Fingerprints
2013-02-08 17:03:26 +00:00
labelNameToFingerprints map[model.LabelName]model.Fingerprints
}
2013-05-16 14:02:07 +00:00
func (s *memorySeriesStorage) AppendSamples(samples model.Samples) error {
2013-02-08 17:03:26 +00:00
for _, sample := range samples {
s.AppendSample(sample)
}
2013-05-16 14:02:07 +00:00
return nil
2013-02-08 17:03:26 +00:00
}
2013-05-16 14:02:07 +00:00
func (s *memorySeriesStorage) AppendSample(sample model.Sample) error {
metric := sample.Metric
fingerprint := model.NewFingerprintFromMetric(metric)
2013-05-21 16:12:02 +00:00
s.RLock()
2013-05-17 10:58:15 +00:00
series, ok := s.fingerprintToSeries[*fingerprint]
2013-05-21 16:12:02 +00:00
s.RUnlock()
2013-02-08 17:03:26 +00:00
if !ok {
series = newStream(metric)
2013-05-21 16:12:02 +00:00
s.Lock()
2013-05-17 10:58:15 +00:00
s.fingerprintToSeries[*fingerprint] = series
2013-02-08 17:03:26 +00:00
for k, v := range metric {
labelPair := model.LabelPair{
Name: k,
Value: v,
}
2013-02-08 17:03:26 +00:00
labelPairValues := s.labelPairToFingerprints[labelPair]
labelPairValues = append(labelPairValues, fingerprint)
s.labelPairToFingerprints[labelPair] = labelPairValues
labelNameValues := s.labelNameToFingerprints[k]
labelNameValues = append(labelNameValues, fingerprint)
s.labelNameToFingerprints[k] = labelNameValues
}
s.Unlock()
2013-02-08 17:03:26 +00:00
}
2013-03-07 01:16:39 +00:00
series.add(sample.Timestamp, sample.Value)
2013-02-08 17:03:26 +00:00
2013-05-16 14:02:07 +00:00
return nil
2013-02-08 17:03:26 +00:00
}
// Append raw sample, bypassing indexing. Only used to add data to views, which
// don't need to lookup by metric.
func (s *memorySeriesStorage) appendSampleWithoutIndexing(f *model.Fingerprint, timestamp time.Time, value model.SampleValue) {
2013-05-21 16:12:02 +00:00
s.RLock()
series, ok := s.fingerprintToSeries[*f]
2013-05-21 16:12:02 +00:00
s.RUnlock()
if !ok {
series = newStream(model.Metric{})
2013-05-21 16:12:02 +00:00
s.Lock()
s.fingerprintToSeries[*f] = series
2013-05-21 16:12:02 +00:00
s.Unlock()
}
series.add(timestamp, value)
}
2013-05-16 14:02:07 +00:00
func (s *memorySeriesStorage) GetFingerprintsForLabelSet(l model.LabelSet) (fingerprints model.Fingerprints, err error) {
2013-02-08 17:03:26 +00:00
sets := []utility.Set{}
2013-05-21 16:12:02 +00:00
s.RLock()
2013-02-08 17:03:26 +00:00
for k, v := range l {
values := s.labelPairToFingerprints[model.LabelPair{
Name: k,
Value: v,
}]
2013-02-08 17:03:26 +00:00
set := utility.Set{}
for _, fingerprint := range values {
2013-05-17 10:58:15 +00:00
set.Add(*fingerprint)
2013-02-08 17:03:26 +00:00
}
sets = append(sets, set)
}
2013-05-21 16:12:02 +00:00
s.RUnlock()
2013-02-08 17:03:26 +00:00
setCount := len(sets)
if setCount == 0 {
return fingerprints, nil
2013-02-08 17:03:26 +00:00
}
base := sets[0]
for i := 1; i < setCount; i++ {
base = base.Intersection(sets[i])
}
for _, e := range base.Elements() {
fingerprint := e.(model.Fingerprint)
2013-05-17 10:58:15 +00:00
fingerprints = append(fingerprints, &fingerprint)
2013-02-08 17:03:26 +00:00
}
return fingerprints, nil
2013-02-08 17:03:26 +00:00
}
2013-05-21 16:12:02 +00:00
func (s *memorySeriesStorage) GetFingerprintsForLabelName(l model.LabelName) (model.Fingerprints, error) {
s.RLock()
2013-05-22 12:23:35 +00:00
defer s.RUnlock()
2013-05-21 16:12:02 +00:00
values, ok := s.labelNameToFingerprints[l]
if !ok {
return nil, nil
}
2013-02-08 17:03:26 +00:00
2013-05-21 16:12:02 +00:00
fingerprints := make(model.Fingerprints, len(values))
copy(fingerprints, values)
2013-02-08 17:03:26 +00:00
return fingerprints, nil
2013-02-08 17:03:26 +00:00
}
2013-05-17 10:58:15 +00:00
func (s *memorySeriesStorage) GetMetricForFingerprint(f *model.Fingerprint) (model.Metric, error) {
2013-05-21 16:12:02 +00:00
s.RLock()
2013-05-17 10:58:15 +00:00
series, ok := s.fingerprintToSeries[*f]
2013-05-21 16:12:02 +00:00
s.RUnlock()
2013-02-08 17:03:26 +00:00
if !ok {
return nil, nil
2013-02-08 17:03:26 +00:00
}
metric := model.Metric{}
for label, value := range series.metric {
metric[label] = value
}
2013-02-08 17:03:26 +00:00
return metric, nil
2013-02-08 17:03:26 +00:00
}
2013-05-21 16:12:02 +00:00
func (s *memorySeriesStorage) CloneSamples(f *model.Fingerprint) model.Values {
s.RLock()
2013-05-17 10:58:15 +00:00
series, ok := s.fingerprintToSeries[*f]
2013-05-21 16:12:02 +00:00
s.RUnlock()
2013-02-08 17:03:26 +00:00
if !ok {
2013-05-21 16:12:02 +00:00
return nil
2013-02-08 17:03:26 +00:00
}
2013-05-21 16:12:02 +00:00
return series.clone()
}
2013-05-21 16:12:02 +00:00
func (s *memorySeriesStorage) GetValueAtTime(f *model.Fingerprint, t time.Time) model.Values {
s.RLock()
series, ok := s.fingerprintToSeries[*f]
s.RUnlock()
if !ok {
return nil
2013-02-08 17:03:26 +00:00
}
2013-05-21 16:12:02 +00:00
return series.getValueAtTime(t)
2013-02-08 17:03:26 +00:00
}
func (s *memorySeriesStorage) GetBoundaryValues(f *model.Fingerprint, i model.Interval) model.Values {
2013-05-21 16:12:02 +00:00
s.RLock()
2013-05-17 10:58:15 +00:00
series, ok := s.fingerprintToSeries[*f]
2013-05-21 16:12:02 +00:00
s.RUnlock()
2013-02-08 17:03:26 +00:00
if !ok {
return nil
2013-02-08 17:03:26 +00:00
}
2013-05-21 16:12:02 +00:00
return series.getBoundaryValues(i)
}
2013-02-08 17:03:26 +00:00
2013-05-21 16:12:02 +00:00
func (s *memorySeriesStorage) GetRangeValues(f *model.Fingerprint, i model.Interval) model.Values {
s.RLock()
series, ok := s.fingerprintToSeries[*f]
s.RUnlock()
2013-02-08 17:03:26 +00:00
2013-05-21 16:12:02 +00:00
if !ok {
return nil
2013-02-08 17:03:26 +00:00
}
2013-05-21 16:12:02 +00:00
return series.getRangeValues(i)
2013-02-08 17:03:26 +00:00
}
2013-05-20 18:31:58 +00:00
func (s *memorySeriesStorage) Close() {
s.fingerprintToSeries = map[model.Fingerprint]*stream{}
s.labelPairToFingerprints = map[model.LabelPair]model.Fingerprints{}
s.labelNameToFingerprints = map[model.LabelName]model.Fingerprints{}
2013-02-08 17:03:26 +00:00
}
2013-05-16 14:02:07 +00:00
func (s *memorySeriesStorage) GetAllValuesForLabel(labelName model.LabelName) (values model.LabelValues, err error) {
valueSet := map[model.LabelValue]bool{}
for _, series := range s.fingerprintToSeries {
if value, ok := series.metric[labelName]; ok {
2013-03-26 13:46:02 +00:00
if !valueSet[value] {
values = append(values, value)
valueSet[value] = true
}
}
}
return
2013-02-08 17:03:26 +00:00
}
2013-05-20 18:31:58 +00:00
func NewMemorySeriesStorage() *memorySeriesStorage {
return &memorySeriesStorage{
fingerprintToSeries: make(map[model.Fingerprint]*stream),
labelPairToFingerprints: make(map[model.LabelPair]model.Fingerprints),
2013-02-08 17:03:26 +00:00
labelNameToFingerprints: make(map[model.LabelName]model.Fingerprints),
}
}