diff --git a/block.go b/block.go index 7ef490c7a..ca7c48f46 100644 --- a/block.go +++ b/block.go @@ -39,10 +39,9 @@ type BlockMeta struct { Sequence int `json:"sequence"` // MinTime and MaxTime specify the time range all samples - // in the block must be in. If unset, samples can be appended - // freely until they are set. - MinTime *int64 `json:"minTime,omitempty"` - MaxTime *int64 `json:"maxTime,omitempty"` + // in the block are in. + MinTime int64 `json:"minTime"` + MaxTime int64 `json:"maxTime"` // Stats about the contents of the block. Stats struct { diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index 6b98d994c..f10a5ec20 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -118,12 +118,19 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) { } }() - measureTime("ingestScrapes", func() { + var total uint64 + + dur := measureTime("ingestScrapes", func() { b.startProfiling() - if err := b.ingestScrapes(metrics, 10000); err != nil { + total, err = b.ingestScrapes(metrics, 3000) + if err != nil { exitWithError(err) } }) + + fmt.Println(" > total samples:", total) + fmt.Println(" > samples/sec:", float64(total)/dur.Seconds()) + measureTime("stopStorage", func() { if err := b.storage.Close(); err != nil { exitWithError(err) @@ -132,7 +139,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) { }) } -func (b *writeBenchmark) ingestScrapes(lbls []labels.Labels, scrapeCount int) error { +func (b *writeBenchmark) ingestScrapes(lbls []labels.Labels, scrapeCount int) (uint64, error) { var wg sync.WaitGroup var mu sync.Mutex var total uint64 @@ -163,8 +170,7 @@ func (b *writeBenchmark) ingestScrapes(lbls []labels.Labels, scrapeCount int) er wg.Wait() } - fmt.Println("> total samples:", total) - return nil + return total, nil } func (b *writeBenchmark) ingestScrapesShard(metrics []labels.Labels, scrapeCount int, baset int64) (uint64, error) { @@ -194,26 +200,23 @@ func (b *writeBenchmark) ingestScrapesShard(metrics []labels.Labels, 
scrapeCount s.value += 1000 if s.ref == nil { - ref, err := app.SetSeries(s.labels) + ref, err := app.Add(s.labels, ts, float64(s.value)) if err != nil { panic(err) } + // fmt.Println("Add:", s.labels, ref) s.ref = &ref - } - - if err := app.Add(*s.ref, ts, float64(s.value)); err != nil { + } else if err := app.AddFast(*s.ref, ts, float64(s.value)); err != nil { + // fmt.Println("AddFast:", *s.ref) if err.Error() != "not found" { panic(err) } - ref, err := app.SetSeries(s.labels) + ref, err := app.Add(s.labels, ts, float64(s.value)) if err != nil { panic(err) } s.ref = &ref - if err := app.Add(*s.ref, ts, float64(s.value)); err != nil { - panic(err) - } } total++ @@ -285,11 +288,12 @@ func reportSize(dir string) { } } -func measureTime(stage string, f func()) { +func measureTime(stage string, f func()) time.Duration { fmt.Printf(">> start stage=%s\n", stage) start := time.Now() f() fmt.Printf(">> completed stage=%s duration=%s\n", stage, time.Since(start)) + return time.Since(start) } func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) { diff --git a/db.go b/db.go index f65b673f2..2f577d020 100644 --- a/db.go +++ b/db.go @@ -28,8 +28,9 @@ import ( // millisecond precision timestampdb. var DefaultOptions = &Options{ WALFlushInterval: 5 * time.Second, - MaxBlockRange: 24 * 60 * 60 * 1000, // 1 day in milliseconds - GracePeriod: 30 * 60 * 1000, // 30 minutes in milliseconds + MinBlockDuration: 3 * 60 * 60 * 1000, // 3 hours in milliseconds + MaxBlockDuration: 48 * 60 * 60 * 1000, // 2 days in milliseconds + GracePeriod: 2 * 60 * 60 * 1000, // 2 hours in milliseconds } // Options of the DB storage. @@ -37,8 +38,12 @@ type Options struct { // The interval at which the write ahead log is flushed to disc. WALFlushInterval time.Duration + // The timestamp range of head blocks after which they get persisted. + // It's the minimum duration of any persisted block. + MinBlockDuration uint64 + // The maximum timestamp range of compacted blocks. 
- MaxBlockRange uint64 + MaxBlockDuration uint64 // Time window between the highest timestamp and the minimum timestamp // that can still be appended. @@ -48,15 +53,17 @@ type Options struct { // Appender allows appending a batch of data. It must be completed with a // call to Commit or Rollback and must not be reused afterwards. type Appender interface { - // SetSeries ensures that a series with the given label set exists and - // returns a unique reference number identifying it. Returned reference - // numbers are ephemeral and may be rejected in calls to Add() at any point. - // A new reference number can then be requested with another call to - // SetSeries. - SetSeries(labels.Labels) (uint64, error) + // Add adds a sample pair for the given series. A reference number is + // returned which can be used to add further samples in the same or later + // transactions. + // Returned reference numbers are ephemeral and may be rejected in calls + // to AddFast() at any point. Adding the sample via Add() returns a new + // reference number. + Add(l labels.Labels, t int64, v float64) (uint64, error) - // Add adds a sample pair for the referenced serie. - Add(ref uint64, t int64, v float64) error + // AddFast adds a sample pair for the referenced series. It is generally faster + // than adding a sample by providing its full label set. + AddFast(ref uint64, t int64, v float64) error // Commit submits the collected samples and purges the batch. 
Commit() error @@ -121,8 +128,8 @@ func Open(dir string, logger log.Logger, opts *Options) (db *DB, err error) { return nil, err } } - // var r prometheus.Registerer - r := prometheus.DefaultRegisterer + var r prometheus.Registerer + // r := prometheus.DefaultRegisterer if opts == nil { opts = DefaultOptions @@ -139,7 +146,7 @@ func Open(dir string, logger log.Logger, opts *Options) (db *DB, err error) { stopc: make(chan struct{}), } db.compactor = newCompactor(r, &compactorOptions{ - maxBlockRange: opts.MaxBlockRange, + maxBlockRange: opts.MaxBlockDuration, }) if err := db.initBlocks(); err != nil { @@ -154,31 +161,31 @@ func Open(dir string, logger log.Logger, opts *Options) (db *DB, err error) { func (db *DB) run() { defer close(db.donec) - go func() { - for { - select { - case <-db.cutc: - db.mtx.Lock() - _, err := db.cut() - db.mtx.Unlock() + // go func() { + // for { + // select { + // case <-db.cutc: + // db.mtx.Lock() + // _, err := db.cut() + // db.mtx.Unlock() - if err != nil { - db.logger.Log("msg", "cut failed", "err", err) - } else { - select { - case db.compactc <- struct{}{}: - default: - } - } - // Drain cut channel so we don't trigger immediately again. - select { - case <-db.cutc: - default: - } - case <-db.stopc: - } - } - }() + // if err != nil { + // db.logger.Log("msg", "cut failed", "err", err) + // } else { + // select { + // case db.compactc <- struct{}{}: + // default: + // } + // } + // // Drain cut channel so we don't trigger immediately again. 
+ // select { + // case <-db.cutc: + // default: + // } + // case <-db.stopc: + // } + // } + // }() for { select { @@ -191,8 +198,8 @@ func (db *DB) run() { infos = append(infos, compactionInfo{ generation: m.Compaction.Generation, - mint: *m.MinTime, - maxt: *m.MaxTime, + mint: m.MinTime, + maxt: m.MaxTime, }) } @@ -317,6 +324,8 @@ func (db *DB) initBlocks() error { if err != nil { return err } + h.generation = db.headGen + db.headGen++ heads = append(heads, h) continue } @@ -330,10 +339,10 @@ func (db *DB) initBlocks() error { db.persisted = persisted db.heads = heads - if len(heads) == 0 { - _, err = db.cut() - } - return err + // if len(heads) == 0 { + // _, err = db.cut() + // } + return nil } // Close the partition. @@ -359,83 +368,185 @@ func (db *DB) Close() error { // Appender returns a new Appender on the database. func (db *DB) Appender() Appender { db.mtx.RLock() + a := &dbAppender{db: db} - return &dbAppender{ - db: db, - head: db.heads[len(db.heads)-1].Appender().(*headAppender), - gen: db.headGen, + for _, b := range db.appendable() { + a.heads = append(a.heads, b.Appender().(*headAppender)) } + return a } type dbAppender struct { - db *DB - gen uint8 - head *headAppender + db *DB + // gen uint8 + // head *headAppender + maxGen uint8 + + heads []*headAppender } -func (a *dbAppender) SetSeries(lset labels.Labels) (uint64, error) { - ref, err := a.head.SetSeries(lset) +func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { + h, err := a.appenderFor(t) + if err != nil { + fmt.Println("no appender") + return 0, err + } + ref, err := h.Add(lset, t, v) if err != nil { return 0, err } - return ref | (uint64(a.gen) << 40), nil + return ref | (uint64(h.generation) << 40), nil } -func (a *dbAppender) setSeries(hash uint64, lset labels.Labels) (uint64, error) { - ref, err := a.head.setSeries(hash, lset) +func (a *dbAppender) hashedAdd(hash uint64, lset labels.Labels, t int64, v float64) (uint64, error) { + h, err := a.appenderFor(t) 
+ if err != nil { + fmt.Println("no appender") + return 0, err + } + ref, err := h.hashedAdd(hash, lset, t, v) if err != nil { return 0, err } - return ref | (uint64(a.gen) << 40), nil + return ref | (uint64(h.generation) << 40), nil } -func (a *dbAppender) Add(ref uint64, t int64, v float64) error { +func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error { // We store the head generation in the 4th byte and use it to reject // stale references. gen := uint8((ref << 16) >> 56) - if gen != a.gen { + h, err := a.appenderFor(t) + if err != nil { + return err + } + // fmt.Println("check gen", h.generation, gen) + if h.generation != gen { return ErrNotFound } - return a.head.Add(ref, t, v) + return h.AddFast(ref, t, v) +} + +// appenderFor gets the appender for the head containing timestamp t. +// If the head block doesn't exist yet, it gets created. +func (a *dbAppender) appenderFor(t int64) (*headAppender, error) { + if len(a.heads) == 0 { + if err := a.addNextHead(t); err != nil { + return nil, err + } + return a.appenderFor(t) + } + for i := len(a.heads) - 1; i >= 0; i-- { + h := a.heads[i] + + if t > h.meta.MaxTime { + if err := a.addNextHead(t); err != nil { + return nil, err + } + return a.appenderFor(t) + } + if t >= h.meta.MinTime { + return h, nil + } + } + + return nil, ErrNotFound +} + +func (a *dbAppender) addNextHead(t int64) error { + a.db.mtx.RUnlock() + a.db.mtx.Lock() + + // We switched locks, validate that adding a head for the timestamp + // is still required. 
+ if len(a.db.heads) > 1 { + h := a.db.heads[len(a.db.heads)-1] + if t <= h.meta.MaxTime { + a.heads = append(a.heads, h.Appender().(*headAppender)) + a.maxGen++ + a.db.mtx.Unlock() + a.db.mtx.RLock() + return nil + } + } + + h, err := a.db.cut(t) + if err == nil { + a.heads = append(a.heads, h.Appender().(*headAppender)) + a.maxGen++ + } + + a.db.mtx.Unlock() + a.db.mtx.RLock() + + return err } func (a *dbAppender) Commit() error { - defer a.db.mtx.RUnlock() + var merr MultiError - err := a.head.Commit() - - if a.head.headBlock.fullness() > 1.0 { - select { - case a.db.cutc <- struct{}{}: - default: - } + for _, h := range a.heads { + merr.Add(h.Commit()) } - return err + a.db.mtx.RUnlock() + + return merr.Err() } func (a *dbAppender) Rollback() error { - err := a.head.Rollback() + var merr MultiError + + for _, h := range a.heads { + merr.Add(h.Rollback()) + } a.db.mtx.RUnlock() - return err + + return merr.Err() +} + +func (db *DB) appendable() []*headBlock { + if len(db.heads) == 0 { + return nil + } + var blocks []*headBlock + maxHead := db.heads[len(db.heads)-1] + + k := len(db.heads) - 2 + for i := k; i >= 0; i-- { + if db.heads[i].meta.MaxTime < maxHead.meta.MinTime-int64(db.opts.GracePeriod) { + break + } + k-- + } + for i := k + 1; i < len(db.heads); i++ { + blocks = append(blocks, db.heads[i]) + } + return blocks } func (db *DB) compactable() []Block { db.mtx.RLock() defer db.mtx.RUnlock() - // h := db.heads[len(db.heads)-1] - // mint := h.maxt - int64(db.opts.GracePeriod) - var blocks []Block for _, pb := range db.persisted { blocks = append(blocks, pb) } - for _, hb := range db.heads[:len(db.heads)-1] { - // if hb.maxt < mint { - // break - // } + + maxHead := db.heads[len(db.heads)-1] + + k := len(db.heads) - 2 + for i := k; i >= 0; i-- { + if db.heads[i].meta.MaxTime < maxHead.meta.MinTime-int64(db.opts.GracePeriod) { + break + } + k-- + } + for i, hb := range db.heads[:len(db.heads)-1] { + if i > k { + break + } blocks = append(blocks, hb) } @@ 
-463,12 +574,13 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block { for _, b := range db.persisted { m := b.Meta() - if intervalOverlap(mint, maxt, *m.MinTime, *m.MaxTime) { + if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { bs = append(bs, b) } } for _, b := range db.heads { - if intervalOverlap(mint, maxt, b.mint, b.maxt) { + m := b.Meta() + if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { bs = append(bs, b) } } @@ -478,40 +590,31 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block { // cut starts a new head block to append to. The completed head block // will still be appendable for the configured grace period. -func (db *DB) cut() (*headBlock, error) { - var mint *int64 - - // If a previous block exists, fix its max time and and take the - // timestamp after as the minimum for the new head. - if len(db.heads) > 0 { - cur := db.heads[len(db.heads)-1] - - cur.metamtx.Lock() - - if cur.meta.MinTime == nil { - mt := cur.mint - cur.meta.MinTime = &mt - } - cur.meta.MaxTime = new(int64) - - mt := cur.maxt + 1 - cur.meta.MaxTime = &mt - mint = &mt - - cur.metamtx.Unlock() - } +func (db *DB) cut(mint int64) (*headBlock, error) { + maxt := mint + int64(db.opts.MinBlockDuration) - 1 + fmt.Println("cut", mint, maxt) dir, seq, err := nextBlockDir(db.dir) if err != nil { return nil, err } - newHead, err := createHeadBlock(dir, seq, db.logger, mint) + newHead, err := createHeadBlock(dir, seq, db.logger, mint, maxt) if err != nil { return nil, err } + db.heads = append(db.heads, newHead) db.headGen++ + newHead.generation = db.headGen + + fmt.Println("headlen", len(db.heads)) + + select { + case db.compactc <- struct{}{}: + default: + } + return newHead, nil } @@ -646,20 +749,20 @@ type partitionedAppender struct { partitions []*dbAppender } -func (a *partitionedAppender) SetSeries(lset labels.Labels) (uint64, error) { +func (a *partitionedAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { h := lset.Hash() p := h >> (64 - 
a.db.partitionPow) - ref, err := a.partitions[p].setSeries(h, lset) + ref, err := a.partitions[p].hashedAdd(h, lset, t, v) if err != nil { return 0, err } return ref | (p << 48), nil } -func (a *partitionedAppender) Add(ref uint64, t int64, v float64) error { +func (a *partitionedAppender) AddFast(ref uint64, t int64, v float64) error { p := uint8((ref << 8) >> 56) - return a.partitions[p].Add(ref, t, v) + return a.partitions[p].AddFast(ref, t, v) } func (a *partitionedAppender) Commit() error { diff --git a/head.go b/head.go index 9b25d93a2..38626814e 100644 --- a/head.go +++ b/head.go @@ -35,9 +35,10 @@ var ( // headBlock handles reads and writes of time series data within a time window. type headBlock struct { - mtx sync.RWMutex - dir string - wal *WAL + mtx sync.RWMutex + dir string + generation uint8 + wal *WAL // descs holds all chunk descs for the head block. Each chunk implicitly // is assigned the index as its ID. @@ -52,19 +53,19 @@ type headBlock struct { values map[string]stringset // label names to possible values postings *memPostings // postings lists for terms - metamtx sync.RWMutex - meta BlockMeta - mint, maxt int64 // timestamp range of current samples + metamtx sync.RWMutex + meta BlockMeta } -func createHeadBlock(dir string, seq int, l log.Logger, minTime *int64) (*headBlock, error) { +func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*headBlock, error) { if err := os.MkdirAll(dir, 0755); err != nil { return nil, err } if err := writeMetaFile(dir, &BlockMeta{ Sequence: seq, - MinTime: minTime, + MinTime: mint, + MaxTime: maxt, }); err != nil { return nil, err } @@ -73,10 +74,6 @@ func createHeadBlock(dir string, seq int, l log.Logger, minTime *int64) (*headBl // openHeadBlock creates a new empty head block. 
func openHeadBlock(dir string, l log.Logger) (*headBlock, error) { - if err := os.MkdirAll(dir, 0755); err != nil { - return nil, err - } - wal, err := OpenWAL(dir, log.NewContext(l).With("component", "wal"), 5*time.Second) if err != nil { return nil, err @@ -95,8 +92,6 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) { postings: &memPostings{m: make(map[term][]uint32)}, mapper: newPositionMapper(nil), meta: *meta, - mint: math.MaxInt64, - maxt: math.MinInt64, } // Replay contents of the write ahead log. @@ -113,12 +108,6 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) { return ErrOutOfBounds } - if s.t < h.mint { - h.mint = s.t - } - if s.t > h.maxt { - h.maxt = s.t - } h.meta.Stats.NumSamples++ return nil }, @@ -134,7 +123,7 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) { // inBounds returns true if the given timestamp is within the valid // time bounds of the block. func (h *headBlock) inBounds(t int64) bool { - return h.meta.MinTime == nil || t >= *h.meta.MinTime + return t >= h.meta.MinTime && t <= h.meta.MaxTime } // Close syncs all data and closes underlying resources of the head block. 
@@ -195,15 +184,17 @@ type refdSample struct { v float64 } -func (a *headAppender) SetSeries(lset labels.Labels) (uint64, error) { - return a.setSeries(lset.Hash(), lset) +func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { + return a.hashedAdd(lset.Hash(), lset, t, v) } -func (a *headAppender) setSeries(hash uint64, lset labels.Labels) (uint64, error) { +func (a *headAppender) hashedAdd(hash uint64, lset labels.Labels, t int64, v float64) (uint64, error) { if ms := a.get(hash, lset); ms != nil { + // fmt.Println("add ref get", ms.ref) return uint64(ms.ref), nil } if ref, ok := a.newHashes[hash]; ok { + // fmt.Println("add ref newHashes", ref) return uint64(ref), nil } @@ -224,10 +215,13 @@ func (a *headAppender) setSeries(hash uint64, lset labels.Labels) (uint64, error a.newSeries[ref] = hashedLabels{hash: hash, labels: lset} a.newHashes[hash] = ref - return ref, nil + // fmt.Println("add ref", ref) + + return ref, a.AddFast(ref, t, v) } -func (a *headAppender) Add(ref uint64, t int64, v float64) error { +func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { + // fmt.Println("add fast ref", ref) // We only own the last 5 bytes of the reference. Anything before is // used by higher-order appenders. We erase it to avoid issues. ref = (ref << 24) >> 24 @@ -251,12 +245,8 @@ func (a *headAppender) Add(ref uint64, t int64, v float64) error { // Only problem is release of locks in case of a rollback. c := ms.head() - // TODO(fabxc): this is a race. The meta must be locked. - // Just drop out-of-bounds sample for now – support for multiple - // appendable heads needed. 
if !a.inBounds(t) { - // return ErrOutOfBounds - return nil + return ErrOutOfBounds } if t < c.maxTime { return ErrOutOfOrderSample @@ -352,13 +342,6 @@ func (a *headAppender) Commit() error { a.meta.Stats.NumSamples += total a.meta.Stats.NumSeries += uint64(len(a.newSeries)) - if mint < a.mint { - a.mint = mint - } - if maxt > a.maxt { - a.maxt = maxt - } - return nil }