diff --git a/pkg/histogram/sparse_histogram.go b/pkg/histogram/sparse_histogram.go index 0e4df9372..5092760a1 100644 --- a/pkg/histogram/sparse_histogram.go +++ b/pkg/histogram/sparse_histogram.go @@ -25,3 +25,26 @@ type Span struct { Offset int32 Length uint32 } + +func (s SparseHistogram) Copy() SparseHistogram { + c := s + // c := s copied the scalar fields but c still aliases s's slices; give each non-nil slice its own backing array (nil slices stay nil). + if s.PositiveSpans != nil { + c.PositiveSpans = make([]Span, len(s.PositiveSpans)) + copy(c.PositiveSpans, s.PositiveSpans) + } + if s.NegativeSpans != nil { + c.NegativeSpans = make([]Span, len(s.NegativeSpans)) + copy(c.NegativeSpans, s.NegativeSpans) + } + if s.PositiveBuckets != nil { + c.PositiveBuckets = make([]int64, len(s.PositiveBuckets)) + copy(c.PositiveBuckets, s.PositiveBuckets) + } + if s.NegativeBuckets != nil { + c.NegativeBuckets = make([]int64, len(s.NegativeBuckets)) + copy(c.NegativeBuckets, s.NegativeBuckets) + } + + return c +} diff --git a/tsdb/chunkenc/histo.go b/tsdb/chunkenc/histo.go index a043f4caf..fcf5cdb83 100644 --- a/tsdb/chunkenc/histo.go +++ b/tsdb/chunkenc/histo.go @@ -75,10 +75,6 @@ const () type HistoChunk struct { b bstream - - // "metadata" describing all the data within this chunk - schema int32 - posSpans, negSpans []histogram.Span } // NewHistoChunk returns a new chunk with Histo encoding of the given size. @@ -102,6 +98,16 @@ func (c *HistoChunk) NumSamples() int { return int(binary.BigEndian.Uint16(c.Bytes())) } +// Meta returns the schema, positive spans and negative spans decoded from the chunk's metadata header (the bytes after the 2-byte sample count). 
+// Callers may only call this on chunks that have at least one sample; it panics on an empty chunk. +func (c *HistoChunk) Meta() (int32, []histogram.Span, []histogram.Span, error) { + if c.NumSamples() == 0 { + panic("HistoChunk.Meta() called on an empty chunk") + } + b := newBReader(c.Bytes()[2:]) + return readHistoChunkMeta(&b) +} + func (c *HistoChunk) Compact() { if l := len(c.b.stream); cap(c.b.stream) > l+chunkCompactCapacityThreshold { buf := make([]byte, l) @@ -124,13 +130,11 @@ func (c *HistoChunk) Appender() (Appender, error) { } a := &histoAppender{ - c: c, b: &c.b, - schema: c.schema, - posSpans: c.posSpans, - negSpans: c.negSpans, - + schema: it.schema, + posSpans: it.posSpans, + negSpans: it.negSpans, t: it.t, cnt: it.cnt, zcnt: it.zcnt, @@ -154,8 +158,16 @@ func (c *HistoChunk) Appender() (Appender, error) { return a, nil } -// TODO fix this +func countSpans(spans []histogram.Span) int { + var cnt int + for _, s := range spans { + cnt += int(s.Length) + } + return cnt +} + func (c *HistoChunk) iterator(it Iterator) *histoIterator { + // TODO fix this. this is taken from xor.go // Should iterators guarantee to act on a copy of the data so it doesn't lock append? // When using striped locks to guard access to chunks, probably yes. // Could only copy data if the chunk is not completed yet. @@ -164,29 +176,12 @@ func (c *HistoChunk) iterator(it Iterator) *histoIterator { // return histoIter //} - var numPosBuckets, numNegBuckets int - for _, s := range c.posSpans { - numPosBuckets += int(s.Length) - } - for _, s := range c.negSpans { - numNegBuckets += int(s.Length) - } - return &histoIterator{ // The first 2 bytes contain chunk headers. // We skip that for actual samples. 
br: newBReader(c.b.bytes()[2:]), numTotal: binary.BigEndian.Uint16(c.b.bytes()), t: math.MinInt64, - - schema: c.schema, - posSpans: c.posSpans, - negSpans: c.negSpans, - - posbuckets: make([]int64, numPosBuckets), - negbuckets: make([]int64, numNegBuckets), - posbucketsDelta: make([]int64, numPosBuckets), - negbucketsDelta: make([]int64, numNegBuckets), } } @@ -196,8 +191,6 @@ func (c *HistoChunk) Iterator(it Iterator) Iterator { } type histoAppender struct { - c *HistoChunk // this is such that during the first append we can set the metadata on the chunk. not sure if that's how it should work - b *bstream // Meta @@ -233,27 +226,6 @@ func putUvarint(b *bstream, buf []byte, x uint64) { } } -// we use this for millisec timestamps and all counts -// for now this is copied from xor.go - we will probably want to be more conservative (use fewer bits for small values) - can be tweaked later -func putDod(b *bstream, dod int64) { - switch { - case dod == 0: - b.writeBit(zero) - case bitRange(dod, 14): - b.writeBits(0x02, 2) // '10' - b.writeBits(uint64(dod), 14) - case bitRange(dod, 17): - b.writeBits(0x06, 3) // '110' - b.writeBits(uint64(dod), 17) - case bitRange(dod, 20): - b.writeBits(0x0e, 4) // '1110' - b.writeBits(uint64(dod), 20) - default: - b.writeBits(0x0f, 4) // '1111' - b.writeBits(uint64(dod), 64) - } -} - func (a *histoAppender) Append(int64, float64) { panic("cannot call histoAppender.Append().") } @@ -265,14 +237,19 @@ func (a *histoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) { num := binary.BigEndian.Uint16(a.b.bytes()) if num == 0 { - // the first append gets the privilege to dictate the metadata, on both the appender and the chunk - // TODO we should probably not reach back into the chunk here. should metadata be set when we create the chunk? 
- a.c.schema = h.Schema - a.c.posSpans, a.c.negSpans = h.PositiveSpans, h.NegativeSpans + // the first append gets the privilege to dictate the metadata + // but it's also responsible for encoding it into the chunk! + writeHistoChunkMeta(a.b, h.Schema, h.PositiveSpans, h.NegativeSpans) a.schema = h.Schema a.posSpans, a.negSpans = h.PositiveSpans, h.NegativeSpans + numPosBuckets, numNegBuckets := countSpans(h.PositiveSpans), countSpans(h.NegativeSpans) + a.posbuckets = make([]int64, numPosBuckets) + a.negbuckets = make([]int64, numNegBuckets) + a.posbucketsDelta = make([]int64, numPosBuckets) + a.negbucketsDelta = make([]int64, numNegBuckets) + // now store actual data putVarint(a.b, a.buf64, t) putUvarint(a.b, a.buf64, h.Count) putUvarint(a.b, a.buf64, h.ZeroCount) @@ -313,22 +290,22 @@ func (a *histoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) { cntDod := cntDelta - a.cntDelta zcntDod := zcntDelta - a.zcntDelta - putDod(a.b, tDod) - putDod(a.b, cntDod) - putDod(a.b, zcntDod) + putInt64VBBucket(a.b, tDod) + putInt64VBBucket(a.b, cntDod) + putInt64VBBucket(a.b, zcntDod) a.writeSumDelta(h.Sum) for i, buck := range h.PositiveBuckets { delta := buck - a.posbuckets[i] dod := delta - a.posbucketsDelta[i] - putDod(a.b, dod) + putInt64VBBucket(a.b, dod) a.posbucketsDelta[i] = delta } for i, buck := range h.NegativeBuckets { delta := buck - a.negbuckets[i] dod := delta - a.negbucketsDelta[i] - putDod(a.b, dod) + putInt64VBBucket(a.b, dod) a.negbucketsDelta[i] = delta } } @@ -475,6 +452,23 @@ func (it *histoIterator) Next() bool { } if it.numRead == 0 { + + // first read is responsible for reading chunk metadata and initializing fields that depend on it + schema, posSpans, negSpans, err := readHistoChunkMeta(&it.br) + if err != nil { + it.err = err + return false + } + it.schema = schema + it.posSpans, it.negSpans = posSpans, negSpans + numPosBuckets, numNegBuckets := countSpans(posSpans), countSpans(negSpans) + it.posbuckets = make([]int64, numPosBuckets) 
+ it.negbuckets = make([]int64, numNegBuckets) + it.posbucketsDelta = make([]int64, numPosBuckets) + it.negbucketsDelta = make([]int64, numNegBuckets) + + // now read actual data + t, err := binary.ReadVarint(&it.br) if err != nil { it.err = err @@ -577,45 +571,50 @@ func (it *histoIterator) Next() bool { return true } - tDod, ok := it.readDod() - if !ok { - return ok + tDod, err := readInt64VBBucket(&it.br) + if err != nil { + it.err = err + return false } it.tDelta = it.tDelta + tDod it.t += it.tDelta - cntDod, ok := it.readDod() - if !ok { - return ok + cntDod, err := readInt64VBBucket(&it.br) + if err != nil { + it.err = err + return false } it.cntDelta = it.cntDelta + cntDod it.cnt = uint64(int64(it.cnt) + it.cntDelta) - zcntDod, ok := it.readDod() - if !ok { - return ok + zcntDod, err := readInt64VBBucket(&it.br) + if err != nil { + it.err = err + return false } it.zcntDelta = it.zcntDelta + zcntDod it.zcnt = uint64(int64(it.zcnt) + it.zcntDelta) - ok = it.readSum() + ok := it.readSum() if !ok { return false } for i := range it.posbuckets { - dod, ok := it.readDod() - if !ok { - return ok + dod, err := readInt64VBBucket(&it.br) + if err != nil { + it.err = err + return false } it.posbucketsDelta[i] = it.posbucketsDelta[i] + dod it.posbuckets[i] = it.posbuckets[i] + it.posbucketsDelta[i] } for i := range it.negbuckets { - dod, ok := it.readDod() - if !ok { - return ok + dod, err := readInt64VBBucket(&it.br) + if err != nil { + it.err = err + return false } it.negbucketsDelta[i] = it.negbucketsDelta[i] + dod it.negbuckets[i] = it.negbuckets[i] + it.negbucketsDelta[i] @@ -624,66 +623,6 @@ func (it *histoIterator) Next() bool { return true } -func (it *histoIterator) readDod() (int64, bool) { - var d byte - // read delta-of-delta - for i := 0; i < 4; i++ { - d <<= 1 - bit, err := it.br.readBitFast() - if err != nil { - bit, err = it.br.readBit() - } - if err != nil { - it.err = err - return 0, false - } - if bit == zero { - break - } - d |= 1 - } - - var sz uint8 
- var dod int64 - switch d { - case 0x00: - // dod == 0 - case 0x02: - sz = 14 - case 0x06: - sz = 17 - case 0x0e: - sz = 20 - case 0x0f: - // Do not use fast because it's very unlikely it will succeed. - bits, err := it.br.readBits(64) - if err != nil { - it.err = err - return 0, false - } - - dod = int64(bits) - } - - if sz != 0 { - bits, err := it.br.readBitsFast(sz) - if err != nil { - bits, err = it.br.readBits(sz) - } - if err != nil { - it.err = err - return 0, false - } - if bits > (1 << (sz - 1)) { - // or something - bits = bits - (1 << sz) - } - dod = int64(bits) - } - - return dod, true -} - func (it *histoIterator) readSum() bool { bit, err := it.br.readBitFast() if err != nil { diff --git a/tsdb/chunkenc/histo_meta.go b/tsdb/chunkenc/histo_meta.go new file mode 100644 index 000000000..0b5877448 --- /dev/null +++ b/tsdb/chunkenc/histo_meta.go @@ -0,0 +1,82 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// The code in this file was largely written by Damian Gryski as part of +// https://github.com/dgryski/go-tsz and published under the license below. +// It was modified to accommodate reading from byte slices without modifying +// the underlying bytes, which would panic when reading from mmap'd +// read-only byte slices. 
+package chunkenc + +import "github.com/prometheus/prometheus/pkg/histogram" + +func writeHistoChunkMeta(b *bstream, schema int32, posSpans, negSpans []histogram.Span) { + putInt64VBBucket(b, int64(schema)) + putHistoChunkMetaSpans(b, posSpans) + putHistoChunkMetaSpans(b, negSpans) +} + +func putHistoChunkMetaSpans(b *bstream, spans []histogram.Span) { + putInt64VBBucket(b, int64(len(spans))) + for _, s := range spans { + putInt64VBBucket(b, int64(s.Length)) + putInt64VBBucket(b, int64(s.Offset)) + } +} + +func readHistoChunkMeta(b *bstreamReader) (int32, []histogram.Span, []histogram.Span, error) { + // the schema comes first, followed by the positive and then the negative span section + v, err := readInt64VBBucket(b) + if err != nil { + return 0, nil, nil, err + } + schema := int32(v) + + posSpans, err := readHistoChunkMetaSpans(b) + if err != nil { + return 0, nil, nil, err + } + + negSpans, err := readHistoChunkMetaSpans(b) + if err != nil { + return 0, nil, nil, err + } + + return schema, posSpans, negSpans, nil +} + +func readHistoChunkMetaSpans(b *bstreamReader) ([]histogram.Span, error) { + var spans []histogram.Span + num, err := readInt64VBBucket(b) + if err != nil { + return nil, err + } + for i := 0; i < int(num); i++ { + // each span is stored as its length followed by its offset + length, err := readInt64VBBucket(b) + if err != nil { + return nil, err + } + + offset, err := readInt64VBBucket(b) + if err != nil { + return nil, err + } + + spans = append(spans, histogram.Span{ + Length: uint32(length), + Offset: int32(offset), + }) + } + return spans, nil +} diff --git a/tsdb/chunkenc/histo_test.go b/tsdb/chunkenc/histo_test.go index de534b0b6..36d429a11 100644 --- a/tsdb/chunkenc/histo_test.go +++ b/tsdb/chunkenc/histo_test.go @@ -47,8 +47,8 @@ func TestHistoChunkSameBuckets(t *testing.T) { {Offset: 0, Length: 2}, {Offset: 1, Length: 2}, }, - NegativeSpans: []histogram.Span{}, - PositiveBuckets: []int64{1, 1, -1, 0}, + PositiveBuckets: []int64{1, 1, -1, 0}, // counts: 1, 2, 1, 1 (total 5) + NegativeSpans: nil, NegativeBuckets: []int64{}, } @@ -59,17 +59,35 @@ func TestHistoChunkSameBuckets(t 
*testing.T) { {t: ts, h: h}, } - // TODO add an update - // h.Count = 9 - // h.Sum = 61 + // add an updated histogram - // TODO add update with new appender - // Start with a new appender every 10th sample. This emulates starting - // appending to a partially filled chunk. - // app, err = c.Appender() - // require.NoError(t, err) + ts += 16 + h.Count += 9 + h.ZeroCount++ + h.Sum = 24.4 + h.PositiveBuckets = []int64{5, -2, 1, -2} // counts: 5, 3, 4, 2 (total 14) - // app.Append(ts, v) + app.AppendHistogram(ts, h) + exp = append(exp, res{t: ts, h: h}) + + require.Equal(t, c.NumSamples(), 2) + + // add update with new appender + + app, err = c.Appender() + require.NoError(t, err) + require.Equal(t, c.NumSamples(), 2) + + ts += 14 + h.Count += 13 + h.ZeroCount += 2 + h.Sum = 24.4 + h.PositiveBuckets = []int64{6, 1, -3, 6} // counts: 6, 7, 4, 10 (total 27) + + app.AppendHistogram(ts, h) + exp = append(exp, res{t: ts, h: h}) + + require.Equal(t, c.NumSamples(), 3) // 1. Expand iterator in simple case. it1 := c.iterator(nil) @@ -77,7 +95,7 @@ func TestHistoChunkSameBuckets(t *testing.T) { var res1 []res for it1.Next() { ts, h := it1.AtHistogram() - res1 = append(res1, res{t: ts, h: h}) + res1 = append(res1, res{t: ts, h: h.Copy()}) } require.NoError(t, it1.Err()) require.Equal(t, exp, res1) diff --git a/tsdb/chunkenc/varbit_buckets.go b/tsdb/chunkenc/varbit_buckets.go new file mode 100644 index 000000000..289061523 --- /dev/null +++ b/tsdb/chunkenc/varbit_buckets.go @@ -0,0 +1,128 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +// The code in this file was largely written by Damian Gryski as part of +// https://github.com/dgryski/go-tsz and published under the license below. +// It was modified to accommodate reading from byte slices without modifying +// the underlying bytes, which would panic when reading from mmap'd +// read-only byte slices. + +// Copyright (c) 2015,2016 Damian Gryski +// All rights reserved. + +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: + +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation +// and/or other materials provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +// FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +// DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +// OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ +package chunkenc + +// putInt64VBBucket writes val to b as a variable-width bit sequence tuned for SHS bucket values: a prefix of '0', '10', '110', '1110', '11110' or '11111' selects a payload of 0, 3, 6, 9, 12 or 64 bits. +// note: we could improve this further: each branch doesn't need to support any values of any of the prior branches, so we can expand the range of each branch. do more with fewer bits +func putInt64VBBucket(b *bstream, val int64) { + switch { + case val == 0: + b.writeBit(zero) + case bitRange(val, 3): // -3 <= val <= 4 + b.writeBits(0x02, 2) // '10' + b.writeBits(uint64(val), 3) + case bitRange(val, 6): // -31 <= val <= 32 + b.writeBits(0x06, 3) // '110' + b.writeBits(uint64(val), 6) + case bitRange(val, 9): // -255 <= val <= 256 + b.writeBits(0x0e, 4) // '1110' + b.writeBits(uint64(val), 9) + case bitRange(val, 12): // -2047 <= val <= 2048 + b.writeBits(0x1e, 5) // '11110' + b.writeBits(uint64(val), 12) + default: + b.writeBits(0x1f, 5) // '11111': escape prefix for a full 64-bit payload; the constant must fit in the 5 bits written so it cannot be confused with '11110' + b.writeBits(uint64(val), 64) + } +} + +// readInt64VBBucket reads an int64 written by putInt64VBBucket: it consumes up to 5 prefix bits (stopping at the first zero bit) and then the payload width that prefix selects. +func readInt64VBBucket(b *bstreamReader) (int64, error) { + var d byte + for i := 0; i < 5; i++ { + d <<= 1 + bit, err := b.readBitFast() + if err != nil { + bit, err = b.readBit() + } + if err != nil { + return 0, err + } + if bit == zero { + break + } + d |= 1 + } + + var val int64 + var sz uint8 + + switch d { + case 0x00: + // val == 0 + case 0x02: // '10' + sz = 3 + case 0x06: // '110' + sz = 6 + case 0x0e: // '1110' + sz = 9 + case 0x1e: // '11110' + sz = 12 + case 0x1f: // '11111' (five one-bits read by the loop above yield 0x1f) + // Do not use fast because it's very unlikely it will succeed. 
+ bits, err := b.readBits(64) + if err != nil { + return 0, err + } + + val = int64(bits) + } + + if sz != 0 { + bits, err := b.readBitsFast(sz) + if err != nil { + bits, err = b.readBits(sz) + } + if err != nil { + return 0, err + } + if bits > (1 << (sz - 1)) { + // payload is above the largest positive value for this width, so it encodes a negative number: subtracting 1<<sz wraps in uint64 and int64() below recovers the sign + bits = bits - (1 << sz) + } + val = int64(bits) + } + + return val, nil +} diff --git a/tsdb/docs/format/chunks.md b/tsdb/docs/format/chunks.md index e34f8aab9..5a8b9edc7 100644 --- a/tsdb/docs/format/chunks.md +++ b/tsdb/docs/format/chunks.md @@ -33,3 +33,17 @@ in-file offset (lower 4 bytes) and segment sequence number (upper 4 bytes). │ len │ encoding <1 byte> │ data │ CRC32 <4 byte> │ └───────────────┴───────────────────┴──────────────┴────────────────┘ ``` + +## Histogram chunk + +``` +┌──────────────┬─────────────────┬──────────────────────────┬──────────────────────────┬──────────────┐ +│ len │ schema │ pos-spans │ neg-spans │ data │ +└──────────────┴─────────────────┴──────────────────────────┴──────────────────────────┴──────────────┘ + +span-section: + +┌──────────────┬──────────────────┬──────────────────┬────────────┐ +│ len │ length1 │ offset1 │ length2... │ +└──────────────┴──────────────────┴──────────────────┴────────────┘ +```