From 7b02bfee0ac7d93a182bc66dae476f16289955a0 Mon Sep 17 00:00:00 2001
From: Fabian Reinartz
Date: Mon, 18 Sep 2017 12:32:17 +0200
Subject: [PATCH] web: start web handler while TSDB is starting up

---
 cmd/prometheus/main.go                     | 82 +++++++++++++---------
 retrieval/scrape.go                        |  1 +
 storage/tsdb/tsdb.go                       | 58 +++++++++++++++
 vendor/github.com/prometheus/tsdb/block.go |  5 --
 vendor/github.com/prometheus/tsdb/db.go    |  3 +-
 vendor/github.com/prometheus/tsdb/head.go  | 46 +++++++-----
 vendor/github.com/prometheus/tsdb/index.go |  3 +
 vendor/github.com/prometheus/tsdb/wal.go   | 15 ++--
 vendor/vendor.json                         | 14 ++--
 web/api/v2/api.go                          | 33 +++++----
 web/web.go                                 | 38 +++++-----
 web/web_test.go                            |  6 +-
 12 files changed, 195 insertions(+), 109 deletions(-)

diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index b9f0c1ee4..115c0a446 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -215,35 +215,17 @@ func main() {
     level.Info(logger).Log("build_context", version.BuildContext())
     level.Info(logger).Log("host_details", Uname())

-    var (
-        // sampleAppender = storage.Fanout{}
-        reloadables []Reloadable
-    )
-
     // Make sure that sighup handler is registered with a redirect to the channel before the potentially
     // long and synchronous tsdb init.
     hup := make(chan os.Signal)
     hupReady := make(chan bool)
     signal.Notify(hup, syscall.SIGHUP)

-    level.Info(logger).Log("msg", "Starting TSDB")
-
-    localStorage, err := tsdb.Open(
-        cfg.localStoragePath,
-        log.With(logger, "component", "tsdb"),
-        prometheus.DefaultRegisterer,
-        &cfg.tsdb,
+    var (
+        localStorage  = &tsdb.ReadyStorage{}
+        remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"))
+        fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
     )
-    if err != nil {
-        level.Error(logger).Log("msg", "Opening TSDB failed", "err", err)
-        os.Exit(1)
-    }
-
-    level.Info(logger).Log("msg", "TSDB succesfully started")
-
-    remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"))
-    reloadables = append(reloadables, remoteStorage)
-    fanoutStorage := storage.NewFanout(logger, tsdb.Adapter(localStorage), remoteStorage)

     cfg.queryEngine.Logger = log.With(logger, "component", "query engine")
     var (
@@ -263,7 +245,8 @@ func main() {
     })

     cfg.web.Context = ctx
-    cfg.web.Storage = localStorage
+    cfg.web.TSDB = localStorage.Get
+    cfg.web.Storage = fanoutStorage
     cfg.web.QueryEngine = queryEngine
     cfg.web.TargetManager = targetManager
     cfg.web.RuleManager = ruleManager
@@ -285,11 +268,12 @@ func main() {

     webHandler := web.New(log.With(logger, "component", "web"), &cfg.web)

-    reloadables = append(reloadables, targetManager, ruleManager, webHandler, notifier)
-
-    if err := reloadConfig(cfg.configFile, logger, reloadables...); err != nil {
-        level.Error(logger).Log("msg", "Error loading config", "err", err)
-        os.Exit(1)
+    reloadables := []Reloadable{
+        remoteStorage,
+        targetManager,
+        ruleManager,
+        webHandler,
+        notifier,
     }

     // Wait for reload or termination signals. Start the handler for SIGHUP as
@@ -314,14 +298,29 @@ func main() {
         }
     }()

-    // Start all components. The order is NOT arbitrary.
-    defer func() {
-        if err := fanoutStorage.Close(); err != nil {
-            level.Error(logger).Log("msg", "Closing storage(s) failed", "err", err)
-        }
-    }()
+    // Start all components while we wait for TSDB to open but only load
+    // initial config and mark ourselves as ready after it completed.
+    dbOpen := make(chan struct{})

-    // defer remoteStorage.Stop()
+    go func() {
+        defer close(dbOpen)
+
+        level.Info(logger).Log("msg", "Starting TSDB")
+
+        db, err := tsdb.Open(
+            cfg.localStoragePath,
+            log.With(logger, "component", "tsdb"),
+            prometheus.DefaultRegisterer,
+            &cfg.tsdb,
+        )
+        if err != nil {
+            level.Error(logger).Log("msg", "Opening storage failed", "err", err)
+            os.Exit(1)
+        }
+        level.Info(logger).Log("msg", "TSDB started")
+
+        localStorage.Set(db)
+    }()

     prometheus.MustRegister(configSuccess)
     prometheus.MustRegister(configSuccessTime)
@@ -344,6 +343,19 @@ func main() {
     errc := make(chan error)
     go func() { errc <- webHandler.Run(ctx) }()

+    <-dbOpen
+
+    if err := reloadConfig(cfg.configFile, logger, reloadables...); err != nil {
+        level.Error(logger).Log("msg", "Error loading config", "err", err)
+        os.Exit(1)
+    }
+
+    defer func() {
+        if err := fanoutStorage.Close(); err != nil {
+            level.Error(logger).Log("msg", "Error stopping storage", "err", err)
+        }
+    }()
+
     // Wait for reload or termination signals.
     close(hupReady) // Unblock SIGHUP handler.

diff --git a/retrieval/scrape.go b/retrieval/scrape.go
index 947701da9..a176dbd94 100644
--- a/retrieval/scrape.go
+++ b/retrieval/scrape.go
@@ -914,6 +914,7 @@ loop:
             added++
             continue
         default:
+            level.Debug(sl.l).Log("msg", "unexpected error", "series", string(met), "err", err)
             break loop
         }
         if tp == nil {
diff --git a/storage/tsdb/tsdb.go b/storage/tsdb/tsdb.go
index 4c497cafd..0077b30fd 100644
--- a/storage/tsdb/tsdb.go
+++ b/storage/tsdb/tsdb.go
@@ -14,6 +14,7 @@
 package tsdb

 import (
+    "sync"
     "time"
     "unsafe"

@@ -27,6 +28,63 @@ import (
     tsdbLabels "github.com/prometheus/tsdb/labels"
 )

+// ErrNotReady is returned if the underlying storage is not ready yet.
+var ErrNotReady = errors.New("TSDB not ready")
+
+// ReadyStorage implements the Storage interface while allowing to set the actual
+// storage at a later point in time.
+type ReadyStorage struct {
+    mtx sync.RWMutex
+    a   *adapter
+}
+
+// Set the storage.
+func (s *ReadyStorage) Set(db *tsdb.DB) {
+    s.mtx.Lock()
+    defer s.mtx.Unlock()
+
+    s.a = &adapter{db: db}
+}
+
+// Get the storage.
+func (s *ReadyStorage) Get() *tsdb.DB {
+    if x := s.get(); x != nil {
+        return x.db
+    }
+    return nil
+}
+
+func (s *ReadyStorage) get() *adapter {
+    s.mtx.RLock()
+    x := s.a
+    s.mtx.RUnlock()
+    return x
+}
+
+// Querier implements the Storage interface.
+func (s *ReadyStorage) Querier(mint, maxt int64) (storage.Querier, error) {
+    if x := s.get(); x != nil {
+        return x.Querier(mint, maxt)
+    }
+    return nil, ErrNotReady
+}
+
+// Appender implements the Storage interface.
+func (s *ReadyStorage) Appender() (storage.Appender, error) {
+    if x := s.get(); x != nil {
+        return x.Appender()
+    }
+    return nil, ErrNotReady
+}
+
+// Close implements the Storage interface.
+func (s *ReadyStorage) Close() error {
+    if x := s.Get(); x != nil {
+        return x.Close()
+    }
+    return nil
+}
+
 func Adapter(db *tsdb.DB) storage.Storage {
     return &adapter{db: db}
 }
diff --git a/vendor/github.com/prometheus/tsdb/block.go b/vendor/github.com/prometheus/tsdb/block.go
index 67cd57491..232e64e67 100644
--- a/vendor/github.com/prometheus/tsdb/block.go
+++ b/vendor/github.com/prometheus/tsdb/block.go
@@ -64,11 +64,6 @@ type Appendable interface {
     Appender() Appender
 }

-// Queryable defines an entity which provides a Querier.
-type Queryable interface {
-    Querier(mint, maxt int64) Querier
-}
-
 // BlockMeta provides meta information about a block.
 type BlockMeta struct {
     // Unique identifier for the block and its contents. Changes on compaction.
diff --git a/vendor/github.com/prometheus/tsdb/db.go b/vendor/github.com/prometheus/tsdb/db.go
index c9745cfc6..87b5ed253 100644
--- a/vendor/github.com/prometheus/tsdb/db.go
+++ b/vendor/github.com/prometheus/tsdb/db.go
@@ -165,8 +165,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
         return nil, err
     }
     if l == nil {
-        l = log.NewLogfmtLogger(os.Stdout)
-        l = log.With(l, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
+        l = log.NewNopLogger()
     }
     if opts == nil {
         opts = DefaultOptions
diff --git a/vendor/github.com/prometheus/tsdb/head.go b/vendor/github.com/prometheus/tsdb/head.go
index ea7b63f8a..a5ce94e45 100644
--- a/vendor/github.com/prometheus/tsdb/head.go
+++ b/vendor/github.com/prometheus/tsdb/head.go
@@ -185,13 +185,18 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
     return h, nil
 }

+// ReadWAL initializes the head by consuming the write ahead log.
 func (h *Head) ReadWAL() error {
     r := h.wal.Reader()
     mint := h.MinTime()

     seriesFunc := func(series []RefSeries) error {
         for _, s := range series {
-            h.create(s.Labels.Hash(), s.Labels)
+            h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
+
+            if h.lastSeriesID < s.Ref {
+                h.lastSeriesID = s.Ref
+            }
         }
         return nil
     }
@@ -202,7 +207,8 @@ func (h *Head) ReadWAL() error {
             }
             ms := h.series.getByID(s.Ref)
             if ms == nil {
-                return errors.Errorf("unknown series reference %d; abort WAL restore", s.Ref)
+                h.logger.Log("msg", "unknown series reference in WAL", "ref", s.Ref)
+                continue
             }
             _, chunkCreated := ms.append(s.T, s.V)
             if chunkCreated {
@@ -210,7 +216,6 @@ func (h *Head) ReadWAL() error {
                 h.metrics.chunks.Inc()
             }
         }
-
         return nil
     }
     deletesFunc := func(stones []Stone) error {
@@ -222,7 +227,6 @@ func (h *Head) ReadWAL() error {
                 h.tombstones.add(s.ref, itv)
             }
         }
-
         return nil
     }

@@ -379,17 +383,12 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro
     if t < a.mint {
         return 0, ErrOutOfBounds
     }

-    hash := lset.Hash()
-
-    s := a.head.series.getByHash(hash, lset)
-
-    if s == nil {
-        s = a.head.create(hash, lset)
+    s, created := a.head.getOrCreate(lset.Hash(), lset)
+    if created {
         a.series = append(a.series, RefSeries{
             Ref:    s.ref,
             Labels: lset,
-            hash:   hash,
         })
     }
     return s.ref, a.AddFast(s.ref, t, v)
@@ -839,20 +838,32 @@ func (h *headIndexReader) LabelIndices() ([][]string, error) {
     return res, nil
 }

-func (h *Head) create(hash uint64, lset labels.Labels) *memSeries {
-    h.metrics.series.Inc()
-    h.metrics.seriesCreated.Inc()
+func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool) {
+    // Just using `getOrSet` below would be semantically sufficient, but we'd create
+    // a new series on every sample inserted via Add(), which causes allocations
+    // and makes our series IDs rather random and harder to compress in postings.
+    s := h.series.getByHash(hash, lset)
+    if s != nil {
+        return s, false
+    }

     // Optimistically assume that we are the first one to create the series.
     id := atomic.AddUint64(&h.lastSeriesID, 1)
+
+    return h.getOrCreateWithID(id, hash, lset)
+}
+
+func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSeries, bool) {
     s := newMemSeries(lset, id, h.chunkRange)

     s, created := h.series.getOrSet(hash, s)
-    // Skip indexing if we didn't actually create the series.
     if !created {
-        return s
+        return s, false
     }
+    h.metrics.series.Inc()
+    h.metrics.seriesCreated.Inc()
+
     h.postings.add(id, lset)

     h.symMtx.Lock()
@@ -870,7 +881,7 @@ func (h *Head) create(hash uint64, lset labels.Labels) *memSeries {
         h.symbols[l.Value] = struct{}{}
     }

-    return s
+    return s, true
 }

 // seriesHashmap is a simple hashmap for memSeries by their label set. It is built
@@ -1023,6 +1034,7 @@ func (s *stripeSeries) getOrSet(hash uint64, series *memSeries) (*memSeries, boo
     s.locks[i].Lock()

     if prev := s.hashes[i].get(hash, series.lset); prev != nil {
+        s.locks[i].Unlock()
         return prev, false
     }
     s.hashes[i].set(hash, series)
diff --git a/vendor/github.com/prometheus/tsdb/index.go b/vendor/github.com/prometheus/tsdb/index.go
index fd9b25162..3cdaad74d 100644
--- a/vendor/github.com/prometheus/tsdb/index.go
+++ b/vendor/github.com/prometheus/tsdb/index.go
@@ -570,6 +570,9 @@ var (
     errInvalidFlag = fmt.Errorf("invalid flag")
 )

+// NewIndexReader returns a new IndexReader on the given directory.
+func NewIndexReader(dir string) (IndexReader, error) { return newIndexReader(dir) }
+
 // newIndexReader returns a new indexReader on the given directory.
 func newIndexReader(dir string) (*indexReader, error) {
     f, err := openMmapFile(filepath.Join(dir, "index"))
diff --git a/vendor/github.com/prometheus/tsdb/wal.go b/vendor/github.com/prometheus/tsdb/wal.go
index 9af9a1853..27984ea0c 100644
--- a/vendor/github.com/prometheus/tsdb/wal.go
+++ b/vendor/github.com/prometheus/tsdb/wal.go
@@ -99,9 +99,6 @@ type WALReader interface {
 type RefSeries struct {
     Ref    uint64
     Labels labels.Labels
-
-    // hash for the label set. This field is not generally populated.
-    hash uint64
 }

 // RefSample is a timestamp/value pair associated with a reference to a series.
@@ -827,7 +824,9 @@ func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesC
             if err != nil {
                 return errors.Wrap(err, "decode series entry")
             }
-            seriesf(series)
+            if err := seriesf(series); err != nil {
+                return err
+            }

             cf := r.current()

@@ -842,7 +841,9 @@ func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesC
             if err != nil {
                 return errors.Wrap(err, "decode samples entry")
             }
-            samplesf(samples)
+            if err := samplesf(samples); err != nil {
+                return err
+            }

             // Update the times for the WAL segment file.
             cf := r.current()
@@ -858,7 +859,9 @@ func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesC
             if err != nil {
                 return errors.Wrap(err, "decode delete entry")
             }
-            deletesf(stones)
+            if err := deletesf(stones); err != nil {
+                return err
+            }
             // Update the times for the WAL segment file.
             cf := r.current()

diff --git a/vendor/vendor.json b/vendor/vendor.json
index 907fb7e06..2b639724e 100644
--- a/vendor/vendor.json
+++ b/vendor/vendor.json
@@ -871,22 +871,22 @@
             "revisionTime": "2016-04-11T19:08:41Z"
         },
         {
-            "checksumSHA1": "mDKxPAubVLTWW/Gar13m7YDHSek=",
+            "checksumSHA1": "B5ndMoK8lqgFJ8xUZ/0V4zCpUw0=",
             "path": "github.com/prometheus/tsdb",
-            "revision": "3870ec285c4640d462a0ad80e7acbcdf1e939563",
-            "revisionTime": "2017-09-11T08:41:33Z"
+            "revision": "162a48e4f2c6e486a0ebf61cf9cea73a8023ef0a",
+            "revisionTime": "2017-09-19T08:20:19Z"
         },
         {
             "checksumSHA1": "Gua979gmISm4cJP/fR2hL8m5To8=",
             "path": "github.com/prometheus/tsdb/chunks",
-            "revision": "3870ec285c4640d462a0ad80e7acbcdf1e939563",
-            "revisionTime": "2017-09-11T08:41:33Z"
+            "revision": "162a48e4f2c6e486a0ebf61cf9cea73a8023ef0a",
+            "revisionTime": "2017-09-19T08:20:19Z"
         },
         {
             "checksumSHA1": "zhmlvc322RH1L3l9DaA9d/HVVWs=",
             "path": "github.com/prometheus/tsdb/labels",
-            "revision": "3870ec285c4640d462a0ad80e7acbcdf1e939563",
-            "revisionTime": "2017-09-11T08:41:33Z"
+            "revision": "162a48e4f2c6e486a0ebf61cf9cea73a8023ef0a",
+            "revisionTime": "2017-09-19T08:20:19Z"
         },
         {
             "checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",
diff --git a/web/api/v2/api.go b/web/api/v2/api.go
index c8da742a4..c7740dfcb 100644
--- a/web/api/v2/api.go
+++ b/web/api/v2/api.go
@@ -46,8 +46,8 @@ import (
 type API struct {
     enableAdmin bool
     now         func() time.Time
-    db          *tsdb.DB
-    q           func(mint, maxt int64) storage.Querier
+    db          func() *tsdb.DB
+    q           func(mint, maxt int64) (storage.Querier, error)
     targets       func() []*retrieval.Target
     alertmanagers func() []*url.URL
 }
@@ -55,9 +55,9 @@ type API struct {
 // New returns a new API object.
 func New(
     now func() time.Time,
-    db *tsdb.DB,
+    db func() *tsdb.DB,
     qe *promql.Engine,
-    q func(mint, maxt int64) storage.Querier,
+    q func(mint, maxt int64) (storage.Querier, error),
     targets func() []*retrieval.Target,
     alertmanagers func() []*url.URL,
     enableAdmin bool,
@@ -149,28 +149,31 @@ func (s *adminDisabled) DeleteSeries(_ context.Context, r *pb.SeriesDeleteReques

 // Admin provides an administration interface to Prometheus.
 type Admin struct {
-    db      *tsdb.DB
-    snapdir string
+    db func() *tsdb.DB
 }

 // NewAdmin returns a Admin server.
-func NewAdmin(db *tsdb.DB) *Admin {
+func NewAdmin(db func() *tsdb.DB) *Admin {
     return &Admin{
-        db:      db,
-        snapdir: filepath.Join(db.Dir(), "snapshots"),
+        db: db,
     }
 }

 // TSDBSnapshot implements pb.AdminServer.
 func (s *Admin) TSDBSnapshot(_ context.Context, _ *pb.TSDBSnapshotRequest) (*pb.TSDBSnapshotResponse, error) {
+    db := s.db()
+    if db == nil {
+        return nil, status.Errorf(codes.Unavailable, "TSDB not ready")
+    }
     var (
-        name = fmt.Sprintf("%s-%x", time.Now().UTC().Format(time.RFC3339), rand.Int())
-        dir  = filepath.Join(s.snapdir, name)
+        snapdir = filepath.Join(db.Dir(), "snapshots")
+        name    = fmt.Sprintf("%s-%x", time.Now().UTC().Format(time.RFC3339), rand.Int())
+        dir     = filepath.Join(snapdir, name)
     )
     if err := os.MkdirAll(dir, 0777); err != nil {
         return nil, status.Errorf(codes.Internal, "created snapshot directory: %s", err)
     }
-    if err := s.db.Snapshot(dir); err != nil {
+    if err := db.Snapshot(dir); err != nil {
         return nil, status.Errorf(codes.Internal, "create snapshot: %s", err)
     }
     return &pb.TSDBSnapshotResponse{Name: name}, nil
@@ -210,7 +213,11 @@ func (s *Admin) DeleteSeries(_ context.Context, r *pb.SeriesDeleteRequest) (*pb.
         matchers = append(matchers, lm)
     }

-    if err := s.db.Delete(timestamp.FromTime(mint), timestamp.FromTime(maxt), matchers...); err != nil {
+    db := s.db()
+    if db == nil {
+        return nil, status.Errorf(codes.Unavailable, "TSDB not ready")
+    }
+    if err := db.Delete(timestamp.FromTime(mint), timestamp.FromTime(maxt), matchers...); err != nil {
         return nil, status.Error(codes.Internal, err.Error())
     }
     return &pb.SeriesDeleteResponse{}, nil
diff --git a/web/web.go b/web/web.go
index 2aecc3007..e354594d3 100644
--- a/web/web.go
+++ b/web/web.go
@@ -47,7 +47,6 @@ import (
     "github.com/prometheus/common/model"
     "github.com/prometheus/common/route"
     "github.com/prometheus/prometheus/storage"
-    ptsdb "github.com/prometheus/prometheus/storage/tsdb"
     "github.com/prometheus/tsdb"
     "golang.org/x/net/context"
     "golang.org/x/net/netutil"
@@ -75,7 +74,7 @@ type Handler struct {
     ruleManager *rules.Manager
     queryEngine *promql.Engine
     context     context.Context
-    tsdb        *tsdb.DB
+    tsdb        func() *tsdb.DB
     storage     storage.Storage
     notifier    *notifier.Notifier

@@ -122,7 +121,8 @@ type PrometheusVersion struct {
 // Options for the web Handler.
 type Options struct {
     Context       context.Context
-    Storage       *tsdb.DB
+    TSDB          func() *tsdb.DB
+    Storage       storage.Storage
     QueryEngine   *promql.Engine
     TargetManager *retrieval.TargetManager
     RuleManager   *rules.Manager
@@ -171,8 +171,8 @@ func New(logger log.Logger, o *Options) *Handler {
         targetManager: o.TargetManager,
         ruleManager:   o.RuleManager,
         queryEngine:   o.QueryEngine,
-        tsdb:          o.Storage,
-        storage:       ptsdb.Adapter(o.Storage),
+        tsdb:          o.TSDB,
+        storage:       o.Storage,
         notifier:      o.Notifier,

         now: model.Now,
@@ -213,7 +213,7 @@ func New(logger log.Logger, o *Options) *Handler {
     router.Get("/targets", readyf(instrf("targets", h.targets)))
     router.Get("/version", readyf(instrf("version", h.version)))

-    router.Get("/heap", readyf(instrf("heap", h.dumpHeap)))
+    router.Get("/heap", instrf("heap", h.dumpHeap))

     router.Get("/metrics", prometheus.Handler().ServeHTTP)

@@ -223,24 +223,24 @@ func New(logger log.Logger, o *Options) *Handler {

     router.Get("/consoles/*filepath", readyf(instrf("consoles", h.consoles)))

-    router.Get("/static/*filepath", readyf(instrf("static", h.serveStaticAsset)))
+    router.Get("/static/*filepath", instrf("static", h.serveStaticAsset))

     if o.UserAssetsPath != "" {
-        router.Get("/user/*filepath", readyf(instrf("user", route.FileServe(o.UserAssetsPath))))
+        router.Get("/user/*filepath", instrf("user", route.FileServe(o.UserAssetsPath)))
     }

     if o.EnableLifecycle {
         router.Post("/-/quit", h.quit)
         router.Post("/-/reload", h.reload)
     } else {
-        router.Post("/-/quit", readyf(func(w http.ResponseWriter, _ *http.Request) {
+        router.Post("/-/quit", func(w http.ResponseWriter, _ *http.Request) {
             w.WriteHeader(http.StatusForbidden)
             w.Write([]byte("Lifecycle APIs are not enabled"))
-        }))
-        router.Post("/-/reload", readyf(func(w http.ResponseWriter, _ *http.Request) {
+        })
+        router.Post("/-/reload", func(w http.ResponseWriter, _ *http.Request) {
             w.WriteHeader(http.StatusForbidden)
             w.Write([]byte("Lifecycle APIs are not enabled"))
-        }))
+        })
     }
     router.Get("/-/quit", func(w http.ResponseWriter, _ *http.Request) {
         w.WriteHeader(http.StatusMethodNotAllowed)
@@ -251,8 +251,8 @@ func New(logger log.Logger, o *Options) *Handler {
         w.Write([]byte("Only POST requests allowed"))
     })

-    router.Get("/debug/*subpath", readyf(serveDebug))
-    router.Post("/debug/*subpath", readyf(serveDebug))
+    router.Get("/debug/*subpath", serveDebug)
+    router.Post("/debug/*subpath", serveDebug)

     router.Get("/-/healthy", func(w http.ResponseWriter, r *http.Request) {
         w.WriteHeader(http.StatusOK)
@@ -380,15 +380,9 @@ func (h *Handler) Run(ctx context.Context) error {
     )
     av2 := api_v2.New(
         time.Now,
-        h.options.Storage,
+        h.options.TSDB,
         h.options.QueryEngine,
-        func(mint, maxt int64) storage.Querier {
-            q, err := ptsdb.Adapter(h.options.Storage).Querier(mint, maxt)
-            if err != nil {
-                panic(err)
-            }
-            return q
-        },
+        h.options.Storage.Querier,
         func() []*retrieval.Target {
             return h.options.TargetManager.Targets()
         },
diff --git a/web/web_test.go b/web/web_test.go
index 6a3250523..eb71c73a0 100644
--- a/web/web_test.go
+++ b/web/web_test.go
@@ -19,6 +19,8 @@ import (
     "net/url"
     "testing"
     "time"
+
+    "github.com/prometheus/prometheus/storage/tsdb"
 )

 func TestGlobalURL(t *testing.T) {
@@ -78,7 +80,7 @@ func TestReadyAndHealthy(t *testing.T) {
         ReadTimeout:    30 * time.Second,
         MaxConnections: 512,
         Context:        nil,
-        Storage:        nil,
+        Storage:        &tsdb.ReadyStorage{},
         QueryEngine:    nil,
         TargetManager:  nil,
         RuleManager:    nil,
@@ -155,7 +157,7 @@ func TestRoutePrefix(t *testing.T) {
         ReadTimeout:    30 * time.Second,
         MaxConnections: 512,
         Context:        nil,
-        Storage:        nil,
+        Storage:        &tsdb.ReadyStorage{},
        QueryEngine:    nil,
         TargetManager:  nil,
         RuleManager:    nil,
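
Note: the ordering change in cmd/prometheus/main.go is the heart of this patch. The web handler is started immediately, the potentially slow TSDB open runs in a goroutine that closes dbOpen when it finishes, and only after that does the process load its initial config and mark itself ready. The stand-alone sketch below illustrates that sequencing; startWebHandler, openStorage, and loadConfig are made-up placeholders, not functions from this repository.

    package main

    import (
        "fmt"
        "time"
    )

    // startWebHandler stands in for webHandler.Run: it only needs to be up so
    // that health and debug endpoints respond while storage is still opening.
    func startWebHandler() {
        fmt.Println("web handler up, serving /-/healthy (not ready yet)")
    }

    // openStorage stands in for tsdb.Open; the sleep mimics a long WAL replay.
    func openStorage() {
        time.Sleep(200 * time.Millisecond)
        fmt.Println("storage opened")
    }

    func loadConfig() error {
        fmt.Println("initial config loaded")
        return nil
    }

    func main() {
        startWebHandler() // serve endpoints right away

        dbOpen := make(chan struct{})
        go func() {
            defer close(dbOpen)
            openStorage()
        }()

        // Block readiness, not the web handler, on storage startup.
        <-dbOpen

        if err := loadConfig(); err != nil {
            fmt.Println("error loading config:", err)
            return
        }
        fmt.Println("marked ready")
    }

The same shape appears in the patch itself: reloadConfig is now called after <-dbOpen rather than before the TSDB has been opened.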
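Note: the new ReadyStorage type in storage/tsdb/tsdb.go is what makes the early web start safe. It satisfies the storage interface from the moment it is constructed and returns ErrNotReady until Set hands it the opened database. Below is a minimal, self-contained sketch of that pattern; readyStore and fakeDB are illustrative stand-ins, not the actual Prometheus types.

    package main

    import (
        "errors"
        "fmt"
        "sync"
    )

    // errNotReady mirrors ErrNotReady from the patch: callers fail fast instead
    // of blocking while the real database is still being opened.
    var errNotReady = errors.New("storage not ready")

    // fakeDB stands in for *tsdb.DB in this sketch.
    type fakeDB struct{ dir string }

    func (db *fakeDB) Query(q string) (string, error) { return "result for " + q, nil }

    // readyStore is the analogue of ReadyStorage: safe for concurrent use right
    // after construction, delegating to the real DB once Set has been called.
    type readyStore struct {
        mtx sync.RWMutex
        db  *fakeDB
    }

    func (s *readyStore) Set(db *fakeDB) {
        s.mtx.Lock()
        defer s.mtx.Unlock()
        s.db = db
    }

    func (s *readyStore) get() *fakeDB {
        s.mtx.RLock()
        defer s.mtx.RUnlock()
        return s.db
    }

    func (s *readyStore) Query(q string) (string, error) {
        if db := s.get(); db != nil {
            return db.Query(q)
        }
        return "", errNotReady
    }

    func main() {
        store := &readyStore{}

        // Before Set: requests fail fast; the web layer can turn this into a
        // 503 while the readiness probe still reports "not ready".
        if _, err := store.Query("up"); err != nil {
            fmt.Println("before Set:", err)
        }

        // Mimics the startup goroutine calling localStorage.Set(db) once the
        // database open returns.
        store.Set(&fakeDB{dir: "data/"})

        res, err := store.Query("up")
        fmt.Println("after Set:", res, err)
    }

This is also why the gRPC admin endpoints now take db as a func() *tsdb.DB and answer codes.Unavailable when it returns nil.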