Add a heuristic to checkpoint early if there are many "dirty" series.

Bjoern Rabenstein 2015-01-08 20:15:58 +01:00
parent 622e8350cd
commit 0851945054
2 changed files with 56 additions and 16 deletions
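
For orientation before the diff: the sketch below is a minimal, self-contained Go program (not the actual Prometheus code; the names, shortened interval, and tiny limit are illustrative only) showing the mechanism this commit introduces. Persisting a head chunk sends a best-effort signal on a buffered channel; the maintenance loop counts those signals and, once the count reaches the dirty-series limit, resets the checkpoint timer to zero so the next checkpoint runs immediately.

    // Illustrative sketch of the early-checkpoint heuristic; not Prometheus code.
    package main

    import (
        "fmt"
        "time"
    )

    const (
        checkpointInterval         = 2 * time.Second // Stands in for the 5m default.
        checkpointDirtySeriesLimit = 3               // Stands in for the 5000 default.
    )

    // countPersistedHeadChunks receives one best-effort signal per persisted head
    // chunk, i.e. per series that became "dirty" relative to the last checkpoint.
    var countPersistedHeadChunks = make(chan struct{}, 1024)

    // notePersistedHeadChunk is what the ingestion/maintenance paths would call
    // after queuing a head chunk for persistence. It must never block.
    func notePersistedHeadChunk() {
        select {
        case countPersistedHeadChunks <- struct{}{}: // Counted.
        default: // Channel full; a lost count only delays the early checkpoint.
        }
    }

    // maintenanceLoop checkpoints on a timer, but pulls the checkpoint forward
    // as soon as enough dirty series have accumulated.
    func maintenanceLoop(stop <-chan struct{}) {
        checkpointTimer := time.NewTimer(checkpointInterval)
        defer checkpointTimer.Stop()

        dirty := 0 // Head chunks persisted since the last checkpoint.
        for {
            select {
            case <-stop:
                return
            case <-checkpointTimer.C:
                fmt.Printf("checkpointing (%d dirty series)\n", dirty)
                dirty = 0
                checkpointTimer.Reset(checkpointInterval)
            case <-countPersistedHeadChunks:
                dirty++
                if dirty >= checkpointDirtySeriesLimit {
                    checkpointTimer.Reset(0) // Fire the checkpoint case immediately.
                }
            }
        }
    }

    func main() {
        stop := make(chan struct{})
        go maintenanceLoop(stop)
        for i := 0; i < 10; i++ {
            notePersistedHeadChunk()
            time.Sleep(100 * time.Millisecond)
        }
        close(stop)
        time.Sleep(50 * time.Millisecond) // Let the loop exit (illustration only).
    }

In the diff that follows, the two signal sites are appendSample and maintainSeries, and the counting and early-checkpoint logic lives in the storage's maintenance loop().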

@@ -59,7 +59,8 @@ var (
     storageRetentionPeriod = flag.Duration("storage.local.retention", 15*24*time.Hour, "How long to retain samples in the local storage.")
     checkpointInterval = flag.Duration("storage.local.checkpoint-interval", 5*time.Minute, "The period at which the in-memory index of time series is checkpointed.")
+    checkpointDirtySeriesLimit = flag.Int("storage.local.checkpoint-dirty-series-limit", 5000, "If approx. that many time series are in a state that would require a recovery operation after a crash, a checkpoint is triggered, even if the checkpoint interval hasn't passed yet. A recovery operation requires a disk seek. The default limit intends to keep the recovery time below 1min even on spinning disks. With SSD, recovery is much faster, so you might want to increase this value in that case to avoid overly frequent checkpoints.")
     storageDirty = flag.Bool("storage.local.dirty", false, "If set, the local storage layer will perform crash recovery even if the last shutdown appears to be clean.")
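
Regarding the default of 5000: assuming a typical spinning-disk seek time on the order of 10 ms, recovering 5000 dirty series costs roughly 5000 × 10 ms ≈ 50 s, which is what keeps recovery under the one-minute target mentioned in the help text; on SSDs the per-series cost is far lower, hence the suggestion to raise the limit there.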
@@ -119,7 +120,8 @@ func NewPrometheus() *prometheus {
         PersistenceStoragePath: *metricsStoragePath,
         PersistenceRetentionPeriod: *storageRetentionPeriod,
         CheckpointInterval: *checkpointInterval,
-        Dirty: *storageDirty,
+        CheckpointDirtySeriesLimit: *checkpointDirtySeriesLimit,
+        Dirty: *storageDirty,
     }
     memStorage, err := local.NewMemorySeriesStorage(o)
     if err != nil {

@@ -64,15 +64,18 @@ type memorySeriesStorage struct {
     fpLocker *fingerprintLocker
     fpToSeries *seriesMap
     loopStopping, loopStopped chan struct{}
     maxMemoryChunks int
     purgeAfter time.Duration
     checkpointInterval time.Duration
+    checkpointDirtySeriesLimit int
     persistQueue chan persistRequest
     persistStopped chan struct{}
     persistence *persistence
+    countPersistedHeadChunks chan struct{}
     evictList *list.List
     evictRequests chan evictRequest
     evictStopping, evictStopped chan struct{}
@@ -95,6 +98,7 @@ type MemorySeriesStorageOptions struct {
     PersistenceStoragePath string // Location of persistence files.
     PersistenceRetentionPeriod time.Duration // Chunks at least that old are purged.
     CheckpointInterval time.Duration // How often to checkpoint the series map and head chunks.
+    CheckpointDirtySeriesLimit int // How many dirty series will trigger an early checkpoint.
     Dirty bool // Force the storage to consider itself dirty on startup.
 }
@@ -123,16 +127,19 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
         fpLocker: newFingerprintLocker(256),
         fpToSeries: fpToSeries,
         loopStopping: make(chan struct{}),
         loopStopped: make(chan struct{}),
         maxMemoryChunks: o.MemoryChunks,
         purgeAfter: o.PersistenceRetentionPeriod,
         checkpointInterval: o.CheckpointInterval,
+        checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit,
         persistQueue: make(chan persistRequest, persistQueueCap),
         persistStopped: make(chan struct{}),
         persistence: p,
+        countPersistedHeadChunks: make(chan struct{}, 1024),
         evictList: list.New(),
         evictRequests: make(chan evictRequest, evictRequestsCap),
         evictStopping: make(chan struct{}),
@@ -363,11 +370,20 @@ func (s *memorySeriesStorage) appendSample(sample *clientmodel.Sample) {
         Timestamp: sample.Timestamp,
     })
     s.fpLocker.Unlock(fp)
+    if len(chunkDescsToPersist) == 0 {
+        return
+    }
     // Queue only outside of the locked area, processing the persistQueue
     // requires the same lock!
     for _, cd := range chunkDescsToPersist {
         s.persistQueue <- persistRequest{fp, cd}
     }
+    // Count that a head chunk was persisted, but only best effort, i.e. we
+    // don't want to block here.
+    select {
+    case s.countPersistedHeadChunks <- struct{}{}: // Counted.
+    default: // Meh...
+    }
 }
 func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m clientmodel.Metric) *memorySeries {
@@ -642,10 +658,19 @@ func (s *memorySeriesStorage) cycleThroughArchivedFingerprints() chan clientmode
 }
 func (s *memorySeriesStorage) loop() {
-    checkpointTicker := time.NewTicker(s.checkpointInterval)
+    checkpointTimer := time.NewTimer(s.checkpointInterval)
+    // We take the number of head chunks persisted since the last checkpoint
+    // as an approximation for the number of series that are "dirty",
+    // i.e. whose head chunk is different from the one in the most recent
+    // checkpoint or for which the fact that the head chunk has been
+    // persisted is not reflected in the most recent checkpoint. This count
+    // could overestimate the number of dirty series, but it's good enough
+    // as a heuristic.
+    headChunksPersistedSinceLastCheckpoint := 0
     defer func() {
-        checkpointTicker.Stop()
+        checkpointTimer.Stop()
         glog.Info("Maintenance loop stopped.")
         close(s.loopStopped)
     }()
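
A note on the design choice above: Go's time.Ticker had no Reset method at the time (it only gained one in Go 1.15), so a repeating ticker could not be rescheduled; a time.Timer, by contrast, can be Reset(0) to force an immediate checkpoint and then re-armed with the regular interval once the checkpoint has run. That is presumably why this commit swaps the checkpoint ticker for a timer, as the next hunk relies on exactly that behaviour.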
@@ -658,15 +683,21 @@ loop:
     select {
     case <-s.loopStopping:
         break loop
-    case <-checkpointTicker.C:
+    case <-checkpointTimer.C:
         s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker)
+        headChunksPersistedSinceLastCheckpoint = 0
+        checkpointTimer.Reset(s.checkpointInterval)
     case fp := <-memoryFingerprints:
         s.purgeSeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-1*s.purgeAfter))
+        s.maintainSeries(fp)
         s.seriesOps.WithLabelValues(memoryMaintenance).Inc()
     case fp := <-archivedFingerprints:
         s.purgeSeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-1*s.purgeAfter))
         s.seriesOps.WithLabelValues(archiveMaintenance).Inc()
+    case <-s.countPersistedHeadChunks:
+        headChunksPersistedSinceLastCheckpoint++
+        if headChunksPersistedSinceLastCheckpoint >= s.checkpointDirtySeriesLimit {
+            checkpointTimer.Reset(0)
+        }
     }
 }
 // Wait until both channels are closed.
@@ -677,7 +708,8 @@ loop:
 }
 // maintainSeries closes the head chunk if not touched in a while. It archives a
-// series if all chunks are evicted. It evicts chunkDescs if there are too many.
+// series if all chunks are evicted. It evicts chunkDescs if there are too
+// many.
 func (s *memorySeriesStorage) maintainSeries(fp clientmodel.Fingerprint) {
     var headChunkToPersist *chunkDesc
     s.fpLocker.Lock(fp)
@@ -687,6 +719,12 @@ func (s *memorySeriesStorage) maintainSeries(fp clientmodel.Fingerprint) {
         if headChunkToPersist != nil {
             s.persistQueue <- persistRequest{fp, headChunkToPersist}
         }
+        // Count that a head chunk was persisted, but only best effort, i.e. we
+        // don't want to block here.
+        select {
+        case s.countPersistedHeadChunks <- struct{}{}: // Counted.
+        default: // Meh...
+        }
     }()
     series, ok := s.fpToSeries.get(fp)