فهرست منبع

- fork_process & fork_tcp_process fixes
- reverted to the old fork()-in-parallel behaviour
(comment out FORK_DONT_WAIT for the "serial" fork()).

Andrei Pelinescu-Onciul 19 سال پیش
والد
کامیت
144c394f42
5 فایلهای تغییر یافته به همراه 129 افزوده شده و 65 حذف شده
  1. 1 1
      Makefile.defs
  2. 7 3
      NEWS
  3. 4 2
      modules/tm/doc/functions.xml
  4. 116 58
      pt.c
  5. 1 1
      tcp_main.c

+ 1 - 1
Makefile.defs

@@ -67,7 +67,7 @@ MAIN_NAME=ser
 VERSION = 0
 VERSION = 0
 PATCHLEVEL = 10
 PATCHLEVEL = 10
 SUBLEVEL =   99
 SUBLEVEL =   99
-EXTRAVERSION = -dev46-dns_cache
+EXTRAVERSION = -dev47-dns_cache
 
 
 SER_VER = $(shell expr $(VERSION) \* 1000000 + $(PATCHLEVEL) \* 1000 + \
 SER_VER = $(shell expr $(VERSION) \* 1000000 + $(PATCHLEVEL) \* 1000 + \
 			$(SUBLEVEL) )
 			$(SUBLEVEL) )

+ 7 - 3
NEWS

@@ -46,15 +46,19 @@ modules:
                          (failure_route only).
                          (failure_route only).
                        - t_branch_replied() -- returns true if the failure 
                        - t_branch_replied() -- returns true if the failure 
                          route is executed for a branch that did receive at
                          route is executed for a branch that did receive at
-                         least one reply (failure_route only).. It can be used
+                         least one reply in the past (the current reply 
+                          is not taken into account). It can be used
                          together with t_branch_timeout() to distinguish 
                          together with t_branch_timeout() to distinguish 
                          between a remote side that doesn't respond (some 
                          between a remote side that doesn't respond (some 
                          provisional reply received) and one that is completely
                          provisional reply received) and one that is completely
-                          dead.
+                          dead. (failure_route only)
                        - t_any_timeout() -- returns true if any of the current
                        - t_any_timeout() -- returns true if any of the current
                          transaction branches did timeout.
                          transaction branches did timeout.
                        - t_any_replied() -- returns true if at least one branch
                        - t_any_replied() -- returns true if at least one branch
-                          of the current transaction received one reply.
+                          of the current transaction received one reply in the
+                          past. If called from a failure_route or an
+                          onreply_route, the "current" reply is not taken into
+                          account.
                        - t_is_canceled() -- returns true if the current 
                        - t_is_canceled() -- returns true if the current 
                          transaction  has been canceled.
                          transaction  has been canceled.
                        - new t_set_fr(timeout_fr_inv, timeout_fr) -- allows
                        - new t_set_fr(timeout_fr_inv, timeout_fr) -- allows

+ 4 - 2
modules/tm/doc/functions.xml

@@ -441,7 +441,8 @@ failure_route[0]{
 	</title>
 	</title>
 	<para>
 	<para>
 		Returns true if the failure route is executed for a branch that did
 		Returns true if the failure route is executed for a branch that did
-		receive at least one reply. It can be used only from the 
+		receive at least one reply in the past (the "current" reply is not 
+		taken into account). It can be used only from the 
 		<emphasis>failure_route</emphasis>.
 		<emphasis>failure_route</emphasis>.
 	</para>
 	</para>
 	<example>
 	<example>
@@ -491,7 +492,8 @@ failure_route[0]{
 	</title>
 	</title>
 	<para>
 	<para>
 		Returns true if at least one of the current transactions branches
 		Returns true if at least one of the current transactions branches
-		did receive some reply.
+		did receive some reply in the past. If called from a failure or
+		onreply route, the "current" reply is not taken into account.
 	</para>
 	</para>
 	<example>
 	<example>
 	    <title><function>t_any_replied</function> usage</title>
 	    <title><function>t_any_replied</function> usage</title>

+ 116 - 58
pt.c

@@ -41,6 +41,12 @@
 #include "sr_module.h"
 #include "sr_module.h"
 
 
 #include <stdio.h>
 #include <stdio.h>
+
+#define FORK_DONT_WAIT  /* child doesn't wait for parent before starting 
+						   => faster startup, but the child should not assume
+						   the parent fixed the pt[] entry for it */
+
+
 #ifdef PROFILING
 #ifdef PROFILING
 #include <sys/gmon.h>
 #include <sys/gmon.h>
 
 
@@ -108,6 +114,8 @@ int my_pid()
 	return pt ? pt[process_no].pid : getpid();
 	return pt ? pt[process_no].pid : getpid();
 }
 }
 
 
+
+
 /**
 /**
  * Forks a new process.
  * Forks a new process.
  * @param child_id - rank, if equal to PROC_NOCHLDINIT init_child will not be
  * @param child_id - rank, if equal to PROC_NOCHLDINIT init_child will not be
@@ -118,77 +126,100 @@ int my_pid()
  */
  */
 int fork_process(int child_id, char *desc, int make_sock)
 int fork_process(int child_id, char *desc, int make_sock)
 {
 {
-	int pid,old_process_no;
+	int pid, child_process_no;
+	int ret;
 #ifdef USE_TCP
 #ifdef USE_TCP
 	int sockfd[2];
 	int sockfd[2];
 #endif
 #endif
 
 
-	lock_get(process_lock);	
-	if (*process_count>=estimated_proc_no) {
-		LOG(L_CRIT, "ERROR: fork_process(): Process limit of %d exceeded."
-					" Will simulate fork fail.\n", estimated_proc_no);
-		lock_release(process_lock);
-		return -1;
-	}	
-	
+	ret=-1;
 	#ifdef USE_TCP
 	#ifdef USE_TCP
+		sockfd[0]=sockfd[1]=-1;
 		if(make_sock && !tcp_disable){
 		if(make_sock && !tcp_disable){
 			 if (socketpair(AF_UNIX, SOCK_STREAM, 0, sockfd)<0){
 			 if (socketpair(AF_UNIX, SOCK_STREAM, 0, sockfd)<0){
 				LOG(L_ERR, "ERROR: fork_process(): socketpair failed: %s\n",
 				LOG(L_ERR, "ERROR: fork_process(): socketpair failed: %s\n",
-					strerror(errno));
-				return -1;
+							strerror(errno));
+				goto error;
 			}
 			}
 		}
 		}
 	#endif
 	#endif
+	lock_get(process_lock);
+	if (*process_count>=estimated_proc_no) {
+		LOG(L_CRIT, "ERROR: fork_process(): Process limit of %d exceeded."
+					" Will simulate fork fail.\n", estimated_proc_no);
+		lock_release(process_lock);
+		goto error;
+	}	
 	
 	
-	old_process_no = process_no;
-	process_no = *process_count;
+	
+	child_process_no = *process_count;
 	pid = fork();
 	pid = fork();
 	if (pid<0) {
 	if (pid<0) {
 		lock_release(process_lock);
 		lock_release(process_lock);
-		return pid;
-	}
-	if (pid==0){
+		ret=pid;
+		goto error;
+	}else if (pid==0){
 		/* child */
 		/* child */
+		process_no=child_process_no;
 #ifdef PROFILING
 #ifdef PROFILING
 		monstartup((u_long) &_start, (u_long) &etext);
 		monstartup((u_long) &_start, (u_long) &etext);
 #endif
 #endif
+#ifdef FORK_DONT_WAIT
+		/* record pid twice to avoid the child using it, before
+		 * parent gets a chance to set it*/
+		pt[process_no].pid=getpid();
+#else
 		/* wait for parent to get out of critical zone.
 		/* wait for parent to get out of critical zone.
 		 * this is actually relevant as the parent updates
 		 * this is actually relevant as the parent updates
 		 * the pt & process_count. */
 		 * the pt & process_count. */
 		lock_get(process_lock);
 		lock_get(process_lock);
+		lock_release(process_lock);	
+#endif
 		#ifdef USE_TCP
 		#ifdef USE_TCP
 			if (make_sock && !tcp_disable){
 			if (make_sock && !tcp_disable){
 				close(sockfd[0]);
 				close(sockfd[0]);
 				unix_tcp_sock=sockfd[1];
 				unix_tcp_sock=sockfd[1];
 			}
 			}
 		#endif		
 		#endif		
-		lock_release(process_lock);	
 		if ((child_id!=PROC_NOCHLDINIT) && (init_child(child_id) < 0)) {
 		if ((child_id!=PROC_NOCHLDINIT) && (init_child(child_id) < 0)) {
-			LOG(L_ERR, "ERROR: fork_process(): init_child failed for %s\n",
-						pt[process_no].desc);
+			LOG(L_ERR, "ERROR: fork_process(): init_child failed for "
+					" process %d, pid %d, \"%s\"\n", process_no,
+					pt[process_no].pid, pt[process_no].desc);
 			return -1;
 			return -1;
 		}
 		}
 		return pid;
 		return pid;
 	} else {
 	} else {
 		/* parent */
 		/* parent */
-		process_no = old_process_no;
+		(*process_count)++;
+#ifdef FORK_DONT_WAIT
+		lock_release(process_lock);
+#endif
 		/* add the process to the list in shm */
 		/* add the process to the list in shm */
-		pt[*process_count].pid=pid;
+		pt[child_process_no].pid=pid;
 		if (desc){
 		if (desc){
-			strncpy(pt[*process_count].desc, desc, MAX_PT_DESC);
+			strncpy(pt[child_process_no].desc, desc, MAX_PT_DESC);
 		}
 		}
 		#ifdef USE_TCP
 		#ifdef USE_TCP
 			if (make_sock && !tcp_disable){
 			if (make_sock && !tcp_disable){
 				close(sockfd[1]);
 				close(sockfd[1]);
-				pt[*process_count].unix_sock=sockfd[0];
-				pt[*process_count].idx=-1; /* this is not "tcp" process*/
+				pt[child_process_no].unix_sock=sockfd[0];
+				pt[child_process_no].idx=-1; /* this is not "tcp" process*/
 			}
 			}
-		#endif		
-		*process_count = (*process_count) +1;
+		#endif
+#ifdef FORK_DONT_WAIT
+#else
 		lock_release(process_lock);
 		lock_release(process_lock);
-		return pid;
+#endif
+		ret=pid;
+		goto end;
 	}
 	}
+error:
+#ifdef USE_TCP
+	if (sockfd[0]!=-1) close(sockfd[0]);
+	if (sockfd[1]!=-1) close(sockfd[1]);
+#endif
+end:
+	return ret;
 }
 }
 
 
 /**
 /**
@@ -199,31 +230,27 @@ int fork_process(int child_id, char *desc, int make_sock)
  * @returns the pid of the new process
  * @returns the pid of the new process
  */
  */
 #ifdef USE_TCP
 #ifdef USE_TCP
-int fork_tcp_process(int child_id,char *desc,int r,int *reader_fd_1)
+int fork_tcp_process(int child_id, char *desc, int r, int *reader_fd_1)
 {
 {
-	int pid,old_process_no;
+	int pid, child_process_no;
 	int sockfd[2];
 	int sockfd[2];
 	int reader_fd[2]; /* for comm. with the tcp children read  */
 	int reader_fd[2]; /* for comm. with the tcp children read  */
-
-
+	int ret;
 	
 	
-	lock_get(process_lock);
-	/* set the local process_no */
-	if (*process_count>=estimated_proc_no) {
-		LOG(L_CRIT, "ERROR: fork_tcp_process(): Process limit of %d exceeded."
-					" Simulating fork fail\n", estimated_proc_no);
-		return -1;
-	}	
+	/* init */
+	sockfd[0]=sockfd[1]=-1;
+	reader_fd[0]=reader_fd[1]=-1;
+	ret=-1;
 	
 	
 	if (socketpair(AF_UNIX, SOCK_STREAM, 0, sockfd)<0){
 	if (socketpair(AF_UNIX, SOCK_STREAM, 0, sockfd)<0){
 		LOG(L_ERR, "ERROR: fork_tcp_process(): socketpair failed: %s\n",
 		LOG(L_ERR, "ERROR: fork_tcp_process(): socketpair failed: %s\n",
 					strerror(errno));
 					strerror(errno));
-		return -1;
+		goto error;
 	}
 	}
 	if (socketpair(AF_UNIX, SOCK_STREAM, 0, reader_fd)<0){
 	if (socketpair(AF_UNIX, SOCK_STREAM, 0, reader_fd)<0){
 		LOG(L_ERR, "ERROR: fork_tcp_process(): socketpair failed: %s\n",
 		LOG(L_ERR, "ERROR: fork_tcp_process(): socketpair failed: %s\n",
 					strerror(errno));
 					strerror(errno));
-		return -1;
+		goto error;
 	}
 	}
 	if (tcp_fix_child_sockets(reader_fd)<0){
 	if (tcp_fix_child_sockets(reader_fd)<0){
 		LOG(L_ERR, "ERROR: fork_tcp_process(): failed to set non blocking"
 		LOG(L_ERR, "ERROR: fork_tcp_process(): failed to set non blocking"
@@ -231,54 +258,85 @@ int fork_tcp_process(int child_id,char *desc,int r,int *reader_fd_1)
 		/* continue, it's not critical (it will go slower under
 		/* continue, it's not critical (it will go slower under
 		 * very high connection rates) */
 		 * very high connection rates) */
 	}
 	}
+	lock_get(process_lock);
+	/* set the local process_no */
+	if (*process_count>=estimated_proc_no) {
+		LOG(L_CRIT, "ERROR: fork_tcp_process(): Process limit of %d exceeded."
+					" Simulating fork fail\n", estimated_proc_no);
+		lock_release(process_lock);
+		goto error;
+	}
+	
 	
 	
-	old_process_no = process_no;
-	process_no = *process_count;
+	child_process_no = *process_count;
 	pid = fork();
 	pid = fork();
 	if (pid<0) {
 	if (pid<0) {
 		lock_release(process_lock);
 		lock_release(process_lock);
-		return pid;
+		ret=pid;
+		goto end;
 	}
 	}
 	if (pid==0){
 	if (pid==0){
+		process_no=child_process_no;
 #ifdef PROFILING
 #ifdef PROFILING
 		monstartup((u_long) &_start, (u_long) &etext);
 		monstartup((u_long) &_start, (u_long) &etext);
 #endif
 #endif
+#ifdef FORK_DONT_WAIT
+		/* record pid twice to avoid the child using it, before
+-		 * parent gets a chance to set it*/
+		pt[process_no].pid=getpid();
+#else
 		/* wait for parent to get out of critical zone */
 		/* wait for parent to get out of critical zone */
 		lock_get(process_lock);
 		lock_get(process_lock);
-			close(sockfd[0]);
-			unix_tcp_sock=sockfd[1];
-			if (reader_fd_1) *reader_fd_1=reader_fd[1];
 		lock_release(process_lock);
 		lock_release(process_lock);
-		if (init_child(child_id) < 0) {
+#endif
+		close(sockfd[0]);
+		unix_tcp_sock=sockfd[1];
+		close(reader_fd[0]);
+		if (reader_fd_1) *reader_fd_1=reader_fd[1];
+		if ((child_id!=PROC_NOCHLDINIT) && (init_child(child_id) < 0)) {
 			LOG(L_ERR, "ERROR: fork_tcp_process(): init_child failed for "
 			LOG(L_ERR, "ERROR: fork_tcp_process(): init_child failed for "
-					"%s\n", pt[process_no].desc);
+					"process %d, pid %d, \"%s\"\n", process_no, 
+					pt[process_no].pid, pt[process_no].desc);
 			return -1;
 			return -1;
 		}
 		}
 		return pid;
 		return pid;
 	} else {
 	} else {
-		/* parent */		
-		process_no = old_process_no;
+		/* parent */
+		(*process_count)++;
+#ifdef FORK_DONT_WAIT
+		lock_release(process_lock);
+#endif
 		/* add the process to the list in shm */
 		/* add the process to the list in shm */
-		pt[*process_count].pid=pid;
-		pt[*process_count].unix_sock=sockfd[0];
-		pt[*process_count].idx=r; 	
+		pt[child_process_no].pid=pid;
+		pt[child_process_no].unix_sock=sockfd[0];
+		pt[child_process_no].idx=r;
 		if (desc){
 		if (desc){
-			snprintf(pt[*process_count].desc, MAX_PT_DESC, "%s child=%d", 
+			snprintf(pt[child_process_no].desc, MAX_PT_DESC, "%s child=%d", 
 						desc, r);
 						desc, r);
 		}
 		}
+#ifdef FORK_DONT_WAIT
+#else
+		lock_release(process_lock);
+#endif
 		
 		
 		close(sockfd[1]);
 		close(sockfd[1]);
 		close(reader_fd[1]);
 		close(reader_fd[1]);
 		
 		
 		tcp_children[r].pid=pid;
 		tcp_children[r].pid=pid;
-		tcp_children[r].proc_no=process_no;
+		tcp_children[r].proc_no=child_process_no;
 		tcp_children[r].busy=0;
 		tcp_children[r].busy=0;
 		tcp_children[r].n_reqs=0;
 		tcp_children[r].n_reqs=0;
 		tcp_children[r].unix_sock=reader_fd[0];
 		tcp_children[r].unix_sock=reader_fd[0];
 		
 		
-		*process_count = (*process_count) +1;
-		lock_release(process_lock);
-		return pid;
+		ret=pid;
+		goto end;
 	}
 	}
+error:
+	if (sockfd[0]!=-1) close(sockfd[0]);
+	if (sockfd[1]!=-1) close(sockfd[1]);
+	if (reader_fd[0]!=-1) close(reader_fd[0]);
+	if (reader_fd[1]!=-1) close(reader_fd[1]);
+end:
+	return ret;
 }
 }
 #endif
 #endif

+ 1 - 1
tcp_main.c

@@ -1986,7 +1986,7 @@ int tcp_init_children()
 	/* fork children & create the socket pairs*/
 	/* fork children & create the socket pairs*/
 	for(r=0; r<tcp_children_no; r++){
 	for(r=0; r<tcp_children_no; r++){
 		child_rank++;
 		child_rank++;
-		pid=fork_tcp_process(child_rank,"tcp receiver",1,&reader_fd_1);
+		pid=fork_tcp_process(child_rank, "tcp receiver", r, &reader_fd_1);
 		if (pid<0){
 		if (pid<0){
 			LOG(L_ERR, "ERROR: tcp_main: fork failed: %s\n",
 			LOG(L_ERR, "ERROR: tcp_main: fork failed: %s\n",
 					strerror(errno));
 					strerror(errno));