mqueue_api.c 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413
  1. /**
  2. * $Id$
  3. *
  4. * Copyright (C) 2010 Elena-Ramona Modroiu (asipto.com)
  5. *
  6. * This file is part of Kamailio, a free SIP server.
  7. *
  8. * This file is free software; you can redistribute it and/or modify
  9. * it under the terms of the GNU General Public License as published by
  10. * the Free Software Foundation; either version 2 of the License, or
  11. * (at your option) any later version
  12. *
  13. * This file is distributed in the hope that it will be useful,
  14. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. * GNU General Public License for more details.
  17. *
  18. * You should have received a copy of the GNU General Public License
  19. * along with this program; if not, write to the Free Software
  20. * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
  21. *
  22. */
  23. #include <stdio.h>
  24. #include <unistd.h>
  25. #include <stdlib.h>
  26. #include <string.h>
  27. #include "../../dprint.h"
  28. #include "../../mem/mem.h"
  29. #include "../../mem/shm_mem.h"
  30. #include "../../parser/parse_param.h"
  31. #include "../../ut.h"
  32. #include "../../shm_init.h"
  33. #include "mqueue_api.h"
  34. /**
  35. *
  36. */
  37. typedef struct _mq_item
  38. {
  39. str key;
  40. str val;
  41. struct _mq_item *prev;
  42. struct _mq_item *next;
  43. } mq_item_t;
  44. /**
  45. *
  46. */
  47. typedef struct _mq_head
  48. {
  49. str name;
  50. int msize;
  51. int csize;
  52. gen_lock_t lock;
  53. mq_item_t *ifirst;
  54. mq_item_t *ilast;
  55. struct _mq_head *next;
  56. } mq_head_t;
  57. /**
  58. *
  59. */
  60. typedef struct _mq_pv
  61. {
  62. str *name;
  63. mq_item_t *item;
  64. struct _mq_pv *next;
  65. } mq_pv_t;
  66. /**
  67. *
  68. */
  69. static mq_head_t *_mq_head_list = NULL;
  70. /**
  71. *
  72. */
  73. static mq_pv_t *_mq_pv_list = NULL;
  74. /**
  75. *
  76. */
  77. int mq_head_defined(void)
  78. {
  79. if(_mq_head_list!=NULL)
  80. return 1;
  81. return 0;
  82. }
  83. /**
  84. *
  85. */
  86. void mq_destroy(void)
  87. {
  88. mq_head_t *mh = NULL;
  89. mq_pv_t *mp = NULL;
  90. mq_item_t *mi = NULL;
  91. mq_head_t *mh1 = NULL;
  92. mq_pv_t *mp1 = NULL;
  93. mq_item_t *mi1 = NULL;
  94. mh = _mq_head_list;
  95. while(mh!=NULL)
  96. {
  97. mi = mh->ifirst;
  98. while(mi!=NULL)
  99. {
  100. mi1 = mi;
  101. mi = mi->next;
  102. shm_free(mi1);
  103. }
  104. mh1 = mh;
  105. mh = mh->next;
  106. lock_destroy(&mh1->lock);
  107. shm_free(mh1);
  108. }
  109. _mq_head_list = 0;
  110. mp = _mq_pv_list;
  111. while(mp!=NULL)
  112. {
  113. mp1 = mp;
  114. mp = mp->next;
  115. pkg_free(mp1);
  116. }
  117. }
  118. /**
  119. *
  120. */
  121. int mq_head_add(str *name, int msize)
  122. {
  123. mq_head_t *mh = NULL;
  124. mq_pv_t *mp = NULL;
  125. int len;
  126. if(!shm_initialized())
  127. {
  128. LM_ERR("shm not intialized - cannot define mqueue now\n");
  129. return 0;
  130. }
  131. mh = _mq_head_list;
  132. while(mh!=NULL)
  133. {
  134. if(name->len == mh->name.len
  135. && strncmp(mh->name.s, name->s, name->len)==0)
  136. {
  137. LM_ERR("mqueue redefined: %.*s\n", name->len, name->s);
  138. return -1;
  139. }
  140. mh = mh->next;
  141. }
  142. mp = (mq_pv_t*)pkg_malloc(sizeof(mq_pv_t));
  143. if(mp==NULL)
  144. {
  145. LM_ERR("no more pkg for: %.*s\n", name->len, name->s);
  146. return -1;
  147. }
  148. memset(mp, 0, sizeof(mq_pv_t));
  149. len = sizeof(mq_head_t) + name->len + 1;
  150. mh = (mq_head_t*)shm_malloc(len);
  151. if(mh==NULL)
  152. {
  153. LM_ERR("no more shm for: %.*s\n", name->len, name->s);
  154. pkg_free(mp);
  155. return -1;
  156. }
  157. memset(mh, 0, len);
  158. if (lock_init(&mh->lock)==0 )
  159. {
  160. LM_CRIT("failed to init lock\n");
  161. pkg_free(mp);
  162. shm_free(mh);
  163. return -1;
  164. }
  165. mh->name.s = (char*)mh + sizeof(mq_head_t);
  166. memcpy(mh->name.s, name->s, name->len);
  167. mh->name.len = name->len;
  168. mh->name.s[name->len] = '\0';
  169. mh->msize = msize;
  170. mh->next = _mq_head_list;
  171. _mq_head_list = mh;
  172. mp->name = &mh->name;
  173. mp->next = _mq_pv_list;
  174. _mq_pv_list = mp;
  175. return 0;
  176. }
  177. /**
  178. *
  179. */
  180. mq_head_t *mq_head_get(str *name)
  181. {
  182. mq_head_t *mh = NULL;
  183. mh = _mq_head_list;
  184. while(mh!=NULL)
  185. {
  186. if(name->len == mh->name.len
  187. && strncmp(mh->name.s, name->s, name->len)==0)
  188. {
  189. return mh;
  190. }
  191. mh = mh->next;
  192. }
  193. return NULL;
  194. }
  195. /**
  196. *
  197. */
  198. mq_pv_t *mq_pv_get(str *name)
  199. {
  200. mq_pv_t *mp = NULL;
  201. mp = _mq_pv_list;
  202. while(mp!=NULL)
  203. {
  204. if(mp->name->len==name->len
  205. && strncmp(mp->name->s, name->s, name->len)==0)
  206. return mp;
  207. mp = mp->next;
  208. }
  209. return NULL;
  210. }
  211. /**
  212. *
  213. */
  214. int mq_head_fetch(str *name)
  215. {
  216. mq_head_t *mh = NULL;
  217. mq_pv_t *mp = NULL;
  218. mp = mq_pv_get(name);
  219. if(mp==NULL)
  220. return -1;
  221. if(mp->item!=NULL)
  222. {
  223. shm_free(mp->item);
  224. mp->item = NULL;
  225. }
  226. mh = mq_head_get(name);
  227. if(mh==NULL)
  228. return -1;
  229. lock_get(&mh->lock);
  230. if(mh->ifirst==NULL)
  231. {
  232. /* empty queue */
  233. lock_release(&mh->lock);
  234. return -2;
  235. }
  236. mp->item = mh->ifirst;
  237. mh->ifirst = mh->ifirst->next;
  238. if(mh->ifirst==NULL) {
  239. mh->ilast = NULL;
  240. } else {
  241. mh->ifirst->prev = NULL;
  242. }
  243. mh->csize--;
  244. lock_release(&mh->lock);
  245. return 0;
  246. }
  247. /**
  248. *
  249. */
  250. void mq_pv_free(str *name)
  251. {
  252. mq_pv_t *mp = NULL;
  253. mp = mq_pv_get(name);
  254. if(mp==NULL)
  255. return;
  256. if(mp->item!=NULL)
  257. {
  258. shm_free(mp->item);
  259. mp->item = NULL;
  260. }
  261. }
  262. /**
  263. *
  264. */
  265. int mq_item_add(str *qname, str *key, str *val)
  266. {
  267. mq_head_t *mh = NULL;
  268. mq_item_t *mi = NULL;
  269. int len;
  270. mh = mq_head_get(qname);
  271. if(mh==NULL)
  272. {
  273. LM_ERR("mqueue not found: %.*s\n", qname->len, qname->s);
  274. return -1;
  275. }
  276. len = sizeof(mq_item_t) + key->len + val->len + 2;
  277. mi = (mq_item_t*)shm_malloc(len);
  278. if(mi==NULL)
  279. {
  280. LM_ERR("no more shm to add to: %.*s\n", qname->len, qname->s);
  281. return -1;
  282. }
  283. memset(mi, 0, len);
  284. mi->key.s = (char*)mi + sizeof(mq_item_t);
  285. memcpy(mi->key.s, key->s, key->len);
  286. mi->key.len = key->len;
  287. mi->key.s[key->len] = '\0';
  288. mi->val.s = mi->key.s + mi->key.len + 1;
  289. memcpy(mi->val.s, val->s, val->len);
  290. mi->val.len = val->len;
  291. mi->val.s[val->len] = '\0';
  292. lock_get(&mh->lock);
  293. if(mh->ifirst==NULL)
  294. {
  295. mh->ifirst = mi;
  296. mh->ilast = mi;
  297. } else {
  298. mh->ilast->next = mi;
  299. mi->prev = mh->ilast;
  300. mh->ilast = mi;
  301. }
  302. mh->csize++;
  303. if(mh->msize>0 && mh->csize>mh->msize)
  304. {
  305. mi = mh->ifirst;
  306. mh->ifirst = mh->ifirst->next;
  307. if(mh->ifirst==NULL)
  308. mh->ilast = NULL;
  309. else
  310. mh->ifirst->prev = NULL;
  311. shm_free(mi);
  312. }
  313. lock_release(&mh->lock);
  314. return 0;
  315. }
  316. /**
  317. *
  318. */
  319. int pv_parse_mqk_name(pv_spec_t *sp, str *in)
  320. {
  321. mq_head_t *mh = NULL;
  322. mh = mq_head_get(in);
  323. if(mh==NULL)
  324. {
  325. LM_ERR("mqueue not found: %.*s\n", in->len, in->s);
  326. return -1;
  327. }
  328. sp->pvp.pvn.u.isname.name.s = *in;
  329. sp->pvp.pvn.type = PV_NAME_INTSTR;
  330. sp->pvp.pvn.u.isname.type = 1;
  331. return 0;
  332. }
  333. /**
  334. *
  335. */
  336. int pv_get_mqk(struct sip_msg *msg, pv_param_t *param,
  337. pv_value_t *res)
  338. {
  339. mq_pv_t *mp = NULL;
  340. mp = mq_pv_get(&param->pvn.u.isname.name.s);
  341. if(mp==NULL || mp->item==NULL || mp->item->key.len<=0)
  342. return pv_get_null(msg, param, res);
  343. return pv_get_strval(msg, param, res, &mp->item->key);
  344. }
  345. /**
  346. *
  347. */
  348. int pv_parse_mqv_name(pv_spec_t *sp, str *in)
  349. {
  350. mq_head_t *mh = NULL;
  351. mh = mq_head_get(in);
  352. if(mh==NULL)
  353. {
  354. LM_ERR("mqueue not found: %.*s\n", in->len, in->s);
  355. return -1;
  356. }
  357. sp->pvp.pvn.u.isname.name.s = *in;
  358. sp->pvp.pvn.type = PV_NAME_INTSTR;
  359. sp->pvp.pvn.u.isname.type = 1;
  360. return 0;
  361. }
  362. /**
  363. *
  364. */
  365. int pv_get_mqv(struct sip_msg *msg, pv_param_t *param,
  366. pv_value_t *res)
  367. {
  368. mq_pv_t *mp = NULL;
  369. mp = mq_pv_get(&param->pvn.u.isname.name.s);
  370. if(mp==NULL || mp->item==NULL || mp->item->val.len<=0)
  371. return pv_get_null(msg, param, res);
  372. return pv_get_strval(msg, param, res, &mp->item->val);
  373. }