Add ReceiveBufferSize to LinkSubscribeOptions

Sometimes with `ListExisting` enabled and enough interfaces configured,
netlink will try to send more data than the buffer can hold and the
caller will get back ENOBUFS. Thus, it's useful to be able to configure
the netlink buffer size.
This commit is contained in:
Daniel Xu 2022-04-08 10:29:12 -07:00 committed by Alessandro Boch
parent 63484bbf69
commit 3cc961ec4d
1 changed files with 14 additions and 7 deletions

View File

@ -2276,21 +2276,22 @@ type LinkUpdate struct {
// LinkSubscribe takes a chan down which notifications will be sent
// when links change. Close the 'done' chan to stop subscription.
func LinkSubscribe(ch chan<- LinkUpdate, done <-chan struct{}) error {
return linkSubscribeAt(netns.None(), netns.None(), ch, done, nil, false)
return linkSubscribeAt(netns.None(), netns.None(), ch, done, nil, false, 0)
}
// LinkSubscribeAt works like LinkSubscribe plus it allows the caller
// to choose the network namespace in which to subscribe (ns).
func LinkSubscribeAt(ns netns.NsHandle, ch chan<- LinkUpdate, done <-chan struct{}) error {
return linkSubscribeAt(ns, netns.None(), ch, done, nil, false)
return linkSubscribeAt(ns, netns.None(), ch, done, nil, false, 0)
}
// LinkSubscribeOptions contains a set of options to use with
// LinkSubscribeWithOptions.
type LinkSubscribeOptions struct {
Namespace *netns.NsHandle
ErrorCallback func(error)
ListExisting bool
Namespace *netns.NsHandle
ErrorCallback func(error)
ListExisting bool
ReceiveBufferSize int
}
// LinkSubscribeWithOptions work like LinkSubscribe but enable to
@ -2301,10 +2302,10 @@ func LinkSubscribeWithOptions(ch chan<- LinkUpdate, done <-chan struct{}, option
none := netns.None()
options.Namespace = &none
}
return linkSubscribeAt(*options.Namespace, netns.None(), ch, done, options.ErrorCallback, options.ListExisting)
return linkSubscribeAt(*options.Namespace, netns.None(), ch, done, options.ErrorCallback, options.ListExisting, options.ReceiveBufferSize)
}
func linkSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- LinkUpdate, done <-chan struct{}, cberr func(error), listExisting bool) error {
func linkSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- LinkUpdate, done <-chan struct{}, cberr func(error), listExisting bool, rcvbuf int) error {
s, err := nl.SubscribeAt(newNs, curNs, unix.NETLINK_ROUTE, unix.RTNLGRP_LINK)
if err != nil {
return err
@ -2324,6 +2325,12 @@ func linkSubscribeAt(newNs, curNs netns.NsHandle, ch chan<- LinkUpdate, done <-c
return err
}
}
if rcvbuf != 0 {
err = pkgHandle.SetSocketReceiveBufferSize(rcvbuf, false)
if err != nil {
return err
}
}
go func() {
defer close(ch)
for {