RPC.c 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. #include <stdio.h>
  2. #include <unistd.h>
  3. #include <sys/un.h>
  4. #include <pthread.h>
  5. #include <errno.h>
  6. #include <sys/syscall.h>
  7. #include <sys/socket.h>
  8. #include <strings.h>
  9. #include "RPC.h"
  10. #define RPC_FD 1023
  11. #define SERVICE_CONNECT_ATTEMPTS 30
  12. static int instance_count;
  13. static int rpc_count;
  14. static pthread_mutex_t lock;
  15. void rpc_mutex_init() {
  16. if(pthread_mutex_init(&lock, NULL) != 0) {
  17. fprintf(stderr, "error while initializing service call mutex\n");
  18. }
  19. }
  20. void rpc_mutex_destroy() {
  21. pthread_mutex_destroy(&lock);
  22. }
  23. /*
  24. * Reads a return value from the service and sets errno (if applicable)
  25. */
  26. int get_retval(int rpc_sock)
  27. {
  28. if(rpc_sock >= 0) {
  29. int retval;
  30. int sz = sizeof(char) + sizeof(retval) + sizeof(errno);
  31. char retbuf[BUF_SZ];
  32. memset(&retbuf, 0, sz);
  33. int n_read = read(rpc_sock, &retbuf, sz);
  34. if(n_read > 0) {
  35. memcpy(&retval, &retbuf[1], sizeof(retval));
  36. memcpy(&errno, &retbuf[1+sizeof(retval)], sizeof(errno));
  37. return retval;
  38. }
  39. }
  40. return -1;
  41. }
  42. /*
  43. * Reads a new file descriptor from the service
  44. */
  45. int get_new_fd(int sock)
  46. {
  47. char buf[BUF_SZ];
  48. int newfd;
  49. ssize_t size = sock_fd_read(sock, buf, sizeof(buf), &newfd);
  50. if(size > 0){
  51. return newfd;
  52. }
  53. fprintf(stderr, "get_new_fd(): Error, unable to read fd over (%d)\n", sock);
  54. return -1;
  55. }
  56. int rpc_join(const char * sockname)
  57. {
  58. struct sockaddr_un addr;
  59. int conn_err = -1, attempts = 0;
  60. memset(&addr, 0, sizeof(addr));
  61. addr.sun_family = AF_UNIX;
  62. strncpy(addr.sun_path, sockname, sizeof(addr.sun_path)-1);
  63. int sock;
  64. if((sock = socket(AF_UNIX, SOCK_STREAM, 0)) < 0){
  65. fprintf(stderr, "Error while creating RPC socket\n");
  66. return -1;
  67. }
  68. while((conn_err != 0) && (attempts < SERVICE_CONNECT_ATTEMPTS)){
  69. if((conn_err = connect(sock, (struct sockaddr*)&addr, sizeof(addr))) != 0) {
  70. fprintf(stderr, "Error while connecting to RPC socket. Re-attempting...\n");
  71. sleep(1);
  72. }
  73. else {
  74. int newfd = dup2(sock, RPC_FD-instance_count);
  75. close(sock);
  76. return newfd;
  77. }
  78. attempts++;
  79. }
  80. return -1;
  81. }
  82. /*
  83. * Send a command to the service
  84. */
  85. int rpc_send_command(int cmd, int rpc_sock, void *data, int len)
  86. {
  87. char cmdbuf[BUF_SZ];
  88. cmdbuf[0] = cmd;
  89. memcpy(&cmdbuf[1], data, len);
  90. pthread_mutex_lock(&lock);
  91. char metabuf[BUF_SZ]; // portion of buffer which contains RPC metadata for debugging
  92. #ifdef VERBOSE
  93. /*
  94. #define IDX_PID 0
  95. #define IDX_TID sizeof(pid_t)
  96. #define IDX_COUNT IDX_TID + sizeof(pid_t)
  97. #define IDX_TIME IDX_COUNT + sizeof(int)
  98. #define IDX_CMD IDX_TIME + 20 // 20 being the length of the timestamp string
  99. #define IDX_PAYLOAD IDX_TIME + sizeof(char)
  100. */
  101. /* [pid_t] [pid_t] [rpc_count] [int] [...] */
  102. memset(metabuf, 0, BUF_SZ);
  103. pid_t pid = syscall(SYS_getpid);
  104. pid_t tid = syscall(SYS_gettid);
  105. rpc_count++;
  106. char timestring[20];
  107. time_t timestamp;
  108. timestamp = time(NULL);
  109. strftime(timestring, sizeof(timestring), "%H:%M:%S", localtime(&timestamp));
  110. memcpy(&metabuf[IDX_PID], &pid, sizeof(pid_t) ); /* pid */
  111. memcpy(&metabuf[IDX_TID], &tid, sizeof(pid_t) ); /* tid */
  112. memcpy(&metabuf[IDX_COUNT], &rpc_count, sizeof(rpc_count) ); /* rpc_count */
  113. memcpy(&metabuf[IDX_TIME], &timestring, 20 ); /* timestamp */
  114. #endif
  115. /* Combine command flag+payload with RPC metadata */
  116. memcpy(&metabuf[IDX_PAYLOAD], cmdbuf, len);
  117. int n_write = write(rpc_sock, &metabuf, BUF_SZ);
  118. if(n_write < 0) {
  119. fprintf(stderr, "Error writing command to service (CMD = %d)\n", cmdbuf[0]);
  120. errno = 0;
  121. }
  122. int ret = ERR_OK;
  123. if(n_write > 0) {
  124. if(cmdbuf[0]==RPC_SOCKET) {
  125. ret = get_new_fd(rpc_sock);
  126. }
  127. if(cmdbuf[0]==RPC_MAP_REQ
  128. || cmdbuf[0]==RPC_CONNECT
  129. || cmdbuf[0]==RPC_BIND
  130. || cmdbuf[0]==RPC_LISTEN
  131. || cmdbuf[0]==RPC_MAP) {
  132. ret = get_retval(rpc_sock);
  133. }
  134. if(cmdbuf[0]==RPC_GETSOCKNAME) {
  135. ret = n_write;
  136. }
  137. }
  138. else {
  139. ret = -1;
  140. }
  141. pthread_mutex_unlock(&lock);
  142. return ret;
  143. }
  144. /*
  145. * Send file descriptor
  146. */
  147. ssize_t sock_fd_write(int sock, int fd)
  148. {
  149. ssize_t size;
  150. struct msghdr msg;
  151. struct iovec iov;
  152. char buf = '\0';
  153. int buflen = 1;
  154. union {
  155. struct cmsghdr cmsghdr;
  156. char control[CMSG_SPACE(sizeof (int))];
  157. } cmsgu;
  158. struct cmsghdr *cmsg;
  159. iov.iov_base = &buf;
  160. iov.iov_len = buflen;
  161. msg.msg_name = NULL;
  162. msg.msg_namelen = 0;
  163. msg.msg_iov = &iov;
  164. msg.msg_iovlen = 1;
  165. if (fd != -1) {
  166. msg.msg_control = cmsgu.control;
  167. msg.msg_controllen = sizeof(cmsgu.control);
  168. cmsg = CMSG_FIRSTHDR(&msg);
  169. cmsg->cmsg_len = CMSG_LEN(sizeof (int));
  170. cmsg->cmsg_level = SOL_SOCKET;
  171. cmsg->cmsg_type = SCM_RIGHTS;
  172. *((int *) CMSG_DATA(cmsg)) = fd;
  173. } else {
  174. msg.msg_control = NULL;
  175. msg.msg_controllen = 0;
  176. }
  177. size = sendmsg(sock, &msg, 0);
  178. if (size < 0)
  179. perror ("sendmsg");
  180. return size;
  181. }
  182. /*
  183. * Read a file descriptor
  184. */
  185. ssize_t sock_fd_read(int sock, void *buf, ssize_t bufsize, int *fd)
  186. {
  187. ssize_t size;
  188. if (fd) {
  189. struct msghdr msg;
  190. struct iovec iov;
  191. union {
  192. struct cmsghdr cmsghdr;
  193. char control[CMSG_SPACE(sizeof (int))];
  194. } cmsgu;
  195. struct cmsghdr *cmsg;
  196. iov.iov_base = buf;
  197. iov.iov_len = bufsize;
  198. msg.msg_name = NULL;
  199. msg.msg_namelen = 0;
  200. msg.msg_iov = &iov;
  201. msg.msg_iovlen = 1;
  202. msg.msg_control = cmsgu.control;
  203. msg.msg_controllen = sizeof(cmsgu.control);
  204. size = recvmsg (sock, &msg, 0);
  205. if (size < 0) {
  206. fprintf(stderr, "sock_fd_read(): recvmsg: Error\n");
  207. return -1;
  208. }
  209. cmsg = CMSG_FIRSTHDR(&msg);
  210. if (cmsg && cmsg->cmsg_len == CMSG_LEN(sizeof(int))) {
  211. if (cmsg->cmsg_level != SOL_SOCKET) {
  212. fprintf (stderr, "invalid cmsg_level %d\n",cmsg->cmsg_level);
  213. return -1;
  214. }
  215. if (cmsg->cmsg_type != SCM_RIGHTS) {
  216. fprintf (stderr, "invalid cmsg_type %d\n",cmsg->cmsg_type);
  217. return -1;
  218. }
  219. *fd = *((int *) CMSG_DATA(cmsg));
  220. } else *fd = -1;
  221. } else {
  222. size = read (sock, buf, bufsize);
  223. if (size < 0) {
  224. fprintf(stderr, "sock_fd_read(): read: Error\n");
  225. return -1;
  226. }
  227. }
  228. return size;
  229. }