Merge pull request #1 from prometheus/event-model-reworking

Change model to be more state- and less event-focussed.
This commit is contained in:
juliusv 2013-07-19 02:17:43 -07:00
commit f362b04f61
9 changed files with 165 additions and 104 deletions

22
main.go
View File

@ -32,7 +32,7 @@ func main() {
aggregator := manager.NewAggregator()
defer aggregator.Close()
summarizer := new(manager.SummaryDispatcher)
summarizer := manager.NewSummaryDispatcher()
go aggregator.Dispatch(summarizer)
log.Println("Done.")
@ -40,10 +40,28 @@ func main() {
AlertManagerService: &api.AlertManagerService{
Aggregator: aggregator,
},
AlertsHandler: nil,
AlertsHandler: &web.AlertsHandler{
Aggregator: aggregator,
},
}
go webService.ServeForever()
// BEGIN EXAMPLE CODE - replace with config loading later.
done := make(chan bool)
go func() {
rules := manager.AggregationRules{
&manager.AggregationRule{
Filters: manager.Filters{manager.NewFilter("service", "discovery")},
},
}
aggregator.SetRules(rules)
done <- true
}()
<-done
// END EXAMPLE CODE
log.Println("Running summary dispatcher...")
summarizer.Dispatch(suppressor)
}

View File

@ -19,11 +19,9 @@ import (
"time"
)
type aggDispatchState int
const (
aggUnsent aggDispatchState = iota
aggSent
minimumRefreshPeriod = 5 * time.Minute
notificationRetryPeriod = 1 * time.Minute
)
// AggregationRule creates and manages the scope for received events.
@ -33,13 +31,25 @@ type AggregationRule struct {
RepeatRate time.Duration
}
type AggregationInstances []*AggregationInstance
type AggregationInstance struct {
Rule *AggregationRule
Events Events
Rule *AggregationRule
Event *Event
EndsAt time.Time
// When was this AggregationInstance created?
Created time.Time
// When was the last refresh received into this AggregationInstance?
LastRefreshed time.Time
state aggDispatchState
// When was the last successful notification sent out for this
// AggregationInstance?
lastNotificationSent time.Time
// Timer used to trigger a notification retry/resend.
notificationResendTimer *time.Timer
// Timer used to trigger the deletion of the AggregationInstance after it
// hasn't been refreshed for too long.
expiryTimer *time.Timer
}
func (r *AggregationRule) Handles(e *Event) bool {
@ -47,90 +57,79 @@ func (r *AggregationRule) Handles(e *Event) bool {
}
func (r *AggregationInstance) Ingest(e *Event) {
r.Events = append(r.Events, e)
}
r.Event = e
r.LastRefreshed = time.Now()
func (r *AggregationInstance) Tidy() {
// BUG(matt): Drop this in favor of having the entire AggregationInstance
// being dropped when too old.
log.Println("Tidying...")
if len(r.Events) == 0 {
return
}
events := Events{}
t := time.Now()
for _, e := range r.Events {
if t.Before(e.CreatedAt) {
events = append(events, e)
}
}
if len(events) == 0 {
r.state = aggSent
}
r.Events = events
r.expiryTimer.Reset(minimumRefreshPeriod)
}
func (r *AggregationInstance) SendNotification(s SummaryReceiver) {
if r.state == aggSent {
if time.Since(r.lastNotificationSent) < r.Rule.RepeatRate {
return
}
err := s.Receive(&EventSummary{
Rule: r.Rule,
Events: r.Events,
Rule: r.Rule,
Event: r.Event,
})
if err != nil {
if err.Retryable() {
return
}
log.Println("Unretryable error while sending notification:", err)
log.Printf("Error while sending notification: %s, retrying in %v", err, notificationRetryPeriod)
r.resendNotificationAfter(notificationRetryPeriod, s)
return
}
r.state = aggSent
r.resendNotificationAfter(r.Rule.RepeatRate, s)
r.lastNotificationSent = time.Now()
}
func (r *AggregationInstance) resendNotificationAfter(d time.Duration, s SummaryReceiver) {
// BUG: we can't just call SendNotification whenever the timer ends without
// any synchronisation. The timer should instead feed into a channel which is
// served by the main Dispatch() loop.
r.notificationResendTimer = time.AfterFunc(d, func() {
r.SendNotification(s)
})
}
func (r *AggregationInstance) Close() {
if r.notificationResendTimer != nil {
r.notificationResendTimer.Stop()
}
if r.expiryTimer != nil {
r.expiryTimer.Stop()
}
}
type AggregationRules []*AggregationRule
type Aggregator struct {
Rules AggregationRules
// Used for O(1) lookup and removal of aggregations when new ones come into the system.
Aggregates map[uint64]*AggregationInstance
// TODO: Add priority queue sorted by expiration time.Time (newest, oldest).
// When a new element comes into this queue and the last head is not equal to
// current head, cancel the existing internal timer and create a new timer for
// expiry.Sub(time.Now) and have that (<- chan time.Time) funnel into the
// event into the dispatch loop where the present tidy call is made. Delete
// tidy, and just shift the head element of the priority queue off and remove
// it from the O(1) membership index above.
Aggregates map[EventFingerprint]*AggregationInstance
// TODO?: Build a new priority queue type that uses an internal wrapper container for
// the AggregationInstance it decorates to note the last dispatch time. The
// queue uses higher-level add and remove methods.
// SHORTFALL: Needing to garbage collect aggregations across three containers?
aggRequests chan *aggregateEventsRequest
rulesRequests chan *aggregatorResetRulesRequest
closed chan bool
aggRequests chan *aggregateEventsRequest
getAggregatesRequests chan *getAggregatesRequest
removeAggregateRequests chan EventFingerprint
rulesRequests chan *aggregatorResetRulesRequest
closed chan bool
}
func NewAggregator() *Aggregator {
return &Aggregator{
Aggregates: make(map[uint64]*AggregationInstance),
Aggregates: make(map[EventFingerprint]*AggregationInstance),
aggRequests: make(chan *aggregateEventsRequest),
rulesRequests: make(chan *aggregatorResetRulesRequest),
closed: make(chan bool),
aggRequests: make(chan *aggregateEventsRequest),
getAggregatesRequests: make(chan *getAggregatesRequest),
removeAggregateRequests: make(chan EventFingerprint),
rulesRequests: make(chan *aggregatorResetRulesRequest),
closed: make(chan bool),
}
}
func (a *Aggregator) Close() {
close(a.rulesRequests)
close(a.aggRequests)
close(a.getAggregatesRequests)
close(a.removeAggregateRequests)
<-a.closed
close(a.closed)
@ -146,6 +145,14 @@ type aggregateEventsRequest struct {
Response chan *aggregateEventsResponse
}
type getAggregatesResponse struct {
Aggregates AggregationInstances
}
type getAggregatesRequest struct {
Response chan getAggregatesResponse
}
func (a *Aggregator) aggregate(req *aggregateEventsRequest, s SummaryReceiver) {
if len(a.Rules) == 0 {
req.Response <- &aggregateEventsResponse{
@ -162,8 +169,14 @@ func (a *Aggregator) aggregate(req *aggregateEventsRequest, s SummaryReceiver) {
fp := element.Fingerprint()
aggregation, ok := a.Aggregates[fp]
if !ok {
expTimer := time.AfterFunc(minimumRefreshPeriod, func() {
a.removeAggregateRequests <- fp
})
aggregation = &AggregationInstance{
Rule: r,
Rule: r,
Created: time.Now(),
expiryTimer: expTimer,
}
a.Aggregates[fp] = aggregation
@ -196,6 +209,26 @@ func (a *Aggregator) replaceRules(r *aggregatorResetRulesRequest) {
close(r.Response)
}
func (a *Aggregator) AlertAggregates() AggregationInstances {
req := &getAggregatesRequest{
Response: make(chan getAggregatesResponse),
}
a.getAggregatesRequests <- req
result := <-req.Response
return result.Aggregates
}
func (a *Aggregator) aggregates() AggregationInstances {
aggs := make(AggregationInstances, 0, len(a.Aggregates))
for _, agg := range a.Aggregates {
aggs = append(aggs, agg)
}
return aggs
}
func (a *Aggregator) Receive(e Events) error {
req := &aggregateEventsRequest{
Events: e,
@ -244,10 +277,17 @@ func (a *Aggregator) Dispatch(s SummaryReceiver) {
closed++
}
case <-t.C:
for _, a := range a.Aggregates {
a.Tidy()
case req := <-a.getAggregatesRequests:
aggs := a.aggregates()
req.Response <- getAggregatesResponse{
Aggregates: aggs,
}
close(req.Response)
case fp := <-a.removeAggregateRequests:
log.Println("Deleting expired aggregation instance", a)
a.Aggregates[fp].Close()
delete(a.Aggregates, fp)
}
}

View File

@ -34,7 +34,7 @@ func DispatcherFor(destination string) DestinationDispatcher {
type EventSummary struct {
Rule *AggregationRule
Events Events
Event *Event
Destination string
}
@ -116,7 +116,7 @@ func (d *SummaryDispatcher) Receive(s *EventSummary) RemoteError {
}
func (d *SummaryDispatcher) dispatchSummary(r *summaryDispatchRequest, i IsInhibitedInterrogator) {
if i.IsInhibited(r.Summary.Events[0]) {
if i.IsInhibited(r.Summary.Event) {
r.Response <- &summaryDispatchResponse{
Disposition: SUPPRESSED,
}
@ -124,6 +124,9 @@ func (d *SummaryDispatcher) dispatchSummary(r *summaryDispatchRequest, i IsInhib
}
// BUG: Perform sending of summaries.
r.Response <- &summaryDispatchResponse{
Disposition: DISPATCHED,
}
}
func (d *SummaryDispatcher) Dispatch(i IsInhibitedInterrogator) {

View File

@ -17,28 +17,23 @@ import (
"fmt"
"hash/fnv"
"sort"
"time"
)
type EventFingerprint uint64
// Event models an action triggered by Prometheus.
type Event struct {
// Label value pairs for purpose of aggregation, matching, and disposition
// dispatching. This must minimally include a "name" label.
Labels map[string]string
// CreatedAt indicates when the event was created.
CreatedAt time.Time
// ExpiresAt is the allowed lifetime for this event before it is reaped.
ExpiresAt time.Time
// Extra key/value information which is not used for aggregation.
Payload map[string]string
}
func (e Event) Fingerprint() uint64 {
func (e Event) Fingerprint() EventFingerprint {
keys := []string{}
for k := range e.Payload {
for k := range e.Labels {
keys = append(keys, k)
}
@ -50,7 +45,7 @@ func (e Event) Fingerprint() uint64 {
fmt.Fprintf(summer, k, e.Labels[k])
}
return summer.Sum64()
return EventFingerprint(summer.Sum64())
}
type Events []*Event

View File

@ -14,11 +14,21 @@
package web
import (
"github.com/prometheus/alert_manager/manager"
"net/http"
)
type AlertsHandler struct {}
type AlertStatus struct {
AlertAggregates []*manager.AggregationInstance
}
type AlertsHandler struct {
Aggregator *manager.Aggregator
}
func (h *AlertsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
executeTemplate(w, "alerts", nil)
alertStatus := &AlertStatus{
AlertAggregates: h.Aggregator.AlertAggregates(),
}
executeTemplate(w, "alerts", alertStatus)
}

View File

@ -17,7 +17,7 @@ import (
"net/http"
)
type SilencesHandler struct {}
type SilencesHandler struct{}
func (h *SilencesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
executeTemplate(w, "silences", nil)

View File

@ -17,7 +17,7 @@ import (
"net/http"
)
type StatusHandler struct {}
type StatusHandler struct{}
func (h *StatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
executeTemplate(w, "status", nil)

View File

@ -16,25 +16,31 @@
<th>Alert</th>
<th>Labels</th>
<th>Active Since</th>
<th>Last Refreshed</th>
<th>Surpressed</th>
</tr>
</thead>
<tbody>
{{range .AlertAggregates}}
<tr>
<td>{{.Event}} <button class="btn btn-mini">Silence Alert</button></td>
<td>{{.Event.Labels}} <button class="btn btn-mini">Silence Instance</button></td>
<td>{{.Created}}</td>
<td>{{.LastRefreshed}}</td>
<td>No</td>
</tr>
{{end}}
<tr>
<td>TheTaxesAreTooDamnHigh <button class="btn btn-mini">Silence Alert</button></td>
<td>{foo="bar",baz="biz"} <button class="btn btn-mini">Silence Instance</button></td>
<td>...</td>
<td>...</td>
<td>No</td>
</tr>
<tr>
<td>TheTaxesAreTooDamnHigh <button class="btn btn-mini">Silence Alert</button></td>
<td>{foo="bar",baz="biz"} <button class="btn btn-mini">Silence Instance</button></td>
<td>...</td>
<td>No</td>
</tr>
<tr>
<td>TheTaxesAreTooDamnHigh <button class="btn btn-mini">Silence Alert</button></td>
<td>{foo="bar",baz="biz"} <button class="btn btn-mini">Silence Instance</button></td>
<td>...</td>
<td>No</td>
</tr>
@ -42,17 +48,6 @@
<td>TheTaxesAreTooDamnHigh <button class="btn btn-mini">Silence Alert</button></td>
<td>{foo="bar",baz="biz"} <button class="btn btn-mini">Silence Instance</button></td>
<td>...</td>
<td>No</td>
</tr>
<tr>
<td>TheTaxesAreTooDamnHigh <button class="btn btn-mini">Silence Alert</button></td>
<td>{foo="bar",baz="biz"} <button class="btn btn-mini">Silence Instance</button></td>
<td>...</td>
<td>No</td>
</tr>
<tr>
<td>TheTaxesAreTooDamnHigh <button class="btn btn-mini">Silence Alert</button></td>
<td>{foo="bar",baz="biz"} <button class="btn btn-mini">Silence Instance</button></td>
<td>...</td>
<td>No</td>
</tr>

View File

@ -38,9 +38,9 @@ var (
type WebService struct {
AlertManagerService *api.AlertManagerService
AlertsHandler *AlertsHandler
SilencesHandler *SilencesHandler
StatusHandler *StatusHandler
AlertsHandler *AlertsHandler
SilencesHandler *SilencesHandler
StatusHandler *StatusHandler
}
func (w WebService) ServeForever() error {