123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227 |
- #include "rls_data.h"
- #include "rls_auth.h"
- #include <cds/logger.h>
- #include <time.h>
- rls_data_t *rls = NULL;
- static gen_lock_t *rls_mutex = NULL;
- static int send_notify_cb(struct _subscription_data_t *s)
- {
- if (s)
- rls_generate_notify((rl_subscription_t *)s->usr_data, 1);
- return 0;
- }
- static int terminate_subscription_cb(struct _subscription_data_t *s)
- {
- if (s) {
- TRACE("destroying RLS subscription %p using timer\n", s);
- rls_remove((rl_subscription_t*)s->usr_data);
- }
- return 0;
- }
- static void do_external_notifications()
- {
- subscription_data_t *s = NULL;
- rl_subscription_t *rs;
- int notified = 0;
-
- if (rls_manager) s = rls_manager->first;
- /* this goes through all EXTERNAL subscriptions only !!!
- * but internal subscriptions are notified immediately, thus this is what
- * we want */
-
- /* there can be some logic to handle at most xxx subscriptions ... */
- while (s) {
- rs = (rl_subscription_t*)(s->usr_data);
- if (rs->changed) {
- rls_generate_notify(rs, 0);
- rls->changed_subscriptions -= rs->changed;
- if (rls->changed_subscriptions <= 0) break;
- if (++notified >= max_notifications_at_once) {
- break;
- }
- }
- s = s->next;
- }
-
- if (rls->changed_subscriptions < 0) {
- ERR("BUG: changed_subscriptions = %d\n", rls->changed_subscriptions);
- rls->changed_subscriptions = 0;
- }
- }
- static void rls_timer_cb(unsigned int ticks, void *param)
- {
- virtual_subscription_t *vs;
- int cnt = 0;
- time_t start, stop;
- mq_message_t *msg;
- client_notify_info_t *info;
- PROF_START(rls_timer_cb)
- start = time(NULL);
- rls_lock();
- /* process all messages for virtual subscriptions */
- while (!is_msg_queue_empty(&rls->notify_mq)) {
- msg = pop_message(&rls->notify_mq);
- if (!msg) continue;
- info = (client_notify_info_t *)msg->data;
- if (info) {
- vs = (virtual_subscription_t *)get_subscriber_data(info->subscription);
- if (!vs) {
- ERR("BUG: empty QSA subscription parameter (vs)\n");
- }
- process_rls_notification(vs, info);
- cnt++;
- }
- free_message(msg);
- }
- if (rls->changed_subscriptions > 0) {
- do_external_notifications();
- /* rls->changed_subscriptions is reset (or decremented) from
- * do_external_notifications() */
- }
-
- rls_unlock();
- stop = time(NULL);
- if (stop - start > 1) WARN("rls_timer_cb took %d secs\n", (int) (stop - start));
- PROF_STOP(rls_timer_cb)
- }
- void rls_lock()
- {
- /* FIXME: solve locking more efficiently - locking whole RLS in
- * all cases of manipulating internal structures is not good
- * solution */
- lock_get(rls_mutex);
- }
- void rls_unlock()
- {
- lock_release(rls_mutex);
- }
- int rls_init()
- {
- rls = (rls_data_t*)mem_alloc(sizeof(rls_data_t));
- if (!rls) {
- LOG(L_ERR, "rls_init(): memory allocation error\n");
- return -1;
- }
- /* rls->first = NULL;
- rls->last = NULL;*/
- rls->changed_subscriptions = 0;
- if (msg_queue_init(&rls->notify_mq) != 0) {
- ERR("can't initialize message queue for RLS notifications!\n");
- return -1;
- }
-
- rls_mutex = lock_alloc();
- if (!rls_mutex) {
- LOG(L_ERR, "rls_init(): Can't initialize mutex\n");
- return -1;
- }
- lock_init(rls_mutex);
- rls_manager = sm_create(send_notify_cb,
- terminate_subscription_cb,
- rls_authorize_subscription,
- rls_mutex,
- rls_min_expiration, /* min expiration time in seconds */
- rls_max_expiration, /* max expiration time in seconds */
- rls_default_expiration, /* default expiration time in seconds */
- rls_expiration_timer_period);
-
- /* register timer for handling notify messages */
- if (register_timer(rls_timer_cb, NULL, rls_timer_interval) < 0) {
- LOG(L_ERR, "vs_init(): can't register timer\n");
- return -1;
- }
-
- return 0;
- }
- int rls_destroy()
- {
- DEBUG_LOG("rls_destroy() called\n");
- /* FIXME: destroy the whole rl_subscription list */
- /* sm_destroy(rls_manager); */
- if (rls_mutex) {
- lock_destroy(rls_mutex);
- lock_dealloc(rls_mutex);
- }
- if (rls) {
- mem_free(rls);
- rls = NULL;
- }
- return 0;
- }
- /*
- static int process_rls_messages()
- {
- int cnt = 0;
- client_notify_info_t *info;
- mq_message_t *msg;
- while (!is_msg_queue_empty(&rls->notify_mq)) {
- msg = pop_message(&rls->notify_mq);
- if (!msg) continue;
- info = (client_notify_info_t *)msg->data;
- if (info) {
- process_notify_info(vs, info);
- cnt++;
- }
- free_message(msg);
- }
- return cnt;
- }*/
- void destroy_notifications(qsa_subscription_t *s)
- {
- /* removes all notifications for given qsa_subscription from message queue
- * and discards them */
- int cnt = 0;
- int other_cnt = 0;
- mq_message_t *msg;
- msg_queue_t tmp;
- client_notify_info_t *info;
- msg_queue_init(&tmp);
-
- /* process all messages for virtual subscriptions */
- while (!is_msg_queue_empty(&rls->notify_mq)) {
- msg = pop_message(&rls->notify_mq);
- if (!msg) continue;
- info = (client_notify_info_t *)msg->data;
- if (info) {
- if (s == info->subscription) {
- cnt++;
- free_message(msg);
- }
- else {
- push_message(&tmp, msg);
- other_cnt++;
- }
- }
- else free_message(msg); /* broken message */
- }
- /* move messages back to main queue */
- while (!is_msg_queue_empty(&tmp)) {
- msg = pop_message(&tmp);
- push_message(&rls->notify_mq, msg);
- }
- }
|