#include "rls_data.h" #include "rls_auth.h" #include #include rls_data_t *rls = NULL; static gen_lock_t *rls_mutex = NULL; static int send_notify_cb(struct _subscription_data_t *s) { if (s) rls_generate_notify((rl_subscription_t *)s->usr_data, 1); return 0; } static int terminate_subscription_cb(struct _subscription_data_t *s) { if (s) { TRACE("destroying RLS subscription %p using timer\n", s); rls_remove((rl_subscription_t*)s->usr_data); } return 0; } static void do_external_notifications() { subscription_data_t *s = NULL; rl_subscription_t *rs; int notified = 0; if (rls_manager) s = rls_manager->first; /* this goes through all EXTERNAL subscriptions only !!! * but internal subscriptions are notified immediately, thus this is what * we want */ /* there can be some logic to handle at most xxx subscriptions ... */ while (s) { rs = (rl_subscription_t*)(s->usr_data); if (rs->changed) { rls_generate_notify(rs, 0); rls->changed_subscriptions -= rs->changed; if (rls->changed_subscriptions <= 0) break; if (++notified >= max_notifications_at_once) { break; } } s = s->next; } if (rls->changed_subscriptions < 0) { ERR("BUG: changed_subscriptions = %d\n", rls->changed_subscriptions); rls->changed_subscriptions = 0; } } static void rls_timer_cb(unsigned int ticks, void *param) { virtual_subscription_t *vs; int cnt = 0; time_t start, stop; mq_message_t *msg; client_notify_info_t *info; PROF_START(rls_timer_cb) start = time(NULL); rls_lock(); /* process all messages for virtual subscriptions */ while (!is_msg_queue_empty(&rls->notify_mq)) { msg = pop_message(&rls->notify_mq); if (!msg) continue; info = (client_notify_info_t *)msg->data; if (info) { vs = (virtual_subscription_t *)get_subscriber_data(info->subscription); if (!vs) { ERR("BUG: empty QSA subscription parameter (vs)\n"); } process_rls_notification(vs, info); cnt++; } free_message(msg); } if (rls->changed_subscriptions > 0) { do_external_notifications(); /* rls->changed_subscriptions is reset (or decremented) from * do_external_notifications() */ } rls_unlock(); stop = time(NULL); if (stop - start > 1) WARN("rls_timer_cb took %d secs\n", (int) (stop - start)); PROF_STOP(rls_timer_cb) } void rls_lock() { /* FIXME: solve locking more efficiently - locking whole RLS in * all cases of manipulating internal structures is not good * solution */ lock_get(rls_mutex); } void rls_unlock() { lock_release(rls_mutex); } int rls_init() { rls = (rls_data_t*)mem_alloc(sizeof(rls_data_t)); if (!rls) { LOG(L_ERR, "rls_init(): memory allocation error\n"); return -1; } /* rls->first = NULL; rls->last = NULL;*/ rls->changed_subscriptions = 0; if (msg_queue_init(&rls->notify_mq) != 0) { ERR("can't initialize message queue for RLS notifications!\n"); return -1; } rls_mutex = lock_alloc(); if (!rls_mutex) { LOG(L_ERR, "rls_init(): Can't initialize mutex\n"); return -1; } lock_init(rls_mutex); rls_manager = sm_create(send_notify_cb, terminate_subscription_cb, rls_authorize_subscription, rls_mutex, rls_min_expiration, /* min expiration time in seconds */ rls_max_expiration, /* max expiration time in seconds */ rls_default_expiration, /* default expiration time in seconds */ rls_expiration_timer_period); /* register timer for handling notify messages */ if (register_timer(rls_timer_cb, NULL, rls_timer_interval) < 0) { LOG(L_ERR, "vs_init(): can't register timer\n"); return -1; } return 0; } int rls_destroy() { DEBUG_LOG("rls_destroy() called\n"); /* FIXME: destroy the whole rl_subscription list */ /* sm_destroy(rls_manager); */ if (rls_mutex) { lock_destroy(rls_mutex); lock_dealloc(rls_mutex); } if (rls) { mem_free(rls); rls = NULL; } return 0; } /* static int process_rls_messages() { int cnt = 0; client_notify_info_t *info; mq_message_t *msg; while (!is_msg_queue_empty(&rls->notify_mq)) { msg = pop_message(&rls->notify_mq); if (!msg) continue; info = (client_notify_info_t *)msg->data; if (info) { process_notify_info(vs, info); cnt++; } free_message(msg); } return cnt; }*/ void destroy_notifications(qsa_subscription_t *s) { /* removes all notifications for given qsa_subscription from message queue * and discards them */ int cnt = 0; int other_cnt = 0; mq_message_t *msg; msg_queue_t tmp; client_notify_info_t *info; msg_queue_init(&tmp); /* process all messages for virtual subscriptions */ while (!is_msg_queue_empty(&rls->notify_mq)) { msg = pop_message(&rls->notify_mq); if (!msg) continue; info = (client_notify_info_t *)msg->data; if (info) { if (s == info->subscription) { cnt++; free_message(msg); } else { push_message(&tmp, msg); other_cnt++; } } else free_message(msg); /* broken message */ } /* move messages back to main queue */ while (!is_msg_queue_empty(&tmp)) { msg = pop_message(&tmp); push_message(&rls->notify_mq, msg); } }