Phy.hpp 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at https://mozilla.org/MPL/2.0/.
  4. *
  5. * (c) ZeroTier, Inc.
  6. * https://www.zerotier.com/
  7. */
  8. #ifndef ZT_PHY_HPP
  9. #define ZT_PHY_HPP
  10. #include <list>
  11. #include <stdexcept>
  12. #include <stdio.h>
  13. #include <stdlib.h>
  14. #include <string.h>
  15. #if defined(_WIN32) || defined(_WIN64)
  16. #include <windows.h>
  17. #include <winsock2.h>
  18. #include <ws2tcpip.h>
  19. #define ZT_PHY_SOCKFD_TYPE SOCKET
  20. #define ZT_PHY_SOCKFD_NULL (INVALID_SOCKET)
  21. #define ZT_PHY_SOCKFD_VALID(s) ((s) != INVALID_SOCKET)
  22. #define ZT_PHY_CLOSE_SOCKET(s) ::closesocket(s)
  23. #define ZT_PHY_MAX_SOCKETS (FD_SETSIZE)
  24. #define ZT_PHY_MAX_INTERCEPTS ZT_PHY_MAX_SOCKETS
  25. #define ZT_PHY_SOCKADDR_STORAGE_TYPE struct sockaddr_storage
  26. #else // not Windows
  27. #include "../node/Metrics.hpp"
  28. #include <arpa/inet.h>
  29. #include <errno.h>
  30. #include <fcntl.h>
  31. #include <netinet/in.h>
  32. #include <netinet/tcp.h>
  33. #include <signal.h>
  34. #include <sys/select.h>
  35. #include <sys/socket.h>
  36. #include <sys/time.h>
  37. #include <sys/types.h>
  38. #include <sys/un.h>
  39. #include <unistd.h>
  40. #if defined(__linux__) || defined(linux) || defined(__LINUX__) || defined(__linux)
  41. #ifndef IPV6_DONTFRAG
  42. #define IPV6_DONTFRAG 62
  43. #endif
  44. #endif
  45. #define ZT_PHY_SOCKFD_TYPE int
  46. #define ZT_PHY_SOCKFD_NULL (-1)
  47. #define ZT_PHY_SOCKFD_VALID(s) ((s) > -1)
  48. #define ZT_PHY_CLOSE_SOCKET(s) ::close(s)
  49. #define ZT_PHY_MAX_SOCKETS (FD_SETSIZE)
  50. #define ZT_PHY_MAX_INTERCEPTS ZT_PHY_MAX_SOCKETS
  51. #define ZT_PHY_SOCKADDR_STORAGE_TYPE struct sockaddr_storage
  52. #endif // Windows or not
  53. namespace ZeroTier {
  /**
   * Opaque socket type
   *
   * Callers only ever hold PhySocket pointers. Inside Phy<> such a pointer is
   * cast back to the private PhySocketImpl record it really points at, so the
   * type is deliberately 'void' to keep that record out of the public API.
   */
  typedef void PhySocket;
  58. /**
  59. * Simple templated non-blocking sockets implementation
  60. *
  61. * Yes there is boost::asio and libuv, but I like small binaries and I hate
  62. * build dependencies. Both drag in a whole bunch of pasta with them.
  63. *
  64. * This class is templated on a pointer to a handler class which must
  65. * implement the following functions:
  66. *
  67. * For all platforms:
  68. *
  69. * phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *localAddr,const struct sockaddr *from,void *data,unsigned long len)
  70. * phyOnTcpConnect(PhySocket *sock,void **uptr,bool success)
  71. * phyOnTcpAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN,const struct sockaddr *from)
  72. * phyOnTcpClose(PhySocket *sock,void **uptr)
  73. * phyOnTcpData(PhySocket *sock,void **uptr,void *data,unsigned long len)
  74. * phyOnTcpWritable(PhySocket *sock,void **uptr)
  75. * phyOnFileDescriptorActivity(PhySocket *sock,void **uptr,bool readable,bool writable)
  76. *
  77. * On Linux/OSX/Unix only (not required/used on Windows or elsewhere):
  78. *
  79. * phyOnUnixAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN)
  80. * phyOnUnixClose(PhySocket *sock,void **uptr)
  81. * phyOnUnixData(PhySocket *sock,void **uptr,void *data,unsigned long len)
  82. * phyOnUnixWritable(PhySocket *sock,void **uptr)
  83. *
  84. * These templates typically refer to function objects. Templates are used to
  85. * avoid the call overhead of indirection, which is surprisingly high for high
  86. * bandwidth applications pushing a lot of packets.
  87. *
  88. * The 'sock' pointer above is an opaque pointer to a socket. Each socket
  89. * has a 'uptr' user-settable/modifiable pointer associated with it, which
  90. * can be set on bind/connect calls and is passed as a void ** to permit
  91. * resetting at any time. The ACCEPT handler takes two sets of sock and
  92. * uptr: sockL and uptrL for the listen socket, and sockN and uptrN for
  93. * the new TCP connection socket that has just been created.
  94. *
  95. * Handlers are always called. On outgoing TCP connection, CONNECT is always
  96. * called on either success or failure followed by DATA and/or WRITABLE as
  97. * indicated. On socket close, handlers are called unless close() is told
  98. * explicitly not to call handlers. It is safe to close a socket within a
  99. * handler, and in that case close() can be told not to call handlers to
  100. * prevent recursion.
  101. *
  102. * This isn't thread-safe with the exception of whack(), which is safe to
  103. * call from another thread to abort poll().
  104. */
  105. template <typename HANDLER_PTR_TYPE> class Phy {
  106. private:
  107. HANDLER_PTR_TYPE _handler;
  /**
   * Kind/state tag stored in every PhySocketImpl
   */
  enum PhySocketType {
      ZT_PHY_SOCKET_CLOSED = 0,            // socket is closed, will be removed on next poll()
      ZT_PHY_SOCKET_TCP_OUT_PENDING = 1,   // outgoing TCP socket whose connect() has not completed yet
      ZT_PHY_SOCKET_TCP_OUT_CONNECTED = 2, // outgoing TCP socket that is connected
      ZT_PHY_SOCKET_TCP_IN = 3,            // presumably an accepted inbound TCP connection (not assigned in the code shown here)
      ZT_PHY_SOCKET_TCP_LISTEN = 4,        // listening TCP socket created by tcpListen()
      ZT_PHY_SOCKET_UDP = 5,               // bound UDP socket created by udpBind()
      ZT_PHY_SOCKET_FD = 6,                // raw file descriptor type (not assigned in the code shown here)
      ZT_PHY_SOCKET_UNIX_IN = 7,           // Unix-domain connection; also the tag wrapSocket() gives wrapped descriptors
      ZT_PHY_SOCKET_UNIX_LISTEN = 8        // listening Unix-domain socket created by unixListen()
  };
  119. struct PhySocketImpl {
  120. PhySocketImpl()
  121. {
  122. }
  123. PhySocketType type;
  124. ZT_PHY_SOCKFD_TYPE sock;
  125. void* uptr; // user-settable pointer
  126. uint16_t localPort;
  127. ZT_PHY_SOCKADDR_STORAGE_TYPE saddr; // remote for TCP_OUT and TCP_IN, local for TCP_LISTEN, RAW, and UDP
  128. };
  // Every socket created through this object. The PhySocket* values returned
  // to callers are the addresses of elements of this list.
  std::list<PhySocketImpl> _socks;
  // Descriptor sets that sockets are added to as they are created: readable
  // interest in _readfds, pending outgoing connects in _writefds.
  // NOTE(review): presumably these are handed to select() by poll() -- confirm.
  fd_set _readfds;
  fd_set _writefds;
#if defined(_WIN32) || defined(_WIN64)
  // Windows only: pending outgoing TCP connects are also added here.
  fd_set _exceptfds;
#endif
  // Highest descriptor value seen so far (starts as the larger whack descriptor).
  long _nfds;
  // The two ends of the channel whack() writes to in order to interrupt poll().
  ZT_PHY_SOCKFD_TYPE _whackReceiveSocket;
  ZT_PHY_SOCKFD_TYPE _whackSendSocket;
  // Constructor flags: TCP_NODELAY on TCP sockets / SO_NO_CHECK on IPv4 UDP sockets.
  bool _noDelay;
  bool _noCheck;
  140. public:
  141. /**
  142. * @param handler Pointer of type HANDLER_PTR_TYPE to handler
  143. * @param noDelay If true, disable TCP NAGLE algorithm on TCP sockets
  144. * @param noCheck If true, attempt to set UDP SO_NO_CHECK option to disable sending checksums
  145. */
  146. Phy(HANDLER_PTR_TYPE handler, bool noDelay, bool noCheck) : _handler(handler)
  147. {
  148. FD_ZERO(&_readfds);
  149. FD_ZERO(&_writefds);
  150. #if defined(_WIN32) || defined(_WIN64)
  151. FD_ZERO(&_exceptfds);
  152. SOCKET pipes[2];
  153. { // hack copied from StackOverflow, behaves a bit like pipe() on *nix systems
  154. struct sockaddr_in inaddr;
  155. struct sockaddr addr;
  156. SOCKET lst = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  157. if (lst == INVALID_SOCKET)
  158. throw std::runtime_error("unable to create pipes for select() abort");
  159. memset(&inaddr, 0, sizeof(inaddr));
  160. memset(&addr, 0, sizeof(addr));
  161. inaddr.sin_family = AF_INET;
  162. inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  163. inaddr.sin_port = 0;
  164. int yes = 1;
  165. setsockopt(lst, SOL_SOCKET, SO_REUSEADDR, (char*)&yes, sizeof(yes));
  166. bind(lst, (struct sockaddr*)&inaddr, sizeof(inaddr));
  167. listen(lst, 1);
  168. int len = sizeof(inaddr);
  169. getsockname(lst, &addr, &len);
  170. pipes[0] = ::socket(AF_INET, SOCK_STREAM, 0);
  171. if (pipes[0] == INVALID_SOCKET)
  172. throw std::runtime_error("unable to create pipes for select() abort");
  173. connect(pipes[0], &addr, len);
  174. pipes[1] = accept(lst, 0, 0);
  175. closesocket(lst);
  176. }
  177. #else // not Windows
  178. int pipes[2];
  179. if (::pipe(pipes))
  180. throw std::runtime_error("unable to create pipes for select() abort");
  181. #endif // Windows or not
  182. _nfds = (pipes[0] > pipes[1]) ? (long)pipes[0] : (long)pipes[1];
  183. _whackReceiveSocket = pipes[0];
  184. _whackSendSocket = pipes[1];
  185. _noDelay = noDelay;
  186. _noCheck = noCheck;
  187. }
  188. ~Phy()
  189. {
  190. for (typename std::list<PhySocketImpl>::const_iterator s(_socks.begin()); s != _socks.end(); ++s) {
  191. if (s->type != ZT_PHY_SOCKET_CLOSED)
  192. this->close((PhySocket*)&(*s), true);
  193. }
  194. ZT_PHY_CLOSE_SOCKET(_whackReceiveSocket);
  195. ZT_PHY_CLOSE_SOCKET(_whackSendSocket);
  196. }
  197. /**
  198. * @param s Socket object
  199. * @return Underlying OS-type (usually int or long) file descriptor associated with object
  200. */
  201. static inline ZT_PHY_SOCKFD_TYPE getDescriptor(PhySocket* s) throw()
  202. {
  203. return reinterpret_cast<PhySocketImpl*>(s)->sock;
  204. }
  205. /**
  206. * @param s Socket object
  207. * @return Pointer to user object
  208. */
  209. static inline void** getuptr(PhySocket* s) throw()
  210. {
  211. return &(reinterpret_cast<PhySocketImpl*>(s)->uptr);
  212. }
  213. /**
  214. * Return the local port corresponding to this PhySocket
  215. *
  216. * @param s Socket object
  217. *
  218. * @return Local port corresponding to this PhySocket
  219. */
  220. static inline uint16_t getLocalPort(PhySocket* s) throw()
  221. {
  222. return reinterpret_cast<PhySocketImpl*>(s)->localPort;
  223. }
  224. /**
  225. * Cause poll() to stop waiting immediately
  226. *
  227. * This can be used to reset the polling loop after changes that require
  228. * attention, or to shut down a background thread that is waiting, etc.
  229. */
  230. inline void whack()
  231. {
  232. #if defined(_WIN32) || defined(_WIN64)
  233. ::send(_whackSendSocket, (const char*)this, 1, 0);
  234. #else
  235. (void)(::write(_whackSendSocket, (PhySocket*)this, 1));
  236. #endif
  237. }
  238. /**
  239. * @return Number of open sockets
  240. */
  241. inline unsigned long count() const throw()
  242. {
  243. return _socks.size();
  244. }
  245. /**
  246. * @return Maximum number of sockets allowed
  247. */
  248. inline unsigned long maxCount() const throw()
  249. {
  250. return ZT_PHY_MAX_SOCKETS;
  251. }
  252. /**
  253. * Wrap a raw file descriptor in a PhySocket structure
  254. *
  255. * This can be used to select/poll on a raw file descriptor as part of this
  256. * class's I/O loop. By default the fd is set for read notification but
  257. * this can be controlled with setNotifyReadable(). When any detected
  258. * condition is present, the phyOnFileDescriptorActivity() callback is
  259. * called with one or both of its arguments 'true'.
  260. *
  261. * The Phy<>::close() method *must* be called when you're done with this
  262. * file descriptor to remove it from the select/poll set, but unlike other
  263. * types of sockets Phy<> does not actually close the underlying fd or
  264. * otherwise manage its life cycle. There is also no close notification
  265. * callback for this fd, since Phy<> doesn't actually perform reading or
  266. * writing or detect error conditions. This is only useful for adding a
  267. * file descriptor to Phy<> to select/poll on it.
  268. *
  269. * @param fd Raw file descriptor
  270. * @param uptr User pointer to supply to callbacks
  271. * @return PhySocket wrapping fd or NULL on failure (out of memory or too many sockets)
  272. */
  273. inline PhySocket* wrapSocket(ZT_PHY_SOCKFD_TYPE fd, void* uptr = (void*)0)
  274. {
  275. if (_socks.size() >= ZT_PHY_MAX_SOCKETS)
  276. return (PhySocket*)0;
  277. try {
  278. _socks.push_back(PhySocketImpl());
  279. }
  280. catch (...) {
  281. return (PhySocket*)0;
  282. }
  283. PhySocketImpl& sws = _socks.back();
  284. if ((long)fd > _nfds)
  285. _nfds = (long)fd;
  286. FD_SET(fd, &_readfds);
  287. sws.type = ZT_PHY_SOCKET_UNIX_IN; /* TODO: Type was changed to allow for CBs with new RPC model */
  288. sws.sock = fd;
  289. sws.uptr = uptr;
  290. memset(&(sws.saddr), 0, sizeof(struct sockaddr_storage));
  291. // no sockaddr for this socket type, leave saddr null
  292. return (PhySocket*)&sws;
  293. }
  294. /**
  295. * Bind a UDP socket
  296. *
  297. * @param localAddress Local endpoint address and port
  298. * @param uptr Initial value of user pointer associated with this socket (default: NULL)
  299. * @param bufferSize Desired socket receive/send buffer size -- will set as close to this as possible (default: 0, leave alone)
  300. * @return Socket or NULL on failure to bind
  301. */
  302. inline PhySocket* udpBind(const struct sockaddr* localAddress, void* uptr = (void*)0, int bufferSize = 0)
  303. {
  304. if (_socks.size() >= ZT_PHY_MAX_SOCKETS)
  305. return (PhySocket*)0;
  306. ZT_PHY_SOCKFD_TYPE s = ::socket(localAddress->sa_family, SOCK_DGRAM, 0);
  307. if (! ZT_PHY_SOCKFD_VALID(s))
  308. return (PhySocket*)0;
  309. if (bufferSize > 0) {
  310. int bs = bufferSize;
  311. while (bs >= 65536) {
  312. int tmpbs = bs;
  313. if (setsockopt(s, SOL_SOCKET, SO_RCVBUF, (const char*)&tmpbs, sizeof(tmpbs)) == 0)
  314. break;
  315. bs -= 4096;
  316. }
  317. bs = bufferSize;
  318. while (bs >= 65536) {
  319. int tmpbs = bs;
  320. if (setsockopt(s, SOL_SOCKET, SO_SNDBUF, (const char*)&tmpbs, sizeof(tmpbs)) == 0)
  321. break;
  322. bs -= 4096;
  323. }
  324. }
  325. #if defined(_WIN32) || defined(_WIN64)
  326. {
  327. BOOL f;
  328. if (localAddress->sa_family == AF_INET6) {
  329. f = TRUE;
  330. setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, (const char*)&f, sizeof(f));
  331. f = FALSE;
  332. setsockopt(s, IPPROTO_IPV6, IPV6_DONTFRAG, (const char*)&f, sizeof(f));
  333. }
  334. f = FALSE;
  335. setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (const char*)&f, sizeof(f));
  336. f = TRUE;
  337. setsockopt(s, SOL_SOCKET, SO_BROADCAST, (const char*)&f, sizeof(f));
  338. }
  339. #else // not Windows
  340. {
  341. int f;
  342. if (localAddress->sa_family == AF_INET6) {
  343. f = 1;
  344. setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, (void*)&f, sizeof(f));
  345. #ifdef IPV6_MTU_DISCOVER
  346. f = 0;
  347. setsockopt(s, IPPROTO_IPV6, IPV6_MTU_DISCOVER, &f, sizeof(f));
  348. #endif
  349. #ifdef IPV6_DONTFRAG
  350. f = 0;
  351. setsockopt(s, IPPROTO_IPV6, IPV6_DONTFRAG, &f, sizeof(f));
  352. #endif
  353. }
  354. f = 0;
  355. setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (void*)&f, sizeof(f));
  356. f = 1;
  357. setsockopt(s, SOL_SOCKET, SO_BROADCAST, (void*)&f, sizeof(f));
  358. #ifdef IP_DONTFRAG
  359. f = 0;
  360. setsockopt(s, IPPROTO_IP, IP_DONTFRAG, &f, sizeof(f));
  361. #endif
  362. #ifdef IP_MTU_DISCOVER
  363. f = 0;
  364. setsockopt(s, IPPROTO_IP, IP_MTU_DISCOVER, &f, sizeof(f));
  365. #endif
  366. #ifdef SO_NO_CHECK
  367. // For now at least we only set SO_NO_CHECK on IPv4 sockets since some
  368. // IPv6 stacks incorrectly discard zero checksum packets. May remove
  369. // this restriction later once broken stuff dies more.
  370. if ((localAddress->sa_family == AF_INET) && (_noCheck)) {
  371. f = 1;
  372. setsockopt(s, SOL_SOCKET, SO_NO_CHECK, (void*)&f, sizeof(f));
  373. }
  374. #endif
  375. }
  376. #endif // Windows or not
  377. if (::bind(s, localAddress, (localAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in))) {
  378. ZT_PHY_CLOSE_SOCKET(s);
  379. return (PhySocket*)0;
  380. }
  381. #if defined(_WIN32) || defined(_WIN64)
  382. {
  383. u_long iMode = 1;
  384. ioctlsocket(s, FIONBIO, &iMode);
  385. }
  386. #else
  387. fcntl(s, F_SETFL, O_NONBLOCK);
  388. #endif
  389. try {
  390. _socks.push_back(PhySocketImpl());
  391. }
  392. catch (...) {
  393. ZT_PHY_CLOSE_SOCKET(s);
  394. return (PhySocket*)0;
  395. }
  396. PhySocketImpl& sws = _socks.back();
  397. if ((long)s > _nfds)
  398. _nfds = (long)s;
  399. FD_SET(s, &_readfds);
  400. sws.type = ZT_PHY_SOCKET_UDP;
  401. sws.sock = s;
  402. sws.uptr = uptr;
  403. #ifdef __UNIX_LIKE__
  404. struct sockaddr_in* sin = (struct sockaddr_in*)localAddress;
  405. sws.localPort = htons(sin->sin_port);
  406. #endif
  407. memset(&(sws.saddr), 0, sizeof(struct sockaddr_storage));
  408. memcpy(&(sws.saddr), localAddress, (localAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
  409. return (PhySocket*)&sws;
  410. }
  411. /**
  412. * Set the IP TTL for the next outgoing packet (for IPv4 UDP sockets only)
  413. *
  414. * @param ttl New TTL (0 or >255 will set it to 255)
  415. * @return True on success
  416. */
  417. inline bool setIp4UdpTtl(PhySocket* sock, unsigned int ttl)
  418. {
  419. PhySocketImpl& sws = *(reinterpret_cast<PhySocketImpl*>(sock));
  420. #if defined(_WIN32) || defined(_WIN64)
  421. DWORD tmp = ((ttl == 0) || (ttl > 255)) ? 255 : (DWORD)ttl;
  422. return (::setsockopt(sws.sock, IPPROTO_IP, IP_TTL, (const char*)&tmp, sizeof(tmp)) == 0);
  423. #else
  424. int tmp = ((ttl == 0) || (ttl > 255)) ? 255 : (int)ttl;
  425. return (::setsockopt(sws.sock, IPPROTO_IP, IP_TTL, (void*)&tmp, sizeof(tmp)) == 0);
  426. #endif
  427. }
  428. /**
  429. * Send a UDP packet
  430. *
  431. * @param sock UDP socket
  432. * @param remoteAddress Destination address (must be correct type for socket)
  433. * @param data Data to send
  434. * @param len Length of packet
  435. * @return True if packet appears to have been sent successfully
  436. */
  437. inline bool udpSend(PhySocket* sock, const struct sockaddr* remoteAddress, const void* data, unsigned long len)
  438. {
  439. PhySocketImpl& sws = *(reinterpret_cast<PhySocketImpl*>(sock));
  440. bool sent = false;
  441. #if defined(_WIN32) || defined(_WIN64)
  442. sent = ((long)::sendto(sws.sock, reinterpret_cast<const char*>(data), len, 0, remoteAddress, (remoteAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in)) == (long)len);
  443. #else
  444. sent = ((long)::sendto(sws.sock, data, len, 0, remoteAddress, (remoteAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in)) == (long)len);
  445. #endif
  446. if (sent) {
  447. Metrics::udp_send += len;
  448. }
  449. return sent;
  450. }
  451. #ifdef __UNIX_LIKE__
  452. /**
  453. * Listen for connections on a Unix domain socket
  454. *
  455. * @param path Path to Unix domain socket
  456. * @param uptr Arbitrary pointer to associate
  457. * @return PhySocket or NULL if cannot bind
  458. */
  459. inline PhySocket* unixListen(const char* path, void* uptr = (void*)0)
  460. {
  461. struct sockaddr_un sun;
  462. if (_socks.size() >= ZT_PHY_MAX_SOCKETS)
  463. return (PhySocket*)0;
  464. memset(&sun, 0, sizeof(sun));
  465. sun.sun_family = AF_UNIX;
  466. if (strlen(path) >= sizeof(sun.sun_path))
  467. return (PhySocket*)0;
  468. strcpy(sun.sun_path, path);
  469. ZT_PHY_SOCKFD_TYPE s = ::socket(PF_UNIX, SOCK_STREAM, 0);
  470. if (! ZT_PHY_SOCKFD_VALID(s))
  471. return (PhySocket*)0;
  472. ::fcntl(s, F_SETFL, O_NONBLOCK);
  473. ::unlink(path);
  474. if (::bind(s, (struct sockaddr*)&sun, sizeof(struct sockaddr_un)) != 0) {
  475. ZT_PHY_CLOSE_SOCKET(s);
  476. return (PhySocket*)0;
  477. }
  478. if (::listen(s, 128) != 0) {
  479. ZT_PHY_CLOSE_SOCKET(s);
  480. return (PhySocket*)0;
  481. }
  482. try {
  483. _socks.push_back(PhySocketImpl());
  484. }
  485. catch (...) {
  486. ZT_PHY_CLOSE_SOCKET(s);
  487. return (PhySocket*)0;
  488. }
  489. PhySocketImpl& sws = _socks.back();
  490. if ((long)s > _nfds)
  491. _nfds = (long)s;
  492. FD_SET(s, &_readfds);
  493. sws.type = ZT_PHY_SOCKET_UNIX_LISTEN;
  494. sws.sock = s;
  495. sws.uptr = uptr;
  496. memset(&(sws.saddr), 0, sizeof(struct sockaddr_storage));
  497. memcpy(&(sws.saddr), &sun, sizeof(struct sockaddr_un));
  498. return (PhySocket*)&sws;
  499. }
  500. #endif // __UNIX_LIKE__
  501. /**
  502. * Bind a local listen socket to listen for new TCP connections
  503. *
  504. * @param localAddress Local address and port
  505. * @param uptr Initial value of uptr for new socket (default: NULL)
  506. * @return Socket or NULL on failure to bind
  507. */
  508. inline PhySocket* tcpListen(const struct sockaddr* localAddress, void* uptr = (void*)0)
  509. {
  510. if (_socks.size() >= ZT_PHY_MAX_SOCKETS)
  511. return (PhySocket*)0;
  512. ZT_PHY_SOCKFD_TYPE s = ::socket(localAddress->sa_family, SOCK_STREAM, 0);
  513. if (! ZT_PHY_SOCKFD_VALID(s))
  514. return (PhySocket*)0;
  515. #if defined(_WIN32) || defined(_WIN64)
  516. {
  517. BOOL f;
  518. f = TRUE;
  519. ::setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, (const char*)&f, sizeof(f));
  520. f = TRUE;
  521. ::setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (const char*)&f, sizeof(f));
  522. f = (_noDelay ? TRUE : FALSE);
  523. setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (char*)&f, sizeof(f));
  524. u_long iMode = 1;
  525. ioctlsocket(s, FIONBIO, &iMode);
  526. }
  527. #else
  528. {
  529. int f;
  530. f = 1;
  531. ::setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, (void*)&f, sizeof(f));
  532. f = 1;
  533. ::setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (void*)&f, sizeof(f));
  534. f = (_noDelay ? 1 : 0);
  535. setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (char*)&f, sizeof(f));
  536. fcntl(s, F_SETFL, O_NONBLOCK);
  537. }
  538. #endif
  539. if (::bind(s, localAddress, (localAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in))) {
  540. ZT_PHY_CLOSE_SOCKET(s);
  541. return (PhySocket*)0;
  542. }
  543. if (::listen(s, 1024)) {
  544. ZT_PHY_CLOSE_SOCKET(s);
  545. return (PhySocket*)0;
  546. }
  547. try {
  548. _socks.push_back(PhySocketImpl());
  549. }
  550. catch (...) {
  551. ZT_PHY_CLOSE_SOCKET(s);
  552. return (PhySocket*)0;
  553. }
  554. PhySocketImpl& sws = _socks.back();
  555. if ((long)s > _nfds)
  556. _nfds = (long)s;
  557. FD_SET(s, &_readfds);
  558. sws.type = ZT_PHY_SOCKET_TCP_LISTEN;
  559. sws.sock = s;
  560. sws.uptr = uptr;
  561. memset(&(sws.saddr), 0, sizeof(struct sockaddr_storage));
  562. memcpy(&(sws.saddr), localAddress, (localAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
  563. return (PhySocket*)&sws;
  564. }
  565. /**
  566. * Start a non-blocking connect; CONNECT handler is called on success or failure
  567. *
  568. * A return value of NULL indicates a synchronous failure such as a
  569. * failure to open a socket. The TCP connection handler is not called
  570. * in this case.
  571. *
  572. * It is possible on some platforms for an "instant connect" to occur,
  573. * such as when connecting to a loopback address. In this case, the
  574. * 'connected' result parameter will be set to 'true' and if the
  575. * 'callConnectHandler' flag is true (the default) the TCP connect
  576. * handler will be called before the function returns.
  577. *
  578. * These semantics can be a bit confusing, but they're less so than
  579. * the underlying semantics of asynchronous TCP connect.
  580. *
  581. * @param remoteAddress Remote address
  582. * @param connected Result parameter: set to whether an "instant connect" has occurred (true if yes)
  583. * @param uptr Initial value of uptr for new socket (default: NULL)
  584. * @param callConnectHandler If true, call TCP connect handler even if result is known before function exit (default: true)
  585. * @return New socket or NULL on failure
  586. */
  587. inline PhySocket* tcpConnect(const struct sockaddr* remoteAddress, bool& connected, void* uptr = (void*)0, bool callConnectHandler = true)
  588. {
  589. if (_socks.size() >= ZT_PHY_MAX_SOCKETS)
  590. return (PhySocket*)0;
  591. ZT_PHY_SOCKFD_TYPE s = ::socket(remoteAddress->sa_family, SOCK_STREAM, 0);
  592. if (! ZT_PHY_SOCKFD_VALID(s)) {
  593. connected = false;
  594. return (PhySocket*)0;
  595. }
  596. #if defined(_WIN32) || defined(_WIN64)
  597. {
  598. BOOL f;
  599. if (remoteAddress->sa_family == AF_INET6) {
  600. f = TRUE;
  601. ::setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, (const char*)&f, sizeof(f));
  602. }
  603. f = TRUE;
  604. ::setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (const char*)&f, sizeof(f));
  605. f = (_noDelay ? TRUE : FALSE);
  606. setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (char*)&f, sizeof(f));
  607. u_long iMode = 1;
  608. ioctlsocket(s, FIONBIO, &iMode);
  609. }
  610. #else
  611. {
  612. int f;
  613. if (remoteAddress->sa_family == AF_INET6) {
  614. f = 1;
  615. ::setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, (void*)&f, sizeof(f));
  616. }
  617. f = 1;
  618. ::setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (void*)&f, sizeof(f));
  619. f = (_noDelay ? 1 : 0);
  620. setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (char*)&f, sizeof(f));
  621. fcntl(s, F_SETFL, O_NONBLOCK);
  622. }
  623. #endif
  624. connected = true;
  625. if (::connect(s, remoteAddress, (remoteAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in))) {
  626. connected = false;
  627. #if defined(_WIN32) || defined(_WIN64)
  628. if (WSAGetLastError() != WSAEWOULDBLOCK) {
  629. #else
  630. if (errno != EINPROGRESS) {
  631. #endif
  632. ZT_PHY_CLOSE_SOCKET(s);
  633. return (PhySocket*)0;
  634. } // else connection is proceeding asynchronously...
  635. }
  636. try {
  637. _socks.push_back(PhySocketImpl());
  638. }
  639. catch (...) {
  640. ZT_PHY_CLOSE_SOCKET(s);
  641. return (PhySocket*)0;
  642. }
  643. PhySocketImpl& sws = _socks.back();
  644. if ((long)s > _nfds)
  645. _nfds = (long)s;
  646. if (connected) {
  647. FD_SET(s, &_readfds);
  648. sws.type = ZT_PHY_SOCKET_TCP_OUT_CONNECTED;
  649. }
  650. else {
  651. FD_SET(s, &_writefds);
  652. #if defined(_WIN32) || defined(_WIN64)
  653. FD_SET(s, &_exceptfds);
  654. #endif
  655. sws.type = ZT_PHY_SOCKET_TCP_OUT_PENDING;
  656. }
  657. sws.sock = s;
  658. sws.uptr = uptr;
  659. memset(&(sws.saddr), 0, sizeof(struct sockaddr_storage));
  660. memcpy(&(sws.saddr), remoteAddress, (remoteAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
  661. if ((callConnectHandler) && (connected)) {
  662. try {
  663. _handler->phyOnTcpConnect((PhySocket*)&sws, &(sws.uptr), true);
  664. }
  665. catch (...) {
  666. }
  667. }
  668. return (PhySocket*)&sws;
  669. }
  670. /**
  671. * Try to set buffer sizes as close to the given value as possible
  672. *
  673. * This will try the specified value and then lower values in 16K increments
  674. * until one works.
  675. *
  676. * @param sock Socket
  677. * @param receiveBufferSize Desired size of receive buffer
  678. * @param sendBufferSize Desired size of send buffer
  679. */
  680. inline void setBufferSizes(const PhySocket* sock, int receiveBufferSize, int sendBufferSize)
  681. {
  682. PhySocketImpl& sws = *(reinterpret_cast<PhySocketImpl*>(sock));
  683. if (receiveBufferSize > 0) {
  684. while (receiveBufferSize > 0) {
  685. int tmpbs = receiveBufferSize;
  686. if (::setsockopt(sws.sock, SOL_SOCKET, SO_RCVBUF, (const char*)&tmpbs, sizeof(tmpbs)) == 0)
  687. break;
  688. receiveBufferSize -= 16384;
  689. }
  690. }
  691. if (sendBufferSize > 0) {
  692. while (sendBufferSize > 0) {
  693. int tmpbs = sendBufferSize;
  694. if (::setsockopt(sws.sock, SOL_SOCKET, SO_SNDBUF, (const char*)&tmpbs, sizeof(tmpbs)) == 0)
  695. break;
  696. sendBufferSize -= 16384;
  697. }
  698. }
  699. }
  700. /**
  701. * Attempt to send data to a stream socket (non-blocking)
  702. *
  703. * If -1 is returned, the socket should no longer be used as it is now
  704. * destroyed. If callCloseHandler is true, the close handler will be
  705. * called before the function returns.
  706. *
  707. * This can be used with TCP, Unix, or socket pair sockets.
  708. *
  709. * @param sock An open stream socket (other socket types will fail)
  710. * @param data Data to send
  711. * @param len Length of data
  712. * @param callCloseHandler If true, call close handler on socket closing failure condition (default: true)
  713. * @return Number of bytes actually sent or -1 on fatal error (socket closure)
  714. */
  715. inline long streamSend(PhySocket* sock, const void* data, unsigned long len, bool callCloseHandler = true)
  716. {
  717. PhySocketImpl& sws = *(reinterpret_cast<PhySocketImpl*>(sock));
  718. #if defined(_WIN32) || defined(_WIN64)
  719. long n = (long)::send(sws.sock, reinterpret_cast<const char*>(data), len, 0);
  720. if (n == SOCKET_ERROR) {
  721. switch (WSAGetLastError()) {
  722. case WSAEINTR:
  723. case WSAEWOULDBLOCK:
  724. return 0;
  725. default:
  726. this->close(sock, callCloseHandler);
  727. return -1;
  728. }
  729. }
  730. #else // not Windows
  731. long n = (long)::send(sws.sock, data, len, 0);
  732. if (n < 0) {
  733. switch (errno) {
  734. #ifdef EAGAIN
  735. case EAGAIN:
  736. #endif
  737. #if defined(EWOULDBLOCK) && (! defined(EAGAIN) || (EWOULDBLOCK != EAGAIN))
  738. case EWOULDBLOCK:
  739. #endif
  740. #ifdef EINTR
  741. case EINTR:
  742. #endif
  743. return 0;
  744. default:
  745. this->close(sock, callCloseHandler);
  746. return -1;
  747. }
  748. }
  749. #endif // Windows or not
  750. return n;
  751. }
  752. #ifdef __UNIX_LIKE__
  753. /**
  754. * Attempt to send data to a Unix domain socket connection (non-blocking)
  755. *
  756. * If -1 is returned, the socket should no longer be used as it is now
  757. * destroyed. If callCloseHandler is true, the close handler will be
  758. * called before the function returns.
  759. *
  760. * @param sock An open Unix socket (other socket types will fail)
  761. * @param data Data to send
  762. * @param len Length of data
  763. * @param callCloseHandler If true, call close handler on socket closing failure condition (default: true)
  764. * @return Number of bytes actually sent or -1 on fatal error (socket closure)
  765. */
  766. inline long unixSend(PhySocket* sock, const void* data, unsigned long len, bool callCloseHandler = true)
  767. {
  768. PhySocketImpl& sws = *(reinterpret_cast<PhySocketImpl*>(sock));
  769. long n = (long)::write(sws.sock, data, len);
  770. if (n < 0) {
  771. switch (errno) {
  772. #ifdef EAGAIN
  773. case EAGAIN:
  774. #endif
  775. #if defined(EWOULDBLOCK) && (! defined(EAGAIN) || (EWOULDBLOCK != EAGAIN))
  776. case EWOULDBLOCK:
  777. #endif
  778. #ifdef EINTR
  779. case EINTR:
  780. #endif
  781. return 0;
  782. default:
  783. this->close(sock, callCloseHandler);
  784. return -1;
  785. }
  786. }
  787. return n;
  788. }
  789. #endif // __UNIX_LIKE__
  790. /**
  791. * For streams, sets whether we want to be notified that the socket is writable
  792. *
  793. * This can be used with TCP, Unix, or socket pair sockets.
  794. *
  795. * Call whack() if this is being done from another thread and you want
  796. * it to take effect immediately. Otherwise it is only guaranteed to
  797. * take effect on the next poll().
  798. *
  799. * @param sock Stream connection socket
  800. * @param notifyWritable Want writable notifications?
  801. */
  802. inline void setNotifyWritable(PhySocket* sock, bool notifyWritable)
  803. {
  804. PhySocketImpl& sws = *(reinterpret_cast<PhySocketImpl*>(sock));
  805. if (notifyWritable) {
  806. FD_SET(sws.sock, &_writefds);
  807. }
  808. else {
  809. FD_CLR(sws.sock, &_writefds);
  810. }
  811. }
  812. /**
  813. * Set whether we want to be notified that a socket is readable
  814. *
  815. * This is primarily for raw sockets added with wrapSocket(). It could be
  816. * used with others, but doing so would essentially lock them and prevent
  817. * data from being read from them until this is set to 'true' again.
  818. *
  819. * @param sock Socket to modify
  820. * @param notifyReadable True if socket should be monitored for readability
  821. */
  822. inline void setNotifyReadable(PhySocket* sock, bool notifyReadable)
  823. {
  824. PhySocketImpl& sws = *(reinterpret_cast<PhySocketImpl*>(sock));
  825. if (notifyReadable) {
  826. FD_SET(sws.sock, &_readfds);
  827. }
  828. else {
  829. FD_CLR(sws.sock, &_readfds);
  830. }
  831. }
  832. /**
  833. * Wait for activity and handle one or more events
  834. *
  835. * Note that this is not guaranteed to wait up to 'timeout' even
  836. * if nothing happens, as whack() or other events such as signals
  837. * may cause premature termination.
  838. *
  839. * @param timeout Timeout in milliseconds or 0 for none (forever)
  840. */
  841. inline void poll(unsigned long timeout)
  842. {
  843. char buf[131072];
  844. struct sockaddr_storage ss;
  845. struct timeval tv;
  846. fd_set rfds, wfds, efds;
  847. memcpy(&rfds, &_readfds, sizeof(rfds));
  848. memcpy(&wfds, &_writefds, sizeof(wfds));
  849. #if defined(_WIN32) || defined(_WIN64)
  850. memcpy(&efds, &_exceptfds, sizeof(efds));
  851. #else
  852. FD_ZERO(&efds);
  853. #endif
  854. tv.tv_sec = (long)(timeout / 1000);
  855. tv.tv_usec = (long)((timeout % 1000) * 1000);
  856. if (::select((int)_nfds + 1, &rfds, &wfds, &efds, (timeout > 0) ? &tv : (struct timeval*)0) <= 0)
  857. return;
  858. if (FD_ISSET(_whackReceiveSocket, &rfds)) {
  859. char tmp[16];
  860. #if defined(_WIN32) || defined(_WIN64)
  861. ::recv(_whackReceiveSocket, tmp, 16, 0);
  862. #else
  863. ::read(_whackReceiveSocket, tmp, 16);
  864. #endif
  865. }
  866. for (typename std::list<PhySocketImpl>::iterator s(_socks.begin()); s != _socks.end();) {
  867. switch (s->type) {
  868. case ZT_PHY_SOCKET_TCP_OUT_PENDING:
  869. #if defined(_WIN32) || defined(_WIN64)
  870. if (FD_ISSET(s->sock, &efds)) {
  871. this->close((PhySocket*)&(*s), true);
  872. }
  873. else // ... if
  874. #endif
  875. if (FD_ISSET(s->sock, &wfds)) {
  876. socklen_t slen = sizeof(ss);
  877. if (::getpeername(s->sock, (struct sockaddr*)&ss, &slen) != 0) {
  878. this->close((PhySocket*)&(*s), true);
  879. }
  880. else {
  881. s->type = ZT_PHY_SOCKET_TCP_OUT_CONNECTED;
  882. FD_SET(s->sock, &_readfds);
  883. FD_CLR(s->sock, &_writefds);
  884. #if defined(_WIN32) || defined(_WIN64)
  885. FD_CLR(s->sock, &_exceptfds);
  886. #endif
  887. try {
  888. _handler->phyOnTcpConnect((PhySocket*)&(*s), &(s->uptr), true);
  889. }
  890. catch (...) {
  891. }
  892. }
  893. }
  894. break;
  895. case ZT_PHY_SOCKET_TCP_OUT_CONNECTED:
  896. case ZT_PHY_SOCKET_TCP_IN: {
  897. ZT_PHY_SOCKFD_TYPE sock = s->sock; // if closed, s->sock becomes invalid as s is no longer dereferencable
  898. if (FD_ISSET(sock, &rfds)) {
  899. long n = (long)::recv(sock, buf, sizeof(buf), 0);
  900. if (n <= 0) {
  901. this->close((PhySocket*)&(*s), true);
  902. }
  903. else {
  904. try {
  905. _handler->phyOnTcpData((PhySocket*)&(*s), &(s->uptr), (void*)buf, (unsigned long)n);
  906. }
  907. catch (...) {
  908. }
  909. }
  910. }
  911. if ((FD_ISSET(sock, &wfds)) && (FD_ISSET(sock, &_writefds))) {
  912. try {
  913. _handler->phyOnTcpWritable((PhySocket*)&(*s), &(s->uptr));
  914. }
  915. catch (...) {
  916. }
  917. }
  918. } break;
  919. case ZT_PHY_SOCKET_TCP_LISTEN:
  920. if (FD_ISSET(s->sock, &rfds)) {
  921. memset(&ss, 0, sizeof(ss));
  922. socklen_t slen = sizeof(ss);
  923. ZT_PHY_SOCKFD_TYPE newSock = ::accept(s->sock, (struct sockaddr*)&ss, &slen);
  924. if (ZT_PHY_SOCKFD_VALID(newSock)) {
  925. if (_socks.size() >= ZT_PHY_MAX_SOCKETS) {
  926. ZT_PHY_CLOSE_SOCKET(newSock);
  927. }
  928. else {
  929. #if defined(_WIN32) || defined(_WIN64)
  930. {
  931. BOOL f = (_noDelay ? TRUE : FALSE);
  932. setsockopt(newSock, IPPROTO_TCP, TCP_NODELAY, (char*)&f, sizeof(f));
  933. }
  934. {
  935. u_long iMode = 1;
  936. ioctlsocket(newSock, FIONBIO, &iMode);
  937. }
  938. #else
  939. {
  940. int f = (_noDelay ? 1 : 0);
  941. setsockopt(newSock, IPPROTO_TCP, TCP_NODELAY, (char*)&f, sizeof(f));
  942. }
  943. fcntl(newSock, F_SETFL, O_NONBLOCK);
  944. #endif
  945. _socks.push_back(PhySocketImpl());
  946. PhySocketImpl& sws = _socks.back();
  947. FD_SET(newSock, &_readfds);
  948. if ((long)newSock > _nfds)
  949. _nfds = (long)newSock;
  950. sws.type = ZT_PHY_SOCKET_TCP_IN;
  951. sws.sock = newSock;
  952. sws.uptr = (void*)0;
  953. memcpy(&(sws.saddr), &ss, sizeof(struct sockaddr_storage));
  954. try {
  955. _handler->phyOnTcpAccept((PhySocket*)&(*s), (PhySocket*)&(_socks.back()), &(s->uptr), &(sws.uptr), (const struct sockaddr*)&(sws.saddr));
  956. }
  957. catch (...) {
  958. }
  959. }
  960. }
  961. }
  962. break;
  963. case ZT_PHY_SOCKET_UDP:
  964. if (FD_ISSET(s->sock, &rfds)) {
  965. #if (defined(__linux__) || defined(linux) || defined(__linux)) && defined(MSG_WAITFORONE)
  966. #define RECVMMSG_WINDOW_SIZE 128
  967. #define RECVMMSG_BUF_SIZE 1500
  968. iovec iovs[RECVMMSG_WINDOW_SIZE];
  969. uint8_t bufs[RECVMMSG_WINDOW_SIZE][RECVMMSG_BUF_SIZE];
  970. sockaddr_storage addrs[RECVMMSG_WINDOW_SIZE];
  971. memset(addrs, 0, sizeof(addrs));
  972. mmsghdr mm[RECVMMSG_WINDOW_SIZE];
  973. memset(mm, 0, sizeof(mm));
  974. for (int i = 0; i < RECVMMSG_WINDOW_SIZE; ++i) {
  975. iovs[i].iov_base = (void*)bufs[i];
  976. iovs[i].iov_len = RECVMMSG_BUF_SIZE;
  977. mm[i].msg_hdr.msg_name = (void*)&(addrs[i]);
  978. mm[i].msg_hdr.msg_iov = &(iovs[i]);
  979. mm[i].msg_hdr.msg_iovlen = 1;
  980. }
  981. for (int k = 0; k < 1024; ++k) {
  982. for (int i = 0; i < RECVMMSG_WINDOW_SIZE; ++i) {
  983. mm[i].msg_hdr.msg_namelen = sizeof(sockaddr_storage);
  984. mm[i].msg_len = 0;
  985. }
  986. int received_count = recvmmsg(s->sock, mm, RECVMMSG_WINDOW_SIZE, MSG_WAITFORONE, nullptr);
  987. if (received_count > 0) {
  988. for (int i = 0; i < received_count; ++i) {
  989. long n = (long)mm[i].msg_len;
  990. if (n > 0) {
  991. try {
  992. _handler->phyOnDatagram((PhySocket*)&(*s), &(s->uptr), (const struct sockaddr*)&(s->saddr), (const struct sockaddr*)&(addrs[i]), bufs[i], (unsigned long)n);
  993. }
  994. catch (...) {
  995. }
  996. }
  997. }
  998. }
  999. else {
  1000. break;
  1001. }
  1002. }
  1003. #else
  1004. for (int k = 0; k < 1024; ++k) {
  1005. memset(&ss, 0, sizeof(ss));
  1006. socklen_t slen = sizeof(ss);
  1007. long n = (long)::recvfrom(s->sock, buf, sizeof(buf), 0, (struct sockaddr*)&ss, &slen);
  1008. if (n > 0) {
  1009. try {
  1010. _handler->phyOnDatagram((PhySocket*)&(*s), &(s->uptr), (const struct sockaddr*)&(s->saddr), (const struct sockaddr*)&ss, (void*)buf, (unsigned long)n);
  1011. }
  1012. catch (...) {
  1013. }
  1014. }
  1015. else if (n < 0)
  1016. break;
  1017. }
  1018. #endif
  1019. }
  1020. break;
  1021. case ZT_PHY_SOCKET_UNIX_IN: {
  1022. #ifdef __UNIX_LIKE__
  1023. ZT_PHY_SOCKFD_TYPE sock = s->sock; // if closed, s->sock becomes invalid as s is no longer dereferencable
  1024. if ((FD_ISSET(sock, &wfds)) && (FD_ISSET(sock, &_writefds))) {
  1025. try {
  1026. _handler->phyOnUnixWritable((PhySocket*)&(*s), &(s->uptr));
  1027. }
  1028. catch (...) {
  1029. }
  1030. }
  1031. if (FD_ISSET(sock, &rfds)) {
  1032. long n = (long)::read(sock, buf, sizeof(buf));
  1033. if (n <= 0) {
  1034. this->close((PhySocket*)&(*s), true);
  1035. }
  1036. else {
  1037. try {
  1038. _handler->phyOnUnixData((PhySocket*)&(*s), &(s->uptr), (void*)buf, (unsigned long)n);
  1039. }
  1040. catch (...) {
  1041. }
  1042. }
  1043. }
  1044. #endif // __UNIX_LIKE__
  1045. } break;
  1046. case ZT_PHY_SOCKET_UNIX_LISTEN:
  1047. #ifdef __UNIX_LIKE__
  1048. if (FD_ISSET(s->sock, &rfds)) {
  1049. memset(&ss, 0, sizeof(ss));
  1050. socklen_t slen = sizeof(ss);
  1051. ZT_PHY_SOCKFD_TYPE newSock = ::accept(s->sock, (struct sockaddr*)&ss, &slen);
  1052. if (ZT_PHY_SOCKFD_VALID(newSock)) {
  1053. if (_socks.size() >= ZT_PHY_MAX_SOCKETS) {
  1054. ZT_PHY_CLOSE_SOCKET(newSock);
  1055. }
  1056. else {
  1057. fcntl(newSock, F_SETFL, O_NONBLOCK);
  1058. _socks.push_back(PhySocketImpl());
  1059. PhySocketImpl& sws = _socks.back();
  1060. FD_SET(newSock, &_readfds);
  1061. if ((long)newSock > _nfds)
  1062. _nfds = (long)newSock;
  1063. sws.type = ZT_PHY_SOCKET_UNIX_IN;
  1064. sws.sock = newSock;
  1065. sws.uptr = (void*)0;
  1066. memcpy(&(sws.saddr), &ss, sizeof(struct sockaddr_storage));
  1067. try {
  1068. //_handler->phyOnUnixAccept((PhySocket *)&(*s),(PhySocket *)&(_socks.back()),&(s->uptr),&(sws.uptr));
  1069. }
  1070. catch (...) {
  1071. }
  1072. }
  1073. }
  1074. }
  1075. #endif // __UNIX_LIKE__
  1076. break;
  1077. case ZT_PHY_SOCKET_FD: {
  1078. ZT_PHY_SOCKFD_TYPE sock = s->sock;
  1079. const bool readable = ((FD_ISSET(sock, &rfds)) && (FD_ISSET(sock, &_readfds)));
  1080. const bool writable = ((FD_ISSET(sock, &wfds)) && (FD_ISSET(sock, &_writefds)));
  1081. if ((readable) || (writable)) {
  1082. try {
  1083. //_handler->phyOnFileDescriptorActivity((PhySocket *)&(*s),&(s->uptr),readable,writable);
  1084. }
  1085. catch (...) {
  1086. }
  1087. }
  1088. } break;
  1089. default:
  1090. break;
  1091. }
  1092. if (s->type == ZT_PHY_SOCKET_CLOSED)
  1093. _socks.erase(s++);
  1094. else
  1095. ++s;
  1096. }
  1097. }
  1098. /**
  1099. * @param sock Socket to close
  1100. * @param callHandlers If true, call handlers for TCP connect (success: false) or close (default: true)
  1101. */
  1102. inline void close(PhySocket* sock, bool callHandlers = true)
  1103. {
  1104. if (! sock)
  1105. return;
  1106. PhySocketImpl& sws = *(reinterpret_cast<PhySocketImpl*>(sock));
  1107. if (sws.type == ZT_PHY_SOCKET_CLOSED)
  1108. return;
  1109. FD_CLR(sws.sock, &_readfds);
  1110. FD_CLR(sws.sock, &_writefds);
  1111. #if defined(_WIN32) || defined(_WIN64)
  1112. FD_CLR(sws.sock, &_exceptfds);
  1113. #endif
  1114. if (sws.type != ZT_PHY_SOCKET_FD)
  1115. ZT_PHY_CLOSE_SOCKET(sws.sock);
  1116. #ifdef __UNIX_LIKE__
  1117. if (sws.type == ZT_PHY_SOCKET_UNIX_LISTEN)
  1118. ::unlink(((struct sockaddr_un*)(&(sws.saddr)))->sun_path);
  1119. #endif // __UNIX_LIKE__
  1120. if (callHandlers) {
  1121. switch (sws.type) {
  1122. case ZT_PHY_SOCKET_TCP_OUT_PENDING:
  1123. try {
  1124. _handler->phyOnTcpConnect(sock, &(sws.uptr), false);
  1125. }
  1126. catch (...) {
  1127. }
  1128. break;
  1129. case ZT_PHY_SOCKET_TCP_OUT_CONNECTED:
  1130. case ZT_PHY_SOCKET_TCP_IN:
  1131. try {
  1132. _handler->phyOnTcpClose(sock, &(sws.uptr));
  1133. }
  1134. catch (...) {
  1135. }
  1136. break;
  1137. case ZT_PHY_SOCKET_UNIX_IN:
  1138. #ifdef __UNIX_LIKE__
  1139. try {
  1140. _handler->phyOnUnixClose(sock, &(sws.uptr));
  1141. }
  1142. catch (...) {
  1143. }
  1144. #endif // __UNIX_LIKE__
  1145. break;
  1146. default:
  1147. break;
  1148. }
  1149. }
  1150. // Causes entry to be deleted from list in poll(), ignored elsewhere
  1151. sws.type = ZT_PHY_SOCKET_CLOSED;
  1152. if ((long)sws.sock >= (long)_nfds) {
  1153. long nfds = (long)_whackSendSocket;
  1154. if ((long)_whackReceiveSocket > nfds)
  1155. nfds = (long)_whackReceiveSocket;
  1156. for (typename std::list<PhySocketImpl>::iterator s(_socks.begin()); s != _socks.end(); ++s) {
  1157. if ((s->type != ZT_PHY_SOCKET_CLOSED) && ((long)s->sock > nfds))
  1158. nfds = (long)s->sock;
  1159. }
  1160. _nfds = nfds;
  1161. }
  1162. }
  1163. };
  1164. } // namespace ZeroTier
  1165. #endif