diff --git a/head.go b/head.go
index 264af2211..a06555a56 100644
--- a/head.go
+++ b/head.go
@@ -20,13 +20,13 @@ type headBlock struct {
 	// descs holds all chunk descs for the head block. Each chunk implicitly
 	// is assigned the index as its ID.
-	descs []*chunkDesc
+	series []*memSeries
 	// mapping maps a series ID to its position in an ordered list
 	// of all series. The orderDirty flag indicates that it has gone stale.
 	mapper *positionMapper
 	// hashes contains a collision map of label set hashes of chunks
 	// to their chunk descs.
-	hashes map[uint64][]*chunkDesc
+	hashes map[uint64][]*memSeries
 
 	values   map[string]stringset // label names to possible values
 	postings *memPostings         // postings lists for terms
@@ -45,8 +45,8 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) {
 
 	b := &headBlock{
 		dir:      dir,
-		descs:    []*chunkDesc{},
-		hashes:   map[uint64][]*chunkDesc{},
+		series:   []*memSeries{},
+		hashes:   map[uint64][]*memSeries{},
 		values:   map[string]stringset{},
 		postings: &memPostings{m: make(map[term][]uint32)},
 		wal:      wal,
@@ -64,7 +64,9 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) {
 			b.stats.ChunkCount++ // head block has one chunk/series
 		},
 		sample: func(s hashedSample) {
-			cd := b.descs[s.ref]
+			si := s.ref
+
+			cd := b.series[si]
 
 			cd.append(s.t, s.v)
 
 			if s.t > b.stats.MaxTime {
@@ -109,26 +111,29 @@ func (h *headSeriesReader) Chunk(ref uint32) (chunks.Chunk, error) {
 	h.mtx.RLock()
 	defer h.mtx.RUnlock()
 
-	if int(ref) >= len(h.descs) {
-		return nil, errNotFound
+	c := &safeChunk{
+		Chunk: h.series[ref>>8].chunks[int((ref<<24)>>24)].chunk,
+		s:     h.series[ref>>8],
+		i:     int((ref << 24) >> 24),
 	}
-	return h.descs[int(ref)].chunk, nil
+	return c, nil
 }
 
 type safeChunk struct {
-	cd *chunkDesc
+	chunks.Chunk
+	s *memSeries
+	i int
 }
 
 func (c *safeChunk) Iterator() chunks.Iterator {
-	c.cd.mtx.Lock()
-	defer c.cd.mtx.Unlock()
-
-	return c.cd.iterator()
+	c.s.mtx.RLock()
+	defer c.s.mtx.RUnlock()
+	return c.s.iterator(c.i)
 }
 
-func (c *safeChunk) Appender() (chunks.Appender, error) { panic("illegal") }
-func (c *safeChunk) Bytes() []byte                      { panic("illegal") }
-func (c *safeChunk) Encoding() chunks.Encoding          { panic("illegal") }
+// func (c *safeChunk) Appender() (chunks.Appender, error) { panic("illegal") }
+// func (c *safeChunk) Bytes() []byte                      { panic("illegal") }
+// func (c *safeChunk) Encoding() chunks.Encoding          { panic("illegal") }
 
 type headIndexReader struct {
 	*headBlock
@@ -165,19 +170,24 @@ func (h *headIndexReader) Series(ref uint32) (labels.Labels, []ChunkMeta, error)
 	h.mtx.RLock()
 	defer h.mtx.RUnlock()
 
-	if int(ref) >= len(h.descs) {
+	if int(ref) >= len(h.series) {
 		return nil, nil, errNotFound
 	}
-	cd := h.descs[ref]
+	s := h.series[ref]
+
+	metas := make([]ChunkMeta, 0, len(s.chunks))
 
-	cd.mtx.RLock()
-	meta := ChunkMeta{
-		MinTime: cd.firstTimestamp,
-		MaxTime: cd.lastTimestamp,
-		Ref:     ref,
+	s.mtx.RLock()
+	defer s.mtx.RUnlock()
+
+	for i, c := range s.chunks {
+		metas = append(metas, ChunkMeta{
+			MinTime: c.minTime,
+			MaxTime: c.maxTime,
+			Ref:     (ref << 8) | uint32(i),
+		})
 	}
-	cd.mtx.RUnlock()
 
-	return cd.lset, []ChunkMeta{meta}, nil
+
+	return s.lset, metas, nil
 }
 
 func (h *headIndexReader) LabelIndices() ([][]string, error) {
@@ -200,35 +210,25 @@ func (h *headIndexReader) Stats() (BlockStats, error) {
 
 // get retrieves the chunk with the hash and label set and creates
 // a new one if it doesn't exist yet.
-func (h *headBlock) get(hash uint64, lset labels.Labels) *chunkDesc {
-	cds := h.hashes[hash]
+func (h *headBlock) get(hash uint64, lset labels.Labels) *memSeries {
+	series := h.hashes[hash]
 
-	for _, cd := range cds {
-		if cd.lset.Equals(lset) {
-			return cd
+	for _, s := range series {
+		if s.lset.Equals(lset) {
+			return s
 		}
 	}
 	return nil
 }
 
-func (h *headBlock) create(hash uint64, lset labels.Labels) *chunkDesc {
-	cd := &chunkDesc{
-		lset:          lset,
-		chunk:         chunks.NewXORChunk(),
-		lastTimestamp: math.MinInt64,
-	}
+func (h *headBlock) create(hash uint64, lset labels.Labels) *memSeries {
+	s := &memSeries{lset: lset}
 
-	var err error
-	cd.app, err = cd.chunk.Appender()
-	if err != nil {
-		// Getting an Appender for a new chunk must not panic.
-		panic(err)
-	}
 	// Index the new chunk.
-	cd.ref = uint32(len(h.descs))
+	s.ref = uint32(len(h.series))
 
-	h.descs = append(h.descs, cd)
-	h.hashes[hash] = append(h.hashes[hash], cd)
+	h.series = append(h.series, s)
+	h.hashes[hash] = append(h.hashes[hash], s)
 
 	for _, l := range lset {
 		valset, ok := h.values[l.Name]
@@ -238,12 +238,12 @@ func (h *headBlock) create(hash uint64, lset labels.Labels) *chunkDesc {
 		}
 		valset.set(l.Value)
 
-		h.postings.add(cd.ref, term{name: l.Name, value: l.Value})
+		h.postings.add(s.ref, term{name: l.Name, value: l.Value})
 	}
 
-	h.postings.add(cd.ref, term{})
+	h.postings.add(s.ref, term{})
 
-	return cd
+	return s
 }
 
 var (
@@ -273,18 +273,19 @@ func (h *headBlock) appendBatch(samples []hashedSample) (int, error) {
 	for i := range samples {
 		s := &samples[i]
 
-		cd := h.get(s.hash, s.labels)
-		if cd != nil {
-			// Samples must only occur in order.
-			if s.t < cd.lastTimestamp {
+		ms := h.get(s.hash, s.labels)
+		if ms != nil {
+			c := ms.head()
+
+			if s.t < c.maxTime {
 				return 0, ErrOutOfOrderSample
 			}
-			if cd.lastTimestamp == s.t && cd.lastValue != s.v {
+			if c.maxTime == s.t && ms.lastValue != s.v {
 				return 0, ErrAmendSample
 			}
 			// TODO(fabxc): sample refs are only scoped within a block for
 			// now and we ignore any previously set value
-			s.ref = cd.ref
+			s.ref = ms.ref
 			continue
 		}
 
@@ -303,19 +304,17 @@ func (h *headBlock) appendBatch(samples []hashedSample) (int, error) {
 		newSamples = append(newSamples, s)
 	}
 
-	// Write all new series and samples to the WAL and add it to the
-	// in-mem database on success.
-	if err := h.wal.Log(newSeries, samples); err != nil {
-		return 0, err
-	}
-
 	// After the samples were successfully written to the WAL, there may
 	// be no further failures.
 	if len(newSeries) > 0 {
+		// TODO(fabxc): re-check if we actually have to create a new series
+		// after acquiring the write lock.
+		// If concurrent appenders attempt to create the same series, there's
+		// a semantical race between switching locks.
 		h.mtx.RUnlock()
 		h.mtx.Lock()
 
-		base := len(h.descs)
+		base := len(h.series)
 
 		for i, s := range newSeries {
 			h.create(newHashes[i], s)
@@ -327,6 +326,11 @@ func (h *headBlock) appendBatch(samples []hashedSample) (int, error) {
 		h.mtx.Unlock()
 		h.mtx.RLock()
 	}
+	// Write all new series and samples to the WAL and add it to the
+	// in-mem database on success.
+	if err := h.wal.Log(newSeries, samples); err != nil {
+		return 0, err
+	}
 
 	var (
 		total = uint64(len(samples))
@@ -334,16 +338,14 @@ func (h *headBlock) appendBatch(samples []hashedSample) (int, error) {
 		maxt  = int64(math.MinInt64)
 	)
 	for _, s := range samples {
-		cd := h.descs[s.ref]
-		cd.mtx.Lock()
-		// Skip duplicate samples.
-		if cd.lastTimestamp == s.t && cd.lastValue != s.v {
+		ser := h.series[s.ref]
+		ser.mtx.Lock()
+		ok := ser.append(s.t, s.v)
+		ser.mtx.Unlock()
+
+		if !ok {
 			total--
 			continue
 		}
-		cd.append(s.t, s.v)
-		cd.mtx.Unlock()
-
 		if mint > s.t {
 			mint = s.t
 		}
@@ -379,18 +381,18 @@ func (h *headBlock) fullness() float64 {
 func (h *headBlock) updateMapping() {
 	h.mtx.RLock()
 
-	if h.mapper.sortable != nil && h.mapper.Len() == len(h.descs) {
+	if h.mapper.sortable != nil && h.mapper.Len() == len(h.series) {
 		h.mtx.RUnlock()
 		return
 	}
 
-	cds := make([]*chunkDesc, len(h.descs))
-	copy(cds, h.descs)
+	series := make([]*memSeries, len(h.series))
+	copy(series, h.series)
 
 	h.mtx.RUnlock()
 
-	s := slice.SortInterface(cds, func(i, j int) bool {
-		return labels.Compare(cds[i].lset, cds[j].lset) < 0
+	s := slice.SortInterface(series, func(i, j int) bool {
+		return labels.Compare(series[i].lset, series[j].lset) < 0
 	})
 
 	h.mapper.update(s)
@@ -415,51 +417,89 @@ func (h *headBlock) remapPostings(p Postings) Postings {
 	return newListPostings(list)
 }
 
-// chunkDesc wraps a plain data chunk and provides cached meta data about it.
-type chunkDesc struct {
+type memSeries struct {
 	mtx sync.RWMutex
 
-	ref   uint32
-	lset  labels.Labels
-	chunk chunks.Chunk
-
-	// Caching fielddb.
-	firstTimestamp int64
-	lastTimestamp  int64
-	lastValue      float64
-	numSamples     int
+	ref       uint32
+	lset      labels.Labels
+	chunks    []*memChunk
+	lastValue float64
 
 	sampleBuf [4]sample
 
 	app chunks.Appender // Current appender for the chunkdb.
 }
 
-func (cd *chunkDesc) append(ts int64, v float64) {
-	if cd.numSamples == 0 {
-		cd.firstTimestamp = ts
+func (s *memSeries) cut() *memChunk {
+	c := &memChunk{
+		chunk:   chunks.NewXORChunk(),
+		maxTime: math.MinInt64,
 	}
-	cd.app.Append(ts, v)
+	s.chunks = append(s.chunks, c)
 
-	cd.lastTimestamp = ts
-	cd.lastValue = v
-	cd.numSamples++
+	app, err := c.chunk.Appender()
+	if err != nil {
+		panic(err)
+	}
 
-	cd.sampleBuf[0] = cd.sampleBuf[1]
-	cd.sampleBuf[1] = cd.sampleBuf[2]
-	cd.sampleBuf[2] = cd.sampleBuf[3]
-	cd.sampleBuf[3] = sample{t: ts, v: v}
+	s.app = app
+	return c
}
 
-func (cd *chunkDesc) iterator() chunks.Iterator {
+func (s *memSeries) append(t int64, v float64) bool {
+	var c *memChunk
+
+	if s.app == nil || s.head().samples > 10050 {
+		c = s.cut()
+		c.minTime = t
+	} else {
+		c = s.head()
+		// Skip duplicate samples.
+		if c.maxTime == t && s.lastValue != v {
+			return false
+		}
+	}
+	s.app.Append(t, v)
+
+	c.maxTime = t
+	c.samples++
+
+	s.lastValue = v
+
+	s.sampleBuf[0] = s.sampleBuf[1]
+	s.sampleBuf[1] = s.sampleBuf[2]
+	s.sampleBuf[2] = s.sampleBuf[3]
+	s.sampleBuf[3] = sample{t: t, v: v}
+
+	return true
+}
+
+func (s *memSeries) iterator(i int) chunks.Iterator {
+	c := s.chunks[i]
+
+	if i < len(s.chunks)-1 {
+		return c.chunk.Iterator()
+	}
+
 	it := &memSafeIterator{
-		Iterator: cd.chunk.Iterator(),
+		Iterator: c.chunk.Iterator(),
 		i:        -1,
-		total:    cd.numSamples,
-		buf:      cd.sampleBuf,
+		total:    c.samples,
+		buf:      s.sampleBuf,
 	}
 	return it
 }
 
+func (s *memSeries) head() *memChunk {
+	return s.chunks[len(s.chunks)-1]
+}
+
+type memChunk struct {
+	chunk            chunks.Chunk
+	minTime, maxTime int64
+	samples          int
+}
+
 type memSafeIterator struct {
 	chunks.Iterator
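
Note on the new chunk addressing (not part of the patch itself): headIndexReader.Series now hands out chunk references that pack the series reference and the chunk's position within that series into a single uint32, with the series reference in the upper 24 bits and the chunk index in the lower 8 bits ((ref << 8) | uint32(i) on the encode side, ref>>8 and (ref<<24)>>24 in headSeriesReader.Chunk). A minimal standalone sketch of that arithmetic follows; packChunkRef and unpackChunkRef are hypothetical helper names introduced only for illustration and do not exist in the codebase.

package main

import "fmt"

// packChunkRef mirrors the encoding used in headIndexReader.Series:
// the series reference occupies the upper 24 bits, the index of the
// chunk within that series the lower 8 bits.
func packChunkRef(seriesRef uint32, chunkIdx int) uint32 {
	return (seriesRef << 8) | uint32(chunkIdx)
}

// unpackChunkRef mirrors the decoding in headSeriesReader.Chunk:
// ref>>8 recovers the series reference, and masking with 0xff is the
// same operation as the patch's int((ref<<24)>>24).
func unpackChunkRef(ref uint32) (seriesRef uint32, chunkIdx int) {
	return ref >> 8, int(ref & 0xff)
}

func main() {
	ref := packChunkRef(42, 3)
	s, c := unpackChunkRef(ref)
	fmt.Printf("packed=%#x series=%d chunk=%d\n", ref, s, c)
	// Output: packed=0x2a03 series=42 chunk=3
}

Under this layout a single series can address at most 256 chunks (indices 0-255) and series references must stay below 2^24; if either bound proves too tight, the shift widths would have to change.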