Adapt implementation to new generated Go code
This commit is contained in:
parent
0bf1b28c33
commit
4d81dc8bf3
|
@ -27,7 +27,6 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/golang/protobuf/ptypes"
|
||||
"github.com/matttproud/golang_protobuf_extensions/pbutil"
|
||||
pb "github.com/prometheus/alertmanager/nflog/nflogpb"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
@ -352,33 +351,20 @@ func (l *nlog) log(r *pb.Receiver, gkey, ghash []byte, resolved bool) error {
|
|||
if prevle, ok := l.st[key]; ok {
|
||||
// Entry already exists, only overwrite if timestamp is newer.
|
||||
// This may with raciness or clock-drift across AM nodes.
|
||||
prevts, err := ptypes.Timestamp(prevle.Entry.Timestamp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if prevts.After(now) {
|
||||
if prevle.Entry.Timestamp.After(now) {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
ts, err := ptypes.TimestampProto(now)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
expts, err := ptypes.TimestampProto(now.Add(l.retention))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
e := &pb.MeshEntry{
|
||||
Entry: &pb.Entry{
|
||||
Receiver: r,
|
||||
GroupKey: gkey,
|
||||
GroupHash: ghash,
|
||||
Resolved: resolved,
|
||||
Timestamp: ts,
|
||||
Timestamp: now,
|
||||
},
|
||||
ExpiresAt: expts,
|
||||
ExpiresAt: now.Add(l.retention),
|
||||
}
|
||||
l.gossip.GossipBroadcast(gossipData{
|
||||
key: e,
|
||||
|
@ -400,9 +386,7 @@ func (l *nlog) GC() (int, error) {
|
|||
defer l.mtx.Unlock()
|
||||
|
||||
for k, le := range l.st {
|
||||
if ets, err := ptypes.Timestamp(le.ExpiresAt); err != nil {
|
||||
return n, err
|
||||
} else if !ets.After(now) {
|
||||
if !le.ExpiresAt.After(now) {
|
||||
delete(l.st, k)
|
||||
n++
|
||||
}
|
||||
|
@ -600,17 +584,7 @@ func (gd gossipData) Merge(other mesh.GossipData) mesh.GossipData {
|
|||
gd[k] = e
|
||||
continue
|
||||
}
|
||||
pts, err := ptypes.Timestamp(prev.Entry.Timestamp)
|
||||
if err != nil {
|
||||
// TODO(fabxc): log error and skip entry. What can actually error here?
|
||||
panic(err)
|
||||
}
|
||||
ets, err := ptypes.Timestamp(e.Entry.Timestamp)
|
||||
if err != nil {
|
||||
// TODO(fabxc): see above.
|
||||
panic(err)
|
||||
}
|
||||
if pts.Before(ets) {
|
||||
if prev.Entry.Timestamp.Before(e.Entry.Timestamp) {
|
||||
gd[k] = e
|
||||
}
|
||||
}
|
||||
|
@ -628,17 +602,7 @@ func (gd gossipData) mergeDelta(od gossipData) gossipData {
|
|||
delta[k] = e
|
||||
continue
|
||||
}
|
||||
pts, err := ptypes.Timestamp(prev.Entry.Timestamp)
|
||||
if err != nil {
|
||||
// TODO(fabxc): log error and skip entry. What can actually error here?
|
||||
panic(err)
|
||||
}
|
||||
ets, err := ptypes.Timestamp(e.Entry.Timestamp)
|
||||
if err != nil {
|
||||
// TODO(fabxc): see above.
|
||||
panic(err)
|
||||
}
|
||||
if pts.Before(ets) {
|
||||
if prev.Entry.Timestamp.Before(e.Entry.Timestamp) {
|
||||
gd[k] = e
|
||||
delta[k] = e
|
||||
}
|
||||
|
|
|
@ -22,7 +22,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/cenkalti/backoff"
|
||||
"github.com/golang/protobuf/ptypes"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/common/log"
|
||||
"github.com/prometheus/common/model"
|
||||
|
@ -454,11 +453,7 @@ func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, hash []byte, resolved boo
|
|||
}
|
||||
|
||||
// Nothing changed, only notify if the repeat interval has passed.
|
||||
ts, err := ptypes.Timestamp(entry.Timestamp)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return ts.Before(n.now().Add(-repeat)), nil
|
||||
return entry.Timestamp.Before(n.now().Add(-repeat)), nil
|
||||
}
|
||||
|
||||
// Exec implements the Stage interface.
|
||||
|
|
|
@ -26,7 +26,6 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/golang/protobuf/ptypes"
|
||||
"github.com/golang/protobuf/ptypes/timestamp"
|
||||
"github.com/matttproud/golang_protobuf_extensions/pbutil"
|
||||
pb "github.com/prometheus/alertmanager/silence/silencepb"
|
||||
|
@ -276,17 +275,14 @@ func (s *Silences) GC() (int, error) {
|
|||
start := time.Now()
|
||||
defer func() { s.metrics.gcDuration.Observe(time.Since(start).Seconds()) }()
|
||||
|
||||
now, err := s.nowProto()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
now := s.now()
|
||||
var n int
|
||||
|
||||
s.mtx.Lock()
|
||||
defer s.mtx.Unlock()
|
||||
|
||||
for id, sil := range s.st {
|
||||
if !protoBefore(now, sil.ExpiresAt) {
|
||||
if !now.Before(sil.ExpiresAt) {
|
||||
delete(s.st, id)
|
||||
delete(s.mc, sil.Silence)
|
||||
n++
|
||||
|
@ -336,19 +332,11 @@ func validateSilence(s *pb.Silence) error {
|
|||
return fmt.Errorf("invalid label matcher %d: %s", i, err)
|
||||
}
|
||||
}
|
||||
startsAt, err := ptypes.Timestamp(s.StartsAt)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid start time: %s", err)
|
||||
}
|
||||
endsAt, err := ptypes.Timestamp(s.EndsAt)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid end time: %s", err)
|
||||
}
|
||||
if endsAt.Before(startsAt) {
|
||||
if s.EndsAt.Before(s.StartsAt) {
|
||||
return errors.New("end time must not be before start time")
|
||||
}
|
||||
if _, err := ptypes.Timestamp(s.UpdatedAt); err != nil {
|
||||
return fmt.Errorf("invalid update timestamp: %s", err)
|
||||
if s.UpdatedAt.IsZero() {
|
||||
return fmt.Errorf("invalid zero update timestamp")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -368,18 +356,9 @@ func (s *Silences) getSilence(id string) (*pb.Silence, bool) {
|
|||
}
|
||||
|
||||
func (s *Silences) setSilence(sil *pb.Silence) error {
|
||||
endsAt, err := ptypes.Timestamp(sil.EndsAt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
expiresAt, err := ptypes.TimestampProto(endsAt.Add(s.retention))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
msil := &pb.MeshSilence{
|
||||
Silence: sil,
|
||||
ExpiresAt: expiresAt,
|
||||
ExpiresAt: sil.EndsAt.Add(s.retention),
|
||||
}
|
||||
st := gossipData{sil.Id: msil}
|
||||
|
||||
|
@ -389,11 +368,6 @@ func (s *Silences) setSilence(sil *pb.Silence) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Silences) nowProto() (*timestamp.Timestamp, error) {
|
||||
now := s.now()
|
||||
return ptypes.TimestampProto(now)
|
||||
}
|
||||
|
||||
// Create adds a new silence and returns its ID.
|
||||
func (s *Silences) Create(sil *pb.Silence) (id string, err error) {
|
||||
if sil.Id != "" {
|
||||
|
@ -401,13 +375,11 @@ func (s *Silences) Create(sil *pb.Silence) (id string, err error) {
|
|||
}
|
||||
sil.Id = uuid.NewV4().String()
|
||||
|
||||
now, err := s.nowProto()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if sil.StartsAt == nil {
|
||||
now := s.now()
|
||||
|
||||
if sil.StartsAt.IsZero() {
|
||||
sil.StartsAt = now
|
||||
} else if protoBefore(sil.StartsAt, now) {
|
||||
} else if sil.StartsAt.Before(now) {
|
||||
return "", fmt.Errorf("new silence must not start in the past")
|
||||
}
|
||||
sil.UpdatedAt = now
|
||||
|
@ -435,23 +407,20 @@ func (s *Silences) Expire(id string) error {
|
|||
return ErrNotFound
|
||||
}
|
||||
|
||||
now, err := s.nowProto()
|
||||
now := s.now()
|
||||
|
||||
sil, err := silenceSetTimeRange(sil, now, sil.StartsAt, now)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if sil, err = silenceSetTimeRange(sil, now, sil.StartsAt, now); err != nil {
|
||||
return err
|
||||
}
|
||||
return s.setSilence(sil)
|
||||
}
|
||||
|
||||
// SetTimeRange adjust the time range of a silence if allowed. If start or end
|
||||
// are zero times, the current value remains unmodified.
|
||||
func (s *Silences) SetTimeRange(id string, start, end time.Time) error {
|
||||
now, err := s.nowProto()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
now := s.now()
|
||||
|
||||
s.mtx.Lock()
|
||||
defer s.mtx.Unlock()
|
||||
|
||||
|
@ -462,39 +431,35 @@ func (s *Silences) SetTimeRange(id string, start, end time.Time) error {
|
|||
|
||||
// Retrieve protobuf start and end time, default to current value
|
||||
// of the silence.
|
||||
var startp, endp *timestamp.Timestamp
|
||||
if start.IsZero() {
|
||||
startp = sil.StartsAt
|
||||
} else if startp, err = ptypes.TimestampProto(start); err != nil {
|
||||
return err
|
||||
start = sil.StartsAt
|
||||
}
|
||||
if end.IsZero() {
|
||||
endp = sil.EndsAt
|
||||
} else if endp, err = ptypes.TimestampProto(end); err != nil {
|
||||
return err
|
||||
end = sil.EndsAt
|
||||
}
|
||||
|
||||
if sil, err = silenceSetTimeRange(sil, now, startp, endp); err != nil {
|
||||
sil, err := silenceSetTimeRange(sil, now, start, end)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return s.setSilence(sil)
|
||||
}
|
||||
|
||||
func silenceSetTimeRange(sil *pb.Silence, now, start, end *timestamp.Timestamp) (*pb.Silence, error) {
|
||||
if protoBefore(end, start) {
|
||||
func silenceSetTimeRange(sil *pb.Silence, now, start, end time.Time) (*pb.Silence, error) {
|
||||
if end.Before(start) {
|
||||
return nil, fmt.Errorf("end time must not be before start time")
|
||||
}
|
||||
// Validate modification based on current silence state.
|
||||
switch getState(sil, now) {
|
||||
case StateActive:
|
||||
if *start != *sil.StartsAt {
|
||||
if !start.Equal(sil.StartsAt) {
|
||||
return nil, fmt.Errorf("start time of active silence cannot be modified")
|
||||
}
|
||||
if protoBefore(end, now) {
|
||||
if end.Before(now) {
|
||||
return nil, fmt.Errorf("end time cannot be set into the past")
|
||||
}
|
||||
case StatePending:
|
||||
if protoBefore(start, now) {
|
||||
if start.Before(now) {
|
||||
return nil, fmt.Errorf("start time cannot be set into the past")
|
||||
}
|
||||
case StateExpired:
|
||||
|
@ -526,7 +491,7 @@ type query struct {
|
|||
|
||||
// silenceFilter is a function that returns true if a silence
|
||||
// should be dropped from a result set for a given time.
|
||||
type silenceFilter func(*pb.Silence, *Silences, *timestamp.Timestamp) (bool, error)
|
||||
type silenceFilter func(*pb.Silence, *Silences, time.Time) (bool, error)
|
||||
|
||||
var errNotSupported = errors.New("query parameter not supported")
|
||||
|
||||
|
@ -550,7 +515,7 @@ func QTimeRange(start, end time.Time) QueryParam {
|
|||
// QMatches returns silences that match the given label set.
|
||||
func QMatches(set model.LabelSet) QueryParam {
|
||||
return func(q *query) error {
|
||||
f := func(sil *pb.Silence, s *Silences, _ *timestamp.Timestamp) (bool, error) {
|
||||
f := func(sil *pb.Silence, s *Silences, _ time.Time) (bool, error) {
|
||||
m, err := s.mc.Get(sil)
|
||||
if err != nil {
|
||||
return true, err
|
||||
|
@ -573,11 +538,11 @@ const (
|
|||
)
|
||||
|
||||
// getState returns a silence's SilenceState at the given timestamp.
|
||||
func getState(sil *pb.Silence, ts *timestamp.Timestamp) SilenceState {
|
||||
if protoBefore(ts, sil.StartsAt) {
|
||||
func getState(sil *pb.Silence, ts time.Time) SilenceState {
|
||||
if ts.Before(sil.StartsAt) {
|
||||
return StatePending
|
||||
}
|
||||
if protoBefore(sil.EndsAt, ts) {
|
||||
if sil.EndsAt.Before(ts) {
|
||||
return StateExpired
|
||||
}
|
||||
return StateActive
|
||||
|
@ -586,7 +551,7 @@ func getState(sil *pb.Silence, ts *timestamp.Timestamp) SilenceState {
|
|||
// QState filters queried silences by the given states.
|
||||
func QState(states ...SilenceState) QueryParam {
|
||||
return func(q *query) error {
|
||||
f := func(sil *pb.Silence, _ *Silences, now *timestamp.Timestamp) (bool, error) {
|
||||
f := func(sil *pb.Silence, _ *Silences, now time.Time) (bool, error) {
|
||||
s := getState(sil, now)
|
||||
|
||||
for _, ps := range states {
|
||||
|
@ -613,11 +578,7 @@ func (s *Silences) Query(params ...QueryParam) ([]*pb.Silence, error) {
|
|||
return nil, err
|
||||
}
|
||||
}
|
||||
nowpb, err := s.nowProto()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s.query(q, nowpb)
|
||||
return s.query(q, s.now())
|
||||
}()
|
||||
if err != nil {
|
||||
s.metrics.queryErrorsTotal.Inc()
|
||||
|
@ -626,7 +587,7 @@ func (s *Silences) Query(params ...QueryParam) ([]*pb.Silence, error) {
|
|||
return sils, err
|
||||
}
|
||||
|
||||
func (s *Silences) query(q *query, now *timestamp.Timestamp) ([]*pb.Silence, error) {
|
||||
func (s *Silences) query(q *query, now time.Time) ([]*pb.Silence, error) {
|
||||
// If we have an ID constraint, all silences are our base set.
|
||||
// This and the use of post-filter functions is the
|
||||
// the trivial solution for now.
|
||||
|
@ -824,15 +785,8 @@ func (gd gossipData) Merge(other mesh.GossipData) mesh.GossipData {
|
|||
gd[id] = s
|
||||
continue
|
||||
}
|
||||
pts, err := ptypes.Timestamp(prev.Silence.UpdatedAt)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
sts, err := ptypes.Timestamp(s.Silence.UpdatedAt)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if pts.Before(sts) {
|
||||
|
||||
if prev.Silence.UpdatedAt.Before(s.Silence.UpdatedAt) {
|
||||
gd[id] = s
|
||||
}
|
||||
}
|
||||
|
@ -850,15 +804,7 @@ func (gd gossipData) mergeDelta(od gossipData) gossipData {
|
|||
delta[id] = s
|
||||
continue
|
||||
}
|
||||
pts, err := ptypes.Timestamp(prev.Silence.UpdatedAt)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
sts, err := ptypes.Timestamp(s.Silence.UpdatedAt)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if pts.Before(sts) {
|
||||
if prev.Silence.UpdatedAt.Before(s.Silence.UpdatedAt) {
|
||||
gd[id] = s
|
||||
delta[id] = s
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue