rls_data.c 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. #include "rls_data.h"
  2. #include "rls_auth.h"
  3. #include <cds/logger.h>
  4. #include <time.h>
  /* Module-wide RLS state. Allocated in rls_init(); released and reset to
   * NULL in rls_destroy(). */
  5. rls_data_t *rls = NULL;
  /* Lock taken by rls_lock() and released by rls_unlock(). Allocated and
   * initialised in rls_init(), which also passes it to sm_create(). */
  6. static gen_lock_t *rls_mutex = NULL;
  7. static int send_notify_cb(struct _subscription_data_t *s)
  8. {
  9. if (s)
  10. rls_generate_notify((rl_subscription_t *)s->usr_data, 1);
  11. return 0;
  12. }
  13. static int terminate_subscription_cb(struct _subscription_data_t *s)
  14. {
  15. if (s) {
  16. TRACE("destroying RLS subscription %p using timer\n", s);
  17. rls_remove((rl_subscription_t*)s->usr_data);
  18. }
  19. return 0;
  20. }
  21. static void do_external_notifications()
  22. {
  23. subscription_data_t *s = NULL;
  24. rl_subscription_t *rs;
  25. int notified = 0;
  26. if (rls_manager) s = rls_manager->first;
  27. /* this goes through all EXTERNAL subscriptions only !!!
  28. * but internal subscriptions are notified immediately, thus this is what
  29. * we want */
  30. /* there can be some logic to handle at most xxx subscriptions ... */
  31. while (s) {
  32. rs = (rl_subscription_t*)(s->usr_data);
  33. if (rs->changed) {
  34. rls_generate_notify(rs, 0);
  35. rls->changed_subscriptions -= rs->changed;
  36. if (rls->changed_subscriptions <= 0) break;
  37. if (++notified >= max_notifications_at_once) {
  38. break;
  39. }
  40. }
  41. s = s->next;
  42. }
  43. if (rls->changed_subscriptions < 0) {
  44. ERR("BUG: changed_subscriptions = %d\n", rls->changed_subscriptions);
  45. rls->changed_subscriptions = 0;
  46. }
  47. }
  48. static void rls_timer_cb(unsigned int ticks, void *param)
  49. {
  50. virtual_subscription_t *vs;
  51. int cnt = 0;
  52. time_t start, stop;
  53. mq_message_t *msg;
  54. client_notify_info_t *info;
  55. PROF_START(rls_timer_cb)
  56. start = time(NULL);
  57. rls_lock();
  58. /* process all messages for virtual subscriptions */
  59. while (!is_msg_queue_empty(&rls->notify_mq)) {
  60. msg = pop_message(&rls->notify_mq);
  61. if (!msg) continue;
  62. info = (client_notify_info_t *)msg->data;
  63. if (info) {
  64. vs = (virtual_subscription_t *)get_subscriber_data(info->subscription);
  65. if (!vs) {
  66. ERR("BUG: empty QSA subscription parameter (vs)\n");
  67. }
  68. process_rls_notification(vs, info);
  69. cnt++;
  70. }
  71. free_message(msg);
  72. }
  73. if (rls->changed_subscriptions > 0) {
  74. do_external_notifications();
  75. /* rls->changed_subscriptions is reset (or decremented) from
  76. * do_external_notifications() */
  77. }
  78. rls_unlock();
  79. stop = time(NULL);
  80. if (stop - start > 1) WARN("rls_timer_cb took %d secs\n", (int) (stop - start));
  81. PROF_STOP(rls_timer_cb)
  82. }
  83. void rls_lock()
  84. {
  85. /* FIXME: solve locking more efficiently - locking whole RLS in
  86. * all cases of manipulating internal structures is not good
  87. * solution */
  88. lock_get(rls_mutex);
  89. }
  90. void rls_unlock()
  91. {
  92. lock_release(rls_mutex);
  93. }
  94. int rls_init()
  95. {
  96. rls = (rls_data_t*)mem_alloc(sizeof(rls_data_t));
  97. if (!rls) {
  98. LOG(L_ERR, "rls_init(): memory allocation error\n");
  99. return -1;
  100. }
  101. /* rls->first = NULL;
  102. rls->last = NULL;*/
  103. rls->changed_subscriptions = 0;
  104. if (msg_queue_init(&rls->notify_mq) != 0) {
  105. ERR("can't initialize message queue for RLS notifications!\n");
  106. return -1;
  107. }
  108. rls_mutex = lock_alloc();
  109. if (!rls_mutex) {
  110. LOG(L_ERR, "rls_init(): Can't initialize mutex\n");
  111. return -1;
  112. }
  113. lock_init(rls_mutex);
  114. rls_manager = sm_create(send_notify_cb,
  115. terminate_subscription_cb,
  116. rls_authorize_subscription,
  117. rls_mutex,
  118. rls_min_expiration, /* min expiration time in seconds */
  119. rls_max_expiration, /* max expiration time in seconds */
  120. rls_default_expiration, /* default expiration time in seconds */
  121. rls_expiration_timer_period);
  122. /* register timer for handling notify messages */
  123. if (register_timer(rls_timer_cb, NULL, rls_timer_interval) < 0) {
  124. LOG(L_ERR, "vs_init(): can't register timer\n");
  125. return -1;
  126. }
  127. return 0;
  128. }
  129. int rls_destroy()
  130. {
  131. DEBUG_LOG("rls_destroy() called\n");
  132. /* FIXME: destroy the whole rl_subscription list */
  133. /* sm_destroy(rls_manager); */
  134. if (rls_mutex) {
  135. lock_destroy(rls_mutex);
  136. lock_dealloc(rls_mutex);
  137. }
  138. if (rls) {
  139. mem_free(rls);
  140. rls = NULL;
  141. }
  142. return 0;
  143. }
  144. /*
  145. static int process_rls_messages()
  146. {
  147. int cnt = 0;
  148. client_notify_info_t *info;
  149. mq_message_t *msg;
  150. while (!is_msg_queue_empty(&rls->notify_mq)) {
  151. msg = pop_message(&rls->notify_mq);
  152. if (!msg) continue;
  153. info = (client_notify_info_t *)msg->data;
  154. if (info) {
  155. process_notify_info(vs, info);
  156. cnt++;
  157. }
  158. free_message(msg);
  159. }
  160. return cnt;
  161. }*/
  162. void destroy_notifications(qsa_subscription_t *s)
  163. {
  164. /* removes all notifications for given qsa_subscription from message queue
  165. * and discards them */
  166. int cnt = 0;
  167. int other_cnt = 0;
  168. mq_message_t *msg;
  169. msg_queue_t tmp;
  170. client_notify_info_t *info;
  171. msg_queue_init(&tmp);
  172. /* process all messages for virtual subscriptions */
  173. while (!is_msg_queue_empty(&rls->notify_mq)) {
  174. msg = pop_message(&rls->notify_mq);
  175. if (!msg) continue;
  176. info = (client_notify_info_t *)msg->data;
  177. if (info) {
  178. if (s == info->subscription) {
  179. cnt++;
  180. free_message(msg);
  181. }
  182. else {
  183. push_message(&tmp, msg);
  184. other_cnt++;
  185. }
  186. }
  187. else free_message(msg); /* broken message */
  188. }
  189. /* move messages back to main queue */
  190. while (!is_msg_queue_empty(&tmp)) {
  191. msg = pop_message(&tmp);
  192. push_message(&rls->notify_mq, msg);
  193. }
  194. }