// Copyright 2018 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package wal import ( "fmt" "io" "io/ioutil" "math" "os" "path/filepath" "sort" "strconv" "strings" "github.com/pkg/errors" tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" "github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/tombstones" ) // CheckpointStats returns stats about a created checkpoint. type CheckpointStats struct { DroppedSeries int DroppedSamples int DroppedTombstones int TotalSeries int // Processed series including dropped ones. TotalSamples int // Processed samples including dropped ones. TotalTombstones int // Processed tombstones including dropped ones. } // LastCheckpoint returns the directory name and index of the most recent checkpoint. // If dir does not contain any checkpoints, ErrNotFound is returned. func LastCheckpoint(dir string) (string, int, error) { checkpoints, err := listCheckpoints(dir) if err != nil { return "", 0, err } if len(checkpoints) == 0 { return "", 0, record.ErrNotFound } checkpoint := checkpoints[len(checkpoints)-1] return filepath.Join(dir, checkpoint.name), checkpoint.index, nil } // DeleteCheckpoints deletes all checkpoints in a directory below a given index. func DeleteCheckpoints(dir string, maxIndex int) error { checkpoints, err := listCheckpoints(dir) if err != nil { return err } var errs tsdb_errors.MultiError for _, checkpoint := range checkpoints { if checkpoint.index >= maxIndex { break } if err := os.RemoveAll(filepath.Join(dir, checkpoint.name)); err != nil { errs.Add(err) } } return errs.Err() } const checkpointPrefix = "checkpoint." // Checkpoint creates a compacted checkpoint of segments in range [first, last] in the given WAL. // It includes the most recent checkpoint if it exists. // All series not satisfying keep and samples below mint are dropped. // // The checkpoint is stored in a directory named checkpoint.N in the same // segmented format as the original WAL itself. // This makes it easy to read it through the WAL package and concatenate // it with the original WAL. func Checkpoint(w *WAL, from, to int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) { stats := &CheckpointStats{} var sgmReader io.ReadCloser { var sgmRange []SegmentRange dir, idx, err := LastCheckpoint(w.Dir()) if err != nil && err != record.ErrNotFound { return nil, errors.Wrap(err, "find last checkpoint") } last := idx + 1 if err == nil { if from > last { return nil, fmt.Errorf("unexpected gap to last checkpoint. expected:%v, requested:%v", last, from) } // Ignore WAL files below the checkpoint. They shouldn't exist to begin with. from = last sgmRange = append(sgmRange, SegmentRange{Dir: dir, Last: math.MaxInt32}) } sgmRange = append(sgmRange, SegmentRange{Dir: w.Dir(), First: from, Last: to}) sgmReader, err = NewSegmentsRangeReader(sgmRange...) if err != nil { return nil, errors.Wrap(err, "create segment reader") } defer sgmReader.Close() } cpdir := checkpointDir(w.Dir(), to) cpdirtmp := cpdir + ".tmp" if err := os.RemoveAll(cpdirtmp); err != nil { return nil, errors.Wrap(err, "remove previous temporary checkpoint dir") } if err := os.MkdirAll(cpdirtmp, 0777); err != nil { return nil, errors.Wrap(err, "create checkpoint dir") } cp, err := New(nil, nil, cpdirtmp, w.CompressionEnabled()) if err != nil { return nil, errors.Wrap(err, "open checkpoint") } // Ensures that an early return caused by an error doesn't leave any tmp files. defer func() { cp.Close() os.RemoveAll(cpdirtmp) }() r := NewReader(sgmReader) var ( series []record.RefSeries samples []record.RefSample tstones []tombstones.Stone dec record.Decoder enc record.Encoder buf []byte recs [][]byte ) for r.Next() { series, samples, tstones = series[:0], samples[:0], tstones[:0] // We don't reset the buffer since we batch up multiple records // before writing them to the checkpoint. // Remember where the record for this iteration starts. start := len(buf) rec := r.Record() switch dec.Type(rec) { case record.Series: series, err = dec.Series(rec, series) if err != nil { return nil, errors.Wrap(err, "decode series") } // Drop irrelevant series in place. repl := series[:0] for _, s := range series { if keep(s.Ref) { repl = append(repl, s) } } if len(repl) > 0 { buf = enc.Series(repl, buf) } stats.TotalSeries += len(series) stats.DroppedSeries += len(series) - len(repl) case record.Samples: samples, err = dec.Samples(rec, samples) if err != nil { return nil, errors.Wrap(err, "decode samples") } // Drop irrelevant samples in place. repl := samples[:0] for _, s := range samples { if s.T >= mint { repl = append(repl, s) } } if len(repl) > 0 { buf = enc.Samples(repl, buf) } stats.TotalSamples += len(samples) stats.DroppedSamples += len(samples) - len(repl) case record.Tombstones: tstones, err = dec.Tombstones(rec, tstones) if err != nil { return nil, errors.Wrap(err, "decode deletes") } // Drop irrelevant tombstones in place. repl := tstones[:0] for _, s := range tstones { for _, iv := range s.Intervals { if iv.Maxt >= mint { repl = append(repl, s) break } } } if len(repl) > 0 { buf = enc.Tombstones(repl, buf) } stats.TotalTombstones += len(tstones) stats.DroppedTombstones += len(tstones) - len(repl) default: return nil, errors.New("invalid record type") } if len(buf[start:]) == 0 { continue // All contents discarded. } recs = append(recs, buf[start:]) // Flush records in 1 MB increments. if len(buf) > 1*1024*1024 { if err := cp.Log(recs...); err != nil { return nil, errors.Wrap(err, "flush records") } buf, recs = buf[:0], recs[:0] } } // If we hit any corruption during checkpointing, repairing is not an option. // The head won't know which series records are lost. if r.Err() != nil { return nil, errors.Wrap(r.Err(), "read segments") } // Flush remaining records. if err := cp.Log(recs...); err != nil { return nil, errors.Wrap(err, "flush records") } if err := cp.Close(); err != nil { return nil, errors.Wrap(err, "close checkpoint") } if err := fileutil.Replace(cpdirtmp, cpdir); err != nil { return nil, errors.Wrap(err, "rename checkpoint directory") } return stats, nil } func checkpointDir(dir string, i int) string { return filepath.Join(dir, fmt.Sprintf(checkpointPrefix+"%08d", i)) } type checkpointRef struct { name string index int } func listCheckpoints(dir string) (refs []checkpointRef, err error) { files, err := ioutil.ReadDir(dir) if err != nil { return nil, err } for i := 0; i < len(files); i++ { fi := files[i] if !strings.HasPrefix(fi.Name(), checkpointPrefix) { continue } if !fi.IsDir() { return nil, errors.Errorf("checkpoint %s is not a directory", fi.Name()) } idx, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):]) if err != nil { continue } refs = append(refs, checkpointRef{name: fi.Name(), index: idx}) } sort.Slice(refs, func(i, j int) bool { return refs[i].index < refs[j].index }) return refs, nil }