Merge pull request #725 from prometheus/groupkey

*: switch group key to matcher serialization
Fabian Reinartz 2017-04-21 13:21:39 +02:00 committed by GitHub
commit 0659dfbde3
10 changed files with 130 additions and 129 deletions


@ -88,7 +88,7 @@ type APIAlert struct {
// AlertGroup is a list of alert blocks grouped by the same label set.
type AlertGroup struct {
Labels model.LabelSet `json:"labels"`
GroupKey uint64 `json:"groupKey"`
GroupKey string `json:"groupKey"`
Blocks []*AlertBlock `json:"blocks"`
}
@ -256,7 +256,7 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
// If the group does not exist, create it.
ag, ok := groups[fp]
if !ok {
ag = newAggrGroup(d.ctx, group, &route.RouteOpts, d.timeout)
ag = newAggrGroup(d.ctx, group, route, d.timeout)
groups[fp] = ag
go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
@ -275,10 +275,10 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
// common set of routing options applies.
// It emits notifications in the specified intervals.
type aggrGroup struct {
labels model.LabelSet
opts *RouteOpts
routeFP model.Fingerprint
log log.Logger
labels model.LabelSet
opts *RouteOpts
log log.Logger
routeKey string
ctx context.Context
cancel func()
@ -292,15 +292,16 @@ type aggrGroup struct {
}
// newAggrGroup returns a new aggregation group.
func newAggrGroup(ctx context.Context, labels model.LabelSet, opts *RouteOpts, to func(time.Duration) time.Duration) *aggrGroup {
func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(time.Duration) time.Duration) *aggrGroup {
if to == nil {
to = func(d time.Duration) time.Duration { return d }
}
ag := &aggrGroup{
labels: labels,
opts: opts,
timeout: to,
alerts: map[model.Fingerprint]*types.Alert{},
labels: labels,
routeKey: r.Key(),
opts: &r.RouteOpts,
timeout: to,
alerts: map[model.Fingerprint]*types.Alert{},
}
ag.ctx, ag.cancel = context.WithCancel(ctx)
@ -313,8 +314,16 @@ func newAggrGroup(ctx context.Context, labels model.LabelSet, opts *RouteOpts, t
return ag
}
func (ag *aggrGroup) fingerprint() model.Fingerprint {
return ag.labels.Fingerprint()
}
func (ag *aggrGroup) GroupKey() string {
return fmt.Sprintf("%s:%s", ag.routeKey, ag.labels)
}
func (ag *aggrGroup) String() string {
return fmt.Sprint(ag.fingerprint())
return ag.GroupKey()
}
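For illustration only (not part of the change set): a minimal, self-contained sketch of how the new string group key is composed, assuming a child route whose Key() is `{}/{job="myjob"}` and a group label set with a single alertname label.

package main

import (
	"fmt"

	"github.com/prometheus/common/model"
)

func main() {
	// routeKey stands in for Route.Key(); the value is made up for this sketch.
	routeKey := `{}/{job="myjob"}`
	labels := model.LabelSet{"alertname": "HighLatency"}

	// Mirrors aggrGroup.GroupKey(): "<route key>:<group labels>".
	fmt.Printf("%s:%s\n", routeKey, labels)
	// Prints: {}/{job="myjob"}:{alertname="HighLatency"}
}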
func (ag *aggrGroup) alertSlice() []*types.Alert {
@ -348,7 +357,7 @@ func (ag *aggrGroup) run(nf notifyFunc) {
ctx = notify.WithNow(ctx, now)
// Populate context with information needed along the pipeline.
ctx = notify.WithGroupKey(ctx, model.Fingerprint(ag.GroupKey()))
ctx = notify.WithGroupKey(ctx, ag.GroupKey())
ctx = notify.WithGroupLabels(ctx, ag.labels)
ctx = notify.WithReceiverName(ctx, ag.opts.Receiver)
ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval)
@ -377,14 +386,6 @@ func (ag *aggrGroup) stop() {
<-ag.done
}
func (ag *aggrGroup) fingerprint() model.Fingerprint {
return ag.labels.Fingerprint()
}
func (ag *aggrGroup) GroupKey() uint64 {
return uint64(ag.labels.Fingerprint() ^ ag.routeFP)
}
// insert inserts the alert into the aggregation group. If the aggregation group
// is empty afterwards, it returns true.
func (ag *aggrGroup) insert(alert *types.Alert) {


@ -100,6 +100,9 @@ func TestAggrGroup(t *testing.T) {
GroupInterval: 300 * time.Millisecond,
RepeatInterval: 1 * time.Hour,
}
route := &Route{
RouteOpts: *opts,
}
var (
a1 = &types.Alert{
@ -173,7 +176,7 @@ func TestAggrGroup(t *testing.T) {
}
// Test regular situation where we wait for group_wait to send out alerts.
ag := newAggrGroup(context.Background(), lset, opts, nil)
ag := newAggrGroup(context.Background(), lset, route, nil)
go ag.run(ntfy)
ag.insert(a1)
@ -221,7 +224,7 @@ func TestAggrGroup(t *testing.T) {
// immediate flushing.
// Finally, set all alerts to be resolved. After successful notify the aggregation group
// should empty itself.
ag = newAggrGroup(context.Background(), lset, opts, nil)
ag = newAggrGroup(context.Background(), lset, route, nil)
go ag.run(ntfy)
ag.insert(a1)


@ -130,8 +130,7 @@ func (r *Route) Match(lset model.LabelSet) []*Route {
}
}
// If no child nodes were matches, the current node itself is
// a match.
// If no child nodes were matches, the current node itself is a match.
if len(all) == 0 {
all = append(all, r)
}
@ -139,32 +138,15 @@ func (r *Route) Match(lset model.LabelSet) []*Route {
return all
}
// SquashMatchers returns the total set of matchers on the path of the tree
// that have to apply to reach the route.
func (r *Route) SquashMatchers() types.Matchers {
var res types.Matchers
res = append(res, r.Matchers...)
// Key returns a key for the route. It does not uniquely identify the route in general.
func (r *Route) Key() string {
b := make([]byte, 0, 1024)
if r.parent == nil {
return res
if r.parent != nil {
b = append(b, r.parent.Key()...)
b = append(b, '/')
}
pm := r.parent.SquashMatchers()
res = append(pm, res...)
return res
}
// Fingerprint returns a hash of the Route based on its grouping labels,
// routing options and the total set of matchers necessary to reach this route.
func (r *Route) Fingerprint() model.Fingerprint {
lset := make(model.LabelSet, len(r.RouteOpts.GroupBy))
for ln := range r.RouteOpts.GroupBy {
lset[ln] = ""
}
return r.SquashMatchers().Fingerprint() ^ lset.Fingerprint()
return string(append(b, r.Matchers.String()...))
}
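A standalone sketch (not the real Route type) of the key scheme that replaces SquashMatchers/Fingerprint: each node appends its serialized matchers to its parent's key, separated by '/'.

package main

import "fmt"

// node mimics only the parts of Route that Key() relies on; the names are invented.
type node struct {
	parent   *node
	matchers string // what Matchers.String() would yield, e.g. `{job="myjob"}`
}

func (n *node) key() string {
	if n.parent == nil {
		return n.matchers
	}
	return n.parent.key() + "/" + n.matchers
}

func main() {
	root := &node{matchers: "{}"} // the root route has no matchers
	child := &node{parent: root, matchers: `{job="myjob"}`}
	fmt.Println(child.key()) // {}/{job="myjob"}
}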
// RouteOpts holds various routing options necessary for processing alerts


@ -43,7 +43,7 @@ type Log interface {
// The Log* methods store a notification log entry for
// a fully qualified receiver and the given IDs identifying the
// alert object.
Log(r *pb.Receiver, key []byte, firing, resolved []uint64) error
Log(r *pb.Receiver, key string, firing, resolved []uint64) error
// Query the log along the given parameters.
//
@ -67,7 +67,7 @@ type Log interface {
// group or a given time interval.
type query struct {
recv *pb.Receiver
groupKey []byte
groupKey string
}
// QueryParam is a function that modifies a query to incorporate
@ -84,7 +84,7 @@ func QReceiver(r *pb.Receiver) QueryParam {
}
// QGroupKey adds a group key as querying argument.
func QGroupKey(gk []byte) QueryParam {
func QGroupKey(gk string) QueryParam {
return func(q *query) error {
q.groupKey = gk
return nil
@ -323,13 +323,17 @@ Loop:
}
}
// stateKey returns a string key for a log entry consisting of the group key
// and receiver.
func stateKey(k []byte, r *pb.Receiver) string {
return fmt.Sprintf("%s:%s", k, r)
func receiverKey(r *pb.Receiver) string {
return fmt.Sprintf("%s/%s/%d", r.GroupName, r.Integration, r.Idx)
}
func (l *nlog) Log(r *pb.Receiver, gkey []byte, firingAlerts, resolvedAlerts []uint64) error {
// stateKey returns a string key for a log entry consisting of the group key
// and receiver.
func stateKey(k string, r *pb.Receiver) string {
return fmt.Sprintf("%s:%s", k, receiverKey(r))
}
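Illustration (receiver values made up): with receiverKey in place, the nflog now keys an entry by the string group key plus the receiver's group name, integration and index.

package main

import "fmt"

func main() {
	groupKey := `{}/{job="myjob"}:{alertname="HighLatency"}` // e.g. aggrGroup.GroupKey()
	recvKey := fmt.Sprintf("%s/%s/%d", "team-X", "email", 0) // shape of receiverKey(r)

	// Mirrors stateKey(k, r): "<group key>:<receiver key>".
	fmt.Printf("%s:%s\n", groupKey, recvKey)
	// Prints: {}/{job="myjob"}:{alertname="HighLatency"}:team-X/email/0
}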
func (l *nlog) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error {
// Write all st with the same timestamp.
now := l.now()
key := stateKey(gkey, r)
@ -348,7 +352,7 @@ func (l *nlog) Log(r *pb.Receiver, gkey []byte, firingAlerts, resolvedAlerts []u
e := &pb.MeshEntry{
Entry: &pb.Entry{
Receiver: r,
GroupKey: gkey,
GroupKey: []byte(gkey),
Timestamp: now,
FiringAlerts: firingAlerts,
ResolvedAlerts: resolvedAlerts,
@ -401,7 +405,7 @@ func (l *nlog) Query(params ...QueryParam) ([]*pb.Entry, error) {
}
// TODO(fabxc): For now our only query mode is the most recent entry for a
// receiver/group_key combination.
if q.recv == nil || q.groupKey == nil {
if q.recv == nil || q.groupKey == "" {
// TODO(fabxc): allow more complex queries in the future.
// How to enable pagination?
return nil, errors.New("no query parameters specified")
@ -437,7 +441,7 @@ func (l *nlog) loadSnapshot(r io.Reader) error {
}
return err
}
st[stateKey(e.Entry.GroupKey, e.Entry.Receiver)] = &e
st[stateKey(string(e.Entry.GroupKey), e.Entry.Receiver)] = &e
}
l.st = st
@ -523,7 +527,7 @@ func decodeGossipData(msg []byte) (gossipData, error) {
}
return gd, err
}
gd[stateKey(e.Entry.GroupKey, e.Entry.Receiver)] = &e
gd[stateKey(string(e.Entry.GroupKey), e.Entry.Receiver)] = &e
}
return gd, nil


@ -103,7 +103,7 @@ func TestNlogSnapshot(t *testing.T) {
}
// Setup internal state manually.
for _, e := range c.entries {
l1.st[stateKey(e.Entry.GroupKey, e.Entry.Receiver)] = e
l1.st[stateKey(string(e.Entry.GroupKey), e.Entry.Receiver)] = e
}
_, err = l1.Snapshot(f)
require.NoError(t, err, "creating snapshot failed")
@ -253,7 +253,7 @@ func TestGossipDataCoding(t *testing.T) {
// Create gossip data from input.
in := gossipData{}
for _, e := range c.entries {
in[stateKey(e.Entry.GroupKey, e.Entry.Receiver)] = e
in[stateKey(string(e.Entry.GroupKey), e.Entry.Receiver)] = e
}
msg := in.Encode()
require.Equal(t, 1, len(msg), "expected single message for input")


@ -23,7 +23,7 @@ message Receiver {
// Entry holds information about a successful notification
// sent to a receiver.
message Entry {
// The key identifying the dispatching group.
bytes group_key = 1;
// The receiver that was notified.
Receiver receiver = 2;


@ -15,6 +15,7 @@ package notify
import (
"bytes"
"crypto/sha256"
"crypto/tls"
"encoding/json"
"errors"
@ -152,7 +153,7 @@ type WebhookMessage struct {
// The protocol version.
Version string `json:"version"`
GroupKey uint64 `json:"groupKey"`
GroupKey string `json:"groupKey"`
}
// Notify implements the Notifier interface.
@ -165,9 +166,9 @@ func (w *Webhook) Notify(ctx context.Context, alerts ...*types.Alert) (bool, err
}
msg := &WebhookMessage{
Version: "3",
Version: "4",
Data: data,
GroupKey: uint64(groupKey),
GroupKey: groupKey,
}
var buf bytes.Buffer
@ -372,7 +373,7 @@ const (
type pagerDutyMessage struct {
ServiceKey string `json:"service_key"`
IncidentKey model.Fingerprint `json:"incident_key"`
IncidentKey string `json:"incident_key"`
EventType string `json:"event_type"`
Description string `json:"description"`
Client string `json:"client,omitempty"`
@ -410,7 +411,7 @@ func (n *PagerDuty) Notify(ctx context.Context, as ...*types.Alert) (bool, error
msg := &pagerDutyMessage{
ServiceKey: tmpl(string(n.conf.ServiceKey)),
EventType: eventType,
IncidentKey: key,
IncidentKey: hashKey(key),
Description: tmpl(n.conf.Description),
Details: details,
}
@ -630,8 +631,8 @@ func NewOpsGenie(c *config.OpsGenieConfig, t *template.Template) *OpsGenie {
}
type opsGenieMessage struct {
APIKey string `json:"apiKey"`
Alias model.Fingerprint `json:"alias"`
APIKey string `json:"apiKey"`
Alias string `json:"alias"`
}
type opsGenieCreateMessage struct {
@ -679,7 +680,7 @@ func (n *OpsGenie) Notify(ctx context.Context, as ...*types.Alert) (bool, error)
apiMsg = opsGenieMessage{
APIKey: string(n.conf.APIKey),
Alias: key,
Alias: hashKey(key),
}
alerts = types.Alerts(as...)
)
@ -763,10 +764,10 @@ const (
)
type victorOpsMessage struct {
MessageType string `json:"message_type"`
EntityID model.Fingerprint `json:"entity_id"`
StateMessage string `json:"state_message"`
MonitoringTool string `json:"monitoring_tool"`
MessageType string `json:"message_type"`
EntityID string `json:"entity_id"`
StateMessage string `json:"state_message"`
MonitoringTool string `json:"monitoring_tool"`
}
type victorOpsErrorResponse struct {
@ -806,7 +807,7 @@ func (n *VictorOps) Notify(ctx context.Context, as ...*types.Alert) (bool, error
msg := &victorOpsMessage{
MessageType: messageType,
EntityID: key,
EntityID: hashKey(key),
StateMessage: tmpl(n.conf.StateMessage),
MonitoringTool: tmpl(n.conf.MonitoringTool),
}
@ -980,3 +981,11 @@ func (a *loginAuth) Next(fromServer []byte, more bool) ([]byte, error) {
}
return nil, nil
}
// hashKey returns the sha256 for a group key as integrations may have
// maximum length requirements on deduplication keys.
func hashKey(s string) string {
h := sha256.New()
h.Write([]byte(s))
return string(h.Sum(nil))
}
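A minimal sketch of why hashKey exists (the sample keys are made up): the SHA-256 digest is always 32 bytes, so integrations that cap deduplication-key length (PagerDuty incident keys, OpsGenie aliases, VictorOps entity IDs above) get a fixed-size key no matter how long the serialized group key grows.

package main

import (
	"crypto/sha256"
	"fmt"
)

// Same shape as the hashKey helper added above.
func hashKey(s string) string {
	h := sha256.New()
	h.Write([]byte(s))
	return string(h.Sum(nil))
}

func main() {
	short := hashKey("k")
	long := hashKey(`{}/{job="myjob"}:{alertname="HighLatency",instance="node-1:9100"}`)
	fmt.Println(len(short), len(long)) // 32 32 — fixed length either way
}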


@ -14,7 +14,6 @@
package notify
import (
"encoding/binary"
"fmt"
"sort"
"sync"
@ -79,8 +78,8 @@ func WithReceiverName(ctx context.Context, rcv string) context.Context {
}
// WithGroupKey populates a context with a group key.
func WithGroupKey(ctx context.Context, fp model.Fingerprint) context.Context {
return context.WithValue(ctx, keyGroupKey, fp)
func WithGroupKey(ctx context.Context, s string) context.Context {
return context.WithValue(ctx, keyGroupKey, s)
}
// WithFiringAlerts populates a context with a slice of firing alerts.
@ -132,8 +131,8 @@ func receiverName(ctx context.Context) string {
// GroupKey extracts a group key from the context. Iff none exists, the
// second argument is false.
func GroupKey(ctx context.Context) (model.Fingerprint, bool) {
v, ok := ctx.Value(keyGroupKey).(model.Fingerprint)
func GroupKey(ctx context.Context) (string, bool) {
v, ok := ctx.Value(keyGroupKey).(string)
return v, ok
}
@ -434,23 +433,25 @@ func putHashBuffer(b []byte) {
func hashAlert(a *types.Alert) uint64 {
const sep = '\xff'
b := getHashBuffer()
labelNames := make(model.LabelNames, 0, len(a.Labels))
defer putHashBuffer(b)
for labelName, _ := range a.Labels {
labelNames = append(labelNames, labelName)
names := make(model.LabelNames, 0, len(a.Labels))
for ln, _ := range a.Labels {
names = append(names, ln)
}
sort.Sort(labelNames)
sort.Sort(names)
for _, labelName := range labelNames {
b = append(b, string(labelName)...)
for _, ln := range names {
b = append(b, string(ln)...)
b = append(b, sep)
b = append(b, string(a.Labels[labelName])...)
b = append(b, string(a.Labels[ln])...)
b = append(b, sep)
}
hash := xxhash.Sum64(b)
putHashBuffer(b)
return hash
}
@ -485,13 +486,10 @@ func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint
// Exec implements the Stage interface.
func (n *DedupStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
// TODO(fabxc): GroupKey will turn into []byte eventually.
gkey, ok := GroupKey(ctx)
if !ok {
return ctx, nil, fmt.Errorf("group key missing")
}
gkeyb := make([]byte, 8)
binary.BigEndian.PutUint64(gkeyb, uint64(gkey))
repeatInterval, ok := RepeatInterval(ctx)
if !ok {
@ -518,7 +516,7 @@ func (n *DedupStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.
ctx = WithFiringAlerts(ctx, firing)
ctx = WithResolvedAlerts(ctx, resolved)
entries, err := n.nflog.Query(nflog.QGroupKey(gkeyb), nflog.QReceiver(n.recv))
entries, err := n.nflog.Query(nflog.QGroupKey(gkey), nflog.QReceiver(n.recv))
if err != nil && err != nflog.ErrNotFound {
return ctx, nil, err
@ -622,8 +620,6 @@ func (n SetNotifiesStage) Exec(ctx context.Context, alerts ...*types.Alert) (con
if !ok {
return ctx, nil, fmt.Errorf("group key missing")
}
gkeyb := make([]byte, 8)
binary.BigEndian.PutUint64(gkeyb, uint64(gkey))
firing, ok := FiringAlerts(ctx)
if !ok {
@ -635,5 +631,5 @@ func (n SetNotifiesStage) Exec(ctx context.Context, alerts ...*types.Alert) (con
return ctx, nil, fmt.Errorf("resolved alerts missing")
}
return ctx, alerts, n.nflog.Log(n.recv, gkeyb, firing, resolved)
return ctx, alerts, n.nflog.Log(n.recv, gkey, firing, resolved)
}
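Not part of the diff, just the context round trip in miniature: the pipeline now threads the group key through the context as a plain string, so stages such as DedupStage and SetNotifiesStage above can hand it straight to the nflog without the former fingerprint-to-bytes encoding. The private context key below is a stand-in for the package's own.

package main

import (
	"context"
	"fmt"
)

type ctxKey int

const keyGroupKey ctxKey = iota // stand-in for the package-internal context key

func WithGroupKey(ctx context.Context, s string) context.Context {
	return context.WithValue(ctx, keyGroupKey, s)
}

func GroupKey(ctx context.Context) (string, bool) {
	v, ok := ctx.Value(keyGroupKey).(string)
	return v, ok
}

func main() {
	ctx := WithGroupKey(context.Background(), `{}/{job="myjob"}:{alertname="HighLatency"}`)
	if gkey, ok := GroupKey(ctx); ok {
		fmt.Println(gkey) // ready to pass to nflog.Log / nflog.QGroupKey as-is
	}
}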


@ -55,14 +55,14 @@ type testNflog struct {
qres []*nflogpb.Entry
qerr error
logFunc func(r *nflogpb.Receiver, gkey []byte, firingAlerts, resolvedAlerts []uint64) error
logFunc func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error
}
func (l *testNflog) Query(p ...nflog.QueryParam) ([]*nflogpb.Entry, error) {
return l.qres, l.qerr
}
func (l *testNflog) Log(r *nflogpb.Receiver, gkey []byte, firingAlerts, resolvedAlerts []uint64) error {
func (l *testNflog) Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error {
return l.logFunc(r, gkey, firingAlerts, resolvedAlerts)
}
@ -171,7 +171,7 @@ func TestDedupStage(t *testing.T) {
_, _, err := s.Exec(ctx)
require.EqualError(t, err, "group key missing")
ctx = WithGroupKey(ctx, 1)
ctx = WithGroupKey(ctx, "1")
_, _, err = s.Exec(ctx)
require.EqualError(t, err, "repeat interval missing")
@ -384,7 +384,7 @@ func TestSetNotifiesStage(t *testing.T) {
require.Nil(t, res)
require.NotNil(t, resctx)
ctx = WithGroupKey(ctx, 1)
ctx = WithGroupKey(ctx, "1")
resctx, res, err = s.Exec(ctx, alerts...)
require.EqualError(t, err, "firing alerts missing")
@ -400,9 +400,9 @@ func TestSetNotifiesStage(t *testing.T) {
ctx = WithResolvedAlerts(ctx, []uint64{})
tnflog.logFunc = func(r *nflogpb.Receiver, gkey []byte, firingAlerts, resolvedAlerts []uint64) error {
tnflog.logFunc = func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error {
require.Equal(t, s.recv, r)
require.Equal(t, []byte{0, 0, 0, 0, 0, 0, 0, 1}, gkey)
require.Equal(t, "1", gkey)
require.Equal(t, []uint64{0, 1, 2}, firingAlerts)
require.Equal(t, []uint64{}, resolvedAlerts)
return nil
@ -415,9 +415,9 @@ func TestSetNotifiesStage(t *testing.T) {
ctx = WithFiringAlerts(ctx, []uint64{})
ctx = WithResolvedAlerts(ctx, []uint64{0, 1, 2})
tnflog.logFunc = func(r *nflogpb.Receiver, gkey []byte, firingAlerts, resolvedAlerts []uint64) error {
tnflog.logFunc = func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error {
require.Equal(t, s.recv, r)
require.Equal(t, []byte{0, 0, 0, 0, 0, 0, 0, 1}, gkey)
require.Equal(t, "1", gkey)
require.Equal(t, []uint64{}, firingAlerts)
require.Equal(t, []uint64{0, 1, 2}, resolvedAlerts)
return nil


@ -18,6 +18,8 @@ import (
"regexp"
"sort"
"bytes"
"github.com/prometheus/common/model"
)
@ -44,9 +46,24 @@ func (m *Matcher) Init() error {
func (m *Matcher) String() string {
if m.IsRegex {
return fmt.Sprintf("<RegexMatcher %s:%q>", m.Name, m.Value)
return fmt.Sprintf("%s=~%q", m.Name, m.Value)
}
return fmt.Sprintf("<Matcher %s:%q>", m.Name, m.Value)
return fmt.Sprintf("%s=%q", m.Name, m.Value)
}
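Sample output of the new matcher serialization (label names and values are made up): equality and regex matchers now render in a parseable label-matcher form instead of the old debug-style strings, and a Matchers set (see its String method below) wraps them in braces.

package main

import "fmt"

func main() {
	// Mirrors the two format strings in Matcher.String above.
	fmt.Printf("%s=%q\n", "severity", "page") // severity="page"
	fmt.Printf("%s=~%q\n", "job", "node.*")   // job=~"node.*"
	// A Matchers set joins these with ',' inside braces,
	// e.g. {severity="page",job=~"node.*"}.
}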
// Validate returns true iff all fields of the matcher have valid values.
func (m *Matcher) Validate() error {
if !model.LabelName(m.Name).IsValid() {
return fmt.Errorf("invalid name %q", m.Name)
}
if m.IsRegex {
if _, err := regexp.Compile(m.Value); err != nil {
return fmt.Errorf("invalid regular expression %q", m.Value)
}
} else if !model.LabelValue(m.Value).IsValid() || len(m.Value) == 0 {
return fmt.Errorf("invalid value %q", m.Value)
}
return nil
}
// Match checks whether the label of the matcher has the specified
@ -139,28 +156,17 @@ func (ms Matchers) Match(lset model.LabelSet) bool {
return true
}
// Validate returns true iff all fields of the matcher have valid values.
func (m *Matcher) Validate() error {
if !model.LabelName(m.Name).IsValid() {
return fmt.Errorf("invalid name %q", m.Name)
}
if m.IsRegex {
if _, err := regexp.Compile(m.Value); err != nil {
return fmt.Errorf("invalid regular expression %q", m.Value)
func (ms Matchers) String() string {
var buf bytes.Buffer
buf.WriteByte('{')
for i, m := range ms {
if i > 0 {
buf.WriteByte(',')
}
} else if !model.LabelValue(m.Value).IsValid() || len(m.Value) == 0 {
return fmt.Errorf("invalid value %q", m.Value)
buf.WriteString(m.String())
}
return nil
}
// Fingerprint returns a quasi-unique fingerprint for the matchers.
func (ms Matchers) Fingerprint() model.Fingerprint {
lset := make(model.LabelSet, 3*len(ms))
for _, m := range ms {
lset[model.LabelName(fmt.Sprintf("%s-%s-%v", m.Name, m.Value, m.IsRegex))] = ""
}
return lset.Fingerprint()
buf.WriteByte('}')
return buf.String()
}