udp_linux.go

//go:build !android && !e2e_testing
// +build !android,!e2e_testing

package udp

import (
	"encoding/binary"
	"errors"
	"fmt"
	"net"
	"net/netip"
	"sync"
	"syscall"
	"time"
	"unsafe"

	"github.com/rcrowley/go-metrics"
	"github.com/sirupsen/logrus"
	"github.com/slackhq/nebula/config"
	"golang.org/x/sys/unix"
)

var readTimeout = unix.NsecToTimeval(int64(time.Millisecond * 500))

const (
	defaultGSOMaxSegments    = 8
	defaultGSOFlushTimeout   = 150 * time.Microsecond
	defaultGROReadBufferSize = MTU * defaultGSOMaxSegments
	maxGSOBatchBytes         = 0xFFFF
)
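
// The constants above are tuning defaults for this file: a send batch is
// flushed once it holds defaultGSOMaxSegments packets or after
// defaultGSOFlushTimeout, and neither a GSO batch nor the GRO read buffer is
// allowed to exceed maxGSOBatchBytes (65535, matching the 16-bit length field
// of a UDP datagram).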

var (
	errGSOFallback = errors.New("udp gso fallback")
	errGSODisabled = errors.New("udp gso disabled")
)
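
// StdConn is the Linux UDP listener. Alongside the raw socket state it tracks
// the optional GRO receive settings and the pending GSO send batch, which is
// guarded by gsoMu and flushed either when it fills up or when the flush timer
// fires.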
type StdConn struct {
	sysFd int
	isV4  bool
	l     *logrus.Logger
	batch int

	enableGRO bool
	enableGSO bool

	gsoMu           sync.Mutex
	gsoBuf          []byte
	gsoAddr         netip.AddrPort
	gsoSegSize      int
	gsoSegments     int
	gsoMaxSegments  int
	gsoMaxBytes     int
	gsoFlushTimeout time.Duration
	gsoTimer        *time.Timer

	groBufSize int
}
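
// NewListener opens an AF_INET or AF_INET6 UDP socket, applies the 500ms read
// timeout (and SO_REUSEPORT when multi is set), binds it to ip:port and
// returns it wrapped in a StdConn. A minimal usage sketch, with an
// illustrative logger, address, port and batch size rather than values taken
// from this file:
//
//	l := logrus.New()
//	conn, err := NewListener(l, netip.MustParseAddr("0.0.0.0"), 4242, false, 64)
//	if err != nil {
//		l.Fatal(err)
//	}
//	defer conn.Close()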
func NewListener(l *logrus.Logger, ip netip.Addr, port int, multi bool, batch int) (Conn, error) {
	af := unix.AF_INET6
	if ip.Is4() {
		af = unix.AF_INET
	}
	syscall.ForkLock.RLock()
	fd, err := unix.Socket(af, unix.SOCK_DGRAM, unix.IPPROTO_UDP)
	if err == nil {
		unix.CloseOnExec(fd)
	}
	syscall.ForkLock.RUnlock()

	if err != nil {
		unix.Close(fd)
		return nil, fmt.Errorf("unable to open socket: %s", err)
	}

	if multi {
		if err = unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_REUSEPORT, 1); err != nil {
			return nil, fmt.Errorf("unable to set SO_REUSEPORT: %s", err)
		}
	}

	// Set a read timeout
	if err = unix.SetsockoptTimeval(fd, unix.SOL_SOCKET, unix.SO_RCVTIMEO, &readTimeout); err != nil {
		return nil, fmt.Errorf("unable to set SO_RCVTIMEO: %s", err)
	}

	var sa unix.Sockaddr
	if ip.Is4() {
		sa4 := &unix.SockaddrInet4{Port: port}
		sa4.Addr = ip.As4()
		sa = sa4
	} else {
		sa6 := &unix.SockaddrInet6{Port: port}
		sa6.Addr = ip.As16()
		sa = sa6
	}
	if err = unix.Bind(fd, sa); err != nil {
		return nil, fmt.Errorf("unable to bind to socket: %s", err)
	}

	return &StdConn{
		sysFd:           fd,
		isV4:            ip.Is4(),
		l:               l,
		batch:           batch,
		gsoMaxSegments:  defaultGSOMaxSegments,
		gsoMaxBytes:     MTU * defaultGSOMaxSegments,
		gsoFlushTimeout: defaultGSOFlushTimeout,
		groBufSize:      MTU,
	}, err
}

func (u *StdConn) Rebind() error {
	return nil
}

func (u *StdConn) SetRecvBuffer(n int) error {
	return unix.SetsockoptInt(u.sysFd, unix.SOL_SOCKET, unix.SO_RCVBUFFORCE, n)
}

func (u *StdConn) SetSendBuffer(n int) error {
	return unix.SetsockoptInt(u.sysFd, unix.SOL_SOCKET, unix.SO_SNDBUFFORCE, n)
}

func (u *StdConn) SetSoMark(mark int) error {
	return unix.SetsockoptInt(u.sysFd, unix.SOL_SOCKET, unix.SO_MARK, mark)
}

func (u *StdConn) GetRecvBuffer() (int, error) {
	return unix.GetsockoptInt(int(u.sysFd), unix.SOL_SOCKET, unix.SO_RCVBUF)
}

func (u *StdConn) GetSendBuffer() (int, error) {
	return unix.GetsockoptInt(int(u.sysFd), unix.SOL_SOCKET, unix.SO_SNDBUF)
}

func (u *StdConn) GetSoMark() (int, error) {
	return unix.GetsockoptInt(int(u.sysFd), unix.SOL_SOCKET, unix.SO_MARK)
}

func (u *StdConn) LocalAddr() (netip.AddrPort, error) {
	sa, err := unix.Getsockname(u.sysFd)
	if err != nil {
		return netip.AddrPort{}, err
	}

	switch sa := sa.(type) {
	case *unix.SockaddrInet4:
		return netip.AddrPortFrom(netip.AddrFrom4(sa.Addr), uint16(sa.Port)), nil
	case *unix.SockaddrInet6:
		return netip.AddrPortFrom(netip.AddrFrom16(sa.Addr), uint16(sa.Port)), nil
	default:
		return netip.AddrPort{}, fmt.Errorf("unsupported sock type: %T", sa)
	}
}
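
// ListenOut runs the receive loop, handing each received payload to r. When
// GRO is enabled it attaches a control buffer sized for the UDP_GRO cmsg so a
// single coalesced read can be split back into wire-sized segments before they
// reach r; if no usable cmsg arrives, payloads larger than MTU are split on
// MTU boundaries as a fallback.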
func (u *StdConn) ListenOut(r EncReader) error {
	var (
		ip       netip.Addr
		controls [][]byte
	)

	bufSize := u.readBufferSize()
	msgs, buffers, names := u.PrepareRawMessages(u.batch, bufSize)
	read := u.ReadMulti
	if u.batch == 1 {
		read = u.ReadSingle
	}

	for {
		desired := u.readBufferSize()
		if len(buffers) == 0 || cap(buffers[0]) < desired {
			msgs, buffers, names = u.PrepareRawMessages(u.batch, desired)
			controls = nil
		}

		if u.enableGRO {
			if controls == nil {
				controls = make([][]byte, len(msgs))
				for i := range controls {
					controls[i] = make([]byte, unix.CmsgSpace(4))
				}
			}
			for i := range msgs {
				setRawMessageControl(&msgs[i], controls[i])
			}
		} else if controls != nil {
			for i := range msgs {
				setRawMessageControl(&msgs[i], nil)
			}
			controls = nil
		}

		n, err := read(msgs)
		if err != nil {
			return err
		}

		for i := 0; i < n; i++ {
			// It's ok to skip the ok check here; the slicing is the only thing that can fail and it will panic
			if u.isV4 {
				ip, _ = netip.AddrFromSlice(names[i][4:8])
			} else {
				ip, _ = netip.AddrFromSlice(names[i][8:24])
			}
			addr := netip.AddrPortFrom(ip.Unmap(), binary.BigEndian.Uint16(names[i][2:4]))
			payload := buffers[i][:msgs[i].Len]

			if u.enableGRO && u.l.IsLevelEnabled(logrus.DebugLevel) {
				ctrlLen := getRawMessageControlLen(&msgs[i])
				msgFlags := getRawMessageFlags(&msgs[i])
				u.l.WithFields(logrus.Fields{
					"tag":         "gro-debug",
					"stage":       "recv",
					"payload_len": len(payload),
					"ctrl_len":    ctrlLen,
					"msg_flags":   msgFlags,
				}).Debug("gro batch data")
				if controls != nil && ctrlLen > 0 {
					maxDump := ctrlLen
					if maxDump > 16 {
						maxDump = 16
					}
					u.l.WithFields(logrus.Fields{
						"tag":         "gro-debug",
						"stage":       "control-bytes",
						"control_hex": fmt.Sprintf("%x", controls[i][:maxDump]),
						"datalen":     ctrlLen,
					}).Debug("gro control dump")
				}
			}

			sawControl := false
			if controls != nil {
				if ctrlLen := getRawMessageControlLen(&msgs[i]); ctrlLen > 0 {
					if segSize, segCount := parseGROControl(controls[i][:ctrlLen]); segSize > 0 {
						sawControl = true
						if u.l.IsLevelEnabled(logrus.DebugLevel) {
							u.l.WithFields(logrus.Fields{
								"tag":        "gro-debug",
								"stage":      "control",
								"seg_size":   segSize,
								"seg_count":  segCount,
								"payloadLen": len(payload),
							}).Debug("gro control parsed")
						}
						segSize = normalizeGROSegSize(segSize, segCount, len(payload))
						if segSize > 0 && segSize < len(payload) {
							if u.emitGROSegments(r, addr, payload, segSize) {
								continue
							}
						}
					}
				}
			}

			if u.enableGRO && len(payload) > MTU {
				if !sawControl && u.l.IsLevelEnabled(logrus.DebugLevel) {
					u.l.WithFields(logrus.Fields{
						"tag":         "gro-debug",
						"stage":       "fallback",
						"payload_len": len(payload),
					}).Debug("gro control missing; splitting payload by MTU")
				}
				if u.emitGROSegments(r, addr, payload, MTU) {
					continue
				}
			}

			r(addr, payload)
		}
	}
}
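
// A worked example of the splitting above, with illustrative numbers rather
// than values from a capture: a coalesced read of 4,300 bytes whose cmsg
// reports a 1,400-byte segment size is delivered to r as three 1,400-byte
// segments followed by a 100-byte tail, exactly as emitGROSegments slices it.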

func (u *StdConn) readBufferSize() int {
	if u.enableGRO && u.groBufSize > MTU {
		return u.groBufSize
	}
	return MTU
}

func (u *StdConn) ReadSingle(msgs []rawMessage) (int, error) {
	for {
		n, _, err := unix.Syscall6(
			unix.SYS_RECVMSG,
			uintptr(u.sysFd),
			uintptr(unsafe.Pointer(&(msgs[0].Hdr))),
			0,
			0,
			0,
			0,
		)

		if err != 0 {
			if err == unix.EAGAIN || err == unix.EINTR {
				continue
			}
			return 0, &net.OpError{Op: "recvmsg", Err: err}
		}

		msgs[0].Len = uint32(n)
		return 1, nil
	}
}

func (u *StdConn) ReadMulti(msgs []rawMessage) (int, error) {
	for {
		n, _, err := unix.Syscall6(
			unix.SYS_RECVMMSG,
			uintptr(u.sysFd),
			uintptr(unsafe.Pointer(&msgs[0])),
			uintptr(len(msgs)),
			unix.MSG_WAITFORONE,
			0,
			0,
		)

		if err != 0 {
			if err == unix.EAGAIN || err == unix.EINTR {
				continue
			}
			return 0, &net.OpError{Op: "recvmmsg", Err: err}
		}

		return int(n), nil
	}
}
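
// WriteTo sends b to ip. With GSO enabled the packet is first offered to the
// batching queue; errGSOFallback means it was not queued (too large, GSO
// disabled under the lock, or an invalid address) and should be sent
// immediately with a plain sendto, while any other error is returned to the
// caller.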
func (u *StdConn) WriteTo(b []byte, ip netip.AddrPort) error {
	if u.enableGSO && ip.IsValid() {
		if err := u.queueGSOPacket(b, ip); err == nil {
			return nil
		} else if !errors.Is(err, errGSOFallback) {
			return err
		}
	}
	if u.isV4 {
		return u.writeTo4(b, ip)
	}
	return u.writeTo6(b, ip)
}

func (u *StdConn) writeTo6(b []byte, ip netip.AddrPort) error {
	var rsa unix.RawSockaddrInet6
	rsa.Family = unix.AF_INET6
	rsa.Addr = ip.Addr().As16()
	binary.BigEndian.PutUint16((*[2]byte)(unsafe.Pointer(&rsa.Port))[:], ip.Port())

	for {
		_, _, err := unix.Syscall6(
			unix.SYS_SENDTO,
			uintptr(u.sysFd),
			uintptr(unsafe.Pointer(&b[0])),
			uintptr(len(b)),
			uintptr(0),
			uintptr(unsafe.Pointer(&rsa)),
			uintptr(unix.SizeofSockaddrInet6),
		)

		if err != 0 {
			return &net.OpError{Op: "sendto", Err: err}
		}

		return nil
	}
}

func (u *StdConn) writeTo4(b []byte, ip netip.AddrPort) error {
	if !ip.Addr().Is4() {
		return fmt.Errorf("Listener is IPv4, but writing to IPv6 remote")
	}

	var rsa unix.RawSockaddrInet4
	rsa.Family = unix.AF_INET
	rsa.Addr = ip.Addr().As4()
	binary.BigEndian.PutUint16((*[2]byte)(unsafe.Pointer(&rsa.Port))[:], ip.Port())

	for {
		_, _, err := unix.Syscall6(
			unix.SYS_SENDTO,
			uintptr(u.sysFd),
			uintptr(unsafe.Pointer(&b[0])),
			uintptr(len(b)),
			uintptr(0),
			uintptr(unsafe.Pointer(&rsa)),
			uintptr(unix.SizeofSockaddrInet4),
		)

		if err != 0 {
			return &net.OpError{Op: "sendto", Err: err}
		}

		return nil
	}
}
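
// ReloadConfig applies the listen.* settings read below. A sketch of the
// corresponding YAML with illustrative values, assuming the usual nested
// layout of a Nebula config file (only keys referenced in this file are
// shown):
//
//	listen:
//	  read_buffer: 10485760
//	  write_buffer: 10485760
//	  so_mark: 0
//	  enable_gro: true
//	  gro_read_buffer: 65535
//	  enable_gso: true
//	  gso_max_segments: 8
//	  gso_max_bytes: 65535
//	  gso_flush_timeout: 150us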
func (u *StdConn) ReloadConfig(c *config.C) {
	b := c.GetInt("listen.read_buffer", 0)
	if b > 0 {
		err := u.SetRecvBuffer(b)
		if err == nil {
			s, err := u.GetRecvBuffer()
			if err == nil {
				u.l.WithField("size", s).Info("listen.read_buffer was set")
			} else {
				u.l.WithError(err).Warn("Failed to get listen.read_buffer")
			}
		} else {
			u.l.WithError(err).Error("Failed to set listen.read_buffer")
		}
	}

	b = c.GetInt("listen.write_buffer", 0)
	if b > 0 {
		err := u.SetSendBuffer(b)
		if err == nil {
			s, err := u.GetSendBuffer()
			if err == nil {
				u.l.WithField("size", s).Info("listen.write_buffer was set")
			} else {
				u.l.WithError(err).Warn("Failed to get listen.write_buffer")
			}
		} else {
			u.l.WithError(err).Error("Failed to set listen.write_buffer")
		}
	}

	b = c.GetInt("listen.so_mark", 0)
	s, err := u.GetSoMark()
	if b > 0 || (err == nil && s != 0) {
		err := u.SetSoMark(b)
		if err == nil {
			s, err := u.GetSoMark()
			if err == nil {
				u.l.WithField("mark", s).Info("listen.so_mark was set")
			} else {
				u.l.WithError(err).Warn("Failed to get listen.so_mark")
			}
		} else {
			u.l.WithError(err).Error("Failed to set listen.so_mark")
		}
	}

	u.configureGRO(c)
	u.configureGSO(c)
}

func (u *StdConn) configureGRO(c *config.C) {
	if c == nil {
		return
	}

	enable := c.GetBool("listen.enable_gro", false)
	if enable == u.enableGRO {
		if enable {
			if size := c.GetInt("listen.gro_read_buffer", 0); size > 0 {
				u.setGROBufferSize(size)
			}
		}
		return
	}

	if enable {
		if err := unix.SetsockoptInt(u.sysFd, unix.SOL_UDP, unix.UDP_GRO, 1); err != nil {
			u.l.WithError(err).Warn("Failed to enable UDP GRO")
			return
		}
		u.enableGRO = true
		u.setGROBufferSize(c.GetInt("listen.gro_read_buffer", defaultGROReadBufferSize))
		u.l.WithField("buffer_size", u.groBufSize).Info("UDP GRO enabled")
		return
	}

	if err := unix.SetsockoptInt(u.sysFd, unix.SOL_UDP, unix.UDP_GRO, 0); err != nil && err != unix.ENOPROTOOPT {
		u.l.WithError(err).Warn("Failed to disable UDP GRO")
	}
	u.enableGRO = false
	u.groBufSize = MTU
}

func (u *StdConn) configureGSO(c *config.C) {
	enable := c.GetBool("listen.enable_gso", false)
	if !enable {
		u.disableGSO()
	} else {
		u.enableGSO = true
	}

	segments := c.GetInt("listen.gso_max_segments", defaultGSOMaxSegments)
	if segments < 1 {
		segments = 1
	}
	u.gsoMaxSegments = segments

	maxBytes := c.GetInt("listen.gso_max_bytes", 0)
	if maxBytes <= 0 {
		maxBytes = MTU * segments
	}
	if maxBytes > maxGSOBatchBytes {
		u.l.WithField("requested", maxBytes).Warn("listen.gso_max_bytes larger than UDP limit; clamping")
		maxBytes = maxGSOBatchBytes
	}
	u.gsoMaxBytes = maxBytes

	timeout := c.GetDuration("listen.gso_flush_timeout", defaultGSOFlushTimeout)
	if timeout < 0 {
		timeout = 0
	}
	u.gsoFlushTimeout = timeout
}

func (u *StdConn) setGROBufferSize(size int) {
	if size < MTU {
		size = defaultGROReadBufferSize
	}
	if size > maxGSOBatchBytes {
		size = maxGSOBatchBytes
	}
	u.groBufSize = size
}

func (u *StdConn) disableGSO() {
	u.gsoMu.Lock()
	defer u.gsoMu.Unlock()
	u.enableGSO = false
	_ = u.flushGSOlocked()
	u.gsoBuf = nil
	u.gsoSegments = 0
	u.gsoSegSize = 0
	u.stopGSOTimerLocked()
}

func (u *StdConn) getMemInfo(meminfo *[unix.SK_MEMINFO_VARS]uint32) error {
	var vallen uint32 = 4 * unix.SK_MEMINFO_VARS
	_, _, err := unix.Syscall6(unix.SYS_GETSOCKOPT, uintptr(u.sysFd), uintptr(unix.SOL_SOCKET), uintptr(unix.SO_MEMINFO), uintptr(unsafe.Pointer(meminfo)), uintptr(unsafe.Pointer(&vallen)), 0)
	if err != 0 {
		return err
	}
	return nil
}
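
// queueGSOPacket appends b to the pending GSO batch. A batch only ever holds
// packets for a single destination with a single segment size, so a change in
// either forces a flush first. The batch is also flushed when it would exceed
// gsoMaxBytes, when it reaches gsoMaxSegments, or when the flush timer
// expires; errGSOFallback tells WriteTo to send the packet on the plain path.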
func (u *StdConn) queueGSOPacket(b []byte, addr netip.AddrPort) error {
	if len(b) == 0 {
		return nil
	}

	u.gsoMu.Lock()
	defer u.gsoMu.Unlock()

	if !u.enableGSO || !addr.IsValid() || len(b) > u.gsoMaxBytes {
		if err := u.flushGSOlocked(); err != nil {
			return err
		}
		return errGSOFallback
	}

	if u.gsoSegments == 0 {
		if cap(u.gsoBuf) < u.gsoMaxBytes {
			u.gsoBuf = make([]byte, 0, u.gsoMaxBytes)
		}
		u.gsoAddr = addr
		u.gsoSegSize = len(b)
	} else if addr != u.gsoAddr || len(b) != u.gsoSegSize {
		if err := u.flushGSOlocked(); err != nil {
			return err
		}
		if cap(u.gsoBuf) < u.gsoMaxBytes {
			u.gsoBuf = make([]byte, 0, u.gsoMaxBytes)
		}
		u.gsoAddr = addr
		u.gsoSegSize = len(b)
	}

	if len(u.gsoBuf)+len(b) > u.gsoMaxBytes {
		if err := u.flushGSOlocked(); err != nil {
			return err
		}
		if cap(u.gsoBuf) < u.gsoMaxBytes {
			u.gsoBuf = make([]byte, 0, u.gsoMaxBytes)
		}
		u.gsoAddr = addr
		u.gsoSegSize = len(b)
	}

	u.gsoBuf = append(u.gsoBuf, b...)
	u.gsoSegments++

	if u.gsoSegments >= u.gsoMaxSegments || u.gsoFlushTimeout <= 0 {
		return u.flushGSOlocked()
	}

	u.scheduleGSOFlushLocked()
	return nil
}

func (u *StdConn) flushGSOlocked() error {
	if u.gsoSegments == 0 {
		u.stopGSOTimerLocked()
		return nil
	}

	payload := append([]byte(nil), u.gsoBuf...)
	addr := u.gsoAddr
	segSize := u.gsoSegSize

	u.gsoBuf = u.gsoBuf[:0]
	u.gsoSegments = 0
	u.gsoSegSize = 0
	u.stopGSOTimerLocked()

	if segSize <= 0 {
		return errGSOFallback
	}

	err := u.sendSegmented(payload, addr, segSize)
	if errors.Is(err, errGSODisabled) {
		u.l.WithField("addr", addr).Warn("UDP GSO disabled by kernel, falling back to sendto")
		u.enableGSO = false
		return u.sendSegmentsIndividually(payload, addr, segSize)
	}
	return err
}
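
// sendSegmented writes the whole batch with a single sendmsg call, attaching a
// SOL_UDP/UDP_SEGMENT control message so the kernel splits the payload into
// segSize-byte datagrams. The control buffer holds one cmsg whose two data
// bytes carry the segment size in native byte order; EINVAL, ENOTSUP or
// EOPNOTSUPP from the kernel is translated into errGSODisabled so the caller
// can fall back to individual sends.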
func (u *StdConn) sendSegmented(payload []byte, addr netip.AddrPort, segSize int) error {
	if len(payload) == 0 {
		return nil
	}

	control := make([]byte, unix.CmsgSpace(2))
	hdr := (*unix.Cmsghdr)(unsafe.Pointer(&control[0]))
	hdr.Level = unix.SOL_UDP
	hdr.Type = unix.UDP_SEGMENT
	setCmsgLen(hdr, unix.CmsgLen(2))
	binary.NativeEndian.PutUint16(control[unix.CmsgLen(0):unix.CmsgLen(0)+2], uint16(segSize))

	var sa unix.Sockaddr
	if addr.Addr().Is4() {
		var sa4 unix.SockaddrInet4
		sa4.Port = int(addr.Port())
		sa4.Addr = addr.Addr().As4()
		sa = &sa4
	} else {
		var sa6 unix.SockaddrInet6
		sa6.Port = int(addr.Port())
		sa6.Addr = addr.Addr().As16()
		sa = &sa6
	}

	if _, err := unix.SendmsgN(u.sysFd, payload, control, sa, 0); err != nil {
		if errno, ok := err.(syscall.Errno); ok && (errno == unix.EINVAL || errno == unix.ENOTSUP || errno == unix.EOPNOTSUPP) {
			return errGSODisabled
		}
		return &net.OpError{Op: "sendmsg", Err: err}
	}
	return nil
}

func (u *StdConn) sendSegmentsIndividually(buf []byte, addr netip.AddrPort, segSize int) error {
	if segSize <= 0 {
		return errGSOFallback
	}
	for offset := 0; offset < len(buf); offset += segSize {
		end := offset + segSize
		if end > len(buf) {
			end = len(buf)
		}
		var err error
		if u.isV4 {
			err = u.writeTo4(buf[offset:end], addr)
		} else {
			err = u.writeTo6(buf[offset:end], addr)
		}
		if err != nil {
			return err
		}
	}
	return nil
}

func (u *StdConn) scheduleGSOFlushLocked() {
	if u.gsoTimer == nil {
		u.gsoTimer = time.AfterFunc(u.gsoFlushTimeout, u.gsoFlushTimer)
		return
	}
	u.gsoTimer.Reset(u.gsoFlushTimeout)
}

func (u *StdConn) stopGSOTimerLocked() {
	if u.gsoTimer != nil {
		u.gsoTimer.Stop()
		u.gsoTimer = nil
	}
}

func (u *StdConn) gsoFlushTimer() {
	u.gsoMu.Lock()
	defer u.gsoMu.Unlock()
	_ = u.flushGSOlocked()
}
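
// parseGROControl walks the received control messages looking for the
// SOL_UDP/UDP_GRO cmsg. The first two data bytes are read as the coalesced
// segment size and, when present, the next two are treated as a segment count
// hint; (0, 0) means no usable GRO information was attached.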
func parseGROControl(control []byte) (int, int) {
	if len(control) == 0 {
		return 0, 0
	}

	cmsgs, err := unix.ParseSocketControlMessage(control)
	if err != nil {
		return 0, 0
	}

	for _, c := range cmsgs {
		if c.Header.Level == unix.SOL_UDP && c.Header.Type == unix.UDP_GRO && len(c.Data) >= 2 {
			segSize := int(binary.NativeEndian.Uint16(c.Data[:2]))
			segCount := 0
			if len(c.Data) >= 4 {
				segCount = int(binary.NativeEndian.Uint16(c.Data[2:4]))
			}
			return segSize, segCount
		}
	}
	return 0, 0
}

func (u *StdConn) emitGROSegments(r EncReader, addr netip.AddrPort, payload []byte, segSize int) bool {
	if segSize <= 0 {
		return false
	}
	for offset := 0; offset < len(payload); offset += segSize {
		end := offset + segSize
		if end > len(payload) {
			end = len(payload)
		}
		segment := make([]byte, end-offset)
		copy(segment, payload[offset:end])
		r(addr, segment)
	}
	return true
}
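
// normalizeGROSegSize sanity-checks the values reported in the GRO cmsg before
// they are used to slice the payload: a segment size larger than the payload
// is recomputed from the segment count when one is available, and anything
// above MTU is clamped to MTU. For example (illustrative numbers), a reported
// size of 9,000 with a count of 6 over an 8,400-byte payload is recomputed to
// 1,400, which is then clamped to MTU only if MTU is smaller than that.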
func normalizeGROSegSize(segSize, segCount, total int) int {
	if segSize <= 0 || total <= 0 {
		return segSize
	}
	if segSize > total && segCount > 0 {
		segSize = total / segCount
		if segSize == 0 {
			segSize = total
		}
	}
	if segCount <= 1 && segSize > 0 && total > segSize {
		calculated := total / segSize
		if calculated <= 1 {
			calculated = (total + segSize - 1) / segSize
		}
		if calculated > 1 {
			segCount = calculated
		}
	}
	if segSize > MTU {
		return MTU
	}
	return segSize
}

func (u *StdConn) Close() error {
	u.disableGSO()
	return syscall.Close(u.sysFd)
}

func NewUDPStatsEmitter(udpConns []Conn) func() {
	// Check if our kernel supports SO_MEMINFO before registering the gauges
	var udpGauges [][unix.SK_MEMINFO_VARS]metrics.Gauge
	var meminfo [unix.SK_MEMINFO_VARS]uint32
	if err := udpConns[0].(*StdConn).getMemInfo(&meminfo); err == nil {
		udpGauges = make([][unix.SK_MEMINFO_VARS]metrics.Gauge, len(udpConns))
		for i := range udpConns {
			udpGauges[i] = [unix.SK_MEMINFO_VARS]metrics.Gauge{
				metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.rmem_alloc", i), nil),
				metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.rcvbuf", i), nil),
				metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.wmem_alloc", i), nil),
				metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.sndbuf", i), nil),
				metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.fwd_alloc", i), nil),
				metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.wmem_queued", i), nil),
				metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.optmem", i), nil),
				metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.backlog", i), nil),
				metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.drops", i), nil),
			}
		}
	}

	return func() {
		for i, gauges := range udpGauges {
			if err := udpConns[i].(*StdConn).getMemInfo(&meminfo); err == nil {
				for j := 0; j < unix.SK_MEMINFO_VARS; j++ {
					gauges[j].Update(int64(meminfo[j]))
				}
			}
		}
	}
}
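
// A minimal usage sketch for the emitter above. The one-second interval is an
// illustrative choice and udpConns stands in for the caller's slice of
// listeners; neither comes from this file:
//
//	emit := NewUDPStatsEmitter(udpConns)
//	go func() {
//		for range time.Tick(time.Second) {
//			emit()
//		}
//	}()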