Forráskód Böngészése

modules_k/presence: Use database row/table locking where supported in DB only mode

- Under load there are lots of DB deadlocks when using
  (start|end)_transaction() with multiple presence processes and/or
  servers.
- Without using (start|end)_transaction() multiple processes/servers
  overwrite each others changes.
- Using row locking (where possible) and table locking (where
  required) fixes these problems.
- IMPORTANT NOTE: DB only, multi-process/multi-server, presence will
  only work properly under high-load when using a database driver that
  supports transactions and locking (currently just db_postgres).
Peter Dunkley 13 éve
szülő
commit
34cd2acb53

+ 39 - 8
modules_k/presence/notify.c

@@ -1309,6 +1309,7 @@ int publ_notify_notifier(str pres_uri, pres_ev_t *event)
 	int i;
 	int ret = -1;
 	subs_t subs;
+	db_query_f query_fn = pa_dbf.query_lock ? pa_dbf.query_lock : pa_dbf.query;
 
 	if(pa_db == NULL)
 	{
@@ -1338,7 +1339,16 @@ int publ_notify_notifier(str pres_uri, pres_ev_t *event)
 	result_cols[r_to_tag_col=n_result_cols++] = &str_to_tag_col;
 	result_cols[r_from_tag_col=n_result_cols++] = &str_from_tag_col;
 
-	if(pa_dbf.query(pa_db, query_cols, 0, query_vals, result_cols, 
+	if (pa_dbf.start_transaction)
+	{
+		if (pa_dbf.start_transaction(pa_db, DB_LOCKING_WRITE) < 0)
+		{
+			LM_ERR("in start_transaction\n");
+			goto error;
+		}
+	}
+
+	if(query_fn(pa_db, query_cols, 0, query_vals, result_cols, 
 				n_query_cols, n_result_cols, 0, &result )< 0)
 	{
 		LM_ERR("Can't query db\n");
@@ -1366,10 +1376,26 @@ int publ_notify_notifier(str pres_uri, pres_ev_t *event)
 		set_updated(&subs);
 	}
 
+	if (pa_dbf.end_transaction)
+	{
+		if (pa_dbf.end_transaction(pa_db) < 0)
+		{
+			LM_ERR("in end_transaction\n");
+			goto error;
+		}
+	}
+
 	ret = RES_ROW_N(result);
 
 error:
 	if (result) pa_dbf.free_result(pa_db, result);
+
+	if (pa_dbf.abort_transaction)
+	{
+		if (pa_dbf.abort_transaction(pa_db) < 0)
+			LM_ERR("in abort_transaction\n");
+	}
+
 	return ret;
 }
 
@@ -2007,6 +2033,7 @@ static int unset_watchers_updated_winfo(str *pres_uri)
 	int n_query_cols = 0;
 	int ret = -1;
 	str winfo = str_init("presence.winfo");
+	db_query_f query_fn = pa_dbf.query_lock ? pa_dbf.query_lock : pa_dbf.query;
 
 	/* If this is the only presence.winfo dialog awaiting
 	   update for this presentity reset all of the watchers
@@ -2044,7 +2071,7 @@ static int unset_watchers_updated_winfo(str *pres_uri)
 		goto error;
 	}
 
-	if (pa_dbf.query(pa_db, query_cols, 0, query_vals, result_cols,
+	if (query_fn(pa_db, query_cols, 0, query_vals, result_cols,
 					n_query_cols, 1, 0, &result) < 0)
 	{
 		LM_ERR("in sql query\n");
@@ -2091,6 +2118,7 @@ static int dialogs_awaiting_update(str *pres_uri, str event)
 	db1_res_t *result = NULL;
 	int n_query_cols = 0;
 	int ret = -1;
+	db_query_f query_fn = pa_dbf.query_lock ? pa_dbf.query_lock : pa_dbf.query;
 
 	query_cols[n_query_cols] = &str_presentity_uri_col;
 	query_vals[n_query_cols].type = DB1_STR;
@@ -2122,7 +2150,7 @@ static int dialogs_awaiting_update(str *pres_uri, str event)
 		goto error;
 	}
 
-	if (pa_dbf.query(pa_db, query_cols, query_ops, query_vals,
+	if (query_fn(pa_db, query_cols, query_ops, query_vals,
 				result_cols, n_query_cols, 1, 0, &result) < 0)
 	{
 		LM_ERR("in sql query\n");
@@ -2152,6 +2180,7 @@ int set_wipeer_subs_updated(str *pres_uri, pres_ev_t *event, int full)
 	int callid_col, from_tag_col, to_tag_col;
 	int i, ret = -1, count;
 	str callid, from_tag, to_tag;
+	db_query_f query_fn = pa_dbf.query_lock ? pa_dbf.query_lock : pa_dbf.query;
 
 	query_cols[n_query_cols] = &str_presentity_uri_col;
 	query_vals[n_query_cols].type = DB1_STR;
@@ -2176,7 +2205,7 @@ int set_wipeer_subs_updated(str *pres_uri, pres_ev_t *event, int full)
 		goto error;
 	}
 
-	if (pa_dbf.query (pa_db, query_cols, 0, query_vals, result_cols,
+	if (query_fn(pa_db, query_cols, 0, query_vals, result_cols,
 				n_query_cols, n_result_cols, 0,  &result) < 0) 
 	{
 		LM_ERR("in sql query\n");
@@ -2259,6 +2288,7 @@ int set_wipeer_subs_updated(str *pres_uri, pres_ev_t *event, int full)
 done:
 error:
 	if (result) pa_dbf.free_result(pa_db, result);
+
 	return ret;
 }
 
@@ -2640,6 +2670,7 @@ int process_dialogs(int round, int presence_winfo)
 	str ev_sname, winfo = str_init("presence.winfo");
 	int now = (int)time(NULL);
 	int updated = 0;
+	db_query_f query_fn = pa_dbf.query_lock ? pa_dbf.query_lock : pa_dbf.query;
 
 	if (++subset > (pres_waitn_time * pres_notifier_poll_rate) -1)
 		subset = 0;
@@ -2678,7 +2709,7 @@ int process_dialogs(int round, int presence_winfo)
 
 	if (pa_dbf.start_transaction)
 	{
-		if (pa_dbf.start_transaction(pa_db) < 0)
+		if (pa_dbf.start_transaction(pa_db, DB_LOCKING_WRITE) < 0)
 		{
 			LM_ERR("in start_transaction\n");
 			goto error;
@@ -2686,7 +2717,7 @@ int process_dialogs(int round, int presence_winfo)
 	}
 
 	/* Step 1: Find active_watchers that require notification */
-	if (pa_dbf.query(pa_db, query_cols, query_ops, query_vals, result_cols,
+	if (query_fn(pa_db, query_cols, query_ops, query_vals, result_cols,
 			 n_query_cols, n_result_cols, 0, &dialog_list) < 0)
 	{
 		LM_ERR("in sql query\n");
@@ -2787,7 +2818,7 @@ int process_dialogs(int round, int presence_winfo)
 
 		if (pa_dbf.start_transaction)
 		{
-			if (pa_dbf.start_transaction(pa_db) < 0)
+			if (pa_dbf.start_transaction(pa_db, DB_LOCKING_WRITE) < 0)
 			{
 				LM_ERR("in start_transaction\n");
 				goto error;
@@ -2795,7 +2826,7 @@ int process_dialogs(int round, int presence_winfo)
 		}
 		end_transaction = 1;
 
-		if (pa_dbf.query(pa_db, query_cols, 0, query_vals, result_cols,
+		if (query_fn(pa_db, query_cols, 0, query_vals, result_cols,
 				 n_query_cols, n_result_cols, 0, &dialog) < 0)
 		{
 			LM_ERR("in sql query\n");

+ 52 - 1
modules_k/presence/presentity.c

@@ -423,11 +423,36 @@ int update_presentity(struct sip_msg* msg, presentity_t* presentity, str* body,
 				LM_ERR("replace is required for pidf-manipulation support\n");
 				goto error;
 			}
+
+			if (pa_dbf.start_transaction)
+			{
+				if (pa_dbf.start_transaction(pa_db, DB_LOCKING_WRITE) < 0)
+				{
+					LM_ERR("in start_transaction\n");
+					goto error;
+				}
+			}
+
+
 			if (pa_dbf.replace(pa_db, query_cols, query_vals, n_query_cols, 4, 0) < 0) 
 			{
 				LM_ERR("replacing record in database\n");
+				if (pa_dbf.abort_transaction)
+				{
+					if (pa_dbf.abort_transaction(pa_db) < 0)
+						LM_ERR("in abort_transaction\n");
+				}
 				goto error;
 			}
+
+			if (pa_dbf.end_transaction)
+			{
+				if (pa_dbf.end_transaction(pa_db) < 0)
+				{
+					LM_ERR("in end_transaction\n");
+					goto error;
+				}
+			}
 		}
 
 		if( publ_send200ok(msg, presentity->expires, presentity->etag)< 0)
@@ -1123,6 +1148,7 @@ int mark_presentity_for_delete(presentity_t *pres)
 	int n_query_cols = 0, n_update_cols = 0;
 	int ret = -1;
 	str *cur_body = NULL, *new_body = NULL;
+	db_query_f query_fn = pa_dbf.query_lock ? pa_dbf.query_lock : pa_dbf.query;
 
 	if (pres->event->agg_nbody == NULL)
 	{
@@ -1167,7 +1193,16 @@ int mark_presentity_for_delete(presentity_t *pres)
 
 	result_cols[0] = &str_body_col;
 
-	if (pa_dbf.query(pa_db, query_cols, 0, query_vals, result_cols,
+	if (pa_dbf.start_transaction)
+	{
+		if (pa_dbf.start_transaction(pa_db, DB_LOCKING_WRITE) < 0)
+		{
+			LM_ERR("in start_transaction\n");
+			goto error;
+		}
+	}
+
+	if (query_fn(pa_db, query_cols, 0, query_vals, result_cols,
 				n_query_cols, 1, 0, &result) < 0)
 	{
 		LM_ERR("query failed\n");
@@ -1247,6 +1282,15 @@ int mark_presentity_for_delete(presentity_t *pres)
 		goto error;
 	}
 
+	if (pa_dbf.end_transaction)
+	{
+		if (pa_dbf.end_transaction(pa_db) < 0)
+		{
+			LM_ERR("in end_transaction\n");
+			goto error;
+		}
+	}
+
 	if (pa_dbf.affected_rows)
 		ret = pa_dbf.affected_rows(pa_db);
 	else
@@ -1257,6 +1301,13 @@ error:
 	free_notify_body(new_body, pres->event);
 	if (cur_body) pkg_free(cur_body);
 	if (result) pa_dbf.free_result(pa_db, result);
+
+	if (pa_dbf.abort_transaction)
+	{
+		if (pa_dbf.abort_transaction(pa_db) < 0)
+			LM_ERR("in abort_transaction\n");
+	}
+
 	return ret;
 }
 

+ 74 - 7
modules_k/presence/subscribe.c

@@ -856,6 +856,20 @@ int handle_subscribe(struct sip_msg* msg, str watcher_user, str watcher_domain)
 		goto error;
 	}
 
+	if (pres_notifier_processes > 0 && pa_dbf.start_transaction)
+	{
+		if (pa_dbf.use_table(pa_db, &active_watchers_table) < 0) 	
+		{
+			LM_ERR("unsuccessful use_table sql operation\n");
+			goto error;
+		}
+		if (pa_dbf.start_transaction(pa_db, DB_LOCKING_WRITE) < 0)
+		{
+			LM_ERR("in start_transaction\n");
+			goto error;
+		}
+	}
+
 	/* getting presentity uri from Request-URI if initial subscribe - or else from database*/
 	if(to_tag_gen)
 	{
@@ -980,6 +994,15 @@ int handle_subscribe(struct sip_msg* msg, str watcher_user, str watcher_domain)
 		LM_ERR("in update_subscription\n");
 		goto error;
 	}
+	if (pres_notifier_processes > 0 && pa_dbf.end_transaction)
+	{
+		if (pa_dbf.end_transaction(pa_db) < 0)
+		{
+			LM_ERR("in end_transaction\n");
+			goto error;
+		}
+	}
+
 	if(subs.auth_rules_doc)
 	{
 		pkg_free(subs.auth_rules_doc->s);
@@ -1040,6 +1063,12 @@ error:
 	if(subs.record_route.s)
 		pkg_free(subs.record_route.s);
 
+	if (pres_notifier_processes > 0 && pa_dbf.abort_transaction)
+	{
+		if (pa_dbf.abort_transaction(pa_db) < 0)
+			LM_ERR("in abort_transaction\n");
+	}
+
 	return -1;
 
 }
@@ -1433,6 +1462,7 @@ int get_database_info(struct sip_msg* msg, subs_t* subs, int* reply_code, str* r
 	unsigned int remote_cseq;
 	str pres_uri, record_route;
 	str reason;
+	db_query_f query_fn = pa_dbf.query;
 
 	query_cols[n_query_cols] = &str_callid_col;
 	query_vals[n_query_cols].type = DB1_STR;
@@ -1468,7 +1498,10 @@ int get_database_info(struct sip_msg* msg, subs_t* subs, int* reply_code, str* r
 		return -1;
 	}
 	
-	if (pa_dbf.query (pa_db, query_cols, 0, query_vals,
+	if (pres_notifier_processes > 0 && pa_dbf.start_transaction)
+		query_fn = pa_dbf.query_lock ? pa_dbf.query_lock : pa_dbf.query;
+
+	if (query_fn (pa_db, query_cols, 0, query_vals,
 		 result_cols, n_query_cols, n_result_cols, 0,  &result) < 0) 
 	{
 		LM_ERR("querying subscription dialog\n");
@@ -1594,7 +1627,7 @@ void update_db_subs_timer_notifier(void)
 	db1_res_t *result = NULL;
 	int n_query_cols = 0, n_result_cols = 0;
 	int r_callid_col = 0, r_to_tag_col = 0, r_from_tag_col = 0;
-	int i;
+	int i, res;
 	subs_t subs;
 
 	if(pa_db == NULL)
@@ -1620,9 +1653,24 @@ void update_db_subs_timer_notifier(void)
 	result_cols[r_to_tag_col=n_result_cols++] = &str_to_tag_col;
 	result_cols[r_from_tag_col=n_result_cols++] = &str_from_tag_col;
 
-	if(db_fetch_query(&pa_dbf, pres_fetch_rows, pa_db, query_cols,
-			  query_ops, query_vals, result_cols,
-			  n_query_cols, n_result_cols, 0, &result )< 0)
+	if (pa_dbf.start_transaction)
+	{
+		if (pa_dbf.start_transaction(pa_db, DB_LOCKING_WRITE) < 0)
+		{
+			LM_ERR("in start_transaction\n");
+			goto error;
+		}
+	}
+
+	if (pa_dbf.query_lock)
+		res = db_fetch_query_lock(&pa_dbf, pres_fetch_rows, pa_db, query_cols,
+				  query_ops, query_vals, result_cols,
+				  n_query_cols, n_result_cols, 0, &result );
+	else
+		res = db_fetch_query(&pa_dbf, pres_fetch_rows, pa_db, query_cols,
+				  query_ops, query_vals, result_cols,
+				  n_query_cols, n_result_cols, 0, &result );
+	if (res < 0)
 	{
 		LM_ERR("Can't query db\n");
 		goto error;
@@ -1653,8 +1701,24 @@ void update_db_subs_timer_notifier(void)
 	} while (db_fetch_next(&pa_dbf, pres_fetch_rows, pa_db, &result) == 1
 			&& RES_ROW_N(result) > 0);
 
+	if (pa_dbf.end_transaction)
+	{
+		if (pa_dbf.end_transaction(pa_db) < 0)
+		{
+			LM_ERR("in end_transaction\n");
+			goto error;
+		}
+	}
+
 error:
 	if (result) pa_dbf.free_result(pa_db, result);
+
+	if (pa_dbf.abort_transaction)
+	{
+		if (pa_dbf.abort_transaction(pa_db) < 0)
+			LM_ERR("in abort_transaction\n");
+	}
+
 }
 
 void update_db_subs_timer_dbonly(void)
@@ -2559,7 +2623,7 @@ int insert_db_subs_auth(subs_t* subs)
 					2, 0) < 0)
 		{
 			LM_ERR("in sql replace\n");
-			return -1;
+			goto error;
 		}
 	}
 	else
@@ -2573,9 +2637,12 @@ int insert_db_subs_auth(subs_t* subs)
 		if(pa_dbf.insert(pa_db, db_keys, db_vals, n_query_cols )< 0)
 		{	
 			LM_ERR("in sql insert\n");
-			return -1;
+			goto error;
 		}
 	}
 
 	return 0;
+
+error:
+	return -1;
 }