nflog: add notification log package

This adds a new nflog package meant to replace provider.Notifies. It
has a central protobuf type package, which is also meant for usage for
other packages and the API.
The generated Go types are also the in-memory representation.
This commit is contained in:
Fabian Reinartz 2016-08-11 11:33:59 +02:00
parent cbdcf0a983
commit 3d8e60ded7
5 changed files with 477 additions and 0 deletions

293
nflog/nflog.go Normal file
View File

@ -0,0 +1,293 @@
package nflog
import (
"errors"
"fmt"
"io"
"sync"
"time"
"github.com/golang/protobuf/ptypes"
"github.com/matttproud/golang_protobuf_extensions/pbutil"
pb "github.com/prometheus/alertmanager/nflog/nflogpb"
"github.com/weaveworks/mesh"
)
var ErrNotFound = errors.New("not found")
// Log stores and serves information about notifications
// about byte-slice addressed alert objects to different receivers.
type Log interface {
// The Log* methods store a notification log entry for
// a fully qualified receiver and a given IDs identifying the
// alert object.
LogActive(r *pb.Receiver, key, hash []byte) error
LogResolved(r *pb.Receiver, key, hash []byte) error
// Query the log along the given Paramteres.
//
// TODO(fabxc):
// - extend the interface by a `QueryOne` method?
// - return an iterator rather than a materialized list?
Query(p ...QueryParam) ([]*pb.Entry, error)
// Delete log entries along the given Parameters. Returns
// the number of deleted entries.
Delete(p ...DeleteParam) (int, error)
// Snapshot the current log state and return the number
// of bytes written.
Snapshot(w io.Writer) (int, error)
}
// query currently allows filtering by and/or receiver group key.
// It is configured via QueryParameter functions.
//
// TODO(fabxc): Future versions could allow querying a certain receiver
// group or a given time interval.
type query struct {
recv *pb.Receiver
groupKey []byte
}
// QueryParam is a function that modifies a query to incorporate
// a set of parameters. Returns an error for invalid or conflicting
// parameters.
type QueryParam func(*query) error
// QReceiver adds a receiver parameter to a query.
func QReceiver(r *pb.Receiver) QueryParam {
return func(q *query) error {
q.recv = r
return nil
}
}
// QGroupKey adds a group key as querying argument.
func QGroupKey(gk []byte) QueryParam {
return func(q *query) error {
q.groupKey = gk
return nil
}
}
// delQuery holds parameters for a deletion query.
// TODO(fabxc): can this be consolidated with regular QueryParams?
type delQuery struct {
// Delete log entries that are expired. Does NOT delete
// unexpired entries if set to false.
expired bool
}
// DeleteParam is a function that modifies parameters of a delete request.
// Returns an error for invalid of conflicting parameters.
type DeleteParam func(*delQuery) error
// DExpired adds a parameter to delete expired log entries.
func DExpired() DeleteParam {
return func(d *delQuery) error {
d.expired = true
return nil
}
}
type nlog struct {
retention time.Duration
now func() time.Time
mtx sync.RWMutex
// For now we only store the most recently added log entry.
// The key is a serialized concatenation of group key and receiver.
entries map[string]*pb.MeshEntry
}
// Option configures a new Log implementation.
type Option func(*nlog) error
// WithMesh registers the log with a mesh network with which
// the log state will be shared.
func WithMesh(mr *mesh.Router) Option {
return func(l *nlog) error {
panic("not implemented")
}
}
// WithRetention sets the retention time for log entries.
func WithRetention(d time.Duration) Option {
return func(l *nlog) error {
l.retention = d
return nil
}
}
// WithNow overwrites the function used to retrieve a timestamp
// for the current point in time.
// This is generally useful for injection during tests.
func WithNow(f func() time.Time) Option {
return func(l *nlog) error {
l.now = f
return nil
}
}
// New creates a new notification log based on the provided options.
// The snapshot is loaded into the Log if it is set.
func New(snapshot io.Reader, opts ...Option) (Log, error) {
l := &nlog{
now: time.Now,
entries: map[string]*pb.MeshEntry{},
}
for _, o := range opts {
if err := o(l); err != nil {
return nil, err
}
}
if snapshot != nil {
if err := l.loadSnapshot(snapshot); err != nil {
return l, err
}
}
return l, nil
}
// LogActive implements the Log interface.
func (l *nlog) LogActive(r *pb.Receiver, key, hash []byte) error {
return l.log(r, key, hash, false)
}
// LogResolved implements the Log interface.
func (l *nlog) LogResolved(r *pb.Receiver, key, hash []byte) error {
return l.log(r, key, hash, true)
}
func (l *nlog) log(r *pb.Receiver, gkey, ghash []byte, resolved bool) error {
// Write all entries with the same timestamp.
now := l.now()
key := fmt.Sprintf("%s:%s", r, gkey)
l.mtx.Lock()
defer l.mtx.Unlock()
if prevle, ok := l.entries[key]; ok {
// Entry already exists, only overwrite if timestamp is newer.
// This may with raciness or clock-drift across AM nodes.
prevts, err := ptypes.Timestamp(prevle.Entry.Timestamp)
if err != nil {
return err
}
if prevts.After(now) {
return nil
}
}
ts, err := ptypes.TimestampProto(now)
if err != nil {
return err
}
expts, err := ptypes.TimestampProto(now.Add(l.retention))
if err != nil {
return err
}
l.entries[key] = &pb.MeshEntry{
Entry: &pb.Entry{
Receiver: r,
GroupKey: gkey,
GroupHash: ghash,
Resolved: resolved,
Timestamp: ts,
},
ExpiresAt: expts,
}
return nil
}
// Delete implements the Log interface.
func (l *nlog) Delete(params ...DeleteParam) (int, error) {
l.mtx.Lock()
defer l.mtx.Unlock()
var del delQuery
for _, p := range params {
if err := p(&del); err != nil {
return 0, err
}
}
if !del.expired {
return 0, errors.New("only expiration deletion supported")
}
now := l.now()
var n int
for k, le := range l.entries {
if ets, err := ptypes.Timestamp(le.ExpiresAt); err != nil {
return n, err
} else if ets.Before(now) {
delete(l.entries, k)
n++
}
}
return n, nil
}
// Query implements the Log interface.
func (l *nlog) Query(params ...QueryParam) ([]*pb.Entry, error) {
q := &query{}
for _, p := range params {
if err := p(q); err != nil {
return nil, err
}
}
// TODO(fabxc): For now our only query mode is the most recent entry for a
// receiver/group_key combination.
if q.recv == nil || q.groupKey == nil {
// TODO(fabxc): allow more complex queries in the future.
// How to enable pagination?
return nil, errors.New("no query parameters specified")
}
l.mtx.RLock()
defer l.mtx.RUnlock()
key := fmt.Sprintf("%s,%s", q.recv, q.groupKey)
if le, ok := l.entries[key]; ok {
return []*pb.Entry{le.Entry}, nil
}
return nil, ErrNotFound
}
func (l *nlog) loadSnapshot(r io.Reader) error {
l.mtx.Lock()
defer l.mtx.Unlock()
for {
var e pb.MeshEntry
if _, err := pbutil.ReadDelimited(r, &e); err != nil {
if err == io.EOF {
break
}
return err
}
key := fmt.Sprintf("%s,%s", r, e.Entry.GroupKey)
l.entries[key] = &e
}
return nil
}
// Snapshot implements the Log interface.
func (l *nlog) Snapshot(w io.Writer) (int, error) {
l.mtx.RLock()
defer l.mtx.RUnlock()
var n int
for _, e := range l.entries {
m, err := pbutil.WriteDelimited(w, e)
if err != nil {
return n + m, err
}
n += m
}
return n, nil
}

1
nflog/nflog_test.go Normal file
View File

@ -0,0 +1 @@
package nflog

141
nflog/nflogpb/nflog.pb.go Normal file
View File

@ -0,0 +1,141 @@
// Code generated by protoc-gen-go.
// source: nflog/nflogpb/nflog.proto
// DO NOT EDIT!
/*
Package nflogpb is a generated protocol buffer package.
It is generated from these files:
nflog/nflogpb/nflog.proto
It has these top-level messages:
Receiver
Entry
MeshEntry
*/
package nflogpb
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import google_protobuf "github.com/golang/protobuf/ptypes/timestamp"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type Receiver struct {
// Configured name of the receiver group.
GroupName string `protobuf:"bytes,1,opt,name=group_name,json=groupName" json:"group_name,omitempty"`
// Name of the integration of the receiver.
Integration string `protobuf:"bytes,2,opt,name=integration" json:"integration,omitempty"`
// Index of the receiver with respect to the integration.
// Every integration in a group may have 0..N configurations.
Idx uint32 `protobuf:"varint,3,opt,name=idx" json:"idx,omitempty"`
}
func (m *Receiver) Reset() { *m = Receiver{} }
func (m *Receiver) String() string { return proto.CompactTextString(m) }
func (*Receiver) ProtoMessage() {}
func (*Receiver) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
// Entry holds information about a successful notification
// sent to a receiver.
type Entry struct {
// The key identifying the dispatching group.
GroupKey []byte `protobuf:"bytes,1,opt,name=group_key,json=groupKey,proto3" json:"group_key,omitempty"`
// The receiver that was notified.
Receiver *Receiver `protobuf:"bytes,2,opt,name=receiver" json:"receiver,omitempty"`
// Hash over the state of the group at notification time.
GroupHash []byte `protobuf:"bytes,3,opt,name=group_hash,json=groupHash,proto3" json:"group_hash,omitempty"`
// Whether the notification was about a resolved alert.
Resolved bool `protobuf:"varint,4,opt,name=resolved" json:"resolved,omitempty"`
// Timestamp of the succeeding notification.
Timestamp *google_protobuf.Timestamp `protobuf:"bytes,5,opt,name=timestamp" json:"timestamp,omitempty"`
}
func (m *Entry) Reset() { *m = Entry{} }
func (m *Entry) String() string { return proto.CompactTextString(m) }
func (*Entry) ProtoMessage() {}
func (*Entry) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (m *Entry) GetReceiver() *Receiver {
if m != nil {
return m.Receiver
}
return nil
}
func (m *Entry) GetTimestamp() *google_protobuf.Timestamp {
if m != nil {
return m.Timestamp
}
return nil
}
// MeshEntry is a wrapper message to communicate a notify log
// entry through a mesh network.
type MeshEntry struct {
// The original raw notify log entry.
Entry *Entry `protobuf:"bytes,1,opt,name=entry" json:"entry,omitempty"`
// A timestamp indicating when the mesh peer should evict
// the log entry from its state.
ExpiresAt *google_protobuf.Timestamp `protobuf:"bytes,2,opt,name=expires_at,json=expiresAt" json:"expires_at,omitempty"`
}
func (m *MeshEntry) Reset() { *m = MeshEntry{} }
func (m *MeshEntry) String() string { return proto.CompactTextString(m) }
func (*MeshEntry) ProtoMessage() {}
func (*MeshEntry) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
func (m *MeshEntry) GetEntry() *Entry {
if m != nil {
return m.Entry
}
return nil
}
func (m *MeshEntry) GetExpiresAt() *google_protobuf.Timestamp {
if m != nil {
return m.ExpiresAt
}
return nil
}
func init() {
proto.RegisterType((*Receiver)(nil), "nflogpb.Receiver")
proto.RegisterType((*Entry)(nil), "nflogpb.Entry")
proto.RegisterType((*MeshEntry)(nil), "nflogpb.MeshEntry")
}
func init() { proto.RegisterFile("nflog/nflogpb/nflog.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 299 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x7c, 0x50, 0xc1, 0x4e, 0x83, 0x40,
0x10, 0x4d, 0xad, 0x55, 0x98, 0x56, 0xa3, 0x7b, 0xc2, 0x1a, 0x63, 0xd3, 0x78, 0xf0, 0xe2, 0x36,
0xa9, 0x17, 0x3d, 0x7a, 0x30, 0x31, 0x31, 0x7a, 0xd8, 0x78, 0x35, 0x04, 0xec, 0x14, 0x36, 0x02,
0x4b, 0x96, 0x6d, 0x53, 0xfe, 0xd0, 0xcf, 0x12, 0x66, 0x17, 0xea, 0xc9, 0x0b, 0xcc, 0xbe, 0xf7,
0x32, 0xef, 0xbd, 0x81, 0x8b, 0x62, 0x9d, 0xa9, 0x64, 0x41, 0xdf, 0x32, 0xb6, 0x7f, 0x5e, 0x6a,
0x65, 0x14, 0x3b, 0x76, 0xe0, 0xf4, 0x3a, 0x51, 0x2a, 0xc9, 0x70, 0x41, 0x70, 0xbc, 0x59, 0x2f,
0x8c, 0xcc, 0xb1, 0x32, 0x51, 0x5e, 0x5a, 0xe5, 0xfc, 0x13, 0x3c, 0x81, 0x5f, 0x28, 0xb7, 0xa8,
0xd9, 0x15, 0x40, 0xa2, 0xd5, 0xa6, 0x0c, 0x8b, 0x28, 0xc7, 0x60, 0x30, 0x1b, 0xdc, 0xfa, 0xc2,
0x27, 0xe4, 0xbd, 0x01, 0xd8, 0x0c, 0xc6, 0xb2, 0x30, 0x98, 0xe8, 0xc8, 0x48, 0x55, 0x04, 0x07,
0xc4, 0xff, 0x85, 0xd8, 0x19, 0x0c, 0xe5, 0x6a, 0x17, 0x0c, 0x1b, 0xe6, 0x44, 0xb4, 0xe3, 0xfc,
0x67, 0x00, 0xa3, 0xe7, 0xc2, 0xe8, 0x9a, 0x5d, 0x82, 0x5d, 0x15, 0x7e, 0x63, 0x4d, 0xbb, 0x27,
0xc2, 0x23, 0xe0, 0x15, 0x6b, 0x76, 0x07, 0x9e, 0x76, 0x29, 0x68, 0xef, 0x78, 0x79, 0xce, 0x5d,
0x05, 0xde, 0xc5, 0x13, 0xbd, 0x64, 0x1f, 0x34, 0x8d, 0xaa, 0x94, 0xec, 0x26, 0x2e, 0xe8, 0x4b,
0x03, 0xb0, 0x69, 0xbb, 0xad, 0x52, 0xd9, 0x16, 0x57, 0xc1, 0x61, 0x43, 0x7a, 0xa2, 0x7f, 0xb3,
0x07, 0xf0, 0xfb, 0x13, 0x04, 0x23, 0xb2, 0x9a, 0x72, 0x7b, 0x24, 0xde, 0x1d, 0x89, 0x7f, 0x74,
0x0a, 0xb1, 0x17, 0xcf, 0x33, 0xf0, 0xdf, 0xb0, 0x4a, 0x6d, 0x9b, 0x1b, 0x18, 0x61, 0x3b, 0x50,
0x93, 0xf1, 0xf2, 0xb4, 0x4f, 0x4b, 0xb4, 0xb0, 0x24, 0x7b, 0x04, 0xc0, 0x5d, 0x29, 0x1b, 0xf3,
0x30, 0x32, 0xae, 0xd8, 0xbf, 0x6e, 0x4e, 0xfd, 0x64, 0xe2, 0x23, 0xa2, 0xef, 0x7f, 0x03, 0x00,
0x00, 0xff, 0xff, 0xd7, 0x8e, 0x51, 0xb4, 0xe5, 0x01, 0x00, 0x00,
}

41
nflog/nflogpb/nflog.proto Normal file
View File

@ -0,0 +1,41 @@
syntax = "proto3";
package nflogpb;
import "google/protobuf/timestamp.proto";
message Receiver {
// Configured name of the receiver group.
string group_name = 1;
// Name of the integration of the receiver.
string integration = 2;
// Index of the receiver with respect to the integration.
// Every integration in a group may have 0..N configurations.
uint32 idx = 3;
}
// Entry holds information about a successful notification
// sent to a receiver.
message Entry {
// The key identifying the dispatching group.
bytes group_key = 1;
// The receiver that was notified.
Receiver receiver = 2;
// Hash over the state of the group at notification time.
bytes group_hash = 3;
// Whether the notification was about a resolved alert.
bool resolved = 4;
// Timestamp of the succeeding notification.
google.protobuf.Timestamp timestamp = 5;
}
// MeshEntry is a wrapper message to communicate a notify log
// entry through a mesh network.
message MeshEntry {
// The original raw notify log entry.
Entry entry = 1;
// A timestamp indicating when the mesh peer should evict
// the log entry from its state.
google.protobuf.Timestamp expires_at = 2;
}

1
scripts/genproto.sh Executable file
View File

@ -0,0 +1 @@
protoc --go_out=. nflog/nflogpb/nflog.proto