diff --git a/block.go b/block.go
index 41dc640e41..7ef490c7ae 100644
--- a/block.go
+++ b/block.go
@@ -35,18 +35,23 @@ type Block interface {
 
 // BlockMeta provides meta information about a block.
 type BlockMeta struct {
+	// Sequence number of the block.
+	Sequence int `json:"sequence"`
+
 	// MinTime and MaxTime specify the time range all samples
 	// in the block must be in. If unset, samples can be appended
 	// freely until they are set.
 	MinTime *int64 `json:"minTime,omitempty"`
 	MaxTime *int64 `json:"maxTime,omitempty"`
 
+	// Stats about the contents of the block.
 	Stats struct {
 		NumSamples uint64 `json:"numSamples,omitempty"`
 		NumSeries  uint64 `json:"numSeries,omitempty"`
 		NumChunks  uint64 `json:"numChunks,omitempty"`
 	} `json:"stats,omitempty"`
 
+	// Information on compactions the block was created from.
 	Compaction struct {
 		Generation int `json:"generation"`
 	} `json:"compaction"`
diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go
index 62b1139fc9..6b98d994c0 100644
--- a/cmd/tsdb/main.go
+++ b/cmd/tsdb/main.go
@@ -120,7 +120,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
 
 	measureTime("ingestScrapes", func() {
 		b.startProfiling()
-		if err := b.ingestScrapes(metrics, 2000); err != nil {
+		if err := b.ingestScrapes(metrics, 10000); err != nil {
 			exitWithError(err)
 		}
 	})
diff --git a/compact.go b/compact.go
index 20c46aaa80..b4aad2c7d1 100644
--- a/compact.go
+++ b/compact.go
@@ -110,10 +110,13 @@ func (c *compactor) match(bs []compactionInfo) bool {
 }
 
 func mergeBlockMetas(blocks ...Block) (res BlockMeta) {
-	res.MinTime = blocks[0].Meta().MinTime
+	m0 := blocks[0].Meta()
+
+	res.Sequence = m0.Sequence
+	res.MinTime = m0.MinTime
 	res.MaxTime = blocks[len(blocks)-1].Meta().MaxTime
 
-	g := blocks[0].Meta().Compaction.Generation
+	g := m0.Compaction.Generation
 	if g == 0 && len(blocks) > 1 {
 		g++
 	}
@@ -134,7 +137,6 @@ func (c *compactor) compact(dir string, blocks ...Block) (err error) {
 		c.metrics.duration.Observe(time.Since(start).Seconds())
 	}()
 
-	// Write to temporary directory to make persistence appear atomic.
 	if fileutil.Exist(dir) {
 		if err = os.RemoveAll(dir); err != nil {
 			return err
diff --git a/db.go b/db.go
index 0e869f0eb3..19b7873e19 100644
--- a/db.go
+++ b/db.go
@@ -29,12 +29,20 @@ import (
 var DefaultOptions = &Options{
 	WALFlushInterval: 5 * time.Second,
 	MaxBlockRange:    24 * 60 * 60 * 1000, // 1 day in milliseconds
+	GracePeriod:      30 * 60 * 1000,      // 30 minutes in milliseconds
 }
 
 // Options of the DB storage.
 type Options struct {
+	// The interval at which the write ahead log is flushed to disc.
 	WALFlushInterval time.Duration
-	MaxBlockRange    uint64
+
+	// The maximum timestamp range of compacted blocks.
+	MaxBlockRange uint64
+
+	// Time window between the highest timestamp and the minimum timestamp
+	// that can still be appended.
+	GracePeriod uint64
 }
 
 // Appender allows appending a batch of data. It must be completed with a
@@ -193,6 +201,11 @@ func (db *DB) run() {
 			if !ok {
 				continue
 			}
+			db.logger.Log("msg", "picked", "i", i, "j", j)
+			for k := i; k <= j; k++ {
+				db.logger.Log("k", k, "generation", infos[k].generation)
+			}
+
 			if err := db.compact(i, j); err != nil {
 				db.logger.Log("msg", "compaction failed", "err", err)
 				continue
@@ -413,11 +426,17 @@ func (db *DB) compactable() []Block {
 	db.mtx.RLock()
 	defer db.mtx.RUnlock()
 
+	// h := db.heads[len(db.heads)-1]
+	// mint := h.maxt - int64(db.opts.GracePeriod)
+
 	var blocks []Block
 	for _, pb := range db.persisted {
 		blocks = append(blocks, pb)
 	}
 	for _, hb := range db.heads[:len(db.heads)-1] {
+		// if hb.maxt < mint {
+		// 	break
+		// }
 		blocks = append(blocks, hb)
 	}
 
@@ -483,11 +502,11 @@ func (db *DB) cut() (*headBlock, error) {
 		cur.metamtx.Unlock()
 	}
 
-	dir, err := nextBlockDir(db.dir)
+	dir, seq, err := nextBlockDir(db.dir)
 	if err != nil {
 		return nil, err
 	}
-	newHead, err := createHeadBlock(dir, db.logger, mint)
+	newHead, err := createHeadBlock(dir, seq, db.logger, mint)
 	if err != nil {
 		return nil, err
 	}
@@ -525,10 +544,10 @@ func blockDirs(dir string) ([]string, error) {
 	return dirs, nil
 }
 
-func nextBlockDir(dir string) (string, error) {
+func nextBlockDir(dir string) (string, int, error) {
 	names, err := fileutil.ReadDir(dir)
 	if err != nil {
-		return "", err
+		return "", 0, err
 	}
 
 	i := uint64(0)
@@ -542,7 +561,7 @@
 		}
 		i = j
 	}
-	return filepath.Join(dir, fmt.Sprintf("b-%0.6d", i+1)), nil
+	return filepath.Join(dir, fmt.Sprintf("b-%0.6d", i+1)), int(i + 1), nil
 }
 
 // PartitionedDB is a time series storage.
diff --git a/head.go b/head.go
index c973e78ea5..9b25d93a27 100644
--- a/head.go
+++ b/head.go
@@ -57,12 +57,15 @@ type headBlock struct {
 	mint, maxt int64 // timestamp range of current samples
 }
 
-func createHeadBlock(dir string, l log.Logger, minTime *int64) (*headBlock, error) {
+func createHeadBlock(dir string, seq int, l log.Logger, minTime *int64) (*headBlock, error) {
 	if err := os.MkdirAll(dir, 0755); err != nil {
 		return nil, err
 	}
-	if err := writeMetaFile(dir, &BlockMeta{MinTime: minTime}); err != nil {
+	if err := writeMetaFile(dir, &BlockMeta{
+		Sequence: seq,
+		MinTime:  minTime,
+	}); err != nil {
 		return nil, err
 	}
 	return openHeadBlock(dir, l)
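For reference, a minimal self-contained sketch (not part of the patch) of the JSON shape the new sequence field takes once a BlockMeta is marshalled, assuming writeMetaFile JSON-encodes the struct as the json tags suggest; the struct below is a trimmed mirror of the fields in this diff and the values are illustrative.

package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed mirror of BlockMeta as annotated in this diff; only the fields
// needed to show the encoding of the new Sequence field are kept.
type BlockMeta struct {
	Sequence int    `json:"sequence"`
	MinTime  *int64 `json:"minTime,omitempty"`
	MaxTime  *int64 `json:"maxTime,omitempty"`
}

func main() {
	mint := int64(1483228800000) // illustrative millisecond timestamp
	out, err := json.Marshal(BlockMeta{Sequence: 3, MinTime: &mint})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // {"sequence":3,"minTime":1483228800000}
}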