From 1684dc750a74fcd4f23b02c1b5081328968b1a61 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Mon, 4 Mar 2019 21:42:45 +0200 Subject: [PATCH] updated tsdb to 0.6.0 (#5292) * updated tsdb to 0.6.0. As part of the update, also added the new storage.tsdb.allow-overlapping-blocks flag and marked it as experimental. --- cmd/prometheus/main.go | 4 + go.mod | 2 +- go.sum | 4 +- storage/tsdb/tsdb.go | 15 +- .../github.com/prometheus/tsdb/CHANGELOG.md | 17 +- vendor/github.com/prometheus/tsdb/compact.go | 4 +- vendor/github.com/prometheus/tsdb/db.go | 37 ++- .../prometheus/tsdb/encoding/encoding.go | 244 ++++++++++++++++ .../prometheus/tsdb/encoding_helpers.go | 160 ----------- vendor/github.com/prometheus/tsdb/head.go | 3 +- .../prometheus/tsdb/index/encoding_helpers.go | 218 -------------- .../github.com/prometheus/tsdb/index/index.go | 269 +++++++++--------- .../prometheus/tsdb/index/postings.go | 143 ++++++++-- vendor/github.com/prometheus/tsdb/querier.go | 14 +- vendor/github.com/prometheus/tsdb/record.go | 111 ++++---- .../github.com/prometheus/tsdb/tombstones.go | 47 +-- vendor/github.com/prometheus/tsdb/wal.go | 107 +++---- vendor/github.com/prometheus/tsdb/wal/wal.go | 46 ++- vendor/modules.txt | 3 +- 19 files changed, 753 insertions(+), 695 deletions(-) create mode 100644 vendor/github.com/prometheus/tsdb/encoding/encoding.go delete mode 100644 vendor/github.com/prometheus/tsdb/encoding_helpers.go delete mode 100644 vendor/github.com/prometheus/tsdb/index/encoding_helpers.go diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index a3f21967f..1312e1862 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -198,6 +198,9 @@ func main() { a.Flag("storage.tsdb.no-lockfile", "Do not create lockfile in data directory."). Default("false").BoolVar(&cfg.tsdb.NoLockfile) + a.Flag("storage.tsdb.allow-overlapping-blocks", "[EXPERIMENTAL] Allow overlapping blocks, which in turn enables vertical compaction and vertical query merge."). + Default("false").BoolVar(&cfg.tsdb.AllowOverlappingBlocks) + a.Flag("storage.remote.flush-deadline", "How long to wait flushing sample on shutdown or config reload.").
Default("1m").PlaceHolder("").SetValue(&cfg.RemoteFlushDeadline) @@ -652,6 +655,7 @@ func main() { "NoLockfile", cfg.tsdb.NoLockfile, "RetentionDuration", cfg.tsdb.RetentionDuration, "WALSegmentSize", cfg.tsdb.WALSegmentSize, + "AllowOverlappingBlocks", cfg.tsdb.AllowOverlappingBlocks, ) startTimeMargin := int64(2 * time.Duration(cfg.tsdb.MinBlockDuration).Seconds() * 1000) diff --git a/go.mod b/go.mod index db21d92d7..b781fbb9c 100644 --- a/go.mod +++ b/go.mod @@ -90,7 +90,7 @@ require ( github.com/prometheus/client_golang v0.9.1 github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 github.com/prometheus/common v0.0.0-20181119215939-b36ad289a3ea - github.com/prometheus/tsdb v0.4.1-0.20190219143357-77d5a7d47a52 + github.com/prometheus/tsdb v0.6.0 github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a // indirect github.com/rlmcpherson/s3gof3r v0.5.0 // indirect github.com/rubyist/circuitbreaker v2.2.1+incompatible // indirect diff --git a/go.sum b/go.sum index d926f96f0..8f1306575 100644 --- a/go.sum +++ b/go.sum @@ -231,8 +231,8 @@ github.com/prometheus/common v0.0.0-20181119215939-b36ad289a3ea/go.mod h1:daVV7q github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d h1:GoAlyOgbOEIFdaDqxJVlbOQ1DtGmZWs/Qau0hIlk+WQ= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= -github.com/prometheus/tsdb v0.4.1-0.20190219143357-77d5a7d47a52 h1:ULXRH8vXOu1QwA7l7N+zAKS/jfvs7HLCNH77FEdKTTQ= -github.com/prometheus/tsdb v0.4.1-0.20190219143357-77d5a7d47a52/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= +github.com/prometheus/tsdb v0.6.0 h1:BKo0qP+rz4y79nqwrs6mB080S6qQAlYXEMR3y/luNik= +github.com/prometheus/tsdb v0.6.0/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rlmcpherson/s3gof3r v0.5.0 h1:1izOJpTiohSibfOHuNyEA/yQnAirh05enzEdmhez43k= diff --git a/storage/tsdb/tsdb.go b/storage/tsdb/tsdb.go index 4b45dd47b..10d15f706 100644 --- a/storage/tsdb/tsdb.go +++ b/storage/tsdb/tsdb.go @@ -126,6 +126,10 @@ type Options struct { // Disable creation and consideration of lockfile. NoLockfile bool + + // When true it disables the overlapping blocks check. + // This in-turn enables vertical compaction and vertical query merge. 
+ AllowOverlappingBlocks bool } var ( @@ -185,11 +189,12 @@ func Open(path string, l log.Logger, r prometheus.Registerer, opts *Options) (*t } db, err := tsdb.Open(path, l, r, &tsdb.Options{ - WALSegmentSize: int(opts.WALSegmentSize), - RetentionDuration: uint64(time.Duration(opts.RetentionDuration).Seconds() * 1000), - MaxBytes: int64(opts.MaxBytes), - BlockRanges: rngs, - NoLockfile: opts.NoLockfile, + WALSegmentSize: int(opts.WALSegmentSize), + RetentionDuration: uint64(time.Duration(opts.RetentionDuration).Seconds() * 1000), + MaxBytes: int64(opts.MaxBytes), + BlockRanges: rngs, + NoLockfile: opts.NoLockfile, + AllowOverlappingBlocks: opts.AllowOverlappingBlocks, }) if err != nil { return nil, err } diff --git a/vendor/github.com/prometheus/tsdb/CHANGELOG.md b/vendor/github.com/prometheus/tsdb/CHANGELOG.md index 705dc6b4c..534de25cb 100644 --- a/vendor/github.com/prometheus/tsdb/CHANGELOG.md +++ b/vendor/github.com/prometheus/tsdb/CHANGELOG.md @@ -1,11 +1,22 @@ ## master / unreleased - - [ENHANCEMENT] Time-ovelapping blocks are now allowed. [#370](https://github.com/prometheus/tsdb/pull/370) + +## 0.6.0 + - [CHANGE] `AllowOverlappingBlock` is now `AllowOverlappingBlocks`. + +## 0.5.0 + - [FEATURE] Time-overlapping blocks are now allowed. [#370](https://github.com/prometheus/tsdb/pull/370) + - Disabled by default and can be enabled via `AllowOverlappingBlock` option. - Added `MergeChunks` function in `chunkenc/xor.go` to merge 2 time-overlapping chunks. - Added `MergeOverlappingChunks` function in `chunks/chunks.go` to merge multiple time-overlapping Chunk Metas. - Added `MinTime` and `MaxTime` method for `BlockReader`. - - [CHANGE] `NewLeveledCompactor` takes a context so that a compaction is canceled when closing the db. + - [FEATURE] New `dump` command to tsdb tool to dump all samples. + - [FEATURE] New `encoding` package for common binary encoding/decoding helpers. + - Added to remove some code duplication. - [ENHANCEMENT] When closing the db any running compaction will be cancelled so it doesn't block. - - [CHANGE] `prometheus_tsdb_storage_blocks_bytes_total` is now `prometheus_tsdb_storage_blocks_bytes` + - `NewLeveledCompactor` takes a context. + - [CHANGE] `prometheus_tsdb_storage_blocks_bytes_total` is now `prometheus_tsdb_storage_blocks_bytes`. + - [BUGFIX] Improved Postings Merge performance. Fixes a regression from the previous release. + - [BUGFIX] LiveReader can get into an infinite loop on corrupt WALs. ## 0.4.0 - [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller wal files. For example using tmpfs on a RPI to minimise the SD card wear out from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed. diff --git a/vendor/github.com/prometheus/tsdb/compact.go b/vendor/github.com/prometheus/tsdb/compact.go index 065b43e75..a1a7aa76f 100644 --- a/vendor/github.com/prometheus/tsdb/compact.go +++ b/vendor/github.com/prometheus/tsdb/compact.go @@ -51,8 +51,8 @@ func ExponentialBlockRanges(minSize int64, steps, stepSize int) []int64 { // Compactor provides compaction against an underlying storage // of time series data. type Compactor interface { - // Plan returns a set of non-overlapping directories that can - // be compacted concurrently. + // Plan returns a set of directories that can be compacted concurrently. + // The directories can be overlapping. // Results returned when compactions are in progress are undefined.
Plan(dir string) ([]string, error) diff --git a/vendor/github.com/prometheus/tsdb/db.go b/vendor/github.com/prometheus/tsdb/db.go index fd457ca4c..7b03a209c 100644 --- a/vendor/github.com/prometheus/tsdb/db.go +++ b/vendor/github.com/prometheus/tsdb/db.go @@ -45,10 +45,11 @@ import ( // DefaultOptions used for the DB. They are sane for setups using // millisecond precision timestamps. var DefaultOptions = &Options{ - WALSegmentSize: wal.DefaultSegmentSize, - RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds - BlockRanges: ExponentialBlockRanges(int64(2*time.Hour)/1e6, 3, 5), - NoLockfile: false, + WALSegmentSize: wal.DefaultSegmentSize, + RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds + BlockRanges: ExponentialBlockRanges(int64(2*time.Hour)/1e6, 3, 5), + NoLockfile: false, + AllowOverlappingBlocks: false, } // Options of the DB storage. @@ -71,6 +72,10 @@ type Options struct { // NoLockfile disables creation and consideration of a lock file. NoLockfile bool + + // Overlapping blocks are allowed if AllowOverlappingBlocks is true. + // This in turn enables vertical compaction and vertical query merge. + AllowOverlappingBlocks bool } // Appender allows appending a batch of data. It must be completed with a @@ -548,6 +553,11 @@ func (db *DB) reload() (err error) { sort.Slice(loadable, func(i, j int) bool { return loadable[i].Meta().MinTime < loadable[j].Meta().MinTime }) + if !db.opts.AllowOverlappingBlocks { + if err := validateBlockSequence(loadable); err != nil { + return errors.Wrap(err, "invalid block sequence") + } + } // Swap new blocks first for subsequently created readers to be seen. db.mtx.Lock() @@ -699,6 +709,25 @@ func (db *DB) deleteBlocks(blocks map[ulid.ULID]*Block) error { return nil } +// validateBlockSequence returns an error if the given block metas indicate that some blocks overlap within the sequence. +func validateBlockSequence(bs []*Block) error { + if len(bs) <= 1 { + return nil + } + + var metas []BlockMeta + for _, b := range bs { + metas = append(metas, b.meta) + } + + overlaps := OverlappingBlocks(metas) + if len(overlaps) > 0 { + return errors.Errorf("block time ranges overlap: %s", overlaps) + } + + return nil +} + // TimeRange specifies minTime and maxTime range. type TimeRange struct { Min, Max int64 } diff --git a/vendor/github.com/prometheus/tsdb/encoding/encoding.go b/vendor/github.com/prometheus/tsdb/encoding/encoding.go new file mode 100644 index 000000000..395e11cf8 --- /dev/null +++ b/vendor/github.com/prometheus/tsdb/encoding/encoding.go @@ -0,0 +1,244 @@ +// Copyright 2018 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package encoding + +import ( + "encoding/binary" + "hash" + "hash/crc32" + "unsafe" + + "github.com/pkg/errors" +) + +var ( + ErrInvalidSize = errors.New("invalid size") + ErrInvalidChecksum = errors.New("invalid checksum") +) + +// Encbuf is a helper type to populate a byte slice with various types.
+type Encbuf struct { + B []byte + C [binary.MaxVarintLen64]byte +} + +func (e *Encbuf) Reset() { e.B = e.B[:0] } +func (e *Encbuf) Get() []byte { return e.B } +func (e *Encbuf) Len() int { return len(e.B) } + +func (e *Encbuf) PutString(s string) { e.B = append(e.B, s...) } +func (e *Encbuf) PutByte(c byte) { e.B = append(e.B, c) } + +func (e *Encbuf) PutBE32int(x int) { e.PutBE32(uint32(x)) } +func (e *Encbuf) PutUvarint32(x uint32) { e.PutUvarint64(uint64(x)) } +func (e *Encbuf) PutBE64int64(x int64) { e.PutBE64(uint64(x)) } +func (e *Encbuf) PutUvarint(x int) { e.PutUvarint64(uint64(x)) } + +func (e *Encbuf) PutBE32(x uint32) { + binary.BigEndian.PutUint32(e.C[:], x) + e.B = append(e.B, e.C[:4]...) +} + +func (e *Encbuf) PutBE64(x uint64) { + binary.BigEndian.PutUint64(e.C[:], x) + e.B = append(e.B, e.C[:8]...) +} + +func (e *Encbuf) PutUvarint64(x uint64) { + n := binary.PutUvarint(e.C[:], x) + e.B = append(e.B, e.C[:n]...) +} + +func (e *Encbuf) PutVarint64(x int64) { + n := binary.PutVarint(e.C[:], x) + e.B = append(e.B, e.C[:n]...) +} + +// PutUvarintStr writes a string to the buffer prefixed by its uvarint length (in bytes!). +func (e *Encbuf) PutUvarintStr(s string) { + b := *(*[]byte)(unsafe.Pointer(&s)) + e.PutUvarint(len(b)) + e.PutString(s) +} + +// PutHash appends a hash over the buffer's current contents to the buffer. +func (e *Encbuf) PutHash(h hash.Hash) { + h.Reset() + _, err := h.Write(e.B) + if err != nil { + panic(err) // The CRC32 implementation does not error + } + e.B = h.Sum(e.B) +} + +// Decbuf provides safe methods to extract data from a byte slice. It does all +// necessary bounds checking and advancing of the byte slice. +// Several datums can be extracted without checking for errors. However, before using +// any datum, the Err() method must be checked. +type Decbuf struct { + B []byte + E error +} + +// NewDecbufAt returns a new decoding buffer. It expects the first 4 bytes +// after offset to hold the big endian encoded content length, followed by the contents and the expected +// checksum. +func NewDecbufAt(bs ByteSlice, off int, castagnoliTable *crc32.Table) Decbuf { + if bs.Len() < off+4 { + return Decbuf{E: ErrInvalidSize} + } + b := bs.Range(off, off+4) + l := int(binary.BigEndian.Uint32(b)) + + if bs.Len() < off+4+l+4 { + return Decbuf{E: ErrInvalidSize} + } + + // Load bytes holding the contents plus a CRC32 checksum. + b = bs.Range(off+4, off+4+l+4) + dec := Decbuf{B: b[:len(b)-4]} + + if exp := binary.BigEndian.Uint32(b[len(b)-4:]); dec.Crc32(castagnoliTable) != exp { + return Decbuf{E: ErrInvalidChecksum} + } + return dec +} + +// NewDecbufUvarintAt returns a new decoding buffer. It expects the first bytes +// after offset to hold the uvarint-encoded buffer's length, followed by the contents and the expected +// checksum. +func NewDecbufUvarintAt(bs ByteSlice, off int, castagnoliTable *crc32.Table) Decbuf { + // We never have to access this method at the far end of the byte slice. Thus just checking + // against the MaxVarintLen32 is sufficient. + if bs.Len() < off+binary.MaxVarintLen32 { + return Decbuf{E: ErrInvalidSize} + } + b := bs.Range(off, off+binary.MaxVarintLen32) + + l, n := binary.Uvarint(b) + if n <= 0 || n > binary.MaxVarintLen32 { + return Decbuf{E: errors.Errorf("invalid uvarint %d", n)} + } + + if bs.Len() < off+n+int(l)+4 { + return Decbuf{E: ErrInvalidSize} + } + + // Load bytes holding the contents plus a CRC32 checksum.
+ b = bs.Range(off+n, off+n+int(l)+4) + dec := Decbuf{B: b[:len(b)-4]} + + if dec.Crc32(castagnoliTable) != binary.BigEndian.Uint32(b[len(b)-4:]) { + return Decbuf{E: ErrInvalidChecksum} + } + return dec +} + +func (d *Decbuf) Uvarint() int { return int(d.Uvarint64()) } +func (d *Decbuf) Be32int() int { return int(d.Be32()) } +func (d *Decbuf) Be64int64() int64 { return int64(d.Be64()) } + +// Crc32 returns a CRC32 checksum over the remaining bytes. +func (d *Decbuf) Crc32(castagnoliTable *crc32.Table) uint32 { + return crc32.Checksum(d.B, castagnoliTable) +} + +func (d *Decbuf) UvarintStr() string { + l := d.Uvarint64() + if d.E != nil { + return "" + } + if len(d.B) < int(l) { + d.E = ErrInvalidSize + return "" + } + s := string(d.B[:l]) + d.B = d.B[l:] + return s +} + +func (d *Decbuf) Varint64() int64 { + if d.E != nil { + return 0 + } + x, n := binary.Varint(d.B) + if n < 1 { + d.E = ErrInvalidSize + return 0 + } + d.B = d.B[n:] + return x +} + +func (d *Decbuf) Uvarint64() uint64 { + if d.E != nil { + return 0 + } + x, n := binary.Uvarint(d.B) + if n < 1 { + d.E = ErrInvalidSize + return 0 + } + d.B = d.B[n:] + return x +} + +func (d *Decbuf) Be64() uint64 { + if d.E != nil { + return 0 + } + if len(d.B) < 8 { + d.E = ErrInvalidSize + return 0 + } + x := binary.BigEndian.Uint64(d.B) + d.B = d.B[8:] + return x +} + +func (d *Decbuf) Be32() uint32 { + if d.E != nil { + return 0 + } + if len(d.B) < 4 { + d.E = ErrInvalidSize + return 0 + } + x := binary.BigEndian.Uint32(d.B) + d.B = d.B[4:] + return x +} + +func (d *Decbuf) Byte() byte { + if d.E != nil { + return 0 + } + if len(d.B) < 1 { + d.E = ErrInvalidSize + return 0 + } + x := d.B[0] + d.B = d.B[1:] + return x +} + +func (d *Decbuf) Err() error { return d.E } +func (d *Decbuf) Len() int { return len(d.B) } +func (d *Decbuf) Get() []byte { return d.B } + +// ByteSlice abstracts a byte slice. +type ByteSlice interface { + Len() int + Range(start, end int) []byte +} diff --git a/vendor/github.com/prometheus/tsdb/encoding_helpers.go b/vendor/github.com/prometheus/tsdb/encoding_helpers.go deleted file mode 100644 index 9c10e3160..000000000 --- a/vendor/github.com/prometheus/tsdb/encoding_helpers.go +++ /dev/null @@ -1,160 +0,0 @@ -// Copyright 2018 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tsdb - -import ( - "encoding/binary" - "unsafe" - - "github.com/pkg/errors" -) - -var errInvalidSize = errors.New("invalid size") - -// encbuf is a helper type to populate a byte slice with various types. -type encbuf struct { - b []byte - c [binary.MaxVarintLen64]byte -} - -func (e *encbuf) reset() { e.b = e.b[:0] } -func (e *encbuf) get() []byte { return e.b } - -func (e *encbuf) putString(s string) { e.b = append(e.b, s...) 
} -func (e *encbuf) putByte(c byte) { e.b = append(e.b, c) } - -func (e *encbuf) putBE64int64(x int64) { e.putBE64(uint64(x)) } -func (e *encbuf) putUvarint(x int) { e.putUvarint64(uint64(x)) } - -func (e *encbuf) putBE32(x uint32) { - binary.BigEndian.PutUint32(e.c[:], x) - e.b = append(e.b, e.c[:4]...) -} - -func (e *encbuf) putBE64(x uint64) { - binary.BigEndian.PutUint64(e.c[:], x) - e.b = append(e.b, e.c[:8]...) -} - -func (e *encbuf) putUvarint64(x uint64) { - n := binary.PutUvarint(e.c[:], x) - e.b = append(e.b, e.c[:n]...) -} - -func (e *encbuf) putVarint64(x int64) { - n := binary.PutVarint(e.c[:], x) - e.b = append(e.b, e.c[:n]...) -} - -// putVarintStr writes a string to the buffer prefixed by its varint length (in bytes!). -func (e *encbuf) putUvarintStr(s string) { - b := *(*[]byte)(unsafe.Pointer(&s)) - e.putUvarint(len(b)) - e.putString(s) -} - -// decbuf provides safe methods to extract data from a byte slice. It does all -// necessary bounds checking and advancing of the byte slice. -// Several datums can be extracted without checking for errors. However, before using -// any datum, the err() method must be checked. -type decbuf struct { - b []byte - e error -} - -func (d *decbuf) uvarint() int { return int(d.uvarint64()) } -func (d *decbuf) be64int64() int64 { return int64(d.be64()) } - -func (d *decbuf) uvarintStr() string { - l := d.uvarint64() - if d.e != nil { - return "" - } - if len(d.b) < int(l) { - d.e = errInvalidSize - return "" - } - s := string(d.b[:l]) - d.b = d.b[l:] - return s -} - -func (d *decbuf) varint64() int64 { - if d.e != nil { - return 0 - } - x, n := binary.Varint(d.b) - if n < 1 { - d.e = errInvalidSize - return 0 - } - d.b = d.b[n:] - return x -} - -func (d *decbuf) uvarint64() uint64 { - if d.e != nil { - return 0 - } - x, n := binary.Uvarint(d.b) - if n < 1 { - d.e = errInvalidSize - return 0 - } - d.b = d.b[n:] - return x -} - -func (d *decbuf) be64() uint64 { - if d.e != nil { - return 0 - } - if len(d.b) < 4 { - d.e = errInvalidSize - return 0 - } - x := binary.BigEndian.Uint64(d.b) - d.b = d.b[8:] - return x -} - -func (d *decbuf) be32() uint32 { - if d.e != nil { - return 0 - } - if len(d.b) < 4 { - d.e = errInvalidSize - return 0 - } - x := binary.BigEndian.Uint32(d.b) - d.b = d.b[4:] - return x -} - -func (d *decbuf) byte() byte { - if d.e != nil { - return 0 - } - if len(d.b) < 1 { - d.e = errInvalidSize - return 0 - } - x := d.b[0] - d.b = d.b[1:] - return x -} - -func (d *decbuf) err() error { return d.e } -func (d *decbuf) len() int { return len(d.b) } -func (d *decbuf) get() []byte { return d.b } diff --git a/vendor/github.com/prometheus/tsdb/head.go b/vendor/github.com/prometheus/tsdb/head.go index c5ac06c91..fe2089754 100644 --- a/vendor/github.com/prometheus/tsdb/head.go +++ b/vendor/github.com/prometheus/tsdb/head.go @@ -28,6 +28,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/tsdb/chunkenc" "github.com/prometheus/tsdb/chunks" + "github.com/prometheus/tsdb/encoding" "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/wal" @@ -1117,7 +1118,7 @@ func (h *headIndexReader) Symbols() (map[string]struct{}, error) { // LabelValues returns the possible label values func (h *headIndexReader) LabelValues(names ...string) (index.StringTuples, error) { if len(names) != 1 { - return nil, errInvalidSize + return nil, encoding.ErrInvalidSize } h.head.symMtx.RLock() diff --git a/vendor/github.com/prometheus/tsdb/index/encoding_helpers.go 
b/vendor/github.com/prometheus/tsdb/index/encoding_helpers.go deleted file mode 100644 index 1ed130158..000000000 --- a/vendor/github.com/prometheus/tsdb/index/encoding_helpers.go +++ /dev/null @@ -1,218 +0,0 @@ -// Copyright 2018 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package index - -import ( - "encoding/binary" - "hash" - "hash/crc32" - "unsafe" - - "github.com/pkg/errors" -) - -// enbuf is a helper type to populate a byte slice with various types. -type encbuf struct { - b []byte - c [binary.MaxVarintLen64]byte -} - -func (e *encbuf) reset() { e.b = e.b[:0] } -func (e *encbuf) get() []byte { return e.b } -func (e *encbuf) len() int { return len(e.b) } - -func (e *encbuf) putString(s string) { e.b = append(e.b, s...) } -func (e *encbuf) putByte(c byte) { e.b = append(e.b, c) } - -func (e *encbuf) putBE32int(x int) { e.putBE32(uint32(x)) } -func (e *encbuf) putUvarint32(x uint32) { e.putUvarint64(uint64(x)) } -func (e *encbuf) putUvarint(x int) { e.putUvarint64(uint64(x)) } - -func (e *encbuf) putBE32(x uint32) { - binary.BigEndian.PutUint32(e.c[:], x) - e.b = append(e.b, e.c[:4]...) -} - -func (e *encbuf) putBE64(x uint64) { - binary.BigEndian.PutUint64(e.c[:], x) - e.b = append(e.b, e.c[:8]...) -} - -func (e *encbuf) putUvarint64(x uint64) { - n := binary.PutUvarint(e.c[:], x) - e.b = append(e.b, e.c[:n]...) -} - -func (e *encbuf) putVarint64(x int64) { - n := binary.PutVarint(e.c[:], x) - e.b = append(e.b, e.c[:n]...) -} - -// putVarintStr writes a string to the buffer prefixed by its varint length (in bytes!). -func (e *encbuf) putUvarintStr(s string) { - b := *(*[]byte)(unsafe.Pointer(&s)) - e.putUvarint(len(b)) - e.putString(s) -} - -// putHash appends a hash over the buffers current contents to the buffer. -func (e *encbuf) putHash(h hash.Hash) { - h.Reset() - _, err := h.Write(e.b) - if err != nil { - panic(err) // The CRC32 implementation does not error - } - e.b = h.Sum(e.b) -} - -// decbuf provides safe methods to extract data from a byte slice. It does all -// necessary bounds checking and advancing of the byte slice. -// Several datums can be extracted without checking for errors. However, before using -// any datum, the err() method must be checked. -type decbuf struct { - b []byte - e error -} - -// newDecbufAt returns a new decoding buffer. It expects the first 4 bytes -// after offset to hold the big endian encoded content length, followed by the contents and the expected -// checksum. -func newDecbufAt(bs ByteSlice, off int) decbuf { - if bs.Len() < off+4 { - return decbuf{e: errInvalidSize} - } - b := bs.Range(off, off+4) - l := int(binary.BigEndian.Uint32(b)) - - if bs.Len() < off+4+l+4 { - return decbuf{e: errInvalidSize} - } - - // Load bytes holding the contents plus a CRC32 checksum. 
- b = bs.Range(off+4, off+4+l+4) - dec := decbuf{b: b[:len(b)-4]} - - if exp := binary.BigEndian.Uint32(b[len(b)-4:]); dec.crc32() != exp { - return decbuf{e: errInvalidChecksum} - } - return dec -} - -// decbufUvarintAt returns a new decoding buffer. It expects the first bytes -// after offset to hold the uvarint-encoded buffers length, followed by the contents and the expected -// checksum. -func newDecbufUvarintAt(bs ByteSlice, off int) decbuf { - // We never have to access this method at the far end of the byte slice. Thus just checking - // against the MaxVarintLen32 is sufficient. - if bs.Len() < off+binary.MaxVarintLen32 { - return decbuf{e: errInvalidSize} - } - b := bs.Range(off, off+binary.MaxVarintLen32) - - l, n := binary.Uvarint(b) - if n <= 0 || n > binary.MaxVarintLen32 { - return decbuf{e: errors.Errorf("invalid uvarint %d", n)} - } - - if bs.Len() < off+n+int(l)+4 { - return decbuf{e: errInvalidSize} - } - - // Load bytes holding the contents plus a CRC32 checksum. - b = bs.Range(off+n, off+n+int(l)+4) - dec := decbuf{b: b[:len(b)-4]} - - if dec.crc32() != binary.BigEndian.Uint32(b[len(b)-4:]) { - return decbuf{e: errInvalidChecksum} - } - return dec -} - -func (d *decbuf) uvarint() int { return int(d.uvarint64()) } -func (d *decbuf) be32int() int { return int(d.be32()) } - -// crc32 returns a CRC32 checksum over the remaining bytes. -func (d *decbuf) crc32() uint32 { - return crc32.Checksum(d.b, castagnoliTable) -} - -func (d *decbuf) uvarintStr() string { - l := d.uvarint64() - if d.e != nil { - return "" - } - if len(d.b) < int(l) { - d.e = errInvalidSize - return "" - } - s := string(d.b[:l]) - d.b = d.b[l:] - return s -} - -func (d *decbuf) varint64() int64 { - if d.e != nil { - return 0 - } - x, n := binary.Varint(d.b) - if n < 1 { - d.e = errInvalidSize - return 0 - } - d.b = d.b[n:] - return x -} - -func (d *decbuf) uvarint64() uint64 { - if d.e != nil { - return 0 - } - x, n := binary.Uvarint(d.b) - if n < 1 { - d.e = errInvalidSize - return 0 - } - d.b = d.b[n:] - return x -} - -func (d *decbuf) be64() uint64 { - if d.e != nil { - return 0 - } - if len(d.b) < 8 { - d.e = errInvalidSize - return 0 - } - x := binary.BigEndian.Uint64(d.b) - d.b = d.b[8:] - return x -} - -func (d *decbuf) be32() uint32 { - if d.e != nil { - return 0 - } - if len(d.b) < 4 { - d.e = errInvalidSize - return 0 - } - x := binary.BigEndian.Uint32(d.b) - d.b = d.b[4:] - return x -} - -func (d *decbuf) err() error { return d.e } -func (d *decbuf) len() int { return len(d.b) } -func (d *decbuf) get() []byte { return d.b } diff --git a/vendor/github.com/prometheus/tsdb/index/index.go b/vendor/github.com/prometheus/tsdb/index/index.go index 442e0255f..580291a71 100644 --- a/vendor/github.com/prometheus/tsdb/index/index.go +++ b/vendor/github.com/prometheus/tsdb/index/index.go @@ -16,7 +16,6 @@ package index import ( "bufio" "encoding/binary" - "fmt" "hash" "hash/crc32" "io" @@ -29,6 +28,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/tsdb/chunks" + "github.com/prometheus/tsdb/encoding" "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/labels" ) @@ -119,8 +119,8 @@ type Writer struct { stage indexWriterStage // Reusable memory. - buf1 encbuf - buf2 encbuf + buf1 encoding.Encbuf + buf2 encoding.Encbuf uint32s []uint32 symbols map[string]uint32 // symbol offsets @@ -149,28 +149,28 @@ type TOC struct { // NewTOCFromByteSlice return parsed TOC from given index byte slice. 
func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) { if bs.Len() < indexTOCLen { - return nil, errInvalidSize + return nil, encoding.ErrInvalidSize } b := bs.Range(bs.Len()-indexTOCLen, bs.Len()) expCRC := binary.BigEndian.Uint32(b[len(b)-4:]) - d := decbuf{b: b[:len(b)-4]} + d := encoding.Decbuf{B: b[:len(b)-4]} - if d.crc32() != expCRC { - return nil, errors.Wrap(errInvalidChecksum, "read TOC") + if d.Crc32(castagnoliTable) != expCRC { + return nil, errors.Wrap(encoding.ErrInvalidChecksum, "read TOC") } - if err := d.err(); err != nil { + if err := d.Err(); err != nil { return nil, err } return &TOC{ - Symbols: d.be64(), - Series: d.be64(), - LabelIndices: d.be64(), - LabelIndicesTable: d.be64(), - Postings: d.be64(), - PostingsTable: d.be64(), + Symbols: d.Be64(), + Series: d.Be64(), + LabelIndices: d.Be64(), + LabelIndicesTable: d.Be64(), + Postings: d.Be64(), + PostingsTable: d.Be64(), }, nil } @@ -203,8 +203,8 @@ func NewWriter(fn string) (*Writer, error) { stage: idxStageNone, // Reusable memory. - buf1: encbuf{b: make([]byte, 0, 1<<22)}, - buf2: encbuf{b: make([]byte, 0, 1<<22)}, + buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, + buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, uint32s: make([]uint32, 0, 1<<15), // Caches. @@ -288,11 +288,11 @@ func (w *Writer) ensureStage(s indexWriterStage) error { } func (w *Writer) writeMeta() error { - w.buf1.reset() - w.buf1.putBE32(MagicIndex) - w.buf1.putByte(FormatV2) + w.buf1.Reset() + w.buf1.PutBE32(MagicIndex) + w.buf1.PutByte(FormatV2) - return w.write(w.buf1.get()) + return w.write(w.buf1.Get()) } // AddSeries adds the series one at a time along with its chunks. @@ -318,8 +318,8 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta } w.seriesOffsets[ref] = w.pos / 16 - w.buf2.reset() - w.buf2.putUvarint(len(lset)) + w.buf2.Reset() + w.buf2.PutUvarint(len(lset)) for _, l := range lset { // here we have an index for the symbol file if v2, otherwise it's an offset @@ -327,41 +327,41 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta if !ok { return errors.Errorf("symbol entry for %q does not exist", l.Name) } - w.buf2.putUvarint32(index) + w.buf2.PutUvarint32(index) index, ok = w.symbols[l.Value] if !ok { return errors.Errorf("symbol entry for %q does not exist", l.Value) } - w.buf2.putUvarint32(index) + w.buf2.PutUvarint32(index) } - w.buf2.putUvarint(len(chunks)) + w.buf2.PutUvarint(len(chunks)) if len(chunks) > 0 { c := chunks[0] - w.buf2.putVarint64(c.MinTime) - w.buf2.putUvarint64(uint64(c.MaxTime - c.MinTime)) - w.buf2.putUvarint64(c.Ref) + w.buf2.PutVarint64(c.MinTime) + w.buf2.PutUvarint64(uint64(c.MaxTime - c.MinTime)) + w.buf2.PutUvarint64(c.Ref) t0 := c.MaxTime ref0 := int64(c.Ref) for _, c := range chunks[1:] { - w.buf2.putUvarint64(uint64(c.MinTime - t0)) - w.buf2.putUvarint64(uint64(c.MaxTime - c.MinTime)) + w.buf2.PutUvarint64(uint64(c.MinTime - t0)) + w.buf2.PutUvarint64(uint64(c.MaxTime - c.MinTime)) t0 = c.MaxTime - w.buf2.putVarint64(int64(c.Ref) - ref0) + w.buf2.PutVarint64(int64(c.Ref) - ref0) ref0 = int64(c.Ref) } } - w.buf1.reset() - w.buf1.putUvarint(w.buf2.len()) + w.buf1.Reset() + w.buf1.PutUvarint(w.buf2.Len()) - w.buf2.putHash(w.crc32) + w.buf2.PutHash(w.crc32) - if err := w.write(w.buf1.get(), w.buf2.get()); err != nil { + if err := w.write(w.buf1.Get(), w.buf2.Get()); err != nil { return errors.Wrap(err, "write series data") } @@ -382,22 +382,22 @@ func (w *Writer) AddSymbols(sym map[string]struct{}) error { } sort.Strings(symbols) - 
w.buf1.reset() - w.buf2.reset() + w.buf1.Reset() + w.buf2.Reset() - w.buf2.putBE32int(len(symbols)) + w.buf2.PutBE32int(len(symbols)) w.symbols = make(map[string]uint32, len(symbols)) for index, s := range symbols { w.symbols[s] = uint32(index) - w.buf2.putUvarintStr(s) + w.buf2.PutUvarintStr(s) } - w.buf1.putBE32int(w.buf2.len()) - w.buf2.putHash(w.crc32) + w.buf1.PutBE32int(w.buf2.Len()) + w.buf2.PutHash(w.crc32) - err := w.write(w.buf1.get(), w.buf2.get()) + err := w.write(w.buf1.Get(), w.buf2.Get()) return errors.Wrap(err, "write symbols") } @@ -425,9 +425,9 @@ func (w *Writer) WriteLabelIndex(names []string, values []string) error { offset: w.pos, }) - w.buf2.reset() - w.buf2.putBE32int(len(names)) - w.buf2.putBE32int(valt.Len()) + w.buf2.Reset() + w.buf2.PutBE32int(len(names)) + w.buf2.PutBE32int(valt.Len()) // here we have an index for the symbol file if v2, otherwise it's an offset for _, v := range valt.entries { @@ -435,53 +435,53 @@ func (w *Writer) WriteLabelIndex(names []string, values []string) error { if !ok { return errors.Errorf("symbol entry for %q does not exist", v) } - w.buf2.putBE32(index) + w.buf2.PutBE32(index) } - w.buf1.reset() - w.buf1.putBE32int(w.buf2.len()) + w.buf1.Reset() + w.buf1.PutBE32int(w.buf2.Len()) - w.buf2.putHash(w.crc32) + w.buf2.PutHash(w.crc32) - err = w.write(w.buf1.get(), w.buf2.get()) + err = w.write(w.buf1.Get(), w.buf2.Get()) return errors.Wrap(err, "write label index") } // writeOffsetTable writes a sequence of readable hash entries. func (w *Writer) writeOffsetTable(entries []hashEntry) error { - w.buf2.reset() - w.buf2.putBE32int(len(entries)) + w.buf2.Reset() + w.buf2.PutBE32int(len(entries)) for _, e := range entries { - w.buf2.putUvarint(len(e.keys)) + w.buf2.PutUvarint(len(e.keys)) for _, k := range e.keys { - w.buf2.putUvarintStr(k) + w.buf2.PutUvarintStr(k) } - w.buf2.putUvarint64(e.offset) + w.buf2.PutUvarint64(e.offset) } - w.buf1.reset() - w.buf1.putBE32int(w.buf2.len()) - w.buf2.putHash(w.crc32) + w.buf1.Reset() + w.buf1.PutBE32int(w.buf2.Len()) + w.buf2.PutHash(w.crc32) - return w.write(w.buf1.get(), w.buf2.get()) + return w.write(w.buf1.Get(), w.buf2.Get()) } const indexTOCLen = 6*8 + 4 func (w *Writer) writeTOC() error { - w.buf1.reset() + w.buf1.Reset() - w.buf1.putBE64(w.toc.Symbols) - w.buf1.putBE64(w.toc.Series) - w.buf1.putBE64(w.toc.LabelIndices) - w.buf1.putBE64(w.toc.LabelIndicesTable) - w.buf1.putBE64(w.toc.Postings) - w.buf1.putBE64(w.toc.PostingsTable) + w.buf1.PutBE64(w.toc.Symbols) + w.buf1.PutBE64(w.toc.Series) + w.buf1.PutBE64(w.toc.LabelIndices) + w.buf1.PutBE64(w.toc.LabelIndicesTable) + w.buf1.PutBE64(w.toc.Postings) + w.buf1.PutBE64(w.toc.PostingsTable) - w.buf1.putHash(w.crc32) + w.buf1.PutHash(w.crc32) - return w.write(w.buf1.get()) + return w.write(w.buf1.Get()) } func (w *Writer) WritePostings(name, value string, it Postings) error { @@ -519,20 +519,20 @@ func (w *Writer) WritePostings(name, value string, it Postings) error { } sort.Sort(uint32slice(refs)) - w.buf2.reset() - w.buf2.putBE32int(len(refs)) + w.buf2.Reset() + w.buf2.PutBE32int(len(refs)) for _, r := range refs { - w.buf2.putBE32(r) + w.buf2.PutBE32(r) } w.uint32s = refs - w.buf1.reset() - w.buf1.putBE32int(w.buf2.len()) + w.buf1.Reset() + w.buf1.PutBE32int(w.buf2.Len()) - w.buf2.putHash(w.crc32) + w.buf2.PutHash(w.crc32) - err := w.write(w.buf1.get(), w.buf2.get()) + err := w.write(w.buf1.Get(), w.buf2.Get()) return errors.Wrap(err, "write postings") } @@ -593,11 +593,6 @@ type Reader struct { version int } -var ( - errInvalidSize = 
fmt.Errorf("invalid size") - errInvalidChecksum = fmt.Errorf("invalid checksum") -) - // ByteSlice abstracts a byte slice. type ByteSlice interface { Len() int @@ -643,7 +638,7 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) { // Verify header. if r.b.Len() < HeaderLen { - return nil, errors.Wrap(errInvalidSize, "index header") + return nil, errors.Wrap(encoding.ErrInvalidSize, "index header") } if m := binary.BigEndian.Uint32(r.b.Range(0, 4)); m != MagicIndex { return nil, errors.Errorf("invalid magic number %x", m) @@ -724,13 +719,13 @@ func (r *Reader) PostingsRanges() (map[labels.Label]Range, error) { for k, e := range r.postings { for v, start := range e { - d := newDecbufAt(r.b, int(start)) - if d.err() != nil { - return nil, d.err() + d := encoding.NewDecbufAt(r.b, int(start), castagnoliTable) + if d.Err() != nil { + return nil, d.Err() } m[labels.Label{Name: k, Value: v}] = Range{ Start: int64(start) + 4, - End: int64(start) + 4 + int64(d.len()), + End: int64(start) + 4 + int64(d.Len()), } } } @@ -744,13 +739,13 @@ func ReadSymbols(bs ByteSlice, version int, off int) ([]string, map[uint32]strin if off == 0 { return nil, nil, nil } - d := newDecbufAt(bs, off) + d := encoding.NewDecbufAt(bs, off, castagnoliTable) var ( - origLen = d.len() - cnt = d.be32int() + origLen = d.Len() + cnt = d.Be32int() basePos = uint32(off) + 4 - nextPos = basePos + uint32(origLen-d.len()) + nextPos = basePos + uint32(origLen-d.Len()) symbolSlice []string symbols = map[uint32]string{} ) @@ -758,35 +753,35 @@ func ReadSymbols(bs ByteSlice, version int, off int) ([]string, map[uint32]strin symbolSlice = make([]string, 0, cnt) } - for d.err() == nil && d.len() > 0 && cnt > 0 { - s := d.uvarintStr() + for d.Err() == nil && d.Len() > 0 && cnt > 0 { + s := d.UvarintStr() if version == FormatV2 { symbolSlice = append(symbolSlice, s) } else { symbols[nextPos] = s - nextPos = basePos + uint32(origLen-d.len()) + nextPos = basePos + uint32(origLen-d.Len()) } cnt-- } - return symbolSlice, symbols, errors.Wrap(d.err(), "read symbols") + return symbolSlice, symbols, errors.Wrap(d.Err(), "read symbols") } // ReadOffsetTable reads an offset table and at the given position calls f for each // found entry. If f returns an error it stops decoding and returns the received error. func ReadOffsetTable(bs ByteSlice, off uint64, f func([]string, uint64) error) error { - d := newDecbufAt(bs, int(off)) - cnt := d.be32() + d := encoding.NewDecbufAt(bs, int(off), castagnoliTable) + cnt := d.Be32() - for d.err() == nil && d.len() > 0 && cnt > 0 { - keyCount := d.uvarint() + for d.Err() == nil && d.Len() > 0 && cnt > 0 { + keyCount := d.Uvarint() keys := make([]string, 0, keyCount) for i := 0; i < keyCount; i++ { - keys = append(keys, d.uvarintStr()) + keys = append(keys, d.UvarintStr()) } - o := d.uvarint64() - if d.err() != nil { + o := d.Uvarint64() + if d.Err() != nil { break } if err := f(keys, o); err != nil { @@ -794,7 +789,7 @@ func ReadOffsetTable(bs ByteSlice, off uint64, f func([]string, uint64) error) e } cnt-- } - return d.err() + return d.Err() } // Close the reader and its underlying resources. @@ -843,17 +838,17 @@ func (r *Reader) LabelValues(names ...string) (StringTuples, error) { //return nil, fmt.Errorf("label index doesn't exist") } - d := newDecbufAt(r.b, int(off)) + d := encoding.NewDecbufAt(r.b, int(off), castagnoliTable) - nc := d.be32int() - d.be32() // consume unused value entry count. + nc := d.Be32int() + d.Be32() // consume unused value entry count. 
- if d.err() != nil { - return nil, errors.Wrap(d.err(), "read label value index") + if d.Err() != nil { + return nil, errors.Wrap(d.Err(), "read label value index") } st := &serializedStringTuples{ idsCount: nc, - idsBytes: d.get(), + idsBytes: d.Get(), lookup: r.lookupSymbol, } return st, nil @@ -882,11 +877,11 @@ func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) err if r.version == FormatV2 { offset = id * 16 } - d := newDecbufUvarintAt(r.b, int(offset)) - if d.err() != nil { - return d.err() + d := encoding.NewDecbufUvarintAt(r.b, int(offset), castagnoliTable) + if d.Err() != nil { + return d.Err() } - return errors.Wrap(r.dec.Series(d.get(), lbls, chks), "read series") + return errors.Wrap(r.dec.Series(d.Get(), lbls, chks), "read series") } // Postings returns a postings list for the given label pair. @@ -899,11 +894,11 @@ func (r *Reader) Postings(name, value string) (Postings, error) { if !ok { return EmptyPostings(), nil } - d := newDecbufAt(r.b, int(off)) - if d.err() != nil { - return nil, errors.Wrap(d.err(), "get postings entry") + d := encoding.NewDecbufAt(r.b, int(off), castagnoliTable) + if d.Err() != nil { + return nil, errors.Wrap(d.Err(), "get postings entry") } - _, p, err := r.dec.Postings(d.get()) + _, p, err := r.dec.Postings(d.Get()) if err != nil { return nil, errors.Wrap(err, "decode postings") } @@ -952,7 +947,7 @@ type stringTuples struct { func NewStringTuples(entries []string, length int) (*stringTuples, error) { if len(entries)%length != 0 { - return nil, errors.Wrap(errInvalidSize, "string tuple list") + return nil, errors.Wrap(encoding.ErrInvalidSize, "string tuple list") } return &stringTuples{entries: entries, length: length}, nil } @@ -996,7 +991,7 @@ func (t *serializedStringTuples) Len() int { func (t *serializedStringTuples) At(i int) ([]string, error) { if len(t.idsBytes) < (i+t.idsCount)*4 { - return nil, errInvalidSize + return nil, encoding.ErrInvalidSize } res := make([]string, 0, t.idsCount) @@ -1023,10 +1018,10 @@ type Decoder struct { // Postings returns a postings list for b and its number of elements. func (dec *Decoder) Postings(b []byte) (int, Postings, error) { - d := decbuf{b: b} - n := d.be32int() - l := d.get() - return n, newBigEndianPostings(l), d.err() + d := encoding.Decbuf{B: b} + n := d.Be32int() + l := d.Get() + return n, newBigEndianPostings(l), d.Err() } // Series decodes a series entry from the given byte slice into lset and chks. @@ -1034,16 +1029,16 @@ func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) e *lbls = (*lbls)[:0] *chks = (*chks)[:0] - d := decbuf{b: b} + d := encoding.Decbuf{B: b} - k := d.uvarint() + k := d.Uvarint() for i := 0; i < k; i++ { - lno := uint32(d.uvarint()) - lvo := uint32(d.uvarint()) + lno := uint32(d.Uvarint()) + lvo := uint32(d.Uvarint()) - if d.err() != nil { - return errors.Wrap(d.err(), "read series label offsets") + if d.Err() != nil { + return errors.Wrap(d.Err(), "read series label offsets") } ln, err := dec.LookupSymbol(lno) @@ -1059,15 +1054,15 @@ func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) e } // Read the chunks meta data. 
- k = d.uvarint() + k = d.Uvarint() if k == 0 { return nil } - t0 := d.varint64() - maxt := int64(d.uvarint64()) + t0 - ref0 := int64(d.uvarint64()) + t0 := d.Varint64() + maxt := int64(d.Uvarint64()) + t0 + ref0 := int64(d.Uvarint64()) *chks = append(*chks, chunks.Meta{ Ref: uint64(ref0), @@ -1077,14 +1072,14 @@ func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) e t0 = maxt for i := 1; i < k; i++ { - mint := int64(d.uvarint64()) + t0 - maxt := int64(d.uvarint64()) + mint + mint := int64(d.Uvarint64()) + t0 + maxt := int64(d.Uvarint64()) + mint - ref0 += d.varint64() + ref0 += d.Varint64() t0 = maxt - if d.err() != nil { - return errors.Wrapf(d.err(), "read meta for chunk %d", i) + if d.Err() != nil { + return errors.Wrapf(d.Err(), "read meta for chunk %d", i) } *chks = append(*chks, chunks.Meta{ @@ -1093,5 +1088,5 @@ func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) e MaxTime: maxt, }) } - return d.err() + return d.Err() } diff --git a/vendor/github.com/prometheus/tsdb/index/postings.go b/vendor/github.com/prometheus/tsdb/index/postings.go index 6212d07b4..cbad5b74d 100644 --- a/vendor/github.com/prometheus/tsdb/index/postings.go +++ b/vendor/github.com/prometheus/tsdb/index/postings.go @@ -14,6 +14,7 @@ package index import ( + "container/heap" "encoding/binary" "runtime" "sort" @@ -365,25 +366,132 @@ func Merge(its ...Postings) Postings { if len(its) == 1 { return its[0] } - // All the uses of this function immediately expand it, so - // collect everything in a map. This is more efficient - // when there's 100ks of postings, compared to - // having a tree of merge objects. - pm := make(map[uint64]struct{}, len(its)) - for _, it := range its { - for it.Next() { - pm[it.At()] = struct{}{} - } - if it.Err() != nil { - return ErrPostings(it.Err()) + return newMergedPostings(its) +} + +type postingsHeap []Postings + +func (h postingsHeap) Len() int { return len(h) } +func (h postingsHeap) Less(i, j int) bool { return h[i].At() < h[j].At() } +func (h *postingsHeap) Swap(i, j int) { (*h)[i], (*h)[j] = (*h)[j], (*h)[i] } + +func (h *postingsHeap) Push(x interface{}) { + *h = append(*h, x.(Postings)) +} + +func (h *postingsHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} + +type mergedPostings struct { + h postingsHeap + initialized bool + heaped bool + cur uint64 + err error +} + +func newMergedPostings(p []Postings) *mergedPostings { + ph := make(postingsHeap, 0, len(p)) + for _, it := range p { + if it.Next() { + ph = append(ph, it) + } else { + if it.Err() != nil { + return &mergedPostings{err: it.Err()} + } } } - pl := make([]uint64, 0, len(pm)) - for p := range pm { - pl = append(pl, p) + return &mergedPostings{h: ph} +} + +func (it *mergedPostings) Next() bool { + if it.h.Len() == 0 || it.err != nil { + return false } - sort.Slice(pl, func(i, j int) bool { return pl[i] < pl[j] }) - return newListPostings(pl) + + if !it.heaped { + heap.Init(&it.h) + it.heaped = true + } + // The user must issue an initial Next. + if !it.initialized { + it.cur = it.h[0].At() + it.initialized = true + return true + } + + for { + cur := it.h[0] + if !cur.Next() { + heap.Pop(&it.h) + if cur.Err() != nil { + it.err = cur.Err() + return false + } + if it.h.Len() == 0 { + return false + } + } else { + // Value of top of heap has changed, re-heapify.
+ heap.Fix(&it.h, 0) + } + + if it.h[0].At() != it.cur { + it.cur = it.h[0].At() + return true + } + } +} + +func (it *mergedPostings) Seek(id uint64) bool { + if it.h.Len() == 0 || it.err != nil { + return false + } + if !it.initialized { + if !it.Next() { + return false + } + } + if it.cur >= id { + return true + } + // Heapifying when there are lots of Seeks is inefficient, + // mark to be re-heapified on the Next() call. + it.heaped = false + newH := make(postingsHeap, 0, len(it.h)) + lowest := ^uint64(0) + for _, i := range it.h { + if i.Seek(id) { + newH = append(newH, i) + if i.At() < lowest { + lowest = i.At() + } + } else { + if i.Err() != nil { + it.err = i.Err() + return false + } + } + } + it.h = newH + if len(it.h) == 0 { + return false + } + it.cur = lowest + return true +} + +func (it mergedPostings) At() uint64 { + return it.cur +} + +func (it mergedPostings) Err() error { + return it.err } // Without returns a new postings list that contains all elements from the full list that @@ -498,6 +606,9 @@ func (it *listPostings) Seek(x uint64) bool { if it.cur >= x { return true } + if len(it.list) == 0 { + return false + } // Do binary search between current position and end. i := sort.Search(len(it.list), func(i int) bool { diff --git a/vendor/github.com/prometheus/tsdb/querier.go b/vendor/github.com/prometheus/tsdb/querier.go index 61503d672..3e8cd77ca 100644 --- a/vendor/github.com/prometheus/tsdb/querier.go +++ b/vendor/github.com/prometheus/tsdb/querier.go @@ -354,11 +354,23 @@ func postingsForUnsetLabelMatcher(ix IndexReader, m labels.Matcher) (index.Posti rit = append(rit, it) } + merged := index.Merge(rit...) + // With very many postings, it's best to pre-calculate + // the merged list via Next rather than have a ton of Seeks + // in Without/Intersection. + if len(rit) > 100 { + pl, err := index.ExpandPostings(merged) + if err != nil { + return nil, err + } + merged = index.NewListPostings(pl) + } + allPostings, err := ix.Postings(index.AllPostingsKey()) if err != nil { return nil, err } - return index.Without(allPostings, index.Merge(rit...)), nil + return index.Without(allPostings, merged), nil } func mergeStrings(a, b []string) []string { diff --git a/vendor/github.com/prometheus/tsdb/record.go b/vendor/github.com/prometheus/tsdb/record.go index 364e8144d..8d9c5751d 100644 --- a/vendor/github.com/prometheus/tsdb/record.go +++ b/vendor/github.com/prometheus/tsdb/record.go @@ -19,6 +19,7 @@ import ( "sort" "github.com/pkg/errors" + "github.com/prometheus/tsdb/encoding" "github.com/prometheus/tsdb/labels" ) @@ -56,19 +57,19 @@ func (d *RecordDecoder) Type(rec []byte) RecordType { // Series appends series in rec to the given slice.
func (d *RecordDecoder) Series(rec []byte, series []RefSeries) ([]RefSeries, error) { - dec := decbuf{b: rec} + dec := encoding.Decbuf{B: rec} - if RecordType(dec.byte()) != RecordSeries { + if RecordType(dec.Byte()) != RecordSeries { return nil, errors.New("invalid record type") } - for len(dec.b) > 0 && dec.err() == nil { - ref := dec.be64() + for len(dec.B) > 0 && dec.Err() == nil { + ref := dec.Be64() - lset := make(labels.Labels, dec.uvarint()) + lset := make(labels.Labels, dec.Uvarint()) for i := range lset { - lset[i].Name = dec.uvarintStr() - lset[i].Value = dec.uvarintStr() + lset[i].Name = dec.UvarintStr() + lset[i].Value = dec.UvarintStr() } sort.Sort(lset) @@ -77,33 +78,33 @@ func (d *RecordDecoder) Series(rec []byte, series []RefSeries) ([]RefSeries, err Labels: lset, }) } - if dec.err() != nil { - return nil, dec.err() + if dec.Err() != nil { + return nil, dec.Err() } - if len(dec.b) > 0 { - return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) + if len(dec.B) > 0 { + return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.B)) } return series, nil } // Samples appends samples in rec to the given slice. func (d *RecordDecoder) Samples(rec []byte, samples []RefSample) ([]RefSample, error) { - dec := decbuf{b: rec} + dec := encoding.Decbuf{B: rec} - if RecordType(dec.byte()) != RecordSamples { + if RecordType(dec.Byte()) != RecordSamples { return nil, errors.New("invalid record type") } - if dec.len() == 0 { + if dec.Len() == 0 { return samples, nil } var ( - baseRef = dec.be64() - baseTime = dec.be64int64() + baseRef = dec.Be64() + baseTime = dec.Be64int64() ) - for len(dec.b) > 0 && dec.err() == nil { - dref := dec.varint64() - dtime := dec.varint64() - val := dec.be64() + for len(dec.B) > 0 && dec.Err() == nil { + dref := dec.Varint64() + dtime := dec.Varint64() + val := dec.Be64() samples = append(samples, RefSample{ Ref: uint64(int64(baseRef) + dref), @@ -112,35 +113,35 @@ func (d *RecordDecoder) Samples(rec []byte, samples []RefSample) ([]RefSample, e }) } - if dec.err() != nil { - return nil, errors.Wrapf(dec.err(), "decode error after %d samples", len(samples)) + if dec.Err() != nil { + return nil, errors.Wrapf(dec.Err(), "decode error after %d samples", len(samples)) } - if len(dec.b) > 0 { - return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) + if len(dec.B) > 0 { + return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.B)) } return samples, nil } // Tombstones appends tombstones in rec to the given slice. 
func (d *RecordDecoder) Tombstones(rec []byte, tstones []Stone) ([]Stone, error) { - dec := decbuf{b: rec} + dec := encoding.Decbuf{B: rec} - if RecordType(dec.byte()) != RecordTombstones { + if RecordType(dec.Byte()) != RecordTombstones { return nil, errors.New("invalid record type") } - for dec.len() > 0 && dec.err() == nil { + for dec.Len() > 0 && dec.Err() == nil { tstones = append(tstones, Stone{ - ref: dec.be64(), + ref: dec.Be64(), intervals: Intervals{ - {Mint: dec.varint64(), Maxt: dec.varint64()}, + {Mint: dec.Varint64(), Maxt: dec.Varint64()}, }, }) } - if dec.err() != nil { - return nil, dec.err() + if dec.Err() != nil { + return nil, dec.Err() } - if len(dec.b) > 0 { - return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) + if len(dec.B) > 0 { + return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.B)) } return tstones, nil } @@ -152,56 +153,56 @@ type RecordEncoder struct { // Series appends the encoded series to b and returns the resulting slice. func (e *RecordEncoder) Series(series []RefSeries, b []byte) []byte { - buf := encbuf{b: b} - buf.putByte(byte(RecordSeries)) + buf := encoding.Encbuf{B: b} + buf.PutByte(byte(RecordSeries)) for _, s := range series { - buf.putBE64(s.Ref) - buf.putUvarint(len(s.Labels)) + buf.PutBE64(s.Ref) + buf.PutUvarint(len(s.Labels)) for _, l := range s.Labels { - buf.putUvarintStr(l.Name) - buf.putUvarintStr(l.Value) + buf.PutUvarintStr(l.Name) + buf.PutUvarintStr(l.Value) } } - return buf.get() + return buf.Get() } // Samples appends the encoded samples to b and returns the resulting slice. func (e *RecordEncoder) Samples(samples []RefSample, b []byte) []byte { - buf := encbuf{b: b} - buf.putByte(byte(RecordSamples)) + buf := encoding.Encbuf{B: b} + buf.PutByte(byte(RecordSamples)) if len(samples) == 0 { - return buf.get() + return buf.Get() } // Store base timestamp and base reference number of first sample. // All samples encode their timestamp and ref as delta to those. first := samples[0] - buf.putBE64(first.Ref) - buf.putBE64int64(first.T) + buf.PutBE64(first.Ref) + buf.PutBE64int64(first.T) for _, s := range samples { - buf.putVarint64(int64(s.Ref) - int64(first.Ref)) - buf.putVarint64(s.T - first.T) - buf.putBE64(math.Float64bits(s.V)) + buf.PutVarint64(int64(s.Ref) - int64(first.Ref)) + buf.PutVarint64(s.T - first.T) + buf.PutBE64(math.Float64bits(s.V)) } - return buf.get() + return buf.Get() } // Tombstones appends the encoded tombstones to b and returns the resulting slice. 
func (e *RecordEncoder) Tombstones(tstones []Stone, b []byte) []byte { - buf := encbuf{b: b} - buf.putByte(byte(RecordTombstones)) + buf := encoding.Encbuf{B: b} + buf.PutByte(byte(RecordTombstones)) for _, s := range tstones { for _, iv := range s.intervals { - buf.putBE64(s.ref) - buf.putVarint64(iv.Mint) - buf.putVarint64(iv.Maxt) + buf.PutBE64(s.ref) + buf.PutVarint64(iv.Mint) + buf.PutVarint64(iv.Maxt) } } - return buf.get() + return buf.Get() } diff --git a/vendor/github.com/prometheus/tsdb/tombstones.go b/vendor/github.com/prometheus/tsdb/tombstones.go index 078140406..4c36c7b2a 100644 --- a/vendor/github.com/prometheus/tsdb/tombstones.go +++ b/vendor/github.com/prometheus/tsdb/tombstones.go @@ -23,6 +23,7 @@ import ( "sync" "github.com/pkg/errors" + "github.com/prometheus/tsdb/encoding" ) const tombstoneFilename = "tombstones" @@ -64,12 +65,12 @@ func writeTombstoneFile(dir string, tr TombstoneReader) error { } }() - buf := encbuf{b: make([]byte, 3*binary.MaxVarintLen64)} - buf.reset() + buf := encoding.Encbuf{B: make([]byte, 3*binary.MaxVarintLen64)} + buf.Reset() // Write the meta. - buf.putBE32(MagicTombstone) - buf.putByte(tombstoneFormatV1) - _, err = f.Write(buf.get()) + buf.PutBE32(MagicTombstone) + buf.PutByte(tombstoneFormatV1) + _, err = f.Write(buf.Get()) if err != nil { return err } @@ -78,13 +79,13 @@ func writeTombstoneFile(dir string, tr TombstoneReader) error { if err := tr.Iter(func(ref uint64, ivs Intervals) error { for _, iv := range ivs { - buf.reset() + buf.Reset() - buf.putUvarint64(ref) - buf.putVarint64(iv.Mint) - buf.putVarint64(iv.Maxt) + buf.PutUvarint64(ref) + buf.PutVarint64(iv.Mint) + buf.PutVarint64(iv.Maxt) - _, err = mw.Write(buf.get()) + _, err = mw.Write(buf.Get()) if err != nil { return err } @@ -126,24 +127,24 @@ func readTombstones(dir string) (TombstoneReader, SizeReader, error) { } if len(b) < 5 { - return nil, sr, errors.Wrap(errInvalidSize, "tombstones header") + return nil, sr, errors.Wrap(encoding.ErrInvalidSize, "tombstones header") } - d := &decbuf{b: b[:len(b)-4]} // 4 for the checksum. - if mg := d.be32(); mg != MagicTombstone { + d := &encoding.Decbuf{B: b[:len(b)-4]} // 4 for the checksum. + if mg := d.Be32(); mg != MagicTombstone { return nil, sr, fmt.Errorf("invalid magic number %x", mg) } - if flag := d.byte(); flag != tombstoneFormatV1 { + if flag := d.Byte(); flag != tombstoneFormatV1 { return nil, sr, fmt.Errorf("invalid tombstone format %x", flag) } - if d.err() != nil { - return nil, sr, d.err() + if d.Err() != nil { + return nil, sr, d.Err() } // Verify checksum. 
hash := newCRC32() - if _, err := hash.Write(d.get()); err != nil { + if _, err := hash.Write(d.Get()); err != nil { return nil, sr, errors.Wrap(err, "write to hash") } if binary.BigEndian.Uint32(b[len(b)-4:]) != hash.Sum32() { @@ -152,12 +153,12 @@ func readTombstones(dir string) (TombstoneReader, SizeReader, error) { stonesMap := newMemTombstones() - for d.len() > 0 { - k := d.uvarint64() - mint := d.varint64() - maxt := d.varint64() - if d.err() != nil { - return nil, sr, d.err() + for d.Len() > 0 { + k := d.Uvarint64() + mint := d.Varint64() + maxt := d.Varint64() + if d.Err() != nil { + return nil, sr, d.Err() } stonesMap.addInterval(k, Interval{mint, maxt}) diff --git a/vendor/github.com/prometheus/tsdb/wal.go b/vendor/github.com/prometheus/tsdb/wal.go index 60e1c5807..d7ffe0c1e 100644 --- a/vendor/github.com/prometheus/tsdb/wal.go +++ b/vendor/github.com/prometheus/tsdb/wal.go @@ -31,6 +31,7 @@ import ( "github.com/go-kit/kit/log/level" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/tsdb/encoding" "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/wal" @@ -287,16 +288,16 @@ func (w *SegmentWAL) Reader() WALReader { } } -func (w *SegmentWAL) getBuffer() *encbuf { +func (w *SegmentWAL) getBuffer() *encoding.Encbuf { b := w.buffers.Get() if b == nil { - return &encbuf{b: make([]byte, 0, 64*1024)} + return &encoding.Encbuf{B: make([]byte, 0, 64*1024)} } - return b.(*encbuf) + return b.(*encoding.Encbuf) } -func (w *SegmentWAL) putBuffer(b *encbuf) { - b.reset() +func (w *SegmentWAL) putBuffer(b *encoding.Encbuf) { + b.Reset() w.buffers.Put(b) } @@ -366,7 +367,7 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error { buf := w.getBuffer() flag = w.encodeSeries(buf, activeSeries) - _, err = w.writeTo(csf, crc32, WALEntrySeries, flag, buf.get()) + _, err = w.writeTo(csf, crc32, WALEntrySeries, flag, buf.Get()) w.putBuffer(buf) if err != nil { @@ -427,7 +428,7 @@ func (w *SegmentWAL) LogSeries(series []RefSeries) error { w.mtx.Lock() defer w.mtx.Unlock() - err := w.write(WALEntrySeries, flag, buf.get()) + err := w.write(WALEntrySeries, flag, buf.Get()) w.putBuffer(buf) @@ -454,7 +455,7 @@ func (w *SegmentWAL) LogSamples(samples []RefSample) error { w.mtx.Lock() defer w.mtx.Unlock() - err := w.write(WALEntrySamples, flag, buf.get()) + err := w.write(WALEntrySamples, flag, buf.Get()) w.putBuffer(buf) @@ -480,7 +481,7 @@ func (w *SegmentWAL) LogDeletes(stones []Stone) error { w.mtx.Lock() defer w.mtx.Unlock() - err := w.write(WALEntryDeletes, flag, buf.get()) + err := w.write(WALEntryDeletes, flag, buf.Get()) w.putBuffer(buf) @@ -783,20 +784,20 @@ const ( walDeletesSimple = 1 ) -func (w *SegmentWAL) encodeSeries(buf *encbuf, series []RefSeries) uint8 { +func (w *SegmentWAL) encodeSeries(buf *encoding.Encbuf, series []RefSeries) uint8 { for _, s := range series { - buf.putBE64(s.Ref) - buf.putUvarint(len(s.Labels)) + buf.PutBE64(s.Ref) + buf.PutUvarint(len(s.Labels)) for _, l := range s.Labels { - buf.putUvarintStr(l.Name) - buf.putUvarintStr(l.Value) + buf.PutUvarintStr(l.Name) + buf.PutUvarintStr(l.Value) } } return walSeriesSimple } -func (w *SegmentWAL) encodeSamples(buf *encbuf, samples []RefSample) uint8 { +func (w *SegmentWAL) encodeSamples(buf *encoding.Encbuf, samples []RefSample) uint8 { if len(samples) == 0 { return walSamplesSimple } @@ -806,23 +807,23 @@ func (w *SegmentWAL) encodeSamples(buf *encbuf, samples []RefSample) uint8 { // TODO(fabxc): optimize for all 
diff --git a/vendor/github.com/prometheus/tsdb/wal/wal.go b/vendor/github.com/prometheus/tsdb/wal/wal.go
index dd0be9cd8..b37032604 100644
--- a/vendor/github.com/prometheus/tsdb/wal/wal.go
+++ b/vendor/github.com/prometheus/tsdb/wal/wal.go
@@ -228,19 +228,23 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi
 	}
 	// Fresh dir, no segments yet.
 	if j == -1 {
-		if w.segment, err = CreateSegment(w.dir, 0); err != nil {
-			return nil, err
-		}
-	} else {
-		if w.segment, err = OpenWriteSegment(logger, w.dir, j); err != nil {
-			return nil, err
-		}
-		// Correctly initialize donePages.
-		stat, err := w.segment.Stat()
+		segment, err := CreateSegment(w.dir, 0)
 		if err != nil {
 			return nil, err
 		}
-		w.donePages = int(stat.Size() / pageSize)
+
+		if err := w.setSegment(segment); err != nil {
+			return nil, err
+		}
+	} else {
+		segment, err := OpenWriteSegment(logger, w.dir, j)
+		if err != nil {
+			return nil, err
+		}
+
+		if err := w.setSegment(segment); err != nil {
+			return nil, err
+		}
 	}
 
 	go w.run()
@@ -331,7 +335,9 @@ func (w *WAL) Repair(origErr error) error {
 		if err != nil {
 			return err
 		}
-		w.segment = s
+		if err := w.setSegment(s); err != nil {
+			return err
+		}
 
 		f, err := os.Open(tmpfn)
 		if err != nil {
@@ -382,8 +388,9 @@ func (w *WAL) nextSegment() error {
 		return errors.Wrap(err, "create new segment file")
 	}
 	prev := w.segment
-	w.segment = next
-	w.donePages = 0
+	if err := w.setSegment(next); err != nil {
+		return err
+	}
 
 	// Don't block further writes by fsyncing the last segment.
 	w.actorc <- func() {
@@ -397,6 +404,19 @@ func (w *WAL) nextSegment() error {
 	return nil
 }
 
+func (w *WAL) setSegment(segment *Segment) error {
+	w.segment = segment
+
+	// Correctly initialize donePages.
+	stat, err := segment.Stat()
+	if err != nil {
+		return err
+	}
+	w.donePages = int(stat.Size() / pageSize)
+
+	return nil
+}
+
 // flushPage writes the new contents of the page to disk. If no more records will fit into
 // the page, the remaining bytes will be set to zero and a new page will be started.
 // If clear is true, this is enforced regardless of how many bytes are left in the page.
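Reviewer note (not part of the patch): the new setSegment helper centralizes bookkeeping that was previously scattered. As the hunks above show, Repair used to assign w.segment directly without recomputing donePages, so the writer's page accounting could disagree with the file on disk; now every path that swaps the active segment re-derives donePages from the segment's size. A small sketch of the invariant, under the assumption that pageSize is 32KiB as in this package; donePagesFor is a hypothetical stand-in for the computation inside setSegment:

	package main

	import (
		"fmt"
		"os"
	)

	// Assumption: the wal package uses 32KiB pages; restated here so
	// the sketch is self-contained.
	const pageSize = 32 * 1024

	// donePagesFor mirrors the computation in (*WAL).setSegment: the
	// number of complete pages already persisted in the segment file.
	func donePagesFor(f *os.File) (int, error) {
		stat, err := f.Stat()
		if err != nil {
			return 0, err
		}
		return int(stat.Size() / pageSize), nil
	}

	func main() {
		f, err := os.CreateTemp("", "segment")
		if err != nil {
			panic(err)
		}
		defer os.Remove(f.Name())
		defer f.Close()

		// Pretend two full pages plus a partial third were flushed.
		if err := f.Truncate(2*pageSize + 100); err != nil {
			panic(err)
		}
		n, err := donePagesFor(f)
		if err != nil {
			panic(err)
		}
		fmt.Println(n) // 2: the partial third page is not yet done
	}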
errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) + if len(dec.B) > 0 { + return errors.Errorf("unexpected %d bytes left in entry", len(dec.B)) } return nil } diff --git a/vendor/github.com/prometheus/tsdb/wal/wal.go b/vendor/github.com/prometheus/tsdb/wal/wal.go index dd0be9cd8..b37032604 100644 --- a/vendor/github.com/prometheus/tsdb/wal/wal.go +++ b/vendor/github.com/prometheus/tsdb/wal/wal.go @@ -228,19 +228,23 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi } // Fresh dir, no segments yet. if j == -1 { - if w.segment, err = CreateSegment(w.dir, 0); err != nil { - return nil, err - } - } else { - if w.segment, err = OpenWriteSegment(logger, w.dir, j); err != nil { - return nil, err - } - // Correctly initialize donePages. - stat, err := w.segment.Stat() + segment, err := CreateSegment(w.dir, 0) if err != nil { return nil, err } - w.donePages = int(stat.Size() / pageSize) + + if err := w.setSegment(segment); err != nil { + return nil, err + } + } else { + segment, err := OpenWriteSegment(logger, w.dir, j) + if err != nil { + return nil, err + } + + if err := w.setSegment(segment); err != nil { + return nil, err + } } go w.run() @@ -331,7 +335,9 @@ func (w *WAL) Repair(origErr error) error { if err != nil { return err } - w.segment = s + if err := w.setSegment(s); err != nil { + return err + } f, err := os.Open(tmpfn) if err != nil { @@ -382,8 +388,9 @@ func (w *WAL) nextSegment() error { return errors.Wrap(err, "create new segment file") } prev := w.segment - w.segment = next - w.donePages = 0 + if err := w.setSegment(next); err != nil { + return err + } // Don't block further writes by fsyncing the last segment. w.actorc <- func() { @@ -397,6 +404,19 @@ func (w *WAL) nextSegment() error { return nil } +func (w *WAL) setSegment(segment *Segment) error { + w.segment = segment + + // Correctly initialize donePages. + stat, err := segment.Stat() + if err != nil { + return err + } + w.donePages = int(stat.Size() / pageSize) + + return nil +} + // flushPage writes the new contents of the page to disk. If no more records will fit into // the page, the remaining bytes will be set to zero and a new page will be started. // If clear is true, this is enforced regardless of how many bytes are left in the page. diff --git a/vendor/modules.txt b/vendor/modules.txt index e307a2e02..f67564bbe 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -242,13 +242,14 @@ github.com/prometheus/procfs github.com/prometheus/procfs/nfs github.com/prometheus/procfs/xfs github.com/prometheus/procfs/internal/util -# github.com/prometheus/tsdb v0.4.1-0.20190219143357-77d5a7d47a52 +# github.com/prometheus/tsdb v0.6.0 github.com/prometheus/tsdb github.com/prometheus/tsdb/fileutil github.com/prometheus/tsdb/wal github.com/prometheus/tsdb/labels github.com/prometheus/tsdb/chunkenc github.com/prometheus/tsdb/chunks +github.com/prometheus/tsdb/encoding github.com/prometheus/tsdb/index # github.com/samuel/go-zookeeper v0.0.0-20161028232340-1d7be4effb13 github.com/samuel/go-zookeeper/zk