*: switch group key to matcher serialization

Turn the GroupKey into a string that is composed of the matchers of the
path in the routing tree and the grouping labels.
Only hash it at the very end to ensure we don't exceed size limits of
integration APIs.
Fabian Reinartz 2017-04-21 11:43:12 +02:00
parent cb8729a458
commit 3269bc39e1
10 changed files with 130 additions and 129 deletions
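To make the new key layout concrete, here is a minimal sketch (not part of this change; the route path, label set and key composition are illustrative stand-ins for Route.Key, aggrGroup.GroupKey and hashKey in the diff below):

package main

import (
    "crypto/sha256"
    "fmt"
)

func main() {
    // Hypothetical route key: the matcher serializations along the routing tree path.
    routeKey := `{}/{job="api"}/{severity="page"}`
    // Hypothetical grouping labels of an aggregation group.
    groupLabels := `{alertname="HighLatency", instance="a"}`

    // The group key is the route key plus the group's labels.
    groupKey := fmt.Sprintf("%s:%s", routeKey, groupLabels)
    fmt.Println(groupKey)

    // Only integrations with key size limits hash it, at the very end.
    fmt.Printf("%x\n", sha256.Sum256([]byte(groupKey)))
}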

View File

@ -88,7 +88,7 @@ type APIAlert struct {
// AlertGroup is a list of alert blocks grouped by the same label set.
type AlertGroup struct {
Labels model.LabelSet `json:"labels"`
GroupKey uint64 `json:"groupKey"`
GroupKey string `json:"groupKey"`
Blocks []*AlertBlock `json:"blocks"`
}
@ -256,7 +256,7 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
// If the group does not exist, create it.
ag, ok := groups[fp]
if !ok {
ag = newAggrGroup(d.ctx, group, &route.RouteOpts, d.timeout)
ag = newAggrGroup(d.ctx, group, route, d.timeout)
groups[fp] = ag
go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
@ -275,10 +275,10 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
// common set of routing options applies.
// It emits notifications in the specified intervals.
type aggrGroup struct {
labels model.LabelSet
opts *RouteOpts
routeFP model.Fingerprint
log log.Logger
labels model.LabelSet
opts *RouteOpts
log log.Logger
routeKey string
ctx context.Context
cancel func()
@ -292,15 +292,16 @@ type aggrGroup struct {
}
// newAggrGroup returns a new aggregation group.
func newAggrGroup(ctx context.Context, labels model.LabelSet, opts *RouteOpts, to func(time.Duration) time.Duration) *aggrGroup {
func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(time.Duration) time.Duration) *aggrGroup {
if to == nil {
to = func(d time.Duration) time.Duration { return d }
}
ag := &aggrGroup{
labels: labels,
opts: opts,
timeout: to,
alerts: map[model.Fingerprint]*types.Alert{},
labels: labels,
routeKey: r.Key(),
opts: &r.RouteOpts,
timeout: to,
alerts: map[model.Fingerprint]*types.Alert{},
}
ag.ctx, ag.cancel = context.WithCancel(ctx)
@ -313,8 +314,16 @@ func newAggrGroup(ctx context.Context, labels model.LabelSet, opts *RouteOpts, t
return ag
}
func (ag *aggrGroup) fingerprint() model.Fingerprint {
return ag.labels.Fingerprint()
}
func (ag *aggrGroup) GroupKey() string {
return fmt.Sprintf("%s:%s", ag.routeKey, ag.labels)
}
func (ag *aggrGroup) String() string {
return fmt.Sprint(ag.fingerprint())
return ag.GroupKey()
}
func (ag *aggrGroup) alertSlice() []*types.Alert {
@ -348,7 +357,7 @@ func (ag *aggrGroup) run(nf notifyFunc) {
ctx = notify.WithNow(ctx, now)
// Populate context with information needed along the pipeline.
ctx = notify.WithGroupKey(ctx, model.Fingerprint(ag.GroupKey()))
ctx = notify.WithGroupKey(ctx, ag.GroupKey())
ctx = notify.WithGroupLabels(ctx, ag.labels)
ctx = notify.WithReceiverName(ctx, ag.opts.Receiver)
ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval)
@ -377,14 +386,6 @@ func (ag *aggrGroup) stop() {
<-ag.done
}
func (ag *aggrGroup) fingerprint() model.Fingerprint {
return ag.labels.Fingerprint()
}
func (ag *aggrGroup) GroupKey() uint64 {
return uint64(ag.labels.Fingerprint() ^ ag.routeFP)
}
// insert inserts the alert into the aggregation group.
func (ag *aggrGroup) insert(alert *types.Alert) {

View File

@ -100,6 +100,9 @@ func TestAggrGroup(t *testing.T) {
GroupInterval: 300 * time.Millisecond,
RepeatInterval: 1 * time.Hour,
}
route := &Route{
RouteOpts: *opts,
}
var (
a1 = &types.Alert{
@ -173,7 +176,7 @@ func TestAggrGroup(t *testing.T) {
}
// Test regular situation where we wait for group_wait to send out alerts.
ag := newAggrGroup(context.Background(), lset, opts, nil)
ag := newAggrGroup(context.Background(), lset, route, nil)
go ag.run(ntfy)
ag.insert(a1)
@ -221,7 +224,7 @@ func TestAggrGroup(t *testing.T) {
// immediate flushing.
// Finally, set all alerts to be resolved. After successful notify the aggregation group
// should empty itself.
ag = newAggrGroup(context.Background(), lset, opts, nil)
ag = newAggrGroup(context.Background(), lset, route, nil)
go ag.run(ntfy)
ag.insert(a1)

View File

@ -130,8 +130,7 @@ func (r *Route) Match(lset model.LabelSet) []*Route {
}
}
// If no child nodes were matches, the current node itself is
// a match.
// If no child nodes were matches, the current node itself is a match.
if len(all) == 0 {
all = append(all, r)
}
@ -139,32 +138,15 @@ func (r *Route) Match(lset model.LabelSet) []*Route {
return all
}
// SquashMatchers returns the total set of matchers on the path of the tree
// that have to apply to reach the route.
func (r *Route) SquashMatchers() types.Matchers {
var res types.Matchers
res = append(res, r.Matchers...)
// Key returns a key for the route. It does not uniquely identify the route in general.
func (r *Route) Key() string {
b := make([]byte, 0, 1024)
if r.parent == nil {
return res
if r.parent != nil {
b = append(b, r.parent.Key()...)
b = append(b, '/')
}
pm := r.parent.SquashMatchers()
res = append(pm, res...)
return res
}
// Fingerprint returns a hash of the Route based on its grouping labels,
// routing options and the total set of matchers necessary to reach this route.
func (r *Route) Fingerprint() model.Fingerprint {
lset := make(model.LabelSet, len(r.RouteOpts.GroupBy))
for ln := range r.RouteOpts.GroupBy {
lset[ln] = ""
}
return r.SquashMatchers().Fingerprint() ^ lset.Fingerprint()
return string(append(b, r.Matchers.String()...))
}
// RouteOpts holds various routing options necessary for processing alerts

View File

@ -43,7 +43,7 @@ type Log interface {
// The Log* methods store a notification log entry for
// a fully qualified receiver and given IDs identifying the
// alert object.
Log(r *pb.Receiver, key []byte, firing, resolved []uint64) error
Log(r *pb.Receiver, key string, firing, resolved []uint64) error
// Query the log along the given parameters.
//
@ -67,7 +67,7 @@ type Log interface {
// group or a given time interval.
type query struct {
recv *pb.Receiver
groupKey []byte
groupKey string
}
// QueryParam is a function that modifies a query to incorporate
@ -84,7 +84,7 @@ func QReceiver(r *pb.Receiver) QueryParam {
}
// QGroupKey adds a group key as querying argument.
func QGroupKey(gk []byte) QueryParam {
func QGroupKey(gk string) QueryParam {
return func(q *query) error {
q.groupKey = gk
return nil
@ -323,13 +323,17 @@ Loop:
}
}
// stateKey returns a string key for a log entry consisting of the group key
// and receiver.
func stateKey(k []byte, r *pb.Receiver) string {
return fmt.Sprintf("%s:%s", k, r)
func receiverKey(r *pb.Receiver) string {
return fmt.Sprintf("%s/%s/%d", r.GroupName, r.Integration, r.Idx)
}
func (l *nlog) Log(r *pb.Receiver, gkey []byte, firingAlerts, resolvedAlerts []uint64) error {
// stateKey returns a string key for a log entry consisting of the group key
// and receiver.
func stateKey(k string, r *pb.Receiver) string {
return fmt.Sprintf("%s:%s", k, receiverKey(r))
}
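For reference, a small self-contained sketch (the receiver values are hypothetical, mirroring the pb.Receiver fields used above) of what receiverKey and the new stateKey produce:

package main

import "fmt"

// hypothetical mirror of the pb.Receiver fields used by receiverKey
type receiver struct {
    GroupName, Integration string
    Idx                    uint32
}

func main() {
    r := receiver{GroupName: "team-X", Integration: "email", Idx: 0}

    // receiverKey: GroupName/Integration/Idx
    rkey := fmt.Sprintf("%s/%s/%d", r.GroupName, r.Integration, r.Idx)

    // stateKey: <group key>:<receiver key>
    gkey := `{}/{job="api"}:{alertname="Foo"}`
    fmt.Printf("%s:%s\n", gkey, rkey)
    // {}/{job="api"}:{alertname="Foo"}:team-X/email/0
}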
func (l *nlog) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error {
// Write all st with the same timestamp.
now := l.now()
key := stateKey(gkey, r)
@ -348,7 +352,7 @@ func (l *nlog) Log(r *pb.Receiver, gkey []byte, firingAlerts, resolvedAlerts []u
e := &pb.MeshEntry{
Entry: &pb.Entry{
Receiver: r,
GroupKey: gkey,
GroupKey: []byte(gkey),
Timestamp: now,
FiringAlerts: firingAlerts,
ResolvedAlerts: resolvedAlerts,
@ -401,7 +405,7 @@ func (l *nlog) Query(params ...QueryParam) ([]*pb.Entry, error) {
}
// TODO(fabxc): For now our only query mode is the most recent entry for a
// receiver/group_key combination.
if q.recv == nil || q.groupKey == nil {
if q.recv == nil || q.groupKey == "" {
// TODO(fabxc): allow more complex queries in the future.
// How to enable pagination?
return nil, errors.New("no query parameters specified")
@ -437,7 +441,7 @@ func (l *nlog) loadSnapshot(r io.Reader) error {
}
return err
}
st[stateKey(e.Entry.GroupKey, e.Entry.Receiver)] = &e
st[stateKey(string(e.Entry.GroupKey), e.Entry.Receiver)] = &e
}
l.st = st
@ -523,7 +527,7 @@ func decodeGossipData(msg []byte) (gossipData, error) {
}
return gd, err
}
gd[stateKey(e.Entry.GroupKey, e.Entry.Receiver)] = &e
gd[stateKey(string(e.Entry.GroupKey), e.Entry.Receiver)] = &e
}
return gd, nil

View File

@ -103,7 +103,7 @@ func TestNlogSnapshot(t *testing.T) {
}
// Setup internal state manually.
for _, e := range c.entries {
l1.st[stateKey(e.Entry.GroupKey, e.Entry.Receiver)] = e
l1.st[stateKey(string(e.Entry.GroupKey), e.Entry.Receiver)] = e
}
_, err = l1.Snapshot(f)
require.NoError(t, err, "creating snapshot failed")
@ -253,7 +253,7 @@ func TestGossipDataCoding(t *testing.T) {
// Create gossip data from input.
in := gossipData{}
for _, e := range c.entries {
in[stateKey(e.Entry.GroupKey, e.Entry.Receiver)] = e
in[stateKey(string(e.Entry.GroupKey), e.Entry.Receiver)] = e
}
msg := in.Encode()
require.Equal(t, 1, len(msg), "expected single message for input")

View File

@ -23,7 +23,7 @@ message Receiver {
// Entry holds information about a successful notification
// sent to a receiver.
message Entry {
// The key identifying the dispatching group.
bytes group_key = 1;
// The receiver that was notified.
Receiver receiver = 2;

View File

@ -15,6 +15,7 @@ package notify
import (
"bytes"
"crypto/sha256"
"crypto/tls"
"encoding/json"
"errors"
@ -152,7 +153,7 @@ type WebhookMessage struct {
// The protocol version.
Version string `json:"version"`
GroupKey uint64 `json:"groupKey"`
GroupKey string `json:"groupKey"`
}
// Notify implements the Notifier interface.
@ -165,9 +166,9 @@ func (w *Webhook) Notify(ctx context.Context, alerts ...*types.Alert) (bool, err
}
msg := &WebhookMessage{
Version: "3",
Version: "4",
Data: data,
GroupKey: uint64(groupKey),
GroupKey: groupKey,
}
var buf bytes.Buffer
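A rough sketch of the payload change, covering only the two fields touched here (the real WebhookMessage embeds the full template.Data):

package main

import (
    "encoding/json"
    "fmt"
)

func main() {
    // Only the changed fields; groupKey is now the serialized string.
    msg := map[string]string{
        "version":  "4",
        "groupKey": `{}/{job="api"}:{alertname="HighLatency"}`,
    }
    b, _ := json.Marshal(msg)
    fmt.Println(string(b))
    // {"groupKey":"{}/{job=\"api\"}:{alertname=\"HighLatency\"}","version":"4"}
}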
@ -372,7 +373,7 @@ const (
type pagerDutyMessage struct {
ServiceKey string `json:"service_key"`
IncidentKey model.Fingerprint `json:"incident_key"`
IncidentKey string `json:"incident_key"`
EventType string `json:"event_type"`
Description string `json:"description"`
Client string `json:"client,omitempty"`
@ -410,7 +411,7 @@ func (n *PagerDuty) Notify(ctx context.Context, as ...*types.Alert) (bool, error
msg := &pagerDutyMessage{
ServiceKey: tmpl(string(n.conf.ServiceKey)),
EventType: eventType,
IncidentKey: key,
IncidentKey: hashKey(key),
Description: tmpl(n.conf.Description),
Details: details,
}
@ -630,8 +631,8 @@ func NewOpsGenie(c *config.OpsGenieConfig, t *template.Template) *OpsGenie {
}
type opsGenieMessage struct {
APIKey string `json:"apiKey"`
Alias model.Fingerprint `json:"alias"`
APIKey string `json:"apiKey"`
Alias string `json:"alias"`
}
type opsGenieCreateMessage struct {
@ -679,7 +680,7 @@ func (n *OpsGenie) Notify(ctx context.Context, as ...*types.Alert) (bool, error)
apiMsg = opsGenieMessage{
APIKey: string(n.conf.APIKey),
Alias: key,
Alias: hashKey(key),
}
alerts = types.Alerts(as...)
)
@ -763,10 +764,10 @@ const (
)
type victorOpsMessage struct {
MessageType string `json:"message_type"`
EntityID model.Fingerprint `json:"entity_id"`
StateMessage string `json:"state_message"`
MonitoringTool string `json:"monitoring_tool"`
MessageType string `json:"message_type"`
EntityID string `json:"entity_id"`
StateMessage string `json:"state_message"`
MonitoringTool string `json:"monitoring_tool"`
}
type victorOpsErrorResponse struct {
@ -806,7 +807,7 @@ func (n *VictorOps) Notify(ctx context.Context, as ...*types.Alert) (bool, error
msg := &victorOpsMessage{
MessageType: messageType,
EntityID: key,
EntityID: hashKey(key),
StateMessage: tmpl(n.conf.StateMessage),
MonitoringTool: tmpl(n.conf.MonitoringTool),
}
@ -980,3 +981,11 @@ func (a *loginAuth) Next(fromServer []byte, more bool) ([]byte, error) {
}
return nil, nil
}
// hashKey returns the sha256 for a group key as integrations may have
// maximum length requirements on deduplication keys.
func hashKey(s string) string {
h := sha256.New()
h.Write([]byte(s))
return fmt.Sprintf("%x", h.Sum(nil))
}

View File

@ -14,7 +14,6 @@
package notify
import (
"encoding/binary"
"fmt"
"sort"
"sync"
@ -79,8 +78,8 @@ func WithReceiverName(ctx context.Context, rcv string) context.Context {
}
// WithGroupKey populates a context with a group key.
func WithGroupKey(ctx context.Context, fp model.Fingerprint) context.Context {
return context.WithValue(ctx, keyGroupKey, fp)
func WithGroupKey(ctx context.Context, s string) context.Context {
return context.WithValue(ctx, keyGroupKey, s)
}
// WithFiringAlerts populates a context with a slice of firing alerts.
@ -132,8 +131,8 @@ func receiverName(ctx context.Context) string {
// GroupKey extracts a group key from the context. Iff none exists, the
// second argument is false.
func GroupKey(ctx context.Context) (model.Fingerprint, bool) {
v, ok := ctx.Value(keyGroupKey).(model.Fingerprint)
func GroupKey(ctx context.Context) (string, bool) {
v, ok := ctx.Value(keyGroupKey).(string)
return v, ok
}
@ -434,23 +433,25 @@ func putHashBuffer(b []byte) {
func hashAlert(a *types.Alert) uint64 {
const sep = '\xff'
b := getHashBuffer()
labelNames := make(model.LabelNames, 0, len(a.Labels))
defer putHashBuffer(b)
for labelName, _ := range a.Labels {
labelNames = append(labelNames, labelName)
names := make(model.LabelNames, 0, len(a.Labels))
for ln, _ := range a.Labels {
names = append(names, ln)
}
sort.Sort(labelNames)
sort.Sort(names)
for _, labelName := range labelNames {
b = append(b, string(labelName)...)
for _, ln := range names {
b = append(b, string(ln)...)
b = append(b, sep)
b = append(b, string(a.Labels[labelName])...)
b = append(b, string(a.Labels[ln])...)
b = append(b, sep)
}
hash := xxhash.Sum64(b)
putHashBuffer(b)
return hash
}
@ -485,13 +486,10 @@ func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint
// Exec implements the Stage interface.
func (n *DedupStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
// TODO(fabxc): GroupKey will turn into []byte eventually.
gkey, ok := GroupKey(ctx)
if !ok {
return ctx, nil, fmt.Errorf("group key missing")
}
gkeyb := make([]byte, 8)
binary.BigEndian.PutUint64(gkeyb, uint64(gkey))
repeatInterval, ok := RepeatInterval(ctx)
if !ok {
@ -518,7 +516,7 @@ func (n *DedupStage) Exec(ctx context.Context, alerts ...*types.Alert) (context.
ctx = WithFiringAlerts(ctx, firing)
ctx = WithResolvedAlerts(ctx, resolved)
entries, err := n.nflog.Query(nflog.QGroupKey(gkeyb), nflog.QReceiver(n.recv))
entries, err := n.nflog.Query(nflog.QGroupKey(gkey), nflog.QReceiver(n.recv))
if err != nil && err != nflog.ErrNotFound {
return ctx, nil, err
@ -622,8 +620,6 @@ func (n SetNotifiesStage) Exec(ctx context.Context, alerts ...*types.Alert) (con
if !ok {
return ctx, nil, fmt.Errorf("group key missing")
}
gkeyb := make([]byte, 8)
binary.BigEndian.PutUint64(gkeyb, uint64(gkey))
firing, ok := FiringAlerts(ctx)
if !ok {
@ -635,5 +631,5 @@ func (n SetNotifiesStage) Exec(ctx context.Context, alerts ...*types.Alert) (con
return ctx, nil, fmt.Errorf("resolved alerts missing")
}
return ctx, alerts, n.nflog.Log(n.recv, gkeyb, firing, resolved)
return ctx, alerts, n.nflog.Log(n.recv, gkey, firing, resolved)
}

View File

@ -55,14 +55,14 @@ type testNflog struct {
qres []*nflogpb.Entry
qerr error
logFunc func(r *nflogpb.Receiver, gkey []byte, firingAlerts, resolvedAlerts []uint64) error
logFunc func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error
}
func (l *testNflog) Query(p ...nflog.QueryParam) ([]*nflogpb.Entry, error) {
return l.qres, l.qerr
}
func (l *testNflog) Log(r *nflogpb.Receiver, gkey []byte, firingAlerts, resolvedAlerts []uint64) error {
func (l *testNflog) Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error {
return l.logFunc(r, gkey, firingAlerts, resolvedAlerts)
}
@ -171,7 +171,7 @@ func TestDedupStage(t *testing.T) {
_, _, err := s.Exec(ctx)
require.EqualError(t, err, "group key missing")
ctx = WithGroupKey(ctx, 1)
ctx = WithGroupKey(ctx, "1")
_, _, err = s.Exec(ctx)
require.EqualError(t, err, "repeat interval missing")
@ -384,7 +384,7 @@ func TestSetNotifiesStage(t *testing.T) {
require.Nil(t, res)
require.NotNil(t, resctx)
ctx = WithGroupKey(ctx, 1)
ctx = WithGroupKey(ctx, "1")
resctx, res, err = s.Exec(ctx, alerts...)
require.EqualError(t, err, "firing alerts missing")
@ -400,9 +400,9 @@ func TestSetNotifiesStage(t *testing.T) {
ctx = WithResolvedAlerts(ctx, []uint64{})
tnflog.logFunc = func(r *nflogpb.Receiver, gkey []byte, firingAlerts, resolvedAlerts []uint64) error {
tnflog.logFunc = func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error {
require.Equal(t, s.recv, r)
require.Equal(t, []byte{0, 0, 0, 0, 0, 0, 0, 1}, gkey)
require.Equal(t, "1", gkey)
require.Equal(t, []uint64{0, 1, 2}, firingAlerts)
require.Equal(t, []uint64{}, resolvedAlerts)
return nil
@ -415,9 +415,9 @@ func TestSetNotifiesStage(t *testing.T) {
ctx = WithFiringAlerts(ctx, []uint64{})
ctx = WithResolvedAlerts(ctx, []uint64{0, 1, 2})
tnflog.logFunc = func(r *nflogpb.Receiver, gkey []byte, firingAlerts, resolvedAlerts []uint64) error {
tnflog.logFunc = func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64) error {
require.Equal(t, s.recv, r)
require.Equal(t, []byte{0, 0, 0, 0, 0, 0, 0, 1}, gkey)
require.Equal(t, "1", gkey)
require.Equal(t, []uint64{}, firingAlerts)
require.Equal(t, []uint64{0, 1, 2}, resolvedAlerts)
return nil

View File

@ -18,6 +18,8 @@ import (
"regexp"
"sort"
"bytes"
"github.com/prometheus/common/model"
)
@ -44,9 +46,24 @@ func (m *Matcher) Init() error {
func (m *Matcher) String() string {
if m.IsRegex {
return fmt.Sprintf("<RegexMatcher %s:%q>", m.Name, m.Value)
return fmt.Sprintf("%s=~%q", m.Name, m.Value)
}
return fmt.Sprintf("<Matcher %s:%q>", m.Name, m.Value)
return fmt.Sprintf("%s=%q", m.Name, m.Value)
}
// Validate returns an error if any field of the matcher is invalid.
func (m *Matcher) Validate() error {
if !model.LabelName(m.Name).IsValid() {
return fmt.Errorf("invalid name %q", m.Name)
}
if m.IsRegex {
if _, err := regexp.Compile(m.Value); err != nil {
return fmt.Errorf("invalid regular expression %q", m.Value)
}
} else if !model.LabelValue(m.Value).IsValid() || len(m.Value) == 0 {
return fmt.Errorf("invalid value %q", m.Value)
}
return nil
}
// Match checks whether the label of the matcher has the specified
@ -139,28 +156,17 @@ func (ms Matchers) Match(lset model.LabelSet) bool {
return true
}
// Validate returns an error if any field of the matcher is invalid.
func (m *Matcher) Validate() error {
if !model.LabelName(m.Name).IsValid() {
return fmt.Errorf("invalid name %q", m.Name)
}
if m.IsRegex {
if _, err := regexp.Compile(m.Value); err != nil {
return fmt.Errorf("invalid regular expression %q", m.Value)
func (ms Matchers) String() string {
var buf bytes.Buffer
buf.WriteByte('{')
for i, m := range ms {
if i > 0 {
buf.WriteByte(',')
}
} else if !model.LabelValue(m.Value).IsValid() || len(m.Value) == 0 {
return fmt.Errorf("invalid value %q", m.Value)
buf.WriteString(m.String())
}
return nil
}
// Fingerprint returns a quasi-unique fingerprint for the matchers.
func (ms Matchers) Fingerprint() model.Fingerprint {
lset := make(model.LabelSet, 3*len(ms))
for _, m := range ms {
lset[model.LabelName(fmt.Sprintf("%s-%s-%v", m.Name, m.Value, m.IsRegex))] = ""
}
return lset.Fingerprint()
buf.WriteByte('}')
return buf.String()
}
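Finally, a minimal usage sketch (assuming the exported Matcher fields shown above) of the serialization the new String methods produce:

package main

import (
    "fmt"

    "github.com/prometheus/alertmanager/types"
)

func main() {
    ms := types.Matchers{
        &types.Matcher{Name: "job", Value: "api"},
        &types.Matcher{Name: "severity", Value: "critical|page", IsRegex: true},
    }
    fmt.Println(ms.String())
    // {job="api",severity=~"critical|page"}
}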