Bladeren bron

ref_count turned into ref_bitmap; oncancel module handler added

Jiri Kuthan 24 jaren geleden
bovenliggende
commit
4bd1673d3d
13 gewijzigde bestanden met toevoegingen van 166 en 80 verwijderingen
  1. 8 0
      action.c
  2. 10 3
      config.h
  3. 5 1
      globals.h
  4. 12 2
      main.c
  5. 7 9
      modules/tm/h_table.c
  6. 33 44
      modules/tm/h_table.h
  7. 16 11
      modules/tm/t_funcs.c
  8. 46 4
      modules/tm/t_funcs.h
  9. 8 4
      modules/tm/t_lookup.c
  10. 4 1
      modules/tm/tm.c
  11. 2 0
      sr_module.h
  12. 4 1
      test/xx.cfg
  13. 11 0
      types.h

+ 8 - 0
action.c

@@ -385,6 +385,7 @@ int run_actions(struct action* a, struct sip_msg* msg)
 	struct action* t;
 	int ret;
 	static int rec_lev=0;
+	struct sr_module *mod;
 
 	rec_lev++;
 	if (rec_lev>ROUTE_MAX_REC_LEV){
@@ -408,6 +409,13 @@ int run_actions(struct action* a, struct sip_msg* msg)
 	}
 	
 	rec_lev--;
+	/* process module onbreak handlers if present */
+	if (rec_lev==0 && ret==0) 
+		for (mod=modules;mod;mod=mod->next) 
+			if (mod->exports && mod->exports->onbreak_f) {
+				mod->exports->onbreak_f( msg );
+				DBG("DEBUG: %s onbreak handler called\n", mod->exports->name);
+			}
 	return ret;
 	
 

+ 10 - 3
config.h

@@ -1,5 +1,5 @@
 /*
- *  $Id $
+ *  $Id$
  */
 
 
@@ -7,6 +7,8 @@
 #ifndef config_h
 #define config_h
 
+#include "types.h"
+
 /* default sip port if none specified */
 #define SIP_PORT 5060
 
@@ -62,12 +64,17 @@
 #define MAX_BUCKET		15
 
 /* receive buffer size -- preferably set low to
-   avoid terror of excessively huge messages
+   avoid terror of excessively huge messages; they are
+   useless anyway
 */
 #define BUF_SIZE (MAX_FIXED_BLOCK-32)
 
-/* forwarding */
+/* forwarding  -- Via buffer dimensioning */
 #define MAX_VIA_LINE_SIZE	240
 #define MAX_RECEIVED_SIZE	57
 
+/* maximum number of processes is constrained by capacity of
+   process bitmaps */
+#define MAX_PROCESSES (sizeof( process_bm_t) * 8 )
+
 #endif

+ 5 - 1
globals.h

@@ -1,5 +1,5 @@
 /*
- * $Id*
+ * $Id$
  *
  * global variables
  *
@@ -9,6 +9,8 @@
 #ifndef globals_h
 #define globals_h
 
+#include "types.h"
+
 #define NO_DNS     0
 #define DO_DNS     1
 #define DO_REV_DNS 2
@@ -32,6 +34,8 @@ extern int check_via;
 extern int received_dns;
 extern int loop_checks;
 extern int process_no;
+
+extern process_bm_t process_bit;
 extern int *pids;
 
 extern int cfg_errors;

+ 12 - 2
main.c

@@ -118,8 +118,8 @@ Options:\n\
 /* print compile-time constants */
 void print_ct_constants()
 {
-	printf("MAX_RECV_BUFFER_SIZE %d, MAX_LISTEN %d, MAX_URI_SIZE %d\n",
-		MAX_RECV_BUFFER_SIZE, MAX_LISTEN, MAX_URI_SIZE );
+	printf("MAX_RECV_BUFFER_SIZE %d, MAX_LISTEN %d, MAX_URI_SIZE %d, MAX_PROCESSES %d\n",
+		MAX_RECV_BUFFER_SIZE, MAX_LISTEN, MAX_URI_SIZE, MAX_PROCESSES );
 }
 
 /* debuging function */
@@ -166,6 +166,7 @@ int addresses_no=0;                    /* number of names/ips */
 
 /* ipc related globals */
 int process_no = 0;
+process_bm_t process_bit = 0;
 #ifdef ROUTE_SRV
 #endif
 
@@ -261,6 +262,7 @@ int main_loop()
 				if (pid==0){
 					/* child */
 					/* timer!*/
+					process_bit = 0;
 					for(;;){
 						sleep(TIMER_TICK);
 						timer_ticker();
@@ -272,6 +274,7 @@ int main_loop()
 		/* main process, receive loop */
 		is_main=1;
 		pids[0]=getpid();
+		process_bit = 1;
 		process_no=0; /*main process number*/
 		udp_rcv_loop();
 	}else{
@@ -286,6 +289,7 @@ int main_loop()
 				if (pid==0){
 					/* child */
 					process_no=i+1; /*0=main*/
+					process_bit = 1 << i;
 #ifdef STATS
 					setstats( i );
 #endif
@@ -299,6 +303,7 @@ int main_loop()
 	}
 	/*this is the main process*/
 	pids[process_no]=getpid();
+	process_bit = 0;
 	is_main=1;
 	if (timer_list){
 		for(;;){
@@ -569,6 +574,11 @@ int main(int argc, char** argv)
 
 	
 	if (children_no<=0) children_no=CHILD_NO;
+	else if (children_no >= MAX_PROCESSES ) {
+		fprintf(stderr, "ERROR: too many children processes configured; maximum is %d\n",
+			MAX_PROCESSES-1 );
+		goto error;
+	}
 	/*alloc pids*/
 #ifdef SHM_MEM
 	pids=shm_malloc(sizeof(int)*children_no);

+ 7 - 9
modules/tm/h_table.c

@@ -31,17 +31,15 @@ void free_cell( struct cell* dead_cell )
 		/* outbound requests*/
 		DBG("DEBUG: free_cell: outbound_request[%d] %p\n",i,dead_cell->outbound_request[i]);
 		if ( rb=dead_cell->outbound_request[i] )
-      		{
-/*
-			if (rb->retr_buffer) shm_free( rb->retr_buffer );
+   		{
+			if (rb->retr_buffer) shm_free_unsafe( rb->retr_buffer );
 	 		dead_cell->outbound_request[i] = NULL;
-*/
          		shm_free_unsafe( rb );
-      		}
-      		/* outbound requests*/
-      		DBG("DEBUG: free_cell: inbound_response[%d] %p\n",i,dead_cell->inbound_response[i]);
-      		if ( dead_cell -> inbound_response[i] )
-         		sip_msg_free_unsafe( dead_cell->inbound_response[i] );
+   		}
+   		/* outbound requests*/
+   		DBG("DEBUG: free_cell: inbound_response[%d] %p\n",i,dead_cell->inbound_response[i]);
+   		if ( dead_cell -> inbound_response[i] )
+       		sip_msg_free_unsafe( dead_cell->inbound_response[i] );
    	}
    	/* mutex */
    	/* release_cell_lock( dead_cell ); */

+ 33 - 44
modules/tm/h_table.h

@@ -12,6 +12,7 @@
 #include <arpa/inet.h>
 
 #include "../../msg_parser.h"
+#include "../../types.h"
 #include "config.h"
 
 struct s_table;
@@ -37,18 +38,11 @@ typedef struct retrans_buff
    int                  bufflen;
 
    struct sockaddr_in to;
-   /* changed in favour of Solaris to size_t
-   socklen_t tolen;
-   */
    size_t tolen;
 
    /* a message can be linked just to retransmission and FR list */
    struct timer_link retr_timer;
    struct timer_link fr_timer;
-/*
-   unsigned int timeout_ceiling;
-   unsigned int timeout_value;
-*/
 
    /*the cell that containes this retrans_buff*/
    struct cell* my_T;
@@ -62,43 +56,38 @@ typedef struct retrans_buff
 
 typedef struct cell
 {
-   /* linking data */
-   struct cell*     next_cell;
-   struct cell*     prev_cell;
-
-   /*sync data */
-   /*
-	/* we use hash table mutexes now */
-   /* ser_lock_t   mutex; */
-   int       ref_counter;
-
-   /* cell payload data */
-   /* tells in which hash table entry the cell lives */
-   unsigned int  hash_index;
-   /* sequence number within hash collision slot */
-   unsigned int  label;
-
-   /* bindings to wait and delete timer */
-   struct timer_link wait_tl;
-   struct timer_link dele_tl;
-
-   /*the transaction that is canceled (usefull only for CANCEL req)*/
-   struct cell *T_canceled;
-   struct cell *T_canceler;
-
-   /* usefull data */
-   /* UA Server */
-   struct sip_msg         *inbound_request;
-   struct retrans_buff   outbound_response;
-   unsigned int             status;
-   str*                             tag;
-   unsigned int             inbound_request_isACKed;
-   int                              relaied_reply_branch;
-   int                               nr_of_outgoings;
-   /* UA Clients */
-   struct retrans_buff   *outbound_request[ MAX_FORK ];
-   struct sip_msg          *inbound_response[ MAX_FORK ];
-   unsigned int             outbound_request_isACKed[MAX_FORK];
+	/* linking data */
+	struct cell*     next_cell;
+	struct cell*     prev_cell;
+
+	/* indicates which process is currently processing this transaction */
+	process_bm_t	ref_bitmap;
+	/* tells in which hash table entry the cell lives */
+	unsigned int  hash_index;
+	/* sequence number within hash collision slot */
+	unsigned int  label;
+
+	/* bindings to wait and delete timer */
+	struct timer_link wait_tl;
+	struct timer_link dele_tl;
+
+	/*the transaction that is canceled (usefull only for CANCEL req)*/
+	struct cell *T_canceled;
+	struct cell *T_canceler;
+
+	/* useful data */
+	/* UA Server */
+	struct sip_msg         *inbound_request;
+	struct retrans_buff   outbound_response;
+	unsigned int             status;
+	str*                             tag;
+	unsigned int             inbound_request_isACKed;
+	int                              relaied_reply_branch;
+	int                               nr_of_outgoings;
+	/* UA Clients */
+	struct retrans_buff   *outbound_request[ MAX_FORK ];
+	struct sip_msg          *inbound_response[ MAX_FORK ];
+	unsigned int             outbound_request_isACKed[MAX_FORK];
 
 #ifdef	EXTRA_DEBUG
 	/* scheduled for deletion ? */

+ 16 - 11
modules/tm/t_funcs.c

@@ -174,7 +174,7 @@ int t_add_transaction( struct sip_msg* p_msg, char* foo, char* bar )
    DBG("DEBUG: t_add_transaction: new transaction inserted, hash: %d\n", new_cell->hash_index );
 
    T = new_cell;
-   T->ref_counter =1;
+	T_REF(T);
    return 1;
 }
 
@@ -451,7 +451,8 @@ int t_on_reply_received( struct sip_msg  *p_msg )
 	msg_class=REPLY_CLASS(p_msg);
 	relay = t_should_relay_response( T , msg_status );
 	if (relay && !(clone=sip_msg_cloner( p_msg ))) {
-		t_unref( p_msg, NULL, NULL );
+		T_UNREF( T );
+		/* t_unref( p_msg, NULL, NULL ); */
 		return 0;
 	}
 
@@ -481,7 +482,8 @@ int t_on_reply_received( struct sip_msg  *p_msg )
            		{
                			LOG( L_ERR , "ERROR: t_on_reply_received: unable to send ACK\n" );
 						if (clone ) sip_msg_free( clone );
-						t_unref( p_msg, NULL, NULL );
+						/* t_unref( p_msg, NULL, NULL ); */
+						T_UNREF( T );
                			return 0;
            		}
        		}
@@ -493,7 +495,8 @@ int t_on_reply_received( struct sip_msg  *p_msg )
 
 	/* if the incoming response code is not reliable->drop it*/
 	if (!relay) {
-		t_unref( p_msg, NULL, NULL );
+		/* t_unref( p_msg, NULL, NULL ); */
+		T_UNREF( T );
 		return 0;
 	}
 
@@ -522,11 +525,13 @@ int t_on_reply_received( struct sip_msg  *p_msg )
    	}
 
 	/* nothing to do for the ser core */
-	t_unref( p_msg, NULL, NULL );
+	/* t_unref( p_msg, NULL, NULL ); */
+	T_UNREF( T );
 	return 0;
 
 error:
-	t_unref( p_msg, NULL, NULL );
+	/* t_unref( p_msg, NULL, NULL ); */
+	T_UNREF( T );
 	T->inbound_response[branch]=NULL;
 	sip_msg_free( clone );
 	/* don't try to relay statelessly on error */
@@ -577,7 +582,7 @@ int t_unref( struct sip_msg* p_msg, char* foo, char* bar )
 {
 	if (T==T_UNDEFINED || T==T_NULL)
 		return -1;
-	unref_T(T);
+	T_UNREF( T );
 	T=T_UNDEFINED;
 	return 1;
 }
@@ -1091,16 +1096,16 @@ void delete_cell( struct cell *p_cell )
 	}
 #endif
 	/* still in use ... don't delete */
-	if ( p_cell->ref_counter ) {
+	if ( T_IS_REFED(p_cell) ) {
 #ifdef	EXTRA_DEBUG
-		if (p_cell->ref_counter>1) {
+		if (T_REFCOUNTER(p_cell)>1) {
 			DBG("DEBUG: while debugging with a single process, ref_count > 1\n");
 			DBG("DEBUG: transaction =%p\n", p_cell );
 			abort();
 		}
 #endif
-		DBG("DEBUG: delete_cell: t=%p post for delete (%d)\n",
-			p_cell,p_cell->ref_counter);
+		DBG("DEBUG: delete_cell: t=%p post for delete (refbitmap %x, refcount %d)\n",
+			p_cell,p_cell->ref_bitmap, T_REFCOUNTER(p_cell));
 		/* it's added to del list for future del */
 		set_timer( hash_table, &(p_cell->dele_tl), DELETE_LIST );
 	} else {

+ 46 - 4
modules/tm/t_funcs.h

@@ -46,13 +46,54 @@ extern struct s_table*  hash_table;
 	__FUNCTION__, __LINE__ ); })
 
 
-/* to avoid too many locks/unlocks, we gave up using separate locks
-   for cells and use those of transaction table entries
+/* 
+  macros for reference bitmap (lock-less process non-exclusive ownership) 
 */
+#define T_IS_REFED(_T_cell) ((_T_cell)->ref_bitmap)
+#define T_REFCOUNTER(_T_cell) \
+	( { int _i=0; \
+		process_bm_t _b=(_T_cell)->ref_bitmap; \
+		while (_b) { \
+			if ( (_b) & 1 ) _i++; \
+			(_b) >>= 1; \
+		} ;\
+		(_i); \
+	 } )
+		
+
+#ifdef EXTRA_DEBUG
+#	define DBG_REF(_action, _t) DBG("DEBUG: XXXXX %s (%s:%d): T=%p , ref (bm=%x, cnt=%d)\n",\
+			(_action), __FUNCTION__, __LINE__, (_t),(_t)->ref_bitmap, T_REFCOUNTER(_t));
+#	define T_UNREF(_T_cell) \
+	( { \
+		DBG_REF("unref", (_T_cell)); \
+		if (!T_IS_REFED(_T_cell)) { \
+			DBG("ERROR: unrefering unrefered transaction %p from %s , %s : %d\n", \
+				(_T_cell), __FUNCTION__, __FILE__, __LINE__ ); \
+			abort(); \
+		} \
+		(_T_cell)->ref_bitmap &= ~process_bit; \
+	} )
+
+#	define T_REF(_T_cell) \
+	( { \
+		DBG_REF("ref", (_T_cell));	 \
+		if (T_IS_REFED(_T_cell)) { \
+			DBG("ERROR: refering already refered transaction %p from %s , %s : %d\n", \
+				(_T_cell), __FUNCTION__, __FILE__, __LINE__ ); \
+			abort(); \
+		} \
+		(_T_cell)->ref_bitmap |= process_bit; \
+	} )
+#else
+#	define T_UNREF(_T_cell) ({ (_T_cell)->ref_bitmap &= ~process_bit; })
+#	define T_REF(_T_cell) ({ (_T_cell)->ref_bitmap |= process_bit; })
+#endif
 
-#define DBG_REF(_action, _t) DBG("DEBUG: XXXXX %s (%s:%d): T=%p , ref=%d\n",\
-			(_action), __FUNCTION__, __LINE__, (_t),(_t)->ref_counter);
 
+	
+
+#ifdef _OLD_XX
 #define unref_T(_T_cell) \
 	( {\
 		lock( hash_table->entrys[(_T_cell)->hash_index].mutex );\
@@ -67,6 +108,7 @@ extern struct s_table*  hash_table;
 */
 #define ref_T(_T_cell) ({ ((_T_cell)->ref_counter++); \
 		DBG_REF("ref", (_T_cell));	})
+#endif
 
 
 int   tm_startup();

+ 8 - 4
modules/tm/t_lookup.c

@@ -130,9 +130,10 @@ int t_lookup_request( struct sip_msg* p_msg )
 
 found:
 	T=p_cell;
-	ref_T( T );
+	/* ref_T( T ); */
+	T_REF( T );
 	DBG("DEBUG:XXXXXXXXXXXXXXXXXXXXX t_lookup_request: "
-			"transaction found ( T=%p , ref=%d)\n",T,T->ref_counter);
+			"transaction found ( T=%p , ref=%x)\n",T,T->ref_bitmap);
 	unlock( hash_table->entrys[hash_index].mutex );
 	return 1;
 }
@@ -285,9 +286,10 @@ int t_reply_matching( struct sip_msg *p_msg , unsigned int *p_branch )
           T = p_cell;
           *p_branch = branch_id;
           /* T->ref_counter ++; */
-		  ref_T( T );
+		  /* ref_T( T ); */
+			T_REF( T );
           unlock( hash_table->entrys[hash_index].mutex );
-          DBG("DEBUG:XXXXXXXXXXXXXXXXXXXXX t_reply_matching: reply matched (T=%p, ref=%d)!\n",T,T->ref_counter);
+          DBG("DEBUG:XXXXXXXXXXXXXXXXXXXXX t_reply_matching: reply matched (T=%p, ref=%x)!\n",T,T->ref_bitmap);
         return 1;
       }
       /* next cell */
@@ -324,8 +326,10 @@ int t_check( struct sip_msg* p_msg , int *param_branch)
    if ( p_msg->id != global_msg_id || T==T_UNDEFINED )
    {
       global_msg_id = p_msg->id;
+	/*
       if ( T && T!=T_UNDEFINED )
          unref_T(T);
+	*/
       T = T_UNDEFINED;
       /* transaction lookup */
      if ( p_msg->first_line.type==SIP_REQUEST ) {

+ 4 - 1
modules/tm/tm.c

@@ -30,6 +30,8 @@ static int fixup_t_forward(void** param, int param_no);
 static int fixup_t_forward_def(void** param, int param_no);
 static int fixup_t_send_reply(void** param, int param_no);
 
+static void w_onbreak(struct sip_msg* msg) { t_unref(msg, NULL, NULL); }
+
 static struct module_exports nm_exports= {
 	"tm_module",
 	(char*[]){			"t_add_transaction",
@@ -77,7 +79,8 @@ static struct module_exports nm_exports= {
 		},
 	9,
 	(response_function) t_on_reply_received,
-	(destroy_function) tm_shutdown
+	(destroy_function) tm_shutdown,
+	w_onbreak
 };
 
 

+ 2 - 0
sr_module.h

@@ -12,6 +12,7 @@ typedef  struct module_exports* (*module_register)();
 typedef  int (*cmd_function)(struct sip_msg*, char*, char*);
 typedef  int (*fixup_function)(void** param, int param_no);
 typedef  int (*response_function)(struct sip_msg*);
+typedef  void (*onbreak_function)(struct sip_msg*);
 typedef void (*destroy_function)();
 
 struct module_exports{
@@ -29,6 +30,7 @@ struct module_exports{
 	destroy_function destroy_f; /*function called when the module should
 								  be "destroyed", e.g: on ser exit;
 								  can be null */
+	onbreak_function onbreak_f;
 };
 
 struct sr_module{

+ 4 - 1
test/xx.cfg

@@ -9,7 +9,9 @@ log_stderror=yes # (cmd line: -E)
 check_via=yes     # (cmd. line: -v)
 dns=on           # (cmd. line: -r)
 rev_dns=yes      # (cmd. line: -R)
-fork=no          # (cmd. line: -D)
+fork=yes
+children=4
+#fork=no          # (cmd. line: -D)
 port=5080
 #listen=127.0.0.1
 listen=192.168.99.100
@@ -47,6 +49,7 @@ route[0]{
 			};
 #			t_forward("195.37.77.100", "5090" );
 			t_forward("195.37.78.146", "5060" );
+			break;
 			t_unref();
 		};
 	};

+ 11 - 0
types.h

@@ -0,0 +1,11 @@
+/*
+ * $Id$
+ *
+ */
+
+#ifndef _TYPES_H_
+#define _TYPES_H_
+
+typedef unsigned int process_bm_t;
+
+#endif