Merge pull request #198 from prometheus/feature/curator/scheduler
Include curator status in web heads-up-display.
This commit is contained in:
commit
97e7e79a6d
|
@ -29,4 +29,5 @@ type ApplicationState struct {
|
|||
Storage metric.Storage
|
||||
TargetManager retrieval.TargetManager
|
||||
BuildInfo map[string]string
|
||||
CurationState chan metric.CurationState
|
||||
}
|
||||
|
|
6
main.go
6
main.go
|
@ -15,7 +15,6 @@ package main
|
|||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"github.com/prometheus/prometheus/appstate"
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/retrieval"
|
||||
|
@ -32,8 +31,6 @@ import (
|
|||
|
||||
// Commandline flags.
|
||||
var (
|
||||
_ = fmt.Sprintf("")
|
||||
|
||||
printVersion = flag.Bool("version", false, "print version information")
|
||||
configFile = flag.String("configFile", "prometheus.conf", "Prometheus configuration file name.")
|
||||
metricsStoragePath = flag.String("metricsStoragePath", "/tmp/metrics", "Base path for metrics storage.")
|
||||
|
@ -117,11 +114,12 @@ func main() {
|
|||
}
|
||||
|
||||
appState := &appstate.ApplicationState{
|
||||
BuildInfo: BuildInfo,
|
||||
Config: conf,
|
||||
CurationState: make(chan metric.CurationState),
|
||||
RuleManager: ruleManager,
|
||||
Storage: ts,
|
||||
TargetManager: targetManager,
|
||||
BuildInfo: BuildInfo,
|
||||
}
|
||||
|
||||
web.StartServing(appState)
|
||||
|
|
|
@ -26,6 +26,15 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
// CurationState contains high-level curation state information for the
|
||||
// heads-up-display.
|
||||
type CurationState struct {
|
||||
Active bool
|
||||
Name string
|
||||
Limit time.Duration
|
||||
Fingerprint model.Fingerprint
|
||||
}
|
||||
|
||||
// watermarkFilter determines whether to include or exclude candidate
|
||||
// values from the curation process by virtue of how old the high watermark is.
|
||||
type watermarkFilter struct {
|
||||
|
@ -43,6 +52,8 @@ type watermarkFilter struct {
|
|||
stop chan bool
|
||||
// stopAt is used to determine the elegibility of series for compaction.
|
||||
stopAt time.Time
|
||||
// status is the outbound channel for notifying the status page of its state.
|
||||
status chan CurationState
|
||||
}
|
||||
|
||||
// curator is responsible for effectuating a given curation policy across the
|
||||
|
@ -97,7 +108,7 @@ func newCurator() curator {
|
|||
// curated.
|
||||
// curationState is the on-disk store where the curation remarks are made for
|
||||
// how much progress has been made.
|
||||
func (c curator) run(ignoreYoungerThan time.Duration, instant time.Time, processor processor, curationState, samples, watermarks *leveldb.LevelDBPersistence) (err error) {
|
||||
func (c curator) run(ignoreYoungerThan time.Duration, instant time.Time, processor processor, curationState, samples, watermarks *leveldb.LevelDBPersistence, status chan CurationState) (err error) {
|
||||
defer func(t time.Time) {
|
||||
duration := float64(time.Since(t))
|
||||
|
||||
|
@ -113,6 +124,11 @@ func (c curator) run(ignoreYoungerThan time.Duration, instant time.Time, process
|
|||
curationDuration.IncrementBy(labels, duration)
|
||||
curationDurations.Add(labels, duration)
|
||||
}(time.Now())
|
||||
defer func() {
|
||||
status <- CurationState{
|
||||
Active: false,
|
||||
}
|
||||
}()
|
||||
|
||||
iterator := samples.NewIterator(true)
|
||||
defer iterator.Close()
|
||||
|
@ -130,8 +146,9 @@ func (c curator) run(ignoreYoungerThan time.Duration, instant time.Time, process
|
|||
|
||||
filter := watermarkFilter{
|
||||
curationState: curationState,
|
||||
processor: processor,
|
||||
ignoreYoungerThan: ignoreYoungerThan,
|
||||
processor: processor,
|
||||
status: status,
|
||||
stop: c.stop,
|
||||
stopAt: instant.Add(-1 * ignoreYoungerThan),
|
||||
}
|
||||
|
@ -235,6 +252,8 @@ func getCurationRemark(states raw.Persistence, processor processor, ignoreYounge
|
|||
}
|
||||
|
||||
func (w watermarkFilter) Filter(key, value interface{}) (r storage.FilterResult) {
|
||||
fingerprint := key.(model.Fingerprint)
|
||||
|
||||
defer func() {
|
||||
labels := map[string]string{
|
||||
cutOff: fmt.Sprint(w.ignoreYoungerThan),
|
||||
|
@ -245,11 +264,19 @@ func (w watermarkFilter) Filter(key, value interface{}) (r storage.FilterResult)
|
|||
curationFilterOperations.Increment(labels)
|
||||
}()
|
||||
|
||||
defer func() {
|
||||
w.status <- CurationState{
|
||||
Active: true,
|
||||
Name: w.processor.Name(),
|
||||
Limit: w.ignoreYoungerThan,
|
||||
Fingerprint: fingerprint,
|
||||
}
|
||||
}()
|
||||
|
||||
if w.shouldStop() {
|
||||
return storage.STOP
|
||||
}
|
||||
|
||||
fingerprint := key.(model.Fingerprint)
|
||||
curationRemark, err := getCurationRemark(w.curationState, w.processor, w.ignoreYoungerThan, fingerprint)
|
||||
if err != nil {
|
||||
return
|
||||
|
|
|
@ -845,8 +845,11 @@ func TestCuratorCompactionProcessor(t *testing.T) {
|
|||
}
|
||||
defer samples.Close()
|
||||
|
||||
updates := make(chan CurationState, 100)
|
||||
defer close(updates)
|
||||
|
||||
c := newCurator()
|
||||
err = c.run(scenario.in.ignoreYoungerThan, testInstant, scenario.in.processor, curatorStates, samples, watermarkStates)
|
||||
err = c.run(scenario.in.ignoreYoungerThan, testInstant, scenario.in.processor, curatorStates, samples, watermarkStates, updates)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@ import (
|
|||
"flag"
|
||||
"github.com/prometheus/prometheus/appstate"
|
||||
"github.com/prometheus/prometheus/retrieval"
|
||||
"github.com/prometheus/prometheus/storage/metric"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
|
@ -27,20 +28,22 @@ type PrometheusStatus struct {
|
|||
TargetPools map[string]*retrieval.TargetPool
|
||||
BuildInfo map[string]string
|
||||
Flags map[string]string
|
||||
Curation metric.CurationState
|
||||
}
|
||||
|
||||
type StatusHandler struct {
|
||||
appState *appstate.ApplicationState
|
||||
PrometheusStatus *PrometheusStatus
|
||||
}
|
||||
|
||||
func (h *StatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
func (h *StatusHandler) Run() {
|
||||
flags := map[string]string{}
|
||||
|
||||
flag.VisitAll(func(f *flag.Flag) {
|
||||
flags[f.Name] = f.Value.String()
|
||||
})
|
||||
|
||||
status := &PrometheusStatus{
|
||||
h.PrometheusStatus = &PrometheusStatus{
|
||||
Config: h.appState.Config.ToString(0),
|
||||
Rules: "TODO: list rules here",
|
||||
Status: "TODO: add status information here",
|
||||
|
@ -48,5 +51,13 @@ func (h *StatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
BuildInfo: h.appState.BuildInfo,
|
||||
Flags: flags,
|
||||
}
|
||||
executeTemplate(w, "status", status)
|
||||
|
||||
// Law of Demeter :-(
|
||||
for state := range h.appState.CurationState {
|
||||
h.PrometheusStatus.Curation = state
|
||||
}
|
||||
}
|
||||
|
||||
func (h StatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
executeTemplate(w, "status", h.PrometheusStatus)
|
||||
}
|
||||
|
|
|
@ -35,6 +35,30 @@
|
|||
</ul>
|
||||
</div>
|
||||
|
||||
<h2>Curation</h2>
|
||||
<div class="grouping_box">
|
||||
<table>
|
||||
<tr>
|
||||
<th>Active</th>
|
||||
<td>{{.Curation.Active}}</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<th>Processor Name</th>
|
||||
<td>{{.Curation.Name}}</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<th>Recency Limit</th>
|
||||
<td>{{.Curation.Limit}}</td>
|
||||
</tr>
|
||||
{{if .Curation.Fingerprint}}
|
||||
<tr>
|
||||
<th>Current Fingerprint</th>
|
||||
<td>{{.Curation.Fingerprint}}</td>
|
||||
</tr>
|
||||
{{end}}
|
||||
</table>
|
||||
</div>
|
||||
|
||||
<h2>Build Info</h2>
|
||||
<div class="grouping_box">
|
||||
<table>
|
||||
|
|
10
web/web.go
10
web/web.go
|
@ -44,7 +44,10 @@ func StartServing(appState *appstate.ApplicationState) {
|
|||
exp.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
|
||||
exp.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
|
||||
|
||||
exp.Handle("/", &StatusHandler{appState: appState})
|
||||
statusHandler := &StatusHandler{appState: appState}
|
||||
go statusHandler.Run()
|
||||
|
||||
exp.Handle("/", statusHandler)
|
||||
exp.HandleFunc("/graph", graphHandler)
|
||||
exp.HandleFunc("/console", consoleHandler)
|
||||
|
||||
|
@ -90,5 +93,8 @@ func executeTemplate(w http.ResponseWriter, name string, data interface{}) {
|
|||
log.Printf("Error preparing layout template: %s", err)
|
||||
return
|
||||
}
|
||||
tpl.Execute(w, data)
|
||||
err = tpl.Execute(w, data)
|
||||
if err != nil {
|
||||
log.Printf("Error executing template: %s", err)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue