diff --git a/storage/remote/max_gauge.go b/storage/remote/max_gauge.go
new file mode 100644
index 000000000..a2b2ccb0e
--- /dev/null
+++ b/storage/remote/max_gauge.go
@@ -0,0 +1,39 @@
+// Copyright 2019 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package remote
+
+import (
+    "sync"
+
+    "github.com/prometheus/client_golang/prometheus"
+)
+
+type maxGauge struct {
+    mtx   sync.Mutex
+    value float64
+    prometheus.Gauge
+}
+
+func (m *maxGauge) Set(value float64) {
+    m.mtx.Lock()
+    defer m.mtx.Unlock()
+    if value > m.value {
+        m.value = value
+        m.Gauge.Set(value)
+    }
+}
+
+func (m *maxGauge) Get() float64 {
+    return m.value
+}
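The maxGauge above embeds a prometheus.Gauge and overrides Set so the exported value can only ratchet upwards: calls with a lower value are silently ignored, and Get returns the running maximum. A minimal, self-contained sketch of that behaviour follows (the type name, metric name, and main function are illustrative and not part of this change; the mutex is elided for brevity):

```go
package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
)

// ratchetGauge mirrors the patch's maxGauge: Set only ever raises the value.
type ratchetGauge struct {
    value float64
    prometheus.Gauge
}

func (g *ratchetGauge) Set(v float64) {
    if v > g.value {
        g.value = v
        g.Gauge.Set(v)
    }
}

func main() {
    g := &ratchetGauge{Gauge: prometheus.NewGauge(prometheus.GaugeOpts{
        Name: "example_highest_timestamp_seconds", // illustrative name only
    })}
    g.Set(10) // exported value becomes 10
    g.Set(5)  // ignored: lower than the current maximum
    g.Set(12) // exported value becomes 12
    fmt.Println(g.value)
}
```

Because such a gauge never moves backwards, the difference between two of them (highest timestamp received minus highest timestamp sent) is a stable estimate of remote-write delay, which is how calculateDesiredShards uses them further down in this diff.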
diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index 489731b44..3ffd2f8af 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -27,6 +27,7 @@ import (
 
     "github.com/golang/snappy"
     "github.com/prometheus/client_golang/prometheus"
+    "github.com/prometheus/client_golang/prometheus/promauto"
     "github.com/prometheus/common/model"
     "github.com/prometheus/prometheus/config"
     pkgrelabel "github.com/prometheus/prometheus/pkg/relabel"
@@ -51,7 +52,7 @@ const (
 )
 
 var (
-    succeededSamplesTotal = prometheus.NewCounterVec(
+    succeededSamplesTotal = promauto.NewCounterVec(
         prometheus.CounterOpts{
             Namespace: namespace,
             Subsystem: subsystem,
@@ -60,7 +61,7 @@ var (
         },
         []string{queue},
     )
-    failedSamplesTotal = prometheus.NewCounterVec(
+    failedSamplesTotal = promauto.NewCounterVec(
         prometheus.CounterOpts{
             Namespace: namespace,
             Subsystem: subsystem,
@@ -69,7 +70,7 @@ var (
         },
         []string{queue},
     )
-    retriedSamplesTotal = prometheus.NewCounterVec(
+    retriedSamplesTotal = promauto.NewCounterVec(
         prometheus.CounterOpts{
             Namespace: namespace,
             Subsystem: subsystem,
@@ -78,7 +79,7 @@ var (
         },
         []string{queue},
     )
-    droppedSamplesTotal = prometheus.NewCounterVec(
+    droppedSamplesTotal = promauto.NewCounterVec(
         prometheus.CounterOpts{
             Namespace: namespace,
             Subsystem: subsystem,
@@ -87,7 +88,7 @@ var (
         },
         []string{queue},
     )
-    enqueueRetriesTotal = prometheus.NewCounterVec(
+    enqueueRetriesTotal = promauto.NewCounterVec(
         prometheus.CounterOpts{
             Namespace: namespace,
             Subsystem: subsystem,
@@ -96,7 +97,7 @@ var (
         },
         []string{queue},
     )
-    sentBatchDuration = prometheus.NewHistogramVec(
+    sentBatchDuration = promauto.NewHistogramVec(
         prometheus.HistogramOpts{
             Namespace: namespace,
             Subsystem: subsystem,
@@ -106,16 +107,7 @@ var (
         },
         []string{queue},
     )
-    queueLastSendTimestamp = prometheus.NewGaugeVec(
-        prometheus.GaugeOpts{
-            Namespace: namespace,
-            Subsystem: subsystem,
-            Name:      "queue_last_send_timestamp_seconds",
-            Help:      "Timestamp of the last successful send by this queue.",
-        },
-        []string{queue},
-    )
-    queueHighestSentTimestamp = prometheus.NewGaugeVec(
+    queueHighestSentTimestamp = promauto.NewGaugeVec(
         prometheus.GaugeOpts{
             Namespace: namespace,
             Subsystem: subsystem,
@@ -124,7 +116,7 @@ var (
         },
         []string{queue},
     )
-    queuePendingSamples = prometheus.NewGaugeVec(
+    queuePendingSamples = promauto.NewGaugeVec(
         prometheus.GaugeOpts{
             Namespace: namespace,
             Subsystem: subsystem,
@@ -133,7 +125,7 @@ var (
         },
         []string{queue},
     )
-    shardCapacity = prometheus.NewGaugeVec(
+    shardCapacity = promauto.NewGaugeVec(
         prometheus.GaugeOpts{
             Namespace: namespace,
             Subsystem: subsystem,
@@ -142,7 +134,7 @@ var (
         },
         []string{queue},
     )
-    numShards = prometheus.NewGaugeVec(
+    numShards = promauto.NewGaugeVec(
         prometheus.GaugeOpts{
             Namespace: namespace,
             Subsystem: subsystem,
@@ -153,20 +145,6 @@ var (
     )
 )
 
-func init() {
-    prometheus.MustRegister(succeededSamplesTotal)
-    prometheus.MustRegister(failedSamplesTotal)
-    prometheus.MustRegister(retriedSamplesTotal)
-    prometheus.MustRegister(droppedSamplesTotal)
-    prometheus.MustRegister(enqueueRetriesTotal)
-    prometheus.MustRegister(sentBatchDuration)
-    prometheus.MustRegister(queueLastSendTimestamp)
-    prometheus.MustRegister(queueHighestSentTimestamp)
-    prometheus.MustRegister(queuePendingSamples)
-    prometheus.MustRegister(shardCapacity)
-    prometheus.MustRegister(numShards)
-}
-
 // StorageClient defines an interface for sending a batch of samples to an
 // external timeseries database.
 type StorageClient interface {
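Switching the constructors from prometheus.New*Vec to promauto.New*Vec is what lets the init() block of MustRegister calls above be deleted: promauto constructors register the collector with prometheus.DefaultRegisterer at creation time and panic if registration fails, which covers the same ground. A rough stand-alone illustration (the metric name, label, and HTTP wiring are illustrative only, not taken from this patch):

```go
package main

import (
    "net/http"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

// promauto.NewCounterVec both constructs the vector and registers it with
// prometheus.DefaultRegisterer, so no separate init()/MustRegister step is needed.
var sentBatches = promauto.NewCounterVec(
    prometheus.CounterOpts{
        Name: "example_sent_batches_total",
        Help: "Illustrative counter registered via promauto.",
    },
    []string{"queue"},
)

func main() {
    sentBatches.WithLabelValues("default").Inc()

    // The default /metrics handler serves everything in DefaultRegisterer,
    // including the counter above.
    http.Handle("/metrics", promhttp.Handler())
    _ = http.ListenAndServe(":2112", nil)
}
```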
@@ -189,17 +167,10 @@ type QueueManager struct {
     client         StorageClient
     queueName      string
     watcher        *WALWatcher
-    lastSendTimestampMetric    prometheus.Gauge
-    highestSentTimestampMetric prometheus.Gauge
+    highestSentTimestampMetric *maxGauge
     pendingSamplesMetric       prometheus.Gauge
     enqueueRetriesMetric       prometheus.Counter
 
-    lastSendTimestamp    int64
-    highestSentTimestamp int64
-    timestampLock        sync.Mutex
-
-    highestTimestampIn *int64 // highest timestamp of any sample ingested by remote storage via scrape (Appender)
-
     seriesMtx            sync.Mutex
     seriesLabels         map[uint64][]prompb.Label
     seriesSegmentIndexes map[uint64]int
@@ -216,14 +187,12 @@ type QueueManager struct {
 }
 
 // NewQueueManager builds a new QueueManager.
-func NewQueueManager(logger log.Logger, walDir string, samplesIn *ewmaRate, highestTimestampIn *int64, cfg config.QueueConfig, externalLabels model.LabelSet, relabelConfigs []*pkgrelabel.Config, client StorageClient, flushDeadline time.Duration) *QueueManager {
+func NewQueueManager(logger log.Logger, walDir string, samplesIn *ewmaRate, cfg config.QueueConfig, externalLabels model.LabelSet, relabelConfigs []*pkgrelabel.Config, client StorageClient, flushDeadline time.Duration) *QueueManager {
     if logger == nil {
         logger = log.NewNopLogger()
-    } else {
-        logger = log.With(logger, "queue", client.Name())
     }
     t := &QueueManager{
-        logger:         logger,
+        logger:         log.With(logger, "queue", client.Name()),
         flushDeadline:  flushDeadline,
         cfg:            cfg,
         externalLabels: externalLabels,
@@ -231,8 +200,6 @@ func NewQueueManager(logger log.Logger, walDir string, samplesIn *ewmaRate, high
         client:         client,
         queueName:      client.Name(),
 
-        highestTimestampIn: highestTimestampIn,
-
         seriesLabels:         make(map[uint64][]prompb.Label),
         seriesSegmentIndexes: make(map[uint64]int),
         droppedSeries:        make(map[uint64]struct{}),
@@ -247,8 +214,9 @@ func NewQueueManager(logger log.Logger, walDir string, samplesIn *ewmaRate, high
         samplesOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration),
     }
 
-    t.lastSendTimestampMetric = queueLastSendTimestamp.WithLabelValues(t.queueName)
-    t.highestSentTimestampMetric = queueHighestSentTimestamp.WithLabelValues(t.queueName)
+    t.highestSentTimestampMetric = &maxGauge{
+        Gauge: queueHighestSentTimestamp.WithLabelValues(t.queueName),
+    }
     t.pendingSamplesMetric = queuePendingSamples.WithLabelValues(t.queueName)
     t.enqueueRetriesMetric = enqueueRetriesTotal.WithLabelValues(t.queueName)
     t.watcher = NewWALWatcher(logger, client.Name(), t, walDir)
@@ -411,12 +379,6 @@ func (t *QueueManager) updateShardsLoop() {
     for {
         select {
         case <-ticker.C:
-            now := time.Now().Unix()
-            threshold := int64(time.Duration(2 * t.cfg.BatchSendDeadline).Seconds())
-            if now-t.lastSendTimestamp > threshold {
-                level.Debug(t.logger).Log("msg", "Skipping resharding, last successful send was beyond threshold")
-                continue
-            }
             t.calculateDesiredShards()
         case <-t.quit:
             return
@@ -425,7 +387,6 @@
 }
 
 func (t *QueueManager) calculateDesiredShards() {
-    t.samplesIn.tick()
     t.samplesOut.tick()
     t.samplesDropped.tick()
     t.samplesOutDuration.tick()
@@ -437,9 +398,11 @@ func (t *QueueManager) calculateDesiredShards() {
     var (
         samplesIn          = t.samplesIn.rate()
         samplesOut         = t.samplesOut.rate()
-        samplesDropped     = t.samplesDropped.rate()
-        samplesPending     = samplesIn - samplesDropped - samplesOut
+        samplesKeptRatio   = samplesOut / (t.samplesDropped.rate() + samplesOut)
         samplesOutDuration = t.samplesOutDuration.rate()
+        highestSent        = t.highestSentTimestampMetric.Get()
+        highestRecv        = highestTimestamp.Get()
+        samplesPending     = (highestRecv - highestSent) * samplesIn * samplesKeptRatio
     )
 
     // We use an integral accumulator, like in a PID, to help dampen oscillation.
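The resharding inputs change here: instead of an in-minus-out running total, the pending-sample estimate is now derived from how far the queue's highest sent timestamp lags behind the highest timestamp received by the Appender, scaled by the incoming sample rate and by the fraction of samples that survive write relabelling. A rough worked example with made-up numbers (none of these values come from the patch, and the sketch works directly in seconds, whereas the real code keeps durations in nanoseconds and divides by time.Second):

```go
package main

import "fmt"

func main() {
    // Illustrative inputs only.
    samplesIn := 1000.0         // samples/s read from the WAL
    samplesOut := 900.0         // samples/s successfully sent
    samplesDroppedRate := 100.0 // samples/s dropped by write relabelling
    samplesOutDuration := 0.45  // seconds spent sending per second of wall clock
    highestRecv := 1550000105.0 // newest timestamp seen by the Appender (s)
    highestSent := 1550000100.0 // newest timestamp acknowledged by the remote end (s)

    samplesKeptRatio := samplesOut / (samplesDroppedRate + samplesOut)           // 0.9
    samplesPending := (highestRecv - highestSent) * samplesIn * samplesKeptRatio // 5 s behind -> 4500 samples
    timePerSample := samplesOutDuration / samplesOut                             // 0.0005 s per sample
    desiredShards := timePerSample * samplesPending                              // 2.25 shards of work

    // The queue manager would then apply its tolerance band and min/max shard
    // bounds before acting on a value like this.
    fmt.Println(samplesKeptRatio, samplesPending, timePerSample, desiredShards)
}
```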
@@ -451,12 +414,18 @@ func (t *QueueManager) calculateDesiredShards() {
 
     var (
         timePerSample = samplesOutDuration / samplesOut
-        desiredShards = (timePerSample * (samplesIn - samplesDropped + samplesPending + t.integralAccumulator)) / float64(time.Second)
+        desiredShards = (timePerSample * samplesPending) / float64(time.Second)
     )
     level.Debug(t.logger).Log("msg", "QueueManager.caclulateDesiredShards",
-        "samplesIn", samplesIn, "samplesDropped", samplesDropped,
-        "samplesOut", samplesOut, "samplesPending", samplesPending,
-        "desiredShards", desiredShards)
+        "samplesIn", samplesIn,
+        "samplesOut", samplesOut,
+        "samplesKeptRatio", samplesKeptRatio,
+        "samplesPending", samplesPending,
+        "samplesOutDuration", samplesOutDuration,
+        "timePerSample", timePerSample,
+        "desiredShards", desiredShards,
+        "highestSent", highestSent,
+        "highestRecv", highestRecv)
 
     // Changes in the number of shards must be greater than shardToleranceFraction.
     var (
@@ -515,23 +484,6 @@ func (t *QueueManager) newShards() *shards {
     return s
 }
 
-// Check and set highestSentTimestamp
-func (t *QueueManager) setHighestSentTimestamp(highest int64) {
-    t.timestampLock.Lock()
-    defer t.timestampLock.Unlock()
-    if highest > t.highestSentTimestamp {
-        t.highestSentTimestamp = highest
-        t.highestSentTimestampMetric.Set(float64(t.highestSentTimestamp) / 1000.)
-    }
-}
-
-func (t *QueueManager) setLastSendTimestamp(now time.Time) {
-    t.timestampLock.Lock()
-    defer t.timestampLock.Unlock()
-    t.lastSendTimestampMetric.Set(float64(now.UnixNano()) / 1e9)
-    t.lastSendTimestamp = now.Unix()
-}
-
 type shards struct {
     mtx sync.RWMutex // With the WAL, this is never actually contended.
 
@@ -713,11 +665,12 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries) {
 func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries) error {
     backoff := s.qm.cfg.MinBackoff
     req, highest, err := buildWriteRequest(samples)
-    // Failing to build the write request is non-recoverable, since it will
-    // only error if marshaling the proto to bytes fails.
     if err != nil {
+        // Failing to build the write request is non-recoverable, since it will
+        // only error if marshaling the proto to bytes fails.
         return err
     }
+
     for {
         select {
         case <-ctx.Done():
@@ -731,9 +684,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
 
         if err == nil {
             succeededSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples)))
-            now := time.Now()
-            s.qm.setLastSendTimestamp(now)
-            s.qm.setHighestSentTimestamp(highest)
+            s.qm.highestSentTimestampMetric.Set(float64(highest / 1000))
             return nil
         }
 
@@ -741,7 +692,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
             return err
         }
         retriedSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples)))
-        level.Error(s.qm.logger).Log("err", err)
+        level.Debug(s.qm.logger).Log("msg", "failed to send batch, retrying", "err", err)
 
         time.Sleep(time.Duration(backoff))
         backoff = backoff * 2
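sendSamplesWithBackoff keeps its retry loop: recoverable errors are counted, logged at debug level (previously error level), and retried after a doubling backoff that starts from cfg.MinBackoff, while non-recoverable errors return immediately. A self-contained sketch of that loop shape follows; the constants and the fake send function are illustrative stand-ins for config.QueueConfig and StorageClient.Store, and the cap on the backoff is assumed to mirror the MaxBackoff limit the queue config provides (the capping lines themselves are outside the context shown above):

```go
package main

import (
    "errors"
    "fmt"
    "time"
)

// Illustrative stand-ins for the MinBackoff/MaxBackoff fields of config.QueueConfig.
const (
    minBackoff = 30 * time.Millisecond
    maxBackoff = 100 * time.Millisecond
)

var errRecoverable = errors.New("recoverable")

// send stands in for StorageClient.Store: fail a few times, then succeed.
func send(attempt int) error {
    if attempt < 3 {
        return errRecoverable
    }
    return nil
}

func main() {
    backoff := minBackoff
    for attempt := 0; ; attempt++ {
        err := send(attempt)
        if err == nil {
            fmt.Println("sent after", attempt, "retries")
            return
        }
        if !errors.Is(err, errRecoverable) {
            fmt.Println("giving up:", err) // non-recoverable: do not retry
            return
        }
        time.Sleep(backoff)
        backoff *= 2 // double on every retry...
        if backoff > maxBackoff {
            backoff = maxBackoff // ...but never past the configured ceiling
        }
    }
}
```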
diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go
index 1811c605c..4eff92fda 100644
--- a/storage/remote/queue_manager_test.go
+++ b/storage/remote/queue_manager_test.go
@@ -54,13 +54,12 @@ func TestSampleDelivery(t *testing.T) {
     cfg := config.DefaultQueueConfig
     cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
     cfg.MaxShards = 1
-    var temp int64
 
     dir, err := ioutil.TempDir("", "TestSampleDeliver")
     testutil.Ok(t, err)
     defer os.RemoveAll(dir)
 
-    m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), &temp, cfg, nil, nil, c, defaultFlushDeadline)
+    m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
     m.seriesLabels = refSeriesToLabelsProto(series)
 
     // These should be received by the client.
@@ -83,13 +82,12 @@ func TestSampleDeliveryTimeout(t *testing.T) {
     cfg := config.DefaultQueueConfig
     cfg.MaxShards = 1
     cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
-    var temp int64
 
     dir, err := ioutil.TempDir("", "TestSampleDeliveryTimeout")
     testutil.Ok(t, err)
     defer os.RemoveAll(dir)
 
-    m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), &temp, cfg, nil, nil, c, defaultFlushDeadline)
+    m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
     m.seriesLabels = refSeriesToLabelsProto(series)
     m.Start()
     defer m.Stop()
@@ -124,13 +122,12 @@ func TestSampleDeliveryOrder(t *testing.T) {
 
     c := NewTestStorageClient()
     c.expectSamples(samples, series)
-    var temp int64
 
     dir, err := ioutil.TempDir("", "TestSampleDeliveryOrder")
     testutil.Ok(t, err)
     defer os.RemoveAll(dir)
 
-    m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), &temp, config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
+    m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
     m.seriesLabels = refSeriesToLabelsProto(series)
 
     m.Start()
@@ -144,13 +141,11 @@ func TestShutdown(t *testing.T) {
 
     deadline := 1 * time.Second
     c := NewTestBlockedStorageClient()
-    var temp int64
-
     dir, err := ioutil.TempDir("", "TestShutdown")
     testutil.Ok(t, err)
     defer os.RemoveAll(dir)
 
-    m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), &temp, config.DefaultQueueConfig, nil, nil, c, deadline)
+    m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline)
     samples, series := createTimeseries(2 * config.DefaultQueueConfig.MaxSamplesPerSend)
     m.seriesLabels = refSeriesToLabelsProto(series)
     m.Start()
@@ -179,7 +174,6 @@ func TestShutdown(t *testing.T) {
 func TestSeriesReset(t *testing.T) {
     c := NewTestBlockedStorageClient()
     deadline := 5 * time.Second
-    var temp int64
     numSegments := 4
     numSeries := 25
 
@@ -187,7 +181,7 @@ func TestSeriesReset(t *testing.T) {
     testutil.Ok(t, err)
     defer os.RemoveAll(dir)
 
-    m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), &temp, config.DefaultQueueConfig, nil, nil, c, deadline)
+    m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline)
     for i := 0; i < numSegments; i++ {
         series := []tsdb.RefSeries{}
         for j := 0; j < numSeries; j++ {
@@ -211,13 +205,11 @@ func TestReshard(t *testing.T) {
 
     cfg := config.DefaultQueueConfig
     cfg.MaxShards = 1
-    var temp int64
-
     dir, err := ioutil.TempDir("", "TestReshard")
     testutil.Ok(t, err)
     defer os.RemoveAll(dir)
 
-    m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), &temp, cfg, nil, nil, c, defaultFlushDeadline)
+    m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
     m.seriesLabels = refSeriesToLabelsProto(series)
 
     m.Start()
@@ -406,11 +398,10 @@ func BenchmarkStartup(b *testing.B) {
     logger = log.With(logger, "caller", log.DefaultCaller)
 
     for n := 0; n < b.N; n++ {
-        var temp int64
         c := NewTestBlockedStorageClient()
         m := NewQueueManager(logger, dir, newEWMARate(ewmaWeight, shardUpdateDuration),
-            &temp, config.DefaultQueueConfig, nil, nil, c, 1*time.Minute)
+            config.DefaultQueueConfig, nil, nil, c, 1*time.Minute)
         m.watcher.startTime = math.MaxInt64
         m.watcher.maxSegment = segments[len(segments)-2]
         err := m.watcher.run()
diff --git a/storage/remote/storage.go b/storage/remote/storage.go
index 96163bf29..5c1587663 100644
--- a/storage/remote/storage.go
+++ b/storage/remote/storage.go
@@ -38,18 +38,14 @@ type Storage struct {
     mtx sync.RWMutex
 
     // For writes
-    walDir                 string
-    queues                 []*QueueManager
-    samplesIn              *ewmaRate
-    samplesInMetric        prometheus.Counter
-    highestTimestampMtx    sync.Mutex
-    highestTimestamp       int64
-    highestTimestampMetric prometheus.Gauge
+    walDir        string
+    queues        []*QueueManager
+    samplesIn     *ewmaRate
+    flushDeadline time.Duration
 
     // For reads
     queryables             []storage.Queryable
     localStartTimeCallback startTimeCallback
-    flushDeadline          time.Duration
 }
 
 // NewStorage returns a remote.Storage.
@@ -57,28 +53,25 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal
     if l == nil {
         l = log.NewNopLogger()
     }
-    shardUpdateDuration := 10 * time.Second
     s := &Storage{
         logger:                 logging.Dedupe(l, 1*time.Minute),
         localStartTimeCallback: stCallback,
         flushDeadline:          flushDeadline,
+        samplesIn:              newEWMARate(ewmaWeight, shardUpdateDuration),
         walDir:                 walDir,
-        // queues: make(map[*QueueManager]struct{}),
-        samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration),
-        samplesInMetric: prometheus.NewCounter(prometheus.CounterOpts{
-            Name: "prometheus_remote_storage_samples_in_total",
-            Help: "Samples in to remote storage, compare to samples out for queue managers.",
-        }),
-        highestTimestampMetric: prometheus.NewGauge(prometheus.GaugeOpts{
-            Name: "prometheus_remote_storage_highest_timestamp_in_seconds",
-            Help: "Highest timestamp that has come into the remote storage via the Appender interface, in seconds since epoch.",
-        }),
     }
-    reg.MustRegister(s.samplesInMetric)
-    reg.MustRegister(s.highestTimestampMetric)
+    go s.run()
     return s
 }
 
+func (s *Storage) run() {
+    ticker := time.NewTicker(shardUpdateDuration)
+    defer ticker.Stop()
+    for range ticker.C {
+        s.samplesIn.tick()
+    }
+}
+
 // ApplyConfig updates the state as the new config requires.
 func (s *Storage) ApplyConfig(conf *config.Config) error {
     s.mtx.Lock()
@@ -101,7 +94,6 @@ func (s *Storage) ApplyConfig(conf *config.Config) error {
             s.logger,
             s.walDir,
             s.samplesIn,
-            &s.highestTimestamp,
             rwConf.QueueConfig,
             conf.GlobalConfig.ExternalLabels,
             rwConf.WriteRelabelConfigs,
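With the highest-timestamp bookkeeping gone from Storage, the only periodic work left is ticking the samplesIn EWMA, which NewStorage now hands off to a run() goroutine driven by the package-level shardUpdateDuration (the local 10-second copy of that interval is removed). A simplified, self-contained sketch of the pattern follows; the rate type below is a toy stand-in for ewmaRate and the intervals are shortened so the example finishes quickly:

```go
package main

import (
    "fmt"
    "sync"
    "time"
)

// rate is a toy stand-in for ewmaRate: incr records observations,
// tick folds them into the published rate once per interval.
type rate struct {
    mtx     sync.Mutex
    pending int64
}

func (r *rate) incr(n int64) {
    r.mtx.Lock()
    r.pending += n
    r.mtx.Unlock()
}

func (r *rate) tick(interval time.Duration) {
    r.mtx.Lock()
    fmt.Printf("rate: %.1f samples/s\n", float64(r.pending)/interval.Seconds())
    r.pending = 0
    r.mtx.Unlock()
}

func main() {
    samplesIn := &rate{}
    interval := 100 * time.Millisecond // plays the role of shardUpdateDuration

    // Mirrors Storage.run(): a single goroutine owns the periodic tick.
    go func() {
        ticker := time.NewTicker(interval)
        defer ticker.Stop()
        for range ticker.C {
            samplesIn.tick(interval)
        }
    }()

    for i := 0; i < 5; i++ {
        samplesIn.incr(20) // appenders call incr on every Commit
        time.Sleep(60 * time.Millisecond)
    }
}
```

Note that the goroutine in the patch has no stop channel; it runs for the life of the process, matching how a single remote Storage instance is used inside Prometheus.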
diff --git a/storage/remote/write.go b/storage/remote/write.go
index 5038425a6..b1e3ee9fd 100644
--- a/storage/remote/write.go
+++ b/storage/remote/write.go
@@ -14,10 +14,29 @@
 package remote
 
 import (
+    "github.com/prometheus/client_golang/prometheus"
+    "github.com/prometheus/client_golang/prometheus/promauto"
     "github.com/prometheus/prometheus/pkg/labels"
     "github.com/prometheus/prometheus/storage"
 )
 
+var (
+    samplesIn = promauto.NewCounter(prometheus.CounterOpts{
+        Namespace: namespace,
+        Subsystem: subsystem,
+        Name:      "samples_in_total",
+        Help:      "Samples in to remote storage, compare to samples out for queue managers.",
+    })
+    highestTimestamp = maxGauge{
+        Gauge: promauto.NewGauge(prometheus.GaugeOpts{
+            Namespace: namespace,
+            Subsystem: subsystem,
+            Name:      "highest_timestamp_in_seconds",
+            Help:      "Highest timestamp that has come into the remote storage via the Appender interface, in seconds since epoch.",
+        }),
+    }
+)
+
 // Appender implements scrape.Appendable.
 func (s *Storage) Appender() (storage.Appender, error) {
     return &timestampTracker{
@@ -49,14 +68,9 @@ func (t *timestampTracker) AddFast(l labels.Labels, _ uint64, ts int64, v float6
 // Commit implements storage.Appender.
 func (t *timestampTracker) Commit() error {
     t.storage.samplesIn.incr(t.samples)
-    t.storage.samplesInMetric.Add(float64(t.samples))
 
-    t.storage.highestTimestampMtx.Lock()
-    defer t.storage.highestTimestampMtx.Unlock()
-    if t.highestTimestamp > t.storage.highestTimestamp {
-        t.storage.highestTimestamp = t.highestTimestamp
-        t.storage.highestTimestampMetric.Set(float64(t.highestTimestamp) / 1000.)
-    }
+    samplesIn.Add(float64(t.samples))
+    highestTimestamp.Set(float64(t.highestTimestamp / 1000))
     return nil
 }
 
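On the write path, the per-Storage fields (samplesInMetric, highestTimestamp and its mutex) are replaced by two package-level metrics: a plain counter for samples in and a maxGauge for the highest received timestamp, so Commit no longer needs locking of its own, and calculateDesiredShards can read the same gauge from the queue-manager side. A compact sketch of what a commit does with them (the types below are illustrative stand-ins, not the real timestampTracker; timestamps are in milliseconds and converted to whole seconds for the gauge, matching the /1000 integer division above):

```go
package main

import "fmt"

// highestSeconds is a toy stand-in for the package-level highestTimestamp
// maxGauge: it only ever moves forward.
var highestSeconds float64

func observe(tsMillis int64) {
    // Same conversion as the patch: millisecond timestamps in,
    // whole seconds since epoch out.
    if s := float64(tsMillis / 1000); s > highestSeconds {
        highestSeconds = s
    }
}

// tracker mimics timestampTracker: it remembers the newest timestamp seen
// during a scrape and publishes it once, on Commit.
type tracker struct {
    samples          int64
    highestTimestamp int64 // milliseconds
}

func (t *tracker) add(tsMillis int64) {
    t.samples++
    if tsMillis > t.highestTimestamp {
        t.highestTimestamp = tsMillis
    }
}

func (t *tracker) commit() {
    observe(t.highestTimestamp)
}

func main() {
    tr := &tracker{}
    tr.add(1550000000123)
    tr.add(1550000000456)
    tr.commit()
    fmt.Println(highestSeconds) // 1.55e+09, truncated to whole seconds
}
```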
diff --git a/vendor/modules.txt b/vendor/modules.txt
index f9017254b..e307a2e02 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -224,8 +224,8 @@ github.com/prometheus/client_golang/prometheus
 github.com/prometheus/client_golang/api
 github.com/prometheus/client_golang/api/prometheus/v1
 github.com/prometheus/client_golang/prometheus/promhttp
-github.com/prometheus/client_golang/prometheus/internal
 github.com/prometheus/client_golang/prometheus/promauto
+github.com/prometheus/client_golang/prometheus/internal
 # github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910
 github.com/prometheus/client_model/go
 # github.com/prometheus/common v0.0.0-20181119215939-b36ad289a3ea