Merge pull request #13491 from bboreham/faster-store-series
storage/remote: speed up StoreSeries by re-using labels.Builder
This commit is contained in:
commit
8655fe5401
|
@ -413,9 +413,10 @@ type QueueManager struct {
|
|||
clientMtx sync.RWMutex
|
||||
storeClient WriteClient
|
||||
|
||||
seriesMtx sync.Mutex // Covers seriesLabels and droppedSeries.
|
||||
seriesMtx sync.Mutex // Covers seriesLabels, droppedSeries and builder.
|
||||
seriesLabels map[chunks.HeadSeriesRef]labels.Labels
|
||||
droppedSeries map[chunks.HeadSeriesRef]struct{}
|
||||
builder *labels.Builder
|
||||
|
||||
seriesSegmentMtx sync.Mutex // Covers seriesSegmentIndexes - if you also lock seriesMtx, take seriesMtx first.
|
||||
seriesSegmentIndexes map[chunks.HeadSeriesRef]int
|
||||
|
@ -482,6 +483,7 @@ func NewQueueManager(
|
|||
seriesLabels: make(map[chunks.HeadSeriesRef]labels.Labels),
|
||||
seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int),
|
||||
droppedSeries: make(map[chunks.HeadSeriesRef]struct{}),
|
||||
builder: labels.NewBuilder(labels.EmptyLabels()),
|
||||
|
||||
numShards: cfg.MinShards,
|
||||
reshardChan: make(chan int),
|
||||
|
@ -897,12 +899,14 @@ func (t *QueueManager) StoreSeries(series []record.RefSeries, index int) {
|
|||
// Just make sure all the Refs of Series will insert into seriesSegmentIndexes map for tracking.
|
||||
t.seriesSegmentIndexes[s.Ref] = index
|
||||
|
||||
ls := processExternalLabels(s.Labels, t.externalLabels)
|
||||
lbls, keep := relabel.Process(ls, t.relabelConfigs...)
|
||||
if !keep || lbls.IsEmpty() {
|
||||
t.builder.Reset(s.Labels)
|
||||
processExternalLabels(t.builder, t.externalLabels)
|
||||
keep := relabel.ProcessBuilder(t.builder, t.relabelConfigs...)
|
||||
if !keep {
|
||||
t.droppedSeries[s.Ref] = struct{}{}
|
||||
continue
|
||||
}
|
||||
lbls := t.builder.Labels()
|
||||
t.internLabels(lbls)
|
||||
|
||||
// We should not ever be replacing a series labels in the map, but just
|
||||
|
@ -967,30 +971,14 @@ func (t *QueueManager) releaseLabels(ls labels.Labels) {
|
|||
ls.ReleaseStrings(t.interner.release)
|
||||
}
|
||||
|
||||
// processExternalLabels merges externalLabels into ls. If ls contains
|
||||
// a label in externalLabels, the value in ls wins.
|
||||
func processExternalLabels(ls labels.Labels, externalLabels []labels.Label) labels.Labels {
|
||||
if len(externalLabels) == 0 {
|
||||
return ls
|
||||
}
|
||||
|
||||
b := labels.NewScratchBuilder(ls.Len() + len(externalLabels))
|
||||
j := 0
|
||||
ls.Range(func(l labels.Label) {
|
||||
for j < len(externalLabels) && l.Name > externalLabels[j].Name {
|
||||
b.Add(externalLabels[j].Name, externalLabels[j].Value)
|
||||
j++
|
||||
// processExternalLabels merges externalLabels into b. If b contains
|
||||
// a label in externalLabels, the value in b wins.
|
||||
func processExternalLabels(b *labels.Builder, externalLabels []labels.Label) {
|
||||
for _, el := range externalLabels {
|
||||
if b.Get(el.Name) == "" {
|
||||
b.Set(el.Name, el.Value)
|
||||
}
|
||||
if j < len(externalLabels) && l.Name == externalLabels[j].Name {
|
||||
j++
|
||||
}
|
||||
b.Add(l.Name, l.Value)
|
||||
})
|
||||
for ; j < len(externalLabels); j++ {
|
||||
b.Add(externalLabels[j].Name, externalLabels[j].Value)
|
||||
}
|
||||
|
||||
return b.Labels()
|
||||
}
|
||||
|
||||
func (t *QueueManager) updateShardsLoop() {
|
||||
|
|
|
@ -38,6 +38,7 @@ import (
|
|||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/model/histogram"
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/model/relabel"
|
||||
"github.com/prometheus/prometheus/model/timestamp"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
"github.com/prometheus/prometheus/scrape"
|
||||
|
@ -868,29 +869,30 @@ func (c *NopWriteClient) Store(context.Context, []byte, int) error { return nil
|
|||
func (c *NopWriteClient) Name() string { return "nopwriteclient" }
|
||||
func (c *NopWriteClient) Endpoint() string { return "http://test-remote.com/1234" }
|
||||
|
||||
// Extra labels to make a more realistic workload - taken from Kubernetes' embedded cAdvisor metrics.
|
||||
var extraLabels []labels.Label = []labels.Label{
|
||||
{Name: "kubernetes_io_arch", Value: "amd64"},
|
||||
{Name: "kubernetes_io_instance_type", Value: "c3.somesize"},
|
||||
{Name: "kubernetes_io_os", Value: "linux"},
|
||||
{Name: "container_name", Value: "some-name"},
|
||||
{Name: "failure_domain_kubernetes_io_region", Value: "somewhere-1"},
|
||||
{Name: "failure_domain_kubernetes_io_zone", Value: "somewhere-1b"},
|
||||
{Name: "id", Value: "/kubepods/burstable/pod6e91c467-e4c5-11e7-ace3-0a97ed59c75e/a3c8498918bd6866349fed5a6f8c643b77c91836427fb6327913276ebc6bde28"},
|
||||
{Name: "image", Value: "registry/organisation/name@sha256:dca3d877a80008b45d71d7edc4fd2e44c0c8c8e7102ba5cbabec63a374d1d506"},
|
||||
{Name: "instance", Value: "ip-111-11-1-11.ec2.internal"},
|
||||
{Name: "job", Value: "kubernetes-cadvisor"},
|
||||
{Name: "kubernetes_io_hostname", Value: "ip-111-11-1-11"},
|
||||
{Name: "monitor", Value: "prod"},
|
||||
{Name: "name", Value: "k8s_some-name_some-other-name-5j8s8_kube-system_6e91c467-e4c5-11e7-ace3-0a97ed59c75e_0"},
|
||||
{Name: "namespace", Value: "kube-system"},
|
||||
{Name: "pod_name", Value: "some-other-name-5j8s8"},
|
||||
}
|
||||
|
||||
func BenchmarkSampleSend(b *testing.B) {
|
||||
// Send one sample per series, which is the typical remote_write case
|
||||
const numSamples = 1
|
||||
const numSeries = 10000
|
||||
|
||||
// Extra labels to make a more realistic workload - taken from Kubernetes' embedded cAdvisor metrics.
|
||||
extraLabels := []labels.Label{
|
||||
{Name: "kubernetes_io_arch", Value: "amd64"},
|
||||
{Name: "kubernetes_io_instance_type", Value: "c3.somesize"},
|
||||
{Name: "kubernetes_io_os", Value: "linux"},
|
||||
{Name: "container_name", Value: "some-name"},
|
||||
{Name: "failure_domain_kubernetes_io_region", Value: "somewhere-1"},
|
||||
{Name: "failure_domain_kubernetes_io_zone", Value: "somewhere-1b"},
|
||||
{Name: "id", Value: "/kubepods/burstable/pod6e91c467-e4c5-11e7-ace3-0a97ed59c75e/a3c8498918bd6866349fed5a6f8c643b77c91836427fb6327913276ebc6bde28"},
|
||||
{Name: "image", Value: "registry/organisation/name@sha256:dca3d877a80008b45d71d7edc4fd2e44c0c8c8e7102ba5cbabec63a374d1d506"},
|
||||
{Name: "instance", Value: "ip-111-11-1-11.ec2.internal"},
|
||||
{Name: "job", Value: "kubernetes-cadvisor"},
|
||||
{Name: "kubernetes_io_hostname", Value: "ip-111-11-1-11"},
|
||||
{Name: "monitor", Value: "prod"},
|
||||
{Name: "name", Value: "k8s_some-name_some-other-name-5j8s8_kube-system_6e91c467-e4c5-11e7-ace3-0a97ed59c75e_0"},
|
||||
{Name: "namespace", Value: "kube-system"},
|
||||
{Name: "pod_name", Value: "some-other-name-5j8s8"},
|
||||
}
|
||||
samples, series := createTimeseries(numSamples, numSeries, extraLabels...)
|
||||
|
||||
c := NewNopWriteClient()
|
||||
|
@ -921,6 +923,58 @@ func BenchmarkSampleSend(b *testing.B) {
|
|||
b.StopTimer()
|
||||
}
|
||||
|
||||
// Check how long it takes to add N series, including external labels processing.
|
||||
func BenchmarkStoreSeries(b *testing.B) {
|
||||
externalLabels := []labels.Label{
|
||||
{Name: "cluster", Value: "mycluster"},
|
||||
{Name: "replica", Value: "1"},
|
||||
}
|
||||
relabelConfigs := []*relabel.Config{{
|
||||
SourceLabels: model.LabelNames{"namespace"},
|
||||
Separator: ";",
|
||||
Regex: relabel.MustNewRegexp("kube.*"),
|
||||
TargetLabel: "job",
|
||||
Replacement: "$1",
|
||||
Action: relabel.Replace,
|
||||
}}
|
||||
testCases := []struct {
|
||||
name string
|
||||
externalLabels []labels.Label
|
||||
ts []prompb.TimeSeries
|
||||
relabelConfigs []*relabel.Config
|
||||
}{
|
||||
{name: "plain"},
|
||||
{name: "externalLabels", externalLabels: externalLabels},
|
||||
{name: "relabel", relabelConfigs: relabelConfigs},
|
||||
{
|
||||
name: "externalLabels+relabel",
|
||||
externalLabels: externalLabels,
|
||||
relabelConfigs: relabelConfigs,
|
||||
},
|
||||
}
|
||||
|
||||
// numSeries chosen to be big enough that StoreSeries dominates creating a new queue manager.
|
||||
const numSeries = 1000
|
||||
_, series := createTimeseries(0, numSeries, extraLabels...)
|
||||
|
||||
for _, tc := range testCases {
|
||||
b.Run(tc.name, func(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
c := NewTestWriteClient()
|
||||
dir := b.TempDir()
|
||||
cfg := config.DefaultQueueConfig
|
||||
mcfg := config.DefaultMetadataConfig
|
||||
metrics := newQueueManagerMetrics(nil, "", "")
|
||||
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
|
||||
m.externalLabels = tc.externalLabels
|
||||
m.relabelConfigs = tc.relabelConfigs
|
||||
|
||||
m.StoreSeries(series, 0)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkStartup(b *testing.B) {
|
||||
dir := os.Getenv("WALDIR")
|
||||
if dir == "" {
|
||||
|
@ -959,7 +1013,8 @@ func BenchmarkStartup(b *testing.B) {
|
|||
}
|
||||
|
||||
func TestProcessExternalLabels(t *testing.T) {
|
||||
for _, tc := range []struct {
|
||||
b := labels.NewBuilder(labels.EmptyLabels())
|
||||
for i, tc := range []struct {
|
||||
labels labels.Labels
|
||||
externalLabels []labels.Label
|
||||
expected labels.Labels
|
||||
|
@ -1020,7 +1075,9 @@ func TestProcessExternalLabels(t *testing.T) {
|
|||
expected: labels.FromStrings("a", "b", "c", "d", "e", "f"),
|
||||
},
|
||||
} {
|
||||
require.Equal(t, tc.expected, processExternalLabels(tc.labels, tc.externalLabels))
|
||||
b.Reset(tc.labels)
|
||||
processExternalLabels(b, tc.externalLabels)
|
||||
require.Equal(t, tc.expected, b.Labels(), "test %d", i)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue