udp_darwin.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. //go:build !e2e_testing
  2. // +build !e2e_testing
  3. package udp
  4. // Darwin support is primarily implemented in udp_generic, besides NewListenConfig
  5. import (
  6. "context"
  7. "encoding/binary"
  8. "errors"
  9. "fmt"
  10. "net"
  11. "net/netip"
  12. "syscall"
  13. "unsafe"
  14. "github.com/sirupsen/logrus"
  15. "github.com/slackhq/nebula/config"
  16. "golang.org/x/sys/unix"
  17. )
  18. type StdConn struct {
  19. *net.UDPConn
  20. isV4 bool
  21. sysFd uintptr
  22. l *logrus.Logger
  23. }
  24. var _ Conn = &StdConn{}
  25. func NewListener(l *logrus.Logger, ip netip.Addr, port int, multi bool, batch int) (Conn, error) {
  26. lc := NewListenConfig(multi)
  27. pc, err := lc.ListenPacket(context.TODO(), "udp", net.JoinHostPort(ip.String(), fmt.Sprintf("%v", port)))
  28. if err != nil {
  29. return nil, err
  30. }
  31. if uc, ok := pc.(*net.UDPConn); ok {
  32. c := &StdConn{UDPConn: uc, l: l}
  33. rc, err := uc.SyscallConn()
  34. if err != nil {
  35. return nil, fmt.Errorf("failed to open udp socket: %w", err)
  36. }
  37. err = rc.Control(func(fd uintptr) {
  38. c.sysFd = fd
  39. })
  40. if err != nil {
  41. return nil, fmt.Errorf("failed to get udp fd: %w", err)
  42. }
  43. la, err := c.LocalAddr()
  44. if err != nil {
  45. return nil, err
  46. }
  47. c.isV4 = la.Addr().Is4()
  48. return c, nil
  49. }
  50. return nil, fmt.Errorf("unexpected PacketConn: %T %#v", pc, pc)
  51. }
  52. func NewListenConfig(multi bool) net.ListenConfig {
  53. return net.ListenConfig{
  54. Control: func(network, address string, c syscall.RawConn) error {
  55. if multi {
  56. var controlErr error
  57. err := c.Control(func(fd uintptr) {
  58. if err := syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, unix.SO_REUSEPORT, 1); err != nil {
  59. controlErr = fmt.Errorf("SO_REUSEPORT failed: %v", err)
  60. return
  61. }
  62. })
  63. if err != nil {
  64. return err
  65. }
  66. if controlErr != nil {
  67. return controlErr
  68. }
  69. }
  70. return nil
  71. },
  72. }
  73. }
  74. //go:linkname sendto golang.org/x/sys/unix.sendto
  75. //go:noescape
  76. func sendto(s int, buf []byte, flags int, to unsafe.Pointer, addrlen int32) (err error)
  77. func (u *StdConn) WriteTo(b []byte, ap netip.AddrPort) error {
  78. var sa unsafe.Pointer
  79. var addrLen int32
  80. if u.isV4 {
  81. if ap.Addr().Is6() {
  82. return ErrInvalidIPv6RemoteForSocket
  83. }
  84. var rsa unix.RawSockaddrInet6
  85. rsa.Family = unix.AF_INET6
  86. rsa.Addr = ap.Addr().As16()
  87. binary.BigEndian.PutUint16((*[2]byte)(unsafe.Pointer(&rsa.Port))[:], ap.Port())
  88. sa = unsafe.Pointer(&rsa)
  89. addrLen = syscall.SizeofSockaddrInet4
  90. } else {
  91. var rsa unix.RawSockaddrInet6
  92. rsa.Family = unix.AF_INET6
  93. rsa.Addr = ap.Addr().As16()
  94. binary.BigEndian.PutUint16((*[2]byte)(unsafe.Pointer(&rsa.Port))[:], ap.Port())
  95. sa = unsafe.Pointer(&rsa)
  96. addrLen = syscall.SizeofSockaddrInet6
  97. }
  98. // Golang stdlib doesn't handle EAGAIN correctly in some situations so we do writes ourselves
  99. // See https://github.com/golang/go/issues/73919
  100. for {
  101. //_, _, err := unix.Syscall6(unix.SYS_SENDTO, u.sysFd, uintptr(unsafe.Pointer(&b[0])), uintptr(len(b)), 0, sa, addrLen)
  102. err := sendto(int(u.sysFd), b, 0, sa, addrLen)
  103. if err == nil {
  104. // Written, get out before the error handling
  105. return nil
  106. }
  107. if errors.Is(err, syscall.EINTR) {
  108. // Write was interrupted, retry
  109. continue
  110. }
  111. if errors.Is(err, syscall.EAGAIN) {
  112. return &net.OpError{Op: "sendto", Err: unix.EWOULDBLOCK}
  113. }
  114. if errors.Is(err, syscall.EBADF) {
  115. return net.ErrClosed
  116. }
  117. return &net.OpError{Op: "sendto", Err: err}
  118. }
  119. }
  120. func (u *StdConn) LocalAddr() (netip.AddrPort, error) {
  121. a := u.UDPConn.LocalAddr()
  122. switch v := a.(type) {
  123. case *net.UDPAddr:
  124. addr, ok := netip.AddrFromSlice(v.IP)
  125. if !ok {
  126. return netip.AddrPort{}, fmt.Errorf("LocalAddr returned invalid IP address: %s", v.IP)
  127. }
  128. return netip.AddrPortFrom(addr, uint16(v.Port)), nil
  129. default:
  130. return netip.AddrPort{}, fmt.Errorf("LocalAddr returned: %#v", a)
  131. }
  132. }
  133. func (u *StdConn) ReloadConfig(c *config.C) {
  134. // TODO
  135. }
  136. func NewUDPStatsEmitter(udpConns []Conn) func() {
  137. // No UDP stats for non-linux
  138. return func() {}
  139. }
  140. func (u *StdConn) ListenOut(r EncReader) {
  141. buffer := make([]byte, MTU)
  142. for {
  143. // Just read one packet at a time
  144. n, rua, err := u.ReadFromUDPAddrPort(buffer)
  145. if err != nil {
  146. if errors.Is(err, net.ErrClosed) {
  147. u.l.WithError(err).Debug("udp socket is closed, exiting read loop")
  148. return
  149. }
  150. u.l.WithError(err).Error("unexpected udp socket receive error")
  151. }
  152. r(netip.AddrPortFrom(rua.Addr().Unmap(), rua.Port()), buffer[:n])
  153. }
  154. }
  155. func (u *StdConn) Rebind() error {
  156. var err error
  157. if u.isV4 {
  158. err = syscall.SetsockoptInt(int(u.sysFd), syscall.IPPROTO_IP, syscall.IP_BOUND_IF, 0)
  159. } else {
  160. err = syscall.SetsockoptInt(int(u.sysFd), syscall.IPPROTO_IPV6, syscall.IPV6_BOUND_IF, 0)
  161. }
  162. if err != nil {
  163. u.l.WithError(err).Error("Failed to rebind udp socket")
  164. }
  165. return nil
  166. }