Refactor and fix queue resharding (#5286)

- Remove prometheus_remote_queue_last_send_timestamp_seconds metric.  It's not particularly useful; we have highest_timestamp_seconds.
- Factor out maxGauge, a gauge that only increases.
- Change sharding calculations to use max samples in timestamp - max samples out timestamp (not rates).
- Also include the ratio of samples dropped to correctly predict number of pending samples.

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
This commit is contained in:
Tom Wilkie 2019-03-01 11:04:26 -08:00 committed by GitHub
parent d8c06bb2b7
commit 67da8e7b46
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 118 additions and 131 deletions

View File

@ -0,0 +1,39 @@
// Copyright 2019 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"sync"
"github.com/prometheus/client_golang/prometheus"
)
// maxGauge decorates a prometheus.Gauge with a Set method that only lets the
// exported value move upwards: a call to Set is forwarded to the wrapped gauge
// only when the new value is greater than the largest one seen so far.
//
// Only Set is overridden. Any other method promoted from the embedded Gauge
// goes straight to the underlying metric and is not tracked in value.
//
// A maxGauge contains a mutex and must not be copied after first use.
type maxGauge struct {
	// Gauge is the underlying metric that receives each new maximum.
	prometheus.Gauge

	// mtx is held by Set while it compares against and updates value.
	mtx sync.Mutex
	// value is the largest number passed to Set so far; it starts at zero.
	value float64
}
// Set records value as the new maximum and forwards it to the wrapped gauge,
// but only if it is strictly greater than the current maximum. Smaller or
// equal values are ignored. The check and the update happen under m.mtx.
func (m *maxGauge) Set(value float64) {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	// Written as a negated ">" rather than "<=" so that a NaN argument is
	// ignored: every comparison with NaN is false.
	if !(value > m.value) {
		return
	}

	m.value = value
	m.Gauge.Set(value)
}
// Get returns the largest value that has been passed to Set so far, or zero
// if Set has not yet accepted any value.
//
// The read is done under m.mtx, the same mutex Set holds while writing
// m.value. Without it, a Get running concurrently with a Set from another
// goroutine would be a data race.
func (m *maxGauge) Get() float64 {
	m.mtx.Lock()
	defer m.mtx.Unlock()
	return m.value
}

View File

@ -27,6 +27,7 @@ import (
"github.com/golang/snappy"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
pkgrelabel "github.com/prometheus/prometheus/pkg/relabel"
@ -51,7 +52,7 @@ const (
)
var (
succeededSamplesTotal = prometheus.NewCounterVec(
succeededSamplesTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
@ -60,7 +61,7 @@ var (
},
[]string{queue},
)
failedSamplesTotal = prometheus.NewCounterVec(
failedSamplesTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
@ -69,7 +70,7 @@ var (
},
[]string{queue},
)
retriedSamplesTotal = prometheus.NewCounterVec(
retriedSamplesTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
@ -78,7 +79,7 @@ var (
},
[]string{queue},
)
droppedSamplesTotal = prometheus.NewCounterVec(
droppedSamplesTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
@ -87,7 +88,7 @@ var (
},
[]string{queue},
)
enqueueRetriesTotal = prometheus.NewCounterVec(
enqueueRetriesTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
@ -96,7 +97,7 @@ var (
},
[]string{queue},
)
sentBatchDuration = prometheus.NewHistogramVec(
sentBatchDuration = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
@ -106,16 +107,7 @@ var (
},
[]string{queue},
)
queueLastSendTimestamp = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "queue_last_send_timestamp_seconds",
Help: "Timestamp of the last successful send by this queue.",
},
[]string{queue},
)
queueHighestSentTimestamp = prometheus.NewGaugeVec(
queueHighestSentTimestamp = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
@ -124,7 +116,7 @@ var (
},
[]string{queue},
)
queuePendingSamples = prometheus.NewGaugeVec(
queuePendingSamples = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
@ -133,7 +125,7 @@ var (
},
[]string{queue},
)
shardCapacity = prometheus.NewGaugeVec(
shardCapacity = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
@ -142,7 +134,7 @@ var (
},
[]string{queue},
)
numShards = prometheus.NewGaugeVec(
numShards = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
@ -153,20 +145,6 @@ var (
)
)
func init() {
prometheus.MustRegister(succeededSamplesTotal)
prometheus.MustRegister(failedSamplesTotal)
prometheus.MustRegister(retriedSamplesTotal)
prometheus.MustRegister(droppedSamplesTotal)
prometheus.MustRegister(enqueueRetriesTotal)
prometheus.MustRegister(sentBatchDuration)
prometheus.MustRegister(queueLastSendTimestamp)
prometheus.MustRegister(queueHighestSentTimestamp)
prometheus.MustRegister(queuePendingSamples)
prometheus.MustRegister(shardCapacity)
prometheus.MustRegister(numShards)
}
// StorageClient defines an interface for sending a batch of samples to an
// external timeseries database.
type StorageClient interface {
@ -189,17 +167,10 @@ type QueueManager struct {
client StorageClient
queueName string
watcher *WALWatcher
lastSendTimestampMetric prometheus.Gauge
highestSentTimestampMetric prometheus.Gauge
highestSentTimestampMetric *maxGauge
pendingSamplesMetric prometheus.Gauge
enqueueRetriesMetric prometheus.Counter
lastSendTimestamp int64
highestSentTimestamp int64
timestampLock sync.Mutex
highestTimestampIn *int64 // highest timestamp of any sample ingested by remote storage via scrape (Appender)
seriesMtx sync.Mutex
seriesLabels map[uint64][]prompb.Label
seriesSegmentIndexes map[uint64]int
@ -216,14 +187,12 @@ type QueueManager struct {
}
// NewQueueManager builds a new QueueManager.
func NewQueueManager(logger log.Logger, walDir string, samplesIn *ewmaRate, highestTimestampIn *int64, cfg config.QueueConfig, externalLabels model.LabelSet, relabelConfigs []*pkgrelabel.Config, client StorageClient, flushDeadline time.Duration) *QueueManager {
func NewQueueManager(logger log.Logger, walDir string, samplesIn *ewmaRate, cfg config.QueueConfig, externalLabels model.LabelSet, relabelConfigs []*pkgrelabel.Config, client StorageClient, flushDeadline time.Duration) *QueueManager {
if logger == nil {
logger = log.NewNopLogger()
} else {
logger = log.With(logger, "queue", client.Name())
}
t := &QueueManager{
logger: logger,
logger: log.With(logger, "queue", client.Name()),
flushDeadline: flushDeadline,
cfg: cfg,
externalLabels: externalLabels,
@ -231,8 +200,6 @@ func NewQueueManager(logger log.Logger, walDir string, samplesIn *ewmaRate, high
client: client,
queueName: client.Name(),
highestTimestampIn: highestTimestampIn,
seriesLabels: make(map[uint64][]prompb.Label),
seriesSegmentIndexes: make(map[uint64]int),
droppedSeries: make(map[uint64]struct{}),
@ -247,8 +214,9 @@ func NewQueueManager(logger log.Logger, walDir string, samplesIn *ewmaRate, high
samplesOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration),
}
t.lastSendTimestampMetric = queueLastSendTimestamp.WithLabelValues(t.queueName)
t.highestSentTimestampMetric = queueHighestSentTimestamp.WithLabelValues(t.queueName)
t.highestSentTimestampMetric = &maxGauge{
Gauge: queueHighestSentTimestamp.WithLabelValues(t.queueName),
}
t.pendingSamplesMetric = queuePendingSamples.WithLabelValues(t.queueName)
t.enqueueRetriesMetric = enqueueRetriesTotal.WithLabelValues(t.queueName)
t.watcher = NewWALWatcher(logger, client.Name(), t, walDir)
@ -411,12 +379,6 @@ func (t *QueueManager) updateShardsLoop() {
for {
select {
case <-ticker.C:
now := time.Now().Unix()
threshold := int64(time.Duration(2 * t.cfg.BatchSendDeadline).Seconds())
if now-t.lastSendTimestamp > threshold {
level.Debug(t.logger).Log("msg", "Skipping resharding, last successful send was beyond threshold")
continue
}
t.calculateDesiredShards()
case <-t.quit:
return
@ -425,7 +387,6 @@ func (t *QueueManager) updateShardsLoop() {
}
func (t *QueueManager) calculateDesiredShards() {
t.samplesIn.tick()
t.samplesOut.tick()
t.samplesDropped.tick()
t.samplesOutDuration.tick()
@ -437,9 +398,11 @@ func (t *QueueManager) calculateDesiredShards() {
var (
samplesIn = t.samplesIn.rate()
samplesOut = t.samplesOut.rate()
samplesDropped = t.samplesDropped.rate()
samplesPending = samplesIn - samplesDropped - samplesOut
samplesKeptRatio = samplesOut / (t.samplesDropped.rate() + samplesOut)
samplesOutDuration = t.samplesOutDuration.rate()
highestSent = t.highestSentTimestampMetric.Get()
highestRecv = highestTimestamp.Get()
samplesPending = (highestRecv - highestSent) * samplesIn * samplesKeptRatio
)
// We use an integral accumulator, like in a PID, to help dampen oscillation.
@ -451,12 +414,18 @@ func (t *QueueManager) calculateDesiredShards() {
var (
timePerSample = samplesOutDuration / samplesOut
desiredShards = (timePerSample * (samplesIn - samplesDropped + samplesPending + t.integralAccumulator)) / float64(time.Second)
desiredShards = (timePerSample * samplesPending) / float64(time.Second)
)
level.Debug(t.logger).Log("msg", "QueueManager.caclulateDesiredShards",
"samplesIn", samplesIn, "samplesDropped", samplesDropped,
"samplesOut", samplesOut, "samplesPending", samplesPending,
"desiredShards", desiredShards)
"samplesIn", samplesIn,
"samplesOut", samplesOut,
"samplesKeptRatio", samplesKeptRatio,
"samplesPending", samplesPending,
"samplesOutDuration", samplesOutDuration,
"timePerSample", timePerSample,
"desiredShards", desiredShards,
"highestSent", highestSent,
"highestRecv", highestRecv)
// Changes in the number of shards must be greater than shardToleranceFraction.
var (
@ -515,23 +484,6 @@ func (t *QueueManager) newShards() *shards {
return s
}
// Check and set highestSentTimestamp
func (t *QueueManager) setHighestSentTimestamp(highest int64) {
t.timestampLock.Lock()
defer t.timestampLock.Unlock()
if highest > t.highestSentTimestamp {
t.highestSentTimestamp = highest
t.highestSentTimestampMetric.Set(float64(t.highestSentTimestamp) / 1000.)
}
}
func (t *QueueManager) setLastSendTimestamp(now time.Time) {
t.timestampLock.Lock()
defer t.timestampLock.Unlock()
t.lastSendTimestampMetric.Set(float64(now.UnixNano()) / 1e9)
t.lastSendTimestamp = now.Unix()
}
type shards struct {
mtx sync.RWMutex // With the WAL, this is never actually contended.
@ -713,11 +665,12 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries) {
func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries) error {
backoff := s.qm.cfg.MinBackoff
req, highest, err := buildWriteRequest(samples)
// Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails.
if err != nil {
// Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails.
return err
}
for {
select {
case <-ctx.Done():
@ -731,9 +684,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
if err == nil {
succeededSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples)))
now := time.Now()
s.qm.setLastSendTimestamp(now)
s.qm.setHighestSentTimestamp(highest)
s.qm.highestSentTimestampMetric.Set(float64(highest / 1000))
return nil
}
@ -741,7 +692,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
return err
}
retriedSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples)))
level.Error(s.qm.logger).Log("err", err)
level.Debug(s.qm.logger).Log("msg", "failed to send batch, retrying", "err", err)
time.Sleep(time.Duration(backoff))
backoff = backoff * 2

View File

@ -54,13 +54,12 @@ func TestSampleDelivery(t *testing.T) {
cfg := config.DefaultQueueConfig
cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
cfg.MaxShards = 1
var temp int64
dir, err := ioutil.TempDir("", "TestSampleDeliver")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), &temp, cfg, nil, nil, c, defaultFlushDeadline)
m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
m.seriesLabels = refSeriesToLabelsProto(series)
// These should be received by the client.
@ -83,13 +82,12 @@ func TestSampleDeliveryTimeout(t *testing.T) {
cfg := config.DefaultQueueConfig
cfg.MaxShards = 1
cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
var temp int64
dir, err := ioutil.TempDir("", "TestSampleDeliveryTimeout")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), &temp, cfg, nil, nil, c, defaultFlushDeadline)
m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
m.seriesLabels = refSeriesToLabelsProto(series)
m.Start()
defer m.Stop()
@ -124,13 +122,12 @@ func TestSampleDeliveryOrder(t *testing.T) {
c := NewTestStorageClient()
c.expectSamples(samples, series)
var temp int64
dir, err := ioutil.TempDir("", "TestSampleDeliveryOrder")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), &temp, config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
m.seriesLabels = refSeriesToLabelsProto(series)
m.Start()
@ -144,13 +141,11 @@ func TestShutdown(t *testing.T) {
deadline := 1 * time.Second
c := NewTestBlockedStorageClient()
var temp int64
dir, err := ioutil.TempDir("", "TestShutdown")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), &temp, config.DefaultQueueConfig, nil, nil, c, deadline)
m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline)
samples, series := createTimeseries(2 * config.DefaultQueueConfig.MaxSamplesPerSend)
m.seriesLabels = refSeriesToLabelsProto(series)
m.Start()
@ -179,7 +174,6 @@ func TestShutdown(t *testing.T) {
func TestSeriesReset(t *testing.T) {
c := NewTestBlockedStorageClient()
deadline := 5 * time.Second
var temp int64
numSegments := 4
numSeries := 25
@ -187,7 +181,7 @@ func TestSeriesReset(t *testing.T) {
testutil.Ok(t, err)
defer os.RemoveAll(dir)
m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), &temp, config.DefaultQueueConfig, nil, nil, c, deadline)
m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline)
for i := 0; i < numSegments; i++ {
series := []tsdb.RefSeries{}
for j := 0; j < numSeries; j++ {
@ -211,13 +205,11 @@ func TestReshard(t *testing.T) {
cfg := config.DefaultQueueConfig
cfg.MaxShards = 1
var temp int64
dir, err := ioutil.TempDir("", "TestReshard")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), &temp, cfg, nil, nil, c, defaultFlushDeadline)
m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
m.seriesLabels = refSeriesToLabelsProto(series)
m.Start()
@ -406,11 +398,10 @@ func BenchmarkStartup(b *testing.B) {
logger = log.With(logger, "caller", log.DefaultCaller)
for n := 0; n < b.N; n++ {
var temp int64
c := NewTestBlockedStorageClient()
m := NewQueueManager(logger, dir,
newEWMARate(ewmaWeight, shardUpdateDuration),
&temp, config.DefaultQueueConfig, nil, nil, c, 1*time.Minute)
config.DefaultQueueConfig, nil, nil, c, 1*time.Minute)
m.watcher.startTime = math.MaxInt64
m.watcher.maxSegment = segments[len(segments)-2]
err := m.watcher.run()

View File

@ -38,18 +38,14 @@ type Storage struct {
mtx sync.RWMutex
// For writes
walDir string
queues []*QueueManager
samplesIn *ewmaRate
samplesInMetric prometheus.Counter
highestTimestampMtx sync.Mutex
highestTimestamp int64
highestTimestampMetric prometheus.Gauge
walDir string
queues []*QueueManager
samplesIn *ewmaRate
flushDeadline time.Duration
// For reads
queryables []storage.Queryable
localStartTimeCallback startTimeCallback
flushDeadline time.Duration
}
// NewStorage returns a remote.Storage.
@ -57,28 +53,25 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal
if l == nil {
l = log.NewNopLogger()
}
shardUpdateDuration := 10 * time.Second
s := &Storage{
logger: logging.Dedupe(l, 1*time.Minute),
localStartTimeCallback: stCallback,
flushDeadline: flushDeadline,
samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration),
walDir: walDir,
// queues: make(map[*QueueManager]struct{}),
samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration),
samplesInMetric: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_remote_storage_samples_in_total",
Help: "Samples in to remote storage, compare to samples out for queue managers.",
}),
highestTimestampMetric: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_remote_storage_highest_timestamp_in_seconds",
Help: "Highest timestamp that has come into the remote storage via the Appender interface, in seconds since epoch.",
}),
}
reg.MustRegister(s.samplesInMetric)
reg.MustRegister(s.highestTimestampMetric)
go s.run()
return s
}
// run advances the shared samplesIn rate once every shardUpdateDuration.
//
// It is meant to be run in its own goroutine. The loop only ends if the
// ticker channel is closed, and nothing in this function closes it or listens
// for a stop signal, so in practice it runs for the lifetime of the process.
func (s *Storage) run() {
	tick := time.NewTicker(shardUpdateDuration)
	defer tick.Stop()

	for {
		// Leave the loop on a closed channel, as a range loop would.
		if _, open := <-tick.C; !open {
			return
		}
		s.samplesIn.tick()
	}
}
// ApplyConfig updates the state as the new config requires.
func (s *Storage) ApplyConfig(conf *config.Config) error {
s.mtx.Lock()
@ -101,7 +94,6 @@ func (s *Storage) ApplyConfig(conf *config.Config) error {
s.logger,
s.walDir,
s.samplesIn,
&s.highestTimestamp,
rwConf.QueueConfig,
conf.GlobalConfig.ExternalLabels,
rwConf.WriteRelabelConfigs,

View File

@ -14,10 +14,29 @@
package remote
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
)
// samplesIn is a counter created through promauto under the package's
// namespace and subsystem; see the Help text for what it counts.
var samplesIn = promauto.NewCounter(prometheus.CounterOpts{
	Namespace: namespace,
	Subsystem: subsystem,
	Name:      "samples_in_total",
	Help:      "Samples in to remote storage, compare to samples out for queue managers.",
})

// highestTimestamp is a maxGauge wrapped around a promauto-created gauge, so
// the exported value only changes when a larger value is set. Per the Help
// text it is expressed in seconds since the epoch.
var highestTimestamp = maxGauge{
	Gauge: promauto.NewGauge(prometheus.GaugeOpts{
		Namespace: namespace,
		Subsystem: subsystem,
		Name:      "highest_timestamp_in_seconds",
		Help:      "Highest timestamp that has come into the remote storage via the Appender interface, in seconds since epoch.",
	}),
}
// Appender implements scrape.Appendable.
func (s *Storage) Appender() (storage.Appender, error) {
return &timestampTracker{
@ -49,14 +68,9 @@ func (t *timestampTracker) AddFast(l labels.Labels, _ uint64, ts int64, v float6
// Commit implements storage.Appender.
func (t *timestampTracker) Commit() error {
t.storage.samplesIn.incr(t.samples)
t.storage.samplesInMetric.Add(float64(t.samples))
t.storage.highestTimestampMtx.Lock()
defer t.storage.highestTimestampMtx.Unlock()
if t.highestTimestamp > t.storage.highestTimestamp {
t.storage.highestTimestamp = t.highestTimestamp
t.storage.highestTimestampMetric.Set(float64(t.highestTimestamp) / 1000.)
}
samplesIn.Add(float64(t.samples))
highestTimestamp.Set(float64(t.highestTimestamp / 1000))
return nil
}

2
vendor/modules.txt vendored
View File

@ -224,8 +224,8 @@ github.com/prometheus/client_golang/prometheus
github.com/prometheus/client_golang/api
github.com/prometheus/client_golang/api/prometheus/v1
github.com/prometheus/client_golang/prometheus/promhttp
github.com/prometheus/client_golang/prometheus/internal
github.com/prometheus/client_golang/prometheus/promauto
github.com/prometheus/client_golang/prometheus/internal
# github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910
github.com/prometheus/client_model/go
# github.com/prometheus/common v0.0.0-20181119215939-b36ad289a3ea