Quellcode durchsuchen

- tcp: support for pending connects: add a connection immediately to the
connection hash (before even attempting the connect sys call) and just queue
possible writes. When the connection completes update the connection info &
aliases and send the queued data. This avoids parallel connects when the
intial connect takes too long (highly experimental, on by default)

Andrei Pelinescu-Onciul vor 17 Jahren
Ursprung
Commit
d22b82a07e
6 geänderte Dateien mit 411 neuen und 150 gelöschten Zeilen
  1. 1 1
      Makefile.defs
  2. 2 1
      core_cmd.c
  3. 6 3
      tcp_conn.h
  4. 377 143
      tcp_main.c
  5. 14 2
      tcp_options.c
  6. 11 0
      tcp_options.h

+ 1 - 1
Makefile.defs

@@ -78,7 +78,7 @@ MAIN_NAME=ser
 VERSION = 2
 PATCHLEVEL = 1
 SUBLEVEL =  0
-EXTRAVERSION = -dev15
+EXTRAVERSION = -dev16-tcp
 
 SER_VER = $(shell expr $(VERSION) \* 1000000 + $(PATCHLEVEL) \* 1000 + \
 			$(SUBLEVEL) )

+ 2 - 1
core_cmd.c

@@ -562,9 +562,10 @@ static void core_tcp_options(rpc_t* rpc, void* c)
 	if (!tcp_disable){
 		tcp_options_get(&t);
 		rpc->add(c, "{", &handle);
-		rpc->struct_add(handle, "ddddddddddddd",
+		rpc->struct_add(handle, "dddddddddddddd",
 			"fd_cache",		t.fd_cache,
 			"tcp_buf_write",	t.tcp_buf_write,
+			"tcp_connect_wait",	t.tcp_connect_wait,
 			"tcpconn_wq_max",	t.tcpconn_wq_max,
 			"tcp_wq_max",	t.tcp_wq_max,
 			"tcp_wq_timeout",	TICKS_TO_S(t.tcp_wq_timeout),

+ 6 - 3
tcp_conn.h

@@ -72,7 +72,9 @@
 #define F_CONN_READER       4 /* handled by a tcp reader */
 #define F_CONN_WRITE_W      8 /* watched for write (main) */
 #define F_CONN_HASHED      16 /* in tcp_main hash */
-#define F_CONN_FD_CLOSED   32 /* in tcp_main hash */
+#define F_CONN_FD_CLOSED   32 /* fd was already closed */
+#define F_CONN_PENDING     64 /* pending connect  (fd not known yet in main) */
+#define F_CONN_MAIN_TIMER 128 /* timer active in the tcp_main process */
 
 
 enum tcp_req_errors {	TCP_REQ_INIT, TCP_REQ_OK, TCP_READ_ERROR,
@@ -87,12 +89,13 @@ enum tcp_req_states {	H_SKIP_EMPTY, H_SKIP, H_LF, H_LFCR,  H_BODY, H_STARTWS,
 
 enum tcp_conn_states { S_CONN_ERROR=-2, S_CONN_BAD=-1, S_CONN_OK=0, 
 						S_CONN_INIT, S_CONN_EOF, 
-						S_CONN_ACCEPT, S_CONN_CONNECT };
+						S_CONN_ACCEPT, S_CONN_CONNECT, S_CONN_PENDING };
 
 
 /* fd communication commands */
 enum conn_cmds { CONN_DESTROY=-3, CONN_ERROR=-2, CONN_EOF=-1, CONN_RELEASE, 
-					CONN_GET_FD, CONN_NEW, CONN_QUEUED_WRITE };
+					CONN_GET_FD, CONN_NEW, CONN_QUEUED_WRITE,
+					CONN_NEW_PENDING_WRITE, CONN_NEW_COMPLETE };
 /* CONN_RELEASE, EOF, ERROR, DESTROY can be used by "reader" processes
  * CONN_GET_FD, NEW, ERROR only by writers */
 

+ 377 - 143
tcp_main.c

@@ -91,6 +91,8 @@
  *  2007-12-12  destroy connection asap on wbuf. timeout (andrei)
  *  2007-12-13  changed the refcnt and destroy scheme, now refcnt is 1 if
  *                linked into the hash tables (was 0) (andrei)
+ *  2007-12-21  support for pending connects (connections are added to the
+ *               hash immediately and writes on them are buffered) (andrei)
  */
 
 
@@ -864,61 +866,60 @@ error:
 
 
 
-struct tcp_connection* tcpconn_connect( union sockaddr_union* server, 
-										union sockaddr_union* from,
-										int type)
+/* do the actual connect, set sock. options a.s.o
+ * returns socket on success, -1 on error
+ * sets also *res_local_addr, res_si and state (S_CONN_CONNECT for an
+ * unfinished connect and S_CONN_OK for a finished one)*/
+inline static int tcp_do_connect(	union sockaddr_union* server,
+									union sockaddr_union* from,
+									int type,
+									union sockaddr_union* res_local_addr,
+									struct socket_info** res_si,
+									enum tcp_conn_states *state
+									)
 {
 	int s;
-	struct socket_info* si;
 	union sockaddr_union my_name;
 	socklen_t my_name_len;
-	struct tcp_connection* con;
 	struct ip_addr ip;
-	enum tcp_conn_states state;
 #ifdef TCP_BUF_WRITE
 	int n;
 #endif /* TCP_BUF_WRITE */
 
-	s=-1;
-	
-	if (*tcp_connections_no >= tcp_max_connections){
-		LOG(L_ERR, "ERROR: tcpconn_connect: maximum number of connections"
-					" exceeded (%d/%d)\n",
-					*tcp_connections_no, tcp_max_connections);
-		goto error;
-	}
 	s=socket(AF2PF(server->s.sa_family), SOCK_STREAM, 0);
-	if (s==-1){
-		LOG(L_ERR, "ERROR: tcpconn_connect: socket: (%d) %s\n",
+	if (unlikely(s==-1)){
+		LOG(L_ERR, "ERROR: tcp_do_connect: socket: (%d) %s\n",
 				errno, strerror(errno));
 		goto error;
 	}
 	if (init_sock_opt(s)<0){
-		LOG(L_ERR, "ERROR: tcpconn_connect: init_sock_opt failed\n");
+		LOG(L_ERR, "ERROR: tcp_do_connect: init_sock_opt failed\n");
 		goto error;
 	}
 	
-	if (from && bind(s, &from->s, sockaddru_len(*from)) != 0)
-		LOG(L_WARN, "WARNING: tcpconn_connect: binding to source address"
+	if (unlikely(from && bind(s, &from->s, sockaddru_len(*from)) != 0)){
+		LOG(L_WARN, "WARNING: tcp_do_connect: binding to source address"
 					" failed: %s [%d]\n", strerror(errno), errno);
-	state=S_CONN_OK;
+	}
+	*state=S_CONN_OK;
 #ifdef TCP_BUF_WRITE
 	if (likely(tcp_options.tcp_buf_write)){
 again:
 		n=connect(s, &server->s, sockaddru_len(*server));
 		if (unlikely(n==-1)){
 			if (errno==EINTR) goto again;
-			if (errno!=EINPROGRESS && errno!=EALREADY){
-				LOG(L_ERR, "ERROR: tcpconn_connect: connect: (%d) %s\n",
+			if (likely(errno==EINPROGRESS))
+				*state=S_CONN_CONNECT;
+			else if (errno!=EALREADY){
+				LOG(L_ERR, "ERROR: tcp_do_connect: connect: (%d) %s\n",
 						errno, strerror(errno));
 				goto error;
 			}
-			state=S_CONN_CONNECT;
 		}
 	}else{
 #endif /* TCP_BUF_WRITE */
 		if (tcp_blocking_connect(s, &server->s, sockaddru_len(*server))<0){
-			LOG(L_ERR, "ERROR: tcpconn_connect: tcp_blocking_connect"
+			LOG(L_ERR, "ERROR: tcp_do_connect: tcp_blocking_connect"
 						" failed\n");
 			goto error;
 		}
@@ -932,31 +933,64 @@ again:
 			goto find_socket;
 	}
 	my_name_len=sizeof(my_name);
-	if (getsockname(s, &my_name.s, &my_name_len)!=0){
-		LOG(L_ERR, "ERROR: tcp_connect: getsockname failed: %s(%d)\n",
+	if (unlikely(getsockname(s, &my_name.s, &my_name_len)!=0)){
+		LOG(L_ERR, "ERROR: tcp_do_connect: getsockname failed: %s(%d)\n",
 				strerror(errno), errno);
-		si=0; /* try to go on */
-		goto skip;
+		*res_si=0;
+		goto error;
 	}
 	from=&my_name; /* update from with the real "from" address */
 	su2ip_addr(&ip, &my_name);
 find_socket:
 #ifdef USE_TLS
-	if (type==PROTO_TLS)
-		si=find_si(&ip, 0, PROTO_TLS);
+	if (unlikely(type==PROTO_TLS))
+		*res_si=find_si(&ip, 0, PROTO_TLS);
 	else
 #endif
-		si=find_si(&ip, 0, PROTO_TCP);
-skip:
-	if (si==0){
-		LOG(L_WARN, "WARNING: tcp_connect: could not find corresponding"
+		*res_si=find_si(&ip, 0, PROTO_TCP);
+	
+	if (unlikely(*res_si==0)){
+		LOG(L_WARN, "WARNING: tcp_do_connect: could not find corresponding"
 				" listening socket, using default...\n");
-		if (server->s.sa_family==AF_INET) si=sendipv4_tcp;
+		if (server->s.sa_family==AF_INET) *res_si=sendipv4_tcp;
 #ifdef USE_IPV6
-		else si=sendipv6_tcp;
+		else *res_si=sendipv6_tcp;
 #endif
 	}
-	con=tcpconn_new(s, server, from, si,  type, state);
+	*res_local_addr=*from;
+	return s;
+error:
+	if (s!=-1) close(s);
+	return -1;
+}
+
+
+
+struct tcp_connection* tcpconn_connect( union sockaddr_union* server, 
+										union sockaddr_union* from,
+										int type)
+{
+	int s;
+	struct socket_info* si;
+	union sockaddr_union my_name;
+	struct tcp_connection* con;
+	enum tcp_conn_states state;
+
+	s=-1;
+	
+	if (*tcp_connections_no >= tcp_max_connections){
+		LOG(L_ERR, "ERROR: tcpconn_connect: maximum number of connections"
+					" exceeded (%d/%d)\n",
+					*tcp_connections_no, tcp_max_connections);
+		goto error;
+	}
+	s=tcp_do_connect(server, from, type, &my_name, &si, &state);
+	if (s==-1){
+		LOG(L_ERR, "ERROR: tcp_do_connect: failed (%d) %s\n",
+				errno, strerror(errno));
+		goto error;
+	}
+	con=tcpconn_new(s, server, &my_name, si, type, state);
 	if (con==0){
 		LOG(L_ERR, "ERROR: tcp_connect: tcpconn_new failed, closing the "
 				 " socket\n");
@@ -971,6 +1005,59 @@ error:
 
 
 
+#ifdef TCP_CONNECT_WAIT
+int tcpconn_finish_connect( struct tcp_connection* c,
+												union sockaddr_union* from)
+{
+	int s;
+	int r;
+	union sockaddr_union local_addr;
+	struct socket_info* si;
+	enum tcp_conn_states state;
+	struct tcp_conn_alias* a;
+	
+	s=tcp_do_connect(&c->rcv.src_su, from, c->type, &local_addr, &si, &state);
+	if (unlikely(s==-1)){
+		LOG(L_ERR, "ERROR: tcpconn_finish_connect: tcp_do_connect for %p"
+						" failed\n", c);
+		return -1;
+	}
+	c->rcv.bind_address=si;
+	su2ip_addr(&c->rcv.dst_ip, &local_addr);
+	c->rcv.dst_port=su_getport(&local_addr);
+	/* update aliases if needed */
+	if (likely(from==0)){
+		/* add aliases */
+		TCPCONN_LOCK;
+		_tcpconn_add_alias_unsafe(c, c->rcv.src_port, &c->rcv.dst_ip, 0,
+													tcp_new_conn_alias_flags);
+		_tcpconn_add_alias_unsafe(c, c->rcv.src_port, &c->rcv.dst_ip,
+									c->rcv.dst_port, tcp_new_conn_alias_flags);
+		TCPCONN_UNLOCK;
+	}else if (su_cmp(from, &local_addr)!=1){
+		TCPCONN_LOCK;
+			/* remove all the aliases except the first one and re-add them
+			 * (there shouldn't be more then the 3 default aliases at this 
+			 * stage) */
+			for (r=1; r<c->aliases; r++){
+				a=&c->con_aliases[r];
+				tcpconn_listrm(tcpconn_aliases_hash[a->hash], a, next, prev);
+			}
+			c->aliases=1;
+			/* add the local_ip:0 and local_ip:local_port aliases */
+			_tcpconn_add_alias_unsafe(c, c->rcv.src_port, &c->rcv.dst_ip,
+												0, tcp_new_conn_alias_flags);
+			_tcpconn_add_alias_unsafe(c, c->rcv.src_port, &c->rcv.dst_ip,
+									c->rcv.dst_port, tcp_new_conn_alias_flags);
+		TCPCONN_UNLOCK;
+	}
+	
+	return s;
+}
+#endif /* TCP_CONNECT_WAIT */
+
+
+
 /* adds a tcp connection to the tcpconn hashes
  * Note: it's called _only_ from the tcp_main process */
 inline static struct tcp_connection*  tcpconn_add(struct tcp_connection *c)
@@ -994,10 +1081,12 @@ inline static struct tcp_connection*  tcpconn_add(struct tcp_connection *c)
 		 *   -- for finding if a fully specified connection exists */
 		_tcpconn_add_alias_unsafe(c, c->rcv.src_port, &zero_ip, 0,
 													tcp_new_conn_alias_flags);
-		_tcpconn_add_alias_unsafe(c, c->rcv.src_port, &c->rcv.dst_ip, 0,
+		if (likely(c->rcv.dst_ip.af && ! ip_addr_any(&c->rcv.dst_ip))){
+			_tcpconn_add_alias_unsafe(c, c->rcv.src_port, &c->rcv.dst_ip, 0,
 													tcp_new_conn_alias_flags);
-		_tcpconn_add_alias_unsafe(c, c->rcv.src_port, &c->rcv.dst_ip,
+			_tcpconn_add_alias_unsafe(c, c->rcv.src_port, &c->rcv.dst_ip,
 									c->rcv.dst_port, tcp_new_conn_alias_flags);
+		}
 		/* ignore add_alias errors, there are some valid cases when one
 		 *  of the add_alias would fail (e.g. first add_alias for 2 connections
 		 *   with the same destination but different src. ip*/
@@ -1407,6 +1496,98 @@ no_id:
 						break;
 				}
 			}
+#if defined(TCP_CONNECT_WAIT) && defined(TCP_BUF_WRITE)
+			if (likely(tcp_options.tcp_connect_wait && 
+						tcp_options.tcp_buf_write )){
+				/* FIXME: flag for timer? + check again */
+				if (unlikely(*tcp_connections_no >= tcp_max_connections)){
+					LOG(L_ERR, "ERROR: tcp_send: maximum number of connections"
+								" exceeded (%d/%d)\n",
+								*tcp_connections_no, tcp_max_connections);
+					return -1;
+				}
+				c=tcpconn_new(-1, &dst->to, from, 0, dst->proto,
+								S_CONN_PENDING);
+				if (unlikely(c==0)){
+					LOG(L_ERR, "ERROR: tcp_send: could not create new"
+							" connection\n");
+					return -1;
+				}
+				c->flags|=F_CONN_PENDING|F_CONN_FD_CLOSED;
+				atomic_set(&c->refcnt, 2); /* ref from here and from main hash
+											 table */
+				/* add it to id hash and aliases */
+				if (unlikely(tcpconn_add(c)==0)){
+					LOG(L_ERR, "ERROR: tcp_send: could not add "
+									"connection %p\n", c);
+					_tcpconn_free(c);
+					n=-1;
+					goto end_no_conn;
+				}
+				/* do connect and if src ip or port changed, update the 
+				 * aliases */
+				if (unlikely((fd=tcpconn_finish_connect(c, from))<0)){
+					LOG(L_ERR, "ERROR: tcp_send: tcpconn_finsish_connect(%p)"
+							" failed\n", c);
+					goto conn_wait_error;
+				}
+				/* ? TODO: it might be faster just to queue the write directly
+				 *  and send to main CONN_NEW_PENDING_WRITE */
+				/* delay sending the fd to main after the send */
+				
+				/* NOTE: no lock here, because the connection is marked as
+				 * pending and nobody else will try to write on it. However
+				 * this might produce out-of-order writes. If this is not
+				 * desired either lock before the write or use 
+				 * _wbufq_insert(...) */
+				n=_tcpconn_write_nb(fd, c, buf, len);
+				if (unlikely(n<0)){
+					if (errno==EAGAIN|| errno==EWOULDBLOCK){
+						DBG("tcp_send: pending write on new connection (%p)\n",
+								c);
+						/* add to the write queue */
+						lock_get(&c->write_lock);
+							if (unlikely(_wbufq_add(c, buf, len)<0)){
+								lock_release(&c->write_lock);
+								n=-1;
+								LOG(L_ERR, "ERROR: tcp_send: EAGAIN and"
+										" write queue full or failed for %p\n",
+										c);
+								goto conn_wait_error;
+							}
+						lock_release(&c->write_lock);
+						/* send to tcp_main */
+						response[0]=(long)c;
+						response[1]=CONN_NEW_PENDING_WRITE;
+						if (unlikely(send_fd(unix_tcp_sock, response, 
+												sizeof(response), fd) <= 0)){
+							LOG(L_ERR, "BUG: tcp_send: CONN_NEW_PENDING_WRITE"
+										" for %p failed:" " %s (%d)\n",
+										c, strerror(errno), errno);
+							goto conn_wait_error;
+						}
+						goto end;
+					}
+					/* error: destroy it directly */
+					LOG(L_ERR, "ERROR: tcp_send: connect & send "
+										" for %p failed:" " %s (%d)\n",
+										c, strerror(errno), errno);
+					goto conn_wait_error;
+				}
+				LOG(L_INFO, "tcp_send: quick connect for %p\n", c);
+				/* send to tcp_main */
+				response[0]=(long)c;
+				response[1]=CONN_NEW_COMPLETE;
+				if (unlikely(send_fd(unix_tcp_sock, response, 
+										sizeof(response), fd) <= 0)){
+					LOG(L_ERR, "BUG: tcp_send: CONN_NEW_COMPLETE  for %p"
+								" failed:" " %s (%d)\n",
+								c, strerror(errno), errno);
+					goto conn_wait_error;
+				}
+				goto end;
+			}
+#endif /* TCP_CONNECT_WAIT  && TCP_BUF_WRITE */
 			if (unlikely((c=tcpconn_connect(&dst->to, from, dst->proto))==0)){
 				LOG(L_ERR, "ERROR: tcp_send: connect failed\n");
 				return -1;
@@ -1414,6 +1595,9 @@ no_id:
 			atomic_set(&c->refcnt, 2); /* ref. from here and it will also
 			                              be added in the tcp_main hash */
 			fd=c->s;
+			c->flags|=F_CONN_FD_CLOSED; /* not yet opened in main */
+			/* ? TODO: it might be faster just to queue the write and
+			 * send to main a CONN_NEW_PENDING_WRITE */
 			
 			/* send the new tcpconn to "tcp main" */
 			response[0]=(long)c;
@@ -1432,9 +1616,18 @@ no_id:
 get_fd:
 #ifdef TCP_BUF_WRITE
 		/* if data is already queued, we don't need the fd any more */
-		if (unlikely(tcp_options.tcp_buf_write && _wbufq_non_empty(c))){
+		if (unlikely(tcp_options.tcp_buf_write && (_wbufq_non_empty(c)
+#ifdef TCP_CONNECT_WAIT
+												|| (c->state==S_CONN_PENDING)
+#endif /* TCP_CONNECT_WAIT */
+						) )){
 			lock_get(&c->write_lock);
-				if (likely(_wbufq_non_empty(c))){
+				if (likely(_wbufq_non_empty(c)
+#ifdef TCP_CONNECT_WAIT
+							|| (c->state==S_CONN_PENDING)
+#endif /* TCP_CONNECT_WAIT */
+
+							)){
 					do_close_fd=0;
 					if (unlikely(_wbufq_add(c, buf, len)<0)){
 						lock_release(&c->write_lock);
@@ -1503,7 +1696,11 @@ send_it:
 	lock_get(&c->write_lock);
 #ifdef TCP_BUF_WRITE
 	if (likely(tcp_options.tcp_buf_write)){
-		if (_wbufq_non_empty(c)){
+		if (_wbufq_non_empty(c)
+#ifdef TCP_CONNECT_WAIT
+			|| (c->state==S_CONN_PENDING) 
+#endif /* TCP_CONNECT_WAIT */
+			){
 			if (unlikely(_wbufq_add(c, buf, len)<0)){
 				lock_release(&c->write_lock);
 				n=-1;
@@ -1528,7 +1725,8 @@ send_it:
 #else /* ! TCP_BUF_WRITE */
 	lock_release(&c->write_lock);
 #endif /* TCP_BUF_WRITE */
-	DBG("tcp_send: after write: c= %p n=%d fd=%d\n",c, n, fd);
+	
+	DBG("tcp_send: after real write: c= %p n=%d fd=%d\n",c, n, fd);
 	DBG("tcp_send: buf=\n%.*s\n", (int)len, buf);
 	if (unlikely(n<0)){
 #ifdef TCP_BUF_WRITE
@@ -1542,12 +1740,12 @@ send_it:
 			}
 			lock_release(&c->write_lock);
 			n=len;
-			if (enable_write_watch){
+			if (likely(enable_write_watch)){
 				response[0]=(long)c;
 				response[1]=CONN_QUEUED_WRITE;
-				if (send_all(unix_tcp_sock, response, sizeof(response))<=0){
+				if (send_all(unix_tcp_sock, response, sizeof(response)) <= 0){
 					LOG(L_ERR, "BUG: tcp_send: error return failed "
-							"(write):%s (%d)\n", strerror(errno), errno);
+								"(write):%s (%d)\n", strerror(errno), errno);
 					n=-1;
 					goto error;
 				}
@@ -1556,9 +1754,12 @@ send_it:
 		}else{
 			lock_release(&c->write_lock);
 		}
+#endif /* TCP_BUF_WRITE */
+		LOG(L_ERR, "ERROR: tcp_send: failed to send on %p: %s (%d)\n",
+					c, strerror(errno), errno);
+#ifdef TCP_BUF_WRITE
 error:
 #endif /* TCP_BUF_WRITE */
-		LOG(L_ERR, "ERROR: tcp_send: failed to send\n");
 		/* error on the connection , mark it as bad and set 0 timeout */
 		c->state=S_CONN_BAD;
 		c->timeout=get_ticks_raw();
@@ -1566,9 +1767,9 @@ error:
 		response[0]=(long)c;
 		response[1]=CONN_ERROR;
 		if (send_all(unix_tcp_sock, response, sizeof(response))<=0){
-			LOG(L_ERR, "BUG: tcp_send: error return failed (write):%s (%d)\n",
+			LOG(L_CRIT, "BUG: tcp_send: error return failed (write):%s (%d)\n",
 					strerror(errno), errno);
-			tcpconn_chld_put(c); /* deref. it manually & free on 0 */
+			tcpconn_chld_put(c); /* deref. it manually */
 			n=-1;
 		}
 		/* CONN_ERROR will auto-dec refcnt => we must not call tcpconn_put 
@@ -1604,6 +1805,30 @@ release_c:
 	tcpconn_chld_put(c); /* release c (dec refcnt & free on 0) */
 end_no_conn:
 	return n;
+#ifdef TCP_CONNECT_WAIT
+conn_wait_error:
+	/* connect or send failed on newly created connection which was not
+	 * yet sent to tcp_main (but was already hashed) => don't send to main,
+	 * unhash and destroy directly (if refcnt>2 it will be destroyed when the 
+	 * last sender releases the connection (tcpconn_chld_put(c))) or when
+	 * tcp_main receives a CONN_ERROR it*/
+	c->state=S_CONN_BAD;
+	TCPCONN_LOCK;
+		/* FIXME: race: what if CONN_ERROR is sent by another tcp_send() to
+		 * tcp_main ? */
+		if (c->flags & F_CONN_HASHED){
+			/* if some other parallel tcp_send did send CONN_ERROR to
+			 * tcp_main, the connection might be already detached */
+			_tcpconn_detach(c);
+			c->flags&=~F_CONN_HASHED;
+			TCPCONN_UNLOCK;
+			tcpconn_put(c);
+		}else
+			TCPCONN_UNLOCK;
+	/* dec refcnt -> mark it for destruction */
+	tcpconn_chld_put(c);
+	return -1;
+#endif /* TCP_CONNET_WAIT */
 }
 
 
@@ -1752,77 +1977,6 @@ error:
 
 
 
-#if 0
-/* used internally by tcp_main_loop()
- * tries to destroy a tcp connection (if it cannot it will force a timeout)
- * Note: it's called _only_ from the tcp_main process */
-static void tcpconn_destroy(struct tcp_connection* tcpconn)
-{
-	int fd;
-	ticks_t t;
-
-	/* always try to remove the timer to protect against tcpconn_destroy
-	 *  being called several times for the same connection 
-	 *  (if the timer is already removed, nothing happens) */
-	if (likely(!(tcpconn->flags & F_CONN_READER)))
-		local_timer_del(&tcp_main_ltimer, &tcpconn->timer);
-#ifdef TCP_BUF_WRITE
-	if (unlikely((tcpconn->flags & F_CONN_WRITE_W) ||
-				!(tcpconn->flags & F_CONN_REMOVED))){
-		LOG(L_CRIT, "tcpconn_destroy: possible BUG: flags = %0x\n",
-					tcpconn->flags);
-	}
-	if (unlikely(_wbufq_non_empty(tcpconn))){
-		lock_get(&tcpconn->write_lock);
-			/* check again, while holding the lock */
-			if (likely(_wbufq_non_empty(tcpconn)))
-				_wbufq_destroy(&tcpconn->wbuf_q);
-		lock_release(&tcpconn->write_lock);
-	}
-#endif /* TCP_BUF_WRITE */
-	TCPCONN_LOCK; /*avoid races w/ tcp_send*/
-	if (likely(atomic_dec_and_test(&tcpconn->refcnt))){ 
-		_tcpconn_detach(tcpconn);
-		TCPCONN_UNLOCK;
-		DBG("tcpconn_destroy: destroying connection %p (%d, %d) flags %04x\n",
-				tcpconn, tcpconn->id, tcpconn->s, tcpconn->flags);
-		fd=tcpconn->s;
-#ifdef USE_TLS
-		/*FIXME: lock ->writelock ? */
-		if (tcpconn->type==PROTO_TLS)
-			tls_close(tcpconn, fd);
-#endif
-		_tcpconn_free(tcpconn); /* destroys also the wbuf_q if still present*/
-#ifdef TCP_FD_CACHE
-		if (likely(tcp_options.fd_cache)) shutdown(fd, SHUT_RDWR);
-#endif /* TCP_FD_CACHE */
-		if (unlikely(close(fd)<0)){
-			LOG(L_ERR, "ERROR: tcpconn_destroy; close() failed: %s (%d)\n",
-					strerror(errno), errno);
-		}
-		(*tcp_connections_no)--;
-	}else{
-		TCPCONN_UNLOCK;
-		/* force timeout */
-		t=get_ticks_raw();
-		tcpconn->timeout=t+TCPCONN_WAIT_TIMEOUT;
-		tcpconn->state=S_CONN_BAD;
-		if (!(tcpconn->flags & F_CONN_READER)){
-			/* re-activate the timer only if the connection is handled
-			 * by tcp_main (and not by a tcp reader)*/
-			tcpconn->timer.f=tcpconn_main_timeout;
-			local_timer_reinit(&tcpconn->timer);
-			local_timer_add(&tcp_main_ltimer, &tcpconn->timer, 
-									TCPCONN_WAIT_TIMEOUT, t);
-		}
-		DBG("tcpconn_destroy: delaying (%p, flags %04x) ...\n",
-				tcpconn, tcpconn->flags);
-	}
-}
-#endif
-
-
-
 /* close tcp_main's fd from a tcpconn
  * WARNING: call only in tcp_main context */
 inline static void tcpconn_close_main_fd(struct tcp_connection* tcpconn)
@@ -1857,7 +2011,7 @@ close_again:
 inline static int tcpconn_chld_put(struct tcp_connection* tcpconn)
 {
 	if (unlikely(atomic_dec_and_test(&tcpconn->refcnt))){
-		DBG("tcpconn_put_chld: destroying connection %p (%d, %d) "
+		DBG("tcpconn_chld_put: destroying connection %p (%d, %d) "
 				"flags %04x\n", tcpconn, tcpconn->id,
 				tcpconn->s, tcpconn->flags);
 		/* sanity checks */
@@ -1865,7 +2019,7 @@ inline static int tcpconn_chld_put(struct tcp_connection* tcpconn)
 					 !(tcpconn->flags & F_CONN_REMOVED) || 
 					(tcpconn->flags & 
 					 		(F_CONN_HASHED| F_CONN_WRITE_W)) )){
-			LOG(L_CRIT, "BUG: tcpconn_put_chld: %p bad flags = %0x\n",
+			LOG(L_CRIT, "BUG: tcpconn_chld_put: %p bad flags = %0x\n",
 					tcpconn, tcpconn->flags);
 			abort();
 		}
@@ -1889,7 +2043,8 @@ inline static void tcpconn_destroy(struct tcp_connection* tcpconn)
 			LOG(L_CRIT, "BUG: tcpconn_destroy: called with hashed"
 						" connection (%p)\n", tcpconn);
 			/* try to continue */
-			local_timer_del(&tcp_main_ltimer, &tcpconn->timer);
+			if (likely(tcpconn->flags & F_CONN_MAIN_TIMER))
+				local_timer_del(&tcp_main_ltimer, &tcpconn->timer);
 			TCPCONN_LOCK;
 				_tcpconn_detach(tcpconn);
 			TCPCONN_UNLOCK;
@@ -1949,23 +2104,24 @@ inline static int tcpconn_put_destroy(struct tcp_connection* tcpconn)
  */
 inline static int tcpconn_try_unhash(struct tcp_connection* tcpconn)
 {
-	if (unlikely((tcpconn->flags & F_CONN_WRITE_W) ||
-				!(tcpconn->flags & F_CONN_REMOVED))){
-		/* sanity check */
-		LOG(L_CRIT, "BUG: tcpconn_put_destroy: %p flags = %0x\n",
-					tcpconn, tcpconn->flags);
-	}
 	if (likely(tcpconn->flags & F_CONN_HASHED)){
-		tcpconn->flags&=~F_CONN_HASHED;
 		tcpconn->state=S_CONN_BAD;
-		if (likely(!(tcpconn->flags & F_CONN_READER)))
+		if (likely(tcpconn->flags & F_CONN_MAIN_TIMER)){
 			local_timer_del(&tcp_main_ltimer, &tcpconn->timer);
-		else
+			tcpconn->flags&=~F_CONN_MAIN_TIMER;
+		}else
 			/* in case it's still in a reader timer */
 			tcpconn->timeout=get_ticks_raw();
 		TCPCONN_LOCK;
-			_tcpconn_detach(tcpconn);
-		TCPCONN_UNLOCK;
+			if (tcpconn->flags & F_CONN_HASHED){
+				tcpconn->flags&=~F_CONN_HASHED;
+				_tcpconn_detach(tcpconn);
+				TCPCONN_UNLOCK;
+			}else{
+				/* tcp_send was faster and did unhash it itself */
+				TCPCONN_UNLOCK;
+				return 0;
+			}
 #ifdef TCP_BUF_WRITE
 		/* empty possible write buffers (optional) */
 		if (unlikely(_wbufq_non_empty(tcpconn))){
@@ -2279,6 +2435,7 @@ inline static int handle_tcp_child(struct tcp_child* tcp_c, int fd_i)
 			local_timer_reinit(&tcpconn->timer);
 			local_timer_add(&tcp_main_ltimer, &tcpconn->timer, crt_timeout, t);
 			/* must be after the de-ref*/
+			tcpconn->flags|=F_CONN_MAIN_TIMER;
 			tcpconn->flags&=~(F_CONN_REMOVED|F_CONN_READER);
 #ifdef TCP_BUF_WRITE
 			if (unlikely(tcpconn->flags & F_CONN_WRITE_W))
@@ -2416,17 +2573,26 @@ inline static int handle_ser_child(struct process_table* p, int fd_i)
 	}
 	switch(cmd){
 		case CONN_ERROR:
+			LOG(L_ERR, "handle_ser_child: ERROR: received CON_ERROR for %p"
+					" (id %d), refcnt %d\n", 
+					tcpconn, tcpconn->id, atomic_get(&tcpconn->refcnt));
+#ifdef TCP_CONNECT_WAIT
+			/* if the connection is pending => it might be on the way of
+			 * reaching tcp_main (e.g. CONN_NEW_COMPLETE or 
+			 *  CONN_NEW_PENDING_WRITE) =>  it cannot be destroyed here */
+			if ( !(tcpconn->flags & F_CONN_PENDING) && 
+					tcpconn_try_unhash(tcpconn) )
+				tcpconn_put(tcpconn);
+#else /* ! TCP_CONNECT_WAIT */
+			if ( tcpconn_try_unhash(tcpconn) )
+				tcpconn_put(tcpconn);
+#endif /* TCP_CONNECT_WAIT */
 			if ( (!(tcpconn->flags & F_CONN_REMOVED) ||
 					(tcpconn->flags & F_CONN_WRITE_W) ) && (tcpconn->s!=-1)){
 				io_watch_del(&io_h, tcpconn->s, -1, IO_FD_CLOSING);
 				tcpconn->flags|=F_CONN_REMOVED;
 				tcpconn->flags&=~F_CONN_WRITE_W;
 			}
-			LOG(L_ERR, "handle_ser_child: ERROR: received CON_ERROR for %p"
-					" (id %d), refcnt %d\n", 
-					tcpconn, tcpconn->id, atomic_get(&tcpconn->refcnt));
-			if (tcpconn_try_unhash(tcpconn))
-				tcpconn_put(tcpconn);
 			tcpconn_put_destroy(tcpconn); /* dec refcnt & destroy on 0 */
 			break;
 		case CONN_GET_FD:
@@ -2460,7 +2626,8 @@ inline static int handle_ser_child(struct process_table* p, int fd_i)
 			 * no need for reinit */
 			local_timer_add(&tcp_main_ltimer, &tcpconn->timer, 
 								tcp_con_lifetime, t);
-			tcpconn->flags&=~F_CONN_REMOVED;
+			tcpconn->flags|=F_CONN_MAIN_TIMER;
+			tcpconn->flags&=~(F_CONN_REMOVED|F_CONN_FD_CLOSED);
 			flags=POLLIN 
 #ifdef TCP_BUF_WRITE
 					/* not used for now, the connection is sent to tcp_main
@@ -2482,6 +2649,11 @@ inline static int handle_ser_child(struct process_table* p, int fd_i)
 			break;
 #ifdef TCP_BUF_WRITE
 		case CONN_QUEUED_WRITE:
+			/* received only if the wr. queue is empty and a write finishes
+			 * with EAGAIN (common after connect())
+			 * it should only enable write watching on the fd. The connection
+			 * should be  already in the hash. The refcnt is not changed.
+			 */
 			if (unlikely((tcpconn->state==S_CONN_BAD) || 
 							!(tcpconn->flags & F_CONN_HASHED) ))
 				break;
@@ -2509,7 +2681,7 @@ inline static int handle_ser_child(struct process_table* p, int fd_i)
 				}
 				tcpconn->flags|=F_CONN_WRITE_W;
 				t=get_ticks_raw();
-				if (likely(!(tcpconn->flags & F_CONN_READER) && 
+				if (likely((tcpconn->flags & F_CONN_MAIN_TIMER) && 
 					(TICKS_LT(tcpconn->wbuf_q.wr_timeout, tcpconn->timeout)) &&
 						TICKS_LT(t, tcpconn->wbuf_q.wr_timeout) )){
 					/* _wbufq_nonempty() is guaranteed here */
@@ -2527,6 +2699,65 @@ inline static int handle_ser_child(struct process_table* p, int fd_i)
 							" already watched for write\n", tcpconn);
 			}
 			break;
+#ifdef TCP_CONNECT_WAIT
+		case CONN_NEW_COMPLETE:
+		case CONN_NEW_PENDING_WRITE:
+				/* received when a pending connect completes in the same
+				 * tcp_send() that initiated it
+				 * the connection is already in the hash with S_CONN_PENDING
+				 * state (added by tcp_send()) and refcnt at least 1 (for the
+				 *  hash)*/
+			tcpconn->flags&=~(F_CONN_PENDING|F_CONN_FD_CLOSED|F_CONN_REMOVED);
+			if (unlikely((tcpconn->state==S_CONN_BAD) || (fd==-1))){
+				if (unlikely(fd==-1))
+					LOG(L_CRIT, "BUG: handle_ser_child: CONN_NEW_COMPLETE:"
+								" no fd received\n");
+				else
+					LOG(L_WARN, "WARNING: handle_ser_child: CONN_NEW_COMPLETE:"
+							" received connection with error\n");
+				tcpconn->flags|=F_CONN_FD_CLOSED;
+				tcpconn->state=S_CONN_BAD;
+				tcpconn_try_unhash(tcpconn);
+				tcpconn_put_destroy(tcpconn);
+				break;
+			}
+			tcpconn->state=S_CONN_OK;
+			(*tcp_connections_no)++;
+			tcpconn->s=fd;
+			/* update the timeout*/
+			t=get_ticks_raw();
+			tcpconn->timeout=t+tcp_con_lifetime;
+			/* activate the timer (already properly init. in tcpconn_new())
+			 * no need for reinit */
+			local_timer_add(&tcp_main_ltimer, &tcpconn->timer, 
+								tcp_con_lifetime, t);
+			tcpconn->flags|=F_CONN_MAIN_TIMER;
+			if (unlikely(cmd==CONN_NEW_COMPLETE)){
+				/* check if needs to be watched for write */
+				lock_get(&tcpconn->write_lock);
+					/* if queue non empty watch it for write */
+					flags=(_wbufq_empty(tcpconn)-1)&POLLOUT;
+				lock_release(&tcpconn->write_lock);
+				tcpconn->flags|=(!(flags&POLLOUT)-1)&F_CONN_WRITE_W;
+			}else{
+				/* CONN_NEW_PENDING_WRITE */
+				/* no need to check, we have something queued for write */
+				flags=POLLOUT;
+				tcpconn->flags|=F_CONN_WRITE_W;
+			}
+			flags|=POLLIN;
+			if (unlikely(
+					io_watch_add(&io_h, tcpconn->s, flags,
+												F_TCPCONN, tcpconn)<0)){
+				LOG(L_CRIT, "ERROR: tcp_main: handle_ser_child: failed to add"
+						" new socket to the fd list\n");
+				tcpconn->flags|=F_CONN_REMOVED;
+				tcpconn->flags&=~F_CONN_WRITE_W;
+				tcpconn_try_unhash(tcpconn); /*  unhash & dec refcnt */
+				tcpconn_put_destroy(tcpconn);
+			}
+			break;
+#endif /* TCP_CONNECT_WAIT */
 #endif /* TCP_BUF_WRITE */
 		default:
 			LOG(L_CRIT, "BUG: handle_ser_child: unknown cmd %d\n", cmd);
@@ -2679,6 +2910,7 @@ static inline int handle_new_connect(struct socket_info* si)
 		/* activate the timer */
 		local_timer_add(&tcp_main_ltimer, &tcpconn->timer, 
 								tcp_con_lifetime, get_ticks_raw());
+		tcpconn->flags|=F_CONN_MAIN_TIMER;
 		tcpconn->flags&=~F_CONN_REMOVED;
 		if (unlikely(io_watch_add(&io_h, tcpconn->s, POLLIN, 
 													F_TCPCONN, tcpconn)<0)){
@@ -2794,6 +3026,7 @@ inline static int handle_tcpconn_ev(struct tcp_connection* tcpconn, short ev,
 				goto error;
 		tcpconn->flags|=F_CONN_REMOVED|F_CONN_READER;
 		local_timer_del(&tcp_main_ltimer, &tcpconn->timer);
+		tcpconn->flags&=~F_CONN_MAIN_TIMER;
 		tcpconn_ref(tcpconn); /* refcnt ++ */
 		if (unlikely(send2child(tcpconn)<0)){
 			LOG(L_ERR,"ERROR: handle_tcpconn_ev: no children available\n");
@@ -2962,9 +3195,10 @@ static inline void tcpconn_destroy_all()
 				if (is_tcp_main){
 					/* we cannot close or remove the fd if we are not in the
 					 * tcp main proc.*/
-					if (!(c->flags & F_CONN_READER))
+					if ((c->flags & F_CONN_MAIN_TIMER)){
 						local_timer_del(&tcp_main_ltimer, &c->timer);
-					/* else still in some reader */
+						c->flags&=~F_CONN_MAIN_TIMER;
+					} /* else still in some reader */
 					fd=c->s;
 					if (fd>0 && (!(c->flags & F_CONN_REMOVED)
 #ifdef TCP_BUF_WRITE

+ 14 - 2
tcp_options.c

@@ -40,7 +40,10 @@ void init_tcp_options()
 	tcp_options.tcpconn_wq_max=32*1024; /* 32 k */
 	tcp_options.tcp_wq_max=10*1024*1024; /* 10 MB */
 	tcp_options.tcp_wq_timeout=S_TO_TICKS(tcp_send_timeout);
-#endif
+#ifdef TCP_CONNECT_WAIT
+	tcp_options.tcp_connect_wait=1;
+#endif /* TCP_CONNECT_WAIT */
+#endif /* TCP_BUF_WRITE */
 #ifdef TCP_FD_CACHE
 	tcp_options.fd_cache=1;
 #endif
@@ -89,7 +92,16 @@ void tcp_options_check()
 	W_OPT_NC(tcp_wq_max);
 	W_OPT_NC(tcp_wq_timeout);
 #endif /* TCP_BUF_WRITE */
-
+#ifndef TCP_CONNECT_WAIT
+	W_OPT_NC(tcp_connect_wait);
+#endif /* TCP_CONNECT_WAIT */
+	
+	if (tcp_options.tcp_connect_wait && !tcp_options.tcp_buf_write){
+		WARN("tcp_options: tcp_connect_wait depends on tcp_buf_write, "
+				" disabling...\n");
+		tcp_options.tcp_connect_wait=0;
+	}
+	
 #if ! defined HAVE_TCP_DEFER_ACCEPT && ! defined HAVE_TCP_ACCEPT_FILTER
 	W_OPT_NS(defer_accept);
 #endif

+ 11 - 0
tcp_options.h

@@ -31,6 +31,16 @@
 #define TCP_BUF_WRITE /* enabled buffered writing */
 #endif 
 
+#if !defined(NO_TCP_CONNECT_WAIT) && defined(TCP_BUF_WRITE)
+#define TCP_CONNECT_WAIT /* enable pending connects support */
+#endif
+
+#if defined(TCP_CONNECT_WAIT) && !defined(TCP_BUF_WRITE)
+/* check for impossible configuration: TCP_CONNECT_WAIT w/o TCP_BUF_WRITE */
+#warning "disabling TCP_CONNECT_WAIT because TCP_BUF_WRITE is not defined"
+#undef TCP_CONNECT_WAIT
+#endif
+
 #ifndef NO_TCP_FD_CACHE
 #define TCP_FD_CACHE /* enable fd caching */
 #endif
@@ -102,6 +112,7 @@ struct tcp_cfg_options{
 	int fd_cache; /* on /off */
 	/* tcp buf. write options */
 	int tcp_buf_write; /* on / off */
+	int tcp_connect_wait; /* on / off, depends on tcp_buf_write */
 	unsigned int tcpconn_wq_max; /* maximum queue len per connection */
 	unsigned int tcp_wq_max; /* maximum overall queued bytes */
 	unsigned int tcp_wq_timeout;      /* timeout for queue writes */