diff --git a/cluster/cluster.go b/cluster/cluster.go index b1ca2b77..f6cd4926 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -190,9 +190,11 @@ func (p *Peer) Peers() []*memberlist.Node { // State is a piece of state that can be serialized and merged with other // serialized state. type State interface { + // MarshalBinary serializes the underlying state. MarshalBinary() ([]byte, error) + + // Merge merges serialized state into the underlying state. Merge(b []byte) error - MergeSingle(b []byte) error } // Channel allows clients to send messages for a specific state type that will be @@ -273,7 +275,7 @@ func (d *delegate) NotifyMsg(b []byte) { if !ok { return } - if err := s.MergeSingle(p.Data); err != nil { + if err := s.Merge(p.Data); err != nil { level.Warn(d.logger).Log("msg", "merge broadcast", "err", err) return } diff --git a/nflog/nflog.go b/nflog/nflog.go index 4ddb0fc6..494189bf 100644 --- a/nflog/nflog.go +++ b/nflog/nflog.go @@ -489,6 +489,7 @@ func (l *Log) MarshalBinary() ([]byte, error) { return l.st.MarshalBinary() } +// Merge serialized silence state into own state. func (l *Log) Merge(b []byte) error { st, err := decodeState(bytes.NewReader(b)) if err != nil { @@ -503,17 +504,8 @@ func (l *Log) Merge(b []byte) error { return nil } -func (l *Log) MergeSingle(b []byte) error { - var e pb.MeshEntry - if err := proto.Unmarshal(b, &e); err != nil { - return err - } - l.mtx.Lock() - l.st.merge(&e) - l.mtx.Unlock() - return nil -} - +// SetBroadcast sets a broadcast callback that will be invoked with serialized state +// on updates. func (l *Log) SetBroadcast(f func([]byte)) { l.mtx.Lock() l.broadcast = f