Merge pull request #95 from Gouthamve/wal-ahead
Fix race condition for 2 appenders having same ts
This commit is contained in:
commit
9963a4c7c3
23
head.go
23
head.go
|
@ -521,15 +521,17 @@ func (a *headAppender) AddFast(ref string, t int64, v float64) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (a *headAppender) createSeries() {
|
||||
func (a *headAppender) createSeries() error {
|
||||
if len(a.newSeries) == 0 {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
a.newLabels = make([]labels.Labels, 0, len(a.newSeries))
|
||||
base0 := len(a.series)
|
||||
|
||||
a.mtx.RUnlock()
|
||||
defer a.mtx.RLock()
|
||||
a.mtx.Lock()
|
||||
defer a.mtx.Unlock()
|
||||
|
||||
base1 := len(a.series)
|
||||
|
||||
|
@ -549,8 +551,12 @@ func (a *headAppender) createSeries() {
|
|||
a.create(l.hash, l.labels)
|
||||
}
|
||||
|
||||
a.mtx.Unlock()
|
||||
a.mtx.RLock()
|
||||
// Write all new series to the WAL.
|
||||
if err := a.wal.LogSeries(a.newLabels); err != nil {
|
||||
return errors.Wrap(err, "WAL log series")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *headAppender) Commit() error {
|
||||
|
@ -558,7 +564,9 @@ func (a *headAppender) Commit() error {
|
|||
defer putHeadAppendBuffer(a.samples)
|
||||
defer a.mtx.RUnlock()
|
||||
|
||||
a.createSeries()
|
||||
if err := a.createSeries(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// We have to update the refs of samples for series we just created.
|
||||
for i := range a.samples {
|
||||
|
@ -568,11 +576,8 @@ func (a *headAppender) Commit() error {
|
|||
}
|
||||
}
|
||||
|
||||
// Write all new series and samples to the WAL and add it to the
|
||||
// Write all new samples to the WAL and add them to the
|
||||
// in-mem database on success.
|
||||
if err := a.wal.LogSeries(a.newLabels); err != nil {
|
||||
return errors.Wrap(err, "WAL log series")
|
||||
}
|
||||
if err := a.wal.LogSamples(a.samples); err != nil {
|
||||
return errors.Wrap(err, "WAL log samples")
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue