NetconEthernetTap.cpp 52 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559
  1. /*
  2. * ZeroTier One - Network Virtualization Everywhere
  3. * Copyright (C) 2011-2015 ZeroTier, Inc.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. * --
  19. *
  20. * ZeroTier may be used and distributed under the terms of the GPLv3, which
  21. * are available at: http://www.gnu.org/licenses/gpl-3.0.html
  22. *
  23. * If you would like to embed ZeroTier into a commercial application or
  24. * redistribute it in a modified binary form, please contact ZeroTier Networks
  25. * LLC. Start here: http://www.zerotier.com/
  26. */
  27. #include <algorithm>
  28. #include <utility>
  29. #include <dlfcn.h>
  30. #include <sys/poll.h>
  31. #include "NetconEthernetTap.hpp"
  32. #include "../node/Utils.hpp"
  33. #include "../osdep/OSUtils.hpp"
  34. #include "../osdep/Phy.hpp"
  35. #include "Intercept.h"
  36. #include "LWIPStack.hpp"
  37. #include "lwip/tcp_impl.h"
  38. #include "netif/etharp.h"
  39. #include "lwip/api.h"
  40. #include "lwip/ip.h"
  41. #include "lwip/ip_addr.h"
  42. #include "lwip/ip_frag.h"
  43. #include "lwip/tcp.h"
  44. #include "common.inc.c"
  45. #include "rpc.h"
  #define APPLICATION_POLL_FREQ 20
  // lwIP tcp_tmr() period, in the units returned by OSUtils::now() (see threadMain()).
  #define ZT_LWIP_TCP_TIMER_INTERVAL 5
  #define STATUS_TMR_INTERVAL 3000 // How often we check connection statuses
  // Size of each TcpConnection read buffer. Parenthesized so the macro expands
  // safely inside larger expressions (e.g. n / DEFAULT_READ_BUFFER_SIZE).
  #define DEFAULT_READ_BUFFER_SIZE (1024 * 63)
  50. namespace ZeroTier {
  51. // ---------------------------------------------------------------------------
  // Writes the executable path of process `pid` (the target of the
  // /proc/<pid>/exe symlink) into `dest` as a NUL-terminated string.
  // `dest` must hold at least 80 bytes (the size every caller here uses);
  // longer paths are truncated. If the link cannot be read (no such process,
  // no /proc, no permission) `dest` becomes the empty string, so callers can
  // always print it with %s.
  static void get_path_from_pid(char* dest, int pid)
  {
      const size_t dest_len = 80;
      char ppath[80];
      snprintf(ppath, sizeof(ppath), "/proc/%d/exe", pid);
      // readlink() never NUL-terminates, so reserve one byte for the terminator.
      ssize_t n = readlink(ppath, dest, dest_len - 1);
      if (n < 0)
          n = 0;
      dest[n] = '\0';
  }
  60. static err_t tapif_init(struct netif *netif)
  61. {
  62. // Actual init functionality is in addIp() of tap
  63. return ERR_OK;
  64. }
  65. static err_t low_level_output(struct netif *netif, struct pbuf *p)
  66. {
  67. struct pbuf *q;
  68. char buf[ZT_MAX_MTU+32];
  69. char *bufptr;
  70. int tot_len = 0;
  71. ZeroTier::NetconEthernetTap *tap = (ZeroTier::NetconEthernetTap*)netif->state;
  72. /* initiate transfer(); */
  73. bufptr = buf;
  74. for(q = p; q != NULL; q = q->next) {
  75. /* Send the data from the pbuf to the interface, one pbuf at a
  76. time. The size of the data in each pbuf is kept in the ->len
  77. variable. */
  78. /* send data from(q->payload, q->len); */
  79. memcpy(bufptr, q->payload, q->len);
  80. bufptr += q->len;
  81. tot_len += q->len;
  82. }
  83. // [Send packet to network]
  84. // Split ethernet header and feed into handler
  85. struct eth_hdr *ethhdr;
  86. ethhdr = (struct eth_hdr *)buf;
  87. ZeroTier::MAC src_mac;
  88. ZeroTier::MAC dest_mac;
  89. src_mac.setTo(ethhdr->src.addr, 6);
  90. dest_mac.setTo(ethhdr->dest.addr, 6);
  91. tap->_handler(tap->_arg,tap->_nwid,src_mac,dest_mac,
  92. Utils::ntoh((uint16_t)ethhdr->type),0,buf + sizeof(struct eth_hdr),tot_len - sizeof(struct eth_hdr));
  93. return ERR_OK;
  94. }
  95. /*
  96. * TCP connection administered by service
  97. */
  98. class TcpConnection
  99. {
  100. public:
  101. int perceived_fd;
  102. int their_fd;
  103. bool pending;
  104. bool listening;
  105. int pid;
  106. unsigned long written;
  107. unsigned long acked;
  108. PhySocket *rpcSock;
  109. PhySocket *dataSock;
  110. struct tcp_pcb *pcb;
  111. struct sockaddr_storage *addr;
  112. unsigned char buf[DEFAULT_READ_BUFFER_SIZE];
  113. int idx;
  114. };
  115. /*
  116. * A helper class for passing a reference to _phy to LWIP callbacks as a "state"
  117. */
  118. class Larg
  119. {
  120. public:
  121. NetconEthernetTap *tap;
  122. TcpConnection *conn;
  123. Larg(NetconEthernetTap *_tap, TcpConnection *conn) : tap(_tap), conn(conn) {}
  124. };
  125. // ---------------------------------------------------------------------------
  126. NetconEthernetTap::NetconEthernetTap(
  127. const char *homePath,
  128. const MAC &mac,
  129. unsigned int mtu,
  130. unsigned int metric,
  131. uint64_t nwid,
  132. const char *friendlyName,
  133. void (*handler)(void *,uint64_t,const MAC &,const MAC &,unsigned int,unsigned int,const void *,unsigned int),
  134. void *arg) :
  135. _nwid(nwid),
  136. _handler(handler),
  137. _arg(arg),
  138. _phy(this,false,true),
  139. _unixListenSocket((PhySocket *)0),
  140. _mac(mac),
  141. _homePath(homePath),
  142. _mtu(mtu),
  143. _enabled(true),
  144. _run(true)
  145. {
  146. char sockPath[4096],lwipPath[4096];
  147. rpc_counter = -1;
  148. Utils::snprintf(sockPath,sizeof(sockPath),"%s%snc_%.16llx",homePath,ZT_PATH_SEPARATOR_S,_nwid,ZT_PATH_SEPARATOR_S,(unsigned long long)nwid);
  149. _dev = sockPath; // in netcon mode, set device to be just the network ID
  150. Utils::snprintf(lwipPath,sizeof(lwipPath),"%s%sliblwip.so",homePath,ZT_PATH_SEPARATOR_S);
  151. lwipstack = new LWIPStack(lwipPath);
  152. if(!lwipstack)
  153. throw std::runtime_error("unable to dynamically load a new instance of liblwip.so (searched ZeroTier home path)");
  154. lwipstack->lwip_init();
  155. _unixListenSocket = _phy.unixListen(sockPath,(void *)this);
  156. dwr(MSG_INFO, " NetconEthernetTap initialized!\n", _phy.getDescriptor(_unixListenSocket));
  157. if (!_unixListenSocket)
  158. throw std::runtime_error(std::string("unable to bind to ")+sockPath);
  159. _thread = Thread::start(this);
  160. }
  161. NetconEthernetTap::~NetconEthernetTap()
  162. {
  163. _run = false;
  164. _phy.whack();
  165. _phy.whack();
  166. Thread::join(_thread);
  167. _phy.close(_unixListenSocket,false);
  168. delete lwipstack;
  169. }
  170. void NetconEthernetTap::setEnabled(bool en)
  171. {
  172. _enabled = en;
  173. }
  174. bool NetconEthernetTap::enabled() const
  175. {
  176. return _enabled;
  177. }
  178. bool NetconEthernetTap::addIp(const InetAddress &ip)
  179. {
  180. Mutex::Lock _l(_ips_m);
  181. if (std::find(_ips.begin(),_ips.end(),ip) == _ips.end()) {
  182. _ips.push_back(ip);
  183. std::sort(_ips.begin(),_ips.end());
  184. if (ip.isV4()) {
  185. // Set IP
  186. static ip_addr_t ipaddr, netmask, gw;
  187. IP4_ADDR(&gw,192,168,0,1);
  188. ipaddr.addr = *((u32_t *)ip.rawIpData());
  189. netmask.addr = *((u32_t *)ip.netmask().rawIpData());
  190. // Set up the lwip-netif for LWIP's sake
  191. lwipstack->netif_add(&interface,&ipaddr, &netmask, &gw, NULL, tapif_init, lwipstack->_ethernet_input);
  192. interface.state = this;
  193. interface.output = lwipstack->_etharp_output;
  194. _mac.copyTo(interface.hwaddr, 6);
  195. interface.mtu = _mtu;
  196. interface.name[0] = 't';
  197. interface.name[1] = 'p';
  198. interface.linkoutput = low_level_output;
  199. interface.hwaddr_len = 6;
  200. interface.flags = NETIF_FLAG_BROADCAST | NETIF_FLAG_ETHARP | NETIF_FLAG_IGMP;
  201. lwipstack->netif_set_default(&interface);
  202. lwipstack->netif_set_up(&interface);
  203. }
  204. }
  205. return true;
  206. }
  207. bool NetconEthernetTap::removeIp(const InetAddress &ip)
  208. {
  209. Mutex::Lock _l(_ips_m);
  210. std::vector<InetAddress>::iterator i(std::find(_ips.begin(),_ips.end(),ip));
  211. if (i == _ips.end())
  212. return false;
  213. _ips.erase(i);
  214. if (ip.isV4()) {
  215. // TODO: dealloc from LWIP
  216. }
  217. return true;
  218. }
  219. std::vector<InetAddress> NetconEthernetTap::ips() const
  220. {
  221. Mutex::Lock _l(_ips_m);
  222. return _ips;
  223. }
  224. void NetconEthernetTap::put(const MAC &from,const MAC &to,unsigned int etherType,const void *data,unsigned int len)
  225. {
  226. struct pbuf *p,*q;
  227. if (!_enabled)
  228. return;
  229. struct eth_hdr ethhdr;
  230. from.copyTo(ethhdr.src.addr, 6);
  231. to.copyTo(ethhdr.dest.addr, 6);
  232. ethhdr.type = Utils::hton((uint16_t)etherType);
  233. // We allocate a pbuf chain of pbufs from the pool.
  234. p = lwipstack->pbuf_alloc(PBUF_RAW, len+sizeof(struct eth_hdr), PBUF_POOL);
  235. if (p != NULL) {
  236. const char *dataptr = reinterpret_cast<const char *>(data);
  237. // First pbuf gets ethernet header at start
  238. q = p;
  239. if (q->len < sizeof(ethhdr)) {
  240. dwr(MSG_ERROR, "_put(): Dropped packet: first pbuf smaller than ethernet header\n");
  241. return;
  242. }
  243. memcpy(q->payload,&ethhdr,sizeof(ethhdr));
  244. memcpy((char*)q->payload + sizeof(ethhdr),dataptr,q->len - sizeof(ethhdr));
  245. dataptr += q->len - sizeof(ethhdr);
  246. // Remaining pbufs (if any) get rest of data
  247. while ((q = q->next)) {
  248. memcpy(q->payload,dataptr,q->len);
  249. dataptr += q->len;
  250. }
  251. } else {
  252. dwr(MSG_ERROR, "put(): Dropped packet: no pbufs available\n");
  253. return;
  254. }
  255. {
  256. Mutex::Lock _l2(lwipstack->_lock);
  257. if(interface.input(p, &interface) != ERR_OK) {
  258. dwr(MSG_ERROR, "put(): Error while RXing packet (netif->input)\n");
  259. }
  260. }
  261. }
  262. std::string NetconEthernetTap::deviceName() const
  263. {
  264. return _dev;
  265. }
  266. void NetconEthernetTap::setFriendlyName(const char *friendlyName) {
  267. }
  268. void NetconEthernetTap::scanMulticastGroups(std::vector<MulticastGroup> &added,std::vector<MulticastGroup> &removed)
  269. {
  270. std::vector<MulticastGroup> newGroups;
  271. Mutex::Lock _l(_multicastGroups_m);
  272. // TODO: get multicast subscriptions from LWIP
  273. std::vector<InetAddress> allIps(ips());
  274. for(std::vector<InetAddress>::iterator ip(allIps.begin());ip!=allIps.end();++ip)
  275. newGroups.push_back(MulticastGroup::deriveMulticastGroupForAddressResolution(*ip));
  276. std::sort(newGroups.begin(),newGroups.end());
  277. std::unique(newGroups.begin(),newGroups.end());
  278. for(std::vector<MulticastGroup>::iterator m(newGroups.begin());m!=newGroups.end();++m) {
  279. if (!std::binary_search(_multicastGroups.begin(),_multicastGroups.end(),*m))
  280. added.push_back(*m);
  281. }
  282. for(std::vector<MulticastGroup>::iterator m(_multicastGroups.begin());m!=_multicastGroups.end();++m) {
  283. if (!std::binary_search(newGroups.begin(),newGroups.end(),*m))
  284. removed.push_back(*m);
  285. }
  286. _multicastGroups.swap(newGroups);
  287. }
  288. TcpConnection *NetconEthernetTap::getConnectionByTheirFD(PhySocket *sock, int fd)
  289. {
  290. for(size_t i=0; i<tcp_connections.size(); i++) {
  291. if(tcp_connections[i]->perceived_fd == fd && tcp_connections[i]->rpcSock == sock)
  292. return tcp_connections[i];
  293. }
  294. return NULL;
  295. }
  296. /*
  297. * Dumps service state in 80x25 when debug mode is off
  298. */
  299. void NetconEthernetTap::compact_dump()
  300. {
  301. /*
  302. clearscreen();
  303. gotoxy(0,0);
  304. fprintf(stderr, "ZeroTier - Network Containers Service [State Dump]\n\r");
  305. fprintf(stderr, " RPC Sockets = %d\n\r", rpc_sockets.size());
  306. fprintf(stderr, " TCP Connections = %d\n\r", tcp_connections.size());
  307. for(size_t i=0; i<rpc_sockets.size(); i++) {
  308. int rpc_fd = _phy.getDescriptor(rpc_sockets[i]);
  309. char buf[80];
  310. int pid = pidmap[rpc_sockets[i]];
  311. memset(&buf, '\0', 80);
  312. get_path_from_pid(buf, pid);
  313. fprintf(stderr, "\n Client(addr=0x%x, rpc=%d, pid=%d) %s\n", rpc_sockets[i], rpc_fd, pid, buf);
  314. for(size_t j=0; j<tcp_connections.size(); j++) {
  315. memset(&buf, '\0', 80);
  316. get_path_from_pid(buf, tcp_connections[j]->pid);
  317. if(tcp_connections[j]->rpcSock==rpc_sockets[i]) {
  318. fprintf(stderr, "\t\tpath\t\t= %s\n", buf);
  319. }
  320. }
  321. }
  322. */
  323. for(size_t i=0; i<rpc_sockets.size(); i++) {
  324. fprintf(stderr, "\n\n\nrpc(%d)\n", _phy.getDescriptor(rpc_sockets[i]));
  325. for(size_t j=0; j<tcp_connections.size(); j++) {
  326. if(_phy.getDescriptor(tcp_connections[j]->rpcSock) == _phy.getDescriptor(rpc_sockets[i]))
  327. fprintf(stderr, "\t(%d) ----> (%d)\n\n", _phy.getDescriptor(tcp_connections[j]->dataSock), tcp_connections[j]->perceived_fd);
  328. }
  329. }
  330. }
  331. /*
  332. * Dumps service state
  333. */
  334. void NetconEthernetTap::dump()
  335. {
  336. fprintf(stderr, "\n\n---\n\ndie(): BEGIN SERVICE STATE DUMP\n");
  337. fprintf(stderr, "*** IF YOU SEE THIS, EMAIL THE DUMP TEXT TO [email protected] ***\n");
  338. fprintf(stderr, " tcp_conns = %lu, rpc_socks = %lu\n", tcp_connections.size(), rpc_sockets.size());
  339. // TODO: Add logic to detect bad mapping conditions
  340. for(size_t i=0; i<rpc_sockets.size(); i++) {
  341. for(size_t j=0; j<rpc_sockets.size(); j++) {
  342. if(j != i && rpc_sockets[i] == rpc_sockets[j]) {
  343. fprintf(stderr, "Duplicate PhySockets found! (0x%p)\n", rpc_sockets[i]);
  344. }
  345. }
  346. }
  347. // Dump the state of the service mapping
  348. for(size_t i=0; i<rpc_sockets.size(); i++) {
  349. int rpc_fd = _phy.getDescriptor(rpc_sockets[i]);
  350. char buf[80];
  351. int pid = pidmap[rpc_sockets[i]];
  352. get_path_from_pid(buf, pid);
  353. fprintf(stderr, "\nClient(addr=0x%p, rpc=%d, pid=%d) %s\n", rpc_sockets[i], rpc_fd, pid, buf);
  354. for(size_t j=0; j<tcp_connections.size(); j++) {
  355. get_path_from_pid(buf, tcp_connections[j]->pid);
  356. if(tcp_connections[j]->rpcSock==rpc_sockets[i]){
  357. fprintf(stderr, " |\n");
  358. fprintf(stderr, " |-Connection(0x%p):\n", tcp_connections[j]);
  359. fprintf(stderr, " | path\t\t\t= %s\n", buf);
  360. fprintf(stderr, " | perceived_fd\t\t= %d\t(fd)\n", tcp_connections[j]->perceived_fd);
  361. fprintf(stderr, " | their_fd\t\t= %d\t(fd)\n", tcp_connections[j]->their_fd);
  362. fprintf(stderr, " | dataSock(0x%p)\t= %d\t(fd)\n", tcp_connections[j]->dataSock, _phy.getDescriptor(tcp_connections[j]->dataSock));
  363. fprintf(stderr, " | rpcSock(0x%p)\t= %d\t(fd)\n", tcp_connections[j]->rpcSock, _phy.getDescriptor(tcp_connections[j]->rpcSock));
  364. fprintf(stderr, " | pending\t\t= %d\n", tcp_connections[j]->pending);
  365. fprintf(stderr, " | listening\t\t= %d\n", tcp_connections[j]->listening);
  366. fprintf(stderr, " \\------pcb(0x%p)->state\t= %d\n", tcp_connections[j]->pcb, tcp_connections[j]->pcb->state);
  367. }
  368. }
  369. }
  370. fprintf(stderr, "\n\ndie(): END SERVICE STATE DUMP\n\n---\n\n");
  371. }
  372. /*
  373. * Dumps service state and then exits
  374. */
  375. void NetconEthernetTap::die(int exret) {
  376. dump();
  377. exit(exret);
  378. }
  379. /*
  380. * Closes a TcpConnection and associated LWIP PCB strcuture.
  381. */
  382. void NetconEthernetTap::closeConnection(TcpConnection *conn)
  383. {
  384. //return;
  385. //dwr(MSG_DEBUG, "closeConnection(): conn = 0x%x\n", conn);
  386. if(!conn)
  387. return;
  388. dwr(MSG_DEBUG, " closeConnection(%x, %d)\n", conn->pcb, _phy.getDescriptor(conn->dataSock));
  389. //lwipstack->_tcp_sent(conn->pcb, NULL);
  390. //lwipstack->_tcp_recv(conn->pcb, NULL);
  391. //lwipstack->_tcp_err(conn->pcb, NULL);
  392. //lwipstack->_tcp_poll(conn->pcb, NULL, 0);
  393. //lwipstack->_tcp_arg(conn->pcb, NULL);
  394. if(lwipstack->_tcp_close(conn->pcb) != ERR_OK) {
  395. dwr(MSG_ERROR, " closeConnection(): Error while calling tcp_close()\n");
  396. exit(0);
  397. }
  398. else {
  399. if(conn->dataSock) {
  400. close(_phy.getDescriptor(conn->dataSock));
  401. _phy.close(conn->dataSock,false);
  402. }
  403. /* Eventually we might want to use a map here instead */
  404. for(int i=0; i<tcp_connections.size(); i++) {
  405. if(tcp_connections[i] == conn) {
  406. tcp_connections.erase(tcp_connections.begin() + i);
  407. delete conn;
  408. break;
  409. }
  410. }
  411. }
  412. }
  413. /*
  414. * Close a single RPC connection and associated PhySocket
  415. */
  416. void NetconEthernetTap::closeClient(PhySocket *sock)
  417. {
  418. for(size_t i=0; i<rpc_sockets.size(); i++) {
  419. if(rpc_sockets[i] == sock){
  420. rpc_sockets.erase(rpc_sockets.begin() + i);
  421. break;
  422. }
  423. }
  424. close(_phy.getDescriptor(sock));
  425. _phy.close(sock);
  426. }
  427. /*
  428. * Close all RPC and TCP connections
  429. */
  430. void NetconEthernetTap::closeAll()
  431. {
  432. while(rpc_sockets.size())
  433. closeClient(rpc_sockets.front());
  434. while(tcp_connections.size())
  435. closeConnection(tcp_connections.front());
  436. }
  437. #include <sys/resource.h>
  438. void NetconEthernetTap::threadMain()
  439. throw()
  440. {
  441. uint64_t prev_tcp_time = 0;
  442. uint64_t prev_status_time = 0;
  443. uint64_t prev_etharp_time = 0;
  444. /*
  445. fprintf(stderr, "- MEM_SIZE = %dM\n", MEM_SIZE / (1024*1024));
  446. fprintf(stderr, "- PBUF_POOL_SIZE = %d\n", PBUF_POOL_SIZE);
  447. fprintf(stderr, "- PBUF_POOL_BUFSIZE = %d\n", PBUF_POOL_BUFSIZE);
  448. fprintf(stderr, "- MEMP_NUM_PBUF = %d\n", MEMP_NUM_PBUF);
  449. fprintf(stderr, "- MEMP_NUM_TCP_PCB = %d\n", MEMP_NUM_TCP_PCB);
  450. fprintf(stderr, "- MEMP_NUM_TCP_PCB_LISTEN = %d\n", MEMP_NUM_TCP_PCB_LISTEN);
  451. fprintf(stderr, "- MEMP_NUM_TCP_SEG = %d\n\n", MEMP_NUM_TCP_SEG);
  452. fprintf(stderr, "- TCP_SND_BUF = %dK\n", TCP_SND_BUF / 1024);
  453. fprintf(stderr, "- TCP_SND_QUEUELEN = %d\n\n", TCP_SND_QUEUELEN);
  454. fprintf(stderr, "- TCP_WND = %d\n", TCP_WND);
  455. fprintf(stderr, "- TCP_MSS = %d\n", TCP_MSS);
  456. fprintf(stderr, "- TCP_MAXRTX = %d\n", TCP_MAXRTX);
  457. fprintf(stderr, "- IP_REASSEMBLY = %d\n\n", IP_REASSEMBLY);
  458. fprintf(stderr, "- ARP_TMR_INTERVAL = %d\n", ARP_TMR_INTERVAL);
  459. fprintf(stderr, "- TCP_TMR_INTERVAL = %d\n", TCP_TMR_INTERVAL);
  460. fprintf(stderr, "- IP_TMR_INTERVAL = %d\n", IP_TMR_INTERVAL);
  461. */
  462. // Main timer loop
  463. while (_run) {
  464. uint64_t now = OSUtils::now();
  465. uint64_t since_tcp = now - prev_tcp_time;
  466. uint64_t since_etharp = now - prev_etharp_time;
  467. uint64_t since_status = now - prev_status_time;
  468. uint64_t tcp_remaining = ZT_LWIP_TCP_TIMER_INTERVAL;
  469. uint64_t etharp_remaining = ARP_TMR_INTERVAL;
  470. uint64_t status_remaining = STATUS_TMR_INTERVAL;
  471. // Connection prunning
  472. if (since_status >= STATUS_TMR_INTERVAL) {
  473. //compact_dump();
  474. prev_status_time = now;
  475. status_remaining = STATUS_TMR_INTERVAL - since_status;
  476. if(rpc_sockets.size() || tcp_connections.size()) {
  477. // dump();
  478. // Here we will periodically check the list of rpc_sockets for those that
  479. // do not currently have any data connection associated with them. If they are
  480. // unused, then we will try to read from them, if they fail, we can safely assume
  481. // that the client has closed their end and we can close ours
  482. for(size_t i = 0; i<tcp_connections.size(); i++) {
  483. if(tcp_connections[i]->listening) {
  484. char c;
  485. if (read(_phy.getDescriptor(tcp_connections[i]->dataSock), &c, 1) < 0) {
  486. // Still in listening state
  487. }
  488. else {
  489. // Here we should handle the case there there is incoming data (?)
  490. dwr(MSG_DEBUG, " tap_thread(): Listening socketpair closed. Removing RPC connection (%d)\n",
  491. _phy.getDescriptor(tcp_connections[i]->dataSock));
  492. closeConnection(tcp_connections[i]);
  493. }
  494. }
  495. }
  496. }
  497. for(size_t i=0, associated = 0; i<rpc_sockets.size(); i++, associated = 0) {
  498. for(size_t j=0; j<tcp_connections.size(); j++) {
  499. if (tcp_connections[j]->rpcSock == rpc_sockets[i])
  500. associated++;
  501. }
  502. if(!associated){
  503. // No TCP connections are associated, this is a candidate for removal
  504. int fd = _phy.getDescriptor(rpc_sockets[i]);
  505. fcntl(fd, F_SETFL, O_NONBLOCK);
  506. unsigned char tmpbuf[BUF_SZ];
  507. int n;
  508. if((n = read(fd,&tmpbuf,BUF_SZ)) < 0) {
  509. dwr(MSG_DEBUG, " tap_thread(): closing RPC (%d)\n", _phy.getDescriptor(rpc_sockets[i]));
  510. closeClient(rpc_sockets[i]);
  511. }
  512. // < 0 is failure
  513. // 0 nothing to read, RPC still active
  514. // > 0 RPC data read, handle it
  515. else if (n > 0) {
  516. // Handle RPC call, this is rare
  517. dwr(MSG_DEBUG, " tap_thread(): RPC read during connection check (%d bytes)\n", n);
  518. phyOnUnixData(rpc_sockets[i],_phy.getuptr(rpc_sockets[i]),&tmpbuf,BUF_SZ);
  519. }
  520. }
  521. }
  522. }
  523. // Main TCP/ETHARP timer section
  524. if (since_tcp >= ZT_LWIP_TCP_TIMER_INTERVAL) {
  525. prev_tcp_time = now;
  526. lwipstack->tcp_tmr();
  527. } else {
  528. tcp_remaining = ZT_LWIP_TCP_TIMER_INTERVAL - since_tcp;
  529. }
  530. if (since_etharp >= ARP_TMR_INTERVAL) {
  531. prev_etharp_time = now;
  532. lwipstack->etharp_tmr();
  533. } else {
  534. etharp_remaining = ARP_TMR_INTERVAL - since_etharp;
  535. }
  536. _phy.poll((unsigned long)std::min(tcp_remaining,etharp_remaining));
  537. }
  538. closeAll();
  539. dlclose(lwipstack->_libref);
  540. }
  541. // Unused -- no UDP or TCP from this thread/Phy<>
  542. void NetconEthernetTap::phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *from,void *data,unsigned long len) {}
  543. void NetconEthernetTap::phyOnTcpConnect(PhySocket *sock,void **uptr,bool success) {}
  544. void NetconEthernetTap::phyOnTcpAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN,const struct sockaddr *from) {}
  545. void NetconEthernetTap::phyOnTcpClose(PhySocket *sock,void **uptr) {}
  546. void NetconEthernetTap::phyOnTcpData(PhySocket *sock,void **uptr,void *data,unsigned long len) {}
  547. void NetconEthernetTap::phyOnTcpWritable(PhySocket *sock,void **uptr) {}
  548. void NetconEthernetTap::phyOnUnixClose(PhySocket *sock,void **uptr) {
  549. dwr(MSG_DEBUG, " phyOnUnixClose(sock=0x%x, uptr=0x%x): fd = %d\n", sock, uptr, _phy.getDescriptor(sock));
  550. TcpConnection *conn = (TcpConnection*)*uptr;
  551. closeConnection(conn);
  552. }
  553. /*
  554. * Handles data on a client's data buffer. Data is sent to LWIP to be enqueued.
  555. */
  556. void NetconEthernetTap::phyOnFileDescriptorActivity(PhySocket *sock,void **uptr,bool readable,bool writable)
  557. {
  558. if(readable) {
  559. TcpConnection *conn = (TcpConnection*)*uptr;
  560. if(conn->dataSock) { // Sometimes a connection may be closed via nc_recved, check first
  561. lwipstack->_lock.lock();
  562. handle_write(conn);
  563. lwipstack->_lock.unlock();
  564. }
  565. }
  566. else {
  567. dwr(MSG_ERROR, "phyOnFileDescriptorActivity(): PhySocket not readable\n");
  568. }
  569. }
  570. /*
  571. * Add a new PhySocket for the client connections
  572. */
  573. void NetconEthernetTap::phyOnUnixAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN) {
  574. dwr(MSG_DEBUG, " phyOnUnixAccept(): accepting new connection\n");
  575. if(find(rpc_sockets.begin(), rpc_sockets.end(), sockN) != rpc_sockets.end()){
  576. dwr(MSG_ERROR, " phyOnUnixAccept(): SockN (0x%x) already exists!\n", sockN);
  577. return;
  578. }
  579. rpc_sockets.push_back(sockN);
  580. }
  581. /*
  582. * Processes incoming data on a client-specific RPC connection
  583. */
  584. void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,unsigned long len)
  585. {
  586. pid_t pid, tid;
  587. int rpc_count;
  588. char cmd, timestamp[20];
  589. void *payload;
  590. unload_rpc(data, pid, tid, rpc_count, timestamp, cmd, payload);
  591. dwr(MSG_DEBUG, "\n\nRPC: (pid=%d, tid=%d, rpc_count=%d, timestamp=%s, cmd=%d\n", pid, tid, rpc_count, timestamp, cmd);
  592. unsigned char *buf = (unsigned char*)data;
  593. switch(cmd)
  594. {
  595. case RPC_SOCKET:
  596. dwr(MSG_DEBUG, "RPC_SOCKET\n");
  597. struct socket_st socket_rpc;
  598. memcpy(&socket_rpc, &buf[IDX_PAYLOAD+1], sizeof(struct socket_st));
  599. if(rpc_count==rpc_counter) {
  600. dwr(MSG_ERROR, "Detected repeat RPC.\n");
  601. //return;
  602. }
  603. else {
  604. rpc_counter = rpc_count;
  605. }
  606. TcpConnection * new_conn;
  607. if((new_conn = handle_socket(sock, uptr, &socket_rpc))) {
  608. pidmap[sock] = pid;
  609. new_conn->pid = pid;
  610. }
  611. break;
  612. case RPC_LISTEN:
  613. dwr(MSG_DEBUG, "RPC_LISTEN\n");
  614. struct listen_st listen_rpc;
  615. memcpy(&listen_rpc, &buf[IDX_PAYLOAD+1], sizeof(struct listen_st));
  616. handle_listen(sock, uptr, &listen_rpc);
  617. break;
  618. case RPC_BIND:
  619. dwr(MSG_DEBUG, "RPC_BIND\n");
  620. struct bind_st bind_rpc;
  621. memcpy(&bind_rpc, &buf[IDX_PAYLOAD+1], sizeof(struct bind_st));
  622. handle_bind(sock, uptr, &bind_rpc);
  623. break;
  624. case RPC_CONNECT:
  625. dwr(MSG_DEBUG, "RPC_CONNECT\n");
  626. struct connect_st connect_rpc;
  627. memcpy(&connect_rpc, &buf[IDX_PAYLOAD+1], sizeof(struct connect_st));
  628. handle_connect(sock, uptr, &connect_rpc);
  629. break;
  630. case RPC_MAP:
  631. dwr(MSG_DEBUG, "RPC_MAP (len = %d)\n", len);
  632. int newfd;
  633. memcpy(&newfd, &buf[IDX_PAYLOAD+1], sizeof(int));
  634. handle_retval(sock, uptr, rpc_count, newfd);
  635. break;
  636. case RPC_MAP_REQ:
  637. dwr(MSG_DEBUG, "RPC_MAP_REQ\n");
  638. handle_map_request(sock, uptr, buf);
  639. break;
  640. case RPC_GETSOCKNAME:
  641. dwr(MSG_DEBUG, "RPC_GETSOCKNAME\n");
  642. struct getsockname_st getsockname_rpc;
  643. memcpy(&getsockname_rpc, &buf[IDX_PAYLOAD+1], sizeof(struct getsockname_st));
  644. handle_getsockname(sock, uptr, &getsockname_rpc);
  645. break;
  646. default:
  647. dwr(MSG_ERROR, "POSSIBLE RPC CORRUPTION. TRY AGAIN!\n");
  648. break;
  649. }
  650. }
  651. /*
  652. * Send a 'retval' and 'errno' to the client for an RPC over connection->rpcSock
  653. */
  654. int NetconEthernetTap::send_return_value(TcpConnection *conn, int retval, int _errno = 0)
  655. {
  656. if(conn) {
  657. int n = send_return_value(_phy.getDescriptor(conn->rpcSock), retval, _errno);
  658. if(n > 0)
  659. conn->pending = false;
  660. else {
  661. dwr(MSG_ERROR, " Unable to send return value to the intercept. Closing connection\n");
  662. closeConnection(conn);
  663. }
  664. return n;
  665. }
  666. return -1;
  667. }
  668. int NetconEthernetTap::send_return_value(int fd, int retval, int _errno = 0)
  669. {
  670. dwr(MSG_DEBUG, " send_return_value(): fd = %d, retval = %d, errno = %d\n", fd, retval, _errno);
  671. int sz = sizeof(char) + sizeof(retval) + sizeof(errno);
  672. char retmsg[sz];
  673. memset(&retmsg, '\0', sizeof(retmsg));
  674. retmsg[0]=RPC_RETVAL;
  675. memcpy(&retmsg[1], &retval, sizeof(retval));
  676. memcpy(&retmsg[1]+sizeof(retval), &_errno, sizeof(_errno));
  677. return write(fd, &retmsg, sz);
  678. }
  679. /*------------------------------------------------------------------------------
  680. --------------------------------- LWIP callbacks -------------------------------
  681. ------------------------------------------------------------------------------*/
  682. // NOTE: these are called from within LWIP, meaning that lwipstack->_lock is ALREADY
  683. // locked in this case!
  684. /*
  685. * Callback from LWIP for when a connection has been accepted and the PCB has been
  686. * put into an ACCEPT state.
  687. *
  688. * A socketpair is created, one end is kept and wrapped into a PhySocket object
  689. * for use in the main ZT I/O loop, and one end is sent to the client. The client
  690. * is then required to tell the service what new file descriptor it has allocated
  691. * for this connection. After the mapping is complete, the accepted socket can be
  692. * used.
  693. *
  694. * @param associated service state object
  695. * @param newly allocated PCB
  696. * @param error code
  697. * @return ERR_OK if everything is ok, -1 otherwise
  698. i := should be implemented in intercept lib
  699. I := is implemented in intercept lib
  700. X := is implemented in service
  701. ? := required treatment Unknown
  702. - := Not needed
  703. [ ] EAGAIN or EWOULDBLOCK - The socket is marked nonblocking and no connections are present
  704. to be accepted. POSIX.1-2001 allows either error to be returned for
  705. this case, and does not require these constants to have the same value,
  706. so a portable application should check for both possibilities.
  707. [I] EBADF - The descriptor is invalid.
  708. [I] ECONNABORTED - A connection has been aborted.
  709. [i] EFAULT - The addr argument is not in a writable part of the user address space.
  710. [-] EINTR - The system call was interrupted by a signal that was caught before a valid connection arrived; see signal(7).
  711. [I] EINVAL - Socket is not listening for connections, or addrlen is invalid (e.g., is negative).
  712. [I] EINVAL - (accept4()) invalid value in flags.
  713. [I] EMFILE - The per-process limit of open file descriptors has been reached.
  714. [ ] ENFILE - The system limit on the total number of open files has been reached.
  715. [ ] ENOBUFS, ENOMEM - Not enough free memory. This often means that the memory allocation is
  716. limited by the socket buffer limits, not by the system memory.
  717. [I] ENOTSOCK - The descriptor references a file, not a socket.
  718. [I] EOPNOTSUPP - The referenced socket is not of type SOCK_STREAM.
  719. [ ] EPROTO - Protocol error.
  720. *
  721. */
  722. err_t NetconEthernetTap::nc_accept(void *arg, struct tcp_pcb *newpcb, err_t err)
  723. {
  724. dwr(MSG_DEBUG, " nc_accept()\n");
  725. Larg *l = (Larg*)arg;
  726. TcpConnection *conn = l->conn;
  727. NetconEthernetTap *tap = l->tap;
  728. int listening_fd = tap->_phy.getDescriptor(conn->dataSock);
  729. if(conn) {
  730. ZT_PHY_SOCKFD_TYPE fds[2];
  731. if(socketpair(PF_LOCAL, SOCK_STREAM, 0, fds) < 0) {
  732. if(errno < 0) {
  733. l->tap->send_return_value(conn, -1, errno);
  734. dwr(MSG_ERROR, " nc_accept(): unable to create socketpair\n");
  735. return ERR_MEM;
  736. }
  737. }
  738. TcpConnection *new_tcp_conn = new TcpConnection();
  739. new_tcp_conn->dataSock = tap->_phy.wrapSocket(fds[0], new_tcp_conn);
  740. new_tcp_conn->rpcSock = conn->rpcSock;
  741. new_tcp_conn->pcb = newpcb;
  742. new_tcp_conn->their_fd = fds[1];
  743. tap->tcp_connections.push_back(new_tcp_conn);
  744. dwr(MSG_DEBUG, " nc_accept(): socketpair = {%d, %d}\n", fds[0], fds[1]);
  745. int send_fd = tap->_phy.getDescriptor(conn->rpcSock);
  746. dwr(MSG_DEBUG, " nc_accept(): sending %d via %d\n", fds[1], listening_fd);
  747. if(sock_fd_write(listening_fd, fds[1]) < 0){
  748. dwr(MSG_ERROR, " nc_accept(%d): error writing signal byte (listen_fd = %d, perceived_fd = %d)\n", listening_fd, send_fd, fds[1]);
  749. return -1;
  750. }
  751. else {
  752. close(fds[1]); // close other end of socketpair
  753. new_tcp_conn->pending = true;
  754. }
  755. tap->lwipstack->_tcp_arg(newpcb, new Larg(tap, new_tcp_conn));
  756. tap->lwipstack->_tcp_recv(newpcb, nc_recved);
  757. tap->lwipstack->_tcp_err(newpcb, nc_err);
  758. tap->lwipstack->_tcp_sent(newpcb, nc_sent);
  759. tap->lwipstack->_tcp_poll(newpcb, nc_poll, 1);
  760. tcp_accepted(conn->pcb); // Let lwIP know that it can queue additional incoming connections
  761. return ERR_OK;
  762. }
  763. else {
  764. dwr(MSG_ERROR, " nc_accept(%d): can't locate Connection object for PCB.\n", listening_fd);
  765. }
  766. return -1;
  767. }
  768. /*
  769. * Callback from LWIP for when data is available to be read from the network.
  770. *
  771. * Data is in the form of a linked list of struct pbufs, it is then recombined and
  772. * send to the client over the associated unix socket.
  773. *
  774. * @param associated service state object
  775. * @param allocated PCB
  776. * @param chain of pbufs
  777. * @param error code
  778. * @return ERR_OK if everything is ok, -1 otherwise
  779. *
  780. */
  781. err_t NetconEthernetTap::nc_recved(void *arg, struct tcp_pcb *tpcb, struct pbuf *p, err_t err)
  782. {
  783. dwr(MSG_DEBUG, " nc_recved()\n");
  784. Larg *l = (Larg*)arg;
  785. int n;
  786. struct pbuf* q = p;
  787. if(!l->conn) {
  788. dwr(MSG_ERROR, " nc_recved(): no connection object\n");
  789. return ERR_OK; // ?
  790. }
  791. if(p == NULL) {
  792. if(l->conn && !l->conn->listening) {
  793. dwr(MSG_INFO, " nc_recved(): closing connection\n");
  794. l->tap->closeConnection(l->conn);
  795. return ERR_ABRT;
  796. }
  797. else {
  798. dwr(MSG_ERROR, " nc_recved(): can't locate connection via (arg)\n");
  799. }
  800. return err;
  801. }
  802. q = p;
  803. while(p != NULL) { // Cycle through pbufs and write them to the socket
  804. if(p->len <= 0)
  805. break; // ?
  806. if((n = l->tap->_phy.streamSend(l->conn->dataSock,p->payload, p->len)) > 0) {
  807. if(n < p->len) {
  808. dwr(MSG_INFO, " nc_recved(): unable to write entire pbuf to buffer\n");
  809. }
  810. l->tap->lwipstack->_tcp_recved(tpcb, n); // TODO: would it be more efficient to call this once at the end?
  811. dwr(MSG_DEBUG, " nc_recved(): wrote %d bytes to (%d)\n", n, l->tap->_phy.getDescriptor(l->conn->dataSock));
  812. }
  813. else {
  814. dwr(MSG_INFO, " nc_recved(): No data written to intercept buffer (%d)\n", l->tap->_phy.getDescriptor(l->conn->dataSock));
  815. }
  816. p = p->next;
  817. }
  818. l->tap->lwipstack->_pbuf_free(q); // free pbufs
  819. return ERR_OK;
  820. }
  821. /*
  822. * Callback from LWIP when an internal error is associtated with the given (arg)
  823. *
  824. * Since the PCB related to this error might no longer exist, only its perviously
  825. * associated (arg) is provided to us.
  826. *
  827. * @param associated service state object
  828. * @param error code
  829. *
  830. */
  831. void NetconEthernetTap::nc_err(void *arg, err_t err)
  832. {
  833. dwr(MSG_DEBUG, "nc_err()\n");
  834. Larg *l = (Larg*)arg;
  835. if(!l->conn)
  836. dwr(MSG_ERROR, "nc_err(): Connection is NULL!\n");
  837. if(l->conn->listening)
  838. return;
  839. switch(err)
  840. {
  841. case ERR_MEM:
  842. dwr(MSG_ERROR, "nc_err(): ERR_MEM->ENOMEM\n");
  843. l->tap->send_return_value(l->conn, -1, ENOMEM);
  844. break;
  845. case ERR_BUF:
  846. dwr(MSG_ERROR, "nc_err(): ERR_BUF->ENOBUFS\n");
  847. l->tap->send_return_value(l->conn, -1, ENOBUFS);
  848. break;
  849. case ERR_TIMEOUT:
  850. dwr(MSG_ERROR, "nc_err(): ERR_TIMEOUT->ETIMEDOUT\n");
  851. l->tap->send_return_value(l->conn, -1, ETIMEDOUT);
  852. break;
  853. case ERR_RTE:
  854. dwr(MSG_ERROR, "nc_err(): ERR_RTE->ENETUNREACH\n");
  855. l->tap->send_return_value(l->conn, -1, ENETUNREACH);
  856. break;
  857. case ERR_INPROGRESS:
  858. dwr(MSG_ERROR, "nc_err(): ERR_INPROGRESS->EINPROGRESS\n");
  859. l->tap->send_return_value(l->conn, -1, EINPROGRESS);
  860. break;
  861. case ERR_VAL:
  862. dwr(MSG_ERROR, "nc_err(): ERR_VAL->EINVAL\n");
  863. l->tap->send_return_value(l->conn, -1, EINVAL);
  864. break;
  865. case ERR_WOULDBLOCK:
  866. dwr(MSG_ERROR, "nc_err(): ERR_WOULDBLOCK->EWOULDBLOCK\n");
  867. l->tap->send_return_value(l->conn, -1, EWOULDBLOCK);
  868. break;
  869. case ERR_USE:
  870. dwr(MSG_ERROR, "nc_err(): ERR_USE->EADDRINUSE\n");
  871. l->tap->send_return_value(l->conn, -1, EADDRINUSE);
  872. break;
  873. case ERR_ISCONN:
  874. dwr(MSG_ERROR, "nc_err(): ERR_ISCONN->EISCONN\n");
  875. l->tap->send_return_value(l->conn, -1, EISCONN);
  876. break;
  877. case ERR_ABRT:
  878. dwr(MSG_ERROR, "nc_err(): ERR_ABRT->ECONNREFUSED\n");
  879. l->tap->send_return_value(l->conn, -1, ECONNREFUSED);
  880. break;
  881. // FIXME: Below are errors which don't have a standard errno correlate
  882. case ERR_RST:
  883. l->tap->send_return_value(l->conn, -1, -1);
  884. break;
  885. case ERR_CLSD:
  886. l->tap->send_return_value(l->conn, -1, -1);
  887. break;
  888. case ERR_CONN:
  889. l->tap->send_return_value(l->conn, -1, -1);
  890. break;
  891. case ERR_ARG:
  892. l->tap->send_return_value(l->conn, -1, -1);
  893. break;
  894. case ERR_IF:
  895. l->tap->send_return_value(l->conn, -1, -1);
  896. break;
  897. default:
  898. break;
  899. }
  900. dwr(MSG_ERROR, "nc_err(): closing connection\n");
  901. l->tap->closeConnection(l->conn);
  902. }
  903. /*
  904. * Callback from LWIP to do whatever work we might need to do.
  905. *
  906. * @param associated service state object
  907. * @param PCB we're polling on
  908. * @return ERR_OK if everything is ok, -1 otherwise
  909. *
  910. */
  911. err_t NetconEthernetTap::nc_poll(void* arg, struct tcp_pcb *tpcb)
  912. {
  913. return ERR_OK;
  914. }
  915. /*
  916. * Callback from LWIP to signal that 'len' bytes have successfully been sent.
  917. * As a result, we should put our socket back into a notify-on-readability state
  918. * since there is now room on the PCB buffer to write to.
  919. *
  920. * NOTE: This could be used to track the amount of data sent by a connection.
  921. *
  922. * @param associated service state object
  923. * @param relevant PCB
  924. * @param length of data sent
  925. * @return ERR_OK if everything is ok, -1 otherwise
  926. *
  927. */
  928. err_t NetconEthernetTap::nc_sent(void* arg, struct tcp_pcb *tpcb, u16_t len)
  929. {
  930. //dwr(5, " nc_sent()\n");
  931. Larg *l = (Larg*)arg;
  932. if(len) {
  933. l->conn->acked+=len;
  934. //dwr("W = %d, A = %d\n", l->conn->written, l->conn->acked);
  935. //dwr("ACK = %d\n", len);
  936. l->tap->_phy.setNotifyReadable(l->conn->dataSock, true);
  937. l->tap->_phy.whack();
  938. }
  939. return ERR_OK;
  940. }
  941. /*
  942. * Callback from LWIP which sends a return value to the client to signal that
  943. * a connection was established for this PCB
  944. *
  945. * @param associated service state object
  946. * @param relevant PCB
  947. * @param error code
  948. * @return ERR_OK if everything is ok, -1 otherwise
  949. *
  950. */
  951. err_t NetconEthernetTap::nc_connected(void *arg, struct tcp_pcb *tpcb, err_t err)
  952. {
  953. dwr(MSG_DEBUG, " nc_connected()\n");
  954. Larg *l = (Larg*)arg;
  955. l->tap->send_return_value(l->conn, ERR_OK);
  956. return ERR_OK;
  957. }
  958. /*------------------------------------------------------------------------------
  959. ----------------------------- RPC Handler functions ----------------------------
  960. ------------------------------------------------------------------------------*/
  961. /* Unpacks the buffer from an RPC command */
  962. void NetconEthernetTap::unload_rpc(void *data, pid_t &pid, pid_t &tid, int &rpc_count, char (timestamp[20]), char &cmd, void* &payload)
  963. {
  964. unsigned char *buf = (unsigned char*)data;
  965. memcpy(&pid, &buf[IDX_PID], sizeof(pid_t));
  966. memcpy(&tid, &buf[IDX_TID], sizeof(pid_t));
  967. memcpy(&rpc_count, &buf[IDX_COUNT], sizeof(int));
  968. memcpy(timestamp, &buf[IDX_TIME], 20);
  969. memcpy(&cmd, &buf[IDX_PAYLOAD], sizeof(char));
  970. }
  971. /*
  972. Responds to a request from the [intercept] to determine whether a local socket is
  973. mapped to this service. In other words, how do the intercept's overridden calls
  974. tell the difference between regular AF_LOCAL sockets and one of our socketpairs
  975. that is used to communicate over the network?
  976. */
  977. void NetconEthernetTap::handle_map_request(PhySocket *sock, void **uptr, unsigned char* buf)
  978. {
  979. dwr(4, " handle_map_request()\n");
  980. TcpConnection *conn = (TcpConnection*)*uptr;
  981. int req_fd;
  982. memcpy(&req_fd, &buf[IDX_PAYLOAD+1], sizeof(req_fd));
  983. for(size_t i=0; i<tcp_connections.size(); i++) {
  984. if(tcp_connections[i]->rpcSock == conn->rpcSock && tcp_connections[i]->perceived_fd == req_fd){
  985. send_return_value(conn, 1, ERR_OK); // True
  986. dwr(MSG_DEBUG, " handle_map_request(their=%d): MAPPED (to %d)\n", req_fd,
  987. _phy.getDescriptor(tcp_connections[i]->dataSock));
  988. return;
  989. }
  990. }
  991. send_return_value(conn, 0, ERR_OK); // False
  992. dwr(MSG_DEBUG, " handle_map_request(their=%d): NOT MAPPED\n", req_fd);
  993. }
  994. /**
  995. * Handles a return value (client's perceived fd) and completes a mapping
  996. * so that we know what connection an RPC call should be associated with.
  997. *
  998. * @param PhySocket associated with this RPC connection
  999. * @param structure containing the data and parameters for this client's RPC
  1000. *
  1001. */
  1002. void NetconEthernetTap::handle_retval(PhySocket *sock, void **uptr, int rpc_count, int newfd)
  1003. {
  1004. dwr(MSG_DEBUG, " handle_retval()\n");
  1005. TcpConnection *conn = (TcpConnection*)*uptr;
  1006. if(!conn->pending){
  1007. send_return_value(conn, -1, -1);
  1008. return;
  1009. }
  1010. conn->pending = false;
  1011. conn->perceived_fd = newfd;
  1012. if(rpc_count==rpc_counter) {
  1013. dwr(MSG_ERROR, " handle_retval(): Detected repeat RPC.\n");
  1014. send_return_value(conn, -1, -1);
  1015. //return;
  1016. }
  1017. else
  1018. rpc_counter = rpc_count;
  1019. dwr(MSG_DEBUG, " handle_retval(): CONN:%x - Mapping [our=%d -> their=%d]\n",conn,
  1020. _phy.getDescriptor(conn->dataSock), conn->perceived_fd);
  1021. /* Check for pre-existing connection for this socket ---
  1022. This block is in response to interesting behaviour from redis-server. A
  1023. socket is created, setsockopt is called and the socket is set to IPV6 but fails (for now),
  1024. then it is closed and re-opened and consequently remapped. With two pipes mapped
  1025. to the same socket, makes it possible that we write to the wrong pipe and fail. So
  1026. this block merely searches for a possible duplicate mapping and erases it
  1027. */
  1028. for(size_t i=0; i<tcp_connections.size(); i++) {
  1029. if(tcp_connections[i] == conn)
  1030. continue;
  1031. if(tcp_connections[i]->rpcSock == conn->rpcSock) {
  1032. if(tcp_connections[i]->perceived_fd == conn->perceived_fd) {
  1033. int n;
  1034. if((n = send(_phy.getDescriptor(tcp_connections[i]->dataSock), "z", 1, MSG_NOSIGNAL)) < 0) {
  1035. dwr(MSG_DEBUG, " handle_retval(): CONN:%x - Socket (%d) already mapped (originally CONN:%x)\n", conn, tcp_connections[i]->perceived_fd, tcp_connections[i]);
  1036. closeConnection(tcp_connections[i]);
  1037. }
  1038. else {
  1039. dwr(MSG_ERROR, " handle_retval(): CONN:%x - This socket is mapped to two different pipes (?). Exiting.\n", conn);
  1040. //die(0); // FIXME: Print service mapping state and exit
  1041. }
  1042. }
  1043. }
  1044. }
  1045. send_return_value(conn, ERR_OK, ERR_OK); // Success
  1046. }
  1047. /* Return the address that the socket is bound to */
  1048. void NetconEthernetTap::handle_getsockname(PhySocket *sock, void **uptr, struct getsockname_st *getsockname_rpc)
  1049. {
  1050. TcpConnection *conn = getConnectionByTheirFD(sock, getsockname_rpc->sockfd);
  1051. dwr(MSG_DEBUG, " handle_getsockname(): sockfd = %d\n", getsockname_rpc->sockfd);
  1052. dwr(MSG_DEBUG, " handle_getsockname(): conn = 0x%x\n", conn);
  1053. /*
  1054. if(!conn){
  1055. return;
  1056. }
  1057. struct sockaddr_in * myaddr = (struct sockaddr_in*)conn->addr;
  1058. int port = myaddr->sin_port;
  1059. int ip = myaddr->sin_addr.s_addr;
  1060. unsigned char d[4];
  1061. d[0] = ip & 0xFF;
  1062. d[1] = (ip >> 8) & 0xFF;
  1063. d[2] = (ip >> 16) & 0xFF;
  1064. d[3] = (ip >> 24) & 0xFF;
  1065. dwr(MSG_ERROR, " handle_getsockname(): addr = %d.%d.%d.%d: %d\n", d[0],d[1],d[2],d[3], lwipstack->ntohs(port));
  1066. */
  1067. // Assemble address "command" to send to intercept
  1068. char retmsg[sizeof(struct sockaddr_storage)];
  1069. memset(&retmsg, 0, sizeof(retmsg));
  1070. if ((conn)&&(conn->addr))
  1071. memcpy(&retmsg, conn->addr, sizeof(struct sockaddr_storage));
  1072. int n = write(_phy.getDescriptor(conn->rpcSock), &retmsg, sizeof(struct sockaddr_storage));
  1073. dwr(MSG_DEBUG, " handle_getsockname(): wrote %d bytes\n", n);
  1074. }
  1075. /*
  1076. * Handles an RPC to bind an LWIP PCB to a given address and port
  1077. *
  1078. * @param PhySocket associated with this RPC connection
  1079. * @param structure containing the data and parameters for this client's RPC
  1080. *
  1081. i := should be implemented in intercept lib
  1082. I := is implemented in intercept lib
  1083. X := is implemented in service
  1084. ? := required treatment Unknown
  1085. - := Not needed
  1086. [ ] EACCES - The address is protected, and the user is not the superuser.
  1087. [X] EADDRINUSE - The given address is already in use.
  1088. [I] EBADF - sockfd is not a valid descriptor.
  1089. [X] EINVAL - The socket is already bound to an address.
  1090. [I] ENOTSOCK - sockfd is a descriptor for a file, not a socket.
  1091. [X] ENOMEM - Insufficient kernel memory was available.
  1092. - The following errors are specific to UNIX domain (AF_UNIX) sockets:
  1093. [-] EACCES - Search permission is denied on a component of the path prefix. (See also path_resolution(7).)
  1094. [-] EADDRNOTAVAIL - A nonexistent interface was requested or the requested address was not local.
  1095. [-] EFAULT - addr points outside the user's accessible address space.
  1096. [-] EINVAL - The addrlen is wrong, or the socket was not in the AF_UNIX family.
  1097. [-] ELOOP - Too many symbolic links were encountered in resolving addr.
  1098. [-] ENAMETOOLONG - s addr is too long.
  1099. [-] ENOENT - The file does not exist.
  1100. [-] ENOTDIR - A component of the path prefix is not a directory.
  1101. [-] EROFS - The socket inode would reside on a read-only file system.
  1102. */
  1103. void NetconEthernetTap::handle_bind(PhySocket *sock, void **uptr, struct bind_st *bind_rpc)
  1104. {
  1105. struct sockaddr_in *connaddr;
  1106. connaddr = (struct sockaddr_in *) &bind_rpc->addr;
  1107. int conn_port = lwipstack->ntohs(connaddr->sin_port);
  1108. ip_addr_t conn_addr;
  1109. conn_addr.addr = *((u32_t *)_ips[0].rawIpData());
  1110. TcpConnection *conn = getConnectionByTheirFD(sock, bind_rpc->sockfd);
  1111. dwr(MSG_DEBUG, " handle_bind(%d)\n", bind_rpc->sockfd);
  1112. if(conn) {
  1113. if(conn->pcb->state == CLOSED){
  1114. int err = lwipstack->tcp_bind(conn->pcb, &conn_addr, conn_port);
  1115. int ip = connaddr->sin_addr.s_addr;
  1116. unsigned char d[4];
  1117. d[0] = ip & 0xFF;
  1118. d[1] = (ip >> 8) & 0xFF;
  1119. d[2] = (ip >> 16) & 0xFF;
  1120. d[3] = (ip >> 24) & 0xFF;
  1121. dwr(MSG_DEBUG, " handle_bind(): %d.%d.%d.%d : %d\n", d[0],d[1],d[2],d[3], conn_port);
  1122. if(err != ERR_OK) {
  1123. dwr(MSG_ERROR, " handle_bind(): err = %d\n", err);
  1124. if(err == ERR_USE)
  1125. send_return_value(conn, -1, EADDRINUSE);
  1126. if(err == ERR_MEM)
  1127. send_return_value(conn, -1, ENOMEM);
  1128. if(err == ERR_BUF)
  1129. send_return_value(conn, -1, ENOMEM); // FIXME: Closest match
  1130. }
  1131. else {
  1132. conn->addr = (struct sockaddr_storage *) &bind_rpc->addr;
  1133. send_return_value(conn, ERR_OK, ERR_OK); // Success
  1134. }
  1135. }
  1136. else {
  1137. dwr(MSG_ERROR, " handle_bind(): PCB (%x) not in CLOSED state. Ignoring BIND request.\n", conn->pcb);
  1138. send_return_value(conn, -1, EINVAL);
  1139. }
  1140. }
  1141. else {
  1142. dwr(MSG_ERROR, " handle_bind(): can't locate connection for PCB\n");
  1143. send_return_value(conn, -1, EBADF);
  1144. }
  1145. }
  1146. /*
  1147. * Handles an RPC to put an LWIP PCB into LISTEN mode
  1148. *
  1149. * @param PhySocket associated with this RPC connection
  1150. * @param structure containing the data and parameters for this client's RPC
  1151. *
  1152. i := should be implemented in intercept lib
  1153. I := is implemented in intercept lib
  1154. X := is implemented in service
  1155. ? := required treatment Unknown
  1156. - := Not needed
  1157. [?] EADDRINUSE - Another socket is already listening on the same port.
  1158. [IX] EBADF - The argument sockfd is not a valid descriptor.
  1159. [I] ENOTSOCK - The argument sockfd is not a socket.
  1160. [I] EOPNOTSUPP - The socket is not of a type that supports the listen() operation.
  1161. */
  1162. void NetconEthernetTap::handle_listen(PhySocket *sock, void **uptr, struct listen_st *listen_rpc)
  1163. {
  1164. dwr(3, " handle_listen(their=%d):\n", listen_rpc->sockfd);
  1165. TcpConnection *conn = getConnectionByTheirFD(sock, listen_rpc->sockfd);
  1166. if(!conn){
  1167. dwr(MSG_ERROR, " handle_listen(): unable to locate connection object\n");
  1168. send_return_value(conn, -1, EBADF);
  1169. return;
  1170. }
  1171. dwr(3, " handle_listen(our=%d -> their=%d)\n", _phy.getDescriptor(conn->dataSock), conn->perceived_fd);
  1172. if(conn->pcb->state == LISTEN) {
  1173. dwr(MSG_ERROR, " handle_listen(): PCB is already in listening state.\n");
  1174. send_return_value(conn, ERR_OK, ERR_OK);
  1175. return;
  1176. }
  1177. struct tcp_pcb* listening_pcb;
  1178. #ifdef TCP_LISTEN_BACKLOG
  1179. listening_pcb = lwipstack->tcp_listen_with_backlog(conn->pcb, listen_rpc->backlog);
  1180. #else
  1181. listening_pcb = lwipstack->tcp_listen(conn->pcb);
  1182. #endif
  1183. if(listening_pcb != NULL) {
  1184. conn->pcb = listening_pcb;
  1185. lwipstack->tcp_accept(listening_pcb, nc_accept);
  1186. lwipstack->tcp_arg(listening_pcb, new Larg(this, conn));
  1187. /* we need to wait for the client to send us the fd allocated on their end
  1188. for this listening socket */
  1189. fcntl(_phy.getDescriptor(conn->dataSock), F_SETFL, O_NONBLOCK);
  1190. conn->listening = true;
  1191. conn->pending = true;
  1192. send_return_value(conn, ERR_OK, ERR_OK);
  1193. return;
  1194. }
  1195. send_return_value(conn, -1, -1);
  1196. }
  1197. /*
  1198. * Handles an RPC to create a socket (LWIP PCB and associated socketpair)
  1199. *
  1200. * A socketpair is created, one end is kept and wrapped into a PhySocket object
  1201. * for use in the main ZT I/O loop, and one end is sent to the client. The client
  1202. * is then required to tell the service what new file descriptor it has allocated
  1203. * for this connection. After the mapping is complete, the socket can be used.
  1204. *
  1205. * @param PhySocket associated with this RPC connection
  1206. * @param structure containing the data and parameters for this client's RPC
  1207. *
  1208. i := should be implemented in intercept lib
  1209. I := is implemented in intercept lib
  1210. X := is implemented in service
  1211. ? := required treatment Unknown
  1212. - := Not needed
  1213. [-] EACCES - Permission to create a socket of the specified type and/or protocol is denied.
  1214. [I] EAFNOSUPPORT - The implementation does not support the specified address family.
  1215. [I] EINVAL - Unknown protocol, or protocol family not available.
  1216. [I] EINVAL - Invalid flags in type.
  1217. [I] EMFILE - Process file table overflow.
  1218. [?] ENFILE - The system limit on the total number of open files has been reached.
  1219. [X] ENOBUFS or ENOMEM - Insufficient memory is available. The socket cannot be created until sufficient resources are freed.
  1220. [?] EPROTONOSUPPORT - The protocol type or the specified protocol is not supported within this domain.
  1221. */
  1222. TcpConnection * NetconEthernetTap::handle_socket(PhySocket *sock, void **uptr, struct socket_st* socket_rpc)
  1223. {
  1224. int rpc_fd = _phy.getDescriptor(sock);
  1225. struct tcp_pcb *newpcb = lwipstack->tcp_new();
  1226. dwr(MSG_DEBUG, " handle_socket(): pcb=%x\n", newpcb);
  1227. if(newpcb != NULL) {
  1228. ZT_PHY_SOCKFD_TYPE fds[2];
  1229. if(socketpair(PF_LOCAL, SOCK_STREAM, 0, fds) < 0) {
  1230. if(errno < 0) {
  1231. send_return_value(rpc_fd, -1, errno);
  1232. return NULL;
  1233. }
  1234. }
  1235. dwr(MSG_DEBUG, " handle_socket(): socketpair = {%d, %d}\n", fds[0], fds[1]);
  1236. TcpConnection *new_conn = new TcpConnection();
  1237. new_conn->dataSock = _phy.wrapSocket(fds[0], new_conn);
  1238. *uptr = new_conn;
  1239. new_conn->rpcSock = sock;
  1240. new_conn->pcb = newpcb;
  1241. new_conn->their_fd = fds[1];
  1242. tcp_connections.push_back(new_conn);
  1243. sock_fd_write(_phy.getDescriptor(sock), fds[1]);
  1244. close(fds[1]); // close other end of socketpair
  1245. // Once the client tells us what its fd is on the other end, we can then complete the mapping
  1246. new_conn->pending = true;
  1247. return new_conn;
  1248. }
  1249. sock_fd_write(rpc_fd, -1); // Send a bad fd, to signal error
  1250. dwr(MSG_ERROR, " handle_socket(): Memory not available for new PCB\n");
  1251. send_return_value(rpc_fd, -1, ENOMEM);
  1252. return NULL;
  1253. }
  1254. /*
  1255. * Handles an RPC to connect to a given address and port
  1256. *
  1257. * @param PhySocket associated with this RPC connection
  1258. * @param structure containing the data and parameters for this client's RPC
  1259. --- Error handling in this method will only catch problems which are immedately
  1260. apprent. Some errors will need to be caught in the nc_connected(0 callback
  1261. i := should be implemented in intercept lib
  1262. I := is implemented in intercept lib
  1263. X := is implemented in service
  1264. ? := required treatment Unknown
  1265. - := Not needed
  1266. [-] EACCES - For UNIX domain sockets, which are identified by pathname: Write permission is denied ...
  1267. [?] EACCES, EPERM - The user tried to connect to a broadcast address without having the socket broadcast flag enabled ...
  1268. [X] EADDRINUSE - Local address is already in use.
  1269. [I] EAFNOSUPPORT - The passed address didn't have the correct address family in its sa_family field.
  1270. [X] EAGAIN - No more free local ports or insufficient entries in the routing cache.
  1271. [ ] EALREADY - The socket is nonblocking and a previous connection attempt has not yet been completed.
  1272. [IX] EBADF - The file descriptor is not a valid index in the descriptor table.
  1273. [ ] ECONNREFUSED - No-one listening on the remote address.
  1274. [i] EFAULT - The socket structure address is outside the user's address space.
  1275. [ ] EINPROGRESS - The socket is nonblocking and the connection cannot be completed immediately.
  1276. [-] EINTR - The system call was interrupted by a signal that was caught.
  1277. [X] EISCONN - The socket is already connected.
  1278. [X] ENETUNREACH - Network is unreachable.
  1279. [I] ENOTSOCK - The file descriptor is not associated with a socket.
  1280. [X] ETIMEDOUT - Timeout while attempting connection.
  1281. [X] EINVAL - Invalid argument, SVr4, generally makes sense to set this
  1282. *
  1283. */
  1284. void NetconEthernetTap::handle_connect(PhySocket *sock, void **uptr, struct connect_st* connect_rpc)
  1285. {
  1286. dwr(MSG_DEBUG, " handle_connect()\n");
  1287. TcpConnection *conn = (TcpConnection*)*uptr;
  1288. struct sockaddr_in *connaddr;
  1289. connaddr = (struct sockaddr_in *) &connect_rpc->__addr;
  1290. int conn_port = lwipstack->ntohs(connaddr->sin_port);
  1291. ip_addr_t conn_addr = convert_ip((struct sockaddr_in *)&connect_rpc->__addr);
  1292. if(conn != NULL) {
  1293. if (!conn->listening)
  1294. lwipstack->tcp_sent(conn->pcb, nc_sent);
  1295. lwipstack->tcp_recv(conn->pcb, nc_recved);
  1296. lwipstack->tcp_err(conn->pcb, nc_err);
  1297. lwipstack->tcp_poll(conn->pcb, nc_poll, APPLICATION_POLL_FREQ);
  1298. lwipstack->tcp_arg(conn->pcb, new Larg(this, conn));
  1299. int err = 0;
  1300. if((err = lwipstack->tcp_connect(conn->pcb,&conn_addr,conn_port, nc_connected)) < 0)
  1301. {
  1302. if(err == ERR_ISCONN) {
  1303. send_return_value(conn, -1, EISCONN); // Already in connected state
  1304. return;
  1305. }
  1306. if(err == ERR_USE) {
  1307. send_return_value(conn, -1, EADDRINUSE); // Already in use
  1308. return;
  1309. }
  1310. if(err == ERR_VAL) {
  1311. send_return_value(conn, -1, EINVAL); // Invalid ipaddress parameter
  1312. return;
  1313. }
  1314. if(err == ERR_RTE) {
  1315. send_return_value(conn, -1, ENETUNREACH); // No route to host
  1316. return;
  1317. }
  1318. if(err == ERR_BUF) {
  1319. send_return_value(conn, -1, EAGAIN); // No more ports available
  1320. return;
  1321. }
  1322. if(err == ERR_MEM)
  1323. {
  1324. /* Can occur for the following reasons: tcp_enqueue_flags()
  1325. 1) tcp_enqueue_flags is always called with either SYN or FIN in flags.
  1326. We need one available snd_buf byte to do that.
  1327. This means we can't send FIN while snd_buf==0. A better fix would be to
  1328. not include SYN and FIN sequence numbers in the snd_buf count.
  1329. 2) Cannot allocate new pbuf
  1330. 3) Cannot allocate new TCP segment
  1331. */
  1332. send_return_value(conn, -1, EAGAIN); // FIXME: Doesn't describe the problem well, but closest match
  1333. return;
  1334. }
  1335. // We should only return a value if failure happens immediately
  1336. // Otherwise, we still need to wait for a callback from lwIP.
  1337. // - This is because an ERR_OK from tcp_connect() only verifies
  1338. // that the SYN packet was enqueued onto the stack properly,
  1339. // that's it!
  1340. // - Most instances of a retval for a connect() should happen
  1341. // in the nc_connect() and nc_err() callbacks!
  1342. dwr(MSG_ERROR, " handle_connect(): unable to connect\n");
  1343. send_return_value(conn, -1, EAGAIN);
  1344. }
  1345. // Everything seems to be ok, but we don't have enough info to retval
  1346. conn->pending=true;
  1347. conn->listening=true;
  1348. send_return_value(conn, -1);
  1349. }
  1350. else {
  1351. dwr(MSG_ERROR, " handle_connect(): could not locate PCB based on their fd\n");
  1352. send_return_value(conn, -1, EBADF);
  1353. }
  1354. }
  1355. void NetconEthernetTap::handle_write(TcpConnection *conn)
  1356. {
  1357. float max = (float)TCP_SND_BUF;
  1358. int r;
  1359. if(!conn) {
  1360. dwr(MSG_ERROR, " handle_write(): could not locate connection for this fd\n");
  1361. return;
  1362. }
  1363. if(conn->idx < max) {
  1364. if(!conn->pcb) {
  1365. dwr(MSG_ERROR, " handle_write(): conn->pcb == NULL. Failed to write.\n");
  1366. return;
  1367. }
  1368. int sndbuf = conn->pcb->snd_buf; // How much we are currently allowed to write to the connection
  1369. /* PCB send buffer is full,turn off readability notifications for the
  1370. corresponding PhySocket until nc_sent() is called and confirms that there is
  1371. now space on the buffer */
  1372. if(sndbuf == 0) {
  1373. _phy.setNotifyReadable(conn->dataSock, false);
  1374. return;
  1375. }
  1376. if(!conn->listening)
  1377. lwipstack->_tcp_output(conn->pcb);
  1378. if(conn->dataSock && !conn->listening) {
  1379. int read_fd = _phy.getDescriptor(conn->dataSock);
  1380. if((r = recvfrom(read_fd, (&conn->buf)+conn->idx, sndbuf, MSG_DONTWAIT, NULL, NULL)) > 0) {
  1381. conn->idx += r;
  1382. /* Writes data pulled from the client's socket buffer to LWIP. This merely sends the
  1383. * data to LWIP to be enqueued and eventually sent to the network. */
  1384. if(r > 0) {
  1385. int sz;
  1386. // NOTE: this assumes that lwipstack->_lock is locked, either
  1387. // because we are in a callback or have locked it manually.
  1388. int err = lwipstack->_tcp_write(conn->pcb, &conn->buf, r, TCP_WRITE_FLAG_COPY);
  1389. lwipstack->_tcp_output(conn->pcb);
  1390. if(err != ERR_OK) {
  1391. dwr(MSG_ERROR, " handle_write(): error while writing to PCB, (err = %d)\n", err);
  1392. return;
  1393. }
  1394. else {
  1395. sz = (conn->idx)-r;
  1396. if(sz) {
  1397. memmove(&conn->buf, (conn->buf+r), sz);
  1398. }
  1399. conn->idx -= r;
  1400. conn->written+=r;
  1401. return;
  1402. }
  1403. }
  1404. else {
  1405. dwr(MSG_INFO, " handle_write(): LWIP stack full\n");
  1406. return;
  1407. }
  1408. }
  1409. }
  1410. }
  1411. }
  1412. } // namespace ZeroTier