Adaptively reduce the wait time for memory series maintenance.

This makes in-memory series maintenance faster the more chunks
are waiting for persistence.
beorn7 2015-04-01 17:52:03 +02:00
parent b456240c46
commit 3035b8bfdd


@@ -32,9 +32,10 @@ const (
 	chunkLen = 1024
 
 	// See waitForNextFP.
-	fpMaxWaitDuration = 10 * time.Second
 	fpMaxSweepTime    = 6 * time.Hour
+	fpMaxWaitDuration = 10 * time.Second
 
+	// See waitForNextFP.
 	maxEvictInterval = time.Minute
 
 	// If numChunksToPersist is this percentage of maxChunksToPersist, we
@@ -532,21 +533,31 @@ func (s *memorySeriesStorage) maybeEvict() {
 // another fingerprint so that we will process all fingerprints in a tenth of
 // s.dropAfter assuming that the system is doing nothing else, e.g. if we want
 // to drop chunks after 40h, we want to cycle through all fingerprints within
-// 4h. However, the maximum sweep time is capped at fpMaxSweepTime. If
-// s.loopStopped is closed, it will return false immediately. The estimation is
-// based on the total number of fingerprints as passed in.
-func (s *memorySeriesStorage) waitForNextFP(numberOfFPs int) bool {
+// 4h. The estimation is based on the total number of fingerprints as passed
+// in. However, the maximum sweep time is capped at fpMaxSweepTime. Also, the
+// method will never wait for longer than fpMaxWaitDuration.
+//
+// The maxWaitDurationFactor can be used to reduce the waiting time if faster
+// processing is required (for example because unpersisted chunks pile up too
+// much).
+//
+// Normally, the method returns true once the wait duration has passed. However,
+// if s.loopStopping is closed, it will return false immediately.
+func (s *memorySeriesStorage) waitForNextFP(numberOfFPs int, maxWaitDurationFactor float64) bool {
 	d := fpMaxWaitDuration
 	if numberOfFPs != 0 {
 		sweepTime := s.dropAfter / 10
 		if sweepTime > fpMaxSweepTime {
 			sweepTime = fpMaxSweepTime
 		}
-		d = sweepTime / time.Duration(numberOfFPs)
-		if d > fpMaxWaitDuration {
-			d = fpMaxWaitDuration
+		calculatedWait := time.Duration(float64(sweepTime) / float64(numberOfFPs) * maxWaitDurationFactor)
+		if calculatedWait < d {
+			d = calculatedWait
 		}
 	}
+	if d == 0 {
+		return true
+	}
 	t := time.NewTimer(d)
 	select {
 	case <-t.C:
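
For concreteness, here is a standalone sketch of the new wait computation (the function body mirrors the diff above, minus the timer and shutdown handling; the retention time and series count in main are made-up example values, not taken from this commit):

package main

import (
	"fmt"
	"time"
)

// Constants as in the const block above.
const (
	fpMaxSweepTime    = 6 * time.Hour
	fpMaxWaitDuration = 10 * time.Second
)

// nextFPWait mirrors the duration computation in waitForNextFP above.
func nextFPWait(dropAfter time.Duration, numberOfFPs int, maxWaitDurationFactor float64) time.Duration {
	d := fpMaxWaitDuration
	if numberOfFPs != 0 {
		sweepTime := dropAfter / 10
		if sweepTime > fpMaxSweepTime {
			sweepTime = fpMaxSweepTime
		}
		calculatedWait := time.Duration(float64(sweepTime) / float64(numberOfFPs) * maxWaitDurationFactor)
		if calculatedWait < d {
			d = calculatedWait
		}
	}
	return d
}

func main() {
	// 40h retention gives a 4h sweep target. With 100,000 in-memory series
	// the undiminished wait is 4h/100,000 = 144ms per fingerprint; a backlog
	// factor of 0.25 cuts that to 36ms, and factor 0 does not wait at all.
	for _, factor := range []float64{1, 0.25, 0} {
		fmt.Printf("factor=%.2f wait=%v\n", factor, nextFPWait(40*time.Hour, 100000, factor))
	}
}

The new if d == 0 fast path in the diff exists for exactly that last case: once the factor has driven the wait down to zero, the method returns true without setting up a timer at all.
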
@@ -575,7 +586,7 @@ func (s *memorySeriesStorage) cycleThroughMemoryFingerprints() chan clientmodel.
 
 		for {
 			// Initial wait, also important if there are no FPs yet.
-			if !s.waitForNextFP(s.fpToSeries.length()) {
+			if !s.waitForNextFP(s.fpToSeries.length(), 1) {
 				return
 			}
 			begin := time.Now()
@@ -587,7 +598,8 @@ func (s *memorySeriesStorage) cycleThroughMemoryFingerprints() chan clientmodel.
 				case <-s.loopStopping:
 					return
 				}
-				s.waitForNextFP(s.fpToSeries.length())
+				// Reduce the wait time by the backlog score.
+				s.waitForNextFP(s.fpToSeries.length(), s.persistenceBacklogScore())
 				count++
 			}
 			if count > 0 {
@ -616,11 +628,11 @@ func (s *memorySeriesStorage) cycleThroughArchivedFingerprints() chan clientmode
)
if err != nil {
glog.Error("Failed to lookup archived fingerprint ranges: ", err)
s.waitForNextFP(0)
s.waitForNextFP(0, 1)
continue
}
// Initial wait, also important if there are no FPs yet.
if !s.waitForNextFP(len(archivedFPs)) {
if !s.waitForNextFP(len(archivedFPs), 1) {
return
}
begin := time.Now()
@@ -630,7 +642,8 @@ func (s *memorySeriesStorage) cycleThroughArchivedFingerprints() chan clientmode
 			case <-s.loopStopping:
 				return
 			}
-			s.waitForNextFP(len(archivedFPs))
+			// Never speed up maintenance of archived FPs.
+			s.waitForNextFP(len(archivedFPs), 1)
 		}
 		if len(archivedFPs) > 0 {
 			glog.Infof(
@@ -945,7 +958,7 @@ func (s *memorySeriesStorage) isDegraded() bool {
 		glog.Warning("Storage has left graceful degradation mode. Things are back to normal.")
 	} else if !s.degraded && nowDegraded {
 		glog.Warningf(
-			"%d chunks waiting for persistence (%d%% of the allowed maximum %d). Storage is now in graceful degradation mode. Series files are not synced anymore if following the adaptive strategy. Checkpoints are not performed more often than every %v.",
+			"%d chunks waiting for persistence (%d%% of the allowed maximum %d). Storage is now in graceful degradation mode. Series files are not synced anymore if following the adaptive strategy. Checkpoints are not performed more often than every %v. Series maintenance happens as frequently as possible.",
 			s.getNumChunksToPersist(),
 			s.getNumChunksToPersist()*100/s.maxChunksToPersist,
 			s.maxChunksToPersist,
@@ -955,6 +968,18 @@ func (s *memorySeriesStorage) isDegraded() bool {
 	return s.degraded
 }
 
+// persistenceBacklogScore works similarly to isDegraded, but returns a score
+// about how close we are to degradation. This score is 1.0 if no chunks are
+// waiting for persistence and 0.0 if we are at or above the degradation
+// threshold.
+func (s *memorySeriesStorage) persistenceBacklogScore() float64 {
+	score := 1 - float64(s.getNumChunksToPersist())/float64(s.maxChunksToPersist*percentChunksToPersistForDegradation/100)
+	if score < 0 {
+		return 0
+	}
+	return score
+}
+
 // Describe implements prometheus.Collector.
 func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
 	s.persistence.Describe(ch)
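
To get a feel for the score that feeds into the wait factor, here is a standalone sketch of the same formula (the chunk limit and the 80% degradation threshold are assumed example values, not taken from this diff; in Prometheus the limit is configurable):

package main

import "fmt"

const (
	maxChunksToPersist                   = 1024 * 1024 // example value
	percentChunksToPersistForDegradation = 80          // example value
)

// backlogScore mirrors persistenceBacklogScore above: 1.0 with an empty
// persistence backlog, falling linearly to 0.0 at the degradation threshold.
func backlogScore(numChunksToPersist int) float64 {
	score := 1 - float64(numChunksToPersist)/float64(maxChunksToPersist*percentChunksToPersistForDegradation/100)
	if score < 0 {
		return 0
	}
	return score
}

func main() {
	threshold := maxChunksToPersist * percentChunksToPersistForDegradation / 100
	for _, n := range []int{0, threshold / 2, threshold, maxChunksToPersist} {
		fmt.Printf("chunks waiting=%7d score=%.2f\n", n, backlogScore(n))
	}
}

With these example numbers the scores come out as 1.00, 0.50, 0.00, 0.00: at or beyond the threshold the memory-series maintenance loop stops waiting between fingerprints entirely, which is what the extended log message above announces with "Series maintenance happens as frequently as possible."
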