Test sample timeout delivery.
This commit is contained in:
parent
fdb574b608
commit
731259afd0
|
@ -44,6 +44,9 @@ func (c *TestStorageClient) expectSamples(ss model.Samples) {
|
|||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
|
||||
c.expectedSamples = map[string][]*prompb.Sample{}
|
||||
c.receivedSamples = map[string][]*prompb.Sample{}
|
||||
|
||||
for _, s := range ss {
|
||||
ts := labelProtosToLabels(MetricToLabelProtos(s.Metric)).String()
|
||||
c.expectedSamples[ts] = append(c.expectedSamples[ts], &prompb.Sample{
|
||||
|
@ -122,6 +125,44 @@ func TestSampleDelivery(t *testing.T) {
|
|||
c.waitForExpectedSamples(t)
|
||||
}
|
||||
|
||||
func TestSampleDeliveryTimeout(t *testing.T) {
|
||||
// Let's send on less sample than batch size, and wait the timeout duration
|
||||
n := config.DefaultQueueConfig.Capacity - 1
|
||||
|
||||
samples := make(model.Samples, 0, n)
|
||||
for i := 0; i < n; i++ {
|
||||
name := model.LabelValue(fmt.Sprintf("test_metric_%d", i))
|
||||
samples = append(samples, &model.Sample{
|
||||
Metric: model.Metric{
|
||||
model.MetricNameLabel: name,
|
||||
},
|
||||
Value: model.SampleValue(i),
|
||||
})
|
||||
}
|
||||
|
||||
c := NewTestStorageClient()
|
||||
|
||||
cfg := config.DefaultQueueConfig
|
||||
cfg.MaxShards = 1
|
||||
cfg.BatchSendDeadline = 100 * time.Millisecond
|
||||
m := NewQueueManager(nil, cfg, nil, nil, c)
|
||||
m.Start()
|
||||
defer m.Stop()
|
||||
|
||||
// Send the samples twice, waiting for the samples in the meantime.
|
||||
c.expectSamples(samples)
|
||||
for _, s := range samples {
|
||||
m.Append(s)
|
||||
}
|
||||
c.waitForExpectedSamples(t)
|
||||
|
||||
c.expectSamples(samples)
|
||||
for _, s := range samples {
|
||||
m.Append(s)
|
||||
}
|
||||
c.waitForExpectedSamples(t)
|
||||
}
|
||||
|
||||
func TestSampleDeliveryOrder(t *testing.T) {
|
||||
ts := 10
|
||||
n := config.DefaultQueueConfig.MaxSamplesPerSend * ts
|
||||
|
|
Loading…
Reference in New Issue