Browse Source

- tcp receiver concerted to the new io_wait.h
- epoll: close() not always removing the fd from set bug workarround
- sigio_rt: reset O_ASYNC (sigio bug workarround)
- more tcp related fixes

Andrei Pelinescu-Onciul 20 years ago
parent
commit
3135b4bb72
4 changed files with 314 additions and 27 deletions
  1. 1 1
      Makefile.defs
  2. 74 16
      io_wait.h
  3. 3 2
      tcp_main.c
  4. 236 8
      tcp_read.c

+ 1 - 1
Makefile.defs

@@ -57,7 +57,7 @@ MAIN_NAME=ser
 VERSION = 0
 PATCHLEVEL = 10
 SUBLEVEL =   99
-EXTRAVERSION = -dev10
+EXTRAVERSION = -dev11-tcp
 
 RELEASE=$(VERSION).$(PATCHLEVEL).$(SUBLEVEL)$(EXTRAVERSION)
 OS = $(shell uname -s | sed -e s/SunOS/solaris/ | tr "[A-Z]" "[a-z]")

+ 74 - 16
io_wait.h

@@ -297,13 +297,19 @@ inline static int io_watch_add(	io_wait_h* h,
 	int n;
 	struct epoll_event ep_event;
 #endif
-#ifdef HAVE_SIGIO_RT
-	static char buf[65536];
-#endif
 #ifdef HAVE_DEVPOLL
 	struct pollfd pfd;
 #endif
+#if defined(HAVE_SIGIO_RT) || defined (HAVE_EPOLL)
+	int idx;
+	int check_io;
+	struct pollfd pf;
 	
+	check_io=0; /* set to 1 if we need to check for pre-existiing queued
+				   io/data on the fd */
+	idx=-1;
+#endif
+	e=0;
 	if (fd==-1){
 		LOG(L_CRIT, "BUG: io_watch_add: fd is -1!\n");
 		goto error;
@@ -344,6 +350,10 @@ inline static int io_watch_add(	io_wait_h* h,
 #ifdef HAVE_SIGIO_RT
 		case POLL_SIGIO_RT:
 			fd_array_setup;
+			/* re-set O_ASYNC might be needed, if not done from 
+			 * io_watch_del (or if somebody wants to add a fd which has
+			 * already O_ASYNC/F_SETSIG set on a dupplicate)
+			 */
 			/* set async & signal */
 			if (fcntl(fd, F_SETOWN, my_pid())==-1){
 				LOG(L_ERR, "ERROR: io_watch_add: fnctl: SETOWN"
@@ -362,9 +372,13 @@ inline static int io_watch_add(	io_wait_h* h,
 					fd,  h->signo, my_pid());
 #endif
 			/* empty socket receive buffer, if buffer is already full
-			 * (e.g. early media), no more space to put packets
-			 * => no more signals are ever generated -- andrei */
-			while(recv(fd, buf, sizeof(buf), 0)>=0);
+			 * no more space to put packets
+			 * => no more signals are ever generated
+			 * also when moving fds, the freshly moved fd might have
+			 *  already some bytes queued, we want to get them now
+			 *  and not later -- andrei */
+			idx=h->fd_no;
+			check_io=1;
 			break;
 #endif
 #ifdef HAVE_EPOLL
@@ -392,6 +406,8 @@ again2:
 					strerror(errno), errno);
 				goto error;
 			}
+			idx=-1;
+			check_io=1;
 			break;
 #endif
 #ifdef HAVE_KQUEUE
@@ -424,8 +440,23 @@ again_devpoll:
 	
 	h->fd_no++; /* "activate" changes, for epoll/kqueue/devpoll it
 				   has only informative value */
+#if defined(HAVE_SIGIO_RT) || defined (HAVE_EPOLL)
+	if (check_io){
+		/* handle possible pre-existing events */
+		pf.fd=fd;
+		pf.events=POLLIN;
+check_io_again:
+		while( ((n=poll(&pf, 1, 0))>0) && (handle_io(e, idx)>0));
+		if (n==-1){
+			if (errno==EINTR) goto check_io_again;
+			LOG(L_ERR, "ERROR: io_watch_add: check_io poll: %s [%d]\n",
+						strerror(errno), errno);
+		}
+	}
+#endif
 	return 0;
 error:
+	if (e) unhash_fd_map(e);
 	return -1;
 #undef fd_array_setup
 #undef set_fd_flags 
@@ -469,14 +500,17 @@ inline static int io_watch_del(io_wait_h* h, int fd, int idx, int flags)
 #ifdef HAVE_DEVPOLL
 	struct pollfd pfd;
 #endif
+#ifdef HAVE_SIGIO_RT
+	int fd_flags;
+#endif
 	
 	if ((fd<0) || (fd>=h->max_fd_no)){
 		LOG(L_CRIT, "BUG: io_watch_del: invalid fd %d, not in [0, %d) \n",
 						fd, h->fd_no);
 		goto error;
 	}
-	DBG("DBG: io_watch_del (%p, %d, %d) fd_no=%d called\n",
-			h, fd, idx, h->fd_no);
+	DBG("DBG: io_watch_del (%p, %d, %d, 0x%x) fd_no=%d called\n",
+			h, fd, idx, flags, h->fd_no);
 	e=get_fd_map(h, fd);
 	/* more sanity checks */
 	if (e==0){
@@ -509,25 +543,47 @@ inline static int io_watch_del(io_wait_h* h, int fd, int idx, int flags)
 #ifdef HAVE_SIGIO_RT
 		case POLL_SIGIO_RT:
 			fix_fd_array;
-			/* FIXME: re-set ASYNC? (not needed if the fd is/will be closed
-			 *        but might cause problems if the fd is "moved")
-			 *        update: probably not needed, the fd_map type!=0
-			 *        check should catch old queued signals or in-transit fd
-			 *        (so making another syscall to reset ASYNC is not 
-			 *         necessary)*/
+			/* the O_ASYNC flag must be reset all the time, the fd
+			 *  can be changed only if  O_ASYNC is reset (if not and
+			 *  the fd is a duplicate, you will get signals from the dup. fd
+			 *  and not from the original, even if the dup. fd wa closed
+			 *  and the signals re-set on the original) -- andrei
+			 */
+			/*if (!(flags & IO_FD_CLOSING)){*/
+				/* reset ASYNC */
+				fd_flags=fcntl(fd, F_GETFL); 
+				if (fd_flags==-1){ 
+					LOG(L_ERR, "ERROR: io_watch_del: fnctl: GETFL failed:" 
+							" %s [%d]\n", strerror(errno), errno); 
+					goto error; 
+				} 
+				if (fcntl(fd, F_SETFL, fd_flags&(~O_ASYNC))==-1){ 
+					LOG(L_ERR, "ERROR: io_watch_del: fnctl: SETFL" 
+								" failed: %s [%d]\n", strerror(errno), errno); 
+					goto error; 
+				} 
 			break;
 #endif
 #ifdef HAVE_EPOLL
 		case POLL_EPOLL_LT:
 		case POLL_EPOLL_ET:
+			/* epoll doesn't seem to automatically remove sockets,
+			 * if the socket is a dupplicate/moved and the original
+			 * is still open. The fd is removed from the epoll set
+			 * only when the original (and all the  copies?) is/are 
+			 * closed. This is probably a bug in epoll. --andrei */
+#ifdef EPOLL_NO_CLOSE_BUG
 			if (!(flags & IO_FD_CLOSING)){
+#endif
 				n=epoll_ctl(h->epfd, EPOLL_CTL_DEL, fd, &ep_event);
 				if (n==-1){
 					LOG(L_ERR, "ERROR: io_watch_del: removing fd from epoll "
 							"list failed: %s [%d]\n", strerror(errno), errno);
 					goto error;
 				}
+#ifdef EPOLL_NO_CLOSE_BUG
 			}
+#endif
 			break;
 #endif
 #ifdef HAVE_KQUEUE
@@ -659,8 +715,10 @@ again:
 		if (n==-1){
 			if (errno==EINTR) goto again; /* signal, ignore it */
 			else{
-				LOG(L_ERR, "ERROR:io_wait_loop_epoll: epoll_wait:"
-						" %s [%d]\n", strerror(errno), errno);
+				LOG(L_ERR, "ERROR:io_wait_loop_epoll: "
+						"epoll_wait(%d, %p, %d, %d): %s [%d]\n", 
+						h->epfd, h->ep_array, h->fd_no, t*1000,
+						strerror(errno), errno);
 				goto error;
 			}
 		}

+ 3 - 2
tcp_main.c

@@ -1174,10 +1174,11 @@ inline static int handle_tcp_child(struct tcp_child* tcp_c, int fd_i)
 				tcpconn_destroy(tcpconn);
 				break;
 			}
-			io_watch_add(&io_h, tcpconn->s, F_TCPCONN, tcpconn);
 			/* update the timeout*/
 			tcpconn->timeout=get_ticks()+TCP_CON_TIMEOUT;
 			tcpconn_put(tcpconn);
+			/* must be after the de-ref*/
+			io_watch_add(&io_h, tcpconn->s, F_TCPCONN, tcpconn);
 			DBG("handle_tcp_child: CONN_RELEASE  %p refcnt= %d\n", 
 											tcpconn, tcpconn->refcnt);
 			break;
@@ -1309,9 +1310,9 @@ inline static int handle_ser_child(struct process_table* p, int fd_i)
 			tcpconn->s=fd;
 			/* add tcpconn to the list*/
 			tcpconn_add(tcpconn);
-			io_watch_add(&io_h, tcpconn->s, F_TCPCONN, tcpconn);
 			/* update the timeout*/
 			tcpconn->timeout=get_ticks()+TCP_CON_TIMEOUT;
+			io_watch_add(&io_h, tcpconn->s, F_TCPCONN, tcpconn);
 			break;
 		default:
 			LOG(L_CRIT, "BUG: handle_ser_child: unknown cmd %d\n", cmd);

+ 236 - 8
tcp_read.c

@@ -33,6 +33,7 @@
  * 2003-07-01  tcp_read & friends take no a single tcp_connection 
  *              parameter & they set c->state to S_CONN_EOF on eof (andrei)
  * 2003-07-04  fixed tcp EOF handling (possible infinite loop) (andrei)
+ * 2005-07-05  migrated to the new io_wait code (andrei)
  */
 
 #ifdef USE_TCP
@@ -62,6 +63,17 @@
 #include "tls/tls_server.h"
 #endif
 
+#define HANDLE_IO_INLINE
+#include "io_wait.h"
+#include <fcntl.h> /* must be included after io_wait.h if SIGIO_RT is used */
+
+/* types used in io_wait* */
+enum fd_types { F_NONE, F_TCPMAIN, F_TCPCONN };
+
+/* list of tcp connections handled by this process */
+static struct tcp_connection* tcp_conn_lst=0;
+static io_wait_h io_w; /* io_wait handler*/
+static int tcpmain_sock=-1;
 
 
 /* reads next available bytes
@@ -385,7 +397,7 @@ skip:
 
 
 
-int tcp_read_req(struct tcp_connection* con)
+int tcp_read_req(struct tcp_connection* con, int* bytes_read)
 {
 	int bytes;
 	int resp;
@@ -394,6 +406,7 @@ int tcp_read_req(struct tcp_connection* con)
 	int s;
 	char c;
 		
+		bytes=-1;
 		resp=CONN_RELEASE;
 		s=con->fd;
 		req=&con->req;
@@ -520,7 +533,7 @@ again:
 		
 		
 	end_req:
-		
+		if (bytes_read) *bytes_read=bytes;
 		return resp;
 }
 
@@ -543,7 +556,8 @@ void release_tcpconn(struct tcp_connection* c, long state, int unix_sock)
 }
 
 
-
+#ifdef DEBUG_TCP_RECEIVE
+/* old code known to work, kept arround for debuging */
 void tcp_receive_loop(int unix_sock)
 {
 	struct tcp_connection* list; /* list with connections in use */
@@ -680,15 +694,229 @@ skip:
 		
 	}
 }
+#else /* DEBUG_TCP_RECEIVE */
+
+
+
+/* handle io routine, based on the fd_map type
+ * (it will be called from io_wait_loop* )
+ * params:  fm  - pointer to a fd hash entry
+ *          idx - index in the fd_array (or -1 if not known)
+ * return: -1 on error, or when we are not interested any more on reads
+ *            from this fd (e.g.: we are closing it )
+ *          0 on EAGAIN or when by some other way it is known that no more 
+ *            io events are queued on the fd (the receive buffer is empty).
+ *            Usefull to detect when there are no more io events queued for
+ *            sigio_rt, epoll_et, kqueue.
+ *         >0 on successfull read from the fd (when there might be more io
+ *            queued -- the receive buffer might still be non-empty)
+ */
+inline static int handle_io(struct fd_map* fm, int idx)
+{	
+	int ret;
+	int n;
+	struct tcp_connection* con;
+	int s;
+	long resp;
+	
+	switch(fm->type){
+		case F_TCPMAIN:
+again:
+			ret=n=receive_fd(fm->fd, &con, sizeof(con), &s, 0);
+			DBG("received n=%d con=%p, fd=%d\n", n, con, s);
+			if (n<0){
+				if (errno == EWOULDBLOCK || errno == EAGAIN){
+					ret=0;
+					break;
+				}else if (errno == EINTR) goto again;
+				else{
+					LOG(L_CRIT,"BUG: tcp_receive: handle_io: read_fd: %s \n",
+							strerror(errno));
+						abort(); /* big error*/
+				}
+			}
+			if (n==0){
+				LOG(L_ERR, "WARNING: tcp_receive: handle_io: 0 bytes read\n");
+				break;
+			}
+			if (con==0){
+					LOG(L_CRIT, "BUG: tcp_receive: handle_io null pointer\n");
+					break;
+			}
+			con->fd=s;
+			if (s==-1) {
+				LOG(L_ERR, "ERROR: tcp_receive: handle_io: read_fd:"
+									"no fd read\n");
+				goto con_error;
+			}
+			if (con==tcp_conn_lst){
+				LOG(L_CRIT, "BUG: tcp_receive: handle_io: duplicate"
+							" connection received: %p, id %d, fd %d, refcnt %d"
+							" state %d (n=%d)\n", con, con->id, con->fd,
+							con->refcnt, con->state, n);
+				release_tcpconn(con, CONN_ERROR, tcpmain_sock);
+				break; /* try to recover */
+			}
+			/* must be before io_watch_add, io_watch_add might catch some
+			 * already existing events => might call handle_io and
+			 * handle_io might decide to del. the new connection =>
+			 * must be in the list */
+			tcpconn_listadd(tcp_conn_lst, con, c_next, c_prev);
+			con->timeout=get_ticks()+TCP_CHILD_TIMEOUT;
+			if (io_watch_add(&io_w, s, F_TCPCONN, con)<0){
+				LOG(L_CRIT, "ERROR: tcp_receive: handle_io: failed to add"
+						" new socket to the fd list\n");
+				tcpconn_listrm(tcp_conn_lst, con, c_next, c_prev);
+				goto con_error;
+			}
+			break;
+		case F_TCPCONN:
+			con=(struct tcp_connection*)fm->data;
+			resp=tcp_read_req(con, &ret);
+			if (resp<0){
+				ret=-1; /* some error occured */
+				io_watch_del(&io_w, con->fd, idx, IO_FD_CLOSING);
+				tcpconn_listrm(tcp_conn_lst, con, c_next, c_prev);
+				con->state=S_CONN_BAD;
+				release_tcpconn(con, resp, tcpmain_sock);
+			}else{
+				/* update timeout */
+				con->timeout=get_ticks()+TCP_CHILD_TIMEOUT;
+			}
+			break;
+		case F_NONE:
+			LOG(L_CRIT, "BUG: handle_io: empty fd map %p (%d): "
+						"{%d, %d, %p}\n", fm, (int)(fm-io_w.fd_hash),
+						fm->fd, fm->type, fm->data);
+			goto error;
+		default:
+			LOG(L_CRIT, "BUG: handle_io: uknown fd type %d\n", fm->type); 
+			goto error;
+	}
+	
+	return ret;
+con_error:
+	con->state=S_CONN_BAD;
+	release_tcpconn(con, CONN_ERROR, fm->fd);
+	return ret;
+error:
+	return -1;
+}
 
 
-#if 0
-int main(int argv, char** argc )
+
+/* releases expired connections and cleans up bad ones (state<0) */
+static inline void tcp_receive_timeout()
 {
-	printf("starting tests\n");
-	tcp_receive_loop();
+	struct tcp_connection* con;
+	struct tcp_connection* next;
+	int ticks;
+	
+	ticks=get_ticks();
+	for (con=tcp_conn_lst; con; con=next){
+		next=con->c_next; /* safe for removing */
+		if (con->state<0){   /* kill bad connections */ 
+			/* S_CONN_BAD or S_CONN_ERROR, remove it */
+			/* fd will be closed in release_tcpconn */
+			io_watch_del(&io_w, con->fd, -1, IO_FD_CLOSING);
+			tcpconn_listrm(tcp_conn_lst, con, c_next, c_prev);
+			con->state=S_CONN_BAD;
+			release_tcpconn(con, CONN_ERROR, tcpmain_sock);
+			continue;
+		}
+		if (con->timeout<=ticks){
+			/* expired, return to "tcp main" */
+			DBG("tcp_receive_loop: %p expired (%d, %d)\n",
+					con, con->timeout, ticks);
+			/* fd will be closed in release_tcpconn */
+			io_watch_del(&io_w, con->fd, -1, IO_FD_CLOSING);
+			tcpconn_listrm(tcp_conn_lst, con, c_next, c_prev);
+			release_tcpconn(con, CONN_RELEASE, tcpmain_sock);
+		}
+	}
 }
 
-#endif
 
+
+void tcp_receive_loop(int unix_sock)
+{
+	
+	/* init */
+	tcpmain_sock=unix_sock; /* init com. socket */
+	if (init_io_wait(&io_w, tcp_max_fd_no, tcp_poll_method)<0)
+		goto error;
+	/* add the unix socket */
+	if (io_watch_add(&io_w, tcpmain_sock, F_TCPMAIN, 0)<0){
+		LOG(L_CRIT, "ERROR: tcp_receive_loop: init: failed to add socket "
+							" to the fd list\n");
+		goto error;
+	}
+	/* main loop */
+	switch(io_w.poll_method){
+		case POLL_POLL:
+				while(1){
+					io_wait_loop_poll(&io_w, TCP_CHILD_SELECT_TIMEOUT, 0);
+					tcp_receive_timeout();
+				}
+				break;
+#ifdef HAVE_SELECT
+		case POLL_SELECT:
+			while(1){
+				io_wait_loop_select(&io_w, TCP_CHILD_SELECT_TIMEOUT, 0);
+				tcp_receive_timeout();
+			}
+			break;
+#endif
+#ifdef HAVE_SIGIO_RT
+		case POLL_SIGIO_RT:
+			while(1){
+				io_wait_loop_sigio_rt(&io_w, TCP_CHILD_SELECT_TIMEOUT);
+				tcp_receive_timeout();
+			}
+			break;
+#endif
+#ifdef HAVE_EPOLL
+		case POLL_EPOLL_LT:
+			while(1){
+				io_wait_loop_epoll(&io_w, TCP_CHILD_SELECT_TIMEOUT, 0);
+				tcp_receive_timeout();
+			}
+			break;
+		case POLL_EPOLL_ET:
+			while(1){
+				io_wait_loop_epoll(&io_w, TCP_CHILD_SELECT_TIMEOUT, 1);
+				tcp_receive_timeout();
+			}
+			break;
 #endif
+#ifdef HAVE_KQUEUE
+		case POLL_KQUEUE:
+			while(1){
+				io_wait_loop_kqueue(&io_w, TCP_CHILD_SELECT_TIMEOUT, 0);
+				tcp_receive_timeout();
+			}
+			break;
+#endif
+#ifdef HAVE_DEVPOLL
+		case POLL_DEVPOLL:
+			while(1){
+				io_wait_loop_devpoll(&io_w, TCP_CHILD_SELECT_TIMEOUT, 0);
+				tcp_receive_timeout();
+			}
+			break;
+#endif
+		default:
+			LOG(L_CRIT, "BUG: tcp_receive_loop: no support for poll method "
+					" %s (%d)\n", 
+					poll_method_name(io_w.poll_method), io_w.poll_method);
+			goto error;
+	}
+error:
+	destroy_io_wait(&io_w);
+	LOG(L_CRIT, "ERROR: tcp_receive_loop: exiting...");
+	exit(-1);
+}
+
+#endif /* DEBUG_TCP_RECEIVE */
+
+#endif /* USE_TCP */