// Copyright 2013 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package remote import ( "context" "fmt" "io/ioutil" "math" "net/url" "os" "sort" "strconv" "strings" "sync" "testing" "time" "github.com/go-kit/log" "github.com/gogo/protobuf/proto" "github.com/golang/snappy" "github.com/prometheus/client_golang/prometheus" client_testutil "github.com/prometheus/client_golang/prometheus/testutil" common_config "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "go.uber.org/atomic" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/textparse" "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/tsdb/record" ) const defaultFlushDeadline = 1 * time.Minute func newHighestTimestampMetric() *maxTimestamp { return &maxTimestamp{ Gauge: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, Subsystem: subsystem, Name: "highest_timestamp_in_seconds", Help: "Highest timestamp that has come into the remote storage via the Appender interface, in seconds since epoch.", }), } } func TestSampleDelivery(t *testing.T) { testcases := []struct { name string samples bool exemplars bool }{ {samples: true, exemplars: false, name: "samples only"}, {samples: true, exemplars: true, name: "both samples and exemplars"}, {samples: false, exemplars: true, name: "exemplars only"}, } // Let's create an even number of send batches so we don't run into the // batch timeout case. n := 3 dir, err := ioutil.TempDir("", "TestSampleDelivery") require.NoError(t, err) defer func() { require.NoError(t, os.RemoveAll(dir)) }() s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil) defer s.Close() queueConfig := config.DefaultQueueConfig queueConfig.BatchSendDeadline = model.Duration(100 * time.Millisecond) queueConfig.MaxShards = 1 writeConfig := config.DefaultRemoteWriteConfig // We need to set URL's so that metric creation doesn't panic. writeConfig.URL = &common_config.URL{ URL: &url.URL{ Host: "http://test-storage.com", }, } writeConfig.QueueConfig = queueConfig writeConfig.SendExemplars = true conf := &config.Config{ GlobalConfig: config.DefaultGlobalConfig, RemoteWriteConfigs: []*config.RemoteWriteConfig{ &writeConfig, }, } for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { var ( series []record.RefSeries samples []record.RefSample exemplars []record.RefExemplar ) // Generates same series in both cases. if tc.samples { samples, series = createTimeseries(n, n) } if tc.exemplars { exemplars, series = createExemplars(n, n) } // Apply new config. queueConfig.Capacity = len(samples) queueConfig.MaxSamplesPerSend = len(samples) / 2 require.NoError(t, s.ApplyConfig(conf)) hash, err := toHash(writeConfig) require.NoError(t, err) qm := s.rws.queues[hash] c := NewTestWriteClient() qm.SetClient(c) qm.StoreSeries(series, 0) // Send first half of data. c.expectSamples(samples[:len(samples)/2], series) c.expectExemplars(exemplars[:len(exemplars)/2], series) qm.Append(samples[:len(samples)/2]) qm.AppendExemplars(exemplars[:len(exemplars)/2]) c.waitForExpectedData(t) // Send second half of data. c.expectSamples(samples[len(samples)/2:], series) c.expectExemplars(exemplars[len(exemplars)/2:], series) qm.Append(samples[len(samples)/2:]) qm.AppendExemplars(exemplars[len(exemplars)/2:]) c.waitForExpectedData(t) }) } } func TestMetadataDelivery(t *testing.T) { c := NewTestWriteClient() dir, err := ioutil.TempDir("", "TestMetadataDelivery") require.NoError(t, err) defer os.RemoveAll(dir) cfg := config.DefaultQueueConfig mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) m.Start() defer m.Stop() metadata := []scrape.MetricMetadata{} numMetadata := 1532 for i := 0; i < numMetadata; i++ { metadata = append(metadata, scrape.MetricMetadata{ Metric: "prometheus_remote_storage_sent_metadata_bytes_total_" + strconv.Itoa(i), Type: textparse.MetricTypeCounter, Help: "a nice help text", Unit: "", }) } m.AppendMetadata(context.Background(), metadata) require.Equal(t, numMetadata, len(c.receivedMetadata)) // One more write than the rounded qoutient should be performed in order to get samples that didn't // fit into MaxSamplesPerSend. require.Equal(t, numMetadata/mcfg.MaxSamplesPerSend+1, c.writesReceived) // Make sure the last samples were sent. require.Equal(t, c.receivedMetadata[metadata[len(metadata)-1].Metric][0].MetricFamilyName, metadata[len(metadata)-1].Metric) } func TestSampleDeliveryTimeout(t *testing.T) { // Let's send one less sample than batch size, and wait the timeout duration n := 9 samples, series := createTimeseries(n, n) c := NewTestWriteClient() cfg := config.DefaultQueueConfig mcfg := config.DefaultMetadataConfig cfg.MaxShards = 1 cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond) dir, err := ioutil.TempDir("", "TestSampleDeliveryTimeout") require.NoError(t, err) defer func() { require.NoError(t, os.RemoveAll(dir)) }() metrics := newQueueManagerMetrics(nil, "", "") m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) m.StoreSeries(series, 0) m.Start() defer m.Stop() // Send the samples twice, waiting for the samples in the meantime. c.expectSamples(samples, series) m.Append(samples) c.waitForExpectedData(t) c.expectSamples(samples, series) m.Append(samples) c.waitForExpectedData(t) } func TestSampleDeliveryOrder(t *testing.T) { ts := 10 n := config.DefaultQueueConfig.MaxSamplesPerSend * ts samples := make([]record.RefSample, 0, n) series := make([]record.RefSeries, 0, n) for i := 0; i < n; i++ { name := fmt.Sprintf("test_metric_%d", i%ts) samples = append(samples, record.RefSample{ Ref: uint64(i), T: int64(i), V: float64(i), }) series = append(series, record.RefSeries{ Ref: uint64(i), Labels: labels.Labels{labels.Label{Name: "__name__", Value: name}}, }) } c := NewTestWriteClient() c.expectSamples(samples, series) dir, err := ioutil.TempDir("", "TestSampleDeliveryOrder") require.NoError(t, err) defer func() { require.NoError(t, os.RemoveAll(dir)) }() cfg := config.DefaultQueueConfig mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) m.StoreSeries(series, 0) m.Start() defer m.Stop() // These should be received by the client. m.Append(samples) c.waitForExpectedData(t) } func TestShutdown(t *testing.T) { deadline := 1 * time.Second c := NewTestBlockedWriteClient() dir, err := ioutil.TempDir("", "TestShutdown") require.NoError(t, err) defer func() { require.NoError(t, os.RemoveAll(dir)) }() cfg := config.DefaultQueueConfig mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false) n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend samples, series := createTimeseries(n, n) m.StoreSeries(series, 0) m.Start() // Append blocks to guarantee delivery, so we do it in the background. go func() { m.Append(samples) }() time.Sleep(100 * time.Millisecond) // Test to ensure that Stop doesn't block. start := time.Now() m.Stop() // The samples will never be delivered, so duration should // be at least equal to deadline, otherwise the flush deadline // was not respected. duration := time.Since(start) if duration > deadline+(deadline/10) { t.Errorf("Took too long to shutdown: %s > %s", duration, deadline) } if duration < deadline { t.Errorf("Shutdown occurred before flush deadline: %s < %s", duration, deadline) } } func TestSeriesReset(t *testing.T) { c := NewTestBlockedWriteClient() deadline := 5 * time.Second numSegments := 4 numSeries := 25 dir, err := ioutil.TempDir("", "TestSeriesReset") require.NoError(t, err) defer func() { require.NoError(t, os.RemoveAll(dir)) }() cfg := config.DefaultQueueConfig mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false) for i := 0; i < numSegments; i++ { series := []record.RefSeries{} for j := 0; j < numSeries; j++ { series = append(series, record.RefSeries{Ref: uint64((i * 100) + j), Labels: labels.Labels{{Name: "a", Value: "a"}}}) } m.StoreSeries(series, i) } require.Equal(t, numSegments*numSeries, len(m.seriesLabels)) m.SeriesReset(2) require.Equal(t, numSegments*numSeries/2, len(m.seriesLabels)) } func TestReshard(t *testing.T) { size := 10 // Make bigger to find more races. nSeries := 6 nSamples := config.DefaultQueueConfig.Capacity * size samples, series := createTimeseries(nSamples, nSeries) c := NewTestWriteClient() c.expectSamples(samples, series) cfg := config.DefaultQueueConfig mcfg := config.DefaultMetadataConfig cfg.MaxShards = 1 dir, err := ioutil.TempDir("", "TestReshard") require.NoError(t, err) defer func() { require.NoError(t, os.RemoveAll(dir)) }() metrics := newQueueManagerMetrics(nil, "", "") m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) m.StoreSeries(series, 0) m.Start() defer m.Stop() go func() { for i := 0; i < len(samples); i += config.DefaultQueueConfig.Capacity { sent := m.Append(samples[i : i+config.DefaultQueueConfig.Capacity]) require.True(t, sent, "samples not sent") time.Sleep(100 * time.Millisecond) } }() for i := 1; i < len(samples)/config.DefaultQueueConfig.Capacity; i++ { m.shards.stop() m.shards.start(i) time.Sleep(100 * time.Millisecond) } c.waitForExpectedData(t) } func TestReshardRaceWithStop(t *testing.T) { c := NewTestWriteClient() var m *QueueManager h := sync.Mutex{} h.Lock() cfg := config.DefaultQueueConfig mcfg := config.DefaultMetadataConfig go func() { for { metrics := newQueueManagerMetrics(nil, "", "") m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) m.Start() h.Unlock() h.Lock() m.Stop() } }() for i := 1; i < 100; i++ { h.Lock() m.reshardChan <- i h.Unlock() } } func TestReleaseNoninternedString(t *testing.T) { cfg := config.DefaultQueueConfig mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") c := NewTestWriteClient() m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) m.Start() for i := 1; i < 1000; i++ { m.StoreSeries([]record.RefSeries{ { Ref: uint64(i), Labels: labels.Labels{ labels.Label{ Name: "asdf", Value: fmt.Sprintf("%d", i), }, }, }, }, 0) m.SeriesReset(1) } metric := client_testutil.ToFloat64(noReferenceReleases) require.Equal(t, 0.0, metric, "expected there to be no calls to release for strings that were not already interned: %d", int(metric)) } func TestShouldReshard(t *testing.T) { type testcase struct { startingShards int samplesIn, samplesOut, lastSendTimestamp int64 expectedToReshard bool } cases := []testcase{ { // Resharding shouldn't take place if the last successful send was > batch send deadline*2 seconds ago. startingShards: 10, samplesIn: 1000, samplesOut: 10, lastSendTimestamp: time.Now().Unix() - int64(3*time.Duration(config.DefaultQueueConfig.BatchSendDeadline)/time.Second), expectedToReshard: false, }, { startingShards: 5, samplesIn: 1000, samplesOut: 10, lastSendTimestamp: time.Now().Unix(), expectedToReshard: true, }, } cfg := config.DefaultQueueConfig mcfg := config.DefaultMetadataConfig for _, c := range cases { metrics := newQueueManagerMetrics(nil, "", "") client := NewTestWriteClient() m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) m.numShards = c.startingShards m.dataIn.incr(c.samplesIn) m.dataOut.incr(c.samplesOut) m.lastSendTimestamp.Store(c.lastSendTimestamp) m.Start() desiredShards := m.calculateDesiredShards() shouldReshard := m.shouldReshard(desiredShards) m.Stop() require.Equal(t, c.expectedToReshard, shouldReshard) } } func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([]record.RefSample, []record.RefSeries) { samples := make([]record.RefSample, 0, numSamples) series := make([]record.RefSeries, 0, numSeries) for i := 0; i < numSeries; i++ { name := fmt.Sprintf("test_metric_%d", i) for j := 0; j < numSamples; j++ { samples = append(samples, record.RefSample{ Ref: uint64(i), T: int64(j), V: float64(i), }) } series = append(series, record.RefSeries{ Ref: uint64(i), Labels: append(labels.Labels{{Name: "__name__", Value: name}}, extraLabels...), }) } return samples, series } func createExemplars(numExemplars, numSeries int) ([]record.RefExemplar, []record.RefSeries) { exemplars := make([]record.RefExemplar, 0, numExemplars) series := make([]record.RefSeries, 0, numSeries) for i := 0; i < numSeries; i++ { name := fmt.Sprintf("test_metric_%d", i) for j := 0; j < numExemplars; j++ { e := record.RefExemplar{ Ref: uint64(i), T: int64(j), V: float64(i), Labels: labels.FromStrings("traceID", fmt.Sprintf("trace-%d", i)), } exemplars = append(exemplars, e) } series = append(series, record.RefSeries{ Ref: uint64(i), Labels: labels.Labels{{Name: "__name__", Value: name}}, }) } return exemplars, series } func getSeriesNameFromRef(r record.RefSeries) string { for _, l := range r.Labels { if l.Name == "__name__" { return l.Value } } return "" } type TestWriteClient struct { receivedSamples map[string][]prompb.Sample expectedSamples map[string][]prompb.Sample receivedExemplars map[string][]prompb.Exemplar expectedExemplars map[string][]prompb.Exemplar receivedMetadata map[string][]prompb.MetricMetadata writesReceived int withWaitGroup bool wg sync.WaitGroup mtx sync.Mutex buf []byte } func NewTestWriteClient() *TestWriteClient { return &TestWriteClient{ withWaitGroup: true, receivedSamples: map[string][]prompb.Sample{}, expectedSamples: map[string][]prompb.Sample{}, receivedMetadata: map[string][]prompb.MetricMetadata{}, } } func (c *TestWriteClient) expectSamples(ss []record.RefSample, series []record.RefSeries) { if !c.withWaitGroup { return } c.mtx.Lock() defer c.mtx.Unlock() c.expectedSamples = map[string][]prompb.Sample{} c.receivedSamples = map[string][]prompb.Sample{} for _, s := range ss { seriesName := getSeriesNameFromRef(series[s.Ref]) c.expectedSamples[seriesName] = append(c.expectedSamples[seriesName], prompb.Sample{ Timestamp: s.T, Value: s.V, }) } c.wg.Add(len(ss)) } func (c *TestWriteClient) expectExemplars(ss []record.RefExemplar, series []record.RefSeries) { if !c.withWaitGroup { return } c.mtx.Lock() defer c.mtx.Unlock() c.expectedExemplars = map[string][]prompb.Exemplar{} c.receivedExemplars = map[string][]prompb.Exemplar{} for _, s := range ss { seriesName := getSeriesNameFromRef(series[s.Ref]) e := prompb.Exemplar{ Labels: labelsToLabelsProto(s.Labels, nil), Timestamp: s.T, Value: s.V, } c.expectedExemplars[seriesName] = append(c.expectedExemplars[seriesName], e) } c.wg.Add(len(ss)) } func (c *TestWriteClient) waitForExpectedData(tb testing.TB) { if !c.withWaitGroup { return } c.wg.Wait() c.mtx.Lock() defer c.mtx.Unlock() for ts, expectedSamples := range c.expectedSamples { require.Equal(tb, expectedSamples, c.receivedSamples[ts], ts) } for ts, expectedExemplar := range c.expectedExemplars { require.Equal(tb, expectedExemplar, c.receivedExemplars[ts], ts) } } func (c *TestWriteClient) expectDataCount(numSamples int) { if !c.withWaitGroup { return } c.mtx.Lock() defer c.mtx.Unlock() c.wg.Add(numSamples) } func (c *TestWriteClient) waitForExpectedDataCount() { if !c.withWaitGroup { return } c.wg.Wait() } func (c *TestWriteClient) Store(_ context.Context, req []byte) error { c.mtx.Lock() defer c.mtx.Unlock() // nil buffers are ok for snappy, ignore cast error. if c.buf != nil { c.buf = c.buf[:cap(c.buf)] } reqBuf, err := snappy.Decode(c.buf, req) c.buf = reqBuf if err != nil { return err } var reqProto prompb.WriteRequest if err := proto.Unmarshal(reqBuf, &reqProto); err != nil { return err } count := 0 for _, ts := range reqProto.Timeseries { var seriesName string labels := labelProtosToLabels(ts.Labels) for _, label := range labels { if label.Name == "__name__" { seriesName = label.Value } } for _, sample := range ts.Samples { count++ c.receivedSamples[seriesName] = append(c.receivedSamples[seriesName], sample) } for _, ex := range ts.Exemplars { count++ c.receivedExemplars[seriesName] = append(c.receivedExemplars[seriesName], ex) } } if c.withWaitGroup { c.wg.Add(-count) } for _, m := range reqProto.Metadata { c.receivedMetadata[m.MetricFamilyName] = append(c.receivedMetadata[m.MetricFamilyName], m) } c.writesReceived++ return nil } func (c *TestWriteClient) Name() string { return "testwriteclient" } func (c *TestWriteClient) Endpoint() string { return "http://test-remote.com/1234" } // TestBlockingWriteClient is a queue_manager WriteClient which will block // on any calls to Store(), until the request's Context is cancelled, at which // point the `numCalls` property will contain a count of how many times Store() // was called. type TestBlockingWriteClient struct { numCalls atomic.Uint64 } func NewTestBlockedWriteClient() *TestBlockingWriteClient { return &TestBlockingWriteClient{} } func (c *TestBlockingWriteClient) Store(ctx context.Context, _ []byte) error { c.numCalls.Inc() <-ctx.Done() return nil } func (c *TestBlockingWriteClient) NumCalls() uint64 { return c.numCalls.Load() } func (c *TestBlockingWriteClient) Name() string { return "testblockingwriteclient" } func (c *TestBlockingWriteClient) Endpoint() string { return "http://test-remote-blocking.com/1234" } func BenchmarkSampleDelivery(b *testing.B) { // Send one sample per series, which is the typical remote_write case const numSamples = 1 const numSeries = 10000 // Extra labels to make a more realistic workload - taken from Kubernetes' embedded cAdvisor metrics. var extraLabels = labels.Labels{ {Name: "kubernetes_io_arch", Value: "amd64"}, {Name: "kubernetes_io_instance_type", Value: "c3.somesize"}, {Name: "kubernetes_io_os", Value: "linux"}, {Name: "container_name", Value: "some-name"}, {Name: "failure_domain_kubernetes_io_region", Value: "somewhere-1"}, {Name: "failure_domain_kubernetes_io_zone", Value: "somewhere-1b"}, {Name: "id", Value: "/kubepods/burstable/pod6e91c467-e4c5-11e7-ace3-0a97ed59c75e/a3c8498918bd6866349fed5a6f8c643b77c91836427fb6327913276ebc6bde28"}, {Name: "image", Value: "registry/organisation/name@sha256:dca3d877a80008b45d71d7edc4fd2e44c0c8c8e7102ba5cbabec63a374d1d506"}, {Name: "instance", Value: "ip-111-11-1-11.ec2.internal"}, {Name: "job", Value: "kubernetes-cadvisor"}, {Name: "kubernetes_io_hostname", Value: "ip-111-11-1-11"}, {Name: "monitor", Value: "prod"}, {Name: "name", Value: "k8s_some-name_some-other-name-5j8s8_kube-system_6e91c467-e4c5-11e7-ace3-0a97ed59c75e_0"}, {Name: "namespace", Value: "kube-system"}, {Name: "pod_name", Value: "some-other-name-5j8s8"}, } samples, series := createTimeseries(numSamples, numSeries, extraLabels...) c := NewTestWriteClient() cfg := config.DefaultQueueConfig mcfg := config.DefaultMetadataConfig cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond) cfg.MaxShards = 1 dir, err := ioutil.TempDir("", "BenchmarkSampleDelivery") require.NoError(b, err) defer os.RemoveAll(dir) metrics := newQueueManagerMetrics(nil, "", "") m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) m.StoreSeries(series, 0) // These should be received by the client. m.Start() defer m.Stop() b.ResetTimer() for i := 0; i < b.N; i++ { c.expectDataCount(len(samples)) go m.Append(samples) m.UpdateSeriesSegment(series, i+1) // simulate what wal.Watcher.garbageCollectSeries does m.SeriesReset(i + 1) c.waitForExpectedDataCount() } // Do not include shutdown b.StopTimer() } func BenchmarkStartup(b *testing.B) { dir := os.Getenv("WALDIR") if dir == "" { return } // Find the second largest segment; we will replay up to this. // (Second largest as WALWatcher will start tailing the largest). dirents, err := ioutil.ReadDir(dir) require.NoError(b, err) var segments []int for _, dirent := range dirents { if i, err := strconv.Atoi(dirent.Name()); err != nil { segments = append(segments, i) } } sort.Ints(segments) logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stdout)) logger = log.With(logger, "caller", log.DefaultCaller) cfg := config.DefaultQueueConfig mcfg := config.DefaultMetadataConfig for n := 0; n < b.N; n++ { metrics := newQueueManagerMetrics(nil, "", "") c := NewTestBlockedWriteClient() m := NewQueueManager(metrics, nil, nil, logger, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false) m.watcher.SetStartTime(timestamp.Time(math.MaxInt64)) m.watcher.MaxSegment = segments[len(segments)-2] err := m.watcher.Run() require.NoError(b, err) } } func TestProcessExternalLabels(t *testing.T) { for _, tc := range []struct { labels labels.Labels externalLabels labels.Labels expected labels.Labels }{ // Test adding labels at the end. { labels: labels.Labels{{Name: "a", Value: "b"}}, externalLabels: labels.Labels{{Name: "c", Value: "d"}}, expected: labels.Labels{{Name: "a", Value: "b"}, {Name: "c", Value: "d"}}, }, // Test adding labels at the beginning. { labels: labels.Labels{{Name: "c", Value: "d"}}, externalLabels: labels.Labels{{Name: "a", Value: "b"}}, expected: labels.Labels{{Name: "a", Value: "b"}, {Name: "c", Value: "d"}}, }, // Test we don't override existing labels. { labels: labels.Labels{{Name: "a", Value: "b"}}, externalLabels: labels.Labels{{Name: "a", Value: "c"}}, expected: labels.Labels{{Name: "a", Value: "b"}}, }, // Test empty externalLabels. { labels: labels.Labels{{Name: "a", Value: "b"}}, externalLabels: labels.Labels{}, expected: labels.Labels{{Name: "a", Value: "b"}}, }, // Test empty labels. { labels: labels.Labels{}, externalLabels: labels.Labels{{Name: "a", Value: "b"}}, expected: labels.Labels{{Name: "a", Value: "b"}}, }, // Test labels is longer than externalLabels. { labels: labels.Labels{{Name: "a", Value: "b"}, {Name: "c", Value: "d"}}, externalLabels: labels.Labels{{Name: "e", Value: "f"}}, expected: labels.Labels{{Name: "a", Value: "b"}, {Name: "c", Value: "d"}, {Name: "e", Value: "f"}}, }, // Test externalLabels is longer than labels. { labels: labels.Labels{{Name: "c", Value: "d"}}, externalLabels: labels.Labels{{Name: "a", Value: "b"}, {Name: "e", Value: "f"}}, expected: labels.Labels{{Name: "a", Value: "b"}, {Name: "c", Value: "d"}, {Name: "e", Value: "f"}}, }, } { require.Equal(t, tc.expected, processExternalLabels(tc.labels, tc.externalLabels)) } } func TestCalculateDesiredShards(t *testing.T) { c := NewTestWriteClient() cfg := config.DefaultQueueConfig mcfg := config.DefaultMetadataConfig dir, err := ioutil.TempDir("", "TestCalculateDesiredShards") require.NoError(t, err) defer func() { require.NoError(t, os.RemoveAll(dir)) }() metrics := newQueueManagerMetrics(nil, "", "") samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) // Need to start the queue manager so the proper metrics are initialized. // However we can stop it right away since we don't need to do any actual // processing. m.Start() m.Stop() inputRate := int64(50000) var pendingSamples int64 // Two minute startup, no samples are sent. startedAt := time.Now().Add(-2 * time.Minute) // helper function for adding samples. addSamples := func(s int64, ts time.Duration) { pendingSamples += s samplesIn.incr(s) samplesIn.tick() m.highestRecvTimestamp.Set(float64(startedAt.Add(ts).Unix())) } // helper function for sending samples. sendSamples := func(s int64, ts time.Duration) { pendingSamples -= s m.dataOut.incr(s) m.dataOutDuration.incr(int64(m.numShards) * int64(shardUpdateDuration)) // highest sent is how far back pending samples would be at our input rate. highestSent := startedAt.Add(ts - time.Duration(pendingSamples/inputRate)*time.Second) m.metrics.highestSentTimestamp.Set(float64(highestSent.Unix())) m.lastSendTimestamp.Store(time.Now().Unix()) } ts := time.Duration(0) for ; ts < 120*time.Second; ts += shardUpdateDuration { addSamples(inputRate*int64(shardUpdateDuration/time.Second), ts) m.numShards = m.calculateDesiredShards() require.Equal(t, 1, m.numShards) } // Assume 100ms per request, or 10 requests per second per shard. // Shard calculation should never drop below barely keeping up. minShards := int(inputRate) / cfg.MaxSamplesPerSend / 10 // This test should never go above 200 shards, that would be more resources than needed. maxShards := 200 for ; ts < 15*time.Minute; ts += shardUpdateDuration { sin := inputRate * int64(shardUpdateDuration/time.Second) addSamples(sin, ts) sout := int64(m.numShards*cfg.MaxSamplesPerSend) * int64(shardUpdateDuration/(100*time.Millisecond)) // You can't send samples that don't exist so cap at the number of pending samples. if sout > pendingSamples { sout = pendingSamples } sendSamples(sout, ts) t.Log("desiredShards", m.numShards, "pendingSamples", pendingSamples) m.numShards = m.calculateDesiredShards() require.GreaterOrEqual(t, m.numShards, minShards, "Shards are too low. desiredShards=%d, minShards=%d, t_seconds=%d", m.numShards, minShards, ts/time.Second) require.LessOrEqual(t, m.numShards, maxShards, "Shards are too high. desiredShards=%d, maxShards=%d, t_seconds=%d", m.numShards, maxShards, ts/time.Second) } require.Equal(t, int64(0), pendingSamples, "Remote write never caught up, there are still %d pending samples.", pendingSamples) } func TestQueueManagerMetrics(t *testing.T) { reg := prometheus.NewPedanticRegistry() metrics := newQueueManagerMetrics(reg, "name", "http://localhost:1234") // Make sure metrics pass linting. problems, err := client_testutil.GatherAndLint(reg) require.NoError(t, err) require.Equal(t, 0, len(problems), "Metric linting problems detected: %v", problems) // Make sure all metrics were unregistered. A failure here means you need // unregister a metric in `queueManagerMetrics.unregister()`. metrics.unregister() err = client_testutil.GatherAndCompare(reg, strings.NewReader("")) require.NoError(t, err) }