Browse Source

- merge from testing-0.8.12-r0:
- tcp updates (lots)
- makefile mips support
- tm timer workarround (present also in stable), t_relay_tls changed
to t_relay_to_tls
- udp_flood sleep & throttle support

Andrei Pelinescu-Onciul 22 years ago
parent
commit
06aaa54ff3
12 changed files with 404 additions and 130 deletions
  1. 47 11
      Makefile.defs
  2. 5 5
      etc/ser.cfg
  3. 2 2
      fifo_server.c
  4. 5 0
      modules/tm/timer.c
  5. 1 1
      modules/tm/tm_load.h
  6. 83 10
      pass_fd.c
  7. 2 0
      pass_fd.h
  8. 4 2
      tcp_conn.h
  9. 199 80
      tcp_main.c
  10. 25 17
      tcp_read.c
  11. 1 1
      test/test.cfg
  12. 30 1
      test/udp_flood.c

+ 47 - 11
Makefile.defs

@@ -29,6 +29,7 @@
 #  2003-09-25  added -pthread into LIBS when compiling on FreeBSD/alpha
 #              and other FreeBSD arches for which no fast locking assembly
 #              code exists (sobomax)
+#  2003-11-08  mips1 support introduced (andrei)
 
 
 # check if already included/exported
@@ -43,7 +44,7 @@ export makefile_defs
 VERSION = 0
 PATCHLEVEL = 8
 SUBLEVEL =   12
-EXTRAVERSION = dev-22-tcp_aliases
+EXTRAVERSION = -dev-23-merged
 
 RELEASE=$(VERSION).$(PATCHLEVEL).$(SUBLEVEL)$(EXTRAVERSION)
 OS = $(shell uname -s | sed -e s/SunOS/solaris/ | tr "[A-Z]" "[a-z]")
@@ -51,7 +52,7 @@ ARCH = $(shell uname -m |sed -e s/i.86/i386/ -e s/sun4u/sparc64/  \
 			-e s/armv4l/arm/)
 
 # TLS support
-TLS ?=
+TLS ?= 
 ifneq ($(TLS),)
 	RELEASE:=$(RELEASE)-tls
 endif
@@ -240,16 +241,16 @@ endif
 # -DUSE_POSIX_SEM
 #		uses posix semaphores for locking (faster than sys v)
 # -DBUSY_WAIT
-#		uses busy waiting on the lock
+#		uses busy waiting on the lock (FAST_LOCK)
 # -DADAPTIVE_WAIT
 #		try busy waiting for a while and if the lock is still held go to
-#		force reschedule
+#		force reschedule (FAST_LOCK)
 # -DADAPTIVE_WAIT_LOOPS=number
 #		number of loops we busy wait, after "number" loops have elapsed we 
-#		force a reschedule
+#		force a reschedule (FAST_LOCK)
 # -DNOSMP
 #		don't use smp compliant locking (faster but won't work on SMP machines)
-#		(not yet enabled)
+#		(not yet enabled) (FAST_LOCK)
 # -DNO_PINGTEL_TAG_HACK
 #		if enabled, To-header-field will be less liberal and will not accept
 #		'tag=' (tag parameter with equal sign and without value); it is called
@@ -273,12 +274,11 @@ DEFS+= $(extra_defs) \
 	 -DCFG_DIR='"$(cfg-target)"'\
 	 -DPKG_MALLOC \
 	 -DSHM_MEM  -DSHM_MMAP \
-	 -DADAPTIVE_WAIT -DADAPTIVE_WAIT_LOOPS=1024 \
 	 -DDNS_IP_HACK \
 	 -DUSE_IPV6 \
 	 -DUSE_TCP \
 	 -DDISABLE_NAGLE \
-	 -DDBG_QM_MALLOC \
+	# -DDBG_QM_MALLOC \
 	# -DF_MALLOC \
 	# -DDBG_F_MALLOC \
 	# -DDBG_QM_MALLOC \
@@ -386,8 +386,13 @@ ifeq ($(ARCH), ppc)
 	use_fast_lock=yes
 endif
 
+ifeq ($(ARCH), mips)
+# mips1 arch. (e.g. R3000) - no hardware locking support
+	use_fast_lock=no
+endif
+
 ifeq ($(use_fast_lock), yes)
-	DEFS+= -DFAST_LOCK
+	DEFS+= -DFAST_LOCK -DADAPTIVE_WAIT -DADAPTIVE_WAIT_LOOPS=1024 
 	found_lock_method=yes
 endif
 
@@ -519,15 +524,46 @@ else
 				#really old version
 $(warning			You are using an old and unsupported gcc \
 					 version ($(CC_SHORTVER)), compile at your own risk!)
-
+	
 endif			# CC_SHORTVER, 2.9x
 endif			# CC_SHORTVER, 3.0
+	
+else		# CC_NAME, gcc
+				#other compilers
+$(error 			Unsupported compiler ($(CC):$(CC_NAME)), try gcc)
+endif		#CC_NAME, gcc
+endif	#ARCH, arm 
 
+	#if  mips (R3000)
+ifeq	($(ARCH), mips)
+		# if gcc 
+ifeq		($(CC_NAME), gcc)
+				#common stuff
+				CFLAGS=-O9 -funroll-loops  -Wcast-align $(PROFILE) \
+					-Wall   \
+			#if gcc 3.0
+ifeq			($(CC_SHORTVER), 3.0)
+					CFLAGS+= -mcpu=r3000
+							#-mcpu=athlon
+else
+ifeq			($(CC_SHORTVER), 2.9x) #older gcc version (2.9[1-5])
+$(warning 			Old gcc detected ($(CC_SHORTVER)), use  gcc 3.0.x \
+					for better results)
+					
+					CFLAGS+=-mcpu=r3000
+else
+				#really old version
+$(warning			You are using an old and unsupported gcc \
+					 version ($(CC_SHORTVER)), compile at your own risk!)
+	
+endif			# CC_SHORTVER, 2.9x
+endif			# CC_SHORTVER, 3.0
+	
 else		# CC_NAME, gcc
 				#other compilers
 $(error 			Unsupported compiler ($(CC):$(CC_NAME)), try gcc)
 endif		#CC_NAME, gcc
-endif	#ARCH, i386
+endif	#ARCH, mips
 
 
 CFLAGS+= $(CC_EXTRA_OPTS)

+ 5 - 5
etc/ser.cfg

@@ -6,9 +6,9 @@
 
 # ----------- global configuration parameters ------------------------
 
-debug=3         # debug level (cmd line: -dddddddddd)
-fork=yes
-log_stderror=no	# (cmd line: -E)
+#debug=3         # debug level (cmd line: -dddddddddd)
+#fork=yes
+#log_stderror=no	# (cmd line: -E)
 
 /* Uncomment these lines to enter debugging mode 
 fork=no
@@ -18,8 +18,8 @@ log_stderror=yes
 check_via=no	# (cmd. line: -v)
 dns=no           # (cmd. line: -r)
 rev_dns=no      # (cmd. line: -R)
-port=5060
-children=4
+#port=5060
+#children=4
 fifo="/tmp/ser_fifo"
 
 # ------------------ module loading ----------------------------------

+ 2 - 2
fifo_server.c

@@ -349,8 +349,8 @@ static int fifo_check(int fd, char* fname)
 	 */
 	if ((lst.st_dev!=fst.st_dev)||(lst.st_ino!=fst.st_ino)){
 		LOG(L_ERR, "ERROR: security: fifo_check: inode/dev number differ"
-				": %ld %ld (%s)\n",
-				 fst.st_ino, lst.st_ino, fname);
+				": %d %d (%s)\n",
+				 (int)fst.st_ino, (int)lst.st_ino, fname);
 		return -1;
 	}
 	/* success */

+ 5 - 0
modules/tm/timer.c

@@ -326,6 +326,11 @@ inline static void final_response_handler( void *attr)
 	struct cell *t;
 
 	r_buf = (struct retr_buf*)attr;
+	if (r_buf==0){
+		/* or BUG?, ignoring it for now */
+		LOG(L_CRIT, "ERROR: final_response_handler(0) called\n");
+		return;
+	}
 	t=r_buf->my_T;
 
 #	ifdef EXTRA_DEBUG

+ 1 - 1
modules/tm/tm_load.h

@@ -52,7 +52,7 @@
 #define T_RELAY_TO           "t_relay_to"
 #define T_RELAY_TO_UDP       "t_relay_to_udp"
 #define T_RELAY_TO_TCP       "t_relay_to_tcp"
-#define T_RELAY_TO_TLS       "t_relay_tls"
+#define T_RELAY_TO_TLS       "t_relay_to_tls"
 #define T_RELAY              "t_relay"
 #define T_REPLY              "t_reply"
 #define T_REPLY_WB           "t_reply_with_body"

+ 83 - 10
pass_fd.c

@@ -29,6 +29,8 @@
   * --------
   *  2002-11-29  created by andrei
   *  2003-02-20  added solaris support (! HAVE_MSGHDR_MSG_CONTROL) (andrei)
+  *  2003-11-03  added send_all, recv_all  and updated send/get_fd
+  *               to handle signals  (andrei)
   */
 
 #ifdef USE_TCP
@@ -37,10 +39,55 @@
 #include <sys/socket.h>
 #include <sys/uio.h>
 #include <stdlib.h> /* for NULL definition on openbsd */
+#include <errno.h>
+#include <string.h>
 
 #include "dprint.h"
 
 
+
+/* receive all the data or returns error (handles EINTR etc.)
+ * returns: bytes read or error (<0)
+ * can return < data_len if EOF */
+int recv_all(int socket, void* data, int data_len)
+{
+	int b_read;
+	int n;
+	
+	b_read=0;
+	do{
+		n=recv(socket, data+b_read, data_len-b_read, MSG_WAITALL);
+		if (n<0){
+			/* error */
+			if (errno==EINTR) continue; /* signal, try again */
+			LOG(L_CRIT, "ERROR: recv_all: recv on %d failed: %s\n",
+					socket, strerror(errno));
+			return n;
+		}
+		b_read+=n;
+	}while( (b_read!=data_len) && (n));
+	return b_read;
+}
+
+
+/* sends all data (takes care of signals) (assumes blocking fd)
+ * returns number of bytes sent or < 0 for an error */
+int send_all(int socket, void* data, int data_len)
+{
+	int n;
+	
+again:
+	n=send(socket, data, data_len, 0);
+	if (n<0){
+			/* error */
+		if (errno==EINTR) goto again; /* signal, try again */
+		LOG(L_CRIT, "ERROR: send_all: send on %d failed: %s\n",
+					socket, strerror(errno));
+	}
+	return n;
+}
+
+
 /* at least 1 byte must be sent! */
 int send_fd(int unix_socket, void* data, int data_len, int fd)
 {
@@ -76,8 +123,13 @@ int send_fd(int unix_socket, void* data, int data_len, int fd)
 	msg.msg_iov=iov;
 	msg.msg_iovlen=1;
 	
-	
+again:
 	ret=sendmsg(unix_socket, &msg, 0);
+	if (ret<0){
+		if (errno==EINTR) goto again;
+		LOG(L_CRIT, "ERROR: send_fd: sendmsg failed on %d: %s\n",
+				unix_socket, strerror(errno));
+	}
 	
 	return ret;
 }
@@ -90,6 +142,7 @@ int receive_fd(int unix_socket, void* data, int data_len, int* fd)
 	struct iovec iov[1];
 	int new_fd;
 	int ret;
+	int n;
 #ifdef HAVE_MSGHDR_MSG_CONTROL
 	struct cmsghdr* cmsg;
 	union{
@@ -112,26 +165,47 @@ int receive_fd(int unix_socket, void* data, int data_len, int* fd)
 	msg.msg_iov=iov;
 	msg.msg_iovlen=1;
 	
-	ret=recvmsg(unix_socket, &msg, 0);
-	if (ret<=0) goto error;
+again:
+	ret=recvmsg(unix_socket, &msg, MSG_WAITALL);
+	if (ret<0){
+		if (errno==EINTR) goto again;
+		LOG(L_CRIT, "ERROR: receive_fd: recvmsg on %d failed: %s\n",
+				unix_socket, strerror(errno));
+		goto error;
+	}
+	if (ret==0){
+		/* EOF */
+		LOG(L_CRIT, "ERROR: receive_fd: EOF on %d\n", unix_socket);
+		goto error;
+	}
+	if (ret<data_len){
+		LOG(L_WARN, "WARNING: receive_fd: too few bytes read (%d from %d)"
+				    "trying to fix...\n", ret, data_len);
+		n=recv_all(unix_socket, (char*)data+ret, data_len-ret);
+		if (n>=0) ret+=n;
+		else{
+			ret=n;
+			goto error;
+		}
+	}
 	
 #ifdef HAVE_MSGHDR_MSG_CONTROL
 	cmsg=CMSG_FIRSTHDR(&msg);
 	if ((cmsg!=0) && (cmsg->cmsg_len==CMSG_LEN(sizeof(new_fd)))){
 		if (cmsg->cmsg_type!= SCM_RIGHTS){
-			LOG(L_ERR, "receive_fd: msg control type != SCM_RIGHTS\n");
+			LOG(L_ERR, "ERROR: receive_fd: msg control type != SCM_RIGHTS\n");
 			ret=-1;
 			goto error;
 		}
 		if (cmsg->cmsg_level!= SOL_SOCKET){
-			LOG(L_ERR, "receive_fd: msg level != SOL_SOCKET\n");
+			LOG(L_ERR, "ERROR: receive_fd: msg level != SOL_SOCKET\n");
 			ret=-1;
 			goto error;
 		}
 		*fd=*((int*) CMSG_DATA(cmsg));
 	}else{
-		LOG(L_ERR, "receive_fd: no descriptor passed, cmsg=%p, len=%d\n",
-				cmsg, cmsg->cmsg_len);
+		LOG(L_ERR, "ERROR: receive_fd: no descriptor passed, cmsg=%p,"
+				"len=%d\n", cmsg, cmsg->cmsg_len);
 		*fd=-1;
 		/* it's not really an error */
 	}
@@ -139,8 +213,8 @@ int receive_fd(int unix_socket, void* data, int data_len, int* fd)
 	if (msg.msg_accrightslen==sizeof(int)){
 		*fd=new_fd;
 	}else{
-		LOG(L_ERR, "receive_fd: no descriptor passed, accrightslen=%d\n",
-				msg.msg_accrightslen);
+		LOG(L_ERR, "ERROR: receive_fd: no descriptor passed,"
+				" accrightslen=%d\n", msg.msg_accrightslen);
 		*fd=-1;
 	}
 #endif
@@ -148,5 +222,4 @@ int receive_fd(int unix_socket, void* data, int data_len, int* fd)
 error:
 	return ret;
 }
-
 #endif

+ 2 - 0
pass_fd.h

@@ -32,6 +32,8 @@
 int send_fd(int unix_socket, void* data, int data_len, int fd);
 int receive_fd(int unix_socket, void* data, int data_len, int* fd);
 
+int recv_all(int socket, void* data, int data_len);
+int send_all(int socket, void* data, int data_len);
 
 
 #endif

+ 4 - 2
tcp_conn.h

@@ -44,8 +44,8 @@
 #define TCP_CON_MAX_ALIASES 4 /* maximum number of port aliases */
 
 #define TCP_BUF_SIZE 65535
-#define TCP_CON_TIMEOUT 60 /* in  seconds */
-#define TCP_CON_SEND_TIMEOUT 30 /* timeout after a send */
+#define TCP_CON_TIMEOUT 120 /* in  seconds */
+#define TCP_CON_SEND_TIMEOUT 120 /* timeout after a send */
 #define TCP_CHILD_TIMEOUT 5 /* after 5 seconds, the child "returns" 
 							 the connection to the tcp master process */
 #define TCP_MAIN_SELECT_TIMEOUT 5 /* how often "tcp main" checks for timeout*/
@@ -73,6 +73,8 @@ enum tcp_conn_states { S_CONN_ERROR=-2, S_CONN_BAD=-1, S_CONN_OK=0,
 /* fd communication commands */
 enum conn_cmds { CONN_DESTROY=-3, CONN_ERROR=-2, CONN_EOF=-1, CONN_RELEASE, 
 					CONN_GET_FD, CONN_NEW };
+/* CONN_RELEASE, EOF, ERROR, DESTROY can be used by "reader" processes
+ * CONN_GET_FD, NEW, ERROR only by writers */
 
 struct tcp_req{
 	struct tcp_req* next;

+ 199 - 80
tcp_main.c

@@ -44,6 +44,10 @@
  *  2003-07-09  tls_close called before closing the tcp connection (andrei)
  *  2003-10-24  converted to the new socket_info lists (andrei)
  *  2003-10-27  tcp port aliases support added (andrei)
+ *  2003-11-04  always lock before manipulating refcnt; sendchild
+ *              does not inc refcnt by itself anymore (andrei)
+ *  2003-11-07  different unix sockets are used for fd passing
+ *              to/from readers/writers (andrei)
  */
 
 
@@ -100,7 +104,7 @@
 struct tcp_child{
 	pid_t pid;
 	int proc_no; /* ser proc_no, for debugging */
-	int unix_sock; /* unix sock fd, copied from pt*/
+	int unix_sock; /* unix "read child" sock fd */
 	int busy;
 	int n_reqs; /* number of requests serviced so far */
 };
@@ -242,7 +246,13 @@ struct tcp_connection* tcpconn_connect(union sockaddr_union* server, int type)
 				strerror(errno), errno);
 		si=0; /* try to go on */
 	}
-	si=find_tcp_si(&my_name);
+#ifdef USE_TLS
+	if (type==PROTO_TLS)
+		si=find_tls_si(&my_name);
+	else
+#endif
+		si=find_tcp_si(&my_name);
+
 	if (si==0){
 		LOG(L_ERR, "ERROR: tcp_connect: could not find coresponding"
 				" listening socket, using default...\n");
@@ -443,9 +453,20 @@ error_sec:
 
 
 
+void tcpconn_ref(struct tcp_connection* c)
+{
+	TCPCONN_LOCK;
+	c->refcnt++; /* FIXME: atomic_dec */
+	TCPCONN_UNLOCK;
+}
+
+
+
 void tcpconn_put(struct tcp_connection* c)
 {
+	TCPCONN_LOCK;
 	c->refcnt--; /* FIXME: atomic_dec */
+	TCPCONN_UNLOCK;
 }
 
 
@@ -455,6 +476,7 @@ int tcp_send(int type, char* buf, unsigned len, union sockaddr_union* to,
 				int id)
 {
 	struct tcp_connection *c;
+	struct tcp_connection *tmp;
 	struct ip_addr ip;
 	int port;
 	int fd;
@@ -494,22 +516,25 @@ no_id:
 				LOG(L_ERR, "ERROR: tcp_send: connect failed\n");
 				return -1;
 			}
-			c->refcnt++;
+			c->refcnt++; /* safe to do it w/o locking, it's not yet
+							available to the rest of the world */
 			fd=c->s;
 			
 			/* send the new tcpconn to "tcp main" */
 			response[0]=(long)c;
 			response[1]=CONN_NEW;
-			n=write(unix_tcp_sock, response, sizeof(response));
-			if (n<0){
+			n=send_all(unix_tcp_sock, response, sizeof(response));
+			if (n<=0){
 				LOG(L_ERR, "BUG: tcp_send: failed write: %s (%d)\n",
 						strerror(errno), errno);
+				n=-1;
 				goto end;
 			}	
 			n=send_fd(unix_tcp_sock, &c, sizeof(c), c->s);
-			if (n<0){
+			if (n<=0){
 				LOG(L_ERR, "BUG: tcp_send: failed send_fd: %s (%d)\n",
 						strerror(errno), errno);
+				n=-1;
 				goto end;
 			}
 			goto send_it;
@@ -517,21 +542,34 @@ no_id:
 get_fd:
 			/* todo: see if this is not the same process holding
 			 *  c  and if so send directly on c->fd */
-			DBG("tcp_send: tcp connection found, acquiring fd\n");
+			DBG("tcp_send: tcp connection found (%p), acquiring fd\n", c);
 			/* get the fd */
 			response[0]=(long)c;
 			response[1]=CONN_GET_FD;
-			n=write(unix_tcp_sock, response, sizeof(response));
-			if (n<0){
+			n=send_all(unix_tcp_sock, response, sizeof(response));
+			if (n<=0){
 				LOG(L_ERR, "BUG: tcp_send: failed to get fd(write):%s (%d)\n",
 						strerror(errno), errno);
+				n=-1;
 				goto release_c;
 			}
 			DBG("tcp_send, c= %p, n=%d\n", c, n);
+			tmp=c;
 			n=receive_fd(unix_tcp_sock, &c, sizeof(c), &fd);
-			if (n<0){
+			if (n<=0){
 				LOG(L_ERR, "BUG: tcp_send: failed to get fd(receive_fd):"
 							" %s (%d)\n", strerror(errno), errno);
+				n=-1;
+				goto release_c;
+			}
+			if (c!=tmp){
+				LOG(L_CRIT, "BUG: tcp_send: get_fd: got different connection:"
+						"  %p (id= %d, refcnt=%d state=%d != "
+						"  %p (id= %d, refcnt=%d state=%d (n=%d)\n",
+						  c,   c->id,   c->refcnt,   c->state,
+						  tmp, tmp->id, tmp->refcnt, tmp->state, n
+				   );
+				n=-1; /* fail */
 				goto release_c;
 			}
 			DBG("tcp_send: after receive_fd: c= %p n=%d fd=%d\n",c, n, fd);
@@ -567,11 +605,12 @@ send_it:
 		/* tell "main" it should drop this (optional it will t/o anyway?)*/
 		response[0]=(long)c;
 		response[1]=CONN_ERROR;
-		n=write(unix_tcp_sock, response, sizeof(response));
-		/* CONN_ERROR wil auto-dec refcnt => we must not call tcpconn_put !!*/
-		if (n<0){
+		n=send_all(unix_tcp_sock, response, sizeof(response));
+		/* CONN_ERROR will auto-dec refcnt => we must not call tcpconn_put !!*/
+		if (n<=0){
 			LOG(L_ERR, "BUG: tcp_send: error return failed (write):%s (%d)\n",
 					strerror(errno), errno);
+			n=-1;
 		}
 		close(fd);
 		return n; /* error return, no tcpconn_put */
@@ -737,19 +776,21 @@ static int send2child(struct tcp_connection* tcpconn)
 	
 	tcp_children[idx].busy++;
 	tcp_children[idx].n_reqs++;
-	tcpconn->refcnt++;
 	if (min_busy){
-		LOG(L_WARN, "WARNING: send2child:no free tcp receiver, "
+		LOG(L_WARN, "WARNING: send2child: no free tcp receiver, "
 				" connection passed to the least busy one (%d)\n",
 				min_busy);
 	}
 	DBG("send2child: to tcp child %d %d(%d), %p\n", idx, 
 					tcp_children[idx].proc_no,
 					tcp_children[idx].pid, tcpconn);
-	send_fd(tcp_children[idx].unix_sock, &tcpconn, sizeof(tcpconn),
-			tcpconn->s);
+	if (send_fd(tcp_children[idx].unix_sock, &tcpconn, sizeof(tcpconn),
+			tcpconn->s)<=0){
+		LOG(L_ERR, "ERROR: send2child: send_fd failed\n");
+		return -1;
+	}
 	
-	return 0; /* just to fix a warning*/
+	return 0;
 }
 
 
@@ -776,6 +817,8 @@ static inline void handle_new_connect(struct socket_info* si,
 		/* add socket to list */
 		tcpconn=tcpconn_new(new_sock, &su, si, si->proto, S_CONN_ACCEPT);
 		if (tcpconn){
+			tcpconn->refcnt++; /* safe, not yet available to the
+								  outside world */
 			tcpconn_add(tcpconn);
 			DBG("tcp_main_loop: new connection: %p %d\n",
 				tcpconn, tcpconn->s);
@@ -784,6 +827,7 @@ static inline void handle_new_connect(struct socket_info* si,
 				LOG(L_ERR,"ERROR: tcp_main_loop: no children "
 						"available\n");
 				TCPCONN_LOCK;
+				tcpconn->refcnt--;
 				if (tcpconn->refcnt==0){
 					close(tcpconn->s);
 					_tcpconn_rm(tcpconn);
@@ -795,6 +839,34 @@ static inline void handle_new_connect(struct socket_info* si,
 }
 
 
+/* used internally by tcp_main_loop() */
+static void tcpconn_destroy(struct tcp_connection* tcpconn)
+{
+	int fd;
+
+	TCPCONN_LOCK; /*avoid races w/ tcp_send*/
+	tcpconn->refcnt--;
+	if (tcpconn->refcnt==0){ 
+		DBG("tcp_main_loop: destroying connection\n");
+		fd=tcpconn->s;
+#ifdef USE_TLS
+		/*FIXME: lock ->writelock ? */
+		if (tcpconn->type==PROTO_TLS)
+			tls_close(tcpconn, fd);
+#endif
+		_tcpconn_rm(tcpconn);
+		close(fd);
+	}else{
+		/* force timeout */
+		tcpconn->timeout=0;
+		tcpconn->state=S_CONN_BAD;
+		DBG("tcp_main_loop: delaying ...\n");
+		
+	}
+	TCPCONN_UNLOCK;
+}
+
+
 void tcp_main_loop()
 {
 	int r;
@@ -843,6 +915,14 @@ void tcp_main_loop()
 			if (pt[r].unix_sock>maxfd) maxfd=pt[r].unix_sock;
 		}
 	}
+	for (r=0; r<tcp_children_no; r++){
+		if (tcp_children[r].unix_sock>0){ /* we can't have 0, 
+											 we never close it!*/
+			FD_SET(tcp_children[r].unix_sock, &master_set);
+			if (tcp_children[r].unix_sock>maxfd)
+				maxfd=tcp_children[r].unix_sock;
+		}
+	}
 	
 	
 	/* main loop*/
@@ -872,6 +952,7 @@ void tcp_main_loop()
 		for (h=0; h<TCP_ID_HASH_SIZE; h++){
 			for(tcpconn=tcpconn_id_hash[h]; tcpconn && n; 
 					tcpconn=tcpconn->id_next){
+				/* FIXME: is refcnt==0 really necessary? */
 				if ((tcpconn->refcnt==0)&&(FD_ISSET(tcpconn->s, &sel_set))){
 					/* new data available */
 					n--;
@@ -879,10 +960,12 @@ void tcp_main_loop()
 					DBG("tcp_main_loop: data available on %p [h:%d] %d\n",
 							tcpconn, h, tcpconn->s);
 					FD_CLR(tcpconn->s, &master_set);
+					tcpconn_ref(tcpconn); /* refcnt ++ */
 					if (send2child(tcpconn)<0){
 						LOG(L_ERR,"ERROR: tcp_main_loop: no "
 									"children available\n");
 						TCPCONN_LOCK;
+						tcpconn->refcnt--;
 						if (tcpconn->refcnt==0){
 							fd=tcpconn->s;
 							_tcpconn_rm(tcpconn);
@@ -894,109 +977,138 @@ void tcp_main_loop()
 			}
 		}
 		/* check unix sockets & listen | destroy connections */
-		/* start from 1, the "main" process does not transmit anything*/
-		for (r=1; r<process_no && n; r++){
-			if ( (pt[r].unix_sock>0) && FD_ISSET(pt[r].unix_sock, &sel_set)){
+		/* tcp_children readers first */
+		for (r=0; r<tcp_children_no && n; r++){
+			if ( (tcp_children[r].unix_sock>0) && 
+					FD_ISSET(tcp_children[r].unix_sock, &sel_set)){
 				/* (we can't have a fd==0, 0 is never closed )*/
 				n--;
-				/* errno==EINTR !!! TODO*/
-read_again:
-				bytes=read(pt[r].unix_sock, response, sizeof(response));
+				/* read until sizeof(response)
+				 * (this is a SOCK_STREAM so read is not atomic */
+				bytes=recv_all(tcp_children[r].unix_sock, response,
+								sizeof(response));
 				if (bytes==0){
 					/* EOF -> bad, child has died */
-					LOG(L_CRIT, "BUG: tcp_main_loop: dead child %d\n", r);
+					LOG(L_CRIT, "BUG: tcp_main_loop: dead tcp child %d\n", r);
 					/* don't listen on it any more */
-					FD_CLR(pt[r].unix_sock, &master_set);
+					FD_CLR(tcp_children[r].unix_sock, &master_set);
 					/*exit(-1);*/
-					continue;
+					continue; /* skip this and try the next one */
 				}else if (bytes<0){
-					if (errno==EINTR) goto read_again;
-					else{
-						LOG(L_CRIT, "ERROR: tcp_main_loop: read from child: "
-								" %s\n", strerror(errno));
-						/* try to continue ? */
-						continue;
-					}
+					LOG(L_CRIT, "ERROR: tcp_main_loop: read from tcp child %d "
+							"%s\n", r, strerror(errno));
+					/* try to ignore ? */
+					continue; /* skip this and try the next one */
 				}
 					
-				DBG("tcp_main_loop: read response= %lx, %ld from %d (%d)\n",
-						response[0], response[1], r, pt[r].pid);
+				DBG("tcp_main_loop: reader response= %lx, %ld from %d \n",
+						response[0], response[1], r);
 				cmd=response[1];
+				tcpconn=(struct tcp_connection*)response[0];
 				switch(cmd){
 					case CONN_RELEASE:
-						if (pt[r].idx>=0){
-							tcp_children[pt[r].idx].busy--;
-						}else{
-							LOG(L_CRIT, "BUG: tcp_main_loop: CONN_RELEASE\n");
-						}
-						tcpconn=(struct tcp_connection*)response[0];
+						tcp_children[r].busy--;
 						if (tcpconn){
-								if (tcpconn->state==S_CONN_BAD) 
-									goto tcpconn_destroy;
+								if (tcpconn->state==S_CONN_BAD){ 
+									tcpconn_destroy(tcpconn);
+									break;
+								}
 								FD_SET(tcpconn->s, &master_set);
 								if (maxfd<tcpconn->s) maxfd=tcpconn->s;
 								/* update the timeout*/
 								tcpconn->timeout=get_ticks()+TCP_CON_TIMEOUT;
 								tcpconn_put(tcpconn);
-								DBG("tcp_main_loop: %p refcnt= %d\n", 
-									tcpconn, tcpconn->refcnt);
+								DBG("tcp_main_loop: CONN_RELEASE  %p"
+										" refcnt= %d\n", 
+										tcpconn, tcpconn->refcnt);
 						}
 						break;
 					case CONN_ERROR:
 					case CONN_DESTROY:
 					case CONN_EOF:
 						/* WARNING: this will auto-dec. refcnt! */
-						if (pt[r].idx>=0){
-							tcp_children[pt[r].idx].busy--;
-						}else{
-							LOG(L_CRIT, "BUG: tcp_main_loop: CONN_RELEASE\n");
+						tcp_children[pt[r].idx].busy--;
+						if (tcpconn){
+							if (tcpconn->s!=-1)
+								FD_CLR(tcpconn->s, &master_set);
+							tcpconn_destroy(tcpconn);
 						}
-						tcpconn=(struct tcp_connection*)response[0];
+						break;
+					default:
+							LOG(L_CRIT, "BUG: tcp_main_loop:  unknown cmd %d"
+										" from tcp reader %d\n",
+									cmd, r);
+				}
+			}
+		}
+		/* check "send" unix sockets & listen | destroy connections */
+		/* start from 1, the "main" process does not transmit anything*/
+		for (r=1; r<process_no && n; r++){
+			if ( (pt[r].unix_sock>0) && FD_ISSET(pt[r].unix_sock, &sel_set)){
+				/* (we can't have a fd==0, 0 is never closed )*/
+				n--;
+				/* read until sizeof(response)
+				 * (this is a SOCK_STREAM so read is not atomic */
+				bytes=recv_all(pt[r].unix_sock, response, sizeof(response));
+				if (bytes==0){
+					/* EOF -> bad, child has died */
+					LOG(L_CRIT, "BUG: tcp_main_loop: dead child %d\n", r);
+					/* don't listen on it any more */
+					FD_CLR(pt[r].unix_sock, &master_set);
+					/*exit(-1);*/
+					continue; /* skip this and try the next one */
+				}else if (bytes<0){
+					LOG(L_CRIT, "ERROR: tcp_main_loop: read from child:  %s\n",
+							strerror(errno));
+					/* try to ignore ? */
+					continue; /* skip this and try the next one */
+				}
+					
+				DBG("tcp_main_loop: read response= %lx, %ld from %d (%d)\n",
+						response[0], response[1], r, pt[r].pid);
+				cmd=response[1];
+				tcpconn=(struct tcp_connection*)response[0];
+				switch(cmd){
+					case CONN_ERROR:
 						if (tcpconn){
 							if (tcpconn->s!=-1)
 								FD_CLR(tcpconn->s, &master_set);
-		tcpconn_destroy:
-							TCPCONN_LOCK; /*avoid races w/ tcp_send*/
-							tcpconn->refcnt--;
-							if (tcpconn->refcnt==0){ 
-								DBG("tcp_main_loop: destroying connection\n");
-								fd=tcpconn->s;
-#ifdef USE_TLS
-								if (tcpconn->type==PROTO_TLS)
-									tls_close(tcpconn, fd);
-#endif
-								_tcpconn_rm(tcpconn);
-								close(fd);
-							}else{
-								/* force timeout */
-								tcpconn->timeout=0;
-								tcpconn->state=S_CONN_BAD;
-								DBG("tcp_main_loop: delaying ...\n");
-								
-							}
-							TCPCONN_UNLOCK;
+							tcpconn_destroy(tcpconn);
 						}
 						break;
 					case CONN_GET_FD:
 						/* send the requested FD  */
-						tcpconn=(struct tcp_connection*)response[0];
 						/* WARNING: take care of setting refcnt properly to
 						 * avoid race condition */
 						if (tcpconn){
-							send_fd(pt[r].unix_sock, &tcpconn,
-									sizeof(tcpconn), tcpconn->s);
+							if (send_fd(pt[r].unix_sock, &tcpconn,
+										sizeof(tcpconn), tcpconn->s)<=0){
+								LOG(L_ERR, "ERROR: tcp_main_loop:"
+										"send_fd failed\n");
+							}
 						}else{
 							LOG(L_CRIT, "BUG: tcp_main_loop: null pointer\n");
 						}
 						break;
 					case CONN_NEW:
 						/* update the fd in the requested tcpconn*/
-						tcpconn=(struct tcp_connection*)response[0];
 						/* WARNING: take care of setting refcnt properly to
 						 * avoid race condition */
 						if (tcpconn){
-							receive_fd(pt[r].unix_sock, &tcpconn,
-										sizeof(tcpconn), &tcpconn->s);
+							bytes=receive_fd(pt[r].unix_sock, &tcpconn,
+									sizeof(tcpconn), &tcpconn->s);
+								if (bytes<sizeof(tcpconn)){
+									if (bytes<0){
+										LOG(L_CRIT, "BUG: tcp_main_loop:"
+												" CONN_NEW: receive_fd "
+												"failed\n");
+									}else{
+										LOG(L_CRIT, "BUG: tcp_main_loop:"
+												" CONN_NEW: to few bytes "
+												"received (%d)\n", bytes );
+									}
+									break; /* try to ignore */
+								}
 							/* add tcpconn to the list*/
 							tcpconn_add(tcpconn);
 							FD_SET(tcpconn->s, &master_set);
@@ -1012,7 +1124,7 @@ read_again:
 									cmd);
 				}
 			}
-		}
+		} /* for */
 		
 		/* remove old connections */
 		tcpconn_timeout(&master_set);
@@ -1114,6 +1226,7 @@ int tcp_init_children()
 {
 	int r;
 	int sockfd[2];
+	int reader_fd[2]; /* for comm. with the tcp children read  */
 	pid_t pid;
 	
 	
@@ -1127,6 +1240,11 @@ int tcp_init_children()
 					strerror(errno));
 			goto error;
 		}
+		if (socketpair(AF_UNIX, SOCK_STREAM, 0, reader_fd)<0){
+			LOG(L_ERR, "ERROR: tcp_main: socketpair failed: %s\n",
+					strerror(errno));
+			goto error;
+		}
 		
 		process_no++;
 		pid=fork();
@@ -1137,11 +1255,12 @@ int tcp_init_children()
 		}else if (pid>0){
 			/* parent */
 			close(sockfd[1]);
+			close(reader_fd[1]);
 			tcp_children[r].pid=pid;
 			tcp_children[r].proc_no=process_no;
 			tcp_children[r].busy=0;
 			tcp_children[r].n_reqs=0;
-			tcp_children[r].unix_sock=sockfd[0];
+			tcp_children[r].unix_sock=reader_fd[0];
 			pt[process_no].pid=pid;
 			pt[process_no].unix_sock=sockfd[0];
 			pt[process_no].idx=r;
@@ -1156,7 +1275,7 @@ int tcp_init_children()
 				LOG(L_ERR, "init_children failed\n");
 				goto error;
 			}
-			tcp_receive_loop(sockfd[1]);
+			tcp_receive_loop(reader_fd[1]);
 		}
 	}
 	return 0;

+ 25 - 17
tcp_read.c

@@ -399,20 +399,11 @@ int tcp_read_req(struct tcp_connection* con)
 		req=&con->req;
 #ifdef USE_TLS
 		if (con->type==PROTO_TLS){
-			if (con->state==S_CONN_ACCEPT){
-				if (tls_accept(con, 0)!=0){
-					resp=CONN_ERROR;
-					goto end_req;
-				}
-				if(con->state!=S_CONN_OK) goto end_req; /* not enough data */
-			}
-			if(con->state==S_CONN_CONNECT){
-				if (tls_connect(con, 0)!=0){
-					resp=CONN_ERROR;
-					goto end_req;
-				}
-				if(con->state!=S_CONN_OK) goto end_req; /* not enough data */
+			if (tls_fix_read_conn(con)!=0){
+				resp=CONN_ERROR;
+				goto end_req;
 			}
+			if(con->state!=S_CONN_OK) goto end_req; /* not enough data */
 		}
 #endif
 
@@ -547,7 +538,8 @@ void release_tcpconn(struct tcp_connection* c, long state, int unix_sock)
 		/* errno==EINTR, EWOULDBLOCK a.s.o todo */
 		response[0]=(long)c;
 		response[1]=state;
-		write(unix_sock, response, sizeof(response));
+		if (send_all(unix_sock, response, sizeof(response))<=0)
+			LOG(L_ERR, "ERROR: release_tcpconn: send_all failed\n");
 }
 
 
@@ -625,12 +617,18 @@ void tcp_receive_loop(int unix_sock)
 					release_tcpconn(con, resp, unix_sock);
 					goto skip;
 				}
-#ifdef USE_TLS
-				if (con->type==PROTO_TLS) tls_tcpconn_update_fd(con, s);
-#endif
 				con->timeout=get_ticks()+TCP_CHILD_TIMEOUT;
 				FD_SET(s, &master_set);
 				if (maxfd<s) maxfd=s;
+				if (con==list){
+					LOG(L_CRIT, "BUG: tcp_receive_loop: duplicate"
+							" connection recevied: %p, id %d, fd %d, refcnt %d"
+							" state %d (n=%d)\n", con, con->id, con->fd,
+							con->refcnt, con->state, n);
+					resp=CONN_ERROR;
+					release_tcpconn(con, resp, unix_sock);
+					goto skip; /* try to recover */
+				}
 				tcpconn_listadd(list, con, c_next, c_prev);
 			}
 skip:
@@ -641,12 +639,22 @@ skip:
 				DBG("tcp receive: list fd=%d, id=%d, timeout=%d, refcnt=%d\n",
 						con->fd, con->id, con->timeout, con->refcnt);
 #endif
+				if (con->state<0){
+					/* S_CONN_BAD or S_CONN_ERROR, remove it */
+					resp=CONN_ERROR;
+					FD_CLR(con->fd, &master_set);
+					tcpconn_listrm(list, con, c_next, c_prev);
+					con->state=S_CONN_BAD;
+					release_tcpconn(con, resp, unix_sock);
+					continue;
+				}
 				if (nfds && FD_ISSET(con->fd, &sel_set)){
 #ifdef EXTRA_DEBUG
 					DBG("tcp receive: match, fd:isset\n");
 #endif
 					nfds--;
 					resp=tcp_read_req(con);
+					
 					if (resp<0){
 						FD_CLR(con->fd, &master_set);
 						tcpconn_listrm(list, con, c_next, c_prev);

+ 1 - 1
test/test.cfg

@@ -26,7 +26,7 @@ rev_dns=off      # (cmd. line: -R)
 alias=iptel.org
 alias="foo.bar"
 fifo="/tmp/ser_fifo"
-listen= tcp:10.0.0.179:5065
+#listen= tcp:10.0.0.179:5065
 alias=  tcp:all:5065
 tcp_accept_aliases=yes
 

+ 30 - 1
test/udp_flood.c

@@ -50,6 +50,8 @@ Options:\n\
     -d address    destination address\n\
     -p port       destination port\n\
     -c count      number of packets to be sent\n\
+    -s usec       microseconds to sleep before sending \"throttle\" packets\n\
+    -t throttle   number of packets to send before sleeping\n\
     -v            increase verbosity level\n\
     -V            version number\n\
     -h            this help message\n\
@@ -74,6 +76,9 @@ int main (int argc, char** argv)
 	char *fname;
 	char *dst;
 	int port;
+	long usec;
+	int throttle;
+	int t;
 	
 	/* init */
 	count=0;
@@ -81,9 +86,11 @@ int main (int argc, char** argv)
 	fname=0;
 	dst=0;
 	port=0;
+	usec=0;
+	throttle=0;
 
 	opterr=0;
-	while ((c=getopt(argc,argv, "f:c:d:p:vhV"))!=-1){
+	while ((c=getopt(argc,argv, "f:c:d:p:s:t:vhV"))!=-1){
 		switch(c){
 			case 'f':
 				fname=optarg;
@@ -108,6 +115,20 @@ int main (int argc, char** argv)
 					goto error;
 				}
 				break;
+			case 's':
+				usec=strtol(optarg, &tmp, 10);
+				if ((tmp==0)||(*tmp)){
+					fprintf(stderr, "bad count: -c %s\n", optarg);
+					goto error;
+				}
+				break;
+			case 't':
+				throttle=strtol(optarg, &tmp, 10);
+				if ((tmp==0)||(*tmp)){
+					fprintf(stderr, "bad count: -c %s\n", optarg);
+					goto error;
+				}
+				break;
 			case 'V':
 				printf("version: %s\n", version);
 				printf("%s\n",id);
@@ -197,12 +218,20 @@ int main (int argc, char** argv)
 
 
 	/* flood loop */
+	t=throttle;
 	for (r=0; r<count; r++){
 		if ((verbose>1)&&(r%1000))  putchar('.');
 		if (send(sock, buf, n, 0)==-1) {
 			fprintf(stderr, "Error: send: %s\n",  strerror(errno));
 			exit(1);
 		}
+		if (usec){
+			t--;
+			if (t==0){
+				usleep(usec);
+				t=throttle;
+			}
+		}
 	}
 	printf("\n%d packets sent, %d bytes each => total %d bytes\n",
 			count, n, n*count);