update checkpoint replay status (#8898)

* Consider  wal checkpoint replay status

Signed-off-by: XiaoYu Zhang <ideoutrea@163.com>

* Fix tests failed

Signed-off-by: XiaoYu Zhang <ideoutrea@163.com>

* Update checkpoint replay status

Signed-off-by: XiaoYu Zhang <ideoutrea@163.com>
This commit is contained in:
ide-rea 2021-07-13 18:08:07 +08:00 committed by GitHub
parent e98b639ac7
commit 14b24d15b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 12 additions and 9 deletions

View File

@ -805,6 +805,15 @@ func (h *Head) Init(minValidTime int64) error {
if err != nil && err != record.ErrNotFound {
return errors.Wrap(err, "find last checkpoint")
}
// Find the last segment.
_, endAt, e := wal.Segments(h.wal.Dir())
if e != nil {
return errors.Wrap(e, "finding WAL segments")
}
h.startWALReplayStatus(startFrom, endAt)
multiRef := map[uint64]uint64{}
if err == nil {
sr, err := wal.NewSegmentsReader(dir)
@ -822,22 +831,16 @@ func (h *Head) Init(minValidTime int64) error {
if err := h.loadWAL(wal.NewReader(sr), multiRef, mmappedChunks); err != nil {
return errors.Wrap(err, "backfill checkpoint")
}
h.updateWALReplayStatusRead(startFrom)
startFrom++
level.Info(h.logger).Log("msg", "WAL checkpoint loaded")
}
checkpointReplayDuration := time.Since(checkpointReplayStart)
walReplayStart := time.Now()
// Find the last segment.
_, last, err := wal.Segments(h.wal.Dir())
if err != nil {
return errors.Wrap(err, "finding WAL segments")
}
h.startWALReplayStatus(startFrom, last)
// Backfill segments from the most recent checkpoint onwards.
for i := startFrom; i <= last; i++ {
for i := startFrom; i <= endAt; i++ {
s, err := wal.OpenReadSegment(wal.SegmentName(h.wal.Dir(), i))
if err != nil {
return errors.Wrap(err, fmt.Sprintf("open WAL segment: %d", i))
@ -851,7 +854,7 @@ func (h *Head) Init(minValidTime int64) error {
if err != nil {
return err
}
level.Info(h.logger).Log("msg", "WAL segment loaded", "segment", i, "maxSegment", last)
level.Info(h.logger).Log("msg", "WAL segment loaded", "segment", i, "maxSegment", endAt)
h.updateWALReplayStatusRead(i)
}