jsonrpc_io.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569
  1. /**
  2. * $Id$
  3. *
  4. * Copyright (C) 2011 Flowroute LLC (flowroute.com)
  5. *
  6. * This file is part of Kamailio, a free SIP server.
  7. *
  8. * This file is free software; you can redistribute it and/or modify
  9. * it under the terms of the GNU General Public License as published by
  10. * the Free Software Foundation; either version 2 of the License, or
  11. * (at your option) any later version
  12. *
  13. *
  14. * This file is distributed in the hope that it will be useful,
  15. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  16. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  17. * GNU General Public License for more details.
  18. *
  19. * You should have received a copy of the GNU General Public License
  20. * along with this program; if not, write to the Free Software
  21. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  22. *
  23. */
  24. #include <stdio.h>
  25. #include <stdlib.h>
  26. #include <errno.h>
  27. #include <string.h>
  28. #include <fcntl.h>
  29. #include <event.h>
  30. #include <sys/timerfd.h>
  31. #include "../../sr_module.h"
  32. #include "../../route.h"
  33. #include "../../route_struct.h"
  34. #include "../../lvalue.h"
  35. #include "../tm/tm_load.h"
  36. #include "jsonrpc_io.h"
  37. #include "jsonrpc.h"
  38. #include "netstring.h"
  39. #define CHECK_MALLOC_VOID(p) if(!p) {LM_ERR("Out of memory!"); return;}
  40. #define CHECK_MALLOC(p) if(!p) {LM_ERR("Out of memory!"); return -1;}
  41. struct jsonrpc_server {
  42. char *host;
  43. int port, socket, status;
  44. struct jsonrpc_server *next;
  45. struct event *ev;
  46. struct itimerspec *timer;
  47. };
  48. struct jsonrpc_server_group {
  49. struct jsonrpc_server *next_server;
  50. int priority;
  51. struct jsonrpc_server_group *next_group;
  52. };
  53. struct tm_binds tmb;
  54. struct jsonrpc_server_group *server_group;
  55. void socket_cb(int fd, short event, void *arg);
  56. void cmd_pipe_cb(int fd, short event, void *arg);
  57. int set_non_blocking(int fd);
  58. int parse_servers(char *_servers, struct jsonrpc_server_group **group_ptr);
  59. int connect_servers(struct jsonrpc_server_group *group);
  60. int connect_server(struct jsonrpc_server *server);
  61. int handle_server_failure(struct jsonrpc_server *server);
  62. int jsonrpc_io_child_process(int cmd_pipe, char* _servers)
  63. {
  64. if (parse_servers(_servers, &server_group) != 0)
  65. {
  66. LM_ERR("servers parameter could not be parsed\n");
  67. return -1;
  68. }
  69. event_init();
  70. struct event pipe_ev;
  71. set_non_blocking(cmd_pipe);
  72. event_set(&pipe_ev, cmd_pipe, EV_READ | EV_PERSIST, cmd_pipe_cb, &pipe_ev);
  73. event_add(&pipe_ev, NULL);
  74. if (!connect_servers(server_group))
  75. {
  76. LM_ERR("failed to connect to any servers\n");
  77. return -1;
  78. }
  79. event_dispatch();
  80. return 0;
  81. }
  82. void timeout_cb(int fd, short event, void *arg)
  83. {
  84. LM_ERR("message timeout\n");
  85. jsonrpc_request_t *req = (jsonrpc_request_t*)arg;
  86. json_object *error = json_object_new_string("timeout");
  87. void_jsonrpc_request(req->id);
  88. close(req->timerfd);
  89. event_del(req->timer_ev);
  90. pkg_free(req->timer_ev);
  91. req->cbfunc(error, req->cbdata, 1);
  92. pkg_free(req);
  93. }
  94. int result_cb(json_object *result, char *data, int error)
  95. {
  96. struct jsonrpc_pipe_cmd *cmd = (struct jsonrpc_pipe_cmd*)data;
  97. pv_spec_t *dst = cmd->cb_pv;
  98. pv_value_t val;
  99. const char* res = json_object_get_string(result);
  100. val.rs.s = (char*)res;
  101. val.rs.len = strlen(res);
  102. val.flags = PV_VAL_STR;
  103. dst->setf(0, &dst->pvp, (int)EQ_T, &val);
  104. int n;
  105. if (error) {
  106. n = route_get(&main_rt, cmd->err_route);
  107. } else {
  108. n = route_get(&main_rt, cmd->cb_route);
  109. }
  110. struct action *a = main_rt.rlist[n];
  111. tmb.t_continue(cmd->t_hash, cmd->t_label, a);
  112. free_pipe_cmd(cmd);
  113. return 0;
  114. }
  115. int (*res_cb)(json_object*, char*, int) = &result_cb;
  116. void cmd_pipe_cb(int fd, short event, void *arg)
  117. {
  118. struct jsonrpc_pipe_cmd *cmd;
  119. char *ns = 0;
  120. size_t bytes;
  121. json_object *payload = NULL;
  122. jsonrpc_request_t *req = NULL;
  123. json_object *params;
  124. /* struct event *ev = (struct event*)arg; */
  125. if (read(fd, &cmd, sizeof(cmd)) != sizeof(cmd)) {
  126. LM_ERR("failed to read from command pipe: %s\n", strerror(errno));
  127. return;
  128. }
  129. params = json_tokener_parse(cmd->params);
  130. if (cmd->notify_only) {
  131. payload = build_jsonrpc_notification(cmd->method, params);
  132. } else {
  133. req = build_jsonrpc_request(cmd->method, params, (char*)cmd, res_cb);
  134. if (req)
  135. payload = req->payload;
  136. }
  137. if (!payload) {
  138. LM_ERR("Failed to build jsonrpc_request_t (method: %s, params: %s)\n", cmd->method, cmd->params);
  139. goto error;
  140. }
  141. char *json = (char*)json_object_get_string(payload);
  142. bytes = netstring_encode_new(&ns, json, (size_t)strlen(json));
  143. struct jsonrpc_server_group *g;
  144. int sent = 0;
  145. for (g = server_group; g != NULL; g = g->next_group)
  146. {
  147. struct jsonrpc_server *s, *first = NULL;
  148. for (s = g->next_server; s != first; s = s->next)
  149. {
  150. if (first == NULL) first = s;
  151. if (s->status == JSONRPC_SERVER_CONNECTED) {
  152. if (send(s->socket, ns, bytes, 0) == bytes)
  153. {
  154. sent = 1;
  155. break;
  156. } else {
  157. handle_server_failure(s);
  158. }
  159. }
  160. g->next_server = s->next;
  161. }
  162. if (sent) {
  163. break;
  164. } else {
  165. LM_WARN("Failed to send on priority group %d... proceeding to next priority group.\n", g->priority);
  166. }
  167. }
  168. if (sent && req) {
  169. int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
  170. if (timerfd == -1) {
  171. LM_ERR("Could not create timerfd.");
  172. goto error;
  173. }
  174. req->timerfd = timerfd;
  175. struct itimerspec *itime = pkg_malloc(sizeof(struct itimerspec));
  176. CHECK_MALLOC_VOID(itime);
  177. itime->it_interval.tv_sec = 0;
  178. itime->it_interval.tv_nsec = 0;
  179. itime->it_value.tv_sec = JSONRPC_TIMEOUT/1000;
  180. itime->it_value.tv_nsec = (JSONRPC_TIMEOUT % 1000) * 1000000;
  181. if (timerfd_settime(timerfd, 0, itime, NULL) == -1)
  182. {
  183. LM_ERR("Could not set timer.");
  184. goto error;
  185. }
  186. pkg_free(itime);
  187. struct event *timer_ev = pkg_malloc(sizeof(struct event));
  188. CHECK_MALLOC_VOID(timer_ev);
  189. event_set(timer_ev, timerfd, EV_READ, timeout_cb, req);
  190. if(event_add(timer_ev, NULL) == -1) {
  191. LM_ERR("event_add failed while setting request timer (%s).", strerror(errno));
  192. goto error;
  193. }
  194. req->timer_ev = timer_ev;
  195. } else if (!sent) {
  196. LM_ERR("Request could not be sent... no more failover groups.\n");
  197. if (req) {
  198. json_object *error = json_object_new_string("failure");
  199. void_jsonrpc_request(req->id);
  200. req->cbfunc(error, req->cbdata, 1);
  201. }
  202. }
  203. pkg_free(ns);
  204. json_object_put(payload);
  205. if (cmd->notify_only) free_pipe_cmd(cmd);
  206. return;
  207. error:
  208. if(ns) pkg_free(ns);
  209. if(payload) json_object_put(payload);
  210. if (cmd->notify_only) free_pipe_cmd(cmd);
  211. return;
  212. }
  213. void socket_cb(int fd, short event, void *arg)
  214. {
  215. struct jsonrpc_server *server = (struct jsonrpc_server*)arg;
  216. if (event != EV_READ) {
  217. LM_ERR("unexpected socket event (%d)\n", event);
  218. handle_server_failure(server);
  219. return;
  220. }
  221. char *netstring;
  222. int retval = netstring_read_fd(fd, &netstring);
  223. if (retval != 0) {
  224. LM_ERR("bad netstring (%d)\n", retval);
  225. handle_server_failure(server);
  226. return;
  227. }
  228. struct json_object *res = json_tokener_parse(netstring);
  229. if (res) {
  230. handle_jsonrpc_response(res);
  231. json_object_put(res);
  232. } else {
  233. LM_ERR("netstring could not be parsed: (%s)\n", netstring);
  234. handle_server_failure(server);
  235. }
  236. pkg_free(netstring);
  237. }
  238. int set_non_blocking(int fd)
  239. {
  240. int flags;
  241. flags = fcntl(fd, F_GETFL);
  242. if (flags < 0)
  243. return flags;
  244. flags |= O_NONBLOCK;
  245. if (fcntl(fd, F_SETFL, flags) < 0)
  246. return -1;
  247. return 0;
  248. }
  249. int parse_servers(char *_servers, struct jsonrpc_server_group **group_ptr)
  250. {
  251. char cpy[strlen(_servers)+1];
  252. char *servers = strcpy(cpy, _servers);
  253. struct jsonrpc_server_group *group = NULL;
  254. /* parse servers string */
  255. char *token = strtok(servers, ":");
  256. while (token != NULL)
  257. {
  258. char *host, *port_s, *priority_s, *tail;
  259. int port, priority;
  260. host = token;
  261. /* validate domain */
  262. if (!(isalpha(host[0]) || isdigit(host[0]))) {
  263. LM_ERR("invalid domain (1st char is '%c')\n", host[0]);
  264. return -1;
  265. }
  266. int i;
  267. for (i=1; i<strlen(host)-1; i++)
  268. {
  269. if(!(isalpha(host[i]) || isdigit(host[i]) || host[i] == '-' || host[i] == '.'))
  270. {
  271. LM_ERR("invalid domain (char %d is %c)\n", i, host[i]);
  272. return -1;
  273. }
  274. }
  275. if (!(isalpha(host[i]) || isdigit(host[i]))) {
  276. LM_ERR("invalid domain (char %d (last) is %c)\n", i, host[i]);
  277. return -1;
  278. }
  279. /* convert/validate port */
  280. port_s = strtok(NULL, ",");
  281. if (port_s == NULL || !(port = strtol(port_s, &tail, 0)) || strlen(tail))
  282. {
  283. LM_ERR("invalid port: %s\n", port_s);
  284. return -1;
  285. }
  286. /* convert/validate priority */
  287. priority_s = strtok(NULL, " ");
  288. if (priority_s == NULL || !(priority = strtol(priority_s, &tail, 0)) || strlen(tail))
  289. {
  290. LM_ERR("invalid priority: %s\n", priority_s);
  291. return -1;
  292. }
  293. struct jsonrpc_server *server = pkg_malloc(sizeof(struct jsonrpc_server));
  294. CHECK_MALLOC(server);
  295. memset(server, 0, sizeof(struct jsonrpc_server));
  296. char *h = pkg_malloc(strlen(host)+1);
  297. CHECK_MALLOC(h);
  298. strcpy(h,host);
  299. server->host = h;
  300. server->port = port;
  301. server->status = JSONRPC_SERVER_DISCONNECTED;
  302. server->socket = 0;
  303. int group_cnt = 0;
  304. /* search for a server group with this server's priority */
  305. struct jsonrpc_server_group *selected_group = NULL;
  306. for (selected_group=group; selected_group != NULL; selected_group=selected_group->next_group)
  307. {
  308. if (selected_group->priority == priority) break;
  309. }
  310. if (selected_group == NULL) {
  311. group_cnt++;
  312. LM_INFO("Creating group for priority %d\n", priority);
  313. /* this is the first server for this priority... link it to itself */
  314. server->next = server;
  315. selected_group = pkg_malloc(sizeof(struct jsonrpc_server_group));
  316. CHECK_MALLOC(selected_group);
  317. memset(selected_group, 0, sizeof(struct jsonrpc_server_group));
  318. selected_group->priority = priority;
  319. selected_group->next_server = server;
  320. /* insert the group properly in the linked list */
  321. struct jsonrpc_server_group *x, *pg;
  322. pg = NULL;
  323. if (group == NULL)
  324. {
  325. group = selected_group;
  326. group->next_group = NULL;
  327. } else {
  328. for (x = group; x != NULL; x = x->next_group)
  329. {
  330. if (priority > x->priority)
  331. {
  332. if (pg == NULL)
  333. {
  334. group = selected_group;
  335. } else {
  336. pg->next_group = selected_group;
  337. }
  338. selected_group->next_group = x;
  339. break;
  340. } else if (x->next_group == NULL) {
  341. x->next_group = selected_group;
  342. break;
  343. } else {
  344. pg = x;
  345. }
  346. }
  347. }
  348. } else {
  349. LM_ERR("Using existing group for priority %d\n", priority);
  350. server->next = selected_group->next_server->next;
  351. selected_group->next_server->next = server;
  352. }
  353. token = strtok(NULL, ":");
  354. }
  355. *group_ptr = group;
  356. return 0;
  357. }
  358. int connect_server(struct jsonrpc_server *server)
  359. {
  360. struct sockaddr_in server_addr;
  361. struct hostent *hp;
  362. server_addr.sin_family = AF_INET;
  363. server_addr.sin_port = htons(server->port);
  364. hp = gethostbyname(server->host);
  365. if (hp == NULL) {
  366. LM_ERR("gethostbyname(%s) failed with h_errno=%d.\n", server->host, h_errno);
  367. handle_server_failure(server);
  368. return -1;
  369. }
  370. memcpy(&(server_addr.sin_addr.s_addr), hp->h_addr, hp->h_length);
  371. int sockfd = socket(AF_INET,SOCK_STREAM,0);
  372. if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr_in))) {
  373. LM_WARN("Failed to connect to %s on port %d... %s\n", server->host, server->port, strerror(errno));
  374. handle_server_failure(server);
  375. return -1;
  376. }
  377. if (set_non_blocking(sockfd) != 0)
  378. {
  379. LM_WARN("Failed to set socket (%s:%d) to non blocking.\n", server->host, server->port);
  380. handle_server_failure(server);
  381. return -1;
  382. }
  383. server->socket = sockfd;
  384. server->status = JSONRPC_SERVER_CONNECTED;
  385. struct event *socket_ev = pkg_malloc(sizeof(struct event));
  386. CHECK_MALLOC(socket_ev);
  387. event_set(socket_ev, sockfd, EV_READ | EV_PERSIST, socket_cb, server);
  388. event_add(socket_ev, NULL);
  389. server->ev = socket_ev;
  390. return 0;
  391. }
  392. int connect_servers(struct jsonrpc_server_group *group)
  393. {
  394. int connected_servers = 0;
  395. for (;group != NULL; group = group->next_group)
  396. {
  397. struct jsonrpc_server *s, *first = NULL;
  398. LM_INFO("Connecting to servers for priority %d:\n", group->priority);
  399. for (s=group->next_server;s!=first;s=s->next)
  400. {
  401. if (connect_server(s) == 0)
  402. {
  403. connected_servers++;
  404. LM_INFO("Connected to host %s on port %d\n", s->host, s->port);
  405. }
  406. if (first == NULL) first = s;
  407. }
  408. }
  409. return connected_servers;
  410. }
  411. void reconnect_cb(int fd, short event, void *arg)
  412. {
  413. LM_INFO("Attempting to reconnect now.");
  414. struct jsonrpc_server *server = (struct jsonrpc_server*)arg;
  415. if (server->status == JSONRPC_SERVER_CONNECTED) {
  416. LM_WARN("Trying to connect an already connected server.");
  417. return;
  418. }
  419. if (server->ev != NULL) {
  420. event_del(server->ev);
  421. pkg_free(server->ev);
  422. server->ev = NULL;
  423. }
  424. close(fd);
  425. pkg_free(server->timer);
  426. connect_server(server);
  427. }
  428. int handle_server_failure(struct jsonrpc_server *server)
  429. {
  430. LM_INFO("Setting timer to reconnect to %s on port %d in %d seconds.\n", server->host, server->port, JSONRPC_RECONNECT_INTERVAL);
  431. if (server->socket)
  432. close(server->socket);
  433. server->socket = 0;
  434. if (server->ev != NULL) {
  435. event_del(server->ev);
  436. pkg_free(server->ev);
  437. server->ev = NULL;
  438. }
  439. server->status = JSONRPC_SERVER_FAILURE;
  440. int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
  441. if (timerfd == -1) {
  442. LM_ERR("Could not create timerfd to reschedule connection. No further attempts will be made to reconnect this server.");
  443. return -1;
  444. }
  445. struct itimerspec *itime = pkg_malloc(sizeof(struct itimerspec));
  446. CHECK_MALLOC(itime);
  447. itime->it_interval.tv_sec = 0;
  448. itime->it_interval.tv_nsec = 0;
  449. itime->it_value.tv_sec = JSONRPC_RECONNECT_INTERVAL;
  450. itime->it_value.tv_nsec = 0;
  451. if (timerfd_settime(timerfd, 0, itime, NULL) == -1)
  452. {
  453. LM_ERR("Could not set timer to reschedule connection. No further attempts will be made to reconnect this server.");
  454. return -1;
  455. }
  456. LM_INFO("timerfd value is %d\n", timerfd);
  457. struct event *timer_ev = pkg_malloc(sizeof(struct event));
  458. CHECK_MALLOC(timer_ev);
  459. event_set(timer_ev, timerfd, EV_READ, reconnect_cb, server);
  460. if(event_add(timer_ev, NULL) == -1) {
  461. LM_ERR("event_add failed while rescheduling connection (%s). No further attempts will be made to reconnect this server.", strerror(errno));
  462. return -1;
  463. }
  464. server->ev = timer_ev;
  465. server->timer = itime;
  466. return 0;
  467. }
  468. void free_pipe_cmd(struct jsonrpc_pipe_cmd *cmd)
  469. {
  470. if (cmd->method)
  471. shm_free(cmd->method);
  472. if (cmd->params)
  473. shm_free(cmd->params);
  474. if (cmd->cb_route)
  475. shm_free(cmd->cb_route);
  476. if (cmd->err_route)
  477. shm_free(cmd->err_route);
  478. if (cmd->cb_pv)
  479. shm_free(cmd->cb_pv);
  480. shm_free(cmd);
  481. }