From 4150efa8697914390c3bb2a12ffe55e020d4ec98 Mon Sep 17 00:00:00 2001 From: gotjosh Date: Wed, 22 Nov 2023 11:16:37 +0000 Subject: [PATCH] Cluster: Add `GetStates` to `Peer` This adds support for retrieving the states from the cluster Peer. This is useful for moving around the states from replica to replica when deem neccesary - while not stricily useful for the Alertmanager itself at the moment, we have a need to support migrations between different types of Alertmanager and this helps the cause. Signed-off-by: gotjosh --- cluster/cluster.go | 20 ++++++++++++++ cluster/cluster_test.go | 60 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+) diff --git a/cluster/cluster.go b/cluster/cluster.go index 2c7ee945..2b8dc8b4 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -40,6 +40,8 @@ type ClusterPeer interface { Status() string // Peers returns the peer nodes in the cluster. Peers() []ClusterMember + // GetStates returns the States associated with the given keys and a slice of keys not found. + GetStates(...string) (map[string]State, []string) } // ClusterMember interface that represents node peers in a cluster @@ -545,6 +547,24 @@ func (p *Peer) peerUpdate(n *memberlist.Node) { level.Debug(p.logger).Log("msg", "peer updated", "peer", pr.Node) } +// GetStates returns the States associated with the given keys and a slice of keys not found. +func (p *Peer) GetStates(keys ...string) (map[string]State, []string) { + p.mtx.Lock() + defer p.mtx.Unlock() + + result := make(map[string]State, len(keys)) + notFound := make([]string, 0) + for _, key := range keys { + if v, ok := p.states[key]; ok { + result[key] = v + } else { + notFound = append(notFound, key) + } + } + + return result, notFound +} + // AddState adds a new state that will be gossiped. It returns a channel to which // broadcast messages for the state can be sent. func (p *Peer) AddState(key string, s State, reg prometheus.Registerer) ClusterChannel { diff --git a/cluster/cluster_test.go b/cluster/cluster_test.go index 51592584..dc900ca5 100644 --- a/cluster/cluster_test.go +++ b/cluster/cluster_test.go @@ -342,3 +342,63 @@ func testTLSConnection(t *testing.T) { require.Equal(t, p2.Self().Address(), p1.peers[p2.Self().Address()].Node.Address()) require.Equal(t, p2.Name(), p1.failedPeers[0].Name) } + +func TestPeer_GetStates(t *testing.T) { + r := prometheus.NewRegistry() + p1, err := Create( + logger, + r, + "127.0.0.1:0", + "", + []string{}, + true, + DefaultPushPullInterval, + DefaultGossipInterval, + DefaultTCPTimeout, + DefaultProbeTimeout, + DefaultProbeInterval, + nil, + false, + "", + ) + require.NoError(t, err) + + p1.AddState("nflog", &fakeState{}, r) + p1.AddState("silences", &fakeState{}, r) + + t.Run("all states are found", func(t *testing.T) { + states, missing := p1.GetStates("nflog", "silences") + + require.Len(t, states, 2) + require.Equal(t, states, map[string]State{"nflog": &fakeState{}, "silences": &fakeState{}}) + require.Len(t, missing, 0) + require.ElementsMatch(t, missing, []string{}) + }) + + t.Run("some states are found", func(t *testing.T) { + states, missing := p1.GetStates("nflog", "notfound") + + require.Len(t, states, 1) + require.Equal(t, states, map[string]State{"nflog": &fakeState{}}) + require.Len(t, missing, 1) + require.ElementsMatch(t, missing, []string{"notfound"}) + }) + + t.Run("no states are found", func(t *testing.T) { + states, missing := p1.GetStates("notfound", "missing") + + require.Len(t, states, 0) + require.Len(t, missing, 2) + require.ElementsMatch(t, missing, []string{"notfound", "missing"}) + }) +} + +type fakeState struct{} + +func (f fakeState) MarshalBinary() ([]byte, error) { + return nil, nil +} + +func (f fakeState) Merge(b []byte) error { + return nil +}