diff --git a/block.go b/block.go index 37deab584..acfd4c733 100644 --- a/block.go +++ b/block.go @@ -6,12 +6,13 @@ import ( "os" "path/filepath" "sort" - "strconv" ) // Block handles reads against a block of time series data within a time window. -type Block interface { +type block interface { Querier(mint, maxt int64) Querier + + interval() (int64, int64) } const ( @@ -25,15 +26,12 @@ type persistedBlock struct { chunks *seriesReader index *indexReader - baseTime int64 + stats BlockStats } func newPersistedBlock(path string) (*persistedBlock, error) { // The directory must be named after the base timestamp for the block. - baset, err := strconv.ParseInt(filepath.Base(path), 10, 0) - if err != nil { - return nil, fmt.Errorf("unexpected directory name %q: %s", path, err) - } + // TODO(fabxc): validate match of name and stats time, validate magic. // mmap files belonging to the block. chunksf, err := openMmapFile(chunksFileName(path)) @@ -54,12 +52,17 @@ func newPersistedBlock(path string) (*persistedBlock, error) { return nil, err } + stats, err := ir.Stats() + if err != nil { + return nil, err + } + pb := &persistedBlock{ - chunksf: chunksf, - indexf: indexf, - chunks: sr, - index: ir, - baseTime: baset, + chunksf: chunksf, + indexf: indexf, + chunks: sr, + index: ir, + stats: stats, } return pb, nil } @@ -74,11 +77,24 @@ func (pb *persistedBlock) Close() error { return err1 } +func (pb *persistedBlock) Querier(mint, maxt int64) Querier { + return &blockQuerier{ + mint: mint, + maxt: maxt, + index: pb.index, + series: pb.chunks, + } +} + +func (pb *persistedBlock) interval() (int64, int64) { + return pb.stats.MinTime, pb.stats.MaxTime +} + type persistedBlocks []*persistedBlock func (p persistedBlocks) Len() int { return len(p) } func (p persistedBlocks) Swap(i, j int) { p[i], p[j] = p[j], p[i] } -func (p persistedBlocks) Less(i, j int) bool { return p[i].baseTime < p[j].baseTime } +func (p persistedBlocks) Less(i, j int) bool { return p[i].stats.MinTime < p[j].stats.MinTime } // findBlocks finds time-ordered persisted blocks within a directory. func findPersistedBlocks(path string) ([]*persistedBlock, error) { diff --git a/db.go b/db.go index 107bbf237..e9abe73e7 100644 --- a/db.go +++ b/db.go @@ -267,12 +267,12 @@ func (s *Shard) appendBatch(ts int64, samples []Sample) error { } } - if ts > s.head.highTimestamp { - s.head.highTimestamp = ts + if ts > s.head.stats.MaxTime { + s.head.stats.MaxTime = ts } // TODO(fabxc): randomize over time - if s.head.stats().samples/uint64(s.head.stats().chunks) > 400 { + if s.head.stats.SampleCount/uint64(s.head.stats.ChunkCount) > 400 { select { case s.persistCh <- struct{}{}: go s.persist() @@ -283,10 +283,36 @@ func (s *Shard) appendBatch(ts int64, samples []Sample) error { return nil } +func intervalOverlap(amin, amax, bmin, bmax int64) bool { + if bmin >= amin && bmin <= amax { + return true + } + if amin >= bmin && amin <= bmax { + return true + } + return false +} + // blocksForRange returns all blocks within the shard that may contain // data for the given time range. -func (s *Shard) blocksForRange(mint, maxt int64) (bs []Block) { - return []Block{s.head} +func (s *Shard) blocksForInterval(mint, maxt int64) []block { + var bs []block + + for _, b := range s.persisted { + bmin, bmax := b.interval() + + if intervalOverlap(mint, maxt, bmin, bmax) { + bs = append(bs, b) + } + } + + hmin, hmax := s.head.interval() + + if intervalOverlap(mint, maxt, hmin, hmax) { + bs = append(bs, s.head) + } + + return bs } // TODO(fabxc): make configurable. @@ -297,7 +323,7 @@ func (s *Shard) persist() error { // Set new head block. head := s.head - s.head = NewHeadBlock(head.highTimestamp) + s.head = NewHeadBlock(head.stats.MaxTime) s.mtx.Unlock() @@ -307,7 +333,7 @@ func (s *Shard) persist() error { // TODO(fabxc): add grace period where we can still append to old head shard // before actually persisting it. - p := filepath.Join(s.path, fmt.Sprintf("%d", head.baseTimestamp)) + p := filepath.Join(s.path, fmt.Sprintf("%d", head.stats.MinTime)) if err := os.MkdirAll(p, 0777); err != nil { return err @@ -323,7 +349,7 @@ func (s *Shard) persist() error { } iw := newIndexWriter(xf) - sw := newSeriesWriter(sf, iw, s.head.baseTimestamp) + sw := newSeriesWriter(sf, iw, s.head.stats.MinTime) defer sw.Close() defer iw.Close() @@ -334,7 +360,7 @@ func (s *Shard) persist() error { } } - if err := iw.WriteStats(nil); err != nil { + if err := iw.WriteStats(s.head.stats); err != nil { return err } for n, v := range head.index.values { @@ -356,7 +382,7 @@ func (s *Shard) persist() error { sz := fmt.Sprintf("%.2fMiB", float64(sw.Size()+iw.Size())/1024/1024) - s.logger.Log("size", sz, "samples", head.samples, "chunks", head.stats().chunks, "msg", "persisted head") + s.logger.Log("size", sz, "samples", head.stats.SampleCount, "chunks", head.stats.ChunkCount, "msg", "persisted head") return nil } diff --git a/head.go b/head.go index ac9ca93dd..c27d72958 100644 --- a/head.go +++ b/head.go @@ -14,18 +14,18 @@ type HeadBlock struct { descs map[uint64][]*chunkDesc // labels hash to possible chunks descs index *memIndex - samples uint64 // total samples in the block - highTimestamp int64 // highest timestamp of any sample - baseTimestamp int64 // all samples are strictly later + stats BlockStats } // NewHeadBlock creates a new empty head block. func NewHeadBlock(baseTime int64) *HeadBlock { - return &HeadBlock{ - descs: make(map[uint64][]*chunkDesc, 2048), - index: newMemIndex(), - baseTimestamp: baseTime, + b := &HeadBlock{ + descs: make(map[uint64][]*chunkDesc, 2048), + index: newMemIndex(), } + b.stats.MinTime = baseTime + + return b } // Querier returns a new querier over the head block. @@ -33,6 +33,7 @@ func (h *HeadBlock) Querier(mint, maxt int64) Querier { return newBlockQuerier(h, h, mint, maxt) } +// Chunk returns the chunk for the reference number. func (h *HeadBlock) Chunk(ref uint32) (chunks.Chunk, error) { c, ok := h.index.forward[ref] if !ok { @@ -41,9 +42,13 @@ func (h *HeadBlock) Chunk(ref uint32) (chunks.Chunk, error) { return c.chunk, nil } +func (h *HeadBlock) interval() (int64, int64) { + return h.stats.MinTime, h.stats.MaxTime +} + // Stats returns statisitics about the indexed data. -func (h *HeadBlock) Stats() (*BlockStats, error) { - return nil, nil +func (h *HeadBlock) Stats() (BlockStats, error) { + return h.stats, nil } // LabelValues returns the possible label values @@ -79,7 +84,7 @@ func (h *HeadBlock) Series(ref uint32) (Series, error) { s := &series{ labels: cd.lset, offsets: []ChunkOffset{ - {Value: h.baseTimestamp, Offset: 0}, + {Value: h.stats.MinTime, Offset: 0}, }, chunk: func(ref uint32) (chunks.Chunk, error) { return cd.chunk, nil @@ -104,6 +109,10 @@ func (h *HeadBlock) get(hash uint64, lset Labels) *chunkDesc { } h.index.add(cd) + // For the head block there's exactly one chunk per series. + h.stats.ChunkCount++ + h.stats.SeriesCount++ + h.descs[hash] = append(cds, cd) return cd } @@ -113,18 +122,12 @@ func (h *HeadBlock) append(hash uint64, lset Labels, ts int64, v float64) error if err := h.get(hash, lset).append(ts, v); err != nil { return err } - h.samples++ + + h.stats.SampleCount++ + + if ts > h.stats.MaxTime { + h.stats.MaxTime = ts + } + return nil } - -type blockStats struct { - chunks uint32 - samples uint64 -} - -func (h *HeadBlock) stats() *blockStats { - return &blockStats{ - chunks: uint32(h.index.numSeries()), - samples: h.samples, - } -} diff --git a/querier.go b/querier.go index ee38f2029..3a970d740 100644 --- a/querier.go +++ b/querier.go @@ -112,7 +112,7 @@ type shardQuerier struct { // Querier returns a new querier over the data shard for the given // time range. func (s *Shard) Querier(mint, maxt int64) Querier { - blocks := s.blocksForRange(mint, maxt) + blocks := s.blocksForInterval(mint, maxt) sq := &shardQuerier{ blocks: make([]Querier, 0, len(blocks)), diff --git a/reader.go b/reader.go index 83ede2ad5..73e88de33 100644 --- a/reader.go +++ b/reader.go @@ -49,7 +49,7 @@ func (s *seriesReader) Chunk(offset uint32) (chunks.Chunk, error) { // IndexReader provides reading access of serialized index data. type IndexReader interface { // Stats returns statisitics about the indexed data. - Stats() (*BlockStats, error) + Stats() (BlockStats, error) // LabelValues returns the possible label values LabelValues(names ...string) (StringTuples, error) @@ -182,8 +182,26 @@ func (r *indexReader) lookupSymbol(o uint32) ([]byte, error) { return r.b[int(o)+n : end], nil } -func (r *indexReader) Stats() (*BlockStats, error) { - return nil, nil +func (r *indexReader) Stats() (BlockStats, error) { + flag, b, err := r.section(8) + if err != nil { + return BlockStats{}, err + } + if flag != flagStd { + return BlockStats{}, errInvalidFlag + } + + if len(b) != 64 { + return BlockStats{}, errInvalidSize + } + + return BlockStats{ + MinTime: int64(binary.BigEndian.Uint64(b)), + MaxTime: int64(binary.BigEndian.Uint64(b[8:])), + SeriesCount: binary.BigEndian.Uint32(b[16:]), + ChunkCount: binary.BigEndian.Uint32(b[20:]), + SampleCount: binary.BigEndian.Uint64(b[24:]), + }, nil } func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { @@ -293,6 +311,39 @@ func (r *indexReader) Series(ref uint32) (Series, error) { return s, nil } +func (r *indexReader) Postings(name, value string) (Postings, error) { + key := name + string(sep) + value + + off, ok := r.postings[key] + if !ok { + return nil, errNotFound + } + + flag, b, err := r.section(off) + if err != nil { + return nil, err + } + + if flag != flagStd { + return nil, errInvalidFlag + } + + // TODO(fabxc): just read into memory as an intermediate solution. + // Add iterator over serialized data. + var l []uint32 + + for len(b) > 0 { + if len(b) < 4 { + return nil, errInvalidSize + } + l = append(l, binary.BigEndian.Uint32(b[:4])) + + b = b[4:] + } + + return &listIterator{list: l, idx: -1}, nil +} + type series struct { labels Labels offsets []ChunkOffset // in-order chunk refs diff --git a/writer.go b/writer.go index 5358eb612..83f8449ee 100644 --- a/writer.go +++ b/writer.go @@ -2,6 +2,7 @@ package tsdb import ( "encoding/binary" + "fmt" "hash/crc32" "io" "os" @@ -146,6 +147,11 @@ type ChunkOffset struct { } type BlockStats struct { + MinTime int64 + MaxTime int64 + SampleCount uint64 + SeriesCount uint32 + ChunkCount uint32 } // IndexWriter serialized the index for a block of series data. @@ -158,7 +164,7 @@ type IndexWriter interface { AddSeries(ref uint32, l Labels, o ...ChunkOffset) // WriteStats writes final stats for the indexed block. - WriteStats(*BlockStats) error + WriteStats(BlockStats) error // WriteLabelIndex serializes an index from label names to values. // The passed in values chained tuples of strings of the length of names. @@ -249,17 +255,35 @@ func (w *indexWriter) AddSeries(ref uint32, lset Labels, offsets ...ChunkOffset) } } -func (w *indexWriter) WriteStats(*BlockStats) error { - if w.n == 0 { - if err := w.writeMeta(); err != nil { - return err - } - if err := w.writeSymbols(); err != nil { - return err - } - if err := w.writeSeries(); err != nil { - return err - } +func (w *indexWriter) WriteStats(stats BlockStats) error { + if w.n != 0 { + return fmt.Errorf("WriteStats must be called first") + } + + if err := w.writeMeta(); err != nil { + return err + } + + b := [64]byte{} + + binary.BigEndian.PutUint64(b[0:], uint64(stats.MinTime)) + binary.BigEndian.PutUint64(b[8:], uint64(stats.MaxTime)) + binary.BigEndian.PutUint32(b[16:], stats.SeriesCount) + binary.BigEndian.PutUint32(b[20:], stats.ChunkCount) + binary.BigEndian.PutUint64(b[24:], stats.SampleCount) + + err := w.section(64, flagStd, func(wr io.Writer) error { + return w.write(wr, b[:]) + }) + if err != nil { + return err + } + + if err := w.writeSymbols(); err != nil { + return err + } + if err := w.writeSeries(); err != nil { + return err } return nil }