diff --git a/block.go b/block.go index 35b5e1828..814d2f87f 100644 --- a/block.go +++ b/block.go @@ -4,6 +4,7 @@ import ( "os" "path/filepath" "sort" + "sync" "github.com/coreos/etcd/pkg/fileutil" "github.com/pkg/errors" @@ -20,11 +21,14 @@ type block interface { } type BlockStats struct { - MinTime int64 - MaxTime int64 + // Time range of samples in the block. + MinTime, MaxTime int64 + SampleCount uint64 - SeriesCount uint32 - ChunkCount uint32 + SeriesCount uint64 + ChunkCount uint64 + + mtx sync.RWMutex } const ( diff --git a/compact.go b/compact.go index fd9a556e2..62607d69f 100644 --- a/compact.go +++ b/compact.go @@ -219,7 +219,7 @@ func (c *compactor) write(blocks []block, indexw IndexWriter, chunkw SeriesWrite return err } - stats.ChunkCount += uint32(len(chunks)) + stats.ChunkCount += uint64(len(chunks)) stats.SeriesCount++ for _, l := range lset { diff --git a/db.go b/db.go index 28277fcd8..ac57bc710 100644 --- a/db.go +++ b/db.go @@ -352,8 +352,7 @@ func (db *DB) appendBatch(samples []hashedSample) error { db.metrics.samplesAppended.Add(float64(len(samples))) } - // TODO(fabxc): randomize over time and use better scoring function. - if head.bstats.SampleCount/(uint64(head.bstats.ChunkCount)+1) > 250 { + if head.fullness() > 1.0 { select { case db.cutc <- struct{}{}: default: diff --git a/head.go b/head.go index 81b2b0ef4..5685dba0b 100644 --- a/head.go +++ b/head.go @@ -5,7 +5,6 @@ import ( "math" "sort" "sync" - "sync/atomic" "time" "github.com/bradfitz/slice" @@ -98,7 +97,13 @@ func (h *HeadBlock) dir() string { return h.d } func (h *HeadBlock) persisted() bool { return false } func (h *HeadBlock) index() IndexReader { return h } func (h *HeadBlock) series() SeriesReader { return h } -func (h *HeadBlock) stats() BlockStats { return *h.bstats } + +func (h *HeadBlock) stats() BlockStats { + h.bstats.mtx.RLock() + defer h.bstats.mtx.RUnlock() + + return *h.bstats +} // Chunk returns the chunk for the reference number. func (h *HeadBlock) Chunk(ref uint32) (chunks.Chunk, error) { @@ -221,10 +226,6 @@ func (h *HeadBlock) create(hash uint64, lset labels.Labels) *chunkDesc { h.postings.add(cd.ref, term{}) - // For the head block there's exactly one chunk per series. - atomic.AddUint32(&h.bstats.ChunkCount, 1) - atomic.AddUint32(&h.bstats.SeriesCount, 1) - return cd } @@ -307,8 +308,11 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error { h.mtx.RLock() } - total := len(samples) - + var ( + total = uint64(len(samples)) + mint = int64(math.MaxInt64) + maxt = int64(math.MinInt64) + ) for _, s := range samples { cd := h.descs[s.ref] // Skip duplicate samples. @@ -318,23 +322,38 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error { } cd.append(s.t, s.v) - for t := h.bstats.MaxTime; s.t > t; t = h.bstats.MaxTime { - if atomic.CompareAndSwapInt64(&h.bstats.MaxTime, t, s.t) { - break - } + if mint > s.t { + mint = s.t } - for t := h.bstats.MinTime; s.t < t; t = h.bstats.MinTime { - if atomic.CompareAndSwapInt64(&h.bstats.MinTime, t, s.t) { - break - } + if maxt < s.t { + maxt = s.t } } - atomic.AddUint64(&h.bstats.SampleCount, uint64(total)) + h.bstats.mtx.Lock() + defer h.bstats.mtx.Unlock() + + h.bstats.SampleCount += total + h.bstats.SeriesCount += uint64(len(newSeries)) + h.bstats.ChunkCount += uint64(len(newSeries)) // head block has one chunk/series + + if mint < h.bstats.MinTime { + h.bstats.MinTime = mint + } + if maxt > h.bstats.MaxTime { + h.bstats.MaxTime = maxt + } return nil } +func (h *HeadBlock) fullness() float64 { + h.bstats.mtx.RLock() + defer h.bstats.mtx.RUnlock() + + return float64(h.bstats.SampleCount) / float64(h.bstats.SeriesCount+1) / 250 +} + func (h *HeadBlock) updateMapping() { h.mapper.mtx.Lock() defer h.mapper.mtx.Unlock() diff --git a/reader.go b/reader.go index 06a0cab77..3f2ede6b7 100644 --- a/reader.go +++ b/reader.go @@ -212,9 +212,9 @@ func (r *indexReader) Stats() (BlockStats, error) { return BlockStats{ MinTime: int64(binary.BigEndian.Uint64(b)), MaxTime: int64(binary.BigEndian.Uint64(b[8:])), - SeriesCount: binary.BigEndian.Uint32(b[16:]), - ChunkCount: binary.BigEndian.Uint32(b[20:]), - SampleCount: binary.BigEndian.Uint64(b[24:]), + SeriesCount: binary.BigEndian.Uint64(b[16:]), + ChunkCount: binary.BigEndian.Uint64(b[24:]), + SampleCount: binary.BigEndian.Uint64(b[32:]), }, nil } diff --git a/wal_test.go b/wal_test.go index afd93f719..4baabf094 100644 --- a/wal_test.go +++ b/wal_test.go @@ -5,6 +5,7 @@ import ( "io/ioutil" "os" "testing" + "time" "github.com/fabxc/tsdb/labels" dto "github.com/prometheus/client_model/go" @@ -21,7 +22,7 @@ func BenchmarkWALWrite(b *testing.B) { require.NoError(b, os.RemoveAll(d)) }() - wal, err := OpenWAL(d) + wal, err := OpenWAL(d, nil, 500*time.Millisecond) require.NoError(b, err) f, err := os.Open("cmd/tsdb/testdata.1m") @@ -78,7 +79,7 @@ func BenchmarkWALRead(b *testing.B) { require.NoError(b, os.RemoveAll(d)) }() - wal, err := OpenWAL(d) + wal, err := OpenWAL(d, nil, 500*time.Millisecond) require.NoError(b, err) var ( @@ -111,7 +112,7 @@ func BenchmarkWALRead(b *testing.B) { b.ResetTimer() - wal, err = OpenWAL(d) + wal, err = OpenWAL(d, nil, 500*time.Millisecond) require.NoError(b, err) var numSeries, numSamples int @@ -144,7 +145,7 @@ func BenchmarkWALReadIntoHead(b *testing.B) { require.NoError(b, os.RemoveAll(d)) }() - wal, err := OpenWAL(d) + wal, err := OpenWAL(d, nil, 500*time.Millisecond) require.NoError(b, err) var ( @@ -177,7 +178,7 @@ func BenchmarkWALReadIntoHead(b *testing.B) { b.ResetTimer() - _, err = OpenHeadBlock(d) + _, err = OpenWAL(d, nil, 500*time.Millisecond) require.NoError(b, err) // stat, _ := head.wal.f.Stat() diff --git a/writer.go b/writer.go index 8e88770cd..f91ca84b3 100644 --- a/writer.go +++ b/writer.go @@ -259,9 +259,9 @@ func (w *indexWriter) WriteStats(stats BlockStats) error { binary.BigEndian.PutUint64(b[0:], uint64(stats.MinTime)) binary.BigEndian.PutUint64(b[8:], uint64(stats.MaxTime)) - binary.BigEndian.PutUint32(b[16:], stats.SeriesCount) - binary.BigEndian.PutUint32(b[20:], stats.ChunkCount) - binary.BigEndian.PutUint64(b[24:], stats.SampleCount) + binary.BigEndian.PutUint64(b[16:], stats.SeriesCount) + binary.BigEndian.PutUint64(b[24:], stats.ChunkCount) + binary.BigEndian.PutUint64(b[32:], stats.SampleCount) err := w.section(64, flagStd, func(wr io.Writer) error { return w.write(wr, b[:])