async_http.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456
  1. /**
  2. * Copyright 2016 (C) Federico Cabiddu <[email protected]>
  3. * Copyright 2016 (C) Giacomo Vacca <[email protected]>
  4. * Copyright 2016 (C) Orange - Camille Oudot <[email protected]>
  5. *
  6. * This file is part of Kamailio, a free SIP server.
  7. *
  8. * This file is free software; you can redistribute it and/or modify
  9. * it under the terms of the GNU General Public License as published by
  10. * the Free Software Foundation; either version 2 of the License, or
  11. * (at your option) any later version
  12. *
  13. *
  14. * This file is distributed in the hope that it will be useful,
  15. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  16. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  17. * GNU General Public License for more details.
  18. *
  19. * You should have received a copy of the GNU General Public License
  20. * along with this program; if not, write to the Free Software
  21. * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
  22. *
  23. */
/*! \file
 * \brief Kamailio http_async_client :: Asynchronous HTTP worker and query dispatch implementation
 * \ingroup http_async_client
 */
  28. #include <stdio.h>
  29. #include <unistd.h>
  30. #include <stdlib.h>
  31. #include <string.h>
  32. #include <sys/socket.h>
  33. #include <sys/types.h>
  34. #include <netinet/in.h>
  35. #include <arpa/inet.h>
  36. #include <fcntl.h>
  37. #include <event2/event.h>
  38. #include "../../sr_module.h"
  39. #include "../../dprint.h"
  40. #include "../../ut.h"
  41. #include "../../cfg/cfg_struct.h"
  42. #include "../../lib/kcore/faked_msg.h"
  43. #include "../../modules/tm/tm_load.h"
  44. #include "async_http.h"
  45. /* tm */
  46. extern struct tm_binds tmb;
  47. struct sip_msg *ah_reply = NULL;
  48. str ah_error = {NULL, 0};
  49. async_http_worker_t *workers;
  50. int num_workers = 1;
  51. struct query_params ah_params;
  52. int async_http_init_worker(int prank, async_http_worker_t* worker)
  53. {
  54. LM_DBG("initializing worker process: %d\n", prank);
  55. worker->evbase = event_base_new();
  56. LM_DBG("base event %p created\n", worker->evbase);
  57. worker->g = shm_malloc(sizeof(struct http_m_global));
  58. memset(worker->g, 0, sizeof(http_m_global_t));
  59. LM_DBG("initialized global struct %p\n", worker->g);
  60. init_socket(worker);
  61. LM_INFO("started worker process: %d\n", prank);
  62. return 0;
  63. }
  64. void async_http_run_worker(async_http_worker_t* worker)
  65. {
  66. init_http_multi(worker->evbase, worker->g);
  67. event_base_dispatch(worker->evbase);
  68. }
  69. int async_http_init_sockets(async_http_worker_t *worker)
  70. {
  71. if (socketpair(PF_UNIX, SOCK_DGRAM, 0, worker->notication_socket) < 0) {
  72. LM_ERR("opening tasks dgram socket pair\n");
  73. return -1;
  74. }
  75. LM_INFO("inter-process event notification sockets initialized\n");
  76. return 0;
  77. }
  78. void async_http_cb(struct http_m_reply *reply, void *param)
  79. {
  80. async_query_t *aq;
  81. cfg_action_t *act;
  82. unsigned int tindex;
  83. unsigned int tlabel;
  84. struct cell *t = NULL;
  85. sip_msg_t *fmsg;
  86. if (reply->result != NULL) {
  87. LM_DBG("query result = %.*s [%d]\n", reply->result->len, reply->result->s, reply->result->len);
  88. }
  89. /* clean process-local result variables */
  90. ah_error.s = NULL;
  91. ah_error.len = 0;
  92. memset(ah_reply, 0, sizeof(struct sip_msg));
  93. /* set process-local result variables */
  94. if (reply->result == NULL) {
  95. /* error */
  96. ah_error.s = reply->error;
  97. ah_error.len = strlen(ah_error.s);
  98. } else {
  99. /* success */
  100. ah_reply->buf = reply->result->s;
  101. ah_reply->len = reply->result->len;
  102. if (parse_msg(reply->result->s, reply->result->len, ah_reply) != 0) {
  103. LM_DBG("failed to parse the http_reply\n");
  104. } else {
  105. LM_DBG("successfully parsed http reply %p\n", ah_reply);
  106. }
  107. }
  108. aq = param;
  109. act = (cfg_action_t*)aq->param;
  110. if (aq->query_params.suspend_transaction) {
  111. tindex = aq->tindex;
  112. tlabel = aq->tlabel;
  113. if (tmb.t_lookup_ident(&t, tindex, tlabel) < 0) {
  114. LM_ERR("transaction not found %d:%d\n", tindex, tlabel);
  115. LM_DBG("freeing query %p\n", aq);
  116. free_async_query(aq);
  117. return;
  118. }
  119. // we bring the list of AVPs of the transaction to the current context
  120. set_avp_list(AVP_TRACK_FROM | AVP_CLASS_URI, &t->uri_avps_from);
  121. set_avp_list(AVP_TRACK_TO | AVP_CLASS_URI, &t->uri_avps_to);
  122. set_avp_list(AVP_TRACK_FROM | AVP_CLASS_USER, &t->user_avps_from);
  123. set_avp_list(AVP_TRACK_TO | AVP_CLASS_USER, &t->user_avps_to);
  124. set_avp_list(AVP_TRACK_FROM | AVP_CLASS_DOMAIN, &t->domain_avps_from);
  125. set_avp_list(AVP_TRACK_TO | AVP_CLASS_DOMAIN, &t->domain_avps_to);
  126. if (t)
  127. tmb.unref_cell(t);
  128. LM_DBG("resuming transaction (%d:%d)\n", tindex, tlabel);
  129. if(act!=NULL)
  130. tmb.t_continue(tindex, tlabel, act);
  131. } else {
  132. fmsg = faked_msg_next();
  133. if (run_top_route(act, fmsg, 0)<0)
  134. LM_ERR("failure inside run_top_route\n");
  135. }
  136. free_sip_msg(ah_reply);
  137. free_async_query(aq);
  138. return;
  139. }
  140. void notification_socket_cb(int fd, short event, void *arg)
  141. {
  142. (void)fd; /* unused */
  143. (void)event; /* unused */
  144. const async_http_worker_t *worker = (async_http_worker_t *) arg;
  145. int received;
  146. int i;
  147. async_query_t *aq;
  148. http_m_params_t query_params;
  149. str query;
  150. str post;
  151. if ((received = recvfrom(worker->notication_socket[0],
  152. &aq, sizeof(async_query_t*),
  153. 0, NULL, 0)) < 0) {
  154. LM_ERR("failed to read from socket (%d: %s)\n", errno, strerror(errno));
  155. return;
  156. }
  157. if(received != sizeof(async_query_t*)) {
  158. LM_ERR("invalid query size %d\n", received);
  159. return;
  160. }
  161. query = ((str)aq->query);
  162. post = ((str)aq->post);
  163. query_params.timeout = aq->query_params.timeout;
  164. query_params.verify_peer = aq->query_params.verify_peer;
  165. query_params.verify_host = aq->query_params.verify_host;
  166. query_params.headers = NULL;
  167. for (i = 0 ; i < aq->query_params.headers.len ; i++) {
  168. query_params.headers = curl_slist_append(query_params.headers, aq->query_params.headers.t[i]);
  169. }
  170. query_params.method = aq->query_params.method;
  171. query_params.ssl_cert.s = NULL;
  172. query_params.ssl_cert.len = 0;
  173. if (aq->query_params.ssl_cert.s && aq->query_params.ssl_cert.len > 0) {
  174. if (shm_str_dup(&query_params.ssl_cert, &(aq->query_params.ssl_cert)) < 0) {
  175. LM_ERR("Error allocating query_params.ssl_cert\n");
  176. return;
  177. }
  178. }
  179. query_params.ssl_key.s = NULL;
  180. query_params.ssl_key.len = 0;
  181. if (aq->query_params.ssl_key.s && aq->query_params.ssl_key.len > 0) {
  182. if (shm_str_dup(&query_params.ssl_key, &(aq->query_params.ssl_key)) < 0) {
  183. LM_ERR("Error allocating query_params.ssl_key\n");
  184. return;
  185. }
  186. }
  187. query_params.ca_path.s = NULL;
  188. query_params.ca_path.len = 0;
  189. if (aq->query_params.ca_path.s && aq->query_params.ca_path.len > 0) {
  190. if (shm_str_dup(&query_params.ca_path, &(aq->query_params.ca_path)) < 0) {
  191. LM_ERR("Error allocating query_params.ca_path\n");
  192. return;
  193. }
  194. }
  195. LM_DBG("query received: [%.*s] (%p)\n", query.len, query.s, aq);
  196. if (new_request(&query, &post, &query_params, async_http_cb, aq) < 0) {
  197. LM_ERR("Cannot create request for %.*s\n", query.len, query.s);
  198. free_async_query(aq);
  199. }
  200. if (query_params.ssl_cert.s && query_params.ssl_cert.len > 0) {
  201. shm_free(query_params.ssl_cert.s);
  202. query_params.ssl_cert.s = NULL;
  203. query_params.ssl_cert.len = 0;
  204. }
  205. if (query_params.ssl_key.s && query_params.ssl_key.len > 0) {
  206. shm_free(query_params.ssl_key.s);
  207. query_params.ssl_key.s = NULL;
  208. query_params.ssl_key.len = 0;
  209. }
  210. if (query_params.ca_path.s && query_params.ca_path.len > 0) {
  211. shm_free(query_params.ca_path.s);
  212. query_params.ca_path.s = NULL;
  213. query_params.ca_path.len = 0;
  214. }
  215. return;
  216. }
  217. int init_socket(async_http_worker_t *worker)
  218. {
  219. worker->socket_event = event_new(worker->evbase, worker->notication_socket[0], EV_READ|EV_PERSIST, notification_socket_cb, worker);
  220. event_add(worker->socket_event, NULL);
  221. return (0);
  222. }
  223. int async_send_query(sip_msg_t *msg, str *query, str *post, cfg_action_t *act)
  224. {
  225. async_query_t *aq;
  226. unsigned int tindex = 0;
  227. unsigned int tlabel = 0;
  228. short suspend = 0;
  229. int dsize;
  230. tm_cell_t *t = 0;
  231. if(query==0) {
  232. LM_ERR("invalid parameters\n");
  233. return -1;
  234. }
  235. t = tmb.t_gett();
  236. if (t==NULL || t==T_UNDEFINED) {
  237. LM_DBG("no pre-existing transaction, switching to transaction-less behavior\n");
  238. } else if (!ah_params.suspend_transaction) {
  239. LM_DBG("transaction won't be suspended\n");
  240. } else {
  241. if(tmb.t_suspend==NULL) {
  242. LM_ERR("http async query is disabled - tm module not loaded\n");
  243. return -1;
  244. }
  245. if(tmb.t_suspend(msg, &tindex, &tlabel)<0) {
  246. LM_ERR("failed to suspend request processing\n");
  247. return -1;
  248. }
  249. suspend = 1;
  250. LM_DBG("transaction suspended [%u:%u]\n", tindex, tlabel);
  251. }
  252. dsize = sizeof(async_query_t);
  253. aq = (async_query_t*)shm_malloc(dsize);
  254. if(aq==NULL)
  255. {
  256. LM_ERR("no more shm\n");
  257. goto error;
  258. }
  259. memset(aq,0,dsize);
  260. if(shm_str_dup(&aq->query, query)<0) {
  261. goto error;
  262. }
  263. if (post != NULL) {
  264. if(shm_str_dup(&aq->post, post)<0) {
  265. goto error;
  266. }
  267. }
  268. aq->param = act;
  269. aq->tindex = tindex;
  270. aq->tlabel = tlabel;
  271. aq->query_params.verify_peer = ah_params.verify_peer;
  272. aq->query_params.verify_host = ah_params.verify_host;
  273. aq->query_params.suspend_transaction = suspend;
  274. aq->query_params.timeout = ah_params.timeout;
  275. aq->query_params.headers = ah_params.headers;
  276. aq->query_params.method = ah_params.method;
  277. aq->query_params.ssl_cert.s = NULL;
  278. aq->query_params.ssl_cert.len = 0;
  279. if (ah_params.ssl_cert.s && ah_params.ssl_cert.len > 0) {
  280. if (shm_str_dup(&aq->query_params.ssl_cert, &(ah_params.ssl_cert)) < 0) {
  281. LM_ERR("Error allocating aq->query_params.ssl_cert\n");
  282. goto error;
  283. }
  284. }
  285. aq->query_params.ssl_key.s = NULL;
  286. aq->query_params.ssl_key.len = 0;
  287. if (ah_params.ssl_key.s && ah_params.ssl_key.len > 0) {
  288. if (shm_str_dup(&aq->query_params.ssl_key, &(ah_params.ssl_key)) < 0) {
  289. LM_ERR("Error allocating aq->query_params.ssl_key\n");
  290. goto error;
  291. }
  292. }
  293. aq->query_params.ca_path.s = NULL;
  294. aq->query_params.ca_path.len = 0;
  295. if (ah_params.ca_path.s && ah_params.ca_path.len > 0) {
  296. if (shm_str_dup(&aq->query_params.ca_path, &(ah_params.ca_path)) < 0) {
  297. LM_ERR("Error allocating aq->query_params.ca_path\n");
  298. goto error;
  299. }
  300. }
  301. set_query_params(&ah_params);
  302. if(async_push_query(aq)<0) {
  303. LM_ERR("failed to relay query: %.*s\n", query->len, query->s);
  304. goto error;
  305. }
  306. if (suspend) {
  307. /* force exit in config */
  308. return 0;
  309. }
  310. /* continue route processing */
  311. return 1;
  312. error:
  313. if (suspend) {
  314. tmb.t_cancel_suspend(tindex, tlabel);
  315. }
  316. free_async_query(aq);
  317. return -1;
  318. }
  319. int async_push_query(async_query_t *aq)
  320. {
  321. int len;
  322. int worker;
  323. static unsigned long rr = 0; /* round robin */
  324. str query;
  325. query = ((str)aq->query);
  326. worker = rr++ % num_workers;
  327. len = write(workers[worker].notication_socket[1], &aq, sizeof(async_query_t*));
  328. if(len<=0) {
  329. LM_ERR("failed to pass the query to async workers\n");
  330. return -1;
  331. }
  332. LM_DBG("query sent [%.*s] (%p) to worker %d\n", query.len, query.s, aq, worker + 1);
  333. return 0;
  334. }
  335. void init_query_params(struct query_params *p) {
  336. memset(&ah_params, 0, sizeof(struct query_params));
  337. set_query_params(p);
  338. }
  339. void set_query_params(struct query_params *p) {
  340. p->headers.len = 0;
  341. p->headers.t = NULL;
  342. p->verify_host = verify_host;
  343. p->verify_peer = verify_peer;
  344. p->suspend_transaction = 1;
  345. p->timeout = http_timeout;
  346. p->method = AH_METH_DEFAULT;
  347. if (p->ssl_cert.s && p->ssl_cert.len > 0) {
  348. shm_free(p->ssl_cert.s);
  349. p->ssl_cert.s = NULL;
  350. p->ssl_cert.len = 0;
  351. }
  352. if (ssl_cert.s && ssl_cert.len > 0) {
  353. if (shm_str_dup(&p->ssl_cert, &ssl_cert) < 0) {
  354. LM_ERR("Error allocating ssl_cert\n");
  355. return;
  356. }
  357. }
  358. if (p->ssl_key.s && p->ssl_key.len > 0) {
  359. shm_free(p->ssl_key.s);
  360. p->ssl_key.s = NULL;
  361. p->ssl_key.len = 0;
  362. }
  363. if (ssl_key.s && ssl_key.len > 0) {
  364. if (shm_str_dup(&p->ssl_key, &ssl_key) < 0) {
  365. LM_ERR("Error allocating ssl_key\n");
  366. return;
  367. }
  368. }
  369. if (p->ca_path.s && p->ca_path.len > 0) {
  370. shm_free(p->ca_path.s);
  371. p->ca_path.s = NULL;
  372. p->ca_path.len = 0;
  373. }
  374. if (ca_path.s && ca_path.len > 0) {
  375. if (shm_str_dup(&p->ca_path, &ca_path) < 0) {
  376. LM_ERR("Error allocating ca_path\n");
  377. return;
  378. }
  379. }
  380. }