udp_linux.go 21 KB


  1. //go:build !android && !e2e_testing
  2. // +build !android,!e2e_testing
  3. package udp
  4. import (
  5. "encoding/binary"
  6. "errors"
  7. "fmt"
  8. "net"
  9. "net/netip"
  10. "sync"
  11. "syscall"
  12. "time"
  13. "unsafe"
  14. "github.com/rcrowley/go-metrics"
  15. "github.com/sirupsen/logrus"
  16. "github.com/slackhq/nebula/config"
  17. "golang.org/x/sys/unix"
  18. )
  19. var readTimeout = unix.NsecToTimeval(int64(time.Millisecond * 500))
  20. const (
  21. defaultGSOMaxSegments = 128
  22. defaultGSOFlushTimeout = 80 * time.Microsecond
  23. defaultGROReadBufferSize = MTU * defaultGSOMaxSegments
  24. maxGSOBatchBytes = 0xFFFF
  25. )
  26. var (
  27. errGSOFallback = errors.New("udp gso fallback")
  28. errGSODisabled = errors.New("udp gso disabled")
  29. )
  30. type StdConn struct {
  31. sysFd int
  32. isV4 bool
  33. l *logrus.Logger
  34. batch int
  35. enableGRO bool
  36. enableGSO bool
  37. gsoMu sync.Mutex
  38. gsoBuf []byte
  39. gsoAddr netip.AddrPort
  40. gsoSegSize int
  41. gsoSegments int
  42. gsoMaxSegments int
  43. gsoMaxBytes int
  44. gsoFlushTimeout time.Duration
  45. gsoTimer *time.Timer
  46. groBufSize int
  47. }
  48. func NewListener(l *logrus.Logger, ip netip.Addr, port int, multi bool, batch int) (Conn, error) {
  49. af := unix.AF_INET6
  50. if ip.Is4() {
  51. af = unix.AF_INET
  52. }
  53. syscall.ForkLock.RLock()
  54. fd, err := unix.Socket(af, unix.SOCK_DGRAM, unix.IPPROTO_UDP)
  55. if err == nil {
  56. unix.CloseOnExec(fd)
  57. }
  58. syscall.ForkLock.RUnlock()
  59. if err != nil {
  60. unix.Close(fd)
  61. return nil, fmt.Errorf("unable to open socket: %s", err)
  62. }
  63. if multi {
  64. if err = unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_REUSEPORT, 1); err != nil {
  65. return nil, fmt.Errorf("unable to set SO_REUSEPORT: %s", err)
  66. }
  67. }
  68. // Set a read timeout
  69. if err = unix.SetsockoptTimeval(fd, unix.SOL_SOCKET, unix.SO_RCVTIMEO, &readTimeout); err != nil {
  70. return nil, fmt.Errorf("unable to set SO_RCVTIMEO: %s", err)
  71. }
  72. var sa unix.Sockaddr
  73. if ip.Is4() {
  74. sa4 := &unix.SockaddrInet4{Port: port}
  75. sa4.Addr = ip.As4()
  76. sa = sa4
  77. } else {
  78. sa6 := &unix.SockaddrInet6{Port: port}
  79. sa6.Addr = ip.As16()
  80. sa = sa6
  81. }
  82. if err = unix.Bind(fd, sa); err != nil {
  83. return nil, fmt.Errorf("unable to bind to socket: %s", err)
  84. }
  85. return &StdConn{
  86. sysFd: fd,
  87. isV4: ip.Is4(),
  88. l: l,
  89. batch: batch,
  90. gsoMaxSegments: defaultGSOMaxSegments,
  91. gsoMaxBytes: MTU * defaultGSOMaxSegments,
  92. gsoFlushTimeout: defaultGSOFlushTimeout,
  93. groBufSize: MTU,
  94. }, err
  95. }
  96. func (u *StdConn) Rebind() error {
  97. return nil
  98. }
  99. func (u *StdConn) SetRecvBuffer(n int) error {
  100. return unix.SetsockoptInt(u.sysFd, unix.SOL_SOCKET, unix.SO_RCVBUFFORCE, n)
  101. }
  102. func (u *StdConn) SetSendBuffer(n int) error {
  103. return unix.SetsockoptInt(u.sysFd, unix.SOL_SOCKET, unix.SO_SNDBUFFORCE, n)
  104. }
  105. func (u *StdConn) SetSoMark(mark int) error {
  106. return unix.SetsockoptInt(u.sysFd, unix.SOL_SOCKET, unix.SO_MARK, mark)
  107. }
  108. func (u *StdConn) GetRecvBuffer() (int, error) {
  109. return unix.GetsockoptInt(int(u.sysFd), unix.SOL_SOCKET, unix.SO_RCVBUF)
  110. }
  111. func (u *StdConn) GetSendBuffer() (int, error) {
  112. return unix.GetsockoptInt(int(u.sysFd), unix.SOL_SOCKET, unix.SO_SNDBUF)
  113. }
  114. func (u *StdConn) GetSoMark() (int, error) {
  115. return unix.GetsockoptInt(int(u.sysFd), unix.SOL_SOCKET, unix.SO_MARK)
  116. }
  117. func (u *StdConn) LocalAddr() (netip.AddrPort, error) {
  118. sa, err := unix.Getsockname(u.sysFd)
  119. if err != nil {
  120. return netip.AddrPort{}, err
  121. }
  122. switch sa := sa.(type) {
  123. case *unix.SockaddrInet4:
  124. return netip.AddrPortFrom(netip.AddrFrom4(sa.Addr), uint16(sa.Port)), nil
  125. case *unix.SockaddrInet6:
  126. return netip.AddrPortFrom(netip.AddrFrom16(sa.Addr), uint16(sa.Port)), nil
  127. default:
  128. return netip.AddrPort{}, fmt.Errorf("unsupported sock type: %T", sa)
  129. }
  130. }
  131. func (u *StdConn) ListenOut(r EncReader) error {
  132. var (
  133. ip netip.Addr
  134. controls [][]byte
  135. )
  136. bufSize := u.readBufferSize()
  137. msgs, buffers, names := u.PrepareRawMessages(u.batch, bufSize)
  138. read := u.ReadMulti
  139. if u.batch == 1 {
  140. read = u.ReadSingle
  141. }
  142. for {
  143. desired := u.readBufferSize()
  144. if len(buffers) == 0 || cap(buffers[0]) < desired {
  145. msgs, buffers, names = u.PrepareRawMessages(u.batch, desired)
  146. controls = nil
  147. }
  148. if u.enableGRO {
  149. if controls == nil {
  150. controls = make([][]byte, len(msgs))
  151. for i := range controls {
  152. controls[i] = make([]byte, unix.CmsgSpace(4))
  153. }
  154. }
  155. for i := range msgs {
  156. setRawMessageControl(&msgs[i], controls[i])
  157. }
  158. } else if controls != nil {
  159. for i := range msgs {
  160. setRawMessageControl(&msgs[i], nil)
  161. }
  162. controls = nil
  163. }
  164. n, err := read(msgs)
  165. if err != nil {
  166. return err
  167. }
  168. for i := 0; i < n; i++ {
  169. // Its ok to skip the ok check here, the slicing is the only error that can occur and it will panic
  170. if u.isV4 {
  171. ip, _ = netip.AddrFromSlice(names[i][4:8])
  172. } else {
  173. ip, _ = netip.AddrFromSlice(names[i][8:24])
  174. }
  175. addr := netip.AddrPortFrom(ip.Unmap(), binary.BigEndian.Uint16(names[i][2:4]))
  176. payload := buffers[i][:msgs[i].Len]
  177. if u.enableGRO && u.l.IsLevelEnabled(logrus.DebugLevel) {
  178. ctrlLen := getRawMessageControlLen(&msgs[i])
  179. msgFlags := getRawMessageFlags(&msgs[i])
  180. u.l.WithFields(logrus.Fields{
  181. "tag": "gro-debug",
  182. "stage": "recv",
  183. "payload_len": len(payload),
  184. "ctrl_len": ctrlLen,
  185. "msg_flags": msgFlags,
  186. }).Debug("gro batch data")
  187. if controls != nil && ctrlLen > 0 {
  188. maxDump := ctrlLen
  189. if maxDump > 16 {
  190. maxDump = 16
  191. }
  192. u.l.WithFields(logrus.Fields{
  193. "tag": "gro-debug",
  194. "stage": "control-bytes",
  195. "control_hex": fmt.Sprintf("%x", controls[i][:maxDump]),
  196. "datalen": ctrlLen,
  197. }).Debug("gro control dump")
  198. }
  199. }
  200. sawControl := false
  201. if controls != nil {
  202. if ctrlLen := getRawMessageControlLen(&msgs[i]); ctrlLen > 0 {
  203. if segSize, segCount := parseGROControl(controls[i][:ctrlLen]); segSize > 0 {
  204. sawControl = true
  205. if u.l.IsLevelEnabled(logrus.DebugLevel) {
  206. u.l.WithFields(logrus.Fields{
  207. "tag": "gro-debug",
  208. "stage": "control",
  209. "seg_size": segSize,
  210. "seg_count": segCount,
  211. "payloadLen": len(payload),
  212. }).Debug("gro control parsed")
  213. }
  214. segSize = normalizeGROSegSize(segSize, segCount, len(payload))
  215. if segSize > 0 && segSize < len(payload) {
  216. if u.emitGROSegments(r, addr, payload, segSize) {
  217. continue
  218. }
  219. }
  220. }
  221. }
  222. }
  223. if u.enableGRO && len(payload) > MTU {
  224. if !sawControl && u.l.IsLevelEnabled(logrus.DebugLevel) {
  225. u.l.WithFields(logrus.Fields{
  226. "tag": "gro-debug",
  227. "stage": "fallback",
  228. "payload_len": len(payload),
  229. }).Debug("gro control missing; splitting payload by MTU")
  230. }
  231. if u.emitGROSegments(r, addr, payload, MTU) {
  232. continue
  233. }
  234. }
  235. r(addr, payload)
  236. }
  237. }
  238. }
  239. func (u *StdConn) readBufferSize() int {
  240. if u.enableGRO && u.groBufSize > MTU {
  241. return u.groBufSize
  242. }
  243. return MTU
  244. }
  245. func (u *StdConn) ReadSingle(msgs []rawMessage) (int, error) {
  246. for {
  247. n, _, err := unix.Syscall6(
  248. unix.SYS_RECVMSG,
  249. uintptr(u.sysFd),
  250. uintptr(unsafe.Pointer(&(msgs[0].Hdr))),
  251. 0,
  252. 0,
  253. 0,
  254. 0,
  255. )
  256. if err != 0 {
  257. if err == unix.EAGAIN || err == unix.EINTR {
  258. continue
  259. }
  260. return 0, &net.OpError{Op: "recvmsg", Err: err}
  261. }
  262. msgs[0].Len = uint32(n)
  263. return 1, nil
  264. }
  265. }
  266. func (u *StdConn) ReadMulti(msgs []rawMessage) (int, error) {
  267. for {
  268. n, _, err := unix.Syscall6(
  269. unix.SYS_RECVMMSG,
  270. uintptr(u.sysFd),
  271. uintptr(unsafe.Pointer(&msgs[0])),
  272. uintptr(len(msgs)),
  273. unix.MSG_WAITFORONE,
  274. 0,
  275. 0,
  276. )
  277. if err != 0 {
  278. if err == unix.EAGAIN || err == unix.EINTR {
  279. continue
  280. }
  281. return 0, &net.OpError{Op: "recvmmsg", Err: err}
  282. }
  283. return int(n), nil
  284. }
  285. }
  286. func (u *StdConn) WriteTo(b []byte, ip netip.AddrPort) error {
  287. if u.enableGSO && ip.IsValid() {
  288. if err := u.queueGSOPacket(b, ip); err == nil {
  289. return nil
  290. } else if !errors.Is(err, errGSOFallback) {
  291. return err
  292. }
  293. }
  294. if u.isV4 {
  295. return u.writeTo4(b, ip)
  296. }
  297. return u.writeTo6(b, ip)
  298. }
  299. func (u *StdConn) WriteBatch(pkts []BatchPacket) (int, error) {
  300. if len(pkts) == 0 {
  301. return 0, nil
  302. }
  303. msgs := make([]rawMessage, 0, len(pkts))
  304. iovs := make([]iovec, 0, len(pkts))
  305. names := make([][unix.SizeofSockaddrInet6]byte, 0, len(pkts))
  306. sent := 0
  307. for _, pkt := range pkts {
  308. if len(pkt.Payload) == 0 {
  309. sent++
  310. continue
  311. }
  312. if u.enableGSO && pkt.Addr.IsValid() {
  313. if err := u.queueGSOPacket(pkt.Payload, pkt.Addr); err == nil {
  314. sent++
  315. continue
  316. } else if !errors.Is(err, errGSOFallback) {
  317. return sent, err
  318. }
  319. }
  320. if !pkt.Addr.IsValid() {
  321. if err := u.WriteTo(pkt.Payload, pkt.Addr); err != nil {
  322. return sent, err
  323. }
  324. sent++
  325. continue
  326. }
  327. msgs = append(msgs, rawMessage{})
  328. iovs = append(iovs, iovec{})
  329. names = append(names, [unix.SizeofSockaddrInet6]byte{})
  330. idx := len(msgs) - 1
  331. msg := &msgs[idx]
  332. iov := &iovs[idx]
  333. name := &names[idx]
  334. setIovecSlice(iov, pkt.Payload)
  335. msg.Hdr.Iov = iov
  336. msg.Hdr.Iovlen = 1
  337. setRawMessageControl(msg, nil)
  338. msg.Hdr.Flags = 0
  339. nameLen, err := u.encodeSockaddr(name[:], pkt.Addr)
  340. if err != nil {
  341. return sent, err
  342. }
  343. msg.Hdr.Name = &name[0]
  344. msg.Hdr.Namelen = nameLen
  345. }
  346. if len(msgs) == 0 {
  347. return sent, nil
  348. }
  349. offset := 0
  350. for offset < len(msgs) {
  351. n, _, errno := unix.Syscall6(
  352. unix.SYS_SENDMMSG,
  353. uintptr(u.sysFd),
  354. uintptr(unsafe.Pointer(&msgs[offset])),
  355. uintptr(len(msgs)-offset),
  356. 0,
  357. 0,
  358. 0,
  359. )
  360. if errno != 0 {
  361. if errno == unix.EINTR {
  362. continue
  363. }
  364. return sent + offset, &net.OpError{Op: "sendmmsg", Err: errno}
  365. }
  366. if n == 0 {
  367. break
  368. }
  369. offset += int(n)
  370. }
  371. return sent + len(msgs), nil
  372. }
  373. func (u *StdConn) encodeSockaddr(dst []byte, addr netip.AddrPort) (uint32, error) {
  374. if u.isV4 {
  375. if !addr.Addr().Is4() {
  376. return 0, fmt.Errorf("Listener is IPv4, but writing to IPv6 remote")
  377. }
  378. var sa unix.RawSockaddrInet4
  379. sa.Family = unix.AF_INET
  380. sa.Addr = addr.Addr().As4()
  381. binary.BigEndian.PutUint16((*[2]byte)(unsafe.Pointer(&sa.Port))[:], addr.Port())
  382. size := unix.SizeofSockaddrInet4
  383. copy(dst[:size], (*(*[unix.SizeofSockaddrInet4]byte)(unsafe.Pointer(&sa)))[:])
  384. return uint32(size), nil
  385. }
  386. var sa unix.RawSockaddrInet6
  387. sa.Family = unix.AF_INET6
  388. sa.Addr = addr.Addr().As16()
  389. binary.BigEndian.PutUint16((*[2]byte)(unsafe.Pointer(&sa.Port))[:], addr.Port())
  390. size := unix.SizeofSockaddrInet6
  391. copy(dst[:size], (*(*[unix.SizeofSockaddrInet6]byte)(unsafe.Pointer(&sa)))[:])
  392. return uint32(size), nil
  393. }
  394. func (u *StdConn) writeTo6(b []byte, ip netip.AddrPort) error {
  395. var rsa unix.RawSockaddrInet6
  396. rsa.Family = unix.AF_INET6
  397. rsa.Addr = ip.Addr().As16()
  398. binary.BigEndian.PutUint16((*[2]byte)(unsafe.Pointer(&rsa.Port))[:], ip.Port())
  399. for {
  400. _, _, err := unix.Syscall6(
  401. unix.SYS_SENDTO,
  402. uintptr(u.sysFd),
  403. uintptr(unsafe.Pointer(&b[0])),
  404. uintptr(len(b)),
  405. uintptr(0),
  406. uintptr(unsafe.Pointer(&rsa)),
  407. uintptr(unix.SizeofSockaddrInet6),
  408. )
  409. if err != 0 {
  410. return &net.OpError{Op: "sendto", Err: err}
  411. }
  412. return nil
  413. }
  414. }
  415. func (u *StdConn) writeTo4(b []byte, ip netip.AddrPort) error {
  416. if !ip.Addr().Is4() {
  417. return fmt.Errorf("Listener is IPv4, but writing to IPv6 remote")
  418. }
  419. var rsa unix.RawSockaddrInet4
  420. rsa.Family = unix.AF_INET
  421. rsa.Addr = ip.Addr().As4()
  422. binary.BigEndian.PutUint16((*[2]byte)(unsafe.Pointer(&rsa.Port))[:], ip.Port())
  423. for {
  424. _, _, err := unix.Syscall6(
  425. unix.SYS_SENDTO,
  426. uintptr(u.sysFd),
  427. uintptr(unsafe.Pointer(&b[0])),
  428. uintptr(len(b)),
  429. uintptr(0),
  430. uintptr(unsafe.Pointer(&rsa)),
  431. uintptr(unix.SizeofSockaddrInet4),
  432. )
  433. if err != 0 {
  434. return &net.OpError{Op: "sendto", Err: err}
  435. }
  436. return nil
  437. }
  438. }
  439. func (u *StdConn) ReloadConfig(c *config.C) {
  440. b := c.GetInt("listen.read_buffer", 0)
  441. if b > 0 {
  442. err := u.SetRecvBuffer(b)
  443. if err == nil {
  444. s, err := u.GetRecvBuffer()
  445. if err == nil {
  446. u.l.WithField("size", s).Info("listen.read_buffer was set")
  447. } else {
  448. u.l.WithError(err).Warn("Failed to get listen.read_buffer")
  449. }
  450. } else {
  451. u.l.WithError(err).Error("Failed to set listen.read_buffer")
  452. }
  453. }
  454. b = c.GetInt("listen.write_buffer", 0)
  455. if b > 0 {
  456. err := u.SetSendBuffer(b)
  457. if err == nil {
  458. s, err := u.GetSendBuffer()
  459. if err == nil {
  460. u.l.WithField("size", s).Info("listen.write_buffer was set")
  461. } else {
  462. u.l.WithError(err).Warn("Failed to get listen.write_buffer")
  463. }
  464. } else {
  465. u.l.WithError(err).Error("Failed to set listen.write_buffer")
  466. }
  467. }
  468. b = c.GetInt("listen.so_mark", 0)
  469. s, err := u.GetSoMark()
  470. if b > 0 || (err == nil && s != 0) {
  471. err := u.SetSoMark(b)
  472. if err == nil {
  473. s, err := u.GetSoMark()
  474. if err == nil {
  475. u.l.WithField("mark", s).Info("listen.so_mark was set")
  476. } else {
  477. u.l.WithError(err).Warn("Failed to get listen.so_mark")
  478. }
  479. } else {
  480. u.l.WithError(err).Error("Failed to set listen.so_mark")
  481. }
  482. }
  483. u.configureGRO(c)
  484. u.configureGSO(c)
  485. }
  486. func (u *StdConn) configureGRO(c *config.C) {
  487. if c == nil {
  488. return
  489. }
  490. enable := c.GetBool("listen.enable_gro", true)
  491. if enable == u.enableGRO {
  492. if enable {
  493. if size := c.GetInt("listen.gro_read_buffer", 0); size > 0 {
  494. u.setGROBufferSize(size)
  495. }
  496. }
  497. return
  498. }
  499. if enable {
  500. if err := unix.SetsockoptInt(u.sysFd, unix.SOL_UDP, unix.UDP_GRO, 1); err != nil {
  501. u.l.WithError(err).Warn("Failed to enable UDP GRO")
  502. return
  503. }
  504. u.enableGRO = true
  505. u.setGROBufferSize(c.GetInt("listen.gro_read_buffer", defaultGROReadBufferSize))
  506. u.l.WithField("buffer_size", u.groBufSize).Info("UDP GRO enabled")
  507. return
  508. }
  509. if err := unix.SetsockoptInt(u.sysFd, unix.SOL_UDP, unix.UDP_GRO, 0); err != nil && err != unix.ENOPROTOOPT {
  510. u.l.WithError(err).Warn("Failed to disable UDP GRO")
  511. }
  512. u.enableGRO = false
  513. u.groBufSize = MTU
  514. }
  515. func (u *StdConn) configureGSO(c *config.C) {
  516. enable := c.GetBool("listen.enable_gso", true)
  517. if !enable {
  518. u.disableGSO()
  519. } else {
  520. u.enableGSO = true
  521. }
  522. segments := c.GetInt("listen.gso_max_segments", defaultGSOMaxSegments)
  523. if segments < 1 {
  524. segments = 1
  525. }
  526. u.gsoMaxSegments = segments
  527. maxBytes := c.GetInt("listen.gso_max_bytes", 0)
  528. if maxBytes <= 0 {
  529. maxBytes = MTU * segments
  530. }
  531. if maxBytes > maxGSOBatchBytes {
  532. u.l.WithField("requested", maxBytes).Warn("listen.gso_max_bytes larger than UDP limit; clamping")
  533. maxBytes = maxGSOBatchBytes
  534. }
  535. u.gsoMaxBytes = maxBytes
  536. timeout := c.GetDuration("listen.gso_flush_timeout", defaultGSOFlushTimeout)
  537. if timeout < 0 {
  538. timeout = 0
  539. }
  540. u.gsoFlushTimeout = timeout
  541. }
  542. func (u *StdConn) setGROBufferSize(size int) {
  543. if size < MTU {
  544. size = defaultGROReadBufferSize
  545. }
  546. if size > maxGSOBatchBytes {
  547. size = maxGSOBatchBytes
  548. }
  549. u.groBufSize = size
  550. }
  551. func (u *StdConn) disableGSO() {
  552. u.gsoMu.Lock()
  553. defer u.gsoMu.Unlock()
  554. u.enableGSO = false
  555. _ = u.flushGSOlocked()
  556. u.gsoBuf = nil
  557. u.gsoSegments = 0
  558. u.gsoSegSize = 0
  559. u.stopGSOTimerLocked()
  560. }
  561. func (u *StdConn) getMemInfo(meminfo *[unix.SK_MEMINFO_VARS]uint32) error {
  562. var vallen uint32 = 4 * unix.SK_MEMINFO_VARS
  563. _, _, err := unix.Syscall6(unix.SYS_GETSOCKOPT, uintptr(u.sysFd), uintptr(unix.SOL_SOCKET), uintptr(unix.SO_MEMINFO), uintptr(unsafe.Pointer(meminfo)), uintptr(unsafe.Pointer(&vallen)), 0)
  564. if err != 0 {
  565. return err
  566. }
  567. return nil
  568. }
  569. func (u *StdConn) queueGSOPacket(b []byte, addr netip.AddrPort) error {
  570. if len(b) == 0 {
  571. return nil
  572. }
  573. u.gsoMu.Lock()
  574. defer u.gsoMu.Unlock()
  575. if !u.enableGSO || !addr.IsValid() || len(b) > u.gsoMaxBytes {
  576. if err := u.flushGSOlocked(); err != nil {
  577. return err
  578. }
  579. return errGSOFallback
  580. }
  581. if u.gsoSegments == 0 {
  582. if cap(u.gsoBuf) < u.gsoMaxBytes {
  583. u.gsoBuf = make([]byte, 0, u.gsoMaxBytes)
  584. }
  585. u.gsoAddr = addr
  586. u.gsoSegSize = len(b)
  587. } else if addr != u.gsoAddr || len(b) != u.gsoSegSize {
  588. if err := u.flushGSOlocked(); err != nil {
  589. return err
  590. }
  591. if cap(u.gsoBuf) < u.gsoMaxBytes {
  592. u.gsoBuf = make([]byte, 0, u.gsoMaxBytes)
  593. }
  594. u.gsoAddr = addr
  595. u.gsoSegSize = len(b)
  596. }
  597. if len(u.gsoBuf)+len(b) > u.gsoMaxBytes {
  598. if err := u.flushGSOlocked(); err != nil {
  599. return err
  600. }
  601. if cap(u.gsoBuf) < u.gsoMaxBytes {
  602. u.gsoBuf = make([]byte, 0, u.gsoMaxBytes)
  603. }
  604. u.gsoAddr = addr
  605. u.gsoSegSize = len(b)
  606. }
  607. u.gsoBuf = append(u.gsoBuf, b...)
  608. u.gsoSegments++
  609. if u.gsoSegments >= u.gsoMaxSegments || u.gsoFlushTimeout <= 0 {
  610. return u.flushGSOlocked()
  611. }
  612. u.scheduleGSOFlushLocked()
  613. return nil
  614. }
  615. func (u *StdConn) flushGSOlocked() error {
  616. if u.gsoSegments == 0 {
  617. u.stopGSOTimerLocked()
  618. return nil
  619. }
  620. payload := append([]byte(nil), u.gsoBuf...)
  621. addr := u.gsoAddr
  622. segSize := u.gsoSegSize
  623. u.gsoBuf = u.gsoBuf[:0]
  624. u.gsoSegments = 0
  625. u.gsoSegSize = 0
  626. u.stopGSOTimerLocked()
  627. if segSize <= 0 {
  628. return errGSOFallback
  629. }
  630. err := u.sendSegmented(payload, addr, segSize)
  631. if errors.Is(err, errGSODisabled) {
  632. u.l.WithField("addr", addr).Warn("UDP GSO disabled by kernel, falling back to sendto")
  633. u.enableGSO = false
  634. return u.sendSegmentsIndividually(payload, addr, segSize)
  635. }
  636. return err
  637. }
  638. func (u *StdConn) sendSegmented(payload []byte, addr netip.AddrPort, segSize int) error {
  639. if len(payload) == 0 {
  640. return nil
  641. }
  642. control := make([]byte, unix.CmsgSpace(2))
  643. hdr := (*unix.Cmsghdr)(unsafe.Pointer(&control[0]))
  644. hdr.Level = unix.SOL_UDP
  645. hdr.Type = unix.UDP_SEGMENT
  646. setCmsgLen(hdr, unix.CmsgLen(2))
  647. binary.NativeEndian.PutUint16(control[unix.CmsgLen(0):unix.CmsgLen(0)+2], uint16(segSize))
  648. var sa unix.Sockaddr
  649. if addr.Addr().Is4() {
  650. var sa4 unix.SockaddrInet4
  651. sa4.Port = int(addr.Port())
  652. sa4.Addr = addr.Addr().As4()
  653. sa = &sa4
  654. } else {
  655. var sa6 unix.SockaddrInet6
  656. sa6.Port = int(addr.Port())
  657. sa6.Addr = addr.Addr().As16()
  658. sa = &sa6
  659. }
  660. if _, err := unix.SendmsgN(u.sysFd, payload, control, sa, 0); err != nil {
  661. if errno, ok := err.(syscall.Errno); ok && (errno == unix.EINVAL || errno == unix.ENOTSUP || errno == unix.EOPNOTSUPP) {
  662. return errGSODisabled
  663. }
  664. return &net.OpError{Op: "sendmsg", Err: err}
  665. }
  666. return nil
  667. }
  668. func (u *StdConn) sendSegmentsIndividually(buf []byte, addr netip.AddrPort, segSize int) error {
  669. if segSize <= 0 {
  670. return errGSOFallback
  671. }
  672. for offset := 0; offset < len(buf); offset += segSize {
  673. end := offset + segSize
  674. if end > len(buf) {
  675. end = len(buf)
  676. }
  677. var err error
  678. if u.isV4 {
  679. err = u.writeTo4(buf[offset:end], addr)
  680. } else {
  681. err = u.writeTo6(buf[offset:end], addr)
  682. }
  683. if err != nil {
  684. return err
  685. }
  686. }
  687. return nil
  688. }
  689. func (u *StdConn) scheduleGSOFlushLocked() {
  690. if u.gsoTimer == nil {
  691. u.gsoTimer = time.AfterFunc(u.gsoFlushTimeout, u.gsoFlushTimer)
  692. return
  693. }
  694. u.gsoTimer.Reset(u.gsoFlushTimeout)
  695. }
  696. func (u *StdConn) stopGSOTimerLocked() {
  697. if u.gsoTimer != nil {
  698. u.gsoTimer.Stop()
  699. u.gsoTimer = nil
  700. }
  701. }
  702. func (u *StdConn) gsoFlushTimer() {
  703. u.gsoMu.Lock()
  704. defer u.gsoMu.Unlock()
  705. _ = u.flushGSOlocked()
  706. }
  707. func parseGROControl(control []byte) (int, int) {
  708. if len(control) == 0 {
  709. return 0, 0
  710. }
  711. cmsgs, err := unix.ParseSocketControlMessage(control)
  712. if err != nil {
  713. return 0, 0
  714. }
  715. for _, c := range cmsgs {
  716. if c.Header.Level == unix.SOL_UDP && c.Header.Type == unix.UDP_GRO && len(c.Data) >= 2 {
  717. segSize := int(binary.NativeEndian.Uint16(c.Data[:2]))
  718. segCount := 0
  719. if len(c.Data) >= 4 {
  720. segCount = int(binary.NativeEndian.Uint16(c.Data[2:4]))
  721. }
  722. return segSize, segCount
  723. }
  724. }
  725. return 0, 0
  726. }
  727. func (u *StdConn) emitGROSegments(r EncReader, addr netip.AddrPort, payload []byte, segSize int) bool {
  728. if segSize <= 0 {
  729. return false
  730. }
  731. for offset := 0; offset < len(payload); offset += segSize {
  732. end := offset + segSize
  733. if end > len(payload) {
  734. end = len(payload)
  735. }
  736. segment := make([]byte, end-offset)
  737. copy(segment, payload[offset:end])
  738. r(addr, segment)
  739. }
  740. return true
  741. }
  742. func normalizeGROSegSize(segSize, segCount, total int) int {
  743. if segSize <= 0 || total <= 0 {
  744. return segSize
  745. }
  746. if segSize > total && segCount > 0 {
  747. segSize = total / segCount
  748. if segSize == 0 {
  749. segSize = total
  750. }
  751. }
  752. if segCount <= 1 && segSize > 0 && total > segSize {
  753. calculated := total / segSize
  754. if calculated <= 1 {
  755. calculated = (total + segSize - 1) / segSize
  756. }
  757. if calculated > 1 {
  758. segCount = calculated
  759. }
  760. }
  761. if segSize > MTU {
  762. return MTU
  763. }
  764. return segSize
  765. }
  766. func (u *StdConn) Close() error {
  767. u.disableGSO()
  768. return syscall.Close(u.sysFd)
  769. }
  770. func NewUDPStatsEmitter(udpConns []Conn) func() {
  771. // Check if our kernel supports SO_MEMINFO before registering the gauges
  772. var udpGauges [][unix.SK_MEMINFO_VARS]metrics.Gauge
  773. var meminfo [unix.SK_MEMINFO_VARS]uint32
  774. if err := udpConns[0].(*StdConn).getMemInfo(&meminfo); err == nil {
  775. udpGauges = make([][unix.SK_MEMINFO_VARS]metrics.Gauge, len(udpConns))
  776. for i := range udpConns {
  777. udpGauges[i] = [unix.SK_MEMINFO_VARS]metrics.Gauge{
  778. metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.rmem_alloc", i), nil),
  779. metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.rcvbuf", i), nil),
  780. metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.wmem_alloc", i), nil),
  781. metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.sndbuf", i), nil),
  782. metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.fwd_alloc", i), nil),
  783. metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.wmem_queued", i), nil),
  784. metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.optmem", i), nil),
  785. metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.backlog", i), nil),
  786. metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.drops", i), nil),
  787. }
  788. }
  789. }
  790. return func() {
  791. for i, gauges := range udpGauges {
  792. if err := udpConns[i].(*StdConn).getMemInfo(&meminfo); err == nil {
  793. for j := 0; j < unix.SK_MEMINFO_VARS; j++ {
  794. gauges[j].Update(int64(meminfo[j]))
  795. }
  796. }
  797. }
  798. }
  799. }