From 0ca755b4aef81445ddb34c25375b16b563f89eab Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Wed, 11 Jan 2017 13:02:38 +0100 Subject: [PATCH] Replace single head chunk per series with memSeries This adds a memory series holding several chunk to replace the single head chunk per series so far. This is necessary for uniform maximum chunk sizes in cases where some series have higher frequency samples than others. --- head.go | 238 +++++++++++++++++++++++++++++++++----------------------- 1 file changed, 139 insertions(+), 99 deletions(-) diff --git a/head.go b/head.go index 264af2211..a06555a56 100644 --- a/head.go +++ b/head.go @@ -20,13 +20,13 @@ type headBlock struct { // descs holds all chunk descs for the head block. Each chunk implicitly // is assigned the index as its ID. - descs []*chunkDesc + series []*memSeries // mapping maps a series ID to its position in an ordered list // of all series. The orderDirty flag indicates that it has gone stale. mapper *positionMapper // hashes contains a collision map of label set hashes of chunks // to their chunk descs. - hashes map[uint64][]*chunkDesc + hashes map[uint64][]*memSeries values map[string]stringset // label names to possible values postings *memPostings // postings lists for terms @@ -45,8 +45,8 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) { b := &headBlock{ dir: dir, - descs: []*chunkDesc{}, - hashes: map[uint64][]*chunkDesc{}, + series: []*memSeries{}, + hashes: map[uint64][]*memSeries{}, values: map[string]stringset{}, postings: &memPostings{m: make(map[term][]uint32)}, wal: wal, @@ -64,7 +64,9 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) { b.stats.ChunkCount++ // head block has one chunk/series }, sample: func(s hashedSample) { - cd := b.descs[s.ref] + si := s.ref + + cd := b.series[si] cd.append(s.t, s.v) if s.t > b.stats.MaxTime { @@ -109,26 +111,29 @@ func (h *headSeriesReader) Chunk(ref uint32) (chunks.Chunk, error) { h.mtx.RLock() defer h.mtx.RUnlock() - if int(ref) >= len(h.descs) { - return nil, errNotFound + c := &safeChunk{ + Chunk: h.series[ref>>8].chunks[int((ref<<24)>>24)].chunk, + s: h.series[ref>>8], + i: int((ref << 24) >> 24), } - return h.descs[int(ref)].chunk, nil + return c, nil } type safeChunk struct { - cd *chunkDesc + chunks.Chunk + s *memSeries + i int } func (c *safeChunk) Iterator() chunks.Iterator { - c.cd.mtx.Lock() - defer c.cd.mtx.Unlock() - - return c.cd.iterator() + c.s.mtx.RLock() + defer c.s.mtx.RUnlock() + return c.s.iterator(c.i) } -func (c *safeChunk) Appender() (chunks.Appender, error) { panic("illegal") } -func (c *safeChunk) Bytes() []byte { panic("illegal") } -func (c *safeChunk) Encoding() chunks.Encoding { panic("illegal") } +// func (c *safeChunk) Appender() (chunks.Appender, error) { panic("illegal") } +// func (c *safeChunk) Bytes() []byte { panic("illegal") } +// func (c *safeChunk) Encoding() chunks.Encoding { panic("illegal") } type headIndexReader struct { *headBlock @@ -165,19 +170,24 @@ func (h *headIndexReader) Series(ref uint32) (labels.Labels, []ChunkMeta, error) h.mtx.RLock() defer h.mtx.RUnlock() - if int(ref) >= len(h.descs) { + if int(ref) >= len(h.series) { return nil, nil, errNotFound } - cd := h.descs[ref] + s := h.series[ref] + metas := make([]ChunkMeta, 0, len(s.chunks)) - cd.mtx.RLock() - meta := ChunkMeta{ - MinTime: cd.firstTimestamp, - MaxTime: cd.lastTimestamp, - Ref: ref, + s.mtx.RLock() + defer s.mtx.RUnlock() + + for i, c := range s.chunks { + metas = append(metas, ChunkMeta{ + MinTime: c.minTime, + MaxTime: c.maxTime, + Ref: (ref << 8) | uint32(i), + }) } - cd.mtx.RUnlock() - return cd.lset, []ChunkMeta{meta}, nil + + return s.lset, metas, nil } func (h *headIndexReader) LabelIndices() ([][]string, error) { @@ -200,35 +210,25 @@ func (h *headIndexReader) Stats() (BlockStats, error) { // get retrieves the chunk with the hash and label set and creates // a new one if it doesn't exist yet. -func (h *headBlock) get(hash uint64, lset labels.Labels) *chunkDesc { - cds := h.hashes[hash] +func (h *headBlock) get(hash uint64, lset labels.Labels) *memSeries { + series := h.hashes[hash] - for _, cd := range cds { - if cd.lset.Equals(lset) { - return cd + for _, s := range series { + if s.lset.Equals(lset) { + return s } } return nil } -func (h *headBlock) create(hash uint64, lset labels.Labels) *chunkDesc { - cd := &chunkDesc{ - lset: lset, - chunk: chunks.NewXORChunk(), - lastTimestamp: math.MinInt64, - } +func (h *headBlock) create(hash uint64, lset labels.Labels) *memSeries { + s := &memSeries{lset: lset} - var err error - cd.app, err = cd.chunk.Appender() - if err != nil { - // Getting an Appender for a new chunk must not panic. - panic(err) - } // Index the new chunk. - cd.ref = uint32(len(h.descs)) + s.ref = uint32(len(h.series)) - h.descs = append(h.descs, cd) - h.hashes[hash] = append(h.hashes[hash], cd) + h.series = append(h.series, s) + h.hashes[hash] = append(h.hashes[hash], s) for _, l := range lset { valset, ok := h.values[l.Name] @@ -238,12 +238,12 @@ func (h *headBlock) create(hash uint64, lset labels.Labels) *chunkDesc { } valset.set(l.Value) - h.postings.add(cd.ref, term{name: l.Name, value: l.Value}) + h.postings.add(s.ref, term{name: l.Name, value: l.Value}) } - h.postings.add(cd.ref, term{}) + h.postings.add(s.ref, term{}) - return cd + return s } var ( @@ -273,18 +273,19 @@ func (h *headBlock) appendBatch(samples []hashedSample) (int, error) { for i := range samples { s := &samples[i] - cd := h.get(s.hash, s.labels) - if cd != nil { - // Samples must only occur in order. - if s.t < cd.lastTimestamp { + ms := h.get(s.hash, s.labels) + if ms != nil { + c := ms.head() + + if s.t < c.maxTime { return 0, ErrOutOfOrderSample } - if cd.lastTimestamp == s.t && cd.lastValue != s.v { + if c.maxTime == s.t && ms.lastValue != s.v { return 0, ErrAmendSample } // TODO(fabxc): sample refs are only scoped within a block for // now and we ignore any previously set value - s.ref = cd.ref + s.ref = ms.ref continue } @@ -303,19 +304,17 @@ func (h *headBlock) appendBatch(samples []hashedSample) (int, error) { newSamples = append(newSamples, s) } - // Write all new series and samples to the WAL and add it to the - // in-mem database on success. - if err := h.wal.Log(newSeries, samples); err != nil { - return 0, err - } - // After the samples were successfully written to the WAL, there may // be no further failures. if len(newSeries) > 0 { + // TODO(fabxc): re-check if we actually have to create a new series + // after acquiring the write lock. + // If concurrent appenders attempt to create the same series, there's + // a semantical race between switching locks. h.mtx.RUnlock() h.mtx.Lock() - base := len(h.descs) + base := len(h.series) for i, s := range newSeries { h.create(newHashes[i], s) @@ -327,6 +326,11 @@ func (h *headBlock) appendBatch(samples []hashedSample) (int, error) { h.mtx.Unlock() h.mtx.RLock() } + // Write all new series and samples to the WAL and add it to the + // in-mem database on success. + if err := h.wal.Log(newSeries, samples); err != nil { + return 0, err + } var ( total = uint64(len(samples)) @@ -334,16 +338,14 @@ func (h *headBlock) appendBatch(samples []hashedSample) (int, error) { maxt = int64(math.MinInt64) ) for _, s := range samples { - cd := h.descs[s.ref] - cd.mtx.Lock() - // Skip duplicate samples. - if cd.lastTimestamp == s.t && cd.lastValue != s.v { + ser := h.series[s.ref] + ser.mtx.Lock() + ok := ser.append(s.t, s.v) + ser.mtx.Unlock() + if !ok { total-- continue } - cd.append(s.t, s.v) - cd.mtx.Unlock() - if mint > s.t { mint = s.t } @@ -379,18 +381,18 @@ func (h *headBlock) fullness() float64 { func (h *headBlock) updateMapping() { h.mtx.RLock() - if h.mapper.sortable != nil && h.mapper.Len() == len(h.descs) { + if h.mapper.sortable != nil && h.mapper.Len() == len(h.series) { h.mtx.RUnlock() return } - cds := make([]*chunkDesc, len(h.descs)) - copy(cds, h.descs) + series := make([]*memSeries, len(h.series)) + copy(series, h.series) h.mtx.RUnlock() - s := slice.SortInterface(cds, func(i, j int) bool { - return labels.Compare(cds[i].lset, cds[j].lset) < 0 + s := slice.SortInterface(series, func(i, j int) bool { + return labels.Compare(series[i].lset, series[j].lset) < 0 }) h.mapper.update(s) @@ -415,51 +417,89 @@ func (h *headBlock) remapPostings(p Postings) Postings { return newListPostings(list) } -// chunkDesc wraps a plain data chunk and provides cached meta data about it. -type chunkDesc struct { +type memSeries struct { mtx sync.RWMutex - ref uint32 - lset labels.Labels - chunk chunks.Chunk - - // Caching fielddb. - firstTimestamp int64 - lastTimestamp int64 - lastValue float64 - numSamples int + ref uint32 + lset labels.Labels + chunks []*memChunk + lastValue float64 sampleBuf [4]sample app chunks.Appender // Current appender for the chunkdb. } -func (cd *chunkDesc) append(ts int64, v float64) { - if cd.numSamples == 0 { - cd.firstTimestamp = ts +func (s *memSeries) cut() *memChunk { + c := &memChunk{ + chunk: chunks.NewXORChunk(), + maxTime: math.MinInt64, } - cd.app.Append(ts, v) + s.chunks = append(s.chunks, c) - cd.lastTimestamp = ts - cd.lastValue = v - cd.numSamples++ + app, err := c.chunk.Appender() + if err != nil { + panic(err) + } - cd.sampleBuf[0] = cd.sampleBuf[1] - cd.sampleBuf[1] = cd.sampleBuf[2] - cd.sampleBuf[2] = cd.sampleBuf[3] - cd.sampleBuf[3] = sample{t: ts, v: v} + s.app = app + return c } -func (cd *chunkDesc) iterator() chunks.Iterator { +func (s *memSeries) append(t int64, v float64) bool { + var c *memChunk + + if s.app == nil || s.head().samples > 10050 { + c = s.cut() + c.minTime = t + } else { + c = s.head() + // Skip duplicate samples. + if c.maxTime == t && s.lastValue != v { + return false + } + } + s.app.Append(t, v) + + c.maxTime = t + c.samples++ + + s.lastValue = v + + s.sampleBuf[0] = s.sampleBuf[1] + s.sampleBuf[1] = s.sampleBuf[2] + s.sampleBuf[2] = s.sampleBuf[3] + s.sampleBuf[3] = sample{t: t, v: v} + + return true +} + +func (s *memSeries) iterator(i int) chunks.Iterator { + c := s.chunks[i] + + if i < len(s.chunks)-1 { + return c.chunk.Iterator() + } + it := &memSafeIterator{ - Iterator: cd.chunk.Iterator(), + Iterator: c.chunk.Iterator(), i: -1, - total: cd.numSamples, - buf: cd.sampleBuf, + total: c.samples, + buf: s.sampleBuf, } return it } +func (s *memSeries) head() *memChunk { + return s.chunks[len(s.chunks)-1] +} + +type memChunk struct { + chunk chunks.Chunk + minTime, maxTime int64 + samples int +} + type memSafeIterator struct { chunks.Iterator