mqueue_api.c 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499
  1. /**
  2. * $Id$
  3. *
  4. * Copyright (C) 2010 Elena-Ramona Modroiu (asipto.com)
  5. *
  6. * This file is part of Kamailio, a free SIP server.
  7. *
  8. * This file is free software; you can redistribute it and/or modify
  9. * it under the terms of the GNU General Public License as published by
  10. * the Free Software Foundation; either version 2 of the License, or
  11. * (at your option) any later version
  12. *
  13. * This file is distributed in the hope that it will be useful,
  14. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. * GNU General Public License for more details.
  17. *
  18. * You should have received a copy of the GNU General Public License
  19. * along with this program; if not, write to the Free Software
  20. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  21. *
  22. */
  23. #include <stdio.h>
  24. #include <unistd.h>
  25. #include <stdlib.h>
  26. #include <string.h>
  27. #include "../../dprint.h"
  28. #include "../../mem/mem.h"
  29. #include "../../mem/shm_mem.h"
  30. #include "../../parser/parse_param.h"
  31. #include "../../ut.h"
  32. #include "../../shm_init.h"
  33. #include "../../lib/kcore/faked_msg.h"
  34. #include "mqueue_api.h"
  35. /**
  36. *
  37. */
  38. typedef struct _mq_item
  39. {
  40. str key;
  41. str val;
  42. struct _mq_item *prev;
  43. struct _mq_item *next;
  44. } mq_item_t;
  45. /**
  46. *
  47. */
  48. typedef struct _mq_head
  49. {
  50. str name;
  51. int msize;
  52. int csize;
  53. gen_lock_t lock;
  54. mq_item_t *ifirst;
  55. mq_item_t *ilast;
  56. struct _mq_head *next;
  57. } mq_head_t;
  58. /**
  59. *
  60. */
  61. typedef struct _mq_pv
  62. {
  63. str *name;
  64. mq_item_t *item;
  65. struct _mq_pv *next;
  66. } mq_pv_t;
  67. /**
  68. *
  69. */
  70. static mq_head_t *_mq_head_list = NULL;
  71. /**
  72. *
  73. */
  74. static mq_pv_t *_mq_pv_list = NULL;
  75. /**
  76. *
  77. */
  78. int mq_head_defined(void)
  79. {
  80. if(_mq_head_list!=NULL)
  81. return 1;
  82. return 0;
  83. }
  84. /**
  85. *
  86. */
  87. void mq_destroy(void)
  88. {
  89. mq_head_t *mh = NULL;
  90. mq_pv_t *mp = NULL;
  91. mq_item_t *mi = NULL;
  92. mq_head_t *mh1 = NULL;
  93. mq_pv_t *mp1 = NULL;
  94. mq_item_t *mi1 = NULL;
  95. mh = _mq_head_list;
  96. while(mh!=NULL)
  97. {
  98. mi = mh->ifirst;
  99. while(mi!=NULL)
  100. {
  101. mi1 = mi;
  102. mi = mi->next;
  103. shm_free(mi1);
  104. }
  105. mh1 = mh;
  106. mh = mh->next;
  107. lock_destroy(&mh1->lock);
  108. shm_free(mh1);
  109. }
  110. _mq_head_list = 0;
  111. mp = _mq_pv_list;
  112. while(mp!=NULL)
  113. {
  114. mp1 = mp;
  115. mp = mp->next;
  116. pkg_free(mp1);
  117. }
  118. }
  119. /**
  120. *
  121. */
  122. int mq_head_add(str *name, int msize)
  123. {
  124. mq_head_t *mh = NULL;
  125. mq_pv_t *mp = NULL;
  126. int len;
  127. if(!shm_initialized())
  128. {
  129. LM_ERR("shm not intialized - cannot define mqueue now\n");
  130. return 0;
  131. }
  132. mh = _mq_head_list;
  133. while(mh!=NULL)
  134. {
  135. if(name->len == mh->name.len
  136. && strncmp(mh->name.s, name->s, name->len)==0)
  137. {
  138. LM_ERR("mqueue redefined: %.*s\n", name->len, name->s);
  139. return -1;
  140. }
  141. mh = mh->next;
  142. }
  143. mp = (mq_pv_t*)pkg_malloc(sizeof(mq_pv_t));
  144. if(mp==NULL)
  145. {
  146. LM_ERR("no more pkg for: %.*s\n", name->len, name->s);
  147. return -1;
  148. }
  149. memset(mp, 0, sizeof(mq_pv_t));
  150. len = sizeof(mq_head_t) + name->len + 1;
  151. mh = (mq_head_t*)shm_malloc(len);
  152. if(mh==NULL)
  153. {
  154. LM_ERR("no more shm for: %.*s\n", name->len, name->s);
  155. pkg_free(mp);
  156. return -1;
  157. }
  158. memset(mh, 0, len);
  159. if (lock_init(&mh->lock)==0 )
  160. {
  161. LM_CRIT("failed to init lock\n");
  162. pkg_free(mp);
  163. shm_free(mh);
  164. return -1;
  165. }
  166. mh->name.s = (char*)mh + sizeof(mq_head_t);
  167. memcpy(mh->name.s, name->s, name->len);
  168. mh->name.len = name->len;
  169. mh->name.s[name->len] = '\0';
  170. mh->msize = msize;
  171. mh->next = _mq_head_list;
  172. _mq_head_list = mh;
  173. mp->name = &mh->name;
  174. mp->next = _mq_pv_list;
  175. _mq_pv_list = mp;
  176. return 0;
  177. }
  178. /**
  179. *
  180. */
  181. mq_head_t *mq_head_get(str *name)
  182. {
  183. mq_head_t *mh = NULL;
  184. mh = _mq_head_list;
  185. while(mh!=NULL)
  186. {
  187. if(name->len == mh->name.len
  188. && strncmp(mh->name.s, name->s, name->len)==0)
  189. {
  190. return mh;
  191. }
  192. mh = mh->next;
  193. }
  194. return NULL;
  195. }
  196. /**
  197. *
  198. */
  199. mq_pv_t *mq_pv_get(str *name)
  200. {
  201. mq_pv_t *mp = NULL;
  202. mp = _mq_pv_list;
  203. while(mp!=NULL)
  204. {
  205. if(mp->name->len==name->len
  206. && strncmp(mp->name->s, name->s, name->len)==0)
  207. return mp;
  208. mp = mp->next;
  209. }
  210. return NULL;
  211. }
  212. /**
  213. *
  214. */
  215. int mq_head_fetch(str *name)
  216. {
  217. mq_head_t *mh = NULL;
  218. mq_pv_t *mp = NULL;
  219. mp = mq_pv_get(name);
  220. if(mp==NULL)
  221. return -1;
  222. if(mp->item!=NULL)
  223. {
  224. shm_free(mp->item);
  225. mp->item = NULL;
  226. }
  227. mh = mq_head_get(name);
  228. if(mh==NULL)
  229. return -1;
  230. lock_get(&mh->lock);
  231. if(mh->ifirst==NULL)
  232. {
  233. /* empty queue */
  234. lock_release(&mh->lock);
  235. return -2;
  236. }
  237. mp->item = mh->ifirst;
  238. mh->ifirst = mh->ifirst->next;
  239. if(mh->ifirst==NULL) {
  240. mh->ilast = NULL;
  241. } else {
  242. mh->ifirst->prev = NULL;
  243. }
  244. mh->csize--;
  245. lock_release(&mh->lock);
  246. return 0;
  247. }
  248. /**
  249. *
  250. */
  251. void mq_pv_free(str *name)
  252. {
  253. mq_pv_t *mp = NULL;
  254. mp = mq_pv_get(name);
  255. if(mp==NULL)
  256. return;
  257. if(mp->item!=NULL)
  258. {
  259. shm_free(mp->item);
  260. mp->item = NULL;
  261. }
  262. }
  263. /**
  264. *
  265. */
  266. int mq_item_add(str *qname, str *key, str *val)
  267. {
  268. mq_head_t *mh = NULL;
  269. mq_item_t *mi = NULL;
  270. int len;
  271. mh = mq_head_get(qname);
  272. if(mh==NULL)
  273. {
  274. LM_ERR("mqueue not found: %.*s\n", qname->len, qname->s);
  275. return -1;
  276. }
  277. len = sizeof(mq_item_t) + key->len + val->len + 2;
  278. mi = (mq_item_t*)shm_malloc(len);
  279. if(mi==NULL)
  280. {
  281. LM_ERR("no more shm to add to: %.*s\n", qname->len, qname->s);
  282. return -1;
  283. }
  284. memset(mi, 0, len);
  285. mi->key.s = (char*)mi + sizeof(mq_item_t);
  286. memcpy(mi->key.s, key->s, key->len);
  287. mi->key.len = key->len;
  288. mi->key.s[key->len] = '\0';
  289. mi->val.s = mi->key.s + mi->key.len + 1;
  290. memcpy(mi->val.s, val->s, val->len);
  291. mi->val.len = val->len;
  292. mi->val.s[val->len] = '\0';
  293. lock_get(&mh->lock);
  294. if(mh->ifirst==NULL)
  295. {
  296. mh->ifirst = mi;
  297. mh->ilast = mi;
  298. } else {
  299. mh->ilast->next = mi;
  300. mi->prev = mh->ilast;
  301. mh->ilast = mi;
  302. }
  303. mh->csize++;
  304. if(mh->msize>0 && mh->csize>mh->msize)
  305. {
  306. mi = mh->ifirst;
  307. mh->ifirst = mh->ifirst->next;
  308. if(mh->ifirst==NULL)
  309. mh->ilast = NULL;
  310. else
  311. mh->ifirst->prev = NULL;
  312. mh->csize--;
  313. shm_free(mi);
  314. }
  315. lock_release(&mh->lock);
  316. return 0;
  317. }
  318. /**
  319. *
  320. */
  321. int pv_parse_mq_name(pv_spec_t *sp, str *in)
  322. {
  323. sp->pvp.pvn.u.isname.name.s = *in;
  324. sp->pvp.pvn.type = PV_NAME_INTSTR;
  325. sp->pvp.pvn.u.isname.type = 1;
  326. return 0;
  327. }
  328. str *pv_get_mq_name(sip_msg_t *msg, str *in)
  329. {
  330. static str queue;
  331. pv_spec_t *pvs;
  332. pv_value_t pvv;
  333. if (in->s[0] != '$')
  334. return in;
  335. else
  336. {
  337. if (pv_locate_name(in) != in->len)
  338. {
  339. LM_ERR("invalid pv [%.*s]\n", in->len, in->s);
  340. return NULL;
  341. }
  342. if ((pvs = pv_cache_get(in)) == NULL)
  343. {
  344. LM_ERR("failed to get pv spec for [%.*s]\n", in->len, in->s);
  345. return NULL;
  346. }
  347. memset(&pvv, 0, sizeof(pv_value_t));
  348. if (msg==NULL && faked_msg_init() < 0)
  349. {
  350. LM_ERR("faked_msg_init() failed\n");
  351. return NULL;
  352. }
  353. if (pv_get_spec_value((msg)?msg:faked_msg_next(), pvs, &pvv) != 0)
  354. {
  355. LM_ERR("failed to get pv value for [%.*s]\n", in->len, in->s);
  356. return NULL;
  357. }
  358. queue = pvv.rs;
  359. }
  360. return &queue;
  361. }
  362. /**
  363. *
  364. */
  365. int pv_get_mqk(struct sip_msg *msg, pv_param_t *param,
  366. pv_value_t *res)
  367. {
  368. mq_pv_t *mp = NULL;
  369. str *in = pv_get_mq_name(msg, &param->pvn.u.isname.name.s);
  370. if (in == NULL)
  371. {
  372. LM_ERR("failed to get mq name\n");
  373. return -1;
  374. }
  375. if (mq_head_get(in) == NULL)
  376. {
  377. LM_ERR("mqueue not found: %.*s\n", in->len, in->s);
  378. return -1;
  379. }
  380. mp = mq_pv_get(in);
  381. if(mp==NULL || mp->item==NULL || mp->item->key.len<=0)
  382. return pv_get_null(msg, param, res);
  383. return pv_get_strval(msg, param, res, &mp->item->key);
  384. }
  385. /**
  386. *
  387. */
  388. int pv_get_mqv(struct sip_msg *msg, pv_param_t *param,
  389. pv_value_t *res)
  390. {
  391. mq_pv_t *mp = NULL;
  392. str *in = pv_get_mq_name(msg, &param->pvn.u.isname.name.s);
  393. if (in == NULL)
  394. {
  395. LM_ERR("failed to get mq name\n");
  396. return -1;
  397. }
  398. if (mq_head_get(in) == NULL)
  399. {
  400. LM_ERR("mqueue not found: %.*s\n", in->len, in->s);
  401. return -1;
  402. }
  403. mp = mq_pv_get(in);
  404. if(mp==NULL || mp->item==NULL || mp->item->val.len<=0)
  405. return pv_get_null(msg, param, res);
  406. return pv_get_strval(msg, param, res, &mp->item->val);
  407. }
  408. /**
  409. *
  410. */
  411. int pv_get_mq_size(struct sip_msg *msg, pv_param_t *param,
  412. pv_value_t *res)
  413. {
  414. int mqs = -1;
  415. str *in = pv_get_mq_name(msg, &param->pvn.u.isname.name.s);
  416. if (in == NULL)
  417. {
  418. LM_ERR("failed to get mq name\n");
  419. return -1;
  420. }
  421. mqs = _mq_get_csize(in);
  422. if (mqs < 0)
  423. {
  424. LM_ERR("mqueue not found: %.*s\n", in->len, in->s);
  425. return -1;
  426. }
  427. return pv_get_sintval(msg, param, res, mqs);
  428. }
  429. /**
  430. * Return head->csize for a given queue
  431. */
  432. int _mq_get_csize(str *name)
  433. {
  434. mq_head_t *mh = mq_head_get(name);
  435. int mqueue_size = 0;
  436. if(mh == NULL)
  437. return -1;
  438. lock_get(&mh->lock);
  439. mqueue_size = mh->csize;
  440. lock_release(&mh->lock);
  441. return mqueue_size;
  442. }