diff --git a/compact.go b/compact.go index bb5d53e2e..80d33dff8 100644 --- a/compact.go +++ b/compact.go @@ -412,7 +412,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u var merr MultiError merr.Add(err) - if err != ErrCompactionCanceled { + if err != context.Canceled { for _, b := range bs { if err := b.setCompactionFailed(); err != nil { merr.Add(errors.Wrapf(err, "setting compaction failed for block: %s", b.Dir())) @@ -481,20 +481,19 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error { return w.ChunkWriter.WriteChunks(chunks...) } -// ErrCompactionCanceled is returned when the compaction was canceled during shutdown. -var ErrCompactionCanceled = errors.New("compaction cancelled") - // write creates a new block that is the union of the provided blocks into dir. // It cleans up all files of the old blocks after completing successfully. func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) { dir := filepath.Join(dest, meta.ULID.String()) tmp := dir + ".tmp" var writers []io.Closer + var merr MultiError defer func(t time.Time) { + merr.Add(err) for _, w := range writers { - w.Close() + merr.Add(w.Close()) } - if err != nil { + if merr.Err() != nil { c.metrics.failed.Inc() // TODO(gouthamve): Handle error how? if err := os.RemoveAll(tmp); err != nil { @@ -542,16 +541,9 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe return errors.Wrap(err, "write compaction") } - // Compaction was canceled so remove tmp folders and return early. select { case <-c.ctx.Done(): - for _, w := range writers { - w.Close() - } - if err := os.RemoveAll(tmp); err != nil { - level.Error(c.logger).Log("msg", "removed tmp folder after canceled compaction", "err", err.Error()) - } - return ErrCompactionCanceled + return c.ctx.Err() default: } @@ -560,9 +552,11 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe // you cannot delete these unless they are closed and the defer is to // make sure they are closed if the function exits due to an error above. for _, w := range writers { - if err := w.Close(); err != nil { - return err - } + merr.Add(w.Close()) + } + writers = writers[:0] // Avoid closing the writers twice in the defer. + if merr.Err() != nil { + return merr.Err() } // Populated block is empty, so cleanup and exit. @@ -632,7 +626,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, for i, b := range blocks { select { case <-c.ctx.Done(): - return nil + return c.ctx.Err() default: } @@ -694,7 +688,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, for set.Next() { select { case <-c.ctx.Done(): - return nil + return c.ctx.Err() default: } diff --git a/db.go b/db.go index 9436809b7..8bec75447 100644 --- a/db.go +++ b/db.go @@ -129,7 +129,7 @@ type DB struct { autoCompact bool // Cancel a running compaction when a shutdown is initiated. - compactCnl func() + compactCancel context.CancelFunc } type dbMetrics struct { @@ -275,13 +275,13 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db db.lockf = lockf } - ctx, cnl := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) db.compactor, err = NewLeveledCompactor(ctx, r, l, opts.BlockRanges, db.chunkPool) if err != nil { - cnl() + cancel() return nil, errors.Wrap(err, "create leveled compactor") } - db.compactCnl = cnl + db.compactCancel = cancel segmentSize := wal.DefaultSegmentSize if opts.WALSegmentSize > 0 { @@ -819,7 +819,7 @@ func (db *DB) Head() *Head { // Close the partition. func (db *DB) Close() error { close(db.stopc) - db.compactCnl() + db.compactCancel() <-db.donec db.mtx.Lock()