Merge pull request #532 from prometheus/beorn7/ingestion-tweaks

Improve persisting chunks to disk.
This commit is contained in:
Julius Volz 2015-02-17 17:31:00 +01:00
commit 79a4a6d8e8
6 changed files with 267 additions and 51 deletions

View File

@ -57,7 +57,7 @@ var (
numMemoryChunks = flag.Int("storage.local.memory-chunks", 1024*1024, "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.")
persistenceRetentionPeriod = flag.Duration("storage.local.retention", 15*24*time.Hour, "How long to retain samples in the local storage.")
persistenceQueueCapacity = flag.Int("storage.local.persistence-queue-capacity", 128*1024, "How many chunks can be waiting for being persisted before sample ingestion will stop.")
persistenceQueueCapacity = flag.Int("storage.local.persistence-queue-capacity", 32*1024, "How many chunks can be waiting for being persisted before sample ingestion will stop.")
checkpointInterval = flag.Duration("storage.local.checkpoint-interval", 5*time.Minute, "The period at which the in-memory index of time series is checkpointed.")
checkpointDirtySeriesLimit = flag.Int("storage.local.checkpoint-dirty-series-limit", 5000, "If approx. that many time series are in a state that would require a recovery operation after a crash, a checkpoint is triggered, even if the checkpoint interval hasn't passed yet. A recovery operation requires a disk seek. The default limit intends to keep the recovery time below 1min even on spinning disks. With SSD, recovery is much faster, so you might want to increase this value in that case to avoid overly frequent checkpoints.")

View File

@ -84,7 +84,7 @@ type indexingOp struct {
// A Persistence is used by a Storage implementation to store samples
// persistently across restarts. The methods are only goroutine-safe if
// explicitly marked as such below. The chunk-related methods persistChunk,
// explicitly marked as such below. The chunk-related methods persistChunks,
// dropChunks, loadChunks, and loadChunkDescs can be called concurrently with
// each other if each call refers to a different fingerprint.
type persistence struct {
@ -283,35 +283,35 @@ func (p *persistence) getLabelValuesForLabelName(ln clientmodel.LabelName) (clie
return lvs, nil
}
// persistChunk persists a single chunk of a series. It is the caller's
// responsibility to not modify the chunk concurrently and to not persist or
// drop anything for the same fingerprint concurrently. It returns the
// (zero-based) index of the persisted chunk within the series file. In case of
// an error, the returned index is -1 (to avoid the misconception that the chunk
// was written at position 0).
func (p *persistence) persistChunk(fp clientmodel.Fingerprint, c chunk) (int, error) {
// 1. Open chunk file.
// persistChunks persists a number of consecutive chunks of a series. It is the
// caller's responsibility to not modify the chunks concurrently and to not
// persist or drop anything for the same fingerprint concurrently. It returns
// the (zero-based) index of the first persisted chunk within the series
// file. In case of an error, the returned index is -1 (to avoid the
// misconception that the chunk was written at position 0).
func (p *persistence) persistChunks(fp clientmodel.Fingerprint, chunks []chunk) (int, error) {
f, err := p.openChunkFileForWriting(fp)
if err != nil {
return -1, err
}
defer f.Close()
b := bufio.NewWriterSize(f, chunkHeaderLen+p.chunkLen)
b := bufio.NewWriterSize(f, len(chunks)*(chunkHeaderLen+p.chunkLen))
// 2. Write the header (chunk type and first/last times).
err = writeChunkHeader(b, c)
if err != nil {
return -1, err
for _, c := range chunks {
err = writeChunkHeader(b, c)
if err != nil {
return -1, err
}
err = c.marshal(b)
if err != nil {
return -1, err
}
}
// 3. Write chunk into file.
err = c.marshal(b)
if err != nil {
return -1, err
}
// 4. Determine index within the file.
// Determine index within the file.
b.Flush()
offset, err := f.Seek(0, os.SEEK_CUR)
if err != nil {
@ -322,7 +322,7 @@ func (p *persistence) persistChunk(fp clientmodel.Fingerprint, c chunk) (int, er
return -1, err
}
return index - 1, err
return index - len(chunks), err
}
// loadChunks loads a group of chunks of a timeseries by their index. The chunk

View File

@ -83,7 +83,7 @@ func TestPersistLoadDropChunks(t *testing.T) {
for fp, chunks := range fpToChunks {
for i, c := range chunks {
index, err := p.persistChunk(fp, c)
index, err := p.persistChunks(fp, []chunk{c})
if err != nil {
t.Fatal(err)
}

View File

@ -76,9 +76,11 @@ type memorySeriesStorage struct {
appendLastTimestamp clientmodel.Timestamp // The timestamp of the last sample sent to the append queue.
appendWaitGroup sync.WaitGroup // To wait for all appended samples to be processed.
persistQueue chan persistRequest
persistStopped chan struct{}
persistence *persistence
persistQueue chan persistRequest
persistQueueCap int // Not actually the cap of above channel. See handlePersistQueue.
persistStopped chan struct{}
persistence *persistence
countPersistedHeadChunks chan struct{}
@ -145,9 +147,13 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
appendLastTimestamp: clientmodel.Earliest,
appendQueue: make(chan *clientmodel.Sample, appendQueueCap),
persistQueue: make(chan persistRequest, o.PersistenceQueueCapacity),
persistStopped: make(chan struct{}),
persistence: p,
// The actual buffering happens within handlePersistQueue, so
// cap of persistQueue just has to be enough to not block while
// handlePersistQueue is writing to disk (20ms or so).
persistQueue: make(chan persistRequest, 1024),
persistQueueCap: o.PersistenceQueueCapacity,
persistStopped: make(chan struct{}),
persistence: p,
countPersistedHeadChunks: make(chan struct{}, 1024),
@ -568,32 +574,100 @@ func (s *memorySeriesStorage) maybeEvict() {
}
func (s *memorySeriesStorage) handlePersistQueue() {
for req := range s.persistQueue {
s.persistQueueLength.Set(float64(len(s.persistQueue)))
start := time.Now()
s.fpLocker.Lock(req.fingerprint)
offset, err := s.persistence.persistChunk(req.fingerprint, req.chunkDesc.chunk)
if series, seriesInMemory := s.fpToSeries.get(req.fingerprint); err == nil && seriesInMemory && series.chunkDescsOffset == -1 {
// This is the first chunk persisted for a newly created
// series that had prior chunks on disk. Finally, we can
// set the chunkDescsOffset.
series.chunkDescsOffset = offset
chunkMaps := chunkMaps{}
chunkCount := 0
persistMostConsecutiveChunks := func() {
fp, cds := chunkMaps.pop()
if err := s.persistChunks(fp, cds); err != nil {
// Need to put chunks back for retry.
for _, cd := range cds {
chunkMaps.add(fp, cd)
}
return
}
s.fpLocker.Unlock(req.fingerprint)
s.persistLatency.Observe(float64(time.Since(start)) / float64(time.Microsecond))
if err != nil {
s.persistErrors.Inc()
glog.Error("Error persisting chunk: ", err)
s.persistence.setDirty(true)
continue
}
req.chunkDesc.unpin(s.evictRequests)
chunkOps.WithLabelValues(persistAndUnpin).Inc()
chunkCount -= len(cds)
}
loop:
for {
if chunkCount >= s.persistQueueCap && chunkCount > 0 {
glog.Warningf("%d chunks queued for persistence. Ingestion pipeline will backlog.", chunkCount)
persistMostConsecutiveChunks()
}
select {
case req, ok := <-s.persistQueue:
if !ok {
break loop
}
chunkMaps.add(req.fingerprint, req.chunkDesc)
chunkCount++
default:
if chunkCount > 0 {
persistMostConsecutiveChunks()
continue loop
}
// If we are here, there is nothing to do right now. So
// just wait for a persist request to come in.
req, ok := <-s.persistQueue
if !ok {
break loop
}
chunkMaps.add(req.fingerprint, req.chunkDesc)
chunkCount++
}
s.persistQueueLength.Set(float64(chunkCount))
}
// Drain all requests.
for _, m := range chunkMaps {
for fp, cds := range m {
if s.persistChunks(fp, cds) == nil {
chunkCount -= len(cds)
if (chunkCount+len(cds))/1000 > chunkCount/1000 {
glog.Infof(
"Still draining persist queue, %d chunks left to persist...",
chunkCount,
)
}
s.persistQueueLength.Set(float64(chunkCount))
}
}
}
glog.Info("Persist queue drained and stopped.")
close(s.persistStopped)
}
func (s *memorySeriesStorage) persistChunks(fp clientmodel.Fingerprint, cds []*chunkDesc) error {
start := time.Now()
chunks := make([]chunk, len(cds))
for i, cd := range cds {
chunks[i] = cd.chunk
}
s.fpLocker.Lock(fp)
offset, err := s.persistence.persistChunks(fp, chunks)
if series, seriesInMemory := s.fpToSeries.get(fp); err == nil && seriesInMemory && series.chunkDescsOffset == -1 {
// This is the first chunk persisted for a newly created
// series that had prior chunks on disk. Finally, we can
// set the chunkDescsOffset.
series.chunkDescsOffset = offset
}
s.fpLocker.Unlock(fp)
s.persistLatency.Observe(float64(time.Since(start)) / float64(time.Microsecond))
if err != nil {
s.persistErrors.Inc()
glog.Error("Error persisting chunks: ", err)
s.persistence.setDirty(true)
return err
}
for _, cd := range cds {
cd.unpin(s.evictRequests)
}
chunkOps.WithLabelValues(persistAndUnpin).Add(float64(len(cds)))
return nil
}
// waitForNextFP waits an estimated duration, after which we want to process
// another fingerprint so that we will process all fingerprints in a tenth of
// s.purgeAfter assuming that the system is doing nothing else, e.g. if we want
@ -927,3 +1001,52 @@ func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
count := atomic.LoadInt64(&numMemChunks)
ch <- prometheus.MustNewConstMetric(numMemChunksDesc, prometheus.GaugeValue, float64(count))
}
// chunkMaps is a slice of maps with chunkDescs to be persisted.
// Each chunk map contains n consecutive chunks to persist, where
// n is the index+1.
type chunkMaps []map[clientmodel.Fingerprint][]*chunkDesc
// add adds a chunk to chunkMaps.
func (cm *chunkMaps) add(fp clientmodel.Fingerprint, cd *chunkDesc) {
// Runtime of this method is linear with the number of
// chunkMaps. However, we expect only ever very few maps.
numMaps := len(*cm)
for i, m := range *cm {
if cds, ok := m[fp]; ok {
// Found our fp! Add cd and level up.
cds = append(cds, cd)
delete(m, fp)
if i == numMaps-1 {
*cm = append(*cm, map[clientmodel.Fingerprint][]*chunkDesc{})
}
(*cm)[i+1][fp] = cds
return
}
}
// Our fp isn't contained in cm yet. Add it to the first map (and add a
// first map if there is none).
if numMaps == 0 {
*cm = chunkMaps{map[clientmodel.Fingerprint][]*chunkDesc{}}
}
(*cm)[0][fp] = []*chunkDesc{cd}
}
// pop retrieves and removes a fingerprint with all its chunks. It chooses one
// of the fingerprints with the most chunks. It panics if cm has no entries.
func (cm *chunkMaps) pop() (clientmodel.Fingerprint, []*chunkDesc) {
m := (*cm)[len(*cm)-1]
for fp, cds := range m {
delete(m, fp)
// Prune empty maps from top level.
for len(m) == 0 {
*cm = (*cm)[:len(*cm)-1]
if len(*cm) == 0 {
break
}
m = (*cm)[len(*cm)-1]
}
return fp, cds
}
panic("popped from empty chunkMaps")
}

View File

@ -16,12 +16,15 @@ package local
import (
"fmt"
"math/rand"
"reflect"
"testing"
"testing/quick"
"time"
"github.com/golang/glog"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/utility/test"
)
@ -662,3 +665,93 @@ func verifyStorage(t testing.TB, s Storage, samples clientmodel.Samples, maxAge
}
return result
}
func TestChunkMaps(t *testing.T) {
cm := chunkMaps{}
cd1 := &chunkDesc{refCount: 1} // Abuse refCount as identifier.
cd21 := &chunkDesc{refCount: 21}
cd22 := &chunkDesc{refCount: 22}
cd31 := &chunkDesc{refCount: 31}
cd32 := &chunkDesc{refCount: 32}
cd33 := &chunkDesc{refCount: 33}
cd41 := &chunkDesc{refCount: 41}
cd42 := &chunkDesc{refCount: 42}
cd43 := &chunkDesc{refCount: 43}
cd44 := &chunkDesc{refCount: 44}
cd51 := &chunkDesc{refCount: 51}
cd52 := &chunkDesc{refCount: 52}
cd53 := &chunkDesc{refCount: 53}
cd54 := &chunkDesc{refCount: 54}
cd55 := &chunkDesc{refCount: 55}
cm.add(5, cd51)
cm.add(3, cd31)
cm.add(5, cd52)
cm.add(1, cd1)
cm.add(4, cd41)
cm.add(4, cd42)
cm.add(5, cd53)
cm.add(3, cd32)
cm.add(2, cd21)
cm.add(5, cd54)
cm.add(3, cd33)
cm.add(4, cd43)
cm.add(2, cd22)
cm.add(4, cd44)
cm.add(5, cd55)
var fpWant, fpGot clientmodel.Fingerprint
var cdsWant, cdsGot []*chunkDesc
fpWant = 5
cdsWant = []*chunkDesc{cd51, cd52, cd53, cd54, cd55}
fpGot, cdsGot = cm.pop()
if fpWant != fpGot {
t.Errorf("Want fingerprint %s, got %s.", fpWant, fpGot)
}
if !reflect.DeepEqual(cdsWant, cdsGot) {
t.Errorf("Want chunk descriptors %v, got %v.", cdsWant, cdsGot)
}
fpWant = 4
cdsWant = []*chunkDesc{cd41, cd42, cd43, cd44}
fpGot, cdsGot = cm.pop()
if fpWant != fpGot {
t.Errorf("Want fingerprint %s, got %s.", fpWant, fpGot)
}
if !reflect.DeepEqual(cdsWant, cdsGot) {
t.Errorf("Want chunk descriptors %v, got %v.", cdsWant, cdsGot)
}
fpWant = 3
cdsWant = []*chunkDesc{cd31, cd32, cd33}
fpGot, cdsGot = cm.pop()
if fpWant != fpGot {
t.Errorf("Want fingerprint %s, got %s.", fpWant, fpGot)
}
if !reflect.DeepEqual(cdsWant, cdsGot) {
t.Errorf("Want chunk descriptors %v, got %v.", cdsWant, cdsGot)
}
fpWant = 2
cdsWant = []*chunkDesc{cd21, cd22}
fpGot, cdsGot = cm.pop()
if fpWant != fpGot {
t.Errorf("Want fingerprint %s, got %s.", fpWant, fpGot)
}
if !reflect.DeepEqual(cdsWant, cdsGot) {
t.Errorf("Want chunk descriptors %v, got %v.", cdsWant, cdsGot)
}
fpWant = 1
cdsWant = []*chunkDesc{cd1}
fpGot, cdsGot = cm.pop()
if fpWant != fpGot {
t.Errorf("Want fingerprint %s, got %s.", fpWant, fpGot)
}
if !reflect.DeepEqual(cdsWant, cdsGot) {
t.Errorf("Want chunk descriptors %v, got %v.", cdsWant, cdsGot)
}
}

View File

@ -41,7 +41,7 @@ func NewTestStorage(t test.T) (Storage, test.Closer) {
directory := test.NewTemporaryDirectory("test_storage", t)
o := &MemorySeriesStorageOptions{
MemoryChunks: 1000000,
PersistenceRetentionPeriod: 24 * 7 * time.Hour,
PersistenceRetentionPeriod: 24 * time.Hour * 365 * 100, // Enough to never trigger purging.
PersistenceStoragePath: directory.Path(),
CheckpointInterval: time.Hour,
}