nflog: Copy and replace gossipData instead of modifying it in place (#1121)
This commit is contained in:
parent
b62cddd807
commit
bfdff67138
|
@ -491,7 +491,8 @@ func (l *nlog) OnGossip(msg []byte) (mesh.GossipData, error) {
|
||||||
l.mtx.Lock()
|
l.mtx.Lock()
|
||||||
defer l.mtx.Unlock()
|
defer l.mtx.Unlock()
|
||||||
|
|
||||||
if delta := l.st.mergeDelta(gd); len(delta) > 0 {
|
var delta gossipData
|
||||||
|
if l.st, delta = l.st.mergeDelta(gd); len(delta) > 0 {
|
||||||
return delta, nil
|
return delta, nil
|
||||||
}
|
}
|
||||||
return nil, nil
|
return nil, nil
|
||||||
|
@ -506,7 +507,10 @@ func (l *nlog) OnGossipBroadcast(src mesh.PeerName, msg []byte) (mesh.GossipData
|
||||||
l.mtx.Lock()
|
l.mtx.Lock()
|
||||||
defer l.mtx.Unlock()
|
defer l.mtx.Unlock()
|
||||||
|
|
||||||
return l.st.mergeDelta(gd), nil
|
var delta mesh.GossipData
|
||||||
|
l.st, delta = l.st.mergeDelta(gd)
|
||||||
|
|
||||||
|
return delta, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnGossipUnicast implements the mesh.Gossiper interface.
|
// OnGossipUnicast implements the mesh.Gossiper interface.
|
||||||
|
@ -577,36 +581,38 @@ func (gd gossipData) clone() gossipData {
|
||||||
// TODO(fabxc): can we just return the receiver. Does it have to remain
|
// TODO(fabxc): can we just return the receiver. Does it have to remain
|
||||||
// unmodified. Needs to be clarified upstream.
|
// unmodified. Needs to be clarified upstream.
|
||||||
func (gd gossipData) Merge(other mesh.GossipData) mesh.GossipData {
|
func (gd gossipData) Merge(other mesh.GossipData) mesh.GossipData {
|
||||||
|
merged := gd.clone()
|
||||||
for k, e := range other.(gossipData) {
|
for k, e := range other.(gossipData) {
|
||||||
prev, ok := gd[k]
|
prev, ok := merged[k]
|
||||||
if !ok {
|
if !ok {
|
||||||
gd[k] = e
|
merged[k] = e
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if prev.Entry.Timestamp.Before(e.Entry.Timestamp) {
|
if prev.Entry.Timestamp.Before(e.Entry.Timestamp) {
|
||||||
gd[k] = e
|
merged[k] = e
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return gd
|
return merged
|
||||||
}
|
}
|
||||||
|
|
||||||
// mergeDelta behaves like Merge but returns a gossipData only containing
|
// mergeDelta behaves like Merge but in addition returns a gossipData only
|
||||||
// things that have changed.
|
// containing things that have changed.
|
||||||
func (gd gossipData) mergeDelta(od gossipData) gossipData {
|
func (gd gossipData) mergeDelta(od gossipData) (merged gossipData, delta gossipData) {
|
||||||
delta := gossipData{}
|
merged = gd.clone()
|
||||||
|
delta = gossipData{}
|
||||||
for k, e := range od {
|
for k, e := range od {
|
||||||
prev, ok := gd[k]
|
prev, ok := merged[k]
|
||||||
if !ok {
|
if !ok {
|
||||||
gd[k] = e
|
merged[k] = e
|
||||||
delta[k] = e
|
delta[k] = e
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if prev.Entry.Timestamp.Before(e.Entry.Timestamp) {
|
if prev.Entry.Timestamp.Before(e.Entry.Timestamp) {
|
||||||
gd[k] = e
|
merged[k] = e
|
||||||
delta[k] = e
|
delta[k] = e
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return delta
|
return merged, delta
|
||||||
}
|
}
|
||||||
|
|
||||||
// replaceFile wraps a file that is moved to another filename on closing.
|
// replaceFile wraps a file that is moved to another filename on closing.
|
||||||
|
|
|
@ -195,16 +195,17 @@ func TestGossipDataMerge(t *testing.T) {
|
||||||
res := ca.Merge(cb)
|
res := ca.Merge(cb)
|
||||||
|
|
||||||
require.Equal(t, c.final, res, "Merge result should match expectation")
|
require.Equal(t, c.final, res, "Merge result should match expectation")
|
||||||
require.Equal(t, c.final, ca, "Merge should apply changes to original state")
|
|
||||||
require.Equal(t, c.b, cb, "Merged state should remain unmodified")
|
require.Equal(t, c.b, cb, "Merged state should remain unmodified")
|
||||||
|
require.NotEqual(t, c.final, ca, "Merge should not change original state")
|
||||||
|
|
||||||
ca, cb = c.a.clone(), c.b.clone()
|
ca, cb = c.a.clone(), c.b.clone()
|
||||||
|
|
||||||
delta := ca.mergeDelta(cb)
|
res, delta := ca.mergeDelta(cb)
|
||||||
|
|
||||||
require.Equal(t, c.delta, delta, "Merge delta should match expectation")
|
require.Equal(t, c.delta, delta, "Merge delta should match expectation")
|
||||||
require.Equal(t, c.final, ca, "Merge should apply changes to original state")
|
require.Equal(t, c.final, res, "Merge should apply changes to original state")
|
||||||
require.Equal(t, c.b, cb, "Merged state should remain unmodified")
|
require.Equal(t, c.b, cb, "Merged state should remain unmodified")
|
||||||
|
require.NotEqual(t, res, ca, "Merge should not change original state")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue