Add an indexing queue and batch the ops.

Some other improvements on the way, in particular codec -> codable
renaming and addition of LookupSet methods.

Change-Id: I978f8f3f84ca8e4d39a9d9f152ae0ad274bbf4e2
This commit is contained in:
Bjoern Rabenstein 2014-09-23 19:21:10 +02:00
parent 71206dbc06
commit c7aad110fb
8 changed files with 534 additions and 441 deletions

View File

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// Package codec provides types that implement encoding.BinaryMarshaler and
// Package codable provides types that implement encoding.BinaryMarshaler and
// encoding.BinaryUnmarshaler and functions that help to encode and decode
// primitives. The Prometheus storage backend uses them to persist objects to
// files and to save objects in LevelDB.
@ -28,11 +28,10 @@
//
// Maps are encoded as the number of mappings as a varint, followed by the
// mappings, each of which consists of the key followed by the value.
package codec
package codable
import (
"bytes"
"encoding"
"encoding/binary"
"fmt"
"io"
@ -43,14 +42,6 @@ import (
"github.com/prometheus/prometheus/storage/metric"
)
// codable implements both, encoding.BinaryMarshaler and
// encoding.BinaryUnmarshaler, which is only needed internally and therefore not
// exported for now.
type codable interface {
encoding.BinaryMarshaler
encoding.BinaryUnmarshaler
}
// A byteReader is an io.ByteReader that also implements the vanilla io.Reader
// interface.
type byteReader interface {
@ -145,12 +136,12 @@ func decodeString(b byteReader) (string, error) {
return string(buf), nil
}
// A CodableMetric is a clientmodel.Metric that implements
// A Metric is a clientmodel.Metric that implements
// encoding.BinaryMarshaler and encoding.BinaryUnmarshaler.
type CodableMetric clientmodel.Metric
type Metric clientmodel.Metric
// MarshalBinary implements encoding.BinaryMarshaler.
func (m CodableMetric) MarshalBinary() ([]byte, error) {
func (m Metric) MarshalBinary() ([]byte, error) {
buf := &bytes.Buffer{}
if err := EncodeVarint(buf, int64(len(m))); err != nil {
return nil, err
@ -167,20 +158,20 @@ func (m CodableMetric) MarshalBinary() ([]byte, error) {
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler. It can be used with the
// zero value of CodableMetric.
func (m *CodableMetric) UnmarshalBinary(buf []byte) error {
// zero value of Metric.
func (m *Metric) UnmarshalBinary(buf []byte) error {
return m.UnmarshalFromReader(bytes.NewReader(buf))
}
// UnmarshalFromReader unmarshals a CodableMetric from a reader that implements
// UnmarshalFromReader unmarshals a Metric from a reader that implements
// both, io.Reader and io.ByteReader. It can be used with the zero value of
// CodableMetric.
func (m *CodableMetric) UnmarshalFromReader(r byteReader) error {
// Metric.
func (m *Metric) UnmarshalFromReader(r byteReader) error {
numLabelPairs, err := binary.ReadVarint(r)
if err != nil {
return err
}
*m = make(CodableMetric, numLabelPairs)
*m = make(Metric, numLabelPairs)
for ; numLabelPairs > 0; numLabelPairs-- {
ln, err := decodeString(r)
@ -196,31 +187,64 @@ func (m *CodableMetric) UnmarshalFromReader(r byteReader) error {
return nil
}
// A CodableFingerprint is a clientmodel.Fingerprint that implements
// A Fingerprint is a clientmodel.Fingerprint that implements
// encoding.BinaryMarshaler and encoding.BinaryUnmarshaler. The implementation
// depends on clientmodel.Fingerprint to be convertible to uint64. It encodes
// the fingerprint as a big-endian uint64.
type CodableFingerprint clientmodel.Fingerprint
type Fingerprint clientmodel.Fingerprint
// MarshalBinary implements encoding.BinaryMarshaler.
func (fp CodableFingerprint) MarshalBinary() ([]byte, error) {
func (fp Fingerprint) MarshalBinary() ([]byte, error) {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(fp))
return b, nil
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler.
func (fp *CodableFingerprint) UnmarshalBinary(buf []byte) error {
*fp = CodableFingerprint(binary.BigEndian.Uint64(buf))
func (fp *Fingerprint) UnmarshalBinary(buf []byte) error {
*fp = Fingerprint(binary.BigEndian.Uint64(buf))
return nil
}
// CodableFingerprints is a clientmodel.Fingerprints that implements
// encoding.BinaryMarshaler and encoding.BinaryUnmarshaler.
type CodableFingerprints clientmodel.Fingerprints
// FingerprintSet is a map[clientmodel.Fingerprint]struct{} that
// implements encoding.BinaryMarshaler and encoding.BinaryUnmarshaler. Its
// binary form is identical to that of Fingerprints.
type FingerprintSet map[clientmodel.Fingerprint]struct{}
// MarshalBinary implements encoding.BinaryMarshaler.
func (fps CodableFingerprints) MarshalBinary() ([]byte, error) {
func (fps FingerprintSet) MarshalBinary() ([]byte, error) {
b := make([]byte, binary.MaxVarintLen64+len(fps)*8)
lenBytes := binary.PutVarint(b, int64(len(fps)))
offset := lenBytes
for fp := range fps {
binary.BigEndian.PutUint64(b[offset:], uint64(fp))
offset += 8
}
return b[:len(fps)*8+lenBytes], nil
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler.
func (fps *FingerprintSet) UnmarshalBinary(buf []byte) error {
numFPs, offset := binary.Varint(buf)
if offset <= 0 {
return fmt.Errorf("could not decode length of Fingerprints, varint decoding returned %d", offset)
}
*fps = make(FingerprintSet, numFPs)
for i := 0; i < int(numFPs); i++ {
(*fps)[clientmodel.Fingerprint(binary.BigEndian.Uint64(buf[offset+i*8:]))] = struct{}{}
}
return nil
}
// Fingerprints is a clientmodel.Fingerprints that implements
// encoding.BinaryMarshaler and encoding.BinaryUnmarshaler. Its binary form is
// identical to that of FingerprintSet.
type Fingerprints clientmodel.Fingerprints
// MarshalBinary implements encoding.BinaryMarshaler.
func (fps Fingerprints) MarshalBinary() ([]byte, error) {
b := make([]byte, binary.MaxVarintLen64+len(fps)*8)
lenBytes := binary.PutVarint(b, int64(len(fps)))
@ -231,12 +255,12 @@ func (fps CodableFingerprints) MarshalBinary() ([]byte, error) {
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler.
func (fps *CodableFingerprints) UnmarshalBinary(buf []byte) error {
func (fps *Fingerprints) UnmarshalBinary(buf []byte) error {
numFPs, offset := binary.Varint(buf)
if offset <= 0 {
return fmt.Errorf("could not decode length of CodableFingerprints, varint decoding returned %d", offset)
return fmt.Errorf("could not decode length of Fingerprints, varint decoding returned %d", offset)
}
*fps = make(CodableFingerprints, numFPs)
*fps = make(Fingerprints, numFPs)
for i := range *fps {
(*fps)[i] = clientmodel.Fingerprint(binary.BigEndian.Uint64(buf[offset+i*8:]))
@ -244,12 +268,12 @@ func (fps *CodableFingerprints) UnmarshalBinary(buf []byte) error {
return nil
}
// CodableLabelPair is a metric.LabelPair that implements
// LabelPair is a metric.LabelPair that implements
// encoding.BinaryMarshaler and encoding.BinaryUnmarshaler.
type CodableLabelPair metric.LabelPair
type LabelPair metric.LabelPair
// MarshalBinary implements encoding.BinaryMarshaler.
func (lp CodableLabelPair) MarshalBinary() ([]byte, error) {
func (lp LabelPair) MarshalBinary() ([]byte, error) {
buf := &bytes.Buffer{}
if err := encodeString(buf, string(lp.Name)); err != nil {
return nil, err
@ -261,7 +285,7 @@ func (lp CodableLabelPair) MarshalBinary() ([]byte, error) {
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler.
func (lp *CodableLabelPair) UnmarshalBinary(buf []byte) error {
func (lp *LabelPair) UnmarshalBinary(buf []byte) error {
r := bytes.NewReader(buf)
n, err := decodeString(r)
if err != nil {
@ -276,12 +300,12 @@ func (lp *CodableLabelPair) UnmarshalBinary(buf []byte) error {
return nil
}
// CodableLabelName is a clientmodel.LabelName that implements
// LabelName is a clientmodel.LabelName that implements
// encoding.BinaryMarshaler and encoding.BinaryUnmarshaler.
type CodableLabelName clientmodel.LabelName
type LabelName clientmodel.LabelName
// MarshalBinary implements encoding.BinaryMarshaler.
func (l CodableLabelName) MarshalBinary() ([]byte, error) {
func (l LabelName) MarshalBinary() ([]byte, error) {
buf := &bytes.Buffer{}
if err := encodeString(buf, string(l)); err != nil {
return nil, err
@ -290,22 +314,61 @@ func (l CodableLabelName) MarshalBinary() ([]byte, error) {
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler.
func (l *CodableLabelName) UnmarshalBinary(buf []byte) error {
func (l *LabelName) UnmarshalBinary(buf []byte) error {
r := bytes.NewReader(buf)
n, err := decodeString(r)
if err != nil {
return err
}
*l = CodableLabelName(n)
*l = LabelName(n)
return nil
}
// CodableLabelValues is a clientmodel.LabelValues that implements
// encoding.BinaryMarshaler and encoding.BinaryUnmarshaler.
type CodableLabelValues clientmodel.LabelValues
// LabelValueSet is a map[clientmodel.LabelValue]struct{} that implements
// encoding.BinaryMarshaler and encoding.BinaryUnmarshaler. Its binary form is
// identical to that of LabelValues.
type LabelValueSet map[clientmodel.LabelValue]struct{}
// MarshalBinary implements encoding.BinaryMarshaler.
func (vs CodableLabelValues) MarshalBinary() ([]byte, error) {
func (vs LabelValueSet) MarshalBinary() ([]byte, error) {
buf := &bytes.Buffer{}
if err := EncodeVarint(buf, int64(len(vs))); err != nil {
return nil, err
}
for v := range vs {
if err := encodeString(buf, string(v)); err != nil {
return nil, err
}
}
return buf.Bytes(), nil
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler.
func (vs *LabelValueSet) UnmarshalBinary(buf []byte) error {
r := bytes.NewReader(buf)
numValues, err := binary.ReadVarint(r)
if err != nil {
return err
}
*vs = make(LabelValueSet, numValues)
for i := int64(0); i < numValues; i++ {
v, err := decodeString(r)
if err != nil {
return err
}
(*vs)[clientmodel.LabelValue(v)] = struct{}{}
}
return nil
}
// LabelValues is a clientmodel.LabelValues that implements
// encoding.BinaryMarshaler and encoding.BinaryUnmarshaler. Its binary form is
// identical to that of LabelValueSet.
type LabelValues clientmodel.LabelValues
// MarshalBinary implements encoding.BinaryMarshaler.
func (vs LabelValues) MarshalBinary() ([]byte, error) {
buf := &bytes.Buffer{}
if err := EncodeVarint(buf, int64(len(vs))); err != nil {
return nil, err
@ -319,13 +382,13 @@ func (vs CodableLabelValues) MarshalBinary() ([]byte, error) {
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler.
func (vs *CodableLabelValues) UnmarshalBinary(buf []byte) error {
func (vs *LabelValues) UnmarshalBinary(buf []byte) error {
r := bytes.NewReader(buf)
numValues, err := binary.ReadVarint(r)
if err != nil {
return err
}
*vs = make(CodableLabelValues, numValues)
*vs = make(LabelValues, numValues)
for i := range *vs {
v, err := decodeString(r)
@ -337,14 +400,14 @@ func (vs *CodableLabelValues) UnmarshalBinary(buf []byte) error {
return nil
}
// CodableTimeRange is used to define a time range and implements
// TimeRange is used to define a time range and implements
// encoding.BinaryMarshaler and encoding.BinaryUnmarshaler.
type CodableTimeRange struct {
type TimeRange struct {
First, Last clientmodel.Timestamp
}
// MarshalBinary implements encoding.BinaryMarshaler.
func (tr CodableTimeRange) MarshalBinary() ([]byte, error) {
func (tr TimeRange) MarshalBinary() ([]byte, error) {
buf := &bytes.Buffer{}
if err := EncodeVarint(buf, int64(tr.First)); err != nil {
return nil, err
@ -356,7 +419,7 @@ func (tr CodableTimeRange) MarshalBinary() ([]byte, error) {
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler.
func (tr *CodableTimeRange) UnmarshalBinary(buf []byte) error {
func (tr *TimeRange) UnmarshalBinary(buf []byte) error {
r := bytes.NewReader(buf)
first, err := binary.ReadVarint(r)
if err != nil {

View File

@ -0,0 +1,149 @@
// Copyright 2014 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package codable
import (
"encoding"
"reflect"
"testing"
)
func newFingerprint(fp int64) *Fingerprint {
cfp := Fingerprint(fp)
return &cfp
}
func newLabelName(ln string) *LabelName {
cln := LabelName(ln)
return &cln
}
func TestCodec(t *testing.T) {
scenarios := []struct {
in encoding.BinaryMarshaler
out encoding.BinaryUnmarshaler
equal func(in, out interface{}) bool
}{
{
in: &Metric{
"label_1": "value_2",
"label_2": "value_2",
"label_3": "value_3",
},
out: &Metric{},
}, {
in: newFingerprint(12345),
out: newFingerprint(0),
}, {
in: &Fingerprints{1, 2, 56, 1234},
out: &Fingerprints{},
}, {
in: &Fingerprints{1, 2, 56, 1234},
out: &FingerprintSet{},
equal: func(in, out interface{}) bool {
inSet := FingerprintSet{}
for _, fp := range *(in.(*Fingerprints)) {
inSet[fp] = struct{}{}
}
return reflect.DeepEqual(inSet, *(out.(*FingerprintSet)))
},
}, {
in: &FingerprintSet{
1: struct{}{},
2: struct{}{},
56: struct{}{},
1234: struct{}{},
},
out: &FingerprintSet{},
}, {
in: &FingerprintSet{
1: struct{}{},
2: struct{}{},
56: struct{}{},
1234: struct{}{},
},
out: &Fingerprints{},
equal: func(in, out interface{}) bool {
outSet := FingerprintSet{}
for _, fp := range *(out.(*Fingerprints)) {
outSet[fp] = struct{}{}
}
return reflect.DeepEqual(outSet, *(in.(*FingerprintSet)))
},
}, {
in: &LabelPair{
Name: "label_name",
Value: "label_value",
},
out: &LabelPair{},
}, {
in: newLabelName("label_name"),
out: newLabelName(""),
}, {
in: &LabelValues{"value_1", "value_2", "value_3"},
out: &LabelValues{},
}, {
in: &LabelValues{"value_1", "value_2", "value_3"},
out: &LabelValueSet{},
equal: func(in, out interface{}) bool {
inSet := LabelValueSet{}
for _, lv := range *(in.(*LabelValues)) {
inSet[lv] = struct{}{}
}
return reflect.DeepEqual(inSet, *(out.(*LabelValueSet)))
},
}, {
in: &LabelValueSet{
"value_1": struct{}{},
"value_2": struct{}{},
"value_3": struct{}{},
},
out: &LabelValueSet{},
}, {
in: &LabelValueSet{
"value_1": struct{}{},
"value_2": struct{}{},
"value_3": struct{}{},
},
out: &LabelValues{},
equal: func(in, out interface{}) bool {
outSet := LabelValueSet{}
for _, lv := range *(out.(*LabelValues)) {
outSet[lv] = struct{}{}
}
return reflect.DeepEqual(outSet, *(in.(*LabelValueSet)))
},
}, {
in: &TimeRange{42, 2001},
out: &TimeRange{},
},
}
for i, s := range scenarios {
encoded, err := s.in.MarshalBinary()
if err != nil {
t.Fatal(err)
}
if err := s.out.UnmarshalBinary(encoded); err != nil {
t.Fatal(err)
}
equal := s.equal
if equal == nil {
equal = reflect.DeepEqual
}
if !equal(s.in, s.out) {
t.Errorf("%d. Got: %v; want %v; encoded bytes are: %v", i, s.out, s.in, encoded)
}
}
}

View File

@ -1,130 +0,0 @@
// Copyright 2014 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package codec
import (
"testing"
clientmodel "github.com/prometheus/client_golang/model"
)
func newCodableFingerprint(fp int64) *CodableFingerprint {
cfp := CodableFingerprint(fp)
return &cfp
}
func newCodableLabelName(ln string) *CodableLabelName {
cln := CodableLabelName(ln)
return &cln
}
func TestCodec(t *testing.T) {
scenarios := []struct {
in codable
out codable
equal func(in, out codable) bool
}{
{
in: &CodableMetric{
"label_1": "value_2",
"label_2": "value_2",
"label_3": "value_3",
},
out: &CodableMetric{},
equal: func(in, out codable) bool {
m1 := (*clientmodel.Metric)(in.(*CodableMetric))
m2 := (*clientmodel.Metric)(out.(*CodableMetric))
return m1.Equal(*m2)
},
}, {
in: newCodableFingerprint(12345),
out: newCodableFingerprint(0),
equal: func(in, out codable) bool {
return *in.(*CodableFingerprint) == *out.(*CodableFingerprint)
},
}, {
in: &CodableFingerprints{1, 2, 56, 1234},
out: &CodableFingerprints{},
equal: func(in, out codable) bool {
fps1 := *in.(*CodableFingerprints)
fps2 := *out.(*CodableFingerprints)
if len(fps1) != len(fps2) {
return false
}
for i := range fps1 {
if fps1[i] != fps2[i] {
return false
}
}
return true
},
}, {
in: &CodableLabelPair{
Name: "label_name",
Value: "label_value",
},
out: &CodableLabelPair{},
equal: func(in, out codable) bool {
lp1 := *in.(*CodableLabelPair)
lp2 := *out.(*CodableLabelPair)
return lp1 == lp2
},
}, {
in: newCodableLabelName("label_name"),
out: newCodableLabelName(""),
equal: func(in, out codable) bool {
ln1 := *in.(*CodableLabelName)
ln2 := *out.(*CodableLabelName)
return ln1 == ln2
},
}, {
in: &CodableLabelValues{"value_1", "value_2", "value_3"},
out: &CodableLabelValues{},
equal: func(in, out codable) bool {
lvs1 := *in.(*CodableLabelValues)
lvs2 := *out.(*CodableLabelValues)
if len(lvs1) != len(lvs2) {
return false
}
for i := range lvs1 {
if lvs1[i] != lvs2[i] {
return false
}
}
return true
},
}, {
in: &CodableTimeRange{42, 2001},
out: &CodableTimeRange{},
equal: func(in, out codable) bool {
ln1 := *in.(*CodableTimeRange)
ln2 := *out.(*CodableTimeRange)
return ln1 == ln2
},
},
}
for i, s := range scenarios {
encoded, err := s.in.MarshalBinary()
if err != nil {
t.Fatal(err)
}
if err := s.out.UnmarshalBinary(encoded); err != nil {
t.Fatal(err)
}
if !s.equal(s.in, s.out) {
t.Errorf("%d. Got: %v; want %v; encoded bytes are: %v", i, s.out, s.in, encoded)
}
}
}

View File

@ -22,9 +22,8 @@ import (
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage/local/codec"
"github.com/prometheus/prometheus/storage/local/codable"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/utility"
)
const (
@ -58,7 +57,7 @@ func (i *FingerprintMetricIndex) IndexBatch(mapping FingerprintMetricMapping) er
b := i.NewBatch()
for fp, m := range mapping {
b.Put(codec.CodableFingerprint(fp), codec.CodableMetric(m))
b.Put(codable.Fingerprint(fp), codable.Metric(m))
}
return i.Commit(b)
@ -73,7 +72,7 @@ func (i *FingerprintMetricIndex) UnindexBatch(mapping FingerprintMetricMapping)
b := i.NewBatch()
for fp := range mapping {
b.Delete(codec.CodableFingerprint(fp))
b.Delete(codable.Fingerprint(fp))
}
return i.Commit(b)
@ -84,7 +83,7 @@ func (i *FingerprintMetricIndex) UnindexBatch(mapping FingerprintMetricMapping)
//
// This method is goroutine-safe.
func (i *FingerprintMetricIndex) Lookup(fp clientmodel.Fingerprint) (metric clientmodel.Metric, ok bool, err error) {
ok, err = i.Get(codec.CodableFingerprint(fp), (*codec.CodableMetric)(&metric))
ok, err = i.Get(codable.Fingerprint(fp), (*codable.Metric)(&metric))
return
}
@ -105,7 +104,7 @@ func NewFingerprintMetricIndex(basePath string) (*FingerprintMetricIndex, error)
// LabelNameLabelValuesMapping is an in-memory map of label names to
// label values.
type LabelNameLabelValuesMapping map[clientmodel.LabelName]clientmodel.LabelValues
type LabelNameLabelValuesMapping map[clientmodel.LabelName]codable.LabelValueSet
// LabelNameLabelValuesIndex is a KeyValueStore that maps existing label names
// to all label values stored for that label name.
@ -118,19 +117,17 @@ type LabelNameLabelValuesIndex struct {
// a deletion of that mapping from the index.
//
// While this method is fundamentally goroutine-safe, note that the order of
// execution for multiple batches executed concurrently is undefined. Also, it
// is in general not safe to mutate the index while Extend or Reduce are
// running.
// execution for multiple batches executed concurrently is undefined.
func (i *LabelNameLabelValuesIndex) IndexBatch(b LabelNameLabelValuesMapping) error {
batch := i.NewBatch()
for name, values := range b {
if len(values) == 0 {
if err := batch.Delete(codec.CodableLabelName(name)); err != nil {
if err := batch.Delete(codable.LabelName(name)); err != nil {
return err
}
} else {
if err := batch.Put(codec.CodableLabelName(name), codec.CodableLabelValues(values)); err != nil {
if err := batch.Put(codable.LabelName(name), values); err != nil {
return err
}
}
@ -145,80 +142,21 @@ func (i *LabelNameLabelValuesIndex) IndexBatch(b LabelNameLabelValuesMapping) er
//
// This method is goroutine-safe.
func (i *LabelNameLabelValuesIndex) Lookup(l clientmodel.LabelName) (values clientmodel.LabelValues, ok bool, err error) {
ok, err = i.Get(codec.CodableLabelName(l), (*codec.CodableLabelValues)(&values))
ok, err = i.Get(codable.LabelName(l), (*codable.LabelValues)(&values))
return
}
// Extend incorporates the given metric into the index, i.e. it creates new
// label name to label values mappings for new label names, and it extends the
// label values list mapped from already existing label names appropriately.
// LookupSet looks up all label values for a given label name. Looking up a
// non-existing label name is not an error. In that case, (nil, false, nil) is
// returned.
//
// This method is not goroutine-safe.
func (i *LabelNameLabelValuesIndex) Extend(m clientmodel.Metric) error {
b := make(LabelNameLabelValuesMapping, len(m))
for ln, lv := range m {
baseLVs, _, err := i.Lookup(ln)
if err != nil {
return err
}
lvSet := utility.Set{}
for _, baseLV := range baseLVs {
lvSet.Add(baseLV)
}
lvSet.Add(lv)
if len(lvSet) == len(baseLVs) {
continue
}
lvs := make(clientmodel.LabelValues, 0, len(lvSet))
for v := range lvSet {
lvs = append(lvs, v.(clientmodel.LabelValue))
}
b[ln] = lvs
// This method is goroutine-safe.
func (i *LabelNameLabelValuesIndex) LookupSet(l clientmodel.LabelName) (values map[clientmodel.LabelValue]struct{}, ok bool, err error) {
ok, err = i.Get(codable.LabelName(l), (*codable.LabelValueSet)(&values))
if values == nil {
values = map[clientmodel.LabelValue]struct{}{}
}
return i.IndexBatch(b)
}
// Reduce removes label values from the index based on the given label pair to
// fingerprints mapping. The mapping to be passed in here is returned by
// LabelPairFingerprintIndex.Reduce. It contains all the label pairs that have
// now fewer fingerprints mapped to it. This method checks if any label pair has
// arrived at zero mapped fingerprints. In that case, the value of that label
// pair is removed from the list of label values mapped to the name of that
// label pair. Label names that are then mapped to zero label values are removed
// entirely from the index.
//
// This method is not goroutine-safe.
func (i *LabelNameLabelValuesIndex) Reduce(m LabelPairFingerprintsMapping) error {
b := make(LabelNameLabelValuesMapping, len(m))
for lp, fps := range m {
if len(fps) != 0 {
continue
}
ln := lp.Name
lv := lp.Value
baseValues, ok := b[ln]
if !ok {
var err error
baseValues, _, err = i.Lookup(ln)
if err != nil {
return err
}
}
lvSet := utility.Set{}
for _, baseValue := range baseValues {
lvSet.Add(baseValue)
}
lvSet.Remove(lv)
if len(lvSet) == len(baseValues) {
continue
}
lvs := make(clientmodel.LabelValues, 0, len(lvSet))
for v := range lvSet {
lvs = append(lvs, v.(clientmodel.LabelValue))
}
b[ln] = lvs
}
return i.IndexBatch(b)
return
}
// NewLabelNameLabelValuesIndex returns a LevelDB-backed
@ -238,7 +176,7 @@ func NewLabelNameLabelValuesIndex(basePath string) (*LabelNameLabelValuesIndex,
// LabelPairFingerprintsMapping is an in-memory map of label pairs to
// fingerprints.
type LabelPairFingerprintsMapping map[metric.LabelPair]clientmodel.Fingerprints
type LabelPairFingerprintsMapping map[metric.LabelPair]codable.FingerprintSet
// LabelPairFingerprintIndex is a KeyValueStore that maps existing label pairs
// to the fingerprints of all metrics containing those label pairs.
@ -251,17 +189,15 @@ type LabelPairFingerprintIndex struct {
// from the index.
//
// While this method is fundamentally goroutine-safe, note that the order of
// execution for multiple batches executed concurrently is undefined. Also, it
// is in general not safe to mutate the index while Extend or Reduce are
// running.
// execution for multiple batches executed concurrently is undefined.
func (i *LabelPairFingerprintIndex) IndexBatch(m LabelPairFingerprintsMapping) error {
batch := i.NewBatch()
for pair, fps := range m {
if len(fps) == 0 {
batch.Delete(codec.CodableLabelPair(pair))
batch.Delete(codable.LabelPair(pair))
} else {
batch.Put(codec.CodableLabelPair(pair), codec.CodableFingerprints(fps))
batch.Put(codable.LabelPair(pair), fps)
}
}
@ -274,70 +210,21 @@ func (i *LabelPairFingerprintIndex) IndexBatch(m LabelPairFingerprintsMapping) e
//
// This method is goroutine-safe.
func (i *LabelPairFingerprintIndex) Lookup(p metric.LabelPair) (fps clientmodel.Fingerprints, ok bool, err error) {
ok, err = i.Get((codec.CodableLabelPair)(p), (*codec.CodableFingerprints)(&fps))
ok, err = i.Get((codable.LabelPair)(p), (*codable.Fingerprints)(&fps))
return
}
// Extend incorporates the given metric into the index, i.e. it creates new
// label pair to fingerprint mappings for new label pairs, and it extends the
// fingerprint list mapped from already existing label pairs appropriately.
// LookupSet looks up all fingerprints for a given label pair. Looking up a
// non-existing label pair is not an error. In that case, (nil, false, nil) is
// returned.
//
// This method is not goroutine-safe.
func (i *LabelPairFingerprintIndex) Extend(m clientmodel.Metric, fp clientmodel.Fingerprint) error {
b := make(LabelPairFingerprintsMapping, len(m))
for ln, lv := range m {
lp := metric.LabelPair{Name: ln, Value: lv}
baseFPs, _, err := i.Lookup(lp)
if err != nil {
return err
}
fpSet := utility.Set{}
for _, baseFP := range baseFPs {
fpSet.Add(baseFP)
}
fpSet.Add(fp)
if len(fpSet) == len(baseFPs) {
continue
}
fps := make(clientmodel.Fingerprints, 0, len(fpSet))
for f := range fpSet {
fps = append(fps, f.(clientmodel.Fingerprint))
}
b[lp] = fps
// This method is goroutine-safe.
func (i *LabelPairFingerprintIndex) LookupSet(p metric.LabelPair) (fps map[clientmodel.Fingerprint]struct{}, ok bool, err error) {
ok, err = i.Get((codable.LabelPair)(p), (*codable.FingerprintSet)(&fps))
if fps == nil {
fps = map[clientmodel.Fingerprint]struct{}{}
}
return i.IndexBatch(b)
}
// Reduce removes the given fingerprint from the fingerprint lists mapped from
// the label pairs contained in the given metric. All the updated mappings are
// returned (for consumption by LabelNameLabelValuesIndex.Reduce).
//
// This method is not goroutine-safe.
func (i *LabelPairFingerprintIndex) Reduce(m clientmodel.Metric, fp clientmodel.Fingerprint) (LabelPairFingerprintsMapping, error) {
b := make(LabelPairFingerprintsMapping, len(m))
for ln, lv := range m {
lp := metric.LabelPair{Name: ln, Value: lv}
baseFPs, _, err := i.Lookup(lp)
if err != nil {
return nil, err
}
fpSet := utility.Set{}
for _, baseFP := range baseFPs {
fpSet.Add(baseFP)
}
fpSet.Remove(fp)
if len(fpSet) == len(baseFPs) {
continue
}
fps := make(clientmodel.Fingerprints, 0, len(fpSet))
for f := range fpSet {
fps = append(fps, f.(clientmodel.Fingerprint))
}
b[lp] = fps
}
return b, i.IndexBatch(b)
return
}
// NewLabelPairFingerprintIndex returns a LevelDB-backed
@ -367,8 +254,8 @@ type FingerprintTimeRangeIndex struct {
//
// This method is goroutine-safe.
func (i *FingerprintTimeRangeIndex) Lookup(fp clientmodel.Fingerprint) (firstTime, lastTime clientmodel.Timestamp, ok bool, err error) {
var tr codec.CodableTimeRange
ok, err = i.Get(codec.CodableFingerprint(fp), &tr)
var tr codable.TimeRange
ok, err = i.Get(codable.Fingerprint(fp), &tr)
return tr.First, tr.Last, ok, err
}
@ -376,7 +263,7 @@ func (i *FingerprintTimeRangeIndex) Lookup(fp clientmodel.Fingerprint) (firstTim
//
// This method is goroutine-safe.
func (i *FingerprintTimeRangeIndex) Has(fp clientmodel.Fingerprint) (ok bool, err error) {
return i.KeyValueStore.Has(codec.CodableFingerprint(fp))
return i.KeyValueStore.Has(codable.Fingerprint(fp))
}
// NewFingerprintTimeRangeIndex returns a LevelDB-backed

View File

@ -90,20 +90,20 @@ type Persistence interface {
// GetLabelValuesForLabelName returns the label values for the given
// label name.
GetLabelValuesForLabelName(clientmodel.LabelName) (clientmodel.LabelValues, error)
// GetFingerprintsModifiedBefore returns the fingerprints whose timeseries
// have live samples before the provided timestamp.
GetFingerprintsModifiedBefore(clientmodel.Timestamp) ([]clientmodel.Fingerprint, error)
// IndexMetric indexes the given metric for the needs of
// GetFingerprintsForLabelPair and GetLabelValuesForLabelName.
IndexMetric(clientmodel.Metric, clientmodel.Fingerprint) error
// UnindexMetric removes references to the given metric from the indexes
// used for GetFingerprintsForLabelPair and
// IndexMetric queues the given metric for addition to the indexes
// needed by GetFingerprintsForLabelPair and GetLabelValuesForLabelName.
// If the queue is full, this method blocks until the metric can be queued.
// This method is goroutine-safe.
IndexMetric(clientmodel.Metric, clientmodel.Fingerprint)
// UnindexMetric queues references to the given metric for removal from
// the indexes used for GetFingerprintsForLabelPair and
// GetLabelValuesForLabelName. The index of fingerprints to archived
// metrics is not affected by this method. (In fact, never call this
// metrics is not affected by this removal. (In fact, never call this
// method for an archived metric. To drop an archived metric, call
// DropArchivedFingerprint.)
UnindexMetric(clientmodel.Metric, clientmodel.Fingerprint) error
// DropArchivedFingerprint.) If the queue is full, this method blocks
// until the metric can be queued. This method is goroutine-safe.
UnindexMetric(clientmodel.Metric, clientmodel.Fingerprint)
// ArchiveMetric persists the mapping of the given fingerprint to the
// given metric, together with the first and last timestamp of the
@ -118,12 +118,15 @@ type Persistence interface {
HasArchivedMetric(clientmodel.Fingerprint) (
hasMetric bool, firstTime, lastTime clientmodel.Timestamp, err error,
)
// GetFingerprintsModifiedBefore returns the fingerprints of archived
// timeseries that have live samples before the provided timestamp.
GetFingerprintsModifiedBefore(clientmodel.Timestamp) ([]clientmodel.Fingerprint, error)
// GetArchivedMetric retrieves the archived metric with the given
// fingerprint.
GetArchivedMetric(clientmodel.Fingerprint) (clientmodel.Metric, error)
// DropArchivedMetric deletes an archived fingerprint and its
// corresponding metric entirely. It also un-indexes the metric (no need
// to call UnindexMetric for the deleted metric.)
// corresponding metric entirely. It also queues the metric for
// un-indexing (no need to call UnindexMetric for the deleted metric.)
DropArchivedMetric(clientmodel.Fingerprint) error
// UnarchiveMetric deletes an archived fingerprint and its metric, but
// (in contrast to DropArchivedMetric) does not un-index the metric.

View File

@ -20,12 +20,13 @@ import (
"io"
"os"
"path"
"time"
"github.com/golang/glog"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage/local/codec"
"github.com/prometheus/prometheus/storage/local/codable"
"github.com/prometheus/prometheus/storage/local/index"
"github.com/prometheus/prometheus/storage/metric"
)
@ -44,6 +45,10 @@ const (
chunkHeaderTypeOffset = 0
chunkHeaderFirstTimeOffset = 1
chunkHeaderLastTimeOffset = 9
indexingMaxBatchSize = 1024
indexingBatchTimeout = 500 * time.Millisecond // Commit batch when idle for that long.
indexingQueueCapacity = 10 * indexingMaxBatchSize // TODO: Export as metric.
)
const (
@ -52,6 +57,19 @@ const (
flagHeadChunkPersisted
)
type indexingOpType byte
const (
add indexingOpType = iota
remove
)
type indexingOp struct {
fingerprint clientmodel.Fingerprint
metric clientmodel.Metric
opType indexingOpType
}
type diskPersistence struct {
basePath string
chunkLen int
@ -60,6 +78,9 @@ type diskPersistence struct {
archivedFingerprintToTimeRange *index.FingerprintTimeRangeIndex
labelPairToFingerprints *index.LabelPairFingerprintIndex
labelNameToLabelValues *index.LabelNameLabelValuesIndex
indexingQueue chan indexingOp
indexingStopped chan struct{}
}
// NewDiskPersistence returns a newly allocated Persistence backed by local disk storage, ready to use.
@ -67,29 +88,38 @@ func NewDiskPersistence(basePath string, chunkLen int) (Persistence, error) {
if err := os.MkdirAll(basePath, 0700); err != nil {
return nil, err
}
dp := &diskPersistence{
basePath: basePath,
chunkLen: chunkLen,
}
var err error
dp.archivedFingerprintToMetrics, err = index.NewFingerprintMetricIndex(basePath)
archivedFingerprintToMetrics, err := index.NewFingerprintMetricIndex(basePath)
if err != nil {
return nil, err
}
dp.archivedFingerprintToTimeRange, err = index.NewFingerprintTimeRangeIndex(basePath)
archivedFingerprintToTimeRange, err := index.NewFingerprintTimeRangeIndex(basePath)
if err != nil {
return nil, err
}
dp.labelPairToFingerprints, err = index.NewLabelPairFingerprintIndex(basePath)
labelPairToFingerprints, err := index.NewLabelPairFingerprintIndex(basePath)
if err != nil {
return nil, err
}
dp.labelNameToLabelValues, err = index.NewLabelNameLabelValuesIndex(basePath)
labelNameToLabelValues, err := index.NewLabelNameLabelValuesIndex(basePath)
if err != nil {
return nil, err
}
return dp, nil
p := &diskPersistence{
basePath: basePath,
chunkLen: chunkLen,
archivedFingerprintToMetrics: archivedFingerprintToMetrics,
archivedFingerprintToTimeRange: archivedFingerprintToTimeRange,
labelPairToFingerprints: labelPairToFingerprints,
labelNameToLabelValues: labelNameToLabelValues,
indexingQueue: make(chan indexingOp, indexingQueueCapacity),
indexingStopped: make(chan struct{}),
}
go p.processIndexingQueue()
return p, nil
}
func (p *diskPersistence) GetFingerprintsForLabelPair(lp metric.LabelPair) (clientmodel.Fingerprints, error) {
@ -109,8 +139,8 @@ func (p *diskPersistence) GetLabelValuesForLabelName(ln clientmodel.LabelName) (
}
func (p *diskPersistence) GetFingerprintsModifiedBefore(beforeTime clientmodel.Timestamp) ([]clientmodel.Fingerprint, error) {
var fp codec.CodableFingerprint
var tr codec.CodableTimeRange
var fp codable.Fingerprint
var tr codable.TimeRange
fps := []clientmodel.Fingerprint{}
p.archivedFingerprintToTimeRange.ForEach(func(kv index.KeyValueAccessor) error {
if err := kv.Value(&tr); err != nil {
@ -257,10 +287,10 @@ func (p *diskPersistence) PersistSeriesMapAndHeads(fingerprintToSeries SeriesMap
if _, err := w.WriteString(headsMagicString); err != nil {
return err
}
if err := codec.EncodeVarint(w, headsFormatVersion); err != nil {
if err := codable.EncodeVarint(w, headsFormatVersion); err != nil {
return err
}
if err := codec.EncodeVarint(w, int64(len(fingerprintToSeries))); err != nil {
if err := codable.EncodeVarint(w, int64(len(fingerprintToSeries))); err != nil {
return err
}
@ -275,23 +305,23 @@ func (p *diskPersistence) PersistSeriesMapAndHeads(fingerprintToSeries SeriesMap
if err := w.WriteByte(seriesFlags); err != nil {
return err
}
if err := codec.EncodeUint64(w, uint64(fp)); err != nil {
if err := codable.EncodeUint64(w, uint64(fp)); err != nil {
return err
}
buf, err := codec.CodableMetric(series.metric).MarshalBinary()
buf, err := codable.Metric(series.metric).MarshalBinary()
if err != nil {
return err
}
w.Write(buf)
if err := codec.EncodeVarint(w, int64(len(series.chunkDescs))); err != nil {
if err := codable.EncodeVarint(w, int64(len(series.chunkDescs))); err != nil {
return err
}
for i, chunkDesc := range series.chunkDescs {
if series.headChunkPersisted || i < len(series.chunkDescs)-1 {
if err := codec.EncodeVarint(w, int64(chunkDesc.firstTime())); err != nil {
if err := codable.EncodeVarint(w, int64(chunkDesc.firstTime())); err != nil {
return err
}
if err := codec.EncodeVarint(w, int64(chunkDesc.lastTime())); err != nil {
if err := codable.EncodeVarint(w, int64(chunkDesc.lastTime())); err != nil {
return err
}
} else {
@ -345,11 +375,11 @@ func (p *diskPersistence) LoadSeriesMapAndHeads() (SeriesMap, error) {
return nil, err
}
headChunkPersisted := seriesFlags&flagHeadChunkPersisted != 0
fp, err := codec.DecodeUint64(r)
fp, err := codable.DecodeUint64(r)
if err != nil {
return nil, err
}
var metric codec.CodableMetric
var metric codable.Metric
if err := metric.UnmarshalFromReader(r); err != nil {
return nil, err
}
@ -458,35 +488,24 @@ func (p *diskPersistence) DropChunks(fp clientmodel.Fingerprint, beforeTime clie
return false, nil
}
func (p *diskPersistence) IndexMetric(m clientmodel.Metric, fp clientmodel.Fingerprint) error {
// TODO: Don't do it directly, but add it to a queue (which needs to be
// drained before shutdown). Queuing would make this asynchronously, and
// then batches could be created easily.
if err := p.labelNameToLabelValues.Extend(m); err != nil {
return err
}
return p.labelPairToFingerprints.Extend(m, fp)
// IndexMetric implements Persistence.
func (p *diskPersistence) IndexMetric(m clientmodel.Metric, fp clientmodel.Fingerprint) {
p.indexingQueue <- indexingOp{fp, m, add}
}
func (p *diskPersistence) UnindexMetric(m clientmodel.Metric, fp clientmodel.Fingerprint) error {
// TODO: Don't do it directly, but add it to a queue (which needs to be
// drained before shutdown). Queuing would make this asynchronously, and
// then batches could be created easily.
labelPairs, err := p.labelPairToFingerprints.Reduce(m, fp)
if err != nil {
return err
}
return p.labelNameToLabelValues.Reduce(labelPairs)
// UnindexMetric implements Persistence.
func (p *diskPersistence) UnindexMetric(m clientmodel.Metric, fp clientmodel.Fingerprint) {
p.indexingQueue <- indexingOp{fp, m, remove}
}
func (p *diskPersistence) ArchiveMetric(
// TODO: Two step process, make sure this happens atomically.
fp clientmodel.Fingerprint, m clientmodel.Metric, first, last clientmodel.Timestamp,
) error {
if err := p.archivedFingerprintToMetrics.Put(codec.CodableFingerprint(fp), codec.CodableMetric(m)); err != nil {
if err := p.archivedFingerprintToMetrics.Put(codable.Fingerprint(fp), codable.Metric(m)); err != nil {
return err
}
if err := p.archivedFingerprintToTimeRange.Put(codec.CodableFingerprint(fp), codec.CodableTimeRange{First: first, Last: last}); err != nil {
if err := p.archivedFingerprintToTimeRange.Put(codable.Fingerprint(fp), codable.TimeRange{First: first, Last: last}); err != nil {
return err
}
return nil
@ -510,13 +529,14 @@ func (p *diskPersistence) DropArchivedMetric(fp clientmodel.Fingerprint) error {
if err != nil || metric == nil {
return err
}
if err := p.archivedFingerprintToMetrics.Delete(codec.CodableFingerprint(fp)); err != nil {
if err := p.archivedFingerprintToMetrics.Delete(codable.Fingerprint(fp)); err != nil {
return err
}
if err := p.archivedFingerprintToTimeRange.Delete(codec.CodableFingerprint(fp)); err != nil {
if err := p.archivedFingerprintToTimeRange.Delete(codable.Fingerprint(fp)); err != nil {
return err
}
return p.UnindexMetric(metric, fp)
p.UnindexMetric(metric, fp)
return nil
}
func (p *diskPersistence) UnarchiveMetric(fp clientmodel.Fingerprint) (bool, error) {
@ -525,16 +545,19 @@ func (p *diskPersistence) UnarchiveMetric(fp clientmodel.Fingerprint) (bool, err
if err != nil || !has {
return false, err
}
if err := p.archivedFingerprintToMetrics.Delete(codec.CodableFingerprint(fp)); err != nil {
if err := p.archivedFingerprintToMetrics.Delete(codable.Fingerprint(fp)); err != nil {
return false, err
}
if err := p.archivedFingerprintToTimeRange.Delete(codec.CodableFingerprint(fp)); err != nil {
if err := p.archivedFingerprintToTimeRange.Delete(codable.Fingerprint(fp)); err != nil {
return false, err
}
return true, nil
}
func (p *diskPersistence) Close() error {
close(p.indexingQueue)
<-p.indexingStopped
var lastError error
if err := p.archivedFingerprintToMetrics.Close(); err != nil {
lastError = err
@ -596,6 +619,89 @@ func (p *diskPersistence) headsPath() string {
return path.Join(p.basePath, headsFileName)
}
func (p *diskPersistence) processIndexingQueue() {
batchSize := 0
nameToValues := index.LabelNameLabelValuesMapping{}
pairToFPs := index.LabelPairFingerprintsMapping{}
batchTimeout := time.NewTimer(indexingBatchTimeout)
defer batchTimeout.Stop()
commitBatch := func() {
if err := p.labelPairToFingerprints.IndexBatch(pairToFPs); err != nil {
glog.Error("Error indexing label pair to fingerprints batch: ", err)
}
if err := p.labelNameToLabelValues.IndexBatch(nameToValues); err != nil {
glog.Error("Error indexing label name to label values batch: ", err)
}
batchSize = 0
nameToValues = index.LabelNameLabelValuesMapping{}
pairToFPs = index.LabelPairFingerprintsMapping{}
}
loop:
for {
select {
case <-batchTimeout.C:
if batchSize > 0 {
commitBatch()
}
batchTimeout.Reset(indexingBatchTimeout)
case op, ok := <-p.indexingQueue:
batchTimeout.Stop()
if !ok {
if batchSize > 0 {
commitBatch()
}
break loop
}
batchSize++
for ln, lv := range op.metric {
lp := metric.LabelPair{Name: ln, Value: lv}
baseFPs, ok := pairToFPs[lp]
if !ok {
var err error
baseFPs, _, err = p.labelPairToFingerprints.LookupSet(lp)
if err != nil {
glog.Errorf("Error looking up label pair %v: %s", lp, err)
continue
}
pairToFPs[lp] = baseFPs
}
baseValues, ok := nameToValues[ln]
if !ok {
var err error
baseValues, _, err = p.labelNameToLabelValues.LookupSet(ln)
if err != nil {
glog.Errorf("Error looking up label name %v: %s", ln, err)
continue
}
nameToValues[ln] = baseValues
}
switch op.opType {
case add:
baseFPs[op.fingerprint] = struct{}{}
baseValues[lv] = struct{}{}
case remove:
delete(baseFPs, op.fingerprint)
if len(baseFPs) == 0 {
delete(baseValues, lv)
}
default:
panic("unknown op type")
}
}
if batchSize >= indexingMaxBatchSize {
commitBatch()
}
batchTimeout.Reset(indexingBatchTimeout)
}
}
close(p.indexingStopped)
}
// exists returns true when the given file or directory exists.
func exists(path string) (bool, error) {
_, err := os.Stat(path)

View File

@ -14,11 +14,13 @@
package local
import (
"sort"
"reflect"
"testing"
"time"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage/local/codable"
"github.com/prometheus/prometheus/storage/local/index"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/utility/test"
@ -130,36 +132,46 @@ func TestIndexing(t *testing.T) {
},
},
expectedLnToLvs: index.LabelNameLabelValuesMapping{
clientmodel.MetricNameLabel: clientmodel.LabelValues{"metric_0", "metric_1"},
"label_1": clientmodel.LabelValues{"value_1", "value_2"},
"label_2": clientmodel.LabelValues{"value_2"},
"label_3": clientmodel.LabelValues{"value_3"},
clientmodel.MetricNameLabel: codable.LabelValueSet{
"metric_0": struct{}{},
"metric_1": struct{}{},
},
"label_1": codable.LabelValueSet{
"value_1": struct{}{},
"value_2": struct{}{},
},
"label_2": codable.LabelValueSet{
"value_2": struct{}{},
},
"label_3": codable.LabelValueSet{
"value_3": struct{}{},
},
},
expectedLpToFps: index.LabelPairFingerprintsMapping{
metric.LabelPair{
Name: clientmodel.MetricNameLabel,
Value: "metric_0",
}: {0, 1},
}: codable.FingerprintSet{0: struct{}{}, 1: struct{}{}},
metric.LabelPair{
Name: clientmodel.MetricNameLabel,
Value: "metric_1",
}: {2},
}: codable.FingerprintSet{2: struct{}{}},
metric.LabelPair{
Name: "label_1",
Value: "value_1",
}: {0},
}: codable.FingerprintSet{0: struct{}{}},
metric.LabelPair{
Name: "label_1",
Value: "value_2",
}: {2},
}: codable.FingerprintSet{2: struct{}{}},
metric.LabelPair{
Name: "label_2",
Value: "value_2",
}: {1},
}: codable.FingerprintSet{1: struct{}{}},
metric.LabelPair{
Name: "label_3",
Value: "value_3",
}: {1},
}: codable.FingerprintSet{1: struct{}{}},
},
}, {
fpToMetric: index.FingerprintMetricMapping{
@ -178,48 +190,61 @@ func TestIndexing(t *testing.T) {
},
},
expectedLnToLvs: index.LabelNameLabelValuesMapping{
clientmodel.MetricNameLabel: clientmodel.LabelValues{"metric_0", "metric_1", "metric_2"},
"label_1": clientmodel.LabelValues{"value_1", "value_2", "value_3"},
"label_2": clientmodel.LabelValues{"value_2"},
"label_3": clientmodel.LabelValues{"value_1", "value_3"},
clientmodel.MetricNameLabel: codable.LabelValueSet{
"metric_0": struct{}{},
"metric_1": struct{}{},
"metric_2": struct{}{},
},
"label_1": codable.LabelValueSet{
"value_1": struct{}{},
"value_2": struct{}{},
"value_3": struct{}{},
},
"label_2": codable.LabelValueSet{
"value_2": struct{}{},
},
"label_3": codable.LabelValueSet{
"value_1": struct{}{},
"value_3": struct{}{},
},
},
expectedLpToFps: index.LabelPairFingerprintsMapping{
metric.LabelPair{
Name: clientmodel.MetricNameLabel,
Value: "metric_0",
}: {0, 1, 3},
}: codable.FingerprintSet{0: struct{}{}, 1: struct{}{}, 3: struct{}{}},
metric.LabelPair{
Name: clientmodel.MetricNameLabel,
Value: "metric_1",
}: {2, 5},
}: codable.FingerprintSet{2: struct{}{}, 5: struct{}{}},
metric.LabelPair{
Name: clientmodel.MetricNameLabel,
Value: "metric_2",
}: {4},
}: codable.FingerprintSet{4: struct{}{}},
metric.LabelPair{
Name: "label_1",
Value: "value_1",
}: {0},
}: codable.FingerprintSet{0: struct{}{}},
metric.LabelPair{
Name: "label_1",
Value: "value_2",
}: {2},
}: codable.FingerprintSet{2: struct{}{}},
metric.LabelPair{
Name: "label_1",
Value: "value_3",
}: {3, 5},
}: codable.FingerprintSet{3: struct{}{}, 5: struct{}{}},
metric.LabelPair{
Name: "label_2",
Value: "value_2",
}: {1, 4},
}: codable.FingerprintSet{1: struct{}{}, 4: struct{}{}},
metric.LabelPair{
Name: "label_3",
Value: "value_1",
}: {4},
}: codable.FingerprintSet{4: struct{}{}},
metric.LabelPair{
Name: "label_3",
Value: "value_3",
}: {1},
}: codable.FingerprintSet{1: struct{}{}},
},
},
}
@ -230,24 +255,24 @@ func TestIndexing(t *testing.T) {
indexedFpsToMetrics := index.FingerprintMetricMapping{}
for i, b := range batches {
for fp, m := range b.fpToMetric {
if err := p.IndexMetric(m, fp); err != nil {
t.Fatal(err)
}
p.IndexMetric(m, fp)
if err := p.ArchiveMetric(fp, m, 1, 2); err != nil {
t.Fatal(err)
}
indexedFpsToMetrics[fp] = m
}
// TODO: Find a better solution than sleeping.
time.Sleep(2 * indexingBatchTimeout)
verifyIndexedState(i, t, b, indexedFpsToMetrics, p.(*diskPersistence))
}
for i := len(batches) - 1; i >= 0; i-- {
b := batches[i]
// TODO: Find a better solution than sleeping.
time.Sleep(2 * indexingBatchTimeout)
verifyIndexedState(i, t, batches[i], indexedFpsToMetrics, p.(*diskPersistence))
for fp, m := range b.fpToMetric {
if err := p.UnindexMetric(m, fp); err != nil {
t.Fatal(err)
}
p.UnindexMetric(m, fp)
unarchived, err := p.UnarchiveMetric(fp)
if err != nil {
t.Fatal(err)
@ -294,16 +319,13 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet
t.Fatal(err)
}
sort.Sort(lvs)
sort.Sort(outLvs)
if len(lvs) != len(outLvs) {
t.Errorf("%d. different number of label values. Got: %d; want %d", i, len(outLvs), len(lvs))
outSet := codable.LabelValueSet{}
for _, lv := range outLvs {
outSet[lv] = struct{}{}
}
for j := range lvs {
if lvs[j] != outLvs[j] {
t.Errorf("%d.%d. label values don't match. Got: %s; want %s", i, j, outLvs[j], lvs[j])
}
if !reflect.DeepEqual(lvs, outSet) {
t.Errorf("%d. label values don't match. Got: %v; want %v", i, outSet, lvs)
}
}
@ -314,16 +336,13 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet
t.Fatal(err)
}
sort.Sort(fps)
sort.Sort(outFps)
if len(fps) != len(outFps) {
t.Errorf("%d. %v: different number of fingerprints. Got: %d; want %d", i, lp, len(outFps), len(fps))
outSet := codable.FingerprintSet{}
for _, fp := range outFps {
outSet[fp] = struct{}{}
}
for j := range fps {
if fps[j] != outFps[j] {
t.Errorf("%d.%d. %v: fingerprints don't match. Got: %d; want %d", i, j, lp, outFps[j], fps[j])
}
if !reflect.DeepEqual(fps, outSet) {
t.Errorf("%d. %v: fingerprints don't match. Got: %v; want %v", i, lp, outSet, fps)
}
}
}

View File

@ -149,9 +149,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(m clientmodel.Metric) *memorySer
series.headChunkPersisted = true
} else {
// This was a genuinely new series, so index the metric.
if err := s.persistence.IndexMetric(m, fp); err != nil {
glog.Errorf("Error indexing metric %v: %v", m, err)
}
s.persistence.IndexMetric(m, fp)
}
}
return series
@ -362,9 +360,7 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime
if series, ok := s.fingerprintToSeries[fp]; ok {
if series.purgeOlderThan(beforeTime) && allDropped {
delete(s.fingerprintToSeries, fp)
if err := s.persistence.UnindexMetric(series.metric, fp); err != nil {
glog.Errorf("Error unindexing metric %v: %v", series.metric, err)
}
s.persistence.UnindexMetric(series.metric, fp)
}
return
}