diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 25aa0365d..fee1de9ae 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -158,18 +158,13 @@ type StorageClient interface { // indicated by the provided StorageClient. Implements writeTo interface // used by WAL Watcher. type QueueManager struct { - logger log.Logger - - flushDeadline time.Duration - cfg config.QueueConfig - externalLabels model.LabelSet - relabelConfigs []*pkgrelabel.Config - client StorageClient - queueName string - watcher *WALWatcher - highestSentTimestampMetric *maxGauge - pendingSamplesMetric prometheus.Gauge - enqueueRetriesMetric prometheus.Counter + logger log.Logger + flushDeadline time.Duration + cfg config.QueueConfig + externalLabels model.LabelSet + relabelConfigs []*pkgrelabel.Config + client StorageClient + watcher *WALWatcher seriesMtx sync.Mutex seriesLabels map[uint64][]prompb.Label @@ -184,6 +179,16 @@ type QueueManager struct { samplesIn, samplesDropped, samplesOut, samplesOutDuration *ewmaRate integralAccumulator float64 + + highestSentTimestampMetric *maxGauge + pendingSamplesMetric prometheus.Gauge + enqueueRetriesMetric prometheus.Counter + droppedSamplesTotal prometheus.Counter + numShardsMetric prometheus.Gauge + failedSamplesTotal prometheus.Counter + sentBatchDuration prometheus.Observer + succeededSamplesTotal prometheus.Counter + retriedSamplesTotal prometheus.Counter } // NewQueueManager builds a new QueueManager. @@ -191,14 +196,16 @@ func NewQueueManager(logger log.Logger, walDir string, samplesIn *ewmaRate, cfg if logger == nil { logger = log.NewNopLogger() } + + name := client.Name() + logger = log.With(logger, "queue", name) t := &QueueManager{ - logger: log.With(logger, "queue", client.Name()), + logger: logger, flushDeadline: flushDeadline, cfg: cfg, externalLabels: externalLabels, relabelConfigs: relabelConfigs, client: client, - queueName: client.Name(), seriesLabels: make(map[uint64][]prompb.Label), seriesSegmentIndexes: make(map[uint64]int), @@ -212,26 +219,25 @@ func NewQueueManager(logger log.Logger, walDir string, samplesIn *ewmaRate, cfg samplesDropped: newEWMARate(ewmaWeight, shardUpdateDuration), samplesOut: newEWMARate(ewmaWeight, shardUpdateDuration), samplesOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration), + + highestSentTimestampMetric: &maxGauge{ + Gauge: queueHighestSentTimestamp.WithLabelValues(name), + }, + pendingSamplesMetric: queuePendingSamples.WithLabelValues(name), + enqueueRetriesMetric: enqueueRetriesTotal.WithLabelValues(name), + droppedSamplesTotal: droppedSamplesTotal.WithLabelValues(name), + numShardsMetric: numShards.WithLabelValues(name), + failedSamplesTotal: failedSamplesTotal.WithLabelValues(name), + sentBatchDuration: sentBatchDuration.WithLabelValues(name), + succeededSamplesTotal: succeededSamplesTotal.WithLabelValues(name), + retriedSamplesTotal: retriedSamplesTotal.WithLabelValues(name), } - t.highestSentTimestampMetric = &maxGauge{ - Gauge: queueHighestSentTimestamp.WithLabelValues(t.queueName), - } - t.pendingSamplesMetric = queuePendingSamples.WithLabelValues(t.queueName) - t.enqueueRetriesMetric = enqueueRetriesTotal.WithLabelValues(t.queueName) - t.watcher = NewWALWatcher(logger, client.Name(), t, walDir) + t.watcher = NewWALWatcher(logger, name, t, walDir) t.shards = t.newShards() - numShards.WithLabelValues(t.queueName).Set(float64(t.numShards)) - shardCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.Capacity)) - - // Initialize counter labels to zero. - sentBatchDuration.WithLabelValues(t.queueName) - succeededSamplesTotal.WithLabelValues(t.queueName) - failedSamplesTotal.WithLabelValues(t.queueName) - droppedSamplesTotal.WithLabelValues(t.queueName) - retriedSamplesTotal.WithLabelValues(t.queueName) - // Reset pending samples metric to 0. + // Initialise some metrics. + shardCapacity.WithLabelValues(name).Set(float64(t.cfg.Capacity)) t.pendingSamplesMetric.Set(0) return t @@ -250,7 +256,7 @@ func (t *QueueManager) Append(s []tsdb.RefSample) bool { for _, sample := range s { // If we have no labels for the series, due to relabelling or otherwise, don't send the sample. if _, ok := t.seriesLabels[sample.Ref]; !ok { - droppedSamplesTotal.WithLabelValues(t.queueName).Inc() + t.droppedSamplesTotal.Inc() t.samplesDropped.incr(1) if _, ok := t.droppedSeries[sample.Ref]; !ok { level.Info(t.logger).Log("msg", "dropped sample for series that was not explicitly dropped via relabelling", "ref", sample.Ref) @@ -523,7 +529,7 @@ func (s *shards) start(n int) { for i := 0; i < n; i++ { go s.runShard(hardShutdownCtx, i, newQueues[i]) } - numShards.WithLabelValues(s.qm.queueName).Set(float64(n)) + s.qm.numShardsMetric.Set(float64(n)) } // stop the shards; subsequent call to enqueue will return false. @@ -652,7 +658,7 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries) { err := s.sendSamplesWithBackoff(ctx, samples) if err != nil { level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", len(samples), "err", err) - failedSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples))) + s.qm.failedSamplesTotal.Add(float64(len(samples))) } // These counters are used to calculate the dynamic sharding, and as such @@ -680,10 +686,10 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti begin := time.Now() err := s.qm.client.Store(ctx, req) - sentBatchDuration.WithLabelValues(s.qm.queueName).Observe(time.Since(begin).Seconds()) + s.qm.sentBatchDuration.Observe(time.Since(begin).Seconds()) if err == nil { - succeededSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples))) + s.qm.succeededSamplesTotal.Add(float64(len(samples))) s.qm.highestSentTimestampMetric.Set(float64(highest / 1000)) return nil } @@ -691,7 +697,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti if _, ok := err.(recoverableError); !ok { return err } - retriedSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples))) + s.qm.retriedSamplesTotal.Add(float64(len(samples))) level.Debug(s.qm.logger).Log("msg", "failed to send batch, retrying", "err", err) time.Sleep(time.Duration(backoff)) diff --git a/storage/remote/storage.go b/storage/remote/storage.go index 5c1587663..cf8a9f5eb 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -35,7 +35,7 @@ type startTimeCallback func() (int64, error) // storage.Storage. type Storage struct { logger log.Logger - mtx sync.RWMutex + mtx sync.Mutex // For writes walDir string @@ -112,8 +112,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { } // Update read clients - - s.queryables = make([]storage.Queryable, 0, len(conf.RemoteReadConfigs)) + queryables := make([]storage.Queryable, 0, len(conf.RemoteReadConfigs)) for i, rrConf := range conf.RemoteReadConfigs { c, err := NewClient(i, &ClientConfig{ URL: rrConf.URL, @@ -132,8 +131,9 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { if !rrConf.ReadRecent { q = PreferLocalStorageFilter(q, s.localStartTimeCallback) } - s.queryables = append(s.queryables, q) + queryables = append(queryables, q) } + s.queryables = queryables return nil } diff --git a/storage/remote/wal_watcher.go b/storage/remote/wal_watcher.go index 79625ac2a..ded13ec62 100644 --- a/storage/remote/wal_watcher.go +++ b/storage/remote/wal_watcher.go @@ -169,12 +169,7 @@ func (w *WALWatcher) loop() { } func (w *WALWatcher) run() error { - nw, err := wal.New(nil, nil, w.walDir) - if err != nil { - return errors.Wrap(err, "wal.New") - } - - _, lastSegment, err := nw.Segments() + _, lastSegment, err := w.firstAndLast() if err != nil { return errors.Wrap(err, "wal.Segments") } @@ -200,10 +195,11 @@ func (w *WALWatcher) run() error { level.Debug(w.logger).Log("msg", "tailing WAL", "lastCheckpoint", lastCheckpoint, "checkpointIndex", checkpointIndex, "currentSegment", currentSegment, "lastSegment", lastSegment) for !isClosed(w.quit) { w.currentSegmentMetric.Set(float64(currentSegment)) + level.Debug(w.logger).Log("msg", "processing segment", "currentSegment", currentSegment) // On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment. // On subsequent calls to this function, currentSegment will have been incremented and we should open that segment. - if err := w.watch(nw, currentSegment, currentSegment >= lastSegment); err != nil { + if err := w.watch(currentSegment, currentSegment >= lastSegment); err != nil { return err } @@ -220,26 +216,11 @@ func (w *WALWatcher) run() error { // findSegmentForIndex finds the first segment greater than or equal to index. func (w *WALWatcher) findSegmentForIndex(index int) (int, error) { - files, err := fileutil.ReadDir(w.walDir) + refs, err := w.segments() if err != nil { - return -1, err + return -1, nil } - var refs []int - var last int - for _, fn := range files { - k, err := strconv.Atoi(fn) - if err != nil { - continue - } - if len(refs) > 0 && k > last+1 { - return -1, errors.New("segments are not sequential") - } - refs = append(refs, k) - last = k - } - sort.Ints(refs) - for _, r := range refs { if r >= index { return r, nil @@ -249,10 +230,48 @@ func (w *WALWatcher) findSegmentForIndex(index int) (int, error) { return -1, errors.New("failed to find segment for index") } +func (w *WALWatcher) firstAndLast() (int, int, error) { + refs, err := w.segments() + if err != nil { + return -1, -1, nil + } + + if len(refs) == 0 { + return -1, -1, nil + } + return refs[0], refs[len(refs)-1], nil +} + +// Copied from tsdb/wal/wal.go so we do not have to open a WAL. +// Plan is to move WAL watcher to TSDB and dedupe these implementations. +func (w *WALWatcher) segments() ([]int, error) { + files, err := fileutil.ReadDir(w.walDir) + if err != nil { + return nil, err + } + + var refs []int + var last int + for _, fn := range files { + k, err := strconv.Atoi(fn) + if err != nil { + continue + } + if len(refs) > 0 && k > last+1 { + return nil, errors.New("segments are not sequential") + } + refs = append(refs, k) + last = k + } + sort.Ints(refs) + + return refs, nil +} + // Use tail true to indicate that the reader is currently on a segment that is // actively being written to. If false, assume it's a full segment and we're // replaying it on start to cache the series records. -func (w *WALWatcher) watch(wl *wal.WAL, segmentNum int, tail bool) error { +func (w *WALWatcher) watch(segmentNum int, tail bool) error { segment, err := wal.OpenReadSegment(wal.SegmentName(w.walDir, segmentNum)) if err != nil { return err @@ -297,7 +316,7 @@ func (w *WALWatcher) watch(wl *wal.WAL, segmentNum int, tail bool) error { } case <-segmentTicker.C: - _, last, err := wl.Segments() + _, last, err := w.firstAndLast() if err != nil { return errors.Wrap(err, "segments") }