diff --git a/pkg/histogram/sparse_histogram.go b/pkg/histogram/sparse_histogram.go index 6ea1cb76e..0e4df9372 100644 --- a/pkg/histogram/sparse_histogram.go +++ b/pkg/histogram/sparse_histogram.go @@ -14,7 +14,6 @@ package histogram type SparseHistogram struct { - Ts int64 Count, ZeroCount uint64 Sum, ZeroThreshold float64 Schema int32 diff --git a/tsdb/chunkenc/chunk.go b/tsdb/chunkenc/chunk.go index 0eddb4ea5..63d3a6111 100644 --- a/tsdb/chunkenc/chunk.go +++ b/tsdb/chunkenc/chunk.go @@ -73,7 +73,7 @@ type Chunk interface { // Appender adds sample pairs to a chunk. type Appender interface { Append(int64, float64) - AppendHistogram(h histogram.SparseHistogram) + AppendHistogram(t int64, h histogram.SparseHistogram) } // Iterator is a simple iterator that can only get the next value. diff --git a/tsdb/chunkenc/histo.go b/tsdb/chunkenc/histo.go index d5c6d1afb..08f2070e6 100644 --- a/tsdb/chunkenc/histo.go +++ b/tsdb/chunkenc/histo.go @@ -206,9 +206,10 @@ type histoAppender struct { posSpans, negSpans []histogram.Span // for the fields that are tracked as dod's + // note that we expect to handle negative deltas (e.g. resets) by creating new chunks, we still want to support it in general hence signed integer types t int64 cnt, zcnt uint64 - tDelta, cntDelta, zcntDelta uint64 + tDelta, cntDelta, zcntDelta int64 posbuckets, negbuckets []int64 posbucketsDelta, negbucketsDelta []int64 @@ -260,8 +261,8 @@ func (a *histoAppender) Append(int64, float64) { // AppendHistogram appends a SparseHistogram to the chunk // we assume the histogram is properly structured. E.g. that the number pos/neg buckets used corresponds to the number conveyed by the pos/neg span structures -func (a *histoAppender) AppendHistogram(h histogram.SparseHistogram) { - var tDelta, cntDelta, zcntDelta uint64 +func (a *histoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) { + var tDelta, cntDelta, zcntDelta int64 num := binary.BigEndian.Uint16(a.b.bytes()) if num == 0 { @@ -273,7 +274,7 @@ func (a *histoAppender) AppendHistogram(h histogram.SparseHistogram) { a.schema = h.Schema a.posSpans, a.negSpans = h.PositiveSpans, h.NegativeSpans - putVarint(a.b, a.buf64, h.Ts) + putVarint(a.b, a.buf64, t) putUvarint(a.b, a.buf64, h.Count) putUvarint(a.b, a.buf64, h.ZeroCount) a.b.writeBits(math.Float64bits(h.Sum), 64) @@ -284,16 +285,13 @@ func (a *histoAppender) AppendHistogram(h histogram.SparseHistogram) { putVarint(a.b, a.buf64, buck) } } else if num == 1 { - tDelta = uint64(h.Ts - a.t) + tDelta = t - a.t + cntDelta = int64(h.Count) - int64(a.cnt) + zcntDelta = int64(h.ZeroCount) - int64(a.zcnt) - // WARNING: we assume all counts go up. what guarantee do we have this is true? uint may underflow if not. - - cntDelta = h.Count - a.cnt - zcntDelta = h.ZeroCount - a.zcnt - - putUvarint(a.b, a.buf64, tDelta) - putUvarint(a.b, a.buf64, cntDelta) - putUvarint(a.b, a.buf64, zcntDelta) + putVarint(a.b, a.buf64, tDelta) + putVarint(a.b, a.buf64, cntDelta) + putVarint(a.b, a.buf64, zcntDelta) a.writeSumDelta(h.Sum) @@ -308,13 +306,13 @@ func (a *histoAppender) AppendHistogram(h histogram.SparseHistogram) { a.negbucketsDelta[i] = delta } } else { - tDelta = uint64(h.Ts - a.t) - cntDelta = h.Count - a.cnt - zcntDelta = h.ZeroCount - a.zcnt + tDelta = t - a.t + cntDelta = int64(h.Count) - int64(a.cnt) + zcntDelta = int64(h.ZeroCount) - int64(a.zcnt) - tDod := int64(tDelta - a.tDelta) - cntDod := int64(cntDelta - a.cntDelta) - zcntDod := int64(zcntDelta - a.zcntDelta) + tDod := tDelta - a.tDelta + cntDod := cntDelta - a.cntDelta + zcntDod := zcntDelta - a.zcntDelta putDod(a.b, tDod) putDod(a.b, cntDod) @@ -338,7 +336,7 @@ func (a *histoAppender) AppendHistogram(h histogram.SparseHistogram) { binary.BigEndian.PutUint16(a.b.bytes(), num+1) - a.t = h.Ts + a.t = t a.cnt = h.Count a.zcnt = h.ZeroCount a.tDelta = tDelta @@ -399,7 +397,7 @@ type histoIterator struct { // for the fields that are tracked as dod's t int64 cnt, zcnt uint64 - tDelta, cntDelta, zcntDelta uint64 + tDelta, cntDelta, zcntDelta int64 posbuckets, negbuckets []int64 posbucketsDelta, negbucketsDelta []int64 @@ -424,9 +422,8 @@ func (it *histoIterator) Seek(t int64) bool { } return true } -func (it *histoIterator) At() (h histogram.SparseHistogram) { - return histogram.SparseHistogram{ - Ts: it.t, +func (it *histoIterator) At() (t int64, h histogram.SparseHistogram) { + return it.t, histogram.SparseHistogram{ Count: it.cnt, ZeroCount: it.zcnt, Sum: it.sum, @@ -524,7 +521,7 @@ func (it *histoIterator) Next() bool { } if it.numRead == 1 { - tDelta, err := binary.ReadUvarint(&it.br) + tDelta, err := binary.ReadVarint(&it.br) if err != nil { it.err = err return false @@ -532,21 +529,21 @@ func (it *histoIterator) Next() bool { it.tDelta = tDelta it.t += int64(it.tDelta) - cntDelta, err := binary.ReadUvarint(&it.br) + cntDelta, err := binary.ReadVarint(&it.br) if err != nil { it.err = err return false } it.cntDelta = cntDelta - it.cnt += it.cntDelta + it.cnt = uint64(int64(it.cnt) + it.cntDelta) - zcntDelta, err := binary.ReadUvarint(&it.br) + zcntDelta, err := binary.ReadVarint(&it.br) if err != nil { it.err = err return false } it.zcntDelta = zcntDelta - it.zcnt += it.zcntDelta + it.zcnt = uint64(int64(it.zcnt) + it.zcntDelta) ok := it.readSum() if !ok { @@ -580,22 +577,22 @@ func (it *histoIterator) Next() bool { if !ok { return ok } - it.tDelta = uint64(int64(it.tDelta) + tDod) - it.t += int64(it.tDelta) + it.tDelta = it.tDelta + tDod + it.t += it.tDelta cntDod, ok := it.readDod() if !ok { return ok } - it.cntDelta = uint64(int64(it.cntDelta) + cntDod) - it.cnt += it.cntDelta + it.cntDelta = it.cntDelta + cntDod + it.cnt = uint64(int64(it.cnt) + it.cntDelta) zcntDod, ok := it.readDod() if !ok { return ok } - it.zcntDelta = uint64(int64(it.zcntDelta) + zcntDod) - it.zcnt += it.zcntDelta + it.zcntDelta = it.zcntDelta + zcntDod + it.zcnt = uint64(int64(it.zcnt) + it.zcntDelta) ok = it.readSum() if !ok { diff --git a/tsdb/chunkenc/histo_test.go b/tsdb/chunkenc/histo_test.go index c5e3b5b2b..907756a68 100644 --- a/tsdb/chunkenc/histo_test.go +++ b/tsdb/chunkenc/histo_test.go @@ -24,12 +24,20 @@ func TestHistoChunkSameBuckets(t *testing.T) { c := NewHistoChunk() + type res struct { + t int64 + h histogram.SparseHistogram + } + + // create fresh appender and add the first histogram + app, err := c.Appender() require.NoError(t, err) require.Equal(t, c.NumSamples(), 0) + ts := int64(1234567890) + h := histogram.SparseHistogram{ - Ts: 1234567890, Count: 5, ZeroCount: 2, Sum: 18.4, @@ -44,11 +52,11 @@ func TestHistoChunkSameBuckets(t *testing.T) { NegativeBuckets: []int64{}, } - app.AppendHistogram(h) + app.AppendHistogram(ts, h) require.Equal(t, c.NumSamples(), 1) - exp := []histogram.SparseHistogram{ - h, + exp := []res{ + {t: ts, h: h}, } // TODO add an update @@ -66,9 +74,10 @@ func TestHistoChunkSameBuckets(t *testing.T) { // 1. Expand iterator in simple case. it1 := c.iterator(nil) require.NoError(t, it1.Err()) - var res1 []histogram.SparseHistogram + var res1 []res for it1.Next() { - res1 = append(res1, it1.At()) + ts, h := it1.At() + res1 = append(res1, res{t: ts, h: h}) } require.NoError(t, it1.Err()) require.Equal(t, exp, res1) diff --git a/tsdb/chunkenc/xor.go b/tsdb/chunkenc/xor.go index b46e88854..d1dc09f8f 100644 --- a/tsdb/chunkenc/xor.go +++ b/tsdb/chunkenc/xor.go @@ -149,7 +149,7 @@ type xorAppender struct { trailing uint8 } -func (a *xorAppender) AppendHistogram(h histogram.SparseHistogram) { +func (a *xorAppender) AppendHistogram(t int64, h histogram.SparseHistogram) { panic("cannot call xorAppender.AppendHistogram().") } diff --git a/tsdb/head.go b/tsdb/head.go index 296fa56c0..42e4f86f0 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -1199,7 +1199,7 @@ func (a *initAppender) AppendHistogram(ref uint64, l labels.Labels, sh histogram if a.app != nil { return a.app.AppendHistogram(ref, l, sh) } - a.head.initTime(sh.Ts) + //a.head.initTime(sh.Ts) FIXME(ganesh) a.app = a.head.appender() return a.app.AppendHistogram(ref, l, sh)