Add ReceiveBufferSize and force option to *Subscribe

When there are a large number of existing results for the link, neighbor, and
address subscribe functions with ListExisting are likely to fail with ENOBUFS.
This takes the AddrSubscribeOptions ReceiveBufferSize, already applied
to LinkSubscribeOptions, and applies it to NeighSubscribeOptions and
RouteSubscribeOptions. The ReceiveTimeout option was also added to each.

Added a SetReceiveBufferSize to the nl_linux socket API.

The existing addr_linux subscribe function was modified so instead of setting
the ReceiveBufferSize on the netlink pkghandle, it is set on the socket
associated with the subscription. The new implementations also only change the
receive buffer size on the socket.

Lastly, a new ReceiveBufferForceSize option was applied to all four of the
modified Subscribe functions.
This commit is contained in:
Dave Setzke 2022-05-05 16:56:08 -06:00 committed by Alessandro Boch
parent 3cc961ec4d
commit 229a10237c
5 changed files with 95 additions and 41 deletions

View File

@ -296,23 +296,24 @@ type AddrUpdate struct {
// AddrSubscribe takes a chan down which notifications will be sent // AddrSubscribe takes a chan down which notifications will be sent
// when addresses change. Close the 'done' chan to stop subscription. // when addresses change. Close the 'done' chan to stop subscription.
func AddrSubscribe(ch chan<- AddrUpdate, done <-chan struct{}) error { func AddrSubscribe(ch chan<- AddrUpdate, done <-chan struct{}) error {
return addrSubscribeAt(netns.None(), netns.None(), ch, done, nil, false, 0, nil) return addrSubscribeAt(netns.None(), netns.None(), ch, done, nil, false, 0, nil, false)
} }
// AddrSubscribeAt works like AddrSubscribe plus it allows the caller // AddrSubscribeAt works like AddrSubscribe plus it allows the caller
// to choose the network namespace in which to subscribe (ns). // to choose the network namespace in which to subscribe (ns).
func AddrSubscribeAt(ns netns.NsHandle, ch chan<- AddrUpdate, done <-chan struct{}) error { func AddrSubscribeAt(ns netns.NsHandle, ch chan<- AddrUpdate, done <-chan struct{}) error {
return addrSubscribeAt(ns, netns.None(), ch, done, nil, false, 0, nil) return addrSubscribeAt(ns, netns.None(), ch, done, nil, false, 0, nil, false)
} }
// AddrSubscribeOptions contains a set of options to use with // AddrSubscribeOptions contains a set of options to use with
// AddrSubscribeWithOptions. // AddrSubscribeWithOptions.
type AddrSubscribeOptions struct { type AddrSubscribeOptions struct {
Namespace *netns.NsHandle Namespace *netns.NsHandle
ErrorCallback func(error) ErrorCallback func(error)
ListExisting bool ListExisting bool
ReceiveBufferSize int ReceiveBufferSize int
ReceiveTimeout *unix.Timeval ReceiveBufferForceSize bool
ReceiveTimeout *unix.Timeval
} }
// AddrSubscribeWithOptions work like AddrSubscribe but enable to // AddrSubscribeWithOptions work like AddrSubscribe but enable to
@ -323,10 +324,12 @@ func AddrSubscribeWithOptions(ch chan<- AddrUpdate, done <-chan struct{}, option
none := netns.None() none := netns.None()
options.Namespace = &none options.Namespace = &none
} }
return addrSubscribeAt(*options.Namespace, netns.None(), ch, done, options.ErrorCallback, options.ListExisting, options.ReceiveBufferSize, options.ReceiveTimeout) return addrSubscribeAt(*options.Namespace, netns.None(), ch, done, options.ErrorCallback, options.ListExisting,
options.ReceiveBufferSize, options.ReceiveTimeout, options.ReceiveBufferForceSize)
} }
func addrSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- AddrUpdate, done <-chan struct{}, cberr func(error), listExisting bool, rcvbuf int, rcvTimeout *unix.Timeval) error { func addrSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- AddrUpdate, done <-chan struct{}, cberr func(error), listExisting bool,
rcvbuf int, rcvTimeout *unix.Timeval, rcvBufForce bool) error {
s, err := nl.SubscribeAt(newNs, curNs, unix.NETLINK_ROUTE, unix.RTNLGRP_IPV4_IFADDR, unix.RTNLGRP_IPV6_IFADDR) s, err := nl.SubscribeAt(newNs, curNs, unix.NETLINK_ROUTE, unix.RTNLGRP_IPV4_IFADDR, unix.RTNLGRP_IPV6_IFADDR)
if err != nil { if err != nil {
return err return err
@ -336,19 +339,18 @@ func addrSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- AddrUpdate, done <-c
return err return err
} }
} }
if rcvbuf != 0 {
err = s.SetReceiveBufferSize(rcvbuf, rcvBufForce)
if err != nil {
return err
}
}
if done != nil { if done != nil {
go func() { go func() {
<-done <-done
s.Close() s.Close()
}() }()
} }
if rcvbuf != 0 {
err = pkgHandle.SetSocketReceiveBufferSize(rcvbuf, false)
if err != nil {
return err
}
}
if listExisting { if listExisting {
req := pkgHandle.newNetlinkRequest(unix.RTM_GETADDR, req := pkgHandle.newNetlinkRequest(unix.RTM_GETADDR,
unix.NLM_F_DUMP) unix.NLM_F_DUMP)

View File

@ -2276,22 +2276,24 @@ type LinkUpdate struct {
// LinkSubscribe takes a chan down which notifications will be sent // LinkSubscribe takes a chan down which notifications will be sent
// when links change. Close the 'done' chan to stop subscription. // when links change. Close the 'done' chan to stop subscription.
func LinkSubscribe(ch chan<- LinkUpdate, done <-chan struct{}) error { func LinkSubscribe(ch chan<- LinkUpdate, done <-chan struct{}) error {
return linkSubscribeAt(netns.None(), netns.None(), ch, done, nil, false, 0) return linkSubscribeAt(netns.None(), netns.None(), ch, done, nil, false, 0, nil, false)
} }
// LinkSubscribeAt works like LinkSubscribe plus it allows the caller // LinkSubscribeAt works like LinkSubscribe plus it allows the caller
// to choose the network namespace in which to subscribe (ns). // to choose the network namespace in which to subscribe (ns).
func LinkSubscribeAt(ns netns.NsHandle, ch chan<- LinkUpdate, done <-chan struct{}) error { func LinkSubscribeAt(ns netns.NsHandle, ch chan<- LinkUpdate, done <-chan struct{}) error {
return linkSubscribeAt(ns, netns.None(), ch, done, nil, false, 0) return linkSubscribeAt(ns, netns.None(), ch, done, nil, false, 0, nil, false)
} }
// LinkSubscribeOptions contains a set of options to use with // LinkSubscribeOptions contains a set of options to use with
// LinkSubscribeWithOptions. // LinkSubscribeWithOptions.
type LinkSubscribeOptions struct { type LinkSubscribeOptions struct {
Namespace *netns.NsHandle Namespace *netns.NsHandle
ErrorCallback func(error) ErrorCallback func(error)
ListExisting bool ListExisting bool
ReceiveBufferSize int ReceiveBufferSize int
ReceiveBufferForceSize bool
ReceiveTimeout *unix.Timeval
} }
// LinkSubscribeWithOptions work like LinkSubscribe but enable to // LinkSubscribeWithOptions work like LinkSubscribe but enable to
@ -2302,14 +2304,27 @@ func LinkSubscribeWithOptions(ch chan<- LinkUpdate, done <-chan struct{}, option
none := netns.None() none := netns.None()
options.Namespace = &none options.Namespace = &none
} }
return linkSubscribeAt(*options.Namespace, netns.None(), ch, done, options.ErrorCallback, options.ListExisting, options.ReceiveBufferSize) return linkSubscribeAt(*options.Namespace, netns.None(), ch, done, options.ErrorCallback, options.ListExisting,
options.ReceiveBufferSize, options.ReceiveTimeout, options.ReceiveBufferForceSize)
} }
func linkSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- LinkUpdate, done <-chan struct{}, cberr func(error), listExisting bool, rcvbuf int) error { func linkSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- LinkUpdate, done <-chan struct{}, cberr func(error), listExisting bool,
rcvbuf int, rcvTimeout *unix.Timeval, rcvbufForce bool) error {
s, err := nl.SubscribeAt(newNs, curNs, unix.NETLINK_ROUTE, unix.RTNLGRP_LINK) s, err := nl.SubscribeAt(newNs, curNs, unix.NETLINK_ROUTE, unix.RTNLGRP_LINK)
if err != nil { if err != nil {
return err return err
} }
if rcvTimeout != nil {
if err := s.SetReceiveTimeout(rcvTimeout); err != nil {
return err
}
}
if rcvbuf != 0 {
err = s.SetReceiveBufferSize(rcvbuf, rcvbufForce)
if err != nil {
return err
}
}
if done != nil { if done != nil {
go func() { go func() {
<-done <-done
@ -2325,12 +2340,6 @@ func linkSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- LinkUpdate, done <-c
return err return err
} }
} }
if rcvbuf != 0 {
err = pkgHandle.SetSocketReceiveBufferSize(rcvbuf, false)
if err != nil {
return err
}
}
go func() { go func() {
defer close(ch) defer close(ch)
for { for {

View File

@ -339,13 +339,13 @@ func NeighDeserialize(m []byte) (*Neigh, error) {
// NeighSubscribe takes a chan down which notifications will be sent // NeighSubscribe takes a chan down which notifications will be sent
// when neighbors are added or deleted. Close the 'done' chan to stop subscription. // when neighbors are added or deleted. Close the 'done' chan to stop subscription.
func NeighSubscribe(ch chan<- NeighUpdate, done <-chan struct{}) error { func NeighSubscribe(ch chan<- NeighUpdate, done <-chan struct{}) error {
return neighSubscribeAt(netns.None(), netns.None(), ch, done, nil, false) return neighSubscribeAt(netns.None(), netns.None(), ch, done, nil, false, 0, nil, false)
} }
// NeighSubscribeAt works like NeighSubscribe plus it allows the caller // NeighSubscribeAt works like NeighSubscribe plus it allows the caller
// to choose the network namespace in which to subscribe (ns). // to choose the network namespace in which to subscribe (ns).
func NeighSubscribeAt(ns netns.NsHandle, ch chan<- NeighUpdate, done <-chan struct{}) error { func NeighSubscribeAt(ns netns.NsHandle, ch chan<- NeighUpdate, done <-chan struct{}) error {
return neighSubscribeAt(ns, netns.None(), ch, done, nil, false) return neighSubscribeAt(ns, netns.None(), ch, done, nil, false, 0, nil, false)
} }
// NeighSubscribeOptions contains a set of options to use with // NeighSubscribeOptions contains a set of options to use with
@ -354,6 +354,11 @@ type NeighSubscribeOptions struct {
Namespace *netns.NsHandle Namespace *netns.NsHandle
ErrorCallback func(error) ErrorCallback func(error)
ListExisting bool ListExisting bool
// max size is based on value of /proc/sys/net/core/rmem_max
ReceiveBufferSize int
ReceiveBufferForceSize bool
ReceiveTimeout *unix.Timeval
} }
// NeighSubscribeWithOptions work like NeighSubscribe but enable to // NeighSubscribeWithOptions work like NeighSubscribe but enable to
@ -364,10 +369,12 @@ func NeighSubscribeWithOptions(ch chan<- NeighUpdate, done <-chan struct{}, opti
none := netns.None() none := netns.None()
options.Namespace = &none options.Namespace = &none
} }
return neighSubscribeAt(*options.Namespace, netns.None(), ch, done, options.ErrorCallback, options.ListExisting) return neighSubscribeAt(*options.Namespace, netns.None(), ch, done, options.ErrorCallback, options.ListExisting,
options.ReceiveBufferSize, options.ReceiveTimeout, options.ReceiveBufferForceSize)
} }
func neighSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- NeighUpdate, done <-chan struct{}, cberr func(error), listExisting bool) error { func neighSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- NeighUpdate, done <-chan struct{}, cberr func(error), listExisting bool,
rcvbuf int, rcvTimeout *unix.Timeval, rcvbufForce bool) error {
s, err := nl.SubscribeAt(newNs, curNs, unix.NETLINK_ROUTE, unix.RTNLGRP_NEIGH) s, err := nl.SubscribeAt(newNs, curNs, unix.NETLINK_ROUTE, unix.RTNLGRP_NEIGH)
makeRequest := func(family int) error { makeRequest := func(family int) error {
req := pkgHandle.newNetlinkRequest(unix.RTM_GETNEIGH, unix.NLM_F_DUMP) req := pkgHandle.newNetlinkRequest(unix.RTM_GETNEIGH, unix.NLM_F_DUMP)
@ -381,6 +388,17 @@ func neighSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- NeighUpdate, done <
if err != nil { if err != nil {
return err return err
} }
if rcvTimeout != nil {
if err := s.SetReceiveTimeout(rcvTimeout); err != nil {
return err
}
}
if rcvbuf != 0 {
err = s.SetReceiveBufferSize(rcvbuf, rcvbufForce)
if err != nil {
return err
}
}
if done != nil { if done != nil {
go func() { go func() {
<-done <-done

View File

@ -806,6 +806,15 @@ func (s *NetlinkSocket) SetReceiveTimeout(timeout *unix.Timeval) error {
return unix.SetsockoptTimeval(int(s.fd), unix.SOL_SOCKET, unix.SO_RCVTIMEO, timeout) return unix.SetsockoptTimeval(int(s.fd), unix.SOL_SOCKET, unix.SO_RCVTIMEO, timeout)
} }
// SetReceiveBufferSize allows to set a receive buffer size on the socket
func (s *NetlinkSocket) SetReceiveBufferSize(size int, force bool) error {
opt := unix.SO_RCVBUF
if force {
opt = unix.SO_RCVBUFFORCE
}
return unix.SetsockoptInt(int(s.fd), unix.SOL_SOCKET, opt, size)
}
// SetExtAck requests error messages to be reported on the socket // SetExtAck requests error messages to be reported on the socket
func (s *NetlinkSocket) SetExtAck(enable bool) error { func (s *NetlinkSocket) SetExtAck(enable bool) error {
var enableN int var enableN int

View File

@ -1458,21 +1458,24 @@ func (h *Handle) RouteGet(destination net.IP) ([]Route, error) {
// RouteSubscribe takes a chan down which notifications will be sent // RouteSubscribe takes a chan down which notifications will be sent
// when routes are added or deleted. Close the 'done' chan to stop subscription. // when routes are added or deleted. Close the 'done' chan to stop subscription.
func RouteSubscribe(ch chan<- RouteUpdate, done <-chan struct{}) error { func RouteSubscribe(ch chan<- RouteUpdate, done <-chan struct{}) error {
return routeSubscribeAt(netns.None(), netns.None(), ch, done, nil, false) return routeSubscribeAt(netns.None(), netns.None(), ch, done, nil, false, 0, nil, false)
} }
// RouteSubscribeAt works like RouteSubscribe plus it allows the caller // RouteSubscribeAt works like RouteSubscribe plus it allows the caller
// to choose the network namespace in which to subscribe (ns). // to choose the network namespace in which to subscribe (ns).
func RouteSubscribeAt(ns netns.NsHandle, ch chan<- RouteUpdate, done <-chan struct{}) error { func RouteSubscribeAt(ns netns.NsHandle, ch chan<- RouteUpdate, done <-chan struct{}) error {
return routeSubscribeAt(ns, netns.None(), ch, done, nil, false) return routeSubscribeAt(ns, netns.None(), ch, done, nil, false, 0, nil, false)
} }
// RouteSubscribeOptions contains a set of options to use with // RouteSubscribeOptions contains a set of options to use with
// RouteSubscribeWithOptions. // RouteSubscribeWithOptions.
type RouteSubscribeOptions struct { type RouteSubscribeOptions struct {
Namespace *netns.NsHandle Namespace *netns.NsHandle
ErrorCallback func(error) ErrorCallback func(error)
ListExisting bool ListExisting bool
ReceiveBufferSize int
ReceiveBufferForceSize bool
ReceiveTimeout *unix.Timeval
} }
// RouteSubscribeWithOptions work like RouteSubscribe but enable to // RouteSubscribeWithOptions work like RouteSubscribe but enable to
@ -1483,14 +1486,27 @@ func RouteSubscribeWithOptions(ch chan<- RouteUpdate, done <-chan struct{}, opti
none := netns.None() none := netns.None()
options.Namespace = &none options.Namespace = &none
} }
return routeSubscribeAt(*options.Namespace, netns.None(), ch, done, options.ErrorCallback, options.ListExisting) return routeSubscribeAt(*options.Namespace, netns.None(), ch, done, options.ErrorCallback, options.ListExisting,
options.ReceiveBufferSize, options.ReceiveTimeout, options.ReceiveBufferForceSize)
} }
func routeSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- RouteUpdate, done <-chan struct{}, cberr func(error), listExisting bool) error { func routeSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- RouteUpdate, done <-chan struct{}, cberr func(error), listExisting bool,
rcvbuf int, rcvTimeout *unix.Timeval, rcvbufForce bool) error {
s, err := nl.SubscribeAt(newNs, curNs, unix.NETLINK_ROUTE, unix.RTNLGRP_IPV4_ROUTE, unix.RTNLGRP_IPV6_ROUTE) s, err := nl.SubscribeAt(newNs, curNs, unix.NETLINK_ROUTE, unix.RTNLGRP_IPV4_ROUTE, unix.RTNLGRP_IPV6_ROUTE)
if err != nil { if err != nil {
return err return err
} }
if rcvTimeout != nil {
if err := s.SetReceiveTimeout(rcvTimeout); err != nil {
return err
}
}
if rcvbuf != 0 {
err = s.SetReceiveBufferSize(rcvbuf, rcvbufForce)
if err != nil {
return err
}
}
if done != nil { if done != nil {
go func() { go func() {
<-done <-done