Browse Source

add netlink options (#1326)

* add netlink options

* force use buffer

* fix namings and add config examples

* fix linter
Andriyanov Nikita 3 months ago
parent
commit
e5ce8966d6
2 changed files with 31 additions and 13 deletions
  1. 4 0
      examples/config.yml
  2. 27 13
      overlay/tun_linux.go

+ 4 - 0
examples/config.yml

@@ -275,6 +275,10 @@ tun:
   # On linux only, set to true to manage unsafe routes directly on the system route table with gateway routes instead of
   # in nebula configuration files. Default false, not reloadable.
   #use_system_route_table: false
+  # Buffer size for reading routes updates. 0 means default system buffer size. (/proc/sys/net/core/rmem_default).
+  # If using massive routes updates, for example BGP, you may need to increase this value to avoid packet loss.
+  # SO_RCVBUFFORCE is used to avoid having to raise the system wide max
+  #use_system_route_table_buffer_size: 0
 
 # Configure logging level
 logging:

+ 27 - 13
overlay/tun_linux.go

@@ -34,10 +34,11 @@ type tun struct {
 	deviceIndex int
 	ioctlFd     uintptr
 
-	Routes          atomic.Pointer[[]Route]
-	routeTree       atomic.Pointer[bart.Table[routing.Gateways]]
-	routeChan       chan struct{}
-	useSystemRoutes bool
+	Routes                    atomic.Pointer[[]Route]
+	routeTree                 atomic.Pointer[bart.Table[routing.Gateways]]
+	routeChan                 chan struct{}
+	useSystemRoutes           bool
+	useSystemRoutesBufferSize int
 
 	l *logrus.Logger
 }
@@ -124,12 +125,13 @@ func newTun(c *config.C, l *logrus.Logger, vpnNetworks []netip.Prefix, multiqueu
 
 func newTunGeneric(c *config.C, l *logrus.Logger, file *os.File, vpnNetworks []netip.Prefix) (*tun, error) {
 	t := &tun{
-		ReadWriteCloser: file,
-		fd:              int(file.Fd()),
-		vpnNetworks:     vpnNetworks,
-		TXQueueLen:      c.GetInt("tun.tx_queue", 500),
-		useSystemRoutes: c.GetBool("tun.use_system_route_table", false),
-		l:               l,
+		ReadWriteCloser:           file,
+		fd:                        int(file.Fd()),
+		vpnNetworks:               vpnNetworks,
+		TXQueueLen:                c.GetInt("tun.tx_queue", 500),
+		useSystemRoutes:           c.GetBool("tun.use_system_route_table", false),
+		useSystemRoutesBufferSize: c.GetInt("tun.use_system_route_table_buffer_size", 0),
+		l:                         l,
 	}
 
 	err := t.reload(c, true)
@@ -531,7 +533,13 @@ func (t *tun) watchRoutes() {
 	rch := make(chan netlink.RouteUpdate)
 	doneChan := make(chan struct{})
 
-	if err := netlink.RouteSubscribe(rch, doneChan); err != nil {
+	netlinkOptions := netlink.RouteSubscribeOptions{
+		ReceiveBufferSize:      t.useSystemRoutesBufferSize,
+		ReceiveBufferForceSize: t.useSystemRoutesBufferSize != 0,
+		ErrorCallback:          func(e error) { t.l.WithError(e).Errorf("netlink error") },
+	}
+
+	if err := netlink.RouteSubscribeWithOptions(rch, doneChan, netlinkOptions); err != nil {
 		t.l.WithError(err).Errorf("failed to subscribe to system route changes")
 		return
 	}
@@ -541,8 +549,14 @@ func (t *tun) watchRoutes() {
 	go func() {
 		for {
 			select {
-			case r := <-rch:
-				t.updateRoutes(r)
+			case r, ok := <-rch:
+				if ok {
+					t.updateRoutes(r)
+				} else {
+					// may be should do something here as
+					// netlink stops sending updates
+					return
+				}
 			case <-doneChan:
 				// netlink.RouteSubscriber will close the rch for us
 				return