Browse Source

Stability fix + introduction of connection probation

Joseph Henry 9 years ago
parent
commit
357cb92f2e
2 changed files with 23 additions and 36 deletions
  1. 21 35
      netcon/NetconEthernetTap.cpp
  2. 2 1
      netcon/NetconEthernetTap.hpp

+ 21 - 35
netcon/NetconEthernetTap.cpp

@@ -292,9 +292,6 @@ void NetconEthernetTap::threadMain()
 {
 	uint64_t prev_tcp_time = 0, prev_status_time = 0, prev_etharp_time = 0;
 
-	Mutex::Lock _l(_tcpconns_m);
-	_tcpconns_m.unlock();
-
 	// Main timer loop
 	while (_run) {
 		uint64_t now = OSUtils::now();
@@ -303,23 +300,17 @@ void NetconEthernetTap::threadMain()
 		uint64_t since_status = now - prev_status_time;
 		uint64_t tcp_remaining = ZT_LWIP_TCP_TIMER_INTERVAL;
 		uint64_t etharp_remaining = ARP_TMR_INTERVAL;
-		uint64_t status_remaining = STATUS_TMR_INTERVAL;
 
 		// Connection prunning
 		if (since_status >= STATUS_TMR_INTERVAL) {
 			prev_status_time = now;
-			status_remaining = STATUS_TMR_INTERVAL - since_status;
 
-			_tcpconns_m.lock();
 			for(size_t i=0;i<_TcpConnections.size();++i) {
 				if(!_TcpConnections[i]->sock)
-					continue; // Skip, this is a pending connection
+					continue;
 				int fd = _phy.getDescriptor(_TcpConnections[i]->sock);
 				dwr(MSG_DEBUG," tap_thread(): tcp\\jobs = {%d, %d}\n", _TcpConnections.size(), jobmap.size());
 
-
-				dwr(MSG_DEBUG," tap_thread():    sock=%x, pcb->state=%d\n", _TcpConnections[i]->sock, _TcpConnections[i]->pcb->state);
-
 				fcntl(fd, F_SETFL, O_NONBLOCK);
 				unsigned char tmpbuf[BUF_SZ];
 				
@@ -332,17 +323,15 @@ void NetconEthernetTap::threadMain()
 					closeConnection(_TcpConnections[i]->sock);
 				} else if (n > 0) {
 					dwr(MSG_DEBUG," tap_thread(): data read during connection check (%d bytes)\n", n);
-					phyOnUnixData(_TcpConnections[i]->sock,_phy.getuptr(_TcpConnections[i]->sock),&tmpbuf,BUF_SZ);
-				}				
+					phyOnUnixData(_TcpConnections[i]->sock,_phy.getuptr(_TcpConnections[i]->sock),&tmpbuf,n);
+				}		
 			}
-			_tcpconns_m.unlock();
 		}
 		// Main TCP/ETHARP timer section
 		if (since_tcp >= ZT_LWIP_TCP_TIMER_INTERVAL) {
 			prev_tcp_time = now;
 			lwipstack->tcp_tmr();
 			// Makeshift poll
-			_tcpconns_m.lock();
 			for(size_t i=0;i<_TcpConnections.size();++i) {
 				if(_TcpConnections[i]->txsz > 0){
 					lwipstack->_lock.lock();
@@ -350,7 +339,6 @@ void NetconEthernetTap::threadMain()
 					lwipstack->_lock.unlock();
 				}
 			}
-			_tcpconns_m.unlock();
 		} else {
 			tcp_remaining = ZT_LWIP_TCP_TIMER_INTERVAL - since_tcp;
 		}
@@ -386,8 +374,6 @@ TcpConnection *NetconEthernetTap::getConnection(PhySocket *sock)
 void NetconEthernetTap::closeConnection(PhySocket *sock)
 {
 	// Here we assume _tcpconns_m is already locked by caller
-	
-	dwr(MSG_DEBUG,"closeConnection(%x)\n",sock);
 	if(!sock) {
 		dwr(MSG_DEBUG," closeConnection(): invalid PhySocket\n");
 		return;
@@ -395,10 +381,10 @@ void NetconEthernetTap::closeConnection(PhySocket *sock)
 	TcpConnection *conn = getConnection(sock);
 	if(!conn)
 		return;
-	if(conn->pcb) {
-		dwr(MSG_DEBUG," closeConnection(): PCB->state = %d\n", conn->pcb->state);
+	if(conn->pcb && conn->pcb->state != CLOSED) {
+		dwr(MSG_DEBUG," closeConnection(%x): PCB->state = %d\n", sock, conn->pcb->state);
 		if(conn->pcb->state == SYN_SENT) {
-			dwr(MSG_DEBUG," closeConnection(): invalid PCB state for this operation. ignoring.\n");
+			dwr(MSG_DEBUG," closeConnection(%x): invalid PCB state for this operation. ignoring.\n", sock);
 			return;
 		}	
 		if(lwipstack->_tcp_close(conn->pcb) == ERR_OK) {
@@ -410,7 +396,7 @@ void NetconEthernetTap::closeConnection(PhySocket *sock)
 		    lwipstack->_tcp_poll(conn->pcb, NULL, 1);
 		}
 		else {
-			dwr(MSG_ERROR," closeConnection(): error while calling tcp_close()\n");
+			dwr(MSG_ERROR," closeConnection(%x): error while calling tcp_close()\n", sock);
 		}
 	}
 	for(size_t i=0;i<_TcpConnections.size();++i) {
@@ -446,8 +432,8 @@ void NetconEthernetTap::phyOnUnixWritable(PhySocket *sock,void **uptr)
 			memcpy(conn->rxbuf, conn->rxbuf+n, len-n);
 	  	conn->rxsz -= n;
 	  	float max = (float)DEFAULT_BUF_SZ;
-		dwr(MSG_TRANSFER,"    <--- RX :: { TX: %.3f%%  |  RX: %.3f%% }  :: %d bytes\n", 
-			(float)conn->txsz / max, (float)conn->rxsz / max, n);
+		dwr(MSG_TRANSFER,"    <--- RX :: {TX: %.3f%%, RX: %.3f%%, sock=%x} :: %d bytes\n", 
+			(float)conn->txsz / max, (float)conn->rxsz / max, sock, n);
 	  	lwipstack->_tcp_recved(conn->pcb, n);
 	  	if(conn->rxsz == 0){
 	  		_phy.setNotifyWritable(conn->sock, false); // Nothing more to be notified about
@@ -567,7 +553,6 @@ void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,uns
 		rpcSock = sockdata.first;
 		buf = (unsigned char*)sockdata.second;
 	}
-
 	// Process RPC if we have a corresponding jobmap entry
 	if(foundJob) {
 		unloadRPC(buf, pid, tid, rpcCount, timestamp, CANARY, cmd, payload);
@@ -576,25 +561,21 @@ void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,uns
 
 		switch(cmd) {
 			case RPC_BIND:
-				dwr(MSG_DEBUG,"  <%x> RPC_BIND\n", sock);
 			    struct bind_st bind_rpc;
 			    memcpy(&bind_rpc,  &buf[IDX_PAYLOAD+STRUCT_IDX], sizeof(struct bind_st));
 			    handleBind(sock, rpcSock, uptr, &bind_rpc);
 				break;
 		  	case RPC_LISTEN:
-				dwr(MSG_DEBUG,"  <%x> RPC_LISTEN\n", sock);
 			    struct listen_st listen_rpc;
 			    memcpy(&listen_rpc,  &buf[IDX_PAYLOAD+STRUCT_IDX], sizeof(struct listen_st));
 			    handleListen(sock, rpcSock, uptr, &listen_rpc);
 				break;
 		  	case RPC_GETSOCKNAME:
-		  		dwr(MSG_DEBUG,"  <%x> RPC_GETSOCKNAME\n", sock);
 		  		struct getsockname_st getsockname_rpc;
 		    	memcpy(&getsockname_rpc,  &buf[IDX_PAYLOAD+STRUCT_IDX], sizeof(struct getsockname_st));
 		  		handleGetsockname(sock, rpcSock, uptr, &getsockname_rpc);
 		  		break;
 			case RPC_CONNECT:
-				dwr(MSG_DEBUG,"  <%x> RPC_CONNECT\n", sock);
 			    struct connect_st connect_rpc;
 			    memcpy(&connect_rpc,  &buf[IDX_PAYLOAD+STRUCT_IDX], sizeof(struct connect_st));
 			    handleConnect(sock, rpcSock, conn, &connect_rpc);
@@ -731,9 +712,11 @@ err_t NetconEthernetTap::nc_sent(void* arg, struct tcp_pcb *PCB, u16_t len)
 {
 	Larg *l = (Larg*)arg;
 	Mutex::Lock _l(l->tap->_tcpconns_m);
-	if(l && l->conn && len) {
-		float max = (float)DEFAULT_BUF_SZ;
-		if(l->conn->txsz < max / 2) {
+	if(l->conn->probation && l->conn->txsz == 0){
+		l->conn->probation = false; // TX buffer now empty, removing from probation
+	}
+	if(l && l->conn && len && !l->conn->probation) {
+		if(l->conn->txsz < (float)DEFAULT_BUF_SOFTMAX) {
 			l->tap->_phy.setNotifyReadable(l->conn->sock, true);
 			l->tap->_phy.whack();
 		}
@@ -1033,8 +1016,11 @@ void NetconEthernetTap::handleWrite(TcpConnection *conn)
 		/* PCB send buffer is full, turn off readability notifications for the
 		corresponding PhySocket until nc_sent() is called and confirms that there is
 		now space on the buffer */
-		dwr(MSG_DEBUG," handleWrite(): sndbuf == 0, LWIP stack is full\n");
-		_phy.setNotifyReadable(conn->sock, false);
+		if(!conn->probation) {
+			dwr(MSG_DEBUG," handleWrite(): sndbuf == 0, LWIP stack is full\n");
+			_phy.setNotifyReadable(conn->sock, false);
+			conn->probation = true;
+		}
 		return;
 	}
 	if(conn->txsz <= 0)
@@ -1061,8 +1047,8 @@ void NetconEthernetTap::handleWrite(TcpConnection *conn)
 				conn->txsz -= r;
 
 				float max = (float)DEFAULT_BUF_SZ;
-				dwr(MSG_TRANSFER," TX --->    :: { TX: %.3f%%  |  RX: %.3f%% }  :: %d bytes\n", 
-					(float)conn->txsz / max, (float)conn->rxsz / max, r);
+				dwr(MSG_TRANSFER," TX --->    :: {TX: %.3f%%, RX: %.3f%%, sock=%x} :: %d bytes\n", 
+					(float)conn->txsz / max, (float)conn->rxsz / max, conn->sock, r);
 				return;
 			}
 		}

+ 2 - 1
netcon/NetconEthernetTap.hpp

@@ -60,6 +60,7 @@ struct accept_st;
 #define ZT_LWIP_TCP_TIMER_INTERVAL      5
 #define STATUS_TMR_INTERVAL             500 // How often we check connection statuses (in ms)
 #define DEFAULT_BUF_SZ                  1024 * 1024 * 2
+#define DEFAULT_BUF_SOFTMAX				DEFAULT_BUF_SZ / 2
 
 namespace ZeroTier {
 
@@ -71,7 +72,7 @@ class LWIPStack;
  */
 struct TcpConnection
 {
-  bool listening, closing;
+  bool listening, probation;
   int pid, txsz, rxsz;
   PhySocket *rpcSock, *sock;
   struct tcp_pcb *pcb;