diff --git a/main.go b/main.go index 55f01c44..73d522b0 100644 --- a/main.go +++ b/main.go @@ -16,12 +16,16 @@ package main import ( "flag" "fmt" + "io/ioutil" + stdlog "log" "net" "net/http" "net/url" "os" "os/signal" "path" + "sort" + "strconv" "strings" "syscall" "time" @@ -30,24 +34,16 @@ import ( "github.com/prometheus/common/log" "github.com/prometheus/common/route" "github.com/prometheus/common/version" + "github.com/weaveworks/mesh" "github.com/prometheus/alertmanager/config" "github.com/prometheus/alertmanager/notify" "github.com/prometheus/alertmanager/provider/boltmem" + meshprov "github.com/prometheus/alertmanager/provider/mesh" "github.com/prometheus/alertmanager/template" "github.com/prometheus/alertmanager/types" ) -var ( - showVersion = flag.Bool("version", false, "Print version information.") - - configFile = flag.String("config.file", "alertmanager.yml", "Alertmanager configuration file name.") - dataDir = flag.String("storage.path", "data/", "Base path for data storage.") - - externalURL = flag.String("web.external-url", "", "The URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Alertmanager. If omitted, relevant URL components will be derived automatically.") - listenAddress = flag.String("web.listen-address", ":9093", "Address to listen on for the web interface and API.") -) - var ( configSuccess = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "alertmanager", @@ -68,6 +64,21 @@ func init() { } func main() { + peers := &stringset{} + var ( + showVersion = flag.Bool("version", false, "Print version information.") + + configFile = flag.String("config.file", "alertmanager.yml", "Alertmanager configuration file name.") + dataDir = flag.String("storage.path", "data/", "Base path for data storage.") + + externalURL = flag.String("web.external-url", "", "The URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Alertmanager. If omitted, relevant URL components will be derived automatically.") + listenAddress = flag.String("web.listen-address", ":9093", "Address to listen on for the web interface and API.") + + meshListen = flag.String("mesh.listen-address", net.JoinHostPort("0.0.0.0", strconv.Itoa(mesh.Port)), "mesh listen address") + hwaddr = flag.String("mesh.hardware-address", mustHardwareAddr(), "MAC address, i.e. mesh peer ID") + nickname = flag.String("mesh.nickname", mustHostname(), "peer nickname") + ) + flag.Var(peers, "mesh.peer", "initial peers (may be repeated)") flag.Parse() if *showVersion { @@ -83,26 +94,27 @@ func main() { log.Fatal(err) } + mrouter := initMesh(*meshListen, *hwaddr, *nickname) + + ni := meshprov.NewNotificationInfos(log.Base()) + ni.Register(mrouter.NewGossip("notify_info", ni)) + marker := types.NewMarker() + silences := meshprov.NewSilences(marker, log.Base()) + silences.Register(mrouter.NewGossip("silences", silences)) + + mrouter.Start() + defer mrouter.Stop() + + mrouter.ConnectionMaker.InitiateConnections(peers.slice(), true) + alerts, err := boltmem.NewAlerts(*dataDir) if err != nil { log.Fatal(err) } defer alerts.Close() - notifies, err := boltmem.NewNotificationInfo(*dataDir) - if err != nil { - log.Fatal(err) - } - defer notifies.Close() - - silences, err := boltmem.NewSilences(*dataDir, marker) - if err != nil { - log.Fatal(err) - } - defer silences.Close() - var ( inhibitor *Inhibitor tmpl *template.Template @@ -123,7 +135,7 @@ func main() { for i, n := range fo { n = notify.Retry(n) n = notify.Log(n, log.With("step", "retry")) - n = notify.Dedup(notifies, n) + n = notify.Dedup(ni, n) n = notify.Log(n, log.With("step", "dedup")) fo[i] = n @@ -223,6 +235,34 @@ func main() { log.Infoln("Received SIGTERM, exiting gracefully...") } +func initMesh(addr, hwaddr, nickname string) *mesh.Router { + host, portStr, err := net.SplitHostPort(addr) + + if err != nil { + log.Fatalf("mesh address: %s: %v", addr, err) + } + port, err := strconv.Atoi(portStr) + if err != nil { + log.Fatalf("mesh address: %s: %v", addr, err) + } + + name, err := mesh.PeerNameFromString(hwaddr) + if err != nil { + log.Fatalf("%s: %v", hwaddr, err) + } + + return mesh.NewRouter(mesh.Config{ + Host: host, + Port: port, + ProtocolMinVersion: mesh.ProtocolMinVersion, + Password: []byte(""), + ConnLimit: 64, + PeerDiscovery: true, + TrustedSubnets: []*net.IPNet{}, + }, name, nickname, mesh.NullOverlay{}, stdlog.New(ioutil.Discard, "", 0)) + +} + func extURL(listen, external string) (*url.URL, error) { if external == "" { hostname, err := os.Hostname() @@ -256,3 +296,44 @@ func listen(listen string, router *route.Router) { log.Fatal(err) } } + +type stringset map[string]struct{} + +func (ss stringset) Set(value string) error { + ss[value] = struct{}{} + return nil +} + +func (ss stringset) String() string { + return strings.Join(ss.slice(), ",") +} + +func (ss stringset) slice() []string { + slice := make([]string, 0, len(ss)) + for k := range ss { + slice = append(slice, k) + } + sort.Strings(slice) + return slice +} + +func mustHardwareAddr() string { + ifaces, err := net.Interfaces() + if err != nil { + panic(err) + } + for _, iface := range ifaces { + if s := iface.HardwareAddr.String(); s != "" { + return s + } + } + panic("no valid network interfaces") +} + +func mustHostname() string { + hostname, err := os.Hostname() + if err != nil { + panic(err) + } + return hostname +} diff --git a/provider/mesh/peer.go b/provider/mesh/peer.go index 5b867915..57979900 100644 --- a/provider/mesh/peer.go +++ b/provider/mesh/peer.go @@ -25,6 +25,10 @@ func NewNotificationInfos(logger log.Logger) *NotificationInfos { } } +func (ni *NotificationInfos) Register(g mesh.Gossip) { + ni.send = g +} + func (ni *NotificationInfos) Gossip() mesh.GossipData { return ni.st.copy() } @@ -109,6 +113,10 @@ func NewSilences(mk types.Marker, logger log.Logger) *Silences { } } +func (s *Silences) Register(g mesh.Gossip) { + s.send = g +} + func (s *Silences) Mutes(lset model.LabelSet) bool { s.st.mtx.RLock() defer s.st.mtx.RUnlock() @@ -142,7 +150,7 @@ func (s *Silences) Set(sil *types.Silence) (uuid.UUID, error) { sil.UpdatedAt = time.Now() update := &silenceState{ - set: map[uuid.UUID]*model.Silence{ + set: map[uuid.UUID]*types.Silence{ sil.ID: sil, }, } diff --git a/test/acceptance.go b/test/acceptance.go index 1a408cc0..ee265869 100644 --- a/test/acceptance.go +++ b/test/acceptance.go @@ -125,6 +125,9 @@ func (t *AcceptanceTest) Alertmanager(conf string) *Alertmanager { am.UpdateConfig(conf) am.addr = freeAddress() + am.mesh = freeAddress() + am.hwaddr = "00:00:00:00:00:01" + am.nickname = "1" t.Logf("AM on %s", am.addr) @@ -225,11 +228,12 @@ type Alertmanager struct { t *AcceptanceTest opts *AcceptanceOpts - addr string - client alertmanager.Client - cmd *exec.Cmd - confFile *os.File - dir string + addr string + mesh, hwaddr, nickname string + client alertmanager.Client + cmd *exec.Cmd + confFile *os.File + dir string errc chan<- error } @@ -241,6 +245,9 @@ func (am *Alertmanager) Start() { "-log.level", "debug", "-web.listen-address", am.addr, "-storage.path", am.dir, + "-mesh.listen-address", am.mesh, + "-mesh.hardware-address", am.hwaddr, + "-mesh.nickname", am.nickname, ) if am.cmd == nil { diff --git a/test/acceptance/send_test.go b/test/acceptance/send_test.go index c6d60a75..82cb8884 100644 --- a/test/acceptance/send_test.go +++ b/test/acceptance/send_test.go @@ -87,16 +87,18 @@ receivers: // Test against a bug which ocurrec after a restart. The previous occurrence of // the alert was sent rather than the most recent one. - // XXX(fabxc): temporarily disable while alerts are not persisted. - // at.Do(At(6.7), func() { + // + // XXX(fabxc) disabled as notification info won't be persisted. Thus, with a mesh + // notifier we lose the state in this single-node setup. + //at.Do(At(6.7), func() { // am.Terminate() // am.Start() - // }) + //}) // On restart the alert is flushed right away as the group_wait has already passed. // However, it must be caught in the deduplication stage. // The next attempt will be 1s later and won't be filtered in deduping. - // co.Want(Between(7.7, 8), Alert("alertname", "test").Active(5.3)) + //co.Want(Between(7.7, 8), Alert("alertname", "test").Active(5.3)) at.Run() }