add head compaction test (#7338)
parent b5d61fb66c
commit ab6203b7c7
@@ -2858,3 +2858,95 @@ func TestChunkReader_ConcurrentReads(t *testing.T) {
 	}
 	testutil.Ok(t, r.Close())
 }
+
+// TestCompactHead ensures that the head compaction
+// creates a block that is ready for loading and
+// does not cause data loss.
+// This test:
+// * opens a storage;
+// * appends values;
+// * compacts the head; and
+// * queries the db to ensure the samples are present from the compacted head.
+func TestCompactHead(t *testing.T) {
+	dbDir, err := ioutil.TempDir("", "testFlush")
+	testutil.Ok(t, err)
+	defer func() { testutil.Ok(t, os.RemoveAll(dbDir)) }()
+
+	// Open a DB and append data to the WAL.
+	tsdbCfg := &Options{
+		RetentionDuration: int64(time.Hour * 24 * 15 / time.Millisecond),
+		NoLockfile:        true,
+		MinBlockDuration:  int64(time.Hour * 2 / time.Millisecond),
+		MaxBlockDuration:  int64(time.Hour * 2 / time.Millisecond),
+		WALCompression:    true,
+	}
+
+	db, err := Open(dbDir, log.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg)
+	testutil.Ok(t, err)
+	app := db.Appender()
+	var expSamples []sample
+	maxt := 100
+	for i := 0; i < maxt; i++ {
+		val := rand.Float64()
+		_, err := app.Add(labels.FromStrings("a", "b"), int64(i), val)
+		testutil.Ok(t, err)
+		expSamples = append(expSamples, sample{int64(i), val})
+	}
+	testutil.Ok(t, app.Commit())
+
+	// Compact the Head to create a new block.
+	testutil.Ok(t, db.CompactHead(NewRangeHead(db.Head(), 0, int64(maxt)-1)))
+	testutil.Ok(t, db.Close())
+
+	// Delete everything but the new block and
+	// reopen the db to query it to ensure it includes the head data.
+	testutil.Ok(t, deleteNonBlocks(db.Dir()))
+	db, err = Open(dbDir, log.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg)
+	testutil.Ok(t, err)
+	testutil.Equals(t, 1, len(db.Blocks()))
+	testutil.Equals(t, int64(maxt), db.Head().MinTime())
+	defer func() { testutil.Ok(t, db.Close()) }()
+	querier, err := db.Querier(context.Background(), 0, int64(maxt)-1)
+	testutil.Ok(t, err)
+	defer func() { testutil.Ok(t, querier.Close()) }()
+
+	seriesSet, _, err := querier.Select(false, nil, &labels.Matcher{Type: labels.MatchEqual, Name: "a", Value: "b"})
+	testutil.Ok(t, err)
+	var actSamples []sample
+
+	for seriesSet.Next() {
+		series := seriesSet.At().Iterator()
+		for series.Next() {
+			time, val := series.At()
+			actSamples = append(actSamples, sample{int64(time), val})
+		}
+		testutil.Ok(t, series.Err())
+	}
+	testutil.Equals(t, expSamples, actSamples)
+	testutil.Ok(t, seriesSet.Err())
+}
+
+func deleteNonBlocks(dbDir string) error {
+	dirs, err := ioutil.ReadDir(dbDir)
+	if err != nil {
+		return err
+	}
+	for _, dir := range dirs {
+		if ok := isBlockDir(dir); !ok {
+			if err := os.RemoveAll(filepath.Join(dbDir, dir.Name())); err != nil {
+				return err
+			}
+		}
+	}
+	dirs, err = ioutil.ReadDir(dbDir)
+	if err != nil {
+		return err
+	}
+	for _, dir := range dirs {
+		if ok := isBlockDir(dir); !ok {
+			return errors.Errorf("root folder:%v still has a non-block directory:%v", dbDir, dir.Name())
+		}
+	}
+
+	return nil
+}
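
The test exercises the new head-compaction path entirely through exported tsdb API: NewRangeHead wraps the in-memory Head for a time range, and DB.CompactHead persists that range as a block. As a minimal sketch (not part of the commit), the same calls could be driven from outside the package; flushHead is a hypothetical helper name, and only NewRangeHead, Head, MinTime and CompactHead are taken from the diff above.

	package sketch

	import "github.com/prometheus/prometheus/tsdb"

	// flushHead compacts everything currently in db's in-memory head, from its
	// minimum timestamp up to and including maxt, into a persisted block. This
	// mirrors the test's db.CompactHead(NewRangeHead(db.Head(), 0, int64(maxt)-1)).
	func flushHead(db *tsdb.DB, maxt int64) error {
		rh := tsdb.NewRangeHead(db.Head(), db.Head().MinTime(), maxt)
		return db.CompactHead(rh)
	}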