Clustering: Fix unsynchronised access
Signed-off-by: gotjosh <josue@grafana.com>
This commit is contained in:
parent
54431be888
commit
d0406d6295
|
@ -536,7 +536,11 @@ func (p *Peer) peerUpdate(n *memberlist.Node) {
|
||||||
// AddState adds a new state that will be gossiped. It returns a channel to which
|
// AddState adds a new state that will be gossiped. It returns a channel to which
|
||||||
// broadcast messages for the state can be sent.
|
// broadcast messages for the state can be sent.
|
||||||
func (p *Peer) AddState(key string, s State, reg prometheus.Registerer) ClusterChannel {
|
func (p *Peer) AddState(key string, s State, reg prometheus.Registerer) ClusterChannel {
|
||||||
|
|
||||||
|
p.mtx.Lock()
|
||||||
p.states[key] = s
|
p.states[key] = s
|
||||||
|
p.mtx.Unlock()
|
||||||
|
|
||||||
send := func(b []byte) {
|
send := func(b []byte) {
|
||||||
p.delegate.bcast.QueueBroadcast(simpleBroadcast(b))
|
p.delegate.bcast.QueueBroadcast(simpleBroadcast(b))
|
||||||
}
|
}
|
||||||
|
|
|
@ -157,7 +157,10 @@ func (d *delegate) NotifyMsg(b []byte) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
d.mtx.RLock()
|
||||||
s, ok := d.states[p.Key]
|
s, ok := d.states[p.Key]
|
||||||
|
d.mtx.RUnlock()
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -179,6 +182,8 @@ func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
|
||||||
|
|
||||||
// LocalState is called when gossip fetches local state.
|
// LocalState is called when gossip fetches local state.
|
||||||
func (d *delegate) LocalState(_ bool) []byte {
|
func (d *delegate) LocalState(_ bool) []byte {
|
||||||
|
d.mtx.RLock()
|
||||||
|
defer d.mtx.RUnlock()
|
||||||
all := &clusterpb.FullState{
|
all := &clusterpb.FullState{
|
||||||
Parts: make([]clusterpb.Part, 0, len(d.states)),
|
Parts: make([]clusterpb.Part, 0, len(d.states)),
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue