Add Replace function

Signed-off-by: Fabian Reinartz <freinartz@google.com>
This commit is contained in:
Fabian Reinartz 2018-08-02 17:46:45 -04:00
parent b81e0fbf2a
commit f8ec0074e7
4 changed files with 35 additions and 14 deletions

View File

@ -37,7 +37,7 @@ type CheckpointStats struct {
DroppedTombstones int DroppedTombstones int
TotalSeries int // Processed series including dropped ones. TotalSeries int // Processed series including dropped ones.
TotalSamples int // Processed samples inlcuding dropped ones. TotalSamples int // Processed samples inlcuding dropped ones.
TotalTombstones int // Processed tombstones including droppes ones. TotalTombstones int // Processed tombstones including dropped ones.
} }
// LastCheckpoint returns the directory name of the most recent checkpoint. // LastCheckpoint returns the directory name of the most recent checkpoint.
@ -260,8 +260,8 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo
if err := cp.Close(); err != nil { if err := cp.Close(); err != nil {
return nil, errors.Wrap(err, "close checkpoint") return nil, errors.Wrap(err, "close checkpoint")
} }
if err := fileutil.Rename(cpdirtmp, cpdir); err != nil { if err := fileutil.Replace(cpdirtmp, cpdir); err != nil {
return nil, errors.Wrap(err, "rename checkpoint file") return nil, errors.Wrap(err, "rename checkpoint directory")
} }
if err := w.Truncate(n + 1); err != nil { if err := w.Truncate(n + 1); err != nil {
// If truncating fails, we'll just try again at the next checkpoint. // If truncating fails, we'll just try again at the next checkpoint.

View File

@ -7,7 +7,7 @@ may be partial. A WAL record is an opaque byte slice that gets split up into sub
should it exceed the remaining space of the current page. Records are never split across should it exceed the remaining space of the current page. Records are never split across
segment boundaries. If a single record exceeds the default segment size, a segment with segment boundaries. If a single record exceeds the default segment size, a segment with
a larger size will be created. a larger size will be created.
The encoding of pages is largely borrowed from [LevelDB's/RocksDB's write ahead log.][1] The encoding of pages is largely borrowed from [LevelDB's/RocksDB's write ahead log.](https://github.com/facebook/rocksdb/wiki/Write-Ahead-Log-File-Format)
Notable deviations are that the record fragment is encoded as: Notable deviations are that the record fragment is encoded as:
@ -84,5 +84,3 @@ and specify an interval for which samples of a series got deleted.
│ . . . │ │ . . . │
└─────────────────────────────────────────────────────┘ └─────────────────────────────────────────────────────┘
``` ```
[1][https://github.com/facebook/rocksdb/wiki/Write-Ahead-Log-File-Format]

View File

@ -43,3 +43,26 @@ func Rename(from, to string) error {
} }
return pdir.Close() return pdir.Close()
} }
// Replace moves a file or directory to a new location and deletes any previous data.
// It is not atomic.
func Replace(from, to string) error {
if err := os.RemoveAll(to); err != nil {
return nil
}
if err := os.Rename(from, to); err != nil {
return err
}
// Directory was renamed; sync parent dir to persist rename.
pdir, err := OpenDir(filepath.Dir(to))
if err != nil {
return err
}
if err = Fsync(pdir); err != nil {
pdir.Close()
return err
}
return pdir.Close()
}

View File

@ -393,11 +393,11 @@ func (w *WAL) flushPage(clear bool) error {
type recType uint8 type recType uint8
const ( const (
recPageTerm recType = 0 // rest of page is empty recPageTerm recType = 0 // Rest of page is empty.
recFull recType = 1 // full record recFull recType = 1 // Full record.
recFirst recType = 2 // first fragment of a record recFirst recType = 2 // First fragment of a record.
recMiddle recType = 3 // middle fragments of a record recMiddle recType = 3 // Middle fragments of a record.
recLast recType = 4 // final fragment of a record recLast recType = 4 // Final fragment of a record.
) )
func (t recType) String() string { func (t recType) String() string {
@ -442,8 +442,8 @@ func (w *WAL) log(rec []byte, final bool) error {
// If the record is too big to fit within pages in the current // If the record is too big to fit within pages in the current
// segment, terminate the active segment and advance to the next one. // segment, terminate the active segment and advance to the next one.
// This ensures that records do not cross segment boundaries. // This ensures that records do not cross segment boundaries.
left := w.page.remaining() - recordHeaderSize // active page left := w.page.remaining() - recordHeaderSize // Active pages.
left += (pageSize - recordHeaderSize) * (w.pagesPerSegment() - w.donePages - 1) // free pages left += (pageSize - recordHeaderSize) * (w.pagesPerSegment() - w.donePages - 1) // Free pages.
if len(rec) > left { if len(rec) > left {
if err := w.nextSegment(); err != nil { if err := w.nextSegment(); err != nil {
@ -716,7 +716,7 @@ func (r *Reader) next() (err error) {
// It's not strictly necessary but may catch sketchy state early. // It's not strictly necessary but may catch sketchy state early.
k := pageSize - (r.total % pageSize) k := pageSize - (r.total % pageSize)
if k == pageSize { if k == pageSize {
continue // initial 0 byte was last page byte continue // Initial 0 byte was last page byte.
} }
n, err := io.ReadFull(r.rdr, buf[:k]) n, err := io.ReadFull(r.rdr, buf[:k])
if err != nil { if err != nil {