alertmanager/test/acceptance.go

411 lines
9.6 KiB
Go
Raw Normal View History

2015-10-11 15:24:49 +00:00
// Copyright 2015 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2015-09-29 18:45:38 +00:00
package test
import (
"bytes"
"encoding/json"
2015-09-29 20:40:44 +00:00
"fmt"
2015-09-29 18:45:38 +00:00
"io/ioutil"
"net"
"net/http"
2015-09-29 18:45:38 +00:00
"os"
"os/exec"
"path/filepath"
2015-09-29 18:45:38 +00:00
"sync"
"syscall"
2015-09-29 18:45:38 +00:00
"testing"
"time"
"github.com/prometheus/client_golang/api"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
"github.com/prometheus/alertmanager/cli"
2015-09-29 18:45:38 +00:00
)
2015-10-02 10:32:19 +00:00
// AcceptanceTest provides declarative definition of given inputs and expected
// output of an Alertmanager setup.
type AcceptanceTest struct {
2015-09-29 18:45:38 +00:00
*testing.T
opts *AcceptanceOpts
2015-09-29 18:45:38 +00:00
2015-09-29 20:40:44 +00:00
ams []*Alertmanager
collectors []*Collector
actions map[float64][]func()
2015-09-29 18:45:38 +00:00
}
2015-10-02 10:32:19 +00:00
// AcceptanceOpts defines configuration parameters for an acceptance test.
type AcceptanceOpts struct {
	// Tolerance is not read in this file.
	// NOTE(review): presumably the allowed deviation when matching
	// notification times — confirm against the collector code.
	Tolerance time.Duration

	// baseTime is the absolute instant that all relative test times are
	// measured from. It is set by NewAcceptanceTest.
	baseTime time.Time
}
// alertString renders the alert followed by its active interval, expressed in
// seconds relative to the test's base time. The end of the interval is left
// empty when the alert has no end time set.
func (opts *AcceptanceOpts) alertString(a *model.Alert) string {
	start := opts.relativeTime(a.StartsAt)
	if !a.EndsAt.IsZero() {
		end := opts.relativeTime(a.EndsAt)
		return fmt.Sprintf("%s[%v:%v]", a, start, end)
	}
	return fmt.Sprintf("%s[%v:]", a, start)
}
2015-10-02 10:32:19 +00:00
// expandTime converts rel, a number of seconds relative to the test's base
// time, into the corresponding absolute time.
func (opts *AcceptanceOpts) expandTime(rel float64) time.Time {
	offset := time.Duration(rel * float64(time.Second))
	return opts.baseTime.Add(offset)
}
2015-10-02 10:32:19 +00:00
// relativeTime converts the absolute time act into the number of seconds
// elapsed since the test's base time. It is the inverse of expandTime.
func (opts *AcceptanceOpts) relativeTime(act time.Time) float64 {
	elapsed := act.Sub(opts.baseTime)
	return float64(elapsed) / float64(time.Second)
}
2015-10-02 10:32:19 +00:00
// NewAcceptanceTest returns a new acceptance test with the base time
// set to the current time.
func NewAcceptanceTest(t *testing.T, opts *AcceptanceOpts) *AcceptanceTest {
test := &AcceptanceTest{
T: t,
opts: opts,
actions: map[float64][]func(){},
2015-09-29 18:45:38 +00:00
}
2015-09-29 20:40:44 +00:00
opts.baseTime = time.Now()
return test
2015-09-29 18:45:38 +00:00
}
2015-10-02 10:32:19 +00:00
// freeAddress returns a new listen address not currently in use.
//
// It panics if no listener can be opened.
func freeAddress() string {
	// Let the OS allocate a free address, close it and hope
	// it is still free when starting Alertmanager.
	l, err := net.Listen("tcp4", "localhost:0")
	if err != nil {
		panic(err)
	}
	defer l.Close()

	return l.Addr().String()
}
// Do schedules f for execution at the given point in time, expressed in
// seconds relative to the test's base time. Several functions may be
// registered for the same time.
func (t *AcceptanceTest) Do(at float64, f func()) {
	scheduled := t.actions[at]
	t.actions[at] = append(scheduled, f)
}
// Alertmanager returns a new structure that allows starting an instance
2015-09-29 20:40:44 +00:00
// of Alertmanager on a random port.
func (t *AcceptanceTest) Alertmanager(conf string) *Alertmanager {
2015-09-29 20:40:44 +00:00
am := &Alertmanager{
t: t,
opts: t.opts,
2015-09-29 20:40:44 +00:00
}
2015-09-29 18:45:38 +00:00
dir, err := ioutil.TempDir("", "am_test")
if err != nil {
t.Fatal(err)
}
2015-10-07 14:18:55 +00:00
am.dir = dir
cf, err := os.Create(filepath.Join(dir, "config.yml"))
2015-09-29 18:45:38 +00:00
if err != nil {
t.Fatal(err)
}
2015-09-29 20:40:44 +00:00
am.confFile = cf
am.UpdateConfig(conf)
2018-02-07 15:36:47 +00:00
am.apiAddr = freeAddress()
am.clusterAddr = freeAddress()
2018-02-07 15:36:47 +00:00
t.Logf("AM on %s", am.apiAddr)
2015-10-01 19:28:18 +00:00
client, err := api.NewClient(api.Config{
2018-02-07 15:36:47 +00:00
Address: fmt.Sprintf("http://%s", am.apiAddr),
})
if err != nil {
t.Fatal(err)
}
am.client = client
2015-09-29 20:40:44 +00:00
t.ams = append(t.ams, am)
return am
}
2015-10-02 10:32:19 +00:00
// Collector returns a new collector bound to the test instance.
func (t *AcceptanceTest) Collector(name string) *Collector {
co := &Collector{
2015-09-29 20:40:44 +00:00
t: t.T,
2015-09-30 13:02:07 +00:00
name: name,
2015-09-29 20:40:44 +00:00
opts: t.opts,
collected: map[float64][]model.Alerts{},
2015-10-05 11:22:23 +00:00
expected: map[Interval][]model.Alerts{},
2015-09-29 20:40:44 +00:00
}
t.collectors = append(t.collectors, co)
return co
}
// Run starts all Alertmanagers and runs queries against them. It then checks
2015-11-10 12:47:04 +00:00
// whether all expected notifications have arrived at the expected receiver.
func (t *AcceptanceTest) Run() {
2015-10-15 14:15:37 +00:00
errc := make(chan error)
2015-09-29 20:40:44 +00:00
for _, am := range t.ams {
2015-10-15 14:15:37 +00:00
am.errc = errc
am.Start()
defer func(am *Alertmanager) {
am.Terminate()
am.cleanup()
t.Logf("stdout:\n%v", am.cmd.Stdout)
t.Logf("stderr:\n%v", am.cmd.Stderr)
}(am)
2015-09-29 20:40:44 +00:00
}
2015-10-15 14:15:37 +00:00
go t.runActions()
2015-09-29 20:40:44 +00:00
var latest float64
for _, coll := range t.collectors {
if l := coll.latest(); l > latest {
latest = l
}
}
deadline := t.opts.expandTime(latest)
2015-10-15 14:15:37 +00:00
select {
case <-time.After(deadline.Sub(time.Now())):
// continue
case err := <-errc:
2015-10-15 14:17:04 +00:00
t.Error(err)
2015-10-15 14:15:37 +00:00
}
2015-09-29 20:40:44 +00:00
for _, coll := range t.collectors {
report := coll.check()
t.Log(report)
}
}
2015-09-29 18:45:38 +00:00
// runActions executes every registered action in its own goroutine once its
// scheduled time has been reached, and returns after all of them finished.
func (t *AcceptanceTest) runActions() {
	var pending sync.WaitGroup

	for offset, funcs := range t.actions {
		when := t.opts.expandTime(offset)

		for _, fn := range funcs {
			pending.Add(1)
			go func(fn func()) {
				// Sleep returns immediately if the scheduled time
				// has already passed.
				time.Sleep(when.Sub(time.Now()))
				fn()
				pending.Done()
			}(fn)
		}
	}

	pending.Wait()
}
// buffer is a bytes.Buffer whose Write and String methods are serialized by a
// mutex, so it can be written to and read from different goroutines.
type buffer struct {
	b   bytes.Buffer
	mtx sync.Mutex
}

// Write appends p to the buffer while holding the lock.
func (buf *buffer) Write(p []byte) (int, error) {
	buf.mtx.Lock()
	defer buf.mtx.Unlock()

	n, err := buf.b.Write(p)
	return n, err
}

// String returns everything written so far while holding the lock.
func (buf *buffer) String() string {
	buf.mtx.Lock()
	defer buf.mtx.Unlock()

	contents := buf.b.String()
	return contents
}
2015-09-29 20:40:44 +00:00
// Alertmanager encapsulates an Alertmanager process and allows
// declaring alerts being pushed to it at fixed points in time.
type Alertmanager struct {
t *AcceptanceTest
opts *AcceptanceOpts
2015-09-29 18:45:38 +00:00
2018-02-07 15:36:47 +00:00
apiAddr string
clusterAddr string
client api.Client
2018-02-07 15:36:47 +00:00
cmd *exec.Cmd
confFile *os.File
dir string
2015-10-15 14:15:37 +00:00
errc chan<- error
2015-09-29 20:40:44 +00:00
}
// Start the alertmanager and wait until it is ready to receive.
func (am *Alertmanager) Start() {
2015-10-07 14:18:55 +00:00
cmd := exec.Command("../../alertmanager",
"--config.file", am.confFile.Name(),
"--log.level", "debug",
2018-02-07 15:36:47 +00:00
"--web.listen-address", am.apiAddr,
"--storage.path", am.dir,
"--cluster.listen-address", am.clusterAddr,
"--cluster.settle-timeout", "0s",
2015-10-07 14:18:55 +00:00
)
if am.cmd == nil {
var outb, errb buffer
2015-10-07 14:18:55 +00:00
cmd.Stdout = &outb
cmd.Stderr = &errb
} else {
cmd.Stdout = am.cmd.Stdout
cmd.Stderr = am.cmd.Stderr
}
am.cmd = cmd
if err := am.cmd.Start(); err != nil {
am.t.Fatalf("Starting alertmanager failed: %s", err)
}
2015-10-15 14:15:37 +00:00
go func() {
if err := am.cmd.Wait(); err != nil {
am.errc <- err
}
}()
2015-10-07 14:18:55 +00:00
time.Sleep(50 * time.Millisecond)
for i := 0; i < 10; i++ {
2018-02-07 15:36:47 +00:00
resp, err := http.Get(fmt.Sprintf("http://%s/status", am.apiAddr))
if err == nil {
_, err := ioutil.ReadAll(resp.Body)
if err != nil {
am.t.Fatalf("Starting alertmanager failed: %s", err)
}
resp.Body.Close()
return
}
time.Sleep(500 * time.Millisecond)
}
am.t.Fatalf("Starting alertmanager failed: timeout")
}
// Terminate sends SIGTERM to the underlying Alertmanager process. It does not
// wait for the process to exit, and the result of the signal delivery is
// discarded.
func (am *Alertmanager) Terminate() {
	pid := am.cmd.Process.Pid
	syscall.Kill(pid, syscall.SIGTERM)
}
// Reload sends SIGHUP, the reloading signal, to the Alertmanager process. The
// result of the signal delivery is discarded.
func (am *Alertmanager) Reload() {
	pid := am.cmd.Process.Pid
	syscall.Kill(pid, syscall.SIGHUP)
}
// cleanup releases the on-disk resources of the instance: it closes and
// removes the configuration file and removes the temporary directory that is
// used as the storage path.
//
// Failures are only logged, since cleanup runs during teardown and must not
// change the outcome of the test.
func (am *Alertmanager) cleanup() {
	name := am.confFile.Name()

	if err := am.confFile.Close(); err != nil {
		am.t.Logf("Error closing config file %q: %v", name, err)
	}
	if err := os.RemoveAll(name); err != nil {
		am.t.Logf("Error removing config file %q: %v", name, err)
	}
	// The directory also holds whatever the process wrote to its storage
	// path, so removing only the config file would leave it behind.
	if am.dir != "" {
		if err := os.RemoveAll(am.dir); err != nil {
			am.t.Logf("Error removing directory %q: %v", am.dir, err)
		}
	}
}
// Push declares alerts that are to be pushed to the Alertmanager
2015-09-29 20:40:44 +00:00
// server at a relative point in time.
func (am *Alertmanager) Push(at float64, alerts ...*TestAlert) {
var cas []cli.Alert
for i := range alerts {
a := alerts[i].nativeAlert(am.opts)
al := cli.Alert{
Labels: cli.LabelSet{},
Annotations: cli.LabelSet{},
StartsAt: a.StartsAt,
EndsAt: a.EndsAt,
GeneratorURL: a.GeneratorURL,
}
for n, v := range a.Labels {
al.Labels[cli.LabelName(n)] = cli.LabelValue(v)
}
for n, v := range a.Annotations {
al.Annotations[cli.LabelName(n)] = cli.LabelValue(v)
}
cas = append(cas, al)
2015-09-29 18:45:38 +00:00
}
alertAPI := cli.NewAlertAPI(am.client)
am.t.Do(at, func() {
if err := alertAPI.Push(context.Background(), cas...); err != nil {
am.t.Errorf("Error pushing %v: %s", cas, err)
}
})
}
// SetSilence updates or creates the given Silence.
func (am *Alertmanager) SetSilence(at float64, sil *TestSilence) {
am.t.Do(at, func() {
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(sil.nativeSilence(am.opts)); err != nil {
am.t.Errorf("Error setting silence %v: %s", sil, err)
return
}
2018-02-07 15:36:47 +00:00
resp, err := http.Post(fmt.Sprintf("http://%s/api/v1/silences", am.apiAddr), "application/json", &buf)
if err != nil {
2015-12-08 10:53:28 +00:00
am.t.Errorf("Error setting silence %v: %s", sil, err)
return
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
panic(err)
}
var v struct {
Status string `json:"status"`
Data struct {
2016-08-30 09:58:27 +00:00
SilenceID string `json:"silenceId"`
} `json:"data"`
}
2016-08-30 09:58:27 +00:00
if err := json.Unmarshal(b, &v); err != nil || resp.StatusCode/100 != 2 {
am.t.Errorf("error setting silence %v: %s", sil, err)
return
}
sil.SetID(v.Data.SilenceID)
})
}
// DelSilence deletes the silence with the sid at the given time.
func (am *Alertmanager) DelSilence(at float64, sil *TestSilence) {
am.t.Do(at, func() {
req, err := http.NewRequest("DELETE", fmt.Sprintf("http://%s/api/v1/silence/%s", am.apiAddr, sil.ID()), nil)
if err != nil {
am.t.Errorf("Error deleting silence %v: %s", sil, err)
return
}
2016-08-30 09:58:27 +00:00
resp, err := http.DefaultClient.Do(req)
if err != nil || resp.StatusCode/100 != 2 {
2015-12-08 10:53:28 +00:00
am.t.Errorf("Error deleting silence %v: %s", sil, err)
return
}
})
}
// UpdateConfig rewrites the configuration file for the Alertmanager. It does not
// initiate config reloading.
//
// The previous content is discarded and conf is written from the start of the
// file, so the method may be called repeatedly. Any I/O error fails the test
// immediately.
func (am *Alertmanager) UpdateConfig(conf string) {
	// Truncate and write at offset 0 instead of appending at the current
	// file offset, which would concatenate the old and new configuration.
	if err := am.confFile.Truncate(0); err != nil {
		am.t.Fatal(err)
	}
	if _, err := am.confFile.WriteAt([]byte(conf), 0); err != nil {
		am.t.Fatal(err)
	}
	if err := am.confFile.Sync(); err != nil {
		am.t.Fatal(err)
	}
}