diff --git a/cluster/cluster.go b/cluster/cluster.go index a12beb95..c282e035 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -536,7 +536,10 @@ func (p *Peer) peerUpdate(n *memberlist.Node) { // AddState adds a new state that will be gossiped. It returns a channel to which // broadcast messages for the state can be sent. func (p *Peer) AddState(key string, s State, reg prometheus.Registerer) ClusterChannel { + p.mtx.Lock() p.states[key] = s + p.mtx.Unlock() + send := func(b []byte) { p.delegate.bcast.QueueBroadcast(simpleBroadcast(b)) } diff --git a/cluster/delegate.go b/cluster/delegate.go index bcacf83e..394edb0f 100644 --- a/cluster/delegate.go +++ b/cluster/delegate.go @@ -157,7 +157,10 @@ func (d *delegate) NotifyMsg(b []byte) { return } + d.mtx.RLock() s, ok := d.states[p.Key] + d.mtx.RUnlock() + if !ok { return } @@ -179,6 +182,8 @@ func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte { // LocalState is called when gossip fetches local state. func (d *delegate) LocalState(_ bool) []byte { + d.mtx.RLock() + defer d.mtx.RUnlock() all := &clusterpb.FullState{ Parts: make([]clusterpb.Part, 0, len(d.states)), }