Merge pull request #2012 from prometheus/context-consolidation
api: Consolidate web API contexts
This commit is contained in:
commit
0c64ad1a3d
|
@ -32,10 +32,38 @@ func WithParam(ctx context.Context, p, v string) context.Context {
|
|||
return context.WithValue(ctx, param(p), v)
|
||||
}
|
||||
|
||||
// handle turns a Handle into httprouter.Handle
|
||||
func handle(h http.HandlerFunc) httprouter.Handle {
|
||||
return func(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
type contextFn func(r *http.Request) context.Context
|
||||
|
||||
// Router wraps httprouter.Router and adds support for prefixed sub-routers
|
||||
// and per-request context injections.
|
||||
type Router struct {
|
||||
rtr *httprouter.Router
|
||||
prefix string
|
||||
ctxFn contextFn
|
||||
}
|
||||
|
||||
// New returns a new Router.
|
||||
func New(ctxFn contextFn) *Router {
|
||||
if ctxFn == nil {
|
||||
ctxFn = func(r *http.Request) context.Context {
|
||||
return context.Background()
|
||||
}
|
||||
}
|
||||
return &Router{
|
||||
rtr: httprouter.New(),
|
||||
ctxFn: ctxFn,
|
||||
}
|
||||
}
|
||||
|
||||
// WithPrefix returns a router that prefixes all registered routes with prefix.
|
||||
func (r *Router) WithPrefix(prefix string) *Router {
|
||||
return &Router{rtr: r.rtr, prefix: r.prefix + prefix, ctxFn: r.ctxFn}
|
||||
}
|
||||
|
||||
// handle turns a HandlerFunc into an httprouter.Handle.
|
||||
func (r *Router) handle(h http.HandlerFunc) httprouter.Handle {
|
||||
return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
|
||||
ctx, cancel := context.WithCancel(r.ctxFn(req))
|
||||
defer cancel()
|
||||
|
||||
for _, p := range params {
|
||||
|
@ -43,56 +71,40 @@ func handle(h http.HandlerFunc) httprouter.Handle {
|
|||
}
|
||||
|
||||
mtx.Lock()
|
||||
ctxts[r] = ctx
|
||||
ctxts[req] = ctx
|
||||
mtx.Unlock()
|
||||
|
||||
h(w, r)
|
||||
h(w, req)
|
||||
|
||||
mtx.Lock()
|
||||
delete(ctxts, r)
|
||||
delete(ctxts, req)
|
||||
mtx.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// Router wraps httprouter.Router and adds support for prefixed sub-routers.
|
||||
type Router struct {
|
||||
rtr *httprouter.Router
|
||||
prefix string
|
||||
}
|
||||
|
||||
// New returns a new Router.
|
||||
func New() *Router {
|
||||
return &Router{rtr: httprouter.New()}
|
||||
}
|
||||
|
||||
// WithPrefix returns a router that prefixes all registered routes with prefix.
|
||||
func (r *Router) WithPrefix(prefix string) *Router {
|
||||
return &Router{rtr: r.rtr, prefix: r.prefix + prefix}
|
||||
}
|
||||
|
||||
// Get registers a new GET route.
|
||||
func (r *Router) Get(path string, h http.HandlerFunc) {
|
||||
r.rtr.GET(r.prefix+path, handle(h))
|
||||
r.rtr.GET(r.prefix+path, r.handle(h))
|
||||
}
|
||||
|
||||
// Options registers a new OPTIONS route.
|
||||
func (r *Router) Options(path string, h http.HandlerFunc) {
|
||||
r.rtr.OPTIONS(r.prefix+path, handle(h))
|
||||
r.rtr.OPTIONS(r.prefix+path, r.handle(h))
|
||||
}
|
||||
|
||||
// Del registers a new DELETE route.
|
||||
func (r *Router) Del(path string, h http.HandlerFunc) {
|
||||
r.rtr.DELETE(r.prefix+path, handle(h))
|
||||
r.rtr.DELETE(r.prefix+path, r.handle(h))
|
||||
}
|
||||
|
||||
// Put registers a new PUT route.
|
||||
func (r *Router) Put(path string, h http.HandlerFunc) {
|
||||
r.rtr.PUT(r.prefix+path, handle(h))
|
||||
r.rtr.PUT(r.prefix+path, r.handle(h))
|
||||
}
|
||||
|
||||
// Post registers a new POST route.
|
||||
func (r *Router) Post(path string, h http.HandlerFunc) {
|
||||
r.rtr.POST(r.prefix+path, handle(h))
|
||||
r.rtr.POST(r.prefix+path, r.handle(h))
|
||||
}
|
||||
|
||||
// Redirect takes an absolute path and sends an internal HTTP redirect for it,
|
||||
|
|
|
@ -211,8 +211,8 @@
|
|||
{
|
||||
"checksumSHA1": "CKVJRc1NREmfoAWQLHxqWQlvxo0=",
|
||||
"path": "github.com/prometheus/common/route",
|
||||
"revision": "4402f4e5ea79ec15f3c574773b6a5198fbea215f",
|
||||
"revisionTime": "2016-06-23T15:14:27Z"
|
||||
"revision": "ee21c31a8cbad2a29cce94a8c53855e1dab92bf7",
|
||||
"revisionTime": "2016-09-21T02:27:24+02:00"
|
||||
},
|
||||
{
|
||||
"checksumSHA1": "91KYK0SpvkaMJJA2+BcxbVnyRO0=",
|
||||
|
|
|
@ -85,7 +85,6 @@ type apiFunc func(r *http.Request) (interface{}, *apiError)
|
|||
// API can register a set of endpoints in a router and handle
|
||||
// them using the provided storage and query engine.
|
||||
type API struct {
|
||||
Context context.Context
|
||||
Storage local.Storage
|
||||
QueryEngine *promql.Engine
|
||||
|
||||
|
@ -94,9 +93,8 @@ type API struct {
|
|||
}
|
||||
|
||||
// NewAPI returns an initialized API type.
|
||||
func NewAPI(ctx context.Context, qe *promql.Engine, st local.Storage) *API {
|
||||
func NewAPI(qe *promql.Engine, st local.Storage) *API {
|
||||
return &API{
|
||||
Context: ctx,
|
||||
QueryEngine: qe,
|
||||
Storage: st,
|
||||
context: route.Context,
|
||||
|
@ -159,7 +157,7 @@ func (api *API) query(r *http.Request) (interface{}, *apiError) {
|
|||
return nil, &apiError{errorBadData, err}
|
||||
}
|
||||
|
||||
res := qry.Exec(api.Context)
|
||||
res := qry.Exec(api.context(r))
|
||||
if res.Err != nil {
|
||||
switch res.Err.(type) {
|
||||
case promql.ErrQueryCanceled:
|
||||
|
@ -206,7 +204,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, *apiError) {
|
|||
return nil, &apiError{errorBadData, err}
|
||||
}
|
||||
|
||||
res := qry.Exec(api.Context)
|
||||
res := qry.Exec(api.context(r))
|
||||
if res.Err != nil {
|
||||
switch res.Err.(type) {
|
||||
case promql.ErrQueryCanceled:
|
||||
|
@ -228,7 +226,7 @@ func (api *API) labelValues(r *http.Request) (interface{}, *apiError) {
|
|||
if !model.LabelNameRE.MatchString(name) {
|
||||
return nil, &apiError{errorBadData, fmt.Errorf("invalid label name: %q", name)}
|
||||
}
|
||||
vals, err := api.Storage.LabelValuesForLabelName(api.Context, model.LabelName(name))
|
||||
vals, err := api.Storage.LabelValuesForLabelName(api.context(r), model.LabelName(name))
|
||||
if err != nil {
|
||||
return nil, &apiError{errorExec, err}
|
||||
}
|
||||
|
@ -274,7 +272,7 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) {
|
|||
matcherSets = append(matcherSets, matchers)
|
||||
}
|
||||
|
||||
res, err := api.Storage.MetricsForLabelMatchers(api.Context, start, end, matcherSets...)
|
||||
res, err := api.Storage.MetricsForLabelMatchers(api.context(r), start, end, matcherSets...)
|
||||
if err != nil {
|
||||
return nil, &apiError{errorExec, err}
|
||||
}
|
||||
|
|
|
@ -52,7 +52,6 @@ func TestEndpoints(t *testing.T) {
|
|||
api := &API{
|
||||
Storage: suite.Storage(),
|
||||
QueryEngine: suite.QueryEngine(),
|
||||
Context: suite.Context(),
|
||||
now: func() model.Time { return now },
|
||||
}
|
||||
|
||||
|
@ -610,7 +609,7 @@ func TestParseDuration(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestOptionsMethod(t *testing.T) {
|
||||
r := route.New()
|
||||
r := route.New(nil)
|
||||
api := &API{}
|
||||
api.Register(r)
|
||||
|
||||
|
|
|
@ -120,7 +120,9 @@ type Options struct {
|
|||
|
||||
// New initializes a new web Handler.
|
||||
func New(o *Options) *Handler {
|
||||
router := route.New()
|
||||
router := route.New(func(r *http.Request) context.Context {
|
||||
return o.Context
|
||||
})
|
||||
|
||||
h := &Handler{
|
||||
router: router,
|
||||
|
@ -138,7 +140,7 @@ func New(o *Options) *Handler {
|
|||
queryEngine: o.QueryEngine,
|
||||
storage: o.Storage,
|
||||
|
||||
apiV1: api_v1.NewAPI(o.Context, o.QueryEngine, o.Storage),
|
||||
apiV1: api_v1.NewAPI(o.QueryEngine, o.Storage),
|
||||
now: model.Now,
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue