瀏覽代碼

Modified PCB/state management logic

Joseph Henry 9 年之前
父節點
當前提交
c3e2cb9b8e
共有 7 個文件被更改,包括 107 次插入83 次删除
  1. 1 9
      ext/lwipopts.h
  2. 6 6
      netcon/Intercept.c
  3. 93 63
      netcon/NetconEthernetTap.cpp
  4. 3 1
      netcon/NetconEthernetTap.hpp
  5. 1 2
      netcon/RPC.h
  6. 1 1
      netcon/common.inc.c
  7. 2 1
      netcon/make-liblwip.mk

+ 1 - 9
ext/lwipopts.h

@@ -49,14 +49,6 @@
 #undef TCP_MSS
 #define TCP_MSS 1460
 
-
-/**
- * MEMP_NUM_REASSDATA: the number of IP packets simultaneously queued for
- * reassembly (whole packets, not fragments!)
- */
-// #undef MEMP_NUM_REASSDATA
-//#define MEMP_NUM_REASSDATA              64
-
 /*
 The TCP window size can be adjusted by changing the define TCP_WND. However,
 do keep in mind that this should be at least twice the size of TCP_MSS (thus
@@ -480,4 +472,4 @@ happening sooner than they should.
  */
 #define PPP_SUPPORT                     0
 
-#endif /* __LWIPOPTS_H__ */
+#endif /* __LWIPOPTS_H__ */

+ 6 - 6
netcon/Intercept.c

@@ -71,7 +71,7 @@ we simply ask the service via an RPC */
 
 static int connected_to_service(int sockfd)
 {
-  dwr(MSG_DEBUG_EXTRA,"connected_to_service():\n");
+  dwr(MSG_DEBUG,"connected_to_service():\n");
   socklen_t len;
   struct sockaddr_storage addr;
   len = sizeof addr;
@@ -80,11 +80,11 @@ static int connected_to_service(int sockfd)
   if (addr.ss_family == AF_LOCAL || addr.ss_family == AF_LOCAL) {
     addr_un = (struct sockaddr_un*)&addr;
     if(strcmp(addr_un->sun_path, netpath) == 0) {
-      dwr(MSG_DEBUG_EXTRA,"connected_to_service(): Yes, %s\n", addr_un->sun_path);
+      dwr(MSG_DEBUG,"connected_to_service(): Yes, %s\n", addr_un->sun_path);
       return 1;
     }
   }
-  dwr(MSG_DEBUG_EXTRA,"connected_to_service(): Not connected to service\n");
+  dwr(MSG_DEBUG,"connected_to_service(): Not connected to service\n");
   return 0;
 }
 
@@ -488,7 +488,7 @@ int getsockname(GETSOCKNAME_SIG)
     return realgetsockname(sockfd, addr, addrlen);
 
   dwr(MSG_DEBUG,"getsockname(%d)\n", sockfd);
-  if(connected_to_service(sockfd) == 0) {
+  if(!connected_to_service(sockfd)) {
     dwr(MSG_DEBUG,"getsockname(): not used by service\n");
     return realgetsockname(sockfd, addr, addrlen);
   }
@@ -507,8 +507,8 @@ int getsockname(GETSOCKNAME_SIG)
 
   if(rpcfd > -1)
     if(read(rpcfd, &addrbuf, sizeof(struct sockaddr_storage)) > 0)
-      close(rpcfd);
-
+     close(rpcfd);
+   
   struct sockaddr_storage sock_storage;
   memcpy(&sock_storage, addrbuf, sizeof(struct sockaddr_storage));
   *addrlen = sizeof(struct sockaddr_in);

+ 93 - 63
netcon/NetconEthernetTap.cpp

@@ -56,8 +56,8 @@
 
 #define APPLICATION_POLL_FREQ 			20
 #define ZT_LWIP_TCP_TIMER_INTERVAL 		5
-#define STATUS_TMR_INTERVAL				60000 // How often we check connection statuses (in ms)
-#define DEFAULT_READ_BUFFER_SIZE		1024 * 1024 * 5
+#define STATUS_TMR_INTERVAL				250 // How often we check connection statuses (in ms)
+#define DEFAULT_READ_BUFFER_SIZE		1024 * 1024
 
 namespace ZeroTier {
 
@@ -114,8 +114,6 @@ class TcpConnection
 {
 public:
 
-  uint64_t accept_token;
-
   bool pending, listening;
   int pid, idx;
   unsigned long written, acked;
@@ -327,15 +325,6 @@ void NetconEthernetTap::scanMulticastGroups(std::vector<MulticastGroup> &added,s
 	_multicastGroups.swap(newGroups);
 }
 
-TcpConnection *NetconEthernetTap::getConnection(PhySocket *sock)
-{
-	for(size_t i=0; i<tcp_connections.size(); i++) {
-		if(tcp_connections[i]->sock == sock)
-			return tcp_connections[i];
-	}
-	return NULL;
-}
-
 void NetconEthernetTap::threadMain()
 	throw()
 {
@@ -355,6 +344,7 @@ void NetconEthernetTap::threadMain()
 
 		// Connection prunning
 		if (since_status >= STATUS_TMR_INTERVAL) {
+
 			prev_status_time = now;
 			status_remaining = STATUS_TMR_INTERVAL - since_status;
 
@@ -368,15 +358,15 @@ void NetconEthernetTap::threadMain()
 
 				fcntl(fd, F_SETFL, O_NONBLOCK);
 				unsigned char tmpbuf[BUF_SZ];
-				int n;
-
-				if((n = read(fd,&tmpbuf,BUF_SZ)) < 0 && errno != EAGAIN) {
+				int n = read(fd,&tmpbuf,BUF_SZ);
+				//dwr(MSG_DEBUG,"  tap_thread(): <%x> n = %d\n", tcp_connections[i]->sock, n);
+				if(tcp_connections[i]->pcb->state == SYN_SENT) {
+					dwr(MSG_DEBUG,"  tap_thread(): <%x> state = SYN_SENT, candidate for removal\n", tcp_connections[i]->sock);
+				}
+				if((n < 0 && errno != EAGAIN) || (n == 0 && errno == EAGAIN)) {
 					dwr(MSG_DEBUG," tap_thread(): closing sock (%x)\n", tcp_connections[i]->sock);
 					closeConnection(tcp_connections[i]->sock);
 				}
-				// < 0 is failure
-				//   0 nothing to read, RPC still active
-				// > 0 RPC data read, handle it
 				else if (n > 0) {
 					dwr(MSG_DEBUG," tap_thread(): data read during connection check (%d bytes)\n", n);
 					phyOnUnixData(tcp_connections[i]->sock,_phy.getuptr(tcp_connections[i]->sock),&tmpbuf,BUF_SZ);
@@ -419,31 +409,55 @@ void NetconEthernetTap::phyOnTcpClose(PhySocket *sock,void **uptr) {}
 void NetconEthernetTap::phyOnTcpData(PhySocket *sock,void **uptr,void *data,unsigned long len) {}
 void NetconEthernetTap::phyOnTcpWritable(PhySocket *sock,void **uptr) {}
 
+void NetconEthernetTap::addConnection(TcpConnection *conn)
+{
+	Mutex::Lock _l(_tcpconns_m);
+	tcp_connections.push_back(conn);
+}
+
+void NetconEthernetTap::removeConnection(TcpConnection *conn)
+{
+	Mutex::Lock _l(_tcpconns_m);
+	for(size_t i=0; i<tcp_connections.size(); i++) {
+		if(tcp_connections[i] == conn){
+			tcp_connections.erase(tcp_connections.begin() + i);
+			return;
+		}
+	}
+}
+
+TcpConnection *NetconEthernetTap::getConnection(PhySocket *sock)
+{
+	Mutex::Lock _l(_tcpconns_m);
+	for(size_t i=0; i<tcp_connections.size(); i++) {
+		if(tcp_connections[i]->sock == sock){
+			return tcp_connections[i];
+		}
+	}
+	return NULL;
+}
+
 /*
  * Closes a TcpConnection and associated LWIP PCB strcuture.
  */
 void NetconEthernetTap::closeConnection(PhySocket *sock)
 {
-	dwr(MSG_DEBUG,"closeConnection(%x)",sock);
+	dwr(MSG_DEBUG,"closeConnection(%x)\n",sock);
+	if(!sock) {
+		dwr(MSG_DEBUG," closeConnection(): invalid PhySocket\n");
+		return;
+	}
 	TcpConnection *conn = getConnection(sock);
-	if(conn) {
-		if(!conn->pcb)
-			return;
-		// TODO: Removed to address double-free segfault when killing a python simple server
-
-		// tell LWIP to close the associated PCB 
-		//if(conn->pcb->state != CLOSED && lwipstack->_tcp_close(conn->pcb) != ERR_OK) {
-		//	dwr(MSG_ERROR," closeConnection(): Error while calling tcp_close()\n");
-		//}
-		// remove from connection list
-		for(size_t i=0; i<tcp_connections.size(); i++) {
-			if(tcp_connections[i]->sock == sock){
-				tcp_connections.erase(tcp_connections.begin() + i);
-				//delete conn;
-				break;
-			}
-		}
+	if(!conn || !conn->pcb)
+		return;
+	if(conn->pcb->state == SYN_SENT) {
+		dwr(MSG_DEBUG," closeConnection(): invalid PCB state (SYN_SENT) -- cannot close right now\n");
+		return;
+	}	
+	if(lwipstack->_tcp_close(conn->pcb) != ERR_OK) {
+		dwr(MSG_ERROR," closeConnection(): Error while calling tcp_close()\n");
 	}
+	removeConnection(conn);
 	if(!sock)
 		return;
 	close(_phy.getDescriptor(sock)); // close underlying fd
@@ -606,8 +620,8 @@ void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,uns
 
 	// Process RPC if we have a corresponding jobmap entry
 	if(found_job) {
-		conn = getConnection(sock);
 		unload_rpc(buf, pid, tid, rpc_count, timestamp, CANARY, cmd, payload);
+		dwr(MSG_DEBUG," <%x> RPC: (pid=%d, tid=%d, rpc_count=%d, timestamp=%s, cmd=%d)\n", sock, pid, tid, rpc_count, timestamp, cmd);
 		switch(cmd) {
 			case RPC_BIND:
 				dwr(MSG_DEBUG,"  <%x> RPC_BIND\n", sock);
@@ -632,6 +646,7 @@ void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,uns
 			    struct connect_st connect_rpc;
 			    memcpy(&connect_rpc,  &buf[IDX_PAYLOAD+STRUCT_IDX], sizeof(struct connect_st));
 			    handle_connect(sock, rpcsock, conn, &connect_rpc);
+			    jobmap.erase(CANARY_num);
 				return; // Keep open RPC, we'll use it once in nc_connected to send retval
 		  	default:
 				break;
@@ -729,7 +744,7 @@ err_t NetconEthernetTap::nc_accept(void *arg, struct tcp_pcb *newpcb, err_t err)
 		}
 		// create and populate new TcpConnection
 		TcpConnection *new_tcp_conn = new TcpConnection();
-		tap->tcp_connections.push_back(new_tcp_conn);
+		tap->addConnection(new_tcp_conn);
 		new_tcp_conn->pcb = newpcb;
 		new_tcp_conn->sock = tap->_phy.wrapSocket(fds[0], new_tcp_conn);
 
@@ -783,14 +798,14 @@ err_t NetconEthernetTap::nc_recved(void *arg, struct tcp_pcb *tpcb, struct pbuf
 	if(p == NULL) {
 		if(l->conn && !l->conn->listening) {
 			dwr(MSG_INFO," nc_recved(): closing connection\n");
-			//if(l->tap->lwipstack->_tcp_close(l->conn->pcb) != ERR_OK) {
-			//	dwr(MSG_ERROR," closeConnection(): Error while calling tcp_close()\n");
-			//}
+			if(l->tap->lwipstack->_tcp_close(l->conn->pcb) != ERR_OK) {
+				dwr(MSG_ERROR," closeConnection(): Error while calling tcp_close()\n");
+			}
 			l->tap->closeConnection(l->conn->sock);
 			return ERR_ABRT;
 		}
 		else {
-			dwr(MSG_ERROR," nc_recved(): invalid connection/state\n");
+			//dwr(MSG_ERROR," nc_recved(): invalid connection/state\n");
 		}
 		return err;
 	}
@@ -893,8 +908,8 @@ void NetconEthernetTap::nc_err(void *arg, err_t err)
 		default:
 			break;
 	}
-	//dwr(MSG_ERROR,"nc_err(): closing connection\n");
-	//l->tap->closeConnection(l->conn);
+	dwr(MSG_ERROR,"nc_err(): closing connection\n");
+	l->tap->closeConnection(l->conn);
 }
 
 /*
@@ -1142,7 +1157,7 @@ TcpConnection * NetconEthernetTap::handle_socket(PhySocket *sock, void **uptr, s
   		*uptr = new_conn;
   		new_conn->sock = sock;
   		new_conn->pcb = newpcb;
-  		tcp_connections.push_back(new_conn);
+  		addConnection(new_conn);
   		new_conn->pending = true;
 		return new_conn;
 	}
@@ -1194,14 +1209,30 @@ void NetconEthernetTap::handle_connect(PhySocket *sock, PhySocket *rpcsock, TcpC
 	ip_addr_t conn_addr = convert_ip((struct sockaddr_in *)&connect_rpc->__addr);
 
 	if(conn != NULL) {
-		if (!conn->listening)
-			lwipstack->tcp_sent(conn->pcb, nc_sent);
+		lwipstack->tcp_sent(conn->pcb, nc_sent);
 		lwipstack->tcp_recv(conn->pcb, nc_recved);
 		lwipstack->tcp_err(conn->pcb, nc_err);
 		lwipstack->tcp_poll(conn->pcb, nc_poll, APPLICATION_POLL_FREQ);
 		lwipstack->tcp_arg(conn->pcb, new Larg(this, conn));
 
+		  int ip = connaddr->sin_addr.s_addr;
+		  unsigned char d[4];
+		  d[0] = ip & 0xFF;
+		  d[1] = (ip >>  8) & 0xFF;
+		  d[2] = (ip >> 16) & 0xFF;
+		  d[3] = (ip >> 24) & 0xFF;
+		  dwr(MSG_DEBUG,"handle_write(): %d.%d.%d.%d:\n", d[0],d[1],d[2],d[3]);	
+		  	
+		dwr(MSG_DEBUG,"handle_connect(): conn_port = %d\n", conn_port);
 		int err = 0;
+
+		dwr(MSG_DEBUG,"handle_connect(): pcb->state = %x\n", conn->pcb->state);
+		if(conn->pcb->state != CLOSED) {
+			dwr(MSG_DEBUG,"handle_connect(): PCB != CLOSED, cannot connect using this PCB\n");
+			send_return_value(rpcsock, -1, EAGAIN);
+			return;
+		}
+
 		if((err = lwipstack->tcp_connect(conn->pcb,&conn_addr,conn_port, nc_connected)) < 0)
 		{
 			if(err == ERR_ISCONN) {
@@ -1265,37 +1296,40 @@ void NetconEthernetTap::handle_connect(PhySocket *sock, PhySocket *rpcsock, TcpC
 
 void NetconEthernetTap::handle_write(TcpConnection *conn)
 {
-	int r;
+	dwr(MSG_DEBUG,"handle_write(): conn->idx = %d, conn->sock = %x\n", conn->idx, conn->sock);
 	if(!conn) {
-		dwr(MSG_ERROR," handle_write(): could not locate connection for this fd\n");
+		dwr(MSG_ERROR," handle_write(): invalid connection\n");
 		return;
 	}
-	//dwr(MSG_DEBUG,"conn->idx = %d, TCP_SND_BUF = %d\n", conn->idx, TCP_SND_BUF);
 	if(!conn->pcb) {
 		dwr(MSG_ERROR," handle_write(): conn->pcb == NULL. Failed to write.\n");
 		return;
 	}
-	int sndbuf = conn->pcb->snd_buf; // How much we are currently allowed to write to the connection
-	/* PCB send buffer is full,turn off readability notifications for the
-	corresponding PhySocket until nc_sent() is called and confirms that there is
-	now space on the buffer */
+	int err, sz, r, sndbuf = conn->pcb->snd_buf; // How much we are currently allowed to write to the connection
 	if(sndbuf == 0) {
+		/* PCB send buffer is full,turn off readability notifications for the
+		corresponding PhySocket until nc_sent() is called and confirms that there is
+		now space on the buffer */
+		dwr(MSG_DEBUG," handle_write(): sndbuf == 0, LWIP stack is full\n");
 		_phy.setNotifyReadable(conn->sock, false);
 		return;
 	}
+	if(conn->idx <= 0) {
+		dwr(MSG_DEBUG,"handle_write(): conn->idx <= 0, nothing in buffer to write\n");
+		return;
+	}
 	if(!conn->listening)
 		lwipstack->_tcp_output(conn->pcb);
-	if(conn->sock && !conn->listening) {
 
+	if(conn->sock) {
 		r = conn->idx < sndbuf ? conn->idx : sndbuf;
-		//dwr(MSG_DEBUG,"handle_write(): r = %d\n", r);
+		dwr(MSG_DEBUG,"handle_write(): r = %d, sndbuf = %d\n", r, sndbuf);
 		/* Writes data pulled from the client's socket buffer to LWIP. This merely sends the
 		 * data to LWIP to be enqueued and eventually sent to the network. */
 		if(r > 0) {
-			int sz;
 			// NOTE: this assumes that lwipstack->_lock is locked, either
 			// because we are in a callback or have locked it manually.
-			int err = lwipstack->_tcp_write(conn->pcb, &conn->buf, r, TCP_WRITE_FLAG_COPY);
+			err = lwipstack->_tcp_write(conn->pcb, &conn->buf, r, TCP_WRITE_FLAG_COPY);
 			lwipstack->_tcp_output(conn->pcb);
 			if(err != ERR_OK) {
 				dwr(MSG_ERROR," handle_write(): error while writing to PCB, (err = %d)\n", err);
@@ -1313,10 +1347,6 @@ void NetconEthernetTap::handle_write(TcpConnection *conn)
 				return;
 			}
 		}
-		else {
-			dwr(MSG_INFO," handle_write(): LWIP stack full\n");
-			return;
-		}
 	}
 }
 

+ 3 - 1
netcon/NetconEthernetTap.hpp

@@ -137,6 +137,8 @@ private:
 	void phyOnFileDescriptorActivity(PhySocket *sock,void **uptr,bool readable,bool writable);
 
 	TcpConnection *getConnection(PhySocket *sock);
+	void addConnection(TcpConnection *conn);
+	void removeConnection(TcpConnection *conn);
 	void closeConnection(PhySocket *sock);
 
 	ip_addr_t convert_ip(struct sockaddr_in * addr)
@@ -171,7 +173,7 @@ private:
 	Mutex _multicastGroups_m;
 
 	std::vector<InetAddress> _ips;
-	Mutex _ips_m;
+	Mutex _ips_m, _tcpconns_m;
 
 	unsigned int _mtu;
 	volatile bool _enabled;

+ 1 - 2
netcon/RPC.h

@@ -21,8 +21,7 @@
 #define CANARY_IDX			1
 #define STRUCT_IDX			CANARY_IDX+CANARY_SIZE
 
-#define BUF_SZ          	256
-#define PAYLOAD_SZ			223 /* BUF_SZ-IDX_PAYLOAD */
+#define BUF_SZ          	512
 
 #define ERR_OK          	0
 

+ 1 - 1
netcon/common.inc.c

@@ -42,7 +42,7 @@
 #ifndef _COMMON_H
 #define _COMMON_H  1
 
-#define DEBUG_LEVEL     0
+#define DEBUG_LEVEL     3
 
 #define MSG_WARNING     4
 #define MSG_ERROR       1 // Errors

+ 2 - 1
netcon/make-liblwip.mk

@@ -39,7 +39,8 @@ LWIPDIR=../ext/lwip/src
 
 CCDEP=gcc
 CC=gcc
-CFLAGS=-O3 -g -Wall -DIPv4 -fPIC
+CFLAGS=-O3 -g -Wall -DIPv4 -fPIC 
+#-DLWIP_DEBUG
 
 CFLAGS:=$(CFLAGS) \
 	-I$(LWIPDIR)/include -I$(LWIPARCH)/include -I$(LWIPDIR)/include/ipv4 \