Added possibility to pass callback to *mem.NewAlerts, useful for implementing limits on alerts.

Update provider/mem/mem.go

Co-authored-by: Julien Pivotto <roidelapluie@gmail.com>
Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
This commit is contained in:
Peter Štibraný 2021-05-05 15:26:23 +02:00
parent 243accb771
commit cc0b08fd7c
7 changed files with 158 additions and 13 deletions

View File

@ -321,7 +321,7 @@ func run() int {
go peer.Settle(ctx, *gossipInterval*10)
}
alerts, err := mem.NewAlerts(context.Background(), marker, *alertGCInterval, logger)
alerts, err := mem.NewAlerts(context.Background(), marker, *alertGCInterval, nil, logger)
if err != nil {
level.Error(logger).Log("err", err)
return 1

View File

@ -365,7 +365,7 @@ route:
logger := log.NewNopLogger()
route := NewRoute(conf.Route, nil)
marker := types.NewMarker(prometheus.NewRegistry())
alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, logger)
alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger)
if err != nil {
t.Fatal(err)
}
@ -527,7 +527,7 @@ func newAlert(labels model.LabelSet) *types.Alert {
func TestDispatcherRace(t *testing.T) {
logger := log.NewNopLogger()
marker := types.NewMarker(prometheus.NewRegistry())
alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, logger)
alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger)
if err != nil {
t.Fatal(err)
}
@ -544,7 +544,7 @@ func TestDispatcherRaceOnFirstAlertNotDeliveredWhenGroupWaitIsZero(t *testing.T)
logger := log.NewNopLogger()
marker := types.NewMarker(prometheus.NewRegistry())
alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, logger)
alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger)
if err != nil {
t.Fatal(err)
}

1
go.mod
View File

@ -30,6 +30,7 @@ require (
github.com/shurcooL/vfsgen v0.0.0-20200824052919-0d455de96546
github.com/stretchr/testify v1.7.0
github.com/xlab/treeprint v1.1.0
go.uber.org/atomic v1.5.0
golang.org/x/net v0.0.0-20210421230115-4e50805a0758
golang.org/x/tools v0.1.0
gopkg.in/alecthomas/kingpin.v2 v2.2.6

2
go.sum
View File

@ -533,6 +533,7 @@ go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0 h1:OI5t8sDa1Or+q8AeE+yKeB/SDYioSHAgcVljj9JIETY=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
@ -560,6 +561,7 @@ golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTk
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=

View File

@ -38,16 +38,36 @@ type Alerts struct {
listeners map[int]listeningAlerts
next int
callback AlertStoreCallback
logger log.Logger
}
type AlertStoreCallback interface {
// PreStore is called before alert is stored into the store. If this method returns error,
// this error is passed back to caller of Alerts.Put method.
// Existing flag indicates whether alert has existed before (and is only updated) or not.
// If alert has existed before, then alert passed to PreStore is result of merging existing alert with new alert.
PreStore(alert *types.Alert, existing bool) error
// PostStore is called after alert has been put into store.
PostStore(alert *types.Alert, existing bool)
// PostDelete is called after alert has been removed from the store due to alert garbage collection.
PostDelete(alert *types.Alert)
}
type listeningAlerts struct {
alerts chan *types.Alert
done chan struct{}
}
// NewAlerts returns a new alert provider.
func NewAlerts(ctx context.Context, m types.Marker, intervalGC time.Duration, l log.Logger) (*Alerts, error) {
func NewAlerts(ctx context.Context, m types.Marker, intervalGC time.Duration, alertCallback AlertStoreCallback, l log.Logger) (*Alerts, error) {
if alertCallback == nil {
alertCallback = noopCallback{}
}
ctx, cancel := context.WithCancel(ctx)
a := &Alerts{
alerts: store.NewAlerts(),
@ -55,6 +75,7 @@ func NewAlerts(ctx context.Context, m types.Marker, intervalGC time.Duration, l
listeners: map[int]listeningAlerts{},
next: 0,
logger: log.With(l, "component", "provider"),
callback: alertCallback,
}
a.alerts.SetGCCallback(func(alerts []*types.Alert) {
for _, alert := range alerts {
@ -62,6 +83,7 @@ func NewAlerts(ctx context.Context, m types.Marker, intervalGC time.Duration, l
// they are resolved. Alerts waiting for resolved notifications are
// held in memory in aggregation groups redundantly.
m.Delete(alert.Fingerprint())
a.callback.PostDelete(alert)
}
a.mtx.Lock()
@ -148,13 +170,18 @@ func (a *Alerts) Get(fp model.Fingerprint) (*types.Alert, error) {
// Put adds the given alert to the set.
func (a *Alerts) Put(alerts ...*types.Alert) error {
errs := &types.MultiError{}
for _, alert := range alerts {
fp := alert.Fingerprint()
existing := false
// Check that there's an alert existing within the store before
// trying to merge.
if old, err := a.alerts.Get(fp); err == nil {
existing = true
// Merge alerts if there is an overlap in activity range.
if (alert.EndsAt.After(old.StartsAt) && alert.EndsAt.Before(old.EndsAt)) ||
(alert.StartsAt.After(old.StartsAt) && alert.StartsAt.Before(old.EndsAt)) {
@ -162,11 +189,19 @@ func (a *Alerts) Put(alerts ...*types.Alert) error {
}
}
if err := a.callback.PreStore(alert, existing); err != nil {
errs.Add(err)
continue
}
if err := a.alerts.Set(alert); err != nil {
errs.Add(err)
level.Error(a.logger).Log("msg", "error on set alert", "err", err)
continue
}
a.callback.PostStore(alert, existing)
a.mtx.Lock()
for _, l := range a.listeners {
select {
@ -177,5 +212,14 @@ func (a *Alerts) Put(alerts ...*types.Alert) error {
a.mtx.Unlock()
}
if errs.Len() > 0 {
return errs
}
return nil
}
type noopCallback struct{}
func (n noopCallback) PreStore(_ *types.Alert, _ bool) error { return nil }
func (n noopCallback) PostStore(_ *types.Alert, _ bool) {}
func (n noopCallback) PostDelete(_ *types.Alert) {}

View File

@ -25,11 +25,13 @@ import (
"github.com/go-kit/kit/log"
"github.com/kylelemons/godebug/pretty"
"github.com/prometheus/alertmanager/store"
"github.com/prometheus/alertmanager/types"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"github.com/prometheus/alertmanager/store"
"github.com/prometheus/alertmanager/types"
)
var (
@ -85,7 +87,7 @@ func init() {
// a listener can not unsubscribe as the lock is hold by `alerts.Lock`.
func TestAlertsSubscribePutStarvation(t *testing.T) {
marker := types.NewMarker(prometheus.NewRegistry())
alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, log.NewNopLogger())
alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, noopCallback{}, log.NewNopLogger())
if err != nil {
t.Fatal(err)
}
@ -136,7 +138,7 @@ func TestAlertsSubscribePutStarvation(t *testing.T) {
func TestAlertsPut(t *testing.T) {
marker := types.NewMarker(prometheus.NewRegistry())
alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, log.NewNopLogger())
alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, noopCallback{}, log.NewNopLogger())
if err != nil {
t.Fatal(err)
}
@ -164,7 +166,7 @@ func TestAlertsSubscribe(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
alerts, err := NewAlerts(ctx, marker, 30*time.Minute, log.NewNopLogger())
alerts, err := NewAlerts(ctx, marker, 30*time.Minute, noopCallback{}, log.NewNopLogger())
if err != nil {
t.Fatal(err)
}
@ -241,7 +243,7 @@ func TestAlertsSubscribe(t *testing.T) {
func TestAlertsGetPending(t *testing.T) {
marker := types.NewMarker(prometheus.NewRegistry())
alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, log.NewNopLogger())
alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, noopCallback{}, log.NewNopLogger())
if err != nil {
t.Fatal(err)
}
@ -284,7 +286,7 @@ func TestAlertsGetPending(t *testing.T) {
func TestAlertsGC(t *testing.T) {
marker := types.NewMarker(prometheus.NewRegistry())
alerts, err := NewAlerts(context.Background(), marker, 200*time.Millisecond, log.NewNopLogger())
alerts, err := NewAlerts(context.Background(), marker, 200*time.Millisecond, noopCallback{}, log.NewNopLogger())
if err != nil {
t.Fatal(err)
}
@ -316,6 +318,71 @@ func TestAlertsGC(t *testing.T) {
}
}
func TestAlertsStoreCallback(t *testing.T) {
cb := &limitCountCallback{limit: 3}
marker := types.NewMarker(prometheus.NewRegistry())
alerts, err := NewAlerts(context.Background(), marker, 200*time.Millisecond, cb, log.NewNopLogger())
if err != nil {
t.Fatal(err)
}
err = alerts.Put(alert1, alert2, alert3)
if err != nil {
t.Fatal(err)
}
if num := cb.alerts.Load(); num != 3 {
t.Fatalf("unexpected number of alerts in the store, expected %v, got %v", 3, num)
}
alert1Mod := *alert1
alert1Mod.Annotations = model.LabelSet{"foo": "bar", "new": "test"} // Update annotations for alert1
alert4 := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{"bar4": "foo4"},
Annotations: model.LabelSet{"foo4": "bar4"},
StartsAt: t0,
EndsAt: t1,
GeneratorURL: "http://example.com/prometheus",
},
UpdatedAt: t0,
Timeout: false,
}
err = alerts.Put(&alert1Mod, alert4)
// Verify that we failed to put new alert into store
if err == nil || err.Error() != errTooManyAlerts.Error() {
t.Fatalf("expected %v, got %v", errTooManyAlerts, err)
}
if num := cb.alerts.Load(); num != 3 {
t.Fatalf("unexpected number of alerts in the store, expected %v, got %v", 3, num)
}
// But we still managed to update alert1, since callback doesn't report error when updating existing alert.
a, err := alerts.Get(alert1.Fingerprint())
if err != nil {
t.Fatal(err)
}
if !alertsEqual(a, &alert1Mod) {
t.Errorf("Unexpected alert")
t.Fatalf(pretty.Compare(a, &alert1Mod))
}
// Now wait until existing alerts are GC-ed, and make sure that callback was called.
time.Sleep(300 * time.Millisecond)
if num := cb.alerts.Load(); num != 0 {
t.Fatalf("unexpected number of alerts in the store, expected %v, got %v", 0, num)
}
err = alerts.Put(alert4)
if err != nil {
t.Fatal(err)
}
}
func alertsEqual(a1, a2 *types.Alert) bool {
if a1 == nil || a2 == nil {
return false
@ -340,3 +407,32 @@ func alertsEqual(a1, a2 *types.Alert) bool {
}
return a1.Timeout == a2.Timeout
}
type limitCountCallback struct {
alerts atomic.Int32
limit int
}
var errTooManyAlerts = fmt.Errorf("too many alerts")
func (l *limitCountCallback) PreStore(_ *types.Alert, existing bool) error {
if existing {
return nil
}
if int(l.alerts.Load())+1 > l.limit {
return errTooManyAlerts
}
return nil
}
func (l *limitCountCallback) PostStore(_ *types.Alert, existing bool) {
if !existing {
l.alerts.Inc()
}
}
func (l *limitCountCallback) PostDelete(_ *types.Alert) {
l.alerts.Dec()
}

View File

@ -83,6 +83,8 @@ type Alerts interface {
GetPending() AlertIterator
// Get returns the alert for a given fingerprint.
Get(model.Fingerprint) (*types.Alert, error)
// Put adds the given alert to the set.
// Put adds the given set of alerts to the set.
// This operation is not atomic -- when error is returned, some alerts may have
// be stored already.
Put(...*types.Alert) error
}