diff --git a/head.go b/head.go index db548b6c7..a74552bca 100644 --- a/head.go +++ b/head.go @@ -240,11 +240,16 @@ func (h *Head) Truncate(mint int64) error { if mint%h.chunkRange != 0 { return errors.Errorf("truncating at %d not aligned", mint) } - if h.minTime >= mint { + if h.MinTime() >= mint { return nil } atomic.StoreInt64(&h.minTime, mint) + // Ensure that max time is at least as high as min time. + for h.MaxTime() < mint { + atomic.CompareAndSwapInt64(&h.maxTime, h.MaxTime(), mint) + } + // This was an initial call to Truncate after loading blocks on startup. // We haven't read back the WAL yet, so do not attempt to truncate it. if initialize { @@ -279,15 +284,15 @@ func (h *Head) Truncate(mint int64) error { // Returns true if the initialization took an effect. func (h *Head) initTime(t int64) (initialized bool) { // In the init state, the head has a high timestamp of math.MinInt64. - if h.MaxTime() != math.MinInt64 { - return false - } mint, _ := rangeForTimestamp(t, h.chunkRange) - if !atomic.CompareAndSwapInt64(&h.maxTime, math.MinInt64, t) { + if !atomic.CompareAndSwapInt64(&h.minTime, math.MinInt64, mint) { return false } - atomic.StoreInt64(&h.minTime, mint-h.chunkRange) + // Ensure that max time is initialized to at least the min time we just set. + // Concurrent appenders may already have set it to a higher value. + atomic.CompareAndSwapInt64(&h.maxTime, math.MinInt64, t) + return true } @@ -335,7 +340,7 @@ func (h *Head) Appender() Appender { // The head cache might not have a starting point yet. The init appender // picks up the first appended timestamp as the base. - if h.MaxTime() == math.MinInt64 { + if h.MinTime() == math.MinInt64 { return &initAppender{head: h} } return h.appender() diff --git a/wal.go b/wal.go index 81406947e..1dadc8f2c 100644 --- a/wal.go +++ b/wal.go @@ -308,9 +308,11 @@ func (w *SegmentWAL) Truncate(mint int64, p Postings) error { if err != nil { return errors.Wrap(err, "create compaction segment") } - csf := newSegmentFile(f) - - activeSeries := []RefSeries{} + var ( + csf = newSegmentFile(f) + crc32 = newCRC32() + activeSeries = []RefSeries{} + ) Loop: for r.next() { @@ -337,7 +339,7 @@ Loop: buf := w.getBuffer() flag = w.encodeSeries(buf, activeSeries) - _, err = w.writeTo(csf, WALEntrySeries, flag, buf.get()) + _, err = w.writeTo(csf, crc32, WALEntrySeries, flag, buf.get()) w.putBuffer(buf) if err != nil { @@ -355,20 +357,17 @@ Loop: if err := csf.Truncate(off); err != nil { return err } - if err := csf.Close(); err != nil { - return errors.Wrap(err, "close tmp file") - } + csf.Sync() + csf.Close() + if err := renameFile(csf.Name(), candidates[0].Name()); err != nil { return err } - for _, f := range candidates[1:] { - if err := f.Close(); err != nil { - return errors.Wrap(err, "close obsolete WAL segment file") - } if err := os.RemoveAll(f.Name()); err != nil { return errors.Wrap(err, "delete WAL segment file") } + f.Close() } if err := w.dirFile.Sync(); err != nil { return err @@ -381,9 +380,7 @@ Loop: return err } // We don't need it to be open. - if err := csf.Close(); err != nil { - return err - } + csf.Close() w.mtx.Lock() w.files = append([]*segmentFile{csf}, w.files[len(candidates):]...) @@ -674,19 +671,19 @@ func (w *SegmentWAL) write(t WALEntryType, flag uint8, buf []byte) error { return err } } - n, err := w.writeTo(w.cur, t, flag, buf) + n, err := w.writeTo(w.cur, w.crc32, t, flag, buf) w.curN += int64(n) return err } -func (w *SegmentWAL) writeTo(wr io.Writer, t WALEntryType, flag uint8, buf []byte) (int, error) { +func (w *SegmentWAL) writeTo(wr io.Writer, crc32 hash.Hash, t WALEntryType, flag uint8, buf []byte) (int, error) { if len(buf) == 0 { return 0, nil } - w.crc32.Reset() - wr = io.MultiWriter(w.crc32, wr) + crc32.Reset() + wr = io.MultiWriter(crc32, wr) var b [6]byte b[0] = byte(t) @@ -702,7 +699,7 @@ func (w *SegmentWAL) writeTo(wr io.Writer, t WALEntryType, flag uint8, buf []byt if err != nil { return n1 + n2, err } - n3, err := wr.Write(w.crc32.Sum(b[:0])) + n3, err := wr.Write(crc32.Sum(b[:0])) return n1 + n2 + n3, err } diff --git a/wal_test.go b/wal_test.go index 8c856e25c..17a71da75 100644 --- a/wal_test.go +++ b/wal_test.go @@ -122,8 +122,8 @@ func TestSegmentWAL_cut(t *testing.T) { func TestSegmentWAL_Truncate(t *testing.T) { const ( - numMetrics = 50 - batch = 10 + numMetrics = 20000 + batch = 100 ) series, err := readPrometheusLabels("testdata/20k.series", numMetrics) require.NoError(t, err) @@ -134,7 +134,7 @@ func TestSegmentWAL_Truncate(t *testing.T) { w, err := OpenSegmentWAL(dir, nil, 0) require.NoError(t, err) - w.segmentSize = 1000 + w.segmentSize = 10000 for i := 0; i < numMetrics; i += batch { var rs []RefSeries