浏览代码

- tcp major changes part 1: support for >1024 connections, better io poll model
(best poll method selected automatically, there is also an option to enforce
it). So far support for epoll (linux >= 2.5.66), sigio + real time signals
(linux), poll, select. kqueue (*bsd) and /dev/poll (solaris) coming soon.
WARNING: this is still work in progress, the tcp reader part is still not
converted to the new model (this means that while the tcp_main process supports
> 1024 fds, the tcp children don't), the main reason for leaving this out for
now is debugging.
Still to do: config options for poll_method (for now use -W method if you
want to force one), config options for tcp timeouts a.s.o.

Andrei Pelinescu-Onciul 20 年之前
父节点
当前提交
0ba367ec24
共有 14 个文件被更改,包括 1940 次插入368 次删除
  1. 28 1
      Makefile.defs
  2. 3 0
      fifo_server.c
  3. 5 2
      globals.h
  4. 492 0
      io_wait.c
  5. 663 0
      io_wait.h
  6. 34 2
      main.c
  7. 2 2
      parser/msg_parser.h
  8. 49 10
      pass_fd.c
  9. 2 2
      pass_fd.h
  10. 61 0
      poll_types.h
  11. 1 0
      tcp_conn.h
  12. 596 348
      tcp_main.c
  13. 1 1
      tcp_read.c
  14. 3 0
      unixsock_server.c

+ 28 - 1
Makefile.defs

@@ -55,7 +55,7 @@ MAIN_NAME=ser
 VERSION = 0
 PATCHLEVEL = 10
 SUBLEVEL =   99
-EXTRAVERSION = -dev7
+EXTRAVERSION = -dev8-new_tcp
 
 RELEASE=$(VERSION).$(PATCHLEVEL).$(SUBLEVEL)$(EXTRAVERSION)
 OS = $(shell uname -s | sed -e s/SunOS/solaris/ | tr "[A-Z]" "[a-z]")
@@ -913,6 +913,18 @@ ifeq ($(OS), linux)
 		DEFS+= -DUSE_SYSV_SEM  # try posix sems
 		found_lock_method=yes
 	endif
+	# check for 2.6
+	ifneq ($(shell echo "$(OSREL)"|grep "^2\.6\."),)
+		ifeq ($(NO_EPOLL),)
+			DEFS+=-DHAVE_EPOLL
+		endif
+	endif
+	ifeq ($(NO_SIGIO),)
+		DEFS+=-DHAVE_SIGIO_RT
+	endif
+	ifeq ($(NO_SELECT),)
+		DEFS+=-DHAVE_SELECT
+	endif
 endif
 
 ifeq  ($(OS), solaris)
@@ -922,6 +934,9 @@ ifeq  ($(OS), solaris)
 		DEFS+= -DUSE_PTHREAD_MUTEX  # try pthread sems
 		found_lock_method=yes
 	endif
+	ifeq ($(NO_SELECT),)
+		DEFS+=-DHAVE_SELECT
+	endif
 	ifeq ($(mode), release)
 		#use these only if you're using gcc with Solaris ld
 		#LDFLAGS=-O2 $(PROFILE)
@@ -956,6 +971,9 @@ ifeq ($(OS), freebsd)
 	else
 		LIBS= -lfl  #dlopen is in libc
 	endif
+	ifeq ($(NO_SELECT),)
+		DEFS+=-DHAVE_SELECT
+	endif
 	YACC=yacc
 endif
 
@@ -966,6 +984,9 @@ ifeq ($(OS), openbsd)
 		DEFS+= -DUSE_PTHREAD_MUTEX  # try pthread sems
 		found_lock_method=yes
 	endif
+	ifeq ($(NO_SELECT),)
+		DEFS+=-DHAVE_SELECT
+	endif
 	# (symbols on openbsd are prefixed by "_")
 	YACC=yacc
 	# no sched_yield on openbsd unless linking with c_r (not recommended)
@@ -995,6 +1016,9 @@ ifeq ($(OS), netbsd)
 		DEFS+= -DUSE_SYSV_SEM  # try pthread sems
 		found_lock_method=yes
 	endif
+	ifeq ($(NO_SELECT),)
+		DEFS+=-DHAVE_SELECT
+	endif
 	YACC=yacc
 	LIBS= -lfl 
 endif
@@ -1014,6 +1038,9 @@ ifeq ($(OS), darwin)
 	else
 		LIBS= -lfl -lresolv  #dlopen is in libc
 	endif
+	ifeq ($(NO_SELECT),)
+		DEFS+=-DHAVE_SELECT
+	endif
 	LDFLAGS=        # darwin doesn't like -O2 or -E
 	MOD_LDFLAGS= -bundle -bundle_loader ../../$(MAIN_NAME)
 	YACC=yacc

+ 3 - 0
fifo_server.c

@@ -672,6 +672,9 @@ int start_fifo_server()
 		return -1;
 	}
 	if (fifo_pid==0) { /* child == FIFO server */
+		/* record pid twice to avoid the child using it, before
+		 * parent gets a chance to set it*/
+		pt[process_no].pid=getpid();
 		LOG(L_INFO, "INFO: fifo process starting: %d\n", getpid());
 		/* call per-child module initialization too -- some
 		   FIFO commands may need it

+ 5 - 2
globals.h

@@ -36,6 +36,7 @@
 #include "types.h"
 #include "ip_addr.h"
 #include "str.h"
+#include "poll_types.h"
 
 #define NO_DNS     0
 #define DO_DNS     1
@@ -80,6 +81,8 @@ extern int tcp_disable;
 extern int tcp_accept_aliases;
 extern int tcp_connect_timeout;
 extern int tcp_send_timeout;
+extern enum poll_types tcp_poll_method;
+extern int tcp_max_fd_no;
 #endif
 #ifdef USE_TLS
 extern int tls_disable;
@@ -119,9 +122,9 @@ extern unsigned int msg_no;
 extern unsigned long shm_mem_size;
 
 /* FIFO server config */
-char extern *fifo; /* FIFO name */
+extern char *fifo; /* FIFO name */
 extern int fifo_mode;
-char extern *fifo_dir; /* dir. where  reply fifos are allowed */
+extern char *fifo_dir; /* dir. where  reply fifos are allowed */
 extern char *fifo_db_url;  /* db url used by db_fifo interface */
 
 /* UNIX domain socket configuration */

+ 492 - 0
io_wait.c

@@ -0,0 +1,492 @@
+/* 
+ * $Id$
+ * 
+ * Copyright (C) 2005 iptelorg GmbH
+ *
+ * This file is part of ser, a free SIP server.
+ *
+ * ser is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * For a license to use the ser software under conditions
+ * other than those described here, or to purchase support for this
+ * software, please contact iptel.org by e-mail at the following addresses:
+ *    [email protected]
+ *
+ * ser is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+/* 
+ * tcp io wait common stuff used by tcp_main.c & tcp_read.c
+ * (see io_wait.h)
+ */
+/* 
+ * History:
+ * --------
+ *  2005-06-15  created by andrei
+ */
+
+
+
+#ifdef USE_TCP /* for now it make sense only with tcp */
+
+#ifdef HAVE_EPOLL
+#include <unistd.h> /* close() */
+#endif
+
+#include <sys/utsname.h> /* uname() */
+#include <stdlib.h> /* strtol() */
+#include "io_wait.h"
+
+
+#include "mem/mem.h"
+
+#ifndef local_malloc
+#define local_malloc pkg_malloc
+#endif
+#ifndef local_free
+#define local_free pkg_free
+#endif
+
+char* poll_support="poll" /* list of the compiled-in poll methods, built from the HAVE_* defines (poll is always in) */
+#ifdef HAVE_EPOLL
+", epoll_lt, epoll_et"
+#endif
+#ifdef HAVE_SIGIO_RT
+", sigio_rt"
+#endif
+#ifdef HAVE_SELECT
+", select"
+#endif
+#ifdef HAVE_KQUEUE
+", kqueue"
+#endif
+#ifdef HAVE_DEVPOLL
+", /dev/poll"
+#endif
+;
+
+
+char* poll_method_str[POLL_END]={ "none", "poll", "epoll_lt", "epoll_et", 
+								  "sigio_rt", "select", "kqueue",  "/dev/poll"
+								}; /* names indexed by the enum poll_types value; NOTE(review): the order presumably mirrors poll_types.h -- confirm */
+
+#ifdef HAVE_SIGIO_RT
+/* bookkeeping for the real time signals handed out by init_sigio()
+ * They are not static: io_wait.h (included above) declares them extern and
+ * a static definition following an extern declaration of the same name is
+ * a linkage conflict. */
+int _sigio_init=0;          /* !=0 after the 2 vars below were initialized */
+int _sigio_crt_rtsig;       /* first rt signal init_sigio() will try next */
+sigset_t _sigio_rtsig_used; /* rt signals already assigned to a handler */
+#endif
+
+
+
+#ifdef HAVE_SIGIO_RT
+/* sigio specific init
+ * params: h    - io_wait handle, h->signo and h->sset are filled in
+ *         rsig - real time signal to use; 0 => use the first one that is
+ *                neither already assigned by this code nor blocked in the
+ *                process signal mask
+ * On success the chosen signal and SIGIO are part of h->sset and are blocked
+ * in the calling process.
+ * returns -1 on error, 0 on success */
+static int init_sigio(io_wait_h* h, int rsig)
+{
+	int r;
+	int n;
+	int signo;
+	int start_sig;
+	sigset_t oldset;
+	
+	if (!_sigio_init){
+		_sigio_init=1;
+		_sigio_crt_rtsig=SIGRTMIN;
+		sigemptyset(&_sigio_rtsig_used);
+	}
+	h->signo=0; /* also tells the error path that nothing is reserved yet */
+	
+	if (rsig==0){
+		/* try all the rt signals, starting with the current one */
+		start_sig=_sigio_crt_rtsig;
+		n=SIGRTMAX-SIGRTMIN;
+	}else{
+		if ((rsig < SIGRTMIN) || (rsig >SIGRTMAX)){
+			LOG(L_CRIT, "ERROR: init_sigio: real time signal %d out of"
+					          " range  [%d, %d]\n", rsig, SIGRTMIN, SIGRTMAX);
+			goto error;
+		}
+		/* try only the requested signal */
+		start_sig=rsig;
+		n=0;
+	}
+	
+	sigemptyset(&h->sset);
+	sigemptyset(&oldset);
+retry1:
+	/* get current block mask (blocking an empty set changes nothing) */
+	if (sigprocmask(SIG_BLOCK, &h->sset, &oldset )==-1){
+		if (errno==EINTR) goto retry1;
+		LOG(L_ERR, "ERROR: init_sigio: 1st sigprocmask failed: %s [%d]\n",
+				strerror(errno), errno);
+		/* try to continue */
+	}
+	
+	for (r=start_sig; r<=(n+start_sig); r++){
+		/* wrap around: SIGRTMAX+1 must map back to SIGRTMIN, or else
+		 * SIGRTMIN is never tried and start_sig is tried twice */
+		signo=(r>SIGRTMAX)?(r-SIGRTMAX-1+SIGRTMIN):r;
+		if (! sigismember(&_sigio_rtsig_used, signo) &&
+			! sigismember(&oldset, signo)){
+			sigaddset(&_sigio_rtsig_used, signo);
+			h->signo=signo;
+			_sigio_crt_rtsig=(signo<SIGRTMAX)?signo+1:SIGRTMIN;
+			break;
+		}
+	}
+	
+	if (h->signo==0){
+			LOG(L_CRIT, "ERROR: init_sigio: %s\n",
+					rsig?"could not assign requested real-time signal":
+						 "out of real-time signals");
+			goto error;
+	}
+
+	DBG("init_sigio: trying signal %d... \n", h->signo);
+	
+	if (sigaddset(&h->sset, h->signo)==-1){
+		LOG(L_ERR, "ERROR: init_sigio: sigaddset failed for %d: %s [%d]\n",
+				h->signo, strerror(errno), errno);
+		goto error;
+	}
+	if (sigaddset(&h->sset, SIGIO)==-1){
+		LOG(L_ERR, "ERROR: init_sigio: sigaddset failed for %d: %s [%d]\n",
+				SIGIO, strerror(errno), errno);
+		goto error;
+	}
+retry:
+	if (sigprocmask(SIG_BLOCK, &h->sset, 0)==-1){
+		if (errno==EINTR) goto retry;
+		LOG(L_ERR, "ERROR: init_sigio: sigprocmask failed: %s [%d]\n",
+				strerror(errno), errno);
+		goto error;
+	}
+	return 0;
+error:
+	/* give back the signal reserved in the loop above, if any */
+	if (h->signo)
+		sigdelset(&_sigio_rtsig_used, h->signo);
+	h->signo=0;
+	sigemptyset(&h->sset);
+	return -1;
+}
+
+
+
+/* sigio specific destroy: unblocks the signals of the handle and marks its
+ * real time signal as free again; does nothing if no signal is recorded
+ * in the handle */
+static void destroy_sigio(io_wait_h* h)
+{
+	if (h->signo==0)
+		return;
+	sigprocmask(SIG_UNBLOCK, &h->sset, 0);
+	sigemptyset(&h->sset);
+	sigdelset(&_sigio_rtsig_used, h->signo);
+	h->signo=0;
+}
+#endif
+
+
+
+#ifdef HAVE_EPOLL
+/* epoll specific init: creates the epoll control fd, using h->max_fd_no as
+ * the size argument, and stores it in h->epfd
+ * returns -1 on error, 0 on success */
+static int init_epoll(io_wait_h* h)
+{
+	int ctl_fd;
+	
+	ctl_fd=epoll_create(h->max_fd_no);
+	h->epfd=ctl_fd;
+	if (ctl_fd!=-1)
+		return 0;
+	LOG(L_ERR, "ERROR: init_epoll: epoll_create: %s [%d]\n",
+			strerror(errno), errno);
+	return -1;
+}
+
+
+
+/* epoll specific destroy: closes the epoll control fd, if one is open */
+static void destroy_epoll(io_wait_h* h)
+{
+	if (h->epfd==-1)
+		return; /* never created or already closed */
+	close(h->epfd);
+	h->epfd=-1;
+}
+#endif
+
+
+
+#ifdef HAVE_SELECT
+static int init_select(io_wait_h* h) /* select specific init, always returns 0 */
+{
+	FD_ZERO(&h->master_set); /* start with an empty set of watched fds */
+	return 0;
+}
+#endif
+
+
+
+/* return system version (major.minor.minor2) as
+ *  (major<<16)|(minor)<<8|(minor2)
+ * (if some of them are missing, they are set to 0)
+ * In the returned value minor and minor2 are limited to 255 each, so that a
+ * big number (e.g. the 300 in x.y.300) cannot spill into the next field and
+ * break version comparisons.
+ * if the parameters are not null they are set to the corresp. part
+ * (as read from the release string, not limited)
+ */
+static unsigned int get_sys_version(int* major, int* minor, int* minor2)
+{
+	struct utsname un;
+	int m1;
+	int m2;
+	int m3;
+	char* p;
+	
+	/* if uname() fails, release stays empty => everything is reported as 0 */
+	memset (&un, 0, sizeof(un));
+	m1=m2=m3=0;
+	/* get sys version */
+	uname(&un);
+	m1=strtol(un.release, &p, 10);
+	if (*p=='.'){
+		p++;
+		m2=strtol(p, &p, 10);
+		if (*p=='.'){
+			p++;
+			m3=strtol(p, &p, 10);
+		}
+	}
+	if (major) *major=m1;
+	if (minor) *minor=m2;
+	if (minor2) *minor2=m3;
+	/* each of the 2 lower fields has only 8 bits */
+	if (m2>255) m2=255;
+	if (m3>255) m3=255;
+	return ((m1<<16)|(m2<<8)|(m3));
+}
+
+
+
+/*
+ * checks if poll_method can be used (compiled in and, for epoll, supported
+ * by the running kernel)
+ * returns 0 if it can be used, and an error message if not
+ */
+char* check_poll_method(enum poll_types poll_method)
+{
+	switch(poll_method){
+		case POLL_NONE:
+		case POLL_POLL:
+			/* nothing to check, poll is always supported */
+			return 0;
+		case POLL_SELECT:
+			/* should be always supported */
+#ifndef HAVE_SELECT
+			return "select not supported, try re-compiling with -DHAVE_SELECT";
+#else
+			return 0;
+#endif
+		case POLL_EPOLL_LT:
+		case POLL_EPOLL_ET:
+			/* only on 2.6 + */
+#ifndef HAVE_EPOLL
+			return "epoll not supported, try re-compiling with -DHAVE_EPOLL";
+#else
+			if (get_sys_version(0,0,0)<0x020542) /* if ver < 2.5.66 */
+				return "epoll not supported on kernels < 2.6";
+			return 0;
+#endif
+		case POLL_SIGIO_RT:
+#ifndef HAVE_SIGIO_RT
+			return "sigio_rt not supported, try re-compiling with"
+				" -DHAVE_SIGIO_RT";
+#else
+			return 0;
+#endif
+		default:
+			return "unknown not supported method";
+	}
+}
+
+
+
+/* picks the poll method used when none is forced, in this order:
+ *  epoll_lt (if compiled in and the kernel is >= 2.5.66),
+ *  sigio_rt (if compiled in), poll */
+enum poll_types choose_poll_method()
+{
+#ifdef HAVE_EPOLL
+	if (get_sys_version(0,0,0)>=0x020542) /* if ver >= 2.5.66 */
+		return POLL_EPOLL_LT; /* or POLL_EPOLL_ET */
+#endif
+#ifdef  HAVE_SIGIO_RT
+	return POLL_SIGIO_RT;
+#else
+	return POLL_POLL;
+#endif
+}
+
+
+
+/* returns the name of poll_method or "invalid poll method" if the value is
+ * not in the [POLL_NONE, POLL_END) range */
+char* poll_method_name(enum poll_types poll_method)
+{
+	if ((poll_method<POLL_NONE) || (poll_method>=POLL_END))
+		return "invalid poll method";
+	return poll_method_str[poll_method];
+}
+
+
+
+
+/* converts a string into a poll_method (the comparison is case insensitive
+ * and the whole string must match)
+ * returns POLL_NONE (0) on error (unknown name or null string), else the
+ * corresponding poll type */
+enum poll_types get_poll_type(char* s)
+{
+	int r;
+	size_t l;
+	
+	if (s==0) return POLL_NONE; /* nothing to look up */
+	l=strlen(s);
+	/* POLL_NONE is not searched for, r==POLL_NONE means "not found" */
+	for (r=POLL_END-1; r>POLL_NONE; r--)
+		if ((strlen(poll_method_str[r])==l) &&
+			(strncasecmp(poll_method_str[r], s, l)==0))
+			break;
+	return (enum poll_types)r; 
+}
+
+
+
+/* initializes the io_wait handle
+ * params:      h - pointer to the io_wait_h that will be initialized
+ *         max_fd - maximum allowed fd number (must be >0), it is also the
+ *                  size of the allocated arrays
+ *    poll_method - poll method (0 for automatic best fit); if the requested
+ *                  method is not supported, an error is logged and the
+ *                  automatically chosen one is used instead
+ * returns 0 on success, -1 on error (everything allocated before the error
+ *  is freed)
+ */
+int init_io_wait(io_wait_h* h, int max_fd, enum poll_types poll_method)
+{
+	char * poll_err;
+	
+	memset(h, 0, sizeof(*h));
+	h->max_fd_no=max_fd;
+#ifdef HAVE_EPOLL
+	h->epfd=-1;
+#endif
+	if (max_fd<=0){
+		LOG(L_CRIT, "BUG: init_io_wait: invalid maximum fd number %d\n",
+					max_fd);
+		goto error;
+	}
+	
+	poll_err=check_poll_method(poll_method);
+	
+	/* set an appropriate poll method */
+	if (poll_err || (poll_method==0)){
+		poll_method=choose_poll_method();
+		if (poll_err){
+			LOG(L_ERR, "ERROR: init_io_wait: %s, using %s instead\n",
+					poll_err, poll_method_str[poll_method]);
+		}else{
+			LOG(L_INFO, "init_io_wait: using %s as the io watch method"
+					" (auto detected)\n", poll_method_str[poll_method]);
+		}
+	}else{
+			LOG(L_INFO, "init_io_wait: using %s io watch method (forced)\n",
+					poll_method_str[poll_method]);
+	}
+
+	
+	h->poll_method=poll_method;
+	
+	/* common stuff, everybody has fd_hash */
+	h->fd_hash=local_malloc(sizeof(*(h->fd_hash))*h->max_fd_no);
+	if (h->fd_hash==0){
+		LOG(L_CRIT, "ERROR: init_io_wait: could not alloc"
+					" fd hashtable (%lu bytes)\n",
+					(unsigned long)(sizeof(*(h->fd_hash))*h->max_fd_no));
+		goto error;
+	}
+	memset((void*)h->fd_hash, 0, sizeof(*(h->fd_hash))*h->max_fd_no);
+	
+	switch(poll_method){
+		case POLL_POLL:
+#ifdef HAVE_SELECT
+		case POLL_SELECT:
+#endif
+#ifdef HAVE_SIGIO_RT
+		case POLL_SIGIO_RT:
+#endif
+			/* all of them keep the watched fds in a pollfd array */
+			h->fd_array=local_malloc(sizeof(*(h->fd_array))*h->max_fd_no);
+			if (h->fd_array==0){
+				LOG(L_CRIT, "ERROR: init_io_wait: could not"
+							" alloc fd array (%lu bytes)\n",
+							(unsigned long)
+								(sizeof(*(h->fd_array))*h->max_fd_no));
+				goto error;
+			}
+			memset((void*)h->fd_array, 0, sizeof(*(h->fd_array))*h->max_fd_no);
+#ifdef HAVE_SIGIO_RT
+			if ((poll_method==POLL_SIGIO_RT) && (init_sigio(h, 0)<0)){
+				LOG(L_CRIT, "ERROR: init_io_wait: sigio init failed\n");
+				goto error;
+			}
+#endif
+#ifdef HAVE_SELECT
+			if ((poll_method==POLL_SELECT) && (init_select(h)<0)){
+				LOG(L_CRIT, "ERROR: init_io_wait: select init failed\n");
+				goto error;
+			}
+#endif
+			
+			break;
+#ifdef HAVE_EPOLL
+		case POLL_EPOLL_LT:
+		case POLL_EPOLL_ET:
+			h->ep_array=local_malloc(sizeof(*(h->ep_array))*h->max_fd_no);
+			if (h->ep_array==0){
+				LOG(L_CRIT, "ERROR: init_io_wait: could not alloc"
+							" epoll array\n");
+				goto error;
+			}
+			memset((void*)h->ep_array, 0, sizeof(*(h->ep_array))*h->max_fd_no);
+			if (init_epoll(h)<0){
+				LOG(L_CRIT, "ERROR: init_io_wait: epoll init failed\n");
+				goto error;
+			}
+			break;
+#endif
+		default:
+			/* poll_method_name() range checks the value, a direct
+			 * poll_method_str[] access could read outside the array */
+			LOG(L_CRIT, "BUG: init_io_wait: unknown/unsupported poll"
+						" method %s (%d)\n",
+						poll_method_name(poll_method), poll_method);
+			goto error;
+	}
+	return 0;
+error:
+	/* free whatever was allocated before the failure; it handles a partially
+	 * initialized handle (null arrays, epfd -1, signo 0) */
+	destroy_io_wait(h);
+	return -1;
+}
+
+
+
+/* destroys everything init_io_wait allocated: first the poll method
+ * specific part, then the fd array and the fd hash; the freed pointers are
+ * set to 0 */
+void destroy_io_wait(io_wait_h* h)
+{
+#ifdef HAVE_EPOLL
+	if ((h->poll_method==POLL_EPOLL_LT) || (h->poll_method==POLL_EPOLL_ET)){
+		destroy_epoll(h);
+		if (h->ep_array){
+			local_free(h->ep_array);
+			h->ep_array=0;
+		}
+	}
+#endif
+#ifdef HAVE_SIGIO_RT
+	if (h->poll_method==POLL_SIGIO_RT)
+		destroy_sigio(h);
+#endif
+	/* nothing method specific to do for the other methods */
+	if (h->fd_array){
+		local_free(h->fd_array);
+		h->fd_array=0;
+	}
+	if (h->fd_hash){
+		local_free(h->fd_hash);
+		h->fd_hash=0;
+	}
+}
+
+
+
+#endif

+ 663 - 0
io_wait.h

@@ -0,0 +1,663 @@
+/* 
+ * $Id$
+ * 
+ * Copyright (C) 2005 iptelorg GmbH
+ *
+ * This file is part of ser, a free SIP server.
+ *
+ * ser is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * For a license to use the ser software under conditions
+ * other than those described here, or to purchase support for this
+ * software, please contact iptel.org by e-mail at the following addresses:
+ *    [email protected]
+ *
+ * ser is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+/*
+ * tcp io wait common stuff used by tcp_main.c & tcp_read.c
+ * All the functions are inline because of speed reasons and because they are
+ * used only from 2 places.
+ * You also have to define:
+ *     int handle_io(struct fd_map* fm, int idx) (see below)
+ *     (this could be trivially replaced by a callback pointer entry attached
+ *      to the io_wait handler if more flexibility rather than performance
+ *      is needed)
+ *      fd_type - define to some enum of your choice and define also
+ *                FD_TYPE_DEFINED (if you don't do it fd_type will be defined
+ *                to int). 0 has a special not set/not init. meaning
+ *                (a lot of sanity checks and the sigio_rt code are based on
+ *                 this assumption)
+ *     local_malloc (defaults to pkg_malloc)
+ *     local_free   (defaults to pkg_free)
+ *  
+ */
+/* 
+ * History:
+ * --------
+ *  2005-06-13  created by andrei
+ */
+
+
+
+#ifndef _io_wait_h
+#define _io_wait_h
+
+#include <errno.h>
+#include <string.h>
+#ifdef HAVE_SIGIO_RT
+#define __USE_GNU /* or else F_SETSIG won't be included */
+#include <sys/types.h> /* recv */
+#include <sys/socket.h> /* recv */
+#include <signal.h> /* sigprocmask, sigwait a.s.o */
+#endif
+#ifdef HAVE_EPOLL
+#include <sys/epoll.h>
+#endif
+#ifdef HAVE_SELECT
+#include <sys/select.h>
+#endif
+#include <sys/poll.h>
+#include <fcntl.h>
+
+#include "dprint.h"
+
+#include "poll_types.h" /* poll_types*/
+#ifdef HAVE_SIGIO_RT
+#include "pt.h" /* mypid() */
+#endif
+
+
+#if 0
+enum fd_types; /* this should be defined from the including file,
+				  see tcp_main.c for an example, 
+				  0 has a special meaning: not used/empty*/
+#endif
+
+#ifndef FD_TYPE_DEFINED
+typedef int fd_type;
+#define FD_TYPE_DEFINED
+#endif
+
+/* maps a fd to some other structure; the entries are kept in the fd_hash
+ * array of the io_wait handler, indexed directly by the fd number */
+struct fd_map{
+	int fd;               /* fd no, -1 after the entry was removed */
+	fd_type type;         /* "data" type, 0 means the entry is not used */
+	void* data;           /* pointer to the corresponding structure */
+};
+
+
+
+/* handler structure, set up by init_io_wait() and released by
+ * destroy_io_wait() */
+struct io_wait_handler{
+#ifdef HAVE_EPOLL
+	struct epoll_event* ep_array; /* events returned by epoll_wait() */
+	int epfd; /* epoll ctrl fd, -1 if not opened */
+#endif
+#ifdef HAVE_SIGIO_RT
+	sigset_t sset; /* signal mask for sigio & the real time signal used */
+	int signo;     /* real time signal used, 0 if none is assigned */
+#endif
+#ifdef HAVE_SELECT
+	fd_set master_set; /* all the fds watched with select */
+	int max_fd_select; /* maximum select used fd */
+#endif
+	/* common stuff for POLL, SIGIO_RT and SELECT
+	 * since poll support is always compiled => this will always be compiled */
+	struct fd_map* fd_hash; /* fd -> fd_map, indexed by the fd number */
+	struct pollfd* fd_array; /* watched fds (not allocated for epoll) */
+	int fd_no; /* number of watched fds, it is also the first free index
+				  in fd_array */
+	int max_fd_no; /* maximum fd no, is also the size of fd_array,
+					  fd_hash and ep_array */
+	enum poll_types poll_method; /* method in use, set by init_io_wait() */
+	int flags; /* NOTE(review): not used by the code in this file */
+};
+
+typedef struct io_wait_handler io_wait_h;
+
+
+/* get the corresponding fd_map structure pointer; fd is used directly as
+ * index in fd_hash and it is not range checked here */
+#define get_fd_map(h, fd)		(&(h)->fd_hash[(fd)])
+/* remove a fd_map structure from the hash; the pointer must be returned
+ * by get_fd_map or hash_fd_map (type 0 marks the entry as not used, the
+ * data pointer is left as it is) */
+#define unhash_fd_map(pfm)	\
+	do{ \
+		(pfm)->type=0 /*F_NONE */; \
+		(pfm)->fd=-1; \
+	}while(0)
+
+/* add a fd_map structure to the fd hash: fills the entry having the index
+ * fd and returns a pointer to it (fd is not range checked here) */
+static inline struct fd_map* hash_fd_map(	io_wait_h* h,
+											int fd,
+											fd_type type,
+											void* data)
+{
+	struct fd_map* entry;
+	
+	entry=&h->fd_hash[fd];
+	entry->fd=fd;
+	entry->type=type;
+	entry->data=data;
+	return entry;
+}
+
+
+#ifdef HAVE_SIGIO_RT
+typedef unsigned int sigio_rtsig_mask_t;
+extern sigset_t _sigio_rtsig_used;
+extern int _sigio_crt_rtsig;
+extern int _sigio_init;
+#endif
+
+
+
+#ifdef HANDLE_IO_INLINE
+/* generic handle io routine, this must be defined in the including file
+ * (faster than registering a callback pointer)
+ *
+ * params:  fm  - pointer to a fd hash entry
+ *          idx - index in the fd_array (or -1 if not known)
+ * return: -1 on error
+ *          0 on EAGAIN or when by some other way it is known that no more 
+ *            io events are queued on the fd (the receive buffer is empty).
+ *            Useful to detect when there are no more io events queued for
+ *            sigio_rt, epoll_et, kqueue.
+ *         >0 on successful read from the fd (when there might be more io
+ *            queued -- the receive buffer might still be non-empty)
+ * The poll, select and epoll wait loops call it again for the same entry
+ * as long as it returns >0 and their repeat parameter is !=0.
+ */
+inline static int handle_io(struct fd_map* fm, int idx);
+#else
+int handle_io(struct fd_map* fm, int idx);
+#endif
+
+
+
+/* generic io_watch_add function
+ * params: h    - io_wait handle
+ *         fd   - fd that will be watched for input, it must be in
+ *                [0, h->max_fd_no) since it is used as index in the fd hash
+ *         type - fd type, saved in the fd hash entry (0 is the "not used"
+ *                marker)
+ *         data - pointer saved in the fd hash entry
+ * returns 0 on success, -1 on error (in this case the fd is not left in
+ *  the fd hash)
+ *
+ * this version should be faster than pointers to poll_method specific
+ * functions (it avoids functions calls, the overhead being only an extra
+ *  switch())*/
+inline static int io_watch_add(	io_wait_h* h,
+								int fd,
+								fd_type type,
+								void* data)
+{
+
+	/* helper macros */
+	/* fill the first free fd_array entry (activated later by fd_no++) */
+#define fd_array_setup \
+	do{ \
+		h->fd_array[h->fd_no].fd=fd; \
+		h->fd_array[h->fd_no].events=POLLIN; /* useless for select */ \
+		h->fd_array[h->fd_no].revents=0;     /* useless for select */ \
+	}while(0)
+	
+	/* add f to the fd status flags, jumps to error on failure */
+#define set_fd_flags(f) \
+	do{ \
+			flags=fcntl(fd, F_GETFL); \
+			if (flags==-1){ \
+				LOG(L_ERR, "ERROR: io_watch_add: fnctl: GETFL failed:" \
+						" %s [%d]\n", strerror(errno), errno); \
+				goto error; \
+			} \
+			if (fcntl(fd, F_SETFL, flags|(f))==-1){ \
+				LOG(L_ERR, "ERROR: io_watch_add: fnctl: SETFL" \
+							" failed: %s [%d]\n", strerror(errno), errno); \
+				goto error; \
+			} \
+	}while(0)
+	
+	
+	struct fd_map* e;
+	int flags;
+	int hashed; /* !=0 after fd was added to the fd hash by this call */
+#ifdef HAVE_EPOLL
+	int n;
+	struct epoll_event ep_event;
+#endif
+#ifdef HAVE_SIGIO_RT
+	static char buf[65536];
+#endif
+	
+	e=0;
+	hashed=0;
+	if (fd==-1){
+		LOG(L_CRIT, "BUG: io_watch_add: fd is -1!\n");
+		goto error;
+	}
+	/* fd is used as index in fd_hash => make sure it fits */
+	if ((fd<0) || (fd>=h->max_fd_no)){
+		LOG(L_CRIT, "BUG: io_watch_add: invalid fd %d, not in [0, %d)\n",
+					fd, h->max_fd_no);
+		goto error;
+	}
+	/* add it to the poll fd array */
+	if (h->fd_no>=h->max_fd_no){
+		LOG(L_CRIT, "ERROR: io_watch_add: maximum fd number exceeded:"
+				" %d/%d\n", h->fd_no, h->max_fd_no);
+		goto error;
+	}
+	DBG("DBG: io_watch_add(%p, %d, %d, %p), fd_no=%d\n",
+			h, fd, type, data, h->fd_no);
+	/*  hash sanity check */
+	e=get_fd_map(h, fd);
+	if (e && (e->type!=0 /*F_NONE*/)){
+		LOG(L_ERR, "ERROR: io_watch_add: trying to overwrite entry %d"
+				" in the hash(%d, %d, %p) with (%d, %d, %p)\n",
+				fd, e->fd, e->type, e->data, fd, type, data);
+		goto error;
+	}
+	
+	if ((e=hash_fd_map(h, fd, type, data))==0){
+		LOG(L_ERR, "ERROR: io_watch_add: failed to hash the fd %d\n", fd);
+		goto error;
+	}
+	hashed=1; /* from now on an error must undo the hashing */
+	switch(h->poll_method){ /* faster than pointer to functions */
+		case POLL_POLL:
+			fd_array_setup;
+			set_fd_flags(O_NONBLOCK);
+			break;
+#ifdef HAVE_SELECT
+		case POLL_SELECT:
+			fd_array_setup;
+			FD_SET(fd, &h->master_set);
+			if (h->max_fd_select<fd) h->max_fd_select=fd;
+			break;
+#endif
+#ifdef HAVE_SIGIO_RT
+		case POLL_SIGIO_RT:
+			fd_array_setup;
+			/* set async & signal */
+			if (fcntl(fd, F_SETOWN, my_pid())==-1){
+				LOG(L_ERR, "ERROR: io_watch_add: fnctl: SETOWN"
+				" failed: %s [%d]\n", strerror(errno), errno);
+				goto error;
+			}
+			if (fcntl(fd, F_SETSIG, h->signo)==-1){
+				LOG(L_ERR, "ERROR: io_watch_add: fnctl: SETSIG"
+					" failed: %s [%d]\n", strerror(errno), errno);
+				goto error;
+			}
+			/* set both non-blocking and async */
+			set_fd_flags(O_ASYNC| O_NONBLOCK);
+#ifdef EXTRA_DEBUG
+			DBG("io_watch_add: sigio_rt on f %d, signal %d to pid %d\n",
+					fd,  h->signo, my_pid());
+#endif
+			/* empty socket receive buffer, if buffer is already full
+			 * (e.g. early media), no more space to put packets
+			 * => no more signals are ever generated -- andrei
+			 * NOTE(review): whatever is read here is thrown away */
+			while(recv(fd, buf, sizeof(buf), 0)>=0);
+			break;
+#endif
+#ifdef HAVE_EPOLL
+		case POLL_EPOLL_LT:
+			ep_event.events=EPOLLIN;
+			ep_event.data.ptr=e; /* handed back by epoll_wait() */
+again1:
+			n=epoll_ctl(h->epfd, EPOLL_CTL_ADD, fd, &ep_event);
+			if (n==-1){
+				if (errno==EAGAIN) goto again1;
+				LOG(L_ERR, "ERROR: io_watch_add: epoll_ctl failed: %s [%d]\n",
+					strerror(errno), errno);
+				goto error;
+			}
+			break;
+		case POLL_EPOLL_ET:
+			set_fd_flags(O_NONBLOCK);
+			ep_event.events=EPOLLIN|EPOLLET;
+			ep_event.data.ptr=e; /* handed back by epoll_wait() */
+again2:
+			n=epoll_ctl(h->epfd, EPOLL_CTL_ADD, fd, &ep_event);
+			if (n==-1){
+				if (errno==EAGAIN) goto again2;
+				LOG(L_ERR, "ERROR: io_watch_add: epoll_ctl failed: %s [%d]\n",
+					strerror(errno), errno);
+				goto error;
+			}
+			break;
+#endif
+		default:
+			LOG(L_CRIT, "BUG: io_watch_add: no support for poll method "
+					" %s (%d)\n", poll_method_str[h->poll_method],
+					h->poll_method);
+			goto error;
+	}
+	
+	h->fd_no++; /* "activate" changes, for epoll it
+				   has only informative value */
+	return 0;
+error:
+	/* don't leave a hash entry for a fd that is not watched, it would
+	 * make any later io_watch_add() for the same fd fail */
+	if (hashed) unhash_fd_map(e);
+	return -1;
+#undef fd_array_setup
+#undef set_fd_flags 
+}
+
+
+
+/* stops watching a fd
+ * parameters: h   - io_wait handle
+ *             fd  - fd to be removed, must be in [0, h->max_fd_no)
+ *             idx - index in the fd_array
+ * if index==-1, the fd_array will be searched for the corresponding fd
+ * entry (slower but unavoidable in some cases)
+ * index is not used (no fd_array) for epoll, /dev/poll and kqueue
+ * returns 0 if ok, -1 on error (bad fd, fd not watched, fd not found in
+ *  the fd_array or epoll_ctl failure) */
+inline static int io_watch_del(io_wait_h* h, int fd, int idx)
+{
+	
+	/* remove the fd from fd_array by moving the following entries one
+	 * position down; jumps to error if the fd is not in the array (fd_no
+	 * must not be decremented in this case) */
+#define fix_fd_array \
+	do{\
+			if (idx==-1){ \
+				/* fix idx if -1 and needed */ \
+				for (idx=0; (idx<h->fd_no) && \
+							(h->fd_array[idx].fd!=fd); idx++); \
+			} \
+			if (idx<h->fd_no){ \
+				memmove(&h->fd_array[idx], &h->fd_array[idx+1], \
+					(h->fd_no-(idx+1))*sizeof(*(h->fd_array))); \
+			}else{ \
+				LOG(L_CRIT, "BUG: io_watch_del: fd %d not found in the" \
+						" fd array (idx=%d, fd_no=%d)\n", \
+						fd, idx, h->fd_no); \
+				goto error; \
+			} \
+	}while(0)
+	
+	struct fd_map* e;
+#ifdef HAVE_EPOLL
+	int n;
+	struct epoll_event ep_event;
+#endif
+	
+	if ((fd<0) || (fd>=h->max_fd_no)){
+		LOG(L_CRIT, "BUG: io_watch_del: invalid fd %d, not in [0, %d) \n",
+						fd, h->max_fd_no);
+		goto error;
+	}
+	DBG("DBG: io_watch_del (%p, %d, %d) fd_no=%d called\n",
+			h, fd, idx, h->fd_no);
+	e=get_fd_map(h, fd);
+	/* more sanity checks */
+	if (e==0){
+		LOG(L_CRIT, "BUG: io_watch_del: no corresponding hash entry for %d\n",
+					fd);
+		goto error;
+	}
+	if (e->type==0 /*F_NONE*/){
+		LOG(L_ERR, "ERROR: io_watch_del: trying to delete already erased"
+				" entry %d in the hash(%d, %d, %p) )\n",
+				fd, e->fd, e->type, e->data);
+		goto error;
+	}
+	
+	unhash_fd_map(e);
+	
+	switch(h->poll_method){
+		case POLL_POLL:
+			fix_fd_array;
+			break;
+#ifdef HAVE_SELECT
+		case POLL_SELECT:
+			fix_fd_array;
+			FD_CLR(fd, &h->master_set);
+			if (h->max_fd_select && (h->max_fd_select==fd))
+				/* we don't know the prev. max, so we just decrement it */
+				h->max_fd_select--; 
+			break;
+#endif
+#ifdef HAVE_SIGIO_RT
+		case POLL_SIGIO_RT:
+			fix_fd_array;
+			/* FIXME: re-set ASYNC? (not needed if the fd is/will be closed
+			 *        but might cause problems if the fd is "moved")
+			 *        update: probably not needed, the fd_map type!=0
+			 *        check should catch old queued signals or in-transit fd
+			 *        (so making another syscall to reset ASYNC is not 
+			 *         necessary)*/
+			break;
+#endif
+#ifdef HAVE_EPOLL
+		case POLL_EPOLL_LT:
+		case POLL_EPOLL_ET:
+			/* ep_event is only a place holder here, its contents is not
+			 * set */
+			n=epoll_ctl(h->epfd, EPOLL_CTL_DEL, fd, &ep_event);
+			if (n==-1){
+				LOG(L_ERR, "ERROR: io_watch_del: removing fd from"
+					" epoll list failed: %s [%d]\n", strerror(errno), errno);
+				goto error;
+			}
+			break;
+#endif
+		default:
+			LOG(L_CRIT, "BUG: io_watch_del: no support for poll method "
+					" %s (%d)\n", poll_method_str[h->poll_method], 
+					h->poll_method);
+			goto error;
+	}
+	h->fd_no--;
+	return 0;
+error:
+	return -1;
+#undef fix_fd_array
+}
+
+
+
+/* io_wait_loop_x style function 
+ * wait for io using poll() and call handle_io() for each fd_array entry
+ * that reported POLLIN, POLLERR or POLLHUP
+ * params: h      - io_wait handle
+ *         t      - timeout in s
+ *         repeat - if !=0 handle_io will be called until it returns <=0
+ * returns: the poll() return value (number of fds with events, 0 on
+ *          timeout, -1 on error)
+ */
+inline static int io_wait_loop_poll(io_wait_h* h, int t, int repeat)
+{
+	int pending; /* reported events not handled yet */
+	int i;
+	int ret;
+	struct pollfd* pf;
+	
+	do{
+		ret=pending=poll(h->fd_array, h->fd_no, t*1000);
+	}while((pending==-1) && (errno==EINTR)); /* signal, ignore it */
+	if (pending==-1){
+		LOG(L_ERR, "ERROR:io_wait_loop_poll: poll: %s [%d]\n",
+				strerror(errno), errno);
+		return ret;
+	}
+	for (i=0; (i<h->fd_no) && pending; i++){
+		pf=&h->fd_array[i];
+		if (!(pf->revents & (POLLIN|POLLERR|POLLHUP)))
+			continue;
+		pending--;
+		/* sanity checks */
+		if ((pf->fd >= h->max_fd_no)|| (pf->fd < 0)){
+			LOG(L_CRIT, "BUG: io_wait_loop_poll: bad fd %d "
+					"(no in the 0 - %d range)\n",
+					pf->fd, h->max_fd_no);
+			/* try to continue anyway */
+			pf->events=0; /* clear the events */
+			continue;
+		}
+		/* the fd is read again before each call: handle_io() gets the
+		 * entry that is at index i at that moment */
+		while((handle_io(get_fd_map(h, pf->fd), i) > 0) && repeat);
+	}
+	return ret;
+}
+
+
+
+#ifdef HAVE_SELECT
+/* wait for io using select and call handle_io() for each watched fd that
+ * is ready for reading
+ * params: h      - io_wait handle
+ *         t      - timeout in s
+ *         repeat - if !=0 handle_io will be called until it returns <=0
+ * returns: the select() return value (number of ready fds, 0 on timeout,
+ *          -1 on error)
+ */
+inline static int io_wait_loop_select(io_wait_h* h, int t, int repeat)
+{
+	fd_set sel_set;
+	int n, ret;
+	struct timeval timeout;
+	int r;
+	
+again:
+		/* select() changes both the set and the timeout => set them
+		 * again before each call */
+		sel_set=h->master_set;
+		timeout.tv_sec=t;
+		timeout.tv_usec=0;
+		ret=n=select(h->max_fd_select+1, &sel_set, 0, 0, &timeout);
+		if (n<0){
+			if (errno==EINTR) goto again; /* just a signal */
+			LOG(L_ERR, "ERROR: io_wait_loop_select: select: %s [%d]\n",
+					strerror(errno), errno);
+			n=0;
+			/* continue */
+		}
+		/* use poll fd array; only the first fd_no entries are in use, the
+		 * rest (up to max_fd_no) are zeroed or stale and must not be
+		 * looked at */
+		for(r=0; (r<h->fd_no) && n; r++){
+			if (FD_ISSET(h->fd_array[r].fd, &sel_set)){
+				while((handle_io(get_fd_map(h, h->fd_array[r].fd), r)>0)
+						&& repeat);
+				n--;
+			}
+		}
+	return ret;
+}
+#endif
+
+
+
+#ifdef HAVE_EPOLL
+/* wait for io using epoll and call handle_io() for each returned event
+ * (the fd_map pointer is taken from the event data, the index is unknown)
+ * params: h      - io_wait handle
+ *         t      - timeout in s
+ *         repeat - if !=0 handle_io will be called until it returns <=0
+ * returns: the epoll_wait() return value (number of events, -1 on error)
+ */
+inline static int io_wait_loop_epoll(io_wait_h* h, int t, int repeat)
+{
+	int ev_no;
+	int i;
+	struct fd_map* fm;
+	
+	do{
+		ev_no=epoll_wait(h->epfd, h->ep_array, h->fd_no, t*1000);
+	}while((ev_no==-1) && (errno==EINTR)); /* signal, ignore it */
+	if (ev_no==-1){
+		LOG(L_ERR, "ERROR:io_wait_loop_epoll_et: epoll_wait:"
+				" %s [%d]\n", strerror(errno), errno);
+		return ev_no;
+	}
+	for (i=0; i<ev_no; i++){
+		fm=(struct fd_map*)h->ep_array[i].data.ptr;
+		while((handle_io(fm, -1)>0) && repeat);
+	}
+	return ev_no;
+}
+#endif
+
+
+
+#ifdef HAVE_SIGIO_RT
+/* sigio rt version has no repeat (it doesn't make sense)
+ * waits for one of the signals in h->sset and calls handle_io() for the fd
+ * that generated it; if SIGIO itself is received (signal queue overflow),
+ * it resets the rt signal and makes one pass with poll()
+ * params: h - io_wait handle
+ *         t - timeout in s
+ * returns: 1 if a signal was processed, 0 on timeout, -1 on error and the
+ *          io_wait_loop_poll() return value after a queue overflow
+ */
+inline static int io_wait_loop_sigio_rt(io_wait_h* h, int t)
+{
+	int n;
+	int ret;
+	struct timespec ts;
+	siginfo_t siginfo;
+	int sigio_band;
+	int sigio_fd;
+	struct fd_map* fm;
+	
+	
+	ret=1; /* 1 event per call normally */
+	ts.tv_sec=t;
+	ts.tv_nsec=0;
+	if (!sigismember(&h->sset, h->signo) || !sigismember(&h->sset, SIGIO)){
+		LOG(L_CRIT, "BUG: io_wait_loop_sigio_rt: the signal mask"
+				" is not properly set!\n");
+		goto error;
+	}
+
+again:
+	n=sigtimedwait(&h->sset, &siginfo, &ts);
+	if (n==-1){
+		if (errno==EINTR) goto again; /* some other signal, ignore it */
+		else if (errno==EAGAIN){ /* timeout */
+			ret=0;
+			goto end;
+		}else{
+			LOG(L_ERR, "ERROR: io_wait_loop_sigio_rt: sigtimed_wait"
+					" %s [%d]\n", strerror(errno), errno);
+			goto error;
+		}
+	}
+	if (n!=SIGIO){
+#ifdef SIGINFO64_WORKARROUND
+		/* on linux siginfo.si_band is defined as long in userspace
+		 * and as int kernel => on 64 bits things will break!
+		 * (si_band will include si_fd, and si_fd will contain
+		 *  garbage)
+		 *  see /usr/src/linux/include/asm-generic/siginfo.h and
+		 *      /usr/include/bits/siginfo.h
+		 * -- andrei */
+		if (sizeof(siginfo.si_band)>sizeof(int)){
+			sigio_band=*((int*)&siginfo.si_band);
+			sigio_fd=*(((int*)&siginfo.si_band)+1);
+		}else
+#endif
+		{
+			sigio_band=siginfo.si_band;
+			sigio_fd=siginfo.si_fd;
+		}
+		if (siginfo.si_code==SI_SIGIO){
+			/* old style, we don't know the event (linux 2.2.?) */
+			LOG(L_WARN, "WARNING: io_wait_loop_sigio_rt: old style sigio"
+					" interface\n");
+			/* the fd comes from the signal info and it is used as index
+			 * in the fd hash => never use it unchecked */
+			if ((sigio_fd<0) || (sigio_fd>=h->max_fd_no)){
+				LOG(L_CRIT, "BUG: io_wait_loop_sigio_rt: bad fd %d "
+						"(not in the 0 - %d range)\n",
+						sigio_fd, h->max_fd_no);
+				goto end; /* ignore the signal */
+			}
+			fm=get_fd_map(h, sigio_fd);
+			/* we can have queued signals generated by fds not watched
+			 * any more, or by fds in transition, to a child => ignore them*/
+			if (fm->type)
+				handle_io(fm, -1);
+		}else{
+#ifdef EXTRA_DEBUG
+			DBG("io_wait_loop_sigio_rt: siginfo: signal=%d (%d),"
+					" si_code=%d, si_band=0x%x,"
+					" si_fd=%d\n",
+					siginfo.si_signo, n, siginfo.si_code, 
+					(unsigned)sigio_band,
+					sigio_fd);
+#endif
+			if (sigio_band&(POLL_IN|POLL_ERR)){
+				/* same range check as above */
+				if ((sigio_fd<0) || (sigio_fd>=h->max_fd_no)){
+					LOG(L_CRIT, "BUG: io_wait_loop_sigio_rt: bad fd %d "
+							"(not in the 0 - %d range)\n",
+							sigio_fd, h->max_fd_no);
+					goto end; /* ignore the signal */
+				}
+				fm=get_fd_map(h, sigio_fd);
+				/* we can have queued signals generated by fds not watched
+			 	 * any more, or by fds in transition, to a child 
+				 * => ignore them */
+				if (fm->type)
+					handle_io(fm, -1);
+			}
+		}
+	}else{
+		/* signal queue overflow 
+		 * TODO: increase signal queue size: 2.4x /proc/.., 2.6x -rlimits */
+		LOG(L_WARN, "WARNING: io_wait_loop_sigio_rt: signal queue overflowed"
+					"- falling back to poll\n");
+		/* clear real-time signal queue
+		 * both SIG_IGN and SIG_DFL are needed , it doesn't work
+		 * only with SIG_DFL  */
+		if (signal(h->signo, SIG_IGN)==SIG_ERR){
+			LOG(L_CRIT, "BUG: do_poll: couldn't reset signal to IGN\n");
+		}
+		
+		if (signal(h->signo, SIG_DFL)==SIG_ERR){
+			LOG(L_CRIT, "BUG: do_poll: couldn't reset signal to DFL\n");
+		}
+		/* falling back to normal poll (negative timeout, handle_io()
+		 * repeated for each fd until it returns <=0) */
+		ret=io_wait_loop_poll(h, -1, 1);
+	}
+end:
+	return ret;
+error:
+	return -1;
+}
+#endif
+
+
+
+/* init */
+
+
+/* initializes the static vars/arrays
+ * params:      h - pointer to the io_wait_h that will be initialized
+ *         max_fd - maximum allowed fd number
+ *         poll_m - poll method (0 for automatic best fit)
+ */
+int init_io_wait(io_wait_h* h, int max_fd, enum poll_types poll_method);
+
+/* destroys everything init_io_wait allocated */
+void destroy_io_wait(io_wait_h* h);
+
+
+#endif

+ 34 - 2
main.c

@@ -56,6 +56,9 @@
  *               crashed childvwhich still holds the lock  (andrei)
  *  2004-12-02  removed -p, extended -l to support [proto:]address[:port],
  *               added parse_phostport, parse_proto (andrei)
+ *  2005-06-16  always record the pid in pt[process_no].pid twice: once in the
+ *               parent & once in the child to avoid a short window when one
+ *               of them might use it "unset" (andrei)
  */
 
 
@@ -111,6 +114,7 @@
 #include "script_cb.h"
 #include "ut.h"
 #ifdef USE_TCP
+#include "poll_types.h"
 #include "tcp_init.h"
 #ifdef USE_TLS
 #include "tls/tls_init.h"
@@ -155,7 +159,8 @@ Options:\n\
     -E           Log to stderr\n"
 #ifdef USE_TCP
 "    -T           Disable tcp\n\
-    -N           Number of tcp child processes (default: equal to `-n`)\n"
+    -N           Number of tcp child processes (default: equal to `-n`)\n\
+    -W           poll method\n"
 #endif
 "    -V           Version number\n\
     -h           This help message\n\
@@ -190,6 +195,9 @@ void print_ct_constants()
 			" MAX_URI_SIZE %d, BUF_SIZE %d\n",
 		MAX_RECV_BUFFER_SIZE, MAX_LISTEN, MAX_URI_SIZE, 
 		BUF_SIZE );
+#ifdef USE_TCP
+	printf("poll method support: %s.\n", poll_support);
+#endif
 }
 
 /* debugging function */
@@ -862,6 +870,9 @@ int main_loop()
 				
 				if (pid==0){
 					/* child */
+					/* record pid twice to avoid the child using it, before
+					 * parent gets a chance to set it*/
+					pt[process_no].pid=getpid();
 					/* timer!*/
 					/* process_bit = 0; */
 					if (init_child(PROC_TIMER) < 0) {
@@ -1021,6 +1032,9 @@ int main_loop()
 						unix_tcp_sock=sockfd[1];
 					}
 #endif
+					/* record pid twice to avoid the child using it, before
+					 * parent gets a chance to set it*/
+					pt[process_no].pid=getpid();
 					bind_address=si; /* shortcut */
 					if (init_child(i + 1) < 0) {
 						LOG(L_ERR, "init_child failed\n");
@@ -1084,6 +1098,9 @@ int main_loop()
 				unix_tcp_sock=sockfd[1];
 			}
 #endif
+			/* record pid twice to avoid the child using it, before
+			 * parent gets a chance to set it*/
+			pt[process_no].pid=getpid();
 			if (init_child(PROC_TIMER) < 0) {
 				LOG(L_ERR, "timer: init_child failed\n");
 				goto error;
@@ -1120,6 +1137,9 @@ int main_loop()
 			}else if (pid==0){
 				/* child */
 				/* is_main=0; */
+				/* record pid twice to avoid the child using it, before
+				 * parent gets a chance to set it*/
+				pt[process_no].pid=getpid();
 				if (init_child(PROC_TCP_MAIN) < 0) {
 					LOG(L_ERR, "tcp_main: error in init_child\n");
 					goto error;
@@ -1215,7 +1235,7 @@ int main(int argc, char** argv)
 #ifdef STATS
 	"s:"
 #endif
-	"f:cm:b:l:n:N:rRvdDETVhw:t:u:g:P:G:i:x:";
+	"f:cm:b:l:n:N:rRvdDETVhw:t:u:g:P:G:i:x:W:";
 	
 	while((c=getopt(argc,argv,options))!=-1){
 		switch(c){
@@ -1306,6 +1326,18 @@ int main(int argc, char** argv)
 					}
 #else
 					fprintf(stderr,"WARNING: tcp support not compiled in\n");
+#endif
+					break;
+			case 'W':
+#ifdef USE_TCP
+					tcp_poll_method=get_poll_type(optarg);
+					if (tcp_poll_method==POLL_NONE){
+						fprintf(stderr, "bad poll method name: -W %s\ntry "
+										"one of %s.\n", optarg, poll_support);
+						goto error;
+					}
+#else
+					fprintf(stderr,"WARNING: tcp support not compiled in\n");
 #endif
 					break;
 			case 'V':

+ 2 - 2
parser/msg_parser.h

@@ -309,13 +309,13 @@ inline static int char_msg_val( struct sip_msg *msg, char *cv )
 inline static char* get_body(struct sip_msg *msg)
 {
 	int offset;
-	int len;
+	unsigned int len;
 
 	if ( parse_headers(msg, HDR_EOH_F, 0)==-1 )
 		return 0;
 
 	if (msg->unparsed){
-		len=(int)(msg->unparsed-msg->buf);
+		len=(unsigned int)(msg->unparsed-msg->buf);
 	}else return 0;
 	if ((len+2<=msg->len) && (strncmp(CRLF,msg->unparsed,CRLF_LEN)==0) )
 		offset = CRLF_LEN;

+ 49 - 10
pass_fd.c

@@ -31,6 +31,8 @@
   *  2003-02-20  added solaris support (! HAVE_MSGHDR_MSG_CONTROL) (andrei)
   *  2003-11-03  added send_all, recv_all  and updated send/get_fd
   *               to handle signals  (andrei)
+  *  2005-06-13  added flags to recv_all & receive_fd, to allow full blocking
+  *              or semi-nonblocking mode (andrei)
   */
 
 #ifdef USE_TCP
@@ -47,29 +49,55 @@
 
 
 /* receive all the data or returns error (handles EINTR etc.)
+ * params: socket
+ *         data     - buffer for the results
+ *         data_len - number of bytes to read into data
+ *         flags    - recv flags for the first recv (see recv(2)), only
+ *                    0, MSG_WAITALL and MSG_DONTWAIT make sense
+ * if flags is set to MSG_DONTWAIT (or to 0 and the socket fd is non-blocking),
+ * and if no data is queued on the fd, recv_all will not wait (it will 
+ * return error and set errno to EAGAIN/EWOULDBLOCK). However if even 1 byte
+ *  is queued, the call will block until the whole data_len was read or an
+ *  error or eof occurred ("semi-nonblocking" behaviour,  some tcp code
+ *   counts on it).
+ * if flags is set to MSG_WAITALL it will block even if no byte is available.
+ *  
  * returns: bytes read or error (<0)
  * can return < data_len if EOF */
-int recv_all(int socket, void* data, int data_len)
+int recv_all(int socket, void* data, int data_len, int flags)
 {
 	int b_read;
 	int n;
 	
 	b_read=0;
-	do{
+again:
+	n=recv(socket, (char*)data, data_len, flags);
+	if (n<0){
+		/* error */
+		if (errno==EINTR) goto again; /* signal, try again */
+		/* on EAGAIN just return (let the caller know) */
+		if ((errno==EAGAIN)||(errno==EWOULDBLOCK)) return n;
+			LOG(L_CRIT, "ERROR: recv_all: 1st recv on %d failed: %s\n",
+					socket, strerror(errno));
+			return n;
+	}
+	b_read+=n;
+	while( (b_read!=data_len) && (n)){
 		n=recv(socket, (char*)data+b_read, data_len-b_read, MSG_WAITALL);
 		if (n<0){
 			/* error */
 			if (errno==EINTR) continue; /* signal, try again */
-			LOG(L_CRIT, "ERROR: recv_all: recv on %d failed: %s\n",
+			LOG(L_CRIT, "ERROR: recv_all: 2nd recv on %d failed: %s\n",
 					socket, strerror(errno));
 			return n;
 		}
 		b_read+=n;
-	}while( (b_read!=data_len) && (n));
+	}
 	return b_read;
 }
 
 
+
 /* sends all data (takes care of signals) (assumes blocking fd)
  * returns number of bytes sent or < 0 for an error */
 int send_all(int socket, void* data, int data_len)
@@ -136,7 +164,15 @@ again:
 
 
 
-int receive_fd(int unix_socket, void* data, int data_len, int* fd)
+/* receives a fd and data_len data
+ * params: unix_socket 
+ *         data
+ *         data_len
+ *         fd         - will be set to the passed fd value or -1 if no fd
+ *                      was passed
+ *         flags      - 0, MSG_DONTWAIT, MSG_WAITALL; same as recv_all flags
+ * returns: bytes read on success, -1 on error (and sets errno) */
+int receive_fd(int unix_socket, void* data, int data_len, int* fd, int flags)
 {
 	struct msghdr msg;
 	struct iovec iov[1];
@@ -166,9 +202,10 @@ int receive_fd(int unix_socket, void* data, int data_len, int* fd)
 	msg.msg_iovlen=1;
 	
 again:
-	ret=recvmsg(unix_socket, &msg, MSG_WAITALL);
+	ret=recvmsg(unix_socket, &msg, flags);
 	if (ret<0){
 		if (errno==EINTR) goto again;
+		if ((errno==EAGAIN)||(errno==EWOULDBLOCK)) goto error;
 		LOG(L_CRIT, "ERROR: receive_fd: recvmsg on %d failed: %s\n",
 				unix_socket, strerror(errno));
 		goto error;
@@ -181,7 +218,8 @@ again:
 	if (ret<data_len){
 		LOG(L_WARN, "WARNING: receive_fd: too few bytes read (%d from %d)"
 				    "trying to fix...\n", ret, data_len);
-		n=recv_all(unix_socket, (char*)data+ret, data_len-ret);
+		/* blocking recv_all */
+		n=recv_all(unix_socket, (char*)data+ret, data_len-ret, MSG_WAITALL);
 		if (n>=0) ret+=n;
 		else{
 			ret=n;
@@ -204,8 +242,9 @@ again:
 		}
 		*fd=*((int*) CMSG_DATA(cmsg));
 	}else{
+		/*
 		LOG(L_ERR, "ERROR: receive_fd: no descriptor passed, cmsg=%p,"
-				"len=%d\n", cmsg, (unsigned)cmsg->cmsg_len);
+				"len=%d\n", cmsg, (unsigned)cmsg->cmsg_len); */
 		*fd=-1;
 		/* it's not really an error */
 	}
@@ -213,8 +252,8 @@ again:
 	if (msg.msg_accrightslen==sizeof(int)){
 		*fd=new_fd;
 	}else{
-		LOG(L_ERR, "ERROR: receive_fd: no descriptor passed,"
-				" accrightslen=%d\n", msg.msg_accrightslen);
+		/*LOG(L_ERR, "ERROR: receive_fd: no descriptor passed,"
+				" accrightslen=%d\n", msg.msg_accrightslen); */
 		*fd=-1;
 	}
 #endif

+ 2 - 2
pass_fd.h

@@ -30,9 +30,9 @@
 
 
 int send_fd(int unix_socket, void* data, int data_len, int fd);
-int receive_fd(int unix_socket, void* data, int data_len, int* fd);
+int receive_fd(int unix_socket, void* data, int data_len, int* fd, int flags);
 
-int recv_all(int socket, void* data, int data_len);
+int recv_all(int socket, void* data, int data_len, int flags);
 int send_all(int socket, void* data, int data_len);
 
 

+ 61 - 0
poll_types.h

@@ -0,0 +1,61 @@
+/* 
+ * $Id$
+ * 
+ * Copyright (C) 2005 iptelorg GmbH
+ *
+ * This file is part of ser, a free SIP server.
+ *
+ * ser is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * For a license to use the ser software under conditions
+ * other than those described here, or to purchase support for this
+ * software, please contact iptel.org by e-mail at the following addresses:
+ *    [email protected]
+ *
+ * ser is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+/* 
+ * io wait poll methods (enum, strings, related functions)
+ * see io_wait.h for more details
+ * 
+ *  
+ */
+/* 
+ * History:
+ * --------
+ *  2005-06-15  created by andrei
+ */
+
+
+#ifndef _poll_types_h
+#define _poll_types_h
+
+enum poll_types { POLL_NONE, POLL_POLL, POLL_EPOLL_LT, POLL_EPOLL_ET,
+					POLL_SIGIO_RT, POLL_SELECT, POLL_KQUEUE, POLL_DEVPOLL,
+					POLL_END};
+
+/* all the functions and vars are defined in io_wait.c */
+
+extern char* poll_method_str[POLL_END];
+extern char* poll_support; 
+
+
+enum poll_types choose_poll_method();
+
+/* returns 0 on success, and an error message on error */
+char* check_poll_method(enum poll_types poll_method);
+
+char* poll_method_name(enum poll_types poll_method);
+enum poll_types get_poll_type(char* s);
+
+#endif

+ 1 - 0
tcp_conn.h

@@ -50,6 +50,7 @@
 									   timeout */
 #define DEFAULT_TCP_CONNECT_TIMEOUT 10 /* if a connect doesn't complete in this
 										  time, timeout */
+#define DEFAULT_TCP_MAX_FD_NO 2048 /* maximum fd number */
 #define TCP_CHILD_TIMEOUT 5 /* after 5 seconds, the child "returns" 
 							 the connection to the tcp master process */
 #define TCP_MAIN_SELECT_TIMEOUT 5 /* how often "tcp main" checks for timeout*/

文件差异内容过多而无法显示
+ 596 - 348
tcp_main.c


+ 1 - 1
tcp_read.c

@@ -588,7 +588,7 @@ void tcp_receive_loop(int unix_sock)
 			if (FD_ISSET(unix_sock, &sel_set)){
 				nfds--;
 				/* a new conn from "main" */
-				n=receive_fd(unix_sock, &con, sizeof(con), &s);
+				n=receive_fd(unix_sock, &con, sizeof(con), &s, 0);
 				if (n<0){
 					if (errno == EWOULDBLOCK || errno == EAGAIN ||
 							errno == EINTR){

+ 3 - 0
unixsock_server.c

@@ -605,6 +605,9 @@ int init_unixsock_children(void)
 				unix_tcp_sock=sockfd[1];
 			}
 #endif
+			/* record pid twice to avoid the child using it, before
+			 * parent gets a chance to set it*/
+			pt[process_no].pid=getpid();
 			if (init_child(PROC_UNIXSOCK) < 0) {
 				LOG(L_ERR, "init_unixsock_server: Error in "
 				    "init_child\n");

部分文件因为文件数量过多而无法显示