Replay WBL even if OOO Time Window is 0 (#296)
* Replay WBL even if OOO Time Window is 0 Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Apply feedback Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
parent
21f2680b45
commit
ff0dc75758
|
@ -787,7 +787,12 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if opts.OutOfOrderTimeWindow > 0 {
|
// Check if there is a WBL on disk, in which case we should replay that data.
|
||||||
|
wblSize, err := fileutil.DirSize(wblDir)
|
||||||
|
if err != nil && !os.IsNotExist(err) {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if opts.OutOfOrderTimeWindow > 0 || wblSize > 0 {
|
||||||
wblog, err = wal.NewSize(l, r, wblDir, segmentSize, opts.WALCompression)
|
wblog, err = wal.NewSize(l, r, wblDir, segmentSize, opts.WALCompression)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
@ -5291,3 +5291,64 @@ func TestNoGapAfterRestartWithOOO(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWblReplayAfterOOODisableAndRestart(t *testing.T) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
|
||||||
|
opts := DefaultOptions()
|
||||||
|
opts.OutOfOrderTimeWindow = 60 * time.Minute.Milliseconds()
|
||||||
|
opts.AllowOverlappingQueries = true
|
||||||
|
opts.AllowOverlappingCompaction = true
|
||||||
|
|
||||||
|
db, err := Open(dir, nil, nil, opts, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
db.DisableCompactions()
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, db.Close())
|
||||||
|
})
|
||||||
|
|
||||||
|
series1 := labels.FromStrings("foo", "bar1")
|
||||||
|
var allSamples []tsdbutil.Sample
|
||||||
|
addSamples := func(fromMins, toMins int64) {
|
||||||
|
app := db.Appender(context.Background())
|
||||||
|
for min := fromMins; min <= toMins; min++ {
|
||||||
|
ts := min * time.Minute.Milliseconds()
|
||||||
|
_, err := app.Append(0, series1, ts, float64(ts))
|
||||||
|
require.NoError(t, err)
|
||||||
|
allSamples = append(allSamples, sample{t: ts, v: float64(ts)})
|
||||||
|
}
|
||||||
|
require.NoError(t, app.Commit())
|
||||||
|
}
|
||||||
|
|
||||||
|
// In-order samples.
|
||||||
|
addSamples(290, 300)
|
||||||
|
// OOO samples.
|
||||||
|
addSamples(250, 260)
|
||||||
|
|
||||||
|
verifySamples := func(expSamples []tsdbutil.Sample) {
|
||||||
|
sort.Slice(expSamples, func(i, j int) bool {
|
||||||
|
return expSamples[i].T() < expSamples[j].T()
|
||||||
|
})
|
||||||
|
|
||||||
|
expRes := map[string][]tsdbutil.Sample{
|
||||||
|
series1.String(): expSamples,
|
||||||
|
}
|
||||||
|
|
||||||
|
q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
|
||||||
|
require.Equal(t, expRes, actRes)
|
||||||
|
}
|
||||||
|
|
||||||
|
verifySamples(allSamples)
|
||||||
|
|
||||||
|
// Restart DB with OOO disabled.
|
||||||
|
require.NoError(t, db.Close())
|
||||||
|
opts.OutOfOrderTimeWindow = 0
|
||||||
|
db, err = Open(db.dir, nil, nil, opts, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// We can still query OOO samples when OOO is disabled.
|
||||||
|
verifySamples(allSamples)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue