Add a mutex to silences.go:gossipData (#984)

This should fix silence/silence.go #982
This commit is contained in:
Corentin Chary 2017-09-13 11:18:01 +02:00 committed by stuart nelson
parent 1c53b344a9
commit 869a038a2b
2 changed files with 231 additions and 167 deletions

View File

@ -103,7 +103,7 @@ type Silences struct {
// range and affected labels.
// Mutex also guards the matcherCache, which always need write lock access.
mtx sync.Mutex
st gossipData
st *gossipData
mc matcherCache
}
@ -199,7 +199,7 @@ func New(o Options) (*Silences, error) {
retention: o.Retention,
now: utcNow,
gossip: nopGossip{},
st: gossipData{},
st: newGossipData(),
}
if o.Logger != nil {
s.logger = o.Logger
@ -281,12 +281,12 @@ func (s *Silences) GC() (int, error) {
s.mtx.Lock()
defer s.mtx.Unlock()
for id, sil := range s.st {
for id, sil := range s.st.data {
if sil.ExpiresAt.IsZero() {
return n, errors.New("unexpected zero expiration timestamp")
}
if !sil.ExpiresAt.After(now) {
delete(s.st, id)
delete(s.st.data, id)
delete(s.mc, sil.Silence)
n++
}
@ -347,7 +347,7 @@ func cloneSilence(sil *pb.Silence) *pb.Silence {
}
func (s *Silences) getSilence(id string) (*pb.Silence, bool) {
msil, ok := s.st[id]
msil, ok := s.st.data[id]
if !ok {
return nil, false
}
@ -365,7 +365,9 @@ func (s *Silences) setSilence(sil *pb.Silence) error {
Silence: sil,
ExpiresAt: sil.EndsAt.Add(s.retention),
}
st := gossipData{sil.Id: msil}
st := &gossipData{
data: silenceMap{sil.Id: msil},
}
s.st.Merge(st)
s.gossip.GossipBroadcast(st)
@ -593,12 +595,12 @@ func (s *Silences) query(q *query, now time.Time) ([]*pb.Silence, error) {
if q.ids != nil {
for _, id := range q.ids {
if s, ok := s.st[string(id)]; ok {
if s, ok := s.st.data[string(id)]; ok {
res = append(res, s.Silence)
}
}
} else {
for _, sil := range s.st {
for _, sil := range s.st.data {
res = append(res, sil.Silence)
}
}
@ -627,7 +629,7 @@ func (s *Silences) query(q *query, now time.Time) ([]*pb.Silence, error) {
// loadSnapshot loads a snapshot generated by Snapshot() into the state.
// Any previous state is wiped.
func (s *Silences) loadSnapshot(r io.Reader) error {
st := gossipData{}
st := newGossipData()
s.mtx.Lock()
defer s.mtx.Unlock()
@ -647,14 +649,14 @@ func (s *Silences) loadSnapshot(r io.Reader) error {
sil.Silence.Comments = nil
}
st[sil.Silence.Id] = &sil
st.data[sil.Silence.Id] = &sil
_, err := s.mc.Get(sil.Silence)
if err != nil {
return err
}
}
s.st = st
s.st.data = st.data
return nil
}
@ -669,7 +671,7 @@ func (s *Silences) Snapshot(w io.Writer) (int, error) {
defer s.mtx.Unlock()
var n int
for _, s := range s.st {
for _, s := range s.st.data {
m, err := pbutil.WriteDelimited(w, s)
if err != nil {
return n + m, err
@ -700,7 +702,7 @@ func (g gossiper) OnGossip(msg []byte) (mesh.GossipData, error) {
g.mtx.Lock()
defer g.mtx.Unlock()
if delta := g.st.mergeDelta(gd); len(delta) > 0 {
if delta := g.st.mergeDelta(gd); len(delta.data) > 0 {
return delta, nil
}
return nil, nil
@ -724,10 +726,23 @@ func (g gossiper) OnGossipUnicast(src mesh.PeerName, msg []byte) error {
panic("not implemented")
}
type gossipData map[string]*pb.MeshSilence
type silenceMap map[string]*pb.MeshSilence
func decodeGossipData(msg []byte) (gossipData, error) {
gd := gossipData{}
type gossipData struct {
data silenceMap
mtx sync.RWMutex
}
var _ mesh.GossipData = &gossipData{}
func newGossipData() *gossipData {
return &gossipData{
data: silenceMap{},
}
}
func decodeGossipData(msg []byte) (*gossipData, error) {
gd := newGossipData()
rd := bytes.NewReader(msg)
for {
@ -738,13 +753,13 @@ func decodeGossipData(msg []byte) (gossipData, error) {
}
return gd, err
}
gd[s.Silence.Id] = &s
gd.data[s.Silence.Id] = &s
}
return gd, nil
}
// Encode implements the mesh.GossipData interface.
func (gd gossipData) Encode() [][]byte {
func (gd *gossipData) Encode() [][]byte {
// Split into sub-messages of ~1MB.
const maxSize = 1024 * 1024
@ -753,7 +768,11 @@ func (gd gossipData) Encode() [][]byte {
res [][]byte
n int
)
for _, s := range gd {
gd.mtx.RLock()
defer gd.mtx.RUnlock()
for _, s := range gd.data {
m, err := pbutil.WriteDelimited(&buf, s)
n += m
if err != nil {
@ -771,17 +790,27 @@ func (gd gossipData) Encode() [][]byte {
return res
}
func (gd gossipData) clone() gossipData {
res := make(gossipData, len(gd))
for id, s := range gd {
res[id] = s
func (gd *gossipData) clone() *gossipData {
gd.mtx.RLock()
defer gd.mtx.RUnlock()
data := make(silenceMap, len(gd.data))
for id, s := range gd.data {
data[id] = s
}
return res
return &gossipData{data: data}
}
// Merge the silence set with gossip data and return a new silence state.
func (gd gossipData) Merge(other mesh.GossipData) mesh.GossipData {
for id, s := range other.(gossipData) {
func (gd *gossipData) Merge(other mesh.GossipData) mesh.GossipData {
ot := other.(*gossipData)
ot.mtx.RLock()
defer ot.mtx.RUnlock()
gd.mtx.Lock()
defer gd.mtx.Unlock()
for id, s := range ot.data {
// Comments list was moved to a single comment. Apply upgrade
// on silences received from peers.
if len(s.Silence.Comments) > 0 {
@ -790,13 +819,13 @@ func (gd gossipData) Merge(other mesh.GossipData) mesh.GossipData {
s.Silence.Comments = nil
}
prev, ok := gd[id]
prev, ok := gd.data[id]
if !ok {
gd[id] = s
gd.data[id] = s
continue
}
if prev.Silence.UpdatedAt.Before(s.Silence.UpdatedAt) {
gd[id] = s
gd.data[id] = s
}
}
return gd
@ -804,9 +833,16 @@ func (gd gossipData) Merge(other mesh.GossipData) mesh.GossipData {
// mergeDelta behaves like Merge but returns a gossipData only
// containing things that have changed.
func (gd gossipData) mergeDelta(od gossipData) gossipData {
delta := gossipData{}
for id, s := range od {
func (gd *gossipData) mergeDelta(od *gossipData) *gossipData {
delta := newGossipData()
od.mtx.RLock()
defer od.mtx.RUnlock()
gd.mtx.Lock()
defer gd.mtx.Unlock()
for id, s := range od.data {
// Comments list was moved to a single comment. Apply upgrade
// on silences received from peers.
if len(s.Silence.Comments) > 0 {
@ -815,15 +851,15 @@ func (gd gossipData) mergeDelta(od gossipData) gossipData {
s.Silence.Comments = nil
}
prev, ok := gd[id]
prev, ok := gd.data[id]
if !ok {
gd[id] = s
delta[id] = s
gd.data[id] = s
delta.data[id] = s
continue
}
if prev.Silence.UpdatedAt.Before(s.Silence.UpdatedAt) {
gd[id] = s
delta[id] = s
gd.data[id] = s
delta.data[id] = s
}
}
return delta

View File

@ -81,13 +81,17 @@ func TestSilencesGC(t *testing.T) {
newSilence := func(exp time.Time) *pb.MeshSilence {
return &pb.MeshSilence{ExpiresAt: exp}
}
s.st = gossipData{
"1": newSilence(now),
"2": newSilence(now.Add(-time.Second)),
"3": newSilence(now.Add(time.Second)),
s.st = &gossipData{
data: silenceMap{
"1": newSilence(now),
"2": newSilence(now.Add(-time.Second)),
"3": newSilence(now.Add(time.Second)),
},
}
want := gossipData{
"3": newSilence(now.Add(time.Second)),
want := &gossipData{
data: silenceMap{
"3": newSilence(now.Add(time.Second)),
},
}
n, err := s.GC()
@ -138,10 +142,10 @@ func TestSilencesSnapshot(t *testing.T) {
f, err := ioutil.TempFile("", "snapshot")
require.NoError(t, err, "creating temp file failed")
s1 := &Silences{st: gossipData{}, metrics: newMetrics(nil)}
s1 := &Silences{st: newGossipData(), metrics: newMetrics(nil)}
// Setup internal state manually.
for _, e := range c.entries {
s1.st[e.Silence.Id] = e
s1.st.data[e.Silence.Id] = e
}
_, err = s1.Snapshot(f)
require.NoError(t, err, "creating snapshot failed")
@ -152,10 +156,10 @@ func TestSilencesSnapshot(t *testing.T) {
require.NoError(t, err, "opening snapshot file failed")
// Check again against new nlog instance.
s2 := &Silences{mc: matcherCache{}}
s2 := &Silences{mc: matcherCache{}, st: newGossipData()}
err = s2.loadSnapshot(f)
require.NoError(t, err, "error loading snapshot")
require.Equal(t, s1.st, s2.st, "state after loading snapshot did not match snapshotted state")
require.Equal(t, s1.st.data, s2.st.data, "state after loading snapshot did not match snapshotted state")
require.NoError(t, f.Close(), "closing snapshot file failed")
}
@ -184,17 +188,19 @@ func TestSilencesSetSilence(t *testing.T) {
EndsAt: nowpb,
}
want := gossipData{
"some_id": &pb.MeshSilence{
Silence: sil,
ExpiresAt: now.Add(time.Minute),
want := &gossipData{
data: silenceMap{
"some_id": &pb.MeshSilence{
Silence: sil,
ExpiresAt: now.Add(time.Minute),
},
},
}
var called bool
s.gossip = &mockGossip{
broadcast: func(d mesh.GossipData) {
data, ok := d.(gossipData)
data, ok := d.(*gossipData)
require.True(t, ok, "gossip data of unknown type")
require.Equal(t, want, data, "unexpected gossip broadcast data")
@ -203,7 +209,7 @@ func TestSilencesSetSilence(t *testing.T) {
}
require.NoError(t, s.setSilence(sil))
require.True(t, called, "GossipBroadcast was not called")
require.Equal(t, want, s.st, "Unexpected silence state")
require.Equal(t, want.data, s.st.data, "Unexpected silence state")
}
func TestSilenceSet(t *testing.T) {
@ -226,19 +232,21 @@ func TestSilenceSet(t *testing.T) {
require.NoError(t, err)
require.NotEqual(t, id1, "")
want := gossipData{
id1: &pb.MeshSilence{
Silence: &pb.Silence{
Id: id1,
Matchers: []*pb.Matcher{{Name: "a", Pattern: "b"}},
StartsAt: now1.Add(2 * time.Minute),
EndsAt: now1.Add(5 * time.Minute),
UpdatedAt: now1,
want := &gossipData{
data: silenceMap{
id1: &pb.MeshSilence{
Silence: &pb.Silence{
Id: id1,
Matchers: []*pb.Matcher{{Name: "a", Pattern: "b"}},
StartsAt: now1.Add(2 * time.Minute),
EndsAt: now1.Add(5 * time.Minute),
UpdatedAt: now1,
},
ExpiresAt: now1.Add(5*time.Minute + s.retention),
},
ExpiresAt: now1.Add(5*time.Minute + s.retention),
},
}
require.Equal(t, want, s.st, "unexpected state after silence creation")
require.Equal(t, want.data, s.st.data, "unexpected state after silence creation")
// Insert silence with unset start time. Must be set to now.
now = now.Add(time.Minute)
@ -252,20 +260,22 @@ func TestSilenceSet(t *testing.T) {
require.NoError(t, err)
require.NotEqual(t, id2, "")
want = gossipData{
id1: want[id1],
id2: &pb.MeshSilence{
Silence: &pb.Silence{
Id: id2,
Matchers: []*pb.Matcher{{Name: "a", Pattern: "b"}},
StartsAt: now2,
EndsAt: now2.Add(1 * time.Minute),
UpdatedAt: now2,
want = &gossipData{
data: silenceMap{
id1: want.data[id1],
id2: &pb.MeshSilence{
Silence: &pb.Silence{
Id: id2,
Matchers: []*pb.Matcher{{Name: "a", Pattern: "b"}},
StartsAt: now2,
EndsAt: now2.Add(1 * time.Minute),
UpdatedAt: now2,
},
ExpiresAt: now2.Add(1*time.Minute + s.retention),
},
ExpiresAt: now2.Add(1*time.Minute + s.retention),
},
}
require.Equal(t, want, s.st, "unexpected state after silence creation")
require.Equal(t, want.data, s.st.data, "unexpected state after silence creation")
// Overwrite silence 2 with new end time.
now = now.Add(time.Minute)
@ -278,20 +288,22 @@ func TestSilenceSet(t *testing.T) {
require.NoError(t, err)
require.Equal(t, id2, id3)
want = gossipData{
id1: want[id1],
id2: &pb.MeshSilence{
Silence: &pb.Silence{
Id: id2,
Matchers: []*pb.Matcher{{Name: "a", Pattern: "b"}},
StartsAt: now2,
EndsAt: now3.Add(100 * time.Minute),
UpdatedAt: now3,
want = &gossipData{
data: silenceMap{
id1: want.data[id1],
id2: &pb.MeshSilence{
Silence: &pb.Silence{
Id: id2,
Matchers: []*pb.Matcher{{Name: "a", Pattern: "b"}},
StartsAt: now2,
EndsAt: now3.Add(100 * time.Minute),
UpdatedAt: now3,
},
ExpiresAt: now3.Add(100*time.Minute + s.retention),
},
ExpiresAt: now3.Add(100*time.Minute + s.retention),
},
}
require.Equal(t, want, s.st, "unexpected state after silence creation")
require.Equal(t, want.data, s.st.data, "unexpected state after silence creation")
// Update silence 2 with new matcher expires it and creates a new one.
now = now.Add(time.Minute)
@ -304,30 +316,32 @@ func TestSilenceSet(t *testing.T) {
require.NoError(t, err)
require.NotEqual(t, id2, id4)
want = gossipData{
id1: want[id1],
id2: &pb.MeshSilence{
Silence: &pb.Silence{
Id: id2,
Matchers: []*pb.Matcher{{Name: "a", Pattern: "b"}},
StartsAt: now2,
EndsAt: now4,
UpdatedAt: now4,
want = &gossipData{
data: silenceMap{
id1: want.data[id1],
id2: &pb.MeshSilence{
Silence: &pb.Silence{
Id: id2,
Matchers: []*pb.Matcher{{Name: "a", Pattern: "b"}},
StartsAt: now2,
EndsAt: now4,
UpdatedAt: now4,
},
ExpiresAt: now4.Add(s.retention),
},
ExpiresAt: now4.Add(s.retention),
},
id4: &pb.MeshSilence{
Silence: &pb.Silence{
Id: id4,
Matchers: []*pb.Matcher{{Name: "a", Pattern: "c"}},
StartsAt: now4,
EndsAt: now3.Add(100 * time.Minute),
UpdatedAt: now4,
id4: &pb.MeshSilence{
Silence: &pb.Silence{
Id: id4,
Matchers: []*pb.Matcher{{Name: "a", Pattern: "c"}},
StartsAt: now4,
EndsAt: now3.Add(100 * time.Minute),
UpdatedAt: now4,
},
ExpiresAt: now3.Add(100*time.Minute + s.retention),
},
ExpiresAt: now3.Add(100*time.Minute + s.retention),
},
}
require.Equal(t, want, s.st, "unexpected state after silence creation")
require.Equal(t, want.data, s.st.data, "unexpected state after silence creation")
// Re-create expired silence.
now = now.Add(time.Minute)
@ -341,22 +355,24 @@ func TestSilenceSet(t *testing.T) {
require.NoError(t, err)
require.NotEqual(t, id2, id4)
want = gossipData{
id1: want[id1],
id2: want[id2],
id4: want[id4],
id5: &pb.MeshSilence{
Silence: &pb.Silence{
Id: id5,
Matchers: []*pb.Matcher{{Name: "a", Pattern: "b"}},
StartsAt: now5,
EndsAt: now5.Add(5 * time.Minute),
UpdatedAt: now5,
want = &gossipData{
data: silenceMap{
id1: want.data[id1],
id2: want.data[id2],
id4: want.data[id4],
id5: &pb.MeshSilence{
Silence: &pb.Silence{
Id: id5,
Matchers: []*pb.Matcher{{Name: "a", Pattern: "b"}},
StartsAt: now5,
EndsAt: now5.Add(5 * time.Minute),
UpdatedAt: now5,
},
ExpiresAt: now5.Add(5*time.Minute + s.retention),
},
ExpiresAt: now5.Add(5*time.Minute + s.retention),
},
}
require.Equal(t, want, s.st, "unexpected state after silence creation")
require.Equal(t, want.data, s.st.data, "unexpected state after silence creation")
}
func TestSilencesSetFail(t *testing.T) {
@ -492,7 +508,7 @@ func TestQMatches(t *testing.T) {
},
}
for _, c := range cases {
drop, err := f(c.sil, &Silences{mc: matcherCache{}}, time.Time{})
drop, err := f(c.sil, &Silences{mc: matcherCache{}, st: newGossipData()}, time.Time{})
require.NoError(t, err)
require.Equal(t, c.drop, drop, "unexpected filter result")
}
@ -502,12 +518,14 @@ func TestSilencesQuery(t *testing.T) {
s, err := New(Options{})
require.NoError(t, err)
s.st = gossipData{
"1": &pb.MeshSilence{Silence: &pb.Silence{Id: "1"}},
"2": &pb.MeshSilence{Silence: &pb.Silence{Id: "2"}},
"3": &pb.MeshSilence{Silence: &pb.Silence{Id: "3"}},
"4": &pb.MeshSilence{Silence: &pb.Silence{Id: "4"}},
"5": &pb.MeshSilence{Silence: &pb.Silence{Id: "5"}},
s.st = &gossipData{
data: silenceMap{
"1": &pb.MeshSilence{Silence: &pb.Silence{Id: "1"}},
"2": &pb.MeshSilence{Silence: &pb.Silence{Id: "2"}},
"3": &pb.MeshSilence{Silence: &pb.Silence{Id: "3"}},
"4": &pb.MeshSilence{Silence: &pb.Silence{Id: "4"}},
"5": &pb.MeshSilence{Silence: &pb.Silence{Id: "5"}},
},
}
cases := []struct {
q *query
@ -717,28 +735,30 @@ func TestSilenceExpire(t *testing.T) {
m := &pb.Matcher{Type: pb.Matcher_EQUAL, Name: "a", Pattern: "b"}
s.st = gossipData{
"pending": &pb.MeshSilence{Silence: &pb.Silence{
Id: "pending",
Matchers: []*pb.Matcher{m},
StartsAt: now.Add(time.Minute),
EndsAt: now.Add(time.Hour),
UpdatedAt: now.Add(-time.Hour),
}},
"active": &pb.MeshSilence{Silence: &pb.Silence{
Id: "active",
Matchers: []*pb.Matcher{m},
StartsAt: now.Add(-time.Minute),
EndsAt: now.Add(time.Hour),
UpdatedAt: now.Add(-time.Hour),
}},
"expired": &pb.MeshSilence{Silence: &pb.Silence{
Id: "expired",
Matchers: []*pb.Matcher{m},
StartsAt: now.Add(-time.Hour),
EndsAt: now.Add(-time.Minute),
UpdatedAt: now.Add(-time.Hour),
}},
s.st = &gossipData{
data: silenceMap{
"pending": &pb.MeshSilence{Silence: &pb.Silence{
Id: "pending",
Matchers: []*pb.Matcher{m},
StartsAt: now.Add(time.Minute),
EndsAt: now.Add(time.Hour),
UpdatedAt: now.Add(-time.Hour),
}},
"active": &pb.MeshSilence{Silence: &pb.Silence{
Id: "active",
Matchers: []*pb.Matcher{m},
StartsAt: now.Add(-time.Minute),
EndsAt: now.Add(time.Hour),
UpdatedAt: now.Add(-time.Hour),
}},
"expired": &pb.MeshSilence{Silence: &pb.Silence{
Id: "expired",
Matchers: []*pb.Matcher{m},
StartsAt: now.Add(-time.Hour),
EndsAt: now.Add(-time.Minute),
UpdatedAt: now.Add(-time.Hour),
}},
},
}
require.NoError(t, s.expire("pending"))
@ -979,29 +999,37 @@ func TestGossipDataMerge(t *testing.T) {
}
}
cases := []struct {
a, b gossipData
final, delta gossipData
a, b *gossipData
final, delta *gossipData
}{
{
a: gossipData{
"a1": newSilence(now),
"a2": newSilence(now),
"a3": newSilence(now),
a: &gossipData{
data: silenceMap{
"a1": newSilence(now),
"a2": newSilence(now),
"a3": newSilence(now),
},
},
b: gossipData{
"b1": newSilence(now), // new key, should be added
"a2": newSilence(now.Add(-time.Minute)), // older timestamp, should be dropped
"a3": newSilence(now.Add(time.Minute)), // newer timestamp, should overwrite
b: &gossipData{
data: silenceMap{
"b1": newSilence(now), // new key, should be added
"a2": newSilence(now.Add(-time.Minute)), // older timestamp, should be dropped
"a3": newSilence(now.Add(time.Minute)), // newer timestamp, should overwrite
},
},
final: gossipData{
"a1": newSilence(now),
"a2": newSilence(now),
"a3": newSilence(now.Add(time.Minute)),
"b1": newSilence(now),
final: &gossipData{
data: silenceMap{
"a1": newSilence(now),
"a2": newSilence(now),
"a3": newSilence(now.Add(time.Minute)),
"b1": newSilence(now),
},
},
delta: gossipData{
"b1": newSilence(now),
"a3": newSilence(now.Add(time.Minute)),
delta: &gossipData{
data: silenceMap{
"b1": newSilence(now),
"a3": newSilence(now.Add(time.Minute)),
},
},
},
}
@ -1065,9 +1093,9 @@ func TestGossipDataCoding(t *testing.T) {
for _, c := range cases {
// Create gossip data from input.
in := gossipData{}
in := newGossipData()
for _, e := range c.entries {
in[e.Silence.Id] = e
in.data[e.Silence.Id] = e
}
msg := in.Encode()
require.Equal(t, 1, len(msg), "expected single message for input")