udp_linux.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370
  1. // +build !android
  2. package nebula
  3. import (
  4. "encoding/binary"
  5. "encoding/json"
  6. "fmt"
  7. "net"
  8. "strconv"
  9. "strings"
  10. "syscall"
  11. "unsafe"
  12. "github.com/rcrowley/go-metrics"
  13. "golang.org/x/sys/unix"
  14. )
  15. //TODO: make it support reload as best you can!
  16. type udpConn struct {
  17. sysFd int
  18. }
  19. type udpAddr struct {
  20. IP uint32
  21. Port uint16
  22. }
  23. func NewUDPAddr(ip uint32, port uint16) *udpAddr {
  24. return &udpAddr{IP: ip, Port: port}
  25. }
  26. func NewUDPAddrFromString(s string) *udpAddr {
  27. p := strings.Split(s, ":")
  28. if len(p) < 2 {
  29. return nil
  30. }
  31. port, _ := strconv.Atoi(p[1])
  32. return &udpAddr{
  33. IP: ip2int(net.ParseIP(p[0])),
  34. Port: uint16(port),
  35. }
  36. }
  37. type rawSockaddr struct {
  38. Family uint16
  39. Data [14]uint8
  40. }
  41. type rawSockaddrAny struct {
  42. Addr rawSockaddr
  43. Pad [96]int8
  44. }
  45. var x int
  46. // From linux/sock_diag.h
  47. const (
  48. _SK_MEMINFO_RMEM_ALLOC = iota
  49. _SK_MEMINFO_RCVBUF
  50. _SK_MEMINFO_WMEM_ALLOC
  51. _SK_MEMINFO_SNDBUF
  52. _SK_MEMINFO_FWD_ALLOC
  53. _SK_MEMINFO_WMEM_QUEUED
  54. _SK_MEMINFO_OPTMEM
  55. _SK_MEMINFO_BACKLOG
  56. _SK_MEMINFO_DROPS
  57. _SK_MEMINFO_VARS
  58. )
  59. type _SK_MEMINFO [_SK_MEMINFO_VARS]uint32
  60. func NewListener(ip string, port int, multi bool) (*udpConn, error) {
  61. syscall.ForkLock.RLock()
  62. fd, err := unix.Socket(unix.AF_INET, unix.SOCK_DGRAM, unix.IPPROTO_UDP)
  63. if err == nil {
  64. unix.CloseOnExec(fd)
  65. }
  66. syscall.ForkLock.RUnlock()
  67. if err != nil {
  68. unix.Close(fd)
  69. return nil, fmt.Errorf("unable to open socket: %s", err)
  70. }
  71. var lip [4]byte
  72. copy(lip[:], net.ParseIP(ip).To4())
  73. if multi {
  74. if err = unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_REUSEPORT, 1); err != nil {
  75. return nil, fmt.Errorf("unable to set SO_REUSEPORT: %s", err)
  76. }
  77. }
  78. if err = unix.Bind(fd, &unix.SockaddrInet4{Addr: lip, Port: port}); err != nil {
  79. return nil, fmt.Errorf("unable to bind to socket: %s", err)
  80. }
  81. //TODO: this may be useful for forcing threads into specific cores
  82. //unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_INCOMING_CPU, x)
  83. //v, err := unix.GetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_INCOMING_CPU)
  84. //l.Println(v, err)
  85. return &udpConn{sysFd: fd}, err
  86. }
  87. func (u *udpConn) Rebind() error {
  88. return nil
  89. }
  90. func (ua *udpAddr) Copy() udpAddr {
  91. return *ua
  92. }
  93. func (u *udpConn) SetRecvBuffer(n int) error {
  94. return unix.SetsockoptInt(u.sysFd, unix.SOL_SOCKET, unix.SO_RCVBUFFORCE, n)
  95. }
  96. func (u *udpConn) SetSendBuffer(n int) error {
  97. return unix.SetsockoptInt(u.sysFd, unix.SOL_SOCKET, unix.SO_SNDBUFFORCE, n)
  98. }
  99. func (u *udpConn) GetRecvBuffer() (int, error) {
  100. return unix.GetsockoptInt(int(u.sysFd), unix.SOL_SOCKET, unix.SO_RCVBUF)
  101. }
  102. func (u *udpConn) GetSendBuffer() (int, error) {
  103. return unix.GetsockoptInt(int(u.sysFd), unix.SOL_SOCKET, unix.SO_SNDBUF)
  104. }
  105. func (u *udpConn) LocalAddr() (*udpAddr, error) {
  106. var rsa rawSockaddrAny
  107. var rLen = unix.SizeofSockaddrAny
  108. _, _, err := unix.Syscall(
  109. unix.SYS_GETSOCKNAME,
  110. uintptr(u.sysFd),
  111. uintptr(unsafe.Pointer(&rsa)),
  112. uintptr(unsafe.Pointer(&rLen)),
  113. )
  114. if err != 0 {
  115. return nil, err
  116. }
  117. addr := &udpAddr{}
  118. if rsa.Addr.Family == unix.AF_INET {
  119. addr.Port = uint16(rsa.Addr.Data[0])<<8 + uint16(rsa.Addr.Data[1])
  120. addr.IP = uint32(rsa.Addr.Data[2])<<24 + uint32(rsa.Addr.Data[3])<<16 + uint32(rsa.Addr.Data[4])<<8 + uint32(rsa.Addr.Data[5])
  121. } else {
  122. addr.Port = 0
  123. addr.IP = 0
  124. }
  125. return addr, nil
  126. }
  127. func (u *udpConn) ListenOut(f *Interface, q int) {
  128. plaintext := make([]byte, mtu)
  129. header := &Header{}
  130. fwPacket := &FirewallPacket{}
  131. udpAddr := &udpAddr{}
  132. nb := make([]byte, 12, 12)
  133. lhh := f.lightHouse.NewRequestHandler()
  134. //TODO: should we track this?
  135. //metric := metrics.GetOrRegisterHistogram("test.batch_read", nil, metrics.NewExpDecaySample(1028, 0.015))
  136. msgs, buffers, names := u.PrepareRawMessages(f.udpBatchSize)
  137. read := u.ReadMulti
  138. if f.udpBatchSize == 1 {
  139. read = u.ReadSingle
  140. }
  141. conntrackCache := NewConntrackCacheTicker(f.conntrackCacheTimeout)
  142. for {
  143. n, err := read(msgs)
  144. if err != nil {
  145. l.WithError(err).Error("Failed to read packets")
  146. continue
  147. }
  148. //metric.Update(int64(n))
  149. for i := 0; i < n; i++ {
  150. udpAddr.IP = binary.BigEndian.Uint32(names[i][4:8])
  151. udpAddr.Port = binary.BigEndian.Uint16(names[i][2:4])
  152. f.readOutsidePackets(udpAddr, plaintext[:0], buffers[i][:msgs[i].Len], header, fwPacket, lhh, nb, q, conntrackCache.Get())
  153. }
  154. }
  155. }
  156. func (u *udpConn) ReadSingle(msgs []rawMessage) (int, error) {
  157. for {
  158. n, _, err := unix.Syscall6(
  159. unix.SYS_RECVMSG,
  160. uintptr(u.sysFd),
  161. uintptr(unsafe.Pointer(&(msgs[0].Hdr))),
  162. 0,
  163. 0,
  164. 0,
  165. 0,
  166. )
  167. if err != 0 {
  168. return 0, &net.OpError{Op: "recvmsg", Err: err}
  169. }
  170. msgs[0].Len = uint32(n)
  171. return 1, nil
  172. }
  173. }
  174. func (u *udpConn) ReadMulti(msgs []rawMessage) (int, error) {
  175. for {
  176. n, _, err := unix.Syscall6(
  177. unix.SYS_RECVMMSG,
  178. uintptr(u.sysFd),
  179. uintptr(unsafe.Pointer(&msgs[0])),
  180. uintptr(len(msgs)),
  181. unix.MSG_WAITFORONE,
  182. 0,
  183. 0,
  184. )
  185. if err != 0 {
  186. return 0, &net.OpError{Op: "recvmmsg", Err: err}
  187. }
  188. return int(n), nil
  189. }
  190. }
  191. func (u *udpConn) WriteTo(b []byte, addr *udpAddr) error {
  192. var rsa unix.RawSockaddrInet4
  193. //TODO: sometimes addr is nil!
  194. rsa.Family = unix.AF_INET
  195. p := (*[2]byte)(unsafe.Pointer(&rsa.Port))
  196. p[0] = byte(addr.Port >> 8)
  197. p[1] = byte(addr.Port)
  198. rsa.Addr[0] = byte(addr.IP & 0xff000000 >> 24)
  199. rsa.Addr[1] = byte(addr.IP & 0x00ff0000 >> 16)
  200. rsa.Addr[2] = byte(addr.IP & 0x0000ff00 >> 8)
  201. rsa.Addr[3] = byte(addr.IP & 0x000000ff)
  202. for {
  203. _, _, err := unix.Syscall6(
  204. unix.SYS_SENDTO,
  205. uintptr(u.sysFd),
  206. uintptr(unsafe.Pointer(&b[0])),
  207. uintptr(len(b)),
  208. uintptr(0),
  209. uintptr(unsafe.Pointer(&rsa)),
  210. uintptr(unix.SizeofSockaddrInet4),
  211. )
  212. if err != 0 {
  213. return &net.OpError{Op: "sendto", Err: err}
  214. }
  215. //TODO: handle incomplete writes
  216. return nil
  217. }
  218. }
  219. func (u *udpConn) reloadConfig(c *Config) {
  220. b := c.GetInt("listen.read_buffer", 0)
  221. if b > 0 {
  222. err := u.SetRecvBuffer(b)
  223. if err == nil {
  224. s, err := u.GetRecvBuffer()
  225. if err == nil {
  226. l.WithField("size", s).Info("listen.read_buffer was set")
  227. } else {
  228. l.WithError(err).Warn("Failed to get listen.read_buffer")
  229. }
  230. } else {
  231. l.WithError(err).Error("Failed to set listen.read_buffer")
  232. }
  233. }
  234. b = c.GetInt("listen.write_buffer", 0)
  235. if b > 0 {
  236. err := u.SetSendBuffer(b)
  237. if err == nil {
  238. s, err := u.GetSendBuffer()
  239. if err == nil {
  240. l.WithField("size", s).Info("listen.write_buffer was set")
  241. } else {
  242. l.WithError(err).Warn("Failed to get listen.write_buffer")
  243. }
  244. } else {
  245. l.WithError(err).Error("Failed to set listen.write_buffer")
  246. }
  247. }
  248. }
  249. func (u *udpConn) getMemInfo(meminfo *_SK_MEMINFO) error {
  250. var vallen uint32 = 4 * _SK_MEMINFO_VARS
  251. _, _, err := unix.Syscall6(unix.SYS_GETSOCKOPT, uintptr(u.sysFd), uintptr(unix.SOL_SOCKET), uintptr(unix.SO_MEMINFO), uintptr(unsafe.Pointer(meminfo)), uintptr(unsafe.Pointer(&vallen)), 0)
  252. if err != 0 {
  253. return err
  254. }
  255. return nil
  256. }
  257. func NewUDPStatsEmitter(udpConns []*udpConn) func() {
  258. // Check if our kernel supports SO_MEMINFO before registering the gauges
  259. var udpGauges [][_SK_MEMINFO_VARS]metrics.Gauge
  260. var meminfo _SK_MEMINFO
  261. if err := udpConns[0].getMemInfo(&meminfo); err == nil {
  262. udpGauges = make([][_SK_MEMINFO_VARS]metrics.Gauge, len(udpConns))
  263. for i := range udpConns {
  264. udpGauges[i] = [_SK_MEMINFO_VARS]metrics.Gauge{
  265. metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.rmem_alloc", i), nil),
  266. metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.rcvbuf", i), nil),
  267. metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.wmem_alloc", i), nil),
  268. metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.sndbuf", i), nil),
  269. metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.fwd_alloc", i), nil),
  270. metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.wmem_queued", i), nil),
  271. metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.optmem", i), nil),
  272. metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.backlog", i), nil),
  273. metrics.GetOrRegisterGauge(fmt.Sprintf("udp.%d.drops", i), nil),
  274. }
  275. }
  276. }
  277. return func() {
  278. for i, gauges := range udpGauges {
  279. if err := udpConns[i].getMemInfo(&meminfo); err == nil {
  280. for j := 0; j < _SK_MEMINFO_VARS; j++ {
  281. gauges[j].Update(int64(meminfo[j]))
  282. }
  283. }
  284. }
  285. }
  286. }
  287. func (ua *udpAddr) Equals(t *udpAddr) bool {
  288. if t == nil || ua == nil {
  289. return t == nil && ua == nil
  290. }
  291. return ua.IP == t.IP && ua.Port == t.Port
  292. }
  293. func (ua *udpAddr) String() string {
  294. return fmt.Sprintf("%s:%v", int2ip(ua.IP), ua.Port)
  295. }
  296. func (ua *udpAddr) MarshalJSON() ([]byte, error) {
  297. return json.Marshal(m{"ip": int2ip(ua.IP), "port": ua.Port})
  298. }
  299. func udp2ip(addr *udpAddr) net.IP {
  300. return int2ip(addr.IP)
  301. }
  302. func udp2ipInt(addr *udpAddr) uint32 {
  303. return addr.IP
  304. }
  305. func hostDidRoam(addr *udpAddr, newaddr *udpAddr) bool {
  306. return !addr.Equals(newaddr)
  307. }