From 15180831684c5fb9541f8490467dd1a0a49f2192 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Tue, 25 Feb 2020 11:10:57 -0800 Subject: [PATCH] Rw testability improvements (#6537) * Change createTimeseries to take values for number of series and number of samples per series. Signed-off-by: Callum Styan * Take num of samples to expect in expectSampleCount instead of array of samples. Signed-off-by: Callum Styan * Add field to TestStorageClient to ignore samples sent waitgroup for potential tests where we don't care about delivery of all samples. Signed-off-by: Callum Styan * Fix up tests a little bit. Signed-off-by: Callum Styan --- storage/remote/queue_manager_test.go | 61 +++++++++++++++++++--------- 1 file changed, 41 insertions(+), 20 deletions(-) diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 8c66041c6..e4917441f 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -47,7 +47,7 @@ func TestSampleDelivery(t *testing.T) { // Let's create an even number of send batches so we don't run into the // batch timeout case. n := config.DefaultQueueConfig.MaxSamplesPerSend * 2 - samples, series := createTimeseries(n) + samples, series := createTimeseries(n, n) c := NewTestStorageClient() c.expectSamples(samples[:len(samples)/2], series) @@ -70,15 +70,15 @@ func TestSampleDelivery(t *testing.T) { defer m.Stop() c.waitForExpectedSamples(t) - m.Append(samples[len(samples)/2:]) c.expectSamples(samples[len(samples)/2:], series) + m.Append(samples[len(samples)/2:]) c.waitForExpectedSamples(t) } func TestSampleDeliveryTimeout(t *testing.T) { // Let's send one less sample than batch size, and wait the timeout duration n := 9 - samples, series := createTimeseries(n) + samples, series := createTimeseries(n, n) c := NewTestStorageClient() cfg := config.DefaultQueueConfig @@ -151,7 +151,8 @@ func TestShutdown(t *testing.T) { metrics := newQueueManagerMetrics(nil) m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline) - samples, series := createTimeseries(2 * config.DefaultQueueConfig.MaxSamplesPerSend) + n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend + samples, series := createTimeseries(n, n) m.StoreSeries(series, 0) m.Start() @@ -202,8 +203,9 @@ func TestSeriesReset(t *testing.T) { func TestReshard(t *testing.T) { size := 10 // Make bigger to find more races. - n := config.DefaultQueueConfig.Capacity * size - samples, series := createTimeseries(n) + nSeries := 6 + nSamples := config.DefaultQueueConfig.Capacity * size + samples, series := createTimeseries(nSamples, nSeries) c := NewTestStorageClient() c.expectSamples(samples, series) @@ -335,16 +337,19 @@ func TestCalculateDesiredsShards(t *testing.T) { } } -func createTimeseries(n int) ([]record.RefSample, []record.RefSeries) { - samples := make([]record.RefSample, 0, n) - series := make([]record.RefSeries, 0, n) - for i := 0; i < n; i++ { +func createTimeseries(numSamples, numSeries int) ([]record.RefSample, []record.RefSeries) { + samples := make([]record.RefSample, 0, numSamples) + series := make([]record.RefSeries, 0, numSeries) + for i := 0; i < numSeries; i++ { name := fmt.Sprintf("test_metric_%d", i) - samples = append(samples, record.RefSample{ - Ref: uint64(i), - T: int64(i), - V: float64(i), - }) + for j := 0; j < numSamples; j++ { + samples = append(samples, record.RefSample{ + Ref: uint64(i), + T: int64(j), + V: float64(i), + }) + + } series = append(series, record.RefSeries{ Ref: uint64(i), Labels: labels.Labels{{Name: "__name__", Value: name}}, @@ -365,6 +370,7 @@ func getSeriesNameFromRef(r record.RefSeries) string { type TestStorageClient struct { receivedSamples map[string][]prompb.Sample expectedSamples map[string][]prompb.Sample + withWaitGroup bool wg sync.WaitGroup mtx sync.Mutex buf []byte @@ -372,12 +378,16 @@ type TestStorageClient struct { func NewTestStorageClient() *TestStorageClient { return &TestStorageClient{ + withWaitGroup: true, receivedSamples: map[string][]prompb.Sample{}, expectedSamples: map[string][]prompb.Sample{}, } } func (c *TestStorageClient) expectSamples(ss []record.RefSample, series []record.RefSeries) { + if !c.withWaitGroup { + return + } c.mtx.Lock() defer c.mtx.Unlock() @@ -395,6 +405,9 @@ func (c *TestStorageClient) expectSamples(ss []record.RefSample, series []record } func (c *TestStorageClient) waitForExpectedSamples(tb testing.TB) { + if !c.withWaitGroup { + return + } c.wg.Wait() c.mtx.Lock() defer c.mtx.Unlock() @@ -405,13 +418,19 @@ func (c *TestStorageClient) waitForExpectedSamples(tb testing.TB) { } } -func (c *TestStorageClient) expectSampleCount(ss []record.RefSample) { +func (c *TestStorageClient) expectSampleCount(numSamples int) { + if !c.withWaitGroup { + return + } c.mtx.Lock() defer c.mtx.Unlock() - c.wg.Add(len(ss)) + c.wg.Add(numSamples) } func (c *TestStorageClient) waitForExpectedSampleCount() { + if !c.withWaitGroup { + return + } c.wg.Wait() } @@ -447,7 +466,9 @@ func (c *TestStorageClient) Store(_ context.Context, req []byte) error { c.receivedSamples[seriesName] = append(c.receivedSamples[seriesName], sample) } } - c.wg.Add(-count) + if c.withWaitGroup { + c.wg.Add(-count) + } return nil } @@ -493,7 +514,7 @@ func BenchmarkSampleDelivery(b *testing.B) { // Let's create an even number of send batches so we don't run into the // batch timeout case. n := config.DefaultQueueConfig.MaxSamplesPerSend * 10 - samples, series := createTimeseries(n) + samples, series := createTimeseries(n, n) c := NewTestStorageClient() @@ -515,7 +536,7 @@ func BenchmarkSampleDelivery(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - c.expectSampleCount(samples) + c.expectSampleCount(len(samples)) m.Append(samples) c.waitForExpectedSampleCount() }