Browse Source

io_wait: kqueue: use the entire array during too many errors fallback

Minor fix/optimization: if there are too many errors in the
changelist and the kevent() call has to be retried, use the entire
array (don't rely on the current watched fd number which will be
smaller then the array real size, since commit 996826).

(only kqueue using systems are affected by this fix: *bsd and
 darwin)
 (cherry picked from commit a9cdfc2938ca73d6ba40f5896c6a8930c2e73f85)
Andrei Pelinescu-Onciul 15 years ago
parent
commit
aed168981e
1 changed files with 53 additions and 52 deletions
  1. 53 52
      io_wait.h

+ 53 - 52
io_wait.h

@@ -1,6 +1,6 @@
-/* 
+/*
  * $Id$
- * 
+ *
  * Copyright (C) 2005 iptelorg GmbH
  *
  * Permission to use, copy, modify, and distribute this software for any
@@ -31,9 +31,9 @@
  *                 this assumption)
  *     local_malloc (defaults to pkg_malloc)
  *     local_free   (defaults to pkg_free)
- *  
+ *
  */
-/* 
+/*
  * History:
  * --------
  *  2005-06-13  created by andrei
@@ -79,8 +79,8 @@
 #endif
 #ifdef HAVE_SELECT
 /* needed on openbsd for select*/
-#include <sys/time.h> 
-#include <sys/types.h> 
+#include <sys/time.h>
+#include <sys/types.h>
 #include <unistd.h>
 /* needed according to POSIX for select*/
 #include <sys/select.h>
@@ -109,7 +109,7 @@ extern int _os_ver; /* os version number, needed to select bugs workarrounds */
 
 #if 0
 enum fd_types; /* this should be defined from the including file,
-				  see tcp_main.c for an example, 
+				  see tcp_main.c for an example,
 				  0 has a special meaning: not used/empty*/
 #endif
 
@@ -147,7 +147,7 @@ struct io_wait_handler{
 	enum poll_types poll_method;
 	int flags;
 	struct fd_map* fd_hash;
-	int fd_no; /*  current index used in fd_array and the passed size for 
+	int fd_no; /*  current index used in fd_array and the passed size for
 				   ep_array (for kq_array at least
 				    max(twice the size, kq_changes_size) should be
 				   be passed). */
@@ -222,7 +222,7 @@ static inline struct fd_map* hash_fd_map(	io_wait_h* h,
  *          events - combinations of POLLIN, POLLOUT, POLLERR & POLLHUP
  *          idx    - index in the fd_array (or -1 if not known)
  * return: -1 on error
- *          0 on EAGAIN or when by some other way it is known that no more 
+ *          0 on EAGAIN or when by some other way it is known that no more
  *            io events are queued on the fd (the receive buffer is empty).
  *            Usefull to detect when there are no more io events queued for
  *            sigio_rt, epoll_et, kqueue.
@@ -246,7 +246,7 @@ int handle_io(struct fd_map* fm, short events, int idx);
  *       and EVFILT_WRITE, EV_ADD for the same fd).
  * returns: -1 on error, 0 on success
  */
-static inline int kq_ev_change(io_wait_h* h, int fd, int filter, int flag, 
+static inline int kq_ev_change(io_wait_h* h, int fd, int filter, int flag,
 								void* data)
 {
 	int n;
@@ -424,7 +424,7 @@ inline static int io_watch_add(	io_wait_h* h,
 #ifdef HAVE_SIGIO_RT
 		case POLL_SIGIO_RT:
 			fd_array_setup(events);
-			/* re-set O_ASYNC might be needed, if not done from 
+			/* re-set O_ASYNC might be needed, if not done from
 			 * io_watch_del (or if somebody wants to add a fd which has
 			 * already O_ASYNC/F_SETSIG set on a duplicate)
 			 */
@@ -545,7 +545,7 @@ again_devpoll:
 		pf.events=events;
 check_io_again:
 		n=0;
-		while(e->type && ((n=poll(&pf, 1, 0))>0) && 
+		while(e->type && ((n=poll(&pf, 1, 0))>0) &&
 				(handle_io(e, pf.revents, idx)>0) &&
 				(pf.revents & (e->events|POLLERR|POLLHUP)));
 		if (unlikely(e->type && (n==-1))){
@@ -560,20 +560,20 @@ error:
 	if (e) unhash_fd_map(e);
 	return -1;
 #undef fd_array_setup
-#undef set_fd_flags 
+#undef set_fd_flags
 }
 
 
 
 #define IO_FD_CLOSING 16
-/* parameters:    h - handler 
+/* parameters:    h - handler
  *               fd - file descriptor
  *            index - index in the fd_array if known, -1 if not
  *                    (if index==-1 fd_array will be searched for the
- *                     corresponding fd* entry -- slower but unavoidable in 
+ *                     corresponding fd* entry -- slower but unavoidable in
  *                     some cases). index is not used (no fd_array) for epoll,
  *                     /dev/poll and kqueue
- *            flags - optimization flags, e.g. IO_FD_CLOSING, the fd was 
+ *            flags - optimization flags, e.g. IO_FD_CLOSING, the fd was
  *                    or will shortly be closed, in some cases we can avoid
  *                    extra remove operations (e.g.: epoll, kqueue, sigio)
  * returns 0 if ok, -1 on error */
@@ -642,7 +642,7 @@ inline static int io_watch_del(io_wait_h* h, int fd, int idx, int flags)
 				FD_CLR(fd, &h->master_wset);
 			if (unlikely(h->max_fd_select && (h->max_fd_select==fd)))
 				/* we don't know the prev. max, so we just decrement it */
-				h->max_fd_select--; 
+				h->max_fd_select--;
 			fix_fd_array;
 			break;
 #endif
@@ -656,17 +656,17 @@ inline static int io_watch_del(io_wait_h* h, int fd, int idx, int flags)
 			 */
 			/*if (!(flags & IO_FD_CLOSING)){*/
 				/* reset ASYNC */
-				fd_flags=fcntl(fd, F_GETFL); 
-				if (unlikely(fd_flags==-1)){ 
-					LOG(L_ERR, "ERROR: io_watch_del: fnctl: GETFL failed:" 
-							" %s [%d]\n", strerror(errno), errno); 
-					goto error; 
-				} 
-				if (unlikely(fcntl(fd, F_SETFL, fd_flags&(~O_ASYNC))==-1)){ 
-					LOG(L_ERR, "ERROR: io_watch_del: fnctl: SETFL" 
-								" failed: %s [%d]\n", strerror(errno), errno); 
-					goto error; 
-				} 
+				fd_flags=fcntl(fd, F_GETFL);
+				if (unlikely(fd_flags==-1)){
+					LOG(L_ERR, "ERROR: io_watch_del: fnctl: GETFL failed:"
+							" %s [%d]\n", strerror(errno), errno);
+					goto error;
+				}
+				if (unlikely(fcntl(fd, F_SETFL, fd_flags&(~O_ASYNC))==-1)){
+					LOG(L_ERR, "ERROR: io_watch_del: fnctl: SETFL"
+								" failed: %s [%d]\n", strerror(errno), errno);
+					goto error;
+				}
 			fix_fd_array; /* only on success */
 			break;
 #endif
@@ -676,7 +676,7 @@ inline static int io_watch_del(io_wait_h* h, int fd, int idx, int flags)
 			/* epoll doesn't seem to automatically remove sockets,
 			 * if the socket is a duplicate/moved and the original
 			 * is still open. The fd is removed from the epoll set
-			 * only when the original (and all the  copies?) is/are 
+			 * only when the original (and all the  copies?) is/are
 			 * closed. This is probably a bug in epoll. --andrei */
 #ifdef EPOLL_NO_CLOSE_BUG
 			if (!(flags & IO_FD_CLOSING)){
@@ -726,7 +726,7 @@ again_devpoll:
 				if (write(h->dpoll_fd, &pfd, sizeof(pfd))==-1){
 					if (errno==EINTR) goto again_devpoll;
 					LOG(L_ERR, "ERROR: io_watch_del: removing fd from "
-								"/dev/poll failed: %s [%d]\n", 
+								"/dev/poll failed: %s [%d]\n",
 								strerror(errno), errno);
 					goto error;
 				}
@@ -734,7 +734,7 @@ again_devpoll:
 #endif
 		default:
 			LOG(L_CRIT, "BUG: io_watch_del: no support for poll method "
-					" %s (%d)\n", poll_method_str[h->poll_method], 
+					" %s (%d)\n", poll_method_str[h->poll_method],
 					h->poll_method);
 			goto error;
 	}
@@ -748,12 +748,12 @@ error:
 
 
 
-/* parameters:    h - handler 
+/* parameters:    h - handler
  *               fd - file descriptor
  *           events - new events to watch for
  *              idx - index in the fd_array if known, -1 if not
  *                    (if index==-1 fd_array will be searched for the
- *                     corresponding fd* entry -- slower but unavoidable in 
+ *                     corresponding fd* entry -- slower but unavoidable in
  *                     some cases). index is not used (no fd_array) for epoll,
  *                     /dev/poll and kqueue
  * returns 0 if ok, -1 on error */
@@ -911,7 +911,7 @@ again_devpoll1:
 				if (unlikely(write(h->dpoll_fd, &pfd, sizeof(pfd))==-1)){
 					if (errno==EINTR) goto again_devpoll1;
 					LOG(L_ERR, "ERROR: io_watch_chg: removing fd from "
-								"/dev/poll failed: %s [%d]\n", 
+								"/dev/poll failed: %s [%d]\n",
 								strerror(errno), errno);
 					goto error;
 				}
@@ -921,7 +921,7 @@ again_devpoll2:
 				if (unlikely(write(h->dpoll_fd, &pfd, sizeof(pfd))==-1)){
 					if (errno==EINTR) goto again_devpoll2;
 					LOG(L_ERR, "ERROR: io_watch_chg: re-adding fd to "
-								"/dev/poll failed: %s [%d]\n", 
+								"/dev/poll failed: %s [%d]\n",
 								strerror(errno), errno);
 					/* error re-adding the fd => mark it as removed/unhash */
 					unhash_fd_map(e);
@@ -931,7 +931,7 @@ again_devpoll2:
 #endif
 		default:
 			LOG(L_CRIT, "BUG: io_watch_chg: no support for poll method "
-					" %s (%d)\n", poll_method_str[h->poll_method], 
+					" %s (%d)\n", poll_method_str[h->poll_method],
 					h->poll_method);
 			goto error;
 	}
@@ -944,7 +944,7 @@ error:
 
 
 
-/* io_wait_loop_x style function 
+/* io_wait_loop_x style function.
  * wait for io using poll()
  * params: h      - io_wait handle
  *         t      - timeout in s
@@ -985,11 +985,11 @@ again:
 				/* repeat handle_io if repeat, fd still watched (not deleted
 				 *  inside handle_io), handle_io returns that there's still
 				 *  IO and the fd is still watched for the triggering event */
-				while(fm->type && 
+				while(fm->type &&
 						(handle_io(fm, h->fd_array[r].revents, r) > 0) &&
 						repeat && ((fm->events|POLLERR|POLLHUP) &
 													h->fd_array[r].revents));
-				r=h->crt_fd_array_idx; /* can change due to io_watch_del(fd) 
+				r=h->crt_fd_array_idx; /* can change due to io_watch_del(fd)
 										  array shifting */
 			}
 		}
@@ -1034,9 +1034,9 @@ again:
 			if (unlikely(revents)){
 				h->crt_fd_array_idx=r;
 				fm=get_fd_map(h, h->fd_array[r].fd);
-				while(fm->type && (fm->events & revents) && 
+				while(fm->type && (fm->events & revents) &&
 						(handle_io(fm, revents, r)>0) && repeat);
-				r=h->crt_fd_array_idx; /* can change due to io_watch_del(fd) 
+				r=h->crt_fd_array_idx; /* can change due to io_watch_del(fd)
 										  array shifting */
 				n--;
 			}
@@ -1060,7 +1060,7 @@ again:
 			if (errno==EINTR) goto again; /* signal, ignore it */
 			else{
 				LOG(L_ERR, "ERROR:io_wait_loop_epoll: "
-						"epoll_wait(%d, %p, %d, %d): %s [%d]\n", 
+						"epoll_wait(%d, %p, %d, %d): %s [%d]\n",
 						h->epfd, h->ep_array, h->fd_no, t*1000,
 						strerror(errno), errno);
 				goto error;
@@ -1086,7 +1086,7 @@ again:
 					;
 			if (likely(revents)){
 				fm=(struct fd_map*)h->ep_array[r].data.ptr;
-				while(fm->type && ((fm->events|POLLERR|POLLHUP) & revents) && 
+				while(fm->type && ((fm->events|POLLERR|POLLHUP) & revents) &&
 						(handle_io(fm, revents, -1)>0) && repeat);
 			}else{
 				LOG(L_ERR, "ERROR:io_wait_loop_epoll: unexpected event %x"
@@ -1140,7 +1140,8 @@ again:
 			orig_changes -= apply_changes;
 			memmove(&h->kq_changes[0], &h->kq_changes[apply_changes],
 									sizeof(h->kq_changes[0])*h->kq_nchanges);
-			apply_changes = orig_changes<h->fd_no ? orig_changes : h->fd_no;
+			apply_changes = (orig_changes < h->kq_array_size) ? orig_changes :
+								h->kq_array_size;
 		} else {
 			orig_changes = 0;
 			apply_changes = 0;
@@ -1171,7 +1172,7 @@ again:
 					watched fd changes will be applied the fd will be valid
 					(so no EBADF), but it's not already watch => ENOENT.
 					We report a BUG for the other errors (there's nothing
-					constructive we can do if we get an error we don't know 
+					constructive we can do if we get an error we don't know
 					how to handle), but apart from that we ignore it in the
 					idea that it is better apply the rest of the changes,
 					rather then dropping all of them.
@@ -1203,21 +1204,21 @@ again:
 						(((int)!(h->kq_array[r].flags & EV_EOF)-1)&POLLHUP) |
 						(((int)!((h->kq_array[r].flags & EV_EOF) &&
 								 	h->kq_array[r].fflags != 0) - 1)&POLLERR);
-					while(fm->type && (fm->events & revents) && 
+					while(fm->type && (fm->events & revents) &&
 							(handle_io(fm, revents, -1)>0) && repeat);
 				}else if (h->kq_array[r].filter==EVFILT_WRITE){
 					revents=POLLOUT |
 						(((int)!(h->kq_array[r].flags & EV_EOF)-1)&POLLHUP) |
 						(((int)!((h->kq_array[r].flags & EV_EOF) &&
 								 	h->kq_array[r].fflags != 0) - 1)&POLLERR);
-					while(fm->type && (fm->events & revents) && 
+					while(fm->type && (fm->events & revents) &&
 							(handle_io(fm, revents, -1)>0) && repeat);
 				}else{
 					BUG("io_wait_loop_kqueue: unknown filter: kqueue: event "
 							"%d/%d: fd=%d, filter=%d, flags=0x%x, fflags=0x%x,"
 							" data=%lx, udata=%lx\n",
 					r, n, h->kq_array[r].ident, h->kq_array[r].filter,
-					h->kq_array[r].flags, h->kq_array[r].fflags, 
+					h->kq_array[r].flags, h->kq_array[r].fflags,
 					(long)h->kq_array[r].data, (long)h->kq_array[r].udata);
 				}
 			}
@@ -1306,14 +1307,14 @@ again:
 			 *  POLLIN|POLLRDNORM|POLLMSG (=POLL_MSG),
 			 *  POLLERR (=POLL_ERR),
 			 *  POLLPRI|POLLRDBAND (=POLL_PRI),
-			 *  POLLHUP|POLLERR (=POLL_HUP) 
+			 *  POLLHUP|POLLERR (=POLL_HUP)
 			 *  [linux 2.6.22 fs/fcntl.c:447]
 			 */
 #ifdef EXTRA_DEBUG
 			DBG("io_wait_loop_sigio_rt: siginfo: signal=%d (%d),"
 					" si_code=%d, si_band=0x%x,"
 					" si_fd=%d\n",
-					siginfo.si_signo, n, siginfo.si_code, 
+					siginfo.si_signo, n, siginfo.si_code,
 					(unsigned)sigio_band,
 					sigio_fd);
 #endif
@@ -1326,7 +1327,7 @@ again:
 				/* fix revents==POLLPRI case */
 				revents |= (!(revents & POLLPRI)-1) & POLLIN;
 				/* we can have queued signals generated by fds not watched
-			 	 * any more, or by fds in transition, to a child 
+			 	 * any more, or by fds in transition, to a child
 				 * => ignore them */
 				if (fm->type && ((fm->events|POLLERR|POLLHUP) & revents))
 					handle_io(fm, revents, -1);
@@ -1342,7 +1343,7 @@ again:
 			}
 		}
 	}else{
-		/* signal queue overflow 
+		/* signal queue overflow
 		 * TODO: increase signal queue size: 2.4x /proc/.., 2.6x -rlimits */
 		LOG(L_WARN, "WARNING: io_wait_loop_sigio_rt: signal queue overflowed"
 					"- falling back to poll\n");