Merge pull request #1830 from pgier/amtool-tests

test/cli: add basic amtool cli tests
This commit is contained in:
stuart nelson 2019-07-24 17:41:10 +02:00 committed by GitHub
commit a74758e4c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 1395 additions and 0 deletions

665
test/cli/acceptance.go Normal file
View File

@ -0,0 +1,665 @@
// Copyright 2019 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package test
import (
"bytes"
"context"
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"os/exec"
"path/filepath"
"regexp"
"strings"
"sync"
"syscall"
"testing"
"time"
httptransport "github.com/go-openapi/runtime/client"
"github.com/go-openapi/strfmt"
apiclient "github.com/prometheus/alertmanager/api/v2/client"
"github.com/prometheus/alertmanager/api/v2/client/general"
"github.com/prometheus/alertmanager/api/v2/models"
"github.com/prometheus/alertmanager/cli/format"
)
// amtool is the relative path to the locally built amtool binary that the
// helpers in this file execute.
const amtool = "../../../amtool"
// AcceptanceTest provides declarative definition of given inputs and expected
// output of an Alertmanager setup.
type AcceptanceTest struct {
// T is the embedded test handle used for logging and failure reporting.
*testing.T
// opts holds the options that are handed on to cluster members and collectors.
opts *AcceptanceOpts
// amc is the cluster created by AlertmanagerCluster and started by Run.
amc *AlertmanagerCluster
// collectors lists every collector created through Collector.
collectors []*Collector
// actions maps a relative time in seconds to the functions registered with
// Do for that time.
actions map[float64][]func()
}
// AcceptanceOpts defines configuration parameters for an acceptance test.
type AcceptanceOpts struct {
// RoutePrefix is inserted into the URLs built for an instance and, when
// non-empty, passed to the Alertmanager process via --web.route-prefix.
RoutePrefix string
// Tolerance is the largest difference allowed between two timestamps for
// them to still be considered equal.
Tolerance time.Duration
// baseTime is the instant relative times are measured from; it is set by
// NewAcceptanceTest.
baseTime time.Time
}
// alertString renders the alert followed by its activity interval, expressed
// in seconds relative to the test's base time. The end of the interval is
// left open when the alert has no end time or a zero end time.
func (opts *AcceptanceOpts) alertString(a *models.GettableAlert) string {
	start := opts.relativeTime(time.Time(*a.StartsAt))
	if a.EndsAt != nil && !time.Time(*a.EndsAt).IsZero() {
		end := opts.relativeTime(time.Time(*a.EndsAt))
		return fmt.Sprintf("%v[%v:%v]", a, start, end)
	}
	return fmt.Sprintf("%v[%v:]", a, start)
}
// expandTime converts rel, a number of seconds relative to the test's base
// time, into the corresponding absolute time.
func (opts *AcceptanceOpts) expandTime(rel float64) time.Time {
	offset := time.Duration(rel * float64(time.Second))
	return opts.baseTime.Add(offset)
}
// relativeTime converts the absolute time act into the number of seconds
// that lie between the test's base time and act.
func (opts *AcceptanceOpts) relativeTime(act time.Time) float64 {
	elapsed := act.Sub(opts.baseTime)
	return float64(elapsed) / float64(time.Second)
}
// NewAcceptanceTest creates an acceptance test bound to t and opts. As a side
// effect the base time stored in opts is set to the current time.
func NewAcceptanceTest(t *testing.T, opts *AcceptanceOpts) *AcceptanceTest {
	// TODO: Should this really be set during creation time? Why not do this
	// during Run() time, maybe there is something else long happening between
	// creation and running.
	opts.baseTime = time.Now()

	return &AcceptanceTest{
		T:       t,
		opts:    opts,
		actions: make(map[float64][]func()),
	}
}
// freeAddress returns a listen address that was free at the time of the call.
// It panics when no listener can be opened or closed.
func freeAddress() string {
	// Let the OS pick a port, remember the resulting address and release the
	// listener again, hoping the port is still free once Alertmanager starts.
	listener, err := net.Listen("tcp4", "localhost:0")
	if err != nil {
		panic(err)
	}
	addr := listener.Addr().String()
	if err := listener.Close(); err != nil {
		panic(err)
	}
	return addr
}
// AmtoolOk reports whether the amtool binary can be found at the location the
// tests expect. It returns false together with an error when the path cannot
// be accessed or refers to a directory.
func AmtoolOk() (bool, error) {
	info, err := os.Stat(amtool)
	switch {
	case err != nil:
		return false, fmt.Errorf("Error accessing amtool command, try 'make build' to generate the file. %v", err)
	case info.IsDir():
		return false, fmt.Errorf("file %s is a directory, expecting a binary executable file", amtool)
	}
	return true, nil
}
// Do registers f to be executed at the relative time at (in seconds). Several
// functions may be registered for the same time.
func (t *AcceptanceTest) Do(at float64, f func()) {
	scheduled := t.actions[at]
	t.actions[at] = append(scheduled, f)
}
// AlertmanagerCluster prepares size Alertmanager instances that share the
// configuration conf and remembers them as the cluster of this test. Every
// instance gets its own temporary directory, configuration file, listen
// addresses and API client. The instances are not started.
func (t *AcceptanceTest) AlertmanagerCluster(conf string, size int) *AlertmanagerCluster {
	cluster := &AlertmanagerCluster{}

	for n := 0; n < size; n++ {
		dir, err := ioutil.TempDir("", "am_test")
		if err != nil {
			t.Fatal(err)
		}
		confFile, err := os.Create(filepath.Join(dir, "config.yml"))
		if err != nil {
			t.Fatal(err)
		}

		member := &Alertmanager{
			t:        t,
			opts:     t.opts,
			dir:      dir,
			confFile: confFile,
		}
		member.UpdateConfig(conf)

		member.apiAddr = freeAddress()
		member.clusterAddr = freeAddress()
		member.clientV2 = apiclient.New(
			httptransport.New(member.apiAddr, t.opts.RoutePrefix+"/api/v2/", nil),
			strfmt.Default,
		)

		cluster.ams = append(cluster.ams, member)
	}

	t.amc = cluster
	return cluster
}
// Collector creates a collector with the given name, registers it with the
// test and returns it.
func (t *AcceptanceTest) Collector(name string) *Collector {
	collector := &Collector{
		t:         t.T,
		name:      name,
		opts:      t.opts,
		collected: make(map[float64][]models.GettableAlerts),
		expected:  make(map[Interval][]models.GettableAlerts),
	}
	t.collectors = append(t.collectors, collector)
	return collector
}
// Run starts all Alertmanagers, executes the registered actions and blocks
// until the latest point in time at which any collector expects a
// notification, or until one of the Alertmanager processes exits with an
// error. On return every instance is terminated, its configuration is cleaned
// up and its captured output is logged.
func (t *AcceptanceTest) Run() {
	// One slot per instance: a process that exits with an error after Run has
	// returned can still hand over its error instead of blocking its watcher
	// goroutine forever.
	errc := make(chan error, len(t.amc.ams))

	for _, am := range t.amc.ams {
		am.errc = errc

		defer func(am *Alertmanager) {
			// When starting the cluster fails, some instances have no process
			// (and possibly no command) yet. Skip the steps that need them so
			// the original failure is not masked by a nil pointer panic.
			if am.cmd != nil && am.cmd.Process != nil {
				am.Terminate()
			}
			am.cleanup()
			if am.cmd != nil {
				t.Logf("stdout:\n%v", am.cmd.Stdout)
				t.Logf("stderr:\n%v", am.cmd.Stderr)
			}
		}(am)
	}

	err := t.amc.Start()
	if err != nil {
		t.T.Fatal(err)
	}

	go t.runActions()

	// The test lasts until the end of the latest expected interval.
	var latest float64
	for _, coll := range t.collectors {
		if l := coll.latest(); l > latest {
			latest = l
		}
	}

	deadline := t.opts.expandTime(latest)

	select {
	case <-time.After(time.Until(deadline)):
		// continue
	case err := <-errc:
		t.Error(err)
	}
}
// runActions executes every registered action in its own goroutine once the
// action's relative time has been reached, and returns after all of them
// have finished.
func (t *AcceptanceTest) runActions() {
	var pending sync.WaitGroup

	for rel, funcs := range t.actions {
		due := t.opts.expandTime(rel)
		for _, fn := range funcs {
			pending.Add(1)
			go func(fn func()) {
				time.Sleep(time.Until(due))
				fn()
				pending.Done()
			}(fn)
		}
	}

	pending.Wait()
}
// buffer is a bytes.Buffer whose Write and String methods are serialized by a
// mutex.
type buffer struct {
	mtx sync.Mutex
	b   bytes.Buffer
}

// Write appends p to the buffer while holding the lock.
func (b *buffer) Write(p []byte) (int, error) {
	b.mtx.Lock()
	defer b.mtx.Unlock()

	n, err := b.b.Write(p)
	return n, err
}

// String returns everything written so far while holding the lock.
func (b *buffer) String() string {
	b.mtx.Lock()
	defer b.mtx.Unlock()

	content := b.b.String()
	return content
}
// Alertmanager encapsulates an Alertmanager process and allows
// declaring alerts being pushed to it at fixed points in time.
type Alertmanager struct {
// t is the acceptance test this instance belongs to.
t *AcceptanceTest
// opts are the options of the owning test.
opts *AcceptanceOpts
// apiAddr is passed as --web.listen-address and used to build request URLs.
apiAddr string
// clusterAddr is passed as --cluster.listen-address.
clusterAddr string
// clientV2 is the API v2 client created for apiAddr.
clientV2 *apiclient.Alertmanager
// cmd is the most recently started Alertmanager process; nil before Start.
cmd *exec.Cmd
// confFile is the configuration file handed to the process via --config.file.
confFile *os.File
// dir is the temporary directory passed as --storage.path.
dir string
// errc receives the error of a process that exits unsuccessfully.
errc chan<- error
}
// AlertmanagerCluster represents a group of Alertmanager instances
// acting as a cluster.
type AlertmanagerCluster struct {
// ams holds the cluster members in the order in which they were created.
ams []*Alertmanager
}
// Start launches every member with the cluster addresses of all members as
// peers and then waits until each member reports a cluster of the full size.
// The first failure is returned.
func (amc *AlertmanagerCluster) Start() error {
	var peerFlags []string
	for i := range amc.ams {
		peerFlags = append(peerFlags, "--cluster.peer="+amc.ams[i].clusterAddr)
	}

	for _, member := range amc.ams {
		if err := member.Start(peerFlags); err != nil {
			return fmt.Errorf("starting alertmanager cluster: %v", err.Error())
		}
	}

	expected := len(amc.ams)
	for _, member := range amc.ams {
		if err := member.WaitForCluster(expected); err != nil {
			return fmt.Errorf("waiting alertmanager cluster: %v", err.Error())
		}
	}

	return nil
}
// Members returns the cluster's own slice of members, not a copy of it.
func (amc *AlertmanagerCluster) Members() []*Alertmanager {
	members := amc.ams
	return members
}
// Start launches the Alertmanager binary with the instance's settings plus
// additionalArg and waits until its HTTP endpoint answers with status 200.
// An error is returned when the process cannot be started, when the endpoint
// answers with a different status, when the response body cannot be read, or
// when the endpoint is still unreachable after the last polling attempt.
func (am *Alertmanager) Start(additionalArg []string) error {
am.t.Helper()
args := []string{
"--config.file", am.confFile.Name(),
"--log.level", "debug",
"--web.listen-address", am.apiAddr,
"--storage.path", am.dir,
"--cluster.listen-address", am.clusterAddr,
"--cluster.settle-timeout", "0s",
}
if am.opts.RoutePrefix != "" {
args = append(args, "--web.route-prefix", am.opts.RoutePrefix)
}
args = append(args, additionalArg...)
cmd := exec.Command("../../../alertmanager", args...)
// On the first start fresh buffers capture the process output; a later
// start of the same instance keeps writing to the existing ones.
if am.cmd == nil {
var outb, errb buffer
cmd.Stdout = &outb
cmd.Stderr = &errb
} else {
cmd.Stdout = am.cmd.Stdout
cmd.Stderr = am.cmd.Stderr
}
am.cmd = cmd
if err := am.cmd.Start(); err != nil {
return fmt.Errorf("starting alertmanager failed: %s", err)
}
// Watch the process in the background and forward an unsuccessful exit to
// errc.
go func() {
if err := am.cmd.Wait(); err != nil {
am.errc <- err
}
}()
// Give the process a moment to come up, then poll the web endpoint up to
// ten times, pausing 500ms after every failed request.
time.Sleep(50 * time.Millisecond)
for i := 0; i < 10; i++ {
resp, err := http.Get(am.getURL("/"))
if err != nil {
time.Sleep(500 * time.Millisecond)
continue
}
// Every path below returns, so this defer fires for at most one response.
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("starting alertmanager failed: expected HTTP status '200', got '%d'", resp.StatusCode)
}
_, err = ioutil.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("starting alertmanager failed: %s", err)
}
return nil
}
return fmt.Errorf("starting alertmanager failed: timeout")
}
// WaitForCluster polls the status endpoint of the instance until it reports
// exactly size cluster peers. It returns the error of a failed status request
// immediately, and a descriptive error including the last observed peer count
// when the expected size is not reached within the polling period.
func (am *Alertmanager) WaitForCluster(size int) error {
	params := general.NewGetStatusParams()
	params.WithContext(context.Background())

	// peers keeps the peer count of the most recent poll so that it is still
	// available for the error message after the loop has finished.
	peers := 0

	// Poll for 2s
	for i := 0; i < 20; i++ {
		status, err := am.clientV2.General.GetStatus(params)
		if err != nil {
			return err
		}

		peers = len(status.Payload.Cluster.Peers)
		if peers == size {
			return nil
		}
		time.Sleep(100 * time.Millisecond)
	}

	return fmt.Errorf(
		"failed to wait for Alertmanager instance %q to join cluster: expected %v peers, but got %v",
		am.clusterAddr,
		size,
		peers,
	)
}
// Terminate calls Terminate on every member of the cluster, in member order.
func (amc *AlertmanagerCluster) Terminate() {
	for i := range amc.ams {
		amc.ams[i].Terminate()
	}
}
// Terminate sends SIGTERM to the underlying Alertmanager process and fails
// the test when the signal cannot be delivered. It is a no-op for an instance
// whose process was never started.
func (am *Alertmanager) Terminate() {
	am.t.Helper()

	// Teardown code calls Terminate for every instance, including those that
	// were never started because an earlier instance failed to start. There
	// is nothing to signal in that case.
	if am.cmd == nil || am.cmd.Process == nil {
		return
	}

	if err := syscall.Kill(am.cmd.Process.Pid, syscall.SIGTERM); err != nil {
		am.t.Fatalf("Error sending SIGTERM to Alertmanager process: %v", err)
	}
}
// Reload calls Reload on every member of the cluster, in member order.
func (amc *AlertmanagerCluster) Reload() {
	for i := range amc.ams {
		amc.ams[i].Reload()
	}
}
// Reload sends SIGHUP to the Alertmanager process and fails the test when the
// signal cannot be delivered.
func (am *Alertmanager) Reload() {
	am.t.Helper()

	pid := am.cmd.Process.Pid
	err := syscall.Kill(pid, syscall.SIGHUP)
	if err == nil {
		return
	}
	am.t.Fatalf("Error sending SIGHUP to Alertmanager process: %v", err)
}
// cleanup removes the on-disk state created for the instance: the
// configuration file and the temporary directory that is passed to the
// process as storage path. Removal failures are reported as test errors.
func (am *Alertmanager) cleanup() {
	am.t.Helper()

	name := am.confFile.Name()

	// Best effort: the handle is not needed any more, and a failure to close
	// it (for example because cleanup already ran) must not fail the test.
	_ = am.confFile.Close()

	if err := os.RemoveAll(name); err != nil {
		am.t.Errorf("Error removing test config file %q: %v", name, err)
	}

	// An empty dir means no temporary directory was recorded for the instance.
	if am.dir == "" {
		return
	}
	if err := os.RemoveAll(am.dir); err != nil {
		am.t.Errorf("Error removing test directory %q: %v", am.dir, err)
	}
}
// Version executes amtool with the --version option and extracts the
// three-part version number from the start of its combined output. An error
// is returned when the command fails or the output does not match.
func Version() (string, error) {
	out, err := exec.Command(amtool, "--version").CombinedOutput()
	if err != nil {
		return "", err
	}

	pattern := regexp.MustCompile(`^amtool, version (\d+\.\d+\.\d+) *`)
	groups := pattern.FindStringSubmatch(string(out))
	if len(groups) == 2 {
		return groups[1], nil
	}
	return "", errors.New("Unable to match version info regex: " + string(out))
}
// AddAlertsAt registers an action with the owning test that adds the given
// alerts to this Alertmanager at the relative time at.
func (am *Alertmanager) AddAlertsAt(at float64, alerts ...*TestAlert) {
	add := func() {
		am.AddAlerts(alerts...)
	}
	am.t.Do(at, add)
}
// AddAlerts adds the given alerts to the Alertmanager one by one using amtool.
// A failing command is reported as a test error and the remaining alerts are
// still processed.
func (am *Alertmanager) AddAlerts(alerts ...*TestAlert) {
	for i := range alerts {
		out, err := am.addAlertCommand(alerts[i])
		if err == nil {
			continue
		}
		am.t.Errorf("Error adding alert: %v\nOutput: %s", err, string(out))
	}
}
// addAlertCommand runs 'amtool alert add' for the given alert and returns the
// combined output of the command. Labels are passed as name=value arguments,
// the start time always, and the end time only when it lies after the start.
func (am *Alertmanager) addAlertCommand(alert *TestAlert) ([]byte, error) {
	args := []string{"--alertmanager.url=" + am.getURL("/"), "alert", "add"}
	for name, value := range alert.labels {
		args = append(args, name+"="+value)
	}

	start := strfmt.DateTime(am.opts.expandTime(alert.startsAt))
	args = append(args, "--start="+start.String())

	if alert.endsAt > alert.startsAt {
		end := strfmt.DateTime(am.opts.expandTime(alert.endsAt))
		args = append(args, "--end="+end.String())
	}

	return exec.Command(amtool, args...).CombinedOutput()
}
// QueryAlerts runs 'amtool alert query' against this Alertmanager and returns
// the alerts parsed from its output. A failing command yields its error and
// no alerts.
func (am *Alertmanager) QueryAlerts() ([]TestAlert, error) {
	query := exec.Command(amtool, "--alertmanager.url="+am.getURL("/"), "alert", "query")
	out, err := query.CombinedOutput()
	if err != nil {
		return nil, err
	}
	return parseAlertQueryResponse(out)
}
// parseAlertQueryResponse parses the tabular output of 'amtool alert query'.
// The first line must be a header containing the columns "Starts At" and
// "Summary" in that order; their offsets are used to cut the columns out of
// every following line. The text after the last newline is ignored. An error
// is returned for an invalid header, for a line that is too short to hold
// all columns, and for a start time that cannot be parsed; the alerts parsed
// up to that point are returned alongside the error.
func parseAlertQueryResponse(data []byte) ([]TestAlert, error) {
	alerts := []TestAlert{}

	// strings.Split always yields at least one element, so the header line
	// can be taken unconditionally.
	lines := strings.Split(string(data), "\n")
	header := lines[0]

	startTimePos := strings.Index(header, "Starts At")
	if startTimePos == -1 {
		return alerts, errors.New("Invalid header: " + header)
	}
	summPos := strings.Index(header, "Summary")
	if summPos == -1 || summPos < startTimePos {
		return alerts, errors.New("Invalid header: " + header)
	}

	// Rows are everything between the header and the final element. Output
	// without any row (or without a newline at all) has no such range.
	var rows []string
	if len(lines) > 2 {
		rows = lines[1 : len(lines)-1]
	}

	for _, line := range rows {
		// Guard the column slicing below against truncated or blank lines.
		if len(line) < summPos {
			return alerts, errors.New("Invalid alert line: " + line)
		}

		alertName := strings.TrimSpace(line[0:startTimePos])
		startTime := strings.TrimSpace(line[startTimePos:summPos])
		startsAt, err := time.Parse(format.DefaultDateFormat, startTime)
		if err != nil {
			return alerts, err
		}
		summary := strings.TrimSpace(line[summPos:])

		alerts = append(alerts, TestAlert{
			labels:   models.LabelSet{"name": alertName},
			startsAt: float64(startsAt.Unix()),
			summary:  summary,
		})
	}
	return alerts, nil
}
// SetSilence calls SetSilence with the same arguments on every member of the
// cluster.
func (amc *AlertmanagerCluster) SetSilence(at float64, sil *TestSilence) {
	for i := range amc.ams {
		amc.ams[i].SetSilence(at, sil)
	}
}
// SetSilence creates the given silence through amtool and reports a failing
// command as a test error. The command runs immediately; the at argument is
// not used by this implementation.
func (am *Alertmanager) SetSilence(at float64, sil *TestSilence) {
	out, err := am.addSilenceCommand(sil)
	if err == nil {
		return
	}
	am.t.T.Errorf("Unable to set silence %v %v", err, string(out))
}
// addSilenceCommand runs 'amtool silence add' with the silence's comment (when
// set) and its plain matchers, and returns the combined output of the command.
func (am *Alertmanager) addSilenceCommand(sil *TestSilence) ([]byte, error) {
	args := []string{"--alertmanager.url=" + am.getURL("/"), "silence", "add"}
	if comment := sil.comment; comment != "" {
		args = append(args, "--comment="+comment)
	}
	args = append(args, sil.match...)

	return exec.Command(amtool, args...).CombinedOutput()
}
// QuerySilence queries the current silences using the 'amtool silence query'
// command and returns the silences parsed from its output. When the command
// fails, the failure is reported on the test and returned; the output of the
// failed command is not parsed.
func (am *Alertmanager) QuerySilence() ([]TestSilence, error) {
	amURLFlag := "--alertmanager.url=" + am.getURL("/")
	args := []string{amURLFlag, "silence", "query"}
	cmd := exec.Command(amtool, args...)
	out, err := cmd.CombinedOutput()
	if err != nil {
		am.t.T.Error("Silence query command failed: ", err)
		// The output of a failed command is not a silence table; parsing it
		// would only replace the real error with a misleading parse error.
		return nil, err
	}
	return parseSilenceQueryResponse(out)
}
// silenceHeaderFields lists the column titles looked up in the header line of
// the 'amtool silence query' output, in column order.
var silenceHeaderFields = []string{
	"ID",
	"Matchers",
	"Ends At",
	"Created By",
	"Comment",
}
// parseSilenceQueryResponse parses the tabular output of 'amtool silence
// query'. The first line must be a header containing the columns named in
// silenceHeaderFields in that order; their offsets are used to cut the
// columns out of every following line. The text after the last newline is
// ignored. An error is returned for an invalid header, for a line that is too
// short to hold all columns, and for an end time that cannot be parsed; the
// silences parsed up to that point are returned alongside the error.
func parseSilenceQueryResponse(data []byte) ([]TestSilence, error) {
	sils := []TestSilence{}

	// strings.Split always yields at least one element, so the header line
	// can be taken unconditionally.
	lines := strings.Split(string(data), "\n")
	header := lines[0]

	// pos[i] is the byte offset of column i; the ID column starts at 0. The
	// offsets must not decrease, otherwise the slicing below would be invalid.
	pos := make([]int, len(silenceHeaderFields))
	for i := 1; i < len(silenceHeaderFields); i++ {
		pos[i] = strings.Index(header, silenceHeaderFields[i])
		if pos[i] == -1 || pos[i] < pos[i-1] {
			return sils, errors.New("Invalid header: " + header)
		}
	}
	matchersPos, endsAtPos, createdByPos, commentPos := pos[1], pos[2], pos[3], pos[4]

	// Rows are everything between the header and the final element. Output
	// without any row (or without a newline at all) has no such range.
	var rows []string
	if len(lines) > 2 {
		rows = lines[1 : len(lines)-1]
	}

	for _, line := range rows {
		// Guard the column slicing below against truncated or blank lines.
		if len(line) < commentPos {
			return sils, errors.New("Invalid silence line: " + line)
		}

		id := strings.TrimSpace(line[0:matchersPos])
		matchers := strings.TrimSpace(line[matchersPos:endsAtPos])
		endsAtString := strings.TrimSpace(line[endsAtPos:createdByPos])
		endsAt, err := time.Parse(format.DefaultDateFormat, endsAtString)
		if err != nil {
			return sils, err
		}
		createdBy := strings.TrimSpace(line[createdByPos:commentPos])
		comment := strings.TrimSpace(line[commentPos:])

		sils = append(sils, TestSilence{
			id:        id,
			endsAt:    float64(endsAt.Unix()),
			match:     strings.Split(matchers, " "),
			createdBy: createdBy,
			comment:   comment,
		})
	}
	return sils, nil
}
// DelSilence calls DelSilence with the same arguments on every member of the
// cluster.
func (amc *AlertmanagerCluster) DelSilence(at float64, sil *TestSilence) {
	for i := range amc.ams {
		amc.ams[i].DelSilence(at, sil)
	}
}
// DelSilence expires the given silence through amtool and reports a failing
// command as a test error. The command runs immediately; the at argument is
// not used by this implementation.
func (am *Alertmanager) DelSilence(at float64, sil *TestSilence) {
	if out, err := am.expireSilenceCommand(sil); err != nil {
		am.t.Errorf("Error expiring silence %v: %s", string(out), err)
	}
}
// expireSilenceCommand runs 'amtool silence expire' for the ID of the given
// silence and returns the combined output of the command.
func (am *Alertmanager) expireSilenceCommand(sil *TestSilence) ([]byte, error) {
	expire := exec.Command(
		amtool,
		"--alertmanager.url="+am.getURL("/"),
		"silence",
		"expire",
		sil.ID(),
	)
	return expire.CombinedOutput()
}
// UpdateConfig calls UpdateConfig with conf on every member of the cluster.
// It does not initiate config reloading.
func (amc *AlertmanagerCluster) UpdateConfig(conf string) {
	for i := range amc.ams {
		amc.ams[i].UpdateConfig(conf)
	}
}
// UpdateConfig rewrites the configuration file for the Alertmanager so that
// it contains exactly conf, and flushes it to disk. It does not initiate
// config reloading. Any failure aborts the test.
func (am *Alertmanager) UpdateConfig(conf string) {
	// Drop the previous content and rewind the handle first; writing at the
	// current offset would append conf to the configuration written by an
	// earlier call instead of replacing it.
	if err := am.confFile.Truncate(0); err != nil {
		am.t.Fatal(err)
		return
	}
	// Whence 0 positions the offset relative to the start of the file.
	if _, err := am.confFile.Seek(0, 0); err != nil {
		am.t.Fatal(err)
		return
	}
	if _, err := am.confFile.WriteString(conf); err != nil {
		am.t.Fatal(err)
		return
	}
	if err := am.confFile.Sync(); err != nil {
		am.t.Fatal(err)
		return
	}
}
// getURL builds the HTTP URL of path on this instance: the API address
// followed by the route prefix and path.
func (am *Alertmanager) getURL(path string) string {
	return "http://" + am.apiAddr + am.opts.RoutePrefix + path
}

View File

@ -0,0 +1,169 @@
// Copyright 2019 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package test
import (
"fmt"
"os"
"testing"
"time"
. "github.com/prometheus/alertmanager/test/cli"
"github.com/stretchr/testify/require"
)
// TestMain aborts the test binary with a panic when the amtool binary is not
// available and otherwise runs the tests, exiting with their result code.
func TestMain(m *testing.M) {
	ok, err := AmtoolOk()
	if !ok {
		panic("unable to access amtool binary: " + err.Error())
	}

	code := m.Run()
	os.Exit(code)
}
// TestAmtoolVersion fails when the version cannot be obtained from amtool and
// logs the reported version otherwise.
func TestAmtoolVersion(t *testing.T) {
	t.Parallel()

	reported, err := Version()
	if err != nil {
		t.Fatal("Unable to get amtool version", err)
	}

	t.Logf("testing amtool version: %v", reported)
}
// TestAddAlert adds an alert through amtool to a single Alertmanager and
// expects the webhook collector to receive it between seconds 1 and 2.
func TestAddAlert(t *testing.T) {
t.Parallel()
// The %s verb is replaced with the address of the mock webhook below.
conf := `
route:
receiver: "default"
group_by: [alertname]
group_wait: 1s
group_interval: 1s
repeat_interval: 1ms
receivers:
- name: "default"
webhook_configs:
- url: 'http://%s'
send_resolved: true
`
at := NewAcceptanceTest(t, &AcceptanceOpts{
Tolerance: 150 * time.Millisecond,
})
co := at.Collector("webhook")
wh := NewWebhook(co)
amc := at.AlertmanagerCluster(fmt.Sprintf(conf, wh.Address()), 1)
am := amc.Members()[0]
// The alert is declared active from second 1 to second 2 and registered to
// be added at relative time 0.
alert1 := Alert("alertname", "test1").Active(1, 2)
am.AddAlertsAt(0, alert1)
co.Want(Between(1, 2), Alert("alertname", "test1").Active(1))
at.Run()
// Check marks the test as failed on a mismatch; the report is logged.
t.Log(co.Check())
}
// TestQueryAlert adds two alerts through amtool to a single Alertmanager and
// expects an amtool alert query to return exactly two alerts.
func TestQueryAlert(t *testing.T) {
t.Parallel()
// The %s verb is replaced with the address of the mock webhook below.
conf := `
route:
receiver: "default"
group_by: [alertname]
group_wait: 1s
group_interval: 1s
repeat_interval: 1ms
receivers:
- name: "default"
webhook_configs:
- url: 'http://%s'
send_resolved: true
`
at := NewAcceptanceTest(t, &AcceptanceOpts{
Tolerance: 1 * time.Second,
})
co := at.Collector("webhook")
wh := NewWebhook(co)
amc := at.AlertmanagerCluster(fmt.Sprintf(conf, wh.Address()), 1)
// The cluster is started directly instead of through at.Run.
require.NoError(t, amc.Start())
defer amc.Terminate()
am := amc.Members()[0]
alert1 := Alert("alertname", "test1", "severity", "warning").Active(1)
alert2 := Alert("alertname", "test2", "severity", "info").Active(1)
am.AddAlerts(alert1, alert2)
alerts, err := am.QueryAlerts()
if err != nil {
t.Fatal("Failed to query alerts", err)
}
expectedAlerts := 2
if len(alerts) != expectedAlerts {
t.Fatalf("Incorrect number of alerts, expected %v, got %v", expectedAlerts, len(alerts))
}
}
// TestQuerySilence creates two silences through amtool on a single
// Alertmanager and expects an amtool silence query to return exactly two
// silences.
func TestQuerySilence(t *testing.T) {
t.Parallel()
// The %s verb is replaced with the address of the mock webhook below.
conf := `
route:
receiver: "default"
group_by: [alertname]
group_wait: 1s
group_interval: 1s
repeat_interval: 1ms
receivers:
- name: "default"
webhook_configs:
- url: 'http://%s'
send_resolved: true
`
at := NewAcceptanceTest(t, &AcceptanceOpts{
Tolerance: 1 * time.Second,
})
co := at.Collector("webhook")
wh := NewWebhook(co)
amc := at.AlertmanagerCluster(fmt.Sprintf(conf, wh.Address()), 1)
// The cluster is started directly instead of through at.Run.
require.NoError(t, amc.Start())
defer amc.Terminate()
am := amc.Members()[0]
silence1 := Silence(0, 4).Match("alertname=test1", "severity=warn").Comment("test1")
silence2 := Silence(0, 4).Match("foo").Comment("test foo")
am.SetSilence(0, silence1)
am.SetSilence(0, silence2)
sils, err := am.QuerySilence()
if err != nil {
t.Error("Failed to query silences: ", err)
}
expectedSils := 2
if len(sils) != expectedSils {
t.Errorf("Incorrect number of silences queried, expected: %v, actual: %v", expectedSils, len(sils))
}
}

258
test/cli/collector.go Normal file
View File

@ -0,0 +1,258 @@
// Copyright 2019 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package test
import (
"encoding/json"
"fmt"
"sync"
"testing"
"time"
"github.com/prometheus/alertmanager/api/v2/models"
)
// Collector gathers alerts received by a notification receiver
// and verifies whether all arrived and within the correct time boundaries.
type Collector struct {
// t is used to report failures found by Check.
t *testing.T
// name identifies the collector; it is returned by String.
name string
// opts supplies the base time and tolerance used for recording and comparing.
opts *AcceptanceOpts
// collected maps the relative arrival time in seconds to the batches that
// arrived at that time.
collected map[float64][]models.GettableAlerts
// expected maps an interval to the batches expected within it.
expected map[Interval][]models.GettableAlerts
// mtx guards collected and expected.
mtx sync.RWMutex
}
// String returns the name of the collector.
func (c *Collector) String() string {
	name := c.name
	return name
}
// Collected returns the collector's own map of collected batches, keyed by
// the relative receive time in seconds. The map is read under the read lock
// but is returned as is, not as a copy.
func (c *Collector) Collected() map[float64][]models.GettableAlerts {
	c.mtx.RLock()
	defer c.mtx.RUnlock()

	collected := c.collected
	return collected
}
// batchesEqual reports whether as and bs have the same length and every alert
// in as has an equal alert (as decided by equalAlerts) somewhere in bs.
func batchesEqual(as, bs models.GettableAlerts, opts *AcceptanceOpts) bool {
	if len(as) != len(bs) {
		return false
	}

	// hasMatch reports whether bs contains an alert equal to a.
	hasMatch := func(a *models.GettableAlert) bool {
		for _, b := range bs {
			if equalAlerts(a, b, opts) {
				return true
			}
		}
		return false
	}

	for _, a := range as {
		if !hasMatch(a) {
			return false
		}
	}
	return true
}
// latest returns the largest end of all expected intervals, or 0 when nothing
// is expected.
func (c *Collector) latest() float64 {
	c.mtx.RLock()
	defer c.mtx.RUnlock()

	var last float64
	for iv := range c.expected {
		if last < iv.end {
			last = iv.end
		}
	}
	return last
}
// Want records that the given alerts are expected to arrive as one batch
// within the interval iv.
func (c *Collector) Want(iv Interval, alerts ...*TestAlert) {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	var batch models.GettableAlerts
	for i := range alerts {
		batch = append(batch, alerts[i].nativeAlert(c.opts))
	}
	c.expected[iv] = append(c.expected[iv], batch)
}
// add stores the given alerts as one batch under the current time, expressed
// in seconds relative to the test's base time.
func (c *Collector) add(alerts ...*models.GettableAlert) {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	at := c.opts.relativeTime(time.Now())
	batch := models.GettableAlerts(alerts)
	c.collected[at] = append(c.collected[at], batch)
}
// Check compares the collected batches against the expectations declared with
// Want and returns a textual report. The test is marked as failed when an
// expected batch has no equal batch among those received within its interval,
// when an empty batch was received, or when the total number of received
// alerts differs from the total number of expected alerts. When the test has
// failed, all received alerts are appended to the report.
func (c *Collector) Check() string {
report := fmt.Sprintf("\ncollector %q:\n\n", c)
c.mtx.RLock()
defer c.mtx.RUnlock()
for iv, expected := range c.expected {
report += fmt.Sprintf("interval %v\n", iv)
// Gather every batch that arrived within the interval.
var alerts []models.GettableAlerts
for at, got := range c.collected {
if iv.contains(at) {
alerts = append(alerts, got...)
}
}
for _, exp := range expected {
// An empty expectation is met when nothing arrived in the interval.
found := len(exp) == 0 && len(alerts) == 0
report += fmt.Sprintf("---\n")
for _, e := range exp {
report += fmt.Sprintf("- %v\n", c.opts.alertString(e))
}
for _, a := range alerts {
if batchesEqual(exp, a, c.opts) {
found = true
break
}
}
if found {
report += fmt.Sprintf(" [ ✓ ]\n")
} else {
c.t.Fail()
report += fmt.Sprintf(" [ ✗ ]\n")
}
}
}
// Detect unexpected notifications.
var totalExp, totalAct int
for _, exp := range c.expected {
for _, e := range exp {
totalExp += len(e)
}
}
for _, act := range c.collected {
for _, a := range act {
if len(a) == 0 {
c.t.Error("received empty notifications")
}
totalAct += len(a)
}
}
if totalExp != totalAct {
c.t.Fail()
report += fmt.Sprintf("\nExpected total of %d alerts, got %d", totalExp, totalAct)
}
// On failure, list everything that was received to ease debugging.
if c.t.Failed() {
report += "\nreceived:\n"
for at, col := range c.collected {
for _, alerts := range col {
report += fmt.Sprintf("@ %v\n", at)
for _, a := range alerts {
report += fmt.Sprintf("- %v\n", c.opts.alertString(a))
}
}
}
}
return report
}
// alertsToString returns the JSON encoding of the given alerts. Use for
// debugging.
func alertsToString(as []*models.GettableAlert) (string, error) {
	encoded, err := json.Marshal(as)
	if err == nil {
		return string(encoded), nil
	}
	return "", err
}
// CompareCollectors compares the alerts collected by a and b. It returns true
// when both collected the same number of alerts and every alert of a has an
// equal alert (as decided by equalAlerts) among those of b. Otherwise it
// returns false together with an error describing the difference.
func CompareCollectors(a, b *Collector, opts *AcceptanceOpts) (bool, error) {
	// flatten merges all batches of a collector into a single list of alerts.
	flatten := func(collected map[float64][]models.GettableAlerts) []*models.GettableAlert {
		all := []*models.GettableAlert{}
		for _, batches := range collected {
			for _, batch := range batches {
				all = append(all, batch...)
			}
		}
		return all
	}

	fromA, fromB := flatten(a.Collected()), flatten(b.Collected())

	if len(fromA) != len(fromB) {
		dumpA, err := alertsToString(fromA)
		if err != nil {
			return false, err
		}
		dumpB, err := alertsToString(fromB)
		if err != nil {
			return false, err
		}
		return false, fmt.Errorf(
			"first collector has %v alerts, second collector has %v alerts\n%v\n%v",
			len(fromA), len(fromB),
			dumpA, dumpB,
		)
	}

	for _, candidate := range fromA {
		matched := false
		for _, other := range fromB {
			if equalAlerts(candidate, other, opts) {
				matched = true
				break
			}
		}
		if matched {
			continue
		}

		dumpA, err := alertsToString([]*models.GettableAlert{candidate})
		if err != nil {
			return false, err
		}
		dumpB, err := alertsToString(fromB)
		if err != nil {
			return false, err
		}
		return false, fmt.Errorf(
			"could not find matching alert for alert from first collector\n%v\nin alerts of second collector\n%v",
			dumpA, dumpB,
		)
	}

	return true, nil
}

303
test/cli/mock.go Normal file
View File

@ -0,0 +1,303 @@
// Copyright 2019 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package test
import (
"encoding/json"
"fmt"
"net"
"net/http"
"reflect"
"time"
"github.com/go-openapi/strfmt"
"github.com/prometheus/alertmanager/api/v2/models"
"github.com/prometheus/alertmanager/notify/webhook"
)
// At returns ts unchanged. It exists so that acceptance test definitions can
// name a point in time declaratively.
func At(ts float64) float64 {
	at := ts
	return at
}
// Interval is a closed range of relative times.
type Interval struct {
	start, end float64
}

// String formats the interval as "[start,end]".
func (iv Interval) String() string {
	return "[" + fmt.Sprint(iv.start) + "," + fmt.Sprint(iv.end) + "]"
}

// contains reports whether f lies within the interval, bounds included.
func (iv Interval) contains(f float64) bool {
	return iv.start <= f && f <= iv.end
}

// Between returns the interval from start to end. It exists so that
// acceptance test definitions can name an interval declaratively.
func Between(start, end float64) Interval {
	var iv Interval
	iv.start = start
	iv.end = end
	return iv
}
// TestSilence describes a silence for use in tests, with its start and end
// given as relative times.
type TestSilence struct {
	id               string
	createdBy        string
	match            []string
	matchRE          []string
	startsAt, endsAt float64
	comment          string
}

// Silence returns a new TestSilence covering the relative interval from start
// to end.
func Silence(start, end float64) *TestSilence {
	s := new(TestSilence)
	s.startsAt, s.endsAt = start, end
	return s
}

// Match appends the given plain matchers to the silence and returns the
// silence to allow chaining.
func (s *TestSilence) Match(v ...string) *TestSilence {
	for _, matcher := range v {
		s.match = append(s.match, matcher)
	}
	return s
}

// MatchRE appends the given regex matcher key/value pairs to the silence and
// returns the silence to allow chaining. It panics when an odd number of
// values is given.
func (s *TestSilence) MatchRE(v ...string) *TestSilence {
	if len(v)%2 != 0 {
		panic("bad key/values")
	}
	for _, part := range v {
		s.matchRE = append(s.matchRE, part)
	}
	return s
}

// Comment sets the comment of the silence and returns the silence to allow
// chaining.
func (s *TestSilence) Comment(c string) *TestSilence {
	s.comment = c
	return s
}

// SetID sets the silence ID.
func (s *TestSilence) SetID(id string) {
	s.id = id
}

// ID returns the silence ID.
func (s *TestSilence) ID() string {
	id := s.id
	return id
}
// TestAlert models a model.Alert with relative times.
type TestAlert struct {
// labels are the identifying labels of the alert.
labels models.LabelSet
// annotations are set through Annotate.
annotations models.LabelSet
// startsAt and endsAt are set through Active; parseAlertQueryResponse
// stores a Unix timestamp in startsAt instead.
startsAt, endsAt float64
// summary holds the text of the "Summary" column of a parsed query result.
summary string
}
// Alert returns a new alert declaration whose labels are taken from the given
// key/value pairs. It panics when an odd number of values is given or when a
// value is not a string.
func Alert(keyval ...interface{}) *TestAlert {
	if len(keyval)%2 != 0 {
		panic("bad key/values")
	}

	labels := models.LabelSet{}
	for i := 1; i < len(keyval); i += 2 {
		name := keyval[i-1].(string)
		value := keyval[i].(string)
		labels[name] = value
	}

	return &TestAlert{
		labels:      labels,
		annotations: models.LabelSet{},
	}
}
// nativeAlert converts the declared test alert into an API alert. Relative
// start and end times greater than zero are expanded to absolute times using
// opts; otherwise the corresponding field points to a zero DateTime.
func (a *TestAlert) nativeAlert(opts *AcceptanceOpts) *models.GettableAlert {
	startsAt, endsAt := strfmt.DateTime{}, strfmt.DateTime{}
	if a.startsAt > 0 {
		startsAt = strfmt.DateTime(opts.expandTime(a.startsAt))
	}
	if a.endsAt > 0 {
		endsAt = strfmt.DateTime(opts.expandTime(a.endsAt))
	}

	return &models.GettableAlert{
		Alert: models.Alert{
			Labels: a.labels,
		},
		Annotations: a.annotations,
		StartsAt:    &startsAt,
		EndsAt:      &endsAt,
	}
}
// Annotate adds the given key/value pairs to the annotations of the alert and
// returns the alert to allow chaining. It panics when an odd number of values
// is given or when a value is not a string.
func (a *TestAlert) Annotate(keyval ...interface{}) *TestAlert {
	if len(keyval)%2 != 0 {
		panic("bad key/values")
	}

	for i := 1; i < len(keyval); i += 2 {
		name := keyval[i-1].(string)
		value := keyval[i].(string)
		a.annotations[name] = value
	}
	return a
}
// Active sets the relative activity time of the alert and returns the alert
// to allow chaining. One value sets the start time; two values set the start
// time and the resolved time. Any other number of values causes a panic.
func (a *TestAlert) Active(tss ...float64) *TestAlert {
	switch len(tss) {
	case 1:
		a.startsAt = tss[0]
	case 2:
		a.startsAt, a.endsAt = tss[0], tss[1]
	default:
		panic("only one or two timestamps allowed")
	}
	return a
}
// equalAlerts reports whether a and b have deeply equal labels and
// annotations, start times that are equal according to equalTime, and end
// times that are either both absent or equal according to equalTime.
func equalAlerts(a, b *models.GettableAlert, opts *AcceptanceOpts) bool {
	switch {
	case !reflect.DeepEqual(a.Labels, b.Labels):
		return false
	case !reflect.DeepEqual(a.Annotations, b.Annotations):
		return false
	case !equalTime(time.Time(*a.StartsAt), time.Time(*b.StartsAt), opts):
		return false
	case (a.EndsAt == nil) != (b.EndsAt == nil):
		return false
	}

	// Either both end times are absent or both are present at this point.
	if a.EndsAt == nil {
		return true
	}
	return equalTime(time.Time(*a.EndsAt), time.Time(*b.EndsAt), opts)
}
// equalTime reports whether a and b are either both zero or both non-zero and
// no further apart than the configured tolerance.
func equalTime(a, b time.Time, opts *AcceptanceOpts) bool {
	if a.IsZero() != b.IsZero() {
		return false
	}

	delta := a.Sub(b)
	if delta >= 0 {
		return delta <= opts.Tolerance
	}
	return -delta <= opts.Tolerance
}
// MockWebhook is an HTTP handler that decodes webhook messages and hands the
// contained alerts to a Collector.
type MockWebhook struct {
// opts supplies the base time used to compute the timestamp passed to Func.
opts *AcceptanceOpts
// collector receives the alerts of every accepted notification.
collector *Collector
// listener is the listener the webhook is served on.
listener net.Listener
// Func is called early on when retrieving a notification by an
// Alertmanager. If Func returns true, the given notification is dropped.
// See sample usage in `send_test.go/TestRetry()`.
Func func(timestamp float64) bool
}
// NewWebhook opens a listener on a free local port, serves a MockWebhook
// feeding the collector c on it in the background and returns the webhook.
// It panics when the listener cannot be opened; the serving goroutine panics
// when serving fails.
func NewWebhook(c *Collector) *MockWebhook {
	listener, err := net.Listen("tcp4", "localhost:0")
	if err != nil {
		// TODO(fabxc): if shutdown of mock destinations ever becomes a concern
		// we want to shut them down after test completion. Then we might want to
		// log the error properly, too.
		panic(err)
	}

	hook := &MockWebhook{
		opts:      c.opts,
		collector: c,
		listener:  listener,
	}

	go func() {
		serveErr := http.Serve(listener, hook)
		if serveErr != nil {
			panic(serveErr)
		}
	}()

	return hook
}
// ServeHTTP decodes the webhook message in the request body, converts its
// alerts into API alerts and adds them to the collector as one batch. When
// Func is set and returns true for the current relative time, the request is
// dropped without being read. It panics when the body cannot be decoded.
func (ws *MockWebhook) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	// Inject Func if it exists.
	if ws.Func != nil && ws.Func(ws.opts.relativeTime(time.Now())) {
		return
	}

	defer req.Body.Close()

	var msg webhook.Message
	if err := json.NewDecoder(req.Body).Decode(&msg); err != nil {
		panic(err)
	}

	// Transform the webhook message alerts back into model.Alerts.
	var received models.GettableAlerts
	for _, a := range msg.Alerts {
		labels := models.LabelSet{}
		for name, value := range a.Labels {
			labels[name] = value
		}
		annotations := models.LabelSet{}
		for name, value := range a.Annotations {
			annotations[name] = value
		}

		startsAt := strfmt.DateTime(a.StartsAt)
		endsAt := strfmt.DateTime(a.EndsAt)

		received = append(received, &models.GettableAlert{
			Alert: models.Alert{
				Labels:       labels,
				GeneratorURL: strfmt.URI(a.GeneratorURL),
			},
			Annotations: annotations,
			StartsAt:    &startsAt,
			EndsAt:      &endsAt,
		})
	}

	ws.collector.add(received...)
}
// Address returns the address the webhook's listener is bound to.
func (ws *MockWebhook) Address() string {
	addr := ws.listener.Addr()
	return addr.String()
}