From 40a451694f5ef72aff6063df1dea43524c4a775c Mon Sep 17 00:00:00 2001
From: Fabian Reinartz
Date: Fri, 9 Dec 2016 20:45:46 +0100
Subject: [PATCH] Refactor persistence into interfaces

---
 block.go         | 104 ----------------------
 chunks/chunk.go  |  12 +--
 chunks/xor.go    |  11 ++-
 cmd/tsdb/main.go |   2 +-
 db.go            |  33 +++----
 head.go          |  33 -------
 writer.go        | 221 +++++++++++++++++++++++++++++++++++++++++++++++
 7 files changed, 245 insertions(+), 171 deletions(-)
 create mode 100644 writer.go

diff --git a/block.go b/block.go
index 9043a7a23..e11a3fe46 100644
--- a/block.go
+++ b/block.go
@@ -2,12 +2,9 @@ package tsdb
 
 import (
 	"fmt"
-	"hash/crc64"
 	"io"
 	"sort"
 	"unsafe"
-
-	"github.com/fabxc/tsdb/chunks"
 )
 
 const (
@@ -18,24 +15,6 @@ const (
 // Block handles reads against a block of time series data within a time window.
 type Block interface{}
 
-type block interface {
-	stats() *blockStats
-	seriesData() seriesDataIterator
-}
-
-type persistedBlock struct {
-}
-
-type seriesDataIterator interface {
-	next() bool
-	values() (skiplist, []chunks.Chunk)
-	err() error
-}
-
-func compactBlocks(a, b block) error {
-	return nil
-}
-
 type persistedSeries struct {
 	size    int
 	dataref []byte
@@ -141,86 +120,3 @@ func (sl simpleSkiplist) WriteTo(w io.Writer) (n int64, err error) {
 	}
 	return n, err
 }
-
-type blockWriter struct {
-	block block
-}
-
-func (bw *blockWriter) writeSeries(ow io.Writer) (n int64, err error) {
-	// Duplicate all writes through a CRC64 hash writer.
-	h := crc64.New(crc64.MakeTable(crc64.ECMA))
-	w := io.MultiWriter(h, ow)
-
-	// Write file header including padding.
-	//
-	// XXX(fabxc): binary.Write is theoretically more appropriate for serialization.
-	// However, we'll have to pick correct endianness for the unsafe casts to work
-	// when reading again. That and the added slowness due to reflection seem to make
-	// it somewhat pointless.
-	meta := &meta{magic: magicSeries, flag: flagStd}
-	metab := ((*[metaSize]byte)(unsafe.Pointer(meta)))[:]
-
-	m, err := w.Write(metab)
-	if err != nil {
-		return n + int64(m), err
-	}
-	n += int64(m)
-
-	// Write stats section including padding.
-	statsb := ((*[seriesStatsSize]byte)(unsafe.Pointer(bw.block.stats())))[:]
-
-	m, err = w.Write(statsb)
-	if err != nil {
-		return n + int64(m), err
-	}
-	n += int64(m)
-
-	// Write series data sections.
-	//
-	// TODO(fabxc): cache the offsets so we can use them on writing down the index.
-	it := bw.block.seriesData()
-
-	for it.next() {
-		sl, chunks := it.values()
-
-		m, err := sl.WriteTo(w)
-		if err != nil {
-			return n + int64(m), err
-		}
-		n += int64(m)
-
-		for _, c := range chunks {
-			m, err := w.Write(c.Bytes())
-			if err != nil {
-				return n + int64(m), err
-			}
-			n += int64(m)
-		}
-	}
-	if it.err() != nil {
-		return n, it.err()
-	}
-
-	// Write checksum to the original writer.
-	m, err = ow.Write(h.Sum(nil))
-	return n + int64(m), err
-}
-
-func (bw *blockWriter) writeIndex(ow io.Writer) (n int64, err error) {
-	// Duplicate all writes through a CRC64 hash writer.
-	h := crc64.New(crc64.MakeTable(crc64.ECMA))
-	w := io.MultiWriter(h, ow)
-
-	meta := &meta{magic: magicSeries, flag: flagStd}
-	metab := ((*[metaSize]byte)(unsafe.Pointer(meta)))[:]
-
-	m, err := w.Write(metab)
-	if err != nil {
-		return n + int64(m), err
-	}
-	n += int64(m)
-
-	// Write checksum to the original writer.
-	m, err = ow.Write(h.Sum(nil))
-	return n + int64(m), err
-}
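The removed blockWriter serialized its header by reinterpreting a fixed-size struct as raw bytes, and writer.go below keeps that approach for its meta section. A minimal, self-contained sketch of the trick and its endianness caveat, with an illustrative header type (the package's real meta struct is defined elsewhere):

package main

import (
	"bytes"
	"fmt"
	"unsafe"
)

// header stands in for the package's meta struct: a flat, fixed-size
// record at the start of a file. Field names here are illustrative.
type header struct {
	magic uint32
	flag  byte
	_     [3]byte // explicit padding keeps the size predictable
}

const headerSize = int(unsafe.Sizeof(header{}))

func main() {
	h := &header{magic: 0x85BD40DD, flag: 1}

	// Cast the struct to a byte array and slice it. The bytes land in
	// native byte order, which is why the removed XXX(fabxc) comment
	// notes that reading them back only works when both sides agree
	// on endianness.
	b := ((*[headerSize]byte)(unsafe.Pointer(h)))[:]

	var buf bytes.Buffer
	buf.Write(b)

	// Read back by casting the raw bytes to the struct type again.
	got := *(*header)(unsafe.Pointer(&buf.Bytes()[0]))
	fmt.Printf("magic=%#x flag=%d\n", got.magic, got.flag)
}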
diff --git a/chunks/chunk.go b/chunks/chunk.go
index c0594a5b5..83d346d49 100644
--- a/chunks/chunk.go
+++ b/chunks/chunk.go
@@ -34,22 +34,18 @@ var (
 // Chunk holds a sequence of sample pairs that can be iterated over and appended to.
 type Chunk interface {
 	Bytes() []byte
+	Encoding() Encoding
 	Appender() (Appender, error)
 	Iterator() Iterator
 }
 
-// FromBytes returns a chunk from a byte slice of chunk data.
-func FromBytes(d []byte) (Chunk, error) {
-	if len(d) < 1 {
-		return nil, fmt.Errorf("no data")
-	}
-	e := Encoding(d[0])
-
+// FromData returns a chunk from a byte slice of chunk data.
+func FromData(e Encoding, d []byte) (Chunk, error) {
 	switch e {
 	case EncXOR:
 		return &XORChunk{
 			b:   &bstream{count: 8},
-			num: binary.LittleEndian.Uint16(d[1:3]),
+			num: binary.LittleEndian.Uint16(d),
 		}, nil
 	}
 	return nil, fmt.Errorf("unknown chunk encoding: %d", e)
diff --git a/chunks/xor.go b/chunks/xor.go
index d80a06334..cf2648dfb 100644
--- a/chunks/xor.go
+++ b/chunks/xor.go
@@ -16,8 +16,7 @@ type XORChunk struct {
 
 // NewXORChunk returns a new chunk with XOR encoding of the given size.
 func NewXORChunk(size int) *XORChunk {
-	b := make([]byte, 3, 128)
-	b[0] = byte(EncXOR)
+	b := make([]byte, 2, 128)
 
 	return &XORChunk{
 		b:   &bstream{stream: b, count: 0},
@@ -26,12 +25,16 @@ func NewXORChunk(size int) *XORChunk {
 	}
 }
 
+func (c *XORChunk) Encoding() Encoding {
+	return EncXOR
+}
+
 // Bytes returns the underlying byte slice of the chunk.
 func (c *XORChunk) Bytes() []byte {
 	b := c.b.bytes()
 	// Lazily populate length bytes – probably not necessary to have the
 	// cache value in struct.
-	binary.LittleEndian.PutUint16(b[1:3], c.num)
+	binary.LittleEndian.PutUint16(b[:2], c.num)
 	return b
 }
 
@@ -68,7 +71,7 @@ func (c *XORChunk) iterator() *xorIterator {
 	// When using striped locks to guard access to chunks, probably yes.
 	// Could only copy data if the chunk is not completed yet.
 	return &xorIterator{
-		br:       newBReader(c.b.bytes()[3:]),
+		br:       newBReader(c.b.bytes()[2:]),
 		numTotal: c.num,
 	}
 }
diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go
index b5e0984c1..01b843653 100644
--- a/cmd/tsdb/main.go
+++ b/cmd/tsdb/main.go
@@ -134,7 +134,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
 
 	measureTime("ingestScrapes", func() {
 		b.startProfiling()
-		if err := b.ingestScrapes(metrics, 3000); err != nil {
+		if err := b.ingestScrapes(metrics, 1000); err != nil {
 			exitWithError(err)
 		}
 	})
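The chunks changes above move the encoding byte out of the chunk payload: NewXORChunk no longer reserves a leading encoding byte, Bytes() now starts with the two-byte little-endian sample count, and FromData takes the encoding explicitly. A hedged round-trip sketch against that API, assuming Appender exposes Append(t int64, v float64) as elsewhere in the package:

package main

import (
	"fmt"

	"github.com/fabxc/tsdb/chunks"
)

func main() {
	// Build a chunk and add a few samples through its Appender.
	c := chunks.NewXORChunk(128)
	app, err := c.Appender()
	if err != nil {
		panic(err)
	}
	app.Append(1000, 1.0)
	app.Append(2000, 2.5)

	// Persist encoding and payload separately; the payload no longer
	// carries its own encoding byte.
	enc, data := c.Encoding(), c.Bytes()

	// Reconstruct a chunk handle from the stored pair.
	c2, err := chunks.FromData(enc, data)
	if err != nil {
		panic(err)
	}
	fmt.Println(c2.Encoding() == enc)
}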
diff --git a/db.go b/db.go
index c13dfec2f..e65ac877b 100644
--- a/db.go
+++ b/db.go
@@ -112,10 +112,12 @@ func (db *DB) appendSingle(lset Labels, ts int64, v float64) error {
 	h := lset.Hash()
 	s := uint16(h >> (64 - seriesShardShift))
 
-	return db.shards[s].appendBatch(ts, Sample{
-		Hash:   h,
-		Labels: lset,
-		Value:  v,
+	return db.shards[s].appendBatch(ts, []Sample{
+		{
+			Hash:   h,
+			Labels: lset,
+			Value:  v,
+		},
 	})
 }
 
@@ -211,11 +213,6 @@ func (s *SeriesShard) Close() error {
 	return nil
 }
 
-// blockFor returns the block of shard series that contains the given timestamp.
-func (s *SeriesShard) blockFor(ts int64) block {
-	return nil
-}
-
 func (s *SeriesShard) appendBatch(ts int64, samples []Sample) error {
 	// TODO(fabxc): make configurable.
 	const persistenceTimeThreshold = 1000 * 60 * 60 // 1 hour if timestamp in ms
@@ -236,7 +233,6 @@ func (s *SeriesShard) appendBatch(ts int64, samples []Sample) error {
 
 	// TODO(fabxc): randomize over time
 	if s.head.stats().samples/uint64(s.head.stats().chunks) > 400 {
-		s.persist()
 		select {
 		case s.persistCh <- struct{}{}:
 			go s.persist()
@@ -276,19 +272,14 @@ func (s *SeriesShard) persist() error {
 		return err
 	}
 
-	bw := &blockWriter{block: head}
-	n, err := bw.writeSeries(f)
-	if err != nil {
-		return err
+	w := newSeriesWriter(f, s.head.baseTimestamp)
+	defer w.Close()
+
+	for _, cd := range head.index.forward {
+		w.WriteSeries(cd.lset, []*chunkDesc{cd})
 	}
-	if err := f.Sync(); err != nil {
-		return err
-	}
-	if err := f.Close(); err != nil {
-		return err
-	}
 
-	sz := fmt.Sprintf("%fMiB", float64(n)/1024/1024)
+	sz := fmt.Sprintf("%fMiB", float64(w.Size())/1024/1024)
 
 	s.logger.With("size", sz).
 		With("samples", head.samples).
diff --git a/head.go b/head.go
index a07743be2..c14be744b 100644
--- a/head.go
+++ b/head.go
@@ -62,36 +62,3 @@ func (h *HeadBlock) stats() *blockStats {
 		samples: h.samples,
 	}
 }
-
-func (h *HeadBlock) seriesData() seriesDataIterator {
-	h.mtx.RLock()
-	defer h.mtx.RUnlock()
-
-	it := &chunkDescsIterator{
-		descs: make([]*chunkDesc, 0, len(h.index.forward)),
-		i:     -1,
-	}
-
-	for _, cd := range h.index.forward {
-		it.descs = append(it.descs, cd)
-	}
-	return it
-}
-
-type chunkDescsIterator struct {
-	descs []*chunkDesc
-	i     int
-}
-
-func (it *chunkDescsIterator) next() bool {
-	it.i++
-	return it.i < len(it.descs)
-}
-
-func (it *chunkDescsIterator) values() (skiplist, []chunks.Chunk) {
-	return &simpleSkiplist{}, []chunks.Chunk{it.descs[it.i].chunk}
-}
-
-func (it *chunkDescsIterator) err() error {
-	return nil
-}
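In appendBatch above, dropping the synchronous s.persist() call leaves the channel-guarded goroutine as the only trigger, so each shard has at most one persistence run in flight. A standalone sketch of that guard, assuming persistCh is buffered with capacity one and that the full select (not visible in the hunk) falls through a default case:

package main

import (
	"fmt"
	"time"
)

type shard struct {
	persistCh chan struct{}
}

func (s *shard) persist() {
	defer func() { <-s.persistCh }() // free the slot when the run ends
	time.Sleep(10 * time.Millisecond) // stand-in for writing the block
	fmt.Println("persisted")
}

func main() {
	s := &shard{persistCh: make(chan struct{}, 1)}

	// Many triggers, one run: only the send that wins the buffered slot
	// starts a goroutine; the rest fall through immediately.
	for i := 0; i < 5; i++ {
		select {
		case s.persistCh <- struct{}{}:
			go s.persist()
		default:
		}
	}
	time.Sleep(50 * time.Millisecond) // let the single run finish
}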
diff --git a/writer.go b/writer.go
new file mode 100644
index 000000000..e78a1ab7e
--- /dev/null
+++ b/writer.go
@@ -0,0 +1,221 @@
+package tsdb
+
+import (
+	"hash/crc32"
+	"io"
+	"os"
+	"unsafe"
+)
+
+const (
+	// MagicSeries 4 bytes at the head of a series file.
+	MagicSeries = 0x85BD40DD
+
+	// MagicIndex 4 bytes at the head of an index file.
+	MagicIndex = 0xBAAAD700
+)
+
+// SeriesWriter serializes a time block of chunked series data.
+type SeriesWriter interface {
+	// WriteSeries writes the time series data chunks for a single series.
+	WriteSeries(Labels, []*chunkDesc) error
+
+	// Size returns the size of the data written so far.
+	Size() int64
+
+	// Close writes any required finalization and closes the resources
+	// associated with the underlying writer.
+	Close() error
+}
+
+// seriesWriter implements the SeriesWriter interface for the standard
+// serialization format.
+type seriesWriter struct {
+	w io.Writer
+	n int64
+	c int
+
+	baseTimestamp int64
+	index         IndexWriter
+
+	chunkOffsets  map[uint32][]uint32
+	seriesOffsets map[uint32]uint32
+}
+
+func newSeriesWriter(w io.Writer, base int64) *seriesWriter {
+	return &seriesWriter{
+		w:             w,
+		n:             0,
+		baseTimestamp: base,
+	}
+}
+
+func (w *seriesWriter) write(wr io.Writer, b []byte) error {
+	n, err := wr.Write(b)
+	w.n += int64(n)
+	return err
+}
+
+func (w *seriesWriter) writeMeta() error {
+	meta := &meta{magic: MagicSeries, flag: flagStd}
+	metab := ((*[metaSize]byte)(unsafe.Pointer(meta)))[:]
+
+	return w.write(w.w, metab)
+}
+
+func (w *seriesWriter) WriteSeries(lset Labels, chks []*chunkDesc) error {
+	// Initialize with meta data.
+	if w.n == 0 {
+		if err := w.writeMeta(); err != nil {
+			return err
+		}
+	}
+
+	// TODO(fabxc): is crc32 enough for chunks of one series?
+	h := crc32.NewIEEE()
+	wr := io.MultiWriter(h, w.w)
+
+	l := uint32(0)
+	for _, cd := range chks {
+		l += uint32(len(cd.chunk.Bytes()))
+	}
+	// For normal reads we don't need the length of the chunk section but
+	// it allows us to verify checksums without reading the index file.
+	if err := w.write(w.w, ((*[4]byte)(unsafe.Pointer(&l)))[:]); err != nil {
+		return err
+	}
+
+	offsets := make([]ChunkOffset, 0, len(chks))
+	lastTimestamp := w.baseTimestamp
+
+	for _, cd := range chks {
+		offsets = append(offsets, ChunkOffset{
+			Value:  lastTimestamp,
+			Offset: uint32(w.n),
+		})
+
+		if err := w.write(wr, []byte{byte(cd.chunk.Encoding())}); err != nil {
+			return err
+		}
+		if err := w.write(wr, cd.chunk.Bytes()); err != nil {
+			return err
+		}
+		lastTimestamp = cd.lastTimestamp
+	}
+
+	if err := w.write(w.w, h.Sum(nil)); err != nil {
+		return err
+	}
+
+	if w.index != nil {
+		w.index.AddOffsets(lset, offsets...)
+	}
+	return nil
+}
+
+func (w *seriesWriter) Size() int64 {
+	return w.n
+}
+
+func (w *seriesWriter) Close() error {
+	if f, ok := w.w.(*os.File); ok {
+		if err := f.Sync(); err != nil {
+			return err
+		}
+	}
+
+	if c, ok := w.w.(io.Closer); ok {
+		return c.Close()
+	}
+	return nil
+}
+
+type ChunkOffset struct {
+	Value  int64
+	Offset uint32
+}
+
+type BlockStats struct {
+}
+
+// IndexWriter serializes the index for a block of series data.
+// The methods must generally be called in the order they are specified.
+type IndexWriter interface {
+	// AddOffsets populates the index writer with offsets of chunks
+	// for a series that the index can reference.
+	AddOffsets(Labels, ...ChunkOffset)
+
+	// WriteStats writes final stats for the indexed block.
+	WriteStats(*BlockStats) error
+
+	// WriteSymbols serializes all encountered string symbols.
+	WriteSymbols([]string) error
+
+	// WriteLabelIndex serializes an index from label names to values.
+	// The passed in values are chained tuples of strings of the length of names.
+	WriteLabelIndex(names []string, values []string) error
+
+	// WriteSeries serializes series identifying labels.
+	WriteSeries(ref uint32, ls ...Labels) error
+
+	// WritePostings writes a postings list for a single label pair.
+	WritePostings(name, value string, it Iterator) error
+
+	// Size returns the size of the data written so far.
+	Size() int64
+
+	// Close writes any finalization and closes the resources associated with
+	// the underlying writer.
+	Close() error
+}
+
+// indexWriter implements the IndexWriter interface for the standard
+// serialization format.
+type indexWriter struct {
+	w io.Writer
+	n int64
+
+	series  []Labels
+	offsets [][]ChunkOffset
+}
+
+func (w *indexWriter) AddOffsets(lset Labels, offsets ...ChunkOffset) {
+	w.series = append(w.series, lset)
+	w.offsets = append(w.offsets, offsets)
+}
+
+func (w *indexWriter) WriteStats(*BlockStats) error {
+	return nil
+}
+
+func (w *indexWriter) WriteSymbols(symbols []string) error {
+	return nil
+}
+
+func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
+	return nil
+}
+
+func (w *indexWriter) WriteSeries(ref uint32, ls ...Labels) error {
+	return nil
+}
+
+func (w *indexWriter) WritePostings(name, value string, it Iterator) error {
+	return nil
+}
+
+func (w *indexWriter) Size() int64 {
+	return w.n
+}
+func (w *indexWriter) Close() error {
+	if f, ok := w.w.(*os.File); ok {
+		if err := f.Sync(); err != nil {
+			return err
+		}
+	}
+
+	if c, ok := w.w.(io.Closer); ok {
+		return c.Close()
+	}
+	return nil
+}
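WriteSeries above tees every encoding byte and chunk payload through an IEEE CRC32 via io.MultiWriter, then writes h.Sum(nil) to the underlying writer only, so the checksum never feeds back into the hash. The pattern in isolation, runnable on its own:

package main

import (
	"bytes"
	"fmt"
	"hash/crc32"
	"io"
)

func main() {
	var out bytes.Buffer

	// Duplicate payload writes through a CRC32 hash, as
	// seriesWriter.WriteSeries does for each chunk section.
	h := crc32.NewIEEE()
	w := io.MultiWriter(h, &out)

	w.Write([]byte{0x01})            // encoding byte
	w.Write([]byte("chunk payload")) // chunk data

	// The 4-byte checksum goes to the underlying buffer only.
	out.Write(h.Sum(nil))

	fmt.Printf("section: %x\n", out.Bytes())
}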