diff --git a/tsdb/chunks/chunk_write_queue.go b/tsdb/chunks/chunk_write_queue.go index ce9b94d85..5cdd2e81f 100644 --- a/tsdb/chunks/chunk_write_queue.go +++ b/tsdb/chunks/chunk_write_queue.go @@ -163,3 +163,22 @@ func (c *chunkWriteQueue) stop() { c.workerWg.Wait() } + +func (c *chunkWriteQueue) queueIsEmpty() bool { + return c.queueSize() == 0 +} + +func (c *chunkWriteQueue) queueIsFull() bool { + // When the queue is full and blocked on the writer the chunkRefMap has one more job than the cap of the jobCh + // because one job is currently being processed and blocked in the writer. + return c.queueSize() == cap(c.jobs)+1 +} + +func (c *chunkWriteQueue) queueSize() int { + c.chunkRefMapMtx.Lock() + defer c.chunkRefMapMtx.Unlock() + + // Looking at chunkRefMap instead of jobCh because the job is popped from the chan before it has + // been fully processed, it remains in the chunkRefMap until the processing is complete. + return len(c.chunkRefMap) +} diff --git a/tsdb/chunks/chunk_write_queue_test.go b/tsdb/chunks/chunk_write_queue_test.go index 251c96395..0e971a336 100644 --- a/tsdb/chunks/chunk_write_queue_test.go +++ b/tsdb/chunks/chunk_write_queue_test.go @@ -146,7 +146,7 @@ func TestChunkWriteQueue_WrappingAroundSizeLimit(t *testing.T) { } // The queue should be full. - require.True(t, queueIsFull(q)) + require.True(t, q.queueIsFull()) // Adding another job should block as long as no job from the queue gets consumed. addedJob := atomic.NewBool(false) @@ -166,19 +166,19 @@ func TestChunkWriteQueue_WrappingAroundSizeLimit(t *testing.T) { require.Eventually(t, func() bool { return addedJob.Load() }, time.Second, time.Millisecond*10) // The queue should be full again. - require.True(t, queueIsFull(q)) + require.True(t, q.queueIsFull()) // Consume +1 jobs from the queue. // To drain the queue we need to consume +1 jobs because 1 job // is already in the state of being processed. for job := 0; job < sizeLimit+1; job++ { - require.False(t, queueIsEmpty(q)) + require.False(t, q.queueIsEmpty()) unblockChunkWriter() } // Wait until all jobs have been processed. callbackWg.Wait() - require.True(t, queueIsEmpty(q)) + require.True(t, q.queueIsEmpty()) } func TestChunkWriteQueue_HandlerErrorViaCallback(t *testing.T) { @@ -266,22 +266,3 @@ func BenchmarkChunkWriteQueue_addJob(b *testing.B) { }) } } - -func queueIsEmpty(q *chunkWriteQueue) bool { - return queueSize(q) == 0 -} - -func queueIsFull(q *chunkWriteQueue) bool { - // When the queue is full and blocked on the writer the chunkRefMap has one more job than the cap of the jobCh - // because one job is currently being processed and blocked in the writer. - return queueSize(q) == cap(q.jobs)+1 -} - -func queueSize(q *chunkWriteQueue) int { - q.chunkRefMapMtx.Lock() - defer q.chunkRefMapMtx.Unlock() - - // Looking at chunkRefMap instead of jobCh because the job is popped from the chan before it has - // been fully processed, it remains in the chunkRefMap until the processing is complete. - return len(q.chunkRefMap) -} diff --git a/tsdb/chunks/head_chunks.go b/tsdb/chunks/head_chunks.go index 11c175cbf..1b5845808 100644 --- a/tsdb/chunks/head_chunks.go +++ b/tsdb/chunks/head_chunks.go @@ -473,6 +473,10 @@ func (cdm *ChunkDiskMapper) CutNewFile() { cdm.evtlPos.cutFileOnNextChunk() } +func (cdm *ChunkDiskMapper) IsQueueEmpty() bool { + return cdm.writeQueue.queueIsEmpty() +} + // cutAndExpectRef creates a new m-mapped file. // The write lock should be held before calling this. // It ensures that the position in the new file matches the given chunk reference, if not then it errors. diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 45d2325ba..d94c387b3 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -1461,6 +1461,10 @@ func TestSizeRetention(t *testing.T) { } require.NoError(t, headApp.Commit()) + require.Eventually(t, func() bool { + return db.Head().chunkDiskMapper.IsQueueEmpty() + }, 2*time.Second, 100*time.Millisecond) + // Test that registered size matches the actual disk size. require.NoError(t, db.reloadBlocks()) // Reload the db to register the new db size. require.Equal(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered. diff --git a/tsdb/example_test.go b/tsdb/example_test.go index 9af341e1e..1947a48a5 100644 --- a/tsdb/example_test.go +++ b/tsdb/example_test.go @@ -16,97 +16,89 @@ package tsdb import ( "context" "fmt" + "io/ioutil" "math" - "testing" + "os" "time" - "github.com/stretchr/testify/require" - "github.com/prometheus/prometheus/model/labels" ) -func TestExample(t *testing.T) { +func Example() { // Create a random dir to work in. Open() doesn't require a pre-existing dir, but // we want to make sure not to make a mess where we shouldn't. - dir := t.TempDir() + dir, err := ioutil.TempDir("", "tsdb-test") + noErr(err) // Open a TSDB for reading and/or writing. db, err := Open(dir, nil, nil, DefaultOptions(), nil) - require.NoError(t, err) + noErr(err) // Open an appender for writing. app := db.Appender(context.Background()) - lbls := labels.FromStrings("foo", "bar") - var appendedSamples []sample + series := labels.FromStrings("foo", "bar") // Ref is 0 for the first append since we don't know the reference for the series. - ts, v := time.Now().Unix(), 123.0 - ref, err := app.Append(0, lbls, ts, v) - require.NoError(t, err) - appendedSamples = append(appendedSamples, sample{ts, v}) + ref, err := app.Append(0, series, time.Now().Unix(), 123) + noErr(err) // Another append for a second later. // Re-using the ref from above since it's the same series, makes append faster. time.Sleep(time.Second) - ts, v = time.Now().Unix(), 124 - _, err = app.Append(ref, lbls, ts, v) - require.NoError(t, err) - appendedSamples = append(appendedSamples, sample{ts, v}) + _, err = app.Append(ref, series, time.Now().Unix(), 124) + noErr(err) // Commit to storage. err = app.Commit() - require.NoError(t, err) + noErr(err) // In case you want to do more appends after app.Commit(), // you need a new appender. - // app = db.Appender(context.Background()) - // + app = db.Appender(context.Background()) // ... adding more samples. - // - // Commit to storage. - // err = app.Commit() - // require.NoError(t, err) // Open a querier for reading. querier, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64) - require.NoError(t, err) - + noErr(err) ss := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) - var queriedSamples []sample + for ss.Next() { series := ss.At() fmt.Println("series:", series.Labels().String()) it := series.Iterator() for it.Next() { - ts, v := it.At() - fmt.Println("sample", ts, v) - queriedSamples = append(queriedSamples, sample{ts, v}) + _, v := it.At() // We ignore the timestamp here, only to have a predictable output we can test against (below) + fmt.Println("sample", v) } - require.NoError(t, it.Err()) fmt.Println("it.Err():", it.Err()) } - require.NoError(t, ss.Err()) fmt.Println("ss.Err():", ss.Err()) ws := ss.Warnings() if len(ws) > 0 { fmt.Println("warnings:", ws) } err = querier.Close() - require.NoError(t, err) + noErr(err) // Clean up any last resources when done. err = db.Close() - require.NoError(t, err) - - require.Equal(t, appendedSamples, queriedSamples) + noErr(err) + err = os.RemoveAll(dir) + noErr(err) // Output: // series: {foo="bar"} - // sample 123 - // sample 124 + // sample 123 + // sample 124 // it.Err(): // ss.Err(): } + +func noErr(err error) { + if err != nil { + panic(err) + } +}