diff --git a/block.go b/block.go index d9570bdd6e..2d9c81df91 100644 --- a/block.go +++ b/block.go @@ -28,6 +28,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/tsdb/chunkenc" "github.com/prometheus/tsdb/chunks" + tsdb_errors "github.com/prometheus/tsdb/errors" "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" ) @@ -244,7 +245,7 @@ func writeMetaFile(dir string, meta *BlockMeta) error { enc := json.NewEncoder(f) enc.SetIndent("", "\t") - var merr MultiError + var merr tsdb_errors.MultiError if merr.Add(enc.Encode(meta)); merr.Err() != nil { merr.Add(f.Close()) @@ -283,7 +284,7 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, er var closers []io.Closer defer func() { if err != nil { - var merr MultiError + var merr tsdb_errors.MultiError merr.Add(err) merr.Add(closeAll(closers)) err = merr.Err() @@ -350,7 +351,7 @@ func (pb *Block) Close() error { pb.pendingReaders.Wait() - var merr MultiError + var merr tsdb_errors.MultiError merr.Add(pb.chunkr.Close()) merr.Add(pb.indexr.Close()) diff --git a/block_test.go b/block_test.go index dd0972d6e8..1f091693db 100644 --- a/block_test.go +++ b/block_test.go @@ -32,7 +32,9 @@ import ( func TestBlockMetaMustNeverBeVersion2(t *testing.T) { dir, err := ioutil.TempDir("", "metaversion") testutil.Ok(t, err) - defer os.RemoveAll(dir) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() testutil.Ok(t, writeMetaFile(dir, &BlockMeta{})) @@ -44,7 +46,9 @@ func TestBlockMetaMustNeverBeVersion2(t *testing.T) { func TestSetCompactionFailed(t *testing.T) { tmpdir, err := ioutil.TempDir("", "test") testutil.Ok(t, err) - defer os.RemoveAll(tmpdir) + defer func() { + testutil.Ok(t, os.RemoveAll(tmpdir)) + }() blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 0)) b, err := OpenBlock(nil, blockDir, nil) @@ -60,6 +64,19 @@ func TestSetCompactionFailed(t *testing.T) { testutil.Ok(t, b.Close()) } +func TestCreateBlock(t *testing.T) { + tmpdir, err := ioutil.TempDir("", "test") + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, os.RemoveAll(tmpdir)) + }() + b, err := OpenBlock(nil, createBlock(t, tmpdir, genSeries(1, 1, 0, 10)), nil) + if err == nil { + testutil.Ok(t, b.Close()) + } + testutil.Ok(t, err) +} + // createBlock creates a block with given set of series and returns its dir. func createBlock(tb testing.TB, dir string, series []Series) string { head, err := NewHead(nil, nil, nil, 2*60*60*1000) diff --git a/checkpoint.go b/checkpoint.go index 1c6239232e..d8dee28aa8 100644 --- a/checkpoint.go +++ b/checkpoint.go @@ -25,6 +25,7 @@ import ( "strings" "github.com/pkg/errors" + tsdb_errors "github.com/prometheus/tsdb/errors" "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/wal" ) @@ -67,7 +68,7 @@ func LastCheckpoint(dir string) (string, int, error) { // DeleteCheckpoints deletes all checkpoints in a directory below a given index. func DeleteCheckpoints(dir string, maxIndex int) error { - var errs MultiError + var errs tsdb_errors.MultiError files, err := ioutil.ReadDir(dir) if err != nil { diff --git a/checkpoint_test.go b/checkpoint_test.go index 8b13c152ae..fe8ee4e93e 100644 --- a/checkpoint_test.go +++ b/checkpoint_test.go @@ -32,7 +32,9 @@ import ( func TestLastCheckpoint(t *testing.T) { dir, err := ioutil.TempDir("", "test_checkpoint") testutil.Ok(t, err) - defer os.RemoveAll(dir) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() _, _, err = LastCheckpoint(dir) testutil.Equals(t, ErrNotFound, err) @@ -65,7 +67,9 @@ func TestLastCheckpoint(t *testing.T) { func TestDeleteCheckpoints(t *testing.T) { dir, err := ioutil.TempDir("", "test_checkpoint") testutil.Ok(t, err) - defer os.RemoveAll(dir) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() testutil.Ok(t, DeleteCheckpoints(dir, 0)) @@ -84,7 +88,9 @@ func TestDeleteCheckpoints(t *testing.T) { func TestCheckpoint(t *testing.T) { dir, err := ioutil.TempDir("", "test_checkpoint") testutil.Ok(t, err) - defer os.RemoveAll(dir) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() var enc RecordEncoder // Create a dummy segment to bump the initial number. @@ -188,7 +194,9 @@ func TestCheckpointNoTmpFolderAfterError(t *testing.T) { // Create a new wal with an invalid records. dir, err := ioutil.TempDir("", "test_checkpoint") testutil.Ok(t, err) - defer os.RemoveAll(dir) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() w, err := wal.NewSize(nil, nil, dir, 64*1024) testutil.Ok(t, err) testutil.Ok(t, w.Log([]byte{99})) diff --git a/compact.go b/compact.go index bf07d25e89..c8d293fe00 100644 --- a/compact.go +++ b/compact.go @@ -31,6 +31,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/tsdb/chunkenc" "github.com/prometheus/tsdb/chunks" + tsdb_errors "github.com/prometheus/tsdb/errors" "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" @@ -451,7 +452,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u return uid, nil } - var merr MultiError + var merr tsdb_errors.MultiError merr.Add(err) if err != context.Canceled { for _, b := range bs { @@ -529,7 +530,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe tmp := dir + ".tmp" var closers []io.Closer defer func(t time.Time) { - var merr MultiError + var merr tsdb_errors.MultiError merr.Add(err) merr.Add(closeAll(closers)) err = merr.Err() @@ -592,7 +593,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe // though these are covered under defer. This is because in Windows, // you cannot delete these unless they are closed and the defer is to // make sure they are closed if the function exits due to an error above. - var merr MultiError + var merr tsdb_errors.MultiError for _, w := range closers { merr.Add(w.Close()) } @@ -658,7 +659,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, overlapping bool ) defer func() { - var merr MultiError + var merr tsdb_errors.MultiError merr.Add(err) merr.Add(closeAll(closers)) err = merr.Err() diff --git a/compact_test.go b/compact_test.go index 1e06b39fc7..1e6a962990 100644 --- a/compact_test.go +++ b/compact_test.go @@ -421,7 +421,9 @@ func TestCompactionFailWillCleanUpTempDir(t *testing.T) { tmpdir, err := ioutil.TempDir("", "test") testutil.Ok(t, err) - defer os.RemoveAll(tmpdir) + defer func() { + testutil.Ok(t, os.RemoveAll(tmpdir)) + }() testutil.NotOk(t, compactor.write(tmpdir, &BlockMeta{}, erringBReader{})) _, err = os.Stat(filepath.Join(tmpdir, BlockMeta{}.ULID.String()) + ".tmp") @@ -907,7 +909,9 @@ func TestDisableAutoCompactions(t *testing.T) { func TestCancelCompactions(t *testing.T) { tmpdir, err := ioutil.TempDir("", "testCancelCompaction") testutil.Ok(t, err) - defer os.RemoveAll(tmpdir) + defer func() { + testutil.Ok(t, os.RemoveAll(tmpdir)) + }() // Create some blocks to fall within the compaction range. createBlock(t, tmpdir, genSeries(10, 10000, 0, 1000)) @@ -918,7 +922,9 @@ func TestCancelCompactions(t *testing.T) { tmpdirCopy := tmpdir + "Copy" err = fileutil.CopyDirs(tmpdir, tmpdirCopy) testutil.Ok(t, err) - defer os.RemoveAll(tmpdirCopy) + defer func() { + testutil.Ok(t, os.RemoveAll(tmpdirCopy)) + }() // Measure the compaction time without interupting it. var timeCompactionUninterrupted time.Duration diff --git a/db.go b/db.go index 275d96eb7c..a3230eac4a 100644 --- a/db.go +++ b/db.go @@ -15,7 +15,6 @@ package tsdb import ( - "bytes" "context" "fmt" "io" @@ -36,6 +35,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/tsdb/chunkenc" + tsdb_errors "github.com/prometheus/tsdb/errors" "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/wal" @@ -861,7 +861,7 @@ func (db *DB) Close() error { g.Go(pb.Close) } - var merr MultiError + var merr tsdb_errors.MultiError merr.Add(g.Wait()) @@ -1089,50 +1089,8 @@ func nextSequenceFile(dir string) (string, int, error) { return filepath.Join(dir, fmt.Sprintf("%0.6d", i+1)), int(i + 1), nil } -// The MultiError type implements the error interface, and contains the -// Errors used to construct it. -type MultiError []error - -// Returns a concatenated string of the contained errors -func (es MultiError) Error() string { - var buf bytes.Buffer - - if len(es) > 1 { - fmt.Fprintf(&buf, "%d errors: ", len(es)) - } - - for i, err := range es { - if i != 0 { - buf.WriteString("; ") - } - buf.WriteString(err.Error()) - } - - return buf.String() -} - -// Add adds the error to the error list if it is not nil. -func (es *MultiError) Add(err error) { - if err == nil { - return - } - if merr, ok := err.(MultiError); ok { - *es = append(*es, merr...) - } else { - *es = append(*es, err) - } -} - -// Err returns the error list as an error or nil if it is empty. -func (es MultiError) Err() error { - if len(es) == 0 { - return nil - } - return es -} - func closeAll(cs []io.Closer) error { - var merr MultiError + var merr tsdb_errors.MultiError for _, c := range cs { merr.Add(c.Close()) diff --git a/db_test.go b/db_test.go index eda807f3f6..26325d5fed 100644 --- a/db_test.go +++ b/db_test.go @@ -437,7 +437,9 @@ func TestDB_Snapshot(t *testing.T) { snap, err := ioutil.TempDir("", "snap") testutil.Ok(t, err) - defer os.RemoveAll(snap) + defer func() { + testutil.Ok(t, os.RemoveAll(snap)) + }() testutil.Ok(t, db.Snapshot(snap, true)) testutil.Ok(t, db.Close()) @@ -504,7 +506,9 @@ Outer: snap, err := ioutil.TempDir("", "snap") testutil.Ok(t, err) - defer os.RemoveAll(snap) + defer func() { + testutil.Ok(t, os.RemoveAll(snap)) + }() testutil.Ok(t, db.Snapshot(snap, true)) testutil.Ok(t, db.Close()) @@ -800,7 +804,9 @@ func TestTombstoneClean(t *testing.T) { snap, err := ioutil.TempDir("", "snap") testutil.Ok(t, err) - defer os.RemoveAll(snap) + defer func() { + testutil.Ok(t, os.RemoveAll(snap)) + }() testutil.Ok(t, db.Snapshot(snap, true)) testutil.Ok(t, db.Close()) @@ -1323,10 +1329,13 @@ func TestInitializeHeadTimestamp(t *testing.T) { t.Run("clean", func(t *testing.T) { dir, err := ioutil.TempDir("", "test_head_init") testutil.Ok(t, err) - defer os.RemoveAll(dir) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() db, err := Open(dir, nil, nil, nil) testutil.Ok(t, err) + defer db.Close() // Should be set to init values if no WAL or blocks exist so far. testutil.Equals(t, int64(math.MaxInt64), db.head.MinTime()) @@ -1343,7 +1352,9 @@ func TestInitializeHeadTimestamp(t *testing.T) { t.Run("wal-only", func(t *testing.T) { dir, err := ioutil.TempDir("", "test_head_init") testutil.Ok(t, err) - defer os.RemoveAll(dir) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777)) w, err := wal.New(nil, nil, path.Join(dir, "wal")) @@ -1365,6 +1376,7 @@ func TestInitializeHeadTimestamp(t *testing.T) { db, err := Open(dir, nil, nil, nil) testutil.Ok(t, err) + defer db.Close() testutil.Equals(t, int64(5000), db.head.MinTime()) testutil.Equals(t, int64(15000), db.head.MaxTime()) @@ -1372,12 +1384,15 @@ func TestInitializeHeadTimestamp(t *testing.T) { t.Run("existing-block", func(t *testing.T) { dir, err := ioutil.TempDir("", "test_head_init") testutil.Ok(t, err) - defer os.RemoveAll(dir) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() createBlock(t, dir, genSeries(1, 1, 1000, 2000)) db, err := Open(dir, nil, nil, nil) testutil.Ok(t, err) + defer db.Close() testutil.Equals(t, int64(2000), db.head.MinTime()) testutil.Equals(t, int64(2000), db.head.MaxTime()) @@ -1385,7 +1400,9 @@ func TestInitializeHeadTimestamp(t *testing.T) { t.Run("existing-block-and-wal", func(t *testing.T) { dir, err := ioutil.TempDir("", "test_head_init") testutil.Ok(t, err) - defer os.RemoveAll(dir) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() createBlock(t, dir, genSeries(1, 1, 1000, 6000)) @@ -1411,6 +1428,7 @@ func TestInitializeHeadTimestamp(t *testing.T) { db, err := Open(dir, nil, r, nil) testutil.Ok(t, err) + defer db.Close() testutil.Equals(t, int64(6000), db.head.MinTime()) testutil.Equals(t, int64(15000), db.head.MaxTime()) diff --git a/errors/errors.go b/errors/errors.go new file mode 100644 index 0000000000..69d3662480 --- /dev/null +++ b/errors/errors.go @@ -0,0 +1,62 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package errors + +import ( + "bytes" + "fmt" +) + +// The MultiError type implements the error interface, and contains the +// Errors used to construct it. +type MultiError []error + +// Returns a concatenated string of the contained errors +func (es MultiError) Error() string { + var buf bytes.Buffer + + if len(es) > 1 { + fmt.Fprintf(&buf, "%d errors: ", len(es)) + } + + for i, err := range es { + if i != 0 { + buf.WriteString("; ") + } + buf.WriteString(err.Error()) + } + + return buf.String() +} + +// Add adds the error to the error list if it is not nil. +func (es *MultiError) Add(err error) { + if err == nil { + return + } + if merr, ok := err.(MultiError); ok { + *es = append(*es, merr...) + } else { + *es = append(*es, err) + } +} + +// Err returns the error list as an error or nil if it is empty. +func (es MultiError) Err() error { + if len(es) == 0 { + return nil + } + return es +} diff --git a/head_test.go b/head_test.go index 679e8c832a..7e8730b915 100644 --- a/head_test.go +++ b/head_test.go @@ -117,7 +117,9 @@ func TestHead_ReadWAL(t *testing.T) { } dir, err := ioutil.TempDir("", "test_read_wal") testutil.Ok(t, err) - defer os.RemoveAll(dir) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() w, err := wal.New(nil, nil, dir) testutil.Ok(t, err) @@ -281,7 +283,9 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) { } dir, err := ioutil.TempDir("", "test_delete_series") testutil.Ok(t, err) - defer os.RemoveAll(dir) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() w, err := wal.New(nil, nil, dir) testutil.Ok(t, err) @@ -337,13 +341,17 @@ Outer: for _, c := range cases { dir, err := ioutil.TempDir("", "test_wal_reload") testutil.Ok(t, err) - defer os.RemoveAll(dir) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() w, err := wal.New(nil, nil, path.Join(dir, "wal")) testutil.Ok(t, err) + defer w.Close() head, err := NewHead(nil, nil, w, 1000) testutil.Ok(t, err) + defer head.Close() app := head.Appender() for _, smpl := range smplsAll { @@ -361,8 +369,10 @@ Outer: // Compare the samples for both heads - before and after the reload. reloadedW, err := wal.New(nil, nil, w.Dir()) // Use a new wal to ensure deleted samples are gone even after a reload. testutil.Ok(t, err) + defer reloadedW.Close() reloadedHead, err := NewHead(nil, nil, reloadedW, 1000) testutil.Ok(t, err) + defer reloadedHead.Close() testutil.Ok(t, reloadedHead.Init(0)) for _, h := range []*Head{head, reloadedHead} { indexr, err := h.Index() @@ -543,7 +553,9 @@ func TestDelete_e2e(t *testing.T) { seriesMap[labels.New(l...).String()] = []tsdbutil.Sample{} } dir, _ := ioutil.TempDir("", "test") - defer os.RemoveAll(dir) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() hb, err := NewHead(nil, nil, nil, 100000) testutil.Ok(t, err) defer hb.Close() @@ -907,7 +919,9 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { func TestHead_LogRollback(t *testing.T) { dir, err := ioutil.TempDir("", "wal_rollback") testutil.Ok(t, err) - defer os.RemoveAll(dir) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() w, err := wal.New(nil, nil, dir) testutil.Ok(t, err) @@ -974,7 +988,9 @@ func TestWalRepair(t *testing.T) { t.Run(name, func(t *testing.T) { dir, err := ioutil.TempDir("", "wal_head_repair") testutil.Ok(t, err) - defer os.RemoveAll(dir) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() w, err := wal.New(nil, nil, dir) testutil.Ok(t, err) diff --git a/index/index_test.go b/index/index_test.go index 1395a34a50..bc75a8bbc1 100644 --- a/index/index_test.go +++ b/index/index_test.go @@ -150,7 +150,9 @@ func (m mockIndex) LabelIndices() ([][]string, error) { func TestIndexRW_Create_Open(t *testing.T) { dir, err := ioutil.TempDir("", "test_index_create") testutil.Ok(t, err) - defer os.RemoveAll(dir) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() fn := filepath.Join(dir, indexFilename) @@ -168,6 +170,7 @@ func TestIndexRW_Create_Open(t *testing.T) { testutil.Ok(t, err) _, err = f.WriteAt([]byte{0, 0}, 0) testutil.Ok(t, err) + f.Close() _, err = NewFileReader(dir) testutil.NotOk(t, err) @@ -176,7 +179,9 @@ func TestIndexRW_Create_Open(t *testing.T) { func TestIndexRW_Postings(t *testing.T) { dir, err := ioutil.TempDir("", "test_index_postings") testutil.Ok(t, err) - defer os.RemoveAll(dir) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() fn := filepath.Join(dir, indexFilename) @@ -236,7 +241,9 @@ func TestIndexRW_Postings(t *testing.T) { func TestPersistence_index_e2e(t *testing.T) { dir, err := ioutil.TempDir("", "test_persistence_e2e") testutil.Ok(t, err) - defer os.RemoveAll(dir) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() lbls, err := labels.ReadLabels(filepath.Join("..", "testdata", "20kseries.json"), 20000) testutil.Ok(t, err) diff --git a/querier.go b/querier.go index 3e8cd77ca5..ab86cd655b 100644 --- a/querier.go +++ b/querier.go @@ -21,6 +21,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/tsdb/chunkenc" "github.com/prometheus/tsdb/chunks" + tsdb_errors "github.com/prometheus/tsdb/errors" "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" ) @@ -134,7 +135,7 @@ func (q *querier) sel(qs []Querier, ms []labels.Matcher) (SeriesSet, error) { } func (q *querier) Close() error { - var merr MultiError + var merr tsdb_errors.MultiError for _, bq := range q.blocks { merr.Add(bq.Close()) @@ -251,7 +252,7 @@ func (q *blockQuerier) LabelValuesFor(string, labels.Label) ([]string, error) { } func (q *blockQuerier) Close() error { - var merr MultiError + var merr tsdb_errors.MultiError merr.Add(q.index.Close()) merr.Add(q.chunks.Close()) diff --git a/querier_test.go b/querier_test.go index ee426d0442..0ce114d9bf 100644 --- a/querier_test.go +++ b/querier_test.go @@ -1232,7 +1232,9 @@ func BenchmarkPersistedQueries(b *testing.B) { b.Run(fmt.Sprintf("series=%d,samplesPerSeries=%d", nSeries, nSamples), func(b *testing.B) { dir, err := ioutil.TempDir("", "bench_persisted") testutil.Ok(b, err) - defer os.RemoveAll(dir) + defer func() { + testutil.Ok(b, os.RemoveAll(dir)) + }() block, err := OpenBlock(nil, createBlock(b, dir, genSeries(nSeries, 10, 1, int64(nSamples))), nil) testutil.Ok(b, err) diff --git a/repair_test.go b/repair_test.go index cbe21691e6..19b663fea5 100644 --- a/repair_test.go +++ b/repair_test.go @@ -70,7 +70,9 @@ func TestRepairBadIndexVersion(t *testing.T) { // Touch chunks dir in block. os.MkdirAll(filepath.Join(dbDir, "chunks"), 0777) - defer os.RemoveAll(filepath.Join(dbDir, "chunks")) + defer func() { + testutil.Ok(t, os.RemoveAll(filepath.Join(dbDir, "chunks"))) + }() r, err := index.NewFileReader(filepath.Join(dbDir, indexFilename)) testutil.Ok(t, err) @@ -89,7 +91,9 @@ func TestRepairBadIndexVersion(t *testing.T) { if err = fileutil.CopyDirs(dbDir, tmpDbDir); err != nil { t.Fatal(err) } - defer os.RemoveAll(tmpDir) + defer func() { + testutil.Ok(t, os.RemoveAll(tmpDir)) + }() // On DB opening all blocks in the base dir should be repaired. db, err := Open(tmpDir, nil, nil, nil) testutil.Ok(t, err) @@ -97,6 +101,7 @@ func TestRepairBadIndexVersion(t *testing.T) { r, err = index.NewFileReader(filepath.Join(tmpDbDir, indexFilename)) testutil.Ok(t, err) + defer r.Close() p, err = r.Postings("b", "1") testutil.Ok(t, err) res := []labels.Labels{} diff --git a/tombstones_test.go b/tombstones_test.go index 95ef42f8fe..80d5ef2040 100644 --- a/tombstones_test.go +++ b/tombstones_test.go @@ -26,7 +26,9 @@ import ( func TestWriteAndReadbackTombStones(t *testing.T) { tmpdir, _ := ioutil.TempDir("", "test") - defer os.RemoveAll(tmpdir) + defer func() { + testutil.Ok(t, os.RemoveAll(tmpdir)) + }() ref := uint64(0) diff --git a/wal/reader_test.go b/wal/reader_test.go index d782ae804f..1178aa5ef8 100644 --- a/wal/reader_test.go +++ b/wal/reader_test.go @@ -30,6 +30,7 @@ import ( "time" "github.com/go-kit/kit/log" + tsdb_errors "github.com/prometheus/tsdb/errors" "github.com/prometheus/tsdb/testutil" ) @@ -268,21 +269,43 @@ func generateRandomEntries(w *WAL, records chan []byte) error { return w.Log(recs...) } -func allSegments(dir string) (io.Reader, error) { +type multiReadCloser struct { + reader io.Reader + closers []io.Closer +} + +func (m *multiReadCloser) Read(p []byte) (n int, err error) { + return m.reader.Read(p) +} +func (m *multiReadCloser) Close() error { + var merr tsdb_errors.MultiError + for _, closer := range m.closers { + merr.Add(closer.Close()) + } + return merr.Err() +} + +func allSegments(dir string) (io.ReadCloser, error) { seg, err := listSegments(dir) if err != nil { return nil, err } var readers []io.Reader + var closers []io.Closer for _, r := range seg { f, err := os.Open(filepath.Join(dir, r.name)) if err != nil { return nil, err } readers = append(readers, f) + closers = append(closers, f) } - return io.MultiReader(readers...), nil + + return &multiReadCloser{ + reader: io.MultiReader(readers...), + closers: closers, + }, nil } func TestReaderFuzz(t *testing.T) { @@ -290,7 +313,9 @@ func TestReaderFuzz(t *testing.T) { t.Run(name, func(t *testing.T) { dir, err := ioutil.TempDir("", "wal_fuzz_live") testutil.Ok(t, err) - defer os.RemoveAll(dir) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() w, err := NewSize(nil, nil, dir, 128*pageSize) testutil.Ok(t, err) @@ -306,6 +331,7 @@ func TestReaderFuzz(t *testing.T) { sr, err := allSegments(w.Dir()) testutil.Ok(t, err) + defer sr.Close() reader := fn(sr) for expected := range input { @@ -321,10 +347,13 @@ func TestReaderFuzz_Live(t *testing.T) { logger := testutil.NewLogger(t) dir, err := ioutil.TempDir("", "wal_fuzz_live") testutil.Ok(t, err) - defer os.RemoveAll(dir) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() w, err := NewSize(nil, nil, dir, 128*pageSize) testutil.Ok(t, err) + defer w.Close() // In the background, generate a stream of random records and write them // to the WAL. @@ -343,6 +372,7 @@ func TestReaderFuzz_Live(t *testing.T) { seg, err := OpenReadSegment(SegmentName(dir, m)) testutil.Ok(t, err) + defer seg.Close() r := NewLiveReader(logger, seg) segmentTicker := time.NewTicker(100 * time.Millisecond) @@ -379,6 +409,7 @@ outer: seg, err = OpenReadSegment(SegmentName(dir, seg.i+1)) testutil.Ok(t, err) + defer seg.Close() r = NewLiveReader(logger, seg) case <-readTicker.C: @@ -399,7 +430,9 @@ func TestLiveReaderCorrupt_ShortFile(t *testing.T) { logger := testutil.NewLogger(t) dir, err := ioutil.TempDir("", "wal_live_corrupt") testutil.Ok(t, err) - defer os.RemoveAll(dir) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() w, err := NewSize(nil, nil, dir, pageSize) testutil.Ok(t, err) @@ -429,6 +462,7 @@ func TestLiveReaderCorrupt_ShortFile(t *testing.T) { seg, err := OpenReadSegment(SegmentName(dir, m)) testutil.Ok(t, err) + defer seg.Close() r := NewLiveReader(logger, seg) testutil.Assert(t, r.Next() == false, "expected no records") @@ -440,7 +474,9 @@ func TestLiveReaderCorrupt_RecordTooLongAndShort(t *testing.T) { logger := testutil.NewLogger(t) dir, err := ioutil.TempDir("", "wal_live_corrupt") testutil.Ok(t, err) - defer os.RemoveAll(dir) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() w, err := NewSize(nil, nil, dir, pageSize*2) testutil.Ok(t, err) @@ -474,6 +510,7 @@ func TestLiveReaderCorrupt_RecordTooLongAndShort(t *testing.T) { seg, err := OpenReadSegment(SegmentName(dir, m)) testutil.Ok(t, err) + defer seg.Close() r := NewLiveReader(logger, seg) testutil.Assert(t, r.Next() == false, "expected no records") diff --git a/wal/wal_test.go b/wal/wal_test.go index 16d2775391..e7b893a36f 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -109,7 +109,9 @@ func TestWAL_Repair(t *testing.T) { t.Run(name, func(t *testing.T) { dir, err := ioutil.TempDir("", "wal_repair") testutil.Ok(t, err) - defer os.RemoveAll(dir) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() // We create 3 segments with 3 records each and // then corrupt a given record in a given segment. @@ -138,6 +140,7 @@ func TestWAL_Repair(t *testing.T) { w, err = NewSize(nil, nil, dir, segSize) testutil.Ok(t, err) + defer w.Close() sr, err := NewSegmentsReader(dir) testutil.Ok(t, err) @@ -151,6 +154,7 @@ func TestWAL_Repair(t *testing.T) { testutil.Ok(t, w.Repair(r.Err())) sr, err = NewSegmentsReader(dir) testutil.Ok(t, err) + defer sr.Close() r = NewReader(sr) var result [][]byte @@ -181,7 +185,9 @@ func TestWAL_Repair(t *testing.T) { func TestCorruptAndCarryOn(t *testing.T) { dir, err := ioutil.TempDir("", "wal_repair") testutil.Ok(t, err) - defer os.RemoveAll(dir) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() var ( logger = testutil.NewLogger(t) @@ -295,13 +301,16 @@ func TestCorruptAndCarryOn(t *testing.T) { testutil.Equals(t, 9, i, "wrong number of records") testutil.Assert(t, !reader.Next(), "unexpected record") testutil.Equals(t, nil, reader.Err()) + sr.Close() } } func BenchmarkWAL_LogBatched(b *testing.B) { dir, err := ioutil.TempDir("", "bench_logbatch") testutil.Ok(b, err) - defer os.RemoveAll(dir) + defer func() { + testutil.Ok(b, os.RemoveAll(dir)) + }() w, err := New(nil, nil, "testdir") testutil.Ok(b, err) @@ -329,7 +338,9 @@ func BenchmarkWAL_LogBatched(b *testing.B) { func BenchmarkWAL_Log(b *testing.B) { dir, err := ioutil.TempDir("", "bench_logsingle") testutil.Ok(b, err) - defer os.RemoveAll(dir) + defer func() { + testutil.Ok(b, os.RemoveAll(dir)) + }() w, err := New(nil, nil, "testdir") testutil.Ok(b, err) diff --git a/wal_test.go b/wal_test.go index fcda65b417..7b9f965269 100644 --- a/wal_test.go +++ b/wal_test.go @@ -36,7 +36,9 @@ import ( func TestSegmentWAL_cut(t *testing.T) { tmpdir, err := ioutil.TempDir("", "test_wal_cut") testutil.Ok(t, err) - defer os.RemoveAll(tmpdir) + defer func() { + testutil.Ok(t, os.RemoveAll(tmpdir)) + }() // This calls cut() implicitly the first time without a previous tail. w, err := OpenSegmentWAL(tmpdir, nil, 0, nil) @@ -84,7 +86,9 @@ func TestSegmentWAL_Truncate(t *testing.T) { dir, err := ioutil.TempDir("", "test_wal_log_truncate") testutil.Ok(t, err) - defer os.RemoveAll(dir) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() w, err := OpenSegmentWAL(dir, nil, 0, nil) testutil.Ok(t, err) @@ -163,7 +167,9 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { dir, err := ioutil.TempDir("", "test_wal_log_restore") testutil.Ok(t, err) - defer os.RemoveAll(dir) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() var ( recordedSeries [][]RefSeries @@ -270,7 +276,9 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { func TestWALRestoreCorrupted_invalidSegment(t *testing.T) { dir, err := ioutil.TempDir("", "test_wal_log_restore") testutil.Ok(t, err) - defer os.RemoveAll(dir) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() wal, err := OpenSegmentWAL(dir, nil, 0, nil) testutil.Ok(t, err) @@ -367,7 +375,9 @@ func TestWALRestoreCorrupted(t *testing.T) { // for the purpose of this test. dir, err := ioutil.TempDir("", "test_corrupted") testutil.Ok(t, err) - defer os.RemoveAll(dir) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() w, err := OpenSegmentWAL(dir, nil, 0, nil) testutil.Ok(t, err) @@ -442,7 +452,9 @@ func TestMigrateWAL_Empty(t *testing.T) { // which is valid in the new format. dir, err := ioutil.TempDir("", "walmigrate") testutil.Ok(t, err) - defer os.RemoveAll(dir) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() wdir := path.Join(dir, "wal") @@ -457,7 +469,9 @@ func TestMigrateWAL_Empty(t *testing.T) { func TestMigrateWAL_Fuzz(t *testing.T) { dir, err := ioutil.TempDir("", "walmigrate") testutil.Ok(t, err) - defer os.RemoveAll(dir) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() wdir := path.Join(dir, "wal")