// Copyright 2018 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package wal import ( "fmt" "io" "math" "os" "path" "sort" "strconv" "strings" "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/record" ) const ( readPeriod = 10 * time.Millisecond checkpointPeriod = 5 * time.Second segmentCheckPeriod = 100 * time.Millisecond consumer = "consumer" ) // WriteTo is an interface used by the Watcher to send the samples it's read // from the WAL on to somewhere else. Functions will be called concurrently // and it is left to the implementer to make sure they are safe. type WriteTo interface { Append([]record.RefSample) bool StoreSeries([]record.RefSeries, int) // SeriesReset is called after reading a checkpoint to allow the deletion // of all series created in a segment lower than the argument. SeriesReset(int) } type WatcherMetrics struct { recordsRead *prometheus.CounterVec recordDecodeFails *prometheus.CounterVec samplesSentPreTailing *prometheus.CounterVec currentSegment *prometheus.GaugeVec } // Watcher watches the TSDB WAL for a given WriteTo. type Watcher struct { name string writer WriteTo logger log.Logger walDir string lastCheckpoint string metrics *WatcherMetrics readerMetrics *liveReaderMetrics StartTime int64 recordsReadMetric *prometheus.CounterVec recordDecodeFailsMetric prometheus.Counter samplesSentPreTailing prometheus.Counter currentSegmentMetric prometheus.Gauge quit chan struct{} done chan struct{} // For testing, stop when we hit this segment. MaxSegment int } func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics { m := &WatcherMetrics{ recordsRead: prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "prometheus", Subsystem: "wal_watcher", Name: "records_read_total", Help: "Number of records read by the WAL watcher from the WAL.", }, []string{consumer, "type"}, ), recordDecodeFails: prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "prometheus", Subsystem: "wal_watcher", Name: "record_decode_failures_total", Help: "Number of records read by the WAL watcher that resulted in an error when decoding.", }, []string{consumer}, ), samplesSentPreTailing: prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "prometheus", Subsystem: "wal_watcher", Name: "samples_sent_pre_tailing_total", Help: "Number of sample records read by the WAL watcher and sent to remote write during replay of existing WAL.", }, []string{consumer}, ), currentSegment: prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "prometheus", Subsystem: "wal_watcher", Name: "current_segment", Help: "Current segment the WAL watcher is reading records from.", }, []string{consumer}, ), } if reg != nil { _ = reg.Register(m.recordsRead) _ = reg.Register(m.recordDecodeFails) _ = reg.Register(m.samplesSentPreTailing) _ = reg.Register(m.currentSegment) } return m } // NewWatcher creates a new WAL watcher for a given WriteTo. func NewWatcher(reg prometheus.Registerer, metrics *WatcherMetrics, logger log.Logger, name string, writer WriteTo, walDir string) *Watcher { if logger == nil { logger = log.NewNopLogger() } return &Watcher{ logger: logger, writer: writer, metrics: metrics, readerMetrics: NewLiveReaderMetrics(reg), walDir: path.Join(walDir, "wal"), name: name, quit: make(chan struct{}), done: make(chan struct{}), MaxSegment: -1, } } func (w *Watcher) setMetrics() { // Setup the WAL Watchers metrics. We do this here rather than in the // constructor because of the ordering of creating Queue Managers's, // stopping them, and then starting new ones in storage/remote/storage.go ApplyConfig. if w.metrics != nil { w.recordsReadMetric = w.metrics.recordsRead.MustCurryWith(prometheus.Labels{consumer: w.name}) w.recordDecodeFailsMetric = w.metrics.recordDecodeFails.WithLabelValues(w.name) w.samplesSentPreTailing = w.metrics.samplesSentPreTailing.WithLabelValues(w.name) w.currentSegmentMetric = w.metrics.currentSegment.WithLabelValues(w.name) } } // Start the Watcher. func (w *Watcher) Start() { w.setMetrics() level.Info(w.logger).Log("msg", "starting WAL watcher", "queue", w.name) go w.loop() } // Stop the Watcher. func (w *Watcher) Stop() { close(w.quit) <-w.done // Records read metric has series and samples. w.metrics.recordsRead.DeleteLabelValues(w.name, "series") w.metrics.recordsRead.DeleteLabelValues(w.name, "samples") w.metrics.recordDecodeFails.DeleteLabelValues(w.name) w.metrics.samplesSentPreTailing.DeleteLabelValues(w.name) w.metrics.currentSegment.DeleteLabelValues(w.name) level.Info(w.logger).Log("msg", "WAL watcher stopped", "queue", w.name) } func (w *Watcher) loop() { defer close(w.done) // We may encounter failures processing the WAL; we should wait and retry. for !isClosed(w.quit) { w.StartTime = timestamp.FromTime(time.Now()) if err := w.Run(); err != nil { level.Error(w.logger).Log("msg", "error tailing WAL", "err", err) } select { case <-w.quit: return case <-time.After(5 * time.Second): } } } // Run the watcher, which will tail the WAL until the quit channel is closed // or an error case is hit. func (w *Watcher) Run() error { _, lastSegment, err := w.firstAndLast() if err != nil { return errors.Wrap(err, "wal.Segments") } // Backfill from the checkpoint first if it exists. lastCheckpoint, checkpointIndex, err := LastCheckpoint(w.walDir) if err != nil && err != record.ErrNotFound { return errors.Wrap(err, "tsdb.LastCheckpoint") } if err == nil { if err = w.readCheckpoint(lastCheckpoint); err != nil { return errors.Wrap(err, "readCheckpoint") } } w.lastCheckpoint = lastCheckpoint currentSegment, err := w.findSegmentForIndex(checkpointIndex) if err != nil { return err } level.Debug(w.logger).Log("msg", "tailing WAL", "lastCheckpoint", lastCheckpoint, "checkpointIndex", checkpointIndex, "currentSegment", currentSegment, "lastSegment", lastSegment) for !isClosed(w.quit) { w.currentSegmentMetric.Set(float64(currentSegment)) level.Debug(w.logger).Log("msg", "processing segment", "currentSegment", currentSegment) // On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment. // On subsequent calls to this function, currentSegment will have been incremented and we should open that segment. if err := w.watch(currentSegment, currentSegment >= lastSegment); err != nil { return err } // For testing: stop when you hit a specific segment. if currentSegment == w.MaxSegment { return nil } currentSegment++ } return nil } // findSegmentForIndex finds the first segment greater than or equal to index. func (w *Watcher) findSegmentForIndex(index int) (int, error) { refs, err := w.segments(w.walDir) if err != nil { return -1, err } for _, r := range refs { if r >= index { return r, nil } } return -1, errors.New("failed to find segment for index") } func (w *Watcher) firstAndLast() (int, int, error) { refs, err := w.segments(w.walDir) if err != nil { return -1, -1, err } if len(refs) == 0 { return -1, -1, nil } return refs[0], refs[len(refs)-1], nil } // Copied from tsdb/wal/wal.go so we do not have to open a WAL. // Plan is to move WAL watcher to TSDB and dedupe these implementations. func (w *Watcher) segments(dir string) ([]int, error) { files, err := fileutil.ReadDir(dir) if err != nil { return nil, err } var refs []int var last int for _, fn := range files { k, err := strconv.Atoi(fn) if err != nil { continue } if len(refs) > 0 && k > last+1 { return nil, errors.New("segments are not sequential") } refs = append(refs, k) last = k } sort.Ints(refs) return refs, nil } // Use tail true to indicate that the reader is currently on a segment that is // actively being written to. If false, assume it's a full segment and we're // replaying it on start to cache the series records. func (w *Watcher) watch(segmentNum int, tail bool) error { segment, err := OpenReadSegment(SegmentName(w.walDir, segmentNum)) if err != nil { return err } defer segment.Close() reader := NewLiveReader(w.logger, w.readerMetrics, segment) readTicker := time.NewTicker(readPeriod) defer readTicker.Stop() checkpointTicker := time.NewTicker(checkpointPeriod) defer checkpointTicker.Stop() segmentTicker := time.NewTicker(segmentCheckPeriod) defer segmentTicker.Stop() // If we're replaying the segment we need to know the size of the file to know // when to return from watch and move on to the next segment. size := int64(math.MaxInt64) if !tail { segmentTicker.Stop() checkpointTicker.Stop() var err error size, err = getSegmentSize(w.walDir, segmentNum) if err != nil { return errors.Wrap(err, "getSegmentSize") } } gcSem := make(chan struct{}, 1) for { select { case <-w.quit: return nil case <-checkpointTicker.C: // Periodically check if there is a new checkpoint so we can garbage // collect labels. As this is considered an optimisation, we ignore // errors during checkpoint processing. Doing the process asynchronously // allows the current WAL segment to be processed while reading the // checkpoint. select { case gcSem <- struct{}{}: go func() { defer func() { <-gcSem }() if err := w.garbageCollectSeries(segmentNum); err != nil { level.Warn(w.logger).Log("msg", "error process checkpoint", "err", err) } }() default: // Currently doing a garbage collect, try again later. } case <-segmentTicker.C: _, last, err := w.firstAndLast() if err != nil { return errors.Wrap(err, "segments") } // Check if new segments exists. if last <= segmentNum { continue } err = w.readSegment(reader, segmentNum, tail) // Ignore errors reading to end of segment whilst replaying the WAL. if !tail { if err != nil && err != io.EOF { level.Warn(w.logger).Log("msg", "ignoring error reading to end of segment, may have dropped data", "err", err) } else if reader.Offset() != size { level.Warn(w.logger).Log("msg", "expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", reader.Offset(), "size", size) } return nil } // Otherwise, when we are tailing, non-EOFs are fatal. if err != io.EOF { return err } return nil case <-readTicker.C: err = w.readSegment(reader, segmentNum, tail) // Ignore all errors reading to end of segment whilst replaying the WAL. if !tail { if err != nil && err != io.EOF { level.Warn(w.logger).Log("msg", "ignoring error reading to end of segment, may have dropped data", "segment", segmentNum, "err", err) } else if reader.Offset() != size { level.Warn(w.logger).Log("msg", "expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", reader.Offset(), "size", size) } return nil } // Otherwise, when we are tailing, non-EOFs are fatal. if err != io.EOF { return err } } } } func (w *Watcher) garbageCollectSeries(segmentNum int) error { dir, _, err := LastCheckpoint(w.walDir) if err != nil && err != record.ErrNotFound { return errors.Wrap(err, "tsdb.LastCheckpoint") } if dir == "" || dir == w.lastCheckpoint { return nil } w.lastCheckpoint = dir index, err := checkpointNum(dir) if err != nil { return errors.Wrap(err, "error parsing checkpoint filename") } if index >= segmentNum { level.Debug(w.logger).Log("msg", "current segment is behind the checkpoint, skipping reading of checkpoint", "current", fmt.Sprintf("%08d", segmentNum), "checkpoint", dir) return nil } level.Debug(w.logger).Log("msg", "new checkpoint detected", "new", dir, "currentSegment", segmentNum) if err = w.readCheckpoint(dir); err != nil { return errors.Wrap(err, "readCheckpoint") } // Clear series with a checkpoint or segment index # lower than the checkpoint we just read. w.writer.SeriesReset(index) return nil } func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { var ( dec record.Decoder series []record.RefSeries samples []record.RefSample send []record.RefSample ) for r.Next() && !isClosed(w.quit) { rec := r.Record() w.recordsReadMetric.WithLabelValues(recordType(dec.Type(rec))).Inc() switch dec.Type(rec) { case record.Series: series, err := dec.Series(rec, series[:0]) if err != nil { w.recordDecodeFailsMetric.Inc() return err } w.writer.StoreSeries(series, segmentNum) case record.Samples: // If we're not tailing a segment we can ignore any samples records we see. // This speeds up replay of the WAL by > 10x. if !tail { break } samples, err := dec.Samples(rec, samples[:0]) if err != nil { w.recordDecodeFailsMetric.Inc() return err } for _, s := range samples { if s.T > w.StartTime { send = append(send, s) } } if len(send) > 0 { // Blocks until the sample is sent to all remote write endpoints or closed (because enqueue blocks). w.writer.Append(send) send = send[:0] } case record.Tombstones: // noop case record.Invalid: return errors.New("invalid record") default: w.recordDecodeFailsMetric.Inc() return errors.New("unknown TSDB record type") } } return r.Err() } func recordType(rt record.Type) string { switch rt { case record.Invalid: return "invalid" case record.Series: return "series" case record.Samples: return "samples" case record.Tombstones: return "tombstones" default: return "unknown" } } // Read all the series records from a Checkpoint directory. func (w *Watcher) readCheckpoint(checkpointDir string) error { level.Debug(w.logger).Log("msg", "reading checkpoint", "dir", checkpointDir) index, err := checkpointNum(checkpointDir) if err != nil { return errors.Wrap(err, "checkpointNum") } // Ensure we read the whole contents of every segment in the checkpoint dir. segs, err := w.segments(checkpointDir) if err != nil { return errors.Wrap(err, "Unable to get segments checkpoint dir") } for _, seg := range segs { size, err := getSegmentSize(checkpointDir, seg) if err != nil { return errors.Wrap(err, "getSegmentSize") } sr, err := OpenReadSegment(SegmentName(checkpointDir, seg)) if err != nil { return errors.Wrap(err, "unable to open segment") } defer sr.Close() r := NewLiveReader(w.logger, w.readerMetrics, sr) if err := w.readSegment(r, index, false); err != io.EOF && err != nil { return errors.Wrap(err, "readSegment") } if r.Offset() != size { return fmt.Errorf("readCheckpoint wasn't able to read all data from the checkpoint %s/%08d, size: %d, totalRead: %d", checkpointDir, seg, size, r.Offset()) } } level.Debug(w.logger).Log("msg", "read series references from checkpoint", "checkpoint", checkpointDir) return nil } func checkpointNum(dir string) (int, error) { // Checkpoint dir names are in the format checkpoint.000001 // dir may contain a hidden directory, so only check the base directory chunks := strings.Split(path.Base(dir), ".") if len(chunks) != 2 { return 0, errors.Errorf("invalid checkpoint dir string: %s", dir) } result, err := strconv.Atoi(chunks[1]) if err != nil { return 0, errors.Errorf("invalid checkpoint dir string: %s", dir) } return result, nil } // Get size of segment. func getSegmentSize(dir string, index int) (int64, error) { i := int64(-1) fi, err := os.Stat(SegmentName(dir, index)) if err == nil { i = fi.Size() } return i, err } func isClosed(c chan struct{}) bool { select { case <-c: return true default: return false } }