From e7f04d14d51252a57e2ea2e87b56576a3c6b5d2a Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 3 Jan 2017 10:09:20 +0100 Subject: [PATCH] Lock mmapped files --- block.go | 34 ++++++++++++++++++---------------- compact.go | 49 ++++++++++++++++++++++++++++++++----------------- head.go | 12 +++++------- 3 files changed, 55 insertions(+), 40 deletions(-) diff --git a/block.go b/block.go index aaa2b234c..bbb2facee 100644 --- a/block.go +++ b/block.go @@ -8,14 +8,18 @@ import ( "sort" "strconv" + "github.com/coreos/etcd/pkg/fileutil" "github.com/pkg/errors" ) -// Block handles reads against a block of time series data within a time window. +// Block handles reads against a block of time series data. type block interface { + dir() string + // stats() BlockStats interval() (int64, int64) index() IndexReader series() SeriesReader + // persisted() bool } type BlockStats struct { @@ -32,23 +36,24 @@ const ( ) type persistedBlock struct { + d string + stats BlockStats + chunksf, indexf *mmapFile chunkr *seriesReader indexr *indexReader - - stats BlockStats } -func newPersistedBlock(path string) (*persistedBlock, error) { +func newPersistedBlock(p string) (*persistedBlock, error) { // TODO(fabxc): validate match of name and stats time, validate magic. // mmap files belonging to the block. - chunksf, err := openMmapFile(chunksFileName(path)) + chunksf, err := openMmapFile(chunksFileName(p)) if err != nil { return nil, err } - indexf, err := openMmapFile(indexFileName(path)) + indexf, err := openMmapFile(indexFileName(p)) if err != nil { return nil, err } @@ -68,6 +73,7 @@ func newPersistedBlock(path string) (*persistedBlock, error) { } pb := &persistedBlock{ + d: p, chunksf: chunksf, indexf: indexf, chunkr: sr, @@ -87,13 +93,9 @@ func (pb *persistedBlock) Close() error { return err1 } -func (pb *persistedBlock) index() IndexReader { - return pb.indexr -} - -func (pb *persistedBlock) series() SeriesReader { - return pb.chunkr -} +func (pb *persistedBlock) dir() string { return pb.d } +func (pb *persistedBlock) index() IndexReader { return pb.indexr } +func (pb *persistedBlock) series() SeriesReader { return pb.chunkr } func (pb *persistedBlock) interval() (int64, int64) { return pb.stats.MinTime, pb.stats.MaxTime @@ -156,12 +158,12 @@ func indexFileName(path string) string { } type mmapFile struct { - f *os.File + f *fileutil.LockedFile b []byte } func openMmapFile(path string) (*mmapFile, error) { - f, err := os.Open(path) + f, err := fileutil.TryLockFile(path, os.O_RDONLY, 0666) if err != nil { return nil, err } @@ -170,7 +172,7 @@ func openMmapFile(path string) (*mmapFile, error) { return nil, err } - b, err := mmap(f, int(info.Size())) + b, err := mmap(f.File, int(info.Size())) if err != nil { return nil, err } diff --git a/compact.go b/compact.go index 36a3b9ad0..819778ea1 100644 --- a/compact.go +++ b/compact.go @@ -1,25 +1,28 @@ package tsdb import ( - "fmt" "os" "path/filepath" - "time" "github.com/coreos/etcd/pkg/fileutil" "github.com/fabxc/tsdb/labels" "github.com/go-kit/kit/log" - "github.com/prometheus/prometheus/pkg/timestamp" ) type compactor struct { shard *Shard + blocks compactableBlocks logger log.Logger triggerc chan struct{} donec chan struct{} } +type compactableBlocks interface { + compactable() []block + set([]block) +} + func newCompactor(s *Shard, l log.Logger) (*compactor, error) { c := &compactor{ triggerc: make(chan struct{}, 1), @@ -41,25 +44,37 @@ func (c *compactor) trigger() { func (c *compactor) run() { for range c.triggerc { - if len(c.shard.persisted) < 2 { - continue - } - var ( - dir = fmt.Sprintf("compacted-%d", timestamp.FromTime(time.Now())) - a = c.shard.persisted[0] - b = c.shard.persisted[1] - ) + // continue + // bs := c.blocks.get() - if err := persist(dir, func(indexw IndexWriter, chunkw SeriesWriter) error { - return c.compact(indexw, chunkw, a, b) - }); err != nil { - c.logger.Log("msg", "compaction failed", "err", err) - continue - } + // if len(bs) < 2 { + // continue + // } + + // var ( + // dir = fmt.Sprintf("compacted-%d", timestamp.FromTime(time.Now())) + // a = bs[0] + // b = bs[1] + // ) + + // c.blocks.Lock() + + // if err := persist(dir, func(indexw IndexWriter, chunkw SeriesWriter) error { + // return c.compact(indexw, chunkw, a, b) + // }); err != nil { + // c.logger.Log("msg", "compaction failed", "err", err) + // continue + // } + + // c.blocks.Unlock() } close(c.donec) } +func (c *compactor) pick() []block { + return nil +} + func (c *compactor) Close() error { close(c.triggerc) <-c.donec diff --git a/head.go b/head.go index bd86df2f4..0b3ab0880 100644 --- a/head.go +++ b/head.go @@ -12,6 +12,7 @@ import ( // HeadBlock handles reads and writes of time series data within a time window. type HeadBlock struct { mtx sync.RWMutex + d string // descs holds all chunk descs for the head block. Each chunk implicitly // is assigned the index as its ID. @@ -36,6 +37,7 @@ func OpenHeadBlock(dir string, baseTime int64) (*HeadBlock, error) { } b := &HeadBlock{ + d: dir, descs: []*chunkDesc{}, hashes: map[uint64][]*chunkDesc{}, values: map[string]stringset{}, @@ -65,13 +67,9 @@ func (h *HeadBlock) Close() error { return h.wal.Close() } -func (h *HeadBlock) index() IndexReader { - return h -} - -func (h *HeadBlock) series() SeriesReader { - return h -} +func (h *HeadBlock) dir() string { return h.d } +func (h *HeadBlock) index() IndexReader { return h } +func (h *HeadBlock) series() SeriesReader { return h } // Chunk returns the chunk for the reference number. func (h *HeadBlock) Chunk(ref uint32) (chunks.Chunk, error) {