main: use mesh providers

This commit is contained in:
Fabian Reinartz 2016-06-02 15:43:04 +02:00
parent c8a22d5750
commit e51770ce21
4 changed files with 131 additions and 33 deletions

127
main.go
View File

@ -16,12 +16,16 @@ package main
import (
"flag"
"fmt"
"io/ioutil"
stdlog "log"
"net"
"net/http"
"net/url"
"os"
"os/signal"
"path"
"sort"
"strconv"
"strings"
"syscall"
"time"
@ -30,24 +34,16 @@ import (
"github.com/prometheus/common/log"
"github.com/prometheus/common/route"
"github.com/prometheus/common/version"
"github.com/weaveworks/mesh"
"github.com/prometheus/alertmanager/config"
"github.com/prometheus/alertmanager/notify"
"github.com/prometheus/alertmanager/provider/boltmem"
meshprov "github.com/prometheus/alertmanager/provider/mesh"
"github.com/prometheus/alertmanager/template"
"github.com/prometheus/alertmanager/types"
)
var (
showVersion = flag.Bool("version", false, "Print version information.")
configFile = flag.String("config.file", "alertmanager.yml", "Alertmanager configuration file name.")
dataDir = flag.String("storage.path", "data/", "Base path for data storage.")
externalURL = flag.String("web.external-url", "", "The URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Alertmanager. If omitted, relevant URL components will be derived automatically.")
listenAddress = flag.String("web.listen-address", ":9093", "Address to listen on for the web interface and API.")
)
var (
configSuccess = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "alertmanager",
@ -68,6 +64,21 @@ func init() {
}
func main() {
peers := &stringset{}
var (
showVersion = flag.Bool("version", false, "Print version information.")
configFile = flag.String("config.file", "alertmanager.yml", "Alertmanager configuration file name.")
dataDir = flag.String("storage.path", "data/", "Base path for data storage.")
externalURL = flag.String("web.external-url", "", "The URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Alertmanager. If omitted, relevant URL components will be derived automatically.")
listenAddress = flag.String("web.listen-address", ":9093", "Address to listen on for the web interface and API.")
meshListen = flag.String("mesh.listen-address", net.JoinHostPort("0.0.0.0", strconv.Itoa(mesh.Port)), "mesh listen address")
hwaddr = flag.String("mesh.hardware-address", mustHardwareAddr(), "MAC address, i.e. mesh peer ID")
nickname = flag.String("mesh.nickname", mustHostname(), "peer nickname")
)
flag.Var(peers, "mesh.peer", "initial peers (may be repeated)")
flag.Parse()
if *showVersion {
@ -83,26 +94,27 @@ func main() {
log.Fatal(err)
}
mrouter := initMesh(*meshListen, *hwaddr, *nickname)
ni := meshprov.NewNotificationInfos(log.Base())
ni.Register(mrouter.NewGossip("notify_info", ni))
marker := types.NewMarker()
silences := meshprov.NewSilences(marker, log.Base())
silences.Register(mrouter.NewGossip("silences", silences))
mrouter.Start()
defer mrouter.Stop()
mrouter.ConnectionMaker.InitiateConnections(peers.slice(), true)
alerts, err := boltmem.NewAlerts(*dataDir)
if err != nil {
log.Fatal(err)
}
defer alerts.Close()
notifies, err := boltmem.NewNotificationInfo(*dataDir)
if err != nil {
log.Fatal(err)
}
defer notifies.Close()
silences, err := boltmem.NewSilences(*dataDir, marker)
if err != nil {
log.Fatal(err)
}
defer silences.Close()
var (
inhibitor *Inhibitor
tmpl *template.Template
@ -123,7 +135,7 @@ func main() {
for i, n := range fo {
n = notify.Retry(n)
n = notify.Log(n, log.With("step", "retry"))
n = notify.Dedup(notifies, n)
n = notify.Dedup(ni, n)
n = notify.Log(n, log.With("step", "dedup"))
fo[i] = n
@ -223,6 +235,34 @@ func main() {
log.Infoln("Received SIGTERM, exiting gracefully...")
}
func initMesh(addr, hwaddr, nickname string) *mesh.Router {
host, portStr, err := net.SplitHostPort(addr)
if err != nil {
log.Fatalf("mesh address: %s: %v", addr, err)
}
port, err := strconv.Atoi(portStr)
if err != nil {
log.Fatalf("mesh address: %s: %v", addr, err)
}
name, err := mesh.PeerNameFromString(hwaddr)
if err != nil {
log.Fatalf("%s: %v", hwaddr, err)
}
return mesh.NewRouter(mesh.Config{
Host: host,
Port: port,
ProtocolMinVersion: mesh.ProtocolMinVersion,
Password: []byte(""),
ConnLimit: 64,
PeerDiscovery: true,
TrustedSubnets: []*net.IPNet{},
}, name, nickname, mesh.NullOverlay{}, stdlog.New(ioutil.Discard, "", 0))
}
func extURL(listen, external string) (*url.URL, error) {
if external == "" {
hostname, err := os.Hostname()
@ -256,3 +296,44 @@ func listen(listen string, router *route.Router) {
log.Fatal(err)
}
}
type stringset map[string]struct{}
func (ss stringset) Set(value string) error {
ss[value] = struct{}{}
return nil
}
func (ss stringset) String() string {
return strings.Join(ss.slice(), ",")
}
func (ss stringset) slice() []string {
slice := make([]string, 0, len(ss))
for k := range ss {
slice = append(slice, k)
}
sort.Strings(slice)
return slice
}
func mustHardwareAddr() string {
ifaces, err := net.Interfaces()
if err != nil {
panic(err)
}
for _, iface := range ifaces {
if s := iface.HardwareAddr.String(); s != "" {
return s
}
}
panic("no valid network interfaces")
}
func mustHostname() string {
hostname, err := os.Hostname()
if err != nil {
panic(err)
}
return hostname
}

View File

@ -25,6 +25,10 @@ func NewNotificationInfos(logger log.Logger) *NotificationInfos {
}
}
func (ni *NotificationInfos) Register(g mesh.Gossip) {
ni.send = g
}
func (ni *NotificationInfos) Gossip() mesh.GossipData {
return ni.st.copy()
}
@ -109,6 +113,10 @@ func NewSilences(mk types.Marker, logger log.Logger) *Silences {
}
}
func (s *Silences) Register(g mesh.Gossip) {
s.send = g
}
func (s *Silences) Mutes(lset model.LabelSet) bool {
s.st.mtx.RLock()
defer s.st.mtx.RUnlock()
@ -142,7 +150,7 @@ func (s *Silences) Set(sil *types.Silence) (uuid.UUID, error) {
sil.UpdatedAt = time.Now()
update := &silenceState{
set: map[uuid.UUID]*model.Silence{
set: map[uuid.UUID]*types.Silence{
sil.ID: sil,
},
}

View File

@ -125,6 +125,9 @@ func (t *AcceptanceTest) Alertmanager(conf string) *Alertmanager {
am.UpdateConfig(conf)
am.addr = freeAddress()
am.mesh = freeAddress()
am.hwaddr = "00:00:00:00:00:01"
am.nickname = "1"
t.Logf("AM on %s", am.addr)
@ -225,11 +228,12 @@ type Alertmanager struct {
t *AcceptanceTest
opts *AcceptanceOpts
addr string
client alertmanager.Client
cmd *exec.Cmd
confFile *os.File
dir string
addr string
mesh, hwaddr, nickname string
client alertmanager.Client
cmd *exec.Cmd
confFile *os.File
dir string
errc chan<- error
}
@ -241,6 +245,9 @@ func (am *Alertmanager) Start() {
"-log.level", "debug",
"-web.listen-address", am.addr,
"-storage.path", am.dir,
"-mesh.listen-address", am.mesh,
"-mesh.hardware-address", am.hwaddr,
"-mesh.nickname", am.nickname,
)
if am.cmd == nil {

View File

@ -87,16 +87,18 @@ receivers:
// Test against a bug which ocurrec after a restart. The previous occurrence of
// the alert was sent rather than the most recent one.
// XXX(fabxc): temporarily disable while alerts are not persisted.
// at.Do(At(6.7), func() {
//
// XXX(fabxc) disabled as notification info won't be persisted. Thus, with a mesh
// notifier we lose the state in this single-node setup.
//at.Do(At(6.7), func() {
// am.Terminate()
// am.Start()
// })
//})
// On restart the alert is flushed right away as the group_wait has already passed.
// However, it must be caught in the deduplication stage.
// The next attempt will be 1s later and won't be filtered in deduping.
// co.Want(Between(7.7, 8), Alert("alertname", "test").Active(5.3))
//co.Want(Between(7.7, 8), Alert("alertname", "test").Active(5.3))
at.Run()
}