From fea1da6ef55f0d6cac746fe3e6f1817891d26ed9 Mon Sep 17 00:00:00 2001 From: Alessandro Boch Date: Wed, 29 Jun 2016 09:11:35 -0700 Subject: [PATCH] Allow to subscribe to events on a specfic netns (#142) Signed-off-by: Alessandro Boch --- addr_linux.go | 13 ++++++++++- link_linux.go | 13 ++++++++++- link_test.go | 48 ++++++++++++++++++++++++++++++++++++++ nl/nl_linux.go | 63 +++++++++++++++++++++++++++++++++++++++++++++----- route_linux.go | 13 ++++++++++- route_test.go | 58 ++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 199 insertions(+), 9 deletions(-) diff --git a/addr_linux.go b/addr_linux.go index b5eec65..8820c02 100644 --- a/addr_linux.go +++ b/addr_linux.go @@ -8,6 +8,7 @@ import ( "syscall" "github.com/vishvananda/netlink/nl" + "github.com/vishvananda/netns" ) // IFA_FLAGS is a u32 attribute. @@ -192,7 +193,17 @@ type AddrUpdate struct { // AddrSubscribe takes a chan down which notifications will be sent // when addresses change. Close the 'done' chan to stop subscription. func AddrSubscribe(ch chan<- AddrUpdate, done <-chan struct{}) error { - s, err := nl.Subscribe(syscall.NETLINK_ROUTE, syscall.RTNLGRP_IPV4_IFADDR, syscall.RTNLGRP_IPV6_IFADDR) + return addrSubscribe(netns.None(), netns.None(), ch, done) +} + +// AddrSubscribeAt works like AddrSubscribe plus it allows the caller +// to choose the network namespace in which to subscribe (ns). +func AddrSubscribeAt(ns netns.NsHandle, ch chan<- AddrUpdate, done <-chan struct{}) error { + return addrSubscribe(ns, netns.None(), ch, done) +} + +func addrSubscribe(newNs, curNs netns.NsHandle, ch chan<- AddrUpdate, done <-chan struct{}) error { + s, err := nl.SubscribeAt(newNs, curNs, syscall.NETLINK_ROUTE, syscall.RTNLGRP_IPV4_IFADDR, syscall.RTNLGRP_IPV6_IFADDR) if err != nil { return err } diff --git a/link_linux.go b/link_linux.go index 08a9384..2d6a5e0 100644 --- a/link_linux.go +++ b/link_linux.go @@ -10,6 +10,7 @@ import ( "unsafe" "github.com/vishvananda/netlink/nl" + "github.com/vishvananda/netns" ) const SizeofLinkStats = 0x5c @@ -1011,7 +1012,17 @@ type LinkUpdate struct { // LinkSubscribe takes a chan down which notifications will be sent // when links change. Close the 'done' chan to stop subscription. func LinkSubscribe(ch chan<- LinkUpdate, done <-chan struct{}) error { - s, err := nl.Subscribe(syscall.NETLINK_ROUTE, syscall.RTNLGRP_LINK) + return linkSubscribe(netns.None(), netns.None(), ch, done) +} + +// LinkSubscribeAt works like LinkSubscribe plus it allows the caller +// to choose the network namespace in which to subscribe (ns). +func LinkSubscribeAt(ns netns.NsHandle, ch chan<- LinkUpdate, done <-chan struct{}) error { + return linkSubscribe(ns, netns.None(), ch, done) +} + +func linkSubscribe(newNs, curNs netns.NsHandle, ch chan<- LinkUpdate, done <-chan struct{}) error { + s, err := nl.SubscribeAt(newNs, curNs, syscall.NETLINK_ROUTE, syscall.RTNLGRP_LINK) if err != nil { return err } diff --git a/link_test.go b/link_test.go index ca11bb2..04aec3e 100644 --- a/link_test.go +++ b/link_test.go @@ -814,6 +814,54 @@ func TestLinkSubscribe(t *testing.T) { } } +func TestLinkSubscribeAt(t *testing.T) { + // Create an handle on a custom netns + newNs, err := netns.New() + if err != nil { + t.Fatal(err) + } + defer newNs.Close() + + nh, err := NewHandleAt(newNs) + if err != nil { + t.Fatal(err) + } + defer nh.Delete() + + // Subscribe for Link events on the custom netns + ch := make(chan LinkUpdate) + done := make(chan struct{}) + defer close(done) + if err := LinkSubscribeAt(newNs, ch, done); err != nil { + t.Fatal(err) + } + + link := &Veth{LinkAttrs{Name: "test", TxQLen: testTxQLen, MTU: 1400}, "bar"} + if err := nh.LinkAdd(link); err != nil { + t.Fatal(err) + } + + if !expectLinkUpdate(ch, "test", false) { + t.Fatal("Add update not received as expected") + } + + if err := nh.LinkSetUp(link); err != nil { + t.Fatal(err) + } + + if !expectLinkUpdate(ch, "test", true) { + t.Fatal("Link Up update not received as expected") + } + + if err := nh.LinkDel(link); err != nil { + t.Fatal(err) + } + + if !expectLinkUpdate(ch, "test", false) { + t.Fatal("Del update not received as expected") + } +} + func TestLinkStats(t *testing.T) { defer setUpNetlinkTest(t)() diff --git a/nl/nl_linux.go b/nl/nl_linux.go index fd5550a..17683a7 100644 --- a/nl/nl_linux.go +++ b/nl/nl_linux.go @@ -331,24 +331,63 @@ func getNetlinkSocket(protocol int) (*NetlinkSocket, error) { // moves back into it when done. If newNs is close, the socket will be opened // in the current network namespace. func GetNetlinkSocketAt(newNs, curNs netns.NsHandle, protocol int) (*NetlinkSocket, error) { - var err error + c, err := executeInNetns(newNs, curNs) + if err != nil { + return nil, err + } + defer c() + return getNetlinkSocket(protocol) +} +// executeInNetns sets execution of the code following this call to the +// network namespace newNs, then moves the thread back to curNs if open, +// otherwise to the current netns at the time the function was invoked +// In case of success, the caller is expected to execute the returned function +// at the end of the code that needs to be executed in the network namespace. +// Example: +// func jobAt(...) error { +// d, err := executeInNetns(...) +// if err != nil { return err} +// defer d() +// < code which needs to be executed in specific netns> +// } +// TODO: his function probably belongs to netns pkg. +func executeInNetns(newNs, curNs netns.NsHandle) (func(), error) { + var ( + err error + moveBack func(netns.NsHandle) error + closeNs func() error + unlockThd func() + ) + restore := func() { + // order matters + if moveBack != nil { + moveBack(curNs) + } + if closeNs != nil { + closeNs() + } + if unlockThd != nil { + unlockThd() + } + } if newNs.IsOpen() { runtime.LockOSThread() - defer runtime.UnlockOSThread() + unlockThd = runtime.UnlockOSThread if !curNs.IsOpen() { if curNs, err = netns.Get(); err != nil { + restore() return nil, fmt.Errorf("could not get current namespace while creating netlink socket: %v", err) } - defer curNs.Close() + closeNs = curNs.Close } if err := netns.Set(newNs); err != nil { + restore() return nil, fmt.Errorf("failed to set into network namespace %d while creating netlink socket: %v", newNs, err) } - defer netns.Set(curNs) + moveBack = netns.Set } - - return getNetlinkSocket(protocol) + return restore, nil } // Create a netlink socket with a given protocol (e.g. NETLINK_ROUTE) @@ -377,6 +416,18 @@ func Subscribe(protocol int, groups ...uint) (*NetlinkSocket, error) { return s, nil } +// SubscribeAt works like Subscribe plus let's the caller choose the network +// namespace in which the socket would be opened (newNs). Then control goes back +// to curNs if open, otherwise to the netns at the time this function was called. +func SubscribeAt(newNs, curNs netns.NsHandle, protocol int, groups ...uint) (*NetlinkSocket, error) { + c, err := executeInNetns(newNs, curNs) + if err != nil { + return nil, err + } + defer c() + return Subscribe(protocol, groups...) +} + func (s *NetlinkSocket) Close() { syscall.Close(s.fd) s.fd = -1 diff --git a/route_linux.go b/route_linux.go index 5d684ad..6ba3ea1 100644 --- a/route_linux.go +++ b/route_linux.go @@ -6,6 +6,7 @@ import ( "syscall" "github.com/vishvananda/netlink/nl" + "github.com/vishvananda/netns" ) // RtAttr is shared so it is in netlink_linux.go @@ -421,7 +422,17 @@ func (h *Handle) RouteGet(destination net.IP) ([]Route, error) { // RouteSubscribe takes a chan down which notifications will be sent // when routes are added or deleted. Close the 'done' chan to stop subscription. func RouteSubscribe(ch chan<- RouteUpdate, done <-chan struct{}) error { - s, err := nl.Subscribe(syscall.NETLINK_ROUTE, syscall.RTNLGRP_IPV4_ROUTE, syscall.RTNLGRP_IPV6_ROUTE) + return routeSubscribeAt(netns.None(), netns.None(), ch, done) +} + +// RouteSubscribeAt works like RouteSubscribe plus it allows the caller +// to choose the network namespace in which to subscribe (ns). +func RouteSubscribeAt(ns netns.NsHandle, ch chan<- RouteUpdate, done <-chan struct{}) error { + return routeSubscribeAt(ns, netns.None(), ch, done) +} + +func routeSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- RouteUpdate, done <-chan struct{}) error { + s, err := nl.SubscribeAt(newNs, curNs, syscall.NETLINK_ROUTE, syscall.RTNLGRP_IPV4_ROUTE, syscall.RTNLGRP_IPV6_ROUTE) if err != nil { return err } diff --git a/route_test.go b/route_test.go index db3b570..3322af8 100644 --- a/route_test.go +++ b/route_test.go @@ -5,6 +5,8 @@ import ( "syscall" "testing" "time" + + "github.com/vishvananda/netns" ) func TestRouteAddDel(t *testing.T) { @@ -143,6 +145,62 @@ func TestRouteSubscribe(t *testing.T) { } } +func TestRouteSubscribeAt(t *testing.T) { + // Create an handle on a custom netns + newNs, err := netns.New() + if err != nil { + t.Fatal(err) + } + defer newNs.Close() + + nh, err := NewHandleAt(newNs) + if err != nil { + t.Fatal(err) + } + defer nh.Delete() + + // Subscribe for Route events on the custom netns + ch := make(chan RouteUpdate) + done := make(chan struct{}) + defer close(done) + if err := RouteSubscribeAt(newNs, ch, done); err != nil { + t.Fatal(err) + } + + // get loopback interface + link, err := nh.LinkByName("lo") + if err != nil { + t.Fatal(err) + } + + // bring the interface up + if err = nh.LinkSetUp(link); err != nil { + t.Fatal(err) + } + + // add a gateway route + dst := &net.IPNet{ + IP: net.IPv4(192, 169, 0, 0), + Mask: net.CIDRMask(24, 32), + } + + ip := net.IPv4(127, 100, 1, 1) + route := Route{LinkIndex: link.Attrs().Index, Dst: dst, Src: ip} + if err := nh.RouteAdd(&route); err != nil { + t.Fatal(err) + } + + if !expectRouteUpdate(ch, syscall.RTM_NEWROUTE, dst.IP) { + t.Fatal("Add update not received as expected") + } + if err := nh.RouteDel(&route); err != nil { + t.Fatal(err) + } + if !expectRouteUpdate(ch, syscall.RTM_DELROUTE, dst.IP) { + t.Fatal("Del update not received as expected") + } +} + func TestRouteExtraFields(t *testing.T) { tearDown := setUpNetlinkTest(t) defer tearDown()