From 47ce7ad302c42fb1c68feac94e596c4941435469 Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Thu, 14 Mar 2013 18:09:19 -0700 Subject: [PATCH] Extract appending from goroutine. --- storage/metric/leveldb.go | 80 ++++++++++++++++++--------------------- 1 file changed, 37 insertions(+), 43 deletions(-) diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index 029bd493c..3a18200f8 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -540,7 +540,6 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err var ( fingerprintToSamples = groupByFingerprint(samples) indexErrChan = make(chan error) - doneCommitting sync.WaitGroup ) go func(groups map[model.Fingerprint]model.Samples) { @@ -555,55 +554,50 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err indexErrChan <- l.indexMetrics(metrics) }(fingerprintToSamples) - go func() { - doneCommitting.Add(1) - samplesBatch := leveldb.NewBatch() - defer samplesBatch.Close() - defer doneCommitting.Done() + samplesBatch := leveldb.NewBatch() + defer samplesBatch.Close() - for fingerprint, group := range fingerprintToSamples { - for { - lengthOfGroup := len(group) + for fingerprint, group := range fingerprintToSamples { + for { + lengthOfGroup := len(group) - if lengthOfGroup == 0 { - break - } - - take := *leveldbChunkSize - if lengthOfGroup < take { - take = lengthOfGroup - } - - chunk := group[0:take] - group = group[take:lengthOfGroup] - - key := &dto.SampleKey{ - Fingerprint: fingerprint.ToDTO(), - Timestamp: indexable.EncodeTime(chunk[0].Timestamp), - LastTimestamp: proto.Int64(chunk[take-1].Timestamp.Unix()), - SampleCount: proto.Uint32(uint32(take)), - } - - value := &dto.SampleValueSeries{} - for _, sample := range chunk { - value.Value = append(value.Value, &dto.SampleValueSeries_Value{ - Timestamp: proto.Int64(sample.Timestamp.Unix()), - Value: proto.Float32(float32(sample.Value)), - }) - } - - samplesBatch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value)) + if lengthOfGroup == 0 { + break } + + take := *leveldbChunkSize + if lengthOfGroup < take { + take = lengthOfGroup + } + + chunk := group[0:take] + group = group[take:lengthOfGroup] + + key := &dto.SampleKey{ + Fingerprint: fingerprint.ToDTO(), + Timestamp: indexable.EncodeTime(chunk[0].Timestamp), + LastTimestamp: proto.Int64(chunk[take-1].Timestamp.Unix()), + SampleCount: proto.Uint32(uint32(take)), + } + + value := &dto.SampleValueSeries{} + for _, sample := range chunk { + value.Value = append(value.Value, &dto.SampleValueSeries_Value{ + Timestamp: proto.Int64(sample.Timestamp.Unix()), + Value: proto.Float32(float32(sample.Value)), + }) + } + + samplesBatch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value)) } + } - err = l.metricSamples.Commit(samplesBatch) + err = l.metricSamples.Commit(samplesBatch) - if err != nil { - panic(err) - } - }() + if err != nil { + panic(err) + } - doneCommitting.Wait() err = <-indexErrChan if err != nil { panic(err)