Add stats serialization, load querier of all blocks

Fabian Reinartz 2016-12-15 16:14:33 +01:00
parent 1a35e54450
commit bd77103a49
6 changed files with 182 additions and 62 deletions
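The heart of the change is a fixed 64-byte, big-endian payload for the new BlockStats struct, written by indexWriter.WriteStats and read back by indexReader.Stats. Below is a minimal round-trip sketch of just that layout, with offsets taken from the diff; the section framing (length, flag byte, CRC) handled by the writer's section() helper is omitted, and the trailing bytes of the payload are padding.

package main

import (
	"encoding/binary"
	"fmt"
)

// BlockStats mirrors the struct added in writer.go.
type BlockStats struct {
	MinTime     int64
	MaxTime     int64
	SampleCount uint64
	SeriesCount uint32
	ChunkCount  uint32
}

// encodeStats lays the fields out as indexWriter.WriteStats does:
// MinTime at offset 0, MaxTime at 8, SeriesCount at 16,
// ChunkCount at 20, SampleCount at 24.
func encodeStats(s BlockStats) [64]byte {
	var b [64]byte
	binary.BigEndian.PutUint64(b[0:], uint64(s.MinTime))
	binary.BigEndian.PutUint64(b[8:], uint64(s.MaxTime))
	binary.BigEndian.PutUint32(b[16:], s.SeriesCount)
	binary.BigEndian.PutUint32(b[20:], s.ChunkCount)
	binary.BigEndian.PutUint64(b[24:], s.SampleCount)
	return b
}

// decodeStats is the inverse, matching indexReader.Stats.
func decodeStats(b [64]byte) BlockStats {
	return BlockStats{
		MinTime:     int64(binary.BigEndian.Uint64(b[0:])),
		MaxTime:     int64(binary.BigEndian.Uint64(b[8:])),
		SeriesCount: binary.BigEndian.Uint32(b[16:]),
		ChunkCount:  binary.BigEndian.Uint32(b[20:]),
		SampleCount: binary.BigEndian.Uint64(b[24:]),
	}
}

func main() {
	in := BlockStats{MinTime: 1000, MaxTime: 2000, SampleCount: 5000, SeriesCount: 10, ChunkCount: 10}
	fmt.Println(decodeStats(encodeStats(in)) == in) // true
}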

@@ -6,12 +6,13 @@ import (
"os"
"path/filepath"
"sort"
"strconv"
)
// Block handles reads against a block of time series data within a time window.
type Block interface {
type block interface {
Querier(mint, maxt int64) Querier
interval() (int64, int64)
}
const (
@@ -25,15 +26,12 @@ type persistedBlock struct {
chunks *seriesReader
index *indexReader
baseTime int64
stats BlockStats
}
func newPersistedBlock(path string) (*persistedBlock, error) {
// The directory must be named after the base timestamp for the block.
baset, err := strconv.ParseInt(filepath.Base(path), 10, 0)
if err != nil {
return nil, fmt.Errorf("unexpected directory name %q: %s", path, err)
}
// TODO(fabxc): validate match of name and stats time, validate magic.
// mmap files belonging to the block.
chunksf, err := openMmapFile(chunksFileName(path))
@@ -54,12 +52,17 @@ func newPersistedBlock(path string) (*persistedBlock, error) {
return nil, err
}
stats, err := ir.Stats()
if err != nil {
return nil, err
}
pb := &persistedBlock{
chunksf: chunksf,
indexf: indexf,
chunks: sr,
index: ir,
baseTime: baset,
stats: stats,
}
return pb, nil
}
@@ -74,11 +77,24 @@ func (pb *persistedBlock) Close() error {
return err1
}
func (pb *persistedBlock) Querier(mint, maxt int64) Querier {
return &blockQuerier{
mint: mint,
maxt: maxt,
index: pb.index,
series: pb.chunks,
}
}
func (pb *persistedBlock) interval() (int64, int64) {
return pb.stats.MinTime, pb.stats.MaxTime
}
type persistedBlocks []*persistedBlock
func (p persistedBlocks) Len() int { return len(p) }
func (p persistedBlocks) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (p persistedBlocks) Less(i, j int) bool { return p[i].baseTime < p[j].baseTime }
func (p persistedBlocks) Less(i, j int) bool { return p[i].stats.MinTime < p[j].stats.MinTime }
// findPersistedBlocks finds time-ordered persisted blocks within a directory.
func findPersistedBlocks(path string) ([]*persistedBlock, error) {

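persistedBlocks now sorts by the MinTime recorded in each block's stats rather than by the directory-derived baseTime. A self-contained sketch of that ordering; pblock and its stats field are illustrative stand-ins, not the diff's types:

package main

import (
	"fmt"
	"sort"
)

// stats carries just the field the sort needs.
type stats struct{ MinTime int64 }

// pblock stands in for *persistedBlock.
type pblock struct{ stats stats }

type pblocks []*pblock

func (p pblocks) Len() int           { return len(p) }
func (p pblocks) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
func (p pblocks) Less(i, j int) bool { return p[i].stats.MinTime < p[j].stats.MinTime }

func main() {
	bs := pblocks{{stats{30}}, {stats{10}}, {stats{20}}}
	sort.Sort(bs)
	for _, b := range bs {
		fmt.Println(b.stats.MinTime) // 10, 20, 30
	}
}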
db.go
@@ -267,12 +267,12 @@ func (s *Shard) appendBatch(ts int64, samples []Sample) error {
}
}
if ts > s.head.highTimestamp {
s.head.highTimestamp = ts
if ts > s.head.stats.MaxTime {
s.head.stats.MaxTime = ts
}
// TODO(fabxc): randomize over time
if s.head.stats().samples/uint64(s.head.stats().chunks) > 400 {
if s.head.stats.SampleCount/uint64(s.head.stats.ChunkCount) > 400 {
select {
case s.persistCh <- struct{}{}:
go s.persist()
@@ -283,10 +283,36 @@ func (s *Shard) appendBatch(ts int64, samples []Sample) error {
return nil
}
func intervalOverlap(amin, amax, bmin, bmax int64) bool {
if bmin >= amin && bmin <= amax {
return true
}
if amin >= bmin && amin <= bmax {
return true
}
return false
}
// blocksForInterval returns all blocks within the shard that may contain
// data for the given time range.
func (s *Shard) blocksForRange(mint, maxt int64) (bs []Block) {
return []Block{s.head}
func (s *Shard) blocksForInterval(mint, maxt int64) []block {
var bs []block
for _, b := range s.persisted {
bmin, bmax := b.interval()
if intervalOverlap(mint, maxt, bmin, bmax) {
bs = append(bs, b)
}
}
hmin, hmax := s.head.interval()
if intervalOverlap(mint, maxt, hmin, hmax) {
bs = append(bs, s.head)
}
return bs
}
// TODO(fabxc): make configurable.
@@ -297,7 +323,7 @@ func (s *Shard) persist() error {
// Set new head block.
head := s.head
s.head = NewHeadBlock(head.highTimestamp)
s.head = NewHeadBlock(head.stats.MaxTime)
s.mtx.Unlock()
@@ -307,7 +333,7 @@
// TODO(fabxc): add grace period where we can still append to old head shard
// before actually persisting it.
p := filepath.Join(s.path, fmt.Sprintf("%d", head.baseTimestamp))
p := filepath.Join(s.path, fmt.Sprintf("%d", head.stats.MinTime))
if err := os.MkdirAll(p, 0777); err != nil {
return err
@@ -323,7 +349,7 @@ }
}
iw := newIndexWriter(xf)
sw := newSeriesWriter(sf, iw, s.head.baseTimestamp)
sw := newSeriesWriter(sf, iw, s.head.stats.MinTime)
defer sw.Close()
defer iw.Close()
@@ -334,7 +360,7 @@ }
}
}
if err := iw.WriteStats(nil); err != nil {
if err := iw.WriteStats(s.head.stats); err != nil {
return err
}
for n, v := range head.index.values {
@@ -356,7 +382,7 @@ }
sz := fmt.Sprintf("%.2fMiB", float64(sw.Size()+iw.Size())/1024/1024)
s.logger.Log("size", sz, "samples", head.samples, "chunks", head.stats().chunks, "msg", "persisted head")
s.logger.Log("size", sz, "samples", head.stats.SampleCount, "chunks", head.stats.ChunkCount, "msg", "persisted head")
return nil
}

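intervalOverlap above treats both ranges as closed intervals: [amin, amax] and [bmin, bmax] overlap exactly when each start is no later than the other's end, i.e. amin <= bmax && bmin <= amax. A runnable sketch exercising the function as it appears in the diff:

package main

import "fmt"

// intervalOverlap is copied verbatim from the db.go hunk above.
func intervalOverlap(amin, amax, bmin, bmax int64) bool {
	if bmin >= amin && bmin <= amax {
		return true
	}
	if amin >= bmin && amin <= bmax {
		return true
	}
	return false
}

func main() {
	fmt.Println(intervalOverlap(0, 10, 5, 15))  // true: partial overlap
	fmt.Println(intervalOverlap(0, 10, 10, 20)) // true: shared endpoint counts
	fmt.Println(intervalOverlap(0, 10, 11, 20)) // false: disjoint
	fmt.Println(intervalOverlap(5, 6, 0, 20))   // true: containment
}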
head.go
@@ -14,18 +14,18 @@ type HeadBlock struct {
descs map[uint64][]*chunkDesc // labels hash to possible chunks descs
index *memIndex
samples uint64 // total samples in the block
highTimestamp int64 // highest timestamp of any sample
baseTimestamp int64 // all samples are strictly later
stats BlockStats
}
// NewHeadBlock creates a new empty head block.
func NewHeadBlock(baseTime int64) *HeadBlock {
return &HeadBlock{
b := &HeadBlock{
descs: make(map[uint64][]*chunkDesc, 2048),
index: newMemIndex(),
baseTimestamp: baseTime,
}
b.stats.MinTime = baseTime
return b
}
// Querier returns a new querier over the head block.
@@ -33,6 +33,7 @@ func (h *HeadBlock) Querier(mint, maxt int64) Querier {
return newBlockQuerier(h, h, mint, maxt)
}
// Chunk returns the chunk for the reference number.
func (h *HeadBlock) Chunk(ref uint32) (chunks.Chunk, error) {
c, ok := h.index.forward[ref]
if !ok {
@@ -41,9 +42,13 @@ func (h *HeadBlock) Chunk(ref uint32) (chunks.Chunk, error) {
return c.chunk, nil
}
func (h *HeadBlock) interval() (int64, int64) {
return h.stats.MinTime, h.stats.MaxTime
}
// Stats returns statistics about the indexed data.
func (h *HeadBlock) Stats() (*BlockStats, error) {
return nil, nil
func (h *HeadBlock) Stats() (BlockStats, error) {
return h.stats, nil
}
// LabelValues returns the possible label values
@@ -79,7 +84,7 @@ func (h *HeadBlock) Series(ref uint32) (Series, error) {
s := &series{
labels: cd.lset,
offsets: []ChunkOffset{
{Value: h.baseTimestamp, Offset: 0},
{Value: h.stats.MinTime, Offset: 0},
},
chunk: func(ref uint32) (chunks.Chunk, error) {
return cd.chunk, nil
@@ -104,6 +109,10 @@ func (h *HeadBlock) get(hash uint64, lset Labels) *chunkDesc {
}
h.index.add(cd)
// For the head block there's exactly one chunk per series.
h.stats.ChunkCount++
h.stats.SeriesCount++
h.descs[hash] = append(cds, cd)
return cd
}
@@ -113,18 +122,12 @@ func (h *HeadBlock) append(hash uint64, lset Labels, ts int64, v float64) error
if err := h.get(hash, lset).append(ts, v); err != nil {
return err
}
h.samples++
h.stats.SampleCount++
if ts > h.stats.MaxTime {
h.stats.MaxTime = ts
}
return nil
}
type blockStats struct {
chunks uint32
samples uint64
}
func (h *HeadBlock) stats() *blockStats {
return &blockStats{
chunks: uint32(h.index.numSeries()),
samples: h.samples,
}
}

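The head block now keeps its stats inline: MinTime is pinned to the base timestamp at construction, get() bumps ChunkCount and SeriesCount together (exactly one chunk per series in the head), and append() bumps SampleCount and advances MaxTime. A reduced standalone sketch of just this bookkeeping; the head type here is a stand-in, not the real HeadBlock:

package main

import "fmt"

type BlockStats struct {
	MinTime     int64
	MaxTime     int64
	SampleCount uint64
	SeriesCount uint32
	ChunkCount  uint32
}

// head mimics only the stats bookkeeping from head.go.
type head struct {
	stats BlockStats
	seen  map[string]bool // which series already have a chunk
}

func newHead(baseTime int64) *head {
	return &head{stats: BlockStats{MinTime: baseTime}, seen: map[string]bool{}}
}

func (h *head) append(series string, ts int64) {
	if !h.seen[series] {
		h.seen[series] = true
		// For the head block there's exactly one chunk per series.
		h.stats.ChunkCount++
		h.stats.SeriesCount++
	}
	h.stats.SampleCount++
	if ts > h.stats.MaxTime {
		h.stats.MaxTime = ts
	}
}

func main() {
	h := newHead(1000)
	h.append("a", 1001)
	h.append("a", 1005)
	h.append("b", 1003)
	fmt.Printf("%+v\n", h.stats)
	// {MinTime:1000 MaxTime:1005 SampleCount:3 SeriesCount:2 ChunkCount:2}
}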
@@ -112,7 +112,7 @@ type shardQuerier struct {
// Querier returns a new querier over the data shard for the given
// time range.
func (s *Shard) Querier(mint, maxt int64) Querier {
blocks := s.blocksForRange(mint, maxt)
blocks := s.blocksForInterval(mint, maxt)
sq := &shardQuerier{
blocks: make([]Querier, 0, len(blocks)),

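The hunk cuts off here; presumably the shard querier then opens one Querier per overlapping block over the same range, along the lines of this hypothetical continuation (not shown in the diff):

	for _, b := range blocks {
		sq.blocks = append(sq.blocks, b.Querier(mint, maxt))
	}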
@@ -49,7 +49,7 @@ func (s *seriesReader) Chunk(offset uint32) (chunks.Chunk, error) {
// IndexReader provides reading access of serialized index data.
type IndexReader interface {
// Stats returns statistics about the indexed data.
Stats() (*BlockStats, error)
Stats() (BlockStats, error)
// LabelValues returns the possible label values
LabelValues(names ...string) (StringTuples, error)
@@ -182,8 +186,26 @@ func (r *indexReader) lookupSymbol(o uint32) ([]byte, error) {
return r.b[int(o)+n : end], nil
}
func (r *indexReader) Stats() (*BlockStats, error) {
return nil, nil
func (r *indexReader) Stats() (BlockStats, error) {
flag, b, err := r.section(8)
if err != nil {
return BlockStats{}, err
}
if flag != flagStd {
return BlockStats{}, errInvalidFlag
}
if len(b) != 64 {
return BlockStats{}, errInvalidSize
}
return BlockStats{
MinTime: int64(binary.BigEndian.Uint64(b)),
MaxTime: int64(binary.BigEndian.Uint64(b[8:])),
SeriesCount: binary.BigEndian.Uint32(b[16:]),
ChunkCount: binary.BigEndian.Uint32(b[20:]),
SampleCount: binary.BigEndian.Uint64(b[24:]),
}, nil
}
func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
@@ -293,6 +311,39 @@ func (r *indexReader) Series(ref uint32) (Series, error) {
return s, nil
}
func (r *indexReader) Postings(name, value string) (Postings, error) {
key := name + string(sep) + value
off, ok := r.postings[key]
if !ok {
return nil, errNotFound
}
flag, b, err := r.section(off)
if err != nil {
return nil, err
}
if flag != flagStd {
return nil, errInvalidFlag
}
// TODO(fabxc): just read into memory as an intermediate solution.
// Add iterator over serialized data.
var l []uint32
for len(b) > 0 {
if len(b) < 4 {
return nil, errInvalidSize
}
l = append(l, binary.BigEndian.Uint32(b[:4]))
b = b[4:]
}
return &listIterator{list: l, idx: -1}, nil
}
type series struct {
labels Labels
offsets []ChunkOffset // in-order chunk refs

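Postings are stored as a flat run of big-endian uint32 series references and, per the TODO in the hunk above, currently read fully into memory. A self-contained sketch of the decode loop; decodePostings is an illustrative stand-in for the body of indexReader.Postings after the flag check:

package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

var errInvalidSize = errors.New("invalid size")

// decodePostings turns a section payload into the in-memory list that
// backs the listIterator.
func decodePostings(b []byte) ([]uint32, error) {
	var l []uint32
	for len(b) > 0 {
		if len(b) < 4 {
			return nil, errInvalidSize
		}
		l = append(l, binary.BigEndian.Uint32(b[:4]))
		b = b[4:]
	}
	return l, nil
}

func main() {
	payload := []byte{0, 0, 0, 1, 0, 0, 0, 7, 0, 0, 0, 42}
	ids, err := decodePostings(payload)
	fmt.Println(ids, err) // [1 7 42] <nil>
}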
@@ -2,6 +2,7 @@ package tsdb
import (
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"os"
@@ -146,6 +147,11 @@ type ChunkOffset struct {
}
type BlockStats struct {
MinTime int64
MaxTime int64
SampleCount uint64
SeriesCount uint32
ChunkCount uint32
}
// IndexWriter serializes the index for a block of series data.
@@ -158,7 +164,7 @@ type IndexWriter interface {
AddSeries(ref uint32, l Labels, o ...ChunkOffset)
// WriteStats writes final stats for the indexed block.
WriteStats(*BlockStats) error
WriteStats(BlockStats) error
// WriteLabelIndex serializes an index from label names to values.
// The passed-in values are chained tuples of strings of the length of names.
@@ -249,18 +255,36 @@ func (w *indexWriter) AddSeries(ref uint32, lset Labels, offsets ...ChunkOffset)
}
}
func (w *indexWriter) WriteStats(*BlockStats) error {
if w.n == 0 {
func (w *indexWriter) WriteStats(stats BlockStats) error {
if w.n != 0 {
return fmt.Errorf("WriteStats must be called first")
}
if err := w.writeMeta(); err != nil {
return err
}
b := [64]byte{}
binary.BigEndian.PutUint64(b[0:], uint64(stats.MinTime))
binary.BigEndian.PutUint64(b[8:], uint64(stats.MaxTime))
binary.BigEndian.PutUint32(b[16:], stats.SeriesCount)
binary.BigEndian.PutUint32(b[20:], stats.ChunkCount)
binary.BigEndian.PutUint64(b[24:], stats.SampleCount)
err := w.section(64, flagStd, func(wr io.Writer) error {
return w.write(wr, b[:])
})
if err != nil {
return err
}
if err := w.writeSymbols(); err != nil {
return err
}
if err := w.writeSeries(); err != nil {
return err
}
}
return nil
}
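Since WriteStats now performs the first physical write to the index file (meta header, the 64-byte stats section, then the buffered symbols and series), it has to run before anything else touches the file, which the w.n != 0 guard enforces. A hedged call-order sketch based on the persist() path in db.go, where stats stands for the persisting head's BlockStats and series writing and error handling are elided:

	iw := newIndexWriter(xf)
	sw := newSeriesWriter(sf, iw, stats.MinTime)

	// Writing series through sw only buffers index entries via
	// iw.AddSeries; the index file itself is still empty here.

	// First physical write: meta, stats section, symbols, series.
	if err := iw.WriteStats(stats); err != nil {
		return err
	}
	// Label value indexes and postings sections follow via iw.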