瀏覽代碼

modules/ims_usrloc_pcscf: Added DB support for persistent storage
- currently supports Write-Through mode.

Jason Penton 11 年之前
父節點
當前提交
d812a8ef9e

+ 56 - 5
modules/ims_usrloc_pcscf/doc/ims_usrloc_pcscf_admin.xml

@@ -72,7 +72,7 @@
 
       <para>The number of entries of the hash table used by to store the
       contact records is 2^hash_size. For hash_size=4, the number of entries
-      of the hash table is 16. </para>
+      of the hash table is 16.</para>
 
       <para><emphasis> Default value is 9. </emphasis></para>
 
@@ -90,7 +90,7 @@ modparam("ims_usrloc_pcscf", "hash_size", 10)
       <title>timer_interval (int)</title>
 
       <para>Number of seconds between two timer runs. The module uses timer to
-      delete expired contacts. </para>
+      delete expired contacts.</para>
 
       <para><emphasis> Default value is 60. </emphasis></para>
 
@@ -100,6 +100,57 @@ modparam("ims_usrloc_pcscf", "hash_size", 10)
         <programlisting format="linespecific">...
 modparam("ims_usrloc_pcscf", "timer_interval", 30)
 ...
+</programlisting>
+      </example>
+    </section>
+
+    <section>
+      <title>db_url (int)</title>
+
+      <para>Database URL</para>
+
+      <para><emphasis>Default value is
+      &ldquo;mysql://kamailio:kamailiorw@localhost/kamailio&rdquor;.</emphasis></para>
+
+      <example>
+        <title>Set db_url parameter</title>
+
+        <programlisting format="linespecific">...
+modparam("ims_usrloc_pcscf", "db_url", "dbdriver://username:password@dbhost/dbname")
+...
+</programlisting>
+      </example>
+    </section>
+
+    <section>
+      <title>db_mode (int)</title>
+
+      <para>This is the database mode to be used for the PCSCF usrloc data
+      persistent storage. Currently this module supports the Write-Through
+      scheme only.</para>
+
+      <itemizedlist>
+        <listitem>
+          <para>0 - This disables DB mode. Only memory will be used for usrloc
+          and data will not survive a restart.</para>
+        </listitem>
+
+        <listitem>
+          <para>1 - Write-Through Scheme. All changes to usrloc are
+          immediately refelcted in the database. This is slow but very
+          reliable. This mode will ensure that no registration data is lost as
+          a result of a restart or crash.</para>
+        </listitem>
+      </itemizedlist>
+
+      <para><emphasis>Default value is 0.</emphasis></para>
+
+      <example>
+        <title>Set db_mode parameter</title>
+
+        <programlisting format="linespecific">...
+modparam("ims_usrloc_pcscf", "db_mode", 1)
+...
 </programlisting>
       </example>
     </section>
@@ -109,8 +160,8 @@ modparam("ims_usrloc_pcscf", "timer_interval", 30)
     <title>Functions</title>
 
     <section>
-      <para>There are no exported functions that could be used in scripts.
-      </para>
+      <para>There are no exported functions that could be used in
+      scripts.</para>
     </section>
   </section>
 
@@ -129,7 +180,7 @@ modparam("ims_usrloc_pcscf", "timer_interval", 30)
   <section>
     <title>Statistics</title>
 
-    <para>Exported statistics are listed in the next sections. </para>
+    <para>Exported statistics are listed in the next sections.</para>
 
     <section>
       <title>registered contacts</title>

+ 9 - 1
modules/ims_usrloc_pcscf/pcontact.c

@@ -55,6 +55,9 @@
 #include "ul_callback.h"
 #include "usrloc.h"
 #include "../../lib/ims/useful_defs.h"
+#include "usrloc_db.h"
+
+extern int db_mode;
 
 /*! retransmission detection interval in seconds */
 int cseq_delay = 20;
@@ -126,7 +129,7 @@ int new_pcontact(struct udomain* _d, str* _contact, struct pcontact_info* _ci, s
 
 	(*_c)->aor.s = (char*) shm_malloc(_contact->len);
 	if ((*_c)->aor.s == 0) {
-		LM_ERR("no more share memory\n");
+		LM_ERR("no more shared memory\n");
 		shm_free(*_c);
 		*_c = 0;
 		return -2;
@@ -238,6 +241,11 @@ static inline void nodb_timer(pcontact_t* _c)
 		if (exists_ulcb_type(PCSCF_CONTACT_EXPIRE)) {
 			run_ul_callbacks(PCSCF_CONTACT_EXPIRE, _c);
 		}
+
+		if (db_mode == WRITE_THROUGH && db_delete_pcontact(_c) != 0) {
+			LM_ERR("Error deleting ims_usrloc_pcscf record in DB");
+		}
+
 		update_stat(_c->slot->d->expired, 1);
 		mem_delete_pcontact(_c->slot->d, _c);
 		return;

+ 290 - 0
modules/ims_usrloc_pcscf/udomain.c

@@ -56,9 +56,12 @@
 #include "usrloc.h"
 #include "utime.h"
 #include "usrloc.h"
+#include "usrloc_db.h"
 
 #include "../../lib/ims/useful_defs.h"
 
+extern int db_mode;
+
 #ifdef STATISTICS
 static char *build_stat_name( str* domain, char *var_name)
 {
@@ -303,6 +306,7 @@ void unlock_ulslot(udomain_t* _d, int i)
 #endif
 }
 
+//TODO: this should be removed...
 int update_rx_regsession(struct udomain* _d, str* session_id, struct pcontact* _c) {
 	if (session_id->len > 0 && session_id->s) {
 		if (_c->rx_session_id.len > 0 && _c->rx_session_id.s) {
@@ -385,7 +389,28 @@ int update_pcontact(struct udomain* _d, struct pcontact_info* _ci, struct pconta
 	if (_ci->received_port > 0) _c->received_port = _ci->received_port;
 	if (_ci->received_proto > 0) _c->received_proto = _ci->received_proto;
 
+	//update Rx reg session information
+	if (_ci->rx_regsession_id && _ci->rx_regsession_id->len>0 && _ci->rx_regsession_id->s) {
+		if (_c->rx_session_id.len > 0 && _c->rx_session_id.s) {
+			_c->rx_session_id.len = 0;
+			shm_free(_c->rx_session_id.s);
+		}
+		_c->rx_session_id.s = shm_malloc(_ci->rx_regsession_id->len);
+		if (!_c->rx_session_id.s) {
+			LM_ERR("no more shm_mem\n");
+			return -1;
+		}
+		memcpy(_c->rx_session_id.s, _ci->rx_regsession_id->s, _ci->rx_regsession_id->len);
+		_c->rx_session_id.len = _ci->rx_regsession_id->len;
+	}
+
 	//TODO: update path, etc
+
+	if (db_mode == WRITE_THROUGH && db_update_pcontact(_c) != 0) {
+		LM_ERR("Error updating record in DB");
+		return -1;
+	}
+
 	run_ul_callbacks(PCSCF_CONTACT_UPDATE, _c);
 	return 0;
 
@@ -402,6 +427,12 @@ int insert_pcontact(struct udomain* _d, str* _contact, struct pcontact_info* _ci
     if (exists_ulcb_type(PCSCF_CONTACT_INSERT)) {
 		run_ul_create_callbacks(*_c);
 	}
+
+	if (db_mode == WRITE_THROUGH && db_insert_pcontact(*_c) != 0) {
+		LM_ERR("error inserting contact into db");
+		goto error;
+	}
+
     return 0;
 
 error:
@@ -516,10 +547,269 @@ int delete_pcontact(udomain_t* _d, str* _aor, struct pcontact* _c)
 			return 0;
 		}
 	}
+
 	if (exists_ulcb_type(PCSCF_CONTACT_DELETE)) {
 		run_ul_callbacks(PCSCF_CONTACT_DELETE, _c);
 	}
 	mem_delete_pcontact(_d, _c);
 
+	if (db_mode == WRITE_THROUGH && db_delete_pcontact(_c) != 0) {
+		LM_ERR("Error deleting contact from DB");
+		return -1;
+	}
+
+	return 0;
+}
+
+/*!
+ * \brief Convert database values into pcontact_info
+ *
+ * Convert database values into pcontact_info,
+ * expects 10 rows (aor, contact, received, rx_session_id_col
+ * reg_state, expires, socket, service_routes_col, public_ids, path
+ * \param vals database values
+ * \param contact contact
+ * \return pointer to the ucontact_info on success, 0 on failure
+ */
+static inline pcontact_info_t* dbrow2info( db_val_t *vals, str *contact)
+{
+	static pcontact_info_t ci;
+	static str received, path, rx_session_id, implicit_impus, tmpstr, service_routes;
+	static str *impu_list, *service_route_list;
+	int flag=0, n;
+	char *p, *q=0;
+
+	memset( &ci, 0, sizeof(pcontact_info_t));
+
+	received.s = (char*) VAL_STRING(vals + 2);
+	if (VAL_NULL(vals+2) || !received.s || !received.s[0]) {
+		received.len = 0;
+		received.s = 0;
+	} else {
+		received.len = strlen(received.s);
+	}
+	ci.received_host = received;
+
+	rx_session_id.s = (char*) VAL_STRING(vals + 3);
+	if (VAL_NULL(vals+3) || !rx_session_id.s || !rx_session_id.s[0]) {
+		rx_session_id.len = 0;
+		rx_session_id.s = 0;
+		LM_DBG("2\n");
+	} else {
+		rx_session_id.len = strlen(rx_session_id.s);
+	}
+	ci.rx_regsession_id = &rx_session_id;
+	if (VAL_NULL(vals + 4)) {
+		LM_CRIT("empty registration state in DB\n");
+		return 0;
+	}
+	ci.reg_state = VAL_INT(vals + 4);
+	if (VAL_NULL(vals + 5)) {
+		LM_CRIT("empty expire\n");
+		return 0;
+	}
+	ci.expires = VAL_TIME(vals + 5);
+	path.s  = (char*)VAL_STRING(vals+9);
+		if (VAL_NULL(vals+9) || !path.s || !path.s[0]) {
+			path.len = 0;
+			path.s = 0;
+		} else {
+			path.len = strlen(path.s);
+		}
+	ci.path = &path;
+
+	//public IDs - implicit set
+	implicit_impus.s = (char*) VAL_STRING(vals + 8);
+	if (!VAL_NULL(vals + 8) && implicit_impus.s && implicit_impus.s[0]) {
+		//how many
+		n=0;
+		p = implicit_impus.s;
+		while (*p) {
+			if ((*p) == '<') {
+				n++;
+			}
+			p++;
+		}
+		impu_list = pkg_malloc(sizeof(str) * n);
+
+		n=0;
+		p = implicit_impus.s;
+		while (*p) {
+			if (*p == '<') {
+				q = p + 1;
+				flag = 1;
+			}
+			if (*p == '>') {
+				if (flag) {
+					tmpstr.s = q;
+					tmpstr.len = p - q;
+					impu_list[n++] = tmpstr;
+				}
+				flag = 0;
+			}
+			p++;
+		}
+		ci.num_public_ids = n;
+		ci.public_ids = impu_list;
+	}
+
+	//service routes
+	service_routes.s = (char*) VAL_STRING(vals + 7);
+	if (!VAL_NULL(vals + 7) && service_routes.s && service_routes.s[0]) {
+		//how many
+		n = 0;
+		p = service_routes.s;
+		while (*p) {
+			if ((*p) == '<') {
+				n++;
+			}
+			p++;
+		}
+		service_route_list = pkg_malloc(sizeof(str) * n);
+
+		n = 0;
+		p = service_routes.s;
+		while (*p) {
+			if (*p == '<') {
+				q = p + 1;
+				flag = 1;
+			}
+			if (*p == '>') {
+				if (flag) {
+					tmpstr.s = q;
+					tmpstr.len = p - q;
+					service_route_list[n++] = tmpstr;
+				}
+				flag = 0;
+			}
+			p++;
+		}
+		ci.num_service_routes = n;
+		ci.service_routes = service_route_list;
+	}
+
+	return &ci;
+}
+
+/*!
+ * \brief Load all records from a udomain
+ *
+ * Load all records from a udomain, useful to populate the
+ * memory cache on startup.
+ * \param _c database connection
+ * \param _d loaded domain
+ * \return 0 on success, -1 on failure
+ */
+int preload_udomain(db1_con_t* _c, udomain_t* _d)
+{
+	pcontact_info_t *ci;
+	db_row_t *row;
+	db_key_t columns[18];
+	db1_res_t* res = NULL;
+	str aor, contact;
+	int i, n;
+
+	pcontact_t* c;
+
+	LM_DBG("pre-loading domain from DB\n");
+
+	columns[0] = &domain_col;
+	columns[1] = &aor_col;
+	columns[2] = &contact_col;
+	columns[3] = &received_col;
+	columns[4] = &rx_session_id_col;
+	columns[5] = &reg_state_col;
+	columns[6] = &expires_col;
+	columns[7] = &socket_col;
+	columns[8] = &service_routes_col;
+	columns[9] = &public_ids_col;
+	columns[10] = &path_col;
+
+	if (ul_dbf.use_table(_c, _d->name) < 0) {
+		LM_ERR("sql use_table failed\n");
+		return -1;
+	}
+
+#ifdef EXTRA_DEBUG
+	LM_NOTICE("load start time [%d]\n", (int)time(NULL));
+#endif
+
+	if (DB_CAPABILITY(ul_dbf, DB_CAP_FETCH)) {
+		if (ul_dbf.query(_c, 0, 0, 0, columns, 0, 11, 0, 0) < 0) {
+			LM_ERR("db_query (1) failed\n");
+			return -1;
+		}
+		if(ul_dbf.fetch_result(_c, &res, ul_fetch_rows)<0) {
+			LM_ERR("fetching rows failed\n");
+			return -1;
+		}
+	} else {
+		if (ul_dbf.query(_c, 0, 0, 0, columns, 0, 11, 0, &res) < 0) {
+			LM_ERR("db_query failed\n");
+			return -1;
+		}
+	}
+
+	if (RES_ROW_N(res) == 0) {
+		LM_DBG("table is empty\n");
+		ul_dbf.free_result(_c, res);
+		return 0;
+	}
+
+	LM_DBG("%d rows returned in preload\n", RES_ROW_N(res));
+
+	n = 0;
+	do {
+		LM_DBG("loading records - cycle [%d]\n", ++n);
+		for(i = 0; i < RES_ROW_N(res); i++) {
+			row = RES_ROWS(res) + i;
+
+			aor.s = (char*) VAL_STRING(ROW_VALUES(row) + 1);
+			if (VAL_NULL(ROW_VALUES(row) + 1) || aor.s == 0 || aor.s[0] == 0) {
+				LM_CRIT("empty aor record in table %s...skipping\n", _d->name->s);
+				continue;
+			}
+			aor.len = strlen(aor.s);
+
+			ci = dbrow2info( ROW_VALUES(row)+1, &contact);
+			if (ci==0) {
+				LM_ERR("usrloc record for %.*s in table %s\n",
+						aor.len, aor.s, _d->name->s);
+				continue;
+			}
+			lock_udomain(_d, &aor);
+
+			if ( (mem_insert_pcontact(_d, &aor, ci, &c)) != 0) {
+				LM_ERR("inserting contact failed\n");
+				unlock_udomain(_d, &aor);
+				goto error1;
+			}
+			unlock_udomain(_d, &aor);
+		}
+
+		if (DB_CAPABILITY(ul_dbf, DB_CAP_FETCH)) {
+			if(ul_dbf.fetch_result(_c, &res, ul_fetch_rows)<0) {
+				LM_ERR("fetching rows (1) failed\n");
+				ul_dbf.free_result(_c, res);
+				return -1;
+			}
+		} else {
+			break;
+		}
+	} while(RES_ROW_N(res)>0);
+
+	ul_dbf.free_result(_c, res);
+
+#ifdef EXTRA_DEBUG
+	LM_NOTICE("load end time [%d]\n", (int)time(NULL));
+#endif
+
 	return 0;
+error1:
+	free_pcontact(c);
+
+	ul_dbf.free_result(_c, res);
+	return -1;
 }
+
+

+ 2 - 0
modules/ims_usrloc_pcscf/udomain.h

@@ -81,4 +81,6 @@ int get_pcontact_by_src(udomain_t* _d, str * _host, unsigned short _port, unsign
 int assert_identity(udomain_t* _d, str * _host, unsigned short _port, unsigned short _proto, str * _identity);
 int delete_pcontact(udomain_t* _d, str* _aor, struct pcontact* _r);
 
+int preload_udomain(db1_con_t* _c, udomain_t* _d);
+
 #endif

+ 74 - 2
modules/ims_usrloc_pcscf/ul_mod.c

@@ -57,6 +57,7 @@
 #include "ul_rpc.h"
 #include "ul_callback.h"
 #include "usrloc.h"
+#include "usrloc_db.h"
 
 MODULE_VERSION
 
@@ -75,10 +76,16 @@ extern int ul_locks_no;
  * Module parameters and their default values
  */
 str usrloc_debug_file = str_init(DEFAULT_DBG_FILE);
-int timer_interval  = 60;				/*!< Timer interval in seconds */
 int usrloc_debug 	= 0;
 int ul_hash_size = 9;
 int init_flag = 0;
+str db_url          = str_init(DEFAULT_DB_URL);	/*!< Database URL */
+int timer_interval  = 60;						/*!< Timer interval in seconds */
+int db_mode         = 0;						/*!< Database sync scheme: 0-no db, 1-write through, 2-write back, 3-only db */
+int ul_fetch_rows = 2000;
+
+db1_con_t* ul_dbh = 0;
+db_func_t ul_dbf; 
 
 /*! \brief
  * Exported functions
@@ -96,6 +103,10 @@ static param_export_t params[] = {
 	{"timer_interval",    INT_PARAM, &timer_interval  },
 	{"usrloc_debug_file", STR_PARAM, &usrloc_debug_file.s},
 	{"enable_debug_file", INT_PARAM, &usrloc_debug},
+
+	{"db_url",              STR_PARAM, &db_url.s        },
+	{"timer_interval",      INT_PARAM, &timer_interval  },
+	{"db_mode",             INT_PARAM, &db_mode         },
 	{0, 0, 0}
 };
 
@@ -160,6 +171,9 @@ static int mod_init(void) {
 		return -1;
 	}
 
+
+	db_url.len = strlen(db_url.s);
+
 	/* Regsiter RPC */
 	if (rpc_register_array(ul_rpc) != 0) {
 		LM_ERR("failed to register RPC commands\n");
@@ -176,13 +190,65 @@ static int mod_init(void) {
 		return -1;
 	}
 
+	/* Shall we use database ? */
+	if (db_mode != NO_DB) { /* Yes */
+		if(ul_fetch_rows<=0) {
+			LM_ERR("invalid fetch_rows number '%d'\n", ul_fetch_rows);
+			return -1;
+		}
+
+		if (init_db(&db_url, timer_interval, ul_fetch_rows) != 0) {
+			LM_ERR("Error initializing db connection\n");
+			return -1;
+		}
+		LM_DBG("Running in DB mode %i\n", db_mode);
+	}
+
 	init_flag = 1;
 
 	return 0;
 }
 
-static int child_init(int rank)
+static int child_init(int _rank)
 {
+	dlist_t* ptr;
+
+	/* connecting to DB ? */
+	switch (db_mode) {
+		case NO_DB:
+			return 0;
+		case WRITE_THROUGH:
+			/* connect to db only from SIP workers, TIMER and MAIN processes */
+			if (_rank<=0 && _rank!=PROC_TIMER && _rank!=PROC_MAIN)
+				return 0;
+			break;
+		case WRITE_BACK:
+			/* connect to db only from TIMER (for flush), from MAIN (for
+			 * final flush() and from child 1 for preload */
+			if (_rank!=PROC_TIMER && _rank!=PROC_MAIN && _rank!=PROC_SIPINIT)
+				return 0;
+			break;
+	}
+
+	LM_DBG("Connecting to usrloc_pcscf DB for rank %d\n", _rank);
+	if (connect_db(&db_url) != 0) {
+		LM_ERR("child(%d): failed to connect to database\n", _rank);
+		return -1;
+	}
+	/* _rank==PROC_SIPINIT is used even when fork is disabled */
+	if (_rank==PROC_SIPINIT && db_mode!=DB_ONLY) {
+		// if cache is used, populate domains from DB
+		for( ptr=root ; ptr ; ptr=ptr->next) {
+			LM_DBG("Preloading domain %.*s\n", ptr->name.len, ptr->name.s);
+			if (preload_udomain(ul_dbh, ptr->d) < 0) {
+				LM_ERR("child(%d): failed to preload domain '%.*s'\n",
+						_rank, ptr->name.len, ZSW(ptr->name.s));
+				return -1;
+			}
+//			uldb_preload_attrs(ptr->d);
+		}
+	}
+
 	return 0;
 }
 
@@ -196,6 +262,12 @@ static void destroy(void)
 
 	/* free callbacks list */
 	destroy_ulcb_list();
+
+	free_service_route_buf();
+	free_impu_buf();
+
+	if (db_mode)
+		destroy_db();
 }
 
 

+ 2 - 2
modules/ims_usrloc_pcscf/ul_rpc.c

@@ -58,13 +58,13 @@ static const char* ul_rpc_dump_doc[2] = {
 static void ul_rpc_dump(rpc_t* rpc, void* ctx) {
 	dlist_t* dl;
 	udomain_t* dom;
-	time_t t;
+//	time_t t;
 	void* th;
 	void* ah;
 	void* sh;
 	int max, n, i;
 
-	t = time(0);
+//	t = time(0);
 	for (dl = root; dl; dl = dl->next) {
 		dom = dl->d;
 		if (rpc->add(ctx, "{", &th) < 0) {

+ 6 - 1
modules/ims_usrloc_pcscf/usrloc.h

@@ -54,6 +54,11 @@
 #include "../../modules/tm/dlg.h"
 #include "../cdp/diameter_ims_code_avp.h"
 
+#define NO_DB         0
+#define WRITE_THROUGH 1
+//#define WRITE_BACK    2		//not implemented yet
+//#define DB_ONLY		  3		//not implemented yet
+
 #define VALID_CONTACT(c, t)   ((c->expires>t) || (c->expires==0))
 
 struct hslot;		/*!< Hash table slot */
@@ -142,7 +147,7 @@ static inline char* reg_state_to_string(enum pcontact_reg_states reg_state) {
 			return "registration pending";
 		case PCONTACT_DEREGISTERED:
 			return "unregistered";
-                case PCONTACT_DEREG_PENDING_PUBLISH:
+		case PCONTACT_DEREG_PENDING_PUBLISH:
 			return "deregistration pending, publish sent";
 		case PCONTACT_REG_PENDING_AAR:
 			return "registration pending, aar sent";

+ 391 - 0
modules/ims_usrloc_pcscf/usrloc_db.c

@@ -0,0 +1,391 @@
+/*
+ * usrloc_db.c
+ *
+ *  Created on: Nov 11, 2013
+ *      Author: carlos
+ */
+
+#include "../../lib/srdb1/db.h"
+#include "usrloc.h"
+#include "usrloc_db.h"
+
+str id_col	        	= str_init(ID_COL);
+str domain_col			= str_init(DOMAIN_COL);
+str aor_col		    	= str_init(AOR_COL);
+str contact_col		    = str_init(CONTACT_COL);
+str received_col    	= str_init(RECEIVED_COL);
+str received_port_col	= str_init(RECEIVED_PORT_COL);
+str received_proto_col	= str_init(RECEIVED_PROTO_COL);
+str path_col			= str_init(PATH_COL);
+str rx_session_id_col	= str_init(RX_SESSION_ID_COL);
+str reg_state_col		= str_init(REG_STATE_COL);
+str expires_col			= str_init(EXPIRES_COL);
+str service_routes_col	= str_init(SERVICE_ROUTES_COL);
+str socket_col			= str_init(SOCKET_COL);
+str public_ids_col		= str_init(PUBLIC_IDS_COL);
+
+t_reusable_buffer service_route_buffer = {0,0,0};
+t_reusable_buffer impu_buffer = {0,0,0};
+
+int connect_db(const str *db_url)
+{
+	if (ul_dbh) {	/* we've obviously already connected... */
+		LM_WARN("DB connection already open... continuing\n");
+		return 0;
+	}
+
+	if ((ul_dbh = ul_dbf.init(db_url)) == 0)
+		return -1;
+
+	LM_DBG("Successfully connected to DB and returned DB handle ptr %p\n", ul_dbh);
+	return 0;
+}
+
+int init_db(const str *db_url, int db_update_period, int fetch_num_rows)
+{
+	/* Find a database module */
+	if (db_bind_mod(db_url, &ul_dbf) < 0){
+		LM_ERR("Unable to bind to a database driver\n");
+		return -1;
+	}
+
+	if (connect_db(db_url)!=0){
+		LM_ERR("unable to connect to the database\n");
+		return -1;
+	}
+
+	if (!DB_CAPABILITY(ul_dbf, DB_CAP_ALL)) {
+		LM_ERR("database module does not implement all functions needed by the module\n");
+		return -1;
+	}
+
+	ul_dbf.close(ul_dbh);
+	ul_dbh = 0;
+
+	return 0;
+}
+
+void destroy_db()
+{
+	/* close the DB connection */
+	if (ul_dbh) {
+		ul_dbf.close(ul_dbh);
+		ul_dbh = 0;
+	}
+}
+
+int use_location_pcscf_table(str* domain)
+{
+	if(!ul_dbh){
+		LM_ERR("invalid database handle\n");
+		return -1;
+	}
+
+	if (ul_dbf.use_table(ul_dbh, domain) < 0) {
+		LM_ERR("Error in use_table\n");
+		return -1;
+	}
+
+	return 0;
+}
+
+int db_update_pcontact(pcontact_t* _c)
+{
+	str impus, service_routes;
+
+	db_val_t match_values[1];
+	db_key_t match_keys[1] = { &aor_col };
+	db_key_t update_keys[8] = { &expires_col, &reg_state_col,
+								&service_routes_col, &received_col,
+								&received_port_col, &received_proto_col,
+								&rx_session_id_col, &public_ids_col };
+	db_val_t values[8];
+
+	LM_DBG("updating pcontact: %.*s\n", _c->aor.len, _c->aor.s);
+
+	VAL_TYPE(match_values) = DB1_STR;
+
+	VAL_NULL(match_values) = 0;
+	VAL_STR(match_values) = _c->aor;
+
+	if (use_location_pcscf_table(_c->domain) < 0) {
+		LM_ERR("Error trying to use table %.*s\n", _c->domain->len, _c->domain->s);
+		return -1;
+	}
+
+	VAL_TYPE(values) 	= DB1_DATETIME;
+	VAL_TIME(values)	= _c->expires;
+	VAL_NULL(values) 	= 0;
+
+	VAL_TYPE(values + 1)= DB1_INT;
+	VAL_NULL(values + 1)= 0;
+	VAL_INT(values + 1)	= _c->reg_state;
+
+	str empty_str = str_init("");
+	if (_c->service_routes) {
+		service_routes.len = service_routes_as_string(_c, &service_route_buffer);
+		service_routes.s = service_route_buffer.buf;
+	}
+	SET_STR_VALUE(values + 2, (_c->service_routes)?service_routes:empty_str);
+	VAL_TYPE(values + 2) = DB1_STR;
+	VAL_NULL(values + 2) = 0;
+
+	SET_STR_VALUE(values + 3, _c->received_host);
+	VAL_TYPE(values + 3) = DB1_STR;
+	VAL_NULL(values + 3) = 0;
+
+	VAL_TYPE(values + 4)= DB1_INT;
+	VAL_NULL(values + 4)= 0;
+	VAL_INT(values + 4)	= _c->received_port;
+
+	VAL_TYPE(values + 5)= DB1_INT;
+	VAL_NULL(values + 5)= 0;
+	VAL_INT(values + 5)	= _c->received_proto;
+
+	VAL_TYPE(values + 6) = DB1_STR;
+	SET_PROPER_NULL_FLAG(_c->rx_session_id, values, 6);
+	LM_DBG("Trying to set rx session id: %.*s\n", _c->rx_session_id.len, _c->rx_session_id.s);
+	SET_STR_VALUE(values + 6, _c->rx_session_id);
+
+	/* add the public identities */
+	impus.len = impus_as_string(_c, &impu_buffer);
+	impus.s = impu_buffer.buf;
+	VAL_TYPE(values + 7) = DB1_STR;
+	SET_PROPER_NULL_FLAG(impus, values, 7);
+	SET_STR_VALUE(values + 7, impus);
+
+	if((ul_dbf.update(ul_dbh, match_keys, NULL, match_values, update_keys,values, 1, 8)) !=0){
+		LM_ERR("could not update database info\n");
+	    return -1;
+	}
+
+	if (ul_dbf.affected_rows && ul_dbf.affected_rows(ul_dbh) == 0) {
+		LM_DBG("no existing rows for an update... doing insert\n");
+		if (db_insert_pcontact(_c) != 0) {
+			LM_ERR("Failed to insert a pcontact on update\n");
+		}
+	}
+
+	return 0;
+}
+
+int db_delete_pcontact(pcontact_t* _c)
+{
+	LM_DBG("Trying to delete contact: %.*s\n", _c->aor.len, _c->aor.s);
+	db_val_t values[1];
+	db_key_t match_keys[1] = { &aor_col};
+
+	VAL_TYPE(values) = DB1_STR;
+	VAL_NULL(values) = 0;
+	SET_STR_VALUE(values, _c->aor);
+
+	if (use_location_pcscf_table(_c->domain) < 0) {
+		LM_ERR("Error trying to use table %.*s\n", _c->domain->len, _c->domain->s);
+		return -1;
+	}
+
+    if(ul_dbf.delete(ul_dbh, match_keys, 0, values, 1) < 0) {
+    	LM_ERR("Failed to delete database information: aor[%.*s], rx_session_id=[%.*s]\n",
+    													_c->aor.len, _c->aor.s,
+    													_c->rx_session_id.len, _c->rx_session_id.s);
+        return -1;
+    }
+
+    return 0;
+}
+
+int db_insert_pcontact(struct pcontact* _c)
+{
+	str empty_str = str_init("");
+	str impus, service_routes;
+
+	db_key_t keys[13] = {
+							&domain_col,
+				&aor_col, 			&contact_col,
+				&received_col,
+				&received_port_col,	&received_proto_col,
+				&path_col,			&rx_session_id_col,
+				&reg_state_col,
+				&expires_col,		&service_routes_col,
+				&socket_col,		&public_ids_col
+	};
+	db_val_t values[13];
+
+	VAL_TYPE(GET_FIELD_IDX(values, LP_DOMAIN_IDX)) = DB1_STR;
+	VAL_TYPE(GET_FIELD_IDX(values, LP_AOR_IDX)) = DB1_STR;
+	VAL_TYPE(GET_FIELD_IDX(values, LP_CONTACT_IDX)) = DB1_STR;
+	VAL_TYPE(GET_FIELD_IDX(values, LP_RECEIVED_IDX)) = DB1_STR;
+	VAL_TYPE(GET_FIELD_IDX(values, LP_RECEIVED_PORT_IDX)) = DB1_INT;
+	VAL_TYPE(GET_FIELD_IDX(values, LP_RECEIVED_PROTO_IDX)) = DB1_INT;
+	VAL_TYPE(GET_FIELD_IDX(values, LP_PATH_IDX)) = DB1_STR;
+	VAL_TYPE(GET_FIELD_IDX(values, LP_RX_SESSION_ID_IDX)) = DB1_STR;
+	VAL_TYPE(GET_FIELD_IDX(values, LP_REG_STATE_IDX)) = DB1_INT;
+	VAL_TYPE(GET_FIELD_IDX(values, LP_EXPIRES_IDX)) = DB1_DATETIME;
+	VAL_TYPE(GET_FIELD_IDX(values, LP_SERVICE_ROUTES_IDX)) = DB1_STR;
+	VAL_TYPE(GET_FIELD_IDX(values, LP_SOCKET_IDX)) = DB1_STR;
+	VAL_TYPE(GET_FIELD_IDX(values, LP_PUBLIC_IPS_IDX)) = DB1_STR;
+
+	SET_STR_VALUE(GET_FIELD_IDX(values, LP_DOMAIN_IDX), (*_c->domain));
+	SET_STR_VALUE(GET_FIELD_IDX(values, LP_AOR_IDX), _c->aor);	//TODO: need to clean AOR
+	SET_STR_VALUE(GET_FIELD_IDX(values, LP_CONTACT_IDX), _c->aor);
+	SET_STR_VALUE(GET_FIELD_IDX(values, LP_RECEIVED_IDX), _c->received_host);
+
+	SET_PROPER_NULL_FLAG((*_c->domain), values, LP_DOMAIN_IDX);
+	SET_PROPER_NULL_FLAG(_c->aor, values, LP_AOR_IDX);
+	SET_PROPER_NULL_FLAG(_c->received_host, values, LP_RECEIVED_IDX);
+
+	VAL_INT(GET_FIELD_IDX(values, LP_RECEIVED_PORT_IDX)) = _c->received_port;
+	VAL_INT(GET_FIELD_IDX(values, LP_RECEIVED_PROTO_IDX)) = _c->received_proto;
+
+	SET_STR_VALUE(GET_FIELD_IDX(values, LP_PATH_IDX), _c->path);
+	SET_STR_VALUE(GET_FIELD_IDX(values, LP_RX_SESSION_ID_IDX), _c->rx_session_id);
+
+	SET_PROPER_NULL_FLAG(_c->path, values, LP_PATH_IDX);
+	SET_PROPER_NULL_FLAG(_c->rx_session_id, values, LP_RX_SESSION_ID_IDX);
+
+	VAL_DOUBLE(GET_FIELD_IDX(values, LP_REG_STATE_IDX)) = _c->reg_state;
+	VAL_TIME(GET_FIELD_IDX(values, LP_EXPIRES_IDX)) = _c->expires;
+
+	SET_STR_VALUE(GET_FIELD_IDX(values, LP_SERVICE_ROUTES_IDX), _c->service_routes?(*_c->service_routes):empty_str);
+	VAL_NULL(GET_FIELD_IDX(values, LP_SERVICE_ROUTES_IDX)) = 1;
+	SET_STR_VALUE(GET_FIELD_IDX(values, LP_SOCKET_IDX), _c->sock?_c->sock->sock_str:empty_str);
+	VAL_NULL(GET_FIELD_IDX(values, LP_SOCKET_IDX)) = 1;
+
+	if (_c->service_routes) {
+		SET_PROPER_NULL_FLAG((*_c->service_routes), values, LP_SERVICE_ROUTES_IDX);
+	}
+	else {
+		VAL_NULL(GET_FIELD_IDX(values, LP_SERVICE_ROUTES_IDX)) = 1;
+	}
+
+	if (_c->sock) {
+		SET_PROPER_NULL_FLAG(_c->sock->sock_str, values, LP_SOCKET_IDX);
+	} else {
+		VAL_NULL(GET_FIELD_IDX(values, LP_SOCKET_IDX)) = 1;
+	}
+
+	/* add the public identities */
+	impus.len = impus_as_string(_c, &impu_buffer);
+	impus.s = impu_buffer.buf;
+	SET_PROPER_NULL_FLAG(impus, values, LP_PUBLIC_IPS_IDX);
+	SET_STR_VALUE(GET_FIELD_IDX(values, LP_PUBLIC_IPS_IDX), impus);
+
+	/* add service routes */
+	service_routes.len = service_routes_as_string(_c, &service_route_buffer);
+	service_routes.s = service_route_buffer.buf;
+	SET_PROPER_NULL_FLAG(service_routes, values, LP_PUBLIC_IPS_IDX);
+	SET_STR_VALUE(GET_FIELD_IDX(values, LP_PUBLIC_IPS_IDX), service_routes);
+
+	if (use_location_pcscf_table(_c->domain) < 0) {
+		LM_ERR("Error trying to use table %.*s\n", _c->domain->len, _c->domain->s);
+		return -1;
+	}
+
+	if (ul_dbf.insert(ul_dbh, keys, values, 13) < 0) {
+		LM_ERR("inserting contact in db failed\n");
+		return -1;
+	}
+
+	return 0;
+}
+
+/* take a contact structure and a pointer to some memory and returns a list of public identities in the format
+ * <impu1><impu2>....<impu(n)>
+ * make sure p already has memory allocated
+ * returns the length of the string (list)
+ * the string list itself will be available in p
+ */
+int impus_as_string(struct pcontact* _c, t_reusable_buffer* buffer) {
+	ppublic_t* impu;
+	int len = 0;
+	char *p;
+
+	impu = _c->head;
+	while (impu) {
+		len += 2 + impu->public_identity.len;
+		impu = impu->next;
+	}
+
+	if (!buffer->buf || buffer->buf_len == 0 || len > buffer->buf_len) {
+		if (buffer->buf) {
+			pkg_free(buffer->buf);
+		}
+		buffer->buf = (char*) pkg_malloc(len);
+		if (!buffer->buf) {
+			LM_CRIT("unable to allocate pkg memory\n");
+			return 0;
+		}
+		buffer->buf_len = len;
+	}
+
+	impu = _c->head;
+	p = buffer->buf;
+	while (impu) {
+		*p++ = '<';
+		memcpy(p, impu->public_identity.s, impu->public_identity.len);
+		p += impu->public_identity.len;
+		*p++ = '>';
+		impu = impu->next;
+	}
+
+	return len;
+}
+
+/* take a contact structure and a pointer to some memory and returns a list of public identities in the format
+ * <impu1><impu2>....<impu(n)>
+ * make sure p already has memory allocated
+ * returns the length of the string (list)
+ * the string list itself will be available in p
+ */
+int service_routes_as_string(struct pcontact* _c, t_reusable_buffer *buffer) {
+	int i;
+	int len = 0;
+	char *p;
+	for (i=0; i<_c->num_service_routes; i++) {
+		len += 2 + _c->service_routes[i].len;
+	}
+
+	if (!buffer->buf || buffer->buf_len==0 || len > buffer->buf_len) {
+		if (buffer->buf) {
+			pkg_free(buffer->buf);
+		}
+		buffer->buf = (char*)pkg_malloc(len);
+		if (!buffer->buf) {
+			LM_CRIT("unable to allocate pkg memory\n");
+			return 0;
+		}
+		buffer->buf_len = len;
+	}
+
+	p = buffer->buf;
+	for (i=0; i<_c->num_service_routes; i++) {
+		*p = '<';
+		p++;
+		memcpy(p, _c->service_routes[i].s, _c->service_routes[i].len);
+		p+=_c->service_routes[i].len;
+		*p = '>';
+		p++;
+	}
+
+	return len;
+}
+
+void free_service_route_buf(void)
+{
+	if (service_route_buffer.buf) {
+		pkg_free(service_route_buffer.buf);
+		service_route_buffer.data_len = 0;
+		service_route_buffer.buf_len = 0;
+		service_route_buffer.buf = 0;
+	}
+}
+
+void free_impu_buf(void) {
+	if (impu_buffer.buf) {
+		pkg_free(impu_buffer.buf);
+		impu_buffer.data_len = 0;
+		impu_buffer.buf_len = 0;
+		impu_buffer.buf = 0;
+	}
+}
+

+ 106 - 0
modules/ims_usrloc_pcscf/usrloc_db.h

@@ -0,0 +1,106 @@
+/*
+ * usrloc_db.h
+ *
+ *  Created on: Nov 11, 2013
+ *      Author: carlos
+ */
+
+#ifndef USRLOC_DB_H_
+#define USRLOC_DB_H_
+
+typedef enum location_pcscf_fields_idx {
+//	LP_ID_IDX = 0,
+	LP_DOMAIN_IDX = 0,
+	LP_AOR_IDX,
+	LP_CONTACT_IDX,
+	LP_RECEIVED_IDX,
+	LP_RECEIVED_PORT_IDX,
+	LP_RECEIVED_PROTO_IDX,
+	LP_PATH_IDX,
+	LP_RX_SESSION_ID_IDX,
+	LP_REG_STATE_IDX,
+	LP_EXPIRES_IDX,
+	LP_SERVICE_ROUTES_IDX,
+	LP_SOCKET_IDX,
+	LP_PUBLIC_IPS_IDX,
+
+} location_pcscf_fields_idx_t;
+
+#define GET_FIELD_IDX(_val, _idx)\
+						(_val + _idx)
+
+#define SET_PROPER_NULL_FLAG(_str, _vals, _index)\
+	do{\
+		if( (_str).len == 0)\
+			VAL_NULL( (_vals)+(_index) ) = 1;\
+		else\
+			VAL_NULL( (_vals)+(_index) ) = 0;\
+	}while(0);
+
+#define SET_STR_VALUE(_val, _str)\
+	do{\
+			VAL_STR((_val)).s 		= (_str).s;\
+			VAL_STR((_val)).len 	= (_str).len;\
+	}while(0);
+
+#define SET_NULL_FLAG(_vals, _i, _max, _flag)\
+	do{\
+		for((_i) = 0;(_i)<(_max); (_i)++)\
+			VAL_NULL((_vals)+(_i)) = (_flag);\
+	}while(0);
+
+
+#define ID_COL				"id"
+#define DOMAIN_COL			"domain"
+#define AOR_COL				"aor"
+#define CONTACT_COL			"contact"
+#define RECEIVED_COL		"received"
+#define RECEIVED_PORT_COL	"received_port"
+#define RECEIVED_PROTO_COL	"received_proto"
+#define PATH_COL			"path"
+#define RX_SESSION_ID_COL	"rx_session_id"
+#define REG_STATE_COL		"reg_state"
+#define EXPIRES_COL			"expires"
+#define SERVICE_ROUTES_COL	"service_routes"
+#define SOCKET_COL			"socket"
+#define PUBLIC_IDS_COL		"public_ids"
+
+extern db1_con_t* ul_dbh;
+extern db_func_t ul_dbf;
+
+extern str id_col;
+extern str domain_col;
+extern str aor_col;
+extern str contact_col;
+extern str received_col;
+extern str received_port_col;
+extern str received_proto_col;
+extern str path_col;
+extern str rx_session_id_col;
+extern str reg_state_col;
+extern str expires_col;
+extern str service_routes_col;
+extern str socket_col;
+extern str public_ids_col;
+
+typedef struct reusable_buffer{
+	char* buf;
+	int buf_len;
+	int data_len;
+} t_reusable_buffer;
+
+int use_location_pcscf_table();
+void destroy_db();
+int init_db(const str *db_url, int db_update_period, int fetch_num_rows);
+int connect_db(const str *db_url);
+
+int impus_as_string(struct pcontact* _c, t_reusable_buffer* buffer);
+int service_routes_as_string(struct pcontact* _c, t_reusable_buffer *buffer);
+void free_service_route_buf(void);
+void free_impu_buf(void);
+
+int db_insert_pcontact(pcontact_t* _c);
+int db_delete_pcontact(pcontact_t* _c);
+int db_update_pcontact(pcontact_t* _c);
+
+#endif /* USRLOC_DB_H_ */