From c1ee634c16334e576b565a1c00d47c82263e95e6 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 10 Jun 2016 12:25:56 +0200 Subject: [PATCH] provider/mesh: add notification garbage collection --- provider/mesh/state.go | 39 ++++++++++++++++++++++++++++++++++--- provider/mesh/state_test.go | 29 +++++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 3 deletions(-) diff --git a/provider/mesh/state.go b/provider/mesh/state.go index d117b284..80001668 100644 --- a/provider/mesh/state.go +++ b/provider/mesh/state.go @@ -19,22 +19,55 @@ type notificationEntry struct { } type notificationState struct { - mtx sync.RWMutex - set map[string]notificationEntry + mtx sync.RWMutex + set map[string]notificationEntry + stopc chan struct{} + now func() time.Time // test injection hook } +const gcInterval = 10 * time.Minute + func newNotificationState() *notificationState { return ¬ificationState{ - set: map[string]notificationEntry{}, + set: map[string]notificationEntry{}, + stopc: make(chan struct{}), + now: time.Now, } } +func (s *notificationState) run(retention time.Duration) { + for { + select { + case <-s.stopc: + return + case <-time.After(gcInterval): + s.gc(retention) + } + } +} + +func (s *notificationState) stop() { + close(s.stopc) +} + func decodeNotificationSet(b []byte) (map[string]notificationEntry, error) { var v map[string]notificationEntry err := gob.NewDecoder(bytes.NewReader(b)).Decode(&v) return v, err } +func (s *notificationState) gc(retention time.Duration) { + s.mtx.Lock() + defer s.mtx.Unlock() + + t := s.now().Add(-retention) + for k, v := range s.set { + if v.Timestamp.Before(t) { + delete(s.set, k) + } + } +} + // copy returns a deep copy of the notification state. func (s *notificationState) copy() *notificationState { s.mtx.RLock() diff --git a/provider/mesh/state_test.go b/provider/mesh/state_test.go index dcc40650..1f0932ec 100644 --- a/provider/mesh/state_test.go +++ b/provider/mesh/state_test.go @@ -12,6 +12,35 @@ import ( "github.com/satori/go.uuid" ) +func TestNotificationStateGC(t *testing.T) { + now := time.Now() + + initial := map[string]notificationEntry{ + "1": {true, now}, + "2": {true, now.Add(30 * time.Minute)}, + "3": {true, now.Add(-30 * time.Minute)}, + "4": {true, now.Add(-60 * time.Minute)}, + "5": {true, now.Add(-61 * time.Minute)}, + "6": {true, now.Add(-100 * time.Hour)}, + } + final := map[string]notificationEntry{ + "1": {true, now}, + "2": {true, now.Add(30 * time.Minute)}, + "3": {true, now.Add(-30 * time.Minute)}, + "4": {true, now.Add(-60 * time.Minute)}, + } + + st := newNotificationState() + st.now = func() time.Time { return now } + st.set = initial + st.gc(time.Hour) + + if !reflect.DeepEqual(st.set, final) { + t.Errorf("Unexpected state after GC") + t.Errorf("%s", pretty.Compare(st.set, final)) + } +} + func TestSilenceStateSet(t *testing.T) { var ( now = time.Now()