diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 32a4aba4a..22da78dca 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -906,7 +906,8 @@ func (t *QueueManager) newShards() *shards { } type shards struct { - mtx sync.RWMutex // With the WAL, this is never actually contended. + mtx sync.RWMutex // With the WAL, this is never actually contended. + writeMtx sync.Mutex qm *QueueManager queues []*queue @@ -949,6 +950,8 @@ func (s *shards) start(n int) { s.softShutdown = make(chan struct{}) s.running.Store(int32(n)) s.done = make(chan struct{}) + s.enqueuedSamples.Store(0) + s.enqueuedExemplars.Store(0) s.samplesDroppedOnHardShutdown.Store(0) s.exemplarsDroppedOnHardShutdown.Store(0) for i := 0; i < n; i++ { @@ -996,6 +999,8 @@ func (s *shards) stop() { func (s *shards) enqueue(ref chunks.HeadSeriesRef, data sampleOrExemplar) bool { s.mtx.RLock() defer s.mtx.RUnlock() + s.writeMtx.Lock() + defer s.writeMtx.Unlock() select { case <-s.softShutdown: @@ -1024,14 +1029,18 @@ func (s *shards) enqueue(ref chunks.HeadSeriesRef, data sampleOrExemplar) bool { } type queue struct { + // batchMtx covers sending to the batchQueue and batch operations other + // than appending a sample. It is mainly to make sure (*queue).Batch() and + // (*queue).FlushAndShutdown() are not called concurrently. + batchMtx sync.Mutex batch []sampleOrExemplar batchQueue chan []sampleOrExemplar // Since we know there are a limited number of batches out, using a stack // is easy and safe so a sync.Pool is not necessary. + // poolMtx covers adding and removing batches from the batchPool. + poolMtx sync.Mutex batchPool [][]sampleOrExemplar - // This mutex covers adding and removing batches from the batchPool. - poolMux sync.Mutex } type sampleOrExemplar struct { @@ -1076,6 +1085,9 @@ func (q *queue) Chan() <-chan []sampleOrExemplar { // Batch returns the current batch and allocates a new batch. Must not be // called concurrently with Append. func (q *queue) Batch() []sampleOrExemplar { + q.batchMtx.Lock() + defer q.batchMtx.Unlock() + batch := q.batch q.batch = q.newBatch(cap(batch)) return batch @@ -1083,8 +1095,8 @@ func (q *queue) Batch() []sampleOrExemplar { // ReturnForReuse adds the batch buffer back to the internal pool. func (q *queue) ReturnForReuse(batch []sampleOrExemplar) { - q.poolMux.Lock() - defer q.poolMux.Unlock() + q.poolMtx.Lock() + defer q.poolMtx.Unlock() if len(q.batchPool) < cap(q.batchPool) { q.batchPool = append(q.batchPool, batch[:0]) } @@ -1093,6 +1105,9 @@ func (q *queue) ReturnForReuse(batch []sampleOrExemplar) { // FlushAndShutdown stops the queue and flushes any samples. No appends can be // made after this is called. func (q *queue) FlushAndShutdown(done <-chan struct{}) { + q.batchMtx.Lock() + defer q.batchMtx.Unlock() + if len(q.batch) > 0 { select { case q.batchQueue <- q.batch: @@ -1106,8 +1121,8 @@ func (q *queue) FlushAndShutdown(done <-chan struct{}) { } func (q *queue) newBatch(capacity int) []sampleOrExemplar { - q.poolMux.Lock() - defer q.poolMux.Unlock() + q.poolMtx.Lock() + defer q.poolMtx.Unlock() batches := len(q.batchPool) if batches > 0 { batch := q.batchPool[batches-1] @@ -1186,18 +1201,22 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) case <-timer.C: - // We need to take the lock when getting a batch to avoid + // We need to take the write lock when getting a batch to avoid // concurrent Appends. Generally this will only happen on low - // traffic instances. - s.mtx.Lock() - // First, we need to see if we can happen to get a batch from the queue if it filled while acquiring the lock. + // traffic instances or during resharding. We have to use writeMtx + // and not the batchMtx on a queue because we do not want to have + // to lock each queue for each sample, and cannot call + // queue.Batch() while an Append happens. + s.writeMtx.Lock() + // First, we need to see if we can happen to get a batch from the + // queue if it filled while acquiring the lock. var batch []sampleOrExemplar select { case batch = <-batchQueue: default: batch = queue.Batch() } - s.mtx.Unlock() + s.writeMtx.Unlock() if len(batch) > 0 { nPendingSamples, nPendingExemplars := s.populateTimeSeries(batch, pendingData) n := nPendingSamples + nPendingExemplars diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index a2d63b0bc..14fbb324d 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -382,6 +382,44 @@ func TestReshardRaceWithStop(t *testing.T) { <-exitCh } +func TestReshardPartialBatch(t *testing.T) { + samples, series := createTimeseries(1, 10) + + c := NewTestBlockedWriteClient() + + cfg := config.DefaultQueueConfig + mcfg := config.DefaultMetadataConfig + cfg.MaxShards = 1 + batchSendDeadline := time.Millisecond + flushDeadline := 10 * time.Millisecond + cfg.BatchSendDeadline = model.Duration(batchSendDeadline) + + metrics := newQueueManagerMetrics(nil, "", "") + m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false) + m.StoreSeries(series, 0) + + m.Start() + + for i := 0; i < 100; i++ { + done := make(chan struct{}) + go func() { + m.Append(samples) + time.Sleep(batchSendDeadline) + m.shards.stop() + m.shards.start(1) + done <- struct{}{} + }() + select { + case <-done: + case <-time.After(2 * time.Second): + t.Error("Deadlock between sending and stopping detected") + t.FailNow() + } + } + // We can only call stop if there was not a deadlock. + m.Stop() +} + func TestReleaseNoninternedString(t *testing.T) { cfg := config.DefaultQueueConfig mcfg := config.DefaultMetadataConfig