From 5a1c8eaa0e5570a2205b0f58a7463bbbf35a7153 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 2 Feb 2017 11:09:19 +0100 Subject: [PATCH] Fix missing appends after reference lookups --- cmd/tsdb/main.go | 11 +++++------ db.go | 25 +++++++------------------ head.go | 8 ++++---- 3 files changed, 16 insertions(+), 28 deletions(-) diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index f10a5ec20..1cb9efaf2 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -122,7 +122,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) { dur := measureTime("ingestScrapes", func() { b.startProfiling() - total, err = b.ingestScrapes(metrics, 3000) + total, err = b.ingestScrapes(metrics, 4000) if err != nil { exitWithError(err) } @@ -140,11 +140,11 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) { } func (b *writeBenchmark) ingestScrapes(lbls []labels.Labels, scrapeCount int) (uint64, error) { - var wg sync.WaitGroup var mu sync.Mutex var total uint64 - for i := 0; i < scrapeCount; i += 50 { + for i := 0; i < scrapeCount; i += 100 { + var wg sync.WaitGroup lbls := lbls for len(lbls) > 0 { l := 1000 @@ -156,7 +156,7 @@ func (b *writeBenchmark) ingestScrapes(lbls []labels.Labels, scrapeCount int) (u wg.Add(1) go func() { - n, err := b.ingestScrapesShard(batch, 50, int64(30000*i)) + n, err := b.ingestScrapesShard(batch, 100, int64(30000*i)) if err != nil { // exitWithError(err) fmt.Println(" err", err) @@ -204,10 +204,9 @@ func (b *writeBenchmark) ingestScrapesShard(metrics []labels.Labels, scrapeCount if err != nil { panic(err) } - // fmt.Println("Add:", s.labels, ref) s.ref = &ref } else if err := app.AddFast(*s.ref, ts, float64(s.value)); err != nil { - // fmt.Println("AddFast:", *s.ref) + if err.Error() != "not found" { panic(err) } diff --git a/db.go b/db.go index 9de25e2f7..93c9e1420 100644 --- a/db.go +++ b/db.go @@ -266,7 +266,7 @@ func (db *DB) compact(i, j int) error { for _, b := range blocks { if err := b.Close(); err != nil { - return errors.Wrap(err, "close old block") + return errors.Wrapf(err, "close old block %s", b.Dir()) } } @@ -278,11 +278,9 @@ func (db *DB) compact(i, j int) error { db.removeBlocks(i, j) db.persisted = append(db.persisted, pb) - for i, b := range blocks { - if i > 0 { - if err := os.RemoveAll(b.Dir()); err != nil { - return errors.Wrap(err, "removing old block") - } + for _, b := range blocks[1:] { + if err := os.RemoveAll(b.Dir()); err != nil { + return errors.Wrap(err, "removing old block") } } return nil @@ -355,11 +353,7 @@ func (db *DB) Appender() Appender { } type dbAppender struct { - db *DB - // gen uint8 - // head *headAppender - maxGen uint8 - + db *DB heads []*headAppender } @@ -410,15 +404,12 @@ func (a *dbAppender) appenderFor(t int64) (*headAppender, error) { // If there's no fitting head block for t, ensure it gets created. if len(a.heads) == 0 || t >= a.heads[len(a.heads)-1].meta.MaxTime { a.db.mtx.RUnlock() - var mints []int64 - for _, h := range a.heads { - mints = append(mints, h.meta.MinTime) - } - fmt.Println("ensure head", t, mints) + if err := a.db.ensureHead(t); err != nil { a.db.mtx.RLock() return nil, err } + a.db.mtx.RLock() if len(a.heads) == 0 { @@ -451,7 +442,6 @@ func (db *DB) ensureHead(t int64) error { // AppendableBlocks-1 front padding heads. if len(db.heads) == 0 { for i := int64(db.opts.AppendableBlocks - 1); i >= 0; i-- { - fmt.Println("cut init for", t-i*int64(db.opts.MinBlockDuration)) if _, err := db.cut(t - i*int64(db.opts.MinBlockDuration)); err != nil { return err } @@ -464,7 +454,6 @@ func (db *DB) ensureHead(t int64) error { if t < h.meta.MaxTime { return nil } - fmt.Println("cut for", h.meta.MaxTime) if _, err := db.cut(h.meta.MaxTime); err != nil { return err } diff --git a/head.go b/head.go index 6af2c9bb7..3e0d0be47 100644 --- a/head.go +++ b/head.go @@ -193,10 +193,10 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro func (a *headAppender) hashedAdd(hash uint64, lset labels.Labels, t int64, v float64) (uint64, error) { if ms := a.get(hash, lset); ms != nil { - return uint64(ms.ref), nil + return uint64(ms.ref), a.AddFast(uint64(ms.ref), t, v) } if ref, ok := a.newHashes[hash]; ok { - return uint64(ref), nil + return uint64(ref), a.AddFast(uint64(ref), t, v) } // We only know the actual reference after committing. We generate an @@ -220,7 +220,6 @@ func (a *headAppender) hashedAdd(hash uint64, lset labels.Labels, t int64, v flo } func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { - // fmt.Println("add fast ref", ref) // We only own the last 5 bytes of the reference. Anything before is // used by higher-order appenders. We erase it to avoid issues. ref = (ref << 24) >> 24 @@ -325,6 +324,7 @@ func (a *headAppender) Commit() error { if !a.series[s.ref].append(s.t, s.v) { total-- } + if s.t < mint { mint = s.t } @@ -568,7 +568,7 @@ func (s *memSeries) cut() *memChunk { func (s *memSeries) append(t int64, v float64) bool { var c *memChunk - if s.app == nil || s.head().samples > 10050 { + if s.app == nil || s.head().samples > 2000 { c = s.cut() c.minTime = t } else {