mirror of
https://github.com/prometheus/alertmanager
synced 2025-02-04 12:42:16 +00:00
Implement PagerDuty notifications.
This commit is contained in:
parent
f431335c69
commit
70e67b920c
9
main.go
9
main.go
@ -38,9 +38,10 @@ func main() {
|
||||
suppressor := manager.NewSuppressor()
|
||||
defer suppressor.Close()
|
||||
|
||||
summarizer := manager.NewSummaryDispatcher()
|
||||
notifier := manager.NewNotifier(conf.NotificationConfig)
|
||||
defer notifier.Close()
|
||||
|
||||
aggregator := manager.NewAggregator(summarizer)
|
||||
aggregator := manager.NewAggregator(notifier)
|
||||
defer aggregator.Close()
|
||||
|
||||
webService := &web.WebService{
|
||||
@ -61,8 +62,8 @@ func main() {
|
||||
}
|
||||
go webService.ServeForever()
|
||||
|
||||
aggregator.SetRules(conf.AggregationRules())
|
||||
aggregator.SetRules(conf.AggregationRules())
|
||||
|
||||
log.Println("Running summary dispatcher...")
|
||||
summarizer.Dispatch(suppressor)
|
||||
notifier.Dispatch(suppressor)
|
||||
}
|
||||
|
@ -65,28 +65,26 @@ func (r *AggregationInstance) Ingest(e *Event) {
|
||||
r.expiryTimer.Reset(minimumRefreshPeriod)
|
||||
}
|
||||
|
||||
func (r *AggregationInstance) SendNotification(s SummaryReceiver) {
|
||||
func (r *AggregationInstance) SendNotification(n Notifier) {
|
||||
if time.Since(r.lastNotificationSent) < r.Rule.RepeatRate {
|
||||
return
|
||||
}
|
||||
|
||||
err := s.Receive(&EventSummary{
|
||||
Rule: r.Rule,
|
||||
Event: r.Event,
|
||||
})
|
||||
err := n.QueueNotification(r.Event, r.Rule.NotificationConfig)
|
||||
if err != nil {
|
||||
// BUG: Limit the number of retries.
|
||||
log.Printf("Error while sending notification: %s, retrying in %v", err, notificationRetryPeriod)
|
||||
r.resendNotificationAfter(notificationRetryPeriod, s)
|
||||
r.resendNotificationAfter(notificationRetryPeriod, n)
|
||||
return
|
||||
}
|
||||
|
||||
r.resendNotificationAfter(r.Rule.RepeatRate, s)
|
||||
r.resendNotificationAfter(r.Rule.RepeatRate, n)
|
||||
r.lastNotificationSent = time.Now()
|
||||
}
|
||||
|
||||
func (r *AggregationInstance) resendNotificationAfter(d time.Duration, s SummaryReceiver) {
|
||||
func (r *AggregationInstance) resendNotificationAfter(d time.Duration, n Notifier) {
|
||||
r.notificationResendTimer = time.AfterFunc(d, func() {
|
||||
r.SendNotification(s)
|
||||
r.SendNotification(n)
|
||||
})
|
||||
}
|
||||
|
||||
@ -102,18 +100,18 @@ func (r *AggregationInstance) Close() {
|
||||
type AggregationRules []*AggregationRule
|
||||
|
||||
type Aggregator struct {
|
||||
Rules AggregationRules
|
||||
Aggregates map[EventFingerprint]*AggregationInstance
|
||||
SummaryReceiver SummaryReceiver
|
||||
Rules AggregationRules
|
||||
Aggregates map[EventFingerprint]*AggregationInstance
|
||||
Notifier Notifier
|
||||
|
||||
// Mutex to protect the above.
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func NewAggregator(s SummaryReceiver) *Aggregator {
|
||||
func NewAggregator(n Notifier) *Aggregator {
|
||||
return &Aggregator{
|
||||
Aggregates: make(map[EventFingerprint]*AggregationInstance),
|
||||
SummaryReceiver: s,
|
||||
Aggregates: make(map[EventFingerprint]*AggregationInstance),
|
||||
Notifier: n,
|
||||
}
|
||||
}
|
||||
|
||||
@ -153,7 +151,7 @@ func (a *Aggregator) Receive(events Events) error {
|
||||
}
|
||||
|
||||
aggregation.Ingest(e)
|
||||
aggregation.SendNotification(a.SummaryReceiver)
|
||||
aggregation.SendNotification(a.Notifier)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
@ -15,14 +15,20 @@ package manager
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
pb "github.com/prometheus/alert_manager/config/generated"
|
||||
)
|
||||
|
||||
type dummyReceiver struct{}
|
||||
type dummyNotifier struct{}
|
||||
|
||||
func (d *dummyReceiver) Receive(*EventSummary) RemoteError {
|
||||
func (d *dummyNotifier) QueueNotification(*Event, string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *dummyNotifier) SetNotificationConfigs([]pb.NotificationConfig) {}
|
||||
|
||||
func (d *dummyNotifier) Dispatch(IsInhibitedInterrogator) {}
|
||||
|
||||
type testAggregatorScenario struct {
|
||||
rules AggregationRules
|
||||
inMatch Events
|
||||
@ -30,7 +36,7 @@ type testAggregatorScenario struct {
|
||||
}
|
||||
|
||||
func (s *testAggregatorScenario) test(i int, t *testing.T) {
|
||||
a := NewAggregator(&dummyReceiver{})
|
||||
a := NewAggregator(&dummyNotifier{})
|
||||
a.SetRules(s.rules)
|
||||
|
||||
if len(s.inMatch) > 0 {
|
||||
|
@ -1,138 +0,0 @@
|
||||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package manager
|
||||
|
||||
import (
|
||||
"strings"
|
||||
)
|
||||
|
||||
type DestinationDispatcher interface {
|
||||
Send(*EventSummary) error
|
||||
}
|
||||
|
||||
func DispatcherFor(destination string) DestinationDispatcher {
|
||||
switch {
|
||||
case strings.HasPrefix(destination, "IRC"):
|
||||
case strings.HasPrefix(destination, "TRELLO"):
|
||||
case strings.HasPrefix(destination, "MAIL"):
|
||||
case strings.HasPrefix(destination, "PAGERDUTY"):
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type EventSummary struct {
|
||||
Rule *AggregationRule
|
||||
|
||||
Event *Event
|
||||
|
||||
Destination string
|
||||
}
|
||||
|
||||
type SummaryDispatcher struct {
|
||||
summaryReqs chan *summaryDispatchRequest
|
||||
|
||||
closed chan bool
|
||||
}
|
||||
|
||||
type summaryDispatchRequest struct {
|
||||
Summary *EventSummary
|
||||
|
||||
Response chan *summaryDispatchResponse
|
||||
}
|
||||
|
||||
type Disposition int
|
||||
|
||||
const (
|
||||
UNHANDLED Disposition = iota
|
||||
DISPATCHED
|
||||
SUPPRESSED
|
||||
)
|
||||
|
||||
type summaryDispatchResponse struct {
|
||||
Disposition Disposition
|
||||
Err RemoteError
|
||||
}
|
||||
|
||||
func (s *SummaryDispatcher) Close() {
|
||||
close(s.summaryReqs)
|
||||
<-s.closed
|
||||
}
|
||||
|
||||
func NewSummaryDispatcher() *SummaryDispatcher {
|
||||
return &SummaryDispatcher{
|
||||
summaryReqs: make(chan *summaryDispatchRequest),
|
||||
closed: make(chan bool),
|
||||
}
|
||||
}
|
||||
|
||||
type RemoteError interface {
|
||||
error
|
||||
|
||||
Retryable() bool
|
||||
}
|
||||
|
||||
type remoteError struct {
|
||||
error
|
||||
|
||||
retryable bool
|
||||
}
|
||||
|
||||
func (e *remoteError) Retryable() bool {
|
||||
return e.retryable
|
||||
}
|
||||
|
||||
func NewRemoteError(err error, retryable bool) RemoteError {
|
||||
return &remoteError{
|
||||
err,
|
||||
retryable,
|
||||
}
|
||||
}
|
||||
|
||||
type SummaryReceiver interface {
|
||||
Receive(*EventSummary) RemoteError
|
||||
}
|
||||
|
||||
func (d *SummaryDispatcher) Receive(s *EventSummary) RemoteError {
|
||||
req := &summaryDispatchRequest{
|
||||
Summary: s,
|
||||
Response: make(chan *summaryDispatchResponse),
|
||||
}
|
||||
|
||||
d.summaryReqs <- req
|
||||
resp := <-req.Response
|
||||
|
||||
return resp.Err
|
||||
}
|
||||
|
||||
func (d *SummaryDispatcher) dispatchSummary(r *summaryDispatchRequest, i IsInhibitedInterrogator) {
|
||||
if inhibited, _ := i.IsInhibited(r.Summary.Event); inhibited {
|
||||
r.Response <- &summaryDispatchResponse{
|
||||
Disposition: SUPPRESSED,
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// BUG: Perform sending of summaries.
|
||||
r.Response <- &summaryDispatchResponse{
|
||||
Disposition: DISPATCHED,
|
||||
}
|
||||
}
|
||||
|
||||
func (d *SummaryDispatcher) Dispatch(i IsInhibitedInterrogator) {
|
||||
for req := range d.summaryReqs {
|
||||
d.dispatchSummary(req, i)
|
||||
}
|
||||
|
||||
d.closed <- true
|
||||
}
|
187
manager/notifier.go
Normal file
187
manager/notifier.go
Normal file
@ -0,0 +1,187 @@
|
||||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package manager
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
pb "github.com/prometheus/alert_manager/config/generated"
|
||||
)
|
||||
|
||||
const contentTypeJson = "application/json"
|
||||
|
||||
var (
|
||||
notificationBufferSize = flag.Int("notificationBufferSize", 1000, "Size of buffer for pending notifications.")
|
||||
pagerdutyApiUrl = flag.String("pagerdutyApiUrl", "https://events.pagerduty.com/generic/2010-04-15/create_event.json", "PagerDuty API URL.")
|
||||
)
|
||||
|
||||
// A Notifier is responsible for sending notifications for events according to
|
||||
// a provided notification configuration.
|
||||
type Notifier interface {
|
||||
// Queue a notification for asynchronous dispatching.
|
||||
QueueNotification(e *Event, configName string) error
|
||||
// Replace current notification configs. Already enqueued messages will remain
|
||||
// unaffected.
|
||||
SetNotificationConfigs([]*pb.NotificationConfig)
|
||||
// Start event notification dispatch loop.
|
||||
Dispatch(IsInhibitedInterrogator)
|
||||
// Stop the event notification dispatch loop.
|
||||
Close()
|
||||
}
|
||||
|
||||
// Request for sending a notification.
|
||||
type notificationReq struct {
|
||||
event *Event
|
||||
notificationConfig *pb.NotificationConfig
|
||||
}
|
||||
|
||||
// Alert notification multiplexer and dispatcher.
|
||||
type notifier struct {
|
||||
// Notifications that are queued to be sent.
|
||||
pendingNotifications chan *notificationReq
|
||||
// Channel for stopping the dispatch loop.
|
||||
stop chan bool
|
||||
|
||||
// Mutex to protect the fields below.
|
||||
mu sync.Mutex
|
||||
// Map of notification configs by name.
|
||||
notificationConfigs map[string]*pb.NotificationConfig
|
||||
}
|
||||
|
||||
// Construct a new notifier.
|
||||
func NewNotifier(configs []*pb.NotificationConfig) *notifier {
|
||||
notifier := ¬ifier{
|
||||
pendingNotifications: make(chan *notificationReq, *notificationBufferSize),
|
||||
stop: make(chan bool),
|
||||
}
|
||||
notifier.SetNotificationConfigs(configs)
|
||||
return notifier
|
||||
}
|
||||
|
||||
func (n *notifier) SetNotificationConfigs(configs []*pb.NotificationConfig) {
|
||||
n.mu.Lock()
|
||||
defer n.mu.Unlock()
|
||||
|
||||
n.notificationConfigs = map[string]*pb.NotificationConfig{}
|
||||
for _, c := range configs {
|
||||
n.notificationConfigs[c.GetName()] = c
|
||||
}
|
||||
}
|
||||
|
||||
func (n *notifier) QueueNotification(event *Event, configName string) error {
|
||||
n.mu.Lock()
|
||||
nc, ok := n.notificationConfigs[configName]
|
||||
n.mu.Unlock()
|
||||
|
||||
if !ok {
|
||||
return fmt.Errorf("No such notification configuration %s", configName)
|
||||
}
|
||||
|
||||
// We need to save a reference to the notification config in the
|
||||
// notificationReq since the config might be replaced or gone at the time the
|
||||
// message gets dispatched.
|
||||
n.pendingNotifications <- ¬ificationReq{
|
||||
event: event,
|
||||
notificationConfig: nc,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *notifier) sendPagerDutyNotification(serviceKey string, event *Event) error {
|
||||
// http://developer.pagerduty.com/documentation/integration/events/trigger
|
||||
incidentKey := event.Fingerprint()
|
||||
buf, err := json.Marshal(map[string]interface{}{
|
||||
"service_key": serviceKey,
|
||||
"event_type": "trigger",
|
||||
"description": event.Description,
|
||||
"incident_key": incidentKey,
|
||||
"details": map[string]interface{}{
|
||||
"grouping_labels": event.Labels,
|
||||
"extra_labels": event.Payload,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
resp, err := http.Post(
|
||||
*pagerdutyApiUrl,
|
||||
contentTypeJson,
|
||||
bytes.NewBuffer(buf),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
respBuf, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Printf("Sent PagerDuty notification: %s: HTTP %d: %s", incidentKey, resp.StatusCode, respBuf)
|
||||
// BUG: Check response for result of operation.
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *notifier) sendEmailNotification(email string, event *Event) error {
|
||||
// BUG: Implement email notifications.
|
||||
log.Printf("Would send email notification for event %s to %s\n", event, email)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *notifier) handleNotification(event *Event, config *pb.NotificationConfig, i IsInhibitedInterrogator) {
|
||||
if inhibited, _ := i.IsInhibited(event); !inhibited {
|
||||
return
|
||||
}
|
||||
|
||||
for _, pdConfig := range config.PagerdutyConfig {
|
||||
if err := n.sendPagerDutyNotification(pdConfig.GetServiceKey(), event); err != nil {
|
||||
log.Printf("Error sending PagerDuty notification: %s", err)
|
||||
}
|
||||
}
|
||||
for _, emailConfig := range config.EmailConfig {
|
||||
if err := n.sendEmailNotification(emailConfig.GetEmail(), event); err != nil {
|
||||
log.Printf("Error sending email notification: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (n *notifier) Dispatch(i IsInhibitedInterrogator) {
|
||||
for {
|
||||
select {
|
||||
case req := <-n.pendingNotifications:
|
||||
n.handleNotification(req.event, req.notificationConfig, i)
|
||||
case <-n.stop:
|
||||
// We require that Close() is only called after nobody sends new
|
||||
// notification requests anymore, so we only need to drain existing ones.
|
||||
// BUG: We might want to add state validation for this.
|
||||
close(n.pendingNotifications)
|
||||
for req := range n.pendingNotifications {
|
||||
n.handleNotification(req.event, req.notificationConfig, i)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (n *notifier) Close() {
|
||||
n.stop <- true
|
||||
}
|
Loading…
Reference in New Issue
Block a user