Cluster: Add `GetStates` to `Peer`

This adds support for retrieving the states from the cluster Peer.

This is useful for moving around the states from replica to replica when deem neccesary - while not stricily useful for the Alertmanager itself at the moment, we have a need to support migrations between different types of Alertmanager and this helps the cause.

Signed-off-by: gotjosh <josue.abreu@gmail.com>
This commit is contained in:
gotjosh 2023-11-22 11:16:37 +00:00
parent aaf9490576
commit 4150efa869
No known key found for this signature in database
GPG Key ID: A6E1DDE38FF3C74E
2 changed files with 80 additions and 0 deletions

View File

@ -40,6 +40,8 @@ type ClusterPeer interface {
Status() string
// Peers returns the peer nodes in the cluster.
Peers() []ClusterMember
// GetStates returns the States associated with the given keys and a slice of keys not found.
GetStates(...string) (map[string]State, []string)
}
// ClusterMember interface that represents node peers in a cluster
@ -545,6 +547,24 @@ func (p *Peer) peerUpdate(n *memberlist.Node) {
level.Debug(p.logger).Log("msg", "peer updated", "peer", pr.Node)
}
// GetStates returns the States associated with the given keys and a slice of keys not found.
func (p *Peer) GetStates(keys ...string) (map[string]State, []string) {
p.mtx.Lock()
defer p.mtx.Unlock()
result := make(map[string]State, len(keys))
notFound := make([]string, 0)
for _, key := range keys {
if v, ok := p.states[key]; ok {
result[key] = v
} else {
notFound = append(notFound, key)
}
}
return result, notFound
}
// AddState adds a new state that will be gossiped. It returns a channel to which
// broadcast messages for the state can be sent.
func (p *Peer) AddState(key string, s State, reg prometheus.Registerer) ClusterChannel {

View File

@ -342,3 +342,63 @@ func testTLSConnection(t *testing.T) {
require.Equal(t, p2.Self().Address(), p1.peers[p2.Self().Address()].Node.Address())
require.Equal(t, p2.Name(), p1.failedPeers[0].Name)
}
func TestPeer_GetStates(t *testing.T) {
r := prometheus.NewRegistry()
p1, err := Create(
logger,
r,
"127.0.0.1:0",
"",
[]string{},
true,
DefaultPushPullInterval,
DefaultGossipInterval,
DefaultTCPTimeout,
DefaultProbeTimeout,
DefaultProbeInterval,
nil,
false,
"",
)
require.NoError(t, err)
p1.AddState("nflog", &fakeState{}, r)
p1.AddState("silences", &fakeState{}, r)
t.Run("all states are found", func(t *testing.T) {
states, missing := p1.GetStates("nflog", "silences")
require.Len(t, states, 2)
require.Equal(t, states, map[string]State{"nflog": &fakeState{}, "silences": &fakeState{}})
require.Len(t, missing, 0)
require.ElementsMatch(t, missing, []string{})
})
t.Run("some states are found", func(t *testing.T) {
states, missing := p1.GetStates("nflog", "notfound")
require.Len(t, states, 1)
require.Equal(t, states, map[string]State{"nflog": &fakeState{}})
require.Len(t, missing, 1)
require.ElementsMatch(t, missing, []string{"notfound"})
})
t.Run("no states are found", func(t *testing.T) {
states, missing := p1.GetStates("notfound", "missing")
require.Len(t, states, 0)
require.Len(t, missing, 2)
require.ElementsMatch(t, missing, []string{"notfound", "missing"})
})
}
type fakeState struct{}
func (f fakeState) MarshalBinary() ([]byte, error) {
return nil, nil
}
func (f fakeState) Merge(b []byte) error {
return nil
}