Sfoglia il codice sorgente

modules/ims_registrar_scscf: Created a new process to send notifies

Richard Good 11 anni fa
parent
commit
01cf2d8617

+ 6 - 0
modules/ims_registrar_scscf/Makefile

@@ -27,4 +27,10 @@ DEFS+=-DOPENSER_MOD_INTERFACE
 SERLIBPATH=../../lib
 SER_LIBS+=$(SERLIBPATH)/kcore/kcore
 SER_LIBS+=$(SERLIBPATH)/ims/kamailio_ims
+
+ifneq ($(OS),darwin)
+	LIBS += -lrt
+	LIBS += -lpthread
+endif
+
 include ../../Makefile.modules

+ 16 - 0
modules/ims_registrar_scscf/reg_mod.c

@@ -57,6 +57,7 @@
 #include "../../lib/kcore/statistics.h"
 #include "../../modules/sl/sl.h"
 #include "../../mod_fix.h"
+#include "../../cfg/cfg_struct.h"
 
 #include "save.h"
 #include "api.h"
@@ -471,6 +472,21 @@ static int mod_init(void) {
 }
 
 static int child_init(int rank) {
+    LM_DBG("Initialization of module in child [%d] \n", rank);
+    int pid;
+    
+    if (rank == PROC_MAIN) {
+        pid = fork_process(PROC_SIPINIT, "sip_notification_event_process", 1);
+        if (pid < 0)
+            return -1; //error
+        if (pid == 0) {
+            if (cfg_child_init())
+                return -1; //error
+            notification_event_process();
+        }
+    }
+    
+    
     if (rank == PROC_MAIN || rank == PROC_TCP_MAIN)
         return 0;
     if (rank == 1) {

+ 94 - 71
modules/ims_registrar_scscf/registrar_notify.c

@@ -94,11 +94,18 @@ extern int ue_unsubscribe_on_dereg;
 
 int notify_init() {
     notification_list = shm_malloc(sizeof (reg_notification_list));
-    if (!notification_list) return 0;
+    if (!notification_list) {
+	LM_ERR("No more SHM mem\n");
+	return 0; 
+    }
     memset(notification_list, 0, sizeof (reg_notification_list));
     notification_list->lock = lock_alloc();
-    if (!notification_list->lock) return 0;
+    if (!notification_list->lock)  {
+	LM_ERR("failed to create cdp event list lock\n");
+	return 0;
+    }
     notification_list->lock = lock_init(notification_list->lock);
+    sem_new(notification_list->empty, 0); //pre-locked - as we assume list is empty at start
     return 1;
 }
 
@@ -214,9 +221,9 @@ int can_publish_reg(struct sip_msg *msg, char *_t, char *str2) {
 
 	while (c) {
 	    if (c->path.len) {
-		for (i = 0; i < c->path.len - asserted_id.len; i++)
-		    LM_DBG("Path: <%.*s>.\n",
+		LM_DBG("Path: <%.*s>.\n",
 		    c->path.len, c->path.s);
+		for (i = 0; i < c->path.len - (asserted_id.len-4); i++)
 		    //we compare the asserted_id without "sip:" to the path 
 		    if (strncasecmp(c->path.s + i, asserted_id.s+4, asserted_id.len-4) == 0) {
 			LM_DBG("Identity found in Path <%.*s>\n",
@@ -229,7 +236,7 @@ int can_publish_reg(struct sip_msg *msg, char *_t, char *str2) {
 	    c = c->next;
 	}
 	LM_DBG("Did not find p-asserted-identity <%.*s> on Path\n", asserted_id.len, asserted_id.s);
-
+	
 	ul.unlock_udomain((udomain_t*) _t, &presentity_uri);
 	LM_DBG("Publish forbidden\n");
     
@@ -1031,6 +1038,9 @@ int subscribe_to_reg(struct sip_msg *msg, char *_t, char *str2) {
     subscriber_data.record_route = &record_route;
     subscriber_data.sockinfo_str = &sockinfo_str;
     subscriber_data.local_cseq = local_cseq;
+    subscriber_data.watcher_uri = &watcher_impu;
+    subscriber_data.watcher_contact = &watcher_contact;
+    subscriber_data.version = 1; /*default version starts at 1*/
 
     if (expires > 0) {
         LM_DBG("expires is more than zero - SUBSCRIBE");
@@ -1059,11 +1069,12 @@ int subscribe_to_reg(struct sip_msg *msg, char *_t, char *str2) {
         }
 
         LM_DBG("Received impurecord for presentity being subscribed to [%.*s]\n", presentity_impurecord->public_identity.len, presentity_impurecord->public_identity.s);
-
+	
         res = ul.get_subscriber(presentity_impurecord, &presentity_uri, &watcher_contact, event_i, &reg_subscriber);
 	if (res != 0) {
             LM_DBG("this must be a new subscriber, lets add it\n");
-            res = ul.add_subscriber(presentity_impurecord, &watcher_impu, &watcher_contact, &subscriber_data, &reg_subscriber);
+	    subscriber_data.presentity_uri = &presentity_impurecord->public_identity;
+            res = ul.add_subscriber(presentity_impurecord, &subscriber_data, &reg_subscriber, 0 /*not a db_load*/);
             if (res != 0) {
                 LM_ERR("Failed to add new subscription\n");
                 ul.unlock_udomain(domain, &presentity_uri);
@@ -1077,7 +1088,7 @@ int subscribe_to_reg(struct sip_msg *msg, char *_t, char *str2) {
         } else {
 
             LM_DBG("this must be a re subscribe, lets update it\n");
-            res = ul.update_subscriber(presentity_impurecord, &watcher_impu, &watcher_contact, &expires_time, &reg_subscriber);
+	    res = ul.update_subscriber(presentity_impurecord, &reg_subscriber, &expires_time, 0, 0);
             if (res != 1) {
                 LM_ERR("Failed to update subscription - expires is %d\n", expires_time);
                 ul.unlock_udomain(domain, &presentity_uri);
@@ -1096,11 +1107,11 @@ int subscribe_to_reg(struct sip_msg *msg, char *_t, char *str2) {
         //only do reg event on new subscriptions
         if (new_subscription) {
             if (event_reg(domain, 0, 0, event_type, &presentity_uri, &watcher_contact) != 0) {
-                LM_ERR("failed to send NOTIFYs for reg events\n");
+                LM_ERR("failed adding notification for reg events\n");
                 ret = CSCF_RETURN_ERROR;
                 goto error;
             } else {
-                LM_DBG("success sending NOTIFY\n");
+                LM_DBG("success adding notification for reg events\n");
             }
         }
     } else {
@@ -1265,6 +1276,8 @@ void create_notifications(udomain_t* _t, impurecord_t* r_passed, ucontact_t* c_p
     reg_notification *n;
     reg_subscriber *s;
     impurecord_t* r;
+    int local_cseq = 0;
+    int version = 0;
 
     str subscription_state = {"active;expires=10000000000", 26},
     content_type = {"application/reginfo+xml", 23};
@@ -1322,18 +1335,17 @@ void create_notifications(udomain_t* _t, impurecord_t* r_passed, ucontact_t* c_p
                     (presentity_uri->len == s->presentity_uri.len) && (memcmp(s->presentity_uri.s, presentity_uri->s, presentity_uri->len) == 0)) {
                 LM_DBG("This is a fix to ensure that we only send full reg info XML to the UE that just subscribed");
                 LM_DBG("about to make new notification!");
-                n = new_notification(subscription_state, content_type, content,
-                        s->version++, s);
+		
+		LM_DBG("we always increment the local cseq and version before we send a new notification\n");
+		
+		local_cseq = s->local_cseq + 1;
+		version = s->version + 1;
+		ul.update_subscriber(r, &s, 0, &local_cseq, &version);
+		
+                n = new_notification(subscription_state, content_type, content, s);
                 if (n) {
-                    //LM_DBG("Notification exists - about to add it");
-                    //add_notification(n);
-
-                    //Richard just gonna send it - not bother queueing etc.
-                    //TODO look at impact of this - sending straight away vs queueing and getting another process to send
-                    LM_DBG("About to send notification");
-                    send_notification(n);
-                    LM_DBG("About to free notification");
-                    free_notification(n);
+                    LM_DBG("Notification exists - about to add it");
+                    add_notification(n);
                 } else {
                     LM_DBG("Notification does not exist");
                 }
@@ -1349,24 +1361,23 @@ void create_notifications(udomain_t* _t, impurecord_t* r_passed, ucontact_t* c_p
 	    }
 	    else{
 		LM_DBG("about to make new notification!");
-		n = new_notification(subscription_state, content_type, content,
-			s->version++, s);
+		
+		LM_DBG("we always increment the local cseq and version before we send a new notification\n");
+		
+		local_cseq = s->local_cseq + 1;
+		version = s->version + 1;
+		ul.update_subscriber(r, &s, 0, &local_cseq, &version);
+		
+		n = new_notification(subscription_state, content_type, content, s);
 		if (n) {
-		    //LM_DBG("Notification exists - about to add it");
-		    //add_notification(n);
-
-		    //Richard just gonna send it - not bother queueing etc.
-		    //TODO look at impact of this - sending straight away vs queueing and getting another process to send
-		    LM_DBG("About to send notification");
-		    send_notification(n);
-		    LM_DBG("About to free notification");
-		    free_notification(n);
+		    LM_DBG("Notification exists - about to add it");
+		    add_notification(n);
+
 		} else {
 		    LM_DBG("Notification does not exist");
 		}
 	    }
 	}
-        //}
         s = s->next;
 
         if (subscription_state.s) {
@@ -1481,7 +1492,7 @@ str generate_reginfo_full(udomain_t* _t, str* impu_list, int num_impus) {
             STR_APPEND(buf, uri_s);
 
             LM_DBG("Appending contact address: <%.*s>", c->c.len, c->c.s);
-
+	    
             STR_APPEND(buf, (c->c));
             STR_APPEND(buf, uri_e);
 
@@ -1724,41 +1735,6 @@ void send_notification(reg_notification * n) {
 
 }
 
-/**
- * The Notification timer looks for unsent notifications and sends them.
- *  - because not all events should wait until the notifications for them are sent
- * @param ticks - the current time
- * @param param - pointer to the domain_list
- */
-void notification_timer(unsigned int ticks, void* param) {
-
-    LM_DBG("Running notification timer");
-
-    reg_notification *n = 0;
-    LM_DBG("Getting lock of notification list");
-    lock_get(notification_list->lock);
-    LM_DBG("Scrolling through list");
-    while (notification_list->head) {
-        n = notification_list->head;
-        LM_DBG("Taking notification out of list with watcher uri <%.*s> and presentity uri <%.*s>", n->watcher_uri.len, n->watcher_uri.s, n->presentity_uri.len, n->presentity_uri.s);
-        notification_list->head = n->next;
-        if (n->next) n->next->prev = 0;
-        else notification_list->tail = n->next;
-
-        LM_DBG("Releasing lock");
-        lock_release(notification_list->lock);
-
-        LM_DBG("About to send notification");
-        send_notification(n);
-        LM_DBG("About to free notification");
-        free_notification(n);
-        LM_DBG("Getting lock of notification list again");
-        lock_get(notification_list->lock);
-    }
-    LM_DBG("Releasing lock again");
-    lock_release(notification_list->lock);
-}
-
 /**
  * Creates a notification based on the given parameters
  * @param req_uri - the Request-URI for the NOTIFY
@@ -1771,14 +1747,14 @@ void notification_timer(unsigned int ticks, void* param) {
  * @returns the r_notification or NULL on error
  */
 reg_notification * new_notification(str subscription_state,
-        str content_type, str content, int version, reg_subscriber * r) {
+        str content_type, str content, reg_subscriber * r) {
 
     reg_notification *n = 0;
 
     str buf;
     char bufc[MAX_REGINFO_SIZE];
 
-    sprintf(bufc, content.s, version);
+    sprintf(bufc, content.s, r->version);
     buf.s = bufc;
     buf.len = strlen(bufc);
 
@@ -1799,6 +1775,8 @@ reg_notification * new_notification(str subscription_state,
     memset(n, 0, len);
 
     p = (char*) (n + 1);
+    
+    n->local_cseq = r->local_cseq;
 
     n->call_id.s = p;
     n->call_id.len = r->call_id.len;
@@ -1895,9 +1873,54 @@ void add_notification(reg_notification * n) {
     if (notification_list->tail) notification_list->tail->next = n;
     notification_list->tail = n;
     if (!notification_list->head) notification_list->head = n;
+    sem_release(notification_list->empty);
+    lock_release(notification_list->lock);
+}
+
+/**
+* Pop a notification to the list of notifications from the top
+*/
+reg_notification* get_notification() {
+    reg_notification * n;
+
+    lock_get(notification_list->lock);
+    while (notification_list->head == 0) {
+        lock_release(notification_list->lock);
+        sem_get(notification_list->empty);
+        lock_get(notification_list->lock);
+    }
+
+    n = notification_list->head;
+    notification_list->head = n->next;
+
+    if (n == notification_list->tail) { //list now empty
+        notification_list->tail = 0;
+    }
+    n->next = 0; //make sure whoever gets this cant access our list
     lock_release(notification_list->lock);
+
+    return n;
 }
 
+/**
+ * This is the main event process for notifications to be sent
+ */
+void notification_event_process() {
+
+    reg_notification *n = 0;
+    
+    LM_DBG("Running notification_event_process");
+    
+    for (;;) {
+        n = get_notification();
+	LM_DBG("About to send notification");
+        send_notification(n);
+        LM_DBG("About to free notification");
+        free_notification(n);
+    }
+}
+
+
 /**
  * Frees up space taken by a notification
  * @param n - the notification to be freed

+ 5 - 2
modules/ims_registrar_scscf/registrar_notify.h

@@ -49,6 +49,7 @@
 
 #include "../ims_usrloc_scscf/usrloc.h"
 #include "../../locking.h"
+#include "sem.h"
 
 
 #define MSG_REG_SUBSCRIBE_OK "Subscription to REG saved"
@@ -86,6 +87,7 @@ typedef struct {
     gen_lock_t *lock; /**< lock for notifications ops		*/
     reg_notification *head; /**< first notification in the list	*/
     reg_notification *tail; /**< last notification in the list	*/
+    gen_sem_t *empty;
 } reg_notification_list;
 
 /** Events for subscriptions */
@@ -128,15 +130,16 @@ str get_reginfo_partial(impurecord_t *r, ucontact_t *c, int event_type);
 
 void create_notifications(udomain_t* _t, impurecord_t* r_passed, ucontact_t* c_passed, str *presentity_uri, str *watcher_contact, str content, int event_type);
 
-void notification_timer(unsigned int ticks, void* param);
+void notification_event_process();
 
 void free_notification(reg_notification *n);
 
 void send_notification(reg_notification * n);
 
 void add_notification(reg_notification *n);
+
 reg_notification* new_notification(str subscription_state,
-        str content_type, str content, int version, reg_subscriber* r);
+        str content_type, str content, reg_subscriber* r);
 
 dlg_t* build_dlg_t_from_notification(reg_notification* n);