Browse Source

- even more tcp receive bs
- sip msg.buff & orig are no longer 0 terminated (to avoid an extra copy in the tcp case)

Andrei Pelinescu-Onciul 23 years ago
parent
commit
6ee62314b8
10 changed files with 209 additions and 119 deletions
  1. 3 1
      TODO
  2. 2 1
      forward.c
  3. 8 2
      main.c
  4. 1 1
      parser/msg_parser.c
  5. 8 1
      pt.h
  6. 3 2
      receive.c
  7. 7 3
      tcp_conn.h
  8. 44 23
      tcp_main.c
  9. 132 85
      tcp_read.c
  10. 1 0
      test/stateless.cfg

+ 3 - 1
TODO

@@ -16,6 +16,8 @@ x (different way) add request header bitmap field for the modules
 
 
 High priority:
+- parse_uri should not copy anymore the uri members (and it should not 0
+ terminate them anylonger).
 x fix/replace T_REF/T_UNREF
 x review all the tm locking
 x if () {} else {}
@@ -60,5 +62,5 @@ x jku: branch hash computation over canonical values
 - jku: try CRC as opposed to MD5
 
 
-- freopen stdin, stdou to /dev/null
+x freopen stdin, stdout, stderr to /dev/null
 

+ 2 - 1
forward.c

@@ -224,9 +224,10 @@ int forward_request( struct sip_msg* msg, struct proxy_l * p)
 	}
 	/* sent requests stats */
 	else STATS_TX_REQUEST(  msg->first_line.u.request.method_value );
+	
 	pkg_free(buf);
 	free(to);
-	/* received_buf & line_buf will be freed in receiv_msg by free_lump_list*/
+	/* received_buf & line_buf will be freed in receive_msg by free_lump_list*/
 	return 0;
 
 error1:

+ 8 - 2
main.c

@@ -633,6 +633,8 @@ int main_loop()
 			 * so we open all first*/
 		}
 #ifdef USE_TCP
+			/* start tcp receivers */
+		if (tcp_init_children()<0) goto error;
 			/* start tcp master proc */
 		process_no++;
 		if ((pid=fork())<0){
@@ -641,7 +643,7 @@ int main_loop()
 		}else if (pid==0){
 			/* child */
 			/* is_main=0; */
-			tcp_main();
+			tcp_main_loop();
 		}else{
 			pt[process_no].pid=pid;
 			strncpy(pt[process_no].desc, "tcp main process", MAX_PT_DESC );
@@ -686,7 +688,11 @@ int main_loop()
 		goto error;
 	}
 
-	if (timer_list){
+#ifndef USE_TCP
+	/* if we are using tcp we always need the timer */
+	if (timer_list)
+#endif
+	{
 		/* fork again for the attendant process*/
 		process_no++;
 		if ((pid=fork())<0){

+ 1 - 1
parser/msg_parser.c

@@ -485,7 +485,7 @@ int parse_msg(char* buf, unsigned int len, struct sip_msg* msg)
 	
 error:
 	/* more debugging, msg->orig is/should be null terminated*/
-	LOG(L_ERR, "ERROR: parse_msg: message=<%s>\n", msg->orig);
+	LOG(L_ERR, "ERROR: parse_msg: message=<%.*s>\n", (int)msg->len, msg->orig);
 	return -1;
 }
 

+ 8 - 1
pt.h

@@ -60,7 +60,14 @@ inline static int process_count()
 		/* timer process */
 		+ (timer_list ? 1 : 0 )
 		/* fifo server */
-		+((fifo==NULL || strlen(fifo)==0) ? 0 : 1 );
+		+((fifo==NULL || strlen(fifo)==0) ? 0 : 1 )
+#ifdef USE_TCP
+		+ 1/* tcp main */ + tcp_children_no + 
+		(timer_list ? 0: 1) /* add the timer proc. if not already taken
+							   into account */
+#endif
+		
+		;
 }
 
 

+ 3 - 2
receive.c

@@ -75,7 +75,7 @@ int receive_msg(char* buf, unsigned int len, union sockaddr_union* src_su)
 	msg->len=len;
 	/* zero termination (termination of orig message bellow not that
 	   useful as most of the work is done with scrath-pad; -jiri  */
-	buf[len]=0;
+	/* buf[len]=0; */ /* WARNING: zero term removed! */
 	su2ip_addr(&msg->src_ip, src_su);
 	msg->dst_ip=bind_address->address; /* won't work if listening on 0.0.0.0 */
 	msg->id=msg_no;
@@ -86,7 +86,8 @@ int receive_msg(char* buf, unsigned int len, union sockaddr_union* src_su)
 		goto error01;
 	}
 	memcpy(msg->orig, buf, len);
-	msg->orig[len]=0; /* null terminate it,good for using str* functions
+	/* WARNING: zero term removed! */
+	/* msg->orig[len]=0; */ /* null terminate it,good for using str* functions
 						 on it*/
 	
 	if (parse_msg(buf,len, msg)!=0){

+ 7 - 3
tcp_conn.h

@@ -30,6 +30,7 @@
 #ifndef _tcp_conn_h
 #define _tcp_conn_h
 
+#include "ip_addr.h"
 
 
 #define TCP_BUF_SIZE 65535
@@ -76,8 +77,10 @@ struct tcp_connection{
 	struct tcp_req req; /* request data */
 	int refcnt;
 	int timeout; /* connection timeout, after this it will be removed*/
-	struct tcp_connection* next;
+	struct tcp_connection* next; /* next, prev in hash table, used by "main" */
 	struct tcp_connection* prev;
+	struct tcp_connection* c_next; /* child next prev (use locally) */
+	struct tcp_connection* c_prev;
 };
 
 
@@ -93,7 +96,8 @@ struct tcp_connection{
 
 
 /* add a tcpconn to a list*/
-#define tcpconn_listadd(head, c) \
+/* list head, new element, next member, prev member */
+#define tcpconn_listadd(head, c, next, prev) \
 	do{ \
 		/* add it at the begining of the list*/ \
 		(c)->next=(head); \
@@ -104,7 +108,7 @@ struct tcp_connection{
 
 
 /* remove a tcpconn from a list*/
-#define tcpconn_listrm(head, c) \
+#define tcpconn_listrm(head, c, next, prev) \
 	do{ \
 		if ((head)==(c)) (head)=(c)->next; \
 		if ((c)->next) (c)->next->prev=(c)->prev; \

+ 44 - 23
tcp_main.c

@@ -50,7 +50,10 @@
 #include "pass_fd.h"
 #include "tcp_conn.h"
 #include "globals.h"
+#include "pt.h"
 #include "mem/mem.h"
+#include "mem/shm_mem.h"
+#include "timer.h"
 
 
 
@@ -77,7 +80,7 @@ struct tcp_child tcp_children[MAX_TCP_CHILDREN];
 
 
 
-struct tcp_connection*  tcpconn_add(int sock, union sockaddr_union* su)
+struct tcp_connection*  tcpconn_add(int sock, union sockaddr_union* su, int i)
 {
 	struct tcp_connection *c;
 	
@@ -89,13 +92,14 @@ struct tcp_connection*  tcpconn_add(int sock, union sockaddr_union* su)
 	}
 	c->s=sock;
 	c->su=*su;
+	c->sock_idx=i;
 	c->refcnt=0;
 	su2ip_addr(&c->ip, su);
 	init_tcp_req(&c->req);
 	c->timeout=get_ticks()+TCP_CON_TIMEOUT;
 
 	/* add it at the begining of the list*/
-	tcpconn_listadd(conn_list, c);
+	tcpconn_listadd(conn_list, c, next, prev);
 	return c;
 	
 error:
@@ -106,7 +110,7 @@ error:
 
 void tcpconn_rm(struct tcp_connection* c)
 {
-	tcpconn_listrm(conn_list, c);
+	tcpconn_listrm(conn_list, c, next, prev);
 	shm_free(c);
 }
 
@@ -115,16 +119,16 @@ void tcpconn_rm(struct tcp_connection* c)
 void tcpconn_timeout()
 {
 	struct tcp_connection *c, *next;
-	int jiffies;;
+	int ticks;;
 	
 	
-	jiffies=get_ticks();
+	ticks=get_ticks();
 	c=conn_list;
 	while(c){
 		next=c->next;
-		if ((c->refcnt==0) && (jiffies<c->timeout)) {
-			DBG("tcpconn_timeout: timeout for %p (%d < %d)\n",
-					c, jiffies, c->timeout);
+		if ((c->refcnt==0) && (ticks>c->timeout)) {
+			DBG("tcpconn_timeout: timeout for %p (%d > %d)\n",
+					c, ticks, c->timeout);
 			tcpconn_rm(c);
 		}
 		c=next;
@@ -179,20 +183,34 @@ error:
 static int send2child(struct tcp_connection* tcpconn)
 {
 	int i;
+	int min_busy;
+	int idx;
 	
+	min_busy=tcp_children[0].busy;
+	idx=0;
 	for (i=0; i<tcp_children_no; i++){
 		if (!tcp_children[i].busy){
-			tcp_children[i].busy=1;
-			tcp_children[i].n_reqs++;
-			tcpconn->refcnt++;
-			DBG("send2child: to child %d, %ld\n", i, (long)tcpconn);
-			send_fd(tcp_children[i].s, &tcpconn, sizeof(tcpconn), tcpconn->s);
+			idx=i;
+			min_busy=0;
+			break;
 			return 0;
+		}else if (min_busy>tcp_children[i].busy){
+			min_busy=tcp_children[i].busy;
+			idx=i;
 		}
 	}
-	if (i==tcp_children_no){
-		return -1;
+	
+	tcp_children[idx].busy++;
+	tcp_children[idx].n_reqs++;
+	tcpconn->refcnt++;
+	if (min_busy){
+		LOG(L_WARN, "WARNING: send2child:no free tcp receiver, "
+				" connection passed to the least busy one (%d)\n",
+				min_busy);
 	}
+	DBG("send2child: to child %d, %ld\n", idx, (long)tcpconn);
+	send_fd(tcp_children[idx].s, &tcpconn, sizeof(tcpconn), tcpconn->s);
+	
 	return 0; /* just to fix a warning*/
 }
 
@@ -236,8 +254,8 @@ void tcp_main_loop()
 	
 	while(1){
 		sel_set=master_set;
-		timeout->tv_sec=TCP_MAIN_SELECT_TIMEOUT;
-		timeout->tv_usec=0;
+		timeout.tv_sec=TCP_MAIN_SELECT_TIMEOUT;
+		timeout.tv_usec=0;
 		n=select(maxfd+1, &sel_set, 0 ,0 , &timeout);
 		if (n<0){
 			if (errno==EINTR) continue; /* just a signal */
@@ -260,7 +278,7 @@ void tcp_main_loop()
 				}
 				
 				/* add socket to list */
-				tcpconn=tcpconn_add(new_sock, &su);
+				tcpconn=tcpconn_add(new_sock, &su, r);
 				DBG("tcp_main_loop: new connection: %p %d\n",
 						tcpconn, tcpconn->s);
 				/* pass it to a child */
@@ -298,10 +316,11 @@ void tcp_main_loop()
 read_again:
 				bytes=read(tcp_children[r].s, response, sizeof(response));
 				if (bytes==0){
-					/* EOF -> bad, chidl has died */
+					/* EOF -> bad, child has died */
 					LOG(L_CRIT, "BUG: tcp_main_loop: dead child %d\n", r);
 					/* terminating everybody */
-					exit(-1);
+					FD_CLR(tcp_children[r].s, &master_set);
+					/*exit(-1)*/;
 				}else if (bytes<0){
 					if (errno==EINTR) goto read_again;
 					else{
@@ -351,7 +370,7 @@ read_again:
 
 
 /* starts the tcp processes */
-int tcp_main()
+int tcp_init_children()
 {
 	int r;
 	int sockfd[2];
@@ -373,6 +392,7 @@ int tcp_main()
 			goto error;
 		}
 		
+		process_no++;
 		pid=fork();
 		if (pid<0){
 			LOG(L_ERR, "ERROR: tcp_main: fork failed: %s\n",
@@ -385,14 +405,15 @@ int tcp_main()
 			tcp_children[r].s=sockfd[0];
 			tcp_children[r].busy=0;
 			tcp_children[r].n_reqs=0;
+			pt[process_no].pid=pid;
+			strncpy(pt[process_no].desc, "tcp receiver", MAX_PT_DESC);
 		}else{
 			/* child */
 			close(sockfd[0]);
 			tcp_receive_loop(sockfd[1]);
 		}
 	}
-	
-	tcp_main_loop();
+	return 0;
 error:
 	return -1;
 }

+ 132 - 85
tcp_read.c

@@ -45,6 +45,7 @@
 #include "pass_fd.h"
 #include "globals.h"
 #include "receive.h"
+#include "timer.h"
 
 
 #define q_memchr memchr
@@ -59,7 +60,7 @@ int tcp_read(struct tcp_req *r, int fd)
 	bytes_free=TCP_BUF_SIZE- (int)(r->pos - r->buf);
 	
 	if (bytes_free==0){
-		fprintf(stderr, "buffer overrun, dropping\n");
+		LOG(L_ERR, "ERROR: tcp_read: buffer overrun, dropping\n");
 		r->error=TCP_REQ_OVERRUN;
 		return -1;
 	}
@@ -71,7 +72,7 @@ again:
 			return 0; /* nothing has been read */
 		}else if (errno == EINTR) goto again;
 		else{
-			fprintf(stderr, "error reading: %s\n", strerror(errno));
+			LOG(L_ERR, "ERROR: tcp_read: error reading: %s\n",strerror(errno));
 			r->error=TCP_READ_ERROR;
 			return -1;
 		}
@@ -299,7 +300,8 @@ int tcp_read_headers(struct tcp_req *r, int fd)
 				break;
 			
 			default:
-				fprintf(stderr, "BUG: unexpected state %d\n", r->state);
+				LOG(L_CRIT, "BUG: tcp_read_headers: unexpected state %d\n",
+						r->state);
 				abort();
 		}
 	}
@@ -310,23 +312,113 @@ skip:
 
 
 
+int tcp_read_req(struct tcp_connection* con)
+{
+	int bytes;
+	int state;
+	long size;
+	struct tcp_req* req;
+	int s;
+		
+		state=0;
+		s=con->fd;
+		req=&con->req;
+		if(req->complete==0 && req->error==TCP_REQ_OK){
+			bytes=tcp_read_headers(req, s);
+						/* if timeout state=0; goto end__req; */
+			DBG("read= %d bytes, parsed=%d, state=%d, error=%d\n",
+					bytes, req->parsed-req->buf, req->state, req->error );
+			if (bytes==-1){
+				LOG(L_ERR, "ERROR: tcp_read_req: error reading \n");
+				state=-1;
+				goto end_req;
+			}
+			if (bytes==0){
+				DBG( "tcp_read_req: EOF\n");
+				state=-1;
+				goto end_req;
+			}
+		
+		}
+		if (req->error!=TCP_REQ_OK){
+			LOG(L_ERR,"ERROR: tcp_read_req: bad request, state=%d, error=%d\n",
+					req->state, req->error);
+			state=-1;
+			goto end_req;
+		}
+		if (req->complete){
+			DBG("tcp_read_req: end of header part\n");
+			DBG("tcp_read_req: headers:\n%.*s.\n",
+					req->body-req->buf, req->buf);
+			if (req->has_content_len){
+				DBG("tcp_read_req: content-length= %d\n", req->content_len);
+				DBG("tcp_read_req: body:\n%.*s\n", req->content_len,req->body);
+			}else{
+				req->error=TCP_REQ_BAD_LEN;
+				LOG(L_ERR, "ERROR: tcp_read_req: content length not present or"
+						" unparsable\n");
+				state=-1;
+				goto end_req;
+			}
+			/* if we are here everything is nice and ok*/
+			state=0;
+			/* just for debugging use sendipv4 as receiving socket */
+			DBG("calling receive_msg(%p, %d, )\n",
+					req->buf, (int)(req->parsed-req->buf));
+			bind_address=sendipv4; /*&tcp_info[con->sock_idx];*/
+			receive_msg(req->buf, req->parsed-req->buf, &con->su);
+			
+			/* prepare for next request */
+			size=req->pos-req->body;
+			if (size) memmove(req->buf, req->body, size);
+			DBG("tcp_read_req: preparing for new request, kept %ld bytes\n",
+					size);
+			req->pos=req->buf+size;
+			req->parsed=req->buf;
+			req->body=0;
+			req->error=TCP_REQ_OK;
+			req->state=H_STARTWS;
+			req->complete=req->content_len=req->has_content_len=0;
+			req->bytes_to_go=0;
+			
+		}
+		
+		
+	end_req:
+		return state;
+}
+
+
+
+void release_tcpconn(struct tcp_connection* c, long state, int unix_sock)
+{
+	long response[2];
+	
+		DBG( "releasing con %p, state %ld\n", c, state );
+		/* release req & signal the parent */
+		if (c->fd!=-1) close(c->fd);
+		/* errno==EINTR, EWOULDBLOCK a.s.o todo */
+		response[0]=(long)c;
+		response[1]=state;
+		write(unix_sock, response, sizeof(response));
+}
+
+
 
 void tcp_receive_loop(int unix_sock)
 {
-	struct tcp_req* req;
 	struct tcp_connection* list; /* list with connections in use */
 	struct tcp_connection* con;
-	int bytes;
-	long size;
+	struct tcp_connection* c_next;
 	int n;
 	int nfds;
 	int s;
 	long state;
-	long response[2];
 	fd_set master_set;
 	fd_set sel_set;
 	int maxfd;
 	struct timeval timeout;
+	int ticks;
 	
 	
 	/* init */
@@ -337,8 +429,8 @@ void tcp_receive_loop(int unix_sock)
 	
 	/* listen on the unix socket for the fd */
 	for(;;){
-			timeout->tv_sec=TCP_CHILD_SELECT_TIMEOUT;
-			timeout->tv_usec=0;
+			timeout.tv_sec=TCP_CHILD_SELECT_TIMEOUT;
+			timeout.tv_usec=0;
 			sel_set=master_set;
 			nfds=select(maxfd+1, &sel_set, 0 , 0 , &timeout);
 			if (nfds<0){
@@ -366,97 +458,52 @@ void tcp_receive_loop(int unix_sock)
 					LOG(L_ERR, "WARNING: tcp_receive_loop: 0 bytes read\n");
 					continue;
 				}
-				DBG("received n=%d con=%ld, fd=%d\n", n, con, s);
+				con->fd=s;
+				DBG("received n=%d con=%p, fd=%d\n", n, con, s);
 				if (s==-1) {
 					LOG(L_ERR, "ERROR: tcp_receive_loop: read_fd:"
 									"no fd read\n");
 					state=-1;
-					goto end_req; /* ?*/
+					release_tcpconn(con, state, unix_sock);
 				}
 				if (con==0){
 					LOG(L_ERR, "ERROR: tcp_receive_loop: null pointer\n");
 					state=-1;
-					goto end_req;
+					release_tcpconn(con, state, unix_sock);
 				}
-				con->fd=s;
+				con->timeout=get_ticks()+TCP_CHILD_TIMEOUT;
 				FD_SET(s, &master_set);
 				if (maxfd<s) maxfd=s;
-				tcpconn_listadd(list, con);
+				tcpconn_listadd(list, con, c_next, c_prev);
 			}
-			for (con=list; con && nfds ; con=con->next){
-				if (FD_ISSET(con->fd, &sel_set)){
+			ticks=get_ticks();
+			for (con=list; con ; con=c_next){
+				c_next=con->c_next; /* safe for removing*/
+				if (nfds && FD_ISSET(con->fd, &sel_set)){
 					nfds--;
-					req=&con->req;
-again:
-		while(req->complete==0 && req->error==TCP_REQ_OK){
-			bytes=tcp_read_headers(req, s);
-						/* if timeout state=0; goto end__req; */
-			fprintf(stderr, "read= %d bytes, parsed=%d, state=%d, error=%d\n",
-					bytes, req->parsed-req->buf, req->state, req->error );
-			if (bytes==-1){
-				fprintf(stderr, "ERROR!\n");
-				state=-1;
-				goto end_req;
-			}
-			if (bytes==0){
-				fprintf(stderr, "EOF!\n");
-				state=-1;
-				goto end_req;
+					state=tcp_read_req(con);
+					if (state==-1){
+						FD_CLR(con->fd, &master_set);
+						tcpconn_listrm(list, con, c_next, c_prev);
+						release_tcpconn(con, state, unix_sock);
+					}else{
+						/* update timeout */
+						con->timeout=ticks+TCP_CHILD_TIMEOUT;
+					}
+				}else{
+					/* timeout */
+					if (con->timeout<=ticks){
+						/* expired, return to "tcp main" */
+						DBG("tcp_receive_loop: %p expired (%d, %d)\n",
+								con, con->timeout, ticks);
+						state=0;
+						FD_CLR(con->fd, &master_set);
+						tcpconn_listrm(list, con, c_next, c_prev);
+						release_tcpconn(con, state, unix_sock);
+					}
+				}
 			}
-
-		}
-		if (req->error!=TCP_REQ_OK){
-			fprintf(stderr, "bad request, state=%d, error=%d\n",
-					req->state, req->error);
-			state=-1;
-			goto end_req;
-		}
-		fprintf(stderr, "end of header part\n");
-		fprintf(stderr, "headers:\n%.*s.\n",req->body-req->buf, req->buf);
-		if (req->has_content_len){
-			fprintf(stderr, "content-length= %d\n", req->content_len);
-			fprintf(stderr, "body:\n%.*s\n", req->content_len, req->body);
-		}else{
-			req->error=TCP_REQ_BAD_LEN;
-			fprintf(stderr, "content length not present or unparsable\n");
-			state=-1;
-			goto end_req;
-		}
-
-		/* if we are here everything is nice and ok*/
-		state=0;
-		/* just for debugging use sendipv4 as receiving socket */
-		DBG("calling receive_msg(%p, %d, %p)\n",
-				req->buf, (int)(req->parsed-req->buf), &sendipv4->su);
-		bind_address=sendipv4;
-		receive_msg(req->buf, req->parsed-req->buf, &sendipv4->su);
-
-		/* prepare for next request */
-		size=req->pos-req->body;
-		if (size) memmove(req->buf, req->body, size);
-		fprintf(stderr, "\npreparing for new request, kept %ld bytes\n", size);
-		req->pos=req->buf+size;
-		req->parsed=req->buf;
-		req->body=0;
-		req->error=TCP_REQ_OK;
-		req->state=H_STARTWS;
-		req->complete=req->content_len=req->has_content_len=0;
-		req->bytes_to_go=0;
-	
-		/* process last req. */
 		
-		goto again;
-		
-	end_req:
-			fprintf(stderr, "end req\n");
-		/* release req & signal the parent */
-		if (s!=-1) close(s);
-		/* errno==EINTR, EWOULDBLOCK a.s.o todo */
-		response[0]=con;
-		response[1]=state;
-		write(unix_sock, response, sizeof(response));
-		
-	
 	}
 }
 

+ 1 - 0
test/stateless.cfg

@@ -25,6 +25,7 @@ rev_dns=off      # (cmd. line: -R)
 # for more info: sip_router -h
 alias=iptel.org
 alias="foo.bar"
+fifo="/tmp/ser_fifo"
 
 #modules