Browse Source

Fixed RX race condition

Joseph Henry 9 years ago
parent
commit
7cb08630d0
3 changed files with 49 additions and 45 deletions
  1. 47 43
      netcon/NetconEthernetTap.cpp
  2. 1 1
      netcon/NetconEthernetTap.hpp
  3. 1 1
      netcon/common.inc.c

+ 47 - 43
netcon/NetconEthernetTap.cpp

@@ -56,8 +56,8 @@
 
 #define APPLICATION_POLL_FREQ 			2
 #define ZT_LWIP_TCP_TIMER_INTERVAL 		5
-#define STATUS_TMR_INTERVAL				250 // How often we check connection statuses (in ms)
-#define DEFAULT_READ_BUFFER_SIZE   		1024 * 1024 * 2
+#define STATUS_TMR_INTERVAL				500 // How often we check connection statuses (in ms)
+#define DEFAULT_BUFFER_SIZE   			1024 * 256
 
 
 namespace ZeroTier {
@@ -116,13 +116,13 @@ class TcpConnection
 public:
 
   bool pending, listening;
-  int pid, txidx, rxidx;
+  int pid, txsz, rxsz;
   PhySocket *rpcsock;
   PhySocket *sock;
   struct tcp_pcb *pcb;
   struct sockaddr_storage *addr;
-  unsigned char txbuf[DEFAULT_READ_BUFFER_SIZE];
-  unsigned char rxbuf[DEFAULT_READ_BUFFER_SIZE];
+  unsigned char txbuf[DEFAULT_BUFFER_SIZE];
+  unsigned char rxbuf[DEFAULT_BUFFER_SIZE];
 };
 
 /*
@@ -348,10 +348,8 @@ void NetconEthernetTap::threadMain()
 			prev_status_time = now;
 			status_remaining = STATUS_TMR_INTERVAL - since_status;
 
-
-			dwr(MSG_DEBUG_EXTRA," tap_thread(): tcp\\jobs = {%d, %d}\n", tcp_connections.size(), jobmap.size());
+			dwr(MSG_DEBUG," tap_thread(): tcp\\jobs = {%d, %d}\n", tcp_connections.size(), jobmap.size());
 			for(size_t i=0; i<tcp_connections.size(); i++) {
-
 				// No TCP connections are associated, this is a candidate for removal
 				if(!tcp_connections[i]->sock)
 					continue; // Skip, this is a pending connection
@@ -360,7 +358,6 @@ void NetconEthernetTap::threadMain()
 				fcntl(fd, F_SETFL, O_NONBLOCK);
 				unsigned char tmpbuf[BUF_SZ];
 				int n = read(fd,&tmpbuf,BUF_SZ);
-				dwr(MSG_DEBUG_EXTRA,"  tap_thread(): <%x> conn->txidx = %d\n", tcp_connections[i]->sock, tcp_connections[i]->txidx);
 				if(tcp_connections[i]->pcb->state == SYN_SENT) {
 					dwr(MSG_DEBUG_EXTRA,"  tap_thread(): <%x> state = SYN_SENT, candidate for removal\n", tcp_connections[i]->sock);
 				}
@@ -382,7 +379,7 @@ void NetconEthernetTap::threadMain()
 
 			// Makeshift poll
 			for(size_t i=0; i<tcp_connections.size(); i++) {
-				if(tcp_connections[i]->txidx > 0){
+				if(tcp_connections[i]->txsz > 0){
 					lwipstack->_lock.lock();
 					handle_write(tcp_connections[i]);
 					lwipstack->_lock.unlock();
@@ -473,7 +470,6 @@ void NetconEthernetTap::closeConnection(PhySocket *sock)
  * Signals us to close the TcpConnection associated with this PhySocket
  */
 void NetconEthernetTap::phyOnUnixClose(PhySocket *sock,void **uptr) {
-	dwr(MSG_DEBUG,"\nphyOnUnixClose(): close connection = %x\n", sock);
 	closeConnection(sock);
 }
 
@@ -511,24 +507,30 @@ void NetconEthernetTap::unload_rpc(void *data, pid_t &pid, pid_t &tid,
  */
 void NetconEthernetTap::phyOnUnixWritable(PhySocket *sock,void **uptr)
 {
+	Mutex::Lock _l(_rx_buf_m);
 	TcpConnection *conn = getConnection(sock);
-	int len = conn->rxidx;
+	float max = (float)DEFAULT_BUFFER_SIZE;
+	fprintf(stderr, " ---------------- { TX: %.3f  |  RX: %.3f } \n", (float)conn->txsz / max, (float)conn->rxsz / max);
+
+	int len = conn->rxsz;
 	int n = _phy.streamSend(conn->sock, conn->rxbuf, len);
 	if(n > 0) {
 		if(n < len) {
-		    dwr(MSG_INFO,"\n phyOnUnixWritable(): unable to write entire \"block\" to stream\n");
+		    dwr(MSG_ERROR,"\n phyOnUnixWritable(): unable to write entire \"block\" to stream\n");
 		}
-		memcpy(conn->rxbuf, conn->rxbuf+n, conn->rxidx-n);
-	  	conn->rxidx -= n;
+		memcpy(conn->rxbuf, conn->rxbuf+n, conn->rxsz-n);
+	  	conn->rxsz -= n;
+	  	//fprintf(stderr,"RX <--- %d bytes (sz = %d)\n", n, conn->rxsz);
 	  	lwipstack->_tcp_recved(conn->pcb, n);
-	  	if(conn->rxidx == 0)
+	  	if(conn->rxsz == 0){
 	  		_phy.setNotifyWritable(conn->sock, false); // Nothing more to be notified about
-		dwr(MSG_DEBUG," phyOnUnixWritable(): wrote %d bytes from RX buffer to <%x> (idx = %d)\n", n, conn->sock, conn->rxidx);
+	  	}
+		dwr(MSG_ERROR," phyOnUnixWritable(): wrote { %d / %d } bytes from RX buffer to <%x> (sz = %d)\n", n, len, conn->sock, conn->rxsz);
 	}
 	else {
 		perror("\n");
 		fprintf(stderr, "errno = %d\n", errno);
-		dwr(MSG_INFO," phyOnUnixWritable(): No data written to stream <%x>\n", conn->sock);
+		dwr(MSG_ERROR," phyOnUnixWritable(): No data written to stream <%x>\n", conn->sock);
 	}
 }
 
@@ -605,11 +607,15 @@ void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,uns
 		}
 
 		conn = getConnection(sock);
+	
+		float max = (float)DEFAULT_BUFFER_SIZE;
+		fprintf(stderr, " ---------------- { TX: %.3f  |  RX: %.3f } \n", (float)conn->txsz / max, (float)conn->rxsz / max);
+
 		if(!conn)
 			return;
 
 		if(padding_pos == -1) { // [DATA]
-			memcpy(&conn->txbuf[conn->txidx], buf, wlen);
+			memcpy(&conn->txbuf[conn->txsz], buf, wlen);
 		}
 		else { // Padding found, implies a token is present
 			// [TOKEN]
@@ -621,30 +627,30 @@ void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,uns
 				if(len > TOKEN_SIZE && token_pos == 0) {
 					wlen = len - TOKEN_SIZE;
 					data_start = padding_pos+CANARY_PADDING_SIZE;
-					memcpy((&conn->txbuf)+conn->txidx, buf+data_start, wlen);
+					memcpy((&conn->txbuf)+conn->txsz, buf+data_start, wlen);
 				}
 				// [DATA] + [TOKEN]
 				if(len > TOKEN_SIZE && token_pos > 0 && token_pos == len - TOKEN_SIZE) {
 					wlen = len - TOKEN_SIZE;
 					data_start = 0;
-					memcpy((&conn->txbuf)+conn->txidx, buf+data_start, wlen);												
+					memcpy((&conn->txbuf)+conn->txsz, buf+data_start, wlen);												
 				}
 				// [DATA] + [TOKEN] + [DATA]
 				if(len > TOKEN_SIZE && token_pos > 0 && len > (token_pos + TOKEN_SIZE)) {
 					wlen = len - TOKEN_SIZE;
 					data_start = 0;
 					data_end = padding_pos-CANARY_SIZE;
-					memcpy((&conn->txbuf)+conn->txidx, buf+data_start, (data_end-data_start)+1);
-					memcpy((&conn->txbuf)+conn->txidx, buf+(padding_pos+CANARY_PADDING_SIZE), len-(token_pos+TOKEN_SIZE));
+					memcpy((&conn->txbuf)+conn->txsz, buf+data_start, (data_end-data_start)+1);
+					memcpy((&conn->txbuf)+conn->txsz, buf+(padding_pos+CANARY_PADDING_SIZE), len-(token_pos+TOKEN_SIZE));
 				}
 			}
 		}
 		// Write data from stream
-		if(conn->txidx > (DEFAULT_READ_BUFFER_SIZE / 2)) {
+		if(conn->txsz > (DEFAULT_BUFFER_SIZE / 2)) {
 			_phy.setNotifyReadable(sock, false);
 		}
 		lwipstack->_lock.lock();
-		conn->txidx += wlen;
+		conn->txsz += wlen;
 		handle_write(conn);
 		lwipstack->_lock.unlock();
 	}
@@ -843,29 +849,25 @@ err_t NetconEthernetTap::nc_recved(void *arg, struct tcp_pcb *tpcb, struct pbuf
 			l->tap->closeConnection(l->conn->sock);
 			return ERR_ABRT;
 		}
-		else {
-			//dwr(MSG_ERROR," nc_recved(): invalid connection/state\n");
-		}
 		return err;
 	}
+	Mutex::Lock _l(l->tap->_rx_buf_m);
 	// Cycle through pbufs and write them to the RX buffer
 	// The RX buffer will be emptied via phyOnUnixWritable()
 	while(p != NULL) {
 		if(p->len <= 0)
 			break;
-		int avail = DEFAULT_READ_BUFFER_SIZE - l->conn->rxidx;
+		int avail = DEFAULT_BUFFER_SIZE - l->conn->rxsz;
 		int len = p->len;
-		if(avail < len) {
-			dwr(MSG_DEBUG," nc_recv(): not enough room (%d bytes) on RX buffer\n", avail);
-			exit(1);
-		}
-		memcpy(l->conn->rxbuf + (l->conn->rxidx), p->payload, len);
-		l->conn->rxidx += len;
+		if(avail < len)
+			dwr(MSG_ERROR," nc_recv(): not enough room (%d bytes) on RX buffer\n", avail);
+		memcpy(l->conn->rxbuf + (l->conn->rxsz), p->payload, len);
+		l->conn->rxsz += len;
 		l->tap->_phy.setNotifyWritable(l->conn->sock, true); // Signal that we're interested in knowing when we can write
 		p = p->next;
 		tot += len;
 	}
-	dwr(MSG_DEBUG," nc_recv(): wrote %d bytes to RX buffer for <%x> (idx = %d)\n", tot, l->conn->sock, l->conn->rxidx);
+	dwr(MSG_ERROR," nc_recv(): wrote %d bytes to RX buffer for <%x> (sz = %d)\n", tot, l->conn->sock, l->conn->rxsz);
 	l->tap->lwipstack->_pbuf_free(q);
 	return ERR_OK;
 }
@@ -984,7 +986,8 @@ err_t NetconEthernetTap::nc_sent(void* arg, struct tcp_pcb *tpcb, u16_t len)
 {
 	Larg *l = (Larg*)arg;
 	if(len) {
-		if(l->conn->txidx < DEFAULT_READ_BUFFER_SIZE / 2) {
+		float max = (float)DEFAULT_BUFFER_SIZE;
+		if(l->conn->txsz < max / 2) {
 			l->tap->_phy.setNotifyReadable(l->conn->sock, true);
 			l->tap->_phy.whack();
 		}
@@ -1339,7 +1342,7 @@ void NetconEthernetTap::handle_connect(PhySocket *sock, PhySocket *rpcsock, TcpC
  */
 void NetconEthernetTap::handle_write(TcpConnection *conn)
 {
-	dwr(MSG_DEBUG_EXTRA,"handle_write(): conn->txidx = %d, conn->sock = %x\n", conn->txidx, conn->sock);
+	dwr(MSG_DEBUG_EXTRA,"handle_write(): conn->txsz = %d, conn->sock = %x\n", conn->txsz, conn->sock);
 	if(!conn) {
 		dwr(MSG_ERROR," handle_write(): invalid connection\n");
 		return;
@@ -1357,15 +1360,15 @@ void NetconEthernetTap::handle_write(TcpConnection *conn)
 		_phy.setNotifyReadable(conn->sock, false);
 		return;
 	}
-	if(conn->txidx <= 0) {
-		dwr(MSG_DEBUG,"handle_write(): conn->txidx <= 0, nothing in buffer to write\n");
+	if(conn->txsz <= 0) {
+		dwr(MSG_DEBUG,"handle_write(): conn->txsz <= 0, nothing in buffer to write\n");
 		return;
 	}
 	if(!conn->listening)
 		lwipstack->_tcp_output(conn->pcb);
 
 	if(conn->sock) {
-		r = conn->txidx < sndbuf ? conn->txidx : sndbuf;
+		r = conn->txsz < sndbuf ? conn->txsz : sndbuf;
 		dwr(MSG_DEBUG,"handle_write(): r = %d, sndbuf = %d\n", r, sndbuf);
 		/* Writes data pulled from the client's socket buffer to LWIP. This merely sends the
 		 * data to LWIP to be enqueued and eventually sent to the network. */
@@ -1381,10 +1384,11 @@ void NetconEthernetTap::handle_write(TcpConnection *conn)
 				return;
 			}
 			else {
-				sz = (conn->txidx)-r;
+				sz = (conn->txsz)-r;
 				if(sz)
 					memmove(&conn->txbuf, (conn->txbuf+r), sz);
-				conn->txidx -= r;
+				conn->txsz -= r;
+				//fprintf(stderr,"  TX ---> %d bytes (sz = %d)\n", r, conn->txsz);
 				return;
 			}
 		}

+ 1 - 1
netcon/NetconEthernetTap.hpp

@@ -174,7 +174,7 @@ private:
 	Mutex _multicastGroups_m;
 
 	std::vector<InetAddress> _ips;
-	Mutex _ips_m, _tcpconns_m;
+	Mutex _ips_m, _tcpconns_m, _rx_buf_m, _tx_buf_m;
 
 	unsigned int _mtu;
 	volatile bool _enabled;

+ 1 - 1
netcon/common.inc.c

@@ -42,7 +42,7 @@
 #ifndef _COMMON_H
 #define _COMMON_H  1
 
-#define DEBUG_LEVEL     3
+#define DEBUG_LEVEL     0
 
 #define MSG_WARNING     4
 #define MSG_ERROR       1 // Errors