Add support for neighbor subscription

This commit is contained in:
Yuya Kusakabe 2018-09-21 12:03:18 +09:00 committed by Alessandro Boch
parent 531df7a209
commit 6d53654d01
3 changed files with 341 additions and 0 deletions

View File

@ -23,3 +23,9 @@ type Neigh struct {
func (neigh *Neigh) String() string {
return fmt.Sprintf("%s %s", neigh.IP, neigh.HardwareAddr)
}
// NeighUpdate is sent when a neighbor changes - type is RTM_NEWNEIGH or RTM_DELNEIGH.
type NeighUpdate struct {
Type uint16
Neigh
}

View File

@ -2,9 +2,11 @@ package netlink
import (
"net"
"syscall"
"unsafe"
"github.com/vishvananda/netlink/nl"
"github.com/vishvananda/netns"
"golang.org/x/sys/unix"
)
@ -287,3 +289,94 @@ func NeighDeserialize(m []byte) (*Neigh, error) {
return &neigh, nil
}
// NeighSubscribe takes a chan down which notifications will be sent
// when neighbors are added or deleted. Close the 'done' chan to stop subscription.
func NeighSubscribe(ch chan<- NeighUpdate, done <-chan struct{}) error {
return neighSubscribeAt(netns.None(), netns.None(), ch, done, nil, false)
}
// NeighSubscribeAt works like NeighSubscribe plus it allows the caller
// to choose the network namespace in which to subscribe (ns).
func NeighSubscribeAt(ns netns.NsHandle, ch chan<- NeighUpdate, done <-chan struct{}) error {
return neighSubscribeAt(ns, netns.None(), ch, done, nil, false)
}
// NeighSubscribeOptions contains a set of options to use with
// NeighSubscribeWithOptions.
type NeighSubscribeOptions struct {
Namespace *netns.NsHandle
ErrorCallback func(error)
ListExisting bool
}
// NeighSubscribeWithOptions work like NeighSubscribe but enable to
// provide additional options to modify the behavior. Currently, the
// namespace can be provided as well as an error callback.
func NeighSubscribeWithOptions(ch chan<- NeighUpdate, done <-chan struct{}, options NeighSubscribeOptions) error {
if options.Namespace == nil {
none := netns.None()
options.Namespace = &none
}
return neighSubscribeAt(*options.Namespace, netns.None(), ch, done, options.ErrorCallback, options.ListExisting)
}
func neighSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- NeighUpdate, done <-chan struct{}, cberr func(error), listExisting bool) error {
s, err := nl.SubscribeAt(newNs, curNs, unix.NETLINK_ROUTE, unix.RTNLGRP_NEIGH)
if err != nil {
return err
}
if done != nil {
go func() {
<-done
s.Close()
}()
}
if listExisting {
req := pkgHandle.newNetlinkRequest(unix.RTM_GETNEIGH,
unix.NLM_F_DUMP)
infmsg := nl.NewIfInfomsg(unix.AF_UNSPEC)
req.AddData(infmsg)
if err := s.Send(req); err != nil {
return err
}
}
go func() {
defer close(ch)
for {
msgs, err := s.Receive()
if err != nil {
if cberr != nil {
cberr(err)
}
return
}
for _, m := range msgs {
if m.Header.Type == unix.NLMSG_DONE {
continue
}
if m.Header.Type == unix.NLMSG_ERROR {
native := nl.NativeEndian()
error := int32(native.Uint32(m.Data[0:4]))
if error == 0 {
continue
}
if cberr != nil {
cberr(syscall.Errno(-error))
}
return
}
neigh, err := NeighDeserialize(m.Data)
if err != nil {
if cberr != nil {
cberr(err)
}
return
}
ch <- NeighUpdate{Type: m.Header.Type, Neigh: *neigh}
}
}
}()
return nil
}

View File

@ -3,8 +3,13 @@
package netlink
import (
"fmt"
"net"
"testing"
"time"
"github.com/vishvananda/netns"
"golang.org/x/sys/unix"
)
type arpEntry struct {
@ -251,3 +256,240 @@ func TestNeighAddDelProxy(t *testing.T) {
t.Fatal(err)
}
}
func expectNeighUpdate(ch <-chan NeighUpdate, t uint16, state int, ip net.IP) error {
for {
timeout := time.After(time.Second)
select {
case update := <-ch:
if update.Type != t {
return fmt.Errorf("want %d got %d", t, update.Type)
}
if update.Neigh.State != state {
return fmt.Errorf("want %d got %d", state, update.State)
}
if update.Neigh.IP == nil {
return fmt.Errorf("Neigh.IP is nil")
}
if !update.Neigh.IP.Equal(ip) {
return fmt.Errorf("Neigh.IP: want %s got %s", ip, update.Neigh.IP)
}
return nil
case <-timeout:
return fmt.Errorf("timeout")
}
}
}
func TestNeighSubscribe(t *testing.T) {
tearDown := setUpNetlinkTest(t)
defer tearDown()
dummy := &Dummy{LinkAttrs{Name: "neigh0"}}
if err := LinkAdd(dummy); err != nil {
t.Errorf("Failed to create link: %v", err)
}
ensureIndex(dummy.Attrs())
defer func() {
if err := LinkDel(dummy); err != nil {
t.Fatal(err)
}
}()
ch := make(chan NeighUpdate)
done := make(chan struct{})
defer close(done)
if err := NeighSubscribe(ch, done); err != nil {
t.Fatal(err)
}
entry := &Neigh{
LinkIndex: dummy.Index,
State: NUD_REACHABLE,
IP: net.IPv4(10, 99, 0, 1),
HardwareAddr: parseMAC("aa:bb:cc:dd:00:01"),
}
if err := NeighAdd(entry); err != nil {
t.Errorf("Failed to NeighAdd: %v", err)
}
if err := expectNeighUpdate(ch, unix.RTM_NEWNEIGH, NUD_REACHABLE, entry.IP); err != nil {
t.Fatalf("Add update not received as expected: %s", err)
}
if err := NeighDel(entry); err != nil {
t.Fatal(err)
}
if err := expectNeighUpdate(ch, unix.RTM_NEWNEIGH, NUD_FAILED, entry.IP); err != nil {
t.Fatalf("Del update not received as expected: %s", err)
}
}
func TestNeighSubscribeWithOptions(t *testing.T) {
tearDown := setUpNetlinkTest(t)
defer tearDown()
ch := make(chan NeighUpdate)
done := make(chan struct{})
defer close(done)
var lastError error
defer func() {
if lastError != nil {
t.Fatalf("Fatal error received during subscription: %v", lastError)
}
}()
if err := NeighSubscribeWithOptions(ch, done, NeighSubscribeOptions{
ErrorCallback: func(err error) {
lastError = err
},
}); err != nil {
t.Fatal(err)
}
dummy := &Dummy{LinkAttrs{Name: "neigh0"}}
if err := LinkAdd(dummy); err != nil {
t.Errorf("Failed to create link: %v", err)
}
ensureIndex(dummy.Attrs())
defer func() {
if err := LinkDel(dummy); err != nil {
t.Fatal(err)
}
}()
entry := &Neigh{
LinkIndex: dummy.Index,
State: NUD_REACHABLE,
IP: net.IPv4(10, 99, 0, 1),
HardwareAddr: parseMAC("aa:bb:cc:dd:00:01"),
}
err := NeighAdd(entry)
if err != nil {
t.Errorf("Failed to NeighAdd: %v", err)
}
if err := expectNeighUpdate(ch, unix.RTM_NEWNEIGH, NUD_REACHABLE, entry.IP); err != nil {
t.Fatalf("Add update not received as expected: %s", err)
}
}
func TestNeighSubscribeAt(t *testing.T) {
skipUnlessRoot(t)
// Create an handle on a custom netns
newNs, err := netns.New()
if err != nil {
t.Fatal(err)
}
defer newNs.Close()
nh, err := NewHandleAt(newNs)
if err != nil {
t.Fatal(err)
}
defer nh.Delete()
// Subscribe for Neigh events on the custom netns
ch := make(chan NeighUpdate)
done := make(chan struct{})
defer close(done)
if err := NeighSubscribeAt(newNs, ch, done); err != nil {
t.Fatal(err)
}
dummy := &Dummy{LinkAttrs{Name: "neigh0"}}
if err := nh.LinkAdd(dummy); err != nil {
t.Errorf("Failed to create link: %v", err)
}
ensureIndex(dummy.Attrs())
defer func() {
if err := nh.LinkDel(dummy); err != nil {
t.Fatal(err)
}
}()
entry := &Neigh{
LinkIndex: dummy.Index,
State: NUD_REACHABLE,
IP: net.IPv4(198, 51, 100, 1),
HardwareAddr: parseMAC("aa:bb:cc:dd:00:01"),
}
err = nh.NeighAdd(entry)
if err != nil {
t.Errorf("Failed to NeighAdd: %v", err)
}
if err := expectNeighUpdate(ch, unix.RTM_NEWNEIGH, NUD_REACHABLE, entry.IP); err != nil {
t.Fatalf("Add update not received as expected: %s", err)
}
}
func TestNeighSubscribeListExisting(t *testing.T) {
skipUnlessRoot(t)
// Create an handle on a custom netns
newNs, err := netns.New()
if err != nil {
t.Fatal(err)
}
defer newNs.Close()
nh, err := NewHandleAt(newNs)
if err != nil {
t.Fatal(err)
}
defer nh.Delete()
dummy := &Dummy{LinkAttrs{Name: "neigh0"}}
if err := nh.LinkAdd(dummy); err != nil {
t.Errorf("Failed to create link: %v", err)
}
ensureIndex(dummy.Attrs())
defer func() {
if err := nh.LinkDel(dummy); err != nil {
t.Fatal(err)
}
}()
entry1 := &Neigh{
LinkIndex: dummy.Index,
State: NUD_REACHABLE,
IP: net.IPv4(198, 51, 100, 1),
HardwareAddr: parseMAC("aa:bb:cc:dd:00:01"),
}
err = nh.NeighAdd(entry1)
if err != nil {
t.Errorf("Failed to NeighAdd: %v", err)
}
// Subscribe for Neigh events including existing neighbors
ch := make(chan NeighUpdate)
done := make(chan struct{})
defer close(done)
if err := NeighSubscribeWithOptions(ch, done, NeighSubscribeOptions{
Namespace: &newNs,
ListExisting: true},
); err != nil {
t.Fatal(err)
}
if err := expectNeighUpdate(ch, unix.RTM_NEWNEIGH, NUD_REACHABLE, entry1.IP); err != nil {
t.Fatalf("Existing add update not received as expected: %s", err)
}
entry2 := &Neigh{
LinkIndex: dummy.Index,
State: NUD_PERMANENT,
IP: net.IPv4(198, 51, 100, 2),
HardwareAddr: parseMAC("aa:bb:cc:dd:00:02"),
}
err = nh.NeighAdd(entry2)
if err != nil {
t.Errorf("Failed to NeighAdd: %v", err)
}
if err := expectNeighUpdate(ch, unix.RTM_NEWNEIGH, NUD_PERMANENT, entry2.IP); err != nil {
t.Fatalf("Existing add update not received as expected: %s", err)
}
}