diff --git a/addr_linux.go b/addr_linux.go index dc1fd6a..d59c328 100644 --- a/addr_linux.go +++ b/addr_linux.go @@ -4,6 +4,7 @@ import ( "fmt" "net" "strings" + "syscall" "github.com/vishvananda/netlink/nl" "github.com/vishvananda/netns" @@ -249,13 +250,13 @@ type AddrUpdate struct { // AddrSubscribe takes a chan down which notifications will be sent // when addresses change. Close the 'done' chan to stop subscription. func AddrSubscribe(ch chan<- AddrUpdate, done <-chan struct{}) error { - return addrSubscribeAt(netns.None(), netns.None(), ch, done, nil) + return addrSubscribeAt(netns.None(), netns.None(), ch, done, nil, false) } // AddrSubscribeAt works like AddrSubscribe plus it allows the caller // to choose the network namespace in which to subscribe (ns). func AddrSubscribeAt(ns netns.NsHandle, ch chan<- AddrUpdate, done <-chan struct{}) error { - return addrSubscribeAt(ns, netns.None(), ch, done, nil) + return addrSubscribeAt(ns, netns.None(), ch, done, nil, false) } // AddrSubscribeOptions contains a set of options to use with @@ -263,6 +264,7 @@ func AddrSubscribeAt(ns netns.NsHandle, ch chan<- AddrUpdate, done <-chan struct type AddrSubscribeOptions struct { Namespace *netns.NsHandle ErrorCallback func(error) + ListExisting bool } // AddrSubscribeWithOptions work like AddrSubscribe but enable to @@ -273,10 +275,10 @@ func AddrSubscribeWithOptions(ch chan<- AddrUpdate, done <-chan struct{}, option none := netns.None() options.Namespace = &none } - return addrSubscribeAt(*options.Namespace, netns.None(), ch, done, options.ErrorCallback) + return addrSubscribeAt(*options.Namespace, netns.None(), ch, done, options.ErrorCallback, options.ListExisting) } -func addrSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- AddrUpdate, done <-chan struct{}, cberr func(error)) error { +func addrSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- AddrUpdate, done <-chan struct{}, cberr func(error), listExisting bool) error { s, err := nl.SubscribeAt(newNs, curNs, unix.NETLINK_ROUTE, unix.RTNLGRP_IPV4_IFADDR, unix.RTNLGRP_IPV6_IFADDR) if err != nil { return err @@ -287,6 +289,15 @@ func addrSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- AddrUpdate, done <-c s.Close() }() } + if listExisting { + req := pkgHandle.newNetlinkRequest(unix.RTM_GETADDR, + unix.NLM_F_DUMP) + infmsg := nl.NewIfInfomsg(unix.AF_UNSPEC) + req.AddData(infmsg) + if err := s.Send(req); err != nil { + return err + } + } go func() { defer close(ch) for { @@ -298,6 +309,20 @@ func addrSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- AddrUpdate, done <-c return } for _, m := range msgs { + if m.Header.Type == unix.NLMSG_DONE { + continue + } + if m.Header.Type == unix.NLMSG_ERROR { + native := nl.NativeEndian() + error := int32(native.Uint32(m.Data[0:4])) + if error == 0 { + continue + } + if cberr != nil { + cberr(syscall.Errno(-error)) + } + return + } msgType := m.Header.Type if msgType != unix.RTM_NEWADDR && msgType != unix.RTM_DELADDR { if cberr != nil { diff --git a/addr_test.go b/addr_test.go index c78182e..c158159 100644 --- a/addr_test.go +++ b/addr_test.go @@ -245,3 +245,43 @@ func TestAddrSubscribeWithOptions(t *testing.T) { t.Fatal("Add update not received as expected") } } + +func TestAddrSubscribeListExisting(t *testing.T) { + tearDown := setUpNetlinkTest(t) + defer tearDown() + + ch := make(chan AddrUpdate) + done := make(chan struct{}) + defer close(done) + + // get loopback interface + link, err := LinkByName("lo") + if err != nil { + t.Fatal(err) + } + + // bring the interface up + if err = LinkSetUp(link); err != nil { + t.Fatal(err) + } + + var lastError error + defer func() { + if lastError != nil { + t.Fatalf("Fatal error received during subscription: %v", lastError) + } + }() + if err := AddrSubscribeWithOptions(ch, done, AddrSubscribeOptions{ + ErrorCallback: func(err error) { + lastError = err + }, + ListExisting: true, + }); err != nil { + t.Fatal(err) + } + + ip := net.IPv4(127, 0, 0, 1) + if !expectAddrUpdate(ch, true, ip) { + t.Fatal("Add update not received as expected") + } +} diff --git a/link_linux.go b/link_linux.go index 6a59b65..c278f23 100644 --- a/link_linux.go +++ b/link_linux.go @@ -1476,13 +1476,13 @@ type LinkUpdate struct { // LinkSubscribe takes a chan down which notifications will be sent // when links change. Close the 'done' chan to stop subscription. func LinkSubscribe(ch chan<- LinkUpdate, done <-chan struct{}) error { - return linkSubscribeAt(netns.None(), netns.None(), ch, done, nil) + return linkSubscribeAt(netns.None(), netns.None(), ch, done, nil, false) } // LinkSubscribeAt works like LinkSubscribe plus it allows the caller // to choose the network namespace in which to subscribe (ns). func LinkSubscribeAt(ns netns.NsHandle, ch chan<- LinkUpdate, done <-chan struct{}) error { - return linkSubscribeAt(ns, netns.None(), ch, done, nil) + return linkSubscribeAt(ns, netns.None(), ch, done, nil, false) } // LinkSubscribeOptions contains a set of options to use with @@ -1490,6 +1490,7 @@ func LinkSubscribeAt(ns netns.NsHandle, ch chan<- LinkUpdate, done <-chan struct type LinkSubscribeOptions struct { Namespace *netns.NsHandle ErrorCallback func(error) + ListExisting bool } // LinkSubscribeWithOptions work like LinkSubscribe but enable to @@ -1500,10 +1501,10 @@ func LinkSubscribeWithOptions(ch chan<- LinkUpdate, done <-chan struct{}, option none := netns.None() options.Namespace = &none } - return linkSubscribeAt(*options.Namespace, netns.None(), ch, done, options.ErrorCallback) + return linkSubscribeAt(*options.Namespace, netns.None(), ch, done, options.ErrorCallback, options.ListExisting) } -func linkSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- LinkUpdate, done <-chan struct{}, cberr func(error)) error { +func linkSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- LinkUpdate, done <-chan struct{}, cberr func(error), listExisting bool) error { s, err := nl.SubscribeAt(newNs, curNs, unix.NETLINK_ROUTE, unix.RTNLGRP_LINK) if err != nil { return err @@ -1514,6 +1515,15 @@ func linkSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- LinkUpdate, done <-c s.Close() }() } + if listExisting { + req := pkgHandle.newNetlinkRequest(unix.RTM_GETLINK, + unix.NLM_F_DUMP) + msg := nl.NewIfInfomsg(unix.AF_UNSPEC) + req.AddData(msg) + if err := s.Send(req); err != nil { + return err + } + } go func() { defer close(ch) for { @@ -1525,6 +1535,20 @@ func linkSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- LinkUpdate, done <-c return } for _, m := range msgs { + if m.Header.Type == unix.NLMSG_DONE { + continue + } + if m.Header.Type == unix.NLMSG_ERROR { + native := nl.NativeEndian() + error := int32(native.Uint32(m.Data[0:4])) + if error == 0 { + continue + } + if cberr != nil { + cberr(syscall.Errno(-error)) + } + return + } ifmsg := nl.DeserializeIfInfomsg(m.Data) header := unix.NlMsghdr(m.Header) link, err := LinkDeserialize(&header, m.Data) diff --git a/link_test.go b/link_test.go index b92b107..7ba87a7 100644 --- a/link_test.go +++ b/link_test.go @@ -1191,6 +1191,59 @@ func TestLinkSubscribeAt(t *testing.T) { } } +func TestLinkSubscribeListExisting(t *testing.T) { + skipUnlessRoot(t) + + // Create an handle on a custom netns + newNs, err := netns.New() + if err != nil { + t.Fatal(err) + } + defer newNs.Close() + + nh, err := NewHandleAt(newNs) + if err != nil { + t.Fatal(err) + } + defer nh.Delete() + + link := &Veth{LinkAttrs{Name: "test", TxQLen: testTxQLen, MTU: 1400}, "bar"} + if err := nh.LinkAdd(link); err != nil { + t.Fatal(err) + } + + // Subscribe for Link events on the custom netns + ch := make(chan LinkUpdate) + done := make(chan struct{}) + defer close(done) + if err := LinkSubscribeWithOptions(ch, done, LinkSubscribeOptions{ + Namespace: &newNs, + ListExisting: true}, + ); err != nil { + t.Fatal(err) + } + + if !expectLinkUpdate(ch, "test", false) { + t.Fatal("Add update not received as expected") + } + + if err := nh.LinkSetUp(link); err != nil { + t.Fatal(err) + } + + if !expectLinkUpdate(ch, "test", true) { + t.Fatal("Link Up update not received as expected") + } + + if err := nh.LinkDel(link); err != nil { + t.Fatal(err) + } + + if !expectLinkUpdate(ch, "test", false) { + t.Fatal("Del update not received as expected") + } +} + func TestLinkStats(t *testing.T) { defer setUpNetlinkTest(t)() diff --git a/route_linux.go b/route_linux.go index fd5ac89..3f85671 100644 --- a/route_linux.go +++ b/route_linux.go @@ -789,13 +789,13 @@ func (h *Handle) RouteGet(destination net.IP) ([]Route, error) { // RouteSubscribe takes a chan down which notifications will be sent // when routes are added or deleted. Close the 'done' chan to stop subscription. func RouteSubscribe(ch chan<- RouteUpdate, done <-chan struct{}) error { - return routeSubscribeAt(netns.None(), netns.None(), ch, done, nil) + return routeSubscribeAt(netns.None(), netns.None(), ch, done, nil, false) } // RouteSubscribeAt works like RouteSubscribe plus it allows the caller // to choose the network namespace in which to subscribe (ns). func RouteSubscribeAt(ns netns.NsHandle, ch chan<- RouteUpdate, done <-chan struct{}) error { - return routeSubscribeAt(ns, netns.None(), ch, done, nil) + return routeSubscribeAt(ns, netns.None(), ch, done, nil, false) } // RouteSubscribeOptions contains a set of options to use with @@ -803,6 +803,7 @@ func RouteSubscribeAt(ns netns.NsHandle, ch chan<- RouteUpdate, done <-chan stru type RouteSubscribeOptions struct { Namespace *netns.NsHandle ErrorCallback func(error) + ListExisting bool } // RouteSubscribeWithOptions work like RouteSubscribe but enable to @@ -813,10 +814,10 @@ func RouteSubscribeWithOptions(ch chan<- RouteUpdate, done <-chan struct{}, opti none := netns.None() options.Namespace = &none } - return routeSubscribeAt(*options.Namespace, netns.None(), ch, done, options.ErrorCallback) + return routeSubscribeAt(*options.Namespace, netns.None(), ch, done, options.ErrorCallback, options.ListExisting) } -func routeSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- RouteUpdate, done <-chan struct{}, cberr func(error)) error { +func routeSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- RouteUpdate, done <-chan struct{}, cberr func(error), listExisting bool) error { s, err := nl.SubscribeAt(newNs, curNs, unix.NETLINK_ROUTE, unix.RTNLGRP_IPV4_ROUTE, unix.RTNLGRP_IPV6_ROUTE) if err != nil { return err @@ -827,6 +828,15 @@ func routeSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- RouteUpdate, done < s.Close() }() } + if listExisting { + req := pkgHandle.newNetlinkRequest(unix.RTM_GETROUTE, + unix.NLM_F_DUMP) + infmsg := nl.NewIfInfomsg(unix.AF_UNSPEC) + req.AddData(infmsg) + if err := s.Send(req); err != nil { + return err + } + } go func() { defer close(ch) for { @@ -838,6 +848,20 @@ func routeSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- RouteUpdate, done < return } for _, m := range msgs { + if m.Header.Type == unix.NLMSG_DONE { + continue + } + if m.Header.Type == unix.NLMSG_ERROR { + native := nl.NativeEndian() + error := int32(native.Uint32(m.Data[0:4])) + if error == 0 { + continue + } + if cberr != nil { + cberr(syscall.Errno(-error)) + } + return + } route, err := deserializeRoute(m.Data) if err != nil { if cberr != nil { diff --git a/route_test.go b/route_test.go index 662ddd1..56c539f 100644 --- a/route_test.go +++ b/route_test.go @@ -157,7 +157,9 @@ func expectRouteUpdate(ch <-chan RouteUpdate, t uint16, dst net.IP) bool { timeout := time.After(time.Minute) select { case update := <-ch: - if update.Type == t && update.Route.Dst.IP.Equal(dst) { + if update.Type == t && + update.Route.Dst != nil && + update.Route.Dst.IP.Equal(dst) { return true } case <-timeout: @@ -318,6 +320,88 @@ func TestRouteSubscribeAt(t *testing.T) { } } +func TestRouteSubscribeListExisting(t *testing.T) { + skipUnlessRoot(t) + + // Create an handle on a custom netns + newNs, err := netns.New() + if err != nil { + t.Fatal(err) + } + defer newNs.Close() + + nh, err := NewHandleAt(newNs) + if err != nil { + t.Fatal(err) + } + defer nh.Delete() + + // get loopback interface + link, err := nh.LinkByName("lo") + if err != nil { + t.Fatal(err) + } + + // bring the interface up + if err = nh.LinkSetUp(link); err != nil { + t.Fatal(err) + } + + // add a gateway route before subscribing + dst10 := &net.IPNet{ + IP: net.IPv4(10, 10, 10, 0), + Mask: net.CIDRMask(24, 32), + } + + ip := net.IPv4(127, 100, 1, 1) + route10 := Route{LinkIndex: link.Attrs().Index, Dst: dst10, Src: ip} + if err := nh.RouteAdd(&route10); err != nil { + t.Fatal(err) + } + + // Subscribe for Route events including existing routes + ch := make(chan RouteUpdate) + done := make(chan struct{}) + defer close(done) + if err := RouteSubscribeWithOptions(ch, done, RouteSubscribeOptions{ + Namespace: &newNs, + ListExisting: true}, + ); err != nil { + t.Fatal(err) + } + + if !expectRouteUpdate(ch, unix.RTM_NEWROUTE, dst10.IP) { + t.Fatal("Existing add update not received as expected") + } + + // add a gateway route + dst := &net.IPNet{ + IP: net.IPv4(192, 169, 0, 0), + Mask: net.CIDRMask(24, 32), + } + + route := Route{LinkIndex: link.Attrs().Index, Dst: dst, Src: ip} + if err := nh.RouteAdd(&route); err != nil { + t.Fatal(err) + } + + if !expectRouteUpdate(ch, unix.RTM_NEWROUTE, dst.IP) { + t.Fatal("Add update not received as expected") + } + if err := nh.RouteDel(&route); err != nil { + t.Fatal(err) + } + if !expectRouteUpdate(ch, unix.RTM_DELROUTE, dst.IP) { + t.Fatal("Del update not received as expected") + } + if err := nh.RouteDel(&route10); err != nil { + t.Fatal(err) + } + if !expectRouteUpdate(ch, unix.RTM_DELROUTE, dst10.IP) { + t.Fatal("Del update not received as expected") + } +} + func TestRouteFilterAllTables(t *testing.T) { tearDown := setUpNetlinkTest(t) defer tearDown()