Browse Source

config tweaks for batching

Ryan 1 month ago
parent
commit
2c6f81c224
3 changed files with 63 additions and 17 deletions
  1. 7 0
      examples/config.yml
  2. 48 17
      interface.go
  3. 8 0
      main.go

+ 7 - 0
examples/config.yml

@@ -132,6 +132,13 @@ listen:
   # Sets the max number of packets to pull from the kernel for each syscall (under systems that support recvmmsg)
   # default is 64, does not support reload
   #batch: 64
+
+# Control batching between UDP and TUN pipelines
+#batch:
+#  inbound_size: 32        # packets to queue from UDP before handing to workers
+#  outbound_size: 32       # packets to queue from TUN before handing to workers
+#  flush_interval: 50us    # flush partially filled batches after this duration
+#  max_outstanding: 1028   # batches buffered per routine on each channel
   # Configure socket buffers for the udp side (outside), leave unset to use the system defaults. Values will be doubled by the kernel
   # Default is net.core.rmem_default and net.core.wmem_default (/proc/sys/net/core/rmem_default and /proc/sys/net/core/rmem_default)
   # Maximum is limited by memory in the system, SO_RCVBUFFORCE and SO_SNDBUFFORCE is used to avoid having to raise the system wide

+ 48 - 17
interface.go

@@ -25,10 +25,10 @@ import (
 const (
 	mtu = 9001
 
-	inboundBatchSize      = 32
-	outboundBatchSize     = 32
-	batchFlushInterval    = 50 * time.Microsecond
-	maxOutstandingBatches = 1028
+	inboundBatchSizeDefault      = 32
+	outboundBatchSizeDefault     = 32
+	batchFlushIntervalDefault    = 50 * time.Microsecond
+	maxOutstandingBatchesDefault = 1028
 )
 
 type InterfaceConfig struct {
@@ -55,9 +55,17 @@ type InterfaceConfig struct {
 	reQueryWait     time.Duration
 
 	ConntrackCacheTimeout time.Duration
+	BatchConfig           BatchConfig
 	l                     *logrus.Logger
 }
 
+type BatchConfig struct {
+	InboundBatchSize      int
+	OutboundBatchSize     int
+	FlushInterval         time.Duration
+	MaxOutstandingPerChan int
+}
+
 type Interface struct {
 	hostMap               *HostMap
 	outside               udp.Conn
@@ -111,15 +119,20 @@ type Interface struct {
 
 	packetBatchPool   sync.Pool
 	outboundBatchPool sync.Pool
+
+	inboundBatchSize      int
+	outboundBatchSize     int
+	batchFlushInterval    time.Duration
+	maxOutstandingPerChan int
 }
 
 type packetBatch struct {
 	packets []*packet.Packet
 }
 
-func newPacketBatch() *packetBatch {
+func newPacketBatch(capacity int) *packetBatch {
 	return &packetBatch{
-		packets: make([]*packet.Packet, 0, inboundBatchSize),
+		packets: make([]*packet.Packet, 0, capacity),
 	}
 }
 
@@ -140,7 +153,7 @@ func (f *Interface) getPacketBatch() *packetBatch {
 		b.reset()
 		return b
 	}
-	return newPacketBatch()
+	return newPacketBatch(f.inboundBatchSize)
 }
 
 func (f *Interface) releasePacketBatch(b *packetBatch) {
@@ -152,8 +165,8 @@ type outboundBatch struct {
 	payloads []*[]byte
 }
 
-func newOutboundBatch() *outboundBatch {
-	return &outboundBatch{payloads: make([]*[]byte, 0, outboundBatchSize)}
+func newOutboundBatch(capacity int) *outboundBatch {
+	return &outboundBatch{payloads: make([]*[]byte, 0, capacity)}
 }
 
 func (b *outboundBatch) add(buf *[]byte) {
@@ -173,7 +186,7 @@ func (f *Interface) getOutboundBatch() *outboundBatch {
 		b.reset()
 		return b
 	}
-	return newOutboundBatch()
+	return newOutboundBatch(f.outboundBatchSize)
 }
 
 func (f *Interface) releaseOutboundBatch(b *outboundBatch) {
@@ -248,6 +261,20 @@ func NewInterface(ctx context.Context, c *InterfaceConfig) (*Interface, error) {
 	}
 
 	cs := c.pki.getCertState()
+
+	bc := c.BatchConfig
+	if bc.InboundBatchSize <= 0 {
+		bc.InboundBatchSize = inboundBatchSizeDefault
+	}
+	if bc.OutboundBatchSize <= 0 {
+		bc.OutboundBatchSize = outboundBatchSizeDefault
+	}
+	if bc.FlushInterval <= 0 {
+		bc.FlushInterval = batchFlushIntervalDefault
+	}
+	if bc.MaxOutstandingPerChan <= 0 {
+		bc.MaxOutstandingPerChan = maxOutstandingBatchesDefault
+	}
 	ifce := &Interface{
 		pki:                   c.pki,
 		hostMap:               c.HostMap,
@@ -280,16 +307,20 @@ func NewInterface(ctx context.Context, c *InterfaceConfig) (*Interface, error) {
 			dropped: metrics.GetOrRegisterCounter("hostinfo.cached_packets.dropped", nil),
 		},
 
-		//TODO: configurable size
 		inbound:  make([]chan *packetBatch, c.routines),
 		outbound: make([]chan *outboundBatch, c.routines),
 
 		l: c.l,
+
+		inboundBatchSize:      bc.InboundBatchSize,
+		outboundBatchSize:     bc.OutboundBatchSize,
+		batchFlushInterval:    bc.FlushInterval,
+		maxOutstandingPerChan: bc.MaxOutstandingPerChan,
 	}
 
 	for i := 0; i < c.routines; i++ {
-		ifce.inbound[i] = make(chan *packetBatch, maxOutstandingBatches)
-		ifce.outbound[i] = make(chan *outboundBatch, maxOutstandingBatches)
+		ifce.inbound[i] = make(chan *packetBatch, ifce.maxOutstandingPerChan)
+		ifce.outbound[i] = make(chan *outboundBatch, ifce.maxOutstandingPerChan)
 	}
 
 	ifce.inPool = sync.Pool{New: func() any {
@@ -302,11 +333,11 @@ func NewInterface(ctx context.Context, c *InterfaceConfig) (*Interface, error) {
 	}}
 
 	ifce.packetBatchPool = sync.Pool{New: func() any {
-		return newPacketBatch()
+		return newPacketBatch(ifce.inboundBatchSize)
 	}}
 
 	ifce.outboundBatchPool = sync.Pool{New: func() any {
-		return newOutboundBatch()
+		return newOutboundBatch(ifce.outboundBatchSize)
 	}}
 
 	ifce.tryPromoteEvery.Store(c.tryPromoteEvery)
@@ -411,7 +442,7 @@ func (f *Interface) listenOut(i int) {
 		p.Addr = fromUdpAddr
 		batch.add(p)
 
-		if len(batch.packets) >= inboundBatchSize || time.Since(lastFlush) >= batchFlushInterval {
+		if len(batch.packets) >= f.inboundBatchSize || time.Since(lastFlush) >= f.batchFlushInterval {
 			flush(false)
 		}
 	})
@@ -465,7 +496,7 @@ func (f *Interface) listenIn(reader io.ReadWriteCloser, i int) {
 		*p = (*p)[:n]
 		batch.add(p)
 
-		if len(batch.payloads) >= outboundBatchSize || time.Since(lastFlush) >= batchFlushInterval {
+		if len(batch.payloads) >= f.outboundBatchSize || time.Since(lastFlush) >= f.batchFlushInterval {
 			flush(false)
 		}
 	}

+ 8 - 0
main.go

@@ -221,6 +221,13 @@ func Main(c *config.C, configTest bool, buildVersion string, logger *logrus.Logg
 		}
 	}
 
+	batchCfg := BatchConfig{
+		InboundBatchSize:      c.GetInt("batch.inbound_size", inboundBatchSizeDefault),
+		OutboundBatchSize:     c.GetInt("batch.outbound_size", outboundBatchSizeDefault),
+		FlushInterval:         c.GetDuration("batch.flush_interval", batchFlushIntervalDefault),
+		MaxOutstandingPerChan: c.GetInt("batch.max_outstanding", maxOutstandingBatchesDefault),
+	}
+
 	ifConfig := &InterfaceConfig{
 		HostMap:               hostMap,
 		Inside:                tun,
@@ -242,6 +249,7 @@ func Main(c *config.C, configTest bool, buildVersion string, logger *logrus.Logg
 		relayManager:          NewRelayManager(ctx, l, hostMap, c),
 		punchy:                punchy,
 		ConntrackCacheTimeout: conntrackCacheTimeout,
+		BatchConfig:           batchCfg,
 		l:                     l,
 	}