Răsfoiți Sursa

- more tcp stuff (not sure it compiles though, so don't use -DUSE_TCP yet)

Andrei Pelinescu-Onciul 23 ani în urmă
părinte
comite
6bc40dea88
4 a modificat fișierele cu 173 adăugiri și 72 ștergeri
  1. 1 1
      ip_addr.h
  2. 43 3
      tcp_conn.h
  3. 45 18
      tcp_main.c
  4. 84 50
      tcp_read.c

+ 1 - 1
ip_addr.h

@@ -241,7 +241,7 @@ static inline int init_su( union sockaddr_union* su,
 
 
 
-/* inits a struct sockaddr_union from a struct hostent, an address index int
+/* inits a struct sockaddr_union from a struct hostent, an address index in
  * the hostent structure and a port no.
  * WARNING: no index overflow  checks!
  * returns 0 if ok, -1 on error (unknown address family) */

+ 43 - 3
tcp_conn.h

@@ -33,6 +33,13 @@
 
 
 #define TCP_BUF_SIZE 65535
+#define TCP_CON_TIMEOUT 60 /* in  seconds */
+#define TCP_CHILD_TIMEOUT 5 /* after 5 seconds, the child "returns" 
+							 the connection to the tcp master process */
+#define TCP_MAIN_SELECT_TIMEOUT 5 /* how often "tcp main" checks for timeout*/
+#define TCP_CHILD_SELECT_TIMEOUT 2 /* the same as above but for children */
+
+
 enum {	TCP_REQ_INIT, TCP_REQ_OK, TCP_READ_ERROR, TCP_REQ_OVERRUN, 
 	 	TCP_REQ_BAD_LEN };
 enum {	H_SKIP, H_LF, H_LFCR,  H_BODY, H_STARTWS,
@@ -44,7 +51,6 @@ enum {	H_SKIP, H_LF, H_LFCR,  H_BODY, H_STARTWS,
 
 struct tcp_req{
 	struct tcp_req* next;
-	int fd;
 	/* sockaddr ? */
 	char buf[TCP_BUF_SIZE]; /* bytes read so far*/
 	char* pos; /* current position in buf */
@@ -59,17 +65,51 @@ struct tcp_req{
 };
 
 
-#define init_tcp_req( r, f) \
+
+
+struct tcp_connection{
+	int s; /*socket, used by "tcp main" */
+	int fd; /* used only by "children" */
+	struct ip_addr ip; /* peer ip */
+	int sock_idx; /* receiving socket index in the tcp_info array */
+	union sockaddr_union su;
+	struct tcp_req req; /* request data */
+	int refcnt;
+	int timeout; /* connection timeout, after this it will be removed*/
+	struct tcp_connection* next;
+	struct tcp_connection* prev;
+};
+
+
+
+
+#define init_tcp_req( r) \
 	do{ \
 		memset( (r), 0, sizeof(struct tcp_req)); \
-		(r)->fd=(f); \
 		(r)->parsed=(r)->pos=(r)->buf; \
 		(r)->error=TCP_REQ_OK;\
 		(r)->state=H_STARTWS; \
 	}while(0)
 
 
+/* add a tcpconn to a list*/
+#define tcpconn_listadd(head, c) \
+	do{ \
+		/* add it at the begining of the list*/ \
+		(c)->next=(head); \
+		(c)->prev=0; \
+		if ((head)) (head)->prev=(c); \
+		(head)=(c); \
+	} while(0)
+
 
+/* remove a tcpconn from a list*/
+#define tcpconn_listrm(head, c) \
+	do{ \
+		if ((head)==(c)) (head)=(c)->next; \
+		if ((c)->next) (c)->next->prev=(c)->prev; \
+		if ((c)->prev) (c)->prev->next=(c)->next; \
+	}while(0)
 
 
 

+ 45 - 18
tcp_main.c

@@ -28,6 +28,11 @@
 
 #ifdef USE_TCP
 
+
+#ifndef SHM_MEM
+#error "shared memory support needed (add -DSHM_MEM to Makefile.defs)"
+#endif
+
 #include <sys/select.h>
 
 #include <sys/time.h>
@@ -43,10 +48,12 @@
 
 #include "ip_addr.h"
 #include "pass_fd.h"
+#include "tcp_conn.h"
 #include "globals.h"
 #include "mem/mem.h"
 
 
+
 #define local_malloc pkg_malloc
 #define local_free   pkg_free
 
@@ -62,14 +69,6 @@ struct tcp_child{
 
 enum { CONN_OK, CONN_ERROR };
 
-struct tcp_connection{
-	int s; /*socket */
-	struct ip_addr ip;
-	union sockaddr_union su;
-	int refcnt;
-	struct tcp_connection* next;
-	struct tcp_connection* prev;
-};
 
 
 
@@ -83,7 +82,7 @@ struct tcp_connection*  tcpconn_add(int sock, union sockaddr_union* su)
 	struct tcp_connection *c;
 	
 
-	c=(struct tcp_connection*)local_malloc(sizeof(struct tcp_connection));
+	c=(struct tcp_connection*)shm_malloc(sizeof(struct tcp_connection));
 	if (c==0){
 		LOG(L_ERR, "ERROR: tcpconn_add: mem. allocation failure\n");
 		goto error;
@@ -92,12 +91,11 @@ struct tcp_connection*  tcpconn_add(int sock, union sockaddr_union* su)
 	c->su=*su;
 	c->refcnt=0;
 	su2ip_addr(&c->ip, su);
+	init_tcp_req(&c->req);
+	c->timeout=get_ticks()+TCP_CON_TIMEOUT;
 
 	/* add it at the begining of the list*/
-	c->next=conn_list;
-	c->prev=0;
-	if (conn_list) conn_list->prev=c;
-	conn_list=c;
+	tcpconn_listadd(conn_list, c);
 	return c;
 	
 error:
@@ -108,11 +106,29 @@ error:
 
 void tcpconn_rm(struct tcp_connection* c)
 {
+	tcpconn_listrm(conn_list, c);
+	shm_free(c);
+}
+
+
+/* very ineficient for now, use hashtable some day - FIXME*/
+void tcpconn_timeout()
+{
+	struct tcp_connection *c, *next;
+	int jiffies;;
 	
-	if (conn_list==c) conn_list=c->next;
-	if (c->next) c->next->prev=c->prev;
-	if (c->prev) c->prev->next=c->next;
-	local_free(c);
+	
+	jiffies=get_ticks();
+	c=conn_list;
+	while(c){
+		next=c->next;
+		if ((c->refcnt==0) && (jiffies<c->timeout)) {
+			DBG("tcpconn_timeout: timeout for %p (%d < %d)\n",
+					c, jiffies, c->timeout);
+			tcpconn_rm(c);
+		}
+		c=next;
+	}
 }
 
 
@@ -195,6 +211,7 @@ void tcp_main_loop()
 	int state;
 	int bytes;
 	socklen_t su_len;
+	struct timeval timeout;
 
 	/*init */
 	maxfd=0;
@@ -219,9 +236,14 @@ void tcp_main_loop()
 	
 	while(1){
 		sel_set=master_set;
-		n=select(maxfd+1, &sel_set, 0 ,0 , 0);
+		timeout->tv_sec=TCP_MAIN_SELECT_TIMEOUT;
+		timeout->tv_usec=0;
+		n=select(maxfd+1, &sel_set, 0 ,0 , &timeout);
 		if (n<0){
+			if (errno==EINTR) continue; /* just a signal */
 			/* errors */
+			LOG(L_ERR, "ERROR: tcp_main_loop: select:(%d) %s\n", errno,
+					strerror(errno));
 		}
 		
 		for (r=0; r<sock_no && n; r++){
@@ -301,6 +323,8 @@ read_again:
 						if (tcpconn->refcnt==0){
 							FD_SET(tcpconn->s, &master_set);
 							if (maxfd<tcpconn->s) maxfd=tcpconn->s;
+							/* update the timeout*/
+							tcpconn->timeout=get_ticks()+TCP_CON_TIMEOUT;
 						}
 					}else{
 						/*error, we should destroy it */
@@ -317,6 +341,9 @@ read_again:
 				}
 			}
 		}
+		
+		/* remove old connections */
+		tcpconn_timeout();
 	
 	}
 }

+ 84 - 50
tcp_read.c

@@ -52,7 +52,7 @@
 /* reads next available bytes
  * return number of bytes read, 0 on EOF or -1 on error,
  * sets also r->error */
-int tcp_read(struct tcp_req *r)
+int tcp_read(struct tcp_req *r, int fd)
 {
 	int bytes_free, bytes_read;
 	
@@ -64,7 +64,7 @@ int tcp_read(struct tcp_req *r)
 		return -1;
 	}
 again:
-	bytes_read=read(r->fd, r->pos, bytes_free);
+	bytes_read=read(fd, r->pos, bytes_free);
 
 	if(bytes_read==-1){
 		if (errno == EWOULDBLOCK || errno == EAGAIN){
@@ -92,7 +92,7 @@ again:
  * when either r->body!=0 or r->state==H_BODY =>
  * all headers have been read. It should be called in a while loop.
  * returns < 0 if error or 0 if EOF */
-int tcp_read_headers(struct tcp_req *r)
+int tcp_read_headers(struct tcp_req *r, int fd)
 {
 	int bytes, remaining;
 	char *p;
@@ -133,7 +133,7 @@ int tcp_read_headers(struct tcp_req *r)
 
 
 	
-	bytes=tcp_read(r);
+	bytes=tcp_read(r, fd);
 	if (bytes<=0) return bytes;
 	p=r->parsed;
 	
@@ -313,52 +313,86 @@ skip:
 
 void tcp_receive_loop(int unix_sock)
 {
-	struct tcp_req req;
+	struct tcp_req* req;
+	struct tcp_connection* list; /* list with connections in use */
+	struct tcp_connection* con;
 	int bytes;
 	long size;
 	int n;
-	long id;
+	int nfds;
 	int s;
 	long state;
 	long response[2];
+	fd_set master_set;
+	fd_set sel_set;
+	int maxfd;
+	struct timeval timeout;
 	
 	
-	
+	/* init */
+	list=con=0;
+	FD_ZERO(&master_set);
+	FD_SET(unix_sock, &master_set);
+	maxfd=unix_sock;
 	
 	/* listen on the unix socket for the fd */
 	for(;;){
-			n=receive_fd(unix_sock, &id, sizeof(id), &s);
-			if (n<0){
-				if (errno == EWOULDBLOCK || errno == EAGAIN || errno == EINTR){
-					continue;
-				}else{
-					fprintf(stderr, "ERROR: tcp_receive_loop: read_fd: %s\n",
+			timeout->tv_sec=TCP_CHILD_SELECT_TIMEOUT;
+			timeout->tv_usec=0;
+			sel_set=master_set;
+			nfds=select(maxfd+1, &sel_set, 0 , 0 , &timeout);
+			if (nfds<0){
+				if (errno==EINTR) continue; /* just a signal */
+				/* errors */
+				LOG(L_ERR, "ERROR: tcp_receive_loop: select:(%d) %s\n", errno,
+					strerror(errno));
+				continue;
+			}
+			if (FD_ISSET(unix_sock, &sel_set)){
+				nfds--;
+				/* a new conn from "main" */
+				n=receive_fd(unix_sock, &con, sizeof(con), &s);
+				if (n<0){
+					if (errno == EWOULDBLOCK || errno == EAGAIN ||
+							errno == EINTR){
+						continue;
+					}else{
+						LOG(L_CRIT,"BUG: tcp_receive_loop: read_fd: %s\n",
 							strerror(errno));
-					abort(); /* big error*/
+						abort(); /* big error*/
+					}
 				}
-			}
-			if (n==0){
-					fprintf(stderr, 
-							"WARNING: tcp_receive_loop: 0 bytes read\n");
+				if (n==0){
+					LOG(L_ERR, "WARNING: tcp_receive_loop: 0 bytes read\n");
 					continue;
-			}
-			fprintf(stderr, "received n=%d id=%ld, fd=%d\n", n, id, s);
-			if (s==-1) {
-					fprintf(stderr, "ERROR: tcp_receive_loop: read_fd:"
+				}
+				DBG("received n=%d con=%ld, fd=%d\n", n, con, s);
+				if (s==-1) {
+					LOG(L_ERR, "ERROR: tcp_receive_loop: read_fd:"
 									"no fd read\n");
 					state=-1;
 					goto end_req; /* ?*/
+				}
+				if (con==0){
+					LOG(L_ERR, "ERROR: tcp_receive_loop: null pointer\n");
+					state=-1;
+					goto end_req;
+				}
+				con->fd=s;
+				FD_SET(s, &master_set);
+				if (maxfd<s) maxfd=s;
+				tcpconn_listadd(list, con);
 			}
-		
-		init_tcp_req(&req, s);
-		
-		
-	again:
-		while(req.complete==0 && req.error==TCP_REQ_OK){
-			bytes=tcp_read_headers(&req);
-			/* if timeout state=0; goto end__req; */
+			for (con=list; con && nfds ; con=con->next){
+				if (FD_ISSET(con->fd, &sel_set)){
+					nfds--;
+					req=&con->req;
+again:
+		while(req->complete==0 && req->error==TCP_REQ_OK){
+			bytes=tcp_read_headers(req, s);
+						/* if timeout state=0; goto end__req; */
 			fprintf(stderr, "read= %d bytes, parsed=%d, state=%d, error=%d\n",
-					bytes, req.parsed-req.buf, req.state, req.error );
+					bytes, req->parsed-req->buf, req->state, req->error );
 			if (bytes==-1){
 				fprintf(stderr, "ERROR!\n");
 				state=-1;
@@ -371,19 +405,19 @@ void tcp_receive_loop(int unix_sock)
 			}
 
 		}
-		if (req.error!=TCP_REQ_OK){
+		if (req->error!=TCP_REQ_OK){
 			fprintf(stderr, "bad request, state=%d, error=%d\n",
-					req.state, req.error);
+					req->state, req->error);
 			state=-1;
 			goto end_req;
 		}
 		fprintf(stderr, "end of header part\n");
-		fprintf(stderr, "headers:\n%.*s.\n",req.body-req.buf, req.buf);
-		if (req.has_content_len){
-			fprintf(stderr, "content-length= %d\n", req.content_len);
-			fprintf(stderr, "body:\n%.*s\n", req.content_len, req.body);
+		fprintf(stderr, "headers:\n%.*s.\n",req->body-req->buf, req->buf);
+		if (req->has_content_len){
+			fprintf(stderr, "content-length= %d\n", req->content_len);
+			fprintf(stderr, "body:\n%.*s\n", req->content_len, req->body);
 		}else{
-			req.error=TCP_REQ_BAD_LEN;
+			req->error=TCP_REQ_BAD_LEN;
 			fprintf(stderr, "content length not present or unparsable\n");
 			state=-1;
 			goto end_req;
@@ -393,21 +427,21 @@ void tcp_receive_loop(int unix_sock)
 		state=0;
 		/* just for debugging use sendipv4 as receiving socket */
 		DBG("calling receive_msg(%p, %d, %p)\n",
-				req.buf, (int)(req.parsed-req.buf), &sendipv4->su);
+				req->buf, (int)(req->parsed-req->buf), &sendipv4->su);
 		bind_address=sendipv4;
-		receive_msg(req.buf, req.parsed-req.buf, &sendipv4->su);
+		receive_msg(req->buf, req->parsed-req->buf, &sendipv4->su);
 
 		/* prepare for next request */
-		size=req.pos-req.body;
-		if (size) memmove(req.buf, req.body, size);
+		size=req->pos-req->body;
+		if (size) memmove(req->buf, req->body, size);
 		fprintf(stderr, "\npreparing for new request, kept %ld bytes\n", size);
-		req.pos=req.buf+size;
-		req.parsed=req.buf;
-		req.body=0;
-		req.error=TCP_REQ_OK;
-		req.state=H_STARTWS;
-		req.complete=req.content_len=req.has_content_len=0;
-		req.bytes_to_go=0;
+		req->pos=req->buf+size;
+		req->parsed=req->buf;
+		req->body=0;
+		req->error=TCP_REQ_OK;
+		req->state=H_STARTWS;
+		req->complete=req->content_len=req->has_content_len=0;
+		req->bytes_to_go=0;
 	
 		/* process last req. */
 		
@@ -418,7 +452,7 @@ void tcp_receive_loop(int unix_sock)
 		/* release req & signal the parent */
 		if (s!=-1) close(s);
 		/* errno==EINTR, EWOULDBLOCK a.s.o todo */
-		response[0]=id;
+		response[0]=con;
 		response[1]=state;
 		write(unix_sock, response, sizeof(response));