From d0406d629536e116a314bbcf072edf386e97cc81 Mon Sep 17 00:00:00 2001 From: gotjosh Date: Tue, 13 Apr 2021 13:51:07 +0100 Subject: [PATCH] Clustering: Fix unsynchronised access Signed-off-by: gotjosh --- cluster/cluster.go | 4 ++++ cluster/delegate.go | 5 +++++ 2 files changed, 9 insertions(+) diff --git a/cluster/cluster.go b/cluster/cluster.go index a12beb95..b884ddb0 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -536,7 +536,11 @@ func (p *Peer) peerUpdate(n *memberlist.Node) { // AddState adds a new state that will be gossiped. It returns a channel to which // broadcast messages for the state can be sent. func (p *Peer) AddState(key string, s State, reg prometheus.Registerer) ClusterChannel { + + p.mtx.Lock() p.states[key] = s + p.mtx.Unlock() + send := func(b []byte) { p.delegate.bcast.QueueBroadcast(simpleBroadcast(b)) } diff --git a/cluster/delegate.go b/cluster/delegate.go index bcacf83e..394edb0f 100644 --- a/cluster/delegate.go +++ b/cluster/delegate.go @@ -157,7 +157,10 @@ func (d *delegate) NotifyMsg(b []byte) { return } + d.mtx.RLock() s, ok := d.states[p.Key] + d.mtx.RUnlock() + if !ok { return } @@ -179,6 +182,8 @@ func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte { // LocalState is called when gossip fetches local state. func (d *delegate) LocalState(_ bool) []byte { + d.mtx.RLock() + defer d.mtx.RUnlock() all := &clusterpb.FullState{ Parts: make([]clusterpb.Part, 0, len(d.states)), }