Phy.hpp 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266
  1. /*
  2. * Copyright (c)2013-2020 ZeroTier, Inc.
  3. *
  4. * Use of this software is governed by the Business Source License included
  5. * in the LICENSE.TXT file in the project's root directory.
  6. *
  7. * Change Date: 2026-01-01
  8. *
  9. * On the date above, in accordance with the Business Source License, use
  10. * of this software will be governed by version 2.0 of the Apache License.
  11. */
  12. /****/
  13. #ifndef ZT_PHY_HPP
  14. #define ZT_PHY_HPP
  15. #include <list>
  16. #include <stdexcept>
  17. #include <stdio.h>
  18. #include <stdlib.h>
  19. #include <string.h>
  20. #if defined(_WIN32) || defined(_WIN64)
  21. #include <windows.h>
  22. #include <winsock2.h>
  23. #include <ws2tcpip.h>
  24. #define ZT_PHY_SOCKFD_TYPE SOCKET
  25. #define ZT_PHY_SOCKFD_NULL (INVALID_SOCKET)
  26. #define ZT_PHY_SOCKFD_VALID(s) ((s) != INVALID_SOCKET)
  27. #define ZT_PHY_CLOSE_SOCKET(s) ::closesocket(s)
  28. #define ZT_PHY_MAX_SOCKETS (FD_SETSIZE)
  29. #define ZT_PHY_MAX_INTERCEPTS ZT_PHY_MAX_SOCKETS
  30. #define ZT_PHY_SOCKADDR_STORAGE_TYPE struct sockaddr_storage
  31. #else // not Windows
  32. #include "../node/Metrics.hpp"
  33. #include <arpa/inet.h>
  34. #include <errno.h>
  35. #include <fcntl.h>
  36. #include <netinet/in.h>
  37. #include <netinet/tcp.h>
  38. #include <signal.h>
  39. #include <sys/select.h>
  40. #include <sys/socket.h>
  41. #include <sys/time.h>
  42. #include <sys/types.h>
  43. #include <sys/un.h>
  44. #include <unistd.h>
  45. #if defined(__linux__) || defined(linux) || defined(__LINUX__) || defined(__linux)
  46. #ifndef IPV6_DONTFRAG
  47. #define IPV6_DONTFRAG 62
  48. #endif
  49. #endif
  50. #define ZT_PHY_SOCKFD_TYPE int
  51. #define ZT_PHY_SOCKFD_NULL (-1)
  52. #define ZT_PHY_SOCKFD_VALID(s) ((s) > -1)
  53. #define ZT_PHY_CLOSE_SOCKET(s) ::close(s)
  54. #define ZT_PHY_MAX_SOCKETS (FD_SETSIZE)
  55. #define ZT_PHY_MAX_INTERCEPTS ZT_PHY_MAX_SOCKETS
  56. #define ZT_PHY_SOCKADDR_STORAGE_TYPE struct sockaddr_storage
  57. #endif // Windows or not
  58. namespace ZeroTier {
  59. /**
  60. * Opaque socket type
  61. */
  62. typedef void PhySocket;
  63. /**
  64. * Simple templated non-blocking sockets implementation
  65. *
  66. * Yes there is boost::asio and libuv, but I like small binaries and I hate
  67. * build dependencies. Both drag in a whole bunch of pasta with them.
  68. *
  69. * This class is templated on a pointer to a handler class which must
  70. * implement the following functions:
  71. *
  72. * For all platforms:
  73. *
  74. * phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *localAddr,const struct sockaddr *from,void *data,unsigned long len)
  75. * phyOnTcpConnect(PhySocket *sock,void **uptr,bool success)
  76. * phyOnTcpAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN,const struct sockaddr *from)
  77. * phyOnTcpClose(PhySocket *sock,void **uptr)
  78. * phyOnTcpData(PhySocket *sock,void **uptr,void *data,unsigned long len)
  79. * phyOnTcpWritable(PhySocket *sock,void **uptr)
  80. * phyOnFileDescriptorActivity(PhySocket *sock,void **uptr,bool readable,bool writable)
  81. *
  82. * On Linux/OSX/Unix only (not required/used on Windows or elsewhere):
  83. *
  84. * phyOnUnixAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN)
  85. * phyOnUnixClose(PhySocket *sock,void **uptr)
  86. * phyOnUnixData(PhySocket *sock,void **uptr,void *data,unsigned long len)
  87. * phyOnUnixWritable(PhySocket *sock,void **uptr)
  88. *
  89. * These templates typically refer to function objects. Templates are used to
  90. * avoid the call overhead of indirection, which is surprisingly high for high
  91. * bandwidth applications pushing a lot of packets.
  92. *
  93. * The 'sock' pointer above is an opaque pointer to a socket. Each socket
  94. * has a 'uptr' user-settable/modifiable pointer associated with it, which
  95. * can be set on bind/connect calls and is passed as a void ** to permit
  96. * resetting at any time. The ACCEPT handler takes two sets of sock and
  97. * uptr: sockL and uptrL for the listen socket, and sockN and uptrN for
  98. * the new TCP connection socket that has just been created.
  99. *
  100. * Handlers are always called. On outgoing TCP connection, CONNECT is always
  101. * called on either success or failure followed by DATA and/or WRITABLE as
  102. * indicated. On socket close, handlers are called unless close() is told
  103. * explicitly not to call handlers. It is safe to close a socket within a
  104. * handler, and in that case close() can be told not to call handlers to
  105. * prevent recursion.
  106. *
  107. * This isn't thread-safe with the exception of whack(), which is safe to
  108. * call from another thread to abort poll().
  109. */
  110. template <typename HANDLER_PTR_TYPE> class Phy {
  111. private:
  112. HANDLER_PTR_TYPE _handler;
  // Role/state tag stored in PhySocketImpl::type for every tracked socket.
  enum PhySocketType {
    ZT_PHY_SOCKET_CLOSED = 0,            // socket is closed, will be removed on next poll()
    ZT_PHY_SOCKET_TCP_OUT_PENDING = 1,   // outgoing TCP connect() still in progress
    ZT_PHY_SOCKET_TCP_OUT_CONNECTED = 2, // outgoing TCP connection established
    ZT_PHY_SOCKET_TCP_IN = 3,            // presumably an accepted inbound TCP connection (assigned outside this view)
    ZT_PHY_SOCKET_TCP_LISTEN = 4,        // TCP listening socket
    ZT_PHY_SOCKET_UDP = 5,               // bound UDP socket
    ZT_PHY_SOCKET_FD = 6,                // presumably a wrapped raw descriptor (not assigned in the visible code)
    ZT_PHY_SOCKET_UNIX_IN = 7,           // Unix-domain stream; also the type wrapSocket() assigns
    ZT_PHY_SOCKET_UNIX_LISTEN = 8        // Unix-domain listening socket
  };
  124. struct PhySocketImpl {
  125. PhySocketImpl()
  126. {
  127. }
  128. PhySocketType type;
  129. ZT_PHY_SOCKFD_TYPE sock;
  130. void* uptr; // user-settable pointer
  131. uint16_t localPort;
  132. ZT_PHY_SOCKADDR_STORAGE_TYPE saddr; // remote for TCP_OUT and TCP_IN, local for TCP_LISTEN, RAW, and UDP
  133. };
  134. std::list<PhySocketImpl> _socks;
  135. fd_set _readfds;
  136. fd_set _writefds;
  137. #if defined(_WIN32) || defined(_WIN64)
  138. fd_set _exceptfds;
  139. #endif
  140. long _nfds;
  141. ZT_PHY_SOCKFD_TYPE _whackReceiveSocket;
  142. ZT_PHY_SOCKFD_TYPE _whackSendSocket;
  143. bool _noDelay;
  144. bool _noCheck;
  145. public:
  146. /**
  147. * @param handler Pointer of type HANDLER_PTR_TYPE to handler
  148. * @param noDelay If true, disable TCP NAGLE algorithm on TCP sockets
  149. * @param noCheck If true, attempt to set UDP SO_NO_CHECK option to disable sending checksums
  150. */
  151. Phy(HANDLER_PTR_TYPE handler, bool noDelay, bool noCheck) : _handler(handler)
  152. {
  153. FD_ZERO(&_readfds);
  154. FD_ZERO(&_writefds);
  155. #if defined(_WIN32) || defined(_WIN64)
  156. FD_ZERO(&_exceptfds);
  157. SOCKET pipes[2];
  158. { // hack copied from StackOverflow, behaves a bit like pipe() on *nix systems
  159. struct sockaddr_in inaddr;
  160. struct sockaddr addr;
  161. SOCKET lst = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  162. if (lst == INVALID_SOCKET)
  163. throw std::runtime_error("unable to create pipes for select() abort");
  164. memset(&inaddr, 0, sizeof(inaddr));
  165. memset(&addr, 0, sizeof(addr));
  166. inaddr.sin_family = AF_INET;
  167. inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  168. inaddr.sin_port = 0;
  169. int yes = 1;
  170. setsockopt(lst, SOL_SOCKET, SO_REUSEADDR, (char*)&yes, sizeof(yes));
  171. bind(lst, (struct sockaddr*)&inaddr, sizeof(inaddr));
  172. listen(lst, 1);
  173. int len = sizeof(inaddr);
  174. getsockname(lst, &addr, &len);
  175. pipes[0] = ::socket(AF_INET, SOCK_STREAM, 0);
  176. if (pipes[0] == INVALID_SOCKET)
  177. throw std::runtime_error("unable to create pipes for select() abort");
  178. connect(pipes[0], &addr, len);
  179. pipes[1] = accept(lst, 0, 0);
  180. closesocket(lst);
  181. }
  182. #else // not Windows
  183. int pipes[2];
  184. if (::pipe(pipes))
  185. throw std::runtime_error("unable to create pipes for select() abort");
  186. #endif // Windows or not
  187. _nfds = (pipes[0] > pipes[1]) ? (long)pipes[0] : (long)pipes[1];
  188. _whackReceiveSocket = pipes[0];
  189. _whackSendSocket = pipes[1];
  190. _noDelay = noDelay;
  191. _noCheck = noCheck;
  192. }
  193. ~Phy()
  194. {
  195. for (typename std::list<PhySocketImpl>::const_iterator s(_socks.begin()); s != _socks.end(); ++s) {
  196. if (s->type != ZT_PHY_SOCKET_CLOSED)
  197. this->close((PhySocket*)&(*s), true);
  198. }
  199. ZT_PHY_CLOSE_SOCKET(_whackReceiveSocket);
  200. ZT_PHY_CLOSE_SOCKET(_whackSendSocket);
  201. }
  202. /**
  203. * @param s Socket object
  204. * @return Underlying OS-type (usually int or long) file descriptor associated with object
  205. */
  206. static inline ZT_PHY_SOCKFD_TYPE getDescriptor(PhySocket* s) throw()
  207. {
  208. return reinterpret_cast<PhySocketImpl*>(s)->sock;
  209. }
  210. /**
  211. * @param s Socket object
  212. * @return Pointer to user object
  213. */
  214. static inline void** getuptr(PhySocket* s) throw()
  215. {
  216. return &(reinterpret_cast<PhySocketImpl*>(s)->uptr);
  217. }
  218. /**
  219. * Return the local port corresponding to this PhySocket
  220. *
  221. * @param s Socket object
  222. *
  223. * @return Local port corresponding to this PhySocket
  224. */
  225. static inline uint16_t getLocalPort(PhySocket* s) throw()
  226. {
  227. return reinterpret_cast<PhySocketImpl*>(s)->localPort;
  228. }
  229. /**
  230. * Cause poll() to stop waiting immediately
  231. *
  232. * This can be used to reset the polling loop after changes that require
  233. * attention, or to shut down a background thread that is waiting, etc.
  234. */
  235. inline void whack()
  236. {
  237. #if defined(_WIN32) || defined(_WIN64)
  238. ::send(_whackSendSocket, (const char*)this, 1, 0);
  239. #else
  240. (void)(::write(_whackSendSocket, (PhySocket*)this, 1));
  241. #endif
  242. }
  243. /**
  244. * @return Number of open sockets
  245. */
  246. inline unsigned long count() const throw()
  247. {
  248. return _socks.size();
  249. }
  250. /**
  251. * @return Maximum number of sockets allowed
  252. */
  253. inline unsigned long maxCount() const throw()
  254. {
  255. return ZT_PHY_MAX_SOCKETS;
  256. }
  257. /**
  258. * Wrap a raw file descriptor in a PhySocket structure
  259. *
  260. * This can be used to select/poll on a raw file descriptor as part of this
  261. * class's I/O loop. By default the fd is set for read notification but
  262. * this can be controlled with setNotifyReadable(). When any detected
  263. * condition is present, the phyOnFileDescriptorActivity() callback is
  264. * called with one or both of its arguments 'true'.
  265. *
  266. * The Phy<>::close() method *must* be called when you're done with this
  267. * file descriptor to remove it from the select/poll set, but unlike other
  268. * types of sockets Phy<> does not actually close the underlying fd or
  269. * otherwise manage its life cycle. There is also no close notification
  270. * callback for this fd, since Phy<> doesn't actually perform reading or
  271. * writing or detect error conditions. This is only useful for adding a
  272. * file descriptor to Phy<> to select/poll on it.
  273. *
  274. * @param fd Raw file descriptor
  275. * @param uptr User pointer to supply to callbacks
  276. * @return PhySocket wrapping fd or NULL on failure (out of memory or too many sockets)
  277. */
  278. inline PhySocket* wrapSocket(ZT_PHY_SOCKFD_TYPE fd, void* uptr = (void*)0)
  279. {
  280. if (_socks.size() >= ZT_PHY_MAX_SOCKETS)
  281. return (PhySocket*)0;
  282. try {
  283. _socks.push_back(PhySocketImpl());
  284. }
  285. catch (...) {
  286. return (PhySocket*)0;
  287. }
  288. PhySocketImpl& sws = _socks.back();
  289. if ((long)fd > _nfds)
  290. _nfds = (long)fd;
  291. FD_SET(fd, &_readfds);
  292. sws.type = ZT_PHY_SOCKET_UNIX_IN; /* TODO: Type was changed to allow for CBs with new RPC model */
  293. sws.sock = fd;
  294. sws.uptr = uptr;
  295. memset(&(sws.saddr), 0, sizeof(struct sockaddr_storage));
  296. // no sockaddr for this socket type, leave saddr null
  297. return (PhySocket*)&sws;
  298. }
  299. /**
  300. * Bind a UDP socket
  301. *
  302. * @param localAddress Local endpoint address and port
  303. * @param uptr Initial value of user pointer associated with this socket (default: NULL)
  304. * @param bufferSize Desired socket receive/send buffer size -- will set as close to this as possible (default: 0, leave alone)
  305. * @return Socket or NULL on failure to bind
  306. */
  307. inline PhySocket* udpBind(const struct sockaddr* localAddress, void* uptr = (void*)0, int bufferSize = 0)
  308. {
  309. if (_socks.size() >= ZT_PHY_MAX_SOCKETS)
  310. return (PhySocket*)0;
  311. ZT_PHY_SOCKFD_TYPE s = ::socket(localAddress->sa_family, SOCK_DGRAM, 0);
  312. if (! ZT_PHY_SOCKFD_VALID(s))
  313. return (PhySocket*)0;
  314. if (bufferSize > 0) {
  315. int bs = bufferSize;
  316. while (bs >= 65536) {
  317. int tmpbs = bs;
  318. if (setsockopt(s, SOL_SOCKET, SO_RCVBUF, (const char*)&tmpbs, sizeof(tmpbs)) == 0)
  319. break;
  320. bs -= 4096;
  321. }
  322. bs = bufferSize;
  323. while (bs >= 65536) {
  324. int tmpbs = bs;
  325. if (setsockopt(s, SOL_SOCKET, SO_SNDBUF, (const char*)&tmpbs, sizeof(tmpbs)) == 0)
  326. break;
  327. bs -= 4096;
  328. }
  329. }
  330. #if defined(_WIN32) || defined(_WIN64)
  331. {
  332. BOOL f;
  333. if (localAddress->sa_family == AF_INET6) {
  334. f = TRUE;
  335. setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, (const char*)&f, sizeof(f));
  336. f = FALSE;
  337. setsockopt(s, IPPROTO_IPV6, IPV6_DONTFRAG, (const char*)&f, sizeof(f));
  338. }
  339. f = FALSE;
  340. setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (const char*)&f, sizeof(f));
  341. f = TRUE;
  342. setsockopt(s, SOL_SOCKET, SO_BROADCAST, (const char*)&f, sizeof(f));
  343. }
  344. #else // not Windows
  345. {
  346. int f;
  347. if (localAddress->sa_family == AF_INET6) {
  348. f = 1;
  349. setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, (void*)&f, sizeof(f));
  350. #ifdef IPV6_MTU_DISCOVER
  351. f = 0;
  352. setsockopt(s, IPPROTO_IPV6, IPV6_MTU_DISCOVER, &f, sizeof(f));
  353. #endif
  354. #ifdef IPV6_DONTFRAG
  355. f = 0;
  356. setsockopt(s, IPPROTO_IPV6, IPV6_DONTFRAG, &f, sizeof(f));
  357. #endif
  358. }
  359. f = 0;
  360. setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (void*)&f, sizeof(f));
  361. f = 1;
  362. setsockopt(s, SOL_SOCKET, SO_BROADCAST, (void*)&f, sizeof(f));
  363. #ifdef IP_DONTFRAG
  364. f = 0;
  365. setsockopt(s, IPPROTO_IP, IP_DONTFRAG, &f, sizeof(f));
  366. #endif
  367. #ifdef IP_MTU_DISCOVER
  368. f = 0;
  369. setsockopt(s, IPPROTO_IP, IP_MTU_DISCOVER, &f, sizeof(f));
  370. #endif
  371. #ifdef SO_NO_CHECK
  372. // For now at least we only set SO_NO_CHECK on IPv4 sockets since some
  373. // IPv6 stacks incorrectly discard zero checksum packets. May remove
  374. // this restriction later once broken stuff dies more.
  375. if ((localAddress->sa_family == AF_INET) && (_noCheck)) {
  376. f = 1;
  377. setsockopt(s, SOL_SOCKET, SO_NO_CHECK, (void*)&f, sizeof(f));
  378. }
  379. #endif
  380. }
  381. #endif // Windows or not
  382. if (::bind(s, localAddress, (localAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in))) {
  383. ZT_PHY_CLOSE_SOCKET(s);
  384. return (PhySocket*)0;
  385. }
  386. #if defined(_WIN32) || defined(_WIN64)
  387. {
  388. u_long iMode = 1;
  389. ioctlsocket(s, FIONBIO, &iMode);
  390. }
  391. #else
  392. fcntl(s, F_SETFL, O_NONBLOCK);
  393. #endif
  394. try {
  395. _socks.push_back(PhySocketImpl());
  396. }
  397. catch (...) {
  398. ZT_PHY_CLOSE_SOCKET(s);
  399. return (PhySocket*)0;
  400. }
  401. PhySocketImpl& sws = _socks.back();
  402. if ((long)s > _nfds)
  403. _nfds = (long)s;
  404. FD_SET(s, &_readfds);
  405. sws.type = ZT_PHY_SOCKET_UDP;
  406. sws.sock = s;
  407. sws.uptr = uptr;
  408. #ifdef __UNIX_LIKE__
  409. struct sockaddr_in* sin = (struct sockaddr_in*)localAddress;
  410. sws.localPort = htons(sin->sin_port);
  411. #endif
  412. memset(&(sws.saddr), 0, sizeof(struct sockaddr_storage));
  413. memcpy(&(sws.saddr), localAddress, (localAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
  414. return (PhySocket*)&sws;
  415. }
  416. /**
  417. * Set the IP TTL for the next outgoing packet (for IPv4 UDP sockets only)
  418. *
  419. * @param ttl New TTL (0 or >255 will set it to 255)
  420. * @return True on success
  421. */
  422. inline bool setIp4UdpTtl(PhySocket* sock, unsigned int ttl)
  423. {
  424. PhySocketImpl& sws = *(reinterpret_cast<PhySocketImpl*>(sock));
  425. #if defined(_WIN32) || defined(_WIN64)
  426. DWORD tmp = ((ttl == 0) || (ttl > 255)) ? 255 : (DWORD)ttl;
  427. return (::setsockopt(sws.sock, IPPROTO_IP, IP_TTL, (const char*)&tmp, sizeof(tmp)) == 0);
  428. #else
  429. int tmp = ((ttl == 0) || (ttl > 255)) ? 255 : (int)ttl;
  430. return (::setsockopt(sws.sock, IPPROTO_IP, IP_TTL, (void*)&tmp, sizeof(tmp)) == 0);
  431. #endif
  432. }
  433. /**
  434. * Send a UDP packet
  435. *
  436. * @param sock UDP socket
  437. * @param remoteAddress Destination address (must be correct type for socket)
  438. * @param data Data to send
  439. * @param len Length of packet
  440. * @return True if packet appears to have been sent successfully
  441. */
  442. inline bool udpSend(PhySocket* sock, const struct sockaddr* remoteAddress, const void* data, unsigned long len)
  443. {
  444. PhySocketImpl& sws = *(reinterpret_cast<PhySocketImpl*>(sock));
  445. bool sent = false;
  446. #if defined(_WIN32) || defined(_WIN64)
  447. sent = ((long)::sendto(sws.sock, reinterpret_cast<const char*>(data), len, 0, remoteAddress, (remoteAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in)) == (long)len);
  448. #else
  449. sent = ((long)::sendto(sws.sock, data, len, 0, remoteAddress, (remoteAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in)) == (long)len);
  450. #endif
  451. if (sent) {
  452. Metrics::udp_send += len;
  453. }
  454. return sent;
  455. }
  456. #ifdef __UNIX_LIKE__
  457. /**
  458. * Listen for connections on a Unix domain socket
  459. *
  460. * @param path Path to Unix domain socket
  461. * @param uptr Arbitrary pointer to associate
  462. * @return PhySocket or NULL if cannot bind
  463. */
  464. inline PhySocket* unixListen(const char* path, void* uptr = (void*)0)
  465. {
  466. struct sockaddr_un sun;
  467. if (_socks.size() >= ZT_PHY_MAX_SOCKETS)
  468. return (PhySocket*)0;
  469. memset(&sun, 0, sizeof(sun));
  470. sun.sun_family = AF_UNIX;
  471. if (strlen(path) >= sizeof(sun.sun_path))
  472. return (PhySocket*)0;
  473. strcpy(sun.sun_path, path);
  474. ZT_PHY_SOCKFD_TYPE s = ::socket(PF_UNIX, SOCK_STREAM, 0);
  475. if (! ZT_PHY_SOCKFD_VALID(s))
  476. return (PhySocket*)0;
  477. ::fcntl(s, F_SETFL, O_NONBLOCK);
  478. ::unlink(path);
  479. if (::bind(s, (struct sockaddr*)&sun, sizeof(struct sockaddr_un)) != 0) {
  480. ZT_PHY_CLOSE_SOCKET(s);
  481. return (PhySocket*)0;
  482. }
  483. if (::listen(s, 128) != 0) {
  484. ZT_PHY_CLOSE_SOCKET(s);
  485. return (PhySocket*)0;
  486. }
  487. try {
  488. _socks.push_back(PhySocketImpl());
  489. }
  490. catch (...) {
  491. ZT_PHY_CLOSE_SOCKET(s);
  492. return (PhySocket*)0;
  493. }
  494. PhySocketImpl& sws = _socks.back();
  495. if ((long)s > _nfds)
  496. _nfds = (long)s;
  497. FD_SET(s, &_readfds);
  498. sws.type = ZT_PHY_SOCKET_UNIX_LISTEN;
  499. sws.sock = s;
  500. sws.uptr = uptr;
  501. memset(&(sws.saddr), 0, sizeof(struct sockaddr_storage));
  502. memcpy(&(sws.saddr), &sun, sizeof(struct sockaddr_un));
  503. return (PhySocket*)&sws;
  504. }
  505. #endif // __UNIX_LIKE__
  506. /**
  507. * Bind a local listen socket to listen for new TCP connections
  508. *
  509. * @param localAddress Local address and port
  510. * @param uptr Initial value of uptr for new socket (default: NULL)
  511. * @return Socket or NULL on failure to bind
  512. */
  513. inline PhySocket* tcpListen(const struct sockaddr* localAddress, void* uptr = (void*)0)
  514. {
  515. if (_socks.size() >= ZT_PHY_MAX_SOCKETS)
  516. return (PhySocket*)0;
  517. ZT_PHY_SOCKFD_TYPE s = ::socket(localAddress->sa_family, SOCK_STREAM, 0);
  518. if (! ZT_PHY_SOCKFD_VALID(s))
  519. return (PhySocket*)0;
  520. #if defined(_WIN32) || defined(_WIN64)
  521. {
  522. BOOL f;
  523. f = TRUE;
  524. ::setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, (const char*)&f, sizeof(f));
  525. f = TRUE;
  526. ::setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (const char*)&f, sizeof(f));
  527. f = (_noDelay ? TRUE : FALSE);
  528. setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (char*)&f, sizeof(f));
  529. u_long iMode = 1;
  530. ioctlsocket(s, FIONBIO, &iMode);
  531. }
  532. #else
  533. {
  534. int f;
  535. f = 1;
  536. ::setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, (void*)&f, sizeof(f));
  537. f = 1;
  538. ::setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (void*)&f, sizeof(f));
  539. f = (_noDelay ? 1 : 0);
  540. setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (char*)&f, sizeof(f));
  541. fcntl(s, F_SETFL, O_NONBLOCK);
  542. }
  543. #endif
  544. if (::bind(s, localAddress, (localAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in))) {
  545. ZT_PHY_CLOSE_SOCKET(s);
  546. return (PhySocket*)0;
  547. }
  548. if (::listen(s, 1024)) {
  549. ZT_PHY_CLOSE_SOCKET(s);
  550. return (PhySocket*)0;
  551. }
  552. try {
  553. _socks.push_back(PhySocketImpl());
  554. }
  555. catch (...) {
  556. ZT_PHY_CLOSE_SOCKET(s);
  557. return (PhySocket*)0;
  558. }
  559. PhySocketImpl& sws = _socks.back();
  560. if ((long)s > _nfds)
  561. _nfds = (long)s;
  562. FD_SET(s, &_readfds);
  563. sws.type = ZT_PHY_SOCKET_TCP_LISTEN;
  564. sws.sock = s;
  565. sws.uptr = uptr;
  566. memset(&(sws.saddr), 0, sizeof(struct sockaddr_storage));
  567. memcpy(&(sws.saddr), localAddress, (localAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
  568. return (PhySocket*)&sws;
  569. }
  570. /**
  571. * Start a non-blocking connect; CONNECT handler is called on success or failure
  572. *
  573. * A return value of NULL indicates a synchronous failure such as a
  574. * failure to open a socket. The TCP connection handler is not called
  575. * in this case.
  576. *
  577. * It is possible on some platforms for an "instant connect" to occur,
  578. * such as when connecting to a loopback address. In this case, the
  579. * 'connected' result parameter will be set to 'true' and if the
  580. * 'callConnectHandler' flag is true (the default) the TCP connect
  581. * handler will be called before the function returns.
  582. *
  583. * These semantics can be a bit confusing, but they're less so than
  584. * the underlying semantics of asynchronous TCP connect.
  585. *
  586. * @param remoteAddress Remote address
  587. * @param connected Result parameter: set to whether an "instant connect" has occurred (true if yes)
  588. * @param uptr Initial value of uptr for new socket (default: NULL)
  589. * @param callConnectHandler If true, call TCP connect handler even if result is known before function exit (default: true)
  590. * @return New socket or NULL on failure
  591. */
  592. inline PhySocket* tcpConnect(const struct sockaddr* remoteAddress, bool& connected, void* uptr = (void*)0, bool callConnectHandler = true)
  593. {
  594. if (_socks.size() >= ZT_PHY_MAX_SOCKETS)
  595. return (PhySocket*)0;
  596. ZT_PHY_SOCKFD_TYPE s = ::socket(remoteAddress->sa_family, SOCK_STREAM, 0);
  597. if (! ZT_PHY_SOCKFD_VALID(s)) {
  598. connected = false;
  599. return (PhySocket*)0;
  600. }
  601. #if defined(_WIN32) || defined(_WIN64)
  602. {
  603. BOOL f;
  604. if (remoteAddress->sa_family == AF_INET6) {
  605. f = TRUE;
  606. ::setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, (const char*)&f, sizeof(f));
  607. }
  608. f = TRUE;
  609. ::setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (const char*)&f, sizeof(f));
  610. f = (_noDelay ? TRUE : FALSE);
  611. setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (char*)&f, sizeof(f));
  612. u_long iMode = 1;
  613. ioctlsocket(s, FIONBIO, &iMode);
  614. }
  615. #else
  616. {
  617. int f;
  618. if (remoteAddress->sa_family == AF_INET6) {
  619. f = 1;
  620. ::setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, (void*)&f, sizeof(f));
  621. }
  622. f = 1;
  623. ::setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (void*)&f, sizeof(f));
  624. f = (_noDelay ? 1 : 0);
  625. setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (char*)&f, sizeof(f));
  626. fcntl(s, F_SETFL, O_NONBLOCK);
  627. }
  628. #endif
  629. connected = true;
  630. if (::connect(s, remoteAddress, (remoteAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in))) {
  631. connected = false;
  632. #if defined(_WIN32) || defined(_WIN64)
  633. if (WSAGetLastError() != WSAEWOULDBLOCK) {
  634. #else
  635. if (errno != EINPROGRESS) {
  636. #endif
  637. ZT_PHY_CLOSE_SOCKET(s);
  638. return (PhySocket*)0;
  639. } // else connection is proceeding asynchronously...
  640. }
  641. try {
  642. _socks.push_back(PhySocketImpl());
  643. }
  644. catch (...) {
  645. ZT_PHY_CLOSE_SOCKET(s);
  646. return (PhySocket*)0;
  647. }
  648. PhySocketImpl& sws = _socks.back();
  649. if ((long)s > _nfds)
  650. _nfds = (long)s;
  651. if (connected) {
  652. FD_SET(s, &_readfds);
  653. sws.type = ZT_PHY_SOCKET_TCP_OUT_CONNECTED;
  654. }
  655. else {
  656. FD_SET(s, &_writefds);
  657. #if defined(_WIN32) || defined(_WIN64)
  658. FD_SET(s, &_exceptfds);
  659. #endif
  660. sws.type = ZT_PHY_SOCKET_TCP_OUT_PENDING;
  661. }
  662. sws.sock = s;
  663. sws.uptr = uptr;
  664. memset(&(sws.saddr), 0, sizeof(struct sockaddr_storage));
  665. memcpy(&(sws.saddr), remoteAddress, (remoteAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
  666. if ((callConnectHandler) && (connected)) {
  667. try {
  668. _handler->phyOnTcpConnect((PhySocket*)&sws, &(sws.uptr), true);
  669. }
  670. catch (...) {
  671. }
  672. }
  673. return (PhySocket*)&sws;
  674. }
  675. /**
  676. * Try to set buffer sizes as close to the given value as possible
  677. *
  678. * This will try the specified value and then lower values in 16K increments
  679. * until one works.
  680. *
  681. * @param sock Socket
  682. * @param receiveBufferSize Desired size of receive buffer
  683. * @param sendBufferSize Desired size of send buffer
  684. */
  685. inline void setBufferSizes(const PhySocket* sock, int receiveBufferSize, int sendBufferSize)
  686. {
  687. PhySocketImpl& sws = *(reinterpret_cast<PhySocketImpl*>(sock));
  688. if (receiveBufferSize > 0) {
  689. while (receiveBufferSize > 0) {
  690. int tmpbs = receiveBufferSize;
  691. if (::setsockopt(sws.sock, SOL_SOCKET, SO_RCVBUF, (const char*)&tmpbs, sizeof(tmpbs)) == 0)
  692. break;
  693. receiveBufferSize -= 16384;
  694. }
  695. }
  696. if (sendBufferSize > 0) {
  697. while (sendBufferSize > 0) {
  698. int tmpbs = sendBufferSize;
  699. if (::setsockopt(sws.sock, SOL_SOCKET, SO_SNDBUF, (const char*)&tmpbs, sizeof(tmpbs)) == 0)
  700. break;
  701. sendBufferSize -= 16384;
  702. }
  703. }
  704. }
  705. /**
  706. * Attempt to send data to a stream socket (non-blocking)
  707. *
  708. * If -1 is returned, the socket should no longer be used as it is now
  709. * destroyed. If callCloseHandler is true, the close handler will be
  710. * called before the function returns.
  711. *
  712. * This can be used with TCP, Unix, or socket pair sockets.
  713. *
  714. * @param sock An open stream socket (other socket types will fail)
  715. * @param data Data to send
  716. * @param len Length of data
  717. * @param callCloseHandler If true, call close handler on socket closing failure condition (default: true)
  718. * @return Number of bytes actually sent or -1 on fatal error (socket closure)
  719. */
  720. inline long streamSend(PhySocket* sock, const void* data, unsigned long len, bool callCloseHandler = true)
  721. {
  722. PhySocketImpl& sws = *(reinterpret_cast<PhySocketImpl*>(sock));
  723. #if defined(_WIN32) || defined(_WIN64)
  724. long n = (long)::send(sws.sock, reinterpret_cast<const char*>(data), len, 0);
  725. if (n == SOCKET_ERROR) {
  726. switch (WSAGetLastError()) {
  727. case WSAEINTR:
  728. case WSAEWOULDBLOCK:
  729. return 0;
  730. default:
  731. this->close(sock, callCloseHandler);
  732. return -1;
  733. }
  734. }
  735. #else // not Windows
  736. long n = (long)::send(sws.sock, data, len, 0);
  737. if (n < 0) {
  738. switch (errno) {
  739. #ifdef EAGAIN
  740. case EAGAIN:
  741. #endif
  742. #if defined(EWOULDBLOCK) && (! defined(EAGAIN) || (EWOULDBLOCK != EAGAIN))
  743. case EWOULDBLOCK:
  744. #endif
  745. #ifdef EINTR
  746. case EINTR:
  747. #endif
  748. return 0;
  749. default:
  750. this->close(sock, callCloseHandler);
  751. return -1;
  752. }
  753. }
  754. #endif // Windows or not
  755. return n;
  756. }
  757. #ifdef __UNIX_LIKE__
  758. /**
  759. * Attempt to send data to a Unix domain socket connection (non-blocking)
  760. *
  761. * If -1 is returned, the socket should no longer be used as it is now
  762. * destroyed. If callCloseHandler is true, the close handler will be
  763. * called before the function returns.
  764. *
  765. * @param sock An open Unix socket (other socket types will fail)
  766. * @param data Data to send
  767. * @param len Length of data
  768. * @param callCloseHandler If true, call close handler on socket closing failure condition (default: true)
  769. * @return Number of bytes actually sent or -1 on fatal error (socket closure)
  770. */
  771. inline long unixSend(PhySocket* sock, const void* data, unsigned long len, bool callCloseHandler = true)
  772. {
  773. PhySocketImpl& sws = *(reinterpret_cast<PhySocketImpl*>(sock));
  774. long n = (long)::write(sws.sock, data, len);
  775. if (n < 0) {
  776. switch (errno) {
  777. #ifdef EAGAIN
  778. case EAGAIN:
  779. #endif
  780. #if defined(EWOULDBLOCK) && (! defined(EAGAIN) || (EWOULDBLOCK != EAGAIN))
  781. case EWOULDBLOCK:
  782. #endif
  783. #ifdef EINTR
  784. case EINTR:
  785. #endif
  786. return 0;
  787. default:
  788. this->close(sock, callCloseHandler);
  789. return -1;
  790. }
  791. }
  792. return n;
  793. }
  794. #endif // __UNIX_LIKE__
  795. /**
  796. * For streams, sets whether we want to be notified that the socket is writable
  797. *
  798. * This can be used with TCP, Unix, or socket pair sockets.
  799. *
  800. * Call whack() if this is being done from another thread and you want
  801. * it to take effect immediately. Otherwise it is only guaranteed to
  802. * take effect on the next poll().
  803. *
  804. * @param sock Stream connection socket
  805. * @param notifyWritable Want writable notifications?
  806. */
  807. inline void setNotifyWritable(PhySocket* sock, bool notifyWritable)
  808. {
  809. PhySocketImpl& sws = *(reinterpret_cast<PhySocketImpl*>(sock));
  810. if (notifyWritable) {
  811. FD_SET(sws.sock, &_writefds);
  812. }
  813. else {
  814. FD_CLR(sws.sock, &_writefds);
  815. }
  816. }
  817. /**
  818. * Set whether we want to be notified that a socket is readable
  819. *
  820. * This is primarily for raw sockets added with wrapSocket(). It could be
  821. * used with others, but doing so would essentially lock them and prevent
  822. * data from being read from them until this is set to 'true' again.
  823. *
  824. * @param sock Socket to modify
  825. * @param notifyReadable True if socket should be monitored for readability
  826. */
  827. inline void setNotifyReadable(PhySocket* sock, bool notifyReadable)
  828. {
  829. PhySocketImpl& sws = *(reinterpret_cast<PhySocketImpl*>(sock));
  830. if (notifyReadable) {
  831. FD_SET(sws.sock, &_readfds);
  832. }
  833. else {
  834. FD_CLR(sws.sock, &_readfds);
  835. }
  836. }
  837. /**
  838. * Wait for activity and handle one or more events
  839. *
  840. * Note that this is not guaranteed to wait up to 'timeout' even
  841. * if nothing happens, as whack() or other events such as signals
  842. * may cause premature termination.
  843. *
  844. * @param timeout Timeout in milliseconds or 0 for none (forever)
  845. */
  846. inline void poll(unsigned long timeout)
  847. {
  848. char buf[131072];
  849. struct sockaddr_storage ss;
  850. struct timeval tv;
  851. fd_set rfds, wfds, efds;
  852. memcpy(&rfds, &_readfds, sizeof(rfds));
  853. memcpy(&wfds, &_writefds, sizeof(wfds));
  854. #if defined(_WIN32) || defined(_WIN64)
  855. memcpy(&efds, &_exceptfds, sizeof(efds));
  856. #else
  857. FD_ZERO(&efds);
  858. #endif
  859. tv.tv_sec = (long)(timeout / 1000);
  860. tv.tv_usec = (long)((timeout % 1000) * 1000);
  861. if (::select((int)_nfds + 1, &rfds, &wfds, &efds, (timeout > 0) ? &tv : (struct timeval*)0) <= 0)
  862. return;
  863. if (FD_ISSET(_whackReceiveSocket, &rfds)) {
  864. char tmp[16];
  865. #if defined(_WIN32) || defined(_WIN64)
  866. ::recv(_whackReceiveSocket, tmp, 16, 0);
  867. #else
  868. ::read(_whackReceiveSocket, tmp, 16);
  869. #endif
  870. }
  871. for (typename std::list<PhySocketImpl>::iterator s(_socks.begin()); s != _socks.end();) {
  872. switch (s->type) {
  873. case ZT_PHY_SOCKET_TCP_OUT_PENDING:
  874. #if defined(_WIN32) || defined(_WIN64)
  875. if (FD_ISSET(s->sock, &efds)) {
  876. this->close((PhySocket*)&(*s), true);
  877. }
  878. else // ... if
  879. #endif
  880. if (FD_ISSET(s->sock, &wfds)) {
  881. socklen_t slen = sizeof(ss);
  882. if (::getpeername(s->sock, (struct sockaddr*)&ss, &slen) != 0) {
  883. this->close((PhySocket*)&(*s), true);
  884. }
  885. else {
  886. s->type = ZT_PHY_SOCKET_TCP_OUT_CONNECTED;
  887. FD_SET(s->sock, &_readfds);
  888. FD_CLR(s->sock, &_writefds);
  889. #if defined(_WIN32) || defined(_WIN64)
  890. FD_CLR(s->sock, &_exceptfds);
  891. #endif
  892. try {
  893. _handler->phyOnTcpConnect((PhySocket*)&(*s), &(s->uptr), true);
  894. }
  895. catch (...) {
  896. }
  897. }
  898. }
  899. break;
  900. case ZT_PHY_SOCKET_TCP_OUT_CONNECTED:
  901. case ZT_PHY_SOCKET_TCP_IN: {
  902. ZT_PHY_SOCKFD_TYPE sock = s->sock; // if closed, s->sock becomes invalid as s is no longer dereferencable
  903. if (FD_ISSET(sock, &rfds)) {
  904. long n = (long)::recv(sock, buf, sizeof(buf), 0);
  905. if (n <= 0) {
  906. this->close((PhySocket*)&(*s), true);
  907. }
  908. else {
  909. try {
  910. _handler->phyOnTcpData((PhySocket*)&(*s), &(s->uptr), (void*)buf, (unsigned long)n);
  911. }
  912. catch (...) {
  913. }
  914. }
  915. }
  916. if ((FD_ISSET(sock, &wfds)) && (FD_ISSET(sock, &_writefds))) {
  917. try {
  918. _handler->phyOnTcpWritable((PhySocket*)&(*s), &(s->uptr));
  919. }
  920. catch (...) {
  921. }
  922. }
  923. } break;
  924. case ZT_PHY_SOCKET_TCP_LISTEN:
  925. if (FD_ISSET(s->sock, &rfds)) {
  926. memset(&ss, 0, sizeof(ss));
  927. socklen_t slen = sizeof(ss);
  928. ZT_PHY_SOCKFD_TYPE newSock = ::accept(s->sock, (struct sockaddr*)&ss, &slen);
  929. if (ZT_PHY_SOCKFD_VALID(newSock)) {
  930. if (_socks.size() >= ZT_PHY_MAX_SOCKETS) {
  931. ZT_PHY_CLOSE_SOCKET(newSock);
  932. }
  933. else {
  934. #if defined(_WIN32) || defined(_WIN64)
  935. {
  936. BOOL f = (_noDelay ? TRUE : FALSE);
  937. setsockopt(newSock, IPPROTO_TCP, TCP_NODELAY, (char*)&f, sizeof(f));
  938. }
  939. {
  940. u_long iMode = 1;
  941. ioctlsocket(newSock, FIONBIO, &iMode);
  942. }
  943. #else
  944. {
  945. int f = (_noDelay ? 1 : 0);
  946. setsockopt(newSock, IPPROTO_TCP, TCP_NODELAY, (char*)&f, sizeof(f));
  947. }
  948. fcntl(newSock, F_SETFL, O_NONBLOCK);
  949. #endif
  950. _socks.push_back(PhySocketImpl());
  951. PhySocketImpl& sws = _socks.back();
  952. FD_SET(newSock, &_readfds);
  953. if ((long)newSock > _nfds)
  954. _nfds = (long)newSock;
  955. sws.type = ZT_PHY_SOCKET_TCP_IN;
  956. sws.sock = newSock;
  957. sws.uptr = (void*)0;
  958. memcpy(&(sws.saddr), &ss, sizeof(struct sockaddr_storage));
  959. try {
  960. _handler->phyOnTcpAccept((PhySocket*)&(*s), (PhySocket*)&(_socks.back()), &(s->uptr), &(sws.uptr), (const struct sockaddr*)&(sws.saddr));
  961. }
  962. catch (...) {
  963. }
  964. }
  965. }
  966. }
  967. break;
  968. case ZT_PHY_SOCKET_UDP:
  969. if (FD_ISSET(s->sock, &rfds)) {
  970. #if (defined(__linux__) || defined(linux) || defined(__linux)) && defined(MSG_WAITFORONE)
  971. #define RECVMMSG_WINDOW_SIZE 128
  972. #define RECVMMSG_BUF_SIZE 1500
  973. iovec iovs[RECVMMSG_WINDOW_SIZE];
  974. uint8_t bufs[RECVMMSG_WINDOW_SIZE][RECVMMSG_BUF_SIZE];
  975. sockaddr_storage addrs[RECVMMSG_WINDOW_SIZE];
  976. memset(addrs, 0, sizeof(addrs));
  977. mmsghdr mm[RECVMMSG_WINDOW_SIZE];
  978. memset(mm, 0, sizeof(mm));
  979. for (int i = 0; i < RECVMMSG_WINDOW_SIZE; ++i) {
  980. iovs[i].iov_base = (void*)bufs[i];
  981. iovs[i].iov_len = RECVMMSG_BUF_SIZE;
  982. mm[i].msg_hdr.msg_name = (void*)&(addrs[i]);
  983. mm[i].msg_hdr.msg_iov = &(iovs[i]);
  984. mm[i].msg_hdr.msg_iovlen = 1;
  985. }
  986. for (int k = 0; k < 1024; ++k) {
  987. for (int i = 0; i < RECVMMSG_WINDOW_SIZE; ++i) {
  988. mm[i].msg_hdr.msg_namelen = sizeof(sockaddr_storage);
  989. mm[i].msg_len = 0;
  990. }
  991. int received_count = recvmmsg(s->sock, mm, RECVMMSG_WINDOW_SIZE, MSG_WAITFORONE, nullptr);
  992. if (received_count > 0) {
  993. for (int i = 0; i < received_count; ++i) {
  994. long n = (long)mm[i].msg_len;
  995. if (n > 0) {
  996. try {
  997. _handler->phyOnDatagram((PhySocket*)&(*s), &(s->uptr), (const struct sockaddr*)&(s->saddr), (const struct sockaddr*)&(addrs[i]), bufs[i], (unsigned long)n);
  998. }
  999. catch (...) {
  1000. }
  1001. }
  1002. }
  1003. }
  1004. else {
  1005. break;
  1006. }
  1007. }
  1008. #else
  1009. for (int k = 0; k < 1024; ++k) {
  1010. memset(&ss, 0, sizeof(ss));
  1011. socklen_t slen = sizeof(ss);
  1012. long n = (long)::recvfrom(s->sock, buf, sizeof(buf), 0, (struct sockaddr*)&ss, &slen);
  1013. if (n > 0) {
  1014. try {
  1015. _handler->phyOnDatagram((PhySocket*)&(*s), &(s->uptr), (const struct sockaddr*)&(s->saddr), (const struct sockaddr*)&ss, (void*)buf, (unsigned long)n);
  1016. }
  1017. catch (...) {
  1018. }
  1019. }
  1020. else if (n < 0)
  1021. break;
  1022. }
  1023. #endif
  1024. }
  1025. break;
  1026. case ZT_PHY_SOCKET_UNIX_IN: {
  1027. #ifdef __UNIX_LIKE__
  1028. ZT_PHY_SOCKFD_TYPE sock = s->sock; // if closed, s->sock becomes invalid as s is no longer dereferencable
  1029. if ((FD_ISSET(sock, &wfds)) && (FD_ISSET(sock, &_writefds))) {
  1030. try {
  1031. _handler->phyOnUnixWritable((PhySocket*)&(*s), &(s->uptr));
  1032. }
  1033. catch (...) {
  1034. }
  1035. }
  1036. if (FD_ISSET(sock, &rfds)) {
  1037. long n = (long)::read(sock, buf, sizeof(buf));
  1038. if (n <= 0) {
  1039. this->close((PhySocket*)&(*s), true);
  1040. }
  1041. else {
  1042. try {
  1043. _handler->phyOnUnixData((PhySocket*)&(*s), &(s->uptr), (void*)buf, (unsigned long)n);
  1044. }
  1045. catch (...) {
  1046. }
  1047. }
  1048. }
  1049. #endif // __UNIX_LIKE__
  1050. } break;
  1051. case ZT_PHY_SOCKET_UNIX_LISTEN:
  1052. #ifdef __UNIX_LIKE__
  1053. if (FD_ISSET(s->sock, &rfds)) {
  1054. memset(&ss, 0, sizeof(ss));
  1055. socklen_t slen = sizeof(ss);
  1056. ZT_PHY_SOCKFD_TYPE newSock = ::accept(s->sock, (struct sockaddr*)&ss, &slen);
  1057. if (ZT_PHY_SOCKFD_VALID(newSock)) {
  1058. if (_socks.size() >= ZT_PHY_MAX_SOCKETS) {
  1059. ZT_PHY_CLOSE_SOCKET(newSock);
  1060. }
  1061. else {
  1062. fcntl(newSock, F_SETFL, O_NONBLOCK);
  1063. _socks.push_back(PhySocketImpl());
  1064. PhySocketImpl& sws = _socks.back();
  1065. FD_SET(newSock, &_readfds);
  1066. if ((long)newSock > _nfds)
  1067. _nfds = (long)newSock;
  1068. sws.type = ZT_PHY_SOCKET_UNIX_IN;
  1069. sws.sock = newSock;
  1070. sws.uptr = (void*)0;
  1071. memcpy(&(sws.saddr), &ss, sizeof(struct sockaddr_storage));
  1072. try {
  1073. //_handler->phyOnUnixAccept((PhySocket *)&(*s),(PhySocket *)&(_socks.back()),&(s->uptr),&(sws.uptr));
  1074. }
  1075. catch (...) {
  1076. }
  1077. }
  1078. }
  1079. }
  1080. #endif // __UNIX_LIKE__
  1081. break;
  1082. case ZT_PHY_SOCKET_FD: {
  1083. ZT_PHY_SOCKFD_TYPE sock = s->sock;
  1084. const bool readable = ((FD_ISSET(sock, &rfds)) && (FD_ISSET(sock, &_readfds)));
  1085. const bool writable = ((FD_ISSET(sock, &wfds)) && (FD_ISSET(sock, &_writefds)));
  1086. if ((readable) || (writable)) {
  1087. try {
  1088. //_handler->phyOnFileDescriptorActivity((PhySocket *)&(*s),&(s->uptr),readable,writable);
  1089. }
  1090. catch (...) {
  1091. }
  1092. }
  1093. } break;
  1094. default:
  1095. break;
  1096. }
  1097. if (s->type == ZT_PHY_SOCKET_CLOSED)
  1098. _socks.erase(s++);
  1099. else
  1100. ++s;
  1101. }
  1102. }
  1103. /**
  1104. * @param sock Socket to close
  1105. * @param callHandlers If true, call handlers for TCP connect (success: false) or close (default: true)
  1106. */
  1107. inline void close(PhySocket* sock, bool callHandlers = true)
  1108. {
  1109. if (! sock)
  1110. return;
  1111. PhySocketImpl& sws = *(reinterpret_cast<PhySocketImpl*>(sock));
  1112. if (sws.type == ZT_PHY_SOCKET_CLOSED)
  1113. return;
  1114. FD_CLR(sws.sock, &_readfds);
  1115. FD_CLR(sws.sock, &_writefds);
  1116. #if defined(_WIN32) || defined(_WIN64)
  1117. FD_CLR(sws.sock, &_exceptfds);
  1118. #endif
  1119. if (sws.type != ZT_PHY_SOCKET_FD)
  1120. ZT_PHY_CLOSE_SOCKET(sws.sock);
  1121. #ifdef __UNIX_LIKE__
  1122. if (sws.type == ZT_PHY_SOCKET_UNIX_LISTEN)
  1123. ::unlink(((struct sockaddr_un*)(&(sws.saddr)))->sun_path);
  1124. #endif // __UNIX_LIKE__
  1125. if (callHandlers) {
  1126. switch (sws.type) {
  1127. case ZT_PHY_SOCKET_TCP_OUT_PENDING:
  1128. try {
  1129. _handler->phyOnTcpConnect(sock, &(sws.uptr), false);
  1130. }
  1131. catch (...) {
  1132. }
  1133. break;
  1134. case ZT_PHY_SOCKET_TCP_OUT_CONNECTED:
  1135. case ZT_PHY_SOCKET_TCP_IN:
  1136. try {
  1137. _handler->phyOnTcpClose(sock, &(sws.uptr));
  1138. }
  1139. catch (...) {
  1140. }
  1141. break;
  1142. case ZT_PHY_SOCKET_UNIX_IN:
  1143. #ifdef __UNIX_LIKE__
  1144. try {
  1145. _handler->phyOnUnixClose(sock, &(sws.uptr));
  1146. }
  1147. catch (...) {
  1148. }
  1149. #endif // __UNIX_LIKE__
  1150. break;
  1151. default:
  1152. break;
  1153. }
  1154. }
  1155. // Causes entry to be deleted from list in poll(), ignored elsewhere
  1156. sws.type = ZT_PHY_SOCKET_CLOSED;
  1157. if ((long)sws.sock >= (long)_nfds) {
  1158. long nfds = (long)_whackSendSocket;
  1159. if ((long)_whackReceiveSocket > nfds)
  1160. nfds = (long)_whackReceiveSocket;
  1161. for (typename std::list<PhySocketImpl>::iterator s(_socks.begin()); s != _socks.end(); ++s) {
  1162. if ((s->type != ZT_PHY_SOCKET_CLOSED) && ((long)s->sock > nfds))
  1163. nfds = (long)s->sock;
  1164. }
  1165. _nfds = nfds;
  1166. }
  1167. }
  1168. };
  1169. } // namespace ZeroTier
  1170. #endif