diff --git a/api/api.go b/api/api.go index 6b4a8838..cd0bbc1e 100644 --- a/api/api.go +++ b/api/api.go @@ -56,7 +56,7 @@ type Options struct { // alert. Mandatory. StatusFunc func(model.Fingerprint) types.AlertStatus // Peer from the gossip cluster. If nil, no clustering will be used. - Peer *cluster.Peer + Peer cluster.ClusterPeer // Timeout for all HTTP connections. The zero value (and negative // values) result in no timeout. Timeout time.Duration diff --git a/api/v1/api.go b/api/v1/api.go index 1aa60d99..1a02d172 100644 --- a/api/v1/api.go +++ b/api/v1/api.go @@ -72,7 +72,7 @@ type API struct { config *config.Config route *dispatch.Route uptime time.Time - peer *cluster.Peer + peer cluster.ClusterPeer logger log.Logger m *metrics.Alerts @@ -88,7 +88,7 @@ func New( alerts provider.Alerts, silences *silence.Silences, sf getAlertStatusFn, - peer *cluster.Peer, + peer cluster.ClusterPeer, l log.Logger, r prometheus.Registerer, ) *API { @@ -208,7 +208,7 @@ type clusterStatus struct { Peers []peerStatus `json:"peers"` } -func getClusterStatus(p *cluster.Peer) *clusterStatus { +func getClusterStatus(p cluster.ClusterPeer) *clusterStatus { if p == nil { return nil } @@ -216,7 +216,7 @@ func getClusterStatus(p *cluster.Peer) *clusterStatus { for _, n := range p.Peers() { s.Peers = append(s.Peers, peerStatus{ - Name: n.Name, + Name: n.String(), Address: n.Address(), }) } diff --git a/api/v2/api.go b/api/v2/api.go index 3394206e..0cc1aead 100644 --- a/api/v2/api.go +++ b/api/v2/api.go @@ -52,7 +52,7 @@ import ( // API represents an Alertmanager API v2 type API struct { - peer *cluster.Peer + peer cluster.ClusterPeer silences *silence.Silences alerts provider.Alerts alertGroups groupsFn @@ -83,7 +83,7 @@ func NewAPI( gf groupsFn, sf getAlertStatusFn, silences *silence.Silences, - peer *cluster.Peer, + peer cluster.ClusterPeer, l log.Logger, r prometheus.Registerer, ) (*API, error) { @@ -179,8 +179,9 @@ func (api *API) getStatusHandler(params general_ops.GetStatusParams) middleware. peers := []*open_api_models.PeerStatus{} for _, n := range api.peer.Peers() { address := n.Address() + name := n.String() peers = append(peers, &open_api_models.PeerStatus{ - Name: &n.Name, + Name: &name, Address: &address, }) } diff --git a/cluster/cluster.go b/cluster/cluster.go index 848c7bb6..2cb1f1dc 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -33,6 +33,26 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +// ClusterPeer helps consumers gather information about the peer(s) in the cluster +type ClusterPeer interface { + // Name returns the unique identifier of this peer in the cluster. + Name() string + // Address returns the IP address of this peer in the cluster. + Address() string + // Status returns a status string representing the peer state. + Status() string + // Peers returns the peer nodes in the cluster. + Peers() []Node +} + +// Node interface that represents node peers in a cluster +type Node interface { + // String returns the name of the node + String() string + // Address returns the IP address of the node + Address() string +} + // Peer is a single peer in a gossip cluster. type Peer struct { mlist *memberlist.Memberlist @@ -518,9 +538,9 @@ func (p *Peer) AddState(key string, s State, reg prometheus.Registerer) *Channel p.delegate.bcast.QueueBroadcast(simpleBroadcast(b)) } peers := func() []*memberlist.Node { - nodes := p.Peers() + nodes := p.mlist.Members() for i, n := range nodes { - if n.Name == p.Self().Name { + if n.String() == p.Self().Name { nodes = append(nodes[:i], nodes[i+1:]...) break } @@ -545,6 +565,10 @@ func (p *Peer) Name() string { return p.mlist.LocalNode().Name } +func (p *Peer) Address() string { + return p.mlist.LocalNode().Addr.String() +} + // ClusterSize returns the current number of alive members in the cluster. func (p *Peer) ClusterSize() int { return p.mlist.NumMembers() @@ -592,13 +616,17 @@ func (p *Peer) Self() *memberlist.Node { } // Peers returns the peers in the cluster. -func (p *Peer) Peers() []*memberlist.Node { - return p.mlist.Members() +func (p *Peer) Peers() []Node { + peers := make([]Node, 0, len(p.mlist.Members())) + for _, member := range p.mlist.Members() { + peers = append(peers, member) + } + return peers } // Position returns the position of the peer in the cluster. func (p *Peer) Position() int { - all := p.Peers() + all := p.mlist.Members() sort.Slice(all, func(i, j int) bool { return all[i].Name < all[j].Name }) diff --git a/notify/notify.go b/notify/notify.go index 0be8b488..57860c2a 100644 --- a/notify/notify.go +++ b/notify/notify.go @@ -28,7 +28,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" - "github.com/prometheus/alertmanager/cluster" "github.com/prometheus/alertmanager/inhibit" "github.com/prometheus/alertmanager/nflog" "github.com/prometheus/alertmanager/nflog/nflogpb" @@ -41,6 +40,12 @@ type ResolvedSender interface { SendResolved() bool } +// Peer represents the cluster node from where we are the sending the notification. +type Peer interface { + // WaitReady waits until the node silences and notifications have settled before attempting to send a notification. + WaitReady() +} + // MinTimeout is the minimum timeout that is set for the context of a call // to a notification pipeline. const MinTimeout = 10 * time.Second @@ -290,7 +295,7 @@ func (pb *PipelineBuilder) New( inhibitor *inhibit.Inhibitor, silencer *silence.Silencer, notificationLog NotificationLog, - peer *cluster.Peer, + peer Peer, ) RoutingStage { rs := make(RoutingStage, len(receivers)) @@ -399,11 +404,11 @@ func (fs FanoutStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.A // GossipSettleStage waits until the Gossip has settled to forward alerts. type GossipSettleStage struct { - peer *cluster.Peer + peer Peer } // NewGossipSettleStage returns a new GossipSettleStage. -func NewGossipSettleStage(p *cluster.Peer) *GossipSettleStage { +func NewGossipSettleStage(p Peer) *GossipSettleStage { return &GossipSettleStage{peer: p} }