From b54bca08a2722f00e613856314c69734e6c5c5e5 Mon Sep 17 00:00:00 2001 From: stuart nelson Date: Tue, 8 May 2018 17:28:09 +0200 Subject: [PATCH] Add drift, use own proto def --- Procfile | 2 +- cmd/alertmanager/main.go | 10 +- dispatch/dispatch.go | 25 +- notify/notify.go | 4 + provider/provider.go | 4 +- scripts/genproto.sh | 2 +- trigger/trigger.go | 56 ++--- trigger/triggerpb/trigger.pb.go | 392 ++++++++++++++++++++++++++++++++ trigger/triggerpb/trigger.proto | 21 ++ 9 files changed, 474 insertions(+), 42 deletions(-) create mode 100644 trigger/triggerpb/trigger.pb.go create mode 100644 trigger/triggerpb/trigger.proto diff --git a/Procfile b/Procfile index 17a2ff81..4aa5df82 100644 --- a/Procfile +++ b/Procfile @@ -1,5 +1,5 @@ a1: ./alertmanager --log.level=debug --storage.path=$TMPDIR/a1 --web.listen-address=:9093 --cluster.listen-address=127.0.0.1:8001 --config.file=examples/ha/alertmanager.yml --cluster.peer-timeout=5s a2: ./alertmanager --log.level=debug --storage.path=$TMPDIR/a2 --web.listen-address=:9094 --cluster.listen-address=127.0.0.1:8002 --cluster.peer=127.0.0.1:8001 --config.file=examples/ha/alertmanager.yml --cluster.peer-timeout=5s -a3: ./alertmanager --log.level=debug --storage.path=$TMPDIR/a3 --web.listen-address=:9095 --cluster.listen-address=127.0.0.1:8003 --cluster.peer=127.0.0.1:8001 --config.file=examples/ha/alertmanager.yml --cluster.peer-timeout=5s +a3: ./alertmanager --log.level=debug --storage.path=$TMPDIR/a3 --web.listen-address=:9095 --cluster.listen-address=127.0.0.1:8003 --cluster.peer=127.0.0.1:8001 --config.file=examples/ha/alertmanager.yml --cluster.peer-timeout=5s --cluster.drift=5s wh: go run ./examples/webhook/echo.go diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go index 699b5371..276d6951 100644 --- a/cmd/alertmanager/main.go +++ b/cmd/alertmanager/main.go @@ -158,6 +158,8 @@ func main() { gossipInterval = kingpin.Flag("cluster.gossip-interval", "Interval between sending gossip messages. By lowering this value (more frequent) gossip messages are propagated across the cluster more quickly at the expense of increased bandwidth.").Default(cluster.DefaultGossipInterval.String()).Duration() pushPullInterval = kingpin.Flag("cluster.pushpull-interval", "Interval for gossip state syncs. Setting this interval lower (more frequent) will increase convergence speeds across larger clusters at the expense of increased bandwidth usage.").Default(cluster.DefaultPushPullInterval.String()).Duration() settleTimeout = kingpin.Flag("cluster.settle-timeout", "Maximum time to wait for cluster connections to settle before evaluating notifications.").Default(cluster.DefaultPushPullInterval.String()).Duration() + triggerCooldown = kingpin.Flag("cluster.synchronize-cooldown", "After a alert pipeline has been triggered, how long to wait before allowing peers to trigger the pipeline again.").Default("5s").Duration() + pipelineDrift = kingpin.Flag("cluster.drift", "Interval to be added to pipeline execution after each execution. Accumulative.").Default("0s").Duration() ) kingpin.Version(version.Print("alertmanager")) @@ -240,7 +242,11 @@ func main() { silences.SetBroadcast(c.Broadcast) } - triggers := trigger.New() + var peerID string + if peer != nil { + peerID = peer.Name() + } + triggers := trigger.New(peerID) if peer != nil { c := peer.AddState("trigger", triggers) triggers.SetBroadcast(c.Broadcast) @@ -355,6 +361,8 @@ func main() { marker, timeoutFunc, triggers, + *triggerCooldown, + *pipelineDrift, logger, ) diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index f9e9a2a8..7d314491 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -25,6 +25,9 @@ type Dispatcher struct { alerts provider.Alerts stage notify.Stage triggers *trigger.Trigger + cooldown time.Duration + // For testing. Accumulates drift in pipeline execution scheduling. + drift time.Duration marker types.Marker timeout func(time.Duration) time.Duration @@ -47,6 +50,8 @@ func NewDispatcher( mk types.Marker, to func(time.Duration) time.Duration, triggers *trigger.Trigger, + cooldown time.Duration, + drift time.Duration, l log.Logger, ) *Dispatcher { disp := &Dispatcher{ @@ -56,6 +61,8 @@ func NewDispatcher( marker: mk, timeout: to, triggers: triggers, + cooldown: cooldown, + drift: drift, logger: log.With(l, "component", "dispatcher"), } return disp @@ -261,7 +268,7 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) { // If the group does not exist, create it. ag, ok := group[fp] if !ok { - ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.triggers, d.logger) + ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.triggers, d.cooldown, d.logger) group[fp] = ag go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool { @@ -270,7 +277,7 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) { level.Error(d.logger).Log("msg", "Notify for alerts failed", "num_alerts", len(alerts), "err", err) } return err == nil - }) + }, d.drift) } ag.insert(alert) @@ -291,6 +298,7 @@ type aggrGroup struct { next *time.Timer timeout func(time.Duration) time.Duration triggers *trigger.Trigger + cooldown time.Duration mtx sync.RWMutex alerts map[model.Fingerprint]*types.Alert @@ -305,6 +313,7 @@ func newAggrGroup( r *Route, to func(time.Duration) time.Duration, triggers *trigger.Trigger, + cooldown time.Duration, logger log.Logger, ) *aggrGroup { if to == nil { @@ -315,6 +324,7 @@ func newAggrGroup( routeKey: r.Key(), opts: &r.RouteOpts, triggers: triggers, + cooldown: cooldown, timeout: to, alerts: map[model.Fingerprint]*types.Alert{}, } @@ -352,7 +362,7 @@ func (ag *aggrGroup) alertSlice() []*types.Alert { return alerts } -func (ag *aggrGroup) run(nf notifyFunc) { +func (ag *aggrGroup) run(nf notifyFunc, drift time.Duration) { ag.done = make(chan struct{}) // fp is the labels causing this grouping. fp := ag.fingerprint() @@ -412,13 +422,18 @@ func (ag *aggrGroup) run(nf notifyFunc) { // Wait the configured interval before calling flush again. ag.mtx.Lock() + + if drift != 0 { + ag.opts.GroupInterval += drift + level.Info(ag.logger).Log("msg", "trigger", "action", "drift", "group_interval", ag.opts.GroupInterval) + } ag.next.Reset(ag.opts.GroupInterval) ag.hasFlushed = true ag.mtx.Unlock() go func() { - // TODO: How long is the cooldown? - time.Sleep(5 * time.Second) + // TODO: Set cooldown via commandline option. + time.Sleep(ag.cooldown) ag.mtx.Lock() ag.triggered = false diff --git a/notify/notify.go b/notify/notify.go index e3f25da4..68c07216 100644 --- a/notify/notify.go +++ b/notify/notify.go @@ -439,11 +439,14 @@ func NewWaitStage(wait func() time.Duration) *WaitStage { // Exec implements the Stage interface. func (ws *WaitStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { + now := time.Now() select { // TODO: We need to listen here for updates on the mesh, and filter // alerts that have already been sent. case <-time.After(ws.wait()): + level.Info(l).Log("msg", "trigger", "action", "wait_stage passed", "duration", time.Since(now)) case <-ctx.Done(): + level.Info(l).Log("msg", "trigger", "action", "wait_stage canceled", "duration", time.Since(now)) return ctx, nil, ctx.Err() } return ctx, alerts, nil @@ -644,6 +647,7 @@ func (r RetryStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Ale select { case <-tick.C: now := time.Now() + level.Info(l).Log("msg", "trigger", "action", "sending") retry, err := r.integration.Notify(ctx, alerts...) notificationLatencySeconds.WithLabelValues(r.integration.name).Observe(time.Since(now).Seconds()) if err != nil { diff --git a/provider/provider.go b/provider/provider.go index d789754a..cd130508 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -16,7 +16,7 @@ package provider import ( "fmt" - pb "github.com/prometheus/alertmanager/nflog/nflogpb" + pb "github.com/prometheus/alertmanager/trigger/triggerpb" "github.com/prometheus/alertmanager/types" "github.com/prometheus/common/model" ) @@ -56,7 +56,7 @@ type TriggerIterator interface { // exhausted. It is not necessary to exhaust the iterator but Close must // be called in any case to release resources used by the iterator (even // if the iterator is exhausted). - Next() <-chan *pb.MeshEntry + Next() <-chan *pb.Trigger } // NewAlertIterator returns a new AlertIterator based on the generic alertIterator type diff --git a/scripts/genproto.sh b/scripts/genproto.sh index c777fb9f..bc799a07 100755 --- a/scripts/genproto.sh +++ b/scripts/genproto.sh @@ -23,7 +23,7 @@ GOGOPROTO_ROOT="${GOPATH}/src/github.com/gogo/protobuf" GOGOPROTO_PATH="${GOGOPROTO_ROOT}:${GOGOPROTO_ROOT}/protobuf" GRPC_GATEWAY_ROOT="${GOPATH}/src/github.com/grpc-ecosystem/grpc-gateway" -DIRS="nflog/nflogpb silence/silencepb cluster/clusterpb" +DIRS="nflog/nflogpb silence/silencepb cluster/clusterpb trigger/triggerpb" for dir in ${DIRS}; do pushd ${dir} diff --git a/trigger/trigger.go b/trigger/trigger.go index 705959b3..c99f4580 100644 --- a/trigger/trigger.go +++ b/trigger/trigger.go @@ -10,8 +10,8 @@ import ( "github.com/matttproud/golang_protobuf_extensions/pbutil" "github.com/prometheus/alertmanager/nflog" - pb "github.com/prometheus/alertmanager/nflog/nflogpb" "github.com/prometheus/alertmanager/provider" + pb "github.com/prometheus/alertmanager/trigger/triggerpb" "github.com/prometheus/common/model" ) @@ -23,17 +23,19 @@ import ( type Trigger struct { st state now func() time.Time - subscribers map[model.Fingerprint]chan *pb.MeshEntry + subscribers map[model.Fingerprint]chan *pb.Trigger broadcast func([]byte) + peerID string sync.Mutex } -func New() *Trigger { +func New(peerID string) *Trigger { return &Trigger{ + peerID: peerID, st: state{}, now: utcNow, - subscribers: make(map[model.Fingerprint]chan *pb.MeshEntry), + subscribers: make(map[model.Fingerprint]chan *pb.Trigger), broadcast: func(_ []byte) {}, } } @@ -69,13 +71,13 @@ func (t *Trigger) Merge(b []byte) error { // TODO: Is there a purpose in storing these? I think we just // want to send a message and then move on with our lives. // t.st.merge(e) - fp, err := model.FingerprintFromString(string(e.Entry.GroupKey)) + fp, err := model.FingerprintFromString(e.Fingerprint) if err != nil { - return fmt.Errorf("failed to parse groupkey to fingerprint %v", err) + return fmt.Errorf("failed to parse fingerprint: %v", err) } s, ok := t.subscribers[fp] if !ok { - return fmt.Errorf("subscriber for %s does not exist", string(e.Entry.GroupKey)) + return fmt.Errorf("subscriber for %s does not exist", e.Fingerprint) } s <- e } @@ -90,18 +92,13 @@ func (t *Trigger) Trigger(fp model.Fingerprint) error { t.Lock() defer t.Unlock() - e := &pb.MeshEntry{ - Entry: &pb.Entry{ - Receiver: nil, - GroupKey: []byte(fp.String()), - Timestamp: now, - FiringAlerts: nil, - ResolvedAlerts: nil, - }, - ExpiresAt: now, + e := &pb.Trigger{ + Fingerprint: fp.String(), + PeerId: t.peerID, + Timestamp: now, } - b, err := marshalMeshEntry(e) + b, err := marshalTrigger(e) if err != nil { return err } @@ -110,7 +107,7 @@ func (t *Trigger) Trigger(fp model.Fingerprint) error { return nil } -func marshalMeshEntry(e *pb.MeshEntry) ([]byte, error) { +func marshalTrigger(e *pb.Trigger) ([]byte, error) { var buf bytes.Buffer if _, err := pbutil.WriteDelimited(&buf, e); err != nil { return nil, err @@ -128,7 +125,7 @@ func (t *Trigger) SetBroadcast(f func([]byte)) { // Subscribe returns a channel indicating incoming triggers. func (t *Trigger) Subscribe(fp model.Fingerprint) provider.TriggerIterator { var ( - ch = make(chan *pb.MeshEntry) + ch = make(chan *pb.Trigger) ctx, cancel = context.WithCancel(context.Background()) ) @@ -154,12 +151,12 @@ func (t *Trigger) Subscribe(fp model.Fingerprint) provider.TriggerIterator { // triggerListener alerts subscribers of a particular labelset when a new // message arrives. type triggerIterator struct { - ch chan *pb.MeshEntry + ch chan *pb.Trigger cancel context.CancelFunc } // Next implements the TriggerIterator interface. -func (t *triggerIterator) Next() <-chan *pb.MeshEntry { +func (t *triggerIterator) Next() <-chan *pb.Trigger { return t.ch } @@ -174,28 +171,23 @@ func (t *triggerIterator) Close() { } // String is the label fingerprint. Can probably make this "typesafe" later. -type state map[string]*pb.MeshEntry +type state map[string]*pb.Trigger -func (s state) merge(e *pb.MeshEntry) { - k := string(e.Entry.GroupKey) - - prev, ok := s[k] - if !ok || prev.Entry.Timestamp.Before(e.Entry.Timestamp) { - s[k] = e - } +func (s state) merge(e *pb.Trigger) { + s[e.Fingerprint] = e } func decodeState(r io.Reader) (state, error) { t := state{} for { - var e pb.MeshEntry + var e pb.Trigger _, err := pbutil.ReadDelimited(r, &e) if err == nil { - if e.Entry == nil { + if e.Fingerprint == "" { return nil, nflog.ErrInvalidState } // Create own protobuf def, use fingerprint instead of groupkey - t[string(e.Entry.GroupKey)] = &e + t[e.Fingerprint] = &e continue } if err == io.EOF { diff --git a/trigger/triggerpb/trigger.pb.go b/trigger/triggerpb/trigger.pb.go new file mode 100644 index 00000000..73b81a67 --- /dev/null +++ b/trigger/triggerpb/trigger.pb.go @@ -0,0 +1,392 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: trigger.proto + +/* + Package triggerpb is a generated protocol buffer package. + + It is generated from these files: + trigger.proto + + It has these top-level messages: + Trigger +*/ +package triggerpb + +import proto "github.com/gogo/protobuf/proto" +import fmt "fmt" +import math "math" + +import _ "github.com/gogo/protobuf/gogoproto" + +import time "time" + +import types "github.com/gogo/protobuf/types" + +import io "io" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf +var _ = time.Kitchen + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package + +type Trigger struct { + // The fingerprint of the pipeline that started execution. + Fingerprint string `protobuf:"bytes,1,opt,name=fingerprint,proto3" json:"fingerprint,omitempty"` + // The peer sending the message. + PeerId string `protobuf:"bytes,2,opt,name=peer_id,json=peerId,proto3" json:"peer_id,omitempty"` + // Timestamp for message. + Timestamp time.Time `protobuf:"bytes,3,opt,name=timestamp,stdtime" json:"timestamp"` +} + +func (m *Trigger) Reset() { *m = Trigger{} } +func (m *Trigger) String() string { return proto.CompactTextString(m) } +func (*Trigger) ProtoMessage() {} +func (*Trigger) Descriptor() ([]byte, []int) { return fileDescriptorTrigger, []int{0} } + +func init() { + proto.RegisterType((*Trigger)(nil), "triggerpb.Trigger") +} +func (m *Trigger) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Trigger) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Fingerprint) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintTrigger(dAtA, i, uint64(len(m.Fingerprint))) + i += copy(dAtA[i:], m.Fingerprint) + } + if len(m.PeerId) > 0 { + dAtA[i] = 0x12 + i++ + i = encodeVarintTrigger(dAtA, i, uint64(len(m.PeerId))) + i += copy(dAtA[i:], m.PeerId) + } + dAtA[i] = 0x1a + i++ + i = encodeVarintTrigger(dAtA, i, uint64(types.SizeOfStdTime(m.Timestamp))) + n1, err := types.StdTimeMarshalTo(m.Timestamp, dAtA[i:]) + if err != nil { + return 0, err + } + i += n1 + return i, nil +} + +func encodeVarintTrigger(dAtA []byte, offset int, v uint64) int { + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return offset + 1 +} +func (m *Trigger) Size() (n int) { + var l int + _ = l + l = len(m.Fingerprint) + if l > 0 { + n += 1 + l + sovTrigger(uint64(l)) + } + l = len(m.PeerId) + if l > 0 { + n += 1 + l + sovTrigger(uint64(l)) + } + l = types.SizeOfStdTime(m.Timestamp) + n += 1 + l + sovTrigger(uint64(l)) + return n +} + +func sovTrigger(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozTrigger(x uint64) (n int) { + return sovTrigger(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *Trigger) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTrigger + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Trigger: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Trigger: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Fingerprint", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTrigger + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthTrigger + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Fingerprint = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field PeerId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTrigger + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthTrigger + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.PeerId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTrigger + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthTrigger + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := types.StdTimeUnmarshal(&m.Timestamp, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipTrigger(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthTrigger + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipTrigger(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowTrigger + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowTrigger + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowTrigger + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + if length < 0 { + return 0, ErrInvalidLengthTrigger + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowTrigger + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipTrigger(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthTrigger = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowTrigger = fmt.Errorf("proto: integer overflow") +) + +func init() { proto.RegisterFile("trigger.proto", fileDescriptorTrigger) } + +var fileDescriptorTrigger = []byte{ + // 195 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2d, 0x29, 0xca, 0x4c, + 0x4f, 0x4f, 0x2d, 0xd2, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x84, 0x72, 0x0b, 0x92, 0xa4, + 0xe4, 0xd3, 0xf3, 0xf3, 0xd3, 0x73, 0x52, 0xf5, 0xc1, 0x12, 0x49, 0xa5, 0x69, 0xfa, 0x25, 0x99, + 0xb9, 0xa9, 0xc5, 0x25, 0x89, 0xb9, 0x05, 0x10, 0xb5, 0x52, 0x22, 0xe9, 0xf9, 0xe9, 0xf9, 0x60, + 0xa6, 0x3e, 0x88, 0x05, 0x11, 0x55, 0xea, 0x60, 0xe4, 0x62, 0x0f, 0x81, 0x18, 0x22, 0xa4, 0xc0, + 0xc5, 0x9d, 0x96, 0x99, 0x07, 0x32, 0xae, 0x28, 0x33, 0xaf, 0x44, 0x82, 0x51, 0x81, 0x51, 0x83, + 0x33, 0x08, 0x59, 0x48, 0x48, 0x9c, 0x8b, 0xbd, 0x20, 0x35, 0xb5, 0x28, 0x3e, 0x33, 0x45, 0x82, + 0x09, 0x2c, 0xcb, 0x06, 0xe2, 0x7a, 0xa6, 0x08, 0x39, 0x71, 0x71, 0xc2, 0xed, 0x93, 0x60, 0x56, + 0x60, 0xd4, 0xe0, 0x36, 0x92, 0xd2, 0x83, 0xb8, 0x48, 0x0f, 0xe6, 0x22, 0xbd, 0x10, 0x98, 0x0a, + 0x27, 0x8e, 0x13, 0xf7, 0xe4, 0x19, 0x26, 0xdc, 0x97, 0x67, 0x0c, 0x42, 0x68, 0x73, 0x12, 0x38, + 0xf1, 0x50, 0x8e, 0xe1, 0xc4, 0x23, 0x39, 0xc6, 0x0b, 0x8f, 0xe4, 0x18, 0x1f, 0x3c, 0x92, 0x63, + 0x4c, 0x62, 0x03, 0x6b, 0x35, 0x06, 0x04, 0x00, 0x00, 0xff, 0xff, 0x72, 0x58, 0x01, 0x59, 0xf6, + 0x00, 0x00, 0x00, +} diff --git a/trigger/triggerpb/trigger.proto b/trigger/triggerpb/trigger.proto new file mode 100644 index 00000000..7725d0b8 --- /dev/null +++ b/trigger/triggerpb/trigger.proto @@ -0,0 +1,21 @@ +syntax = "proto3"; + +package triggerpb; + +import "google/protobuf/timestamp.proto"; +import "gogoproto/gogo.proto"; + +option (gogoproto.marshaler_all) = true; +option (gogoproto.sizer_all) = true; +option (gogoproto.unmarshaler_all) = true; +option (gogoproto.goproto_getters_all) = false; + +message Trigger { + // The fingerprint of the pipeline that started execution. + string fingerprint = 1; + // The peer sending the message. + string peer_id = 2; + + // Timestamp for message. + google.protobuf.Timestamp timestamp = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; +}