瀏覽代碼

- fix: partial writes were not taken into account

Andrei Pelinescu-Onciul 17 年之前
父節點
當前提交
5b892e3eee
共有 1 個文件被更改,包括 67 次插入11 次删除
  1. 67 11
      tcp_main.c

+ 67 - 11
tcp_main.c

@@ -638,6 +638,59 @@ error:
 
 
 
+/* unsafe version, call while holding the connection write lock
+ * inserts data at the beginning, it ignores the max queue size checks and
+ * the timeout (use sparingly)
+ * Note: it should never be called on a write buffer after wbufq_run() */
+inline static int _wbufq_insert(struct  tcp_connection* c, char* data, 
+							unsigned int size)
+{
+	struct tcp_wbuffer_queue* q;
+	struct tcp_wbuffer* wb;
+	
+	q=&c->wbuf_q;
+	if (likely(q->first==0)) /* if empty, use wbufq_add */
+		return _wbufq_add(c, data, size);
+	
+	if (unlikely((*tcp_total_wq+size)>tcp_options.tcp_wq_max)){
+		LOG(L_ERR, "ERROR: wbufq_insert(%d bytes): write queue full or timeout"
+					" (%d, total %d, last write %d s ago)\n",
+					size, q->queued, *tcp_total_wq,
+					TICKS_TO_S(get_ticks_raw()-q->wr_timeout-
+										tcp_options.tcp_wq_timeout));
+		goto error;
+	}
+	if (unlikely(q->offset)){
+		LOG(L_CRIT, "BUG: wbufq_insert: non-null offset %d (bad call, should"
+				"never be called after the wbufq_run())\n", q->offset);
+		goto error;
+	}
+	if ((q->first==q->last) && ((q->last->b_size-q->last_used)>=size)){
+		/* one block with enough space in it for size bytes */
+		memmove(q->first->buf+size, q->first->buf, size);
+		memcpy(q->first->buf, data, size);
+		q->last_used+=size;
+	}else{
+		/* create a size bytes block directly */
+		wb=shm_malloc(sizeof(*wb)+size-1);
+		if (unlikely(wb==0))
+			goto error;
+		wb->b_size=size;
+		/* insert it */
+		wb->next=q->first;
+		q->first=wb;
+		memcpy(wb->buf, data, size);
+	}
+	
+	q->queued+=size;
+	atomic_add_int((int*)tcp_total_wq, size);
+	return 0;
+error:
+	return -1;
+}
+
+
+
 /* unsafe version, call while holding the connection write lock */
 inline static void _wbufq_destroy( struct  tcp_wbuffer_queue* q)
 {
@@ -1543,13 +1596,14 @@ no_id:
 				 * desired either lock before the write or use 
 				 * _wbufq_insert(...) */
 				n=_tcpconn_write_nb(fd, c, buf, len);
-				if (unlikely(n<0)){
-					if (errno==EAGAIN|| errno==EWOULDBLOCK){
-						DBG("tcp_send: pending write on new connection (%p)\n",
-								c);
+				if (unlikely(n<len)){
+					if ((n>=0) || errno==EAGAIN || errno==EWOULDBLOCK){
+						DBG("tcp_send: pending write on new connection %p "
+								" (%d/%d bytes written)\n", c, n, len);
+						if (n<0) n=0;
 						/* add to the write queue */
 						lock_get(&c->write_lock);
-							if (unlikely(_wbufq_add(c, buf, len)<0)){
+							if (unlikely(_wbufq_insert(c, buf+n, len-n)<0)){
 								lock_release(&c->write_lock);
 								n=-1;
 								LOG(L_ERR, "ERROR: tcp_send: EAGAIN and"
@@ -1735,12 +1789,13 @@ send_it:
 	
 	DBG("tcp_send: after real write: c= %p n=%d fd=%d\n",c, n, fd);
 	DBG("tcp_send: buf=\n%.*s\n", (int)len, buf);
-	if (unlikely(n<0)){
+	if (unlikely(n<len)){
 #ifdef TCP_BUF_WRITE
 		if (tcp_options.tcp_buf_write && 
-				(errno==EAGAIN || errno==EWOULDBLOCK)){
+				((n>=0) || errno==EAGAIN || errno==EWOULDBLOCK)){
 			enable_write_watch=_wbufq_empty(c);
-			if (unlikely(_wbufq_add(c, buf, len)<0)){
+			if (n<0) n=0;
+			if (unlikely(_wbufq_add(c, buf+n, len-n)<0)){
 				lock_release(&c->write_lock);
 				n=-1;
 				goto error;
@@ -2086,14 +2141,15 @@ inline static int tcpconn_put_destroy(struct tcp_connection* tcpconn)
 		tcpconn_destroy(tcpconn);
 		return 1;
 	}else{
+		tcpconn->state=S_CONN_BAD;
+		/* in case it's still in a reader timer */
+		tcpconn->timeout=get_ticks_raw();
+		/* fast close: close fds now */
 		if (likely(!(tcpconn->flags & F_CONN_FD_CLOSED))){
 			tcpconn_close_main_fd(tcpconn);
 			tcpconn->flags|=F_CONN_FD_CLOSED;
 			(*tcp_connections_no)--;
 		}
-		tcpconn->state=S_CONN_BAD;
-		/* in case it's still in a reader timer */
-		tcpconn->timeout=get_ticks_raw();
 	}
 	return 0;
 }