batch_pipeline.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. package nebula
  2. import (
  3. "net/netip"
  4. "github.com/slackhq/nebula/overlay"
  5. "github.com/slackhq/nebula/udp"
  6. )
  7. // batchPipelines tracks whether the inside device can operate on packet batches
  8. // and, if so, holds the shared packet pool sized for the virtio headroom and
  9. // payload limits advertised by the device. It also owns the fan-in/fan-out
  10. // queues between the TUN readers, encrypt/decrypt workers, and the UDP writers.
  11. type batchPipelines struct {
  12. enabled bool
  13. inside overlay.BatchCapableDevice
  14. headroom int
  15. payloadCap int
  16. pool *overlay.PacketPool
  17. batchSize int
  18. routines int
  19. rxQueues []chan *overlay.Packet
  20. txQueues []chan queuedDatagram
  21. tunQueues []chan *overlay.Packet
  22. }
  23. type queuedDatagram struct {
  24. packet *overlay.Packet
  25. addr netip.AddrPort
  26. }
  27. func (bp *batchPipelines) init(device overlay.Device, routines int, queueDepth int, maxSegments int) {
  28. if device == nil || routines <= 0 {
  29. return
  30. }
  31. bcap, ok := device.(overlay.BatchCapableDevice)
  32. if !ok {
  33. return
  34. }
  35. headroom := bcap.BatchHeadroom()
  36. payload := bcap.BatchPayloadCap()
  37. if maxSegments < 1 {
  38. maxSegments = 1
  39. }
  40. requiredPayload := udp.MTU * maxSegments
  41. if payload < requiredPayload {
  42. payload = requiredPayload
  43. }
  44. batchSize := bcap.BatchSize()
  45. if headroom <= 0 || payload <= 0 || batchSize <= 0 {
  46. return
  47. }
  48. bp.enabled = true
  49. bp.inside = bcap
  50. bp.headroom = headroom
  51. bp.payloadCap = payload
  52. bp.batchSize = batchSize
  53. bp.routines = routines
  54. bp.pool = overlay.NewPacketPool(headroom, payload)
  55. queueCap := batchSize * defaultBatchQueueDepthFactor
  56. if queueDepth > 0 {
  57. queueCap = queueDepth
  58. }
  59. if queueCap < batchSize {
  60. queueCap = batchSize
  61. }
  62. bp.rxQueues = make([]chan *overlay.Packet, routines)
  63. bp.txQueues = make([]chan queuedDatagram, routines)
  64. bp.tunQueues = make([]chan *overlay.Packet, routines)
  65. for i := 0; i < routines; i++ {
  66. bp.rxQueues[i] = make(chan *overlay.Packet, queueCap)
  67. bp.txQueues[i] = make(chan queuedDatagram, queueCap)
  68. bp.tunQueues[i] = make(chan *overlay.Packet, queueCap)
  69. }
  70. }
  71. func (bp *batchPipelines) Pool() *overlay.PacketPool {
  72. if bp == nil || !bp.enabled {
  73. return nil
  74. }
  75. return bp.pool
  76. }
  77. func (bp *batchPipelines) Enabled() bool {
  78. return bp != nil && bp.enabled
  79. }
  80. func (bp *batchPipelines) batchSizeHint() int {
  81. if bp == nil || bp.batchSize <= 0 {
  82. return 1
  83. }
  84. return bp.batchSize
  85. }
  86. func (bp *batchPipelines) rxQueue(i int) chan *overlay.Packet {
  87. if bp == nil || !bp.enabled || i < 0 || i >= len(bp.rxQueues) {
  88. return nil
  89. }
  90. return bp.rxQueues[i]
  91. }
  92. func (bp *batchPipelines) txQueue(i int) chan queuedDatagram {
  93. if bp == nil || !bp.enabled || i < 0 || i >= len(bp.txQueues) {
  94. return nil
  95. }
  96. return bp.txQueues[i]
  97. }
  98. func (bp *batchPipelines) tunQueue(i int) chan *overlay.Packet {
  99. if bp == nil || !bp.enabled || i < 0 || i >= len(bp.tunQueues) {
  100. return nil
  101. }
  102. return bp.tunQueues[i]
  103. }
  104. func (bp *batchPipelines) txQueueLen(i int) int {
  105. q := bp.txQueue(i)
  106. if q == nil {
  107. return 0
  108. }
  109. return len(q)
  110. }
  111. func (bp *batchPipelines) tunQueueLen(i int) int {
  112. q := bp.tunQueue(i)
  113. if q == nil {
  114. return 0
  115. }
  116. return len(q)
  117. }
  118. func (bp *batchPipelines) enqueueRx(i int, pkt *overlay.Packet) bool {
  119. q := bp.rxQueue(i)
  120. if q == nil {
  121. return false
  122. }
  123. q <- pkt
  124. return true
  125. }
  126. func (bp *batchPipelines) enqueueTx(i int, pkt *overlay.Packet, addr netip.AddrPort) bool {
  127. q := bp.txQueue(i)
  128. if q == nil {
  129. return false
  130. }
  131. q <- queuedDatagram{packet: pkt, addr: addr}
  132. return true
  133. }
  134. func (bp *batchPipelines) enqueueTun(i int, pkt *overlay.Packet) bool {
  135. q := bp.tunQueue(i)
  136. if q == nil {
  137. return false
  138. }
  139. q <- pkt
  140. return true
  141. }
  142. func (bp *batchPipelines) newPacket() *overlay.Packet {
  143. if bp == nil || !bp.enabled || bp.pool == nil {
  144. return nil
  145. }
  146. return bp.pool.Get()
  147. }