Browse Source

tcp: tcp_send() split in 3 smaller functions

- tcp_send() split into 3 smaller internal functions (needed for
  future tls use) + minor cleanups.
- CONN_QUEUED_WRITE now auto-decrements the connection refcnt.
Andrei Pelinescu-Onciul 15 years ago
parent
commit
0c7e84ff04
1 changed files with 244 additions and 114 deletions
  1. 244 114
      tcp_main.c

+ 244 - 114
tcp_main.c

@@ -1717,7 +1717,12 @@ inline static void tcp_fd_cache_add(struct tcp_connection *c, int fd)
 
 inline static int tcpconn_chld_put(struct tcp_connection* tcpconn);
 
+static int tcpconn_do_send(int fd, struct tcp_connection* c,
+							char* buf, unsigned len,
+							snd_flags_t send_flags, long* resp);
 
+static int tcpconn_send_put(struct tcp_connection* c, char* buf, unsigned len,
+							snd_flags_t send_flags);
 
 /* finds a tcpconn & sends on it
  * uses the dst members to, proto (TCP|TLS) and id and tries to send
@@ -1728,7 +1733,6 @@ int tcp_send(struct dest_info* dst, union sockaddr_union* from,
 					char* buf, unsigned len)
 {
 	struct tcp_connection *c;
-	struct tcp_connection *tmp;
 	struct ip_addr ip;
 	int port;
 	int fd;
@@ -1736,16 +1740,7 @@ int tcp_send(struct dest_info* dst, union sockaddr_union* from,
 	int n;
 	int do_close_fd;
 	ticks_t con_lifetime;
-#ifdef TCP_ASYNC
-	int enable_write_watch;
-#endif /* TCP_ASYNC */
-#ifdef TCP_FD_CACHE
-	struct fd_cache_entry* fd_cache_e;
-	int use_fd_cache;
 	
-	use_fd_cache=cfg_get(tcp, tcp_cfg, fd_cache);
-	fd_cache_e=0;
-#endif /* TCP_FD_CACHE */
 	do_close_fd=1; /* close the fd on exit */
 	port=su_getport(&dst->to);
 	con_lifetime=cfg_get(tcp, tcp_cfg, con_lifetime);
@@ -1890,7 +1885,7 @@ int tcp_send(struct dest_info* dst, union sockaddr_union* from,
 							goto conn_wait_error;
 						}
 						n=len;
-						goto end;
+						goto conn_wait_success;
 					}
 					/* if first write failed it's most likely a
 					   connect error */
@@ -1945,7 +1940,7 @@ int tcp_send(struct dest_info* dst, union sockaddr_union* from,
 								c, strerror(errno), errno);
 					goto conn_wait_error;
 				}
-				goto end;
+				goto conn_wait_success;
 			}
 #endif /* TCP_CONNECT_WAIT  && TCP_ASYNC */
 			if (unlikely((c=tcpconn_connect(&dst->to, from, dst->proto,
@@ -1977,11 +1972,129 @@ int tcp_send(struct dest_info* dst, union sockaddr_union* from,
 				n=-1;
 				goto end_no_conn;
 			}
-			goto send_it;
+			/* new connection => send on it directly */
+			n = tcpconn_do_send(fd, c, buf, len, dst->send_flags,
+									&response[1]);
+			if (unlikely(response[1] != CONN_NOP)) {
+				response[0]=(long)c;
+				if (send_all(unix_tcp_sock, response, sizeof(response)) <= 0) {
+					BUG("tcp_main command %ld sending failed (write):"
+							"%s (%d)\n", response[1], strerror(errno), errno);
+					/* all commands != CONN_NOP returned by tcpconn_do_send()
+					   (CONN_EOF, CONN_ERROR, CONN_QUEUED_WRITE) will auto-dec
+					   refcnt => if sending the command fails we have to
+					   dec. refcnt by hand */
+					tcpconn_chld_put(c); /* deref. it manually */
+					n=-1;
+				}
+				/* here refcnt for c is already decremented => c contents can
+				   no longer be used and refcnt _must_ _not_ be decremented
+				   again on exit */
+				if (unlikely(n < 0 || response[1] == CONN_EOF)) {
+					/* on error or eof, close fd */
+					close(fd);
+				} else if (response[1] == CONN_QUEUED_WRITE) {
+#ifdef TCP_FD_CACHE
+					if (cfg_get(tcp, tcp_cfg, fd_cache)) {
+						tcp_fd_cache_add(c, fd);
+					} else
+#endif /* TCP_FD_CACHE */
+						close(fd);
+				} else {
+					BUG("unexpected tcpconn_do_send() return & response:"
+							" %d, %ld\n", n, response[1]);
+				}
+				goto end_no_deref;
+			}
+#ifdef TCP_FD_CACHE
+			if (cfg_get(tcp, tcp_cfg, fd_cache)) {
+				tcp_fd_cache_add(c, fd);
+			}else
+#endif /* TCP_FD_CACHE */
+				close(fd);
+		/* here we can have only commands that _do_ _not_ dec refcnt.
+		   (CONN_EOF, CON_ERROR, CON_QUEUED_WRITE are all treated above) */
+			goto release_c;
 		}
 /* get_fd: */
+	/* existing connection, send on it */
+	n = tcpconn_send_put(c, buf, len, dst->send_flags);
+	/* no deref needed (automatically done inside tcpconn_send_put() */
+	return n;
+#ifdef TCP_CONNECT_WAIT
+conn_wait_success:
+#ifdef TCP_FD_CACHE
+	if (cfg_get(tcp, tcp_cfg, fd_cache)) {
+		tcp_fd_cache_add(c, fd);
+	} else
+#endif /* TCP_FD_CACHE */
+		close(fd);
+	tcpconn_chld_put(c); /* release c (dec refcnt & free on 0) */
+	return n;
+conn_wait_error:
+	n=-1;
+conn_wait_close:
+	/* connect or send failed or immediate close-after-send was requested on
+	 * newly created connection which was not yet sent to tcp_main (but was
+	 * already hashed) => don't send to main, unhash and destroy directly
+	 * (if refcnt>2 it will be destroyed when the last sender releases the
+	 * connection (tcpconn_chld_put(c))) or when tcp_main receives a
+	 * CONN_ERROR it*/
+	c->state=S_CONN_BAD;
+	/* we are here only if we opened a new fd (and not reused a cached or
+	   a reader one) => if the connect was successful close the fd */
+	if (fd>=0) close(fd);
+	TCPCONN_LOCK;
+		if (c->flags & F_CONN_HASHED){
+			/* if some other parallel tcp_send did send CONN_ERROR to
+			 * tcp_main, the connection might be already detached */
+			_tcpconn_detach(c);
+			c->flags&=~F_CONN_HASHED;
+			TCPCONN_UNLOCK;
+			tcpconn_put(c);
+		}else
+			TCPCONN_UNLOCK;
+	/* dec refcnt -> mark it for destruction */
+	tcpconn_chld_put(c);
+	return n;
+#endif /* TCP_CONNET_WAIT */
+release_c:
+	tcpconn_chld_put(c); /* release c (dec refcnt & free on 0) */
+end_no_deref:
+end_no_conn:
+	return n;
+}
+
+
+
+/** sends on an existing tcpconn.
+ * As opposed to tcp_send(), this function requires an existing
+ * tcp connection.
+ * WARNING: the tcp_connection will be de-referenced.
+ * @param c - existing tcp connection pointer.
+ * @param buf - data to be sent.
+ * @param len - data length,
+ * @return >=0 on success, -1 on error.
+ */
+static int tcpconn_send_put(struct tcp_connection* c, char* buf, unsigned len,
+							snd_flags_t send_flags)
+{
+	struct tcp_connection *tmp;
+	int fd;
+	long response[2];
+	int n;
+	int do_close_fd;
+#ifdef TCP_FD_CACHE
+	struct fd_cache_entry* fd_cache_e;
+	int use_fd_cache;
+	
+	use_fd_cache=cfg_get(tcp, tcp_cfg, fd_cache);
+	fd_cache_e=0;
+#endif /* TCP_FD_CACHE */
+	do_close_fd=1; /* close the fd on exit */
+	response[1] = CONN_NOP;
 #ifdef TCP_ASYNC
-		/* if data is already queued, we don't need the fd any more */
+	/* if data is already queued, we don't need the fd */
 #ifdef TCP_CONNECT_WAIT
 		if (unlikely(cfg_get(tcp, tcp_cfg, async) &&
 						(_wbufq_non_empty(c) || (c->flags&F_CONN_PENDING)) ))
@@ -2000,6 +2113,9 @@ int tcp_send(struct dest_info* dst, union sockaddr_union* from,
 					if (unlikely(_wbufq_add(c, buf, len)<0)){
 						lock_release(&c->write_lock);
 						n=-1;
+						response[1] = CONN_ERROR;
+						c->state=S_CONN_BAD;
+						c->timeout=get_ticks_raw(); /* force timeout */
 						goto error;
 					}
 					n=len;
@@ -2057,17 +2173,105 @@ int tcp_send(struct dest_info* dst, union sockaddr_union* from,
 						  tmp, n
 				   );
 				n=-1; /* fail */
+				/* don't cache fd & close it */
+				do_close_fd = 1;
+#ifdef TCP_FD_CACHE
+				use_fd_cache = 0;
+#endif /* TCP_FD_CACHE */
 				goto end;
 			}
 			DBG("tcp_send: after receive_fd: c= %p n=%d fd=%d\n",c, n, fd);
 		}
 	
-	
-send_it:
+	n = tcpconn_do_send(fd, c, buf, len, send_flags, &response[1]);
+	if (unlikely(response[1] != CONN_NOP)) {
+error:
+		response[0]=(long)c;
+		if (send_all(unix_tcp_sock, response, sizeof(response)) <= 0) {
+			BUG("tcp_main command %ld sending failed (write):%s (%d)\n",
+					response[1], strerror(errno), errno);
+			/* all commands != CONN_NOP returned by tcpconn_do_send()
+			   (CONN_EOF, CONN_ERROR, CONN_QUEUED_WRITE) will auto-dec refcnt
+			   => if sending the command fails we have to dec. refcnt by hand
+			 */
+			tcpconn_chld_put(c); /* deref. it manually */
+			n=-1;
+		}
+		/* here refcnt for c is already decremented => c contents can no
+		   longer be used and refcnt _must_ _not_ be decremented again
+		   on exit */
+		if (unlikely(n < 0 || response[1] == CONN_EOF)) {
+			/* on error or eof, remove from cache or close fd */
+#ifdef TCP_FD_CACHE
+			if (unlikely(fd_cache_e)){
+				tcp_fd_cache_rm(fd_cache_e);
+				fd_cache_e = 0;
+				close(fd);
+			}else
+#endif /* TCP_FD_CACHE */
+				if (do_close_fd) close(fd);
+		} else if (response[1] == CONN_QUEUED_WRITE) {
+#ifdef TCP_FD_CACHE
+			if (unlikely((fd_cache_e==0) && use_fd_cache)){
+				tcp_fd_cache_add(c, fd);
+			}else
+#endif /* TCP_FD_CACHE */
+				if (do_close_fd) close(fd);
+		} else {
+			BUG("unexpected tcpconn_do_send() return & response: %d, %ld\n",
+					n, response[1]);
+		}
+		return n; /* no tcpconn_put */
+	}
+end:
+#ifdef TCP_FD_CACHE
+	if (unlikely((fd_cache_e==0) && use_fd_cache)){
+		tcp_fd_cache_add(c, fd);
+	}else
+#endif /* TCP_FD_CACHE */
+	if (do_close_fd) close(fd);
+	/* here we can have only commands that _do_ _not_ dec refcnt.
+	   (CONN_EOF, CON_ERROR, CON_QUEUED_WRITE are all treated above) */
+release_c:
+	tcpconn_chld_put(c); /* release c (dec refcnt & free on 0) */
+	return n;
+}
+
+
+
+/** lower level send (connection and fd should be known).
+ * It takes care of possible write-queueing, blacklisting a.s.o.
+ * It expects a valid tcp connection. It doesn't touch the ref. cnts.
+ * @param c - existing tcp connection pointer (state and flags might be
+ *            changed).
+ * @param buf - data to be sent.
+ * @param len - data length.
+ * @param send_flags
+ * @param resp - filled with a cmd. for tcp_main:
+ *                      CONN_NOP - nothing needs to be done (do not send
+ *                                 anything to tcp_main).
+ *                      CONN_ERROR - error, connection should be closed.
+ *                      CONN_EOF - no error, but connection should be closed.
+ *                      CONN_QUEUED_WRITE - new write queue (connection
+ *                                 should be watched for write and the wr.
+ *                                 queue flushed).
+ * @return >=0 on success, < 0 on error && *resp == CON_ERROR.
+ *
+ */
+static int tcpconn_do_send(int fd, struct tcp_connection* c,
+							char* buf, unsigned len,
+							snd_flags_t send_flags, long* resp)
+{
+	int  n;
+#ifdef TCP_ASYNC
+	int enable_write_watch;
+#endif /* TCP_ASYNC */
+
 	DBG("tcp_send: sending...\n");
+	*resp = CONN_NOP;
 	lock_get(&c->write_lock);
 	/* update connection send flags with the current ones */
-	tcpconn_set_send_flags(c, dst->send_flags);
+	tcpconn_set_send_flags(c, send_flags);
 #ifdef TCP_ASYNC
 	if (likely(cfg_get(tcp, tcp_cfg, async))){
 		if (_wbufq_non_empty(c)
@@ -2088,14 +2292,14 @@ send_it:
 	}else{
 #endif /* TCP_ASYNC */
 #ifdef USE_TLS
-	if (c->type==PROTO_TLS)
-		n=tls_blocking_write(c, fd, buf, len);
-	else
+		if (c->type==PROTO_TLS)
+			n=tls_blocking_write(c, fd, buf, len);
+		else
 #endif
 		/* n=tcp_blocking_write(c, fd, buf, len); */
-		n=tsend_stream(fd, buf, len,
-						TICKS_TO_S(cfg_get(tcp, tcp_cfg, send_timeout)) *
-						1000);
+			n=tsend_stream(fd, buf, len,
+							TICKS_TO_S(cfg_get(tcp, tcp_cfg, send_timeout)) *
+							1000);
 #ifdef TCP_ASYNC
 	}
 #else /* ! TCP_ASYNC */
@@ -2122,16 +2326,8 @@ send_it:
 			}
 			lock_release(&c->write_lock);
 			n=len;
-			if (likely(enable_write_watch)){
-				response[0]=(long)c;
-				response[1]=CONN_QUEUED_WRITE;
-				if (send_all(unix_tcp_sock, response, sizeof(response)) <= 0){
-					LOG(L_ERR, "BUG: tcp_send: error return failed "
-								"(write):%s (%d)\n", strerror(errno), errno);
-					n=-1;
-					goto error;
-				}
-			}
+			if (likely(enable_write_watch))
+				*resp=CONN_QUEUED_WRITE;
 			goto end;
 		}else{
 			lock_release(&c->write_lock);
@@ -2181,6 +2377,7 @@ send_it:
 					"\n", c, ip_addr2a(&c->rcv.dst_ip), c->rcv.dst_port,
 					su2a(&c->rcv.src_su, sizeof(c->rcv.src_su)),
 					strerror(errno), errno);
+		n = -1;
 #ifdef TCP_ASYNC
 error:
 #endif /* TCP_ASYNC */
@@ -2188,27 +2385,7 @@ error:
 		c->state=S_CONN_BAD;
 		c->timeout=get_ticks_raw();
 		/* tell "main" it should drop this (optional it will t/o anyway?)*/
-		response[0]=(long)c;
-		response[1]=CONN_ERROR;
-		if (send_all(unix_tcp_sock, response, sizeof(response))<=0){
-			LOG(L_CRIT, "BUG: tcp_send: error return failed (write):%s (%d)\n",
-					strerror(errno), errno);
-			tcpconn_chld_put(c); /* deref. it manually */
-			n=-1;
-		}
-		/* CONN_ERROR will auto-dec refcnt => we must not call tcpconn_put 
-		 * if it succeeds */
-#ifdef TCP_FD_CACHE
-		if (unlikely(fd_cache_e)){
-			LOG(L_ERR, "ERROR: tcp_send %s: error on cached fd, removing from"
-					" the cache (%d, %p, %d)\n", 
-					su2a(&c->rcv.src_su, sizeof(c->rcv.src_su)),
-					fd, fd_cache_e->con, fd_cache_e->id);
-			tcp_fd_cache_rm(fd_cache_e);
-			close(fd);
-		}else
-#endif /* TCP_FD_CACHE */
-		if (do_close_fd) close(fd);
+		*resp=CONN_ERROR;
 		return n; /* error return, no tcpconn_put */
 	}
 	
@@ -2220,70 +2397,16 @@ error:
 			TCP_STATS_ESTABLISHED(c->state);
 			c->state=S_CONN_OK;
 	}
-	if (unlikely(dst->send_flags.f & SND_F_CON_CLOSE)){
+	if (unlikely(send_flags.f & SND_F_CON_CLOSE)){
 		/* close after write => send EOF request to tcp_main */
 		c->state=S_CONN_BAD;
 		c->timeout=get_ticks_raw();
 		/* tell "main" it should drop this*/
-		response[0]=(long)c;
-		response[1]=CONN_EOF;
-		if (send_all(unix_tcp_sock, response, sizeof(response))<=0){
-			LOG(L_CRIT, "BUG: tcp_send: error return failed (write):%s (%d)\n",
-					strerror(errno), errno);
-			tcpconn_chld_put(c); /* deref. it manually */
-			n=-1;
-		}
-		/* CONN_EOF will auto-dec refcnt => we must not call tcpconn_put 
-		 * if it succeeds */
-#ifdef TCP_FD_CACHE
-		if (unlikely(fd_cache_e)){
-			tcp_fd_cache_rm(fd_cache_e);
-			fd_cache_e=0;
-			close(fd);
-		}else
-#endif /* TCP_FD_CACHE */
-		if (do_close_fd) close(fd);
-		goto end_no_conn;
+		*resp=CONN_EOF;
+		return n;
 	}
 end:
-#ifdef TCP_FD_CACHE
-	if (unlikely((fd_cache_e==0) && use_fd_cache)){
-		tcp_fd_cache_add(c, fd);
-	}else
-#endif /* TCP_FD_CACHE */
-	if (do_close_fd) close(fd);
-release_c:
-	tcpconn_chld_put(c); /* release c (dec refcnt & free on 0) */
-end_no_conn:
 	return n;
-#ifdef TCP_CONNECT_WAIT
-conn_wait_error:
-	n=-1;
-conn_wait_close:
-	/* connect or send failed or immediate close-after-send was requested on
-	 * newly created connection which was not yet sent to tcp_main (but was
-	 * already hashed) => don't send to main, unhash and destroy directly
-	 * (if refcnt>2 it will be destroyed when the last sender releases the
-	 * connection (tcpconn_chld_put(c))) or when tcp_main receives a
-	 * CONN_ERROR it*/
-	c->state=S_CONN_BAD;
-	/* we are here only if we opened a new fd (and not reused a cached or
-	   a reader one) => if the connect was successful close the fd */
-	if (fd>=0) close(fd);
-	TCPCONN_LOCK;
-		if (c->flags & F_CONN_HASHED){
-			/* if some other parallel tcp_send did send CONN_ERROR to
-			 * tcp_main, the connection might be already detached */
-			_tcpconn_detach(c);
-			c->flags&=~F_CONN_HASHED;
-			TCPCONN_UNLOCK;
-			tcpconn_put(c);
-		}else
-			TCPCONN_UNLOCK;
-	/* dec refcnt -> mark it for destruction */
-	tcpconn_chld_put(c);
-	return n;
-#endif /* TCP_CONNET_WAIT */
 }
 
 
@@ -2525,7 +2648,7 @@ inline static void tcpconn_destroy(struct tcp_connection* tcpconn)
  */
 inline static int tcpconn_put_destroy(struct tcp_connection* tcpconn)
 {
-	if (unlikely((tcpconn->flags & 
+	if (unlikely((tcpconn->flags &
 			(F_CONN_WRITE_W|F_CONN_HASHED|F_CONN_MAIN_TIMER|F_CONN_READ_W)) )){
 		/* sanity check */
 		if (unlikely(tcpconn->flags & F_CONN_HASHED)){
@@ -3158,10 +3281,17 @@ inline static int handle_ser_child(struct process_table* p, int fd_i)
 			/* received only if the wr. queue is empty and a write finishes
 			 * with EAGAIN (common after connect())
 			 * it should only enable write watching on the fd. The connection
-			 * should be  already in the hash. The refcnt is not changed.
+			 * should be  already in the hash. The refcnt is automatically
+			 * decremented.
 			 */
-			if (unlikely((tcpconn->state==S_CONN_BAD) || 
+			/* auto-dec refcnt */
+			if (unlikely(tcpconn_put(tcpconn))){
+				tcpconn_destroy(tcpconn);
+				break;
+			}
+			if (unlikely((tcpconn->state==S_CONN_BAD) ||
 							!(tcpconn->flags & F_CONN_HASHED) ))
+				/* in the process of being destroyed => do nothing */
 				break;
 			if (!(tcpconn->flags & F_CONN_WANTS_WR)){
 				tcpconn->flags|=F_CONN_WANTS_WR;