Synchronous memory appends and more fine-grained storage locks.

This does two things:

1) Make TieredStorage.AppendSamples() write directly to memory instead of
   buffering to a channel first. This is needed in cases where a rule might
   immediately need the data generated by a previous rule.

2) Replace the single storage mutex by two new ones:
   - memoryMutex - needs to be locked at any time that two concurrent
                   goroutines could be accessing (via read or write) the
                   TieredStorage memoryArena.
   - memoryDeleteMutex - used to prevent any deletion of samples from
                         memoryArena as long as renderView is running and
                         assembling data from it.
   The LevelDB disk storage does not need to be protected by a mutex when
   rendering a view since renderView works off a LevelDB snapshot.

The rationale against adding memoryMutex directly to the memory storage: taking
a mutex does come with a small inherent time cost, and taking it is only
required in few places. In fact, no locking is required for the memory storage
instance which is part of a view (and not the TieredStorage).
This commit is contained in:
Julius Volz 2013-05-08 15:30:27 +02:00 committed by Julius Volz
parent 76521c3ff0
commit ce1ee444f1
5 changed files with 46 additions and 75 deletions

View File

@ -133,7 +133,7 @@ func main() {
log.Fatalf("Error loading configuration from %s: %v", *configFile, err)
}
ts, err := metric.NewTieredStorage(uint(*memoryAppendQueueCapacity), uint(*diskAppendQueueCapacity), 100, time.Second*30, time.Second*1, time.Second*20, *metricsStoragePath)
ts, err := metric.NewTieredStorage(uint(*diskAppendQueueCapacity), 100, time.Second*30, time.Second*1, time.Second*20, *metricsStoragePath)
if err != nil {
log.Fatalf("Error opening storage: %s", err)
}

View File

@ -33,7 +33,7 @@ type MetricPersistence interface {
// Record a new sample in the storage layer.
AppendSample(model.Sample) error
// Record a new sample in the storage layer.
// Record a group of new samples in the storage layer.
AppendSamples(model.Samples) error
// Get all of the metric fingerprints that are associated with the provided

View File

@ -26,10 +26,6 @@ var (
testInstant = time.Date(1972, 7, 18, 19, 5, 45, 0, usEastern).In(time.UTC)
)
const (
appendQueueSize = 100
)
func testAppendSample(p MetricPersistence, s model.Sample, t test.Tester) {
err := p.AppendSample(s)
if err != nil {
@ -90,7 +86,7 @@ func (t testTieredStorageCloser) Close() {
func NewTestTieredStorage(t test.Tester) (storage *TieredStorage, closer test.Closer) {
var directory test.TemporaryDirectory
directory = test.NewTemporaryDirectory("test_tiered_storage", t)
storage, err := NewTieredStorage(appendQueueSize, 2500, 1000, 5*time.Second, 15*time.Second, 0*time.Second, directory.Path())
storage, err := NewTieredStorage(2500, 1000, 5*time.Second, 15*time.Second, 0*time.Second, directory.Path())
if err != nil {
if storage != nil {

View File

@ -60,13 +60,17 @@ type TieredStorage struct {
DiskStorage *LevelDBMetricPersistence
appendToDiskQueue chan model.Samples
appendToMemoryQueue chan model.Samples
diskFrontier *diskFrontier
draining chan chan bool
flushMemoryInterval time.Duration
memoryArena memorySeriesStorage
memoryTTL time.Duration
mutex sync.Mutex
// This mutex manages any concurrent reads/writes of the memoryArena.
memoryMutex sync.RWMutex
// This mutex blocks only deletions from the memoryArena. It is held for a
// potentially long time for an entire renderView() duration, since we depend
// on no samples being removed from memory after grabbing a LevelDB snapshot.
memoryDeleteMutex sync.RWMutex
viewQueue chan viewJob
writeMemoryInterval time.Duration
}
@ -79,7 +83,7 @@ type viewJob struct {
err chan error
}
func NewTieredStorage(appendToMemoryQueueDepth, appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval, writeMemoryInterval, memoryTTL time.Duration, root string) (storage *TieredStorage, err error) {
func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval, writeMemoryInterval, memoryTTL time.Duration, root string) (storage *TieredStorage, err error) {
diskStorage, err := NewLevelDBMetricPersistence(root)
if err != nil {
return
@ -87,7 +91,6 @@ func NewTieredStorage(appendToMemoryQueueDepth, appendToDiskQueueDepth, viewQueu
storage = &TieredStorage{
appendToDiskQueue: make(chan model.Samples, appendToDiskQueueDepth),
appendToMemoryQueue: make(chan model.Samples, appendToMemoryQueueDepth),
DiskStorage: diskStorage,
draining: make(chan chan bool),
flushMemoryInterval: flushMemoryInterval,
@ -100,12 +103,14 @@ func NewTieredStorage(appendToMemoryQueueDepth, appendToDiskQueueDepth, viewQueu
}
// Enqueues Samples for storage.
func (t *TieredStorage) AppendSamples(s model.Samples) (err error) {
func (t *TieredStorage) AppendSamples(samples model.Samples) (err error) {
if len(t.draining) > 0 {
return fmt.Errorf("Storage is in the process of draining.")
}
t.appendToMemoryQueue <- s
t.memoryMutex.Lock()
t.memoryArena.AppendSamples(samples)
t.memoryMutex.Unlock()
return
}
@ -175,8 +180,6 @@ func (t *TieredStorage) rebuildDiskFrontier(i leveldb.Iterator) (err error) {
func (t *TieredStorage) Serve() {
flushMemoryTicker := time.NewTicker(t.flushMemoryInterval)
defer flushMemoryTicker.Stop()
writeMemoryTicker := time.NewTicker(t.writeMemoryInterval)
defer writeMemoryTicker.Stop()
reportTicker := time.NewTicker(time.Second)
defer reportTicker.Stop()
@ -188,14 +191,12 @@ func (t *TieredStorage) Serve() {
for {
select {
case <-writeMemoryTicker.C:
t.writeMemory()
case <-flushMemoryTicker.C:
t.flushMemory()
case viewRequest := <-t.viewQueue:
t.renderView(viewRequest)
case drainingDone := <-t.draining:
t.flush()
t.Flush()
drainingDone <- true
return
}
@ -206,33 +207,12 @@ func (t *TieredStorage) reportQueues() {
queueSizes.Set(map[string]string{"queue": "append_to_disk", "facet": "occupancy"}, float64(len(t.appendToDiskQueue)))
queueSizes.Set(map[string]string{"queue": "append_to_disk", "facet": "capacity"}, float64(cap(t.appendToDiskQueue)))
queueSizes.Set(map[string]string{"queue": "append_to_memory", "facet": "occupancy"}, float64(len(t.appendToMemoryQueue)))
queueSizes.Set(map[string]string{"queue": "append_to_memory", "facet": "capacity"}, float64(cap(t.appendToMemoryQueue)))
queueSizes.Set(map[string]string{"queue": "view_generation", "facet": "occupancy"}, float64(len(t.viewQueue)))
queueSizes.Set(map[string]string{"queue": "view_generation", "facet": "capacity"}, float64(cap(t.viewQueue)))
}
func (t *TieredStorage) writeMemory() {
begin := time.Now()
defer func() {
duration := time.Since(begin)
recordOutcome(duration, nil, map[string]string{operation: appendSample, result: success}, map[string]string{operation: writeMemory, result: failure})
}()
t.mutex.Lock()
defer t.mutex.Unlock()
pendingLength := len(t.appendToMemoryQueue)
for i := 0; i < pendingLength; i++ {
t.memoryArena.AppendSamples(<-t.appendToMemoryQueue)
}
}
func (t *TieredStorage) Flush() {
t.flush()
t.flushMemory()
}
func (t *TieredStorage) Close() {
@ -242,31 +222,23 @@ func (t *TieredStorage) Close() {
t.memoryArena.Close()
close(t.appendToDiskQueue)
close(t.appendToMemoryQueue)
close(t.viewQueue)
log.Println("Done.")
}
// Write all pending appends.
func (t *TieredStorage) flush() (err error) {
// Trim any old values to reduce iterative write costs.
t.flushMemory()
t.writeMemory()
t.flushMemory()
return
}
type memoryToDiskFlusher struct {
toDiskQueue chan model.Samples
disk MetricPersistence
olderThan time.Time
valuesAccepted int
valuesRejected int
toDiskQueue chan model.Samples
disk MetricPersistence
olderThan time.Time
valuesAccepted int
valuesRejected int
memoryDeleteMutex *sync.RWMutex
}
type memoryToDiskFlusherVisitor struct {
stream stream
flusher *memoryToDiskFlusher
stream stream
flusher *memoryToDiskFlusher
memoryDeleteMutex *sync.RWMutex
}
func (f memoryToDiskFlusherVisitor) DecodeKey(in interface{}) (out interface{}, err error) {
@ -311,15 +283,18 @@ func (f memoryToDiskFlusherVisitor) Operate(key, value interface{}) (err *storag
},
}
f.memoryDeleteMutex.Lock()
f.stream.values.Delete(skipListTime(recordTime))
f.memoryDeleteMutex.Unlock()
return
}
func (f *memoryToDiskFlusher) ForStream(stream stream) (decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) {
visitor := memoryToDiskFlusherVisitor{
stream: stream,
flusher: f,
stream: stream,
flusher: f,
memoryDeleteMutex: f.memoryDeleteMutex,
}
return visitor, visitor, visitor
@ -338,7 +313,7 @@ func (f memoryToDiskFlusher) Close() {
f.Flush()
}
// Persist a whole bunch of samples to the datastore.
// Persist a whole bunch of samples from memory to the datastore.
func (t *TieredStorage) flushMemory() {
begin := time.Now()
defer func() {
@ -347,13 +322,14 @@ func (t *TieredStorage) flushMemory() {
recordOutcome(duration, nil, map[string]string{operation: appendSample, result: success}, map[string]string{operation: flushMemory, result: failure})
}()
t.mutex.Lock()
defer t.mutex.Unlock()
t.memoryMutex.RLock()
defer t.memoryMutex.RUnlock()
flusher := &memoryToDiskFlusher{
disk: t.DiskStorage,
olderThan: time.Now().Add(-1 * t.memoryTTL),
toDiskQueue: t.appendToDiskQueue,
disk: t.DiskStorage,
olderThan: time.Now().Add(-1 * t.memoryTTL),
toDiskQueue: t.appendToDiskQueue,
memoryDeleteMutex: &t.memoryDeleteMutex,
}
defer flusher.Close()
@ -372,15 +348,14 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
recordOutcome(duration, err, map[string]string{operation: renderView, result: success}, map[string]string{operation: renderView, result: failure})
}()
t.mutex.Lock()
defer t.mutex.Unlock()
// No samples may be deleted from memory while rendering a view.
t.memoryDeleteMutex.RLock()
defer t.memoryDeleteMutex.RUnlock()
var (
scans = viewJob.builder.ScanJobs()
view = newView()
// Get a single iterator that will be used for all data extraction below.
iterator = t.DiskStorage.MetricSamples.NewIterator(true)
)
scans := viewJob.builder.ScanJobs()
view := newView()
// Get a single iterator that will be used for all data extraction below.
iterator := t.DiskStorage.MetricSamples.NewIterator(true)
defer iterator.Close()
// Rebuilding of the frontier should happen on a conditional basis if a
@ -410,7 +385,9 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
targetTime := *standingOps[0].CurrentTime()
currentChunk := chunk{}
t.memoryMutex.RLock()
memValues := t.memoryArena.GetValueAtTime(scanJob.fingerprint, targetTime)
t.memoryMutex.RUnlock()
// If we aimed before the oldest value in memory, load more data from disk.
if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && seriesFrontier != nil {
// XXX: For earnest performance gains analagous to the benchmarking we

View File

@ -347,8 +347,6 @@ func testMakeView(t test.Tester, flushToDisk bool) {
if flushToDisk {
tiered.Flush()
} else {
tiered.writeMemory()
}
requestBuilder := NewViewRequestBuilder()