Merge pull request #2543 from gotjosh/concurrent-access

Clustering: Fix unsynchronised access
This commit is contained in:
Goutham Veeramachaneni 2021-04-22 12:17:24 +02:00 committed by GitHub
commit 8176f78a70
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 8 additions and 0 deletions

View File

@ -536,7 +536,10 @@ func (p *Peer) peerUpdate(n *memberlist.Node) {
// AddState adds a new state that will be gossiped. It returns a channel to which // AddState adds a new state that will be gossiped. It returns a channel to which
// broadcast messages for the state can be sent. // broadcast messages for the state can be sent.
func (p *Peer) AddState(key string, s State, reg prometheus.Registerer) ClusterChannel { func (p *Peer) AddState(key string, s State, reg prometheus.Registerer) ClusterChannel {
p.mtx.Lock()
p.states[key] = s p.states[key] = s
p.mtx.Unlock()
send := func(b []byte) { send := func(b []byte) {
p.delegate.bcast.QueueBroadcast(simpleBroadcast(b)) p.delegate.bcast.QueueBroadcast(simpleBroadcast(b))
} }

View File

@ -157,7 +157,10 @@ func (d *delegate) NotifyMsg(b []byte) {
return return
} }
d.mtx.RLock()
s, ok := d.states[p.Key] s, ok := d.states[p.Key]
d.mtx.RUnlock()
if !ok { if !ok {
return return
} }
@ -179,6 +182,8 @@ func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
// LocalState is called when gossip fetches local state. // LocalState is called when gossip fetches local state.
func (d *delegate) LocalState(_ bool) []byte { func (d *delegate) LocalState(_ bool) []byte {
d.mtx.RLock()
defer d.mtx.RUnlock()
all := &clusterpb.FullState{ all := &clusterpb.FullState{
Parts: make([]clusterpb.Part, 0, len(d.states)), Parts: make([]clusterpb.Part, 0, len(d.states)),
} }