浏览代码

- avoid extra read syscalls (detect short reads) in poll_et or sigio_rt case
(should improve performance)
- if POLLRDHUP or EPOLLRDHUP are supported (linux >= 2.6.17) use them
and avoid an extra syscall on EOF
- on write error, check whether there's still some data in the socket read
buffer and process it first (if there's no more data, do a quick
connection destroy)

Andrei Pelinescu-Onciul 17 年之前
父节点
当前提交
7498b4dc09
共有 3 个文件被更改,包括 156 次插入53 次删除
  1. 4 0
      tcp_conn.h
  2. 74 13
      tcp_main.c
  3. 78 40
      tcp_read.c

+ 4 - 0
tcp_conn.h

@@ -75,6 +75,10 @@
 #define F_CONN_FD_CLOSED   32 /* fd was already closed */
 #define F_CONN_PENDING     64 /* pending connect  (fd not known yet in main) */
 #define F_CONN_MAIN_TIMER 128 /* timer active in the tcp_main process */
+#define F_CONN_EOF_SEEN   256 /* FIN or RST have been received */
+#define F_CONN_FORCE_EOF  512 /* act as if an EOF was received */
+#define F_CONN_OOB_DATA  1024 /* out of band data on the connection */
+#define F_CONN_WR_ERROR  2048 /* write error on the fd */
 
 
 enum tcp_req_errors {	TCP_REQ_INIT, TCP_REQ_OK, TCP_READ_ERROR,

+ 74 - 13
tcp_main.c

@@ -93,6 +93,10 @@
  *                linked into the hash tables (was 0) (andrei)
  *  2007-12-21  support for pending connects (connections are added to the
  *               hash immediately and writes on them are buffered) (andrei)
+ *  2008-02-05  handle POLLRDHUP (if supported), POLLERR and
+ *               POLLHUP (andrei)
+ *              on write error check if there's still data in the socket 
+ *               read buffer and process it first (andrei)
  */
 
 
@@ -103,10 +107,15 @@
 #error "shared memory support needed (add -DSHM_MEM to Makefile.defs)"
 #endif
 
+#define HANDLE_IO_INLINE
+#include "io_wait.h" /* include first to make sure the needed features are
+						turned on (e.g. _GNU_SOURCE for POLLRDHUP) */
+
 #include <sys/time.h>
 #include <sys/types.h>
 #include <sys/select.h>
 #include <sys/socket.h>
+#include <sys/ioctl.h>  /* ioctl() used on write error */
 #include <netinet/in.h>
 #include <netinet/in_systm.h>
 #include <netinet/ip.h>
@@ -157,8 +166,6 @@
 #define local_malloc pkg_malloc
 #define local_free   pkg_free
 
-#define HANDLE_IO_INLINE
-#include "io_wait.h"
 #include <fcntl.h> /* must be included after io_wait.h if SIGIO_RT is used */
 
 
@@ -2553,7 +2560,7 @@ inline static int handle_tcp_child(struct tcp_child* tcp_c, int fd_i)
 			local_timer_add(&tcp_main_ltimer, &tcpconn->timer, crt_timeout, t);
 			/* must be after the de-ref*/
 			tcpconn->flags|=F_CONN_MAIN_TIMER;
-			tcpconn->flags&=~(F_CONN_REMOVED|F_CONN_READER);
+			tcpconn->flags&=~(F_CONN_REMOVED|F_CONN_READER|F_CONN_OOB_DATA);
 #ifdef TCP_BUF_WRITE
 			if (unlikely(tcpconn->flags & F_CONN_WRITE_W))
 				n=io_watch_chg(&io_h, tcpconn->s, POLLIN| POLLOUT, -1);
@@ -3080,6 +3087,7 @@ inline static int handle_tcpconn_ev(struct tcp_connection* tcpconn, short ev,
 {
 #ifdef TCP_BUF_WRITE
 	int empty_q;
+	int bytes;
 #endif /* TCP_BUF_WRITE */
 	/*  is refcnt!=0 really necessary? 
 	 *  No, in fact it's a bug: I can have the following situation: a send only
@@ -3100,13 +3108,36 @@ inline static int handle_tcpconn_ev(struct tcp_connection* tcpconn, short ev,
 #endif
 	/* pass it to child, so remove it from the io watch list  and the local
 	 *  timer */
-	DBG("handle_tcpconn_ev: ev (%0x) on %p %d\n", ev, tcpconn, tcpconn->s);
 #ifdef TCP_BUF_WRITE
-	if (unlikely((ev & POLLOUT) && (tcpconn->flags & F_CONN_WRITE_W))){
-		if (unlikely(wbufq_run(tcpconn->s, tcpconn, &empty_q)<0)){
-			io_watch_del(&io_h, tcpconn->s, fd_i, 0);
-			tcpconn->flags|=F_CONN_REMOVED;
+	if (unlikely((ev & (POLLOUT|POLLERR|POLLHUP)) &&
+					(tcpconn->flags & F_CONN_WRITE_W))){
+		if (unlikely((ev & (POLLERR|POLLHUP)) || 
+					(wbufq_run(tcpconn->s, tcpconn, &empty_q)<0))){
+			if (unlikely(io_watch_del(&io_h, tcpconn->s, fd_i, 0)<0)){
+				LOG(L_ERR, "ERROR: handle_tcpconn_ev: io_watch_del(1) failed:"
+							" for %p, fd %d\n", tcpconn, tcpconn->s);
+			}
+			if (!(tcpconn->flags & F_CONN_REMOVED) && (ev & POLLIN)){
+				/* connection is watched for read and there is a read event
+				 * (unfortunately if we have POLLIN here we don't know if
+				 * there's really any data in the read buffer or the POLLIN
+				 * was generated by the error or EOF => to avoid losing
+				 *  data it's safer to either directly check the read buffer
+				 *  or try a read)*/
+				/* in most cases the read buffer will be empty, so in general
+				 * it is cheaper to check it here and then send the
+				 * conn. to a child only if needed (another syscall + at
+				 * least 2 syscalls in the reader + ...) */
+				if ((ioctl(tcpconn->s, FIONREAD, &bytes)>=0) && (bytes>0)){
+					tcpconn->flags&=~F_CONN_WRITE_W;
+					tcpconn->flags|=F_CONN_REMOVED;
+					tcpconn->flags|=F_CONN_FORCE_EOF|F_CONN_WR_ERROR;
+					goto send_to_child;
+				}
+				/* if bytes==0 or ioctl failed, destroy the connection now */
+			}
 			tcpconn->flags&=~F_CONN_WRITE_W;
+			tcpconn->flags|=F_CONN_REMOVED;
 			if (unlikely(!tcpconn_try_unhash(tcpconn))){
 				LOG(L_CRIT, "BUG: tcpconn_ev: unhashed connection %p\n",
 							tcpconn);
@@ -3116,12 +3147,20 @@ inline static int handle_tcpconn_ev(struct tcp_connection* tcpconn, short ev,
 		}
 		if (empty_q){
 			if (tcpconn->flags & F_CONN_REMOVED){
-				if (unlikely(io_watch_del(&io_h, tcpconn->s, fd_i, 0)==-1))
+				if (unlikely(io_watch_del(&io_h, tcpconn->s, fd_i, 0)==-1)){
+					LOG(L_ERR, "ERROR: handle_tcpconn_ev: io_watch_del(2)"
+								" failed:" " for %p, fd %d\n",
+								tcpconn, tcpconn->s);
 					goto error;
+				}
 			}else{
 				if (unlikely(io_watch_chg(&io_h, tcpconn->s,
-											POLLIN, fd_i)==-1))
+											POLLIN, fd_i)==-1)){
+					LOG(L_ERR, "ERROR: handle_tcpconn_ev: io_watch_chg(1)"
+								" failed:" " for %p, fd %d\n",
+								tcpconn, tcpconn->s);
 					goto error;
+				}
 			}
 			tcpconn->flags&=~F_CONN_WRITE_W;
 		}
@@ -3133,15 +3172,33 @@ inline static int handle_tcpconn_ev(struct tcp_connection* tcpconn, short ev,
 		 * child and stop watching it for input (but continue watching for
 		 *  writes if needed): */
 		if (unlikely(tcpconn->flags & F_CONN_WRITE_W)){
-			if (unlikely(io_watch_chg(&io_h, tcpconn->s, POLLOUT, fd_i)==-1))
+			if (unlikely(io_watch_chg(&io_h, tcpconn->s, POLLOUT, fd_i)==-1)){
+				LOG(L_ERR, "ERROR: handle_tcpconn_ev: io_watch_chg(2)"
+							" failed:" " for %p, fd %d\n",
+							tcpconn, tcpconn->s);
 				goto error;
+			}
 		}else
 #else
 	{
 #endif /* TCP_BUF_WRITE */
-			if (unlikely(io_watch_del(&io_h, tcpconn->s, fd_i, 0)==-1))
+			if (unlikely(io_watch_del(&io_h, tcpconn->s, fd_i, 0)==-1)){
+				LOG(L_ERR, "ERROR: handle_tcpconn_ev: io_watch_del(3)"
+							" failed:" " for %p, fd %d\n",
+							tcpconn, tcpconn->s);
 				goto error;
+			}
+#ifdef TCP_BUF_WRITE
+send_to_child:
+#endif
 		DBG("tcp: DBG: sendig to child, events %x\n", ev);
+#ifdef POLLRDHUP
+		tcpconn->flags|=((int)!(ev & (POLLRDHUP|POLLHUP|POLLERR)) -1) & 
+							F_CONN_EOF_SEEN;
+#else /* POLLRDHUP */
+		tcpconn->flags|=((int)!(ev & (POLLHUP|POLLERR)) -1) & F_CONN_EOF_SEEN;
+#endif /* POLLRDHUP */
+		tcpconn->flags|= ((int)!(ev & POLLPRI) -1)  & F_CONN_OOB_DATA;
 		tcpconn->flags|=F_CONN_REMOVED|F_CONN_READER;
 		local_timer_del(&tcp_main_ltimer, &tcpconn->timer);
 		tcpconn->flags&=~F_CONN_MAIN_TIMER;
@@ -3151,7 +3208,11 @@ inline static int handle_tcpconn_ev(struct tcp_connection* tcpconn, short ev,
 			tcpconn->flags&=~F_CONN_READER;
 #ifdef TCP_BUF_WRITE
 			if (tcpconn->flags & F_CONN_WRITE_W){
-				io_watch_del(&io_h, tcpconn->s, fd_i, 0);
+				if (unlikely(io_watch_del(&io_h, tcpconn->s, fd_i, 0)<0)){
+					LOG(L_ERR, "ERROR: handle_tcpconn_ev: io_watch_del(4)"
+							" failed:" " for %p, fd %d\n",
+							tcpconn, tcpconn->s);
+				}
 				tcpconn->flags&=~F_CONN_WRITE_W;
 			}
 #endif /* TCP_BUF_WRITE */

+ 78 - 40
tcp_read.c

@@ -38,6 +38,8 @@
  * 2006-10-13  added STUN support - state machine for TCP (vlada)
  * 2007-02-20  fixed timeout calc. bug (andrei)
  * 2007-11-26  improved tcp timers: switched to local_timer (andrei)
+ * 2008-02-04  optimizations: handle POLLRDHUP (if supported), detect short
+ *              reads (sock. buffer empty) (andrei)
  */
 
 #ifdef USE_TCP
@@ -86,6 +88,10 @@ int is_msg_complete(struct tcp_req* r);
 
 #define TCPCONN_TIMEOUT_MIN_RUN  1 /* run the timers each new tick */
 
+#define RD_CONN_SHORT_READ	1
+#define RD_CONN_EOF		2
+#define RD_CONN_FORCE_EOF	65536
+
 /* types used in io_wait* */
 enum fd_types { F_NONE, F_TCPMAIN, F_TCPCONN };
 
@@ -99,11 +105,19 @@ static ticks_t tcp_reader_prev_ticks;
 
 
 /* reads next available bytes
+ *   c- tcp connection used for reading, tcp_read changes also c->state on
+ *      EOF and c->req.error on read error
+ *   * flags - value/result - used to signal a seen or "forced" EOF on the 
+ *     connection (when it is known that no more data will come after the 
+ *     current socket buffer is emptied )=> return/signal EOF on the first 
+ *     short read (=> don't use it on POLLPRI, as OOB data will cause short
+ *      reads even if there are still remaining bytes in the socket buffer)
  * return number of bytes read, 0 on EOF or -1 on error,
- * on EOF it also sets c->state to S_CONN_EOF
+ * on EOF it also sets c->state to S_CONN_EOF.
  * (to distinguish from reads that would block which could return 0)
+ * RD_CONN_SHORT_READ is also set in *flags for short reads.
  * sets also r->error */
-int tcp_read(struct tcp_connection *c)
+int tcp_read(struct tcp_connection *c, int* flags)
 {
 	int bytes_free, bytes_read;
 	struct tcp_req *r;
@@ -121,19 +135,26 @@ int tcp_read(struct tcp_connection *c)
 again:
 	bytes_read=read(fd, r->pos, bytes_free);
 
-	if(bytes_read==-1){
-		if (errno == EWOULDBLOCK || errno == EAGAIN){
-			return 0; /* nothing has been read */
-		}else if (errno == EINTR) goto again;
-		else{
-			LOG(L_ERR, "ERROR: tcp_read: error reading: %s\n",strerror(errno));
-			r->error=TCP_READ_ERROR;
-			return -1;
+	if (likely(bytes_read!=bytes_free)){
+		if(unlikely(bytes_read==-1)){
+			if (errno == EWOULDBLOCK || errno == EAGAIN){
+				bytes_read=0; /* nothing has been read */
+			}else if (errno == EINTR) goto again;
+			else{
+				LOG(L_ERR, "ERROR: tcp_read: error reading: %s (%d)\n",
+							strerror(errno), errno);
+				r->error=TCP_READ_ERROR;
+				return -1;
+			}
+		}else if (unlikely((bytes_read==0) || 
+					(*flags & RD_CONN_FORCE_EOF))){
+			c->state=S_CONN_EOF;
+			*flags|=RD_CONN_EOF;
+			DBG("tcp_read: EOF on %p, FD %d\n", c, fd);
 		}
-	}else if (bytes_read==0){
-		c->state=S_CONN_EOF;
-		DBG("tcp_read: EOF on %p, FD %d\n", c, fd);
-	}
+		/* short read */
+		*flags|=RD_CONN_SHORT_READ;
+	} /* else normal full read */
 #ifdef EXTRA_DEBUG
 	DBG("tcp_read: read %d bytes:\n%.*s\n", bytes_read, bytes_read, r->pos);
 #endif
@@ -152,7 +173,7 @@ again:
  * when either r->body!=0 or r->state==H_BODY =>
  * all headers have been read. It should be called in a while loop.
  * returns < 0 if error or 0 if EOF */
-int tcp_read_headers(struct tcp_connection *c)
+int tcp_read_headers(struct tcp_connection *c, int* read_flags)
 {
 	int bytes, remaining;
 	char *p;
@@ -206,15 +227,15 @@ int tcp_read_headers(struct tcp_connection *c)
 
 	r=&c->req;
 	/* if we still have some unparsed part, parse it first, don't do the read*/
-	if (r->parsed<r->pos){
+	if (unlikely(r->parsed<r->pos)){
 		bytes=0;
 	}else{
 #ifdef USE_TLS
-		if (c->type==PROTO_TLS)
-			bytes=tls_read(c);
+		if (unlikely(c->type==PROTO_TLS))
+			bytes=tls_read(c); /* FIXME: read_flags support */
 		else
 #endif
-			bytes=tcp_read(c);
+			bytes=tcp_read(c, read_flags);
 		if (bytes<=0) return bytes;
 	}
 	p=r->parsed;
@@ -511,7 +532,7 @@ skip:
 
 
 
-int tcp_read_req(struct tcp_connection* con, int* bytes_read)
+int tcp_read_req(struct tcp_connection* con, int* bytes_read, int* read_flags)
 {
 	int bytes;
 	int total_bytes;
@@ -538,8 +559,8 @@ int tcp_read_req(struct tcp_connection* con, int* bytes_read)
 #endif
 
 again:
-		if(req->error==TCP_REQ_OK){
-			bytes=tcp_read_headers(con);
+		if (likely(req->error==TCP_REQ_OK)){
+			bytes=tcp_read_headers(con, read_flags);
 #ifdef EXTRA_DEBUG
 						/* if timeout state=0; goto end__req; */
 			DBG("read= %d bytes, parsed=%d, state=%d, error=%d\n",
@@ -549,7 +570,7 @@ again:
 					*(req->parsed-1), (int)(req->parsed-req->start),
 					req->start);
 #endif
-			if (bytes==-1){
+			if (unlikely(bytes==-1)){
 				LOG(L_ERR, "ERROR: tcp_read_req: error reading \n");
 				resp=CONN_ERROR;
 				goto end_req;
@@ -560,14 +581,14 @@ again:
 			 * if req. is complete we might have a second unparsed
 			 * request after it, so postpone release_with_eof
 			 */
-			if ((con->state==S_CONN_EOF) && (req->complete==0)) {
+			if (unlikely((con->state==S_CONN_EOF) && (req->complete==0))) {
 				DBG( "tcp_read_req: EOF\n");
 				resp=CONN_EOF;
 				goto end_req;
 			}
 		
 		}
-		if (req->error!=TCP_REQ_OK){
+		if (unlikely(req->error!=TCP_REQ_OK)){
 			LOG(L_ERR,"ERROR: tcp_read_req: bad request, state=%d, error=%d "
 					  "buf:\n%.*s\nparsed:\n%.*s\n", req->state, req->error,
 					  (int)(req->pos-req->buf), req->buf,
@@ -577,7 +598,7 @@ again:
 			resp=CONN_ERROR;
 			goto end_req;
 		}
-		if (req->complete){
+		if (likely(req->complete)){
 #ifdef EXTRA_DEBUG
 			DBG("tcp_read_req: end of header part\n");
 			DBG("- received from: port %d\n", con->rcv.src_port);
@@ -585,7 +606,7 @@ again:
 			DBG("tcp_read_req: headers:\n%.*s.\n",
 					(int)(req->body-req->start), req->start);
 #endif
-			if (req->has_content_len){
+			if (likely(req->has_content_len)){
 				DBG("tcp_read_req: content-length= %d\n", req->content_len);
 #ifdef EXTRA_DEBUG
 				DBG("tcp_read_req: body:\n%.*s\n", req->content_len,req->body);
@@ -637,26 +658,28 @@ again:
 			
 			/* prepare for next request */
 			size=req->pos-req->parsed;
-			if (size) memmove(req->buf, req->parsed, size);
-#ifdef EXTRA_DEBUG
-			DBG("tcp_read_req: preparing for new request, kept %ld bytes\n",
-					size);
-#endif
-			req->pos=req->buf+size;
-			req->parsed=req->buf;
 			req->start=req->buf;
 			req->body=0;
 			req->error=TCP_REQ_OK;
 			req->state=H_SKIP_EMPTY;
 			req->complete=req->content_len=req->has_content_len=0;
 			req->bytes_to_go=0;
-			/* if we still have some unparsed bytes, try to  parse them too*/
-			if (size) goto again;
-			else if (con->state==S_CONN_EOF){
+			req->pos=req->buf+size;
+			
+			if (unlikely(size)){ 
+				memmove(req->buf, req->parsed, size);
+				req->parsed=req->buf; /* fix req->parsed after using it */
+#ifdef EXTRA_DEBUG
+				DBG("tcp_read_req: preparing for new request, kept %ld"
+						" bytes\n", size);
+#endif
+				/*if we still have some unparsed bytes, try to parse them too*/
+				goto again;
+			} else if (unlikely(con->state==S_CONN_EOF)){
 				DBG( "tcp_read_req: EOF after reading complete request\n");
 				resp=CONN_EOF;
 			}
-			
+			req->parsed=req->buf; /* fix req->parsed */
 		}
 		
 		
@@ -732,6 +755,7 @@ inline static int handle_io(struct fd_map* fm, short events, int idx)
 {	
 	int ret;
 	int n;
+	int read_flags;
 	struct tcp_connection* con;
 	int s;
 	long resp;
@@ -787,7 +811,10 @@ again:
 			}
 			/* if we received the fd there is most likely data waiting to
 			 * be read => process it first to avoid extra sys calls */
-			resp=tcp_read_req(con, &n);
+			read_flags=((con->flags & (F_CONN_EOF_SEEN|F_CONN_FORCE_EOF)) && 
+						!(con->flags & F_CONN_OOB_DATA))? RD_CONN_FORCE_EOF
+						:0;
+			resp=tcp_read_req(con, &n, &read_flags);
 			if (unlikely(resp<0)){
 				/* some error occured, but on the new fd, not on the tcp
 				 * main fd, so keep the ret value */
@@ -829,7 +856,14 @@ again:
 							con, con->id, atomic_get(&con->refcnt));
 				goto read_error;
 			}
-			resp=tcp_read_req(con, &ret);
+#ifdef POLLRDHUP
+			read_flags=(((events & POLLRDHUP) | 
+							(con->flags & (F_CONN_EOF_SEEN|F_CONN_FORCE_EOF)))
+						&& !(events & POLLPRI))? RD_CONN_FORCE_EOF: 0;
+#else /* POLLRDHUP */
+			read_flags=0;
+#endif /* POLLRDHUP */
+			resp=tcp_read_req(con, &ret, &read_flags);
 			if (unlikely(resp<0)){
 read_error:
 				ret=-1; /* some error occured */
@@ -849,6 +883,10 @@ read_error:
 			}else{
 				/* update timeout */
 				con->timeout=get_ticks_raw()+S_TO_TICKS(TCP_CHILD_TIMEOUT);
+				/* ret= 0 (read the whole socket buffer) if short read & 
+				 *  !POLLPRI,  bytes read otherwise */
+				ret&=(((read_flags & RD_CONN_SHORT_READ) && 
+						!(events & POLLPRI)) - 1);
 			}
 			break;
 		case F_NONE: