jsonrpc_io.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557
  1. /**
  2. * $Id$
  3. *
  4. * Copyright (C) 2011 Flowroute LLC (flowroute.com)
  5. *
  6. * This file is part of Kamailio, a free SIP server.
  7. *
  8. * This file is free software; you can redistribute it and/or modify
  9. * it under the terms of the GNU General Public License as published by
  10. * the Free Software Foundation; either version 2 of the License, or
  11. * (at your option) any later version
  12. *
  13. *
  14. * This file is distributed in the hope that it will be useful,
  15. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  16. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  17. * GNU General Public License for more details.
  18. *
  19. * You should have received a copy of the GNU General Public License
  20. * along with this program; if not, write to the Free Software
  21. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  22. *
  23. */
  24. #include <stdio.h>
  25. #include <stdlib.h>
  26. #include <errno.h>
  27. #include <string.h>
  28. #include <fcntl.h>
  29. #include <event.h>
  30. #include <sys/timerfd.h>
  31. #include "../../sr_module.h"
  32. #include "../../route.h"
  33. #include "../../route_struct.h"
  34. #include "../../lvalue.h"
  35. #include "../tm/tm_load.h"
  36. #include "jsonrpc_io.h"
  37. #include "jsonrpc.h"
  38. #include "netstring.h"
  39. #define CHECK_MALLOC_VOID(p) if(!p) {LM_ERR("Out of memory!"); return;}
  40. #define CHECK_MALLOC(p) if(!p) {LM_ERR("Out of memory!"); return -1;}
  41. struct jsonrpc_server {
  42. char *host;
  43. int port, socket, status;
  44. struct jsonrpc_server *next;
  45. struct event *ev;
  46. struct itimerspec *timer;
  47. };
  48. struct jsonrpc_server_group {
  49. struct jsonrpc_server *next_server;
  50. int priority;
  51. struct jsonrpc_server_group *next_group;
  52. };
  53. struct tm_binds tmb;
  54. struct jsonrpc_server_group *server_group;
  55. void socket_cb(int fd, short event, void *arg);
  56. void cmd_pipe_cb(int fd, short event, void *arg);
  57. int set_non_blocking(int fd);
  58. int parse_servers(char *_servers, struct jsonrpc_server_group **group_ptr);
  59. int connect_servers(struct jsonrpc_server_group *group);
  60. int connect_server(struct jsonrpc_server *server);
  61. int handle_server_failure(struct jsonrpc_server *server);
  62. int jsonrpc_io_child_process(int cmd_pipe, char* _servers)
  63. {
  64. if (parse_servers(_servers, &server_group) != 0)
  65. {
  66. LM_ERR("servers parameter could not be parsed\n");
  67. return -1;
  68. }
  69. event_init();
  70. struct event pipe_ev;
  71. set_non_blocking(cmd_pipe);
  72. event_set(&pipe_ev, cmd_pipe, EV_READ | EV_PERSIST, cmd_pipe_cb, &pipe_ev);
  73. event_add(&pipe_ev, NULL);
  74. if (!connect_servers(server_group))
  75. {
  76. LM_ERR("failed to connect to any servers\n");
  77. return -1;
  78. }
  79. event_dispatch();
  80. return 0;
  81. }
  82. void timeout_cb(int fd, short event, void *arg)
  83. {
  84. LM_ERR("message timeout\n");
  85. jsonrpc_request_t *req = (jsonrpc_request_t*)arg;
  86. json_object *error = json_object_new_string("timeout");
  87. void_jsonrpc_request(req->id);
  88. close(req->timerfd);
  89. event_del(req->timer_ev);
  90. pkg_free(req->timer_ev);
  91. req->cbfunc(error, req->cbdata, 1);
  92. pkg_free(req);
  93. }
  94. int result_cb(json_object *result, char *data, int error)
  95. {
  96. struct jsonrpc_pipe_cmd *cmd = (struct jsonrpc_pipe_cmd*)data;
  97. pv_spec_t *dst = cmd->cb_pv;
  98. pv_value_t val;
  99. const char* res = json_object_get_string(result);
  100. val.rs.s = (char*)res;
  101. val.rs.len = strlen(res);
  102. val.flags = PV_VAL_STR;
  103. dst->setf(0, &dst->pvp, (int)EQ_T, &val);
  104. int n;
  105. if (error) {
  106. n = route_get(&main_rt, cmd->err_route);
  107. } else {
  108. n = route_get(&main_rt, cmd->cb_route);
  109. }
  110. struct action *a = main_rt.rlist[n];
  111. tmb.t_continue(cmd->t_hash, cmd->t_label, a);
  112. free_pipe_cmd(cmd);
  113. return 0;
  114. }
  115. int (*res_cb)(json_object*, char*, int) = &result_cb;
  116. void cmd_pipe_cb(int fd, short event, void *arg)
  117. {
  118. struct jsonrpc_pipe_cmd *cmd;
  119. /* struct event *ev = (struct event*)arg; */
  120. if (read(fd, &cmd, sizeof(cmd)) != sizeof(cmd)) {
  121. LM_ERR("failed to read from command pipe: %s\n", strerror(errno));
  122. return;
  123. }
  124. json_object *params = json_tokener_parse(cmd->params);
  125. json_object *payload = NULL;
  126. jsonrpc_request_t *req = NULL;
  127. if (cmd->notify_only) {
  128. payload = build_jsonrpc_notification(cmd->method, params);
  129. } else {
  130. req = build_jsonrpc_request(cmd->method, params, (char*)cmd, res_cb);
  131. if (req)
  132. payload = req->payload;
  133. }
  134. if (!payload) {
  135. LM_ERR("Failed to build jsonrpc_request_t (method: %s, params: %s)\n", cmd->method, cmd->params);
  136. return;
  137. }
  138. char *json = (char*)json_object_get_string(payload);
  139. char *ns; size_t bytes;
  140. bytes = netstring_encode_new(&ns, json, (size_t)strlen(json));
  141. struct jsonrpc_server_group *g;
  142. int sent = 0;
  143. for (g = server_group; g != NULL; g = g->next_group)
  144. {
  145. struct jsonrpc_server *s, *first = NULL;
  146. for (s = g->next_server; s != first; s = s->next)
  147. {
  148. if (first == NULL) first = s;
  149. if (s->status == JSONRPC_SERVER_CONNECTED) {
  150. if (send(s->socket, ns, bytes, 0) == bytes)
  151. {
  152. sent = 1;
  153. break;
  154. } else {
  155. handle_server_failure(s);
  156. }
  157. }
  158. g->next_server = s->next;
  159. }
  160. if (sent) {
  161. break;
  162. } else {
  163. LM_WARN("Failed to send on priority group %d... proceeding to next priority group.\n", g->priority);
  164. }
  165. }
  166. if (sent && req) {
  167. int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
  168. if (timerfd == -1) {
  169. LM_ERR("Could not create timerfd.");
  170. return;
  171. }
  172. req->timerfd = timerfd;
  173. struct itimerspec *itime = pkg_malloc(sizeof(struct itimerspec));
  174. CHECK_MALLOC_VOID(itime);
  175. itime->it_interval.tv_sec = 0;
  176. itime->it_interval.tv_nsec = 0;
  177. itime->it_value.tv_sec = JSONRPC_TIMEOUT/1000;
  178. itime->it_value.tv_nsec = (JSONRPC_TIMEOUT % 1000) * 1000000;
  179. if (timerfd_settime(timerfd, 0, itime, NULL) == -1)
  180. {
  181. LM_ERR("Could not set timer.");
  182. return;
  183. }
  184. pkg_free(itime);
  185. struct event *timer_ev = pkg_malloc(sizeof(struct event));
  186. CHECK_MALLOC_VOID(timer_ev);
  187. event_set(timer_ev, timerfd, EV_READ, timeout_cb, req);
  188. if(event_add(timer_ev, NULL) == -1) {
  189. LM_ERR("event_add failed while setting request timer (%s).", strerror(errno));
  190. return;
  191. }
  192. req->timer_ev = timer_ev;
  193. } else if (!sent) {
  194. LM_ERR("Request could not be sent... no more failover groups.\n");
  195. if (req) {
  196. json_object *error = json_object_new_string("failure");
  197. void_jsonrpc_request(req->id);
  198. req->cbfunc(error, req->cbdata, 1);
  199. }
  200. }
  201. pkg_free(ns);
  202. json_object_put(payload);
  203. }
  204. void socket_cb(int fd, short event, void *arg)
  205. {
  206. struct jsonrpc_server *server = (struct jsonrpc_server*)arg;
  207. if (event != EV_READ) {
  208. LM_ERR("unexpected socket event (%d)\n", event);
  209. handle_server_failure(server);
  210. return;
  211. }
  212. char *netstring;
  213. int retval = netstring_read_fd(fd, &netstring);
  214. if (retval != 0) {
  215. LM_ERR("bad netstring (%d)\n", retval);
  216. handle_server_failure(server);
  217. return;
  218. }
  219. struct json_object *res = json_tokener_parse(netstring);
  220. if (res) {
  221. handle_jsonrpc_response(res);
  222. json_object_put(res);
  223. } else {
  224. LM_ERR("netstring could not be parsed: (%s)\n", netstring);
  225. handle_server_failure(server);
  226. }
  227. pkg_free(netstring);
  228. }
  229. int set_non_blocking(int fd)
  230. {
  231. int flags;
  232. flags = fcntl(fd, F_GETFL);
  233. if (flags < 0)
  234. return flags;
  235. flags |= O_NONBLOCK;
  236. if (fcntl(fd, F_SETFL, flags) < 0)
  237. return -1;
  238. return 0;
  239. }
  240. int parse_servers(char *_servers, struct jsonrpc_server_group **group_ptr)
  241. {
  242. char cpy[strlen(_servers)+1];
  243. char *servers = strcpy(cpy, _servers);
  244. struct jsonrpc_server_group *group = NULL;
  245. /* parse servers string */
  246. char *token = strtok(servers, ":");
  247. while (token != NULL)
  248. {
  249. char *host, *port_s, *priority_s, *tail;
  250. int port, priority;
  251. host = token;
  252. /* validate domain */
  253. if (!(isalpha(host[0]) || isdigit(host[0]))) {
  254. LM_ERR("invalid domain (1st char is '%c')\n", host[0]);
  255. return -1;
  256. }
  257. int i;
  258. for (i=1; i<strlen(host)-1; i++)
  259. {
  260. if(!(isalpha(host[i]) || isdigit(host[i]) || host[i] == '-' || host[i] == '.'))
  261. {
  262. LM_ERR("invalid domain (char %d is %c)\n", i, host[i]);
  263. return -1;
  264. }
  265. }
  266. if (!(isalpha(host[i]) || isdigit(host[i]))) {
  267. LM_ERR("invalid domain (char %d (last) is %c)\n", i, host[i]);
  268. return -1;
  269. }
  270. /* convert/validate port */
  271. port_s = strtok(NULL, ",");
  272. if (port_s == NULL || !(port = strtol(port_s, &tail, 0)) || strlen(tail))
  273. {
  274. LM_ERR("invalid port: %s\n", port_s);
  275. return -1;
  276. }
  277. /* convert/validate priority */
  278. priority_s = strtok(NULL, " ");
  279. if (priority_s == NULL || !(priority = strtol(priority_s, &tail, 0)) || strlen(tail))
  280. {
  281. LM_ERR("invalid priority: %s\n", priority_s);
  282. return -1;
  283. }
  284. struct jsonrpc_server *server = pkg_malloc(sizeof(struct jsonrpc_server));
  285. CHECK_MALLOC(server);
  286. char *h = pkg_malloc(strlen(host)+1);
  287. CHECK_MALLOC(h);
  288. strcpy(h,host);
  289. server->host = h;
  290. server->port = port;
  291. server->status = JSONRPC_SERVER_DISCONNECTED;
  292. server->socket = 0;
  293. int group_cnt = 0;
  294. /* search for a server group with this server's priority */
  295. struct jsonrpc_server_group *selected_group = NULL;
  296. for (selected_group=group; selected_group != NULL; selected_group=selected_group->next_group)
  297. {
  298. if (selected_group->priority == priority) break;
  299. }
  300. if (selected_group == NULL) {
  301. group_cnt++;
  302. LM_INFO("Creating group for priority %d\n", priority);
  303. /* this is the first server for this priority... link it to itself */
  304. server->next = server;
  305. selected_group = pkg_malloc(sizeof(struct jsonrpc_server_group));
  306. CHECK_MALLOC(selected_group);
  307. selected_group->priority = priority;
  308. selected_group->next_server = server;
  309. /* insert the group properly in the linked list */
  310. struct jsonrpc_server_group *x, *pg;
  311. pg = NULL;
  312. if (group == NULL)
  313. {
  314. group = selected_group;
  315. group->next_group = NULL;
  316. } else {
  317. for (x = group; x != NULL; x = x->next_group)
  318. {
  319. if (priority > x->priority)
  320. {
  321. if (pg == NULL)
  322. {
  323. group = selected_group;
  324. } else {
  325. pg->next_group = selected_group;
  326. }
  327. selected_group->next_group = x;
  328. break;
  329. } else if (x->next_group == NULL) {
  330. x->next_group = selected_group;
  331. break;
  332. } else {
  333. pg = x;
  334. }
  335. }
  336. }
  337. } else {
  338. LM_ERR("Using existing group for priority %d\n", priority);
  339. server->next = selected_group->next_server->next;
  340. selected_group->next_server->next = server;
  341. }
  342. token = strtok(NULL, ":");
  343. }
  344. *group_ptr = group;
  345. return 0;
  346. }
  347. int connect_server(struct jsonrpc_server *server)
  348. {
  349. struct sockaddr_in server_addr;
  350. struct hostent *hp;
  351. server_addr.sin_family = AF_INET;
  352. server_addr.sin_port = htons(server->port);
  353. hp = gethostbyname(server->host);
  354. if (hp == NULL) {
  355. LM_ERR("gethostbyname(%s) failed with h_errno=%d.\n", server->host, h_errno);
  356. handle_server_failure(server);
  357. return -1;
  358. }
  359. memcpy(&(server_addr.sin_addr.s_addr), hp->h_addr, hp->h_length);
  360. int sockfd = socket(AF_INET,SOCK_STREAM,0);
  361. if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr_in))) {
  362. LM_WARN("Failed to connect to %s on port %d... %s\n", server->host, server->port, strerror(errno));
  363. handle_server_failure(server);
  364. return -1;
  365. }
  366. if (set_non_blocking(sockfd) != 0)
  367. {
  368. LM_WARN("Failed to set socket (%s:%d) to non blocking.\n", server->host, server->port);
  369. handle_server_failure(server);
  370. return -1;
  371. }
  372. server->socket = sockfd;
  373. server->status = JSONRPC_SERVER_CONNECTED;
  374. struct event *socket_ev = pkg_malloc(sizeof(struct event));
  375. CHECK_MALLOC(socket_ev);
  376. event_set(socket_ev, sockfd, EV_READ | EV_PERSIST, socket_cb, server);
  377. event_add(socket_ev, NULL);
  378. server->ev = socket_ev;
  379. return 0;
  380. }
  381. int connect_servers(struct jsonrpc_server_group *group)
  382. {
  383. int connected_servers = 0;
  384. for (;group != NULL; group = group->next_group)
  385. {
  386. struct jsonrpc_server *s, *first = NULL;
  387. LM_INFO("Connecting to servers for priority %d:\n", group->priority);
  388. for (s=group->next_server;s!=first;s=s->next)
  389. {
  390. if (connect_server(s) == 0)
  391. {
  392. connected_servers++;
  393. LM_INFO("Connected to host %s on port %d\n", s->host, s->port);
  394. }
  395. if (first == NULL) first = s;
  396. }
  397. }
  398. return connected_servers;
  399. }
  400. void reconnect_cb(int fd, short event, void *arg)
  401. {
  402. LM_INFO("Attempting to reconnect now.");
  403. struct jsonrpc_server *server = (struct jsonrpc_server*)arg;
  404. if (server->status == JSONRPC_SERVER_CONNECTED) {
  405. LM_WARN("Trying to connect an already connected server.");
  406. return;
  407. }
  408. if (server->ev != NULL) {
  409. event_del(server->ev);
  410. pkg_free(server->ev);
  411. server->ev = NULL;
  412. }
  413. close(fd);
  414. pkg_free(server->timer);
  415. connect_server(server);
  416. }
  417. int handle_server_failure(struct jsonrpc_server *server)
  418. {
  419. LM_INFO("Setting timer to reconnect to %s on port %d in %d seconds.\n", server->host, server->port, JSONRPC_RECONNECT_INTERVAL);
  420. if (server->socket)
  421. close(server->socket);
  422. server->socket = 0;
  423. if (server->ev != NULL) {
  424. event_del(server->ev);
  425. pkg_free(server->ev);
  426. server->ev = NULL;
  427. }
  428. server->status = JSONRPC_SERVER_FAILURE;
  429. int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
  430. if (timerfd == -1) {
  431. LM_ERR("Could not create timerfd to reschedule connection. No further attempts will be made to reconnect this server.");
  432. return -1;
  433. }
  434. struct itimerspec *itime = pkg_malloc(sizeof(struct itimerspec));
  435. CHECK_MALLOC(itime);
  436. itime->it_interval.tv_sec = 0;
  437. itime->it_interval.tv_nsec = 0;
  438. itime->it_value.tv_sec = JSONRPC_RECONNECT_INTERVAL;
  439. itime->it_value.tv_nsec = 0;
  440. if (timerfd_settime(timerfd, 0, itime, NULL) == -1)
  441. {
  442. LM_ERR("Could not set timer to reschedule connection. No further attempts will be made to reconnect this server.");
  443. return -1;
  444. }
  445. LM_INFO("timerfd value is %d\n", timerfd);
  446. struct event *timer_ev = pkg_malloc(sizeof(struct event));
  447. CHECK_MALLOC(timer_ev);
  448. event_set(timer_ev, timerfd, EV_READ, reconnect_cb, server);
  449. if(event_add(timer_ev, NULL) == -1) {
  450. LM_ERR("event_add failed while rescheduling connection (%s). No further attempts will be made to reconnect this server.", strerror(errno));
  451. return -1;
  452. }
  453. server->ev = timer_ev;
  454. server->timer = itime;
  455. return 0;
  456. }
  457. void free_pipe_cmd(struct jsonrpc_pipe_cmd *cmd)
  458. {
  459. if (cmd->method)
  460. shm_free(cmd->method);
  461. if (cmd->params)
  462. shm_free(cmd->params);
  463. if (cmd->cb_route)
  464. shm_free(cmd->cb_route);
  465. if (cmd->err_route)
  466. shm_free(cmd->err_route);
  467. if (cmd->cb_pv)
  468. shm_free(cmd->cb_pv);
  469. shm_free(cmd);
  470. }