Explorar o código

modules/tm: extended tm async support to SIP responses
-t_suspend and t_continue functions now work with SIP responses as well as requests

Richard Good %!s(int64=12) %!d(string=hai) anos
pai
achega
16e763c32d
Modificáronse 3 ficheiros con 359 adicións e 128 borrados
  1. 65 0
      modules/tm/t_reply.c
  2. 3 0
      modules/tm/t_reply.h
  3. 291 128
      modules/tm/t_suspend.c

+ 65 - 0
modules/tm/t_reply.c

@@ -806,6 +806,59 @@ static int _reply( struct cell *trans, struct sip_msg* p_msg,
 	}
 }
 
+int fake_resp(struct sip_msg *faked_resp,
+		struct sip_msg *shmem_msg, int extra_flags, struct ua_client *uac)
+{
+	/* on_failure_reply faked msg now copied from shmem msg (as opposed
+	* to zero-ing) -- more "read-only" actions (exec in particular) will
+	* work from reply_route as they will see msg->from, etc.; caution,
+	* rw actions may append some pkg stuff to msg, which will possibly be
+	* never released (shmem is released in a single block) */
+	memcpy( faked_resp, shmem_msg, sizeof(struct sip_msg));
+
+	/* if we set msg_id to something different from current's message
+	* id, the first t_fork will properly clean new branch URIs */
+	faked_resp->id=shmem_msg->id-1;
+	/* msg->parsed_uri_ok must be reset since msg_parsed_uri is
+	* not cloned (and cannot be cloned) */
+	faked_resp->parsed_uri_ok = 0;
+
+	faked_resp->msg_flags|=extra_flags; /* set the extra tm flags */
+
+	if(uac) setbflagsval(0, uac->branch_flags);
+	else setbflagsval(0, 0);
+
+	return 1;
+}
+
+void free_faked_resp(struct sip_msg *faked_resp, struct cell *t, int branch)
+{
+	struct hdr_field *hdr;
+
+	/* free all types of lump that were added */
+	del_nonshm_lump( &(faked_resp->add_rm) );
+	del_nonshm_lump( &(faked_resp->body_lumps) );
+	del_nonshm_lump_rpl( &(faked_resp->reply_lump) );
+
+	/* free header's parsed structures that were added */
+	for( hdr=faked_resp->headers ; hdr ; hdr=hdr->next ) {
+		if ( hdr->parsed && hdr_allocs_parse(hdr) &&
+			(hdr->parsed<(void*)t->uac[branch].reply)) {
+			DBG("DBG:free_faked_resp: removing hdr->parsed %d\n",
+				hdr->type);
+ 
+			clean_hdr_field(hdr);
+			hdr->parsed = 0;
+		}
+	}
+	/* free parsed body added by failure handlers */
+	if (faked_resp->body) {
+		if(faked_resp->body->free)
+			faked_resp->body->free(&faked_resp->body);
+			faked_resp->body = 0;
+	}
+}
+
 /** create or restore a "fake environment" for running a failure_route, 
  * OR an "async environment" depending on is_async_value (0=std failure-faked, 1=async)
  * if msg is set -> it will fake the env. vars conforming with the msg; if NULL
@@ -2466,6 +2519,11 @@ int reply_received( struct sip_msg  *p_msg )
 			}
 		}
 #endif
+        
+	if (unlikely(p_msg->flags&FL_RPL_SUSPENDED)) {
+		goto skip_send_reply;
+		/* suspend the reply (async), no error */
+	}
 	if (unlikely(!replies_locked)){
 		LOCK_REPLIES( t );
 		replies_locked=1;
@@ -2522,6 +2580,13 @@ int reply_received( struct sip_msg  *p_msg )
 		restart_rb_fr(& uac->request, t->fr_inv_timeout);
 		uac->request.flags|=F_RB_FR_INV; /* mark fr_inv */
 	} /* provisional replies */
+        
+skip_send_reply:
+
+	if (likely(replies_locked)){
+		UNLOCK_REPLIES(t); /* unlock replies  - this would be unlocked by send function*/
+		replies_locked=0;
+	}
 
 done:
 	tm_ctx_set_branch_index(T_BR_UNDEFINED);

+ 3 - 0
modules/tm/t_reply.h

@@ -238,8 +238,11 @@ void rpc_reply(rpc_t* rpc, void* c);
 void faked_env( struct cell *t,struct sip_msg *msg, int is_async_env);
 int fake_req(struct sip_msg *faked_req,
 		struct sip_msg *shmem_msg, int extra_flags, struct ua_client *uac);
+int fake_resp(struct sip_msg *faked_req,
+		struct sip_msg *shmem_msg, int extra_flags, struct ua_client *uac);
 
 void free_faked_req(struct sip_msg *faked_req, struct cell *t);
+void free_faked_resp(struct sip_msg *faked_req, struct cell *t, int branch);
 
 typedef int (*tget_picked_f)(void);
 int t_get_picked_branch(void);

+ 291 - 128
modules/tm/t_suspend.c

@@ -43,6 +43,8 @@
 #include "t_fwd.h"
 #include "t_funcs.h"
 #include "timer.h"
+#include "t_cancel.h"
+
 #include "t_suspend.h"
 
 /* Suspends the transaction for later use.
@@ -57,6 +59,7 @@ int t_suspend(struct sip_msg *msg,
 		unsigned int *hash_index, unsigned int *label)
 {
 	struct cell	*t;
+	int branch;
 
 	t = get_t();
 	if (!t || t == T_UNDEFINED) {
@@ -73,37 +76,64 @@ int t_suspend(struct sip_msg *msg,
 		return 1;
 	}
 
-	/* send a 100 Trying reply, because the INVITE processing
-	will probably take a long time */
-	if (msg->REQ_METHOD==METHOD_INVITE && (t->flags&T_AUTO_INV_100)
-		&& (t->uas.status < 100)
-	) {
-		if (!t_reply( t, msg , 100 ,
-			cfg_get(tm, tm_cfg, tm_auto_inv_100_r)))
+	if (msg->first_line.type != SIP_REPLY) {
+		/* send a 100 Trying reply, because the INVITE processing
+		will probably take a long time */
+		if (msg->REQ_METHOD==METHOD_INVITE && (t->flags&T_AUTO_INV_100)
+			&& (t->uas.status < 100)
+		) {
+			if (!t_reply( t, msg , 100 ,
+				cfg_get(tm, tm_cfg, tm_auto_inv_100_r)))
 				DBG("SER: ERROR: t_suspend (100)\n");
-	}
+		}
 
-	if ((t->nr_of_outgoings==0) && /* if there had already been
-				an UAC created, then the lumps were
-				saved as well */
-		save_msg_lumps(t->uas.request, msg)
-	) {
-		LOG(L_ERR, "ERROR: t_suspend: " \
-			"failed to save the message lumps\n");
-		return -1;
+		if ((t->nr_of_outgoings==0) && /* if there had already been
+			an UAC created, then the lumps were
+			saved as well */
+			save_msg_lumps(t->uas.request, msg)
+		) {
+			LOG(L_ERR, "ERROR: t_suspend: " \
+				"failed to save the message lumps\n");
+			return -1;
+		}
+		/* save the message flags */
+		t->uas.request->flags = msg->flags;
+
+		/* add a blind UAC to let the fr timer running */
+		if (add_blind_uac() < 0) {
+			LOG(L_ERR, "ERROR: t_suspend: " \
+				"failed to add the blind UAC\n");
+			return -1;
+		}
+	}else{
+		LOG(L_DBG,"DEBUG: t_suspend_reply: This is a suspend on reply - setting msg flag to SUSPEND\n");
+		msg->flags |= FL_RPL_SUSPENDED;
+		/* this is a reply suspend find which branch */
+
+		if (t_check( msg  , &branch )==-1){
+			LOG(L_ERR, "ERROR: t_suspend_reply: " \
+				"failed find UAC branch\n");
+			return -1; 
+		}
+		LOG(L_DBG,"DEBUG: t_suspend_reply:Found a a match with branch id [%d]\n", branch);
+
+		LOG(L_DBG,"DEBUG: t_suspend_reply:Cloning reply message to t->uac[branch].reply\n");
+
+		t->uac[branch].reply = sip_msg_cloner( msg, 0 );
+
+		if (! t->uac[branch].reply ) {
+			LOG(L_ERR, "ERROR: t_suspend_reply: can't alloc' clone memory\n");
+			return -1;
+		}
+
+		LOG(L_DBG,"DEBUG: t_suspend_reply: Saving transaction data\n");
+		t->uac[branch].reply->flags = msg->flags;
 	}
-	/* save the message flags */
-	t->uas.request->flags = msg->flags;
 
-	*hash_index = t->hash_index;
+        *hash_index = t->hash_index;
 	*label = t->label;
 
-	/* add a bling UAC to let the fr timer running */
-	if (add_blind_uac() < 0) {
-		LOG(L_ERR, "ERROR: t_suspend: " \
-			"failed to add the blind UAC\n");
-		return -1;
-	}
+
 
 	/* backup some extra info that can be used in continuation logic */
 	t->async_backup.backup_route = get_route_type();
@@ -126,7 +156,8 @@ int t_continue(unsigned int hash_index, unsigned int label,
 		struct action *route)
 {
 	struct cell	*t;
-	struct sip_msg	faked_req;
+	struct sip_msg	faked_req, faked_resp;
+	struct cancel_info cancel_data;
 	int	branch;
 	struct ua_client *uac =NULL;
 	int	ret;
@@ -148,101 +179,217 @@ int t_continue(unsigned int hash_index, unsigned int label,
 	/* The transaction has to be locked to protect it
 	 * form calling t_continue() multiple times simultaneously */
 	LOCK_ASYNC_CONTINUE(t);
+
 	t->flags |= T_ASYNC_CONTINUE;   /* we can now know anywhere in kamailio 
 					 * that we are executing post a suspend */
 	
 	/* which route block type were we in when we were suspended */
 	int cb_type = REQUEST_CB_TYPE;
-        switch (t->async_backup.backup_route) {
-            case REQUEST_ROUTE:
-                cb_type = REQUEST_CB_TYPE;
-                break;
-            case FAILURE_ROUTE:
-                cb_type = FAILURE_CB_TYPE;
-                break;
-            case TM_ONREPLY_ROUTE:
-                 cb_type = ONREPLY_CB_TYPE;
-                break;
-            case BRANCH_ROUTE:
-                cb_type = BRANCH_CB_TYPE;
-                break;
-        }
-	
-	branch = t->async_backup.blind_uac;	/* get the branch of the blind UAC setup 
-						 * during suspend */
-	if (branch >= 0) {
-		stop_rb_timers(&t->uac[branch].request);
-
-		if (t->uac[branch].last_received != 0) {
-			/* Either t_continue() has already been
-			 * called or the branch has already timed out.
-			 * Needless to continue. */
-			UNLOCK_ASYNC_CONTINUE(t);
-			UNREF(t); /* t_unref would kill the transaction */
-			return 1;
+	switch (t->async_backup.backup_route) {
+		case REQUEST_ROUTE:
+			cb_type = REQUEST_CB_TYPE;
+			break;
+		case FAILURE_ROUTE:
+			cb_type = FAILURE_CB_TYPE;
+			break;
+		case TM_ONREPLY_ROUTE:
+			cb_type = ONREPLY_CB_TYPE;
+			break;
+		case BRANCH_ROUTE:
+			cb_type = BRANCH_CB_TYPE;
+			break;
+	}
+
+	if(t->async_backup.backup_route != TM_ONREPLY_ROUTE){
+		branch = t->async_backup.blind_uac;	/* get the branch of the blind UAC setup 
+			* during suspend */
+		if (branch >= 0) {
+			stop_rb_timers(&t->uac[branch].request);
+ 
+			if (t->uac[branch].last_received != 0) {
+				/* Either t_continue() has already been
+				* called or the branch has already timed out.
+				* Needless to continue. */
+				UNLOCK_ASYNC_CONTINUE(t);
+				UNREF(t); /* t_unref would kill the transaction */
+				return 1;
+			}
+
+			/*we really don't need this next line anymore otherwise we will 
+			never be able to forward replies after a (t_relay) on this branch.
+			We want to try and treat this branch as 'normal' (as if it were a normal req, not async)' */
+			//t->uac[branch].last_received=500;
+			uac = &t->uac[branch];
 		}
+		/* else
+			Not a huge problem, fr timer will fire, but CANCEL
+			will not be sent. last_received will be set to 408. */
 
-		/*we really don't need this next line anymore otherwise we will 
-		  never be able to forward replies after a (t_relay) on this branch.
-		  We want to try and treat this branch as 'normal' (as if it were a normal req, not async)' */
-		//t->uac[branch].last_received=500;
-		uac = &t->uac[branch];
-	}
-	/* else
-		Not a huge problem, fr timer will fire, but CANCEL
-		will not be sent. last_received will be set to 408. */
+		reset_kr();
 
-	reset_kr();
+		/* fake the request and the environment, like in failure_route */
+		if (!fake_req(&faked_req, t->uas.request, 0 /* extra flags */, uac)) {
+			LOG(L_ERR, "ERROR: t_continue: fake_req failed\n");
+			ret = -1;
+			goto kill_trans;
+		}
+		faked_env( t, &faked_req, 1);
 
-	/* fake the request and the environment, like in failure_route */
-	if (!fake_req(&faked_req, t->uas.request, 0 /* extra flags */, uac)) {
-		LOG(L_ERR, "ERROR: t_continue: fake_req failed\n");
-		ret = -1;
-		goto kill_trans;
-	}
-	faked_env( t, &faked_req, 1);
+		/* execute the pre/post -script callbacks based on original route block */
+		if (exec_pre_script_cb(&faked_req, cb_type)>0) {
+			if (run_top_route(route, &faked_req, 0)<0)
+				LOG(L_ERR, "ERROR: t_continue: Error in run_top_route\n");
+			exec_post_script_cb(&faked_req, cb_type);
+		}
 
-	/* execute the pre/post -script callbacks based on original route block */
-	if (exec_pre_script_cb(&faked_req, cb_type)>0) {
-		if (run_top_route(route, &faked_req, 0)<0)
-			LOG(L_ERR, "ERROR: t_continue: Error in run_top_route\n");
-		exec_post_script_cb(&faked_req, cb_type);
-	}
+		/* TODO: save_msg_lumps should clone the lumps to shm mem */
 
-	/* TODO: save_msg_lumps should clone the lumps to shm mem */
+		/* restore original environment and free the fake msg */
+		faked_env( t, 0, 1);
+		free_faked_req(&faked_req, t);
 
-	/* restore original environment and free the fake msg */
-	faked_env( t, 0, 1);
-	free_faked_req(&faked_req, t);
+		/* update the flags */
+		t->uas.request->flags = faked_req.flags;
 
-	/* update the flags */
-	t->uas.request->flags = faked_req.flags;
+		if (t->uas.status < 200) {
+			/* No final reply has been sent yet.
+			* Check whether or not there is any pending branch.
+			*/
+			for (	branch = 0;
+				branch < t->nr_of_outgoings;
+				branch++
+			) {
+				if (t->uac[branch].last_received < 200)
+					break;
+			}
 
-	if (t->uas.status < 200) {
-		/* No final reply has been sent yet.
-		 * Check whether or not there is any pending branch.
-		 */
-		for (	branch = 0;
-			branch < t->nr_of_outgoings;
-			branch++
-		) {
-			if (t->uac[branch].last_received < 200)
-				break;
+			if (branch == t->nr_of_outgoings) {
+			/* There is not any open branch so there is
+			* no chance that a final response will be received. */
+				ret = 0;
+				goto kill_trans;
+			}
 		}
 
-		if (branch == t->nr_of_outgoings) {
-			/* There is not any open branch so there is
-			 * no chance that a final response will be received. */
-			ret = 0;
+	}else{
+		branch = t->async_backup.backup_branch;
+
+		init_cancel_info(&cancel_data);
+
+		LOG(L_DBG,"DEBUG: t_continue_reply: This a continue from a reply suspend\n");
+		/* this is a continue from a reply suspend */
+
+		LOG(L_DBG,"DEBUG: t_continue_reply: Disabling suspend branch");
+		t->uac[branch].reply->flags &= ~FL_RPL_SUSPENDED;
+		if (t->uas.request) t->uas.request->flags&= ~FL_RPL_SUSPENDED;
+
+		LOG(L_DBG,"DEBUG: t_continue_reply: Setting up faked environment");
+		if (!fake_resp(&faked_resp, t->uac[branch].reply, 0 /* extra flags */, 0)) {
+			LOG(L_ERR, "ERROR: t_continue_reply: fake_resp failed\n");
+			ret = -1;
 			goto kill_trans;
 		}
+
+		faked_env( t, &faked_resp, 1);
+
+		LOG(L_DBG,"DEBUG: Running pre script\n");
+		if (exec_pre_script_cb(&faked_resp, cb_type)>0) {
+			if (run_top_route(route, &faked_resp, 0)<0){
+				LOG(L_ERR, "ERROR: t_continue_reply: Error in run_top_route\n");
+			}
+			LOG(L_DBG,"DEBUG: t_continue_reply: Running exec post script\n");
+			exec_post_script_cb(&faked_resp, cb_type);
+		}
+
+		LOG(L_DBG,"DEBUG: t_continue_reply: Restoring previous environment");
+		faked_env( t, 0, 1);
+		free_faked_resp(&faked_resp, t, branch);
+
+		int reply_status;
+
+		/*lock transaction replies - will be unlocked when reply is relayed*/
+		LOCK_REPLIES( t );
+		if ( is_local(t) ) {
+			LOG(L_DBG,"DEBUG: t_continue_reply: t is local sending local reply with status code: [%d]\n", t->uac[branch].reply->first_line.u.reply.statuscode);
+			reply_status = local_reply( t, t->uac[branch].reply, branch, t->uac[branch].reply->first_line.u.reply.statuscode, &cancel_data );
+			if (reply_status == RPS_COMPLETED) {
+				/* no more UAC FR/RETR (if I received a 2xx, there may
+				* be still pending branches ...
+				*/
+				cleanup_uac_timers( t );
+				if (is_invite(t)) cancel_uacs(t, &cancel_data, F_CANCEL_B_KILL);
+				/* There is no need to call set_final_timer because we know
+				* that the transaction is local */
+				put_on_wait(t);
+			}else if (unlikely(cancel_data.cancel_bitmap)){
+				/* cancel everything, even non-INVITEs (e.g in case of 6xx), use
+				* cancel_b_method for canceling unreplied branches */
+				cancel_uacs(t, &cancel_data, cfg_get(tm,tm_cfg, cancel_b_flags));
+			}
+
+		} else {
+			LOG(L_DBG,"DEBUG: t_continue_reply: t is NOT local sending relaying reply with status code: [%d]\n", t->uac[branch].reply->first_line.u.reply.statuscode);
+			int do_put_on_wait = 0;
+			if(t->uac[branch].reply->first_line.u.reply.statuscode>=200){
+				do_put_on_wait = 1;
+			}
+			reply_status=relay_reply( t, t->uac[branch].reply, branch, t->uac[branch].reply->first_line.u.reply.statuscode, &cancel_data, do_put_on_wait );
+			if (reply_status == RPS_COMPLETED) {
+				/* no more UAC FR/RETR (if I received a 2xx, there may
+				be still pending branches ...
+				*/
+				cleanup_uac_timers( t );
+				/* 2xx is a special case: we can have a COMPLETED request
+				* with branches still open => we have to cancel them */
+				if (is_invite(t) && cancel_data.cancel_bitmap) 
+					cancel_uacs( t, &cancel_data,  F_CANCEL_B_KILL);
+				/* FR for negative INVITES, WAIT anything else */
+				/* Call to set_final_timer is embedded in relay_reply to avoid
+				* race conditions when reply is sent out and an ACK to stop
+				* retransmissions comes before retransmission timer is set.*/
+			}else if (unlikely(cancel_data.cancel_bitmap)){
+				/* cancel everything, even non-INVITEs (e.g in case of 6xx), use
+				* cancel_b_method for canceling unreplied branches */
+				cancel_uacs(t, &cancel_data, cfg_get(tm,tm_cfg, cancel_b_flags));
+			}
+
+		}
+		t->uac[branch].request.flags|=F_RB_REPLIED;
+
+		if (reply_status==RPS_ERROR){
+			goto done;
+		}
+
+		/* update FR/RETR timers on provisional replies */
+
+		int msg_status=t->uac[branch].reply->REPLY_STATUS;
+		int last_uac_status=t->uac[branch].last_received;
+
+		if (is_invite(t) && msg_status<200 &&
+			( cfg_get(tm, tm_cfg, restart_fr_on_each_reply) ||
+			( (last_uac_status<msg_status) &&
+			((msg_status>=180) || (last_uac_status==0)) )
+		) ) { /* provisional now */
+			restart_rb_fr(& t->uac[branch].request, t->fr_inv_timeout);
+			t->uac[branch].request.flags|=F_RB_FR_INV; /* mark fr_inv */
+		}
+            
 	}
 
+done:
 	UNLOCK_ASYNC_CONTINUE(t);
 
-	/* unref the transaction */
-	t_unref(t->uas.request);
-
+	if(t->async_backup.backup_route != TM_ONREPLY_ROUTE){
+		/* unref the transaction */
+		t_unref(t->uas.request);
+	}else{
+		tm_ctx_set_branch_index(T_BR_UNDEFINED);        
+		/* unref the transaction */
+		t_unref(t->uac[branch].reply);
+		LOG(L_DBG,"DEBUG: t_continue_reply: Freeing earlier cloned reply\n");
+		sip_msg_free(t->uac[branch].reply);
+		t->uac[branch].reply = 0;
+	}
 	return 0;
 
 kill_trans:
@@ -261,7 +408,12 @@ kill_trans:
 		UNLOCK_ASYNC_CONTINUE(t);
 	}
 
-	t_unref(t->uas.request);
+	if(t->async_backup.backup_route != TM_ONREPLY_ROUTE){
+		t_unref(t->uas.request);
+	}else{
+		/* unref the transaction */
+		t_unref(t->uac[branch].reply);
+	}
 	return ret;
 }
 
@@ -295,34 +447,45 @@ int t_cancel_suspend(unsigned int hash_index, unsigned int label)
 			"transaction id mismatch\n");
 		return -1;
 	}
-	/* The transaction does not need to be locked because this
-	 * function is either executed from the original route block
-	 * or from failure route which already locks */
-
-	reset_kr(); /* the blind UAC of t_suspend has set kr */
-
-	/* Try to find the blind UAC, and cancel its fr timer.
-	 * We assume that the last blind uac called this function. */
-	for (	branch = t->nr_of_outgoings-1;
-		branch >= 0 && t->uac[branch].request.buffer;
-		branch--);
-
-	if (branch >= 0) {
-		stop_rb_timers(&t->uac[branch].request);
-		/* Set last_received to something >= 200,
-		 * the actual value does not matter, the branch
-		 * will never be picked up for response forwarding.
-		 * If last_received is lower than 200,
-		 * then the branch may tried to be cancelled later,
-		 * for example when t_reply() is called from
-		 * a failure rute => deadlock, because both
-		 * of them need the reply lock to be held. */
-		t->uac[branch].last_received=500;
-	} else {
-		/* Not a huge problem, fr timer will fire, but CANCEL
-		will not be sent. last_received will be set to 408. */
-		return -1;
-	}
+        
+	if(t->async_backup.backup_route != TM_ONREPLY_ROUTE){
+		/* The transaction does not need to be locked because this
+		* function is either executed from the original route block
+		* or from failure route which already locks */
+
+		reset_kr(); /* the blind UAC of t_suspend has set kr */
+
+		/* Try to find the blind UAC, and cancel its fr timer.
+		* We assume that the last blind uac called this function. */
+		for (	branch = t->nr_of_outgoings-1;
+			branch >= 0 && t->uac[branch].request.buffer;
+			branch--);
+
+		if (branch >= 0) {
+			stop_rb_timers(&t->uac[branch].request);
+			/* Set last_received to something >= 200,
+			* the actual value does not matter, the branch
+			* will never be picked up for response forwarding.
+			* If last_received is lower than 200,
+			* then the branch may tried to be cancelled later,
+			* for example when t_reply() is called from
+			* a failure rute => deadlock, because both
+			* of them need the reply lock to be held. */
+			t->uac[branch].last_received=500;
+		} else {
+			/* Not a huge problem, fr timer will fire, but CANCEL
+			will not be sent. last_received will be set to 408. */
+			return -1;
+		}
+	}else{
+		branch = t->async_backup.backup_branch;
+
+		LOG(L_DBG,"DEBUG: t_cancel_suspend_reply: This is a cancel suspend for a response\n");
 
+		t->uac[branch].reply->flags &= ~FL_RPL_SUSPENDED;
+		if (t->uas.request) t->uas.request->flags&= ~FL_RPL_SUSPENDED;
+        }
+	
 	return 0;
 }
+