diff --git a/db.go b/db.go index d9c9506f4..32732f6a1 100644 --- a/db.go +++ b/db.go @@ -38,19 +38,24 @@ type Options struct { WALFlushInterval time.Duration } -// Appender allows committing batches of samples to a database. -// The data held by the appender is reset after Commit returndb. +// Appender allows appending a batch of data. It must be completed with a +// call to Commit or Rollback and must not be reused afterwards. type Appender interface { - // SetSeries registers a new known series label set with the appender - // and returns a reference number used to add samples to it over the - // life time of the Appender. + // SetSeries ensures that a series with the given label set exists and + // returns a unique reference number identifying it. Returned reference + // numbers are ephemeral and may be rejected in calls to Add() at any point. + // A new reference number can then be requested with another call to + // SetSeries. SetSeries(labels.Labels) (uint64, error) - // Add adds a sample pair for the referenced seriedb. + // Add adds a sample pair for the referenced serie. Add(ref uint64, t int64, v float64) error // Commit submits the collected samples and purges the batch. Commit() error + + // Rollback rolls back all modifications made in the appender so far. + Rollback() error } type hashedSample struct { @@ -382,6 +387,12 @@ func (a *dbAppender) Commit() error { return err } +func (a *dbAppender) Rollback() error { + err := a.head.Rollback() + a.db.mtx.RUnlock() + return err +} + func (db *DB) headForDir(dir string) (int, bool) { for i, b := range db.heads { if b.Dir() == dir { @@ -646,13 +657,21 @@ func (a *partitionedAppender) Add(ref uint64, t int64, v float64) error { func (a *partitionedAppender) Commit() error { var merr MultiError - // Spill buckets into partitiondb. for _, p := range a.partitions { merr.Add(p.Commit()) } return merr.Err() } +func (a *partitionedAppender) Rollback() error { + var merr MultiError + + for _, p := range a.partitions { + merr.Add(p.Rollback()) + } + return merr.Err() +} + // The MultiError type implements the error interface, and contains the // Errors used to construct it. type MultiError []error diff --git a/head.go b/head.go index ce89fca7b..42dda760f 100644 --- a/head.go +++ b/head.go @@ -266,6 +266,7 @@ func (a *headAppender) Commit() error { } func (a *headAppender) Rollback() error { + putHeadAppendBuffer(a.samples) a.mtx.RUnlock() return nil }