Add a 'ListExisting' option to get the existing entries in the

route/addr/link tables as part of RouteSubscribeWithOptions,
AddrSubscribeWithOptions, and LinkSubscribeWithOptions.
This commit is contained in:
eriknordmark 2018-01-19 13:40:24 -08:00 committed by Alessandro Boch
parent 5a988e882d
commit 5f5d5cddcf
6 changed files with 263 additions and 13 deletions

View File

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"net" "net"
"strings" "strings"
"syscall"
"github.com/vishvananda/netlink/nl" "github.com/vishvananda/netlink/nl"
"github.com/vishvananda/netns" "github.com/vishvananda/netns"
@ -249,13 +250,13 @@ type AddrUpdate struct {
// AddrSubscribe takes a chan down which notifications will be sent // AddrSubscribe takes a chan down which notifications will be sent
// when addresses change. Close the 'done' chan to stop subscription. // when addresses change. Close the 'done' chan to stop subscription.
func AddrSubscribe(ch chan<- AddrUpdate, done <-chan struct{}) error { func AddrSubscribe(ch chan<- AddrUpdate, done <-chan struct{}) error {
return addrSubscribeAt(netns.None(), netns.None(), ch, done, nil) return addrSubscribeAt(netns.None(), netns.None(), ch, done, nil, false)
} }
// AddrSubscribeAt works like AddrSubscribe plus it allows the caller // AddrSubscribeAt works like AddrSubscribe plus it allows the caller
// to choose the network namespace in which to subscribe (ns). // to choose the network namespace in which to subscribe (ns).
func AddrSubscribeAt(ns netns.NsHandle, ch chan<- AddrUpdate, done <-chan struct{}) error { func AddrSubscribeAt(ns netns.NsHandle, ch chan<- AddrUpdate, done <-chan struct{}) error {
return addrSubscribeAt(ns, netns.None(), ch, done, nil) return addrSubscribeAt(ns, netns.None(), ch, done, nil, false)
} }
// AddrSubscribeOptions contains a set of options to use with // AddrSubscribeOptions contains a set of options to use with
@ -263,6 +264,7 @@ func AddrSubscribeAt(ns netns.NsHandle, ch chan<- AddrUpdate, done <-chan struct
type AddrSubscribeOptions struct { type AddrSubscribeOptions struct {
Namespace *netns.NsHandle Namespace *netns.NsHandle
ErrorCallback func(error) ErrorCallback func(error)
ListExisting bool
} }
// AddrSubscribeWithOptions work like AddrSubscribe but enable to // AddrSubscribeWithOptions work like AddrSubscribe but enable to
@ -273,10 +275,10 @@ func AddrSubscribeWithOptions(ch chan<- AddrUpdate, done <-chan struct{}, option
none := netns.None() none := netns.None()
options.Namespace = &none options.Namespace = &none
} }
return addrSubscribeAt(*options.Namespace, netns.None(), ch, done, options.ErrorCallback) return addrSubscribeAt(*options.Namespace, netns.None(), ch, done, options.ErrorCallback, options.ListExisting)
} }
func addrSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- AddrUpdate, done <-chan struct{}, cberr func(error)) error { func addrSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- AddrUpdate, done <-chan struct{}, cberr func(error), listExisting bool) error {
s, err := nl.SubscribeAt(newNs, curNs, unix.NETLINK_ROUTE, unix.RTNLGRP_IPV4_IFADDR, unix.RTNLGRP_IPV6_IFADDR) s, err := nl.SubscribeAt(newNs, curNs, unix.NETLINK_ROUTE, unix.RTNLGRP_IPV4_IFADDR, unix.RTNLGRP_IPV6_IFADDR)
if err != nil { if err != nil {
return err return err
@ -287,6 +289,15 @@ func addrSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- AddrUpdate, done <-c
s.Close() s.Close()
}() }()
} }
if listExisting {
req := pkgHandle.newNetlinkRequest(unix.RTM_GETADDR,
unix.NLM_F_DUMP)
infmsg := nl.NewIfInfomsg(unix.AF_UNSPEC)
req.AddData(infmsg)
if err := s.Send(req); err != nil {
return err
}
}
go func() { go func() {
defer close(ch) defer close(ch)
for { for {
@ -298,6 +309,20 @@ func addrSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- AddrUpdate, done <-c
return return
} }
for _, m := range msgs { for _, m := range msgs {
if m.Header.Type == unix.NLMSG_DONE {
continue
}
if m.Header.Type == unix.NLMSG_ERROR {
native := nl.NativeEndian()
error := int32(native.Uint32(m.Data[0:4]))
if error == 0 {
continue
}
if cberr != nil {
cberr(syscall.Errno(-error))
}
return
}
msgType := m.Header.Type msgType := m.Header.Type
if msgType != unix.RTM_NEWADDR && msgType != unix.RTM_DELADDR { if msgType != unix.RTM_NEWADDR && msgType != unix.RTM_DELADDR {
if cberr != nil { if cberr != nil {

View File

@ -245,3 +245,43 @@ func TestAddrSubscribeWithOptions(t *testing.T) {
t.Fatal("Add update not received as expected") t.Fatal("Add update not received as expected")
} }
} }
func TestAddrSubscribeListExisting(t *testing.T) {
tearDown := setUpNetlinkTest(t)
defer tearDown()
ch := make(chan AddrUpdate)
done := make(chan struct{})
defer close(done)
// get loopback interface
link, err := LinkByName("lo")
if err != nil {
t.Fatal(err)
}
// bring the interface up
if err = LinkSetUp(link); err != nil {
t.Fatal(err)
}
var lastError error
defer func() {
if lastError != nil {
t.Fatalf("Fatal error received during subscription: %v", lastError)
}
}()
if err := AddrSubscribeWithOptions(ch, done, AddrSubscribeOptions{
ErrorCallback: func(err error) {
lastError = err
},
ListExisting: true,
}); err != nil {
t.Fatal(err)
}
ip := net.IPv4(127, 0, 0, 1)
if !expectAddrUpdate(ch, true, ip) {
t.Fatal("Add update not received as expected")
}
}

View File

@ -1476,13 +1476,13 @@ type LinkUpdate struct {
// LinkSubscribe takes a chan down which notifications will be sent // LinkSubscribe takes a chan down which notifications will be sent
// when links change. Close the 'done' chan to stop subscription. // when links change. Close the 'done' chan to stop subscription.
func LinkSubscribe(ch chan<- LinkUpdate, done <-chan struct{}) error { func LinkSubscribe(ch chan<- LinkUpdate, done <-chan struct{}) error {
return linkSubscribeAt(netns.None(), netns.None(), ch, done, nil) return linkSubscribeAt(netns.None(), netns.None(), ch, done, nil, false)
} }
// LinkSubscribeAt works like LinkSubscribe plus it allows the caller // LinkSubscribeAt works like LinkSubscribe plus it allows the caller
// to choose the network namespace in which to subscribe (ns). // to choose the network namespace in which to subscribe (ns).
func LinkSubscribeAt(ns netns.NsHandle, ch chan<- LinkUpdate, done <-chan struct{}) error { func LinkSubscribeAt(ns netns.NsHandle, ch chan<- LinkUpdate, done <-chan struct{}) error {
return linkSubscribeAt(ns, netns.None(), ch, done, nil) return linkSubscribeAt(ns, netns.None(), ch, done, nil, false)
} }
// LinkSubscribeOptions contains a set of options to use with // LinkSubscribeOptions contains a set of options to use with
@ -1490,6 +1490,7 @@ func LinkSubscribeAt(ns netns.NsHandle, ch chan<- LinkUpdate, done <-chan struct
type LinkSubscribeOptions struct { type LinkSubscribeOptions struct {
Namespace *netns.NsHandle Namespace *netns.NsHandle
ErrorCallback func(error) ErrorCallback func(error)
ListExisting bool
} }
// LinkSubscribeWithOptions work like LinkSubscribe but enable to // LinkSubscribeWithOptions work like LinkSubscribe but enable to
@ -1500,10 +1501,10 @@ func LinkSubscribeWithOptions(ch chan<- LinkUpdate, done <-chan struct{}, option
none := netns.None() none := netns.None()
options.Namespace = &none options.Namespace = &none
} }
return linkSubscribeAt(*options.Namespace, netns.None(), ch, done, options.ErrorCallback) return linkSubscribeAt(*options.Namespace, netns.None(), ch, done, options.ErrorCallback, options.ListExisting)
} }
func linkSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- LinkUpdate, done <-chan struct{}, cberr func(error)) error { func linkSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- LinkUpdate, done <-chan struct{}, cberr func(error), listExisting bool) error {
s, err := nl.SubscribeAt(newNs, curNs, unix.NETLINK_ROUTE, unix.RTNLGRP_LINK) s, err := nl.SubscribeAt(newNs, curNs, unix.NETLINK_ROUTE, unix.RTNLGRP_LINK)
if err != nil { if err != nil {
return err return err
@ -1514,6 +1515,15 @@ func linkSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- LinkUpdate, done <-c
s.Close() s.Close()
}() }()
} }
if listExisting {
req := pkgHandle.newNetlinkRequest(unix.RTM_GETLINK,
unix.NLM_F_DUMP)
msg := nl.NewIfInfomsg(unix.AF_UNSPEC)
req.AddData(msg)
if err := s.Send(req); err != nil {
return err
}
}
go func() { go func() {
defer close(ch) defer close(ch)
for { for {
@ -1525,6 +1535,20 @@ func linkSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- LinkUpdate, done <-c
return return
} }
for _, m := range msgs { for _, m := range msgs {
if m.Header.Type == unix.NLMSG_DONE {
continue
}
if m.Header.Type == unix.NLMSG_ERROR {
native := nl.NativeEndian()
error := int32(native.Uint32(m.Data[0:4]))
if error == 0 {
continue
}
if cberr != nil {
cberr(syscall.Errno(-error))
}
return
}
ifmsg := nl.DeserializeIfInfomsg(m.Data) ifmsg := nl.DeserializeIfInfomsg(m.Data)
header := unix.NlMsghdr(m.Header) header := unix.NlMsghdr(m.Header)
link, err := LinkDeserialize(&header, m.Data) link, err := LinkDeserialize(&header, m.Data)

View File

@ -1191,6 +1191,59 @@ func TestLinkSubscribeAt(t *testing.T) {
} }
} }
func TestLinkSubscribeListExisting(t *testing.T) {
skipUnlessRoot(t)
// Create an handle on a custom netns
newNs, err := netns.New()
if err != nil {
t.Fatal(err)
}
defer newNs.Close()
nh, err := NewHandleAt(newNs)
if err != nil {
t.Fatal(err)
}
defer nh.Delete()
link := &Veth{LinkAttrs{Name: "test", TxQLen: testTxQLen, MTU: 1400}, "bar"}
if err := nh.LinkAdd(link); err != nil {
t.Fatal(err)
}
// Subscribe for Link events on the custom netns
ch := make(chan LinkUpdate)
done := make(chan struct{})
defer close(done)
if err := LinkSubscribeWithOptions(ch, done, LinkSubscribeOptions{
Namespace: &newNs,
ListExisting: true},
); err != nil {
t.Fatal(err)
}
if !expectLinkUpdate(ch, "test", false) {
t.Fatal("Add update not received as expected")
}
if err := nh.LinkSetUp(link); err != nil {
t.Fatal(err)
}
if !expectLinkUpdate(ch, "test", true) {
t.Fatal("Link Up update not received as expected")
}
if err := nh.LinkDel(link); err != nil {
t.Fatal(err)
}
if !expectLinkUpdate(ch, "test", false) {
t.Fatal("Del update not received as expected")
}
}
func TestLinkStats(t *testing.T) { func TestLinkStats(t *testing.T) {
defer setUpNetlinkTest(t)() defer setUpNetlinkTest(t)()

View File

@ -789,13 +789,13 @@ func (h *Handle) RouteGet(destination net.IP) ([]Route, error) {
// RouteSubscribe takes a chan down which notifications will be sent // RouteSubscribe takes a chan down which notifications will be sent
// when routes are added or deleted. Close the 'done' chan to stop subscription. // when routes are added or deleted. Close the 'done' chan to stop subscription.
func RouteSubscribe(ch chan<- RouteUpdate, done <-chan struct{}) error { func RouteSubscribe(ch chan<- RouteUpdate, done <-chan struct{}) error {
return routeSubscribeAt(netns.None(), netns.None(), ch, done, nil) return routeSubscribeAt(netns.None(), netns.None(), ch, done, nil, false)
} }
// RouteSubscribeAt works like RouteSubscribe plus it allows the caller // RouteSubscribeAt works like RouteSubscribe plus it allows the caller
// to choose the network namespace in which to subscribe (ns). // to choose the network namespace in which to subscribe (ns).
func RouteSubscribeAt(ns netns.NsHandle, ch chan<- RouteUpdate, done <-chan struct{}) error { func RouteSubscribeAt(ns netns.NsHandle, ch chan<- RouteUpdate, done <-chan struct{}) error {
return routeSubscribeAt(ns, netns.None(), ch, done, nil) return routeSubscribeAt(ns, netns.None(), ch, done, nil, false)
} }
// RouteSubscribeOptions contains a set of options to use with // RouteSubscribeOptions contains a set of options to use with
@ -803,6 +803,7 @@ func RouteSubscribeAt(ns netns.NsHandle, ch chan<- RouteUpdate, done <-chan stru
type RouteSubscribeOptions struct { type RouteSubscribeOptions struct {
Namespace *netns.NsHandle Namespace *netns.NsHandle
ErrorCallback func(error) ErrorCallback func(error)
ListExisting bool
} }
// RouteSubscribeWithOptions work like RouteSubscribe but enable to // RouteSubscribeWithOptions work like RouteSubscribe but enable to
@ -813,10 +814,10 @@ func RouteSubscribeWithOptions(ch chan<- RouteUpdate, done <-chan struct{}, opti
none := netns.None() none := netns.None()
options.Namespace = &none options.Namespace = &none
} }
return routeSubscribeAt(*options.Namespace, netns.None(), ch, done, options.ErrorCallback) return routeSubscribeAt(*options.Namespace, netns.None(), ch, done, options.ErrorCallback, options.ListExisting)
} }
func routeSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- RouteUpdate, done <-chan struct{}, cberr func(error)) error { func routeSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- RouteUpdate, done <-chan struct{}, cberr func(error), listExisting bool) error {
s, err := nl.SubscribeAt(newNs, curNs, unix.NETLINK_ROUTE, unix.RTNLGRP_IPV4_ROUTE, unix.RTNLGRP_IPV6_ROUTE) s, err := nl.SubscribeAt(newNs, curNs, unix.NETLINK_ROUTE, unix.RTNLGRP_IPV4_ROUTE, unix.RTNLGRP_IPV6_ROUTE)
if err != nil { if err != nil {
return err return err
@ -827,6 +828,15 @@ func routeSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- RouteUpdate, done <
s.Close() s.Close()
}() }()
} }
if listExisting {
req := pkgHandle.newNetlinkRequest(unix.RTM_GETROUTE,
unix.NLM_F_DUMP)
infmsg := nl.NewIfInfomsg(unix.AF_UNSPEC)
req.AddData(infmsg)
if err := s.Send(req); err != nil {
return err
}
}
go func() { go func() {
defer close(ch) defer close(ch)
for { for {
@ -838,6 +848,20 @@ func routeSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- RouteUpdate, done <
return return
} }
for _, m := range msgs { for _, m := range msgs {
if m.Header.Type == unix.NLMSG_DONE {
continue
}
if m.Header.Type == unix.NLMSG_ERROR {
native := nl.NativeEndian()
error := int32(native.Uint32(m.Data[0:4]))
if error == 0 {
continue
}
if cberr != nil {
cberr(syscall.Errno(-error))
}
return
}
route, err := deserializeRoute(m.Data) route, err := deserializeRoute(m.Data)
if err != nil { if err != nil {
if cberr != nil { if cberr != nil {

View File

@ -157,7 +157,9 @@ func expectRouteUpdate(ch <-chan RouteUpdate, t uint16, dst net.IP) bool {
timeout := time.After(time.Minute) timeout := time.After(time.Minute)
select { select {
case update := <-ch: case update := <-ch:
if update.Type == t && update.Route.Dst.IP.Equal(dst) { if update.Type == t &&
update.Route.Dst != nil &&
update.Route.Dst.IP.Equal(dst) {
return true return true
} }
case <-timeout: case <-timeout:
@ -318,6 +320,88 @@ func TestRouteSubscribeAt(t *testing.T) {
} }
} }
func TestRouteSubscribeListExisting(t *testing.T) {
skipUnlessRoot(t)
// Create an handle on a custom netns
newNs, err := netns.New()
if err != nil {
t.Fatal(err)
}
defer newNs.Close()
nh, err := NewHandleAt(newNs)
if err != nil {
t.Fatal(err)
}
defer nh.Delete()
// get loopback interface
link, err := nh.LinkByName("lo")
if err != nil {
t.Fatal(err)
}
// bring the interface up
if err = nh.LinkSetUp(link); err != nil {
t.Fatal(err)
}
// add a gateway route before subscribing
dst10 := &net.IPNet{
IP: net.IPv4(10, 10, 10, 0),
Mask: net.CIDRMask(24, 32),
}
ip := net.IPv4(127, 100, 1, 1)
route10 := Route{LinkIndex: link.Attrs().Index, Dst: dst10, Src: ip}
if err := nh.RouteAdd(&route10); err != nil {
t.Fatal(err)
}
// Subscribe for Route events including existing routes
ch := make(chan RouteUpdate)
done := make(chan struct{})
defer close(done)
if err := RouteSubscribeWithOptions(ch, done, RouteSubscribeOptions{
Namespace: &newNs,
ListExisting: true},
); err != nil {
t.Fatal(err)
}
if !expectRouteUpdate(ch, unix.RTM_NEWROUTE, dst10.IP) {
t.Fatal("Existing add update not received as expected")
}
// add a gateway route
dst := &net.IPNet{
IP: net.IPv4(192, 169, 0, 0),
Mask: net.CIDRMask(24, 32),
}
route := Route{LinkIndex: link.Attrs().Index, Dst: dst, Src: ip}
if err := nh.RouteAdd(&route); err != nil {
t.Fatal(err)
}
if !expectRouteUpdate(ch, unix.RTM_NEWROUTE, dst.IP) {
t.Fatal("Add update not received as expected")
}
if err := nh.RouteDel(&route); err != nil {
t.Fatal(err)
}
if !expectRouteUpdate(ch, unix.RTM_DELROUTE, dst.IP) {
t.Fatal("Del update not received as expected")
}
if err := nh.RouteDel(&route10); err != nil {
t.Fatal(err)
}
if !expectRouteUpdate(ch, unix.RTM_DELROUTE, dst10.IP) {
t.Fatal("Del update not received as expected")
}
}
func TestRouteFilterAllTables(t *testing.T) { func TestRouteFilterAllTables(t *testing.T) {
tearDown := setUpNetlinkTest(t) tearDown := setUpNetlinkTest(t)
defer tearDown() defer tearDown()