Bläddra i källkod

Revert "modules/tm: removed no longer needed ASYNC mutex"

This reverts commit 7ca0e1e87a84f323bbfbaf1ea4985fbfecec00ed.
jaybeepee 10 år sedan
förälder
incheckning
cf03a43958
6 ändrade filer med 84 tillägg och 5 borttagningar
  1. 1 0
      modules/tm/h_table.c
  2. 11 3
      modules/tm/h_table.h
  3. 47 0
      modules/tm/lock.c
  4. 2 0
      modules/tm/lock.h
  5. 1 1
      modules/tm/t_reply.c
  6. 22 1
      modules/tm/t_suspend.c

+ 1 - 0
modules/tm/h_table.c

@@ -371,6 +371,7 @@ struct cell*  build_cell( struct sip_msg* p_msg )
 
 	init_synonym_id(p_msg, new_cell->md5);
 	init_cell_lock(  new_cell );
+	init_async_lock( new_cell );
 	t_stats_created();
 	return new_cell;
 

+ 11 - 3
modules/tm/h_table.h

@@ -296,9 +296,11 @@ typedef struct async_state {
 #	define T_PASS_PROVISIONAL_FLAG (1<<11)
 #	define pass_provisional(_t_)	((_t_)->flags&T_PASS_PROVISIONAL_FLAG)
 #endif
-#define T_DISABLE_INTERNAL_REPLY (1<<12) /* don't send internal negative reply */
-#define T_ADMIN_REPLY (1<<13) /* t reply sent by admin (e.g., from cfg script) */
-#define T_ASYNC_SUSPENDED (1<<14)
+#define T_ASYNC_CONTINUE (1<<12) /* Is this transaction in a continuation after being suspended */
+
+#define T_DISABLE_INTERNAL_REPLY (1<<13) /* don't send internal negative reply */
+#define T_ADMIN_REPLY (1<<14) /* t reply sent by admin (e.g., from cfg script) */
+#define T_ASYNC_SUSPENDED (1<<15)
 
 /* unsigned short should be enough for a retr. timer: max. 65535 ms =>
  * max retr. = 65 s which should be enough and saves us 2*2 bytes */
@@ -430,6 +432,12 @@ typedef struct cell
 	atomic_t reply_locker_pid;
 	/* recursive reply lock count */
 	int reply_rec_lock_level;
+
+#ifdef ENABLE_ASYNC_MUTEX
+	/* protect against concurrent async continues */
+	ser_lock_t   async_mutex;
+#endif
+
 	ticks_t fr_timeout;     /* final response interval for retr_bufs */
 	ticks_t fr_inv_timeout; /* final inv. response interval for retr_bufs */
 #ifdef TM_DIFF_RT_TIMEOUT

+ 47 - 0
modules/tm/lock.c

@@ -55,6 +55,9 @@
 static int sem_nr;
 gen_lock_set_t* entry_semaphore=0;
 gen_lock_set_t* reply_semaphore=0;
+#ifdef ENABLE_ASYNC_MUTEX
+gen_lock_set_t* async_semaphore=0;
+#endif
 #endif
 
 
@@ -84,6 +87,12 @@ again:
 			lock_set_destroy(reply_semaphore);
 			lock_set_dealloc(reply_semaphore);
 		}
+#ifdef ENABLE_ASYNC_MUTEX
+		if (async_semaphore!=0){
+			lock_set_destroy(async_semaphore);
+			lock_set_dealloc(async_semaphore);
+		}
+#endif
 		if (i==0){
 			LOG(L_CRIT, "lock_initialize: could not allocate semaphore"
 					" sets\n");
@@ -137,6 +146,21 @@ again:
 			i--;
 			goto again;
 	}
+#ifdef ENABLE_ASYNC_MUTEX
+	i++;
+	if (((async_semaphore=lock_set_alloc(i))==0)||
+		(lock_set_init(async_semaphore)==0)){
+			if (async_semaphore){
+				lock_set_dealloc(async_semaphore);
+				async_semaphore=0;
+			}
+			DBG("DEBUG:lock_initialize: async semaphore initialization"
+				" failure: %s\n", strerror(errno));
+			probe_run=1;
+			i--;
+			goto again;
+	}
+#endif
 
 	/* return success */
 	LOG(L_INFO, "INFO: semaphore arrays of size %d allocated\n", sem_nr );
@@ -176,6 +200,13 @@ void lock_cleanup()
 		lock_set_destroy(reply_semaphore);
 		lock_set_dealloc(reply_semaphore);
 	};
+#ifdef ENABLE_ASYNC_MUTEX
+	if (async_semaphore !=0) {
+		lock_set_destroy(async_semaphore);
+		lock_set_dealloc(async_semaphore);
+	}
+	async_semaphore = 0;
+#endif
 	entry_semaphore =  reply_semaphore = 0;
 
 }
@@ -212,6 +243,22 @@ int init_entry_lock( struct s_table* ht, struct entry *entry )
 	return 0;
 }
 
+int init_async_lock( struct cell *cell )
+{
+#ifdef ENABLE_ASYNC_MUTEX
+
+#ifdef GEN_LOCK_T_PREFERED
+	lock_init(&cell->async_mutex);
+#else
+	cell->async_mutex.semaphore_set=async_semaphore;
+	cell->async_mutex.semaphore_index = cell->hash_index % sem_nr;
+#endif /* GEN_LOCK_T_PREFERED */
+
+#endif /* ENABLE_ASYNC_MUTEX */
+
+	return 0;
+}
+
 int release_cell_lock( struct cell *cell )
 {
 #ifndef GEN_LOCK_T_PREFERED

+ 2 - 0
modules/tm/lock.h

@@ -60,6 +60,8 @@ void lock_cleanup(void);
 
 int init_cell_lock( struct cell *cell );
 int init_entry_lock( struct s_table* ht, struct entry *entry );
+int init_async_lock( struct cell *cell );
+
 
 int release_cell_lock( struct cell *cell );
 int release_entry_lock( struct entry *entry );

+ 1 - 1
modules/tm/t_reply.c

@@ -1144,7 +1144,7 @@ int t_pick_branch(int inc_branch, int inc_code, struct cell *t, int *res_code)
 		if ((!t->uac[b].request.buffer) && (t->uac[b].last_received>=200))
 			continue;
 		/* there is still an unfinished UAC transaction (we ignore unfinished blind UACs) wait now! */
-		if ( t->uac[b].last_received<200)
+		if ( t->uac[b].last_received<200 && !((t->flags&T_ASYNC_CONTINUE) && b==t->async_backup.blind_uac))
 			return -2;
 		/* if reply is null => t_send_branch "faked" reply, skip over it */
 		if ( rpl && 

+ 22 - 1
modules/tm/t_suspend.c

@@ -39,6 +39,14 @@
 #include "../../data_lump_rpl.h"
 
 
+#ifdef ENABLE_ASYNC_MUTEX
+#define LOCK_ASYNC_CONTINUE(_t) lock(&(_t)->async_mutex )
+#define UNLOCK_ASYNC_CONTINUE(_t) unlock(&(_t)->async_mutex )
+#else
+#define LOCK_ASYNC_CONTINUE(_t) LOCK_REPLIES(_t)
+#define UNLOCK_ASYNC_CONTINUE(_t) UNLOCK_REPLIES(_t)
+#endif
+
 /* Suspends the transaction for later use.
  * Save the returned hash_index and label to get
  * back to the SIP request processing, see the readme.
@@ -177,6 +185,13 @@ int t_continue(unsigned int hash_index, unsigned int label,
 		return 1;
 	}
 
+	/* The transaction has to be locked to protect it
+	 * form calling t_continue() multiple times simultaneously */
+	LOCK_ASYNC_CONTINUE(t);
+
+	t->flags |= T_ASYNC_CONTINUE;   /* we can now know anywhere in kamailio 
+					 * that we are executing post a suspend */
+	
 	/* which route block type were we in when we were suspended */
 	cb_type =  FAILURE_CB_TYPE;;
 	switch (t->async_backup.backup_route) {
@@ -204,6 +219,7 @@ int t_continue(unsigned int hash_index, unsigned int label,
 				/* Either t_continue() has already been
 				* called or the branch has already timed out.
 				* Needless to continue. */
+				UNLOCK_ASYNC_CONTINUE(t);
 				UNREF(t); /* t_unref would kill the transaction */
 				return 1;
 			}
@@ -373,6 +389,8 @@ int t_continue(unsigned int hash_index, unsigned int label,
 	}
 
 done:
+	UNLOCK_ASYNC_CONTINUE(t);
+
 	if(t->async_backup.backup_route != TM_ONREPLY_ROUTE){
 		/* unref the transaction */
 		t_unref(t->uas.request);
@@ -431,8 +449,11 @@ kill_trans:
 			"reply generation failed\n");
 		/* The transaction must be explicitely released,
 		 * no more timer is running */
+		UNLOCK_ASYNC_CONTINUE(t);
 		t_release_transaction(t);
-	} 
+	} else {
+		UNLOCK_ASYNC_CONTINUE(t);
+	}
 
 	if(t->async_backup.backup_route != TM_ONREPLY_ROUTE){
 		t_unref(t->uas.request);