Make the persist queue length configurable.

Also, set a much higher default value.

Chunk persist requests can be quite spiky. If you collect a large
number of time series that are very similar, they will tend to finish
up a chunk at about the same time. There is no reason we need to back
up scraping just because of that. The rationale for the new default
value is "1/8 of the chunks in memory".
beorn7 2015-02-06 14:54:53 +01:00
parent 198ac9538b
commit d2ab49c396
2 changed files with 19 additions and 22 deletions

main.go

@@ -47,7 +47,7 @@ var (
 	alertmanagerURL = flag.String("alertmanager.url", "", "The URL of the alert manager to send notifications to.")
 	notificationQueueCapacity = flag.Int("alertmanager.notification-queue-capacity", 100, "The capacity of the queue for pending alert manager notifications.")
-	metricsStoragePath = flag.String("storage.local.path", "/tmp/metrics", "Base path for metrics storage.")
+	persistenceStoragePath = flag.String("storage.local.path", "/tmp/metrics", "Base path for metrics storage.")
 	remoteTSDBUrl = flag.String("storage.remote.url", "", "The URL of the OpenTSDB instance to send samples to.")
 	remoteTSDBTimeout = flag.Duration("storage.remote.timeout", 30*time.Second, "The timeout to use when sending samples to OpenTSDB.")
@@ -56,7 +56,8 @@ var (
 	numMemoryChunks = flag.Int("storage.local.memory-chunks", 1024*1024, "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.")
-	storageRetentionPeriod = flag.Duration("storage.local.retention", 15*24*time.Hour, "How long to retain samples in the local storage.")
+	persistenceRetentionPeriod = flag.Duration("storage.local.retention", 15*24*time.Hour, "How long to retain samples in the local storage.")
+	persistenceQueueCapacity = flag.Int("storage.local.persistence-queue-capacity", 128*1024, "How many chunks can be waiting for being persisted before sample ingestion will stop.")
 	checkpointInterval = flag.Duration("storage.local.checkpoint-interval", 5*time.Minute, "The period at which the in-memory index of time series is checkpointed.")
 	checkpointDirtySeriesLimit = flag.Int("storage.local.checkpoint-dirty-series-limit", 5000, "If approx. that many time series are in a state that would require a recovery operation after a crash, a checkpoint is triggered, even if the checkpoint interval hasn't passed yet. A recovery operation requires a disk seek. The default limit intends to keep the recovery time below 1min even on spinning disks. With SSD, recovery is much faster, so you might want to increase this value in that case to avoid overly frequent checkpoints.")
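
The new default of 128*1024 is one eighth of the default -storage.local.memory-chunks value of 1024*1024, which is where the "1/8 of the chunks in memory" rationale comes from; an operator raising the memory-chunks flag would presumably scale the queue capacity along with it. A minimal sketch of that arithmetic (hypothetical helper, not part of this commit):

package main

import "fmt"

// persistenceQueueCapacityFor keeps the default ratio of one persist-queue
// slot per eight in-memory chunks. Illustration only.
func persistenceQueueCapacityFor(memoryChunks int) int {
	return memoryChunks / 8
}

func main() {
	fmt.Println(persistenceQueueCapacityFor(1024 * 1024)) // 131072, i.e. 128*1024
}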
@@ -116,8 +117,9 @@ func NewPrometheus() *prometheus {
 	o := &local.MemorySeriesStorageOptions{
 		MemoryChunks: *numMemoryChunks,
-		PersistenceStoragePath: *metricsStoragePath,
-		PersistenceRetentionPeriod: *storageRetentionPeriod,
+		PersistenceStoragePath: *persistenceStoragePath,
+		PersistenceRetentionPeriod: *persistenceRetentionPeriod,
+		PersistenceQueueCapacity: *persistenceQueueCapacity,
 		CheckpointInterval: *checkpointInterval,
 		CheckpointDirtySeriesLimit: *checkpointDirtySeriesLimit,
 		Dirty: *storageDirty,

storage/local/storage.go

@@ -29,7 +29,6 @@ import (
 )
 const (
-	persistQueueCap = 1024
 	evictRequestsCap = 1024
 	chunkLen = 1024
@@ -82,6 +81,7 @@ type memorySeriesStorage struct {
 	persistLatency prometheus.Summary
 	persistErrors prometheus.Counter
+	persistQueueCapacity prometheus.Metric
 	persistQueueLength prometheus.Gauge
 	numSeries prometheus.Gauge
 	seriesOps *prometheus.CounterVec
@@ -97,6 +97,7 @@ type MemorySeriesStorageOptions struct {
 	MemoryChunks int // How many chunks to keep in memory.
 	PersistenceStoragePath string // Location of persistence files.
 	PersistenceRetentionPeriod time.Duration // Chunks at least that old are purged.
+	PersistenceQueueCapacity int // Capacity of queue for chunks to be persisted.
 	CheckpointInterval time.Duration // How often to checkpoint the series map and head chunks.
 	CheckpointDirtySeriesLimit int // How many dirty series will trigger an early checkpoint.
 	Dirty bool // Force the storage to consider itself dirty on startup.
@@ -134,7 +135,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
 		checkpointInterval: o.CheckpointInterval,
 		checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit,
-		persistQueue: make(chan persistRequest, persistQueueCap),
+		persistQueue: make(chan persistRequest, o.PersistenceQueueCapacity),
 		persistStopped: make(chan struct{}),
 		persistence: p,
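
The buffered channel above is what turns the flag into backpressure: once PersistenceQueueCapacity chunks are waiting, the next send blocks and sample ingestion stalls until the persist loop catches up, which is exactly what the flag's help text describes. A self-contained sketch of that behaviour (hypothetical types, not the storage code itself):

package main

import (
	"fmt"
	"time"
)

// persistRequest stands in for a chunk waiting to be written to disk.
type persistRequest struct{ id int }

func main() {
	// A capacity of 2 plays the role of -storage.local.persistence-queue-capacity.
	queue := make(chan persistRequest, 2)

	// A deliberately slow consumer, standing in for the persist loop.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for req := range queue {
			time.Sleep(100 * time.Millisecond)
			fmt.Println("persisted", req.id)
		}
	}()

	// The producer, standing in for ingestion: it blocks as soon as the
	// buffer is full and only proceeds when the consumer drains an item.
	for i := 0; i < 5; i++ {
		start := time.Now()
		queue <- persistRequest{id: i}
		fmt.Printf("enqueued %d after %v\n", i, time.Since(start))
	}
	close(queue)
	<-done
}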
@@ -157,6 +158,14 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
 			Name: "persist_errors_total",
 			Help: "The total number of errors while persisting chunks.",
 		}),
+		persistQueueCapacity: prometheus.MustNewConstMetric(
+			prometheus.NewDesc(
+				prometheus.BuildFQName(namespace, subsystem, "persist_queue_capacity"),
+				"The total capacity of the persist queue.",
+				nil, nil,
+			),
+			prometheus.GaugeValue, float64(o.PersistenceQueueCapacity),
+		),
 		persistQueueLength: prometheus.NewGauge(prometheus.GaugeOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,
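
prometheus.MustNewConstMetric builds a metric whose value is frozen at construction time, so the configured capacity can be computed once from the options and simply re-sent on every scrape; that is why the package-level gauge further down can be dropped. A sketch of the same client_golang API in isolation (assumed metric name, not the server's own code):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

func main() {
	capacity := 128 * 1024 // would come from MemorySeriesStorageOptions

	desc := prometheus.NewDesc(
		"example_persist_queue_capacity", // assumed name for this sketch
		"The total capacity of the persist queue.",
		nil, nil,
	)
	m := prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, float64(capacity))

	// Const metrics implement prometheus.Metric; Write shows the frozen value.
	var out dto.Metric
	if err := m.Write(&out); err != nil {
		panic(err)
	}
	fmt.Println(out.GetGauge().GetValue()) // 131072
}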
@@ -837,32 +846,19 @@ func (s *memorySeriesStorage) loadChunkDescs(fp clientmodel.Fingerprint, beforeT
 	return s.persistence.loadChunkDescs(fp, beforeTime)
 }
-// To expose persistQueueCap as metric:
-var (
-	persistQueueCapDesc = prometheus.NewDesc(
-		prometheus.BuildFQName(namespace, subsystem, "persist_queue_capacity"),
-		"The total capacity of the persist queue.",
-		nil, nil,
-	)
-	persistQueueCapGauge = prometheus.MustNewConstMetric(
-		persistQueueCapDesc, prometheus.GaugeValue, persistQueueCap,
-	)
-)
 // Describe implements prometheus.Collector.
 func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
 	s.persistence.Describe(ch)
 	ch <- s.persistLatency.Desc()
 	ch <- s.persistErrors.Desc()
+	ch <- s.persistQueueCapacity.Desc()
 	ch <- s.persistQueueLength.Desc()
 	ch <- s.numSeries.Desc()
 	s.seriesOps.Describe(ch)
 	ch <- s.ingestedSamplesCount.Desc()
 	ch <- s.invalidPreloadRequestsCount.Desc()
-	ch <- persistQueueCapDesc
 	ch <- numMemChunksDesc
 }
@@ -872,14 +868,13 @@ func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
 	ch <- s.persistLatency
 	ch <- s.persistErrors
+	ch <- s.persistQueueCapacity
 	ch <- s.persistQueueLength
 	ch <- s.numSeries
 	s.seriesOps.Collect(ch)
 	ch <- s.ingestedSamplesCount
 	ch <- s.invalidPreloadRequestsCount
-	ch <- persistQueueCapGauge
 	count := atomic.LoadInt64(&numMemChunks)
 	ch <- prometheus.MustNewConstMetric(numMemChunksDesc, prometheus.GaugeValue, float64(count))
 }
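
Taken together, the Describe/Collect pair follows the usual custom prometheus.Collector pattern: descriptors are announced once, and on every scrape the frozen capacity metric is sent alongside the live queue-length gauge. A stripped-down sketch of that pattern (hypothetical metric names, not the storage code itself):

package main

import "github.com/prometheus/client_golang/prometheus"

// queueCollector bundles a constant capacity metric with a live length gauge.
type queueCollector struct {
	capacity prometheus.Metric
	length   prometheus.Gauge
}

func (c *queueCollector) Describe(ch chan<- *prometheus.Desc) {
	ch <- c.capacity.Desc()
	ch <- c.length.Desc()
}

func (c *queueCollector) Collect(ch chan<- prometheus.Metric) {
	ch <- c.capacity // same value on every scrape
	ch <- c.length   // reflects the current queue length
}

func main() {
	c := &queueCollector{
		capacity: prometheus.MustNewConstMetric(
			prometheus.NewDesc("example_queue_capacity", "Capacity of the queue.", nil, nil),
			prometheus.GaugeValue, float64(128*1024),
		),
		length: prometheus.NewGauge(prometheus.GaugeOpts{
			Name: "example_queue_length",
			Help: "Current number of items waiting in the queue.",
		}),
	}
	prometheus.MustRegister(c)
}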