Bläddra i källkod

modules_k/presence: added new functionality to the presence engine

The new "dbmode" parameter controls the way subscriptions are handled: in memory, fallback to database, or database only.
The "fallback2db" parameter will still ve available for config compatibility.
Marius Bucur 14 år sedan
förälder
incheckning
35568faf70

+ 59 - 35
modules_k/presence/README

@@ -47,10 +47,11 @@ Juha Heinanen
               3.9. max_expires (int)
               3.10. server_address (str)
               3.11. fallback2db (int)
-              3.12. subs_htable_size (int)
-              3.13. pres_htable_size (int)
-              3.14. enable_sphere_check (int)
-              3.15. timeout_rm_subs (int)
+              3.12. dbmode (int)
+              3.13. subs_htable_size (int)
+              3.14. pres_htable_size (int)
+              3.15. enable_sphere_check (int)
+              3.16. timeout_rm_subs (int)
 
         4. Exported Functions
 
@@ -98,15 +99,16 @@ Juha Heinanen
    1.9. Set max_expires parameter
    1.10. Set server_address parameter
    1.11. Set fallback2db parameter
-   1.12. Set subs_htable_size parameter
-   1.13. Set pres_htable_size parameter
-   1.14. Set enable_sphere_check parameter
-   1.15. Set timeout_rm_subs parameter
-   1.16. handle_publish usage
-   1.17. handle_subscribe usage
-   1.18. pres_auth_status usage
-   1.19. pres_refresh_watchers usage
-   1.20. pres_update_watchers usage
+   1.12. Set dbmode parameter
+   1.13. Set subs_htable_size parameter
+   1.14. Set pres_htable_size parameter
+   1.15. Set enable_sphere_check parameter
+   1.16. Set timeout_rm_subs parameter
+   1.17. handle_publish usage
+   1.18. handle_subscribe usage
+   1.19. pres_auth_status usage
+   1.20. pres_refresh_watchers usage
+   1.21. pres_update_watchers usage
    2.1. presence_api_t structure
 
 Chapter 1. Admin Guide
@@ -132,10 +134,11 @@ Chapter 1. Admin Guide
         3.9. max_expires (int)
         3.10. server_address (str)
         3.11. fallback2db (int)
-        3.12. subs_htable_size (int)
-        3.13. pres_htable_size (int)
-        3.14. enable_sphere_check (int)
-        3.15. timeout_rm_subs (int)
+        3.12. dbmode (int)
+        3.13. subs_htable_size (int)
+        3.14. pres_htable_size (int)
+        3.15. enable_sphere_check (int)
+        3.16. timeout_rm_subs (int)
 
    4. Exported Functions
 
@@ -172,7 +175,13 @@ Chapter 1. Admin Guide
    mode, in case a searched record is not found in cache, the search is
    continued in database. This is useful for an architecture in which
    processing and memory load might be divided on several Kamailio
-   instances, maybe on different servers using the same database.
+   instances, maybe on different servers using the same database. This
+   parameter remains only for legacy purposes. As a new feature for the
+   presence engine, it is possible to have three database modes, which one
+   can configure through the dbmode parameter. Setting dbmode to 0, 1, 2
+   respective will cause the subscribers to be retrieved from memory only,
+   from memory and to fallback to database mode in case a record is not
+   found in memory, and from database only.
 
    The module implements several API functions, that can be used by other
    modules. In fact, it can be used only as a resource module, or
@@ -211,10 +220,11 @@ Chapter 1. Admin Guide
    3.9. max_expires (int)
    3.10. server_address (str)
    3.11. fallback2db (int)
-   3.12. subs_htable_size (int)
-   3.13. pres_htable_size (int)
-   3.14. enable_sphere_check (int)
-   3.15. timeout_rm_subs (int)
+   3.12. dbmode (int)
+   3.13. subs_htable_size (int)
+   3.14. pres_htable_size (int)
+   3.15. enable_sphere_check (int)
+   3.16. timeout_rm_subs (int)
 
 3.1. db_url(str)
 
@@ -352,7 +362,21 @@ modparam("presence", "server_address", "sip:10.10.10.10:5060")
 modparam("presence", "fallback2db", 1)
 ...
 
-3.12. subs_htable_size (int)
+3.12. dbmode (int)
+
+   This parameter sets the mode in which records are retrieved. Setting
+   this parameter to 0 or 1 is equivalent to setting fallback2db to 0 or
+   1, respectiv. The dbmode parameter can also take a third value, 2, in
+   which records are retrieved from database only. So, the three database
+   modes in which the presence engine can operate are: memory only,
+   fallback to database, and database only.
+
+   Example 1.12. Set dbmode parameter
+...
+modparam("presence", "dbmode", 2)
+...
+
+3.13. subs_htable_size (int)
 
    The size of the in-memory hash table to store subscription dialogs.
    This parameter will be used as the power of 2 when computing table
@@ -360,24 +384,24 @@ modparam("presence", "fallback2db", 1)
 
    Default value is “9 (512)”.
 
-   Example 1.12. Set subs_htable_size parameter
+   Example 1.13. Set subs_htable_size parameter
 ...
 modparam("presence", "subs_htable_size", 11)
 ...
 
-3.13. pres_htable_size (int)
+3.14. pres_htable_size (int)
 
    The size of the in-memory hash table to store publish records. This
    parameter will be used as the power of 2 when computing table size.
 
    Default value is “9 (512)”.
 
-   Example 1.13. Set pres_htable_size parameter
+   Example 1.14. Set pres_htable_size parameter
 ...
 modparam("presence", "pres_htable_size", 11)
 ...
 
-3.14. enable_sphere_check (int)
+3.15. enable_sphere_check (int)
 
    This parameter is a flag that should be set if permission rules include
    sphere checking. The sphere information is expected to be present in
@@ -387,12 +411,12 @@ modparam("presence", "pres_htable_size", 11)
 
    Default value is “0 ”.
 
-   Example 1.14. Set enable_sphere_check parameter
+   Example 1.15. Set enable_sphere_check parameter
 ...
 modparam("presence", "enable_sphere_check", 1)
 ...
 
-3.15. timeout_rm_subs (int)
+3.16. timeout_rm_subs (int)
 
    This parameter is a flag that should be set if subscriptions should be
    removed from the active_watchers when a NOTIFY times out. RFC3265
@@ -402,7 +426,7 @@ modparam("presence", "enable_sphere_check", 1)
 
    Default value is “1”.
 
-   Example 1.15. Set timeout_rm_subs parameter
+   Example 1.16. Set timeout_rm_subs parameter
 ...
 modparam("presence", "timeout_rm_subs", 0)
 ...
@@ -435,7 +459,7 @@ modparam("presence", "timeout_rm_subs", 0)
 
    The module sends an appropriate stateless reply in all cases.
 
-   Example 1.16. handle_publish usage
+   Example 1.17. handle_publish usage
 ...
         if(is_method("PUBLISH"))
         {
@@ -461,7 +485,7 @@ modparam("presence", "timeout_rm_subs", 0)
 
    The module sends an appropriate stateless reply in all cases.
 
-   Example 1.17. handle_subscribe usage
+   Example 1.18. handle_subscribe usage
 ...
 if(method=="SUBSCRIBE")
     handle_subscribe();
@@ -478,7 +502,7 @@ if(method=="SUBSCRIBE")
 
    This function can be used from REQUEST_ROUTE.
 
-   Example 1.18. pres_auth_status usage
+   Example 1.19. pres_auth_status usage
 ...
 if (method=="MESSAGE") {
     pres_auth_status("$fu", $ru");
@@ -509,7 +533,7 @@ if (method=="MESSAGE") {
 
    This function can be used from ANY_ROUTE.
 
-   Example 1.19. pres_refresh_watchers usage
+   Example 1.20. pres_refresh_watchers usage
 ...
 pres_refresh_watchers("sip:[email protected]", "presence", 1);
 ...
@@ -527,7 +551,7 @@ pres_refresh_watchers("sip:[email protected]", "presence", 1);
 
    This function can be used from ANY_ROUTE.
 
-   Example 1.20. pres_update_watchers usage
+   Example 1.21. pres_update_watchers usage
 ...
 pres_update_watchers("sip:[email protected]", "presence");
 ...

+ 21 - 1
modules_k/presence/doc/presence_admin.xml

@@ -32,6 +32,10 @@
 	found in cache, the search is continued	in database. This is useful for
 	an architecture in which processing and memory load might be divided on 
 	several &kamailio; instances, maybe on different servers using the same database.
+        This parameter remains only for legacy purposes. As a new feature for the presence engine, it is possible
+        to have three database modes, which one can configure through the dbmode parameter.
+        Setting dbmode to 0, 1, 2 respective will cause the subscribers to be retrieved from memory only,
+        from memory and to fallback to database mode in case a record is not found in memory, and from database only.
 	</para>
 	<para>The module implements several API functions, that can be used by other
 	modules. In fact, it can be used only as a resource module, or "library".
@@ -294,7 +298,23 @@ modparam("presence", "fallback2db", 1)
 </programlisting>
 		</example>
 	</section>
-
+<section>
+		<title><varname>dbmode</varname> (int)</title>
+		<para>
+		This parameter sets the mode in which records are retrieved.
+                Setting this parameter to 0 or 1 is equivalent to setting fallback2db to 0 or 1, respectiv.
+                The dbmode parameter can also take a third value, 2, in which records are retrieved from database only.
+                So, the three database modes in which the presence engine can operate are: memory only, fallback to database, and database only.
+		</para>
+		<example>
+		<title>Set <varname>dbmode</varname> parameter</title>
+		<programlisting format="linespecific">
+...
+modparam("presence", "dbmode", 2)
+...
+</programlisting>
+		</example>
+	</section>
 	<section>
 		<title><varname>subs_htable_size</varname> (int)</title>
 		<para>

+ 2 - 2
modules_k/presence/hash.c

@@ -261,8 +261,8 @@ int insert_shtable(shtable_t htable,unsigned int hash_code, subs_t* subs)
 	}
 
 	new_rec->expires+= (int)time(NULL);
-	if(fallback2db!=0) {
-		if(new_rec->db_flag==0)
+	if(dbmode == DB_FALLBACK) {
+		if(new_rec->db_flag == 0)
 			new_rec->db_flag = INSERTDB_FLAG;
 	} else {
 		new_rec->db_flag = NO_UPDATEDB_FLAG;

+ 77 - 67
modules_k/presence/notify.c

@@ -380,7 +380,7 @@ str* get_wi_notify_body(subs_t* subs, subs_t* watcher_subs)
 		goto done;
 	}
 
-	if(fallback2db)
+	if(dbmode != DB_MEMORY_ONLY)
 	{
 		if(get_wi_subs_db(subs, watchers)< 0)
 		{
@@ -405,7 +405,7 @@ str* get_wi_notify_body(subs_t* subs, subs_t* watcher_subs)
 			continue;
 		}
 
-		if(fallback2db && s->db_flag!= INSERTDB_FLAG)
+		if(dbmode != DB_MEMORY_ONLY && s->db_flag!= INSERTDB_FLAG)
 		{
 			LM_DBG("record already found in database\n");
 			continue;
@@ -641,7 +641,7 @@ str* get_p_notify_body(str pres_uri, pres_ev_t* event, str* etag,
 	if(search_phtable(&pres_uri, event->evp->type, hash_code)== NULL)
 	{
 		LM_DBG("No record exists in hash_table\n");
-		if(fallback2db)
+		if(dbmode != DB_MEMORY_ONLY)
 			goto db_query;
 
 		/* for pidf manipulation */
@@ -1226,10 +1226,12 @@ subs_t* get_subs_dialog(str* pres_uri, pres_ev_t* event, str* sender)
 	subs_t* s_array= NULL;
 	int n= 0, i= 0;
 	
-	/* if fallback2db -> should take all dialogs from db
-	 * and the only those dialogs from cache with db_flag= INSERTDB_FLAG */
+	/* if in memory mode, should take the subscriptions from the hashtable only
+	   in dbonly mode should take all dialogs from db
+	   in fallback mode, should take those dialogs with db_flag = INSERTDB_FLAG
+	*/
 
-	if(fallback2db)
+	if(dbmode != DB_MEMORY_ONLY)
 	{
 		if(get_subs_db(pres_uri, event, sender, &s_array, &n)< 0)			
 		{
@@ -1237,68 +1239,73 @@ subs_t* get_subs_dialog(str* pres_uri, pres_ev_t* event, str* sender)
 			goto error;
 		}
 	}
-	hash_code= core_hash(pres_uri, &event->name, shtable_size);
 	
-	lock_get(&subs_htable[hash_code].lock);
-
-	s= subs_htable[hash_code].entries;
-
-	while(s->next)
+	if(dbmode != DB_ONLY)
 	{
-		s= s->next;
-	
-		printf_subs(s);
+		hash_code= core_hash(pres_uri, &event->name, shtable_size);
 		
-		if(s->expires< (int)time(NULL))
-		{
-			LM_DBG("expired subs\n");
-			continue;
-		}
-		
-		if((!(s->status== ACTIVE_STATUS &&
-            s->reason.len== 0 &&
-			s->event== event && s->pres_uri.len== pres_uri->len &&
-			strncmp(s->pres_uri.s, pres_uri->s, pres_uri->len)== 0)) || 
-			(sender && sender->len== s->contact.len && 
-			strncmp(sender->s, s->contact.s, sender->len)== 0))
-			continue;
+		lock_get(&subs_htable[hash_code].lock);
 
-		if(fallback2db)
+		s= subs_htable[hash_code].entries;
+
+		while(s->next)
 		{
-			if(s->db_flag== NO_UPDATEDB_FLAG)
+			s= s->next;
+		
+			printf_subs(s);
+			
+			if(s->expires< (int)time(NULL))
 			{
-				LM_DBG("s->db_flag==NO_UPDATEDB_FLAG\n");
+				LM_DBG("expired subs\n");
 				continue;
 			}
 			
-			if(s->db_flag== UPDATEDB_FLAG)
+			if((!(s->status== ACTIVE_STATUS &&
+		    s->reason.len== 0 &&
+				s->event== event && s->pres_uri.len== pres_uri->len &&
+				strncmp(s->pres_uri.s, pres_uri->s, pres_uri->len)== 0)) || 
+				(sender && sender->len== s->contact.len && 
+				strncmp(sender->s, s->contact.s, sender->len)== 0))
+				continue;
+
+			if(dbmode == DB_FALLBACK)
 			{
-				LM_DBG("s->db_flag== UPDATEDB_FLAG\n");
-				if(n>0 && update_in_list(s, s_array, i, n)< 0)
+				if(s->db_flag== NO_UPDATEDB_FLAG)
 				{
-					LM_DBG("dialog not found in list fetched from database\n");
-					/* insert record */
+					LM_DBG("s->db_flag==NO_UPDATEDB_FLAG\n");
+					continue;
+				}
+				
+				if(s->db_flag== UPDATEDB_FLAG)
+				{
+					LM_DBG("s->db_flag== UPDATEDB_FLAG\n");
+					if(n>0 && update_in_list(s, s_array, i, n)< 0)
+					{
+						LM_DBG("dialog not found in list fetched from database\n");
+						/* insert record */
+					}
+					else
+						continue;			
 				}
-				else
-					continue;			
 			}
+			
+			LM_DBG("s->db_flag= INSERTDB_FLAG\n");
+			s_new= mem_copy_subs(s, PKG_MEM_TYPE);
+			if(s_new== NULL)
+			{
+				LM_ERR("copying subs_t structure\n");
+				lock_release(&subs_htable[hash_code].lock);
+				goto error;
+			}
+			s_new->expires-= (int)time(NULL);
+			s_new->next= s_array;
+			s_array= s_new;
+			i++;
 		}
 		
-		LM_DBG("s->db_flag= INSERTDB_FLAG\n");
-		s_new= mem_copy_subs(s, PKG_MEM_TYPE);
-		if(s_new== NULL)
-		{
-			LM_ERR("copying subs_t structure\n");
-			lock_release(&subs_htable[hash_code].lock);
-			goto error;
-		}
-		s_new->expires-= (int)time(NULL);
-		s_new->next= s_array;
-		s_array= s_new;
-		i++;
-	}
-
 	lock_release(&subs_htable[hash_code].lock);
+	}
+	
 	LM_DBG("found %d dialogs( %d in database and %d in hash_table)\n",n+i,n,i);
 
 	return s_array;
@@ -1643,30 +1650,33 @@ int notify(subs_t* subs, subs_t * watcher_subs,str* n_body,int force_null_body)
 		unsigned int hash_code;
 		hash_code= core_hash(&subs->pres_uri, &subs->event->name, shtable_size);
 
-		if(update_shtable(subs_htable, hash_code, subs, LOCAL_TYPE)< 0)
+		/* if subscriptions are held also in memory, update the subscription hashtable */
+		if(dbmode != DB_ONLY)
 		{
-			if(subs->db_flag!= INSERTDB_FLAG && fallback2db)
+			if(update_shtable(subs_htable, hash_code, subs, LOCAL_TYPE) < 0 && dbmode == DB_MEMORY_ONLY)
 			{
-				LM_DBG("record not found in subs htable\n");
-				if(update_subs_db(subs, LOCAL_TYPE)< 0)
-				{
-					LM_ERR("updating subscription in database\n");
-					return -1;
-				}
+				/* subscriptions are held only in memory, and hashtable update failed */
+				LM_ERR("updating subscription record in hash table\n");
+				return -1;
 			}
-			else
+		}
+		/* if dbonly mode, or if fallback2db mode and the subscription was inserted into the database */
+		if((dbmode == DB_ONLY) || (subs->db_flag != INSERTDB_FLAG && dbmode == DB_FALLBACK))
+		{
+			LM_DBG("updating subscription to database\n");
+			if(update_subs_db(subs, LOCAL_TYPE)< 0)
 			{
-				LM_ERR("record not found in subs htable\n");
+				LM_ERR("updating subscription in database\n");
 				return -1;
 			}
 		}
 	}
      
-    if(subs->reason.s && subs->status== ACTIVE_STATUS && 
-        subs->reason.len== 12 && strncmp(subs->reason.s, "polite-block", 12)== 0)
-    {
-        force_null_body = 1;
-    }
+	if(subs->reason.s && subs->status== ACTIVE_STATUS && 
+	subs->reason.len== 12 && strncmp(subs->reason.s, "polite-block", 12)== 0)
+	{
+		force_null_body = 1;
+	}
 
 	if(send_notify_request(subs, watcher_subs, n_body, force_null_body)< 0)
 	{

+ 10 - 1
modules_k/presence/presence.c

@@ -69,6 +69,7 @@
 #include "../../lib/kmi/mi.h"
 #include "../../lib/kcore/hash_func.h"
 #include "../pua/hash.h"
+#include "presence.h"
 #include "publish.h"
 #include "subscribe.h"
 #include "event_list.h"
@@ -136,7 +137,8 @@ int expires_offset = 0;
 int max_expires= 3600;
 int shtable_size= 9;
 shtable_t subs_htable= NULL;
-int fallback2db= 0;
+int dbmode = 0;
+int fallback2db = 0;
 int sphere_enable= 0;
 int timeout_rm_subs = 1;
 
@@ -175,6 +177,7 @@ static param_export_t params[]={
 	{ "server_address",         STR_PARAM, &server_address.s},
 	{ "subs_htable_size",       INT_PARAM, &shtable_size},
 	{ "pres_htable_size",       INT_PARAM, &phtable_size},
+	{ "dbmode",                 INT_PARAM, &dbmode},
 	{ "fallback2db",            INT_PARAM, &fallback2db},
 	{ "enable_sphere_check",    INT_PARAM, &sphere_enable},
 	{ "timeout_rm_subs",        INT_PARAM, &timeout_rm_subs},
@@ -358,6 +361,12 @@ static int mod_init(void)
 	if(pa_db)
 		pa_dbf.close(pa_db);
 	pa_db = NULL;
+	
+	/* for legacy, we also keep the fallback2db parameter, but make sure for consistency */
+	if(fallback2db)
+	{
+		dbmode = DB_FALLBACK;
+	}
 
 	return 0;
 }

+ 10 - 1
modules_k/presence/presence.h

@@ -44,6 +44,15 @@
 #include "event_list.h"
 #include "hash.h"
 
+/* DB modes */
+
+/* subscriptions are held in memory and periodically updated to db, but retrieved from db only at startup */
+#define DB_MEMORY_ONLY 0
+/* same as memory_only, but if a subscription is not found, it falls back to db */
+#define DB_FALLBACK 1
+/* subscriptions are held only in database */
+#define DB_ONLY 2
+
 /* TM bind */
 extern struct tm_binds tmb;
 
@@ -66,7 +75,7 @@ extern char *to_tag_pref;
 extern int expires_offset;
 extern str server_address;
 extern int max_expires;
-extern int fallback2db;
+extern int dbmode;
 extern int sphere_enable;
 extern int timeout_rm_subs;
 extern int shtable_size;

+ 2 - 2
modules_k/presence/presentity.c

@@ -896,8 +896,8 @@ char* get_sphere(str* pres_uri)
 	lock_release(&pres_htable[hash_code].lock);
 
 
-	/* if record not found and fallback2db query database*/
-	if(!fallback2db)
+	/* if record not found and subscriptions are held also in database, query database*/
+	if(dbmode == DB_MEMORY_ONLY)
 	{
 		return NULL;
 	}

+ 196 - 22
modules_k/presence/subscribe.c

@@ -180,6 +180,152 @@ int delete_db_subs(str pres_uri, str ev_stored_name, str to_tag)
 	return 0;
 }
 
+int insert_subs_db(subs_t* s, int type)
+{
+	db_key_t query_cols[22];
+	db_val_t query_vals[22];
+	int n_query_cols = 0;
+	int pres_uri_col, to_user_col, to_domain_col, from_user_col, from_domain_col,
+		callid_col, totag_col, fromtag_col, event_col,status_col, event_id_col, 
+		local_cseq_col, remote_cseq_col, expires_col, record_route_col, 
+		contact_col, local_contact_col, version_col,socket_info_col,reason_col;
+		
+	if(pa_dbf.use_table(pa_db, &active_watchers_table)< 0)
+	{
+		LM_ERR("sql use table failed\n");
+		return -1;
+	}
+	
+	query_cols[pres_uri_col= n_query_cols] =&str_presentity_uri_col;
+	query_vals[pres_uri_col].type = DB1_STR;
+	query_vals[pres_uri_col].nul = 0;
+	n_query_cols++;
+	
+	query_cols[callid_col= n_query_cols] =&str_callid_col;
+	query_vals[callid_col].type = DB1_STR;
+	query_vals[callid_col].nul = 0;
+	n_query_cols++;
+
+	query_cols[totag_col= n_query_cols] =&str_to_tag_col;
+	query_vals[totag_col].type = DB1_STR;
+	query_vals[totag_col].nul = 0;
+	n_query_cols++;
+
+	query_cols[fromtag_col= n_query_cols] =&str_from_tag_col;
+	query_vals[fromtag_col].type = DB1_STR;
+	query_vals[fromtag_col].nul = 0;
+	n_query_cols++;
+
+	query_cols[to_user_col= n_query_cols] =&str_to_user_col;
+	query_vals[to_user_col].type = DB1_STR;
+	query_vals[to_user_col].nul = 0;
+	n_query_cols++;
+
+	query_cols[to_domain_col= n_query_cols] =&str_to_domain_col;
+	query_vals[to_domain_col].type = DB1_STR;
+	query_vals[to_domain_col].nul = 0;
+	n_query_cols++;
+	
+	query_cols[from_user_col= n_query_cols] =&str_watcher_username_col;
+	query_vals[from_user_col].type = DB1_STR;
+	query_vals[from_user_col].nul = 0;
+	n_query_cols++;
+
+	query_cols[from_domain_col= n_query_cols] =&str_watcher_domain_col;
+	query_vals[from_domain_col].type = DB1_STR;
+	query_vals[from_domain_col].nul = 0;
+	n_query_cols++;
+
+	query_cols[event_col= n_query_cols] =&str_event_col;
+	query_vals[event_col].type = DB1_STR;
+	query_vals[event_col].nul = 0;
+	n_query_cols++;	
+
+	query_cols[event_id_col= n_query_cols] =&str_event_id_col;
+	query_vals[event_id_col].type = DB1_STR;
+	query_vals[event_id_col].nul = 0;
+	n_query_cols++;
+
+	query_cols[local_cseq_col= n_query_cols]=&str_local_cseq_col;
+	query_vals[local_cseq_col].type = DB1_INT;
+	query_vals[local_cseq_col].nul = 0;
+	n_query_cols++;
+
+	query_cols[remote_cseq_col= n_query_cols]=&str_remote_cseq_col;
+	query_vals[remote_cseq_col].type = DB1_INT;
+	query_vals[remote_cseq_col].nul = 0;
+	n_query_cols++;
+
+	query_cols[expires_col= n_query_cols] =&str_expires_col;
+	query_vals[expires_col].type = DB1_INT;
+	query_vals[expires_col].nul = 0;
+	n_query_cols++;
+
+	query_cols[status_col= n_query_cols] =&str_status_col;
+	query_vals[status_col].type = DB1_INT;
+	query_vals[status_col].nul = 0;
+	n_query_cols++;
+
+	query_cols[reason_col= n_query_cols] =&str_reason_col;
+	query_vals[reason_col].type = DB1_STR;
+	query_vals[reason_col].nul = 0;
+	n_query_cols++;
+
+	query_cols[record_route_col= n_query_cols] =&str_record_route_col;
+	query_vals[record_route_col].type = DB1_STR;
+	query_vals[record_route_col].nul = 0;
+	n_query_cols++;
+	
+	query_cols[contact_col= n_query_cols] =&str_contact_col;
+	query_vals[contact_col].type = DB1_STR;
+	query_vals[contact_col].nul = 0;
+	n_query_cols++;
+
+	query_cols[local_contact_col= n_query_cols] =&str_local_contact_col;
+	query_vals[local_contact_col].type = DB1_STR;
+	query_vals[local_contact_col].nul = 0;
+	n_query_cols++;
+
+	query_cols[socket_info_col= n_query_cols] =&str_socket_info_col;
+	query_vals[socket_info_col].type = DB1_STR;
+	query_vals[socket_info_col].nul = 0;
+	n_query_cols++;
+
+	query_cols[version_col= n_query_cols]=&str_version_col;
+	query_vals[version_col].type = DB1_INT;
+	query_vals[version_col].nul = 0;
+	n_query_cols++;
+	
+	query_vals[pres_uri_col].val.str_val= s->pres_uri;
+	query_vals[callid_col].val.str_val= s->callid;
+	query_vals[totag_col].val.str_val= s->to_tag;
+	query_vals[fromtag_col].val.str_val= s->from_tag;
+	query_vals[to_user_col].val.str_val = s->to_user;
+	query_vals[to_domain_col].val.str_val = s->to_domain;
+	query_vals[from_user_col].val.str_val = s->from_user;
+	query_vals[from_domain_col].val.str_val = s->from_domain;
+	query_vals[event_col].val.str_val = s->event->name;
+	query_vals[event_id_col].val.str_val = s->event_id;
+	query_vals[local_cseq_col].val.int_val= s->local_cseq;
+	query_vals[remote_cseq_col].val.int_val= s->remote_cseq;
+	query_vals[expires_col].val.int_val = s->expires + (int)time(NULL);
+	query_vals[record_route_col].val.str_val = s->record_route;
+	query_vals[contact_col].val.str_val = s->contact;
+	query_vals[local_contact_col].val.str_val = s->local_contact;
+	query_vals[version_col].val.int_val= s->version;
+	query_vals[status_col].val.int_val= s->status;
+	query_vals[reason_col].val.str_val= s->reason;
+	query_vals[socket_info_col].val.str_val= s->sockinfo_str;
+
+	LM_DBG("inserting subscription in active_watchers table\n");
+	if(pa_dbf.insert(pa_db, query_cols, query_vals, n_query_cols) < 0)
+	{
+		LM_ERR("unsuccessful sql insert\n");
+		return -1;
+	}
+	return 0;
+}
+
 int update_subs_db(subs_t* subs, int type)
 {
 	db_key_t query_cols[22], update_keys[7];
@@ -316,10 +462,11 @@ int update_subscription(struct sip_msg* msg, subs_t* subs, int to_tag_gen,
 				LM_ERR("deleting subscription record from database\n");
 				goto error;
 			}
-			/* delete record from hash table also */
-
-			subs->local_cseq= delete_shtable(subs_htable,hash_code,
-					subs->to_tag);
+			/* delete record from hash table also if not in dbonly mode */
+			if(dbmode != DB_ONLY)
+			{
+				subs->local_cseq= delete_shtable(subs_htable, hash_code, subs->to_tag);
+			}
 		
 			if(subs->event->type & PUBL_TYPE)
 			{	
@@ -359,16 +506,21 @@ int update_subscription(struct sip_msg* msg, subs_t* subs, int to_tag_gen,
 			}
 			return 1;
 		}
-
-		if(update_shtable(subs_htable, hash_code, subs, REMOTE_TYPE)< 0)
+		/* if subscribers are held in memory, update them */
+		if(dbmode != DB_ONLY)
 		{
-			if(fallback2db==0)
+			if(update_shtable(subs_htable, hash_code, subs, REMOTE_TYPE)< 0)
 			{
-				LM_ERR("updating subscription record in hash table\n");
-				goto error;
+				/* if subscribers are also retrieved from database, it is not a fatal error */
+				if(dbmode != DB_MEMORY_ONLY)
+				{
+					LM_ERR("updating subscription record in hash table\n");
+					goto error;
+				}
 			}
 		}
-		if(fallback2db!=0)
+		/* if subscribers are retrieved from db also, update the subscription in database immediately */
+		if(dbmode != DB_MEMORY_ONLY)
 		{
 			/* update in database table */
 			if(update_subs_db(subs, REMOTE_TYPE)< 0)
@@ -380,16 +532,30 @@ int update_subscription(struct sip_msg* msg, subs_t* subs, int to_tag_gen,
 	}
 	else
 	{
+		LM_DBG("subscription not in dialog\n");
 		if(subs->expires!= 0)
-		{	
-			if(insert_shtable(subs_htable,hash_code,subs)< 0)
+		{
+			if(dbmode != DB_ONLY)
 			{
-				LM_ERR("inserting new record in subs_htable\n");
-				goto error;
+				LM_DBG("inserting in shtable\n");
+				if(insert_shtable(subs_htable,hash_code,subs)< 0)
+				{
+					LM_ERR("inserting new record in subs_htable\n");
+					goto error;
+				}
 			}
+			else
+			{
+				if(insert_subs_db(subs, REMOTE_TYPE));
+			}
+			/* TODO if req_auth, the subscription was in the watcher table first, we must delete it */
 		}
 		/*otherwise there is a subscription outside a dialog with expires= 0 
 		 * no update in database, but should try to send Notify */
+		else
+		{
+			LM_DBG("subscription request with expiry=0 not in dialog\n");
+		}
 	}
 
 /* reply_and_notify  */
@@ -527,7 +693,7 @@ int handle_subscribe(struct sip_msg* msg, char* str1, char* str2)
 	reply_code= 500;
 	reply_str= pu_500_rpl;
 
-	if( parse_headers(msg,HDR_EOH_F, 0)==-1 )
+	if(parse_headers(msg,HDR_EOH_F, 0) == -1)
 	{
 		LM_ERR("parsing headers\n");
 		reply_code= 400;
@@ -602,7 +768,7 @@ int handle_subscribe(struct sip_msg* msg, char* str1, char* str2)
 			goto error;
 		}
 		reason= subs.reason;
-	}	
+	}
 
 	/* call event specific subscription handling */
 	if(event->evs_subs_handl)
@@ -653,7 +819,7 @@ int handle_subscribe(struct sip_msg* msg, char* str1, char* str2)
 					LM_ERR("in event specific function is_watcher_allowed\n");
 					goto error;
 				}
-				if(get_status_str(subs.status)== NULL)
+				if(get_status_str(subs.status) == NULL)
 				{
 					LM_ERR("wrong status= %d\n", subs.status);
 					goto error;
@@ -678,7 +844,7 @@ int handle_subscribe(struct sip_msg* msg, char* str1, char* str2)
 		LM_ERR("wrong status\n");
 		goto error;
 	}
-    LM_DBG("subscription status= %s - %s\n", get_status_str(subs.status), 
+	LM_DBG("subscription status= %s - %s\n", get_status_str(subs.status), 
             found==0?"inserted":"found in watcher table");
 	
 	if(update_subscription(msg, &subs, to_tag_gen, &sent_reply) <0)
@@ -1045,7 +1211,7 @@ int get_stored_info(struct sip_msg* msg, subs_t* subs, int* reply_code,
 		lock_release(&subs_htable[i].lock);
 	}
 
-	if(fallback2db)
+	if(dbmode == DB_FALLBACK)
 	{
 		return get_database_info(msg, subs, reply_code, reply_str);	
 	}
@@ -1326,7 +1492,7 @@ int handle_expired_subs(subs_t* s)
 void timer_db_update(unsigned int ticks,void *param)
 {	
 	int no_lock=0;
-
+	LM_DBG("db_update timer\n");
 	if(ticks== 0 && param == NULL)
 		no_lock= 1;
 	
@@ -1498,6 +1664,13 @@ void update_db_subs(db1_con_t *db,db_func_t dbf, shtable_t hash_table,
 		LM_ERR("null database connection\n");
 		return;
 	}
+	
+	/* if in dbonly mode, no update to database is required */
+	if(dbmode == DB_ONLY)
+	{
+		goto delete_expired_subs;
+	}
+	
 	for(i=0; i<htable_size; i++) 
 	{
 		if(!no_lock)
@@ -1602,6 +1775,7 @@ void update_db_subs(db1_con_t *db,db_func_t dbf, shtable_t hash_table,
 			lock_release(&hash_table[i].lock);	
 	}
 
+delete_expired_subs:
 	update_vals[0].val.int_val= (int)time(NULL) - expires_offset;
 	update_ops[0]= OP_LT;
 	if(dbf.delete(db, update_cols, update_ops, update_vals, 1) < 0)
@@ -1801,7 +1975,7 @@ int restore_db_subs(void)
 			s.sockinfo_str.s=(char*)row_vals[sockinfo_col].val.string_val;
 			s.sockinfo_str.len= strlen(s.sockinfo_str.s);
 
-			if(fallback2db!=0)
+			if(dbmode == DB_FALLBACK)
 				s.db_flag = NO_UPDATEDB_FLAG;
 			hash_code= core_hash(&s.pres_uri, &s.event->name, shtable_size);
 			if(insert_shtable(subs_htable, hash_code, &s)< 0)
@@ -1828,7 +2002,7 @@ int restore_db_subs(void)
 	pa_dbf.free_result(pa_db, result);
 
 	/* delete all records */
-	if(fallback2db==0 && pa_dbf.delete(pa_db, 0,0,0,0)< 0)
+	if(dbmode != DB_MEMORY_ONLY && pa_dbf.delete(pa_db, 0,0,0,0)< 0)
 	{
 		LM_ERR("deleting all records from database table\n");
 		return -1;

+ 13 - 0
modules_k/presence_conference/test_framework/presence_tests/clean_subscribers.sh

@@ -0,0 +1,13 @@
+#!/bin/bash
+killall -9 sipp &> /dev/null
+
+mysql_username="-u${MYSQL_USERNAME}"
+if [[ ${MYSQL_PASSWORD} ]]; then
+	mysql_pass="-p$2"
+	echo "da"
+else
+	mysql_pass=""
+fi
+
+mysql ${mysql_username} ${mysql_pass} ${MYSQL_DATABASE} -e 'truncate table watchers'
+mysql ${mysql_username} ${mysql_pass} ${MYSQL_DATABASE} -e 'truncate table active_watchers'

+ 12 - 0
modules_k/presence_conference/test_framework/presence_tests/config.sh

@@ -0,0 +1,12 @@
+#!/bin/bash
+
+export MYSQL_USERNAME="root"
+export MYSQL_PASSWORD=""
+export MYSQL_DATABASE="openser"
+export EXIT_FAILURE=2
+export SIPP_SCEN_DIR="../"
+export MI_HOST="127.0.0.1"
+export KAMAILIO_HOST="127.0.0.1"
+export SUBSCRIBE_WAIT_SECONDS=8
+export NGREP_LOG_FILE="ngrep.log"
+export SUBSCRIBERS_FILE="usernames.txt"

+ 58 - 0
modules_k/presence_conference/test_framework/presence_tests/presence_subscribe_suite.sh

@@ -0,0 +1,58 @@
+#!/bin/bash
+
+source config.sh
+
+# some basic checking for dependencies
+if [[ `whoami` != "root" ]]; then
+	echo "you must be root to run the suite..."
+	exit $EXIT_FAILURE;
+fi
+if [[ ! `which sipp` ]]; then
+	echo "you do not have sipp installed..."
+	exit $EXIT_FAILURE;
+fi
+
+if [[ ! `which ngrep` ]]; then
+	echo "you do not have ngrep installed..."
+	exit $EXIT_FAILURE;
+fi
+
+if [[ ! $1 ]]; then
+	echo "please choose the number of subscribers..."
+	exit $EXIT_FAILURE;
+else
+	subscribers_no=$1
+fi
+
+echo "starting suite..."
+
+# truncating log file
+cat /dev/null > ${NGREP_LOG_FILE}
+# killing sipp
+killall -9 sipp &> /dev/null
+# running ngrep
+ngrep -d any -W byline port 5060 &> ${NGREP_LOG_FILE} &
+
+./clean_subscribers.sh && ./send_subscribe.sh ${subscribers_no} && ./send_publish.sh
+
+sleep ${SUBSCRIBE_WAIT_SECONDS}
+
+# killing ngrep
+killall -9 ngrep &> /dev/null
+
+# get the number of notifies sent
+notify_response=`grep 'NOTIFY sip' ${NGREP_LOG_FILE} | wc -l`
+
+echo received ${notify_response} responses...
+
+let responses=${subscribers_no}*${subscribers_no}+${subscribers_no}
+if [[ $responses == ${notify_response} ]]; then
+	echo that look\'s ok...
+else
+	echo should have received $responses responses...
+fi
+
+# cleaning up the sipp stderr output file
+sed s/'Resolving remote host.*'// -i errors.txt
+sed s/'Done.$'// -i errors.txt
+sed '/^$/d' -i errors.txt

+ 29 - 0
modules_k/presence_conference/test_framework/presence_tests/send_publish.sh

@@ -0,0 +1,29 @@
+#!/bin/bash
+
+source ../functions/register.sh
+source ../functions/subscribe.sh
+source ../functions/publish.sh
+source ../functions/notify.sh
+source ../functions/rand.sh
+
+EXPECTED_RETURN=0
+event="presence"
+start_port=8020
+content_type="application\/pidf+xml"
+usernames_txt="${SUBSCRIBERS_FILE}"
+if [[ $1 ]]; then
+	usernames_txt=$1
+fi
+line=`cat $usernames_txt | wc -l`
+line_no=`head -1 /dev/urandom | od -N 1 | awk '{ print $2 }'`
+let line_no=$line_no%$line
+let line_no=$line_no+1
+username=`cat $usernames_txt | sed -n ${line_no}p`
+for i in `seq 1 $line`; do
+	let start_port=$start_port+1
+	ADDITIONAL_PARAMETERS="-p $start_port -timeout 20"
+	notify > /dev/null 2>> errors.txt
+done
+sleep 4
+publish $event $content_type $username ${KAMAILIO_HOST}
+wait

+ 47 - 0
modules_k/presence_conference/test_framework/presence_tests/send_subscribe.sh

@@ -0,0 +1,47 @@
+#!/bin/bash -e
+
+source ../functions/register.sh
+source ../functions/subscribe.sh
+source ../functions/publish.sh
+source ../functions/notify.sh
+source ../functions/rand.sh
+
+cat /dev/null > errors.txt
+cat /dev/null > ${SUBSCRIBERS_FILE}
+
+EXPECTED_RETURN=0
+start_port=8020
+event="presence"
+content_type="application\/pidf+xml"
+control_port=40400
+mi_port=20200
+
+for i in `seq 1 $1`
+do 
+	let start_port=$start_port+1
+	let mi_port=$mi_port+100
+	let control_port=$control_port+100
+	usernames[$i]=$i
+	#`rand_md5` is more professional
+	local_ports[$i]=$start_port
+	mi_ports[$i]=$mi_port
+	control_ports[$i]=$control_port
+done
+
+k=0
+for i in `seq 1 $1`
+do
+	echo ${usernames[$i]} >> ${SUBSCRIBERS_FILE}
+	for j in `seq 1 $1`
+	do
+		echo subscribing ${usernames[$j]} to ${usernames[$i]} on port ${local_ports[$j]}...
+		ADDITIONAL_PARAMETERS=" -mi ${MI_HOST} -mp ${mi_ports[$j]} -cp ${control_ports[$j]} -p ${local_ports[$j]}"
+		subscribe $event $content_type ${usernames[$i]} ${KAMAILIO_HOST} 3600 ${usernames[$j]} 2>> errors.txt > /dev/null &
+		pids[$k]=$!
+		let k=$k+1
+	done
+	wait
+done
+
+wait
+