From f8ec0074e70d0c31b54e23ded4e6d67e0b47acb7 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 2 Aug 2018 17:46:45 -0400 Subject: [PATCH] Add Replace function Signed-off-by: Fabian Reinartz --- checkpoint.go | 6 +++--- docs/format/wal.md | 4 +--- fileutil/fileutil.go | 23 +++++++++++++++++++++++ wal/wal.go | 16 ++++++++-------- 4 files changed, 35 insertions(+), 14 deletions(-) diff --git a/checkpoint.go b/checkpoint.go index 87ff5597e..d988d3561 100644 --- a/checkpoint.go +++ b/checkpoint.go @@ -37,7 +37,7 @@ type CheckpointStats struct { DroppedTombstones int TotalSeries int // Processed series including dropped ones. TotalSamples int // Processed samples inlcuding dropped ones. - TotalTombstones int // Processed tombstones including droppes ones. + TotalTombstones int // Processed tombstones including dropped ones. } // LastCheckpoint returns the directory name of the most recent checkpoint. @@ -260,8 +260,8 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo if err := cp.Close(); err != nil { return nil, errors.Wrap(err, "close checkpoint") } - if err := fileutil.Rename(cpdirtmp, cpdir); err != nil { - return nil, errors.Wrap(err, "rename checkpoint file") + if err := fileutil.Replace(cpdirtmp, cpdir); err != nil { + return nil, errors.Wrap(err, "rename checkpoint directory") } if err := w.Truncate(n + 1); err != nil { // If truncating fails, we'll just try again at the next checkpoint. diff --git a/docs/format/wal.md b/docs/format/wal.md index 6127fd050..7195e0bdf 100644 --- a/docs/format/wal.md +++ b/docs/format/wal.md @@ -7,7 +7,7 @@ may be partial. A WAL record is an opaque byte slice that gets split up into sub should it exceed the remaining space of the current page. Records are never split across segment boundaries. If a single record exceeds the default segment size, a segment with a larger size will be created. -The encoding of pages is largely borrowed from [LevelDB's/RocksDB's write ahead log.][1] +The encoding of pages is largely borrowed from [LevelDB's/RocksDB's write ahead log.](https://github.com/facebook/rocksdb/wiki/Write-Ahead-Log-File-Format) Notable deviations are that the record fragment is encoded as: @@ -84,5 +84,3 @@ and specify an interval for which samples of a series got deleted. │ . . . │ └─────────────────────────────────────────────────────┘ ``` - -[1][https://github.com/facebook/rocksdb/wiki/Write-Ahead-Log-File-Format] diff --git a/fileutil/fileutil.go b/fileutil/fileutil.go index a3eb2a7ac..2158bfd26 100644 --- a/fileutil/fileutil.go +++ b/fileutil/fileutil.go @@ -43,3 +43,26 @@ func Rename(from, to string) error { } return pdir.Close() } + +// Replace moves a file or directory to a new location and deletes any previous data. +// It is not atomic. +func Replace(from, to string) error { + if err := os.RemoveAll(to); err != nil { + return nil + } + if err := os.Rename(from, to); err != nil { + return err + } + + // Directory was renamed; sync parent dir to persist rename. + pdir, err := OpenDir(filepath.Dir(to)) + if err != nil { + return err + } + + if err = Fsync(pdir); err != nil { + pdir.Close() + return err + } + return pdir.Close() +} diff --git a/wal/wal.go b/wal/wal.go index c2b333da0..e59b0e15d 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -393,11 +393,11 @@ func (w *WAL) flushPage(clear bool) error { type recType uint8 const ( - recPageTerm recType = 0 // rest of page is empty - recFull recType = 1 // full record - recFirst recType = 2 // first fragment of a record - recMiddle recType = 3 // middle fragments of a record - recLast recType = 4 // final fragment of a record + recPageTerm recType = 0 // Rest of page is empty. + recFull recType = 1 // Full record. + recFirst recType = 2 // First fragment of a record. + recMiddle recType = 3 // Middle fragments of a record. + recLast recType = 4 // Final fragment of a record. ) func (t recType) String() string { @@ -442,8 +442,8 @@ func (w *WAL) log(rec []byte, final bool) error { // If the record is too big to fit within pages in the current // segment, terminate the active segment and advance to the next one. // This ensures that records do not cross segment boundaries. - left := w.page.remaining() - recordHeaderSize // active page - left += (pageSize - recordHeaderSize) * (w.pagesPerSegment() - w.donePages - 1) // free pages + left := w.page.remaining() - recordHeaderSize // Active pages. + left += (pageSize - recordHeaderSize) * (w.pagesPerSegment() - w.donePages - 1) // Free pages. if len(rec) > left { if err := w.nextSegment(); err != nil { @@ -716,7 +716,7 @@ func (r *Reader) next() (err error) { // It's not strictly necessary but may catch sketchy state early. k := pageSize - (r.total % pageSize) if k == pageSize { - continue // initial 0 byte was last page byte + continue // Initial 0 byte was last page byte. } n, err := io.ReadFull(r.rdr, buf[:k]) if err != nil {