From 2922def8d0ea80199f103ee7950ab32b685dc00d Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Fri, 4 Jan 2013 14:41:47 +0100 Subject: [PATCH] Use the ``TargetManager`` for targets. --- main.go | 22 ++-- retrieval/target.go | 211 +++++++++++++++++++++++++------------ retrieval/targetmanager.go | 66 ++++++++++++ retrieval/targetpool.go | 78 +++++++++++--- 4 files changed, 289 insertions(+), 88 deletions(-) create mode 100644 retrieval/targetmanager.go diff --git a/main.go b/main.go index 17440933c..cc586dcb3 100644 --- a/main.go +++ b/main.go @@ -19,6 +19,7 @@ import ( "github.com/matttproud/prometheus/storage/metric/leveldb" "log" "os" + "time" ) func main() { @@ -32,21 +33,22 @@ func main() { m.Close() }() + results := make(chan retrieval.Result, 4096) + t := &retrieval.Target{ - Address: "http://localhost:8080/metrics.json", + Address: "http://localhost:8080/metrics.json", + Deadline: time.Second * 5, + Interval: time.Second * 3, } - for i := 0; i < 100000; i++ { - c, err := t.Scrape() - if err != nil { - fmt.Println(err) - continue - } + manager := retrieval.NewTargetManager(results, 1) + manager.Add(t) - for _, s := range c { + for { + result := <-results + fmt.Printf("result -> %s\n", result) + for _, s := range result.Samples { m.AppendSample(&s) } - - fmt.Printf("Finished %d\n", i) } } diff --git a/retrieval/target.go b/retrieval/target.go index 5f696a771..57454418a 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -10,10 +10,17 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - package retrieval import ( + "encoding/json" + "fmt" + "github.com/matttproud/prometheus/model" + "io/ioutil" + "log" + "math" + "net/http" + "strconv" "time" ) @@ -25,96 +32,126 @@ const ( UNREACHABLE ) +const ( + MAXIMUM_BACKOFF = time.Minute * 30 +) + type Target struct { scheduledFor time.Time unreachableCount int state TargetState Address string + Deadline time.Duration + + // XXX: Move this to a field with the target manager initialization instead of here. Interval time.Duration } -// KEPT FOR LEGACY COMPATIBILITY; PENDING REFACTOR +type Result struct { + Err error + Samples []model.Sample + Target Target +} -func (t *Target) Scrape() (samples []model.Sample, err error) { - defer func() { - if err != nil { - t.state = ALIVE +func (t *Target) reschedule(s TargetState) { + currentState := t.state + + switch currentState { + case UNKNOWN, UNREACHABLE: + switch s { + case ALIVE: + t.unreachableCount = 0 + case UNREACHABLE: + backoff := MAXIMUM_BACKOFF + exponential := time.Duration(math.Pow(2, float64(t.unreachableCount))) * time.Second + if backoff > exponential { + backoff = exponential + } + + t.scheduledFor = time.Now().Add(backoff) + t.unreachableCount++ + + log.Printf("%s unavailable %s times deferred for %s.", t, t.unreachableCount, backoff) + default: } + case ALIVE: + switch s { + case UNREACHABLE: + t.unreachableCount++ + } + default: + } + + if s != currentState { + log.Printf("%s transitioning from %s to %s.", t, currentState, s) + } + + t.state = s +} + +func (t *Target) Scrape(results chan Result) (err error) { + result := Result{} + + defer func() { + futureState := t.state + + switch err { + case nil: + futureState = ALIVE + default: + futureState = UNREACHABLE + } + + t.reschedule(futureState) + + result.Err = err + results <- result }() - ti := time.Now() - resp, err := http.Get(t.Address) - if err != nil { - return - } + done := make(chan bool) - defer resp.Body.Close() - - raw, err := ioutil.ReadAll(resp.Body) - if err != nil { - return - } - - intermediate := make(map[string]interface{}) - err = json.Unmarshal(raw, &intermediate) - if err != nil { - return - } - - baseLabels := map[string]string{"instance": t.Address} - - for name, v := range intermediate { - asMap, ok := v.(map[string]interface{}) - - if !ok { - continue + go func() { + ti := time.Now() + resp, err := http.Get(t.Address) + if err != nil { + return } - switch asMap["type"] { - case "counter": - m := model.Metric{} - m["name"] = model.LabelValue(name) - asFloat, ok := asMap["value"].(float64) + defer resp.Body.Close() + + raw, err := ioutil.ReadAll(resp.Body) + if err != nil { + return + } + + intermediate := make(map[string]interface{}) + err = json.Unmarshal(raw, &intermediate) + if err != nil { + return + } + + baseLabels := map[string]string{"instance": t.Address} + + for name, v := range intermediate { + asMap, ok := v.(map[string]interface{}) + if !ok { continue } - s := model.Sample{ - Metric: m, - Value: model.SampleValue(asFloat), - Timestamp: ti, - } - - for baseK, baseV := range baseLabels { - m[model.LabelName(baseK)] = model.LabelValue(baseV) - } - - samples = append(samples, s) - case "histogram": - values, ok := asMap["value"].(map[string]interface{}) - if !ok { - continue - } - - for p, pValue := range values { - asString, ok := pValue.(string) + switch asMap["type"] { + case "counter": + m := model.Metric{} + m["name"] = model.LabelValue(name) + asFloat, ok := asMap["value"].(float64) if !ok { continue } - float, err := strconv.ParseFloat(asString, 64) - if err != nil { - continue - } - - m := model.Metric{} - m["name"] = model.LabelValue(name) - m["percentile"] = model.LabelValue(p) - s := model.Sample{ Metric: m, - Value: model.SampleValue(float), + Value: model.SampleValue(asFloat), Timestamp: ti, } @@ -122,9 +159,51 @@ func (t *Target) Scrape() (samples []model.Sample, err error) { m[model.LabelName(baseK)] = model.LabelValue(baseV) } - samples = append(samples, s) + result.Samples = append(result.Samples, s) + case "histogram": + values, ok := asMap["value"].(map[string]interface{}) + if !ok { + continue + } + + for p, pValue := range values { + asString, ok := pValue.(string) + if !ok { + continue + } + + float, err := strconv.ParseFloat(asString, 64) + if err != nil { + continue + } + + m := model.Metric{} + m["name"] = model.LabelValue(name) + m["percentile"] = model.LabelValue(p) + + s := model.Sample{ + Metric: m, + Value: model.SampleValue(float), + Timestamp: ti, + } + + for baseK, baseV := range baseLabels { + m[model.LabelName(baseK)] = model.LabelValue(baseV) + } + + result.Samples = append(result.Samples, s) + } } } + + done <- true + }() + + select { + case <-done: + break + case <-time.After(t.Deadline): + err = fmt.Errorf("Target %s exceeded %s deadline.", t, t.Deadline) } return diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go new file mode 100644 index 000000000..94fe5c4af --- /dev/null +++ b/retrieval/targetmanager.go @@ -0,0 +1,66 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package retrieval + +import ( + "container/heap" + "log" + "time" +) + +type TargetManager interface { + acquire() + release() + Add(t *Target) + Remove(t *Target) +} + +type targetManager struct { + requestAllowance chan bool + pools map[time.Duration]TargetPool + results chan Result +} + +func NewTargetManager(results chan Result, requestAllowance int) TargetManager { + return targetManager{ + requestAllowance: make(chan bool, requestAllowance), + results: results, + pools: make(map[time.Duration]TargetPool), + } +} + +func (m targetManager) acquire() { + m.requestAllowance <- true +} + +func (m targetManager) release() { + <-m.requestAllowance +} + +func (m targetManager) Add(t *Target) { + targetPool, ok := m.pools[t.Interval] + + if !ok { + targetPool.manager = m + log.Printf("Pool %s does not exist; creating and starting...", t.Interval) + go targetPool.Run(m.results, t.Interval) + } + + heap.Push(&targetPool, t) + m.pools[t.Interval] = targetPool +} + +func (m targetManager) Remove(t *Target) { + panic("not implemented") +} diff --git a/retrieval/targetpool.go b/retrieval/targetpool.go index cb02fffa0..00c55cd53 100644 --- a/retrieval/targetpool.go +++ b/retrieval/targetpool.go @@ -1,38 +1,92 @@ package retrieval import ( - "encoding/json" - "github.com/matttproud/prometheus/model" - "io/ioutil" - "net/http" - "strconv" + "container/heap" + "log" "time" ) -type TargetPool []*Target +type TargetPool struct { + done chan bool + targets []*Target + manager TargetManager +} + +func NewTargetPool(m TargetManager) (p TargetPool) { + p.manager = m + + return +} func (p TargetPool) Len() int { - return len(p) + return len(p.targets) } func (p TargetPool) Less(i, j int) bool { - return p[i].scheduledFor.Before(p[j].scheduledFor) + return p.targets[i].scheduledFor.Before(p.targets[j].scheduledFor) } func (p *TargetPool) Pop() interface{} { - oldPool := *p + oldPool := p.targets futureLength := p.Len() - 1 element := oldPool[futureLength] futurePool := oldPool[0:futureLength] - *p = futurePool + p.targets = futurePool return element } func (p *TargetPool) Push(element interface{}) { - *p = append(*p, element.(*Target)) + p.targets = append(p.targets, element.(*Target)) } func (p TargetPool) Swap(i, j int) { - p[i], p[j] = p[j], p[i] + p.targets[i], p.targets[j] = p.targets[j], p.targets[i] +} + +func (p *TargetPool) Run(results chan Result, interval time.Duration) { + ticker := time.Tick(interval) + + for { + select { + case <-ticker: + p.runIteration(results) + case <-p.done: + log.Printf("TargetPool exiting...") + break + } + } +} + +func (p TargetPool) Stop() { + p.done <- true +} + +func (p *TargetPool) runSingle(results chan Result, t *Target) { + p.manager.acquire() + defer p.manager.release() + + t.Scrape(results) +} + +func (p *TargetPool) runIteration(results chan Result) { + for i := 0; i < p.Len(); i++ { + target := heap.Pop(p).(*Target) + if target == nil { + break + } + + now := time.Now() + + if target.scheduledFor.After(now) { + heap.Push(p, target) + + break + } + + go func() { + p.runSingle(results, target) + heap.Push(p, target) + }() + } }