Fix the weird chunkDesc shuffling bug.

The root cause was that after chunkDesc eviction, the offset between
memory representation of chunk layout (via chunkDescs in memory) was
shiftet against chunks as layed out on disk. Keeping the offset up to
date is by no means trivial, so this commit is pretty involved.

Also, found a race that for some reason didn't bite us so far:
Persisting chunks was completel unlocked, so if chunks were purged on
disk at the same time, disaster would strike. However, locking the
persisting of chunk revealed interesting dead locks. Basically, never
queue under the fp lock.

Change-Id: I1ea9e4e71024cabbc1f9601b28e74db0c5c55db8
This commit is contained in:
Bjoern Rabenstein 2014-10-27 20:40:48 +01:00
parent a617269b12
commit d215e013b7
5 changed files with 193 additions and 123 deletions

View File

@ -58,8 +58,10 @@ const (
)
const (
flagChunkDescsLoaded byte = 1 << iota
flagHeadChunkPersisted
flagHeadChunkPersisted byte = 1 << iota
// Add more flags here like:
// flagFoo
// flagBar
)
type indexingOpType byte
@ -228,34 +230,53 @@ func (p *persistence) getLabelValuesForLabelName(ln clientmodel.LabelName) (clie
}
// persistChunk persists a single chunk of a series. It is the caller's
// responsibility to not modify chunk concurrently and to not persist or drop anything
// for the same fingerprint concurrently.
func (p *persistence) persistChunk(fp clientmodel.Fingerprint, c chunk) error {
// responsibility to not modify chunk concurrently and to not persist or drop
// anything for the same fingerprint concurrently. It returns the (zero-based)
// index of the persisted chunk within the series file. In case of an error, the
// returned index is -1 (to avoid the misconception that the chunk was written
// at position 0).
func (p *persistence) persistChunk(fp clientmodel.Fingerprint, c chunk) (int, error) {
// 1. Open chunk file.
f, err := p.openChunkFileForWriting(fp)
if err != nil {
return err
return -1, err
}
defer f.Close()
b := bufio.NewWriterSize(f, chunkHeaderLen+p.chunkLen)
defer b.Flush()
// 2. Write the header (chunk type and first/last times).
err = writeChunkHeader(b, c)
if err != nil {
return err
return -1, err
}
// 3. Write chunk into file.
return c.marshal(b)
err = c.marshal(b)
if err != nil {
return -1, err
}
// 4. Determine index within the file.
b.Flush()
offset, err := f.Seek(0, os.SEEK_CUR)
if err != nil {
return -1, err
}
index, err := p.chunkIndexForOffset(offset)
if err != nil {
return -1, err
}
return index - 1, err
}
// loadChunks loads a group of chunks of a timeseries by their index. The chunk
// with the earliest time will have index 0, the following ones will have
// incrementally larger indexes. It is the caller's responsibility to not
// persist or drop anything for the same fingerprint concurrently.
func (p *persistence) loadChunks(fp clientmodel.Fingerprint, indexes []int) ([]chunk, error) {
// incrementally larger indexes. The indexOffset denotes the offset to be added to
// each index in indexes. It is the caller's responsibility to not persist or
// drop anything for the same fingerprint concurrently.
func (p *persistence) loadChunks(fp clientmodel.Fingerprint, indexes []int, indexOffset int) ([]chunk, error) {
// TODO: we need to verify at some point that file length is a multiple of
// the chunk size. When is the best time to do this, and where to remember
// it? Right now, we only do it when loading chunkDescs.
@ -268,7 +289,7 @@ func (p *persistence) loadChunks(fp clientmodel.Fingerprint, indexes []int) ([]c
chunks := make([]chunk, 0, len(indexes))
typeBuf := make([]byte, 1)
for _, idx := range indexes {
_, err := f.Seek(p.offsetForChunkIndex(idx), os.SEEK_SET)
_, err := f.Seek(p.offsetForChunkIndex(idx+indexOffset), os.SEEK_SET)
if err != nil {
return nil, err
}
@ -339,7 +360,7 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie
chunkFirstTime: clientmodel.Timestamp(binary.LittleEndian.Uint64(chunkTimesBuf)),
chunkLastTime: clientmodel.Timestamp(binary.LittleEndian.Uint64(chunkTimesBuf[8:])),
}
if !cd.firstTime().Before(beforeTime) {
if cd.chunkLastTime.After(beforeTime) {
// From here on, we have chunkDescs in memory already.
break
}
@ -412,9 +433,6 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
}
realNumberOfSeries++
var seriesFlags byte
if m.series.chunkDescsLoaded {
seriesFlags |= flagChunkDescsLoaded
}
if m.series.headChunkPersisted {
seriesFlags |= flagHeadChunkPersisted
}
@ -430,6 +448,9 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
return
}
w.Write(buf)
if _, err = codable.EncodeVarint(w, int64(m.series.chunkDescsOffset)); err != nil {
return
}
if _, err = codable.EncodeVarint(w, int64(len(m.series.chunkDescs))); err != nil {
return
}
@ -523,6 +544,10 @@ func (p *persistence) loadSeriesMapAndHeads() (*seriesMap, error) {
if err := metric.UnmarshalFromReader(r); err != nil {
return nil, err
}
chunkDescsOffset, err := binary.ReadVarint(r)
if err != nil {
return nil, err
}
numChunkDescs, err := binary.ReadVarint(r)
if err != nil {
return nil, err
@ -562,7 +587,7 @@ func (p *persistence) loadSeriesMapAndHeads() (*seriesMap, error) {
fingerprintToSeries[clientmodel.Fingerprint(fp)] = &memorySeries{
metric: clientmodel.Metric(metric),
chunkDescs: chunkDescs,
chunkDescsLoaded: seriesFlags&flagChunkDescsLoaded != 0,
chunkDescsOffset: int(chunkDescsOffset),
headChunkPersisted: headChunkPersisted,
}
}
@ -572,24 +597,25 @@ func (p *persistence) loadSeriesMapAndHeads() (*seriesMap, error) {
}
// dropChunks deletes all chunks from a series whose last sample time is before
// beforeTime. It returns true if all chunks of the series have been deleted.
// It is the caller's responsibility to make sure nothing is persisted or loaded
// for the same fingerprint concurrently.
func (p *persistence) dropChunks(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) (bool, error) {
// beforeTime. It returns the number of deleted chunks and true if all chunks of
// the series have been deleted. It is the caller's responsibility to make sure
// nothing is persisted or loaded for the same fingerprint concurrently.
func (p *persistence) dropChunks(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) (int, bool, error) {
f, err := p.openChunkFileForReading(fp)
if os.IsNotExist(err) {
return true, nil
return 0, true, nil
}
if err != nil {
return false, err
return 0, false, err
}
defer f.Close()
// Find the first chunk that should be kept.
for i := 0; ; i++ {
var i int
for ; ; i++ {
_, err := f.Seek(p.offsetForChunkIndex(i)+chunkHeaderLastTimeOffset, os.SEEK_SET)
if err != nil {
return false, err
return 0, false, err
}
lastTimeBuf := make([]byte, 8)
_, err = io.ReadAtLeast(f, lastTimeBuf, 8)
@ -598,12 +624,12 @@ func (p *persistence) dropChunks(fp clientmodel.Fingerprint, beforeTime clientmo
// be kept. Remove the whole file.
chunkOps.WithLabelValues(purge).Add(float64(i))
if err := os.Remove(f.Name()); err != nil {
return true, err
return 0, true, err
}
return true, nil
return i, true, nil
}
if err != nil {
return false, err
return 0, false, err
}
lastTime := clientmodel.Timestamp(binary.LittleEndian.Uint64(lastTimeBuf))
if !lastTime.Before(beforeTime) {
@ -617,21 +643,23 @@ func (p *persistence) dropChunks(fp clientmodel.Fingerprint, beforeTime clientmo
// file.
_, err = f.Seek(-(chunkHeaderLastTimeOffset + 8), os.SEEK_CUR)
if err != nil {
return false, err
return 0, false, err
}
temp, err := os.OpenFile(p.tempFileNameForFingerprint(fp), os.O_WRONLY|os.O_CREATE, 0640)
if err != nil {
return false, err
return 0, false, err
}
defer temp.Close()
if _, err := io.Copy(temp, f); err != nil {
return false, err
return 0, false, err
}
os.Rename(p.tempFileNameForFingerprint(fp), p.fileNameForFingerprint(fp))
return false, nil
if err := os.Rename(p.tempFileNameForFingerprint(fp), p.fileNameForFingerprint(fp)); err != nil {
return 0, false, err
}
return i, false, nil
}
// indexMetric queues the given metric for addition to the indexes needed by
@ -836,6 +864,16 @@ func (p *persistence) offsetForChunkIndex(i int) int64 {
return int64(i * (chunkHeaderLen + p.chunkLen))
}
func (p *persistence) chunkIndexForOffset(offset int64) (int, error) {
if int(offset)%(chunkHeaderLen+p.chunkLen) != 0 {
return -1, fmt.Errorf(
"offset %d is not a multiple of on-disk chunk length %d",
offset, chunkHeaderLen+p.chunkLen,
)
}
return int(offset) / (chunkHeaderLen + p.chunkLen), nil
}
func (p *persistence) headsFileName() string {
return path.Join(p.basePath, headsFileName)
}

View File

@ -82,10 +82,14 @@ func TestPersistChunk(t *testing.T) {
fpToChunks := buildTestChunks()
for fp, chunks := range fpToChunks {
for _, c := range chunks {
if err := p.persistChunk(fp, c); err != nil {
for i, c := range chunks {
index, err := p.persistChunk(fp, c)
if err != nil {
t.Fatal(err)
}
if i != index {
t.Errorf("Want chunk index %d, got %d.", i, index)
}
}
}
@ -94,7 +98,7 @@ func TestPersistChunk(t *testing.T) {
for i := range expectedChunks {
indexes = append(indexes, i)
}
actualChunks, err := p.loadChunks(fp, indexes)
actualChunks, err := p.loadChunks(fp, indexes, 0)
if err != nil {
t.Fatal(err)
}

View File

@ -136,10 +136,16 @@ type memorySeries struct {
metric clientmodel.Metric
// Sorted by start time, overlapping chunk ranges are forbidden.
chunkDescs []*chunkDesc
// Whether chunkDescs for chunks on disk are all loaded. If false, some
// (or all) chunkDescs are only on disk. These chunks are all contiguous
// and at the tail end.
chunkDescsLoaded bool
// The chunkDescs in memory might not have all the chunkDescs for the
// chunks that are persisted to disk. The missing chunkDescs are all
// contiguous and at the tail end. chunkDescsOffset is the index of the
// chunk on disk that corresponds to the first chunkDesc in memory. If
// it is 0, the chunkDescs are all loaded. A value of -1 denotes a
// special case: There are chunks on disk, but the offset to the
// chunkDescs in memory is unknown. Also, there is no overlap between
// chunks on disk and chunks in memory (implying that upon first
// persiting of a chunk in memory, the offset has to be set).
chunkDescsOffset int
// Whether the current head chunk has already been scheduled to be
// persisted. If true, the current head chunk must not be modified
// anymore.
@ -155,16 +161,20 @@ type memorySeries struct {
// or (if false) a series for a metric being unarchived, i.e. a series that
// existed before but has been evicted from memory.
func newMemorySeries(m clientmodel.Metric, reallyNew bool) *memorySeries {
return &memorySeries{
s := memorySeries{
metric: m,
chunkDescsLoaded: reallyNew,
headChunkPersisted: !reallyNew,
}
if !reallyNew {
s.chunkDescsOffset = -1
}
return &s
}
// add adds a sample pair to the series.
// It returns chunkDescs that must be queued to be persisted.
// The caller must have locked the fingerprint of the series.
func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair, persistQueue chan *persistRequest) {
func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair) []*chunkDesc {
if len(s.chunkDescs) == 0 || s.headChunkPersisted {
newHead := newChunkDesc(newDeltaEncodedChunk(d1, d0, true))
s.chunkDescs = append(s.chunkDescs, newHead)
@ -187,33 +197,27 @@ func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair, per
}
chunks := s.head().add(v)
s.head().chunk = chunks[0]
var chunkDescsToPersist []*chunkDesc
if len(chunks) > 1 {
queuePersist := func(cd *chunkDesc) {
persistQueue <- &persistRequest{
fingerprint: fp,
chunkDesc: cd,
}
}
queuePersist(s.head())
chunkDescsToPersist = append(chunkDescsToPersist, s.head())
for i, c := range chunks[1:] {
cd := newChunkDesc(c)
s.chunkDescs = append(s.chunkDescs, cd)
// The last chunk is still growing.
if i < len(chunks[1:])-1 {
queuePersist(cd)
chunkDescsToPersist = append(chunkDescsToPersist, cd)
}
}
}
return chunkDescsToPersist
}
// evictOlderThan marks for eviction all chunks whose latest sample is older
// than the given timestamp. It returns true if all chunks in the series were
// immediately evicted (i.e. all chunks are older than the timestamp, and none
// of the chunks was pinned).
// than the given timestamp. It returns allEvicted as true if all chunks in the
// series were immediately evicted (i.e. all chunks are older than the
// timestamp, and none of the chunks was pinned).
//
// The method also evicts chunkDescs if there are chunkDescEvictionFactor times
// more chunkDescs in the series than unevicted chunks. (The number of unevicted
@ -222,25 +226,22 @@ func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair, per
// series, even if chunks in between were evicted.)
//
// Special considerations for the head chunk: If it has not been scheduled to be
// persisted yet but is old enough for eviction, the scheduling happens now. (To
// do that, the method neets the fingerprint and the persist queue.) It is
// likely that the actual persisting will not happen soon enough to immediately
// evict the head chunk, though. Thus, calling evictOlderThan for a series with
// a non-persisted head chunk will most likely return false, even if no chunk is
// pinned for other reasons. A series old enough for archiving will usually
// persisted yet but is old enough for eviction, this method returns
// persistHeadChunk as true. The caller is then responsible for persisting the
// head chunk. The internal state of this memorySeries is already set
// accordingly by this method. Calling evictOlderThan for a series with a
// non-persisted head chunk that is old enough for eviction will never evict all
// chunks immediately, even if no chunk is pinned for other reasons, because the
// head chunk is not persisted yet. A series old enough for archiving will
// require at least two eviction runs to become ready for archiving: In the
// first run, its head chunk is scheduled to be persisted. The next call of
// first run, its head chunk is requested to be persisted. The next call of
// evictOlderThan will then return true, provided that the series hasn't
// received new samples in the meantime, the head chunk has now been persisted,
// and no chunk is pinned for other reasons.
//
// The caller must have locked the fingerprint of the series.
func (s *memorySeries) evictOlderThan(
t clientmodel.Timestamp,
fp clientmodel.Fingerprint,
persistQueue chan *persistRequest,
) bool {
allEvicted := true
func (s *memorySeries) evictOlderThan(t clientmodel.Timestamp) (allEvicted bool, persistHeadChunk bool) {
allEvicted = true
iOldestNotEvicted := -1
defer func() {
@ -250,8 +251,8 @@ func (s *memorySeries) evictOlderThan(
if iOldestNotEvicted != -1 {
lenToKeep := chunkDescEvictionFactor * (len(s.chunkDescs) - iOldestNotEvicted)
if lenToKeep < len(s.chunkDescs) {
s.chunkDescsLoaded = false
lenEvicted := len(s.chunkDescs) - lenToKeep
s.chunkDescsOffset += lenEvicted
chunkDescOps.WithLabelValues(evict).Add(float64(lenEvicted))
atomic.AddInt64(&numMemChunkDescs, -int64(lenEvicted))
s.chunkDescs = append(
@ -268,22 +269,19 @@ func (s *memorySeries) evictOlderThan(
if iOldestNotEvicted == -1 {
iOldestNotEvicted = i
}
return false
return false, false
}
if cd.isEvicted() {
continue
}
if !s.headChunkPersisted && i == len(s.chunkDescs)-1 {
// This is a non-persisted head chunk that is old enough
// for eviction. Queue it to be persisted:
// for eviction. Request it to be persisted:
persistHeadChunk = true
s.headChunkPersisted = true
// Since we cannot modify the head chunk from now on, we
// don't need to bother with cloning anymore.
s.headChunkUsedByIterator = false
persistQueue <- &persistRequest{
fingerprint: fp,
chunkDesc: cd,
}
}
if !cd.evictOnUnpin() {
if iOldestNotEvicted == -1 {
@ -292,13 +290,15 @@ func (s *memorySeries) evictOlderThan(
allEvicted = false
}
}
return allEvicted
return allEvicted, persistHeadChunk
}
// purgeOlderThan returns true if all chunks have been purged.
// purgeOlderThan removes chunkDescs older than t. It also evicts the chunks of
// those chunkDescs (although that's probably not even necessary). It returns
// the number of purged chunkDescs and true if all chunkDescs have been purged.
//
// The caller must have locked the fingerprint of the series.
func (s *memorySeries) purgeOlderThan(t clientmodel.Timestamp) bool {
func (s *memorySeries) purgeOlderThan(t clientmodel.Timestamp) (int, bool) {
keepIdx := len(s.chunkDescs)
for i, cd := range s.chunkDescs {
if !cd.lastTime().Before(t) {
@ -307,9 +307,11 @@ func (s *memorySeries) purgeOlderThan(t clientmodel.Timestamp) bool {
}
s.chunkDescs[i].evictOnUnpin()
}
s.chunkDescs = append(make([]*chunkDesc, 0, len(s.chunkDescs)-keepIdx), s.chunkDescs[keepIdx:]...)
atomic.AddInt64(&numMemChunkDescs, -int64(keepIdx))
return len(s.chunkDescs) == 0
if keepIdx > 0 {
s.chunkDescs = append(make([]*chunkDesc, 0, len(s.chunkDescs)-keepIdx), s.chunkDescs[keepIdx:]...)
atomic.AddInt64(&numMemChunkDescs, -int64(keepIdx))
}
return keepIdx, len(s.chunkDescs) == 0
}
// preloadChunks is an internal helper method.
@ -327,8 +329,11 @@ func (s *memorySeries) preloadChunks(indexes []int, p *persistence) ([]*chunkDes
chunkOps.WithLabelValues(pin).Add(float64(len(pinnedChunkDescs)))
if len(loadIndexes) > 0 {
if s.chunkDescsOffset == -1 {
panic("requested loading chunks from persistence in a situation where we must not have persisted data for chunk descriptors in memory")
}
fp := s.metric.Fingerprint()
chunks, err := p.loadChunks(fp, loadIndexes)
chunks, err := p.loadChunks(fp, loadIndexes, s.chunkDescsOffset)
if err != nil {
// Unpin the chunks since we won't return them as pinned chunks now.
for _, cd := range pinnedChunkDescs {
@ -388,13 +393,13 @@ func (s *memorySeries) preloadChunksForRange(
if len(s.chunkDescs) > 0 {
firstChunkDescTime = s.chunkDescs[0].firstTime()
}
if !s.chunkDescsLoaded && from.Before(firstChunkDescTime) {
if s.chunkDescsOffset != 0 && from.Before(firstChunkDescTime) {
cds, err := p.loadChunkDescs(fp, firstChunkDescTime)
if err != nil {
return nil, err
}
s.chunkDescs = append(cds, s.chunkDescs...)
s.chunkDescsLoaded = true
s.chunkDescsOffset = 0
}
if len(s.chunkDescs) == 0 {

View File

@ -43,15 +43,15 @@ type persistRequest struct {
}
type memorySeriesStorage struct {
fpLocker *fingerprintLocker
fingerprintToSeries *seriesMap
fpLocker *fingerprintLocker
fpToSeries *seriesMap
loopStopping, loopStopped chan struct{}
evictInterval, evictAfter time.Duration
purgeInterval, purgeAfter time.Duration
checkpointInterval time.Duration
persistQueue chan *persistRequest
persistQueue chan persistRequest
persistStopped chan struct{}
persistence *persistence
@ -84,22 +84,22 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
return nil, err
}
glog.Info("Loading series map and head chunks...")
fingerprintToSeries, err := p.loadSeriesMapAndHeads()
fpToSeries, err := p.loadSeriesMapAndHeads()
if err != nil {
return nil, err
}
glog.Infof("%d series loaded.", fingerprintToSeries.length())
glog.Infof("%d series loaded.", fpToSeries.length())
numSeries := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "memory_series",
Help: "The current number of series in memory.",
})
numSeries.Set(float64(fingerprintToSeries.length()))
numSeries.Set(float64(fpToSeries.length()))
return &memorySeriesStorage{
fpLocker: newFingerprintLocker(100), // TODO: Tweak value.
fingerprintToSeries: fingerprintToSeries,
fpLocker: newFingerprintLocker(100), // TODO: Tweak value.
fpToSeries: fpToSeries,
loopStopping: make(chan struct{}),
loopStopped: make(chan struct{}),
@ -109,7 +109,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
purgeAfter: o.PersistenceRetentionPeriod,
checkpointInterval: o.CheckpointInterval,
persistQueue: make(chan *persistRequest, persistQueueCap),
persistQueue: make(chan persistRequest, persistQueueCap),
persistStopped: make(chan struct{}),
persistence: p,
@ -182,7 +182,7 @@ func (s *memorySeriesStorage) Stop() error {
<-s.persistStopped
// One final checkpoint of the series map and the head chunks.
if err := s.persistence.checkpointSeriesMapAndHeads(s.fingerprintToSeries, s.fpLocker); err != nil {
if err := s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker); err != nil {
return err
}
@ -202,7 +202,7 @@ func (s *memorySeriesStorage) NewIterator(fp clientmodel.Fingerprint) SeriesIter
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)
series, ok := s.fingerprintToSeries.get(fp)
series, ok := s.fpToSeries.get(fp)
if !ok {
// Oops, no series for fp found. That happens if, after
// preloading is done, the whole series is identified as old
@ -301,7 +301,7 @@ func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)
series, ok := s.fingerprintToSeries.get(fp)
series, ok := s.fpToSeries.get(fp)
if ok {
// Copy required here because caller might mutate the returned
// metric.
@ -330,17 +330,21 @@ func (s *memorySeriesStorage) AppendSamples(samples clientmodel.Samples) {
func (s *memorySeriesStorage) appendSample(sample *clientmodel.Sample) {
fp := sample.Metric.Fingerprint()
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)
series := s.getOrCreateSeries(fp, sample.Metric)
series.add(fp, &metric.SamplePair{
chunkDescsToPersist := series.add(fp, &metric.SamplePair{
Value: sample.Value,
Timestamp: sample.Timestamp,
}, s.persistQueue)
})
s.fpLocker.Unlock(fp)
// Queue only outside of the locked area, processing the persistQueue
// requires the same lock!
for _, cd := range chunkDescsToPersist {
s.persistQueue <- persistRequest{fp, cd}
}
}
func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m clientmodel.Metric) *memorySeries {
series, ok := s.fingerprintToSeries.get(fp)
series, ok := s.fpToSeries.get(fp)
if !ok {
unarchived, err := s.persistence.unarchiveMetric(fp)
if err != nil {
@ -354,7 +358,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m cl
s.seriesOps.WithLabelValues(create).Inc()
}
series = newMemorySeries(m, !unarchived)
s.fingerprintToSeries.put(fp, series)
s.fpToSeries.put(fp, series)
s.numSeries.Inc()
}
return series
@ -362,7 +366,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m cl
/*
func (s *memorySeriesStorage) preloadChunksAtTime(fp clientmodel.Fingerprint, ts clientmodel.Timestamp) (chunkDescs, error) {
series, ok := s.fingerprintToSeries.get(fp)
series, ok := s.fpToSeries.get(fp)
if !ok {
panic("requested preload for non-existent series")
}
@ -378,7 +382,7 @@ func (s *memorySeriesStorage) preloadChunksForRange(
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)
series, ok := s.fingerprintToSeries.get(fp)
series, ok := s.fpToSeries.get(fp)
if !ok {
has, first, last, err := s.persistence.hasArchivedMetric(fp)
if err != nil {
@ -404,12 +408,21 @@ func (s *memorySeriesStorage) handlePersistQueue() {
for req := range s.persistQueue {
s.persistQueueLength.Set(float64(len(s.persistQueue)))
start := time.Now()
err := s.persistence.persistChunk(req.fingerprint, req.chunkDesc.chunk)
s.fpLocker.Lock(req.fingerprint)
offset, err := s.persistence.persistChunk(req.fingerprint, req.chunkDesc.chunk)
if series, seriesInMemory := s.fpToSeries.get(req.fingerprint); err == nil && seriesInMemory && series.chunkDescsOffset == -1 {
// This is the first chunk persisted for a newly created
// series that had prior chunks on disk. Finally, we can
// set the chunkDescsOffset.
series.chunkDescsOffset = offset
}
s.fpLocker.Unlock(req.fingerprint)
s.persistLatency.Observe(float64(time.Since(start)) / float64(time.Microsecond))
if err != nil {
s.persistErrors.WithLabelValues(err.Error()).Inc()
glog.Error("Error persisting chunk, requeuing: ", err)
s.persistQueue <- req
glog.Error("Error persisting chunk: ", err)
glog.Error("The storage is now inconsistent. Prepare for disaster.")
// TODO: Remove respective chunkDesc to at least be consistent?
continue
}
req.chunkDesc.unpin()
@ -436,13 +449,13 @@ func (s *memorySeriesStorage) loop() {
case <-s.loopStopping:
return
case <-checkpointTicker.C:
s.persistence.checkpointSeriesMapAndHeads(s.fingerprintToSeries, s.fpLocker)
s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker)
case <-evictTicker.C:
// TODO: Change this to be based on number of chunks in memory.
glog.Info("Evicting chunks...")
begin := time.Now()
for m := range s.fingerprintToSeries.iter() {
for m := range s.fpToSeries.iter() {
select {
case <-s.loopStopping:
glog.Info("Interrupted evicting chunks.")
@ -451,11 +464,11 @@ func (s *memorySeriesStorage) loop() {
// Keep going.
}
s.fpLocker.Lock(m.fp)
if m.series.evictOlderThan(
clientmodel.TimestampFromTime(time.Now()).Add(-1*s.evictAfter),
m.fp, s.persistQueue,
) {
s.fingerprintToSeries.del(m.fp)
allEvicted, persistHeadChunk := m.series.evictOlderThan(
clientmodel.TimestampFromTime(time.Now()).Add(-1 * s.evictAfter),
)
if allEvicted {
s.fpToSeries.del(m.fp)
s.numSeries.Dec()
if err := s.persistence.archiveMetric(
m.fp, m.series.metric, m.series.firstTime(), m.series.lastTime(),
@ -466,6 +479,10 @@ func (s *memorySeriesStorage) loop() {
}
}
s.fpLocker.Unlock(m.fp)
// Queue outside of lock!
if persistHeadChunk {
s.persistQueue <- persistRequest{m.fp, m.series.head()}
}
}
duration := time.Since(begin)
@ -476,7 +493,7 @@ func (s *memorySeriesStorage) loop() {
ts := clientmodel.TimestampFromTime(time.Now()).Add(-1 * s.purgeAfter)
begin := time.Now()
for fp := range s.fingerprintToSeries.fpIter() {
for fp := range s.fpToSeries.fpIter() {
select {
case <-s.loopStopping:
glog.Info("Interrupted purging series.")
@ -515,18 +532,24 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime
defer s.fpLocker.Unlock(fp)
// First purge persisted chunks. We need to do that anyway.
allDropped, err := s.persistence.dropChunks(fp, beforeTime)
numDropped, allDropped, err := s.persistence.dropChunks(fp, beforeTime)
if err != nil {
glog.Error("Error purging persisted chunks: ", err)
}
// Purge chunks from memory accordingly.
if series, ok := s.fingerprintToSeries.get(fp); ok {
if series.purgeOlderThan(beforeTime) && allDropped {
s.fingerprintToSeries.del(fp)
if series, ok := s.fpToSeries.get(fp); ok {
numPurged, allPurged := series.purgeOlderThan(beforeTime)
if allPurged && allDropped {
s.fpToSeries.del(fp)
s.numSeries.Dec()
s.seriesOps.WithLabelValues(memoryPurge).Inc()
s.persistence.unindexMetric(series.metric, fp)
} else if series.chunkDescsOffset != -1 {
series.chunkDescsOffset += numPurged - numDropped
if series.chunkDescsOffset < 0 {
panic("dropped more chunks from persistence than from memory")
}
}
return
}

View File

@ -37,7 +37,7 @@ func TestChunk(t *testing.T) {
s.AppendSamples(samples)
for m := range s.(*memorySeriesStorage).fingerprintToSeries.iter() {
for m := range s.(*memorySeriesStorage).fpToSeries.iter() {
for i, v := range m.series.values() {
if samples[i].Timestamp != v.Timestamp {
t.Fatalf("%d. Got %v; want %v", i, v.Timestamp, samples[i].Timestamp)