diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 5e43cbd1b..2c26cebf3 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -521,7 +521,6 @@ func TestDB_Snapshot(t *testing.T) { testutil.Ok(t, err) } testutil.Ok(t, app.Commit()) - testutil.Ok(t, app.Rollback()) // create snapshot snap, err := ioutil.TempDir("", "snap") @@ -571,7 +570,6 @@ func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) { testutil.Ok(t, err) } testutil.Ok(t, app.Commit()) - testutil.Ok(t, app.Rollback()) snap, err := ioutil.TempDir("", "snap") testutil.Ok(t, err) @@ -939,10 +937,14 @@ func TestWALSegmentSizeOptions(t *testing.T) { opts.WALSegmentSize = segmentSize db := openTestDB(t, opts, nil) - app := db.Appender() for i := int64(0); i < 155; i++ { - _, err := app.Add(labels.Labels{labels.Label{Name: "wal", Value: "size"}}, i, rand.Float64()) + app := db.Appender() + ref, err := app.Add(labels.Labels{labels.Label{Name: "wal" + fmt.Sprintf("%d", i), Value: "size"}}, i, rand.Float64()) testutil.Ok(t, err) + for j := int64(1); j <= 78; j++ { + err := app.AddFast(ref, i+j, rand.Float64()) + testutil.Ok(t, err) + } testutil.Ok(t, app.Commit()) } @@ -2325,6 +2327,7 @@ func TestBlockRanges(t *testing.T) { // Test that wal records are skipped when an existing block covers the same time ranges // and compaction doesn't create an overlapping block. + app = db.Appender() db.DisableCompactions() _, err = app.Add(lbl, secondBlockMaxt+1, rand.Float64()) testutil.Ok(t, err) diff --git a/tsdb/head.go b/tsdb/head.go index 1c547ffe6..066c722ea 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -44,6 +44,9 @@ var ( // ErrInvalidSample is returned if an appended sample is not valid and can't // be ingested. ErrInvalidSample = errors.New("invalid sample") + // ErrAppenderClosed is returned if an appender has already be successfully + // rolled back or commited. + ErrAppenderClosed = errors.New("appender closed") ) // Head handles reads and writes of time series data within a time window. @@ -1093,6 +1096,7 @@ type headAppender struct { sampleSeries []*memSeries appendID, cleanupAppendIDsBelow uint64 + closed bool } func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { @@ -1193,7 +1197,11 @@ func (a *headAppender) log() error { return nil } -func (a *headAppender) Commit() error { +func (a *headAppender) Commit() (err error) { + if a.closed { + return ErrAppenderClosed + } + defer func() { a.closed = true }() if err := a.log(); err != nil { //nolint: errcheck a.Rollback() // Most likely the same error will happen again. @@ -1231,7 +1239,11 @@ func (a *headAppender) Commit() error { return nil } -func (a *headAppender) Rollback() error { +func (a *headAppender) Rollback() (err error) { + if a.closed { + return ErrAppenderClosed + } + defer func() { a.closed = true }() defer a.head.metrics.activeAppenders.Dec() defer a.head.iso.closeAppend(a.appendID) defer a.head.putSeriesBuffer(a.sampleSeries) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 2432ae060..368ed8c8c 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -1852,3 +1852,38 @@ func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) { }) } } + +func TestErrReuseAppender(t *testing.T) { + head, _ := newTestHead(t, 1000, false) + defer func() { + testutil.Ok(t, head.Close()) + }() + + app := head.Appender() + _, err := app.Add(labels.Labels{{Name: "test", Value: "test"}}, 0, 0) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + testutil.NotOk(t, app.Commit()) + testutil.NotOk(t, app.Rollback()) + + app = head.Appender() + _, err = app.Add(labels.Labels{{Name: "test", Value: "test"}}, 1, 0) + testutil.Ok(t, err) + testutil.Ok(t, app.Rollback()) + testutil.NotOk(t, app.Rollback()) + testutil.NotOk(t, app.Commit()) + + app = head.Appender() + _, err = app.Add(labels.Labels{{Name: "test", Value: "test"}}, 2, 0) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + testutil.NotOk(t, app.Rollback()) + testutil.NotOk(t, app.Commit()) + + app = head.Appender() + _, err = app.Add(labels.Labels{{Name: "test", Value: "test"}}, 3, 0) + testutil.Ok(t, err) + testutil.Ok(t, app.Rollback()) + testutil.NotOk(t, app.Commit()) + testutil.NotOk(t, app.Rollback()) +}