Răsfoiți Sursa

- tcp children number is not anymore limited (was limited to 100)
- tcp connection are passed to the tcp reader processes after some data is
received on them and not immediately after accept(). This speeds up accepting
lots of new connections. Old behaviour can be select by undef-ing
TCP_PASS_NEW_CONNECTION_ON_DATA in tcp_main.c
- lots of cleanups and small fixes
- tsend_stream now support waiting forever (timeout==-1)

Orientative test results:
- 50k connections to ser opened, 1 packet sent of them and then closed in 9.5s - 50-65Mb data rate on 50k connections (with 20 ser tcp processes)
- 200-240Mb data rate on 1 connection

Andrei Pelinescu-Onciul 19 ani în urmă
părinte
comite
746f7674ae
3 a modificat fișierele cu 112 adăugiri și 72 ștergeri
  1. 94 64
      tcp_main.c
  2. 5 2
      tcp_read.c
  3. 13 6
      tsend.c

+ 94 - 64
tcp_main.c

@@ -64,8 +64,8 @@
  *              process all children requests, before attempting to send
  *              them new stuff (fixes some deadlocks) (andrei)
  *  2006-02-03  timers are run only once per s (andrei)
- *              tcp children fds can be non-blocking; send fds are queues on
- *              EAGAIN (andrei)
+ *              tcp children fds can be non-blocking; send fds are queued on
+ *              EAGAIN; lots of bug fixes (andrei)
  */
 
 
@@ -112,6 +112,7 @@
 #include "tcp_server.h"
 #include "tcp_init.h"
 #include "tsend.h"
+#include "timer_ticks.h"
 #ifdef USE_TLS
 #include "tls/tls_server.h"
 #endif 
@@ -123,7 +124,9 @@
 #include "io_wait.h"
 #include <fcntl.h> /* must be included after io_wait.h if SIGIO_RT is used */
 
-#define MAX_TCP_CHILDREN 100
+#define TCP_PASS_NEW_CONNECTION_ON_DATA /* don't pass a new connection
+										   immediately to a child, wait for
+										   some data on it first */
 #define TCP_LISTEN_BACKLOG 1024
 #define SEND_FD_QUEUE /* queue send fd requests on EAGAIN, instead of sending 
 							them immediately */
@@ -132,9 +135,10 @@
 #ifndef TCP_CHILD_NON_BLOCKING
 #define TCP_CHILD_NON_BLOCKING
 #endif
-#define MAX_SEND_FD_QUEUE_SIZE	1024  /* alternative: tcp_max_fd_no */
+#define MAX_SEND_FD_QUEUE_SIZE	tcp_max_fd_no
 #define SEND_FD_QUEUE_SIZE		128  /* initial size */
-#define MAX_SEND_FD_RETRIES		3	 /* FIXME: increase */
+#define MAX_SEND_FD_RETRIES		96	 /* FIXME: not used for now */
+#define SEND_FD_QUEUE_TIMEOUT	MS_TO_TICKS(2000)  /* 2 s */
 #endif
 
 
@@ -167,7 +171,7 @@ struct tcp_conn_alias** tcpconn_aliases_hash=0;
 struct tcp_connection** tcpconn_id_hash=0;
 gen_lock_t* tcpconn_lock=0;
 
-struct tcp_child tcp_children[MAX_TCP_CHILDREN];
+static struct tcp_child* tcp_children;
 static int* connection_id=0; /*  unique for each connection, used for 
 								quickly finding the corresponding connection
 								for a reply */
@@ -948,11 +952,44 @@ error:
 
 
 
+/* used internally by tcp_main_loop() */
+static void tcpconn_destroy(struct tcp_connection* tcpconn)
+{
+	int fd;
+
+	TCPCONN_LOCK; /*avoid races w/ tcp_send*/
+	tcpconn->refcnt--;
+	if (tcpconn->refcnt==0){ 
+		DBG("tcpconn_destroy: destroying connection %p, flags %04x\n",
+				tcpconn, tcpconn->flags);
+		fd=tcpconn->s;
+#ifdef USE_TLS
+		/*FIXME: lock ->writelock ? */
+		if (tcpconn->type==PROTO_TLS)
+			tls_close(tcpconn, fd);
+#endif
+		_tcpconn_rm(tcpconn);
+		close(fd);
+		tcp_connections_no--;
+	}else{
+		/* force timeout */
+		tcpconn->timeout=0;
+		tcpconn->state=S_CONN_BAD;
+		DBG("tcpconn_destroy: delaying (%p, flags %04x) ...\n",
+				tcpconn, tcpconn->flags);
+		
+	}
+	TCPCONN_UNLOCK;
+}
+
+
+
 #ifdef SEND_FD_QUEUE
 struct send_fd_info{
 	struct tcp_connection* tcp_conn;
+	ticks_t expire;
 	int unix_sock;
-	int retries;
+	unsigned int retries; /* debugging */
 };
 
 struct tcp_send_fd_q{
@@ -1039,6 +1076,7 @@ inline static int send_fd_queue_add(	struct tcp_send_fd_q* q,
 	}
 	q->crt->tcp_conn=t;
 	q->crt->unix_sock=unix_sock;
+	q->crt->expire=get_ticks_raw()+SEND_FD_QUEUE_TIMEOUT;
 	q->crt->retries=0;
 	q->crt++;
 	return 0;
@@ -1057,15 +1095,19 @@ inline static void send_fd_queue_run(struct tcp_send_fd_q* q)
 		if (send_fd(p->unix_sock, &(p->tcp_conn),
 					sizeof(struct tcp_connection*), p->tcp_conn->s)<=0){
 			if ( ((errno==EAGAIN)||(errno==EWOULDBLOCK)) && 
-							(p->retries<MAX_SEND_FD_RETRIES)){
+							((s_ticks_t)(p->expire-get_ticks_raw())>0)){
 				/* leave in queue for a future try */
 				*t=*p;
 				t->retries++;
 				t++;
 			}else{
 				LOG(L_ERR, "ERROR: run_send_fd_queue: send_fd failed"
-						   "on %d socket, %ld queue entry, retries %d \n",
-						   p->unix_sock, p-&q->data[0], p->retries);
+						   " on socket %d , queue entry %ld, retries %d,"
+						   " connection %p, tcp socket %d, errno=%d (%s) \n",
+						   p->unix_sock, p-&q->data[0], p->retries,
+						   p->tcp_conn, p->tcp_conn->s, errno,
+						   strerror(errno));
+				tcpconn_destroy(p->tcp_conn);
 			}
 		}
 	}
@@ -1077,38 +1119,6 @@ inline static void send_fd_queue_run(struct tcp_send_fd_q* q)
 
 
 
-/* used internally by tcp_main_loop() */
-static void tcpconn_destroy(struct tcp_connection* tcpconn)
-{
-	int fd;
-
-	TCPCONN_LOCK; /*avoid races w/ tcp_send*/
-	tcpconn->refcnt--;
-	if (tcpconn->refcnt==0){ 
-		DBG("tcpconn_destroy: destroying connection %p, flags %04x\n",
-				tcpconn, tcpconn->flags);
-		fd=tcpconn->s;
-#ifdef USE_TLS
-		/*FIXME: lock ->writelock ? */
-		if (tcpconn->type==PROTO_TLS)
-			tls_close(tcpconn, fd);
-#endif
-		_tcpconn_rm(tcpconn);
-		close(fd);
-		tcp_connections_no--;
-	}else{
-		/* force timeout */
-		tcpconn->timeout=0;
-		tcpconn->state=S_CONN_BAD;
-		DBG("tcpconn_destroy: delaying (%p, flags %04x) ...\n",
-				tcpconn, tcpconn->flags);
-		
-	}
-	TCPCONN_UNLOCK;
-}
-
-
-
 /* handles io from a tcp child process
  * params: tcp_c - pointer in the tcp_children array, to the entry for
  *                 which an io event was detected 
@@ -1348,10 +1358,14 @@ inline static int send2child(struct tcp_connection* tcpconn)
 	int i;
 	int min_busy;
 	int idx;
+	static int crt=0; /* current child */
+	int last;
 	
 	min_busy=tcp_children[0].busy;
 	idx=0;
-	for (i=0; i<tcp_children_no; i++){
+	last=crt+tcp_children_no;
+	for (; crt<last; crt++){
+		i=crt%tcp_children_no;
 		if (!tcp_children[i].busy){
 			idx=i;
 			min_busy=0;
@@ -1361,6 +1375,7 @@ inline static int send2child(struct tcp_connection* tcpconn)
 			idx=i;
 		}
 	}
+	crt=idx+1; /* next time we start with crt%tcp_children_no */
 	
 	tcp_children[idx].busy++;
 	tcp_children[idx].n_reqs++;
@@ -1387,8 +1402,10 @@ inline static int send2child(struct tcp_connection* tcpconn)
 			tcpconn->s)<=0){
 		if ((errno==EAGAIN)||(errno==EWOULDBLOCK)){
 			/* FIXME: remove after debugging */
-			LOG(L_WARN, "WARNING: tcp child %d, socket %d: queue full\n",
-					idx, tcp_children[idx].unix_sock);
+			 LOG(L_CRIT, "INFO: tcp child %d, socket %d: queue full,"
+					 	" %d requests queued (total handled %d)\n",
+					idx, tcp_children[idx].unix_sock, min_busy,
+					tcp_children[idx].n_reqs-1);
 			if (send_fd_queue_add(&send2child_q, tcp_children[idx].unix_sock, 
 						tcpconn)!=0){
 				LOG(L_ERR, "ERROR: send2child: queue send op. failed\n");
@@ -1396,6 +1413,7 @@ inline static int send2child(struct tcp_connection* tcpconn)
 			}
 		}else{
 			LOG(L_ERR, "ERROR: send2child: send_fd failed\n");
+			return -1;
 		}
 	}
 #else
@@ -1450,6 +1468,11 @@ static inline int handle_new_connect(struct socket_info* si)
 	/* add socket to list */
 	tcpconn=tcpconn_new(new_sock, &su, si, si->proto, S_CONN_ACCEPT);
 	if (tcpconn){
+#ifdef TCP_PASS_NEW_CONNECTION_ON_DATA
+		io_watch_add(&io_h, tcpconn->s, F_TCPCONN, tcpconn);
+		tcpconn->flags&=~F_CONN_REMOVED;
+		tcpconn_add(tcpconn);
+#else
 		tcpconn->refcnt++; /* safe, not yet available to the
 							  outside world */
 		tcpconn_add(tcpconn);
@@ -1459,14 +1482,9 @@ static inline int handle_new_connect(struct socket_info* si)
 		if(send2child(tcpconn)<0){
 			LOG(L_ERR,"ERROR: handle_new_connect: no children "
 					"available\n");
-			TCPCONN_LOCK;
-			tcpconn->refcnt--;
-			if (tcpconn->refcnt==0){
-				close(tcpconn->s);
-				_tcpconn_rm(tcpconn);
-			}else tcpconn->timeout=0; /* force expire */
-			TCPCONN_UNLOCK;
+			tcpconn_destroy(tcpconn);
 		}
+#endif
 	}else{ /*tcpconn==0 */
 		LOG(L_ERR, "ERROR: handle_new_connect: tcpconn_new failed, "
 				"closing socket\n");
@@ -1490,8 +1508,6 @@ static inline int handle_new_connect(struct socket_info* si)
  */
 inline static int handle_tcpconn_ev(struct tcp_connection* tcpconn, int fd_i)
 {
-	int fd;
-	
 	/*  is refcnt!=0 really necessary? 
 	 *  No, in fact it's a bug: I can have the following situation: a send only
 	 *   tcp connection used by n processes simultaneously => refcnt = n. In 
@@ -1516,6 +1532,8 @@ inline static int handle_tcpconn_ev(struct tcp_connection* tcpconn, int fd_i)
 	tcpconn_ref(tcpconn); /* refcnt ++ */
 	if (send2child(tcpconn)<0){
 		LOG(L_ERR,"ERROR: handle_tcpconn_ev: no children available\n");
+		tcpconn_destroy(tcpconn);
+#if 0
 		TCPCONN_LOCK;
 		tcpconn->refcnt--;
 		if (tcpconn->refcnt==0){
@@ -1524,6 +1542,7 @@ inline static int handle_tcpconn_ev(struct tcp_connection* tcpconn, int fd_i)
 			close(fd);
 		}else tcpconn->timeout=0; /* force expire*/
 		TCPCONN_UNLOCK;
+#endif
 	}
 	return 0; /* we are not interested in possibly queued io events, 
 				 the fd was either passed to a child, or closed */
@@ -1631,6 +1650,14 @@ void tcp_main_loop()
 	struct socket_info* si;
 	int r;
 	
+	/* init send fd queues (here because we want mem. alloc only in the tcp
+	 *  process */
+#ifdef SEND_FD_QUEUE
+	if (init_send_fd_queues()<0){
+		LOG(L_CRIT, "ERROR: init_tcp: could not init send fd queues\n");
+		goto error;
+	}
+#endif
 	/* init io_wait (here because we want the memory allocated only in
 	 * the tcp_main process) */
 	
@@ -1758,6 +1785,9 @@ void tcp_main_loop()
 			goto error;
 	}
 error:
+#ifdef SEND_FD_QUEUE
+	destroy_send_fd_queues();
+#endif
 	destroy_io_wait(&io_h);
 	LOG(L_CRIT, "ERROR: tcp_main_loop: exiting...");
 	exit(-1);
@@ -1786,9 +1816,10 @@ void destroy_tcp()
 			lock_dealloc((void*)tcpconn_lock);
 			tcpconn_lock=0;
 		}
-#ifdef SEND_FD_QUEUE
-		destroy_send_fd_queues();
-#endif
+		if (tcp_children){
+			pkg_free(tcp_children);
+			tcp_children=0;
+		}
 }
 
 
@@ -1834,14 +1865,7 @@ int init_tcp()
 			TCP_ALIAS_HASH_SIZE * sizeof(struct tcp_conn_alias*));
 	memset((void*)tcpconn_id_hash, 0, 
 			TCP_ID_HASH_SIZE * sizeof(struct tcp_connection*));
-	/* init send fd queues */
 	
-#ifdef SEND_FD_QUEUE
-	if (init_send_fd_queues()<0){
-		LOG(L_CRIT, "ERROR: init_tcp: could not init send fd queues\n");
-		goto error;
-	}
-#endif
 	/* fix config variables */
 	/* they can have only positive values due the config parser so we can
 	 * ignore most of them */
@@ -1916,6 +1940,12 @@ int tcp_init_children()
 	tcp_max_fd_no=process_count*2 +r-1 /* timer */ +3; /* stdin/out/err*/
 	tcp_max_fd_no+=tcp_max_connections;
 	
+	/* alloc the children array */
+	tcp_children=pkg_malloc(sizeof(struct tcp_child)*tcp_children_no);
+	if (tcp_children==0){
+			LOG(L_ERR, "ERROR: tcp_init_children: out of memory\n");
+			goto error;
+	}
 	/* create the tcp sock_info structures */
 	/* copy the sockets --moved to main_loop*/
 	

+ 5 - 2
tcp_read.c

@@ -34,6 +34,7 @@
  *              parameter & they set c->state to S_CONN_EOF on eof (andrei)
  * 2003-07-04  fixed tcp EOF handling (possible infinite loop) (andrei)
  * 2005-07-05  migrated to the new io_wait code (andrei)
+ * 2006-02-03  use tsend_stream instead of send_all (andrei)
  */
 
 #ifdef USE_TCP
@@ -66,6 +67,7 @@
 #define HANDLE_IO_INLINE
 #include "io_wait.h"
 #include <fcntl.h> /* must be included after io_wait.h if SIGIO_RT is used */
+#include "tsend.h"
 
 /* types used in io_wait* */
 enum fd_types { F_NONE, F_TCPMAIN, F_TCPCONN };
@@ -554,8 +556,9 @@ void release_tcpconn(struct tcp_connection* c, long state, int unix_sock)
 		/* errno==EINTR, EWOULDBLOCK a.s.o todo */
 		response[0]=(long)c;
 		response[1]=state;
-		if (send_all(unix_sock, response, sizeof(response))<=0)
-			LOG(L_ERR, "ERROR: release_tcpconn: send_all failed\n");
+		
+		if (tsend_stream(unix_sock, (char*)response, sizeof(response), -1)<=0)
+			LOG(L_ERR, "ERROR: release_tcpconn: tsend_stream failed\n");
 }
 
 

+ 13 - 6
tsend.c

@@ -31,6 +31,7 @@
  * --------
  *  2004-02-26  created by andrei
  *  2003-03-03  switched to heavy macro use, added tsend_dgram_ev (andrei) 
+ *  2006-02-03  tsend* will wait forever if timeout==-1 (andrei)
  */
 
 #include <string.h>
@@ -58,12 +59,16 @@
 #define TSEND_POLL(f_name) \
 poll_loop: \
 	while(1){ \
-		diff=expire-get_ticks_raw(); \
-		if (diff<=0){ \
-			LOG(L_ERR, "ERROR: " f_name ": send timeout (%d)\n", timeout); \
-			goto error; \
+		if (timeout==-1) \
+			n=poll(&pf, 1, -1); \
+		else{ \
+			diff=expire-get_ticks_raw(); \
+			if (diff<=0){ \
+				LOG(L_ERR, "ERROR: " f_name ": send timeout (%d)\n", timeout);\
+				goto error; \
+			} \
+			n=poll(&pf, 1, TICKS_TO_MS((ticks_t)diff)); \
 		} \
-		n=poll(&pf, 1, TICKS_TO_MS((ticks_t)diff)); \
 		if (n<0){ \
 			if (errno==EINTR) continue; /* signal, ignore */ \
 			LOG(L_ERR, "ERROR: " f_name ": poll failed: %s [%d]\n", \
@@ -100,8 +105,10 @@ poll_loop: \
 	
 
 
-/* sends on fd (which must be O_NONBLOCK); if it cannot send any data
+/* sends on fd (which must be O_NONBLOCK if you want a finite timeout); if it
+ * cannot send any data
  * in timeout milliseconds it will return ERROR
+ * if timeout==-1, it waits forever
  * returns: -1 on error, or number of bytes written
  *  (if less than len => couldn't send all)
  *  bugs: signals will reset the timer