diff --git a/tsdb/db_test.go b/tsdb/db_test.go index cc65069e4..33a47ea9e 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -130,7 +130,25 @@ func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[str return result } -// queryChunks runs a matcher query against the querier and fully expands its data. +// queryAndExpandChunks runs a matcher query against the querier and fully expands its data into samples. +func queryAndExpandChunks(t testing.TB, q storage.ChunkQuerier, matchers ...*labels.Matcher) map[string][][]tsdbutil.Sample { + s := queryChunks(t, q, matchers...) + + res := make(map[string][][]tsdbutil.Sample) + for k, v := range s { + var samples [][]tsdbutil.Sample + for _, chk := range v { + sam, err := storage.ExpandSamples(chk.Chunk.Iterator(nil), nil) + require.NoError(t, err) + samples = append(samples, sam) + } + res[k] = samples + } + + return res +} + +// queryChunks runs a matcher query against the querier and expands its data. func queryChunks(t testing.TB, q storage.ChunkQuerier, matchers ...*labels.Matcher) map[string][]chunks.Meta { ss := q.Select(false, nil, matchers...) defer func() { @@ -2367,7 +2385,7 @@ func TestDBReadOnly(t *testing.T) { logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) expBlocks []*Block expSeries map[string][]tsdbutil.Sample - expChunks map[string][]chunks.Meta + expChunks map[string][][]tsdbutil.Sample expDBHash []byte matchAll = labels.MustNewMatcher(labels.MatchEqual, "", "") err error @@ -2418,7 +2436,7 @@ func TestDBReadOnly(t *testing.T) { expSeries = query(t, q, matchAll) cq, err := dbWritable.ChunkQuerier(context.TODO(), math.MinInt64, math.MaxInt64) require.NoError(t, err) - expChunks = queryChunks(t, cq, matchAll) + expChunks = queryAndExpandChunks(t, cq, matchAll) require.NoError(t, dbWritable.Close()) // Close here to allow getting the dir hash for windows. expDBHash = testutil.DirHash(t, dbWritable.Dir()) @@ -2452,7 +2470,7 @@ func TestDBReadOnly(t *testing.T) { t.Run("chunk querier", func(t *testing.T) { cq, err := dbReadOnly.ChunkQuerier(context.TODO(), math.MinInt64, math.MaxInt64) require.NoError(t, err) - readOnlySeries := queryChunks(t, cq, matchAll) + readOnlySeries := queryAndExpandChunks(t, cq, matchAll) readOnlyDBHash := testutil.DirHash(t, dbDir) require.Equal(t, len(expChunks), len(readOnlySeries), "total series mismatch") @@ -6434,3 +6452,76 @@ func compareSeries(t require.TestingT, expected, actual map[string][]tsdbutil.Sa } } } + +// TestChunkQuerierReadWriteRace looks for any possible race between appending +// samples and reading chunks because the head chunk that is being appended to +// can be read in parallel and we should be able to make a copy of the chunk without +// worrying about the parallel write. +func TestChunkQuerierReadWriteRace(t *testing.T) { + db := openTestDB(t, nil, nil) + defer func() { + require.NoError(t, db.Close()) + }() + + lbls := labels.FromStrings("foo", "bar") + + writer := func() error { + <-time.After(5 * time.Millisecond) // Initial pause while readers start. + ts := 0 + for i := 0; i < 500; i++ { + app := db.Appender(context.Background()) + for j := 0; j < 10; j++ { + ts++ + _, err := app.Append(0, lbls, int64(ts), float64(ts*100)) + if err != nil { + return err + } + } + err := app.Commit() + if err != nil { + return err + } + <-time.After(time.Millisecond) + } + return nil + } + + reader := func() { + querier, err := db.ChunkQuerier(context.Background(), math.MinInt64, math.MaxInt64) + require.NoError(t, err) + defer func(q storage.ChunkQuerier) { + require.NoError(t, q.Close()) + }(querier) + ss := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + for ss.Next() { + cs := ss.At() + it := cs.Iterator(nil) + for it.Next() { + m := it.At() + b := m.Chunk.Bytes() + bb := make([]byte, len(b)) + copy(bb, b) // This copying of chunk bytes detects any race. + } + } + require.NoError(t, ss.Err()) + } + + ch := make(chan struct{}) + var writerErr error + go func() { + defer close(ch) + writerErr = writer() + }() + +Outer: + for { + reader() + select { + case <-ch: + break Outer + default: + } + } + + require.NoError(t, writerErr) +} diff --git a/tsdb/head_read.go b/tsdb/head_read.go index efcafcf6c..b54e53aa0 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -274,22 +274,36 @@ func (h *headChunkReader) Close() error { // Chunk returns the chunk for the reference number. func (h *headChunkReader) Chunk(meta chunks.Meta) (chunkenc.Chunk, error) { + chk, _, err := h.chunk(meta, false) + return chk, err +} + +// ChunkWithCopy returns the chunk for the reference number. +// If the chunk is the in-memory chunk, then it makes a copy and returns the copied chunk. +func (h *headChunkReader) ChunkWithCopy(meta chunks.Meta) (chunkenc.Chunk, int64, error) { + return h.chunk(meta, true) +} + +// chunk returns the chunk for the reference number. +// If copyLastChunk is true, then it makes a copy of the head chunk if asked for it. +// Also returns max time of the chunk. +func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc.Chunk, int64, error) { sid, cid := chunks.HeadChunkRef(meta.Ref).Unpack() s := h.head.series.getByID(sid) // This means that the series has been garbage collected. if s == nil { - return nil, storage.ErrNotFound + return nil, 0, storage.ErrNotFound } s.Lock() - c, garbageCollect, err := s.chunk(cid, h.head.chunkDiskMapper, &h.head.memChunkPool) + c, headChunk, err := s.chunk(cid, h.head.chunkDiskMapper, &h.head.memChunkPool) if err != nil { s.Unlock() - return nil, err + return nil, 0, err } defer func() { - if garbageCollect { + if !headChunk { // Set this to nil so that Go GC can collect it after it has been used. c.chunk = nil h.head.memChunkPool.Put(c) @@ -299,22 +313,36 @@ func (h *headChunkReader) Chunk(meta chunks.Meta) (chunkenc.Chunk, error) { // This means that the chunk is outside the specified range. if !c.OverlapsClosedInterval(h.mint, h.maxt) { s.Unlock() - return nil, storage.ErrNotFound + return nil, 0, storage.ErrNotFound + } + + chk, maxTime := c.chunk, c.maxTime + if headChunk && copyLastChunk { + // The caller may ask to copy the head chunk in order to take the + // bytes of the chunk without causing the race between read and append. + b := s.headChunk.chunk.Bytes() + newB := make([]byte, len(b)) + copy(newB, b) // TODO(codesome): Use bytes.Clone() when we upgrade to Go 1.20. + // TODO(codesome): Put back in the pool (non-trivial). + chk, err = h.head.opts.ChunkPool.Get(s.headChunk.chunk.Encoding(), newB) + if err != nil { + return nil, 0, err + } } s.Unlock() return &safeChunk{ - Chunk: c.chunk, + Chunk: chk, s: s, cid: cid, isoState: h.isoState, - }, nil + }, maxTime, nil } // chunk returns the chunk for the HeadChunkID from memory or by m-mapping it from the disk. -// If garbageCollect is true, it means that the returned *memChunk +// If headChunk is true, it means that the returned *memChunk // (and not the chunkenc.Chunk inside it) can be garbage collected after its usage. -func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDiskMapper, memChunkPool *sync.Pool) (chunk *memChunk, garbageCollect bool, err error) { +func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDiskMapper, memChunkPool *sync.Pool) (chunk *memChunk, headChunk bool, err error) { // ix represents the index of chunk in the s.mmappedChunks slice. The chunk id's are // incremented by 1 when new chunk is created, hence (id - firstChunkID) gives the slice index. // The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix @@ -323,11 +351,12 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDi if ix < 0 || ix > len(s.mmappedChunks) { return nil, false, storage.ErrNotFound } + if ix == len(s.mmappedChunks) { if s.headChunk == nil { return nil, false, errors.New("invalid head chunk") } - return s.headChunk, false, nil + return s.headChunk, true, nil } chk, err := chunkDiskMapper.Chunk(s.mmappedChunks[ix].ref) if err != nil { @@ -340,7 +369,7 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDi mc.chunk = chk mc.minTime = s.mmappedChunks[ix].minTime mc.maxTime = s.mmappedChunks[ix].maxTime - return mc, true, nil + return mc, false, nil } // oooMergedChunk returns the requested chunk based on the given chunks.Meta diff --git a/tsdb/querier.go b/tsdb/querier.go index 061d5b394..b4513218e 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -584,7 +584,11 @@ func (p *populateWithDelGenericSeriesIterator) reset(blockID ulid.ULID, cr Chunk p.currChkMeta = chunks.Meta{} } -func (p *populateWithDelGenericSeriesIterator) next() bool { +// If copyHeadChunk is true, then the head chunk (i.e. the in-memory chunk of the TSDB) +// is deep copied to avoid races between reads and copying chunk bytes. +// However, if the deletion intervals overlaps with the head chunk, then the head chunk is +// not copied irrespective of copyHeadChunk because it will be re-encoded later anyway. +func (p *populateWithDelGenericSeriesIterator) next(copyHeadChunk bool) bool { if p.err != nil || p.i >= len(p.chks)-1 { return false } @@ -592,12 +596,6 @@ func (p *populateWithDelGenericSeriesIterator) next() bool { p.i++ p.currChkMeta = p.chks[p.i] - p.currChkMeta.Chunk, p.err = p.chunks.Chunk(p.currChkMeta) - if p.err != nil { - p.err = errors.Wrapf(p.err, "cannot populate chunk %d from block %s", p.currChkMeta.Ref, p.blockID.String()) - return false - } - p.bufIter.Intervals = p.bufIter.Intervals[:0] for _, interval := range p.intervals { if p.currChkMeta.OverlapsClosedInterval(interval.Mint, interval.Maxt) { @@ -605,22 +603,28 @@ func (p *populateWithDelGenericSeriesIterator) next() bool { } } - // Re-encode head chunks that are still open (being appended to) or - // outside the compacted MaxTime range. - // The chunk.Bytes() method is not safe for open chunks hence the re-encoding. - // This happens when snapshotting the head block or just fetching chunks from TSDB. - // - // TODO(codesome): think how to avoid the typecasting to verify when it is head block. - _, isSafeChunk := p.currChkMeta.Chunk.(*safeChunk) - if len(p.bufIter.Intervals) == 0 && !(isSafeChunk && p.currChkMeta.MaxTime == math.MaxInt64) { - // If there is no overlap with deletion intervals AND it's NOT - // an "open" head chunk, we can take chunk as it is. + hcr, ok := p.chunks.(*headChunkReader) + if ok && copyHeadChunk && len(p.bufIter.Intervals) == 0 { + // ChunkWithCopy will copy the head chunk. + var maxt int64 + p.currChkMeta.Chunk, maxt, p.err = hcr.ChunkWithCopy(p.currChkMeta) + // For the in-memory head chunk the index reader sets maxt as MaxInt64. We fix it here. + p.currChkMeta.MaxTime = maxt + } else { + p.currChkMeta.Chunk, p.err = p.chunks.Chunk(p.currChkMeta) + } + if p.err != nil { + p.err = errors.Wrapf(p.err, "cannot populate chunk %d from block %s", p.currChkMeta.Ref, p.blockID.String()) + return false + } + + if len(p.bufIter.Intervals) == 0 { + // If there is no overlap with deletion intervals, we can take chunk as it is. p.currDelIter = nil return true } - // We don't want the full chunk, or it's potentially still opened, take - // just a part of it. + // We don't want the full chunk, take just a part of it. p.bufIter.Iter = p.currChkMeta.Chunk.Iterator(p.bufIter.Iter) p.currDelIter = &p.bufIter return true @@ -677,7 +681,7 @@ func (p *populateWithDelSeriesIterator) Next() chunkenc.ValueType { } } - for p.next() { + for p.next(false) { if p.currDelIter != nil { p.curr = p.currDelIter } else { @@ -742,7 +746,7 @@ func (p *populateWithDelChunkSeriesIterator) reset(blockID ulid.ULID, cr ChunkRe } func (p *populateWithDelChunkSeriesIterator) Next() bool { - if !p.next() { + if !p.next(true) { return false } p.curr = p.currChkMeta diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index e6e9f143f..cf9867a4f 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -235,7 +235,19 @@ func testBlockQuerier(t *testing.T, c blockQuerierTestCase, ir IndexReader, cr C chksRes, errRes := storage.ExpandChunks(sres.Iterator(nil)) rmChunkRefs(chksRes) require.Equal(t, errExp, errRes) - require.Equal(t, chksExp, chksRes) + + require.Equal(t, len(chksExp), len(chksRes)) + var exp, act [][]tsdbutil.Sample + for i := range chksExp { + samples, err := storage.ExpandSamples(chksExp[i].Chunk.Iterator(nil), nil) + require.NoError(t, err) + exp = append(exp, samples) + samples, err = storage.ExpandSamples(chksRes[i].Chunk.Iterator(nil), nil) + require.NoError(t, err) + act = append(act, samples) + } + + require.Equal(t, exp, act) } require.NoError(t, res.Err()) }) @@ -2246,3 +2258,93 @@ func TestBlockBaseSeriesSet(t *testing.T) { require.NoError(t, bcs.Err()) } } + +func BenchmarkHeadChunkQuerier(b *testing.B) { + db := openTestDB(b, nil, nil) + defer func() { + require.NoError(b, db.Close()) + }() + + // 3h of data. + numTimeseries := 100 + app := db.Appender(context.Background()) + for i := 0; i < 120*6; i++ { + for j := 0; j < numTimeseries; j++ { + lbls := labels.FromStrings("foo", fmt.Sprintf("bar%d", j)) + if i%10 == 0 { + require.NoError(b, app.Commit()) + app = db.Appender(context.Background()) + } + _, err := app.Append(0, lbls, int64(i*15)*time.Second.Milliseconds(), float64(i*100)) + require.NoError(b, err) + } + } + require.NoError(b, app.Commit()) + + querier, err := db.ChunkQuerier(context.Background(), math.MinInt64, math.MaxInt64) + require.NoError(b, err) + defer func(q storage.ChunkQuerier) { + require.NoError(b, q.Close()) + }(querier) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + ss := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*")) + total := 0 + for ss.Next() { + cs := ss.At() + it := cs.Iterator(nil) + for it.Next() { + m := it.At() + total += m.Chunk.NumSamples() + } + } + _ = total + require.NoError(b, ss.Err()) + } +} + +func BenchmarkHeadQuerier(b *testing.B) { + db := openTestDB(b, nil, nil) + defer func() { + require.NoError(b, db.Close()) + }() + + // 3h of data. + numTimeseries := 100 + app := db.Appender(context.Background()) + for i := 0; i < 120*6; i++ { + for j := 0; j < numTimeseries; j++ { + lbls := labels.FromStrings("foo", fmt.Sprintf("bar%d", j)) + if i%10 == 0 { + require.NoError(b, app.Commit()) + app = db.Appender(context.Background()) + } + _, err := app.Append(0, lbls, int64(i*15)*time.Second.Milliseconds(), float64(i*100)) + require.NoError(b, err) + } + } + require.NoError(b, app.Commit()) + + querier, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64) + require.NoError(b, err) + defer func(q storage.Querier) { + require.NoError(b, q.Close()) + }(querier) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + ss := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*")) + total := int64(0) + for ss.Next() { + cs := ss.At() + it := cs.Iterator(nil) + for it.Next() != chunkenc.ValNone { + ts, _ := it.At() + total += ts + } + } + _ = total + require.NoError(b, ss.Err()) + } +} diff --git a/tsdb/test.txt b/tsdb/test.txt index e69de29bb..a66a6fb72 100644 --- a/tsdb/test.txt +++ b/tsdb/test.txt @@ -0,0 +1 @@ +make: Nothing to be done for `test'.