diff --git a/tsdb/chunkenc/histogram.go b/tsdb/chunkenc/histogram.go index 949aa32af..5ec4ced2b 100644 --- a/tsdb/chunkenc/histogram.go +++ b/tsdb/chunkenc/histogram.go @@ -473,14 +473,14 @@ func (a *HistogramAppender) AppendHistogram(t int64, h *histogram.Histogram) { a.writeSumDelta(h.Sum) - for i, buck := range h.PositiveBuckets { - delta := buck - a.pBuckets[i] + for i, b := range h.PositiveBuckets { + delta := b - a.pBuckets[i] dod := delta - a.pBucketsDelta[i] putVarbitInt(a.b, dod) a.pBucketsDelta[i] = delta } - for i, buck := range h.NegativeBuckets { - delta := buck - a.nBuckets[i] + for i, b := range h.NegativeBuckets { + delta := b - a.nBuckets[i] dod := delta - a.nBucketsDelta[i] putVarbitInt(a.b, dod) a.nBucketsDelta[i] = delta @@ -505,7 +505,8 @@ func (a *HistogramAppender) AppendHistogram(t int64, h *histogram.Histogram) { // Recode converts the current chunk to accommodate an expansion of the set of // (positive and/or negative) buckets used, according to the provided // interjections, resulting in the honoring of the provided new positive and -// negative spans. +// negative spans. To continue appending, use the returned Appender rather than +// the receiver of this method. func (a *HistogramAppender) Recode( positiveInterjections, negativeInterjections []Interjection, positiveSpans, negativeSpans []histogram.Span, @@ -513,7 +514,7 @@ func (a *HistogramAppender) Recode( // TODO(beorn7): This currently just decodes everything and then encodes // it again with the new span layout. This can probably be done in-place // by editing the chunk. But let's first see how expensive it is in the - // big picture. + // big picture. Also, in-place editing might create concurrency issues. byts := a.b.bytes() it := newHistogramIterator(byts) hc := NewHistogramChunk() diff --git a/tsdb/compact.go b/tsdb/compact.go index 1d7e93f7f..08fd27a31 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -768,7 +768,8 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, chksIter := s.Iterator() chks = chks[:0] for chksIter.Next() { - // We are not iterating in streaming way over chunk as it's more efficient to do bulk write for index and + // We are not iterating in streaming way over chunk as + // it's more efficient to do bulk write for index and // chunk file purposes. chks = append(chks, chksIter.At()) } diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 9ded58dea..d759ffaa4 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -634,28 +634,31 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper // appendHistogram adds the histogram. // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) { - // Head controls the execution of recoding, so that we own the proper chunk reference afterwards. - // We check for Appendable before appendPreprocessor because in case it ends up creating a new chunk, - // we need to know if there was also a counter reset or not to set the meta properly. + // Head controls the execution of recoding, so that we own the proper + // chunk reference afterwards. We check for Appendable before + // appendPreprocessor because in case it ends up creating a new chunk, + // we need to know if there was also a counter reset or not to set the + // meta properly. app, _ := s.app.(*chunkenc.HistogramAppender) var ( positiveInterjections, negativeInterjections []chunkenc.Interjection okToAppend, counterReset bool ) - if app != nil { - positiveInterjections, negativeInterjections, okToAppend, counterReset = app.Appendable(h) - } - c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncHistogram, chunkDiskMapper) if !sampleInOrder { return sampleInOrder, chunkCreated } + if app != nil { + positiveInterjections, negativeInterjections, okToAppend, counterReset = app.Appendable(h) + } + if !chunkCreated { // We have 3 cases here // - !okToAppend -> We need to cut a new chunk. - // - okToAppend but we have interjections -> Existing chunk needs recoding before we can append our histogram. - // - okToAppend and no interjections -> Chunk is ready to support our histogram. + // - okToAppend but we have interjections → Existing chunk needs + // recoding before we can append our histogram. + // - okToAppend and no interjections → Chunk is ready to support our histogram. if !okToAppend || counterReset { c = s.cutNewHeadChunk(t, chunkenc.EncHistogram, chunkDiskMapper) chunkCreated = true @@ -663,12 +666,11 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui // New buckets have appeared. We need to recode all // prior histogram samples within the chunk before we // can process this one. - chunk, app := app.Recode(positiveInterjections, negativeInterjections, h.PositiveSpans, h.NegativeSpans) - s.headChunk = &memChunk{ - minTime: s.headChunk.minTime, - maxTime: s.headChunk.maxTime, - chunk: chunk, - } + chunk, app := app.Recode( + positiveInterjections, negativeInterjections, + h.PositiveSpans, h.NegativeSpans, + ) + c.chunk = chunk s.app = app } } @@ -704,7 +706,9 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui // appendPreprocessor takes care of cutting new chunks and m-mapping old chunks. // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. // This should be called only when appending data. -func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, chunkDiskMapper *chunks.ChunkDiskMapper) (c *memChunk, sampleInOrder, chunkCreated bool) { +func (s *memSeries) appendPreprocessor( + t int64, e chunkenc.Encoding, chunkDiskMapper *chunks.ChunkDiskMapper, +) (c *memChunk, sampleInOrder, chunkCreated bool) { // Based on Gorilla white papers this offers near-optimal compression ratio // so anything bigger that this has diminishing returns and increases // the time range within which we have to decompress all samples. @@ -774,7 +778,9 @@ func computeChunkEndTime(start, cur, max int64) int64 { return start + (max-start)/n } -func (s *memSeries) cutNewHeadChunk(mint int64, e chunkenc.Encoding, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk { +func (s *memSeries) cutNewHeadChunk( + mint int64, e chunkenc.Encoding, chunkDiskMapper *chunks.ChunkDiskMapper, +) *memChunk { s.mmapCurrentHeadChunk(chunkDiskMapper) s.headChunk = &memChunk{ diff --git a/tsdb/querier.go b/tsdb/querier.go index b1c304e2c..d279e5080 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -528,16 +528,20 @@ func (b *blockBaseSeriesSet) Err() error { func (b *blockBaseSeriesSet) Warnings() storage.Warnings { return nil } -// populateWithDelGenericSeriesIterator allows to iterate over given chunk metas. In each iteration it ensures -// that chunks are trimmed based on given tombstones interval if any. +// populateWithDelGenericSeriesIterator allows to iterate over given chunk +// metas. In each iteration it ensures that chunks are trimmed based on given +// tombstones interval if any. // -// populateWithDelGenericSeriesIterator assumes that chunks that would be fully removed by intervals are filtered out in previous phase. +// populateWithDelGenericSeriesIterator assumes that chunks that would be fully +// removed by intervals are filtered out in previous phase. // -// On each iteration currChkMeta is available. If currDelIter is not nil, it means that chunk iterator in currChkMeta -// is invalid and chunk rewrite is needed, currDelIter should be used. +// On each iteration currChkMeta is available. If currDelIter is not nil, it +// means that the chunk iterator in currChkMeta is invalid and a chunk rewrite +// is needed, for which currDelIter should be used. type populateWithDelGenericSeriesIterator struct { chunks ChunkReader - // chks are expected to be sorted by minTime and should be related to the same, single series. + // chks are expected to be sorted by minTime and should be related to + // the same, single series. chks []chunks.Meta i int @@ -589,15 +593,17 @@ func (p *populateWithDelGenericSeriesIterator) next() bool { // The chunk.Bytes() method is not safe for open chunks hence the re-encoding. // This happens when snapshotting the head block or just fetching chunks from TSDB. // - // TODO think how to avoid the typecasting to verify when it is head block. + // TODO(codesome): think how to avoid the typecasting to verify when it is head block. _, isSafeChunk := p.currChkMeta.Chunk.(*safeChunk) if len(p.bufIter.Intervals) == 0 && !(isSafeChunk && p.currChkMeta.MaxTime == math.MaxInt64) { - // If there are no overlap with deletion intervals AND it's NOT an "open" head chunk, we can take chunk as it is. + // If there is no overlap with deletion intervals AND it's NOT + // an "open" head chunk, we can take chunk as it is. p.currDelIter = nil return true } - // We don't want full chunk or it's potentially still opened, take just part of it. + // We don't want the full chunk, or it's potentially still opened, take + // just a part of it. p.bufIter.Iter = p.currChkMeta.Chunk.Iterator(nil) p.currDelIter = p.bufIter return true @@ -703,12 +709,14 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool { return false } - // Empty chunk, this should not happen, as we assume full deletions being filtered before this iterator. + // Empty chunk, this should not happen, as we assume full + // deletions being filtered before this iterator. p.err = errors.New("populateWithDelChunkSeriesIterator: unexpected empty chunk found while rewriting chunk") return false } - // Re-encode the chunk if iterator is provider. This means that it has some samples to be deleted or chunk is opened. + // Re-encode the chunk if iterator is provider. This means that it has + // some samples to be deleted or chunk is opened. var ( newChunk chunkenc.Chunk app chunkenc.Appender @@ -727,10 +735,33 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool { var h *histogram.Histogram t, h = p.currDelIter.AtHistogram() p.curr.MinTime = t + app.AppendHistogram(t, h) - for p.currDelIter.Next() == chunkenc.ValHistogram { - // TODO(beorn7): Is it possible that the value type changes during iteration? + for vt := p.currDelIter.Next(); vt != chunkenc.ValNone; vt = p.currDelIter.Next() { + if vt != chunkenc.ValHistogram { + err = fmt.Errorf("found value type %v in histogram chunk", vt) + break + } t, h = p.currDelIter.AtHistogram() + + // Defend against corrupted chunks. + pI, nI, okToAppend, counterReset := app.(*chunkenc.HistogramAppender).Appendable(h) + if len(pI)+len(nI) > 0 { + err = fmt.Errorf( + "bucket layout has changed unexpectedly: %d positive and %d negative bucket interjections required", + len(pI), len(nI), + ) + break + } + if counterReset { + err = errors.New("detected unexpected counter reset in histogram") + break + } + if !okToAppend { + err = errors.New("unable to append histogram due to unexpected schema change") + break + } + app.AppendHistogram(t, h) } case chunkenc.ValFloat: @@ -742,8 +773,11 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool { t, v = p.currDelIter.At() p.curr.MinTime = t app.Append(t, v) - for p.currDelIter.Next() == chunkenc.ValFloat { - // TODO(beorn7): Is it possible that the value type changes during iteration? + for vt := p.currDelIter.Next(); vt != chunkenc.ValNone; vt = p.currDelIter.Next() { + if vt != chunkenc.ValFloat { + err = fmt.Errorf("found value type %v in float chunk", vt) + break + } t, v = p.currDelIter.At() app.Append(t, v) }