diff --git a/state.go b/state.go deleted file mode 100644 index e72d54c3..00000000 --- a/state.go +++ /dev/null @@ -1,434 +0,0 @@ -package manager - -import ( - "encoding/json" - "fmt" - "path/filepath" - "sort" - "sync" - "time" - - "github.com/prometheus/common/model" - "github.com/prometheus/log" - - "github.com/prometheus/alertmanager/crdt" -) - -// A State serves the Alertmanager's internal state about active silences. -type State interface { - Silence() SilenceState - Config() ConfigState - Notify() NotifyState - Alert() AlertState -} - -type AlertState interface { - Add(...*Alert) error - Get(model.Fingerprint) (*Alert, error) - GetAll() ([]*Alert, error) - - Iter() <-chan *Alert -} - -type ConfigState interface { - Set(*Config) error - Get() (*Config, error) -} - -type NotifyState interface { - Get(model.Fingerprint) (*NotifyInfo, error) - Set(model.Fingerprint, *NotifyInfo) error - List() ([]*NotifyInfo, error) -} - -type SilenceState interface { - // Silences returns a list of all silences. - List() ([]*Silence, error) - - // SetSilence sets the given silence. - Set(*Silence) error - Del(sid string) error - Get(sid string) (*Silence, error) -} - -// simpleState implements the State interface based on in-memory storage. -type simpleState struct { - silences *memSilences - alerts *crdtAlerts - config *memConfig - notify *memNotify -} - -func NewSimpleState() State { - state := &simpleState{ - silences: &memSilences{ - sils: map[string]*Silence{}, - nextID: 1, - }, - alerts: newCRDTAlerts(crdt.NewMemStorage()), - // alerts: &memAlerts{ - // alerts: map[model.Fingerprint]*Alert{}, - // updates: make(chan *Alert, 100), - // }, - config: &memConfig{}, - notify: &memNotify{ - m: map[model.Fingerprint]*NotifyInfo{}, - }, - } - - go state.alerts.run() - - return state -} - -func NewPersistentState(path string) State { - alertDB, err := crdt.NewLevelDBStorage(filepath.Join(path, "/alerts")) - if err != nil { - panic(err) - } - - state := &simpleState{ - silences: &memSilences{ - sils: map[string]*Silence{}, - nextID: 1, - }, - alerts: newCRDTAlerts(alertDB), - // alerts: &memAlerts{ - // alerts: map[model.Fingerprint]*Alert{}, - // updates: make(chan *Alert, 100), - // }, - config: &memConfig{}, - notify: &memNotify{ - m: map[model.Fingerprint]*NotifyInfo{}, - }, - } - - go state.alerts.run() - - return state -} - -func (s *simpleState) Alert() AlertState { - return s.alerts -} - -func (s *simpleState) Silence() SilenceState { - return s.silences -} - -func (s *simpleState) Config() ConfigState { - return s.config -} - -func (s *simpleState) Notify() NotifyState { - return s.notify -} - -type NotifyInfo struct { - LastSent time.Time - LastResolved bool - Labels model.LabelSet -} - -type memNotify struct { - m map[model.Fingerprint]*NotifyInfo -} - -func (s *memNotify) Get(fp model.Fingerprint) (*NotifyInfo, error) { - if info, ok := s.m[fp]; ok { - return info, nil - } - return nil, fmt.Errorf("notify info for %s not found", fp) -} - -func (s *memNotify) Set(fp model.Fingerprint, info *NotifyInfo) error { - s.m[fp] = info - return nil -} - -func (s *memNotify) List() ([]*NotifyInfo, error) { - var res []*NotifyInfo - for _, ni := range s.m { - res = append(res, ni) - } - return res, nil -} - -type memConfig struct { - config *Config - mtx sync.RWMutex -} - -func (c *memConfig) Set(conf *Config) error { - c.mtx.Lock() - defer c.mtx.Unlock() - - c.config = conf - return nil -} - -func (c *memConfig) Get() (*Config, error) { - c.mtx.RLock() - defer c.mtx.RUnlock() - - return c.config, nil -} - -type crdtAlerts struct { - set crdt.Set - - updates chan *Alert - subs []chan *Alert - mtx sync.RWMutex -} - -func newCRDTAlerts(storage crdt.Storage) *crdtAlerts { - return &crdtAlerts{ - set: crdt.NewLWW(storage), - updates: make(chan *Alert, 100), - } -} - -func (s *crdtAlerts) run() { - for a := range s.updates { - s.mtx.RLock() - - for _, sub := range s.subs { - select { - case <-time.After(100 * time.Millisecond): - log.Errorf("dropped alert %s for subscriber", a) - case sub <- a: - // Success - } - } - - s.mtx.RUnlock() - } -} - -func (s *crdtAlerts) Add(alerts ...*Alert) error { - for _, a := range alerts { - - b, err := json.Marshal(a) - if err != nil { - return err - } - - err = s.set.Add(a.Fingerprint().String(), uint64(a.Timestamp.UnixNano()/1e6), b) - if err != nil { - return err - } - - s.updates <- a - } - return nil -} - -func (s *crdtAlerts) Get(fp model.Fingerprint) (*Alert, error) { - e, err := s.set.Get(fp.String()) - if err != nil { - return nil, err - } - - var alert Alert - err = json.Unmarshal(e.Value, &alert) - if err != nil { - return nil, err - } - - return &alert, nil -} - -func (s *crdtAlerts) GetAll() ([]*Alert, error) { - list, err := s.set.List() - if err != nil { - return nil, err - } - - var alerts []*Alert - for _, e := range list { - var alert Alert - err = json.Unmarshal(e.Value, &alert) - if err != nil { - return nil, err - } - - alerts = append(alerts, &alert) - } - return alerts, nil -} - -func (s *crdtAlerts) Iter() <-chan *Alert { - ch := make(chan *Alert, 100) - - // As we append the channel to the subcription channels - // before sending the current list of all alerts, no alert is lost. - // Handling the some alert twice is effectively a noop. - s.mtx.Lock() - s.subs = append(s.subs, ch) - s.mtx.Unlock() - - prev, err := s.GetAll() - if err != nil { - log.Error(err) - } - - go func() { - for _, alert := range prev { - ch <- alert - } - }() - - return ch -} - -type memAlerts struct { - alerts map[model.Fingerprint]*Alert - updates chan *Alert - subs []chan *Alert - mtx sync.RWMutex -} - -func (s *memAlerts) run() { - for a := range s.updates { - s.mtx.RLock() - - for _, sub := range s.subs { - select { - case <-time.After(100 * time.Millisecond): - log.Errorf("dropped alert %s for subscriber", a) - case sub <- a: - // Success - } - } - - s.mtx.RUnlock() - } -} - -func (s *memAlerts) GetAll() ([]*Alert, error) { - s.mtx.RLock() - defer s.mtx.RUnlock() - - alerts := make([]*Alert, 0, len(s.alerts)) - for _, a := range s.alerts { - alerts = append(alerts, a) - } - - // TODO(fabxc): specify whether time sorting is an interface - // requirement. - sort.Sort(alertTimeline(alerts)) - - return alerts, nil -} - -func (s *memAlerts) Add(alerts ...*Alert) error { - s.mtx.Lock() - defer s.mtx.Unlock() - - for _, alert := range alerts { - fp := alert.Fingerprint() - - // Last write wins. - if prev, ok := s.alerts[fp]; !ok || !prev.Timestamp.After(alert.Timestamp) { - s.alerts[fp] = alert - } - - s.updates <- alert - } - - return nil -} - -func (s *memAlerts) Get(fp model.Fingerprint) (*Alert, error) { - s.mtx.Lock() - defer s.mtx.Unlock() - - if a, ok := s.alerts[fp]; ok { - return a, nil - } - - return nil, fmt.Errorf("alert with fingerprint %s does not exist", fp) -} - -func (s *memAlerts) Del(fp model.Fingerprint) error { - s.mtx.Lock() - defer s.mtx.Unlock() - - delete(s.alerts, fp) - return nil -} - -func (s *memAlerts) Iter() <-chan *Alert { - ch := make(chan *Alert, 100) - - s.mtx.Lock() - s.subs = append(s.subs, ch) - s.mtx.Unlock() - - go func() { - prev, _ := s.GetAll() - - for _, alert := range prev { - ch <- alert - } - }() - - return ch -} - -type memSilences struct { - sils map[string]*Silence - - mtx sync.RWMutex - nextID uint64 -} - -func (s *memSilences) genID() string { - sid := fmt.Sprintf("%x", s.nextID) - s.nextID++ - return sid -} - -func (s *memSilences) Get(sid string) (*Silence, error) { - s.mtx.RLock() - defer s.mtx.RUnlock() - - if sil, ok := s.sils[sid]; ok { - return sil, nil - } - - return nil, fmt.Errorf("silence with ID %s does not exist", sid) -} - -func (s *memSilences) Del(sid string) error { - if _, ok := s.sils[sid]; !ok { - return fmt.Errorf("silence with ID %s does not exist", sid) - } - - delete(s.sils, sid) - return nil -} - -func (s *memSilences) List() ([]*Silence, error) { - s.mtx.Lock() - defer s.mtx.Unlock() - - sils := make([]*Silence, 0, len(s.sils)) - for _, sil := range s.sils { - sils = append(sils, sil) - } - return sils, nil -} - -func (s *memSilences) Set(sil *Silence) error { - s.mtx.RLock() - defer s.mtx.RUnlock() - - if sil.ID == "" { - sil.ID = s.genID() - } - - s.sils[sil.ID] = sil - return nil -}