533 lines
14 KiB
Go
533 lines
14 KiB
Go
// Copyright 2018 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package remote
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"math"
|
|
"os"
|
|
"path"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/go-kit/kit/log"
|
|
"github.com/go-kit/kit/log/level"
|
|
"github.com/pkg/errors"
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/prometheus/pkg/timestamp"
|
|
"github.com/prometheus/tsdb"
|
|
"github.com/prometheus/tsdb/fileutil"
|
|
"github.com/prometheus/tsdb/wal"
|
|
)
|
|
|
|
// Polling intervals used by watch while it follows a WAL segment.
const (
	// readPeriod is the interval between attempts to read new records from
	// the segment being watched.
	readPeriod = 10 * time.Millisecond

	// checkpointPeriod is the interval between checks for a new checkpoint.
	checkpointPeriod = 5 * time.Second

	// segmentCheckPeriod is the interval between checks for a newer segment.
	segmentCheckPeriod = 100 * time.Millisecond
)
|
|
|
|
var (
|
|
watcherRecordsRead = prometheus.NewCounterVec(
|
|
prometheus.CounterOpts{
|
|
Namespace: "prometheus",
|
|
Subsystem: "wal_watcher",
|
|
Name: "records_read_total",
|
|
Help: "Number of records read by the WAL watcher from the WAL.",
|
|
},
|
|
[]string{queue, "type"},
|
|
)
|
|
watcherRecordDecodeFails = prometheus.NewCounterVec(
|
|
prometheus.CounterOpts{
|
|
Namespace: "prometheus",
|
|
Subsystem: "wal_watcher",
|
|
Name: "record_decode_failures_total",
|
|
Help: "Number of records read by the WAL watcher that resulted in an error when decoding.",
|
|
},
|
|
[]string{queue},
|
|
)
|
|
watcherSamplesSentPreTailing = prometheus.NewCounterVec(
|
|
prometheus.CounterOpts{
|
|
Namespace: "prometheus",
|
|
Subsystem: "wal_watcher",
|
|
Name: "samples_sent_pre_tailing_total",
|
|
Help: "Number of sample records read by the WAL watcher and sent to remote write during replay of existing WAL.",
|
|
},
|
|
[]string{queue},
|
|
)
|
|
watcherCurrentSegment = prometheus.NewGaugeVec(
|
|
prometheus.GaugeOpts{
|
|
Namespace: "prometheus",
|
|
Subsystem: "wal_watcher",
|
|
Name: "current_segment",
|
|
Help: "Current segment the WAL watcher is reading records from.",
|
|
},
|
|
[]string{queue},
|
|
)
|
|
)
|
|
|
|
func init() {
|
|
prometheus.MustRegister(watcherRecordsRead)
|
|
prometheus.MustRegister(watcherRecordDecodeFails)
|
|
prometheus.MustRegister(watcherSamplesSentPreTailing)
|
|
prometheus.MustRegister(watcherCurrentSegment)
|
|
}
|
|
|
|
type writeTo interface {
|
|
Append([]tsdb.RefSample) bool
|
|
StoreSeries([]tsdb.RefSeries, int)
|
|
SeriesReset(int)
|
|
}
|
|
|
|
// WALWatcher watches the TSDB WAL for a given WriteTo.
|
|
type WALWatcher struct {
|
|
name string
|
|
writer writeTo
|
|
logger log.Logger
|
|
walDir string
|
|
lastCheckpoint string
|
|
|
|
startTime int64
|
|
|
|
recordsReadMetric *prometheus.CounterVec
|
|
recordDecodeFailsMetric prometheus.Counter
|
|
samplesSentPreTailing prometheus.Counter
|
|
currentSegmentMetric prometheus.Gauge
|
|
|
|
quit chan struct{}
|
|
done chan struct{}
|
|
|
|
// For testing, stop when we hit this segment.
|
|
maxSegment int
|
|
}
|
|
|
|
// NewWALWatcher creates a new WAL watcher for a given WriteTo.
|
|
func NewWALWatcher(logger log.Logger, name string, writer writeTo, walDir string) *WALWatcher {
|
|
if logger == nil {
|
|
logger = log.NewNopLogger()
|
|
}
|
|
return &WALWatcher{
|
|
logger: logger,
|
|
writer: writer,
|
|
walDir: path.Join(walDir, "wal"),
|
|
name: name,
|
|
quit: make(chan struct{}),
|
|
done: make(chan struct{}),
|
|
|
|
recordsReadMetric: watcherRecordsRead.MustCurryWith(prometheus.Labels{queue: name}),
|
|
recordDecodeFailsMetric: watcherRecordDecodeFails.WithLabelValues(name),
|
|
samplesSentPreTailing: watcherSamplesSentPreTailing.WithLabelValues(name),
|
|
currentSegmentMetric: watcherCurrentSegment.WithLabelValues(name),
|
|
}
|
|
}
|
|
|
|
func (w *WALWatcher) Start() {
|
|
level.Info(w.logger).Log("msg", "starting WAL watcher", "queue", w.name)
|
|
go w.loop()
|
|
}
|
|
|
|
func (w *WALWatcher) Stop() {
|
|
close(w.quit)
|
|
<-w.done
|
|
level.Info(w.logger).Log("msg", "WAL watcher stopped", "queue", w.name)
|
|
}
|
|
|
|
func (w *WALWatcher) loop() {
|
|
defer close(w.done)
|
|
|
|
// We may encourter failures processing the WAL; we should wait and retry.
|
|
for !isClosed(w.quit) {
|
|
w.startTime = timestamp.FromTime(time.Now())
|
|
if err := w.run(); err != nil {
|
|
level.Error(w.logger).Log("msg", "error tailing WAL", "err", err)
|
|
}
|
|
|
|
select {
|
|
case <-w.quit:
|
|
return
|
|
case <-time.After(5 * time.Second):
|
|
}
|
|
}
|
|
}
|
|
|
|
func (w *WALWatcher) run() error {
|
|
nw, err := wal.New(nil, nil, w.walDir)
|
|
if err != nil {
|
|
return errors.Wrap(err, "wal.New")
|
|
}
|
|
|
|
_, last, err := nw.Segments()
|
|
if err != nil {
|
|
return errors.Wrap(err, "wal.Segments")
|
|
}
|
|
|
|
// Backfill from the checkpoint first if it exists.
|
|
lastCheckpoint, nextIndex, err := tsdb.LastCheckpoint(w.walDir)
|
|
if err != nil && err != tsdb.ErrNotFound {
|
|
return errors.Wrap(err, "tsdb.LastCheckpoint")
|
|
}
|
|
|
|
if err == nil {
|
|
if err = w.readCheckpoint(lastCheckpoint); err != nil {
|
|
return errors.Wrap(err, "readCheckpoint")
|
|
}
|
|
}
|
|
w.lastCheckpoint = lastCheckpoint
|
|
|
|
currentSegment, err := w.findSegmentForIndex(nextIndex)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
level.Info(w.logger).Log("msg", "tailing WAL", "lastCheckpoint", lastCheckpoint, "startFrom", nextIndex, "currentSegment", currentSegment, "last", last)
|
|
for !isClosed(w.quit) {
|
|
w.currentSegmentMetric.Set(float64(currentSegment))
|
|
|
|
// On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment.
|
|
// On subsequent calls to this function, currentSegment will have been incremented and we should open that segment.
|
|
if err := w.watch(nw, currentSegment, currentSegment >= last); err != nil {
|
|
return err
|
|
}
|
|
|
|
if currentSegment == w.maxSegment {
|
|
return nil
|
|
}
|
|
|
|
currentSegment++
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// findSegmentForIndex finds the first segment greater than or equal to index.
|
|
func (w *WALWatcher) findSegmentForIndex(index int) (int, error) {
|
|
files, err := fileutil.ReadDir(w.walDir)
|
|
if err != nil {
|
|
return -1, err
|
|
}
|
|
|
|
var refs []int
|
|
var last int
|
|
for _, fn := range files {
|
|
k, err := strconv.Atoi(fn)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
if len(refs) > 0 && k > last+1 {
|
|
return -1, errors.New("segments are not sequential")
|
|
}
|
|
refs = append(refs, k)
|
|
last = k
|
|
}
|
|
sort.Ints(refs)
|
|
|
|
for _, r := range refs {
|
|
if r >= index {
|
|
return r, nil
|
|
}
|
|
}
|
|
|
|
return -1, errors.New("failed to find segment for index")
|
|
}
|
|
|
|
// Use tail true to indicate thatreader is currently on a segment that is
|
|
// actively being written to. If false, assume it's a full segment and we're
|
|
// replaying it on start to cache the series records.
|
|
func (w *WALWatcher) watch(wl *wal.WAL, segmentNum int, tail bool) error {
|
|
segment, err := wal.OpenReadSegment(wal.SegmentName(w.walDir, segmentNum))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer segment.Close()
|
|
|
|
reader := wal.NewLiveReader(w.logger, segment)
|
|
|
|
readTicker := time.NewTicker(readPeriod)
|
|
defer readTicker.Stop()
|
|
|
|
checkpointTicker := time.NewTicker(checkpointPeriod)
|
|
defer checkpointTicker.Stop()
|
|
|
|
segmentTicker := time.NewTicker(segmentCheckPeriod)
|
|
defer segmentTicker.Stop()
|
|
|
|
// If we're replaying the segment we need to know the size of the file to know
|
|
// when to return from watch and move on to the next segment.
|
|
size := int64(math.MaxInt64)
|
|
if !tail {
|
|
segmentTicker.Stop()
|
|
checkpointTicker.Stop()
|
|
var err error
|
|
size, err = getSegmentSize(w.walDir, segmentNum)
|
|
if err != nil {
|
|
return errors.Wrap(err, "getSegmentSize")
|
|
}
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-w.quit:
|
|
return nil
|
|
|
|
case <-checkpointTicker.C:
|
|
// Periodically check if there is a new checkpoint so we can garbage
|
|
// collect labels. As this is considered an optimisation, we ignore
|
|
// errors during checkpoint processing.
|
|
if err := w.garbageCollectSeries(segmentNum); err != nil {
|
|
level.Warn(w.logger).Log("msg", "error process checkpoint", "err", err)
|
|
}
|
|
|
|
case <-segmentTicker.C:
|
|
_, last, err := wl.Segments()
|
|
if err != nil {
|
|
return errors.Wrap(err, "segments")
|
|
}
|
|
|
|
// Check if new segments exists.
|
|
if last <= segmentNum {
|
|
continue
|
|
}
|
|
|
|
err = w.readSegment(reader, segmentNum, tail)
|
|
|
|
// Ignore errors reading to end of segment whilst replaying the WAL.
|
|
if !tail {
|
|
if err != nil && err != io.EOF {
|
|
level.Warn(w.logger).Log("msg", "ignoring error reading to end of segment, may have dropped data", "err", err)
|
|
} else if reader.Offset() != size {
|
|
level.Warn(w.logger).Log("msg", "expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", reader.Offset(), "size", size)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Otherwise, when we are tailing, non-EOFs are fatal.
|
|
if err != io.EOF {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
|
|
case <-readTicker.C:
|
|
err = w.readSegment(reader, segmentNum, tail)
|
|
|
|
// Ignore all errors reading to end of segment whilst replaying the WAL.
|
|
if !tail {
|
|
if err != nil && err != io.EOF {
|
|
level.Warn(w.logger).Log("msg", "ignoring error reading to end of segment, may have dropped data", "segment", segmentNum, "err", err)
|
|
} else if reader.Offset() != size {
|
|
level.Warn(w.logger).Log("msg", "expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", reader.Offset(), "size", size)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Otherwise, when we are tailing, non-EOFs are fatal.
|
|
if err != io.EOF {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (w *WALWatcher) garbageCollectSeries(segmentNum int) error {
|
|
dir, _, err := tsdb.LastCheckpoint(w.walDir)
|
|
if err != nil && err != tsdb.ErrNotFound {
|
|
return errors.Wrap(err, "tsdb.LastCheckpoint")
|
|
}
|
|
|
|
if dir == "" || dir == w.lastCheckpoint {
|
|
return nil
|
|
}
|
|
w.lastCheckpoint = dir
|
|
|
|
index, err := checkpointNum(dir)
|
|
if err != nil {
|
|
return errors.Wrap(err, "error parsing checkpoint filename")
|
|
}
|
|
|
|
if index >= segmentNum {
|
|
level.Debug(w.logger).Log("msg", "current segment is behind the checkpoint, skipping reading of checkpoint", "current", fmt.Sprintf("%08d", segmentNum), "checkpoint", dir)
|
|
return nil
|
|
}
|
|
|
|
level.Debug(w.logger).Log("msg", "new checkpoint detected", "new", dir, "currentSegment", segmentNum)
|
|
|
|
// This potentially takes a long time, should we run it in another go routine?
|
|
if err = w.readCheckpoint(dir); err != nil {
|
|
return errors.Wrap(err, "readCheckpoint")
|
|
}
|
|
|
|
// Clear series with a checkpoint or segment index # lower than the checkpoint we just read.
|
|
w.writer.SeriesReset(index)
|
|
return nil
|
|
}
|
|
|
|
func (w *WALWatcher) readSegment(r *wal.LiveReader, segmentNum int, tail bool) error {
|
|
var (
|
|
dec tsdb.RecordDecoder
|
|
series []tsdb.RefSeries
|
|
samples []tsdb.RefSample
|
|
)
|
|
|
|
for r.Next() && !isClosed(w.quit) {
|
|
rec := r.Record()
|
|
w.recordsReadMetric.WithLabelValues(recordType(dec.Type(rec))).Inc()
|
|
|
|
switch dec.Type(rec) {
|
|
case tsdb.RecordSeries:
|
|
series, err := dec.Series(rec, series[:0])
|
|
if err != nil {
|
|
w.recordDecodeFailsMetric.Inc()
|
|
return err
|
|
}
|
|
w.writer.StoreSeries(series, segmentNum)
|
|
|
|
case tsdb.RecordSamples:
|
|
// If we're not tailing a segment we can ignore any samples records we see.
|
|
// This speeds up replay of the WAL by > 10x.
|
|
if !tail {
|
|
break
|
|
}
|
|
samples, err := dec.Samples(rec, samples[:0])
|
|
if err != nil {
|
|
w.recordDecodeFailsMetric.Inc()
|
|
return err
|
|
}
|
|
var send []tsdb.RefSample
|
|
for _, s := range samples {
|
|
if s.T > w.startTime {
|
|
send = append(send, s)
|
|
}
|
|
}
|
|
if len(send) > 0 {
|
|
// Blocks until the sample is sent to all remote write endpoints or closed (because enqueue blocks).
|
|
w.writer.Append(send)
|
|
}
|
|
|
|
case tsdb.RecordTombstones:
|
|
// noop
|
|
case tsdb.RecordInvalid:
|
|
return errors.New("invalid record")
|
|
|
|
default:
|
|
w.recordDecodeFailsMetric.Inc()
|
|
return errors.New("unknown TSDB record type")
|
|
}
|
|
}
|
|
return r.Err()
|
|
}
|
|
|
|
func recordType(rt tsdb.RecordType) string {
|
|
switch rt {
|
|
case tsdb.RecordInvalid:
|
|
return "invalid"
|
|
case tsdb.RecordSeries:
|
|
return "series"
|
|
case tsdb.RecordSamples:
|
|
return "samples"
|
|
case tsdb.RecordTombstones:
|
|
return "tombstones"
|
|
default:
|
|
return "unkown"
|
|
}
|
|
}
|
|
|
|
// Read all the series records from a Checkpoint directory.
|
|
func (w *WALWatcher) readCheckpoint(checkpointDir string) error {
|
|
level.Debug(w.logger).Log("msg", "reading checkpoint", "dir", checkpointDir)
|
|
index, err := checkpointNum(checkpointDir)
|
|
if err != nil {
|
|
return errors.Wrap(err, "checkpointNum")
|
|
}
|
|
|
|
sr, err := wal.NewSegmentsReader(checkpointDir)
|
|
if err != nil {
|
|
return errors.Wrap(err, "NewSegmentsReader")
|
|
}
|
|
defer sr.Close()
|
|
|
|
size, err := getCheckpointSize(checkpointDir)
|
|
if err != nil {
|
|
return errors.Wrap(err, "getCheckpointSize")
|
|
}
|
|
|
|
r := wal.NewLiveReader(w.logger, sr)
|
|
if err := w.readSegment(r, index, false); err != io.EOF {
|
|
return errors.Wrap(err, "readSegment")
|
|
}
|
|
|
|
if r.Offset() != size {
|
|
level.Warn(w.logger).Log("msg", "may not have read all data from checkpoint", "totalRead", r.Offset(), "size", size)
|
|
}
|
|
level.Debug(w.logger).Log("msg", "read series references from checkpoint", "checkpoint", checkpointDir)
|
|
|
|
return nil
|
|
}
|
|
|
|
// checkpointNum extracts the numeric index from a checkpoint directory.
//
// dir may be a bare name or a full path. It returns an error if the final
// path element is not of the form "<prefix>.<number>".
func checkpointNum(dir string) (int, error) {
	// Checkpoint dir names are in the format checkpoint.000001
	//
	// Only the final path element is parsed: parent directories may contain
	// dots themselves (e.g. "./data/wal" or "/srv/prom.d/wal"), which must not
	// make a valid checkpoint look invalid. Both "/" and "\" are treated as
	// separators so the result does not depend on the host OS.
	name := dir[strings.LastIndexAny(dir, `/\`)+1:]

	chunks := strings.Split(name, ".")
	if len(chunks) != 2 {
		return 0, fmt.Errorf("invalid checkpoint dir string: %s", dir)
	}

	result, err := strconv.Atoi(chunks[1])
	if err != nil {
		return 0, fmt.Errorf("invalid checkpoint dir string: %s", dir)
	}

	return result, nil
}
|
|
|
|
func getCheckpointSize(dir string) (int64, error) {
|
|
i := int64(0)
|
|
segs, err := fileutil.ReadDir(dir)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
for _, fn := range segs {
|
|
num, err := strconv.Atoi(fn)
|
|
if err != nil {
|
|
return i, err
|
|
}
|
|
sz, err := getSegmentSize(dir, num)
|
|
if err != nil {
|
|
return i, err
|
|
}
|
|
i += sz
|
|
}
|
|
return i, nil
|
|
}
|
|
|
|
// Get size of segment.
|
|
func getSegmentSize(dir string, index int) (int64, error) {
|
|
i := int64(-1)
|
|
fi, err := os.Stat(wal.SegmentName(dir, index))
|
|
if err == nil {
|
|
i = fi.Size()
|
|
}
|
|
return i, err
|
|
}
|
|
|
|
// isClosed reports, without blocking, whether a receive on c can proceed
// right now. For a channel that is only ever closed (never sent on), that
// means the channel has been closed.
func isClosed(c chan struct{}) bool {
	ready := false
	select {
	case <-c:
		ready = true
	default:
		// Receive would block: the channel is still open (or nil).
	}
	return ready
}
|