Generalize AM actions, batch pushed alerts as defined

This commit is contained in:
Fabian Reinartz 2015-09-30 17:45:37 +02:00
parent 25ee299cb9
commit 297bbf5a0a

View File

@ -68,7 +68,7 @@ func (t *AcceptanceTest) Alertmanager() *Alertmanager {
am := &Alertmanager{ am := &Alertmanager{
t: t.T, t: t.T,
opts: t.opts, opts: t.opts,
input: map[float64][]*types.Alert{}, actions: map[float64][]func(){},
} }
cf, err := ioutil.TempFile("", "am_config") cf, err := ioutil.TempFile("", "am_config")
@ -121,7 +121,7 @@ func (t *AcceptanceTest) Run() {
} }
for _, am := range t.ams { for _, am := range t.ams {
go am.runQueries() go am.runActions()
} }
var latest float64 var latest float64
@ -155,7 +155,7 @@ type Alertmanager struct {
confFile *os.File confFile *os.File
input map[float64][]*types.Alert actions map[float64][]func()
} }
// push declares alerts that are to be pushed to the Alertmanager // push declares alerts that are to be pushed to the Alertmanager
@ -165,7 +165,25 @@ func (am *Alertmanager) Push(at float64, alerts ...*TestAlert) {
for _, a := range alerts { for _, a := range alerts {
nas = append(nas, a.nativeAlert(am.opts)) nas = append(nas, a.nativeAlert(am.opts))
} }
am.input[at] = append(am.input[at], nas...)
am.Do(at, func() {
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(nas); err != nil {
am.t.Error(err)
return
}
resp, err := http.Post(am.url+"/api/alerts", "application/json", &buf)
if err != nil {
am.t.Error(err)
return
}
resp.Body.Close()
})
}
func (am *Alertmanager) Do(at float64, f func()) {
am.actions[at] = append(am.actions[at], f)
} }
// start the alertmanager and wait until it is ready to receive. // start the alertmanager and wait until it is ready to receive.
@ -177,32 +195,21 @@ func (am *Alertmanager) start() {
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
} }
// runQueries starts sending the declared alerts over time. // runActions performs the stored actions at the defined times.
func (am *Alertmanager) runQueries() { func (am *Alertmanager) runActions() {
var wg sync.WaitGroup var wg sync.WaitGroup
for at, as := range am.input { for at, fs := range am.actions {
ts := am.opts.expandTime(at) ts := am.opts.expandTime(at)
wg.Add(1) wg.Add(len(fs))
go func(as ...*types.Alert) {
defer wg.Done()
for _, f := range fs {
go func() {
time.Sleep(ts.Sub(time.Now())) time.Sleep(ts.Sub(time.Now()))
f()
var buf bytes.Buffer wg.Done()
if err := json.NewEncoder(&buf).Encode(as); err != nil { }()
am.t.Error(err)
return
} }
resp, err := http.Post(am.url+"/api/alerts", "application/json", &buf)
if err != nil {
am.t.Error(err)
return
}
resp.Body.Close()
}(as...)
} }
wg.Wait() wg.Wait()