diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index 5fc66439b..05719c998 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -360,7 +360,6 @@ outer:
 func (t *QueueManager) Start() {
 	// Initialise some metrics.
 	t.metrics.shardCapacity.Set(float64(t.cfg.Capacity))
-	t.metrics.pendingSamples.Set(0)
 	t.metrics.maxNumShards.Set(float64(t.cfg.MaxShards))
 	t.metrics.minNumShards.Set(float64(t.cfg.MinShards))
 	t.metrics.desiredNumShards.Set(float64(t.cfg.MinShards))
@@ -667,7 +666,8 @@ type shards struct {
 
 	// Hard shutdown context is used to terminate outgoing HTTP connections
 	// after giving them a chance to terminate.
-	hardShutdown context.CancelFunc
+	hardShutdown          context.CancelFunc
+	droppedOnHardShutdown uint32
 }
 
 // start the shards; must be called before any call to enqueue.
@@ -675,6 +675,9 @@ func (s *shards) start(n int) {
 	s.mtx.Lock()
 	defer s.mtx.Unlock()
 
+	s.qm.metrics.pendingSamples.Set(0)
+	s.qm.metrics.numShards.Set(float64(n))
+
 	newQueues := make([]chan sample, n)
 	for i := 0; i < n; i++ {
 		newQueues[i] = make(chan sample, s.qm.cfg.Capacity)
@@ -687,10 +690,10 @@ func (s *shards) start(n int) {
 	s.softShutdown = make(chan struct{})
 	s.running = int32(n)
 	s.done = make(chan struct{})
+	atomic.StoreUint32(&s.droppedOnHardShutdown, 0)
 	for i := 0; i < n; i++ {
 		go s.runShard(hardShutdownCtx, i, newQueues[i])
 	}
-	s.qm.metrics.numShards.Set(float64(n))
 }
 
 // stop the shards; subsequent call to enqueue will return false.
@@ -715,12 +718,14 @@ func (s *shards) stop() {
 	case <-s.done:
 		return
 	case <-time.After(s.qm.flushDeadline):
-		level.Error(s.qm.logger).Log("msg", "Failed to flush all samples on shutdown")
 	}
 
 	// Force an unclean shutdown.
 	s.hardShutdown()
 	<-s.done
+	if dropped := atomic.LoadUint32(&s.droppedOnHardShutdown); dropped > 0 {
+		level.Error(s.qm.logger).Log("msg", "Failed to flush all samples on shutdown", "count", dropped)
+	}
 }
 
 // enqueue a sample. If we are currently in the process of shutting down or resharding,
@@ -740,6 +745,7 @@ func (s *shards) enqueue(ref uint64, sample sample) bool {
 	case <-s.softShutdown:
 		return false
 	case s.queues[shard] <- sample:
+		s.qm.metrics.pendingSamples.Inc()
 		return true
 	}
 }
@@ -777,6 +783,12 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan sample) {
 	for {
 		select {
 		case <-ctx.Done():
+			// In this case we drop all samples in the buffer and the queue.
+			// Remove them from pending and mark them as failed.
+			droppedSamples := nPending + len(queue)
+			s.qm.metrics.pendingSamples.Sub(float64(droppedSamples))
+			s.qm.metrics.failedSamplesTotal.Add(float64(droppedSamples))
+			atomic.AddUint32(&s.droppedOnHardShutdown, uint32(droppedSamples))
 			return
 
 		case sample, ok := <-queue:
@@ -797,7 +809,6 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan sample) {
 			pendingSamples[nPending].Samples[0].Timestamp = sample.t
 			pendingSamples[nPending].Samples[0].Value = sample.v
 			nPending++
-			s.qm.metrics.pendingSamples.Inc()
 
 			if nPending >= max {
 				s.sendSamples(ctx, pendingSamples, &buf)
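
Note (not part of the patch): the standalone sketch below illustrates the accounting pattern the diff introduces, assuming only what the hunks show: the pending-samples gauge rises when a sample is enqueued, and on a hard shutdown each shard subtracts everything still buffered or queued, counts it as failed, and adds it to an atomic drop counter that stop() reports once. Plain integers and the standard library stand in for the Prometheus metrics and the go-kit logger; the names fakeShard, pendingSamples, failedSamplesTotal, and droppedOnHardShutdown are illustrative only.

// Standalone sketch, not the real queue manager code.
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// fakeShard stands in for the real shards struct; plain integers replace the
// Prometheus gauge/counter and an atomic uint32 mimics droppedOnHardShutdown.
type fakeShard struct {
	pendingSamples        int64  // gauge: samples enqueued but not yet sent or dropped
	failedSamplesTotal    int64  // counter: samples that will never be sent
	droppedOnHardShutdown uint32 // samples discarded when the hard-shutdown context fires
	queue                 chan int
}

// enqueue mirrors shards.enqueue: the pending gauge is incremented as soon as
// the sample lands in the shard's channel.
func (s *fakeShard) enqueue(v int) bool {
	select {
	case s.queue <- v:
		atomic.AddInt64(&s.pendingSamples, 1)
		return true
	default:
		return false
	}
}

// run mirrors the ctx.Done() branch of runShard: on hard shutdown everything
// still buffered (nPending) or queued (len(queue)) is removed from pending,
// counted as failed, and added to the drop counter reported at shutdown.
func (s *fakeShard) run(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()
	nPending := 0
	for {
		select {
		case <-ctx.Done():
			dropped := nPending + len(s.queue)
			atomic.AddInt64(&s.pendingSamples, -int64(dropped))
			atomic.AddInt64(&s.failedSamplesTotal, int64(dropped))
			atomic.AddUint32(&s.droppedOnHardShutdown, uint32(dropped))
			return
		case _, ok := <-s.queue:
			if !ok {
				return
			}
			nPending++ // buffered, never "sent" in this sketch
		}
	}
}

func main() {
	s := &fakeShard{queue: make(chan int, 4)}
	ctx, hardShutdown := context.WithCancel(context.Background())

	var wg sync.WaitGroup
	wg.Add(1)
	go s.run(ctx, &wg)

	for i := 0; i < 4; i++ {
		s.enqueue(i)
	}
	time.Sleep(10 * time.Millisecond) // let the shard pull samples into its buffer

	hardShutdown() // the unclean shutdown that stop() forces after the flush deadline
	wg.Wait()

	if dropped := atomic.LoadUint32(&s.droppedOnHardShutdown); dropped > 0 {
		fmt.Println("Failed to flush all samples on shutdown", "count", dropped)
	}
	fmt.Println("pending gauge now:", atomic.LoadInt64(&s.pendingSamples))
}

Aggregating per-shard drops into one atomic counter lets stop() emit a single error with an exact sample count, instead of logging unconditionally whenever the flush deadline expires, which is the behaviour the patch removes.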