Browse Source

Transfer tune-up

Joseph Henry 9 years ago
parent
commit
7656e6b9f8
3 changed files with 43 additions and 51 deletions
  1. 42 49
      netcon/NetconEthernetTap.cpp
  2. 0 1
      netcon/NetconEthernetTap.hpp
  3. 1 1
      netcon/common.inc.c

+ 42 - 49
netcon/NetconEthernetTap.cpp

@@ -56,8 +56,8 @@
 
 #define APPLICATION_POLL_FREQ 			20
 #define ZT_LWIP_TCP_TIMER_INTERVAL 		5
-#define STATUS_TMR_INTERVAL				1000 // How often we check connection statuses (in ms)
-#define DEFAULT_READ_BUFFER_SIZE		1024 * 1024
+#define STATUS_TMR_INTERVAL				10000 // How often we check connection statuses (in ms)
+#define DEFAULT_READ_BUFFER_SIZE		1024 * 1024 * 5
 
 namespace ZeroTier {
 
@@ -348,7 +348,6 @@ TcpConnection *NetconEthernetTap::getConnection(PhySocket *sock)
 void NetconEthernetTap::threadMain()
 	throw()
 {
-	dwr(MSG_DEBUG, "MEMP_NUM_REASSDATA = %d\n", MEMP_NUM_REASSDATA);
 	uint64_t prev_tcp_time = 0;
 	uint64_t prev_status_time = 0;
 	uint64_t prev_etharp_time = 0;
@@ -364,11 +363,12 @@ void NetconEthernetTap::threadMain()
 		uint64_t status_remaining = STATUS_TMR_INTERVAL;
 
 		// Connection prunning
+		/*
 		if (since_status >= STATUS_TMR_INTERVAL) {
 			prev_status_time = now;
 			status_remaining = STATUS_TMR_INTERVAL - since_status;
 
-			dwr(MSG_DEBUG," tap_thread(): tcp\\jobs\\socks = {%d, %d, %d}\n", tcp_connections.size(), jobmap.size(), sockmap.size());
+			dwr(MSG_DEBUG," tap_thread(): tcp\\jobs = {%d, %d}\n", tcp_connections.size(), jobmap.size());
 			for(size_t i=0; i<tcp_connections.size(); i++) {
 
 				// No TCP connections are associated, this is a candidate for removal
@@ -376,13 +376,6 @@ void NetconEthernetTap::threadMain()
 					continue; // Skip, this is a pending connection
 				int fd = _phy.getDescriptor(tcp_connections[i]->sock);
 
-					if(tcp_connections[i]->idx > 0){
-						dwr(MSG_DEBUG, "writing from poll\n");
-						lwipstack->_lock.lock();
-						handle_write(tcp_connections[i]);
-						lwipstack->_lock.unlock();
-					}
-
 				fcntl(fd, F_SETFL, O_NONBLOCK);
 				unsigned char tmpbuf[BUF_SZ];
 				int n;
@@ -399,11 +392,23 @@ void NetconEthernetTap::threadMain()
 					phyOnUnixData(tcp_connections[i]->sock,_phy.getuptr(tcp_connections[i]->sock),&tmpbuf,BUF_SZ);
 				}				
 			}
-		}
+		}*/
 		// Main TCP/ETHARP timer section
 		if (since_tcp >= ZT_LWIP_TCP_TIMER_INTERVAL) {
 			prev_tcp_time = now;
 			lwipstack->tcp_tmr();
+
+			// Makeshift poll
+
+			for(size_t i=0; i<tcp_connections.size(); i++) {
+				if(tcp_connections[i]->idx > 0){
+					//dwr(MSG_DEBUG, "writing from poll\n");
+					lwipstack->_lock.lock();
+					handle_write(tcp_connections[i]);
+					lwipstack->_lock.unlock();
+				}
+			}
+
 		} else {
 			tcp_remaining = ZT_LWIP_TCP_TIMER_INTERVAL - since_tcp;
 		}
@@ -463,8 +468,7 @@ void NetconEthernetTap::phyOnUnixClose(PhySocket *sock,void **uptr) {
 /*
  * Handles data on a client's data buffer. Data is sent to LWIP to be enqueued.
  */
-void NetconEthernetTap::phyOnFileDescriptorActivity(PhySocket *sock,void **uptr,bool readable,bool writable)
-{
+void NetconEthernetTap::phyOnFileDescriptorActivity(PhySocket *sock,void **uptr,bool readable,bool writable) {
 	dwr(MSG_DEBUG,"\nphyOnFileDescriptorActivity(): new connection = %x\n", sock);
 }
 
@@ -502,11 +506,9 @@ void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,uns
 	unsigned char *buf = (unsigned char*)data;
 	std::pair<PhySocket*, void*> sockdata;
 	PhySocket *streamsock, *rpcsock;
-	bool found_sock = false, found_job = false;
-
+	bool found_job = false;
 
 	TcpConnection *conn;
-	int max_sndbuf = (float)TCP_SND_BUF;
 	int wlen = len;
 
 	// RPC
@@ -527,17 +529,9 @@ void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,uns
 			//return; // Don't close the socket, we'll use this later for data
 		}
 		else { // All RPCs other than RPC_SOCKET
-			streamsock = sockmap[magic_num];
-			if(streamsock){ // We found a pre-existing stream socket for this RPC
-				sockmap[magic_num] = NULL;
-				found_sock = true;				
-				dwr(MSG_DEBUG," <%x> found_sock\n", sock);
-			}
-			else {
-				// No matching stream has been encountered, create jobmap entry
-				dwr(MSG_DEBUG," <%x> creating jobmap (cmd=%d) entry for %llu\n", sock, cmd, magic_num);
-				jobmap[magic_num] = std::make_pair<PhySocket*, void*>(sock, data);
-			}
+			// No matching stream has been encountered, create jobmap entry
+			dwr(MSG_DEBUG," <%x> creating jobmap (cmd=%d) entry for %llu\n", sock, cmd, magic_num);
+			jobmap[magic_num] = std::make_pair<PhySocket*, void*>(sock, data);
 		}
 		write(_phy.getDescriptor(sock), "z", 1); // RPC ACK byte to maintain RPC->Stream order
 	}
@@ -555,14 +549,13 @@ void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,uns
 		dwr(MSG_DEBUG, " <%x> padding_pos = %d\n", sock, padding_pos);
 		// Grab token, next we'll use it to look up an RPC job
 		if(token_pos > -1) {
-			dwr(MSG_DEBUG, "    <%x> token_pos = %d, GRABBING TOKEN\n", sock, token_pos);
 			memcpy(&magic_num, buf+token_pos, MAGIC_SIZE);
 			if(magic_num != 0) { // TODO: Added to address magic_num==0 bug, last seeen 20160108
 				// Find job
 				sockdata = jobmap[magic_num];
 				if(!sockdata.first) { // Stream before RPC
-					dwr(MSG_DEBUG,"       <%x> creating sockmap entry for %llu\n", sock, magic_num);
-					sockmap[magic_num] = sock;
+					dwr(MSG_DEBUG,"       <%x> unable to locate job entry for %llu\n", sock, magic_num);
+					return;
 				}
 				else
 					found_job = true;
@@ -587,14 +580,12 @@ void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,uns
 					wlen = len - TOKEN_SIZE;
 					data_start = padding_pos+MAGIC_PADDING_SIZE;
 					memcpy((&conn->buf)+conn->idx, buf+data_start, wlen);
-					dwr(MSG_DEBUG," wlen = %d, data_start = %d\n", wlen, data_start);
 				}
 				// [DATA] + [TOKEN]
 				if(len > TOKEN_SIZE && token_pos > 0 && token_pos == len - TOKEN_SIZE) {
 					wlen = len - TOKEN_SIZE;
 					data_start = 0;
 					memcpy((&conn->buf)+conn->idx, buf+data_start, wlen);												
-					dwr(MSG_DEBUG," wlen = %d, data_start = %d\n", wlen, data_start);
 				}
 				// [DATA] + [TOKEN] + [DATA]
 				if(len > TOKEN_SIZE && token_pos > 0 && len > (token_pos + TOKEN_SIZE)) {
@@ -603,10 +594,15 @@ void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,uns
 					data_end = padding_pos-MAGIC_SIZE;
 					memcpy((&conn->buf)+conn->idx, buf+data_start, (data_end-data_start)+1);
 					memcpy((&conn->buf)+conn->idx, buf+(padding_pos+MAGIC_PADDING_SIZE), len-(token_pos+TOKEN_SIZE));
-					dwr(MSG_DEBUG," wlen = %d, data_start = %d, data_end = %d\n", wlen, data_start, data_end);
 				}
 			}
 		}
+
+		// Write data from stream
+		if(conn->idx > (DEFAULT_READ_BUFFER_SIZE / 2)) {
+			dwr(MSG_DEBUG,"Buffer near full. Slowing\n");
+			_phy.setNotifyReadable(sock, false);
+		}
 		lwipstack->_lock.lock();
 		conn->idx += wlen;
 		handle_write(conn);
@@ -617,17 +613,12 @@ void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,uns
 		rpcsock = sockdata.first;
 		buf = (unsigned char*)sockdata.second;
 	}
-	else if(found_sock) {
-		rpcsock = sock;
-		sock = streamsock;
-	}
 
-	// Process RPC if we have a corresponding jobmap/sockmap entry
-	if(found_job || found_sock)
-	{
+	// Process RPC if we have a corresponding jobmap entry
+	if(found_job) {
 		conn = getConnection(sock);
 		unload_rpc(buf, pid, tid, rpc_count, timestamp, magic, cmd, payload);
-		dwr(MSG_DEBUG," <%x> RPC: (pid=, tid=, rpc_count=, timestamp=, cmd=%d)\n", sock, /*pid, tid, rpc_count, timestamp, */cmd);
+		//dwr(MSG_DEBUG," <%x> RPC: (pid=, tid=, rpc_count=, timestamp=, cmd=%d)\n", sock, /*pid, tid, rpc_count, timestamp, */cmd);
 
 		switch(cmd) {
 			case RPC_BIND:
@@ -659,7 +650,6 @@ void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,uns
 		}
 		closeConnection(sockdata.first); // close RPC after sending retval, no longer needed
 		jobmap.erase(magic_num);
-		sockmap.erase(magic_num);
 		return;
 	}
 }
@@ -949,10 +939,13 @@ err_t NetconEthernetTap::nc_sent(void* arg, struct tcp_pcb *tpcb, u16_t len)
 {
 	Larg *l = (Larg*)arg;
 	if(len) {
-		dwr(MSG_DEBUG,"nc_sent(ACKED): len = %d\n",len);
-		l->conn->acked+=len;
-		l->tap->_phy.setNotifyReadable(l->conn->sock, true);
-		l->tap->_phy.whack();
+		//dwr(MSG_DEBUG,"nc_sent(ACKED): len = %d\n",len);
+		//l->conn->acked+=len;
+		if(l->conn->idx < DEFAULT_READ_BUFFER_SIZE / 2)
+		{
+			l->tap->_phy.setNotifyReadable(l->conn->sock, true);
+			l->tap->_phy.whack();
+		}
 	}
 	return ERR_OK;
 }
@@ -1292,7 +1285,7 @@ void NetconEthernetTap::handle_write(TcpConnection *conn)
 		dwr(MSG_ERROR," handle_write(): could not locate connection for this fd\n");
 		return;
 	}
-	dwr(MSG_DEBUG,"conn->idx = %d, TCP_SND_BUF = %d\n", conn->idx, TCP_SND_BUF);
+	//dwr(MSG_DEBUG,"conn->idx = %d, TCP_SND_BUF = %d\n", conn->idx, TCP_SND_BUF);
 	if(!conn->pcb) {
 		dwr(MSG_ERROR," handle_write(): conn->pcb == NULL. Failed to write.\n");
 		return;
@@ -1310,7 +1303,7 @@ void NetconEthernetTap::handle_write(TcpConnection *conn)
 	if(conn->sock && !conn->listening) {
 
 		r = conn->idx < sndbuf ? conn->idx : sndbuf;
-		dwr(MSG_DEBUG,"handle_write(): r = %d\n", r);
+		//dwr(MSG_DEBUG,"handle_write(): r = %d\n", r);
 		/* Writes data pulled from the client's socket buffer to LWIP. This merely sends the
 		 * data to LWIP to be enqueued and eventually sent to the network. */
 		if(r > 0) {

+ 0 - 1
netcon/NetconEthernetTap.hpp

@@ -158,7 +158,6 @@ private:
 	std::map<PhySocket*, pid_t> pidmap;
 
 	std::map<uint64_t, std::pair<PhySocket*, void*> > jobmap;
-	std::map<uint64_t, PhySocket*> sockmap;
 
 	pid_t rpc_counter;
 	netif interface;

+ 1 - 1
netcon/common.inc.c

@@ -42,7 +42,7 @@
 #ifndef _COMMON_H
 #define _COMMON_H  1
 
-#define DEBUG_LEVEL     3
+#define DEBUG_LEVEL     1
 
 #define MSG_WARNING     4
 #define MSG_ERROR       1 // Errors