Ver Fonte

modules_k/pua, modules_k/rls: Updated rls_update_subs() to remove back-end SUBSCRIBE dialogs when a contact is removed from a resource list

- rls_update_subs() should be called in kamailio.cfg whenever a resource list
  changes. The initial version just created new back-end SUBSCRIBE dialogs for
  new contacts. This meant that dialogs for removed contacts would remain until
  the dialog expired by itself - which means that NOTIFYs for presence changes
  to contacts that have been removed will continue to be sent (and with some
  clients displayed) for a while after the contact was deleted.
- Working out when a contact has been deleted is a bit more tricky than working
  out which ones are new. The mechanism used is to build two insert-sorted
  lists. List one contains the contacts in the contact list, list two contains
  the active back-end dialogs. You can then pop the top item from list one and
  search and remove it from list two (it should be near the top of list two as
  they are both sorted). Once you have been through list one the remaining URIs
  in list two are those that have been deleted from the contact list.
Peter Dunkley há 13 anos atrás
pai
commit
5c5e969961

+ 48 - 0
modules_k/pua/hash.c

@@ -648,3 +648,51 @@ error:
 	return -1;
 	return -1;
 }
 }
 
 
+list_entry_t *get_subs_list(str *did)
+{
+	int i;
+	str *tmp_str;
+	list_entry_t *list = NULL;
+
+	if (dbmode==PUA_DB_ONLY)
+		return get_subs_list_puadb(did);
+
+	for (i = 0; i < HASH_SIZE; i++)
+	{
+		ua_pres_t *dialog;
+
+		lock_get(&HashT->p_records[i].lock);
+		dialog = HashT->p_records[i].entity;
+		while (dialog != NULL)
+		{
+			if (dialog->id.s != NULL && dialog->id.len > 0 &&
+				strncmp(dialog->id.s, did->s, did->len) == 0 &&
+				dialog->pres_uri != NULL && dialog->pres_uri->s != NULL &&
+				dialog->pres_uri->len > 0)
+			{
+				if ((tmp_str = (str *)pkg_malloc(sizeof(str))) == NULL)
+				{
+					LM_ERR("out of private memory\n");
+					lock_release(&HashT->p_records[i].lock);
+					goto done;
+				}
+				if ((tmp_str->s = (char *)pkg_malloc(sizeof(char) * dialog->pres_uri->len + 1)) == NULL)
+				{
+					pkg_free(tmp_str);
+					LM_ERR("out of private memory\n");
+					lock_release(&HashT->p_records[i].lock);
+					goto done;
+				}
+				memcpy(tmp_str->s, dialog->pres_uri->s, dialog->pres_uri->len);
+				tmp_str->len = dialog->pres_uri->len;
+				tmp_str->s[tmp_str->len] = '\0';
+
+				list = list_insert(tmp_str, list);
+			}
+			dialog = dialog->next;
+		}
+		lock_release(&HashT->p_records[i].lock);
+	}
+done:
+	return list;
+}

+ 4 - 0
modules_k/pua/hash.h

@@ -33,6 +33,7 @@
 #include "../../lock_ops.h"
 #include "../../lock_ops.h"
 #include "../../dprint.h"
 #include "../../dprint.h"
 #include "../../parser/msg_parser.h"
 #include "../../parser/msg_parser.h"
+#include "../rls/list.h"
 
 
 #define PRESENCE_EVENT      1<<0
 #define PRESENCE_EVENT      1<<0
 #define PWINFO_EVENT        1<<1
 #define PWINFO_EVENT        1<<1
@@ -132,6 +133,9 @@ int convert_temporary_dialog(ua_pres_t *dialog);
 int get_record_id(ua_pres_t* dialog, str** rec_id);
 int get_record_id(ua_pres_t* dialog, str** rec_id);
 typedef int (*get_record_id_t)(ua_pres_t* dialog, str** rec_id);
 typedef int (*get_record_id_t)(ua_pres_t* dialog, str** rec_id);
 
 
+list_entry_t *get_subs_list(str *did);
+typedef list_entry_t * (*get_subs_list_t)(str *did);
+
 /* for degug */
 /* for degug */
 void print_ua_pres(ua_pres_t* p);
 void print_ua_pres(ua_pres_t* p);
 
 

+ 1 - 0
modules_k/pua/pua_bind.c

@@ -40,6 +40,7 @@ int bind_pua(pua_api_t* api)
 	api->is_dialog      =  is_dialog;
 	api->is_dialog      =  is_dialog;
 	api->get_record_id  =  get_record_id;
 	api->get_record_id  =  get_record_id;
 	api->add_event      =  add_pua_event;
 	api->add_event      =  add_pua_event;
+	api->get_subs_list  =  get_subs_list;
 
 
 	return 0;
 	return 0;
 }
 }

+ 1 - 0
modules_k/pua/pua_bind.h

@@ -37,6 +37,7 @@ typedef struct pua_api {
 	query_dialog_t is_dialog;
 	query_dialog_t is_dialog;
 	get_record_id_t get_record_id;
 	get_record_id_t get_record_id;
 	add_pua_event_t add_event;
 	add_pua_event_t add_event;
+	get_subs_list_t get_subs_list;
 } pua_api_t;
 } pua_api_t;
 
 
 typedef int (*bind_pua_t)(pua_api_t* api);
 typedef int (*bind_pua_t)(pua_api_t* api);

+ 75 - 0
modules_k/pua/pua_db.c

@@ -1567,3 +1567,78 @@ int insert_puadb(ua_pres_t* pres)
 
 
 /******************************************************************************/
 /******************************************************************************/
 
 
+list_entry_t *get_subs_list_puadb(str *did)
+{
+	list_entry_t *list = NULL;
+	db_key_t q_cols[1], res_cols[1];
+	db1_res_t *res= NULL;
+	db_val_t q_vals[1];
+	int n_query_cols= 0, n_res_cols = 0;
+
+	/* cols and values used for search query */
+	q_cols[n_query_cols] = &str_pres_id_col;
+	q_vals[n_query_cols].type = DB1_STR;
+	q_vals[n_query_cols].nul = 0;
+	q_vals[n_query_cols].val.str_val.s = did->s;
+	q_vals[n_query_cols].val.str_val.len = did->len;
+	n_query_cols++;
+
+	res_cols[n_res_cols] = &str_pres_uri_col;
+	n_res_cols++;
+
+	if(pua_db == NULL)
+	{
+		LM_ERR("null database connection\n");
+		return list;
+	}
+
+	if(db_fetch_query(&pua_dbf, pua_fetch_rows, pua_db, q_cols, 0,
+				q_vals, res_cols, n_query_cols, n_res_cols, 0, &res) < 0)
+	{
+		LM_ERR("DB query error\n");
+		return list;
+	}
+
+	if (RES_ROW_N(res) == 0)
+	{
+		LM_INFO( "No records found\n");
+		pua_dbf.free_result(pua_db, res);
+		return list;
+	}
+
+	do {
+		int i, nr_rows;
+		db_row_t *rows;
+		nr_rows = RES_ROW_N(res);
+		rows = RES_ROWS(res);
+
+		for (i=0; i < nr_rows; i++)
+		{
+			str strng, *tmp_str;
+			strng.s = (char *) VAL_STRING(ROW_VALUES(rows+i));
+			strng.len = strlen(VAL_STRING(ROW_VALUES(rows+i)));
+
+			if ((tmp_str = (str *)pkg_malloc(sizeof(str))) == NULL)
+			{
+				LM_ERR("out of private memory\n");
+				pua_dbf.free_result(pua_db, res);
+				return list;
+			}
+			if ((tmp_str->s = (char *)pkg_malloc(sizeof(char) * strng.len + 1)) == NULL)
+			{
+				pkg_free(tmp_str);
+				LM_ERR("out of private memory\n");
+				pua_dbf.free_result(pua_db, res);
+				return list;
+			}
+			memcpy(tmp_str->s, strng.s, strng.len);
+			tmp_str->len = strng.len;
+			tmp_str->s[tmp_str->len] = '\0';
+
+			list = list_insert(tmp_str, list);
+		}
+	} while ((db_fetch_next(&pua_dbf, pua_fetch_rows, pua_db, &res)==1)
+			&& (RES_ROWS(res)>0));
+
+	return list;
+}

+ 2 - 0
modules_k/pua/pua_db.h

@@ -26,6 +26,7 @@
 #define PUA_DB_H
 #define PUA_DB_H
 
 
 #include "../../lib/srdb1/db.h"
 #include "../../lib/srdb1/db.h"
+#include "../rls/list.h"
 
 
 #define PUA_PRES_URI (1<<0) 
 #define PUA_PRES_URI (1<<0) 
 #define PUA_PRES_ID (1<<1)
 #define PUA_PRES_ID (1<<1)
@@ -59,6 +60,7 @@ int update_contact_puadb(ua_pres_t *pres, str *contact);
 void update_puadb(ua_pres_t* pres, time_t desired_expires, 
 void update_puadb(ua_pres_t* pres, time_t desired_expires, 
                        int expires, str* etag, str *contact);
                        int expires, str* etag, str *contact);
 int insert_puadb(ua_pres_t* pres);
 int insert_puadb(ua_pres_t* pres);
+list_entry_t *get_subs_list_puadb(str *did);
 
 
 
 
 #endif
 #endif

+ 131 - 0
modules_k/rls/list.h

@@ -0,0 +1,131 @@
+#ifndef _LIST_H
+#define _LIST_H
+
+#include <string.h>
+#include "../../dprint.h"
+#include "../../mem/mem.h"
+
+typedef struct list_entry
+{
+	str *strng;
+	struct list_entry *next;
+} list_entry_t;
+
+static inline list_entry_t *list_insert(str *strng, list_entry_t *list)
+{
+	int cmp;
+	list_entry_t *p, *q;
+
+	if (strng == NULL || strng->s == NULL || strng->len == 0)
+	{
+		LM_ERR("bad string\n");
+		return list;
+	}
+
+	if ((p = (list_entry_t *) pkg_malloc(sizeof(list_entry_t))) == NULL)
+	{
+		LM_ERR("out of memory\n");
+		return list;
+	}
+	p->strng = strng;
+	p->next = NULL;
+
+	if (list == NULL)
+		return p;
+
+	cmp = strncmp(list->strng->s, strng->s, strng->len);
+
+	if (cmp == 0)
+		return list;
+	if (cmp > 0)
+	{
+		p->next = list;
+		return p;
+	}
+	else
+	{
+		q = list;
+		while (q->next != NULL && (cmp = strncmp(q->next->strng->s, strng->s, strng->len)) < 0)
+			q = q->next;
+
+		if (cmp == 0)
+			return list;
+
+		p->next = q->next;
+		q->next = p;
+		return list;
+	}
+}
+
+static inline list_entry_t *list_remove(str strng, list_entry_t *list)
+{
+	int cmp = 0;
+	list_entry_t *p = list;
+
+	if (list != NULL)
+	{
+		if (strncmp(p->strng->s, strng.s, strng.len) == 0)
+		{
+			pkg_free(p->strng->s);
+			pkg_free(p->strng);
+			pkg_free(p);
+			return list->next;
+		}
+		else
+		{
+			list_entry_t *p = list, *q;
+
+			while (p->next != NULL && (cmp = strncmp(p->next->strng->s, strng.s, strng.len)) < 0)
+				p = p->next;
+
+			if (cmp == 0)
+			{
+				q = p->next;
+				p->next = q->next;
+				pkg_free(q->strng->s);
+				pkg_free(q->strng);
+				pkg_free(q);
+			}
+		}
+	}
+	return list;
+}
+
+static inline str *list_pop(list_entry_t **list)
+{
+	str *ret = NULL;
+	list_entry_t *tmp;
+
+	if (*list != NULL)
+	{
+		ret = (*list)->strng;
+
+		if ((*list)->next == NULL)
+		{
+			pkg_free(*list);
+			*list = NULL;
+		}
+		else
+		{
+			tmp = *list;
+			*list = (*list)->next;
+			pkg_free(tmp);
+		}
+	}
+
+	return ret;
+}
+
+static inline void list_free(list_entry_t **list)
+{
+	str *strng;
+
+	while ((strng = list_pop(list)) != NULL)
+	{
+		pkg_free(strng->s);
+		pkg_free(strng);
+	}
+	*list = NULL;
+}
+
+#endif /* _LIST_H */

+ 8 - 0
modules_k/rls/rls.c

@@ -140,6 +140,7 @@ xcapGetNewDoc_t xcap_GetNewDoc = 0;
 /* functions imported from pua module*/
 /* functions imported from pua module*/
 send_subscribe_t pua_send_subscribe;
 send_subscribe_t pua_send_subscribe;
 get_record_id_t pua_get_record_id;
 get_record_id_t pua_get_record_id;
+get_subs_list_t pua_get_subs_list;
 
 
 /* TM bind */
 /* TM bind */
 struct tm_binds tmb;
 struct tm_binds tmb;
@@ -585,6 +586,13 @@ static int mod_init(void)
 	}
 	}
 	pua_get_record_id= pua.get_record_id;
 	pua_get_record_id= pua.get_record_id;
 
 
+	if(pua.get_subs_list == NULL)
+	{
+		LM_ERR("Could not import get_subs_list\n");
+		return -1;
+	}
+	pua_get_subs_list= pua.get_subs_list;
+
 	if(!rls_integrated_xcap_server)
 	if(!rls_integrated_xcap_server)
 	{
 	{
 		/* bind xcap */
 		/* bind xcap */

+ 1 - 0
modules_k/rls/rls.h

@@ -138,6 +138,7 @@ extern extract_sdialog_info_t pres_extract_sdialog_info;
 /* functions imported from pua module*/
 /* functions imported from pua module*/
 extern send_subscribe_t pua_send_subscribe;
 extern send_subscribe_t pua_send_subscribe;
 extern get_record_id_t pua_get_record_id;
 extern get_record_id_t pua_get_record_id;
+extern get_subs_list_t pua_get_subs_list;
 
 
 /* functions imported from presence module */
 /* functions imported from presence module */
 extern contains_event_t pres_contains_event;
 extern contains_event_t pres_contains_event;

+ 67 - 0
modules_k/rls/subscribe.c

@@ -50,6 +50,7 @@
 #include "notify.h"
 #include "notify.h"
 #include "rls.h"
 #include "rls.h"
 #include "../../mod_fix.h"
 #include "../../mod_fix.h"
+#include "list.h"
 
 
 int counter= 0;
 int counter= 0;
 
 
@@ -68,6 +69,9 @@ int resource_subscriptions(subs_t* subs, xmlNodePtr rl_node);
 int update_rlsubs( subs_t* subs,unsigned int hash_code);
 int update_rlsubs( subs_t* subs,unsigned int hash_code);
 int remove_expired_rlsubs( subs_t* subs,unsigned int hash_code);
 int remove_expired_rlsubs( subs_t* subs,unsigned int hash_code);
 
 
+list_entry_t *rls_contact_list = NULL;
+list_entry_t *rls_subs_list = NULL;
+
 /**
 /**
  * return the XML node for rls-services matching uri
  * return the XML node for rls-services matching uri
  */
  */
@@ -871,6 +875,28 @@ int send_resource_subs(char* uri, void* param)
 
 
 	((subs_info_t*)param)->pres_uri = &pres_uri;
 	((subs_info_t*)param)->pres_uri = &pres_uri;
 	((subs_info_t*)param)->remote_target = &pres_uri;
 	((subs_info_t*)param)->remote_target = &pres_uri;
+
+	if (((subs_info_t*)param)->internal_update_flag == INTERNAL_UPDATE_TRUE)
+	{
+		str *tmp_str;
+
+		if ((tmp_str = (str *)pkg_malloc(sizeof(str))) == NULL)
+		{
+			LM_ERR("out of private memory\n");
+			return -1;
+		}
+		if ((tmp_str->s = (char *)pkg_malloc(sizeof(char) * pres_uri.len)) == NULL)
+		{
+			pkg_free(tmp_str);
+			LM_ERR("out of private memory\n");
+			return -1;
+		}
+		memcpy(tmp_str->s, pres_uri.s, pres_uri.len);
+		tmp_str->len = pres_uri.len;
+
+		rls_contact_list = list_insert(tmp_str, rls_contact_list);
+	}
+
 	return pua_send_subscribe((subs_info_t*)param);
 	return pua_send_subscribe((subs_info_t*)param);
 }
 }
 
 
@@ -883,6 +909,7 @@ int resource_subscriptions(subs_t* subs, xmlNodePtr xmlnode)
 	str wuri= {0, 0};
 	str wuri= {0, 0};
 	str extra_headers;
 	str extra_headers;
 	str did_str= {0, 0};
 	str did_str= {0, 0};
+	str *tmp_str;
 		
 		
 	/* if is initial send an initial Subscribe 
 	/* if is initial send an initial Subscribe 
 	 * else search in hash table for a previous subscription */
 	 * else search in hash table for a previous subscription */
@@ -923,6 +950,21 @@ int resource_subscriptions(subs_t* subs, xmlNodePtr xmlnode)
 
 
 	s.internal_update_flag = subs->internal_update_flag;
 	s.internal_update_flag = subs->internal_update_flag;
 
 
+	if (subs->internal_update_flag == INTERNAL_UPDATE_TRUE)
+	{
+		if (rls_contact_list != NULL)
+		{
+			LM_WARN("contact list is not empty\n");
+			list_free(&rls_contact_list);
+		}
+
+		if (rls_subs_list != NULL)
+		{
+			LM_WARN("subscriber list is not empty\n");
+			list_free(&rls_subs_list);
+		}
+	}
+
 	counter = 0;
 	counter = 0;
 	
 	
 	if(process_list_and_exec(xmlnode, subs->from_user, subs->from_domain,
 	if(process_list_and_exec(xmlnode, subs->from_user, subs->from_domain,
@@ -936,6 +978,31 @@ int resource_subscriptions(subs_t* subs, xmlNodePtr xmlnode)
 		LM_WARN("%.*s has too many contacts.  Max: %d, has: %d\n",
 		LM_WARN("%.*s has too many contacts.  Max: %d, has: %d\n",
 			wuri.len, wuri.s, rls_max_backend_subs, counter);
 			wuri.len, wuri.s, rls_max_backend_subs, counter);
 
 
+	if (s.internal_update_flag == INTERNAL_UPDATE_TRUE)
+	{
+		counter = 0;
+		s.internal_update_flag = 0;
+
+		rls_subs_list = pua_get_subs_list(&did_str);
+
+		while ((tmp_str = list_pop(&rls_contact_list)) != NULL)
+		{
+			LM_DBG("Finding and removing %.*s from subscription list\n", tmp_str->len, tmp_str->s);
+			rls_subs_list = list_remove(*tmp_str, rls_subs_list);
+			pkg_free(tmp_str->s);
+			pkg_free(tmp_str);
+		}
+
+		while ((tmp_str = list_pop(&rls_subs_list)) != NULL)
+		{
+			LM_DBG("Removing subscription for %.*s\n", tmp_str->len, tmp_str->s);
+			s.expires = 0;
+			send_resource_subs(tmp_str->s, (void*)(&s));
+			pkg_free(tmp_str->s);
+			pkg_free(tmp_str);
+		}
+	}
+
 	pkg_free(wuri.s);
 	pkg_free(wuri.s);
 	pkg_free(did_str.s);
 	pkg_free(did_str.s);