Browse Source

- more tcp stuff and a lot of merging w/ latest cvs
- content-length is appended automatically to messages that cross from udp to
tcp
- tcp2udp and udp2tcp now work under heavy stress
(e.g.: throttle 200, 10 ser processes on dual cpu)
- tcp performance still sucks, some things like disabling Nagle are still not in yet (for better debugging)

Andrei Pelinescu-Onciul 22 years ago
parent
commit
ab130758c9
14 changed files with 275 additions and 65 deletions
  1. 1 1
      Makefile.defs
  2. 1 0
      NEWS
  3. 2 1
      TODO
  4. 1 1
      data_lump.c
  5. 4 3
      forward.c
  6. 0 14
      hash_func.c
  7. 0 1
      hash_func.h
  8. 3 6
      main.c
  9. 133 4
      msg_translator.c
  10. 1 1
      receive.c
  11. 6 2
      tcp_conn.h
  12. 82 19
      tcp_main.c
  13. 39 11
      tcp_read.c
  14. 2 1
      tcp_server.h

+ 1 - 1
Makefile.defs

@@ -8,7 +8,7 @@
 VERSION = 0
 PATCHLEVEL = 8
 SUBLEVEL =   11
-EXTRAVERSION = pre5-tcp1-locking
+EXTRAVERSION = pre6-tcp2
 
 RELEASE=$(VERSION).$(PATCHLEVEL).$(SUBLEVEL)$(EXTRAVERSION)
 OS = $(shell uname -s | sed -e s/SunOS/solaris/ | tr "[A-Z]" "[a-z]")

+ 1 - 0
NEWS

@@ -38,6 +38,7 @@ New features
 - powerpc fast locking support
 - netbsd support
 - 64 bits arch. support (e.g. netbsd/sparc64).
+- tcp2udp and udp2tcp stateless forwarding (see forward_udp & forward_tcp)
 
 Changes to use of ser scripts
 =============================

+ 2 - 1
TODO

@@ -10,7 +10,7 @@ x fix 0 parameter module f. call
 x better Via parsing (handle ' ' in uri, eg: foo.bar : 1234 ; received=) and
  ipv6 addresses ([fec0:aa::01]).
 - fix format string vulnerability in log()
-- fix alignement access problems (warning on Sun)
+- fix alignment access problems (warning on Sun)
 x (different way) add request header bitmap field for the modules
 - introduce variables & function in the script language (cfg. file)
 
@@ -69,4 +69,5 @@ x freopen stdin, stdout, stderr to /dev/null
 x generic locking lib
 - convert tm to use new locking lib
 - tcp disable nagle & other socket stuff (close()?)
+- force add rport (setflag(rport)???)
 

+ 1 - 1
data_lump.c

@@ -43,7 +43,7 @@
 #include <dmalloc.h>
 #endif
 
-/* WARNING: all lump add/insert operations excpect a pkg_malloc'ed char* 
+/* WARNING: all lump add/insert operations expect a pkg_malloc'ed char* 
  * pointer the will be DEALLOCATED when the sip_msg is destroyed! */
 
 enum lump_dir { LD_NEXT, LD_BEFORE, LD_AFTER };

+ 4 - 3
forward.c

@@ -312,7 +312,7 @@ int forward_request( struct sip_msg* msg, struct proxy_l * p, int proto)
 		goto error1;
 	}
 	 /* send it! */
-	DBG("Sending:\n%s.\n", buf);
+	DBG("Sending:\n%.*s.\n", (int)len, buf);
 	DBG("orig. len=%d, new_len=%d, proto=%d\n", msg->len, len, proto );
 	
 	
@@ -510,8 +510,9 @@ int forward_reply(struct sip_msg* msg)
 		STATS_TX_RESPONSE(  (msg->first_line.u.reply.statuscode/100) );
 #endif
 
-	DBG(" reply forwarded to %s:%d\n", msg->via2->host.s,
-		(unsigned short) msg->via2->port);
+	DBG(" reply forwarded to %.*s:%d\n", 
+			msg->via2->host.len, msg->via2->host.s,
+			(unsigned short) msg->via2->port);
 
 	pkg_free(new_buf);
 	free(to);

+ 0 - 14
hash_func.c

@@ -154,25 +154,11 @@ void hashtest_cycle( int hits[TABLE_ENTRIES+5], char *ip )
 				}
 }
 
-int init_hash()
-{
-	if (TABLE_ENTRIES != (1<<10)) {
-		LOG(L_WARN, "WARNING: hash function optimized for %d entries\n",
-			1<<10);
-		LOG(L_WARN, "WARNING: use of %d entries may lead "
-			"to unflat distribution\n", TABLE_ENTRIES );
-	} else {
-		DBG("DEBUG: hash function initialized with optimum table size\n");
-	}
-	return 1;
-}
-
 void hashtest()
 {
 	int hits[TABLE_ENTRIES+5];
 	int i;
 
-	init_hash();	
 	memset( hits, 0, sizeof hits );
 	hashtest_cycle( hits, "192.168.99.100" );
 	hashtest_cycle( hits, "172.168.99.100" );

+ 0 - 1
hash_func.h

@@ -39,7 +39,6 @@
 int new_hash( str  call_id, str cseq_nr );
 int new_hash2( str  call_id, str cseq_nr );
 
-int init_hash();
 
 #define hash( cid, cseq) new_hash2( cid, cseq )
 

+ 3 - 6
main.c

@@ -880,8 +880,10 @@ static void sig_usr(int signo)
 	}else{
 		/* process the important signals */
 		switch(signo){
-			case SIGINT:
 			case SIGPIPE:
+					LOG(L_INFO, "INFO: signal %d received\n", signo);
+				break;
+			case SIGINT:
 			case SIGTERM:
 					LOG(L_INFO, "INFO: signal %d received\n", signo);
 					/* print memory stats for non-main too */
@@ -1280,11 +1282,6 @@ try_again:
 	DBG("test random number %u\n", rand());
 	
 	
-	/* init hash fucntion */
-	if (init_hash()<0) {
-		LOG(L_ERR, "ERROR: init_hash failed\n");
-		goto error;
-	}
 
 	/*init mallocs (before parsing cfg !)*/
 	if (init_mallocs()==-1)

+ 133 - 4
msg_translator.c

@@ -342,6 +342,44 @@ char* id_builder(struct sip_msg* msg, unsigned int *id_len)
 
 
 
+char* clen_builder(struct sip_msg* msg, unsigned int *clen_len)
+{
+	char* buf;
+	int len;
+	int value;
+	char* value_s;
+	int value_len;
+	char* body;
+	
+	
+	body=get_body(msg);
+	if (body==0){
+		ser_error=E_BAD_REQ;
+		LOG(L_ERR, "ERROR: clen_builder: no message body found"
+					" (missing crlf?)");
+		return 0;
+	}
+	value=msg->len-(int)(body-msg->buf);
+	value_s=int2str(value, &value_len);
+	DBG("clen_builder: content-length: %d (%s)\n", value, value_s);
+		
+	len=CONTENT_LENGTH_LEN+value_len+CRLF_LEN;
+	buf=pkg_malloc(sizeof(char)*(len+1));
+	if (buf==0){
+		ser_error=E_OUT_OF_MEM;
+		LOG(L_ERR, "ERROR: clen_builder: out of memory\n");
+		return 0;
+	}
+	memcpy(buf, CONTENT_LENGTH, CONTENT_LENGTH_LEN);
+	memcpy(buf+CONTENT_LENGTH_LEN, value_s, value_len);
+	memcpy(buf+CONTENT_LENGTH_LEN+value_len, CRLF, CRLF_LEN);
+	buf[len+1]=0; /* null terminate it */
+	*clen_len=len;
+	return buf;
+}
+
+
+
 /* computes the "unpacked" len of a lump list,
    code moved from build_req_from_req */
 static inline int lumps_len(struct lump* l)
@@ -548,10 +586,14 @@ char * build_req_buf_from_sip_req( struct sip_msg* msg,
 #ifdef USE_TCP
 	char* id_buf;
 	int id_len;
+	char* clen_buf;
+	int clen_len;
 	
 	
 	id_buf=0;
 	id_len=0;
+	clen_buf=0;
+	clen_len=0;
 #endif
 	extra_params.len=0;
 	extra_params.s=0;
@@ -580,6 +622,24 @@ char * build_req_buf_from_sip_req( struct sip_msg* msg,
 		extra_params.s=id_buf;
 		extra_params.len=id_len;
 	}
+	/* if sending proto == tcp, check if Content-Length needs to be added*/
+	if (proto==PROTO_TCP){
+		/* first of all parse content-length */
+		if (parse_headers(msg, HDR_CONTENTLENGTH, 0)==-1){
+			LOG(L_ERR, "build_req_buf_from_sip_req:"
+							" error parsing content-length\n");
+			goto skip_clen;
+		}
+		if (msg->content_length==0){
+			/* we need to add it */
+			if ((clen_buf=clen_builder(msg, &clen_len))==0){
+				LOG(L_ERR, "build_req_buf_from_sip_req:" 
+								" clen_builder failed\n");
+				goto skip_clen;
+			}
+		}
+	}
+skip_clen:
 #endif
 	branch.s=msg->add_to_branch_s;
 	branch.len=msg->add_to_branch_len;
@@ -652,6 +712,19 @@ char * build_req_buf_from_sip_req( struct sip_msg* msg,
 		if (insert_new_lump_after(anchor, rport_buf, rport_len, HDR_VIA)==0)
 			goto error03; /* free rport_buf*/
 	}
+#ifdef USE_TCP
+	/* if clen needs to be added, add it */
+	if (clen_len){
+		/* msg->unparsed should point just before the final crlf,
+		 * parse_headers is called from clen_builder */
+		anchor=anchor_lump(&(msg->add_rm), msg->unparsed-buf, 0,
+							 HDR_CONTENTLENGTH);
+		if (anchor==0) goto error04; /* free clen_buf*/
+		if (insert_new_lump_after(anchor, clen_buf, clen_len,
+					HDR_CONTENTLENGTH)==0)
+			goto error04; /* free clen_buf*/
+	}
+#endif
 
 	/* compute new msg len and fix overlapping zones*/
 	new_len=len+lumps_len(msg->add_rm);
@@ -716,12 +789,17 @@ error02:
 	if (received_buf) pkg_free(received_buf);
 error03:
 	if (rport_buf) pkg_free(rport_buf);
+#ifdef USE_TCP
+error04:
+	if (clen_buf) pkg_free(clen_buf);
+#endif
 error00:
 	*returned_len=0;
 	return 0;
 }
 
 
+
 char * build_res_buf_from_sip_res( struct sip_msg* msg,
 				unsigned int *returned_len)
 {
@@ -733,7 +811,14 @@ char * build_res_buf_from_sip_res( struct sip_msg* msg,
 #endif
 	char* buf;
 	unsigned int len;
-
+#ifdef USE_TCP
+	struct lump* anchor;
+	char* clen_buf;
+	int clen_len;
+	
+	clen_buf=0;
+	clen_len=0;
+#endif
 #ifdef SCRATCH
 	orig=msg->orig;
 #endif
@@ -763,12 +848,53 @@ char * build_res_buf_from_sip_res( struct sip_msg* msg,
 		via_offset=msg->h_via1->name.s-buf;
 	}
 #endif
+
+#ifdef USE_TCP
+
+	/* if sending proto == tcp, check if Content-Length needs to be added*/
+	if (msg->via2 && (msg->via2->proto==PROTO_TCP)){
+		DBG("build_res_from_sip_res: checking content-length for \n%.*s\n",
+				(int)msg->len, msg->buf);
+		/* first of all parse content-length */
+		if (parse_headers(msg, HDR_CONTENTLENGTH, 0)==-1){
+			LOG(L_ERR, "build_res_buf_from_sip_res:"
+							" error parsing content-length\n");
+			goto skip_clen;
+		}
+		if (msg->content_length==0){
+			DBG("build_res_from_sip_res: no content_length hdr found\n");
+			/* we need to add it */
+			if ((clen_buf=clen_builder(msg, &clen_len))==0){
+				LOG(L_ERR, "build_res_buf_from_sip_res:" 
+								" clen_builder failed\n");
+				goto skip_clen;
+			}
+		}
+	}
+skip_clen:
+#endif
+	
 	/* remove the first via*/
 	if (del_lump( &(msg->repl_add_rm), via_offset, via_len, HDR_VIA)==0){
 		LOG(L_ERR, "build_res_buf_from_sip_res: error trying to remove first"
 					"via\n");
 		goto error;
 	}
+#ifdef USE_TCP
+	/* if clen needs to be added, add it */
+	if (clen_len){
+		/* msg->unparsed should point just before the final crlf,
+		 * parse_headers is called from clen_builder */
+		anchor=anchor_lump(&(msg->repl_add_rm), msg->unparsed-buf, 0, 
+							HDR_CONTENTLENGTH);
+		DBG("build_res_from_sip_res: adding content-length: %.*s\n",
+				clen_len, clen_buf);
+		if (anchor==0) goto error_clen; /* free clen_buf*/
+		if (insert_new_lump_after(anchor, clen_buf, clen_len,
+					HDR_CONTENTLENGTH)==0)
+			goto error_clen; /* free clen_buf*/
+	}
+#endif
 	new_len=len+lumps_len(msg->repl_add_rm);
 
 	DBG(" old size: %d, new size: %d\n", len, new_len);
@@ -796,12 +922,15 @@ char * build_res_buf_from_sip_res( struct sip_msg* msg,
 #endif
 		len-s_offset);
 	 /* send it! */
-	DBG(" copied size: orig:%d, new: %d, rest: %d\n",
-			s_offset, offset,
-			len-s_offset );
+	DBG("build_res_from_sip_res: copied size: orig:%d, new: %d, rest: %d"
+			" msg=\n%s\n", s_offset, offset, len-s_offset, new_buf);
 
 	*returned_len=new_len;
 	return new_buf;
+#ifdef USE_TCP
+error_clen:
+	if (clen_buf) pkg_free(clen_buf);
+#endif
 error:
 	*returned_len=0;
 	return 0;

+ 1 - 1
receive.c

@@ -28,6 +28,7 @@
  * ---------
  * 2003-01-29 transport-independent message zero-termination in
  *            receive_msg (jiri)
+ * 2003-02-07 undoed jiri's zero term. changes (they break tcp) (andrei)
  */
 
 
@@ -81,7 +82,6 @@ int receive_msg(char* buf, unsigned int len, struct receive_info* rcv_info)
 	/* zero termination (termination of orig message bellow not that
 	   useful as most of the work is done with scrath-pad; -jiri  */
 	/* buf[len]=0; */ /* WARNING: zero term removed! */
-	buf[len]=0; /* transport-independent zero-termination */
 	msg->rcv=*rcv_info;
 	msg->id=msg_no;
 #ifdef SCRATCH

+ 6 - 2
tcp_conn.h

@@ -36,10 +36,12 @@
 #define _tcp_conn_h
 
 #include "ip_addr.h"
+#include "locking.h"
 
 
 #define TCP_BUF_SIZE 65535
 #define TCP_CON_TIMEOUT 60 /* in  seconds */
+#define TCP_CON_SEND_TIMEOUT 30 /* timeout after a send */
 #define TCP_CHILD_TIMEOUT 5 /* after 5 seconds, the child "returns" 
 							 the connection to the tcp master process */
 #define TCP_MAIN_SELECT_TIMEOUT 5 /* how often "tcp main" checks for timeout*/
@@ -82,6 +84,7 @@ struct tcp_req{
 struct tcp_connection{
 	int s; /*socket, used by "tcp main" */
 	int fd; /* used only by "children", don't modify it! private data! */
+	lock_t write_lock;
 	int id; /* id (unique!) used to retrieve a specific connection when
 	           reply-ing*/
 	struct receive_info rcv; /* src & dst ip, ports, proto a.s.o*/
@@ -92,9 +95,10 @@ struct tcp_connection{
 	union sockaddr_union su;
 #endif
 	struct tcp_req req; /* request data */
-	int refcnt;
+	volatile int refcnt;
+	int bad; /* if set this is a "bad" connection */
 	int timeout; /* connection timeout, after this it will be removed*/
-	unsigned addr_hash; /* hash indexes in thge 2 tables */
+	unsigned addr_hash; /* hash indexes in the 2 tables */
 	unsigned id_hash;
 	struct tcp_connection* next; /* next, prev in hash table, used by "main" */
 	struct tcp_connection* prev;

+ 82 - 19
tcp_main.c

@@ -38,6 +38,7 @@
 #include <sys/time.h>
 #include <sys/types.h>
 #include <sys/socket.h>
+#include <sys/uio.h>  /* writev*/
 
 #include <unistd.h>
 
@@ -102,9 +103,15 @@ struct tcp_connection* tcpconn_new(int sock, union sockaddr_union* su,
 	}
 	c->s=sock;
 	c->fd=-1; /* not initialized */
+	if (lock_init(&c->write_lock)==0){
+		LOG(L_ERR, "ERROR: tcpconn_add: init lock failed\n");
+		goto error;
+	}
+	
 	c->rcv.src_su=*su;
 	
 	c->refcnt=0;
+	c->bad=0;
 	su2ip_addr(&c->rcv.src_ip, su);
 	c->rcv.src_port=su_getport(su);
 	c->rcv.proto=PROTO_TCP;
@@ -171,6 +178,16 @@ struct tcp_connection*  tcpconn_add(struct tcp_connection *c)
 }
 
 
+/* unsafe tcpconn_rm version (nolocks) */
+void _tcpconn_rm(struct tcp_connection* c)
+{
+	tcpconn_listrm(tcpconn_addr_hash[c->addr_hash], c, next, prev);
+	tcpconn_listrm(tcpconn_id_hash[c->id_hash], c, id_next, id_prev);
+	lock_destroy(&c->write_lock);
+	shm_free(c);
+}
+
+
 
 void tcpconn_rm(struct tcp_connection* c)
 {
@@ -178,6 +195,7 @@ void tcpconn_rm(struct tcp_connection* c)
 	tcpconn_listrm(tcpconn_addr_hash[c->addr_hash], c, next, prev);
 	tcpconn_listrm(tcpconn_id_hash[c->id_hash], c, id_next, id_prev);
 	TCPCONN_UNLOCK;
+	lock_destroy(&c->write_lock);
 	shm_free(c);
 }
 
@@ -198,7 +216,7 @@ struct tcp_connection* _tcpconn_find(int id, struct ip_addr* ip, int port)
 			DBG("c=%p, c->id=%d, ip=",c, c->id);
 			print_ip(&c->rcv.src_ip);
 			DBG(" port=%d\n", ntohs(c->rcv.src_port));
-			if (id==c->id) return c;
+			if ((id==c->id)&&(!c->bad)) return c;
 		}
 	}else if (ip){
 		hash=tcp_addr_hash(ip, port);
@@ -206,7 +224,8 @@ struct tcp_connection* _tcpconn_find(int id, struct ip_addr* ip, int port)
 			DBG("c=%p, c->id=%d, ip=",c, c->id);
 			print_ip(&c->rcv.src_ip);
 			DBG(" port=%d\n", ntohs(c->rcv.src_port));
-			if ( (port==c->rcv.src_port) && (ip_addr_cmp(ip, &c->rcv.src_ip)) )
+			if ( (!c->bad) && (port==c->rcv.src_port) &&
+					(ip_addr_cmp(ip, &c->rcv.src_ip)) )
 				return c;
 		}
 	}
@@ -215,13 +234,17 @@ struct tcp_connection* _tcpconn_find(int id, struct ip_addr* ip, int port)
 
 
 
-/* _tcpconn_find with locks */
-struct tcp_connection* tcpconn_get(int id, struct ip_addr* ip, int port)
+/* _tcpconn_find with locks and timeout */
+struct tcp_connection* tcpconn_get(int id, struct ip_addr* ip, int port,
+									int timeout)
 {
 	struct tcp_connection* c;
 	TCPCONN_LOCK;
 	c=_tcpconn_find(id, ip, port);
-	if (c) c->refcnt++;
+	if (c){ 
+			c->refcnt++;
+			c->timeout=get_ticks()+timeout;
+	}
 	TCPCONN_UNLOCK;
 	return c;
 }
@@ -249,9 +272,9 @@ int tcp_send(char* buf, unsigned len, union sockaddr_union* to, int id)
 	if (to){
 		su2ip_addr(&ip, to);
 		port=su_getport(to);
-		c=tcpconn_get(id, &ip, port); /* lock ;inc refcnt; unlock */
+		c=tcpconn_get(id, &ip, port, TCP_CON_SEND_TIMEOUT); 
 	}else if (id){
-		c=tcpconn_get(id, 0, 0);
+		c=tcpconn_get(id, 0, 0, TCP_CON_SEND_TIMEOUT);
 	}else{
 		LOG(L_CRIT, "BUG: tcp_send called with null id & to\n");
 		return -1;
@@ -260,7 +283,8 @@ int tcp_send(char* buf, unsigned len, union sockaddr_union* to, int id)
 	if (id){
 		if (c==0) {
 			if (to){
-				c=tcpconn_get(0, &ip, port); /* try again w/o id */
+				/* try again w/o id */
+				c=tcpconn_get(0, &ip, port, TCP_CON_SEND_TIMEOUT);
 				goto no_id;
 			}else{
 				LOG(L_ERR, "ERROR: tcp_send: id %d not found, dropping\n",
@@ -323,8 +347,27 @@ get_fd:
 	
 send_it:
 	DBG("tcp_send: sending...\n");
-	n=write(fd, buf, len);
+	lock_get(&c->write_lock);
+	n=send(fd, buf, len, MSG_NOSIGNAL);
+	lock_release(&c->write_lock);
 	DBG("tcp_send: after write: c= %p n=%d fd=%d\n",c, n, fd);
+	DBG("tcp_send: buf=\n%.*s\n", (int)len, buf);
+	if (n<0){
+		LOG(L_ERR, "ERROR: tcpsend: failed to send, n=%d: %s (%d)\n",
+				n, strerror(errno), errno);
+		/* error on the connection , mark it as bad and set 0 timeout */
+		c->bad=1;
+		c->timeout=0;
+		/* tell "main" it should drop this (optional it will t/o anyway?)*/
+		response[0]=(long)c;
+		response[1]=CONN_ERROR;
+		n=write(unix_tcp_sock, response, sizeof(response));
+		if (n<0){
+			LOG(L_ERR, "BUG: tcp_send: failed to get fd(write):%s (%d)\n",
+					strerror(errno), errno);
+			goto release_c;
+		}
+	}
 end:
 	close(fd);
 release_c:
@@ -343,6 +386,7 @@ void tcpconn_timeout(fd_set* set)
 	
 	
 	ticks=get_ticks();
+	TCPCONN_LOCK; /* fixme: we can lock only on delete IMO */
 	for(h=0; h<TCP_ADDR_HASH_SIZE; h++){
 		c=tcpconn_addr_hash[h];
 		while(c){
@@ -354,11 +398,12 @@ void tcpconn_timeout(fd_set* set)
 					FD_CLR(c->s, set);
 					close(c->s);
 				}
-				tcpconn_rm(c);
+				_tcpconn_rm(c);
 			}
 			c=next;
 		}
 	}
+	TCPCONN_UNLOCK;
 }
 
 
@@ -515,8 +560,12 @@ void tcp_main_loop()
 					if(send2child(tcpconn)<0){
 						LOG(L_ERR,"ERROR: tcp_main_loop: no children "
 								"available\n");
-						close(tcpconn->s);
-						tcpconn_rm(tcpconn);
+						TCPCONN_LOCK;
+						if (tcpconn->refcnt==0){
+							close(tcpconn->s);
+							_tcpconn_rm(tcpconn);
+						}else tcpconn->timeout=0; /* force expire */
+						TCPCONN_UNLOCK;
 					}
 				}
 			}
@@ -536,8 +585,12 @@ void tcp_main_loop()
 					if (send2child(tcpconn)<0){
 						LOG(L_ERR,"ERROR: tcp_main_loop: no "
 									"children available\n");
-						close(tcpconn->s);
-						tcpconn_rm(tcpconn);
+						TCPCONN_LOCK;
+						if (tcpconn->refcnt==0){
+							close(tcpconn->s);
+							_tcpconn_rm(tcpconn);
+						}else tcpconn->timeout=0; /* force expire*/
+						TCPCONN_UNLOCK;
 					}
 				}
 			}
@@ -578,13 +631,14 @@ read_again:
 						}
 						tcpconn=(struct tcp_connection*)response[0];
 						if (tcpconn){
-							tcpconn->refcnt--;
-							DBG("tcp_main_loop: %p refcnt= %d\n", 
-									tcpconn, tcpconn->refcnt);
+								if (tcpconn->bad) goto tcpconn_destroy;
 								FD_SET(tcpconn->s, &master_set);
 								if (maxfd<tcpconn->s) maxfd=tcpconn->s;
 								/* update the timeout*/
 								tcpconn->timeout=get_ticks()+TCP_CON_TIMEOUT;
+								tcpconn_put(tcpconn);
+								DBG("tcp_main_loop: %p refcnt= %d\n", 
+									tcpconn, tcpconn->refcnt);
 						}
 						break;
 					case CONN_ERROR:
@@ -597,14 +651,23 @@ read_again:
 						}
 						tcpconn=(struct tcp_connection*)response[0];
 						if (tcpconn){
+							if (tcpconn->s!=-1)
+								FD_CLR(tcpconn->s, &master_set);
+		tcpconn_destroy:
+							TCPCONN_LOCK; /*avoid races w/ tcp_send*/
 							tcpconn->refcnt--;
-							if (tcpconn->refcnt==0){
+							if (tcpconn->refcnt==0){ 
 								DBG("tcp_main_loop: destroying connection\n");
 								close(tcpconn->s);
-								tcpconn_rm(tcpconn);
+								_tcpconn_rm(tcpconn);
 							}else{
+								/* force timeout */
+								tcpconn->timeout=0;
+								tcpconn->bad=1;
 								DBG("tcp_main_loop: delaying ...\n");
+								
 							}
+							TCPCONN_UNLOCK;
 						}
 						break;
 					case CONN_GET_FD:

+ 39 - 11
tcp_read.c

@@ -46,9 +46,9 @@
 #include "globals.h"
 #include "receive.h"
 #include "timer.h"
+#include "ut.h"
 
 
-#define q_memchr memchr
 
 /* reads next available bytes
  * return number of bytes read, 0 on EOF or -1 on error,
@@ -77,6 +77,7 @@ again:
 			return -1;
 		}
 	}
+	DBG("tcp_read: read %d bytes:\n%.*s\n", bytes_read, bytes_read, r->pos);
 	
 	r->pos+=bytes_read;
 	return bytes_read;
@@ -133,9 +134,13 @@ int tcp_read_headers(struct tcp_req *r, int fd)
 							  break
 
 
-	
-	bytes=tcp_read(r, fd);
-	if (bytes<=0) return bytes;
+	/* if we still have some unparsed part, parse it first, don't do the read*/
+	if (r->parsed<r->pos){
+		bytes=0;
+	}else{
+		bytes=tcp_read(r, fd);
+		if (bytes<=0) return bytes;
+	}
 	p=r->parsed;
 	
 	while(p<r->pos && r->error==TCP_REQ_OK){
@@ -154,7 +159,7 @@ int tcp_read_headers(struct tcp_req *r, int fd)
 			case H_SKIP:
 				/* find lf, we are in this state if we are not interested
 				 * in anything till end of line*/
-				p=q_memchr(p, '\n', r->pos-r->parsed);
+				p=q_memchr(p, '\n', r->pos-p);
 				if (p){
 					p++;
 					r->state=H_LF;
@@ -172,14 +177,18 @@ int tcp_read_headers(struct tcp_req *r, int fd)
 					case '\n':
 						/* found LF LF */
 						r->state=H_BODY;
+						DBG("tcp_read_headers: switching to H_BODY (lflf)\n");
 						if (r->has_content_len){
 							r->body=p+1;
 							r->bytes_to_go=r->content_len;
 							if (r->bytes_to_go==0){
 								r->complete=1;
+								p++;
 								goto skip;
 							}
 						}else{
+							DBG("tcp_read_headers: ERROR: no clen, p=%X\n",
+									*p);
 							r->error=TCP_REQ_BAD_LEN;
 						}
 						break;
@@ -193,14 +202,18 @@ int tcp_read_headers(struct tcp_req *r, int fd)
 				if (*p=='\n'){
 					/* found LF CR LF */
 					r->state=H_BODY;
+					DBG("tcp_read_headers: switching to H_BODY (lfcrlf)\n");
 					if (r->has_content_len){
 						r->body=p+1;
 						r->bytes_to_go=r->content_len;
 						if (r->bytes_to_go==0){
 							r->complete=1;
+							p++;
 							goto skip;
 						}
 					}else{
+						DBG("tcp_read_headers: ERROR: no clen, p=%X\n",
+									*p);
 						r->error=TCP_REQ_BAD_LEN;
 					}
 				}else r->state=H_SKIP;
@@ -342,17 +355,21 @@ int tcp_read_req(struct tcp_connection* con)
 		resp=CONN_RELEASE;
 		s=con->fd;
 		req=&con->req;
+		size=0;
+again:
 		if(req->complete==0 && req->error==TCP_REQ_OK){
 			bytes=tcp_read_headers(req, s);
 						/* if timeout state=0; goto end__req; */
 			DBG("read= %d bytes, parsed=%d, state=%d, error=%d\n",
-					bytes, req->parsed-req->buf, req->state, req->error );
+					bytes, req->parsed-req->start, req->state, req->error );
+			DBG("tcp_read_req: last char=%X, parsed msg=\n%.*s\n",
+					*(req->parsed-1), req->parsed-req->start, req->start);
 			if (bytes==-1){
 				LOG(L_ERR, "ERROR: tcp_read_req: error reading \n");
 				resp=CONN_ERROR;
 				goto end_req;
 			}
-			if (bytes==0){
+			if ((size==0) && (bytes==0)){
 				DBG( "tcp_read_req: EOF\n");
 				resp=CONN_EOF;
 				goto end_req;
@@ -360,15 +377,21 @@ int tcp_read_req(struct tcp_connection* con)
 		
 		}
 		if (req->error!=TCP_REQ_OK){
-			LOG(L_ERR,"ERROR: tcp_read_req: bad request, state=%d, error=%d\n",
-					req->state, req->error);
+			LOG(L_ERR,"ERROR: tcp_read_req: bad request, state=%d, error=%d "
+					  "buf:\n%.*s\nparsed:\n%.*s\n", req->state, req->error,
+					  req->pos-req->buf, req->buf,
+					  req->parsed-req->start, req->start);
+			DBG("- received from: port %d, ip -", ntohs(con->rcv.src_port));
+			print_ip(&con->rcv.src_ip); DBG("-\n");
 			resp=CONN_ERROR;
 			goto end_req;
 		}
 		if (req->complete){
 			DBG("tcp_read_req: end of header part\n");
+			DBG("- received from: port %d, ip - ", ntohs(con->rcv.src_port));
+			print_ip(&con->rcv.src_ip); DBG("-\n");
 			DBG("tcp_read_req: headers:\n%.*s.\n",
-					req->body-req->buf, req->buf);
+					req->body-req->start, req->start);
 			if (req->has_content_len){
 				DBG("tcp_read_req: content-length= %d\n", req->content_len);
 				DBG("tcp_read_req: body:\n%.*s\n", req->content_len,req->body);
@@ -383,7 +406,7 @@ int tcp_read_req(struct tcp_connection* con)
 			resp=CONN_RELEASE;
 			/* just for debugging use sendipv4 as receiving socket */
 			DBG("calling receive_msg(%p, %d, )\n",
-					req->buf, (int)(req->parsed-req->start));
+					req->start, (int)(req->parsed-req->start));
 			bind_address=sendipv4; /*&tcp_info[con->sock_idx];*/
 			con->rcv.proto_reserved1=con->id; /* copy the id */
 			if (receive_msg(req->start, req->parsed-req->start, &con->rcv)<0){
@@ -404,6 +427,8 @@ int tcp_read_req(struct tcp_connection* con)
 			req->state=H_SKIP_EMPTY;
 			req->complete=req->content_len=req->has_content_len=0;
 			req->bytes_to_go=0;
+			/* if we still have some unparsed bytes, try to  parse them too*/
+			if (size) goto again;
 			
 		}
 		
@@ -496,11 +521,13 @@ void tcp_receive_loop(int unix_sock)
 					LOG(L_ERR, "ERROR: tcp_receive_loop: read_fd:"
 									"no fd read\n");
 					resp=CONN_ERROR;
+					con->bad=1;
 					release_tcpconn(con, resp, unix_sock);
 				}
 				if (con==0){
 					LOG(L_ERR, "ERROR: tcp_receive_loop: null pointer\n");
 					resp=CONN_ERROR;
+					con->bad=1;
 					release_tcpconn(con, resp, unix_sock);
 				}
 				con->timeout=get_ticks()+TCP_CHILD_TIMEOUT;
@@ -524,6 +551,7 @@ void tcp_receive_loop(int unix_sock)
 					if (resp<0){
 						FD_CLR(con->fd, &master_set);
 						tcpconn_listrm(list, con, c_next, c_prev);
+						con->bad=1;
 						release_tcpconn(con, resp, unix_sock);
 					}else{
 						/* update timeout */

+ 2 - 1
tcp_server.h

@@ -34,7 +34,8 @@
 
 /* "public" functions*/
 
-struct tcp_connection* tcpconn_get(int id, struct ip_addr* ip, int port);
+struct tcp_connection* tcpconn_get(int id, struct ip_addr* ip, int port, 
+									int timeout);
 void tcpconn_put(struct tcp_connection* c);
 int tcp_send(char* buf, unsigned len, union sockaddr_union* to, int id);