package manager

import (
	"encoding/json"
	"fmt"
	"path/filepath"
	"sort"
	"sync"
	"time"

	"github.com/prometheus/common/model"
	"github.com/prometheus/log"
	"github.com/prometheus/alertmanager/crdt"
)

// A State gives access to the Alertmanager's internal state. It is split
// into four independent parts: silences, configuration, notification
// bookkeeping, and alerts.
type State interface {
	// Silence returns the part of the state holding silences.
	Silence() SilenceState
	// Config returns the part of the state holding the configuration.
	Config() ConfigState
	// Notify returns the part of the state holding notification info.
	Notify() NotifyState
	// Alert returns the part of the state holding alerts.
	Alert() AlertState
}

// AlertState stores alerts, addressed by their fingerprint.
type AlertState interface {
	// Add stores the given alerts.
	Add(...*Alert) error
	// Get returns the alert stored for the given fingerprint.
	Get(model.Fingerprint) (*Alert, error)
	// GetAll returns all stored alerts.
	GetAll() ([]*Alert, error)
	// Iter returns a channel of alerts. The implementations in this file
	// deliver the alerts known at call time as well as alerts added later;
	// an alert may be delivered more than once.
	Iter() <-chan *Alert
}

// ConfigState stores a single configuration.
type ConfigState interface {
	// Set replaces the stored configuration.
	Set(*Config) error
	// Get returns the stored configuration.
	Get() (*Config, error)
}

// NotifyState stores notification info, addressed by fingerprint.
type NotifyState interface {
	// Get returns the notification info stored for the given fingerprint.
	Get(model.Fingerprint) (*NotifyInfo, error)
	// Set stores the notification info under the given fingerprint.
	Set(model.Fingerprint, *NotifyInfo) error
	// List returns all stored notification infos.
	List() ([]*NotifyInfo, error)
}

// SilenceState stores silences, addressed by their ID.
type SilenceState interface {
	// List returns a list of all silences.
	List() ([]*Silence, error)
	// Set stores the given silence. The in-memory implementation in this
	// file assigns a generated ID if the silence's ID is empty.
	Set(*Silence) error
	// Del removes the silence with the given ID.
	Del(sid string) error
	// Get returns the silence with the given ID.
	Get(sid string) (*Silence, error)
}

// simpleState implements the State interface. Silences, config and notify
// info are held in memory; alerts are held in a CRDT set whose storage is
// chosen by the constructor.
type simpleState struct { silences *memSilences alerts *crdtAlerts config *memConfig notify *memNotify } func NewSimpleState() State { state := &simpleState{ silences: &memSilences{ sils: map[string]*Silence{}, nextID: 1, }, alerts: newCRDTAlerts(crdt.NewMemStorage()), // alerts: &memAlerts{ // alerts: map[model.Fingerprint]*Alert{}, // updates: make(chan *Alert, 100), // }, config: &memConfig{}, notify: &memNotify{ m: map[model.Fingerprint]*NotifyInfo{}, }, } go state.alerts.run() return state } func NewPersistentState(path string) State { alertDB, err := crdt.NewLevelDBStorage(filepath.Join(path, "/alerts")) if err != nil { panic(err) } state := &simpleState{ silences: &memSilences{ sils: map[string]*Silence{}, nextID: 1, }, alerts: newCRDTAlerts(alertDB), // alerts: &memAlerts{ // alerts: map[model.Fingerprint]*Alert{}, // updates: make(chan *Alert, 100), // }, config: &memConfig{}, notify: &memNotify{ m: map[model.Fingerprint]*NotifyInfo{}, }, } go state.alerts.run() return state } func (s *simpleState) Alert() AlertState { return s.alerts } func (s *simpleState) Silence() SilenceState { return s.silences } func (s *simpleState) Config() ConfigState { return s.config } func (s *simpleState) Notify() NotifyState { return s.notify } type NotifyInfo struct { LastSent time.Time LastResolved bool Labels model.LabelSet } type memNotify struct { m map[model.Fingerprint]*NotifyInfo } func (s *memNotify) Get(fp model.Fingerprint) (*NotifyInfo, error) { if info, ok := s.m[fp]; ok { return info, nil } return nil, fmt.Errorf("notify info for %s not found", fp) } func (s *memNotify) Set(fp model.Fingerprint, info *NotifyInfo) error { s.m[fp] = info return nil } func (s *memNotify) List() ([]*NotifyInfo, error) { var res []*NotifyInfo for _, ni := range s.m { res = append(res, ni) } return res, nil } type memConfig struct { config *Config mtx sync.RWMutex } func (c *memConfig) Set(conf *Config) error { c.mtx.Lock() defer c.mtx.Unlock() c.config = conf return nil } func (c 
*memConfig) Get() (*Config, error) { c.mtx.RLock() defer c.mtx.RUnlock() return c.config, nil } type crdtAlerts struct { set crdt.Set updates chan *Alert subs []chan *Alert mtx sync.RWMutex } func newCRDTAlerts(storage crdt.Storage) *crdtAlerts { return &crdtAlerts{ set: crdt.NewLWW(storage), updates: make(chan *Alert, 100), } } func (s *crdtAlerts) run() { for a := range s.updates { s.mtx.RLock() for _, sub := range s.subs { select { case <-time.After(100 * time.Millisecond): log.Errorf("dropped alert %s for subscriber", a) case sub <- a: // Success } } s.mtx.RUnlock() } } func (s *crdtAlerts) Add(alerts ...*Alert) error { for _, a := range alerts { b, err := json.Marshal(a) if err != nil { return err } err = s.set.Add(a.Fingerprint().String(), uint64(a.Timestamp.UnixNano()/1e6), b) if err != nil { return err } s.updates <- a } return nil } func (s *crdtAlerts) Get(fp model.Fingerprint) (*Alert, error) { e, err := s.set.Get(fp.String()) if err != nil { return nil, err } var alert Alert err = json.Unmarshal(e.Value, &alert) if err != nil { return nil, err } return &alert, nil } func (s *crdtAlerts) GetAll() ([]*Alert, error) { list, err := s.set.List() if err != nil { return nil, err } var alerts []*Alert for _, e := range list { var alert Alert err = json.Unmarshal(e.Value, &alert) if err != nil { return nil, err } alerts = append(alerts, &alert) } return alerts, nil } func (s *crdtAlerts) Iter() <-chan *Alert { ch := make(chan *Alert, 100) // As we append the channel to the subcription channels // before sending the current list of all alerts, no alert is lost. // Handling the some alert twice is effectively a noop. 
s.mtx.Lock() s.subs = append(s.subs, ch) s.mtx.Unlock() prev, err := s.GetAll() if err != nil { log.Error(err) } go func() { for _, alert := range prev { ch <- alert } }() return ch } type memAlerts struct { alerts map[model.Fingerprint]*Alert updates chan *Alert subs []chan *Alert mtx sync.RWMutex } func (s *memAlerts) run() { for a := range s.updates { s.mtx.RLock() for _, sub := range s.subs { select { case <-time.After(100 * time.Millisecond): log.Errorf("dropped alert %s for subscriber", a) case sub <- a: // Success } } s.mtx.RUnlock() } } func (s *memAlerts) GetAll() ([]*Alert, error) { s.mtx.RLock() defer s.mtx.RUnlock() alerts := make([]*Alert, 0, len(s.alerts)) for _, a := range s.alerts { alerts = append(alerts, a) } // TODO(fabxc): specify whether time sorting is an interface // requirement. sort.Sort(alertTimeline(alerts)) return alerts, nil } func (s *memAlerts) Add(alerts ...*Alert) error { s.mtx.Lock() defer s.mtx.Unlock() for _, alert := range alerts { fp := alert.Fingerprint() // Last write wins. 
if prev, ok := s.alerts[fp]; !ok || !prev.Timestamp.After(alert.Timestamp) { s.alerts[fp] = alert } s.updates <- alert } return nil } func (s *memAlerts) Get(fp model.Fingerprint) (*Alert, error) { s.mtx.Lock() defer s.mtx.Unlock() if a, ok := s.alerts[fp]; ok { return a, nil } return nil, fmt.Errorf("alert with fingerprint %s does not exist", fp) } func (s *memAlerts) Del(fp model.Fingerprint) error { s.mtx.Lock() defer s.mtx.Unlock() delete(s.alerts, fp) return nil } func (s *memAlerts) Iter() <-chan *Alert { ch := make(chan *Alert, 100) s.mtx.Lock() s.subs = append(s.subs, ch) s.mtx.Unlock() go func() { prev, _ := s.GetAll() for _, alert := range prev { ch <- alert } }() return ch } type memSilences struct { sils map[string]*Silence mtx sync.RWMutex nextID uint64 } func (s *memSilences) genID() string { sid := fmt.Sprintf("%x", s.nextID) s.nextID++ return sid } func (s *memSilences) Get(sid string) (*Silence, error) { s.mtx.RLock() defer s.mtx.RUnlock() if sil, ok := s.sils[sid]; ok { return sil, nil } return nil, fmt.Errorf("silence with ID %s does not exist", sid) } func (s *memSilences) Del(sid string) error { if _, ok := s.sils[sid]; !ok { return fmt.Errorf("silence with ID %s does not exist", sid) } delete(s.sils, sid) return nil } func (s *memSilences) List() ([]*Silence, error) { s.mtx.Lock() defer s.mtx.Unlock() sils := make([]*Silence, 0, len(s.sils)) for _, sil := range s.sils { sils = append(sils, sil) } return sils, nil } func (s *memSilences) Set(sil *Silence) error { s.mtx.RLock() defer s.mtx.RUnlock() if sil.ID == "" { sil.ID = s.genID() } s.sils[sil.ID] = sil return nil }