From d9483bb77ca811393d697b7e31d9137bb8f967f6 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Mon, 29 Jan 2024 12:52:03 +0000 Subject: [PATCH 1/2] storage/remote: add BenchmarkStoreSeries Signed-off-by: Bryan Boreham --- storage/remote/queue_manager_test.go | 90 ++++++++++++++++++++++------ 1 file changed, 72 insertions(+), 18 deletions(-) diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 778861e2b..cd3c39d9d 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -38,6 +38,7 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/scrape" @@ -868,29 +869,30 @@ func (c *NopWriteClient) Store(context.Context, []byte, int) error { return nil func (c *NopWriteClient) Name() string { return "nopwriteclient" } func (c *NopWriteClient) Endpoint() string { return "http://test-remote.com/1234" } +// Extra labels to make a more realistic workload - taken from Kubernetes' embedded cAdvisor metrics. +var extraLabels []labels.Label = []labels.Label{ + {Name: "kubernetes_io_arch", Value: "amd64"}, + {Name: "kubernetes_io_instance_type", Value: "c3.somesize"}, + {Name: "kubernetes_io_os", Value: "linux"}, + {Name: "container_name", Value: "some-name"}, + {Name: "failure_domain_kubernetes_io_region", Value: "somewhere-1"}, + {Name: "failure_domain_kubernetes_io_zone", Value: "somewhere-1b"}, + {Name: "id", Value: "/kubepods/burstable/pod6e91c467-e4c5-11e7-ace3-0a97ed59c75e/a3c8498918bd6866349fed5a6f8c643b77c91836427fb6327913276ebc6bde28"}, + {Name: "image", Value: "registry/organisation/name@sha256:dca3d877a80008b45d71d7edc4fd2e44c0c8c8e7102ba5cbabec63a374d1d506"}, + {Name: "instance", Value: "ip-111-11-1-11.ec2.internal"}, + {Name: "job", Value: "kubernetes-cadvisor"}, + {Name: "kubernetes_io_hostname", Value: "ip-111-11-1-11"}, + {Name: "monitor", Value: "prod"}, + {Name: "name", Value: "k8s_some-name_some-other-name-5j8s8_kube-system_6e91c467-e4c5-11e7-ace3-0a97ed59c75e_0"}, + {Name: "namespace", Value: "kube-system"}, + {Name: "pod_name", Value: "some-other-name-5j8s8"}, +} + func BenchmarkSampleSend(b *testing.B) { // Send one sample per series, which is the typical remote_write case const numSamples = 1 const numSeries = 10000 - // Extra labels to make a more realistic workload - taken from Kubernetes' embedded cAdvisor metrics. - extraLabels := []labels.Label{ - {Name: "kubernetes_io_arch", Value: "amd64"}, - {Name: "kubernetes_io_instance_type", Value: "c3.somesize"}, - {Name: "kubernetes_io_os", Value: "linux"}, - {Name: "container_name", Value: "some-name"}, - {Name: "failure_domain_kubernetes_io_region", Value: "somewhere-1"}, - {Name: "failure_domain_kubernetes_io_zone", Value: "somewhere-1b"}, - {Name: "id", Value: "/kubepods/burstable/pod6e91c467-e4c5-11e7-ace3-0a97ed59c75e/a3c8498918bd6866349fed5a6f8c643b77c91836427fb6327913276ebc6bde28"}, - {Name: "image", Value: "registry/organisation/name@sha256:dca3d877a80008b45d71d7edc4fd2e44c0c8c8e7102ba5cbabec63a374d1d506"}, - {Name: "instance", Value: "ip-111-11-1-11.ec2.internal"}, - {Name: "job", Value: "kubernetes-cadvisor"}, - {Name: "kubernetes_io_hostname", Value: "ip-111-11-1-11"}, - {Name: "monitor", Value: "prod"}, - {Name: "name", Value: "k8s_some-name_some-other-name-5j8s8_kube-system_6e91c467-e4c5-11e7-ace3-0a97ed59c75e_0"}, - {Name: "namespace", Value: "kube-system"}, - {Name: "pod_name", Value: "some-other-name-5j8s8"}, - } samples, series := createTimeseries(numSamples, numSeries, extraLabels...) c := NewNopWriteClient() @@ -921,6 +923,58 @@ func BenchmarkSampleSend(b *testing.B) { b.StopTimer() } +// Check how long it takes to add N series, including external labels processing. +func BenchmarkStoreSeries(b *testing.B) { + externalLabels := []labels.Label{ + {Name: "cluster", Value: "mycluster"}, + {Name: "replica", Value: "1"}, + } + relabelConfigs := []*relabel.Config{{ + SourceLabels: model.LabelNames{"namespace"}, + Separator: ";", + Regex: relabel.MustNewRegexp("kube.*"), + TargetLabel: "job", + Replacement: "$1", + Action: relabel.Replace, + }} + testCases := []struct { + name string + externalLabels []labels.Label + ts []prompb.TimeSeries + relabelConfigs []*relabel.Config + }{ + {name: "plain"}, + {name: "externalLabels", externalLabels: externalLabels}, + {name: "relabel", relabelConfigs: relabelConfigs}, + { + name: "externalLabels+relabel", + externalLabels: externalLabels, + relabelConfigs: relabelConfigs, + }, + } + + // numSeries chosen to be big enough that StoreSeries dominates creating a new queue manager. + const numSeries = 1000 + _, series := createTimeseries(0, numSeries, extraLabels...) + + for _, tc := range testCases { + b.Run(tc.name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + c := NewTestWriteClient() + dir := b.TempDir() + cfg := config.DefaultQueueConfig + mcfg := config.DefaultMetadataConfig + metrics := newQueueManagerMetrics(nil, "", "") + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) + m.externalLabels = tc.externalLabels + m.relabelConfigs = tc.relabelConfigs + + m.StoreSeries(series, 0) + } + }) + } +} + func BenchmarkStartup(b *testing.B) { dir := os.Getenv("WALDIR") if dir == "" { From dcd024a095f52e5a58dfc2d6701cc813d06c6594 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Mon, 29 Jan 2024 18:49:55 +0000 Subject: [PATCH 2/2] storage/remote: speed up StoreSeries by re-using labels.Builder Relabeling can take a pre-populated `Builder` instead of making a new one every time. This is much more efficient. Signed-off-by: Bryan Boreham --- storage/remote/queue_manager.go | 40 ++++++++++------------------ storage/remote/queue_manager_test.go | 7 +++-- 2 files changed, 19 insertions(+), 28 deletions(-) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index e37ec8c70..4b0c72cf6 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -413,9 +413,10 @@ type QueueManager struct { clientMtx sync.RWMutex storeClient WriteClient - seriesMtx sync.Mutex // Covers seriesLabels and droppedSeries. + seriesMtx sync.Mutex // Covers seriesLabels, droppedSeries and builder. seriesLabels map[chunks.HeadSeriesRef]labels.Labels droppedSeries map[chunks.HeadSeriesRef]struct{} + builder *labels.Builder seriesSegmentMtx sync.Mutex // Covers seriesSegmentIndexes - if you also lock seriesMtx, take seriesMtx first. seriesSegmentIndexes map[chunks.HeadSeriesRef]int @@ -482,6 +483,7 @@ func NewQueueManager( seriesLabels: make(map[chunks.HeadSeriesRef]labels.Labels), seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int), droppedSeries: make(map[chunks.HeadSeriesRef]struct{}), + builder: labels.NewBuilder(labels.EmptyLabels()), numShards: cfg.MinShards, reshardChan: make(chan int), @@ -897,12 +899,14 @@ func (t *QueueManager) StoreSeries(series []record.RefSeries, index int) { // Just make sure all the Refs of Series will insert into seriesSegmentIndexes map for tracking. t.seriesSegmentIndexes[s.Ref] = index - ls := processExternalLabels(s.Labels, t.externalLabels) - lbls, keep := relabel.Process(ls, t.relabelConfigs...) - if !keep || lbls.IsEmpty() { + t.builder.Reset(s.Labels) + processExternalLabels(t.builder, t.externalLabels) + keep := relabel.ProcessBuilder(t.builder, t.relabelConfigs...) + if !keep { t.droppedSeries[s.Ref] = struct{}{} continue } + lbls := t.builder.Labels() t.internLabels(lbls) // We should not ever be replacing a series labels in the map, but just @@ -967,30 +971,14 @@ func (t *QueueManager) releaseLabels(ls labels.Labels) { ls.ReleaseStrings(t.interner.release) } -// processExternalLabels merges externalLabels into ls. If ls contains -// a label in externalLabels, the value in ls wins. -func processExternalLabels(ls labels.Labels, externalLabels []labels.Label) labels.Labels { - if len(externalLabels) == 0 { - return ls - } - - b := labels.NewScratchBuilder(ls.Len() + len(externalLabels)) - j := 0 - ls.Range(func(l labels.Label) { - for j < len(externalLabels) && l.Name > externalLabels[j].Name { - b.Add(externalLabels[j].Name, externalLabels[j].Value) - j++ +// processExternalLabels merges externalLabels into b. If b contains +// a label in externalLabels, the value in b wins. +func processExternalLabels(b *labels.Builder, externalLabels []labels.Label) { + for _, el := range externalLabels { + if b.Get(el.Name) == "" { + b.Set(el.Name, el.Value) } - if j < len(externalLabels) && l.Name == externalLabels[j].Name { - j++ - } - b.Add(l.Name, l.Value) - }) - for ; j < len(externalLabels); j++ { - b.Add(externalLabels[j].Name, externalLabels[j].Value) } - - return b.Labels() } func (t *QueueManager) updateShardsLoop() { diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index cd3c39d9d..8a6f68208 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -1013,7 +1013,8 @@ func BenchmarkStartup(b *testing.B) { } func TestProcessExternalLabels(t *testing.T) { - for _, tc := range []struct { + b := labels.NewBuilder(labels.EmptyLabels()) + for i, tc := range []struct { labels labels.Labels externalLabels []labels.Label expected labels.Labels @@ -1074,7 +1075,9 @@ func TestProcessExternalLabels(t *testing.T) { expected: labels.FromStrings("a", "b", "c", "d", "e", "f"), }, } { - require.Equal(t, tc.expected, processExternalLabels(tc.labels, tc.externalLabels)) + b.Reset(tc.labels) + processExternalLabels(b, tc.externalLabels) + require.Equal(t, tc.expected, b.Labels(), "test %d", i) } }