system_transport.c 32 KB


  1. /* libanode: the Anode C reference implementation
  2. * Copyright (C) 2009-2010 Adam Ierymenko <[email protected]>
  3. *
  4. * This program is free software: you can redistribute it and/or modify
  5. * it under the terms of the GNU General Public License as published by
  6. * the Free Software Foundation, either version 3 of the License, or
  7. * (at your option) any later version.
  8. *
  9. * This program is distributed in the hope that it will be useful,
  10. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. * GNU General Public License for more details.
  13. *
  14. * You should have received a copy of the GNU General Public License
  15. * along with this program. If not, see <http://www.gnu.org/licenses/>. */
  16. #include <stdio.h>
  17. #include <netdb.h>
  18. #include <fcntl.h>
  19. #include <errno.h>
  20. #include <sys/types.h>
  21. #include <sys/socket.h>
  22. #include <arpa/inet.h>
  23. #include "anode.h"
  24. #include "impl/mutex.h"
  25. #include "impl/thread.h"
  26. #include "impl/misc.h"
  27. #include "impl/dns_txt.h"
  28. #ifdef WINDOWS
  29. #include <windows.h>
  30. #include <winsock2.h>
  31. #define AnodeSystemTransport__close_socket(s) closesocket((s))
  32. #define ANODE_USE_SELECT 1
  33. #else
  34. #include <poll.h>
  35. #include <unistd.h>
  36. #define AnodeSystemTransport__close_socket(s) close((s))
  37. #endif
  38. static const char *AnodeSystemTransport_CLASS = "SystemTransport";
  39. /* ======================================================================== */
  40. struct AnodeSystemTransport;
  41. struct AnodeSystemTransport_AnodeSocket
  42. {
  43. AnodeSocket base; /* must be first */
  44. unsigned int entry_idx;
  45. };
  46. #define ANODE_SYSTEM_TRANSPORT_DNS_MAX_RESULTS 16
  47. struct AnodeSystemTransport__dns_request
  48. {
  49. struct AnodeSystemTransport__dns_request *next;
  50. AnodeThread *thread;
  51. struct AnodeSystemTransport *owner;
  52. void (*event_handler)(const AnodeEvent *event);
  53. char name[256];
  54. enum AnodeTransportDnsIncludeMode ipv4_include_mode;
  55. enum AnodeTransportDnsIncludeMode ipv6_include_mode;
  56. enum AnodeTransportDnsIncludeMode anode_include_mode;
  57. AnodeNetworkAddress addresses[ANODE_SYSTEM_TRANSPORT_DNS_MAX_RESULTS];
  58. unsigned int address_count;
  59. int error_code;
  60. };
  61. #ifdef ANODE_USE_SELECT
  62. typedef int AnodeSystemTransport__poll_fd; /* for select() */
  63. #else
  64. typedef struct pollfd AnodeSystemTransport__poll_fd; /* for poll() */
  65. #endif
  66. struct AnodeSystemTransport
  67. {
  68. AnodeTransport interface; /* must be first */
  69. AnodeTransport *base;
  70. #ifdef ANODE_USE_SELECT
  71. FD_SET readfds;
  72. FD_SET writefds;
  73. #endif
  74. void (*default_event_handler)(const AnodeEvent *event);
  75. AnodeSystemTransport__poll_fd *fds;
  76. struct AnodeSystemTransport_AnodeSocket *sockets;
  77. unsigned int fd_count;
  78. unsigned int fd_capacity;
  79. struct AnodeSystemTransport__dns_request *pending_dns_requests;
  80. int invoke_pipe[2];
  81. AnodeMutex invoke_pipe_m;
  82. void *invoke_pipe_buf[2];
  83. unsigned int invoke_pipe_buf_ptr;
  84. };
  85. /* ======================================================================== */
  86. /* Internal helper methods */
  87. static unsigned int AnodeSystemTransport__add_entry(struct AnodeSystemTransport *transport)
  88. {
  89. if ((transport->fd_count + 1) > transport->fd_capacity) {
  90. transport->fd_capacity += 8;
  91. transport->fds = realloc(transport->fds,sizeof(AnodeSystemTransport__poll_fd) * transport->fd_capacity);
  92. transport->sockets = realloc(transport->sockets,sizeof(struct AnodeSystemTransport_AnodeSocket) * transport->fd_capacity);
  93. }
  94. return transport->fd_count++;
  95. }
  96. static void AnodeSystemTransport__remove_entry(struct AnodeSystemTransport *transport,const unsigned int idx)
  97. {
  98. unsigned int i;
  99. --transport->fd_count;
  100. for(i=idx;i<transport->fd_count;++i) {
  101. Anode_memcpy(&transport->fds[i],&transport->fds[i+1],sizeof(AnodeSystemTransport__poll_fd));
  102. Anode_memcpy(&transport->sockets[i],&transport->sockets[i+1],sizeof(struct AnodeSystemTransport_AnodeSocket));
  103. }
  104. if ((transport->fd_capacity - transport->fd_count) > 16) {
  105. transport->fd_capacity -= 16;
  106. transport->fds = realloc(transport->fds,sizeof(AnodeSystemTransport__poll_fd) * transport->fd_capacity);
  107. transport->sockets = realloc(transport->sockets,sizeof(struct AnodeSystemTransport_AnodeSocket) * transport->fd_capacity);
  108. }
  109. }
  110. static void AnodeSystemTransport__dns_invoke_on_completion(void *_dreq)
  111. {
  112. struct AnodeSystemTransport__dns_request *dreq = (struct AnodeSystemTransport__dns_request *)_dreq;
  113. struct AnodeSystemTransport__dns_request *ptr,**lastnext;
  114. AnodeThread_join(dreq->thread);
  115. ptr = dreq->owner->pending_dns_requests;
  116. lastnext = &dreq->owner->pending_dns_requests;
  117. while (ptr) {
  118. if (ptr == dreq) {
  119. *lastnext = ptr->next;
  120. break;
  121. } else {
  122. lastnext = &ptr->next;
  123. ptr = ptr->next;
  124. }
  125. }
  126. free(dreq);
  127. }
  128. static void AnodeSystemTransport__dns_thread_main(void *_dreq)
  129. {
  130. struct AnodeSystemTransport__dns_request *dreq = (struct AnodeSystemTransport__dns_request *)_dreq;
  131. dreq->owner->interface.invoke((AnodeTransport *)dreq->owner,dreq,&AnodeSystemTransport__dns_invoke_on_completion);
  132. }
  133. static void AnodeSystemTransport__do_close(struct AnodeSystemTransport *transport,struct AnodeSystemTransport_AnodeSocket *sock,const int error_code,const int generate_event)
  134. {
  135. AnodeEvent evbuf;
  136. int fd;
  137. if (sock->base.class_name == AnodeSystemTransport_CLASS) {
  138. #ifdef ANODE_USE_SELECT
  139. fd = (int)(transport->fds[((struct AnodeSystemTransport_AnodeSocket *)sock)->entry_idx]);
  140. #else
  141. fd = transport->fds[((struct AnodeSystemTransport_AnodeSocket *)sock)->entry_idx].fd;
  142. #endif
  143. if ((sock->base.type == ANODE_SOCKET_STREAM_CONNECTION)&&(sock->base.state != ANODE_SOCKET_CLOSED)) {
  144. sock->base.state = ANODE_SOCKET_CLOSED;
  145. if (generate_event) {
  146. evbuf.type = ANODE_TRANSPORT_EVENT_STREAM_CLOSED;
  147. evbuf.transport = (AnodeTransport *)transport;
  148. evbuf.sock = (AnodeSocket *)sock;
  149. evbuf.datagram_from = NULL;
  150. evbuf.dns_name = NULL;
  151. evbuf.dns_addresses = NULL;
  152. evbuf.dns_address_count = 0;
  153. evbuf.error_code = error_code;
  154. evbuf.data_length = 0;
  155. evbuf.data = NULL;
  156. if (sock->base.event_handler)
  157. sock->base.event_handler(&evbuf);
  158. else if (transport->default_event_handler)
  159. transport->default_event_handler(&evbuf);
  160. }
  161. }
  162. AnodeSystemTransport__close_socket(fd);
  163. AnodeSystemTransport__remove_entry(transport,((struct AnodeSystemTransport_AnodeSocket *)sock)->entry_idx);
  164. #ifdef ANODE_USE_SELECT
  165. FD_CLR(sock,&THIS->readfds);
  166. FD_CLR(sock,&THIS->writefds);
  167. #endif
  168. } else transport->base->close(transport->base,(AnodeSocket *)sock);
  169. }
  170. static int AnodeSystemTransport__populate_network_endpoint(const struct sockaddr_storage *saddr,AnodeNetworkEndpoint *ep)
  171. {
  172. switch(saddr->ss_family) {
  173. case AF_INET:
  174. ep->address.type = ANODE_NETWORK_ADDRESS_IPV4;
  175. *((uint32_t *)ep->address.bits) = ((struct sockaddr_in *)saddr)->sin_addr.s_addr;
  176. ep->port = ntohs(((struct sockaddr_in *)saddr)->sin_port);
  177. return 1;
  178. case AF_INET6:
  179. ep->address.type = ANODE_NETWORK_ADDRESS_IPV6;
  180. Anode_memcpy(ep->address.bits,((struct sockaddr_in6 *)saddr)->sin6_addr.s6_addr,16);
  181. ep->port = ntohs(((struct sockaddr_in6 *)saddr)->sin6_port);
  182. return 1;
  183. }
  184. return 0;
  185. }
  186. /* ======================================================================== */
  187. #ifdef THIS
  188. #undef THIS
  189. #endif
  190. #define THIS ((struct AnodeSystemTransport *)transport)
  191. static void AnodeSystemTransport_invoke(AnodeTransport *transport,
  192. void *ptr,
  193. void (*func)(void *))
  194. {
  195. void *invoke_msg[2];
  196. invoke_msg[0] = ptr;
  197. invoke_msg[1] = (void *)func;
  198. AnodeMutex_lock(&THIS->invoke_pipe_m);
  199. write(THIS->invoke_pipe[1],(void *)(&invoke_msg),sizeof(invoke_msg));
  200. AnodeMutex_unlock(&THIS->invoke_pipe_m);
  201. }
  202. static void AnodeSystemTransport_dns_resolve(AnodeTransport *transport,
  203. const char *name,
  204. void (*event_handler)(const AnodeEvent *),
  205. enum AnodeTransportDnsIncludeMode ipv4_include_mode,
  206. enum AnodeTransportDnsIncludeMode ipv6_include_mode,
  207. enum AnodeTransportDnsIncludeMode anode_include_mode)
  208. {
  209. struct AnodeSystemTransport__dns_request *dreq = malloc(sizeof(struct AnodeSystemTransport__dns_request));
  210. dreq->owner = THIS;
  211. dreq->event_handler = event_handler;
  212. Anode_str_copy(dreq->name,name,sizeof(dreq->name));
  213. dreq->ipv4_include_mode = ipv4_include_mode;
  214. dreq->ipv6_include_mode = ipv6_include_mode;
  215. dreq->anode_include_mode = anode_include_mode;
  216. dreq->address_count = 0;
  217. dreq->error_code = 0;
  218. dreq->next = THIS->pending_dns_requests;
  219. THIS->pending_dns_requests = dreq;
  220. dreq->thread = AnodeThread_create(&AnodeSystemTransport__dns_thread_main,dreq,0);
  221. }
  222. static AnodeSocket *AnodeSystemTransport_datagram_listen(AnodeTransport *transport,
  223. const AnodeNetworkAddress *local_address,
  224. int local_port,
  225. int *error_code)
  226. {
  227. struct sockaddr_in sin4;
  228. struct sockaddr_in6 sin6;
  229. struct AnodeSystemTransport_AnodeSocket *sock;
  230. unsigned int entry_idx;
  231. int fd;
  232. int tmp;
  233. switch(local_address->type) {
  234. case ANODE_NETWORK_ADDRESS_IPV4:
  235. fd = socket(AF_INET,SOCK_DGRAM,0);
  236. if (fd <= 0) {
  237. *error_code = ANODE_ERR_UNABLE_TO_BIND;
  238. return (AnodeSocket *)0;
  239. }
  240. tmp = 1;
  241. setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,&tmp,sizeof(tmp));
  242. fcntl(fd,F_SETFL,O_NONBLOCK);
  243. Anode_zero(&sin4,sizeof(struct sockaddr_in));
  244. sin4.sin_family = AF_INET;
  245. sin4.sin_port = htons(local_port);
  246. sin4.sin_addr.s_addr = *((uint32_t *)local_address->bits);
  247. if (bind(fd,(const struct sockaddr *)&sin4,sizeof(sin4))) {
  248. AnodeSystemTransport__close_socket(fd);
  249. *error_code = ANODE_ERR_UNABLE_TO_BIND;
  250. return (AnodeSocket *)0;
  251. }
  252. break;
  253. case ANODE_NETWORK_ADDRESS_IPV6:
  254. fd = socket(AF_INET6,SOCK_DGRAM,0);
  255. if (fd <= 0) {
  256. *error_code = ANODE_ERR_UNABLE_TO_BIND;
  257. return (AnodeSocket *)0;
  258. }
  259. tmp = 1; setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,&tmp,sizeof(tmp));
  260. fcntl(fd,F_SETFL,O_NONBLOCK);
  261. #ifdef IPV6_V6ONLY
  262. tmp = 1; setsockopt(fd,IPPROTO_IPV6,IPV6_V6ONLY,&tmp,sizeof(tmp));
  263. #endif
  264. Anode_zero(&sin6,sizeof(struct sockaddr_in6));
  265. sin6.sin6_family = AF_INET6;
  266. sin6.sin6_port = htons(local_port);
  267. Anode_memcpy(sin6.sin6_addr.s6_addr,local_address->bits,16);
  268. if (bind(fd,(const struct sockaddr *)&sin6,sizeof(sin6))) {
  269. AnodeSystemTransport__close_socket(fd);
  270. *error_code = ANODE_ERR_UNABLE_TO_BIND;
  271. return (AnodeSocket *)0;
  272. }
  273. break;
  274. default:
  275. if (THIS->base)
  276. return THIS->base->datagram_listen(THIS->base,local_address,local_port,error_code);
  277. else {
  278. *error_code = ANODE_ERR_ADDRESS_TYPE_NOT_SUPPORTED;
  279. return (AnodeSocket *)0;
  280. }
  281. }
  282. entry_idx = AnodeSystemTransport__add_entry(THIS);
  283. sock = &(THIS->sockets[entry_idx]);
  284. sock->base.type = ANODE_SOCKET_DATAGRAM;
  285. sock->base.state = ANODE_SOCKET_OPEN;
  286. Anode_memcpy(&sock->base.endpoint.address,local_address,sizeof(AnodeNetworkAddress));
  287. sock->base.endpoint.port = local_port;
  288. sock->base.class_name = AnodeSystemTransport_CLASS;
  289. sock->base.user_ptr[0] = NULL;
  290. sock->base.user_ptr[1] = NULL;
  291. sock->base.event_handler = NULL;
  292. sock->entry_idx = entry_idx;
  293. THIS->fds[entry_idx].fd = fd;
  294. THIS->fds[entry_idx].events = POLLIN;
  295. THIS->fds[entry_idx].revents = 0;
  296. *error_code = 0;
  297. return (AnodeSocket *)sock;
  298. }
  299. static AnodeSocket *AnodeSystemTransport_stream_listen(AnodeTransport *transport,
  300. const AnodeNetworkAddress *local_address,
  301. int local_port,
  302. int *error_code)
  303. {
  304. struct sockaddr_in sin4;
  305. struct sockaddr_in6 sin6;
  306. struct AnodeSystemTransport_AnodeSocket *sock;
  307. unsigned int entry_idx;
  308. int fd;
  309. int tmp;
  310. switch(local_address->type) {
  311. case ANODE_NETWORK_ADDRESS_IPV4:
  312. fd = socket(AF_INET,SOCK_STREAM,0);
  313. if (fd < 0) {
  314. *error_code = ANODE_ERR_UNABLE_TO_BIND;
  315. return (AnodeSocket *)0;
  316. }
  317. fcntl(fd,F_SETFL,O_NONBLOCK);
  318. Anode_zero(&sin4,sizeof(struct sockaddr_in));
  319. sin4.sin_family = AF_INET;
  320. sin4.sin_port = htons(local_port);
  321. sin4.sin_addr.s_addr = *((uint32_t *)local_address->bits);
  322. if (bind(fd,(const struct sockaddr *)&sin4,sizeof(sin4))) {
  323. AnodeSystemTransport__close_socket(fd);
  324. *error_code = ANODE_ERR_UNABLE_TO_BIND;
  325. return (AnodeSocket *)0;
  326. }
  327. if (listen(fd,8)) {
  328. AnodeSystemTransport__close_socket(fd);
  329. *error_code = ANODE_ERR_UNABLE_TO_BIND;
  330. return (AnodeSocket *)0;
  331. }
  332. break;
  333. case ANODE_NETWORK_ADDRESS_IPV6:
  334. fd = socket(AF_INET6,SOCK_STREAM,0);
  335. if (fd < 0) {
  336. *error_code = ANODE_ERR_UNABLE_TO_BIND;
  337. return (AnodeSocket *)0;
  338. }
  339. fcntl(fd,F_SETFL,O_NONBLOCK);
  340. #ifdef IPV6_V6ONLY
  341. tmp = 1; setsockopt(fd,IPPROTO_IPV6,IPV6_V6ONLY,&tmp,sizeof(tmp));
  342. #endif
  343. Anode_zero(&sin6,sizeof(struct sockaddr_in6));
  344. sin6.sin6_family = AF_INET6;
  345. sin6.sin6_port = htons(local_port);
  346. Anode_memcpy(sin6.sin6_addr.s6_addr,local_address->bits,16);
  347. if (bind(fd,(const struct sockaddr *)&sin6,sizeof(sin6))) {
  348. AnodeSystemTransport__close_socket(fd);
  349. *error_code = ANODE_ERR_UNABLE_TO_BIND;
  350. return (AnodeSocket *)0;
  351. }
  352. if (listen(fd,8)) {
  353. AnodeSystemTransport__close_socket(fd);
  354. *error_code = ANODE_ERR_UNABLE_TO_BIND;
  355. return (AnodeSocket *)0;
  356. }
  357. break;
  358. default:
  359. if (THIS->base)
  360. return THIS->base->stream_listen(THIS->base,local_address,local_port,error_code);
  361. else {
  362. *error_code = ANODE_ERR_ADDRESS_TYPE_NOT_SUPPORTED;
  363. return (AnodeSocket *)0;
  364. }
  365. }
  366. entry_idx = AnodeSystemTransport__add_entry(THIS);
  367. sock = &(THIS->sockets[entry_idx]);
  368. sock->base.type = ANODE_SOCKET_STREAM_LISTEN;
  369. sock->base.state = ANODE_SOCKET_OPEN;
  370. Anode_memcpy(&sock->base.endpoint.address,local_address,sizeof(AnodeNetworkAddress));
  371. sock->base.endpoint.port = local_port;
  372. sock->base.class_name = AnodeSystemTransport_CLASS;
  373. sock->base.user_ptr[0] = NULL;
  374. sock->base.user_ptr[1] = NULL;
  375. sock->base.event_handler = NULL;
  376. sock->entry_idx = entry_idx;
  377. THIS->fds[entry_idx].fd = fd;
  378. THIS->fds[entry_idx].events = POLLIN;
  379. THIS->fds[entry_idx].revents = 0;
  380. *error_code = 0;
  381. return (AnodeSocket *)sock;
  382. }
  383. static int AnodeSystemTransport_datagram_send(AnodeTransport *transport,
  384. AnodeSocket *sock,
  385. const void *data,
  386. int data_len,
  387. const AnodeNetworkEndpoint *to_endpoint)
  388. {
  389. struct sockaddr_in sin4;
  390. struct sockaddr_in6 sin6;
  391. #ifdef ANODE_USE_SELECT
  392. const int fd = (int)(THIS->fds[((struct AnodeSystemTransport_AnodeSocket *)sock)->entry_idx]);
  393. #else
  394. const int fd = THIS->fds[((struct AnodeSystemTransport_AnodeSocket *)sock)->entry_idx].fd;
  395. #endif
  396. switch(to_endpoint->address.type) {
  397. case ANODE_NETWORK_ADDRESS_IPV4:
  398. Anode_zero(&sin4,sizeof(struct sockaddr_in));
  399. sin4.sin_family = AF_INET;
  400. sin4.sin_port = htons((uint16_t)to_endpoint->port);
  401. sin4.sin_addr.s_addr = *((uint32_t *)to_endpoint->address.bits);
  402. sendto(fd,data,data_len,0,(struct sockaddr *)&sin4,sizeof(sin4));
  403. return 0;
  404. case ANODE_NETWORK_ADDRESS_IPV6:
  405. Anode_zero(&sin6,sizeof(struct sockaddr_in6));
  406. sin6.sin6_family = AF_INET6;
  407. sin6.sin6_port = htons((uint16_t)to_endpoint->port);
  408. Anode_memcpy(sin6.sin6_addr.s6_addr,to_endpoint->address.bits,16);
  409. sendto(fd,data,data_len,0,(struct sockaddr *)&sin6,sizeof(sin6));
  410. return 0;
  411. default:
  412. if (THIS->base)
  413. return THIS->base->datagram_send(THIS->base,sock,data,data_len,to_endpoint);
  414. else return ANODE_ERR_ADDRESS_TYPE_NOT_SUPPORTED;
  415. }
  416. }
  417. static AnodeSocket *AnodeSystemTransport_stream_connect(AnodeTransport *transport,
  418. const AnodeNetworkEndpoint *to_endpoint,
  419. int *error_code)
  420. {
  421. struct sockaddr_in sin4;
  422. struct sockaddr_in6 sin6;
  423. struct AnodeSystemTransport_AnodeSocket *sock;
  424. unsigned int entry_idx;
  425. int fd;
  426. switch(to_endpoint->address.type) {
  427. case ANODE_NETWORK_ADDRESS_IPV4:
  428. Anode_zero(&sin4,sizeof(struct sockaddr_in));
  429. sin4.sin_family = AF_INET;
  430. sin4.sin_port = htons(to_endpoint->port);
  431. sin4.sin_addr.s_addr = *((uint32_t *)to_endpoint->address.bits);
  432. fd = socket(AF_INET,SOCK_STREAM,0);
  433. if (fd < 0) {
  434. *error_code = ANODE_ERR_ADDRESS_TYPE_NOT_SUPPORTED;
  435. return (AnodeSocket *)0;
  436. }
  437. fcntl(fd,F_SETFL,O_NONBLOCK);
  438. if (connect(fd,(struct sockaddr *)&sin4,sizeof(sin4))) {
  439. if (errno != EINPROGRESS) {
  440. *error_code = ANODE_ERR_CONNECT_FAILED;
  441. AnodeSystemTransport__close_socket(fd);
  442. return (AnodeSocket *)0;
  443. }
  444. }
  445. break;
  446. case ANODE_NETWORK_ADDRESS_IPV6:
  447. Anode_zero(&sin6,sizeof(struct sockaddr_in6));
  448. sin6.sin6_family = AF_INET6;
  449. sin6.sin6_port = htons(to_endpoint->port);
  450. Anode_memcpy(sin6.sin6_addr.s6_addr,to_endpoint->address.bits,16);
  451. fd = socket(AF_INET6,SOCK_STREAM,0);
  452. if (fd < 0) {
  453. *error_code = ANODE_ERR_ADDRESS_TYPE_NOT_SUPPORTED;
  454. return (AnodeSocket *)0;
  455. }
  456. fcntl(fd,F_SETFL,O_NONBLOCK);
  457. if (connect(fd,(struct sockaddr *)&sin6,sizeof(sin6))) {
  458. if (errno == EINPROGRESS) {
  459. *error_code = ANODE_ERR_CONNECT_FAILED;
  460. AnodeSystemTransport__close_socket(fd);
  461. return (AnodeSocket *)0;
  462. }
  463. }
  464. break;
  465. default:
  466. if (THIS->base)
  467. return THIS->base->stream_connect(THIS->base,to_endpoint,error_code);
  468. else {
  469. *error_code = ANODE_ERR_ADDRESS_TYPE_NOT_SUPPORTED;
  470. return (AnodeSocket *)0;
  471. }
  472. }
  473. entry_idx = AnodeSystemTransport__add_entry(THIS);
  474. sock = &(THIS->sockets[entry_idx]);
  475. sock->base.type = ANODE_SOCKET_STREAM_CONNECTION;
  476. sock->base.state = ANODE_SOCKET_CONNECTING;
  477. Anode_memcpy(&sock->base.endpoint,to_endpoint,sizeof(AnodeNetworkEndpoint));
  478. sock->base.class_name = AnodeSystemTransport_CLASS;
  479. sock->base.user_ptr[0] = NULL;
  480. sock->base.user_ptr[1] = NULL;
  481. sock->base.event_handler = NULL;
  482. sock->entry_idx = entry_idx;
  483. THIS->fds[entry_idx].fd = fd;
  484. THIS->fds[entry_idx].events = POLLIN|POLLOUT;
  485. THIS->fds[entry_idx].revents = 0;
  486. return (AnodeSocket *)sock;
  487. }
  488. static void AnodeSystemTransport_stream_start_writing(AnodeTransport *transport,
  489. AnodeSocket *sock)
  490. {
  491. if ((sock->type == ANODE_SOCKET_STREAM_CONNECTION)&&(((struct AnodeSystemTransport_AnodeSocket *)sock)->base.state == ANODE_SOCKET_OPEN)) {
  492. if (sock->class_name == AnodeSystemTransport_CLASS) {
  493. #ifdef ANODE_USE_SELECT
  494. FD_SET((int)(THIS->fds[((struct AnodeSystemTransport_AnodeSocket *)sock)->entry_idx]),&THIS->writefds);
  495. #else
  496. THIS->fds[((struct AnodeSystemTransport_AnodeSocket *)sock)->entry_idx].events = (POLLIN|POLLOUT);
  497. #endif
  498. } else THIS->base->stream_start_writing(THIS->base,sock);
  499. }
  500. }
  501. static void AnodeSystemTransport_stream_stop_writing(AnodeTransport *transport,
  502. AnodeSocket *sock)
  503. {
  504. if ((sock->type == ANODE_SOCKET_STREAM_CONNECTION)&&(((struct AnodeSystemTransport_AnodeSocket *)sock)->base.state == ANODE_SOCKET_OPEN)) {
  505. if (sock->class_name == AnodeSystemTransport_CLASS) {
  506. #ifdef ANODE_USE_SELECT
  507. FD_CLR((int)(THIS->fds[((struct AnodeSystemTransport_AnodeSocket *)sock)->entry_idx]),&THIS->writefds);
  508. #else
  509. THIS->fds[((struct AnodeSystemTransport_AnodeSocket *)sock)->entry_idx].events = POLLIN;
  510. #endif
  511. } else THIS->base->stream_stop_writing(THIS->base,sock);
  512. }
  513. }
  514. static int AnodeSystemTransport_stream_send(AnodeTransport *transport,
  515. AnodeSocket *sock,
  516. const void *data,
  517. int data_len)
  518. {
  519. int result;
  520. if (sock->type == ANODE_SOCKET_STREAM_CONNECTION) {
  521. if (sock->class_name == AnodeSystemTransport_CLASS) {
  522. if (((struct AnodeSystemTransport_AnodeSocket *)sock)->base.state != ANODE_SOCKET_OPEN)
  523. return ANODE_ERR_CONNECTION_CLOSED;
  524. #ifdef ANODE_USE_SELECT
  525. result = send((int)(THIS->fds[((struct AnodeSystemTransport_AnodeSocket *)sock)->entry_idx]),data,data_len,0);
  526. #else
  527. result = send(THIS->fds[((struct AnodeSystemTransport_AnodeSocket *)sock)->entry_idx].fd,data,data_len,0);
  528. #endif
  529. if (result >= 0)
  530. return result;
  531. else {
  532. AnodeSystemTransport__do_close(THIS,(struct AnodeSystemTransport_AnodeSocket *)sock,ANODE_ERR_CONNECTION_CLOSED_BY_REMOTE,1);
  533. return ANODE_ERR_CONNECTION_CLOSED;
  534. }
  535. } else return THIS->base->stream_send(THIS->base,sock,data,data_len);
  536. } else return ANODE_ERR_INVALID_ARGUMENT;
  537. }
  538. static void AnodeSystemTransport_close(AnodeTransport *transport,
  539. AnodeSocket *sock)
  540. {
  541. AnodeSystemTransport__do_close(THIS,(struct AnodeSystemTransport_AnodeSocket *)sock,0,1);
  542. }
  543. static void AnodeSystemTransport__poll_do_read_datagram(struct AnodeSystemTransport *transport,int fd,struct AnodeSystemTransport_AnodeSocket *sock)
  544. {
  545. char buf[16384];
  546. struct sockaddr_storage fromaddr;
  547. AnodeNetworkEndpoint tmp_ep;
  548. AnodeEvent evbuf;
  549. socklen_t addrlen;
  550. int n;
  551. addrlen = sizeof(struct sockaddr_storage);
  552. n = recvfrom(fd,buf,sizeof(buf),0,(struct sockaddr *)&fromaddr,&addrlen);
  553. if ((n >= 0)&&(AnodeSystemTransport__populate_network_endpoint(&fromaddr,&tmp_ep))) {
  554. evbuf.type = ANODE_TRANSPORT_EVENT_DATAGRAM_RECEIVED;
  555. evbuf.transport = (AnodeTransport *)transport;
  556. evbuf.sock = (AnodeSocket *)sock;
  557. evbuf.datagram_from = &tmp_ep;
  558. evbuf.dns_name = NULL;
  559. evbuf.dns_addresses = NULL;
  560. evbuf.dns_address_count = 0;
  561. evbuf.error_code = 0;
  562. evbuf.data_length = n;
  563. evbuf.data = buf;
  564. if (sock->base.event_handler)
  565. sock->base.event_handler(&evbuf);
  566. else if (transport->default_event_handler)
  567. transport->default_event_handler(&evbuf);
  568. }
  569. }
  570. static void AnodeSystemTransport__poll_do_accept_incoming_connection(struct AnodeSystemTransport *transport,int fd,struct AnodeSystemTransport_AnodeSocket *sock)
  571. {
  572. struct sockaddr_storage fromaddr;
  573. AnodeNetworkEndpoint tmp_ep;
  574. AnodeEvent evbuf;
  575. struct AnodeSystemTransport_AnodeSocket *newsock;
  576. socklen_t addrlen;
  577. int n;
  578. unsigned int entry_idx;
  579. addrlen = sizeof(struct sockaddr_storage);
  580. n = accept(fd,(struct sockaddr *)&fromaddr,&addrlen);
  581. if ((n >= 0)&&(AnodeSystemTransport__populate_network_endpoint(&fromaddr,&tmp_ep))) {
  582. entry_idx = AnodeSystemTransport__add_entry(transport);
  583. newsock = &(transport->sockets[entry_idx]);
  584. newsock->base.type = ANODE_SOCKET_STREAM_CONNECTION;
  585. newsock->base.state = ANODE_SOCKET_OPEN;
  586. Anode_memcpy(&newsock->base.endpoint,&tmp_ep,sizeof(AnodeNetworkEndpoint));
  587. newsock->base.class_name = AnodeSystemTransport_CLASS;
  588. newsock->base.user_ptr[0] = NULL;
  589. newsock->base.user_ptr[1] = NULL;
  590. newsock->base.event_handler = NULL;
  591. newsock->entry_idx = entry_idx;
  592. THIS->fds[entry_idx].fd = n;
  593. THIS->fds[entry_idx].events = POLLIN;
  594. THIS->fds[entry_idx].revents = 0;
  595. evbuf.type = ANODE_TRANSPORT_EVENT_STREAM_INCOMING_CONNECT;
  596. evbuf.transport = (AnodeTransport *)transport;
  597. evbuf.sock = (AnodeSocket *)newsock;
  598. evbuf.datagram_from = NULL;
  599. evbuf.dns_name = NULL;
  600. evbuf.dns_addresses = NULL;
  601. evbuf.dns_address_count = 0;
  602. evbuf.error_code = 0;
  603. evbuf.data_length = 0;
  604. evbuf.data = NULL;
  605. if (sock->base.event_handler)
  606. sock->base.event_handler(&evbuf);
  607. else if (transport->default_event_handler)
  608. transport->default_event_handler(&evbuf);
  609. }
  610. }
  611. static void AnodeSystemTransport__poll_do_read_stream(struct AnodeSystemTransport *transport,int fd,struct AnodeSystemTransport_AnodeSocket *sock)
  612. {
  613. char buf[65536];
  614. AnodeEvent evbuf;
  615. int n;
  616. n = recv(fd,buf,sizeof(buf),0);
  617. if (n > 0) {
  618. evbuf.type = ANODE_TRANSPORT_EVENT_STREAM_DATA_RECEIVED;
  619. evbuf.transport = (AnodeTransport *)transport;
  620. evbuf.sock = (AnodeSocket *)sock;
  621. evbuf.datagram_from = NULL;
  622. evbuf.dns_name = NULL;
  623. evbuf.dns_addresses = NULL;
  624. evbuf.dns_address_count = 0;
  625. evbuf.error_code = 0;
  626. evbuf.data_length = n;
  627. evbuf.data = buf;
  628. if (sock->base.event_handler)
  629. sock->base.event_handler(&evbuf);
  630. else if (transport->default_event_handler)
  631. transport->default_event_handler(&evbuf);
  632. } else AnodeSystemTransport__do_close(transport,sock,ANODE_ERR_CONNECTION_CLOSED_BY_REMOTE,1);
  633. }
  634. static void AnodeSystemTransport__poll_do_stream_available_for_write(struct AnodeSystemTransport *transport,int fd,struct AnodeSystemTransport_AnodeSocket *sock)
  635. {
  636. AnodeEvent evbuf;
  637. evbuf.type = ANODE_TRANSPORT_EVENT_STREAM_DATA_RECEIVED;
  638. evbuf.transport = (AnodeTransport *)transport;
  639. evbuf.sock = (AnodeSocket *)sock;
  640. evbuf.datagram_from = NULL;
  641. evbuf.dns_name = NULL;
  642. evbuf.dns_addresses = NULL;
  643. evbuf.dns_address_count = 0;
  644. evbuf.error_code = 0;
  645. evbuf.data_length = 0;
  646. evbuf.data = NULL;
  647. if (sock->base.event_handler)
  648. sock->base.event_handler(&evbuf);
  649. else if (transport->default_event_handler)
  650. transport->default_event_handler(&evbuf);
  651. }
  652. static void AnodeSystemTransport__poll_do_outgoing_connect(struct AnodeSystemTransport *transport,int fd,struct AnodeSystemTransport_AnodeSocket *sock)
  653. {
  654. AnodeEvent evbuf;
  655. int err_code;
  656. socklen_t optlen;
  657. optlen = sizeof(err_code);
  658. if (getsockopt(fd,SOL_SOCKET,SO_ERROR,(void *)&err_code,&optlen)) {
  659. /* Error getting result, so we assume a failure */
  660. evbuf.type = ANODE_TRANSPORT_EVENT_STREAM_OUTGOING_CONNECT_FAILED;
  661. evbuf.transport = (AnodeTransport *)transport;
  662. evbuf.sock = (AnodeSocket *)sock;
  663. evbuf.datagram_from = NULL;
  664. evbuf.dns_name = NULL;
  665. evbuf.dns_addresses = NULL;
  666. evbuf.dns_address_count = 0;
  667. evbuf.error_code = ANODE_ERR_CONNECT_FAILED;
  668. evbuf.data_length = 0;
  669. evbuf.data = NULL;
  670. AnodeSystemTransport__do_close(transport,sock,0,0);
  671. } else if (err_code) {
  672. /* Error code is nonzero, so connect failed */
  673. evbuf.type = ANODE_TRANSPORT_EVENT_STREAM_OUTGOING_CONNECT_FAILED;
  674. evbuf.transport = (AnodeTransport *)transport;
  675. evbuf.sock = (AnodeSocket *)sock;
  676. evbuf.datagram_from = NULL;
  677. evbuf.dns_name = NULL;
  678. evbuf.dns_addresses = NULL;
  679. evbuf.dns_address_count = 0;
  680. evbuf.error_code = ANODE_ERR_CONNECT_FAILED;
  681. evbuf.data_length = 0;
  682. evbuf.data = NULL;
  683. AnodeSystemTransport__do_close(transport,sock,0,0);
  684. } else {
  685. /* Connect succeeded */
  686. evbuf.type = ANODE_TRANSPORT_EVENT_STREAM_OUTGOING_CONNECT_ESTABLISHED;
  687. evbuf.transport = (AnodeTransport *)transport;
  688. evbuf.sock = (AnodeSocket *)sock;
  689. evbuf.datagram_from = NULL;
  690. evbuf.dns_name = NULL;
  691. evbuf.dns_addresses = NULL;
  692. evbuf.dns_address_count = 0;
  693. evbuf.error_code = 0;
  694. evbuf.data_length = 0;
  695. evbuf.data = NULL;
  696. }
  697. if (sock->base.event_handler)
  698. sock->base.event_handler(&evbuf);
  699. else if (transport->default_event_handler)
  700. transport->default_event_handler(&evbuf);
  701. }
  702. static int AnodeSystemTransport_poll(AnodeTransport *transport)
  703. {
  704. int timeout = -1;
  705. unsigned int fd_idx;
  706. int event_count = 0;
  707. int n;
  708. if (poll((struct pollfd *)THIS->fds,THIS->fd_count,timeout) > 0) {
  709. for(fd_idx=0;fd_idx<THIS->fd_count;++fd_idx) {
  710. if ((THIS->fds[fd_idx].revents & (POLLERR|POLLHUP|POLLNVAL))) {
  711. if (THIS->sockets[fd_idx].base.type == ANODE_SOCKET_STREAM_CONNECTION) {
  712. if (THIS->sockets[fd_idx].base.state == ANODE_SOCKET_CONNECTING)
  713. AnodeSystemTransport__poll_do_outgoing_connect(THIS,THIS->fds[fd_idx].fd,&THIS->sockets[fd_idx]);
  714. else AnodeSystemTransport__do_close(THIS,&THIS->sockets[fd_idx],ANODE_ERR_CONNECTION_CLOSED_BY_REMOTE,1);
  715. ++event_count;
  716. }
  717. } else {
  718. if ((THIS->fds[fd_idx].revents & POLLIN)) {
  719. if (THIS->fds[fd_idx].fd == THIS->invoke_pipe[0]) {
  720. n = read(THIS->invoke_pipe[0],&(((unsigned char *)(&(THIS->invoke_pipe_buf)))[THIS->invoke_pipe_buf_ptr]),sizeof(THIS->invoke_pipe_buf) - THIS->invoke_pipe_buf_ptr);
  721. if (n > 0) {
  722. THIS->invoke_pipe_buf_ptr += (unsigned int)n;
  723. if (THIS->invoke_pipe_buf_ptr >= sizeof(THIS->invoke_pipe_buf)) {
  724. THIS->invoke_pipe_buf_ptr -= sizeof(THIS->invoke_pipe_buf);
  725. ((void (*)(void *))(THIS->invoke_pipe_buf[1]))(THIS->invoke_pipe_buf[0]);
  726. }
  727. }
  728. } else {
  729. switch(THIS->sockets[fd_idx].base.type) {
  730. case ANODE_SOCKET_DATAGRAM:
  731. AnodeSystemTransport__poll_do_read_datagram(THIS,THIS->fds[fd_idx].fd,&THIS->sockets[fd_idx]);
  732. break;
  733. case ANODE_SOCKET_STREAM_LISTEN:
  734. AnodeSystemTransport__poll_do_accept_incoming_connection(THIS,THIS->fds[fd_idx].fd,&THIS->sockets[fd_idx]);
  735. break;
  736. case ANODE_SOCKET_STREAM_CONNECTION:
  737. if (THIS->sockets[fd_idx].base.state == ANODE_SOCKET_CONNECTING)
  738. AnodeSystemTransport__poll_do_outgoing_connect(THIS,THIS->fds[fd_idx].fd,&THIS->sockets[fd_idx]);
  739. else AnodeSystemTransport__poll_do_read_stream(THIS,THIS->fds[fd_idx].fd,&THIS->sockets[fd_idx]);
  740. break;
  741. }
  742. ++event_count;
  743. }
  744. }
  745. if ((THIS->fds[fd_idx].revents & POLLOUT)) {
  746. if (THIS->sockets[fd_idx].base.state == ANODE_SOCKET_CONNECTING)
  747. AnodeSystemTransport__poll_do_outgoing_connect(THIS,THIS->fds[fd_idx].fd,&THIS->sockets[fd_idx]);
  748. else AnodeSystemTransport__poll_do_stream_available_for_write(THIS,THIS->fds[fd_idx].fd,&THIS->sockets[fd_idx]);
  749. ++event_count;
  750. }
  751. }
  752. }
  753. }
  754. return event_count;
  755. }
  756. static int AnodeSystemTransport_supports_address_type(const AnodeTransport *transport,
  757. enum AnodeNetworkAddressType at)
  758. {
  759. switch(at) {
  760. case ANODE_NETWORK_ADDRESS_IPV4:
  761. return 1;
  762. case ANODE_NETWORK_ADDRESS_IPV6:
  763. return 1;
  764. default:
  765. if (THIS->base)
  766. return THIS->base->supports_address_type(THIS->base,at);
  767. return 0;
  768. }
  769. }
  770. static AnodeTransport *AnodeSystemTransport_base_instance(const AnodeTransport *transport)
  771. {
  772. return THIS->base;
  773. }
  774. static const char *AnodeSystemTransport_class_name(AnodeTransport *transport)
  775. {
  776. return AnodeSystemTransport_CLASS;
  777. }
  778. static void AnodeSystemTransport_delete(AnodeTransport *transport)
  779. {
  780. close(THIS->invoke_pipe[0]);
  781. close(THIS->invoke_pipe[1]);
  782. AnodeMutex_destroy(&THIS->invoke_pipe_m);
  783. if (THIS->fds) free(THIS->fds);
  784. if (THIS->sockets) free(THIS->sockets);
  785. if (THIS->base) THIS->base->delete(THIS->base);
  786. free(transport);
  787. }
  788. /* ======================================================================== */
  789. AnodeTransport *AnodeSystemTransport_new(AnodeTransport *base)
  790. {
  791. struct AnodeSystemTransport *t;
  792. unsigned int entry_idx;
  793. t = malloc(sizeof(struct AnodeSystemTransport));
  794. if (!t) return (AnodeTransport *)0;
  795. Anode_zero(t,sizeof(struct AnodeSystemTransport));
  796. t->interface.invoke = &AnodeSystemTransport_invoke;
  797. t->interface.dns_resolve = &AnodeSystemTransport_dns_resolve;
  798. t->interface.datagram_listen = &AnodeSystemTransport_datagram_listen;
  799. t->interface.stream_listen = &AnodeSystemTransport_stream_listen;
  800. t->interface.datagram_send = &AnodeSystemTransport_datagram_send;
  801. t->interface.stream_connect = &AnodeSystemTransport_stream_connect;
  802. t->interface.stream_start_writing = &AnodeSystemTransport_stream_start_writing;
  803. t->interface.stream_stop_writing = &AnodeSystemTransport_stream_stop_writing;
  804. t->interface.stream_send = &AnodeSystemTransport_stream_send;
  805. t->interface.close = &AnodeSystemTransport_close;
  806. t->interface.poll = &AnodeSystemTransport_poll;
  807. t->interface.supports_address_type = &AnodeSystemTransport_supports_address_type;
  808. t->interface.base_instance = &AnodeSystemTransport_base_instance;
  809. t->interface.class_name = &AnodeSystemTransport_class_name;
  810. t->interface.delete = &AnodeSystemTransport_delete;
  811. t->base = base;
  812. pipe(t->invoke_pipe);
  813. fcntl(t->invoke_pipe[0],F_SETFL,O_NONBLOCK);
  814. entry_idx = AnodeSystemTransport__add_entry(t);
  815. t->fds[entry_idx].fd = t->invoke_pipe[0];
  816. t->fds[entry_idx].events = POLLIN;
  817. t->fds[entry_idx].revents = 0;
  818. AnodeMutex_init(&t->invoke_pipe_m);
  819. return (AnodeTransport *)t;
  820. }