Merge pull request #3186 from prometheus/startweb
web: start web handler while TSDB is starting up
This commit is contained in:
commit
249d69b513
|
@ -215,35 +215,17 @@ func main() {
|
|||
level.Info(logger).Log("build_context", version.BuildContext())
|
||||
level.Info(logger).Log("host_details", Uname())
|
||||
|
||||
var (
|
||||
// sampleAppender = storage.Fanout{}
|
||||
reloadables []Reloadable
|
||||
)
|
||||
|
||||
// Make sure that sighup handler is registered with a redirect to the channel before the potentially
|
||||
// long and synchronous tsdb init.
|
||||
hup := make(chan os.Signal)
|
||||
hupReady := make(chan bool)
|
||||
signal.Notify(hup, syscall.SIGHUP)
|
||||
|
||||
level.Info(logger).Log("msg", "Starting TSDB")
|
||||
|
||||
localStorage, err := tsdb.Open(
|
||||
cfg.localStoragePath,
|
||||
log.With(logger, "component", "tsdb"),
|
||||
prometheus.DefaultRegisterer,
|
||||
&cfg.tsdb,
|
||||
var (
|
||||
localStorage = &tsdb.ReadyStorage{}
|
||||
remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"))
|
||||
fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
|
||||
)
|
||||
if err != nil {
|
||||
level.Error(logger).Log("msg", "Opening TSDB failed", "err", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
level.Info(logger).Log("msg", "TSDB succesfully started")
|
||||
|
||||
remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"))
|
||||
reloadables = append(reloadables, remoteStorage)
|
||||
fanoutStorage := storage.NewFanout(logger, tsdb.Adapter(localStorage), remoteStorage)
|
||||
|
||||
cfg.queryEngine.Logger = log.With(logger, "component", "query engine")
|
||||
var (
|
||||
|
@ -263,7 +245,8 @@ func main() {
|
|||
})
|
||||
|
||||
cfg.web.Context = ctx
|
||||
cfg.web.Storage = localStorage
|
||||
cfg.web.TSDB = localStorage.Get
|
||||
cfg.web.Storage = fanoutStorage
|
||||
cfg.web.QueryEngine = queryEngine
|
||||
cfg.web.TargetManager = targetManager
|
||||
cfg.web.RuleManager = ruleManager
|
||||
|
@ -285,11 +268,12 @@ func main() {
|
|||
|
||||
webHandler := web.New(log.With(logger, "component", "web"), &cfg.web)
|
||||
|
||||
reloadables = append(reloadables, targetManager, ruleManager, webHandler, notifier)
|
||||
|
||||
if err := reloadConfig(cfg.configFile, logger, reloadables...); err != nil {
|
||||
level.Error(logger).Log("msg", "Error loading config", "err", err)
|
||||
os.Exit(1)
|
||||
reloadables := []Reloadable{
|
||||
remoteStorage,
|
||||
targetManager,
|
||||
ruleManager,
|
||||
webHandler,
|
||||
notifier,
|
||||
}
|
||||
|
||||
// Wait for reload or termination signals. Start the handler for SIGHUP as
|
||||
|
@ -314,14 +298,29 @@ func main() {
|
|||
}
|
||||
}()
|
||||
|
||||
// Start all components. The order is NOT arbitrary.
|
||||
defer func() {
|
||||
if err := fanoutStorage.Close(); err != nil {
|
||||
level.Error(logger).Log("msg", "Closing storage(s) failed", "err", err)
|
||||
}
|
||||
}()
|
||||
// Start all components while we wait for TSDB to open but only load
|
||||
// initial config and mark ourselves as ready after it completed.
|
||||
dbOpen := make(chan struct{})
|
||||
|
||||
// defer remoteStorage.Stop()
|
||||
go func() {
|
||||
defer close(dbOpen)
|
||||
|
||||
level.Info(logger).Log("msg", "Starting TSDB")
|
||||
|
||||
db, err := tsdb.Open(
|
||||
cfg.localStoragePath,
|
||||
log.With(logger, "component", "tsdb"),
|
||||
prometheus.DefaultRegisterer,
|
||||
&cfg.tsdb,
|
||||
)
|
||||
if err != nil {
|
||||
level.Error(logger).Log("msg", "Opening storage failed", "err", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
level.Info(logger).Log("msg", "TSDB started")
|
||||
|
||||
localStorage.Set(db)
|
||||
}()
|
||||
|
||||
prometheus.MustRegister(configSuccess)
|
||||
prometheus.MustRegister(configSuccessTime)
|
||||
|
@ -344,6 +343,19 @@ func main() {
|
|||
errc := make(chan error)
|
||||
go func() { errc <- webHandler.Run(ctx) }()
|
||||
|
||||
<-dbOpen
|
||||
|
||||
if err := reloadConfig(cfg.configFile, logger, reloadables...); err != nil {
|
||||
level.Error(logger).Log("msg", "Error loading config", "err", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := fanoutStorage.Close(); err != nil {
|
||||
level.Error(logger).Log("msg", "Error stopping storage", "err", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Wait for reload or termination signals.
|
||||
close(hupReady) // Unblock SIGHUP handler.
|
||||
|
||||
|
|
|
@ -882,6 +882,7 @@ loop:
|
|||
added++
|
||||
continue
|
||||
default:
|
||||
level.Debug(sl.l).Log("msg", "unexpected error", "series", string(met), "err", err)
|
||||
break loop
|
||||
}
|
||||
if tp == nil {
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
package tsdb
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
|
@ -27,6 +28,63 @@ import (
|
|||
tsdbLabels "github.com/prometheus/tsdb/labels"
|
||||
)
|
||||
|
||||
// ErrNotReady is returned if the underlying storage is not ready yet.
|
||||
var ErrNotReady = errors.New("TSDB not ready")
|
||||
|
||||
// ReadyStorage implements the Storage interface while allowing to set the actual
|
||||
// storage at a later point in time.
|
||||
type ReadyStorage struct {
|
||||
mtx sync.RWMutex
|
||||
a *adapter
|
||||
}
|
||||
|
||||
// Set the storage.
|
||||
func (s *ReadyStorage) Set(db *tsdb.DB) {
|
||||
s.mtx.Lock()
|
||||
defer s.mtx.Unlock()
|
||||
|
||||
s.a = &adapter{db: db}
|
||||
}
|
||||
|
||||
// Get the storage.
|
||||
func (s *ReadyStorage) Get() *tsdb.DB {
|
||||
if x := s.get(); x != nil {
|
||||
return x.db
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *ReadyStorage) get() *adapter {
|
||||
s.mtx.RLock()
|
||||
x := s.a
|
||||
s.mtx.RUnlock()
|
||||
return x
|
||||
}
|
||||
|
||||
// Querier implements the Storage interface.
|
||||
func (s *ReadyStorage) Querier(mint, maxt int64) (storage.Querier, error) {
|
||||
if x := s.get(); x != nil {
|
||||
return x.Querier(mint, maxt)
|
||||
}
|
||||
return nil, ErrNotReady
|
||||
}
|
||||
|
||||
// Appender implements the Storage interface.
|
||||
func (s *ReadyStorage) Appender() (storage.Appender, error) {
|
||||
if x := s.get(); x != nil {
|
||||
return x.Appender()
|
||||
}
|
||||
return nil, ErrNotReady
|
||||
}
|
||||
|
||||
// Close implements the Storage interface.
|
||||
func (s *ReadyStorage) Close() error {
|
||||
if x := s.Get(); x != nil {
|
||||
return x.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func Adapter(db *tsdb.DB) storage.Storage {
|
||||
return &adapter{db: db}
|
||||
}
|
||||
|
|
|
@ -46,8 +46,8 @@ import (
|
|||
type API struct {
|
||||
enableAdmin bool
|
||||
now func() time.Time
|
||||
db *tsdb.DB
|
||||
q func(mint, maxt int64) storage.Querier
|
||||
db func() *tsdb.DB
|
||||
q func(mint, maxt int64) (storage.Querier, error)
|
||||
targets func() []*retrieval.Target
|
||||
alertmanagers func() []*url.URL
|
||||
}
|
||||
|
@ -55,9 +55,9 @@ type API struct {
|
|||
// New returns a new API object.
|
||||
func New(
|
||||
now func() time.Time,
|
||||
db *tsdb.DB,
|
||||
db func() *tsdb.DB,
|
||||
qe *promql.Engine,
|
||||
q func(mint, maxt int64) storage.Querier,
|
||||
q func(mint, maxt int64) (storage.Querier, error),
|
||||
targets func() []*retrieval.Target,
|
||||
alertmanagers func() []*url.URL,
|
||||
enableAdmin bool,
|
||||
|
@ -149,28 +149,31 @@ func (s *adminDisabled) DeleteSeries(_ context.Context, r *pb.SeriesDeleteReques
|
|||
|
||||
// Admin provides an administration interface to Prometheus.
|
||||
type Admin struct {
|
||||
db *tsdb.DB
|
||||
snapdir string
|
||||
db func() *tsdb.DB
|
||||
}
|
||||
|
||||
// NewAdmin returns a Admin server.
|
||||
func NewAdmin(db *tsdb.DB) *Admin {
|
||||
func NewAdmin(db func() *tsdb.DB) *Admin {
|
||||
return &Admin{
|
||||
db: db,
|
||||
snapdir: filepath.Join(db.Dir(), "snapshots"),
|
||||
db: db,
|
||||
}
|
||||
}
|
||||
|
||||
// TSDBSnapshot implements pb.AdminServer.
|
||||
func (s *Admin) TSDBSnapshot(_ context.Context, _ *pb.TSDBSnapshotRequest) (*pb.TSDBSnapshotResponse, error) {
|
||||
db := s.db()
|
||||
if db == nil {
|
||||
return nil, status.Errorf(codes.Unavailable, "TSDB not ready")
|
||||
}
|
||||
var (
|
||||
name = fmt.Sprintf("%s-%x", time.Now().UTC().Format(time.RFC3339), rand.Int())
|
||||
dir = filepath.Join(s.snapdir, name)
|
||||
snapdir = filepath.Join(db.Dir(), "snapshots")
|
||||
name = fmt.Sprintf("%s-%x", time.Now().UTC().Format(time.RFC3339), rand.Int())
|
||||
dir = filepath.Join(snapdir, name)
|
||||
)
|
||||
if err := os.MkdirAll(dir, 0777); err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "created snapshot directory: %s", err)
|
||||
}
|
||||
if err := s.db.Snapshot(dir); err != nil {
|
||||
if err := db.Snapshot(dir); err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "create snapshot: %s", err)
|
||||
}
|
||||
return &pb.TSDBSnapshotResponse{Name: name}, nil
|
||||
|
@ -210,7 +213,11 @@ func (s *Admin) DeleteSeries(_ context.Context, r *pb.SeriesDeleteRequest) (*pb.
|
|||
|
||||
matchers = append(matchers, lm)
|
||||
}
|
||||
if err := s.db.Delete(timestamp.FromTime(mint), timestamp.FromTime(maxt), matchers...); err != nil {
|
||||
db := s.db()
|
||||
if db == nil {
|
||||
return nil, status.Errorf(codes.Unavailable, "TSDB not ready")
|
||||
}
|
||||
if err := db.Delete(timestamp.FromTime(mint), timestamp.FromTime(maxt), matchers...); err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
return &pb.SeriesDeleteResponse{}, nil
|
||||
|
|
38
web/web.go
38
web/web.go
|
@ -47,7 +47,6 @@ import (
|
|||
"github.com/prometheus/common/model"
|
||||
"github.com/prometheus/common/route"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
ptsdb "github.com/prometheus/prometheus/storage/tsdb"
|
||||
"github.com/prometheus/tsdb"
|
||||
"golang.org/x/net/context"
|
||||
"golang.org/x/net/netutil"
|
||||
|
@ -75,7 +74,7 @@ type Handler struct {
|
|||
ruleManager *rules.Manager
|
||||
queryEngine *promql.Engine
|
||||
context context.Context
|
||||
tsdb *tsdb.DB
|
||||
tsdb func() *tsdb.DB
|
||||
storage storage.Storage
|
||||
notifier *notifier.Notifier
|
||||
|
||||
|
@ -122,7 +121,8 @@ type PrometheusVersion struct {
|
|||
// Options for the web Handler.
|
||||
type Options struct {
|
||||
Context context.Context
|
||||
Storage *tsdb.DB
|
||||
TSDB func() *tsdb.DB
|
||||
Storage storage.Storage
|
||||
QueryEngine *promql.Engine
|
||||
TargetManager *retrieval.TargetManager
|
||||
RuleManager *rules.Manager
|
||||
|
@ -171,8 +171,8 @@ func New(logger log.Logger, o *Options) *Handler {
|
|||
targetManager: o.TargetManager,
|
||||
ruleManager: o.RuleManager,
|
||||
queryEngine: o.QueryEngine,
|
||||
tsdb: o.Storage,
|
||||
storage: ptsdb.Adapter(o.Storage),
|
||||
tsdb: o.TSDB,
|
||||
storage: o.Storage,
|
||||
notifier: o.Notifier,
|
||||
|
||||
now: model.Now,
|
||||
|
@ -213,7 +213,7 @@ func New(logger log.Logger, o *Options) *Handler {
|
|||
router.Get("/targets", readyf(instrf("targets", h.targets)))
|
||||
router.Get("/version", readyf(instrf("version", h.version)))
|
||||
|
||||
router.Get("/heap", readyf(instrf("heap", h.dumpHeap)))
|
||||
router.Get("/heap", instrf("heap", h.dumpHeap))
|
||||
|
||||
router.Get("/metrics", prometheus.Handler().ServeHTTP)
|
||||
|
||||
|
@ -223,24 +223,24 @@ func New(logger log.Logger, o *Options) *Handler {
|
|||
|
||||
router.Get("/consoles/*filepath", readyf(instrf("consoles", h.consoles)))
|
||||
|
||||
router.Get("/static/*filepath", readyf(instrf("static", h.serveStaticAsset)))
|
||||
router.Get("/static/*filepath", instrf("static", h.serveStaticAsset))
|
||||
|
||||
if o.UserAssetsPath != "" {
|
||||
router.Get("/user/*filepath", readyf(instrf("user", route.FileServe(o.UserAssetsPath))))
|
||||
router.Get("/user/*filepath", instrf("user", route.FileServe(o.UserAssetsPath)))
|
||||
}
|
||||
|
||||
if o.EnableLifecycle {
|
||||
router.Post("/-/quit", h.quit)
|
||||
router.Post("/-/reload", h.reload)
|
||||
} else {
|
||||
router.Post("/-/quit", readyf(func(w http.ResponseWriter, _ *http.Request) {
|
||||
router.Post("/-/quit", func(w http.ResponseWriter, _ *http.Request) {
|
||||
w.WriteHeader(http.StatusForbidden)
|
||||
w.Write([]byte("Lifecycle APIs are not enabled"))
|
||||
}))
|
||||
router.Post("/-/reload", readyf(func(w http.ResponseWriter, _ *http.Request) {
|
||||
})
|
||||
router.Post("/-/reload", func(w http.ResponseWriter, _ *http.Request) {
|
||||
w.WriteHeader(http.StatusForbidden)
|
||||
w.Write([]byte("Lifecycle APIs are not enabled"))
|
||||
}))
|
||||
})
|
||||
}
|
||||
router.Get("/-/quit", func(w http.ResponseWriter, _ *http.Request) {
|
||||
w.WriteHeader(http.StatusMethodNotAllowed)
|
||||
|
@ -251,8 +251,8 @@ func New(logger log.Logger, o *Options) *Handler {
|
|||
w.Write([]byte("Only POST requests allowed"))
|
||||
})
|
||||
|
||||
router.Get("/debug/*subpath", readyf(serveDebug))
|
||||
router.Post("/debug/*subpath", readyf(serveDebug))
|
||||
router.Get("/debug/*subpath", serveDebug)
|
||||
router.Post("/debug/*subpath", serveDebug)
|
||||
|
||||
router.Get("/-/healthy", func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
|
@ -380,15 +380,9 @@ func (h *Handler) Run(ctx context.Context) error {
|
|||
)
|
||||
av2 := api_v2.New(
|
||||
time.Now,
|
||||
h.options.Storage,
|
||||
h.options.TSDB,
|
||||
h.options.QueryEngine,
|
||||
func(mint, maxt int64) storage.Querier {
|
||||
q, err := ptsdb.Adapter(h.options.Storage).Querier(mint, maxt)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return q
|
||||
},
|
||||
h.options.Storage.Querier,
|
||||
func() []*retrieval.Target {
|
||||
return h.options.TargetManager.Targets()
|
||||
},
|
||||
|
|
|
@ -19,6 +19,8 @@ import (
|
|||
"net/url"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/prometheus/storage/tsdb"
|
||||
)
|
||||
|
||||
func TestGlobalURL(t *testing.T) {
|
||||
|
@ -78,7 +80,7 @@ func TestReadyAndHealthy(t *testing.T) {
|
|||
ReadTimeout: 30 * time.Second,
|
||||
MaxConnections: 512,
|
||||
Context: nil,
|
||||
Storage: nil,
|
||||
Storage: &tsdb.ReadyStorage{},
|
||||
QueryEngine: nil,
|
||||
TargetManager: nil,
|
||||
RuleManager: nil,
|
||||
|
@ -155,7 +157,7 @@ func TestRoutePrefix(t *testing.T) {
|
|||
ReadTimeout: 30 * time.Second,
|
||||
MaxConnections: 512,
|
||||
Context: nil,
|
||||
Storage: nil,
|
||||
Storage: &tsdb.ReadyStorage{},
|
||||
QueryEngine: nil,
|
||||
TargetManager: nil,
|
||||
RuleManager: nil,
|
||||
|
|
Loading…
Reference in New Issue