Pārlūkot izejas kodu

modules/tm: extended async usage
- enables resuming of tx in orginal route block tx was suspended, not only failure route
- dedicated lock to prevent multiple invocations of suspend on tz (reply lock used to be used)
- extra flag (T_ASYNC_CONTINUE) to mark a transaction that is being execute post suspend

Jason Penton 12 gadi atpakaļ
vecāks
revīzija
5ab44c7c2f

+ 16 - 3
modules/tm/h_table.h

@@ -91,6 +91,7 @@ struct cell;
 struct timer;
 struct retr_buf;
 struct ua_client;
+struct async_state;
 
 #include "../../mem/shm_mem.h"
 #include "lock.h"
@@ -271,7 +272,13 @@ struct totag_elem {
 	volatile int acked;
 };
 
-
+/* structure for storing transaction state prior to suspending of async transactions */
+typedef struct async_state {
+	unsigned int backup_route;
+	unsigned int backup_branch;
+	unsigned int blind_uac;
+	unsigned int ruri_new;
+} async_state_type;
 
 /* transaction's flags */
 /* is the transaction's request an INVITE? */
@@ -309,8 +316,9 @@ struct totag_elem {
 #	define T_PASS_PROVISIONAL_FLAG (1<<11)
 #	define pass_provisional(_t_)	((_t_)->flags&T_PASS_PROVISIONAL_FLAG)
 #endif
+#define T_ASYNC_CONTINUE (1<<12) /* Is this transaction in a continuation after being suspended */
 
-#define T_DISABLE_INTERNAL_REPLY (1<<12) /* don't send internal negative reply */
+#define T_DISABLE_INTERNAL_REPLY (1<<13) /* don't send internal negative reply */
 
 /* unsigned short should be enough for a retr. timer: max. 65535 ms =>
  * max retr. = 65 s which should be enough and saves us 2*2 bytes */
@@ -417,6 +425,9 @@ typedef struct cell
 	/* UA Clients */
 	struct ua_client  uac[ MAX_BRANCHES ];
 	
+	/* store transaction state to be used for async transactions */
+	struct async_state async_backup;
+	
 	/* to-tags of 200/INVITEs which were received from downstream and 
 	 * forwarded or passed to UAC; note that there can be arbitrarily 
 	 * many due to downstream forking; */
@@ -435,7 +446,9 @@ typedef struct cell
 
 	/* protection against concurrent reply processing */
 	ser_lock_t   reply_mutex;
-	
+	/* protect against concurrent async continues */
+	ser_lock_t   async_mutex;
+		
 	ticks_t fr_timeout;     /* final response interval for retr_bufs */
 	ticks_t fr_inv_timeout; /* final inv. response interval for retr_bufs */
 #ifdef TM_DIFF_RT_TIMEOUT

+ 34 - 2
modules/tm/lock.c

@@ -71,6 +71,7 @@
 static int sem_nr;
 gen_lock_set_t* entry_semaphore=0;
 gen_lock_set_t* reply_semaphore=0;
+gen_lock_set_t* async_semaphore=0;
 #endif
 
 
@@ -100,6 +101,10 @@ again:
 			lock_set_destroy(reply_semaphore);
 			lock_set_dealloc(reply_semaphore);
 		}
+		if (async_semaphore!=0){
+			lock_set_destroy(async_semaphore);
+			lock_set_dealloc(async_semaphore);
+		}
 		
 		if (i==0){
 			LOG(L_CRIT, "lock_initialize: could not allocate semaphore"
@@ -154,6 +159,20 @@ again:
 			i--;
 			goto again;
 	}
+	i++;
+	if (((async_semaphore=lock_set_alloc(i))==0)||
+		(lock_set_init(async_semaphore)==0)){
+			if (async_semaphore){
+				lock_set_dealloc(async_semaphore);
+				async_semaphore=0;
+			}
+			DBG("DEBUG:lock_initialize: async semaphore initialization"
+				" failure: %s\n", strerror(errno));
+			probe_run=1;
+			i--;
+			goto again;
+	}
+	
 
 	/* return success */
 	LOG(L_INFO, "INFO: semaphore arrays of size %d allocated\n", sem_nr );
@@ -193,7 +212,11 @@ void lock_cleanup()
 		lock_set_destroy(reply_semaphore);
 		lock_set_dealloc(reply_semaphore);
 	};
-	entry_semaphore =  reply_semaphore = 0;
+	if (async_semaphore !=0) {
+		lock_set_destroy(async_semaphore);
+		lock_set_dealloc(async_semaphore);
+	};
+	entry_semaphore =  reply_semaphore = async_semaphore = 0;
 
 }
 #endif /*GEN_LOCK_T_PREFERED*/
@@ -229,7 +252,16 @@ int init_entry_lock( struct s_table* ht, struct entry *entry )
 	return 0;
 }
 
-
+int init_async_lock( struct cell *cell )
+{
+#ifdef GEN_LOCK_T_PREFERED
+	lock_init(&cell->async_mutex);
+#else
+	cell->async_mutex.semaphore_set=async_semaphore;
+	cell->async_mutex.semaphore_index = cell->hash_index % sem_nr;
+#endif /* GEN_LOCK_T_PREFERED */
+	return 0;
+}
 
 int release_cell_lock( struct cell *cell )
 {

+ 1 - 0
modules/tm/lock.h

@@ -76,6 +76,7 @@ void lock_cleanup(void);
 
 int init_cell_lock( struct cell *cell );
 int init_entry_lock( struct s_table* ht, struct entry *entry );
+int init_async_lock( struct cell *cell );
 
 
 int release_cell_lock( struct cell *cell );

+ 5 - 0
modules/tm/t_fwd.c

@@ -755,6 +755,11 @@ int add_blind_uac( /*struct cell *t*/ )
 	membar_write(); /* to allow lockless prepare_to_cancel() we want to be sure
 					   all the writes finished before updating branch number*/
 	t->nr_of_outgoings=(branch+1);
+	t->async_backup.blind_uac = branch; /* whenever we create a blind UAC, lets save the current branch
+					 * this is used in async tm processing specifically to be able to route replies
+					 * that were possibly in response to a request forwarded on this blind UAC......
+					 * we still want replies to be processed as if it were a normal UAC */
+	
 	/* start FR timer -- protocol set by default to PROTO_NONE,
        which means retransmission timer will not be started
     */

+ 77 - 58
modules/tm/t_reply.c

@@ -806,13 +806,13 @@ static int _reply( struct cell *trans, struct sip_msg* p_msg,
 	}
 }
 
-/** create or restore a "fake environment" for running a failure_route.
- *if msg is set -> it will fake the env. vars conforming with the msg; if NULL
+/** create or restore a "fake environment" for running a failure_route, 
+ * OR an "async environment" depending on is_async_value (0=std failure-faked, 1=async)
+ * if msg is set -> it will fake the env. vars conforming with the msg; if NULL
  * the env. will be restore to original.
- * Side-effect: mark_ruri_consumed().
+ * Side-effect: mark_ruri_consumed() for faked env only.
  */
-void faked_env( struct cell *t, struct sip_msg *msg)
-{
+void faked_env(struct cell *t, struct sip_msg *msg, int is_async_env) {
 	static int backup_route_type;
 	static struct cell *backup_t;
 	static int backup_branch;
@@ -835,64 +835,83 @@ void faked_env( struct cell *t, struct sip_msg *msg)
 		 * a shmem-ed replica of the request; advertise it in route type;
 		 * for example t_reply needs to know that
 		 */
-		backup_route_type=get_route_type();
-		set_route_type(FAILURE_ROUTE);
-		/* don't bother backing up ruri state, since failure route
-		   is called either on reply or on timer and in both cases
-		   the ruri should not be used again for forking */
-		ruri_mark_consumed(); /* in failure route we assume ruri
-								 should not be used again for forking */
+		backup_route_type = get_route_type();
+
+		if (is_async_env) {
+			set_route_type(t->async_backup.backup_route);
+			if (t->async_backup.ruri_new) {
+				ruri_mark_new();
+			}
+		} else {
+			set_route_type(FAILURE_ROUTE);
+			/* don't bother backing up ruri state, since failure route
+			   is called either on reply or on timer and in both cases
+			   the ruri should not be used again for forking */
+			ruri_mark_consumed(); /* in failure route we assume ruri
+								     should not be used again for forking */
+		}
 		/* also, tm actions look in beginning whether transaction is
 		 * set -- whether we are called from a reply-processing
 		 * or a timer process, we need to set current transaction;
 		 * otherwise the actions would attempt to look the transaction
 		 * up (unnecessary overhead, refcounting)
 		 */
-		/* backup */
-		backup_t=get_t();
-		backup_branch=get_t_branch();
-		backup_msgid=global_msg_id;
-		/* fake transaction and message id */
-		global_msg_id=msg->id;
-		set_t(t, T_BR_UNDEFINED);
-		/* make available the avp list from transaction */
-
-		backup_uri_from = set_avp_list(AVP_TRACK_FROM | AVP_CLASS_URI, &t->uri_avps_from );
-		backup_uri_to = set_avp_list(AVP_TRACK_TO | AVP_CLASS_URI, &t->uri_avps_to );
-		backup_user_from = set_avp_list(AVP_TRACK_FROM | AVP_CLASS_USER, &t->user_avps_from );
-		backup_user_to = set_avp_list(AVP_TRACK_TO | AVP_CLASS_USER, &t->user_avps_to );
-		backup_domain_from = set_avp_list(AVP_TRACK_FROM | AVP_CLASS_DOMAIN, &t->domain_avps_from );
-		backup_domain_to = set_avp_list(AVP_TRACK_TO | AVP_CLASS_DOMAIN, &t->domain_avps_to );
+		if (!is_async_env) {
+			/* backup */
+			backup_t = get_t();
+			backup_branch = get_t_branch();
+			backup_msgid = global_msg_id;
+			/* fake transaction and message id */
+			global_msg_id = msg->id;
+			set_t(t, T_BR_UNDEFINED);
+
+			/* make available the avp list from transaction */
+			backup_uri_from = set_avp_list(AVP_TRACK_FROM | AVP_CLASS_URI, &t->uri_avps_from);
+			backup_uri_to = set_avp_list(AVP_TRACK_TO | AVP_CLASS_URI, &t->uri_avps_to);
+			backup_user_from = set_avp_list(AVP_TRACK_FROM | AVP_CLASS_USER, &t->user_avps_from);
+			backup_user_to = set_avp_list(AVP_TRACK_TO | AVP_CLASS_USER, &t->user_avps_to);
+			backup_domain_from = set_avp_list(AVP_TRACK_FROM | AVP_CLASS_DOMAIN, &t->domain_avps_from);
+			backup_domain_to = set_avp_list(AVP_TRACK_TO | AVP_CLASS_DOMAIN, &t->domain_avps_to);
 #ifdef WITH_XAVP
-		backup_xavps = xavp_set_list(&t->xavps_list);
+			backup_xavps = xavp_set_list(&t->xavps_list);
 #endif
-		/* set default send address to the saved value */
-		backup_si=bind_address;
-		bind_address=t->uac[0].request.dst.send_sock;
-		/* backup lump lists */
-		backup_add_rm = t->uas.request->add_rm;
-		backup_body_lumps = t->uas.request->body_lumps;
-		backup_reply_lump = t->uas.request->reply_lump;
+			/* set default send address to the saved value */
+			backup_si = bind_address;
+			bind_address = t->uac[0].request.dst.send_sock;
+			/* backup lump lists */
+			backup_add_rm = t->uas.request->add_rm;
+			backup_body_lumps = t->uas.request->body_lumps;
+			backup_reply_lump = t->uas.request->reply_lump;
+		} else {
+			global_msg_id = msg->id;
+			set_t(t, t->async_backup.backup_branch);
+		}
 	} else {
-		/* restore original environment */
-		set_t(backup_t, backup_branch);
-		global_msg_id=backup_msgid;
-		set_route_type(backup_route_type);
-		/* restore original avp list */
-		set_avp_list(AVP_TRACK_FROM | AVP_CLASS_USER, backup_user_from );
-		set_avp_list(AVP_TRACK_TO | AVP_CLASS_USER, backup_user_to );
-		set_avp_list(AVP_TRACK_FROM | AVP_CLASS_DOMAIN, backup_domain_from );
-		set_avp_list(AVP_TRACK_TO | AVP_CLASS_DOMAIN, backup_domain_to );
-		set_avp_list(AVP_TRACK_FROM | AVP_CLASS_URI, backup_uri_from );
-		set_avp_list(AVP_TRACK_TO | AVP_CLASS_URI, backup_uri_to );
+		if (!is_async_env) {
+			/* restore original environment */
+			set_t(backup_t, backup_branch);
+			global_msg_id = backup_msgid;
+			set_route_type(backup_route_type);
+			/* restore original avp list */
+			set_avp_list(AVP_TRACK_FROM | AVP_CLASS_USER, backup_user_from);
+			set_avp_list(AVP_TRACK_TO | AVP_CLASS_USER, backup_user_to);
+			set_avp_list(AVP_TRACK_FROM | AVP_CLASS_DOMAIN, backup_domain_from);
+			set_avp_list(AVP_TRACK_TO | AVP_CLASS_DOMAIN, backup_domain_to);
+			set_avp_list(AVP_TRACK_FROM | AVP_CLASS_URI, backup_uri_from);
+			set_avp_list(AVP_TRACK_TO | AVP_CLASS_URI, backup_uri_to);
 #ifdef WITH_XAVP
-		xavp_set_list(backup_xavps);
+			xavp_set_list(backup_xavps);
 #endif
-		bind_address=backup_si;
-		/* restore lump lists */
-		t->uas.request->add_rm = backup_add_rm;
-		t->uas.request->body_lumps = backup_body_lumps;
-		t->uas.request->reply_lump = backup_reply_lump;
+			bind_address = backup_si;
+			/* restore lump lists */
+			t->uas.request->add_rm = backup_add_rm;
+			t->uas.request->body_lumps = backup_body_lumps;
+			t->uas.request->reply_lump = backup_reply_lump;
+		} else {
+			/*we don't need to restore anything as there was no "environment" prior 
+						    to continuing (we are in a different process)*/
+			LM_WARN("nothing to restore in async continue, useless call\n");
+		}
 	}
 }
 
@@ -1031,7 +1050,7 @@ int run_failure_handlers(struct cell *t, struct sip_msg *rpl,
 		return 0;
 	}
 	/* fake also the env. conforming to the fake msg */
-	faked_env( t, &faked_req);
+	faked_env( t, &faked_req, 0);
 	/* DONE with faking ;-) -> run the failure handlers */
 
 	if (unlikely(has_tran_tmcbs( t, TMCB_ON_FAILURE)) ) {
@@ -1053,7 +1072,7 @@ int run_failure_handlers(struct cell *t, struct sip_msg *rpl,
 	}
 
 	/* restore original environment and free the fake msg */
-	faked_env( t, 0);
+	faked_env( t, 0, 0);
 	free_faked_req(&faked_req,t);
 
 	/* if failure handler changed flag, update transaction context */
@@ -1092,7 +1111,7 @@ int run_branch_failure_handlers(struct cell *t, struct sip_msg *rpl,
 		return 0;
 	}
 	/* fake also the env. conforming to the fake msg */
-	faked_env( t, &faked_req);
+	faked_env( t, &faked_req, 0);
 	set_route_type(BRANCH_FAILURE_ROUTE);
 	set_t(t, picked_branch);
 	/* DONE with faking ;-) -> run the branch_failure handlers */
@@ -1113,7 +1132,7 @@ int run_branch_failure_handlers(struct cell *t, struct sip_msg *rpl,
 	}
 
 	/* restore original environment and free the fake msg */
-	faked_env( t, 0);
+	faked_env( t, 0, 0);
 	free_faked_req(&faked_req,t);
 
 	/* if branch_failure handler changed flag, update transaction context */
@@ -1203,8 +1222,8 @@ int t_pick_branch(int inc_branch, int inc_code, struct cell *t, int *res_code)
 		 * to be a pending, incomplete branch. */
 		if ((!t->uac[b].request.buffer) && (t->uac[b].last_received>=200))
 			continue;
-		/* there is still an unfinished UAC transaction; wait now! */
-		if ( t->uac[b].last_received<200 )
+		/* there is still an unfinished UAC transaction (we ignore unfinished blind UACs) wait now! */
+		if ( t->uac[b].last_received<200 && !((t->flags&T_ASYNC_CONTINUE) && b==t->async_backup.blind_uac))
 			return -2;
 		/* if reply is null => t_send_branch "faked" reply, skip over it */
 		if ( rpl && 

+ 1 - 1
modules/tm/t_reply.h

@@ -235,7 +235,7 @@ void t_drop_replies(int v);
 
 void rpc_reply(rpc_t* rpc, void* c);
 
-void faked_env( struct cell *t,struct sip_msg *msg);
+void faked_env( struct cell *t,struct sip_msg *msg, int is_async_env);
 int fake_req(struct sip_msg *faked_req,
 		struct sip_msg *shmem_msg, int extra_flags, struct ua_client *uac);
 

+ 43 - 29
modules/tm/t_suspend.c

@@ -33,6 +33,7 @@
 
 #include "../../action.h"
 #include "../../script_cb.h"
+#include "../../dset.h"
 
 #include "config.h"
 #include "sip_msg.h"
@@ -104,6 +105,12 @@ int t_suspend(struct sip_msg *msg,
 		return -1;
 	}
 
+	/* backup some extra info that can be used in continuation logic */
+	t->async_backup.backup_route = get_route_type();
+	t->async_backup.backup_branch = get_t_branch();
+	t->async_backup.ruri_new = ruri_get_forking_state();
+
+
 	return 0;
 }
 
@@ -140,14 +147,29 @@ int t_continue(unsigned int hash_index, unsigned int label,
 
 	/* The transaction has to be locked to protect it
 	 * form calling t_continue() multiple times simultaneously */
-	LOCK_REPLIES(t);
-
-	/* Try to find the blind UAC, and cancel its fr timer.
-	 * We assume that the last blind uac called t_continue(). */
-	for (	branch = t->nr_of_outgoings-1;
-		branch >= 0 && t->uac[branch].request.buffer;
-		branch--);
-
+	LOCK_ASYNC_CONTINUE(t);
+	t->flags |= T_ASYNC_CONTINUE;   /* we can now know anywhere in kamailio 
+					 * that we are executing post a suspend */
+	
+	/* which route block type were we in when we were suspended */
+	int cb_type = REQUEST_CB_TYPE;
+        switch (t->async_backup.backup_route) {
+            case REQUEST_ROUTE:
+                cb_type = REQUEST_CB_TYPE;
+                break;
+            case FAILURE_ROUTE:
+                cb_type = FAILURE_CB_TYPE;
+                break;
+            case TM_ONREPLY_ROUTE:
+                 cb_type = ONREPLY_CB_TYPE;
+                break;
+            case BRANCH_ROUTE:
+                cb_type = BRANCH_CB_TYPE;
+                break;
+        }
+	
+	branch = t->async_backup.blind_uac;	/* get the branch of the blind UAC setup 
+						 * during suspend */
 	if (branch >= 0) {
 		stop_rb_timers(&t->uac[branch].request);
 
@@ -155,20 +177,15 @@ int t_continue(unsigned int hash_index, unsigned int label,
 			/* Either t_continue() has already been
 			 * called or the branch has already timed out.
 			 * Needless to continue. */
-			UNLOCK_REPLIES(t);
+			UNLOCK_ASYNC_CONTINUE(t);
 			UNREF(t); /* t_unref would kill the transaction */
 			return 1;
 		}
 
-		/* Set last_received to something >= 200,
-		 * the actual value does not matter, the branch
-		 * will never be picked up for response forwarding.
-		 * If last_received is lower than 200,
-		 * then the branch may tried to be cancelled later,
-		 * for example when t_reply() is called from
-		 * a failure route => deadlock, because both
-		 * of them need the reply lock to be held. */
-		t->uac[branch].last_received=500;
+		/*we really don't need this next line anymore otherwise we will 
+		  never be able to forward replies after a (t_relay) on this branch.
+		  We want to try and treat this branch as 'normal' (as if it were a normal req, not async)' */
+		//t->uac[branch].last_received=500;
 		uac = &t->uac[branch];
 	}
 	/* else
@@ -183,22 +200,19 @@ int t_continue(unsigned int hash_index, unsigned int label,
 		ret = -1;
 		goto kill_trans;
 	}
-	faked_env( t, &faked_req);
+	faked_env( t, &faked_req, 1);
 
-	/* The sip msg is a faked msg just like in failure route
-	 * therefore execute the pre- and post-script callbacks
-	 * of failure route (Miklos)
-	 */
-	if (exec_pre_script_cb(&faked_req, FAILURE_CB_TYPE)>0) {
+	/* execute the pre/post -script callbacks based on original route block */
+	if (exec_pre_script_cb(&faked_req, cb_type)>0) {
 		if (run_top_route(route, &faked_req, 0)<0)
 			LOG(L_ERR, "ERROR: t_continue: Error in run_top_route\n");
-		exec_post_script_cb(&faked_req, FAILURE_CB_TYPE);
+		exec_post_script_cb(&faked_req, cb_type);
 	}
 
 	/* TODO: save_msg_lumps should clone the lumps to shm mem */
 
 	/* restore original environment and free the fake msg */
-	faked_env( t, 0);
+	faked_env( t, 0, 1);
 	free_faked_req(&faked_req, t);
 
 	/* update the flags */
@@ -224,7 +238,7 @@ int t_continue(unsigned int hash_index, unsigned int label,
 		}
 	}
 
-	UNLOCK_REPLIES(t);
+	UNLOCK_ASYNC_CONTINUE(t);
 
 	/* unref the transaction */
 	t_unref(t->uas.request);
@@ -241,10 +255,10 @@ kill_trans:
 			"reply generation failed\n");
 		/* The transaction must be explicitely released,
 		 * no more timer is running */
-		UNLOCK_REPLIES(t);
+		UNLOCK_ASYNC_CONTINUE(t);
 		t_release_transaction(t);
 	} else {
-		UNLOCK_REPLIES(t);
+		UNLOCK_ASYNC_CONTINUE(t);
 	}
 
 	t_unref(t->uas.request);

+ 3 - 0
modules/tm/t_suspend.h

@@ -29,6 +29,9 @@
 #ifndef _T_SUSPEND_H
 #define _T_SUSPEND_H
 
+#define LOCK_ASYNC_CONTINUE(_t) lock(&(_t)->async_mutex )
+#define UNLOCK_ASYNC_CONTINUE(_t) unlock(&(_t)->async_mutex )
+
 int t_suspend(struct sip_msg *msg,
 		unsigned int *hash_index, unsigned int *label);
 typedef int (*t_suspend_f)(struct sip_msg *msg,