Clustering: Interface for Peers in other packages

A Peer as defined by the `cluster` package represents the node in the
cluster. It is used in other packages to know the status of all of the
members or how long should we wait to know if a notification has already fired.

In Cortex, we'd like to implement a slightly different way of
clustering (using gRPC for communication and a
hash ring for node discovery).

This is a small change to support that by changing the consumer of other
packages to an interface.

Silences and Notification channels don't need an interface as they take
a `func([]byte) error` as a parameter.

Signed-off-by: gotjosh <josue@grafana.com>
This commit is contained in:
gotjosh 2021-02-19 19:02:06 +00:00
parent a7ca7b1d29
commit 9a2ae39430
No known key found for this signature in database
GPG Key ID: 731A782929412860
5 changed files with 51 additions and 17 deletions

View File

@ -56,7 +56,7 @@ type Options struct {
// alert. Mandatory. // alert. Mandatory.
StatusFunc func(model.Fingerprint) types.AlertStatus StatusFunc func(model.Fingerprint) types.AlertStatus
// Peer from the gossip cluster. If nil, no clustering will be used. // Peer from the gossip cluster. If nil, no clustering will be used.
Peer *cluster.Peer Peer cluster.ClusterPeer
// Timeout for all HTTP connections. The zero value (and negative // Timeout for all HTTP connections. The zero value (and negative
// values) result in no timeout. // values) result in no timeout.
Timeout time.Duration Timeout time.Duration

View File

@ -72,7 +72,7 @@ type API struct {
config *config.Config config *config.Config
route *dispatch.Route route *dispatch.Route
uptime time.Time uptime time.Time
peer *cluster.Peer peer cluster.ClusterPeer
logger log.Logger logger log.Logger
m *metrics.Alerts m *metrics.Alerts
@ -88,7 +88,7 @@ func New(
alerts provider.Alerts, alerts provider.Alerts,
silences *silence.Silences, silences *silence.Silences,
sf getAlertStatusFn, sf getAlertStatusFn,
peer *cluster.Peer, peer cluster.ClusterPeer,
l log.Logger, l log.Logger,
r prometheus.Registerer, r prometheus.Registerer,
) *API { ) *API {
@ -208,7 +208,7 @@ type clusterStatus struct {
Peers []peerStatus `json:"peers"` Peers []peerStatus `json:"peers"`
} }
func getClusterStatus(p *cluster.Peer) *clusterStatus { func getClusterStatus(p cluster.ClusterPeer) *clusterStatus {
if p == nil { if p == nil {
return nil return nil
} }
@ -216,7 +216,7 @@ func getClusterStatus(p *cluster.Peer) *clusterStatus {
for _, n := range p.Peers() { for _, n := range p.Peers() {
s.Peers = append(s.Peers, peerStatus{ s.Peers = append(s.Peers, peerStatus{
Name: n.Name, Name: n.String(),
Address: n.Address(), Address: n.Address(),
}) })
} }

View File

@ -52,7 +52,7 @@ import (
// API represents an Alertmanager API v2 // API represents an Alertmanager API v2
type API struct { type API struct {
peer *cluster.Peer peer cluster.ClusterPeer
silences *silence.Silences silences *silence.Silences
alerts provider.Alerts alerts provider.Alerts
alertGroups groupsFn alertGroups groupsFn
@ -83,7 +83,7 @@ func NewAPI(
gf groupsFn, gf groupsFn,
sf getAlertStatusFn, sf getAlertStatusFn,
silences *silence.Silences, silences *silence.Silences,
peer *cluster.Peer, peer cluster.ClusterPeer,
l log.Logger, l log.Logger,
r prometheus.Registerer, r prometheus.Registerer,
) (*API, error) { ) (*API, error) {
@ -179,8 +179,9 @@ func (api *API) getStatusHandler(params general_ops.GetStatusParams) middleware.
peers := []*open_api_models.PeerStatus{} peers := []*open_api_models.PeerStatus{}
for _, n := range api.peer.Peers() { for _, n := range api.peer.Peers() {
address := n.Address() address := n.Address()
name := n.String()
peers = append(peers, &open_api_models.PeerStatus{ peers = append(peers, &open_api_models.PeerStatus{
Name: &n.Name, Name: &name,
Address: &address, Address: &address,
}) })
} }

View File

@ -33,6 +33,26 @@ import (
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
// ClusterPeer helps consumers gather information about the peer(s) in the cluster
type ClusterPeer interface {
// Name returns the unique identifier of this peer in the cluster.
Name() string
// Address returns the IP address of this peer in the cluster.
Address() string
// Status returns a status string representing the peer state.
Status() string
// Peers returns the peer nodes in the cluster.
Peers() []Node
}
// Node interface that represents node peers in a cluster
type Node interface {
// String returns the name of the node
String() string
// Address returns the IP address of the node
Address() string
}
// Peer is a single peer in a gossip cluster. // Peer is a single peer in a gossip cluster.
type Peer struct { type Peer struct {
mlist *memberlist.Memberlist mlist *memberlist.Memberlist
@ -518,9 +538,9 @@ func (p *Peer) AddState(key string, s State, reg prometheus.Registerer) *Channel
p.delegate.bcast.QueueBroadcast(simpleBroadcast(b)) p.delegate.bcast.QueueBroadcast(simpleBroadcast(b))
} }
peers := func() []*memberlist.Node { peers := func() []*memberlist.Node {
nodes := p.Peers() nodes := p.mlist.Members()
for i, n := range nodes { for i, n := range nodes {
if n.Name == p.Self().Name { if n.String() == p.Self().Name {
nodes = append(nodes[:i], nodes[i+1:]...) nodes = append(nodes[:i], nodes[i+1:]...)
break break
} }
@ -545,6 +565,10 @@ func (p *Peer) Name() string {
return p.mlist.LocalNode().Name return p.mlist.LocalNode().Name
} }
func (p *Peer) Address() string {
return p.mlist.LocalNode().Addr.String()
}
// ClusterSize returns the current number of alive members in the cluster. // ClusterSize returns the current number of alive members in the cluster.
func (p *Peer) ClusterSize() int { func (p *Peer) ClusterSize() int {
return p.mlist.NumMembers() return p.mlist.NumMembers()
@ -592,13 +616,17 @@ func (p *Peer) Self() *memberlist.Node {
} }
// Peers returns the peers in the cluster. // Peers returns the peers in the cluster.
func (p *Peer) Peers() []*memberlist.Node { func (p *Peer) Peers() []Node {
return p.mlist.Members() peers := make([]Node, 0, len(p.mlist.Members()))
for _, member := range p.mlist.Members() {
peers = append(peers, member)
}
return peers
} }
// Position returns the position of the peer in the cluster. // Position returns the position of the peer in the cluster.
func (p *Peer) Position() int { func (p *Peer) Position() int {
all := p.Peers() all := p.mlist.Members()
sort.Slice(all, func(i, j int) bool { sort.Slice(all, func(i, j int) bool {
return all[i].Name < all[j].Name return all[i].Name < all[j].Name
}) })

View File

@ -28,7 +28,6 @@ import (
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/alertmanager/cluster"
"github.com/prometheus/alertmanager/inhibit" "github.com/prometheus/alertmanager/inhibit"
"github.com/prometheus/alertmanager/nflog" "github.com/prometheus/alertmanager/nflog"
"github.com/prometheus/alertmanager/nflog/nflogpb" "github.com/prometheus/alertmanager/nflog/nflogpb"
@ -41,6 +40,12 @@ type ResolvedSender interface {
SendResolved() bool SendResolved() bool
} }
// Peer represents the cluster node from where we are the sending the notification.
type Peer interface {
// WaitReady waits until the node silences and notifications have settled before attempting to send a notification.
WaitReady()
}
// MinTimeout is the minimum timeout that is set for the context of a call // MinTimeout is the minimum timeout that is set for the context of a call
// to a notification pipeline. // to a notification pipeline.
const MinTimeout = 10 * time.Second const MinTimeout = 10 * time.Second
@ -290,7 +295,7 @@ func (pb *PipelineBuilder) New(
inhibitor *inhibit.Inhibitor, inhibitor *inhibit.Inhibitor,
silencer *silence.Silencer, silencer *silence.Silencer,
notificationLog NotificationLog, notificationLog NotificationLog,
peer *cluster.Peer, peer Peer,
) RoutingStage { ) RoutingStage {
rs := make(RoutingStage, len(receivers)) rs := make(RoutingStage, len(receivers))
@ -399,11 +404,11 @@ func (fs FanoutStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.A
// GossipSettleStage waits until the Gossip has settled to forward alerts. // GossipSettleStage waits until the Gossip has settled to forward alerts.
type GossipSettleStage struct { type GossipSettleStage struct {
peer *cluster.Peer peer Peer
} }
// NewGossipSettleStage returns a new GossipSettleStage. // NewGossipSettleStage returns a new GossipSettleStage.
func NewGossipSettleStage(p *cluster.Peer) *GossipSettleStage { func NewGossipSettleStage(p Peer) *GossipSettleStage {
return &GossipSettleStage{peer: p} return &GossipSettleStage{peer: p}
} }