Browse Source

io_wait: kqueue: handle ENOENT and more robust error handling

- handle also ENOENT (along EBADF) when kevent fails due to errors
  in the changelist. ENOENT can be returned in the following valid
  scenario: fd scheduled for delayed removal from the watched fd
  list, fd closed (which automatically removes the fd from the
  kqueue watched list), new opened fd which gets the same number,
  delayed changes applied (kevent()).
- treat all the other kevent errors or EV_ERRORs in a similar way
  but log them (at BUG() level).
- return POLLERR|POLLHUP for EV_EOF with a non-null fflags.

(only kqueue, meaning *bsd and darwin are affected by this fix)
Andrei Pelinescu-Onciul 15 years ago
parent
commit
8966d0a178
1 changed files with 56 additions and 36 deletions
  1. 56 36
      io_wait.h

+ 56 - 36
io_wait.h

@@ -259,34 +259,33 @@ static inline int kq_ev_change(io_wait_h* h, int fd, int filter, int flag,
 again:
 again:
 		n=kevent(h->kq_fd, h->kq_changes, h->kq_nchanges, 0, 0, &tspec);
 		n=kevent(h->kq_fd, h->kq_changes, h->kq_nchanges, 0, 0, &tspec);
 		if (unlikely(n == -1)){
 		if (unlikely(n == -1)){
-			if (likely(errno == EBADF)) {
+			if (unlikely(errno == EINTR)) goto again;
+			else {
+				/* for a detailed explanation of what follows see
+				   io_wait_loop_kqueue EV_ERROR case */
+				if (unlikely(!(errno == EBADF || errno == ENOENT)))
+					BUG("kq_ev_change: kevent flush changes failed"
+							" (unexpected error): %s [%d]\n",
+							strerror(errno), errno);
+					/* ignore error even if it's not a EBADF/ENOENT */
 				/* one of the file descriptors is bad, probably already
 				/* one of the file descriptors is bad, probably already
 				   closed => try to apply changes one-by-one */
 				   closed => try to apply changes one-by-one */
 				for (r = 0; r < h->kq_nchanges; r++) {
 				for (r = 0; r < h->kq_nchanges; r++) {
 retry2:
 retry2:
 					n = kevent(h->kq_fd, &h->kq_changes[r], 1, 0, 0, &tspec);
 					n = kevent(h->kq_fd, &h->kq_changes[r], 1, 0, 0, &tspec);
 					if (n==-1) {
 					if (n==-1) {
-						if (errno == EBADF)
-							continue; /* skip over it */
-						if (errno == EINTR)
+						if (unlikely(errno == EINTR))
 							goto retry2;
 							goto retry2;
-						LOG(L_ERR, "ERROR: io_watch_add: kevent flush changes"
-									" failed: %s [%d]\n",
-										strerror(errno), errno);
-						/* shift the array */
-						memmove(&h->kq_changes[0], &h->kq_changes[r+1],
-									sizeof(h->kq_changes[0])*
-										(h->kq_nchanges-r-1));
-						h->kq_nchanges-=(r+1);
-						return -1;
+					/* for a detailed explanation of what follows see
+						io_wait_loop_kqueue EV_ERROR case */
+						if (unlikely(!(errno == EBADF || errno == ENOENT)))
+							BUG("kq_ev_change: kevent flush changes failed:"
+									" (unexpected error) %s [%d] (%d/%d)\n",
+										strerror(errno), errno,
+										r, h->kq_nchanges);
+						continue; /* skip over it */
 					}
 					}
 				}
 				}
-			} else if (errno == EINTR) goto again;
-			else {
-				LOG(L_ERR, "ERROR: io_watch_add: kevent flush changes"
-						" failed: %s [%d]\n", strerror(errno), errno);
-				h->kq_nchanges=0; /* reset changes array */
-				return -1;
 			}
 			}
 		}
 		}
 		h->kq_nchanges=0; /* changes array is empty */
 		h->kq_nchanges=0; /* changes array is empty */
@@ -1118,17 +1117,18 @@ again:
 		n=kevent(h->kq_fd, h->kq_changes, apply_changes,  h->kq_array,
 		n=kevent(h->kq_fd, h->kq_changes, apply_changes,  h->kq_array,
 					h->fd_no, &tspec);
 					h->fd_no, &tspec);
 		if (unlikely(n==-1)){
 		if (unlikely(n==-1)){
-			if (errno==EINTR) goto again; /* signal, ignore it */
-			else if (errno==EBADF) {
+			if (unlikely(errno==EINTR)) goto again; /* signal, ignore it */
+			else {
+				/* for a detailed explanation of what follows see below
+				   the EV_ERROR case */
+				if (unlikely(!(errno==EBADF || errno==ENOENT)))
+					BUG("io_wait_loop_kqueue: kevent: unexpected error"
+						" %s [%d]\n", strerror(errno), errno);
 				/* some of the FDs in kq_changes are bad (already closed)
 				/* some of the FDs in kq_changes are bad (already closed)
 				   and there is not enough space in kq_array to return all
 				   and there is not enough space in kq_array to return all
 				   of them back */
 				   of them back */
 				apply_changes = h->fd_no;
 				apply_changes = h->fd_no;
 				goto again;
 				goto again;
-			}else{
-				LOG(L_ERR, "ERROR: io_wait_loop_kqueue: kevent:"
-						" %s [%d]\n", strerror(errno), errno);
-				goto error;
 			}
 			}
 		}
 		}
 		/* remove applied changes */
 		/* remove applied changes */
@@ -1148,14 +1148,13 @@ again:
 					r, n, h->kq_array[r].ident, (long)h->kq_array[r].udata,
 					r, n, h->kq_array[r].ident, (long)h->kq_array[r].udata,
 					h->kq_array[r].flags);
 					h->kq_array[r].flags);
 #endif
 #endif
-			if (unlikely((h->kq_array[r].flags & EV_ERROR) &&
-							(h->kq_array[r].data == EBADF ||
-							 h->kq_array[r].udata == 0))){
+			if (unlikely((h->kq_array[r].flags & EV_ERROR) ||
+							 h->kq_array[r].udata == 0)){
 				/* error in changes: we ignore it if it has to do with a
 				/* error in changes: we ignore it if it has to do with a
 				   bad fd or update==0. It can be caused by trying to remove an
 				   bad fd or update==0. It can be caused by trying to remove an
 				   already closed fd: race between adding something to the
 				   already closed fd: race between adding something to the
-				   changes array, close() and applying the changes.
-				   E.g. for ser tcp: tcp_main sends a fd to child fore reading
+				   changes array, close() and applying the changes (EBADF).
+				   E.g. for ser tcp: tcp_main sends a fd to child for reading
 				    => deletes it from the watched fds => the changes array
 				    => deletes it from the watched fds => the changes array
 					will contain an EV_DELETE for it. Before the changes
 					will contain an EV_DELETE for it. Before the changes
 					are applied (they are at the end of the main io_wait loop,
 					are applied (they are at the end of the main io_wait loop,
@@ -1163,6 +1162,16 @@ again:
 					to tcp_main by a sender (send fail) is processed and causes
 					to tcp_main by a sender (send fail) is processed and causes
 					the fd to be closed. When the changes are applied =>
 					the fd to be closed. When the changes are applied =>
 					error for the EV_DELETE attempt of a closed fd.
 					error for the EV_DELETE attempt of a closed fd.
+					Something similar can happen when a fd is scheduled
+					for removal, is close()'ed before being removed and
+					re-opened(a new sock. get the same fd). When the
+					watched fd changes will be applied the fd will be valid
+					(so no EBADF), but it's not already watch => ENOENT.
+					We report a BUG for the other errors (there's nothing
+					constructive we can do if we get an error we don't know 
+					how to handle), but apart from that we ignore it in the
+					idea that it is better apply the rest of the changes,
+					rather then dropping all of them.
 				*/
 				*/
 				/*
 				/*
 					example EV_ERROR for trying to delete a read watched fd,
 					example EV_ERROR for trying to delete a read watched fd,
@@ -1176,9 +1185,12 @@ again:
 						udata = 0x0
 						udata = 0x0
 					}
 					}
 				*/
 				*/
-				if (h->kq_array[r].data != EBADF)
-					LOG(L_INFO, "INFO: io_wait_loop_kqueue: kevent error on "
-							"fd %ld: %s [%ld]\n", (long)h->kq_array[r].ident,
+				if (h->kq_array[r].data != EBADF &&
+						h->kq_array[r].data != ENOENT)
+					BUG("io_wait_loop_kqueue: kevent unexpected error on "
+							"fd %ld udata %lx: %s [%ld]\n",
+							(long)h->kq_array[r].ident,
+							(long)h->kq_array[r].udata,
 							strerror(h->kq_array[r].data),
 							strerror(h->kq_array[r].data),
 							(long)h->kq_array[r].data);
 							(long)h->kq_array[r].data);
 			}else{
 			}else{
@@ -1186,20 +1198,28 @@ again:
 				if (likely(h->kq_array[r].filter==EVFILT_READ)){
 				if (likely(h->kq_array[r].filter==EVFILT_READ)){
 					revents=POLLIN |
 					revents=POLLIN |
 						(((int)!(h->kq_array[r].flags & EV_EOF)-1)&POLLHUP) |
 						(((int)!(h->kq_array[r].flags & EV_EOF)-1)&POLLHUP) |
-						(((int)!(h->kq_array[r].flags & EV_ERROR)-1)&POLLERR);
+						(((int)!((h->kq_array[r].flags & EV_EOF) &&
+								 	h->kq_array[r].fflags != 0) - 1)&POLLERR);
 					while(fm->type && (fm->events & revents) && 
 					while(fm->type && (fm->events & revents) && 
 							(handle_io(fm, revents, -1)>0) && repeat);
 							(handle_io(fm, revents, -1)>0) && repeat);
 				}else if (h->kq_array[r].filter==EVFILT_WRITE){
 				}else if (h->kq_array[r].filter==EVFILT_WRITE){
 					revents=POLLOUT |
 					revents=POLLOUT |
 						(((int)!(h->kq_array[r].flags & EV_EOF)-1)&POLLHUP) |
 						(((int)!(h->kq_array[r].flags & EV_EOF)-1)&POLLHUP) |
-						(((int)!(h->kq_array[r].flags & EV_ERROR)-1)&POLLERR);
+						(((int)!((h->kq_array[r].flags & EV_EOF) &&
+								 	h->kq_array[r].fflags != 0) - 1)&POLLERR);
 					while(fm->type && (fm->events & revents) && 
 					while(fm->type && (fm->events & revents) && 
 							(handle_io(fm, revents, -1)>0) && repeat);
 							(handle_io(fm, revents, -1)>0) && repeat);
+				}else{
+					BUG("io_wait_loop_kqueue: unknown filter: kqueue: event "
+							"%d/%d: fd=%d, filter=%d, flags=0x%x, fflags=0x%x,"
+							" data=%lx, udata=%lx\n",
+					r, n, h->kq_array[r].ident, h->kq_array[r].filter,
+					h->kq_array[r].flags, h->kq_array[r].fflags, 
+					(long)h->kq_array[r].data, (long)h->kq_array[r].udata);
 				}
 				}
 			}
 			}
 		}
 		}
 	} while(unlikely(orig_changes));
 	} while(unlikely(orig_changes));
-error:
 	return n;
 	return n;
 }
 }
 #endif
 #endif