diff --git a/block.go b/block.go index 48855308f..7c5ec2cca 100644 --- a/block.go +++ b/block.go @@ -1,14 +1,10 @@ package tsdb import ( - "fmt" - "io/ioutil" "os" "path/filepath" "sort" - "strconv" - "github.com/bradfitz/slice" "github.com/coreos/etcd/pkg/fileutil" "github.com/pkg/errors" ) @@ -104,54 +100,12 @@ func (pb *persistedBlock) interval() (int64, int64) { return pb.bstats.MinTime, pb.bstats.MaxTime } -// findBlocks finds time-ordered persisted blocks within a directory. -func findBlocks(path string) ([]*persistedBlock, []*HeadBlock, error) { - var ( - pbs []*persistedBlock - heads []*HeadBlock - ) - - files, err := ioutil.ReadDir(path) - if err != nil { - return nil, nil, err - } - - for _, fi := range files { - p := filepath.Join(path, fi.Name()) - - if _, err := os.Stat(chunksFileName(p)); os.IsNotExist(err) { - ts, err := strconv.Atoi(filepath.Base(p)) - if err != nil { - return nil, nil, errors.Errorf("invalid directory name") - } - head, err := OpenHeadBlock(p, int64(ts)) - if err != nil { - return nil, nil, err - } - heads = append(heads, head) - continue - } - - pb, err := newPersistedBlock(p) - if err != nil { - return nil, nil, fmt.Errorf("error initializing block %q: %s", p, err) - } - pbs = append(pbs, pb) - } - - // Order blocks by their base time so they represent a continous - // range of time. - slice.Sort(pbs, func(i, j int) bool { return pbs[i].bstats.MinTime < pbs[j].bstats.MinTime }) - - return pbs, heads, nil -} - func chunksFileName(path string) string { - return filepath.Join(path, "series") + return filepath.Join(path, "000-series") } func indexFileName(path string) string { - return filepath.Join(path, "index") + return filepath.Join(path, "000-index") } type mmapFile struct { diff --git a/db.go b/db.go index ab46d141b..9d02a13f1 100644 --- a/db.go +++ b/db.go @@ -4,12 +4,13 @@ package tsdb import ( "bytes" "fmt" + "io/ioutil" + "math" "os" "path/filepath" "reflect" "strconv" "sync" - "time" "unsafe" "golang.org/x/sync/errgroup" @@ -177,7 +178,7 @@ const sep = '\xff' // Partition handles reads and writes of time series falling into // a hashed partition of a series. type Partition struct { - path string + dir string logger log.Logger metrics *partitionMetrics @@ -185,9 +186,6 @@ type Partition struct { persisted []*persistedBlock heads []*HeadBlock compactor *compactor - - donec chan struct{} - cutc chan struct{} } type partitionMetrics struct { @@ -231,86 +229,109 @@ func newPartitionMetrics(r prometheus.Registerer, i int) *partitionMetrics { } // OpenPartition returns a new Partition. -func OpenPartition(path string, i int, logger log.Logger) (*Partition, error) { +func OpenPartition(dir string, i int, logger log.Logger) (p *Partition, err error) { // Create directory if partition is new. - if _, err := os.Stat(path); os.IsNotExist(err) { - if err := os.MkdirAll(path, 0777); err != nil { + if !fileutil.Exist(dir) { + if err := os.MkdirAll(dir, 0777); err != nil { return nil, err } } - // Initialize previously persisted blocks. - persisted, heads, err := findBlocks(path) - if err != nil { + p = &Partition{ + dir: dir, + logger: logger, + metrics: newPartitionMetrics(nil, i), + } + if err := p.initBlocks(); err != nil { + return nil, err + } + if p.compactor, err = newCompactor(i, p, logger); err != nil { return nil, err } - // TODO(fabxc): get time from client-defined `now` function. 
- baset := time.Unix(0, 0).UnixNano() / int64(time.Millisecond) - if len(persisted) > 0 { - baset = persisted[len(persisted)-1].bstats.MaxTime - } - if len(heads) == 0 { - head, err := OpenHeadBlock(filepath.Join(path, fmt.Sprintf("%d", baset)), baset) - if err != nil { - return nil, err - } - heads = []*HeadBlock{head} - } - - s := &Partition{ - path: path, - logger: logger, - metrics: newPartitionMetrics(nil, i), - heads: heads, - persisted: persisted, - cutc: make(chan struct{}, 1), - donec: make(chan struct{}), - } - if s.compactor, err = newCompactor(i, s, logger); err != nil { - return nil, err - } - go s.run() - - return s, nil + return p, nil } -func (s *Partition) run() { - for range s.cutc { - // if err := s.cut(); err != nil { - // s.logger.Log("msg", "cut error", "err", err) - // } - // select { - // case <-s.cutc: - // default: - // } - // start := time.Now() - - // if err := s.persist(); err != nil { - // s.logger.Log("msg", "persistence error", "err", err) - // } - - // s.metrics.persistenceDuration.Observe(time.Since(start).Seconds()) - // s.metrics.persistences.Inc() +func isBlockDir(fi os.FileInfo) bool { + if !fi.IsDir() { + return false } - close(s.donec) + if _, err := strconv.ParseUint(fi.Name(), 10, 32); err != nil { + return false + } + return true +} + +func (p *Partition) initBlocks() error { + var ( + pbs []*persistedBlock + heads []*HeadBlock + ) + + files, err := ioutil.ReadDir(p.dir) + if err != nil { + return err + } + + for _, fi := range files { + if !isBlockDir(fi) { + continue + } + dir := filepath.Join(p.dir, fi.Name()) + + if fileutil.Exist(filepath.Join(dir, walFileName)) { + h, err := OpenHeadBlock(dir) + if err != nil { + return err + } + heads = append(heads, h) + continue + } + + b, err := newPersistedBlock(dir) + if err != nil { + return err + } + pbs = append(pbs, b) + } + + // Validate that blocks are sequential in time. + lastTime := int64(math.MinInt64) + + for _, b := range pbs { + if b.stats().MinTime < lastTime { + return errors.Errorf("illegal order for block at %q", b.dir()) + } + lastTime = b.stats().MaxTime + } + for _, b := range heads { + if b.stats().MinTime < lastTime { + return errors.Errorf("illegal order for block at %q", b.dir()) + } + lastTime = b.stats().MaxTime + } + + p.persisted = pbs + p.heads = heads + + if len(heads) == 0 { + return p.cut() + } + return nil } // Close the partition. -func (s *Partition) Close() error { - close(s.cutc) - <-s.donec - +func (p *Partition) Close() error { var merr MultiError - merr.Add(s.compactor.Close()) + merr.Add(p.compactor.Close()) - s.mtx.Lock() - defer s.mtx.Unlock() + p.mtx.Lock() + defer p.mtx.Unlock() - for _, pb := range s.persisted { + for _, pb := range p.persisted { merr.Add(pb.Close()) } - for _, hb := range s.heads { + for _, hb := range p.heads { merr.Add(hb.Close()) } @@ -338,6 +359,8 @@ func (s *Partition) appendBatch(samples []hashedSample) error { if head.bstats.SampleCount/(uint64(head.bstats.ChunkCount)+1) > 250 { if err := s.cut(); err != nil { s.logger.Log("msg", "cut failed", "err", err) + } else { + s.compactor.trigger() } } @@ -444,7 +467,7 @@ func intervalContains(min, max, t int64) bool { return t >= min && t <= max } -// blocksForRange returns all blocks within the partition that may contain +// blocksForInterval returns all blocks within the partition that may contain // data for the given time range. 
func (s *Partition) blocksForInterval(mint, maxt int64) []block { var bs []block @@ -472,58 +495,33 @@ const headGracePeriod = 60 * 1000 // 60 seconds for millisecond scale // cut starts a new head block to append to. The completed head block // will still be appendable for the configured grace period. -func (s *Partition) cut() error { - // Set new head block. - head := s.heads[len(s.heads)-1] - - newHead, err := OpenHeadBlock(filepath.Join(s.path, fmt.Sprintf("%d", head.bstats.MaxTime)), head.bstats.MaxTime) +func (p *Partition) cut() error { + dir, err := p.nextBlockDir() if err != nil { return err } - s.heads = append(s.heads, newHead) - - s.compactor.trigger() + newHead, err := OpenHeadBlock(dir) + if err != nil { + return err + } + p.heads = append(p.heads, newHead) return nil } -// func (s *Partition) persist() error { -// s.mtx.Lock() - -// // Set new head block. -// head := s.head -// newHead, err := OpenHeadBlock(filepath.Join(s.path, fmt.Sprintf("%d", head.bstats.MaxTime)), head.bstats.MaxTime) -// if err != nil { -// s.mtx.Unlock() -// return err -// } -// s.head = newHead - -// s.mtx.Unlock() - -// // TODO(fabxc): add grace period where we can still append to old head partition -// // before actually persisting it. -// dir := filepath.Join(s.path, fmt.Sprintf("%d", head.stats.MinTime)) - -// if err := persist(dir, head.persist); err != nil { -// return err -// } -// s.logger.Log("samples", head.stats.SampleCount, "chunks", head.stats.ChunkCount, "msg", "persisted head") - -// // Reopen block as persisted block for querying. -// pb, err := newPersistedBlock(dir) -// if err != nil { -// return err -// } - -// s.mtx.Lock() -// s.persisted = append(s.persisted, pb) -// s.mtx.Unlock() - -// s.compactor.trigger() - -// return nil -// } +func (p *Partition) nextBlockDir() (string, error) { + names, err := fileutil.ReadDir(p.dir) + if err != nil { + return "", err + } + i := uint64(0) + if len(names) > 0 { + if i, err = strconv.ParseUint(names[len(names)-1], 10, 32); err != nil { + return "", err + } + } + return filepath.Join(p.dir, fmt.Sprintf("%0.6d", i+1)), nil +} // chunkDesc wraps a plain data chunk and provides cached meta data about it. type chunkDesc struct { diff --git a/head.go b/head.go index 55a508441..a01c6556d 100644 --- a/head.go +++ b/head.go @@ -35,7 +35,7 @@ type HeadBlock struct { } // OpenHeadBlock creates a new empty head block. -func OpenHeadBlock(dir string, baseTime int64) (*HeadBlock, error) { +func OpenHeadBlock(dir string) (*HeadBlock, error) { wal, err := OpenWAL(dir) if err != nil { return nil, err @@ -49,14 +49,26 @@ func OpenHeadBlock(dir string, baseTime int64) (*HeadBlock, error) { postings: &memPostings{m: make(map[term][]uint32)}, wal: wal, } - b.bstats.MinTime = baseTime err = wal.ReadAll(&walHandler{ series: func(lset labels.Labels) { b.create(lset.Hash(), lset) }, sample: func(s hashedSample) { - b.descs[s.ref].append(s.t, s.v) + cd := b.descs[s.ref] + + // Duplicated from appendBatch – TODO(fabxc): deduplicate? 
+ if cd.lastTimestamp == s.t && cd.lastValue != s.v { + return + } + cd.append(s.t, s.v) + + if s.t > b.bstats.MaxTime { + b.bstats.MaxTime = s.t + } + if s.t < b.bstats.MinTime { + b.bstats.MinTime = s.t + } b.bstats.SampleCount++ }, }) @@ -172,14 +184,13 @@ func (h *HeadBlock) get(hash uint64, lset labels.Labels) *chunkDesc { } func (h *HeadBlock) create(hash uint64, lset labels.Labels) *chunkDesc { - var err error - cd := &chunkDesc{ lset: lset, chunk: chunks.NewXORChunk(), lastTimestamp: math.MinInt64, } + var err error cd.app, err = cd.chunk.Appender() if err != nil { // Getting an Appender for a new chunk must not panic. @@ -289,6 +300,9 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error { if s.t > h.bstats.MaxTime { h.bstats.MaxTime = s.t } + if s.t < h.bstats.MinTime { + h.bstats.MinTime = s.t + } h.bstats.SampleCount++ } diff --git a/wal.go b/wal.go index be54a467f..efd6ade9c 100644 --- a/wal.go +++ b/wal.go @@ -33,6 +33,8 @@ type WAL struct { symbols map[string]uint32 } +const walFileName = "000-wal" + // OpenWAL opens or creates a write ahead log in the given directory. // The WAL must be read completely before new data is written. func OpenWAL(dir string) (*WAL, error) { @@ -40,7 +42,7 @@ func OpenWAL(dir string) (*WAL, error) { return nil, err } - p := filepath.Join(dir, "wal") + p := filepath.Join(dir, walFileName) f, err := fileutil.TryLockFile(p, os.O_RDWR, 0666) if err != nil { diff --git a/wal_test.go b/wal_test.go index 1cc91e8f0..afd93f719 100644 --- a/wal_test.go +++ b/wal_test.go @@ -177,7 +177,7 @@ func BenchmarkWALReadIntoHead(b *testing.B) { b.ResetTimer() - _, err = OpenHeadBlock(d, 0) + _, err = OpenHeadBlock(d) require.NoError(b, err) // stat, _ := head.wal.f.Stat()
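Note: the patch above replaces time-based block directory names with sequential, zero-padded ones ("000001", "000002", ...), validates at startup (in initBlocks) that persisted and head blocks cover ascending time ranges, and detects head blocks by the presence of the "000-wal" file rather than the absence of a chunks file. The standalone sketch below is not part of the patch; it illustrates the naming scheme with hypothetical stand-ins mirroring isBlockDir and Partition.nextBlockDir. One deliberate deviation: it filters out non-numeric entries before parsing, whereas the patch parses the last entry returned by fileutil.ReadDir directly.

package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"sort"
	"strconv"
)

// isBlockDir mirrors the check in the patch: a block directory is a plain
// directory whose name parses as an unsigned integer.
func isBlockDir(fi os.FileInfo) bool {
	if !fi.IsDir() {
		return false
	}
	_, err := strconv.ParseUint(fi.Name(), 10, 32)
	return err == nil
}

// nextBlockDir computes the next sequential block directory, analogous to
// Partition.nextBlockDir in the patch, but filtering non-block entries.
func nextBlockDir(dir string) (string, error) {
	files, err := ioutil.ReadDir(dir) // entries come back sorted by name
	if err != nil {
		return "", err
	}
	var names []string
	for _, fi := range files {
		if isBlockDir(fi) {
			names = append(names, fi.Name())
		}
	}
	sort.Strings(names)
	i := uint64(0)
	if len(names) > 0 {
		if i, err = strconv.ParseUint(names[len(names)-1], 10, 32); err != nil {
			return "", err
		}
	}
	// %0.6d pads with leading zeros to six digits, e.g. "000001", so the
	// lexical order of directory names matches their numeric order.
	return filepath.Join(dir, fmt.Sprintf("%0.6d", i+1)), nil
}

func main() {
	dir, err := ioutil.TempDir("", "tsdb-blocks")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	// Simulate two existing blocks; the next cut should land in 000003.
	for _, n := range []string{"000001", "000002"} {
		if err := os.Mkdir(filepath.Join(dir, n), 0777); err != nil {
			panic(err)
		}
	}

	next, err := nextBlockDir(dir)
	if err != nil {
		panic(err)
	}
	fmt.Println(filepath.Base(next)) // 000003
}

Zero-padding keeps lexical order identical to numeric order, which is what both fileutil.ReadDir (which returns sorted names) and the sequential-time validation in initBlocks rely on; the "000-" prefix on the per-block files (000-series, 000-index, 000-wal) presumably leaves room for further numbered segment files within a block.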