瀏覽代碼

Tweak defaults and enable GSO/GRO on Linux by default

Ryan 1 月之前
父節點
當前提交
9253f36a3c
共有 4 個文件被更改,包括 75 次插入和 21 次删除
  1. 1 1
      connection_state.go
  2. 66 15
      interface.go
  3. 4 1
      main.go
  4. 4 4
      udp/udp_linux.go

+ 1 - 1
connection_state.go

@@ -15,7 +15,7 @@ import (
 
 // TODO: In a 5Gbps test, 1024 is not sufficient. With a 1400 MTU this is about 1.4Gbps of window, assuming full packets.
 // 4096 should be sufficient for 5Gbps
-const ReplayWindow = 1024
+const ReplayWindow = 4096
 
 type ConnectionState struct {
 	eKey           *NebulaCipherState

+ 66 - 15
interface.go

@@ -25,11 +25,14 @@ import (
 const (
 	mtu = 9001
 
-	inboundBatchSizeDefault      = 32
-	outboundBatchSizeDefault     = 32
-	batchFlushIntervalDefault    = 50 * time.Microsecond
-	maxOutstandingBatchesDefault = 1028
-	sendBatchSizeDefault         = 32
+	inboundBatchSizeDefault      = 128
+	outboundBatchSizeDefault     = 64
+	batchFlushIntervalDefault    = 12 * time.Microsecond
+	maxOutstandingBatchesDefault = 8
+	sendBatchSizeDefault         = 64
+	maxPendingPacketsDefault     = 32
+	maxPendingBytesDefault       = 64 * 1024
+	maxSendBufPerRoutineDefault  = 16
 )
 
 type InterfaceConfig struct {
@@ -65,6 +68,9 @@ type BatchConfig struct {
 	OutboundBatchSize     int
 	FlushInterval         time.Duration
 	MaxOutstandingPerChan int
+	MaxPendingPackets     int
+	MaxPendingBytes       int
+	MaxSendBuffersPerChan int
 }
 
 type Interface struct {
@@ -122,12 +128,16 @@ type Interface struct {
 	outboundBatchPool sync.Pool
 
 	sendPool      sync.Pool
+	sendBufCache  [][]*[]byte
 	sendBatchSize int
 
 	inboundBatchSize      int
 	outboundBatchSize     int
 	batchFlushInterval    time.Duration
 	maxOutstandingPerChan int
+	maxPendingPackets     int
+	maxPendingBytes       int
+	maxSendBufPerRoutine  int
 }
 
 type outboundSend struct {
@@ -204,7 +214,14 @@ func (f *Interface) releaseOutboundBatch(b *outboundBatch) {
 	f.outboundBatchPool.Put(b)
 }
 
-func (f *Interface) getSendBuffer() *[]byte {
+func (f *Interface) getSendBuffer(q int) *[]byte {
+	cache := f.sendBufCache[q]
+	if n := len(cache); n > 0 {
+		buf := cache[n-1]
+		f.sendBufCache[q] = cache[:n-1]
+		*buf = (*buf)[:0]
+		return buf
+	}
 	if v := f.sendPool.Get(); v != nil {
 		buf := v.(*[]byte)
 		*buf = (*buf)[:0]
@@ -214,15 +231,20 @@ func (f *Interface) getSendBuffer() *[]byte {
 	return &b
 }
 
-func (f *Interface) releaseSendBuffer(buf *[]byte) {
+func (f *Interface) releaseSendBuffer(q int, buf *[]byte) {
 	if buf == nil {
 		return
 	}
 	*buf = (*buf)[:0]
+	cache := f.sendBufCache[q]
+	if len(cache) < f.maxSendBufPerRoutine {
+		f.sendBufCache[q] = append(cache, buf)
+		return
+	}
 	f.sendPool.Put(buf)
 }
 
-func (f *Interface) flushSendQueue(q int, pending *[]outboundSend) {
+func (f *Interface) flushSendQueue(q int, pending *[]outboundSend, pendingBytes *int) {
 	if len(*pending) == 0 {
 		return
 	}
@@ -241,9 +263,12 @@ func (f *Interface) flushSendQueue(q int, pending *[]outboundSend) {
 	}
 
 	for _, entry := range *pending {
-		f.releaseSendBuffer(entry.buf)
+		f.releaseSendBuffer(q, entry.buf)
 	}
 	*pending = (*pending)[:0]
+	if pendingBytes != nil {
+		*pendingBytes = 0
+	}
 }
 
 type EncWriter interface {
@@ -327,6 +352,15 @@ func NewInterface(ctx context.Context, c *InterfaceConfig) (*Interface, error) {
 	if bc.MaxOutstandingPerChan <= 0 {
 		bc.MaxOutstandingPerChan = maxOutstandingBatchesDefault
 	}
+	if bc.MaxPendingPackets <= 0 {
+		bc.MaxPendingPackets = maxPendingPacketsDefault
+	}
+	if bc.MaxPendingBytes <= 0 {
+		bc.MaxPendingBytes = maxPendingBytesDefault
+	}
+	if bc.MaxSendBuffersPerChan <= 0 {
+		bc.MaxSendBuffersPerChan = maxSendBufPerRoutineDefault
+	}
 	ifce := &Interface{
 		pki:                   c.pki,
 		hostMap:               c.HostMap,
@@ -368,6 +402,9 @@ func NewInterface(ctx context.Context, c *InterfaceConfig) (*Interface, error) {
 		outboundBatchSize:     bc.OutboundBatchSize,
 		batchFlushInterval:    bc.FlushInterval,
 		maxOutstandingPerChan: bc.MaxOutstandingPerChan,
+		maxPendingPackets:     bc.MaxPendingPackets,
+		maxPendingBytes:       bc.MaxPendingBytes,
+		maxSendBufPerRoutine:  bc.MaxSendBuffersPerChan,
 		sendBatchSize:         bc.OutboundBatchSize,
 	}
 
@@ -397,6 +434,7 @@ func NewInterface(ctx context.Context, c *InterfaceConfig) (*Interface, error) {
 		buf := make([]byte, mtu)
 		return &buf
 	}}
+	ifce.sendBufCache = make([][]*[]byte, c.routines)
 
 	ifce.tryPromoteEvery.Store(c.tryPromoteEvery)
 	ifce.reQueryEvery.Store(c.reQueryEvery)
@@ -598,37 +636,50 @@ func (f *Interface) workerOut(i int, ctx context.Context) {
 	fwPacket1 := &firewall.Packet{}
 	nb1 := make([]byte, 12, 12)
 	pending := make([]outboundSend, 0, f.sendBatchSize)
+	pendingBytes := 0
+	maxPendingPackets := f.maxPendingPackets
+	if maxPendingPackets <= 0 {
+		maxPendingPackets = f.sendBatchSize
+	}
+	maxPendingBytes := f.maxPendingBytes
+	if maxPendingBytes <= 0 {
+		maxPendingBytes = f.sendBatchSize * mtu
+	}
 
 	for {
 		select {
 		case batch := <-f.outbound[i]:
 			for _, data := range batch.payloads {
-				sendBuf := f.getSendBuffer()
+				sendBuf := f.getSendBuffer(i)
 				buf := (*sendBuf)[:0]
 				queue := func(addr netip.AddrPort, length int) {
+					if len(pending) >= maxPendingPackets || pendingBytes+length > maxPendingBytes {
+						f.flushSendQueue(i, &pending, &pendingBytes)
+					}
 					pending = append(pending, outboundSend{
 						buf:    sendBuf,
 						length: length,
 						addr:   addr,
 					})
-					if len(pending) >= f.sendBatchSize {
-						f.flushSendQueue(i, &pending)
+					pendingBytes += length
+					if len(pending) >= f.sendBatchSize || pendingBytes >= maxPendingBytes {
+						f.flushSendQueue(i, &pending, &pendingBytes)
 					}
 				}
 				sent := f.consumeInsidePacket(*data, fwPacket1, nb1, buf, queue, i, conntrackCache.Get(f.l))
 				if !sent {
-					f.releaseSendBuffer(sendBuf)
+					f.releaseSendBuffer(i, sendBuf)
 				}
 				*data = (*data)[:mtu]
 				f.outPool.Put(data)
 			}
 			f.releaseOutboundBatch(batch)
 			if len(pending) > 0 {
-				f.flushSendQueue(i, &pending)
+				f.flushSendQueue(i, &pending, &pendingBytes)
 			}
 		case <-ctx.Done():
 			if len(pending) > 0 {
-				f.flushSendQueue(i, &pending)
+				f.flushSendQueue(i, &pending, &pendingBytes)
 			}
 			f.wg.Done()
 			return

+ 4 - 1
main.go

@@ -164,7 +164,7 @@ func Main(c *config.C, configTest bool, buildVersion string, logger *logrus.Logg
 
 		for i := 0; i < routines; i++ {
 			l.Infof("listening on %v", netip.AddrPortFrom(listenHost, uint16(port)))
-			udpServer, err := udp.NewListener(l, listenHost, port, routines > 1, c.GetInt("listen.batch", 64))
+			udpServer, err := udp.NewListener(l, listenHost, port, routines > 1, c.GetInt("listen.batch", 128))
 			if err != nil {
 				return nil, util.NewContextualError("Failed to open udp listener", m{"queue": i}, err)
 			}
@@ -226,6 +226,9 @@ func Main(c *config.C, configTest bool, buildVersion string, logger *logrus.Logg
 		OutboundBatchSize:     c.GetInt("batch.outbound_size", outboundBatchSizeDefault),
 		FlushInterval:         c.GetDuration("batch.flush_interval", batchFlushIntervalDefault),
 		MaxOutstandingPerChan: c.GetInt("batch.max_outstanding", maxOutstandingBatchesDefault),
+		MaxPendingPackets:     c.GetInt("batch.max_pending_packets", 0),
+		MaxPendingBytes:       c.GetInt("batch.max_pending_bytes", 0),
+		MaxSendBuffersPerChan: c.GetInt("batch.max_send_buffers_per_routine", 0),
 	}
 
 	ifConfig := &InterfaceConfig{

+ 4 - 4
udp/udp_linux.go

@@ -23,8 +23,8 @@ import (
 var readTimeout = unix.NsecToTimeval(int64(time.Millisecond * 500))
 
 const (
-	defaultGSOMaxSegments    = 8
-	defaultGSOFlushTimeout   = 150 * time.Microsecond
+	defaultGSOMaxSegments    = 128
+	defaultGSOFlushTimeout   = 80 * time.Microsecond
 	defaultGROReadBufferSize = MTU * defaultGSOMaxSegments
 	maxGSOBatchBytes         = 0xFFFF
 )
@@ -565,7 +565,7 @@ func (u *StdConn) configureGRO(c *config.C) {
 		return
 	}
 
-	enable := c.GetBool("listen.enable_gro", false)
+	enable := c.GetBool("listen.enable_gro", true)
 	if enable == u.enableGRO {
 		if enable {
 			if size := c.GetInt("listen.gro_read_buffer", 0); size > 0 {
@@ -594,7 +594,7 @@ func (u *StdConn) configureGRO(c *config.C) {
 }
 
 func (u *StdConn) configureGSO(c *config.C) {
-	enable := c.GetBool("listen.enable_gso", false)
+	enable := c.GetBool("listen.enable_gso", true)
 	if !enable {
 		u.disableGSO()
 	} else {