Преглед на файлове

modules_k/pua: Fixed race hazards relating to RLS back-end SUBSCRIBEs

- These resulted in the "no presence dialog record for non-TERMINATED state..."
  error message coming out of RLS a lot.
- When in DB only mode if we receive a back-end NOTIFY we look for a matching
  dialog.  If we don't find one we search the DB again for a matching temporary
  dialog.  You can get the situation where both DB queries fail because a 200
  OK to a SUBSCRIBE is processed and the dialog made "complete" between the
  two searches.  This is now fixed.
- On the sending side (for both hash table and DB only mode) you can have
  two dialogs (one temporary and one full) stored for a short period of time.
  This is because the full dialog is written before the temporary one is
  deleted.  This can cause the lookup performed when a back-end NOTIFY is
  received to fail, because only one record is expected.  This is now fixed -
  instead of inserting and then deleting we do a swap (while the hash table is
  locked) and an update operation on the DB.
pd преди 13 години
родител
ревизия
e627bc3177
променени са 5 файла, в които са добавени 226 реда и са изтрити 69 реда
  1. 32 16
      modules_k/pua/hash.c
  2. 1 0
      modules_k/pua/hash.h
  3. 175 25
      modules_k/pua/pua_db.c
  4. 1 0
      modules_k/pua/pua_db.h
  5. 17 28
      modules_k/pua/send_subscribe.c

+ 32 - 16
modules_k/pua/hash.c

@@ -235,10 +235,9 @@ void update_htable(ua_pres_t* p, time_t desired_expires, int expires,
 	}
 }
 /* insert in front; so when searching the most recent result is returned*/
-void insert_htable(ua_pres_t* presentity)
+void _insert_htable(ua_pres_t* presentity, unsigned int hash_code)
 {
 	ua_pres_t* p= NULL;
-	unsigned int hash_code;
 
 	if (dbmode==PUA_DB_ONLY)
 	{
@@ -246,28 +245,24 @@ void insert_htable(ua_pres_t* presentity)
 		return;
 	}
 
-	hash_code= core_hash(presentity->pres_uri,presentity->watcher_uri, 
-			HASH_SIZE);
-	
-	lock_get(&HashT->p_records[hash_code].lock);
-
-/*	
- *	useless since always checking before calling insert
-	if(get_dialog(presentity, hash_code)!= NULL )
-	{
-		LM_DBG("Dialog already found- do not insert\n");
-		return; 
-	}
-*/	
 	p= HashT->p_records[hash_code].entity;
 
 	presentity->db_flag= INSERTDB_FLAG;
 	presentity->next= p->next;
 	
 	p->next= presentity;
+}
 
-	lock_release(&HashT->p_records[hash_code].lock);
+/* Insert presentity at the front of its hash slot, taking the slot lock
+ * around the insertion.  Code that already holds the slot lock calls
+ * _insert_htable() directly instead.
+ *
+ * hash_code is unsigned to match the parameter type of _insert_htable()
+ * and its use as an index into HashT->p_records. */
+void insert_htable(ua_pres_t* presentity)
+{
+	unsigned int hash_code;
 
+	hash_code= core_hash(presentity->pres_uri,presentity->watcher_uri, HASH_SIZE);
+	lock_get(&HashT->p_records[hash_code].lock);
+
+	_insert_htable(presentity, hash_code);
+
+	lock_release(&HashT->p_records[hash_code].lock);
 }
 
 /* This function used to perform a search to find the hash table
@@ -340,6 +335,27 @@ void destroy_htable(void)
   return;
 }
 
+/* Swap the temporary dialog matching `dialog` for the full dialog.
+ *
+ * The hash slot for (pres_uri, watcher_uri) stays locked across the
+ * delete-then-insert sequence, so code that takes the same lock never
+ * observes both records, or neither, for this dialog.
+ *
+ * Returns 1 on success.  Returns -1 if no temporary dialog is found; in
+ * that case `dialog` is NOT inserted and is still owned by the caller.
+ */
+int convert_temporary_dialog(ua_pres_t *dialog)
+{
+	ua_pres_t *temp_dialog;
+	unsigned int hash_code;
+
+	hash_code= core_hash(dialog->pres_uri,dialog->watcher_uri, HASH_SIZE);
+	lock_get(&HashT->p_records[hash_code].lock);
+
+	temp_dialog = get_temporary_dialog(dialog, hash_code);
+	if (temp_dialog == NULL)
+	{
+		/* Release the slot lock before bailing out: returning with it
+		 * held would block every later access to this hash slot. */
+		lock_release(&HashT->p_records[hash_code].lock);
+		return -1;
+	}
+
+	delete_htable(temp_dialog, hash_code);
+	_insert_htable(dialog, hash_code);
+
+	lock_release(&HashT->p_records[hash_code].lock);
+
+	return 1;
+}
+
 /* must lock the record line before calling this function*/
 ua_pres_t* get_dialog(ua_pres_t* dialog, unsigned int hash_code)
 {

+ 1 - 0
modules_k/pua/hash.h

@@ -126,6 +126,7 @@ int is_dialog(ua_pres_t* dialog);
 
 ua_pres_t* get_dialog(ua_pres_t* dialog, unsigned int hash_code);
 ua_pres_t* get_temporary_dialog(ua_pres_t* dialog, unsigned int hash_code);
+int convert_temporary_dialog(ua_pres_t *dialog);
 
 int get_record_id(ua_pres_t* dialog, str** rec_id);
 typedef int (*get_record_id_t)(ua_pres_t* dialog, str** rec_id);

+ 175 - 25
modules_k/pua/pua_db.c

@@ -781,10 +781,11 @@ int get_record_id_puadb(ua_pres_t *pres, str **rec_id )
 	db_op_t  q_ops[20];
 	int n_query_cols=0, n_res_cols=0;
 	int puri_col,callid_col;
-	int watcher_col, totag_col, fromtag_col;
+	int watcher_col, totag_col, fromtag_col, to_tag_res_col;
 	int pres_id_col;	
 	db_val_t *values;	
-	str *id;	
+	str *id;
+	str to_tag;
 
 	if (pres==NULL)
 	{
@@ -824,6 +825,14 @@ int get_record_id_puadb(ua_pres_t *pres, str **rec_id )
 	q_ops[callid_col] = OP_EQ;
 	n_query_cols++;
 
+	q_cols[fromtag_col= n_query_cols] = &str_from_tag_col;	
+	q_vals[fromtag_col].type = DB1_STR;
+	q_vals[fromtag_col].nul = 0;
+	q_vals[fromtag_col].val.str_val.s = pres->from_tag.s;
+	q_vals[fromtag_col].val.str_val.len = pres->from_tag.len;
+	q_ops[fromtag_col] = OP_EQ;
+	n_query_cols++;
+
 	q_cols[totag_col= n_query_cols] = &str_to_tag_col;	
 	q_vals[totag_col].type = DB1_STR;
 	q_vals[totag_col].nul = 0;
@@ -832,13 +841,6 @@ int get_record_id_puadb(ua_pres_t *pres, str **rec_id )
 	q_ops[totag_col] = OP_EQ;
 	n_query_cols++;
 
-	q_cols[fromtag_col= n_query_cols] = &str_from_tag_col;	
-	q_vals[fromtag_col].type = DB1_STR;
-	q_vals[fromtag_col].nul = 0;
-	q_vals[fromtag_col].val.str_val.s = pres->from_tag.s;
-	q_vals[fromtag_col].val.str_val.len = pres->from_tag.len;
-	q_ops[fromtag_col] = OP_EQ;
-	n_query_cols++;
 
 	res_cols[n_res_cols] = &str_id_col;	
 	n_res_cols++;
@@ -863,15 +865,22 @@ int get_record_id_puadb(ua_pres_t *pres, str **rec_id )
 
 	nr_rows = RES_ROW_N(res);
 
-	if (nr_rows == 0)
+	switch (nr_rows)
 	{
+	case 1:
+		rows = RES_ROWS(res);
+		values = ROW_VALUES(rows);
+		break;
+
+	case 0:
 		/* no match */
 		LM_DBG("No rows found. Looking for temporary dialog\n");
 		pua_dbf.free_result(pua_db, res);
 
-		q_vals[totag_col].val.str_val.s = "";
-		q_vals[totag_col].val.str_val.len = 0;
+		n_query_cols--;
 
+		res_cols[to_tag_res_col= n_res_cols] = &str_to_tag_col;
+		n_res_cols++;
 
 		if(pua_dbf.query(pua_db, q_cols, q_ops, q_vals,
 			res_cols,n_query_cols,n_res_cols,0,&res) < 0)
@@ -882,30 +891,41 @@ int get_record_id_puadb(ua_pres_t *pres, str **rec_id )
 
 		nr_rows = RES_ROW_N(res);
 
-		if (nr_rows == 0)
+		if (nr_rows == 1)
+		{
+			rows = RES_ROWS(res);
+			values = ROW_VALUES(rows);
+
+			to_tag.s = (char *) VAL_STRING(values + 2);
+			to_tag.len = strlen(to_tag.s);
+
+			if (to_tag.len == 0 ||
+				(to_tag.len > 0
+				 && strncmp(to_tag.s, pres->to_tag.s, pres->to_tag.len) == 0))
+			{
+				LM_DBG( "Found a (possibly temporary) Dialog\n" );
+				break;
+			}
+			else
+				LM_WARN("Failed to find temporary dialog for To-tag: %.*s, found To-tag: %.*s\n",
+					pres->to_tag.len, pres->to_tag.s, to_tag.len, to_tag.s);
+		}
+
+		if (nr_rows <= 1)
 		{
-			LM_DBG( "Temporary Dialog Not found\n" );
+			LM_DBG("Dialog not found\n" );
 			pua_dbf.free_result(pua_db, res);
 			return(0);
 		}
 
-		LM_DBG( "Found a temporary Dialog\n" );
-	}
+		/* Fall-thru */
 
-	if (nr_rows != 1)
-	{
+	default:
 		LM_ERR("Too many rows found (%d)\n", nr_rows);
 		pua_dbf.free_result(pua_db, res);
 		return(-1);
 	}
 
-
-	/* get the results and fill in return data structure */
-	rows = RES_ROWS(res);
-	values = ROW_VALUES(rows);
-
-	/* LM_ERR("db_id= %d\n", VAL_INT(values) ); */
-
 	id= (str*)pkg_malloc(sizeof(str));
 
 	if(id== NULL)
@@ -936,7 +956,137 @@ int get_record_id_puadb(ua_pres_t *pres, str **rec_id )
 }
 
 /******************************************************************************/
+int convert_temporary_dialog_puadb(ua_pres_t *pres) /* Convert the temporary
+ * dialog row matching pres into a full dialog with a single DB update.
+ * Returns 1 on success; pres and pres->remote_contact.s are then freed
+ * here, so the caller must not touch pres afterwards.
+ * Returns -1 on a NULL argument or a failed update; pres is not freed. */
+{
+	db_key_t query_cols[5], data_cols[10]; /* sized exactly for the 5 match and 10 data columns below */
+	db_val_t query_vals[5], data_vals[10];
+	int n_query_cols = 0, n_data_cols = 0;
+
+	if (pres==NULL)
+	{
+		LM_ERR("called with NULL param\n");
+		return(-1);
+	}
+
+	/* The columns I need to query to find the temporary dialog */
+        query_cols[n_query_cols] = &str_pres_id_col;
+        query_vals[n_query_cols].type = DB1_STR;
+        query_vals[n_query_cols].nul = 0;
+        query_vals[n_query_cols].val.str_val.s = pres->id.s;
+        query_vals[n_query_cols].val.str_val.len = pres->id.len;
+        n_query_cols++;
+
+	query_cols[n_query_cols] = &str_pres_uri_col;
+	query_vals[n_query_cols].type = DB1_STR;
+	query_vals[n_query_cols].nul = 0;
+	query_vals[n_query_cols].val.str_val.s = pres->pres_uri->s;
+	query_vals[n_query_cols].val.str_val.len = pres->pres_uri->len;
+	n_query_cols++;
+
+	query_cols[n_query_cols] = &str_watcher_uri_col;
+	query_vals[n_query_cols].type = DB1_STR;
+	query_vals[n_query_cols].nul = 0;
+	query_vals[n_query_cols].val.str_val.s = pres->watcher_uri->s;
+	query_vals[n_query_cols].val.str_val.len = pres->watcher_uri->len;
+	n_query_cols++;
+
+	query_cols[n_query_cols] = &str_call_id_col;	
+	query_vals[n_query_cols].type = DB1_STR;
+	query_vals[n_query_cols].nul = 0;
+	query_vals[n_query_cols].val.str_val = pres->call_id;
+	n_query_cols++;
+
+	query_cols[n_query_cols] = &str_from_tag_col;	
+	query_vals[n_query_cols].type = DB1_STR;
+	query_vals[n_query_cols].nul = 0;
+	query_vals[n_query_cols].val.str_val = pres->from_tag;
+	n_query_cols++;
 
+	/* The columns I need to fill in to convert a temporary dialog to a dialog */
+	data_cols[n_data_cols] = &str_expires_col;
+	data_vals[n_data_cols].type = DB1_INT;
+	data_vals[n_data_cols].nul = 0;
+	data_vals[n_data_cols].val.int_val = pres->expires;
+	n_data_cols++;
+
+	data_cols[n_data_cols] = &str_desired_expires_col;
+	data_vals[n_data_cols].type = DB1_INT;
+	data_vals[n_data_cols].nul = 0;
+	data_vals[n_data_cols].val.int_val = pres->desired_expires;
+	n_data_cols++;
+
+	data_cols[n_data_cols] = &str_flag_col;
+	data_vals[n_data_cols].type = DB1_INT;
+	data_vals[n_data_cols].nul = 0;
+	data_vals[n_data_cols].val.int_val = pres->flag;
+	n_data_cols++;
+
+	data_cols[n_data_cols] = &str_to_tag_col; /* to_tag is written here, not matched on; presumably the temporary row has no To-tag yet */
+	data_vals[n_data_cols].type = DB1_STR;
+	data_vals[n_data_cols].nul = 0;
+	data_vals[n_data_cols].val.str_val = pres->to_tag;
+	n_data_cols++;
+
+	data_cols[n_data_cols] = &str_cseq_col;
+	data_vals[n_data_cols].type = DB1_INT;
+	data_vals[n_data_cols].nul = 0;
+	data_vals[n_data_cols].val.int_val = pres->cseq;
+	n_data_cols++;
+
+	data_cols[n_data_cols] = &str_record_route_col;
+	data_vals[n_data_cols].type = DB1_STR;
+	data_vals[n_data_cols].nul = 0;
+	data_vals[n_data_cols].val.str_val = pres->record_route;
+	n_data_cols++;
+
+	data_cols[n_data_cols] = &str_contact_col;
+	data_vals[n_data_cols].type = DB1_STR;
+	data_vals[n_data_cols].nul = 0;
+	data_vals[n_data_cols].val.str_val = pres->contact;
+	n_data_cols++;
+
+	data_cols[n_data_cols] = &str_remote_contact_col;
+	data_vals[n_data_cols].type = DB1_STR;
+	data_vals[n_data_cols].nul = 0;
+	data_vals[n_data_cols].val.str_val = pres->remote_contact;
+	n_data_cols++;
+
+	data_cols[n_data_cols] = &str_version_col;
+	data_vals[n_data_cols].type = DB1_INT;
+	data_vals[n_data_cols].nul = 0;
+	data_vals[n_data_cols].val.int_val = pres->version;
+	n_data_cols++;
+
+	data_cols[n_data_cols] = &str_extra_headers_col;
+	data_vals[n_data_cols].type = DB1_STR;
+	data_vals[n_data_cols].nul = 0;
+	if (pres->extra_headers)
+	{
+		data_vals[n_data_cols].val.str_val.s = pres->extra_headers->s;
+		data_vals[n_data_cols].val.str_val.len = pres->extra_headers->len;
+	}
+	else
+	{
+		data_vals[n_data_cols].val.str_val.s = ""; /* no extra headers: store an empty string rather than NULL */
+		data_vals[n_data_cols].val.str_val.len = 0;
+	}
+	n_data_cols++;
+
+	if (pua_dbf.update(pua_db, query_cols, 0, query_vals,
+			data_cols, data_vals, n_query_cols, n_data_cols) < 0)
+	{
+		LM_ERR("Failed update db\n");
+		return -1; /* pres is not freed on this path */
+	}
+
+	shm_free(pres->remote_contact.s); /* success: release the in-memory copy, contact buffer first */
+	shm_free(pres);
+
+	return 1;
+}
+
+
+/******************************************************************************/
 int delete_temporary_dialog_puadb(ua_pres_t *pres ) 
 
 {

+ 1 - 0
modules_k/pua/pua_db.h

@@ -52,6 +52,7 @@ ua_pres_t* search_puadb(ua_pres_t *pres, ua_pres_t *result, db1_res_t **res );
 ua_pres_t* get_dialog_puadb(ua_pres_t *pres, ua_pres_t *result, db1_res_t **res);
 int is_dialog_puadb(ua_pres_t *pres);
 int get_record_id_puadb(ua_pres_t *pres, str **rec_id );
+int convert_temporary_dialog_puadb(ua_pres_t *pres);
 int delete_temporary_dialog_puadb(ua_pres_t *pres );
 int delete_puadb(ua_pres_t *pres );
 int update_version_puadb(ua_pres_t *pres, int version );

+ 17 - 28
modules_k/pua/send_subscribe.c

@@ -241,7 +241,6 @@ void subs_cback_func(struct cell *t, int cb_type, struct tmcb_params *ps)
 	int initial_request = 0;
 	db1_res_t *res=NULL;
  	ua_pres_t dbpres;
-	int need_to_free=0;
 	str pres_uri={0,0}, watcher_uri={0,0}, extra_headers={0,0};
 
 	memset(&dbpres, 0, sizeof(dbpres));
@@ -577,10 +576,6 @@ void subs_cback_func(struct cell *t, int cb_type, struct tmcb_params *ps)
 			pkg_free(record_route.s);
 		goto done;
 	}
-	else
-	{
-		need_to_free=1;
-	}
 	
 	memset(presentity, 0, size);
 	size= sizeof(ua_pres_t);
@@ -654,7 +649,7 @@ void subs_cback_func(struct cell *t, int cb_type, struct tmcb_params *ps)
 
 	/* write the remote contact filed */
 	presentity->remote_contact.s= (char*)shm_malloc(contact.len* sizeof(char));
-	if(presentity->remote_contact.s== NULL)
+	if(presentity->remote_contact.s==NULL)
 	{
 		ERR_MEM(SHARE_MEM);
 	}
@@ -677,11 +672,19 @@ void subs_cback_func(struct cell *t, int cb_type, struct tmcb_params *ps)
 
 	if (dbmode==PUA_DB_ONLY)
 	{
-		insert_puadb(presentity);
+		if (convert_temporary_dialog_puadb(presentity) < 0)
+		{
+			LM_ERR("Could not convert temporary dialog into a dialog\n");
+			goto error;
+		}
 	}
 	else
 	{
-		insert_htable(presentity);
+		if (convert_temporary_dialog(presentity) < 0)
+		{
+			LM_ERR("Could not convert temporary dialog into a dialog\n");
+			goto error;
+		}
 	}
 
 done:
@@ -690,27 +693,13 @@ done:
 		hentity->flag= flag;
 		run_pua_callbacks( hentity, msg);
 	}
+	goto end;
+
 error:	
-	if (dbmode == PUA_DB_ONLY)
-	{
-		if (presentity!=NULL)
-		{
-			delete_temporary_dialog_puadb(presentity);
-			if (need_to_free)
-			{
-				if (presentity->remote_contact.s) shm_free(presentity->remote_contact.s);
-				shm_free(presentity);
-			}
-		}
-	}
-	else
-	{
-		lock_get(&HashT->p_records[hash_code].lock);
-		presentity = get_temporary_dialog(hentity, hash_code);
-		if (presentity!=NULL)
-			delete_htable(presentity, hash_code);
-		lock_release(&HashT->p_records[hash_code].lock);
-	}
+	if (presentity->remote_contact.s) shm_free(presentity->remote_contact.s);
+	if (presentity) shm_free(presentity);
+
+end:
 
 	if(hentity)
 	{