From 2c34a15fe61f33d895209e04e0cfb3ac2b177212 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 8 Dec 2016 17:43:10 +0100 Subject: [PATCH] Add initial seriailization of block data --- block.go | 206 ++++++++++++++++++++++++++++++++++++++++++++++++++++ db.go | 20 +++++ db_amd64.go | 10 +++ head.go | 47 +++++++++--- index.go | 2 +- 5 files changed, 274 insertions(+), 11 deletions(-) create mode 100644 block.go create mode 100644 db_amd64.go diff --git a/block.go b/block.go new file mode 100644 index 000000000..c3e2921c2 --- /dev/null +++ b/block.go @@ -0,0 +1,206 @@ +package tsdb + +import ( + "fmt" + "hash/crc64" + "io" + "sort" + "unsafe" + + "github.com/fabxc/tsdb/chunks" +) + +const ( + magicIndex = 0xCAFECAFE + magicSeries = 0xAFFEAFFE +) + +// Block handles reads against a block of time series data within a time window. +type Block interface{} + +type block interface { + stats() *seriesStats + seriesData() seriesDataIterator +} + +type seriesDataIterator interface { + next() bool + values() (skiplist, []chunks.Chunk) + err() error +} + +func compactBlocks(a, b block) error { + return nil +} + +const maxMmapSize = 1 << 20 + +type persistedSeries struct { + size int + dataref []byte + data *[maxMmapSize]byte +} + +const ( + flagNone = 0 + flagStd = 1 +) + +const ( + seriesMetaSize = int(unsafe.Sizeof(seriesMeta{})) + seriesStatsSize = int(unsafe.Sizeof(seriesStats{})) +) + +type seriesMeta struct { + magic uint32 + flag byte + _ [7]byte // padding/reserved +} + +type seriesStats struct { + series uint32 + samples uint64 + _ [4]byte // padding/reserved +} + +func (s *persistedSeries) meta() *seriesMeta { + return (*seriesMeta)(unsafe.Pointer(&s.data[0])) +} + +func (s *persistedSeries) stats() *seriesStats { + // The stats start right behind the block meta data. + return (*seriesStats)(unsafe.Pointer(&s.data[seriesMetaSize])) +} + +// A skiplist maps offsets to values. The values found in the data at an +// offset are strictly greater than the indexed value. +type skiplist interface { + // A skiplist can serialize itself into a writer. + io.WriterTo + + // offset returns the offset to data containing values of x and lower. + offset(x int64) (uint32, bool) +} + +// simpleSkiplist is a slice of plain value/offset pairs. +type simpleSkiplist []skiplistPair + +type skiplistPair struct { + value int64 + offset uint32 +} + +func (sl simpleSkiplist) offset(x int64) (uint32, bool) { + // Search for the first offset that contains data greater than x. + i := sort.Search(len(sl), func(i int) bool { return sl[i].value >= x }) + + // If no element was found return false. If the first element is found, + // there's no previous offset actually containing values that are x or lower. + if i == len(sl) || i == 0 { + return 0, false + } + return sl[i-1].offset, true +} + +func (sl simpleSkiplist) WriteTo(w io.Writer) (n int64, err error) { + for _, s := range sl { + b := ((*[unsafe.Sizeof(skiplistPair{})]byte)(unsafe.Pointer(&s)))[:] + + m, err := w.Write(b) + if err != nil { + return n + int64(m), err + } + n += int64(m) + } + return n, err +} + +// seriesAt returns the series stored at offset as a skiplist and the chunks +// it points to as a byte slice. +func (s *persistedSeries) seriesAt(offset int) (skiplist, []byte, error) { + offset += seriesMetaSize + offset += seriesStatsSize + + switch b := s.data[offset]; b { + case flagStd: + default: + return nil, nil, fmt.Errorf("invalid flag: %x", b) + } + offset++ + + var ( + slLen = *(*uint16)(unsafe.Pointer(&s.data[offset])) + slSize = int(slLen) / int(unsafe.Sizeof(skiplistPair{})) + sl = ((*[maxAllocSize]skiplistPair)(unsafe.Pointer(&s.data[offset+2])))[:slSize] + ) + offset += 3 + + chunksLen := *(*uint32)(unsafe.Pointer(&s.data[offset])) + chunks := ((*[maxAllocSize]byte)(unsafe.Pointer(&s.data[offset])))[:chunksLen] + + return simpleSkiplist(sl), chunks, nil +} + +type blockWriter struct { + block block +} + +func (bw *blockWriter) writeSeries(ow io.Writer) (n int64, err error) { + // Duplicate all writes through a CRC64 hash writer. + h := crc64.New(crc64.MakeTable(crc64.ECMA)) + w := io.MultiWriter(h, ow) + + // Write file header including padding. + // + // XXX(fabxc): binary.Write is theoretically more appropriate for serialization. + // However, we'll have to pick correct endianness for the unsafe casts to work + // when reading again. That and the added slowness due to reflection seem to make + // it somewhat pointless. + meta := &seriesMeta{magic: magicSeries, flag: flagStd} + metab := ((*[seriesMetaSize]byte)(unsafe.Pointer(meta)))[:] + + m, err := w.Write(metab) + if err != nil { + return n + int64(m), err + } + n += int64(m) + + // Write stats section including padding. + statsb := ((*[seriesStatsSize]byte)(unsafe.Pointer(bw.block.stats())))[:] + + m, err = w.Write(statsb) + if err != nil { + return n + int64(m), err + } + n += int64(m) + + // Write series data sections. + // + // TODO(fabxc): cache the offsets so we can use them on writing down the index. + it := bw.block.seriesData() + + for it.next() { + sl, chunks := it.values() + + m, err := sl.WriteTo(w) + if err != nil { + return n + int64(m), err + } + n += int64(m) + + for _, c := range chunks { + m, err := w.Write(c.Bytes()) + if err != nil { + return n + int64(m), err + } + n += int64(m) + } + } + if it.err() != nil { + return n, it.err() + } + + // Write checksum to the original writer. + m, err = ow.Write(h.Sum(nil)) + return n + int64(m), err +} diff --git a/db.go b/db.go index d39dc56ec..858b448fc 100644 --- a/db.go +++ b/db.go @@ -4,10 +4,13 @@ package tsdb import ( "fmt" "os" + "path/filepath" "sort" + "sync" "time" "github.com/cespare/xxhash" + "github.com/fabxc/tsdb/chunks" "github.com/prometheus/common/log" ) @@ -25,6 +28,7 @@ type Options struct { type DB struct { logger log.Logger opts *Options + path string shards []*SeriesShard } @@ -48,6 +52,7 @@ func Open(path string, l log.Logger, opts *Options) (*DB, error) { c := &DB{ logger: l, opts: opts, + path: path, } // Initialize vertical shards. @@ -68,6 +73,21 @@ func (db *DB) Close() error { fmt.Println("shard", i) fmt.Println(" num chunks", len(shard.head.forward)) fmt.Println(" num samples", shard.head.samples) + + f, err := os.Create(filepath.Join(db.path, fmt.Sprintf("shard-%d-series", i))) + if err != nil { + return err + } + bw := &blockWriter{block: shard.head} + n, err := bw.writeSeries(f) + if err != nil { + return err + } + fmt.Println(" wrote bytes", n) + + if err := f.Close(); err != nil { + return err + } } return nil diff --git a/db_amd64.go b/db_amd64.go new file mode 100644 index 000000000..cfd85c975 --- /dev/null +++ b/db_amd64.go @@ -0,0 +1,10 @@ +package tsdb + +// maxMapSize represents the largest mmap size supported by Bolt. +const maxMapSize = 0xFFFFFFFFFFFF // 256TB + +// maxAllocSize is the size used when creating array pointers. +const maxAllocSize = 0x7FFFFFFF + +// Are unaligned load/stores broken on this arch? +var brokenUnaligned = false diff --git a/head.go b/head.go index 5c52a90dd..857ca47ba 100644 --- a/head.go +++ b/head.go @@ -1,8 +1,6 @@ package tsdb import ( - "fmt" - "io" "math" "sync" @@ -20,14 +18,6 @@ type HeadBlock struct { samples uint64 } -// WriteTo serializes the current head block contents into w. -func (h *HeadBlock) WriteTo(w io.Writer) (int64, error) { - h.mtx.RLock() - defer h.mtx.RUnlock() - - return 0, fmt.Errorf("not implemented") -} - // get retrieves the chunk with the hash and label set and creates // a new one if it doesn't exist yet. func (h *HeadBlock) get(hash uint64, lset Labels) (*chunkDesc, bool) { @@ -78,3 +68,40 @@ func (h *HeadBlock) append(hash uint64, lset Labels, ts int64, v float64) error h.samples++ return nil } + +func (h *HeadBlock) stats() *seriesStats { + return &seriesStats{} +} + +func (h *HeadBlock) seriesData() seriesDataIterator { + h.mtx.RLock() + defer h.mtx.RUnlock() + + it := &chunkDescsIterator{ + descs: make([]*chunkDesc, 0, len(h.forward)), + i: -1, + } + + for _, cd := range h.forward { + it.descs = append(it.descs, cd) + } + return it +} + +type chunkDescsIterator struct { + descs []*chunkDesc + i int +} + +func (it *chunkDescsIterator) next() bool { + it.i++ + return it.i < len(it.descs) +} + +func (it *chunkDescsIterator) values() (skiplist, []chunks.Chunk) { + return &simpleSkiplist{}, []chunks.Chunk{it.descs[it.i].chunk} +} + +func (it *chunkDescsIterator) err() error { + return nil +} diff --git a/index.go b/index.go index f6a1adc8f..064f31f23 100644 --- a/index.go +++ b/index.go @@ -15,7 +15,7 @@ type memIndex struct { // Postings returns an iterator over the postings list for s. func (ix *memIndex) Postings(s string) Iterator { - return &listIterator{list: ix.m[s]} + return &listIterator{list: ix.m[s], idx: -1} } // add adds a document to the index. The caller has to ensure that no