diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index a12dc675d..97280ffe4 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -113,7 +113,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) { st, err := tsdb.Open(dir, nil, nil, &tsdb.Options{ WALFlushInterval: 200 * time.Millisecond, RetentionDuration: 2 * 24 * 60 * 60 * 1000, // 1 days in milliseconds - MinBlockDuration: 3 * 60 * 60 * 1000, // 2 hours in milliseconds + MinBlockDuration: 3 * 60 * 60 * 1000, // 3 hours in milliseconds MaxBlockDuration: 27 * 60 * 60 * 1000, // 1 days in milliseconds }) if err != nil { @@ -157,6 +157,8 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) { }) } +const timeDelta = 30000 + func (b *writeBenchmark) ingestScrapes(lbls []labels.Labels, scrapeCount int) (uint64, error) { var mu sync.Mutex var total uint64 @@ -174,7 +176,7 @@ func (b *writeBenchmark) ingestScrapes(lbls []labels.Labels, scrapeCount int) (u wg.Add(1) go func() { - n, err := b.ingestScrapesShard(batch, 100, int64(30000*i)) + n, err := b.ingestScrapesShard(batch, 100, int64(timeDelta*i)) if err != nil { // exitWithError(err) fmt.Println(" err", err) @@ -212,7 +214,7 @@ func (b *writeBenchmark) ingestScrapesShard(metrics []labels.Labels, scrapeCount for i := 0; i < scrapeCount; i++ { app := b.storage.Appender() - ts += int64(30000) + ts += timeDelta for _, s := range scrape { s.value += 1000 diff --git a/compact.go b/compact.go index ff694e33a..ca646a084 100644 --- a/compact.go +++ b/compact.go @@ -314,6 +314,8 @@ func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*Blo var metas []BlockMeta for i, b := range blocks { + metas = append(metas, b.Meta()) + all, err := b.Index().Postings("", "") if err != nil { return nil, err @@ -328,7 +330,6 @@ func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*Blo if err != nil { return nil, err } - metas = append(metas, b.Meta()) } // We fully rebuild the postings list index from merged series. diff --git a/head.go b/head.go index 8bf0762bb..db4be4c2f 100644 --- a/head.go +++ b/head.go @@ -719,12 +719,7 @@ func (h *HeadBlock) get(hash uint64, lset labels.Labels) *memSeries { } func (h *HeadBlock) create(hash uint64, lset labels.Labels) *memSeries { - s := &memSeries{ - lset: lset, - ref: uint32(len(h.series)), - } - // create the initial chunk and appender - s.cut() + s := newMemSeries(lset, uint32(len(h.series)), h.meta.MaxTime) // Allocate empty space until we can insert at the given index. h.series = append(h.series, s) @@ -759,15 +754,18 @@ type memSeries struct { lset labels.Labels chunks []*memChunk + nextAt int64 // timestamp at which to cut the next chunk. + maxt int64 // maximum timestamp for the series. lastValue float64 sampleBuf [4]sample app chunks.Appender // Current appender for the chunk. } -func (s *memSeries) cut() *memChunk { +func (s *memSeries) cut(mint int64) *memChunk { c := &memChunk{ chunk: chunks.NewXORChunk(), + minTime: mint, maxTime: math.MinInt64, } s.chunks = append(s.chunks, c) @@ -776,32 +774,47 @@ func (s *memSeries) cut() *memChunk { if err != nil { panic(err) } - s.app = app return c } +func newMemSeries(lset labels.Labels, id uint32, maxt int64) *memSeries { + s := &memSeries{ + lset: lset, + ref: id, + maxt: maxt, + nextAt: math.MinInt64, + } + return s +} + func (s *memSeries) append(t int64, v float64) bool { + const samplesPerChunk = 120 + s.mtx.Lock() defer s.mtx.Unlock() var c *memChunk - if s.head().samples > 130 { - c = s.cut() - c.minTime = t - } else { - c = s.head() - // Skip duplicate and out of order samples. - if c.maxTime >= t { - return false - } + if len(s.chunks) == 0 { + c = s.cut(t) + } + c = s.head() + if c.maxTime >= t { + return false + } + if c.samples > samplesPerChunk/4 && t >= s.nextAt { + c = s.cut(t) } s.app.Append(t, v) c.maxTime = t c.samples++ + if c.samples == samplesPerChunk/4 { + s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.maxt) + } + s.lastValue = v s.sampleBuf[0] = s.sampleBuf[1] @@ -812,6 +825,17 @@ func (s *memSeries) append(t int64, v float64) bool { return true } +// computeChunkEndTime estimates the end timestamp based the beginning of a chunk, +// its current timestamp and the upper bound up to which we insert data. +// It assumes that the time range is 1/4 full. +func computeChunkEndTime(start, cur, max int64) int64 { + a := (max - start) / ((cur - start + 1) * 4) + if a == 0 { + return max + } + return start + (max-start)/a +} + func (s *memSeries) iterator(i int) chunks.Iterator { c := s.chunks[i] diff --git a/head_test.go b/head_test.go index 0463d8430..421fe3992 100644 --- a/head_test.go +++ b/head_test.go @@ -731,3 +731,45 @@ Outer: return ds } + +func TestComputeChunkEndTime(t *testing.T) { + cases := []struct { + start, cur, max int64 + res int64 + }{ + { + start: 0, + cur: 250, + max: 1000, + res: 1000, + }, + { + start: 100, + cur: 200, + max: 1000, + res: 550, + }, + // Case where we fit floored 0 chunks. Must catch division by 0 + // and default to maximum time. + { + start: 0, + cur: 500, + max: 1000, + res: 1000, + }, + // Catch divison by zero for cur == start. Strictly not a possible case. + { + start: 100, + cur: 100, + max: 1000, + res: 104, + }, + } + + for _, c := range cases { + got := computeChunkEndTime(c.start, c.cur, c.max) + if got != c.res { + t.Errorf("expected %d for (start: %d, cur: %d, max: %d), got %d", c.res, c.start, c.cur, c.max, got) + } + } +}