jsonrpc_io.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568
  1. /*
  2. *
  3. * Copyright (C) 2011 Flowroute LLC (flowroute.com)
  4. *
  5. * This file is part of Kamailio, a free SIP server.
  6. *
  7. * This file is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU General Public License as published by
  9. * the Free Software Foundation; either version 2 of the License, or
  10. * (at your option) any later version
  11. *
  12. *
  13. * This file is distributed in the hope that it will be useful,
  14. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. * GNU General Public License for more details.
  17. *
  18. * You should have received a copy of the GNU General Public License
  19. * along with this program; if not, write to the Free Software
  20. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  21. *
  22. */
  23. #include <stdio.h>
  24. #include <stdlib.h>
  25. #include <errno.h>
  26. #include <string.h>
  27. #include <fcntl.h>
  28. #include <event.h>
  29. #include <sys/timerfd.h>
  30. #include "../../sr_module.h"
  31. #include "../../route.h"
  32. #include "../../route_struct.h"
  33. #include "../../lvalue.h"
  34. #include "../tm/tm_load.h"
  35. #include "jsonrpc_io.h"
  36. #include "jsonrpc.h"
  37. #include "netstring.h"
  38. #define CHECK_MALLOC_VOID(p) if(!p) {LM_ERR("Out of memory!"); return;}
  39. #define CHECK_MALLOC(p) if(!p) {LM_ERR("Out of memory!"); return -1;}
  40. struct jsonrpc_server {
  41. char *host;
  42. int port, socket, status;
  43. struct jsonrpc_server *next;
  44. struct event *ev;
  45. struct itimerspec *timer;
  46. };
  47. struct jsonrpc_server_group {
  48. struct jsonrpc_server *next_server;
  49. int priority;
  50. struct jsonrpc_server_group *next_group;
  51. };
  52. struct tm_binds tmb;
  53. struct jsonrpc_server_group *server_group;
  54. void socket_cb(int fd, short event, void *arg);
  55. void cmd_pipe_cb(int fd, short event, void *arg);
  56. int set_non_blocking(int fd);
  57. int parse_servers(char *_servers, struct jsonrpc_server_group **group_ptr);
  58. int connect_servers(struct jsonrpc_server_group *group);
  59. int connect_server(struct jsonrpc_server *server);
  60. int handle_server_failure(struct jsonrpc_server *server);
  61. int jsonrpc_io_child_process(int cmd_pipe, char* _servers)
  62. {
  63. if (parse_servers(_servers, &server_group) != 0)
  64. {
  65. LM_ERR("servers parameter could not be parsed\n");
  66. return -1;
  67. }
  68. event_init();
  69. struct event pipe_ev;
  70. set_non_blocking(cmd_pipe);
  71. event_set(&pipe_ev, cmd_pipe, EV_READ | EV_PERSIST, cmd_pipe_cb, &pipe_ev);
  72. event_add(&pipe_ev, NULL);
  73. if (!connect_servers(server_group))
  74. {
  75. LM_ERR("failed to connect to any servers\n");
  76. return -1;
  77. }
  78. event_dispatch();
  79. return 0;
  80. }
  81. void timeout_cb(int fd, short event, void *arg)
  82. {
  83. LM_ERR("message timeout\n");
  84. jsonrpc_request_t *req = (jsonrpc_request_t*)arg;
  85. json_object *error = json_object_new_string("timeout");
  86. void_jsonrpc_request(req->id);
  87. close(req->timerfd);
  88. event_del(req->timer_ev);
  89. pkg_free(req->timer_ev);
  90. req->cbfunc(error, req->cbdata, 1);
  91. pkg_free(req);
  92. }
  93. int result_cb(json_object *result, char *data, int error)
  94. {
  95. struct jsonrpc_pipe_cmd *cmd = (struct jsonrpc_pipe_cmd*)data;
  96. pv_spec_t *dst = cmd->cb_pv;
  97. pv_value_t val;
  98. const char* res = json_object_get_string(result);
  99. val.rs.s = (char*)res;
  100. val.rs.len = strlen(res);
  101. val.flags = PV_VAL_STR;
  102. dst->setf(0, &dst->pvp, (int)EQ_T, &val);
  103. int n;
  104. if (error) {
  105. n = route_get(&main_rt, cmd->err_route);
  106. } else {
  107. n = route_get(&main_rt, cmd->cb_route);
  108. }
  109. struct action *a = main_rt.rlist[n];
  110. tmb.t_continue(cmd->t_hash, cmd->t_label, a);
  111. free_pipe_cmd(cmd);
  112. return 0;
  113. }
  114. int (*res_cb)(json_object*, char*, int) = &result_cb;
  115. void cmd_pipe_cb(int fd, short event, void *arg)
  116. {
  117. struct jsonrpc_pipe_cmd *cmd;
  118. char *ns = 0;
  119. size_t bytes;
  120. json_object *payload = NULL;
  121. jsonrpc_request_t *req = NULL;
  122. json_object *params;
  123. /* struct event *ev = (struct event*)arg; */
  124. if (read(fd, &cmd, sizeof(cmd)) != sizeof(cmd)) {
  125. LM_ERR("failed to read from command pipe: %s\n", strerror(errno));
  126. return;
  127. }
  128. params = json_tokener_parse(cmd->params);
  129. if (cmd->notify_only) {
  130. payload = build_jsonrpc_notification(cmd->method, params);
  131. } else {
  132. req = build_jsonrpc_request(cmd->method, params, (char*)cmd, res_cb);
  133. if (req)
  134. payload = req->payload;
  135. }
  136. if (!payload) {
  137. LM_ERR("Failed to build jsonrpc_request_t (method: %s, params: %s)\n", cmd->method, cmd->params);
  138. goto error;
  139. }
  140. char *json = (char*)json_object_get_string(payload);
  141. bytes = netstring_encode_new(&ns, json, (size_t)strlen(json));
  142. struct jsonrpc_server_group *g;
  143. int sent = 0;
  144. for (g = server_group; g != NULL; g = g->next_group)
  145. {
  146. struct jsonrpc_server *s, *first = NULL;
  147. for (s = g->next_server; s != first; s = s->next)
  148. {
  149. if (first == NULL) first = s;
  150. if (s->status == JSONRPC_SERVER_CONNECTED) {
  151. if (send(s->socket, ns, bytes, 0) == bytes)
  152. {
  153. sent = 1;
  154. break;
  155. } else {
  156. handle_server_failure(s);
  157. }
  158. }
  159. g->next_server = s->next;
  160. }
  161. if (sent) {
  162. break;
  163. } else {
  164. LM_WARN("Failed to send on priority group %d... proceeding to next priority group.\n", g->priority);
  165. }
  166. }
  167. if (sent && req) {
  168. int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
  169. if (timerfd == -1) {
  170. LM_ERR("Could not create timerfd.");
  171. goto error;
  172. }
  173. req->timerfd = timerfd;
  174. struct itimerspec *itime = pkg_malloc(sizeof(struct itimerspec));
  175. CHECK_MALLOC_VOID(itime);
  176. itime->it_interval.tv_sec = 0;
  177. itime->it_interval.tv_nsec = 0;
  178. itime->it_value.tv_sec = JSONRPC_TIMEOUT/1000;
  179. itime->it_value.tv_nsec = (JSONRPC_TIMEOUT % 1000) * 1000000;
  180. if (timerfd_settime(timerfd, 0, itime, NULL) == -1)
  181. {
  182. LM_ERR("Could not set timer.");
  183. goto error;
  184. }
  185. pkg_free(itime);
  186. struct event *timer_ev = pkg_malloc(sizeof(struct event));
  187. CHECK_MALLOC_VOID(timer_ev);
  188. event_set(timer_ev, timerfd, EV_READ, timeout_cb, req);
  189. if(event_add(timer_ev, NULL) == -1) {
  190. LM_ERR("event_add failed while setting request timer (%s).", strerror(errno));
  191. goto error;
  192. }
  193. req->timer_ev = timer_ev;
  194. } else if (!sent) {
  195. LM_ERR("Request could not be sent... no more failover groups.\n");
  196. if (req) {
  197. json_object *error = json_object_new_string("failure");
  198. void_jsonrpc_request(req->id);
  199. req->cbfunc(error, req->cbdata, 1);
  200. }
  201. }
  202. pkg_free(ns);
  203. json_object_put(payload);
  204. if (cmd->notify_only) free_pipe_cmd(cmd);
  205. return;
  206. error:
  207. if(ns) pkg_free(ns);
  208. if(payload) json_object_put(payload);
  209. if (cmd->notify_only) free_pipe_cmd(cmd);
  210. return;
  211. }
  212. void socket_cb(int fd, short event, void *arg)
  213. {
  214. struct jsonrpc_server *server = (struct jsonrpc_server*)arg;
  215. if (event != EV_READ) {
  216. LM_ERR("unexpected socket event (%d)\n", event);
  217. handle_server_failure(server);
  218. return;
  219. }
  220. char *netstring;
  221. int retval = netstring_read_fd(fd, &netstring);
  222. if (retval != 0) {
  223. LM_ERR("bad netstring (%d)\n", retval);
  224. handle_server_failure(server);
  225. return;
  226. }
  227. struct json_object *res = json_tokener_parse(netstring);
  228. if (res) {
  229. handle_jsonrpc_response(res);
  230. json_object_put(res);
  231. } else {
  232. LM_ERR("netstring could not be parsed: (%s)\n", netstring);
  233. handle_server_failure(server);
  234. }
  235. pkg_free(netstring);
  236. }
  237. int set_non_blocking(int fd)
  238. {
  239. int flags;
  240. flags = fcntl(fd, F_GETFL);
  241. if (flags < 0)
  242. return flags;
  243. flags |= O_NONBLOCK;
  244. if (fcntl(fd, F_SETFL, flags) < 0)
  245. return -1;
  246. return 0;
  247. }
  248. int parse_servers(char *_servers, struct jsonrpc_server_group **group_ptr)
  249. {
  250. char cpy[strlen(_servers)+1];
  251. char *servers = strcpy(cpy, _servers);
  252. struct jsonrpc_server_group *group = NULL;
  253. /* parse servers string */
  254. char *token = strtok(servers, ":");
  255. while (token != NULL)
  256. {
  257. char *host, *port_s, *priority_s, *tail;
  258. int port, priority;
  259. host = token;
  260. /* validate domain */
  261. if (!(isalpha(host[0]) || isdigit(host[0]))) {
  262. LM_ERR("invalid domain (1st char is '%c')\n", host[0]);
  263. return -1;
  264. }
  265. int i;
  266. for (i=1; i<strlen(host)-1; i++)
  267. {
  268. if(!(isalpha(host[i]) || isdigit(host[i]) || host[i] == '-' || host[i] == '.'))
  269. {
  270. LM_ERR("invalid domain (char %d is %c)\n", i, host[i]);
  271. return -1;
  272. }
  273. }
  274. if (!(isalpha(host[i]) || isdigit(host[i]))) {
  275. LM_ERR("invalid domain (char %d (last) is %c)\n", i, host[i]);
  276. return -1;
  277. }
  278. /* convert/validate port */
  279. port_s = strtok(NULL, ",");
  280. if (port_s == NULL || !(port = strtol(port_s, &tail, 0)) || strlen(tail))
  281. {
  282. LM_ERR("invalid port: %s\n", port_s);
  283. return -1;
  284. }
  285. /* convert/validate priority */
  286. priority_s = strtok(NULL, " ");
  287. if (priority_s == NULL || !(priority = strtol(priority_s, &tail, 0)) || strlen(tail))
  288. {
  289. LM_ERR("invalid priority: %s\n", priority_s);
  290. return -1;
  291. }
  292. struct jsonrpc_server *server = pkg_malloc(sizeof(struct jsonrpc_server));
  293. CHECK_MALLOC(server);
  294. memset(server, 0, sizeof(struct jsonrpc_server));
  295. char *h = pkg_malloc(strlen(host)+1);
  296. CHECK_MALLOC(h);
  297. strcpy(h,host);
  298. server->host = h;
  299. server->port = port;
  300. server->status = JSONRPC_SERVER_DISCONNECTED;
  301. server->socket = 0;
  302. int group_cnt = 0;
  303. /* search for a server group with this server's priority */
  304. struct jsonrpc_server_group *selected_group = NULL;
  305. for (selected_group=group; selected_group != NULL; selected_group=selected_group->next_group)
  306. {
  307. if (selected_group->priority == priority) break;
  308. }
  309. if (selected_group == NULL) {
  310. group_cnt++;
  311. LM_INFO("Creating group for priority %d\n", priority);
  312. /* this is the first server for this priority... link it to itself */
  313. server->next = server;
  314. selected_group = pkg_malloc(sizeof(struct jsonrpc_server_group));
  315. CHECK_MALLOC(selected_group);
  316. memset(selected_group, 0, sizeof(struct jsonrpc_server_group));
  317. selected_group->priority = priority;
  318. selected_group->next_server = server;
  319. /* insert the group properly in the linked list */
  320. struct jsonrpc_server_group *x, *pg;
  321. pg = NULL;
  322. if (group == NULL)
  323. {
  324. group = selected_group;
  325. group->next_group = NULL;
  326. } else {
  327. for (x = group; x != NULL; x = x->next_group)
  328. {
  329. if (priority > x->priority)
  330. {
  331. if (pg == NULL)
  332. {
  333. group = selected_group;
  334. } else {
  335. pg->next_group = selected_group;
  336. }
  337. selected_group->next_group = x;
  338. break;
  339. } else if (x->next_group == NULL) {
  340. x->next_group = selected_group;
  341. break;
  342. } else {
  343. pg = x;
  344. }
  345. }
  346. }
  347. } else {
  348. LM_ERR("Using existing group for priority %d\n", priority);
  349. server->next = selected_group->next_server->next;
  350. selected_group->next_server->next = server;
  351. }
  352. token = strtok(NULL, ":");
  353. }
  354. *group_ptr = group;
  355. return 0;
  356. }
  357. int connect_server(struct jsonrpc_server *server)
  358. {
  359. struct sockaddr_in server_addr;
  360. struct hostent *hp;
  361. server_addr.sin_family = AF_INET;
  362. server_addr.sin_port = htons(server->port);
  363. hp = gethostbyname(server->host);
  364. if (hp == NULL) {
  365. LM_ERR("gethostbyname(%s) failed with h_errno=%d.\n", server->host, h_errno);
  366. handle_server_failure(server);
  367. return -1;
  368. }
  369. memcpy(&(server_addr.sin_addr.s_addr), hp->h_addr, hp->h_length);
  370. int sockfd = socket(AF_INET,SOCK_STREAM,0);
  371. if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr_in))) {
  372. LM_WARN("Failed to connect to %s on port %d... %s\n", server->host, server->port, strerror(errno));
  373. handle_server_failure(server);
  374. return -1;
  375. }
  376. if (set_non_blocking(sockfd) != 0)
  377. {
  378. LM_WARN("Failed to set socket (%s:%d) to non blocking.\n", server->host, server->port);
  379. handle_server_failure(server);
  380. return -1;
  381. }
  382. server->socket = sockfd;
  383. server->status = JSONRPC_SERVER_CONNECTED;
  384. struct event *socket_ev = pkg_malloc(sizeof(struct event));
  385. CHECK_MALLOC(socket_ev);
  386. event_set(socket_ev, sockfd, EV_READ | EV_PERSIST, socket_cb, server);
  387. event_add(socket_ev, NULL);
  388. server->ev = socket_ev;
  389. return 0;
  390. }
  391. int connect_servers(struct jsonrpc_server_group *group)
  392. {
  393. int connected_servers = 0;
  394. for (;group != NULL; group = group->next_group)
  395. {
  396. struct jsonrpc_server *s, *first = NULL;
  397. LM_INFO("Connecting to servers for priority %d:\n", group->priority);
  398. for (s=group->next_server;s!=first;s=s->next)
  399. {
  400. if (connect_server(s) == 0)
  401. {
  402. connected_servers++;
  403. LM_INFO("Connected to host %s on port %d\n", s->host, s->port);
  404. }
  405. if (first == NULL) first = s;
  406. }
  407. }
  408. return connected_servers;
  409. }
  410. void reconnect_cb(int fd, short event, void *arg)
  411. {
  412. LM_INFO("Attempting to reconnect now.");
  413. struct jsonrpc_server *server = (struct jsonrpc_server*)arg;
  414. if (server->status == JSONRPC_SERVER_CONNECTED) {
  415. LM_WARN("Trying to connect an already connected server.");
  416. return;
  417. }
  418. if (server->ev != NULL) {
  419. event_del(server->ev);
  420. pkg_free(server->ev);
  421. server->ev = NULL;
  422. }
  423. close(fd);
  424. pkg_free(server->timer);
  425. connect_server(server);
  426. }
  427. int handle_server_failure(struct jsonrpc_server *server)
  428. {
  429. LM_INFO("Setting timer to reconnect to %s on port %d in %d seconds.\n", server->host, server->port, JSONRPC_RECONNECT_INTERVAL);
  430. if (server->socket)
  431. close(server->socket);
  432. server->socket = 0;
  433. if (server->ev != NULL) {
  434. event_del(server->ev);
  435. pkg_free(server->ev);
  436. server->ev = NULL;
  437. }
  438. server->status = JSONRPC_SERVER_FAILURE;
  439. int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
  440. if (timerfd == -1) {
  441. LM_ERR("Could not create timerfd to reschedule connection. No further attempts will be made to reconnect this server.");
  442. return -1;
  443. }
  444. struct itimerspec *itime = pkg_malloc(sizeof(struct itimerspec));
  445. CHECK_MALLOC(itime);
  446. itime->it_interval.tv_sec = 0;
  447. itime->it_interval.tv_nsec = 0;
  448. itime->it_value.tv_sec = JSONRPC_RECONNECT_INTERVAL;
  449. itime->it_value.tv_nsec = 0;
  450. if (timerfd_settime(timerfd, 0, itime, NULL) == -1)
  451. {
  452. LM_ERR("Could not set timer to reschedule connection. No further attempts will be made to reconnect this server.");
  453. return -1;
  454. }
  455. LM_INFO("timerfd value is %d\n", timerfd);
  456. struct event *timer_ev = pkg_malloc(sizeof(struct event));
  457. CHECK_MALLOC(timer_ev);
  458. event_set(timer_ev, timerfd, EV_READ, reconnect_cb, server);
  459. if(event_add(timer_ev, NULL) == -1) {
  460. LM_ERR("event_add failed while rescheduling connection (%s). No further attempts will be made to reconnect this server.", strerror(errno));
  461. return -1;
  462. }
  463. server->ev = timer_ev;
  464. server->timer = itime;
  465. return 0;
  466. }
  467. void free_pipe_cmd(struct jsonrpc_pipe_cmd *cmd)
  468. {
  469. if (cmd->method)
  470. shm_free(cmd->method);
  471. if (cmd->params)
  472. shm_free(cmd->params);
  473. if (cmd->cb_route)
  474. shm_free(cmd->cb_route);
  475. if (cmd->err_route)
  476. shm_free(cmd->err_route);
  477. if (cmd->cb_pv)
  478. shm_free(cmd->cb_pv);
  479. shm_free(cmd);
  480. }