diff --git a/block.go b/block.go index 2ecc7687c..4939ee5d3 100644 --- a/block.go +++ b/block.go @@ -2,6 +2,7 @@ package tsdb import ( "encoding/json" + "fmt" "io/ioutil" "os" "path/filepath" @@ -174,6 +175,10 @@ func (pb *persistedBlock) Close() error { return merr.Err() } +func (pb *persistedBlock) String() string { + return fmt.Sprintf("(%d, %s)", pb.meta.Sequence, pb.meta.ULID) +} + func (pb *persistedBlock) Querier(mint, maxt int64) Querier { return &blockQuerier{ mint: mint, diff --git a/compact.go b/compact.go index a1d7b1e5c..a1720605a 100644 --- a/compact.go +++ b/compact.go @@ -1,6 +1,7 @@ package tsdb import ( + "fmt" "math/rand" "os" "path/filepath" @@ -8,6 +9,7 @@ import ( "github.com/coreos/etcd/pkg/fileutil" "github.com/fabxc/tsdb/labels" + "github.com/go-kit/kit/log" "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -32,6 +34,7 @@ type Compactor interface { // compactor implements the Compactor interface. type compactor struct { metrics *compactorMetrics + logger log.Logger opts *compactorOptions } @@ -71,9 +74,10 @@ type compactorOptions struct { maxBlockRange uint64 } -func newCompactor(r prometheus.Registerer, opts *compactorOptions) *compactor { +func newCompactor(r prometheus.Registerer, l log.Logger, opts *compactorOptions) *compactor { return &compactor{ opts: opts, + logger: l, metrics: newCompactorMetrics(r), } } @@ -178,6 +182,8 @@ func (c *compactor) Write(dir string, b Block) error { // write creates a new block that is the union of the provided blocks into dir. // It cleans up all files of the old blocks after completing successfully. func (c *compactor) write(dir string, blocks ...Block) (err error) { + c.logger.Log("msg", "compact blocks", "blocks", fmt.Sprintf("%v", blocks)) + defer func(t time.Time) { if err != nil { c.metrics.failed.Inc() diff --git a/db.go b/db.go index 6955477cd..2c00b7d79 100644 --- a/db.go +++ b/db.go @@ -174,7 +174,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db donec: make(chan struct{}), stopc: make(chan struct{}), } - db.compactor = newCompactor(r, &compactorOptions{ + db.compactor = newCompactor(r, l, &compactorOptions{ maxBlockRange: opts.MaxBlockDuration, }) @@ -269,14 +269,10 @@ func (db *DB) compact() (changes bool, err error) { db.headmtx.RUnlock() - db.logger.Log("msg", "picked singles", "singles", fmt.Sprintf("%v", singles)) -Loop: for _, h := range singles { - db.logger.Log("msg", "write head", "seq", h.Meta().Sequence, "dir", h.Dir(), "ulid", h.Meta().ULID) - select { case <-db.stopc: - break Loop + return changes, nil default: } @@ -295,16 +291,15 @@ Loop: select { case <-db.stopc: - return false, nil + return changes, nil default: } + // We just execute compactions sequentially to not cause too extreme // CPU and memory spikes. // TODO(fabxc): return more descriptive plans in the future that allow // estimation of resource usage and conditional parallelization? for _, p := range plans { - db.logger.Log("msg", "compact blocks", "seq", fmt.Sprintf("%v", p)) - if err := db.compactor.Compact(p...); err != nil { return changes, errors.Wrapf(err, "compact %s", p) } diff --git a/head.go b/head.go index 61a131360..4864276f9 100644 --- a/head.go +++ b/head.go @@ -135,6 +135,10 @@ func (h *headBlock) inBounds(t int64) bool { return t >= h.meta.MinTime && t <= h.meta.MaxTime } +func (h *headBlock) String() string { + return fmt.Sprintf("(%d, %s)", h.meta.Sequence, h.meta.ULID) +} + // Close syncs all data and closes underlying resources of the head block. func (h *headBlock) Close() error { h.mtx.Lock()