Lock mmapped files

This commit is contained in:
Fabian Reinartz 2017-01-03 10:09:20 +01:00
parent 91b65b55e7
commit e7f04d14d5
3 changed files with 55 additions and 40 deletions

View File

@ -8,14 +8,18 @@ import (
"sort" "sort"
"strconv" "strconv"
"github.com/coreos/etcd/pkg/fileutil"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
// Block handles reads against a block of time series data within a time window. // Block handles reads against a block of time series data.
type block interface { type block interface {
dir() string
// stats() BlockStats
interval() (int64, int64) interval() (int64, int64)
index() IndexReader index() IndexReader
series() SeriesReader series() SeriesReader
// persisted() bool
} }
type BlockStats struct { type BlockStats struct {
@ -32,23 +36,24 @@ const (
) )
type persistedBlock struct { type persistedBlock struct {
d string
stats BlockStats
chunksf, indexf *mmapFile chunksf, indexf *mmapFile
chunkr *seriesReader chunkr *seriesReader
indexr *indexReader indexr *indexReader
stats BlockStats
} }
func newPersistedBlock(path string) (*persistedBlock, error) { func newPersistedBlock(p string) (*persistedBlock, error) {
// TODO(fabxc): validate match of name and stats time, validate magic. // TODO(fabxc): validate match of name and stats time, validate magic.
// mmap files belonging to the block. // mmap files belonging to the block.
chunksf, err := openMmapFile(chunksFileName(path)) chunksf, err := openMmapFile(chunksFileName(p))
if err != nil { if err != nil {
return nil, err return nil, err
} }
indexf, err := openMmapFile(indexFileName(path)) indexf, err := openMmapFile(indexFileName(p))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -68,6 +73,7 @@ func newPersistedBlock(path string) (*persistedBlock, error) {
} }
pb := &persistedBlock{ pb := &persistedBlock{
d: p,
chunksf: chunksf, chunksf: chunksf,
indexf: indexf, indexf: indexf,
chunkr: sr, chunkr: sr,
@ -87,13 +93,9 @@ func (pb *persistedBlock) Close() error {
return err1 return err1
} }
func (pb *persistedBlock) index() IndexReader { func (pb *persistedBlock) dir() string { return pb.d }
return pb.indexr func (pb *persistedBlock) index() IndexReader { return pb.indexr }
} func (pb *persistedBlock) series() SeriesReader { return pb.chunkr }
func (pb *persistedBlock) series() SeriesReader {
return pb.chunkr
}
func (pb *persistedBlock) interval() (int64, int64) { func (pb *persistedBlock) interval() (int64, int64) {
return pb.stats.MinTime, pb.stats.MaxTime return pb.stats.MinTime, pb.stats.MaxTime
@ -156,12 +158,12 @@ func indexFileName(path string) string {
} }
type mmapFile struct { type mmapFile struct {
f *os.File f *fileutil.LockedFile
b []byte b []byte
} }
func openMmapFile(path string) (*mmapFile, error) { func openMmapFile(path string) (*mmapFile, error) {
f, err := os.Open(path) f, err := fileutil.TryLockFile(path, os.O_RDONLY, 0666)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -170,7 +172,7 @@ func openMmapFile(path string) (*mmapFile, error) {
return nil, err return nil, err
} }
b, err := mmap(f, int(info.Size())) b, err := mmap(f.File, int(info.Size()))
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -1,25 +1,28 @@
package tsdb package tsdb
import ( import (
"fmt"
"os" "os"
"path/filepath" "path/filepath"
"time"
"github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/pkg/fileutil"
"github.com/fabxc/tsdb/labels" "github.com/fabxc/tsdb/labels"
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
"github.com/prometheus/prometheus/pkg/timestamp"
) )
type compactor struct { type compactor struct {
shard *Shard shard *Shard
blocks compactableBlocks
logger log.Logger logger log.Logger
triggerc chan struct{} triggerc chan struct{}
donec chan struct{} donec chan struct{}
} }
type compactableBlocks interface {
compactable() []block
set([]block)
}
func newCompactor(s *Shard, l log.Logger) (*compactor, error) { func newCompactor(s *Shard, l log.Logger) (*compactor, error) {
c := &compactor{ c := &compactor{
triggerc: make(chan struct{}, 1), triggerc: make(chan struct{}, 1),
@ -41,25 +44,37 @@ func (c *compactor) trigger() {
func (c *compactor) run() { func (c *compactor) run() {
for range c.triggerc { for range c.triggerc {
if len(c.shard.persisted) < 2 { // continue
continue // bs := c.blocks.get()
}
var (
dir = fmt.Sprintf("compacted-%d", timestamp.FromTime(time.Now()))
a = c.shard.persisted[0]
b = c.shard.persisted[1]
)
if err := persist(dir, func(indexw IndexWriter, chunkw SeriesWriter) error { // if len(bs) < 2 {
return c.compact(indexw, chunkw, a, b) // continue
}); err != nil { // }
c.logger.Log("msg", "compaction failed", "err", err)
continue // var (
} // dir = fmt.Sprintf("compacted-%d", timestamp.FromTime(time.Now()))
// a = bs[0]
// b = bs[1]
// )
// c.blocks.Lock()
// if err := persist(dir, func(indexw IndexWriter, chunkw SeriesWriter) error {
// return c.compact(indexw, chunkw, a, b)
// }); err != nil {
// c.logger.Log("msg", "compaction failed", "err", err)
// continue
// }
// c.blocks.Unlock()
} }
close(c.donec) close(c.donec)
} }
func (c *compactor) pick() []block {
return nil
}
func (c *compactor) Close() error { func (c *compactor) Close() error {
close(c.triggerc) close(c.triggerc)
<-c.donec <-c.donec

12
head.go
View File

@ -12,6 +12,7 @@ import (
// HeadBlock handles reads and writes of time series data within a time window. // HeadBlock handles reads and writes of time series data within a time window.
type HeadBlock struct { type HeadBlock struct {
mtx sync.RWMutex mtx sync.RWMutex
d string
// descs holds all chunk descs for the head block. Each chunk implicitly // descs holds all chunk descs for the head block. Each chunk implicitly
// is assigned the index as its ID. // is assigned the index as its ID.
@ -36,6 +37,7 @@ func OpenHeadBlock(dir string, baseTime int64) (*HeadBlock, error) {
} }
b := &HeadBlock{ b := &HeadBlock{
d: dir,
descs: []*chunkDesc{}, descs: []*chunkDesc{},
hashes: map[uint64][]*chunkDesc{}, hashes: map[uint64][]*chunkDesc{},
values: map[string]stringset{}, values: map[string]stringset{},
@ -65,13 +67,9 @@ func (h *HeadBlock) Close() error {
return h.wal.Close() return h.wal.Close()
} }
func (h *HeadBlock) index() IndexReader { func (h *HeadBlock) dir() string { return h.d }
return h func (h *HeadBlock) index() IndexReader { return h }
} func (h *HeadBlock) series() SeriesReader { return h }
func (h *HeadBlock) series() SeriesReader {
return h
}
// Chunk returns the chunk for the reference number. // Chunk returns the chunk for the reference number.
func (h *HeadBlock) Chunk(ref uint32) (chunks.Chunk, error) { func (h *HeadBlock) Chunk(ref uint32) (chunks.Chunk, error) {