diff --git a/block.go b/block.go index dcc2e58ec..aaa2b234c 100644 --- a/block.go +++ b/block.go @@ -119,7 +119,6 @@ func findBlocks(path string) ([]*persistedBlock, *HeadBlock, error) { p := filepath.Join(path, fi.Name()) if _, err := os.Stat(chunksFileName(p)); os.IsNotExist(err) { - fmt.Println("found head dir", p) if head != nil { return nil, nil, errors.Errorf("found two head blocks") } diff --git a/chunks/chunk_test.go b/chunks/chunk_test.go index e45343c41..a00b97394 100644 --- a/chunks/chunk_test.go +++ b/chunks/chunk_test.go @@ -86,8 +86,8 @@ func benchmarkIterator(b *testing.B, newChunk func() Chunk) { ) var exp []pair for i := 0; i < b.N; i++ { - t += int64(rand.Intn(10000) + 1) - // t += int64(1000) + // t += int64(rand.Intn(10000) + 1) + t += int64(1000) // v = rand.Float64() v += float64(100) exp = append(exp, pair{t: t, v: v}) @@ -154,8 +154,8 @@ func benchmarkAppender(b *testing.B, newChunk func() Chunk) { ) var exp []pair for i := 0; i < b.N; i++ { - t += int64(rand.Intn(10000) + 1) - // t += int64(1000) + // t += int64(rand.Intn(10000) + 1) + t += int64(1000) // v = rand.Float64() v += float64(100) exp = append(exp, pair{t: t, v: v}) diff --git a/db.go b/db.go index 917d34118..70830ebb3 100644 --- a/db.go +++ b/db.go @@ -175,15 +175,17 @@ const sep = '\xff' // Shard handles reads and writes of time series falling into // a hashed shard of a series. type Shard struct { - path string - persistCh chan struct{} - logger log.Logger - metrics *shardMetrics + path string + logger log.Logger + metrics *shardMetrics mtx sync.RWMutex persisted persistedBlocks head *HeadBlock compactor *compactor + + donec chan struct{} + persistc chan struct{} } type shardMetrics struct { @@ -242,7 +244,6 @@ func OpenShard(path string, i int, logger log.Logger) (*Shard, error) { } // TODO(fabxc): get time from client-defined `now` function. - // baset := time.Now().UnixNano() / int64(time.Millisecond) baset := time.Unix(0, 0).UnixNano() / int64(time.Millisecond) if len(pbs) > 0 { baset = pbs[len(pbs)-1].stats.MaxTime @@ -256,35 +257,52 @@ func OpenShard(path string, i int, logger log.Logger) (*Shard, error) { s := &Shard{ path: path, - persistCh: make(chan struct{}, 1), logger: logger, metrics: newShardMetrics(prometheus.DefaultRegisterer, i), head: head, persisted: pbs, + persistc: make(chan struct{}, 1), + donec: make(chan struct{}), } - s.compactor, err = newCompactor(s, logger) - if err != nil { + if s.compactor, err = newCompactor(s, logger); err != nil { return nil, err } + go s.run() return s, nil } +func (s *Shard) run() { + for range s.persistc { + start := time.Now() + + if err := s.persist(); err != nil { + s.logger.Log("msg", "persistence error", "err", err) + } + + s.metrics.persistenceDuration.Observe(time.Since(start).Seconds()) + s.metrics.persistences.Inc() + } + close(s.donec) +} + // Close the shard. func (s *Shard) Close() error { + close(s.persistc) + <-s.donec + + var merr MultiError + merr.Add(s.compactor.Close()) + s.mtx.Lock() defer s.mtx.Unlock() - var e MultiError - - e.Add(s.compactor.Close()) - for _, pb := range s.persisted { - e.Add(pb.Close()) + merr.Add(pb.Close()) } - e.Add(s.head.Close()) + merr.Add(s.head.Close()) - return e.Err() + return merr.Err() } func (s *Shard) appendBatch(samples []hashedSample) error { @@ -305,16 +323,7 @@ func (s *Shard) appendBatch(samples []hashedSample) error { // TODO(fabxc): randomize over time and use better scoring function. if s.head.stats.SampleCount/(uint64(s.head.stats.ChunkCount)+1) > 400 { select { - case s.persistCh <- struct{}{}: - go func() { - start := time.Now() - defer func() { s.metrics.persistenceDuration.Observe(time.Since(start).Seconds()) }() - - if err := s.persist(); err != nil { - s.logger.Log("msg", "persistence error", "err", err) - } - s.metrics.persistences.Inc() - }() + case s.persistc <- struct{}{}: default: } } @@ -375,12 +384,6 @@ func (s *Shard) persist() error { s.mtx.Unlock() - // Only allow another persistence to be triggered after the current one - // has completed (successful or not.) - defer func() { - <-s.persistCh - }() - // TODO(fabxc): add grace period where we can still append to old head shard // before actually persisting it. dir := filepath.Join(s.path, fmt.Sprintf("%d", head.stats.MinTime)) diff --git a/head.go b/head.go index 49acccdce..bd86df2f4 100644 --- a/head.go +++ b/head.go @@ -187,8 +187,13 @@ func (h *HeadBlock) create(hash uint64, lset labels.Labels) *chunkDesc { } var ( + // ErrOutOfOrderSample is returned if an appended sample has a + // timestamp larger than the most recent sample. ErrOutOfOrderSample = errors.New("out of order sample") - ErrAmendSample = errors.New("amending sample") + + // ErrAmendSample is returned if an appended sample has the same timestamp + // as the most recent sample but a different value. + ErrAmendSample = errors.New("amending sample") ) func (h *HeadBlock) appendBatch(samples []hashedSample) error {