diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index 4ed739707..be82ba8bc 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -179,26 +179,29 @@ func (t *StorageQueueManager) Collect(ch chan<- prometheus.Metric) {
 
 func (t *StorageQueueManager) sendSamples(s model.Samples) {
 	t.sendSemaphore <- true
-	defer func() {
-		<-t.sendSemaphore
+
+	go func() {
+		defer func() {
+			<-t.sendSemaphore
+		}()
+
+		// Samples are sent to the remote storage on a best-effort basis. If a
+		// sample isn't sent correctly the first time, it's simply dropped on the
+		// floor.
+		begin := time.Now()
+		err := t.tsdb.Store(s)
+		duration := time.Since(begin) / time.Second
+
+		labelValue := success
+		if err != nil {
+			log.Warnf("error sending %d samples to remote storage: %s", len(s), err)
+			labelValue = failure
+			t.failedBatches.Inc()
+			t.failedSamples.Add(float64(len(s)))
+		}
+		t.samplesCount.WithLabelValues(labelValue).Add(float64(len(s)))
+		t.sendLatency.Observe(float64(duration))
 	}()
-
-	// Samples are sent to the remote storage on a best-effort basis. If a
-	// sample isn't sent correctly the first time, it's simply dropped on the
-	// floor.
-	begin := time.Now()
-	err := t.tsdb.Store(s)
-	duration := time.Since(begin) / time.Second
-
-	labelValue := success
-	if err != nil {
-		log.Warnf("error sending %d samples to remote storage: %s", len(s), err)
-		labelValue = failure
-		t.failedBatches.Inc()
-		t.failedSamples.Add(float64(len(s)))
-	}
-	t.samplesCount.WithLabelValues(labelValue).Add(float64(len(s)))
-	t.sendLatency.Observe(float64(duration))
 }
 
 // Run continuously sends samples to the remote storage.
@@ -223,7 +226,7 @@ func (t *StorageQueueManager) Run() {
 			t.pendingSamples = append(t.pendingSamples, s)
 
 			for len(t.pendingSamples) >= maxSamplesPerSend {
-				go t.sendSamples(t.pendingSamples[:maxSamplesPerSend])
+				t.sendSamples(t.pendingSamples[:maxSamplesPerSend])
 				t.pendingSamples = t.pendingSamples[maxSamplesPerSend:]
 			}
 		case <-time.After(batchSendDeadline):
@@ -235,7 +238,7 @@ func (t *StorageQueueManager) Run() {
 // Flush flushes remaining queued samples.
 func (t *StorageQueueManager) flush() {
 	if len(t.pendingSamples) > 0 {
-		go t.sendSamples(t.pendingSamples)
+		t.sendSamples(t.pendingSamples)
 	}
 	t.pendingSamples = t.pendingSamples[:0]
 }
diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go
index 71a7e17bd..9e2522d0b 100644
--- a/storage/remote/queue_manager_test.go
+++ b/storage/remote/queue_manager_test.go
@@ -50,6 +50,33 @@ func (c *TestStorageClient) Name() string {
 	return "teststorageclient"
 }
 
+type TestBlockingStorageClient struct {
+	block   chan bool
+	getData chan bool
+}
+
+func NewTestBlockedStorageClient() *TestBlockingStorageClient {
+	return &TestBlockingStorageClient{
+		block:   make(chan bool),
+		getData: make(chan bool),
+	}
+}
+
+func (c *TestBlockingStorageClient) Store(s model.Samples) error {
+	<-c.getData
+	<-c.block
+	return nil
+}
+
+func (c *TestBlockingStorageClient) unlock() {
+	close(c.getData)
+	close(c.block)
+}
+
+func (c *TestBlockingStorageClient) Name() string {
+	return "testblockingstorageclient"
+}
+
 func TestSampleDelivery(t *testing.T) {
 	// Let's create an even number of send batches so we don't run into the
 	// batch timeout case.
@@ -82,3 +109,40 @@ func TestSampleDelivery(t *testing.T) {
 
 	c.waitForExpectedSamples(t)
 }
+
+func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
+	// The manager should consume `(maxConcurrentSends + 1) * maxSamplesPerSend`
+	// samples from the queue, leaving `maxSamplesPerSend` samples still queued.
+	n := maxSamplesPerSend*maxConcurrentSends + maxSamplesPerSend*2
+
+	samples := make(model.Samples, 0, n)
+	for i := 0; i < n; i++ {
+		samples = append(samples, &model.Sample{
+			Metric: model.Metric{
+				model.MetricNameLabel: "test_metric",
+			},
+			Value: model.SampleValue(i),
+		})
+	}
+
+	c := NewTestBlockedStorageClient()
+	m := NewStorageQueueManager(c, n)
+
+	go m.Run()
+
+	for _, s := range samples {
+		m.Append(s)
+	}
+
+	for i := 0; i < maxConcurrentSends; i++ {
+		c.getData <- true // Wait until all the sending goroutines are spawned.
+	}
+
+	if len(m.queue) != maxSamplesPerSend {
+		t.Errorf("Queue should contain %d samples, it contains %d.", maxSamplesPerSend, len(m.queue))
+	}
+
+	c.unlock()
+
+	defer m.Stop()
+}