2017-05-10 09:44:13 +00:00
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
2018-05-29 08:51:29 +00:00
"context"
2017-05-10 09:44:13 +00:00
"fmt"
2019-02-14 01:11:17 +00:00
"io/ioutil"
2019-02-20 04:03:41 +00:00
"math"
2019-02-14 01:11:17 +00:00
"os"
2017-10-23 20:28:17 +00:00
"reflect"
2019-02-20 07:51:08 +00:00
"sort"
"strconv"
2017-05-10 09:44:13 +00:00
"sync"
"sync/atomic"
"testing"
"time"
2019-02-20 04:03:41 +00:00
"github.com/go-kit/kit/log"
2018-09-07 21:26:04 +00:00
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
2019-04-24 09:46:31 +00:00
client_testutil "github.com/prometheus/client_golang/prometheus/testutil"
2017-05-10 09:44:13 +00:00
"github.com/prometheus/common/model"
2017-10-23 13:57:44 +00:00
"github.com/prometheus/prometheus/config"
2019-03-08 16:29:25 +00:00
"github.com/prometheus/prometheus/pkg/labels"
2019-11-27 00:53:11 +00:00
"github.com/prometheus/prometheus/pkg/timestamp"
2017-10-23 20:28:17 +00:00
"github.com/prometheus/prometheus/prompb"
2019-09-19 09:15:41 +00:00
"github.com/prometheus/prometheus/tsdb/record"
2018-09-07 21:26:04 +00:00
"github.com/prometheus/prometheus/util/testutil"
2017-05-10 09:44:13 +00:00
)
2018-05-23 14:03:54 +00:00
// defaultFlushDeadline is the flush deadline handed to NewQueueManager by the
// tests in this file that do not need a custom value.
const defaultFlushDeadline = time.Minute
2017-05-10 09:44:13 +00:00
func TestSampleDelivery ( t * testing . T ) {
// Let's create an even number of send batches so we don't run into the
// batch timeout case.
2019-08-13 09:10:21 +00:00
n := config . DefaultQueueConfig . MaxSamplesPerSend * 2
2020-02-25 19:10:57 +00:00
samples , series := createTimeseries ( n , n )
2017-05-10 09:44:13 +00:00
c := NewTestStorageClient ( )
2018-09-07 21:26:04 +00:00
c . expectSamples ( samples [ : len ( samples ) / 2 ] , series )
2017-05-10 09:44:13 +00:00
2017-10-23 13:57:44 +00:00
cfg := config . DefaultQueueConfig
2018-09-07 21:26:04 +00:00
cfg . BatchSendDeadline = model . Duration ( 100 * time . Millisecond )
2017-05-10 09:44:13 +00:00
cfg . MaxShards = 1
2019-02-14 01:11:17 +00:00
dir , err := ioutil . TempDir ( "" , "TestSampleDeliver" )
testutil . Ok ( t , err )
defer os . RemoveAll ( dir )
2020-02-03 21:47:03 +00:00
metrics := newQueueManagerMetrics ( nil )
m := NewQueueManager ( nil , metrics , nil , dir , newEWMARate ( ewmaWeight , shardUpdateDuration ) , cfg , nil , nil , c , defaultFlushDeadline )
2019-03-13 10:02:36 +00:00
m . StoreSeries ( series , 0 )
2017-05-10 09:44:13 +00:00
// These should be received by the client.
m . Start ( )
2018-09-07 21:26:04 +00:00
m . Append ( samples [ : len ( samples ) / 2 ] )
2017-05-10 09:44:13 +00:00
defer m . Stop ( )
c . waitForExpectedSamples ( t )
2018-09-07 21:26:04 +00:00
c . expectSamples ( samples [ len ( samples ) / 2 : ] , series )
2020-02-25 19:10:57 +00:00
m . Append ( samples [ len ( samples ) / 2 : ] )
2018-09-07 21:26:04 +00:00
c . waitForExpectedSamples ( t )
2017-05-10 09:44:13 +00:00
}
2018-03-12 15:35:43 +00:00
func TestSampleDeliveryTimeout ( t * testing . T ) {
2018-03-12 16:48:51 +00:00
// Let's send one less sample than batch size, and wait the timeout duration
2018-09-07 21:26:04 +00:00
n := 9
2020-02-25 19:10:57 +00:00
samples , series := createTimeseries ( n , n )
2018-03-12 15:35:43 +00:00
c := NewTestStorageClient ( )
cfg := config . DefaultQueueConfig
cfg . MaxShards = 1
2018-08-24 14:55:21 +00:00
cfg . BatchSendDeadline = model . Duration ( 100 * time . Millisecond )
2019-02-14 01:11:17 +00:00
dir , err := ioutil . TempDir ( "" , "TestSampleDeliveryTimeout" )
testutil . Ok ( t , err )
defer os . RemoveAll ( dir )
2020-02-03 21:47:03 +00:00
metrics := newQueueManagerMetrics ( nil )
m := NewQueueManager ( nil , metrics , nil , dir , newEWMARate ( ewmaWeight , shardUpdateDuration ) , cfg , nil , nil , c , defaultFlushDeadline )
2019-03-13 10:02:36 +00:00
m . StoreSeries ( series , 0 )
2018-03-12 15:35:43 +00:00
m . Start ( )
defer m . Stop ( )
// Send the samples twice, waiting for the samples in the meantime.
2018-09-07 21:26:04 +00:00
c . expectSamples ( samples , series )
m . Append ( samples )
2018-03-12 15:35:43 +00:00
c . waitForExpectedSamples ( t )
2018-09-07 21:26:04 +00:00
c . expectSamples ( samples , series )
m . Append ( samples )
2018-03-12 15:35:43 +00:00
c . waitForExpectedSamples ( t )
}
2017-05-10 09:44:13 +00:00
func TestSampleDeliveryOrder ( t * testing . T ) {
ts := 10
2017-10-23 13:57:44 +00:00
n := config . DefaultQueueConfig . MaxSamplesPerSend * ts
2019-09-19 09:15:41 +00:00
samples := make ( [ ] record . RefSample , 0 , n )
series := make ( [ ] record . RefSeries , 0 , n )
2017-05-10 09:44:13 +00:00
for i := 0 ; i < n ; i ++ {
2018-09-07 21:26:04 +00:00
name := fmt . Sprintf ( "test_metric_%d" , i % ts )
2019-09-19 09:15:41 +00:00
samples = append ( samples , record . RefSample {
2018-09-07 21:26:04 +00:00
Ref : uint64 ( i ) ,
T : int64 ( i ) ,
V : float64 ( i ) ,
} )
2019-09-19 09:15:41 +00:00
series = append ( series , record . RefSeries {
2018-09-07 21:26:04 +00:00
Ref : uint64 ( i ) ,
2019-11-18 19:53:33 +00:00
Labels : labels . Labels { labels . Label { Name : "__name__" , Value : name } } ,
2017-05-10 09:44:13 +00:00
} )
}
c := NewTestStorageClient ( )
2018-09-07 21:26:04 +00:00
c . expectSamples ( samples , series )
2019-02-14 01:11:17 +00:00
dir , err := ioutil . TempDir ( "" , "TestSampleDeliveryOrder" )
testutil . Ok ( t , err )
defer os . RemoveAll ( dir )
2020-02-03 21:47:03 +00:00
metrics := newQueueManagerMetrics ( nil )
m := NewQueueManager ( nil , metrics , nil , dir , newEWMARate ( ewmaWeight , shardUpdateDuration ) , config . DefaultQueueConfig , nil , nil , c , defaultFlushDeadline )
2019-03-13 10:02:36 +00:00
m . StoreSeries ( series , 0 )
2017-05-10 09:44:13 +00:00
m . Start ( )
defer m . Stop ( )
2018-09-07 21:26:04 +00:00
// These should be received by the client.
m . Append ( samples )
2017-05-10 09:44:13 +00:00
c . waitForExpectedSamples ( t )
}
2018-09-07 21:26:04 +00:00
func TestShutdown ( t * testing . T ) {
2019-02-19 16:43:58 +00:00
deadline := 1 * time . Second
2018-09-07 21:26:04 +00:00
c := NewTestBlockedStorageClient ( )
2017-05-10 09:44:13 +00:00
2019-02-14 01:11:17 +00:00
dir , err := ioutil . TempDir ( "" , "TestShutdown" )
testutil . Ok ( t , err )
defer os . RemoveAll ( dir )
2020-02-03 21:47:03 +00:00
metrics := newQueueManagerMetrics ( nil )
m := NewQueueManager ( nil , metrics , nil , dir , newEWMARate ( ewmaWeight , shardUpdateDuration ) , config . DefaultQueueConfig , nil , nil , c , deadline )
2020-02-25 19:10:57 +00:00
n := 2 * config . DefaultQueueConfig . MaxSamplesPerSend
samples , series := createTimeseries ( n , n )
2019-03-13 10:02:36 +00:00
m . StoreSeries ( series , 0 )
2018-09-07 21:26:04 +00:00
m . Start ( )
// Append blocks to guarantee delivery, so we do it in the background.
go func ( ) {
m . Append ( samples )
} ( )
2019-02-19 16:43:58 +00:00
time . Sleep ( 100 * time . Millisecond )
2018-09-07 21:26:04 +00:00
// Test to ensure that Stop doesn't block.
start := time . Now ( )
m . Stop ( )
// The samples will never be delivered, so duration should
// be at least equal to deadline, otherwise the flush deadline
// was not respected.
duration := time . Since ( start )
if duration > time . Duration ( deadline + ( deadline / 10 ) ) {
t . Errorf ( "Took too long to shutdown: %s > %s" , duration , deadline )
}
if duration < time . Duration ( deadline ) {
t . Errorf ( "Shutdown occurred before flush deadline: %s < %s" , duration , deadline )
2017-05-10 09:44:13 +00:00
}
}
2018-09-07 21:26:04 +00:00
func TestSeriesReset ( t * testing . T ) {
c := NewTestBlockedStorageClient ( )
deadline := 5 * time . Second
numSegments := 4
numSeries := 25
2019-02-14 01:11:17 +00:00
dir , err := ioutil . TempDir ( "" , "TestSeriesReset" )
testutil . Ok ( t , err )
defer os . RemoveAll ( dir )
2020-02-03 21:47:03 +00:00
metrics := newQueueManagerMetrics ( nil )
m := NewQueueManager ( nil , metrics , nil , dir , newEWMARate ( ewmaWeight , shardUpdateDuration ) , config . DefaultQueueConfig , nil , nil , c , deadline )
2018-09-07 21:26:04 +00:00
for i := 0 ; i < numSegments ; i ++ {
2019-09-19 09:15:41 +00:00
series := [ ] record . RefSeries { }
2018-09-07 21:26:04 +00:00
for j := 0 ; j < numSeries ; j ++ {
2019-11-18 19:53:33 +00:00
series = append ( series , record . RefSeries { Ref : uint64 ( ( i * 100 ) + j ) , Labels : labels . Labels { { Name : "a" , Value : "a" } } } )
2018-09-07 21:26:04 +00:00
}
m . StoreSeries ( series , i )
2018-05-29 08:51:29 +00:00
}
2018-09-07 21:26:04 +00:00
testutil . Equals ( t , numSegments * numSeries , len ( m . seriesLabels ) )
m . SeriesReset ( 2 )
testutil . Equals ( t , numSegments * numSeries / 2 , len ( m . seriesLabels ) )
2017-05-10 09:44:13 +00:00
}
2018-09-07 21:26:04 +00:00
func TestReshard ( t * testing . T ) {
size := 10 // Make bigger to find more races.
2020-02-25 19:10:57 +00:00
nSeries := 6
nSamples := config . DefaultQueueConfig . Capacity * size
samples , series := createTimeseries ( nSamples , nSeries )
2018-09-07 21:26:04 +00:00
c := NewTestStorageClient ( )
c . expectSamples ( samples , series )
cfg := config . DefaultQueueConfig
cfg . MaxShards = 1
2019-02-14 01:11:17 +00:00
dir , err := ioutil . TempDir ( "" , "TestReshard" )
testutil . Ok ( t , err )
defer os . RemoveAll ( dir )
2020-02-03 21:47:03 +00:00
metrics := newQueueManagerMetrics ( nil )
m := NewQueueManager ( nil , metrics , nil , dir , newEWMARate ( ewmaWeight , shardUpdateDuration ) , cfg , nil , nil , c , defaultFlushDeadline )
2019-03-13 10:02:36 +00:00
m . StoreSeries ( series , 0 )
2018-09-07 21:26:04 +00:00
m . Start ( )
defer m . Stop ( )
go func ( ) {
for i := 0 ; i < len ( samples ) ; i += config . DefaultQueueConfig . Capacity {
sent := m . Append ( samples [ i : i + config . DefaultQueueConfig . Capacity ] )
2019-08-09 01:36:42 +00:00
testutil . Assert ( t , sent , "samples not sent" )
2018-09-07 21:26:04 +00:00
time . Sleep ( 100 * time . Millisecond )
}
} ( )
for i := 1 ; i < len ( samples ) / config . DefaultQueueConfig . Capacity ; i ++ {
m . shards . stop ( )
m . shards . start ( i )
time . Sleep ( 100 * time . Millisecond )
}
c . waitForExpectedSamples ( t )
2017-05-10 09:44:13 +00:00
}
2019-04-16 10:25:19 +00:00
func TestReshardRaceWithStop ( t * testing . T ) {
c := NewTestStorageClient ( )
var m * QueueManager
h := sync . Mutex { }
h . Lock ( )
go func ( ) {
for {
2020-02-03 21:47:03 +00:00
metrics := newQueueManagerMetrics ( nil )
m = NewQueueManager ( nil , metrics , nil , "" , newEWMARate ( ewmaWeight , shardUpdateDuration ) , config . DefaultQueueConfig , nil , nil , c , defaultFlushDeadline )
2019-04-16 10:25:19 +00:00
m . Start ( )
h . Unlock ( )
h . Lock ( )
m . Stop ( )
}
} ( )
for i := 1 ; i < 100 ; i ++ {
h . Lock ( )
m . reshardChan <- i
h . Unlock ( )
}
}
2019-04-24 09:46:31 +00:00
func TestReleaseNoninternedString ( t * testing . T ) {
2020-02-03 21:47:03 +00:00
metrics := newQueueManagerMetrics ( nil )
2019-04-24 09:46:31 +00:00
c := NewTestStorageClient ( )
2020-02-03 21:47:03 +00:00
m := NewQueueManager ( nil , metrics , nil , "" , newEWMARate ( ewmaWeight , shardUpdateDuration ) , config . DefaultQueueConfig , nil , nil , c , defaultFlushDeadline )
2019-04-24 09:46:31 +00:00
m . Start ( )
for i := 1 ; i < 1000 ; i ++ {
2019-09-19 09:15:41 +00:00
m . StoreSeries ( [ ] record . RefSeries {
2019-08-13 08:34:14 +00:00
{
2019-04-24 09:46:31 +00:00
Ref : uint64 ( i ) ,
2019-11-18 19:53:33 +00:00
Labels : labels . Labels {
labels . Label {
2019-04-24 09:46:31 +00:00
Name : "asdf" ,
Value : fmt . Sprintf ( "%d" , i ) ,
} ,
} ,
} ,
} , 0 )
2019-06-27 18:48:21 +00:00
m . SeriesReset ( 1 )
2019-04-24 09:46:31 +00:00
}
metric := client_testutil . ToFloat64 ( noReferenceReleases )
testutil . Assert ( t , metric == 0 , "expected there to be no calls to release for strings that were not already interned: %d" , int ( metric ) )
}
2019-10-21 21:54:25 +00:00
func TestCalculateDesiredsShards ( t * testing . T ) {
type testcase struct {
startingShards int
samplesIn , samplesOut int64
reshard bool
}
cases := [ ] testcase {
{
// Test that ensures that if we haven't successfully sent a
// sample recently the queue will not reshard.
startingShards : 10 ,
reshard : false ,
samplesIn : 1000 ,
samplesOut : 10 ,
} ,
{
startingShards : 5 ,
reshard : true ,
samplesIn : 1000 ,
samplesOut : 10 ,
} ,
}
for _ , c := range cases {
2020-02-03 21:47:03 +00:00
metrics := newQueueManagerMetrics ( nil )
2019-10-21 21:54:25 +00:00
client := NewTestStorageClient ( )
2020-02-03 21:47:03 +00:00
m := NewQueueManager ( nil , metrics , nil , "" , newEWMARate ( ewmaWeight , shardUpdateDuration ) , config . DefaultQueueConfig , nil , nil , client , defaultFlushDeadline )
2019-10-21 21:54:25 +00:00
m . numShards = c . startingShards
m . samplesIn . incr ( c . samplesIn )
m . samplesOut . incr ( c . samplesOut )
m . lastSendTimestamp = time . Now ( ) . Unix ( )
// Resharding shouldn't take place if the last successful send was > batch send deadline*2 seconds ago.
if ! c . reshard {
m . lastSendTimestamp = m . lastSendTimestamp - int64 ( 3 * time . Duration ( config . DefaultQueueConfig . BatchSendDeadline ) / time . Second )
}
m . Start ( )
desiredShards := m . calculateDesiredShards ( )
m . Stop ( )
if ! c . reshard {
testutil . Assert ( t , desiredShards == m . numShards , "expected calculateDesiredShards to not want to reshard, wants to change from %d to %d shards" , m . numShards , desiredShards )
} else {
testutil . Assert ( t , desiredShards != m . numShards , "expected calculateDesiredShards to want to reshard, wants to change from %d to %d shards" , m . numShards , desiredShards )
}
}
}
2020-02-25 19:10:57 +00:00
func createTimeseries ( numSamples , numSeries int ) ( [ ] record . RefSample , [ ] record . RefSeries ) {
samples := make ( [ ] record . RefSample , 0 , numSamples )
series := make ( [ ] record . RefSeries , 0 , numSeries )
for i := 0 ; i < numSeries ; i ++ {
2018-09-07 21:26:04 +00:00
name := fmt . Sprintf ( "test_metric_%d" , i )
2020-02-25 19:10:57 +00:00
for j := 0 ; j < numSamples ; j ++ {
samples = append ( samples , record . RefSample {
Ref : uint64 ( i ) ,
T : int64 ( j ) ,
V : float64 ( i ) ,
} )
}
2019-09-19 09:15:41 +00:00
series = append ( series , record . RefSeries {
2018-09-07 21:26:04 +00:00
Ref : uint64 ( i ) ,
2019-11-18 19:53:33 +00:00
Labels : labels . Labels { { Name : "__name__" , Value : name } } ,
2018-09-07 21:26:04 +00:00
} )
}
return samples , series
2017-05-10 09:44:13 +00:00
}
2019-09-19 09:15:41 +00:00
func getSeriesNameFromRef ( r record . RefSeries ) string {
2018-09-07 21:26:04 +00:00
for _ , l := range r . Labels {
if l . Name == "__name__" {
return l . Value
}
}
return ""
2017-05-10 09:44:13 +00:00
}
2018-09-07 21:26:04 +00:00
type TestStorageClient struct {
receivedSamples map [ string ] [ ] prompb . Sample
expectedSamples map [ string ] [ ] prompb . Sample
2020-02-25 19:10:57 +00:00
withWaitGroup bool
2018-09-07 21:26:04 +00:00
wg sync . WaitGroup
mtx sync . Mutex
2019-06-27 18:48:21 +00:00
buf [ ] byte
2018-09-07 21:26:04 +00:00
}
2017-05-10 09:44:13 +00:00
2018-09-07 21:26:04 +00:00
func NewTestStorageClient ( ) * TestStorageClient {
return & TestStorageClient {
2020-02-25 19:10:57 +00:00
withWaitGroup : true ,
2018-09-07 21:26:04 +00:00
receivedSamples : map [ string ] [ ] prompb . Sample { } ,
expectedSamples : map [ string ] [ ] prompb . Sample { } ,
2017-05-10 09:44:13 +00:00
}
2018-09-07 21:26:04 +00:00
}
2017-05-10 09:44:13 +00:00
2019-09-19 09:15:41 +00:00
func ( c * TestStorageClient ) expectSamples ( ss [ ] record . RefSample , series [ ] record . RefSeries ) {
2020-02-25 19:10:57 +00:00
if ! c . withWaitGroup {
return
}
2018-09-07 21:26:04 +00:00
c . mtx . Lock ( )
defer c . mtx . Unlock ( )
2017-05-10 09:44:13 +00:00
2018-09-07 21:26:04 +00:00
c . expectedSamples = map [ string ] [ ] prompb . Sample { }
c . receivedSamples = map [ string ] [ ] prompb . Sample { }
2017-05-10 09:44:13 +00:00
2018-09-07 21:26:04 +00:00
for _ , s := range ss {
seriesName := getSeriesNameFromRef ( series [ s . Ref ] )
c . expectedSamples [ seriesName ] = append ( c . expectedSamples [ seriesName ] , prompb . Sample {
Timestamp : s . T ,
Value : s . V ,
} )
}
c . wg . Add ( len ( ss ) )
}
2017-05-10 09:44:13 +00:00
2019-06-27 18:48:21 +00:00
func ( c * TestStorageClient ) waitForExpectedSamples ( tb testing . TB ) {
2020-02-25 19:10:57 +00:00
if ! c . withWaitGroup {
return
}
2018-09-07 21:26:04 +00:00
c . wg . Wait ( )
c . mtx . Lock ( )
defer c . mtx . Unlock ( )
for ts , expectedSamples := range c . expectedSamples {
if ! reflect . DeepEqual ( expectedSamples , c . receivedSamples [ ts ] ) {
2019-06-27 18:48:21 +00:00
tb . Fatalf ( "%s: Expected %v, got %v" , ts , expectedSamples , c . receivedSamples [ ts ] )
2018-09-07 21:26:04 +00:00
}
2017-05-10 09:44:13 +00:00
}
2018-09-07 21:26:04 +00:00
}
2017-05-10 09:44:13 +00:00
2020-02-25 19:10:57 +00:00
func ( c * TestStorageClient ) expectSampleCount ( numSamples int ) {
if ! c . withWaitGroup {
return
}
2019-06-27 18:48:21 +00:00
c . mtx . Lock ( )
defer c . mtx . Unlock ( )
2020-02-25 19:10:57 +00:00
c . wg . Add ( numSamples )
2019-06-27 18:48:21 +00:00
}
func ( c * TestStorageClient ) waitForExpectedSampleCount ( ) {
2020-02-25 19:10:57 +00:00
if ! c . withWaitGroup {
return
}
2019-06-27 18:48:21 +00:00
c . wg . Wait ( )
}
2018-09-07 21:26:04 +00:00
func ( c * TestStorageClient ) Store ( _ context . Context , req [ ] byte ) error {
c . mtx . Lock ( )
defer c . mtx . Unlock ( )
2019-06-27 18:48:21 +00:00
// nil buffers are ok for snappy, ignore cast error.
if c . buf != nil {
c . buf = c . buf [ : cap ( c . buf ) ]
}
reqBuf , err := snappy . Decode ( c . buf , req )
c . buf = reqBuf
2018-09-07 21:26:04 +00:00
if err != nil {
return err
2017-05-10 09:44:13 +00:00
}
2018-09-07 21:26:04 +00:00
var reqProto prompb . WriteRequest
if err := proto . Unmarshal ( reqBuf , & reqProto ) ; err != nil {
return err
2017-05-10 09:44:13 +00:00
}
2018-09-07 21:26:04 +00:00
count := 0
for _ , ts := range reqProto . Timeseries {
var seriesName string
labels := labelProtosToLabels ( ts . Labels )
for _ , label := range labels {
if label . Name == "__name__" {
seriesName = label . Value
}
}
for _ , sample := range ts . Samples {
count ++
c . receivedSamples [ seriesName ] = append ( c . receivedSamples [ seriesName ] , sample )
}
2017-05-10 09:44:13 +00:00
}
2020-02-25 19:10:57 +00:00
if c . withWaitGroup {
c . wg . Add ( - count )
}
2018-09-07 21:26:04 +00:00
return nil
2017-05-10 09:44:13 +00:00
}
2018-05-29 08:51:29 +00:00
2018-09-07 21:26:04 +00:00
// Name returns the fixed name of this test client.
func (c *TestStorageClient) Name() string {
	const clientName = "teststorageclient"
	return clientName
}
2018-05-29 08:51:29 +00:00
2019-12-12 20:47:23 +00:00
// Endpoint returns the fixed placeholder URL of this test client.
func (c *TestStorageClient) Endpoint() string {
	const endpointURL = "http://test-remote.com/1234"
	return endpointURL
}
2018-09-07 21:26:04 +00:00
// TestBlockingStorageClient is a storage client for queue manager tests whose
// Store call never completes on its own: it blocks until the context passed
// to it is done. The number of Store calls made so far is available through
// NumCalls.
type TestBlockingStorageClient struct {
	// numCalls is only accessed through sync/atomic.
	numCalls uint64
}

// NewTestBlockedStorageClient returns a TestBlockingStorageClient with a call
// count of zero.
func NewTestBlockedStorageClient() *TestBlockingStorageClient {
	return new(TestBlockingStorageClient)
}

// Store counts the call, then blocks until ctx is done and returns nil. The
// request payload is ignored.
func (c *TestBlockingStorageClient) Store(ctx context.Context, _ []byte) error {
	atomic.AddUint64(&c.numCalls, 1)
	select {
	case <-ctx.Done():
	}
	return nil
}

// NumCalls reports how many times Store has been called.
func (c *TestBlockingStorageClient) NumCalls() uint64 {
	calls := atomic.LoadUint64(&c.numCalls)
	return calls
}

// Name returns the fixed name of this test client.
func (c *TestBlockingStorageClient) Name() string {
	const clientName = "testblockingstorageclient"
	return clientName
}

// Endpoint returns the fixed placeholder URL of this test client.
func (c *TestBlockingStorageClient) Endpoint() string {
	const endpointURL = "http://test-remote-blocking.com/1234"
	return endpointURL
}
2019-06-27 18:48:21 +00:00
func BenchmarkSampleDelivery ( b * testing . B ) {
// Let's create an even number of send batches so we don't run into the
// batch timeout case.
n := config . DefaultQueueConfig . MaxSamplesPerSend * 10
2020-02-25 19:10:57 +00:00
samples , series := createTimeseries ( n , n )
2019-06-27 18:48:21 +00:00
c := NewTestStorageClient ( )
cfg := config . DefaultQueueConfig
cfg . BatchSendDeadline = model . Duration ( 100 * time . Millisecond )
cfg . MaxShards = 1
dir , err := ioutil . TempDir ( "" , "BenchmarkSampleDelivery" )
testutil . Ok ( b , err )
defer os . RemoveAll ( dir )
2020-02-03 21:47:03 +00:00
metrics := newQueueManagerMetrics ( nil )
m := NewQueueManager ( nil , metrics , nil , dir , newEWMARate ( ewmaWeight , shardUpdateDuration ) , cfg , nil , nil , c , defaultFlushDeadline )
2019-06-27 18:48:21 +00:00
m . StoreSeries ( series , 0 )
// These should be received by the client.
m . Start ( )
defer m . Stop ( )
b . ResetTimer ( )
for i := 0 ; i < b . N ; i ++ {
2020-02-25 19:10:57 +00:00
c . expectSampleCount ( len ( samples ) )
2019-06-27 18:48:21 +00:00
m . Append ( samples )
c . waitForExpectedSampleCount ( )
}
// Do not include shutdown
b . StopTimer ( )
}
2019-02-20 04:03:41 +00:00
func BenchmarkStartup ( b * testing . B ) {
dir := os . Getenv ( "WALDIR" )
if dir == "" {
return
}
2019-02-20 07:51:08 +00:00
// Find the second largest segment; we will replay up to this.
// (Second largest as WALWatcher will start tailing the largest).
dirents , err := ioutil . ReadDir ( dir )
testutil . Ok ( b , err )
var segments [ ] int
for _ , dirent := range dirents {
if i , err := strconv . Atoi ( dirent . Name ( ) ) ; err != nil {
segments = append ( segments , i )
}
}
sort . Ints ( segments )
2019-02-20 04:03:41 +00:00
logger := log . NewLogfmtLogger ( log . NewSyncWriter ( os . Stdout ) )
logger = log . With ( logger , "caller" , log . DefaultCaller )
for n := 0 ; n < b . N ; n ++ {
2020-02-03 21:47:03 +00:00
metrics := newQueueManagerMetrics ( nil )
2019-02-20 04:03:41 +00:00
c := NewTestBlockedStorageClient ( )
2020-02-03 21:47:03 +00:00
m := NewQueueManager ( nil , metrics , logger , dir ,
2019-02-20 04:03:41 +00:00
newEWMARate ( ewmaWeight , shardUpdateDuration ) ,
2019-03-01 19:04:26 +00:00
config . DefaultQueueConfig , nil , nil , c , 1 * time . Minute )
2019-11-27 00:53:11 +00:00
m . watcher . SetStartTime ( timestamp . Time ( math . MaxInt64 ) )
2019-09-19 09:15:41 +00:00
m . watcher . MaxSegment = segments [ len ( segments ) - 2 ]
err := m . watcher . Run ( )
2019-02-20 07:51:08 +00:00
testutil . Ok ( b , err )
2019-02-20 04:03:41 +00:00
}
}
2019-03-08 16:29:25 +00:00
func TestProcessExternalLabels ( t * testing . T ) {
for _ , tc := range [ ] struct {
2019-11-18 19:53:33 +00:00
labels labels . Labels
2019-03-08 16:29:25 +00:00
externalLabels labels . Labels
expected labels . Labels
} {
// Test adding labels at the end.
{
2019-11-18 19:53:33 +00:00
labels : labels . Labels { { Name : "a" , Value : "b" } } ,
2019-03-08 16:29:25 +00:00
externalLabels : labels . Labels { { Name : "c" , Value : "d" } } ,
expected : labels . Labels { { Name : "a" , Value : "b" } , { Name : "c" , Value : "d" } } ,
} ,
// Test adding labels at the beginning.
{
2019-11-18 19:53:33 +00:00
labels : labels . Labels { { Name : "c" , Value : "d" } } ,
2019-03-08 16:29:25 +00:00
externalLabels : labels . Labels { { Name : "a" , Value : "b" } } ,
expected : labels . Labels { { Name : "a" , Value : "b" } , { Name : "c" , Value : "d" } } ,
} ,
// Test we don't override existing labels.
{
2019-11-18 19:53:33 +00:00
labels : labels . Labels { { Name : "a" , Value : "b" } } ,
2019-03-08 16:29:25 +00:00
externalLabels : labels . Labels { { Name : "a" , Value : "c" } } ,
expected : labels . Labels { { Name : "a" , Value : "b" } } ,
} ,
} {
2019-08-09 01:36:42 +00:00
testutil . Equals ( t , tc . expected , processExternalLabels ( tc . labels , tc . externalLabels ) )
2019-03-08 16:29:25 +00:00
}
}
2019-12-24 15:59:50 +00:00
func TestCalculateDesiredShards ( t * testing . T ) {
c := NewTestStorageClient ( )
cfg := config . DefaultQueueConfig
dir , err := ioutil . TempDir ( "" , "TestCalculateDesiredShards" )
testutil . Ok ( t , err )
defer os . RemoveAll ( dir )
2020-02-03 21:47:03 +00:00
metrics := newQueueManagerMetrics ( nil )
2019-12-24 15:59:50 +00:00
samplesIn := newEWMARate ( ewmaWeight , shardUpdateDuration )
2020-02-03 21:47:03 +00:00
m := NewQueueManager ( nil , metrics , nil , dir , samplesIn , cfg , nil , nil , c , defaultFlushDeadline )
2019-12-24 15:59:50 +00:00
// Need to start the queue manager so the proper metrics are initialized.
// However we can stop it right away since we don't need to do any actual
// processing.
m . Start ( )
m . Stop ( )
inputRate := int64 ( 50000 )
var pendingSamples int64
// Two minute startup, no samples are sent.
startedAt := time . Now ( ) . Add ( - 2 * time . Minute )
// helper function for adding samples.
addSamples := func ( s int64 , ts time . Duration ) {
pendingSamples += s
samplesIn . incr ( s )
samplesIn . tick ( )
highestTimestamp . Set ( float64 ( startedAt . Add ( ts ) . Unix ( ) ) )
}
// helper function for sending samples.
sendSamples := func ( s int64 , ts time . Duration ) {
pendingSamples -= s
m . samplesOut . incr ( s )
m . samplesOutDuration . incr ( int64 ( m . numShards ) * int64 ( shardUpdateDuration ) )
// highest sent is how far back pending samples would be at our input rate.
highestSent := startedAt . Add ( ts - time . Duration ( pendingSamples / inputRate ) * time . Second )
m . highestSentTimestampMetric . Set ( float64 ( highestSent . Unix ( ) ) )
atomic . StoreInt64 ( & m . lastSendTimestamp , time . Now ( ) . Unix ( ) )
}
ts := time . Duration ( 0 )
for ; ts < 120 * time . Second ; ts += shardUpdateDuration {
addSamples ( inputRate * int64 ( shardUpdateDuration / time . Second ) , ts )
m . numShards = m . calculateDesiredShards ( )
testutil . Equals ( t , 1 , m . numShards )
}
// Assume 100ms per request, or 10 requests per second per shard.
// Shard calculation should never drop below barely keeping up.
minShards := int ( inputRate ) / cfg . MaxSamplesPerSend / 10
// This test should never go above 200 shards, that would be more resources than needed.
maxShards := 200
for ; ts < 15 * time . Minute ; ts += shardUpdateDuration {
sin := inputRate * int64 ( shardUpdateDuration / time . Second )
addSamples ( sin , ts )
sout := int64 ( m . numShards * cfg . MaxSamplesPerSend ) * int64 ( shardUpdateDuration / ( 100 * time . Millisecond ) )
// You can't send samples that don't exist so cap at the number of pending samples.
if sout > pendingSamples {
sout = pendingSamples
}
sendSamples ( sout , ts )
t . Log ( "desiredShards" , m . numShards , "pendingSamples" , pendingSamples )
m . numShards = m . calculateDesiredShards ( )
testutil . Assert ( t , m . numShards >= minShards , "Shards are too low. desiredShards=%d, minShards=%d, t_seconds=%d" , m . numShards , minShards , ts / time . Second )
testutil . Assert ( t , m . numShards <= maxShards , "Shards are too high. desiredShards=%d, maxShards=%d, t_seconds=%d" , m . numShards , maxShards , ts / time . Second )
}
testutil . Assert ( t , pendingSamples == 0 , "Remote write never caught up, there are still %d pending samples." , pendingSamples )
}