Refactor persistence into interfaces

Fabian Reinartz 2016-12-09 20:45:46 +01:00
parent 62f9dc311c
commit 40a451694f
7 changed files with 245 additions and 171 deletions

block.go (104 lines changed)

@@ -2,12 +2,9 @@ package tsdb
 import (
 	"fmt"
-	"hash/crc64"
 	"io"
 	"sort"
 	"unsafe"
-
-	"github.com/fabxc/tsdb/chunks"
 )

 const (
@@ -18,24 +15,6 @@ const (
 // Block handles reads against a block of time series data within a time window.
 type Block interface{}
-
-type block interface {
-	stats() *blockStats
-	seriesData() seriesDataIterator
-}
-
-type persistedBlock struct {
-}
-
-type seriesDataIterator interface {
-	next() bool
-	values() (skiplist, []chunks.Chunk)
-	err() error
-}
-
-func compactBlocks(a, b block) error {
-	return nil
-}

 type persistedSeries struct {
 	size    int
 	dataref []byte
@@ -141,86 +120,3 @@ func (sl simpleSkiplist) WriteTo(w io.Writer) (n int64, err error) {
 	}
 	return n, err
 }
-
-type blockWriter struct {
-	block block
-}
-
-func (bw *blockWriter) writeSeries(ow io.Writer) (n int64, err error) {
-	// Duplicate all writes through a CRC64 hash writer.
-	h := crc64.New(crc64.MakeTable(crc64.ECMA))
-	w := io.MultiWriter(h, ow)
-
-	// Write file header including padding.
-	//
-	// XXX(fabxc): binary.Write is theoretically more appropriate for serialization.
-	// However, we'll have to pick correct endianness for the unsafe casts to work
-	// when reading again. That and the added slowness due to reflection seem to make
-	// it somewhat pointless.
-	meta := &meta{magic: magicSeries, flag: flagStd}
-	metab := ((*[metaSize]byte)(unsafe.Pointer(meta)))[:]
-
-	m, err := w.Write(metab)
-	if err != nil {
-		return n + int64(m), err
-	}
-	n += int64(m)
-
-	// Write stats section including padding.
-	statsb := ((*[seriesStatsSize]byte)(unsafe.Pointer(bw.block.stats())))[:]
-
-	m, err = w.Write(statsb)
-	if err != nil {
-		return n + int64(m), err
-	}
-	n += int64(m)
-
-	// Write series data sections.
-	//
-	// TODO(fabxc): cache the offsets so we can use them on writing down the index.
-	it := bw.block.seriesData()
-
-	for it.next() {
-		sl, chunks := it.values()
-
-		m, err := sl.WriteTo(w)
-		if err != nil {
-			return n + int64(m), err
-		}
-		n += int64(m)
-
-		for _, c := range chunks {
-			m, err := w.Write(c.Bytes())
-			if err != nil {
-				return n + int64(m), err
-			}
-			n += int64(m)
-		}
-	}
-	if it.err() != nil {
-		return n, it.err()
-	}
-
-	// Write checksum to the original writer.
-	m, err = ow.Write(h.Sum(nil))
-	return n + int64(m), err
-}
-
-func (bw *blockWriter) writeIndex(ow io.Writer) (n int64, err error) {
-	// Duplicate all writes through a CRC64 hash writer.
-	h := crc64.New(crc64.MakeTable(crc64.ECMA))
-	w := io.MultiWriter(h, ow)
-
-	meta := &meta{magic: magicSeries, flag: flagStd}
-	metab := ((*[metaSize]byte)(unsafe.Pointer(meta)))[:]
-
-	m, err := w.Write(metab)
-	if err != nil {
-		return n + int64(m), err
-	}
-	n += int64(m)
-
-	// Write checksum to the original writer.
-	m, err = ow.Write(h.Sum(nil))
-	return n + int64(m), err
-}
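Both the removed writeSeries/writeIndex above and the new writer.go below serialize the fixed-size file header by casting a struct pointer to a byte array instead of going through binary.Write, for the reasons given in the XXX comment. A minimal, runnable sketch of that pattern; the meta layout here is only illustrative, not the exact tsdb header:

package main

import (
	"bytes"
	"fmt"
	"unsafe"
)

// meta mirrors the fixed-size header written above; the field layout is
// illustrative, not the exact tsdb format.
type meta struct {
	magic uint32
	flag  byte
	_     [3]byte // explicit padding keeps the struct size predictable
}

const metaSize = unsafe.Sizeof(meta{})

func main() {
	m := &meta{magic: 0x85BD40DD, flag: 1}

	// Reinterpret the struct's memory as a byte slice, as writeSeries and
	// writeIndex do. This avoids reflection but emits native-endian bytes,
	// which is why reading the header back relies on the same unsafe cast.
	b := ((*[metaSize]byte)(unsafe.Pointer(m)))[:]

	var buf bytes.Buffer
	buf.Write(b)
	fmt.Printf("header: % x (size %d)\n", buf.Bytes(), metaSize)
}

The trade-off, as the XXX comment notes, is that the bytes land in native endianness, so the reader has to apply the same cast with the same layout assumptions.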

chunks/chunks.go

@@ -34,22 +34,18 @@ var (
 // Chunk holds a sequence of sample pairs that can be iterated over and appended to.
 type Chunk interface {
 	Bytes() []byte
+	Encoding() Encoding
 	Appender() (Appender, error)
 	Iterator() Iterator
 }

-// FromBytes returns a chunk from a byte slice of chunk data.
-func FromBytes(d []byte) (Chunk, error) {
-	if len(d) < 1 {
-		return nil, fmt.Errorf("no data")
-	}
-	e := Encoding(d[0])
-
+// FromData returns a chunk from a byte slice of chunk data.
+func FromData(e Encoding, d []byte) (Chunk, error) {
 	switch e {
 	case EncXOR:
 		return &XORChunk{
 			b:   &bstream{count: 8},
-			num: binary.LittleEndian.Uint16(d[1:3]),
+			num: binary.LittleEndian.Uint16(d),
 		}, nil
 	}
 	return nil, fmt.Errorf("unknown chunk encoding: %d", e)
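FromData now takes the encoding explicitly instead of sniffing it from the first byte of the chunk data, so callers that store chunks must keep the encoding next to the raw bytes; the series writer below does this by writing a single encoding byte in front of each chunk. A small self-contained sketch of that round trip, with local Encoding/EncXOR stand-ins rather than an import of the chunks package:

package main

import "fmt"

// Local stand-ins for the chunks package's Encoding type and EncXOR value,
// used only to illustrate the storage layout change.
type Encoding uint8

const EncXOR Encoding = 1

func main() {
	enc := EncXOR
	data := []byte{0x03, 0x00, 0xde, 0xad} // chunk bytes; no leading encoding byte anymore

	// On disk the encoding is kept next to the chunk data, e.g. as one byte
	// written in front of it (see WriteSeries in writer.go).
	stored := append([]byte{byte(enc)}, data...)

	// A reader splits the two again and would hand them to chunks.FromData:
	//   c, err := chunks.FromData(chunks.Encoding(stored[0]), stored[1:])
	gotEnc, gotData := Encoding(stored[0]), stored[1:]
	fmt.Println(gotEnc == EncXOR, len(gotData)) // true 4
}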

chunks/xor.go

@@ -16,8 +16,7 @@ type XORChunk struct {
 // NewXORChunk returns a new chunk with XOR encoding of the given size.
 func NewXORChunk(size int) *XORChunk {
-	b := make([]byte, 3, 128)
-	b[0] = byte(EncXOR)
+	b := make([]byte, 2, 128)

 	return &XORChunk{
 		b: &bstream{stream: b, count: 0},
@@ -26,12 +25,16 @@ func NewXORChunk(size int) *XORChunk {
 	}
 }

+func (c *XORChunk) Encoding() Encoding {
+	return EncXOR
+}
+
 // Bytes returns the underlying byte slice of the chunk.
 func (c *XORChunk) Bytes() []byte {
 	b := c.b.bytes()
 	// Lazily populate length bytes probably not necessary to have the
 	// cache value in struct.
-	binary.LittleEndian.PutUint16(b[1:3], c.num)
+	binary.LittleEndian.PutUint16(b[:2], c.num)
 	return b
 }
@@ -68,7 +71,7 @@ func (c *XORChunk) iterator() *xorIterator {
 	// When using striped locks to guard access to chunks, probably yes.
 	// Could only copy data if the chunk is not completed yet.
 	return &xorIterator{
-		br:       newBReader(c.b.bytes()[3:]),
+		br:       newBReader(c.b.bytes()[2:]),
 		numTotal: c.num,
 	}
 }
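With the encoding byte gone, an XOR chunk's byte slice now starts with the two-byte little-endian sample count followed by the compressed bit stream, which is what the changed [2:] offsets above reflect. A short sketch of reading that layout; readChunkCount is a hypothetical helper, not part of the package:

package main

import (
	"encoding/binary"
	"fmt"
)

// readChunkCount illustrates the new XOR chunk layout: a little-endian
// uint16 sample count in the first two bytes, followed by the
// XOR-compressed bit stream. The encoding byte no longer lives here.
func readChunkCount(d []byte) (num uint16, stream []byte, err error) {
	if len(d) < 2 {
		return 0, nil, fmt.Errorf("chunk data too short: %d bytes", len(d))
	}
	return binary.LittleEndian.Uint16(d[:2]), d[2:], nil
}

func main() {
	// Two count bytes (3 samples) followed by fake compressed payload bytes.
	d := []byte{0x03, 0x00, 0xde, 0xad, 0xbe, 0xef}

	num, stream, err := readChunkCount(d)
	if err != nil {
		panic(err)
	}
	fmt.Println(num, len(stream)) // 3 4
}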

cmd/tsdb/main.go

@@ -134,7 +134,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
 	measureTime("ingestScrapes", func() {
 		b.startProfiling()
-		if err := b.ingestScrapes(metrics, 3000); err != nil {
+		if err := b.ingestScrapes(metrics, 1000); err != nil {
 			exitWithError(err)
 		}
 	})

db.go (33 lines changed)

@@ -112,10 +112,12 @@ func (db *DB) appendSingle(lset Labels, ts int64, v float64) error {
 	h := lset.Hash()
 	s := uint16(h >> (64 - seriesShardShift))

-	return db.shards[s].appendBatch(ts, Sample{
-		Hash:   h,
-		Labels: lset,
-		Value:  v,
+	return db.shards[s].appendBatch(ts, []Sample{
+		{
+			Hash:   h,
+			Labels: lset,
+			Value:  v,
+		},
 	})
 }
@@ -211,11 +213,6 @@ func (s *SeriesShard) Close() error {
 	return nil
 }

-// blockFor returns the block of shard series that contains the given timestamp.
-func (s *SeriesShard) blockFor(ts int64) block {
-	return nil
-}
-
 func (s *SeriesShard) appendBatch(ts int64, samples []Sample) error {
 	// TODO(fabxc): make configurable.
 	const persistenceTimeThreshold = 1000 * 60 * 60 // 1 hour if timestamp in ms
@@ -236,7 +233,6 @@ func (s *SeriesShard) appendBatch(ts int64, samples []Sample) error {
 	// TODO(fabxc): randomize over time
 	if s.head.stats().samples/uint64(s.head.stats().chunks) > 400 {
-		s.persist()
 		select {
 		case s.persistCh <- struct{}{}:
 			go s.persist()
@@ -276,19 +272,14 @@ func (s *SeriesShard) persist() error {
 		return err
 	}

-	bw := &blockWriter{block: head}
-	n, err := bw.writeSeries(f)
-	if err != nil {
-		return err
-	}
-	if err := f.Sync(); err != nil {
-		return err
-	}
-	if err := f.Close(); err != nil {
-		return err
-	}
-
-	sz := fmt.Sprintf("%fMiB", float64(n)/1024/1024)
+	w := newSeriesWriter(f, s.head.baseTimestamp)
+	defer w.Close()
+
+	for _, cd := range head.index.forward {
+		w.WriteSeries(cd.lset, []*chunkDesc{cd})
+	}
+
+	sz := fmt.Sprintf("%fMiB", float64(w.Size())/1024/1024)

 	s.logger.With("size", sz).
 		With("samples", head.samples).

head.go (33 lines changed)

@@ -62,36 +62,3 @@ func (h *HeadBlock) stats() *blockStats {
 		samples: h.samples,
 	}
 }
-
-func (h *HeadBlock) seriesData() seriesDataIterator {
-	h.mtx.RLock()
-	defer h.mtx.RUnlock()
-
-	it := &chunkDescsIterator{
-		descs: make([]*chunkDesc, 0, len(h.index.forward)),
-		i:     -1,
-	}
-	for _, cd := range h.index.forward {
-		it.descs = append(it.descs, cd)
-	}
-	return it
-}
-
-type chunkDescsIterator struct {
-	descs []*chunkDesc
-	i     int
-}
-
-func (it *chunkDescsIterator) next() bool {
-	it.i++
-	return it.i < len(it.descs)
-}
-
-func (it *chunkDescsIterator) values() (skiplist, []chunks.Chunk) {
-	return &simpleSkiplist{}, []chunks.Chunk{it.descs[it.i].chunk}
-}
-
-func (it *chunkDescsIterator) err() error {
-	return nil
-}

writer.go (new file, 221 lines)

@@ -0,0 +1,221 @@
+package tsdb
+
+import (
+	"hash/crc32"
+	"io"
+	"os"
+	"unsafe"
+)
+
+const (
+	// MagicSeries 4 bytes at the head of series file.
+	MagicSeries = 0x85BD40DD
+
+	// MagicIndex 4 bytes at the head of an index file.
+	MagicIndex = 0xBAAAD700
+)
+
+// SeriesWriter serializes a time block of chunked series data.
+type SeriesWriter interface {
+	// WriteSeries writes the time series data chunks for a single series.
+	WriteSeries(Labels, []*chunkDesc) error
+
+	// Size returns the size of the data written so far.
+	Size() int64
+
+	// Close writes any required finalization and closes the resources
+	// associated with the underlying writer.
+	Close() error
+}
+
+// seriesWriter implements the SeriesWriter interface for the standard
+// serialization format.
+type seriesWriter struct {
+	w io.Writer
+	n int64
+	c int
+
+	baseTimestamp int64
+	index         IndexWriter
+
+	chunkOffsets  map[uint32][]uint32
+	seriesOffsets map[uint32]uint32
+}
+
+func newSeriesWriter(w io.Writer, base int64) *seriesWriter {
+	return &seriesWriter{
+		w:             w,
+		n:             0,
+		baseTimestamp: base,
+	}
+}
+func (w *seriesWriter) write(wr io.Writer, b []byte) error {
+	n, err := wr.Write(b)
+	w.n += int64(n)
+	return err
+}
+
+func (w *seriesWriter) writeMeta() error {
+	meta := &meta{magic: MagicSeries, flag: flagStd}
+	metab := ((*[metaSize]byte)(unsafe.Pointer(meta)))[:]
+
+	return w.write(w.w, metab)
+}
+
+func (w *seriesWriter) WriteSeries(lset Labels, chks []*chunkDesc) error {
+	// Initialize with meta data.
+	if w.n == 0 {
+		if err := w.writeMeta(); err != nil {
+			return err
+		}
+	}
+
+	// TODO(fabxc): is crc32 enough for chunks of one series?
+	h := crc32.NewIEEE()
+	wr := io.MultiWriter(h, w.w)
+
+	l := uint32(0)
+	for _, cd := range chks {
+		l += uint32(len(cd.chunk.Bytes()))
+	}
+	// For normal reads we don't need the length of the chunk section but
+	// it allows us to verify checksums without reading the index file.
+	if err := w.write(w.w, ((*[4]byte)(unsafe.Pointer(&l)))[:]); err != nil {
+		return err
+	}
+
+	offsets := make([]ChunkOffset, 0, len(chks))
+	lastTimestamp := w.baseTimestamp
+
+	for _, cd := range chks {
+		offsets = append(offsets, ChunkOffset{
+			Value:  lastTimestamp,
+			Offset: uint32(w.n),
+		})
+		if err := w.write(wr, []byte{byte(cd.chunk.Encoding())}); err != nil {
+			return err
+		}
+		if err := w.write(wr, cd.chunk.Bytes()); err != nil {
+			return err
+		}
+		lastTimestamp = cd.lastTimestamp
+	}
+
+	if err := w.write(w.w, h.Sum(nil)); err != nil {
+		return err
+	}
+
+	if w.index != nil {
+		w.index.AddOffsets(lset, offsets...)
+	}
+	return nil
+}
+func (w *seriesWriter) Size() int64 {
+	return w.n
+}
+
+func (w *seriesWriter) Close() error {
+	if f, ok := w.w.(*os.File); ok {
+		if err := f.Sync(); err != nil {
+			return err
+		}
+	}
+	if c, ok := w.w.(io.Closer); ok {
+		return c.Close()
+	}
+	return nil
+}
+
+type ChunkOffset struct {
+	Value  int64
+	Offset uint32
+}
+
+type BlockStats struct {
+}
+// IndexWriter serializes the index for a block of series data.
+// The methods must generally be called in the order they are specified.
+type IndexWriter interface {
+	// AddOffsets populates the index writer with offsets of chunks
+	// for a series that the index can reference.
+	AddOffsets(Labels, ...ChunkOffset)
+
+	// WriteStats writes final stats for the indexed block.
+	WriteStats(*BlockStats) error
+
+	// WriteSymbols serializes all encountered string symbols.
+	WriteSymbols([]string) error
+
+	// WriteLabelIndex serializes an index from label names to values.
+	// The passed in values are chained tuples of strings of the length of names.
+	WriteLabelIndex(names []string, values []string) error
+
+	// WriteSeries serializes series identifying labels.
+	WriteSeries(ref uint32, ls ...Labels) error
+
+	// WritePostings writes a postings list for a single label pair.
+	WritePostings(name, value string, it Iterator) error
+
+	// Size returns the size of the data written so far.
+	Size() int64
+
+	// Close writes any finalization and closes the resources associated with
+	// the underlying writer.
+	Close() error
+}
+// indexWriter implements the IndexWriter interface for the standard
+// serialization format.
+type indexWriter struct {
+	w io.Writer
+	n int64
+
+	series  []Labels
+	offsets [][]ChunkOffset
+}
+
+func (w *indexWriter) AddOffsets(lset Labels, offsets ...ChunkOffset) {
+	w.series = append(w.series, lset)
+	w.offsets = append(w.offsets, offsets)
+}
+
+func (w *indexWriter) WriteStats(*BlockStats) error {
+	return nil
+}
+
+func (w *indexWriter) WriteSymbols(symbols []string) error {
+	return nil
+}
+
+func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
+	return nil
+}
+
+func (w *indexWriter) WriteSeries(ref uint32, ls ...Labels) error {
+	return nil
+}
+
+func (w *indexWriter) WritePostings(name, value string, it Iterator) error {
+	return nil
+}
+
+func (w *indexWriter) Size() int64 {
+	return w.n
+}
+
+func (w *indexWriter) Close() error {
+	if f, ok := w.w.(*os.File); ok {
+		if err := f.Sync(); err != nil {
+			return err
+		}
+	}
+	if c, ok := w.w.(io.Closer); ok {
+		return c.Close()
+	}
+	return nil
+}
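For one series, WriteSeries lays the section out as a 4-byte length of the chunk data, then one encoding byte plus the chunk's bytes per chunk, then a CRC32 (IEEE) trailer over exactly the bytes that went through the hashing writer (the encoding bytes and chunk data, not the length). A self-contained sketch that builds such a section and re-verifies the trailer the way a reader could; buildSeriesSection is a hypothetical helper, and the little-endian length is an assumption since the real code writes it through an unsafe cast in native byte order:

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"hash/crc32"
	"io"
)

// buildSeriesSection mimics the layout WriteSeries produces for one series:
// a 4-byte chunk-section length, then (encoding byte + chunk data) per chunk,
// then a CRC32 (IEEE) trailer over the encoding bytes and chunk data only.
// The chunks here are opaque fake payloads; the encoding byte is illustrative.
func buildSeriesSection(chunks [][]byte) []byte {
	var buf bytes.Buffer

	h := crc32.NewIEEE()
	w := io.MultiWriter(h, &buf)

	var l uint32
	for _, c := range chunks {
		l += uint32(len(c))
	}
	// The real code writes l via an unsafe cast, i.e. native byte order;
	// little-endian is assumed here for the sketch. The length bypasses the
	// hashing writer, just as it does in WriteSeries.
	binary.Write(&buf, binary.LittleEndian, l)

	for _, c := range chunks {
		w.Write([]byte{1}) // encoding byte, e.g. EncXOR
		w.Write(c)
	}
	buf.Write(h.Sum(nil))
	return buf.Bytes()
}

func main() {
	sec := buildSeriesSection([][]byte{{0xde, 0xad}, {0xbe, 0xef}})

	// A reader can re-hash the middle part and compare it with the trailer
	// to verify the section without consulting the index file.
	body := sec[4 : len(sec)-4]
	want := sec[len(sec)-4:]
	got := crc32.ChecksumIEEE(body)
	fmt.Println(binary.BigEndian.Uint32(want) == got) // true
}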