nflog: add mesh gossip support

This commit is contained in:
Fabian Reinartz 2016-08-12 14:28:07 +02:00
parent 3d8e60ded7
commit 80afd502d5
2 changed files with 189 additions and 28 deletions

View File

@ -1,6 +1,7 @@
package nflog
import (
"bytes"
"errors"
"fmt"
"io"
@ -30,8 +31,8 @@ type Log interface {
// - extend the interface by a `QueryOne` method?
// - return an iterator rather than a materialized list?
Query(p ...QueryParam) ([]*pb.Entry, error)
// Delete log entries along the given Parameters. Returns
// the number of deleted entries.
// Delete log st along the given Parameters. Returns
// the number of deleted st.
Delete(p ...DeleteParam) (int, error)
// Snapshot the current log state and return the number
@ -73,8 +74,8 @@ func QGroupKey(gk []byte) QueryParam {
// delQuery holds parameters for a deletion query.
// TODO(fabxc): can this be consolidated with regular QueryParams?
type delQuery struct {
// Delete log entries that are expired. Does NOT delete
// unexpired entries if set to false.
// Delete log st that are expired. Does NOT delete
// unexpired st if set to false.
expired bool
}
@ -82,7 +83,7 @@ type delQuery struct {
// Returns an error for invalid of conflicting parameters.
type DeleteParam func(*delQuery) error
// DExpired adds a parameter to delete expired log entries.
// DExpired adds a parameter to delete expired log st.
func DExpired() DeleteParam {
return func(d *delQuery) error {
d.expired = true
@ -94,10 +95,15 @@ type nlog struct {
retention time.Duration
now func() time.Time
mtx sync.RWMutex
gossip mesh.Gossip // A gossip channel for sharing log state.
// For now we only store the most recently added log entry.
// The key is a serialized concatenation of group key and receiver.
entries map[string]*pb.MeshEntry
// Currently our memory state is equivalent to the mesh.GossipData
// representation. This may change in the future as we support history
// and indexing.
mtx sync.RWMutex
st gossipData
}
// Option configures a new Log implementation.
@ -105,13 +111,14 @@ type Option func(*nlog) error
// WithMesh registers the log with a mesh network with which
// the log state will be shared.
func WithMesh(mr *mesh.Router) Option {
func WithMesh(create func(g mesh.Gossiper) mesh.Gossip) Option {
return func(l *nlog) error {
panic("not implemented")
l.gossip = create(l)
return nil
}
}
// WithRetention sets the retention time for log entries.
// WithRetention sets the retention time for log st.
func WithRetention(d time.Duration) Option {
return func(l *nlog) error {
l.retention = d
@ -133,8 +140,8 @@ func WithNow(f func() time.Time) Option {
// The snapshot is loaded into the Log if it is set.
func New(snapshot io.Reader, opts ...Option) (Log, error) {
l := &nlog{
now: time.Now,
entries: map[string]*pb.MeshEntry{},
now: time.Now,
st: map[string]*pb.MeshEntry{},
}
for _, o := range opts {
if err := o(l); err != nil {
@ -159,15 +166,21 @@ func (l *nlog) LogResolved(r *pb.Receiver, key, hash []byte) error {
return l.log(r, key, hash, true)
}
// stateKey returns a string key for a log entry consisting of entrie's group key
// and receiver.
func stateKey(k []byte, r *pb.Receiver) string {
return fmt.Sprintf("%s:%s", k, r)
}
func (l *nlog) log(r *pb.Receiver, gkey, ghash []byte, resolved bool) error {
// Write all entries with the same timestamp.
// Write all st with the same timestamp.
now := l.now()
key := fmt.Sprintf("%s:%s", r, gkey)
key := stateKey(gkey, r)
l.mtx.Lock()
defer l.mtx.Unlock()
if prevle, ok := l.entries[key]; ok {
if prevle, ok := l.st[key]; ok {
// Entry already exists, only overwrite if timestamp is newer.
// This may with raciness or clock-drift across AM nodes.
prevts, err := ptypes.Timestamp(prevle.Entry.Timestamp)
@ -188,7 +201,7 @@ func (l *nlog) log(r *pb.Receiver, gkey, ghash []byte, resolved bool) error {
return err
}
l.entries[key] = &pb.MeshEntry{
l.st[key] = &pb.MeshEntry{
Entry: &pb.Entry{
Receiver: r,
GroupKey: gkey,
@ -203,9 +216,6 @@ func (l *nlog) log(r *pb.Receiver, gkey, ghash []byte, resolved bool) error {
// Delete implements the Log interface.
func (l *nlog) Delete(params ...DeleteParam) (int, error) {
l.mtx.Lock()
defer l.mtx.Unlock()
var del delQuery
for _, p := range params {
if err := p(&del); err != nil {
@ -218,11 +228,14 @@ func (l *nlog) Delete(params ...DeleteParam) (int, error) {
now := l.now()
var n int
for k, le := range l.entries {
l.mtx.Lock()
defer l.mtx.Unlock()
for k, le := range l.st {
if ets, err := ptypes.Timestamp(le.ExpiresAt); err != nil {
return n, err
} else if ets.Before(now) {
delete(l.entries, k)
delete(l.st, k)
n++
}
}
@ -249,14 +262,13 @@ func (l *nlog) Query(params ...QueryParam) ([]*pb.Entry, error) {
l.mtx.RLock()
defer l.mtx.RUnlock()
key := fmt.Sprintf("%s,%s", q.recv, q.groupKey)
if le, ok := l.entries[key]; ok {
if le, ok := l.st[stateKey(q.groupKey, q.recv)]; ok {
return []*pb.Entry{le.Entry}, nil
}
return nil, ErrNotFound
}
// loadSnapshot loads a snapshot generated by Snapshot() into the state.
func (l *nlog) loadSnapshot(r io.Reader) error {
l.mtx.Lock()
defer l.mtx.Unlock()
@ -269,8 +281,7 @@ func (l *nlog) loadSnapshot(r io.Reader) error {
}
return err
}
key := fmt.Sprintf("%s,%s", r, e.Entry.GroupKey)
l.entries[key] = &e
l.st[stateKey(e.Entry.GroupKey, e.Entry.Receiver)] = &e
}
return nil
@ -282,7 +293,7 @@ func (l *nlog) Snapshot(w io.Writer) (int, error) {
defer l.mtx.RUnlock()
var n int
for _, e := range l.entries {
for _, e := range l.st {
m, err := pbutil.WriteDelimited(w, e)
if err != nil {
return n + m, err
@ -291,3 +302,154 @@ func (l *nlog) Snapshot(w io.Writer) (int, error) {
}
return n, nil
}
// Gossip implements the mesh.Gossiper interface.
func (l *nlog) Gossip() mesh.GossipData {
l.mtx.RLock()
defer l.mtx.RUnlock()
gd := make(gossipData, len(l.st))
for k, v := range l.st {
gd[k] = v
}
return gd
}
// OnGossip implements the mesh.Gossiper interface.
func (l *nlog) OnGossip(msg []byte) (mesh.GossipData, error) {
gd, err := decodeGossipData(msg)
if err != nil {
return nil, err
}
l.mtx.Lock()
defer l.mtx.Unlock()
if delta := l.st.mergeDelta(gd); len(delta) > 0 {
return delta, nil
}
return nil, nil
}
// OnGossipBroadcast implements the mesh.Gossiper interface.
func (l *nlog) OnGossipBroadcast(src mesh.PeerName, msg []byte) (mesh.GossipData, error) {
gd, err := decodeGossipData(msg)
if err != nil {
return nil, err
}
l.mtx.Lock()
defer l.mtx.Unlock()
return l.st.mergeDelta(gd), nil
}
// OnGossipUnicast implements the mesh.Gossiper interface.
func (l *nlog) OnGossipUnicast(src mesh.PeerName, msg []byte) error {
panic("not implemented")
}
// gossipData is a representation of the current log state that
// implements the mesh.GossipData interface.
type gossipData map[string]*pb.MeshEntry
func decodeGossipData(msg []byte) (gossipData, error) {
gd := gossipData{}
rd := bytes.NewReader(msg)
for {
var e pb.MeshEntry
if _, err := pbutil.ReadDelimited(rd, &e); err != nil {
if err == io.EOF {
break
}
return gd, err
}
gd[stateKey(e.Entry.GroupKey, e.Entry.Receiver)] = &e
}
return gd, nil
}
// Encode implements the mesh.GossipData interface.
func (gd gossipData) Encode() [][]byte {
// Split into sub-messages of ~1MB.
const maxSize = 1024 * 1024
var (
buf bytes.Buffer
res [][]byte
n int
)
for _, e := range gd {
m, err := pbutil.WriteDelimited(&buf, e)
n += m
if err != nil {
// TODO(fabxc): log error and skip entry.
panic(err)
}
if n > maxSize {
res = append(res, buf.Bytes())
buf = bytes.Buffer{}
}
}
if buf.Len() > 0 {
res = append(res, buf.Bytes())
}
return res
}
// Merge the notification set with gossip data and return a new notification
// state.
// TODO(fabxc): can we just return the receiver. Does it have to remain
// unmodified. Needs to be clarified upstream.
func (gd gossipData) Merge(other mesh.GossipData) mesh.GossipData {
for k, e := range other.(gossipData) {
prev, ok := gd[k]
if !ok {
gd[k] = e
continue
}
pts, err := ptypes.Timestamp(prev.Entry.Timestamp)
if err != nil {
// TODO(fabxc): log error and skip entry.
panic(err)
}
ets, err := ptypes.Timestamp(e.Entry.Timestamp)
if err != nil {
// TODO(fabxc): see above.
panic(err)
}
if pts.Before(ets) {
gd[k] = e
}
}
return gd
}
// mergeDelta behaves like Merge but returns a gossipData only containing
// things that have changed.
func (gd gossipData) mergeDelta(od gossipData) gossipData {
delta := gossipData{}
for k, e := range od {
prev, ok := gd[k]
if !ok {
gd[k] = e
delta[k] = e
continue
}
pts, err := ptypes.Timestamp(prev.Entry.Timestamp)
if err != nil {
// TODO(fabxc): log error and skip entry.
panic(err)
}
ets, err := ptypes.Timestamp(e.Entry.Timestamp)
if err != nil {
// TODO(fabxc): see above.
panic(err)
}
if pts.Before(ets) {
gd[k] = e
delta[k] = e
}
}
return delta
}

View File

@ -38,4 +38,3 @@ message MeshEntry {
// the log entry from its state.
google.protobuf.Timestamp expires_at = 2;
}