Browse Source

- high connection number deadlock fix (tested with 50k open tcp connections)
- experimental queue send fd ops code
WARNING: lightly tested

Andrei Pelinescu-Onciul 19 years ago
parent
commit
98f3d5e2b9
1 changed files with 324 additions and 147 deletions
  1. 324 147
      tcp_main.c

+ 324 - 147
tcp_main.c

@@ -59,7 +59,10 @@
  *  2005-07-08  tcp_max_connections, tcp_connection_lifetime, don't accept
  *               more connections if tcp_max_connections is exceeded (andrei)
  *  2005-10-21  cleanup all the open connections on exit
- *              decrement the no. of open connections on timeout too    (andrei)
+ *              decrement the no. of open connections on timeout too    (andrei) *  2006-01-30  queue send_fd request and execute them at the end of the
+ *              poll loop  (#ifdef) (andrei)
+ *              process all children requests, before attempting to send
+ *              them new stuff (fixes some deadlocks) (andrei)
  */
 
 
@@ -119,6 +122,13 @@
 
 #define MAX_TCP_CHILDREN 100
 
+/* #define SEND_FD_QUEUE */ /* queue send fd request, instead of sending 
+							   them immediately */
+#ifdef SEND_FD_QUEUE
+#define MAX_SEND_FD_QUEUE_SIZE	tcp_max_fd_no
+#define SEND_FD_QUEUE_SIZE		128  /* initial size */
+#define MAX_SEND_FD_RETRIES		3	 /* FIXME: increase */
+#endif
 
 
 enum fd_types { F_NONE, F_SOCKINFO /* a tcp_listen fd */,
@@ -911,7 +921,7 @@ int tcp_init(struct socket_info* sock_info)
 				strerror(errno));
 		goto error;
 	}
-	if (listen(sock_info->socket, 10)==-1){
+	if (listen(sock_info->socket, 1024)==-1){
 		LOG(L_ERR, "ERROR: tcp_init: listen(%x, %p, %d) on %s: %s\n",
 				sock_info->socket, &addr->s, 
 				(unsigned)sockaddru_len(*addr),
@@ -931,109 +941,134 @@ error:
 
 
 
-static int send2child(struct tcp_connection* tcpconn)
+#ifdef SEND_FD_QUEUE
+struct send_fd_info{
+	struct tcp_connection* tcp_conn;
+	int unix_sock;
+	int retries;
+};
+
+struct tcp_send_fd_q{
+	struct send_fd_info* data; /* buffer */
+	struct send_fd_info* crt;  /* pointer inside the buffer */
+	struct send_fd_info* end;  /* points after the last valid position */
+};
+
+
+static struct tcp_send_fd_q send2child_q;
+
+
+
+static int send_fd_queue_init(struct tcp_send_fd_q *q, unsigned int size)
 {
-	int i;
-	int min_busy;
-	int idx;
-	
-	min_busy=tcp_children[0].busy;
-	idx=0;
-	for (i=0; i<tcp_children_no; i++){
-		if (!tcp_children[i].busy){
-			idx=i;
-			min_busy=0;
-			break;
-		}else if (min_busy>tcp_children[i].busy){
-			min_busy=tcp_children[i].busy;
-			idx=i;
-		}
-	}
-	
-	tcp_children[idx].busy++;
-	tcp_children[idx].n_reqs++;
-	if (min_busy){
-		DBG("WARNING: send2child: no free tcp receiver, "
-				" connection passed to the least busy one (%d)\n",
-				min_busy);
-	}
-	DBG("send2child: to tcp child %d %d(%d), %p\n", idx, 
-					tcp_children[idx].proc_no,
-					tcp_children[idx].pid, tcpconn);
-	if (send_fd(tcp_children[idx].unix_sock, &tcpconn, sizeof(tcpconn),
-			tcpconn->s)<=0){
-		LOG(L_ERR, "ERROR: send2child: send_fd failed\n");
+	q->data=pkg_malloc(size*sizeof(struct send_fd_info));
+	if (q->data==0){
+		LOG(L_ERR, "ERROR: send_fd_queue_init: out of memory\n");
 		return -1;
 	}
-	
+	q->crt=&q->data[0];
+	q->end=&q->data[size];
 	return 0;
 }
 
+static void send_fd_queue_destroy(struct tcp_send_fd_q *q)
+{
+	if (q->data){
+		pkg_free(q->data);
+		q->data=0;
+		q->crt=q->end=0;
+	}
+}
 
-/* handles a new connection, called internally by tcp_main_loop/handle_io.
- * params: si - pointer to one of the tcp socket_info structures on which
- *              an io event was detected (connection attempt)
- * returns:  handle_* return convention: -1 on error, 0 on EAGAIN (no more
- *           io events queued), >0 on success. success/error refer only to
- *           the accept.
- */
-static inline int handle_new_connect(struct socket_info* si)
+
+
+static int init_send_fd_queues()
 {
-	union sockaddr_union su;
-	struct tcp_connection* tcpconn;
-	socklen_t su_len;
-	int new_sock;
+	if (send_fd_queue_init(&get_fd_q, SEND_FD_QUEUE_SIZE)!=0)
+		goto error;
+	if (send_fd_queue_init(&send2child_q, SEND_FD_QUEUE_SIZE)!=0)
+		goto error;
+	return 0;
+error:
+	LOG(L_ERR, "ERROR: init_send_fd_queues: init failed\n");
+	return -1;
+}
+
+
+
+static void destroy_send_fd_queues()
+{
+	send_fd_queue_destroy(&get_fd_q);
+	send_fd_queue_destroy(&send2child_q);
+}
+
+
+
+
+inline static int send_fd_queue_add(	struct tcp_send_fd_q* q, 
+										int unix_sock,
+										struct tcp_connection *t)
+{
+	struct send_fd_info* tmp;
+	unsigned long new_size;
 	
-	/* got a connection on r */
-	su_len=sizeof(su);
-	new_sock=accept(si->socket, &(su.s), &su_len);
-	if (new_sock==-1){
-		if ((errno==EAGAIN)||(errno==EWOULDBLOCK))
-			return 0;
-		LOG(L_ERR,  "WARNING: handle_new_connect: error while accepting"
-				" connection(%d): %s\n", errno, strerror(errno));
-		return -1;
-	}
-	if (tcp_connections_no>=tcp_max_connections){
-		LOG(L_ERR, "ERROR: maximum number of connections exceeded: %d/%d\n",
-					tcp_connections_no, tcp_max_connections);
-		close(new_sock);
-		return 1; /* success, because the accept was succesfull */
-	}
-	if (init_sock_opt(new_sock)<0){
-		LOG(L_ERR, "ERROR: handle_new_connect: init_sock_opt failed\n");
-		close(new_sock);
-		return 1; /* success, because the accept was succesfull */
-	}
+	if (q->crt>=q->end){
+		new_size=q->end-&q->data[0];
+		if (new_size< MAX_SEND_FD_QUEUE_SIZE/2){
+			new_size*=2;
+		}else new_size=MAX_SEND_FD_QUEUE_SIZE;
+		if (q->crt>=&q->data[new_size]){
+			LOG(L_ERR, "ERROR: send_fd_queue_add: queue full: %ld/%ld\n",
+					q->crt-&q->data[0]-1, new_size);
+			goto error;
+		}
+		LOG(L_CRIT, "INFO: send_fd_queue: queue full: %ld, extending to %ld\n",
+				q->end-&q->data[0], new_size);
+		tmp=pkg_realloc(q->data, new_size*sizeof(struct send_fd_info));
+		if (tmp==0){
+			LOG(L_ERR, "ERROR: send_fd_queue_add: out of memory\n");
+			goto error;
+		}
+		q->crt=(q->crt-&q->data[0])+tmp;
+		q->data=tmp;
+		q->end=&q->data[new_size];
+	}
+	q->crt->tcp_conn=t;
+	q->crt->unix_sock=unix_sock;
+	q->crt->retries=0;
+	q->crt++;
+	return 0;
+error:
+	return -1;
+}
+
+
+
+inline static void send_fd_queue_run(struct tcp_send_fd_q* q)
+{
+	struct send_fd_info* p;
+	struct send_fd_info* t;
 	
-	/* add socket to list */
-	tcpconn=tcpconn_new(new_sock, &su, si, si->proto, S_CONN_ACCEPT);
-	if (tcpconn){
-		tcpconn->refcnt++; /* safe, not yet available to the
-							  outside world */
-		tcpconn_add(tcpconn);
-		DBG("handle_new_connect: new connection: %p %d flags: %04x\n",
-			tcpconn, tcpconn->s, tcpconn->flags);
-		/* pass it to a child */
-		if(send2child(tcpconn)<0){
-			LOG(L_ERR,"ERROR: handle_new_connect: no children "
-					"available\n");
-			TCPCONN_LOCK;
-			tcpconn->refcnt--;
-			if (tcpconn->refcnt==0){
-				close(tcpconn->s);
-				_tcpconn_rm(tcpconn);
-			}else tcpconn->timeout=0; /* force expire */
-			TCPCONN_UNLOCK;
+	for (p=t=&q->data[0]; p<q->crt; p++){
+		if (send_fd(p->unix_sock, &(p->tcp_conn),
+					sizeof(struct tcp_connection*), p->tcp_conn->s)<=0){
+			if (/*FIXME: E_WOULD_BLOCK && */(p->retries<MAX_SEND_FD_RETRIES)){
+				/* leave in queue for a future try */
+				*t=*p;
+				t->retries++;
+				t++;
+			}else{
+				LOG(L_ERR, "ERROR: rund_send_fd_queue: send_fd failed"
+						   "on %d socket, %ld queue entry, retries %d \n",
+						   p->unix_sock, p-&q->data[0], p->retries);
+			}
 		}
-	}else{ /*tcpconn==0 */
-		LOG(L_ERR, "ERROR: handle_new_connect: tcpconn_new failed, "
-				"closing socket\n");
-		close(new_sock);
-		
 	}
-	return 1; /* accept() was succesfull */
+	q->crt=t;
 }
+#else
+#define send_fd_queue_run(q)
+#endif
 
 
 
@@ -1069,61 +1104,6 @@ static void tcpconn_destroy(struct tcp_connection* tcpconn)
 
 
 
-/* handles an io event on one of the watched tcp connections
- * 
- * params: tcpconn - pointer to the tcp_connection for which we have an io ev.
- *         fd_i    - index in the fd_array table (needed for delete)
- * returns:  handle_* return convention, but on success it always returns 0
- *           (because it's one-shot, after a succesfull execution the fd is
- *            removed from tcp_main's watch fd list and passed to a child =>
- *            tcp_main is not interested in further io events that might be
- *            queued for this fd)
- */
-inline static int handle_tcpconn_ev(struct tcp_connection* tcpconn, int fd_i)
-{
-	int fd;
-	
-	/*  is refcnt!=0 really necessary? 
-	 *  No, in fact it's a bug: I can have the following situation: a send only
-	 *   tcp connection used by n processes simultaneously => refcnt = n. In 
-	 *   the same time I can have a read event and this situation is perfectly
-	 *   valid. -- andrei
-	 */
-#if 0
-	if ((tcpconn->refcnt!=0)){
-		/* FIXME: might be valid for sigio_rt iff fd flags are not cleared
-		 *        (there is a short window in which it could generate a sig
-		 *         that would be catched by tcp_main) */
-		LOG(L_CRIT, "BUG: handle_tcpconn_ev: io event on referenced"
-					" tcpconn (%p), refcnt=%d, fd=%d\n",
-					tcpconn, tcpconn->refcnt, tcpconn->s);
-		return -1;
-	}
-#endif
-	/* pass it to child, so remove it from the io watch list */
-	DBG("handle_tcpconn_ev: data available on %p %d\n", tcpconn, tcpconn->s);
-	if (io_watch_del(&io_h, tcpconn->s, fd_i, 0)==-1) goto error;
-	tcpconn->flags|=F_CONN_REMOVED;
-	tcpconn_ref(tcpconn); /* refcnt ++ */
-	if (send2child(tcpconn)<0){
-		LOG(L_ERR,"ERROR: handle_tcpconn_ev: no children available\n");
-		TCPCONN_LOCK;
-		tcpconn->refcnt--;
-		if (tcpconn->refcnt==0){
-			fd=tcpconn->s;
-			_tcpconn_rm(tcpconn);
-			close(fd);
-		}else tcpconn->timeout=0; /* force expire*/
-		TCPCONN_UNLOCK;
-	}
-	return 0; /* we are not interested in possibly queued io events, 
-				 the fd was either passed to a child, or closed */
-error:
-	return -1;
-}
-
-
-
 /* handles io from a tcp child process
  * params: tcp_c - pointer in the tcp_children array, to the entry for
  *                 which an io event was detected 
@@ -1357,6 +1337,186 @@ error:
 
 
 
+/* sends a tcpconn + fd to a choosen child */
+inline static int send2child(struct tcp_connection* tcpconn)
+{
+	int i;
+	int min_busy;
+	int idx;
+	
+	min_busy=tcp_children[0].busy;
+	idx=0;
+	for (i=0; i<tcp_children_no; i++){
+		if (!tcp_children[i].busy){
+			idx=i;
+			min_busy=0;
+			break;
+		}else if (min_busy>tcp_children[i].busy){
+			min_busy=tcp_children[i].busy;
+			idx=i;
+		}
+	}
+	
+	tcp_children[idx].busy++;
+	tcp_children[idx].n_reqs++;
+	if (min_busy){
+		DBG("WARNING: send2child: no free tcp receiver, "
+				" connection passed to the least busy one (%d)\n",
+				min_busy);
+	}
+	DBG("send2child: to tcp child %d %d(%d), %p\n", idx, 
+					tcp_children[idx].proc_no,
+					tcp_children[idx].pid, tcpconn);
+	/* first make sure this child doesn't have pending request for
+	 * tcp_main (to avoid a possible deadlock: e.g. child wants to
+	 * send a release command, but the master fills its socket buffer
+	 * with new connection commands => deadlock) */
+	/* answer tcp_send requests first */
+	while(handle_ser_child(&pt[tcp_children[idx].proc_no], -1)>0);
+	/* process tcp readers requests */
+	while(handle_tcp_child(&tcp_children[idx], -1)>0);
+		
+#ifdef SEND_FD_QUEUE
+	if (send_fd_queue_add(&send2child_q, tcp_children[idx].unix_sock, 
+						tcpconn)!=0){
+		LOG(L_ERR, "ERROR: send2child: queue send op. failed\n");
+		return -1;
+	}
+#else
+	if (send_fd(tcp_children[idx].unix_sock, &tcpconn, sizeof(tcpconn),
+			tcpconn->s)<=0){
+		LOG(L_ERR, "ERROR: send2child: send_fd failed\n");
+		return -1;
+	}
+#endif
+	
+	return 0;
+}
+
+
+
+/* handles a new connection, called internally by tcp_main_loop/handle_io.
+ * params: si - pointer to one of the tcp socket_info structures on which
+ *              an io event was detected (connection attempt)
+ * returns:  handle_* return convention: -1 on error, 0 on EAGAIN (no more
+ *           io events queued), >0 on success. success/error refer only to
+ *           the accept.
+ */
+static inline int handle_new_connect(struct socket_info* si)
+{
+	union sockaddr_union su;
+	struct tcp_connection* tcpconn;
+	socklen_t su_len;
+	int new_sock;
+	
+	/* got a connection on r */
+	su_len=sizeof(su);
+	new_sock=accept(si->socket, &(su.s), &su_len);
+	if (new_sock==-1){
+		if ((errno==EAGAIN)||(errno==EWOULDBLOCK))
+			return 0;
+		LOG(L_ERR,  "WARNING: handle_new_connect: error while accepting"
+				" connection(%d): %s\n", errno, strerror(errno));
+		return -1;
+	}
+	if (tcp_connections_no>=tcp_max_connections){
+		LOG(L_ERR, "ERROR: maximum number of connections exceeded: %d/%d\n",
+					tcp_connections_no, tcp_max_connections);
+		close(new_sock);
+		return 1; /* success, because the accept was succesfull */
+	}
+	if (init_sock_opt(new_sock)<0){
+		LOG(L_ERR, "ERROR: handle_new_connect: init_sock_opt failed\n");
+		close(new_sock);
+		return 1; /* success, because the accept was succesfull */
+	}
+	
+	/* add socket to list */
+	tcpconn=tcpconn_new(new_sock, &su, si, si->proto, S_CONN_ACCEPT);
+	if (tcpconn){
+		tcpconn->refcnt++; /* safe, not yet available to the
+							  outside world */
+		tcpconn_add(tcpconn);
+		DBG("handle_new_connect: new connection: %p %d flags: %04x\n",
+			tcpconn, tcpconn->s, tcpconn->flags);
+		/* pass it to a child */
+		if(send2child(tcpconn)<0){
+			LOG(L_ERR,"ERROR: handle_new_connect: no children "
+					"available\n");
+			TCPCONN_LOCK;
+			tcpconn->refcnt--;
+			if (tcpconn->refcnt==0){
+				close(tcpconn->s);
+				_tcpconn_rm(tcpconn);
+			}else tcpconn->timeout=0; /* force expire */
+			TCPCONN_UNLOCK;
+		}
+	}else{ /*tcpconn==0 */
+		LOG(L_ERR, "ERROR: handle_new_connect: tcpconn_new failed, "
+				"closing socket\n");
+		close(new_sock);
+		
+	}
+	return 1; /* accept() was succesfull */
+}
+
+
+
+/* handles an io event on one of the watched tcp connections
+ * 
+ * params: tcpconn - pointer to the tcp_connection for which we have an io ev.
+ *         fd_i    - index in the fd_array table (needed for delete)
+ * returns:  handle_* return convention, but on success it always returns 0
+ *           (because it's one-shot, after a succesfull execution the fd is
+ *            removed from tcp_main's watch fd list and passed to a child =>
+ *            tcp_main is not interested in further io events that might be
+ *            queued for this fd)
+ */
+inline static int handle_tcpconn_ev(struct tcp_connection* tcpconn, int fd_i)
+{
+	int fd;
+	
+	/*  is refcnt!=0 really necessary? 
+	 *  No, in fact it's a bug: I can have the following situation: a send only
+	 *   tcp connection used by n processes simultaneously => refcnt = n. In 
+	 *   the same time I can have a read event and this situation is perfectly
+	 *   valid. -- andrei
+	 */
+#if 0
+	if ((tcpconn->refcnt!=0)){
+		/* FIXME: might be valid for sigio_rt iff fd flags are not cleared
+		 *        (there is a short window in which it could generate a sig
+		 *         that would be catched by tcp_main) */
+		LOG(L_CRIT, "BUG: handle_tcpconn_ev: io event on referenced"
+					" tcpconn (%p), refcnt=%d, fd=%d\n",
+					tcpconn, tcpconn->refcnt, tcpconn->s);
+		return -1;
+	}
+#endif
+	/* pass it to child, so remove it from the io watch list */
+	DBG("handle_tcpconn_ev: data available on %p %d\n", tcpconn, tcpconn->s);
+	if (io_watch_del(&io_h, tcpconn->s, fd_i, 0)==-1) goto error;
+	tcpconn->flags|=F_CONN_REMOVED;
+	tcpconn_ref(tcpconn); /* refcnt ++ */
+	if (send2child(tcpconn)<0){
+		LOG(L_ERR,"ERROR: handle_tcpconn_ev: no children available\n");
+		TCPCONN_LOCK;
+		tcpconn->refcnt--;
+		if (tcpconn->refcnt==0){
+			fd=tcpconn->s;
+			_tcpconn_rm(tcpconn);
+			close(fd);
+		}else tcpconn->timeout=0; /* force expire*/
+		TCPCONN_UNLOCK;
+	}
+	return 0; /* we are not interested in possibly queued io events, 
+				 the fd was either passed to a child, or closed */
+error:
+	return -1;
+}
+
+
+
 /* generic handle io routine, it will call the appropiate
  *  handle_xxx() based on the fd_map type
  *
@@ -1515,6 +1675,7 @@ void tcp_main_loop()
 			while(1){
 				/* wait and process IO */
 				io_wait_loop_poll(&io_h, TCP_MAIN_SELECT_TIMEOUT, 0); 
+				send_fd_queue_run(&send2child_q); /* then new io */
 				/* remove old connections */
 				tcpconn_timeout(0);
 			}
@@ -1523,6 +1684,7 @@ void tcp_main_loop()
 		case POLL_SELECT:
 			while(1){
 				io_wait_loop_select(&io_h, TCP_MAIN_SELECT_TIMEOUT, 0);
+				send_fd_queue_run(&send2child_q); /* then new io */
 				tcpconn_timeout(0);
 			}
 			break;
@@ -1531,6 +1693,7 @@ void tcp_main_loop()
 		case POLL_SIGIO_RT:
 			while(1){
 				io_wait_loop_sigio_rt(&io_h, TCP_MAIN_SELECT_TIMEOUT);
+				send_fd_queue_run(&send2child_q); /* then new io */
 				tcpconn_timeout(0);
 			}
 			break;
@@ -1539,12 +1702,14 @@ void tcp_main_loop()
 		case POLL_EPOLL_LT:
 			while(1){
 				io_wait_loop_epoll(&io_h, TCP_MAIN_SELECT_TIMEOUT, 0);
+				send_fd_queue_run(&send2child_q); /* then new io */
 				tcpconn_timeout(0);
 			}
 			break;
 		case POLL_EPOLL_ET:
 			while(1){
 				io_wait_loop_epoll(&io_h, TCP_MAIN_SELECT_TIMEOUT, 1);
+				send_fd_queue_run(&send2child_q); /* then new io */
 				tcpconn_timeout(0);
 			}
 			break;
@@ -1553,6 +1718,7 @@ void tcp_main_loop()
 		case POLL_KQUEUE:
 			while(1){
 				io_wait_loop_kqueue(&io_h, TCP_MAIN_SELECT_TIMEOUT, 0);
+				send_fd_queue_run(&send2child_q); /* then new io */
 				tcpconn_timeout(0);
 			}
 			break;
@@ -1561,6 +1727,7 @@ void tcp_main_loop()
 		case POLL_DEVPOLL:
 			while(1){
 				io_wait_loop_devpoll(&io_h, TCP_MAIN_SELECT_TIMEOUT, 0);
+				send_fd_queue_run(&send2child_q); /* then new io */
 				tcpconn_timeout(0);
 			}
 			break;
@@ -1600,6 +1767,9 @@ void destroy_tcp()
 			lock_dealloc((void*)tcpconn_lock);
 			tcpconn_lock=0;
 		}
+#ifdef SEND_FD_QUEUE
+		destroy_send_fd_queues();
+#endif
 }
 
 
@@ -1645,7 +1815,14 @@ int init_tcp()
 			TCP_ALIAS_HASH_SIZE * sizeof(struct tcp_conn_alias*));
 	memset((void*)tcpconn_id_hash, 0, 
 			TCP_ID_HASH_SIZE * sizeof(struct tcp_connection*));
+	/* init send fd queues */
 	
+#ifdef SEND_FD_QUEUE
+	if (init_send_fd_queues()<0){
+		LOG(L_CRIT, "ERROR: init_tcp: could not init send fd queues\n");
+		goto error;
+	}
+#endif
 	/* fix config variables */
 	/* they can have only positive values due the config parser so we can
 	 * ignore most of them */