| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- package nebula
- import (
- "net/netip"
- "github.com/slackhq/nebula/overlay"
- "github.com/slackhq/nebula/udp"
- )
- // batchPipelines tracks whether the inside device can operate on packet batches
- // and, if so, holds the shared packet pool sized for the virtio headroom and
- // payload limits advertised by the device. It also owns the fan-in/fan-out
- // queues between the TUN readers, encrypt/decrypt workers, and the UDP writers.
- type batchPipelines struct {
- enabled bool
- inside overlay.BatchCapableDevice
- headroom int
- payloadCap int
- pool *overlay.PacketPool
- batchSize int
- routines int
- rxQueues []chan *overlay.Packet
- txQueues []chan queuedDatagram
- tunQueues []chan *overlay.Packet
- }
- type queuedDatagram struct {
- packet *overlay.Packet
- addr netip.AddrPort
- }
- func (bp *batchPipelines) init(device overlay.Device, routines int, queueDepth int, maxSegments int) {
- if device == nil || routines <= 0 {
- return
- }
- bcap, ok := device.(overlay.BatchCapableDevice)
- if !ok {
- return
- }
- headroom := bcap.BatchHeadroom()
- payload := bcap.BatchPayloadCap()
- if maxSegments < 1 {
- maxSegments = 1
- }
- requiredPayload := udp.MTU * maxSegments
- if payload < requiredPayload {
- payload = requiredPayload
- }
- batchSize := bcap.BatchSize()
- if headroom <= 0 || payload <= 0 || batchSize <= 0 {
- return
- }
- bp.enabled = true
- bp.inside = bcap
- bp.headroom = headroom
- bp.payloadCap = payload
- bp.batchSize = batchSize
- bp.routines = routines
- bp.pool = overlay.NewPacketPool(headroom, payload)
- queueCap := batchSize * defaultBatchQueueDepthFactor
- if queueDepth > 0 {
- queueCap = queueDepth
- }
- if queueCap < batchSize {
- queueCap = batchSize
- }
- bp.rxQueues = make([]chan *overlay.Packet, routines)
- bp.txQueues = make([]chan queuedDatagram, routines)
- bp.tunQueues = make([]chan *overlay.Packet, routines)
- for i := 0; i < routines; i++ {
- bp.rxQueues[i] = make(chan *overlay.Packet, queueCap)
- bp.txQueues[i] = make(chan queuedDatagram, queueCap)
- bp.tunQueues[i] = make(chan *overlay.Packet, queueCap)
- }
- }
- func (bp *batchPipelines) Pool() *overlay.PacketPool {
- if bp == nil || !bp.enabled {
- return nil
- }
- return bp.pool
- }
- func (bp *batchPipelines) Enabled() bool {
- return bp != nil && bp.enabled
- }
- func (bp *batchPipelines) batchSizeHint() int {
- if bp == nil || bp.batchSize <= 0 {
- return 1
- }
- return bp.batchSize
- }
- func (bp *batchPipelines) rxQueue(i int) chan *overlay.Packet {
- if bp == nil || !bp.enabled || i < 0 || i >= len(bp.rxQueues) {
- return nil
- }
- return bp.rxQueues[i]
- }
- func (bp *batchPipelines) txQueue(i int) chan queuedDatagram {
- if bp == nil || !bp.enabled || i < 0 || i >= len(bp.txQueues) {
- return nil
- }
- return bp.txQueues[i]
- }
- func (bp *batchPipelines) tunQueue(i int) chan *overlay.Packet {
- if bp == nil || !bp.enabled || i < 0 || i >= len(bp.tunQueues) {
- return nil
- }
- return bp.tunQueues[i]
- }
- func (bp *batchPipelines) txQueueLen(i int) int {
- q := bp.txQueue(i)
- if q == nil {
- return 0
- }
- return len(q)
- }
- func (bp *batchPipelines) tunQueueLen(i int) int {
- q := bp.tunQueue(i)
- if q == nil {
- return 0
- }
- return len(q)
- }
- func (bp *batchPipelines) enqueueRx(i int, pkt *overlay.Packet) bool {
- q := bp.rxQueue(i)
- if q == nil {
- return false
- }
- q <- pkt
- return true
- }
- func (bp *batchPipelines) enqueueTx(i int, pkt *overlay.Packet, addr netip.AddrPort) bool {
- q := bp.txQueue(i)
- if q == nil {
- return false
- }
- q <- queuedDatagram{packet: pkt, addr: addr}
- return true
- }
- func (bp *batchPipelines) enqueueTun(i int, pkt *overlay.Packet) bool {
- q := bp.tunQueue(i)
- if q == nil {
- return false
- }
- q <- pkt
- return true
- }
- func (bp *batchPipelines) newPacket() *overlay.Packet {
- if bp == nil || !bp.enabled || bp.pool == nil {
- return nil
- }
- return bp.pool.Get()
- }
|