
- io_wait.h: when calling handle_io() in a loop (e.g. io_watch_add() with
SIGIO_RT or EPOLL_ET, or io_wait_loop_* in repeat mode), always check whether
the fd was removed inside the handle_io() call and stop if it was (see the
first sketch below)

- tcp_main.c: always add the connection and clear the corresponding flags
before io_watch_add()-ing its fd, to avoid any possibility of the handle_*
callbacks being called with partially initialized parameters (see the second
sketch below)
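
Below is a minimal, self-contained C sketch of the first change (the loop guard). Every type and function in it is a simplified stand-in, not the real io_wait.h code; it assumes that removing an fd from the watch set clears its fd_map type field, which is what the new fm->type / e->type checks in the diff test for.

/* Hedged sketch of the io_wait.h loop guard: stand-in types/functions only.
 * Assumption: removing a watched fd clears the fd_map "type" field. */
#include <stdio.h>

struct fd_map {
	int fd;
	int type;	/* 0 == fd no longer watched (removed) */
	void *data;
};

/* stand-in handler: "removes" the fd on its 2nd call, the way a real
 * handler can call io_watch_del() on EOF */
static int handle_io(struct fd_map *fm, int idx)
{
	static int calls;
	(void)idx;
	printf("handle_io() call %d on fd %d\n", ++calls, fm->fd);
	if (calls == 2)
		fm->type = 0;	/* simulate removal from inside the handler */
	return 1;		/* pretend there is always more to do */
}

int main(void)
{
	struct fd_map entry = { .fd = 5, .type = 1 /* watched */ };
	int repeat = 1;

	/* old form:  while((handle_io(&entry, -1) > 0) && repeat);
	 * new form: re-check entry.type first, so the loop stops once
	 * the fd was removed inside handle_io() */
	while (entry.type && (handle_io(&entry, -1) > 0) && repeat);

	printf("stopped: type=%d\n", entry.type);
	return 0;
}

With the old loop form (no type check), the stand-in handler would keep being called on the already-removed entry.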
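And a second self-contained sketch for the tcp_main.c ordering: the new WARNING in io_wait.h says handle_io() can fire from inside io_watch_add(), so the connection must be added and its F_CONN_REMOVED flag cleared first. All names and bodies below are stand-ins for illustration, not the real tcp_main.c API; the stand-in io_watch_add() simply invokes the handler immediately to mimic that worst case.

/* Hedged sketch of the tcp_main.c ordering fix: stand-in code only. */
#include <stdio.h>

#define F_CONN_REMOVED 1

struct tcp_connection {
	int s;		/* socket fd */
	int flags;
	int in_hash;	/* stand-in for "published via tcpconn_add()" */
};

static void tcpconn_add(struct tcp_connection *c) { c->in_hash = 1; }

/* what a handler fired from inside io_watch_add() would observe */
static void handle_tcpconn(struct tcp_connection *c)
{
	printf("handler sees: in_hash=%d removed_flag=%d\n",
	       c->in_hash, c->flags & F_CONN_REMOVED);
}

/* stand-in io_watch_add(): may call the handler before returning */
static void io_watch_add(struct tcp_connection *c)
{
	handle_tcpconn(c);
}

int main(void)
{
	struct tcp_connection conn = { .s = 5, .flags = F_CONN_REMOVED };

	/* new (safe) order: publish the connection and clear the flag
	 * first; only then start watching the fd.  With the old order
	 * (io_watch_add() first) an immediately-fired handler could see
	 * in_hash=0 and F_CONN_REMOVED still set. */
	tcpconn_add(&conn);
	conn.flags &= ~F_CONN_REMOVED;
	io_watch_add(&conn);
	return 0;
}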

Andrei Pelinescu-Onciul, 18 years ago
parent
commit
5702d44833
2 changed files with 30 additions and 17 deletions
  1. io_wait.h (+24 -13)
  2. tcp_main.c (+6 -4)

io_wait.h (+24 -13)

@@ -49,6 +49,8 @@
  *  2005-06-26  added kqueue (andrei)
  *  2005-07-01  added /dev/poll (andrei)
  *  2006-05-30  sigio 64 bit workarround enabled for kernels < 2.6.5 (andrei)
+ *  2007-11-22  when handle_io() is called in a loop check & stop if the fd was
+ *               removed inside handle_io() (andrei)
  */
 
 
@@ -253,6 +255,9 @@ again:
 
 /* generic io_watch_add function
  * returns 0 on success, -1 on error
+ * WARNING: handle_io() can be called immediately (from io_watch_add()) so
+ *  make sure that any dependent init. (e.g. data stuff) is made before
+ *  calling io_watch_add
  *
  * this version should be faster than pointers to poll_method specific
  * functions (it avoids functions calls, the overhead being only an extra
@@ -442,8 +447,8 @@ again_devpoll:
 		pf.fd=fd;
 		pf.events=POLLIN;
 check_io_again:
-		while( ((n=poll(&pf, 1, 0))>0) && (handle_io(e, idx)>0));
-		if (n==-1){
+		while(e->type && ((n=poll(&pf, 1, 0))>0) && (handle_io(e, idx)>0));
+		if (e->type && (n==-1)){
 			if (errno==EINTR) goto check_io_again;
 			LOG(L_ERR, "ERROR: io_watch_add: check_io poll: %s [%d]\n",
 						strerror(errno), errno);
@@ -633,6 +638,7 @@ inline static int io_wait_loop_poll(io_wait_h* h, int t, int repeat)
 {
 	int n, r;
 	int ret;
+	struct fd_map* fm;
 again:
 		ret=n=poll(h->fd_array, h->fd_no, t*1000);
 		if (n==-1){
@@ -656,8 +662,8 @@ again:
 					h->fd_array[r].events=0; /* clear the events */
 					continue;
 				}
-				while((handle_io(get_fd_map(h, h->fd_array[r].fd), r) > 0)
-						 && repeat);
+				fm=get_fd_map(h, h->fd_array[r].fd);
+				while(fm->type && (handle_io(fm, r) > 0) && repeat);
 			}
 		}
 error:
@@ -674,6 +680,7 @@ inline static int io_wait_loop_select(io_wait_h* h, int t, int repeat)
 	int n, ret;
 	struct timeval timeout;
 	int r;
+	struct fd_map* fm;
 	
 again:
 		sel_set=h->master_set;
@@ -690,8 +697,8 @@ again:
 		/* use poll fd array */
 		for(r=0; (r<h->max_fd_no) && n; r++){
 			if (FD_ISSET(h->fd_array[r].fd, &sel_set)){
-				while((handle_io(get_fd_map(h, h->fd_array[r].fd), r)>0)
-						&& repeat);
+				fm=get_fd_map(h, h->fd_array[r].fd);
+				while(fm->type && (handle_io(fm, r)>0) && repeat);
 				n--;
 			}
 		};
@@ -705,6 +712,7 @@ again:
 inline static int io_wait_loop_epoll(io_wait_h* h, int t, int repeat)
 {
 	int n, r;
+	struct fd_map* fm;
 	
 again:
 		n=epoll_wait(h->epfd, h->ep_array, h->fd_no, t*1000);
@@ -728,8 +736,8 @@ again:
 #endif
 		for (r=0; r<n; r++){
 			if (h->ep_array[r].events & (EPOLLIN|EPOLLERR|EPOLLHUP)){
-				while((handle_io((struct fd_map*)h->ep_array[r].data.ptr,-1)>0)
-					&& repeat);
+				fm=(struct fd_map*)h->ep_array[r].data.ptr;
+				while(fm->type && (handle_io(fm,-1)>0) && repeat);
 			}else{
 				LOG(L_ERR, "ERROR:io_wait_loop_epoll: unexpected event %x"
 							" on %d/%d, data=%p\n", h->ep_array[r].events,
@@ -748,6 +756,7 @@ inline static int io_wait_loop_kqueue(io_wait_h* h, int t, int repeat)
 {
 	int n, r;
 	struct timespec tspec;
+	struct fd_map* fm;
 	
 	tspec.tv_sec=t;
 	tspec.tv_nsec=0;
@@ -778,9 +787,10 @@ again:
 							"fd %d: %s [%ld]\n", h->kq_array[r].ident,
 							strerror(h->kq_array[r].data),
 							(long)h->kq_array[r].data);
-			}else /* READ/EOF */
-				while((handle_io((struct fd_map*)h->kq_array[r].udata, -1)>0)
-						&& repeat);
+			}else{ /* READ/EOF */
+				fm=(struct fd_map*)h->kq_array[r].udata;
+				while(fm->type && (handle_io(fm, -1)>0) && repeat);
+			}
 		}
 error:
 	return n;
@@ -913,6 +923,7 @@ inline static int io_wait_loop_devpoll(io_wait_h* h, int t, int repeat)
 	int n, r;
 	int ret;
 	struct dvpoll dpoll;
+	struct fd_map* fm;
 
 		dpoll.dp_timeout=t*1000;
 		dpoll.dp_nfds=h->fd_no;
@@ -934,8 +945,8 @@ again:
 							h->fd_array[r].fd, h->fd_array[r].revents);
 			}
 			/* POLLIN|POLLHUP just go through */
-			while((handle_io(get_fd_map(h, h->fd_array[r].fd), r) > 0) &&
-						repeat);
+			fm=get_fd_map(h, h->fd_array[r].fd);
+			while(fm->type && (handle_io(fm, r) > 0) && repeat);
 		}
 error:
 	return ret;

tcp_main.c (+6 -4)

@@ -81,6 +81,8 @@
  *  2007-08-27   split init_sock_opt into a lightweight init_sock_opt_accept() 
  *               used when accepting connections and init_sock_opt used for 
  *               connect/ new sockets (andrei)
+ *  2007-11-22  always add the connection & clear the coresponding flags before
+ *               io_watch_add-ing its fd - it's safer this way (andrei)
  */
 
 
@@ -1449,8 +1451,8 @@ inline static int handle_tcp_child(struct tcp_child* tcp_c, int fd_i)
 			tcpconn->timeout=get_ticks_raw()+tcp_con_lifetime;
 			tcpconn_put(tcpconn);
 			/* must be after the de-ref*/
-			io_watch_add(&io_h, tcpconn->s, F_TCPCONN, tcpconn);
 			tcpconn->flags&=~F_CONN_REMOVED;
+			io_watch_add(&io_h, tcpconn->s, F_TCPCONN, tcpconn);
 			DBG("handle_tcp_child: CONN_RELEASE  %p refcnt= %d\n", 
 							tcpconn, atomic_get(&tcpconn->refcnt));
 			break;
@@ -1587,8 +1589,8 @@ inline static int handle_ser_child(struct process_table* p, int fd_i)
 			tcpconn_add(tcpconn);
 			/* update the timeout*/
 			tcpconn->timeout=get_ticks_raw()+tcp_con_lifetime;
-			io_watch_add(&io_h, tcpconn->s, F_TCPCONN, tcpconn);
 			tcpconn->flags&=~F_CONN_REMOVED;
+			io_watch_add(&io_h, tcpconn->s, F_TCPCONN, tcpconn);
 			break;
 		default:
 			LOG(L_CRIT, "BUG: handle_ser_child: unknown cmd %d\n", cmd);
@@ -1735,9 +1737,9 @@ static inline int handle_new_connect(struct socket_info* si)
 	tcpconn=tcpconn_new(new_sock, &su, dst_su, si, si->proto, S_CONN_ACCEPT);
 	if (tcpconn){
 #ifdef TCP_PASS_NEW_CONNECTION_ON_DATA
-		io_watch_add(&io_h, tcpconn->s, F_TCPCONN, tcpconn);
-		tcpconn->flags&=~F_CONN_REMOVED;
 		tcpconn_add(tcpconn);
+		tcpconn->flags&=~F_CONN_REMOVED;
+		io_watch_add(&io_h, tcpconn->s, F_TCPCONN, tcpconn);
 #else
 		atomic_set(&tcpconn->refcnt, 1); /* safe, not yet available to the
 											outside world */