From 8aedb7671e13e2657c85a533c763d449af54366c Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 27 Oct 2017 13:47:07 +0200 Subject: [PATCH 1/2] wal: synchronize background operations This adds an actor channel and thereby serializes all background operations through the run() method. Fixes an existing race. --- wal.go | 47 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 33 insertions(+), 14 deletions(-) diff --git a/wal.go b/wal.go index 225851de1..8efa3cd28 100644 --- a/wal.go +++ b/wal.go @@ -190,6 +190,7 @@ type SegmentWAL struct { stopc chan struct{} donec chan struct{} + actorc chan func() error // sequantilized background operations buffers sync.Pool } @@ -213,6 +214,7 @@ func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration, flushInterval: flushInterval, donec: make(chan struct{}), stopc: make(chan struct{}), + actorc: make(chan func() error, 1), segmentSize: walSegmentSizeBytes, crc32: newCRC32(), } @@ -569,18 +571,21 @@ func (w *SegmentWAL) cut() error { // Finish last segment asynchronously to not block the WAL moving along // in the new segment. go func() { - off, err := hf.Seek(0, os.SEEK_CUR) - if err != nil { - level.Error(w.logger).Log("msg", "finish old segment", "segment", hf.Name(), "err", err) - } - if err := hf.Truncate(off); err != nil { - level.Error(w.logger).Log("msg", "finish old segment", "segment", hf.Name(), "err", err) - } - if err := hf.Sync(); err != nil { - level.Error(w.logger).Log("msg", "finish old segment", "segment", hf.Name(), "err", err) - } - if err := hf.Close(); err != nil { - level.Error(w.logger).Log("msg", "finish old segment", "segment", hf.Name(), "err", err) + w.actorc <- func() error { + off, err := hf.Seek(0, os.SEEK_CUR) + if err != nil { + return errors.Wrapf(err, "finish old segment %s", hf.Name()) + } + if err := hf.Truncate(off); err != nil { + return errors.Wrapf(err, "finish old segment %s", hf.Name()) + } + if err := hf.Sync(); err != nil { + return errors.Wrapf(err, "finish old segment %s", hf.Name()) + } + if err := hf.Close(); err != nil { + return errors.Wrapf(err, "finish old segment %s", hf.Name()) + } + return nil } }() } @@ -595,8 +600,8 @@ func (w *SegmentWAL) cut() error { } go func() { - if err = w.dirFile.Sync(); err != nil { - level.Error(w.logger).Log("msg", "sync WAL directory", "err", err) + w.actorc <- func() error { + return errors.Wrap(w.dirFile.Sync(), "sync WAL directory") } }() @@ -675,9 +680,23 @@ func (w *SegmentWAL) run(interval time.Duration) { defer close(w.donec) for { + // Processing all enqueued operations has precedence over shutdown and + // background syncs. + select { + case f := <-w.actorc: + if err := f(); err != nil { + level.Error(w.logger).Log("msg", "operation failed", "err", err) + } + continue + default: + } select { case <-w.stopc: return + case f := <-w.actorc: + if err := f(); err != nil { + level.Error(w.logger).Log("msg", "operation failed", "err", err) + } case <-tick: if err := w.Sync(); err != nil { level.Error(w.logger).Log("msg", "sync failed", "err", err) From a993f0ccc03ad8a5641590addd82d2a2fdd90de0 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Wed, 1 Nov 2017 18:11:09 +0100 Subject: [PATCH 2/2] Fix typo --- wal.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wal.go b/wal.go index 8efa3cd28..41589d4bb 100644 --- a/wal.go +++ b/wal.go @@ -190,7 +190,7 @@ type SegmentWAL struct { stopc chan struct{} donec chan struct{} - actorc chan func() error // sequantilized background operations + actorc chan func() error // sequentialized background operations buffers sync.Pool }