diff --git a/tsdb/chunks/chunks.go b/tsdb/chunks/chunks.go index 418541f58..1a11924d1 100644 --- a/tsdb/chunks/chunks.go +++ b/tsdb/chunks/chunks.go @@ -456,16 +456,14 @@ func (b realByteSlice) Sub(start, end int) ByteSlice { type Reader struct { // The underlying bytes holding the encoded series data. // Each slice holds the data for a different segment. - bs []ByteSlice - cs []io.Closer // Closers for resources behind the byte slices. - size int64 // The total size of bytes in the reader. - pool chunkenc.Pool - crc32 hash.Hash - buf [binary.MaxVarintLen32]byte + bs []ByteSlice + cs []io.Closer // Closers for resources behind the byte slices. + size int64 // The total size of bytes in the reader. + pool chunkenc.Pool } func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) { - cr := Reader{pool: pool, bs: bs, cs: cs, crc32: newCRC32()} + cr := Reader{pool: pool, bs: bs, cs: cs} var totalSize int64 for i, b := range cr.bs { @@ -541,6 +539,7 @@ func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) { // Get the lower 4 bytes. // These contain the segment offset where the data for this chunk starts. chkStart = int((ref << 32) >> 32) + chkCRC32 = newCRC32() ) if sgmIndex >= len(s.bs) { @@ -569,12 +568,12 @@ func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) { return nil, errors.Errorf("segment doesn't include enough bytes to read the chunk - required:%v, available:%v", chkEnd, sgmBytes.Len()) } - sum := sgmBytes.Range(chkEnd-crc32.Size, chkEnd) - s.crc32.Reset() - if _, err := s.crc32.Write(sgmBytes.Range(chkEncStart, chkDataEnd)); err != nil { + sum := sgmBytes.Range(chkDataEnd, chkEnd) + if _, err := chkCRC32.Write(sgmBytes.Range(chkEncStart, chkDataEnd)); err != nil { return nil, err } - if act := s.crc32.Sum(s.buf[:0]); !bytes.Equal(act, sum) { + + if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) { return nil, errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act) } diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 1168460ec..70d79118f 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -25,6 +25,7 @@ import ( "path/filepath" "sort" "strconv" + "sync" "testing" "time" @@ -2485,9 +2486,9 @@ func TestDBReadOnly_FlushWAL(t *testing.T) { testutil.Equals(t, 1000.0, sum) } -// TestChunkWriter ensures that chunk segment are cut at the set segment size and +// TestChunkWriter_ReadAfterWrite ensures that chunk segment are cut at the set segment size and // that the resulted segments includes the expected chunks data. -func TestChunkWriter(t *testing.T) { +func TestChunkWriter_ReadAfterWrite(t *testing.T) { chk1 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 1}}) chk2 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 2}}) chk3 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 3}}) @@ -2611,22 +2612,19 @@ func TestChunkWriter(t *testing.T) { for i, test := range tests { t.Run(strconv.Itoa(i), func(t *testing.T) { - tmpdir, err := ioutil.TempDir("", "test_chunk_witer") + tempDir, err := ioutil.TempDir("", "test_chunk_writer") testutil.Ok(t, err) - defer func() { - testutil.Ok(t, os.RemoveAll(tmpdir)) - }() + defer func() { testutil.Ok(t, os.RemoveAll(tempDir)) }() - chunkw, err := chunks.NewWriterWithSegSize(tmpdir, chunks.SegmentHeaderSize+int64(test.segmentSize)) + chunkw, err := chunks.NewWriterWithSegSize(tempDir, chunks.SegmentHeaderSize+int64(test.segmentSize)) testutil.Ok(t, err) for _, chks := range test.chks { - chunkw.WriteChunks(chks...) + testutil.Ok(t, chunkw.WriteChunks(chks...)) } - testutil.Ok(t, chunkw.Close()) - files, err := ioutil.ReadDir(tmpdir) + files, err := ioutil.ReadDir(tempDir) testutil.Ok(t, err) testutil.Equals(t, test.expSegmentsCount, len(files), "expected segments count mismatch") @@ -2655,7 +2653,7 @@ func TestChunkWriter(t *testing.T) { testutil.Equals(t, sizeExp, sizeAct) // Check the content of the chunks. - r, err := chunks.NewDirReader(tmpdir, nil) + r, err := chunks.NewDirReader(tempDir, nil) testutil.Ok(t, err) for _, chks := range test.chks { @@ -2668,3 +2666,43 @@ func TestChunkWriter(t *testing.T) { }) } } + +// TestChunkReader_ConcurrentReads checks that the chunk result can be read concurrently. +// Regression test for https://github.com/prometheus/prometheus/pull/6514. +func TestChunkReader_ConcurrentReads(t *testing.T) { + chks := []chunks.Meta{ + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 1}}), + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 2}}), + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 3}}), + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 4}}), + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 5}}), + } + + tempDir, err := ioutil.TempDir("", "test_chunk_writer") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tempDir)) }() + + chunkw, err := chunks.NewWriter(tempDir) + testutil.Ok(t, err) + + testutil.Ok(t, chunkw.WriteChunks(chks...)) + testutil.Ok(t, chunkw.Close()) + + r, err := chunks.NewDirReader(tempDir, nil) + testutil.Ok(t, err) + + var wg sync.WaitGroup + for _, chk := range chks { + for i := 0; i < 100; i++ { + wg.Add(1) + go func(chunk chunks.Meta) { + defer wg.Done() + + chkAct, err := r.Chunk(chunk.Ref) + testutil.Ok(t, err) + testutil.Equals(t, chunk.Chunk.Bytes(), chkAct.Bytes()) + }(chk) + } + wg.Wait() + } +}