From 9c6a72aaddc1969cde9b9d192c3237b18f1bc866 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 22 Dec 2016 15:54:39 +0100 Subject: [PATCH] Load head with WALs correctly --- block.go | 28 ++++++++++++++++++++++++---- db.go | 15 +++++++++------ wal.go | 4 ++-- 3 files changed, 35 insertions(+), 12 deletions(-) diff --git a/block.go b/block.go index 615f7cdf6..f237ba095 100644 --- a/block.go +++ b/block.go @@ -6,6 +6,9 @@ import ( "os" "path/filepath" "sort" + "strconv" + + "github.com/pkg/errors" ) // Block handles reads against a block of time series data within a time window. @@ -104,20 +107,37 @@ func (p persistedBlocks) Swap(i, j int) { p[i], p[j] = p[j], p[i] } func (p persistedBlocks) Less(i, j int) bool { return p[i].stats.MinTime < p[j].stats.MinTime } // findBlocks finds time-ordered persisted blocks within a directory. -func findPersistedBlocks(path string) ([]*persistedBlock, error) { +func findBlocks(path string) ([]*persistedBlock, *HeadBlock, error) { var pbs persistedBlocks files, err := ioutil.ReadDir(path) if err != nil { - return nil, err + return nil, nil, err } + var head *HeadBlock for _, fi := range files { p := filepath.Join(path, fi.Name()) + if _, err := os.Stat(chunksFileName(p)); os.IsNotExist(err) { + fmt.Println("found head dir", p) + if head != nil { + return nil, nil, errors.Errorf("found two head blocks") + } + ts, err := strconv.Atoi(filepath.Base(p)) + if err != nil { + return nil, nil, errors.Errorf("invalid directory name") + } + head, err = NewHeadBlock(p, int64(ts)) + if err != nil { + return nil, nil, err + } + continue + } + pb, err := newPersistedBlock(p) if err != nil { - return nil, fmt.Errorf("error initializing block %q: %s", p, err) + return nil, nil, fmt.Errorf("error initializing block %q: %s", p, err) } pbs = append(pbs, pb) } @@ -126,7 +146,7 @@ func findPersistedBlocks(path string) ([]*persistedBlock, error) { // range of time. sort.Sort(pbs) - return pbs, nil + return pbs, head, nil } func chunksFileName(path string) string { diff --git a/db.go b/db.go index 5ef35704c..58dc195b9 100644 --- a/db.go +++ b/db.go @@ -41,7 +41,7 @@ type DB struct { // TODO(fabxc): make configurable const ( - shardShift = 4 + shardShift = 3 numShards = 1 << shardShift maxChunkSize = 1024 ) @@ -193,7 +193,7 @@ func OpenShard(path string, logger log.Logger) (*Shard, error) { } // Initialize previously persisted blocks. - pbs, err := findPersistedBlocks(path) + pbs, head, err := findBlocks(path) if err != nil { return nil, err } @@ -201,12 +201,15 @@ func OpenShard(path string, logger log.Logger) (*Shard, error) { // TODO(fabxc): get time from client-defined `now` function. baset := time.Now().UnixNano() / int64(time.Millisecond) if len(pbs) > 0 { - baset = pbs[0].stats.MaxTime + baset = pbs[len(pbs)-1].stats.MaxTime } + if head == nil { + fmt.Println("creating new head", baset) - head, err := NewHeadBlock(filepath.Join(path, fmt.Sprintf("%d", baset)), baset) - if err != nil { - return nil, err + head, err = NewHeadBlock(filepath.Join(path, fmt.Sprintf("%d", baset)), baset) + if err != nil { + return nil, err + } } s := &Shard{ diff --git a/wal.go b/wal.go index c3b128b0f..48a992090 100644 --- a/wal.go +++ b/wal.go @@ -41,13 +41,13 @@ func OpenWAL(dir string) (*WAL, error) { p := filepath.Join(dir, "wal") - f, err := fileutil.TryLockFile(p, os.O_RDWR, fileutil.PrivateFileMode) + f, err := fileutil.TryLockFile(p, os.O_RDWR, 0666) if err != nil { if !os.IsNotExist(err) { return nil, err } - f, err = fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode) + f, err = fileutil.LockFile(p, os.O_RDWR|os.O_CREATE, 0666) if err != nil { return nil, err }