nflog: switch to gogoproto
This switches the nflog to generate Go code via gogoproto and thereby use standard library timestamp types.
This commit is contained in:
parent
af8cfdde14
commit
4258b028d6
|
@ -27,7 +27,6 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/golang/protobuf/ptypes"
|
||||
"github.com/matttproud/golang_protobuf_extensions/pbutil"
|
||||
pb "github.com/prometheus/alertmanager/nflog/nflogpb"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
@ -340,34 +339,21 @@ func (l *nlog) Log(r *pb.Receiver, gkey []byte, firingAlerts, resolvedAlerts []u
|
|||
|
||||
if prevle, ok := l.st[key]; ok {
|
||||
// Entry already exists, only overwrite if timestamp is newer.
|
||||
// This may with raciness or clock-drift across AM nodes.
|
||||
prevts, err := ptypes.Timestamp(prevle.Entry.Timestamp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if prevts.After(now) {
|
||||
// This may happen with raciness or clock-drift across AM nodes.
|
||||
if prevle.Entry.Timestamp.After(now) {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
ts, err := ptypes.TimestampProto(now)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
expts, err := ptypes.TimestampProto(now.Add(l.retention))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
e := &pb.MeshEntry{
|
||||
Entry: &pb.Entry{
|
||||
Receiver: r,
|
||||
GroupKey: gkey,
|
||||
Timestamp: ts,
|
||||
Timestamp: now,
|
||||
FiringAlerts: firingAlerts,
|
||||
ResolvedAlerts: resolvedAlerts,
|
||||
},
|
||||
ExpiresAt: expts,
|
||||
ExpiresAt: now.Add(l.retention),
|
||||
}
|
||||
l.gossip.GossipBroadcast(gossipData{
|
||||
key: e,
|
||||
|
@ -389,9 +375,10 @@ func (l *nlog) GC() (int, error) {
|
|||
defer l.mtx.Unlock()
|
||||
|
||||
for k, le := range l.st {
|
||||
if ets, err := ptypes.Timestamp(le.ExpiresAt); err != nil {
|
||||
return n, err
|
||||
} else if !ets.After(now) {
|
||||
if le.ExpiresAt.IsZero() {
|
||||
return n, errors.New("unexpected zero expiration timestamp")
|
||||
}
|
||||
if !le.ExpiresAt.After(now) {
|
||||
delete(l.st, k)
|
||||
n++
|
||||
}
|
||||
|
@ -589,17 +576,7 @@ func (gd gossipData) Merge(other mesh.GossipData) mesh.GossipData {
|
|||
gd[k] = e
|
||||
continue
|
||||
}
|
||||
pts, err := ptypes.Timestamp(prev.Entry.Timestamp)
|
||||
if err != nil {
|
||||
// TODO(fabxc): log error and skip entry. What can actually error here?
|
||||
panic(err)
|
||||
}
|
||||
ets, err := ptypes.Timestamp(e.Entry.Timestamp)
|
||||
if err != nil {
|
||||
// TODO(fabxc): see above.
|
||||
panic(err)
|
||||
}
|
||||
if pts.Before(ets) {
|
||||
if prev.Entry.Timestamp.Before(e.Entry.Timestamp) {
|
||||
gd[k] = e
|
||||
}
|
||||
}
|
||||
|
@ -617,17 +594,7 @@ func (gd gossipData) mergeDelta(od gossipData) gossipData {
|
|||
delta[k] = e
|
||||
continue
|
||||
}
|
||||
pts, err := ptypes.Timestamp(prev.Entry.Timestamp)
|
||||
if err != nil {
|
||||
// TODO(fabxc): log error and skip entry. What can actually error here?
|
||||
panic(err)
|
||||
}
|
||||
ets, err := ptypes.Timestamp(e.Entry.Timestamp)
|
||||
if err != nil {
|
||||
// TODO(fabxc): see above.
|
||||
panic(err)
|
||||
}
|
||||
if pts.Before(ets) {
|
||||
if prev.Entry.Timestamp.Before(e.Entry.Timestamp) {
|
||||
gd[k] = e
|
||||
delta[k] = e
|
||||
}
|
||||
|
|
|
@ -20,8 +20,6 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/golang/protobuf/ptypes"
|
||||
"github.com/golang/protobuf/ptypes/timestamp"
|
||||
pb "github.com/prometheus/alertmanager/nflog/nflogpb"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
@ -31,7 +29,7 @@ func TestNlogGC(t *testing.T) {
|
|||
// We only care about key names and expiration timestamps.
|
||||
newEntry := func(ts time.Time) *pb.MeshEntry {
|
||||
return &pb.MeshEntry{
|
||||
ExpiresAt: mustTimestampProto(ts),
|
||||
ExpiresAt: ts,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -69,27 +67,27 @@ func TestNlogSnapshot(t *testing.T) {
|
|||
Receiver: &pb.Receiver{GroupName: "abc", Integration: "test1", Idx: 1},
|
||||
GroupHash: []byte("126a8a51b9d1bbd07fddc65819a542c3"),
|
||||
Resolved: false,
|
||||
Timestamp: mustTimestampProto(now),
|
||||
Timestamp: now,
|
||||
},
|
||||
ExpiresAt: mustTimestampProto(now),
|
||||
ExpiresAt: now,
|
||||
}, {
|
||||
Entry: &pb.Entry{
|
||||
GroupKey: []byte("d8e8fca2dc0f8abce7cb4cb0031ba249"),
|
||||
Receiver: &pb.Receiver{GroupName: "def", Integration: "test2", Idx: 29},
|
||||
GroupHash: []byte("122c2331b9d1bbd07fddc65819a542c3"),
|
||||
Resolved: true,
|
||||
Timestamp: mustTimestampProto(now),
|
||||
Timestamp: now,
|
||||
},
|
||||
ExpiresAt: mustTimestampProto(now),
|
||||
ExpiresAt: now,
|
||||
}, {
|
||||
Entry: &pb.Entry{
|
||||
GroupKey: []byte("aaaaaca2dc0f896fd7cb4cb0031ba249"),
|
||||
Receiver: &pb.Receiver{GroupName: "ghi", Integration: "test3", Idx: 0},
|
||||
GroupHash: []byte("126a8a51b9d1bbd07fddc6e3e3e542c3"),
|
||||
Resolved: false,
|
||||
Timestamp: mustTimestampProto(now),
|
||||
Timestamp: now,
|
||||
},
|
||||
ExpiresAt: mustTimestampProto(now),
|
||||
ExpiresAt: now,
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -160,7 +158,7 @@ func TestGossipDataMerge(t *testing.T) {
|
|||
// merging logic.
|
||||
newEntry := func(ts time.Time) *pb.MeshEntry {
|
||||
return &pb.MeshEntry{
|
||||
Entry: &pb.Entry{Timestamp: mustTimestampProto(ts)},
|
||||
Entry: &pb.Entry{Timestamp: ts},
|
||||
}
|
||||
}
|
||||
cases := []struct {
|
||||
|
@ -225,27 +223,27 @@ func TestGossipDataCoding(t *testing.T) {
|
|||
Receiver: &pb.Receiver{GroupName: "abc", Integration: "test1", Idx: 1},
|
||||
GroupHash: []byte("126a8a51b9d1bbd07fddc65819a542c3"),
|
||||
Resolved: false,
|
||||
Timestamp: mustTimestampProto(now),
|
||||
Timestamp: now,
|
||||
},
|
||||
ExpiresAt: mustTimestampProto(now),
|
||||
ExpiresAt: now,
|
||||
}, {
|
||||
Entry: &pb.Entry{
|
||||
GroupKey: []byte("d8e8fca2dc0f8abce7cb4cb0031ba249"),
|
||||
Receiver: &pb.Receiver{GroupName: "def", Integration: "test2", Idx: 29},
|
||||
GroupHash: []byte("122c2331b9d1bbd07fddc65819a542c3"),
|
||||
Resolved: true,
|
||||
Timestamp: mustTimestampProto(now),
|
||||
Timestamp: now,
|
||||
},
|
||||
ExpiresAt: mustTimestampProto(now),
|
||||
ExpiresAt: now,
|
||||
}, {
|
||||
Entry: &pb.Entry{
|
||||
GroupKey: []byte("aaaaaca2dc0f896fd7cb4cb0031ba249"),
|
||||
Receiver: &pb.Receiver{GroupName: "ghi", Integration: "test3", Idx: 0},
|
||||
GroupHash: []byte("126a8a51b9d1bbd07fddc6e3e3e542c3"),
|
||||
Resolved: false,
|
||||
Timestamp: mustTimestampProto(now),
|
||||
Timestamp: now,
|
||||
},
|
||||
ExpiresAt: mustTimestampProto(now),
|
||||
ExpiresAt: now,
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -266,19 +264,3 @@ func TestGossipDataCoding(t *testing.T) {
|
|||
require.Equal(t, in, out, "decoded data doesn't match encoded data")
|
||||
}
|
||||
}
|
||||
|
||||
func mustTimestamp(ts *timestamp.Timestamp) time.Time {
|
||||
res, err := ptypes.Timestamp(ts)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
func mustTimestampProto(ts time.Time) *timestamp.Timestamp {
|
||||
res, err := ptypes.TimestampProto(ts)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -3,6 +3,12 @@ syntax = "proto3";
|
|||
package nflogpb;
|
||||
|
||||
import "google/protobuf/timestamp.proto";
|
||||
import "gogoproto/gogo.proto";
|
||||
|
||||
option (gogoproto.marshaler_all) = true;
|
||||
option (gogoproto.sizer_all) = true;
|
||||
option (gogoproto.unmarshaler_all) = true;
|
||||
option (gogoproto.goproto_getters_all) = false;
|
||||
|
||||
message Receiver {
|
||||
// Configured name of the receiver group.
|
||||
|
@ -28,7 +34,7 @@ message Entry {
|
|||
// Deprecated in favor of ResolvedAlerts field, but kept for compatibility.
|
||||
bool resolved = 4;
|
||||
// Timestamp of the succeeding notification.
|
||||
google.protobuf.Timestamp timestamp = 5;
|
||||
google.protobuf.Timestamp timestamp = 5 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
|
||||
// FiringAlerts list of hashes of firing alerts at the last notification time.
|
||||
repeated uint64 firing_alerts = 6;
|
||||
// ResolvedAlerts list of hashes of resolved alerts at the last notification time.
|
||||
|
@ -42,5 +48,5 @@ message MeshEntry {
|
|||
Entry entry = 1;
|
||||
// A timestamp indicating when the mesh peer should evict
|
||||
// the log entry from its state.
|
||||
google.protobuf.Timestamp expires_at = 2;
|
||||
google.protobuf.Timestamp expires_at = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
|
||||
}
|
||||
|
|
|
@ -22,7 +22,6 @@ import (
|
|||
|
||||
"github.com/cenkalti/backoff"
|
||||
"github.com/cespare/xxhash"
|
||||
"github.com/golang/protobuf/ptypes"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/common/log"
|
||||
"github.com/prometheus/common/model"
|
||||
|
@ -481,11 +480,7 @@ func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint
|
|||
}
|
||||
|
||||
// Nothing changed, only notify if the repeat interval has passed.
|
||||
ts, err := ptypes.Timestamp(entry.Timestamp)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return ts.Before(n.now().Add(-repeat)), nil
|
||||
return entry.Timestamp.Before(n.now().Add(-repeat)), nil
|
||||
}
|
||||
|
||||
// Exec implements the Stage interface.
|
||||
|
|
|
@ -114,14 +114,14 @@ func TestDedupStageNeedsUpdate(t *testing.T) {
|
|||
}, {
|
||||
entry: &nflogpb.Entry{
|
||||
FiringAlerts: []uint64{1, 2, 3},
|
||||
Timestamp: nil, // parsing will error
|
||||
Timestamp: time.Time{}, // zero timestamp should always update
|
||||
},
|
||||
firingAlerts: alertHashSet(1, 2, 3),
|
||||
resErr: true,
|
||||
res: true,
|
||||
}, {
|
||||
entry: &nflogpb.Entry{
|
||||
FiringAlerts: []uint64{1, 2, 3},
|
||||
Timestamp: mustTimestampProto(now.Add(-9 * time.Minute)),
|
||||
Timestamp: now.Add(-9 * time.Minute),
|
||||
},
|
||||
repeat: 10 * time.Minute,
|
||||
firingAlerts: alertHashSet(1, 2, 3),
|
||||
|
@ -129,7 +129,7 @@ func TestDedupStageNeedsUpdate(t *testing.T) {
|
|||
}, {
|
||||
entry: &nflogpb.Entry{
|
||||
FiringAlerts: []uint64{1, 2, 3},
|
||||
Timestamp: mustTimestampProto(now.Add(-11 * time.Minute)),
|
||||
Timestamp: now.Add(-11 * time.Minute),
|
||||
},
|
||||
repeat: 10 * time.Minute,
|
||||
firingAlerts: alertHashSet(1, 2, 3),
|
||||
|
@ -212,7 +212,7 @@ func TestDedupStage(t *testing.T) {
|
|||
qres: []*nflogpb.Entry{
|
||||
{
|
||||
FiringAlerts: []uint64{0, 1, 2},
|
||||
Timestamp: mustTimestampProto(now),
|
||||
Timestamp: now,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@ -227,7 +227,7 @@ func TestDedupStage(t *testing.T) {
|
|||
qres: []*nflogpb.Entry{
|
||||
{
|
||||
FiringAlerts: []uint64{1, 2, 3, 4},
|
||||
Timestamp: mustTimestampProto(now),
|
||||
Timestamp: now,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
|
||||
#!/usr/bin/env bash
|
||||
#
|
||||
# Generate all etcd protobuf bindings.
|
||||
# Run from repository root.
|
||||
set -e
|
||||
set -u
|
||||
|
||||
if ! [[ "$0" =~ "scripts/genproto.gogo.sh" ]]; then
|
||||
echo "must be run from repository root"
|
||||
exit 255
|
||||
fi
|
||||
|
||||
if ! [[ $(protoc --version) =~ "3.2.0" ]]; then
|
||||
echo "could not find protoc 3.2.0, is it installed + in PATH?"
|
||||
exit 255
|
||||
fi
|
||||
|
||||
GOGOPROTO_ROOT="${GOPATH}/src/github.com/gogo/protobuf"
|
||||
GOGOPROTO_PATH="${GOGOPROTO_ROOT}:${GOGOPROTO_ROOT}/protobuf"
|
||||
GRPC_GATEWAY_ROOT="${GOPATH}/src/github.com/grpc-ecosystem/grpc-gateway"
|
||||
|
||||
DIRS="nflog/nflogpb"
|
||||
|
||||
for dir in ${DIRS}; do
|
||||
pushd ${dir}
|
||||
protoc --gogofast_out=plugins=grpc:. -I=. \
|
||||
-I="${GOGOPROTO_PATH}" \
|
||||
-I="${GRPC_GATEWAY_ROOT}/third_party/googleapis" \
|
||||
*.proto
|
||||
|
||||
sed -i.bak -E 's/import _ \"gogoproto\"//g' *.pb.go
|
||||
sed -i.bak -E 's/import _ \"google\/protobuf\"//g' *.pb.go
|
||||
rm -f *.bak
|
||||
goimports -w *.pb.go
|
||||
popd
|
||||
done
|
|
@ -1,2 +1 @@
|
|||
protoc --go_out=. nflog/nflogpb/nflog.proto
|
||||
protoc --go_out=. silence/silencepb/silence.proto
|
||||
|
|
Loading…
Reference in New Issue