async_sleep.c 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. /**
  2. * $Id$
  3. *
  4. * Copyright (C) 2011 Daniel-Constantin Mierla (asipto.com)
  5. *
  6. * This file is part of Kamailio, a free SIP server.
  7. *
  8. * This file is free software; you can redistribute it and/or modify
  9. * it under the terms of the GNU General Public License as published by
  10. * the Free Software Foundation; either version 2 of the License, or
  11. * (at your option) any later version
  12. *
  13. *
  14. * This file is distributed in the hope that it will be useful,
  15. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  16. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  17. * GNU General Public License for more details.
  18. *
  19. * You should have received a copy of the GNU General Public License
  20. * along with this program; if not, write to the Free Software
  21. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  22. *
  23. */
  24. #include <stdio.h>
  25. #include <unistd.h>
  26. #include <stdlib.h>
  27. #include <string.h>
  28. #include "../../dprint.h"
  29. #include "../../ut.h"
  30. #include "../../locking.h"
  31. #include "../../timer.h"
  32. #include "../../async_task.h"
  33. #include "../../modules/tm/tm_load.h"
  34. #include "async_sleep.h"
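  /* bindings to the tm module API (t_gett, t_newtran, t_suspend and t_continue are used below); the variable itself is defined in another file */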
  36. extern struct tm_binds tmb;
  37. typedef struct async_item {
  38. unsigned int tindex;
  39. unsigned int tlabel;
  40. unsigned int ticks;
  41. cfg_action_t *act;
  42. struct async_item *next;
  43. } async_item_t;
  44. typedef struct async_slot {
  45. async_item_t *lstart;
  46. async_item_t *lend;
  47. gen_lock_t lock;
  48. } async_slot_t;
  49. #define ASYNC_RING_SIZE 100
  50. static struct async_list_head {
  51. async_slot_t ring[ASYNC_RING_SIZE];
  52. async_slot_t *later;
  53. } *_async_list_head = NULL;
  54. int async_init_timer_list(void)
  55. {
  56. int i;
  57. _async_list_head = (struct async_list_head*)
  58. shm_malloc(sizeof(struct async_list_head));
  59. if(_async_list_head==NULL)
  60. {
  61. LM_ERR("no more shm\n");
  62. return -1;
  63. }
  64. memset(_async_list_head, 0, sizeof(struct async_list_head));
  65. for(i=0; i<ASYNC_RING_SIZE; i++)
  66. {
  67. if(lock_init(&_async_list_head->ring[i].lock)==0)
  68. {
  69. LM_ERR("cannot init lock at %d\n", i);
  70. i--;
  71. while(i>=0)
  72. {
  73. lock_destroy(&_async_list_head->ring[i].lock);
  74. i--;
  75. }
  76. shm_free(_async_list_head);
  77. _async_list_head = 0;
  78. return -1;
  79. }
  80. }
  81. return 0;
  82. }
  83. int async_destroy_timer_list(void)
  84. {
  85. int i;
  86. if(_async_list_head==NULL)
  87. return 0;
  88. for(i=0; i<ASYNC_RING_SIZE; i++)
  89. {
  90. /* TODO: clean the list */
  91. lock_destroy(&_async_list_head->ring[i].lock);
  92. }
  93. shm_free(_async_list_head);
  94. _async_list_head = 0;
  95. return 0;
  96. }
  97. int async_sleep(struct sip_msg* msg, int seconds, cfg_action_t *act)
  98. {
  99. int slot;
  100. unsigned int ticks;
  101. async_item_t *ai;
  102. tm_cell_t *t = 0;
  103. if(seconds<=0) {
  104. LM_ERR("negative or zero sleep time (%d)\n", seconds);
  105. return -1;
  106. }
  107. if(seconds>=ASYNC_RING_SIZE)
  108. {
  109. LM_ERR("max sleep time is %d sec (%d)\n", ASYNC_RING_SIZE, seconds);
  110. return -1;
  111. }
  112. t = tmb.t_gett();
  113. if (t==NULL || t==T_UNDEFINED)
  114. {
  115. if(tmb.t_newtran(msg)<0)
  116. {
  117. LM_ERR("cannot create the transaction\n");
  118. return -1;
  119. }
  120. t = tmb.t_gett();
  121. if (t==NULL || t==T_UNDEFINED)
  122. {
  123. LM_ERR("cannot lookup the transaction\n");
  124. return -1;
  125. }
  126. }
  127. ticks = seconds + get_ticks();
  128. slot = ticks % ASYNC_RING_SIZE;
  129. ai = (async_item_t*)shm_malloc(sizeof(async_item_t));
  130. if(ai==NULL)
  131. {
  132. LM_ERR("no more shm\n");
  133. return -1;
  134. }
  135. memset(ai, 0, sizeof(async_item_t));
  136. ai->ticks = ticks;
  137. ai->act = act;
  138. if(tmb.t_suspend(msg, &ai->tindex, &ai->tlabel)<0)
  139. {
  140. LM_ERR("failed to suppend the processing\n");
  141. shm_free(ai);
  142. return -1;
  143. }
  144. lock_get(&_async_list_head->ring[slot].lock);
  145. ai->next = _async_list_head->ring[slot].lstart;
  146. _async_list_head->ring[slot].lstart = ai;
  147. lock_release(&_async_list_head->ring[slot].lock);
  148. return 0;
  149. }
  150. void async_timer_exec(unsigned int ticks, void *param)
  151. {
  152. int slot;
  153. async_item_t *ai;
  154. if(_async_list_head==NULL)
  155. return;
  156. slot = ticks % ASYNC_RING_SIZE;
  157. while(1) {
  158. lock_get(&_async_list_head->ring[slot].lock);
  159. ai = _async_list_head->ring[slot].lstart;
  160. if(ai!=NULL)
  161. _async_list_head->ring[slot].lstart = ai->next;
  162. lock_release(&_async_list_head->ring[slot].lock);
  163. if(ai==NULL)
  164. break;
  165. if(ai->act!=NULL)
  166. tmb.t_continue(ai->tindex, ai->tlabel, ai->act);
  167. shm_free(ai);
  168. }
  169. }
  170. /**
  171. *
  172. */
  173. void async_exec_task(void *param)
  174. {
  175. cfg_action_t *act;
  176. unsigned int *p;
  177. unsigned int tindex;
  178. unsigned int tlabel;
  179. act = *((cfg_action_t**)param);
  180. p = (unsigned int*)((char*)param + sizeof(cfg_action_t*));
  181. tindex = p[0];
  182. tlabel = p[1];
  183. if(act!=NULL)
  184. tmb.t_continue(tindex, tlabel, act);
  185. /* param is freed along with the async task strucutre in core */
  186. }
  187. /**
  188. *
  189. */
  190. int async_send_task(sip_msg_t* msg, cfg_action_t *act)
  191. {
  192. async_task_t *at;
  193. tm_cell_t *t = 0;
  194. unsigned int tindex;
  195. unsigned int tlabel;
  196. int dsize;
  197. unsigned int *p;
  198. t = tmb.t_gett();
  199. if (t==NULL || t==T_UNDEFINED)
  200. {
  201. if(tmb.t_newtran(msg)<0)
  202. {
  203. LM_ERR("cannot create the transaction\n");
  204. return -1;
  205. }
  206. t = tmb.t_gett();
  207. if (t==NULL || t==T_UNDEFINED)
  208. {
  209. LM_ERR("cannot lookup the transaction\n");
  210. return -1;
  211. }
  212. }
  213. dsize = sizeof(async_task_t) + sizeof(cfg_action_t*)
  214. + 2*sizeof(unsigned int);
  215. at = (async_task_t*)shm_malloc(dsize);
  216. if(at==NULL)
  217. {
  218. LM_ERR("no more shm\n");
  219. return -1;
  220. }
  221. memset(at, 0, dsize);
  222. if(tmb.t_suspend(msg, &tindex, &tlabel)<0)
  223. {
  224. LM_ERR("failed to suppend the processing\n");
  225. shm_free(at);
  226. return -1;
  227. }
  228. at->exec = async_exec_task;
  229. at->param = (char*)at + sizeof(async_task_t);
  230. *((cfg_action_t**)at->param) = act;
  231. p = (unsigned int*)((char*)at->param + sizeof(cfg_action_t*));
  232. p[0] = tindex;
  233. p[1] = tlabel;
  234. async_task_push(at);
  235. return 0;
  236. }