Add drift, use own proto def
This commit is contained in:
parent
39a97d8d24
commit
b54bca08a2
2
Procfile
2
Procfile
|
@ -1,5 +1,5 @@
|
||||||
a1: ./alertmanager --log.level=debug --storage.path=$TMPDIR/a1 --web.listen-address=:9093 --cluster.listen-address=127.0.0.1:8001 --config.file=examples/ha/alertmanager.yml --cluster.peer-timeout=5s
|
a1: ./alertmanager --log.level=debug --storage.path=$TMPDIR/a1 --web.listen-address=:9093 --cluster.listen-address=127.0.0.1:8001 --config.file=examples/ha/alertmanager.yml --cluster.peer-timeout=5s
|
||||||
a2: ./alertmanager --log.level=debug --storage.path=$TMPDIR/a2 --web.listen-address=:9094 --cluster.listen-address=127.0.0.1:8002 --cluster.peer=127.0.0.1:8001 --config.file=examples/ha/alertmanager.yml --cluster.peer-timeout=5s
|
a2: ./alertmanager --log.level=debug --storage.path=$TMPDIR/a2 --web.listen-address=:9094 --cluster.listen-address=127.0.0.1:8002 --cluster.peer=127.0.0.1:8001 --config.file=examples/ha/alertmanager.yml --cluster.peer-timeout=5s
|
||||||
a3: ./alertmanager --log.level=debug --storage.path=$TMPDIR/a3 --web.listen-address=:9095 --cluster.listen-address=127.0.0.1:8003 --cluster.peer=127.0.0.1:8001 --config.file=examples/ha/alertmanager.yml --cluster.peer-timeout=5s
|
a3: ./alertmanager --log.level=debug --storage.path=$TMPDIR/a3 --web.listen-address=:9095 --cluster.listen-address=127.0.0.1:8003 --cluster.peer=127.0.0.1:8001 --config.file=examples/ha/alertmanager.yml --cluster.peer-timeout=5s --cluster.drift=5s
|
||||||
wh: go run ./examples/webhook/echo.go
|
wh: go run ./examples/webhook/echo.go
|
||||||
|
|
||||||
|
|
|
@ -158,6 +158,8 @@ func main() {
|
||||||
gossipInterval = kingpin.Flag("cluster.gossip-interval", "Interval between sending gossip messages. By lowering this value (more frequent) gossip messages are propagated across the cluster more quickly at the expense of increased bandwidth.").Default(cluster.DefaultGossipInterval.String()).Duration()
|
gossipInterval = kingpin.Flag("cluster.gossip-interval", "Interval between sending gossip messages. By lowering this value (more frequent) gossip messages are propagated across the cluster more quickly at the expense of increased bandwidth.").Default(cluster.DefaultGossipInterval.String()).Duration()
|
||||||
pushPullInterval = kingpin.Flag("cluster.pushpull-interval", "Interval for gossip state syncs. Setting this interval lower (more frequent) will increase convergence speeds across larger clusters at the expense of increased bandwidth usage.").Default(cluster.DefaultPushPullInterval.String()).Duration()
|
pushPullInterval = kingpin.Flag("cluster.pushpull-interval", "Interval for gossip state syncs. Setting this interval lower (more frequent) will increase convergence speeds across larger clusters at the expense of increased bandwidth usage.").Default(cluster.DefaultPushPullInterval.String()).Duration()
|
||||||
settleTimeout = kingpin.Flag("cluster.settle-timeout", "Maximum time to wait for cluster connections to settle before evaluating notifications.").Default(cluster.DefaultPushPullInterval.String()).Duration()
|
settleTimeout = kingpin.Flag("cluster.settle-timeout", "Maximum time to wait for cluster connections to settle before evaluating notifications.").Default(cluster.DefaultPushPullInterval.String()).Duration()
|
||||||
|
triggerCooldown = kingpin.Flag("cluster.synchronize-cooldown", "After a alert pipeline has been triggered, how long to wait before allowing peers to trigger the pipeline again.").Default("5s").Duration()
|
||||||
|
pipelineDrift = kingpin.Flag("cluster.drift", "Interval to be added to pipeline execution after each execution. Accumulative.").Default("0s").Duration()
|
||||||
)
|
)
|
||||||
|
|
||||||
kingpin.Version(version.Print("alertmanager"))
|
kingpin.Version(version.Print("alertmanager"))
|
||||||
|
@ -240,7 +242,11 @@ func main() {
|
||||||
silences.SetBroadcast(c.Broadcast)
|
silences.SetBroadcast(c.Broadcast)
|
||||||
}
|
}
|
||||||
|
|
||||||
triggers := trigger.New()
|
var peerID string
|
||||||
|
if peer != nil {
|
||||||
|
peerID = peer.Name()
|
||||||
|
}
|
||||||
|
triggers := trigger.New(peerID)
|
||||||
if peer != nil {
|
if peer != nil {
|
||||||
c := peer.AddState("trigger", triggers)
|
c := peer.AddState("trigger", triggers)
|
||||||
triggers.SetBroadcast(c.Broadcast)
|
triggers.SetBroadcast(c.Broadcast)
|
||||||
|
@ -355,6 +361,8 @@ func main() {
|
||||||
marker,
|
marker,
|
||||||
timeoutFunc,
|
timeoutFunc,
|
||||||
triggers,
|
triggers,
|
||||||
|
*triggerCooldown,
|
||||||
|
*pipelineDrift,
|
||||||
logger,
|
logger,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -25,6 +25,9 @@ type Dispatcher struct {
|
||||||
alerts provider.Alerts
|
alerts provider.Alerts
|
||||||
stage notify.Stage
|
stage notify.Stage
|
||||||
triggers *trigger.Trigger
|
triggers *trigger.Trigger
|
||||||
|
cooldown time.Duration
|
||||||
|
// For testing. Accumulates drift in pipeline execution scheduling.
|
||||||
|
drift time.Duration
|
||||||
|
|
||||||
marker types.Marker
|
marker types.Marker
|
||||||
timeout func(time.Duration) time.Duration
|
timeout func(time.Duration) time.Duration
|
||||||
|
@ -47,6 +50,8 @@ func NewDispatcher(
|
||||||
mk types.Marker,
|
mk types.Marker,
|
||||||
to func(time.Duration) time.Duration,
|
to func(time.Duration) time.Duration,
|
||||||
triggers *trigger.Trigger,
|
triggers *trigger.Trigger,
|
||||||
|
cooldown time.Duration,
|
||||||
|
drift time.Duration,
|
||||||
l log.Logger,
|
l log.Logger,
|
||||||
) *Dispatcher {
|
) *Dispatcher {
|
||||||
disp := &Dispatcher{
|
disp := &Dispatcher{
|
||||||
|
@ -56,6 +61,8 @@ func NewDispatcher(
|
||||||
marker: mk,
|
marker: mk,
|
||||||
timeout: to,
|
timeout: to,
|
||||||
triggers: triggers,
|
triggers: triggers,
|
||||||
|
cooldown: cooldown,
|
||||||
|
drift: drift,
|
||||||
logger: log.With(l, "component", "dispatcher"),
|
logger: log.With(l, "component", "dispatcher"),
|
||||||
}
|
}
|
||||||
return disp
|
return disp
|
||||||
|
@ -261,7 +268,7 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
|
||||||
// If the group does not exist, create it.
|
// If the group does not exist, create it.
|
||||||
ag, ok := group[fp]
|
ag, ok := group[fp]
|
||||||
if !ok {
|
if !ok {
|
||||||
ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.triggers, d.logger)
|
ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.triggers, d.cooldown, d.logger)
|
||||||
group[fp] = ag
|
group[fp] = ag
|
||||||
|
|
||||||
go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
|
go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
|
||||||
|
@ -270,7 +277,7 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
|
||||||
level.Error(d.logger).Log("msg", "Notify for alerts failed", "num_alerts", len(alerts), "err", err)
|
level.Error(d.logger).Log("msg", "Notify for alerts failed", "num_alerts", len(alerts), "err", err)
|
||||||
}
|
}
|
||||||
return err == nil
|
return err == nil
|
||||||
})
|
}, d.drift)
|
||||||
}
|
}
|
||||||
|
|
||||||
ag.insert(alert)
|
ag.insert(alert)
|
||||||
|
@ -291,6 +298,7 @@ type aggrGroup struct {
|
||||||
next *time.Timer
|
next *time.Timer
|
||||||
timeout func(time.Duration) time.Duration
|
timeout func(time.Duration) time.Duration
|
||||||
triggers *trigger.Trigger
|
triggers *trigger.Trigger
|
||||||
|
cooldown time.Duration
|
||||||
|
|
||||||
mtx sync.RWMutex
|
mtx sync.RWMutex
|
||||||
alerts map[model.Fingerprint]*types.Alert
|
alerts map[model.Fingerprint]*types.Alert
|
||||||
|
@ -305,6 +313,7 @@ func newAggrGroup(
|
||||||
r *Route,
|
r *Route,
|
||||||
to func(time.Duration) time.Duration,
|
to func(time.Duration) time.Duration,
|
||||||
triggers *trigger.Trigger,
|
triggers *trigger.Trigger,
|
||||||
|
cooldown time.Duration,
|
||||||
logger log.Logger,
|
logger log.Logger,
|
||||||
) *aggrGroup {
|
) *aggrGroup {
|
||||||
if to == nil {
|
if to == nil {
|
||||||
|
@ -315,6 +324,7 @@ func newAggrGroup(
|
||||||
routeKey: r.Key(),
|
routeKey: r.Key(),
|
||||||
opts: &r.RouteOpts,
|
opts: &r.RouteOpts,
|
||||||
triggers: triggers,
|
triggers: triggers,
|
||||||
|
cooldown: cooldown,
|
||||||
timeout: to,
|
timeout: to,
|
||||||
alerts: map[model.Fingerprint]*types.Alert{},
|
alerts: map[model.Fingerprint]*types.Alert{},
|
||||||
}
|
}
|
||||||
|
@ -352,7 +362,7 @@ func (ag *aggrGroup) alertSlice() []*types.Alert {
|
||||||
return alerts
|
return alerts
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ag *aggrGroup) run(nf notifyFunc) {
|
func (ag *aggrGroup) run(nf notifyFunc, drift time.Duration) {
|
||||||
ag.done = make(chan struct{})
|
ag.done = make(chan struct{})
|
||||||
// fp is the labels causing this grouping.
|
// fp is the labels causing this grouping.
|
||||||
fp := ag.fingerprint()
|
fp := ag.fingerprint()
|
||||||
|
@ -412,13 +422,18 @@ func (ag *aggrGroup) run(nf notifyFunc) {
|
||||||
|
|
||||||
// Wait the configured interval before calling flush again.
|
// Wait the configured interval before calling flush again.
|
||||||
ag.mtx.Lock()
|
ag.mtx.Lock()
|
||||||
|
|
||||||
|
if drift != 0 {
|
||||||
|
ag.opts.GroupInterval += drift
|
||||||
|
level.Info(ag.logger).Log("msg", "trigger", "action", "drift", "group_interval", ag.opts.GroupInterval)
|
||||||
|
}
|
||||||
ag.next.Reset(ag.opts.GroupInterval)
|
ag.next.Reset(ag.opts.GroupInterval)
|
||||||
ag.hasFlushed = true
|
ag.hasFlushed = true
|
||||||
ag.mtx.Unlock()
|
ag.mtx.Unlock()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
// TODO: How long is the cooldown?
|
// TODO: Set cooldown via commandline option.
|
||||||
time.Sleep(5 * time.Second)
|
time.Sleep(ag.cooldown)
|
||||||
|
|
||||||
ag.mtx.Lock()
|
ag.mtx.Lock()
|
||||||
ag.triggered = false
|
ag.triggered = false
|
||||||
|
|
|
@ -439,11 +439,14 @@ func NewWaitStage(wait func() time.Duration) *WaitStage {
|
||||||
|
|
||||||
// Exec implements the Stage interface.
|
// Exec implements the Stage interface.
|
||||||
func (ws *WaitStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
|
func (ws *WaitStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
|
||||||
|
now := time.Now()
|
||||||
select {
|
select {
|
||||||
// TODO: We need to listen here for updates on the mesh, and filter
|
// TODO: We need to listen here for updates on the mesh, and filter
|
||||||
// alerts that have already been sent.
|
// alerts that have already been sent.
|
||||||
case <-time.After(ws.wait()):
|
case <-time.After(ws.wait()):
|
||||||
|
level.Info(l).Log("msg", "trigger", "action", "wait_stage passed", "duration", time.Since(now))
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
level.Info(l).Log("msg", "trigger", "action", "wait_stage canceled", "duration", time.Since(now))
|
||||||
return ctx, nil, ctx.Err()
|
return ctx, nil, ctx.Err()
|
||||||
}
|
}
|
||||||
return ctx, alerts, nil
|
return ctx, alerts, nil
|
||||||
|
@ -644,6 +647,7 @@ func (r RetryStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Ale
|
||||||
select {
|
select {
|
||||||
case <-tick.C:
|
case <-tick.C:
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
level.Info(l).Log("msg", "trigger", "action", "sending")
|
||||||
retry, err := r.integration.Notify(ctx, alerts...)
|
retry, err := r.integration.Notify(ctx, alerts...)
|
||||||
notificationLatencySeconds.WithLabelValues(r.integration.name).Observe(time.Since(now).Seconds())
|
notificationLatencySeconds.WithLabelValues(r.integration.name).Observe(time.Since(now).Seconds())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -16,7 +16,7 @@ package provider
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
pb "github.com/prometheus/alertmanager/nflog/nflogpb"
|
pb "github.com/prometheus/alertmanager/trigger/triggerpb"
|
||||||
"github.com/prometheus/alertmanager/types"
|
"github.com/prometheus/alertmanager/types"
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
)
|
)
|
||||||
|
@ -56,7 +56,7 @@ type TriggerIterator interface {
|
||||||
// exhausted. It is not necessary to exhaust the iterator but Close must
|
// exhausted. It is not necessary to exhaust the iterator but Close must
|
||||||
// be called in any case to release resources used by the iterator (even
|
// be called in any case to release resources used by the iterator (even
|
||||||
// if the iterator is exhausted).
|
// if the iterator is exhausted).
|
||||||
Next() <-chan *pb.MeshEntry
|
Next() <-chan *pb.Trigger
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewAlertIterator returns a new AlertIterator based on the generic alertIterator type
|
// NewAlertIterator returns a new AlertIterator based on the generic alertIterator type
|
||||||
|
|
|
@ -23,7 +23,7 @@ GOGOPROTO_ROOT="${GOPATH}/src/github.com/gogo/protobuf"
|
||||||
GOGOPROTO_PATH="${GOGOPROTO_ROOT}:${GOGOPROTO_ROOT}/protobuf"
|
GOGOPROTO_PATH="${GOGOPROTO_ROOT}:${GOGOPROTO_ROOT}/protobuf"
|
||||||
GRPC_GATEWAY_ROOT="${GOPATH}/src/github.com/grpc-ecosystem/grpc-gateway"
|
GRPC_GATEWAY_ROOT="${GOPATH}/src/github.com/grpc-ecosystem/grpc-gateway"
|
||||||
|
|
||||||
DIRS="nflog/nflogpb silence/silencepb cluster/clusterpb"
|
DIRS="nflog/nflogpb silence/silencepb cluster/clusterpb trigger/triggerpb"
|
||||||
|
|
||||||
for dir in ${DIRS}; do
|
for dir in ${DIRS}; do
|
||||||
pushd ${dir}
|
pushd ${dir}
|
||||||
|
|
|
@ -10,8 +10,8 @@ import (
|
||||||
|
|
||||||
"github.com/matttproud/golang_protobuf_extensions/pbutil"
|
"github.com/matttproud/golang_protobuf_extensions/pbutil"
|
||||||
"github.com/prometheus/alertmanager/nflog"
|
"github.com/prometheus/alertmanager/nflog"
|
||||||
pb "github.com/prometheus/alertmanager/nflog/nflogpb"
|
|
||||||
"github.com/prometheus/alertmanager/provider"
|
"github.com/prometheus/alertmanager/provider"
|
||||||
|
pb "github.com/prometheus/alertmanager/trigger/triggerpb"
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -23,17 +23,19 @@ import (
|
||||||
type Trigger struct {
|
type Trigger struct {
|
||||||
st state
|
st state
|
||||||
now func() time.Time
|
now func() time.Time
|
||||||
subscribers map[model.Fingerprint]chan *pb.MeshEntry
|
subscribers map[model.Fingerprint]chan *pb.Trigger
|
||||||
broadcast func([]byte)
|
broadcast func([]byte)
|
||||||
|
peerID string
|
||||||
|
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func New() *Trigger {
|
func New(peerID string) *Trigger {
|
||||||
return &Trigger{
|
return &Trigger{
|
||||||
|
peerID: peerID,
|
||||||
st: state{},
|
st: state{},
|
||||||
now: utcNow,
|
now: utcNow,
|
||||||
subscribers: make(map[model.Fingerprint]chan *pb.MeshEntry),
|
subscribers: make(map[model.Fingerprint]chan *pb.Trigger),
|
||||||
broadcast: func(_ []byte) {},
|
broadcast: func(_ []byte) {},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -69,13 +71,13 @@ func (t *Trigger) Merge(b []byte) error {
|
||||||
// TODO: Is there a purpose in storing these? I think we just
|
// TODO: Is there a purpose in storing these? I think we just
|
||||||
// want to send a message and then move on with our lives.
|
// want to send a message and then move on with our lives.
|
||||||
// t.st.merge(e)
|
// t.st.merge(e)
|
||||||
fp, err := model.FingerprintFromString(string(e.Entry.GroupKey))
|
fp, err := model.FingerprintFromString(e.Fingerprint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to parse groupkey to fingerprint %v", err)
|
return fmt.Errorf("failed to parse fingerprint: %v", err)
|
||||||
}
|
}
|
||||||
s, ok := t.subscribers[fp]
|
s, ok := t.subscribers[fp]
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("subscriber for %s does not exist", string(e.Entry.GroupKey))
|
return fmt.Errorf("subscriber for %s does not exist", e.Fingerprint)
|
||||||
}
|
}
|
||||||
s <- e
|
s <- e
|
||||||
}
|
}
|
||||||
|
@ -90,18 +92,13 @@ func (t *Trigger) Trigger(fp model.Fingerprint) error {
|
||||||
t.Lock()
|
t.Lock()
|
||||||
defer t.Unlock()
|
defer t.Unlock()
|
||||||
|
|
||||||
e := &pb.MeshEntry{
|
e := &pb.Trigger{
|
||||||
Entry: &pb.Entry{
|
Fingerprint: fp.String(),
|
||||||
Receiver: nil,
|
PeerId: t.peerID,
|
||||||
GroupKey: []byte(fp.String()),
|
Timestamp: now,
|
||||||
Timestamp: now,
|
|
||||||
FiringAlerts: nil,
|
|
||||||
ResolvedAlerts: nil,
|
|
||||||
},
|
|
||||||
ExpiresAt: now,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
b, err := marshalMeshEntry(e)
|
b, err := marshalTrigger(e)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -110,7 +107,7 @@ func (t *Trigger) Trigger(fp model.Fingerprint) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func marshalMeshEntry(e *pb.MeshEntry) ([]byte, error) {
|
func marshalTrigger(e *pb.Trigger) ([]byte, error) {
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
if _, err := pbutil.WriteDelimited(&buf, e); err != nil {
|
if _, err := pbutil.WriteDelimited(&buf, e); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -128,7 +125,7 @@ func (t *Trigger) SetBroadcast(f func([]byte)) {
|
||||||
// Subscribe returns a channel indicating incoming triggers.
|
// Subscribe returns a channel indicating incoming triggers.
|
||||||
func (t *Trigger) Subscribe(fp model.Fingerprint) provider.TriggerIterator {
|
func (t *Trigger) Subscribe(fp model.Fingerprint) provider.TriggerIterator {
|
||||||
var (
|
var (
|
||||||
ch = make(chan *pb.MeshEntry)
|
ch = make(chan *pb.Trigger)
|
||||||
ctx, cancel = context.WithCancel(context.Background())
|
ctx, cancel = context.WithCancel(context.Background())
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -154,12 +151,12 @@ func (t *Trigger) Subscribe(fp model.Fingerprint) provider.TriggerIterator {
|
||||||
// triggerListener alerts subscribers of a particular labelset when a new
|
// triggerListener alerts subscribers of a particular labelset when a new
|
||||||
// message arrives.
|
// message arrives.
|
||||||
type triggerIterator struct {
|
type triggerIterator struct {
|
||||||
ch chan *pb.MeshEntry
|
ch chan *pb.Trigger
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
// Next implements the TriggerIterator interface.
|
// Next implements the TriggerIterator interface.
|
||||||
func (t *triggerIterator) Next() <-chan *pb.MeshEntry {
|
func (t *triggerIterator) Next() <-chan *pb.Trigger {
|
||||||
return t.ch
|
return t.ch
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -174,28 +171,23 @@ func (t *triggerIterator) Close() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// String is the label fingerprint. Can probably make this "typesafe" later.
|
// String is the label fingerprint. Can probably make this "typesafe" later.
|
||||||
type state map[string]*pb.MeshEntry
|
type state map[string]*pb.Trigger
|
||||||
|
|
||||||
func (s state) merge(e *pb.MeshEntry) {
|
func (s state) merge(e *pb.Trigger) {
|
||||||
k := string(e.Entry.GroupKey)
|
s[e.Fingerprint] = e
|
||||||
|
|
||||||
prev, ok := s[k]
|
|
||||||
if !ok || prev.Entry.Timestamp.Before(e.Entry.Timestamp) {
|
|
||||||
s[k] = e
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func decodeState(r io.Reader) (state, error) {
|
func decodeState(r io.Reader) (state, error) {
|
||||||
t := state{}
|
t := state{}
|
||||||
for {
|
for {
|
||||||
var e pb.MeshEntry
|
var e pb.Trigger
|
||||||
_, err := pbutil.ReadDelimited(r, &e)
|
_, err := pbutil.ReadDelimited(r, &e)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
if e.Entry == nil {
|
if e.Fingerprint == "" {
|
||||||
return nil, nflog.ErrInvalidState
|
return nil, nflog.ErrInvalidState
|
||||||
}
|
}
|
||||||
// Create own protobuf def, use fingerprint instead of groupkey
|
// Create own protobuf def, use fingerprint instead of groupkey
|
||||||
t[string(e.Entry.GroupKey)] = &e
|
t[e.Fingerprint] = &e
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
|
|
|
@ -0,0 +1,392 @@
|
||||||
|
// Code generated by protoc-gen-gogo. DO NOT EDIT.
|
||||||
|
// source: trigger.proto
|
||||||
|
|
||||||
|
/*
|
||||||
|
Package triggerpb is a generated protocol buffer package.
|
||||||
|
|
||||||
|
It is generated from these files:
|
||||||
|
trigger.proto
|
||||||
|
|
||||||
|
It has these top-level messages:
|
||||||
|
Trigger
|
||||||
|
*/
|
||||||
|
package triggerpb
|
||||||
|
|
||||||
|
import proto "github.com/gogo/protobuf/proto"
|
||||||
|
import fmt "fmt"
|
||||||
|
import math "math"
|
||||||
|
|
||||||
|
import _ "github.com/gogo/protobuf/gogoproto"
|
||||||
|
|
||||||
|
import time "time"
|
||||||
|
|
||||||
|
import types "github.com/gogo/protobuf/types"
|
||||||
|
|
||||||
|
import io "io"
|
||||||
|
|
||||||
|
// Reference imports to suppress errors if they are not otherwise used.
|
||||||
|
var _ = proto.Marshal
|
||||||
|
var _ = fmt.Errorf
|
||||||
|
var _ = math.Inf
|
||||||
|
var _ = time.Kitchen
|
||||||
|
|
||||||
|
// This is a compile-time assertion to ensure that this generated file
|
||||||
|
// is compatible with the proto package it is being compiled against.
|
||||||
|
// A compilation error at this line likely means your copy of the
|
||||||
|
// proto package needs to be updated.
|
||||||
|
const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
|
||||||
|
|
||||||
|
type Trigger struct {
|
||||||
|
// The fingerprint of the pipeline that started execution.
|
||||||
|
Fingerprint string `protobuf:"bytes,1,opt,name=fingerprint,proto3" json:"fingerprint,omitempty"`
|
||||||
|
// The peer sending the message.
|
||||||
|
PeerId string `protobuf:"bytes,2,opt,name=peer_id,json=peerId,proto3" json:"peer_id,omitempty"`
|
||||||
|
// Timestamp for message.
|
||||||
|
Timestamp time.Time `protobuf:"bytes,3,opt,name=timestamp,stdtime" json:"timestamp"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Trigger) Reset() { *m = Trigger{} }
|
||||||
|
func (m *Trigger) String() string { return proto.CompactTextString(m) }
|
||||||
|
func (*Trigger) ProtoMessage() {}
|
||||||
|
func (*Trigger) Descriptor() ([]byte, []int) { return fileDescriptorTrigger, []int{0} }
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
proto.RegisterType((*Trigger)(nil), "triggerpb.Trigger")
|
||||||
|
}
|
||||||
|
func (m *Trigger) Marshal() (dAtA []byte, err error) {
|
||||||
|
size := m.Size()
|
||||||
|
dAtA = make([]byte, size)
|
||||||
|
n, err := m.MarshalTo(dAtA)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return dAtA[:n], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Trigger) MarshalTo(dAtA []byte) (int, error) {
|
||||||
|
var i int
|
||||||
|
_ = i
|
||||||
|
var l int
|
||||||
|
_ = l
|
||||||
|
if len(m.Fingerprint) > 0 {
|
||||||
|
dAtA[i] = 0xa
|
||||||
|
i++
|
||||||
|
i = encodeVarintTrigger(dAtA, i, uint64(len(m.Fingerprint)))
|
||||||
|
i += copy(dAtA[i:], m.Fingerprint)
|
||||||
|
}
|
||||||
|
if len(m.PeerId) > 0 {
|
||||||
|
dAtA[i] = 0x12
|
||||||
|
i++
|
||||||
|
i = encodeVarintTrigger(dAtA, i, uint64(len(m.PeerId)))
|
||||||
|
i += copy(dAtA[i:], m.PeerId)
|
||||||
|
}
|
||||||
|
dAtA[i] = 0x1a
|
||||||
|
i++
|
||||||
|
i = encodeVarintTrigger(dAtA, i, uint64(types.SizeOfStdTime(m.Timestamp)))
|
||||||
|
n1, err := types.StdTimeMarshalTo(m.Timestamp, dAtA[i:])
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
i += n1
|
||||||
|
return i, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func encodeVarintTrigger(dAtA []byte, offset int, v uint64) int {
|
||||||
|
for v >= 1<<7 {
|
||||||
|
dAtA[offset] = uint8(v&0x7f | 0x80)
|
||||||
|
v >>= 7
|
||||||
|
offset++
|
||||||
|
}
|
||||||
|
dAtA[offset] = uint8(v)
|
||||||
|
return offset + 1
|
||||||
|
}
|
||||||
|
func (m *Trigger) Size() (n int) {
|
||||||
|
var l int
|
||||||
|
_ = l
|
||||||
|
l = len(m.Fingerprint)
|
||||||
|
if l > 0 {
|
||||||
|
n += 1 + l + sovTrigger(uint64(l))
|
||||||
|
}
|
||||||
|
l = len(m.PeerId)
|
||||||
|
if l > 0 {
|
||||||
|
n += 1 + l + sovTrigger(uint64(l))
|
||||||
|
}
|
||||||
|
l = types.SizeOfStdTime(m.Timestamp)
|
||||||
|
n += 1 + l + sovTrigger(uint64(l))
|
||||||
|
return n
|
||||||
|
}
|
||||||
|
|
||||||
|
func sovTrigger(x uint64) (n int) {
|
||||||
|
for {
|
||||||
|
n++
|
||||||
|
x >>= 7
|
||||||
|
if x == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return n
|
||||||
|
}
|
||||||
|
func sozTrigger(x uint64) (n int) {
|
||||||
|
return sovTrigger(uint64((x << 1) ^ uint64((int64(x) >> 63))))
|
||||||
|
}
|
||||||
|
func (m *Trigger) Unmarshal(dAtA []byte) error {
|
||||||
|
l := len(dAtA)
|
||||||
|
iNdEx := 0
|
||||||
|
for iNdEx < l {
|
||||||
|
preIndex := iNdEx
|
||||||
|
var wire uint64
|
||||||
|
for shift := uint(0); ; shift += 7 {
|
||||||
|
if shift >= 64 {
|
||||||
|
return ErrIntOverflowTrigger
|
||||||
|
}
|
||||||
|
if iNdEx >= l {
|
||||||
|
return io.ErrUnexpectedEOF
|
||||||
|
}
|
||||||
|
b := dAtA[iNdEx]
|
||||||
|
iNdEx++
|
||||||
|
wire |= (uint64(b) & 0x7F) << shift
|
||||||
|
if b < 0x80 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fieldNum := int32(wire >> 3)
|
||||||
|
wireType := int(wire & 0x7)
|
||||||
|
if wireType == 4 {
|
||||||
|
return fmt.Errorf("proto: Trigger: wiretype end group for non-group")
|
||||||
|
}
|
||||||
|
if fieldNum <= 0 {
|
||||||
|
return fmt.Errorf("proto: Trigger: illegal tag %d (wire type %d)", fieldNum, wire)
|
||||||
|
}
|
||||||
|
switch fieldNum {
|
||||||
|
case 1:
|
||||||
|
if wireType != 2 {
|
||||||
|
return fmt.Errorf("proto: wrong wireType = %d for field Fingerprint", wireType)
|
||||||
|
}
|
||||||
|
var stringLen uint64
|
||||||
|
for shift := uint(0); ; shift += 7 {
|
||||||
|
if shift >= 64 {
|
||||||
|
return ErrIntOverflowTrigger
|
||||||
|
}
|
||||||
|
if iNdEx >= l {
|
||||||
|
return io.ErrUnexpectedEOF
|
||||||
|
}
|
||||||
|
b := dAtA[iNdEx]
|
||||||
|
iNdEx++
|
||||||
|
stringLen |= (uint64(b) & 0x7F) << shift
|
||||||
|
if b < 0x80 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
intStringLen := int(stringLen)
|
||||||
|
if intStringLen < 0 {
|
||||||
|
return ErrInvalidLengthTrigger
|
||||||
|
}
|
||||||
|
postIndex := iNdEx + intStringLen
|
||||||
|
if postIndex > l {
|
||||||
|
return io.ErrUnexpectedEOF
|
||||||
|
}
|
||||||
|
m.Fingerprint = string(dAtA[iNdEx:postIndex])
|
||||||
|
iNdEx = postIndex
|
||||||
|
case 2:
|
||||||
|
if wireType != 2 {
|
||||||
|
return fmt.Errorf("proto: wrong wireType = %d for field PeerId", wireType)
|
||||||
|
}
|
||||||
|
var stringLen uint64
|
||||||
|
for shift := uint(0); ; shift += 7 {
|
||||||
|
if shift >= 64 {
|
||||||
|
return ErrIntOverflowTrigger
|
||||||
|
}
|
||||||
|
if iNdEx >= l {
|
||||||
|
return io.ErrUnexpectedEOF
|
||||||
|
}
|
||||||
|
b := dAtA[iNdEx]
|
||||||
|
iNdEx++
|
||||||
|
stringLen |= (uint64(b) & 0x7F) << shift
|
||||||
|
if b < 0x80 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
intStringLen := int(stringLen)
|
||||||
|
if intStringLen < 0 {
|
||||||
|
return ErrInvalidLengthTrigger
|
||||||
|
}
|
||||||
|
postIndex := iNdEx + intStringLen
|
||||||
|
if postIndex > l {
|
||||||
|
return io.ErrUnexpectedEOF
|
||||||
|
}
|
||||||
|
m.PeerId = string(dAtA[iNdEx:postIndex])
|
||||||
|
iNdEx = postIndex
|
||||||
|
case 3:
|
||||||
|
if wireType != 2 {
|
||||||
|
return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType)
|
||||||
|
}
|
||||||
|
var msglen int
|
||||||
|
for shift := uint(0); ; shift += 7 {
|
||||||
|
if shift >= 64 {
|
||||||
|
return ErrIntOverflowTrigger
|
||||||
|
}
|
||||||
|
if iNdEx >= l {
|
||||||
|
return io.ErrUnexpectedEOF
|
||||||
|
}
|
||||||
|
b := dAtA[iNdEx]
|
||||||
|
iNdEx++
|
||||||
|
msglen |= (int(b) & 0x7F) << shift
|
||||||
|
if b < 0x80 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if msglen < 0 {
|
||||||
|
return ErrInvalidLengthTrigger
|
||||||
|
}
|
||||||
|
postIndex := iNdEx + msglen
|
||||||
|
if postIndex > l {
|
||||||
|
return io.ErrUnexpectedEOF
|
||||||
|
}
|
||||||
|
if err := types.StdTimeUnmarshal(&m.Timestamp, dAtA[iNdEx:postIndex]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
iNdEx = postIndex
|
||||||
|
default:
|
||||||
|
iNdEx = preIndex
|
||||||
|
skippy, err := skipTrigger(dAtA[iNdEx:])
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if skippy < 0 {
|
||||||
|
return ErrInvalidLengthTrigger
|
||||||
|
}
|
||||||
|
if (iNdEx + skippy) > l {
|
||||||
|
return io.ErrUnexpectedEOF
|
||||||
|
}
|
||||||
|
iNdEx += skippy
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if iNdEx > l {
|
||||||
|
return io.ErrUnexpectedEOF
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func skipTrigger(dAtA []byte) (n int, err error) {
|
||||||
|
l := len(dAtA)
|
||||||
|
iNdEx := 0
|
||||||
|
for iNdEx < l {
|
||||||
|
var wire uint64
|
||||||
|
for shift := uint(0); ; shift += 7 {
|
||||||
|
if shift >= 64 {
|
||||||
|
return 0, ErrIntOverflowTrigger
|
||||||
|
}
|
||||||
|
if iNdEx >= l {
|
||||||
|
return 0, io.ErrUnexpectedEOF
|
||||||
|
}
|
||||||
|
b := dAtA[iNdEx]
|
||||||
|
iNdEx++
|
||||||
|
wire |= (uint64(b) & 0x7F) << shift
|
||||||
|
if b < 0x80 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
wireType := int(wire & 0x7)
|
||||||
|
switch wireType {
|
||||||
|
case 0:
|
||||||
|
for shift := uint(0); ; shift += 7 {
|
||||||
|
if shift >= 64 {
|
||||||
|
return 0, ErrIntOverflowTrigger
|
||||||
|
}
|
||||||
|
if iNdEx >= l {
|
||||||
|
return 0, io.ErrUnexpectedEOF
|
||||||
|
}
|
||||||
|
iNdEx++
|
||||||
|
if dAtA[iNdEx-1] < 0x80 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return iNdEx, nil
|
||||||
|
case 1:
|
||||||
|
iNdEx += 8
|
||||||
|
return iNdEx, nil
|
||||||
|
case 2:
|
||||||
|
var length int
|
||||||
|
for shift := uint(0); ; shift += 7 {
|
||||||
|
if shift >= 64 {
|
||||||
|
return 0, ErrIntOverflowTrigger
|
||||||
|
}
|
||||||
|
if iNdEx >= l {
|
||||||
|
return 0, io.ErrUnexpectedEOF
|
||||||
|
}
|
||||||
|
b := dAtA[iNdEx]
|
||||||
|
iNdEx++
|
||||||
|
length |= (int(b) & 0x7F) << shift
|
||||||
|
if b < 0x80 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
iNdEx += length
|
||||||
|
if length < 0 {
|
||||||
|
return 0, ErrInvalidLengthTrigger
|
||||||
|
}
|
||||||
|
return iNdEx, nil
|
||||||
|
case 3:
|
||||||
|
for {
|
||||||
|
var innerWire uint64
|
||||||
|
var start int = iNdEx
|
||||||
|
for shift := uint(0); ; shift += 7 {
|
||||||
|
if shift >= 64 {
|
||||||
|
return 0, ErrIntOverflowTrigger
|
||||||
|
}
|
||||||
|
if iNdEx >= l {
|
||||||
|
return 0, io.ErrUnexpectedEOF
|
||||||
|
}
|
||||||
|
b := dAtA[iNdEx]
|
||||||
|
iNdEx++
|
||||||
|
innerWire |= (uint64(b) & 0x7F) << shift
|
||||||
|
if b < 0x80 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
innerWireType := int(innerWire & 0x7)
|
||||||
|
if innerWireType == 4 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
next, err := skipTrigger(dAtA[start:])
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
iNdEx = start + next
|
||||||
|
}
|
||||||
|
return iNdEx, nil
|
||||||
|
case 4:
|
||||||
|
return iNdEx, nil
|
||||||
|
case 5:
|
||||||
|
iNdEx += 4
|
||||||
|
return iNdEx, nil
|
||||||
|
default:
|
||||||
|
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
panic("unreachable")
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrInvalidLengthTrigger = fmt.Errorf("proto: negative length found during unmarshaling")
|
||||||
|
ErrIntOverflowTrigger = fmt.Errorf("proto: integer overflow")
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() { proto.RegisterFile("trigger.proto", fileDescriptorTrigger) }
|
||||||
|
|
||||||
|
var fileDescriptorTrigger = []byte{
|
||||||
|
// 195 bytes of a gzipped FileDescriptorProto
|
||||||
|
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2d, 0x29, 0xca, 0x4c,
|
||||||
|
0x4f, 0x4f, 0x2d, 0xd2, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x84, 0x72, 0x0b, 0x92, 0xa4,
|
||||||
|
0xe4, 0xd3, 0xf3, 0xf3, 0xd3, 0x73, 0x52, 0xf5, 0xc1, 0x12, 0x49, 0xa5, 0x69, 0xfa, 0x25, 0x99,
|
||||||
|
0xb9, 0xa9, 0xc5, 0x25, 0x89, 0xb9, 0x05, 0x10, 0xb5, 0x52, 0x22, 0xe9, 0xf9, 0xe9, 0xf9, 0x60,
|
||||||
|
0xa6, 0x3e, 0x88, 0x05, 0x11, 0x55, 0xea, 0x60, 0xe4, 0x62, 0x0f, 0x81, 0x18, 0x22, 0xa4, 0xc0,
|
||||||
|
0xc5, 0x9d, 0x96, 0x99, 0x07, 0x32, 0xae, 0x28, 0x33, 0xaf, 0x44, 0x82, 0x51, 0x81, 0x51, 0x83,
|
||||||
|
0x33, 0x08, 0x59, 0x48, 0x48, 0x9c, 0x8b, 0xbd, 0x20, 0x35, 0xb5, 0x28, 0x3e, 0x33, 0x45, 0x82,
|
||||||
|
0x09, 0x2c, 0xcb, 0x06, 0xe2, 0x7a, 0xa6, 0x08, 0x39, 0x71, 0x71, 0xc2, 0xed, 0x93, 0x60, 0x56,
|
||||||
|
0x60, 0xd4, 0xe0, 0x36, 0x92, 0xd2, 0x83, 0xb8, 0x48, 0x0f, 0xe6, 0x22, 0xbd, 0x10, 0x98, 0x0a,
|
||||||
|
0x27, 0x8e, 0x13, 0xf7, 0xe4, 0x19, 0x26, 0xdc, 0x97, 0x67, 0x0c, 0x42, 0x68, 0x73, 0x12, 0x38,
|
||||||
|
0xf1, 0x50, 0x8e, 0xe1, 0xc4, 0x23, 0x39, 0xc6, 0x0b, 0x8f, 0xe4, 0x18, 0x1f, 0x3c, 0x92, 0x63,
|
||||||
|
0x4c, 0x62, 0x03, 0x6b, 0x35, 0x06, 0x04, 0x00, 0x00, 0xff, 0xff, 0x72, 0x58, 0x01, 0x59, 0xf6,
|
||||||
|
0x00, 0x00, 0x00,
|
||||||
|
}
|
|
@ -0,0 +1,21 @@
|
||||||
|
syntax = "proto3";
|
||||||
|
|
||||||
|
package triggerpb;
|
||||||
|
|
||||||
|
import "google/protobuf/timestamp.proto";
|
||||||
|
import "gogoproto/gogo.proto";
|
||||||
|
|
||||||
|
option (gogoproto.marshaler_all) = true;
|
||||||
|
option (gogoproto.sizer_all) = true;
|
||||||
|
option (gogoproto.unmarshaler_all) = true;
|
||||||
|
option (gogoproto.goproto_getters_all) = false;
|
||||||
|
|
||||||
|
message Trigger {
|
||||||
|
// The fingerprint of the pipeline that started execution.
|
||||||
|
string fingerprint = 1;
|
||||||
|
// The peer sending the message.
|
||||||
|
string peer_id = 2;
|
||||||
|
|
||||||
|
// Timestamp for message.
|
||||||
|
google.protobuf.Timestamp timestamp = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
|
||||||
|
}
|
Loading…
Reference in New Issue