Просмотр исходного кода

tm: restore to the disabled async mutex by define

- following commits trying to remove it completely were removing the
  internal transaction flags which were not related to mutex at all
Daniel-Constantin Mierla 10 лет назад
Родитель
Сommit
408778069d
6 измененных файлов с 73 добавлено и 9 удалено
  1. 1 0
      modules/tm/h_table.c
  2. 5 3
      modules/tm/h_table.h
  3. 49 0
      modules/tm/lock.c
  4. 1 0
      modules/tm/lock.h
  5. 1 1
      modules/tm/t_reply.c
  6. 16 5
      modules/tm/t_suspend.c

+ 1 - 0
modules/tm/h_table.c

@@ -371,6 +371,7 @@ struct cell*  build_cell( struct sip_msg* p_msg )
 
 	init_synonym_id(p_msg, new_cell->md5);
 	init_cell_lock(  new_cell );
+	init_async_lock( new_cell );
 	t_stats_created();
 	return new_cell;
 

+ 5 - 3
modules/tm/h_table.h

@@ -296,9 +296,11 @@ typedef struct async_state {
 #	define T_PASS_PROVISIONAL_FLAG (1<<11)
 #	define pass_provisional(_t_)	((_t_)->flags&T_PASS_PROVISIONAL_FLAG)
 #endif
-#define T_DISABLE_INTERNAL_REPLY (1<<12) /* don't send internal negative reply */
-#define T_ADMIN_REPLY (1<<13) /* t reply sent by admin (e.g., from cfg script) */
-#define T_ASYNC_SUSPENDED (1<<14)
+#define T_ASYNC_CONTINUE (1<<12) /* Is this transaction in a continuation after being suspended */
+
+#define T_DISABLE_INTERNAL_REPLY (1<<13) /* don't send internal negative reply */
+#define T_ADMIN_REPLY (1<<14) /* t reply sent by admin (e.g., from cfg script) */
+#define T_ASYNC_SUSPENDED (1<<15)
 
 /* unsigned short should be enough for a retr. timer: max. 65535 ms =>
  * max retr. = 65 s which should be enough and saves us 2*2 bytes */

+ 49 - 0
modules/tm/lock.c

@@ -55,6 +55,9 @@
 static int sem_nr;
 gen_lock_set_t* entry_semaphore=0;
 gen_lock_set_t* reply_semaphore=0;
+#ifdef ENABLE_ASYNC_MUTEX
+gen_lock_set_t* async_semaphore=0;
+#endif
 #endif
 
 
@@ -84,6 +87,12 @@ again:
 			lock_set_destroy(reply_semaphore);
 			lock_set_dealloc(reply_semaphore);
 		}
+#ifdef ENABLE_ASYNC_MUTEX
+		if (async_semaphore!=0){
+			lock_set_destroy(async_semaphore);
+			lock_set_dealloc(async_semaphore);
+		}
+#endif
 		if (i==0){
 			LOG(L_CRIT, "lock_initialize: could not allocate semaphore"
 					" sets\n");
@@ -137,6 +146,22 @@ again:
 			i--;
 			goto again;
 	}
+#ifdef ENABLE_ASYNC_MUTEX
+	i++;
+	if (((async_semaphore=lock_set_alloc(i))==0)||
+		(lock_set_init(async_semaphore)==0)){
+			if (async_semaphore){
+				lock_set_dealloc(async_semaphore);
+				async_semaphore=0;
+			}
+			DBG("DEBUG:lock_initialize: async semaphore initialization"
+				" failure: %s\n", strerror(errno));
+			probe_run=1;
+			i--;
+			goto again;
+	}
+#endif
+
 	/* return success */
 	LOG(L_INFO, "INFO: semaphore arrays of size %d allocated\n", sem_nr );
 #endif /* GEN_LOCK_T_PREFERED*/
@@ -175,7 +200,15 @@ void lock_cleanup()
 		lock_set_destroy(reply_semaphore);
 		lock_set_dealloc(reply_semaphore);
 	};
+#ifdef ENABLE_ASYNC_MUTEX
+	if (async_semaphore !=0) {
+		lock_set_destroy(async_semaphore);
+		lock_set_dealloc(async_semaphore);
+	}
+	async_semaphore = 0;
+#endif
 	entry_semaphore =  reply_semaphore = 0;
+
 }
 #endif /*GEN_LOCK_T_PREFERED*/
 
@@ -210,6 +243,22 @@ int init_entry_lock( struct s_table* ht, struct entry *entry )
 	return 0;
 }
 
+int init_async_lock( struct cell *cell )
+{
+#ifdef ENABLE_ASYNC_MUTEX
+
+#ifdef GEN_LOCK_T_PREFERED
+	lock_init(&cell->async_mutex);
+#else
+	cell->async_mutex.semaphore_set=async_semaphore;
+	cell->async_mutex.semaphore_index = cell->hash_index % sem_nr;
+#endif /* GEN_LOCK_T_PREFERED */
+
+#endif /* ENABLE_ASYNC_MUTEX */
+
+	return 0;
+}
+
 int release_cell_lock( struct cell *cell )
 {
 #ifndef GEN_LOCK_T_PREFERED

+ 1 - 0
modules/tm/lock.h

@@ -60,6 +60,7 @@ void lock_cleanup(void);
 
 int init_cell_lock( struct cell *cell );
 int init_entry_lock( struct s_table* ht, struct entry *entry );
+int init_async_lock( struct cell *cell );
 
 
 int release_cell_lock( struct cell *cell );

+ 1 - 1
modules/tm/t_reply.c

@@ -1144,7 +1144,7 @@ int t_pick_branch(int inc_branch, int inc_code, struct cell *t, int *res_code)
 		if ((!t->uac[b].request.buffer) && (t->uac[b].last_received>=200))
 			continue;
 		/* there is still an unfinished UAC transaction (we ignore unfinished blind UACs) wait now! */
-		if (t->uac[b].last_received<200)
+		if ( t->uac[b].last_received<200 && !((t->flags&T_ASYNC_CONTINUE) && b==t->async_backup.blind_uac))
 			return -2;
 		/* if reply is null => t_send_branch "faked" reply, skip over it */
 		if ( rpl && 

+ 16 - 5
modules/tm/t_suspend.c

@@ -39,6 +39,14 @@
 #include "../../data_lump_rpl.h"
 
 
+#ifdef ENABLE_ASYNC_MUTEX
+#define LOCK_ASYNC_CONTINUE(_t) lock(&(_t)->async_mutex )
+#define UNLOCK_ASYNC_CONTINUE(_t) unlock(&(_t)->async_mutex )
+#else
+#define LOCK_ASYNC_CONTINUE(_t) LOCK_REPLIES(_t)
+#define UNLOCK_ASYNC_CONTINUE(_t) UNLOCK_REPLIES(_t)
+#endif
+
 /* Suspends the transaction for later use.
  * Save the returned hash_index and label to get
  * back to the SIP request processing, see the readme.
@@ -179,7 +187,10 @@ int t_continue(unsigned int hash_index, unsigned int label,
 
 	/* The transaction has to be locked to protect it
 	 * form calling t_continue() multiple times simultaneously */
-	LOCK_REPLIES(t);
+	LOCK_ASYNC_CONTINUE(t);
+
+	t->flags |= T_ASYNC_CONTINUE;   /* we can now know anywhere in kamailio
+					 * that we are executing post a suspend */
 
 	/* which route block type were we in when we were suspended */
 	cb_type =  FAILURE_CB_TYPE;;
@@ -208,7 +219,7 @@ int t_continue(unsigned int hash_index, unsigned int label,
 				/* Either t_continue() has already been
 				* called or the branch has already timed out.
 				* Needless to continue. */
-				UNLOCK_REPLIES(t);
+				UNLOCK_ASYNC_CONTINUE(t);
 				UNREF(t); /* t_unref would kill the transaction */
 				return 1;
 			}
@@ -378,7 +389,7 @@ int t_continue(unsigned int hash_index, unsigned int label,
 	}
 
 done:
-	UNLOCK_REPLIES(t);
+	UNLOCK_ASYNC_CONTINUE(t);
 
 	if(t->async_backup.backup_route != TM_ONREPLY_ROUTE){
 		/* unref the transaction */
@@ -438,10 +449,10 @@ kill_trans:
 			"reply generation failed\n");
 		/* The transaction must be explicitely released,
 		 * no more timer is running */
-		UNLOCK_REPLIES(t);
+		UNLOCK_ASYNC_CONTINUE(t);
 		t_release_transaction(t);
 	} else {
-		UNLOCK_REPLIES(t);
+		UNLOCK_ASYNC_CONTINUE(t);
 	}
 
 	if(t->async_backup.backup_route != TM_ONREPLY_ROUTE){