Start a new WAL segement on head truncation. (#605)
This reduces disk space usage to not be a minimum of 3 128MB files in small setups. This will possibly also help debug wal data issues, by making things a bit more deterministic. Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
This commit is contained in:
parent
48c4dc63ea
commit
be4edbe174
6
head.go
6
head.go
|
@ -576,6 +576,12 @@ func (h *Head) Truncate(mint int64) (err error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "get segment range")
|
return errors.Wrap(err, "get segment range")
|
||||||
}
|
}
|
||||||
|
// Start a new segment, so low ingestion volume TSDB don't have more WAL than
|
||||||
|
// needed.
|
||||||
|
err = h.wal.NextSegment()
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "next segment")
|
||||||
|
}
|
||||||
last-- // Never consider last segment for checkpoint.
|
last-- // Never consider last segment for checkpoint.
|
||||||
if last < 0 {
|
if last < 0 {
|
||||||
return nil // no segments yet.
|
return nil // no segments yet.
|
||||||
|
|
37
head_test.go
37
head_test.go
|
@ -1128,3 +1128,40 @@ func TestWalRepair(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestNewWalSegmentOnTruncate(t *testing.T) {
|
||||||
|
dir, err := ioutil.TempDir("", "test_wal_segemnts")
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
defer func() {
|
||||||
|
testutil.Ok(t, os.RemoveAll(dir))
|
||||||
|
}()
|
||||||
|
wlog, err := wal.NewSize(nil, nil, dir, 32768)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
|
h, err := NewHead(nil, nil, wlog, 1000)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
defer h.Close()
|
||||||
|
add := func(ts int64) {
|
||||||
|
app := h.Appender()
|
||||||
|
_, err := app.Add(labels.Labels{{"a", "b"}}, ts, 0)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
testutil.Ok(t, app.Commit())
|
||||||
|
}
|
||||||
|
|
||||||
|
add(0)
|
||||||
|
_, last, err := wlog.Segments()
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
testutil.Equals(t, 0, last)
|
||||||
|
|
||||||
|
add(1)
|
||||||
|
testutil.Ok(t, h.Truncate(1))
|
||||||
|
_, last, err = wlog.Segments()
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
testutil.Equals(t, 1, last)
|
||||||
|
|
||||||
|
add(2)
|
||||||
|
testutil.Ok(t, h.Truncate(2))
|
||||||
|
_, last, err = wlog.Segments()
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
testutil.Equals(t, 2, last)
|
||||||
|
}
|
||||||
|
|
|
@ -393,6 +393,13 @@ func SegmentName(dir string, i int) string {
|
||||||
return filepath.Join(dir, fmt.Sprintf("%08d", i))
|
return filepath.Join(dir, fmt.Sprintf("%08d", i))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NextSegment creates the next segment and closes the previous one.
|
||||||
|
func (w *WAL) NextSegment() error {
|
||||||
|
w.mtx.Lock()
|
||||||
|
defer w.mtx.Unlock()
|
||||||
|
return w.nextSegment()
|
||||||
|
}
|
||||||
|
|
||||||
// nextSegment creates the next segment and closes the previous one.
|
// nextSegment creates the next segment and closes the previous one.
|
||||||
func (w *WAL) nextSegment() error {
|
func (w *WAL) nextSegment() error {
|
||||||
// Only flush the current page if it actually holds data.
|
// Only flush the current page if it actually holds data.
|
||||||
|
|
Loading…
Reference in New Issue