Fix deadlock when stopping a shard (#10279)

If a queue is stopped and one of its shards happens to hit the
batch_send_deadline at the same time a deadlock can occur where stop
holds the mutex and will not release it until the send is finished, but
the send needs the mutex to retrieve the most recent batch. This is
fixed by using a second mutex just for writing.

In addition, the test I wrote exposed a case where during shutdown a
batch could be sent twice due to concurrent calls to queue.Batch() and
queue.FlushAndShutdown(). Protect these with a mutex as well.

Signed-off-by: Chris Marchbanks <csmarchbanks@gmail.com>
This commit is contained in:
Chris Marchbanks 2022-02-11 07:07:41 -07:00 committed by GitHub
parent 1b3443ede1
commit bfb1500a38
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 69 additions and 12 deletions

View File

@ -906,7 +906,8 @@ func (t *QueueManager) newShards() *shards {
}
type shards struct {
mtx sync.RWMutex // With the WAL, this is never actually contended.
mtx sync.RWMutex // With the WAL, this is never actually contended.
writeMtx sync.Mutex
qm *QueueManager
queues []*queue
@ -949,6 +950,8 @@ func (s *shards) start(n int) {
s.softShutdown = make(chan struct{})
s.running.Store(int32(n))
s.done = make(chan struct{})
s.enqueuedSamples.Store(0)
s.enqueuedExemplars.Store(0)
s.samplesDroppedOnHardShutdown.Store(0)
s.exemplarsDroppedOnHardShutdown.Store(0)
for i := 0; i < n; i++ {
@ -996,6 +999,8 @@ func (s *shards) stop() {
func (s *shards) enqueue(ref chunks.HeadSeriesRef, data sampleOrExemplar) bool {
s.mtx.RLock()
defer s.mtx.RUnlock()
s.writeMtx.Lock()
defer s.writeMtx.Unlock()
select {
case <-s.softShutdown:
@ -1024,14 +1029,18 @@ func (s *shards) enqueue(ref chunks.HeadSeriesRef, data sampleOrExemplar) bool {
}
type queue struct {
// batchMtx covers sending to the batchQueue and batch operations other
// than appending a sample. It is mainly to make sure (*queue).Batch() and
// (*queue).FlushAndShutdown() are not called concurrently.
batchMtx sync.Mutex
batch []sampleOrExemplar
batchQueue chan []sampleOrExemplar
// Since we know there are a limited number of batches out, using a stack
// is easy and safe so a sync.Pool is not necessary.
// poolMtx covers adding and removing batches from the batchPool.
poolMtx sync.Mutex
batchPool [][]sampleOrExemplar
// This mutex covers adding and removing batches from the batchPool.
poolMux sync.Mutex
}
type sampleOrExemplar struct {
@ -1076,6 +1085,9 @@ func (q *queue) Chan() <-chan []sampleOrExemplar {
// Batch returns the current batch and allocates a new batch. Must not be
// called concurrently with Append.
func (q *queue) Batch() []sampleOrExemplar {
q.batchMtx.Lock()
defer q.batchMtx.Unlock()
batch := q.batch
q.batch = q.newBatch(cap(batch))
return batch
@ -1083,8 +1095,8 @@ func (q *queue) Batch() []sampleOrExemplar {
// ReturnForReuse adds the batch buffer back to the internal pool.
func (q *queue) ReturnForReuse(batch []sampleOrExemplar) {
q.poolMux.Lock()
defer q.poolMux.Unlock()
q.poolMtx.Lock()
defer q.poolMtx.Unlock()
if len(q.batchPool) < cap(q.batchPool) {
q.batchPool = append(q.batchPool, batch[:0])
}
@ -1093,6 +1105,9 @@ func (q *queue) ReturnForReuse(batch []sampleOrExemplar) {
// FlushAndShutdown stops the queue and flushes any samples. No appends can be
// made after this is called.
func (q *queue) FlushAndShutdown(done <-chan struct{}) {
q.batchMtx.Lock()
defer q.batchMtx.Unlock()
if len(q.batch) > 0 {
select {
case q.batchQueue <- q.batch:
@ -1106,8 +1121,8 @@ func (q *queue) FlushAndShutdown(done <-chan struct{}) {
}
func (q *queue) newBatch(capacity int) []sampleOrExemplar {
q.poolMux.Lock()
defer q.poolMux.Unlock()
q.poolMtx.Lock()
defer q.poolMtx.Unlock()
batches := len(q.batchPool)
if batches > 0 {
batch := q.batchPool[batches-1]
@ -1186,18 +1201,22 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
case <-timer.C:
// We need to take the lock when getting a batch to avoid
// We need to take the write lock when getting a batch to avoid
// concurrent Appends. Generally this will only happen on low
// traffic instances.
s.mtx.Lock()
// First, we need to see if we can happen to get a batch from the queue if it filled while acquiring the lock.
// traffic instances or during resharding. We have to use writeMtx
// and not the batchMtx on a queue because we do not want to have
// to lock each queue for each sample, and cannot call
// queue.Batch() while an Append happens.
s.writeMtx.Lock()
// First, we need to see if we can happen to get a batch from the
// queue if it filled while acquiring the lock.
var batch []sampleOrExemplar
select {
case batch = <-batchQueue:
default:
batch = queue.Batch()
}
s.mtx.Unlock()
s.writeMtx.Unlock()
if len(batch) > 0 {
nPendingSamples, nPendingExemplars := s.populateTimeSeries(batch, pendingData)
n := nPendingSamples + nPendingExemplars

View File

@ -382,6 +382,44 @@ func TestReshardRaceWithStop(t *testing.T) {
<-exitCh
}
func TestReshardPartialBatch(t *testing.T) {
samples, series := createTimeseries(1, 10)
c := NewTestBlockedWriteClient()
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
cfg.MaxShards = 1
batchSendDeadline := time.Millisecond
flushDeadline := 10 * time.Millisecond
cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
m.StoreSeries(series, 0)
m.Start()
for i := 0; i < 100; i++ {
done := make(chan struct{})
go func() {
m.Append(samples)
time.Sleep(batchSendDeadline)
m.shards.stop()
m.shards.start(1)
done <- struct{}{}
}()
select {
case <-done:
case <-time.After(2 * time.Second):
t.Error("Deadlock between sending and stopping detected")
t.FailNow()
}
}
// We can only call stop if there was not a deadlock.
m.Stop()
}
func TestReleaseNoninternedString(t *testing.T) {
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig