tsdb: extract functions to encode and decode labels (#11045)
* tsdb/record: Extract functions to encode and decode labels Signed-off-by: Bryan Boreham <bjboreham@gmail.com> * tsdb: make use of Encode/Decode Labels Simplify the code by re-using routines from tsdb/record. Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
parent
d9c650b92e
commit
00ec720c29
|
@ -498,11 +498,7 @@ func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte {
|
|||
|
||||
buf.PutByte(chunkSnapshotRecordTypeSeries)
|
||||
buf.PutBE64(uint64(s.ref))
|
||||
buf.PutUvarint(len(s.lset))
|
||||
for _, l := range s.lset {
|
||||
buf.PutUvarintStr(l.Name)
|
||||
buf.PutUvarintStr(l.Value)
|
||||
}
|
||||
record.EncodeLabels(&buf, s.lset)
|
||||
buf.PutBE64int64(s.chunkRange)
|
||||
|
||||
s.Lock()
|
||||
|
@ -525,7 +521,7 @@ func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte {
|
|||
return buf.Get()
|
||||
}
|
||||
|
||||
func decodeSeriesFromChunkSnapshot(b []byte) (csr chunkSnapshotRecord, err error) {
|
||||
func decodeSeriesFromChunkSnapshot(d *record.Decoder, b []byte) (csr chunkSnapshotRecord, err error) {
|
||||
dec := encoding.Decbuf{B: b}
|
||||
|
||||
if flag := dec.Byte(); flag != chunkSnapshotRecordTypeSeries {
|
||||
|
@ -533,13 +529,9 @@ func decodeSeriesFromChunkSnapshot(b []byte) (csr chunkSnapshotRecord, err error
|
|||
}
|
||||
|
||||
csr.ref = chunks.HeadSeriesRef(dec.Be64())
|
||||
|
||||
// The label set written to the disk is already sorted.
|
||||
csr.lset = make(labels.Labels, dec.Uvarint())
|
||||
for i := range csr.lset {
|
||||
csr.lset[i].Name = dec.UvarintStr()
|
||||
csr.lset[i].Value = dec.UvarintStr()
|
||||
}
|
||||
// TODO: figure out why DecodeLabels calls Sort(), and perhaps remove it.
|
||||
csr.lset = d.DecodeLabels(&dec)
|
||||
|
||||
csr.chunkRange = dec.Be64int64()
|
||||
if dec.Uvarint() == 0 {
|
||||
|
@ -971,7 +963,7 @@ Outer:
|
|||
switch rec[0] {
|
||||
case chunkSnapshotRecordTypeSeries:
|
||||
numSeries++
|
||||
csr, err := decodeSeriesFromChunkSnapshot(rec)
|
||||
csr, err := decodeSeriesFromChunkSnapshot(&dec, rec)
|
||||
if err != nil {
|
||||
loopErr = errors.Wrap(err, "decode series record")
|
||||
break Outer
|
||||
|
|
|
@ -183,14 +183,7 @@ func (d *Decoder) Series(rec []byte, series []RefSeries) ([]RefSeries, error) {
|
|||
}
|
||||
for len(dec.B) > 0 && dec.Err() == nil {
|
||||
ref := storage.SeriesRef(dec.Be64())
|
||||
|
||||
lset := make(labels.Labels, dec.Uvarint())
|
||||
|
||||
for i := range lset {
|
||||
lset[i].Name = dec.UvarintStr()
|
||||
lset[i].Value = dec.UvarintStr()
|
||||
}
|
||||
sort.Sort(lset)
|
||||
lset := d.DecodeLabels(&dec)
|
||||
|
||||
series = append(series, RefSeries{
|
||||
Ref: chunks.HeadSeriesRef(ref),
|
||||
|
@ -249,6 +242,18 @@ func (d *Decoder) Metadata(rec []byte, metadata []RefMetadata) ([]RefMetadata, e
|
|||
return metadata, nil
|
||||
}
|
||||
|
||||
// DecodeLabels decodes one set of labels from buf.
|
||||
func (d *Decoder) DecodeLabels(dec *encoding.Decbuf) labels.Labels {
|
||||
lset := make(labels.Labels, dec.Uvarint())
|
||||
|
||||
for i := range lset {
|
||||
lset[i].Name = dec.UvarintStr()
|
||||
lset[i].Value = dec.UvarintStr()
|
||||
}
|
||||
sort.Sort(lset)
|
||||
return lset
|
||||
}
|
||||
|
||||
// Samples appends samples in rec to the given slice.
|
||||
func (d *Decoder) Samples(rec []byte, samples []RefSample) ([]RefSample, error) {
|
||||
dec := encoding.Decbuf{B: rec}
|
||||
|
@ -330,13 +335,7 @@ func (d *Decoder) ExemplarsFromBuffer(dec *encoding.Decbuf, exemplars []RefExemp
|
|||
dref := dec.Varint64()
|
||||
dtime := dec.Varint64()
|
||||
val := dec.Be64()
|
||||
|
||||
lset := make(labels.Labels, dec.Uvarint())
|
||||
for i := range lset {
|
||||
lset[i].Name = dec.UvarintStr()
|
||||
lset[i].Value = dec.UvarintStr()
|
||||
}
|
||||
sort.Sort(lset)
|
||||
lset := d.DecodeLabels(dec)
|
||||
|
||||
exemplars = append(exemplars, RefExemplar{
|
||||
Ref: chunks.HeadSeriesRef(baseRef + uint64(dref)),
|
||||
|
@ -366,12 +365,7 @@ func (e *Encoder) Series(series []RefSeries, b []byte) []byte {
|
|||
|
||||
for _, s := range series {
|
||||
buf.PutBE64(uint64(s.Ref))
|
||||
buf.PutUvarint(len(s.Labels))
|
||||
|
||||
for _, l := range s.Labels {
|
||||
buf.PutUvarintStr(l.Name)
|
||||
buf.PutUvarintStr(l.Value)
|
||||
}
|
||||
EncodeLabels(&buf, s.Labels)
|
||||
}
|
||||
return buf.Get()
|
||||
}
|
||||
|
@ -396,6 +390,16 @@ func (e *Encoder) Metadata(metadata []RefMetadata, b []byte) []byte {
|
|||
return buf.Get()
|
||||
}
|
||||
|
||||
// EncodeLabels encodes the contents of labels into buf.
|
||||
func EncodeLabels(buf *encoding.Encbuf, lbls labels.Labels) {
|
||||
buf.PutUvarint(len(lbls))
|
||||
|
||||
for _, l := range lbls {
|
||||
buf.PutUvarintStr(l.Name)
|
||||
buf.PutUvarintStr(l.Value)
|
||||
}
|
||||
}
|
||||
|
||||
// Samples appends the encoded samples to b and returns the resulting slice.
|
||||
func (e *Encoder) Samples(samples []RefSample, b []byte) []byte {
|
||||
buf := encoding.Encbuf{B: b}
|
||||
|
@ -460,11 +464,6 @@ func (e *Encoder) EncodeExemplarsIntoBuffer(exemplars []RefExemplar, buf *encodi
|
|||
buf.PutVarint64(int64(ex.Ref) - int64(first.Ref))
|
||||
buf.PutVarint64(ex.T - first.T)
|
||||
buf.PutBE64(math.Float64bits(ex.V))
|
||||
|
||||
buf.PutUvarint(len(ex.Labels))
|
||||
for _, l := range ex.Labels {
|
||||
buf.PutUvarintStr(l.Name)
|
||||
buf.PutUvarintStr(l.Value)
|
||||
}
|
||||
EncodeLabels(buf, ex.Labels)
|
||||
}
|
||||
}
|
||||
|
|
19
tsdb/wal.go
19
tsdb/wal.go
|
@ -23,7 +23,6 @@ import (
|
|||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -32,7 +31,6 @@ import (
|
|||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||
"github.com/prometheus/prometheus/tsdb/encoding"
|
||||
|
@ -790,12 +788,7 @@ const (
|
|||
func (w *SegmentWAL) encodeSeries(buf *encoding.Encbuf, series []record.RefSeries) uint8 {
|
||||
for _, s := range series {
|
||||
buf.PutBE64(uint64(s.Ref))
|
||||
buf.PutUvarint(len(s.Labels))
|
||||
|
||||
for _, l := range s.Labels {
|
||||
buf.PutUvarintStr(l.Name)
|
||||
buf.PutUvarintStr(l.Value)
|
||||
}
|
||||
record.EncodeLabels(buf, s.Labels)
|
||||
}
|
||||
return walSeriesSimple
|
||||
}
|
||||
|
@ -840,6 +833,7 @@ type walReader struct {
|
|||
cur int
|
||||
buf []byte
|
||||
crc32 hash.Hash32
|
||||
dec record.Decoder
|
||||
|
||||
curType WALEntryType
|
||||
curFlag byte
|
||||
|
@ -1123,14 +1117,7 @@ func (r *walReader) decodeSeries(flag byte, b []byte, res *[]record.RefSeries) e
|
|||
|
||||
for len(dec.B) > 0 && dec.Err() == nil {
|
||||
ref := chunks.HeadSeriesRef(dec.Be64())
|
||||
|
||||
lset := make(labels.Labels, dec.Uvarint())
|
||||
|
||||
for i := range lset {
|
||||
lset[i].Name = dec.UvarintStr()
|
||||
lset[i].Value = dec.UvarintStr()
|
||||
}
|
||||
sort.Sort(lset)
|
||||
lset := r.dec.DecodeLabels(&dec)
|
||||
|
||||
*res = append(*res, record.RefSeries{
|
||||
Ref: ref,
|
||||
|
|
Loading…
Reference in New Issue